Tag Archive: kafka



A practical guide to the quirks of Kafka Streams

Kafka Streams is a lightweight library that reads from a Kafka topic, does some processing and writes back to a Kafka topic. It processes messages one by one but is also able to keep some state. Because it leverages the mechanics of Kafka consumer groups, scaling out is easy. Each instance of your Kafka Streams application will take responsibility for a subset of topic partitions and will process a subset of the keys that go through your stream.

Kafka Streams at first glance

One of the first things to keep in mind when we start developing with Kafka Streams is that there are two different APIs we can use. There is the high-level DSL (using the KStreamBuilder) and the low-level Processor API (using the TopologyBuilder). The DSL is easier to use than the Processor API, at the cost of flexibility and control over details.

We will begin our journey by learning more about the DSL, knowing that sooner or later we will have to rewrite our application using the Processor API. It doesn’t take long before we stumble upon the word count example.


import java.util.Locale

val source: KStreamS[String, String] = KStreamBuilderS.stream[String, String]("streams-file-input")
val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

Easy! Let’s just skip the documentation and finish our application!

Our first Kafka Streams application

Unfortunately, we soon return to the documentation, because each aggregation on our KStream seems to return a KTable and we want to learn more about this stream/table duality. Aggregations also allow for windowing, so we continue reading about windowing. Now that we know something about the theoretical context, we return to our code. For our use case, we need one aggregate result per window. However, we soon discover that each input message results in an output message on our output topic, which means that all the intermediate aggregates are spammed onto the output topic. This is our first disappointment.
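To make this concrete, a windowed count might look roughly like the sketch below (written against the 0.11-era Java DSL; the topic and store names are made up). Every incoming record updates the count for its window, and each of those intermediate updates ends up downstream, so the output topic sees many records per window instead of one final result.

import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.{KStreamBuilder, KTable, TimeWindows, Windowed}

val builder = new KStreamBuilder()
val events = builder.stream[String, String]("events-input")

// count per key per 5-minute window; the resulting KTable is continuously updated
val windowedCounts: KTable[Windowed[String], java.lang.Long] =
  events
    .groupByKey
    .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), "WindowedCounts")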

A simple join

The DSL has all the usual suspects like filter, map, and flatMap. But even though join is also one of those usual suspects that we have done a thousand times, it is best to read the documentation on join semantics before trying out some code, because in Kafka Streams there are a few more choices involved in joining, due to the stream/table duality. But whatever our join looks like, we should know something about Kafka's partitioning. Joins can be done most efficiently when the value you want to join on is the key of both streams and both source topics have the same number of partitions. If this is the case, the streams are co-partitioned, which means that each task created by the application instances can be assigned one (co-)partition where it will find all the data it needs for the join in one place.
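For illustration, a stream-table join on co-partitioned topics could look roughly like this sketch (0.11-era Java DSL; the topic names, the String value types and the "profiles-store" name are made up). Both topics are keyed on the same id and have the same number of partitions, so each task can perform the join locally.

import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable, ValueJoiner}

val builder = new KStreamBuilder()
// both topics are keyed by userId and have the same number of partitions
val clicks: KStream[String, String] = builder.stream[String, String]("clicks")
val profiles: KTable[String, String] = builder.table[String, String]("user-profiles", "profiles-store")

val enriched: KStream[String, String] = clicks.join(profiles, new ValueJoiner[String, String, String] {
  override def apply(click: String, profile: String): String = s"$click (by $profile)"
})

enriched.to("enriched-clicks")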

State

Wherever there are aggregations, windows or joins, there is state involved. KS will create a new RocksDB StateStore for each window in order to store the messages that fall within that window. Unfortunately, we would like to have a lot of windows in our application, and since we also have about 500,000 different keys, we soon discover that this quickly grows out of hand. After having turned the documentation inside out, we learn that each one of those stores has a cache size of 100 MB by default. But even after we change this to 0, our KS application is too state-heavy.

Interactive queries

The same StateStores that allow us to do joining and aggregating also allow us to keep a materialized view of a Kafka topic. The data in the topic will be stored by the application. If the topic is not already compacted, the local key-value store will compact your data locally. The data will constantly be updated from the topic it is built from.

The store can be scaled out by running an extra instance of our application. Partitions will be divided among tasks, and tasks will be divided among instances, which results in each instance holding a subset of the data. This can lead to the situation where an instance is queried for a key that is held by another instance. The queried instance may not be able to provide the value corresponding to the key, but it does know which other instance holds the key, so we can relay the query. The downside is that we have to implement this API ourselves.

In order for this to work, we need an extra configuration element:

    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${settings.Streams.host}:${settings.Streams.port}")
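With that in place, the query side might look roughly like the sketch below (the "Counts" store name matches the word count example above; the HTTP relay itself is not shown, and thisHost is whatever we put into APPLICATION_SERVER_CONFIG).

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.{HostInfo, QueryableStoreTypes}

// look up a count, relaying to the owning instance when the key is not local
def lookup(streams: KafkaStreams, thisHost: HostInfo, key: String): Option[Long] = {
  val metadata = streams.metadataForKey("Counts", key, Serdes.String().serializer())
  if (metadata.hostInfo() == thisHost) {
    // this instance owns the key, so we can read it from the local store
    val store = streams.store("Counts", QueryableStoreTypes.keyValueStore[String, java.lang.Long]())
    Option(store.get(key)).map(_.longValue())
  } else {
    // another instance owns the key: relay the query over our own HTTP API
    // (not shown) to http://<metadata.host()>:<metadata.port()>
    None
  }
}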

Testing our app

A popular library for unit-testing Kafka Streams apps seems to be MockedStreams. However, not all topologies can be successfully tested with MockedStreams. But we can skip unit testing; integration tests are more important anyway. Should we try using EmbeddedKafka or spin up Docker containers with docker-it? In the future, testing Kafka Streams apps will hopefully be easier (https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams).

When testing our topology, we wonder why our stream is not behaving as we would expect. After a while, we start banging our heads against a wall. Then we turn the documentation inside out again and find some settings that may be helpful.

    // commit (and flush the record caches) frequently, and effectively disable
    // producer batching, so records show up on the output topic quickly during tests
    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
    p.put(ProducerConfig.BATCH_SIZE_CONFIG, "1")

Our second Kafka Streams application

Then the inevitable happens: we need to do something that is not covered by the DSL; we need to resort to the Processor API. This would look a bit like the code below.

    val builder: TopologyBuilder = new KStreamBuilder()
    builder
      .addSource("in", keySerde.deserializer(), valueSerde.deserializer(), settings.Streams.inputTopic)
      .addProcessor("myProcessor", new MyProcessorSupplier(), "in")
      .addSink("out", settings.Streams.outputTopic, keySerde.serializer(), valueSerde.serializer(), "myProcessor")

The Processor interface lets us implement a process method that is called for each message, as well as a punctuate method that can be scheduled to run periodically. Inside these methods, we can use ProcessorContext.forward to forward messages down the topology graph.
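A bare-bones MyProcessorSupplier could look like the sketch below (0.11-era API, assuming String keys and values; the actual processing logic is made up).

import org.apache.kafka.streams.processor.{AbstractProcessor, Processor, ProcessorContext, ProcessorSupplier}

class MyProcessor extends AbstractProcessor[String, String] {
  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    context.schedule(60 * 1000L) // request punctuate roughly every minute (stream-time, see below)
  }

  // called once for every record; forward passes it on to the next node ("out")
  override def process(key: String, value: String): Unit =
    context().forward(key, value.toUpperCase)

  // called periodically, driven by stream-time in Kafka 0.11
  override def punctuate(timestamp: Long): Unit = ()
}

class MyProcessorSupplier extends ProcessorSupplier[String, String] {
  override def get(): Processor[String, String] = new MyProcessor
}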

In Kafka 0.11, "periodically" means stream-time, which means punctuate will only be triggered by the first message that comes along after the method is scheduled to run. In our case we want wall-clock time, so we use a ScheduledThreadPoolExecutor to do our own scheduling. But if we do this, the ProcessorContext might have moved to a different node in our topology and the forward method will behave unexpectedly. The workaround is to get hold of the current node object and pass it along to our scheduled code.

val currentNode = context.asInstanceOf[ProcessorContextImpl].currentNode().asInstanceOf[ProcessorNode[MyKey, MyValue]]
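The wall-clock scheduling itself could then look roughly like this, run from inside init, where context is available and currentNode is the value captured above (the actual forwarding through the captured node is left out).

import java.util.concurrent.{Executors, TimeUnit}

val executor = Executors.newSingleThreadScheduledExecutor()
executor.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    // use the captured currentNode here, instead of whatever node the
    // ProcessorContext happens to point at by the time this task runs
  }
}, 1, 1, TimeUnit.MINUTES)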

In Kafka 1.0 a PunctuationType was introduced to make it possible to choose between wall-clock time and stream-time.

Conclusion

By the time we had finished our application, we had re-written it a few times, seen all parts of the documentation more often than we would like and searched through the KS source code. We were unfamiliar with all the settings and unaware of all the places where messages could be cached, delayed, postponed or dropped, and at certain moments we started doubting our notion of time.

In retrospect, we should have kept things simple. Don't use too much state. Don't think a clever Processor will bend KS's quirks in your favor. Don't waste too much code on workarounds, because before you know it a new version will be released that breaks the API or makes those workarounds obsolete.

And for those of you wanting to write an article or blog-post on Kafka Streams, make sure to finish it before the new release gets out.

Resources

  • https://docs.confluent.io/current/streams/
  • http://www.bigendiandata.com/2016-12-05-Data-Types-Compared/
  • https://www.confluent.io/blog/avro-kafka-data/
  • https://github.com/confluentinc/kafka-streams-examples
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

Gather insights from user events – part 1

In this blog series I am going to explain our setup for a system that helps in getting insights based on user events. The solution generates events in the web application and in a mobile app. These events are sent to a backend to be cleaned, enriched and stored. With these events we can create dashboards that show what visitors do on the website and which pages are visited. We calculate the number of visitors and the number of visits to a page within a certain time window. The following image gives an overview of this solution.

Figure 1 – Overview of the solution

I start with the event generation in the website and the mobile app. Both run AngularJS; the mobile app is built using Ionic, which is AngularJS as well. The events are sent to a REST backend created as a Spring Boot application. The Spring Boot application stores the events in Kafka. Using the Kafka Streams API I'll show you how to do some basic window-based calculations. Along the way I'll explain some of the concepts or point you to other resources. In the next blogs I am going to talk about Elasticsearch, Logstash and Kibana as well as Apache Flink.



A beginners guide to the SMACK stack – Part 1: Introduction

This is the introduction to a new series of blog posts about data analysis with the SMACK stack. Follow along in the coming weeks to learn more!

Over the last 20 years, the architectures of (big) data platforms have changed, and companies processing large amounts of data have moved from big enterprise data warehouse solutions to on-premise / cloud-based solutions, with Apache Hadoop (Hadoop Distributed File System, MapReduce, and YARN) as a fundamental building block. Architectures based on Hadoop tend to be focussed on (long-running) batch or offline jobs, where data is captured to storage and then processed periodically. For usage in an online environment, this batch-based processing was becoming too slow and business expectations were changing. Over the last 5-10 years, the demand for performing (near) real-time analysis has been pushing the industry into finding new solutions and architectural patterns to achieve these new goals. This has led to several 'new' architectural patterns like the Lambda architecture and the Kappa architecture. Both architectural patterns have a focus on processing data at speed (stream processing), where the Kappa architecture is purely focussed on a streaming (speed) layer and completely removes batch-oriented processing.

When designing a data platform there are many aspects that need to be taken into consideration:

  • the type of analysis – batch, (near) real-time, or both
  • the processing methodology – predictive, analytical, ad-hoc queries or reporting
  • data frequency and size – how much data is expected and at what frequency does it arrive at the platform
  • the type of data – transactional, historical, etc.
  • the format of incoming data – structured, unstructured or semi-structured
  • the data consumers – who will be using the results

This list is by no means exhaustive, but it’s a starting point.

Organisations processing high volumes of data used to always pick a (single) vendor-backed product stack, but these days there are so many great, open source, reliable and proven solutions out there that you can easily take a best-of-breed approach and build your own stack. There is a wide variety of components to select from, so always choose based on your specific requirements. One of the more popular, general-purpose, best-of-breed big data stacks I've seen lately is the SMACK stack.

The SMACK stack

The SMACK Stack: Spark, Mesos, Akka, Cassandra and Kafka.

The SMACK stack consists of the following technologies:

  • Spark – Apache Spark™ is a fast and general engine for large-scale data processing. Spark allows you to combine SQL, streaming, and complex analytics. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. Spark has support for both real-time (Spark Streaming with micro-batches) as well as batch processing.
  • Mesos – Apache Mesos™ abstracts CPU, memory, storage, and other compute resources away from machines (physical or virtual), enabling fault-tolerant and elastic distributed systems to easily be built and run effectively. Mesos runs applications within its cluster and makes sure they are highly available and in the case of a machine failure will relocate applications to different nodes in the cluster.
  • Akka – Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM. Akka uses the Actor Model to raise the abstraction level and provide a better platform to build scalable, resilient and responsive applications. Everything in Akka is designed to work in a distributed environment: all interactions of actors use pure message passing and everything is asynchronous.
  • Cassandra – Apache Cassandra™ is a proven, highly performant, durable and fault-tolerant NoSQL database. Cassandra can easily manage large amounts of data and offers robust support for clusters spanning multiple datacenters and geographical locations.
  • Kafka – Apache Kafka™ is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design to allow a single cluster to serve as the central data backbone for a large organization.

The tools are very easy to integrate with each other and each serves its own purpose within a modern platform for Big Data applications. The ease of integration between them has probably helped a lot in making it a popular solution, but it's not the only reason. I think the most important reasons are:

  1. it’s a concise toolbox that can deal with a wide variety of data processing scenarios
  2. it’s composed of proven, battle tested and widely used software components. The individual components are open source and backed by a large open-source community
  3. the stack is easily scalable and replication of data happens while still preserving low latencies
  4. the stack can run on a single cluster managed platform that can handle heterogeneous loads and any kind of applications

Over the next couple of weeks, we’ll be doing a deep-dive into each individual technology, so we can elaborate why these technologies combined are extremely powerful and give you a wide variety of options when designing your (big) data architecture.

Feel free to continue reading in the second part of this series, which covers Apache Mesos, the foundation of the SMACK stack.


JumpStarting Stream Processing with Kafka and Flink

In the last few months, I have come across the topic of "Stream Processing" so much that it has become hard to ignore and definitely seems like a new wave of Big Data technologies. We are living in a world of innovation and rapid change, so I was cautious about investing my time in every new hype. After going through a few articles, attending meetups and listening to some nice presentations from conferences, I am convinced that stream processing is here to stay and is being rapidly adopted by the industry. In this blog, I present a summary of my understanding of the de-facto tools used in stream processing.

Big Data infrastructure hadn't undergone a major change since the advent of the Hadoop stack until very recently, when new technology trends like microservices, the Internet of Things and data localization in apps led to infrastructure requirements that should support processing and aggregation of real-time events, with various consumer processes capable of handling those events at high throughput. In the last few years, there were mainly two patterns of programming:

  • Request/Response
  • Batch

Request/response is the traditional method wherein the server gets a single request from the client and the client gets a response, which might or might not be trivial depending on the backend of the application. Then came the need for processing log files using map/reduce and coming up with intelligent analytics, which gave rise to Hadoop-stack-based batch processing, wherein the Hadoop jobs ran for hours or sometimes overnight to provide insights into log data and user activity.

In the last couple of years, there arose a need to develop frameworks that can perform real-time analytics on events coming from various devices, microservices or click streams from a user group (based on geography etc.). This led to the development of in-house tools and frameworks by companies such as LinkedIn, Twitter, and Google for stream analytics. But these tools were less mature compared to their batch/Hadoop counterparts. Also, there wasn't much industry consensus around tools and technologies for stream processing. Thus, the situation was something like this:

Fig 1 – From the talk "Apache Kafka and the Next 700 Stream Processing Systems" by Jay Kreps [1]

Hence, for the developer community, it was hard to get their hands on mature tools which could be used for stream processing. Then, a few years ago, LinkedIn open-sourced Apache Kafka, a high-throughput messaging system, which led to rapid innovation in this field. Kafka is now becoming the backbone of streaming pipelines and is being adopted rapidly [2]. Let's look at a few of the key concepts of Apache Kafka.

Apache Kafka

Kafka is a messaging system built for scalability and high throughput. Sounds like JMS? Kafka comes with a few big advantages over traditional JMS. I would choose Kafka over a traditional JMS queueing solution if I have to persist my messages for longer periods of time (up to days). Another reason is if I have to support multiple consumers that read the messages at their own pace, as different datastores have different write speeds depending on their core functionality. Consumers can also "replay" messages if they want, and if they go down they can pick up from the point where they left off once they are up again. These features make Kafka highly usable in an environment where heterogeneous frameworks and libraries are involved, as each can consume messages, based on a polling model, at a throughput it can handle, which isn't possible with traditional messaging systems. Let's have a closer look at why Kafka scales so well.

Fig 2 – A topic along with its partitions

In Kafka, a topic is an important abstraction which represents a logical stream of data. This stream of data achieves parallelism when it's split into parallel streams called partitions. The messages/events in these partitions are identified by a unique id, the message offset, which increases monotonically within that partition. This image from the official docs [3] further elaborates on this:

 

Anatomy of a Topic

                                                                Fig 3 – From the official Kafka docs[3], Anatomy of a Topic

Thus, we can see that messages are written in sequential order and the consumer also consumes them sequentially.

Let’s look at the overall architecture of Kafka.

Fig 4 – Internal architecture of Kafka

Brokers are part of the Kafka cluster. A topic is divided into multiple partitions in order to balance load and also to support parallelism. Each consumer takes part in a consumer group; when we start a new consumer we specify a group label, and grouping takes place based on that label. Each message of a topic gets delivered at least once to a consumer instance of each subscribing group. You can see from the above image that partition 0 is sending its messages to consumer 2 only: consumers 1 and 2 are part of the same consumer group, and only one consumer instance of a subscribing group gets each message. If each consumer were in a different group, then each message would be broadcast to all groups. Similarly, consumer group 2 has only one consumer, which gets messages from all 3 partitions. The total number of consumers in a group should never be more than the total number of partitions of the subscribed topic. We can add a new 3rd consumer to the 1st group, as there are 3 partitions, but a 4th consumer would be idle, since there would be more consumers in the group than partitions.
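To make the group behaviour tangible, here is a minimal consumer sketch (the broker address, topic name and group name are made up). Start it twice with the same group.id and the topic's partitions are divided between the two instances; start another copy with a different group.id and it receives every message again.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._

object GroupedConsumer extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("events"))

  while (true) {
    // pull-based: each consumer polls at the throughput it can handle
    val records = consumer.poll(1000L)
    for (r <- records.asScala)
      println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}")
  }
}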

Brokers in Kafka are stateless, which basically means that the broker doesn't keep a record of how many messages a consumer has consumed; the message offset is for the consumer to keep track of, not the broker. But this also makes it hard to delete redundant messages, so Kafka solves this problem by using a time-based SLA for the retention policy. Since messages can stay in Kafka for a while, a consumer can rewind to an earlier offset, and if a consumer instance crashes, the partition it was reading from is assigned to another instance of the group. This distributed coordination is handled by Zookeeper; its responsibilities include keeping track of newly added brokers and consumers, rebalancing the partition-to-consumer-group mapping when consumers are added or removed, and keeping track of the offset consumed in each partition. Zookeeper maintains two registries: an ownership registry and an offset registry. The ownership registry maintains the partition-to-consumer mapping, such that each consumer group has its own corresponding ownership registry, and the offset registry contains the last consumed message offset of each partition. Thus, with Kafka, it becomes easy to integrate with various frameworks, each consuming at the throughput it can handle and that is required by the application. Here's how things would look once you have Kafka integrated with other tools.

Fig 5 – From the talk "Demystifying Stream Processing with Apache Kafka" by Neha Narkhede [4]

So far we have discussed Kafka, which is the backbone of your streaming infrastructure. Now let's look at Apache Flink, a stream processing framework that includes multiple processing libraries to enrich incoming events and messages.

Apache Flink

I heard about Flink very recently and have been really impressed by the traction it's gaining in the developer community. It's also one of the most active Apache Big Data projects, and Flink meetups are popping up in all major tech cities of the world. Since I have been reading about stream processing, I realized that there are a few key features which are important for any good stream processor to support. I was impressed to see that, although Flink is pretty new, it supports all these key features, namely:

  • Processing engine with support for Streaming as well as Batch
  • Supporting various windowing paradigms
  • Support for Stateful Streaming
  • Fault tolerance and high throughput
  • Complex Event Processing (CEP)
  • Backpressure handling
  • Easy Integration with existing Hadoop stack
  • Libraries for doing Machine Learning and graph processing.

The existing Hadoop stack, which is good at batch processing, already has so many moving parts that trying to configure it for stream processing is a difficult task, since various components like Oozie (job scheduler), HDFS (and Flume for data loading), ML and graph libraries, and batch processing jobs all have to work in perfect harmony. On top of that, Hadoop has poor stream support and no easy way to handle backpressure spikes. This makes the Hadoop stack even harder to use for streaming data processing. Let's take a high-level look at Flink's architecture.

Fig 6 – Flink's architecture, from the official docs [5]

For every submitted program a client is created, which does the required pre-processing and turns the program into a parallel dataflow form that is then executed by the TaskManagers and the JobManager. The JobManager is the primary coordinator for the whole execution cycle and is responsible for allotting tasks to the TaskManagers and for resource management.

An interesting thing about Flink is that it contains so much functionality within its own framework that the number of moving parts in the streaming architecture goes down. Here are the internal Flink components:

Fig 7 – From the talk "Apache Flink: What, How, Why, Who, Where?" by Slim Baltagi [6]

Flink engine’s which is a Distributed Streaming dataflow engine support both Streaming and Batch processing, along with the ability to support and use existing storage and deployment infrastructure, it supports multiple of domain specific libraries like FLinkML for machine learning, Gelly for graph analysis, Table for SQL, and FlinkCEP for complex event processing. Another interesting aspect of Flink is that existing big data jobs (Hadoop M/R, Cascading, Storm) can be executed on the Flink’s engine by means of an adapter thus this kind of flexibility is something which makes Flink center of the Streaming infrastructure processing.

As discussed above in the key feature list, two important aspects of streaming supported by Flink are windowing and stateful streaming. Windowing is basically the technique of executing aggregates over streams. Windows can be broadly classified into:

  • Tumbling windows (no overlap)
  • Sliding windows (with overlap)

The above two concepts can be explained by the following two images.

Fig 8 – Tumbling windows. From the talk "Unified Stream and Batch Processing with Apache Flink" by Ufuk Celebi [7]

Fig 8 – Sliding windows. From the talk "Unified Stream and Batch Processing with Apache Flink" by Ufuk Celebi [7]

In the references, I have provided a link to the Flink APIs that support stream aggregations, i.e. windowing.
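As a small illustration of both window types, here is a windowed word count with Flink's Scala DataStream API (the socket source and window sizes are made up, and exact method names may differ slightly between Flink versions).

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // any DataStream[String] will do; a socket source keeps the example small
    val lines: DataStream[String] = env.socketTextStream("localhost", 9999)

    val counts = lines
      .flatMap(_.toLowerCase.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      // tumbling window: non-overlapping 10-second buckets;
      // a sliding variant would be timeWindow(Time.seconds(10), Time.seconds(5))
      .timeWindow(Time.seconds(10))
      .sum(1)

    counts.print()
    env.execute("Windowed word count")
  }
}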

Stream processing that only does basic filtering or simple transformations doesn't need state, but when it comes to more advanced concepts like aggregation on streams (windowing), complex transformations or complex event processing, it becomes necessary to support stateful streaming.
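A minimal sketch of stateful streaming with Flink's Scala API, keeping a running count per key in Flink-managed keyed state (the source elements are made up):

import org.apache.flink.streaming.api.scala._

object StatefulCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words: DataStream[String] = env.fromElements("kafka", "flink", "kafka")

    // mapWithState keeps one Long of state per key and emits the updated count
    val counts = words
      .map(w => (w, 1L))
      .keyBy(0)
      .mapWithState[(String, Long), Long] { (in, state) =>
        val count = state.getOrElse(0L) + in._2
        ((in._1, count), Some(count))
      }

    counts.print()
    env.execute("Stateful running count")
  }
}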

In a recent release of Flink, a concept called Savepoints was introduced. The Flink task managers regularly create checkpoints of the state of the job being processed, and under the hood Savepoints are basically pointers to one of those checkpoints. Savepoints can be triggered manually and they never expire until discarded by the user. Let's look at an image for a clearer understanding.

Fig 9 – From the official docs, Savepoints [8]

Here the checkpoints C1 and C3 have been discarded, as checkpoint C4 is the latest and all earlier checkpoints except C2 have been dropped. The reason C2 is still there is that a Savepoint was created when C2 was the latest checkpoint, and that Savepoint now holds a pointer to C2. Initially, the job's state is stored in memory and then checkpointed into a filesystem (like HDFS); a Savepoint is basically a URL to the HDFS location of the checkpointed state. In order to store a much larger state, the Flink team is working towards providing a state backend based on RocksDB.

Here is an overview of a Streaming architecture using Kafka and Flink

Fig 10 – From the talk "Advanced Streaming Analytics with Apache Flink and Apache Kafka" by Stephan Ewen [9]

So far, we have discussed both Flink and Kafka. Before concluding, let's go through the Yahoo benchmark for stream processors [10].

Fig 11 – From the talk "Unified Stream and Batch Processing with Apache Flink" by Ufuk Celebi [7]

The architecture consisted of Kafka clusters feeding the stream processors; the results of the stream transformations were published to Redis and, via Redis, made available to applications outside the architecture. As you can see, even at high throughput Storm and Flink maintained low latency. This benchmark was further extended by Data Artisans [11], the company behind Flink: they took Yahoo's benchmark as a starting point and upgraded the Flink cluster's node interconnect from the 1 GigE used by Yahoo to 10 GigE. The results were very interesting, as Flink not only outperformed Storm but also saturated the Kafka link at around 3 million events/sec.

Conclusion

Stream processing is at an early yet very interesting phase, and I hope that after reading this blog you will give Kafka and Flink a try on your machine. Feel free to share your feedback and comments.

References

[1] – https://www.youtube.com/watch?v=9RMOc0SwRro
[2] – http://www.confluent.io/confluent-unveils-next-generation-of-apache-kafka-as-enterprise-adoption-soars
[3] – http://kafka.apache.org/documentation.html
[4] – http://www.infoq.com/presentations/stream-processing-kafka
[5] – https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_general_arch.html
[6] – http://www.slideshare.net/sbaltagi/apacheflinkwhathowwhywhowherebyslimbaltagi-57825047
[7] – https://www.youtube.com/watch?v=8Uh3ycG3Wew
[8] – https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
[9] – http://kafka-summit.org/sessions/advanced-streaming-analytics-with-apache-flink-and-apache-kafka/
[10] – https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
[11] – http://data-artisans.com/extending-the-yahoo-streaming-benchmark/