Introduction
Caching is one of the most effective techniques for improving application performance. By storing frequently accessed data in fast storage, caching reduces latency, decreases database load, and improves overall system throughput. However, caching introduces complexity around data freshness, cache invalidation, and consistency.
This guide explores caching strategies at multiple levels: application-level caching with Redis, content delivery networks (CDN), HTTP caching headers, and cache invalidation patterns. Understanding when and how to apply each strategy is essential for building high-performance applications.
Redis Caching
Redis Fundamentals
Redis is an in-memory data store that serves as a versatile caching solution. It supports various data structures including strings, hashes, lists, sets, and sorted sets. Redis provides sub-millisecond latency, making it ideal for frequently accessed data.
import redis
import json
from typing import Optional, Any
from dataclasses import dataclass
import time
@dataclass
class CacheConfig:
    """Connection settings for RedisCache.

    Defaults target a local, unauthenticated Redis instance.
    """
    host: str = "localhost"
    port: int = 6379
    db: int = 0
    # None means no AUTH command is sent.
    password: Optional[str] = None
    # Upper bound on connections held by the shared pool.
    max_connections: int = 50
    # Seconds to wait on a blocking socket operation.
    socket_timeout: float = 5.0
    # Seconds to wait when establishing a new connection.
    socket_connect_timeout: float = 5.0
class RedisCache:
    """Redis cache with connection pooling and JSON value serialization.

    All operations fail open: on a Redis error each method returns a
    neutral value (None / False / 0) instead of raising, so a cache
    outage degrades to cache misses rather than application errors.
    """

    def __init__(self, config: "CacheConfig" = None):
        self.config = config or CacheConfig()
        self.pool = redis.ConnectionPool(
            host=self.config.host,
            port=self.config.port,
            db=self.config.db,
            password=self.config.password,
            max_connections=self.config.max_connections,
            socket_timeout=self.config.socket_timeout,
            socket_connect_timeout=self.config.socket_connect_timeout,
            decode_responses=True
        )
        self.client = redis.Redis(connection_pool=self.pool)

    def get(self, key: str) -> Optional[Any]:
        """Return the JSON-decoded value for *key*, or None on miss/error."""
        try:
            value = self.client.get(key)
            # ValueError also covers json.JSONDecodeError for corrupt entries.
            return json.loads(value) if value is not None else None
        except (redis.RedisError, ValueError):
            return None

    def set(
        self,
        key: str,
        value: Any,
        ttl: int = None,
        nx: bool = False,
        xx: bool = False
    ) -> bool:
        """Store *value* (JSON-serialized) with optional TTL in seconds.

        nx: only set if the key does not exist; xx: only set if it does.
        Returns True on success. (Bug fix: redis-py returns None when an
        nx/xx condition fails, so coerce to bool to honor the signature.)
        """
        try:
            serialized = json.dumps(value)
            return bool(self.client.set(key, serialized, ex=ttl, nx=nx, xx=xx))
        except (redis.RedisError, TypeError):
            # TypeError: value is not JSON-serializable.
            return False

    def delete(self, key: str) -> bool:
        """Delete *key*; True if it existed."""
        try:
            return self.client.delete(key) > 0
        except redis.RedisError:
            return False

    def delete_pattern(self, pattern: str) -> int:
        """Delete all keys matching the glob *pattern*; returns count deleted.

        Bug fix: uses SCAN instead of KEYS — KEYS blocks the Redis server
        while it walks the whole keyspace, which is unsafe in production.
        """
        try:
            keys = list(self.client.scan_iter(match=pattern, count=500))
            if keys:
                return self.client.delete(*keys)
            return 0
        except redis.RedisError:
            return 0

    def exists(self, key: str) -> bool:
        """True if *key* exists."""
        try:
            return self.client.exists(key) > 0
        except redis.RedisError:
            return False

    def increment(self, key: str, amount: int = 1) -> int:
        """Atomically increment *key* by *amount*; returns 0 on error."""
        try:
            return self.client.incrby(key, amount)
        except redis.RedisError:
            return 0

    def get_or_set(self, key: str, fetch_func, ttl: int = 300) -> Optional[Any]:
        """Return the cached value, fetching and caching it on a miss.

        A short-lived Redis lock prevents a thundering herd: only one
        caller runs fetch_func; others poll briefly for the cached result
        and finally fetch directly if the lock holder is too slow.
        """
        # Fast path: already cached.
        value = self.get(key)
        if value is not None:
            return value
        lock_key = f"lock:{key}"
        try:
            acquired = self.client.set(lock_key, "1", nx=True, ex=10)
        except redis.RedisError:
            acquired = None
        if acquired:
            try:
                # Double check: another caller may have filled the cache
                # between our miss and acquiring the lock.
                value = self.get(key)
                if value is not None:
                    return value
                value = fetch_func()
                self.set(key, value, ttl)
                return value
            finally:
                # self.delete swallows RedisError, so the finally is safe.
                self.delete(lock_key)
        # Another process holds the lock; poll briefly for its result.
        for _ in range(5):
            time.sleep(0.1)
            value = self.get(key)
            if value is not None:
                return value
        # Lock holder was too slow (or crashed): fetch directly rather than
        # returning a spurious None. (Bug fix: the original gave up after a
        # single 0.1s wait and could return None for existing data.)
        return fetch_func()
