Introduction
Rate limiting and throttling are essential for protecting APIs from abuse and ensuring fair resource allocation. Different algorithms offer different trade-offs between accuracy, memory usage, and implementation complexity. Understanding these algorithms enables you to choose the right approach for your use case.
This comprehensive guide covers rate limiting algorithms with practical implementations.
Core Concepts
Rate Limiting
Restricting the number of requests a client may make per time period.
Throttling
Slowing down requests once the limit is reached, instead of rejecting them outright.
Token Bucket
Algorithm allowing burst traffic up to capacity.
Leaky Bucket
Algorithm smoothing traffic to constant rate.
Sliding Window
Algorithm tracking requests in time window.
Quota
Maximum number of requests allowed.
Backoff
Delaying retry after rate limit exceeded.
Algorithm Comparison
| Algorithm | Memory | Accuracy | Burst | Implementation |
|---|---|---|---|---|
| Token Bucket | Low | Good | Yes | Simple |
| Leaky Bucket | Low | Excellent | No | Moderate |
| Sliding Window | High | Excellent | No | Complex |
| Fixed Window | Low | Poor | Yes | Simple |
Token Bucket Algorithm
import time
class TokenBucket:
    """Token-bucket rate limiter.

    The bucket refills continuously at `refill_rate` tokens/second and
    holds at most `capacity` tokens, so idle periods bank credit that
    permits short bursts up to `capacity` requests.
    """

    def __init__(self, capacity, refill_rate):
        self.capacity = capacity        # maximum tokens the bucket can hold
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = capacity          # start full so an initial burst is allowed
        self.last_refill = time.time()

    def allow_request(self, tokens=1):
        """Consume `tokens` if available; return True when the request is allowed."""
        self._refill()
        if self.tokens < tokens:
            return False
        self.tokens -= tokens
        return True

    def _refill(self):
        """Top up the bucket in proportion to time elapsed since the last refill."""
        now = time.time()
        gained = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + gained)
        self.last_refill = now
# Usage: fire 150 requests at ~20/sec against a 10 token/sec refill, so
# the bucket eventually runs dry and later requests are denied.
bucket = TokenBucket(capacity=100, refill_rate=10)  # 100 capacity, 10 tokens/sec
for req_num in range(150):
    if bucket.allow_request():
        print(f"Request {req_num} allowed")
    else:
        print(f"Request {req_num} denied")
    time.sleep(0.05)
Leaky Bucket Algorithm
import time
from collections import deque
class LeakyBucket:
    """Leaky-bucket rate limiter.

    Admitted requests join a queue (the bucket); the bucket drains
    ("leaks") at a constant `leak_rate`, smoothing bursts into a steady
    outflow. Requests arriving to a full bucket are rejected.
    """

    def __init__(self, capacity, leak_rate):
        self.capacity = capacity    # maximum queued requests
        self.leak_rate = leak_rate  # requests drained per second
        self.queue = deque()
        self.last_leak = time.time()

    def allow_request(self):
        """Admit the request if the bucket has room; return True/False."""
        self._leak()
        if len(self.queue) < self.capacity:
            self.queue.append(time.time())
            return True
        return False

    def _leak(self):
        """Drain whole requests according to elapsed time.

        Bug fix: the original set `last_leak = now` unconditionally, so
        when calls arrived faster than one leak interval apart
        (elapsed * leak_rate < 1 each call) the fractional drain credit
        was discarded every time and the bucket never leaked at all.
        We now advance `last_leak` only by the time accounted for by
        whole leaked requests, preserving the fractional remainder.
        """
        now = time.time()
        leaked = int((now - self.last_leak) * self.leak_rate)
        if leaked <= 0:
            return
        if leaked >= len(self.queue):
            # Enough time passed to drain everything; start a fresh interval.
            self.queue.clear()
            self.last_leak = now
        else:
            for _ in range(leaked):
                self.queue.popleft()
            # Credit only the whole-request drain time; keep the remainder.
            self.last_leak += leaked / self.leak_rate
# Usage: submit 150 requests at ~20/sec while the bucket drains 10/sec,
# so the queue fills and some requests are denied.
bucket = LeakyBucket(capacity=100, leak_rate=10)  # 100 capacity, 10 req/sec
for req_num in range(150):
    if bucket.allow_request():
        print(f"Request {req_num} allowed")
    else:
        print(f"Request {req_num} denied")
    time.sleep(0.05)
Sliding Window Algorithm
import time
from collections import defaultdict
class SlidingWindow:
    """Sliding-window-log rate limiter.

    Keeps a per-client log of request timestamps and allows a request
    only when fewer than `max_requests` fall inside the trailing
    `window_size` seconds. Accurate, but memory grows with traffic.
    """

    def __init__(self, window_size, max_requests):
        self.window_size = window_size    # window length in seconds
        self.max_requests = max_requests  # allowed requests per window
        # client_id -> timestamps, appended in increasing order
        self.requests = defaultdict(list)

    def allow_request(self, client_id):
        """Record and allow the request unless the client already hit the limit."""
        now = time.time()
        cutoff = now - self.window_size
        log = self.requests[client_id]
        # Timestamps are sorted, so expired entries form a prefix.
        while log and log[0] <= cutoff:
            log.pop(0)
        if len(log) >= self.max_requests:
            return False
        log.append(now)
        return True
# Usage: 150 requests at 2/sec against a 100-per-minute window, so the
# limit is hit partway through.
window = SlidingWindow(window_size=60, max_requests=100)  # 100 req/min
for req_num in range(150):
    if window.allow_request('client-1'):
        print(f"Request {req_num} allowed")
    else:
        print(f"Request {req_num} denied")
    time.sleep(0.5)
