Skip to main content
โšก Calmops

Database Patterns: Caching, Sharding, and Scaling Strategies

Introduction

Database performance is often the bottleneck in software systems. As applications scale, single database instances struggle to handle increased load. This guide covers the essential patterns for scaling databases: caching, replication, sharding, and the architectural decisions that make them work.

Scaling databases requires understanding trade-offs between consistency, availability, and performance. The right pattern depends on your access patterns, data size, and business requirements.

Caching Strategies

Multi-Level Caching

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                   Multi-Level Cache                          โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚   Request โ†’ CDN โ†’ App Cache โ†’ Database                      โ”‚
โ”‚              โ†“                                              โ”‚
โ”‚           Local                                             โ”‚
โ”‚                                                             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Redis Cache Implementation

import redis
import json
from functools import wraps
import hashlib

class CacheClient:
    
    def __init__(self, redis_url):
        self.client = redis.from_url(redis_url)
    
    def get(self, key):
        value = self.client.get(key)
        if value:
            return json.loads(value)
        return None
    
    def set(self, key, value, ttl=300):
        self.client.setex(
            key,
            ttl,
            json.dumps(value)
        )
    
    def delete(self, key):
        self.client.delete(key)
    
    def invalidate_pattern(self, pattern):
        keys = self.client.keys(pattern)
        if keys:
            self.client.delete(*keys)

# Cache-aside pattern
def cache_aside(cache, key_prefix):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key
            key_args = str(args) + str(sorted(kwargs.items()))
            cache_key = f"{key_prefix}:{hashlib.md5(key_args.encode()).hexdigest()}"
            
            # Try cache first
            cached = cache.get(cache_key)
            if cached is not None:
                return cached
            
            # Fetch from database
            result = func(*args, **kwargs)
            
            # Store in cache
            cache.set(cache_key, result, ttl=300)
            
            return result
        return wrapper
    return decorator

# Usage
class UserService:
    
    def __init__(self, db, cache):
        self.db = db
        self.cache = cache
    
    @cache_aside(cache, "user")
    def get_user(self, user_id):
        return self.db.query(User).filter_by(id=user_id).first()

Write-Through Cache

class WriteThroughCache:
    
    def __init__(self, db, cache):
        self.db = db
        self.cache = cache
    
    def save(self, entity):
        # Write to database
        self.db.save(entity)
        
        # Write to cache immediately
        self.cache.set(
            f"{entity.__class__.__name__}:{entity.id}",
            entity.to_dict()
        )
        
        return entity
    
    def delete(self, entity_type, entity_id):
        # Delete from database
        self.db.delete(entity_type, entity_id)
        
        # Delete from cache
        self.cache.delete(f"{entity_type}:{entity_id}")

Cache Invalidation Strategies

Strategy Pros Cons
TTL Simple Stale data possible
Write-through Consistent Write latency
Write-behind Fast writes Data loss risk
Cache-aside Simple Multiple round trips

Read Replicas

Replication Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                   Read Replica Architecture                  โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚                  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                         โ”‚
โ”‚                  โ”‚   Primary DB   โ”‚                         โ”‚
โ”‚                  โ”‚   (Writer)    โ”‚                         โ”‚
โ”‚                  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                         โ”‚
โ”‚                          โ”‚                                  โ”‚
โ”‚           โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                   โ”‚
โ”‚           โ–ผ              โ–ผ              โ–ผ                  โ”‚
โ”‚    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚
โ”‚    โ”‚ Replica 1  โ”‚ โ”‚ Replica 2  โ”‚ โ”‚ Replica 3  โ”‚          โ”‚
โ”‚    โ”‚  (Reader)  โ”‚ โ”‚  (Reader)  โ”‚ โ”‚  (Reader)  โ”‚          โ”‚
โ”‚    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚
โ”‚                                                             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

PgBouncer for Connection Pooling

# pgbouncer.ini
[databases]
primary = host=primary-db port=5432 dbname=app

[pgbouncer]
listen_port = 6432
listen_addr = *
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 25
min_pool_size = 10
reserve_pool_size = 5
reserve_pool_timeout = 5

Application-Level Routing

from contextlib import contextmanager
import random

class DatabaseRouter:
    
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas
    
    @contextmanager
    def get_connection(self, for_write=False):
        """Get database connection, routing to replica for reads."""
        if for_write or not self.replicas:
            conn = self.primary.connect()
        else:
            # Load balance across replicas
            replica = random.choice(self.replicas)
            conn = replica.connect()
        
        try:
            yield conn
        finally:
            conn.close()
    
    def read(self, query, *args):
        with self.get_connection(for_write=False) as conn:
            return conn.execute(query, args)
    
    def write(self, query, *args):
        with self.get_connection(for_write=True) as conn:
            return conn.execute(query, args)

Horizontal Sharding

Sharding Strategies

import hashlib

class ShardRouter:
    
    def __init__(self, shards):
        self.shards = shards
        self.num_shards = len(shards)
    
    def get_shard(self, key):
        """Determine which shard handles a key."""
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        shard_index = hash_value % self.num_shards
        return self.shards[shard_index]
    
    def get_user_shard(self, user_id):
        """Shard by user ID."""
        return self.get_shard(f"user:{user_id}")
    
    def get_order_shard(self, order_id):
        """Shard by order ID."""
        return self.get_shard(f"order:{order_id}")

