Introduction
The Raft consensus algorithm has become one of the most widely adopted consensus protocols in modern distributed systems. Designed specifically to be understandable and implementable, Raft provides the same safety and fault-tolerance guarantees as the more complex Paxos algorithm while being significantly more approachable.
This comprehensive guide covers the Raft algorithm in depth, from its fundamental concepts through practical implementation considerations. Whether you’re building a distributed database, a coordination service, or any system requiring strong consistency across multiple nodes, understanding Raft is essential.
Understanding Distributed Consensus
The Consensus Problem
Distributed consensus is one of the most fundamental problems in distributed systems:
"""
The Consensus Problem requires:
1. Agreement: All non-faulty nodes agree on the same value
2. Validity: If all nodes propose the same value, they agree on that value
3. Termination: Every non-faulty node eventually decides on some value
This is deceptively simple to state but remarkably difficult to achieve
in the presence of network partitions, node failures, and asynchronous communication.
"""
consensus_challenges = {
    "network_partitions": "Nodes cannot communicate but may both think they're leader",
    "node_failures": "Nodes can crash and recover at any time",
    "asynchronous_communication": "Messages can be delayed indefinitely",
    "byzantine_failures": "Nodes behave arbitrarily (not addressed by Raft or classic Paxos)"
}
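Raft sidesteps much of this difficulty by settling every decision with a majority quorum: any two majorities of the same cluster overlap in at least one node, so two conflicting decisions can never both gather one. A quick sketch of the arithmetic:

```python
def quorum_size(n: int) -> int:
    """Smallest majority of an n-node cluster."""
    return n // 2 + 1

# Any two majorities share at least one node: 2 * quorum > n
for n in (3, 4, 5, 7):
    q = quorum_size(n)
    assert 2 * q > n
```

Note that an even-sized cluster (4 nodes, quorum 3) tolerates no more failures than the next smaller odd size (3 nodes, quorum 2), which is why Raft clusters are usually deployed with an odd number of voters.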
Why Raft Instead of Paxos?
Raft was designed to solve the understandability problem:
comparison = {
    "Paxos": {
        "pros": ["Elegant theoretical foundation, widely proven"],
        "cons": [
            "Extremely difficult to understand and implement",
            "Multiple variants (Basic, Multi, Fast, Cheap)",
            "Hard to build practical systems on"
        ]
    },
    "Raft": {
        "pros": [
            "Designed for understandability",
            "Clear separation of concerns",
            "Well-documented with reference implementation",
            "Easier to build correct systems"
        ],
        "cons": ["May be slightly less efficient in some cases"]
    }
}
Core Concepts and Data Structures
Node States and Roles
In Raft, each node can be in one of three states:
from enum import Enum

class NodeState(Enum):
    FOLLOWER = "follower"    # Default state, receives heartbeats
    CANDIDATE = "candidate"  # Running for leader
    LEADER = "leader"        # Handling client requests

class RaftNode:
    """
    Core Raft node data structures.
    """
    def __init__(self, node_id):
        # Identity
        self.node_id = node_id

        # Persistent state (must survive restart)
        self.current_term = 0    # Monotonically increasing term
        self.voted_for = None    # Candidate voted for in current term
        self.log = []            # Log entries (term, command, index)

        # Volatile state
        self.state = NodeState.FOLLOWER
        self.votes_received = set()  # Set of nodes that voted for us
        self.leader_id = None        # Current leader (for followers)

        # Volatile state (reinitialized after restart)
        self.commit_index = 0    # Highest committed log index
        self.last_applied = 0    # Highest index applied to state machine

        # Leader-specific volatile state
        self.next_index = {}     # Next log index to send to each follower
        self.match_index = {}    # Highest matched index for each follower

        # Timing
        self.election_timeout = None  # Randomized election timeout
        self.last_heartbeat = None    # Last received heartbeat
Log Entry Structure
import time
from dataclasses import dataclass
from typing import Any, List, Optional

@dataclass
class LogEntry:
    """A single entry in the Raft log."""
    term: int      # Term when entry was created
    index: int     # Position in log
    command: Any   # State machine command
    timestamp: Optional[float] = None  # When entry was created

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
RPC Messages
Raft uses two primary RPC types:
@dataclass
class RequestVoteParams:
    """Parameters for RequestVote RPC."""
    term: int            # Candidate's term
    candidate_id: int    # Candidate requesting vote
    last_log_index: int  # Index of candidate's last log entry
    last_log_term: int   # Term of candidate's last log entry

@dataclass
class RequestVoteResult:
    """Result of RequestVote RPC."""
    term: int            # Current term for candidate to update
    vote_granted: bool   # True means candidate received vote

@dataclass
class AppendEntriesParams:
    """Parameters for AppendEntries RPC (heartbeat or log replication)."""
    term: int                # Leader's term
    leader_id: int           # Leader's identity
    prev_log_index: int      # Index of log entry immediately preceding new entries
    prev_log_term: int       # Term of prev_log_index entry
    entries: List[LogEntry]  # Log entries to store (empty for heartbeat)
    leader_commit: int       # Leader's commit index

@dataclass
class AppendEntriesResult:
    """Result of AppendEntries RPC."""
    term: int      # Current term for leader to update
    success: bool  # True if follower contained entry matching prev_log_index/term
    # Additional fields for the conflict-skipping optimization
    conflict_index: Optional[int] = None  # First index that conflicts
    conflict_term: Optional[int] = None   # Term of conflicting entry
Leader Election
Election Mechanism
The leader election process is central to Raft:
class LeaderElection:
    """
    Raft leader election implementation.
    """
    def start_election(self):
        """
        Start a new election (called when election timeout expires).
        Steps:
        1. Increment term
        2. Become candidate
        3. Vote for self
        4. Request votes from all other nodes
        5. If majority received, become leader
        """
        # Increment term (we're starting a new election)
        self.current_term += 1
        self.state = NodeState.CANDIDATE

        # Vote for self
        self.voted_for = self.node_id
        self.votes_received = {self.node_id}

        # Persist state (critical for safety)
        self.persist()

        # Request votes from all other nodes
        self.request_votes_from_all()

    def request_votes_from_all(self):
        """Send RequestVote RPC to all other nodes."""
        last_log_index = len(self.log) - 1
        last_log_term = self.log[last_log_index].term if self.log else 0
        for peer in self.peers:
            asyncio.create_task(self.send_request_vote(
                peer,
                RequestVoteParams(
                    term=self.current_term,
                    candidate_id=self.node_id,
                    last_log_index=last_log_index,
                    last_log_term=last_log_term
                )
            ))

    async def send_request_vote(self, peer, params):
        """Send RequestVote RPC to a peer."""
        result = await peer.request_vote(params)

        # Handle term update
        if result.term > self.current_term:
            self.current_term = result.term
            self.state = NodeState.FOLLOWER
            self.voted_for = None
            self.persist()
            return

        # Check if we won
        if result.vote_granted and self.state == NodeState.CANDIDATE:
            self.votes_received.add(peer.node_id)
            # Majority of the full cluster (self.peers excludes this node,
            # so the cluster size is len(self.peers) + 1)
            if len(self.votes_received) > (len(self.peers) + 1) // 2:
                self.become_leader()

    def become_leader(self):
        """Transition to leader state."""
        self.state = NodeState.LEADER
        self.leader_id = self.node_id

        # Initialize leader-specific state
        for peer in self.peers:
            self.next_index[peer.node_id] = len(self.log)
            self.match_index[peer.node_id] = 0

        # Immediately send heartbeats to establish leadership
        self.send_append_entries_to_all()
Voting Logic
The critical safety rule for voting:
def should_vote_for(self, params: RequestVoteParams) -> bool:
    """
    Determine if we should vote for a candidate.
    Safety rules:
    1. If candidate's term < our term, don't vote
    2. If we've already voted for someone else in this term, don't vote
    3. If candidate's log is not as up-to-date as ours, don't vote
    """
    # Rule 1: Outdated term
    if params.term < self.current_term:
        return False

    # Rule 2: Already voted
    if self.voted_for is not None and self.voted_for != params.candidate_id:
        return False

    # Rule 3: Log up-to-date check
    # Candidate's log must be at least as up-to-date as ours
    last_log_index = len(self.log) - 1
    last_log_term = self.log[last_log_index].term if self.log else 0
    if params.last_log_term < last_log_term:
        return False
    if params.last_log_term == last_log_term and params.last_log_index < last_log_index:
        return False

    # All rules pass - vote for candidate
    return True

def handle_request_vote(self, params: RequestVoteParams) -> RequestVoteResult:
    """Handle incoming RequestVote RPC."""
    # Update term if needed
    if params.term > self.current_term:
        self.current_term = params.term
        self.state = NodeState.FOLLOWER
        self.voted_for = None
        self.persist()

    # Check if we should vote
    if self.should_vote_for(params):
        self.voted_for = params.candidate_id
        self.persist()
        return RequestVoteResult(term=self.current_term, vote_granted=True)

    return RequestVoteResult(term=self.current_term, vote_granted=False)
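Rule 3 compresses into a single comparison on the pair (last term, last index). A standalone version of that check (the function name is ours), with a few concrete cases for a voter whose own last entry has term 3 at index 7:

```python
def candidate_log_up_to_date(cand_last_term, cand_last_index,
                             my_last_term, my_last_index):
    """True if the candidate's log is at least as up-to-date as ours.
    Raft compares the term of the last entry first, then the log length."""
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term
    return cand_last_index >= my_last_index

assert candidate_log_up_to_date(4, 0, 3, 7)      # higher last term wins despite shorter log
assert not candidate_log_up_to_date(3, 6, 3, 7)  # same term, shorter log loses
assert candidate_log_up_to_date(3, 7, 3, 7)      # identical logs are acceptable
```

The first case is the surprising one: a much shorter log can still win the vote if its last entry carries a higher term, because term, not length, is what tracks recency in Raft.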
Election Timeout and Randomization
import random
import asyncio

class ElectionTimeoutManager:
    """
    Manages election timeouts with randomization.
    """
    # Conservative values; the Raft paper suggests 150-300ms timeouts
    # (with proportionally shorter heartbeats) for low-latency networks
    MIN_ELECTION_TIMEOUT_MS = 1500  # 1.5 seconds
    MAX_ELECTION_TIMEOUT_MS = 3000  # 3 seconds
    HEARTBEAT_INTERVAL_MS = 500     # Heartbeats every 500ms

    def __init__(self, node):
        self.node = node
        self.election_timer = None
        self.heartbeat_timer = None

    def reset_election_timer(self):
        """Reset election timeout (called on receiving valid heartbeat)."""
        if self.election_timer:
            self.election_timer.cancel()

        # Random timeout to prevent split votes
        timeout_ms = random.randint(
            self.MIN_ELECTION_TIMEOUT_MS,
            self.MAX_ELECTION_TIMEOUT_MS
        )
        self.election_timer = asyncio.create_task(
            self._election_timeout_handler(timeout_ms / 1000)
        )

    async def _election_timeout_handler(self, timeout_seconds):
        """Handle election timeout - start election."""
        await asyncio.sleep(timeout_seconds)

        # Followers start an election; candidates whose election stalled
        # (e.g. a split vote) start a new one with a higher term
        if self.node.state in (NodeState.FOLLOWER, NodeState.CANDIDATE):
            self.node.start_election()

    def start_leader_heartbeats(self):
        """Start sending periodic heartbeats (called when becoming leader)."""
        if self.heartbeat_timer:
            self.heartbeat_timer.cancel()

        async def send_heartbeats():
            while self.node.state == NodeState.LEADER:
                self.node.send_append_entries_to_all()
                await asyncio.sleep(self.HEARTBEAT_INTERVAL_MS / 1000)

        self.heartbeat_timer = asyncio.create_task(send_heartbeats())
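Randomization works because one node's timer usually fires well before any other's, giving it time to complete an election before a rival even starts. A quick illustration using the same constants as above (seeded so the example is reproducible):

```python
import random

random.seed(7)  # fixed seed so the example is reproducible

MIN_MS, MAX_MS = 1500, 3000  # same constants as above

# Five followers each draw an independent timeout; whichever node's
# timer fires first gets a head start on collecting votes.
timeouts = sorted(random.randint(MIN_MS, MAX_MS) for _ in range(5))
lead = timeouts[1] - timeouts[0]  # head start of the earliest node
print(f"timeouts={timeouts}, head start={lead}ms")
```

With a 1500ms spread across five nodes the expected head start is a few hundred milliseconds, comfortably longer than a typical RequestVote round trip. That ordering (broadcast time much less than election timeout) is exactly the timing condition the Raft paper requires for elections to converge.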
Log Replication
The Replication Process
class LogReplication:
    """
    Raft log replication implementation.
    """
    def receive_client_request(self, command):
        """
        Leader receives client request.
        Process:
        1. Append command to local log
        2. Send AppendEntries to followers
        3. When majority acknowledge, commit and apply
        """
        if self.state != NodeState.LEADER:
            # Redirect to leader
            return {"error": "not_leader", "leader": self.leader_id}

        # Append to local log
        entry = LogEntry(
            term=self.current_term,
            index=len(self.log),
            command=command
        )
        self.log.append(entry)
        self.persist()

        # Replicate to followers
        self.replicate_to_followers()
        return {"status": "started", "index": entry.index}

    def replicate_to_followers(self):
        """Send AppendEntries to all followers."""
        for peer in self.peers:
            asyncio.create_task(self.replicate_to_follower(peer))

    async def replicate_to_follower(self, peer):
        """Replicate log to a specific follower."""
        # Get prev log info
        prev_log_index = self.next_index[peer.node_id] - 1
        prev_log_term = self.log[prev_log_index].term if prev_log_index >= 0 else 0

        # Get entries to send
        entries = self.log[self.next_index[peer.node_id]:]

        # Send AppendEntries
        params = AppendEntriesParams(
            term=self.current_term,
            leader_id=self.node_id,
            prev_log_index=prev_log_index,
            prev_log_term=prev_log_term,
            entries=entries,
            leader_commit=self.commit_index
        )
        result = await peer.append_entries(params)

        # Handle response
        if result.success:
            # Update next and match index
            self.next_index[peer.node_id] = len(self.log)
            self.match_index[peer.node_id] = prev_log_index + len(entries)
            # Check if we can commit
            self.check_commit()
        else:
            if result.term > self.current_term:
                # We're outdated - step down
                self.current_term = result.term
                self.state = NodeState.FOLLOWER
                self.persist()
            else:
                # Log mismatch - decrement next_index and retry
                # (log indices are 0-based here, so the floor is 0)
                self.next_index[peer.node_id] = max(0, self.next_index[peer.node_id] - 1)
                await self.replicate_to_follower(peer)
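The decrement-and-retry loop can be exercised in isolation. Below, logs are reduced to bare lists of term numbers (a simplification; real entries also carry commands). The step-back converges on the longest common prefix, after which the leader's suffix overwrites the follower's divergent entries:

```python
def reconcile(leader_log, follower_log):
    """Walk next_index backwards until the previous entry matches, then
    overwrite the follower's suffix with the leader's (terms stand in
    for full log entries)."""
    next_index = len(leader_log)
    while True:
        prev = next_index - 1
        # Follower accepts iff it has a matching entry at prev (or prev < 0)
        if prev < 0 or (prev < len(follower_log)
                        and follower_log[prev] == leader_log[prev]):
            return follower_log[:prev + 1] + leader_log[next_index:]
        next_index -= 1

# The follower diverged in term 2; the leader's term-3 suffix replaces it.
assert reconcile([1, 1, 2, 3, 3], [1, 1, 2, 2]) == [1, 1, 2, 3, 3]
```

The real protocol performs one RPC round trip per decrement, which is exactly why the conflict_index/conflict_term optimization described later exists.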
Committing Entries
def check_commit(self):
    """
    Check if we can commit new entries.
    Raft commits an entry when:
    1. It is stored on a majority of nodes
    2. It was created in the current leader's term
    """
    # Check each index from commit_index + 1 upward
    for index in range(self.commit_index + 1, len(self.log)):
        # Count how many nodes have this entry
        count = 1  # Leader has it
        for peer in self.peers:
            if self.match_index[peer.node_id] >= index:
                count += 1

        # Majority of the full cluster (self.peers excludes this node)
        if count > (len(self.peers) + 1) // 2:
            # Raft only commits entries from the current term directly;
            # earlier entries commit implicitly along with them
            if self.log[index].term == self.current_term:
                self.commit_index = index

    # Apply newly committed entries to state machine
    self.apply_to_state_machine()

def apply_to_state_machine(self):
    """Apply committed entries to the state machine."""
    while self.last_applied < self.commit_index:
        self.last_applied += 1
        entry = self.log[self.last_applied]
        # Apply to state machine (implementation depends on use case)
        result = self.state_machine.apply(entry.command)
        # Send response to client if needed (client_request_id is an
        # optional field, not part of the LogEntry definition above)
        request_id = getattr(entry, "client_request_id", None)
        if request_id:
            self.pending_results[request_id] = result
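An equivalent, loop-free way to find the highest majority-replicated index is to sort the match indices and read off the value at the majority position. The helper below is our own formulation, not from the Raft paper, and the current-term check above still applies before advancing commit_index:

```python
def highest_committable(match_indices, leader_last_index):
    """Highest index replicated on a majority: include the leader's own
    log, sort descending, and take the entry at the majority position."""
    indices = sorted(match_indices + [leader_last_index], reverse=True)
    majority = len(indices) // 2 + 1
    return indices[majority - 1]

# 5-node cluster: leader at index 7, followers matched at 7, 5, 5, 2.
# Three nodes (leader plus two followers) hold index 5, so 5 is committable.
assert highest_committable([7, 5, 5, 2], 7) == 5
```

This is the formulation many production implementations use, since it makes the commit decision a single O(n log n) pass rather than a scan per index.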
Handling AppendEntries
def handle_append_entries(self, params: AppendEntriesParams) -> AppendEntriesResult:
    """Handle incoming AppendEntries RPC."""
    # Reject stale leaders outright (and do NOT reset the election timer)
    if params.term < self.current_term:
        return AppendEntriesResult(term=self.current_term, success=False)

    # Update term if needed
    if params.term > self.current_term:
        self.current_term = params.term
        self.voted_for = None
        self.persist()

    # A valid AppendEntries for the current term means we have a leader
    self.state = NodeState.FOLLOWER
    self.leader_id = params.leader_id

    # Reset election timer
    self.reset_election_timer()

    # Validate prev log entry
    if params.prev_log_index > len(self.log) - 1:
        return AppendEntriesResult(
            term=self.current_term,
            success=False,
            conflict_index=len(self.log),
            conflict_term=None
        )

    if params.prev_log_index >= 0:
        if self.log[params.prev_log_index].term != params.prev_log_term:
            # Term mismatch - report the first index of the conflicting term
            conflict_term = self.log[params.prev_log_index].term
            conflict_index = self._find_first_index_of_term(conflict_term)
            return AppendEntriesResult(
                term=self.current_term,
                success=False,
                conflict_index=conflict_index,
                conflict_term=conflict_term
            )

    # Append new entries (simplified: always truncate after prev_log_index;
    # the Raft paper truncates only on an actual conflict, which matters
    # if RPCs can arrive out of order)
    self.log = self.log[:params.prev_log_index + 1]
    self.log.extend(params.entries)
    self.persist()

    # Update commit index
    if params.leader_commit > self.commit_index:
        self.commit_index = min(params.leader_commit, len(self.log) - 1)
        self.apply_to_state_machine()

    return AppendEntriesResult(term=self.current_term, success=True)
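On the leader side, the conflict hints let next_index jump back a whole term at a time instead of one entry per round trip. Implementations differ on the exact policy; one common variant, with logs again reduced to bare term numbers, is:

```python
def next_index_after_conflict(leader_terms, conflict_index, conflict_term):
    """Pick the next_index to retry with, given a follower's conflict hints."""
    if conflict_term is None:
        # Follower's log is simply shorter than prev_log_index
        return conflict_index
    # If the leader has entries of conflict_term, retry from just past its
    # last such entry; otherwise skip the follower's entire conflicting term.
    for i in range(len(leader_terms) - 1, -1, -1):
        if leader_terms[i] == conflict_term:
            return i + 1
    return conflict_index

assert next_index_after_conflict([1, 1, 2, 2, 3], 2, 2) == 4  # leader also has term 2
assert next_index_after_conflict([1, 1, 3], 1, 2) == 1        # leader lacks term 2
assert next_index_after_conflict([1, 1, 3], 3, None) == 3     # follower log too short
```

Either way the follower's eventual truncation is the same; the optimization only reduces the number of failed round trips before the logs match.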
Safety Guarantees
Election Safety
Raft guarantees that at most one leader exists at any time:
safety_guarantees = {
    "election_safety": "At most one leader per term",
    "leader_append_only": "Leader never overwrites or deletes its own entries",
    "log_matching": "If two logs contain an entry with the same index and term, the logs are identical in all entries up through that index",
    "leader_completeness": "If an entry is committed, it appears in the logs of all future leaders",
    "state_machine_safety": "All nodes apply the same log entries in the same order"
}

# Election safety follows from:
# - A node votes for at most one candidate per term
# - A candidate must have a log at least as up-to-date as the voter's
# - A majority is required to become leader
# - Terms increase monotonically
Log Matching Property
The log matching property ensures consistency:
def _find_first_index_of_term(self, term: int) -> int:
    """Find first index with given term."""
    for i, entry in enumerate(self.log):
        if entry.term == term:
            return i
    return len(self.log)  # Not found

def _find_last_index_of_term(self, term: int) -> int:
    """Find last index with given term."""
    for i in range(len(self.log) - 1, -1, -1):
        if self.log[i].term == term:
            return i
    return -1  # Not found
Membership Changes
Joint Consensus
Adding or removing nodes safely requires a two-phase approach:
class MembershipChange:
    """
    Safe membership changes using joint consensus.
    (ClusterConfig, ConfigurationChange, and the wait_for_* helpers are
    sketched interfaces, not defined elsewhere in this guide.)
    """
    async def add_node(self, new_node):
        """
        Add a new node using joint consensus.
        Phase 1: Joint configuration (old + new)
        - Every decision requires majorities of BOTH old and new
        Phase 2: New configuration alone
        - Switch to new configuration
        - Removed nodes can be shut down
        """
        # Add new node as non-voting member first
        self.cluster_config.add_member(new_node, voting=False)

        # Wait for node to catch up (replicate recent entries)
        await self.wait_for_catchup(new_node)

        # Promote to voting member via joint consensus
        joint_config = ClusterConfig(
            old_members=self.cluster_config.members,
            new_members=self.cluster_config.members | {new_node},
            joint=True
        )

        # Commit joint configuration
        self.log_append(ConfigurationChange(joint_config))
        await self.wait_for_commit(joint_config)

        # Switch to new configuration
        new_config = ClusterConfig(members=joint_config.new_members)
        self.log_append(ConfigurationChange(new_config))
        # Old nodes can now be removed if needed
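The property that makes joint consensus safe is that, while the joint configuration is active, an entry commits only with separate majorities in both the old and the new member sets, so neither configuration can decide unilaterally during the transition. A set-based sketch (the names are ours):

```python
def joint_commit_ok(acks, old_members, new_members):
    """True if `acks` forms a majority of BOTH configurations."""
    def majority(members):
        return len(acks & members) > len(members) // 2
    return majority(old_members) and majority(new_members)

old = {"a", "b", "c"}
new = {"a", "b", "c", "d", "e"}
assert joint_commit_ok({"a", "b", "d"}, old, new)      # majorities in both
assert not joint_commit_ok({"a", "d", "e"}, old, new)  # new-only majority is not enough
```

The second case is the dangerous one a naive one-step switch would allow: three of five new members agree, but only one old member does, so committing there could contradict a decision made by the old majority.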
Log Compaction and Snapshots
Why Snapshots Are Needed
Logs grow indefinitely, requiring compaction:
class LogCompaction:
    """
    Raft log compaction using snapshots.
    """
    def create_snapshot(self):
        """
        Create a snapshot of the current state.
        Snapshot contains:
        - Last included index
        - Last included term
        - State machine state
        """
        snapshot = Snapshot(
            last_included_index=self.last_applied,
            last_included_term=self.log[self.last_applied].term,
            state=self.state_machine.serialize()
        )
        # Save snapshot to disk
        self.save_snapshot(snapshot)

        # Discard the log prefix covered by the snapshot, keeping any
        # entries after it (absolute indices must be remapped from now on)
        self.log = self.log[self.last_applied + 1:]

        # Optionally delete older snapshots
        self.persist()

    async def install_snapshot(self, peer, snapshot_data):
        """
        Send snapshot to a follower that's far behind.
        This happens when the follower's next_index points into the
        leader's truncated (snapshotted) log prefix.
        """
        # Send snapshot in chunks
        offset = 0
        while offset < len(snapshot_data):
            chunk = snapshot_data[offset:offset + CHUNK_SIZE]
            await peer.send_snapshot_chunk(SnapshotChunk(
                term=self.current_term,
                leader_id=self.node_id,
                offset=offset,
                data=chunk,
                done=(offset + CHUNK_SIZE >= len(snapshot_data))
            ))
            offset += CHUNK_SIZE
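After truncation, an entry's absolute Raft index no longer equals its offset in the in-memory list, so every log access has to translate through last_included_index. A minimal sketch of that bookkeeping (class and method names are ours; real implementations also track last_included_term for the prev_log_term check):

```python
class CompactedLog:
    """Maps absolute Raft indices onto a log whose prefix was snapshotted."""
    def __init__(self, last_included_index, entries):
        self.last_included_index = last_included_index
        # entries[0] has absolute index last_included_index + 1
        self.entries = entries

    def get(self, absolute_index):
        offset = absolute_index - self.last_included_index - 1
        if offset < 0:
            # Entry was compacted away - the leader must send a snapshot
            raise KeyError("index covered by snapshot")
        return self.entries[offset]

log = CompactedLog(last_included_index=100, entries=["e101", "e102"])
assert log.get(102) == "e102"
```

The KeyError branch is precisely the situation install_snapshot above handles: a follower asks for an index the leader no longer stores.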
Practical Implementation Tips
Persistence
Critical data that must be persisted:
import os
import pickle

class PersistenceManager:
    """
    Handles persistence of Raft state.
    """
    def persist(self):
        """Save all persistent state to disk."""
        data = {
            "currentTerm": self.current_term,
            "votedFor": self.voted_for,
            "log": [(e.term, e.command) for e in self.log]
        }
        # Atomic write: write to a temp file, fsync, then rename
        temp_file = self.persist_path + ".tmp"
        with open(temp_file, 'wb') as f:
            pickle.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # durability requires the fsync, not just the rename
        os.replace(temp_file, self.persist_path)

    def recover(self):
        """Recover state from disk on startup."""
        if not os.path.exists(self.persist_path):
            return
        with open(self.persist_path, 'rb') as f:
            data = pickle.load(f)
        self.current_term = data["currentTerm"]
        self.voted_for = data["votedFor"]
        self.log = [LogEntry(term=t, index=i, command=c)
                    for i, (t, c) in enumerate(data["log"])]
Performance Considerations
performance_tips = {
    "batching": "Batch multiple client requests into a single AppendEntries",
    "pipelining": "Pipeline AppendEntries requests for throughput",
    "snapshots": "Snapshot regularly to bound log size",
    "async": "Make RPCs asynchronous for better concurrency",
    "serialization": "Use efficient serialization (Protocol Buffers, MessagePack)"
}

# Common issues to avoid:
pitfalls = [
    "Forgetting to persist state before responding to RPCs",
    "Not discarding stale responses after term changes",
    "Infinite retry loops on failed AppendEntries",
    "Not randomizing election timeouts enough",
    "Ignoring InstallSnapshot RPCs from the leader"
]
Monitoring and Debugging
Key Metrics
metrics_to_monitor = {
    "leader_election": [
        "Number of elections per minute",
        "Election timeout frequency",
        "Split vote frequency"
    ],
    "log_replication": [
        "Commit latency (P50, P95, P99)",
        "Replication lag per follower",
        "Number of uncommitted entries"
    ],
    "snapshots": [
        "Snapshot creation frequency",
        "Snapshot size",
        "Time to create/install snapshot"
    ],
    "resource": [
        "Log size",
        "Memory usage",
        "Disk I/O"
    ]
}
When to Use Raft
Use Raft When
use_cases = {
    "distributed_databases": ["etcd", "Consul", "CockroachDB", "TiKV"],
    "coordination": ["Service discovery", "Lock service", "Configuration management"],
    "state_replication": ["Distributed caches", "Leader election", "Metadata stores"]
}

choose_raft_when = [
    "You need strong consistency",
    "The number of voting nodes is small (< 7)",
    "You value understandability and maintainability",
    "Network partitions are rare",
    "Simple deployment is preferred"
]

consider_alternatives_when = [
    "Eventual consistency is acceptable",
    "You need very high availability across datacenters",
    "You need to scale to many nodes",
    "Byzantine fault tolerance is required"
]
Conclusion
The Raft consensus algorithm provides a practical solution to the distributed consensus problem. Its design for understandability makes it an excellent choice for building reliable distributed systems.
Key takeaways:
- Leader election uses randomized timeouts and majority voting to ensure safety
- Log replication ensures all nodes apply the same commands in the same order
- Safety guarantees prevent split brains and ensure consistency
- Snapshots enable log compaction for long-running systems
- Membership changes use joint consensus for safe transitions
Raft’s clarity and proven correctness make it the go-to choice for most consensus needs in modern distributed systems.