The Circuit Breaker pattern prevents cascading failures in distributed systems. When a service fails repeatedly, the circuit “opens” to stop further requests, giving the service time to recover.
Understanding Circuit Breaker
The Problem
┌─────────────────────────────────────────────────────────────┐
│          Cascading Failure Without Circuit Breaker          │
│                                                             │
│  Request ──► Service A ──► Service B ──► Service C          │
│                                │             │              │
│                                ▼             ▼              │
│                            Timeout!      Timeout!           │
│                                │             │              │
│                                └──────┬──────┘              │
│                                       ▼                     │
│                          Service A gets overwhelmed         │
│                          More timeouts                      │
│                          Eventually fails entirely!         │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                    With Circuit Breaker                     │
│                                                             │
│  Request ──► Circuit ──► Service B                          │
│              Breaker                                        │
│                 │                                           │
│                 ▼ (if open)                                 │
│          Return fallback                                    │
│                 │                                           │
│                 ▼                                           │
│        Service B recovers!                                  │
└─────────────────────────────────────────────────────────────┘
Circuit Breaker States
# The three circuit breaker states, keyed by state name. Each entry gives a
# short description and the transition(s) that lead out of that state.
circuit_breaker_states = {
    "closed": {
        "description": "Normal operation, requests pass through",
        "transitions": "→ open when failure threshold reached",
    },
    "open": {
        "description": "Requests blocked, return fallback immediately",
        "transitions": "→ half-open after timeout",
    },
    "half_open": {
        "description": "Limited requests allowed to test recovery",
        "transitions": "→ closed if successful, → open if failed",
    },
}
Implementation
Basic Circuit Breaker
import time
from enum import Enum
from typing import Callable, Any
from functools import wraps
class CircuitState(Enum):
    """States a circuit breaker can be in."""

    CLOSED = "closed"  # calls pass through
    OPEN = "open"  # calls are rejected without being attempted
    HALF_OPEN = "half_open"  # a limited number of trial calls is admitted


