Complete examples

This page aims to provide full examples, from sending data to Stream Machine to receiving the data through the Egress.

These examples are meant for demonstration purposes and use the Egress interface to receive events. For production ready interfaces, we refer to the Batch and Kafka exporters.

In order to run these examples, you need the following:

  • An input stream to send data to (if you don’t know how, click here to learn how to create streams)

  • The credentials for this stream (presented upon creation)

  • The billing id for your account (you can find your billing id via the CLI or in the portal)

Please see the readme of the examples presented below on how to provide the credentials to the example code.

Sending data

The following demo application displays how dummy data can be sent with a certain frequency. The data that is sent is quite static and does not result in any useful patterns for analysis, however, it does show how data can be constructed and tranferred to Stream Machine.

  • Java

This example is also available on GitHub. Please see the repository for the readme.

package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.domain.Config;
import io.streammachine.driver.serializer.SerializationType;
import io.streammachine.public_schemas.clickstream.ClickstreamEvent;
import io.streammachine.public_schemas.clickstream.Customer;
import io.streammachine.public_schemas.clickstream.StrmMeta;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;

import static java.util.Collections.singletonList;

@Slf4j
public class Sender {
    private static Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        new Sender().run(args);
    }

    /**
     * start sending hardcoded avro events.
     *
     * @param args 3 parameters: [billingId, clientId, clientSecret]
     * @throws InterruptedException
     */
    private void run(String[] args) throws InterruptedException {
        if (args.length != 3) {
            System.out.println("Ensure that you've provided all required input arguments: [billingId, clientId, clientSecret]");
            System.exit(1);
        }

        var billingId = args[0];
        var clientId = args[1];
        var clientSecret = args[2];

        var config = Config.builder().build();
        StreamMachineClient client = StreamMachineClient.builder()
                .billingId(billingId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .config(config)
                .build();

        while (true) {
            var event = createAvroEvent();

            client.send(event, SerializationType.AVRO_BINARY)
                    .whenComplete((response, exception) -> {
                        if (exception != null) {
                            log.error("An exception occurred while trying to send an event to Stream Machine", exception);
                        }

                        if (response.getStatus() == 204) {
                            log.debug("{}", response.getStatus());
                        } else if (response.getStatus() == 400) {
                            // Try to change the value for the url field in the createAvroEvent method below to something that is not a url
                            // You can see that the Stream Machine gateway rejects the
                            // message, stating that the field does not match the regex
                            // provided in resources/schema/avro/strm.json
                            log.debug("Bad request: {}", response.getContentAsString());
                        }
                    });

            Thread.sleep(500);
        }
    }

    /**
     * Generate a ClickstreamEvent from a Java class that corresponds with a the clickstream schema.
     * These Java classes are generated and provided by Streammachine, based on the
     * schema definition and the serialization schema.
     * <p>
     * This particular event corresponds to an Avro schema that you can see in resources/schema/avro/schema.avsc and resources/schema/avro/strm.json
     *
     * @return a {@link io.streammachine.schemas.StreamMachineEvent}
     */
    private static ClickstreamEvent createAvroEvent() {
        int consentLevel = RANDOM.nextBoolean() ? 1 : 0;

        return ClickstreamEvent.newBuilder()
                .setAbTests(singletonList("abc"))
                .setEventType("button x clicked")
                .setCustomer(Customer.newBuilder()
                        .setId("some-identifier")
                        .build())
                .setReferrer("https://www.streammachine.io")
                .setUserAgent("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36")
                .setProducerSessionId("session-01")
                .setConversion(1)
                .setStrmMeta(StrmMeta.newBuilder()
                        .setTimestamp(System.currentTimeMillis())
                        .setSchemaId("clickstream")
                        .setNonce(0)
                        .setConsentLevels(singletonList(consentLevel))
                        .build())
                .setUrl("https://portal.streammachine.io")
                .build();
    }
}

Receiving data

And the application shown below, displays how the data can be received through the Egress, which is transferred over Websocket.

  • Java

This example is also available on GitHub. Please see the repository for the readme.

package io.streammachine.examples;

import io.streammachine.driver.client.StreamMachineClient;
import io.streammachine.driver.common.WebSocketConsumer;
import io.streammachine.driver.domain.Config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.websocket.api.Session;

@Slf4j
public class Receiver {
    public static void main(String[] args) {
        new Receiver().run(args);
    }

    private void run(String[] args) {
        if (args.length != 3) {
            System.out.println("Ensure that you've provided all required input arguments: [billingId, clientId, clientSecret]");
            System.exit(1);
        }

        var billingId = args[0];
        var clientId = args[1];
        var clientSecret = args[2];

        var config = Config.builder().build();
        StreamMachineClient client = StreamMachineClient.builder()
                .billingId(billingId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .config(config)
                .build();

        ContentResponse isAlive = client.egressIsAlive();

        log.debug("{}: {}", isAlive.getStatus(), isAlive.getReason());

        client.startReceivingWs(true, new StreamMachineEventWsListener());
    }

    private static class StreamMachineEventWsListener extends WebSocketConsumer {
        @Override
        public void onWebSocketClose(int statusCode, String reason) {
            log.info("Closed websocked connection...");
            log.info("Code: {}", statusCode);
            log.info("Reason: {}", reason);
        }

        @Override
        public void onWebSocketConnect(Session sess) {
            log.info("Opened websocket connection...");
        }

        @Override
        public void onWebSocketError(Throwable cause) {
            log.error("An error occurred", cause);
        }

        @Override
        public void onWebSocketText(String message) {
            System.out.println(message);
        }
    }
}

Configuration properties

The default configuration properties are shown below. If necessary or requested, you can provide the properties to your Stream Machine client.

Property Value

Gateway Scheme

https

Gateway Port

443

Gateway Host

in.strm.services

Gateway Endpoint

/event

Egress Scheme

https

Egress Host

out.strm.services

Egress Endpoint

/ws

Authentication Scheme

https

Authentication Host

auth.strm.services

Authentication Endpoint

/auth

Authentication Refresh Endpoint

/refresh