Skip to main content
⚡ Calmops

Redis Real-World Use Cases: Caching, Sessions, Pub/Sub, and More

Introduction

Redis excels in production environments across numerous use cases. Its in-memory architecture, rich data structures, and built-in replication make it ideal for solving common distributed system challenges. This article explores practical implementations with production-ready code patterns.


1. Web Caching

Caching is Redis’s most common use case. By storing frequently accessed data in Redis, you dramatically reduce database load and improve response times.

Page Caching

import redis
import json
import hashlib
from datetime import timedelta

class RedisCache:
    """JSON page cache stored in Redis under ``cache:<prefix>:<identifier>`` keys.

    The wrapped client must provide ``get``, ``setex``, ``delete`` and ``scan``.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def _make_key(self, prefix, identifier):
        """Return the namespaced cache key for ``identifier``."""
        return f"cache:{prefix}:{identifier}"

    def get_page(self, url):
        """Return the cached, JSON-decoded page for ``url``, or ``None`` on a miss."""
        cached = self.redis.get(self._make_key("page", url))
        if not cached:
            return None
        return json.loads(cached)

    def set_page(self, url, content, ttl=3600):
        """Store ``content`` as JSON for ``url``, expiring after ``ttl`` seconds."""
        self.redis.setex(self._make_key("page", url), ttl, json.dumps(content))

    def invalidate_page(self, url):
        """Delete the cached page for ``url``, if any."""
        self.redis.delete(self._make_key("page", url))

    def invalidate_pattern(self, pattern):
        """Delete every cached page whose URL starts with ``pattern``.

        ``pattern`` is embedded in a SCAN glob, so glob metacharacters in it
        keep their special meaning.

        Returns:
            The number of keys reported as deleted by the client.
        """
        # Build the glob through _make_key so the key layout lives in one place.
        match = self._make_key("page", f"{pattern}*")
        removed = 0
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor, match=match, count=100)
            if keys:
                removed += self.redis.delete(*keys)
            # SCAN signals the end of the iteration by returning cursor 0.
            if cursor == 0:
                break
        return removed

Django/Flask Integration

# Django with Redis cache
from django.core.cache import cache
from django.views import View
from django.http import JsonResponse

class ProductListView(View):
    """Product listing view that answers from the Django cache when it can."""

    def get(self, request):
        """Return the products of the requested category as a JSON response.

        The category comes from the ``category`` query parameter and
        defaults to ``'all'``.
        """
        category = request.GET.get('category', 'all')
        cache_key = f"products:list:{category}"

        cached_products = cache.get(cache_key)
        if cached_products is not None:
            # Cache hit: answer without touching the database.
            return JsonResponse({'products': cached_products})

        # Cache miss: load from the database and keep the result for 15 minutes.
        fresh_products = self.fetch_products_from_db(category)
        cache.set(cache_key, fresh_products, 60 * 15)
        return JsonResponse({'products': fresh_products})

    def fetch_products_from_db(self, category):
        """Placeholder for the database query; currently returns ``None``."""
        return None

Cache-Aside Pattern

def get_user_profile(user_id):
    """Return a user's profile using the cache-aside pattern.

    1. Check the cache.
    2. On a miss, load the profile from the database.
    3. Populate the cache for an hour when a profile was found.

    NOTE(review): relies on module-level ``redis_client`` and ``database``
    objects that this snippet does not define - confirm where they come from.

    Args:
        user_id: Identifier used in the cache key and the SQL lookup.

    Returns:
        The JSON-decoded cached profile on a hit, otherwise whatever
        ``database.query`` returned.
    """
    cache_key = f"user:profile:{user_id}"

    # Step 1: check the cache. This must go through a client instance;
    # ``redis`` is the imported module and has no ``get``/``setex``.
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Step 2: cache miss - fetch from the database.
    profile = database.query("SELECT * FROM users WHERE id = ?", user_id)

    if profile:
        # Step 3: populate the cache (falsy results are not cached).
        redis_client.setex(cache_key, 3600, json.dumps(profile))

    return profile

2. Session Storage

Redis provides an excellent solution for distributed session management, replacing sticky sessions and enabling horizontal scaling.

Session Management Implementation

import uuid
import json
from datetime import timedelta

class RedisSession:
    """Server-side sessions stored as JSON strings under ``session:<id>`` keys.

    Args:
        redis_client: Client providing ``setex``, ``get``, ``expire`` and ``delete``.
        ttl: Session lifetime in seconds, applied on create, update and refresh.
    """

    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl
        self.session_prefix = "session:"

    def _key(self, session_id):
        """Return the Redis key that holds ``session_id``."""
        return f"{self.session_prefix}{session_id}"

    def create_session(self, user_id, metadata=None):
        """Create a session for ``user_id`` and return its new id.

        The stored payload holds ``user_id``, ``created_at`` (naive UTC,
        ISO 8601) and ``metadata`` (an empty dict when not given).
        """
        # Imported here because the snippet's imports only bring in
        # ``timedelta``; the bare ``datetime`` name would raise NameError.
        from datetime import datetime, timezone

        session_id = str(uuid.uuid4())
        # Same naive-UTC format as utcnow().isoformat(), without the
        # deprecated utcnow() call.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        session_data = {
            'user_id': user_id,
            'created_at': created_at,
            'metadata': metadata or {}
        }

        self.redis.setex(self._key(session_id), self.ttl, json.dumps(session_data))
        return session_id

    def get_session(self, session_id):
        """Return the decoded session payload, or ``None`` if it does not exist."""
        data = self.redis.get(self._key(session_id))
        return json.loads(data) if data else None

    def update_session(self, session_id, data):
        """Merge ``data`` into an existing session and restart its TTL.

        Does nothing when the session does not exist. The read and the write
        are separate commands, so concurrent updates can overwrite each other.
        """
        current = self.get_session(session_id)
        if current:
            current.update(data)
            self.redis.setex(self._key(session_id), self.ttl, json.dumps(current))

    def refresh_session(self, session_id):
        """Extend the session's TTL; a missing session is left untouched."""
        # EXPIRE on a missing key is a no-op, so no separate EXISTS check is needed.
        self.redis.expire(self._key(session_id), self.ttl)

    def destroy_session(self, session_id):
        """Delete the session (logout)."""
        self.redis.delete(self._key(session_id))

