Introduction
Rate limiting is essential for protecting APIs from abuse, preventing service degradation, and ensuring fair usage among consumers. Without rate limits, a single misbehaving client can exhaust server resources, causing denial of service for all other users.
This guide covers rate limiting algorithms, implementation patterns, and practical strategies for protecting your APIs at any scale.
Why Rate Limiting Matters
Rate limiting protects against:
- Denial of Service: Prevents single clients from overwhelming servers
- Resource Exhaustion: Ensures fair allocation of compute, memory, and network
- Cost Control: Limits API call costs for metered services
- Service Stability: Maintains consistent performance for all users
Rate Limiting Algorithms
Token Bucket Algorithm
The token bucket algorithm allows burst traffic while enforcing an average rate:
import time
import threading
class TokenBucket:
    """Token-bucket rate limiter.

    Permits bursts of up to ``capacity`` requests while enforcing an
    average throughput of ``refill_rate`` tokens per second. All public
    methods are thread-safe via an internal lock.
    """

    def __init__(self, capacity, refill_rate):
        self.capacity = capacity        # maximum tokens the bucket can hold
        self.refill_rate = refill_rate  # tokens credited per second
        self.tokens = capacity          # start full so initial bursts pass
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Credit tokens accrued since the last refill, capped at capacity.

        Caller must already hold ``self.lock``.
        """
        now = time.time()
        accrued = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + accrued)
        self.last_refill = now

    def consume(self, tokens=1):
        """Atomically take ``tokens`` from the bucket.

        Returns True on success, False when insufficient tokens remain.
        """
        with self.lock:
            self._refill()
            if self.tokens < tokens:
                return False
            self.tokens -= tokens
            return True

    def get_wait_time(self, tokens=1):
        """Return seconds until ``tokens`` will be available (0 if now)."""
        with self.lock:
            self._refill()
            shortfall = tokens - self.tokens
            return shortfall / self.refill_rate if shortfall > 0 else 0
Leaky Bucket Algorithm
The leaky bucket enforces a steady output rate, smoothing bursty traffic:
class LeakyBucket:
    """Leaky-bucket rate limiter.

    Incoming requests fill the bucket; it drains at a constant
    ``leak_rate`` per second. Requests arriving while the bucket is full
    are rejected, which smooths bursty traffic into a steady rate.
    Thread-safe via an internal lock.
    """

    def __init__(self, capacity, leak_rate):
        self.capacity = capacity    # maximum queued requests
        self.leak_rate = leak_rate  # drain rate, requests per second
        self.level = 0              # current fill level
        self.last_leak = time.time()
        self.lock = threading.Lock()

    def _leak(self):
        """Drain the bucket for the elapsed time. Caller holds the lock."""
        now = time.time()
        drained = (now - self.last_leak) * self.leak_rate
        self.level = max(0, self.level - drained)
        self.last_leak = now

    def allow_request(self):
        """Admit one request if the bucket has room; returns True/False."""
        with self.lock:
            self._leak()
            if self.level >= self.capacity:
                return False
            self.level += 1
            return True
Sliding Window Log
Tracks individual request timestamps for precise rate limiting:
from collections import deque
class SlidingWindowLog:
    """Sliding-window log rate limiter.

    Stores one timestamp per admitted request and allows at most
    ``max_requests`` within any trailing ``window_seconds`` interval.
    Precise but O(max_requests) memory per limiter. Thread-safe.

    Improvement over the original: the expired-timestamp eviction loop was
    duplicated in allow_request() and get_remaining(); it is extracted into
    a single private helper.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()  # admitted-request timestamps, oldest first
        self.lock = threading.Lock()

    def _purge(self, now):
        """Drop timestamps older than the window. Caller must hold the lock."""
        cutoff = now - self.window_seconds
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    def allow_request(self):
        """Admit the request if the window has capacity; returns True/False."""
        with self.lock:
            now = time.time()
            self._purge(now)
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            return False

    def get_remaining(self):
        """Return how many more requests the current window can accept."""
        with self.lock:
            self._purge(time.time())
            return self.max_requests - len(self.requests)
