Introduction
When a business operation spans multiple services (create order, reserve inventory, charge payment), you need all three to succeed or all three to fail. Traditional database transactions don’t work across service boundaries. This guide covers the patterns that do, with working code.
The core problem:
Order Service → Inventory Service → Payment Service
If payment fails after inventory is reserved:
- Inventory is reserved but no order exists
- Money not charged but inventory locked
→ Inconsistent state
Two-Phase Commit (2PC): Strong Consistency
2PC coordinates a transaction across multiple databases. A coordinator asks all participants to “prepare” (lock resources), then either commits or rolls back all of them.
Phase 1 (Prepare):
Coordinator → "Can you commit?" → DB1, DB2, DB3
All respond "Yes" → proceed
Any responds "No" → abort
Phase 2 (Commit):
Coordinator → "Commit!" → DB1, DB2, DB3
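Before looking at a real driver, the protocol logic itself is small. Here is a minimal in-memory sketch (hypothetical `Participant` class for illustration only, no real locking or network):

```python
class Participant:
    """Toy participant: votes in the prepare phase, then commits or aborts."""
    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "idle"

    def prepare(self):
        # Phase 1: lock resources and vote yes/no
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self):
        self.state = "committed"

    def abort(self):
        self.state = "aborted"

def two_phase_commit(participants):
    """Coordinator: commit everywhere only if every participant votes yes."""
    # Phase 1: Prepare (short-circuits on the first "no" vote)
    if all(p.prepare() for p in participants):
        # Phase 2: Commit
        for p in participants:
            p.commit()
        return "committed"
    # Any "no" vote aborts the whole transaction
    for p in participants:
        p.abort()
    return "aborted"
```

A single "no" vote (or a crashed participant, in a real system) forces a global abort, which is exactly why 2PC is blocking: everyone else holds locks while waiting for the slowest voter.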
When to Use 2PC
✓ Financial transactions requiring ACID guarantees
✓ All participants are in the same data center
✓ You can tolerate higher latency
✓ Participants support XA transactions (PostgreSQL, MySQL)
✗ Across geographically distributed systems
✗ When any participant might be unavailable
✗ High-throughput systems (2PC is blocking)
PostgreSQL XA Example
import psycopg2

def transfer_funds_2pc(from_account: int, to_account: int, amount: float):
    """Transfer funds across two databases using 2PC.

    Note: PostgreSQL requires max_prepared_transactions > 0,
    and production code would use Decimal for money, not float.
    """
    conn1 = psycopg2.connect("dbname=bank1")
    conn2 = psycopg2.connect("dbname=bank2")
    xid1 = conn1.xid(42, "transfer", "branch1")
    xid2 = conn2.xid(42, "transfer", "branch2")
    try:
        # Phase 1: Prepare both databases
        conn1.tpc_begin(xid1)
        cur1 = conn1.cursor()
        cur1.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                     (amount, from_account))
        conn1.tpc_prepare()

        conn2.tpc_begin(xid2)
        cur2 = conn2.cursor()
        cur2.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                     (amount, to_account))
        conn2.tpc_prepare()

        # Phase 2: Commit the prepared transaction on each connection
        conn1.tpc_commit()
        conn2.tpc_commit()
    except Exception:
        # Roll back both on any failure
        for conn in (conn1, conn2):
            try:
                conn.tpc_rollback()
            except Exception:
                pass  # nothing begun/prepared on this connection yet
        raise
    finally:
        conn1.close()
        conn2.close()
Saga Pattern: Eventual Consistency
Sagas break a distributed transaction into a sequence of local transactions. Each step publishes an event. If a step fails, compensating transactions undo the completed steps.
Order Saga:
1. Create Order (pending) → publish OrderCreated
2. Reserve Inventory → publish InventoryReserved
3. Charge Payment → publish PaymentCharged
4. Confirm Order → publish OrderConfirmed
Compensation (if payment fails):
3. Payment failed → publish PaymentFailed
2. Release Inventory → compensate step 2
1. Cancel Order → compensate step 1
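Note the reverse order: a saga undoes completed steps last-in-first-out. As a minimal sketch (step names hypothetical), the compensators to run can be derived directly from the step list:

```python
# Each saga step paired with its compensating action (None = nothing to undo)
SAGA_STEPS = [
    ("create_order", "cancel_order"),
    ("reserve_inventory", "release_inventory"),
    ("charge_payment", "refund_payment"),
    ("confirm_order", None),
]

def compensations_for(failed_step: int) -> list:
    """Compensators for the steps completed before `failed_step`, newest first."""
    completed = SAGA_STEPS[:failed_step]
    return [comp for _, comp in reversed(completed) if comp is not None]
```

If the payment step (index 2) fails, `compensations_for(2)` yields `release_inventory` then `cancel_order`, matching the sequence above.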
Choreography-Based Saga
Services react to events; there is no central coordinator:
# order_service.py
import json
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode()
)

def create_order(user_id: str, items: list) -> str:
    """Step 1: Create order in pending state."""
    # `db` and `generate_id` are assumed application helpers
    order_id = generate_id()
    # Local transaction
    with db.transaction():
        db.execute("""
            INSERT INTO orders (id, user_id, status, items)
            VALUES (%s, %s, 'pending', %s)
        """, (order_id, user_id, json.dumps(items)))
    # Publish event to trigger next step
    producer.send('orders', {
        'event_type': 'OrderCreated',
        'order_id': order_id,
        'user_id': user_id,
        'items': items,
    })
    return order_id

# Listen for compensation events
consumer = KafkaConsumer('payments', bootstrap_servers=['kafka:9092'])

for message in consumer:
    event = json.loads(message.value)
    if event['event_type'] == 'PaymentFailed':
        # Compensate: cancel the order
        with db.transaction():
            db.execute("""
                UPDATE orders SET status = 'cancelled',
                       cancelled_reason = %s
                WHERE id = %s
            """, (event.get('reason', 'payment_failed'), event['order_id']))
        producer.send('orders', {
            'event_type': 'OrderCancelled',
            'order_id': event['order_id'],
            'reason': 'payment_failed',
        })
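The inventory side of the choreography has the same shape: consume OrderCreated, try to reserve, and emit the next event. Here is a sketch of just the decision logic, decoupled from Kafka and the database so it can be tested in isolation (stock passed in as a plain dict; names are hypothetical):

```python
def handle_order_created(event: dict, stock: dict) -> dict:
    """React to OrderCreated: reserve stock, or emit a failure event."""
    order_id = event['order_id']
    # Check every line item before mutating anything,
    # so a partial reservation never leaks out
    for item in event['items']:
        if stock.get(item['sku'], 0) < item['qty']:
            return {'event_type': 'InventoryReservationFailed',
                    'order_id': order_id,
                    'reason': f"out_of_stock:{item['sku']}"}
    # Reserve: decrement available stock
    for item in event['items']:
        stock[item['sku']] -= item['qty']
    return {'event_type': 'InventoryReserved', 'order_id': order_id}
```

In the real service, the stock update would be a local transaction and the returned event would go through the producer, exactly as in the order service above.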
Orchestration-Based Saga
A central orchestrator coordinates the steps, which makes the flow easier to debug:
# order_saga_orchestrator.py
import json
from enum import Enum
from dataclasses import dataclass, field

class SagaState(Enum):
    STARTED = "started"
    INVENTORY_RESERVED = "inventory_reserved"
    PAYMENT_CHARGED = "payment_charged"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"

@dataclass
class OrderSaga:
    order_id: str
    state: SagaState
    compensation_steps: list = field(default_factory=list)

class OrderSagaOrchestrator:
    def __init__(self, inventory_client, payment_client, db):
        self.inventory = inventory_client
        self.payment = payment_client
        self.db = db

    def execute(self, order_id: str, items: list, payment_info: dict) -> dict:
        saga = OrderSaga(order_id=order_id, state=SagaState.STARTED)
        try:
            # Step 1: Reserve inventory
            reservation = self.inventory.reserve(order_id, items)
            saga.state = SagaState.INVENTORY_RESERVED
            saga.compensation_steps.append(
                lambda: self.inventory.release(order_id, reservation['reservation_id'])
            )
            self._save_saga(saga)

            # Step 2: Charge payment
            charge = self.payment.charge(order_id, payment_info, reservation['total'])
            saga.state = SagaState.PAYMENT_CHARGED
            saga.compensation_steps.append(
                lambda: self.payment.refund(order_id, charge['transaction_id'])
            )
            self._save_saga(saga)

            # Step 3: Confirm order
            self.db.execute(
                "UPDATE orders SET status = 'confirmed' WHERE id = %s",
                (order_id,)
            )
            saga.state = SagaState.COMPLETED
            self._save_saga(saga)
            return {"status": "success", "order_id": order_id}
        except Exception:
            # Execute compensation in reverse order
            saga.state = SagaState.COMPENSATING
            self._save_saga(saga)
            for compensate in reversed(saga.compensation_steps):
                try:
                    compensate()
                except Exception as comp_err:
                    # Log but continue compensating
                    print(f"Compensation failed: {comp_err}")
            saga.state = SagaState.FAILED
            self._save_saga(saga)
            raise

    def _save_saga(self, saga: OrderSaga):
        """Persist saga state for recovery after crashes."""
        # Note: lambdas can't be serialized; a real implementation would store
        # step names here and rebuild the compensators during recovery.
        self.db.execute("""
            INSERT INTO saga_state (order_id, state, compensation_steps, updated_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (order_id) DO UPDATE
            SET state = EXCLUDED.state, updated_at = NOW()
        """, (saga.order_id, saga.state.value, json.dumps([])))
Outbox Pattern: Reliable Event Publishing
The outbox pattern solves the dual-write problem: how to update your database AND publish an event atomically.
The problem:
# WRONG: if Kafka publish fails, DB is updated but no event sent
db.execute("UPDATE orders SET status = 'confirmed' WHERE id = %s", (order_id,))
kafka.send('orders', {'event': 'OrderConfirmed', 'order_id': order_id}) # might fail!
The solution: Write the event to an outbox table in the same transaction, then a separate process publishes it:
# CORRECT: atomic write to DB + outbox
def confirm_order(order_id: str):
    with db.transaction():
        # Update business data
        db.execute(
            "UPDATE orders SET status = 'confirmed' WHERE id = %s",
            (order_id,)
        )
        # Write event to outbox (same transaction)
        db.execute("""
            INSERT INTO outbox (id, topic, key, payload, created_at)
            VALUES (gen_random_uuid(), 'orders', %s, %s, NOW())
        """, (
            order_id,
            json.dumps({'event_type': 'OrderConfirmed', 'order_id': order_id})
        ))
    # Transaction committed → both DB update and outbox entry are durable
# outbox_publisher.py -- runs as a separate process
import time

def publish_outbox_events():
    """Continuously read from outbox and publish to Kafka."""
    while True:
        # Fetch unpublished events (SKIP LOCKED lets multiple publishers coexist)
        events = db.fetchall("""
            SELECT id, topic, key, payload
            FROM outbox
            WHERE published_at IS NULL
            ORDER BY created_at
            LIMIT 100
            FOR UPDATE SKIP LOCKED
        """)
        for event in events:
            try:
                kafka.send(
                    topic=event['topic'],
                    key=event['key'].encode(),
                    value=event['payload'].encode()
                )
                kafka.flush()
                # Mark as published; a crash between send and this update
                # re-sends the event, so delivery is at-least-once
                db.execute(
                    "UPDATE outbox SET published_at = NOW() WHERE id = %s",
                    (event['id'],)
                )
            except Exception as e:
                print(f"Failed to publish {event['id']}: {e}")
                # Will retry on next iteration
        time.sleep(0.1)  # 100 ms polling interval
-- Outbox table schema
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    topic VARCHAR(255) NOT NULL,
    key VARCHAR(255),
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ
);

-- Partial index on unpublished events
-- (PostgreSQL requires a separate CREATE INDEX, not an inline INDEX clause)
CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
    WHERE published_at IS NULL;
-- Clean up old published events
DELETE FROM outbox WHERE published_at < NOW() - INTERVAL '7 days';
Idempotency: Handle Duplicate Events
Sagas and event-driven systems deliver messages at least once. Your handlers must be idempotent:
def handle_payment_charged(event: dict):
    """Idempotent handler: safe to call multiple times."""
    order_id = event['order_id']
    transaction_id = event['transaction_id']
    with db.transaction():
        # Check if already processed -- inside the transaction, so two
        # concurrent deliveries can't both pass the check and double-apply
        existing = db.fetchone(
            "SELECT event_id FROM processed_events WHERE event_id = %s",
            (event['event_id'],)
        )
        if existing:
            return  # Already processed, skip
        # Process the event
        db.execute(
            "UPDATE orders SET status = 'paid', transaction_id = %s WHERE id = %s",
            (transaction_id, order_id)
        )
        # Record that we processed this event; a unique constraint on
        # event_id makes a racing duplicate fail rather than apply twice
        db.execute(
            "INSERT INTO processed_events (event_id, processed_at) VALUES (%s, NOW())",
            (event['event_id'],)
        )
Choosing the Right Pattern
Strong consistency required (financial, medical)?
→ 2PC if all services in same DC
→ Saga with careful compensation if distributed
High availability required?
→ Saga (choreography or orchestration)
→ Accept eventual consistency
Need reliable event publishing?
→ Always use the Outbox pattern
Simple workflow (2-3 steps)?
→ Choreography saga
Complex workflow (5+ steps, many failure modes)?
→ Orchestration saga (easier to debug)
Need audit trail?
→ Event sourcing + Saga
Resources
- Saga Pattern (Chris Richardson)
- Outbox Pattern (Debezium)
- Designing Data-Intensive Applications (Martin Kleppmann)
- Seata: Distributed Transaction Solution