Rate limiting is essential for protecting APIs from abuse, preventing DoS attacks, and ensuring fair usage. This guide covers algorithms, implementation patterns, and best practices.
Understanding Rate Limiting
Why Rate Limiting Matters
# Why rate limiting matters: each key names a goal, each value explains it.
rate_limiting_purposes = {
"prevent_abuse": "Stop automated attacks, scraping",
"protect_backend": "Prevent overwhelming services",
"ensure_fairness": "All users get fair access",
"cost_control": "Limit resource consumption",
"revenue_protection": "Enforce tier limits"
}
Rate Limit Headers
# Standard rate limit headers
headers:
"X-RateLimit-Limit": "100" # Requests allowed in window
"X-RateLimit-Remaining": "95" # Requests left in window
"X-RateLimit-Reset": "1609459200" # Unix timestamp when window resets
# Also common:
# - Retry-After: seconds to wait (when limited)
# - RateLimit-Policy: "100;w=60"
Rate Limiting Algorithms
Fixed Window
# Fixed Window - Simple, but allows up to 2x bursts at window boundaries
class FixedWindowRateLimiter:
    """Rate limiter that counts requests inside fixed, aligned time windows.

    Simple and memory-light, but a client can burst up to 2 * max_requests
    across a window boundary (end of one window plus start of the next).
    Not safe for concurrent use from multiple threads/processes.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # {user_id: [timestamp, ...]} — bare request timestamps in the
        # current window, pruned lazily on access. (The original comment
        # claimed (timestamp, count) pairs, which the code never stored.)
        self.requests = {}

    def is_allowed(self, user_id):
        """Admit or reject one request; returns (allowed, remaining)."""
        now = time.time()
        # Align the window start to a multiple of window_seconds.
        window_start = int(now // self.window_seconds) * self.window_seconds
        # Drop timestamps from earlier windows.
        kept = [t for t in self.requests.get(user_id, []) if t >= window_start]
        self.requests[user_id] = kept
        if len(kept) >= self.max_requests:
            return False, self.get_remaining(user_id)
        kept.append(now)
        return True, self.get_remaining(user_id)

    def get_remaining(self, user_id):
        """Requests still available to user_id in the current window."""
        if user_id not in self.requests:
            return self.max_requests
        return self.max_requests - len(self.requests[user_id])
Sliding Window Log
# Sliding Window Log - Accurate but memory intensive (one entry per request)
class SlidingWindowLogRateLimiter:
    """Rate limiter keeping an exact log of request timestamps per user.

    A request is allowed while fewer than max_requests timestamps fall
    inside the trailing window_seconds. Accurate, but memory grows with
    request volume (one float per admitted request).
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = {}  # {user_id: [timestamp1, timestamp2, ...]}

    def is_allowed(self, user_id):
        """Admit or reject one request; returns (allowed, remaining, retry_after)."""
        now = time.time()
        cutoff = now - self.window_seconds
        # Keep only timestamps that still fall inside the sliding window.
        log = [stamp for stamp in self.requests.get(user_id, []) if stamp > cutoff]
        self.requests[user_id] = log
        if len(log) < self.max_requests:
            log.append(now)
            return True, self.max_requests - len(log), 0
        # Over the limit: the caller may retry once the oldest entry ages out.
        earliest = min(log)
        retry_after = int(earliest + self.window_seconds - now) + 1
        return False, 0, retry_after
