This article describes the internal working of Apache Zookeeper. It starts with the explanation of various components in Zookeeper and deep dives into its architecture.
Source: Google
Introduction
Apache Zookeeper is a distributed coordination service that is used by applications to implement various distributed primitives like leader election, configuration management, membership management, etc…
In an application involving multiple components, the components of the system need to work together and coordinate to achieve a result.
Example in a master-worker architecture
- The master needs to identify its workers which are processing tasks and which are idle to execute more tasks
- The master needs to identify the tasks assigned to each worker and its statuses
- The system should elect a new master if the previous master is not reachable.
These functionalities require the need for a central system to execute all these coordination activities and clients in-turn connects to the central system to make progress.
Zookeeper exposes a hierarchical key-value store persisting data in a namespace much like a filesystem or a tree data structure. It provides strong consistency and ordering guarantees.
Use cases
Some of the applications utilizing the services of Zookeeper include:
- Apache Kafka uses Zookeeper to store broker metadata, topic metadata, detecting broker failures, controller failures, etc…
- HBase uses Zookeeper to maintain cluster metadata.
- Apache Flink uses Zookeeper for the high availability of its JobManager.
znodes
Znodes are the fundamental abstraction provided by Zookeeper to represent a node in a tree-like structure.
A znode can store data in the form of a byte array and it can have child nodes.
Each znode also has an additional data structure called Stat which contains transaction id which created or modified the znode, transaction id which created or modified its children, timestamp, version for the data change, version for the child node change, ACL change, and the owner of the znode.
There are three different types of znodes:
- Persistent znodes: These znodes remain permanent in Zookeeper even when the client that created it dies. The only way of deleting such nodes is by explicitly invoking delete API.
- Ephemeral znodes: These znodes are transient in the sense that they remain in Zookeeper only during the active session of the client that created the znode. When the session expires or the client dies, the ephemeral nodes are automatically deleted.
- Sequential znodes: When the client requests for the creation of a sequential znode, Zookeeper automatically appends an incrementing sequence number to the path. This helps in determining the order of the creation of znodes. Sequential znodes can be either ephemeral or persistent.
Following are the APIs provided by Zookeeper to interact with znodes
- create /path data: Creates a new znode with the /path containing data
- delete /path: Deletes the znode /path
- exists /path: Checks whether the znode /path exists
- setData /path data: Sets the data of znode /path to data
- getData /path: Returns the data in /path
- getChildren /path: Returns the list of children under /path
Zookeeper limits the size of each znode to be 1MB by default.
Client and Server
Apache Zookeeper has two components client and the server. The server component is implemented in Java and it can either run as standalone or as a cluster of servers. The cluster of servers is called an ensemble. Zookeeper provides two implementations of client API: one in Java and another in C.
When Zookeeper runs as an ensemble, one of the servers is elected as a leader. The other servers act as followers. The leader is responsible for processing all writes and it sequences write operations and establishes the order of operations to be applied to followers. The followers vote on the operations to consider the write successful. Reads can be processed by any of the servers.
There is a third kind of server called observer which do not participate in the leader election or voting process. They learn about the changes and apply them without participating in the voting process. They are present largely for scalability reasons.
Session
When the client is initialized, it is passed a comma-separated list of Zookeeper server URLs which the client uses to connect to the ensemble. At any given point in time, the client can connect to only one server.
When the client connects to one of the servers, it establishes a session with the server by creating a handle. It uses a TCP connection to exchange messages with the server and every operation on Zookeeper should include the session handle.
The session can be transferred to another Zookeeper server and this operation is transparently handled by the Zookeeper client.
The client also maintains the last transaction id (zxid) it has seen so that it doesn’t connect to the server which has seen fewer transactions than itself.
Each session is also associated with a timeout called session timeout. When the Zookeeper server doesn’t hear from the client within the session timeout, it considers the client dead and expires the session. Any ephemeral znodes created by the client will also be deleted from the Zookeeper.
If the session timeout is time t, and if the client has not interacted with the server within 1/3t of time, it sends a heartbeat to the server. If the server with which the session was established is no longer alive, the Zookeeper client tries to find another server from the list with which it was initialized at time 2/3t and tries to re-initialize the session. The remaining time 1/3t is used as buffer time.
Sessions are touched either when the client initiates a new request or through explicit heartbeats which updates session expiration time.
Session expiration works through a data structure called expiry queue which keeps sessions in buckets corresponding to the range of session expiration time. A separate thread polls expiry queue periodically to check for any sessions eligible for expiration.
Watches
Zookeeper is a central coordination service used by applications to implement various primitives.
The clients of Zookeeper register the piece of information that needs to be coordinated centrally by creating znodes in Zookeeper.
For instance, in the case of leader election, the clients create an ephemeral znode with information on server-id as data to exercise leadership. In the case of metadata management and membership management, clients create persistent znodes with metadata information stored centrally.
Interested clients will now have access to this information and they can be notified of any changes to that particular znode. One approach is periodic polling by all participating clients but it is a very expensive operation.
Zookeeper takes the approach of notification to the clients. Clients can subscribe to changes to any znode in the Zookeeper by setting a watch.
Any change to the interested znode will trigger a watch which in-turn sends the notification to the client. Watches are one-shot operations which means once the watch is triggered, it is cleared.
To receive subsequent notifications on the changes to the same znode, clients have to re-register watch upon receiving the notification by reading the state the Zookeeper.
Zookeeper guarantees that notifications are delivered to the client before any other change is made to the same znode.
Watches can be set using Zookeeper APIs that read the state of the Zookeeper. The client can set a watch for changes to the data of a znode, changes to the children of a znode, or a znode being created or deleted.
Events are generated when a znode is created/deleted/modified, when a child node is created/deleted/modified or when the connection to a client is established or disconnected.
The event types that can be triggered by Watcher include
- None (Generated as a result of the client connection, disconnection, expiration events)
- NodeCreated
- NodeDeleted
- NodeDataChanged
- NodeChildrenChanged
- DataWatchRemoved
- ChildWatchRemoved
- PersistentWatchRemoved
Watches are always stored in memory and they are never persisted on disk. When a client disconnects from the server, its pending watches are removed from the server.
Since clients maintain the outstanding watches at their end as well, when its reconnected to one of the servers in the ensemble, the client will send a list of pending watches and the last transaction id it has seen. If any of the watched znodes have a modification timestamp greater than the transaction id, Zookeeper will trigger the watch.
Quorum
In a production environment, it is advised to run Zookeeper as a cluster of servers also referred to as ensemble for high availability.
When Zookeeper runs as an ensemble, it has to form a quorum. Quorum is the minimum number of servers that should be up and running for Zookeeper to function normally. It is also the minimum number of servers that should accept writes before returning success to the client.
If N is the total number of servers in the ensemble, quorum should be at-least N/2. There should be at least one server that intersects in the quorum.
For instance, if N=5 is the number of servers in the ensemble and N/2 >= 3 is the quorum, Zookeeper can tolerate up to 2 server failures and there will be at least one node which intersects in the quorum which has the updated data.
It is also recommended to have an odd number of quorums for better availability. In the previous example, N=5 servers, if the quorum is N/2 ≥ 4, then the cluster can only tolerate 1 server failure. This is the reason to have an odd number of servers for the quorum for high availability.
Bootstrapping ensemble
In an ensemble, servers can take one of the three roles: leader, follower, observer.
The Leader and followers are called participants as they take part in leader election and also during the voting process during write operations to Zookeeper.
Each of the servers can be in one of the following three states: LOOKING, LEADING, or FOLLOWING.
When the ensemble is bootstrapped, all the servers will be in LOOKING state. Each server sends a notification to every other server in the ensemble, passing the current vote.
The current vote is a combination of server identifier (sid) which is a numeric value and the most recent transaction that was executed by the server(zxid).
Each server on receiving current vote notification from every other server compares its vote with the incoming vote.
If the incoming zxid is higher than the zxid of the server, the server updates the current vote to the incoming vote which means the server which sent the notification has seen more transactions than the server which received it. Otherwise, the server retains its current vote.
If the incoming zxid is the same as the zxid of the server, the server with the highest server identifier (sid) wins.
In the case of bootstrapping, since the ensemble is starting fresh, it is the server with the highest sid wins and is elected as the leader.
Once the server receives the same vote from a quorum of servers, the server decides the leader. If the leader is the same as the current server, it enters the LEADING state. Otherwise, it enters the FOLLOWING state and connects to the leader to receive updates and synchronize the state.
When a new server is added to the ensemble, it follows the same process. It sends a broadcast notification to every server in the ensemble. The existing servers inform the new server of the current leader, post which the new server enters the FOLLOWING state and connects to the leader to sync its state.
There are two different mechanisms in which followers sync up the state of leaders.
If the follower is not lagging very behind, the leader simply sends the missing transaction. This operation is called DIFF.
If the follower is lagging very far behind, the leader sends the complete snapshot of the DataTree (described in the next section) and the follower recreates the state from the snapshot. This operation is called SNAP.
Data Storage
Each server in the ensemble has the following data structures to persist the znodes and its information for surviving crashes.
- In-memory DataTree
- Disk-based Transaction Log
- Snapshots
In-memory DataTree
In-memory DataTree is a composite data structure that consists of a NodeHashMap (Java’s ConcurrentHashMap under the hood) and DataNode.
It also includes a WatchManager one for data nodes and one for child nodes for storing watches in-memory.
Each entry in the ConcurrentHashMap of NodeHashMap is a key/value pair where the key is the path to the Zookeeper and the value being the DataNode itself. The NodeHashMap also includes a digest associated with each znode which is calculated using the information available in DataNode.
DataNode is the representation of a znode that contains data in the form of the byte array, children nodes (if any), ACL information, and the Stat data structure.
WatchManager data structure is responsible for holding a list of watches for each path which will be used to trigger client notifications.
Disk-based Transaction Log
Since the in-memory DataTree cannot survive crashes, to achieve high availability, every write operation to Zookeeper is appended to a transaction log that is file-based.
The write to the server is successful only if it successfully writes to the in-memory DataTree and the disk-based Transaction log
Modern operation systems make use of page cache heavily to speed up operations. As a result, every append operation first goes to page cache and fsync to disk happens periodically.
Hence there is still a chance of data loss when the operating system crashes. Zookeeper relies on replication to a quorum of servers for high availability.
To prevent transaction log from growing indefinitely, periodically the log file is closed and a new file is opened to receive new appends. The old file segments are deleted periodically once the in-memory DataTree is snapshotted to disk which survives crashes.
Snapshots
From time-to-time, in-memory DataTree is serialized and snapshotted onto disk along with session information onto a file so that the server can survive crashes and data is available upon restart.
Each snapshot is also tagged with the last transaction id (zxid) which is associated with the snapshot. This way, when the server is restarted, the snapshot can be used to restore the state of DataTree and the list of transactions that happened after the snapshot (dictated by the transaction id tagged with snapshot) can be recovered either from the master or from the transaction log.
The location of Snapshots and transaction log is specified in the dataDir configuration of Zookeeper.
Reads
Every client is connected to exactly one server in the ensemble at any given point in time through a session handle.
When the client issues a read operation to the server to which it is connected, the server first checks the validity of the session.
The server then reads the in-memory DataTree through the NodeHashMap data structure and returns the znode information.
In other words, reads are served locally and hence the reads are must faster than writes. The drawback of this is, reads might return stale data to the client since the reads are served locally and not using quorum.
Zookeeper prioritizes performance over consistency for the read operations. Hence read operations are not considered Linearizable but are considered sequential consistent with respect to the client.
If the application requires an up-to-date copy of the data, the client should issue a sync operation which will sync the client-side version with the leader before issuing read operation
Writes
Every client is connected to exactly one server in the ensemble at any given point in time through a session handle.
When the client issues a write operation, the server first validates the session and then forwards the write to the leader.
The leader converts the Request from the client into a Transaction that comprises a set of operations to modify the Zookeeper DataTree. In short, it basically acts as a sequencer of operations.
When the transaction is generated in the leader, a transaction identifier (zxid) is created which is used to sequence the order of operations. zxid is also used in leader election, where-in servers exchange zxids to figure out the server which has seen the latest transactions.
A zxid consists of two components: one being the counter which increments for every transaction and the other being the epoch. This epoch is incremented every time when leader election takes place. This helps associate a particular transaction with a leader epoch.
Once the transaction id is generated by the leader, it applies the transaction locally. It updates the in-memory DataTree and also appends the transaction to the transaction log.
Zookeeper ensures that writes are idempotent and atomic by making sure that transactions do not interfere with other transactions.
Write operations are Linearizable in Zookeeper. Writes should be acknowledged by a quorum of servers before returning a successful response to the client.
Zookeeper uses ZAB protocol which stands for Zookeeper Atomic Commit Protocol to make this happen which is essentially a two-phase commit.
Once the leader applies the changes locally, the following steps are initiated using ZAB protocol to ensure writes are persistent on a quorum of servers
- The leader sends a PROPOSAL which includes the transaction generated by the leader to all of its followers.
- Once the follower receives the PROPOSAL from the leader, they apply the transaction locally to its DataTree and transaction log and sends an ACK signal to the leader confirming the acknowledgment of the proposal.
- Once the leader receives the ACK signal from a quorum of servers, it sends a COMMIT request to followers which includes the zxid to commit the transaction and make it visible to clients.
Image credits: http://www.corejavaguru.com/bigdata/zookeeper/internals
ZAB provides the following list of guarantees
- If a transaction X is committed by one server, then all other servers must also commit the transaction X. This is called dependable delivery.
- If the leader proposes X and Y in the same order, each server must commit the transaction X before committing transaction Y. This property is referred to as casual order maintenance.
- If a transaction X gets committed before a transaction Y by some server, then the same order will have to be replicated by all the servers. This property is called total ordering
- A newly elected leader should have committed all transactions that will ever be committed from previous epochs before it starts broadcasting new transactions. This is possible by having suitable quorum where-in there will be at least one server which intersects between the previous and new epoch
- At no point in time will two servers have a quorum of supporters thereby avoiding split-brain scenarios where-in two servers assume they are the leaders and make progress leading to data inconsistencies.
Unlike a follower, observers learn about the proposals that have been committed via INFORM messages which include the complete proposal.
Once the write is durable across the quorum of servers, the leader checks if there are outstanding watches that need to be triggered using the WatchManager data structure.
If there are watches that need to be triggered, it generates a notification that gets propagated to the client by using session information that is stored alongside DataTree.
The session information includes the connection information between the client and the server. The leader serializes the WatchEvent and sends it over the wire through the respective channel of followers to the client. The ZooKeeper client library receives the serialized version of the watch event, transforms it back to a watch event, and propagates it to the application.
Conclusion
This completes the article on the fundamentals and architecture of Zookeeper, their workings and interactions, and how Zookeeper handles read and write operations.
For a more in-depth understanding of the internals of Zookeeper, would suggest downloading and tracing the source code of Zookeeper as it is simple to follow.