Introduction
Rate limiting is essential for protecting APIs from abuse, preventing service degradation, and ensuring fair usage across all clients. Without proper rate limiting, a single misbehaving client can consume all available resources, causing legitimate users to experience slowdowns or outages.
This comprehensive guide covers everything from fundamental rate limiting algorithms to distributed implementations at scale. You’ll learn when to use different strategies, how to implement them in production systems, and best practices for API gateway integration.
The goal is to build robust rate limiting that protects your services while providing a good experience for legitimate users. This means clearly communicating limits, returning helpful headers, and implementing graceful degradation when limits are exceeded.
Understanding Rate Limiting
Why Rate Limiting Matters
Rate limiting serves multiple critical purposes:
- Prevents abuse: Stops malicious users from overwhelming your system
- Ensures fairness: All clients get fair access to resources
- Protects costs: Prevents unexpected billing spikes from runaway usage
- Maintains stability: Keeps services responsive during traffic spikes
- Enables planning: Helps you understand actual usage patterns
Rate Limiting Dimensions
| Dimension | Description |
|---|---|
| Requests per time | Number of requests allowed in a time window |
| Bandwidth | Data transfer limits per time period |
| Concurrent connections | Simultaneous connections from a client |
| Payload size | Maximum request/response size |
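In practice these dimensions are combined into a per-tier policy. As an illustrative sketch (the RateLimitPolicy name and the tier values are hypothetical, not from any particular framework):
from dataclasses import dataclass

@dataclass(frozen=True)
class RateLimitPolicy:
    """Hypothetical policy object combining the dimensions above."""
    requests_per_minute: int   # requests per time
    bytes_per_minute: int      # bandwidth
    max_concurrent: int        # concurrent connections
    max_payload_bytes: int     # payload size

# Example tiers (values are arbitrary)
FREE = RateLimitPolicy(60, 10 * 2**20, 5, 1 * 2**20)
PREMIUM = RateLimitPolicy(600, 100 * 2**20, 50, 10 * 2**20)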
Rate Limiting Algorithms
Token Bucket
The token bucket algorithm is widely used for its flexibility. It allows for burst traffic while maintaining a long-term rate:
import time
from threading import Lock
from typing import Optional
import logging

logger = logging.getLogger(__name__)

class TokenBucket:
    """Token bucket rate limiter with thread-safe implementation."""

    def __init__(self, capacity: int, refill_rate: float):
        """
        Initialize token bucket.

        Args:
            capacity: Maximum number of tokens in the bucket
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = time.time()
        self.lock = Lock()
        self._consumed = 0
        self._rejected = 0

    def _refill(self) -> None:
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_refill
        # Add tokens based on elapsed time
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

    def allow_request(self, tokens: int = 1) -> bool:
        """
        Check if request is allowed.

        Args:
            tokens: Number of tokens to consume

        Returns:
            True if request is allowed, False otherwise
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                self._consumed += 1
                return True
            self._rejected += 1
            return False

    def wait_for_token(self, tokens: int = 1, timeout: Optional[float] = None) -> bool:
        """
        Block until tokens are available.

        Args:
            tokens: Number of tokens needed
            timeout: Maximum time to wait (None = wait forever)

        Returns:
            True if tokens acquired, False if timeout
        """
        start = time.time()
        while True:
            if self.allow_request(tokens):
                return True
            if timeout is not None and (time.time() - start) >= timeout:
                return False
            # Wait before retrying
            time.sleep(0.01)

    @property
    def available_tokens(self) -> float:
        """Get current available tokens."""
        with self.lock:
            self._refill()
            return self.tokens

    def get_stats(self) -> dict:
        """Get rate limiter statistics."""
        with self.lock:
            total = self._consumed + self._rejected
            return {
                'consumed': self._consumed,
                'rejected': self._rejected,
                'available_tokens': self.tokens,
                'rejection_rate': self._rejected / total if total > 0 else 0
            }
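A quick usage sketch (capacity and rate are arbitrary):
# Assumes the TokenBucket class above
bucket = TokenBucket(capacity=5, refill_rate=1.0)  # bursts to 5, ~1 req/s sustained

for i in range(7):
    if bucket.allow_request():
        print(f"request {i}: allowed ({bucket.available_tokens:.1f} tokens left)")
    else:
        print(f"request {i}: rejected")

print(bucket.get_stats())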
Leaky Bucket
The leaky bucket algorithm provides constant-rate processing, smoothing out traffic bursts:
import time
from threading import Lock

class LeakyBucket:
    """Leaky bucket rate limiter - admits requests while the bucket has room."""

    def __init__(self, capacity: int, leak_rate: float):
        """
        Initialize leaky bucket.

        Args:
            capacity: Maximum bucket size (requests that can be pending)
            leak_rate: Requests processed per second
        """
        self.capacity = capacity
        self.leak_rate = leak_rate
        self.level = 0.0
        self.last_leak = time.time()
        self.lock = Lock()

    def _leak(self) -> None:
        """Drain (leak) requests from the bucket based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_leak
        # Calculate how many requests have "leaked" out
        leaked = elapsed * self.leak_rate
        self.level = max(0.0, self.level - leaked)
        self.last_leak = now

    def allow_request(self) -> bool:
        """Check if request can be processed."""
        with self.lock:
            self._leak()
            if self.level < self.capacity:
                self.level += 1
                return True
            return False

    def get_wait_time(self) -> float:
        """Estimate wait time until the bucket has room for one more request."""
        with self.lock:
            self._leak()
            if self.leak_rate <= 0:
                return 0.0
            # The level must drop by a full unit before another request fits
            return max(0.0, (self.level + 1 - self.capacity) / self.leak_rate)
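Usage mirrors the token bucket, with get_wait_time giving clients a retry hint (numbers are arbitrary):
# Assumes the LeakyBucket class above
bucket = LeakyBucket(capacity=10, leak_rate=2.0)  # drains 2 requests/second

for i in range(12):
    if bucket.allow_request():
        print(f"request {i}: accepted")
    else:
        print(f"request {i}: rejected, retry in ~{bucket.get_wait_time():.2f}s")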
Sliding Window
The sliding window algorithm provides smoother rate limiting with no burst at window boundaries:
import time
from collections import deque
from threading import Lock

class SlidingWindow:
    """Sliding window rate limiter - precise rate limiting."""

    def __init__(self, max_requests: int, window_size: float):
        """
        Initialize sliding window.

        Args:
            max_requests: Maximum requests allowed in window
            window_size: Window size in seconds
        """
        self.max_requests = max_requests
        self.window_size = window_size
        self.requests = deque()
        self.lock = Lock()

    def _clean_old_requests(self, current_time: float) -> None:
        """Remove requests outside the current window."""
        cutoff_time = current_time - self.window_size
        while self.requests and self.requests[0] < cutoff_time:
            self.requests.popleft()

    def allow_request(self) -> bool:
        """Check if request is allowed."""
        current_time = time.time()
        with self.lock:
            self._clean_old_requests(current_time)
            if len(self.requests) < self.max_requests:
                self.requests.append(current_time)
                return True
            return False

    def get_remaining(self) -> int:
        """Get remaining requests in current window."""
        current_time = time.time()
        with self.lock:
            self._clean_old_requests(current_time)
            return self.max_requests - len(self.requests)

    def get_reset_time(self) -> float:
        """Get time until window resets."""
        current_time = time.time()
        with self.lock:
            self._clean_old_requests(current_time)
            if not self.requests:
                return 0.0
            oldest = self.requests[0]
            return (oldest + self.window_size) - current_time
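The accessors map directly onto the HTTP headers covered later; a brief sketch (limits are arbitrary):
# Assumes the SlidingWindow class above
limiter = SlidingWindow(max_requests=100, window_size=60.0)

if limiter.allow_request():
    print(f"allowed: {limiter.get_remaining()} left, resets in {limiter.get_reset_time():.0f}s")
else:
    print("rejected: respond with 429 and Retry-After")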
Sliding Log
The sliding log keeps an exact timestamp for every request per client, making it the most accurate approach at the cost of memory proportional to request volume:
import time
from threading import Lock
from collections import defaultdict

class SlidingLogRateLimiter:
    """Sliding log rate limiter - most accurate but memory intensive."""

    def __init__(self, max_requests: int, window_size: float):
        self.max_requests = max_requests
        self.window_size = window_size
        self.logs = defaultdict(list)
        self.lock = Lock()

    def _clean_logs(self, client_id: str, current_time: float) -> None:
        """Remove old log entries."""
        cutoff = current_time - self.window_size
        logs = self.logs[client_id]
        # Remove old entries
        self.logs[client_id] = [t for t in logs if t > cutoff]

    def allow_request(self, client_id: str) -> bool:
        """Check if request is allowed for client."""
        current_time = time.time()
        with self.lock:
            self._clean_logs(client_id, current_time)
            if len(self.logs[client_id]) < self.max_requests:
                self.logs[client_id].append(current_time)
                return True
            return False

    def get_reset_time(self, client_id: str) -> float:
        """Get time until the oldest in-window request expires."""
        current_time = time.time()
        with self.lock:
            # Drop expired entries first so the estimate is never negative
            self._clean_logs(client_id, current_time)
            if not self.logs[client_id]:
                return 0.0
            oldest = min(self.logs[client_id])
            return (oldest + self.window_size) - current_time
Distributed Rate Limiting
Redis-Based Rate Limiter
import redis
import time
import logging

logger = logging.getLogger(__name__)

class RedisRateLimiter:
    """Distributed rate limiter using Redis."""

    def __init__(self, redis_client: redis.Redis, key_prefix: str = 'ratelimit'):
        self.redis = redis_client
        self.key_prefix = key_prefix

    def token_bucket(self, key: str, capacity: int, refill_rate: float) -> bool:
        """
        Distributed token bucket using Redis.

        The whole check-and-update runs as one atomic Lua script,
        so concurrent app instances cannot race.
        """
        now = time.time()
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1])
        local last_refill = tonumber(bucket[2])

        if tokens == nil then
            tokens = capacity
            last_refill = now
        end

        -- Refill tokens
        local elapsed = now - last_refill
        tokens = math.min(capacity, tokens + elapsed * refill_rate)

        -- Check if request allowed
        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, math.ceil(capacity / refill_rate))
            return 1
        else
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, math.ceil(capacity / refill_rate))
            return 0
        end
        """
        try:
            result = self.redis.eval(
                lua_script, 1,
                f"{self.key_prefix}:{key}",
                capacity, refill_rate, now
            )
            return bool(result)
        except redis.RedisError as e:
            logger.warning(f"Redis error, allowing request: {e}")
            return True  # Fail open on Redis errors

    def sliding_window(self, key: str, max_requests: int, window_seconds: int) -> bool:
        """
        Distributed sliding window rate limiter.
        """
        now = time.time()
        window_start = now - window_seconds
        redis_key = f"{self.key_prefix}:{key}"
        try:
            pipe = self.redis.pipeline()
            # Remove old entries
            pipe.zremrangebyscore(redis_key, 0, window_start)
            # Count current requests
            pipe.zcard(redis_key)
            # Add new request (NOTE: member should be unique per request
            # in high-concurrency setups; the timestamp string is enough here)
            pipe.zadd(redis_key, {str(now): now})
            # Set expiration
            pipe.expire(redis_key, window_seconds)
            results = pipe.execute()

            current_count = results[1]
            if current_count < max_requests:
                return True

            # Too many requests, remove the one we just added
            self.redis.zrem(redis_key, str(now))
            return False
        except redis.RedisError as e:
            logger.warning(f"Redis error, allowing request: {e}")
            return True

    def fixed_window(self, key: str, max_requests: int, window_seconds: int) -> bool:
        """
        Simple fixed window counter using Redis INCR.
        """
        redis_key = f"{self.key_prefix}:{key}:{int(time.time() // window_seconds)}"
        try:
            count = self.redis.incr(redis_key)
            if count == 1:
                self.redis.expire(redis_key, window_seconds)
            return count <= max_requests
        except redis.RedisError as e:
            logger.warning(f"Redis error, allowing request: {e}")
            return True
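Wiring the limiter into a request path is straightforward. A sketch assuming the RedisRateLimiter above and a Redis instance on localhost (the user-key scheme is illustrative):
# Assumes a reachable Redis server and the class above
client = redis.Redis(host='localhost', port=6379)
limiter = RedisRateLimiter(client)

def handle_request(user_id: str) -> tuple:
    # 100 requests per 60-second sliding window, keyed per user
    if not limiter.sliding_window(f"user:{user_id}", max_requests=100, window_seconds=60):
        return 429, 'Too Many Requests'
    return 200, 'OK'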
Fixed Window Counter
The fixed window counter is the simplest algorithm: count requests per discrete time window and start over when the window rolls. It can admit up to twice the limit in a burst that straddles a window boundary, but it is cheap and easy to reason about. An in-memory, single-process version:
import time
from threading import Lock

class FixedWindowCounter:
    """Fixed window counter rate limiter."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counters = {}
        self.lock = Lock()

    def _get_window(self) -> int:
        """Get current window number."""
        return int(time.time() // self.window_seconds)

    def allow_request(self, key: str) -> bool:
        """Check if request is allowed."""
        window = self._get_window()
        with self.lock:
            # Drop counters from past windows so memory stays bounded
            suffix = f":{window}"
            for stale in [k for k in self.counters if not k.endswith(suffix)]:
                del self.counters[stale]
            counter_key = f"{key}{suffix}"
            current = self.counters.get(counter_key, 0)
            if current < self.max_requests:
                self.counters[counter_key] = current + 1
                return True
            return False
HTTP Headers for Rate Limiting
Standard Rate Limit Headers
The X-RateLimit-* names below are a widely adopted convention rather than a formal standard (an IETF draft proposes standardized RateLimit-* fields); Retry-After, by contrast, is defined in the HTTP specification.
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class RateLimitInfo:
    """Rate limit information for HTTP headers."""
    limit: int                         # Maximum requests allowed
    remaining: int                     # Remaining requests in window
    reset: int                         # Unix timestamp when window resets
    retry_after: Optional[int] = None  # Seconds to wait (when limited)

def build_rate_limit_headers(info: RateLimitInfo) -> dict:
    """Build rate limit headers."""
    headers = {
        'X-RateLimit-Limit': str(info.limit),
        'X-RateLimit-Remaining': str(info.remaining),
        'X-RateLimit-Reset': str(info.reset)
    }
    if info.retry_after is not None:
        headers['Retry-After'] = str(info.retry_after)
    return headers

def build_429_response(info: RateLimitInfo):
    """Build 429 Too Many Requests response (Flask-style body/status/headers tuple)."""
    from flask import jsonify
    return jsonify({
        'error': 'rate_limit_exceeded',
        'message': 'Too many requests',
        'retry_after': info.retry_after or info.reset - int(time.time())
    }), 429, {
        'X-RateLimit-Limit': str(info.limit),
        'X-RateLimit-Remaining': '0',
        'X-RateLimit-Reset': str(info.reset),
        'Retry-After': str(info.retry_after or info.reset - int(time.time()))
    }
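Tying it together, a minimal Flask sketch that applies the SlidingWindow limiter from earlier and emits these headers (the limits and keying by remote address are illustrative, and the plain dict of limiters is not thread-safe — fine for a sketch):
import time
from flask import Flask, request

app = Flask(__name__)
LIMIT, WINDOW = 100, 60.0
limiters = {}  # per-client SlidingWindow instances (unbounded in this sketch)

@app.before_request
def enforce_rate_limit():
    limiter = limiters.setdefault(request.remote_addr,
                                  SlidingWindow(max_requests=LIMIT, window_size=WINDOW))
    if not limiter.allow_request():
        reset = int(time.time() + limiter.get_reset_time())
        info = RateLimitInfo(limit=LIMIT, remaining=0, reset=reset,
                             retry_after=max(1, reset - int(time.time())))
        return build_429_response(info)

@app.after_request
def attach_headers(response):
    limiter = limiters.get(request.remote_addr)
    if limiter is not None and response.status_code != 429:
        reset = int(time.time() + limiter.get_reset_time())
        info = RateLimitInfo(limit=LIMIT, remaining=limiter.get_remaining(), reset=reset)
        for name, value in build_rate_limit_headers(info).items():
            response.headers[name] = value
    return response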
API Gateway Integration
NGINX Rate Limiting
http {
    # Define rate limit zones (shared memory, keyed by client IP)
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
    limit_req_zone $binary_remote_addr zone=premium_limit:10m rate=100r/s;

    server {
        location /api/ {
            # Allow short bursts of 10 extra requests, served immediately
            limit_req zone=api_limit burst=10 nodelay;

            # Return 429 instead of the default 503
            limit_req_status 429;
            limit_conn_status 429;

            proxy_pass http://backend;
        }

        # Premium tier - higher limits
        location /api/premium/ {
            limit_req zone=premium_limit burst=50 nodelay;
            proxy_pass http://backend;
        }
    }
}
Kong API Gateway
# Kong rate limiting plugin
services:
  - name: my-api
    url: http://backend:8080
    plugins:
      - name: rate-limiting
        config:
          minute: 100
          hour: 1000
          policy: redis
          redis_host: redis
          redis_port: 6379
          fault_tolerant: true
          hide_client_headers: false
      # Higher limits scoped to a single consumer
      - name: rate-limiting
        config:
          minute: 1000
          hour: 10000
          policy: local
          fault_tolerant: true
        consumer: premium-user
Best Practices
| Practice | Implementation |
|---|---|
| Use standard headers | X-RateLimit-* headers help clients |
| Implement retry logic | Exponential backoff on 429 |
| Use tiered limits | Different limits for different users |
| Fail gracefully | Don’t expose internal details |
| Monitor limits | Track near-limit scenarios |
| Consider cost | Rate limiting costs resources too |
Client Implementation
Python Retry Logic
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class RateLimitAwareSession(requests.Session):
    """Session that handles rate limiting automatically."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Retry transient server errors with exponential backoff;
        # 429 is handled manually below so we can honor the headers
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"]  # idempotent methods only
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("http://", adapter)
        self.mount("https://", adapter)

    def request(self, method, url, **kwargs):
        """Make request with rate limit handling."""
        response = super().request(method, url, **kwargs)

        # Handle 429 responses
        if response.status_code == 429:
            retry_after = int(response.headers.get('Retry-After', 60))
            reset_time = response.headers.get('X-RateLimit-Reset')
            if reset_time:
                wait_time = max(0, int(reset_time) - int(time.time()))
            else:
                wait_time = retry_after

            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)

            # Retry request once
            response = super().request(method, url, **kwargs)

        return response
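Using the session is the same as a plain requests.Session; the URL below is a placeholder:
session = RateLimitAwareSession()
resp = session.request("GET", "https://api.example.com/items")  # placeholder URL
print(resp.status_code, resp.headers.get("X-RateLimit-Remaining"))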
Conclusion
Rate limiting is essential for building resilient, scalable APIs. By implementing proper rate limiting strategies, you protect your services from abuse while providing fair access to legitimate users.
Key takeaways:
- Choose the right algorithm - Token bucket for burst tolerance, leaky bucket for smooth processing
- Use distributed systems - Redis-based limiting for multi-instance deployments
- Communicate clearly - Use standard HTTP headers to inform clients
- Implement retry logic - Help clients handle rate limits gracefully
- Monitor and tune - Adjust limits based on actual usage patterns
- Consider tiered access - Different limits for different customer tiers
By following these patterns and practices, you’ll build APIs that are both protected and user-friendly.
Resources
- RFC 6585: Additional HTTP Status Codes
- MDN: HTTP Rate Limiting
- Stripe API Rate Limits
- Kong Rate Limiting Plugin
- NGINX Rate Limiting
- Redis Rate Limiting Patterns