Backpressure is the mechanism by which a system resists accepting more work than it can handle. When downstream systems can’t keep up, backpressure propagates upstream to slow down producers, preventing cascade failures and maintaining system stability.
Understanding Backpressure
The Problem Without Backpressure
┌───────────────────────────────────────────────────────────────────┐
│               System Collapse Without Backpressure                │
│                                                                   │
│  Producer ──► [Queue A] ──► [Worker 1] ──► [Worker 2]             │
│     │             │              │              │                 │
│   Fast!           ▼              ▼              ▼                 │
│            Queue fills up  Queue fills up   Queue fills           │
│                   │              │              │                 │
│                   ▼              ▼              ▼                 │
│            Memory Exhaust  Memory Exhaust  Memory Exhaust         │
│                   │              │              │                 │
│                   └──────────────┴──────────────┘                 │
│                                  │                                │
│                                  ▼                                │
│                        ┌─────────────────┐                        │
│                        │  SYSTEM CRASH   │                        │
│                        │   OutOfMemory   │                        │
│                        └─────────────────┘                        │
└───────────────────────────────────────────────────────────────────┘
With Backpressure
┌───────────────────────────────────────────────────────────────────┐
│                  Graceful Backpressure Handling                   │
│                                                                   │
│  Producer ──► [Queue A] ──► [Worker 1] ──► [Worker 2]             │
│     │             │              │              │                 │
│  Senses           ▼              ▼              ▼                 │
│  overload    Queue fills    Workers busy     Workers              │
│     │             │              │              │                 │
│     ▼             ▼              ▼              ▼                 │
│  ◄──────── Backpressure signal sent upstream! ────────            │
│     │             │              │              │                 │
│  Producer         ▼              ▼              ▼                 │
│  Slows down!  ✓ Stable       ✓ Stable       ✓ Stable              │
│                                                                   │
│  Results:                                                         │
│  ✓ No memory exhaustion                                           │
│  ✓ System remains stable                                          │
│  ✓ Graceful degradation                                           │
└───────────────────────────────────────────────────────────────────┘
Backpressure Strategies
Drop Strategy
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
import time
import asyncio
class DropPolicy(Enum):
    """Which queued item a bounded queue sacrifices on overflow."""
    DROP_TAIL = "drop_tail"          # Discard the newest item
    DROP_HEAD = "drop_head"          # Discard the oldest item
    DROP_RANDOM = "drop_random"      # Discard a uniformly random item
    DROP_PRIORITY = "drop_priority"  # Discard the lowest-priority item
class DropStrategy:
    """Selects and removes one item from a list-backed queue per a DropPolicy.

    Used by bounded queues to decide which item to sacrifice when full.
    """

    def __init__(self, policy: DropPolicy = DropPolicy.DROP_TAIL):
        self.policy = policy

    def drop(self, queue: list) -> Optional[dict]:
        """Remove and return one item from *queue* according to the policy.

        Returns None when the queue is empty (or the policy is unrecognized).
        """
        if not queue:
            return None
        if self.policy == DropPolicy.DROP_TAIL:
            return queue.pop()
        elif self.policy == DropPolicy.DROP_HEAD:
            return queue.pop(0)
        elif self.policy == DropPolicy.DROP_RANDOM:
            import random
            index = random.randint(0, len(queue) - 1)
            return queue.pop(index)
        elif self.policy == DropPolicy.DROP_PRIORITY:
            return self._drop_lowest_priority(queue)
        return None

    def _drop_lowest_priority(self, queue: list) -> dict:
        """Remove and return the item with the smallest "priority" value.

        Bug fix: the original returned min(queue, ...) WITHOUT removing it,
        so the queue kept the item and grew past its bound.
        Items without a "priority" key are treated as priority 0.
        """
        index = min(range(len(queue)), key=lambda i: queue[i].get("priority", 0))
        return queue.pop(index)
