Skip to main content
⚡ Calmops

Rate Limiting Strategies: Algorithms, Implementation, and Best Practices

Rate limiting is essential for protecting APIs from abuse, preventing DoS attacks, and ensuring fair usage. This guide covers algorithms, implementation patterns, and best practices.

Understanding Rate Limiting

Why Rate Limiting Matters

# Maps each motivation for rate limiting to a one-line rationale.
# Illustrative reference data only — not referenced by the code in this file.
rate_limiting_purposes = {
    "prevent_abuse": "Stop automated attacks, scraping",
    "protect_backend": "Prevent overwhelming services",
    "ensure_fairness": "All users get fair access",
    "cost_control": "Limit resource consumption",
    "revenue_protection": "Enforce tier limits"
}

Rate Limit Headers

# Standard rate limit headers

headers:
  "X-RateLimit-Limit": "100"       # Requests allowed in window
  "X-RateLimit-Remaining": "95"    # Requests left in window
  "X-RateLimit-Reset": "1609459200" # Unix timestamp when window resets

# Also common:
# - Retry-After: seconds to wait (when limited)
# - RateLimit-Policy: "100;w=60"

Rate Limiting Algorithms

Fixed Window

# Fixed Window - Simple but has burst issues

class FixedWindowRateLimiter:
    """Rate limit with fixed time windows.

    All requests inside the same wall-clock window of ``window_seconds``
    share one budget of ``max_requests``; the count resets at the window
    boundary (which permits up to 2x bursts across a boundary — the known
    trade-off of the fixed-window algorithm).

    BUG FIX: the original stored one timestamp per request (O(requests)
    memory) while its own comment claimed ``(window_start, count)`` pairs;
    a fixed window only needs an O(1) per-user counter. ``get_remaining``
    also reported stale counts after a window rollover until the next
    ``is_allowed`` call cleaned them up.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests      # budget per window
        self.window_seconds = window_seconds  # window length in seconds
        self.requests = {}  # {user_id: [window_start, count]}

    def _current_window(self, now):
        """Start timestamp of the window containing ``now``."""
        return int(now // self.window_seconds) * self.window_seconds

    def is_allowed(self, user_id):
        """Return (allowed, remaining) for one request by ``user_id``."""
        now = time.time()
        window_start = self._current_window(now)

        entry = self.requests.get(user_id)
        # New user, or the stored window has rolled over: reset the counter.
        if entry is None or entry[0] != window_start:
            entry = [window_start, 0]
            self.requests[user_id] = entry

        if entry[1] >= self.max_requests:
            return False, self.get_remaining(user_id)

        entry[1] += 1
        return True, self.get_remaining(user_id)

    def get_remaining(self, user_id):
        """Requests left for ``user_id`` in the current window."""
        entry = self.requests.get(user_id)
        if entry is None:
            return self.max_requests
        # A counter from an expired window no longer applies.
        if entry[0] != self._current_window(time.time()):
            return self.max_requests
        return self.max_requests - entry[1]

Sliding Window Log

# Sliding Window Log - Accurate but memory intensive

class SlidingWindowLogRateLimiter:
    """Sliding-window limiter that logs every request timestamp per user.

    Exact (no boundary bursts) but costs O(max_requests) memory per user.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests      # budget inside any sliding window
        self.window_seconds = window_seconds  # window length in seconds
        self.requests = {}  # {user_id: [timestamp, ...] in arrival order}

    def is_allowed(self, user_id):
        """Return (allowed, remaining, retry_after_seconds)."""
        now = time.time()
        cutoff = now - self.window_seconds

        log = self.requests.setdefault(user_id, [])

        # Prune entries that slid out of the window. Timestamps are appended
        # in arrival order, so the head of the list is always the oldest.
        while log and log[0] <= cutoff:
            log.pop(0)

        if len(log) >= self.max_requests:
            # Over budget: the caller may retry once the oldest logged
            # request leaves the window.
            retry_after = int(log[0] + self.window_seconds - now) + 1
            return False, 0, retry_after

        log.append(now)
        return True, self.max_requests - len(log), 0

Sliding Window Counter

# Sliding Window Counter - Memory efficient, slightly less accurate

