Tag Archive: Big Data



How to bring your python machine learning model in production using RESTful APIs

In recent years there has been much attention on bringing machine learning models into production, whereas up to a few years ago the results of machine learning ended up in slides or dashboards.
Bringing machine learning into production is important to integrate the outputs of machine learning with other systems.

What does “bring in production” mean?

To bring a machine learning model into production means to run the model regularly and to integrate and use its output in other systems.

There are several ways to bring your machine learning models into production, such as:

  1. Build a web service around it and use it in real time (API calls, microservice architecture)
  2. Schedule your code to run regularly (for example with Oozie or Airflow)
  3. Stream analytics (such as Spark Streaming) for a lambda/kappa architecture

The focus of this article is the first option. This method is typically used in web-based environments and microservices architectures.

Python and modeling

In this article, we build a sample machine learning model for the Online Shoppers' Purchasing Intention Data Set, available and discussed at https://bit.ly/2UnSeRX.
Below you can find the code for the data preparation and modeling:

  

import pandas as pd # to use dataframes etc.
import numpy as np # for arithmetic operations
from time import strptime # to convert month abbreviations to numeric values
from sklearn.model_selection import train_test_split # to split up the samples
from sklearn.tree import DecisionTreeRegressor # regression tree model
from sklearn.metrics import confusion_matrix # to check the confusion matrix and evaluate the accuracy

#reading the data
dataset=pd.read_csv(".../online_shoppers_intention.csv")

#preparing for split
df=dataset.drop("Revenue",axis=1)
y=dataset["Revenue"].map(lambda x: int(x))
X_train, X_test, y_train, y_test = train_test_split(df, y, test_size=0.2)

#making a copy (to be able to change the values)
X_train=X_train.copy()
X_test=X_test.copy()

#data prep phase
def dataPrep(localData):
    # The problem is that "June" is the full month name while the rest are abbreviations --> we turn all into abbreviations
    localData["Month"]= localData["Month"].map(lambda x: str.replace(x,"June","Jun"))
    # Our model doesn't ingest text, we transform it to int
    localData["Month"]= localData["Month"].map(lambda x: strptime(x,"%b").tm_mon)
    # The Weekend flag should also be turned into int
    localData["Weekend"]= localData["Weekend"].map(lambda x: int(x))
    # turning string to int
    localData["VisitorType"] = localData["VisitorType"].astype('category').cat.codes
    return localData

#Sending the data through data prep phase
X_train=dataPrep(X_train)
X_test=dataPrep(X_test)
#define the regression tree
regr_tree = DecisionTreeRegressor(max_depth=200)
#fitting the tree with the training data
regr_tree.fit(X_train, y_train)
# running the predictions
pred=regr_tree.predict(X_test)
# looking at the confusion matrix
confusion_matrix(y_test,pred)


For data scientists, the above code should look very familiar. We read the data, do a bit of data wrangling and model it with a decision tree.

Save the model

The next step, which does not regularly appear in a data scientist's workflow, is to save the model to disk. This step is necessary if you bring your Python code into production.
Below you can see how "joblib" can be of assistance here:

  

from sklearn.externals import joblib
joblib.dump(regr_tree, '.../model3.pkl')
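
As a quick sanity check (a minimal sketch, assuming the same placeholder path used above), you can reload the persisted model and verify that it reproduces the predictions of the tree we just fitted:

from sklearn.externals import joblib # in newer scikit-learn versions, use `import joblib` directly

# reload the persisted model from the same placeholder path
restored_tree = joblib.load('.../model3.pkl')

# the restored model should give the same predictions as the in-memory tree
assert (restored_tree.predict(X_test) == regr_tree.predict(X_test)).all()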


Build your Flask back-end

If you are not familiar with building back-end programs and RESTful APIs, I highly recommend reading https://bit.ly/2AVTwxW and other related materials. In short, web services and RESTful APIs are servers that expose functions; an application can call those functions remotely and get the outputs back. In our example, we call our machine learning model from anywhere over the internet via TCP/IP. Once the model is called with the data, the classification result is returned to the client that made the call.
Discussing the details of web services and web APIs is beyond the scope of this article, but you can find many interesting articles on the topic with a quick internet search.
Below we use Flask to build a web service around the machine learning model.

  

from flask import Flask, request
from flask_restful import Resource, Api
from sklearn.externals import joblib
import pandas as pd


app = Flask(__name__)
api = Api(app)



class Classify(Resource):
    def get(self): # get is the right http verb because it caches the outputs and is faster in general
        data = request.get_json() # reading the request data
        data1 = pd.DataFrame.from_dict(data,orient='index') # converting the data into a DataFrame (as our model does not ingest JSON)
        data1=data1.transpose() # the converted DataFrame is not columnar yet, so we transpose it
        model = joblib.load('../model3.pkl') # loading the model from disk
        result = list(model.predict(data1)) # conversion to list because numpy.ndarray cannot be jsonified
        return result # returning the result of classification 



api.add_resource(Classify, '/classify')

app.run(port=5001)


Test your API

You can use various techniques to test whether the back-end works. I use Postman to test if the API is working.
Bear in mind that we implemented a GET request in our Flask application. The motivation behind choosing GET is that web servers can cache the results, which helps with the speed of the web service.
Another consideration is that we send the data in JSON format (in the shape it has after the data preparation phase) in the call, and the results come back as JSON as well.

  

{
    "Administrative":0,
    "Administrative_Duration": 0.0,
    "Informational": 0,
    "Informational_Duration": 0.0,
    "ProductRelated": 1,
    "ProductRelated_Duration": 0.0,
    "BounceRates": 0.2,
    "ExitRates": 0.2,
    "PageValues": 0.0,
    "SpecialDay": 0.0,
    "Month": 5,
    "OperatingSystems": 2,
    "Browser": 10,
    "Region": 5,
    "TrafficType": 1,
    "VisitorType": 87093223,
    "Weekend":0
}
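
Instead of Postman, you can also exercise the endpoint from Python. Below is a minimal sketch using the requests library; the payload and port 5001 match the example above, while the localhost host name is an assumption for a locally running Flask app:

import requests

# the same example record shown above, as a Python dict
payload = {
    "Administrative": 0, "Administrative_Duration": 0.0, "Informational": 0,
    "Informational_Duration": 0.0, "ProductRelated": 1, "ProductRelated_Duration": 0.0,
    "BounceRates": 0.2, "ExitRates": 0.2, "PageValues": 0.0, "SpecialDay": 0.0,
    "Month": 5, "OperatingSystems": 2, "Browser": 10, "Region": 5,
    "TrafficType": 1, "VisitorType": 87093223, "Weekend": 0
}

# our Flask resource reads the JSON body of a GET request (see request.get_json() above)
response = requests.get("http://localhost:5001/classify", json=payload)
print(response.json())  # e.g. [0.0] or [1.0]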


Microservices architecture

I personally like to bring machine learning into production using RESTful APIs, and the motivation behind it is the microservices architecture. A microservices architecture lets developers build loosely coupled services and enables continuous delivery and deployment.

Scaling up

To scale up your web service there are many choices, of which I would recommend load balancing using Kubernetes.


Setting up data analytics pipeline: the best practices

Data pipeline architecture example (picture courtesy of https://bit.ly/2K44Nk5) [1]

In a data science analogy with the automotive industry, the data plays the role of the crude oil, which is not yet ready for combustion. The data modeling phase is comparable with combustion in the engine, and data preparation is the refinery process turning crude oil into fuel, i.e., making it ready for combustion. In this analogy the data analytics pipeline includes all the steps from extracting the oil up to combustion, driving and reaching the destination (analogous to reaching the business goals). As you can imagine, the data (or oil in this analogy) goes through various transformations and passes from one stage of the process to another. But what is the best practice in terms of data format and tooling? Although many tools make the best practice very use-case specific, generally JSON is the best practice for the data format of communication, the lingua franca, and Python is the best practice for orchestration, data preparation, analytics and live production.

What is the common inefficiency and why does it happen?

The common inefficiency is the overuse of tabular (CSV-like) data formats for communication, i.e., as the lingua franca. I believe data scientists still overuse structured data types for communication within the data analytics pipeline because of the standard data-frame-like formats offered by major analytics tools such as Python and R. Data scientists get used to the data-frame mentality and forget that tabular storage of data is a low-scale solution that is not optimized for communication; when it comes to bigger data sets or the flexibility to add new fields, data frames and their tabular form are inefficient.

DataOps Pipeline and Data Analytics

A very important aspect of analytics that is ignored in some circumstances is going live and getting integrated with other systems. DataOps is about setting up a set of tools from capturing data and storing it up to analytics and integration, falling into an interdisciplinary realm of DevOps, Data Engineering, Analytics and Software Engineering (hereinafter I use data analytics pipeline and DataOps pipeline interchangeably). The modeling part, and probably some parts of the data prep phase, need a data-frame-like data format, but the rest of the pipeline is more efficient and robust if it is JSON native. It allows adding/removing features more easily and is a compact form for communication between modules.

The picture is courtesy of https://zalando-jobsite.cdn.prismic.io/zalando-jobsite/2ed778169b702ca83c2505ceb65424d748351109_image_5-0d8e25c02668e476dd491d457f605d89.jpg [2]

The role of Python

Python is a great programming language used not only by the scientific community but also by application developers. It is ready to be used as a back-end, and by combining it with Django you can build full-stack web applications. Python has almost everything you need to set up a DataOps pipeline and is ready for integration and live production.

Python Example: transforming CSV to JSON and storing it in MongoDB

To show some capabilities of Python in combination with JSON, here is a simple example. In this example, a dataframe is converted to JSON (Python dictionaries) and stored in MongoDB. MongoDB is an important database in today's data storage landscape as it is JSON native, storing data in a document format, which brings high flexibility.

### Loading packages
from pymongo import MongoClient
import pandas as pd

# Connecting to the database
client = MongoClient('localhost', 27017)

# Creating database and schema
db = client.pymongo_test
posts = db.posts

# Defining a dummy dataframe
df = pd.DataFrame({'col1': [1, 2], 'col2': [0.5, 0.75]}, index=['a', 'b'])

# Transforming dataframe to a dictionary (JSON)
dic = df.to_dict()

# Writing to the database
result = posts.insert_one(dic)
print('One post: {0}'.format(result.inserted_id))

The above example shows the ability of Python to transform data from a dataframe to JSON and to connect to various tools (MongoDB in this example) in a DataOps pipeline.
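
Going the other way is just as short. The sketch below assumes the same local MongoDB instance and the document inserted above, reading it back and rebuilding a dataframe for the data-frame-friendly steps of the pipeline:

from pymongo import MongoClient
import pandas as pd

client = MongoClient('localhost', 27017)
posts = client.pymongo_test.posts

# fetch the stored document and drop MongoDB's internal _id field
doc = posts.find_one()
doc.pop('_id', None)

# back to a dataframe for modeling or further preparation
df_restored = pd.DataFrame.from_dict(doc)
print(df_restored)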

Recap

This article is an extension of my previous article on the future of data science (https://bit.ly/2sz8EdM). In that article, I sketched the future of data science and recommended that data scientists move towards full-stack. Once you have a full stack with various layers for DataOps / data analytics, JSON is the lingua franca between modules, bringing robustness and flexibility to this communication, and Python is the orchestrator of the various tools and techniques in the pipeline.


[1]: The picture is courtesy of https://cdn-images-1.medium.com/max/1600/1*8-NNHZhRVb5EPHK5iin92Q.png
[2]: The picture is courtesy of https://zalando-jobsite.cdn.prismic.io/zalando-jobsite/2ed778169b702ca83c2505ceb65424d748351109_image_5-0d8e25c02668e476dd491d457f605d89.jpg


A practical guide to the quirks of Kafka Streams

Kafka Streams is a lightweight library that reads from a Kafka topic, does some processing and writes back to a Kafka topic. It processes messages one by one but is also able to keep some state. Because it leverages the mechanics of Kafka consumer groups, scaling out is easy. Each instance of your Kafka Streams application will take responsibility for a subset of topic partitions and will process a subset of the keys that go through your stream.

Kafka Streams at first glance

One of the first things we need to keep in mind when we start developing with Kafka Streams is that there are 2 different APIs that we can use. There is the high-level DSL (using the KStreamBuilder) and the low-level Processor API (using the TopologyBuilder). The DSL is easier to use than the Processor API, at the cost of flexibility and control over details.

We will begin our journey by learning more about the DSL, knowing that sooner or later we will have to rewrite our application using the Processor API. It doesn’t take long before we stumble upon the word count example.


val source: KStream[String, String] = KStreamBuilderS.stream[String, String]("streams-file-input")
val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

Easy! Let’s just skip the documentation and finish our application!

Our first Kafka Streams application

Unfortunately, we soon return to the documentation because each aggregation on our KStream seems to return a KTable and we want to learn more about this stream/table duality. Aggregations also allow for windowing, so we continue reading about windowing. Now that we know something about the theoretical context, we return to our code. For our use case, we need 1 aggregate result for each window. However, we soon discover that each input message results in an output message on our output topic. This means that all the intermediate aggregates are spammed on the output topic. This is our first disappointment.

A simple join

The DSL has all the usual suspects like filter, map, and flatMap. But even though join is also one of those usual suspects that we have done a thousand times, it is best to read the documentation on join semantics before trying out some code, because in Kafka Streams there are a few more choices involved in joining, due to the stream/table duality. But whatever our join looks like, we should know something about Kafka's partitioning. Joins can be done most efficiently when the value that you want to join on is the key of both streams and both source topics have the same number of partitions. If this is the case, the streams are co-partitioned, which means that each task created by the application instances can be assigned to one (co-)partition where it will find all the data it needs for doing the join in one place.

State

Wherever there are aggregations, windows or joins, there is state involved. KS will create a new RocksDB StateStore for each window in order to store the messages that fall within that window. Unfortunately, we would like to have a lot of windows in our application, and since we also have about 500,000 different keys, we soon discover that this quickly grows out of hand. After having turned the documentation inside out, we learn that each one of those stores has a cache size of 100 MB by default. But even after we change this to 0, our KS application is too state-heavy.

Interactive queries

The same StateStores that allow us to do joining and aggregating also allow us to keep a materialized view of a Kafka topic. The data in the topic will be stored by the application. If the topic is not already compacted, the local key-value store will compact your data locally. The data will constantly be updated from the topic it is built from.

The store can be scaled out by running an extra instance of our application. Partitions will be divided among tasks, and tasks will be divided among instances, which results in each instance holding a subset of the data. This can lead to the situation where an instance is queried for a key that is held by another instance. The queried instance may not be able to provide the value corresponding to the key, but it does know which other instance holds the key, so we can relay the query. The downside is that we have to implement this API ourselves.

In order for this to work, we need an extra configuration element

    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${settings.Streams.host}:${settings.Streams.port}")

Testing our app

A popular library for unit-testing Kafka Streams apps seems to be MockedStreams. However, not all topologies can be successfully tested with MockedStreams. But we can skip unit testing; integration tests are more important, anyway. Should we try using some EmbeddedKafka or spin up docker containers with docker-it? In the future, testing Kafka Streams apps will hopefully be easier (https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams).

When testing our topology, we wonder why our stream is not behaving like we would expect. After a while, we start banging our heads against a wall. Then we turn the documentation inside out again and we find some settings that may be helpful.

    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
    p.put(ProducerConfig.BATCH_SIZE_CONFIG, "1")

Our second Kafka Streams application

Then the inevitable happens: we need to do something that is not covered by the DSL; we need to resort to the Processor API. This would look a bit like the code below.

    val builder: TopologyBuilder = new KStreamBuilder()
    builder
      .addSource("in", keySerde.deserializer(), valueSerde.deserializer(), settings.Streams.inputTopic)
      .addProcessor("myProcessor", new MyProcessorSupplier(), "in")
      .addSink("out", settings.Streams.outputTopic, keySerde.serializer(), valueSerde.serializer(), "myProcessor")

The Processor interface lets us implement a process method that is called for each message, as well as a punctuate method that can be scheduled to run periodically. Inside these methods, we can use ProcessorContext.forward to forward messages down the topology graph.

In Kafka 0.11, "periodically" means stream-time: punctuate will be triggered by the first message that comes along after the method is scheduled to run. In our case we want to use wallclock-time, so we use the ScheduledThreadPoolExecutor to do our own scheduling. But if we do this, the ProcessorContext might have moved to a different node in our topology and the forward method will have unexpected behavior. The workaround for this is to get hold of the current node object and pass it along to our scheduled code.

val currentNode = context.asInstanceOf[ProcessorContextImpl].currentNode().asInstanceOf[ProcessorNode[MyKey, MyValue]]

In Kafka 1.0 a PunctuationType was introduced to make it possible to choose between wallclock-time and stream-time.

Conclusion

By the time we had finished our application, we had re-written it a few times, seen all parts of the documentation more often than we would like and searched through the KS source code. We were unfamiliar with all the settings and unaware of all the places where messages could be cached, delayed, postponed or dropped, and at certain moments we started doubting our notion of time.

In retrospect, we should have kept things simple. Don't use too much state. Don't think a clever Processor will bend KS's quirks in your favor. Don't waste too much code on workarounds, because before you know it there will be a new release that breaks the API or makes the workarounds obsolete.

And for those of you wanting to write an article or blog-post on Kafka Streams, make sure to finish it before the new release gets out.

Resources

  • https://docs.confluent.io/current/streams/
  • http://www.bigendiandata.com/2016-12-05-Data-Types-Compared/
  • https://www.confluent.io/blog/avro-kafka-data/
  • https://github.com/confluentinc/kafka-streams-examples
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

JumpStarting Stream Processing with Kafka and Flink

In the last few months, I have come across the topic of "Stream Processing" so much that it has become hard to ignore, and it definitely seems like a new wave of Big Data technologies. We are living in a world of innovation and rapid change, so I was cautious about investing my time in every new hype. After going through a few articles, attending meetups and listening to some nice conference presentations, I am convinced that stream processing is here to stay and is being rapidly adopted by the industry. In this blog, I present a summary of my understanding of the de-facto tools used in stream processing.

Big Data infrastructure hadn't undergone a major change since the advent of the Hadoop stack until very recently, when new technology trends like microservices, the Internet of Things and data localization in apps led to infrastructure requirements that should support processing and aggregation of real-time events, with various consumer processes able to handle those events at high throughput. Until the last few years, there were mainly 2 patterns of programming:

  • Request/Response
  • Batch

Request/response is the traditional method wherein the server gets a single request from the customer and the customer gets a response, which might or might not be trivial depending on the backend of the application. Then came the need for processing log files using map/reduce and coming up with intelligent analytics, which gave rise to Hadoop-stack-based batch processing, wherein Hadoop jobs ran for hours or sometimes overnight to provide insights into log data and user activity.

In the last couple of years, there arose a need to develop frameworks which can perform real-time analytics on events coming from various devices, microservices or click streams from a user group (based on geography etc.). This led to the development of in-house tools and frameworks for stream analytics by companies such as LinkedIn, Twitter, and Google. But these tools were less mature compared to their batch/Hadoop counterparts. Also, there wasn't much industry consensus around tools and technologies for stream processing. Thus, the situation was something like this:

Fig 1 – From the talk "Apache Kafka and the Next 700 Stream Processing Systems" by Jay Kreps [1]

Hence, for the developer community, it was hard to get their hands on mature tools which could be used for stream processing. Then, a few years ago, LinkedIn open-sourced Apache Kafka, a high-throughput messaging system, which led to rapid innovation in this field. Kafka is now becoming the backbone of streaming pipelines and is being adopted rapidly [2]. Let's look at a few of the key concepts of Apache Kafka.

Apache Kafka

Kafka is a messaging system built for scalability and high throughput. Sounds like JMS? Kafka comes with a few big advantages over traditional JMS. I would choose Kafka over a traditional JMS queueing solution if I have to persist my messages for longer periods of time (up to days). Another reason is having to support multiple consumers that read the messages at their own pace, as different datastores have different write speeds depending on their core functionality. Consumers can also "replay" messages if they want, and if they go down they can start from the point where they left off once they are up again. These features make Kafka highly usable in an environment where heterogeneous frameworks and libraries are involved, as each can consume messages based on a polling model at a throughput it can handle, which isn't possible with traditional messaging systems. Let's have a closer look at why Kafka scales so well.

Fig 2 – A topic along with its partitions

In Kafka, a topic is an important abstraction which represents a logical stream of data. This stream of data achieves parallelism when it is partitioned into parallel streams, which are called partitions. The messages/events in these partitions are identified by a unique id, or message offset, which represents an increasing timestamp within that partition. This image from the official docs [3] further elaborates on this:

 

Fig 3 – From the official Kafka docs [3], anatomy of a topic

Thus, we can see that messages are written in the sequential order and the consumer also consumes the messages sequentially.

Let’s look at the overall architecture of Kafka.

Fig 4 – Internal architecture of Kafka

Brokers are part of the Kafka cluster; a topic is divided into multiple partitions in order to balance load and also to support parallelism. Each consumer takes part in a consumer group: when we start a new consumer we specify a group label, and grouping takes place based on that label. Each message of a topic gets delivered at least once to a consumer instance of each subscribing group. You can see from the above image that partition 0 sends its messages to consumer 2 only; both consumer 1 and consumer 2 are part of the same consumer group, so only one consumer instance of a subscribing group gets each message. If each consumer is in a different group then each message is broadcast to all groups. Similarly, consumer group 2 has only one consumer, which gets messages from all 3 partitions. The total number of consumers in a group should never be more than the total number of partitions of a subscribed topic. We can add a 3rd consumer to the 1st group as there are 3 partitions, but if we add a 4th consumer it would sit idle, as there would be more consumers in the group than partitions.

Brokers in Kafka are stateless, which basically means that a broker doesn't keep a record of how many messages a consumer has consumed; the message offset is for the consumer to keep track of, not the broker. This also makes it hard to delete redundant messages, so Kafka solves the problem by using a time-based SLA as the retention policy. Since messages can stay in Kafka for a while, a consumer can rewind to an earlier offset, and if a consumer instance crashes then the partition it was reading from is assigned to another instance of the group. This distributed coordination is handled by Zookeeper; its responsibilities include keeping track of newly added brokers and consumers, rebalancing the partition-to-consumer-group mapping when consumers are added or removed, and keeping track of the offset consumed in each partition. Zookeeper maintains two registries: an ownership registry and an offset registry. The ownership registry maintains the partition-to-consumer mapping, such that each consumer group has its own corresponding ownership registry, and the offset registry contains the last consumed message offset of each partition. Thus, with Kafka, it becomes easy to integrate with various frameworks, each consuming at the throughput it can handle and that the application requires. Here's how things would look once you have Kafka integrated with other tools.

Fig 5 – From the talk "Demystifying Stream Processing with Apache Kafka" by Neha Narkhede [4]
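
To make the consumer group mechanics above a bit more concrete, here is a minimal sketch using the third-party kafka-python client (not part of Kafka itself); the broker address, topic name and group id are assumptions for a local test setup:

from kafka import KafkaProducer, KafkaConsumer

# produce a few events to a topic (assumes a broker on localhost:9092 and a topic "clicks")
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(10):
    producer.send("clicks", key=str(i).encode(), value=b"page_view")
producer.flush()

# consume as part of a consumer group; each instance started with the same group_id
# gets a subset of the topic's partitions, and offsets are tracked per partition
consumer = KafkaConsumer(
    "clicks",
    bootstrap_servers="localhost:9092",
    group_id="analytics-group",
    auto_offset_reset="earliest",   # start from the beginning if no committed offset exists
    consumer_timeout_ms=5000,       # stop iterating when no new messages arrive
)
for msg in consumer:
    print(f"partition={msg.partition} offset={msg.offset} value={msg.value}")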

So far we have discussed Kafka, which is the backbone of your streaming infrastructure. Now let's look at Apache Flink, a stream processing framework that includes multiple processing libraries to enrich incoming events and messages.

Apache Flink

I heard about Flink very recently and have been really impressed by the traction it is gaining in the developer community. It is also one of the most active Apache Big Data projects, and Flink meetups are coming up in all major tech cities of the world. Since I have been reading about stream processing, I have realized that there are a few key features which are important for any good stream processor to support. I was impressed to see that, although Flink is pretty new, it supports all of these key features, namely:

  • Processing engine with support for Streaming as well as Batch
  • Supporting various windowing paradigms
  • Support for Stateful Streaming
  • Fault tolerant and high throughput
  • Complex Event Processing (CEP)
  • Backpressure handling
  • Easy Integration with existing Hadoop stack
  • Libraries for doing Machine Learning and graph processing.

The existing Hadoop stack, which is good at batch processing, already has so many moving parts that trying to configure it for stream processing is a difficult task, since various components like Oozie (job scheduler), HDFS (and Flume for data loading), ML and graph libraries, and batch processing jobs all have to work in perfect harmony. On top of that, Hadoop has poor stream support and no easy way to handle backpressure spikes. This makes the Hadoop stack even harder to use for streaming data processing. Let's take a high-level look at Flink's architecture.

Fig 6 – Flink's architecture, from the official docs [5]

For every submitted program a client is created which does the required pre-processing and turns the program into a parallel dataflow form, which is then executed by the TaskManagers and the JobManager. The JobManager is the primary coordinator for the whole execution cycle and is responsible for allotting tasks to the TaskManagers and also for resource management.

An interesting thing about Flink is that it contains so much functionality within its own framework that the number of moving parts in the streaming architecture goes down. Here are the internal Flink components:

Fig 7 – From the talk "Apache Flink: What, How, Why, Who, Where?" by Slim Baltagi [6]

Flink engine’s which is a Distributed Streaming dataflow engine support both Streaming and Batch processing, along with the ability to support and use existing storage and deployment infrastructure, it supports multiple of domain specific libraries like FLinkML for machine learning, Gelly for graph analysis, Table for SQL, and FlinkCEP for complex event processing. Another interesting aspect of Flink is that existing big data jobs (Hadoop M/R, Cascading, Storm) can be executed on the Flink’s engine by means of an adapter thus this kind of flexibility is something which makes Flink center of the Streaming infrastructure processing.

As discussed above in the key feature list, two important aspects of streaming supported by Flink are windowing and stateful streaming. Windowing is basically the technique of executing aggregates over streams. Windows can be broadly classified into:

  • Tumbling windows (no overlap)
  • Sliding windows (with overlap)

The above two concepts can be explained by the following 2 images

Fig 8 – Tumbling windows, from the talk "Unified Stream and Batch Processing with Apache Flink" by Ufuk Celebi [7]

 

Fig 8 – Sliding windows, from the talk "Unified Stream and Batch Processing with Apache Flink" by Ufuk Celebi [7]

In the references, I have provided links to the Flink APIs that support stream aggregations, i.e. windowing.
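
To illustrate the difference between the two window types independently of any particular engine, here is a small, self-contained Python sketch (this is not Flink code; the event stream and window sizes are made up for illustration):

from collections import defaultdict

# (timestamp_in_seconds, value) pairs, e.g. click counts per event
events = [(1, 1), (3, 1), (6, 1), (7, 1), (9, 1), (12, 1)]

def tumbling_windows(events, size):
    """Non-overlapping windows: each event belongs to exactly one window."""
    windows = defaultdict(int)
    for ts, value in events:
        window_start = (ts // size) * size
        windows[window_start] += value
    return dict(windows)

def sliding_windows(events, size, slide):
    """Overlapping windows: an event can fall into several windows."""
    windows = defaultdict(int)
    for ts, value in events:
        # every window whose [start, start + size) interval contains ts
        start = ((ts - size) // slide + 1) * slide
        while start <= ts:
            if start >= 0:
                windows[start] += value
            start += slide
    return dict(windows)

print(tumbling_windows(events, size=5))          # {0: 2, 5: 3, 10: 1}
print(sliding_windows(events, size=5, slide=2))  # overlapping counts per window start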

Stream processing that supports only basic filtering or simple transformations doesn't need state, but when it comes to more advanced concepts like aggregations on streams (windowing), complex transformations or complex event processing, it becomes necessary to support stateful streaming.

In a recent release of Flink, they introduced a concept called Savepoints. The Flink task managers regularly create checkpoints of the state of the job being processed, and under the hood Savepoints are basically pointers to any of those checkpoints. Savepoints can be manually triggered and they never expire until discarded by the user. Let's look at an image for a clearer understanding.

Fig 9 – From the official docs, Savepoints [8]

Here the checkpoints C1 and C3 have been discarded, as checkpoint C4 is the latest checkpoint and all the earlier checkpoints except C2 have been discarded. The reason C2 is still there is that a Savepoint was created when C2 was the latest checkpoint, and that Savepoint now holds a pointer to C2. Initially, the job's state is stored in memory and then checkpointed into a filesystem (like HDFS); a savepoint is basically a URL to the HDFS location of the checkpointed state. In order to store a much larger state, the Flink team is working towards providing a state backend based on RocksDB.

Here is an overview of a Streaming architecture using Kafka and Flink

Fig 10 – From the talk "Advanced Streaming Analytics with Apache Flink and Apache Kafka" by Stephan Ewen [9]

So far, we have discussed both Flink and Kafka. Before concluding, let's go through the Yahoo benchmark for stream processors [10].

Fig 11 – From the talk "Unified Stream and Batch Processing with Apache Flink" by Ufuk Celebi [7]

The architecture consisted of Kafka clusters feeding the stream processors, and the results of the stream transformation were published to Redis and via Redis made available to applications outside the architecture. As you can see, even at high throughput Storm and Flink maintained low latency. This benchmark was further extended by Data Artisans [11], the company behind Flink; they took Yahoo's benchmark as a starting point and upgraded the Flink cluster's node interconnect from the 1 GigE used by Yahoo to 10 GigE. The results were very interesting, as Flink not only outperformed Storm but also saturated the Kafka link at around 3 million events/sec.

Conclusion

Stream processing is at an initial yet very interesting phase, and I hope that after reading this blog you will give Kafka and Flink a try on your machine. Feel free to share your feedback/comments.

References

[1] – https://www.youtube.com/watch?v=9RMOc0SwRro
[2] – http://www.confluent.io/confluent-unveils-next-generation-of-apache-kafka-as-enterprise-adoption-soars
[3] – http://kafka.apache.org/documentation.html
[4] – http://www.infoq.com/presentations/stream-processing-kafka
[5] – https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_general_arch.html
[6] – http://www.slideshare.net/sbaltagi/apacheflinkwhathowwhywhowherebyslimbaltagi-57825047
[7] – https://www.youtube.com/watch?v=8Uh3ycG3Wew
[8] – https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
[9] – http://kafka-summit.org/sessions/advanced-streaming-analytics-with-apache-flink-and-apache-kafka/
[10] – https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
[11] – http://data-artisans.com/extending-the-yahoo-streaming-benchmark/