Skip to main content
โšก Calmops

API Rate Limiting: Token Bucket, Leaky Bucket, and Sliding Window Algorithms

Introduction

Rate limiting is essential for protecting APIs from abuse, preventing service degradation, and ensuring fair usage among consumers. Without rate limits, a single misbehaving client can exhaust server resources, causing denial of service for all other users.

This guide covers rate limiting algorithms, implementation patterns, and practical strategies for protecting your APIs at any scale.

Why Rate Limiting Matters

Rate limiting protects against:

  • Denial of Service: Prevents single clients from overwhelming servers
  • Resource Exhaustion: Ensures fair allocation of compute, memory, and network
  • Cost Control: Limits API call costs for metered services
  • Service Stability: Maintains consistent performance for all users

Rate Limiting Algorithms

Token Bucket Algorithm

The token bucket algorithm allows burst traffic while enforcing an average rate:

import time
import threading

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity  # Max tokens
        self.refill_rate = refill_rate  # Tokens per second
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = threading.Lock()
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate
        
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now
    
    def consume(self, tokens=1):
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            
            return False
    
    def get_wait_time(self, tokens=1):
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                return 0
            
            needed = tokens - self.tokens
            return needed / self.refill_rate

Leaky Bucket Algorithm

The leaky bucket enforces a steady output rate, smoothing bursty traffic:

class LeakyBucket:
    def __init__(self, capacity, leak_rate):
        self.capacity = capacity
        self.leak_rate = leak_rate  # Requests per second
        self.level = 0
        self.last_leak = time.time()
        self.lock = threading.Lock()
    
    def _leak(self):
        now = time.time()
        elapsed = now - self.last_leak
        leaked = elapsed * self.leak_rate
        
        self.level = max(0, self.level - leaked)
        self.last_leak = now
    
    def allow_request(self):
        with self.lock:
            self._leak()
            
            if self.level < self.capacity:
                self.level += 1
                return True
            
            return False

Sliding Window Log

Tracks individual request timestamps for precise rate limiting:

from collections import deque

class SlidingWindowLog:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()
        self.lock = threading.Lock()
    
    def allow_request(self):
        with self.lock:
            now = time.time()
            cutoff = now - self.window_seconds
            
            # Remove expired timestamps
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()
            
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            
            return False
    
    def get_remaining(self):
        with self.lock:
            now = time.time()
            cutoff = now - self.window_seconds
            
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()
            
            return self.max_requests - len(self.requests)

Sliding Window Counter

More memory-efficient sliding window using two fixed windows:

class SlidingWindowCounter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.window1 = 0
        self.window2 = 0
        self.window1_start = time.time()
        self.window2_start = time.time() - window_seconds
        self.lock = threading.Lock()
    
    def allow_request(self):
        with self.lock:
            now = time.time()
            elapsed = now - self.window1_start
            
            # Slide window if needed
            if elapsed >= self.window_seconds:
                self.window2 = self.window1
                self.window2_start = self.window1_start
                self.window1 = 0
                self.window1_start = now
                elapsed = 0
            
            # Calculate weighted count
            if elapsed > 0:
                ratio = (self.window_seconds - elapsed) / self.window_seconds
                current_count = self.window1 + int(self.window2 * ratio)
            else:
                current_count = self.window1
            
            if current_count < self.max_requests:
                self.window1 += 1
                return True
            
            return False

Redis-Based Distributed Rate Limiting

Token Bucket with Redis

import redis
import time

class RedisTokenBucket:
    def __init__(self, redis_client, key, capacity, refill_rate):
        self.redis = redis_client
        self.key = f"ratelimit:{key}"
        self.capacity = capacity
        self.refill_rate = refill_rate
    
    def consume(self, tokens=1):
        key = self.key
        
        # Lua script for atomic operation
        script = """
        local tokens_key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local tokens_needed = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])
        
        local tokens = tonumber(redis.call('GET', tokens_key) or capacity)
        local last_refill = tonumber(redis.call('GET', tokens_key .. ':time') or now)
        
        local elapsed = now - last_refill
        local new_tokens = math.min(capacity, tokens + (elapsed * refill_rate))
        
        if new_tokens >= tokens_needed then
            new_tokens = new_tokens - tokens_needed
            redis.call('SET', tokens_key, new_tokens)
            redis.call('SET', tokens_key .. ':time', now)
            return 1
        end
        
        return 0
        """
        
        result = self.redis.eval(
            script,
            1,
            self.key,
            self.capacity,
            self.refill_rate,
            tokens,
            int(time.time())
        )
        
        return result == 1

