# Caching Strategies: Complete Guide to In-Memory, Redis, and CDN Caching

Published: March 13, 2026 | Updated: May 8, 2026 | Larry Qu | 14 min read

## Introduction

Caching is one of the most effective techniques for improving application performance and scalability. Implemented correctly, it can cut database load by an order of magnitude or more, bring response times down from hundreds of milliseconds to a few milliseconds or less, and noticeably improve user experience. However, caching introduces complexity, particularly around data consistency and cache invalidation.

This comprehensive guide covers caching strategies at every layer of the application stack - from browser caching through CDN edge servers, application-level caching with Redis and in-memory stores, down to database query caching. You’ll learn when to cache, what to cache, how to cache effectively, and most importantly, how to invalidate caches without causing data inconsistencies.

The key to effective caching is understanding the trade-offs. Caching improves read performance at the cost of increased infrastructure complexity and potential staleness. This guide will help you navigate these trade-offs and implement caching solutions that deliver maximum benefit with minimal risk.

## Understanding Caching Layers

### The Caching Hierarchy

Modern applications use multiple caching layers, each with different characteristics:

┌─────────────────────────────────────────────────────────────────────┐
│                         Caching Layers                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌─────────────────────────────────────────────────────────────────┐ │
│  │ 1. Browser Cache - CSS, JS, Images, API Responses              │ │
│  ├─────────────────────────────────────────────────────────────────┤ │
│  │ 2. CDN (Edge) - Static Assets, API Responses, HTML             │ │
│  ├─────────────────────────────────────────────────────────────────┤ │
│  │ 3. Application Cache - Redis, Memcached                        │ │
│  ├─────────────────────────────────────────────────────────────────┤ │
│  │ 4. In-Memory Cache - Local LRU, Process-level                  │ │
│  ├─────────────────────────────────────────────────────────────────┤ │
│  │ 5. Database Cache - Query Cache, Buffer Pool                  │ │
│  └─────────────────────────────────────────────────────────────────┘ │
│                                                                       │
│  Latency:  Browser (0ms) < Memory (0.1ms) < CDN (5-20ms) < DB (10ms+)│
│                                                                       │
└─────────────────────────────────────────────────────────────────────┘

### Layer Characteristics

| Layer | Latency | Capacity | Persistence | Use Case |
|-------|---------|----------|-------------|----------|
| Browser | < 1ms | Limited | Session | User-specific data |
| CDN | 5-20ms | Large | TTL-based | Static assets |
| Redis | 0.1-1ms | Medium-Large | Optional | Session, API cache |
| In-Memory | < 0.1ms | Small | None | Hot data |
| Database | 10-100ms | Large | Yes | Query results |
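
The hierarchy implies a lookup order: try the fastest layer first and fall through to slower ones. A minimal sketch of that idea, assuming two dict-backed tiers (`local` standing in for a process-level cache, `shared` standing in for Redis) and a hypothetical `load_from_db` function; none of these names come from a real library:

```python
from typing import Any, Callable, Dict, List, Optional


class TieredCache:
    """Look up a key in the fastest tier first, then fall through.

    `local` and `shared` are plain dicts standing in for an in-process
    cache and a shared cache such as Redis (assumption for illustration).
    """

    def __init__(self, loader: Callable[[str], Optional[Any]]):
        self.local: Dict[str, Any] = {}
        self.shared: Dict[str, Any] = {}
        self.loader = loader  # slowest layer, e.g. a database query

    def get(self, key: str) -> Optional[Any]:
        if key in self.local:
            return self.local[key]
        if key in self.shared:
            # Promote to the faster tier for subsequent reads
            self.local[key] = self.shared[key]
            return self.local[key]
        value = self.loader(key)
        if value is not None:
            # Populate both tiers on the way back up
            self.shared[key] = value
            self.local[key] = value
        return value


db_calls: List[str] = []


def load_from_db(key: str) -> Optional[str]:
    db_calls.append(key)  # record how often the slow layer is hit
    return f"row-for-{key}"


tiered = TieredCache(load_from_db)
first = tiered.get("user:1")   # misses both tiers, calls the loader
second = tiered.get("user:1")  # served from the local tier
```

A real deployment would also give each tier its own TTL, with the local tier usually the shortest, so that stale entries age out of the fastest layer first.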

## In-Memory Caching

### Python functools.lru_cache

The simplest form of caching uses Python’s built-in LRU (Least Recently Used) cache:

from functools import lru_cache
import time
from typing import Optional
import hashlib
import json


@lru_cache(maxsize=128)
def get_user(user_id: int) -> dict:
    """Cache user lookup with LRU strategy.
    
    Automatically evicts least recently used entries when maxsize is reached.
    """
    # Simulate database query
    time.sleep(0.01)  # Imagine this is a DB call
    return {
        'id': user_id,
        'name': f'User {user_id}',
        'email': f'user{user_id}@example.com'
    }


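# Call the function a few times, then inspect cache statistics
get_user(1)  # miss: runs the function body
get_user(1)  # hit: served from the cache
cache_info = get_user.cache_info()
print(f"Hits: {cache_info.hits}, Misses: {cache_info.misses}")
# Guard against division by zero when the function has not been called yet
total_calls = cache_info.hits + cache_info.misses
print(f"Hit rate: {cache_info.hits / total_calls:.2%}" if total_calls else "Hit rate: n/a")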

# Clear cache manually if needed
get_user.cache_clear()


# Cache with TTL using custom implementation
from functools import wraps
from datetime import datetime, timedelta


def cached_ttl(ttl_seconds: int = 300):
    """Custom cache with time-to-live."""
    cache = {}
    timestamps = {}
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key from args
            key = str(args) + str(sorted(kwargs.items()))
            key_hash = hashlib.md5(key.encode()).hexdigest()
            
            now = datetime.now()
            
            # Check if cached and not expired
            if key_hash in cache:
                if now - timestamps[key_hash] < timedelta(seconds=ttl_seconds):
                    return cache[key_hash]
            
            # Compute and cache
            result = func(*args, **kwargs)
            cache[key_hash] = result
            timestamps[key_hash] = now
            
            return result
        
        wrapper.cache_clear = lambda: (cache.clear(), timestamps.clear())
        return wrapper
    return decorator


@cached_ttl(ttl_seconds=60)
def expensive_computation(n: int) -> int:
    """Example function with TTL-based caching."""
    return sum(i ** 2 for i in range(n))

### Thread-Safe In-Memory Cache

For multi-threaded applications:

from threading import RLock
from collections import OrderedDict
from typing import Any, Optional
import time


class ThreadSafeLRUCache:
    """Thread-safe LRU cache with TTL support."""
    
    def __init__(self, maxsize: int = 128, default_ttl: int = 3600):
        self.maxsize = maxsize
        self.default_ttl = default_ttl
        self._cache = OrderedDict()
        self._timestamps = {}
        self._lock = RLock()
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache."""
        with self._lock:
            if key not in self._cache:
                return None
            
            # Check TTL
            if self._is_expired(key):
                self._remove(key)
                return None
            
            # Move to end (most recently used)
            self._cache.move_to_end(key)
            return self._cache[key]
    
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in cache."""
        with self._lock:
            # Use an explicit None check so that ttl=0 is not silently replaced
            ttl = self.default_ttl if ttl is None else ttl
            
            # Remove if exists
            if key in self._cache:
                self._remove(key)
            
            # Add new entry
            self._cache[key] = value
            self._timestamps[key] = time.time() + ttl
            
            # Evict if over capacity
            while len(self._cache) > self.maxsize:
                oldest = next(iter(self._cache))
                self._remove(oldest)
    
    def _is_expired(self, key: str) -> bool:
        """Check if entry is expired."""
        return time.time() > self._timestamps.get(key, 0)
    
    def _remove(self, key: str) -> None:
        """Remove entry from cache."""
        self._cache.pop(key, None)
        self._timestamps.pop(key, None)
    
    def clear(self) -> None:
        """Clear all cache entries."""
        with self._lock:
            self._cache.clear()
            self._timestamps.clear()
    
    def invalidate(self, key: str) -> None:
        """Invalidate specific key."""
        with self._lock:
            self._remove(key)


# Usage
cache = ThreadSafeLRUCache(maxsize=1000, default_ttl=300)
cache.set('user:123', {'name': 'John'})
user = cache.get('user:123')

## Redis Caching Patterns

### Redis Cache Implementation

import redis
import json
import logging
from typing import Any, Optional, Callable
from functools import wraps
import hashlib


logger = logging.getLogger(__name__)


class RedisCache:
    """Production-ready Redis cache wrapper."""
    
    def __init__(self, 
                 host: str = 'localhost', 
                 port: int = 6379, 
                 db: int = 0,
                 password: Optional[str] = None,
                 decode_responses: bool = True):
        # Build one connection pool and hand it to the client so that the
        # pool settings (max_connections, timeouts) are actually used.
        self.pool = redis.ConnectionPool(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=decode_responses,
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True,
            max_connections=50
        )
        self.client = redis.Redis(connection_pool=self.pool)
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache."""
        try:
            data = self.client.get(key)
            if data:
                return json.loads(data)
            return None
        except redis.RedisError as e:
            logger.warning(f"Cache get error: {e}")
            return None
    
    def set(self, 
             key: str, 
             value: Any, 
             ttl: int = 3600,
             nx: bool = False) -> bool:
        """Set value in cache with optional TTL."""
        try:
            serialized = json.dumps(value, default=str)
            # redis-py returns None when NX prevents the write; normalize to bool
            return bool(self.client.set(key, serialized, ex=ttl, nx=nx))
        except (redis.RedisError, TypeError) as e:
            logger.warning(f"Cache set error: {e}")
            return False
    
    def get_many(self, keys: list) -> dict:
        """Get multiple values at once."""
        try:
            values = self.client.mget(keys)
            return {
                key: json.loads(val) if val else None 
                for key, val in zip(keys, values)
            }
        except redis.RedisError as e:
            logger.warning(f"Cache get_many error: {e}")
            return {key: None for key in keys}
    
    def set_many(self, mapping: dict, ttl: int = 3600) -> bool:
        """Set multiple values at once."""
        try:
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.set(key, json.dumps(value, default=str), ex=ttl)
            pipe.execute()
            return True
        except redis.RedisError as e:
            logger.warning(f"Cache set_many error: {e}")
            return False
    
    def invalidate(self, key: str) -> bool:
        """Delete a key from cache."""
        try:
            return self.client.delete(key) > 0
        except redis.RedisError as e:
            logger.warning(f"Cache invalidate error: {e}")
            return False
    
    def invalidate_pattern(self, pattern: str) -> int:
        """Delete all keys matching pattern."""
        try:
            # SCAN iterates incrementally; KEYS would block Redis on large keyspaces
            keys = list(self.client.scan_iter(match=pattern))
            if keys:
                return self.client.delete(*keys)
            return 0
        except redis.RedisError as e:
            logger.warning(f"Cache invalidate_pattern error: {e}")
            return 0
    
    def increment(self, key: str, amount: int = 1) -> Optional[int]:
        """Increment a counter."""
        try:
            return self.client.incrby(key, amount)
        except redis.RedisError as e:
            logger.warning(f"Cache increment error: {e}")
            return None
    
    def exists(self, key: str) -> bool:
        """Check if key exists."""
        try:
            return self.client.exists(key) > 0
        except redis.RedisError as e:
            logger.warning(f"Cache exists error: {e}")
            return False


# Decorator for function caching
def redis_cache(cache: RedisCache, ttl: int = 300, key_prefix: str = ''):
    """Decorator to cache function results."""
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key: keep a readable prefix so keys can be found
            # by pattern in cache_clear, and hash only the arguments
            arg_repr = f"{args!r}:{sorted(kwargs.items())!r}"
            arg_hash = hashlib.md5(arg_repr.encode()).hexdigest()
            key = f"{key_prefix}:{func.__name__}:{arg_hash}"
            
            # Try to get from cache
            cached = cache.get(key)
            if cached is not None:
                return cached
            
            # Compute and cache
            result = func(*args, **kwargs)
            cache.set(key, result, ttl)
            
            return result
        
        wrapper.cache_clear = lambda: cache.invalidate_pattern(f"{key_prefix}:{func.__name__}:*")
        return wrapper
    return decorator


# Usage
redis_cache_client = RedisCache(host='localhost', port=6379)

@redis_cache(redis_cache_client, ttl=600, key_prefix='api')
def fetch_user_data(user_id: int) -> dict:
    """Example function with Redis caching."""
    # This would be a database call
    return {'id': user_id, 'data': 'expensive computation'}

### Cache-Aside Pattern

def get_user(cache: RedisCache, user_id: int) -> Optional[dict]:
    """Cache-aside pattern implementation."""
    cache_key = f"user:{user_id}"
    
    # 1. Check cache first
    cached_user = cache.get(cache_key)
    if cached_user is not None:
        logger.info(f"Cache hit for user {user_id}")
        return cached_user
    
    # 2. Cache miss - fetch from database
    logger.info(f"Cache miss for user {user_id}")
    user = database.fetch_user(user_id)
    
    if user:
        # 3. Store in cache for next time
        cache.set(cache_key, user, ttl=3600)
    
    return user


def update_user(cache: RedisCache, user_id: int, data: dict) -> bool:
    """Update user with proper cache invalidation."""
    # 1. Update database first
    success = database.update_user(user_id, data)
    
    if success:
        # 2. Invalidate cache
        cache.invalidate(f"user:{user_id}")
    
    return success
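
The functions above rely on a `database` object that is not defined in this guide. A minimal runnable sketch of the same read and invalidate-on-write flow, assuming dict-backed stand-ins for both sides (`FakeDatabase`, `DictCache`, `read_user`, and `write_user` are hypothetical names used only for illustration):

```python
from typing import Dict, Optional


class FakeDatabase:
    """Hypothetical in-memory stand-in for a real database."""

    def __init__(self) -> None:
        self.rows: Dict[int, dict] = {1: {"id": 1, "name": "Ada"}}
        self.reads = 0  # counts how often the "database" is queried

    def fetch_user(self, user_id: int) -> Optional[dict]:
        self.reads += 1
        row = self.rows.get(user_id)
        return dict(row) if row is not None else None

    def update_user(self, user_id: int, data: dict) -> bool:
        if user_id not in self.rows:
            return False
        self.rows[user_id].update(data)
        return True


class DictCache:
    """Hypothetical cache exposing the get/set/invalidate calls used above."""

    def __init__(self) -> None:
        self.store: Dict[str, dict] = {}

    def get(self, key: str) -> Optional[dict]:
        return self.store.get(key)

    def set(self, key: str, value: dict) -> None:
        self.store[key] = value

    def invalidate(self, key: str) -> None:
        self.store.pop(key, None)


def read_user(cache: DictCache, db: FakeDatabase, user_id: int) -> Optional[dict]:
    key = f"user:{user_id}"
    cached = cache.get(key)
    if cached is not None:
        return cached                 # cache hit
    user = db.fetch_user(user_id)     # cache miss: go to the source
    if user is not None:
        cache.set(key, user)
    return user


def write_user(cache: DictCache, db: FakeDatabase, user_id: int, data: dict) -> bool:
    updated = db.update_user(user_id, data)  # update the source first
    if updated:
        cache.invalidate(f"user:{user_id}")  # then drop the stale entry
    return updated


db = FakeDatabase()
cache = DictCache()
before = read_user(cache, db, 1)   # miss, one database read
again = read_user(cache, db, 1)    # hit, no database read
write_user(cache, db, 1, {"name": "Grace"})
after = read_user(cache, db, 1)    # miss again after invalidation
```

Invalidating rather than rewriting the cached entry on update keeps the write path simple: the next read repopulates the cache from the database.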

### Write-Through and Write-Behind

class WriteThroughCache:
    """Write-through cache - write to both cache and DB."""
    
    def __init__(self, cache: RedisCache, db):
        self.cache = cache
        self.db = db
    
    def write(self, key: str, value: dict) -> bool:
        # Write to database first
        self.db.save(key, value)
        
        # Then write to cache
        return self.cache.set(key, value)


class WriteBehindCache:
    """Write-behind cache - async DB writes."""
    
    def __init__(self, cache: RedisCache, db, queue):
        self.cache = cache
        self.db = db
        self.queue = queue
    
    def write(self, key: str, value: dict) -> bool:
        # Write to cache immediately
        self.cache.set(key, value)
        
        # Queue for async DB write
        self.queue.put(('write', key, value))
        
        return True
    
    def process_queue(self):
        """Process queued writes."""
        while not self.queue.empty():
            operation, key, value = self.queue.get()
            if operation == 'write':
                self.db.save(key, value)
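
`process_queue` above drains the queue only when something calls it. One way to run it continuously is a background worker thread. The following is a sketch under stated assumptions: it uses the standard-library `queue.Queue`, a plain dict in place of Redis, and a hypothetical `save` callable standing in for `db.save`:

```python
import logging
import queue
import threading
from typing import Callable, Dict

logger = logging.getLogger(__name__)

_STOP = object()  # sentinel that tells the worker to exit


class WriteBehindWorker:
    """Cache writes immediately and persist them from a background thread."""

    def __init__(self, save: Callable[[str, dict], None]):
        self.cache: Dict[str, dict] = {}   # stand-in for the Redis cache
        self.pending: queue.Queue = queue.Queue()
        self._save = save                  # stand-in for db.save
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def write(self, key: str, value: dict) -> None:
        self.cache[key] = value            # fast path: cache only
        self.pending.put((key, value))     # slow path: deferred

    def _run(self) -> None:
        while True:
            item = self.pending.get()
            try:
                if item is _STOP:
                    return
                key, value = item
                self._save(key, value)
            except Exception:
                # A real system would retry or dead-letter the write
                logger.exception("write-behind save failed")
            finally:
                self.pending.task_done()

    def close(self) -> None:
        """Flush queued writes, then stop the worker."""
        self.pending.put(_STOP)
        self.pending.join()
        self._thread.join()


persisted: Dict[str, dict] = {}
worker = WriteBehindWorker(persisted.__setitem__)
worker.write("order:1", {"total": 42})
worker.write("order:2", {"total": 7})
worker.close()
```

Writes that are still queued when the process dies are lost, which is the data-loss risk usually associated with write-behind caching.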

## Redis Caching Patterns: Rate Limiting

### Rate Limiter

class RateLimiter:
    """Rate limiter using Redis."""

    def __init__(self, cache: RedisCache):
        self.cache = cache

    def is_allowed(
        self,
        identifier: str,
        max_requests: int,
        window_seconds: int
    ) -> tuple[bool, int]:
        """
        Check if request is allowed.
        Returns (is_allowed, remaining_requests).
        """
        key = f"ratelimit:{identifier}"

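        pipe = self.cache.client.pipeline()
        pipe.incr(key)
        pipe.ttl(key)
        current, ttl = pipe.execute()

        # Start the window only once. Calling EXPIRE on every request would
        # keep pushing the expiry forward, so the counter would never reset
        # under steady traffic. TTL returns -1 for a key with no expiry.
        if ttl == -1:
            self.cache.client.expire(key, window_seconds)
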
        remaining = max(0, max_requests - current)

        return current <= max_requests, remaining
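
The counter logic can be exercised without a Redis server. A minimal in-process sketch of the same fixed-window idea, assuming a hypothetical `FixedWindowLimiter` class with an injectable clock so that window expiry can be shown deterministically:

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """In-memory fixed-window rate limiter (single process only)."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        # identifier -> (window_start, request_count)
        self._windows: Dict[str, Tuple[float, int]] = {}

    def is_allowed(self, identifier: str) -> Tuple[bool, int]:
        """Return (is_allowed, remaining_requests) for this identifier."""
        now = self.clock()
        start, count = self._windows.get(identifier, (now, 0))
        if now - start >= self.window_seconds:
            # The previous window has ended: start a fresh one
            start, count = now, 0
        count += 1
        self._windows[identifier] = (start, count)
        remaining = max(0, self.max_requests - count)
        return count <= self.max_requests, remaining


fake_now = [0.0]  # mutable cell used as a controllable clock
limiter = FixedWindowLimiter(max_requests=2, window_seconds=10,
                             clock=lambda: fake_now[0])
results = [limiter.is_allowed("client-a") for _ in range(3)]
fake_now[0] = 10.0  # advance past the end of the window
after_window = limiter.is_allowed("client-a")
```

A fixed window allows bursts of up to twice the limit across a window boundary; sliding-window or token-bucket variants smooth this out at the cost of more state.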

## CDN Caching

### Cache Headers Deep Dive

from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timedelta


@dataclass
class CacheConfig:
    """Cache configuration for CDN responses."""
    
    # Cache-Control directives
    public: bool = True
    max_age: int = 3600  # Client cache time
    s_maxage: int = 86400  # Shared cache (CDN) time
    stale_while_revalidate: int = 60
    stale_if_error: int = 86400
    
    # Other headers
    etag: Optional[str] = None
    vary: tuple = ('Accept-Encoding',)
    
    def to_headers(self) -> dict:
        """Generate cache headers."""
        directives = []
        
        if self.public:
            directives.append('public')
        else:
            directives.append('private')
        
        directives.append(f'max-age={self.max_age}')
        directives.append(f's-maxage={self.s_maxage}')
        directives.append(f'stale-while-revalidate={self.stale_while_revalidate}')
        directives.append(f'stale-if-error={self.stale_if_error}')
        
        headers = {
            'Cache-Control': ', '.join(directives),
            'Vary': ', '.join(self.vary)
        }
        
        if self.etag:
            headers['ETag'] = self.etag
            
        return headers


def generate_etag(content: str) -> str:
    """Generate ETag from content."""
    import hashlib
    return f'"{hashlib.md5(content.encode()).hexdigest()}"'
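
Different response types usually call for different directive combinations. A sketch of one possible mapping, assuming a hypothetical `cache_control_for` helper; the paths, file extensions, and TTL values are illustrative assumptions, not recommendations from a specific CDN:

```python
from pathlib import PurePosixPath

# Extensions treated as static assets (assumption for illustration)
STATIC_EXTENSIONS = {".css", ".js", ".png", ".jpg", ".svg", ".woff2"}


def cache_control_for(path: str, authenticated: bool = False) -> str:
    """Pick a Cache-Control value based on the kind of response."""
    if authenticated:
        # Per-user responses should not be stored by browsers or CDNs
        return "private, no-store"
    if PurePosixPath(path).suffix in STATIC_EXTENSIONS:
        # Assumes asset file names contain a content hash, so a changed
        # file always gets a new URL and old copies can live for a year
        return "public, max-age=31536000, immutable"
    if path.startswith("/api/"):
        # Short browser TTL, longer CDN TTL, brief stale window
        return "public, max-age=60, s-maxage=300, stale-while-revalidate=30"
    # HTML and everything else: browsers revalidate, the CDN caches briefly
    return "public, max-age=0, s-maxage=600, must-revalidate"


asset_policy = cache_control_for("/static/app.3f9a1c.js")
api_policy = cache_control_for("/api/products")
account_policy = cache_control_for("/account", authenticated=True)
page_policy = cache_control_for("/index.html")
```

Keeping `max-age` lower than `s-maxage` lets the CDN absorb most traffic while browsers pick up changes soon after the edge copy is purged.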

### HTTP Caching Utilities

```python
import hashlib
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class CachePolicy:
    """Cache policy configuration."""
    max_age: Optional[int] = None
    s_maxage: Optional[int] = None
    no_cache: bool = False
    no_store: bool = False
    private: bool = False
    must_revalidate: bool = False
    stale_while_revalidate: Optional[int] = None
    etag: Optional[str] = None
    last_modified: Optional[datetime] = None


class HTTPCache:
    """HTTP caching utilities."""

    @staticmethod
    def build_cache_control(policy: CachePolicy) -> str:
        """Build Cache-Control header value."""
        directives = []

        if policy.max_age is not None:
            directives.append(f"max-age={policy.max_age}")
        if policy.s_maxage is not None:
            directives.append(f"s-maxage={policy.s_maxage}")
        if policy.no_cache:
            directives.append("no-cache")
        if policy.no_store:
            directives.append("no-store")
        if policy.private:
            directives.append("private")
        if policy.must_revalidate:
            directives.append("must-revalidate")
        if policy.stale_while_revalidate is not None:
            directives.append(f"stale-while-revalidate={policy.stale_while_revalidate}")

        return ", ".join(directives)

    @staticmethod
    def generate_etag(content: str) -> str:
        """Generate ETag from content."""
        return f'"{hashlib.md5(content.encode()).hexdigest()}"'

    @staticmethod
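    def check_etag_match(etag: str, if_none_match: str) -> bool:
        """Check if ETag matches If-None-Match header."""
        # generate_etag() returns a quoted value, so strip the quotes from
        # both sides before comparing
        return etag.strip('"') == if_none_match.strip('"')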


class CacheableResponse:
    """Wrapper for cacheable HTTP responses."""

    def __init__(self, content: bytes, content_type: str = "application/json", policy: CachePolicy = None):
        self.content = content
        self.content_type = content_type
        self.policy = policy or CachePolicy()
        self._etag = None

    @property
    def etag(self) -> str:
        if self._etag is None:
            self._etag = HTTPCache.generate_etag(self.content.decode())
        return self._etag

    def to_headers(self, request_headers: dict = None) -> dict:
        """Convert to HTTP response headers."""
        headers = {
            "Content-Type": self.content_type,
            "Cache-Control": HTTPCache.build_cache_control(self.policy),
        }

        if self.policy.etag or self.policy.max_age:
            headers["ETag"] = self.etag

        if self.policy.last_modified:
            headers["Last-Modified"] = self.policy.last_modified.strftime("%a, %d %b %Y %H:%M:%S GMT")

        if request_headers:
            if_none_match = request_headers.get("If-None-Match")
            if if_none_match and HTTPCache.check_etag_match(self.etag, if_none_match):
                headers["Status"] = "304 Not Modified"

        return headers


class ConditionalRequestHandler:
    """Handle conditional HTTP requests (ETag, If-Modified-Since)."""

    @staticmethod
    def handle_conditional(content: bytes, etag: str, if_none_match: str = None) -> tuple[bytes, int]:
        """Handle ETag-based conditional request."""
        # Strip quotes from both sides: ETags are quoted on the wire
        if if_none_match and etag.strip('"') == if_none_match.strip('"'):
            return b"", 304
        return content, 200

    @staticmethod
    def handle_last_modified(content: bytes, last_modified: datetime, if_modified_since: str = None) -> tuple[bytes, int]:
        """Handle Last-Modified-based conditional request."""
        if if_modified_since:
            try:
                ims = datetime.strptime(if_modified_since, "%a, %d %b %Y %H:%M:%S GMT")
                if last_modified <= ims:
                    return b"", 304
            except ValueError:
                pass
        return content, 200
```
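
An `If-None-Match` header may carry several comma-separated ETags, weak validators prefixed with `W/`, or the wildcard `*`. A sketch of a more tolerant comparison, assuming hypothetical helpers `make_etag`, `if_none_match_matches`, and `respond`; it applies weak comparison, meaning the `W/` prefix is ignored and only the opaque values are compared:

```python
import hashlib
from typing import Optional, Tuple


def make_etag(content: bytes) -> str:
    """Build a quoted ETag from a content hash."""
    return '"' + hashlib.sha256(content).hexdigest()[:32] + '"'


def _opaque(tag: str) -> str:
    """Strip whitespace, an optional W/ prefix, and surrounding quotes."""
    tag = tag.strip()
    if tag.startswith("W/"):
        tag = tag[2:]
    return tag.strip('"')


def if_none_match_matches(etag: str, header: str) -> bool:
    """Weak comparison of `etag` against an If-None-Match header value."""
    if header.strip() == "*":
        return True
    current = _opaque(etag)
    # Simplification: assumes ETag values themselves contain no commas
    return any(_opaque(candidate) == current for candidate in header.split(","))


def respond(content: bytes, if_none_match: Optional[str] = None) -> Tuple[int, bytes, str]:
    """Return (status, body, etag), sending 304 when the client copy is current."""
    etag = make_etag(content)
    if if_none_match and if_none_match_matches(etag, if_none_match):
        return 304, b"", etag
    return 200, content, etag


body = b'{"ok": true}'
status_first, payload_first, etag = respond(body)
status_repeat, payload_repeat, _ = respond(body, etag)
status_list, _, _ = respond(body, f'"stale-value", W/{etag}')
status_other, _, _ = respond(body, '"something-else"')
```

A 304 response carries no body, so the saving grows with payload size; the server still has to compute or look up the current ETag on every request.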

### CDN Integration Examples

```python
import hashlib
from typing import Optional
import requests


class CloudflareCDN:
    """Cloudflare CDN cache management."""
    
    def __init__(self, zone_id: str, api_token: str):
        self.zone_id = zone_id
        self.api_token = api_token
        self.base_url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}"
        self.headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
    def purge_cache(self, paths: Optional[list] = None) -> bool:
        """Purge CDN cache."""
        if paths:
            # Purge specific paths
            data = {'files': paths}
        else:
            # Purge everything
            data = {'purge_everything': True}
        
        response = requests.post(
            f"{self.base_url}/purge_cache",
            headers=self.headers,
            json=data
        )
        
        return response.json().get('success', False)
    
    def set_cache_rule(self, url_pattern: str, ttl: int) -> bool:
        """Set custom cache rule."""
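        # Using Cloudflare Page Rules or Cache Rules.
        # NOTE: the payload shape and the endpoint below are illustrative
        # and unverified; check the Cloudflare API documentation for the
        # actual Cache Rules request format before relying on this.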
        rules = {
            'rules': [{
                'actions': [
                    {'id': 'cache_level', 'value': 'cache_everything'},
                    {'id': 'edge_cache_ttl', 'value': ttl}
                ],
                'condition': {
                    'request': {'url': {'operator': 'matches', 'value': url_pattern}}
                }
            }]
        }
        
        response = requests.put(
            f"{self.base_url}/policies/filter",
            headers=self.headers,
            json=rules
        )
        
        return response.json().get('success', False)


class AWSCloudFront:
    """AWS CloudFront cache management."""
    
    def __init__(self, distribution_id: str, aws_access_key: str, aws_secret_key: str):
        self.distribution_id = distribution_id
        # Would use boto3 in production
        self.client = None  # boto3.client('cloudfront')
    
    def create_invalidation(self, paths: list) -> str:
        """Create CloudFront invalidation."""
        # return self.client.create_invalidation(
        #     DistributionId=self.distribution_id,
        #     InvalidationBatch={
        #         'Paths': {'Quantity': len(paths), 'Items': paths},
        #         'CallerReference': f'invalidation-{datetime.now().timestamp()}'
        #     }
        # )['Invalidation']['Id']
        return 'mock-invalidation-id'
```

## Cache Invalidation Strategies

### Comparison of Strategies

| Strategy | Description | Pros | Cons | Use Case |
|----------|-------------|------|------|----------|
| TTL | Time-based expiration | Simple | Stale data possible | Static content |
| Write-through | Sync write to cache and DB | Always consistent | Higher write latency | Critical data |
| Write-behind | Async write to DB | Fast writes | Risk of data loss | High-volume writes |
| Write-around | Write to DB, invalidate cache | Simple | Cache miss on read | Rarely-read data |
| Event-based | Invalidate on data change | Responsive | Complex setup | Real-time apps |

### Event-Driven Invalidation

```python
import asyncio
from typing import Callable
import logging


class EventCacheInvalidator:
    """Event-driven cache invalidation."""
    
    def __init__(self, cache: RedisCache):
        self.cache = cache
        self.subscribers = {}
    
    def subscribe(self, event_type: str, callback: Callable):
        """Subscribe to cache invalidation events."""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(callback)
    
    async def publish(self, event_type: str, data: dict):
        """Publish cache invalidation event."""
        # Invalidate cache
        if 'key' in data:
            self.cache.invalidate(data['key'])
        elif 'pattern' in data:
            self.cache.invalidate_pattern(data['pattern'])
        
        # Notify subscribers
        if event_type in self.subscribers:
            for callback in self.subscribers[event_type]:
                await callback(data)
    
    def invalidate_user(self, user_id: int):
        """Invalidate all user-related cache."""
        patterns = [
            f"user:{user_id}",
            f"user:{user_id}:*",
            "users:list:*"
        ]
        
        for pattern in patterns:
            self.cache.invalidate_pattern(pattern)


# Usage
invalidator = EventCacheInvalidator(redis_cache_client)

# Subscribe to user update events
async def on_user_update(data):
    logging.info(f"User {data.get('user_id')} updated, cache invalidated")

invalidator.subscribe('user_updated', on_user_update)

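# Trigger invalidation. publish() only deletes something when the event
# carries a 'key' or 'pattern', and await must run inside a coroutine.
async def main():
    await invalidator.publish('user_updated', {'user_id': 123, 'key': 'user:123'})

asyncio.run(main())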
```

### Time-Based Expiration with Background Refresh

```python
import threading
import time
from typing import Any, Callable


class TimeBasedExpiration:
    """Time-based cache expiration with soft/hard TTL support."""

    def __init__(self, cache: RedisCache):
        self.cache = cache

    def set_with_soft_expire(self, key: str, value: Any, hard_ttl: int, soft_ttl: int = None) -> None:
        """Set cache with soft expire. Hard TTL: absolute expiration. Soft TTL: background refresh window."""
        soft = soft_ttl or hard_ttl // 2
        data = {
            "value": value,
            "hard_expire": time.time() + hard_ttl,
            "soft_expire": time.time() + soft
        }
        self.cache.set(key, data, ttl=hard_ttl)

    def get_with_background_refresh(self, key: str, fetch_func: Callable, hard_ttl: int, soft_ttl: int = None) -> Any:
        """Get value with background refresh on soft expire."""
        data = self.cache.get(key)

        if data is None:
            return self._fetch_and_cache(key, fetch_func, hard_ttl, soft_ttl)

        now = time.time()

        if now > data["hard_expire"]:
            return self._fetch_and_cache(key, fetch_func, hard_ttl, soft_ttl)

        if now > data["soft_expire"]:
            # Serve the stale value now and refresh in the background.
            # Note: concurrent callers may each start their own refresh thread.
            thread = threading.Thread(
                target=self._fetch_and_cache,
                args=(key, fetch_func, hard_ttl, soft_ttl),
                daemon=True,
            )
            thread.start()

        return data["value"]

    def _fetch_and_cache(self, key: str, fetch_func: Callable, hard_ttl: int, soft_ttl: int = None) -> Any:
        value = fetch_func()
        # Reuse set_with_soft_expire so the caller's soft TTL is honored
        self.set_with_soft_expire(key, value, hard_ttl, soft_ttl)
        return value
```
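
When a refresh is started from the read path, every caller that arrives after the soft TTL can start its own refresh thread. One way to limit this is to track which keys are already being refreshed. A sketch assuming a hypothetical `SingleFlight` helper guarded by a lock; it only deduplicates refreshes within one process:

```python
import threading
from typing import Callable, List, Optional, Set


class SingleFlight:
    """Allow at most one background refresh per key at a time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight: Set[str] = set()

    def try_start(self, key: str) -> bool:
        """Mark `key` as refreshing; return False if it already is."""
        with self._lock:
            if key in self._in_flight:
                return False
            self._in_flight.add(key)
            return True

    def finish(self, key: str) -> None:
        with self._lock:
            self._in_flight.discard(key)

    def refresh_in_background(self, key: str,
                              refresh: Callable[[], None]) -> Optional[threading.Thread]:
        """Run `refresh` in a thread unless one is already running for `key`."""
        if not self.try_start(key):
            return None

        def run() -> None:
            try:
                refresh()
            finally:
                self.finish(key)  # always release the key, even on error

        thread = threading.Thread(target=run, daemon=True)
        thread.start()
        return thread


gate = threading.Event()
calls: List[str] = []


def slow_refresh() -> None:
    calls.append("refresh")
    gate.wait(timeout=5)  # simulate a slow fetch until the gate opens


flights = SingleFlight()
first = flights.refresh_in_background("report", slow_refresh)
second = flights.refresh_in_background("report", slow_refresh)  # skipped
gate.set()
first.join()
third = flights.refresh_in_background("report", slow_refresh)   # allowed again
third.join()
```

Across several application servers the same idea needs a shared lock, for example a Redis key written with `SET ... NX` and a short expiry.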

## Monitoring and Optimization

### Cache Metrics

```python
import logging
import time
from dataclasses import dataclass
from typing import Dict


@dataclass
class CacheMetrics:
    """Cache performance metrics."""
    hits: int = 0
    misses: int = 0
    errors: int = 0
    invalidations: int = 0
    
    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0


class MonitoredCache(RedisCache):
    """Cache with metrics tracking."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = CacheMetrics()
    
    def get(self, key: str):
        start = time.time()
        result = super().get(key)
        elapsed = time.time() - start
        
        if result is not None:
            self.metrics.hits += 1
        else:
            self.metrics.misses += 1
        
        # Log slow cache reads
        if elapsed > 0.1:  # > 100ms
            logging.warning(f"Slow cache read: {elapsed:.3f}s for {key}")
        
        return result
    
    def invalidate(self, key: str) -> bool:
        self.metrics.invalidations += 1
        return super().invalidate(key)
    
    def get_stats(self) -> Dict:
        """Get cache statistics."""
        return {
            'hits': self.metrics.hits,
            'misses': self.metrics.misses,
            'hit_rate': f"{self.metrics.hit_rate:.2%}",
            'invalidations': self.metrics.invalidations
        }
```

## Best Practices

| Practice | Implementation |
|----------|----------------|
| Cache at multiple levels | Browser → CDN → Redis → In-memory |
| Use appropriate TTL | Short for dynamic, long for static |
| Monitor hit rates | Target 90%+ for hot data |
| Handle failures gracefully | Fall back to the source of truth |
| Invalidate carefully | Use patterns, events |
| Consider data freshness | Balance performance vs consistency |
| Pre-warm cache | Load critical data at startup |
| Use compression | Reduce network transfer |
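
The "Use compression" practice can be sketched with the standard library alone. This example assumes hypothetical `encode` and `decode` helpers, a one-byte marker to distinguish compressed from raw payloads, and an arbitrary 1 KiB threshold below which compression is skipped:

```python
import json
import zlib
from typing import Any

COMPRESS_THRESHOLD = 1024  # bytes; illustrative value, tune per workload


def encode(value: Any) -> bytes:
    """Serialize to JSON and compress only when the payload is large."""
    raw = json.dumps(value).encode("utf-8")
    if len(raw) >= COMPRESS_THRESHOLD:
        return b"z" + zlib.compress(raw)  # "z" marks a compressed payload
    return b"r" + raw                     # "r" marks a raw payload


def decode(blob: bytes) -> Any:
    """Reverse encode(), using the marker byte to pick the right path."""
    marker, payload = blob[:1], blob[1:]
    if marker == b"z":
        payload = zlib.decompress(payload)
    return json.loads(payload.decode("utf-8"))


small = {"id": 1}
large = {"items": ["x" * 50] * 200}

small_blob = encode(small)
large_blob = encode(large)
raw_large_size = len(json.dumps(large).encode("utf-8"))
```

Storing these byte payloads in Redis requires a client created with `decode_responses=False`, since compressed data is not valid UTF-8 text.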

## Conclusion

Caching is essential for building high-performance, scalable applications. The key to successful caching lies in understanding your data access patterns, choosing appropriate strategies for each layer, and implementing robust invalidation mechanisms.

Key takeaways:

1. **Layer your caching** - Use multiple levels for optimal performance
2. **Choose the right strategy** - TTL, write-through, or write-behind based on use case
3. **Monitor continuously** - Track hit rates, latencies, and cache size
4. **Plan for invalidation** - Know how you'll handle cache updates before they happen
5. **Handle failures gracefully** - On cache errors, fall back to the primary data source instead of failing the request
6. **Pre-warm critical paths** - Load hot data at application startup

By implementing the strategies and patterns in this guide, you'll dramatically improve your application's performance and scalability while minimizing the complexity and risks associated with caching.
