Skip to main content

Caching Strategies: Redis, CDN, and Application Caching

Created: February 27, 2026 Larry Qu 7 min read

Caching is the most impactful performance optimization available. A well-designed caching strategy can reduce latency by 10-100x and reduce load on origin systems by 90% or more. This guide covers caching at every layer - from CDN to application to database.

The Caching Pyramid

┌─────────────────────────────────────┐
│        CDN / Edge Cache             │  ← Milliseconds, global
├─────────────────────────────────────┤
│        Application Cache            │  ← Milliseconds
├─────────────────────────────────────┤
│        Database Query Cache         │  ← Milliseconds
├─────────────────────────────────────┤
│        Primary Database             │  ← Milliseconds to seconds
└─────────────────────────────────────┘

Each layer caches different data with different characteristics.

Redis Deep Dive

Redis Data Structures

import redis
import json

# Client for a Redis server on localhost:6379, logical database 0.
r = redis.Redis(host='localhost', port=6379, db=0)

# Strings - Simple values
r.set('user:123:name', 'John Doe')
# example.com is reserved for documentation, so this placeholder can never
# collide with a real mailbox.
r.set('user:123:email', 'john.doe@example.com')
# SETEX writes the value and its TTL (3600 seconds) in one call.
r.setex('session:abc123', 3600, json.dumps({'user_id': 123}))

# Hashes - Objects (one key, several named fields)
r.hset('user:123', mapping={
    'name': 'John Doe',
    'email': 'john.doe@example.com',
    'created_at': '2024-01-15'
})
user = r.hgetall('user:123')

# Lists - Ordered collections
# LPUSH puts the newest search at the head of the list ...
r.lpush('recent_searches:user123', 'shoes')
# ... and LTRIM keeps indexes 0..9, i.e. only the 10 newest entries.
r.ltrim('recent_searches:user123', 0, 9)  # Keep only 10

# Sets - Unique collections (duplicates are ignored on insert)
r.sadd('product:views:2024-01-15', 'product1', 'product2', 'product3')
# SCARD returns the number of distinct members stored in the set.
unique_viewers = r.scard('product:views:2024-01-15')

# Sorted Sets - Leaderboards (members ordered by their numeric score)
r.zadd('leaderboard:score', {'player1': 100, 'player2': 95, 'player3': 90})
# Highest score first; ranks 0..9 are the top ten, returned with their scores.
top_players = r.zrevrange('leaderboard:score', 0, 9, withscores=True)

Caching Patterns

Cache-Aside (Lazy Loading):

# Standard cache-aside pattern
def get_user(user_id):
    """Return the user for *user_id*, consulting the cache before the database.

    A truthy cached entry is JSON-decoded and returned directly. Otherwise
    the user is queried from the database and, when the query result is
    truthy, stored in the cache as JSON with a 3600-second TTL. A falsy
    query result is returned unchanged and is not cached.
    """
    cache_key = f'user:{user_id}'

    hit = redis.get(cache_key)
    if hit:
        return json.loads(hit)

    # Nothing usable in the cache: fall back to the database.
    row = db.query('SELECT * FROM users WHERE id = ?', user_id)
    if not row:
        return row

    redis.setex(cache_key, 3600, json.dumps(row))
    return row

def update_user(user_id, data):
    """Persist *data* for the user, then drop the user's cache entry.

    The database is written first; the cache key is deleted afterwards so
    the next read repopulates it from the updated row.
    """
    db.execute('UPDATE users SET ... WHERE id = ?', user_id, data)

    stale_key = f'user:{user_id}'
    redis.delete(stale_key)

Write-Through:

# Write-through - cache updated on every write
def create_order(order_data):
    """Insert an order and cache it right away (write-through).

    The cached JSON document is *order_data* plus an ``'id'`` field holding
    the value returned by the insert; it is stored for 3600 seconds.

    Returns the id returned by the database insert.
    """
    order_id = db.insert('orders', order_data)

    # Cache the same record the database now holds, including its new id.
    record = {**order_data, 'id': order_id}
    redis.setex(f'order:{order_id}', 3600, json.dumps(record))

    return order_id