Flask-Session Integration

from flask import Flask, session
from flask_session import Session

app = Flask(__name__)

# Flask-Session settings: keep session data in Redis database 1 under the
# "flask_session:" prefix, with signing enabled and non-permanent sessions.
app.config.update(
    SESSION_TYPE='redis',
    SESSION_PERMANENT=False,
    SESSION_USE_SIGNER=True,
    SESSION_KEY_PREFIX='flask_session:',
    SESSION_REDIS=redis.Redis(host='localhost', port=6379, db=1),
)

# Install the session interface only after the configuration is in place.
Session(app)

@app.route('/login', methods=['POST'])
def login():
    """Authenticate the JSON request body and record the user in the session.

    NOTE(review): the result of ``authenticate_user`` is stored without a
    check - confirm that it raises on bad credentials rather than returning
    a falsy value, otherwise failed logins are marked as logged in.
    """
    # Imported locally because the snippet's top-level flask import only
    # brings in ``Flask`` and ``session``; ``request`` would be a NameError.
    from flask import request

    user_id = authenticate_user(request.json)
    session['user_id'] = user_id
    session['logged_in'] = True
    return {'status': 'success'}

@app.route('/logout')
def logout():
    """Clear the current session and confirm the logout."""
    session.clear()
    confirmation = {'status': 'logged out'}
    return confirmation

3. Message Queues and Pub/Sub

Redis supports two messaging patterns: classic Pub/Sub for broadcasting and Streams for durable message queues.

Pub/Sub Implementation

import threading
import time

class RedisPubSub:
    """Small JSON wrapper around a Redis client's publish/subscribe calls."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def publish(self, channel, message):
        """JSON-encode ``message``, publish it on ``channel``, return the client's result."""
        return self.redis.publish(channel, json.dumps(message))

    def subscribe(self, channel, callback):
        """Block on ``channel`` and call ``callback(data)`` for each message.

        ``channel`` is passed to the exact-match ``subscribe`` call; use
        ``pattern_subscribe`` for glob patterns. The pub/sub object is closed
        when the listen loop ends or ``callback`` raises.
        """
        pubsub = self.redis.pubsub()
        try:
            pubsub.subscribe(channel)
            for message in pubsub.listen():
                # Skip non-payload entries such as the subscribe confirmation.
                if message['type'] == 'message':
                    callback(json.loads(message['data']))
        finally:
            pubsub.close()

    def pattern_subscribe(self, pattern, callback):
        """Block on ``pattern`` (e.g. ``notifications:*``) and call
        ``callback(channel, data)`` for each matching message.

        The pub/sub object is closed when the listen loop ends or
        ``callback`` raises.
        """
        pubsub = self.redis.pubsub()
        try:
            pubsub.psubscribe(pattern)
            for message in pubsub.listen():
                if message['type'] == 'pmessage':
                    callback(message['channel'], json.loads(message['data']))
        finally:
            pubsub.close()


