Skip to main content
โšก Calmops

Caching Strategies: Redis, CDN, and HTTP Caching for High-Performance Applications

Introduction

Caching is one of the most effective techniques for improving application performance. By storing frequently accessed data in fast storage, caching reduces latency, decreases database load, and improves overall system throughput. However, caching introduces complexity around data freshness, cache invalidation, and consistency.

This guide explores caching strategies at multiple levels: application-level caching with Redis, content delivery networks (CDN), HTTP caching headers, and cache invalidation patterns. Understanding when and how to apply each strategy is essential for building high-performance applications.

Redis Caching

Redis Fundamentals

Redis is an in-memory data store that serves as a versatile caching solution. It supports various data structures including strings, hashes, lists, sets, and sorted sets. Redis provides sub-millisecond latency, making it ideal for frequently accessed data.

import redis
import json
from typing import Optional, Any
from dataclasses import dataclass
import time

@dataclass
class CacheConfig:
    """Connection settings forwarded to redis.ConnectionPool by RedisCache."""
    host: str = "localhost"  # Redis server hostname
    port: int = 6379  # default Redis port
    db: int = 0  # logical database index
    password: Optional[str] = None  # None = no AUTH
    max_connections: int = 50  # upper bound for the connection pool
    socket_timeout: float = 5.0  # seconds for socket reads/writes
    socket_connect_timeout: float = 5.0  # seconds to establish a connection

class RedisCache:
    """Redis cache with connection pooling and JSON serialization.

    All public methods swallow ``redis.RedisError`` and return a neutral
    value (None / False / 0) so a cache outage degrades to cache misses
    instead of crashing callers.
    """

    def __init__(self, config: Optional[CacheConfig] = None):
        self.config = config or CacheConfig()
        self.pool = redis.ConnectionPool(
            host=self.config.host,
            port=self.config.port,
            db=self.config.db,
            password=self.config.password,
            max_connections=self.config.max_connections,
            socket_timeout=self.config.socket_timeout,
            socket_connect_timeout=self.config.socket_connect_timeout,
            decode_responses=True  # values come back as str, ready for json.loads
        )
        self.client = redis.Redis(connection_pool=self.pool)

    def get(self, key: str) -> Optional[Any]:
        """Return the JSON-decoded value for key, or None on miss/error.

        Note: a cached JSON ``null`` is indistinguishable from a miss.
        """
        try:
            value = self.client.get(key)
            # json.dumps never produces an empty string, so an explicit
            # None check is the correct miss test.
            return json.loads(value) if value is not None else None
        except redis.RedisError:
            return None

    def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None,
        nx: bool = False,
        xx: bool = False
    ) -> bool:
        """Set value (JSON-serialized) with optional TTL.

        Returns False on Redis error, or when nx/xx prevented the write
        (the client returns None in that case; coerce to the declared bool).
        """
        try:
            serialized = json.dumps(value)
            return bool(self.client.set(key, serialized, ex=ttl, nx=nx, xx=xx))
        except redis.RedisError:
            return False

    def delete(self, key: str) -> bool:
        """Delete key; True if it existed."""
        try:
            return self.client.delete(key) > 0
        except redis.RedisError:
            return False

    def delete_pattern(self, pattern: str) -> int:
        """Delete all keys matching pattern; returns the number deleted.

        Uses SCAN (cursor-based, incremental) rather than KEYS, which
        blocks the Redis server while it walks the entire keyspace.
        """
        try:
            keys = list(self.client.scan_iter(match=pattern, count=500))
            if keys:
                return self.client.delete(*keys)
            return 0
        except redis.RedisError:
            return 0

    def exists(self, key: str) -> bool:
        """Check whether key exists."""
        try:
            return self.client.exists(key) > 0
        except redis.RedisError:
            return False

    def increment(self, key: str, amount: int = 1) -> int:
        """Atomically increment the integer at key; 0 on Redis error."""
        try:
            return self.client.incrby(key, amount)
        except redis.RedisError:
            return 0

    def get_or_set(self, key: str, fetch_func, ttl: int = 300) -> Optional[Any]:
        """Return the cached value, computing and caching it on a miss.

        A short-lived Redis lock ensures only one process runs
        fetch_func at a time (cache-stampede protection).
        """
        # Fast path: value already cached.
        value = self.get(key)
        if value is not None:
            return value

        lock_key = f"lock:{key}"
        if self.client.set(lock_key, "1", nx=True, ex=10):
            try:
                # Double-check after acquiring the lock: another process
                # may have populated the key while we waited.
                value = self.get(key)
                if value is not None:
                    return value

                value = fetch_func()
                self.set(key, value, ttl)
                return value
            finally:
                # NOTE: if fetch_func runs longer than the 10s lock TTL,
                # this may delete a lock now held by another process.
                self.client.delete(lock_key)

        # Another process holds the lock: poll briefly for its result
        # instead of a single fixed sleep that could still miss.
        for _ in range(10):
            time.sleep(0.05)
            value = self.get(key)
            if value is not None:
                return value
        return None