def update_order(order_id, data):
    """Apply *data* to an order in the database and refresh its cache entry.

    The cache is only touched when an entry for the order already exists:
    the cached JSON is decoded, merged with *data*, and written back with a
    fresh 3600-second TTL. A missing entry is left missing.
    """
    db.update('orders', data, 'id = ?', order_id)

    key = f'order:{order_id}'
    raw = redis.get(key)
    if not raw:
        return

    merged = json.loads(raw)
    merged.update(data)
    redis.setex(key, 3600, json.dumps(merged))

Write-Behind:

# Write-behind - async database writes
import asyncio
from collections import deque

# Pending database writes in arrival order; entries are appended on the right.
write_queue = deque()


async def update_user_async(user_id, data):
    """Write *data* to the cache now and queue the database write for later.

    The cache entry is stored as JSON with a 3600-second TTL. The queued
    item records the target table (``'users'``), the user id and the data.
    """
    redis.setex(f'user:{user_id}', 3600, json.dumps(data))

    pending = {'table': 'users', 'id': user_id, 'data': data}
    write_queue.append(pending)

async def flush_writes():
    """Drain ``write_queue`` into the database forever, once per second.

    Each pass removes up to 100 queued writes and hands them to
    ``db.batch_update`` as one batch.

    Raises:
        Exception: whatever ``db.batch_update`` raises. Before re-raising,
            the failed batch is put back at the front of the queue in its
            original order, so the writes are not lost and can be retried
            when the flusher is restarted.
    """
    while True:
        if write_queue:
            batch = []
            while write_queue and len(batch) < 100:
                batch.append(write_queue.popleft())

            try:
                # Bulk write to database
                db.batch_update('users', batch)
            except Exception:
                # The items were already popped; without this they would be
                # dropped. extendleft() reverses its input, hence reversed().
                write_queue.extendleft(reversed(batch))
                raise

        await asyncio.sleep(1)

Redis Cluster Patterns

# Redis Cluster connection
from redis.cluster import ClusterNode, RedisCluster

# redis-py's RedisCluster expects ClusterNode objects as startup nodes; plain
# host/port dicts are the format of the older redis-py-cluster package.
nodes = [
    ClusterNode('redis-1', 6379),
    ClusterNode('redis-2', 6379),
    ClusterNode('redis-3', 6379),
]

# The startup nodes are only used to discover the cluster topology.
# decode_responses=True makes replies come back as str instead of bytes.
rc = RedisCluster(startup_nodes=nodes, decode_responses=True)

# Automatic key distribution
rc.set('user:1:name', 'Alice')
rc.set('user:2:name', 'Bob')
# Keys automatically sharded across nodes

Redis Pub/Sub for Cache Invalidation

# Publisher - notify other services of changes
def invalidate_user_cache(user_id):
    """Delete a user's cache entry and broadcast the invalidation.

    The message published on the ``cache:invalidate`` channel is a JSON
    object with the exact key (``'key'``) and a wildcard covering the
    user's sub-keys (``'pattern'``).
    """
    user_key = f'user:{user_id}'
    redis.delete(user_key)

    # Tell every subscriber which entries to drop.
    notice = json.dumps({
        'key': user_key,
        'pattern': f'{user_key}:*',
    })
    redis.publish('cache:invalidate', notice)
# Subscriber - listen for invalidations
def listen_for_invalidation():
    """Block on the ``cache:invalidate`` channel and apply each invalidation.

    Every message is a JSON object that may carry ``'key'`` (an exact key
    to delete) and/or ``'pattern'`` (a glob of keys to delete). Both are
    honoured when present: a wildcard pattern such as ``user:1:*`` does not
    match the bare key ``user:1``, so the exact key has to be deleted as
    well rather than instead.

    This function does not return; it loops for as long as the subscription
    yields messages.
    """
    pubsub = redis.pubsub()
    pubsub.subscribe('cache:invalidate')

    for message in pubsub.listen():
        # Skip subscribe/unsubscribe confirmations.
        if message['type'] != 'message':
            continue

        data = json.loads(message['data'])

        key = data.get('key')
        if key:
            redis.delete(key)

        pattern = data.get('pattern')
        if pattern and '*' in pattern:
            # SCAN walks the keyspace incrementally; KEYS would block the
            # server while it scans every key in one go.
            for matched in redis.scan_iter(match=pattern):
                redis.delete(matched)

