Skip to main content
⚡ Calmops

Scalable Database Architecture: Sharding, Replication, and Partitioning

Scalable Database Architecture: Sharding, Replication, and Partitioning

Databases become bottlenecks at scale. This guide covers sharding, replication, and partitioning for scaling to billions of rows.


Replication Strategies

Master-Slave Replication

import psycopg2
from datetime import datetime

class MasterSlaveReplication:
    """Single-primary replication: all writes go to one master node while
    reads fan out across a pool of read replicas."""

    def __init__(self):
        # Primary node — the only connection that accepts writes.
        self.master = psycopg2.connect(
            "dbname=mydb user=admin host=master.db.example.com"
        )

        # Read replicas; read traffic is spread across these.
        self.slaves = [
            psycopg2.connect("dbname=mydb user=admin host=slave1.db.example.com"),
            psycopg2.connect("dbname=mydb user=admin host=slave2.db.example.com"),
            psycopg2.connect("dbname=mydb user=admin host=slave3.db.example.com"),
        ]

    def write(self, query: str, params: tuple):
        """Execute a write on the master, commit, and return the row count."""
        cur = self.master.cursor()
        cur.execute(query, params)
        self.master.commit()
        return cur.rowcount

    def read(self, query: str, params: tuple):
        """Execute a read on a randomly chosen replica (naive load balancing).

        NOTE: replicas may lag the master, so results can be slightly stale.
        """
        import random

        replica = random.choice(self.slaves)
        cur = replica.cursor()
        cur.execute(query, params)
        return cur.fetchall()

# Benefits:
# - Simple to implement
# - Good for read-heavy workloads
# - Failover capability

# Drawbacks:
# - Replication lag (eventual consistency)
# - Single master bottleneck
# - No automatic failover

Multi-Master Replication

class MultiMasterReplication:
    """Active-active replication: every node accepts both reads and writes."""

    def __init__(self):
        self.masters = [
            psycopg2.connect("dbname=mydb user=admin host=master1.db.example.com"),
            psycopg2.connect("dbname=mydb user=admin host=master2.db.example.com"),
        ]

    def write(self, query: str, params: tuple):
        """Execute a write on a randomly selected master and commit it.

        Returns the affected row count. Cross-master conflict resolution is
        out of scope for this sketch.
        """
        import random

        node = random.choice(self.masters)
        cur = node.cursor()
        cur.execute(query, params)
        node.commit()
        return cur.rowcount

    def read(self, query: str, params: tuple):
        """Execute a read against the first master (any node would do)."""
        cur = self.masters[0].cursor()
        cur.execute(query, params)
        return cur.fetchall()

# Benefits:
# - Write scalability
# - High availability
# - No single point of failure

# Drawbacks:
# - Complex conflict resolution
# - Higher replication lag
# - Increased network traffic

Sharding (Horizontal Partitioning)

Range Sharding

class RangeSharding:
    """Assign rows to shards by contiguous key ranges (1M user IDs each)."""

    def __init__(self):
        # Shard 0 owns user IDs 0-999,999; shard 1 the next million; etc.
        self.shards = {
            0: psycopg2.connect("dbname=shard0 host=shard0.db.example.com"),
            1: psycopg2.connect("dbname=shard1 host=shard1.db.example.com"),
            2: psycopg2.connect("dbname=shard2 host=shard2.db.example.com"),
            3: psycopg2.connect("dbname=shard3 host=shard3.db.example.com"),
        }

    def get_shard_id(self, user_id: int) -> int:
        """Return the shard owning user_id; everything >= 3M lands on shard 3."""
        for shard, upper_bound in enumerate((1000000, 2000000, 3000000)):
            if user_id < upper_bound:
                return shard
        return 3

    def write(self, user_id: int, query: str, params: tuple):
        """Route a write to the shard that owns user_id and commit it."""
        shard = self.get_shard_id(user_id)
        conn = self.shards[shard]
        cur = conn.cursor()
        cur.execute(query, params)
        conn.commit()

    def read(self, user_id: int, query: str, params: tuple):
        """Route a read to the shard that owns user_id."""
        shard = self.get_shard_id(user_id)
        cur = self.shards[shard].cursor()
        cur.execute(query, params)
        return cur.fetchall()

# Problem with range sharding:
# - Uneven distribution (hotspots)
# - Hard to re-shard when ranges change

Hash Sharding

