Raft Consensus Algorithm
Sat 28 August 2021

Consensus in Distributed Systems
In a distributed system, multiple components run on multiple nodes, communicate over a network, and work together as a single system. For this system to be stable and reliable, these components need to talk to each other and agree on certain values. For example, in a distributed database, before a write is saved, the different nodes need to agree on the new data, so that a read request sent to any of the available nodes won't return different data for the same request.
According to Wikipedia:
A fundamental problem in distributed computing and multi-agent systems is to achieve overall system reliability in the presence of a number of faulty processes. This often requires coordinating processes to reach consensus, or agree on some data value that is needed during computation.
Raft
Raft is a consensus algorithm that tries to solve this problem without many complications. The Raft protocol depends on the existence of a leader in the system to operate. Each node can be in one of the following states:
- Leader
- Candidate
- Follower
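As a minimal sketch, these states and the per-node bookkeeping they imply could be modelled in Python as shown below; the class and attribute names here are illustrative, not necessarily the ones used in the actual implementation.

```python
from enum import Enum, auto


class NodeState(Enum):
    """Possible roles of a Raft node (illustrative names)."""
    FOLLOWER = auto()   # default state on startup
    CANDIDATE = auto()  # requesting votes after an election timeout
    LEADER = auto()     # won the election; sends heartbeats and replicates the log


class RaftNode:
    """Minimal per-node bookkeeping used in the sketches below."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.state = NodeState.FOLLOWER  # every node starts as a follower
        self.current_term = 0            # latest election term this node has seen
        self.voted_for = None            # candidate this node voted for in the current term
        self.log = []                    # append-only replication log
        self.commit_index = -1           # index of the last committed log entry
```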
Initially, when the system starts, every node is in the Follower state. For the system to work as expected, there needs to be a leader. When the absence of a leader is detected, a new election is held and a new leader is elected. After this, the system can start accepting requests from clients. All the nodes maintain an append-only replication log in which they record the incoming requests for changes to data in the system. All these changes are sent to the leader, and the leader takes care of propagating them to the follower nodes.
Leader Election
Each election has a unique term number. After detecting the absence of the leader, any node can stand for the next term and request votes from the other available nodes. Each node waits for a random election timeout before transitioning from the Follower state to the Candidate state. After the timeout, the node enters the Candidate state and sends a vote request to the other nodes in the system. The vote request carries a new election term number (the current term number increased by 1) and the last commit index from the node's log. Before sending the vote requests, the node votes for itself in the current term. A node can vote only once per term: when a node receives a vote request, it grants its vote only if it has not already voted in that term and the commit index in the request is higher than or equal to its own commit index.

The node which gets the majority of votes becomes the leader; if there is no majority, that election is dropped and a new one is held. After a node becomes the leader, it starts sending heartbeats to the other nodes in the system, indicating that it has won the election and is the leader of the cluster for that term. The leader node transitions to the Leader state and the other nodes in the system transition to the Follower state. The same procedure is followed in case of a failure of the leader node.
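To make the flow concrete, here is a rough asyncio-style sketch of an election, building on the NodeState/RaftNode sketch above. The peers' request_vote() RPC, the timeout range, and the majority check are illustrative assumptions, not the exact code from the implementation.

```python
import asyncio
import random

# Randomized election timeout range (seconds); the value is illustrative.
ELECTION_TIMEOUT_RANGE = (0.15, 0.30)


async def run_election(node, peers):
    # NodeState and the node attributes come from the sketch above.
    # Wait out a randomized election timeout before becoming a candidate.
    await asyncio.sleep(random.uniform(*ELECTION_TIMEOUT_RANGE))

    node.state = NodeState.CANDIDATE
    node.current_term += 1         # stand for a new term
    node.voted_for = node.node_id
    votes = 1                      # the candidate votes for itself first

    # Ask every other node for its vote, sending the new term number
    # and the last commit index from this node's log.
    for peer in peers:
        granted = await peer.request_vote(
            term=node.current_term,
            candidate_id=node.node_id,
            last_commit_index=node.commit_index,
        )
        if granted:
            votes += 1

    # A majority of the whole cluster (peers plus this node) wins the election.
    if votes > (len(peers) + 1) // 2:
        node.state = NodeState.LEADER
```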
Log Replication
All changes to the system have to go through the leader, whether it is updating existing data, creating new data, or deleting existing data. The leader receives the request, records the change in its log in an uncommitted state against a new index, and then sends an AppendEntry request to its followers with the change and the new index. The followers receive the request, apply the change to their own replication logs, and send an acknowledgement. When the leader receives acknowledgements from a majority of the nodes, the change is marked as committed and a response is sent to the client. After that, the leader sends the committed index along with the next AppendEntry message. On receiving this message, a follower commits the change and applies it to its data storage.

When a new node joins the cluster, or a follower becomes available again after being unavailable, the replication log of that node will not be up to date. It is the responsibility of the leader node to replay all the missing changes and replicate them on those nodes. If a node's replication log cannot be reconciled with what is present in the leader node, due to network issues or any other kind of issue, the leader forces that node to delete the conflicting changes and replicate the changes from the leader's log.
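Building on the earlier sketch, the leader side of this flow could look roughly like the following; the LogEntry class and the followers' append_entry() RPC are assumptions for illustration, not the actual gRPC service definition.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class LogEntry:
    """One change recorded in the replication log (illustrative shape)."""
    term: int
    index: int
    command: dict          # e.g. {"op": "set", "key": "name", "value": "raft"}
    committed: bool = False


async def handle_write(leader, followers, command):
    # Record the change in the leader's log in an uncommitted state.
    entry = LogEntry(term=leader.current_term, index=len(leader.log), command=command)
    leader.log.append(entry)

    # Send an AppendEntry request with the change to every follower.
    results = await asyncio.gather(
        *(follower.append_entry(term=leader.current_term, entry=entry)
          for follower in followers),
        return_exceptions=True,
    )

    # The leader itself counts as one replica; add follower acknowledgements.
    acks = 1 + sum(1 for r in results if r is True)

    # Commit once a majority of the cluster has acknowledged the entry.
    if acks > (len(followers) + 1) // 2:
        entry.committed = True
        leader.commit_index = entry.index
        return True    # a success response goes back to the client
    return False
```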
Building a simple key-value store based on Raft
I tried implementing a simple distributed key-value store, which uses the Raft protocol described above to address the consensus problem. The functionality of the key-value store is simple: given a key (string) and a value (string), it writes them to a single JSON file. I did not consider an LSM tree or MemTable for this. Given a write request, the system needs to accept the values and write them to a JSON file on every single node in the system, and these JSON files need to stay in sync.
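As an illustration of the storage side, applying a committed set operation to a node's JSON file could look roughly like this; the file name and the helper function are hypothetical, not the actual names in the implementation.

```python
import json
import os


def apply_to_storage(data_dir, key, value):
    """Apply a committed set operation to the node's single JSON data file."""
    path = os.path.join(data_dir, "data.json")  # hypothetical file name

    # Load the existing store, defaulting to an empty dict on the first write.
    store = {}
    if os.path.exists(path):
        with open(path) as f:
            store = json.load(f)

    store[key] = value

    # Rewrite the whole file; no LSM tree or MemTable, as noted above.
    with open(path, "w") as f:
        json.dump(store, f)
```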
For communication between nodes, I used gRPC. For accepting requests from a client, an HTTP API is exposed. The client can send write requests to the leader node and read requests to any of the available nodes. I wrote this system in Python using async/await, gRPC, and the asyncio library.
In this system, I decided to reuse the AppendEntry request for heartbeats. When there is a write request to the leader node, the leader sends an AppendEntry with a message; otherwise it sends empty AppendEntry messages to the follower nodes, which can be considered heartbeats.
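A rough sketch of that heartbeat loop, reusing the hypothetical append_entry() RPC from the replication sketch above; the interval is an illustrative value.

```python
import asyncio

HEARTBEAT_INTERVAL = 0.05  # seconds; illustrative value


async def heartbeat_loop(leader, followers):
    # While this node is the leader, keep sending empty AppendEntry messages
    # so followers know a leader is alive and do not start a new election.
    while leader.state is NodeState.LEADER:   # NodeState from the earlier sketch
        await asyncio.gather(
            *(follower.append_entry(term=leader.current_term, entry=None)
              for follower in followers),
            return_exceptions=True,
        )
        await asyncio.sleep(HEARTBEAT_INTERVAL)
```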
The code is available on GitHub.
A node can be started by running the run.py file. In that file there is a config dictionary, which can be used to specify the gRPC port, the HTTP port, the data directories, and the other nodes present in the system.
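For illustration, such a config dictionary could look like the following; the key names and values here are assumptions and may not match the actual keys in run.py.

```python
# Hypothetical shape of the config dictionary in run.py; the actual key
# names in the repository may differ.
config = {
    "node_id": "node-1",
    "grpc_port": 50051,            # port for node-to-node gRPC traffic
    "http_port": 8001,             # port for the client-facing HTTP API
    "data_dir": "./data/node-1",   # where the JSON data file lives
    "peers": [                     # other nodes present in the system
        "localhost:50052",
        "localhost:50053",
    ],
}
```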