Application-Level Caching

In-Memory Caching

from functools import lru_cache
from threading import Lock
import time

# Simple in-memory cache with TTL
class Cache:
    """In-process key/value cache with one TTL shared by all entries.

    Every operation runs under a single lock. Expired entries are removed
    lazily, when ``get`` finds them.
    """

    def __init__(self, ttl=300):
        """Create an empty cache whose entries live for *ttl* seconds."""
        self._cache = {}
        self._timestamps = {}
        self._ttl = ttl
        self._lock = Lock()

    def get(self, key):
        """Return the value stored under *key*, or None if absent or expired."""
        with self._lock:
            if key not in self._cache:
                return None

            age = time.time() - self._timestamps[key]
            if age < self._ttl:
                return self._cache[key]

            # Too old: forget the entry and report a miss.
            del self._cache[key]
            del self._timestamps[key]
            return None

    def set(self, key, value):
        """Store *value* under *key* and restart its TTL."""
        stored_at = time.time()
        with self._lock:
            self._cache[key] = value
            self._timestamps[key] = stored_at

    def delete(self, key):
        """Remove *key* if present; unknown keys are ignored."""
        with self._lock:
            for table in (self._cache, self._timestamps):
                table.pop(key, None)

    def clear(self):
        """Remove every entry."""
        with self._lock:
            for table in (self._cache, self._timestamps):
                table.clear()

# Usage with LRU cache
@lru_cache(maxsize=1000)
def get_product_category(product_id):
    """Return the category query result for *product_id*.

    Results are memoized per ``product_id`` by ``lru_cache``, which keeps at
    most 1000 entries and has no time-based expiry.
    """
    category_sql = 'SELECT category FROM products WHERE id = ?'
    return db.query(category_sql, product_id)

# Manual cache management
from cachetools import TTLCache, cached

# Holds up to 10000 products; each entry expires 300 seconds after it is stored.
product_cache = TTLCache(maxsize=10000, ttl=300)


@cached(cache=product_cache)
def get_product(product_id):
    """Return the product row for *product_id*, memoized in ``product_cache``."""
    product_sql = 'SELECT * FROM products WHERE id = ?'
    return db.query(product_sql, product_id)

Distributed Caching Patterns

# Multi-layer cache with local + Redis
class MultiLayerCache:
    """Two-level cache: a per-process dict in front of Redis.

    Values are stored in Redis as JSON. The local layer always holds the
    decoded Python value, so ``get`` returns the same kind of object
    whichever layer answers.
    """

    def __init__(self, local_ttl=60, remote_ttl=3600):
        """Create the cache.

        Args:
            local_ttl: seconds an entry may be served from the local dict.
            remote_ttl: TTL in seconds given to entries written to Redis.
        """
        self.local = {}  # key -> (decoded value, time stored)
        self.redis = redis.Redis()
        self.local_ttl = local_ttl
        self.remote_ttl = remote_ttl

    def get(self, key):
        """Return the decoded value for *key*, or None when neither layer has it."""
        # Check local cache first
        entry = self.local.get(key)
        if entry is not None:
            value, stored_at = entry
            if time.time() - stored_at < self.local_ttl:
                return value
            # Expired locally; fall through to Redis.
            del self.local[key]

        # Check Redis
        raw = self.redis.get(key)
        if raw:
            # Decode before populating the local layer: caching the raw JSON
            # would make later local hits return an undecoded string.
            value = json.loads(raw)
            self.local[key] = (value, time.time())
            return value

        return None

    def set(self, key, value):
        """Write *value* to Redis (as JSON, with ``remote_ttl``) and to the local layer."""
        self.redis.setex(key, self.remote_ttl, json.dumps(value))
        self.local[key] = (value, time.time())

CDN Caching

Cache-Control Headers