class BoundedQueueWithDrop:
    """Bounded FIFO queue that makes room by evicting items per a DropPolicy.

    When at capacity, one existing item is evicted before the new item is
    appended, so the queue length never exceeds max_size (for max_size > 0).
    """

    def __init__(self, max_size: int, drop_policy: DropPolicy = DropPolicy.DROP_TAIL):
        self.max_size = max_size
        self.queue: list = []
        self.drop_strategy = DropStrategy(drop_policy)
        # Serializes enqueue/dequeue across concurrent tasks.
        self._lock = asyncio.Lock()
        self.metrics = {"enqueued": 0, "dropped": 0, "processed": 0}

    async def enqueue(self, item: dict) -> bool:
        """Append *item*, evicting an existing item first if at capacity."""
        async with self._lock:
            if len(self.queue) >= self.max_size:
                dropped = self.drop_strategy.drop(self.queue)
                # Bug fix: count a drop only when something was actually
                # removed, and use an identity check so a falsy item (e.g.
                # an empty dict) still triggers the drop callback. The
                # original incremented "dropped" unconditionally and used
                # `if dropped:`.
                if dropped is not None:
                    self.metrics["dropped"] += 1
                    await self._on_item_dropped(dropped)
            self.queue.append(item)
            self.metrics["enqueued"] += 1
            return True

    async def dequeue(self) -> Optional[dict]:
        """Remove and return the oldest item, or None when empty."""
        async with self._lock:
            if self.queue:
                item = self.queue.pop(0)
                self.metrics["processed"] += 1
                return item
            return None

    async def _on_item_dropped(self, item: dict):
        """Hook for subclasses to observe dropped items (no-op by default)."""
        pass
Block Strategy
class BlockingBackpressure:
    """Apply backpressure by making producers wait on a full queue.

    enqueue() blocks for up to block_timeout seconds; if space never frees
    up in that window, the item is rejected and False is returned.
    """

    def __init__(self, max_size: int, block_timeout: float = 30.0):
        self.max_size = max_size
        self.queue = asyncio.Queue(maxsize=max_size)
        self.block_timeout = block_timeout

    async def enqueue(self, item: dict) -> bool:
        """Put *item* on the queue, waiting while full; False on timeout."""
        try:
            await asyncio.wait_for(self.queue.put(item), timeout=self.block_timeout)
        except asyncio.TimeoutError:
            logger.warning(f"Backpressure: Blocked for {self.block_timeout}s")
            return False
        return True

    async def dequeue(self) -> Optional[dict]:
        """Wait for and return the next queued item."""
        return await self.queue.get()
class TokenBucketWithBlock:
    """Token-bucket rate limiter that can block callers until tokens accrue.

    Tokens refill continuously at `rate` per second up to `capacity`.
    """

    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity    # bucket starts full
        self.last_update = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1, block: bool = True) -> bool:
        """Take *tokens* from the bucket.

        Non-blocking mode returns False immediately when short; blocking
        mode sleeps until enough tokens have accrued, then retries.
        """
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                if not block:
                    return False
                wait_time = (tokens - self.tokens) / self.rate
            # Bug fix: sleep OUTSIDE the lock. The original slept while
            # holding the lock, so one blocked caller starved every other
            # task (including non-blocking acquires) for the whole wait.
            await asyncio.sleep(wait_time)

    def _refill(self):
        """Credit tokens for time elapsed since the last update."""
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now
Signal Strategy
class BackpressureSignal:
    """Watermark-based backpressure signal with hysteresis.

    Pressure turns ON when the queue fill ratio reaches high_watermark and
    turns OFF only once the ratio falls below low_watermark, so producers
    are not flapped on/off around a single threshold.
    """

    def __init__(self, queue: asyncio.Queue, high_watermark: float = 0.8, low_watermark: float = 0.4):
        self.queue = queue
        self.high_watermark = high_watermark
        self.low_watermark = low_watermark
        self.is_pressured = False
        self._callbacks = []

    def register_callback(self, callback: Callable):
        """Register an async callback invoked on pressure state transitions."""
        self._callbacks.append(callback)

    async def check(self):
        """Re-evaluate queue fill and notify callbacks on state changes."""
        maxsize = self.queue.maxsize
        if maxsize <= 0:
            # Bug fix: an unbounded queue (maxsize == 0) caused ZeroDivisionError.
            return
        fill_ratio = self.queue.qsize() / maxsize
        was_pressured = self.is_pressured
        if fill_ratio >= self.high_watermark:
            self.is_pressured = True
        elif fill_ratio < self.low_watermark:
            # Bug fix: the original never used low_watermark — it cleared
            # pressure as soon as the ratio dipped under high_watermark,
            # defeating the hysteresis the two watermarks exist to provide.
            self.is_pressured = False
        if self.is_pressured != was_pressured:
            for cb in self._callbacks:
                await cb(backpressure=self.is_pressured)

    def should_backoff(self) -> bool:
        """True while producers should slow down."""
        return self.is_pressured
