Skip to main content
โšก Calmops

Circuit Breaker Pattern: Building Resilient Microservices

The Circuit Breaker pattern prevents cascading failures in distributed systems. When a service fails repeatedly, the circuit “opens” to stop further requests, giving the service time to recover.

Understanding Circuit Breaker

The Problem

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              Cascading Failure Without Circuit Breaker         โ”‚
โ”‚                                                             โ”‚
โ”‚   Request โ”€โ”€โ–บ Service A โ”€โ”€โ–บ Service B โ”€โ”€โ–บ Service C        โ”‚
โ”‚                        โ”‚                   โ”‚                  โ”‚
โ”‚                        โ–ผ                   โ–ผ                  โ”‚
โ”‚                   Timeout!           Timeout!                 โ”‚
โ”‚                        โ”‚                   โ”‚                  โ”‚
โ”‚                        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                  โ”‚
โ”‚                                โ–ผ                             โ”‚
โ”‚                        Service A gets overwhelmed           โ”‚
โ”‚                        More timeouts                        โ”‚
โ”‚                        Eventually fails entirely!           โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              With Circuit Breaker                            โ”‚
โ”‚                                                             โ”‚
โ”‚   Request โ”€โ”€โ–บ Circuit โ”€โ”€โ–บ Service B                         โ”‚
โ”‚                 Breaker                                     โ”‚
โ”‚                       โ”‚                                     โ”‚
โ”‚                       โ–ผ (if open)                           โ”‚
โ”‚                 Return fallback                              โ”‚
โ”‚                       โ”‚                                     โ”‚
โ”‚                       โ–ผ                                     โ”‚
โ”‚                 Service B recovers!                         โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Circuit Breaker States

circuit_breaker_states = {
    "closed": {
        "description": "Normal operation, requests pass through",
        "transitions": "โ†’ open when failure threshold reached"
    },
    
    "open": {
        "description": "Requests blocked, return fallback immediately",
        "transitions": "โ†’ half-open after timeout"
    },
    
    "half_open": {
        "description": "Limited requests allowed to test recovery",
        "transitions": "โ†’ closed if successful, โ†’ open if failed"
    }
}

Implementation

Basic Circuit Breaker