Sliding Window Counter
# Sliding Window Counter - Memory efficient, slightly less accurate
class SlidingWindowCounterRateLimiter:
    """Approximate sliding window using per-window counters.

    Keeps one counter per fixed window and weights the previous window's
    count by how much of it still overlaps the sliding window. Memory is
    O(1) per user: stale windows are pruned on every call (the original
    never deleted old windows, so the per-user dict grew without bound).
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.windows = {}  # {user_id: {window_start: count}}

    def is_allowed(self, user_id):
        """Admit or reject one request; returns (allowed, remaining)."""
        now = time.time()
        current_window = int(now // self.window_seconds) * self.window_seconds
        previous_window = current_window - self.window_seconds
        user_windows = self.windows.setdefault(user_id, {})
        # Prune windows older than the previous one so memory stays bounded.
        for stale in [w for w in user_windows if w < previous_window]:
            del user_windows[stale]
        current_count = user_windows.get(current_window, 0)
        previous_count = user_windows.get(previous_window, 0)
        # Fraction of the current window already elapsed; the previous
        # window contributes in proportion to its remaining overlap.
        elapsed_fraction = (now % self.window_seconds) / self.window_seconds
        total_weighted = current_count + previous_count * (1 - elapsed_fraction)
        if total_weighted >= self.max_requests:
            return False, 0
        user_windows[current_window] = current_count + 1
        # Count the request just admitted when reporting what's left
        # (the original reported remaining without it — off by one).
        return True, int(self.max_requests - total_weighted - 1)
Token Bucket
# Token Bucket - Allows burst, smooth rate
class TokenBucketRateLimiter:
    """Token bucket: refills at a steady rate, allows bursts up to capacity."""

    def __init__(self, rate, capacity):
        self.rate = rate          # Tokens added per second
        self.capacity = capacity  # Max tokens
        self.buckets = {}         # {user_id: (tokens, last_update)}

    def _refilled_tokens(self, user_id, now):
        """Tokens available for user_id after refill. Read-only (no mutation)."""
        if user_id not in self.buckets:
            # Unknown user: starts with a full bucket.
            return self.capacity
        tokens, last_update = self.buckets[user_id]
        return min(self.capacity, tokens + (now - last_update) * self.rate)

    def is_allowed(self, user_id, tokens_needed=1):
        """Try to spend tokens_needed tokens for user_id.

        Always returns a 3-tuple (allowed, tokens_remaining, wait_seconds);
        tokens_remaining is the balance after spending. The original
        returned a 2-tuple on success but a 3-tuple on failure, which made
        get_wait_time crash with ValueError on the success path.
        """
        now = time.time()
        tokens = self._refilled_tokens(user_id, now)
        if tokens >= tokens_needed:
            self.buckets[user_id] = (tokens - tokens_needed, now)
            return True, int(tokens - tokens_needed), 0
        # Not enough tokens: persist the refilled balance and report how
        # long until enough tokens accrue at self.rate per second.
        self.buckets[user_id] = (tokens, now)
        wait_time = (tokens_needed - tokens) / self.rate
        return False, 0, int(wait_time) + 1

    def get_wait_time(self, user_id, tokens_needed=1):
        """Seconds to wait before tokens_needed tokens become available.

        Read-only: unlike the original, this no longer spends tokens as a
        side effect of checking.
        """
        tokens = self._refilled_tokens(user_id, time.time())
        if tokens >= tokens_needed:
            return 0
        return int((tokens_needed - tokens) / self.rate) + 1
Leaky Bucket
# Leaky Bucket - Smooths output, constant rate
class LeakyBucketRateLimiter:
    """Leaky bucket: requests fill the bucket, which drains at a fixed rate.

    A request is admitted while the bucket has room (fill < capacity);
    the fill level drains continuously at `rate` per second.
    """

    def __init__(self, rate, capacity):
        self.rate = rate          # Requests processed per second
        self.capacity = capacity  # Max queue size
        self.buckets = {}         # {user_id: (current_level, last_update)}

    def is_allowed(self, user_id):
        """Admit one request; returns (allowed, remaining, wait_seconds)."""
        now = time.time()
        fill, updated_at = self.buckets.get(user_id, (0, now))
        # Drain the bucket at the configured rate since the last update.
        drained = (now - updated_at) * self.rate
        fill = max(0, fill - drained)
        if fill < self.capacity:
            # Room left: queue this request.
            self.buckets[user_id] = (fill + 1, now)
            return True, self.capacity - int(fill), 0
        # Bucket is full: report roughly how long until one slot drains.
        wait_time = (fill + 1 - self.capacity) / self.rate
        return False, 0, int(wait_time) + 1
Distributed Rate Limiting
Redis-Based Rate Limiter
import redis
import time
class RedisRateLimiter:
    """Distributed rate limiter backed by Redis (fixed and sliding window).

    The redis_client is injected, so any redis-py-compatible client works.
    Note: INCR-then-EXPIRE and the pipeline below are not fully atomic; for
    strict atomicity under concurrency use a Lua script (see below).
    """

    def __init__(self, redis_client, key_prefix, max_requests, window_seconds):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def is_allowed(self, user_id):
        """Fixed-window check; returns (allowed, remaining, retry_after)."""
        key = f"{self.key_prefix}:{user_id}"
        current_count = self.redis.incr(key)
        # Set the expiry only when the key is first created. The original
        # called EXPIRE on every request, pushing the window's end forward
        # on each hit, so a steadily-busy key's window never elapsed.
        if current_count == 1:
            self.redis.expire(key, self.window_seconds)
        if current_count > self.max_requests:
            ttl = self.redis.ttl(key)
            # ttl is -1 if the EXPIRE above was lost (e.g. a crash between
            # INCR and EXPIRE); fall back to the full window in that case.
            return False, 0, ttl if ttl and ttl > 0 else self.window_seconds
        return True, self.max_requests - current_count, 0

    def is_allowed_sliding(self, user_id):
        """Sliding-window check using a sorted set of request timestamps."""
        import uuid
        key = f"{self.key_prefix}:sliding:{user_id}"
        now = time.time()
        window_start = now - self.window_seconds
        # Unique member per request: the original used str(now), which two
        # concurrent requests could share, silently undercounting.
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # drop aged-out entries
        pipe.zadd(key, {member: now})                # optimistically add
        pipe.zcard(key)                              # count within window
        pipe.expire(key, self.window_seconds)        # idle-key cleanup
        results = pipe.execute()
        current_count = results[2]
        if current_count > self.max_requests:
            # Roll back the optimistic add so rejected requests don't count.
            self.redis.zrem(key, member)
            return False, 0, self.window_seconds
        return True, self.max_requests - current_count, 0
Rate Limiting with Redis + Lua
-- rate_limiter.lua
-- Sliding-window rate limiter, intended to run atomically via EVAL/EVALSHA.
-- KEYS[1]: sorted set holding one member per request, scored by timestamp.
-- ARGV[1]: max requests per window; ARGV[2]: window length in seconds;
-- ARGV[3]: current time, supplied by the caller (the script does not read
-- the clock itself).
-- Returns {allowed (1 or 0), remaining}.
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
-- Sliding window using sorted set
local now = tonumber(ARGV[3])
local oldest = now - window
-- Remove old entries
redis.call('ZREMRANGEBYSCORE', key, 0, oldest)
-- Count current requests
local current = redis.call('ZCARD', key)
if current < limit then
-- Add new request
-- member = timestamp .. random suffix, so two requests at the same
-- timestamp get distinct sorted-set members
redis.call('ZADD', key, now, now .. math.random())
-- refresh expiry so idle keys are eventually cleaned up
redis.call('EXPIRE', key, window)
return {1, limit - current - 1}
else
return {0, 0}
end
Implementation Examples
Express.js Rate Limiting
// Express rate limiting backed by Redis, so limits are shared across
// all application instances.
// NOTE(review): assumes `app` is created elsewhere in this file, and that
// the installed rate-limit-redis version exports a default class — newer
// versions export { RedisStore } instead; confirm against package.json.
const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis');
const Redis = require('ioredis');
// Redis store for distributed rate limiting
const redisClient = new Redis({
host: 'localhost',
port: 6379
});
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // limit each IP to 100 requests per window
standardHeaders: true, // emit the standard RateLimit-* response headers
legacyHeaders: false, // omit the legacy X-RateLimit-* headers
store: new RedisStore({
sendCommand: (...args) => redisClient.call(...args),
}),
keyGenerator: (req) => {
return req.user?.id || req.ip; // Use user ID if logged in
},
skip: (req) => {
return req.path === '/health'; // Skip rate limit for health
},
handler: (req, res) => {
// Custom 429 body; Retry-After is set by the middleware before this runs.
res.status(429).json({
error: 'Too many requests',
retryAfter: res.getHeader('Retry-After')
});
}
});
// Apply to routes
app.use('/api/', limiter);
app.use('/api/auth', rateLimit({ windowMs: 60 * 1000, max: 5 })); // Stricter for auth
Python FastAPI Rate Limiting
from fastapi import FastAPI, Request, HTTPException
# JSONResponse was used below but never imported in the original —
# the handler raised NameError the first time a limit was exceeded.
from fastapi.responses import JSONResponse
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Key requests by client IP; swap key_func for a per-user key if needed.
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
# slowapi looks the limiter up on app.state when its decorators fire;
# without this line the decorated endpoints fail at request time.
app.state.limiter = limiter


@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Translate slowapi's exception into a 429 JSON response."""
    return JSONResponse(
        status_code=429,
        content={"detail": "Rate limit exceeded", "retry_after": str(exc.detail)}
    )