class ProducerWithBackpressure:
    """Producer that adapts its output rate to backpressure signals.

    Multiplicative decrease (10% per tick) under pressure, multiplicative
    recovery (10% per tick) otherwise, clamped to [min_rate, max_rate].
    """

    def __init__(self, signal: BackpressureSignal):
        self.signal = signal
        self.max_rate = 100  # items per second, upper bound (was a magic 100)
        self.production_rate = self.max_rate
        self.min_rate = 1

    async def produce(self, item_generator):
        """Yield items from *item_generator*, pacing output by current rate.

        Ends cleanly when the generator is exhausted.
        """
        while True:
            if self.signal.should_backoff():
                self.production_rate = max(
                    self.min_rate,
                    self.production_rate * 0.9  # Reduce by 10%
                )
            else:
                # Generalized: cap at self.max_rate instead of hard-coded 100.
                self.production_rate = min(self.max_rate, self.production_rate * 1.1)
            await asyncio.sleep(1.0 / self.production_rate)
            try:
                item = next(item_generator)
            except StopIteration:
                # Bug fix: a StopIteration escaping into an async generator
                # is converted to RuntimeError (PEP 479). End the stream
                # cleanly when the source runs dry.
                return
            yield item
Implementing Flow Control
TCP-Style Window Control
class SlidingWindowFlowControl:
    """Caps the number of concurrently in-flight sends, TCP-window style."""

    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.in_flight = 0
        self._lock = asyncio.Lock()
        # The semaphore enforces the cap; in_flight is bookkeeping for status.
        self._semaphore = asyncio.Semaphore(window_size)

    async def send(self, item: dict) -> bool:
        """Process *item*, waiting first if the window is fully occupied."""
        async with self._semaphore:
            async with self._lock:
                self.in_flight += 1
            try:
                return await self._process(item)
            finally:
                async with self._lock:
                    self.in_flight -= 1

    async def _process(self, item: dict) -> bool:
        await asyncio.sleep(0.01)  # Simulate processing
        return True

    def get_window_status(self) -> dict:
        """Snapshot of current window occupancy."""
        used = self.in_flight
        return {
            "window_size": self.window_size,
            "in_flight": used,
            "available": self.window_size - used,
            "utilization": used / self.window_size
        }