Caching Patterns
class UserCache:
    """Redis-backed cache for per-user records and session lists."""

    def __init__(self, cache: RedisCache):
        self.cache = cache
        self.key_prefix = "user:"
        self.default_ttl = 3600  # seconds (1 hour)

    def _key(self, user_id: str) -> str:
        """Cache key for a user's primary record."""
        return f"{self.key_prefix}{user_id}"

    def _sessions_key(self, user_id: str) -> str:
        """Cache key for a user's session list."""
        return f"{self.key_prefix}{user_id}:sessions"

    def get_user(self, user_id: str) -> Optional[dict]:
        """Return the cached user record, or None on a miss."""
        return self.cache.get(self._key(user_id))

    def set_user(self, user_id: str, user_data: dict, ttl: int = None) -> None:
        """Cache *user_data*; falls back to the one-hour default TTL."""
        effective_ttl = ttl or self.default_ttl
        self.cache.set(self._key(user_id), user_data, effective_ttl)

    def invalidate_user(self, user_id: str) -> None:
        """Drop the cached user record."""
        self.cache.delete(self._key(user_id))

    def get_user_sessions(self, user_id: str) -> list:
        """Return the user's active session ids (empty list when none)."""
        sessions = self.cache.client.lrange(self._sessions_key(user_id), 0, -1)
        return sessions or []

    def add_user_session(self, user_id: str, session_id: str) -> None:
        """Append a session id and refresh the list's 24-hour expiry."""
        sessions_key = self._sessions_key(user_id)
        self.cache.client.rpush(sessions_key, session_id)
        self.cache.client.expire(sessions_key, 86400)  # 24 hours

    def invalidate_all_user_data(self, user_id: str) -> None:
        """Drop every cache entry prefixed with this user's id."""
        self.cache.delete_pattern(f"{self.key_prefix}{user_id}*")
class RateLimiter:
    """Fixed-window rate limiter backed by Redis.

    Each identifier gets a counter that expires window_seconds after its
    FIRST request in the window. (Bug fix: the original re-applied EXPIRE
    on every request, so steady traffic kept pushing the window forward
    and the counter never reset — a client could be locked out forever.)
    """

    def __init__(self, cache: "RedisCache"):
        self.cache = cache

    def is_allowed(
        self,
        identifier: str,
        max_requests: int,
        window_seconds: int
    ) -> tuple[bool, int]:
        """
        Check if a request from *identifier* is allowed.

        Returns (is_allowed, remaining_requests).
        """
        key = f"ratelimit:{identifier}"
        current = self.cache.client.incr(key)
        if current == 1:
            # First hit in this window: start the expiry clock once.
            # (Tiny race: a crash between INCR and EXPIRE leaves a
            # persistent key; acceptable for this simplified limiter.)
            self.cache.client.expire(key, window_seconds)
        remaining = max(0, max_requests - current)
        return current <= max_requests, remaining