Fixed Window Counter with Redis

class RedisFixedWindow:
    def __init__(self, redis_client, key, max_requests, window_seconds):
        self.redis = redis_client
        self.key = f"ratelimit:{key}"
        self.max_requests = max_requests
        self.window_seconds = window_seconds
    
    def allow_request(self):
        key = f"{self.key}:{int(time.time() // self.window_seconds)}"
        
        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.expire(key, self.window_seconds * 2)
        results = pipe.execute()
        
        return results[0] <= self.max_requests

Sliding Window with Redis Sorted Sets

class RedisSlidingWindow:
    def __init__(self, redis_client, key, max_requests, window_seconds):
        self.redis = redis_client
        self.key = f"ratelimit:{key}"
        self.max_requests = max_requests
        self.window_seconds = window_seconds
    
    def allow_request(self):
        now = time.time()
        window_start = now - self.window_seconds
        
        pipe = self.redis.pipeline()
        
        # Remove old entries
        pipe.zremrangebyscore(self.key, 0, window_start)
        
        # Count current requests
        pipe.zcard(self.key)
        
        # Add current request
        pipe.zadd(self.key, {str(now): now})
        
        # Set expiry
        pipe.expire(self.key, self.window_seconds + 1)
        
        results = pipe.execute()
        current_count = results[1]
        
        if current_count < self.max_requests:
            return True
        
        # Remove the request we just added
        self.redis.zrem(self.key, str(now))
        return False

HTTP Rate Limiting Headers

Standard rate limit headers for client guidance:

from flask import Flask, request, jsonify
from functools import wraps
import time

app = Flask(__name__)

class RateLimiter:
    def __init__(self):
        self.limiters = {}
    
    def get_limiter(self, key, max_requests, window):
        if key not in self.limiters:
            self.limiters[key] = SlidingWindowLog(max_requests, window)
        return self.limiters[key]

limiter = RateLimiter()

def rate_limit(max_requests=100, window=60):
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Get client identifier
            client_id = request.headers.get('X-Client-ID') or request.remote_addr
            key = f"{client_id}:{request.endpoint}"
            
            limiter_instance = limiter.get_limiter(key, max_requests, window)
            
            if not limiter_instance.allow_request():
                remaining = 0
                retry_after = window
            else:
                remaining = limiter_instance.get_remaining()
                retry_after = 0
            
            response = f(*args, **kwargs)
            
            # Add rate limit headers
            if hasattr(response, 'headers'):
                response.headers['X-RateLimit-Limit'] = str(max_requests)
                response.headers['X-RateLimit-Remaining'] = str(remaining)
                response.headers['X-RateLimit-Reset'] = str(int(time.time()) + window)
                
                if retry_after > 0:
                    response.headers['Retry-After'] = str(retry_after)
            
            return response
        
        return wrapped
    return decorator

@app.route('/api/data')
@rate_limit(max_requests=100, window=60)
def get_data():
    return jsonify({'data': 'example'})

Rate Limiting Strategies by Scope

Global Rate Limiting

# Rate limit across all users
@app.before_request
def global_limit():
    global_key = f"global:{request.endpoint}"
    limiter = get_limiter(global_key, max_requests=10000, window=60)
    
    if not limiter.allow_request():
        abort(429, description="Service-wide rate limit exceeded")

Per-User Rate Limiting

# Rate limit per authenticated user
@app.before_request
def user_limit():
    if not hasattr(g, 'user'):
        return
    
    user_key = f"user:{g.user.id}:{request.endpoint}"
    limiter = get_limiter(user_key, max_requests=1000, window=60)
    
    if not limiter.allow_request():
        abort(429, description="User rate limit exceeded")

Per-IP Rate Limiting

# Rate limit by IP address
@app.before_request
def ip_limit():
    ip_key = f"ip:{request.remote_addr}"
    limiter = get_limiter(ip_key, max_requests=100, window=60)
    
    if not limiter.allow_request():
        abort(429, description="IP rate limit exceeded")

Tiered Rate Limiting