class CircuitBreaker:
    """Guard calls to an unreliable dependency with a circuit breaker.

    The breaker starts CLOSED. Once ``failure_threshold`` failures have
    accumulated without an intervening success it OPENs and rejects calls
    with ``CircuitBreakerOpenError``. When ``recovery_timeout`` seconds have
    elapsed since the last failure, the next call moves it to HALF_OPEN,
    where up to ``half_open_max_calls`` trial calls are admitted: that many
    successes close the breaker again, and any failure reopens it.

    An instance can be used directly via ``call`` or as a decorator
    (``@breaker``). No locking is performed, so sharing one instance between
    threads requires external synchronization.

    Args:
        failure_threshold: Failures (without a success in between) that open
            the breaker.
        recovery_timeout: Seconds to stay open before admitting trial calls.
        expected_exception: Exception type (or tuple of types) counted as a
            failure. Other exceptions propagate without affecting the state.
        half_open_max_calls: Trial calls admitted while half-open, and the
            number of successes required to close the breaker.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.half_open_max_calls = half_open_max_calls
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    def __call__(self, func: Callable) -> Callable:
        """Decorate ``func`` so that every invocation goes through ``call``."""

        @wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)

        return wrapper

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute ``func(*args, **kwargs)`` under circuit breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: The breaker is open, or the half-open
                trial budget is used up; ``func`` is not called.
            Exception: Anything ``func`` raises is re-raised unchanged.
        """
        if not self._can_execute():
            raise CircuitBreakerOpenError(
                f"Circuit breaker is {self.state.value}"
            )

        # Claim a trial slot so the half-open limit is actually enforced.
        is_trial = self.state == CircuitState.HALF_OPEN
        if is_trial:
            self.half_open_calls += 1

        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        except BaseException:
            # Not counted as a failure or a success: hand the trial slot back
            # so unexpected errors cannot exhaust the budget and leave the
            # breaker stuck half-open.
            if (
                is_trial
                and self.state == CircuitState.HALF_OPEN
                and self.half_open_calls > 0
            ):
                self.half_open_calls -= 1
            raise

        self._on_success()
        return result

    def _can_execute(self) -> bool:
        """Return True if a call may be attempted right now.

        Side effect: moves an OPEN breaker to HALF_OPEN once the recovery
        timeout has elapsed.
        """
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout passed
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self._transition_to_half_open()
                return True
            return False
        # Half-open state: admit only a limited number of trial calls
        return self.half_open_calls < self.half_open_max_calls

    def _on_success(self):
        """Record a successful call and close the breaker when warranted."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.half_open_max_calls:
                self._transition_to_closed()
        # Reset failure count on success in closed state
        if self.state == CircuitState.CLOSED:
            self.failure_count = 0

    def _on_failure(self):
        """Record a failed call and open the breaker when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == CircuitState.HALF_OPEN:
            # A single failed trial call means the dependency is still down.
            self._transition_to_open()
        elif self.state == CircuitState.CLOSED:
            if self.failure_count >= self.failure_threshold:
                self._transition_to_open()

    def _transition_to_open(self):
        """Switch to OPEN and discard half-open progress."""
        self.state = CircuitState.OPEN
        self.success_count = 0
        if self.last_failure_time is None:
            # Opened without a recorded failure: start the recovery timer now
            # so _can_execute has a timestamp to compare against.
            self.last_failure_time = time.time()

    def _transition_to_half_open(self):
        """Switch to HALF_OPEN with a fresh trial budget."""
        self.state = CircuitState.HALF_OPEN
        self.half_open_calls = 0
        self.success_count = 0

    def _transition_to_closed(self):
        """Switch to CLOSED and reset all counters."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.half_open_calls = 0


class CircuitBreakerOpenError(Exception):
    """Raised when the circuit breaker rejects a call without attempting it."""
Decorator Usage
# Usage with decorator
circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=60,
)


@circuit_breaker
def call_external_service():
    """Fetch and decode the JSON payload from the external API.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success status (via ``raise_for_status``).
    """
    # Always bound the wait: without a timeout a hung dependency blocks the
    # caller forever and the breaker never gets a failure to count.
    response = requests.get("https://external-api.com/data", timeout=10)
    response.raise_for_status()
    return response.json()
# Or with fallback
def call_with_fallback():
    """Return fresh data via the breaker, or cached data when that fails.

    Cached data is served both when the breaker rejects the call and when
    the request itself fails with a ``requests`` error.
    """
    try:
        fresh = circuit_breaker.call(get_data)
    except CircuitBreakerOpenError:
        # Breaker rejected the call: serve the fallback
        return get_cached_data()
    except requests.RequestException:
        # The request itself failed: serve the same fallback
        return get_cached_data()
    else:
        return fresh
Python Resilience Library
# Using pybreaker
import pybreaker
# Configure circuit breaker
breaker = pybreaker.CircuitBreaker(
    fail_max=5,  # Open after 5 failures
    reset_timeout=60,  # Try again after 60 seconds
    exclude=[ValueError],  # Don't count these exceptions
)


@breaker
def call_service():
    """Fetch and decode JSON from ``url`` through the pybreaker breaker.

    Returns:
        The decoded JSON body of the response.
    """
    # Always bound the wait: without a timeout a hung dependency blocks the
    # caller forever instead of registering as a failure.
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()
# With custom listener
class MyListener(pybreaker.CircuitBreakerListener):
    """Listener that prints circuit breaker events.

    pybreaker invokes the hooks named ``success``, ``failure`` and
    ``state_change``; methods with other names are never called. The
    previous ``on_*`` names are kept as aliases for existing callers.
    """

    def success(self, breaker):
        """Called after a guarded call returns normally."""
        print("Call succeeded")

    def failure(self, breaker, exc):
        """Called after a guarded call raises a counted exception."""
        print(f"Call failed: {exc}")

    def state_change(self, breaker, old_state, new_state):
        """Called when the breaker moves from one state to another."""
        # Prefer the state's name when it has one; fall back to the object.
        old_name = getattr(old_state, "name", old_state)
        new_name = getattr(new_state, "name", new_state)
        print(f"State changed: {old_name} -> {new_name}")

    # Backward-compatible aliases for the original method names.
    on_call_success = success
    on_call_failure = failure
    on_state_change = state_change


breaker.add_listener(MyListener())
Circuit Breaker with Fallback
Fallback Strategies
class CircuitBreakerWithFallback:
    """Circuit breaker wrapper that answers failed calls from a fallback.

    Successful results are cached per function name (arguments are not part
    of the key) so the cache-based strategies have something to serve.

    Args:
        breaker: Object exposing ``call(func, *args, **kwargs)``.
        fallback_strategy: One of ``"cache_then_null"``,
            ``"cache_then_stale"``, ``"default_value"``, ``"queue"`` or
            ``"circuit_broken"``. Any other value makes the fallback
            return ``None``.
    """

    def __init__(self, breaker, fallback_strategy="cache_then_null"):
        self.breaker = breaker
        self.fallback_strategy = fallback_strategy
        self.cache = {}  # Simple in-memory cache

    def execute(self, func, *args, **kwargs):
        """Run ``func`` through the breaker, falling back on any failure.

        Returns:
            The result of ``func`` on success (which is also cached),
            otherwise whatever the configured fallback strategy yields.
        """
        # Local import: the file does not import logging at module level.
        import logging

        # Not every callable has __name__ (e.g. functools.partial objects).
        cache_key = getattr(func, "__name__", repr(func))
        try:
            result = self.breaker.call(func, *args, **kwargs)
        except CircuitBreakerOpenError:
            return self._fallback(cache_key, *args, **kwargs)
        except Exception as e:
            # Log error
            logging.error("Service call failed: %s", e)
            return self._fallback(cache_key, *args, **kwargs)
        # Remember the fresh result so later fallbacks can serve it.
        self._update_cache(cache_key, result)
        return result

    def _fallback(self, func_name, *args, **kwargs):
        """Execute fallback based on strategy"""
        strategy = self.fallback_strategy
        if strategy == "cache_then_null":
            # Cached value if present (even a falsy one), otherwise None
            return self.cache.get(func_name)
        if strategy == "cache_then_stale":
            # Return stale cache, or an empty dict when nothing is cached
            return self.cache.get(func_name, {})
        if strategy == "default_value":
            # Return default value
            return {"status": "fallback", "data": []}
        if strategy == "queue":
            # Queue for later processing
            # NOTE(review): `queue` is not defined in this snippet; presumably
            # a module-level queue instance with put() - confirm.
            queue.put({"func": func_name, "args": args, "kwargs": kwargs})
            return {"status": "queued"}
        if strategy == "circuit_broken":
            # Re-raise to let caller handle
            # NOTE(review): ServiceUnavailableError is not defined in this
            # snippet - confirm where it comes from.
            raise ServiceUnavailableError("Service unavailable")
        # Unknown strategy: no fallback value is available.
        return None

    def _update_cache(self, func_name, result):
        """Update cache with fresh result"""
        self.cache[func_name] = result
Monitoring and Observability
Circuit Breaker Metrics
from dataclasses import dataclass
from typing import Dict
@dataclass
class CircuitBreakerMetrics:
    """Snapshot of a circuit breaker's state and call counters."""

    state: str
    failure_count: int
    success_count: int
    total_calls: int
    rejection_count: int

    @property
    def failure_rate(self) -> float:
        """Share of all calls that failed; 0.0 when nothing was called."""
        calls = self.total_calls
        return self.failure_count / calls if calls != 0 else 0.0

    @property
    def is_healthy(self) -> bool:
        """True when the breaker is closed and under half the calls failed."""
        if self.state != "closed":
            return False
        return self.failure_rate < 0.5
