Skip to main content
โšก Calmops

Database Replication Strategies: Primary-Replica, Multi-Master, and Leaderless

Database replication distributes data across multiple nodes for high availability, read scaling, and disaster recovery. This guide covers replication strategies and implementations.

Replication Types

replication_types:
  - name: "Primary-Replica (Leader-Follower)"
    description: "One primary, multiple replicas"
    pros: "Simple, consistent writes"
    cons: "Single point of failure for writes"
    
  - name: "Multi-Master"
    description: "Multiple primaries accept writes"
    pros: "No write bottleneck"
    cons: "Conflict resolution complex"
    
  - name: "Leaderless"
    description: "No designated leader"
    pros: "High availability"
    cons: "Eventual consistency"

Primary-Replica Replication

Implementation

# PostgreSQL Primary-Replica setup

# PRIMARY (write to master)
import psycopg2

class PrimaryDatabase:
    def __init__(self, connection_string):
        self.conn = psycopg2.connect(connection_string)
    
    def write(self, query, params):
        with self.conn.cursor() as cur:
            cur.execute(query, params)
            self.conn.commit()
    
    def get_replication_lag(self):
        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT now() - pg_last_xact_replay_timestamp() 
                AS lag
            """)
            return cur.fetchone()[0]
# Read from replica (load balancing)

class ReplicaPool:
    def __init__(self, replica_hosts):
        self.hosts = replica_hosts
        self.current = 0
    
    def read(self, query, params=None):
        host = self.hosts[self.current]
        self.current = (self.current + 1) % len(self.hosts)
        
        conn = psycopg2.connect(host)
        try:
            with conn.cursor() as cur:
                cur.execute(query, params or ())
                return cur.fetchall()
        finally:
            conn.close()
    
    def is_replica_synchronized(self, max_lag=1):
        """Check if replica is caught up"""
        for host in self.hosts:
            conn = psycopg2.connect(host)
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT pg_is_in_recovery()")
                    is_recovering = cur.fetchone()[0]
                    if is_recovering:
                        return False
            finally:
                conn.close()
        return True

Streaming Replication

# PostgreSQL streaming replication config

# postgresql.conf (primary)
wal_level = replica
max_wal_senders = 3
wal_keep_size = 1GB

# postgresql.conf (replica)
primary_conninfo = 'host=primary port=5432 user=replication'

# Create replication slot
# On replica:
# pg_create_physical_replication_slot('replica_slot')

Multi-Master Replication

Conflict Resolution

class MultiMasterDB:
    """Simple last-writer-wins conflict resolution"""
    
    def __init__(self, nodes):
        self.nodes = nodes
    
    def write(self, key, value, timestamp=None):
        if timestamp is None:
            timestamp = time.time()
        
        # Write to all nodes
        for node in self.nodes:
            node.put(key, value, timestamp)
    
    def read(self, key):
        # Read from all and pick latest
        latest = None
        latest_value = None
        
        for node in self.nodes:
            value, ts = node.get(key)
            if ts and (latest is None or ts > latest):
                latest = ts
                latest_value = value
        
        return latest_value


# More sophisticated: CRDTs (Conflict-free Replicated Data Types)
from collections import Counter

class GCounter:
    """Grow-only counter CRDT"""
    
    def __init__(self):
        self.counts = {}  # node_id -> count
    
    def increment(self, node_id):
        self.counts[node_id] = self.counts.get(node_id, 0) + 1
    
    def value(self):
        return sum(self.counts.values())
    
    def merge(self, other):
        for node_id, count in other.counts.items():
            self.counts[node_id] = max(
                self.counts.get(node_id, 0), 
                count
            )

Leaderless Replication

Dynamo-Style Quorum

class LeaderlessDB:
    def __init__(self, nodes, r=2, w=2):
        self.nodes = nodes
        self.r = r  # Read quorum
        self.w = w  # Write quorum
    
    def write(self, key, value):
        # Write to N nodes
        success_count = 0
        for node in self.nodes:
            if node.put(key, value):
                success_count += 1
        
        return success_count >= self.w
    
    def read(self, key):
        # Read from R nodes
        values = []
        for node in self.nodes[:self.r]:
            value = node.get(key)
            if value:
                values.append(value)
        
        # Return most recent (simplified)
        return max(values, key=lambda v: v.timestamp) if values else None

Monitoring Replication

# Monitor replication health

def check_replication_health(conn, replica_conns):
    health = {
        "primary": {"healthy": True},
        "replicas": []
    }
    
    # Check primary
    with conn.cursor() as cur:
        cur.execute("SELECT pg_is_in_recovery()")
        health["primary"]["healthy"] = not cur.fetchone()[0]
    
    # Check each replica
    for replica in replica_conns:
        try:
            with replica.cursor() as cur:
                # Check if receiving
                cur.execute("SELECT pg_is_in_recovery()")
                is_replica = cur.fetchone()[0]
                
                # Get lag
                if is_replica:
                    cur.execute("""
                        SELECT now() - pg_last_xact_replay_timestamp() 
                        AS lag
                    """)
                    lag = cur.fetchone()[0]
                    
                    health["replicas"].append({
                        "healthy": True,
                        "lag_seconds": lag.total_seconds()
                    })
        except Exception as e:
            health["replicas"].append({
                "healthy": False,
                "error": str(e)
            })
    
    return health

Best Practices

# Replication best practices

configuration:
  - "Use async for lower latency"
  - "Sync for critical data"
  - "3 replicas minimum"
  - "Spread across AZs"

monitoring:
  - "Track replication lag"
  - "Alert on high lag"
  - "Monitor disk usage"

failover:
  - "Automatic failover"
  - "Test failover regularly"
  - "Promote carefully"

Conclusion

Choose replication strategy:

  • Primary-replica: Simple, read scaling
  • Multi-master: Write scaling, needs conflict resolution
  • Leaderless: Highest availability, eventual consistency

Always monitor replication lag and test failover procedures.


Comments