Complete examples

This page provides complete, end-to-end examples: from sending data to Stream Machine to receiving that data through the Egress.

These examples are meant for demonstration purposes only and use the Egress interface to receive events. For production use, use the Batch or Kafka exporters instead.

In order to run these examples, you need the following:

  • An input stream to send data to (see the documentation on creating streams if you don’t know how to create one)

  • The credentials for this stream (presented upon creation)

  • The billing id for your account (you can find your billing id via the CLI or in the portal)

The README of each example below explains how to pass these credentials to the example code.
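The Python examples below read their credentials through an `args.StreamMachineProperties` helper that is not shown on this page. Purely as an illustration, a minimal stand-in that accepts the same three values as the Java examples ([billingId, clientId, clientSecret]) as positional command-line arguments could look like this. `Credentials` and `parse_credentials` are hypothetical names; the helper in the example repositories may work differently.

```python
import argparse
from dataclasses import dataclass


@dataclass(frozen=True)
class Credentials:
    """Hypothetical container for the three values every example needs."""
    billing_id: str
    client_id: str
    client_secret: str


def parse_credentials(argv=None) -> Credentials:
    """Parse [billingId, clientId, clientSecret] as positional arguments,
    in the same order the Java examples expect them."""
    parser = argparse.ArgumentParser(description="Stream Machine example")
    parser.add_argument("billing_id", help="billing id of your account")
    parser.add_argument("client_id", help="client id of the input stream")
    parser.add_argument("client_secret", help="client secret of the input stream")
    # argparse prints a usage message and raises SystemExit if an argument is missing
    args = parser.parse_args(argv)
    return Credentials(args.billing_id, args.client_id, args.client_secret)
```

For example, `parse_credentials(["my-billing-id", "my-client-id", "my-secret"])` returns a `Credentials` object; calling it without an argument list reads `sys.argv` instead.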

Sending data

The following demo applications show how dummy data can be sent at a fixed interval. The data is static and does not produce any useful patterns for analysis; it does, however, show how events are constructed and transferred to Stream Machine.
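Each sender below runs the same loop: build a static event, send it, wait, and repeat (every 500 ms in Java, 200 ms in Python and 100 ms in NodeJS). A minimal, driver-independent sketch of that loop, assuming only that `send` is a coroutine returning an HTTP status code. `send_periodically`, `fake_send` and `make_dummy_event` are hypothetical names; `fake_send` stands in for the driver's `client.send` so the sketch runs without credentials.

```python
import asyncio
from typing import Awaitable, Callable, List


async def send_periodically(
    send: Callable[[dict], Awaitable[int]],
    make_event: Callable[[], dict],
    interval_s: float,
    count: int,
) -> List[int]:
    """Send `count` events, sleeping `interval_s` seconds between sends,
    and return the status code reported for each send."""
    statuses: List[int] = []
    for i in range(count):
        # Await each send before sleeping, so at most one request is in flight
        statuses.append(await send(make_event()))
        if i < count - 1:
            await asyncio.sleep(interval_s)
    return statuses


# Hypothetical stand-in for the driver's send call: records events, always answers 204
sent_events: List[dict] = []


async def fake_send(event: dict) -> int:
    sent_events.append(event)
    return 204


def make_dummy_event() -> dict:
    # Static dummy payload, like the events in the examples on this page
    return {"eventType": "button x clicked", "url": "https://portal.streammachine.io"}
```

Running `asyncio.run(send_periodically(fake_send, make_dummy_event, 0.2, 3))` sends three events 200 ms apart. Unlike the Java sender, which fires requests without waiting for the previous response, this loop awaits each send before sleeping, which is also what the Python sender does.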

The sender is available in Java, Python and NodeJS.

Java (dependencies: Java driver, clickstream schema)

This example is also available on GitHub. Please see the repository for the readme.

package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.domain.Config;
import io.streammachine.driver.serializer.SerializationType;
import io.streammachine.public_schemas.clickstream.ClickstreamEvent;
import io.streammachine.public_schemas.clickstream.Customer;
import io.streammachine.public_schemas.clickstream.StrmMeta;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;

import static java.util.Collections.singletonList;

@Slf4j
public class Sender {
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        new Sender().run(args);
    }

    /**
     * Start sending hardcoded Avro events, one every 500 ms.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     * @throws InterruptedException if the thread is interrupted while sleeping between events
     */
    private void run(String[] args) throws InterruptedException {
        if (args.length != 3) {
            System.out.println("Ensure that you've provided all required input arguments: [billingId, clientId, clientSecret]");
            System.exit(1);
        }

        var billingId = args[0];
        var clientId = args[1];
        var clientSecret = args[2];

        var config = Config.builder().build();
        StreamMachineClient client = StreamMachineClient.builder()
                .billingId(billingId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .config(config)
                .build();

        while (true) {
            var event = createAvroEvent();

            client.send(event, SerializationType.AVRO_BINARY)
                    .whenComplete((response, exception) -> {
                        if (exception != null) {
                            // The request failed, so there is no response to inspect
                            log.error("An exception occurred while trying to send an event to Stream Machine", exception);
                            return;
                        }

                        if (response.getStatus() == 204) {
                            // 204 No Content: the event was accepted
                            log.debug("{}", response.getStatus());
                        } else if (response.getStatus() == 400) {
                            // Try changing the value of the url field in the createAvroEvent method below
                            // to something that is not a URL. The Stream Machine gateway then rejects the
                            // message, stating that the field does not match the regex
                            // provided in resources/schema/avro/strm.json
                            log.debug("Bad request: {}", response.getContentAsString());
                        } else {
                            log.warn("Unexpected response status: {}", response.getStatus());
                        }
                    });

            Thread.sleep(500);
        }
    }

    /**
     * Generate a ClickstreamEvent from a Java class that corresponds with the clickstream schema.
     * These Java classes are generated and provided by Stream Machine, based on the
     * schema definition and the serialization schema.
     * <p>
     * This particular event corresponds to an Avro schema that you can see in resources/schema/avro/schema.avsc and resources/schema/avro/strm.json
     *
     * @return a {@link io.streammachine.schemas.StreamMachineEvent}
     */
    private static ClickstreamEvent createAvroEvent() {
        int consentLevel = RANDOM.nextBoolean() ? 1 : 0;

        return ClickstreamEvent.newBuilder()
                .setAbTests(singletonList("abc"))
                .setEventType("button x clicked")
                .setCustomer(Customer.newBuilder()
                        .setId("some-identifier")
                        .build())
                .setReferrer("https://www.streammachine.io")
                .setUserAgent("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36")
                .setProducerSessionId("session-01")
                .setConversion(1)
                .setStrmMeta(StrmMeta.newBuilder()
                        .setTimestamp(System.currentTimeMillis())
                        .setSchemaId("clickstream")
                        .setNonce(0)
                        .setConsentLevels(singletonList(consentLevel))
                        .build())
                .setUrl("https://portal.streammachine.io")
                .build();
    }
}
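The `whenComplete` callback above distinguishes three outcomes: the request itself failed (the exception is set and there is no response), the gateway answered 204 (accepted), or it answered 400 (rejected, for example because `url` does not match the regex in `resources/schema/avro/strm.json`). The same decision logic, extracted into a plain function so it can be tested without a network connection, might look like this in Python. This is a sketch; `describe_response` is a hypothetical helper, not part of any Stream Machine driver.

```python
from typing import Optional


def describe_response(status: Optional[int], error: Optional[BaseException] = None) -> str:
    """Classify the outcome of one send, mirroring the Java Sender's callback.

    The error is checked first: when the request failed there is no status."""
    if error is not None:
        return f"failed: {error}"
    if status == 204:
        return "accepted"
    if status == 400:
        return "rejected: bad request (for example, a field failed schema validation)"
    return f"unexpected status {status}"
```

Checking the error before touching the status is what prevents the null dereference that an unconditional `response.getStatus()` call would cause in the Java version.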

Python (dependencies: Stream Machine driver, Stream Machine clickstream Avro schemas)

This example is also available on GitHub. Please see the repository for the readme.

import asyncio
import logging
import sys

from clickstream.io.streammachine.public_schemas.clickstream import ClickstreamEvent

from streammachine.driver import StreamMachineClient, ClientConfig, StreamMachineEvent, current_time_millis, \
    SerializationType

from args import StreamMachineProperties

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


class Sender(object):
    """
    An asynchronous iterator: each iteration creates an event and sends it to Stream Machine
    """

    def __init__(self, billing_id, client_id, client_secret):
        self._config = ClientConfig(log_level=logging.DEBUG)
        self._client = StreamMachineClient(billing_id, client_id, client_secret, self._config)
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)

    def __aiter__(self):
        return self

    async def __anext__(self):
        event = create_avro_event()
        return await self._client.send(event, SerializationType.AVRO_BINARY)

    async def start_timers(self):
        await self._client.start_timers()


def create_avro_event() -> StreamMachineEvent:
    """
    Create a dummy clickstream event.

    :return: a ClickstreamEvent filled with hardcoded values
    """
    event = ClickstreamEvent()
    event.abTests = ["abc"]
    event.eventType = "button x clicked"
    event.customer.id = "integration-test"
    event.referrer = "https://www.streammachine.io"
    event.userAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36"
    event.producerSessionId = "session-01"
    event.conversion = 1

    event.strmMeta.timestamp = current_time_millis()
    event.strmMeta.schemaId = "clickstream"
    event.strmMeta.nonce = 0
    event.strmMeta.consentLevels = [0, 1, 2]
    event.url = "https://portal.streammachine.io"

    return event


async def main(props):
    sender = Sender(props.billing_id, props.client_id, props.client_secret)
    await sender.start_timers()  # start the timers that re-authorize the client (refresh its JWT tokens)

    async for response in sender:
        if response == 204:  # event correctly accepted by endpoint
            log.info(f"Event sent, response {response}")
        else:
            log.error(f"Something went wrong while trying to send event to Stream Machine, response: {response}")

        await asyncio.sleep(0.2)


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stderr)
    asyncio.run(main(StreamMachineProperties.from_args()))
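Both Python examples call `start_timers()` so that the client keeps re-authorizing with fresh JWT tokens. How the driver schedules this is not documented on this page. Assuming the access token is a standard JWT carrying an `exp` claim, a scheduler could compute when to refresh as sketched below. `jwt_expiry` and `seconds_until_refresh` are hypothetical helpers, and the 60-second safety margin is an arbitrary choice.

```python
import base64
import json
import time
from typing import Optional


def jwt_expiry(token: str) -> int:
    """Return the `exp` claim (seconds since the epoch) of a JWT.

    The signature is NOT verified; this only reads the payload segment."""
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # restore the stripped base64url padding
    claims = json.loads(base64.urlsafe_b64decode(payload))
    return int(claims["exp"])


def seconds_until_refresh(token: str, margin_s: int = 60, now: Optional[float] = None) -> float:
    """Seconds to wait before refreshing, leaving `margin_s` seconds of slack
    before the token expires. Never negative: an almost-expired token refreshes now."""
    current = time.time() if now is None else now
    return max(0.0, jwt_expiry(token) - margin_s - current)
```

A timer would then sleep for `seconds_until_refresh(token)` and call the refresh endpoint before the token expires.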

NodeJS (dependency: NodeJS driver)

This example is also available on GitHub. Please see the repository for the readme.

import {Type} from "avsc";
import * as fs from "fs";
import {ClientStreamEvent, Sender} from "@streammachine.io/nodejs-driver";

const CONFIG = require("../assets/config.json");

// Copy the template credentials file, name it `credentials.json` and fill out the values
const CREDENTIALS = require("../assets/credentials.json");

// Hard coded schema for this example
const SCHEMA = JSON.parse(fs.readFileSync("assets/clickstream.avsc", "utf-8"));

async function startSender() {
    // Note: the schema id is hard coded in this example, in the config.json. This will be dynamically determined in a future version
    const sender = new Sender({
        ...CONFIG,
        ...CREDENTIALS,
        type: Type.forSchema(SCHEMA),
    });

    // Always register an "error" listener: an EventEmitter that emits "error" without
    // a listener throws the error, which crashes the process
    sender.on("error", (error) => {
        console.error("Sender", error);
    });

    try {
        await sender.connect();
    } catch (e) {
        // Without a connection there is no point in starting the send loop
        console.error(`Connect error ${e}`);
        return;
    }

    setInterval(async function () {
        try {
            const r = await sender.send(EVENT);

            if (r.status !== 204) {
                console.debug("An error occurred while sending the event:", r);
            }
        } catch (e) {
            console.error(`Error: ${e}`);
        }
    }, 100);
}

// This interface is only required when the schema is loaded from a file, as in this example. A future example will use classes generated from the schema, which simplifies this example even more.
interface MyStreamEvent extends ClientStreamEvent {
    abTests: string[];
    eventType: string;
    customer: { id: string };
    referrer: string;
    userAgent: string;
    producerSessionId: string;
    conversion: number;
    url: string;
}

const EVENT: MyStreamEvent = {
    abTests: ["abc"],
    eventType: "button x clicked",
    customer: {id: "customer-id"},
    referrer: "https://www.streammachine.io",
    userAgent: "node-js",
    producerSessionId: "prodsesid",
    conversion: 1,
    url: "https://portal.streammachine.io/",
    strmMeta: {
        // the other fields are filled in by the Stream Machine Client
        consentLevels: [0, 1, 2],
    },
};

startSender();
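The `MyStreamEvent` interface only describes the shape of the event; the Avro schema in `assets/clickstream.avsc` remains the source of truth. For comparison, the same shape expressed as Python dataclasses might look like the sketch below. This is an illustration only: `ClickstreamEventSketch`, `Customer` and `StrmMeta` here are hypothetical classes, not the generated classes from the Stream Machine schema packages, and the result is plain JSON rather than Avro.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class Customer:
    id: str


@dataclass
class StrmMeta:
    # Only consentLevels is set by the caller; per the NodeJS example,
    # the client fills in the remaining strmMeta fields
    consentLevels: List[int] = field(default_factory=list)


@dataclass
class ClickstreamEventSketch:
    """Hypothetical Python mirror of the MyStreamEvent interface.

    Field names keep the schema's camelCase so the JSON keys match."""
    abTests: List[str]
    eventType: str
    customer: Customer
    referrer: str
    userAgent: str
    producerSessionId: str
    conversion: int
    url: str
    strmMeta: StrmMeta = field(default_factory=StrmMeta)

    def to_json(self) -> str:
        # asdict() recurses into the nested Customer and StrmMeta dataclasses
        return json.dumps(asdict(self))
```

Declaring the fields explicitly gives the same benefit as the TypeScript interface: a missing or misspelled field fails at construction time instead of at the gateway.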

Receiving data

The applications below show how to receive that data through the Egress, which delivers events over a WebSocket connection.

The receiver is available in Java, Python and NodeJS.

Java (dependencies: Java driver, clickstream schema)

This example is also available on GitHub. Please see the repository for the readme.

package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.common.WebSocketConsumer;
import io.streammachine.driver.domain.Config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.websocket.api.Session;

@Slf4j
public class Receiver {
    public static void main(String[] args) {
        new Receiver().run(args);
    }

    private void run(String[] args) {
        if (args.length != 3) {
            System.out.println("Ensure that you've provided all required input arguments: [billingId, clientId, clientSecret]");
            System.exit(1);
        }

        var billingId = args[0];
        var clientId = args[1];
        var clientSecret = args[2];

        var config = Config.builder().build();
        StreamMachineClient client = StreamMachineClient.builder()
                .billingId(billingId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .config(config)
                .build();

        ContentResponse isAlive = client.egressIsAlive();

        log.debug("{}: {}", isAlive.getStatus(), isAlive.getReason());

        client.startReceivingWs(true, new StreamMachineEventWsListener());
    }

    private static class StreamMachineEventWsListener extends WebSocketConsumer {
        @Override
        public void onWebSocketClose(int statusCode, String reason) {
            log.info("Closed websocket connection...");
            log.info("Code: {}", statusCode);
            log.info("Reason: {}", reason);
        }

        @Override
        public void onWebSocketConnect(Session sess) {
            log.info("Opened websocket connection...");
        }

        @Override
        public void onWebSocketError(Throwable cause) {
            log.error("An error occurred", cause);
        }

        @Override
        public void onWebSocketText(String message) {
            System.out.println(message);
        }
    }
}

Python (dependencies: Stream Machine driver, Stream Machine clickstream Avro schemas)

This example is also available on GitHub. Please see the repository for the readme.

import asyncio
import json
import logging
import sys

from streammachine.driver import StreamMachineClient, ClientConfig

from args import StreamMachineProperties


async def event_handler(event):
    """
    Callback handler for the events received from Stream Machine.

    :param event: either a JSON event or a base64-encoded Avro binary, depending on the
    first parameter of StreamMachineClient.start_receiving_ws. This handler assumes JSON,
    which is what main() requests by passing True.

    Decoding an Avro binary is not yet supported.
    """
    print(json.loads(event))


async def main(props):
    """
    Your async main code that instantiates the client, starts its re-authorization timer, and installs a callback
    """
    config = ClientConfig(log_level=logging.DEBUG)
    client = StreamMachineClient(props.billing_id, props.client_id, props.client_secret, config)
    await client.start_timers()
    await client.start_receiving_ws(True, event_handler)


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stderr)
    asyncio.run(main(StreamMachineProperties.from_args()))
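According to the handler's docstring, the first argument of `start_receiving_ws` decides whether events arrive as JSON or as base64-encoded Avro binaries, and the driver does not decode Avro itself. A handler that supports both modes could delegate to a helper like the one below. `decode_egress_event` is a hypothetical name, `as_json` mirrors the `True` passed in the example, and the sketch assumes the standard base64 alphabet (not the URL-safe variant).

```python
import base64
import json
from typing import Union


def decode_egress_event(raw: str, as_json: bool) -> Union[dict, bytes]:
    """Decode one message received from the Egress.

    With as_json=True the message is parsed as a JSON object. Otherwise it is
    treated as a base64-encoded Avro binary and returned as raw bytes, which
    still need an Avro decoder and the writer schema to be read."""
    if as_json:
        return json.loads(raw)
    return base64.b64decode(raw)
```

Inside `event_handler`, `print(decode_egress_event(event, True))` then behaves like the original `print(json.loads(event))`.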

NodeJS (dependency: NodeJS driver)

This example is also available on GitHub. Please see the repository for the readme.

import {Receiver} from "@streammachine.io/nodejs-driver";

const CONFIG = require("../assets/config.json");
const CREDENTIALS = require("../assets/credentials.json");

async function startReceiver() {
    const receiver = new Receiver({
        ...CREDENTIALS,
        ...CONFIG
    });

    receiver.on("event", (event) => {
        console.log(JSON.stringify(event));
    });

    receiver.on("error", (error) => {
        console.error("Receiver", error);
    });

    try {
        await receiver.connect();
    } catch (error) {
        console.error("Connect failed", error);
    }
}

startReceiver();

Configuration properties

The default configuration properties are listed below. If necessary, or if you are asked to do so, you can override them in the configuration of your Stream Machine client.

Property                          Value
Gateway Scheme                    https
Gateway Port                      443
Gateway Host                      in.strm.services
Gateway Endpoint                  /event
Egress Scheme                     https
Egress Host                       out.strm.services
Egress Endpoint                   /ws
Authentication Scheme             https
Authentication Host               auth.strm.services
Authentication Endpoint           /auth
Authentication Refresh Endpoint   /refresh
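To see how these properties fit together, the sketch below combines them into full URLs. It assumes the drivers simply concatenate scheme, host, optional port and endpoint; that is not confirmed here. The table lists `https` as the Egress scheme, and a WebSocket client would typically connect to the `wss://` equivalent, but whether the drivers rewrite the scheme is not documented. `StrmEndpoints` and its methods are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StrmEndpoints:
    """Hypothetical holder for the default configuration properties listed above."""
    gateway_scheme: str = "https"
    gateway_host: str = "in.strm.services"
    gateway_port: int = 443
    gateway_endpoint: str = "/event"
    egress_scheme: str = "https"
    egress_host: str = "out.strm.services"
    egress_endpoint: str = "/ws"
    auth_scheme: str = "https"
    auth_host: str = "auth.strm.services"
    auth_endpoint: str = "/auth"
    auth_refresh_endpoint: str = "/refresh"

    def gateway_url(self) -> str:
        # Only the gateway has an explicit port property
        return f"{self.gateway_scheme}://{self.gateway_host}:{self.gateway_port}{self.gateway_endpoint}"

    def egress_url(self) -> str:
        return f"{self.egress_scheme}://{self.egress_host}{self.egress_endpoint}"

    def auth_url(self) -> str:
        return f"{self.auth_scheme}://{self.auth_host}{self.auth_endpoint}"

    def refresh_url(self) -> str:
        return f"{self.auth_scheme}://{self.auth_host}{self.auth_refresh_endpoint}"
```

Overriding a property means passing only the values that differ, for example `StrmEndpoints(gateway_host="localhost", gateway_port=8080)`; every other field keeps its default.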