
API Rate Limiting and Throttling: Complete Guide to Protecting Your Services

Introduction

Rate limiting is essential for protecting APIs from abuse, preventing service degradation, and ensuring fair usage across all clients. Without proper rate limiting, a single misbehaving client can consume all available resources, causing legitimate users to experience slowdowns or outages.

This guide covers the core rate limiting algorithms (token bucket, leaky bucket, sliding window, and fixed window), a Redis-based distributed implementation, the HTTP headers that communicate limits to clients, and gateway configuration for NGINX and Kong. You’ll learn when to use each strategy, how to implement it in production systems, and how clients should react when they are limited.

The goal is to build robust rate limiting that protects your services while providing a good experience for legitimate users. This means clearly communicating limits, returning helpful headers, and implementing graceful degradation when limits are exceeded.

Understanding Rate Limiting

Why Rate Limiting Matters

Rate limiting serves multiple critical purposes:

  • Prevents abuse: Stops malicious users from overwhelming your system
  • Ensures fairness: All clients get fair access to resources
  • Protects costs: Prevents unexpected billing spikes from runaway usage
  • Maintains stability: Keeps services responsive during traffic spikes
  • Enables planning: Helps you understand actual usage patterns

Rate Limiting Dimensions

| Dimension | Description |
|---|---|
| Requests per time | Number of requests allowed in a time window |
| Bandwidth | Data transfer limits per time period |
| Concurrent connections | Simultaneous connections from a client |
| Payload size | Maximum request/response size |
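
The concurrent-connections dimension differs from the others because it caps work in flight rather than work per unit of time. A minimal sketch of that idea, assuming a hypothetical `ConcurrencyLimiter` class (not part of any library) built on the standard library's `threading.BoundedSemaphore`:

```python
import threading


class ConcurrencyLimiter:
    """Hypothetical helper: caps the number of requests in flight at once."""

    def __init__(self, max_concurrent: int) -> None:
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def try_acquire(self) -> bool:
        """Take a slot without blocking; False means the caller is over the cap."""
        return self._slots.acquire(blocking=False)

    def release(self) -> None:
        """Give the slot back once the request has finished."""
        self._slots.release()


limiter = ConcurrencyLimiter(max_concurrent=2)
results = [limiter.try_acquire() for _ in range(3)]  # third caller is refused
limiter.release()                                    # one request finishes
after_release = limiter.try_acquire()                # a slot is free again
```

A caller that gets `False` would typically respond with 429 or 503 rather than wait, and every successful `try_acquire()` needs a matching `release()` (for example in a `finally` block).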

Rate Limiting Algorithms

Token Bucket

The token bucket algorithm is widely used for its flexibility. It allows for burst traffic while maintaining a long-term rate:

import time
from threading import Lock
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class TokenBucket:
    """Token bucket rate limiter with thread-safe implementation."""
    
    def __init__(self, capacity: int, refill_rate: float):
        """
        Initialize token bucket.
        
        Args:
            capacity: Maximum number of tokens in the bucket
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = time.time()
        self.lock = Lock()
        self._consumed = 0
        self._rejected = 0
    
    def _refill(self) -> None:
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_refill
        
        # Add tokens based on elapsed time
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now
    
    def allow_request(self, tokens: int = 1) -> bool:
        """
        Check if request is allowed.
        
        Args:
            tokens: Number of tokens to consume
            
        Returns:
            True if request is allowed, False otherwise
        """
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                self._consumed += 1
                return True
            
            self._rejected += 1
            return False
    
    def wait_for_token(self, tokens: int = 1, timeout: Optional[float] = None) -> bool:
        """
        Block until tokens are available.
        
        Args:
            tokens: Number of tokens needed
            timeout: Maximum time to wait (None = wait forever)
            
        Returns:
            True if tokens acquired, False if timeout
        """
        start = time.time()
        
        while True:
            if self.allow_request(tokens):
                return True
            
            if timeout is not None and (time.time() - start) >= timeout:
                return False
            
            # Wait before retrying
            time.sleep(0.01)
    
    @property
    def available_tokens(self) -> float:
        """Get current available tokens."""
        with self.lock:
            self._refill()
            return self.tokens
    
    def get_stats(self) -> dict:
        """Get rate limiter statistics."""
        with self.lock:
            total = self._consumed + self._rejected
            return {
                'consumed': self._consumed,
                'rejected': self._rejected,
                'available_tokens': self.tokens,
                'rejection_rate': self._rejected / total if total > 0 else 0
            }
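
To see the burst-then-steady behavior concretely, here is a stripped-down, single-threaded sketch of the same refill logic driven by a manual clock, so the outcome is deterministic. `FakeClock` and `SimpleTokenBucket` are hypothetical names used only for this illustration:

```python
class FakeClock:
    """Hypothetical manual clock so the example does not depend on real time."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class SimpleTokenBucket:
    """Single-threaded token bucket; `clock` is any zero-argument callable."""

    def __init__(self, capacity: int, refill_rate: float, clock) -> None:
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.tokens = float(capacity)
        self.last_refill = clock()

    def allow_request(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity
        refilled = self.tokens + (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, refilled)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


clock = FakeClock()
bucket = SimpleTokenBucket(capacity=5, refill_rate=1.0, clock=clock)

burst = [bucket.allow_request() for _ in range(7)]       # 5 allowed, 2 rejected
clock.advance(2.0)                                       # 2 seconds -> 2 new tokens
after_wait = [bucket.allow_request() for _ in range(3)]  # 2 allowed, 1 rejected
```

The capacity sets how large a burst can be, while the refill rate sets the long-term average. Injecting the clock is a testing convenience, not something the class above requires.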

Leaky Bucket

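The leaky bucket algorithm drains at a constant rate, smoothing out traffic bursts. The implementation below uses it as a meter: it tracks the bucket's fill level and rejects requests once the bucket is full: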

import time
from threading import Lock
from collections import deque


class LeakyBucket:
    """Leaky bucket rate limiter - processes requests at constant rate."""
    
    def __init__(self, capacity: int, leak_rate: float):
        """
        Initialize leaky bucket.
        
        Args:
            capacity: Maximum bucket size (requests that can be queued)
            leak_rate: Requests processed per second
        """
        self.capacity = capacity
        self.leak_rate = leak_rate
        self.level = 0.0
        self.last_leak = time.time()
        self.lock = Lock()
    
    def _leak(self) -> None:
        """Drain the bucket by the amount that leaked since the last call."""
        now = time.time()
        elapsed = now - self.last_leak
        
        # Calculate how much has "leaked" out
        leaked = elapsed * self.leak_rate
        self.level = max(0.0, self.level - leaked)
        self.last_leak = now
    
    def allow_request(self) -> bool:
        """Check if request can be processed."""
        with self.lock:
            self._leak()
            
            # The level is fractional, so make sure a whole request still fits
            if self.level + 1 <= self.capacity:
                self.level += 1
                return True
            
            return False
    
    def get_wait_time(self) -> float:
        """Get estimated wait until there is room for one more request."""
        with self.lock:
            self._leak()
            if self.leak_rate <= 0:
                return 0.0
            overflow = self.level + 1 - self.capacity
            return max(0.0, overflow / self.leak_rate)
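
The class above rejects requests when the bucket is full. The other common reading of the leaky bucket queues requests and releases them at a fixed interval. A rough sketch of that variant, assuming a hypothetical `leaky_bucket_schedule` helper that takes arrival times and reports when each request would be released:

```python
from typing import List, Optional


def leaky_bucket_schedule(
    arrivals: List[float], capacity: int, leak_rate: float
) -> List[Optional[float]]:
    """Return each request's release time, or None if the bucket was full."""
    interval = 1.0 / leak_rate          # seconds between releases
    pending: List[float] = []           # release times of requests still queued
    next_free = 0.0                     # earliest time the next release may happen
    departures: List[Optional[float]] = []

    for t in arrivals:
        # Forget requests that have already been released by time t
        pending = [d for d in pending if d > t]
        if len(pending) >= capacity:
            departures.append(None)     # bucket full: drop the request
            continue
        depart = max(t, next_free)
        next_free = depart + interval
        pending.append(depart)
        departures.append(depart)

    return departures


# Five requests arrive at once, a sixth arrives later; 2 releases per second
schedule = leaky_bucket_schedule(
    [0.0, 0.0, 0.0, 0.0, 0.0, 1.2], capacity=3, leak_rate=2.0
)
```

With these inputs the burst is spread out at 0.5-second intervals, the fifth request is dropped because three are already queued, and the late arrival is released after the queue drains.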

Sliding Window

The sliding window algorithm provides smoother rate limiting with no burst at window boundaries:

import time
from collections import deque
from threading import Lock


class SlidingWindow:
    """Sliding window rate limiter - precise rate limiting."""
    
    def __init__(self, max_requests: int, window_size: float):
        """
        Initialize sliding window.
        
        Args:
            max_requests: Maximum requests allowed in window
            window_size: Window size in seconds
        """
        self.max_requests = max_requests
        self.window_size = window_size
        self.requests = deque()
        self.lock = Lock()
    
    def _clean_old_requests(self, current_time: float) -> None:
        """Remove requests outside the current window."""
        cutoff_time = current_time - self.window_size
        
        while self.requests and self.requests[0] < cutoff_time:
            self.requests.popleft()
    
    def allow_request(self) -> bool:
        """Check if request is allowed."""
        current_time = time.time()
        
        with self.lock:
            self._clean_old_requests(current_time)
            
            if len(self.requests) < self.max_requests:
                self.requests.append(current_time)
                return True
            
            return False
    
    def get_remaining(self) -> int:
        """Get remaining requests in current window."""
        current_time = time.time()
        
        with self.lock:
            self._clean_old_requests(current_time)
            return self.max_requests - len(self.requests)
    
    def get_reset_time(self) -> float:
        """Get time until window resets."""
        current_time = time.time()
        
        with self.lock:
            self._clean_old_requests(current_time)
            
            if not self.requests:
                return 0
            
            oldest = self.requests[0]
            return (oldest + self.window_size) - current_time

Sliding Log Algorithm

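The sliding log applies the same timestamp-based approach per client. It is the most accurate option, but it stores one timestamp for every request in the window, so memory grows with both the limit and the number of clients: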

import time
from threading import Lock
from collections import defaultdict


class SlidingLogRateLimiter:
    """Sliding log rate limiter - most accurate but memory intensive."""
    
    def __init__(self, max_requests: int, window_size: float):
        self.max_requests = max_requests
        self.window_size = window_size
        self.logs = defaultdict(list)
        self.lock = Lock()
    
    def _clean_logs(self, client_id: str, current_time: float) -> None:
        """Remove old log entries."""
        cutoff = current_time - self.window_size
        logs = self.logs[client_id]
        
        # Remove old entries
        self.logs[client_id] = [t for t in logs if t > cutoff]
    
    def allow_request(self, client_id: str) -> bool:
        """Check if request is allowed for client."""
        current_time = time.time()
        
        with self.lock:
            self._clean_logs(client_id, current_time)
            
            if len(self.logs[client_id]) < self.max_requests:
                self.logs[client_id].append(current_time)
                return True
            
            return False
    
    def get_reset_time(self, client_id: str) -> float:
        """Get time until oldest request expires."""
        current_time = time.time()
        
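        with self.lock:
            # Use .get() so that asking about an unknown client does not
            # create an empty entry in the defaultdict
            logs = self.logs.get(client_id)
            if not logs:
                return 0.0
            
            # Entries may already have expired, so never report a negative wait
            oldest = min(logs)
            return max(0.0, (oldest + self.window_size) - current_time)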

Distributed Rate Limiting

Redis-Based Rate Limiter

import redis
import time
import logging
from threading import Lock  # used by FixedWindowCounter below
from typing import Optional

logger = logging.getLogger(__name__)


class RedisRateLimiter:
    """Distributed rate limiter using Redis."""
    
    def __init__(self, redis_client: redis.Redis, key_prefix: str = 'ratelimit'):
        self.redis = redis_client
        self.key_prefix = key_prefix
    
    def token_bucket(self, key: str, capacity: int, refill_rate: float) -> bool:
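        """
        Distributed token bucket using Redis.
        
        The refill-and-consume logic runs as a Lua script, which Redis
        executes atomically, so concurrent callers cannot interleave between
        the read and the write. The timestamp comes from the caller's clock,
        so application servers should keep their clocks in sync.
        """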
        now = time.time()
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        
        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1])
        local last_refill = tonumber(bucket[2])
        
        if tokens == nil then
            tokens = capacity
            last_refill = now
        end
        
        -- Refill tokens
        local elapsed = now - last_refill
        tokens = math.min(capacity, tokens + elapsed * refill_rate)
        
        -- Check if request allowed
        local allowed = 0
        if tokens >= 1 then
            tokens = tokens - 1
            allowed = 1
        end
        
        -- Persist state either way; HSET replaces the deprecated HMSET
        -- (setting several fields in one HSET call needs Redis 4.0+)
        redis.call('HSET', key, 'tokens', tokens, 'last_refill', now)
        redis.call('EXPIRE', key, math.ceil(capacity / refill_rate))
        return allowed
        """
        
        try:
            result = self.redis.eval(
                lua_script, 1,
                f"{self.key_prefix}:{key}",
                capacity, refill_rate, now
            )
            return bool(result)
        except redis.RedisError as e:
            logger.warning(f"Redis error, allowing request: {e}")
            return True  # Fail open on Redis errors
    
    def sliding_window(self, key: str, max_requests: int, window_seconds: int) -> bool:
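        """
        Distributed sliding window rate limiter backed by a sorted set.
        
        Note: the request is added inside the pipeline and removed afterwards
        if the limit was already reached, so the check is not fully atomic.
        A concurrent caller can briefly see the extra entry and be rejected.
        """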
        now = time.time()
        window_start = now - window_seconds
        redis_key = f"{self.key_prefix}:{key}"
        
        try:
            pipe = self.redis.pipeline()
            
            # Remove old entries
            pipe.zremrangebyscore(redis_key, 0, window_start)
            
            # Count current requests
            pipe.zcard(redis_key)
            
            # Add new request
            pipe.zadd(redis_key, {str(now): now})
            
            # Set expiration
            pipe.expire(redis_key, window_seconds)
            
            results = pipe.execute()
            current_count = results[1]
            
            if current_count < max_requests:
                return True
            
            # Too many requests, remove the one we just added
            self.redis.zrem(redis_key, str(now))
            return False
            
        except redis.RedisError as e:
            logger.warning(f"Redis error, allowing request: {e}")
            return True
    
    def fixed_window(self, key: str, max_requests: int, window_seconds: int) -> bool:
        """
        Simple fixed window counter using Redis INCR.
        """
        redis_key = f"{self.key_prefix}:{key}:{int(time.time() // window_seconds)}"
        
        try:
            count = self.redis.incr(redis_key)
            
            if count == 1:
                self.redis.expire(redis_key, window_seconds)
            
            return count <= max_requests
            
        except redis.RedisError as e:
            logger.warning(f"Redis error, allowing request: {e}")
            return True


class FixedWindowCounter:
    """Fixed window counter rate limiter."""
    
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counters = {}
        self.lock = Lock()
    
    def _get_window(self) -> int:
        """Get current window number."""
        return int(time.time() // self.window_seconds)
    
    def allow_request(self, key: str) -> bool:
        """Check if request is allowed."""
        window = self._get_window()
        
        with self.lock:
            counter_key = f"{key}:{window}"
            current = self.counters.get(counter_key, 0)
            
            if current < self.max_requests:
                self.counters[counter_key] = current + 1
                return True
            
            return False
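
Fixed windows are cheap, but they permit a burst of up to twice the limit around a window boundary. The self-contained sketch below replays that case with explicit timestamps; `fixed_window_allowed` is a hypothetical helper that mirrors the counting logic above without the locking:

```python
from typing import Dict, List


def fixed_window_allowed(
    timestamps: List[float], max_requests: int, window_seconds: int
) -> List[float]:
    """Return the timestamps a fixed window counter would let through."""
    counters: Dict[int, int] = {}
    allowed: List[float] = []
    for t in timestamps:
        window = int(t // window_seconds)
        if counters.get(window, 0) < max_requests:
            counters[window] = counters.get(window, 0) + 1
            allowed.append(t)
    return allowed


# 100 requests just before the boundary at t=60 and 100 just after it
timestamps = [59.9] * 100 + [60.1] * 100
allowed = fixed_window_allowed(timestamps, max_requests=100, window_seconds=60)
burst_size = len(allowed)
```

All 200 requests pass a "100 per minute" limit within 0.2 seconds, because each half falls into a different window. Sliding windows and token buckets avoid this edge case.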

HTTP Headers for Rate Limiting

Standard Rate Limit Headers

from dataclasses import dataclass
from typing import Optional


@dataclass
class RateLimitInfo:
    """Rate limit information for HTTP headers."""
    limit: int          # Maximum requests allowed
    remaining: int      # Remaining requests in window
    reset: int          # Unix timestamp when window resets
    retry_after: Optional[int] = None  # Seconds to wait (when limited)


def build_rate_limit_headers(info: RateLimitInfo) -> dict:
    """Build rate limit headers."""
    headers = {
        'X-RateLimit-Limit': str(info.limit),
        'X-RateLimit-Remaining': str(info.remaining),
        'X-RateLimit-Reset': str(info.reset)
    }
    
    if info.retry_after is not None:
        headers['Retry-After'] = str(info.retry_after)
    
    return headers


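def build_429_response(info: RateLimitInfo) -> tuple:
    """Build a 429 Too Many Requests response as a Flask (body, status, headers) tuple."""
    import time
    from flask import jsonify
    
    # Fall back to the time left until reset. Comparing against None keeps an
    # explicit retry_after of 0, and max() avoids reporting a negative wait.
    retry_after = info.retry_after
    if retry_after is None:
        retry_after = max(0, info.reset - int(time.time()))
    
    return jsonify({
        'error': 'rate_limit_exceeded',
        'message': 'Too many requests',
        'retry_after': retry_after
    }), 429, {
        'X-RateLimit-Limit': str(info.limit),
        'X-RateLimit-Remaining': '0',
        'X-RateLimit-Reset': str(info.reset),
        'Retry-After': str(retry_after)
    }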
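
The header values have to come from the limiter's state. As one possible sketch, the function below derives them for a fixed window counter, assuming `count` is the value after the current request was counted (as with Redis INCR). `fixed_window_headers` is a hypothetical helper, not part of Flask or any other library:

```python
import math
from typing import Dict


def fixed_window_headers(
    count: int, limit: int, window_seconds: int, now: float
) -> Dict[str, str]:
    """Derive rate limit headers from a fixed window counter's state."""
    # The window resets at the start of the next fixed window
    reset = (int(now // window_seconds) + 1) * window_seconds
    headers = {
        'X-RateLimit-Limit': str(limit),
        'X-RateLimit-Remaining': str(max(0, limit - count)),
        'X-RateLimit-Reset': str(reset),
    }
    if count > limit:
        # Tell the client how long to wait, rounded up to a whole second
        headers['Retry-After'] = str(max(1, math.ceil(reset - now)))
    return headers


within_limit = fixed_window_headers(count=40, limit=100, window_seconds=60, now=125.0)
over_limit = fixed_window_headers(count=101, limit=100, window_seconds=60, now=125.0)
```

At `now=125.0` the current 60-second window ends at 180, so a client that is over the limit is told to retry after 55 seconds.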

API Gateway Integration

NGINX Rate Limiting

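http {
    # Define rate limit zones (10 MB of shared memory each, keyed by client IP).
    # The zone size and rate belong here, not on limit_req.
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
    limit_req_zone $binary_remote_addr zone=premium_limit:10m rate=100r/s;
    
    # Return 429 instead of the default 503 when a limit is hit
    limit_req_status 429;
    limit_conn_status 429;
    
    server {
        location /api/ {
            # Allow bursts of up to 10 extra requests, served without delay
            limit_req zone=api_limit burst=10 nodelay;
            
            # Alternative (nginx 1.15.7+): serve the first 10 excess requests
            # immediately and delay the rest of a 20-request burst
            # limit_req zone=api_limit burst=20 delay=10;
            
            proxy_pass http://backend;
        }
        
        # Premium tier - higher limits
        location /api/premium/ {
            limit_req zone=premium_limit burst=50;
            proxy_pass http://backend;
        }
    }
}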

Kong API Gateway

# Kong rate limiting plugin
services:
  - name: my-api
    url: http://backend:8080
    plugins:
      - name: rate-limiting
        config:
          minute: 100
          hour: 1000
          policy: redis
          redis_host: redis
          redis_port: 6379
          fault_tolerant: true
          hide_client_headers: false
          
      - name: rate-limiting
        consumer: premium-user  # Apply to a specific consumer, referenced by username
        config:
          minute: 10
          hour: 100
          policy: local
          fault_tolerant: true

Best Practices

| Practice | Implementation |
|---|---|
| Use standard headers | X-RateLimit-* headers help clients |
| Implement retry logic | Exponential backoff on 429 |
| Use tiered limits | Different limits for different users |
| Fail gracefully | Don’t expose internal details |
| Monitor limits | Track near-limit scenarios |
| Consider cost | Rate limiting costs resources too |
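
Tiered limits and near-limit monitoring can start out as a small lookup table. The sketch below is only an illustration: the tier names, the numbers, and the `limit_for` and `is_near_limit` helpers are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TierLimit:
    requests_per_minute: int
    burst: int


# Hypothetical tiers and numbers, for illustration only
TIER_LIMITS = {
    "free": TierLimit(requests_per_minute=60, burst=10),
    "pro": TierLimit(requests_per_minute=600, burst=50),
}
DEFAULT_TIER = "free"


def limit_for(tier: str) -> TierLimit:
    """Return a tier's limit, falling back to the most restrictive tier."""
    return TIER_LIMITS.get(tier, TIER_LIMITS[DEFAULT_TIER])


def is_near_limit(used: int, limit: TierLimit, threshold: float = 0.8) -> bool:
    """Flag clients that have used at least `threshold` of their quota."""
    return used >= threshold * limit.requests_per_minute


pro = limit_for("pro")
unknown = limit_for("enterprise-trial")  # unknown tier -> default limits
```

Falling back to the most restrictive tier means a misconfigured or unknown client never receives more capacity than intended.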

Client Implementation

Python Retry Logic

import time
import requests
from typing import Optional
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class RateLimitAwareSession(requests.Session):
    """Session that handles rate limiting automatically."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
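        # Retry transient server errors with exponential backoff. 429 is
        # handled in request() below, so it is left out of status_forcelist
        # and urllib3's own Retry-After handling is switched off. POST is not
        # retried here because repeating a non-idempotent request can
        # duplicate side effects.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"],
            respect_retry_after_header=False
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("http://", adapter)
        self.mount("https://", adapter)
    
    def request(self, method, url, **kwargs):
        """Make a request; on a 429, wait as instructed and retry once."""
        response = super().request(method, url, **kwargs)
        
        # Handle 429 responses
        if response.status_code == 429:
            retry_after = response.headers.get('Retry-After', '')
            reset_time = response.headers.get('X-RateLimit-Reset', '')
            
            if retry_after.isdigit():
                # Retry-After given as a number of seconds
                wait_time = int(retry_after)
            elif reset_time.isdigit():
                # Fall back to the reset time (a Unix timestamp)
                wait_time = max(0, int(reset_time) - int(time.time()))
            else:
                wait_time = 60  # No usable header; conservative default
            
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
            
            # Retry request once
            response = super().request(method, url, **kwargs)
        
        return response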
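
Two details matter when a client decides how long to wait. Retry-After may carry either a number of seconds or an HTTP date, and when the server gives no hint at all, exponential backoff with random jitter keeps many clients from retrying in lockstep. The sketch below uses only the standard library; `parse_retry_after` and `backoff_delay` are hypothetical helper names, and the "full jitter" variant shown is one common choice among several:

```python
import random
from datetime import timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: str, now: float) -> Optional[float]:
    """Return the wait in seconds for a Retry-After value, or None if unparseable."""
    value = value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        # Treat dates without a usable offset as UTC
        when = when.replace(tzinfo=timezone.utc)
    return max(0.0, when.timestamp() - now)


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full-jitter backoff: uniform in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    return rng.uniform(0.0, min(cap, base * (2 ** attempt)))


seconds_form = parse_retry_after("120", now=0.0)
# 1445412420 is 2015-10-21 07:27:00 UTC, one minute before the date below
date_form = parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=1445412420.0)
delays = [backoff_delay(attempt, rng=random.Random(attempt)) for attempt in range(8)]
```

A client would use the parsed Retry-After value when one is present and fall back to `backoff_delay(attempt)` otherwise.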

Conclusion

Rate limiting is essential for building resilient, scalable APIs. By implementing proper rate limiting strategies, you protect your services from abuse while providing fair access to legitimate users.

Key takeaways:

  1. Choose the right algorithm - Token bucket for burst tolerance, leaky bucket for smooth processing
  2. Use distributed systems - Redis-based limiting for multi-instance deployments
  3. Communicate clearly - Use standard HTTP headers to inform clients
  4. Implement retry logic - Help clients handle rate limits gracefully
  5. Monitor and tune - Adjust limits based on actual usage patterns
  6. Consider tiered access - Different limits for different customer tiers

By following these patterns and practices, you’ll build APIs that are both protected and user-friendly.
