A practical guide to the quirks of Kafka Streams

Kafka Streams is a lightweight library that reads from a Kafka topic, does some processing and writes back to a Kafka topic. It processes messages one by one but is also able to keep some state. Because it leverages the mechanics of Kafka consumer groups, scaling out is easy. Each instance of your Kafka Streams application will take responsibility for a subset of topic partitions and will process a subset of the keys that go through your stream.

Kafka Streams at first glance

One of the first things to keep in mind when we start developing with Kafka Streams is that there are two different APIs we can use. There is the high-level DSL (using the KStreamBuilder) and the low-level Processor API (using the TopologyBuilder). The DSL is easier to use than the Processor API, at the cost of flexibility and control over details.

We will begin our journey by learning more about the DSL, knowing that sooner or later we will have to rewrite our application using the Processor API. It doesn’t take long before we stumble upon the word count example.


import java.util.Locale

val source: KStreamS[String, String] = KStreamBuilderS.stream[String, String]("streams-file-input")
val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

Easy! Let’s just skip the documentation and finish our application!

Our first Kafka Streams application

Unfortunately, we soon return to the documentation, because each aggregation on our KStream seems to return a KTable and we want to learn more about this stream/table duality. Aggregations also allow for windowing, so we continue reading about windowing. Now that we know something about the theoretical context, we return to our code. For our use case, we need one aggregate result for each window. However, we soon discover that every input message results in an output message on our output topic, which means that all the intermediate aggregates end up there as well. This is our first disappointment.
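
The DSL of this era has no switch to emit only final window results, but the record cache at least reduces how many intermediate updates reach the output topic. A minimal sketch, assuming `p` is the java.util.Properties object we configure our KafkaStreams application with; the values are purely illustrative:

    import org.apache.kafka.streams.StreamsConfig

    // A larger record cache and a longer commit interval make Kafka Streams buffer
    // aggregate updates longer, so fewer (but not zero) intermediate results are emitted.
    p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (10 * 1024 * 1024).toString) // 10 MB
    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000")                            // 30 seconds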

A simple join

The DSL has all the usual suspects like filter, map and flatMap. But even though join is also one of those usual suspects that we have done a thousand times, it is best to read the documentation on join semantics before trying out some code, because in Kafka Streams there are a few more choices involved in joining, due to the stream/table duality. Whatever our join looks like, we should know something about Kafka's partitioning. Joins can be done most efficiently when the value you want to join on is the key of both streams and both source topics have the same number of partitions. In that case the streams are co-partitioned, which means that each task created by the application instances can be assigned one (co-)partition where it will find all the data it needs for the join in one place.
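
As a concrete example of what such a join looks like, here is a minimal sketch, assuming the same Scala wrapper as the word-count example exposes table and join the way the Java DSL does; the topic names and the join logic are made up:

    // Enrich a stream of clicks with user profiles. Both topics are keyed by userId
    // and must be co-partitioned (same number of partitions) for the join to work.
    val clicks: KStreamS[String, String]  = KStreamBuilderS.stream[String, String]("clicks")
    val profiles: KTableS[String, String] = KStreamBuilderS.table[String, String]("profiles")

    val enriched: KStreamS[String, String] =
      clicks.join(profiles, (click: String, profile: String) => s"$click (by $profile)")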

State

Wherever there are aggregations, windows or joins, there is state involved. Kafka Streams will create a new RocksDB StateStore for each window in order to store the messages that fall within that window. Unfortunately, we would like to have a lot of windows in our application, and since we also have about 500,000 different keys, we soon discover that this quickly grows out of hand. After having turned the documentation inside out, we learn that each one of those stores has a cache size of 100 MB by default. But even after we change this to 0, our Kafka Streams application is too state-heavy.
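
One knob that helps is a custom RocksDBConfigSetter, which lets us shrink the per-store RocksDB memory usage. A sketch, assuming the 0.11/1.0 API; the 1 MB value is just an example:

    import java.util.{Map => JMap}
    import org.apache.kafka.streams.StreamsConfig
    import org.apache.kafka.streams.state.RocksDBConfigSetter
    import org.rocksdb.{BlockBasedTableConfig, Options}

    // Shrink the RocksDB block cache for every state store.
    class SmallCacheConfigSetter extends RocksDBConfigSetter {
      override def setConfig(storeName: String, options: Options, configs: JMap[String, AnyRef]): Unit = {
        val tableConfig = new BlockBasedTableConfig()
        tableConfig.setBlockCacheSize(1 * 1024 * 1024L) // 1 MB instead of the default
        options.setTableFormatConfig(tableConfig)
      }
    }

    // Register the setter in the streams properties.
    p.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[SmallCacheConfigSetter])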

Interactive queries

The same StateStores that allow us to do joining and aggregating also allow us to keep a materialized view of a Kafka topic. The data in the topic will be stored by the application. If the topic is not already compacted, the local key-value store will compact the data locally. The store is constantly updated from the topic it is built from.

The store can be scaled out by running an extra instance of our application. Partitions are divided among tasks, and tasks are divided among instances, which results in each instance holding a subset of the data. This can lead to the situation where an instance is queried for a key that is held by another instance. The queried instance may not be able to provide the value for that key, but it does know which other instance holds the key, so we can relay the query. The downside is that we have to implement this query API ourselves.

In order for this to work, we need an extra configuration element:

    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${settings.Streams.host}:${settings.Streams.port}")
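
With that in place, a query handler can check whether the local instance holds the key and otherwise relay the request. A minimal sketch against the 0.11/1.0 interactive-queries API, assuming streams is our running KafkaStreams instance and we are querying the "Counts" store from the word-count example:

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.state.QueryableStoreTypes

    def lookup(word: String): Option[Long] = {
      // Find out which instance holds the key.
      val metadata = streams.metadataForKey("Counts", word, Serdes.String().serializer())
      if (metadata.host() == settings.Streams.host && metadata.port() == settings.Streams.port) {
        // The key lives on this instance: query the local store.
        val store = streams.store("Counts", QueryableStoreTypes.keyValueStore[String, Long]())
        Option(store.get(word))
      } else {
        // The key lives elsewhere: relay the query to http://{host}:{port} via our own API.
        None // left as an exercise
      }
    }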

Testing our app

A popular library for unit-testing Kafka Streams apps seems to be MockedStreams. However, not all topologies can be successfully tested with MockedStreams. But we can skip unit testing; integration tests are more important anyway. Should we try using EmbeddedKafka or spin up Docker containers with docker-it? In the future, testing Kafka Streams apps will hopefully become easier (see KIP-247: https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams).

When testing our topology, we wonder why our stream is not behaving as we would expect. After a while, we start banging our heads against a wall. Then we turn the documentation inside out again and find some settings that may be helpful.

    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
    p.put(ProducerConfig.BATCH_SIZE_CONFIG, "1")

Our second Kafka Streams application

Then the inevitable happens: we need to do something that is not covered by the DSL; we need to resort to the Processor API. This would look a bit like the code below.

    val builder: TopologyBuilder = new TopologyBuilder()
    builder
      .addSource("in", keySerde.deserializer(), valueSerde.deserializer(), settings.Streams.inputTopic)
      .addProcessor("myProcessor", new MyProcessorSupplier(), "in")
      .addSink("out", settings.Streams.outputTopic, keySerde.serializer(), valueSerde.serializer(), "myProcessor")

The Processor interface lets us implement a process method that is called for each message, as well as a punctuate method that can be scheduled to run periodically. Inside these methods, we can use ProcessorContext.forward to forward messages down the topology graph.
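
A bare-bones sketch of what MyProcessorSupplier might look like is shown below; the pass-through logic is made up, MyKey and MyValue stand in for whatever our serdes produce, and this targets the 0.11 API, where schedule only takes an interval:

    import org.apache.kafka.streams.processor.{AbstractProcessor, Processor, ProcessorContext, ProcessorSupplier}

    class MyProcessorSupplier extends ProcessorSupplier[MyKey, MyValue] {
      override def get(): Processor[MyKey, MyValue] = new AbstractProcessor[MyKey, MyValue] {
        override def init(context: ProcessorContext): Unit = {
          super.init(context)
          context.schedule(10000L) // in 0.11 this schedules punctuate on stream time
        }

        override def process(key: MyKey, value: MyValue): Unit =
          context().forward(key, value) // pass the message on towards the sink

        override def punctuate(timestamp: Long): Unit = {
          // emit periodic results here
        }
      }
    }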

In Kafka 0.11, "periodically" means stream time, so punctuate will only be triggered by the first message that comes along after the scheduled interval has passed. In our case we want wall-clock time, so we use a ScheduledThreadPoolExecutor to do our own scheduling. But if we do this, the ProcessorContext might have moved on to a different node in our topology by the time our code runs, and the forward method will behave unexpectedly. The workaround for this is to get hold of the current node object and pass it along to our scheduled code.

val currentNode = context.asInstanceOf[ProcessorContextImpl].currentNode().asInstanceOf[ProcessorNode[MyKey, MyValue]]

In Kafka 1.0, a PunctuationType was introduced to make it possible to choose between wall-clock time and stream time.
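
With that, the ScheduledThreadPoolExecutor workaround is no longer needed; inside init we can do something like the following (a sketch against the 1.0 API):

    import org.apache.kafka.streams.processor.{PunctuationType, Punctuator}

    // Kafka 1.0+: schedule on wall-clock time, independent of incoming messages.
    context.schedule(10000L, PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      override def punctuate(timestamp: Long): Unit = {
        // emit periodic results here, e.g. via context.forward(...)
      }
    })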

Conclusion

By the time we had finished our application, we had rewritten it a few times, seen all parts of the documentation more often than we would like and searched through the Kafka Streams source code. We were unfamiliar with all the settings and unaware of all the places where messages could be cached, delayed, postponed or dropped, and at certain moments we started doubting our notion of time.

In retrospect, we should have kept things simple. Don't use too much state. Don't think a clever Processor will bend Kafka Streams' quirks in your favor. Don't waste too much code on workarounds, because before you know it a new version will be released that breaks the API or makes the workarounds obsolete.

And for those of you wanting to write an article or blog post on Kafka Streams: make sure to finish it before the next release gets out.

Resources

  • https://docs.confluent.io/current/streams/
  • http://www.bigendiandata.com/2016-12-05-Data-Types-Compared/
  • https://www.confluent.io/blog/avro-kafka-data/
  • https://github.com/confluentinc/kafka-streams-examples
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

A beginner's guide to the SMACK stack – Part 2: Mesos

In the first part of this series, we’ve briefly looked at the technologies that make up the SMACK stack. In this post, we’ll take a closer look at one of the fundamental layers of the stack: Apache Mesos.

What is Apache Mesos?

Apache Mesos is an open-source cluster manager that provides efficient resource isolation and sharing across distributed applications or frameworks. Mesos manages a cluster of machines (virtual or physical) and can also be seen as a distributed systems kernel, because the Mesos kernel runs on every machine in the cluster and provides applications (e.g. Hadoop, Spark, Kafka) with APIs for resource management and scheduling across an entire data center or cloud environment. One of the ideas behind Mesos is to deploy multiple distributed systems within the same shared pool of cluster nodes in order to increase resource utilization. Let's get to know Mesos a little better by looking at some of its core concepts.

Architecture

By examining the Mesos architecture we will get to know the most important concepts to understand when dealing with Mesos.

 

[Figure: Mesos architecture – a master daemon coordinating agent daemons via ZooKeeper]

 

As you can see in the above figure, a Mesos architecture consists of a master daemon that manages agent daemons running on nodes in the cluster. Apache Mesos also uses Apache ZooKeeper to operate. ZooKeeper acts as the master election service in the Mesos architecture and stores state for the Mesos nodes.

Frameworks

Next to the masters and agents, Mesos has the concept of frameworks. Frameworks within Mesos are responsible for running tasks on the Mesos agent nodes.

A Mesos framework consists of two major components:

  1. a scheduler that registers with the Mesos master to be offered resources
  2. an executor process that is launched on agent nodes to run the framework's tasks

Mesos agents will notify the Mesos master about their available resources. Based on that information, the Mesos master determines how many resources to offer to each framework. After the Mesos master has decided which resources to offer, the scheduler then selects which of the offered resources to use. When a framework accepts offered resources, it passes Mesos a description of the tasks it wants to launch.

Now that we have gone over the concepts of frameworks, schedulers and executors, it's interesting to point out that most of the components that make up the SMACK stack are available as a Mesos framework: Spark, Kafka and Cassandra are all available as frameworks for Mesos. Mesos supports a lot of different frameworks; you can find a more extensive list on the Mesos frameworks documentation page.

Jobs

Almost all data processing platforms will have the need for two different kinds of jobs:

  1. scheduled/periodic jobs – you can think of periodic batch aggregations or reporting jobs
  2. long-running jobs – stream processing or long running application jobs

For the first kind of job, you can use the Chronos framework. Chronos is a distributed and fault-tolerant scheduler that runs on top of Apache Mesos and can be used for job orchestration. If you're familiar with Linux, you can compare it to a distributed version of cron. Compared to regular cron, however, Chronos has a number of advantages. For instance, it allows you to schedule jobs using ISO 8601 repeating-interval notation (for example, R/2018-01-01T00:00:00Z/PT1H for a job that repeats every hour), which enables more flexibility in job scheduling. Next to that, Chronos supports arbitrarily long dependency chains: jobs can be triggered by the completion of other jobs. This can be very useful at times.

For the long-running jobs, Mesos has the Marathon framework. Marathon is a fault-tolerant and distributed 'init' system that can be used to deploy and run applications across a Mesos cluster. Marathon has many features that simplify running applications in a clustered environment, such as high availability, node constraints, application health checks, an API for scriptability and service discovery. When it comes to application deployment, Marathon has built-in support for blue-green deployments, and it adds scaling and self-healing to an already big feature set. In case a machine in the cluster dies, Marathon will make sure the application is automatically spawned elsewhere in the cluster, so that the application is always on and the preconfigured number of instances is met.

Marathon can be used to run any kind of executable process and is also often used as a PaaS solution for running containers. Other Mesos frameworks can also be launched from Marathon. In combination with this self-healing ability, Mesos and Marathon make a very robust platform for running any kind of application.

Getting some hands-on experience

You can build Mesos from source, but probably the easiest way to get some hands-on experience with Mesos in the context of the SMACK stack is by installing DC/OS. DC/OS stands for the open-source DataCenter Operating System, developed by Mesosphere, the company that is also actively working on Apache Mesos, and it is built on top of Apache Mesos. When experimenting with new technologies, I always like to use Vagrant or Docker. Luckily, Mesosphere has released a DC/OS Vagrant project which we can easily use, but before we get started, make sure you have Git, Vagrant and VirtualBox installed.

The first step is installing the vagrant-hostmanager plugin, which will alter our /etc/hosts file with some new hostnames, so we can easily connect to the instances.

$ vagrant plugin install vagrant-hostmanager

Now let's clone the dcos-vagrant project:

$ git clone https://github.com/dcos/dcos-vagrant

Let's configure a three-node installation (one master, one agent and one public agent) by copying the corresponding Vagrant configuration:

$ cd dcos-vagrant
$ cp VagrantConfig-1m-1a-1p.yaml VagrantConfig.yaml

Now that we’ve configured everything, we can spin up our new DC/OS VMs.

$ vagrant up

During the installation, it might ask you for a password. You will need to enter your local machine's password, because Vagrant is trying to alter your /etc/hosts file. It might take a little while before everything is up and running, but once it's done you can navigate to http://m1.dcos/.

If everything is set up correctly, you should be able to log in and see the DC/OS dashboard.

At this point we have no tasks or services running, but we're all set to continue our deep dive into the SMACK stack. Let's install Marathon in DC/OS, so we can get a sense of what it takes to install a Mesos framework.

Go to Universe -> Packages and install Marathon as a Mesos framework. Once Marathon is installed, you should see that you now have running tasks on the dashboard.

By going to Services, you should be able to see the Marathon deployment happening within the Mesos cluster.

Now if you want you can also open the service and go to the Marathon UI to start creating groups or applications, but we’ll leave that for next time.

Marathon allows you to deploy processes and application containers and through the UI you can also easily scale these applications.

Mesos frameworks like Marathon, Kafka, Cassandra and Spark can also be easily scaled from within DC/OS. It’s just a matter of a few clicks.

In the above example, we installed Mesos via DC/OS in Vagrant, but you can also run DC/OS in the cloud or on premise. See the DC/OS get started page for more setup options.

Summary

Mesos is the fundamental layer of the SMACK stack that allows applications to run efficiently. It has the features needed for proper automatic recovery and can meet the scaling requirements you face when your architecture hits high-traffic load. Installing other parts of the stack is almost trivial with DC/OS and the Mesos frameworks for Kafka, Spark and Cassandra.

In the next part of this series, we will dive deeper into Cassandra and start adding Cassandra to our local SMACK stack for storage of our data at hand.
