Complete examples

Sending data

The following demo application shows how dummy data can be sent at a rate of roughly five events per second (one event every 200 ms).

  • Java

  • Python

This example is also available on GitHub.

package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.domain.Config;
import io.streammachine.driver.serializer.SerializationType;
import io.streammachine.schemas.strmcatalog.clickstream.ClickstreamEvent;
import io.streammachine.schemas.strmcatalog.clickstream.Customer;
import io.streammachine.schemas.strmcatalog.clickstream.StrmMeta;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.Response;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

@Slf4j
public class Sender {
    /** Number of command line arguments this example expects. */
    private static final int EXPECTED_ARG_COUNT = 3;

    /** Pause after each send, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 200L;

    /** Status code for which the response body is logged as well. */
    private static final int HTTP_BAD_REQUEST = 400;

    public static void main(String[] args) throws InterruptedException {
        Sender sender = new Sender();
        sender.run(args);
    }

    /**
     * Builds a client from the command line arguments and then keeps sending
     * hardcoded Avro events in an endless loop, sleeping 200 ms after each send.
     * Exits with status 1 when the number of arguments is not exactly 3.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     * @throws InterruptedException if the thread is interrupted while sleeping between sends
     */
    private void run(String[] args) throws InterruptedException {
        if (args.length != EXPECTED_ARG_COUNT) {
            System.out.println("Ensure that you've provided all required input arguments: [billingId, clientId, clientSecret]");
            System.exit(1);
        }

        String billingId = args[0];
        String clientId = args[1];
        String clientSecret = args[2];

        // Default configuration; nothing is overridden on the Config builder.
        Config defaultConfig = Config.builder().build();
        StreamMachineClient client = StreamMachineClient.builder()
                .billingId(billingId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .config(defaultConfig)
                .build();

        for (;;) {
            ClickstreamEvent event = createAvroEvent();
            // join() blocks until the send has completed, so events go out one at a time.
            Response response = client.send(event, SerializationType.AVRO_BINARY).join();
            logResponse(response);

            Thread.sleep(SEND_INTERVAL_MILLIS);
        }
    }

    /**
     * Logs the status line of a response at debug level and, for a 400, the response body too.
     *
     * @param response the response returned for a single sent event
     */
    private static void logResponse(Response response) {
        int status = response.getStatusCode();
        log.debug("{}: {}", status, response.getStatusText());

        if (status != HTTP_BAD_REQUEST) {
            return;
        }

        // Try to change the value for the url field in the createAvroEvent method below to something that is not a url
        // You can see that the Stream Machine gateway rejects the
        // message, stating that the field does not match the regex
        // provided in resources/schema/avro/strm.json
        log.debug("Bad request: {}", response.getResponseBody());
    }

    /**
     * Generate a ClickstreamEvent from a Java class that corresponds with the clickstream schema.
     * These Java classes are generated and provided by Streammachine, based on the
     * schema definition and the serialization schema.
     * <p>
     * This particular event corresponds to an Avro schema that you can see in resources/schema/avro/schema.avsc and resources/schema/avro/strm.json
     * <p>
     * All values are hardcoded, except for the metadata timestamp, which is the current time in milliseconds.
     *
     * @return the populated {@link ClickstreamEvent}
     */
    private static ClickstreamEvent createAvroEvent() {
        Customer customer = Customer.newBuilder()
                .setId("some-identifier")
                .build();

        StrmMeta meta = StrmMeta.newBuilder()
                .setTimestamp(System.currentTimeMillis())
                .setSchemaId("clickstream")
                .setNonce(0)
                .setConsentLevels(asList(0, 1, 2))
                .build();

        return ClickstreamEvent.newBuilder()
                .setAbTests(singletonList("abc"))
                .setEventType("button x clicked")
                .setCustomer(customer)
                .setReferrer("https://www.streammachine.io")
                .setUserAgent("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36")
                .setProducerSessionId("session-01")
                .setConversion(1)
                .setStrmMeta(meta)
                .setUrl("https://portal.streammachine.io")
                .build();
    }
}

This example is also available on GitHub.

import asyncio
import logging
import sys

from clickstream.io.streammachine.schemas.strmcatalog.clickstream import ClickstreamEvent
from streammachine.driver import StreamMachineClient, ClientConfig, StreamMachineEvent, current_time_millis, \
    SerializationType

from args import StreamMachineProperties

# Module-level logger; main() below uses it to report the outcome of every send.
# NOTE(review): Sender.__init__ looks up the logger with this same name and sets it
# to DEBUG, so the INFO level chosen here only holds until a Sender is constructed.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


class Sender:
    """
    An endless asynchronous iterator: every iteration step creates one event and sends it to streammachine

    The iterator never raises StopAsyncIteration, so an ``async for`` over it only ends
    when the caller breaks out of the loop or an exception is raised.
    """

