Categories for Stream Processing



Looking back on AWS Summit Benelux 2018

Last week I visited AWS Summit Benelux together with Sander. AWS Summit is all about cloud computing and the topics that surround it. This being my first AWS conference, I can say it was a really nice experience. Sure, there was room for improvement (no coffee or tea after the opening keynote being one), but other than that it was a very good day. Getting inside was a breeze with all the different check-in points, and once you entered you were directly on the exhibitor floor, where a lot of Amazon partners showed their products.

Opening keynote

The day started with an introduction by Kamini Aisola, Head of Amazon Benelux. With this being my first AWS Summit, it was great to see Kamini share some numbers about the conference: 2000 attendees and 28 technical sessions. She also showed the growth pattern of AWS, with growth of 49% compared to last year. That’s really impressive!

Who are builders?

Shortly after, Amazon.com CTO Werner Vogels started his opening keynote. Werner showed how AWS evolved from being ‘just’ an IaaS company to now offering more than 125 different services. More than 90% of those services were developed based on customer feedback from the last couple of years. That’s probably one of the reasons why AWS is growing so rapidly and customers are adopting the platform.

What I noticed throughout the entire keynote is that AWS is constantly thinking about what builders want to build (in the cloud) and what kind of tools those builders need to be successful. These tools come in different forms and sizes, but I noticed a certain pattern in how services evolve or grow at AWS. The overall trend across the talks is that engineers and builders should spend less time on lower-level infrastructure and can really focus on delivering business value by leveraging the services that AWS has to offer.

During the keynote Werner ran through a couple of different focus areas and showed what AWS is currently offering for each of them. In this post I won’t go through all of them, because I expect a recording of the keynote will be on YouTube soon, but I’ll highlight a few.

Let’s first start with the state of Machine Learning and analytics. Werner looked back at how machine learning evolved at Amazon.com and how services were developed to make machine learning more accessible for teams within the organisation. Out of this came a really nice mission statement:

AWS wants to put machine learning in the hands of every developer and data scientist.

To achieve this mission AWS is currently offering a layered ML stack to engineers looking into using ML on the AWS platform.

The layers go from low-level libraries to pre-built functionality based on those lower-level layers. I really liked the fact that these services are built in such a way that engineers can decide at which level of complexity they want to start using the ML services offered by AWS. Most of the time data engineers and data scientists will start from SageMaker or even lower, but most application developers might just want to use pre-built functionality like image recognition, text processing or speech recognition. See for instance this really awesome post on using facial recognition by my colleague Roberto.

Another example of this layered approach is container support on AWS. A few years back Amazon added container support to their offering with Amazon Elastic Container Service (Amazon ECS). ECS helped customers run containers on AWS without having to manage servers or run their own container orchestration software. Fast forward a few years and Amazon now also offers Amazon EKS (managed Kubernetes on AWS), after they noticed that about 63% of managed Kubernetes clusters ran on AWS. Kubernetes has become the industry standard when it comes to container orchestration, so this makes a lot of sense. In addition, Amazon now offers AWS Fargate. With Fargate they take the next step: it allows you as a developer to focus on running containers ‘without having to think about managing servers or clusters’.

During his keynote, Werner also mentioned the Well-Architected Framework. The Well-Architected Framework has been developed to help cloud architects run their applications in the cloud based on AWS best practices. When implemented correctly it allows you to fully focus on your functional requirements and deliver business value to your customers. The framework is based on the following five pillars:

  1. Operational Excellence
  2. Security
  3. Reliability
  4. Performance Efficiency
  5. Cost Optimization

I had not heard about the framework before, so during the weekend I read through some of its documentation. Some of the items are pretty straightforward, but others might give you some insight into what it means to run applications in the cloud. One aspect of the Well-Architected Framework, security, was a recurring theme throughout the entire keynote.

Werner emphasised a very important point during his presentation:

Security is EVERYONE’s job

With all the data breaches happening lately I think this is a really good point to make. Security should be everybody’s number one priority these days.

During the keynote, a couple of customers showed how AWS had helped them achieve a certain goal. Bastiaan Terhorst, CPO at WeTransfer, explained that being a cloud-scale company comes with certain problems. He described how they moved from a brittle situation towards a more scalable solution: they could no longer modify the schema of their database without breaking the application, which is horrible once you reach a certain scale and customer base, so they had to rearchitect the way they work with incoming data and use historic data for reporting. I really liked that he shared some hard-learned lessons about the database scalability issues that can occur when you reach a certain scale.

Tim Bogaert, CTO at de Persgroep, also showed how they moved from being a silo-ed organization with its own datacenters and long-running waterfall projects to going all-in on AWS with an agile approach and teams following the “You Build It, You Run It” mantra. It was an interesting story because I see a lot of larger enterprises still struggling with these transitions.

After the morning keynote, the breakout sessions started. There were 7 parallel tracks, all with different topics, so there was plenty to choose from. During the day I attended only a few, so here goes.

Improve Productivity with Continuous Integration & Delivery

This really nice talk by Clara Ligouri (software engineer for AWS Developer Tools) and Jamie van Brunschot (Cloud engineer at Coolblue) gave a good insight into all the different tools provided by AWS to support the full development and deployment lifecycle of an application.

Clara modified some code in Cloud9 (the online IDE), debugged it, ran CI jobs, tests and deployments all from within her browser, and pushed a new change to production within a matter of minutes. It shows how far the current state of cloud-native development has come. I looked at Cloud9 years ago, way before they were acquired by Amazon, and I’ve always been a bit skeptical when it comes to using an online IDE. I remember having some good discussions with the CTO at my former company about whether this would really be the next step for IDEs and software development in general. I’m just so comfortable with IntelliJ for Java development, and it always works (even when I don’t have any internet ;-)). I do wonder if anybody reading this is already using Cloud9 (or any other web IDE) and doing their development fully in the cloud. If you do, please leave a comment, I would love to learn from your experiences. The other tools like CodePipeline and CodeDeploy definitely looked interesting, so I need to find some time to play around with them.

GDPR

Next up was a talk on GDPR. The room was quite packed, which I didn’t expect, because everybody should be GDPR compliant by now, right? 🙂 Well, not really. Companies are still implementing changes to become compliant with GDPR. The talk by Christian Hesse looked at different aspects of GDPR, such as:

  • The right to data portability
  • The right to be forgotten
  • Privacy by design
  • Data breach notification

He also talked about the shared responsibility model when it comes to GDPR compliance: AWS, as the processor of personal data, and the company using AWS, as the controller, are both responsible for making sure data stays safe. GDPR is a hot topic and I guess it will remain one for the rest of the year at least. It’s something that we as engineers will always need to keep in the back of our minds while developing new applications or features.

Serverless

In the afternoon I also attended a talk on Serverless by Prakash Palanisamy (Solutions Architect, Amazon Web Services) and Joachim den Hertog (Solutions Architect, ReSnap / Albelli). This presentation gave a nice overview of Serverless and Step Functions, but also covered new improvements like the Serverless Application Repository, safe Serverless deployments and incremental deployments. Joachim gave some insights into how Albelli uses Serverless and Machine Learning on the AWS platform for ReSnap, their online photo book creator.

Unfortunately I had to leave early, so I missed the end of the Serverless talk and the last breakout session, but all in all AWS Summit Benelux was a very nice experience, with some interesting customer cases and architectures. For a ‘free’ event it was amazingly well organized; I learned some new things and had the chance to speak with people about how they use AWS. It has triggered me to spend some more time with AWS and its services. Let’s see what interesting things I can do at the next Luminis TechDay.

Build On!


A practical guide to the quirks of Kafka Streams

Kafka Streams is a lightweight library that reads from a Kafka topic, does some processing and writes back to a Kafka topic. It processes messages one by one but is also able to keep some state. Because it leverages the mechanics of Kafka consumer groups, scaling out is easy. Each instance of your Kafka Streams application will take responsibility for a subset of topic partitions and will process a subset of the keys that go through your stream.

Kafka Streams at first glance

One of the first things we need to keep in mind when we start developing with Kafka Streams is that there are two different APIs we can use: the high-level DSL (using the KStreamBuilder) and the low-level Processor API (using the TopologyBuilder). The DSL is easier to use than the Processor API, at the cost of flexibility and control over details.

We will begin our journey by learning more about the DSL, knowing that sooner or later we will have to rewrite our application using the Processor API. It doesn’t take long before we stumble upon the word count example.


// read lines from the input topic
val source: KStream[String, String] = KStreamBuilderS.stream[String, String]("streams-file-input")
// split each line into lowercased words, re-key the stream on the word, and count per word
val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

Easy! Let’s just skip the documentation and finish our application!

Our first Kafka Streams application

Unfortunately, we soon return to the documentation, because each aggregation on our KStream seems to return a KTable, and we want to learn more about this stream/table duality. Aggregations also allow for windowing, so we continue reading about windowing. Now that we know something about the theoretical context, we return to our code. For our use case we need one aggregate result per window. However, we soon discover that each input message results in an output message on our output topic, which means that all the intermediate aggregates are spammed onto the output topic. This is our first disappointment.
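For illustration, a windowed variant of the earlier count looks roughly like this; a sketch assuming the same Scala wrapper and the 0.11-era API, with an arbitrary window size and store name:

    import org.apache.kafka.streams.kstream.{TimeWindows, Windowed}

    // same idea as the count above, but bucketed into 1-minute tumbling windows
    val windowedCounts: KTableS[Windowed[String], Long] = source
      .groupByKey
      .count(TimeWindows.of(60 * 1000L), "WindowedCounts")
    // every input record updates the count of its window, and every such update is emitted
    // downstream as a changelog record - hence the flood of intermediate aggregates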

A simple join

The DSL has all the usual suspects like filter, map and flatMap. But even though join is also one of those usual suspects that we have done a thousand times, it is best to read the documentation on join semantics before trying out some code, because in Kafka Streams there are a few more choices involved in joining, due to the stream/table duality. But whatever our join looks like, we should know something about Kafka’s partitioning: joins can be done most efficiently when the value we want to join on is the key of both streams and both source topics have the same number of partitions. If this is the case, the streams are co-partitioned, which means that each task created by the application instances can be assigned one (co-)partition where it will find all the data it needs for doing the join in one place.
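As a sketch, and assuming the Scala wrapper mirrors the Java builder’s stream and table methods and accepts plain functions for joins: enriching a stream of clicks with a table of user profiles, where both (hypothetical) topics are keyed by user id and have the same number of partitions:

    // both topics are keyed by user id and co-partitioned
    val clicks = KStreamBuilderS.stream[String, String]("user-clicks")                    // hypothetical topic
    val profiles = KStreamBuilderS.table[String, String]("user-profiles", "ProfileStore") // hypothetical topic and store

    // stream-table join: each click is combined with the latest known profile for its key (null if none yet)
    val enriched = clicks.leftJoin(profiles,
      (click: String, profile: String) => s"$click / ${Option(profile).getOrElse("unknown")}")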

State

Wherever there are aggregations, windows or joins, there is state involved. KS will create a new RocksDB StateStore for each window in order to store the messages that fall within that window. Unfortunately, we would like to have a lot of windows in our application, and since we also have about 500,000 different keys, we soon discover that this quickly grows out of hand. After having turned the documentation inside out, we learn that each one of those stores has a cache size of 100 MB by default. But even after we change this to 0, our KS application is too state-heavy.
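For what it’s worth, the per-store RocksDB memory can also be tuned with a custom RocksDBConfigSetter; a rough sketch, with purely illustrative sizes:

    import org.apache.kafka.streams.StreamsConfig
    import org.apache.kafka.streams.state.RocksDBConfigSetter
    import org.rocksdb.{BlockBasedTableConfig, Options}

    // purely illustrative sizes - tune for your own workload
    class SmallRocksDbConfigSetter extends RocksDBConfigSetter {
      override def setConfig(storeName: String, options: Options, configs: java.util.Map[String, AnyRef]): Unit = {
        val tableConfig = new BlockBasedTableConfig()
        tableConfig.setBlockCacheSize(2 * 1024 * 1024L)   // shrink the per-store block cache
        options.setTableFormatConfig(tableConfig)
        options.setWriteBufferSize(1 * 1024 * 1024L)      // smaller memtables
        options.setMaxWriteBufferNumber(2)                // and fewer of them
      }
    }

    // registered in the Properties used to configure the streams application (called p later in this post)
    p.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[SmallRocksDbConfigSetter].getName)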

Interactive queries

The same StateStores that allow us to do joins and aggregations also allow us to keep a materialized view of a Kafka topic. The data in the topic is stored by the application, and if the topic is not already compacted, the local key-value store will compact the data locally. The view is constantly updated from the topic it is built from.

The store can be scaled out by running an extra instance of our application. Partitions are divided among tasks, and tasks are divided among instances, which results in each instance holding a subset of the data. This can lead to the situation where an instance is queried for a key that is held by another instance. The queried instance may not be able to provide the value for that key, but it does know which other instance holds it, so we can relay the query. The downside is that we have to implement this relaying API ourselves.
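A rough sketch of what that lookup-or-relay logic boils down to, using the metadata API that Kafka Streams provides for this purpose; the store name is a placeholder, the actual relaying is left out, and settings.Streams.host/port are the same values we register as application.server in the next snippet:

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.QueryableStoreTypes

    def lookup(streams: KafkaStreams, key: String): Option[String] = {
      // ask Kafka Streams which application instance hosts the partition this key belongs to
      val metadata = streams.metadataForKey("my-store", key, Serdes.String().serializer())
      if (metadata.host() == settings.Streams.host && metadata.port() == settings.Streams.port) {
        // the key lives on this instance: query the local state store directly
        val store = streams.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
        Option(store.get(key))
      } else {
        // the key lives on another instance: we would have to relay the query
        // to http://<metadata.host>:<metadata.port> ourselves (not shown)
        None
      }
    }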

In order for this to work, we need an extra configuration element:

    val p = new Properties()
    // advertise this instance's own endpoint, so other instances can discover which host holds which keys
    p.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${settings.Streams.host}:${settings.Streams.port}")

Testing our app

A popular library for unit-testing Kafka Streams apps seems to be MockedStreams. However, not all topologies can be successfully tested with MockedStreams. But we can skip unit testing; integration tests are more important anyway. Should we try EmbeddedKafka, or spin up Docker containers with docker-it? In the future, testing Kafka Streams apps will hopefully become easier (see KIP-247: https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams).

When testing our topology, we wonder why our stream is not behaving as we would expect. After a while, we start banging our heads against a wall. Then we turn the documentation inside out again and find some settings that may be helpful:

    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100") // commit (and flush caches) every 100 ms instead of the 30 s default
    p.put(ProducerConfig.BATCH_SIZE_CONFIG, "1")          // stop the producer from batching records while we wait for output

Our second Kafka Streams application

Then the inevitable happens: we need to do something that is not covered by the DSL; we need to resort to the Processor API. This would look a bit like the code below.

    // wire up the topology by hand: source -> processor -> sink
    val builder: TopologyBuilder = new KStreamBuilder()
    builder
      .addSource("in", keySerde.deserializer(), valueSerde.deserializer(), settings.Streams.inputTopic)
      .addProcessor("myProcessor", new MyProcessorSupplier(), "in")
      .addSink("out", settings.Streams.outputTopic, keySerde.serializer(), valueSerde.serializer(), "myProcessor")

The Processor interface lets us implement a process method that is called for each message, as well as a punctuate method that can be scheduled to run periodically. Inside these methods we can use ProcessorContext.forward to forward messages down the topology graph.
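A minimal sketch of such a processor against the 0.11-era API; MyKey, MyValue and the actual processing logic are placeholders:

    import org.apache.kafka.streams.processor.{AbstractProcessor, Processor, ProcessorContext, ProcessorSupplier}

    class MyProcessor extends AbstractProcessor[MyKey, MyValue] {

      override def init(context: ProcessorContext): Unit = {
        super.init(context)
        context.schedule(60 * 1000L)   // ask for punctuate() roughly every minute (stream-time in 0.11!)
      }

      override def process(key: MyKey, value: MyValue): Unit = {
        // per-message logic goes here; results are sent on to the next node(s) in the topology
        context().forward(key, value)
      }

      override def punctuate(timestamp: Long): Unit = {
        // periodic logic goes here, e.g. emitting buffered aggregates
      }
    }

    // the supplier referenced in the topology above simply hands out processor instances
    class MyProcessorSupplier extends ProcessorSupplier[MyKey, MyValue] {
      override def get(): Processor[MyKey, MyValue] = new MyProcessor
    }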

Periodically, in Kafka 0.11, means stream-time: punctuate will only be triggered by the first message that comes along after the scheduled interval has passed. In our case we want wall-clock time, so we use a ScheduledThreadPoolExecutor to do our own scheduling. But if we do this, the ProcessorContext might have moved on to a different node in our topology by the time our scheduled code runs, and the forward method will have unexpected behavior. The workaround is to get hold of the current node object and pass it along to our scheduled code.

// grab the node the context currently points at, so the scheduled code can forward to it explicitly
val currentNode = context.asInstanceOf[ProcessorContextImpl].currentNode().asInstanceOf[ProcessorNode[MyKey, MyValue]]

In Kafka 1.0 a PunctuationType was introduced to make it possible to choose between wall-clock time and stream time.
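With the 1.0 API the workaround above is no longer needed; scheduling a wall-clock punctuation looks roughly like this (inside init, given the ProcessorContext):

    import org.apache.kafka.streams.processor.{PunctuationType, Punctuator}

    // run every minute based on wall-clock time, whether or not any messages arrive
    context.schedule(60 * 1000L, PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      override def punctuate(timestamp: Long): Unit = {
        // periodic logic; forwarding from here targets the right node
      }
    })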

Conclusion

By the time we had finished our application, we had re-written it a few times, seen all parts of the documentation more often than we would like and searched through the KS source code. We were unfamiliar with all the settings and unaware of all the places where messages could be cached, delayed, postponed or dropped, and at certain moments we started doubting our notion of time.

In retrospect, we should have kept things simple. Don’t use too much state. Don’t think a clever Processor will bend KS’s quirks in your favor. Don’t waste too much code on workarounds, because before you know it a new version will be released that breaks the API or makes your workarounds obsolete.

And for those of you wanting to write an article or blog-post on Kafka Streams, make sure to finish it before the new release gets out.

Resources

  • https://docs.confluent.io/current/streams/
  • http://www.bigendiandata.com/2016-12-05-Data-Types-Compared/
  • https://www.confluent.io/blog/avro-kafka-data/
  • https://github.com/confluentinc/kafka-streams-examples
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics