Complete examples

This section helps you to set up a stream and start sending data to Stream Machine.

Setting up a stream

This section assumes that you have created an account on the portal.

Using the programming language examples.

In order to run these examples, you need the following:

  • An input stream to send data to (if you don’t know how, go here to learn how to create streams)

  • The credentials for this stream (presented upon stream creation). Either keep note of the values returned by the strm create stream command, or use the --save flag to store them in the ~/.config/stream-machine/saved-entities/Stream directory.

The following demo applications show how dummy data can be sent at a fixed frequency. The data that is sent is quite static and does not result in any useful patterns for analysis; however, it does show how data can be constructed and transferred to Stream Machine.

Currently (Aug. 2021) every example language has a different configuration file format. This is inconvenient and will be fixed. We aim to standardize this to the format created with strm create stream (stream-name) --save, so that getting up-and-running becomes easier.
  • Java

  • Python

  • NodeJS

java driver demo avro

This example is also available on GitHub. Please see the repository for the readme.

Short steps to start sending data:

git clone https://github.com/streammachineio/java-examples
cd java-examples
strm create stream demo --save
f=~/.config/stream-machine/saved-entities/Stream/demo.json
billingId=$(jq -r '.ref.billingId' "$f")
clientId=$(jq -r '.credentials[0].clientId' "$f")
clientSecret=$(jq -r '.credentials[0].clientSecret' "$f")
mvn package
java -jar target/java-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  "$billingId" "$clientId" "$clientSecret"
org.eclipse.jetty.util.log                  - Logging initialized ...
io.streammachine.driver.client.AuthService  - Initializing a new Auth Provider
io.streammachine.examples.Sender            - 204
io.streammachine.examples.Sender            - 204
io.streammachine.examples.Sender            - 204

...
Sender.java
package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.serializer.SerializationType;
import io.streammachine.schemas.demo.v1.DemoEvent;
import io.streammachine.schemas.demo.v1.StrmMeta;
import org.slf4j.Logger;

import java.util.Random;
import java.util.UUID;

import static java.util.Collections.singletonList;
import static org.slf4j.LoggerFactory.getLogger;

public class Sender {

    private static final Logger LOG = getLogger(Sender.class);

    /** Source of the random consent level assigned to each generated event. */
    private static final Random RANDOM = new Random();

    /**
     * Starts the demo sender.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     * @throws InterruptedException if the thread is interrupted while pausing between events
     */
    public static void main(String[] args) throws InterruptedException {
        new Sender().run(args);
    }

    /**
     * Generate a DemoEvent from a Java class that corresponds with the streammachine/demo/1.0.0 schema.
     * These Java classes are generated and provided by Stream Machine, based on the
     * serialization schema.
     * <p>
     * Every event carries a single random consent level in the range 0-3 and a fresh random UUID
     * as its unique identifier; all other fields are fixed demo values.
     *
     * @return a {@link io.streammachine.schemas.StreamMachineEvent}
     */
    private static DemoEvent createAvroEvent() {
        int consentLevel = RANDOM.nextInt(4);

        return DemoEvent.newBuilder()
                .setStrmMeta(StrmMeta.newBuilder()
                        .setEventContractRef("streammachine/example/1.2.3")
                        .setConsentLevels(singletonList(consentLevel))
                        .build())
                .setUniqueIdentifier(UUID.randomUUID().toString())
                .setSomeSensitiveValue("A value that should be encrypted")
                .setConsistentValue("a-user-session")
                .setNotSensitiveValue("Hello from Java")
                .build();
    }