Fixed Window Algorithm
import time
class FixedWindow:
    """Fixed-window-counter rate limiter.

    Counts requests per client within the current fixed window
    (`int(now / window_size)`). Cheap, but has the classic weakness
    that up to 2x `max_requests` can pass around a window boundary.

    Bug fix: the original kept a dict of counters for EVERY window a
    client ever touched and never deleted old ones, so memory grew
    without bound over the process lifetime. We now store only the
    current window's counter per client.
    """

    def __init__(self, window_size, max_requests):
        self.window_size = window_size    # window length in seconds
        self.max_requests = max_requests  # allowed requests per window
        # client_id -> (window_key, count) for the current window only
        self.requests = {}

    def allow_request(self, client_id):
        """Count the request; return True if within this window's quota."""
        window_key = int(time.time() / self.window_size)
        key, count = self.requests.get(client_id, (window_key, 0))
        if key != window_key:
            # A new window began; the stale counter is discarded, not kept.
            count = 0
        if count >= self.max_requests:
            self.requests[client_id] = (window_key, count)
            return False
        self.requests[client_id] = (window_key, count + 1)
        return True
# Usage: 150 requests against a 100-per-minute fixed window; requests
# beyond the quota in the same window are denied.
window = FixedWindow(window_size=60, max_requests=100)
for req_num in range(150):
    if window.allow_request('client-1'):
        print(f"Request {req_num} allowed")
    else:
        print(f"Request {req_num} denied")
    time.sleep(0.5)
Distributed Rate Limiting with Redis
import redis
import time
class RedisRateLimiter:
    """Distributed rate limiter backed by a shared Redis counter.

    NOTE(review): despite the original docstring calling this a "token
    bucket", the window-scoped counter is a fixed-window limiter.
    """

    def __init__(self, redis_client, key_prefix='rate_limit'):
        self.redis = redis_client
        self.key_prefix = key_prefix

    def allow_request(self, client_id, limit, window):
        """Fixed-window counter in Redis, done with atomic commands.

        Bug fix: the original read the counter with GET and then called
        DECR — a check-then-act race in which concurrent callers (the
        whole point of a distributed limiter) could all observe a
        positive value and overspend the quota. SET NX EX creates the
        counter atomically and DECR's return value is the authoritative
        post-decrement count.
        """
        key = f"{self.key_prefix}:{client_id}"
        # Create the counter with the window TTL only if absent (atomic).
        self.redis.set(key, limit, ex=window, nx=True)
        remaining = self.redis.decr(key)  # atomic; returns the new value
        if remaining >= 0:
            return True
        # Over quota: undo the decrement so the counter does not drift
        # ever more negative for the rest of the window.
        self.redis.incr(key)
        return False

    def get_remaining(self, client_id):
        """Return how many requests remain in the client's current window."""
        key = f"{self.key_prefix}:{client_id}"
        remaining = self.redis.get(key)
        return int(remaining) if remaining else 0
# Usage: 150 requests against a shared 100-per-minute Redis quota,
# printing the remaining allowance after each attempt.
redis_client = redis.Redis(host='localhost', port=6379)
limiter = RedisRateLimiter(redis_client)
for req_num in range(150):
    granted = limiter.allow_request('client-1', limit=100, window=60)
    if granted:
        print(f"Request {req_num} allowed")
    else:
        print(f"Request {req_num} denied")
    print(f"Remaining: {limiter.get_remaining('client-1')}")
    time.sleep(0.5)
Flask Integration
from flask import Flask, request, jsonify
from functools import wraps
app = Flask(__name__)
# One process-wide bucket: ALL clients share this limit. A per-client
# limiter would key separate buckets by request.remote_addr instead.
limiter = TokenBucket(capacity=100, refill_rate=10)

def rate_limit(f):
    """Decorator that rejects requests with HTTP 429 once the shared bucket is empty."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Fix: the original computed request.remote_addr into an unused
        # variable; the bucket is global, so no client id is needed here.
        if not limiter.allow_request():
            # Tell well-behaved clients when to retry (bucket refills at
            # 10 tokens/sec, so one second is always enough for a token).
            return jsonify({'error': 'Rate limit exceeded'}), 429, {'Retry-After': '1'}
        return f(*args, **kwargs)
    return decorated_function
@app.route('/api/users', methods=['GET'])
@rate_limit
def get_users():
    """List users (demo stub: always returns an empty list)."""
    payload = {'users': []}
    return jsonify(payload)

@app.route('/api/users', methods=['POST'])
@rate_limit
def create_user():
    """Create a user (demo stub: returns a fixed id with HTTP 201)."""
    payload = {'id': 1}
    return jsonify(payload), 201
Best Practices
- Choose the Right Algorithm: token bucket for bursty traffic, leaky bucket for a smooth, constant outflow
- Per-Client Limits: Track limits per client
- Distributed Systems: Use Redis for distributed rate limiting
- Graceful Degradation: Return 429 with Retry-After header
- Monitoring: Track rate limit violations
- Tiered Limits: Different limits for different clients
- Whitelist: Exempt certain clients
- Documentation: Document rate limits clearly
- Testing: Test rate limiting behavior
- Adjustment: Monitor and adjust limits based on usage
External Resources
Documentation
Tools
Conclusion
Rate limiting is essential for API protection. Choose the right algorithm based on your requirements: token bucket for flexibility, leaky bucket for smoothness, sliding window for accuracy.
Implement distributed rate limiting with Redis for scalability, and always provide clear feedback to clients when limits are exceeded.
Rate limiting protects your APIs and ensures fair resource allocation.
Comments