Complete examples
Sending data
The following demo application shows how dummy data can be sent at a rate of roughly 5 events per second.
This example is also available on GitHub.
package io.streammachine.examples;
import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.domain.Config;
import io.streammachine.driver.serializer.SerializationType;
import io.streammachine.schemas.strmcatalog.clickstream.ClickstreamEvent;
import io.streammachine.schemas.strmcatalog.clickstream.Customer;
import io.streammachine.schemas.strmcatalog.clickstream.StrmMeta;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.Response;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@Slf4j
public class Sender {

    public static void main(String[] args) throws InterruptedException {
        Sender sender = new Sender();
        sender.run(args);
    }

    /**
     * Sends a hardcoded Avro event in an endless loop, pausing 200 ms after each send.
     * <p>
     * When the argument count is not exactly three, a usage hint is printed and the JVM exits with status 1.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     * @throws InterruptedException if the thread is interrupted while sleeping between two sends
     */
    private void run(String[] args) throws InterruptedException {
        if (args.length != 3) {
            System.out.println("Ensure that you've provided all required input arguments: [billingId, clientId, clientSecret]");
            System.exit(1);
        }

        StreamMachineClient client = buildClient(args[0], args[1], args[2]);

        for (;;) {
            ClickstreamEvent event = createAvroEvent();
            // join() blocks until the asynchronous send has produced a response.
            Response response = client.send(event, SerializationType.AVRO_BINARY).join();

            int status = response.getStatusCode();
            log.debug("{}: {}", status, response.getStatusText());

            if (status == 400) {
                // To see this branch in action, change the url field in createAvroEvent() below
                // to something that is not a url: the Stream Machine gateway then rejects the
                // message, stating that the field does not match the regex provided in
                // resources/schema/avro/strm.json
                log.debug("Bad request: {}", response.getResponseBody());
            }

            Thread.sleep(200);
        }
    }

    /**
     * Builds a client for the given credentials, using a {@link Config} with default builder settings.
     *
     * @param billingId    the billing id passed on the command line
     * @param clientId     the client id passed on the command line
     * @param clientSecret the client secret passed on the command line
     * @return the configured {@link StreamMachineClient}
     */
    private static StreamMachineClient buildClient(String billingId, String clientId, String clientSecret) {
        Config config = Config.builder().build();
        return StreamMachineClient.builder()
                .billingId(billingId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .config(config)
                .build();
    }

    /**
     * Creates a {@link ClickstreamEvent} filled with fixed demo values and the current time as timestamp.
     * <p>
     * ClickstreamEvent is a Java class that corresponds with the clickstream schema. These classes are
     * generated and provided by Streammachine, based on the schema definition and the serialization schema.
     * The matching Avro schema can be found in resources/schema/avro/schema.avsc and
     * resources/schema/avro/strm.json
     *
     * @return the populated {@link ClickstreamEvent}
     */
    private static ClickstreamEvent createAvroEvent() {
        Customer customer = Customer.newBuilder()
                .setId("some-identifier")
                .build();

        StrmMeta meta = StrmMeta.newBuilder()
                .setTimestamp(System.currentTimeMillis())
                .setSchemaId("clickstream")
                .setNonce(0)
                .setConsentLevels(asList(0, 1, 2))
                .build();

        return ClickstreamEvent.newBuilder()
                .setAbTests(singletonList("abc"))
                .setEventType("button x clicked")
                .setCustomer(customer)
                .setReferrer("https://www.streammachine.io")
                .setUserAgent("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36")
                .setProducerSessionId("session-01")
                .setConversion(1)
                .setStrmMeta(meta)
                .setUrl("https://portal.streammachine.io")
                .build();
    }
}
This example is also available on GitHub.
import asyncio
import logging
import sys
from clickstream.io.streammachine.schemas.strmcatalog.clickstream import ClickstreamEvent
from streammachine.driver import StreamMachineClient, ClientConfig, StreamMachineEvent, current_time_millis, \
SerializationType
from args import StreamMachineProperties
# Module-level logger used by main() for its info/error messages.
# NOTE: Sender.__init__ fetches the logger with this same name and sets it to DEBUG,
# so the INFO level configured here only holds until a Sender is instantiated.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
class Sender(object):
    """
    Asynchronous iterator that, on every step, creates a dummy event and sends it to streammachine.

    The iterator never signals exhaustion, so an ``async for`` over it runs until interrupted.
    """

    def __init__(self, billing_id, client_id, client_secret):
        """
        Create the client for the given credentials.

        :param billing_id: billing id handed to the StreamMachineClient
        :param client_id: client id handed to the StreamMachineClient
        :param client_secret: client secret handed to the StreamMachineClient
        """
        client_config = ClientConfig(log_level=logging.DEBUG)
        self._config = client_config
        self._client = StreamMachineClient(billing_id, client_id, client_secret, client_config)

        module_logger = logging.getLogger(__name__)
        module_logger.setLevel(logging.DEBUG)
        self._logger = module_logger

    def __aiter__(self):
        """The sender is its own asynchronous iterator."""
        return self

    async def __anext__(self):
        """
        Build one dummy event and send it as Avro binary.

        :return: whatever the client's ``send`` coroutine resolves to
        """
        outgoing = create_avro_event()
        result = await self._client.send(outgoing, SerializationType.AVRO_BINARY)
        return result

    async def start_timers(self):
        """Delegate to the client's ``start_timers`` coroutine."""
        await self._client.start_timers()
def create_avro_event() -> StreamMachineEvent:
    """
    Build a dummy ClickstreamEvent with fixed demo values and the current time as its timestamp.

    :return: the populated event
    """
    user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36"
    consent_levels = [0, 1, 2]

