⚡ Calmops

Saga Pattern: Managing Distributed Transactions in Microservices

The Saga pattern manages a business transaction that spans multiple services without relying on traditional two-phase commit. Instead of one ACID transaction across all services, a saga runs a sequence of local transactions, each paired with a compensating action that undoes its effect if a later step fails.

Understanding the Saga Pattern

The Distributed Transaction Problem

┌─────────────────────────────────────────────────────────────────┐
│            Traditional ACID Transaction (Not Possible)          │
│                                                                 │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐       │
│  │  Order  │───▶│ Payment │───▶│Inventory│───▶│Shipping │       │
│  │ Service │    │ Service │    │ Service │    │ Service │       │
│  └────┬────┘    └────┬────┘    └────┬────┘    └────┬────┘       │
│       │              │              │              │            │
│       └──────────────┴──────┬───────┴──────────────┘            │
│                             │                                   │
│                             ▼                                   │
│                  ┌─────────────────────┐                        │
│                  │  Two-Phase Commit   │                        │
│                  │  (Not Available     │                        │
│                  │  Across Services!)  │                        │
│                  └─────────────────────┘                        │
│                                                                 │
│  Problems:                                                      │
│  ✗ Each service has own database                                │
│  ✗ No distributed transaction coordinator                       │
│  ✗ Network partitions can cause locks                           │
│  ✗ Not scalable across service boundaries                       │
└─────────────────────────────────────────────────────────────────┘

Saga Solution

┌─────────────────────────────────────────────────────────────────┐
│                    Saga Pattern Solution                        │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                     Create Order Saga                     │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  Step 1: Order Service         Step 2: Payment Service          │
│  ┌─────────────────┐           ┌─────────────────┐              │
│  │ Create Order    │──────────▶│ Charge Payment  │              │
│  │ (pending)       │◀──────────│                 │              │
│  └─────────────────┘           └─────────────────┘              │
│         │                               │                       │
│         │ Compensation:                 │ Compensation:         │
│         │ Delete Order                  │ Refund Payment        │
│         ▼                               ▼                       │
│  Step 3: Inventory             Step 4: Shipping                 │
│  ┌─────────────────┐           ┌─────────────────┐              │
│  │ Reserve Stock   │──────────▶│ Create Shipment │              │
│  │                 │◀──────────│                 │              │
│  └─────────────────┘           └─────────────────┘              │
│         │                               │                       │
│         │ Compensation:                 │ Compensation:         │
│         │ Release Stock                 │ Cancel Shipment       │
│                                                                 │
│  If ANY step fails: Execute compensations in REVERSE order!     │
└─────────────────────────────────────────────────────────────────┘
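
The reverse-order rule in the diagram can be sketched in a few lines. This is a minimal, synchronous illustration rather than a production implementation: `run_saga`, `Step`, and the step names in `demo` are hypothetical, and the sketch assumes a failed step leaves nothing behind that needs undoing.

```python
from typing import Callable

# Each step pairs a forward action with the compensation that undoes it.
Step = tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: list[Step]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: list[Step] = []
    for step in steps:
        _name, action, _compensate = step
        try:
            action()
        except Exception:
            # Assumption: the failed step made no durable change, so only
            # the steps that already completed are undone, newest first.
            for _, _, compensate in reversed(completed):
                compensate()
            return False
        completed.append(step)
    return True


def demo() -> list[str]:
    """Hypothetical order saga in which the inventory step fails."""
    log: list[str] = []

    def fail_reserve() -> None:
        raise RuntimeError("out of stock")

    steps: list[Step] = [
        ("order", lambda: log.append("create order"),
         lambda: log.append("cancel order")),
        ("payment", lambda: log.append("charge payment"),
         lambda: log.append("refund payment")),
        ("inventory", fail_reserve,
         lambda: log.append("release stock")),
        ("shipping", lambda: log.append("create shipment"),
         lambda: log.append("cancel shipment")),
    ]
    run_saga(steps)
    return log
```

Calling `demo()` records the two forward actions that succeeded, followed by their compensations in reverse: the payment is refunded before the order is cancelled, and the shipping step never runs.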

Choreography vs Orchestration

Choreography-Based Saga

# Event-driven: Each service publishes and subscribes to events
class OrderService:
    async def create_order(self, order_data: dict) -> Order:
        order = Order(status="PENDING", **order_data)
        await self.db.save(order)
        
        await self.event_bus.publish(OrderCreatedEvent(
            order_id=order.id,
            customer_id=order.customer_id,
            items=order.items,
            total_amount=order.total_amount
        ))
        
        return order

class PaymentService:
    def __init__(self, event_bus: EventBus):
        # Keep a reference: the handler below publishes follow-up events.
        self.event_bus = event_bus
        event_bus.subscribe(OrderCreatedEvent, self.handle_order_created)
    
    async def handle_order_created(self, event: OrderCreatedEvent):
        try:
            payment = await self.charge_customer(
                customer_id=event.customer_id,
                amount=event.total_amount
            )
            
            await self.event_bus.publish(PaymentSucceededEvent(
                order_id=event.order_id,
                payment_id=payment.id
            ))
            
        except PaymentFailed as e:
            await self.event_bus.publish(PaymentFailedEvent(
                order_id=event.order_id,
                reason=str(e)
            ))

class InventoryService:
    def __init__(self, event_bus: EventBus):
        # Keep a reference: the handler below publishes follow-up events.
        self.event_bus = event_bus
        event_bus.subscribe(PaymentSucceededEvent, self.handle_payment_success)
    
    async def handle_payment_success(self, event: PaymentSucceededEvent):
        try:
            await self.reserve_stock(event.order_id)
            
            await self.event_bus.publish(StockReservedEvent(
                order_id=event.order_id
            ))
            
        except InsufficientStock:
            await self.event_bus.publish(StockReservationFailedEvent(
                order_id=event.order_id,
                reason="Insufficient stock"
            ))

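# Handling failures in choreography.
# These handlers belong on the same OrderService shown above; they are
# listed separately only to keep the happy path and the failure path apart.
class OrderService:
    def __init__(self, db, event_bus: EventBus):
        self.db = db
        self.event_bus = event_bus
        event_bus.subscribe(PaymentFailedEvent, self.handle_payment_failed)
        event_bus.subscribe(StockReservationFailedEvent, self.handle_stock_failed)
        event_bus.subscribe(ShipmentFailedEvent, self.handle_shipment_failed)
    
    async def handle_payment_failed(self, event: PaymentFailedEvent):
        await self.db.update(
            event.order_id,
            status="PAYMENT_FAILED",
            failure_reason=event.reason
        )
    
    async def handle_stock_failed(self, event: StockReservationFailedEvent):
        # The payment has already been taken at this point, so PaymentService
        # must also react to this event and refund the charge (not shown here).
        await self.db.update(
            event.order_id,
            status="OUT_OF_STOCK",
            failure_reason=event.reason
        )
    
    async def handle_shipment_failed(self, event: ShipmentFailedEvent):
        # Assumes ShipmentFailedEvent carries a reason like the events above.
        await self.db.update(
            event.order_id,
            status="SHIPMENT_FAILED",
            failure_reason=event.reason
        )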
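
In a choreographed saga, compensation is also event-driven: a service that has already done its work has to listen for downstream failures and undo it. The sketch below shows that for the stock failure case, using a hypothetical in-process `InMemoryEventBus` in place of a real broker. The event classes are simplified stand-ins for the ones above, and `PaymentRefunded` is an assumed event name that does not appear in the original listing.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class PaymentSucceeded:
    order_id: str


@dataclass
class StockReservationFailed:
    order_id: str
    reason: str


@dataclass
class PaymentRefunded:  # assumed event name, for illustration only
    order_id: str


class InMemoryEventBus:
    """Hypothetical stand-in for a broker; delivers events in-process."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def publish(self, event):
        for handler in self._handlers[type(event)]:
            await handler(event)


class PaymentService:
    def __init__(self, bus):
        self.bus = bus
        self.refunded = set()
        # Listen for the downstream failure so the charge can be undone.
        bus.subscribe(StockReservationFailed, self.handle_stock_failed)

    async def handle_stock_failed(self, event):
        if event.order_id in self.refunded:  # idempotent: refund at most once
            return
        self.refunded.add(event.order_id)
        await self.bus.publish(PaymentRefunded(order_id=event.order_id))


class InventoryService:
    def __init__(self, bus, in_stock):
        self.bus = bus
        self.in_stock = in_stock
        bus.subscribe(PaymentSucceeded, self.handle_payment_succeeded)

    async def handle_payment_succeeded(self, event):
        if not self.in_stock:
            await self.bus.publish(
                StockReservationFailed(event.order_id, "Insufficient stock")
            )


class OrderService:
    def __init__(self, bus):
        self.status = {}
        bus.subscribe(PaymentRefunded, self.handle_payment_refunded)

    async def handle_payment_refunded(self, event):
        # The order is cancelled only once the refund has been confirmed.
        self.status[event.order_id] = "CANCELLED"


async def demo():
    bus = InMemoryEventBus()
    payments = PaymentService(bus)
    InventoryService(bus, in_stock=False)  # the bus keeps the handler alive
    orders = OrderService(bus)
    await bus.publish(PaymentSucceeded(order_id="order-1"))
    return orders.status.get("order-1"), payments.refunded
```

Running `asyncio.run(demo())` walks the failure path end to end: the stock reservation fails, the payment service refunds, and the order ends up cancelled. No service calls another directly, which is also why the flow is harder to trace than in the orchestrated version.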

Orchestration-Based Saga

class OrderSagaOrchestrator:
    def __init__(
        self,
        order_service: OrderService,
        payment_service: PaymentService,
        inventory_service: InventoryService,
        shipping_service: ShippingService,
        saga_log: SagaLog
    ):
        self.order = order_service
        self.payment = payment_service
        self.inventory = inventory_service
        self.shipping = shipping_service
        self.saga_log = saga_log
    
    async def execute_create_order_saga(self, order_data: dict) -> SagaResult:
        saga_id = str(uuid.uuid4())  # requires `import uuid`
        saga = SagaState(
            id=saga_id,
            name="create_order",
            status=SagaStatus.STARTED,
            current_step=0
        )
        
        # The compensations key on saga_id, which assumes each service stores
        # the saga id alongside whatever it created for this saga.
        steps = [
            SagaStep(
                name="create_order",
                execute=lambda: self.order.create(order_data),
                compensate=lambda: self.order.cancel(saga_id)
            ),
            SagaStep(
                name="charge_payment",
                execute=lambda: self.payment.charge(
                    order_data["customer_id"],
                    order_data["total_amount"]
                ),
                compensate=lambda: self.payment.refund(saga_id)
            ),
            SagaStep(
                name="reserve_inventory",
                execute=lambda: self.inventory.reserve(order_data["items"]),
                compensate=lambda: self.inventory.release(saga_id)
            ),
            SagaStep(
                name="create_shipment",
                execute=lambda: self.shipping.create(
                    order_data["shipping_address"]
                ),
                compensate=lambda: self.shipping.cancel(saga_id)
            ),
        ]
        
        try:
            # Iterate the local list; SagaState has no `steps` attribute.
            for i, step in enumerate(steps):
                saga.current_step = i
                saga.status = SagaStatus.IN_PROGRESS
                
                # Log before executing so recovery can see which step was in flight.
                await self.saga_log.log(saga)
                
                await step.execute()
                
                saga.completed_steps.append({"step_index": i, "name": step.name})
                await self.saga_log.log(saga)
            
            saga.status = SagaStatus.COMPLETED
            await self.saga_log.log(saga)
            
            return SagaResult(success=True, order_id=saga_id)
            
        except Exception as e:
            await self._execute_compensations(saga, steps)
            return SagaResult(success=False, error=str(e))
    
    async def _execute_compensations(
        self, 
        saga: SagaState, 
        steps: list[SagaStep]
    ):
        saga.status = SagaStatus.COMPENSATING
        
        # current_step is the step that failed, so it is skipped: only the
        # steps before it completed and need undoing, newest first. This
        # assumes a failed step left no partial effect behind.
        for i in range(saga.current_step - 1, -1, -1):
            step = steps[i]
            
            if step.compensate:
                try:
                    await self.saga_log.log(saga)
                    
                    await step.compensate()
                    
                except Exception as comp_error:
                    # Record and continue so earlier steps are still compensated.
                    saga.compensation_failures.append({
                        "step": step.name,
                        "error": str(comp_error)
                    })
        
        saga.status = (
            SagaStatus.COMPENSATION_FAILED
            if saga.compensation_failures
            else SagaStatus.COMPENSATED
        )
        await self.saga_log.log(saga)
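
A compensation often needs the result of its forward action, for example the payment id to refund. One way to handle that is to store each forward result and pass it to the matching compensation. The sketch below assumes this design; `Step`, `run_orchestrated`, and the fake services in `demo` are hypothetical and not part of the orchestrator above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class Step:
    name: str
    execute: Callable[[dict], Awaitable[Any]]
    # The compensation receives the forward result (e.g. a payment id).
    compensate: Callable[[Any], Awaitable[None]]


async def run_orchestrated(
    steps: list[Step], context: dict
) -> tuple[bool, list[str]]:
    """Run steps in order; on failure, compensate completed steps newest first."""
    completed: list[tuple[Step, Any]] = []
    compensated: list[str] = []
    for step in steps:
        try:
            result = await step.execute(context)
        except Exception:
            for done, forward_result in reversed(completed):
                await done.compensate(forward_result)
                compensated.append(done.name)
            return False, compensated
        # Later steps can read earlier results from the shared context.
        context[step.name] = result
        completed.append((step, result))
    return True, compensated


async def demo() -> tuple[bool, list[str], list[str]]:
    refunds: list[str] = []

    async def create_order(ctx):
        return "order_1"

    async def cancel_order(order_id):
        return None

    async def charge(ctx):
        return "pay_1"

    async def refund(payment_id):
        refunds.append(payment_id)

    async def reserve(ctx):
        raise RuntimeError("out of stock")

    async def release(reservation_id):
        return None

    steps = [
        Step("create_order", create_order, cancel_order),
        Step("charge_payment", charge, refund),
        Step("reserve_inventory", reserve, release),
    ]
    ok, compensated = await run_orchestrated(steps, {})
    return ok, compensated, refunds
```

In `demo`, the reservation fails, so the refund is called with the exact payment id returned by the charge step. In a durable orchestrator these forward results would be persisted with the saga state so they survive a restart.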

Implementation Details

Saga State Management

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import json


def _utc_now() -> datetime:
    # datetime.utcnow() is deprecated since Python 3.12; return an aware timestamp.
    return datetime.now(timezone.utc)

class SagaStatus(Enum):
    STARTED = "started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    COMPENSATION_FAILED = "compensation_failed"
    FAILED = "failed"

@dataclass
class SagaState:
    id: str
    name: str
    status: SagaStatus
    current_step: int = 0
    started_at: datetime = field(default_factory=_utc_now)
    updated_at: datetime = field(default_factory=_utc_now)
    completed_at: datetime | None = None
    payload: dict = field(default_factory=dict)
    completed_steps: list[dict] = field(default_factory=list)
    compensation_failures: list[dict] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

class SagaRepository:
    def __init__(self, db_connection):
        self.db = db_connection
    
    async def save(self, saga: SagaState):
        await self.db.execute("""
            INSERT INTO sagas (id, name, status, current_step, payload, 
                             completed_steps, compensation_failures,
                             started_at, updated_at, completed_at, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                status = excluded.status,
                current_step = excluded.current_step,
                updated_at = excluded.updated_at,
                completed_steps = excluded.completed_steps,
                compensation_failures = excluded.compensation_failures,
                completed_at = excluded.completed_at
        """, (
            saga.id, saga.name, saga.status.value, saga.current_step,
            json.dumps(saga.payload), json.dumps(saga.completed_steps),
            json.dumps(saga.compensation_failures),
            saga.started_at, saga.updated_at, saga.completed_at,
            json.dumps(saga.metadata)
        ))
    
    async def get(self, saga_id: str) -> SagaState | None:
        row = await self.db.fetchone(
            "SELECT * FROM sagas WHERE id = ?", 
            (saga_id,)
        )
        
        return self._row_to_saga(row) if row else None
    
    @staticmethod
    def _row_to_saga(row) -> SagaState:
        # Shared by get() and get_pending_sagas(); JSON columns are decoded here.
        return SagaState(
            id=row["id"],
            name=row["name"],
            status=SagaStatus(row["status"]),
            current_step=row["current_step"],
            payload=json.loads(row["payload"]),
            completed_steps=json.loads(row["completed_steps"]),
            compensation_failures=json.loads(row["compensation_failures"]),
            started_at=row["started_at"],
            updated_at=row["updated_at"],
            completed_at=row["completed_at"],
            metadata=json.loads(row["metadata"])
        )
    
    async def get_pending_sagas(self, limit: int = 100) -> list[SagaState]:
        rows = await self.db.fetchall("""
            SELECT * FROM sagas 
            WHERE status IN (?, ?, ?)
            ORDER BY started_at ASC
            LIMIT ?
        """, (
            SagaStatus.STARTED.value,
            SagaStatus.IN_PROGRESS.value,
            SagaStatus.COMPENSATING.value,
            limit
        ))
        
        return [self._row_to_saga(row) for row in rows]
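
The upsert used by `save` can be tried out with the standard-library `sqlite3` module. This is a reduced sketch with a hypothetical five-column schema, not the full table the repository above expects. It assumes the bundled SQLite is version 3.24 or newer, which is when `ON CONFLICT ... DO UPDATE` was added.

```python
import json
import sqlite3

# Hypothetical, reduced schema for illustration.
SCHEMA = """
CREATE TABLE IF NOT EXISTS sagas (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    status TEXT NOT NULL,
    current_step INTEGER NOT NULL,
    completed_steps TEXT NOT NULL
)
"""


def save_saga(conn, saga_id, name, status, current_step, completed_steps):
    """Insert the saga, or update the mutable columns if the id already exists."""
    conn.execute(
        """
        INSERT INTO sagas (id, name, status, current_step, completed_steps)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(id) DO UPDATE SET
            status = excluded.status,
            current_step = excluded.current_step,
            completed_steps = excluded.completed_steps
        """,
        (saga_id, name, status, current_step, json.dumps(completed_steps)),
    )
    conn.commit()


def load_saga(conn, saga_id):
    """Return the saga as a dict, or None if no row has that id."""
    row = conn.execute(
        "SELECT id, name, status, current_step, completed_steps "
        "FROM sagas WHERE id = ?",
        (saga_id,),
    ).fetchone()
    if row is None:
        return None
    return {
        "id": row[0],
        "name": row[1],
        "status": row[2],
        "current_step": row[3],
        "completed_steps": json.loads(row[4]),
    }
```

Saving the same saga id twice leaves a single row holding the latest status and step, which is what lets the orchestrator call `save` after every state change without checking whether the row exists.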

Compensation Actions

class CompensationBuilder:
    """Build compensation actions for each step."""
    
    @staticmethod
    def order_compensations() -> dict:
        return {
            "create_order": Compensation(
                forward=OrderService.create_order,
                backward=OrderService.cancel_order,
                timeout_seconds=30,
                retry_policy=RetryPolicy(max_attempts=3, backoff="exponential")
            ),
            "confirm_order": Compensation(
                forward=OrderService.confirm_order,
                backward=OrderService.unconfirm_order,
                timeout_seconds=30,
                retry_policy=RetryPolicy(max_attempts=3, backoff="exponential")
            )
        }
    
    @staticmethod
    def payment_compensations() -> dict:
        return {
            "charge_payment": Compensation(
                forward=PaymentService.charge,
                backward=PaymentService.refund,
                timeout_seconds=60,
                retry_policy=RetryPolicy(max_attempts=3, backoff="exponential"),
                idempotency_key="payment_id"
            ),
            "authorize_payment": Compensation(
                forward=PaymentService.authorize,
                backward=PaymentService.void_authorization,
                timeout_seconds=30,
                retry_policy=RetryPolicy(max_attempts=2, backoff="linear")
            )
        }

class CompensationExecutor:
    def __init__(self, saga_repository: SagaRepository):
        self.saga_repo = saga_repository
    
    async def execute_compensation(
        self, 
        saga_id: str, 
        step_name: str,
        forward_result: any
    ) -> CompensationResult:
        saga = await self.saga_repo.get(saga_id)
        
        compensation_action = self._get_compensation(saga.name, step_name)
        
        try:
            result = await self._execute_with_retry(
                compensation_action,
                forward_result
            )
            
            await self._log_compensation_success(saga, step_name, result)
            
            return CompensationResult(success=True, result=result)
            
        except Exception as e:
            await self._log_compensation_failure(saga, step_name, e)
            
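            # The Compensation objects built above do not set `critical`,
            # so treat a missing attribute as non-critical.
            if getattr(compensation_action, "critical", False):
                await self._alert_on_compensation_failure(saga, step_name, e)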
            
            return CompensationResult(success=False, error=str(e))
    
    async def _execute_with_retry(
        self, 
        compensation: Compensation,
        context: any
    ) -> any:
        last_error = None
        
        for attempt in range(compensation.retry_policy.max_attempts):
            try:
                return await asyncio.wait_for(
                    compensation.backward(context),
                    timeout=compensation.timeout_seconds
                )
            except Exception as e:
                last_error = e
                if attempt < compensation.retry_policy.max_attempts - 1:
                    delay = self._calculate_backoff(
                        compensation.retry_policy.backoff,
                        attempt
                    )
                    await asyncio.sleep(delay)
        
        raise last_error

Idempotency in Sagas

class IdempotentSagaStep:
    def __init__(self, cache: Redis):
        self.cache = cache
    
    async def execute_idempotent(
        self,
        step_id: str,
        execute_fn: callable,
        *args, **kwargs
    ) -> any:
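        # Note: this get-then-execute check is not atomic. Two concurrent
        # callers can both miss the cache and both run execute_fn.
        cached_result = await self.cache.get(f"step_result:{step_id}")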
        
        if cached_result:
            return json.loads(cached_result)
        
        result = await execute_fn(*args, **kwargs)
        
        await self.cache.setex(
            f"step_result:{step_id}",
            3600,
            json.dumps(result)
        )
        
        return result


class SagaIdempotencyManager:
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def check_step_completed(
        self, 
        saga_id: str, 
        step_name: str
    ) -> bool:
        key = f"saga:{saga_id}:step:{step_name}"
        # EXISTS returns a count of matching keys, so convert it to a bool.
        return bool(await self.redis.exists(key))
    
    async def mark_step_completed(
        self, 
        saga_id: str, 
        step_name: str, 
        result: any
    ):
        key = f"saga:{saga_id}:step:{step_name}"
        
        await self.redis.setex(
            key,
            86400,  # 24 hours
            json.dumps({
                "result": result,
                "completed_at": datetime.utcnow().isoformat()
            })
        )
    
    async def mark_step_failed(
        self, 
        saga_id: str, 
        step_name: str, 
        error: str
    ):
        key = f"saga:{saga_id}:step:{step_name}:failed"
        
        await self.redis.setex(
            key,
            86400,
            json.dumps({
                "error": error,
                "failed_at": datetime.utcnow().isoformat()
            })
        )
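
A cache lookup followed by an execute is only safe if two duplicates cannot run at the same time. The sketch below closes that gap inside a single process with one lock per idempotency key. `InMemoryIdempotencyStore` is a hypothetical stand-in for Redis and is not shared across processes; a multi-process deployment would need an atomic claim in the shared store instead.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InMemoryIdempotencyStore:
    """Hypothetical in-process stand-in for a shared store such as Redis."""

    def __init__(self) -> None:
        self._results: dict[str, Any] = {}
        self._locks: dict[str, asyncio.Lock] = {}

    async def run_once(self, key: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        # One lock per key: a concurrent duplicate waits for the first
        # caller to finish instead of executing the step a second time.
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            if key in self._results:
                return self._results[key]
            result = await fn()
            self._results[key] = result
            return result
```

Two concurrent calls with the same key, for example via `asyncio.gather`, run the wrapped function once and both receive the same result. Failures are not cached in this sketch, so a step that raised is executed again on the next call.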

Handling Failures

Retry with Backoff

class SagaRetryPolicy:
    def __init__(
        self,
        max_attempts: int = 3,
        backoff_type: str = "exponential",
        base_delay: float = 1.0,
        max_delay: float = 60.0
    ):
        self.max_attempts = max_attempts
        self.backoff_type = backoff_type
        self.base_delay = base_delay
        self.max_delay = max_delay
    
    def calculate_delay(self, attempt: int) -> float:
        if self.backoff_type == "exponential":
            delay = self.base_delay * (2 ** attempt)
        elif self.backoff_type == "linear":
            # attempt is 0-based; add 1 so the first retry is not immediate.
            delay = self.base_delay * (attempt + 1)
        elif self.backoff_type == "constant":
            delay = self.base_delay
        else:
            delay = self.base_delay
        
        return min(delay, self.max_delay)


async def execute_with_saga_retry(
    saga_id: str,
    step_name: str,
    execute_fn: callable,
    retry_policy: SagaRetryPolicy,
    error_handler: callable
) -> any:
    last_error = None
    
    for attempt in range(retry_policy.max_attempts):
        try:
            return await execute_fn()
            
        except RetryableError as e:
            last_error = e
            
            if attempt < retry_policy.max_attempts - 1:
                delay = retry_policy.calculate_delay(attempt)
                logger.warning(
                    f"Saga {saga_id} step {step_name} failed, "
                    f"retrying in {delay}s (attempt {attempt + 1})"
                )
                await asyncio.sleep(delay)
                
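        except NonRetryableError:
            # Do not retry; a bare raise propagates the original exception unchanged.
            raise
    
    # All attempts failed with retryable errors.
    await error_handler(saga_id, step_name, last_error)
    raise last_error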
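
When many sagas retry against the same struggling service, identical delays make them retry in lockstep. A common remedy is to randomize each delay. The sketch below uses the "full jitter" variant of exponential backoff, which is a swapped-in technique and not part of `SagaRetryPolicy` above; `backoff_delay` and its parameters are hypothetical names.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full-jitter backoff: a uniform delay in [0, min(cap, base * 2**attempt)]."""
    rng = rng if rng is not None else random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0.0, ceiling)
```

Passing a seeded `random.Random` makes the delays reproducible in tests. The trade-off is that an individual retry may fire almost immediately, in exchange for spreading the overall load.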

Dead Letter and Recovery

class SagaDeadLetterQueue:
    def __init__(self, db_connection, notification_service):
        self.db = db_connection
        self.notifier = notification_service
    
    async def add_to_dead_letter(
        self, 
        saga: SagaState, 
        failed_step: str,
        error: Exception
    ):
        await self.db.execute("""
            INSERT INTO saga_dead_letter 
            (saga_id, saga_name, failed_step, error_message, 
             saga_payload, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            saga.id, saga.name, failed_step, str(error),
            json.dumps(saga.payload), datetime.utcnow()
        ))
        
        await self.notifier.alert(
            subject=f"Saga {saga.name} requires manual intervention",
            body=f"Saga {saga.id} failed at step {failed_step}. "
                 f"Manual compensation may be required."
        )


class SagaRecoveryService:
    def __init__(
        self, 
        saga_repo: SagaRepository,
        dead_letter: SagaDeadLetterQueue,
        alert_system: AlertSystem
    ):
        self.saga_repo = saga_repo
        self.dead_letter = dead_letter
        self.alert = alert_system
    
    async def recover_stuck_sagas(self):
        stuck_sagas = await self.saga_repo.get_pending_sagas()
        
        for saga in stuck_sagas:
            if await self._is_stuck_too_long(saga):
                await self._attempt_recovery(saga)
    
    async def _attempt_recovery(self, saga: SagaState):
        try:
            if saga.status == SagaStatus.COMPENSATING:
                await self._resume_compensation(saga)
            else:
                await self._restart_saga(saga)
                
        except Exception as e:
            await self.dead_letter.add_to_dead_letter(
                saga, 
                str(saga.current_step),  # failed_step is declared as str
                e
            )
            await self.alert.send_saga_alert(saga, e)
    
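    async def _resume_compensation(self, saga: SagaState):
        steps = await self._get_saga_steps(saga.name)
        completed = {s["step_index"] for s in saga.completed_steps}
        
        for i in range(saga.current_step - 1, -1, -1):
            # Only steps that actually completed need compensating.
            if i not in completed:
                continue
            
            try:
                # Compensations must be idempotent: some may already have
                # run before the saga got stuck.
                await self._execute_compensation(saga, steps[i])
            except Exception as e:
                logger.error(f"Compensation failed for step {i}: {e}")
                # Re-raise so _attempt_recovery sends the saga to the dead letter queue.
                raise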
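
The recovery loop relies on `_is_stuck_too_long`, which is not shown. One plausible check compares the saga's last update time against a threshold. The sketch below is an assumption about what that helper could look like: the function name `is_stuck`, the five-minute default, and the set of terminal statuses are all illustrative choices, with status strings taken from the `SagaStatus` values defined earlier.

```python
from datetime import datetime, timedelta, timezone

# Statuses that need no automatic recovery (values from SagaStatus).
TERMINAL_STATUSES = {"completed", "compensated", "compensation_failed", "failed"}


def is_stuck(
    status: str,
    updated_at: datetime,
    now: datetime,
    threshold: timedelta = timedelta(minutes=5),  # assumed default
) -> bool:
    """A saga is stuck if it is unfinished and has not progressed recently."""
    if status in TERMINAL_STATUSES:
        return False
    return now - updated_at > threshold
```

Passing `now` in explicitly keeps the check deterministic in tests. The threshold should be comfortably longer than the slowest step's timeout, otherwise healthy sagas will be picked up for recovery while they are still running.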

Timeout Handling

class SagaTimeoutManager:
    def __init__(self, redis: Redis, callback_queue: asyncio.Queue):
        self.redis = redis
        self.callbacks = callback_queue
    
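    # Sorted set of pending timeouts: member = JSON payload, score = deadline.
    TIMEOUTS_KEY = "saga_timeouts"
    
    @staticmethod
    def _member(saga_id: str, step_name: str) -> str:
        return json.dumps({"saga_id": saga_id, "step_name": step_name})
    
    async def schedule_timeout(
        self,
        saga_id: str,
        step_name: str,
        timeout_seconds: int
    ):
        # Store the deadline as the score. A key with a TTL (setex) would
        # vanish at the deadline, so a poller could never see it expire.
        deadline = time.time() + timeout_seconds  # requires `import time`
        await self.redis.zadd(
            self.TIMEOUTS_KEY,
            {self._member(saga_id, step_name): deadline}
        )
    
    async def cancel_timeout(self, saga_id: str, step_name: str):
        await self.redis.zrem(self.TIMEOUTS_KEY, self._member(saga_id, step_name))
    
    async def check_timeouts(self):
        # Fetch only the entries whose deadline has already passed.
        expired = await self.redis.zrangebyscore(
            self.TIMEOUTS_KEY, 0, time.time()
        )
        
        for member in expired:
            # zrem returns 0 if another worker already claimed this entry.
            if not await self.redis.zrem(self.TIMEOUTS_KEY, member):
                continue
            
            timeout_data = json.loads(member)
            
            await self.callbacks.put(SagaTimeoutEvent(
                saga_id=timeout_data["saga_id"],
                step_name=timeout_data["step_name"]
            ))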


class SagaTimeoutHandler:
    def __init__(self, saga_repo: SagaRepository):
        self.saga_repo = saga_repo
    
    async def handle_timeout(self, event: SagaTimeoutEvent):
        saga = await self.saga_repo.get(event.saga_id)
        
        if not saga or saga.status in [
            SagaStatus.COMPLETED,
            SagaStatus.COMPENSATED
        ]:
            return
        
        logger.warning(
            f"Saga {saga.id} timed out at step {event.step_name}"
        )
        
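        await self._execute_compensations(saga)
        
        # Assumes _execute_compensations raises if any compensation fails;
        # otherwise a partially undone saga would be recorded as COMPENSATED.
        await self.saga_repo.update_status(
            saga.id,
            SagaStatus.COMPENSATED
        )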
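
The core of timeout handling is tracking one deadline per step and reporting it once when it passes. The sketch below shows that logic in memory with a min-heap, independent of any storage backend. `DeadlineTracker` is a hypothetical class, and the injectable `clock` parameter is an assumption made to keep it testable.

```python
import heapq
import time
from typing import Callable


class DeadlineTracker:
    """In-memory sketch: tracks one deadline per (saga_id, step_name)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._heap: list[tuple[float, str, str]] = []
        self._active: dict[tuple[str, str], float] = {}

    def schedule(self, saga_id: str, step_name: str, timeout_seconds: float) -> None:
        deadline = self._clock() + timeout_seconds
        self._active[(saga_id, step_name)] = deadline
        heapq.heappush(self._heap, (deadline, saga_id, step_name))

    def cancel(self, saga_id: str, step_name: str) -> None:
        # Lazy deletion: the stale heap entry is skipped in pop_expired.
        self._active.pop((saga_id, step_name), None)

    def pop_expired(self) -> list[tuple[str, str]]:
        """Return steps whose deadline has passed; each is reported once."""
        now = self._clock()
        expired: list[tuple[str, str]] = []
        while self._heap and self._heap[0][0] <= now:
            deadline, saga_id, step_name = heapq.heappop(self._heap)
            # Skip entries that were cancelled or rescheduled since.
            if self._active.get((saga_id, step_name)) == deadline:
                del self._active[(saga_id, step_name)]
                expired.append((saga_id, step_name))
        return expired
```

A step that finishes in time calls `cancel`, and a periodic task calls `pop_expired` and starts compensation for whatever it returns. Unlike a shared store, this tracker loses its state on restart, so it suits a single orchestrator process that rebuilds deadlines from the saga log.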

Best Practices

Good Patterns

GOOD_PATTERNS = {
    "define_compensation_upfront": """
# Always define compensation action when designing the forward action

✅ Good:
class TransferMoneyStep:
    def __init__(self):
        self.forward = self._transfer
        self.compensate = self._reverse_transfer  # Always defined
    
    # `from` is a reserved word in Python, so the parameters are named explicitly.
    async def _transfer(self, from_account, to_account, amount):
        await db.decrement_balance(from_account, amount)
        await db.increment_balance(to_account, amount)
    
    async def _reverse_transfer(self, from_account, to_account, amount):
        await db.increment_balance(from_account, amount)
        await db.decrement_balance(to_account, amount)

❌ Bad:
# Only define forward action
async def transfer_money(from_account, to_account, amount):
    # No compensation logic defined
    pass
""",
    
    "idempotent_steps": """
# Make saga steps idempotent to handle retries

✅ Good:
async def charge_payment(payment_data):
    existing = await db.find_payment(payment_data["idempotency_key"])
    if existing:
        return existing  # Already charged
    
    return await db.create_payment(payment_data)

❌ Bad:
# Creates duplicate charges on retry
async def charge_payment(payment_data):
    return await db.create_payment(payment_data)
""",
    
    "timeout_for_each_step": """
# Set timeouts for each saga step

✅ Good:
STEP_TIMEOUTS = {
    "create_order": 5,
    "charge_payment": 30,
    "reserve_inventory": 10,
    "create_shipment": 15
}

❌ Bad:
# No timeouts - saga can hang indefinitely
"""
}

Bad Patterns

BAD_PATTERNS = {
    "no_compensation": """
โŒ Bad:
# Step without compensation logic
# What happens if step 3 fails?

Step 1: Create Order     โœ“
Step 2: Charge Payment   โœ“
Step 3: Reserve Stock    โœ— (fails)
# Now what? Order charged but no stock!

โœ… Good:
# Always have compensation
Step 1: Create Order     โœ“
Step 2: Charge Payment  โœ“ (compensation: refund)
Step 3: Reserve Stock    โœ— (compensation: release)
# Can now rollback properly
""",
    
    "synchronous_wait": """
โŒ Bad:
# Blocking wait in saga
async def execute_saga(self, order):
    result = await self.order_service.create(order)
    
    # BAD: Blocking wait!
    while True:
        status = await self.check_payment_status(result.id)
        if status != "pending":
            break
        await asyncio.sleep(1)
    
    # Continues only when payment completes
    # This defeats the purpose of sagas!

✅ Good:
# Event-driven: proceed asynchronously
async def handle_payment_succeeded(self, event):
    await self.reserve_inventory(event.order_id)
""",
    
    "ignore_compensation_failures": """
โŒ Bad:
async def compensate(self, saga):
    for step in reversed(saga.completed_steps):
        try:
            await step.compensate()
        except:
            pass  # Silently ignore!

โœ… Good:
async def compensate(self, saga):
    failures = []
    for step in reversed(saga.completed_steps):
        try:
            await step.compensate()
        except Exception as e:
            failures.append({"step": step.name, "error": str(e)})
            await self.alert.alert(f"Compensation failed: {e}")
    
    await self.saga_repo.record_compensation_failures(failures)
"""
}

Testing Sagas

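import pytest
from types import SimpleNamespace

# pytest only collects classes whose names start with "Test".
# The async tests assume the pytest-asyncio plugin is installed.
class TestCreateOrderSaga:
    @pytest.fixture
    def mock_services(self):
        # SimpleNamespace supports the attribute access used in the assertions
        # below (mock_services.order.created); a plain dict would not.
        return SimpleNamespace(
            order=MockOrderService(),
            payment=MockPaymentService(),
            inventory=MockInventoryService()
        )
    
    @pytest.mark.asyncio
    async def test_saga_success_flow(self, mock_services):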
        saga = CreateOrderSaga(mock_services)
        
        result = await saga.execute(
            customer_id="cust_123",
            items=[{"product_id": "prod_1", "quantity": 2}],
            total=100.00
        )
        
        assert result.success
        assert mock_services.order.created
        assert mock_services.payment.charged
        assert mock_services.inventory.reserved
    
    @pytest.mark.asyncio
    async def test_saga_compensation_on_failure(self, mock_services):
        mock_services.payment.charge_failure = Exception("Insufficient funds")
        
        saga = CreateOrderSaga(mock_services)
        
        result = await saga.execute(
            customer_id="cust_123",
            items=[{"product_id": "prod_1", "quantity": 2}],
            total=100.00
        )
        
        assert not result.success
        assert mock_services.order.cancelled
        assert not mock_services.payment.charged
        assert not mock_services.inventory.reserved
    
    @pytest.mark.asyncio
    async def test_saga_partial_compensation(self, mock_services):
        mock_services.inventory.reserve_failure = Exception("Out of stock")
        
        saga = CreateOrderSaga(mock_services)
        
        result = await saga.execute(
            customer_id="cust_123",
            items=[{"product_id": "prod_1", "quantity": 2}],
            total=100.00
        )
        
        assert not result.success
        assert mock_services.order.cancelled
        assert mock_services.payment.refunded  # Compensation run
        assert not mock_services.inventory.reserved
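
The tests above depend on mock services and a `CreateOrderSaga` class that are not shown. The sketch below is one guess at what they could look like, kept small enough to run the partial-compensation scenario. Every class here is hypothetical, and the attribute names (`charge_failure`, `reserve_failure`, `refunded`, and so on) are inferred from the assertions in the tests.

```python
import asyncio
from types import SimpleNamespace


class MockOrderService:
    def __init__(self):
        self.created = False
        self.cancelled = False

    async def create(self):
        self.created = True

    async def cancel(self):
        self.cancelled = True


class MockPaymentService:
    def __init__(self):
        self.charged = False
        self.refunded = False
        self.charge_failure = None  # set to an Exception to simulate failure

    async def charge(self, customer_id, amount):
        if self.charge_failure:
            raise self.charge_failure
        self.charged = True

    async def refund(self):
        self.refunded = True


class MockInventoryService:
    def __init__(self):
        self.reserved = False
        self.reserve_failure = None  # set to an Exception to simulate failure

    async def reserve(self, items):
        if self.reserve_failure:
            raise self.reserve_failure
        self.reserved = True

    async def release(self):
        self.reserved = False


class CreateOrderSaga:
    """Minimal saga: three steps, compensated in reverse on failure."""

    def __init__(self, services):
        self.services = services

    async def execute(self, customer_id, items, total):
        undo = []  # compensations for the steps that have completed
        try:
            await self.services.order.create()
            undo.append(self.services.order.cancel)
            await self.services.payment.charge(customer_id, total)
            undo.append(self.services.payment.refund)
            await self.services.inventory.reserve(items)
            undo.append(self.services.inventory.release)
        except Exception as exc:
            for compensate in reversed(undo):
                await compensate()
            return SimpleNamespace(success=False, error=str(exc))
        return SimpleNamespace(success=True, error=None)
```

With `reserve_failure` set on the inventory mock, `execute` refunds the payment and then cancels the order, which matches the assertions in `test_saga_partial_compensation`. Recording calls as plain boolean flags keeps the mocks easy to read, at the cost of not checking call order or arguments.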

Summary

The Saga pattern enables distributed transactions in microservices:

  • Choreography - Event-driven; services publish and subscribe to events. Simpler to start with, but the overall flow is harder to trace
  • Orchestration - A central coordinator drives an explicit flow. Easier to debug, but the orchestrator becomes a single point of failure
  • Compensation - Each forward action has a corresponding compensating action that undoes it
  • Idempotency - Steps must handle duplicate executions gracefully
  • Timeouts - Set a timeout for each step and handle missing responses
  • Dead Letter - Route failed compensations to a queue for manual intervention

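Sagas trade ACID for BASE (Basically Available, Soft state, Eventual consistency). In particular they give up isolation: other transactions can observe a saga's intermediate state before it completes or compensates. That makes them suitable for loosely coupled microservices where strict consistency isn’t required.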