    /**
     * Start sending hardcoded avro events: one event roughly every 500 ms, forever.
     * The outcome of each send is logged from the completion callback.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     * @throws InterruptedException if the thread is interrupted while pausing between events
     */
    private void run(String[] args) throws InterruptedException {
        StreamMachineClient client = ClientBuilder.createStreamMachineClient(args);

        while (true) {
            var event = createAvroEvent();

            client.send(event, SerializationType.AVRO_BINARY)
                  .whenComplete((response, exception) -> {
                      if (exception != null) {
                          LOG.error("An exception occurred while trying to send an event to Stream Machine", exception);
                          // A failed send has no response (it is null here), so there is
                          // no status to inspect; stop before dereferencing it.
                          return;
                      }

                      if (response.getStatus() == 204) {
                          LOG.debug("{}", response.getStatus());
                      } else if (response.getStatus() == 400) {
                          // To see this branch, change a field value in the createAvroEvent method above
                          // to something that violates the schema. The Stream Machine gateway then rejects
                          // the message, stating that the field does not match the regex
                          // provided in resources/schema/avro/strm.json
                          // NOTE(review): the original hint referred to a "url" field, but createAvroEvent
                          // sets no such field; confirm which field carries the regex.
                          LOG.debug("Bad request: {}", response.getContentAsString());
                      }
                  });

            // Pause between events so the demo sends at a steady, modest rate.
            Thread.sleep(500);
        }
    }
}

streammachine driver streammachine schemas clickstream avro

This example is also available on GitHub. Please see the repository for the readme.

Short steps to start sending data

git clone https://github.com/streammachineio/python-examples
cd python-examples
python3 -m venv .venv
. .venv/bin/activate
python3 -m pip install -r requirements.txt
strm create stream demo --save
f=~/.config/stream-machine/saved-entities/Stream/demo.json
billingId=$(jq -r '.ref.billingId' "$f")
clientId=$(jq -r '.credentials[0].clientId' "$f")
clientSecret=$(jq -r '.credentials[0].clientSecret' "$f")
python3 examples/sender_async.py --billing-id "$billingId" \
  --client-id "$clientId" --client-secret "$clientSecret"
DEBUG:streammachine.driver.client.auth:Initializing a new Auth Provider for SenderService
DEBUG:streammachine.driver.client.auth:authenticate
INFO:__main__:Event sent, response 204
INFO:__main__:Event sent, response 204
INFO:__main__:Event sent, response 204
...
sender_async.py
import asyncio
import logging
import sys
import random
import uuid

from streammachine.driver import SerializationType
from streammachine_io_streammachine_schemas_demo_v1.io.streammachine.schemas.demo.v1 import DemoEvent

from client_builder import ClientBuilder

# Module-level logger used by main() to report the outcome of each send.
log = logging.getLogger(__name__)
# Emit INFO and above, which includes the per-event "Event sent" messages logged by main().
log.setLevel(logging.INFO)


class Sender:
    """
    Asynchronous iterator around a Stream Machine client.

    Each iteration step builds one demo event, sends it with Avro binary
    serialization and yields whatever the client's ``send`` call returns.
    The iterator never signals exhaustion by itself.
    """

    def __init__(self):
        """Create the underlying client through ``ClientBuilder``."""
        self._client = ClientBuilder.create_stream_machine_client()

    def __aiter__(self):
        """Return this object: it is its own asynchronous iterator."""
        return self

    async def __anext__(self):
        """Create a fresh event, send it and return the client's response."""
        response = await self._client.send(create_avro_event(), SerializationType.AVRO_BINARY)
        return response

    async def start_timers(self):
        """Delegate to the client's ``start_timers`` coroutine."""
        # NOTE(review): per the caller's comment this handles JWT re-authorization; confirm in the driver.
        await self._client.start_timers()


def create_avro_event():
    """
    Build one ``DemoEvent`` filled with demo values.

    The event gets a single random consent level between 0 and 3 (inclusive)
    and a newly generated UUID string as unique identifier; every other field
    is a fixed demo value.

    :return: the populated ``DemoEvent``
    """
    demo_event = DemoEvent()

    # Metadata: the event contract reference plus one random consent level.
    consent_level = random.randint(0, 3)
    demo_event.strmMeta.eventContractRef = "streammachine/example/1.3.0"
    demo_event.strmMeta.consentLevels = [consent_level]

    # Payload fields.
    unique_id = str(uuid.uuid4())
    demo_event.uniqueIdentifier = unique_id
    demo_event.someSensitiveValue = "A value that should be encrypted"
    demo_event.consistentValue = "a-user-session"
    demo_event.notSensitiveValue = "Hello from Python"

    return demo_event


