Kafka Streams is a Java library for analyzing and processing data stored in Apache Kafka.As with any other stream processing platform, it is capable of performing real-time stateful and/or non-stateful data processing. In this post, I will try to describe why achieving high availability (99.99%) is problematic in Kafka Streams and what we can do to achieve it.

What we need to know


Before describing the problem and possible solutions, let's review the basic concepts of Kafka Streams. If you have worked with Kafka APIs for consumer/producers, you are already familiar with most of these paradigms. In the following sections, I will try to describe in a few words data storage in partitions, rebalancing consumer groups, and how basic Kafka client concepts fit into the Kafka Streams library.

Kafka: Data Partitioning


In the Kafka world, producer applications send data in the form of key-value pairs to a particular topic. The topic itself is divided into one or more partitions in Kafka brokers. Kafka uses the message key to specify in which partition the data should be written. Consequently, messages with the same key always end up in the same partition.

Consumer applications are organized into consumer groups, and each group may have one or more consumer instances.


The concumer instances are essentially a means of scaling the processing in your concumer group.

Kafka: Rebalancing the Consumer Group


As we said earlier, each instance of a consumer group gets a set of unique partitions from which it consumes data. Whenever a new consumer joins the group, a rebalancing must take place in order for it to receive a partition. The same happens when a consumer dies, the other consumers must pick up his partitions to ensure that all partitions are processed.

Kafka Streams


At the beginning of this post, we have learned that the Kafka Streams library is built on the basis of the APIs of Producers and Consumers and the data processing is organized in exactly the same way as the standard solution on Kafka. In the Kafka Streams configuration the field application.id is equivalent to group.id in the consumer API. Kafka Streams pre-creates a certain number of threads and each of them handles data from one or more batches of input tops. Speaking in consumer API terminology, streams are essentially the same as consumer instances from the same group. Streams are the primary way to scale data processing in Kafka Streams, this can be done vertically by increasing the number of threads for each Kafka Streams application on a single machine, or horizontally by adding an additional machine with the same application.id.

Kafka Streams: State Storage


In stream processing, there are operations with and without storing state. State is what allows the application to remember necessary information beyond the record currently being processed.

State operations, such as count, any type of aggregation, joins, etc., are much more complicated. This is because having only one record you can't determine the last state (say, count) for a given key, so you need to store the state of your thread in your application. As we discussed earlier, each thread processes a set of unique partitions, hence a thread only processes a subset of the entire dataset. This means that each Kafka Streams application thread with the same application.id maintains its own isolated state. We will not go into details about how the state is generated in Kafka Streams, but it is important to understand that the states are restored using a change-log topik (change-log topik) and are saved not only on the local disk, but also in the Kafka Broker. Saving the change-log as a separate topic in Kafka Broker is not only for fault tolerance, but also so that you can easily deploy new instances of Kafka Streams with the same application.id. Since the state is stored as a change-log topic on the broker side, a new instance can load its own state from that topic.

Why is ensuring high availability problematic with Kafka Streams?


We've covered the basic concepts and principles of data processing with Kafka Streams. Now let's try to put all the pieces together and analyze why achieving high availability can be problematic. From the previous sections, we should remember :

  1. The data in the Kafka topper is divided into partitions, which are distributed between Kafka Streams.
  2. Kafka Streams applications with the same application.id are essentially one consumer group, and each of its threads represents a separate isolated instance of the consumer.
  3. For state operations, a thread maintains its own state, which is "reserved" by the Kafka topic as a change log.
  4. When a new consumer instance joins or leaves the group, Kafka triggers rebalancing and event processing stops until rebalancing is complete.

 

TransferWise SPaaS (Stream Processing as a Service)


Before I cover the gist of this post, let me first tell you what we've created at TransferWise and why high availability is very important to us.

In TransferWise, we have multiple nodes running for streaming processing, and each node contains multiple instances of Kafka Streams for each product team. Kafka Streams instances that are targeted to a specific development team have a special application.id and typically have more than 5 threads. In general, teams typically have 10-20 threads (equivalent to the number of consumer instances) across the cluster. Applications that are deployed on nodes listen for input tops and perform several types of stateful and/or non-stateful operations on the input data and provide real-time data updates to downstream microservices.

Product teams need real-time updates to aggregate data. This is necessary in order to give our customers the ability to transfer money instantly. Our usual SLA is:


To give you an idea, during stress testing, the Kafka Streams application was able to process and aggregate 20,085 input messages per second. So a 10 second SLA under normal load sounded quite achievable. Unfortunately, our SLA was not achieved while performing rolling updates on the nodes where the applications are deployed, and I will describe below why this was the case.

Sliding node update


At TransferWise, we strongly believe in continuous delivery of our software and usually release new versions of our services a couple of times a day. Let's look at an example of a simple continuous service update and see what happens in the release process. Again, we have to remember that :

  1. The data in the Kafka topper is divided into partitions, which are distributed between Kafka Streams.
  2. Kafka Streams applications with the same application.id are essentially one consumer group, and each of its threads represents a separate isolated instance of the consumer.
  3. For state operations, a thread maintains its own state, which is "reserved" by the Kafka topic as a change log.
  4. When a new consumer instance joins or leaves the group, Kafka triggers rebalancing and event processing stops until rebalancing is complete.


The release process on a single node typically takes eight to nine seconds. During the release, instances of Kafka Streams on a node are "soft reset". Thus, for a single node, the time required to properly reboot the service is approximately eight to nine seconds. Obviously, shutting down a Kafka Streams instance on a node causes a rebalancing of the consumer group. Since the data is partitioned, all partitions that belonged to the parsed instance must be distributed among the active Kafka Streams applications with the same application.id. This also applies to aggregated data that has been stored on disk. Until this process is completed, the data will not be processed.

