Database replication distributes data across multiple nodes for high availability, read scaling, and disaster recovery. This guide covers replication strategies and implementations.
Replication Types
replication_types:
- name: "Primary-Replica (Leader-Follower)"
description: "One primary, multiple replicas"
pros: "Simple, consistent writes"
cons: "Single point of failure for writes"
- name: "Multi-Master"
description: "Multiple primaries accept writes"
pros: "No write bottleneck"
cons: "Conflict resolution complex"
- name: "Leaderless"
description: "No designated leader"
pros: "High availability"
cons: "Eventual consistency"
Primary-Replica Replication
Implementation
# PostgreSQL Primary-Replica setup
# PRIMARY (write to master)
import time

import psycopg2
class PrimaryDatabase:
    """Write endpoint for the primary node of a PostgreSQL cluster.

    Holds a single long-lived connection; all writes go through it and
    are committed immediately.
    """

    def __init__(self, connection_string):
        self.conn = psycopg2.connect(connection_string)

    def write(self, query, params):
        """Execute a write statement and commit it.

        Rolls back on failure so the shared connection is not left in an
        aborted-transaction state (which would poison every later write).
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(query, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def get_replication_lag(self):
        """Return the worst replay lag across connected replicas.

        Measured on the primary via pg_stat_replication.replay_lag
        (an interval, or None when no replica is attached).
        pg_last_xact_replay_timestamp() would be wrong here: it is only
        meaningful on a standby and always NULL on the primary.
        """
        with self.conn.cursor() as cur:
            cur.execute("SELECT max(replay_lag) FROM pg_stat_replication")
            return cur.fetchone()[0]
# Read from replica (load balancing)
class ReplicaPool:
    """Round-robin pool of read-only replica hosts.

    Opens a fresh connection per read (simple, but fine for a demo);
    rotation spreads read load evenly across replicas.
    """

    def __init__(self, replica_hosts):
        self.hosts = replica_hosts
        self.current = 0  # index of the next host in the rotation

    def read(self, query, params=None):
        """Run a read-only query against the next replica in rotation."""
        host = self.hosts[self.current]
        self.current = (self.current + 1) % len(self.hosts)
        conn = psycopg2.connect(host)
        try:
            with conn.cursor() as cur:
                cur.execute(query, params or ())
                return cur.fetchall()
        finally:
            conn.close()

    def is_replica_synchronized(self, max_lag=1):
        """Return True when every replica is streaming and caught up.

        A replica is considered synchronized when it is in recovery
        (i.e. actually acting as a standby) AND its replay lag is at
        most ``max_lag`` seconds.
        """
        for host in self.hosts:
            conn = psycopg2.connect(host)
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT pg_is_in_recovery()")
                    if not cur.fetchone()[0]:
                        # A healthy replica IS in recovery; not being in
                        # recovery means it was promoted or misconfigured.
                        return False
                    cur.execute(
                        "SELECT EXTRACT(EPOCH FROM"
                        " now() - pg_last_xact_replay_timestamp())"
                    )
                    lag = cur.fetchone()[0]
                    # lag is NULL right after startup, before any WAL
                    # has been replayed — treat that as not caught up.
                    if lag is None or lag > max_lag:
                        return False
            finally:
                conn.close()
        return True
Streaming Replication
# PostgreSQL streaming replication config
# postgresql.conf (primary)
wal_level = replica
max_wal_senders = 3
wal_keep_size = '1GB'   # values with units must be quoted in postgresql.conf
# postgresql.conf (replica)
primary_conninfo = 'host=primary port=5432 user=replication'
# Create replication slot
# On the PRIMARY (slots live on the sending side, not the replica):
# SELECT pg_create_physical_replication_slot('replica_slot');
# then point the replica at it with primary_slot_name = 'replica_slot'
Multi-Master Replication
Conflict Resolution
class MultiMasterDB:
    """Multi-master store with last-writer-wins (LWW) conflict resolution.

    Every write is fanned out to all nodes tagged with a timestamp; a
    read consults all nodes and returns the value carrying the newest
    timestamp. Concurrent writes are resolved purely by timestamp order.
    """

    def __init__(self, nodes):
        self.nodes = nodes

    def write(self, key, value, timestamp=None):
        """Write key=value to every node.

        Stamps the write with the current wall-clock time unless an
        explicit ``timestamp`` is supplied (useful for tests/replay).
        """
        if timestamp is None:
            timestamp = time.time()
        for node in self.nodes:
            node.put(key, value, timestamp)

    def read(self, key):
        """Read from all nodes and resolve conflicts by newest timestamp.

        Returns None when no node holds a timestamped value for ``key``.
        Compares with ``is not None`` so a legitimate timestamp of 0
        (the epoch) is not silently discarded as falsy.
        """
        latest_ts = None
        latest_value = None
        for node in self.nodes:
            value, ts = node.get(key)
            if ts is not None and (latest_ts is None or ts > latest_ts):
                latest_ts = ts
                latest_value = value
        return latest_value
# More sophisticated: CRDTs (Conflict-free Replicated Data Types)
from collections import Counter
class GCounter:
    """Grow-only counter CRDT.

    Each node tracks only its own monotonically increasing count; the
    global value is the sum of all contributions. Merging takes the
    per-node maximum, which makes merges commutative, associative, and
    idempotent — replicas converge regardless of merge order.
    """

    def __init__(self):
        # Per-node contribution: node_id -> count.
        self.counts = {}

    def increment(self, node_id):
        """Bump the given node's contribution by one."""
        current = self.counts.get(node_id, 0)
        self.counts[node_id] = current + 1

    def value(self):
        """Total count across all nodes."""
        return sum(self.counts.values())

    def merge(self, other):
        """Fold another GCounter into this one, taking per-node maxima."""
        for node_id in set(self.counts) | set(other.counts):
            self.counts[node_id] = max(
                self.counts.get(node_id, 0),
                other.counts.get(node_id, 0),
            )