Sliding Window Counter
More memory-efficient sliding window using two fixed windows:
class SlidingWindowCounter:
    """Memory-efficient sliding window using two fixed window counters.

    The previous window's count is weighted by how much of it still
    overlaps the trailing window, approximating a true sliding window with
    O(1) memory. Thread-safe.

    BUG FIX: the original only ever slid one window at a time, so after a
    long idle period (elapsed >= 2 * window_seconds) the stale current
    count was copied into window2 and wrongly weighted into subsequent
    requests. Both windows are now reset when all history has expired.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.window1 = 0  # count in the current window
        self.window2 = 0  # count in the previous window
        self.window1_start = time.time()
        self.window2_start = time.time() - window_seconds
        self.lock = threading.Lock()

    def allow_request(self):
        """Admit the request if the weighted count is under the limit."""
        with self.lock:
            now = time.time()
            elapsed = now - self.window1_start
            if elapsed >= 2 * self.window_seconds:
                # Both windows are entirely stale; discard all history.
                self.window1 = 0
                self.window2 = 0
                self.window1_start = now
                self.window2_start = now - self.window_seconds
                elapsed = 0
            elif elapsed >= self.window_seconds:
                # Current window expired: it becomes the previous window.
                self.window2 = self.window1
                self.window2_start = self.window1_start
                self.window1 = 0
                self.window1_start = now
                elapsed = 0
            # Weight the previous window by its remaining overlap with the
            # trailing window_seconds interval.
            if elapsed > 0:
                ratio = (self.window_seconds - elapsed) / self.window_seconds
                current_count = self.window1 + int(self.window2 * ratio)
            else:
                current_count = self.window1
            if current_count < self.max_requests:
                self.window1 += 1
                return True
            return False
Redis-Based Distributed Rate Limiting
Token Bucket with Redis
import redis
import time
class RedisTokenBucket:
    """Distributed token bucket backed by Redis.

    State (token count and last-refill time) lives in Redis so every app
    instance shares one bucket. Refill-and-consume runs as a single Lua
    script, so it is atomic under concurrency.

    FIXES vs. original: the dead local ``key = self.key`` is removed, the
    script is hoisted to a class constant instead of being rebuilt per
    call, and both Redis keys now carry a TTL — previously they were SET
    without expiry and accumulated forever, one pair per rate-limit key.
    """

    # KEYS[1] holds the token count; KEYS[1]..':time' the last refill time.
    # ARGV: capacity, refill_rate, tokens_needed, now, ttl.
    LUA_CONSUME = """
    local tokens_key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local tokens_needed = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])
    local ttl = tonumber(ARGV[5])
    local tokens = tonumber(redis.call('GET', tokens_key) or capacity)
    local last_refill = tonumber(redis.call('GET', tokens_key .. ':time') or now)
    local elapsed = now - last_refill
    local new_tokens = math.min(capacity, tokens + (elapsed * refill_rate))
    if new_tokens >= tokens_needed then
        new_tokens = new_tokens - tokens_needed
        redis.call('SET', tokens_key, new_tokens, 'EX', ttl)
        redis.call('SET', tokens_key .. ':time', now, 'EX', ttl)
        return 1
    end
    return 0
    """

    def __init__(self, redis_client, key, capacity, refill_rate):
        self.redis = redis_client
        self.key = f"ratelimit:{key}"
        self.capacity = capacity
        self.refill_rate = refill_rate
        # Keys may safely expire once the bucket would be full again anyway.
        self.ttl = max(1, int(capacity / refill_rate) + 1)

    def consume(self, tokens=1):
        """Atomically take ``tokens`` from the shared bucket.

        Returns True when the tokens were consumed, False when the bucket
        (after refill) does not hold enough.
        """
        result = self.redis.eval(
            self.LUA_CONSUME,
            1,
            self.key,
            self.capacity,
            self.refill_rate,
            tokens,
            int(time.time()),
            self.ttl,
        )
        return result == 1
Fixed Window Counter with Redis
class RedisFixedWindow:
    """Fixed-window counter in Redis.

    Each time window gets its own key whose value is an atomically
    incremented request count; requests are allowed while the count stays
    within ``max_requests``.
    """

    def __init__(self, redis_client, key, max_requests, window_seconds):
        self.redis = redis_client
        self.key = f"ratelimit:{key}"
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def allow_request(self):
        """Count this request against the current window; returns True/False."""
        window_id = int(time.time() // self.window_seconds)
        window_key = f"{self.key}:{window_id}"
        pipe = self.redis.pipeline()
        pipe.incr(window_key)
        # Keep the key a little past the window so late readers still see it.
        pipe.expire(window_key, self.window_seconds * 2)
        count = pipe.execute()[0]
        return count <= self.max_requests
Sliding Window with Redis Sorted Sets
class RedisSlidingWindow:
    """Sliding-window log in a Redis sorted set; member scores are timestamps.

    BUG FIX: the original used ``str(now)`` as the sorted-set member, so two
    requests landing on the same timestamp collapsed into a single member
    (undercounting), and the rollback ``zrem`` on rejection could delete a
    different request's entry. Members are now made unique with a uuid
    suffix.
    """

    def __init__(self, redis_client, key, max_requests, window_seconds):
        self.redis = redis_client
        self.key = f"ratelimit:{key}"
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def allow_request(self):
        """Admit the request if the trailing window has capacity.

        Returns True/False. Note the add-then-rollback is not atomic; a
        Lua script would be needed for strict enforcement under
        concurrency.
        """
        import uuid  # stdlib; function-scope keeps this snippet self-contained
        now = time.time()
        window_start = now - self.window_seconds
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        # Drop entries that have aged out of the window.
        pipe.zremrangebyscore(self.key, 0, window_start)
        # Count existing requests (before our provisional add).
        pipe.zcard(self.key)
        # Provisionally record this request.
        pipe.zadd(self.key, {member: now})
        # Let the whole set expire if traffic stops.
        pipe.expire(self.key, self.window_seconds + 1)
        results = pipe.execute()
        if results[1] < self.max_requests:
            return True
        # Over the limit: undo exactly our own provisional entry.
        self.redis.zrem(self.key, member)
        return False
HTTP Rate Limiting Headers
Standard rate limit headers for client guidance:
from flask import Flask, request, jsonify
from functools import wraps
import time
app = Flask(__name__)
class RateLimiter:
    """Registry of per-key SlidingWindowLog limiters for the Flask app.

    Grows one entry per distinct key; there is no eviction, so key
    cardinality should be bounded by callers.
    """

    def __init__(self):
        self.limiters = {}  # key -> SlidingWindowLog

    def get_limiter(self, key, max_requests, window):
        """Return the limiter for ``key``, creating it on first use.

        BUG FIX: the original check-then-insert could, under threaded
        Flask, install two limiters for the same key and let concurrent
        requests count against different windows; dict.setdefault
        guarantees a single stored instance per key.
        """
        return self.limiters.setdefault(key, SlidingWindowLog(max_requests, window))


limiter = RateLimiter()
def rate_limit(max_requests=100, window=60):
    """Decorator enforcing a per-client, per-endpoint sliding-window limit.

    Clients are identified by the X-Client-ID header, falling back to the
    remote address. Standard X-RateLimit-* headers are attached to every
    response.

    BUG FIX: the original computed the headers but still executed the
    wrapped handler even when allow_request() returned False — the limit
    was advisory only. Over-limit requests now short-circuit with a 429
    and a Retry-After header.
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Identify the caller; header lets trusted clients self-identify.
            client_id = request.headers.get('X-Client-ID') or request.remote_addr
            key = f"{client_id}:{request.endpoint}"
            limiter_instance = limiter.get_limiter(key, max_requests, window)
            reset_at = int(time.time()) + window
            if not limiter_instance.allow_request():
                # Reject without invoking the handler.
                response = jsonify({'error': 'rate limit exceeded'})
                response.status_code = 429
                response.headers['Retry-After'] = str(window)
                response.headers['X-RateLimit-Limit'] = str(max_requests)
                response.headers['X-RateLimit-Remaining'] = '0'
                response.headers['X-RateLimit-Reset'] = str(reset_at)
                return response
            response = f(*args, **kwargs)
            # Handlers may return bare data; only decorate real responses.
            if hasattr(response, 'headers'):
                response.headers['X-RateLimit-Limit'] = str(max_requests)
                response.headers['X-RateLimit-Remaining'] = str(
                    limiter_instance.get_remaining())
                response.headers['X-RateLimit-Reset'] = str(reset_at)
            return response
        return wrapped
    return decorator