class HashSharding:
    """Assign rows to shards by hashing the key (near-uniform distribution)."""

    def __init__(self, num_shards: int = 4):
        self.num_shards = num_shards
        self.shards = {}

        for i in range(num_shards):
            self.shards[i] = psycopg2.connect(
                f"dbname=shard{i} host=shard{i}.db.example.com"
            )

    def get_shard_id(self, user_id: int) -> int:
        """Map user_id to a shard: MD5 of its decimal string, mod shard count.

        MD5 is used purely for distribution here, not for security.
        """
        import hashlib

        digest = hashlib.md5(str(user_id).encode()).hexdigest()
        return int(digest, 16) % self.num_shards

    def write(self, user_id: int, data: dict):
        """Insert a user row into the shard that the hash assigns to user_id."""
        shard = self.get_shard_id(user_id)
        conn = self.shards[shard]
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",
            (user_id, data['name'], data['email'])
        )
        conn.commit()

# Benefits:
# - Uniform distribution
# - Simple sharding logic

# Drawbacks:
# - Resharding requires rehashing all data
# - Can't query across shards efficiently

Directory-Based Sharding

class DirectorySharding:
    """Lookup-table sharding: an external directory maps each key to a shard.

    NOTE(review): this relies on a module-level `redis` import that is not
    visible in this file — confirm it exists at the top of the module.
    """

    def __init__(self):
        self.shards = {
            0: psycopg2.connect("dbname=shard0 host=shard0.db.example.com"),
            1: psycopg2.connect("dbname=shard1 host=shard1.db.example.com"),
            2: psycopg2.connect("dbname=shard2 host=shard2.db.example.com"),
        }

        # The user -> shard directory lives in Redis (or a separate DB).
        self.directory_db = redis.Redis(host='localhost')

    def set_shard_mapping(self, user_id: int, shard_id: int):
        """Record in the directory which shard owns this user."""
        self.directory_db.set(f"user:{user_id}:shard", shard_id)

    def get_shard_id(self, user_id: int) -> int:
        """Resolve a user's shard from the directory; unmapped users get 0."""
        mapped = self.directory_db.get(f"user:{user_id}:shard")
        # Default shard for unknown users (a real system might allocate here).
        return 0 if mapped is None else int(mapped)

    def write(self, user_id: int, data: dict):
        """Insert the user row on whichever shard the directory points to."""
        shard = self.get_shard_id(user_id)
        conn = self.shards[shard]
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO users (id, name) VALUES (%s, %s)",
            (user_id, data['name'])
        )
        conn.commit()

    def reshard(self):
        """Placeholder: remap users in the directory and migrate their rows
        lazily (background job) instead of rehashing everything at once."""
        pass

# Benefits:
# - Flexible sharding
# - Supports uneven distribution
# - Easier resharding

# Drawbacks:
# - Extra lookup overhead
# - Directory becomes bottleneck

Vertical Partitioning

class VerticalPartitioning:
    """Column-wise ("vertical") split of the users table across two DBs:
    frequently accessed ("hot") columns and rarely accessed ("cold") columns.
    """

    # Which logical columns live in which partition (drives read routing).
    _HOT_COLUMNS = ('name', 'email', 'last_login')
    _COLD_COLUMNS = ('bio', 'preferences')

    def __init__(self):
        # Frequently accessed columns
        self.hot_db = psycopg2.connect("dbname=users_hot host=hot.db.example.com")

        # Rarely accessed columns
        self.cold_db = psycopg2.connect("dbname=users_cold host=cold.db.example.com")

    def write_user(self, user_id: int, user_data: dict):
        """Write the user's hot and cold column sets to their partitions.

        Required keys: 'name', 'email'. Optional keys ('last_login', 'bio',
        'preferences') default to NULL when absent.
        """
        # Hot data
        cursor = self.hot_db.cursor()
        cursor.execute("""
            INSERT INTO users (id, name, email, last_login)
            VALUES (%s, %s, %s, %s)
        """, (user_id, user_data['name'], user_data['email'],
              user_data.get('last_login')))
        self.hot_db.commit()

        # Cold data
        cursor = self.cold_db.cursor()
        cursor.execute("""
            INSERT INTO users (id, bio, preferences)
            VALUES (%s, %s, %s)
        """, (user_id, user_data.get('bio'), user_data.get('preferences')))
        self.cold_db.commit()

    def _fetch_row_as_dict(self, db, user_id: int) -> dict:
        """Fetch the user's row from one partition as a {column: value} dict."""
        cursor = db.cursor()
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        row = cursor.fetchone()
        if row is None:
            # No row on this partition: contribute nothing to the merge.
            return {}
        # DB-API 2.0: cursor.description[i][0] is the i-th column's name.
        return {desc[0]: value for desc, value in zip(cursor.description, row)}

    def read_user(self, user_id: int, columns: list):
        """Read the requested columns, touching only the partitions needed.

        Bug fix: the original passed the raw row tuple from fetchone()
        straight to dict.update(), which raises TypeError (a tuple of scalars
        is not a sequence of key/value pairs). Rows are now converted to
        column-keyed dicts before merging. Also handles a missing row.
        """
        user_data = {}

        if any(col in columns for col in self._HOT_COLUMNS):
            user_data.update(self._fetch_row_as_dict(self.hot_db, user_id))

        if any(col in columns for col in self._COLD_COLUMNS):
            user_data.update(self._fetch_row_as_dict(self.cold_db, user_id))

        return user_data

