Skip to main content
โšก Calmops

Rate Limiting and Throttling: Building Resilient APIs

Introduction

Rate limiting and throttling are essential techniques for protecting APIs from abuse, preventing service degradation, and ensuring fair resource allocation among users. Whether you’re protecting against malicious attacks, preventing accidental overload, or implementing tiered access plans, understanding rate limiting is crucial for building production-ready systems.

This article covers rate limiting algorithms, implementation strategies, distributed rate limiting with Redis, and best practices for API protection.

Understanding Rate Limiting

Why Rate Limiting Matters

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              Without Rate Limiting                               โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  Requests                                                       โ”‚
โ”‚      โ”‚                                                         โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                               โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                            โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                           โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ  ๐Ÿ”ด Service            โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ      Degradation        โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                         โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                         โ”‚
โ”‚      โ”‚                                                         โ”‚
โ”‚      โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถ Time            โ”‚
โ”‚                                                                 โ”‚
โ”‚  Results:                                                       โ”‚
โ”‚  - Service unavailability                                      โ”‚
โ”‚  - Poor user experience                                        โ”‚
โ”‚  - Resource exhaustion                                         โ”‚
โ”‚  - Cost overruns                                               โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              With Rate Limiting                                  โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  Requests                   Limit: 100/min                      โ”‚
โ”‚      โ”‚                                                        โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                    โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                    โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                    โ”‚
โ”‚      โ”‚  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ  โœ… Stable         โ”‚
โ”‚      โ”‚          (throttled)   (throttled)                      โ”‚
โ”‚      โ”‚                                                         โ”‚
โ”‚      โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถ Time            โ”‚
โ”‚                                                                 โ”‚
โ”‚  Results:                                                       โ”‚
โ”‚  - Reliable service                                            โ”‚
โ”‚  - Fair resource allocation                                     โ”‚
โ”‚  - Predictable costs                                           โ”‚
โ”‚  - Better UX for legitimate users                              โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Rate Limiting vs Throttling

Aspect Rate Limiting Throttling
Purpose Limit request count Control request rate
Granularity Per time window Continuous
Response 429 Too Many Requests 429 or slow down
Use Case API protection Resource management

Rate Limiting Algorithms

1. Fixed Window

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Fixed Window Algorithm                        โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  Window: 1 minute                                              โ”‚
โ”‚                                                                 โ”‚
โ”‚  Minute 1: โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ (10 requests) โœ“                        โ”‚
โ”‚  Minute 2: โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ (20 requests) โœ“         โ”‚
โ”‚  Minute 3: โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ (18 requests) โœ“           โ”‚
โ”‚  Minute 4: โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ (25) ๐Ÿ”ด Blocked    โ”‚
โ”‚                                                                 โ”‚
โ”‚  Problem: Burst at window boundaries                           โ”‚
โ”‚  Example: 10:59:55 (5 req) + 11:00:05 (5 req) = 10 in 20s    โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
from datetime import datetime, timedelta
from collections import defaultdict
import threading

class FixedWindowRateLimiter:
    """Fixed window rate limiter."""
    
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)
        self.lock = threading.Lock()
    
    def is_allowed(self, key: str) -> bool:
        """Check if request is allowed."""
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=self.window_seconds)
        
        with self.lock:
            # Clean old requests
            self.requests[key] = [
                req_time for req_time in self.requests[key]
                if req_time > window_start
            ]
            
            # Check limit
            if len(self.requests[key]) >= self.max_requests:
                return False
            
            # Record request
            self.requests[key].append(now)
            return True
    
    def get_remaining(self, key: str) -> int:
        """Get remaining requests."""
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=self.window_seconds)
        
        with self.lock:
            current = len([
                req_time for req_time in self.requests[key]
                if req_time > window_start
            ])
            return max(0, self.max_requests - current)
    
    def get_reset_time(self, key: str) -> datetime:
        """Get window reset time."""
        now = datetime.utcnow()
        
        with self.lock:
            if not self.requests[key]:
                return now + timedelta(seconds=self.window_seconds)
            
            oldest = min(self.requests[key])
            return oldest + timedelta(seconds=self.window_seconds)