    def __init__(self, billing_id, client_id, client_secret):
        """
        Create the client configuration, the client itself, and a debug-level logger.

        :param billing_id: passed to StreamMachineClient as first argument
        :param client_id: passed to StreamMachineClient as second argument
        :param client_secret: passed to StreamMachineClient as third argument
        """
        client_config = ClientConfig(log_level=logging.DEBUG)
        self._config = client_config
        self._client = StreamMachineClient(billing_id, client_id, client_secret, client_config)

        # Same logger name as the module, so this also changes the module logger's level.
        module_logger = logging.getLogger(__name__)
        module_logger.setLevel(logging.DEBUG)
        self._logger = module_logger

    def __aiter__(self):
        """The sender is its own asynchronous iterator."""
        return self

    async def __anext__(self):
        """
        Create a fresh dummy event and send it as Avro binary.

        :return: whatever the client's send() returns for this event
        """
        dummy_event = create_avro_event()
        outcome = await self._client.send(dummy_event, SerializationType.AVRO_BINARY)
        return outcome

    async def start_timers(self):
        """Delegate to the client's start_timers()."""
        await self._client.start_timers()


def create_avro_event() -> StreamMachineEvent:
    """
    Build a dummy clickstream event.

    Every value is hardcoded, except for the metadata timestamp, which is taken
    from current_time_millis() at the moment of the call.

    :return: the populated ClickstreamEvent
    """
    dummy = ClickstreamEvent()

    # payload fields
    dummy.abTests = ["abc"]
    dummy.eventType = "button x clicked"
    dummy.customer.id = "integration-test"
    dummy.referrer = "https://www.streammachine.io"
    dummy.userAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36"
    dummy.producerSessionId = "session-01"
    dummy.conversion = 1

    # metadata fields
    meta = dummy.strmMeta
    meta.timestamp = current_time_millis()
    meta.schemaId = "clickstream"
    meta.nonce = 0
    meta.consentLevels = [0, 1, 2]

    dummy.url = "https://portal.streammachine.io"

    return dummy


async def main(props):
    """
    Send dummy events in an endless loop and log the response to each of them.

    After every event the loop sleeps for 0.2 seconds.

    :param props: object that provides the billing_id, client_id and client_secret attributes
    """
    event_sender = Sender(props.billing_id, props.client_id, props.client_secret)
    await event_sender.start_timers()  # re-authorization jwt tokens

    async for status in event_sender:
        accepted = status == 204  # event correctly accepted by endpoint
        if accepted:
            log.info(f"Event sent, response {status}")
        else:
            log.error(f"Something went wrong while trying to send event to Stream Machine, response: {status}")

        await asyncio.sleep(0.2)


if __name__ == '__main__':
    # Script entry point: write log output to stderr before anything is logged.
    logging.basicConfig(stream=sys.stderr)
    # Run the send loop on a new event loop. The object returned by
    # StreamMachineProperties.from_args() must provide the billing_id, client_id
    # and client_secret attributes that main() reads.
    asyncio.run(main(StreamMachineProperties.from_args()))

Receiving data

The application shown below demonstrates how the data can be received: the Java example uses a WebSocket connection, and the Python example uses server-sent events (SSE).

  • Java

