The Bulkhead pattern isolates critical resources into separate pools to prevent cascade failures. Named after ship bulkheads that contain flooding to one section, this pattern ensures that failure in one part of the system doesn’t bring down the entire application.
Understanding Bulkhead Pattern
The Problem Without Bulkheads
Single Resource Pool (No Isolation)

  Application Thread Pool (100 threads total)

  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐
  │ Request │  │ Request │  │ Request │  │ Request │  ...
  │    1    │  │    2    │  │    3    │  │    4    │
  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘
       └────────────┴─────┬──────┴────────────┘
                          ▼
             ┌────────────────────────┐
             │    Slow External API   │
             │   (takes 30+ seconds)  │
             └────────────────────────┘

  All 100 threads waiting!
  → New requests rejected (pool exhausted)
  → App becomes completely unresponsive
  → Everything fails, not just the slow API calls
With Bulkhead Isolation
Bulkhead Pattern - Resource Isolation

  ┌──────────────┬──────────────┬──────────────┐
  │ Thread Pool  │ Thread Pool  │ Thread Pool  │
  │ for API      │ for DB       │ for Cache    │
  │ (20 threads) │ (30 threads) │ (10 threads) │
  └──────┬───────┴──────┬───────┴──────┬───────┘
         ▼              ▼              ▼
  ┌────────────┐ ┌────────────┐ ┌────────────┐
  │  External  │ │  Database  │ │   Redis    │
  │    API     │ │ Connection │ │   Cache    │
  └─────┬──────┘ └────────────┘ └────────────┘
        ▼
  ┌────────────┐
  │  Slow API  │  → Only 20 threads affected
  │ (30+ secs) │    (20% of capacity)
  └────────────┘

  ✓ 80 threads still available
  ✓ DB and Cache operations continue
  ✓ App remains responsive
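The isolation above can be demonstrated in a few lines with two separate `ThreadPoolExecutor`s. This is a minimal sketch; the pool sizes and the simulated latency are illustrative, not recommendations:

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Two isolated pools: a slow dependency can only exhaust its own threads.
api_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="api")
db_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="db")

def slow_api_call():
    time.sleep(0.5)  # simulates a hung external API
    return "api"

def fast_db_query():
    return "db"

# Saturate the API pool with slow calls...
api_futures = [api_pool.submit(slow_api_call) for _ in range(4)]

# ...while DB work is unaffected, because it runs in its own pool.
db_result = db_pool.submit(fast_db_query).result(timeout=1)
print(db_result)  # prints "db" immediately, despite the busy API pool
```

With a single shared pool, the four slow calls would have occupied workers that the database query also needs.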
Types of Bulkheads
Thread Pool Bulkhead
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Full


class BulkheadRejectedError(Exception):
    """Raised when a bulkhead refuses new work because it is at capacity."""


class ThreadPoolBulkhead:
    def __init__(
        self,
        name: str,
        max_size: int = 20,
        queue_size: int = 100
    ):
        self.name = name
        # Note: the stdlib ThreadPoolExecutor only caps max workers;
        # JVM-style core-size/keep-alive tuning is not available
        self.executor = ThreadPoolExecutor(
            max_workers=max_size,
            thread_name_prefix=f"bulkhead-{name}"
        )
        # Bounded queue of in-flight tokens; a full queue means "reject"
        self.queue = Queue(maxsize=queue_size)
        self._metrics = BulkheadMetrics(name)

    def submit(self, fn, *args, **kwargs):
        try:
            self.queue.put_nowait(True)
        except Full:
            self._metrics.increment_rejected()
            raise BulkheadRejectedError(
                f"Bulkhead {self.name} is at capacity"
            )
        future = self.executor.submit(fn, *args, **kwargs)
        self._metrics.increment_submitted()

        def _on_done(f):
            self.queue.get_nowait()
            self._metrics.increment_completed()

        future.add_done_callback(_on_done)
        return future

    async def submit_async(self, fn, *args, **kwargs):
        loop = asyncio.get_running_loop()
        try:
            self.queue.put_nowait(True)
        except Full:
            self._metrics.increment_rejected()
            raise BulkheadRejectedError(
                f"Bulkhead {self.name} is at capacity"
            )
        self._metrics.increment_submitted()
        try:
            return await loop.run_in_executor(
                self.executor,
                lambda: fn(*args, **kwargs)
            )
        finally:
            self.queue.get_nowait()
            self._metrics.increment_completed()

    def get_metrics(self) -> dict:
        return {
            "name": self.name,
            "submitted": self._metrics.submitted,
            "completed": self._metrics.completed,
            "rejected": self._metrics.rejected,
            "active": self._metrics.active,
            "queue_size": self.queue.qsize()
        }


class BulkheadMetrics:
    def __init__(self, name: str):
        self.name = name
        self.submitted = 0
        self.completed = 0
        self.rejected = 0
        self.active = 0
        self._lock = threading.Lock()

    def increment_submitted(self):
        with self._lock:
            self.submitted += 1
            self.active += 1

    def increment_completed(self):
        with self._lock:
            self.completed += 1
            self.active = max(0, self.active - 1)

    def increment_rejected(self):
        with self._lock:
            self.rejected += 1
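The same reject-when-full behaviour can be built on a plain non-blocking semaphore. This standalone sketch (illustrative names, not part of the class above) isolates just that mechanism:

```python
import threading

class SemaphoreBulkhead:
    """Reject-when-full guard built on a non-blocking semaphore."""

    def __init__(self, max_concurrent: int):
        self._sem = threading.Semaphore(max_concurrent)

    def run(self, fn, *args, **kwargs):
        # Non-blocking acquire: fail fast instead of queueing the caller
        if not self._sem.acquire(blocking=False):
            raise RuntimeError("bulkhead at capacity")
        try:
            return fn(*args, **kwargs)
        finally:
            self._sem.release()

bh = SemaphoreBulkhead(max_concurrent=1)
print(bh.run(lambda: 2 + 2))  # 4: permit acquired, call runs, permit released
```

The queue-of-tokens version above additionally bounds *pending* work; the semaphore version only bounds work in flight.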
Connection Pool Bulkhead
import asyncio
from dataclasses import dataclass, field
from typing import Optional


class ConnectionPoolExhaustedError(Exception):
    """Raised when no connection can be acquired within the timeout."""


@dataclass
class ConnectionPoolBulkhead:
    # `Connection` is assumed to be your driver's connection wrapper
    host: str
    port: int
    min_connections: int = 5
    max_connections: int = 50
    acquire_timeout: float = 30.0
    idle_timeout: float = 300.0

    _connections: list = field(default_factory=list, init=False)
    _waiters: asyncio.Queue = field(default_factory=asyncio.Queue, init=False)
    _semaphore: asyncio.Semaphore = field(init=False)

    def __post_init__(self):
        # One permit per connection, held for the connection's whole
        # lifetime, so the pool never exceeds max_connections
        self._semaphore = asyncio.Semaphore(self.max_connections)

    async def acquire(self) -> 'Connection':
        try:
            async with asyncio.timeout(self.acquire_timeout):  # Python 3.11+
                while True:
                    if self._connections:
                        return self._connections.pop()
                    if not self._semaphore.locked():
                        return await self._create_connection()
                    # Pool is at max size: wait until release() signals
                    await self._waiters.get()
        except TimeoutError:
            raise ConnectionPoolExhaustedError(
                f"Could not acquire connection within {self.acquire_timeout}s"
            )

    async def release(self, conn: 'Connection'):
        if conn.is_healthy():
            self._connections.append(conn)
        else:
            await conn.close()
            self._semaphore.release()  # free the dead connection's permit
        # Wake one waiter (a spurious token just triggers a re-check)
        self._waiters.put_nowait(True)

    async def _create_connection(self) -> 'Connection':
        await self._semaphore.acquire()
        conn = Connection(self.host, self.port)
        await conn.connect()
        return conn

    async def close_all(self):
        for conn in self._connections:
            await conn.close()
            self._semaphore.release()
        self._connections.clear()


class ConnectionPoolManager:
    def __init__(self):
        self.pools: dict[str, ConnectionPoolBulkhead] = {}

    def get_pool(self, name: str) -> Optional[ConnectionPoolBulkhead]:
        return self.pools.get(name)

    def create_pool(
        self,
        name: str,
        config: dict
    ) -> ConnectionPoolBulkhead:
        pool = ConnectionPoolBulkhead(**config)
        self.pools[name] = pool
        return pool
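The key invariant of a connection-pool bulkhead is that concurrent use never exceeds `max_connections`. That invariant can be checked with a small asyncio sketch (all names are illustrative):

```python
import asyncio

class BoundedResource:
    """Tracks peak concurrency while a semaphore enforces the bound."""

    def __init__(self, limit: int):
        self._sem = asyncio.Semaphore(limit)
        self._active = 0
        self.peak = 0

    async def use(self, coro_fn):
        async with self._sem:
            self._active += 1
            self.peak = max(self.peak, self._active)
            try:
                return await coro_fn()
            finally:
                self._active -= 1

async def main():
    res = BoundedResource(limit=2)

    async def work():
        await asyncio.sleep(0.01)
        return 1

    # Launch 6 concurrent users of a resource bounded to 2
    done = await asyncio.gather(*(res.use(work) for _ in range(6)))
    return res.peak, sum(done)

peak, total = asyncio.run(main())
print(peak, total)  # peak concurrency never exceeds the limit of 2
```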
Process Isolation Bulkhead
import multiprocessing
from dataclasses import dataclass, field
from typing import Optional


class ProcessBulkheadExhaustedError(Exception):
    """Raised when every worker process in the bulkhead is busy."""


@dataclass
class ProcessBulkhead:
    name: str
    max_processes: int = 4
    worker_script: Optional[str] = None

    _processes: list = field(default_factory=list, init=False)
    _work_queue: multiprocessing.Queue = field(init=False)
    _result_queue: multiprocessing.Queue = field(init=False)

    def __post_init__(self):
        self._work_queue = multiprocessing.Queue()
        self._result_queue = multiprocessing.Queue()

    def submit(self, task: dict) -> multiprocessing.Process:
        # Reap finished workers so they don't count against capacity
        self._processes = [p for p in self._processes if p.is_alive()]
        if len(self._processes) >= self.max_processes:
            raise ProcessBulkheadExhaustedError(
                f"All {self.max_processes} processes busy"
            )
        process = multiprocessing.Process(
            target=self._worker,
            args=(self._work_queue, self._result_queue)
        )
        process.start()
        self._processes.append(process)
        self._work_queue.put(task)
        return process

    def _worker(self, work_queue, result_queue):
        while True:
            task = work_queue.get()
            if task is None:  # poison pill shuts the worker down
                break
            try:
                result = self._execute_task(task)
                result_queue.put({"success": True, "result": result})
            except Exception as e:
                result_queue.put({"success": False, "error": str(e)})

    def _execute_task(self, task: dict):
        # Execute the task in the isolated worker process
        raise NotImplementedError
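Process isolation at its simplest can be had with `subprocess`: run risky work in a child interpreter so a crash or runaway loop never takes down the parent. A hedged sketch; `run_isolated` is an illustrative helper, not part of the class above:

```python
import subprocess
import sys

def run_isolated(code: str, timeout: float = 5.0) -> str:
    """Run a Python snippet in a child process and return its stdout."""
    proc = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True, text=True, timeout=timeout,
    )
    if proc.returncode != 0:
        # The child crashed; the parent process is unaffected
        raise RuntimeError(f"isolated task failed: {proc.stderr.strip()}")
    return proc.stdout.strip()

print(run_isolated("print(2 ** 10)"))  # prints 1024
```

The trade-off versus a thread pool is cost: process startup and serialization overhead buy you a hard fault boundary.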
Implementation Examples
Python Bulkhead with Resilience4j-style API
import asyncio
import time
from functools import wraps
from typing import Callable, ParamSpec, TypeVar

P = ParamSpec('P')
T = TypeVar('T')


class BulkheadFullError(Exception):
    """Raised when no execution permit is available."""


class Bulkhead:
    def __init__(
        self,
        max_concurrent_calls: int = 100,
        sliding_window_size: int = 100
    ):
        self.max_concurrent_calls = max_concurrent_calls
        self.semaphore = asyncio.Semaphore(max_concurrent_calls)
        # Sliding-window recorder with record_start / record_success /
        # record_failure / record_rejected methods (implementation omitted)
        self.metrics = BulkheadMetrics(sliding_window_size)

    async def execute(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
        if self.semaphore.locked():
            self.metrics.record_rejected()
            raise BulkheadFullError(
                f"Bulkhead full: {self.max_concurrent_calls} concurrent calls"
            )
        async with self.semaphore:
            self.metrics.record_start()
            start_time = time.time()
            try:
                result = fn(*args, **kwargs)
                if asyncio.iscoroutine(result):  # support sync and async callables
                    result = await result
                self.metrics.record_success(time.time() - start_time)
                return result
            except Exception as e:
                self.metrics.record_failure(e)
                raise


def bulkhead(bulkhead_instance: Bulkhead):
    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        async def async_wrapper(*args, **kwargs):
            return await bulkhead_instance.execute(fn, *args, **kwargs)

        @wraps(fn)
        def sync_wrapper(*args, **kwargs):
            return asyncio.run(
                bulkhead_instance.execute(fn, *args, **kwargs)
            )

        if asyncio.iscoroutinefunction(fn):
            return async_wrapper
        return sync_wrapper
    return decorator


# Usage (assumes aiohttp for HTTP and an asyncpg-style connection pool)
api_bulkhead = Bulkhead(max_concurrent_calls=50)
db_bulkhead = Bulkhead(max_concurrent_calls=30)


class ExternalAPIClient:
    @bulkhead(api_bulkhead)
    async def call_api(self, endpoint: str):
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint) as resp:
                return await resp.json()


class DatabaseClient:
    @bulkhead(db_bulkhead)
    async def query(self, sql: str):
        async with pool.acquire() as conn:
            return await conn.fetch(sql)
Bulkhead with Fallback
class BulkheadWithFallback:
    def __init__(self, bulkhead: Bulkhead):
        self.bulkhead = bulkhead

    async def execute_with_fallback(
        self,
        fn: Callable,
        fallback: Callable,
        *args, **kwargs
    ):
        try:
            return await self.bulkhead.execute(fn, *args, **kwargs)
        except BulkheadFullError:
            if fallback:
                return await fallback(*args, **kwargs)
            raise
        except Exception as e:
            if fallback:
                return await fallback(*args, **kwargs, error=e)
            raise


# Example usage (db, redis, logger and user_bulkhead assumed in scope)
async def get_user_with_fallback(user_id: str):
    bulkhead_exec = BulkheadWithFallback(user_bulkhead)

    async def fetch_from_db():
        return await db.fetch("SELECT * FROM users WHERE id = ?", user_id)

    async def fetch_from_cache(error=None):
        cache_key = f"user:{user_id}"
        cached = await redis.get(cache_key)
        if cached:
            logger.warning(f"Fallback used for user {user_id}")
            return json.loads(cached)
        return None

    return await bulkhead_exec.execute_with_fallback(
        fetch_from_db,
        fetch_from_cache
    )
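The fallback flow can be exercised end to end with a stripped-down bulkhead. All names here are illustrative; a single permit forces the second caller onto the fallback path:

```python
import asyncio

class FullError(Exception):
    pass

class TinyBulkhead:
    def __init__(self, limit: int):
        self._sem = asyncio.Semaphore(limit)

    async def execute(self, fn):
        if self._sem.locked():
            raise FullError("no permits left")
        async with self._sem:
            return await fn()

async def with_fallback(bh, primary, fallback):
    try:
        return await bh.execute(primary)
    except FullError:
        return await fallback()  # graceful degradation instead of an error

async def main():
    bh = TinyBulkhead(limit=1)

    async def slow_db():
        await asyncio.sleep(0.05)
        return "fresh"

    async def cached():
        return "stale-but-fast"

    # The first call takes the only permit; the second falls back.
    return await asyncio.gather(
        with_fallback(bh, slow_db, cached),
        with_fallback(bh, slow_db, cached),
    )

print(asyncio.run(main()))  # ['fresh', 'stale-but-fast']
```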
Spring Boot Bulkhead (Java)
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;

@Service
public class ExternalApiService {

    @Bulkhead(name = "externalApiBulkhead", fallbackMethod = "fallback")
    @CircuitBreaker(name = "externalApiCircuitBreaker", fallbackMethod = "circuitFallback")
    public ExternalResponse callExternalApi(Request request) {
        return externalApiClient.call(request);
    }

    private ExternalResponse fallback(Request request, BulkheadFullException ex) {
        log.warn("Bulkhead full for API call, returning fallback");
        return ExternalResponse.builder()
                .status("DEGRADED")
                .message("Service temporarily busy")
                .fallback(true)
                .build();
    }

    private ExternalResponse circuitFallback(Request request, Exception ex) {
        log.warn("Circuit breaker open, returning circuit fallback");
        return ExternalResponse.builder()
                .status("UNAVAILABLE")
                .message("Service unavailable")
                .fallback(true)
                .build();
    }
}

// Configuration (renamed to avoid clashing with resilience4j's BulkheadConfig)
@Configuration
public class BulkheadConfiguration {

    @Bean
    public BulkheadRegistry bulkheadRegistry() {
        return BulkheadRegistry.of(
            Map.of(
                "externalApiBulkhead", BulkheadConfig.custom()
                    .maxConcurrentCalls(50)
                    .maxWaitDuration(Duration.ofMillis(500))
                    .build()
            )
        );
    }
}
Monitoring Bulkheads
Metrics Collection
class BulkheadMonitor:
    def __init__(self, statsd_client: 'StatsD'):
        self.statsd = statsd_client
        self.bulkheads: dict[str, ThreadPoolBulkhead] = {}

    def register_bulkhead(self, name: str, bulkhead: ThreadPoolBulkhead):
        self.bulkheads[name] = bulkhead

    async def collect_metrics(self):
        # Metric names mirror the keys returned by get_metrics() above
        for name, bulkhead in self.bulkheads.items():
            metrics = bulkhead.get_metrics()
            self.statsd.gauge(
                f"bulkhead.{name}.active",
                metrics["active"]
            )
            self.statsd.gauge(
                f"bulkhead.{name}.queue_size",
                metrics["queue_size"]
            )
            self.statsd.increment(
                f"bulkhead.{name}.submitted",
                metrics["submitted"]
            )
            self.statsd.increment(
                f"bulkhead.{name}.rejected",
                metrics["rejected"]
            )


# Prometheus metrics
from prometheus_client import Counter, Gauge, Histogram

BULKHEAD_CALLS = Counter(
    'bulkhead_calls_total',
    'Total bulkhead calls',
    ['bulkhead_name', 'status']  # status: accepted | rejected
)

BULKHEAD_AVAILABLE = Gauge(
    'bulkhead_available_slots',
    'Available bulkhead slots',
    ['bulkhead_name']
)

BULKHEAD_WAIT_TIME = Histogram(
    'bulkhead_wait_seconds',
    'Time waiting for bulkhead',
    ['bulkhead_name'],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
Alerting Rules
# Prometheus alerting rules
groups:
  - name: bulkhead
    rules:
      - alert: BulkheadHighRejectionRate
        expr: |
          sum(rate(bulkhead_calls_total{status="rejected"}[5m])) by (bulkhead_name)
            / sum(rate(bulkhead_calls_total[5m])) by (bulkhead_name) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High rejection rate on {{ $labels.bulkhead_name }}"
          description: "Bulkhead {{ $labels.bulkhead_name }} rejecting >10% of calls"

      - alert: BulkheadAlmostFull
        # Assumes a bulkhead_max_slots gauge exported alongside available_slots
        expr: |
          bulkhead_available_slots / bulkhead_max_slots < 0.1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Bulkhead {{ $labels.bulkhead_name }} almost exhausted"
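The 10% rejection threshold in the first rule is easy to sanity-check with plain arithmetic (values illustrative):

```python
def rejection_rate(rejected: float, total: float) -> float:
    """Fraction of calls rejected; 0.0 when there is no traffic."""
    return rejected / total if total else 0.0

def should_alert(rejected: float, total: float, threshold: float = 0.1) -> bool:
    return rejection_rate(rejected, total) > threshold

print(should_alert(15, 100))  # True: 15% rejected exceeds the 10% threshold
print(should_alert(5, 100))   # False: 5% is under the threshold
```

The zero-traffic guard matters in PromQL too: a `rate(...)` denominator of zero would otherwise yield NaN rather than an alert.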
Best Practices
Good Patterns
GOOD_PATTERNS = {
    "size_bulkheads_appropriately": """
    # Right-size bulkheads based on resource needs
    ✅ Good:
    - API calls: 20-50 threads (I/O bound, can handle more)
    - Database: 10-30 connections (connection limited)
    - Cache: 20-50 connections
    - File I/O: 5-10 threads (often disk bound)

    ❌ Bad:
    - Same pool size for all resources
    - Over-provisioning (wastes resources)
    - Under-provisioning (too many rejections)
    """,

    "monitor_per_bulkhead": """
    # Monitor each bulkhead individually
    ✅ Good:
    - Track available slots per bulkhead
    - Alert on rejection thresholds
    - Monitor wait times per resource type
    - Correlate with downstream service health

    ❌ Bad:
    - Only monitoring total app threads
    - No visibility into which resource is failing
    - Same alert for all bulkheads
    """,

    "combine_with_circuit_breaker": """
    # Use bulkhead with circuit breaker
    ✅ Good:
    # If the bulkhead stays full for an extended time,
    # open the circuit to fail fast
    bulkhead.full → circuit.half_open → circuit.open

    ❌ Bad:
    # Bulkhead fills up but circuit stays closed;
    # keeps accepting requests that will be rejected
    """
}
Bad Patterns
BAD_PATTERNS = {
    "single_pool_for_all": """
    ❌ Bad:
    # One thread pool for the entire application
    pool = ThreadPoolExecutor(max_workers=100)

    async def handle_request(req):
        db_result = await loop.run_in_executor(pool, db_query, req)   # blocks
        api_result = await loop.run_in_executor(pool, call_api, req)  # also blocks
        # If the API is slow, DB queries wait too!

    ✅ Good:
    # Separate pools for different resource types
    api_pool = ThreadPoolExecutor(max_workers=20)
    db_pool = ThreadPoolExecutor(max_workers=30)
    cache_pool = ThreadPoolExecutor(max_workers=10)

    async def handle_request(req):
        db_result = await loop.run_in_executor(db_pool, db_query, req)
        api_result = await loop.run_in_executor(api_pool, call_api, req)
    """,

    "no_fallback_strategy": """
    ❌ Bad:
    # No fallback when the bulkhead rejects
    async def get_data(req):
        return await bulkhead.execute(expensive_call)
        # Client gets a 500 error on rejection

    ✅ Good:
    # Fall back to a degraded mode
    async def get_data(req):
        try:
            return await bulkhead.execute(expensive_call)
        except BulkheadFullError:
            return await get_from_cache(req)  # graceful degradation
    """,

    "ignoring_bulkhead_metrics": """
    ❌ Bad:
    # Create bulkheads but never monitor them
    api_pool = ThreadPoolExecutor(max_workers=50)
    # No metrics collection

    ✅ Good:
    # Monitor and alert on bulkhead health
    # (_work_queue/_threads are private executor attrs; illustration only)
    async def monitor_pools():
        for name, pool in pools.items():
            queue_size = pool._work_queue.qsize()
            active = len(pool._threads)
            if queue_size > threshold:
                alert(f"{name} queue growing: {queue_size}")
    """
}
Comparing Bulkhead with Other Patterns
Bulkhead vs Circuit Breaker vs Timeout

  Pattern           │ Purpose                │ Trigger
  ──────────────────┼────────────────────────┼───────────────────
  Timeout           │ Limit wait time        │ Time exceeded
  Bulkhead          │ Limit concurrent calls │ Pool full
  Circuit Breaker   │ Stop calling a failing │ Failure threshold
                    │ service                │ exceeded

  Often used together:

  Request ──► [Timeout] ──► [Bulkhead] ──► [Circuit Breaker]
                  │             │                 │
              kill if        limit            stop if
              too long     concurrent      service broken
                              calls
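Composing two of these layers is straightforward in asyncio. This sketch stacks a timeout inside a bulkhead permit; the circuit-breaker layer is omitted for brevity, and all names are illustrative:

```python
import asyncio

async def guarded(fn, sem: asyncio.Semaphore, timeout: float):
    # Bulkhead layer: fail fast when no permit is available
    if sem.locked():
        raise RuntimeError("bulkhead full")
    async with sem:
        # Timeout layer: never wait on the call longer than `timeout`
        return await asyncio.wait_for(fn(), timeout)

async def main():
    sem = asyncio.Semaphore(2)

    async def quick():
        return "ok"

    return await guarded(quick, sem, timeout=1.0)

print(asyncio.run(main()))  # prints "ok": permit free, call well under deadline
```

Order matters: checking the bulkhead first means a saturated pool is rejected immediately, without burning the timeout budget.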
Summary
The Bulkhead pattern provides resource isolation to prevent cascade failures:
- Thread Pool Bulkhead - Limits concurrent executions for different operations
- Connection Pool Bulkhead - Isolates database/cache connections
- Process Bulkhead - Complete process isolation for dangerous operations
Key benefits:
- Prevents one slow service from consuming all resources
- Provides graceful degradation when limits are reached
- Enables targeted monitoring and alerting
- Works well with Circuit Breaker and Timeout patterns
Choose isolation granularity based on:
- Resource characteristics (I/O bound vs CPU bound)
- Failure blast radius (what should stay available)
- Operational overhead (more pools = more complexity)
- Cost of unavailability (critical paths need isolation)