2. Sliding Window

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                   Sliding Window Algorithm                       โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  Current time: 11:00:30                                         โ”‚
โ”‚  Window: 1 minute                                               โ”‚
โ”‚                                                                 โ”‚
โ”‚  Requests in window:                                            โ”‚
โ”‚  11:00:05, 11:00:10, 11:00:15, 11:00:20, 11:00:25            โ”‚
โ”‚                  โ†“                                               โ”‚
โ”‚  Count: 5 requests in last 60 seconds                           โ”‚
โ”‚                                                                 โ”‚
โ”‚  Next request at 11:00:30 โ†’ 6th request โ†’ Blocked              โ”‚
โ”‚                                                                 โ”‚
โ”‚  Advantage: More accurate, no boundary bursts                   โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
from datetime import datetime, timedelta
from collections import defaultdict
import threading

class SlidingWindowRateLimiter:
    """Sliding window rate limiter with log-based tracking."""
    
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)
        self.lock = threading.Lock()
    
    def is_allowed(self, key: str) -> bool:
        """Check if request is allowed using sliding window."""
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=self.window_seconds)
        
        with self.lock:
            # Remove old requests
            self.requests[key] = [
                req_time for req_time in self.requests[key]
                if req_time > window_start
            ]
            
            # Check if allowed
            if len(self.requests[key]) >= self.max_requests:
                return False
            
            # Add new request
            self.requests[key].append(now)
            return True
    
    def get_current_count(self, key: str) -> int:
        """Get current request count in window."""
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=self.window_seconds)
        
        with self.lock:
            return len([
                req_time for req_time in self.requests[key]
                if req_time > window_start
            ])

3. Token Bucket

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                   Token Bucket Algorithm                        โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  Bucket Capacity: 100 tokens                                    โ”‚
โ”‚  Refill Rate: 10 tokens/second                                  โ”‚
โ”‚                                                                 โ”‚
โ”‚  Initial:  [โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ] 100 tokens                 โ”‚
โ”‚                                                                 โ”‚
โ”‚  After 5s:  [โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ          ] 50 tokens              โ”‚
โ”‚                                                                 โ”‚
โ”‚  Request:   [โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ          ] -1 = 49 tokens โœ“       โ”‚
โ”‚  Request:   [โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ          ] -1 = 48 tokens โœ“       โ”‚
โ”‚  Request:   [โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ          ] -1 = 47 tokens โœ“       โ”‚
โ”‚  Request:   [โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ          ] -1 = 46 tokens โœ“       โ”‚
โ”‚                                                                 โ”‚
โ”‚  When bucket empty: Request blocked (429)                       โ”‚
โ”‚                                                                 โ”‚
โ”‚  Allows burst traffic while maintaining average rate          โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
from datetime import datetime, timedelta
import threading
import math

class TokenBucketRateLimiter:
    """Token bucket rate limiter for burst handling."""
    
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.buckets = {}
        self.lock = threading.Lock()
    
    def _get_bucket(self, key: str) -> dict:
        """Get or create bucket for key."""
        now = datetime.utcnow()
        
        if key not in self.buckets:
            self.buckets[key] = {
                'tokens': float(self.capacity),
                'last_refill': now,
            }
            return self.buckets[key]
        
        bucket = self.buckets[key]
        
        # Refill tokens based on elapsed time
        elapsed = (now - bucket['last_refill']).total_seconds()
        tokens_to_add = elapsed * self.refill_rate
        
        bucket['tokens'] = min(self.capacity, bucket['tokens'] + tokens_to_add)
        bucket['last_refill'] = now
        
        return bucket
    
    def is_allowed(self, key: str, cost: int = 1) -> bool:
        """Check if request is allowed."""
        with self.lock:
            bucket = self._get_bucket(key)
            
            if bucket['tokens'] >= cost:
                bucket['tokens'] -= cost
                return True
            
            return False
    
    def wait_time(self, key: str, cost: int = 1) -> float:
        """Calculate wait time until request can be processed."""
        with self.lock:
            bucket = self._get_bucket(key)
            
            if bucket['tokens'] >= cost:
                return 0.0
            
            tokens_needed = cost - bucket['tokens']
            return tokens_needed / self.refill_rate
    
    def get_remaining(self, key: str) -> float:
        """Get remaining tokens."""
        with self.lock:
            bucket = self._get_bucket(key)
            return bucket['tokens']

4. Leaky Bucket

