Introduction
Redis excels in production environments across numerous use cases. Its in-memory architecture, rich data structures, and built-in replication make it ideal for solving common distributed system challenges. This article explores practical implementations with production-ready code patterns.
1. Web Caching
Caching is Redis’s most common use case. By storing frequently accessed data in Redis, you dramatically reduce database load and improve response times.
Page Caching
import redis
import json
import hashlib
from datetime import timedelta
class RedisCache:
    """Thin page-cache helper over a Redis client.

    Keys are namespaced as ``cache:<prefix>:<identifier>`` and payloads are
    stored as JSON strings.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def _make_key(self, prefix, identifier):
        """Build the namespaced cache key."""
        return f"cache:{prefix}:{identifier}"

    def get_page(self, url):
        """Return the cached page for *url*, or None on a miss."""
        raw = self.redis.get(self._make_key("page", url))
        return json.loads(raw) if raw else None

    def set_page(self, url, content, ttl=3600):
        """Store *content* for *url*, expiring after *ttl* seconds."""
        self.redis.setex(self._make_key("page", url), ttl, json.dumps(content))

    def invalidate_page(self, url):
        """Drop the cached entry for *url*."""
        self.redis.delete(self._make_key("page", url))

    def invalidate_pattern(self, pattern):
        """Delete every cached page whose key starts with *pattern*.

        Iterates with SCAN so the traversal never blocks the server the way
        a single KEYS call would.
        """
        cursor = 0
        while True:
            cursor, batch = self.redis.scan(
                cursor, match=f"cache:page:{pattern}*", count=100
            )
            if batch:
                self.redis.delete(*batch)
            if cursor == 0:
                break
Django/Flask Integration
# Django with Redis cache
from django.core.cache import cache
from django.views import View
from django.http import JsonResponse
class ProductListView(View):
    """Product listing endpoint using a cache-aside read path."""

    def get(self, request):
        """Serve the product list for ``?category=``, cached for 15 minutes."""
        category = request.GET.get('category', 'all')
        cache_key = f"products:list:{category}"
        cached = cache.get(cache_key)
        if cached is not None:
            # Cache hit: answer straight from Redis.
            return JsonResponse({'products': cached})
        # Miss: load from the database and populate the cache.
        products = self.fetch_products_from_db(category)
        cache.set(cache_key, products, 60 * 15)
        return JsonResponse({'products': products})

    def fetch_products_from_db(self, category):
        """Database query logic (placeholder in the article)."""
        pass
Cache-Aside Pattern
def get_user_profile(user_id):
    """Fetch a user profile using the cache-aside pattern.

    1. Check Redis.
    2. On a miss, load the row from the database.
    3. Write the row back to the cache with a one-hour TTL.
    """
    cache_key = f"user:profile:{user_id}"
    cached = redis.get(cache_key)
    if cached:
        return json.loads(cached)
    # Miss: go to the source of truth, then populate the cache.
    profile = database.query("SELECT * FROM users WHERE id = ?", user_id)
    if profile:
        redis.setex(cache_key, 3600, json.dumps(profile))
    return profile
2. Session Storage
Redis provides an excellent solution for distributed session management, replacing sticky sessions and enabling horizontal scaling.
Session Management Implementation
import uuid
import json
from datetime import timedelta
class RedisSession:
    """Server-side session store backed by Redis.

    Sessions are JSON blobs stored under ``session:<uuid>`` keys, each with
    a TTL (default one hour) that can be refreshed.
    """

    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl                     # seconds until an untouched session expires
        self.session_prefix = "session:"

    def create_session(self, user_id, metadata=None):
        """Create a new session for *user_id* and return its session id.

        Bug fix: the original called ``datetime.utcnow()`` but the snippet
        imported only ``timedelta``, so this raised NameError. It now uses a
        timezone-aware UTC timestamp (``utcnow()`` is deprecated in 3.12+).
        """
        from datetime import datetime, timezone  # local: module only imported timedelta

        session_id = str(uuid.uuid4())
        session_key = f"{self.session_prefix}{session_id}"
        session_data = {
            'user_id': user_id,
            'created_at': datetime.now(timezone.utc).isoformat(),
            'metadata': metadata or {}
        }
        self.redis.setex(session_key, self.ttl, json.dumps(session_data))
        return session_id

    def get_session(self, session_id):
        """Return the session dict, or None if missing/expired."""
        session_key = f"{self.session_prefix}{session_id}"
        data = self.redis.get(session_key)
        return json.loads(data) if data else None

    def update_session(self, session_id, data):
        """Merge *data* into the session; also resets the TTL via SETEX."""
        session_key = f"{self.session_prefix}{session_id}"
        current = self.get_session(session_id)
        if current:
            current.update(data)
            self.redis.setex(session_key, self.ttl, json.dumps(current))

    def refresh_session(self, session_id):
        """Extend the session TTL without touching its contents."""
        session_key = f"{self.session_prefix}{session_id}"
        if self.redis.exists(session_key):
            self.redis.expire(session_key, self.ttl)

    def destroy_session(self, session_id):
        """Delete the session (logout)."""
        session_key = f"{self.session_prefix}{session_id}"
        self.redis.delete(session_key)
Flask-Session Integration
# Flask session storage in Redis via Flask-Session.
# Bug fix: the original used `request` (in login) and `redis` (in the config)
# without importing either.
import redis
from flask import Flask, request, session
from flask_session import Session

app = Flask(__name__)
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_PERMANENT'] = False
app.config['SESSION_USE_SIGNER'] = True            # sign the session-id cookie
app.config['SESSION_KEY_PREFIX'] = 'flask_session:'
app.config['SESSION_REDIS'] = redis.Redis(host='localhost', port=6379, db=1)
Session(app)


@app.route('/login', methods=['POST'])
def login():
    """Authenticate and mark the server-side session as logged in."""
    user_id = authenticate_user(request.json)
    session['user_id'] = user_id
    session['logged_in'] = True
    return {'status': 'success'}


@app.route('/logout')
def logout():
    """Clear all session state (logout)."""
    session.clear()
    return {'status': 'logged out'}
3. Message Queues and Pub/Sub
Redis supports two messaging patterns: classic Pub/Sub for broadcasting and Streams for durable message queues.
Pub/Sub Implementation
import threading
import time
class RedisPubSub:
    """Minimal wrapper around Redis Pub/Sub with JSON payloads."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def publish(self, channel, message):
        """JSON-encode *message* and publish it; returns the receiver count."""
        return self.redis.publish(channel, json.dumps(message))

    def subscribe(self, channel, callback):
        """Block forever, calling *callback(data)* for each message on *channel*."""
        listener = self.redis.pubsub()
        listener.subscribe(channel)
        for item in listener.listen():
            if item['type'] != 'message':
                continue  # skip subscribe confirmations etc.
            callback(json.loads(item['data']))

    def pattern_subscribe(self, pattern, callback):
        """Block forever, calling *callback(channel, data)* for pattern matches
        (e.g. ``notifications:*``)."""
        listener = self.redis.pubsub()
        listener.psubscribe(pattern)
        for item in listener.listen():
            if item['type'] != 'pmessage':
                continue
            callback(item['channel'], json.loads(item['data']))
