Apache Kafka Internals
Introduction
This article gives a glimpse of what exactly happens when a message is produced to Kafka, followed by how it is stored in Kafka and finally how it is consumed by a consumer.
Before that, let’s go through some basic constructs and terminologies used in Kafka.
Apache Kafka is a distributed pub/sub messaging system where producers publish messages to Kafka and consumers subscribe to certain classes of messages and consume them. It is often regarded as a distributed commit log, since messages published to Kafka are stored reliably and in order until the retention period expires.
Message: A message is a record of information. Each message has an optional key, which is used for routing the message to the appropriate partition of a topic, and a mandatory value, which is the actual information. Both the key and the value of the message are arrays of bytes.
Producer: Producer is an application that is responsible for publishing messages to Kafka.
Broker: Broker is an application that is responsible for persisting messages. Consumers interact with the broker to consume messages published to topics.
Cluster: Brokers operate as part of the cluster to share the load and provide fault tolerance.
Topic: Topic is used for classification of messages into logical groups.
Partition: Each topic is divided into multiple partitions. Each partition is replicated across a configurable number of brokers to handle redundancy and scalability. Each partition has one leader which is responsible for reads and writes and zero or more followers which replicate the leader.
Offset: Each message is uniquely identified by its topic, the partition it belongs to, and its offset number. The offset is a continually increasing integer that identifies a message uniquely within a given topic and partition. Messages are ordered within a partition by offset number.
Consumer: Consumer is an application that is responsible for consuming messages published on specific topics. Consumers operate as part of the consumer group sharing the load and providing fault tolerance.
Zookeeper: Zookeeper is a distributed coordination service that is used by Kafka to store broker metadata, topic metadata, etc…

Publishing a message from producer
A producer is an application that is responsible for publishing a message to a specific topic. Each message is published to a specific topic and partition.
Each ProducerRecord (the message object which will be published) carries a topic, an optional partition, an optional key, and a mandatory value.
There are multiple steps involved in publishing a message to a Kafka broker.
Identifying the partition
Producers are responsible for identifying the partition to which messages should be published. There are different strategies to help producers decide the partition.
- If the partition number is passed as an input, it is used as the partition.
- If the message has a key, the producer uses the default partitioner (which takes the hash of the key modulo the number of partitions) to decide the partition. We can also implement our own custom partitioner.
- If the message key is not specified, the producer spreads the messages across partitions in round-robin fashion. Since Kafka 2.4 the default is a sticky variant that fills a batch for one partition before moving on to the next.
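The three rules above can be sketched in a few lines. This is a minimal illustration, not the client's implementation: choose_partition is a hypothetical helper, and CRC32 stands in for the murmur2 hash that the Java client uses.

```python
import itertools
import zlib

# Shared counter used to rotate keyless records across partitions.
_round_robin = itertools.count()


def choose_partition(num_partitions, key=None, partition=None):
    """Pick a partition following the three rules listed above."""
    if partition is not None:
        # Rule 1: an explicitly requested partition always wins.
        return partition
    if key is not None:
        # Rule 2: hash of the key modulo the number of partitions.
        # Assumption: CRC32 is a stand-in; the Java client uses murmur2.
        return zlib.crc32(key) % num_partitions
    # Rule 3: no key, so rotate through the partitions.
    return next(_round_robin) % num_partitions
```

Because the result depends on the number of partitions, adding partitions to a topic changes where existing keys land.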
Discovering the leader of the partition
Once the partition is identified, the producer is responsible for identifying the leader of the partition to which the message should be published.
When a producer application is bootstrapped, it is given a list of broker URLs which are called bootstrap servers (bootstrap.servers configuration). These servers are used to establish an initial connection with the cluster, and through them the producer eventually discovers all the brokers in the cluster.
Metadata caching
Each broker is capable of providing metadata information about the cluster such as brokers, topics, which brokers have partitions, their leaders, etc…
Producers periodically send a MetadataRequest to a broker (initially one of the bootstrap servers) to get the cluster metadata and cache it locally. The refresh interval is governed by metadata.max.age.ms.
This information is used to find the leader of the partition to which the message should be published. If the message is sent to a broker that is not the leader, the broker responds with a "not leader for partition" error, after which the producer refreshes its metadata cache.
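A rough sketch of such a cache is shown below. MetadataCache and the fetch_metadata callable are hypothetical names used only for illustration; the real client keeps considerably more state.

```python
import time


class MetadataCache:
    """Caches partition leaders and refetches them when they are stale."""

    def __init__(self, fetch_metadata, max_age_ms=300_000, clock=time.monotonic):
        # fetch_metadata is a hypothetical callable that returns
        # {(topic, partition): leader_broker_id}.
        self._fetch = fetch_metadata
        self._max_age_s = max_age_ms / 1000
        self._clock = clock
        self._leaders = {}
        self._fetched_at = None

    def leader_for(self, topic, partition):
        """Return the cached leader, refreshing first if the cache is too old."""
        stale = (
            self._fetched_at is None
            or self._clock() - self._fetched_at >= self._max_age_s
        )
        if stale:
            self.refresh()
        return self._leaders.get((topic, partition))

    def refresh(self):
        """Force a refetch, e.g. after a 'not leader' error from a broker."""
        self._leaders = dict(self._fetch())
        self._fetched_at = self._clock()
```

A producer built on this sketch would call refresh() whenever a broker reports that it is not the leader, and then retry against the new leader.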
Serialization
The key and value of the message are passed through a serializer module, which is responsible for converting them into byte arrays. In the Java client this step runs just before the partitioner, since the default partitioner hashes the serialized key. There are various popular serialization frameworks, like Avro and Google Protobuf, which provide type safety and schema evolution.
Batching
Once the partition and serialization phase is complete, the message is added to a batch. A batch is a group of messages that belong to the same topic and same partition. Batches can also be compressed to reduce size on network transfer (compression.type configuration).
Once the batch is ready, a separate sender thread is responsible for sending it to the appropriate broker. A batch is sent when it reaches batch.size bytes or when linger.ms has elapsed, whichever comes first. Raising linger.ms makes the thread wait longer for additional messages, improving throughput at the cost of higher latency.
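The interaction between batch size and linger time can be sketched as follows. RecordAccumulator here is a simplified, hypothetical model (the class name is borrowed from the Java client, but the logic is not): it ignores compression and memory limits, and time is passed in explicitly to keep the example deterministic.

```python
class RecordAccumulator:
    """Groups serialized records per (topic, partition) until a batch is ready."""

    def __init__(self, batch_size=16_384, linger_ms=0):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        # (topic, partition) -> [created_at_ms, [payloads]]
        self._batches = {}

    def append(self, topic, partition, payload, now_ms):
        """Add a payload to the open batch of its partition."""
        batch = self._batches.setdefault((topic, partition), [now_ms, []])
        batch[1].append(payload)

    def drain_ready(self, now_ms):
        """Return and remove batches that are full or have lingered long enough."""
        ready = {}
        for tp, (created_at, payloads) in list(self._batches.items()):
            full = sum(len(p) for p in payloads) >= self.batch_size
            expired = now_ms - created_at >= self.linger_ms
            if full or expired:
                ready[tp] = payloads
                del self._batches[tp]
        return ready
```

In this model the sender thread would call drain_ready() in a loop and ship each returned batch to the leader of its partition.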
Sending the message over the wire
The send() call itself is asynchronous and returns a Future. The producer can send the message synchronously by waiting on that Future, or asynchronously by specifying a callback that gets triggered when the broker responds to the batch.
Response handling
A successful response from the broker includes a RecordMetadata, which carries the topic and partition along with the offset of the message. If the batch fails, the broker returns an error, which the producer surfaces as an exception.
Exceptions are either retriable (for example, the partition currently has no leader) or non-retriable (for example, an authentication failure).
Retriable exceptions are retried up to a fixed number of times (retries configuration) with a backoff period between attempts (retry.backoff.ms configuration), after which the exception is propagated back to the client.
The order of messages is not guaranteed when retries are configured, since a later batch might succeed before an earlier batch that had to be retried. Setting max.in.flight.requests.per.connection to 1 guarantees that there is only one request in flight per broker connection, which preserves ordering at the cost of throughput.
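The retry behaviour can be sketched as a small loop. The exception classes and send_with_retries are hypothetical stand-ins for illustration, not Kafka client APIs.

```python
import time


class RetriableError(Exception):
    """Transient failure, e.g. the partition has no leader right now."""


class NonRetriableError(Exception):
    """Permanent failure, e.g. authentication was rejected."""


def send_with_retries(send_once, retries=3, retry_backoff_ms=100, sleep=time.sleep):
    """Call send_once(), retrying transient failures with a fixed backoff."""
    attempt = 0
    while True:
        try:
            return send_once()
        except RetriableError:
            if attempt >= retries:
                # Retries exhausted: propagate the error to the caller.
                raise
            attempt += 1
            sleep(retry_backoff_ms / 1000)
        # NonRetriableError is not caught, so it propagates immediately.
```

If two such sends run concurrently, the second can complete while the first is still backing off, which is the reordering described above.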
Acknowledgment policies
There are different acknowledgment policies that a producer can adopt based on the acks configuration
- acks=0: The producer does not wait for a broker response. It places the message in the network buffer and returns.
- acks=1: The producer waits only for the response from the leader.
- acks=-1 or acks=all: The producer waits until the leader and all in-sync replicas (ISR) have the message.
How the Broker stores the message
Brokers are Kafka servers which are responsible for persisting messages produced by producers and making them available to consumers.
Brokers operate as part of the cluster. Each broker uses Zookeeper for coordination by storing metadata.
Broker bootstrapping
Each broker has a unique identifier (identified by broker.id configuration). When a broker is started, it registers itself in Zookeeper as an ephemeral node under /brokers/ids. It also sets a watch on that node so that the broker is notified on node addition and deletion.
Each broker in the cluster periodically sends a heartbeat to Zookeeper to stay alive. When Zookeeper does not receive a heartbeat from a broker for a given period of time, it deletes the ephemeral node of that broker registered under /brokers/ids and sends a notification to all brokers in the cluster.
For brokers to be part of the same cluster, they must be registered with the same Zookeeper.
Cluster Controller
One of the brokers in the cluster is elected as a cluster controller which is responsible for performing administrative tasks like reassigning partitions on broker failures etc…
When a broker is started, it tries to become the active cluster controller by creating an ephemeral node under /controller in Zookeeper. If the broker succeeds, it becomes the active controller. If it fails because the controller node already exists, the broker sets a watch on the /controller node to receive a notification when the controller dies.
The controller node also periodically sends a heartbeat to Zookeeper to stay alive. When Zookeeper does not receive a heartbeat from a controller for a given period of time, it deletes the ephemeral node and sends a notification to all brokers in the cluster to elect the new controller.
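The election can be sketched with an in-memory stand-in for Zookeeper. FakeZookeeper and try_become_controller are hypothetical and only model the single /controller node; a real client would talk to a Zookeeper ensemble.

```python
class FakeZookeeper:
    """In-memory stand-in for the ephemeral /controller node."""

    def __init__(self):
        self.controller = None   # broker id that owns /controller, if any
        self.watchers = []       # one-shot callbacks waiting for deletion

    def create_ephemeral(self, broker_id):
        """Try to create /controller; fails if the node already exists."""
        if self.controller is not None:
            return False
        self.controller = broker_id
        return True

    def watch(self, callback):
        self.watchers.append(callback)

    def expire_session(self):
        """Simulate missed heartbeats: delete the node, then fire the watches."""
        self.controller = None
        watchers, self.watchers = self.watchers, []
        for callback in watchers:
            callback()


def try_become_controller(zk, broker_id):
    """Become controller if the node is free, otherwise wait for it to vanish."""
    if not zk.create_ephemeral(broker_id):
        zk.watch(lambda: try_become_controller(zk, broker_id))
```

Whichever waiting broker reacts first to the deletion wins the next election; the others re-register their watch.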
Topics, Partitions and Log segments
The topic is a category to which messages are published.
Each topic can have many partitions. A partition is an ordered, immutable sequence of messages that are continuously appended. Partitions are the fundamental building blocks for scalability and redundancy. Each message is assigned an offset which is ordered within a partition.
A topic can be configured with a replication factor that denotes how many copies of partitions should exist. These partitions are distributed across brokers to provide fault tolerance. Partitions can also be configured to be rack aware to spread replicas of the same partition across different racks.
Each partition will have one leader which is responsible for reads and writes and zero or more followers which passively replicate the leader. Leader and followers are together called replicas.
Each partition has an ISR (in-sync replicas), which is the subset of the replica list that is currently alive and caught up with the leader. When the leader of the partition crashes, one of the in-sync replicas is promoted to leader.
The leader removes a replica from the list of in-sync replicas if any of the following conditions is met
- If the follower has not sent any fetch requests to the leader for the window of time given by replica.lag.time.max.ms
- If the follower falls behind by more than replica.lag.max.messages messages (this setting was removed in Kafka 0.9, which relies on replica.lag.time.max.ms alone)
- If the follower does not send periodic heartbeats to Zookeeper to indicate its liveness
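The time-based check can be sketched as below. shrink_isr is a hypothetical helper, and the assumption is that the leader tracks, per follower, the last time that follower was fully caught up.

```python
def shrink_isr(isr, leader_id, last_caught_up_ms, now_ms,
               replica_lag_time_max_ms=30_000):
    """Return the ISR without followers that have lagged for too long.

    last_caught_up_ms maps follower id -> timestamp (ms) at which the
    follower was last caught up with the leader (an assumed bookkeeping
    structure for this sketch).
    """
    return [
        replica
        for replica in isr
        if replica == leader_id
        or now_ms - last_caught_up_ms[replica] <= replica_lag_time_max_ms
    ]
```

The leader always stays in its own ISR; only followers can be dropped.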
Each partition is a physical directory on broker disk under log.dir or log.dirs configuration. A partition cannot be split across multiple brokers or multiple disks on the same broker.
Each partition will have multiple log segment files with one active log segment and messages are always appended to the active log segment. When the size of the log segment exceeds log.segment.bytes, the current log segment is closed and a new log segment is opened. Only closed log segments are eligible for expiration.
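Segment rolling can be sketched like this. PartitionLog is a hypothetical in-memory model that rolls only on size; in this sketch a new segment is opened when the next append would push the active one past the limit.

```python
class PartitionLog:
    """Append-only log split into size-bounded segments."""

    def __init__(self, segment_bytes):
        self.segment_bytes = segment_bytes
        self.next_offset = 0
        # The last entry is always the active segment.
        self.segments = [{"base_offset": 0, "size": 0, "records": []}]

    def append(self, payload):
        """Append a payload and return the offset assigned to it."""
        active = self.segments[-1]
        would_overflow = active["size"] + len(payload) > self.segment_bytes
        if active["size"] > 0 and would_overflow:
            # Close the current segment and open a new one, named after
            # the offset of the first message it will hold.
            active = {"base_offset": self.next_offset, "size": 0, "records": []}
            self.segments.append(active)
        active["records"].append(payload)
        active["size"] += len(payload)
        offset = self.next_offset
        self.next_offset += 1
        return offset
```

Every segment except the last is closed, so only those are candidates for the retention rules that follow.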
A Closed log segment is purged when one of the following conditions is met
- Closed segment files are older than log.retention.ms or log.retention.minutes or log.retention.hours
- The total amount of bytes of messages retained per partition exceeds the value governed by log.retention.bytes
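Both retention rules can be sketched in one function. purge_segments and the segment dictionaries are hypothetical, and the size check here counts only closed segments, which is a simplification.

```python
def purge_segments(closed_segments, now_ms, retention_ms=None, retention_bytes=None):
    """Return the closed segments that survive retention, oldest first.

    Each segment is a dict with 'last_modified_ms' and 'size' keys
    (an assumed representation for this sketch).
    """
    kept = list(closed_segments)
    if retention_ms is not None:
        # Time-based retention: drop segments older than the limit.
        kept = [s for s in kept if now_ms - s["last_modified_ms"] <= retention_ms]
    if retention_bytes is not None:
        # Size-based retention: drop the oldest segments until we fit.
        while kept and sum(s["size"] for s in kept) > retention_bytes:
            kept.pop(0)
    return kept
```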
Each log segment file has a corresponding index file that maps relative offsets to byte positions in the log segment file. segment.index.bytes controls the size of the index file, and index.interval.bytes controls how many bytes of messages are written between two index entries.
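Because the index is sparse, a lookup finds the closest entry at or below the requested offset and scans forward from there. A minimal sketch, assuming the index is held as a sorted list of (relative_offset, position) pairs and locate is a hypothetical helper:

```python
import bisect


def locate(index, base_offset, target_offset):
    """Return the file position from which to start scanning for target_offset.

    index is a sorted list of (relative_offset, position) pairs, where
    relative_offset = offset - base_offset of the segment.
    """
    relative = target_offset - base_offset
    relative_offsets = [entry[0] for entry in index]
    # Rightmost entry whose relative offset is <= the one we want.
    i = bisect.bisect_right(relative_offsets, relative) - 1
    return index[i][1] if i >= 0 else 0
```

Storing offsets relative to the segment's base offset keeps each entry small, and a larger index.interval.bytes means fewer entries but a longer scan after the lookup.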
Kafka Persistence
Each batch of messages is stored in the active log segment of the partition in exactly the same format as published by the producer. The message format is consistent across producers, brokers and consumers, removing the overhead of serialization and deserialization on the broker.
Kafka relies on the operating system page cache for reads and writes. Reads are served from the page cache whenever the data is still cached, and writes are first applied to the page cache and flushed to disk periodically.
Additionally, Kafka uses zero-copy to transfer data from the page cache directly to the network socket without going through intermediate user-space buffers. This avoids extra copies and reduces context switches between kernel space and user space.
Replication in Kafka
Every read and write to the partition goes through the leader. The leader writes the message to the write-ahead log. A message is considered committed only if all in-sync replicas write the message to their write-ahead log. The leader also maintains a high watermark which is the latest committed message in the log.
Each topic can be configured with min.insync.replicas, which is the minimum number of in-sync replicas that must be available for a write with acks=all to be accepted. If fewer replicas are in sync, the broker rejects the write. Only committed messages are visible to the consumer. Producers configure the acks policy separately.
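The relationship between the ISR and the high watermark can be sketched as follows. Both functions are hypothetical helpers; the sketch assumes each replica's log end offset is the offset of the next message it will write, so the high watermark computed here is the offset just past the last committed message.

```python
def high_watermark(log_end_offsets, isr):
    """Smallest log end offset among the in-sync replicas.

    log_end_offsets maps replica id -> offset of the next message that
    replica will write (an assumed representation for this sketch).
    """
    return min(log_end_offsets[replica] for replica in isr)


def visible_to_consumers(log, log_end_offsets, isr):
    """Messages below the high watermark, i.e. the committed prefix of the log."""
    return log[:high_watermark(log_end_offsets, isr)]
```

A slow follower holds the high watermark back until it either catches up or is removed from the ISR.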
Log compaction
The retention policy on Kafka topics (cleanup.policy configuration) can be set to either “compact” or “delete”. Delete purges old segment files based on log.retention.bytes or log.retention.ms.
If the retention policy is configured as compact, Kafka will retain only the latest message for every message key. This is especially useful if we are getting a lot of updates for a given key and we are interested in only the latest message. Ex: user update event.
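The effect of compaction can be sketched in a few lines. compact is a hypothetical helper that ignores details such as delete markers and the active segment.

```python
def compact(records):
    """Keep only the last record per key, preserving offset order.

    records is a list of (offset, key, value) tuples sorted by offset.
    """
    # Later records overwrite earlier ones, leaving the newest offset per key.
    latest_offset = {key: offset for offset, key, _ in records}
    return [r for r in records if latest_offset[r[1]] == r[0]]
```

Surviving messages keep their original offsets, so a compacted partition has gaps in its offset sequence.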
Leader Election
When a broker crashes, it stops sending heartbeats to Zookeeper. Its Zookeeper session times out and Zookeeper notifies the cluster controller of the broker failure. The cluster controller fetches all partitions for which the broker was the leader, picks the next entry in the ISR of each partition and promotes it to leader. It then notifies the respective followers of the new leader.
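The controller's reaction can be sketched as below. handle_broker_failure and the partition dictionaries are hypothetical, and the sketch simply promotes the first remaining ISR entry.

```python
def handle_broker_failure(partitions, failed_broker):
    """Return the new leader and ISR of every partition after a broker failure.

    partitions maps a partition name to {'leader': id, 'isr': [ids]}
    (an assumed representation for this sketch).
    """
    updated = {}
    for name, state in partitions.items():
        isr = [b for b in state["isr"] if b != failed_broker]
        leader = state["leader"]
        if leader == failed_broker:
            # Promote the next in-sync replica; None means the partition
            # has no eligible leader and is offline.
            leader = isr[0] if isr else None
        updated[name] = {"leader": leader, "isr": isr}
    return updated
```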
How Consumers consume the message
Consumers are applications that are responsible for consuming messages stored in the broker.
Consumer groups
Consumers operate as part of a consumer group. Each consumer is responsible for processing messages from a fair share of partitions. Within a group, each partition is consumed by exactly one consumer. Adding consumers to a consumer group is the way to scale consumption of messages in a topic, as long as the number of consumers does not exceed the number of partitions; any extra consumers stay idle.
When we add or remove consumers from the consumer group, partitions are redistributed across consumers to share the load.
It is also possible to have multiple consumer groups for the same topic when we have multiple applications that need to read data from the same topic for different use cases.
Group coordinator and membership management
Each consumer group will be designated a Kafka broker which is called group coordinator. The group coordinator is responsible for managing the state of the consumer group and also takes part in the partition assignment when there is a rebalance (Moving partition assignment from one consumer to another consumer).
When the consumer wants to join a consumer group, it sends a request to the group coordinator. If it is the first consumer of the consumer group, it becomes the group leader. Other consumers become the followers of the leader.
When there is a rebalance, the group coordinator gives the list of active consumers to the leader of the consumer group. The leader is responsible for deciding which partitions should be consumed by which consumer, using either the range-based or the round-robin-based partition assignment strategy. We can also provide our own implementation by implementing the PartitionAssignor interface (ConsumerPartitionAssignor in newer clients). Once the partition assignment is decided, it is handed over to the group coordinator, which distributes it to all the consumers.
Consumers periodically send a heartbeat to the group coordinator to indicate their liveness (governed by the heartbeat.interval.ms configuration). When the group coordinator does not get a heartbeat from a consumer within session.timeout.ms, it considers that consumer dead and initiates a rebalance.
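The two built-in strategies can be sketched for a single topic as follows. range_assign and round_robin_assign are hypothetical helpers; the real assignors also handle multiple topics and differing subscriptions.

```python
def range_assign(num_partitions, consumers):
    """Give each consumer a contiguous range; early consumers get the extras."""
    members = sorted(consumers)
    per_member, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


def round_robin_assign(num_partitions, consumers):
    """Deal partitions out to consumers one at a time, like cards."""
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    for partition in range(num_partitions):
        assignment[members[partition % len(members)]].append(partition)
    return assignment
```

With two partitions and three consumers, either function leaves the third consumer with an empty list, which is why adding consumers beyond the partition count does not help.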
When the consumer shuts down gracefully, it informs the group coordinator and the group coordinator initiates a rebalance.
We can add hooks in our code when partitions are added or removed from the consumer by implementing ConsumerRebalanceListener interface.
Metadata discovery
When a consumer is initialized, it is given a list of bootstrap servers which is the list of Kafka brokers.
Consumers periodically send a MetadataRequest to a broker (initially one of the bootstrap servers) to get the cluster metadata and cache it locally. The refresh interval is governed by metadata.max.age.ms.
This information is used to decide the leader of the partition from where the message should be consumed.
Message Consumption
Consumers consume messages from Kafka by subscribing to a list of topics and calling the poll() method. The poll method returns a list of records (the maximum number of records is governed by max.poll.records). The max.poll.interval.ms configuration sets the maximum allowed interval between successive polls; a consumer that exceeds it is considered failed and its partitions are reassigned.
The first time the consumer calls poll, it joins the group and triggers the rebalance described above. Each fetch waits until fetch.min.bytes of data is available or until fetch.max.wait.ms has elapsed, whichever happens first. This prevents the consumer from busy polling the broker.
The configuration max.partition.fetch.bytes controls the number of bytes the broker can return per partition. This way consumers can limit the amount of data returned by the broker based on the memory available to the consumer.
Message Processing and checkpointing
Once messages are returned from poll(), they are available for processing. Consumers have to periodically checkpoint the consumed offset so that, in case a consumer crashes, another consumer can pick up from where it left off.
There are two ways of checkpointing
- auto-commit: Offsets are committed automatically and periodically when the enable.auto.commit configuration is true. The interval of the commit is governed by auto.commit.interval.ms. Commits are handled inside the poll method, which commits the largest offset returned by the previous poll. If every message returned by a poll is fully processed before the next poll is called, this gives at-least-once semantics. If messages are handed off for processing and a later poll commits them before processing finishes, a crash can lose them, which gives at-most-once semantics.
- manual-commit: Auto commit can be turned off by setting the enable.auto.commit configuration to false. Manual commits can be made synchronously (commitSync), which commits the latest offset returned by poll and blocks until the commit succeeds or fails with an exception, or asynchronously (commitAsync), in which case a failed commit does not throw an exception and is only reported to an optional callback.
Committed offsets are written to a log-compacted internal Kafka topic called __consumer_offsets so that they survive crashes. This information is also cached in memory for faster access.
When there is no valid offset to consume from the partition, the auto.offset.reset configuration dictates how to consume messages from the partition: earliest (consume from the beginning of the partition) or latest (consume from the end of the partition).
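The decision can be sketched as a small function. starting_offset is a hypothetical helper, and it assumes a committed offset is valid only while it still lies within the range of offsets the partition holds.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Decide where a consumer starts reading a partition.

    committed is the checkpointed offset (or None if nothing was committed),
    log_start is the oldest retained offset and log_end is the offset the
    next produced message will get.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No valid offset: either nothing was committed, or the committed
    # position has already been purged by retention.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"no valid offset and auto.offset.reset={auto_offset_reset!r}")
```

With earliest a new consumer group replays everything still retained, while with latest it sees only messages produced after it starts.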