class LeakyBucketRateLimiter:
    """Leaky bucket algorithm for constant rate processing."""
    
    def __init__(self, capacity: int, leak_rate: float):
        self.capacity = capacity
        self.leak_rate = leak_rate  # requests per second
        self.buckets = {}
        self.lock = threading.Lock()
    
    def _get_bucket(self, key: str) -> dict:
        """Get or create bucket for key."""
        now = datetime.utcnow()
        
        if key not in self.buckets:
            self.buckets[key] = {
                'level': 0,
                'last_leak': now,
            }
            return self.buckets[key]
        
        bucket = self.buckets[key]
        
        # Leak tokens based on elapsed time
        elapsed = (now - bucket['last_leak']).total_seconds()
        leaked = elapsed * self.leak_rate
        
        bucket['level'] = max(0, bucket['level'] - leaked)
        bucket['last_leak'] = now
        
        return bucket
    
    def is_allowed(self, key: str) -> bool:
        """Check if request is allowed."""
        with self.lock:
            bucket = self._get_bucket(key)
            
            if bucket['level'] < self.capacity:
                bucket['level'] += 1
                return True
            
            return False
    
    def get_remaining(self, key: str) -> int:
        """Get remaining capacity."""
        with self.lock:
            bucket = self._get_bucket(key)
            return max(0, self.capacity - bucket['level'])

Distributed Rate Limiting with Redis

import redis
from datetime import datetime
import time