Caching Patterns

class UserCache:
    """User-specific caching with Redis, keyed under the "user:" prefix."""

    def __init__(self, cache: RedisCache):
        self.cache = cache
        self.key_prefix = "user:"
        self.default_ttl = 3600  # 1 hour

    def _key(self, user_id: str) -> str:
        """Primary cache key for a user record."""
        return f"{self.key_prefix}{user_id}"

    def get_user(self, user_id: str) -> Optional[dict]:
        """Get user from cache; None on miss."""
        return self.cache.get(self._key(user_id))

    def set_user(self, user_id: str, user_data: dict, ttl: Optional[int] = None) -> None:
        """Cache user data, defaulting to a 1-hour TTL."""
        self.cache.set(self._key(user_id), user_data, ttl or self.default_ttl)

    def invalidate_user(self, user_id: str) -> None:
        """Invalidate the cached user record."""
        self.cache.delete(self._key(user_id))

    def get_user_sessions(self, user_id: str) -> list:
        """Get user's active sessions (a Redis list)."""
        key = f"{self.key_prefix}{user_id}:sessions"
        return self.cache.client.lrange(key, 0, -1) or []

    def add_user_session(self, user_id: str, session_id: str) -> None:
        """Append a session to the user's session list."""
        key = f"{self.key_prefix}{user_id}:sessions"
        self.cache.client.rpush(key, session_id)
        # Refresh the list's expiry on every append.
        self.cache.client.expire(key, 86400)  # 24 hours

    def invalidate_all_user_data(self, user_id: str) -> None:
        """Invalidate the user record and every sub-key for this user.

        Deletes the exact "user:<id>" key plus "user:<id>:*" sub-keys.
        The previous pattern "user:<id>*" also matched OTHER users whose
        IDs share this one as a prefix (e.g. "user:12*" hit "user:123").
        """
        self.cache.delete(self._key(user_id))
        self.cache.delete_pattern(f"{self.key_prefix}{user_id}:*")

class RateLimiter:
    """Fixed-window rate limiter backed by a Redis counter per identifier."""

    def __init__(self, cache: RedisCache):
        self.cache = cache

    def is_allowed(
        self,
        identifier: str,
        max_requests: int,
        window_seconds: int
    ) -> tuple[bool, int]:
        """
        Check whether a request is allowed within the current window.

        Returns (is_allowed, remaining_requests).
        """
        key = f"ratelimit:{identifier}"

        pipe = self.cache.client.pipeline()
        pipe.incr(key)
        pipe.ttl(key)
        current, ttl = pipe.execute()

        # Arm the expiry only when the key has none (TTL == -1 means the
        # key exists without an expiry, i.e. this INCR just created it).
        # The old code re-ran EXPIRE on every request, which kept pushing
        # the window forward: under steady traffic the counter never
        # reset and clients stayed blocked indefinitely.
        if ttl == -1:
            self.cache.client.expire(key, window_seconds)

        remaining = max(0, max_requests - current)
        return current <= max_requests, remaining

HTTP Caching

Cache-Control Headers

HTTP provides a rich set of caching mechanisms through headers. Understanding these headers is essential for building efficient web applications.

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import hashlib

@dataclass
class CachePolicy:
    """Cache policy configuration.

    Drives the Cache-Control header assembled by
    HTTPCache.build_cache_control; etag and last_modified feed
    conditional-request handling.
    """
    max_age: Optional[int] = None  # browser cache lifetime, seconds
    s_maxage: Optional[int] = None  # shared (proxy/CDN) cache lifetime, seconds
    no_cache: bool = False  # revalidate with origin before each reuse
    no_store: bool = False  # forbid storing the response anywhere
    private: bool = False  # cacheable only by the end user's client
    must_revalidate: bool = False  # never serve stale without revalidating
    stale_while_revalidate: Optional[int] = None  # serve stale while refreshing, seconds
    etag: Optional[str] = None  # opaque validator; enables If-None-Match
    last_modified: Optional[datetime] = None  # enables If-Modified-Since

