Complete examples

This page aims to provide full examples, from sending data to Stream Machine to receiving the data through the Egress.

These examples are meant for demonstration purposes and use the Egress interface to receive events. For production ready interfaces, we refer to the Batch and Kafka exporters.

In order to run these examples, you need the following:

  • An input stream to send data to (if you don’t know how, click here to learn how to create streams)

  • The credentials for this stream (presented upon creation)

  • The billing id for your account (you can find your billing id via the CLI or in the portal)

Please see the readme of the examples presented below on how to provide the credentials to the example code.

Sending data

The following demo application displays how dummy data can be sent with a certain frequency. The data that is sent is quite static and does not result in any useful patterns for analysis, however, it does show how data can be constructed and tranferred to Stream Machine.

  • Java

  • Python

  • NodeJS

java driver clickstream

This example is also available on GitHub. Please see the repository for the readme.

package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.serializer.SerializationType;
import io.streammachine.schemas.demo.v1.DemoEvent;
import io.streammachine.schemas.demo.v1.StrmMeta;
import org.slf4j.Logger;

import java.util.Random;
import java.util.UUID;

import static java.util.Collections.singletonList;
import static org.slf4j.LoggerFactory.getLogger;

public class Sender {

    private static final Logger LOG = getLogger(Sender.class);

    private static Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        new Sender().run(args);
    }

    /**
     * Generate a DemoEvent from a Java class that corresponds with a the streammachine/demo/1.0.0 schema.
     * These Java classes are generated and provided by Stream Machine, based on the
     * serialization schema.
     * <p>
     *
     * @return a {@link io.streammachine.schemas.StreamMachineEvent}
     */
    private static DemoEvent createAvroEvent() {
        int consentLevel = RANDOM.nextInt(4);

        return DemoEvent.newBuilder()
                .setStrmMeta(StrmMeta.newBuilder()
                        .setEventContractRef("streammachine/example/1.2.3")
                        .setConsentLevels(singletonList(consentLevel))
                        .build())
                .setUniqueIdentifier(UUID.randomUUID().toString())
                .setSomeSensitiveValue("A value that should be encrypted")
                .setConsistentValue("a-user-session")
                .setNotSensitiveValue("Hello from Java")
                .build();
    }

    /**
     * start sending hardcoded avro events.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     * @throws InterruptedException
     */
    private void run(String[] args) throws InterruptedException {
        StreamMachineClient client = ClientBuilder.createStreamMachineClient(args);

        while (true) {
            var event = createAvroEvent();

            client.send(event, SerializationType.AVRO_BINARY)
                  .whenComplete((response, exception) -> {
                      if (exception != null) {
                          LOG.error("An exception occurred while trying to send an event to Stream Machine", exception);
                      }

                      if (response.getStatus() == 204) {
                          LOG.debug("{}", response.getStatus());
                      } else if (response.getStatus() == 400) {
                          // Try to change the value for the url field in the createAvroEvent method below to something that is not a url
                          // You can see that the Stream Machine gateway rejects the
                          // message, stating that the field does not match the regex
                          // provided in resources/schema/avro/strm.json
                          LOG.debug("Bad request: {}", response.getContentAsString());
                      }
                  });

            Thread.sleep(500);
        }
    }
}

streammachine driver streammachine schemas clickstream avro

This example is also available on GitHub. Please see the repository for the readme.

404: Not Found

nodejs driver

This example is also available on GitHub. Please see the repository for the readme.

import { Sender } from '@streammachine.io/nodejs-driver';
import { DemoEvent } from '@streammachine.io/schemas-demo-avro';

const CONFIG = require('../assets/config.json');

// Copy the template credentials file, name it `credentials.json` and fill out the values
const CREDENTIALS = require('../assets/credentials.json');

const startSender = async () => {
  // Note: the schema id is hard coded in this example, in the config.json. This will be dynamically determined in a future version
  const sender = new Sender({
    ...CONFIG,
    ...CREDENTIALS,
  });

  // Make sure to listen for error events, otherwise Node does not handle the error events (they're escalated)
  sender.on('error', (error) => {
    console.log('Sender', error);
  });

  await sender.connect().catch((e) => {
    console.error(`Connect error ${e}`, e);
  });

  setInterval(async function () {
    try {
      const r = await sender.send(createEvent(), 'AVRO_BINARY');

      console.log(`Status ${r.status}`);
      if (r.status !== 204) {
        console.error(`An error occurred while sending event:`, r);
      }
    } catch (e) {
      console.error(`Error: ${e.message}`, e);
    }
  }, 100);
};