# Benefits:
# - Different storage/performance per partition
# - Reduces I/O for hot data

Consistency Models

class ConsistencyModels:
    """Different consistency guarantees.

    NOTE(review): these methods are illustrative sketches — `self.master`,
    `self.slave`, `self.write`, and `expected_balance` are never defined in
    this file, so the class is not runnable as-is; confirm intent before use.
    """
    
    # Strong Consistency
    def strong_read_after_write(self):
        """Read from master always"""
        # Master write
        cursor = self.master.cursor()
        cursor.execute("UPDATE users SET balance = balance - 100 WHERE id = 1")
        self.master.commit()
        
        # Read from master (consistent)
        cursor = self.master.cursor()
        cursor.execute("SELECT balance FROM users WHERE id = 1")
        balance = cursor.fetchone()[0]
        
        # `expected_balance` is undefined here — illustrative only.
        assert balance == expected_balance  # Always correct
    
    # Eventual Consistency
    async def eventual_read_after_write(self):
        """Read from slave might be stale"""
        # Master write
        cursor = self.master.cursor()
        cursor.execute("UPDATE users SET status = 'active' WHERE id = 1")
        self.master.commit()
        
        # Read from slave (might be stale)
        cursor = self.slave.cursor()
        cursor.execute("SELECT status FROM users WHERE id = 1")
        status = cursor.fetchone()[0]
        
        # status might still be 'inactive' if replication lag
        # Will eventually be consistent
    
    # Read-Your-Writes Consistency
    def read_your_writes(self, user_id: int):
        """User sees their own writes immediately"""
        # Write to master
        # `self.write` is not defined on this class — illustrative only.
        self.write(user_id, {'name': 'John'})
        
        # Read from master (see own write)
        # Read from slave for other users (might be stale)

Query Patterns Across Shards

class ShardedQueryRouter:
    """Scatter-gather: run one query on every shard and merge the results.

    Expects `self.num_shards` (int) and `self.shards` (mapping of shard id to
    DB-API connection) to be set by the owner — not shown in this sketch.
    """

    async def query_all_shards(self, query: str) -> list:
        """Run `query` on all shards concurrently and concatenate row lists.

        Bug fix: the original annotated the return type as `List`, which is
        never imported and would raise NameError when the class is defined;
        the builtin `list` (PEP 585) is used instead.
        """
        import asyncio

        tasks = [
            self._query_shard(shard_id, query)
            for shard_id in range(self.num_shards)
        ]
        per_shard_rows = await asyncio.gather(*tasks)

        # Flatten the per-shard row lists into a single result list.
        all_results = []
        for rows in per_shard_rows:
            all_results.extend(rows)

        return all_results

    async def _query_shard(self, shard_id: int, query: str):
        """Execute `query` on one shard and return all of its rows.

        NOTE(review): the cursor calls are synchronous, so each shard blocks
        the event loop; a production version would use an async driver.
        """
        cursor = self.shards[shard_id].cursor()
        cursor.execute(query)
        return cursor.fetchall()

# Example: Get total balance across all users
# NOTE(review): `await` is only valid inside an async function (or a REPL with
# top-level await enabled) — this snippet is illustrative, not runnable as-is.
total = sum(row[0] for row in await router.query_all_shards(
    "SELECT SUM(balance) FROM users"
))

Glossary

  • Replication: Copying data across multiple servers
  • Sharding: Horizontal partitioning across multiple servers
  • Partition: Subset of data
  • Shard: Independent partition with its own storage
  • Consistency: Agreement between replicas
  • Lag: Delay in replication

Resources

Comments