Exactly-Once Processing in Kafka explained
This article explains how Exactly-Once Processing in Kafka works internally. It assumes that the reader is already familiar with the basics of Kafka and its ecosystem.
For a quick recap of Kafka, the reader can refer to my previous article.
Message Delivery Guarantees
Kafka supports three types of Message Delivery Guarantees.
- At-most-once: Every message is persisted in Kafka at most once. Message loss is possible if the producer doesn’t retry on failures.
- At-least-once: Every message is guaranteed to be persisted in Kafka at least once. There is no chance of message loss, but a message can be duplicated if the producer retries after the message has already been persisted.
- Exactly-once: Every message is guaranteed to be persisted in Kafka exactly once, without duplicates or data loss, even when there is a broker failure or a producer retry.
In this article, we will understand how Kafka supports Exactly-Once Processing and how the Producer, Consumer, and the Broker components work together to achieve Exactly-Once Guarantee in Kafka.
There are two possible behaviors of Exactly-Once Processing in Kafka.
- Idempotent Guarantee: This restricts Exactly-Once Processing to a single Topic-Partition and to a single producer session. Exactly-Once Processing is not guaranteed across producer restarts.
- Transactional Guarantee: This ensures Exactly-Once processing on multiple Topic-Partitions and also supports Exactly-Once Processing across multiple producer sessions even when the producer is restarted multiple times.
This section gives an overview of some of the terminology used in subsequent sections to explain Exactly-Once Processing.
Producer ID (PID)
A Unique Identifier assigned to the producer by the broker that is not exposed to users but is passed on every request to the broker.
If transactional.id is not specified, a fresh PID is generated every time the producer is initialized.
If transactional.id is specified, the broker stores a mapping of Transactional ID to PID so that it can return the same PID on producer restart.
Transactional ID
A unique client-provided identifier that is used to identify a producer across application restarts and hence to guarantee Exactly-Once processing across multiple sessions.
The broker stores a mapping of Transactional ID to PID so that it can identify the producer, given Transactional ID.
Epoch Number
The epoch number is an integer that is used alongside the PID to identify the latest active instance of a producer. It is only relevant if transactional.id is set.
The PID together with the epoch number prevents split-brain scenarios: a zombie producer that still holds an older epoch is fenced off and cannot make progress.
Sequence Number
To achieve Exactly-Once Processing, the producer maintains a Sequence Number for every message per PID and Topic Partition combination.
Sequence Number starts with 0 and monotonically increases for every message per PID and Topic Partition combination in the producer.
The broker also maintains the Sequence Number per PID and Topic Partition combination and rejects the request if it receives a message whose Sequence Number is not exactly one greater than what was stored in the broker.
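The producer side of this bookkeeping can be sketched as follows. This is a minimal illustration, not Kafka's client code: the class `SequenceTracker` and the topic name `orders` are hypothetical, and the real client assigns sequence numbers inside its batching logic.

```python
from collections import defaultdict


class SequenceTracker:
    """Hypothetical producer-side counter: one sequence per (PID, topic, partition)."""

    def __init__(self, pid):
        self.pid = pid
        # (pid, topic, partition) -> next sequence number to assign; starts at 0
        self._next = defaultdict(int)

    def next_sequence(self, topic, partition):
        key = (self.pid, topic, partition)
        seq = self._next[key]
        self._next[key] = seq + 1  # increases by one for every message
        return seq


tracker = SequenceTracker(pid=7)
first = tracker.next_sequence("orders", 0)
second = tracker.next_sequence("orders", 0)
other_partition = tracker.next_sequence("orders", 1)
```

Each Topic Partition gets its own independent counter, so the first message sent to a second partition starts again at 0.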
Control Message
A Control Message is a special type of marker message which is mainly used for internal communication between the broker and clients and does not carry application data.
The two types of Control Messages are COMMIT and ABORT.
This section gives an overview of some of the components used in subsequent sections to explain Exactly-Once Processing.
The Transaction Coordinator is a module that runs inside each broker and manages the complete lifecycle of the messages that are part of a transaction.
The Transaction Coordinator maintains an in-memory map of transactional.id to Transaction Metadata.
The metadata includes the PID, epoch number, transaction timeout, last updated time of the transaction, transaction status, and the list of Topic Partitions, among other details.
This information is also persisted to the Transaction Log so that it survives crashes.
The Transaction Log is an internal log-compacted Kafka topic that is used by the Transaction Coordinator to persist transaction information.
Below is the flow chart which represents the sequence of steps that are executed in the Producer to achieve Exactly-Once processing.
The first step in achieving Exactly-Once processing is to enable idempotence by setting enable.idempotence=true in the producer config. This will enable Idempotence Guarantee Semantics.
We can optionally set transactional.id to some unique value that is constant across application restarts. This will enable Transactional Guarantee Semantics as long as transactional.id remains the same.
We can optionally override the configuration transaction.timeout.ms, which is the maximum amount of time that the Transaction Coordinator will wait for a transaction to complete before aborting it.
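Put together, the three settings might look like the fragment below. The configuration keys are the ones named above; the broker address, the transactional.id value, and the timeout value are illustrative assumptions, and the dictionary form is used only as a neutral way to write the settings down.

```python
# Illustrative producer settings; only the keys come from the text above.
producer_config = {
    "bootstrap.servers": "localhost:9092",      # assumed broker address
    "enable.idempotence": True,                 # Idempotent Guarantee semantics
    "transactional.id": "order-processor-1",    # hypothetical; must stay constant across restarts
    "transaction.timeout.ms": 60000,            # illustrative value, in milliseconds
}

# Without "transactional.id" the producer gets only the Idempotent Guarantee.
idempotent_only_config = {
    key: value for key, value in producer_config.items()
    if not key.startswith("transaction")
}
```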
The following steps are executed under Idempotent Guarantee semantics.
1. Init PID
The producer sends an initPidRequest() to any broker, and the Transaction Coordinator on that broker generates a fresh PID with the epoch number set to 0.
2. Producer Send
For every PID and TopicPartition, the producer maintains a Sequence number that starts from 0 and is monotonically increased for every message.
The broker (leader of the partition) also maintains the Sequence Number for every PID and TopicPartition and rejects the request if the Sequence Number is not exactly one greater than the previous persisted value.
The following steps are executed under Transactional Guarantee semantics.
1. Init Transaction
The producer calls the initTransactions() method, which internally finds the Transaction Coordinator and then initializes the PID.
The producer sends a FindCoordinatorRequest for the transactional.id to any broker. The broker returns the address details of the Transaction Coordinator to the producer.
This is followed by PID initialization. The producer sends an initPidRequest() to the Transaction Coordinator, passing transactional.id and transaction.timeout.ms. The Transaction Coordinator returns the PID and epoch number in the response.
2. Begin Transaction
Once the transaction is initialized in the previous step, the producer starts the transaction using beginTransaction(), which verifies that the producer has been initialized for transactions and that no transaction is already active.
3. Producer Send
This is the actual processing loop, where the application consumes messages from Kafka, transforms the data, optionally updates the local store, and finally produces messages back to Kafka while committing the consumer offsets through the consumer coordinator. This complete consume-transform-produce loop should be atomic.
Every request in this loop includes PID, Epoch, and Transactional ID along with other fields.
The following steps are executed in this loop when we call the producer's send() and sendOffsetsToTransaction() methods.
3a. AddPartitionsToTxnRequest: Every new TopicPartition involved in the Transaction is registered with the Transaction Coordinator, so that the Coordinator knows which TopicPartitions must receive Control Messages at the end of the Transaction.
3b. ProduceRequest: For every PID and TopicPartition, the producer maintains a Sequence number that starts from 0 and is monotonically increased for every message. The broker (leader of the partition) also maintains the Sequence Number for every PID and TopicPartition and rejects the request if the Sequence Number is not exactly one greater than the previous persisted value.
3c. AddOffsetsToTxnRequest: The producer exposes a sendOffsetsToTransaction() API which internally sends an AddOffsetsToTxnRequest with the Consumer Group ID to the Transaction Coordinator. From the group ID, the Coordinator deduces the __consumer_offsets TopicPartition of the consumer group and records it in the Transaction Log.
3d. TxnOffsetCommitRequest: The same sendOffsetsToTransaction() call then sends a TxnOffsetCommitRequest to the Consumer Group Coordinator with the consumer offsets to commit in the __consumer_offsets topic. The offsets are not visible to consumers until the transaction is committed.
4. Commit Transaction/ Abort Transaction
Once the consume-transform-produce loop is complete for all messages in the transaction, the producer invokes commitTransaction() or abortTransaction(), which asks the Transaction Coordinator to complete the Transaction.
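The ordering of steps 1 to 4 can be sketched as a toy state machine. This is only a model of the call order described above and talks to no broker: the class `TransactionalProducerSketch` and its snake_case method names are hypothetical, and `EndTxnRequest` is the name assumed here for the request behind commit and abort, which the text above does not name.

```python
class TransactionalProducerSketch:
    """Toy model of the producer-side call order; records request names only."""

    def __init__(self):
        self.state = "UNINITIALIZED"
        self.partitions = set()   # TopicPartitions already registered in this transaction
        self.requests = []        # names of the requests that would be sent

    def init_transactions(self):
        # Step 1: find the coordinator, then obtain PID and epoch.
        self.requests += ["FindCoordinatorRequest", "InitPidRequest"]
        self.state = "READY"

    def begin_transaction(self):
        # Step 2: must be initialized and must not already be in a transaction.
        if self.state != "READY":
            raise RuntimeError("cannot begin a transaction in state " + self.state)
        self.partitions.clear()
        self.state = "IN_TRANSACTION"

    def _require_open(self):
        if self.state != "IN_TRANSACTION":
            raise RuntimeError("no active transaction")

    def send(self, topic, partition, value):
        # Steps 3a and 3b: register a new partition once, then produce.
        self._require_open()
        if (topic, partition) not in self.partitions:
            self.partitions.add((topic, partition))
            self.requests.append("AddPartitionsToTxnRequest")
        self.requests.append("ProduceRequest")

    def send_offsets_to_transaction(self, offsets, group_id):
        # Steps 3c and 3d.
        self._require_open()
        self.requests += ["AddOffsetsToTxnRequest", "TxnOffsetCommitRequest"]

    def commit_transaction(self):
        # Step 4 (commit).
        self._require_open()
        self.requests.append("EndTxnRequest(COMMIT)")
        self.state = "READY"

    def abort_transaction(self):
        # Step 4 (abort).
        self._require_open()
        self.requests.append("EndTxnRequest(ABORT)")
        self.state = "READY"


producer = TransactionalProducerSketch()
producer.init_transactions()
producer.begin_transaction()
producer.send("output", 0, b"a")
producer.send("output", 0, b"b")
producer.send_offsets_to_transaction({("input", 0): 42}, "my-group")
producer.commit_transaction()
```

Note that the second send() to the same TopicPartition does not register the partition again; only the first one does.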
When a broker is started, it also initializes the Transaction Coordinator that runs inside it. Each Coordinator is responsible for a set of producers identified by transactional.id. The Transaction Log partition for a given transactional.id is computed as follows:
partition number in the Transaction Log = abs(hashCode(transactional.id)) % (number of partitions in the Transaction Log)
The leader broker of the above partition will be chosen as the Transaction Coordinator of the producer with the given transactional.id.
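The mapping above can be sketched in a few lines. This is an illustration under stated assumptions: the hash is taken to be Java's String.hashCode() (reimplemented here for characters in the Basic Multilingual Plane), the absolute value is taken so the result is never negative, and 50 partitions is an assumed size for the Transaction Log topic. The function names are hypothetical.

```python
def java_string_hashcode(text):
    """Reimplementation of Java's String.hashCode() for BMP characters."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF   # keep 32 bits, as Java int arithmetic does
    # Reinterpret the 32-bit value as a signed integer.
    return h - 0x100000000 if h >= 0x80000000 else h


def coordinator_partition(transactional_id, num_partitions=50):
    """Transaction Log partition whose leader acts as the Transaction Coordinator."""
    return abs(java_string_hashcode(transactional_id)) % num_partitions


partition = coordinator_partition("order-processor-1")
```

Because the result depends only on the transactional.id and the partition count, every broker resolves the same Coordinator for a given producer.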
Once the partition number is chosen, the Transaction Coordinator is initialized by reading that partition of the Transaction Log topic, which is a log-compacted topic, and building a metadata cache in memory.
The metadata cache is a map of transactional.id to Transaction Metadata, including the epoch number, PID, timeout, transaction status, and Topic Partitions data.
In this section, we will discuss the sequence of steps executed in the broker, corresponding to the producer sequence described in the previous section.
1. Init Transaction
When the producer sends a FindCoordinatorRequest to any broker with the transactional.id, the algorithm mentioned above in the Bootstrapping step is used to determine the broker where the Transaction Coordinator resides for the given producer.
Once the address of the Transaction Coordinator is resolved, the producer sends an initPidRequest() to the Transaction Coordinator with the transaction.timeout.ms configuration.
The Transaction Coordinator looks up the transactional.id mapping in the Transaction Metadata cache.
If there is no mapping, it generates a unique PID, sets the epoch number to 0, updates the Transaction Metadata, appends the metadata to the Transaction Log, and returns the PID and epoch number to the producer.
If there is a mapping of transactional.id to PID, it fetches the mapping and checks for any pending transaction created by a previous instance of the producer with the same transactional.id.
If that transaction is prepared to commit, it initiates a COMMIT request; otherwise it initiates an ABORT request.
It then bumps the epoch number so that the previous instance of the producer is fenced off if it comes back online, updates the Transaction Metadata, appends the metadata to the Transaction Log, and returns the PID and epoch number to the producer.
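The lookup-or-create logic and the epoch bump can be sketched as below. This is a simplified model, not the broker's code: the class `CoordinatorSketch`, its state names, the starting PID of 1000, and the shape of the log entries are all assumptions made for illustration.

```python
import itertools


class CoordinatorSketch:
    """Simplified model of how a Transaction Coordinator answers an init PID request."""

    def __init__(self):
        self._pid_counter = itertools.count(1000)  # arbitrary starting PID
        self.metadata = {}   # transactional.id -> {"pid", "epoch", "state"}
        self.log = []        # stand-in for appends to the Transaction Log

    def init_pid(self, transactional_id):
        entry = self.metadata.get(transactional_id)
        if entry is None:
            # No mapping yet: fresh PID, epoch 0.
            entry = {"pid": next(self._pid_counter), "epoch": 0, "state": "EMPTY"}
            self.metadata[transactional_id] = entry
        else:
            # Finish whatever the previous producer instance left pending.
            if entry["state"] == "PREPARE_COMMIT":
                self.log.append(("marker", transactional_id, "COMMIT"))
            elif entry["state"] != "EMPTY":
                self.log.append(("marker", transactional_id, "ABORT"))
            entry["state"] = "EMPTY"
            entry["epoch"] += 1   # fences off the previous instance
        self.log.append(("metadata", transactional_id, entry["pid"], entry["epoch"]))
        return entry["pid"], entry["epoch"]

    def is_current(self, transactional_id, pid, epoch):
        """True only for the latest producer instance of this transactional.id."""
        entry = self.metadata.get(transactional_id)
        return entry is not None and entry["pid"] == pid and entry["epoch"] == epoch


coordinator = CoordinatorSketch()
pid, epoch = coordinator.init_pid("order-processor-1")
coordinator.metadata["order-processor-1"]["state"] = "ONGOING"  # simulate a crash mid-transaction
new_pid, new_epoch = coordinator.init_pid("order-processor-1")  # the restarted producer
```

After the restart, the PID is unchanged but the epoch is one higher, so a request from the old instance carrying the old epoch no longer passes the `is_current` check.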
3. Producer Send
3a. AddPartitionsToTxnRequest: When the Transaction Coordinator receives this request, it verifies whether the transactional.id to PID mapping is valid along with Epoch Number.
It then proceeds to update Transaction Metadata with the Topic Partition which was sent by the Producer and also appends it to the Transaction log.
If this is the first request received by the Coordinator for the transaction, it also starts a timer so that it can abort the transaction on timeout.
3b. ProduceRequest: Every partition leader maintains an in-memory structure, the PID Snapshot, which maps each PID to its last seen Sequence Number for the Topic Partition and is used for de-duplication. This structure is periodically snapshotted to the file system to survive crashes.
If the PID mapping doesn’t exist in the snapshot, the broker updates PID snapshot in-memory and appends the message to partition log with PID and epoch number.
Any message which is appended to the topic with PID and epoch number will not be visible to consumers until a COMMIT message is seen.
If the PID mapping exists in the snapshot and sequence number is exactly one greater than what was stored in the broker, the broker updates PID snapshot in-memory and appends the message to partition log with PID and epoch number.
If the sequence number is less than or equal to what was stored in the broker, it returns a Duplicate Sequence Number error message.
If the sequence number is more than one greater than what was stored in the broker, it returns an Invalid Sequence Number error message.
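The four cases above can be sketched for a single partition as follows. The class `PartitionLeaderSketch` is hypothetical, and the returned strings are just labels for the outcomes described in the text, not Kafka's actual error codes.

```python
class PartitionLeaderSketch:
    """Toy de-duplication check for one Topic Partition."""

    def __init__(self):
        self.last_seq = {}   # PID -> last appended sequence number (the "PID Snapshot")
        self.log = []        # the partition log

    def append(self, pid, epoch, seq, value):
        last = self.last_seq.get(pid)
        if last is None or seq == last + 1:
            # Unknown PID, or exactly one greater than the stored value: accept.
            self.last_seq[pid] = seq
            self.log.append((pid, epoch, seq, value))
            return "OK"
        if seq <= last:
            return "DUPLICATE_SEQUENCE"   # already persisted; a retry of an old message
        return "INVALID_SEQUENCE"         # a gap; at least one message is missing


leader = PartitionLeaderSketch()
results = [
    leader.append(pid=7, epoch=0, seq=0, value="a"),
    leader.append(pid=7, epoch=0, seq=1, value="b"),
    leader.append(pid=7, epoch=0, seq=1, value="b"),   # producer retry
    leader.append(pid=7, epoch=0, seq=3, value="d"),   # seq 2 was never received
]
```

Only the first two appends reach the log; the retry is recognized as a duplicate and the out-of-order message is rejected.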
3c. AddOffsetsToTxnRequest: When the Transaction Coordinator receives this request, it verifies whether the transactional.id to PID mapping is valid along with the epoch number.
The Coordinator then derives, from the group ID passed in the request, the partition of the __consumer_offsets topic that holds the offsets of the Consumer Group, and adds this TopicPartition to the Transaction Metadata and the Transaction Log.
3d. TxnOffsetCommitRequest: This request is sent to the Consumer Group Coordinator of the consumer in the consume-transform-produce loop.
The Consumer Group Coordinator first verifies PID and Epoch number to confirm whether the request is coming from a valid producer and then updates the offsets with PID and epoch number. This message will not be visible to consumers until a COMMIT message is seen.
4. Commit Transaction/ Abort Transaction
When the producer initiates a Commit/Abort Request to the Transaction Coordinator, the Coordinator first verifies transactional.id, PID and epoch number mapping.
The Coordinator creates a PREPARE_COMMIT or PREPARE_ABORT request and appends it to the Transaction log which is used as a synchronization point.
Once the PREPARE_COMMIT or PREPARE_ABORT record has been written, the Coordinator sends a WriteTxnMarkerRequest, which includes the PID, epoch number, and the control message COMMIT or ABORT, to the leader of each partition involved in the transaction.
When a leader receives the request, it appends the control message to the partition log.
If it is a COMMIT message, the messages of the transaction will be returned by the broker to consumers on FetchRequest. If it is an ABORT message, the messages of the transaction will be discarded on FetchRequest.
If the __consumer_offsets topic is one of the TopicPartitions in the transaction, the commit (or abort) marker is also written to the log, and the consumer coordinator is notified to materialize these offsets for COMMIT and drop for ABORT.
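The two phases can be sketched as below. This is a simplified model: the function `end_transaction`, the tuple layouts, and the partition numbers are hypothetical, and the final COMPLETE_ record written after the markers is an assumption that is not described in the text above.

```python
def end_transaction(txn_log, partition_logs, transactional_id, pid, epoch, partitions, commit):
    """Toy two-phase completion of a transaction by the Transaction Coordinator."""
    decision = "COMMIT" if commit else "ABORT"

    # Phase 1: the synchronization point. Once this record is in the
    # Transaction Log, the outcome of the transaction is decided.
    txn_log.append((transactional_id, "PREPARE_" + decision))

    # Phase 2: one control message per TopicPartition in the transaction,
    # including the __consumer_offsets partition if offsets were sent.
    for topic_partition in partitions:
        partition_logs[topic_partition].append(("CONTROL", pid, epoch, decision))

    # Assumed final bookkeeping record, written after all markers.
    txn_log.append((transactional_id, "COMPLETE_" + decision))
    return decision


txn_log = []
partition_logs = {
    ("output", 0): [("DATA", 7, 0, "a")],
    ("__consumer_offsets", 3): [("DATA", 7, 0, "input-0@42")],
}
end_transaction(txn_log, partition_logs, "order-processor-1",
                pid=7, epoch=0, partitions=list(partition_logs), commit=True)
```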
Kafka Consumers require us to set the isolation.level configuration to achieve Exactly-Once Processing.
The isolation.level configuration has two possible values, read_uncommitted and read_committed, which select the READ_UNCOMMITTED and READ_COMMITTED isolation levels.
Setting isolation.level=read_uncommitted ensures that all messages are read in offset order.
The broker can even return messages that are part of a Transaction which is not yet complete. This can be problematic if the Transaction is aborted later, leading to inconsistent behavior.
Setting isolation.level=read_committed ensures that the broker returns only non-transactional messages and, for transactional messages, only those that have been committed.
The broker does not return anything past the first message of a transaction that is not yet complete, even if non-transactional messages exist after it.
This guarantees that messages which are part of the transaction are visible to consumers only after the transaction is complete.
Kafka uses two offsets, the Latest Stable Offset and the High Watermark Offset, to distinguish the read positions of the two isolation levels.
The Latest Stable Offset is the offset below which all transactional messages have been either committed or aborted. A READ_COMMITTED consumer reads only up to this offset, and this offset is used to compute Consumer Lag when the isolation level is READ_COMMITTED.
The High Watermark Offset is the offset up to which messages have been replicated to all in-sync replicas, irrespective of whether a message is part of a transaction and whether that transaction has been committed. A READ_UNCOMMITTED consumer reads up to this offset.
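The difference between the two isolation levels can be sketched on a small in-memory log. This is a simplified model under stated assumptions: records are `(kind, pid, payload)` tuples, list indices stand in for offsets, each PID has at most one open transaction at a time, and the function names are hypothetical. In a real deployment the filtering is split between broker and consumer; here it is collapsed into one function.

```python
def last_stable_offset(log, high_watermark):
    """Offset of the first message of the earliest open transaction, else the high watermark."""
    open_txn_start = {}   # PID -> offset where its still-open transaction began
    for offset, (kind, pid, _) in enumerate(log[:high_watermark]):
        if kind == "data" and pid is not None:
            open_txn_start.setdefault(pid, offset)
        elif kind == "marker":
            open_txn_start.pop(pid, None)   # COMMIT or ABORT closes the transaction
    return min(open_txn_start.values(), default=high_watermark)


def fetch(log, high_watermark, isolation_level):
    """Payloads a consumer would see under the given isolation level."""
    if isolation_level == "read_uncommitted":
        return [payload for kind, _, payload in log[:high_watermark] if kind == "data"]

    lso = last_stable_offset(log, high_watermark)
    pending, aborted = {}, set()
    for offset, (kind, pid, payload) in enumerate(log[:lso]):
        if kind == "data" and pid is not None:
            pending.setdefault(pid, []).append(offset)
        elif kind == "marker":
            offsets = pending.pop(pid, [])
            if payload == "ABORT":
                aborted.update(offsets)
    return [payload for offset, (kind, _, payload) in enumerate(log[:lso])
            if kind == "data" and offset not in aborted]


log = [
    ("data", None, "n1"),       # 0: non-transactional
    ("data", 1, "a1"),          # 1: PID 1, later aborted
    ("data", 2, "b1"),          # 2: PID 2, later committed
    ("marker", 1, "ABORT"),     # 3
    ("data", None, "n2"),       # 4: non-transactional
    ("marker", 2, "COMMIT"),    # 5
    ("data", 3, "c1"),          # 6: PID 3, transaction still open
    ("data", None, "n3"),       # 7: non-transactional, but after an open transaction
]
committed_view = fetch(log, high_watermark=8, isolation_level="read_committed")
uncommitted_view = fetch(log, high_watermark=8, isolation_level="read_uncommitted")
```

In this log the Latest Stable Offset is 6, so the READ_COMMITTED view stops before the open transaction of PID 3 and does not yet include the non-transactional message at offset 7.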
This completes our discussion on how Producer, Consumer and the Broker work together to achieve Exactly-Once Processing.