Backpressure is the mechanism by which a system resists accepting more work than it can handle. When downstream systems can’t keep up, backpressure propagates upstream to slow down producers, preventing cascade failures and maintaining system stability.
Understanding Backpressure
The Problem Without Backpressure
┌─────────────────────────────────────────────────────────────────┐
│ System Collapse Without Backpressure │
│ │
│ Producer ──► [Queue A] ──► [Worker 1] ──► [Worker 2] │
│ │ │ │ │
│ Fast! ▼ ▼ ▼ │
│ Queue fills up Queue fills up Queue fills │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Memory Exhaust Memory Exhaust Memory Exhaust │
│ │ │ │ │
│ └──────────────────┴──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ SYSTEM CRASH │ │
│ │ OutOfMemory │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
With Backpressure
┌─────────────────────────────────────────────────────────────────┐
│ Graceful Backpressure Handling │
│ │
│ Producer ──► [Queue A] ──► [Worker 1] ──► [Worker 2] │
│ │ │ │ │
│ Senses ▼ ▼ ▼ │
│ overload Queue fills Workers busy Workers │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ◄──────── Backpressure signal sent upstream! ──────────► │
│ │ │ │ │
│ Producer ▼ ▼ ▼ │
│ Slows down! ✓ Stable ✓ Stable ✓ Stable │
│ │
│ Results: │
│ ✓ No memory exhaustion │
│ ✓ System remains stable │
│ ✓ Graceful degradation │
└─────────────────────────────────────────────────────────────────┘
Backpressure Strategies
Drop Strategy
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
import time
import asyncio
class DropPolicy(Enum):
    """Policies for choosing which queued item to evict when a queue is full."""

    # Evict the newest item.
    DROP_TAIL = "drop_tail"
    # Evict the oldest item.
    DROP_HEAD = "drop_head"
    # Evict an item picked at random.
    DROP_RANDOM = "drop_random"
    # Evict the item with the lowest priority.
    DROP_PRIORITY = "drop_priority"
class DropStrategy:
    """Pick and remove one item from a list-backed queue according to a policy."""

    def __init__(self, policy: DropPolicy = DropPolicy.DROP_TAIL):
        """Store the eviction policy used by :meth:`drop`."""
        self.policy = policy

    def drop(self, queue: list) -> Optional[object]:
        """Remove one item from ``queue`` in place and return it.

        Args:
            queue: Mutable list ordered oldest-first (index 0 is the head).

        Returns:
            The evicted item, or ``None`` when ``queue`` is empty or the
            policy is not recognised.
        """
        if not queue:
            return None
        if self.policy == DropPolicy.DROP_TAIL:
            return queue.pop()
        if self.policy == DropPolicy.DROP_HEAD:
            return queue.pop(0)
        if self.policy == DropPolicy.DROP_RANDOM:
            # Local import: ``random`` is not imported at module level.
            import random

            return queue.pop(random.randrange(len(queue)))
        if self.policy == DropPolicy.DROP_PRIORITY:
            return self._drop_lowest_priority(queue)
        return None

    def _drop_lowest_priority(self, queue: list) -> object:
        """Remove and return the item with the smallest ``"priority"`` value.

        Items are expected to expose ``.get`` (dict-like); a missing
        ``"priority"`` key counts as 0. Ties go to the item closest to the
        head. The item is popped from ``queue`` so the caller really regains
        a slot -- returning it without removing it would let the queue grow
        past its bound.
        """
        index = min(range(len(queue)), key=lambda i: queue[i].get("priority", 0))
        return queue.pop(index)