class SlidingWindowCounterRateLimiter:
    """Sliding-window counter: approximates a sliding window from two
    fixed-window counters, weighting the previous window by how much of it
    still overlaps the sliding window. O(1) memory per user, slightly less
    accurate than a full timestamp log.

    BUG FIX: the original never deleted old window buckets, so each user's
    ``windows`` dict grew without bound; stale windows are now pruned on
    every check.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests      # budget inside any sliding window
        self.window_seconds = window_seconds  # window length in seconds
        self.windows = {}  # {user_id: {window_start: count}}

    def is_allowed(self, user_id):
        """Return (allowed, remaining_estimate) for one request."""
        now = time.time()
        current_window = int(now // self.window_seconds) * self.window_seconds
        previous_window = current_window - self.window_seconds

        user_windows = self.windows.setdefault(user_id, {})

        # Prune anything older than the previous window (fixes the leak).
        for stale in [w for w in user_windows if w < previous_window]:
            del user_windows[stale]

        current_count = user_windows.get(current_window, 0)
        previous_count = user_windows.get(previous_window, 0)

        # Fraction of the current window already elapsed; the previous
        # window still overlaps the sliding window by (1 - elapsed_fraction).
        elapsed_fraction = (now % self.window_seconds) / self.window_seconds
        total_weighted = current_count + previous_count * (1 - elapsed_fraction)

        if total_weighted >= self.max_requests:
            return False, 0

        user_windows[current_window] = current_count + 1
        # NOTE: remaining is computed before counting this request,
        # matching the original behavior.
        return True, int(self.max_requests - total_weighted)

Token Bucket

# Token Bucket - Allows burst, smooth rate

class TokenBucketRateLimiter:
    """Token bucket: each user's bucket refills at ``rate`` tokens/second up
    to ``capacity``; a request spends ``tokens_needed`` tokens. Allows bursts
    up to ``capacity`` while enforcing the average rate.

    BUG FIX: ``get_wait_time`` unpacked three values from ``is_allowed``,
    which returns only two on success — so it raised ValueError exactly when
    tokens were available. It also consumed tokens as a side effect of a
    read-only query. It now computes the wait without touching bucket state.
    """

    def __init__(self, rate, capacity):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # max tokens a bucket can hold
        self.buckets = {}         # {user_id: (tokens, last_update)}

    def _refilled_tokens(self, user_id, now):
        """Token count for ``user_id`` refilled up to ``now`` (no mutation).

        Unknown users are treated as a full bucket.
        """
        if user_id not in self.buckets:
            return self.capacity
        tokens, last_update = self.buckets[user_id]
        return min(self.capacity, tokens + (now - last_update) * self.rate)

    def is_allowed(self, user_id, tokens_needed=1):
        """Spend ``tokens_needed`` tokens if available.

        Returns (True, tokens_before_spend) on success, or
        (False, 0, wait_seconds) when the bucket is short — the mixed tuple
        arity is preserved from the original interface.
        """
        now = time.time()
        if user_id not in self.buckets:
            # New user starts with a full bucket.
            self.buckets[user_id] = (self.capacity, now)

        tokens = self._refilled_tokens(user_id, now)

        if tokens >= tokens_needed:
            self.buckets[user_id] = (tokens - tokens_needed, now)
            return True, int(tokens)

        # Not enough tokens: report how long until the deficit refills.
        wait_time = (tokens_needed - tokens) / self.rate
        return False, 0, int(wait_time) + 1

    def get_wait_time(self, user_id, tokens_needed=1):
        """Seconds to wait before ``tokens_needed`` tokens are available
        (0 if available now). Does not consume tokens.
        """
        now = time.time()
        tokens = self._refilled_tokens(user_id, now)
        if tokens >= tokens_needed:
            return 0
        return int((tokens_needed - tokens) / self.rate) + 1

Leaky Bucket

# Leaky Bucket - Smooths output, constant rate

class LeakyBucketRateLimiter:
    """Leaky bucket: each request adds one unit to the user's bucket, which
    drains at ``rate`` units/second; requests are rejected while the bucket
    holds ``capacity`` or more. Smooths traffic to a constant outflow.
    """

    def __init__(self, rate, capacity):
        self.rate = rate          # units drained per second
        self.capacity = capacity  # maximum bucket level (queue size)
        self.buckets = {}         # {user_id: (current_level, last_update)}

    def is_allowed(self, user_id):
        """Return (True, slots_left, 0) or (False, 0, seconds_to_wait)."""
        now = time.time()
        level, last_update = self.buckets.get(user_id, (0, now))

        # Drain for the time elapsed since the last update; never below 0.
        drained = level - (now - last_update) * self.rate
        level = drained if drained > 0 else 0

        if level >= self.capacity:
            # Bucket is full: wait until one slot has drained out.
            wait_time = (level + 1 - self.capacity) / self.rate
            return False, 0, int(wait_time) + 1

        # Accept the request and record the raised level.
        self.buckets[user_id] = (level + 1, now)
        return True, self.capacity - int(level), 0

Distributed Rate Limiting

Redis-Based Rate Limiter

import redis
import time
import uuid

class RedisRateLimiter:
    """Distributed rate limiter backed by Redis so all app instances share
    one budget per user.

    Offers a fixed-window check (``is_allowed``, INCR-based) and a
    sliding-window check (``is_allowed_sliding``, sorted-set-based).
    Both return (allowed, remaining, retry_after_seconds).
    """

    def __init__(self, redis_client, key_prefix, max_requests, window_seconds):
        self.redis = redis_client          # shared redis client instance
        self.key_prefix = key_prefix       # namespace for this limiter's keys
        self.max_requests = max_requests   # budget per window
        self.window_seconds = window_seconds

    def is_allowed(self, user_id):
        """Fixed-window check using INCR."""
        key = f"{self.key_prefix}:{user_id}"

        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.ttl(key)
        current_count, ttl = pipe.execute()

        # BUG FIX: the original ran EXPIRE on every request, pushing the
        # window's end forward each time — a steadily retrying client could
        # stay rate-limited indefinitely. Set the expiry only when the key
        # was just created (first INCR), or repair a key left with no TTL
        # (ttl == -1, e.g. after a crash between INCR and EXPIRE).
        if current_count == 1 or ttl == -1:
            self.redis.expire(key, self.window_seconds)

        if current_count > self.max_requests:
            return False, 0, self.redis.ttl(key)

        return True, self.max_requests - current_count, 0

    def is_allowed_sliding(self, user_id):
        """Sliding-window check using a sorted set of request timestamps."""
        key = f"{self.key_prefix}:sliding:{user_id}"
        now = time.time()
        window_start = now - self.window_seconds
        # BUG FIX: the original used str(now) as the member, so two requests
        # landing on the exact same timestamp collapsed into one sorted-set
        # entry and were undercounted. A uuid suffix keeps members distinct.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # drop expired entries
        pipe.zadd(key, {member: now})                # optimistically record
        pipe.zcard(key)                              # count within window
        pipe.expire(key, self.window_seconds)        # GC idle keys
        results = pipe.execute()
        current_count = results[2]

        if current_count > self.max_requests:
            # Roll back the optimistic insert so a denied request does not
            # consume quota.
            self.redis.zrem(key, member)
            # NOTE(review): worst-case retry hint; the precise retry time is
            # when the oldest in-window entry ages out.
            return False, 0, self.window_seconds

        return True, self.max_requests - current_count, 0

Rate Limiting with Redis + Lua

-- rate_limiter.lua
-- Sliding-window rate limiter over a Redis sorted set. Run via
-- EVAL/EVALSHA so prune -> count -> add executes atomically (Redis runs
-- a script as a single uninterrupted operation).
--
-- KEYS[1] = per-user sorted-set key
-- ARGV[1] = max requests per window
-- ARGV[2] = window length in seconds
-- ARGV[3] = current unix time, supplied by the caller
-- Returns {allowed (1|0), remaining}.

local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

-- Sliding window using sorted set
local now = tonumber(ARGV[3])
local oldest = now - window

-- Remove old entries (scores at or before the window start)
redis.call('ZREMRANGEBYSCORE', key, 0, oldest)

-- Count current requests
local current = redis.call('ZCARD', key)

if current < limit then
    -- Add new request; the random suffix keeps two requests with the
    -- same timestamp from collapsing into a single sorted-set member
    redis.call('ZADD', key, now, now .. math.random())
    -- Refresh expiry so idle users' keys are garbage-collected
    redis.call('EXPIRE', key, window)
    return {1, limit - current - 1}
else
    return {0, 0}
end

Implementation Examples

Express.js Rate Limiting

// Express rate limiting via express-rate-limit with a Redis-backed store,
// so counters are shared across all app instances.
const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis');
const Redis = require('ioredis');

// Redis store for distributed rate limiting
const redisClient = new Redis({
    host: 'localhost',
    port: 6379
});

const limiter = rateLimit({
    windowMs: 15 * 60 * 1000, // 15 minutes
    max: 100, // limit each IP to 100 requests per window
    standardHeaders: true,  // emit standard RateLimit-* response headers
    legacyHeaders: false,   // suppress legacy X-RateLimit-* headers
    store: new RedisStore({
        // NOTE(review): sendCommand bridge matches the rate-limit-redis v3
        // API — confirm against the installed version.
        sendCommand: (...args) => redisClient.call(...args),
    }),
    keyGenerator: (req) => {
        return req.user?.id || req.ip; // Use user ID if logged in
    },
    skip: (req) => {
        return req.path === '/health'; // Skip rate limit for health
    },
    handler: (req, res) => {
        // Custom 429 body; Retry-After header is set by the middleware.
        res.status(429).json({
            error: 'Too many requests',
            retryAfter: res.getHeader('Retry-After')
        });
    }
});

// Apply to routes
app.use('/api/', limiter);
app.use('/api/auth', rateLimit({ windowMs: 60 * 1000, max: 5 })); // Stricter for auth

Python FastAPI Rate Limiting

from fastapi import FastAPI, Request, HTTPException
# BUG FIX: JSONResponse was used below but never imported.
from fastapi.responses import JSONResponse
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
# BUG FIX: slowapi looks the limiter up on app.state; without this
# registration the @limiter.limit decorators are not enforced.
app.state.limiter = limiter

@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Translate slowapi's RateLimitExceeded into a 429 JSON response."""
    return JSONResponse(
        status_code=429,
        content={"detail": "Rate limit exceeded", "retry_after": str(exc.detail)}
    )