HTTP Caching
Cache-Control Headers
HTTP provides a rich set of caching mechanisms through headers. Understanding these headers is essential for building efficient web applications.
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import hashlib
@dataclass
class CachePolicy:
    """Cache policy configuration mapped onto HTTP caching headers.

    Fields left at None/False are omitted from the generated headers.
    """
    max_age: Optional[int] = None  # Cache-Control max-age, seconds
    s_maxage: Optional[int] = None  # shared (proxy/CDN) cache max age, seconds
    no_cache: bool = False  # require revalidation before reuse
    no_store: bool = False  # forbid storing the response at all
    private: bool = False  # restrict caching to the end client
    must_revalidate: bool = False  # stale copies must be revalidated
    stale_while_revalidate: Optional[int] = None  # serve-stale window, seconds
    etag: Optional[str] = None  # explicit validator; emitted as ETag
    last_modified: Optional[datetime] = None  # emitted as Last-Modified
class HTTPCache:
    """HTTP caching utilities: Cache-Control construction and ETag handling."""

    @staticmethod
    def build_cache_control(policy: "CachePolicy") -> str:
        """Build a Cache-Control header value from *policy*.

        Only directives explicitly enabled on the policy are emitted,
        joined in a stable order.
        """
        directives = []
        if policy.max_age is not None:
            directives.append(f"max-age={policy.max_age}")
        if policy.s_maxage is not None:
            directives.append(f"s-maxage={policy.s_maxage}")
        if policy.no_cache:
            directives.append("no-cache")
        if policy.no_store:
            directives.append("no-store")
        if policy.private:
            directives.append("private")
        if policy.must_revalidate:
            directives.append("must-revalidate")
        if policy.stale_while_revalidate is not None:
            directives.append(
                f"stale-while-revalidate={policy.stale_while_revalidate}"
            )
        return ", ".join(directives)

    @staticmethod
    def generate_etag(content: str) -> str:
        """Generate a quoted, strong ETag from *content*.

        MD5 is used as a fast checksum, not for security — fine for
        cache validation.
        """
        return f'"{hashlib.md5(content.encode()).hexdigest()}"'

    @staticmethod
    def parse_etag(header: str) -> str:
        """Normalize an ETag: drop whitespace, a weak 'W/' prefix, and quotes."""
        value = header.strip()
        if value.startswith("W/"):
            value = value[2:]
        return value.strip('"')

    @staticmethod
    def check_etag_match(etag: str, if_none_match: str) -> bool:
        """Check whether *etag* matches an If-None-Match header value.

        Bug fix: the original compared the quoted ETag against the
        UNquoted parsed header, so ETags from generate_etag never
        matched and 304 responses were never produced. Both sides are
        now normalized; '*' and comma-separated candidate lists are
        honored per RFC 7232 (weak comparison).
        """
        if if_none_match.strip() == "*":
            return True
        target = HTTPCache.parse_etag(etag)
        candidates = [
            HTTPCache.parse_etag(candidate)
            for candidate in if_none_match.split(",")
        ]
        return target in candidates