class BoundedQueueWithDrop:
    """List-backed FIFO queue that evicts an item instead of rejecting new work.

    When the queue already holds ``max_size`` items, ``enqueue`` asks the
    configured ``DropStrategy`` to evict one item before appending the new
    one. The bound therefore relies on ``DropStrategy.drop`` removing the
    item it returns.
    """

    def __init__(self, max_size: int, drop_policy: DropPolicy = DropPolicy.DROP_TAIL):
        """Create an empty queue.

        Args:
            max_size: Number of items at which eviction starts.
            drop_policy: Policy handed to ``DropStrategy``.
        """
        self.max_size = max_size
        self.queue = []
        self.drop_strategy = DropStrategy(drop_policy)
        self._lock = asyncio.Lock()
        self.metrics = {"enqueued": 0, "dropped": 0, "processed": 0}

    async def enqueue(self, item: dict) -> bool:
        """Append ``item``, evicting one queued item first if the queue is full.

        Returns:
            Always ``True``; the new item is never the one rejected.
        """
        async with self._lock:
            if len(self.queue) >= self.max_size:
                dropped = self.drop_strategy.drop(self.queue)
                # Compare against None rather than truthiness: an empty dict
                # is a valid item and must still be counted and reported.
                if dropped is not None:
                    self.metrics["dropped"] += 1
                    await self._on_item_dropped(dropped)
            self.queue.append(item)
            self.metrics["enqueued"] += 1
            return True

    async def dequeue(self) -> Optional[dict]:
        """Remove and return the oldest item, or ``None`` if the queue is empty."""
        async with self._lock:
            if not self.queue:
                return None
            item = self.queue.pop(0)
            self.metrics["processed"] += 1
            return item

    async def _on_item_dropped(self, item: dict):
        """Hook called (while the queue lock is held) for every evicted item.

        The default implementation does nothing; subclasses may override it.
        """
        pass
Block Strategy
class BlockingBackpressure:
    """Block producers when queue is full."""

    def __init__(self, max_size: int, block_timeout: float = 30.0):
        """Create the bounded queue.

        Args:
            max_size: Capacity of the underlying ``asyncio.Queue``.
            block_timeout: Seconds ``enqueue`` waits for free space.
        """
        self.max_size = max_size
        self.queue = asyncio.Queue(maxsize=max_size)
        self.block_timeout = block_timeout

    async def enqueue(self, item: dict) -> bool:
        """Put ``item`` on the queue, waiting up to ``block_timeout`` seconds.

        Returns:
            ``True`` once the item is queued, ``False`` if the wait timed out.
        """
        try:
            await asyncio.wait_for(
                self.queue.put(item),
                timeout=self.block_timeout,
            )
        except asyncio.TimeoutError:
            # The file defines no module-level ``logger``; resolve one here so
            # the timeout path cannot fail with NameError.
            import logging

            logging.getLogger(__name__).warning(
                "Backpressure: Blocked for %ss", self.block_timeout
            )
            return False
        return True

    async def dequeue(self) -> Optional[dict]:
        """Wait for and return the next queued item."""
        return await self.queue.get()
