Introduction
Database performance is often the bottleneck in software systems. As applications scale, single database instances struggle to handle increased load. This guide covers the essential patterns for scaling databases: caching, replication, sharding, and the architectural decisions that make them work.
Scaling databases requires understanding trade-offs between consistency, availability, and performance. The right pattern depends on your access patterns, data size, and business requirements.
Caching Strategies
Multi-Level Caching
```
┌───────────────────────────────────────────────────┐
│                 Multi-Level Cache                 │
├───────────────────────────────────────────────────┤
│                                                   │
│   Request ──► CDN ──► App Cache ──► Database      │
│                           │                       │
│                         Local                     │
│                                                   │
└───────────────────────────────────────────────────┘
```
Redis Cache Implementation
```python
import hashlib
import json
from functools import wraps

import redis


class CacheClient:
    def __init__(self, redis_url):
        self.client = redis.from_url(redis_url)

    def get(self, key):
        value = self.client.get(key)
        if value is not None:
            return json.loads(value)
        return None

    def set(self, key, value, ttl=300):
        self.client.setex(key, ttl, json.dumps(value))

    def delete(self, key):
        self.client.delete(key)

    def invalidate_pattern(self, pattern):
        # KEYS blocks Redis on large keyspaces; prefer SCAN in production
        keys = self.client.keys(pattern)
        if keys:
            self.client.delete(*keys)


# Cache-aside pattern. The cache is resolved from the instance at call
# time (self.cache), so the decorator can be applied inside a class body
# before any instance exists.
def cache_aside(key_prefix, ttl=300):
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Generate a deterministic cache key from the arguments
            key_args = str(args) + str(sorted(kwargs.items()))
            cache_key = f"{key_prefix}:{hashlib.md5(key_args.encode()).hexdigest()}"
            # Try cache first
            cached = self.cache.get(cache_key)
            if cached is not None:
                return cached
            # Fetch from database; the result must be JSON-serializable
            # for CacheClient to store it
            result = func(self, *args, **kwargs)
            # Store in cache
            self.cache.set(cache_key, result, ttl=ttl)
            return result
        return wrapper
    return decorator


# Usage
class UserService:
    def __init__(self, db, cache):
        self.db = db
        self.cache = cache

    @cache_aside("user")
    def get_user(self, user_id):
        return self.db.query(User).filter_by(id=user_id).first()
```
Write-Through Cache
```python
class WriteThroughCache:
    def __init__(self, db, cache):
        self.db = db
        self.cache = cache

    def save(self, entity):
        # Write to database
        self.db.save(entity)
        # Write to cache immediately
        self.cache.set(
            f"{entity.__class__.__name__}:{entity.id}",
            entity.to_dict()
        )
        return entity

    def delete(self, entity_type, entity_id):
        # Delete from database
        self.db.delete(entity_type, entity_id)
        # Delete from cache
        self.cache.delete(f"{entity_type}:{entity_id}")
```
Cache Invalidation Strategies
| Strategy | Pros | Cons |
|---|---|---|
| TTL | Simple | Stale data possible |
| Write-through | Consistent | Write latency |
| Write-behind | Fast writes | Data loss risk |
| Cache-aside | Simple | Multiple round trips |
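The table mentions write-behind caching, which the examples above do not cover. Here is a minimal sketch of the idea, assuming a simple in-memory queue drained by a periodic flush; the `WriteBehindCache` name and dict-backed stores are illustrative, not a library API:

```python
import queue


class WriteBehindCache:
    """Writes hit the cache immediately; the database is updated later
    by draining a queue. Fast writes, but pending entries are lost if
    the process dies before a flush (the risk noted in the table)."""

    def __init__(self, db, cache):
        self.db = db
        self.cache = cache
        self.pending = queue.Queue()

    def save(self, key, value):
        self.cache[key] = value          # cache updated synchronously
        self.pending.put((key, value))   # database write is deferred

    def flush(self):
        # Drain pending writes to the database; in practice this runs
        # from a background thread or scheduler
        while not self.pending.empty():
            key, value = self.pending.get()
            self.db[key] = value
```

A real implementation would also need to coalesce repeated writes to the same key and handle flush failures, which is where most of the complexity lives.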
Read Replicas
Replication Architecture
```
┌───────────────────────────────────────────────────┐
│             Read Replica Architecture             │
├───────────────────────────────────────────────────┤
│                                                   │
│                 ┌──────────────┐                  │
│                 │  Primary DB  │                  │
│                 │   (Writer)   │                  │
│                 └──────┬───────┘                  │
│                        │                          │
│        ┌───────────────┼───────────────┐          │
│        ▼               ▼               ▼          │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐    │
│  │ Replica 1 │   │ Replica 2 │   │ Replica 3 │    │
│  │ (Reader)  │   │ (Reader)  │   │ (Reader)  │    │
│  └───────────┘   └───────────┘   └───────────┘    │
│                                                   │
└───────────────────────────────────────────────────┘
```
PgBouncer for Connection Pooling
```ini
; pgbouncer.ini
[databases]
primary = host=primary-db port=5432 dbname=app

[pgbouncer]
listen_port = 6432
listen_addr = *
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 25
min_pool_size = 10
reserve_pool_size = 5
reserve_pool_timeout = 5
```
Application-Level Routing
```python
import random
from contextlib import contextmanager


class DatabaseRouter:
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas

    @contextmanager
    def get_connection(self, for_write=False):
        """Get a database connection, routing reads to a replica."""
        if for_write or not self.replicas:
            conn = self.primary.connect()
        else:
            # Load balance across replicas
            replica = random.choice(self.replicas)
            conn = replica.connect()
        try:
            yield conn
        finally:
            conn.close()

    def read(self, query, *args):
        with self.get_connection(for_write=False) as conn:
            return conn.execute(query, args)

    def write(self, query, *args):
        with self.get_connection(for_write=True) as conn:
            return conn.execute(query, args)
```
Horizontal Sharding
Sharding Strategies
```python
import hashlib


class ShardRouter:
    def __init__(self, shards):
        self.shards = shards
        self.num_shards = len(shards)

    def get_shard(self, key):
        """Determine which shard handles a key."""
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        shard_index = hash_value % self.num_shards
        return self.shards[shard_index]

    def get_user_shard(self, user_id):
        """Shard by user ID."""
        return self.get_shard(f"user:{user_id}")

    def get_order_shard(self, order_id):
        """Shard by order ID."""
        return self.get_shard(f"order:{order_id}")


# Range-based sharding
class RangeShardRouter:
    def __init__(self, ranges):
        # ranges: list of (min_val, max_val, shard) tuples
        self.ranges = sorted(ranges, key=lambda r: r[0])

    def get_shard(self, key):
        """Find the shard whose range contains the key."""
        key_int = int(key)
        for min_val, max_val, shard in self.ranges:
            if min_val <= key_int <= max_val:
                return shard
        raise ValueError(f"No shard for key: {key}")
```
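The main weakness of hash-modulo sharding is resharding: changing the shard count remaps almost every key, forcing a large data migration. A quick standalone illustration of the effect (key names are arbitrary):

```python
import hashlib


def shard_for(key, num_shards):
    # Same hash-modulo scheme as ShardRouter above
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % num_shards


keys = [f"user:{i}" for i in range(10_000)]
# Count keys whose shard changes when going from 4 to 5 shards
moved = sum(shard_for(k, 4) != shard_for(k, 5) for k in keys)
# Going from N to N+1 shards moves roughly N/(N+1) of all keys
print(f"{moved / len(keys):.0%} of keys moved")
```

This is the problem that consistent hashing, covered next, is designed to avoid.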
Consistent Hashing
```python
import bisect
import hashlib


class ConsistentHashRing:
    def __init__(self, nodes, virtual_nodes=100):
        self.ring = {}
        self.sorted_keys = []
        self.virtual_nodes = virtual_nodes
        for node in nodes:
            self.add_node(node)

    def add_node(self, node):
        # Place several virtual nodes on the ring for smoother balance
        for i in range(self.virtual_nodes):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            bisect.insort(self.sorted_keys, key)

    def remove_node(self, node):
        for i in range(self.virtual_nodes):
            key = self._hash(f"{node}:{i}")
            del self.ring[key]
            self.sorted_keys.remove(key)

    def get_node(self, key):
        if not self.ring:
            return None
        hash_value = self._hash(key)
        # Binary search for the first node hash >= the key's hash,
        # wrapping around to the start of the ring if necessary
        idx = bisect.bisect_left(self.sorted_keys, hash_value)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
```
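To see the payoff, here is a compact standalone version of the same ring (the helper names are illustrative), measuring how many keys relocate when a node is added; with N nodes, only about 1/(N+1) of keys should move:

```python
import bisect
import hashlib


def _hash(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)


def build_ring(nodes, virtual_nodes=100):
    # Sorted list of (hash, node) pairs, including virtual nodes
    return sorted((_hash(f"{n}:{i}"), n)
                  for n in nodes for i in range(virtual_nodes))


def get_node(ring, key):
    hashes = [h for h, _ in ring]
    idx = bisect.bisect_left(hashes, _hash(key))
    return ring[idx % len(ring)][1]  # wrap around past the last hash


keys = [f"user:{i}" for i in range(10_000)]
before = build_ring(["node-a", "node-b", "node-c"])
after = build_ring(["node-a", "node-b", "node-c", "node-d"])
moved = sum(get_node(before, k) != get_node(after, k) for k in keys)
# Only about a quarter of keys move when going from 3 to 4 nodes,
# versus the large majority under hash-modulo sharding
print(f"{moved / len(keys):.0%} of keys moved")
```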
Database Performance Optimization
Query Optimization
```sql
-- Example: optimizing a query with proper indexing

-- Slow: no matching index, full table scan
SELECT * FROM orders
WHERE created_at > '2026-01-01'
  AND status = 'pending'
ORDER BY created_at DESC;

-- Optimized: composite index matching the filter and sort
CREATE INDEX idx_orders_status_created
    ON orders(status, created_at DESC);

-- Use a covering index to avoid the table lookup entirely
CREATE INDEX idx_orders_covering
    ON orders(status, created_at DESC)
    INCLUDE (user_id, total);
```
Connection Pool Tuning
```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "postgresql://user:pass@localhost/db",
    poolclass=QueuePool,
    pool_size=20,        # Steady-state connections
    max_overflow=10,     # Additional connections under load
    pool_timeout=30,     # Seconds to wait for a free connection
    pool_recycle=3600,   # Recycle connections hourly
    pool_pre_ping=True,  # Check connection liveness before use
)
```
Batch Processing
```python
async def batch_insert_users(db, users):
    """Insert users in batches for better performance."""
    BATCH_SIZE = 1000
    for i in range(0, len(users), BATCH_SIZE):
        batch = users[i:i + BATCH_SIZE]
        # Bulk insert of one batch (executemany-style)
        await db.execute(User.__table__.insert(), batch)
        # Commit after each batch to keep transactions small
        await db.commit()
        print(f"Inserted {min(i + BATCH_SIZE, len(users))}/{len(users)}")
```
CQRS Implementation
Command Query Responsibility Segregation
```
┌───────────────────────────────────────────────────┐
│                 CQRS Architecture                 │
├───────────────────────────────────────────────────┤
│                                                   │
│  Commands (Writes)             Queries (Reads)    │
│          │                           │            │
│          ▼                           ▼            │
│    ┌───────────┐               ┌───────────┐      │
│    │   Write   │               │   Read    │      │
│    │    DB     │  ──Events──►  │    DB     │      │
│    │ (Primary) │               │ (Denorm.) │      │
│    └───────────┘               └───────────┘      │
│                                                   │
└───────────────────────────────────────────────────┘
```
```python
# Event handler that projects write-side events onto the read model
class UserEventHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle_user_created(self, event):
        # Project the new user into the denormalized read table
        await self.read_db.execute(
            """
            INSERT INTO users_read (id, email, name, created_at)
            VALUES (:id, :email, :name, :created_at)
            """,
            {
                "id": event["user_id"],
                "email": event["email"],
                "name": event["name"],
                "created_at": event["timestamp"],
            },
        )

    async def handle_user_updated(self, event):
        # Update the read model in place
        await self.read_db.execute(
            """
            UPDATE users_read
            SET email = :email, name = :name
            WHERE id = :id
            """,
            {
                "id": event["user_id"],
                "email": event.get("email"),
                "name": event.get("name"),
            },
        )
```
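The write side also needs something that delivers events to these handlers. A minimal in-process dispatcher sketch; a production system would use a durable log such as Kafka, and the `EventBus` name here is illustrative:

```python
import asyncio


class EventBus:
    """Routes events to subscribed async handlers by event type."""

    def __init__(self):
        self.handlers = {}

    def subscribe(self, event_type, handler):
        self.handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event):
        # Deliver the event to every handler registered for its type
        for handler in self.handlers.get(event["type"], []):
            await handler(event)
```

Wiring it up would look like `bus.subscribe("user_created", handler.handle_user_created)`, with the write path publishing an event after each successful commit.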
Best Practices
- Measure before optimizing: Use metrics to identify bottlenecks
- Cache intelligently: Focus on frequently accessed data
- Denormalize for reads: Optimize read models separately
- Shard when necessary: After other optimizations are exhausted
- Monitor everything: Track query performance, cache hit rates
- Plan for failure: Design for database unavailability
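For the monitoring point, even a simple counter around the cache goes a long way: the hit rate tells you whether a cache is earning its keep. A sketch, with the `InstrumentedCache` wrapper being illustrative:

```python
class InstrumentedCache:
    """Wraps any mapping-style cache and tracks its hit rate."""

    def __init__(self, cache):
        self.cache = cache
        self.hits = 0
        self.misses = 0

    def get(self, key):
        value = self.cache.get(key)
        if value is None:
            self.misses += 1
        else:
            self.hits += 1
        return value

    def set(self, key, value):
        self.cache[key] = value

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total else 0.0
```

In practice you would export these counters to your metrics system rather than reading them ad hoc; a persistently low hit rate usually means the wrong data is being cached or TTLs are too short.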
Conclusion
Database scaling requires a multi-layered approach. Start with caching to reduce database load, then add read replicas for read scaling, and finally implement sharding when you need to scale writes. Each pattern has trade-offs; choose based on your specific requirements and access patterns.
The key is to understand your data and access patterns, measure performance continuously, and evolve your strategy as your system grows.