Concurrency and At Least Once Semantics with the New Kafka Consumer

Kanak Biscuitwala · June 8, 2016

This post is the first in a series that describes our experience in adopting Apache Kafka.

Here at Sift Science, we have introduced Kafka as the messaging layer between our distributed services. In particular, we treat Kafka topics as message queues, which source services produce to and target services consume from. For example, if we receive an API call from a customer indicating that one of their users is a bad actor, we enqueue information about that user so our machine learning service can update our data models.

Since we’re working with customer data – and intending to use messaging as a source of truth with respect to the data we receive – we must build at least once (ALO) semantics on top of Kafka’s consumer interfaces, to ensure that we’re not dropping any data. We also aim to process as many incoming events as quickly as we can. Given that there is a relatively large amount of work required to process each of our messages, we have a strong requirement to process messages in parallel. The following is a collection of steps we’ve taken and lessons we’ve learned in order to enable multithreaded ALO consumption with the new KafkaConsumer API.

Thread Safety

Out of the box, KafkaConsumer can only be safely used by a single thread at a time. Because our servers have sufficient resources to handle multiple messages at once, we needed to wrap the consumer API in a way that allowed a thread pool to process Kafka-consumed messages in parallel. Essentially, we maintain an in-memory concurrent buffer of messages polled from Kafka; each thread either consumes from that buffer directly or, when the buffer is empty, asks KafkaConsumer to refill it and then consumes a message from the refilled buffer.

Generally speaking, we’ve opted for coarse-grained locking when talking to KafkaConsumer, since we expect that the most common case is direct consumption of messages from the concurrent buffer, and any overhead from communicating with Kafka brokers will outweigh concurrency overhead.

Our buffer design is actually multiple buffers, one per partition. This allows us to quickly dump pending messages when Kafka revokes assigned partitions. Messages are retrieved in a round-robin scheme, one partition at a time. This ensures that we don’t starve any single partition. Below is a rough diagram of the mechanism.

[Figure 1]

We poll from Kafka, insert into sub-buffers based on partition, and then deliver messages to a thread pool using a round-robin scheme over partitions.
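
A minimal sketch of this wrapper might look like the following; the class and method names (BufferedConsumer, next, dropPartitions) are illustrative rather than our actual library code, and it assumes a client where poll() takes a long timeout. The KafkaConsumer itself is only ever touched while holding a coarse lock, while buffer bookkeeping is guarded separately and held only briefly.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/** Illustrative thread-safe wrapper around KafkaConsumer; worker threads call next(). */
public class BufferedConsumer<K, V> {
  private final KafkaConsumer<K, V> consumer;
  private final Object consumerLock = new Object();  // coarse lock around all consumer calls
  // One FIFO buffer per partition, so revoked partitions can be dropped quickly.
  private final Map<TopicPartition, Deque<ConsumerRecord<K, V>>> buffers = new HashMap<>();
  // Partitions visited in round-robin order so no single partition is starved.
  private final Deque<TopicPartition> rotation = new ArrayDeque<>();

  public BufferedConsumer(KafkaConsumer<K, V> consumer) {
    this.consumer = consumer;
  }

  /** Returns the next buffered message, refilling from Kafka if the buffers are empty. */
  public ConsumerRecord<K, V> next(long pollTimeoutMs) {
    ConsumerRecord<K, V> record = takeRoundRobin();
    if (record != null) {
      return record;
    }
    synchronized (consumerLock) {
      // Another thread may have refilled the buffers while we waited for the lock.
      record = takeRoundRobin();
      if (record != null) {
        return record;
      }
      enqueue(consumer.poll(pollTimeoutMs));
    }
    return takeRoundRobin();  // may still be null if the poll returned nothing
  }

  /** Called from a rebalance listener: drop buffered messages for revoked partitions. */
  public synchronized void dropPartitions(Collection<TopicPartition> revoked) {
    for (TopicPartition tp : revoked) {
      buffers.remove(tp);
      rotation.remove(tp);
    }
  }

  private synchronized void enqueue(Iterable<ConsumerRecord<K, V>> records) {
    for (ConsumerRecord<K, V> r : records) {
      TopicPartition tp = new TopicPartition(r.topic(), r.partition());
      Deque<ConsumerRecord<K, V>> buffer = buffers.get(tp);
      if (buffer == null) {
        buffer = new ArrayDeque<>();
        buffers.put(tp, buffer);
        rotation.addLast(tp);
      }
      buffer.addLast(r);
    }
  }

  /** Take one message from the next non-empty partition, cycling partitions round-robin. */
  private synchronized ConsumerRecord<K, V> takeRoundRobin() {
    for (int i = 0; i < rotation.size(); i++) {
      TopicPartition tp = rotation.pollFirst();
      rotation.addLast(tp);  // move the partition to the back of the rotation
      Deque<ConsumerRecord<K, V>> buffer = buffers.get(tp);
      if (buffer != null && !buffer.isEmpty()) {
        return buffer.pollFirst();
      }
    }
    return null;
  }
}
```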

 

Offset Management

In order to provide ALO guarantees, we need to manage committing Kafka offsets ourselves. By default, Kafka will commit the newest consumed offset every 10 seconds. However, such a commit strategy provides no guarantee that all of those committed offsets were actually processed, so relying on Kafka's default can lead to data loss. Because our customers rely on our ability to ingest data and use it to provide actionable insights, any loss of data is unacceptable. Consequently, we've created an offset manager that can always answer the following question: for each partition, what is the newest offset such that all preceding offsets have been processed? That offset is precisely what we need to report to Kafka so that the consumer group can resume from the right place if the current consumer disconnects.
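
A minimal sketch of an offset manager along these lines is below; the method names (markStarted, markCompleted, safeToCommit) are illustrative, and it assumes that offsets within a partition are handed to worker threads in increasing order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import org.apache.kafka.common.TopicPartition;

/** Sketch of a per-partition offset tracker; names and details are illustrative. */
public class OffsetManager {
  // Offsets that have been handed to a worker but not yet completed, per partition.
  private final Map<TopicPartition, TreeSet<Long>> inFlight = new HashMap<>();
  // Highest offset ever handed out, per partition.
  private final Map<TopicPartition, Long> highestStarted = new HashMap<>();

  /** Record that a message has been handed to a worker thread. */
  public synchronized void markStarted(TopicPartition tp, long offset) {
    inFlight.computeIfAbsent(tp, unused -> new TreeSet<>()).add(offset);
    highestStarted.merge(tp, offset, Long::max);
  }

  /** Record that a worker finished processing a message. */
  public synchronized void markCompleted(TopicPartition tp, long offset) {
    TreeSet<Long> offsets = inFlight.get(tp);
    if (offsets != null) {
      offsets.remove(offset);
    }
  }

  /**
   * The newest offset for which every preceding offset has been processed, or null if
   * nothing is safe to commit yet. Note that when committing to Kafka, the value passed
   * to commitSync() is typically this offset + 1 (the next offset the group should read).
   */
  public synchronized Long safeToCommit(TopicPartition tp) {
    Long highest = highestStarted.get(tp);
    if (highest == null) {
      return null;  // nothing has been consumed from this partition yet
    }
    TreeSet<Long> offsets = inFlight.get(tp);
    if (offsets == null || offsets.isEmpty()) {
      return highest;  // everything handed out has completed
    }
    long oldestInFlight = offsets.first();
    // Everything strictly older than the oldest in-flight offset is done.
    return oldestInFlight == 0 ? null : oldestInFlight - 1;
  }
}
```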

Here’s an example: Let’s say we’ve been assigned two partitions, P0 and P1. After polling Kafka, our buffer contains the following partition-offset pairs: (P0, 0), (P1, 0), and (P1, 1). Our convention (and Kafka’s) is that the higher offset is the newer message. Now, our thread pool starts working on each of the three messages, so all are in-flight and none is complete:

[Figure 2: (P0, 0), (P1, 0), and (P1, 1) all in flight]

Now let’s say that (P0, 0) completes. That also allows us to commit (P0, 0) since there are no older in-flight offsets for P0.

[Figure 3: (P0, 0) complete; offset 0 is committable for P0]

Next, (P1, 1) completes. We can’t yet commit any offsets to Kafka because (P1, 0) is still in flight. OffsetManager is responsible for saying that nothing new is safe to commit yet.

[Figure 4: (P1, 1) complete, but (P1, 0) still in flight; nothing new is committable]

Finally, (P1, 0) completes. It’s now safe to tell Kafka to commit (P1, 1) as the newest offset such that all preceding offsets are complete. OffsetManager, again, will indicate that 1 is the correct offset.

[Figure 5: all offsets complete; offset 1 is committable for P1]
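
Replaying this example against the sketch above (the topic name "events" is made up for illustration):

```java
import org.apache.kafka.common.TopicPartition;

public class OffsetManagerExample {
  public static void main(String[] args) {
    TopicPartition p0 = new TopicPartition("events", 0);
    TopicPartition p1 = new TopicPartition("events", 1);
    OffsetManager offsets = new OffsetManager();

    // All three messages go in flight.
    offsets.markStarted(p0, 0);
    offsets.markStarted(p1, 0);
    offsets.markStarted(p1, 1);

    offsets.markCompleted(p0, 0);
    System.out.println(offsets.safeToCommit(p0));  // 0    -- no older in-flight offsets on P0

    offsets.markCompleted(p1, 1);
    System.out.println(offsets.safeToCommit(p1));  // null -- (P1, 0) is still in flight

    offsets.markCompleted(p1, 0);
    System.out.println(offsets.safeToCommit(p1));  // 1    -- every preceding P1 offset is complete
  }
}
```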

Heartbeating

As of Kafka 0.9, calling poll() is the only way to tell Kafka that our service is still consuming (Kafka 0.10 also supports heartbeating during commits). This is problematic for two reasons: 1) we can’t control how long it takes to consume a single message; and 2) we can’t tell Kafka how many messages to return when we poll. The first issue matters to us because consuming a message may involve database calls or even requests to third-party services, for which a time bound may not even be desirable. The second matters because each consumer in a consumer group may get back a large number of messages if consumption from any partition has fallen behind (Kafka 0.10 will support putting an upper bound on the number of messages returned; Kafka 0.9 does support restricting the bytes returned, but this requires great care and tuning). Combined, the two mean that there may be an even longer delay before we’re ready to poll for more messages.

Given that polling is the only way to heartbeat, our consumer has a thread that periodically pauses all partitions, polls, and then resumes all partitions. Generally, this poll will return no records, unless the consumer is assigned a new partition. In that case, the returned records are added to the consumption buffer.
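
A rough sketch of that heartbeat task follows. The class and field names are illustrative, and it assumes a client version (roughly 0.10.1 through 2.x) in which pause() and resume() accept a collection and poll() still takes a long timeout; the 0.9 client we describe used varargs for pause/resume instead.

```java
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/** Sketch of a pause/poll/resume heartbeat; names and wiring are illustrative. */
public class HeartbeatTask implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final Object consumerLock;  // the same coarse lock used for regular polls
  private final java.util.function.Consumer<ConsumerRecord<String, String>> bufferSink;

  public HeartbeatTask(KafkaConsumer<String, String> consumer,
                       Object consumerLock,
                       java.util.function.Consumer<ConsumerRecord<String, String>> bufferSink) {
    this.consumer = consumer;
    this.consumerLock = consumerLock;
    this.bufferSink = bufferSink;
  }

  @Override
  public void run() {
    synchronized (consumerLock) {
      Set<TopicPartition> assigned = consumer.assignment();
      consumer.pause(assigned);   // don't fetch more data for partitions we already buffer
      ConsumerRecords<String, String> records = consumer.poll(0);  // poll purely to heartbeat
      consumer.resume(assigned);
      // Usually empty, but a rebalance during the poll can hand back records for newly
      // assigned (unpaused) partitions; those go into the consumption buffer.
      for (ConsumerRecord<String, String> record : records) {
        bufferSink.accept(record);
      }
    }
  }

  /** Schedule the heartbeat at a period well inside the session timeout. */
  public static ScheduledExecutorService schedule(HeartbeatTask task, long periodMs) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleAtFixedRate(task, periodMs, periodMs, TimeUnit.MILLISECONDS);
    return executor;
  }
}
```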

An alternative approach to this heartbeating technique is to increase the Kafka heartbeating timeout configuration. However, we opted not to go in this direction because we still wanted fast failure detection, as well as a solution that works equally well for topics whose messages may be handled very differently.

Consumed Bytes Tuning

Though Kafka 0.9 does not support restricting the number of messages returned by a poll, it is possible to restrict the number of bytes consumed per partition via max.partition.fetch.bytes. This is quite useful for ensuring that no consumer is holding a significant number of incomplete messages, and for keeping the memory footprint of those messages in check. For topics whose message sizes don’t vary greatly, this works quite well. However, an important point to keep in mind is that this limit should be greater than the largest message that can be produced to the topic; otherwise the consumer will not be able to make progress past an oversized message.

By default, the maximum fetch size per partition is 1MB which, for our message sizes, meant buffering more messages locally than we wanted on average. We selected our fetch sizes by instrumenting our producers with metrics about message sizes.
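
For example, a consumer configuration along these lines (the broker address, group name, and 256 KB cap are illustrative values, not ours) sets the per-partition fetch limit and disables auto-commit so that offsets are committed through the offset manager instead:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerFactory {
  public static KafkaConsumer<String, String> newConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer-group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // offsets committed by our offset manager
    // Per-partition cap on fetched bytes; should exceed the largest possible message.
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(256 * 1024));
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer<>(props);
  }
}
```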

Topic Partition Count Tuning

As a rule of thumb, the number of partitions we select for a topic is always some multiple of the number of consumers. We tend to start with a small number of partitions, since it’s possible to increase the number of partitions through Kafka’s operations tools, but it’s not possible to decrease the number of partitions. Additionally, the configuration for the maximum fetch bytes is per-partition, so if a host is assigned a large number of partitions, it may be overwhelmed with messages. It is critical, however, to have at least as many partitions as consumers in the consumer group. Neglecting to do so will lead to an infinite rebalance loop.

Open Source

We recognize that some of the problems we’ve solved with our libraries may not be unique to our usage patterns. We’re currently exploring the possibility of releasing these libraries to the public – stay tuned!