Caching is the most impactful performance optimization available. A well-designed caching strategy can reduce latency by 10-100x and reduce load on origin systems by 90% or more. This guide covers caching at every layer - from CDN to application to database.
The Caching Pyramid
┌───────────────────────────────────────┐
│ CDN / Edge Cache                      │ ← Milliseconds, global
├───────────────────────────────────────┤
│ Application Cache                     │ ← Milliseconds
├───────────────────────────────────────┤
│ Database Query Cache                  │ ← Milliseconds
├───────────────────────────────────────┤
│ Primary Database                      │ ← Milliseconds to seconds
└───────────────────────────────────────┘
Each layer caches different data with different characteristics.
Redis Deep Dive
Redis Data Structures
import redis
import json

# Connect to a local Redis instance (db 0).
client = redis.Redis(host='localhost', port=6379, db=0)

# --- Strings: simple scalar values ---
client.set('user:123:name', 'John Doe')
client.set('user:123:email', '[email protected]')
# SETEX stores the value together with a TTL (here: one hour).
client.setex('session:abc123', 3600, json.dumps({'user_id': 123}))

# --- Hashes: field/value maps, a natural fit for objects ---
client.hset('user:123', mapping={
    'name': 'John Doe',
    'email': '[email protected]',
    'created_at': '2024-01-15'
})
user = client.hgetall('user:123')

# --- Lists: ordered collections ---
client.lpush('recent_searches:user123', 'shoes')
client.ltrim('recent_searches:user123', 0, 9)  # keep only the 10 newest

# --- Sets: unordered collections of unique members ---
client.sadd('product:views:2024-01-15', 'product1', 'product2', 'product3')
unique_viewers = client.scard('product:views:2024-01-15')

# --- Sorted sets: score-ordered, ideal for leaderboards ---
client.zadd('leaderboard:score', {'player1': 100, 'player2': 95, 'player3': 90})
top_players = client.zrevrange('leaderboard:score', 0, 9, withscores=True)
Caching Patterns
Cache-Aside (Lazy Loading):
# Standard cache-aside pattern
def get_user(user_id):
    """Cache-aside read: try Redis first, fall back to the database on a miss."""
    key = f'user:{user_id}'
    cached = redis.get(key)
    if cached:
        return json.loads(cached)
    # Miss: load from the primary store and populate the cache (1h TTL).
    user = db.query('SELECT * FROM users WHERE id = ?', user_id)
    if user:
        redis.setex(key, 3600, json.dumps(user))
    return user
def update_user(user_id, data):
    """Write path for cache-aside: update the DB, then drop the cached copy."""
    db.execute('UPDATE users SET ... WHERE id = ?', user_id, data)
    # Delete rather than update the entry; the next read re-populates it.
    redis.delete(f'user:{user_id}')
Write-Through:
# Write-through - cache updated on every write
def create_order(order_data):
    """Write-through create: persist to the DB and cache the result immediately."""
    order_id = db.insert('orders', order_data)
    # Cache the full record (including its new id) with a one-hour TTL.
    cached_order = {**order_data, 'id': order_id}
    redis.setex(f'order:{order_id}', 3600, json.dumps(cached_order))
    return order_id
def update_order(order_id, data):
    """Write-through update: refresh the cached copy if one exists."""
    db.update('orders', data, 'id = ?', order_id)
    cache_key = f'order:{order_id}'
    cached = redis.get(cache_key)
    if not cached:
        return
    # Merge the changed fields into the cached copy and reset its TTL.
    order = json.loads(cached)
    order.update(data)
    redis.setex(cache_key, 3600, json.dumps(order))
Write-Behind:
# Write-behind - async database writes
import asyncio
from collections import deque

# Pending database writes, drained in batches by flush_writes().
write_queue = deque()


async def update_user_async(user_id, data):
    """Write-behind update: cache now, enqueue the DB write for later."""
    # The cache is the source of truth until the queue is flushed.
    redis.setex(f'user:{user_id}', 3600, json.dumps(data))
    write_queue.append({'table': 'users', 'id': user_id, 'data': data})
