
Distributed Transactions: Saga Pattern, Outbox, and 2PC with Code

Introduction

When a business operation spans multiple services (create order, reserve inventory, charge payment), you need all three to succeed or all three to fail. Traditional database transactions don't work across service boundaries. This guide covers the patterns that do, with working code.

The core problem:

Order Service → Inventory Service → Payment Service

If payment fails after inventory is reserved:
  - Inventory is reserved but the order never completes
  - Money not charged but inventory locked
  → Inconsistent state
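To make the failure mode concrete, here is a minimal sketch with hypothetical in-memory service stubs (`InventoryService`, `PaymentService`, and `place_order_naive` are illustrative, not a real API). When the payment step raises, the reservation is simply stranded:

```python
# Hypothetical in-memory service stubs
class InventoryService:
    def __init__(self):
        self.reserved = {}

    def reserve(self, order_id, items):
        # Lock stock for this order
        self.reserved[order_id] = items

class PaymentService:
    def charge(self, order_id, amount):
        raise RuntimeError("card declined")  # simulate a payment failure

def place_order_naive(order_id, items, amount, inventory, payment):
    # Naive sequence: no rollback if a later step fails
    inventory.reserve(order_id, items)
    payment.charge(order_id, amount)  # raises, so we never finish

inventory = InventoryService()
try:
    place_order_naive("o-1", ["sku-42"], 9.99, inventory, PaymentService())
except RuntimeError:
    pass

# Inconsistent state: the payment failed, yet stock stays locked
print(inventory.reserved)  # {'o-1': ['sku-42']}
```

The patterns below are different answers to the same question: who undoes that reservation, and when.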

Two-Phase Commit (2PC): Strong Consistency

2PC coordinates a transaction across multiple databases. A coordinator asks all participants to “prepare” (lock resources), then either commits or rolls back all of them.

Phase 1 (Prepare):
  Coordinator → "Can you commit?" → DB1, DB2, DB3
  All respond "Yes" → proceed
  Any responds "No" → abort

Phase 2 (Commit):
  Coordinator → "Commit!" → DB1, DB2, DB3

When to Use 2PC

✓ Financial transactions requiring ACID guarantees
✓ All participants are in the same data center
✓ You can tolerate higher latency
✓ Participants support XA transactions (PostgreSQL, MySQL)

✗ Across geographically distributed systems
✗ When any participant might be unavailable
✗ High-throughput systems (2PC is blocking)

PostgreSQL XA Example

import psycopg2

def transfer_funds_2pc(from_account: int, to_account: int, amount: float):
    """Transfer funds across two databases using 2PC."""

    conn1 = psycopg2.connect("dbname=bank1")
    conn2 = psycopg2.connect("dbname=bank2")

    xid1 = conn1.xid(42, "transfer", "branch1")
    xid2 = conn2.xid(42, "transfer", "branch2")

    try:
        # Phase 1: Prepare both databases
        conn1.tpc_begin(xid1)
        cur1 = conn1.cursor()
        cur1.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                     (amount, from_account))
        conn1.tpc_prepare()

        conn2.tpc_begin(xid2)
        cur2 = conn2.cursor()
        cur2.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                     (amount, to_account))
        conn2.tpc_prepare()

        # Phase 2: Commit both. With no xid argument, psycopg2 commits
        # the transaction prepared on that same connection. (A crash
        # between these two commits leaves conn2's prepared transaction
        # pending; it must be resolved later via tpc_recover().)
        conn1.tpc_commit()
        conn2.tpc_commit()

    except Exception:
        # Roll back both on any failure; a connection that never
        # reached tpc_begin() raises here, which we ignore
        try:
            conn1.tpc_rollback()
        except psycopg2.Error:
            pass
        try:
            conn2.tpc_rollback()
        except psycopg2.Error:
            pass
        raise
    finally:
        conn1.close()
        conn2.close()

Saga Pattern: Eventual Consistency

Sagas break a distributed transaction into a sequence of local transactions. Each step publishes an event. If a step fails, compensating transactions undo the completed steps.

Order Saga:
  1. Create Order (pending)     → publish OrderCreated
  2. Reserve Inventory          → publish InventoryReserved
  3. Charge Payment             → publish PaymentCharged
  4. Confirm Order              → publish OrderConfirmed

Compensation (if payment fails):
  3. Payment failed             → publish PaymentFailed
  2. Release Inventory          ← compensate step 2
  1. Cancel Order               ← compensate step 1

Choreography-Based Saga

Services react to events; there is no central coordinator:

# order_service.py
import json
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode()
)

def create_order(user_id: str, items: list) -> str:
    """Step 1: Create order in pending state."""
    order_id = generate_id()  # app-level ID helper, like the db wrapper below

    # Local transaction
    with db.transaction():
        db.execute("""
            INSERT INTO orders (id, user_id, status, items)
            VALUES (%s, %s, 'pending', %s)
        """, (order_id, user_id, json.dumps(items)))

    # Publish event to trigger next step
    producer.send('orders', {
        'event_type': 'OrderCreated',
        'order_id': order_id,
        'user_id': user_id,
        'items': items,
    })

    return order_id

# Listen for compensation events
consumer = KafkaConsumer('payments', bootstrap_servers=['kafka:9092'])

for message in consumer:
    event = json.loads(message.value)

    if event['event_type'] == 'PaymentFailed':
        # Compensate: cancel the order
        with db.transaction():
            db.execute("""
                UPDATE orders SET status = 'cancelled',
                cancelled_reason = %s
                WHERE id = %s
            """, (event.get('reason', 'payment_failed'), event['order_id']))

        producer.send('orders', {
            'event_type': 'OrderCancelled',
            'order_id': event['order_id'],
            'reason': 'payment_failed',
        })
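Choreography only works if every downstream service runs its own consumer. A sketch of the inventory side (`handle_order_created`, `reserve_items`, and the event shapes are assumptions mirroring the events above) shows the reaction to OrderCreated and the failure event that triggers compensation:

```python
# inventory_service.py (sketch): the choreography counterpart to
# order_service.py. reserve_items is passed in so the event logic
# can be shown without the Kafka wiring.

def handle_order_created(event: dict, reserve_items) -> dict:
    """React to OrderCreated and produce the next saga event."""
    try:
        reservation_id = reserve_items(event['order_id'], event['items'])
        return {
            'event_type': 'InventoryReserved',
            'order_id': event['order_id'],
            'reservation_id': reservation_id,
        }
    except Exception as e:
        # This failure event is what triggers compensation upstream
        return {
            'event_type': 'InventoryReservationFailed',
            'order_id': event['order_id'],
            'reason': str(e),
        }

# The Kafka wiring mirrors order_service.py:
#   for message in KafkaConsumer('orders', bootstrap_servers=['kafka:9092']):
#       event = json.loads(message.value)
#       if event['event_type'] == 'OrderCreated':
#           producer.send('inventory', handle_order_created(event, reserve_items))
```

Keeping the event logic separate from the consumer loop also makes it trivial to unit-test each service's step in isolation.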

Orchestration-Based Saga

A central orchestrator coordinates the steps, which makes the flow easier to debug:

# order_saga_orchestrator.py
import json
from enum import Enum
from dataclasses import dataclass

class SagaState(Enum):
    STARTED = "started"
    INVENTORY_RESERVED = "inventory_reserved"
    PAYMENT_CHARGED = "payment_charged"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"

@dataclass
class OrderSaga:
    order_id: str
    state: SagaState
    compensation_steps: list

class OrderSagaOrchestrator:
    def __init__(self, inventory_client, payment_client, db):
        self.inventory = inventory_client
        self.payment   = payment_client
        self.db        = db

    def execute(self, order_id: str, items: list, payment_info: dict) -> dict:
        saga = OrderSaga(order_id=order_id, state=SagaState.STARTED, compensation_steps=[])

        try:
            # Step 1: Reserve inventory
            reservation = self.inventory.reserve(order_id, items)
            saga.state = SagaState.INVENTORY_RESERVED
            saga.compensation_steps.append(
                lambda: self.inventory.release(order_id, reservation['reservation_id'])
            )
            self._save_saga(saga)

            # Step 2: Charge payment
            charge = self.payment.charge(order_id, payment_info, reservation['total'])
            saga.state = SagaState.PAYMENT_CHARGED
            saga.compensation_steps.append(
                lambda: self.payment.refund(order_id, charge['transaction_id'])
            )
            self._save_saga(saga)

            # Step 3: Confirm order
            self.db.execute(
                "UPDATE orders SET status = 'confirmed' WHERE id = %s",
                (order_id,)
            )
            saga.state = SagaState.COMPLETED
            self._save_saga(saga)

            return {"status": "success", "order_id": order_id}

        except Exception as e:
            # Execute compensation in reverse order
            saga.state = SagaState.COMPENSATING
            self._save_saga(saga)

            for compensate in reversed(saga.compensation_steps):
                try:
                    compensate()
                except Exception as comp_err:
                    # Log but continue compensating
                    print(f"Compensation failed: {comp_err}")

            saga.state = SagaState.FAILED
            self._save_saga(saga)

            raise

    def _save_saga(self, saga: OrderSaga):
        """Persist saga state for recovery after crashes."""
        # The lambdas in compensation_steps are not serializable, so an
        # empty list is stored here; a production version would persist
        # named step identifiers and rebuild the closures on recovery.
        self.db.execute("""
            INSERT INTO saga_state (order_id, state, compensation_steps, updated_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (order_id) DO UPDATE
            SET state = EXCLUDED.state, updated_at = NOW()
        """, (saga.order_id, saga.state.value, json.dumps([])))
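The error path above (run the recorded compensations in reverse, logging but not aborting on compensation failures) can be isolated into a few lines and exercised with stub steps. This is a self-contained sketch of that mechanism, not the class itself:

```python
def run_saga(steps):
    """Run (action, compensate) pairs; on failure, undo the
    completed steps in reverse order."""
    done = []
    try:
        for action, compensate in steps:
            action()
            done.append(compensate)
    except Exception:
        for compensate in reversed(done):
            try:
                compensate()
            except Exception as comp_err:
                print(f"Compensation failed: {comp_err}")  # log, keep going
        raise

log = []

def charge_payment():
    raise RuntimeError("card declined")  # step 2 fails

steps = [
    (lambda: log.append("reserve inventory"), lambda: log.append("release inventory")),
    (charge_payment,                          lambda: log.append("refund payment")),
]

try:
    run_saga(steps)
except RuntimeError:
    pass

# Only completed steps are compensated; no refund, since the charge never succeeded
print(log)  # ['reserve inventory', 'release inventory']
```

The stub run makes the invariant visible: a step's compensation is registered only after the step succeeds, so a failed step is never "undone".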

Outbox Pattern: Reliable Event Publishing

The outbox pattern solves the dual-write problem: how to update your database AND publish an event atomically.

The problem:

# WRONG: if Kafka publish fails, DB is updated but no event sent
db.execute("UPDATE orders SET status = 'confirmed' WHERE id = %s", (order_id,))
kafka.send('orders', {'event': 'OrderConfirmed', 'order_id': order_id})  # might fail!

The solution: Write the event to an outbox table in the same transaction, then a separate process publishes it:

# CORRECT: atomic write to DB + outbox
def confirm_order(order_id: str):
    with db.transaction():
        # Update business data
        db.execute(
            "UPDATE orders SET status = 'confirmed' WHERE id = %s",
            (order_id,)
        )

        # Write event to outbox (same transaction)
        db.execute("""
            INSERT INTO outbox (id, topic, key, payload, created_at)
            VALUES (gen_random_uuid(), 'orders', %s, %s, NOW())
        """, (
            order_id,
            json.dumps({'event_type': 'OrderConfirmed', 'order_id': order_id})
        ))
    # Transaction committed: both DB update and outbox entry are durable

# outbox_publisher.py, runs as a separate process
import time

def publish_outbox_events():
    """Continuously read from outbox and publish to Kafka."""
    while True:
        # FOR UPDATE locks are only held inside a transaction, so the
        # fetch-publish-mark cycle runs in one; SKIP LOCKED lets several
        # publisher instances run concurrently without blocking each other
        with db.transaction():
            events = db.fetchall("""
                SELECT id, topic, key, payload
                FROM outbox
                WHERE published_at IS NULL
                ORDER BY created_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            """)

            for event in events:
                try:
                    kafka.send(
                        topic=event['topic'],
                        key=event['key'].encode(),
                        value=event['payload'].encode()
                    )
                    kafka.flush()

                    # Mark as published. A crash between flush and commit
                    # republishes the event: delivery is at-least-once,
                    # which is why consumers must be idempotent
                    db.execute(
                        "UPDATE outbox SET published_at = NOW() WHERE id = %s",
                        (event['id'],)
                    )

                except Exception as e:
                    print(f"Failed to publish {event['id']}: {e}")
                    # Will retry on next iteration

        time.sleep(0.1)  # 100ms polling interval

-- Outbox table schema
CREATE TABLE outbox (
    id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    topic        VARCHAR(255) NOT NULL,
    key          VARCHAR(255),
    payload      JSONB NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ
);

-- Partial index for the publisher's scan (PostgreSQL does not
-- allow INDEX clauses inside CREATE TABLE)
CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
    WHERE published_at IS NULL;

-- Clean up old published events
DELETE FROM outbox WHERE published_at < NOW() - INTERVAL '7 days';

Idempotency: Handle Duplicate Events

Sagas and event-driven systems deliver messages at-least-once. Your handlers must be idempotent:

def handle_payment_charged(event: dict):
    """Idempotent handler: safe to call multiple times."""
    order_id = event['order_id']
    transaction_id = event['transaction_id']

    with db.transaction():
        # Check inside the transaction. With a unique constraint on
        # processed_events.event_id, a concurrent duplicate fails on
        # the INSERT below and rolls back cleanly.
        existing = db.fetchone(
            "SELECT 1 FROM processed_events WHERE event_id = %s",
            (event['event_id'],)
        )
        if existing:
            return  # Already processed, skip

        # Process the event
        db.execute(
            "UPDATE orders SET status = 'paid', transaction_id = %s WHERE id = %s",
            (transaction_id, order_id)
        )

        # Record that we processed this event
        db.execute(
            "INSERT INTO processed_events (event_id, processed_at) VALUES (%s, NOW())",
            (event['event_id'],)
        )

Choosing the Right Pattern

Strong consistency required (financial, medical)?
  → 2PC if all services in same DC
  → Saga with careful compensation if distributed

High availability required?
  → Saga (choreography or orchestration)
  → Accept eventual consistency

Need reliable event publishing?
  → Always use Outbox pattern

Simple workflow (2-3 steps)?
  → Choreography saga

Complex workflow (5+ steps, many failure modes)?
  → Orchestration saga (easier to debug)

Need audit trail?
  → Event sourcing + Saga