const createEvent = () => {
  const event = new DemoEvent();
  event.strmMeta = {
    eventContractRef: 'streammachine/example/1.2.3',
    consentLevels: [0],
  };

  event.uniqueIdentifier = 'string';
  event.someSensitiveValue = 'A value that should be encrypted';
  event.consistentValue = 'a-user-session';
  event.notSensitiveValue = 'Hello from NodeJS';

  return event;
};

startSender();

Receiving data

And the application shown below, displays how the data can be received through the Egress, which is transferred over Websocket.

  • Java

  • Python

  • NodeJS

java driver clickstream

This example is also available on GitHub. Please see the repository for the readme.

package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.common.WebSocketConsumer;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.websocket.api.Session;
import org.slf4j.Logger;

import static org.slf4j.LoggerFactory.getLogger;

public class Receiver {

    private static final Logger LOG = getLogger(Receiver.class);

    public static void main(String[] args) {
        new Receiver().run(args);
    }

    private void run(String[] args) {
        StreamMachineClient client = ClientBuilder.createStreamMachineClient(args);

        try {
            ContentResponse isAlive = client.egressIsAlive();

            LOG.debug("{}: {}", isAlive.getStatus(), isAlive.getReason());

            client.startReceivingWs(true, new StreamMachineEventWsListener());
        } catch (Exception e) {
            LOG.error("Exception checking isAlive: " + e.getMessage(), e);
            client.stop();
        }
    }

    private static class StreamMachineEventWsListener extends WebSocketConsumer {
        @Override
        public void onWebSocketClose(int statusCode, String reason) {
            LOG.info("Closed websocked connection...");
            LOG.info("Code: {}", statusCode);
            LOG.info("Reason: {}", reason);
        }

        @Override
        public void onWebSocketConnect(Session sess) {
            LOG.info("Opened websocket connection...");
        }

        @Override
        public void onWebSocketError(Throwable cause) {
            LOG.error("An error occurred", cause);
        }

        @Override
        public void onWebSocketText(String message) {
            System.out.println(message);
        }
    }
}

streammachine driver streammachine schemas clickstream avro

This example is also available on GitHub. Please see the repository for the readme.

import asyncio
import json
import logging
import sys

from args import StreamMachineProperties
from client_builder import ClientBuilder


async def event_handler(event):
    """
    callback handler for the events from Streammachine

    :param event: this is either json event, or a base64 encoded avro binary, depending on the
    first parameter of StreamMachineClient.start_receiving_sse.

    Decoding an Avro binary is not yet supported.
    """
    print(json.loads(event))


async def main(props):
    """
    Your async main code that instantiates the client, starts its re-authorization timer, and installs a callback
    """
    client = ClientBuilder.create_stream_machine_client()
    await client.start_timers()
    await client.start_receiving_ws(True, event_handler)


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stderr)
    asyncio.run(main(StreamMachineProperties.from_args()))

nodejs driver

This example is also available on GitHub. Please see the repository for the readme.

import { Receiver } from '@streammachine.io/nodejs-driver';

const CONFIG = require('../assets/config.json');
const CREDENTIALS = require('../assets/credentials.json');

async function startReceiver() {
  const receiver = new Receiver({
    ...CREDENTIALS,
    ...CONFIG,
  });

  receiver.on('event', (event) => {
    console.log(JSON.stringify(event));
  });

  receiver.on('error', (error) => {
    console.log('Error', error);
  });

  try {
    await receiver.connect();
  } catch (error) {
    console.log('Connect failed', error);
  }
}

startReceiver();

Configuration properties

The default configuration properties are shown below. If necessary or requested, you can provide the properties to your Stream Machine client.

Property Value

Gateway Scheme

https

Gateway Port

443

Gateway Host

in.strm.services

Gateway Endpoint

/event

Egress Scheme

https

Egress Host

out.strm.services

Egress Endpoint

/ws

Authentication Scheme

https

Authentication Host

auth.strm.services

Authentication Endpoint

/auth

Authentication Refresh Endpoint

/refresh