class CacheableResponse:
    """Wrapper for cacheable HTTP responses.

    Lazily computes a strong ETag over the raw body bytes and renders
    caching-related response headers, honoring If-None-Match.
    """

    def __init__(
        self,
        content: bytes,
        content_type: str = "application/json",
        policy: "CachePolicy" = None
    ):
        self.content = content
        self.content_type = content_type
        self.policy = policy or CachePolicy()
        self._etag = None  # computed on first access

    @property
    def etag(self) -> str:
        """Quoted ETag over the raw body bytes.

        Bug fix: the original decoded the body as UTF-8 before hashing,
        which raised UnicodeDecodeError on binary content. Hashing the
        bytes directly produces the same digest for valid UTF-8 text and
        is safe for everything else.
        """
        if self._etag is None:
            self._etag = f'"{hashlib.md5(self.content).hexdigest()}"'
        return self._etag

    def to_headers(self, request_headers: dict = None) -> dict:
        """Build response headers, adding a 304 marker for a matching ETag.

        A pseudo-header "Status" of "304 Not Modified" is set when the
        request's If-None-Match matches this response's ETag.
        """
        headers = {
            "Content-Type": self.content_type,
            "Cache-Control": HTTPCache.build_cache_control(self.policy),
        }
        if self.policy.etag or self.policy.max_age:
            headers["ETag"] = self.etag
        if self.policy.last_modified:
            headers["Last-Modified"] = self.policy.last_modified.strftime(
                "%a, %d %b %Y %H:%M:%S GMT"
            )
        # Check conditional request.
        if request_headers:
            if_none_match = request_headers.get("If-None-Match")
            if if_none_match:
                # Normalize both sides (whitespace, weak 'W/' prefix, quotes)
                # before comparing. Bug fix: the original compared a quoted
                # ETag with an unquoted one, so the 304 branch never fired.
                sent = if_none_match.strip().removeprefix("W/").strip('"')
                if if_none_match.strip() == "*" or sent == self.etag.strip('"'):
                    headers["Status"] = "304 Not Modified"
        return headers
Conditional Requests
class ConditionalRequestHandler:
    """Handle conditional HTTP requests (ETag, If-Modified-Since)."""

    @staticmethod
    def _normalize_etag(value: str) -> str:
        """Strip whitespace, a weak 'W/' prefix, and surrounding quotes."""
        value = value.strip()
        if value.startswith("W/"):
            value = value[2:]
        return value.strip('"')

    @staticmethod
    def handle_conditional(
        content: bytes,
        etag: str,
        if_none_match: str = None
    ) -> tuple[bytes, int]:
        """Handle an ETag-based conditional request.

        Returns (body, status): (b"", 304) when the client's If-None-Match
        matches *etag*, else (content, 200).

        Bug fix: both sides are normalized before comparison, so quoted
        and weak ETags match correctly; '*' and comma-separated candidate
        lists are honored per RFC 7232.
        """
        if if_none_match:
            if if_none_match.strip() == "*":
                return b"", 304
            target = ConditionalRequestHandler._normalize_etag(etag)
            candidates = [
                ConditionalRequestHandler._normalize_etag(candidate)
                for candidate in if_none_match.split(",")
            ]
            if target in candidates:
                return b"", 304
        return content, 200

    @staticmethod
    def handle_last_modified(
        content: bytes,
        last_modified: datetime,
        if_modified_since: str = None
    ) -> tuple[bytes, int]:
        """Handle a Last-Modified-based conditional request.

        Returns (b"", 304) when the resource has not changed since the
        client's If-Modified-Since timestamp; malformed dates are ignored
        and the full response is returned. The comparison truncates to
        second granularity to match the HTTP-date format.
        """
        if if_modified_since:
            try:
                ims = datetime.strptime(
                    if_modified_since, "%a, %d %b %Y %H:%M:%S GMT"
                )
            except ValueError:
                return content, 200
            if last_modified.replace(microsecond=0) <= ims:
                return b"", 304
        return content, 200
