Gather insights from user events – part 1

Posted on 2016-10-03 by Jettro Coenradie

In this blog series I am going to explain our setup for a system that helps in getting insights based on user events. The solution generates events in the web application and in a mobile app. These events are sent to a backend to be cleaned, enriched and stored. With these events we can create dashboards that show what the visitors do on the website and which pages are visited. We calculate the number of visitors and the number of visits to a page within a certain time window. The following image gives an overview of this solution.

[Image: data-insights-figuur-1]

I start with the event generation in the website and the mobile app. Both run AngularJS; the mobile app is built using Ionic, which is based on AngularJS as well. The events are sent to a REST backend created as a Spring Boot application. The Spring Boot application stores the events in Kafka. Using the Kafka Streams API I’ll show you how to do some basic window-based calculations. Along the way I’ll explain some of the concepts or point you to other resources. In the next blogs I am going to talk about Elastic Logstash and Kibana as well as Apache Flink.

Before we dive into the details, here is a technical overview of the solution.

[Image: data-insights-figuur-2]

Sending events from AngularJS

The website that we want to send events from is written in AngularJS. Sending the event is extracted to a separate service. The send method accepts a name and an object containing the additional properties. Next to the name and the provided properties, we add a few properties in the service: createDate, origin and visitorId. When a user is known, the username is added as well. The origin is website, to distinguish these events from the ones sent by the mobile app. The visitorId is obtained from the $rootScope, as you can see in the following code block.

angular.module('services')
    .factory('UserEvent', ['$http', 'USER_EVENT_URL', 'User', '$rootScope', function ($http, USER_EVENT_URL, User, $rootScope) {
        return {
            send: function (eventName, additionalParams) {
                // Copy the caller's properties so the object passed in is not modified.
                var message = angular.extend({}, additionalParams);

                // Properties that every event gets.
                message.name = eventName;
                message.createDate = new Date();
                message.origin = "website";
                message.visitorId = $rootScope.visitorId;

                // Add the username when a user is known.
                var user = User.get();
                if (user) {
                    message.username = user.username;
                }
                $http.post(USER_EVENT_URL + "/event", message);
            }
        };
    }]);

The visitorId is obtained from a cookie or generated and stored in a cookie when the app is starting up in the browser. This is shown in the following code block.

    .run(['$rootScope', 'User', '$location', 'OAuth2', '$cookies', function ($rootScope, User, $location, OAuth2, $cookies) {

        // Reuse the visitor id from the cookie when this browser has visited before.
        var visitorId = $cookies.get('visitor_id');

        if (!visitorId) {
            // First visit: generate an id and keep it in a cookie for a year.
            visitorId = generateGuid();
            var expireDate = new Date();
            expireDate.setDate(expireDate.getDate() + 365);
            $cookies.put("visitor_id", visitorId, {"expires": expireDate});
        }

        $rootScope.visitorId = visitorId;

        // Builds a random id of 32 upper-case hex digits grouped as 8-4-4-4-12.
        function generateGuid() {
            var result, i, j;
            result = '';
            for (j = 0; j < 32; j++) {
                if (j === 8 || j === 12 || j === 16 || j === 20) {
                    result = result + '-';
                }
                i = Math.floor(Math.random() * 16).toString(16).toUpperCase();
                result = result + i;
            }
            return result;
        }
    }]);

Now we can inject the UserEvent service into any controller, which makes it easy to send a user event to the backend. In the next section we create the backend using Spring Boot.

Creating the Spring Boot endpoint

The Spring Boot app is a very basic web application with one endpoint. The application is generated from IntelliJ using the integrated website: http://start.spring.io. The basis is the spring-boot-starter-web project. We added code to enrich the event with information about the user agent, the remote host and the content length. Besides that, we created a structure with a forwarder that decouples the REST endpoint from the mechanism that stores the incoming events. In our case we store the events in a file as well as in Kafka. The next code block shows the main code for this endpoint. In the next sections we move on to installing Kafka and sending messages to it.

@RequestMapping(path = "/event", method = RequestMethod.POST, consumes = APPLICATION_JSON_VALUE)
public ResponseEntity<String> storeEvent(@RequestBody String eventString, HttpServletRequest request) {
    if (eventString.isEmpty()) {
        return new ResponseEntity<>(BAD_REQUEST);
    }
    log.info("Received event: " + eventString);
    eventParser.parseRawEvent(eventString)
            .map(event -> eventEnricher.enrich(event, request))
            .ifPresent(eventForwarder::forwardEvent);

    return new ResponseEntity<>(OK);
}
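
The forwarder structure itself is not shown in this post. As a minimal sketch of how such a decoupling could look, the endpoint only needs to know one forwarder, and that forwarder fans the event out to the file and Kafka implementations. All names below (EventForwarder, CompositeEventForwarder, RecordingForwarder, EventEnricherSketch) are hypothetical, events are simplified to plain maps, and the request details are passed in as plain values instead of an HttpServletRequest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical names throughout; the original classes are not shown in the post.
interface EventForwarder {
    void forwardEvent(Map<String, Object> event);
}

// Fans one event out to every registered forwarder (for example file and Kafka).
class CompositeEventForwarder implements EventForwarder {
    private final List<EventForwarder> delegates;

    CompositeEventForwarder(List<? extends EventForwarder> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    public void forwardEvent(Map<String, Object> event) {
        for (EventForwarder delegate : delegates) {
            delegate.forwardEvent(event);
        }
    }
}

// Stand-in for a real file or Kafka forwarder: it only remembers what it received.
class RecordingForwarder implements EventForwarder {
    final List<Map<String, Object>> received = new ArrayList<>();

    @Override
    public void forwardEvent(Map<String, Object> event) {
        received.add(event);
    }
}

class EventEnricherSketch {
    // Returns a copy of the event with request metadata added; the input is left untouched.
    static Map<String, Object> enrich(Map<String, Object> event, String userAgent,
                                      String remoteHost, int contentLength) {
        Map<String, Object> enriched = new LinkedHashMap<>(event);
        enriched.put("userAgent", userAgent);
        enriched.put("remoteHost", remoteHost);
        enriched.put("contentLength", contentLength);
        return enriched;
    }
}

class ForwarderSketch {
    public static void main(String[] args) {
        RecordingForwarder file = new RecordingForwarder();
        RecordingForwarder kafka = new RecordingForwarder();
        EventForwarder forwarder = new CompositeEventForwarder(Arrays.asList(file, kafka));

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("name", "view_job");
        event.put("origin", "website");

        // Example values only; a real endpoint would read these from the HTTP request.
        forwarder.forwardEvent(EventEnricherSketch.enrich(event, "ExampleAgent/1.0", "127.0.0.1", 42));

        System.out.println("file:  " + file.received);
        System.out.println("kafka: " + kafka.received);
    }
}
```

With this shape, adding or removing a storage mechanism only changes the list passed to the composite forwarder; the REST endpoint stays the same.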

Installing Kafka

Installing Kafka is not hard. You do have to run Zookeeper, but everything is documented well. If you need guidance, check the quickstart on the Kafka website. Since our Kafka is running on an open network, we wanted to send all messages using SSL. The documentation is clear about generating certificates, so if you have no experience with truststores and keystores, you can find the necessary steps in the security section of the Kafka documentation. The next code block shows the config changes we had to make in server.properties to get the connection with Kafka running. We used a setup in which all clients also need to have a valid and signed certificate. This way the client as well as the server checks for valid certificates.

listeners=SSL://<IP-ADDRESS>:9092
advertised.listeners=SSL://<IP-ADDRESS>:9092
host.name=<IP-ADDRESS>
advertised.host.name=<IP-ADDRESS>
advertised.port=9092
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/private/ssl/kafka/server.keystore.jks
ssl.keystore.password=<PASSWORD>
ssl.key.password=<PASSWORD>
ssl.truststore.location=/var/private/ssl/kafka/server.truststore.jks
ssl.truststore.password=<PASSWORD>
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
delete.topic.enable=true

Beware that some of these properties are now deprecated (host.name, advertised.host.name and advertised.port have been superseded by listeners and advertised.listeners), but for some reason this is the combination that worked for me. This is the most important part of the installation. Besides using server-side as well as client-side certificates, we also configured a firewall so that certain ports are only accessible from specific IP addresses. Let us move on to the Java producer that writes the messages to the Kafka topic.

Sending messages to a Topic

We have chosen to use the new Java-based client. To use this client, you have to add the following dependency to the pom.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>

To connect to the Kafka cluster for sending events, you need the broker list, a topic name, and, since we are using SSL, the locations and passwords of the keystore and the truststore. In the method that is called after bean initialisation, we initialise the Kafka connection. The following code block shows this method.

@PostConstruct
public void initIt() {
    Properties kafkaProps = new Properties();

    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    kafkaProps.put("ssl.truststore.location", trustStoreLocation);
    kafkaProps.put("ssl.truststore.password", trustStorePassword);
    kafkaProps.put("ssl.keystore.location", keyStoreLocation);
    kafkaProps.put("ssl.keystore.password", keyStorePassword);
    kafkaProps.put("ssl.key.password", keyPassword);
    kafkaProps.put("security.protocol", "SSL");

    producer = new org.apache.kafka.clients.producer.KafkaProducer<>(kafkaProps);
}

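The next step is the actual message sending. In the next code block we show the method used to send a string, which in our case is a serialised JSON object. Calling get() on the future returned by send makes the call synchronous: the method blocks until the broker has acknowledged the record or the send has failed.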

public void send(String value) throws ExecutionException,
        InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
    producer.send(record).get();
}

That is it, now you can start reading events from the Kafka topic. An easy way to try it out is to run a command line consumer that logs all passing events to the console. The next code block shows the command to do this. After that we move on to creating a consumer that uses the Streams API from Kafka.

bin/kafka-console-consumer.sh \
    --bootstrap-server ip.address:9092 \
    --topic YourTopic \
    --new-consumer \
    --consumer.config ../client.properties

Using the Kafka Streams API

When programming against streams you can use frameworks like Spark or Flink. Kafka, however, comes with a Streams API that has a low barrier to entry. It is easy to embed in your Java application and offers the most common tools that you need, one of them being time-based windows. In the example we create time windows of a minute and count, per window, the number of events per visitor and the number of occurrences of each event (for example page visits). To create a stream consumer you need to add the following dependency.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.1</version>
</dependency>

Just like with the producer we need to configure the connection to Kafka. In this example I extracted this config into a separate class. We need a few more properties than before.

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sample-1");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

streamsConfiguration.put("ssl.truststore.location", trustStoreLocation);
streamsConfiguration.put("ssl.truststore.password", trustStorePassword);
streamsConfiguration.put("ssl.keystore.location", keyStoreLocation);
streamsConfiguration.put("ssl.keystore.password", keyStorePassword);
streamsConfiguration.put("ssl.key.password", keyPassword);
streamsConfiguration.put("security.protocol", "SSL");
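
The separate configuration class mentioned above is not shown in the post. The following is only a sketch of what such a class might look like, using the plain string keys behind the config constants so that it compiles without the Kafka libraries on the classpath. The class name KafkaSslProperties, the constructor arguments and the example paths and passwords are assumptions for illustration.

```java
import java.util.Properties;

// Hypothetical helper; the post's own configuration class is not shown.
class KafkaSslProperties {
    private final String bootstrapServers;
    private final String trustStoreLocation;
    private final String trustStorePassword;
    private final String keyStoreLocation;
    private final String keyStorePassword;
    private final String keyPassword;

    KafkaSslProperties(String bootstrapServers,
                       String trustStoreLocation, String trustStorePassword,
                       String keyStoreLocation, String keyStorePassword,
                       String keyPassword) {
        this.bootstrapServers = bootstrapServers;
        this.trustStoreLocation = trustStoreLocation;
        this.trustStorePassword = trustStorePassword;
        this.keyStoreLocation = keyStoreLocation;
        this.keyStorePassword = keyStorePassword;
        this.keyPassword = keyPassword;
    }

    // Returns a fresh Properties object on every call, so each caller can add
    // its own settings (such as application.id) without affecting the others.
    Properties createProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", trustStoreLocation);
        props.put("ssl.truststore.password", trustStorePassword);
        props.put("ssl.keystore.location", keyStoreLocation);
        props.put("ssl.keystore.password", keyStorePassword);
        props.put("ssl.key.password", keyPassword);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own broker and store settings.
        KafkaSslProperties kafkaProps = new KafkaSslProperties(
                "broker:9092",
                "/path/to/client.truststore.jks", "truststore-password",
                "/path/to/client.keystore.jks", "keystore-password",
                "key-password");

        Properties first = kafkaProps.createProperties();
        first.put("application.id", "stream-sample-1");

        Properties second = kafkaProps.createProperties();
        second.put("application.id", "stream-sample-counts");

        System.out.println(first.getProperty("application.id"));
        System.out.println(second.getProperty("application.id"));
    }
}
```

Returning a new Properties object per call matters here because the two streams applications in this post each set their own application id on top of the shared connection settings.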

The code for the Serde and the JSON serialisation comes from the original Kafka Streams samples. The following code block shows the initialisation of the streams consumer.

@PostConstruct
public void init() {
    Properties streamsConfiguration = kafkaProps.createProperties();

    Map<String, Object> serdeProps = new HashMap<>();

    final Serializer<UserEvent> userEventSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", UserEvent.class);
    userEventSerializer.configure(serdeProps, false);

    final Deserializer<UserEvent> userEventDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", UserEvent.class);
    userEventDeserializer.configure(serdeProps, false);

    final Serde<UserEvent> userEventSerde = Serdes.serdeFrom(userEventSerializer, userEventDeserializer);

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, UserEvent> textLines = builder.stream(stringSerde, userEventSerde, "inkoopjobs");

    createStreamVisitor(textLines);
    createStreamEvent(textLines);

    streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();
}

In this sample the UserEvent class is a basic POJO with just two properties: name and visitorId. In the methods createStreamVisitor and createStreamEvent we consume the events from the Kafka topic. The following code block shows one of them, since the other one is almost a copy.

    private void createStreamVisitor(KStream<String, UserEvent> textLines) {
        KStream<String, Long> wordCounts = textLines
                .map((s, userEvent) -> KeyValue.pair(userEvent.getVisitorId(),userEvent.getName()))
                .through("RekeyedIntermediateTopic2")
                .countByKey(TimeWindows.of("minute-window-visitor", 60*1000), stringSerde)
                .toStream((key, value) -> String.format("%d-%s", key.window().start(), key.key()));
        wordCounts.to(stringSerde, longSerde, "CountsPerVisitor");
    }

In line 3 (the map call) we create the key/value pair. In this case the key is the visitorId. Line 4 (through) writes the re-keyed records to an intermediate topic, so that all records with the same key are in the same partition before they are counted. Line 5 (countByKey) then counts, for each 60-second window, the number of events that occur per visitor. The counts are sent to the topic CountsPerVisitor. We then use a second streams application to read that topic, which again needs an init method to start listening. This is shown in the next code block. In line 3 we use the same connection properties as before. Then we stream the results by subscribing to the CountsPerVisitor topic. Finally, for each entry we print the counts per visitor and per type of event, using the two different topics.
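
To make the windowing concrete, here is a small plain-Java sketch of the idea, without Kafka: every timestamp is mapped to the start of its 60-second window, and events are counted per window and key. It is an illustration of the concept only, not how Kafka Streams implements it; the class WindowCountSketch and the keys visitor-a and visitor-b are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: counts events per one-minute tumbling window and key.
class WindowCountSketch {
    static final long WINDOW_SIZE_MS = 60 * 1000L;

    // Start of the tumbling window that contains the (non-negative) timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Same key layout as the toStream call in the post: "<window start>-<key>".
    static String windowedKey(long timestampMs, String key) {
        return String.format("%d-%s", windowStart(timestampMs), key);
    }

    // Counts events per window and key; timestamps[i] belongs to keys[i].
    static Map<String, Long> count(long[] timestamps, String[] keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            counts.merge(windowedKey(timestamps[i], keys[i]), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long base = 1475496600000L; // a window start taken from the sample output
        long[] timestamps = {base + 1_000, base + 20_000, base + 59_999, base + 60_000};
        String[] keys = {"visitor-a", "visitor-a", "visitor-b", "visitor-a"};

        count(timestamps, keys)
                .forEach((key, value) -> System.out.println("Count: " + key + "-" + value));
    }
}
```

The first three events fall into the window starting at 1475496600000, the last one into the next window. Note that this sketch prints one final total per window and key, whereas the streams application emits an updated count for every incoming event.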

    @PostConstruct
    public void init() {
        Properties streamsConfiguration = kafkaProps.createProperties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sample-counts");
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, Long> textLines = builder.stream(stringSerde, longSerde, "CountsPerVisitor");

        textLines.foreach((key, value) -> {
            System.out.println("Count: " + key + "-" + value);
        });

        KStream<String, Long> textLines2 = builder.stream(stringSerde, longSerde, "CountsPerEvent");

        textLines2.foreach((key, value) -> {
            System.out.println("Count: " + key + "-" + value);
        });

        streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
    }

The next step is of course to run both programs and see the stream of counts coming in. Below is a bit of output.

Count: 1475496480000-05442A1E-835C-34AB-7BF8-8E972FDD7A7D-1
Count: 1475496480000-view_job-1
Count: 1475496540000-obtain_news_item-1
Count: 1475496540000-05442A1E-835C-34AB-7BF8-8E972FDD7A7D-1
Count: 1475496600000-click_homepage_details_interim_jobs_header-1
Count: 1475496600000-view_job-1
Count: 1475496600000-6808F840-FDAC-CADA-EEE5-A885CD63694B-1
Count: 1475496600000-6808F840-FDAC-CADA-EEE5-A885CD63694B-2
Count: 1475496600000-execute_search-1
Count: 1475496600000-6808F840-FDAC-CADA-EEE5-A885CD63694B-3
Count: 1475496600000-execute_search-2
Count: 1475496600000-6808F840-FDAC-CADA-EEE5-A885CD63694B-4
Count: 1475496660000-execute_search-1
Count: 1475496660000-32C8B64A-4C15-70CE-CBA2-FD8177A28373-1
Count: 1475496660000-execute_search-2
Count: 1475496660000-32C8B64A-4C15-70CE-CBA2-FD8177A28373-2
Count: 1475496660000-search_click_job-1
Count: 1475496660000-view_job-1
Count: 1475496660000-32C8B64A-4C15-70CE-CBA2-FD8177A28373-3
Count: 1475496660000-32C8B64A-4C15-70CE-CBA2-FD8177A28373-4
Count: 1475496660000-obtain_events-1
Count: 1475496660000-32C8B64A-4C15-70CE-CBA2-FD8177A28373-5

The format of these lines is key-value. The key contains the start of the time window followed by a dash and, for the visitor counts, the visitorId; the value is the number of actions the visitor performed in that window. For the event counts you see the same format, but now the key is timestamp-event-name.
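
Because the visitorId contains dashes itself, such a line cannot simply be split on every dash. As a small sketch, assuming the "Count: key-value" layout printed above, the first dash ends the window start and the last dash starts the count. The class ParsedCount is a hypothetical helper and not part of the original code.

```java
import java.time.Instant;

// Hypothetical helper for reading the console output shown above.
class ParsedCount {
    final long windowStartMs;
    final String key;
    final long count;

    ParsedCount(long windowStartMs, String key, long count) {
        this.windowStartMs = windowStartMs;
        this.key = key;
        this.count = count;
    }

    // Parses "Count: <window start>-<visitorId or event name>-<count>".
    static ParsedCount parseLine(String line) {
        String prefix = "Count: ";
        String body = line.startsWith(prefix) ? line.substring(prefix.length()) : line;

        int first = body.indexOf('-');     // ends the window start
        int last = body.lastIndexOf('-');  // starts the count
        if (first < 1 || last <= first) {
            throw new IllegalArgumentException("Unexpected line format: " + line);
        }

        long windowStartMs = Long.parseLong(body.substring(0, first));
        String key = body.substring(first + 1, last);
        long count = Long.parseLong(body.substring(last + 1));
        return new ParsedCount(windowStartMs, key, count);
    }

    public static void main(String[] args) {
        String[] lines = {
                "Count: 1475496600000-6808F840-FDAC-CADA-EEE5-A885CD63694B-4",
                "Count: 1475496660000-execute_search-2"
        };
        for (String line : lines) {
            ParsedCount parsed = parseLine(line);
            // Print the window start as a readable UTC timestamp.
            System.out.println(Instant.ofEpochMilli(parsed.windowStartMs)
                    + " " + parsed.key + " -> " + parsed.count);
        }
    }
}
```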

That’s it, now we have user events generated by the AngularJS application, flowing through a Spring Boot app, stored in Kafka and read by the Kafka Streams API to count events and page visits by a visitor.

About Jettro Coenradie

I am a Software Developer / Architect with a lot of hands-on experience in Java, AngularJS, Elasticsearch and lots of other tools. I like to use these technologies to help customers with their business challenges. On top of that I like to gather and share knowledge related to data analytics. I have experience with importing and transforming data as well as presenting and visualising the data. Currently I am working with tools like Elasticsearch, Logstash and Kibana but also D3 and C3 for graphics and other presentations.