class TokenBucketWithBlock:
    """Token bucket with blocking when empty."""

    def __init__(self, rate: float, capacity: int):
        """Create a full bucket.

        Args:
            rate: Tokens added per second.
            capacity: Maximum number of tokens the bucket can hold.
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        # Monotonic clock: refill must not be affected by wall-clock jumps.
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1, block: bool = True) -> bool:
        """Take ``tokens`` from the bucket.

        Args:
            tokens: Number of tokens to take.
            block: When true, wait until enough tokens have been refilled.

        Returns:
            ``True`` once the tokens were taken; ``False`` if ``block`` is
            false and not enough tokens are available.

        Raises:
            ValueError: If ``block`` is true and ``tokens`` exceeds
                ``capacity`` -- such a request could never be satisfied.
        """
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                if not block:
                    return False
                if tokens > self.capacity:
                    raise ValueError(
                        f"cannot acquire {tokens} tokens from a bucket of capacity {self.capacity}"
                    )
                wait_time = (tokens - self.tokens) / self.rate
            # Sleep with the lock released so non-blocking callers and smaller
            # requests are not stalled behind this waiter.
            await asyncio.sleep(wait_time)

    def _refill(self):
        """Add the tokens earned since ``last_update``, capped at ``capacity``."""
        now = time.monotonic()
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now
Signal Strategy
class BackpressureSignal:
    """Signal backpressure status to producers.

    Uses two watermarks for hysteresis: pressure switches on when the queue
    fill ratio reaches ``high_watermark`` and switches off only after the
    ratio has fallen to ``low_watermark`` or below, so the signal does not
    flap around a single threshold.
    """

    def __init__(self, queue: asyncio.Queue, high_watermark: float = 0.8, low_watermark: float = 0.4):
        """Attach the signal to ``queue``.

        Args:
            queue: Queue whose ``qsize()``/``maxsize`` are sampled.
            high_watermark: Fill ratio at which pressure turns on.
            low_watermark: Fill ratio at or below which pressure turns off.
        """
        self.queue = queue
        self.high_watermark = high_watermark
        self.low_watermark = low_watermark
        self.is_pressured = False
        self._callbacks = []

    def register_callback(self, callback: Callable):
        """Register an async callable invoked as ``callback(backpressure=bool)``."""
        self._callbacks.append(callback)

    async def check(self):
        """Sample the queue, update ``is_pressured`` and notify on transitions."""
        maxsize = self.queue.maxsize
        # maxsize <= 0 means an unbounded asyncio.Queue: there is no fill
        # ratio to compute, so treat it as empty instead of dividing by zero.
        fill_ratio = self.queue.qsize() / maxsize if maxsize > 0 else 0.0
        was_pressured = self.is_pressured
        if was_pressured:
            self.is_pressured = fill_ratio > self.low_watermark
        else:
            self.is_pressured = fill_ratio >= self.high_watermark
        if self.is_pressured != was_pressured:
            for cb in self._callbacks:
                await cb(backpressure=self.is_pressured)

    def should_backoff(self) -> bool:
        """Return the pressure state computed by the most recent ``check()``."""
        return self.is_pressured
class ProducerWithBackpressure:
    """Producer that responds to backpressure signals."""

    def __init__(self, signal: "BackpressureSignal", max_rate: float = 100):
        """Bind the producer to a signal.

        Args:
            signal: Object exposing ``should_backoff() -> bool``.
            max_rate: Ceiling (and starting value) of the production rate,
                in items per second.
        """
        self.signal = signal
        self.max_rate = max_rate
        self.production_rate = max_rate  # items per second
        self.min_rate = 1

    async def produce(self, item_generator):
        """Yield items from ``item_generator``, pacing them by the current rate.

        Before each item the rate is reduced by 10% while the signal asks for
        back-off (never below ``min_rate``) and raised by 10% otherwise (never
        above ``max_rate``); the producer then sleeps ``1 / rate`` seconds.

        Args:
            item_generator: Any synchronous iterable of items.

        Yields:
            The items of ``item_generator`` in order. Iteration ends cleanly
            when the source is exhausted.
        """
        source = iter(item_generator)
        exhausted = object()
        while True:
            if self.signal.should_backoff():
                self.production_rate = max(
                    self.min_rate,
                    self.production_rate * 0.9,  # Reduce by 10%
                )
            else:
                self.production_rate = min(self.max_rate, self.production_rate * 1.1)
            delay = 1.0 / self.production_rate
            await asyncio.sleep(delay)
            # A bare next() would raise StopIteration inside this async
            # generator, which Python converts into RuntimeError (PEP 479).
            item = next(source, exhausted)
            if item is exhausted:
                return
            yield item
Implementing Flow Control
TCP-Style Window Control
class SlidingWindowFlowControl:
    """Bound the number of concurrently processed items, TCP-window style."""

    def __init__(self, window_size: int = 10):
        """Create a window that admits ``window_size`` concurrent sends."""
        self._semaphore = asyncio.Semaphore(window_size)
        self._lock = asyncio.Lock()
        self.window_size = window_size
        self.in_flight = 0

    async def send(self, item: dict) -> bool:
        """Process ``item`` once a window slot is free and return the result."""
        await self._semaphore.acquire()
        try:
            async with self._lock:
                self.in_flight += 1
            try:
                return await self._process(item)
            finally:
                # Always give the in-flight count back, even if processing fails.
                async with self._lock:
                    self.in_flight -= 1
        finally:
            self._semaphore.release()

    async def _process(self, item: dict) -> bool:
        """Stand-in for real work: pause briefly and report success."""
        await asyncio.sleep(0.01)  # Simulate processing
        return True

    def get_window_status(self) -> dict:
        """Return window size, in-flight count, free slots and utilization."""
        size = self.window_size
        busy = self.in_flight
        status = {}
        status["window_size"] = size
        status["in_flight"] = busy
        status["available"] = size - busy
        status["utilization"] = busy / size
        return status
class AdaptiveWindowController:
    """Grow or shrink a window size from observed round-trip times and errors."""

    def __init__(self, initial_window: int = 10, min_window: int = 1, max_window: int = 100):
        """Start at ``initial_window`` and stay within the given bounds."""
        self.min_window = min_window
        self.max_window = max_window
        self.window = initial_window
        self.rtt_history = []
        self.error_history = []

    async def record_success(self, rtt: float):
        """Record a round-trip time and adjust the window by at most one step.

        Only the 100 most recent samples are kept. A mean below 0.1 widens
        the window by one; a mean above 0.5 narrows it by one.
        """
        samples = self.rtt_history
        samples.append(rtt)
        if len(samples) > 100:
            del samples[0]
        mean_rtt = sum(samples) / len(samples)
        if mean_rtt < 0.1:  # Fast response
            self.window = min(self.max_window, self.window + 1)
            return
        if mean_rtt > 0.5:  # Slow response
            self.window = max(self.min_window, self.window - 1)

    async def record_error(self):
        """Record an error and halve the window when errors are frequent.

        Errors older than 60 seconds are discarded; the window is halved
        (floored at ``min_window``) when the remaining count divided by 60
        exceeds 0.1.
        """
        self.error_history.append(time.time())
        recent = []
        for stamp in self.error_history:
            if time.time() - stamp < 60:
                recent.append(stamp)
        self.error_history = recent
        if len(recent) / 60 > 0.1:  # High error rate
            self.window = max(self.min_window, self.window // 2)
Producer-Consumer with Backpressure
class BackpressureProcessor:
    """Bounded work queue drained by a pool of asyncio worker tasks.

    Must be constructed while an event loop is running, because the worker
    tasks are created in ``__init__``.
    """

    def __init__(self, queue_size: int = 1000, num_workers: int = 4):
        """Create the queue, the backpressure signal and the workers.

        Args:
            queue_size: Capacity of the work queue.
            num_workers: Number of worker tasks to start.
        """
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.backpressure = BackpressureSignal(self.queue)
        # Set the flag before any worker exists so a worker can never observe
        # a half-initialised processor.
        self._running = True
        self.workers = [asyncio.create_task(self._worker(i)) for i in range(num_workers)]

    async def _worker(self, worker_id: int):
        """Pull items until ``shutdown`` clears the running flag."""
        # The file defines no module-level ``logger``; resolve one locally.
        import logging

        log = logging.getLogger(__name__)
        while self._running:
            try:
                # The 1s timeout lets the loop re-check ``_running`` when idle.
                item = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0,
                )
            except asyncio.TimeoutError:
                continue
            try:
                await self._process_item(item)
            except asyncio.CancelledError:
                raise
            except Exception:
                # One bad item must not permanently remove a worker from the pool.
                log.exception("Worker %d failed to process item", worker_id)
            finally:
                # Always balance get() so queue.join() cannot hang.
                self.queue.task_done()

    async def _process_item(self, item: dict):
        """Stand-in for real work: pause briefly."""
        await asyncio.sleep(0.01)

    async def submit(self, item: dict) -> bool:
        """Queue ``item`` unless backpressure is active.

        Returns:
            ``True`` if the item was queued; ``False`` if it was rejected
            because of backpressure or because the queue stayed full for 5s.
        """
        import logging

        log = logging.getLogger(__name__)
        await self.backpressure.check()
        if self.backpressure.should_backoff():
            log.warning("Backpressure active, rejecting item")
            return False
        try:
            await asyncio.wait_for(
                self.queue.put(item),
                timeout=5.0,
            )
        except asyncio.TimeoutError:
            log.error("Queue full, could not enqueue")
            return False
        return True

    async def shutdown(self):
        """Ask the workers to stop and wait for them to finish."""
        self._running = False
        await asyncio.gather(*self.workers, return_exceptions=True)
HTTP Flow Control Middleware
from aiohttp import web
class FlowControlMiddleware:
    """aiohttp middleware that caps concurrent requests and bounds the waiters.

    Up to ``max_concurrent`` requests run at once. When all slots are taken,
    up to ``queue_size`` further requests wait for a slot; anything beyond
    that is rejected with 503 instead of queueing without limit.
    """

    def __init__(self, max_concurrent: int = 100, queue_size: int = 500):
        """Configure the limits.

        Args:
            max_concurrent: Requests allowed to run simultaneously.
            queue_size: Requests allowed to wait for a free slot.
        """
        self.max_concurrent = max_concurrent
        self.queue_size = queue_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Not used by the middleware itself; kept so existing code that reads
        # this attribute keeps working.
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.active_requests = 0
        self.queued_requests = 0
        self.rejected_requests = 0

    @web.middleware
    async def handle(self, request: web.Request, handler):
        """Run ``handler`` under the concurrency limit, or reject with 503."""
        must_wait = self.semaphore.locked()
        if must_wait:
            if self.queued_requests >= self.queue_size:
                self.rejected_requests += 1
                return web.Response(
                    status=503,
                    headers={"Retry-After": "1"},
                    text="Service overloaded",
                )
            self.queued_requests += 1
        try:
            await self.semaphore.acquire()
        finally:
            # Runs on cancellation too, so the waiter count cannot leak.
            if must_wait:
                self.queued_requests -= 1
        self.active_requests += 1
        try:
            return await handler(request)
        finally:
            self.active_requests -= 1
            self.semaphore.release()

    def get_status(self) -> dict:
        """Return the current counters and the fraction of slots in use."""
        limit = self.max_concurrent
        return {
            "active": self.active_requests,
            "queued": self.queued_requests,
            "rejected": self.rejected_requests,
            # Relative to the configured limit, not a hard-coded 100.
            "utilization": self.active_requests / limit if limit else 0,
        }
class RateLimitFlowControl:
    """Token-bucket rate limiter usable as aiohttp middleware."""

    def __init__(self, rate: int, burst: int):
        """Start with a full bucket of ``burst`` tokens refilled at ``rate`` per second."""
        self.rate = rate
        self.max_tokens = burst
        self.tokens = burst
        self.last_check = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        """Refill the bucket for the elapsed time and try to take one token.

        Returns:
            ``True`` if a token was taken, ``False`` if less than one token
            is available.
        """
        async with self._lock:
            current = time.time()
            refill = (current - self.last_check) * self.rate
            self.tokens = min(self.max_tokens, self.tokens + refill)
            self.last_check = current
            if self.tokens < 1:
                return False
            self.tokens -= 1
            return True

    @web.middleware
    async def handle(self, request: web.Request, handler):
        """Forward the request when a token is available, else answer 429."""
        allowed = await self.acquire()
        if not allowed:
            return web.Response(
                status=429,
                headers={"Retry-After": "1"},
                text="Rate limit exceeded",
            )
        return await handler(request)
Load Shedding
class LoadShedder:
    """Decide per request whether to shed it, based on CPU and memory usage."""

    def __init__(self, cpu_threshold: float = 0.9, memory_threshold: float = 0.9):
        """Store the thresholds as fractions (0.9 means 90%) and zero the counters."""
        self.cpu_threshold = cpu_threshold
        self.memory_threshold = memory_threshold
        self.total_count = 0
        self.shed_count = 0

    def should_shed(self) -> bool:
        """Count the call and report whether either threshold is exceeded.

        Reads ``psutil.cpu_percent()`` and ``psutil.virtual_memory().percent``
        and compares them with the thresholds scaled to percent.
        """
        import psutil

        self.total_count += 1
        cpu = psutil.cpu_percent()
        memory = psutil.virtual_memory().percent
        cpu_limit = self.cpu_threshold * 100
        memory_limit = self.memory_threshold * 100
        overloaded = cpu > cpu_limit or memory > memory_limit
        if not overloaded:
            return False
        self.shed_count += 1
        return True

    def get_shed_rate(self) -> float:
        """Return shed calls divided by total calls, or 0 before any call."""
        return self.shed_count / self.total_count if self.total_count else 0
class PriorityLoadShedder:
    """Shed lower priority requests first."""

    # Smaller number = more important.
    PRIORITY_MAP = {
        "critical": 0,
        "high": 1,
        "normal": 2,
        "low": 3,
        "batch": 4,
    }

    def __init__(self, max_load: float = 0.8, shed_above_priority: int = 1):
        """Configure the shedder.

        Args:
            max_load: CPU load (0..1) above which shedding starts.
            shed_above_priority: Requests whose numeric priority is greater
                than this value are shed under overload. The default of 1
                keeps "critical" and "high" and sheds "normal", "low" and
                "batch".
        """
        self.max_load = max_load
        self.shed_above_priority = shed_above_priority
        self.current_load = 0

    def calculate_priority(self, request: web.Request) -> int:
        """Map the ``X-Request-Priority`` header to its numeric priority.

        A missing or unrecognised header value is treated as "normal".
        """
        priority_header = request.headers.get("X-Request-Priority", "normal")
        return self.PRIORITY_MAP.get(priority_header, self.PRIORITY_MAP["normal"])

    @web.middleware
    async def handle(self, request: web.Request, handler):
        """Answer 503 for sheddable requests under overload, else forward."""
        import psutil

        # Store the sample on the instance so ``current_load`` reflects the
        # last measurement instead of staying at its initial 0.
        self.current_load = psutil.cpu_percent() / 100
        if self.current_load > self.max_load:
            if self.calculate_priority(request) > self.shed_above_priority:
                return web.Response(
                    status=503,
                    headers={"Retry-After": "30"},
                    text="Service overloaded",
                )
        return await handler(request)
class CircuitBreakerLoadShedder:
    """Open circuit when overloaded and shed all requests."""

    def __init__(self, failure_threshold: int = 50, timeout: int = 60):
        """Configure the breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            timeout: Seconds after the last failure before requests are
                let through again.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.circuit_open = False
        self.last_failure_time = None

    def record_failure(self):
        """Count a failure and open the circuit once the threshold is reached."""
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.circuit_open = True

    def record_success(self):
        """Reset the failure count and close the circuit."""
        self.failures = 0
        self.circuit_open = False

    @web.middleware
    async def handle(self, request: web.Request, handler):
        """Reject with 503 while the circuit is open, otherwise run the handler.

        Exceptions from the handler are re-raised unchanged after being
        recorded.
        """
        if self.circuit_open:
            if time.time() - self.last_failure_time > self.timeout:
                self.circuit_open = False
                self.failures = 0
            else:
                return web.Response(
                    status=503,
                    text="Service temporarily unavailable",
                )
        try:
            response = await handler(request)
        except web.HTTPException as exc:
            # aiohttp handlers signal redirects and client errors by raising
            # HTTPException; only server-side (5xx) outcomes indicate that
            # this service is unhealthy.
            if exc.status >= 500:
                self.record_failure()
            else:
                self.record_success()
            raise
        except Exception:
            self.record_failure()
            raise
        self.record_success()
        return response