@app.get("/api/resource")
@limiter.limit("100/minute")
async def get_resource(request: Request):
    # `request` must be a parameter for slowapi to inspect the client.
    return {"data": "content"}


# Different limits for different endpoints
auth_limiter = Limiter(key_func=get_remote_address)


@app.post("/api/auth/login")
@auth_limiter.limit("5/minute")
async def login(request: Request):
    # Login logic
    pass
Rate Limiting Strategies
Tiered Rate Limiting
class TieredRateLimiter:
    """Different rate limits per subscription tier.

    Each tier gets a token bucket sized so `limit` requests refill per
    minute, with bursts up to the full per-minute allowance.
    """

    def __init__(self, limits_by_tier, default_tier='free'):
        # limits_by_tier = {'free': 100, 'premium': 1000, 'enterprise': 10000}
        self.limiters = {
            tier: TokenBucketRateLimiter(rate=limit / 60, capacity=limit)
            for tier, limit in limits_by_tier.items()
        }
        # Unknown tiers fall back to this one. If it isn't configured, use
        # the most restrictive configured tier instead of raising KeyError
        # (the original hard-coded self.limiters['free'] and crashed when
        # no 'free' tier existed).
        if default_tier in self.limiters:
            self.default_tier = default_tier
        else:
            self.default_tier = min(limits_by_tier, key=limits_by_tier.get)

    def is_allowed(self, user):
        """Check the limiter for the user's tier (unknown tiers use the default)."""
        limiter = self.limiters.get(user.get_tier(),
                                    self.limiters[self.default_tier])
        return limiter.is_allowed(user.id)