class AdaptiveWindowController:
"""Adjusts window size based on performance."""
def __init__(self, initial_window: int = 10, min_window: int = 1, max_window: int = 100):
self.window = initial_window
self.min_window = min_window
self.max_window = max_window
self.rtt_history = []
self.error_history = []
async def record_success(self, rtt: float):
self.rtt_history.append(rtt)
if len(self.rtt_history) > 100:
self.rtt_history.pop(0)
avg_rtt = sum(self.rtt_history) / len(self.rtt_history)
if avg_rtt < 0.1: # Fast response
self.window = min(self.max_window, self.window + 1)
elif avg_rtt > 0.5: # Slow response
self.window = max(self.min_window, self.window - 1)
async def record_error(self):
self.error_history.append(time.time())
self.error_history = [t for t in self.error_history if time.time() - t < 60]
error_rate = len(self.error_history) / 60
if error_rate > 0.1: # High error rate
self.window = max(self.min_window, self.window // 2)
Producer-Consumer with Backpressure
class BackpressureProcessor:
    """Worker pool draining a bounded queue, rejecting work under pressure.

    NOTE: must be constructed inside a running event loop, since worker
    tasks are spawned in __init__.
    """

    def __init__(self, queue_size: int = 1000, num_workers: int = 4):
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.workers = [asyncio.create_task(self._worker(i)) for i in range(num_workers)]
        self.backpressure = BackpressureSignal(self.queue)
        self._running = True

    async def _worker(self, worker_id: int):
        """Drain loop: pull items until shutdown is requested."""
        while self._running:
            try:
                item = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue  # periodic wake-up so shutdown is noticed
            try:
                await self._process_item(item)
            except Exception:
                # Bug fix: an exception in processing used to kill the worker
                # task and leave task_done() uncalled (wedging queue.join()
                # forever). Log it and keep the worker alive.
                logger.exception("Worker %d failed processing item", worker_id)
            finally:
                self.queue.task_done()

    async def _process_item(self, item: dict):
        await asyncio.sleep(0.01)

    async def submit(self, item: dict) -> bool:
        """Enqueue *item*; False when backpressure or a full queue rejects it."""
        await self.backpressure.check()
        if self.backpressure.should_backoff():
            logger.warning("Backpressure active, rejecting item")
            return False
        try:
            await asyncio.wait_for(self.queue.put(item), timeout=5.0)
            return True
        except asyncio.TimeoutError:
            logger.error("Queue full, could not enqueue")
            return False

    async def shutdown(self):
        """Stop workers and wait for them to exit."""
        self._running = False
        await asyncio.gather(*self.workers, return_exceptions=True)
HTTP Flow Control Middleware
from aiohttp import web
class FlowControlMiddleware:
    """aiohttp middleware limiting the number of concurrent in-flight requests."""

    def __init__(self, max_concurrent: int = 100, queue_size: int = 500):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.active_requests = 0
        # NOTE(review): queued/rejected counters are exposed in get_status()
        # but never updated in this block — confirm whether that is intended.
        self.queued_requests = 0
        self.rejected_requests = 0

    @web.middleware
    async def handle(self, request: web.Request, handler):
        """Admit the request once a concurrency slot is free."""
        async with self.semaphore:
            self.active_requests += 1
            try:
                return await handler(request)
            finally:
                self.active_requests -= 1

    def get_status(self) -> dict:
        """Current admission-control counters."""
        return {
            "active": self.active_requests,
            "queued": self.queued_requests,
            "rejected": self.rejected_requests,
            # Bug fix: utilization was hardcoded to /100 even when the
            # configured max_concurrent differed.
            "utilization": self.active_requests / self.max_concurrent
        }
class RateLimitFlowControl:
    """Token-bucket rate limiting as aiohttp middleware (429 when exhausted)."""

    def __init__(self, rate: int, burst: int):
        self.tokens = burst          # current balance, bucket starts full
        self.rate = rate             # refill rate, tokens per second
        self.max_tokens = burst      # bucket capacity
        self.last_check = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        """Refill for elapsed time, then take one token if available."""
        async with self._lock:
            now = time.time()
            refill = (now - self.last_check) * self.rate
            self.tokens = min(self.max_tokens, self.tokens + refill)
            self.last_check = now
            if self.tokens < 1:
                return False
            self.tokens -= 1
            return True

    @web.middleware
    async def handle(self, request: web.Request, handler):
        """Serve the request, or answer 429 with a Retry-After hint."""
        if not await self.acquire():
            return web.Response(
                status=429,
                headers={"Retry-After": "1"},
                text="Rate limit exceeded"
            )
        return await handler(request)
Load Shedding
class LoadShedder:
    """Sheds requests whenever host CPU or memory crosses a threshold."""

    def __init__(self, cpu_threshold: float = 0.9, memory_threshold: float = 0.9):
        self.cpu_threshold = cpu_threshold        # fraction, 0.0-1.0
        self.memory_threshold = memory_threshold  # fraction, 0.0-1.0
        self.shed_count = 0
        self.total_count = 0

    def should_shed(self) -> bool:
        """Sample system load; True means drop this request."""
        import psutil
        self.total_count += 1
        overloaded = (
            psutil.cpu_percent() > self.cpu_threshold * 100
            or psutil.virtual_memory().percent > self.memory_threshold * 100
        )
        if overloaded:
            self.shed_count += 1
        return overloaded

    def get_shed_rate(self) -> float:
        """Fraction of observed requests that were shed."""
        if self.total_count == 0:
            return 0
        return self.shed_count / self.total_count
class PriorityLoadShedder:
    """Under CPU pressure, rejects all but critical/high-priority requests."""

    # Lower number = more important.
    PRIORITY_MAP = {
        "critical": 0,
        "high": 1,
        "normal": 2,
        "low": 3,
        "batch": 4
    }

    def __init__(self, max_load: float = 0.8):
        self.max_load = max_load
        self.current_load = 0

    def calculate_priority(self, request: web.Request) -> int:
        """Map the X-Request-Priority header to a numeric rank (default: normal)."""
        header_value = request.headers.get("X-Request-Priority", "normal")
        return self.PRIORITY_MAP.get(header_value, 2)

    @web.middleware
    async def handle(self, request: web.Request, handler):
        """Serve, or return 503 for shed-eligible requests while overloaded."""
        import psutil
        load = psutil.cpu_percent() / 100
        if load > self.max_load and self.calculate_priority(request) > 1:
            # Only "critical" (0) and "high" (1) get through during overload.
            return web.Response(
                status=503,
                headers={"Retry-After": "30"},
                text="Service overloaded"
            )
        return await handler(request)
class CircuitBreakerLoadShedder:
    """Sheds every request once accumulated failures trip the circuit."""

    def __init__(self, failure_threshold: int = 50, timeout: int = 60):
        self.failure_threshold = failure_threshold  # failures before opening
        self.timeout = timeout                      # seconds the circuit stays open
        self.failures = 0
        self.circuit_open = False
        self.last_failure_time = None

    def record_failure(self):
        """Count a failure; open the circuit at the threshold."""
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.circuit_open = True

    def record_success(self):
        """Any success fully resets the breaker."""
        self.failures = 0
        self.circuit_open = False

    @web.middleware
    async def handle(self, request: web.Request, handler):
        """Reject instantly while open; otherwise proxy and track the outcome."""
        if self.circuit_open:
            elapsed = time.time() - self.last_failure_time
            if elapsed <= self.timeout:
                return web.Response(
                    status=503,
                    text="Service temporarily unavailable"
                )
            # The cool-down elapsed: close the circuit and try again.
            self.circuit_open = False
            self.failures = 0
        try:
            response = await handler(request)
        except Exception:
            self.record_failure()
            raise
        self.record_success()
        return response
Monitoring Backpressure
class BackpressureMonitor:
def __init__(self):
self.metrics = {
"queue_depth": [],
"drop_rate": [],
"backpressure_active": [],
"response_time": []
}
async def record_queue_depth(self, depth: int, max_depth: int):
self.metrics["queue_depth"].append({
"depth": depth,
"max": max_depth,
"ratio": depth / max_depth,
"timestamp": time.time()
})
async def record_drop(self, count: int, total: int):
self.metrics["drop_rate"].append({
"dropped": count,
"total": total,
"rate": count / total if total > 0 else 0,
"timestamp": time.time()
})
async def record_response_time(self, duration: float):
self.metrics["response_time"].append({
"duration": duration,
"timestamp": time.time()
})
def get_health_status(self) -> dict:
recent_queue = self.metrics["queue_depth"][-10:]
recent_drops = self.metrics["drop_rate"][-10:]
avg_queue_ratio = sum(m["ratio"] for m in recent_queue) / len(recent_queue) if recent_queue else 0
avg_drop_rate = sum(m["rate"] for m in recent_drops) / len(recent_drops) if recent_drops else 0
if avg_drop_rate > 0.1:
status = "critical"
elif avg_queue_ratio > 0.8:
status = "degraded"
else:
status = "healthy"
return {
"status": status,
"avg_queue_ratio": avg_queue_ratio,
"avg_drop_rate": avg_drop_rate,
"backpressure_active": avg_queue_ratio > 0.7
}
Best Practices
# Guidance strings rendered for readers. Encoding fix: the original file's
# check/cross marks were mojibake-corrupted ("โ"); restored here.
GOOD_PATTERNS = {
    "detect_early": """
    # Monitor queue depth and response times early
    ✅ Good:
    - Set alerts at 50% queue capacity
    - Monitor for increasing latency
    - Track error rates
    ❌ Bad:
    - Only react when queue is full
    - Wait for failures to detect overload
    """,
    "graceful_degradation": """
    # Shed load gracefully instead of crashing
    ✅ Good:
    - Return degraded responses
    - Prioritize critical requests
    - Provide useful error messages
    ❌ Bad:
    - Accept all requests then crash
    - Drop connections without response
    """,
    "multiple_strategies": """
    # Use combination of strategies
    ✅ Good:
    - Backpressure at application level
    - Load shedding at gateway level
    - Circuit breakers for downstream
    ❌ Bad:
    - Rely on single mechanism
    - No defense in depth
    """
}
# Anti-pattern strings rendered for readers. Encoding fix: the original
# file's check/cross marks were mojibake-corrupted ("โ"); restored here.
BAD_PATTERNS = {
    "infinite_queue": """
    ❌ Bad:
    queue = asyncio.Queue() # No maxsize!
    # Queue grows until OOM crash
    ✅ Good:
    queue = asyncio.Queue(maxsize=10000)
    # Bounded queue with backpressure
    """,
    "no_monitoring": """
    ❌ Bad:
    # No visibility into backpressure
    async def process():
        await queue.put(item) # Could block forever!
    ✅ Good:
    # Monitor queue depth
    if queue.qsize() > queue.maxsize * 0.8:
        alert("Approaching backpressure")
    """,
    "ignore_feedback": """
    ❌ Bad:
    # Don't respond to backpressure signals
    producer.send(item) # Always send!
    ✅ Good:
    # Respect backpressure
    if should_backoff():
        await sleep(delay)
    producer.send(item)
    """
}
Summary
Backpressure and flow control are essential for system resilience:
- Drop Strategy - Drop requests when the queue is full (newest, oldest, random, or lowest-priority)
- Block Strategy - Block producers until consumers catch up
- Signal Strategy - Notify producers to slow down
Key principles:
- Always bound queues and pools
- Monitor for backpressure conditions
- Implement graceful degradation
- Use multiple layers of protection
- Make backpressure visible to operators
Proper backpressure handling prevents cascade failures and keeps systems stable under overload.
Comments