async def flush_writes():
    """Background drainer: flush queued writes to the DB once per second.

    Pops up to 100 queued writes per cycle and bulk-writes them. The
    original let any database error escape this loop, killing the task and
    silently dropping write-behind persistence; failed batches are now
    re-queued so they are retried on the next cycle.
    """
    while True:
        if write_queue:
            batch = []
            while write_queue and len(batch) < 100:
                batch.append(write_queue.popleft())
            try:
                # Bulk write to database
                db.batch_update('users', batch)
            except Exception:
                # Restore the batch at the front, preserving order,
                # so the writes are retried rather than lost.
                write_queue.extendleft(reversed(batch))
        await asyncio.sleep(1)
Redis Cluster Patterns
# Redis Cluster connection
from redis.cluster import RedisCluster

# Seed nodes only; the client discovers the full cluster topology itself.
startup = [
    {'host': 'redis-1', 'port': 6379},
    {'host': 'redis-2', 'port': 6379},
    {'host': 'redis-3', 'port': 6379},
]
rc = RedisCluster(startup_nodes=startup, decode_responses=True)

# Keys are hashed into slots and distributed across the nodes automatically.
rc.set('user:1:name', 'Alice')
rc.set('user:2:name', 'Bob')
Redis Pub/Sub for Cache Invalidation
# Publisher - notify other services of changes
def invalidate_user_cache(user_id):
    """Drop a user's cache entry locally and broadcast the invalidation."""
    key = f'user:{user_id}'
    redis.delete(key)
    # Other instances subscribe to this channel and evict their own copies.
    message = {'key': key, 'pattern': f'user:{user_id}:*'}
    redis.publish('cache:invalidate', json.dumps(message))
# Subscriber - listen for invalidations
def listen_for_invalidation():
    """Subscribe to cache invalidation events and evict matching keys.

    Blocks forever on the pub/sub stream; intended to run once per
    application instance (e.g. in a dedicated thread).
    """
    pubsub = redis.pubsub()
    pubsub.subscribe('cache:invalidate')
    for message in pubsub.listen():
        # listen() also yields subscribe/unsubscribe confirmations.
        if message['type'] != 'message':
            continue
        data = json.loads(message['data'])
        if '*' in data['pattern']:
            # SCAN iterates incrementally; KEYS (used originally) blocks
            # the whole Redis server while walking the entire keyspace.
            for key in redis.scan_iter(match=data['pattern']):
                redis.delete(key)
        else:
            redis.delete(data['key'])
Application-Level Caching
In-Memory Caching
from functools import lru_cache
from threading import Lock
import time
# Simple in-memory cache with TTL
class Cache:
    """Thread-safe in-memory cache whose entries expire ``ttl`` seconds
    after insertion.

    Every operation holds an internal lock. Expired entries are evicted
    lazily, on the next ``get`` for that key. Uses ``time.monotonic()``
    instead of ``time.time()`` so wall-clock jumps (NTP corrections, DST)
    cannot expire entries early or keep them alive forever.
    """

    def __init__(self, ttl=300):
        self._cache = {}        # key -> value
        self._timestamps = {}   # key -> monotonic insertion time
        self._ttl = ttl
        self._lock = Lock()

    def get(self, key):
        """Return the cached value, or None if the key is missing or expired."""
        with self._lock:
            if key in self._cache:
                if time.monotonic() - self._timestamps[key] < self._ttl:
                    return self._cache[key]
                # Expired: evict before reporting a miss.
                del self._cache[key]
                del self._timestamps[key]
            return None

    def set(self, key, value):
        """Insert or overwrite a value, restarting its TTL."""
        with self._lock:
            self._cache[key] = value
            self._timestamps[key] = time.monotonic()

    def delete(self, key):
        """Remove a key if present; no-op otherwise."""
        with self._lock:
            self._cache.pop(key, None)
            self._timestamps.pop(key, None)

    def clear(self):
        """Drop every entry."""
        with self._lock:
            self._cache.clear()
            self._timestamps.clear()
