
Raft Consensus Algorithm: Complete Implementation Guide 2026

Introduction

The Raft consensus algorithm has become one of the most widely adopted consensus protocols in modern distributed systems. Designed specifically to be understandable and implementable, Raft provides the same safety and liveness guarantees as the more complex Paxos algorithm while being significantly more approachable.

This comprehensive guide covers the Raft algorithm in depth, from its fundamental concepts through practical implementation considerations. Whether you’re building a distributed database, a coordination service, or any system requiring strong consistency across multiple nodes, understanding Raft is essential.

Understanding Distributed Consensus

The Consensus Problem

Distributed consensus is one of the most fundamental problems in distributed systems:

"""
The Consensus Problem requires:

1. Agreement: All non-faulty nodes agree on the same value
2. Validity: If all nodes propose the same value, they agree on that value
3. Termination: Every non-faulty node eventually decides on some value

This is deceptively simple to state but remarkably difficult to achieve
in the presence of network partitions, node failures, and asynchronous communication.
"""

consensus_challenges = {
    "network_partitions": "Nodes cannot communicate but may both think they're leader",
    "node_failures": "Nodes can crash and recover at any time",
    "asynchronous_communication": "Messages can be delayed indefinitely",
    "byzantine_failures": "Nodes can behave arbitrarily (optional)"
}
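For crash (non-Byzantine) faults, the standard way around these challenges is majority quorums: a cluster of 2f + 1 nodes tolerates f failures, because the surviving nodes still form a majority. A quick sketch of the arithmetic:

```python
def majority(cluster_size: int) -> int:
    """Smallest number of nodes that forms a majority quorum."""
    return cluster_size // 2 + 1

# A cluster of 2f + 1 nodes tolerates f crash failures: with f nodes
# down, the remaining n - f nodes still form a majority.
for f in range(4):
    n = 2 * f + 1
    assert n - f >= majority(n)
    print(f"cluster={n}: tolerates {f} failures, quorum={majority(n)}")
```

This is why Raft clusters are typically deployed with 3 or 5 nodes: even cluster sizes raise the quorum without improving fault tolerance.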

Why Raft Instead of Paxos?

Raft was designed to solve the understandability problem:

comparison = {
    "Paxos": {
        "pros": ["Elegant theoretical foundation, widely proven"],
        "cons": [
            "Extremely difficult to understand and implement",
            "Multiple variants (Basic, Multi, Fast, Cheap)",
            "Hard to build practical systems on",
        ],
    },
    "Raft": {
        "pros": [
            "Designed for understandability",
            "Clear separation of concerns",
            "Well-documented with reference implementation",
            "Easier to build correct systems",
        ],
        "cons": ["May be slightly less efficient in some cases"],
    },
}

Core Concepts and Data Structures

Node States and Roles

In Raft, each node can be in one of three states:

from enum import Enum

class NodeState(Enum):
    FOLLOWER = "follower"      # Default state, receives heartbeats
    CANDIDATE = "candidate"    # Running for leader
    LEADER = "leader"          # Handling client requests

class RaftNode:
    """
    Core Raft node data structures.
    """
    
    def __init__(self, node_id):
        # Identity
        self.node_id = node_id
        
        # Persistent state (must survive restart)
        self.current_term = 0           # Monotonically increasing term
        self.voted_for = None           # Candidate voted for in current term
        self.log = []                   # Log entries (term, command, index)
        
        # Volatile state
        self.state = NodeState.FOLLOWER
        self.votes_received = set()      # Set of nodes that voted for us
        self.leader_id = None           # Current leader (for followers)
        
        # Volatile state (reinitialized after restart)
        self.commit_index = -1          # Highest committed log index (0-indexed; -1 = none)
        self.last_applied = -1          # Highest index applied to the state machine
        
        # Leader-specific volatile state
        self.next_index = {}            # Next log index to send to each follower
        self.match_index = {}           # Highest matched index for each follower
        
        # Timing
        self.election_timeout = None     # Randomized election timeout
        self.last_heartbeat = None       # Last received heartbeat

Log Entry Structure

import time
from dataclasses import dataclass
from typing import Any

@dataclass
class LogEntry:
    """A single entry in the Raft log."""
    term: int                          # Term when entry was created
    index: int                         # Position in log
    command: Any                       # State machine command
    timestamp: float = None            # When entry was created
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()

RPC Messages

Raft uses two primary RPC types:

from typing import List

@dataclass
class RequestVoteParams:
    """Parameters for RequestVote RPC."""
    term: int                 # Candidate's term
    candidate_id: int         # Candidate requesting vote
    last_log_index: int       # Index of candidate's last log entry
    last_log_term: int        # Term of candidate's last log entry

@dataclass
class RequestVoteResult:
    """Result of RequestVote RPC."""
    term: int                 # Current term for candidate to update
    vote_granted: bool        # True means candidate received vote

@dataclass
class AppendEntriesParams:
    """Parameters for AppendEntries RPC (heartbeat or log replication)."""
    term: int                 # Leader's term
    leader_id: int            # Leader's identity
    prev_log_index: int       # Index of log entry immediately preceding new entries
    prev_log_term: int        # Term of prev_log_index entry
    entries: List[LogEntry]   # Log entries to store (empty for heartbeat)
    leader_commit: int         # Leader's commit index

@dataclass
class AppendEntriesResult:
    """Result of AppendEntries RPC."""
    term: int                 # Current term for leader to update
    success: bool             # True if follower contained entry matching prev_log_index/term
    # Additional fields for optimization
    conflict_index: int = None    # First index that conflicts
    conflict_term: int = None     # Term of conflicting entry
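A heartbeat is not a separate message type: it is simply an AppendEntries RPC with an empty entries list. A standalone snippet (redeclaring a minimal version of the dataclass above) makes this concrete:

```python
from dataclasses import dataclass
from typing import Any, List

@dataclass
class AppendEntriesParams:
    """Minimal redeclaration of the RPC dataclass for a standalone demo."""
    term: int
    leader_id: int
    prev_log_index: int
    prev_log_term: int
    entries: List[Any]
    leader_commit: int

# A heartbeat carries the leader's term and commit index, asserts
# leadership, and resets followers' election timers - with no entries.
heartbeat = AppendEntriesParams(
    term=3, leader_id=1,
    prev_log_index=4, prev_log_term=3,
    entries=[],           # empty => heartbeat
    leader_commit=4,
)
assert not heartbeat.entries
```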

Leader Election

Election Mechanism

The leader election process is central to Raft:

import asyncio

class LeaderElection:
    """
    Raft leader election implementation.
    """
    
    def start_election(self):
        """
        Start a new election (called when election timeout expires).
        
        Steps:
        1. Increment term
        2. Become candidate
        3. Vote for self
        4. Request votes from all other nodes
        5. If majority received, become leader
        """
        
        # Increment term (we're starting a new election)
        self.current_term += 1
        self.state = NodeState.CANDIDATE
        
        # Vote for self
        self.voted_for = self.node_id
        self.votes_received = {self.node_id}
        
        # Persist state (critical for safety)
        self.persist()
        
        # Request votes from all other nodes
        self.request_votes_from_all()
    
    def request_votes_from_all(self):
        """Send RequestVote RPC to all other nodes."""
        
        last_log_index = len(self.log) - 1
        last_log_term = self.log[last_log_index].term if self.log else 0
        
        for peer in self.peers:
            asyncio.create_task(self.send_request_vote(
                peer,
                RequestVoteParams(
                    term=self.current_term,
                    candidate_id=self.node_id,
                    last_log_index=last_log_index,
                    last_log_term=last_log_term
                )
            ))
    
    async def send_request_vote(self, peer, params):
        """Send RequestVote RPC to a peer."""
        
        result = await peer.request_vote(params)
        
        # Handle term update
        if result.term > self.current_term:
            self.current_term = result.term
            self.state = NodeState.FOLLOWER
            self.voted_for = None
            self.persist()
            return
        
        # Check if we won
        if result.vote_granted and self.state == NodeState.CANDIDATE:
            self.votes_received.add(peer.node_id)
            
            # Check for majority (cluster size = peers + self)
            if len(self.votes_received) > (len(self.peers) + 1) // 2:
                self.become_leader()
    
    def become_leader(self):
        """Transition to leader state."""
        
        self.state = NodeState.LEADER
        self.leader_id = self.node_id
        
        # Initialize leader-specific state (0-indexed log: -1 = nothing matched yet)
        for peer in self.peers:
            self.next_index[peer.node_id] = len(self.log)
            self.match_index[peer.node_id] = -1
        
        # Immediately send heartbeats to establish leadership
        self.send_append_entries_to_all()

Voting Logic

The critical safety rule for voting:

def should_vote_for(self, params: RequestVoteParams) -> bool:
    """
    Determine if we should vote for a candidate.
    
    Safety rules:
    1. If candidate's term < our term, don't vote
    2. If we've already voted for someone else in this term, don't vote
    3. If candidate's log is not as up-to-date as ours, don't vote
    """
    
    # Rule 1: Outdated term
    if params.term < self.current_term:
        return False
    
    # Rule 2: Already voted
    if self.voted_for is not None and self.voted_for != params.candidate_id:
        return False
    
    # Rule 3: Log up-to-date check
    # Candidate's log must be at least as up-to-date as ours
    last_log_index = len(self.log) - 1
    last_log_term = self.log[last_log_index].term if self.log else 0
    
    if params.last_log_term < last_log_term:
        return False
    
    if params.last_log_term == last_log_term and params.last_log_index < last_log_index:
        return False
    
    # All rules pass - vote for candidate
    return True

def handle_request_vote(self, params: RequestVoteParams) -> RequestVoteResult:
    """Handle incoming RequestVote RPC."""
    
    # Update term if needed
    if params.term > self.current_term:
        self.current_term = params.term
        self.state = NodeState.FOLLOWER
        self.voted_for = None
        self.persist()
    
    # Check if we should vote
    if self.should_vote_for(params):
        self.voted_for = params.candidate_id
        self.persist()
        return RequestVoteResult(
            term=self.current_term,
            vote_granted=True
        )
    
    return RequestVoteResult(
        term=self.current_term,
        vote_granted=False
    )
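Rule 3 deserves a closer look, since it is what prevents a node with a stale log from winning an election. Raft's "at least as up-to-date" comparison can be isolated into a small standalone function (an illustrative sketch, not part of the article's classes):

```python
def log_up_to_date(cand_last_term: int, cand_last_index: int,
                   my_last_term: int, my_last_index: int) -> bool:
    """Raft's 'at least as up-to-date' comparison: the higher last
    term wins; with equal last terms, the longer log wins."""
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term
    return cand_last_index >= my_last_index

# A candidate with a higher last term is more up-to-date even with a
# shorter log (its entries come from a more recent leader):
assert log_up_to_date(cand_last_term=5, cand_last_index=3,
                      my_last_term=4, my_last_index=10)
# With equal last terms, the longer log is more up-to-date:
assert not log_up_to_date(5, 3, 5, 4)
```

Note that log length alone is not the criterion: the term comparison comes first, which is what guarantees that committed entries survive elections.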

Election Timeout and Randomization

import random
import asyncio

class ElectionTimeoutManager:
    """
    Manages election timeouts with randomization.
    """
    
    # Conservative defaults; the Raft paper suggests 150-300 ms election
    # timeouts (with proportionally shorter heartbeats) on fast networks
    MIN_ELECTION_TIMEOUT_MS = 1500  # 1.5 seconds
    MAX_ELECTION_TIMEOUT_MS = 3000  # 3 seconds
    HEARTBEAT_INTERVAL_MS = 500     # Heartbeats every 500ms
    
    def __init__(self, node):
        self.node = node
        self.election_timer = None
        self.heartbeat_timer = None
    
    def reset_election_timer(self):
        """Reset election timeout (called on receiving valid heartbeat)."""
        
        if self.election_timer:
            self.election_timer.cancel()
        
        # Random timeout to prevent split votes
        timeout_ms = random.randint(
            self.MIN_ELECTION_TIMEOUT_MS,
            self.MAX_ELECTION_TIMEOUT_MS
        )
        
        self.election_timer = asyncio.create_task(
            self._election_timeout_handler(timeout_ms / 1000)
        )
    
    async def _election_timeout_handler(self, timeout_seconds):
        """Handle election timeout - start election."""
        
        await asyncio.sleep(timeout_seconds)
        
        # Followers start an election; candidates restart one (e.g. after a split vote)
        if self.node.state in (NodeState.FOLLOWER, NodeState.CANDIDATE):
            # No heartbeat received and no winner - start election
            self.node.start_election()
    
    def start_leader_heartbeats(self):
        """Start sending periodic heartbeats (called when becoming leader)."""
        
        if self.heartbeat_timer:
            self.heartbeat_timer.cancel()
        
        async def send_heartbeats():
            while self.node.state == NodeState.LEADER:
                self.node.send_append_entries_to_all()
                await asyncio.sleep(self.HEARTBEAT_INTERVAL_MS / 1000)
        
        self.heartbeat_timer = asyncio.create_task(send_heartbeats())

Log Replication

The Replication Process

class LogReplication:
    """
    Raft log replication implementation.
    """
    
    def receive_client_request(self, command):
        """
        Leader receives client request.
        
        Process:
        1. Append command to local log
        2. Send AppendEntries to followers
        3. When majority acknowledge, commit and apply
        """
        
        if self.state != NodeState.LEADER:
            # Redirect to leader
            return {"error": "not_leader", "leader": self.leader_id}
        
        # Append to local log
        entry = LogEntry(
            term=self.current_term,
            index=len(self.log),
            command=command
        )
        self.log.append(entry)
        self.persist()
        
        # Replicate to followers
        self.replicate_to_followers()
        
        return {"status": "started", "index": entry.index}
    
    def replicate_to_followers(self):
        """Send AppendEntries to all followers."""
        
        for peer in self.peers:
            asyncio.create_task(self.replicate_to_follower(peer))
    
    async def replicate_to_follower(self, peer):
        """Replicate log to a specific follower."""
        
        # Get prev log info
        prev_log_index = self.next_index[peer.node_id] - 1
        prev_log_term = self.log[prev_log_index].term if prev_log_index >= 0 else 0
        
        # Get entries to send
        entries = self.log[self.next_index[peer.node_id]:]
        
        # Send AppendEntries
        params = AppendEntriesParams(
            term=self.current_term,
            leader_id=self.node_id,
            prev_log_index=prev_log_index,
            prev_log_term=prev_log_term,
            entries=entries,
            leader_commit=self.commit_index
        )
        
        result = await peer.append_entries(params)
        
        # Handle response
        if result.success:
            # Advance match/next index based on what was actually sent
            # (self.log may have grown concurrently)
            self.match_index[peer.node_id] = prev_log_index + len(entries)
            self.next_index[peer.node_id] = self.match_index[peer.node_id] + 1
            
            # Check if we can commit
            self.check_commit()
        
        else:
            # Handle failure - step back
            if result.term > self.current_term:
                # We're outdated
                self.current_term = result.term
                self.state = NodeState.FOLLOWER
                self.persist()
            
            else:
                # Log mismatch - decrement next_index and retry
                self.next_index[peer.node_id] = max(0, self.next_index[peer.node_id] - 1)
                # Retry with smaller batch
                await self.replicate_to_follower(peer)

Committing Entries

def check_commit(self):
    """
    Check if we can commit new entries.
    
    Raft commits entries when:
    1. Entry is stored on majority of nodes
    2. Leader has applied all prior entries
    """
    
    # Find the highest index that might be committed
    # Check each index from commit_index + 1
    for index in range(self.commit_index + 1, len(self.log)):
        # Count how many nodes have this entry
        count = 1  # Leader has it
        for peer in self.peers:
            if self.match_index[peer.node_id] >= index:
                count += 1
        
        # Check majority (count includes the leader) and term
        if count > (len(self.peers) + 1) // 2:
            # Check that the entry's term is current leader's term
            # (Raft doesn't commit entries from previous terms directly)
            if self.log[index].term == self.current_term:
                # Commit this entry
                self.commit_index = index
    
    # Apply newly committed entries to the state machine
    self.apply_to_state_machine()

def apply_to_state_machine(self):
    """Apply committed entries to the state machine."""
    
    while self.last_applied < self.commit_index:
        self.last_applied += 1
        entry = self.log[self.last_applied]
        
        # Apply to state machine (implementation depends on use case)
        result = self.state_machine.apply(entry.command)
        
        # Send response to client if needed (assumes entries optionally
        # carry a client_request_id alongside the command)
        request_id = getattr(entry, "client_request_id", None)
        if request_id is not None:
            self.pending_results[request_id] = result

Handling AppendEntries

def handle_append_entries(self, params: AppendEntriesParams) -> AppendEntriesResult:
    """Handle incoming AppendEntries RPC."""
    
    # Reject stale leaders outright (and do not reset the election timer)
    if params.term < self.current_term:
        return AppendEntriesResult(term=self.current_term, success=False)
    
    # Update term if needed
    if params.term > self.current_term:
        self.current_term = params.term
        self.state = NodeState.FOLLOWER
        self.voted_for = None
        self.persist()
    
    # A valid AppendEntries means a leader exists for this term, so a
    # candidate steps down; record the leader and reset the election timer
    if self.state == NodeState.CANDIDATE:
        self.state = NodeState.FOLLOWER
    self.leader_id = params.leader_id
    self.reset_election_timer()
    
    # Two leaders in one term should be impossible; reject defensively
    if self.state == NodeState.LEADER:
        return AppendEntriesResult(term=self.current_term, success=False)
    
    # Validate prev log entry
    if params.prev_log_index > len(self.log) - 1:
        return AppendEntriesResult(
            term=self.current_term,
            success=False,
            conflict_index=len(self.log),
            conflict_term=None
        )
    
    if params.prev_log_index >= 0:
        if self.log[params.prev_log_index].term != params.prev_log_term:
            # Term mismatch - find first conflicting term
            conflict_term = self.log[params.prev_log_index].term
            conflict_index = self._find_first_index_of_term(conflict_term)
            
            return AppendEntriesResult(
                term=self.current_term,
                success=False,
                conflict_index=conflict_index,
                conflict_term=conflict_term
            )
    
    # Append new entries, truncating only on an actual conflict
    # (a delayed, stale RPC must not wipe out newer matching entries)
    idx = params.prev_log_index + 1
    i = 0
    while (i < len(params.entries) and idx + i < len(self.log)
           and self.log[idx + i].term == params.entries[i].term):
        i += 1
    if i < len(params.entries):
        self.log = self.log[:idx + i] + params.entries[i:]
    self.persist()
    
    # Update commit index
    if params.leader_commit > self.commit_index:
        self.commit_index = min(params.leader_commit, len(self.log) - 1)
        self.apply_to_state_machine()
    
    return AppendEntriesResult(term=self.current_term, success=True)

Safety Guarantees

Election Safety

Raft guarantees that at most one leader exists at any time:

safety_guarantees = {
    "election_safety": "At most one leader per term",
    "leader_append_only": "Leader never overwrites or deletes entries",
    "log_matching": "If two logs contain an entry with same index and term, logs are identical",
    "leader_completeness": "If entry is committed, it will appear in all future leaders",
    "state_machine_safety": "All nodes apply same log entries in same order"
}

# Election safety implementation:
# - A node only votes for one candidate per term
# - A candidate must have log as up-to-date as voter to get vote
# - Majority required to become leader
# - Terms monotonically increase
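The majority requirement enforces election safety through quorum intersection: any two majorities of the same cluster share at least one node, and that node votes at most once per term. A brute-force check of this property (an illustrative sketch, not part of the implementation above):

```python
from itertools import combinations

nodes = set(range(5))        # a 5-node cluster
q = len(nodes) // 2 + 1      # majority quorum size = 3

# Any two majority quorums intersect, so two candidates can never
# both collect a majority of votes in the same term.
for a in combinations(nodes, q):
    for b in combinations(nodes, q):
        assert set(a) & set(b), "disjoint majorities should be impossible"
```

The same intersection argument underlies leader completeness: a new leader's election quorum must overlap the quorum that committed any earlier entry.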

Log Matching Property

The log matching property ensures consistency:

def _find_first_index_of_term(self, term: int) -> int:
    """Find first index with given term."""
    for i, entry in enumerate(self.log):
        if entry.term == term:
            return i
    return len(self.log)  # Not found

def _find_last_index_of_term(self, term: int) -> int:
    """Find last index with given term."""
    for i in range(len(self.log) - 1, -1, -1):
        if self.log[i].term == term:
            return i
    return -1  # Not found
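These helpers support the conflict back-tracking optimization from the `AppendEntriesResult` fields above: instead of decrementing `next_index` one entry at a time, the leader can jump past an entire conflicting term. A sketch of how a leader might consume `conflict_term`/`conflict_index` (function name and representation are illustrative):

```python
from typing import List

def next_index_after_conflict(leader_log_terms: List[int],
                              conflict_term: int,
                              conflict_index: int) -> int:
    """Fast backtracking: if the leader has entries for conflict_term,
    resume just after its last entry of that term; otherwise jump
    straight to the follower's reported conflict_index."""
    for i in range(len(leader_log_terms) - 1, -1, -1):
        if leader_log_terms[i] == conflict_term:
            return i + 1
    return conflict_index

# Leader log terms by index:       [1, 1, 2, 2, 3]
# Follower reports a conflict in term 2 starting at index 1:
assert next_index_after_conflict([1, 1, 2, 2, 3], 2, 1) == 4
# Leader has no entries of term 4 -> jump to the follower's conflict_index:
assert next_index_after_conflict([1, 1, 2, 2, 3], 4, 2) == 2
```

This reduces catch-up for a lagging follower from one RPC per entry to roughly one RPC per divergent term.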

Membership Changes

Joint Consensus

Adding or removing nodes safely requires a two-phase approach:

class MembershipChange:
    """
    Safe membership changes using joint consensus.
    """
    
    async def add_node(self, new_node):
        """
        Add a new node using joint consensus.
        
        Phase 1: New configuration (old + new)
        - All operations require majority of (old + new)
        - Both old and new nodes must replicate
        
        Phase 2: New configuration alone
        - Switch to new configuration
        - Old nodes can be shut down
        """
        
        # Add new node as non-voting member first
        self.cluster_config.add_member(new_node, voting=False)
        
        # Wait for node to catch up (replicate recent entries)
        await self.wait_for_catchup(new_node)
        
        # Promote to voting member - Joint consensus
        joint_config = ClusterConfig(
            old_members=self.cluster_config.members,
            new_members=self.cluster_config.members | {new_node},
            joint=True
        )
        
        # Commit joint configuration
        self.log_append(ConfigurationChange(joint_config))
        
        # Wait for joint config to commit on majority
        await self.wait_for_commit(joint_config)
        
        # Switch to new configuration
        new_config = ClusterConfig(members=joint_config.new_members)
        self.log_append(ConfigurationChange(new_config))
        
        # Old nodes can now be removed if needed
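The defining rule of the joint phase is that a decision (election or commit) needs a majority of the old configuration and a majority of the new one, independently. A small standalone check of that rule (function name is illustrative):

```python
def joint_majority(votes: set, old_members: set, new_members: set) -> bool:
    """Joint consensus: a decision requires a majority of the old
    configuration AND a majority of the new configuration."""
    def has_majority(members: set) -> bool:
        return len(votes & members) > len(members) // 2
    return has_majority(old_members) and has_majority(new_members)

old = {1, 2, 3}
new = {1, 2, 3, 4, 5}
# Nodes 1, 2, 4 form a majority of both configurations...
assert joint_majority({1, 2, 4}, old, new)
# ...but 3, 4, 5 is a majority of the new configuration only.
assert not joint_majority({3, 4, 5}, old, new)
```

This double-majority rule is what prevents the old and new configurations from electing different leaders during the transition.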

Log Compaction and Snapshots

Why Snapshots Are Needed

Logs grow indefinitely, requiring compaction:

class LogCompaction:
    """
    Raft log compaction using snapshots.
    """
    
    def create_snapshot(self):
        """
        Create a snapshot of the current state.
        
        Snapshot contains:
        - Last included index
        - Last included term
        - State machine state
        """
        
        snapshot = Snapshot(
            last_included_index=self.last_applied,
            last_included_term=self.log[self.last_applied].term,
            state=self.state_machine.serialize()
        )
        
        # Save snapshot to disk
        self.save_snapshot(snapshot)
        
        # Discard entries covered by the snapshot (a full implementation
        # must then offset log indices by last_included_index)
        self.log = self.log[self.last_applied + 1:]
        
        # Optionally delete older snapshots
        self.persist()
    
    async def install_snapshot(self, peer, params: InstallSnapshotParams):
        """
        Send snapshot to a follower that's far behind.
        
        This happens when follower's next_index is behind leader's
        truncated log (due to snapshot).
        """
        
        # Send snapshot in chunks (CHUNK_SIZE is an implementation-chosen
        # constant, e.g. 64 KiB)
        offset = 0
        while offset < len(params.data):
            chunk = params.data[offset:offset + CHUNK_SIZE]
            
            await peer.send_snapshot_chunk(SnapshotChunk(
                term=self.current_term,
                leader_id=self.node_id,
                offset=offset,
                data=chunk,
                done=(offset + CHUNK_SIZE >= len(params.data))
            ))
            
            offset += CHUNK_SIZE

Practical Implementation Tips

Persistence

Critical data that must be persisted:

import os
import pickle

class PersistenceManager:
    """
    Handles persistence of Raft state.
    """
    
    def persist(self):
        """Save all persistent state to disk."""
        
        data = {
            "currentTerm": self.current_term,
            "votedFor": self.voted_for,
            "log": [(e.term, e.command) for e in self.log]
        }
        
        # Atomic, durable write: flush and fsync before the rename
        temp_file = self.persist_path + ".tmp"
        with open(temp_file, 'wb') as f:
            pickle.dump(data, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(temp_file, self.persist_path)
    
    def recover(self):
        """Recover state from disk on startup."""
        
        if not os.path.exists(self.persist_path):
            return
        
        with open(self.persist_path, 'rb') as f:
            data = pickle.load(f)
        
        self.current_term = data["currentTerm"]
        self.voted_for = data["votedFor"]
        self.log = [LogEntry(term=t, index=i, command=c) 
                    for i, (t, c) in enumerate(data["log"])]

Performance Considerations

performance_tips = {
    "batching": "Batch multiple client requests into a single log append",
    "pipelining": "Pipeline AppendEntries RPCs for higher throughput",
    "snapshots": "Snapshot regularly to bound log size",
    "async_rpcs": "Make RPCs asynchronous for better concurrency",
    "serialization": "Use efficient encodings (Protocol Buffers, MessagePack)"
}

# Common issues to avoid:
pitfalls = [
    "Forgetting to persist state before responding to RPCs",
    "Not discarding stale responses after term changes",
    "Infinite retry loops on failed AppendEntries",
    "Not randomizing election timeouts enough",
    "Ignoring InstallSnapshot RPCs from the leader"
]

Monitoring and Debugging

Key Metrics

metrics_to_monitor = {
    "leader_election": [
        "Number of elections per minute",
        "Election timeout frequency",
        "Split vote frequency"
    ],
    "log_replication": [
        "Commit latency (P50, P95, P99)",
        "Replication lag per follower",
        "Number of uncommitted entries"
    ],
    "snapshots": [
        "Snapshot creation frequency",
        "Snapshot size",
        "Time to create/install snapshot"
    ],
    "resource": [
        "Log size",
        "Memory usage",
        "Disk I/O"
    ]
}

When to Use Raft

Use Raft When

use_cases = {
    "distributed_databases": ["etcd", "Consul", "CockroachDB", "TiKV"],
    "coordination": ["Service discovery", "Lock service", "Configuration management"],
    "state_replication": ["Distributed caches", "Leader election", "Namespaces"]
}

choose_raft_when = [
    "You need strong consistency",
    "Number of nodes is small (< 7)",
    "You value understandability and maintainability",
    "Network partitions are rare",
    "Simple deployment is preferred"
]

consider_alternatives_when = [
    "Eventual consistency is acceptable",
    "You need very high availability across datacenters",
    "You need to scale to many nodes",
    "Byzantine fault tolerance is required"
]

Conclusion

The Raft consensus algorithm provides a practical solution to the distributed consensus problem. Its design for understandability makes it an excellent choice for building reliable distributed systems.

Key takeaways:

  • Leader election uses randomized timeouts and majority voting to ensure safety
  • Log replication ensures all nodes apply the same commands in the same order
  • Safety guarantees prevent split brains and ensure consistency
  • Snapshots enable log compaction for long-running systems
  • Membership changes use joint consensus for safe transitions

Raft’s clarity and proven correctness make it the go-to choice for most consensus needs in modern distributed systems.