import time
from enum import Enum
from typing import Callable, Any
from functools import wraps

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Circuit breaker implementation"""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.half_open_max_calls = half_open_max_calls
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0
    
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        
        if not self._can_execute():
            raise CircuitBreakerOpenError(
                f"Circuit breaker is {self.state.value}"
            )
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
            
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _can_execute(self) -> bool:
        """Check if request can be executed"""
        
        if self.state == CircuitState.CLOSED:
            return True
        
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout passed
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self._transition_to_half_open()
                return True
            return False
        
        # Half-open state
        return self.half_open_calls < self.half_open_max_calls
    
    def _on_success(self):
        """Handle successful call"""
        
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            
            if self.success_count >= self.half_open_max_calls:
                self._transition_to_closed()
        
        # Reset failure count on success in closed state
        if self.state == CircuitState.CLOSED:
            self.failure_count = 0
    
    def _on_failure(self):
        """Handle failed call"""
        
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.state == CircuitState.HALF_OPEN:
            self._transition_to_open()
        
        elif self.state == CircuitState.CLOSED:
            if self.failure_count >= self.failure_threshold:
                self._transition_to_open()
    
    def _transition_to_open(self):
        self.state = CircuitState.OPEN
        self.success_count = 0
    
    def _transition_to_half_open(self):
        self.state = CircuitState.HALF_OPEN
        self.half_open_calls = 0
        self.success_count = 0
    
    def _transition_to_closed(self):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.half_open_calls = 0


class CircuitBreakerOpenError(Exception):
    """Raised when circuit breaker is open"""
    pass

Decorator Usage

# Usage with decorator

circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=60
)

@circuit_breaker
def call_external_service():
    response = requests.get("https://external-api.com/data")
    response.raise_for_status()
    return response.json()


# Or with fallback
def call_with_fallback():
    try:
        return circuit_breaker.call(get_data)
    except CircuitBreakerOpenError:
        return get_cached_data()  # Fallback
    except requests.RequestException:
        return get_cached_data()  # Fallback for other errors

Python Resilience Library

# Using pybreaker

import pybreaker

# Configure circuit breaker
breaker = pybreaker.CircuitBreaker(
    fail_max=5,              # Open after 5 failures
    reset_timeout=60,        # Try again after 60 seconds
    exclude=[ValueError]      # Don't count these exceptions
)

@breaker
def call_service():
    response = requests.get(url)
    response.raise_for_status()
    return response.json()

# With custom listener
class MyListener(pybreaker.CircuitBreakerListener):
    def on_call_success(self, breaker):
        print("Call succeeded")
    
    def on_call_failure(self, breaker, exc):
        print(f"Call failed: {exc}")
    
    def on_state_change(self, breaker, old_state, new_state):
        print(f"State changed: {old_state} -> {new_state}")

breaker.add_listener(MyListener())

Circuit Breaker with Fallback

Fallback Strategies

class CircuitBreakerWithFallback:
    """Circuit breaker with various fallback strategies"""
    
    def __init__(self, breaker, fallback_strategy="cache_then_null"):
        self.breaker = breaker
        self.fallback_strategy = fallback_strategy
        self.cache = {}  # Simple in-memory cache
    
    def execute(self, func, *args, **kwargs):
        try:
            return self.breaker.call(func, *args, **kwargs)
            
        except CircuitBreakerOpenError:
            return self._fallback(func.__name__, *args, **kwargs)
            
        except Exception as e:
            # Log error
            logging.error(f"Service call failed: {e}")
            return self._fallback(func.__name__, *args, **kwargs)
    
    def _fallback(self, func_name, *args, **kwargs):
        """Execute fallback based on strategy"""
        
        if self.fallback_strategy == "cache_then_null":
            # Try cache, return None if not found
            cached = self.cache.get(func_name)
            if cached:
                return cached
            return None
            
        elif self.fallback_strategy == "cache_then_stale":
            # Return stale cache
            return self.cache.get(func_name, {})
            
        elif self.fallback_strategy == "default_value":
            # Return default value
            return {"status": "fallback", "data": []}
            
        elif self.fallback_strategy == "queue":
            # Queue for later processing
            queue.put({"func": func_name, "args": args, "kwargs": kwargs})
            return {"status": "queued"}
        
        elif self.fallback_strategy == "circuit_broken":
            # Re-raise to let caller handle
            raise ServiceUnavailableError("Service unavailable")
    
    def _update_cache(self, func_name, result):
        """Update cache with fresh result"""
        self.cache[func_name] = result

Monitoring and Observability

Circuit Breaker Metrics

from dataclasses import dataclass
from typing import Dict

@dataclass
class CircuitBreakerMetrics:
    """Metrics for monitoring circuit breaker"""
    state: str
    failure_count: int
    success_count: int
    total_calls: int
    rejection_count: int
    
    @property
    def failure_rate(self) -> float:
        if self.total_calls == 0:
            return 0.0
        return self.failure_count / self.total_calls
    
    @property
    def is_healthy(self) -> bool:
        return self.state == "closed" and self.failure_rate < 0.5


class InstrumentedCircuitBreaker(CircuitBreaker):
    """Circuit breaker with metrics"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.total_calls = 0
        self.rejection_count = 0
        self._metrics = CircuitBreakerMetrics(
            state="closed",
            failure_count=0,
            success_count=0,
            total_calls=0,
            rejection_count=0
        )
    
    def call(self, func, *args, **kwargs):
        self.total_calls += 1
        
        if not self._can_execute():
            self.rejection_count += 1
            raise CircuitBreakerOpenError("Circuit open")
        
        try:
            result = func(*args, **kwargs)
            self.success_count += 1
            self._update_metrics()
            return result
            
        except self.expected_exception as e:
            self.failure_count += 1
            self._update_metrics()
            raise e
    
    def _update_metrics(self):
        self._metrics = CircuitBreakerMetrics(
            state=self.state.value,
            failure_count=self.failure_count,
            success_count=self.success_count,
            total_calls=self.total_calls,
            rejection_count=self.rejection_count
        )
    
    def get_metrics(self) -> CircuitBreakerMetrics:
        return self._metrics

Health Check Endpoint

@app.get("/health/circuit-breaker")
def circuit_breaker_health():
    """Expose circuit breaker health"""
    
    breakers = {
        "user_service": user_circuit_breaker.get_metrics(),
        "payment_service": payment_circuit_breaker.get_metrics(),
        "notification_service": notification_circuit_breaker.get_metrics()
    }
    
    overall_healthy = all(
        b.is_healthy for b in breakers.values()
    )
    
    return {
        "status": "healthy" if overall_healthy else "degraded",
        "breakers": {
            name: {
                "state": metrics.state,
                "failure_rate": f"{metrics.failure_rate:.2%}",
                "total_calls": metrics.total_calls
            }
            for name, metrics in breakers.items()
        }
    }

Best Practices

# Circuit breaker best practices

configuration:
  - "Set failure threshold based on normal error rates"
  - "Recovery timeout should allow service to recover"
  - "Half-open state limits test requests"

fallback:
  - "Always provide fallback responses"
  - "Log circuit breaker events for debugging"
  - "Consider stale data vs no data"

monitoring:
  - "Track state changes"
  - "Alert when circuit opens"
  - "Monitor failure rates"

isolation:
  - "Use separate breakers per dependency"
  - "Don't share breakers across services"
  - "Consider timeout + circuit breaker"

Conclusion

Circuit breakers prevent cascading failures:

  • States: Closed (normal), Open (blocked), Half-open (testing)
  • Fallback: Always provide fallback responses
  • Monitoring: Track metrics and alert on state changes

Use circuit breakers for any external service call to build resilient systems.


Comments