Monitoring Backpressure
class BackpressureMonitor:
    """Collect queue-depth, drop-rate and response-time samples in memory."""

    def __init__(self, max_history: Optional[int] = 1000):
        """Create empty metric series.

        Args:
            max_history: Maximum samples kept per series; older samples are
                discarded first. ``None`` keeps everything (unbounded).
        """
        self.max_history = max_history
        self.metrics = {
            "queue_depth": [],
            "drop_rate": [],
            "backpressure_active": [],
            "response_time": [],
        }

    def _record(self, name: str, sample: dict):
        """Append ``sample`` to series ``name`` and trim it to ``max_history``."""
        series = self.metrics[name]
        series.append(sample)
        if self.max_history is not None:
            overflow = len(series) - self.max_history
            if overflow > 0:
                # Trim in place so references to the list stay valid.
                del series[:overflow]

    async def record_queue_depth(self, depth: int, max_depth: int):
        """Record a queue depth sample; the ratio is 0 when ``max_depth`` is not positive."""
        self._record("queue_depth", {
            "depth": depth,
            "max": max_depth,
            "ratio": depth / max_depth if max_depth > 0 else 0,
            "timestamp": time.time(),
        })

    async def record_drop(self, count: int, total: int):
        """Record a drop sample; the rate is 0 when ``total`` is not positive."""
        self._record("drop_rate", {
            "dropped": count,
            "total": total,
            "rate": count / total if total > 0 else 0,
            "timestamp": time.time(),
        })

    async def record_response_time(self, duration: float):
        """Record one response duration."""
        self._record("response_time", {
            "duration": duration,
            "timestamp": time.time(),
        })

    def get_health_status(self) -> dict:
        """Summarise the ten most recent queue and drop samples.

        Returns:
            A dict with ``status`` ("critical" if the average drop rate
            exceeds 0.1, "degraded" if the average queue ratio exceeds 0.8,
            otherwise "healthy"), the two averages, and
            ``backpressure_active`` (average queue ratio above 0.7).
        """
        recent_queue = self.metrics["queue_depth"][-10:]
        recent_drops = self.metrics["drop_rate"][-10:]
        avg_queue_ratio = sum(m["ratio"] for m in recent_queue) / len(recent_queue) if recent_queue else 0
        avg_drop_rate = sum(m["rate"] for m in recent_drops) / len(recent_drops) if recent_drops else 0
        if avg_drop_rate > 0.1:
            status = "critical"
        elif avg_queue_ratio > 0.8:
            status = "degraded"
        else:
            status = "healthy"
        return {
            "status": status,
            "avg_queue_ratio": avg_queue_ratio,
            "avg_drop_rate": avg_drop_rate,
            "backpressure_active": avg_queue_ratio > 0.7,
        }