# Real-time notification system
def notification_handler(data):
    """Print the ``title`` field of a received notification payload."""
    title = data['title']
    print(f"Received notification: {title}")

# Publisher
def send_notification(user_id, notification_type, content):
    """Publish a notification on the ``user:<user_id>:notifications`` channel.

    ``content`` must provide ``title`` and ``body``; the current epoch time
    is attached as ``timestamp``.
    """
    publisher = RedisPubSub(redis_client)
    channel = f"user:{user_id}:notifications"
    payload = {
        'type': notification_type,
        'title': content['title'],
        'body': content['body'],
        'timestamp': time.time()
    }
    publisher.publish(channel, payload)

# Subscriber (run in background)
# send_notification() publishes on "user:<id>:notifications", so the listener
# needs a glob pattern. Plain subscribe() matches channel names exactly and
# would treat the "*" as a literal character, so pattern_subscribe() is used.
subscriber = RedisPubSub(redis_client)
threading.Thread(
    target=subscriber.pattern_subscribe,
    args=(
        "user:*:notifications",
        # pattern_subscribe calls back with (channel, data); the handler
        # only takes the payload.
        lambda _channel, data: notification_handler(data),
    ),
    daemon=True
).start()

Redis Streams (Durable Queue)

class RedisStreamQueue:
    """Queue on top of a Redis stream, read through the ``<stream_name>:group`` consumer group."""

    def __init__(self, redis_client, stream_name):
        self.redis = redis_client
        self.stream = stream_name
        self.consumer_group = f"{stream_name}:group"

    def setup(self):
        """Create the consumer group (and the stream, via ``mkstream``) if missing.

        A ``BUSYGROUP`` response error is ignored; any other response error
        is re-raised.
        """
        try:
            self.redis.xgroup_create(
                self.stream,
                self.consumer_group,
                id='0',
                mkstream=True
            )
        except redis.exceptions.ResponseError as exc:
            group_already_exists = "BUSYGROUP" in str(exc)
            if not group_already_exists:
                raise

    def add_message(self, data):
        """Append ``data`` to the stream and return the id given by ``xadd``."""
        return self.redis.xadd(self.stream, data)

    def consume(self, consumer_name, count=1, block=5000):
        """Read up to ``count`` entries for ``consumer_name`` using the ``>`` id.

        ``block`` is forwarded unchanged to ``xreadgroup``; the client's
        reply is returned as-is.
        """
        streams = {self.stream: '>'}
        return self.redis.xreadgroup(
            self.consumer_group,
            consumer_name,
            streams,
            count=count,
            block=block
        )

    def acknowledge(self, message_id):
        """Acknowledge ``message_id`` for this consumer group and return the ``xack`` result."""
        return self.redis.xack(self.stream, self.consumer_group, message_id)


# Worker implementation
def process_message(message):
    """Handle one queue message; this placeholder only prints it."""
    line = f"Processing: {message}"
    print(line)
    
queue = RedisStreamQueue(redis_client, "orders:queue")
queue.setup()

# Poll forever as "worker-1", five entries at a time. An entry is acknowledged
# only after process_message() succeeds; a failure is printed and the loop
# moves on without acknowledging that entry.
while True:
    batch = queue.consume("worker-1", count=5)
    for _stream_name, stream_entries in batch:
        for entry_id, payload in stream_entries:
            try:
                process_message(payload)
                queue.acknowledge(entry_id)
            except Exception as exc:
                print(f"Error processing: {exc}")

4. Rate Limiting

Implement distributed rate limiting using Redis atomic operations.

Sliding Window Rate Limiter

import time