# Usage with LRU cache
# Memoize category lookups; maxsize=1000 bounds memory via LRU eviction.
@lru_cache(maxsize=1000)
def get_product_category(product_id):
    # Expensive database query
    # NOTE(review): lru_cache entries never expire by time, so a category
    # change in the DB is only observed after the entry is evicted.
    return db.query('SELECT category FROM products WHERE id = ?', product_id)
# Manual cache management
from cachetools import TTLCache, cached

# Shared TTL cache: up to 10k products, each entry valid for 5 minutes.
product_cache = TTLCache(maxsize=10000, ttl=300)

@cached(cache=product_cache)
def get_product(product_id):
    # Unlike functools.lru_cache, TTLCache entries expire by time.
    return db.query('SELECT * FROM products WHERE id = ?', product_id)
Distributed Caching Patterns
# Multi-layer cache with local + Redis
# Multi-layer cache with local + Redis
class MultiLayerCache:
    """Two-tier cache: a short-TTL in-process dict in front of Redis.

    Values are stored in Redis as JSON; the local layer holds the *parsed*
    Python object. The original stored the raw JSON bytes locally after a
    Redis hit (but the parsed object after ``set``), so local hits returned
    a different type than Redis hits — fixed here.
    """

    def __init__(self, local_ttl=60, remote_ttl=3600):
        self.local = {}              # key -> (parsed value, insert timestamp)
        self.redis = redis.Redis()
        self.local_ttl = local_ttl
        self.remote_ttl = remote_ttl

    def get(self, key):
        """Return the cached value (local first, then Redis), or None."""
        if key in self.local:
            value, timestamp = self.local[key]
            if time.time() - timestamp < self.local_ttl:
                return value
            del self.local[key]
        raw = self.redis.get(key)
        if raw:
            # Parse once and cache the object so local hits return the
            # same type as Redis hits.
            parsed = json.loads(raw)
            self.local[key] = (parsed, time.time())
            return parsed
        return None

    def set(self, key, value):
        """Write the value to both layers (JSON in Redis, object locally)."""
        self.redis.setex(key, self.remote_ttl, json.dumps(value))
        self.local[key] = (value, time.time())
CDN Caching
Cache-Control Headers
# Dynamic response with proper cache headers
def get_product(request, product_id):
    """Return a product, attaching CDN cache headers for anonymous traffic."""
    product = db.get_product(product_id)
    # Authenticated responses are per-user: omit cache headers entirely so
    # shared caches (CDN) never store them.
    if request.user:
        return jsonify(product)
    # Anonymous traffic: cacheable by shared caches for five minutes.
    response = jsonify(product)
    response.headers['Cache-Control'] = 'public, max-age=300'
    # Vary keeps compressed and uncompressed variants separate.
    response.headers['Vary'] = 'Accept-Encoding'
    return response
def get_static_assets(request, filename):
    """Serve a static asset with a one-year immutable cache lifetime.

    Assumes asset URLs are content-versioned (hash in the name), so the
    file at a given URL never changes and ``immutable`` is safe.
    """
    # Fix: the original path literal was 'static/(unknown)' and ignored
    # `filename`; interpolate the requested name instead.
    # NOTE(review): `filename` is request-derived — ensure the framework
    # guards against path traversal (e.g. use send_from_directory).
    response = send_file(f'static/{filename}')
    response.headers['Cache-Control'] = 'public, max-age=31536000, immutable'
    return response
def get_user_dashboard(request):
    """Per-user dashboard: cacheable only by the browser, never by a CDN."""
    dashboard_response = jsonify(request.user.dashboard)
    # 'private' blocks shared caches; 'no-cache' forces revalidation.
    dashboard_response.headers['Cache-Control'] = 'private, max-age=0, no-cache'
    return dashboard_response
CDN Cache Invalidation
# CloudFront invalidation via Lambda
import boto3


def invalidate_cdn(paths):
    """Invalidate the given CloudFront path patterns; return the job id."""
    cloudfront = boto3.client('cloudfront')
    batch = {
        # CallerReference must be unique per request; a timestamp suffices.
        'CallerReference': f'invalidation-{int(time.time())}',
        'Paths': {
            'Quantity': len(paths),
            'Items': paths,
        },
    }
    result = cloudfront.create_invalidation(
        DistributionId='E1234567890ABC',
        InvalidationBatch=batch,
    )
    return result['Invalidation']['Id']