Leaderless Replication
Dynamo-Style Quorum
class LeaderlessDB:
    """Dynamo-style leaderless store using read/write quorums.

    Writes fan out to every node and succeed once at least ``w`` nodes
    acknowledge; reads query ``r`` nodes and keep the newest value seen.
    """

    def __init__(self, nodes, r=2, w=2):
        self.nodes = nodes
        self.r = r  # read quorum size
        self.w = w  # write quorum size

    def write(self, key, value):
        """Send the write to all nodes; True when >= w of them accept it."""
        acks = sum(1 for node in self.nodes if node.put(key, value))
        return acks >= self.w

    def read(self, key):
        """Query the first r nodes and return the newest value seen.

        "Newest" is decided by each value's ``timestamp`` attribute
        (simplified — a real system would also read-repair). Returns
        None when no queried node holds a value.
        """
        replies = [node.get(key) for node in self.nodes[:self.r]]
        candidates = [reply for reply in replies if reply]
        if not candidates:
            return None
        return max(candidates, key=lambda reply: reply.timestamp)
Monitoring Replication
# Monitor replication health
def check_replication_health(conn, replica_conns):
    """Report health of the primary and each replica connection.

    Returns a dict:
      {"primary": {"healthy": bool},
       "replicas": [{"healthy": True, "lag_seconds": float | None} |
                    {"healthy": False, "error": str}, ...]}

    Exactly one entry is appended per connection in ``replica_conns``
    (the original silently skipped connections not in recovery, so the
    list length no longer matched the inputs).
    """
    health = {
        "primary": {"healthy": True},
        "replicas": []
    }
    # Primary is healthy when it is NOT in recovery (i.e. accepting writes).
    with conn.cursor() as cur:
        cur.execute("SELECT pg_is_in_recovery()")
        health["primary"]["healthy"] = not cur.fetchone()[0]
    for replica in replica_conns:
        try:
            with replica.cursor() as cur:
                cur.execute("SELECT pg_is_in_recovery()")
                is_replica = cur.fetchone()[0]
                if not is_replica:
                    # Promoted or misconfigured: this node is not replicating.
                    health["replicas"].append({
                        "healthy": False,
                        "error": "not in recovery"
                    })
                    continue
                cur.execute("""
                    SELECT now() - pg_last_xact_replay_timestamp()
                    AS lag
                """)
                lag = cur.fetchone()[0]
                # lag is NULL before any WAL has been replayed — report
                # None rather than crashing on .total_seconds().
                health["replicas"].append({
                    "healthy": True,
                    "lag_seconds": lag.total_seconds() if lag is not None else None
                })
        except Exception as e:
            health["replicas"].append({
                "healthy": False,
                "error": str(e)
            })
    return health
Best Practices
# Replication best practices
configuration:
- "Use asynchronous replication where lower write latency matters"
- "Use synchronous replication for data that must survive primary failure"
- "3 replicas minimum"
- "Spread across AZs"
monitoring:
- "Track replication lag"
- "Alert on high lag"
- "Monitor disk usage"
failover:
- "Automatic failover"
- "Test failover regularly"
- "Promote carefully"
Conclusion
Choose replication strategy:
- Primary-replica: Simple, read scaling
- Multi-master: Write scaling, needs conflict resolution
- Leaderless: Highest availability, eventual consistency
Always monitor replication lag and test failover procedures.
Comments