class HTTPCache:
    """Helpers for building Cache-Control headers and ETag validation."""

    @staticmethod
    def build_cache_control(policy: "CachePolicy") -> str:
        """Build a Cache-Control header value from a CachePolicy.

        Returns an empty string when no directives are set.
        """
        directives = []

        if policy.max_age is not None:
            directives.append(f"max-age={policy.max_age}")
        if policy.s_maxage is not None:
            directives.append(f"s-maxage={policy.s_maxage}")
        if policy.no_cache:
            directives.append("no-cache")
        if policy.no_store:
            directives.append("no-store")
        if policy.private:
            directives.append("private")
        if policy.must_revalidate:
            directives.append("must-revalidate")
        if policy.stale_while_revalidate is not None:
            directives.append(f"stale-while-revalidate={policy.stale_while_revalidate}")

        return ", ".join(directives)

    @staticmethod
    def generate_etag(content: str) -> str:
        """Generate a quoted ETag from content.

        md5 is acceptable here: cache validation is not a security
        boundary, only a change detector.
        """
        return f'"{hashlib.md5(content.encode()).hexdigest()}"'

    @staticmethod
    def parse_etag(header: str) -> str:
        """Return the bare validator from an ETag header value.

        Strips surrounding whitespace, a weak "W/" prefix, and the quotes.
        """
        value = header.strip()
        if value.startswith("W/"):
            # Weak validators compare equal for caching purposes.
            value = value[2:]
        return value.strip('"')

    @staticmethod
    def check_etag_match(etag: str, if_none_match: str) -> bool:
        """Check etag against an If-None-Match header value.

        Handles the "*" wildcard and comma-separated candidate lists, and
        normalizes BOTH sides so a quoted ETag (as produced by
        generate_etag) matches. The previous code compared the quoted
        etag against the stripped header value, so they could never be
        equal.
        """
        if if_none_match.strip() == "*":
            return True
        target = HTTPCache.parse_etag(etag)
        return any(
            HTTPCache.parse_etag(candidate) == target
            for candidate in if_none_match.split(",")
        )

class CacheableResponse:
    """Wrapper pairing a response body with its HTTP cache policy."""

    def __init__(
        self,
        content: bytes,
        content_type: str = "application/json",
        policy: CachePolicy = None
    ):
        self.content = content
        self.content_type = content_type
        self.policy = policy or CachePolicy()
        self._etag = None  # computed lazily on first access

    @property
    def etag(self) -> str:
        """Quoted ETag for the body, computed once and memoized."""
        if self._etag is None:
            self._etag = HTTPCache.generate_etag(self.content.decode())
        return self._etag

    def to_headers(self, request_headers: dict = None) -> dict:
        """Build response headers; marks 304 when If-None-Match matches."""
        headers = {
            "Content-Type": self.content_type,
            "Cache-Control": HTTPCache.build_cache_control(self.policy),
        }

        # Emit a validator whenever the policy opts into caching.
        wants_validator = self.policy.etag or self.policy.max_age
        if wants_validator:
            headers["ETag"] = self.etag

        last_mod = self.policy.last_modified
        if last_mod:
            headers["Last-Modified"] = last_mod.strftime(
                "%a, %d %b %Y %H:%M:%S GMT"
            )

        # Conditional request: signal 304 via a pseudo Status header.
        if request_headers:
            candidate = request_headers.get("If-None-Match")
            if candidate and HTTPCache.check_etag_match(self.etag, candidate):
                headers["Status"] = "304 Not Modified"

        return headers

Conditional Requests

class ConditionalRequestHandler:
    """Handle conditional HTTP requests (ETag, If-Modified-Since)."""

    @staticmethod
    def handle_conditional(
        content: bytes,
        etag: str,
        if_none_match: str = None
    ) -> tuple[bytes, int]:
        """Return (body, status) for an ETag-based conditional request."""
        if not if_none_match:
            return content, 200
        if HTTPCache.parse_etag(if_none_match) == etag:
            # Client already holds the current representation.
            return b"", 304
        return content, 200

    @staticmethod
    def handle_last_modified(
        content: bytes,
        last_modified: datetime,
        if_modified_since: str = None
    ) -> tuple[bytes, int]:
        """Return (body, status) for a Last-Modified conditional request."""
        if not if_modified_since:
            return content, 200
        try:
            threshold = datetime.strptime(
                if_modified_since, "%a, %d %b %Y %H:%M:%S GMT"
            )
        except ValueError:
            # Unparseable date: ignore the header and serve the body.
            return content, 200
        if last_modified <= threshold:
            return b"", 304
        return content, 200

CDN Caching

CDN Integration