Best Practices
# Recommended practices, keyed by a short identifier. Each value is a
# free-form text block that contrasts a good approach with a bad one.
GOOD_PATTERNS = {
"detect_early": """
# Monitor queue depth and response times early
✅ Good:
- Set alerts at 50% queue capacity
- Monitor for increasing latency
- Track error rates
❌ Bad:
- Only react when queue is full
- Wait for failures to detect overload
""",
"graceful_degradation": """
# Shed load gracefully instead of crashing
✅ Good:
- Return degraded responses
- Prioritize critical requests
- Provide useful error messages
❌ Bad:
- Accept all requests then crash
- Drop connections without response
""",
"multiple_strategies": """
# Use combination of strategies
✅ Good:
- Backpressure at application level
- Load shedding at gateway level
- Circuit breakers for downstream
❌ Bad:
- Rely on single mechanism
- No defense in depth
"""
}
# Anti-patterns, keyed by a short identifier. Each value is a free-form text
# block showing a bad snippet followed by the preferred alternative.
BAD_PATTERNS = {
"infinite_queue": """
❌ Bad:
queue = asyncio.Queue() # No maxsize!
# Queue grows until OOM crash
✅ Good:
queue = asyncio.Queue(maxsize=10000)
# Bounded queue with backpressure
""",
"no_monitoring": """
❌ Bad:
# No visibility into backpressure
async def process():
await queue.put(item) # Could block forever!
✅ Good:
# Monitor queue depth
if queue.qsize() > queue.maxsize * 0.8:
alert("Approaching backpressure")
""",
"ignore_feedback": """
❌ Bad:
# Don't respond to backpressure signals
producer.send(item) # Always send!
✅ Good:
# Respect backpressure
if should_backoff():
await sleep(delay)
producer.send(item)
"""
}
Reactive Streams and Backpressure
The Reactive Streams specification standardizes backpressure for async data pipelines. Libraries like RxPy, Project Reactor, and Akka Streams implement this protocol:
from rx import operators as ops
from rx.subject import Subject
class ReactiveBackpressureStream:
    """Demand-driven wrapper around an Rx ``Subject``.

    Items are forwarded to the subject only while the consumer has
    outstanding demand (see ``request``). Items arriving without demand are
    held in a FIFO buffer of at most ``buffer_size`` entries and delivered
    when demand arrives; once the buffer is full, further items are dropped.
    """

    def __init__(self, buffer_size: int = 256):
        """Create the stream with no outstanding demand and an empty buffer."""
        # Local import: ``collections`` is not imported at module level.
        from collections import deque

        self.subject = Subject()
        self.buffer_size = buffer_size
        self.requested = 0
        self._buffer = deque()

    def request(self, n: int):
        """Consumer signals how many items it can handle.

        Adds ``n`` to the outstanding demand and immediately delivers
        buffered items while demand remains.

        Raises:
            ValueError: If ``n`` is not positive.
        """
        if n <= 0:
            raise ValueError("request(n) requires n > 0")
        self.requested += n
        self._drain()

    def _drain(self):
        """Deliver buffered items, oldest first, while demand remains."""
        while self.requested > 0 and self._buffer:
            # Consume the demand before emitting so that a subscriber calling
            # request() from inside its on_next cannot cause over-delivery.
            self.requested -= 1
            self.subject.on_next(self._buffer.popleft())

    async def on_next(self, item: dict):
        """Producer emits only if consumer requested.

        With no outstanding demand the item is buffered, or dropped when the
        buffer already holds ``buffer_size`` items.
        """
        if self.requested > 0:
            self.requested -= 1
            self.subject.on_next(item)
            return
        if len(self._buffer) < self.buffer_size:
            self._buffer.append(item)
        # else: buffer full -- the item is dropped.

    def on_error(self, error: Exception):
        """Forward ``error`` to the subject."""
        self.subject.on_error(error)

    def on_completed(self):
        """Forward completion to the subject.

        NOTE: items still buffered at this point are not delivered.
        """
        self.subject.on_completed()