@app.route('/api/data')
@rate_limit(max_requests=100, window=60)  # 100 requests per client per minute
def get_data():
    # Example endpoint protected by the rate_limit decorator.
    return jsonify({'data': 'example'})
Rate Limiting Strategies by Scope
Global Rate Limiting
# Rate limit across all users
@app.before_request
def global_limit():
    """Service-wide cap shared by ALL clients, keyed per endpoint.

    NOTE(review): article snippet — assumes a module-level ``get_limiter``
    helper and flask's ``abort`` are in scope; confirm before reuse.
    """
    global_key = f"global:{request.endpoint}"
    limiter = get_limiter(global_key, max_requests=10000, window=60)
    if not limiter.allow_request():
        abort(429, description="Service-wide rate limit exceeded")
Per-User Rate Limiting
# Rate limit per authenticated user
@app.before_request
def user_limit():
    """Per-authenticated-user limit, keyed by user id and endpoint.

    NOTE(review): snippet — assumes ``g.user`` is populated by an auth
    layer and ``get_limiter``/``abort`` are in scope.
    """
    # Anonymous requests are skipped here (covered by the IP limiter).
    if not hasattr(g, 'user'):
        return
    user_key = f"user:{g.user.id}:{request.endpoint}"
    limiter = get_limiter(user_key, max_requests=1000, window=60)
    if not limiter.allow_request():
        abort(429, description="User rate limit exceeded")