# Dynamic response with proper cache headers
def get_product(request, product_id):
    """Return the product as JSON with cache headers suited to the caller.

    Responses to requests that carry a user are marked ``private`` so that
    shared caches (CDNs, proxies) must not store them. Leaving
    Cache-Control off is not enough: a shared cache may still store a
    response that has no explicit directive. Other responses are publicly
    cacheable for 300 seconds.
    """
    product = db.get_product(product_id)
    response = jsonify(product)

    # Per-user responses must never be stored by a shared cache
    if request.user:
        response.headers['Cache-Control'] = 'private'
        return response

    # Public, cacheable content
    response.headers['Cache-Control'] = 'public, max-age=300'
    # Keep compressed and uncompressed variants apart in caches.
    response.headers['Vary'] = 'Accept-Encoding'
    return response

def get_static_assets(request, filename):
    """Serve a file from ``static/`` with a one-year, immutable cache policy.

    The long lifetime is only safe when asset URLs are versioned, so that a
    changed file is published under a new URL.

    Raises:
        FileNotFoundError: if *filename* is absolute or would resolve
            outside ``static/`` (for example ``../secret``). The name comes
            from the request, so it must not be trusted as a path.
    """
    import posixpath

    # Collapse "a/../.." style segments, treating backslashes as separators
    # too, then refuse anything that leaves the static directory.
    normalized = posixpath.normpath(filename.replace('\\', '/'))
    escapes_root = (
        normalized.startswith('/')
        or normalized == '..'
        or normalized.startswith('../')
    )
    if escapes_root:
        raise FileNotFoundError(filename)

    # Long cache for static assets (with versioned URLs)
    response = send_file(f'static/{filename}')
    response.headers['Cache-Control'] = 'public, max-age=31536000, immutable'
    return response

def get_user_dashboard(request):
    """Return the requesting user's dashboard as JSON, marked as private.

    The Cache-Control value keeps shared caches from storing the response
    and makes the browser revalidate before reusing its copy.
    """
    dashboard = request.user.dashboard
    response = jsonify(dashboard)
    response.headers['Cache-Control'] = 'private, max-age=0, no-cache'
    return response

CDN Cache Invalidation

# CloudFront invalidation via boto3 (the AWS SDK for Python)
import boto3

def invalidate_cdn(paths, distribution_id='E1234567890ABC'):
    """Create a CloudFront invalidation for *paths* and return its id.

    Args:
        paths: iterable of path patterns to invalidate, e.g.
            ``'/static/images/*'``.
        distribution_id: CloudFront distribution to invalidate. Defaults to
            the example distribution used throughout this file.

    Returns:
        The id of the created invalidation.
    """
    import uuid

    # Materialize once so Quantity and Items always agree, even for generators.
    items = list(paths)

    cloudfront = boto3.client('cloudfront')

    response = cloudfront.create_invalidation(
        DistributionId=distribution_id,
        InvalidationBatch={
            # CallerReference must be unique per request. A timestamp in
            # whole seconds collides when two invalidations are issued
            # within the same second; a random UUID does not.
            'CallerReference': f'invalidation-{uuid.uuid4()}',
            'Paths': {
                'Quantity': len(items),
                'Items': items
            }
        }
    )

    return response['Invalidation']['Id']

# Usage
invalidate_cdn([
    '/api/products/*',
    '/static/images/*'
])
// Cloudflare cache purge via API
/**
 * Ask Cloudflare to purge cached content for a zone.
 *
 * Sends one POST to the zone's purge_cache endpoint and resolves with the
 * parsed JSON body of the reply.
 *
 * NOTE(review): `:zone_id` in the URL is a placeholder and has to be replaced
 * with a real zone id. Also confirm against the Cloudflare API reference
 * whether `files` may be combined with `tags`/`prefixes` in one request, and
 * the expected format of prefix entries.
 *
 * @param {string[]} urls - Absolute URLs of the files to purge.
 * @returns {Promise<object>} Parsed JSON response from the API.
 */
async function purgeCache(urls) {
  const response = await fetch(
    'https://api.cloudflare.com/client/v4/zones/:zone_id/purge_cache',
    {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${API_TOKEN}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        files: urls,
        tags: ['product-update'],
        // The API field is spelled "prefixes"; a misspelled key does not
        // act as a prefix purge.
        prefixes: ['/api/']
      })
    }
  );

  return response.json();
}

Cache Invalidation Strategies

Time-Based Invalidation

# Fixed TTL
# Time-to-live per kind of cached data, in seconds.
CACHE_TTL = {
    'user_profile': 60 * 60,       # 1 hour
    'product_list': 5 * 60,        # 5 minutes
    'config': 24 * 60 * 60,        # 24 hours
    'analytics': 60,               # 1 minute
}