# Usage: the consumer controls the flow by declaring its demand up front
stream = ReactiveBackpressureStream(buffer_size=100)
stream.request(5) # "I can handle 5 items"
This demand-driven model ensures producers never outpace consumers, regardless of speed differences.
Message Broker Backpressure
When using Kafka, RabbitMQ, or similar brokers, backpressure is set at the consumer level:
# Kafka consumer with backpressure via max.poll.records
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "events",
    bootstrap_servers=["localhost:9092"],
    max_poll_records=100,  # Max records per poll
    max_poll_interval_ms=300000,  # 5 min processing window
    fetch_max_bytes=52428800,  # 50 MB per fetch
)


async def consume_events():
    """Process messages, pausing the Kafka fetch while the system is overloaded.

    ``is_system_overloaded``, ``wait_for_recovery`` and ``process`` are
    application-supplied coroutines. Run with
    ``asyncio.run(consume_events())``; ``await`` is not valid at module level.
    """
    # NOTE: iterating a KafkaConsumer is a blocking call; in a real asyncio
    # service run it in an executor or use an asyncio-native client.
    for message in consumer:
        if await is_system_overloaded():
            # pause()/resume() act on the partitions passed in; with no
            # arguments they affect nothing, so pass the current assignment.
            partitions = consumer.assignment()
            consumer.pause(*partitions)  # Stop fetching
            await wait_for_recovery()
            consumer.resume(*partitions)
        await process(message)