# Usage: wildcard patterns invalidate whole path prefixes.
invalidate_cdn(['/api/products/*', '/static/images/*'])
// Cloudflare cache purge via API.
// Fix: the purge_cache request field is `prefixes`; the original sent the
// misspelled `prefixs`, which Cloudflare does not recognize.
// NOTE(review): purging by tag or prefix requires an Enterprise zone;
// non-Enterprise zones should send only `files`.
async function purgeCache(urls) {
  const response = await fetch(
    'https://api.cloudflare.com/client/v4/zones/:zone_id/purge_cache',
    {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${API_TOKEN}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        files: urls,
        tags: ['product-update'],
        prefixes: ['/api/']
      })
    }
  );
  return response.json();
}
Cache Invalidation Strategies
Time-Based Invalidation
# Fixed TTL
# Per-category TTLs, tuned to how quickly each data set goes stale.
CACHE_TTL = {
    'user_profile': 3600,  # 1 hour
    'product_list': 300,   # 5 minutes
    'config': 86400,       # 24 hours
    'analytics': 60,       # 1 minute
}


def get_cached(key, ttl_key, fetcher):
    """Cache-aside helper: return cached JSON, or fetch, store, and return."""
    hit = redis.get(key)
    if hit:
        return json.loads(hit)
    # Miss: call the supplied fetcher and cache with the category's TTL.
    value = fetcher()
    redis.setex(key, CACHE_TTL[ttl_key], json.dumps(value))
    return value
Event-Based Invalidation
# Invalidate on data changes
def on_product_updated(product_id):
    """Event-based invalidation: evict product caches after a data change."""
    # Evict the single-product entry.
    redis.delete(f'product:{product_id}')
    # Evict every list/category cache that may embed this product.
    # SCAN iterates incrementally; KEYS (used originally) blocks the whole
    # Redis server while walking the entire keyspace.
    for pattern in ['products:list:*', 'products:category:*']:
        for key in redis.scan_iter(match=pattern):
            redis.delete(key)
    # Notify other services. PUBLISH requires str/bytes, so the payload
    # must be serialized (the original passed a raw dict, which redis-py
    # rejects with a DataError).
    redis.publish('cache:invalidate', json.dumps({
        'type': 'product',
        'id': product_id,
    }))
Stale-While-Revalidate
# Serve stale content while refreshing in background
def get_product_with_revalidation(product_id):
    """Stale-while-revalidate read: serve cached data, refresh near expiry.

    Cache hits return immediately; if the entry is within 60s of expiring,
    a background refresh is scheduled so a later request never pays the
    fetch latency. Cache misses fetch synchronously.
    """
    cache_key = f'product:{product_id}'
    cached = redis.get(cache_key)
    if cached:
        product = json.loads(cached)
        # "Stale" = less than 60 seconds of TTL remaining.
        if redis.ttl(cache_key) < 60:
            # Fire-and-forget; requires a running event loop in this thread.
            asyncio.create_task(refresh_product_cache(product_id))
        return product
    # Miss: fetch inline. The original returned the *coroutine* produced by
    # the async refresh function here, not the product itself.
    return _refresh_product_sync(product_id)


def _refresh_product_sync(product_id):
    # Fetch from the primary store and re-cache with a fresh 1h TTL.
    product = db.get_product(product_id)
    redis.setex(f'product:{product_id}', 3600, json.dumps(product))
    return product


async def refresh_product_cache(product_id):
    """Async wrapper around the refresh, used for background (stale) refreshes."""
    return _refresh_product_sync(product_id)