Content Delivery Networks cache content at edge locations worldwide, reducing latency for global users.

class CDNCache:
    """CDN caching utilities (URL building, purge/status stubs)."""

    # Transformation keys forwarded to the CDN, in a stable output order.
    _TRANSFORM_KEYS = ("width", "height", "fit", "quality")

    def __init__(self, cdn_provider: str = "cloudflare"):
        self.provider = cdn_provider

    def build_cdn_url(
        self,
        original_url: str,
        transformations: dict = None
    ) -> str:
        """Build a CDN URL with optional image transformations.

        Unknown providers return the URL untouched.
        """
        if self.provider == "cloudflare":
            return self._cloudflare_url(original_url, transformations)
        elif self.provider == "fastly":
            return self._fastly_url(original_url, transformations)
        return original_url

    def _transform_params(self, transformations: dict = None) -> list:
        """Encode supported transformations as key=value query fragments."""
        if not transformations:
            return []
        return [
            f"{key}={transformations[key]}"
            for key in self._TRANSFORM_KEYS
            if key in transformations
        ]

    @staticmethod
    def _append_query(url: str, params: list) -> str:
        """Append params to url, honoring an existing query string.

        The previous implementation always used "?", which produced an
        invalid URL (two "?") when the original already carried a query.
        """
        if not params:
            return url
        separator = "&" if "?" in url else "?"
        return f"{url}{separator}{'&'.join(params)}"

    def _cloudflare_url(
        self,
        url: str,
        transformations: dict = None
    ) -> str:
        """Build a Cloudflare Image Resizing URL."""
        return self._append_query(url, self._transform_params(transformations))

    def _fastly_url(
        self,
        url: str,
        transformations: dict = None
    ) -> str:
        """Build a Fastly Image Optimizer URL.

        This was referenced by build_cdn_url but never defined, so
        provider="fastly" raised AttributeError. Fastly's image API takes
        width/height/fit/quality as query parameters as well.
        """
        return self._append_query(url, self._transform_params(transformations))

    def purge_cache(self, urls: list) -> dict:
        """Purge CDN cache for URLs (stub; provider-specific in practice)."""
        return {"purged": len(urls), "urls": urls}

    def get_cache_status(self, url: str) -> dict:
        """Check cache status for a URL (stub returning a static HIT)."""
        return {
            "url": url,
            "cached": True,
            "cache_status": "HIT"
        }

Cache Invalidation Patterns

Invalidation Strategies

Cache invalidation is one of the hardest problems in caching. Several patterns help manage this complexity.

from enum import Enum
from typing import List, Callable

class InvalidationStrategy(Enum):
    """Supported cache invalidation approaches."""
    TIME_BASED = "time_based"  # entries expire after a TTL
    EVENT_BASED = "event_based"  # invalidated in response to domain events
    MANUAL = "manual"  # explicit invalidation by application code
    HYBRID = "hybrid"  # combination of the above

class CacheInvalidator:
    """Coordinates cache invalidation and fans events out to listeners."""

    def __init__(self, cache: RedisCache):
        self.cache = cache
        self.subscribers: List[Callable] = []

    def subscribe(self, callback: Callable) -> None:
        """Register a callback invoked on every invalidation event."""
        self.subscribers.append(callback)

    def invalidate(self, key: str) -> None:
        """Drop a single cache key and announce the invalidation."""
        self.cache.delete(key)
        self._notify_subscribers("invalidate", key)

    def invalidate_pattern(self, pattern: str) -> int:
        """Drop every key matching pattern; returns the delete count."""
        deleted = self.cache.delete_pattern(pattern)
        self._notify_subscribers("invalidate_pattern", pattern)
        return deleted

    def invalidate_by_entity(self, entity_type: str, entity_id: str) -> None:
        """Drop all cache entries belonging to a single entity."""
        self.invalidate_pattern(f"{entity_type}:{entity_id}*")

    def _notify_subscribers(self, event: str, data: str) -> None:
        """Best-effort fan-out: one failing subscriber never blocks the rest."""
        for subscriber in self.subscribers:
            try:
                subscriber(event, data)
            except Exception:
                # Deliberately swallowed: notification is advisory only.
                pass