class RateLimiter:
    """Sliding-window rate limiter that logs requests in a Redis sorted set.

    Each accepted request is one sorted-set member scored by its timestamp.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, key, limit, window):
        """Check and record one request against a sliding window.

        Args:
            key: Sorted-set key holding the request log.
            limit: Maximum number of requests inside the window.
            window: Window length in seconds.

        Returns:
            ``(allowed, remaining)``; ``remaining`` is 0 when the request
            is rejected.
        """
        import math
        import uuid

        now = time.time()
        window_start = now - window
        # A random suffix keeps two requests with the same timestamp from
        # collapsing into one member, which would undercount them.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        # Remove entries that fell out of the window.
        pipe.zremrangebyscore(key, 0, window_start)
        # Count the requests still inside the window (before this one).
        pipe.zcard(key)
        # Record the current request.
        pipe.zadd(key, {member: now})
        # Round the TTL up so the log never expires before the window ends
        # (int() would truncate fractional windows, down to 0 below 1 second).
        pipe.expire(key, max(1, math.ceil(window)))

        results = pipe.execute()
        current_count = results[1]

        if current_count < limit:
            return True, limit - current_count - 1

        # A rejected request must not use up quota: take its entry back out so
        # a client that keeps retrying can recover once older entries expire.
        self.redis.zrem(key, member)
        return False, 0

    def allow_request(self, identifier, limit, period):
        """Rate-limit ``identifier`` under the ``rate_limit:<identifier>`` key."""
        key = f"rate_limit:{identifier}"
        return self.is_allowed(key, limit, period)


# Usage as middleware
def rate_limit_middleware(limiter, limit=100, period=60):
    """Build a decorator that rate-limits a view through ``limiter``.

    Args:
        limiter: Object exposing ``allow_request(identifier, limit, period)``
            that returns ``(allowed, remaining)``.
        limit: Maximum number of requests per period.
        period: Period length in seconds; also reported as ``retry_after``.

    Returns:
        A decorator. The wrapped view returns ``(error_dict, 429)`` when the
        limit is exceeded; otherwise it returns the view's own response with
        ``X-RateLimit-*`` headers set (the response must expose a mutable
        ``headers`` mapping).
    """
    import functools

    def decorator(func):
        # wraps() keeps the view's __name__/__doc__. Without it every decorated
        # view is named "wrapper", which breaks name-based routing and
        # introspection.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            client_id = get_client_id()  # Extract from request
            allowed, remaining = limiter.allow_request(client_id, limit, period)

            if not allowed:
                return {
                    'error': 'Rate limit exceeded',
                    'retry_after': period
                }, 429

            # Add rate limit headers
            response = func(*args, **kwargs)
            response.headers['X-RateLimit-Limit'] = str(limit)
            response.headers['X-RateLimit-Remaining'] = str(remaining)
            return response
        return wrapper
    return decorator

Fixed Window Counter

class FixedWindowLimiter:
    """Fixed-window request counter; simpler but slightly less accurate."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, key, limit, window_seconds):
        """Count one request in the current window and report whether it fits.

        The counter lives under ``fw:<key>:<window index>``, where the index
        is the current epoch time floor-divided by ``window_seconds``.

        Returns:
            ``True`` while the incremented counter is at most ``limit``.
        """
        window_index = int(time.time() // window_seconds)
        counter_key = f"fw:{key}:{window_index}"

        batch = self.redis.pipeline()
        batch.incr(counter_key)
        batch.expire(counter_key, window_seconds)
        replies = batch.execute()

        # The first reply is the counter value after the increment.
        hits = replies[0]
        return hits <= limit

5. Distributed Locks

Implement distributed locking for critical sections in distributed systems.

Redis Lock Implementation

import time
import uuid

class RedisLock:
    """Distributed lock held as a ``lock:<lock_name>`` key set with NX and an expiry.

    Each instance generates its own random token, and ``release`` only
    deletes the key while it still holds that token.

    Args:
        redis_client: Client providing ``set`` (with ``nx``/``ex``) and ``eval``.
        lock_name: Name of the protected resource.
        timeout: Seconds after which Redis expires the lock by itself.
        blocking_timeout: Seconds the ``with`` statement waits for the lock.
    """

    # Check-and-delete runs as one script so the ownership test and the delete
    # cannot be separated by another client's command.
    _RELEASE_SCRIPT = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

    def __init__(self, redis_client, lock_name, timeout=10, blocking_timeout=30):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.timeout = timeout
        self.blocking_timeout = blocking_timeout
        self.token = str(uuid.uuid4())
        self.acquired = False

    def acquire(self, blocking=True, blocking_timeout=30):
        """Try to take the lock.

        Args:
            blocking: When false, make a single attempt.
            blocking_timeout: Maximum seconds to keep retrying when blocking.

        Returns:
            ``True`` if the lock was taken, ``False`` otherwise.
        """
        # The monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + blocking_timeout

        while True:
            # SET ... NX only succeeds when the key does not exist yet.
            if self.redis.set(self.lock_name, self.token, nx=True, ex=self.timeout):
                self.acquired = True
                return True

            if not blocking or time.monotonic() >= deadline:
                return False

            time.sleep(0.01)  # Brief pause between attempts

    def release(self):
        """Release the lock if this instance acquired it.

        Returns:
            ``False`` when the lock was never acquired by this instance,
            otherwise the script result (0 when the key no longer holds
            this instance's token).
        """
        if not self.acquired:
            return False

        result = self.redis.eval(self._RELEASE_SCRIPT, 1, self.lock_name, self.token)
        self.acquired = False
        return result

    def __enter__(self):
        """Acquire the lock or raise ``TimeoutError``.

        Failing here keeps the ``with`` body from running unprotected.
        """
        if not self.acquire(blocking_timeout=self.blocking_timeout):
            raise TimeoutError(
                f"could not acquire {self.lock_name} within {self.blocking_timeout}s"
            )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


# Usage
def process_payment(order_id):
    """Charge a pending order while holding the ``payment:<order_id>`` lock.

    Raises:
        Exception: If the lock cannot be acquired within 5 seconds.
    """
    payment_lock = RedisLock(redis_client, f"payment:{order_id}")

    got_lock = payment_lock.acquire(blocking_timeout=5)
    if not got_lock:
        raise Exception("Could not acquire lock - order being processed")

    try:
        # Critical section: only a still-pending order is moved to
        # 'processing' and sent to the payment gateway.
        order = get_order(order_id)
        if order.status == 'pending':
            order.status = 'processing'
            save_order(order)
            call_payment_gateway(order)
    finally:
        payment_lock.release()

6. Real-Time Analytics

Counter and Analytics

class RedisAnalytics:
    """Per-day event counters and unique-user sets, plus an event log stream."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def track_event(self, event_name, user_id, metadata=None):
        """Record one event for ``user_id`` in a single pipeline.

        Increments the event's counter in ``analytics:events:<day>``, adds
        the user to ``analytics:users:<day>`` and appends the event to
        ``events:stream``. ``<day>`` is the local date from ``time.strftime``.
        """
        timestamp = time.time()
        day = time.strftime("%Y-%m-%d")
        counters_key = f"analytics:events:{day}"
        users_key = f"analytics:users:{day}"

        batch = self.redis.pipeline()
        batch.hincrby(counters_key, event_name, 1)
        batch.sadd(users_key, user_id)
        # Keep the raw event for later analysis.
        batch.xadd("events:stream", {
            'event': event_name,
            'user_id': user_id,
            'timestamp': str(timestamp),
            'metadata': json.dumps(metadata or {})
        })
        batch.execute()

    def get_daily_stats(self, date=None):
        """Return event totals and the unique-user count for ``date``.

        ``date`` is a ``YYYY-MM-DD`` string and defaults to today's local date.
        """
        day = date or time.strftime("%Y-%m-%d")

        counters = self.redis.hgetall(f"analytics:events:{day}")
        user_count = self.redis.scard(f"analytics:users:{day}")

        total = 0
        for raw_count in counters.values():
            total += int(raw_count)

        return {
            'date': day,
            'total_events': total,
            'unique_users': user_count,
            'events': counters
        }

Leaderboard Implementation

class Leaderboard:
    """Leaderboard stored in the Redis sorted set ``leaderboard:<leaderboard_name>``."""

    def __init__(self, redis_client, leaderboard_name):
        self.redis = redis_client
        self.key = f"leaderboard:{leaderboard_name}"

    def set_score(self, member, score):
        """Set or overwrite ``member``'s score; returns the ``zadd`` result."""
        return self.redis.zadd(self.key, {member: score})

    def increment(self, member, increment):
        """Add ``increment`` to ``member``'s score; returns the ``zincrby`` result."""
        return self.redis.zincrby(self.key, increment, member)

    def get_rank(self, member, reverse=False):
        """Return ``member``'s 0-based rank, or ``None`` if it is absent.

        By default rank 0 is the lowest score; with ``reverse=True`` rank 0
        is the highest score.
        """
        if reverse:
            return self.redis.zrevrank(self.key, member)
        return self.redis.zrank(self.key, member)

    def get_top(self, count=10, with_scores=True):
        """Return the ``count`` highest-scoring members, best first.

        A non-positive ``count`` yields an empty list.
        """
        # count == 0 would turn into the range 0..-1, which Redis reads as
        # "the whole set", so handle it before building the range.
        if count <= 0:
            return []
        return self.redis.zrevrange(self.key, 0, count - 1, withscores=with_scores)

    def get_around_me(self, member, count=5):
        """Return up to ``count`` members on each side of ``member``, with scores.

        Ordered best first; returns ``[]`` when ``member`` is not ranked.
        """
        rank = self.redis.zrevrank(self.key, member)
        if rank is None:
            return []

        # Clamp at the top of the board; Redis clamps the end index itself.
        start = max(0, rank - count)
        end = rank + count

        return self.redis.zrevrange(self.key, start, end, withscores=True)

    def get_percentile(self, member):
        """Return the percentage of members ranked above ``member``.

        The best member gets 0; returns ``None`` when ``member`` is not ranked.
        """
        rank = self.redis.zrevrank(self.key, member)
        if rank is None:
            return None

        total = self.redis.zcard(self.key)
        return (rank / total) * 100 if total > 0 else 0


# Usage
leaderboard = Leaderboard(redis_client, "game_season_1")

# Update scores: set_score() overwrites a member's score, increment() adds to it
leaderboard.set_score("player_123", 1500)
leaderboard.increment("player_456", 100)

# Get rankings: reverse=True ranks by descending score, so rank 0 is the
# highest scorer
top_10 = leaderboard.get_top(10)
my_rank = leaderboard.get_rank("player_123", reverse=True)
my_percentile = leaderboard.get_percentile("player_123")

7. Configuration and Feature Flags

class FeatureFlags:
    """Feature flags stored in Redis under ``feature:<flag_name>`` keys.

    A flag is globally on while its key holds ``"1"``; per-user enablement
    is kept in the set ``feature:<flag_name>:users``.
    """

    def __init__(self, redis_client):
        self.redis = redis_client
        self.prefix = "feature:"

    def enable(self, flag_name):
        """Enable the flag for everyone."""
        self.redis.set(f"{self.prefix}{flag_name}", "1")

    def disable(self, flag_name):
        """Disable the flag globally (per-user entries are left untouched)."""
        self.redis.delete(f"{self.prefix}{flag_name}")

    def is_enabled(self, flag_name):
        """Return ``True`` when the flag is enabled globally."""
        value = self.redis.get(f"{self.prefix}{flag_name}")
        # Clients created without decode_responses=True return bytes, and
        # b"1" == "1" is False, so compare against the matching type.
        if isinstance(value, bytes):
            return value == b"1"
        return value == "1"

    def enable_for_user(self, flag_name, user_id):
        """Enable the flag for a single user."""
        self.redis.sadd(f"{self.prefix}{flag_name}:users", user_id)

    def is_enabled_for_user(self, flag_name, user_id):
        """Return ``True`` when the flag is on globally or for ``user_id``."""
        if self.is_enabled(flag_name):
            return True
        # Normalise the client's reply (which may be 0/1) to a real bool.
        return bool(self.redis.sismember(f"{self.prefix}{flag_name}:users", user_id))


# Usage
flags = FeatureFlags(redis_client)


def checkout_view():
    """Render the new checkout flow when its flag is enabled, else the legacy one."""
    # ``return`` is only valid inside a function, so the branch lives in a
    # view function rather than at module level.
    if flags.is_enabled("new_checkout_flow"):
        return NewCheckoutView().render()
    return LegacyCheckoutView().render()

Best Practices Summary

| Use Case | Redis Data Type | TTL Recommendation |
|---|---|---|
| Page Cache | String | 5-60 minutes |
| Session | Hash/String | 24 hours |
| Rate Limit | Sorted Set/String | 1 minute-1 hour |
| Leaderboard | Sorted Set | Long-term |
| Pub/Sub | Stream/Pub-Sub | N/A (streams) |
| Locks | String (SET NX) | 10-30 seconds |

Conclusion

Redis’s versatility makes it an essential tool in modern application architecture. From simple caching to complex distributed systems patterns, Redis provides the building blocks for high-performance applications.

In the next article, we’ll explore Redis’s evolution in 2025-2026, including new features like vector search and Redis Stack capabilities that make it relevant for AI applications.

Comments