Building distributed systems requires understanding fundamental challenges and patterns. This guide covers the core concepts that every architect and engineer should know — from the CAP theorem and consensus algorithms to modern approaches like CRDTs and gossip protocols.
Why Distributed Systems?
Applications grow beyond what a single machine can handle. Distributed systems spread work across multiple nodes to gain the following benefits:
✓ Scalability - Add more machines to handle load
✓ Reliability - No single point of failure
✓ Performance - Parallel processing across machines
✓ Cost-effective - Use commodity hardware
✓ Availability - Continue running during failures
✓ Geographic distribution - Serve users worldwide
The Fallacies of Distributed Computing
These are eight false assumptions that developers commonly make and that lead to failures in distributed systems. L. Peter Deutsch and others at Sun Microsystems formulated them in the 1990s, and they remain relevant today:
- The network is reliable — Networks drop packets, disconnect, and experience latency spikes.
- Latency is zero — Every network call adds measurable delay. A local in-memory call takes nanoseconds; a cross-datacenter RPC takes milliseconds.
- Bandwidth is infinite — Network capacity is finite. Sending large payloads causes congestion.
- The network is secure — Networks are subject to eavesdropping, man-in-the-middle attacks, and unauthorized access.
- Topology doesn’t change — Network configurations change: servers are added, removed, or relocated.
- There is one administrator — Real systems span teams and organizations with different policies.
- Transport cost is zero — Serialization, compression, and protocol overhead consume CPU and memory.
- The network is homogeneous — Real networks mix hardware, OS versions, protocol stacks, and configurations.
Internalize these fallacies before designing any distributed system. Every architectural decision should account for the fact that the network is unreliable, latency matters, and failures are partial.
The CAP Theorem
Understanding CAP
The CAP theorem states that a distributed system can only guarantee two of three properties simultaneously:
flowchart LR
subgraph CAP[CAP Theorem]
direction TB
C[Consistency<br/>All nodes see same data]
A[Availability<br/>Every request gets a response]
P[Partition Tolerance<br/>System works despite network splits]
end
C --- A
A --- P
P --- C
CP[CP Systems<br/>ZooKeeper, etcd<br/>Strong consistency during partitions] -.-> C
CP -.-> P
AP[AP Systems<br/>Cassandra, DynamoDB<br/>High availability during partitions] -.-> A
AP -.-> P
Since network partitions are inevitable, the real choice is between CP (consistency + partition tolerance) and AP (availability + partition tolerance). A CA system — one that sacrifices partition tolerance — only works when there are no network partitions, which is impossible to guarantee.
Consistency Models
Strong Consistency:
Write → All nodes see write immediately
Reads always return the most recent write
Used in: databases, coordination services (etcd, ZooKeeper)
Eventual Consistency:
Write → Changes propagate asynchronously
Reads may return stale data for a window
Used in: DNS, CDNs, Amazon DynamoDB
Read-your-writes Consistency:
After a write, the same client always sees that write
Other clients may see stale data briefly
Used in: user session stores
Causal Consistency:
If event A caused event B, all nodes see A before B
Concurrent events can be seen in any order
Used in: collaborative editing, social feeds
Monotonic Reads:
Once a client reads a value, subsequent reads never return older values
Used in: analytics dashboards
Choosing a consistency model means choosing tradeoffs. Strong consistency simplifies application logic but reduces availability and throughput. Weaker models enable better performance but require the application to handle temporary inconsistencies.
Time and Ordering
In a distributed system, there is no global clock. Each machine has its own clock, and clocks drift. This makes determining the order of events fundamentally hard.
Why Time Matters
Ordering is essential for:
- Causality — Understanding which event happened before another
- Consistency — Ensuring replicas agree on update order
- Transactions — Serializing concurrent operations
- Debugging — Reconstructing event timelines across nodes
Physical Clocks and NTP
Network Time Protocol (NTP) synchronizes machine clocks but cannot eliminate drift entirely. Clock skew between machines can reach tens to hundreds of milliseconds. Google's TrueTime, which combines GPS receivers and atomic clocks, keeps clock uncertainty within a few milliseconds, but it depends on specialized hardware that most deployments do not have.
Lamport Clocks
A Lamport clock assigns a monotonically increasing integer to each event. If event A causally precedes event B, then clock(A) < clock(B). The converse does not hold: clock(A) < clock(B) does not imply that A caused B, because the two events may be concurrent.
class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self):
        """Increment on local event."""
        self.time += 1
        return self.time

    def update(self, other_time):
        """Update on receiving a message."""
        self.time = max(self.time, other_time) + 1

    def __repr__(self):
        return f"LamportClock({self.time})"
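As a rough illustration of the rules above, the sketch below lets two hypothetical nodes, `a` and `b`, exchange a single message. It redefines a compact clock so that it runs on its own; having `update` return the new time is an assumption added for convenience.

```python
class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self):
        """Advance the clock for a local event or a send."""
        self.time += 1
        return self.time

    def update(self, other_time):
        """Advance past the timestamp carried by a received message."""
        self.time = max(self.time, other_time) + 1
        return self.time


a, b = LamportClock(), LamportClock()

a.tick()                   # local event on a, a.time == 1
sent = a.tick()            # a sends a message stamped 2
b.tick()                   # unrelated local event on b, b.time == 1
received = b.update(sent)  # b receives: max(1, 2) + 1 == 3

print(sent, received)      # 2 3
```

The send is stamped lower than the receive, which is the only ordering a Lamport clock promises. The earlier local event on `b` also carries the value 1, yet it is causally unrelated to anything on `a`.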
Vector Clocks
Vector clocks track causality more precisely by maintaining one counter per node. This enables detection of concurrent updates — a critical capability for conflict resolution.
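class VectorClock:
    def __init__(self):
        self.clock = {}  # node id -> counter

    def increment(self, node):
        self.clock[node] = self.clock.get(node, 0) + 1

    def merge(self, other):
        """Take the element-wise maximum of both clocks."""
        for node, time in other.clock.items():
            self.clock[node] = max(self.clock.get(node, 0), time)

    def happens_before(self, other):
        """True if self <= other on every node and strictly less on at least one."""
        nodes = set(self.clock) | set(other.clock)
        no_greater = all(self.clock.get(n, 0) <= other.clock.get(n, 0) for n in nodes)
        some_less = any(self.clock.get(n, 0) < other.clock.get(n, 0) for n in nodes)
        return no_greater and some_less

    def concurrent(self, other):
        """True if each clock is ahead of the other on at least one node."""
        nodes = set(self.clock) | set(other.clock)
        ahead = any(self.clock.get(n, 0) > other.clock.get(n, 0) for n in nodes)
        behind = any(self.clock.get(n, 0) < other.clock.get(n, 0) for n in nodes)
        return ahead and behind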
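To make the four possible outcomes of a comparison concrete, here is a small standalone sketch that classifies two vector clocks given as plain dictionaries. The `compare` helper and the sample clocks are illustrative assumptions, not part of any particular library.

```python
def compare(a, b):
    """Classify two vector clocks given as {node: counter} dicts."""
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"


v1 = {"n1": 2, "n2": 1}
v2 = {"n1": 2, "n2": 3}
v3 = {"n1": 3, "n2": 0}

print(compare(v1, v2))  # before
print(compare(v2, v1))  # after
print(compare(v2, v3))  # concurrent
print(compare(v1, v1))  # equal
```

Only the "concurrent" outcome signals a genuine conflict that needs resolving; "before" and "after" mean one version can safely replace the other.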
Hybrid Clocks
Hybrid clocks combine the readability of wall-clock timestamps with the ordering guarantees of logical clocks. They are used in production systems such as CockroachDB and YugabyteDB.
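import time


class HybridClock:
    def __init__(self):
        self.wall_clock = time.time()
        self.logical = 0

    def now(self):
        """Return a (wall_clock, logical) pair that never moves backward."""
        current_wall = time.time()
        if current_wall > self.wall_clock:
            # Physical time advanced: adopt it and reset the logical counter.
            self.wall_clock = current_wall
            self.logical = 0
        else:
            # Physical time stalled or went backward: order events with the counter.
            self.logical += 1
        return (self.wall_clock, self.logical)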
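The class above covers local events only. A hybrid clock also has to merge the timestamp carried by each incoming message, and the sketch below shows one common way to do that, following the usual hybrid logical clock update rule. The `HLC` name, the injectable `physical` function, and the integer millisecond units are assumptions made to keep the example deterministic.

```python
import time


class HLC:
    def __init__(self, physical=lambda: int(time.time() * 1000)):
        self.physical = physical  # source of physical time, injectable for testing
        self.l = 0                # highest physical time seen so far
        self.c = 0                # logical counter breaking ties within self.l

    def now(self):
        """Timestamp a local event or an outgoing message."""
        pt = self.physical()
        if pt > self.l:
            self.l, self.c = pt, 0
        else:
            self.c += 1
        return (self.l, self.c)

    def update(self, remote):
        """Merge the (l, c) timestamp of a received message."""
        rl, rc = remote
        new_l = max(self.l, rl, self.physical())
        if new_l == self.l and new_l == rl:
            self.c = max(self.c, rc) + 1
        elif new_l == self.l:
            self.c += 1
        elif new_l == rl:
            self.c = rc + 1
        else:
            self.c = 0
        self.l = new_l
        return (self.l, self.c)


fast = HLC(physical=lambda: 200)  # node whose clock runs ahead
slow = HLC(physical=lambda: 100)  # node whose clock lags

stamp = fast.now()           # (200, 0)
merged = slow.update(stamp)  # (200, 1)
later = slow.now()           # (200, 2)
print(stamp, merged, later)
```

Even though the receiving node's physical clock lags by 100 ms, its timestamps still sort after the message it received, so causality is preserved when the tuples are compared.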
Consensus Algorithms
Consensus allows a group of nodes to agree on a value despite failures. It is the foundation for leader election, distributed locking, state machine replication, and atomic commits.
Raft Algorithm
Raft is a consensus algorithm designed for understandability. It breaks consensus into three sub-problems: leader election, log replication, and safety.
Leader election — Nodes are in one of three states:
stateDiagram-v2
[*] --> Follower: start
Follower --> Candidate: election timeout
Candidate --> Leader: wins majority
Candidate --> Follower: higher term found
Leader --> Follower: discovers higher term
Follower --> Follower: receives heartbeat
Log replication — The leader receives client requests, appends them to its log, and replicates entries to followers. An entry is committed once it has been replicated to a majority of nodes.
class RaftNode:
    """Simplified Raft node: no timers, persistence, or per-follower next_index."""

    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers        # the other nodes in the cluster (excludes this node)
        self.current_term = 0
        self.state = "follower"
        self.log = []             # entries are dicts with a "term" key
        self.commit_index = -1    # index of the highest committed entry, -1 if none

    def start_election(self):
        self.current_term += 1
        self.state = "candidate"
        self.votes = 1            # vote for self
        for peer in self.peers:
            # request_vote is assumed to be implemented by each peer.
            if peer.request_vote(self.current_term, self.node_id, len(self.log)):
                self.votes += 1
        # A majority is counted over the full cluster: peers plus this node.
        if self.votes > (len(self.peers) + 1) // 2:
            self.state = "leader"
            self.broadcast_append_entries()

    def broadcast_append_entries(self):
        # Simplification: resend every entry after the last committed one.
        prev_log_index = self.commit_index
        prev_log_term = self.log[prev_log_index]["term"] if prev_log_index >= 0 else 0
        for peer in self.peers:
            peer.append_entries(
                term=self.current_term,
                leader_id=self.node_id,
                prev_log_index=prev_log_index,
                prev_log_term=prev_log_term,
                entries=self.log[prev_log_index + 1:],
                leader_commit=self.commit_index,
            )

    def append_entries(self, term, leader_id, prev_log_index, prev_log_term, entries, leader_commit):
        if term < self.current_term:
            return False
        self.current_term = term
        self.state = "follower"
        # Reject unless the local log holds a matching entry at prev_log_index.
        if prev_log_index >= 0 and (
            prev_log_index >= len(self.log)
            or self.log[prev_log_index]["term"] != prev_log_term
        ):
            return False
        self.log = self.log[:prev_log_index + 1] + entries
        self.commit_index = min(leader_commit, len(self.log) - 1)
        return True
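The rule that an entry commits once a majority stores it can be sketched as a small calculation over replication progress. The function below is a hedged illustration: `match_index` borrows its name from the Raft paper, while `majority_commit_index` and the sample cluster are assumptions. A full implementation would also check that the entry at the returned index belongs to the leader's current term before committing it.

```python
def majority_commit_index(match_index, leader_last_index):
    """Highest log index stored on a majority of the cluster.

    match_index: {follower_id: highest index known to be replicated there}
    leader_last_index: index of the leader's own last entry
    """
    indexes = sorted(list(match_index.values()) + [leader_last_index], reverse=True)
    majority = len(indexes) // 2 + 1
    # The majority-th highest index is held by at least `majority` nodes.
    return indexes[majority - 1]


# Five-node cluster: leader at index 7, followers at 7, 5, 4, and 2.
print(majority_commit_index({"b": 7, "c": 5, "d": 4, "e": 2}, 7))  # 5

# Three-node cluster: leader at index 4, followers at 3 and 1.
print(majority_commit_index({"b": 3, "c": 1}, 4))  # 3
```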
Paxos
Paxos provides the same guarantees as Raft but through a different mechanism. It uses three roles — proposer, acceptor, and learner — across two phases:
- Prepare phase: Proposer picks a proposal number N and sends prepare(N) to acceptors. Acceptors promise not to accept proposals numbered less than N and return any value they have already accepted.
- Accept phase: If the proposer receives promises from a majority, it sends accept(N, value) with either the value from the highest-numbered prior proposal or its own value. Acceptors accept unless they have already promised a higher-numbered proposal.
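The two phases above can be sketched for a single decision (single-decree Paxos) with in-process objects standing in for network calls. This is a minimal illustration under those assumptions: the `Acceptor` class and `propose` function are hypothetical names, proposal numbers start at 1, and there are no retries, learners, or failures.

```python
class Acceptor:
    def __init__(self):
        self.promised = 0           # highest proposal number promised so far
        self.accepted_n = 0         # number of the accepted proposal, 0 if none
        self.accepted_value = None

    def prepare(self, n):
        """Phase 1: promise to ignore proposals numbered below n."""
        if n > self.promised:
            self.promised = n
            return True, self.accepted_n, self.accepted_value
        return False, None, None

    def accept(self, n, value):
        """Phase 2: accept unless a higher-numbered promise was made."""
        if n >= self.promised:
            self.promised = n
            self.accepted_n = n
            self.accepted_value = value
            return True
        return False


def propose(acceptors, n, value):
    """Run both phases for one proposal; return the chosen value or None."""
    majority = len(acceptors) // 2 + 1
    promises = [p for p in (a.prepare(n) for a in acceptors) if p[0]]
    if len(promises) < majority:
        return None
    # Adopt the value of the highest-numbered proposal already accepted, if any.
    prior_n, prior_value = max(((p[1], p[2]) for p in promises), key=lambda t: t[0])
    if prior_n > 0:
        value = prior_value
    accepted = sum(1 for a in acceptors if a.accept(n, value))
    return value if accepted >= majority else None


acceptors = [Acceptor() for _ in range(3)]
print(propose(acceptors, 1, "blue"))   # blue
print(propose(acceptors, 2, "green"))  # blue
print(propose(acceptors, 1, "red"))    # None
```

The second proposer wanted "green" but had to adopt "blue" because a majority had already accepted it. That adoption rule is what keeps a chosen value from ever changing.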
Paxos is provably correct but notoriously hard to understand and implement correctly. Many modern systems (etcd, Consul, TiKV) chose Raft instead because it offers equivalent guarantees with a simpler model.
Practical Consensus Systems
| System | Algorithm | Use Case |
|---|---|---|
| etcd | Raft | Kubernetes coordination, service discovery |
| ZooKeeper | Zab (Paxos-like) | Configuration management, distributed locking |
| Consul | Raft | Service mesh, KV store, health checking |
| TiKV | Raft | Distributed database storage |
| Apache Kafka | Quorum-based | Event streaming, log replication |
| Google Chubby | Paxos | Distributed lock service |
Leader Election Patterns
Leader election ensures that only one node acts as the active coordinator at any time. Many systems implement it atop consensus services like etcd or ZooKeeper.
etcd-based Leader Election
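import etcd3


class LeaderElector:
    """Elect a leader using an etcd key attached to a lease.

    Assumes the python-etcd3 client, for example client = etcd3.client().
    """

    def __init__(self, client, election_name, node_id, ttl=10):
        self.client = client
        self.key = f"/elections/{election_name}"
        self.node_id = node_id
        self.ttl = ttl
        self.lease = None
        self.is_leader = False

    def campaign(self):
        # The key vanishes when the lease expires, so a crashed leader is replaced.
        self.lease = self.client.lease(self.ttl)
        # Atomically create the key only if it does not exist yet (version 0).
        won, _ = self.client.transaction(
            compare=[self.client.transactions.version(self.key) == 0],
            success=[self.client.transactions.put(self.key, self.node_id, lease=self.lease)],
            failure=[],
        )
        self.is_leader = won
        if won:
            print(f"Node {self.node_id} is now leader")
        else:
            self.lease.revoke()
            current_leader, _ = self.client.get(self.key)
            print(f"Leader is {current_leader}")
        return won

    def keep_alive(self):
        """Call periodically, well within ttl, to hold leadership."""
        if self.is_leader:
            self.lease.refresh()

    def resign(self):
        if self.is_leader:
            self.lease.revoke()  # revoking the lease deletes the attached key
            self.is_leader = False

    def watch(self, callback):
        events, cancel = self.client.watch(self.key)
        for event in events:
            callback(event)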
Lease-based Leader Election
Leases prevent split-brain scenarios by bounding how long a node can act as leader without renewal.
import time
import threading


class Lease:
    """Time-bound lease for leader election."""

    def __init__(self, duration=10):
        self.duration = duration
        # The monotonic clock is immune to wall-clock adjustments.
        self.expires_at = time.monotonic() + duration
        self.lock = threading.Lock()

    def renew(self):
        with self.lock:
            self.expires_at = time.monotonic() + self.duration

    def is_expired(self):
        with self.lock:
            return time.monotonic() > self.expires_at

    def remaining(self):
        with self.lock:
            return max(0, self.expires_at - time.monotonic())
Fault Tolerance
Failure Detection
Failure detectors are the nervous system of a distributed system. They determine which nodes are alive and which have failed.
Heartbeat-based — The simplest approach: nodes send periodic “I am alive” messages. If N heartbeats are missed, the node is declared failed.
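The Phi Accrual Failure Detector, used by Cassandra and Akka, replaces the fixed timeout with a suspicion level (phi) computed from historical heartbeat inter-arrival times. Phi is the negative base-10 logarithm of the probability that a heartbeat could still arrive this late, so a phi of 1 means roughly a 10% chance that suspecting the node is a mistake, and a phi of 8 lowers that chance to about one in 100 million.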
import math
import statistics
import time
from collections import defaultdict, deque


class PhiAccrualDetector:
    def __init__(self, phi_threshold=8, window_size=1000):
        # Keep a bounded window of inter-arrival times for each node separately.
        self.history = defaultdict(lambda: deque(maxlen=window_size))
        self.last_heartbeat = {}
        self.phi_threshold = phi_threshold
        self.window_size = window_size

    def record_heartbeat(self, node_id):
        now = time.time()
        if node_id in self.last_heartbeat:
            self.history[node_id].append(now - self.last_heartbeat[node_id])
        self.last_heartbeat[node_id] = now

    def phi(self, node_id):
        if node_id not in self.last_heartbeat:
            return float("inf")
        elapsed = time.time() - self.last_heartbeat[node_id]
        intervals = self.history[node_id]
        if len(intervals) < 2:
            return 0.0
        mean = statistics.mean(intervals)
        std_dev = statistics.stdev(intervals)
        if std_dev == 0:
            return 0.0 if elapsed < mean else float("inf")
        # Probability, under a normal model, that a heartbeat would have arrived by now.
        cdf = 0.5 * (1 + math.erf((elapsed - mean) / (std_dev * math.sqrt(2))))
        if cdf >= 1.0:
            return float("inf")  # avoid log10(0) once the tail underflows
        return -math.log10(1 - cdf)

    def is_suspected(self, node_id):
        return self.phi(node_id) > self.phi_threshold
Circuit Breaker
A circuit breaker prevents cascading failures by failing fast when a dependency is unhealthy.
from enum import Enum
import time


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, success_threshold=3, timeout=30):
        self.failure_count = 0
        self.success_count = 0
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self.last_failure_time and time.time() - self.last_failure_time > self.timeout:
                # Timeout elapsed: let trial calls through and count them afresh.
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.on_failure()
            raise
        self.on_success()
        return result

    def on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        # Any failure during a trial call reopens the circuit immediately.
        if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
Quorums
A quorum is the minimum number of nodes that must participate in an operation for it to be valid. For N replicas, write quorum W and read quorum R must satisfy R + W > N for strong consistency.
class Quorum:
    def __init__(self, nodes, read_quorum=None, write_quorum=None):
        self.nodes = nodes
        self.n = len(nodes)
        # Default both quorums to a simple majority.
        self.r = read_quorum or (self.n // 2 + 1)
        self.w = write_quorum or (self.n // 2 + 1)

    def write(self, key, value):
        successes = 0
        for node in self.nodes:
            if node.write(key, value):
                successes += 1
            if successes >= self.w:
                return True
        return False

    def read(self, key):
        values = []
        for node in self.nodes:
            val = node.read(key)
            if val is not None:
                values.append(val)
            if len(values) >= self.r:
                return self.resolve(values)
        return None

    def resolve(self, values):
        # Pick the newest version among the replies.
        return max(values, key=lambda v: v.timestamp)
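The condition R + W > N works because it forces every read set to share at least one node with every write set. The brute-force check below is a small sketch of that argument; the function name `always_intersect` and the sample configurations are assumptions chosen for illustration.

```python
from itertools import combinations


def always_intersect(n, r, w):
    """Does every read set of size r share a node with every write set of size w?"""
    nodes = range(n)
    return all(
        set(read_set) & set(write_set)
        for read_set in combinations(nodes, r)
        for write_set in combinations(nodes, w)
    )


for n, r, w in [(3, 2, 2), (3, 1, 2), (5, 3, 3), (5, 2, 3)]:
    # Enumeration agrees with the arithmetic rule in every case.
    assert always_intersect(n, r, w) == (r + w > n)
    print(f"N={n} R={r} W={w}: overlap guaranteed = {r + w > n}")
```

With N=3, R=1, W=2, a read can land on the single replica that missed the latest write, which is why that configuration can return stale data.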
Distributed Transactions
Distributed transactions coordinate state changes across multiple nodes. They trade performance for correctness guarantees.
Two-Phase Commit (2PC)
2PC coordinates transactions across multiple participants using a coordinator:
sequenceDiagram
participant C as Coordinator
participant P1 as Participant 1
participant P2 as Participant 2
C->>P1: Prepare
C->>P2: Prepare
P1-->>C: Ready
P2-->>C: Ready
C->>P1: Commit
C->>P2: Commit
P1-->>C: Ack
P2-->>C: Ack
2PC blocks if the coordinator fails after participants have voted. Three-Phase Commit (3PC) adds a pre-commit phase to avoid blocking, at the cost of an extra round of messages, and it is only safe when the network does not partition.
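The message flow in the diagram can be sketched with in-process objects in place of network calls. This is a simplified illustration: the `Participant` class, its `can_commit` flag, and the `two_phase_commit` function are hypothetical, and the sketch omits the timeouts, durable logging, and recovery that a real coordinator needs.

```python
class Participant:
    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit  # stands in for local validation and locking
        self.state = "init"

    def prepare(self):
        """Phase 1: vote yes only if the local transaction can be committed."""
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self):
        self.state = "committed"

    def abort(self):
        self.state = "aborted"


def two_phase_commit(participants):
    # Phase 1: collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    # Phase 2: commit only on a unanimous yes, otherwise abort everywhere.
    if all(votes):
        for p in participants:
            p.commit()
        return "committed"
    for p in participants:
        p.abort()
    return "aborted"


print(two_phase_commit([Participant("orders"), Participant("payments")]))
print(two_phase_commit([Participant("orders"), Participant("payments", can_commit=False)]))
```

A participant in the "prepared" state has promised to commit and cannot decide on its own, which is where the blocking behavior comes from when the coordinator disappears between the two phases.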
Saga Pattern
Sagas break a distributed transaction into a sequence of local transactions, each with a compensating action for rollback. Sagas avoid the locking and blocking problems of 2PC.
class SagaStep:
    def __init__(self, name, execute, compensate):
        self.name = name
        self.execute = execute
        self.compensate = compensate


class Saga:
    def __init__(self, steps):
        self.steps = steps

    async def run(self, context):
        completed = []
        for step in self.steps:
            try:
                await step.execute(context)
                completed.append(step)
            except Exception as e:
                print(f"Step {step.name} failed: {e}")
                # Undo the completed steps in reverse order.
                for comp_step in reversed(completed):
                    try:
                        await comp_step.compensate(context)
                    except Exception as ce:
                        print(f"Compensation for {comp_step.name} failed: {ce}")
                raise


booking_saga = Saga([
    SagaStep("book_flight", book_flight, cancel_flight),
    SagaStep("book_hotel", book_hotel, cancel_hotel),
    SagaStep("process_payment", process_payment, refund_payment),
])
When to Use Each
| Approach | Consistency | Performance | Failure Handling | Use When |
|---|---|---|---|---|
| 2PC | Strong | Low | Blocks on coordinator failure | Small number of participants, strong consistency needed |
| 3PC | Strong (assuming no partitions) | Low | Non-blocking on coordinator failure | Blocking is unacceptable and network partitions can be ruled out |
| Saga | Eventual | High | Compensating actions | Long-running transactions, high throughput needed |
Data Replication
Replication Strategies
Synchronous replication waits for all replicas to acknowledge before confirming a write. It provides the strongest consistency but the highest latency.
Asynchronous replication writes to the primary and returns immediately, replicating changes in the background. It offers low latency but risks data loss on primary failure.
Quorum-based replication writes to W of N replicas and reads from R of N replicas, where R + W > N guarantees strong consistency.
class ReplicatedStore:
    def __init__(self, strategy="quorum", replicas=None):
        self.strategy = strategy
        self.replicas = replicas or []
        self.primary = self.replicas[0] if self.replicas else None

    def write(self, key, value):
        if self.strategy == "sync":
            # Succeed only if every replica acknowledges.
            return all(r.write(key, value) for r in self.replicas)
        elif self.strategy == "async":
            # Acknowledge after the primary write; replicate in the background.
            self.primary.write(key, value)
            for r in self.replicas[1:]:
                r.async_write(key, value)
            return True
        elif self.strategy == "quorum":
            # Succeed once a majority of replicas acknowledge.
            successes = sum(1 for r in self.replicas if r.write(key, value))
            return successes >= len(self.replicas) // 2 + 1
        raise ValueError(f"Unknown replication strategy: {self.strategy}")
Gossip Protocols
Gossip protocols disseminate information through a cluster by having each node periodically exchange state with a random subset of peers. Information spreads exponentially — reaching all nodes in O(log N) rounds.
import random


class GossipNode:
    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers
        self.state = {}  # key -> (value, timestamp)

    def gossip_round(self):
        # Push local state to one randomly chosen peer.
        target = random.choice(self.peers)
        target.merge_state(self.state)

    def merge_state(self, other_state):
        for key, (value, timestamp) in other_state.items():
            # Keep whichever version carries the newer timestamp.
            if key not in self.state or timestamp > self.state[key][1]:
                self.state[key] = (value, timestamp)
Gossip is used by Cassandra (node discovery), Amazon Dynamo (membership), and Bitcoin (transaction propagation).
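The logarithmic spreading time can be observed with a small simulation. The sketch below assumes a simple push model in which every informed node contacts one uniformly random node per round; the function `rounds_to_spread` and the fixed seed are assumptions for illustration, and real protocols add fan-out, anti-entropy, and failure handling.

```python
import math
import random


def rounds_to_spread(n, seed=0):
    """Count push-gossip rounds until all n nodes hold the rumor."""
    rng = random.Random(seed)
    informed = {0}  # node 0 starts with the rumor
    rounds = 0
    while len(informed) < n:
        # Snapshot the informed set: nodes told this round start pushing next round.
        for _ in list(informed):
            informed.add(rng.randrange(n))
        rounds += 1
    return rounds


for n in (10, 100, 1000, 10000):
    print(f"n={n:>5}  rounds={rounds_to_spread(n)}  log2(n)={math.log2(n):.1f}")
```

Each tenfold increase in cluster size adds only a few rounds, because the informed set can at most double per round early on and the last stragglers are picked off quickly once most nodes are pushing.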
Conflict Resolution
Last-Write-Wins (LWW) — Each value carries a timestamp. The value with the latest timestamp wins. Simple but can lose data when clocks are skewed.
Vector Clocks — Each node tracks version counters. Concurrent updates are detected and can be resolved by the application or by using CRDTs.
CRDTs: Conflict-Free Replicated Data Types
CRDTs are data structures designed for optimistic replication. They guarantee that replicas converge to the same state without requiring coordination, even in the presence of concurrent updates.
Types of CRDTs
State-based CRDTs (CvRDTs) — Replicas periodically exchange their full state. The merge function is commutative, associative, and idempotent.
Operation-based CRDTs (CmRDTs) — Replicas exchange operations rather than state. Operations must commute.
Common CRDTs
G-Counter (Grow-only Counter) — Only supports increment. Each replica maintains its own count; the total is the sum across all replicas.
class GCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}  # node id -> increments observed from that node

    def increment(self):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + 1

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Element-wise maximum is commutative, associative, and idempotent.
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)
PN-Counter — Supports both increment and decrement by combining two G-Counters.
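A PN-Counter can be sketched as two grow-only maps, one for increments and one for decrements, merged independently. The class below is a minimal standalone illustration; the attribute names `incs` and `decs` and the `amount` parameter are assumptions.

```python
class PNCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.incs = {}  # node id -> total increments from that node
        self.decs = {}  # node id -> total decrements from that node

    def increment(self, amount=1):
        self.incs[self.node_id] = self.incs.get(self.node_id, 0) + amount

    def decrement(self, amount=1):
        self.decs[self.node_id] = self.decs.get(self.node_id, 0) + amount

    def value(self):
        return sum(self.incs.values()) - sum(self.decs.values())

    def merge(self, other):
        # Merge each grow-only map with an element-wise maximum.
        for mine, theirs in ((self.incs, other.incs), (self.decs, other.decs)):
            for node, count in theirs.items():
                mine[node] = max(mine.get(node, 0), count)


a, b = PNCounter("a"), PNCounter("b")
a.increment()
a.increment()
b.decrement()

a.merge(b)
b.merge(a)
a.merge(b)  # repeating a merge changes nothing

print(a.value(), b.value())  # 1 1
```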
LWW-Register — Each write carries a timestamp. The last write always wins.
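The OR-Set (Observed-Remove Set) supports add and remove without conflicts. Every add attaches a unique tag, and a remove deletes only the tags that the removing replica has observed, so an add that happens concurrently with a remove survives.
class ORSet:
    def __init__(self):
        self.elements = {}    # element -> set of tags added
        self.tombstones = {}  # element -> set of tags removed

    def add(self, element, tag):
        self.elements.setdefault(element, set()).add(tag)

    def remove(self, element):
        # Tombstone only the tags this replica has observed so far.
        observed = self.elements.get(element, set())
        self.tombstones.setdefault(element, set()).update(observed)

    def contains(self, element):
        live = self.elements.get(element, set()) - self.tombstones.get(element, set())
        return len(live) > 0

    def merge(self, other):
        # Union both maps; tombstones keep removed tags from reappearing.
        for element, tags in other.elements.items():
            self.elements.setdefault(element, set()).update(tags)
        for element, tags in other.tombstones.items():
            self.tombstones.setdefault(element, set()).update(tags)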
When to Use CRDTs
CRDTs excel in:
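- Collaborative applications, such as Figma's CRDT-inspired multiplayer editing and editors built on Yjs or Automerge (Google Docs, by contrast, relies on operational transformation)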
- Offline-first mobile apps — Local edits sync when connectivity returns
- Multi-region databases — Riak, Redis with CRDT support
- IoT systems — Devices operate independently and sync later
CRDTs are not a replacement for consensus-based strong consistency. They shine where availability and partition tolerance matter more than strong consistency.
Partitioning and Routing
Consistent Hashing
Consistent hashing distributes data across nodes with minimal redistribution when nodes join or leave. Each node is assigned positions on a hash ring; each key is assigned to the nearest node in clockwise order.
import bisect
import hashlib


class ConsistentHashRing:
    def __init__(self, virtual_nodes=100):
        self.ring = {}         # ring position -> node id
        self.sorted_keys = []  # ring positions in ascending order
        self.virtual_nodes = virtual_nodes

    def _hash(self, key):
        # MD5 is used only for its even distribution, not for security.
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node_id):
        for i in range(self.virtual_nodes):
            self.ring[self._hash(f"{node_id}:{i}")] = node_id
        self.sorted_keys = sorted(self.ring)

    def remove_node(self, node_id):
        for i in range(self.virtual_nodes):
            self.ring.pop(self._hash(f"{node_id}:{i}"), None)
        self.sorted_keys = sorted(self.ring)

    def get_node(self, key):
        if not self.ring:
            return None
        # Binary search for the first ring position at or after the key's hash.
        idx = bisect.bisect_left(self.sorted_keys, self._hash(key))
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]
Consistent hashing is used by Amazon Dynamo, Cassandra, and Discord for data partitioning.
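The claim of minimal redistribution can be checked empirically. The standalone sketch below grows a cluster from three to four nodes and counts how many keys change owner, compared with naive `hash mod N` placement. The node names, key format, and helper functions `h`, `build_ring`, and `owner` are assumptions made for this example.

```python
import bisect
import hashlib


def h(key):
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


def build_ring(nodes, vnodes=100):
    """Sorted list of (position, node) pairs, vnodes positions per node."""
    return sorted((h(f"{node}:{i}"), node) for node in nodes for i in range(vnodes))


def owner(ring, key):
    # First ring position at or after the key's hash, wrapping at the end.
    idx = bisect.bisect_left(ring, (h(key),))
    return ring[idx % len(ring)][1]


keys = [f"user-{i}" for i in range(10_000)]
before = build_ring(["a", "b", "c"])
after = build_ring(["a", "b", "c", "d"])

moved = sum(1 for k in keys if owner(before, k) != owner(after, k))
moved_to = {owner(after, k) for k in keys if owner(before, k) != owner(after, k)}
modulo_moved = sum(1 for k in keys if h(k) % 3 != h(k) % 4)

print(f"consistent hashing moved {moved / len(keys):.0%} of keys, all to {moved_to}")
print(f"hash mod N moved {modulo_moved / len(keys):.0%} of keys")
```

With consistent hashing, the only keys that move are those the new node takes over, roughly a quarter of them in this setup. Modulo placement reshuffles about three quarters of the keys among all nodes.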
Range Partitioning
Range partitioning divides data by key ranges. It supports efficient range queries but requires careful handling of hot spots and rebalancing.
class RangePartitioner:
    def __init__(self, ranges):
        self.ranges = sorted(ranges)  # list of (start, end) pairs, end exclusive

    def partition(self, key):
        for i, (start, end) in enumerate(self.ranges):
            if start <= key < end:
                return i
        # Keys outside every range fall back to the last partition.
        return len(self.ranges) - 1

    def split(self, partition_idx, split_key):
        start, end = self.ranges[partition_idx]
        self.ranges[partition_idx] = (start, split_key)
        self.ranges.insert(partition_idx + 1, (split_key, end))
Observability in Distributed Systems
Distributed systems require observability across three pillars: logs, metrics, and traces.
Distributed tracing tracks a single request as it traverses multiple services. Each service adds a span containing timing and metadata. Tools like Jaeger, Zipkin, and OpenTelemetry collect and visualize traces.
Metrics aggregation collects counters, gauges, and histograms from every node. Prometheus polls targets and stores time-series data, while Grafana provides dashboards.
Structured logging ensures every log entry carries correlation IDs, service names, and timestamps. The OpenTelemetry Collector processes, batches, and routes telemetry to backend systems.
Key observability patterns for distributed systems:
- Health check endpoints (/health, /ready) for load balancers
- Graceful degradation: return partial results when dependencies fail
- Redundancy at every level: eliminate single points of failure
- Chaos engineering: proactively test failure scenarios in production
Modern Trends (2025-2026)
Object Storage as the Database
Cloud object storage (Amazon S3, Azure Blob) is becoming a core architectural component for transactional and analytical systems. Systems like Amazon Aurora, Snowflake, and WarpStream use object storage for durability and scalability, separating compute from storage.
Object storage features continue to expand: cross-region replication, conditional writes, encryption, object versioning, and read-after-write consistency. These features reduce the need for custom replication and coordination logic.
New Programming Models
Platforms like Dapr, Temporal, Kalix, and wasmCloud abstract away the hardest parts of distributed systems — state management, partial failures, and workflows. WebAssembly enables portable, secure execution of business logic across runtimes.
Databases Are Decomposing
Modern databases disaggregate compute, storage, and memory. A single query may use different engines for parsing, optimization, execution, and storage. This decomposition enables independent scaling of each component.
Best Practices
Design Principles
Assume failures will happen. Build redundancy at every level. Test failure scenarios with chaos engineering. Minimize distributed dependencies — not every operation needs strong consistency. Prefer simpler consistency models and add complexity only when required.
Operational Practices
Monitor node health with adaptive failure detectors. Measure latency distributions, not just averages. Alert on anomalies, not static thresholds. Instrument service boundaries first before adding detailed tracing.
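To see why averages mislead, consider the small sketch below, which compares the mean with percentiles on a made-up sample in which 5% of requests are slow. The data is hypothetical and chosen only to make the gap obvious.

```python
import statistics

# Hypothetical sample: 95 fast requests at 12 ms and 5 slow ones at 900 ms.
latencies_ms = [12] * 95 + [900] * 5

mean = statistics.mean(latencies_ms)
cuts = statistics.quantiles(latencies_ms, n=100)  # 99 percentile cut points
p50, p99 = cuts[49], cuts[98]

print(f"mean={mean:.1f} ms  p50={p50:.0f} ms  p99={p99:.0f} ms")
```

The mean of 56.4 ms describes no real request: the typical request takes 12 ms and the slowest ones take 900 ms. Tracking p50 and p99 separately exposes both facts.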
Common Pitfalls
Treating the network as reliable. Ignoring clock skew. Assuming synchronous calls are safe. Over-engineering when a simple approach suffices. Forgetting to test partition scenarios.
Conclusion
Distributed systems are built on foundational tradeoffs: consistency vs. availability (CAP), latency vs. throughput, and coordination vs. autonomy. Understanding these tradeoffs matters more than memorizing specific algorithms.
Start with simpler models — a single-region deployment with eventual consistency might serve 95% of use cases. Add strong consistency, consensus, and complex replication only where correctness demands it. Test failure scenarios thoroughly. The complexity of distributed systems is inherent — manage it by choosing the right abstractions and knowing when to keep things simple.