# Real-time notification system
def notification_handler(channel, data):
    """Consume one notification message from *channel*."""
    print(f"Received notification: {data['title']}")


# Publisher
def send_notification(user_id, notification_type, content):
    """Publish a notification onto the user's channel."""
    pubsub = RedisPubSub(redis_client)
    pubsub.publish(f"user:{user_id}:notifications", {
        'type': notification_type,
        'title': content['title'],
        'body': content['body'],
        'timestamp': time.time()
    })


# Subscriber (run in background).
# Bug fix: the original called subscribe("notifications:*", ...), but plain
# SUBSCRIBE does exact-name matching while the publisher writes to
# "user:<id>:notifications" — so no message was ever delivered. Use
# pattern_subscribe (PSUBSCRIBE) with a pattern matching the published channels.
subscriber = RedisPubSub(redis_client)
threading.Thread(
    target=subscriber.pattern_subscribe,
    args=("user:*:notifications", notification_handler),
    daemon=True
).start()
Redis Streams (Durable Queue)
class RedisStreamQueue:
    """Durable work queue built on Redis Streams and consumer groups."""

    def __init__(self, redis_client, stream_name):
        self.redis = redis_client
        self.stream = stream_name
        self.consumer_group = f"{stream_name}:group"

    def setup(self):
        """Create the consumer group (and the stream itself) if absent."""
        try:
            self.redis.xgroup_create(
                self.stream, self.consumer_group, id='0', mkstream=True
            )
        except redis.exceptions.ResponseError as err:
            # BUSYGROUP just means the group already exists — that's fine.
            if "BUSYGROUP" not in str(err):
                raise

    def add_message(self, data):
        """Append *data* to the stream; returns the generated message id."""
        return self.redis.xadd(self.stream, data)

    def consume(self, consumer_name, count=1, block=5000):
        """Read up to *count* new messages for *consumer_name*, blocking up
        to *block* milliseconds ('>' = only never-delivered entries)."""
        return self.redis.xreadgroup(
            self.consumer_group,
            consumer_name,
            {self.stream: '>'},
            count=count,
            block=block,
        )

    def acknowledge(self, message_id):
        """XACK a processed message so it leaves the pending entries list."""
        return self.redis.xack(self.stream, self.consumer_group, message_id)
