The Saga pattern manages distributed transactions across multiple services without traditional two-phase commit. Instead of relying on a single ACID transaction, a saga runs a sequence of local transactions, each paired with a compensating action that undoes its effect if a later step fails.
Understanding the Saga Pattern
The Distributed Transaction Problem
Traditional ACID transaction (not possible across services):

Order Service -> Payment Service -> Inventory Service -> Shipping Service

All four services would have to join a single two-phase commit, which is not available across services.

Problems:
- Each service has its own database
- No distributed transaction coordinator
- Network partitions can cause locks
- Not scalable across service boundaries
Saga Solution
Saga pattern solution: the Create Order saga

Step 1 (Order Service): Create Order (pending). Compensation: Delete Order
Step 2 (Payment Service): Charge Payment. Compensation: Refund Payment
Step 3 (Inventory): Reserve Stock. Compensation: Release Stock
Step 4 (Shipping): Create Shipment. Compensation: Cancel Shipment

If ANY step fails: execute the compensations in REVERSE order!
Choreography vs Orchestration
Choreography-Based Saga
# Event-driven: each service publishes events and subscribes to the events it reacts to.
# Order, EventBus, the event classes, and the exception types are assumed to be defined elsewhere.
class OrderService:
    # self.db and self.event_bus are assumed to be injected in __init__.
    async def create_order(self, order_data: dict) -> Order:
        order = Order(status="PENDING", **order_data)
        await self.db.save(order)
        await self.event_bus.publish(OrderCreatedEvent(
            order_id=order.id,
            customer_id=order.customer_id,
            items=order.items,
            total_amount=order.total_amount
        ))
        return order


class PaymentService:
    def __init__(self, event_bus: EventBus):
        # Keep a reference: the handler below publishes through self.event_bus.
        self.event_bus = event_bus
        event_bus.subscribe(OrderCreatedEvent, self.handle_order_created)

    async def handle_order_created(self, event: OrderCreatedEvent):
        try:
            payment = await self.charge_customer(
                customer_id=event.customer_id,
                amount=event.total_amount
            )
            await self.event_bus.publish(PaymentSucceededEvent(
                order_id=event.order_id,
                payment_id=payment.id
            ))
        except PaymentFailed as e:
            await self.event_bus.publish(PaymentFailedEvent(
                order_id=event.order_id,
                reason=str(e)
            ))


class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        event_bus.subscribe(PaymentSucceededEvent, self.handle_payment_success)

    async def handle_payment_success(self, event: PaymentSucceededEvent):
        try:
            await self.reserve_stock(event.order_id)
            await self.event_bus.publish(StockReservedEvent(
                order_id=event.order_id
            ))
        except InsufficientStock:
            await self.event_bus.publish(StockReservationFailedEvent(
                order_id=event.order_id,
                reason="Insufficient stock"
            ))
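# Handling failures in choreography.
# These handlers belong to the same OrderService that creates the order.
# PaymentService would likewise subscribe to StockReservationFailedEvent to refund the charge.
class OrderService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        event_bus.subscribe(PaymentFailedEvent, self.handle_payment_failed)
        event_bus.subscribe(StockReservationFailedEvent, self.handle_stock_failed)
        event_bus.subscribe(ShipmentFailedEvent, self.handle_shipment_failed)

    async def handle_payment_failed(self, event: PaymentFailedEvent):
        await self.db.update(
            event.order_id,
            status="PAYMENT_FAILED",
            failure_reason=event.reason
        )

    async def handle_stock_failed(self, event: StockReservationFailedEvent):
        await self.db.update(
            event.order_id,
            status="OUT_OF_STOCK",
            failure_reason=event.reason
        )

    async def handle_shipment_failed(self, event: ShipmentFailedEvent):
        # Mirrors the handlers above; the status value is an assumption.
        await self.db.update(
            event.order_id,
            status="SHIPMENT_FAILED",
            failure_reason=event.reason
        )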
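The choreography classes rely on an `EventBus` with `subscribe` and `publish`. A minimal in-process sketch of such a bus is shown here so the event flow can be traced end to end; `InMemoryEventBus`, the trimmed-down event classes, and `demo` are hypothetical stand-ins, and a real system would use a message broker with durable delivery.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class InMemoryEventBus:
    """Dispatches each event to the handlers registered for its exact type."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: Any) -> None:
        for handler in list(self._handlers[type(event)]):
            await handler(event)


@dataclass
class OrderCreatedEvent:
    order_id: str
    total_amount: float


@dataclass
class PaymentSucceededEvent:
    order_id: str


@dataclass
class PaymentFailedEvent:
    order_id: str
    reason: str


async def demo(total_amount: float, limit: float = 500.0) -> list[str]:
    """Publish one order and return what the downstream handlers did."""
    bus = InMemoryEventBus()
    trace: list[str] = []

    async def charge(event: OrderCreatedEvent) -> None:
        # Payment side: succeed or fail, then announce the outcome.
        if event.total_amount > limit:
            await bus.publish(PaymentFailedEvent(event.order_id, "limit exceeded"))
        else:
            await bus.publish(PaymentSucceededEvent(event.order_id))

    async def reserve(event: PaymentSucceededEvent) -> None:
        trace.append(f"stock reserved for {event.order_id}")

    async def mark_failed(event: PaymentFailedEvent) -> None:
        trace.append(f"order {event.order_id} marked PAYMENT_FAILED: {event.reason}")

    bus.subscribe(OrderCreatedEvent, charge)
    bus.subscribe(PaymentSucceededEvent, reserve)
    bus.subscribe(PaymentFailedEvent, mark_failed)
    await bus.publish(OrderCreatedEvent("order-1", total_amount))
    return trace
```

No component in this sketch knows the whole flow: each handler only reacts to one event type, which is what makes choreography simple to extend and harder to trace.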
Orchestration-Based Saga
import uuid


# SagaStep, SagaState, SagaResult, and SagaLog are assumed to be defined elsewhere.
class OrderSagaOrchestrator:
    def __init__(
        self,
        order_service: OrderService,
        payment_service: PaymentService,
        inventory_service: InventoryService,
        shipping_service: ShippingService,
        saga_log: SagaLog
    ):
        self.order = order_service
        self.payment = payment_service
        self.inventory = inventory_service
        self.shipping = shipping_service
        self.saga_log = saga_log

    async def execute_create_order_saga(self, order_data: dict) -> SagaResult:
        saga_id = str(uuid.uuid4())
        saga = SagaState(
            id=saga_id,
            state="STARTED",
            current_step=0,
            compensating=False
        )
        # Each step pairs a forward action with its compensation.
        # The services are assumed to look up what to undo by saga_id.
        steps = [
            SagaStep(
                name="create_order",
                execute=lambda: self.order.create(order_data),
                compensate=lambda: self.order.cancel(saga_id)
            ),
            SagaStep(
                name="charge_payment",
                execute=lambda: self.payment.charge(
                    order_data["customer_id"],
                    order_data["total_amount"]
                ),
                compensate=lambda: self.payment.refund(saga_id)
            ),
            SagaStep(
                name="reserve_inventory",
                execute=lambda: self.inventory.reserve(order_data["items"]),
                compensate=lambda: self.inventory.release(saga_id)
            ),
            SagaStep(
                name="create_shipment",
                execute=lambda: self.shipping.create(
                    order_data["shipping_address"]
                ),
                compensate=lambda: self.shipping.cancel(saga_id)
            ),
        ]
        try:
            for i, step in enumerate(steps):
                # Log before and after each step so a crash can be recovered from the log.
                saga.current_step = i
                saga.current_action = step.name
                await self.saga_log.log(saga)
                result = await step.execute()
                saga.state = f"STEP_{i}_COMPLETED"
                await self.saga_log.log(saga)
            saga.state = "COMPLETED"
            await self.saga_log.log(saga)
            return SagaResult(success=True, order_id=saga_id)
        except Exception as e:
            await self._execute_compensations(saga, steps)
            return SagaResult(success=False, error=str(e))

    async def _execute_compensations(
        self,
        saga: SagaState,
        steps: list[SagaStep]
    ):
        saga.state = "COMPENSATING"
        saga.compensating = True
        # current_step is the step that failed, so undo the steps before it, newest first.
        for i in range(saga.current_step - 1, -1, -1):
            step = steps[i]
            if step.compensate:
                try:
                    saga.current_action = f"COMPENSATING_{step.name}"
                    await self.saga_log.log(saga)
                    await step.compensate()
                except Exception as comp_error:
                    # Record the failure and keep going so the other steps are still undone.
                    saga.compensation_failures.append({
                        "step": step.name,
                        "error": str(comp_error)
                    })
        saga.state = "COMPENSATED" if not saga.compensation_failures else "COMPENSATION_PARTIAL"
        await self.saga_log.log(saga)
Implementation Details
Saga State Management
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import json


def utc_now() -> datetime:
    # Timezone-aware replacement for datetime.utcnow(), which is deprecated since Python 3.12.
    return datetime.now(timezone.utc)


class SagaStatus(Enum):
    STARTED = "started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    COMPENSATION_FAILED = "compensation_failed"
    FAILED = "failed"


@dataclass
class SagaState:
    id: str
    name: str
    status: SagaStatus
    current_step: int = 0
    started_at: datetime = field(default_factory=utc_now)
    updated_at: datetime = field(default_factory=utc_now)
    completed_at: datetime | None = None
    payload: dict = field(default_factory=dict)
    completed_steps: list[dict] = field(default_factory=list)
    compensation_failures: list[dict] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


class SagaRepository:
    # db_connection is assumed to expose async execute/fetchone/fetchall with "?" placeholders.
    def __init__(self, db_connection):
        self.db = db_connection

    async def save(self, saga: SagaState):
        # Upsert: insert a new saga, or update the mutable columns of an existing one.
        await self.db.execute("""
            INSERT INTO sagas (id, name, status, current_step, payload,
                               completed_steps, compensation_failures,
                               started_at, updated_at, completed_at, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                status = excluded.status,
                current_step = excluded.current_step,
                updated_at = excluded.updated_at,
                completed_steps = excluded.completed_steps,
                compensation_failures = excluded.compensation_failures,
                completed_at = excluded.completed_at
        """, (
            saga.id, saga.name, saga.status.value, saga.current_step,
            json.dumps(saga.payload), json.dumps(saga.completed_steps),
            json.dumps(saga.compensation_failures),
            saga.started_at, saga.updated_at, saga.completed_at,
            json.dumps(saga.metadata)
        ))

    async def get(self, saga_id: str) -> SagaState | None:
        row = await self.db.fetchone(
            "SELECT * FROM sagas WHERE id = ?",
            (saga_id,)
        )
        return self._row_to_saga(row) if row else None

    async def get_pending_sagas(self, limit: int = 100) -> list[SagaState]:
        # Oldest first, so long-running sagas are recovered before recent ones.
        rows = await self.db.fetchall("""
            SELECT * FROM sagas
            WHERE status IN (?, ?, ?)
            ORDER BY started_at ASC
            LIMIT ?
        """, (
            SagaStatus.STARTED.value,
            SagaStatus.IN_PROGRESS.value,
            SagaStatus.COMPENSATING.value,
            limit
        ))
        return [self._row_to_saga(row) for row in rows]

    @staticmethod
    def _row_to_saga(row) -> SagaState:
        # JSON columns are decoded back into Python structures.
        return SagaState(
            id=row["id"],
            name=row["name"],
            status=SagaStatus(row["status"]),
            current_step=row["current_step"],
            payload=json.loads(row["payload"]),
            completed_steps=json.loads(row["completed_steps"]),
            compensation_failures=json.loads(row["compensation_failures"]),
            started_at=row["started_at"],
            updated_at=row["updated_at"],
            completed_at=row["completed_at"],
            metadata=json.loads(row["metadata"])
        )
Compensation Actions
import asyncio
from typing import Any


# Compensation, RetryPolicy, and CompensationResult are assumed to be defined elsewhere.
class CompensationBuilder:
    """Build compensation actions for each step."""

    @staticmethod
    def order_compensations() -> dict:
        return {
            "create_order": Compensation(
                forward=OrderService.create_order,
                backward=OrderService.cancel_order,
                timeout_seconds=30,
                retry_policy=RetryPolicy(max_attempts=3, backoff="exponential")
            ),
            "confirm_order": Compensation(
                forward=OrderService.confirm_order,
                backward=OrderService.unconfirm_order,
                timeout_seconds=30,
                retry_policy=RetryPolicy(max_attempts=3, backoff="exponential")
            )
        }

    @staticmethod
    def payment_compensations() -> dict:
        return {
            "charge_payment": Compensation(
                forward=PaymentService.charge,
                backward=PaymentService.refund,
                timeout_seconds=60,
                retry_policy=RetryPolicy(max_attempts=3, backoff="exponential"),
                idempotency_key="payment_id"
            ),
            "authorize_payment": Compensation(
                forward=PaymentService.authorize,
                backward=PaymentService.void_authorization,
                timeout_seconds=30,
                retry_policy=RetryPolicy(max_attempts=2, backoff="linear")
            )
        }


class CompensationExecutor:
    # _get_compensation, _calculate_backoff, and the _log_*/_alert_* helpers are assumed to exist.
    def __init__(self, saga_repository: SagaRepository):
        self.saga_repo = saga_repository

    async def execute_compensation(
        self,
        saga_id: str,
        step_name: str,
        forward_result: Any
    ) -> CompensationResult:
        saga = await self.saga_repo.get(saga_id)
        compensation_action = self._get_compensation(saga.name, step_name)
        try:
            result = await self._execute_with_retry(
                compensation_action,
                forward_result
            )
            await self._log_compensation_success(saga, step_name, result)
            return CompensationResult(success=True, result=result)
        except Exception as e:
            await self._log_compensation_failure(saga, step_name, e)
            # Not every Compensation above sets "critical", so default to False.
            if getattr(compensation_action, "critical", False):
                await self._alert_on_compensation_failure(saga, step_name, e)
            return CompensationResult(success=False, error=str(e))

    async def _execute_with_retry(
        self,
        compensation: Compensation,
        context: Any
    ) -> Any:
        last_error = None
        for attempt in range(compensation.retry_policy.max_attempts):
            try:
                # Bound each attempt so a hung service cannot stall the rollback.
                return await asyncio.wait_for(
                    compensation.backward(context),
                    timeout=compensation.timeout_seconds
                )
            except Exception as e:
                last_error = e
                # Sleep only between attempts, not after the last one.
                if attempt < compensation.retry_policy.max_attempts - 1:
                    delay = self._calculate_backoff(
                        compensation.retry_policy.backoff,
                        attempt
                    )
                    await asyncio.sleep(delay)
        raise last_error
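To see the retry-with-timeout loop run, the types it depends on need concrete shapes. The sketch below guesses at minimal `RetryPolicy` and `Compensation` dataclasses (fewer fields than the builder uses) and exercises the loop against a hypothetical `FlakyRefunds` service that fails a fixed number of times before succeeding.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class RetryPolicy:
    max_attempts: int = 3
    base_delay: float = 0.01  # seconds; doubled after every failed attempt


@dataclass
class Compensation:
    backward: Callable[[Any], Awaitable[Any]]
    timeout_seconds: float
    retry_policy: RetryPolicy


async def run_compensation(comp: Compensation, context: Any) -> Any:
    """Call comp.backward with a per-attempt timeout, retrying on any error."""
    last_error: Exception | None = None
    for attempt in range(comp.retry_policy.max_attempts):
        try:
            return await asyncio.wait_for(
                comp.backward(context), timeout=comp.timeout_seconds
            )
        except Exception as exc:  # a timeout from wait_for lands here too
            last_error = exc
            if attempt < comp.retry_policy.max_attempts - 1:
                await asyncio.sleep(comp.retry_policy.base_delay * 2 ** attempt)
    assert last_error is not None, "max_attempts must be at least 1"
    raise last_error


class FlakyRefunds:
    """Fails the first `failures` calls, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def refund(self, payment_id: str) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("gateway unavailable")
        return f"refunded:{payment_id}"
```

Because a compensation may be attempted several times, the `refund` operation itself has to be safe to repeat, which is the reason the payment compensation carries an idempotency key.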
Idempotency in Sagas
import json
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable


# Redis is assumed to be an async Redis client.
class IdempotentSagaStep:
    def __init__(self, cache: Redis):
        self.cache = cache

    async def execute_idempotent(
        self,
        step_id: str,
        execute_fn: Callable[..., Awaitable[Any]],
        *args, **kwargs
    ) -> Any:
        # Return the stored result if this step already ran.
        cached_result = await self.cache.get(f"step_result:{step_id}")
        if cached_result is not None:
            return json.loads(cached_result)
        result = await execute_fn(*args, **kwargs)
        # Results must be JSON-serializable; they are kept for one hour.
        await self.cache.setex(
            f"step_result:{step_id}",
            3600,
            json.dumps(result)
        )
        return result


class SagaIdempotencyManager:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def check_step_completed(
        self,
        saga_id: str,
        step_name: str
    ) -> bool:
        key = f"saga:{saga_id}:step:{step_name}"
        # EXISTS returns a count, so convert it to match the declared bool.
        return bool(await self.redis.exists(key))

    async def mark_step_completed(
        self,
        saga_id: str,
        step_name: str,
        result: Any
    ):
        key = f"saga:{saga_id}:step:{step_name}"
        await self.redis.setex(
            key,
            86400,  # 24 hours
            json.dumps({
                "result": result,
                "completed_at": datetime.now(timezone.utc).isoformat()
            })
        )

    async def mark_step_failed(
        self,
        saga_id: str,
        step_name: str,
        error: str
    ):
        key = f"saga:{saga_id}:step:{step_name}:failed"
        await self.redis.setex(
            key,
            86400,
            json.dumps({
                "error": error,
                "failed_at": datetime.now(timezone.utc).isoformat()
            })
        )
Handling Failures
Retry with Backoff
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)


class SagaRetryPolicy:
    def __init__(
        self,
        max_attempts: int = 3,
        backoff_type: str = "exponential",
        base_delay: float = 1.0,
        max_delay: float = 60.0
    ):
        self.max_attempts = max_attempts
        self.backoff_type = backoff_type
        self.base_delay = base_delay
        self.max_delay = max_delay

    def calculate_delay(self, attempt: int) -> float:
        # attempt is zero-based: 0 is the delay after the first failure.
        if self.backoff_type == "exponential":
            delay = self.base_delay * (2 ** attempt)
        elif self.backoff_type == "linear":
            # attempt + 1 so the first retry waits base_delay instead of zero.
            delay = self.base_delay * (attempt + 1)
        else:
            # "constant" and unknown types both use the base delay.
            delay = self.base_delay
        return min(delay, self.max_delay)


# RetryableError and NonRetryableError are assumed to be application-defined exceptions.
async def execute_with_saga_retry(
    saga_id: str,
    step_name: str,
    execute_fn: Callable[[], Awaitable[Any]],
    retry_policy: SagaRetryPolicy,
    error_handler: Callable[[str, str, Exception], Awaitable[None]]
) -> Any:
    last_error = None
    for attempt in range(retry_policy.max_attempts):
        try:
            return await execute_fn()
        except RetryableError as e:
            last_error = e
            if attempt < retry_policy.max_attempts - 1:
                delay = retry_policy.calculate_delay(attempt)
                logger.warning(
                    f"Saga {saga_id} step {step_name} failed, "
                    f"retrying in {delay}s (attempt {attempt + 1})"
                )
                await asyncio.sleep(delay)
        except NonRetryableError:
            # Retrying cannot help; propagate immediately with the original traceback.
            raise
    # All attempts failed: notify the handler, then surface the last error.
    await error_handler(saga_id, step_name, last_error)
    raise last_error
Dead Letter and Recovery
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


class SagaDeadLetterQueue:
    def __init__(self, db_connection, notification_service):
        self.db = db_connection
        self.notifier = notification_service

    async def add_to_dead_letter(
        self,
        saga: SagaState,
        failed_step: str,
        error: Exception
    ):
        # Persist enough context for an operator to compensate by hand.
        await self.db.execute("""
            INSERT INTO saga_dead_letter
                (saga_id, saga_name, failed_step, error_message,
                 saga_payload, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            saga.id, saga.name, failed_step, str(error),
            json.dumps(saga.payload), datetime.now(timezone.utc)
        ))
        await self.notifier.alert(
            subject=f"Saga {saga.name} requires manual intervention",
            body=f"Saga {saga.id} failed at step {failed_step}. "
                 f"Manual compensation may be required."
        )


class SagaRecoveryService:
    # _is_stuck_too_long, _restart_saga, _get_saga_steps, and _execute_compensation
    # are assumed to be implemented elsewhere.
    def __init__(
        self,
        saga_repo: SagaRepository,
        dead_letter: SagaDeadLetterQueue,
        alert_system: AlertSystem
    ):
        self.saga_repo = saga_repo
        self.dead_letter = dead_letter
        self.alert = alert_system

    async def recover_stuck_sagas(self):
        stuck_sagas = await self.saga_repo.get_pending_sagas()
        for saga in stuck_sagas:
            if await self._is_stuck_too_long(saga):
                await self._attempt_recovery(saga)

    async def _attempt_recovery(self, saga: SagaState):
        try:
            if saga.status == SagaStatus.COMPENSATING:
                await self._resume_compensation(saga)
            else:
                await self._restart_saga(saga)
        except Exception as e:
            # Recovery itself failed: hand the saga over to a human.
            await self.dead_letter.add_to_dead_letter(
                saga,
                str(saga.current_step),
                e
            )
            await self.alert.send_saga_alert(saga, e)

    async def _resume_compensation(self, saga: SagaState):
        steps = await self._get_saga_steps(saga.name)
        completed_indexes = {s["step_index"] for s in saga.completed_steps}
        for i in range(saga.current_step - 1, -1, -1):
            step = steps[i]
            # Only steps that actually completed have anything to undo.
            if i not in completed_indexes:
                continue
            try:
                await self._execute_compensation(saga, step)
            except Exception as e:
                logger.error(f"Compensation failed for step {i}: {e}")
                break
Timeout Handling
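import asyncio
import json
import logging
import time

logger = logging.getLogger(__name__)


class SagaTimeoutManager:
    # How long an overdue entry stays in Redis so the checker can still find it.
    # The value is an assumption; it should comfortably exceed the polling interval.
    GRACE_SECONDS = 3600

    def __init__(self, redis: Redis, callback_queue: asyncio.Queue):
        self.redis = redis
        self.callbacks = callback_queue

    async def schedule_timeout(
        self,
        saga_id: str,
        step_name: str,
        timeout_seconds: int
    ):
        timeout_key = f"saga_timeout:{saga_id}:{step_name}"
        now = time.time()
        # Store the deadline explicitly. If the key's TTL were the timeout itself,
        # the key would expire at the very moment the checker needs to see it.
        await self.redis.setex(
            timeout_key,
            timeout_seconds + self.GRACE_SECONDS,
            json.dumps({
                "saga_id": saga_id,
                "step_name": step_name,
                "scheduled_at": now,
                "deadline": now + timeout_seconds
            })
        )

    async def cancel_timeout(self, saga_id: str, step_name: str):
        timeout_key = f"saga_timeout:{saga_id}:{step_name}"
        await self.redis.delete(timeout_key)

    async def check_timeouts(self):
        now = time.time()
        # SCAN iterates incrementally; KEYS would block Redis on a large keyspace.
        async for key in self.redis.scan_iter(match="saga_timeout:*"):
            value = await self.redis.get(key)
            if not value:
                continue
            timeout_data = json.loads(value)
            if timeout_data["deadline"] > now:
                continue  # Not due yet
            await self.callbacks.put(SagaTimeoutEvent(
                saga_id=timeout_data["saga_id"],
                step_name=timeout_data["step_name"]
            ))
            await self.redis.delete(key)


class SagaTimeoutHandler:
    # _execute_compensations and saga_repo.update_status are assumed to be implemented elsewhere.
    def __init__(self, saga_repo: SagaRepository):
        self.saga_repo = saga_repo

    async def handle_timeout(self, event: SagaTimeoutEvent):
        saga = await self.saga_repo.get(event.saga_id)
        # Nothing to do for sagas that are unknown or already finished.
        if not saga or saga.status in [
            SagaStatus.COMPLETED,
            SagaStatus.COMPENSATED
        ]:
            return
        logger.warning(
            f"Saga {saga.id} timed out at step {event.step_name}"
        )
        await self._execute_compensations(saga)
        await self.saga_repo.update_status(
            saga.id,
            SagaStatus.COMPENSATED
        )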
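The core of timeout tracking is comparing a stored deadline with the current time. The in-memory sketch below shows that logic without Redis; `DeadlineTracker` is a hypothetical class, and the injectable `clock` exists only so the behaviour can be tested without sleeping.

```python
import time
from typing import Callable


class DeadlineTracker:
    """Tracks one deadline per (saga_id, step_name) pair."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._deadlines: dict[tuple[str, str], float] = {}

    def schedule(self, saga_id: str, step_name: str, timeout_seconds: float) -> None:
        # Rescheduling the same step simply replaces its deadline.
        self._deadlines[(saga_id, step_name)] = self._clock() + timeout_seconds

    def cancel(self, saga_id: str, step_name: str) -> None:
        # Called when the step finishes in time; unknown keys are ignored.
        self._deadlines.pop((saga_id, step_name), None)

    def pop_expired(self) -> list[tuple[str, str]]:
        """Return and forget every entry whose deadline has passed, oldest first."""
        now = self._clock()
        expired = sorted(
            (key for key, deadline in self._deadlines.items() if deadline <= now),
            key=lambda key: self._deadlines[key],
        )
        for key in expired:
            del self._deadlines[key]
        return expired
```

A polling loop would call `pop_expired()` on an interval and enqueue a timeout event for each returned pair. Since each entry is removed when it is returned, a timeout fires at most once per scheduled step.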
Best Practices
Good Patterns
GOOD_PATTERNS = {
    "define_compensation_upfront": """
    # Always define the compensation action when designing the forward action
    ✅ Good:
    class TransferMoneyStep:
        def __init__(self):
            self.forward = self._transfer
            self.compensate = self._reverse_transfer  # Always defined

        async def _transfer(self, from_account, to_account, amount):
            await db.decrement_balance(from_account, amount)
            await db.increment_balance(to_account, amount)

        async def _reverse_transfer(self, from_account, to_account, amount):
            await db.increment_balance(from_account, amount)
            await db.decrement_balance(to_account, amount)

    ❌ Bad:
    # Only define forward action
    async def transfer_money(from_account, to_account, amount):
        # No compensation logic defined
        pass
    """,
    "idempotent_steps": """
    # Make saga steps idempotent to handle retries
    ✅ Good:
    async def charge_payment(payment_data):
        existing = await db.find_payment(payment_data["idempotency_key"])
        if existing:
            return existing  # Already charged
        return await db.create_payment(payment_data)

    ❌ Bad:
    # Creates duplicate charges on retry
    async def charge_payment(payment_data):
        return await db.create_payment(payment_data)
    """,
    "timeout_for_each_step": """
    # Set timeouts (in seconds) for each saga step
    ✅ Good:
    STEP_TIMEOUTS = {
        "create_order": 5,
        "charge_payment": 30,
        "reserve_inventory": 10,
        "create_shipment": 15
    }

    ❌ Bad:
    # No timeouts - saga can hang indefinitely
    """
}
Bad Patterns
BAD_PATTERNS = {
    "no_compensation": """
    ❌ Bad:
    # Step without compensation logic
    # What happens if step 3 fails?
    Step 1: Create Order ✓
    Step 2: Charge Payment ✓
    Step 3: Reserve Stock ✗ (fails)
    # Now what? Customer charged but no stock!

    ✅ Good:
    # Always have compensation
    Step 1: Create Order ✓
    Step 2: Charge Payment ✓ (compensation: refund)
    Step 3: Reserve Stock ✗ (compensation: release)
    # Can now roll back properly
    """,
    "synchronous_wait": """
    ❌ Bad:
    # Blocking wait in saga
    async def execute_saga(self, order):
        result = await self.order_service.create(order)
        # BAD: the saga polls until payment completes
        while True:
            status = await self.check_payment_status(result.id)
            if status != "pending":
                break
            await asyncio.sleep(1)
        # Continues only when payment completes
        # This defeats the purpose of sagas!

    ✅ Good:
    # Event-driven: proceed asynchronously
    async def handle_payment_succeeded(self, event):
        await self.reserve_inventory(event.order_id)
    """,
    "ignore_compensation_failures": """
    ❌ Bad:
    async def compensate(self, saga):
        for step in reversed(saga.completed_steps):
            try:
                await step.compensate()
            except:
                pass  # Silently ignore!

    ✅ Good:
    async def compensate(self, saga):
        failures = []
        for step in reversed(saga.completed_steps):
            try:
                await step.compensate()
            except Exception as e:
                failures.append({"step": step.name, "error": str(e)})
                await self.alert.alert(f"Compensation failed: {e}")
        await self.saga_repo.record_compensation_failures(failures)
    """
}
Testing Sagas
import pytest
from types import SimpleNamespace


# CreateOrderSaga and the Mock* services are assumed to be defined in the test suite.
# The async tests assume the pytest-asyncio plugin is installed.
# The class name starts with "Test" so that pytest collects it.
class TestCreateOrderSaga:
    @pytest.fixture
    def mock_services(self):
        # A namespace gives the attribute access (mock_services.order) used below.
        return SimpleNamespace(
            order=MockOrderService(),
            payment=MockPaymentService(),
            inventory=MockInventoryService()
        )

    @pytest.mark.asyncio
    async def test_saga_success_flow(self, mock_services):
        saga = CreateOrderSaga(mock_services)
        result = await saga.execute(
            customer_id="cust_123",
            items=[{"product_id": "prod_1", "quantity": 2}],
            total=100.00
        )
        assert result.success
        assert mock_services.order.created
        assert mock_services.payment.charged
        assert mock_services.inventory.reserved

    @pytest.mark.asyncio
    async def test_saga_compensation_on_failure(self, mock_services):
        # Payment fails, so only the order (step 1) needs to be undone.
        mock_services.payment.charge_failure = Exception("Insufficient funds")
        saga = CreateOrderSaga(mock_services)
        result = await saga.execute(
            customer_id="cust_123",
            items=[{"product_id": "prod_1", "quantity": 2}],
            total=100.00
        )
        assert not result.success
        assert mock_services.order.cancelled
        assert not mock_services.payment.charged
        assert not mock_services.inventory.reserved

    @pytest.mark.asyncio
    async def test_saga_partial_compensation(self, mock_services):
        # Inventory fails, so both the payment and the order must be undone.
        mock_services.inventory.reserve_failure = Exception("Out of stock")
        saga = CreateOrderSaga(mock_services)
        result = await saga.execute(
            customer_id="cust_123",
            items=[{"product_id": "prod_1", "quantity": 2}],
            total=100.00
        )
        assert not result.success
        assert mock_services.order.cancelled
        assert mock_services.payment.refunded  # Compensation ran
        assert not mock_services.inventory.reserved
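The tests depend on mock services and a `CreateOrderSaga` that are not shown. One possible shape for them is sketched below, assuming the services are passed as an object with `order`, `payment`, and `inventory` attributes; all method names and the `make_services` helper are guesses made for illustration.

```python
import asyncio
from types import SimpleNamespace


class MockOrderService:
    def __init__(self) -> None:
        self.created = False
        self.cancelled = False

    async def create(self, customer_id: str, items: list[dict]) -> None:
        self.created = True

    async def cancel(self) -> None:
        self.cancelled = True


class MockPaymentService:
    def __init__(self) -> None:
        self.charged = False
        self.refunded = False
        self.charge_failure: Exception | None = None  # set by a test to force failure

    async def charge(self, customer_id: str, total: float) -> None:
        if self.charge_failure is not None:
            raise self.charge_failure
        self.charged = True

    async def refund(self) -> None:
        self.refunded = True


class MockInventoryService:
    def __init__(self) -> None:
        self.reserved = False
        self.released = False
        self.reserve_failure: Exception | None = None

    async def reserve(self, items: list[dict]) -> None:
        if self.reserve_failure is not None:
            raise self.reserve_failure
        self.reserved = True

    async def release(self) -> None:
        self.released = True


class CreateOrderSaga:
    def __init__(self, services: SimpleNamespace) -> None:
        self.services = services

    async def execute(self, customer_id: str, items: list[dict], total: float) -> SimpleNamespace:
        s = self.services
        # (forward action, compensation) pairs, in execution order.
        steps = [
            (lambda: s.order.create(customer_id, items), s.order.cancel),
            (lambda: s.payment.charge(customer_id, total), s.payment.refund),
            (lambda: s.inventory.reserve(items), s.inventory.release),
        ]
        completed = []
        for forward, compensate in steps:
            try:
                await forward()
            except Exception as exc:
                for undo in reversed(completed):
                    await undo()
                return SimpleNamespace(success=False, error=str(exc))
            completed.append(compensate)
        return SimpleNamespace(success=True, error=None)


def make_services() -> SimpleNamespace:
    return SimpleNamespace(
        order=MockOrderService(),
        payment=MockPaymentService(),
        inventory=MockInventoryService(),
    )
```

Keeping the mocks as plain classes with boolean flags makes each assertion read as a statement about which forward actions and compensations ran.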
Summary
The Saga pattern enables distributed transactions in microservices:
- Choreography - Event-driven, services publish/subscribe, simpler but harder to trace
- Orchestration - Central coordinator, explicit flow, easier debugging but a single point of failure
- Compensation - Each forward action has a corresponding rollback action
- Idempotency - Steps must handle duplicate executions gracefully
- Timeouts - Set timeouts and handle missing responses
- Dead Letter - Handle compensation failures with manual intervention queues
Sagas trade ACID for BASE (Basically Available, Soft state, Eventual consistency), making them suitable for loosely coupled microservices where strict consistency isn’t required.