class InstrumentedCircuitBreaker(CircuitBreaker):
    """Circuit breaker that also keeps cumulative call metrics.

    Calls are executed by the parent class so the state machine keeps
    working. Cumulative totals are kept in ``total_failures`` and
    ``total_successes``, separate from the parent's ``failure_count`` and
    ``success_count``, which drive state transitions and get reset.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.total_calls = 0
        self.rejection_count = 0
        self.total_failures = 0
        self.total_successes = 0
        self._metrics = CircuitBreakerMetrics(
            state="closed",
            failure_count=0,
            success_count=0,
            total_calls=0,
            rejection_count=0
        )

    def call(self, func, *args, **kwargs):
        """Execute ``func`` under breaker protection and record the outcome.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: The breaker rejected the call.
            Exception: Anything ``func`` raises is re-raised unchanged.
        """
        self.total_calls += 1
        if not self._can_execute():
            self.rejection_count += 1
            self._update_metrics()
            raise CircuitBreakerOpenError("Circuit open")
        try:
            # Delegate so failures and successes drive the state machine.
            result = super().call(func, *args, **kwargs)
        except self.expected_exception:
            self.total_failures += 1
            raise
        else:
            self.total_successes += 1
            return result
        finally:
            # Refresh the snapshot whatever the outcome was.
            self._update_metrics()

    def _update_metrics(self):
        """Rebuild the metrics snapshot from the current counters."""
        self._metrics = CircuitBreakerMetrics(
            state=self.state.value,
            failure_count=self.total_failures,
            success_count=self.total_successes,
            total_calls=self.total_calls,
            rejection_count=self.rejection_count
        )

    def get_metrics(self) -> CircuitBreakerMetrics:
        """Return the snapshot taken after the most recent call."""
        return self._metrics
Health Check Endpoint
@app.get("/health/circuit-breaker")
def circuit_breaker_health():
    """Report per-service breaker metrics and an overall health status.

    The status is "healthy" only when every breaker's metrics report
    ``is_healthy``; otherwise it is "degraded".
    """
    snapshots = {
        "user_service": user_circuit_breaker.get_metrics(),
        "payment_service": payment_circuit_breaker.get_metrics(),
        "notification_service": notification_circuit_breaker.get_metrics(),
    }

    status = "degraded"
    if all(snapshot.is_healthy for snapshot in snapshots.values()):
        status = "healthy"

    details = {}
    for service, snapshot in snapshots.items():
        details[service] = {
            "state": snapshot.state,
            "failure_rate": f"{snapshot.failure_rate:.2%}",
            "total_calls": snapshot.total_calls,
        }

    return {"status": status, "breakers": details}
Best Practices
# Circuit breaker best practices
configuration:
- "Set failure threshold based on normal error rates"
- "Recovery timeout should allow service to recover"
- "Half-open state limits test requests"
fallback:
- "Always provide fallback responses"
- "Log circuit breaker events for debugging"
- "Consider stale data vs no data"
monitoring:
- "Track state changes"
- "Alert when circuit opens"
- "Monitor failure rates"
isolation:
- "Use separate breakers per dependency"
- "Don't share breakers across services"
- "Consider timeout + circuit breaker"
Conclusion
Circuit breakers prevent cascading failures:
- States: Closed (normal), Open (blocked), Half-open (testing)
- Fallback: Always provide fallback responses
- Monitoring: Track metrics and alert on state changes
Use circuit breakers for any external service call to build resilient systems.
Related Articles
- Rate Limiting Strategies
- Saga Pattern: Distributed Transactions
- Microservices Communication Patterns
Comments