class RedisRateLimiter:
    """Distributed rate limiter using Redis."""
    
    def __init__(self, redis_url: str, key_prefix: str = "ratelimit"):
        self.redis = redis.from_url(redis_url)
        self.key_prefix = key_prefix
    
    def fixed_window(self, key: str, max_requests: int, 
                    window_seconds: int) -> dict:
        """Fixed window rate limiting with Redis."""
        window_key = f"{self.key_prefix}:{key}:{int(time.time() // window_seconds)}"
        
        # Increment counter
        current = self.redis.incr(window_key)
        
        # Set expiry on first request
        if current == 1:
            self.redis.expire(window_key, window_seconds)
        
        # Check limit
        allowed = current <= max_requests
        remaining = max(0, max_requests - current)
        reset_time = (int(time.time() // window_seconds) + 1) * window_seconds
        
        return {
            'allowed': allowed,
            'remaining': remaining,
            'reset': reset_time,
            'retry_after': max(0, window_seconds - (int(time.time()) % window_seconds))
        }
    
    def sliding_window(self, key: str, max_requests: int,
                       window_seconds: int) -> dict:
        """Sliding window rate limiting with Redis."""
        now = time.time()
        window_start = now - window_seconds
        redis_key = f"{self.key_prefix}:sliding:{key}"
        
        pipe = self.redis.pipeline()
        
        # Remove old entries
        pipe.zremrangebyscore(redis_key, 0, window_start)
        
        # Count current requests
        pipe.zcard(redis_key)
        
        # Add current request
        pipe.zadd(redis_key, {str(now): now})
        
        # Set expiry
        pipe.expire(redis_key, window_seconds)
        
        results = pipe.execute()
        current_count = results[1]
        
        allowed = current_count < max_requests
        remaining = max(0, max_requests - current_count - 1)
        
        return {
            'allowed': allowed,
            'remaining': remaining,
            'reset': int(now + window_seconds),
        }
    
    def token_bucket(self, key: str, capacity: int, 
                    refill_rate: float) -> dict:
        """Token bucket with Redis."""
        bucket_key = f"{self.key_prefix}:token:{key}"
        
        # Get current state
        tokens, last_refill = self.redis.hmget(bucket_key, 'tokens', 'last_refill')
        
        now = time.time()
        
        if tokens is None:
            # Initialize bucket
            tokens = float(capacity)
            last_refill = now
        else:
            tokens = float(tokens)
            last_refill = float(last_refill)
        
        # Calculate token refill
        elapsed = now - last_refill
        tokens = min(capacity, tokens + elapsed * refill_rate)
        
        # Check if request is allowed
        allowed = tokens >= 1
        
        if allowed:
            tokens -= 1
        
        # Save state
        self.redis.hset(bucket_key, mapping={
            'tokens': tokens,
            'last_refill': now
        })
        self.redis.expire(bucket_key, 3600)  # 1 hour expiry
        
        return {
            'allowed': allowed,
            'remaining': int(tokens),
            'retry_after': 0 if allowed else (1 - tokens) / refill_rate
        }

Implementation Examples

FastAPI Rate Limiter

from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
from datetime import datetime
import time

app = FastAPI()

# In-memory rate limiter
class RateLimiter:
    def __init__(self, requests: int, window: int):
        self.requests = requests
        self.window = window
        self.limiter = TokenBucketRateLimiter(requests, requests / window)
    
    async def __call__(self, request: Request):
        # Get client identifier
        client_id = self._get_client_id(request)
        
        if not self.limiter.is_allowed(client_id):
            raise HTTPException(
                status_code=429,
                detail="Too many requests",
                headers={
                    'Retry-After': str(int(self.limiter.wait_time(client_id))),
                    'X-RateLimit-Limit': str(self.requests),
                    'X-RateLimit-Remaining': '0',
                    'X-RateLimit-Reset': str(int(time.time() + self.window)),
                }
            )
        
        # Add rate limit headers to response
        response = await request._send_request()
        
        response.headers['X-RateLimit-Limit'] = str(self.requests)
        response.headers['X-RateLimit-Remaining'] = str(
            int(self.limiter.get_remaining(client_id))
        )
        
        return response
    
    def _get_client_id(self, request: Request) -> str:
        """Get client identifier from request."""
        # Try API key
        api_key = request.headers.get('X-API-Key')
        if api_key:
            return f"api_key:{api_key}"
        
        # Try JWT token
        auth = request.headers.get('Authorization')
        if auth:
            return f"auth:{auth}"
        
        # Fall back to IP
        return f"ip:{request.client.host}"


# Different limits for different endpoints
rate_limit_strict = RateLimiter(requests=10, window=60)   # 10/min
rate_limit_standard = RateLimiter(requests=100, window=60) # 100/min
rate_limit_search = RateLimiter(requests=30, window=60)    # 30/min


@app.get("/api/users")
@rate_limit_standard
async def get_users():
    return {"users": []}


@app.get("/api/search")
@rate_limit_search
async def search(query: str):
    return {"results": []}


@app.post("/api/data")
@rate_limit_strict  
async def create_data(data: dict):
    return {"id": 1}


# Custom rate limit decorator
from functools import wraps

def rate_limit(requests: int, window: int):
    """Custom rate limit decorator."""
    limiter = TokenBucketRateLimiter(requests, requests / window)
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get key from request
            # (simplified - would need to extract from args)
            key = "default"
            
            if not limiter.is_allowed(key):
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded",
                    headers={'Retry-After': str(int(limiter.wait_time(key)))}
                )
            
            return await func(*args, **kwargs)
        return wrapper
    return decorator

Express.js Rate Limiter

const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis');
const Redis = require('ioredis');

// Redis client
const redis = new Redis({
    host: 'localhost',
    port: 6379,
});

// Basic rate limiter
const limiter = rateLimit({
    windowMs: 15 * 60 * 1000, // 15 minutes
    max: 100, // limit each IP to 100 requests per windowMs
    message: 'Too many requests, please try again later',
    standardHeaders: true,
    legacyHeaders: false,
    handler: (req, res) => {
        res.status(429).json({
            error: 'Rate limit exceeded',
            retryAfter: res.getHeader('Retry-After')
        });
    }
});

// Custom limiter with Redis
const distributedLimiter = rateLimit({
    windowMs: 60 * 1000, // 1 minute
    max: 100,
    store: new RedisStore({
        prefix: 'rl:',
        sendCommand: (...args) => redis.call(...args),
    }),
    keyGenerator: (req) => {
        // Use API key if available
        return req.headers['x-api-key'] || req.ip;
    },
    skip: (req) => {
        // Skip rate limiting for health checks
        return req.path === '/health';
    }
});

// Different limits for different routes
const strictLimiter = rateLimit({
    windowMs: 60 * 1000,
    max: 10,
    message: 'Strict limit exceeded'
});

const uploadLimiter = rateLimit({
    windowMs: 60 * 1000,
    max: 5,
    message: 'Upload limit exceeded'
});

// Apply to routes
app.use('/api/', limiter);
app.use('/api/auth/login', strictLimiter);
app.use('/api/upload', uploadLimiter);

// Use with specific routes
app.get('/api/data', distributedLimiter, (req, res) => {
    res.json({ data: 'example' });
});

Rate Limiting Strategies

Tiered Rate Limiting

class TieredRateLimiter:
    """Rate limiter with different tiers."""
    
    TIERS = {
        'free': {'requests': 100, 'window': 3600},
        'basic': {'requests': 1000, 'window': 3600},
        'pro': {'requests': 10000, 'window': 3600},
        'enterprise': {'requests': 100000, 'window': 3600},
    }
    
    def __init__(self):
        self.limiters = {}
        
        for tier, config in self.TIERS.items():
            self.limiters[tier] = TokenBucketRateLimiter(
                config['requests'],
                config['requests'] / config['window']
            )
    
    def get_limiter(self, tier: str) -> TokenBucketRateLimiter:
        return self.limiters.get(tier, self.limiters['free'])
    
    def is_allowed(self, tier: str, key: str) -> bool:
        limiter = self.get_limiter(tier)
        return limiter.is_allowed(key)


class RateLimitService:
    """Service to determine user tier and apply limits."""
    
    def __init__(self, db, tiered_limiter: TieredRateLimiter):
        self.db = db
        self.limiter = tiered_limiter
    
    async def check_rate_limit(self, request) -> dict:
        """Check rate limit for request."""
        user = await self._get_user(request)
        tier = user.get('tier', 'free')
        
        limiter = self.limiter.get_limiter(tier)
        allowed = limiter.is_allowed(f"user:{user['id']}")
        
        tier_config = TieredRateLimiter.TIERS[tier]
        
        return {
            'allowed': allowed,
            'tier': tier,
            'limit': tier_config['requests'],
            'remaining': int(limiter.get_remaining(f"user:{user['id']}")),
        }
    
    async def _get_user(self, request) -> dict:
        # Get user from token
        return {'id': '123', 'tier': 'pro'}

IP-based Rate Limiting

class IPRateLimiter:
    """IP-based rate limiting with different tiers."""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def check(self, ip: str, endpoint: str) -> dict:
        """Check rate limit for IP."""
        # Different limits for different endpoints
        limits = {
            '/api/auth/login': (5, 60),      # 5 per minute
            '/api/auth/register': (3, 3600),  # 3 per hour
            '/api/search': (30, 60),          # 30 per minute
            '/api/': (100, 60),               # 100 per minute
        }
        
        # Get limit for endpoint
        limit, window = limits.get(endpoint, limits['/api/'])
        
        key = f"ratelimit:ip:{ip}:{endpoint}"
        
        # Use Redis sliding window
        current = self.redis.incr(key)
        if current == 1:
            self.redis.expire(key, window)
        
        allowed = current <= limit
        remaining = max(0, limit - current)
        
        return {
            'allowed': allowed,
            'remaining': remaining,
            'limit': limit,
            'window': window,
        }

Response Headers

def add_rate_limit_headers(response, limiter, key):
    """Add standard rate limit headers to response."""
    
    remaining = limiter.get_remaining(key)
    limit = limiter.max_requests if hasattr(limiter, 'max_requests') else 'unknown'
    reset = limiter.get_reset_time(key) if hasattr(limiter, 'get_reset_time') else None
    
    response.headers['X-RateLimit-Limit'] = str(limit)
    response.headers['X-RateLimit-Remaining'] = str(remaining)
    
    if reset:
        response.headers['X-RateLimit-Reset'] = str(int(reset.timestamp()))
    
    # If rate limited, add retry info
    if remaining == 0 and hasattr(limiter, 'wait_time'):
        wait = limiter.wait_time(key)
        response.headers['Retry-After'] = str(int(wait))
    
    return response

Best Practices

Configuration

Setting Recommendation
Limit Values Start conservative, adjust based on usage
Window Size Smaller windows = more responsive
Burst Allowance Allow some burst for UX
Headers Always include rate limit headers
Error Messages Clear, helpful error messages

Common Patterns

# Whitelist internal services
def should_skip_rate_limit(request):
    return (
        request.headers.get('X-Internal-Service') == 'true' or
        request.ip in INTERNAL_IPS
    )

# Progressive throttling
def get_rate_limit(request):
    user = get_user(request)
    
    if user.is_premium:
        return 1000, 60
    
    if user.is_verified:
        return 100, 60
    
    return 20, 60

# Graceful degradation
def rate_limit_fallback(request):
    # Log for analysis
    log_rate_limit_exceeded(request)
    
    # Return cached response if available
    cache_key = f"cached:{request.path}"
    cached = redis.get(cache_key)
    
    if cached:
        return cached
    
    raise RateLimitExceeded()

Monitoring and Analytics

import prometheus_client as prometheus

# Metrics
rate_limit_hits = prometheus.Counter(
    'rate_limit_hits_total',
    'Total rate limit hits',
    ['endpoint', 'tier', 'result']
)

rate_limit_remaining = prometheus.Gauge(
    'rate_limit_remaining',
    'Remaining requests',
    ['endpoint', 'tier']
)

def track_rate_limit(endpoint: str, tier: str, allowed: bool):
    """Track rate limit metrics."""
    rate_limit_hits.labels(
        endpoint=endpoint,
        tier=tier,
        result='allowed' if allowed else 'blocked'
    ).inc()

Conclusion

Rate limiting is essential for API protection and reliability. The choice of algorithm depends on your specific requirements: fixed window for simplicity, sliding window for accuracy, token bucket for burst handling, or leaky bucket for constant rate processing.

Key takeaways:

  • Choose the right algorithm for your use case
  • Use Redis for distributed rate limiting
  • Implement tiered limits based on user plans
  • Always include rate limit headers in responses
  • Monitor and adjust limits based on actual usage

Resources

Comments