Backpressure and Flow Control: Managing System Load Gracefully

Created: February 28, 2026 Larry Qu 11 min read

Backpressure is the mechanism by which a system resists accepting more work than it can handle. When downstream components can’t keep up, the pressure propagates upstream and slows producers down, which prevents cascading failures and keeps the system stable.
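
As a minimal sketch of the idea (the function names below are illustrative, not taken from any library), a bounded asyncio.Queue already provides backpressure: put() suspends the producer whenever the slower consumer has let the queue fill up.

```python
import asyncio

async def producer(queue: asyncio.Queue, n_items: int, depths: list) -> None:
    for i in range(n_items):
        # put() suspends while the queue is full: this is the backpressure.
        await queue.put(i)
        depths.append(queue.qsize())
    await queue.put(None)  # sentinel: no more items

async def consumer(queue: asyncio.Queue, delay: float, out: list) -> None:
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(delay)  # simulate slow downstream work
        out.append(item)

async def run_demo(n_items: int = 20, max_size: int = 5) -> tuple:
    queue = asyncio.Queue(maxsize=max_size)
    depths, processed = [], []
    await asyncio.gather(
        producer(queue, n_items, depths),
        consumer(queue, 0.001, processed),
    )
    return max(depths), processed

max_depth, processed = asyncio.run(run_demo())
print(f"deepest queue: {max_depth}, items processed: {len(processed)}")
```

However fast the producer is, the queue depth never exceeds max_size, and no item is lost.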

Understanding Backpressure

The Problem Without Backpressure

┌─────────────────────────────────────────────────────────────────┐
│              System Collapse Without Backpressure                   │
│                                                                 │
│  Producer ──► [Queue A] ──► [Worker 1] ──► [Worker 2]         │
│                  │                  │                  │         │
│   Fast!          ▼                  ▼                  ▼         │
│             Queue fills up     Queue fills up    Queue fills    │
│                  │                  │                  │         │
│                  ▼                  ▼                  ▼         │
│             Memory Exhaust   Memory Exhaust    Memory Exhaust   │
│                  │                  │                  │         │
│                  └──────────────────┴──────────────────┘        │
│                                    │                            │
│                                    ▼                            │
│                           ┌─────────────────┐                   │
│                           │   SYSTEM CRASH  │                   │
│                           │   OutOfMemory   │                   │
│                           └─────────────────┘                   │
└─────────────────────────────────────────────────────────────────┘

With Backpressure

┌─────────────────────────────────────────────────────────────────┐
│              Graceful Backpressure Handling                        │
│                                                                 │
│  Producer ──► [Queue A] ──► [Worker 1] ──► [Worker 2]         │
│                  │                  │                  │         │
│   Senses        ▼                  ▼                  ▼         │
│   overload      Queue fills       Workers busy       Workers   │
│                  │                  │                  │         │
│                  ▼                  ▼                  ▼         │
│  ◄──────── Backpressure signal sent upstream! ──────────►     │
│                  │                  │                  │         │
│  Producer       ▼                  ▼                  ▼         │
│  Slows down!   ✓ Stable          ✓ Stable           ✓ Stable │
│                                                                 │
│  Results:                                                       │
│  ✓ No memory exhaustion                                         │
│  ✓ System remains stable                                        │
│  ✓ Graceful degradation                                        │
└─────────────────────────────────────────────────────────────────┘

Backpressure Strategies

Drop Strategy

import asyncio
import logging
import random
import time
from enum import Enum
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)  # shared by the examples in this article

class DropPolicy(Enum):
    DROP_TAIL = "drop_tail"           # Drop the newest queued item
    DROP_HEAD = "drop_head"           # Drop the oldest queued item
    DROP_RANDOM = "drop_random"       # Drop a random queued item
    DROP_PRIORITY = "drop_priority"   # Drop the lowest-priority queued item

class DropStrategy:
    def __init__(self, policy: DropPolicy = DropPolicy.DROP_TAIL):
        self.policy = policy

    def drop(self, queue: list) -> Optional[Any]:
        """Remove and return one item from the queue according to the policy."""
        if not queue:
            return None

        if self.policy == DropPolicy.DROP_TAIL:
            return queue.pop()
        if self.policy == DropPolicy.DROP_HEAD:
            return queue.pop(0)
        if self.policy == DropPolicy.DROP_RANDOM:
            return queue.pop(random.randrange(len(queue)))
        if self.policy == DropPolicy.DROP_PRIORITY:
            return self._drop_lowest_priority(queue)
        return None

    def _drop_lowest_priority(self, queue: list) -> Any:
        # Assumes a larger "priority" value means a more important item, so the
        # smallest value is evicted. The item is removed, not merely located.
        index = min(range(len(queue)), key=lambda i: queue[i].get("priority", 0))
        return queue.pop(index)


class BoundedQueueWithDrop:
    def __init__(self, max_size: int, drop_policy: DropPolicy = DropPolicy.DROP_TAIL):
        self.max_size = max_size
        self.queue = []
        self.drop_strategy = DropStrategy(drop_policy)
        self._lock = asyncio.Lock()
        self.metrics = {"enqueued": 0, "dropped": 0, "processed": 0}
    
    async def enqueue(self, item: dict) -> bool:
        async with self._lock:
            if len(self.queue) >= self.max_size:
                # Make room by evicting one queued item per the drop policy.
                dropped = self.drop_strategy.drop(self.queue)
                if dropped is not None:
                    self.metrics["dropped"] += 1
                    await self._on_item_dropped(dropped)

            self.queue.append(item)
            self.metrics["enqueued"] += 1
            return True
    
    async def dequeue(self) -> Optional[dict]:
        async with self._lock:
            if self.queue:
                item = self.queue.pop(0)
                self.metrics["processed"] += 1
                return item
        return None
    
    async def _on_item_dropped(self, item: dict):
        pass
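
For the drop-oldest policy specifically, the standard library offers a shortcut: a collections.deque created with maxlen discards items from the opposite end when a new one is appended. The sketch below is an alternative to the class above rather than part of it, and keep_latest is a hypothetical helper name.

```python
from collections import deque

def keep_latest(items, max_size: int):
    """Feed items through a bounded buffer that evicts the oldest entry."""
    buffer = deque(maxlen=max_size)
    dropped = 0
    for item in items:
        if len(buffer) == buffer.maxlen:
            dropped += 1          # append() below will evict the oldest item
        buffer.append(item)
    return list(buffer), dropped

kept, dropped = keep_latest(range(10), max_size=3)
print(kept, dropped)  # [7, 8, 9] 7
```

This suits telemetry-style data where only the most recent values matter; it is a poor fit when every item must eventually be processed.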

Block Strategy

class BlockingBackpressure:
    """Block producers when queue is full."""
    
    def __init__(self, max_size: int, block_timeout: float = 30.0):
        self.max_size = max_size
        self.queue = asyncio.Queue(maxsize=max_size)
        self.block_timeout = block_timeout
    
    async def enqueue(self, item: dict) -> bool:
        try:
            await asyncio.wait_for(
                self.queue.put(item),
                timeout=self.block_timeout
            )
            return True
        except asyncio.TimeoutError:
            logger.warning(f"Backpressure: Blocked for {self.block_timeout}s")
            return False
    
    async def dequeue(self) -> Optional[dict]:
        return await self.queue.get()


class TokenBucketWithBlock:
    """Token bucket that optionally blocks until enough tokens are available."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate              # tokens added per second
        self.capacity = capacity      # maximum burst size
        self.tokens = float(capacity)
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1, block: bool = True) -> bool:
        if tokens > self.capacity:
            # The bucket can never hold this many tokens; waiting would never end.
            raise ValueError("requested tokens exceed bucket capacity")

        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                if not block:
                    return False
                wait_time = (tokens - self.tokens) / self.rate

            # Sleep outside the lock so non-blocking callers are not held up.
            await asyncio.sleep(wait_time)

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now
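
The refill and wait arithmetic of the token bucket can be checked by hand. The two pure functions below restate those formulas without a clock or a lock; the names refill and wait_time are illustrative only.

```python
def refill(tokens: float, capacity: float, rate: float, elapsed: float) -> float:
    """Tokens available after `elapsed` seconds, capped at capacity."""
    return min(capacity, tokens + elapsed * rate)

def wait_time(needed: float, tokens: float, rate: float) -> float:
    """Seconds until `needed` tokens are available (0 if they already are)."""
    return max(0.0, (needed - tokens) / rate)

# A bucket refilled at 5 tokens/s currently holds 2 tokens; the caller needs 4.
delay = wait_time(needed=4, tokens=2, rate=5)
print(delay)                                                   # 0.4
print(refill(tokens=2, capacity=10.0, rate=5, elapsed=delay))  # 4.0
```

After a long idle period the bucket stops at its capacity, which is what bounds the size of a burst.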

Signal Strategy

class BackpressureSignal:
    """Signal backpressure status to producers."""
    
    def __init__(self, queue: asyncio.Queue, high_watermark: float = 0.8, low_watermark: float = 0.4):
        self.queue = queue
        self.high_watermark = high_watermark
        self.low_watermark = low_watermark
        self.is_pressured = False
        self._callbacks = []
    
    def register_callback(self, callback: Callable):
        self._callbacks.append(callback)
    
    async def check(self):
        if self.queue.maxsize <= 0:
            return  # Unbounded queue: there is no fill ratio to measure

        fill_ratio = self.queue.qsize() / self.queue.maxsize

        was_pressured = self.is_pressured
        if was_pressured:
            # Hysteresis: stay pressured until the queue drains to the low watermark.
            self.is_pressured = fill_ratio > self.low_watermark
        else:
            self.is_pressured = fill_ratio >= self.high_watermark

        if self.is_pressured != was_pressured:
            for cb in self._callbacks:
                await cb(backpressure=self.is_pressured)
    
    def should_backoff(self) -> bool:
        return self.is_pressured


class ProducerWithBackpressure:
    """Producer that responds to backpressure signals."""
    
    def __init__(self, signal: BackpressureSignal, max_rate: float = 100):
        self.signal = signal
        self.max_rate = max_rate          # items per second
        self.production_rate = max_rate
        self.min_rate = 1

    async def produce(self, item_generator):
        # Iterate with a for loop: calling next() directly would leak
        # StopIteration into this async generator and raise RuntimeError.
        for item in item_generator:
            if self.signal.should_backoff():
                self.production_rate = max(
                    self.min_rate,
                    self.production_rate * 0.9  # Reduce by 10%
                )
            else:
                self.production_rate = min(self.max_rate, self.production_rate * 1.1)

            await asyncio.sleep(1.0 / self.production_rate)
            yield item
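
The signal class takes both a high and a low watermark. One common way to use such a pair is hysteresis: pressure switches on at the high mark and only switches off once the queue has drained to the low mark, so the signal does not flap around a single threshold. A minimal sketch of that state transition, with next_state and trace as illustrative names:

```python
def next_state(pressured: bool, fill_ratio: float,
               high: float = 0.8, low: float = 0.4) -> bool:
    """Return the new pressure state for the observed fill ratio."""
    if pressured:
        return fill_ratio > low    # release only at or below the low watermark
    return fill_ratio >= high      # engage at or above the high watermark

def trace(ratios):
    state, states = False, []
    for ratio in ratios:
        state = next_state(state, ratio)
        states.append(state)
    return states

print(trace([0.5, 0.85, 0.7, 0.5, 0.39, 0.6]))
# [False, True, True, True, False, False]
```

Note that 0.7 and 0.5 keep the signal active even though both are below the high watermark; that gap is what gives consumers time to catch up.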

Implementing Flow Control

TCP-Style Window Control

class SlidingWindowFlowControl:
    """TCP-like sliding window for flow control."""
    
    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.in_flight = 0
        self._lock = asyncio.Lock()
        self._semaphore = asyncio.Semaphore(window_size)
    
    async def send(self, item: dict) -> bool:
        async with self._semaphore:
            async with self._lock:
                self.in_flight += 1
            
            try:
                result = await self._process(item)
                return result
            finally:
                async with self._lock:
                    self.in_flight -= 1
    
    async def _process(self, item: dict) -> bool:
        await asyncio.sleep(0.01)  # Simulate processing
        return True
    
    def get_window_status(self) -> dict:
        return {
            "window_size": self.window_size,
            "in_flight": self.in_flight,
            "available": self.window_size - self.in_flight,
            "utilization": self.in_flight / self.window_size
        }


class AdaptiveWindowController:
    """Adjusts window size based on performance."""
    
    def __init__(self, initial_window: int = 10, min_window: int = 1, max_window: int = 100):
        self.window = initial_window
        self.min_window = min_window
        self.max_window = max_window
        self.rtt_history = []
        self.error_history = []
    
    async def record_success(self, rtt: float):
        self.rtt_history.append(rtt)
        if len(self.rtt_history) > 100:
            self.rtt_history.pop(0)
        
        avg_rtt = sum(self.rtt_history) / len(self.rtt_history)
        
        if avg_rtt < 0.1:  # Fast response
            self.window = min(self.max_window, self.window + 1)
        elif avg_rtt > 0.5:  # Slow response
            self.window = max(self.min_window, self.window - 1)
    
    async def record_error(self):
        now = time.time()
        self.error_history.append(now)
        # Keep only errors from the last 60 seconds.
        self.error_history = [t for t in self.error_history if now - t < 60]

        error_rate = len(self.error_history) / 60  # errors per second

        if error_rate > 0.1:  # More than 6 errors in the last minute
            self.window = max(self.min_window, self.window // 2)
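
The controller above grows the window by one and halves it under errors, which is the additive-increase, multiplicative-decrease (AIMD) shape used by TCP congestion control. The simplified sketch below halves on every failure instead of applying an error-rate threshold, purely to make the trajectory easy to follow; aimd is a hypothetical helper.

```python
def aimd(window: int, outcomes, min_window: int = 1, max_window: int = 100):
    """Return the window size after each outcome (True = success)."""
    history = []
    for ok in outcomes:
        if ok:
            window = min(max_window, window + 1)    # additive increase
        else:
            window = max(min_window, window // 2)   # multiplicative decrease
        history.append(window)
    return history

print(aimd(10, [True, True, False, True, False, False]))
# [11, 12, 6, 7, 3, 1]
```

Growth is slow and cuts are sharp, so the window backs off quickly when the downstream struggles and probes for capacity cautiously afterwards.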

Producer-Consumer with Backpressure

class BackpressureProcessor:
    def __init__(self, queue_size: int = 1000, num_workers: int = 4):
        # Construct inside a running event loop: create_task() requires one.
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.backpressure = BackpressureSignal(self.queue)
        self._running = True  # set before the workers that read it are created
        self.workers = [asyncio.create_task(self._worker(i)) for i in range(num_workers)]
    
    async def _worker(self, worker_id: int):
        while self._running:
            try:
                item = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
                
                await self._process_item(item)
                self.queue.task_done()
                
            except asyncio.TimeoutError:
                continue
    
    async def _process_item(self, item: dict):
        await asyncio.sleep(0.01)
    
    async def submit(self, item: dict) -> bool:
        await self.backpressure.check()
        
        if self.backpressure.should_backoff():
            logger.warning("Backpressure active, rejecting item")
            return False
        
        try:
            await asyncio.wait_for(
                self.queue.put(item),
                timeout=5.0
            )
            return True
        except asyncio.TimeoutError:
            logger.error("Queue full, could not enqueue")
            return False
    
    async def shutdown(self):
        self._running = False
        await asyncio.gather(*self.workers, return_exceptions=True)

HTTP Flow Control Middleware

from aiohttp import web

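class FlowControlMiddleware:
    def __init__(self, max_concurrent: int = 100, queue_size: int = 500):
        self.max_concurrent = max_concurrent
        self.queue_size = queue_size      # max requests allowed to wait for a slot
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_requests = 0
        self.queued_requests = 0
        self.rejected_requests = 0

    @web.middleware
    async def handle(self, request: web.Request, handler):
        if self.semaphore.locked() and self.queued_requests >= self.queue_size:
            # Every slot is busy and the wait line is full: reject instead of piling up.
            self.rejected_requests += 1
            return web.Response(status=503, headers={"Retry-After": "1"}, text="Server busy")

        self.queued_requests += 1
        try:
            await self.semaphore.acquire()
        finally:
            self.queued_requests -= 1

        self.active_requests += 1
        try:
            return await handler(request)
        finally:
            self.active_requests -= 1
            self.semaphore.release()

    def get_status(self) -> dict:
        return {
            "active": self.active_requests,
            "queued": self.queued_requests,
            "rejected": self.rejected_requests,
            "utilization": self.active_requests / self.max_concurrent
        }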


class RateLimitFlowControl:
    def __init__(self, rate: int, burst: int):
        self.tokens = burst
        self.rate = rate
        self.max_tokens = burst
        self.last_check = time.time()
        self._lock = asyncio.Lock()
    
    async def acquire(self) -> bool:
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_check
            self.tokens = min(self.max_tokens, self.tokens + elapsed * self.rate)
            self.last_check = now
            
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False
    
    @web.middleware
    async def handle(self, request: web.Request, handler):
        if await self.acquire():
            return await handler(request)
        
        return web.Response(
            status=429,
            headers={"Retry-After": "1"},
            text="Rate limit exceeded"
        )
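
A 429 with Retry-After only helps if clients respect it. The sketch below shows one way a client might pick its retry delay: exponential backoff with full jitter, never retrying sooner than the server asked. The function name, the base and cap defaults, and the injectable rng parameter are assumptions made for illustration.

```python
import random
from typing import Callable, Optional

def retry_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 0.5,
    cap: float = 30.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    # Full jitter: a random point between 0 and min(cap, base * 2**attempt).
    backoff = rng() * min(cap, base * (2 ** attempt))

    if retry_after is not None:
        try:
            # Honour a Retry-After value given in whole seconds.
            return max(float(int(retry_after)), backoff)
        except ValueError:
            pass  # e.g. an HTTP-date value; this sketch falls back to backoff
    return backoff

for attempt in range(4):
    print(attempt, round(retry_delay(attempt, retry_after="1"), 2))
```

The jitter spreads retries out so that clients rejected at the same moment do not all return at the same moment.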

Load Shedding

class LoadShedder:
    """Drop requests when system is overloaded."""
    
    def __init__(self, cpu_threshold: float = 0.9, memory_threshold: float = 0.9):
        self.cpu_threshold = cpu_threshold
        self.memory_threshold = memory_threshold
        self.shed_count = 0
        self.total_count = 0
    
    def should_shed(self) -> bool:
        import psutil
        self.total_count += 1
        
        cpu = psutil.cpu_percent()
        memory = psutil.virtual_memory().percent
        
        if cpu > self.cpu_threshold * 100 or memory > self.memory_threshold * 100:
            self.shed_count += 1
            return True
        
        return False
    
    def get_shed_rate(self) -> float:
        if self.total_count == 0:
            return 0
        return self.shed_count / self.total_count


class PriorityLoadShedder:
    """Shed lower priority requests first."""
    
    PRIORITY_MAP = {
        "critical": 0,
        "high": 1,
        "normal": 2,
        "low": 3,
        "batch": 4
    }
    
    def __init__(self, max_load: float = 0.8):
        self.max_load = max_load
        self.current_load = 0
    
    def calculate_priority(self, request: web.Request) -> int:
        priority_header = request.headers.get("X-Request-Priority", "normal")
        return self.PRIORITY_MAP.get(priority_header, 2)
    
    @web.middleware
    async def handle(self, request: web.Request, handler):
        import psutil
        current_load = psutil.cpu_percent() / 100
        
        if current_load > self.max_load:
            priority = self.calculate_priority(request)
            
            if priority > 1:  # Shed low/normal priority
                return web.Response(
                    status=503,
                    headers={"Retry-After": "30"},
                    text="Service overloaded"
                )
        
        return await handler(request)


class CircuitBreakerLoadShedder:
    """Open circuit when overloaded and shed all requests."""
    
    def __init__(self, failure_threshold: int = 50, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.circuit_open = False
        self.last_failure_time = None
    
    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.failures >= self.failure_threshold:
            self.circuit_open = True
    
    def record_success(self):
        self.failures = 0
        self.circuit_open = False
    
    @web.middleware
    async def handle(self, request: web.Request, handler):
        if self.circuit_open:
            if time.time() - self.last_failure_time > self.timeout:
                self.circuit_open = False
                self.failures = 0
            else:
                return web.Response(
                    status=503,
                    text="Service temporarily unavailable"
                )
        
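        try:
            response = await handler(request)
            self.record_success()
            return response
        except web.HTTPException:
            # aiohttp raises these for ordinary HTTP responses (404, redirects);
            # they are not counted as evidence of overload here.
            raise
        except Exception:
            self.record_failure()
            raise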

Monitoring Backpressure

class BackpressureMonitor:
    def __init__(self):
        self.metrics = {
            "queue_depth": [],
            "drop_rate": [],
            "backpressure_active": [],
            "response_time": []
        }
    
    async def record_queue_depth(self, depth: int, max_depth: int):
        self.metrics["queue_depth"].append({
            "depth": depth,
            "max": max_depth,
            "ratio": depth / max_depth,
            "timestamp": time.time()
        })
    
    async def record_drop(self, count: int, total: int):
        self.metrics["drop_rate"].append({
            "dropped": count,
            "total": total,
            "rate": count / total if total > 0 else 0,
            "timestamp": time.time()
        })
    
    async def record_response_time(self, duration: float):
        self.metrics["response_time"].append({
            "duration": duration,
            "timestamp": time.time()
        })
    
    def get_health_status(self) -> dict:
        recent_queue = self.metrics["queue_depth"][-10:]
        recent_drops = self.metrics["drop_rate"][-10:]
        
        avg_queue_ratio = sum(m["ratio"] for m in recent_queue) / len(recent_queue) if recent_queue else 0
        avg_drop_rate = sum(m["rate"] for m in recent_drops) / len(recent_drops) if recent_drops else 0
        
        if avg_drop_rate > 0.1:
            status = "critical"
        elif avg_queue_ratio > 0.8:
            status = "degraded"
        else:
            status = "healthy"
        
        return {
            "status": status,
            "avg_queue_ratio": avg_queue_ratio,
            "avg_drop_rate": avg_drop_rate,
            "backpressure_active": avg_queue_ratio > 0.7
        }

Best Practices

GOOD_PATTERNS = {
    "detect_early": """
# Monitor queue depth and response times early

✅ Good:
- Set alerts at 50% queue capacity
- Monitor for increasing latency
- Track error rates

❌ Bad:
- Only react when queue is full
- Wait for failures to detect overload
""",
    
    "graceful_degradation": """
# Shed load gracefully instead of crashing

✅ Good:
- Return degraded responses
- Prioritize critical requests
- Provide useful error messages

❌ Bad:
- Accept all requests then crash
- Drop connections without response
""",
    
    "multiple_strategies": """
# Use combination of strategies

✅ Good:
- Backpressure at application level
- Load shedding at gateway level
- Circuit breakers for downstream

❌ Bad:
- Rely on single mechanism
- No defense in depth
"""
}

BAD_PATTERNS = {
    "infinite_queue": """
❌ Bad:
queue = asyncio.Queue()  # No maxsize!

# Queue grows until OOM crash

✅ Good:
queue = asyncio.Queue(maxsize=10000)

# Bounded queue with backpressure
""",
    
    "no_monitoring": """
❌ Bad:
# No visibility into backpressure

async def process():
    await queue.put(item)  # Could block forever!

✅ Good:
# Monitor queue depth
if queue.qsize() > queue.maxsize * 0.8:
    alert("Approaching backpressure")
""",
    
    "ignore_feedback": """
❌ Bad:
# Don't respond to backpressure signals

producer.send(item)  # Always send!

✅ Good:
# Respect backpressure
if should_backoff():
    await sleep(delay)
producer.send(item)
"""
}

Reactive Streams and Backpressure

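The Reactive Streams specification standardizes backpressure for async data pipelines. Libraries such as Project Reactor, Akka Streams, and RxJava implement this protocol. RxPY does not ship built-in backpressure operators in its 3.x releases, so the sketch below layers a simple demand counter on top of an RxPY Subject: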

from rx.subject import Subject  # RxPY 3.x; in RxPY 4 the package is `reactivex`

class ReactiveBackpressureStream:
    def __init__(self, buffer_size: int = 256):
        self.subject = Subject()
        self.buffer_size = buffer_size
        self.requested = 0
    
    def request(self, n: int):
        """Consumer signals how many items it can handle."""
        self.requested += n
    
    async def on_next(self, item: dict):
        """Producer emits only if consumer requested."""
        if self.requested <= 0:
            return  # Backpressure: drop or buffer
        
        self.subject.on_next(item)
        self.requested -= 1
    
    def on_error(self, error: Exception):
        self.subject.on_error(error)
    
    def on_completed(self):
        self.subject.on_completed()

# Usage: consumer controls the flow
stream = ReactiveBackpressureStream(buffer_size=100)
stream.request(5)  # "I can handle 5 items"

This demand-driven model ensures producers never outpace consumers, regardless of speed differences.
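
The demand counter can also be sketched without any reactive library. In the version below, items that arrive ahead of demand are held in a bounded buffer rather than discarded, and offer() returns False once that buffer is full so the producer knows to back off. DemandDrivenBuffer and its method names are hypothetical, and this is a simplified illustration, not an implementation of the Reactive Streams specification.

```python
from collections import deque
from typing import Any, Callable

class DemandDrivenBuffer:
    """Deliver items only against outstanding demand; buffer the rest."""

    def __init__(self, on_item: Callable[[Any], None], buffer_size: int = 256):
        self._on_item = on_item
        self._buffer = deque()
        self._buffer_size = buffer_size
        self._requested = 0

    def request(self, n: int) -> None:
        """Consumer signals it can handle n more items."""
        if n <= 0:
            raise ValueError("n must be positive")
        self._requested += n
        self._drain()

    def offer(self, item: Any) -> bool:
        """Producer offers an item; False means the buffer is full."""
        if len(self._buffer) >= self._buffer_size:
            return False
        self._buffer.append(item)
        self._drain()
        return True

    def _drain(self) -> None:
        while self._requested > 0 and self._buffer:
            self._requested -= 1
            self._on_item(self._buffer.popleft())

received = []
stream = DemandDrivenBuffer(received.append, buffer_size=3)
for i in range(1, 5):
    stream.offer(i)      # the fourth offer is refused: the buffer is full
stream.request(2)
print(received)  # [1, 2]
```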

Message Broker Backpressure

When using Kafka, RabbitMQ, or similar brokers, backpressure is set at the consumer level:

# Kafka consumer with backpressure via max.poll.records
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "events",
    bootstrap_servers=["localhost:9092"],
    max_poll_records=100,         # Max records per poll
    max_poll_interval_ms=300000,  # 5 min processing window
    fetch_max_bytes=52428800      # 50 MB per fetch
)

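# kafka-python is synchronous, so this is a plain loop with no await.
# is_system_overloaded(), wait_for_recovery() and process() are application-defined.
for message in consumer:
    if is_system_overloaded():
        partitions = consumer.assignment()
        consumer.pause(*partitions)    # Stop fetching from the assigned partitions
        wait_for_recovery()            # Keep this well under max_poll_interval_ms
        consumer.resume(*partitions)

    process(message)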

Key broker-level backpressure settings:

Broker         | Mechanism                                  | Configuration
Kafka          | max.poll.records, consumer pause/resume    | Per consumer
RabbitMQ       | QoS prefetch count                         | basic.qos(prefetch_count=100)
SQS            | Visibility timeout, max receive count      | Queue-level
Redis Streams  | XREADGROUP COUNT, consumer group blocking  | Per consumer

Kubernetes-Aware Backpressure

In containerized environments, backpressure should integrate with platform signals:

# Kubernetes HPA with custom metrics
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: api-service
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: api-service
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: queue_depth_ratio
        target:
          type: AverageValue
          averageValue: "0.7"  # Scale at 70% queue depth
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

The application’s backpressure metrics (queue depth, drop rate, response time) should feed into autoscalers, dashboards, and alerting.
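
For an autoscaler to act on queue depth, the application first has to publish it. A minimal sketch, assuming the metric is scraped in the Prometheus text exposition format and made available to the HPA through a metrics adapter (neither of which the article specifies); the function names are illustrative and the metric name matches the HPA manifest above.

```python
import asyncio

def queue_depth_ratio(queue: asyncio.Queue) -> float:
    """Fraction of the queue's capacity currently in use."""
    if queue.maxsize <= 0:
        return 0.0  # unbounded queue: no meaningful ratio
    return queue.qsize() / queue.maxsize

def render_metrics(queue: asyncio.Queue) -> str:
    """Render the gauge in Prometheus text exposition format."""
    ratio = queue_depth_ratio(queue)
    return (
        "# HELP queue_depth_ratio Fraction of queue capacity in use\n"
        "# TYPE queue_depth_ratio gauge\n"
        f"queue_depth_ratio {ratio:.3f}\n"
    )

async def demo() -> str:
    queue = asyncio.Queue(maxsize=10)
    for i in range(7):
        queue.put_nowait(i)
    return render_metrics(queue)

print(asyncio.run(demo()))
```

Serving this text from a /metrics endpoint is left out here; any HTTP framework can return it as plain text.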

Conclusion

Backpressure and flow control are non-negotiable for building resilient distributed systems. The core strategies form a layered defense:

  • Drop — Shed excess load using bounded queues with defined drop policies (tail, head, priority-based)
  • Block — Throttle producers with blocking queues and token buckets when consumers lag
  • Signal — Propagate pressure upstream so producers can adapt their rate dynamically
  • Window Control — TCP-style sliding windows that adjust concurrency based on RTT and error rates
  • Reactive Streams — Demand-driven data flow where consumers explicitly request capacity

Apply these principles across all layers of your stack:

Layer           | Strategy
Application     | Bounded queues, semaphores, reactive streams
Gateway         | Rate limiting, load shedding, circuit breakers
Message Broker  | Prefetch limits, consumer pause/resume
Infrastructure  | HPA scaling, CPU/memory monitoring
Client          | Retry with backoff, backpressure signals

Proper backpressure helps contain cascading failures, avoids OOM crashes, and keeps systems stable under overload. Start with bounded queues, add monitoring, then layer on adaptive controls as your system grows.
