
Backpressure and Flow Control: Managing System Load Gracefully

Backpressure is the mechanism by which a system resists accepting more work than it can handle. When downstream systems can’t keep up, backpressure propagates upstream to slow down producers, preventing cascade failures and maintaining system stability.

Understanding Backpressure

The Problem Without Backpressure

┌──────────────────────────────────────────────────────────┐
│           System Collapse Without Backpressure           │
│                                                          │
│  Producer ──► [Queue A] ──► [Worker 1] ──► [Worker 2]    │
│                   │             │              │         │
│   Fast!           ▼             ▼              ▼         │
│              Queue fills    Queue fills    Queue fills   │
│                   │             │              │         │
│                   ▼             ▼              ▼         │
│            Memory exhaust  Memory exhaust  Memory exhaust│
│                   │             │              │         │
│                   └─────────────┴──────────────┘         │
│                                 │                        │
│                                 ▼                        │
│                        ┌─────────────────┐               │
│                        │  SYSTEM CRASH   │               │
│                        │   OutOfMemory   │               │
│                        └─────────────────┘               │
└──────────────────────────────────────────────────────────┘

With Backpressure

┌──────────────────────────────────────────────────────────┐
│              Graceful Backpressure Handling              │
│                                                          │
│  Producer ──► [Queue A] ──► [Worker 1] ──► [Worker 2]    │
│                   │             │              │         │
│   Senses          ▼             ▼              ▼         │
│   overload   Queue fills    Workers busy   Workers busy  │
│                   │             │              │         │
│                   ▼             ▼              ▼         │
│  ◄──────── Backpressure signal sent upstream! ─────────► │
│                   │             │              │         │
│  Producer         ▼             ▼              ▼         │
│  slows down!   ✓ Stable      ✓ Stable      ✓ Stable      │
│                                                          │
│  Results:                                                │
│  ✓ No memory exhaustion                                  │
│  ✓ System remains stable                                 │
│  ✓ Graceful degradation                                  │
└──────────────────────────────────────────────────────────┘
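In Python's asyncio, a bounded queue gives you exactly this behavior: `put()` suspends the producer whenever the queue is full, so memory stays bounded while the consumer catches up. A minimal, self-contained sketch (the `consumer` and `main` names are illustrative):

```python
import asyncio

async def consumer(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        await asyncio.sleep(0.001)   # simulate a slow consumer
        results.append(item)
        queue.task_done()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)  # bounded: put() suspends at 5
    results: list = []
    worker = asyncio.create_task(consumer(queue, results))

    for i in range(20):
        await queue.put(i)           # suspends whenever the queue is full

    await queue.join()               # wait for the consumer to drain everything
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return results

print(asyncio.run(main()))           # all 20 items, in order, memory bounded at 5
```

The producer never races more than five items ahead of the consumer; backpressure is implicit in the awaited `put()`.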

Backpressure Strategies

Drop Strategy

from enum import Enum
from typing import Callable, Optional
import asyncio
import logging
import time

logger = logging.getLogger(__name__)

class DropPolicy(Enum):
    DROP_TAIL = "drop_tail"           # Drop newest
    DROP_HEAD = "drop_head"           # Drop oldest
    DROP_RANDOM = "drop_random"       # Drop random
    DROP_PRIORITY = "drop_priority"   # Drop lowest priority

class DropStrategy:
    def __init__(self, policy: DropPolicy = DropPolicy.DROP_TAIL):
        self.policy = policy
    
    def drop(self, queue: list) -> Optional[dict]:
        if not queue:
            return None
        
        if self.policy == DropPolicy.DROP_TAIL:
            return queue.pop()
        
        elif self.policy == DropPolicy.DROP_HEAD:
            return queue.pop(0)
        
        elif self.policy == DropPolicy.DROP_RANDOM:
            import random
            index = random.randint(0, len(queue) - 1)
            return queue.pop(index)
        
        elif self.policy == DropPolicy.DROP_PRIORITY:
            return self._drop_lowest_priority(queue)
        
        return None
    
    def _drop_lowest_priority(self, queue: list) -> Optional[dict]:
        # min() alone only returns a reference; the item must also be removed.
        item = min(queue, key=lambda x: x.get("priority", 0))
        queue.remove(item)
        return item


class BoundedQueueWithDrop:
    def __init__(self, max_size: int, drop_policy: DropPolicy = DropPolicy.DROP_TAIL):
        self.max_size = max_size
        self.queue = []
        self.drop_strategy = DropStrategy(drop_policy)
        self._lock = asyncio.Lock()
        self.metrics = {"enqueued": 0, "dropped": 0, "processed": 0}
    
    async def enqueue(self, item: dict) -> bool:
        async with self._lock:
            if len(self.queue) >= self.max_size:
                dropped = self.drop_strategy.drop(self.queue)
                self.metrics["dropped"] += 1
                if dropped is not None:
                    await self._on_item_dropped(dropped)
            
            self.queue.append(item)
            self.metrics["enqueued"] += 1
            return True
    
    async def dequeue(self) -> Optional[dict]:
        async with self._lock:
            if self.queue:
                item = self.queue.pop(0)
                self.metrics["processed"] += 1
                return item
        return None
    
    async def _on_item_dropped(self, item: dict):
        pass
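The drop-head versus drop-tail semantics can be seen standalone with stdlib types: `collections.deque(maxlen=...)` implements drop-head eviction natively. This is a simplified illustration of the two policies, not the `BoundedQueueWithDrop` class above:

```python
from collections import deque

# A deque with maxlen silently evicts from the opposite end on append:
# appending on the right drops the oldest item -> DROP_HEAD semantics.
q = deque(maxlen=3)
for i in range(5):
    q.append(i)
print(list(q))                       # [2, 3, 4]: the oldest items 0 and 1 dropped

# DROP_TAIL keeps the oldest items and discards the newest arrival instead.
def enqueue_drop_tail(queue: list, item, max_size: int) -> bool:
    if len(queue) >= max_size:
        return False                 # the newest item is the one dropped
    queue.append(item)
    return True

q2: list = []
accepted = [enqueue_drop_tail(q2, i, 3) for i in range(5)]
print(q2, accepted)                  # [0, 1, 2] [True, True, True, False, False]
```

Drop-head favors freshness (good for metrics and live feeds); drop-tail favors fairness to work already accepted.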

Block Strategy

class BlockingBackpressure:
    """Block producers when queue is full."""
    
    def __init__(self, max_size: int, block_timeout: float = 30.0):
        self.max_size = max_size
        self.queue = asyncio.Queue(maxsize=max_size)
        self.block_timeout = block_timeout
    
    async def enqueue(self, item: dict) -> bool:
        try:
            await asyncio.wait_for(
                self.queue.put(item),
                timeout=self.block_timeout
            )
            return True
        except asyncio.TimeoutError:
            logger.warning(f"Backpressure: Blocked for {self.block_timeout}s")
            return False
    
    async def dequeue(self) -> Optional[dict]:
        return await self.queue.get()


class TokenBucketWithBlock:
    """Token bucket with blocking when empty."""
    
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self._lock = asyncio.Lock()
    
    async def acquire(self, tokens: int = 1, block: bool = True) -> bool:
        while True:
            async with self._lock:
                self._refill()
                
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                
                if not block:
                    return False
                
                wait_time = (tokens - self.tokens) / self.rate
            
            # Sleep with the lock released so other acquirers are not blocked.
            await asyncio.sleep(wait_time)
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now
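The refill arithmetic is easiest to verify with an injectable clock. The synchronous sketch below mirrors the same refill math; the `clock` parameter is an addition for testability and not part of the class above:

```python
import time

class SyncTokenBucket:
    """Synchronous token bucket with an injectable clock, for deterministic tests."""

    def __init__(self, rate: float, capacity: float, clock=None):
        self.rate = rate                     # tokens added per second
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock or time.monotonic
        self.last = self.clock()

    def try_acquire(self, n: float = 1.0) -> bool:
        now = self.clock()
        # Same refill rule as _refill(): cap at capacity, credit elapsed * rate.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

# Fake clock advanced by hand, so the refill is exact and reproducible.
t = [0.0]
bucket = SyncTokenBucket(rate=10.0, capacity=5, clock=lambda: t[0])

assert all(bucket.try_acquire() for _ in range(5))   # burst of 5 succeeds
assert not bucket.try_acquire()                      # bucket is now empty
t[0] += 0.2                                          # 0.2 s * 10 tokens/s = 2 tokens
assert bucket.try_acquire() and bucket.try_acquire()
assert not bucket.try_acquire()                      # credit exhausted again
```

The same pattern (passing a clock in) makes the async version testable without real sleeps.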

Signal Strategy

class BackpressureSignal:
    """Signal backpressure status to producers."""
    
    def __init__(self, queue: asyncio.Queue, high_watermark: float = 0.8, low_watermark: float = 0.4):
        self.queue = queue
        self.high_watermark = high_watermark
        self.low_watermark = low_watermark
        self.is_pressured = False
        self._callbacks = []
    
    def register_callback(self, callback: Callable):
        self._callbacks.append(callback)
    
    async def check(self):
        if self.queue.maxsize <= 0:
            return  # unbounded queue: fill ratio is undefined
        fill_ratio = self.queue.qsize() / self.queue.maxsize
        
        was_pressured = self.is_pressured
        # Hysteresis: engage at the high watermark, release only below the low one,
        # so the signal does not flap when the queue hovers near one threshold.
        if fill_ratio >= self.high_watermark:
            self.is_pressured = True
        elif fill_ratio <= self.low_watermark:
            self.is_pressured = False
        
        if self.is_pressured and not was_pressured:
            for cb in self._callbacks:
                await cb(backpressure=True)
        elif not self.is_pressured and was_pressured:
            for cb in self._callbacks:
                await cb(backpressure=False)
    
    def should_backoff(self) -> bool:
        return self.is_pressured


class ProducerWithBackpressure:
    """Producer that responds to backpressure signals."""
    
    def __init__(self, signal: BackpressureSignal):
        self.signal = signal
        self.production_rate = 100  # items per second
        self.min_rate = 1
    
    async def produce(self, item_generator):
        while True:
            if self.signal.should_backoff():
                self.production_rate = max(
                    self.min_rate,
                    self.production_rate * 0.9  # Reduce by 10%
                )
            else:
                self.production_rate = min(100, self.production_rate * 1.1)
            
            delay = 1.0 / self.production_rate
            await asyncio.sleep(delay)
            
            item = next(item_generator)
            yield item
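The rate adaptation above is multiplicative in both directions, so it converges geometrically: twenty consecutive pressure signals cut a 100 items/s producer down to roughly 12 items/s. The helper below is a hypothetical standalone distillation of that loop body:

```python
def adapt_rate(rate: float, pressured: bool,
               min_rate: float = 1.0, max_rate: float = 100.0) -> float:
    # Multiplicative decrease under backpressure, gentle increase otherwise.
    if pressured:
        return max(min_rate, rate * 0.9)
    return min(max_rate, rate * 1.1)

rate = 100.0
for _ in range(20):                  # 20 consecutive backpressure signals
    rate = adapt_rate(rate, pressured=True)
print(round(rate, 2))                # 12.16 items/s: 100 * 0.9**20
```

Decreasing by a fraction (rather than a fixed step) reacts fast when overload is severe yet barely perturbs a healthy system.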

Implementing Flow Control

TCP-Style Window Control

class SlidingWindowFlowControl:
    """TCP-like sliding window for flow control."""
    
    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.in_flight = 0
        self._lock = asyncio.Lock()
        self._semaphore = asyncio.Semaphore(window_size)
    
    async def send(self, item: dict) -> bool:
        async with self._semaphore:
            async with self._lock:
                self.in_flight += 1
            
            try:
                result = await self._process(item)
                return result
            finally:
                async with self._lock:
                    self.in_flight -= 1
    
    async def _process(self, item: dict) -> bool:
        await asyncio.sleep(0.01)  # Simulate processing
        return True
    
    def get_window_status(self) -> dict:
        return {
            "window_size": self.window_size,
            "in_flight": self.in_flight,
            "available": self.window_size - self.in_flight,
            "utilization": self.in_flight / self.window_size
        }
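The window is, at its core, a semaphore bounding in-flight work. This self-contained check confirms that concurrency never exceeds the window even when many sends arrive at once (the `send` coroutine is illustrative):

```python
import asyncio

async def main(window: int = 3, jobs: int = 10) -> int:
    sem = asyncio.Semaphore(window)
    in_flight = 0
    peak = 0

    async def send(i: int) -> None:
        nonlocal in_flight, peak
        async with sem:                  # at most `window` sends in flight
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)    # simulated processing
            in_flight -= 1

    await asyncio.gather(*(send(i) for i in range(jobs)))
    return peak

print(asyncio.run(main()))               # 3: the window caps concurrency
```

Ten jobs are launched, but the observed peak of concurrent sends equals the window size, never more.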


class AdaptiveWindowController:
    """Adjusts window size based on performance."""
    
    def __init__(self, initial_window: int = 10, min_window: int = 1, max_window: int = 100):
        self.window = initial_window
        self.min_window = min_window
        self.max_window = max_window
        self.rtt_history = []
        self.error_history = []
    
    async def record_success(self, rtt: float):
        self.rtt_history.append(rtt)
        if len(self.rtt_history) > 100:
            self.rtt_history.pop(0)
        
        avg_rtt = sum(self.rtt_history) / len(self.rtt_history)
        
        if avg_rtt < 0.1:  # Fast response
            self.window = min(self.max_window, self.window + 1)
        elif avg_rtt > 0.5:  # Slow response
            self.window = max(self.min_window, self.window - 1)
    
    async def record_error(self):
        self.error_history.append(time.time())
        self.error_history = [t for t in self.error_history if time.time() - t < 60]
        
        error_rate = len(self.error_history) / 60
        
        if error_rate > 0.1:  # High error rate
            self.window = max(self.min_window, self.window // 2)
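Growing the window additively on success and halving it on errors is the classic AIMD (additive increase, multiplicative decrease) scheme from TCP congestion control. A deterministic sketch of that core rule (the `aimd` helper is a hypothetical distillation, not the class above):

```python
def aimd(window: int, success: bool, min_w: int = 1, max_w: int = 100) -> int:
    # Additive increase on success, multiplicative decrease on failure.
    if success:
        return min(max_w, window + 1)
    return max(min_w, window // 2)

w = 10
for _ in range(5):
    w = aimd(w, success=True)
print(w)                             # 15 after five successes
w = aimd(w, success=False)
print(w)                             # 7: halved on the first error
```

The asymmetry is the point: recovery from overload is immediate, while probing for extra capacity is cautious.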

Producer-Consumer with Backpressure

class BackpressureProcessor:
    def __init__(self, queue_size: int = 1000, num_workers: int = 4):
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.workers = [asyncio.create_task(self._worker(i)) for i in range(num_workers)]
        self.backpressure = BackpressureSignal(self.queue)
        self._running = True
    
    async def _worker(self, worker_id: int):
        while self._running:
            try:
                item = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
                
                await self._process_item(item)
                self.queue.task_done()
                
            except asyncio.TimeoutError:
                continue
    
    async def _process_item(self, item: dict):
        await asyncio.sleep(0.01)
    
    async def submit(self, item: dict) -> bool:
        await self.backpressure.check()
        
        if self.backpressure.should_backoff():
            logger.warning("Backpressure active, rejecting item")
            return False
        
        try:
            await asyncio.wait_for(
                self.queue.put(item),
                timeout=5.0
            )
            return True
        except asyncio.TimeoutError:
            logger.error("Queue full, could not enqueue")
            return False
    
    async def shutdown(self):
        self._running = False
        await asyncio.gather(*self.workers, return_exceptions=True)
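Putting the watermark check and the bounded queue together, the submit path can be sketched end to end without the worker machinery. This is a simplified standalone version using a non-blocking check, not the class above:

```python
import asyncio

async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    accepted, rejected = 0, 0

    def submit_nowait(item) -> bool:
        # Reject above the 80% high watermark instead of blocking the caller.
        if queue.qsize() >= queue.maxsize * 0.8:
            return False
        queue.put_nowait(item)
        return True

    # No consumer is draining here, so the queue fills and rejection kicks in.
    for i in range(20):
        if submit_nowait(i):
            accepted += 1
        else:
            rejected += 1
    return accepted, rejected

print(asyncio.run(main()))           # (8, 12): rejection starts at 80% of maxsize
```

Rejecting early (at the watermark, not at capacity) leaves headroom for in-flight work and keeps the producer's failure mode fast and explicit.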

HTTP Flow Control Middleware

from aiohttp import web

class FlowControlMiddleware:
    def __init__(self, max_concurrent: int = 100, queue_size: int = 500):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.active_requests = 0
        self.queued_requests = 0
        self.rejected_requests = 0
    
    @web.middleware
    async def handle(self, request: web.Request, handler):
        async with self.semaphore:
            self.active_requests += 1
            
            try:
                response = await handler(request)
                return response
            finally:
                self.active_requests -= 1
    
    def get_status(self) -> dict:
        return {
            "active": self.active_requests,
            "queued": self.queued_requests,
            "rejected": self.rejected_requests,
            "utilization": self.active_requests / self.max_concurrent
        }


class RateLimitFlowControl:
    def __init__(self, rate: int, burst: int):
        self.tokens = burst
        self.rate = rate
        self.max_tokens = burst
        self.last_check = time.time()
        self._lock = asyncio.Lock()
    
    async def acquire(self) -> bool:
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_check
            self.tokens = min(self.max_tokens, self.tokens + elapsed * self.rate)
            self.last_check = now
            
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False
    
    @web.middleware
    async def handle(self, request: web.Request, handler):
        if await self.acquire():
            return await handler(request)
        
        return web.Response(
            status=429,
            headers={"Retry-After": "1"},
            text="Rate limit exceeded"
        )
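When the limiter rejects, the `Retry-After` value can be derived from the bucket's token deficit rather than hardcoded to "1". A stdlib-only sketch of that calculation (the helper name is an assumption, not part of the class above):

```python
import math

def retry_after_seconds(tokens: float, rate: float, needed: float = 1.0) -> int:
    """Seconds until `needed` tokens are available, rounded up for the header."""
    deficit = max(0.0, needed - tokens)
    return math.ceil(deficit / rate)

print(retry_after_seconds(tokens=0.0, rate=2.0))   # 1: half a second, rounded up
print(retry_after_seconds(tokens=0.5, rate=0.1))   # 5: 0.5 tokens short at 0.1/s
print(retry_after_seconds(tokens=5.0, rate=1.0))   # 0: tokens already available
```

An accurate `Retry-After` lets well-behaved clients back off for exactly as long as needed instead of retrying into another 429.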

Load Shedding

class LoadShedder:
    """Drop requests when system is overloaded."""
    
    def __init__(self, cpu_threshold: float = 0.9, memory_threshold: float = 0.9):
        self.cpu_threshold = cpu_threshold
        self.memory_threshold = memory_threshold
        self.shed_count = 0
        self.total_count = 0
    
    def should_shed(self) -> bool:
        import psutil
        self.total_count += 1
        
        cpu = psutil.cpu_percent()
        memory = psutil.virtual_memory().percent
        
        if cpu > self.cpu_threshold * 100 or memory > self.memory_threshold * 100:
            self.shed_count += 1
            return True
        
        return False
    
    def get_shed_rate(self) -> float:
        if self.total_count == 0:
            return 0
        return self.shed_count / self.total_count


class PriorityLoadShedder:
    """Shed lower priority requests first."""
    
    PRIORITY_MAP = {
        "critical": 0,
        "high": 1,
        "normal": 2,
        "low": 3,
        "batch": 4
    }
    
    def __init__(self, max_load: float = 0.8):
        self.max_load = max_load
    
    def calculate_priority(self, request: web.Request) -> int:
        priority_header = request.headers.get("X-Request-Priority", "normal")
        return self.PRIORITY_MAP.get(priority_header, 2)
    
    @web.middleware
    async def handle(self, request: web.Request, handler):
        import psutil
        current_load = psutil.cpu_percent() / 100
        
        if current_load > self.max_load:
            priority = self.calculate_priority(request)
            
            if priority > 1:  # Shed low/normal priority
                return web.Response(
                    status=503,
                    headers={"Retry-After": "30"},
                    text="Service overloaded"
                )
        
        return await handler(request)


class CircuitBreakerLoadShedder:
    """Open circuit when overloaded and shed all requests."""
    
    def __init__(self, failure_threshold: int = 50, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.circuit_open = False
        self.last_failure_time = None
    
    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.failures >= self.failure_threshold:
            self.circuit_open = True
    
    def record_success(self):
        self.failures = 0
        self.circuit_open = False
    
    @web.middleware
    async def handle(self, request: web.Request, handler):
        if self.circuit_open:
            if time.time() - self.last_failure_time > self.timeout:
                self.circuit_open = False
                self.failures = 0
            else:
                return web.Response(
                    status=503,
                    text="Service temporarily unavailable"
                )
        
        try:
            response = await handler(request)
            self.record_success()
            return response
        except Exception:
            self.record_failure()
            raise

Monitoring Backpressure

class BackpressureMonitor:
    def __init__(self):
        self.metrics = {
            "queue_depth": [],
            "drop_rate": [],
            "backpressure_active": [],
            "response_time": []
        }
    
    async def record_queue_depth(self, depth: int, max_depth: int):
        self.metrics["queue_depth"].append({
            "depth": depth,
            "max": max_depth,
            "ratio": depth / max_depth,
            "timestamp": time.time()
        })
    
    async def record_drop(self, count: int, total: int):
        self.metrics["drop_rate"].append({
            "dropped": count,
            "total": total,
            "rate": count / total if total > 0 else 0,
            "timestamp": time.time()
        })
    
    async def record_response_time(self, duration: float):
        self.metrics["response_time"].append({
            "duration": duration,
            "timestamp": time.time()
        })
    
    def get_health_status(self) -> dict:
        recent_queue = self.metrics["queue_depth"][-10:]
        recent_drops = self.metrics["drop_rate"][-10:]
        
        avg_queue_ratio = sum(m["ratio"] for m in recent_queue) / len(recent_queue) if recent_queue else 0
        avg_drop_rate = sum(m["rate"] for m in recent_drops) / len(recent_drops) if recent_drops else 0
        
        if avg_drop_rate > 0.1:
            status = "critical"
        elif avg_queue_ratio > 0.8:
            status = "degraded"
        else:
            status = "healthy"
        
        return {
            "status": status,
            "avg_queue_ratio": avg_queue_ratio,
            "avg_drop_rate": avg_drop_rate,
            "backpressure_active": avg_queue_ratio > 0.7
        }
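The health classification reduces to two rolling averages compared against the thresholds above. Distilled into a standalone function for clarity (`health_status` is a hypothetical name, not part of the class):

```python
def health_status(queue_ratios: list, drop_rates: list) -> str:
    # Average the recent samples; empty history counts as healthy.
    avg_queue = sum(queue_ratios) / len(queue_ratios) if queue_ratios else 0.0
    avg_drop = sum(drop_rates) / len(drop_rates) if drop_rates else 0.0

    if avg_drop > 0.1:       # dropping >10% of traffic: critical
        return "critical"
    if avg_queue > 0.8:      # queues >80% full: degraded but not yet dropping
        return "degraded"
    return "healthy"

print(health_status([0.2, 0.3], [0.0, 0.0]))      # healthy
print(health_status([0.9, 0.95], [0.05, 0.0]))    # degraded
print(health_status([0.5], [0.2, 0.15]))          # critical
```

Checking drop rate before queue depth matters: sustained drops indicate the system is already shedding load, which is strictly worse than merely deep queues.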

Best Practices

GOOD_PATTERNS = {
    "detect_early": """
# Monitor queue depth and response times early

✅ Good:
- Set alerts at 50% queue capacity
- Monitor for increasing latency
- Track error rates

❌ Bad:
- Only react when queue is full
- Wait for failures to detect overload
""",
    
    "graceful_degradation": """
# Shed load gracefully instead of crashing

✅ Good:
- Return degraded responses
- Prioritize critical requests
- Provide useful error messages

❌ Bad:
- Accept all requests then crash
- Drop connections without response
""",
    
    "multiple_strategies": """
# Use a combination of strategies

✅ Good:
- Backpressure at application level
- Load shedding at gateway level
- Circuit breakers for downstream

❌ Bad:
- Rely on a single mechanism
- No defense in depth
"""
}

BAD_PATTERNS = {
    "infinite_queue": """
❌ Bad:
queue = asyncio.Queue()  # No maxsize!

# Queue grows until OOM crash

✅ Good:
queue = asyncio.Queue(maxsize=10000)

# Bounded queue with backpressure
""",
    
    "no_monitoring": """
❌ Bad:
# No visibility into backpressure

async def process():
    await queue.put(item)  # Could block forever!

✅ Good:
# Monitor queue depth
if queue.qsize() > queue.maxsize * 0.8:
    alert("Approaching backpressure")
""",
    
    "ignore_feedback": """
❌ Bad:
# Don't respond to backpressure signals

producer.send(item)  # Always send!

✅ Good:
# Respect backpressure
if should_backoff():
    await sleep(delay)
producer.send(item)
"""
}

Summary

Backpressure and flow control are essential for system resilience:

  • Drop Strategy - Drop items when the queue is full (newest, oldest, random, or lowest priority)
  • Block Strategy - Block producers until consumers catch up
  • Signal Strategy - Notify producers to slow down

Key principles:

  • Always bound queues and pools
  • Monitor for backpressure conditions
  • Implement graceful degradation
  • Use multiple layers of protection
  • Make backpressure visible to operators

Proper backpressure handling prevents cascade failures and keeps systems stable under overload.
