⚡ Calmops

Caching Strategies: Redis, CDN, and Application Caching

Caching is often the single most impactful performance optimization available. A well-designed caching strategy can reduce latency by 10-100x and cut load on origin systems by 90% or more. This guide covers caching at every layer - from CDN to application to database.

The Caching Pyramid

┌─────────────────────────────────────┐
│        CDN / Edge Cache             │  ← Milliseconds, global
├─────────────────────────────────────┤
│        Application Cache            │  ← Milliseconds
├─────────────────────────────────────┤
│        Database Query Cache         │  ← Milliseconds
├─────────────────────────────────────┤
│        Primary Database             │  ← Milliseconds to seconds
└─────────────────────────────────────┘

Each layer caches different data with different characteristics.
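The read path through the pyramid is a fall-through: each request tries the fastest layer first, falls through on a miss, and backfills the faster layers on the way back. A minimal sketch of that flow (the layer callables here are hypothetical stand-ins for a real CDN client, local dict, Redis, and database):

```python
# Illustrative fall-through lookup across cache layers.
# Each layer is a (get, set) pair ordered fastest-first.

def layered_get(key, layers, fetch_from_origin):
    """Try each cache layer in order; on a hit, backfill the layers that missed."""
    missed = []
    for get, set_ in layers:
        value = get(key)
        if value is not None:
            # Backfill faster layers so the next read stops earlier
            for _, missed_set in missed:
                missed_set(key, value)
            return value
        missed.append((get, set_))
    # Every layer missed - go to the origin and populate all layers
    value = fetch_from_origin(key)
    for _, set_ in layers:
        set_(key, value)
    return value
```

Real deployments differ in TTLs and consistency per layer, but the lookup order is the same idea.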

Redis Deep Dive

Redis Data Structures

import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)  # str, not bytes

# Strings - Simple values
r.set('user:123:name', 'John Doe')
r.set('user:123:email', '[email protected]')
r.setex('session:abc123', 3600, json.dumps({'user_id': 123}))

# Hashes - Objects
r.hset('user:123', mapping={
    'name': 'John Doe',
    'email': '[email protected]',
    'created_at': '2024-01-15'
})
user = r.hgetall('user:123')

# Lists - Ordered collections
r.lpush('recent_searches:user123', 'shoes')
r.ltrim('recent_searches:user123', 0, 9)  # Keep only 10

# Sets - Unique collections
r.sadd('viewers:product1:2024-01-15', 'user1', 'user2', 'user3')
unique_viewers = r.scard('viewers:product1:2024-01-15')

# Sorted Sets - Leaderboards
r.zadd('leaderboard:score', {'player1': 100, 'player2': 95, 'player3': 90})
top_players = r.zrevrange('leaderboard:score', 0, 9, withscores=True)

Caching Patterns

Cache-Aside (Lazy Loading):

# Standard cache-aside pattern
def get_user(user_id):
    # Try cache first
    cached = redis.get(f'user:{user_id}')
    if cached:
        return json.loads(cached)
    
    # Cache miss - fetch from database
    user = db.query('SELECT * FROM users WHERE id = ?', user_id)
    
    if user:
        # Store in cache with TTL
        redis.setex(f'user:{user_id}', 3600, json.dumps(user))
    
    return user

def update_user(user_id, data):
    # Update database first
    db.execute('UPDATE users SET ... WHERE id = ?', user_id, data)
    
    # Invalidate cache
    redis.delete(f'user:{user_id}')

Write-Through:

# Write-through - cache updated on every write
def create_order(order_data):
    # Write to database
    order_id = db.insert('orders', order_data)
    
    # Write to cache immediately
    cache_key = f'order:{order_id}'
    redis.setex(cache_key, 3600, json.dumps({**order_data, 'id': order_id}))
    
    return order_id

def update_order(order_id, data):
    db.update('orders', data, 'id = ?', order_id)
    
    # Update cache
    cache_key = f'order:{order_id}'
    cached = redis.get(cache_key)
    if cached:
        order = json.loads(cached)
        order.update(data)
        redis.setex(cache_key, 3600, json.dumps(order))

Write-Behind:

# Write-behind - cache updated immediately, database written asynchronously.
# Caveat: queued writes are lost if the process crashes before a flush,
# so back anything critical with a durable queue.
import asyncio
from collections import deque

write_queue = deque()

async def update_user_async(user_id, data):
    # Update cache immediately
    redis.setex(f'user:{user_id}', 3600, json.dumps(data))
    
    # Queue for async database write
    write_queue.append({
        'table': 'users',
        'id': user_id,
        'data': data
    })

async def flush_writes():
    while True:
        if write_queue:
            batch = []
            while write_queue and len(batch) < 100:
                batch.append(write_queue.popleft())
            
            # Bulk write to database
            db.batch_update('users', batch)
        
        await asyncio.sleep(1)

Redis Cluster Patterns

# Redis Cluster connection (redis-py >= 4.1)
from redis.cluster import RedisCluster, ClusterNode

nodes = [
    ClusterNode('redis-1', 6379),
    ClusterNode('redis-2', 6379),
    ClusterNode('redis-3', 6379),
]

rc = RedisCluster(startup_nodes=nodes, decode_responses=True)

# Automatic key distribution
rc.set('user:1:name', 'Alice')
rc.set('user:2:name', 'Bob')
# Keys automatically sharded across nodes
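One caveat with automatic sharding: multi-key commands (MGET, transactions, Lua scripts) only work when every key maps to the same hash slot. Redis Cluster supports hash tags for this - when a key contains a non-empty {...} section, only that substring is hashed. A sketch of the tag-extraction rule (extraction only; the actual slot is CRC16 of the tag mod 16384):

```python
def hash_tag(key):
    """Return the substring of the key that Redis Cluster actually hashes.

    If the key contains a '{' followed by a later '}' with at least one
    character between them, only that inner substring determines the slot;
    otherwise the whole key is hashed.
    """
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end > start + 1:  # the tag must be non-empty
            return key[start + 1:end]
    return key

# '{user:1}:profile' and '{user:1}:settings' share the tag 'user:1',
# so they land on the same node and can be fetched with a single MGET.
```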

Redis Pub/Sub for Cache Invalidation

# Publisher - notify other services of changes
def invalidate_user_cache(user_id):
    # Local cache delete
    redis.delete(f'user:{user_id}')
    
    # Notify other instances
    redis.publish('cache:invalidate', json.dumps({
        'key': f'user:{user_id}',
        'pattern': f'user:{user_id}:*'
    }))

# Subscriber - listen for invalidations
def listen_for_invalidation():
    pubsub = redis.pubsub()
    pubsub.subscribe('cache:invalidate')
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            if '*' in data['pattern']:
                # Handle pattern
                for key in redis.keys(data['pattern']):
                    redis.delete(key)
            else:
                redis.delete(data['key'])

Application-Level Caching

In-Memory Caching

from functools import lru_cache
from threading import Lock
import time

# Simple in-memory cache with TTL
class Cache:
    def __init__(self, ttl=300):
        self._cache = {}
        self._timestamps = {}
        self._ttl = ttl
        self._lock = Lock()
    
    def get(self, key):
        with self._lock:
            if key in self._cache:
                if time.time() - self._timestamps[key] < self._ttl:
                    return self._cache[key]
                del self._cache[key]
                del self._timestamps[key]
        return None
    
    def set(self, key, value):
        with self._lock:
            self._cache[key] = value
            self._timestamps[key] = time.time()
    
    def delete(self, key):
        with self._lock:
            self._cache.pop(key, None)
            self._timestamps.pop(key, None)
    
    def clear(self):
        with self._lock:
            self._cache.clear()
            self._timestamps.clear()

# Usage with LRU cache
@lru_cache(maxsize=1000)
def get_product_category(product_id):
    # Expensive database query
    return db.query('SELECT category FROM products WHERE id = ?', product_id)

# Manual cache management
from cachetools import TTLCache, cached

product_cache = TTLCache(maxsize=10000, ttl=300)

@cached(cache=product_cache)
def get_product(product_id):
    return db.query('SELECT * FROM products WHERE id = ?', product_id)

Distributed Caching Patterns

# Multi-layer cache with local + Redis
class MultiLayerCache:
    def __init__(self, local_ttl=60, remote_ttl=3600):
        self.local = {}  # Local dict
        self.redis = redis.Redis()
        self.local_ttl = local_ttl
        self.remote_ttl = remote_ttl
    
    def get(self, key):
        # Check local cache first
        if key in self.local:
            value, timestamp = self.local[key]
            if time.time() - timestamp < self.local_ttl:
                return value
            del self.local[key]
        
        # Check Redis
        raw = self.redis.get(key)
        if raw:
            value = json.loads(raw)
            # Populate local cache with the decoded value
            self.local[key] = (value, time.time())
            return value
        
        return None
    
    def set(self, key, value):
        # Write to both layers
        self.redis.setex(key, self.remote_ttl, json.dumps(value))
        self.local[key] = (value, time.time())

CDN Caching

Cache-Control Headers

# Dynamic response with proper cache headers
def get_product(request, product_id):
    product = db.get_product(product_id)
    
    # Per-user responses must not be stored by shared caches
    if request.user:
        response = jsonify(product)
        response.headers['Cache-Control'] = 'private, no-store'
        return response
    
    # Public, cacheable content
    response = jsonify(product)
    response.headers['Cache-Control'] = 'public, max-age=300'
    response.headers['Vary'] = 'Accept-Encoding'
    return response

def get_static_assets(request, filename):
    # Long cache for static assets (with versioned URLs)
    response = send_file(f'static/{filename}')
    response.headers['Cache-Control'] = 'public, max-age=31536000, immutable'
    return response

def get_user_dashboard(request):
    # Private - only browser cache
    response = jsonify(request.user.dashboard)
    response.headers['Cache-Control'] = 'private, max-age=0, no-cache'
    return response

CDN Cache Invalidation

# CloudFront invalidation via Lambda
import boto3

def invalidate_cdn(paths):
    cloudfront = boto3.client('cloudfront')
    
    response = cloudfront.create_invalidation(
        DistributionId='E1234567890ABC',
        InvalidationBatch={
            'CallerReference': f'invalidation-{int(time.time())}',
            'Paths': {
                'Quantity': len(paths),
                'Items': paths
            }
        }
    )
    
    return response['Invalidation']['Id']

# Usage
invalidate_cdn([
    '/api/products/*',
    '/static/images/*'
])

// Cloudflare cache purge via API
async function purgeCache(urls) {
  const response = await fetch(
    'https://api.cloudflare.com/client/v4/zones/:zone_id/purge_cache',
    {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${API_TOKEN}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        files: urls,
        tags: ['product-update'],
        prefixes: ['/api/']
      })
    }
  );
  
  return response.json();
}

Cache Invalidation Strategies

Time-Based Invalidation

# Fixed TTL
CACHE_TTL = {
    'user_profile': 3600,      # 1 hour
    'product_list': 300,        # 5 minutes
    'config': 86400,           # 24 hours
    'analytics': 60,            # 1 minute
}

def get_cached(key, ttl_key, fetcher):
    cached = redis.get(key)
    if cached:
        return json.loads(cached)
    
    value = fetcher()
    redis.setex(key, CACHE_TTL[ttl_key], json.dumps(value))
    return value
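One refinement to fixed TTLs: keys cached at the same moment expire at the same moment, producing a synchronized miss storm against the origin. Adding random jitter to each TTL spreads expirations out. A minimal sketch (the ±10% default is an arbitrary assumption; tune per workload):

```python
import random

def jittered_ttl(base_ttl, jitter_fraction=0.1):
    """Randomize a TTL within +/- jitter_fraction to de-synchronize expiry."""
    jitter = base_ttl * jitter_fraction
    return max(1, int(base_ttl + random.uniform(-jitter, jitter)))

# e.g. redis.setex(key, jittered_ttl(CACHE_TTL['product_list']), payload)
```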

Event-Based Invalidation

# Invalidate on data changes
def on_product_updated(product_id):
    # Invalidate specific cache
    redis.delete(f'product:{product_id}')
    
    # Invalidate list caches (scan_iter avoids the blocking KEYS command)
    for pattern in ['products:list:*', 'products:category:*']:
        for key in redis.scan_iter(match=pattern):
            redis.delete(key)
    
    # Notify other services (publish takes a string, so serialize first)
    redis.publish('cache:invalidate', json.dumps({
        'type': 'product',
        'id': product_id
    }))

Stale-While-Revalidate

# Serve stale content while refreshing in background
def get_product_with_revalidation(product_id):
    cache_key = f'product:{product_id}'
    
    # Try to get from cache
    cached = redis.get(cache_key)
    
    if cached:
        product = json.loads(cached)
        
        # Check if stale
        is_stale = redis.ttl(cache_key) < 60
        
        if is_stale:
            # Schedule background refresh without blocking the response
            # (requires a running event loop, e.g. an async web framework)
            asyncio.create_task(refresh_product_cache(product_id))
        
        return product
    
    # Cache miss - fetch synchronously
    return refresh_product_cache(product_id)

async def refresh_product_cache(product_id):
    product = db.get_product(product_id)
    
    # Update cache
    redis.setex(f'product:{product_id}', 3600, json.dumps(product))
    
    return product

Cache Monitoring

Key Metrics

# Prometheus alerting rules on redis_exporter metrics
- name: redis
  rules:
    - alert: CacheHitRateLow
      expr: |
        (redis_keyspace_hits_total / 
        (redis_keyspace_hits_total + redis_keyspace_misses_total)) < 0.8
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Cache hit rate below 80%"
        
    - alert: RedisMemoryHigh
      expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.9
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Redis memory usage above 90%"
        
    - alert: RedisEvictionsHigh
      expr: rate(redis_evicted_keys_total[5m]) > 10
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "High number of key evictions"

Cache Analytics

# Cache hit/miss tracking
class CacheMetrics:
    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.errors = 0
    
    def record_hit(self):
        self.hits += 1
    
    def record_miss(self):
        self.misses += 1
    
    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0

metrics = CacheMetrics()

def get_cached(key):
    try:
        value = redis.get(key)
        if value:
            metrics.record_hit()
            return json.loads(value)
        metrics.record_miss()
        return None
    except Exception:
        metrics.errors += 1
        return None

Memcached vs Redis

Feature            Redis                                      Memcached
Data structures    Strings, Hashes, Lists, Sets, Sorted Sets  Strings only
Persistence        RDB + AOF                                  No
Clustering         Redis Cluster                              Client-side sharding
Pub/Sub            Yes                                        No
Lua scripts        Yes                                        No
Memory efficiency  Moderate                                   Higher
Performance        Very fast                                  Extremely fast
Use case           Rich data, persistence                     Simple caching

# Memcached is simpler - no complex data types
from pymemcache.client.base import Client

mc = Client(('localhost', 11211))

# Simple string caching only
mc.set('key', 'value')
value = mc.get('key')

# No expiration by default - set one explicitly
mc.set('key', 'value', expire=3600)

Best Practices

Cache Anti-Patterns

Don’t cache too much data:

# Bad: Caching entire database rows
user = cache.get(f'user:{user_id}')  # Contains password hash!

# Good: Cache only what's needed
user_summary = cache.get(f'user:{user_id}:summary')

Don’t use indefinite TTLs:

# Bad: No expiration
redis.set('config', json.dumps(config))

# Good: Reasonable TTL with refresh
redis.setex('config', 3600, json.dumps(config))

Handle cache misses gracefully:

# Bad: Cache stampede - when a hot key expires, every concurrent
# request misses at once and all of them query the database
user = get_user(123)  # 100 concurrent callers -> 100 identical DB queries

# Good: Request coalescing with a distributed lock
lock = redis.lock('user:123:lock', timeout=5)
if lock.acquire(blocking=False):
    try:
        user = get_user(123)  # Only this request hits the database
    finally:
        lock.release()
else:
    # Another request is populating the cache - wait briefly, then read it
    time.sleep(0.1)
    user = get_user(123)

Conclusion

Effective caching requires strategy, not just technology:

  • Use CDN for all static content
  • Implement cache-aside for read-heavy workloads
  • Use write-through for critical data
  • Implement cache invalidation on data changes
  • Monitor hit rates and adjust TTLs
  • Plan for cache failures gracefully

Start with CDN, add Redis for session and query caching, then layer in application caches as needed.