def get_cached(key, ttl_key, fetcher):
    """Return the cached value for *key*, computing and caching it on a miss.

    Args:
        key: cache key to read and write.
        ttl_key: entry of ``CACHE_TTL`` that supplies the TTL for a new value.
        fetcher: zero-argument callable that produces the value on a miss.

    Returns:
        The JSON-decoded cached value, or the fresh result of ``fetcher()``.
    """
    hit = redis.get(key)
    if hit:
        return json.loads(hit)

    fresh = fetcher()
    lifetime = CACHE_TTL[ttl_key]
    redis.setex(key, lifetime, json.dumps(fresh))
    return fresh

Event-Based Invalidation

# Invalidate on data changes
def on_product_updated(product_id):
    """Invalidate every cache entry that may contain the updated product.

    Deletes the product's own entry and all product list/category entries,
    then publishes a JSON notification on the ``cache:invalidate`` channel.
    """
    # Invalidate specific cache
    redis.delete(f'product:{product_id}')

    # Invalidate list caches
    for pattern in ('products:list:*', 'products:category:*'):
        # SCAN iterates the keyspace incrementally; KEYS would block the
        # server while it walks every key at once.
        for key in redis.scan_iter(match=pattern):
            redis.delete(key)

    # Notify other services. A pub/sub payload has to be a string, bytes or
    # number, so the dict is serialized to JSON before publishing.
    redis.publish('cache:invalidate', json.dumps({
        'type': 'product',
        'id': product_id
    }))

Stale-While-Revalidate

# Serve stale content while refreshing in background
def _load_product_into_cache(product_id):
    """Fetch the product from the database, cache it for 3600 seconds, return it."""
    product = db.get_product(product_id)

    # Update cache
    redis.setex(f'product:{product_id}', 3600, json.dumps(product))

    return product


def get_product_with_revalidation(product_id):
    """Return a product, serving a nearly expired copy while it is refreshed.

    On a cache hit the cached product is returned immediately; if fewer than
    60 seconds of TTL remain, a background thread reloads it so this call
    never waits on the database. On a cache miss the product is loaded
    synchronously and the product itself is returned (not a coroutine).

    NOTE(review): the background refresh uses ``db`` and ``redis`` from
    another thread; confirm both clients are safe to share across threads.
    """
    import threading

    cache_key = f'product:{product_id}'

    # Try to get from cache
    cached = redis.get(cache_key)
    if not cached:
        # Cache miss - fetch synchronously
        return _load_product_into_cache(product_id)

    product = json.loads(cached)

    # TTL is negative when the key has no expiry or has just disappeared;
    # both cases count as stale here and trigger a refresh as well.
    is_stale = redis.ttl(cache_key) < 60
    if is_stale:
        # A thread works from a plain synchronous caller; scheduling an
        # asyncio task would require an already running event loop.
        threading.Thread(
            target=_load_product_into_cache,
            args=(product_id,),
            daemon=True,
        ).start()

    return product


async def refresh_product_cache(product_id):
    """Reload the product into the cache and return it.

    Kept as a coroutine for existing callers that await it.
    """
    return _load_product_into_cache(product_id)

Cache Monitoring

Key Metrics

# Prometheus alerting rules for Redis metrics.
# This rule group belongs under the top-level `groups:` key of a rules file.
- name: redis
  rules:
    # Hit ratio over the last 5 minutes. The raw *_total counters only ever
    # grow, so dividing them directly gives the average since Redis started
    # and hides a recent drop; rate() looks at current traffic instead.
    - alert: CacheHitRateLow
      expr: |
        (rate(redis_keyspace_hits_total[5m]) /
        (rate(redis_keyspace_hits_total[5m]) + rate(redis_keyspace_misses_total[5m]))) < 0.8
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Cache hit rate below 80%"

    # Only evaluated when a memory limit is configured: with no limit the
    # max-bytes metric is 0 and the division would yield +Inf, firing forever.
    - alert: RedisMemoryHigh
      expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.9 and redis_memory_max_bytes > 0
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Redis memory usage above 90%"

    # More than 10 keys evicted per second, averaged over 5 minutes.
    - alert: RedisEvictionsHigh
      expr: rate(redis_evicted_keys_total[5m]) > 10
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "High number of key evictions"