Standby replicas


To reduce rebalancing time for Kafka Streams applications, there is the concept of standby replicas, which are defined in config as num.standby.replicas. Standby replicas are copies of the local state repository. This mechanism makes it possible to replicate the state store from one Kafka Streams instance to another. When a Kafka Streams thread dies for whatever reason, the duration of the state recovery process can be minimized. Unfortunately, for reasons I'll explain below, even backup replicas won't help with sliding service updates.

Suppose we have two instances of Kafka Streams on two different machines: node-a and node-b. For each of the Kafka Streams instances on those 2 nodes num.standby.replicas = 1. With this configuration, each Kafka Streams instance maintains its own copy of the storage on the other node. During a rolling update we have the following situation :

  1. A new version of the service has been deployed on node-a.
  2. Kafka Streams instance on node-a is disconnected.
  3. Rebalancing has begun.
  4. The storage from node-a has already been replicated to node-b, since we specified the configuration num.standby.replicas = 1.
  5. node-b already has a shadow copy of node-a, so the rebalancing process is almost instantaneous.
  6. The node-a starts up again.
  7. node-a joins the consumer group.
  8. The Kafka broker sees a new instance of Kafka Streams and starts rebalancing.


As we can see, num.standby.replicas only helps in scenarios where the node is completely down. This means that if node-a crashed, node-b could continue working correctly almost immediately. But in a rolling update situation, node-a would rejoin the group after the shutdown, and this last step would trigger rebalancing. When the node-a joins the consumer group after a reboot, it will be treated as a new instance of the consumer. Again, we must remember that real-time processing stops until the new instance recovers state from the change-log topic.
Note that rebalancing partitions while a new instance joins the group, does not apply to the Kafka Streams API, as this is how the Apache Kafka consumer group protocol works.

Goal Achievement : High Availability with Kafka Streams


Although Kafka client libraries do not provide built-in functionality for the problem mentioned above, there are some techniques that can be used to achieve high cluster availability during a rolling update. The idea behind redundant replicas remains valid, and having redundant machines available when the time is right is a good solution that we use to ensure high availability in the event of instance failures.

The problem with our original setup was that we had one consumer group for all commands on all nodes. Now instead of one consumer group we have two, and the second one acts as a "hot" cluster. In the prod, the nodes have a special variable CLUSTER_ID, which is added to the application.id of the Kafka Streams instances. Here is an example of the Spring Boot application.ymlconfiguration: application.yml spring.profiles: production
streaming-pipelines:
team-a-stream-app-id: "${CLUSTER_ID}-team-a-stream-app"
team-b-stream-app-id: "${CLUSTER_ID}-team-b-stream-app"

At one point in time, only one of the clusters is active, so the standby cluster does not send real-time messages to the downstream microservices. At release time, the standby cluster becomes active, allowing a rolling update to be performed on the first cluster. Because it's a completely different group of consumers, our clients don't even notice any disruption in processing, and subsequent services continue to receive messages from the newly active cluster. One obvious disadvantage of using a redundant group of consumers is the additional overhead and resource consumption, but nevertheless, this architecture provides additional assurance, control, and fault tolerance to our streaming processing system.

In addition to adding an extra cluster, there are other techniques to mitigate the problem of frequent rebalancing.

Increase group.initial.rebalance.delay.ms


Since Kafka 0.11.0.0 the group.initial.rebalance.delay.ms configuration has been added. According to the documentation, this configuration is responsible for :


For example, if we set 60000 milliseconds in this setting, we may have a one-minute window for a release during a rolling update. If the Kafka Streams instance successfully "restarts" in this time window, rebalancing will not be called. Note that the data for which the restarted Kafka Streams instance was responsible will still be unavailable until the node returns to operational mode. Let's say that if it takes about eight seconds to reboot an instance, you will have eight seconds of downtime for the data for which that instance is responsible.

Note, the main disadvantage of this concept is that if a node fails, you get an extra one minute delay in recovering from the current configuration.

Reducing segment size in change-log tops


The large delay in rebalancing Kafka Stream is related to restoring state stores from change-log tops. Change-log tops are compressed tops, which allows you to store the last record on a particular key in the top. I will briefly describe this concept below.

The tops in Kafka Broker are organized as segments. When a segment reaches the configured threshold size, a new segment is created and the previous segment is compacted. By default, this threshold is set to 1 GB. As you may know, the basic data structure underlying Kafka tops and their partitions is a preemptive log structure, which means that when messages are sent to a topic, they are always added to the last "active" segment and no compaction occurs.
So most of the storage states saved in changelog are always in the "active segment" file and never compacted, resulting in millions of uncompacted changelog messages. For Kafka Streams, this means that during rebalancing, when a Kafka Streams instance restores its state from the changelog topic, it needs to read many redundant entries from the changelog topic. Given that state stores only care about the last state, not the history, this processing time is wasted. Reducing the segment size will cause more aggressive data compression, so new Kafka Streams application instances can recover state much faster.

Conclusion


Even though Kafka Streams does not provide a built-in capability to provide high availability during rolling service updates, it can still be done at the infrastructure level. We must remember that Kafka Streams is not a "cluster framework" unlike Apache Flink or Apache Spark. It is a lightweight Java-based library that allows developers to build easily scalable streaming applications. Despite this, it provides the necessary building blocks to achieve ambitious streaming processing goals such as "99.99%" availability.