CDN Caching
CDN Integration
Content Delivery Networks cache content at edge locations worldwide, reducing latency for global users.
class CDNCache:
    """CDN caching utilities (URL building, purge/status stubs)."""

    # Transformation keys forwarded to image query parameters, in
    # emission order.
    _TRANSFORM_KEYS = ("width", "height", "fit", "quality")

    def __init__(self, cdn_provider: str = "cloudflare"):
        self.provider = cdn_provider

    def build_cdn_url(
        self,
        original_url: str,
        transformations: dict = None
    ) -> str:
        """Build a CDN URL with optional image transformations.

        Unknown providers get the URL back untouched.
        """
        if self.provider == "cloudflare":
            return self._cloudflare_url(original_url, transformations)
        elif self.provider == "fastly":
            return self._fastly_url(original_url, transformations)
        return original_url

    def _append_params(self, url: str, transformations: dict = None) -> str:
        """Append recognized transformation params to *url* as a query string.

        Bug fix: uses '&' when the URL already carries a query string
        instead of always appending a second '?'.
        """
        if not transformations:
            return url
        params = [
            f"{key}={transformations[key]}"
            for key in self._TRANSFORM_KEYS
            if key in transformations
        ]
        if not params:
            return url
        separator = "&" if "?" in url else "?"
        return f"{url}{separator}{'&'.join(params)}"

    def _cloudflare_url(
        self,
        url: str,
        transformations: dict = None
    ) -> str:
        """Build a Cloudflare Image Resizing URL."""
        return self._append_params(url, transformations)

    def _fastly_url(
        self,
        url: str,
        transformations: dict = None
    ) -> str:
        """Build a Fastly Image Optimizer URL.

        Bug fix: build_cdn_url dispatched here but this method was never
        defined, so provider="fastly" raised AttributeError. Fastly's
        optimizer accepts the same width/height/fit/quality parameters.
        """
        return self._append_params(url, transformations)

    def purge_cache(self, urls: list) -> dict:
        """Purge CDN cache for URLs (stub; real call is provider-specific)."""
        return {"purged": len(urls), "urls": urls}

    def get_cache_status(self, url: str) -> dict:
        """Check cache status for a URL (stub returning a synthetic HIT)."""
        return {
            "url": url,
            "cached": True,
            "cache_status": "HIT"
        }
Cache Invalidation Patterns
Invalidation Strategies
Cache invalidation is one of the hardest problems in caching. Several patterns help manage this complexity.
from enum import Enum
from typing import List, Callable
class InvalidationStrategy(Enum):
    """How cached entries are invalidated."""
    TIME_BASED = "time_based"  # entries expire after a TTL
    EVENT_BASED = "event_based"  # domain events trigger invalidation
    MANUAL = "manual"  # explicit invalidate() calls only
    HYBRID = "hybrid"  # combination of the above
class CacheInvalidator:
    """Coordinates cache invalidation and fans events out to listeners."""

    def __init__(self, cache: RedisCache):
        self.cache = cache
        self.subscribers: List[Callable] = []

    def subscribe(self, callback: Callable) -> None:
        """Register *callback* to receive (event, data) notifications."""
        self.subscribers.append(callback)

    def invalidate(self, key: str) -> None:
        """Drop a single cache key and announce the invalidation."""
        self.cache.delete(key)
        self._notify_subscribers("invalidate", key)

    def invalidate_pattern(self, pattern: str) -> int:
        """Drop every key matching *pattern*; returns the number deleted."""
        removed = self.cache.delete_pattern(pattern)
        self._notify_subscribers("invalidate_pattern", pattern)
        return removed

    def invalidate_by_entity(self, entity_type: str, entity_id: str) -> None:
        """Drop all cache entries belonging to a single entity."""
        self.invalidate_pattern(f"{entity_type}:{entity_id}*")

    def _notify_subscribers(self, event: str, data: str) -> None:
        """Best-effort fan-out: a failing subscriber never blocks the rest."""
        for subscriber in self.subscribers:
            try:
                subscriber(event, data)
            except Exception:
                # Deliberate: notifications are advisory only.
                pass