Per-IP Rate Limiting
# Rate limit by IP address
@app.before_request
def ip_limit():
    """Per-IP limit: catches unauthenticated abuse.

    NOTE(review): behind a proxy/load balancer ``request.remote_addr`` is
    the proxy's address — confirm X-Forwarded-For handling in deployment.
    """
    ip_key = f"ip:{request.remote_addr}"
    limiter = get_limiter(ip_key, max_requests=100, window=60)
    if not limiter.allow_request():
        abort(429, description="IP rate limit exceeded")
Tiered Rate Limiting
# Different limits for different tiers
@app.before_request
def tiered_limit():
    """Apply per-subscription-tier limits; anonymous callers get 'free'.

    NOTE(review): snippet — assumes ``get_current_user``, ``get_limiter``
    and flask's ``abort``/``request`` are in scope.
    """
    user = get_current_user()
    # (max requests, window seconds) per subscription tier.
    limits = {
        'free': (100, 60),
        'basic': (1000, 60),
        'premium': (10000, 60),
        'enterprise': (float('inf'), 60),  # effectively unlimited
    }
    tier = user.subscription_tier if user else 'free'
    # BUG FIX: an unrecognized tier previously raised KeyError and broke
    # every request from that user; unknown tiers now fall back to 'free'.
    max_requests, window = limits.get(tier, limits['free'])
    key = f"user:{user.id if user else request.remote_addr}"
    limiter = get_limiter(key, max_requests, window)
    if not limiter.allow_request():
        abort(429, description=f"Rate limit exceeded for {tier} tier")
Handling Rate Limit Exceeded
Graceful Degradation
@app.route('/api/search')
@rate_limit(max_requests=100, window=60)
def search():
    """Search endpoint that degrades to cached results when a downstream
    rate limit is hit, instead of failing the whole request.

    NOTE(review): flask's ``request`` has no ``.query`` attribute —
    presumably this means ``request.args.get('q')``; confirm. Assumes
    ``perform_search``, ``get_cached_search`` and ``RateLimitExceeded``
    are defined elsewhere in the application.
    """
    try:
        results = perform_search(request.query)
    except RateLimitExceeded:
        # Fall back to (possibly stale) cached results rather than a 429.
        results = get_cached_search(request.query)
        return jsonify({
            'results': results,
            'warning': 'Using cached results - rate limit exceeded'
        })
    return jsonify({'results': results})
Queue-Based Rate Limiting
import asyncio
from queue import Queue
class QueueBasedRateLimiter:
    """Async token-bucket limiter: awaiters poll until a token is free.

    BUG FIXES vs. original:
    - ``_refill`` capped tokens at ``rate`` instead of ``burst``, silently
      discarding the configured burst headroom;
    - the ``timeout`` parameter was accepted but ignored (callers could
      wait forever);
    - a full token is now required before acquiring, so the balance can no
      longer go negative on fractional refills.
    Not thread-safe; intended for use within one asyncio event loop.
    """

    def __init__(self, rate, burst):
        self.rate = rate      # tokens credited per second
        self.burst = burst    # bucket capacity (max tokens)
        self.tokens = burst   # start full
        self.queue = Queue()  # kept for backward compatibility (unused)
        self.last_update = time.time()

    async def acquire(self, timeout=None):
        """Take one token, awaiting until one is available.

        Returns True once a token is acquired, or False if ``timeout``
        seconds elapse first (None means wait indefinitely).
        """
        deadline = None if timeout is None else time.time() + timeout
        while True:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            if deadline is not None and time.time() >= deadline:
                return False
            await asyncio.sleep(0.05)  # poll; cheap relative to typical rates

    def _refill(self):
        """Credit tokens for elapsed time, capped at the burst size."""
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(self.tokens + elapsed * self.rate, self.burst)
        self.last_update = now