# Different limits for different tiers
@app.before_request
def tiered_limit():
    user = get_current_user()
    
    limits = {
        'free': (100, 60),
        'basic': (1000, 60),
        'premium': (10000, 60),
        'enterprise': (float('inf'), 60)
    }
    
    tier = user.subscription_tier if user else 'free'
    max_requests, window = limits[tier]
    
    key = f"user:{user.id if user else request.remote_addr}"
    limiter = get_limiter(key, max_requests, window)
    
    if not limiter.allow_request():
        abort(429, description=f"Rate limit exceeded for {tier} tier")

Handling Rate Limit Exceeded

Graceful Degradation

@app.route('/api/search')
@rate_limit(max_requests=100, window=60)
def search():
    try:
        results = perform_search(request.query)
    except RateLimitExceeded:
        # Return cached results as fallback
        results = get_cached_search(request.query)
        return jsonify({
            'results': results,
            'warning': 'Using cached results - rate limit exceeded'
        })
    
    return jsonify({'results': results})

Queue-Based Rate Limiting

import asyncio
from queue import Queue

class QueueBasedRateLimiter:
    def __init__(self, rate, burst):
        self.rate = rate
        self.tokens = burst
        self.queue = Queue()
        self.last_update = time.time()
    
    async def acquire(self, timeout=None):
        while self.tokens <= 0:
            await asyncio.sleep(0.1)
            self._refill()
        
        self.tokens -= 1
        return True
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(self.tokens + elapsed * self.rate, self.rate)
        self.last_update = now

API Gateway Rate Limiting

Kong Configuration

# Kong rate limiting plugin
curl -X POST http://kong:8001/services/my-service/plugins \
  -d "name=rate-limiting" \
  -d "config.minute=100" \
  -d "config.policy=redis" \
  -d "config.redis_host=redis-host" \
  -d "config.hide_client_headers=false"

NGINX Rate Limiting

http {
    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=10r/s;
    limit_req_zone $http_x_api_key zone=apilimit:10m rate=100r/s;
    
    server {
        location /api/ {
            limit_req zone=mylimit burst=20 nodelay;
            
            # API key based limiting
            limit_req zone=apilimit burst=50 nodelay;
            
            proxy_pass http://backend;
        }
    }
}

AWS API Gateway

# Create usage plan
aws apigateway create-usage-plan \
  --name "Premium Plan" \
  --api-stages [{"apiId": "api123", "stage": "prod"}] \
  --quota {"limit": 10000, "period": "DAY" \
  --throttle {"burstLimit": 100, "rateLimit": 50}

Monitoring and Metrics

from prometheus_client import Counter, Histogram, Gauge

rate_limit_hits = Counter(
    'rate_limit_hits_total',
    'Total rate limit hits',
    ['endpoint', 'limiter_type']
)

rate_limit_latency = Histogram(
    'rate_limit_check_latency_seconds',
    'Rate limit check latency'
)

current_requests = Gauge(
    'rate_limit_current_requests',
    'Current requests in window',
    ['endpoint']
)

def track_rate_limit_metrics(limiter, endpoint):
    def decorator(f):
        @wraps(f)
        async def wrapped(*args, **kwargs):
            with rate_limit_latency.time():
                allowed = limiter.allow_request()
            
            rate_limit_hits.labels(
                endpoint=endpoint,
                limiter_type=type(limiter).__name__
            ).inc()
            
            current_requests.labels(endpoint=endpoint).set(
                limiter.get_remaining()
            )
            
            if not allowed:
                raise RateLimitExceeded()
            
            return await f(*args, **kwargs)
        
        return wrapped
    return decorator

Best Practices

  1. Use Standard Headers: Include X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset
  2. Choose Algorithm Based on Use Case: Token bucket for API flexibility, leaky bucket for smooth traffic
  3. Implement Multi-Layer Limits: Global, per-user, per-IP limits together
  4. Graceful Degradation: Provide fallback responses when limits are hit
  5. Monitor and Tune: Adjust limits based on actual traffic patterns
  6. Document Limits: Make rate limits clear to API consumers
  7. Consider Cost: Rate limiting reduces infrastructure costs significantly

Conclusion

Rate limiting is essential for API security and stability. Start with simple fixed-window limits, then graduate to sliding window or token bucket for more precise control. Implement Redis-based distributed rate limiting for multi-instance deployments, and always provide clear feedback to clients through standard HTTP headers.

Comments