Introduction
Rate limiting and throttling are essential techniques for protecting APIs from abuse, preventing service degradation, and ensuring fair resource allocation among users. Whether you’re protecting against malicious attacks, preventing accidental overload, or implementing tiered access plans, understanding rate limiting is crucial for building production-ready systems.
This article covers rate limiting algorithms, implementation strategies, distributed rate limiting with Redis, and best practices for API protection.
Understanding Rate Limiting
Why Rate Limiting Matters
Without Rate Limiting

    Requests
    ▲
    │        ████████████
    │      ████████████████
    │    ████████████████████    🔴 Service
    │  ████████████████████████     degradation
    └──────────────────────────▶ Time

    Results:
    - Service unavailability
    - Poor user experience
    - Resource exhaustion
    - Cost overruns
With Rate Limiting (limit: 100/min)

    Requests
    ▲
    │  ██████  ██████  ██████  ██████
    │  ██████  ██████  ██████  ██████     Stable
    │        (throttled)    (throttled)
    └──────────────────────────────────▶ Time

    Results:
    - Reliable service
    - Fair resource allocation
    - Predictable costs
    - Better UX for legitimate users
Rate Limiting vs Throttling
| Aspect | Rate Limiting | Throttling |
|---|---|---|
| Purpose | Limit request count | Control request rate |
| Granularity | Per time window | Continuous |
| Response | 429 Too Many Requests | 429 or slow down |
| Use Case | API protection | Resource management |
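The distinction can be made concrete with a minimal throttle sketch (illustrative, not taken from any particular library): where a rate limiter would reject the excess with a 429, a throttle delays each call so the effective rate stays at the configured level.

```python
import time


class Throttle:
    """Minimal throttle: enforce a minimum interval between requests.

    A rate limiter rejects excess requests; a throttle delays them
    so the output rate stays constant.
    """

    def __init__(self, rate_per_second: float):
        self.interval = 1.0 / rate_per_second
        self.next_allowed = 0.0  # monotonic time of the next free slot

    def acquire(self) -> float:
        """Block until the next slot is free; return the time waited."""
        now = time.monotonic()
        wait = max(0.0, self.next_allowed - now)
        if wait:
            time.sleep(wait)
        self.next_allowed = max(now, self.next_allowed) + self.interval
        return wait
```

The first call goes through immediately; subsequent calls are paced at the configured rate.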
Rate Limiting Algorithms
1. Fixed Window
Fixed Window Algorithm (window: 1 minute, limit: 20 requests)

    Minute 1: ██████████               (10 requests)  ✅
    Minute 2: ████████████████████     (20 requests)  ✅
    Minute 3: ██████████████████       (18 requests)  ✅
    Minute 4: ████████████████████████ (25 requests)  🔴 requests over the limit blocked

    Problem: bursts at window boundaries.
    Example: 5 requests at 10:59:55 + 5 requests at 11:00:05 is
    10 requests in a short burst, yet each window sees only 5.
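The boundary problem follows directly from how timestamps map to window buckets. A small sketch (timestamps written as seconds since midnight, purely for illustration) shows two bursts ten seconds apart landing in different buckets, each with a fresh quota:

```python
WINDOW = 60  # window size in seconds

def window_id(ts: float) -> int:
    """Map a timestamp to its fixed window bucket."""
    return int(ts // WINDOW)

# Two bursts ten seconds apart, straddling the 11:00:00 boundary
# (times written as seconds since midnight):
before = 10 * 3600 + 59 * 60 + 55   # 10:59:55
after = 11 * 3600 + 5               # 11:00:05

# Different buckets: each burst gets a fresh quota, so a client can
# briefly run at up to twice the configured limit.
print(window_id(before), window_id(after))  # 659 660
```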
import threading
import time
from collections import defaultdict
from datetime import datetime, timezone

class FixedWindowRateLimiter:
    """Fixed window rate limiter: one counter per time bucket."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counters = defaultdict(int)  # (key, window_id) -> count
        self.lock = threading.Lock()

    def _window_id(self) -> int:
        return int(time.time() // self.window_seconds)

    def is_allowed(self, key: str) -> bool:
        """Check if the request is allowed in the current window."""
        window = self._window_id()
        with self.lock:
            if self.counters[(key, window)] >= self.max_requests:
                return False
            self.counters[(key, window)] += 1
            # In a long-running process, stale (key, window) entries
            # should be pruned periodically.
            return True

    def get_remaining(self, key: str) -> int:
        """Get remaining requests in the current window."""
        window = self._window_id()
        with self.lock:
            return max(0, self.max_requests - self.counters[(key, window)])

    def get_reset_time(self, key: str) -> datetime:
        """Get the time at which the current window resets."""
        reset_ts = (self._window_id() + 1) * self.window_seconds
        return datetime.fromtimestamp(reset_ts, tz=timezone.utc)
2. Sliding Window
Sliding Window Algorithm (window: 1 minute, limit: 5 requests)

    Current time: 11:00:30

    Requests in the last 60 seconds:
    11:00:05, 11:00:10, 11:00:15, 11:00:20, 11:00:25  → count: 5

    Next request at 11:00:30 → would be the 6th → Blocked

    Advantage: counts an exact trailing window, so there are
    no boundary bursts.
from datetime import datetime, timedelta
from collections import defaultdict
import threading
class SlidingWindowRateLimiter:
"""Sliding window rate limiter with log-based tracking."""
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(list)
self.lock = threading.Lock()
def is_allowed(self, key: str) -> bool:
"""Check if request is allowed using sliding window."""
now = datetime.utcnow()
window_start = now - timedelta(seconds=self.window_seconds)
with self.lock:
# Remove old requests
self.requests[key] = [
req_time for req_time in self.requests[key]
if req_time > window_start
]
# Check if allowed
if len(self.requests[key]) >= self.max_requests:
return False
# Add new request
self.requests[key].append(now)
return True
def get_current_count(self, key: str) -> int:
"""Get current request count in window."""
now = datetime.utcnow()
window_start = now - timedelta(seconds=self.window_seconds)
with self.lock:
return len([
req_time for req_time in self.requests[key]
if req_time > window_start
])
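The log-based approach above stores one timestamp per request, which gets memory-heavy at high rates. A common approximation (sketched here as a single-key variant; production code would keep counters per client key) stores just one counter per fixed window and weights the previous window by how much of it still overlaps the sliding window:

```python
import time
from collections import defaultdict
from typing import Optional


class SlidingWindowCounter:
    """Sliding window approximation using two fixed window counters."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.counters = defaultdict(int)  # window_id -> request count

    def is_allowed(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        current = int(now // self.window)
        elapsed_fraction = (now % self.window) / self.window
        # Estimate requests in the trailing window: all of the current
        # bucket plus the previous bucket weighted by its overlap
        estimate = (self.counters[current - 1] * (1 - elapsed_fraction)
                    + self.counters[current])
        if estimate >= self.max_requests:
            return False
        self.counters[current] += 1
        return True
```

The estimate assumes requests were spread evenly across the previous window, trading a little accuracy for O(1) memory per key.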
3. Token Bucket
Token Bucket Algorithm

    Bucket capacity: 100 tokens
    Refill rate:     10 tokens/second

    Initial:                 [████████████████████] 100 tokens
    After a burst
    consumes 50 tokens:      [██████████          ]  50 tokens

    Request: -1 token → 49 tokens  ✅
    Request: -1 token → 48 tokens  ✅
    Request: -1 token → 47 tokens  ✅
    (meanwhile the bucket refills at 10 tokens/second, up to capacity)

    When the bucket is empty: request blocked (429)

    Allows burst traffic while maintaining the average rate.
from datetime import datetime
import threading
class TokenBucketRateLimiter:
"""Token bucket rate limiter for burst handling."""
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.refill_rate = refill_rate # tokens per second
self.buckets = {}
self.lock = threading.Lock()
def _get_bucket(self, key: str) -> dict:
"""Get or create bucket for key."""
now = datetime.utcnow()
if key not in self.buckets:
self.buckets[key] = {
'tokens': float(self.capacity),
'last_refill': now,
}
return self.buckets[key]
bucket = self.buckets[key]
# Refill tokens based on elapsed time
elapsed = (now - bucket['last_refill']).total_seconds()
tokens_to_add = elapsed * self.refill_rate
bucket['tokens'] = min(self.capacity, bucket['tokens'] + tokens_to_add)
bucket['last_refill'] = now
return bucket
def is_allowed(self, key: str, cost: int = 1) -> bool:
"""Check if request is allowed."""
with self.lock:
bucket = self._get_bucket(key)
if bucket['tokens'] >= cost:
bucket['tokens'] -= cost
return True
return False
def wait_time(self, key: str, cost: int = 1) -> float:
"""Calculate wait time until request can be processed."""
with self.lock:
bucket = self._get_bucket(key)
if bucket['tokens'] >= cost:
return 0.0
tokens_needed = cost - bucket['tokens']
return tokens_needed / self.refill_rate
def get_remaining(self, key: str) -> float:
"""Get remaining tokens."""
with self.lock:
bucket = self._get_bucket(key)
return bucket['tokens']
4. Leaky Bucket
from datetime import datetime
import threading

class LeakyBucketRateLimiter:
    """Leaky bucket algorithm for constant-rate processing."""
def __init__(self, capacity: int, leak_rate: float):
self.capacity = capacity
self.leak_rate = leak_rate # requests per second
self.buckets = {}
self.lock = threading.Lock()
def _get_bucket(self, key: str) -> dict:
"""Get or create bucket for key."""
now = datetime.utcnow()
if key not in self.buckets:
self.buckets[key] = {
'level': 0,
'last_leak': now,
}
return self.buckets[key]
bucket = self.buckets[key]
# Leak tokens based on elapsed time
elapsed = (now - bucket['last_leak']).total_seconds()
leaked = elapsed * self.leak_rate
bucket['level'] = max(0, bucket['level'] - leaked)
bucket['last_leak'] = now
return bucket
def is_allowed(self, key: str) -> bool:
"""Check if request is allowed."""
with self.lock:
bucket = self._get_bucket(key)
if bucket['level'] < self.capacity:
bucket['level'] += 1
return True
return False
    def get_remaining(self, key: str) -> int:
        """Get remaining capacity, rounded down to whole requests."""
        with self.lock:
            bucket = self._get_bucket(key)
            return max(0, int(self.capacity - bucket['level']))
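Strictly speaking, the class above is the leaky bucket "as a meter", which admits or rejects at arrival time and behaves much like a token bucket. The "as a queue" variant, sketched below as an illustration, instead buffers accepted requests and drains them at a constant rate, shaping the output rather than only policing the input:

```python
import time
from collections import deque


class LeakyBucketQueue:
    """Leaky bucket 'as a queue': buffer requests, drain at a fixed rate."""

    def __init__(self, capacity: int, leak_rate: float):
        self.capacity = capacity              # max queued requests
        self.leak_interval = 1.0 / leak_rate  # seconds between departures
        self.queue = deque()

    def offer(self, request) -> bool:
        """Enqueue a request; reject (overflow) if the bucket is full."""
        if len(self.queue) >= self.capacity:
            return False
        self.queue.append(request)
        return True

    def drain_one(self):
        """Pop and return one request, pacing departures at the leak rate."""
        if not self.queue:
            return None
        request = self.queue.popleft()
        time.sleep(self.leak_interval)  # enforce constant output rate
        return request
```

In practice `drain_one` would run in a worker loop; the key property is that departures are evenly spaced regardless of how bursty arrivals are.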
Distributed Rate Limiting with Redis
import redis
import time
class RedisRateLimiter:
"""Distributed rate limiter using Redis."""
def __init__(self, redis_url: str, key_prefix: str = "ratelimit"):
self.redis = redis.from_url(redis_url)
self.key_prefix = key_prefix
def fixed_window(self, key: str, max_requests: int,
window_seconds: int) -> dict:
"""Fixed window rate limiting with Redis."""
window_key = f"{self.key_prefix}:{key}:{int(time.time() // window_seconds)}"
        # Increment counter (INCR creates the key atomically if missing)
        current = self.redis.incr(window_key)
        # Set expiry on first request. If the process dies between INCR
        # and EXPIRE the key never expires; a Lua script avoids this.
        if current == 1:
            self.redis.expire(window_key, window_seconds)
# Check limit
allowed = current <= max_requests
remaining = max(0, max_requests - current)
reset_time = (int(time.time() // window_seconds) + 1) * window_seconds
return {
'allowed': allowed,
'remaining': remaining,
'reset': reset_time,
'retry_after': max(0, window_seconds - (int(time.time()) % window_seconds))
}
def sliding_window(self, key: str, max_requests: int,
window_seconds: int) -> dict:
"""Sliding window rate limiting with Redis."""
now = time.time()
window_start = now - window_seconds
redis_key = f"{self.key_prefix}:sliding:{key}"
pipe = self.redis.pipeline()
# Remove old entries
pipe.zremrangebyscore(redis_key, 0, window_start)
# Count current requests
pipe.zcard(redis_key)
# Add current request
pipe.zadd(redis_key, {str(now): now})
# Set expiry
pipe.expire(redis_key, window_seconds)
        results = pipe.execute()
        current_count = results[1]
        allowed = current_count < max_requests
        if not allowed:
            # Roll back the optimistic ZADD so blocked requests
            # don't consume quota
            self.redis.zrem(redis_key, str(now))
        remaining = max(0, max_requests - current_count - (1 if allowed else 0))
        return {
            'allowed': allowed,
            'remaining': remaining,
            'reset': int(now + window_seconds),
        }
def token_bucket(self, key: str, capacity: int,
refill_rate: float) -> dict:
"""Token bucket with Redis."""
bucket_key = f"{self.key_prefix}:token:{key}"
# Get current state
tokens, last_refill = self.redis.hmget(bucket_key, 'tokens', 'last_refill')
now = time.time()
if tokens is None:
# Initialize bucket
tokens = float(capacity)
last_refill = now
else:
tokens = float(tokens)
last_refill = float(last_refill)
# Calculate token refill
elapsed = now - last_refill
tokens = min(capacity, tokens + elapsed * refill_rate)
# Check if request is allowed
allowed = tokens >= 1
if allowed:
tokens -= 1
# Save state
self.redis.hset(bucket_key, mapping={
'tokens': tokens,
'last_refill': now
})
self.redis.expire(bucket_key, 3600) # 1 hour expiry
return {
'allowed': allowed,
'remaining': int(tokens),
'retry_after': 0 if allowed else (1 - tokens) / refill_rate
}
Implementation Examples
FastAPI Rate Limiter
from fastapi import FastAPI, Request, Response, HTTPException, Depends
import time
app = FastAPI()
# In-memory rate limiter exposed as a FastAPI dependency
class RateLimiter:
    def __init__(self, requests: int, window: int):
        self.requests = requests
        self.window = window
        self.limiter = TokenBucketRateLimiter(requests, requests / window)

    async def __call__(self, request: Request, response: Response):
        # Get client identifier
        client_id = self._get_client_id(request)
        if not self.limiter.is_allowed(client_id):
            raise HTTPException(
                status_code=429,
                detail="Too many requests",
                headers={
                    'Retry-After': str(max(1, int(self.limiter.wait_time(client_id)))),
                    'X-RateLimit-Limit': str(self.requests),
                    'X-RateLimit-Remaining': '0',
                    'X-RateLimit-Reset': str(int(time.time() + self.window)),
                }
            )
        # Headers set on a dependency's Response are merged into the
        # actual response
        response.headers['X-RateLimit-Limit'] = str(self.requests)
        response.headers['X-RateLimit-Remaining'] = str(
            int(self.limiter.get_remaining(client_id))
        )
def _get_client_id(self, request: Request) -> str:
"""Get client identifier from request."""
# Try API key
api_key = request.headers.get('X-API-Key')
if api_key:
return f"api_key:{api_key}"
# Try JWT token
auth = request.headers.get('Authorization')
if auth:
return f"auth:{auth}"
# Fall back to IP
return f"ip:{request.client.host}"
# Different limits for different endpoints
rate_limit_strict = RateLimiter(requests=10, window=60)      # 10/min
rate_limit_standard = RateLimiter(requests=100, window=60)   # 100/min
rate_limit_search = RateLimiter(requests=30, window=60)      # 30/min

@app.get("/api/users", dependencies=[Depends(rate_limit_standard)])
async def get_users():
    return {"users": []}

@app.get("/api/search", dependencies=[Depends(rate_limit_search)])
async def search(query: str):
    return {"results": []}

@app.post("/api/data", dependencies=[Depends(rate_limit_strict)])
async def create_data(data: dict):
    return {"id": 1}
# Custom rate limit decorator
from functools import wraps
def rate_limit(requests: int, window: int):
"""Custom rate limit decorator."""
limiter = TokenBucketRateLimiter(requests, requests / window)
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Get key from request
# (simplified - would need to extract from args)
key = "default"
if not limiter.is_allowed(key):
raise HTTPException(
status_code=429,
detail="Rate limit exceeded",
headers={'Retry-After': str(int(limiter.wait_time(key)))}
)
return await func(*args, **kwargs)
return wrapper
return decorator
Express.js Rate Limiter
const express = require('express');
const rateLimit = require('express-rate-limit');
const { RedisStore } = require('rate-limit-redis'); // v4+ named export
const Redis = require('ioredis');

const app = express();

// Redis client
const redis = new Redis({
  host: 'localhost',
  port: 6379,
});
// Basic rate limiter
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // limit each IP to 100 requests per windowMs
message: 'Too many requests, please try again later',
standardHeaders: true,
legacyHeaders: false,
handler: (req, res) => {
res.status(429).json({
error: 'Rate limit exceeded',
retryAfter: res.getHeader('Retry-After')
});
}
});
// Custom limiter with Redis
const distributedLimiter = rateLimit({
windowMs: 60 * 1000, // 1 minute
max: 100,
store: new RedisStore({
prefix: 'rl:',
sendCommand: (...args) => redis.call(...args),
}),
keyGenerator: (req) => {
// Use API key if available
return req.headers['x-api-key'] || req.ip;
},
skip: (req) => {
// Skip rate limiting for health checks
return req.path === '/health';
}
});
// Different limits for different routes
const strictLimiter = rateLimit({
windowMs: 60 * 1000,
max: 10,
message: 'Strict limit exceeded'
});
const uploadLimiter = rateLimit({
windowMs: 60 * 1000,
max: 5,
message: 'Upload limit exceeded'
});
// Apply to routes
app.use('/api/', limiter);
app.use('/api/auth/login', strictLimiter);
app.use('/api/upload', uploadLimiter);
// Use with specific routes
app.get('/api/data', distributedLimiter, (req, res) => {
res.json({ data: 'example' });
});
Rate Limiting Strategies
Tiered Rate Limiting
class TieredRateLimiter:
"""Rate limiter with different tiers."""
TIERS = {
'free': {'requests': 100, 'window': 3600},
'basic': {'requests': 1000, 'window': 3600},
'pro': {'requests': 10000, 'window': 3600},
'enterprise': {'requests': 100000, 'window': 3600},
}
def __init__(self):
self.limiters = {}
for tier, config in self.TIERS.items():
self.limiters[tier] = TokenBucketRateLimiter(
config['requests'],
config['requests'] / config['window']
)
def get_limiter(self, tier: str) -> TokenBucketRateLimiter:
return self.limiters.get(tier, self.limiters['free'])
def is_allowed(self, tier: str, key: str) -> bool:
limiter = self.get_limiter(tier)
return limiter.is_allowed(key)
class RateLimitService:
"""Service to determine user tier and apply limits."""
def __init__(self, db, tiered_limiter: TieredRateLimiter):
self.db = db
self.limiter = tiered_limiter
async def check_rate_limit(self, request) -> dict:
"""Check rate limit for request."""
user = await self._get_user(request)
tier = user.get('tier', 'free')
limiter = self.limiter.get_limiter(tier)
allowed = limiter.is_allowed(f"user:{user['id']}")
tier_config = TieredRateLimiter.TIERS[tier]
return {
'allowed': allowed,
'tier': tier,
'limit': tier_config['requests'],
'remaining': int(limiter.get_remaining(f"user:{user['id']}")),
}
async def _get_user(self, request) -> dict:
# Get user from token
return {'id': '123', 'tier': 'pro'}
IP-based Rate Limiting
class IPRateLimiter:
    """IP-based rate limiting with per-endpoint limits."""
def __init__(self, redis_client):
self.redis = redis_client
def check(self, ip: str, endpoint: str) -> dict:
"""Check rate limit for IP."""
# Different limits for different endpoints
limits = {
'/api/auth/login': (5, 60), # 5 per minute
'/api/auth/register': (3, 3600), # 3 per hour
'/api/search': (30, 60), # 30 per minute
'/api/': (100, 60), # 100 per minute
}
# Get limit for endpoint
limit, window = limits.get(endpoint, limits['/api/'])
key = f"ratelimit:ip:{ip}:{endpoint}"
        # Fixed window counter in Redis (INCR + EXPIRE)
        current = self.redis.incr(key)
        if current == 1:
            self.redis.expire(key, window)
allowed = current <= limit
remaining = max(0, limit - current)
return {
'allowed': allowed,
'remaining': remaining,
'limit': limit,
'window': window,
}
Response Headers
def add_rate_limit_headers(response, limiter, key):
"""Add standard rate limit headers to response."""
remaining = limiter.get_remaining(key)
limit = limiter.max_requests if hasattr(limiter, 'max_requests') else 'unknown'
reset = limiter.get_reset_time(key) if hasattr(limiter, 'get_reset_time') else None
response.headers['X-RateLimit-Limit'] = str(limit)
response.headers['X-RateLimit-Remaining'] = str(remaining)
if reset:
response.headers['X-RateLimit-Reset'] = str(int(reset.timestamp()))
# If rate limited, add retry info
if remaining == 0 and hasattr(limiter, 'wait_time'):
wait = limiter.wait_time(key)
response.headers['Retry-After'] = str(int(wait))
return response
Best Practices
Configuration
| Setting | Recommendation |
|---|---|
| Limit Values | Start conservative, adjust based on usage |
| Window Size | Smaller windows = more responsive |
| Burst Allowance | Allow some burst for UX |
| Headers | Always include rate limit headers |
| Error Messages | Clear, helpful error messages |
Common Patterns
# Whitelist internal services
def should_skip_rate_limit(request):
return (
request.headers.get('X-Internal-Service') == 'true' or
request.ip in INTERNAL_IPS
)
# Progressive throttling
def get_rate_limit(request):
user = get_user(request)
if user.is_premium:
return 1000, 60
if user.is_verified:
return 100, 60
return 20, 60
# Graceful degradation
def rate_limit_fallback(request):
# Log for analysis
log_rate_limit_exceeded(request)
# Return cached response if available
cache_key = f"cached:{request.path}"
cached = redis.get(cache_key)
if cached:
return cached
raise RateLimitExceeded()
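On the client side, the same patterns apply in reverse: a well-behaved client honors the Retry-After value from a 429 response and otherwise backs off exponentially with jitter so that many blocked clients do not retry in lockstep. A sketch of the delay calculation (parameter names and defaults are illustrative):

```python
import random
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[float] = None,
                  base: float = 0.5, cap: float = 30.0) -> float:
    """Seconds to wait before retrying a 429 response.

    Prefers the server's Retry-After value when present; otherwise
    uses capped exponential backoff with jitter.
    """
    if retry_after is not None:
        return retry_after
    delay = min(cap, base * (2 ** attempt))
    # Jitter: randomize within [delay/2, delay]
    return delay * random.uniform(0.5, 1.0)
```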
Monitoring and Analytics
import prometheus_client as prometheus
# Metrics
rate_limit_hits = prometheus.Counter(
'rate_limit_hits_total',
'Total rate limit hits',
['endpoint', 'tier', 'result']
)
rate_limit_remaining = prometheus.Gauge(
'rate_limit_remaining',
'Remaining requests',
['endpoint', 'tier']
)
def track_rate_limit(endpoint: str, tier: str, allowed: bool):
"""Track rate limit metrics."""
rate_limit_hits.labels(
endpoint=endpoint,
tier=tier,
result='allowed' if allowed else 'blocked'
).inc()
Conclusion
Rate limiting is essential for API protection and reliability. The choice of algorithm depends on your specific requirements: fixed window for simplicity, sliding window for accuracy, token bucket for burst handling, or leaky bucket for constant rate processing.
Key takeaways:
- Choose the right algorithm for your use case
- Use Redis for distributed rate limiting
- Implement tiered limits based on user plans
- Always include rate limit headers in responses
- Monitor and adjust limits based on actual usage
Resources
- Rate Limiting - AWS Well-Architected
- RFC 6585 - Additional HTTP Status Codes
- Express Rate Limit
- Redis Rate Limiting