Key broker-level backpressure settings:
| Broker | Mechanism | Configuration |
|---|---|---|
| Kafka | max.poll.records, consumer pause/resume | Per consumer group |
| RabbitMQ | QoS prefetch count | basic.qos(prefetch_count=100) |
| SQS | Visibility timeout, max receive count | Queue-level |
| Redis Streams | XREADGROUP COUNT, consumer group blocking | Per consumer |
Kubernetes-Aware Backpressure
In containerized environments, backpressure should integrate with platform signals:
# Kubernetes HPA driven by a custom per-pod metric plus CPU utilization
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: api-service
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: api-service
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: queue_depth_ratio
        target:
          type: AverageValue
          averageValue: "0.7"  # Scale at 70% queue depth
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
The application’s backpressure metrics (queue depth, drop rate, response time) should feed into autoscalers, dashboards, and alerting.
Related Articles
- Bulkhead Pattern
- Circuit Breaker Pattern
- Rate Limiting Strategies
- API Gateway Pattern
- Microservices Patterns
Conclusion
Backpressure and flow control are non-negotiable for building resilient distributed systems. The core strategies form a layered defense:
- Drop — Shed excess load using bounded queues with defined drop policies (tail, head, priority-based)
- Block — Throttle producers with blocking queues and token buckets when consumers lag
- Signal — Propagate pressure upstream so producers can adapt their rate dynamically
- Window Control — TCP-style sliding windows that adjust concurrency based on RTT and error rates
- Reactive Streams — Demand-driven data flow where consumers explicitly request capacity
Apply these principles across all layers of your stack:
| Layer | Strategy |
|---|---|
| Application | Bounded queues, semaphores, reactive streams |
| Gateway | Rate limiting, load shedding, circuit breakers |
| Message Broker | Prefetch limits, consumer pause/resume |
| Infrastructure | HPA scaling, CPU/memory monitoring |
| Client | Retry with backoff, backpressure signals |
Proper backpressure prevents the thundering-herd problem, avoids OOM crashes, and keeps systems stable under overload. Start with bounded queues, add monitoring, then layer on adaptive controls as your system grows.
Comments