IP + User Rate Limiting
class CompositeRateLimiter:
    """Combine IP-based and user-based rate limiting.

    The IP check runs first (cheap fail-fast against anonymous floods);
    the per-user check only runs for authenticated requests.
    """

    def __init__(self, ip_limiter, user_limiter):
        self.ip_limiter = ip_limiter
        self.user_limiter = user_limiter

    def is_allowed(self, request):
        """Return (allowed, reason, remaining) for one request."""
        ip = get_client_ip(request)
        # Check IP limit first (faster fail)
        ip_allowed, ip_remaining = self.ip_limiter.is_allowed(ip)
        if not ip_allowed:
            return False, "IP rate limit exceeded", ip_remaining
        user = request.user
        if user:
            user_allowed, user_remaining = self.user_limiter.is_allowed(user.id)
            if not user_allowed:
                return False, "User rate limit exceeded", user_remaining
            # Report the tighter of the two budgets. The original used
            # `user_remaining or 999`, which turned a legitimate 0 into
            # 999, and hit an unbound `user_remaining` NameError on the
            # anonymous path.
            return True, "OK", min(ip_remaining, user_remaining)
        return True, "OK", ip_remaining
Best Practices
# Rate limiting best practices
design:
- "Use Redis for distributed systems"
- "Set appropriate limits per endpoint"
- "Return informative headers"
- "Use tiered limits for different users"
- "Implement circuit breaker for downstream failures"
headers:
- "X-RateLimit-Limit"
- "X-RateLimit-Remaining"
- "X-RateLimit-Reset"
monitoring:
- "Track rate limit hits"
- "Alert on high rejection rates"
- "Monitor latency impact"
responses:
- "429 Too Many Requests"
- "Retry-After header"
- "Clear error message"
Conclusion
Rate limiting protects your APIs:
- Algorithms: Token bucket for burst, sliding window for accuracy
- Distributed: Use Redis for multi-instance deployments
- Tiered: Different limits for different user tiers
- Monitor: Track and alert on rate limit metrics
Choose the algorithm based on your needs: simplicity (fixed window), accuracy (sliding window), or burst handling (token bucket).
Comments