
A practical guide to the quirks of Kafka Streams

Kafka Streams is a lightweight library that reads from a Kafka topic, does some processing and writes back to a Kafka topic. It processes messages one by one but is also able to keep some state. Because it leverages the mechanics of Kafka consumer groups, scaling out is easy. Each instance of your Kafka Streams application will take responsibility for a subset of topic partitions and will process a subset of the keys that go through your stream.

Kafka Streams at first glance

One of the first things we need to keep in mind when we start developing with Kafka Streams is that there are two different APIs we can use. There is the high-level DSL (using the KStreamBuilder) and the low-level Processor API (using the TopologyBuilder). The DSL is easier to use than the Processor API, at the cost of flexibility and control over details.

We will begin our journey by learning more about the DSL, knowing that sooner or later we will have to rewrite our application using the Processor API. It doesn’t take long before we stumble upon the word count example.


val source: KStream[String, String] = KStreamBuilderS.stream[String, String]("streams-file-input")
val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

Easy! Let’s just skip the documentation and finish our application!

Our first Kafka Streams application

Unfortunately, we soon return to the documentation, because each aggregation on our KStream seems to return a KTable and we want to learn more about this stream/table duality. Aggregations also allow for windowing, so we continue reading about windowing. Now that we know something about the theoretical context, we return to our code. For our use case, we need one aggregate result for each window. However, we soon discover that each input message results in an output message on our output topic, which means that all the intermediate aggregates are spammed onto the output topic. This is our first disappointment.
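What is happening can be mimicked in plain Scala, without the Kafka Streams API (the record-cache behaviour shown is a simplified assumption): without caching, every update of the aggregate is forwarded downstream; a record cache would instead forward only the latest value per key when it is flushed.

```scala
// Plain-Scala sketch, no Kafka dependency: why each input record
// produces an output record on the aggregation's result topic.
val words = Seq("kafka", "streams", "kafka")

// Without caching: the aggregate is updated and forwarded per record,
// so intermediate results like (kafka, 1) end up on the output topic.
var counts = Map.empty[String, Long].withDefaultValue(0L)
val emitted = words.map { w =>
  counts = counts.updated(w, counts(w) + 1)
  (w, counts(w))
}
println(emitted) // List((kafka,1), (streams,1), (kafka,2))

// With a record cache: updates are buffered and only the latest value
// per key is forwarded when the cache is flushed (e.g. on commit).
var cache = Map.empty[String, Long].withDefaultValue(0L)
words.foreach(w => cache = cache.updated(w, cache(w) + 1))
println(cache.toList.sortBy(_._1)) // List((kafka,2), (streams,1))
```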

A simple join

The DSL has all the usual suspects like filter, map, and flatMap. And even though join is another of those usual suspects that we have done a thousand times, it is best to read the documentation on join semantics before trying out some code, because in Kafka Streams the stream/table duality means there are more choices involved in joining. But whatever our join looks like, we should know something about Kafka's partitioning. Joins can be done most efficiently when the value you want to join on is the key of both streams and both source topics have the same number of partitions. In that case the streams are co-partitioned, which means that each task created by the application instances can be assigned one (co-)partition, where it will find all the data it needs for doing the join in one place.
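The co-partitioning requirement can be illustrated with a plain-Scala stand-in for the default partitioner (the real one applies murmur2 to the serialized key bytes; `partitionFor` here is a simplified assumption):

```scala
// Simplified stand-in for Kafka's default partitioner: the real one
// hashes the serialized key with murmur2, but the principle is the same.
def partitionFor(key: String, numPartitions: Int): Int =
  math.abs(key.hashCode % numPartitions)

val keys = Seq("user-1", "user-2", "user-3")

// Co-partitioned: both topics have 4 partitions, so every key lands on
// the same partition number in both, and one task can join them locally.
val coPartitioned = keys.map(k => (partitionFor(k, 4), partitionFor(k, 4)))
println(coPartitioned.forall { case (l, r) => l == r }) // true

// Not co-partitioned: with 4 vs 6 partitions the same key may land on
// different partition numbers, so the join data is no longer co-located.
println(keys.map(k => (k, partitionFor(k, 4), partitionFor(k, 6))))
```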

State

Wherever there are aggregations, windows or joins, there is state involved. Kafka Streams (KS) will create a new RocksDB StateStore for each window in order to store the messages that fall within that window. Unfortunately, we would like to have a lot of windows in our application, and since we also have about 500,000 different keys, we soon discover that this quickly grows out of hand. After having turned the documentation inside out, we learn that each one of those stores has a cache size of 100 MB by default. But even after we change this to 0, our KS application is too state-heavy.
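Changing the cache size presumably goes through the streams configuration; a sketch using the record-cache property from the 0.11 StreamsConfig (whether this is the exact knob the situation calls for depends on your topology):

```scala
// Disable record caching for the whole application (0.11 property name).
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
```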

Interactive queries

The same StateStores that allow us to do joining and aggregating also allow us to keep a materialized view of a Kafka topic. The data in the topic will be stored by the application. If the topic is not already compacted, the local key-value store will compact the data locally. The view will constantly be updated from the topic it is built from.

The store can be scaled out by running an extra instance of our application. Partitions will be divided among tasks, and tasks will be divided among instances, which results in each instance holding a subset of the data. This can lead to a situation where an instance is queried for a key that is held by another instance. The queried instance may not be able to provide the value for that key, but it does know which other instance holds it, so we can relay the query. The downside is that we have to implement this relaying API ourselves.
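The relay logic can be sketched in plain Scala. The names `Instance` and `instanceFor` are made up for illustration (the real lookup in 0.11 goes through `KafkaStreams#metadataForKey`), and the partitioner is the same simplified stand-in as before:

```scala
// Made-up model of the routing an interactive-query API has to do:
// each instance owns some partitions, and a key maps to one partition.
case class Instance(host: String, port: Int, partitions: Set[Int])

def partitionFor(key: String, numPartitions: Int): Int =
  math.abs(key.hashCode % numPartitions)

def instanceFor(key: String, numPartitions: Int, instances: Seq[Instance]): Instance = {
  val p = partitionFor(key, numPartitions)
  instances.find(_.partitions.contains(p)).get
}

val instances = Seq(
  Instance("streams-1", 7070, Set(0, 1)),
  Instance("streams-2", 7070, Set(2, 3))
)

// A query arriving at the wrong instance is relayed to the host:port
// that the owning instance advertised via APPLICATION_SERVER_CONFIG.
val owner = instanceFor("some-key", 4, instances)
println(s"relay to ${owner.host}:${owner.port}")
```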

In order for this to work, we need an extra configuration element:

    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${settings.Streams.host}:${settings.Streams.port}")

Testing our app

A popular library for unit-testing Kafka Streams apps seems to be MockedStreams. However, not all topologies can be successfully tested with MockedStreams. But we can skip unit testing; integration tests are more important anyway. Should we try using EmbeddedKafka or spin up Docker containers with docker-it? In the future, testing Kafka Streams apps will hopefully become easier (https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams).

When testing our topology, we wonder why our stream is not behaving as we would expect. After a while, we start banging our heads against a wall. Then we turn the documentation inside out again and find some settings that may be helpful.

    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
    p.put(ProducerConfig.BATCH_SIZE_CONFIG, "1")

Our second Kafka Streams application

Then the inevitable happens: we need to do something that is not covered by the DSL, so we resort to the Processor API. This looks a bit like the code below.

    val builder: TopologyBuilder = new KStreamBuilder()
    builder
      .addSource("in", keySerde.deserializer(), valueSerde.deserializer(), settings.Streams.inputTopic)
      .addProcessor("myProcessor", new MyProcessorSupplier(), "in")
      .addSink("out", settings.Streams.outputTopic, keySerde.serializer(), valueSerde.serializer(), "myProcessor")

The Processor interface lets us implement a process method that is called for each message, as well as a punctuate method that can be scheduled to run periodically. Inside these methods, we can use ProcessorContext.forward to forward messages down the topology graph.

In Kafka 0.11, "periodically" means stream-time, which means punctuate will be triggered by the first message that comes along after the scheduled interval has passed. In our case we want to use wall-clock time, so we use a ScheduledThreadPoolExecutor to do our own scheduling. But if we do this, the ProcessorContext may have moved on to a different node in our topology, and the forward method will behave unexpectedly. The workaround for this is to get hold of the current node object and pass it along to our scheduled code.

val currentNode = context.asInstanceOf[ProcessorContextImpl].currentNode().asInstanceOf[ProcessorNode[MyKey, MyValue]]

In Kafka 1.0, a PunctuationType was introduced that makes it possible to choose between wall-clock time and stream-time.
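The stream-time behaviour can be mimicked in plain Scala (no Kafka API; `onRecord` is a made-up stand-in for record arrival): punctuations are only evaluated when a record advances stream time, so during a quiet period nothing fires, and missed punctuations fire late, all at once.

```scala
// Plain-Scala sketch of stream-time punctuation (0.11 semantics).
val intervalMs = 1000L
var nextPunctuateAt = intervalMs
var punctuations = List.empty[Long]

// Stand-in for record arrival: stream time only advances with records,
// and every punctuation point we passed fires now, not when it was due.
def onRecord(timestampMs: Long): Unit =
  while (timestampMs >= nextPunctuateAt) {
    punctuations = punctuations :+ nextPunctuateAt
    nextPunctuateAt += intervalMs
  }

// Quiet between t=100 and t=3500: nothing fires at 1000 or 2000; all
// three punctuations fire only when the record at t=3500 arrives.
Seq(100L, 3500L).foreach(onRecord)
println(punctuations) // List(1000, 2000, 3000)
```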

Conclusion

By the time we had finished our application, we had re-written it a few times, seen all parts of the documentation more often than we would like and searched through the KS source code. We were unfamiliar with all the settings and unaware of all the places where messages could be cached, delayed, postponed or dropped, and at certain moments we started doubting our notion of time.

In retrospect, we should have kept things simple. Don’t use too much state. Don’t think a clever Processor will bend KS’s quirks in our favor. Don’t waste too much code on workarounds, because before you know it there will be a new version released that will break the API or obsolete our workarounds.

And for those of you wanting to write an article or blog-post on Kafka Streams, make sure to finish it before the new release gets out.

Resources

  • https://docs.confluent.io/current/streams/
  • http://www.bigendiandata.com/2016-12-05-Data-Types-Compared/
  • https://www.confluent.io/blog/avro-kafka-data/
  • https://github.com/confluentinc/kafka-streams-examples
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

Looking ahead: new field collapsing feature in Elasticsearch

At Luminis Amsterdam, search is one of our main focus points. Because of that, we keep a close eye on upcoming features.

Only a few weeks ago, I noticed that the following pull request ("Add field collapsing for search request") was merged into the Elasticsearch code base, tagged for the 5.3/6.x release. This feature allows you to group your search results by a specific key. In the past, this was only possible by using a combination of an aggregation and top hits.

Now a good question would be: 'why would I want this?' or 'what is this grouping you are talking about?'. Imagine having a website where you sell Apple products: MacBooks, iPhones, iPads, etc. Let's say that, because of functional requirements, we have to create separate documents for each variant of each device (e.g. separate documents for iPad Air 2 32GB Silver, iPad Air 2 32GB Gold, etc.). When a user searches for the word 'iPad', having no result grouping means that your users will see search results for all the iPads you are selling. Your result list could look like the following:

  1. iPad Air 2 32GB Pink
  2. iPad Air 2 128GB Pink
  3. iPad Air 2 32GB Space Grey
  4. iPad Air 2 128GB Space Grey
  5. ..
  6. ..
  7. ..
  8. ..
  9. ..
  10. iPad Pro 12.9 Inch 32GB Space Grey
  11. iPad case with happy colourful pictures on it.

Now for the sake of this example, let's say we only show 10 products per page. If our user was really looking for an iPad case, he wouldn't see this product, but would instead be shown a long list of 'the same' iPad. This is not really user-friendly. A better approach would be to group all the iPad Air 2 products into one entry, so that it takes up only one spot in the search results list. You would then have to think of a visual presentation to notify the user that there are more variants of the same product.

As mentioned before, grouping of results was already possible in older versions of Elasticsearch, but the downsides of this old approach were that it used a lot of memory when computed on big data sets, and that paginating the results was not (really) possible. An example:

GET shop/_search
{
  "size": 0,
  "query": {
    "match": {
      "title": "iPad"
    }
  },
  "aggs": {
    "collapse_by_id": {
      "terms": {
        "field": "family_id",
        "size": 10,
        "order": {
          "max_score": "desc"
        }
      },
      "aggs": {
        "max_score": {
          "max": {
            "script": "_score"
          }
        },
        "top_hits_for_family": {
          "top_hits": {
            "size": 3
          }
        }
      }
    }
  }
}
  • We perform a terms aggregation on the family_id, which gives us the grouping we want. Next, we use a top_hits sub-aggregation to get the documents belonging to each family.

All seems well. Now let's say we have a website where users view 10 products per page. In order for users to go to the next page, we would have to execute the same query, increase the size of the terms aggregation to 20 and discard the first 10 buckets. Aggregations take quite some processing power, so constantly aggregating over the complete set will not perform well on a big data set. Another way would be to execute the query for page 2 combined with a filter that excludes the families already shown on page 1. All in all, this would be a lot of extra work to achieve a field collapsing feature.

Now that Elasticsearch has added the field collapsing feature, this becomes a lot easier. You can download my gist with some setup if you want to play along with the example. The gist contains some settings/mappings, test data and the queries which I will be showing you in a minute.

Alongside the query, aggregations, suggestions, sorting/pagination options, etc., Elasticsearch has added a new 'collapse' option:

GET shop/_search
{
  "query": {
    "match": {
      "title": "iPad"
    }
  },
  "collapse": {
    "field": "family_id"
  }
}

The simplest version of collapse only takes a field name on which to form the grouping. If we execute this query, it will generate the following result:

"hits": {
    "total": 6,
    "max_score": null,
    "hits": [
      {
        "_index": "shop",
        "_type": "product",
        "_id": "5",
        "_score": 0.078307986,
        "_source": {
          "title": "iPad Pro ipad",
          "colour": "Space Grey",
          "brand": "Apple",
          "size": "128gb",
          "price": 899,
          "family_id": "apple-5678"
        },
        "fields": {
          "family_id": [
            "apple-5678"
          ]
        }
      },
      {
        "_index": "shop",
        "_type": "product",
        "_id": "1",
        "_score": 0.05406233,
        "_source": {
          "title": "iPad Air 2",
          "colour": "Silver",
          "brand": "Apple",
          "size": "32gb",
          "price": 399,
          "family_id": "apple-1234"
        },
        "fields": {
          "family_id": [
            "apple-1234"
          ]
        }
      }
    ]
  }

Notice the total in the query response, which shows the total number of documents that matched the query. Our hits array only contains 2 hits, but if we look at the 'fields' section of each result, we can see our two unique family_ids. The best matching document for each family_id is returned in the search results.

It is also possible to retrieve the documents for each family_id directly by adding an inner_hits block inside collapse:

GET shop/_search
{
  "query": {
    "match": {
      "title": "iPad"
    }
  },
  "collapse": {
    "field": "family_id",
    "inner_hits": {
      "name": "collapsed_family_id",
      "from": 1,
      "size": 2
    }
  }
}
  • You can use 'from': 1 to exclude the first hit in each family, since it is already returned as the parent of the family.

Which results in:

"hits": {
    "total": 6,
    "max_score": null,
    "hits": [
      {
        "_index": "shop",
        "_type": "product",
        "_id": "5",
        "_score": 0.078307986,
        "_source": {
          "title": "iPad Pro ipad",
          "colour": "Space Grey",
          "brand": "Apple",
          "size": "128gb",
          "price": 899,
          "family_id": "apple-5678"
        },
        "fields": {
          "family_id": [
            "apple-5678"
          ]
        },
        "inner_hits": {
          "collapsed_family_id": {
            "hits": {
              "total": 2,
              "max_score": 0.078307986,
              "hits": [
                {
                  "_index": "shop",
                  "_type": "product",
                  "_id": "6",
                  "_score": 0.066075005,
                  "_source": {
                    "title": "iPad Pro",
                    "colour": "Space Grey",
                    "brand": "Apple",
                    "size": "256gb",
                    "price": 999,
                    "family_id": "apple-5678"
                  }
                }
              ]
            }
          }
        }
      },
      {
        "_index": "shop",
        "_type": "product",
        "_id": "1",
        "_score": 0.05406233,
        "_source": {
          "title": "iPad Air 2",
          "colour": "Silver",
          "brand": "Apple",
          "size": "32gb",
          "price": 399,
          "family_id": "apple-1234"
        },
        "fields": {
          "family_id": [
            "apple-1234"
          ]
        },
        "inner_hits": {
          "collapsed_family_id": {
            "hits": {
              "total": 4,
              "max_score": 0.05406233,
              "hits": [
                {
                  "_index": "shop",
                  "_type": "product",
                  "_id": "2",
                  "_score": 0.05406233,
                  "_source": {
                    "title": "iPad Air 2",
                    "colour": "Gold",
                    "brand": "Apple",
                    "size": "32gb",
                    "price": 399,
                    "family_id": "apple-1234"
                  }
                },
                {
                  "_index": "shop",
                  "_type": "product",
                  "_id": "3",
                  "_score": 0.05406233,
                  "_source": {
                    "title": "iPad Air 2",
                    "colour": "Space Grey",
                    "brand": "Apple",
                    "size": "32gb",
                    "price": 399,
                    "family_id": "apple-1234"
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }

Paging was an issue with the old approach, but since documents are grouped inside the search results, paging now works out of the box, the same way it does for normal queries and with the same limitations.
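For example, a second page of collapsed results can be requested with the usual from/size parameters, against the same example index:

```json
GET shop/_search
{
  "from": 10,
  "size": 10,
  "query": {
    "match": { "title": "iPad" }
  },
  "collapse": {
    "field": "family_id"
  }
}
```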

A lot of people in the community have been waiting for this feature and I'm excited that it has finally arrived. You can play around with the data set and try some more 'collapsing' (e.g. by colour, brand, size, etc.). I hope this gave you a small overview of what's to come in the upcoming 5.3/6.x release.


ElasticSearch 2.0 and Pipeline Aggregations

Elasticsearch 2.0.0 beta is out and, apart from many performance-related updates, one major addition is pipeline aggregations. This has been one of the most anticipated feature requests for the new version. As the name suggests, it allows us to set up a pipeline of aggregations that can perform computations on the buckets produced by an earlier aggregation.

Elasticsearch pipeline aggregations are broadly classified into two types:

• Parent – a pipeline aggregation that computes its output (bucket/aggregation) and adds this output to the buckets of the parent aggregation.
• Sibling – an existing aggregation becomes the input of the pipeline aggregation, and the new aggregation appears at the same level as the sibling aggregation instead of becoming part of the existing buckets of the input aggregation.
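As an example of the sibling type, a hypothetical sales index could compute the average of monthly sums with the new avg_bucket aggregation (the index and field names are made up; buckets_path points at the sibling aggregation's metric):

```json
GET sales/_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": { "field": "date", "interval": "month" },
      "aggs": {
        "monthly_sales": { "sum": { "field": "price" } }
      }
    },
    "avg_monthly_sales": {
      "avg_bucket": {
        "buckets_path": "sales_per_month>monthly_sales"
      }
    }
  }
}
```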