class TimeBasedExpiration:
    """Time-based cache expiration with a soft-TTL background refresh.

    Each entry stores its value together with a hard expiry (absolute
    cutoff) and a soft expiry (background-refresh trigger).
    """

    def __init__(self, cache: RedisCache):
        self.cache = cache

    def set_with_soft_expire(
        self,
        key: str,
        value: Any,
        hard_ttl: int,
        soft_ttl: Optional[int] = None
    ) -> None:
        """
        Set cache with soft expire.
        Hard TTL: absolute expiration.
        Soft TTL: background refresh window (defaults to hard_ttl // 2).
        """
        soft = soft_ttl or hard_ttl // 2
        now = time.time()
        data = {
            "value": value,
            "hard_expire": now + hard_ttl,
            "soft_expire": now + soft
        }
        self.cache.set(key, data, ttl=hard_ttl)

    def get_with_background_refresh(
        self,
        key: str,
        fetch_func: Callable,
        hard_ttl: int,
        soft_ttl: Optional[int] = None
    ) -> Any:
        """Get value, refreshing in the background once the soft TTL passes.

        Fix: soft_ttl was previously accepted but never used, so custom
        soft windows silently fell back to hard_ttl // 2; it is now
        threaded through both refresh paths.
        """
        data = self.cache.get(key)

        if data is None:
            # Cold miss: fetch synchronously.
            return self._fetch_and_cache(key, fetch_func, hard_ttl, soft_ttl)

        now = time.time()

        # Past the hard deadline: the entry is too stale to serve.
        if now > data["hard_expire"]:
            return self._fetch_and_cache(key, fetch_func, hard_ttl, soft_ttl)

        # Past the soft deadline: serve the stale value but refresh it.
        if now > data["soft_expire"]:
            # In production, use a task queue (Celery, RQ, ...).
            self._trigger_background_refresh(key, fetch_func, hard_ttl, soft_ttl)

        return data["value"]

    def _fetch_and_cache(
        self,
        key: str,
        fetch_func: Callable,
        ttl: int,
        soft_ttl: Optional[int] = None
    ) -> Any:
        """Fetch the value, store it via set_with_soft_expire, return it.

        Reuses set_with_soft_expire so the hard/soft bookkeeping lives in
        one place (previously duplicated here with the soft window
        hard-coded to ttl // 2, ignoring any caller-supplied soft_ttl).
        """
        value = fetch_func()
        self.set_with_soft_expire(key, value, ttl, soft_ttl)
        return value

    def _trigger_background_refresh(
        self,
        key: str,
        fetch_func: Callable,
        ttl: int,
        soft_ttl: Optional[int] = None
    ) -> None:
        """Refresh the entry on a daemon thread (simplified).

        daemon=True keeps a pending refresh from blocking interpreter
        shutdown. In production, use Celery, RQ, or similar instead.
        """
        import threading
        thread = threading.Thread(
            target=self._fetch_and_cache,
            args=(key, fetch_func, ttl, soft_ttl),
            daemon=True
        )
        thread.start()

Event-Driven Invalidation

class EventDrivenCache:
    """Cache whose entries are invalidated by domain events."""

    def __init__(self, cache: RedisCache, event_bus):
        self.cache = cache
        self.event_bus = event_bus
        self._setup_subscriptions()

    def _setup_subscriptions(self):
        """Wire each domain event to its invalidation handler."""
        handlers = {
            "user.updated": self._on_user_updated,
            "product.updated": self._on_product_updated,
            "order.completed": self._on_order_completed,
        }
        for event_name, handler in handlers.items():
            self.event_bus.subscribe(event_name, handler)

    def _on_user_updated(self, event: dict) -> None:
        """Clear the user's entries and any cached user listings."""
        uid = event["user_id"]
        for pattern in (f"user:{uid}*", "users:list*"):
            self.cache.delete_pattern(pattern)

    def _on_product_updated(self, event: dict) -> None:
        """Clear the product entry and any cached product listings."""
        pid = event["product_id"]
        for pattern in (f"product:{pid}", "products:list*"):
            self.cache.delete_pattern(pattern)

    def _on_order_completed(self, event: dict) -> None:
        """Clear the order entry and the buyer's order listings."""
        uid = event["user_id"]
        for pattern in (f"order:{event['order_id']}", f"user:{uid}:orders*"):
            self.cache.delete_pattern(pattern)

Conclusion

Effective caching requires strategy at multiple levels: application caching with Redis, HTTP caching with proper headers, and CDN caching for global content distribution. Cache invalidation remains challenging, but patterns like event-driven invalidation and time-based expiration help manage complexity.

Key principles: cache at the appropriate level, use proper cache headers, implement robust invalidation, and monitor cache effectiveness. Caching without strategy can lead to stale data and debugging nightmares.

Resources

  • “Caching Architecture Guide” on AWS
  • Redis Documentation
  • MDN HTTP Caching Guide
  • Cloudflare CDN Documentation

Comments