# Worker implementation
def process_message(message):
    """Handle a single queue entry (placeholder)."""
    print(f"Processing: {message}")


queue = RedisStreamQueue(redis_client, "orders:queue")
queue.setup()

while True:
    # Pull a small batch; entries stay in the pending list until XACKed.
    for stream_name, entries in queue.consume("worker-1", count=5):
        for entry_id, payload in entries:
            try:
                process_message(payload)
                queue.acknowledge(entry_id)
            except Exception as err:
                # Not acked: the entry remains pending for redelivery/claiming.
                print(f"Error processing: {err}")
4. Rate Limiting
Implement distributed rate limiting using Redis atomic operations.
Sliding Window Rate Limiter
import time
class RateLimiter:
    """Sliding-window rate limiter using one Redis sorted set per client.

    Each request becomes a zset member scored by its timestamp; members
    older than the window are pruned on every check.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, key, limit, window):
        """Check and record one request against *key*.

        Returns ``(allowed: bool, remaining: int)``.

        Bug fix: the original used ``str(now)`` as the zset member, so two
        requests landing on the same ``time.time()`` value collapsed into a
        single member and were undercounted. The member now carries a uuid
        suffix so every request is counted.
        """
        import uuid  # local: the surrounding snippet only imports time

        now = time.time()
        window_start = now - window
        member = f"{now}:{uuid.uuid4().hex}"  # unique even at identical timestamps
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # prune expired entries
        pipe.zcard(key)                              # count what's left
        pipe.zadd(key, {member: now})                # record this request
        pipe.expire(key, int(window))                # let idle keys expire
        results = pipe.execute()
        current_count = results[1]
        if current_count < limit:
            return True, limit - current_count - 1
        return False, 0

    def allow_request(self, identifier, limit, period):
        """Rate-limit by caller identifier under the ``rate_limit:`` namespace."""
        return self.is_allowed(f"rate_limit:{identifier}", limit, period)
# Usage as middleware
def rate_limit_middleware(limiter, limit=100, period=60):
    """Decorator factory: reject calls beyond *limit* per *period* seconds.

    Returns a ``(body, 429)`` tuple when over the limit; otherwise calls the
    wrapped view and stamps rate-limit headers on its response.

    Fix: the wrapper is now decorated with ``functools.wraps`` so the view
    keeps its name and docstring (frameworks that route by endpoint name
    break without this).
    """
    from functools import wraps  # local: the snippet has no module-level functools import

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            client_id = get_client_id()  # Extract from request
            allowed, remaining = limiter.allow_request(client_id, limit, period)
            if not allowed:
                # 429 with a retry hint for well-behaved clients.
                return {
                    'error': 'Rate limit exceeded',
                    'retry_after': period
                }, 429
            response = func(*args, **kwargs)
            # Advertise the current limit state.
            response.headers['X-RateLimit-Limit'] = str(limit)
            response.headers['X-RateLimit-Remaining'] = str(remaining)
            return response
        return wrapper
    return decorator
Fixed Window Counter
class FixedWindowLimiter:
    """Fixed-window rate limiter.

    Simpler than the sliding-window variant, but bursts straddling a window
    boundary can briefly exceed the nominal limit.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, key, limit, window_seconds):
        """Count this request in the current window; True while under *limit*."""
        bucket = int(time.time() // window_seconds)  # window index since epoch
        window_key = f"fw:{key}:{bucket}"
        pipe = self.redis.pipeline()
        pipe.incr(window_key)
        pipe.expire(window_key, window_seconds)  # old window keys clean themselves up
        count = pipe.execute()[0]
        return count <= limit
5. Distributed Locks
Implement distributed locking for critical sections in distributed systems.
Redis Lock Implementation
import time
import uuid
class RedisLock:
    """Distributed mutex via SET NX EX plus atomic check-and-delete release.

    Each instance holds a random token so only the owner can release, and
    the key TTL guarantees the lock frees itself if the holder dies.
    """

    def __init__(self, redis_client, lock_name, timeout=10):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.timeout = timeout          # seconds before the lock auto-expires
        self.token = str(uuid.uuid4())  # ownership proof checked by release()
        self.acquired = False

    def acquire(self, blocking=True, blocking_timeout=30):
        """Try to take the lock; returns True on success.

        When *blocking*, retries until *blocking_timeout* seconds elapse.
        """
        start_time = time.time()
        while True:
            # SET NX EX succeeds only if the key does not already exist.
            if self.redis.set(self.lock_name, self.token, nx=True, ex=self.timeout):
                self.acquired = True
                return True
            if not blocking:
                return False
            if time.time() - start_time >= blocking_timeout:
                return False
            time.sleep(0.01)  # brief pause before retrying

    def release(self):
        """Release the lock, but only if this instance still owns it."""
        if not self.acquired:
            return False
        # Lua keeps GET + DEL atomic; without it, another process could take
        # the lock between our ownership check and the delete.
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, self.lock_name, self.token)
        self.acquired = False
        return result

    def __enter__(self):
        # Bug fix: the original ignored acquire()'s return value, so after a
        # 30s blocking timeout the `with` body ran WITHOUT holding the lock.
        # Fail loudly instead of silently entering the critical section.
        if not self.acquire():
            raise TimeoutError(f"Could not acquire {self.lock_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
# Usage
def process_payment(order_id):
    """Process a payment at most once per order, guarded by a Redis lock."""
    lock = RedisLock(redis_client, f"payment:{order_id}")
    if not lock.acquire(blocking_timeout=5):
        # Another worker is already processing this order.
        raise Exception("Could not acquire lock - order being processed")
    try:
        # Critical section: only one worker can be here for this order.
        order = get_order(order_id)
        if order.status == 'pending':
            order.status = 'processing'
            save_order(order)
            call_payment_gateway(order)
    finally:
        lock.release()
6. Real-Time Analytics
Counter and Analytics
class RedisAnalytics:
    """Lightweight event analytics: daily counters, unique users, raw stream."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def track_event(self, event_name, user_id, metadata=None):
        """Record one event across three structures in a single pipeline."""
        now = time.time()
        today = time.strftime("%Y-%m-%d")
        pipe = self.redis.pipeline()
        pipe.hincrby(f"analytics:events:{today}", event_name, 1)  # per-event counter
        pipe.sadd(f"analytics:users:{today}", user_id)            # unique-user set
        pipe.xadd("events:stream", {                              # raw log for later analysis
            'event': event_name,
            'user_id': user_id,
            'timestamp': str(now),
            'metadata': json.dumps(metadata or {})
        })
        pipe.execute()

    def get_daily_stats(self, date=None):
        """Summarize counters for *date* (``YYYY-MM-DD``; default: today)."""
        date = date or time.strftime("%Y-%m-%d")
        events = self.redis.hgetall(f"analytics:events:{date}")
        return {
            'date': date,
            'total_events': sum(int(v) for v in events.values()),
            'unique_users': self.redis.scard(f"analytics:users:{date}"),
            'events': events,
        }
Leaderboard Implementation
class Leaderboard:
    """Sorted-set leaderboard: O(log N) updates, O(log N + K) range reads."""

    def __init__(self, redis_client, leaderboard_name):
        self.redis = redis_client
        self.key = f"leaderboard:{leaderboard_name}"

    def set_score(self, member, score):
        """Insert or overwrite *member*'s score."""
        return self.redis.zadd(self.key, {member: score})

    def increment(self, member, increment):
        """Add *increment* to *member*'s score (creates it if missing)."""
        return self.redis.zincrby(self.key, increment, member)

    def get_rank(self, member, reverse=False):
        """Zero-based rank; ascending by default, descending when *reverse*."""
        lookup = self.redis.zrevrank if reverse else self.redis.zrank
        return lookup(self.key, member)

    def get_top(self, count=10, with_scores=True):
        """Return the highest-scoring *count* members."""
        return self.redis.zrevrange(self.key, 0, count - 1, withscores=with_scores)

    def get_around_me(self, member, count=5):
        """Return members ranked within *count* places of *member* (with scores)."""
        rank = self.redis.zrevrank(self.key, member)
        if rank is None:
            return []
        low = max(0, rank - count)
        return self.redis.zrevrange(self.key, low, rank + count, withscores=True)

    def get_percentile(self, member):
        """Percent of the board ranked above *member* (0 = top player)."""
        rank = self.redis.zrevrank(self.key, member)
        if rank is None:
            return None
        total = self.redis.zcard(self.key)
        return (rank / total) * 100 if total > 0 else 0
# Usage
season_board = Leaderboard(redis_client, "game_season_1")

# Record scores.
season_board.set_score("player_123", 1500)
season_board.increment("player_456", 100)

# Query rankings (reverse=True ranks from the top of the board).
top_10 = season_board.get_top(10)
my_rank = season_board.get_rank("player_123", reverse=True)
my_percentile = season_board.get_percentile("player_123")
7. Configuration and Feature Flags
class FeatureFlags:
    """Feature flags in Redis: a global on/off key plus a per-user allowlist."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.prefix = "feature:"

    def enable(self, flag_name):
        """Turn the flag on globally."""
        self.redis.set(f"{self.prefix}{flag_name}", "1")

    def disable(self, flag_name):
        """Turn the flag off globally (deletes the key)."""
        self.redis.delete(f"{self.prefix}{flag_name}")

    def is_enabled(self, flag_name):
        """True when the flag is enabled globally.

        Bug fix: redis-py returns ``bytes`` unless the client was created
        with ``decode_responses=True``, so the original ``== "1"`` comparison
        was always False on a default client. Accept both forms.
        """
        return self.redis.get(f"{self.prefix}{flag_name}") in ("1", b"1")

    def enable_for_user(self, flag_name, user_id):
        """Add *user_id* to the flag's per-user allowlist."""
        self.redis.sadd(f"{self.prefix}{flag_name}:users", user_id)

    def is_enabled_for_user(self, flag_name, user_id):
        """True when the flag is on globally or *user_id* is allowlisted."""
        if self.is_enabled(flag_name):
            return True
        return self.redis.sismember(f"{self.prefix}{flag_name}:users", user_id)
# Usage — wrapped in a function; the original had bare `return` statements at
# module level, which is a SyntaxError outside a function body.
flags = FeatureFlags(redis_client)


def render_checkout():
    """Render whichever checkout flow the flag currently selects."""
    if flags.is_enabled("new_checkout_flow"):
        return NewCheckoutView().render()
    return LegacyCheckoutView().render()
Best Practices Summary
| Use Case | Redis Data Type | TTL Recommendation |
|---|---|---|
| Page Cache | String | 5-60 minutes |
| Session | Hash/String | 24 hours |
| Rate Limit | Sorted Set/String | 1 minute-1 hour |
| Leaderboard | Sorted Set | Long-term |
| Messaging | Stream / Pub-Sub | N/A (Pub/Sub is fire-and-forget; cap Streams with MAXLEN) |
| Locks | String (SET NX) | 10-30 seconds |
Conclusion
Redis’s versatility makes it an essential tool in modern application architecture. From simple caching to complex distributed systems patterns, Redis provides the building blocks for high-performance applications.
In the next article, we’ll explore Redis’s evolution in 2025-2026, including new features like vector search and Redis Stack capabilities that make it relevant for AI applications.
Comments