    event = ClickstreamEvent()

    # Event payload
    event.abTests = ["abc"]
    event.eventType = "button x clicked"
    event.customer.id = "integration-test"
    event.referrer = "https://www.streammachine.io"
    event.userAgent = user_agent
    event.producerSessionId = "session-01"
    event.conversion = 1

    # Stream Machine metadata
    event.strmMeta.timestamp = current_time_millis()
    event.strmMeta.schemaId = "clickstream"
    event.strmMeta.nonce = 0
    event.strmMeta.consentLevels = consent_levels

    event.url = "https://portal.streammachine.io"
    return event
async def main(props):
    """
    Create a Sender from the given properties and keep sending events, pausing 0.2 s after each one.

    :param props: object exposing ``billing_id``, ``client_id`` and ``client_secret``
    """
    sender = Sender(props.billing_id, props.client_id, props.client_secret)
    await sender.start_timers()  # re-authorization jwt tokens

    async for response in sender:
        # 204 means the event was correctly accepted by the endpoint
        rejected = response != 204
        if rejected:
            log.error(f"Something went wrong while trying to send event to Stream Machine, response: {response}")
        else:
            log.info(f"Event sent, response {response}")
        await asyncio.sleep(0.2)
if __name__ == "__main__":
    # Route log output to stderr, then run the sender until interrupted.
    logging.basicConfig(stream=sys.stderr)
    props = StreamMachineProperties.from_args()
    asyncio.run(main(props))
Receiving data
The application shown below shows how the data can be received: the Java example uses a WebSocket connection, while the Python example uses server-sent events.
This example is also available on GitHub.
package io.streammachine.examples;
import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.domain.Config;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.Response;
import org.asynchttpclient.ws.WebSocket;
import org.asynchttpclient.ws.WebSocketListener;
@Slf4j
public class Receiver {

    public static void main(String[] args) {
        new Receiver().run(args);
    }

    /**
     * Checks that the egress endpoint responds and then starts receiving events over a websocket.
     * <p>
     * When the argument count is not exactly three, a usage hint is printed and the JVM exits with status 1.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     */
    private void run(String[] args) {
        if (args.length != 3) {
            System.out.println("Ensure that you've provided all required input arguments: [billingId, clientId, clientSecret]");
            System.exit(1);
        }

        var billingId = args[0];
        var clientId = args[1];
        var clientSecret = args[2];

        var config = Config.builder().build();

        StreamMachineClient client = StreamMachineClient.builder()
                .billingId(billingId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .config(config)
                .build();

        Response isAlive = client.egressIsAlive();
        log.debug("{}: {}", isAlive.getStatusCode(), isAlive.getStatusText());

        // NOTE(review): the meaning of the boolean flag is not visible here — confirm against the driver docs.
        client.startReceivingWs(true, new StreamMachineEventWsListener());
    }

    /**
     * Websocket listener that prints every received text frame to stdout and logs
     * connection lifecycle events and errors.
     */
    private static class StreamMachineEventWsListener implements WebSocketListener {

        @Override
        public void onOpen(WebSocket websocket) {
            log.info("Opened websocket connection...");
        }

        @Override
        public void onClose(WebSocket websocket, int code, String reason) {
            // Include the close code and reason so an unexpected disconnect can be diagnosed.
            log.info("Closed websocket connection, code: {}, reason: {}", code, reason);
        }

        @Override
        public void onTextFrame(String payload, boolean finalFragment, int rsv) {
            System.out.println(payload);
        }

        @Override
        public void onError(Throwable t) {
            log.error("An error occurred", t);
        }
    }
}
This example is also available on GitHub.
import asyncio
import json
import logging
import sys
from streammachine.driver import StreamMachineClient, ClientConfig
from args import StreamMachineProperties
async def event_handler(event):
    """
    Callback invoked for every event received from Streammachine; parses it as JSON and prints it.

    :param event: either a json event, or a base64 encoded avro binary, depending on the
                  first parameter of StreamMachineClient.start_receiving_sse.
                  Decoding an Avro binary is not yet supported.
    """
    decoded = json.loads(event)
    print(decoded)
async def main(props):
    """
    Async entry point: instantiate the client, start its re-authorization timer, and
    install event_handler as the callback for received events.

    :param props: object exposing ``billing_id``, ``client_id`` and ``client_secret``
    """
    client_config = ClientConfig(log_level=logging.DEBUG)
    client = StreamMachineClient(
        props.billing_id,
        props.client_id,
        props.client_secret,
        client_config,
    )

    await client.start_timers()
    # NOTE(review): the first argument selects json vs. base64 avro delivery; True presumably
    # means json since event_handler calls json.loads — confirm against the driver docs.
    await client.start_receiving_sse(True, event_handler)
if __name__ == "__main__":
    # Route log output to stderr, then run the receiver until interrupted.
    logging.basicConfig(stream=sys.stderr)
    props = StreamMachineProperties.from_args()
    asyncio.run(main(props))
Configuration properties
The default configuration properties are shown below. If necessary, you can change them in the builder for the `Config` that the `StreamMachineClient` requires.
Property | Value |
---|---|
Gateway Protocol |
|
Gateway Host |
|
Gateway Endpoint |
|
Egress Protocol |
|
Egress Host |
|
Egress Endpoint |
|
Authentication Protocol |
|
Authentication Host |
|
Authentication Endpoint |
|
Authentication Refresh Endpoint |
|