Cache Analytics

# Cache hit/miss tracking
class CacheMetrics:
    """Running counters for cache hits, misses and errors."""

    def __init__(self):
        """Start every counter at zero."""
        self.hits = self.misses = self.errors = 0

    def record_hit(self):
        """Count one cache hit."""
        self.hits = self.hits + 1

    def record_miss(self):
        """Count one cache miss."""
        self.misses = self.misses + 1

    @property
    def hit_rate(self):
        """Fraction of lookups that were hits; 0 before any lookup is recorded."""
        lookups = self.hits + self.misses
        if lookups > 0:
            return self.hits / lookups
        return 0

# Process-wide counters updated by get_cached().
metrics = CacheMetrics()


def get_cached(key):
    """Return the JSON-decoded cached value for *key*, recording the outcome.

    A truthy cached entry counts as a hit, anything else as a miss. Any
    exception raised while reading or decoding increments the error counter
    and is reported to the caller as a miss (None).
    """
    try:
        raw = redis.get(key)
        if not raw:
            metrics.record_miss()
            return None
        metrics.record_hit()
        return json.loads(raw)
    except Exception:
        # Cache trouble must not break the caller; degrade to a miss.
        metrics.errors += 1
        return None

Memcached vs Redis

Feature Redis Memcached
Data structures Strings, Hashes, Lists, Sets, Sorted Sets Strings only
Persistence RDB + AOF No
Clustering Redis Cluster Client-side sharding (no built-in clustering)
Pub/Sub Yes No
Lua scripts Yes No
Memory efficiency Moderate Higher
Performance Very fast Extremely fast
Use case Rich data, persistence Simple caching
# Memcached is simpler - no complex data types
import pymemcache

# Client for a memcached server at localhost:11211.
mc = pymemcache.Client(('localhost', 11211))

# Simple string caching only
mc.set('key', 'value')
value = mc.get('key')

# No TTL is applied by default; pass expire= (seconds) to set one
mc.set('key', 'value', expire=3600)

Best Practices

Cache Anti-Patterns

Don’t cache too much data:

# Bad: Caching entire database rows - every column of the row lands in the
# cache, including ones that should never leave the database
user = cache.get(f'user:{user_id}')  # Contains password hash!

# Good: Cache only what's needed - a separate key holding just the summary
user_summary = cache.get(f'user:{user_id}:summary')

Don’t use indefinite TTLs:

# Bad: No expiration - SET without a TTL keeps this value until it is
# overwritten or deleted
redis.set('config', json.dumps(config))

# Good: Reasonable TTL with refresh - SETEX expires the entry after 3600
# seconds, so it has to be written again to stay cached
redis.setex('config', 3600, json.dumps(config))

Handle cache misses gracefully:

# Bad: Cache stampede
# The loop stands in for 100 requests arriving at the same moment for a key
# that is not cached yet; written as a plain loop, the calls run one after
# another.
for i in range(100):  # 100 simultaneous requests
    user = get_user(123)  # All hit DB simultaneously

# Good: Request coalescing
# Only the request that wins the lock loads the value; the others wait
# briefly and then read what the winner cached. The lock expires after 5
# seconds so a crashed holder cannot block everyone forever.
lock = redis.lock('user:123:lock', timeout=5)
# blocking=False returns immediately when the lock is taken. The default
# blocks until the lock is free, so the else branch would never run.
if lock.acquire(blocking=False):
    try:
        user = get_user(123)
    finally:
        lock.release()
else:
    # Wait for other request to populate cache
    time.sleep(0.1)
    user = get_user(123)

Conclusion

Effective caching requires strategy, not just technology:

  • Use CDN for all static content
  • Implement cache-aside for read-heavy workloads
  • Use write-through for critical data
  • Implement cache invalidation on data changes
  • Monitor hit rates and adjust TTLs
  • Plan for cache failures gracefully

Start with CDN, add Redis for session and query caching, then layer in application caches as needed.

External Resources

Resources

Comments

Share this article

Scan to read on mobile

👍 Was this article helpful?