  • Python

This example is also available on GitHub.

package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.domain.Config;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.Response;
import org.asynchttpclient.ws.WebSocket;
import org.asynchttpclient.ws.WebSocketListener;

@Slf4j
public class Receiver {
    public static void main(String[] args) {
        new Receiver().run(args);
    }

    /**
     * Builds a client from the command line arguments, logs the status of the
     * egress is-alive call, and then starts receiving events over a websocket.
     * Exits with status 1 when the number of arguments is not exactly 3.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     */
    private void run(String[] args) {
        if (args.length != 3) {
            System.out.println("Ensure that you've provided all required input arguments: [billingId, clientId, clientSecret]");
            System.exit(1);
        }

        var billingId = args[0];
        var clientId = args[1];
        var clientSecret = args[2];

        // Default configuration; nothing is overridden on the Config builder.
        var config = Config.builder().build();

        StreamMachineClient client = StreamMachineClient.builder()
                .billingId(billingId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .config(config)
                .build();

        Response isAlive = client.egressIsAlive();
        log.debug("{}: {}", isAlive.getStatusCode(), isAlive.getStatusText());

        // NOTE(review): the meaning of the first argument (true) is not visible here;
        // presumably it selects JSON output, mirroring the Python example - confirm.
        client.startReceivingWs(true, new StreamMachineEventWsListener());
    }

    /**
     * Websocket listener that logs connection lifecycle events and errors,
     * and prints every received text frame to standard out.
     */
    private static class StreamMachineEventWsListener implements WebSocketListener {

        @Override
        public void onOpen(WebSocket websocket) {
            log.info("Opened websocket connection...");
        }

        @Override
        public void onClose(WebSocket websocket, int code, String reason) {
            // Include the close code and reason, so that an unexpected close can be diagnosed.
            log.info("Closed websocket connection... (code: {}, reason: {})", code, reason);
        }

        @Override
        public void onTextFrame(String payload, boolean finalFragment, int rsv) {
            // The payload is printed as received; finalFragment and rsv are not used.
            System.out.println(payload);
        }

        @Override
        public void onError(Throwable t) {
            log.error("An error occurred", t);
        }
    }

}

This example is also available on GitHub.

import asyncio
import json
import logging
import sys

from streammachine.driver import StreamMachineClient, ClientConfig

from args import StreamMachineProperties


async def event_handler(event):
    """
    Callback that handles a single event received from Streammachine: the event
    is parsed as json, and the parsed result is printed to standard out.

    :param event: this is either json event, or a base64 encoded avro binary, depending on the
    first parameter of StreamMachineClient.start_receiving_sse.

    Decoding an Avro binary is not yet supported.
    """
    parsed = json.loads(event)
    print(parsed)


async def main(props):
    """
    Async entry point: instantiate the client, start its timers, and start
    receiving server sent events with event_handler installed as the callback

    :param props: object that provides the billing_id, client_id and client_secret attributes
    """
    client_config = ClientConfig(log_level=logging.DEBUG)
    sm_client = StreamMachineClient(
        props.billing_id,
        props.client_id,
        props.client_secret,
        client_config,
    )

    await sm_client.start_timers()
    await sm_client.start_receiving_sse(True, event_handler)


if __name__ == '__main__':
    # Script entry point: write log output to stderr before anything is logged.
    logging.basicConfig(stream=sys.stderr)
    # Run the receiver on a new event loop. The object returned by
    # StreamMachineProperties.from_args() must provide the billing_id, client_id
    # and client_secret attributes that main() reads.
    asyncio.run(main(StreamMachineProperties.from_args()))

Configuration properties

The default configuration properties are shown below. If necessary or requested, you can change them in the builder for the Config that the StreamMachineClient requires.

| Property                        | Value              |
|---------------------------------|--------------------|
| Gateway Protocol                | https              |
| Gateway Host                    | in.strm.services   |
| Gateway Endpoint                | /event             |
| Egress Protocol                 | https              |
| Egress Host                     | out.strm.services  |
| Egress Endpoint                 | /sse               |
| Authentication Protocol         | https              |
| Authentication Host             | auth.strm.services |
| Authentication Endpoint         | /auth              |
| Authentication Refresh Endpoint | /refresh           |