Introduction
Rate limiting and throttling are essential techniques for protecting APIs from abuse, preventing service degradation, and ensuring fair resource allocation among users. Whether you’re protecting against malicious attacks, preventing accidental overload, or implementing tiered access plans, understanding rate limiting is crucial for building production-ready systems.
This article covers rate limiting algorithms, implementation strategies, distributed rate limiting with Redis, and best practices for API protection.
Understanding Rate Limiting
Why Rate Limiting Matters
┌─────────────────────────────────────────────────────────────────┐
│ Without Rate Limiting │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Requests │
│ │ │
│ │ ████████████████████████ │
│ │ ████████████████████████████ │
│ │ ██████████████████████████████ │
│ │ ████████████████████████████████ 🔴 Service │
│ │ ████████████████████████████████ Degradation │
│ │ ████████████████████████████████ │
│ │ ████████████████████████████████ │
│ │ │
│ └────────────────────────────────────────▶ Time │
│ │
│ Results: │
│ - Service unavailability │
│ - Poor user experience │
│ - Resource exhaustion │
│ - Cost overruns │
│ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ With Rate Limiting │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Requests Limit: 100/min │
│ │ │
│ │ ████████ ████████ ████████ ████████ │
│ │ ████████ ████████ ████████ ████████ │
│ │ ████████ ████████ ████████ ████████ │
│ │ ████████ ████████ ████████ ████████ ✅ Stable │
│ │ (throttled) (throttled) │
│ │ │
│ └────────────────────────────────────────▶ Time │
│ │
│ Results: │
│ - Reliable service │
│ - Fair resource allocation │
│ - Predictable costs │
│ - Better UX for legitimate users │
│ │
└─────────────────────────────────────────────────────────────────┘
Rate Limiting vs Throttling
| Aspect | Rate Limiting | Throttling |
|---|---|---|
| Purpose | Limit request count | Control request rate |
| Granularity | Per time window | Continuous |
| Response | Reject with 429 Too Many Requests | Delay or queue the request, or reject with 429 |
| Use Case | API protection | Resource management |
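To make the distinction in the table concrete, here is a minimal sketch, assuming one budget of `limit` requests per window for rate limiting and a target pace of `rate_per_second` for throttling. The function names `reject_or_allow` and `throttle_delay` are illustrative and do not come from any library.

```python
def reject_or_allow(count_in_window: int, limit: int) -> int:
    """Rate limiting: return the HTTP status for the next request."""
    return 200 if count_in_window < limit else 429


def throttle_delay(last_sent: float, now: float, rate_per_second: float) -> float:
    """Throttling: seconds to wait so requests are spaced 1/rate apart."""
    min_interval = 1.0 / rate_per_second
    return max(0.0, last_sent + min_interval - now)


print(reject_or_allow(count_in_window=99, limit=100))    # still under the limit
print(reject_or_allow(count_in_window=100, limit=100))   # limit reached, rejected
print(throttle_delay(last_sent=10.0, now=10.25, rate_per_second=2.0))
```

The rate limiter answers yes or no, while the throttle answers with a delay; a real system may combine both.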
Rate Limiting Algorithms
1. Fixed Window
┌─────────────────────────────────────────────────────────────────┐
│ Fixed Window Algorithm │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Window: 1 minute │
│ │
│ Minute 1: ████████████ (10 requests) ✓ │
│ Minute 2: ██████████████████████████ (20 requests) ✓ │
│ Minute 3: ████████████████████████ (18 requests) ✓ │
│ Minute 4: ████████████████████████████████ (25) 🔴 Blocked │
│ │
│ Problem: Burst at window boundaries (up to 2x the limit) │
│ Example: 10:59:55 (5 req) + 11:00:05 (5 req) = 10 in 10s │
│ │
└─────────────────────────────────────────────────────────────────┘
import threading
import time
from datetime import datetime, timezone


class FixedWindowRateLimiter:
    """Fixed window rate limiter: one counter per key per aligned window."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counters = {}  # key -> (window_index, count)
        self.lock = threading.Lock()

    def _window_index(self) -> int:
        """Index of the current window, aligned to the Unix epoch."""
        return int(time.time() // self.window_seconds)

    def _count(self, key: str, window: int) -> int:
        """Requests recorded for key in this window (0 once the window rolls over)."""
        stored_window, count = self.counters.get(key, (window, 0))
        return count if stored_window == window else 0

    def is_allowed(self, key: str) -> bool:
        """Check if request is allowed."""
        window = self._window_index()
        with self.lock:
            count = self._count(key, window)
            # Check limit
            if count >= self.max_requests:
                return False
            # Record request
            self.counters[key] = (window, count + 1)
            return True

    def get_remaining(self, key: str) -> int:
        """Get remaining requests in the current window."""
        window = self._window_index()
        with self.lock:
            return max(0, self.max_requests - self._count(key, window))

    def get_reset_time(self, key: str) -> datetime:
        """Get the time at which the current window ends."""
        reset = (self._window_index() + 1) * self.window_seconds
        return datetime.fromtimestamp(reset, tz=timezone.utc)
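The boundary problem from the diagram can be reproduced with a few lines of arithmetic. This is a standalone sketch, assuming windows aligned to multiples of `window` seconds and timestamps passed in explicitly so the result is deterministic. `count_allowed` is a hypothetical helper, not part of the class above.

```python
from collections import Counter


def count_allowed(timestamps, limit, window):
    """Replay request timestamps through a fixed-window counter.

    Returns the timestamps that would be allowed.
    """
    per_window = Counter()
    allowed = []
    for ts in timestamps:
        index = int(ts // window)  # which aligned window this request falls in
        if per_window[index] < limit:
            per_window[index] += 1
            allowed.append(ts)
    return allowed


# Limit of 5 per 60 s window. Five requests land in the last 5 seconds of one
# window and five more in the first 5 seconds of the next.
burst = [55, 56, 57, 58, 59, 60, 61, 62, 63, 64]
allowed = count_allowed(burst, limit=5, window=60)
print(len(allowed))                 # all ten requests pass the limiter
print(max(allowed) - min(allowed))  # and they span only 9 seconds
```

Each window stays within its own limit, yet the client briefly gets twice the intended rate.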
2. Sliding Window
┌─────────────────────────────────────────────────────────────────┐
│ Sliding Window Algorithm │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Current time: 11:00:30 │
│ Window: 1 minute, Limit: 5 requests │
│ │
│ Requests in window: │
│ 11:00:05, 11:00:10, 11:00:15, 11:00:20, 11:00:25 │
│ ↓ │
│ Count: 5 requests in last 60 seconds │
│ │
│ Next request at 11:00:30 → 6th request → Blocked │
│ │
│ Advantage: More accurate, no boundary bursts │
│ │
└─────────────────────────────────────────────────────────────────┘
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import threading


class SlidingWindowRateLimiter:
    """Sliding window rate limiter with log-based tracking."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)  # key -> list of request timestamps
        self.lock = threading.Lock()

    def is_allowed(self, key: str) -> bool:
        """Check if request is allowed using sliding window."""
        # datetime.utcnow() is deprecated since Python 3.12; use an aware timestamp
        now = datetime.now(timezone.utc)
        window_start = now - timedelta(seconds=self.window_seconds)
        with self.lock:
            # Remove old requests
            self.requests[key] = [
                req_time for req_time in self.requests[key]
                if req_time > window_start
            ]
            # Check if allowed
            if len(self.requests[key]) >= self.max_requests:
                return False
            # Add new request
            self.requests[key].append(now)
            return True

    def get_current_count(self, key: str) -> int:
        """Get current request count in window."""
        now = datetime.now(timezone.utc)
        window_start = now - timedelta(seconds=self.window_seconds)
        with self.lock:
            return len([
                req_time for req_time in self.requests[key]
                if req_time > window_start
            ])
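The list comprehension above rescans every stored timestamp on each call. One possible alternative, sketched here under the assumption that timestamps for a key arrive in non-decreasing order, keeps a `deque` per key and pops expired entries from the left. The class name `DequeSlidingWindow` and the explicit `now` parameter are illustrative choices that keep the example deterministic; locking is omitted for brevity.

```python
from collections import defaultdict, deque


class DequeSlidingWindow:
    """Sliding log limiter that evicts old timestamps from the left of a deque."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.logs = defaultdict(deque)

    def is_allowed(self, key: str, now: float) -> bool:
        log = self.logs[key]
        cutoff = now - self.window_seconds
        # Timestamps are appended in order, so expired ones sit at the left.
        while log and log[0] <= cutoff:
            log.popleft()
        if len(log) >= self.max_requests:
            return False
        log.append(now)
        return True


limiter = DequeSlidingWindow(max_requests=5, window_seconds=60)
# Five requests at t=5..25 fill the window; a sixth at t=30 is rejected.
results = [limiter.is_allowed("client", t) for t in (5, 10, 15, 20, 25, 30)]
print(results)
# Once the request from t=5 has aged out, one slot frees up.
print(limiter.is_allowed("client", 66))
```

Eviction is amortized O(1) per request, but memory still grows with the number of requests kept in the window.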
3. Token Bucket
┌─────────────────────────────────────────────────────────────────┐
│ Token Bucket Algorithm │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Bucket Capacity: 100 tokens │
│ Refill Rate: 10 tokens/second │
│ │
│ Initial: [██████████████████████] 100 tokens │
│ │
│ After a burst drains it, then 5s of refill: [██████████████ ] 50 tokens │
│ │
│ Request: [██████████████ ] -1 = 49 tokens ✓ │
│ Request: [██████████████ ] -1 = 48 tokens ✓ │
│ Request: [██████████████ ] -1 = 47 tokens ✓ │
│ Request: [██████████████ ] -1 = 46 tokens ✓ │
│ │
│ When bucket empty: Request blocked (429) │
│ │
│ Allows burst traffic while maintaining average rate │
│ │
└─────────────────────────────────────────────────────────────────┘
from datetime import datetime, timezone
import threading


class TokenBucketRateLimiter:
    """Token bucket rate limiter for burst handling."""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.buckets = {}
        self.lock = threading.Lock()

    def _get_bucket(self, key: str) -> dict:
        """Get or create bucket for key. Callers must hold self.lock."""
        now = datetime.now(timezone.utc)
        if key not in self.buckets:
            # New clients start with a full bucket
            self.buckets[key] = {
                'tokens': float(self.capacity),
                'last_refill': now,
            }
            return self.buckets[key]
        bucket = self.buckets[key]
        # Refill tokens based on elapsed time
        elapsed = (now - bucket['last_refill']).total_seconds()
        tokens_to_add = elapsed * self.refill_rate
        bucket['tokens'] = min(self.capacity, bucket['tokens'] + tokens_to_add)
        bucket['last_refill'] = now
        return bucket

    def is_allowed(self, key: str, cost: int = 1) -> bool:
        """Check if request is allowed."""
        with self.lock:
            bucket = self._get_bucket(key)
            if bucket['tokens'] >= cost:
                bucket['tokens'] -= cost
                return True
            return False

    def wait_time(self, key: str, cost: int = 1) -> float:
        """Calculate wait time until request can be processed."""
        with self.lock:
            bucket = self._get_bucket(key)
            if bucket['tokens'] >= cost:
                return 0.0
            tokens_needed = cost - bucket['tokens']
            return tokens_needed / self.refill_rate

    def get_remaining(self, key: str) -> float:
        """Get remaining tokens."""
        with self.lock:
            bucket = self._get_bucket(key)
            return bucket['tokens']
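The refill and wait-time arithmetic in the class above is easier to check in isolation. The pure functions below are a sketch (the names `refill` and `seconds_until` are illustrative, not from the article) using the numbers from the diagram: capacity 100 and a refill rate of 10 tokens per second.

```python
def refill(tokens: float, elapsed: float, rate: float, capacity: float) -> float:
    """Tokens available after `elapsed` seconds, capped at capacity."""
    return min(capacity, tokens + elapsed * rate)


def seconds_until(tokens: float, cost: float, rate: float) -> float:
    """Time to wait before `cost` tokens are available."""
    return max(0.0, (cost - tokens) / rate)


# A burst drains the bucket to 0; five seconds later it holds 50 tokens.
print(refill(tokens=0, elapsed=5, rate=10, capacity=100))
# A long idle period never overfills the bucket.
print(refill(tokens=80, elapsed=60, rate=10, capacity=100))
# With half a token left, a 1-token request has to wait (1 - 0.5) / 10 seconds.
print(seconds_until(tokens=0.5, cost=1, rate=10))
```

The cap at `capacity` is what bounds the burst size, while `rate` alone determines the long-run average.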
4. Leaky Bucket
from datetime import datetime, timezone
import threading


class LeakyBucketRateLimiter:
    """Leaky bucket algorithm for constant rate processing."""

    def __init__(self, capacity: int, leak_rate: float):
        self.capacity = capacity
        self.leak_rate = leak_rate  # requests per second
        self.buckets = {}
        self.lock = threading.Lock()

    def _get_bucket(self, key: str) -> dict:
        """Get or create bucket for key. Callers must hold self.lock."""
        now = datetime.now(timezone.utc)
        if key not in self.buckets:
            # New clients start with an empty bucket
            self.buckets[key] = {
                'level': 0.0,
                'last_leak': now,
            }
            return self.buckets[key]
        bucket = self.buckets[key]
        # Drain the bucket based on elapsed time
        elapsed = (now - bucket['last_leak']).total_seconds()
        leaked = elapsed * self.leak_rate
        bucket['level'] = max(0.0, bucket['level'] - leaked)
        bucket['last_leak'] = now
        return bucket

    def is_allowed(self, key: str) -> bool:
        """Check if request is allowed."""
        with self.lock:
            bucket = self._get_bucket(key)
            # Use "+ 1 <=" so a fractional level never overfills the bucket
            if bucket['level'] + 1 <= self.capacity:
                bucket['level'] += 1
                return True
            return False

    def get_remaining(self, key: str) -> int:
        """Get remaining capacity (whole requests)."""
        with self.lock:
            bucket = self._get_bucket(key)
            return max(0, int(self.capacity - bucket['level']))
Distributed Rate Limiting with Redis
import math
import time
import uuid

import redis


class RedisRateLimiter:
    """Distributed rate limiter using Redis."""

    def __init__(self, redis_url: str, key_prefix: str = "ratelimit"):
        self.redis = redis.from_url(redis_url)
        self.key_prefix = key_prefix

    def fixed_window(self, key: str, max_requests: int,
                     window_seconds: int) -> dict:
        """Fixed window rate limiting with Redis."""
        now = time.time()
        window_index = int(now // window_seconds)
        window_key = f"{self.key_prefix}:{key}:{window_index}"
        # Increment and set the expiry in one MULTI/EXEC transaction so the
        # counter can never be left behind without a TTL.
        pipe = self.redis.pipeline()
        pipe.incr(window_key)
        pipe.expire(window_key, window_seconds)
        current, _ = pipe.execute()
        # Check limit
        allowed = current <= max_requests
        remaining = max(0, max_requests - current)
        reset_time = (window_index + 1) * window_seconds
        return {
            'allowed': allowed,
            'remaining': remaining,
            'reset': reset_time,
            'retry_after': math.ceil(reset_time - now),
        }

    def sliding_window(self, key: str, max_requests: int,
                       window_seconds: int) -> dict:
        """Sliding window rate limiting with Redis."""
        now = time.time()
        window_start = now - window_seconds
        redis_key = f"{self.key_prefix}:sliding:{key}"
        # Unique member so two requests in the same instant do not collide
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(redis_key, 0, window_start)
        # Count current requests
        pipe.zcard(redis_key)
        # Add current request
        pipe.zadd(redis_key, {member: now})
        # Set expiry
        pipe.expire(redis_key, window_seconds)
        results = pipe.execute()
        current_count = results[1]
        allowed = current_count < max_requests
        if not allowed:
            # Rejected requests must not occupy a slot in the window
            self.redis.zrem(redis_key, member)
        remaining = max(0, max_requests - current_count - 1)
        return {
            'allowed': allowed,
            'remaining': remaining,
            'reset': int(now + window_seconds),
        }

    def token_bucket(self, key: str, capacity: int,
                     refill_rate: float) -> dict:
        """Token bucket with Redis.

        Note: this read-modify-write sequence is not atomic, so concurrent
        callers can overspend tokens. Run the logic in a Lua script if you
        need strict enforcement.
        """
        bucket_key = f"{self.key_prefix}:token:{key}"
        # Get current state
        tokens, last_refill = self.redis.hmget(bucket_key, 'tokens', 'last_refill')
        now = time.time()
        if tokens is None or last_refill is None:
            # Initialize bucket
            tokens = float(capacity)
            last_refill = now
        else:
            tokens = float(tokens)
            last_refill = float(last_refill)
        # Calculate token refill
        elapsed = now - last_refill
        tokens = min(capacity, tokens + elapsed * refill_rate)
        # Check if request is allowed
        allowed = tokens >= 1
        if allowed:
            tokens -= 1
        # Save state
        self.redis.hset(bucket_key, mapping={
            'tokens': tokens,
            'last_refill': now
        })
        self.redis.expire(bucket_key, 3600)  # 1 hour expiry
        return {
            'allowed': allowed,
            'remaining': int(tokens),
            'retry_after': 0 if allowed else (1 - tokens) / refill_rate
        }
Implementation Examples
FastAPI Rate Limiter
import math
import time
from functools import wraps

from fastapi import Depends, FastAPI, HTTPException, Request, Response

app = FastAPI()


# In-memory rate limiter, used as a FastAPI dependency.
# Relies on TokenBucketRateLimiter from the Token Bucket section.
class RateLimiter:
    def __init__(self, requests: int, window: int):
        self.requests = requests
        self.window = window
        self.limiter = TokenBucketRateLimiter(requests, requests / window)

    async def __call__(self, request: Request, response: Response) -> None:
        # Get client identifier
        client_id = self._get_client_id(request)
        if not self.limiter.is_allowed(client_id):
            retry_after = math.ceil(self.limiter.wait_time(client_id))
            raise HTTPException(
                status_code=429,
                detail="Too many requests",
                headers={
                    'Retry-After': str(retry_after),
                    'X-RateLimit-Limit': str(self.requests),
                    'X-RateLimit-Remaining': '0',
                    'X-RateLimit-Reset': str(int(time.time()) + retry_after),
                },
            )
        # Add rate limit headers to the outgoing response
        response.headers['X-RateLimit-Limit'] = str(self.requests)
        response.headers['X-RateLimit-Remaining'] = str(
            int(self.limiter.get_remaining(client_id))
        )

    def _get_client_id(self, request: Request) -> str:
        """Get client identifier from request."""
        # Try API key
        api_key = request.headers.get('X-API-Key')
        if api_key:
            return f"api_key:{api_key}"
        # Try JWT token
        auth = request.headers.get('Authorization')
        if auth:
            return f"auth:{auth}"
        # Fall back to IP (request.client can be None, e.g. in some test setups)
        host = request.client.host if request.client else "unknown"
        return f"ip:{host}"


# Different limits for different endpoints
rate_limit_strict = RateLimiter(requests=10, window=60)     # 10/min
rate_limit_standard = RateLimiter(requests=100, window=60)  # 100/min
rate_limit_search = RateLimiter(requests=30, window=60)     # 30/min


# A RateLimiter instance is a dependency, not a decorator, so attach it
# through the route's `dependencies` list.
@app.get("/api/users", dependencies=[Depends(rate_limit_standard)])
async def get_users():
    return {"users": []}


@app.get("/api/search", dependencies=[Depends(rate_limit_search)])
async def search(query: str):
    return {"results": []}


@app.post("/api/data", dependencies=[Depends(rate_limit_strict)])
async def create_data(data: dict):
    return {"id": 1}


# Custom rate limit decorator
def rate_limit(requests: int, window: int):
    """Custom rate limit decorator."""
    limiter = TokenBucketRateLimiter(requests, requests / window)

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get key from request
            # (simplified: a real version would extract the client from args;
            # a constant key means all callers share one bucket)
            key = "default"
            if not limiter.is_allowed(key):
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded",
                    headers={'Retry-After': str(math.ceil(limiter.wait_time(key)))}
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator
Express.js Rate Limiter
const express = require('express');
const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis');
const Redis = require('ioredis');
const app = express();
// Redis client
const redis = new Redis({
host: 'localhost',
port: 6379,
});
// Basic rate limiter
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // limit each IP to 100 requests per windowMs
message: 'Too many requests, please try again later',
standardHeaders: true,
legacyHeaders: false,
handler: (req, res) => {
res.status(429).json({
error: 'Rate limit exceeded',
retryAfter: res.getHeader('Retry-After')
});
}
});
// Custom limiter with Redis
const distributedLimiter = rateLimit({
windowMs: 60 * 1000, // 1 minute
max: 100,
store: new RedisStore({
prefix: 'rl:',
sendCommand: (...args) => redis.call(...args),
}),
keyGenerator: (req) => {
// Use API key if available
return req.headers['x-api-key'] || req.ip;
},
skip: (req) => {
// Skip rate limiting for health checks
return req.path === '/health';
}
});
// Different limits for different routes
const strictLimiter = rateLimit({
windowMs: 60 * 1000,
max: 10,
message: 'Strict limit exceeded'
});
const uploadLimiter = rateLimit({
windowMs: 60 * 1000,
max: 5,
message: 'Upload limit exceeded'
});
// Apply to routes
app.use('/api/', limiter);
app.use('/api/auth/login', strictLimiter);
app.use('/api/upload', uploadLimiter);
// Use with specific routes
app.get('/api/data', distributedLimiter, (req, res) => {
res.json({ data: 'example' });
});
Rate Limiting Strategies
Tiered Rate Limiting
class TieredRateLimiter:
    """Rate limiter with different tiers."""

    TIERS = {
        'free': {'requests': 100, 'window': 3600},
        'basic': {'requests': 1000, 'window': 3600},
        'pro': {'requests': 10000, 'window': 3600},
        'enterprise': {'requests': 100000, 'window': 3600},
    }

    def __init__(self):
        # One token bucket limiter per tier; buckets inside are keyed per user
        self.limiters = {}
        for tier, config in self.TIERS.items():
            self.limiters[tier] = TokenBucketRateLimiter(
                config['requests'],
                config['requests'] / config['window']
            )

    def get_limiter(self, tier: str) -> TokenBucketRateLimiter:
        # Unknown tiers fall back to the free tier
        return self.limiters.get(tier, self.limiters['free'])

    def is_allowed(self, tier: str, key: str) -> bool:
        limiter = self.get_limiter(tier)
        return limiter.is_allowed(key)


class RateLimitService:
    """Service to determine user tier and apply limits."""

    def __init__(self, db, tiered_limiter: TieredRateLimiter):
        self.db = db
        self.limiter = tiered_limiter

    async def check_rate_limit(self, request) -> dict:
        """Check rate limit for request."""
        user = await self._get_user(request)
        tier = user.get('tier', 'free')
        limiter = self.limiter.get_limiter(tier)
        allowed = limiter.is_allowed(f"user:{user['id']}")
        # Mirror get_limiter's fallback so an unknown tier does not raise KeyError
        tier_config = TieredRateLimiter.TIERS.get(tier, TieredRateLimiter.TIERS['free'])
        return {
            'allowed': allowed,
            'tier': tier,
            'limit': tier_config['requests'],
            'remaining': int(limiter.get_remaining(f"user:{user['id']}")),
        }

    async def _get_user(self, request) -> dict:
        # Get user from token (stubbed here)
        return {'id': '123', 'tier': 'pro'}
IP-based Rate Limiting
class IPRateLimiter:
    """IP-based rate limiting with per-endpoint limits."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def check(self, ip: str, endpoint: str) -> dict:
        """Check rate limit for IP."""
        # Different limits for different endpoints
        limits = {
            '/api/auth/login': (5, 60),       # 5 per minute
            '/api/auth/register': (3, 3600),  # 3 per hour
            '/api/search': (30, 60),          # 30 per minute
            '/api/': (100, 60),               # 100 per minute
        }
        # Get limit for endpoint (exact match, otherwise the '/api/' default)
        limit, window = limits.get(endpoint, limits['/api/'])
        key = f"ratelimit:ip:{ip}:{endpoint}"
        # Fixed window counter: INCR the key and start the TTL on the first hit
        current = self.redis.incr(key)
        if current == 1:
            self.redis.expire(key, window)
        allowed = current <= limit
        remaining = max(0, limit - current)
        return {
            'allowed': allowed,
            'remaining': remaining,
            'limit': limit,
            'window': window,
        }
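Because the lookup above matches the endpoint string exactly, a path such as `/api/search/users` would fall through to the `/api/` default. One way to handle that, sketched here with a hypothetical helper `limit_for`, is to pick the longest configured prefix of the request path.

```python
LIMITS = {
    '/api/auth/login': (5, 60),       # 5 per minute
    '/api/auth/register': (3, 3600),  # 3 per hour
    '/api/search': (30, 60),          # 30 per minute
    '/api/': (100, 60),               # 100 per minute
}


def limit_for(path: str, limits=LIMITS, default=(100, 60)):
    """Return (limit, window) for the longest configured prefix of `path`."""
    matches = [prefix for prefix in limits if path.startswith(prefix)]
    if not matches:
        return default
    return limits[max(matches, key=len)]


print(limit_for('/api/auth/login'))    # exact match
print(limit_for('/api/search/users'))  # longest prefix is '/api/search'
print(limit_for('/api/orders/42'))     # falls back to '/api/'
```

Plain prefix matching also matches `/api/searching`; if that matters, compare path segments instead of raw strings.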
Response Headers
import math


def add_rate_limit_headers(response, limiter, key):
    """Add standard rate limit headers to response."""
    remaining = limiter.get_remaining(key)
    limit = getattr(limiter, 'max_requests', 'unknown')
    reset = limiter.get_reset_time(key) if hasattr(limiter, 'get_reset_time') else None
    response.headers['X-RateLimit-Limit'] = str(limit)
    response.headers['X-RateLimit-Remaining'] = str(int(remaining))
    if reset:
        response.headers['X-RateLimit-Reset'] = str(int(reset.timestamp()))
    # If rate limited, add retry info. Token-based limiters report fractional
    # tokens, so "less than one" means no request can be served right now.
    if remaining < 1 and hasattr(limiter, 'wait_time'):
        wait = limiter.wait_time(key)
        response.headers['Retry-After'] = str(math.ceil(wait))
    return response
Best Practices
Configuration
| Setting | Recommendation |
|---|---|
| Limit Values | Start conservative, adjust based on usage |
| Window Size | Smaller windows = more responsive |
| Burst Allowance | Allow some burst for UX |
| Headers | Always include rate limit headers |
| Error Messages | Clear, helpful error messages |
Common Patterns
# Whitelist internal services
# Only trust the X-Internal-Service header if your edge proxy strips it from
# external requests; otherwise any client can set it and bypass the limit.
def should_skip_rate_limit(request):
    return (
        request.headers.get('X-Internal-Service') == 'true' or
        request.ip in INTERNAL_IPS
    )


# Progressive throttling: returns (max_requests, window_seconds)
def get_rate_limit(request):
    user = get_user(request)
    if user.is_premium:
        return 1000, 60
    if user.is_verified:
        return 100, 60
    return 20, 60


# Graceful degradation
def rate_limit_fallback(request):
    # Log for analysis
    log_rate_limit_exceeded(request)
    # Return cached response if available
    cache_key = f"cached:{request.path}"
    cached = redis.get(cache_key)
    if cached:
        return cached
    raise RateLimitExceeded()
Monitoring and Analytics
import prometheus_client as prometheus

# Metrics
rate_limit_hits = prometheus.Counter(
    'rate_limit_hits_total',
    'Total rate limit hits',
    ['endpoint', 'tier', 'result']
)
rate_limit_remaining = prometheus.Gauge(
    'rate_limit_remaining',
    'Remaining requests',
    ['endpoint', 'tier']
)


def track_rate_limit(endpoint: str, tier: str, allowed: bool):
    """Track rate limit metrics."""
    rate_limit_hits.labels(
        endpoint=endpoint,
        tier=tier,
        result='allowed' if allowed else 'blocked'
    ).inc()
Additional Rate Limiting Implementations
Sliding Window Counter
More memory-efficient sliding window using two fixed windows:
import time
import threading


class SlidingWindowCounter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.window1 = 0  # count in the current window
        self.window2 = 0  # count in the previous window
        self.window1_start = time.time()
        self.lock = threading.Lock()

    def allow_request(self):
        with self.lock:
            now = time.time()
            elapsed = now - self.window1_start
            if elapsed >= self.window_seconds:
                # Roll over. If more than one full window passed without
                # traffic, the previous window is empty.
                windows_passed = int(elapsed // self.window_seconds)
                self.window2 = self.window1 if windows_passed == 1 else 0
                self.window1 = 0
                self.window1_start += windows_passed * self.window_seconds
                elapsed = now - self.window1_start
            # Weight the previous window by the share of it that still
            # overlaps the sliding window ending now.
            ratio = (self.window_seconds - elapsed) / self.window_seconds
            current_count = self.window1 + self.window2 * ratio
            if current_count < self.max_requests:
                self.window1 += 1
                return True
            return False
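The estimate behind this approach is a weighted sum: the current window's count plus the previous window's count scaled by the fraction of the previous window that still overlaps the trailing `window` seconds. A small worked example follows, using a hypothetical helper `estimated_count` and made-up counts.

```python
def estimated_count(previous: int, current: int, elapsed: float, window: float) -> float:
    """Approximate number of requests in the trailing `window` seconds.

    `elapsed` is how far we are into the current fixed window.
    """
    overlap = (window - elapsed) / window  # share of the previous window still in view
    return current + previous * overlap


# 60 s window, 15 s into the current one: 75% of the previous window overlaps.
# previous=80, current=30 gives 30 + 80 * 0.75 = 90.
print(estimated_count(previous=80, current=30, elapsed=15, window=60))
```

The result is an approximation: it assumes requests in the previous window were spread evenly, which is the price for storing two counters instead of a full log.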
Sliding Log Algorithm
For the most accurate rate limiting with per-client tracking:
from collections import defaultdict
import time
import threading


class SlidingLogRateLimiter:
    """Sliding log rate limiter - most accurate but memory intensive."""

    def __init__(self, max_requests: int, window_size: float):
        self.max_requests = max_requests
        self.window_size = window_size
        self.logs = defaultdict(list)  # client_id -> request timestamps
        self.lock = threading.Lock()

    def _clean_logs(self, client_id: str, current_time: float) -> None:
        # Drop timestamps that have left the window
        cutoff = current_time - self.window_size
        self.logs[client_id] = [t for t in self.logs[client_id] if t > cutoff]

    def allow_request(self, client_id: str) -> bool:
        current_time = time.time()
        with self.lock:
            self._clean_logs(client_id, current_time)
            if len(self.logs[client_id]) < self.max_requests:
                self.logs[client_id].append(current_time)
                return True
            return False

    def get_reset_time(self, client_id: str) -> float:
        """Seconds until the oldest logged request leaves the window."""
        current_time = time.time()
        with self.lock:
            if client_id not in self.logs or not self.logs[client_id]:
                return 0
            oldest = min(self.logs[client_id])
            return max(0.0, (oldest + self.window_size) - current_time)
Queue-Based Rate Limiting
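import asyncio
import time


class QueueBasedRateLimiter:
    def __init__(self, rate, burst):
        self.rate = rate    # tokens added per second
        self.burst = burst  # maximum tokens that can accumulate
        self.tokens = burst
        self.last_update = time.monotonic()

    async def acquire(self, timeout=None):
        """Wait for a token; return False if `timeout` seconds pass first."""
        deadline = None if timeout is None else time.monotonic() + timeout
        self._refill()
        while self.tokens < 1:
            if deadline is not None and time.monotonic() >= deadline:
                return False
            await asyncio.sleep(0.1)
            self._refill()
        self.tokens -= 1
        return True

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_update
        # Cap at the burst size, not at the refill rate
        self.tokens = min(self.tokens + elapsed * self.rate, self.burst)
        self.last_update = now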
Rate Limiting by Scope
from functools import wraps
from flask import Flask, request, abort, g, jsonify, make_response
import time

app = Flask(__name__)


class RateLimiter:
    def __init__(self):
        self.limiters = {}

    def get_limiter(self, key, max_requests, window):
        # One SlidingWindowCounter (defined above) per key
        if key not in self.limiters:
            self.limiters[key] = SlidingWindowCounter(max_requests, window)
        return self.limiters[key]


limiter = RateLimiter()


def rate_limit(max_requests=100, window=60):
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            client_id = request.headers.get('X-Client-ID') or request.remote_addr
            key = f"{client_id}:{request.endpoint}"
            limiter_instance = limiter.get_limiter(key, max_requests, window)
            if not limiter_instance.allow_request():
                # Reject the request instead of running the view
                response = make_response(
                    jsonify({'error': 'rate_limit_exceeded'}), 429
                )
                response.headers['X-RateLimit-Remaining'] = '0'
                response.headers['Retry-After'] = str(window)
            else:
                # make_response accepts strings, dicts, tuples and Response objects
                response = make_response(f(*args, **kwargs))
            response.headers['X-RateLimit-Limit'] = str(max_requests)
            response.headers['X-RateLimit-Reset'] = str(int(time.time()) + window)
            return response
        return wrapped
    return decorator
Global Rate Limiting
@app.before_request
def global_limit():
    global_key = f"global:{request.endpoint}"
    limiter_instance = limiter.get_limiter(global_key, max_requests=10000, window=60)
    if not limiter_instance.allow_request():
        abort(429, description="Service-wide rate limit exceeded")
Per-User Rate Limiting
@app.before_request
def user_limit():
    # Skip anonymous requests; g.user is set by the authentication layer
    if not hasattr(g, 'user'):
        return
    user_key = f"user:{g.user.id}:{request.endpoint}"
    limiter_instance = limiter.get_limiter(user_key, max_requests=1000, window=60)
    if not limiter_instance.allow_request():
        abort(429, description="User rate limit exceeded")
Per-IP Rate Limiting
@app.before_request
def ip_limit():
    ip_key = f"ip:{request.remote_addr}"
    limiter_instance = limiter.get_limiter(ip_key, max_requests=100, window=60)
    if not limiter_instance.allow_request():
        abort(429, description="IP rate limit exceeded")
Tiered Rate Limiting
@app.before_request
def tiered_limit():
    # get_current_user() stands in for your authentication lookup
    user = get_current_user()
    limits = {
        'free': (100, 60),
        'basic': (1000, 60),
        'premium': (10000, 60),
        'enterprise': (float('inf'), 60)
    }
    tier = user.subscription_tier if user else 'free'
    # Unknown tiers fall back to the free limits instead of raising KeyError
    max_requests, window = limits.get(tier, limits['free'])
    key = f"user:{user.id if user else request.remote_addr}"
    limiter_instance = limiter.get_limiter(key, max_requests, window)
    if not limiter_instance.allow_request():
        abort(429, description=f"Rate limit exceeded for {tier} tier")
Graceful Degradation
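@app.route('/api/search')
@rate_limit(max_requests=100, window=60)
def search():
    # perform_search, get_cached_search and RateLimitExceeded are placeholders
    # for application code, e.g. a search backend with its own rate limit.
    query = request.args.get('q', '')
    try:
        results = perform_search(query)
    except RateLimitExceeded:
        results = get_cached_search(query)
        return jsonify({
            'results': results,
            'warning': 'Using cached results - rate limit exceeded'
        })
    return jsonify({'results': results})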
HTTP Rate Limit Headers
import time
from dataclasses import dataclass
from typing import Optional

from flask import jsonify


@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int  # Unix timestamp at which the limit resets
    retry_after: Optional[int] = None


def build_rate_limit_headers(info: RateLimitInfo) -> dict:
    headers = {
        'X-RateLimit-Limit': str(info.limit),
        'X-RateLimit-Remaining': str(info.remaining),
        'X-RateLimit-Reset': str(info.reset)
    }
    if info.retry_after is not None:
        headers['Retry-After'] = str(info.retry_after)
    return headers


def build_429_response(info: RateLimitInfo):
    """Return a (body, status, headers) tuple for Flask."""
    # Prefer an explicit retry_after; otherwise derive it from the reset time
    if info.retry_after is not None:
        retry_after = info.retry_after
    else:
        retry_after = max(0, info.reset - int(time.time()))
    body = jsonify({
        'error': 'rate_limit_exceeded',
        'message': 'Too many requests',
        'retry_after': retry_after
    })
    headers = {
        'X-RateLimit-Limit': str(info.limit),
        'X-RateLimit-Remaining': '0',
        'X-RateLimit-Reset': str(info.reset),
        'Retry-After': str(retry_after)
    }
    return body, 429, headers
API Gateway Rate Limiting
NGINX
http {
    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=10r/s;
    limit_req_zone $http_x_api_key zone=apilimit:10m rate=100r/s;

    server {
        location /api/ {
            limit_req zone=mylimit burst=20 nodelay;
            limit_req zone=apilimit burst=50 nodelay;
            proxy_pass http://backend;
        }
    }
}
AWS API Gateway
aws apigateway create-usage-plan \
  --name "Premium Plan" \
  --api-stages '[{"apiId": "api123", "stage": "prod"}]' \
  --quota '{"limit": 10000, "period": "DAY"}' \
  --throttle '{"burstLimit": 100, "rateLimit": 50}'
Client-Side Rate Limit Handling
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class RateLimitAwareSession(requests.Session):
    """Session that handles rate limiting automatically."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Transport-level retries cover transient server errors only. 429 is
        # handled in request() below; with 429 in status_forcelist the adapter
        # would retry it first and raise RetryError once retries run out.
        # POST is left out because retrying it can duplicate side effects.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("http://", adapter)
        self.mount("https://", adapter)

    def request(self, method, url, **kwargs):
        response = super().request(method, url, **kwargs)
        if response.status_code == 429:
            wait_time = self._wait_seconds(response)
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
            # Retry once after waiting
            response = super().request(method, url, **kwargs)
        return response

    @staticmethod
    def _wait_seconds(response, default=60):
        """Prefer Retry-After in seconds; fall back to X-RateLimit-Reset."""
        retry_after = response.headers.get('Retry-After', '')
        if retry_after.isdigit():
            return int(retry_after)
        reset_time = response.headers.get('X-RateLimit-Reset', '')
        if reset_time.isdigit():
            return max(0, int(reset_time) - int(time.time()))
        return default
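`Retry-After` may carry either a number of seconds or an HTTP date, so converting it with `int()` alone can fail. A standard-library-only sketch of a more tolerant parser follows; `parse_retry_after` and its `now` and `default` parameters are illustrative names, and the 60-second default is an assumption.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value, now=None, default=60.0):
    """Return the number of seconds to wait for a Retry-After header value."""
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        # Delay in seconds, e.g. "120"
        return float(value)
    try:
        # HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("120"))
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))
print(parse_retry_after("soon"))  # unparseable, falls back to the default
```

Passing `now` explicitly keeps the function easy to test; in production code it can be left out.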
Prometheus Monitoring Decorator
from functools import wraps

from prometheus_client import Counter, Histogram, Gauge


class RateLimitExceeded(Exception):
    """Raised when the limiter rejects a request."""


# Note: this metric name matches the earlier monitoring example, so register
# only one of the two in a given process.
rate_limit_hits = Counter(
    'rate_limit_hits_total',
    'Total rate limit hits',
    ['endpoint', 'limiter_type']
)
rate_limit_latency = Histogram(
    'rate_limit_check_latency_seconds',
    'Rate limit check latency'
)
remaining_requests = Gauge(
    'rate_limit_remaining_requests',
    'Remaining requests in window',
    ['endpoint']
)


def track_rate_limit_metrics(limiter, endpoint):
    # `limiter` is expected to expose allow_request() and get_remaining()
    def decorator(f):
        @wraps(f)
        async def wrapped(*args, **kwargs):
            with rate_limit_latency.time():
                allowed = limiter.allow_request()
            rate_limit_hits.labels(
                endpoint=endpoint,
                limiter_type=type(limiter).__name__
            ).inc()
            remaining_requests.labels(endpoint=endpoint).set(
                limiter.get_remaining()
            )
            if not allowed:
                raise RateLimitExceeded()
            return await f(*args, **kwargs)
        return wrapped
    return decorator
Conclusion
Rate limiting is essential for API protection and reliability. The choice of algorithm depends on your specific requirements: fixed window for simplicity, sliding window for accuracy, token bucket for burst handling, or leaky bucket for constant rate processing.
Key takeaways:
- Choose the right algorithm for your use case
- Use Redis for distributed rate limiting
- Implement tiered limits based on user plans
- Always include rate limit headers in responses
- Monitor and adjust limits based on actual usage
Rate Limiting with Redis + Lua
For atomic, server-side rate limiting with Redis Lua scripting:
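-- rate_limiter.lua
-- KEYS[1] = rate limit key
-- ARGV[1] = limit, ARGV[2] = window (seconds), ARGV[3] = current time (seconds)
-- ARGV[4] = unique request ID supplied by the caller
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
-- Sliding window using sorted set
local now = tonumber(ARGV[3])
local oldest = now - window
-- Remove old entries
redis.call('ZREMRANGEBYSCORE', key, '-inf', oldest)
-- Count current requests
local current = redis.call('ZCARD', key)
if current < limit then
  -- Add new request. The member must be unique per request; math.random()
  -- is not a reliable source of uniqueness inside Redis scripts.
  redis.call('ZADD', key, now, ARGV[4])
  redis.call('EXPIRE', key, window)
  return {1, limit - current - 1}
else
  return {0, 0}
end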
Composite Rate Limiting
Combine IP and user-based rate limiting for defense in depth:
class CompositeRateLimiter:
    """Combine IP and user-based rate limiting.

    Both limiters are expected to expose is_allowed(key) -> (allowed, remaining).
    """

    def __init__(self, ip_limiter, user_limiter):
        self.ip_limiter = ip_limiter
        self.user_limiter = user_limiter

    def is_allowed(self, request):
        ip = get_client_ip(request)
        user = request.user
        ip_allowed, ip_remaining = self.ip_limiter.is_allowed(ip)
        if not ip_allowed:
            return False, "IP rate limit exceeded", ip_remaining
        if not user:
            # Anonymous request: only the IP limit applies
            return True, "OK", ip_remaining
        user_allowed, user_remaining = self.user_limiter.is_allowed(user.id)
        if not user_allowed:
            return False, "User rate limit exceeded", user_remaining
        return True, "OK", min(ip_remaining, user_remaining)
SlowAPI Integration
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
# SlowAPI expects the limiter to be registered on the application state
app.state.limiter = limiter


@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    # exc.detail describes the limit that was exceeded
    return JSONResponse(
        status_code=429,
        content={"detail": "Rate limit exceeded", "limit": str(exc.detail)}
    )


@app.get("/api/resource")
@limiter.limit("100/minute")
async def get_resource(request: Request):
    return {"data": "content"}


# Stricter limit for authentication, using the same registered limiter
@app.post("/api/auth/login")
@limiter.limit("5/minute")
async def login(request: Request):
    return {"status": "ok"}  # placeholder
Algorithm Comparison
| Algorithm | Memory | Accuracy | Burst | Implementation |
|---|---|---|---|---|
| Token Bucket | Low | Good | Yes | Simple |
| Leaky Bucket | Low | Excellent | No | Moderate |
| Sliding Window | High | Excellent | No | Complex |
| Fixed Window | Low | Poor | Yes | Simple |