class TimeBasedExpiration:
    """Time-based cache expiration with soft (refresh) and hard (evict) TTLs.

    Entries are stored as {"value", "hard_expire", "soft_expire"}
    envelopes. Past the soft deadline the stale value is still served
    while a background refresh runs; past the hard deadline the value is
    refetched synchronously.
    """

    def __init__(self, cache: "RedisCache"):
        self.cache = cache

    def set_with_soft_expire(
        self,
        key: str,
        value: Any,
        hard_ttl: int,
        soft_ttl: int = None
    ) -> None:
        """Cache *value* under *key*.

        hard_ttl: absolute expiration in seconds (also the store's TTL).
        soft_ttl: seconds until a background refresh becomes eligible;
                  defaults to half of hard_ttl.
        """
        soft = soft_ttl or hard_ttl // 2
        now = time.time()
        envelope = {
            "value": value,
            "hard_expire": now + hard_ttl,
            "soft_expire": now + soft,
        }
        self.cache.set(key, envelope, ttl=hard_ttl)

    def get_with_background_refresh(
        self,
        key: str,
        fetch_func: Callable,
        hard_ttl: int,
        soft_ttl: int = None
    ) -> Any:
        """Return the cached value, refreshing it per the soft/hard policy."""
        data = self.cache.get(key)
        if data is None:
            # Miss: fetch synchronously and cache.
            return self._fetch_and_cache(key, fetch_func, hard_ttl, soft_ttl)
        now = time.time()
        if now > data["hard_expire"]:
            # Hard-expired: never serve this stale; refetch synchronously.
            return self._fetch_and_cache(key, fetch_func, hard_ttl, soft_ttl)
        if now > data["soft_expire"]:
            # Soft-expired: serve the stale value, refresh in background.
            self._trigger_background_refresh(key, fetch_func, hard_ttl, soft_ttl)
        return data["value"]

    def _fetch_and_cache(
        self,
        key: str,
        fetch_func: Callable,
        ttl: int,
        soft_ttl: int = None
    ) -> Any:
        """Fetch a fresh value, store it, and return it.

        Consistency fix: delegates to set_with_soft_expire instead of
        duplicating the envelope layout, and threads the caller's soft
        TTL through (the original silently dropped it on refresh).
        """
        value = fetch_func()
        self.set_with_soft_expire(key, value, ttl, soft_ttl)
        return value

    def _trigger_background_refresh(
        self,
        key: str,
        fetch_func: Callable,
        ttl: int,
        soft_ttl: int = None
    ) -> None:
        """Refresh in a daemon thread (simplified).

        In production, use Celery, RQ, or similar instead of raw threads.
        """
        import threading
        worker = threading.Thread(
            target=self._fetch_and_cache,
            args=(key, fetch_func, ttl, soft_ttl),
            daemon=True,  # don't block interpreter shutdown on a refresh
        )
        worker.start()
Event-Driven Invalidation
class EventDrivenCache:
    """Cache whose entries are invalidated by domain events."""

    def __init__(self, cache: RedisCache, event_bus):
        self.cache = cache
        self.event_bus = event_bus
        self._setup_subscriptions()

    def _setup_subscriptions(self):
        """Wire domain events to their invalidation handlers."""
        handlers = {
            "user.updated": self._on_user_updated,
            "product.updated": self._on_product_updated,
            "order.completed": self._on_order_completed,
        }
        for topic, handler in handlers.items():
            self.event_bus.subscribe(topic, handler)

    def _on_user_updated(self, event: dict) -> None:
        """Drop the user's entries and any cached user listings."""
        uid = event["user_id"]
        for pattern in (f"user:{uid}*", f"users:list*"):
            self.cache.delete_pattern(pattern)

    def _on_product_updated(self, event: dict) -> None:
        """Drop the product entry and any cached product listings."""
        pid = event["product_id"]
        for pattern in (f"product:{pid}", f"products:list*"):
            self.cache.delete_pattern(pattern)

    def _on_order_completed(self, event: dict) -> None:
        """Drop the order entry and the user's cached order history."""
        uid = event["user_id"]
        for pattern in (f"order:{event['order_id']}", f"user:{uid}:orders*"):
            self.cache.delete_pattern(pattern)
Conclusion
Effective caching requires strategy at multiple levels: application caching with Redis, HTTP caching with proper headers, and CDN caching for global content distribution. Cache invalidation remains challenging, but patterns like event-driven invalidation and time-based expiration help manage complexity.
Key principles: cache at the appropriate level, use proper cache headers, implement robust invalidation, and monitor cache effectiveness. Caching without strategy can lead to stale data and debugging nightmares.
Resources
- “Caching Architecture Guide” on AWS
- Redis Documentation
- MDN HTTP Caching Guide
- Cloudflare CDN Documentation
Comments