Cache Monitoring
Key Metrics
# Prometheus Redis exporter configuration
# Alerting rules over metrics exposed by redis_exporter.
- name: redis
  rules:
    # Fires when fewer than 80% of keyspace lookups hit the cache.
    - alert: CacheHitRateLow
      expr: |
        (redis_keyspace_hits_total /
        (redis_keyspace_hits_total + redis_keyspace_misses_total)) < 0.8
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Cache hit rate below 80%"
    # Fires when Redis is close to its configured maxmemory.
    - alert: RedisMemoryHigh
      expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.9
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Redis memory usage above 90%"
    # Sustained evictions mean the working set no longer fits in memory.
    - alert: RedisEvictionsHigh
      expr: rate(redis_evicted_keys_total[5m]) > 10
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "High number of key evictions"
Cache Analytics
# Cache hit/miss tracking
class CacheMetrics:
    """In-process counters for cache effectiveness.

    Tracks hits, misses, and errors; ``hit_rate`` derives the ratio lazily.
    Counters are plain ints with no lock — add synchronization if exact
    counts matter under heavy multi-threading.
    """

    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.errors = 0

    def record_hit(self):
        self.hits += 1

    def record_miss(self):
        self.misses += 1

    def record_error(self):
        # Added for symmetry with record_hit/record_miss; existing callers
        # that increment `errors` directly keep working.
        self.errors += 1

    @property
    def hit_rate(self):
        """Fraction of lookups served from cache; 0 when nothing recorded."""
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0
# Module-wide metrics instance shared by the helper below.
metrics = CacheMetrics()


def get_cached(key):
    """Fetch a JSON value from Redis, recording hit/miss/error metrics."""
    try:
        raw = redis.get(key)
        if raw:
            metrics.record_hit()
            return json.loads(raw)
        metrics.record_miss()
        return None
    except Exception:
        # Best-effort: any Redis/decode failure is counted, not raised.
        metrics.errors += 1
        return None
Memcached vs Redis
| Feature | Redis | Memcached |
|---|---|---|
| Data structures | Strings, Hashes, Lists, Sets, Sorted Sets | Strings only |
| Persistence | RDB + AOF | No |
| Clustering | Redis Cluster | Client-side sharding (consistent hashing) |
| Pub/Sub | Yes | No |
| Lua scripts | Yes | No |
| Memory efficiency | Moderate | Higher |
| Performance | Very fast | Extremely fast |
| Use case | Rich data, persistence | Simple caching |
# Memcached is simpler - no complex data types
import pymemcache

mc = pymemcache.Client(('localhost', 11211))

# Simple string caching only
mc.set('key', 'value')
value = mc.get('key')

# By default set() stores without an expiry; pass `expire` (seconds) for a TTL.
mc.set('key', 'value', expire=3600)
Best Practices
Cache Anti-Patterns
Don’t cache too much data:
# Bad: Caching entire database rows
user = cache.get(f'user:{user_id}')  # Contains password hash!

# Good: Cache only what's needed — a purpose-built summary projection
user_summary = cache.get(f'user:{user_id}:summary')
Don’t use indefinite TTLs:
# Bad: No expiration — stale config lives forever and holds memory
redis.set('config', json.dumps(config))

# Good: Reasonable TTL with refresh (re-cached on the next miss)
redis.setex('config', 3600, json.dumps(config))
Handle cache misses gracefully:
# Bad: Cache stampede — every concurrent miss hits the database.
for i in range(100):  # 100 simultaneous requests
    user = get_user(123)  # All hit DB simultaneously

# Good: Request coalescing via a distributed lock. blocking=False makes
# acquire() return immediately; the redis-py default (blocking=True) waits
# for the lock, so the `else` branch below could never be reached.
lock = redis.lock('user:123:lock', timeout=5)
if lock.acquire(blocking=False):
    try:
        user = get_user(123)  # Only the lock holder queries the DB.
    finally:
        lock.release()
else:
    # Another request is repopulating the cache; wait briefly, then read.
    time.sleep(0.1)
    user = get_user(123)
Conclusion
Effective caching requires strategy, not just technology:
- Use CDN for all static content
- Implement cache-aside for read-heavy workloads
- Use write-through for critical data
- Implement cache invalidation on data changes
- Monitor hit rates and adjust TTLs
- Plan for cache failures gracefully
Start with CDN, add Redis for session and query caching, then layer in application caches as needed.
External Resources
Related Articles
- Frontend Performance Optimization - Client-side caching
- CDN Configuration - Edge caching
- Database DevOps - Database caching
Comments