# Range-based sharding
class RangeShardRouter:
    
    def __init__(self, ranges):
        self.ranges = sorted(ranges, key=lambda r: r[0])
    
    def get_shard(self, key):
        """Find shard based on key range."""
        key_int = int(key)
        for min_val, max_val, shard in self.ranges:
            if min_val <= key_int <= max_val:
                return shard
        raise ValueError(f"No shard for key: {key}")

Consistent Hashing

import hashlib

class ConsistentHashRing:
    
    def __init__(self, nodes, virtual_nodes=100):
        self.ring = {}
        self.sorted_keys = []
        self.virtual_nodes = virtual_nodes
        
        for node in nodes:
            self.add_node(node)
    
    def add_node(self, node):
        for i in range(self.virtual_nodes):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            self.sorted_keys.append(key)
        
        self.sorted_keys.sort()
    
    def remove_node(self, node):
        for i in range(self.virtual_nodes):
            key = self._hash(f"{node}:{i}")
            del self.ring[key]
            self.sorted_keys.remove(key)
    
    def get_node(self, key):
        if not self.ring:
            return None
        
        hash_value = self._hash(key)
        
        # Find the first node with hash >= key's hash
        for node_hash in self.sorted_keys:
            if node_hash >= hash_value:
                return self.ring[node_hash]
        
        # Wrap around to first node
        return self.ring[self.sorted_keys[0]]
    
    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

Database Performance Optimization

Query Optimization

-- Example: Optimized query with proper indexing
-- โŒ Slow: Missing indexes, full table scan
SELECT * FROM orders 
WHERE created_at > '2026-01-01' 
AND status = 'pending'
ORDER BY created_at DESC;

-- โœ… Optimized: With indexes
CREATE INDEX idx_orders_status_created 
ON orders(status, created_at DESC);

-- Use covering index to avoid table lookup
CREATE INDEX idx_orders_covering 
ON orders(status, created_at DESC) 
INCLUDE (user_id, total);

Connection Pool Tuning

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "postgresql://user:pass@localhost/db",
    poolclass=QueuePool,
    pool_size=20,           # Normal connections
    max_overflow=10,        # Additional connections under load
    pool_timeout=30,        # Wait time for connection
    pool_recycle=3600,     # Recycle connections hourly
    pool_pre_ping=True,    # Check connection before use
)

Batch Processing

async def batch_insert_users(users):
    """Insert users in batches for better performance."""
    BATCH_SIZE = 1000
    
    for i in range(0, len(users), BATCH_SIZE):
        batch = users[i:i + BATCH_SIZE]
        
        # Use bulk insert
        await db.execute(
            User.__table__.insert(),
            batch
        )
        
        # Commit after each batch
        await db.commit()
        
        print(f"Inserted {min(i + BATCH_SIZE, len(users))}/{len(users)}")

CQRS Implementation

Command Query Responsibility Segregation

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                        CQRS Architecture                     โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚   Commands (Writes)          Queries (Reads)              โ”‚
โ”‚        โ†“                           โ†“                        โ”‚
โ”‚   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                  โ”‚
โ”‚   โ”‚  Write  โ”‚                โ”‚  Read   โ”‚                  โ”‚
โ”‚   โ”‚   DB    โ”‚ โ”€โ”€ Events โ”€โ”€โ–บ  โ”‚   DB    โ”‚                  โ”‚
โ”‚   โ”‚(Primary)โ”‚                โ”‚(Denorm.)โ”‚                  โ”‚
โ”‚   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                  โ”‚
โ”‚                                                             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
# Event handler for CQRS
class UserEventHandler:
    
    def __init__(self, read_db):
        self.read_db = read_db
    
    async def handle_user_created(self, event):
        # Project to read model
        await self.read_db.execute(
            """
            INSERT INTO users_read (id, email, name, created_at)
            VALUES (:id, :email, :name, :created_at)
            """,
            {
                "id": event["user_id"],
                "email": event["email"],
                "name": event["name"],
                "created_at": event["timestamp"]
            }
        )
    
    async def handle_user_updated(self, event):
        # Update read model
        await self.read_db.execute(
            """
            UPDATE users_read 
            SET email = :email, name = :name
            WHERE id = :id
            """,
            {
                "id": event["user_id"],
                "email": event.get("email"),
                "name": event.get("name")
            }
        )

Best Practices

  1. Measure before optimizing: Use metrics to identify bottlenecks
  2. Cache intelligently: Focus on frequently accessed data
  3. Denormalize for reads: Optimize read models separately
  4. Shard when necessary: After other optimizations are exhausted
  5. Monitor everything: Track query performance, cache hit rates
  6. Plan for failure: Design for database unavailability

Conclusion

Database scaling requires a multi-layered approach. Start with caching to reduce database load, then add read replicas for read scaling, and finally implement sharding when you need to scale writes. Each pattern has trade-offsโ€”choose based on your specific requirements and access patterns.

The key is to understand your data and access patterns, measure performance continuously, and evolve your strategy as your system grows.

Comments