@app.get("/api/resource")
@limiter.limit("100/minute")
async def get_resource(request: Request):
    return {"data": "content"}

# Different limits for different endpoints: reuse the registered limiter.
# (A second Limiter instance would not be registered on app.state, so its
# decorator would not be enforced — confirm against the slowapi docs.)
auth_limiter = limiter

@app.post("/api/auth/login")
@auth_limiter.limit("5/minute")
async def login(request: Request):
    # Login logic
    pass

Rate Limiting Strategies

Tiered Rate Limiting

class TieredRateLimiter:
    """Apply a different rate limit per user tier (free/premium/enterprise).

    Each tier gets its own token bucket sized for its limit.

    BUG FIX: the old fallback hard-coded ``self.limiters['free']`` and
    raised KeyError whenever a 'free' tier was not configured; the fallback
    tier is now chosen at construction time (the most restrictive tier when
    the preferred default is absent). The new ``default_tier`` parameter
    defaults to 'free', so existing callers are unaffected.
    """

    def __init__(self, limits_by_tier, default_tier='free'):
        # limits_by_tier = {'free': 100, 'premium': 1000, 'enterprise': 10000}
        # NOTE(review): limits appear to be requests per minute (rate is
        # limit/60 tokens per second) — confirm with callers.
        self.limiters = {}

        for tier, limit in limits_by_tier.items():
            self.limiters[tier] = TokenBucketRateLimiter(
                rate=limit / 60,  # per-minute limit -> tokens per second
                capacity=limit    # burst up to the full minute's budget
            )

        if default_tier in self.limiters:
            self.default_tier = default_tier
        else:
            # Fail safe: unknown tiers fall back to the *most restrictive*
            # configured tier rather than crashing.
            self.default_tier = min(limits_by_tier, key=limits_by_tier.get)

    def is_allowed(self, user):
        """Check the limiter for the user's tier (default tier if unknown)."""
        tier = user.get_tier()
        limiter = self.limiters.get(tier, self.limiters[self.default_tier])
        return limiter.is_allowed(user.id)