async def main():
    """
    Send demo events in an endless loop and log the outcome of each send.

    A response equal to 204 is logged at INFO level, anything else at ERROR
    level. The loop pauses 0.2 seconds after every event.
    """
    event_sender = Sender()
    # Must be started before sending: re-authorization jwt tokens
    await event_sender.start_timers()

    async for response in event_sender:
        accepted = response == 204  # event correctly accepted by endpoint
        if not accepted:
            log.error(f"Something went wrong while trying to send event to Stream Machine, response: {response}")
        else:
            log.info(f"Event sent, response {response}")

        await asyncio.sleep(0.2)


# Script entry point: runs only when this file is executed directly, not when it is imported.
if __name__ == '__main__':
    # Send log records to stderr.
    logging.basicConfig(stream=sys.stderr)
    # Run the sender coroutine on a new event loop.
    asyncio.run(main())

nodejs driver schemas demo avro

This example is also available on GitHub. Please see the repository for the readme.

Quick steps getting started:

git clone https://github.com/streammachineio/nodejs-examples
cd nodejs-examples
strm create stream demo --save
jq '{billingId:.ref.billingId,
      clientId:.credentials[0].clientId,
      clientSecret:.credentials[0].clientSecret}' \
      ~/.config/stream-machine/saved-entities/Stream/demo.json \
      > assets/credentials.json
npm i
npm run sender
> nodejs-driver-example@1.0.0 sender
> ts-node ./src/sender.ts

Status 204
Status 204
...
sender.ts
import { Sender } from '@streammachine.io/nodejs-driver';
import { DemoEvent } from '@streammachine.io/schemas-demo-avro';

// Sender settings shipped with the example; these are spread into the Sender options in startSender.
const CONFIG = require('../assets/config.json');

// Copy the template credentials file, name it `credentials.json` and fill out the values
// These values are spread into the Sender options after CONFIG, so a key present in both files takes its value from here.
const CREDENTIALS = require('../assets/credentials.json');

/**
 * Creates a Sender from the config and credentials files, connects it and then
 * sends one demo event every 100 ms, logging the status of each response.
 */
const startSender = async () => {
  // Note: the schema id is hard coded in this example, in the config.json. This will be dynamically determined in a future version
  const sender = new Sender({ ...CONFIG, ...CREDENTIALS });

  // Make sure to listen for error events, otherwise Node does not handle the error events (they're escalated)
  sender.on('error', (error) => {
    console.log('Sender', error);
  });

  // A failed connect is only logged; the send loop below is started regardless.
  await sender.connect().catch((e) => {
    console.error(`Connect error ${e}`, e);
  });

  // Sends a single event and reports the outcome; errors are logged, never rethrown.
  const sendOnce = async () => {
    try {
      const r = await sender.send(createEvent(), 'AVRO_BINARY');

      console.log(`Status ${r.status}`);
      if (r.status !== 204) {
        console.error(`An error occurred while sending event:`, r);
      }
    } catch (e) {
      console.error(`Error: ${e.message}`, e);
    }
  };

  setInterval(sendOnce, 100);
};

/**
 * Builds one DemoEvent filled with fixed demo values.
 *
 * NOTE(review): uniqueIdentifier is the constant 'string' and the consent level is always 0,
 * so every event produced here is identical; confirm this is intended for the demo.
 */
const createEvent = () => {
  const strmMeta = {
    eventContractRef: 'streammachine/example/1.2.3',
    consentLevels: [0],
  };

  const demoEvent = new DemoEvent();
  demoEvent.strmMeta = strmMeta;

  demoEvent.uniqueIdentifier = 'string';
  demoEvent.someSensitiveValue = 'A value that should be encrypted';
  demoEvent.consistentValue = 'a-user-session';
  demoEvent.notSensitiveValue = 'Hello from NodeJS';

  return demoEvent;
};

// Start the example; the promise returned by startSender is not awaited here.
startSender();

Receiving data

See strm listen web-socket for a debugging view on the events.

See exporting to Kafka or batch exporters for production event consuming.