Introduction
In distributed systems, failures are inevitable. When a service calls another service that is experiencing issues, naive retry strategies can amplify the problem, causing cascading failures across your entire system. The Circuit Breaker pattern addresses this by detecting repeated failures and temporarily blocking further requests to the failing service, which gives it time to recover.
The Problem
Cascading Failure Problem
Without Circuit Breaker:

                     User Requests
                           │
                           ▼
                      ┌─────────┐
                      │Service A│
                      └────┬────┘
                           │  Slow (high latency)
                           ▼
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│Service B│    │Service B│    │Service B│    │Service B│
│(failing)│    │(failing)│    │(failing)│    │(failing)│
│ timeout │    │ timeout │    │ timeout │    │ timeout │
└────┬────┘    └────┬────┘    └────┬────┘    └────┬────┘
     │              │              │              │
     └──────────────┴──────┬───────┴──────────────┘
                           │
                More requests queue up
                Services become overwhelmed
                System collapses

With Circuit Breaker

User Requests
      │
      ▼
 ┌─────────┐
 │Service A│
 └────┬────┘
      │
 ┌────┴────┐
 │ Circuit │
 │ Breaker │   State: CLOSED → OPEN → HALF_OPEN
 └────┬────┘
      │
 ┌────┴────┐
 │Fallback │   Return cached data or error immediately
 └─────────┘

Service B is isolated
System remains healthy
Circuit Breaker States
┌───────────┐
│  CLOSED   │   Normal operation
│           │   - Requests pass through
│ Requests  │   - Failures are counted
│   pass    │   - Circuit opens when the failure threshold is reached
└─────┬─────┘
      │  Failure threshold reached
      ▼
┌───────────┐
│   OPEN    │   Service unavailable
│           │   - Requests are blocked
│ Requests  │   - Fallback is returned immediately
│  blocked  │   - Recovery timer starts
└─────┬─────┘
      │  Recovery timeout elapsed
      ▼
┌───────────┐
│ HALF_OPEN │   Testing recovery
│           │   - Limited requests allowed
│  Limited  │   - If successful, close circuit
│ requests  │   - If failed, reopen circuit
└───────────┘
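The transitions in the diagram can also be written down as a small lookup table. The following is a minimal sketch, separate from the full implementation; the event names (`threshold_reached`, `timeout_elapsed`, `trial_succeeded`, `trial_failed`) are hypothetical labels chosen for illustration, and `trial_succeeded` stands for "enough trial calls succeeded".

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# (current state, event) -> next state. Event names are illustrative only.
TRANSITIONS = {
    (State.CLOSED, "threshold_reached"): State.OPEN,
    (State.OPEN, "timeout_elapsed"): State.HALF_OPEN,
    (State.HALF_OPEN, "trial_succeeded"): State.CLOSED,
    (State.HALF_OPEN, "trial_failed"): State.OPEN,
}


def next_state(state: State, event: str) -> State:
    """Return the next state, or the current one if the event does not apply."""
    return TRANSITIONS.get((state, event), state)


# Walk through a failed recovery attempt followed by a successful one.
state = State.CLOSED
for event in ["threshold_reached", "timeout_elapsed", "trial_failed",
              "timeout_elapsed", "trial_succeeded"]:
    state = next_state(state, event)
    print(f"{event} -> {state.value}")
```

Note that there is no direct path from OPEN back to CLOSED: recovery always passes through HALF_OPEN.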
Implementation
import functools
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional, TypeVar

T = TypeVar('T')


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreakerConfig:
    """Configuration for circuit breaker."""
    failure_threshold: int = 5    # Consecutive failures before opening
    success_threshold: int = 3    # Half-open successes needed to close
    timeout: float = 60.0         # Seconds in OPEN before trying half-open
    half_open_max_calls: int = 3  # Max trial calls admitted in half-open


class CircuitBreakerOpen(Exception):
    """Exception raised when circuit is open."""


class CircuitBreaker:
    """Circuit breaker implementation."""

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.lock = threading.RLock()
        self.call_count = 0  # Trial calls admitted in the current half-open phase

    def call(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute function with circuit breaker protection."""
        # Hold the lock only while inspecting state, so a slow call to the
        # protected function does not block every other caller.
        with self.lock:
            self._check_state()
            if self.state == CircuitState.OPEN:
                raise CircuitBreakerOpen(
                    f"Circuit {self.name} is OPEN. Failing fast."
                )
            if self.state == CircuitState.HALF_OPEN:
                if self.call_count >= self.config.half_open_max_calls:
                    raise CircuitBreakerOpen(
                        f"Circuit {self.name} is HALF_OPEN. Trial limit reached."
                    )
                self.call_count += 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _check_state(self):
        """Check and update circuit state."""
        if self.state == CircuitState.OPEN:
            # Move to half-open once the recovery timeout has elapsed
            if (self.last_failure_time and
                    time.time() - self.last_failure_time > self.config.timeout):
                self._transition_to_half_open()

    def _transition_to_half_open(self):
        """Transition to half-open state."""
        print(f"[Circuit {self.name}] OPEN → HALF_OPEN")
        self.state = CircuitState.HALF_OPEN
        self.call_count = 0
        self.success_count = 0

    def _on_success(self):
        """Handle successful call."""
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                # Close after enough successes. The second bound keeps the
                # circuit from getting stuck when success_threshold exceeds
                # the number of trial calls that can be admitted.
                needed = min(self.config.success_threshold,
                             self.config.half_open_max_calls)
                if self.success_count >= needed:
                    self._transition_to_closed()
            elif self.state == CircuitState.CLOSED:
                # Reset failure count on success
                self.failure_count = 0

    def _on_failure(self):
        """Handle failed call."""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                # Any failure during the trial phase reopens the circuit
                self._transition_to_open()
            elif (self.state == CircuitState.CLOSED and
                    self.failure_count >= self.config.failure_threshold):
                self._transition_to_open()

    def _transition_to_open(self):
        """Transition to open state."""
        print(f"[Circuit {self.name}] {self.state.value} → OPEN")
        self.state = CircuitState.OPEN
        self.failure_count = 0

    def _transition_to_closed(self):
        """Transition to closed state."""
        print(f"[Circuit {self.name}] {self.state.value} → CLOSED")
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0

    def get_state(self) -> dict:
        """Get current circuit breaker state."""
        with self.lock:
            return {
                "name": self.name,
                "state": self.state.value,
                "failure_count": self.failure_count,
                "success_count": self.success_count,
                "last_failure_time": self.last_failure_time,
            }


# Decorator version
def circuit_breaker(circuit_name: str, config: Optional[CircuitBreakerConfig] = None):
    """Decorator for circuit breaker."""
    breaker = CircuitBreaker(circuit_name, config)

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return breaker.call(func, *args, **kwargs)

        # Attach breaker to function for inspection
        wrapper.circuit_breaker = breaker
        return wrapper

    return decorator
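To watch the timeout-driven transitions without actually waiting, one option is to inject the clock. The sketch below is an assumption-laden illustration: `MiniBreaker` is a hypothetical, single-threaded, stripped-down stand-in for the `CircuitBreaker` above (it closes after one successful trial call and has no trial-call limit), not the article's implementation.

```python
class MiniBreaker:
    """Stripped-down breaker: consecutive-failure threshold plus open timeout."""

    def __init__(self, failure_threshold, timeout, clock):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock        # Callable returning the current time in seconds
        self.state = "closed"
        self.failures = 0
        self.opened_at = None

    def call(self, func):
        if self.state == "open":
            if self.clock() - self.opened_at < self.timeout:
                raise RuntimeError("circuit open, failing fast")
            self.state = "half_open"   # Timeout elapsed: allow a trial call
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state, self.opened_at, self.failures = "open", self.clock(), 0
            raise
        self.state, self.failures = "closed", 0   # Simplified: one success closes
        return result


now = [0.0]                         # Fake clock, advanced manually
breaker = MiniBreaker(failure_threshold=2, timeout=30.0, clock=lambda: now[0])


def flaky():
    raise ConnectionError("service B is down")


trace = []
for _ in range(2):                  # Two failures trip the breaker
    try:
        breaker.call(flaky)
    except ConnectionError:
        trace.append(breaker.state)

try:
    breaker.call(flaky)             # Rejected without calling flaky()
except RuntimeError:
    trace.append("rejected")

now[0] += 31.0                      # Pretend the recovery timeout has passed
trace.append(breaker.call(lambda: "ok"))
trace.append(breaker.state)
print(trace)
```

Injecting the clock keeps tests fast and deterministic; the same idea could be applied to the full class by passing a time function into its constructor.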
Circuit Breaker with Fallback
from typing import Callable, Optional, Any
import time


class FallbackCircuitBreaker(CircuitBreaker):
    """Circuit breaker with fallback support."""

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None,
                 fallback: Optional[Callable] = None):
        super().__init__(name, config)
        self.fallback = fallback
        self.fallback_cache = {}
        self.cache_ttl = 300  # 5 minutes

    def call_with_fallback(self, func: Callable, *args,
                           fallback_key: Optional[str] = None,
                           **kwargs) -> Any:
        """Execute with circuit breaker and fallback."""
        # Inspect state under the lock, but run func outside it
        with self.lock:
            self._check_state()
            is_open = self.state == CircuitState.OPEN
        if is_open:
            found, value = self._resolve_fallback(fallback_key, args, kwargs)
            if found:
                return value
            raise CircuitBreakerOpen(f"Circuit {self.name} is OPEN")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            # Try fallback on exception
            found, value = self._resolve_fallback(fallback_key, args, kwargs)
            if found:
                return value
            raise
        self._on_success()
        # Cache successful result for fallback
        if fallback_key:
            with self.lock:
                self.fallback_cache[fallback_key] = {
                    "data": result,
                    "timestamp": time.time()
                }
        return result

    def _resolve_fallback(self, fallback_key, args, kwargs):
        """Return (True, value) from the cache or fallback, else (False, None)."""
        if fallback_key:
            with self.lock:
                cached = self.fallback_cache.get(fallback_key)
            # Prefer a cached real response while it is still fresh
            if cached and time.time() - cached["timestamp"] < self.cache_ttl:
                print(f"[Circuit {self.name}] Returning cached fallback")
                return True, cached["data"]
        if self.fallback:
            print(f"[Circuit {self.name}] Calling fallback")
            return True, self.fallback(*args, **kwargs)
        return False, None


# Usage examples
def get_user_fallback(user_id: str) -> dict:
    """Fallback when user service is down."""
    return {
        "id": user_id,
        "name": "Unknown User",
        "cached": True,
        "message": "Data may be stale"
    }


def get_product_fallback(product_id: str) -> dict:
    """Fallback when product service is down."""
    return {
        "id": product_id,
        "name": "Product Unavailable",
        "price": 0.0,
        "available": False
    }


# Create circuit breakers
user_circuit = FallbackCircuitBreaker(
    "user-service",
    config=CircuitBreakerConfig(
        failure_threshold=3,
        success_threshold=2,
        timeout=30.0
    ),
    fallback=get_user_fallback
)

product_circuit = FallbackCircuitBreaker(
    "product-service",
    config=CircuitBreakerConfig(
        failure_threshold=5,
        success_threshold=3,
        timeout=60.0
    ),
    fallback=get_product_fallback
)
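The order in which a degraded response is chosen (a fresh cached value first, then the static fallback, then an error) can be illustrated in isolation. This is a minimal sketch under stated assumptions: `resolve_fallback`, the `user:42` key, and the sample data are hypothetical, and the timestamps are passed in explicitly so the result does not depend on the wall clock.

```python
import time


def resolve_fallback(cache, key, ttl, fallback, now=None):
    """Pick a degraded response: fresh cached value first, then static fallback."""
    now = time.time() if now is None else now
    entry = cache.get(key)
    if entry is not None and now - entry["timestamp"] < ttl:
        return entry["data"]
    if fallback is not None:
        return fallback(key)
    raise LookupError(f"no fallback available for {key!r}")


# One cached response, stored at t=1000 seconds (illustrative values).
cache = {"user:42": {"data": {"id": "42", "name": "Ada"}, "timestamp": 1000.0}}


def placeholder(key):
    return {"id": key, "name": "Unknown User"}


# 100 s after caching: still within the 300 s TTL, so the cached data wins.
fresh = resolve_fallback(cache, "user:42", ttl=300, fallback=placeholder, now=1100.0)
# 400 s after caching: the entry is stale, so the static placeholder is used.
stale = resolve_fallback(cache, "user:42", ttl=300, fallback=placeholder, now=1400.0)

print(fresh["name"])
print(stale["name"])
```

Preferring cached real data over a placeholder usually gives a better degraded experience, at the cost of serving values that may be up to `ttl` seconds old.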
Resilience with Multiple Services
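import time
from typing import Optional

import aiohttp


class ResilientHTTPClient:
    """HTTP client with per-service circuit breakers and cached fallbacks."""

    def __init__(self):
        self.circuits = {}
        self.session: Optional[aiohttp.ClientSession] = None

    def get_circuit(self, service_name: str) -> FallbackCircuitBreaker:
        """Get or create circuit breaker for service."""
        if service_name not in self.circuits:
            self.circuits[service_name] = FallbackCircuitBreaker(
                service_name,
                config=CircuitBreakerConfig(
                    failure_threshold=5,
                    timeout=30.0
                )
            )
        return self.circuits[service_name]

    async def get(self, url: str, service: str, **kwargs) -> dict:
        """Make GET request with resilience."""
        circuit = self.get_circuit(service)
        # The synchronous call_with_fallback cannot await a coroutine, so the
        # breaker bookkeeping is done here, around the awaited request.
        with circuit.lock:
            circuit._check_state()
            is_open = circuit.state == CircuitState.OPEN
        if is_open:
            return self._cached_or_raise(
                circuit, url, CircuitBreakerOpen(f"Circuit {service} is OPEN")
            )
        if not self.session:
            self.session = aiohttp.ClientSession()
        try:
            async with self.session.get(url, **kwargs) as response:
                if response.status >= 500:
                    response.raise_for_status()  # Count server errors as failures
                data = await response.json()
        except Exception as exc:
            circuit._on_failure()
            return self._cached_or_raise(circuit, url, exc)
        circuit._on_success()
        # Cache the response so it can serve as a fallback later
        with circuit.lock:
            circuit.fallback_cache[url] = {"data": data, "timestamp": time.time()}
        return data

    @staticmethod
    def _cached_or_raise(circuit: FallbackCircuitBreaker, key: str, error: Exception):
        """Return a fresh cached response for key, or raise the given error."""
        cached = circuit.fallback_cache.get(key)
        if cached and time.time() - cached["timestamp"] < circuit.cache_ttl:
            return cached["data"]
        raise error

    async def close(self):
        """Close HTTP session."""
        if self.session:
            await self.session.close()


class ServiceRegistry:
    """Registry for managing service endpoints."""

    def __init__(self):
        self.services = {
            "users": "http://user-service:8000",
            "products": "http://product-service:8001",
            "orders": "http://order-service:8002",
            "payments": "http://payment-service:8003",
        }
        self.circuits = {}

    def get_service_url(self, service: str) -> str:
        # Returns an empty string for unknown services
        return self.services.get(service, "")

    def get_circuit(self, service: str) -> CircuitBreaker:
        if service not in self.circuits:
            self.circuits[service] = CircuitBreaker(service)
        return self.circuits[service]

    def get_service_health(self) -> dict:
        """Get health status of all services."""
        return {
            name: circuit.get_state()
            for name, circuit in self.circuits.items()
        }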
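When the protected call is a coroutine, its success or failure is only known after it has been awaited, so the breaker's bookkeeping has to wrap the `await` itself. The following is a minimal sketch using only `asyncio`; `AsyncMiniBreaker` and `failing_request` are hypothetical names, the `/users/1` path is made up, and half-open handling is reduced to simply letting calls through again once the timeout has elapsed.

```python
import asyncio
import time


class AsyncMiniBreaker:
    """Hypothetical minimal breaker whose call() awaits the protected coroutine."""

    def __init__(self, failure_threshold=3, timeout=30.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.opened_at = None       # None means the circuit is not open

    async def call(self, coro_func, *args):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.timeout:
                raise RuntimeError("circuit open, failing fast")
            self.opened_at = None   # Timeout elapsed: let calls through again
        try:
            result = await coro_func(*args)   # Outcome is known only after awaiting
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
                self.failures = 0
            raise
        self.failures = 0
        return result


async def failing_request(url):
    await asyncio.sleep(0)          # Stand-in for network I/O
    raise ConnectionError(f"cannot reach {url}")


async def main():
    breaker = AsyncMiniBreaker(failure_threshold=2)
    outcomes = []
    for _ in range(3):
        try:
            await breaker.call(failing_request, "http://user-service:8000/users/1")
        except ConnectionError:
            outcomes.append("failed")      # The request ran and failed
        except RuntimeError:
            outcomes.append("rejected")    # The breaker refused to run it
    return outcomes


outcomes = asyncio.run(main())
print(outcomes)
```

Because everything runs on one event loop and the state updates contain no `await`, this sketch needs no lock; a breaker shared across threads would still need one.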
Monitoring Circuit Breakers
import prometheus_client as prometheus
from collections import defaultdict

# Prometheus metrics
circuit_state = prometheus.Gauge(
    'circuit_breaker_state',
    'Circuit breaker state (0=closed, 1=half-open, 2=open)',
    ['service']
)

circuit_failures = prometheus.Counter(
    'circuit_breaker_failures_total',
    'Total circuit breaker failures',
    ['service']
)

circuit_calls = prometheus.Counter(
    'circuit_breaker_calls_total',
    'Total circuit breaker calls',
    ['service', 'result']  # result: success, fallback, rejected
)


class CircuitBreakerMonitor:
    """Monitor and collect metrics for circuit breakers."""

    def __init__(self):
        self.breakers = defaultdict(list)
        self._last_failure_counts = defaultdict(int)

    def register(self, circuit: CircuitBreaker):
        self.breakers[circuit.name].append(circuit)

    def collect_metrics(self):
        """Collect and export metrics."""
        for name, circuits in self.breakers.items():
            circuit = circuits[0]  # Get first circuit
            state = circuit.get_state()
            # Update Prometheus metrics
            state_value = {
                CircuitState.CLOSED: 0,
                CircuitState.HALF_OPEN: 1,
                CircuitState.OPEN: 2,
            }[CircuitState(state["state"])]
            circuit_state.labels(service=name).set(state_value)
            # failure_count resets on success and on state changes, so add only
            # the increase since the last poll. Adding the raw value each time
            # would double-count. Failures between polls can still be missed.
            current = state["failure_count"]
            previous = self._last_failure_counts[name]
            delta = current - previous if current >= previous else current
            if delta > 0:
                circuit_failures.labels(service=name).inc(delta)
            self._last_failure_counts[name] = current

    def get_status_report(self) -> dict:
        """Generate status report for all circuits."""
        report = {}
        for name, circuits in self.breakers.items():
            circuit = circuits[0]
            state = circuit.get_state()
            report[name] = {
                "status": state["state"],
                "failures": state["failure_count"],
                "healthy": state["state"] == CircuitState.CLOSED.value,
                "last_failure": state["last_failure_time"],
            }
        return report
Conclusion
The Circuit Breaker pattern is essential for building resilient distributed systems. By detecting failures quickly and preventing cascading issues, circuit breakers help maintain system stability even when individual services fail.
Key takeaways:
- Use circuit breakers to isolate failing services
- Implement fallbacks for degraded functionality
- Monitor circuit breaker state and failures
- Configure appropriate thresholds based on service behavior
- Combine with retry policies for comprehensive resilience
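As a rough illustration of the last point, a retry helper can back off between attempts while refusing to retry once the breaker is failing fast. This is a sketch under stated assumptions: `CircuitOpenError` and `retry_with_backoff` are hypothetical names, and in practice each attempt would go through the breaker so that failures are still counted.

```python
import time


class CircuitOpenError(Exception):
    """Raised by the (hypothetical) breaker when it is failing fast."""


def retry_with_backoff(func, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Retry func with exponential backoff, but never retry an open circuit."""
    for attempt in range(attempts):
        try:
            return func()
        except CircuitOpenError:
            raise                   # Retrying would only add load; give up now
        except Exception:
            if attempt == attempts - 1:
                raise               # Out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


calls = []
delays = []


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise TimeoutError("transient failure")
    return "ok"


# Record the delays instead of actually sleeping.
result = retry_with_backoff(flaky, sleep=delays.append)
print(result, delays)
```

Keeping the number of attempts small matters here: every retry that reaches a struggling service adds to its load, which is the amplification the circuit breaker is meant to prevent.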