IP + User Rate Limiting

class CompositeRateLimiter:
    """Combine IP-based and user-based rate limiting: both limits must pass.

    BUG FIX: the original referenced ``user_remaining`` in its final return
    even when there was no authenticated user, raising UnboundLocalError for
    anonymous requests; and ``user_remaining or 999`` treated a legitimate
    remaining count of 0 as "missing", inflating the reported remaining.
    """

    def __init__(self, ip_limiter, user_limiter):
        self.ip_limiter = ip_limiter      # keyed by client IP (anti-abuse)
        self.user_limiter = user_limiter  # keyed by user id (per-user quota)

    def is_allowed(self, request):
        """Return (allowed, reason, remaining)."""
        ip = get_client_ip(request)
        user = request.user

        # Check the IP limit first: cheapest rejection for anonymous abuse.
        ip_allowed, ip_remaining = self.ip_limiter.is_allowed(ip)
        if not ip_allowed:
            return False, "IP rate limit exceeded", ip_remaining

        # Then the per-user limit, only for authenticated requests.
        user_remaining = None
        if user:
            user_allowed, user_remaining = self.user_limiter.is_allowed(user.id)
            if not user_allowed:
                return False, "User rate limit exceeded", user_remaining

        if user_remaining is None:
            # Anonymous request: only the IP budget applies.
            return True, "OK", ip_remaining
        return True, "OK", min(ip_remaining, user_remaining)

Best Practices

# Rate limiting best practices

design:
  - "Use Redis for distributed systems"
  - "Set appropriate limits per endpoint"
  - "Return informative headers"
  - "Use tiered limits for different users"
  - "Implement circuit breaker for downstream failures"

headers:
  - "X-RateLimit-Limit"
  - "X-RateLimit-Remaining"
  - "X-RateLimit-Reset"

monitoring:
  - "Track rate limit hits"
  - "Alert on high rejection rates"
  - "Monitor latency impact"

responses:
  - "429 Too Many Requests"
  - "Retry-After header"
  - "Clear error message"

Conclusion

Rate limiting protects your APIs:

  • Algorithms: Token bucket for burst, sliding window for accuracy
  • Distributed: Use Redis for multi-instance deployments
  • Tiered: Different limits for different user tiers
  • Monitor: Track and alert on rate limit metrics

Choose the algorithm based on your needs: simplicity (fixed window), accuracy (sliding window), or burst handling (token bucket).


Comments