API Gateway Rate Limiting
Kong Configuration
# Kong rate limiting plugin
curl -X POST http://kong:8001/services/my-service/plugins \
-d "name=rate-limiting" \
-d "config.minute=100" \
-d "config.policy=redis" \
-d "config.redis_host=redis-host" \
-d "config.hide_client_headers=false"
NGINX Rate Limiting
http {
    # 10 MB shared zone keyed by client IP, averaging 10 requests/second.
    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=10r/s;
    # Separate, more generous zone keyed by the X-Api-Key request header.
    limit_req_zone $http_x_api_key zone=apilimit:10m rate=100r/s;

    server {
        location /api/ {
            # burst permits short spikes; nodelay serves them immediately
            # instead of queueing them at the configured rate.
            limit_req zone=mylimit burst=20 nodelay;
            # API key based limiting
            limit_req zone=apilimit burst=50 nodelay;
            proxy_pass http://backend;
        }
    }
}
AWS API Gateway
# Create usage plan
aws apigateway create-usage-plan \
--name "Premium Plan" \
--api-stages [{"apiId": "api123", "stage": "prod"}] \
--quota {"limit": 10000, "period": "DAY" \
--throttle {"burstLimit": 100, "rateLimit": 50}
Monitoring and Metrics
from prometheus_client import Counter, Histogram, Gauge
# Counter of rate-limit checks recorded per endpoint and limiter class.
# NOTE(review): as used below it increments on every check, not only on
# rejections — confirm which semantics the dashboards expect.
rate_limit_hits = Counter(
    'rate_limit_hits_total',
    'Total rate limit hits',
    ['endpoint', 'limiter_type']
)
# Histogram of how long the limiter check itself takes.
rate_limit_latency = Histogram(
    'rate_limit_check_latency_seconds',
    'Rate limit check latency'
)
# Gauge of remaining capacity in the current window, per endpoint.
current_requests = Gauge(
    'rate_limit_current_requests',
    'Current requests in window',
    ['endpoint']
)
def track_rate_limit_metrics(limiter, endpoint):
    """Decorator for async handlers: checks ``limiter`` and records metrics.

    Raises RateLimitExceeded when the limiter denies the request.

    BUG FIX: the original incremented ``rate_limit_hits`` on EVERY request,
    making the "hits" counter a plain request counter; it now increments
    only when the limit is actually hit, matching the metric's name and
    help text.
    """
    def decorator(f):
        @wraps(f)
        async def wrapped(*args, **kwargs):
            # Time only the limiter check, not the handler itself.
            with rate_limit_latency.time():
                allowed = limiter.allow_request()
            current_requests.labels(endpoint=endpoint).set(
                limiter.get_remaining()
            )
            if not allowed:
                rate_limit_hits.labels(
                    endpoint=endpoint,
                    limiter_type=type(limiter).__name__
                ).inc()
                raise RateLimitExceeded()
            return await f(*args, **kwargs)
        return wrapped
    return decorator
Best Practices
- Use Standard Headers: Include X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset
- Choose Algorithm Based on Use Case: Token bucket for API flexibility, leaky bucket for smooth traffic
- Implement Multi-Layer Limits: Global, per-user, per-IP limits together
- Graceful Degradation: Provide fallback responses when limits are hit
- Monitor and Tune: Adjust limits based on actual traffic patterns
- Document Limits: Make rate limits clear to API consumers
- Consider Cost: Rate limiting reduces infrastructure costs significantly
Conclusion
Rate limiting is essential for API security and stability. Start with simple fixed-window limits, then graduate to sliding window or token bucket for more precise control. Implement Redis-based distributed rate limiting for multi-instance deployments, and always provide clear feedback to clients through standard HTTP headers.
Comments