Distributed Transactions Patterns: SAGA, Two-Phase Commit, and Eventual Consistency

Introduction

Distributed transactions are one of the most challenging aspects of microservices architecture. In a monolithic application, transactions are straightforward: a single database handles all operations atomically. In microservices, each service typically manages its own database, and operations span multiple services. The traditional ACID transaction model breaks down, requiring new approaches to maintain data consistency.

The CAP theorem states that a distributed data store cannot guarantee consistency, availability, and partition tolerance all at once. Because network partitions cannot be ruled out, the practical choice during a partition is between consistency and availability. Many microservice architectures favor availability and accept eventual consistency as the trade-off. This shift requires fundamentally different approaches to maintaining data integrity.

This guide explores the major patterns for handling distributed transactions: the SAGA pattern for long-running workflows, two-phase commit for strong consistency when needed, and eventual consistency models that embrace the distributed nature of modern systems.

The Challenge of Distributed Transactions

Why Traditional Transactions Fail

In a monolithic application, a single database transaction can ensure that all operations either complete together or fail together. If an error occurs, the database rolls back all changes, maintaining consistency. This model is simple, reliable, and well-understood.

In microservices, each service owns its data and cannot participate in another service’s database transaction. When an operation spans multiple services, there is no global transaction to ensure atomicity. If one service succeeds and another fails, the system can become inconsistent.

Consider an e-commerce order that involves the user service, inventory service, payment service, and shipping service. If the payment succeeds but the inventory service fails, you have a charged customer with no reserved inventory. The traditional solution of rolling back the transaction is not possible because each service has already committed its part.

# The problem: No global transaction in microservices
class OrderService:
    def create_order(self, user_id: str, items: list):
        # Step 1: Create order in order database
        order = self.order_repo.create(user_id, items)
        
        # Step 2: Reserve inventory (separate service)
        try:
            self.inventory_service.reserve(order.id, items)
        except InventoryError:
            # Order is created but inventory failed
            # How do we roll back the order?
            self.order_repo.update_status(order.id, "failed")
            raise
        
        # Step 3: Process payment (separate service)
        try:
            self.payment_service.charge(order.user_id, order.total)
        except PaymentError:
            # Inventory is reserved but payment failed
            # We need to release inventory
            self.inventory_service.release(order.id)
            self.order_repo.update_status(order.id, "failed")
            raise
        
        # All succeeded
        self.order_repo.update_status(order.id, "confirmed")
        return order

Understanding Consistency Models

Different consistency models offer different trade-offs between consistency, availability, and performance.

Strong Consistency guarantees that any read receives the most recent write. This is what a single-node relational database typically provides. In distributed systems, achieving strong consistency requires coordination that can impact availability and performance.

Eventual Consistency guarantees that if no new updates are made, eventually all reads will return the last written value. This model provides high availability and partition tolerance but allows temporary inconsistencies.

Read Your Writes Consistency guarantees that after a client writes data, it will always see its own writes in subsequent reads. This is important for user experience but does not guarantee consistency across all clients.

Monotonic Reads Consistency guarantees that if a client reads a value, subsequent reads will not return earlier values. This prevents the user from seeing data “go backward” in time.
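
The session guarantees above can be enforced on the client side by remembering the newest version a client has seen. The following is a minimal sketch under assumptions that are not part of the original text: each replica exposes an integer `version`, and the names `Replica`, `Session`, and `StaleReadError` are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


class StaleReadError(Exception):
    """Raised when a replica is older than what the session has already seen."""


@dataclass
class Replica:
    version: int = 0
    value: Optional[str] = None

    def apply(self, version: int, value: str) -> None:
        # Replicas only move forward; older updates are ignored
        if version > self.version:
            self.version, self.value = version, value


class Session:
    """Tracks the newest version one client has read or written."""

    def __init__(self, replicas: List[Replica]):
        self.replicas = replicas
        self.last_seen_version = 0

    def note_write(self, version: int) -> None:
        # Read-your-writes: later reads must be at least this new
        self.last_seen_version = max(self.last_seen_version, version)

    def read(self, replica_index: int) -> Optional[str]:
        replica = self.replicas[replica_index]
        # Monotonic reads: refuse a replica that would show older data
        if replica.version < self.last_seen_version:
            raise StaleReadError(f"replica {replica_index} is behind this session")
        self.last_seen_version = replica.version
        return replica.value
```

A caller that receives `StaleReadError` would typically retry against another replica or wait for replication to catch up. The sketch only shows where the check belongs.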

SAGA Pattern

Introduction to SAGA

The SAGA pattern provides a way to manage distributed transactions without global locking. Instead of a single atomic transaction, a SAGA breaks the operation into a sequence of local transactions. Each local transaction updates the database and publishes an event to trigger the next step. If a step fails, the SAGA executes compensating transactions to undo the previous steps.

The key insight is that compensating transactions are not true rollbacks but rather inverse operations that restore the system to a consistent state. For example, if you reserved inventory, the compensation is to release it. If you charged a credit card, the compensation is to issue a refund.

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, List, Callable, Optional
from enum import Enum
import uuid

class StepStatus(Enum):
    PENDING = "pending"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"

@dataclass
class SagaStep:
    name: str
    execute: Callable
    compensate: Optional[Callable] = None
    status: StepStatus = StepStatus.PENDING
    input_data: Optional[Dict] = None
    output_data: Optional[Dict] = None
    error: Optional[str] = None

@dataclass
class SagaContext:
    saga_id: str
    name: str
    steps: List[SagaStep]
    current_step: int = 0
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    
    @classmethod
    def create(cls, name: str) -> "SagaContext":
        return cls(
            saga_id=str(uuid.uuid4()),
            name=name,
            steps=[],
            started_at=datetime.utcnow()
        )

class SagaOrchestrator:
    """Orchestrates SAGA execution with compensation."""
    
    def __init__(self, saga_log: "SagaLog"):
        self.saga_log = saga_log
    
    def add_step(self, saga: SagaContext, name: str, execute: Callable, compensate: Optional[Callable] = None):
        saga.steps.append(SagaStep(
            name=name,
            execute=execute,
            compensate=compensate
        ))
    
    async def execute(self, saga: SagaContext) -> bool:
        """Execute saga, compensating on failure."""
        # Log saga start
        await self.saga_log.log_saga_start(saga)
        
        executed_steps = []
        
        try:
            for i, step in enumerate(saga.steps):
                # Track the running step so a failure is attributed to it
                saga.current_step = i
                step.status = StepStatus.EXECUTING
                step.input_data = self._get_step_input(saga, i)
                
                # Execute the step
                step.output_data = await step.execute(step.input_data)
                step.status = StepStatus.COMPLETED
                
                executed_steps.append(i)
                await self.saga_log.log_step_completed(saga, step)
        
        except Exception as e:
            # Record the error on the step that was running when it was raised
            failed_step = saga.steps[saga.current_step]
            failed_step.error = str(e)
            if failed_step.status == StepStatus.EXECUTING:
                failed_step.status = StepStatus.FAILED
            
            # Compensate completed steps in reverse order
            for i in reversed(executed_steps):
                step = saga.steps[i]
                if step.compensate:
                    step.status = StepStatus.COMPENSATING
                    try:
                        await step.compensate(step.output_data)
                        step.status = StepStatus.COMPENSATED
                        await self.saga_log.log_step_compensated(saga, step)
                    except Exception as comp_error:
                        step.error = f"Compensation failed: {str(comp_error)}"
                        await self.saga_log.log_compensation_failed(saga, step)
                        # In production, schedule retry or alert
            
            await self.saga_log.log_saga_failed(saga)
            return False
        
        saga.completed_at = datetime.utcnow()
        await self.saga_log.log_saga_completed(saga)
        return True
    
    def _get_step_input(self, saga: SagaContext, step_index: int) -> Dict:
        """Build a step's input by merging the outputs of all earlier steps."""
        context: Dict = {}
        for prev_step in saga.steps[:step_index]:
            # Later steps can still read early keys such as "order_id";
            # on a key collision the most recent output wins
            context.update(prev_step.output_data or {})
        return context
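
The orchestrator above expects a `SagaLog` object but the listing does not define one. The following is a minimal in-memory sketch of what such a log could look like, assuming only the six async methods the orchestrator calls. The class name `InMemorySagaLog` and the tuple layout of its entries are hypothetical, and a production log would write to durable storage so that an interrupted saga can be resumed or compensated after a crash.

```python
import asyncio
from types import SimpleNamespace
from typing import List, Optional, Tuple


class InMemorySagaLog:
    """Keeps saga events in a list; suitable for tests only."""

    def __init__(self) -> None:
        # Each entry is (saga_id, event_name, step_name or "")
        self.entries: List[Tuple[str, str, str]] = []

    async def _record(self, saga, event: str, step: Optional[object] = None) -> None:
        self.entries.append((saga.saga_id, event, step.name if step else ""))

    async def log_saga_start(self, saga) -> None:
        await self._record(saga, "saga_started")

    async def log_step_completed(self, saga, step) -> None:
        await self._record(saga, "step_completed", step)

    async def log_step_compensated(self, saga, step) -> None:
        await self._record(saga, "step_compensated", step)

    async def log_compensation_failed(self, saga, step) -> None:
        await self._record(saga, "compensation_failed", step)

    async def log_saga_failed(self, saga) -> None:
        await self._record(saga, "saga_failed")

    async def log_saga_completed(self, saga) -> None:
        await self._record(saga, "saga_completed")


async def demo() -> List[Tuple[str, str, str]]:
    # Stand-ins for SagaContext and SagaStep; only the attributes the log reads
    saga = SimpleNamespace(saga_id="saga-1")
    step = SimpleNamespace(name="reserve_inventory")
    log = InMemorySagaLog()
    await log.log_saga_start(saga)
    await log.log_step_completed(saga, step)
    await log.log_saga_failed(saga)
    return log.entries


if __name__ == "__main__":
    for entry in asyncio.run(demo()):
        print(entry)
```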

E-Commerce Order SAGA Example

class CreateOrderSaga:
    """SAGA for creating an order across multiple services."""
    
    def __init__(self, orchestrator, order_service, inventory_service, payment_service, notification_service):
        # The orchestrator (with its saga log) is injected instead of being built here
        self.orchestrator = orchestrator
        self.order_service = order_service
        self.inventory_service = inventory_service
        self.payment_service = payment_service
        self.notification_service = notification_service
    
    def build_saga(self, user_id: str, items: list) -> SagaContext:
        saga = SagaContext.create("create_order")
        orchestrator = self.orchestrator
        
        # Step 1: Create order
        orchestrator.add_step(
            saga,
            name="create_order",
            execute=lambda _: self._create_order(user_id, items),
            compensate=self._undo_create_order
        )
        
        # Step 2: Reserve inventory
        orchestrator.add_step(
            saga,
            name="reserve_inventory",
            execute=lambda ctx: self._reserve_inventory(ctx["order_id"], items),
            compensate=self._undo_reserve_inventory
        )
        
        # Step 3: Process payment
        orchestrator.add_step(
            saga,
            name="process_payment",
            execute=lambda ctx: self._process_payment(ctx["order_id"], ctx["total"]),
            compensate=self._undo_process_payment
        )
        
        # Step 4: Send confirmation
        orchestrator.add_step(
            saga,
            name="send_confirmation",
            execute=lambda ctx: self._send_confirmation(ctx["order_id"]),
            compensate=self._undo_send_confirmation
        )
        
        return saga
    
    async def _create_order(self, user_id: str, items: list) -> Dict:
        order = await self.order_service.create_order(user_id, items)
        return {
            "order_id": order.id,
            "user_id": user_id,
            "items": items,
            "total": order.total
        }
    
    async def _undo_create_order(self, output: Dict) -> None:
        if "order_id" in output:
            await self.order_service.cancel_order(output["order_id"])
    
    async def _reserve_inventory(self, order_id: str, items: list) -> Dict:
        result = await self.inventory_service.reserve(order_id, items)
        return {"reservation_id": result.reservation_id, "items": items}
    
    async def _undo_reserve_inventory(self, output: Dict) -> None:
        if "reservation_id" in output:
            await self.inventory_service.release(output["reservation_id"])
    
    async def _process_payment(self, order_id: str, total: float) -> Dict:
        result = await self.payment_service.charge(order_id, total)
        return {"transaction_id": result.transaction_id, "amount": total}
    
    async def _undo_process_payment(self, output: Dict) -> None:
        if "transaction_id" in output:
            await self.payment_service.refund(output["transaction_id"])
    
    async def _send_confirmation(self, order_id: str) -> Dict:
        await self.notification_service.send_order_confirmation(order_id)
        return {"notified": True}
    
    async def _undo_send_confirmation(self, output: Dict) -> None:
        # Cannot really undo an email, but can send a follow-up
        pass

Choreography versus Orchestration

SAGAs can be implemented in two ways: choreography and orchestration.

Choreography uses events to coordinate steps. Each step publishes an event when it completes, and the next step subscribes to that event. There is no central coordinator; each service reacts to events.

Orchestration uses a central coordinator (orchestrator) that tells each participant what to do and when. The orchestrator knows the entire workflow and handles success and failure.

Choreography is simpler for small workflows with few participants. Each service only needs to know about the events it produces and consumes. However, as the workflow grows, it becomes difficult to understand the overall flow and debug issues.

Orchestration provides better visibility and control. The orchestrator knows the state of the entire workflow and can make decisions based on the overall context. However, it introduces a central point of failure and requires additional infrastructure.

# Choreography-based SAGA using events
class OrderCreatedEvent:
    def __init__(self, order_id, user_id, items, total):
        self.order_id = order_id
        self.user_id = user_id
        self.items = items
        self.total = total

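class InventoryReservedEvent:
    def __init__(self, order_id, reservation_id):
        self.order_id = order_id
        self.reservation_id = reservation_id

class PaymentCompletedEvent:
    def __init__(self, order_id, transaction_id):
        self.order_id = order_id
        self.transaction_id = transaction_id

class OrderFailedEvent:
    def __init__(self, order_id, reason):
        self.order_id = order_id
        self.reason = reason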

class ChoreographySaga:
    """SAGA using event choreography."""
    
    def __init__(self, event_bus, inventory_service, payment_service):
        self.event_bus = event_bus
        self.inventory = inventory_service
        self.payment = payment_service
        self.pending_orders: Dict[str, dict] = {}
    
    def setup_subscriptions(self):
        # Subscribe to order creation
        self.event_bus.subscribe("order.created", self.on_order_created)
        # Subscribe to inventory reservation
        self.event_bus.subscribe("inventory.reserved", self.on_inventory_reserved)
        # Subscribe to payment completion
        self.event_bus.subscribe("payment.completed", self.on_payment_completed)
    
    async def on_order_created(self, event: OrderCreatedEvent):
        """Handle order creation - reserve inventory."""
        try:
            result = await self.inventory.reserve(event.order_id, event.items)
            self.pending_orders[event.order_id] = {
                "order": event,
                "inventory": result
            }
            self.event_bus.publish(InventoryReservedEvent(event.order_id, result.reservation_id))
        except Exception as e:
            # Publish failure event
            self.event_bus.publish(OrderFailedEvent(event.order_id, str(e)))
    
    async def on_inventory_reserved(self, event: InventoryReservedEvent):
        """Handle inventory reservation - process payment."""
        order_data = self.pending_orders.get(event.order_id)
        if not order_data:
            return
        
        try:
            result = await self.payment.charge(
                event.order_id,
                order_data["order"].total
            )
            self.event_bus.publish(PaymentCompletedEvent(event.order_id, result.transaction_id))
        except Exception as e:
            # Trigger compensation
            await self.compensate_inventory(event.order_id, event.reservation_id)
    
    async def on_payment_completed(self, event: PaymentCompletedEvent):
        """Handle payment completion - finalize order."""
        order_data = self.pending_orders.get(event.order_id)
        if order_data:
            # Finalize order
            del self.pending_orders[event.order_id]
    
    async def compensate_inventory(self, order_id: str, reservation_id: str):
        """Release inventory reservation."""
        await self.inventory.release(reservation_id)
        # Drop the in-memory state so failed orders do not accumulate
        self.pending_orders.pop(order_id, None)
        self.event_bus.publish(OrderFailedEvent(order_id, "Payment failed"))
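
The choreography listing assumes an `event_bus` with `subscribe(topic, handler)` and a synchronous `publish(event)`. The following is a minimal in-process sketch of a bus with that shape. The `register` method that maps an event class to a topic name is an assumption added here, as are the `drain` helper and the class name `InMemoryEventBus`; a real deployment would use a message broker rather than asyncio tasks.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Set, Type

Handler = Callable[[Any], Awaitable[None]]


class InMemoryEventBus:
    """Routes events to async handlers inside one process."""

    def __init__(self) -> None:
        self._topics: Dict[Type, str] = {}
        self._handlers: Dict[str, List[Handler]] = {}
        self._tasks: Set[asyncio.Task] = set()

    def register(self, event_type: Type, topic: str) -> None:
        # Assumption: each event class maps to exactly one topic
        self._topics[event_type] = topic

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers.setdefault(topic, []).append(handler)

    def publish(self, event: Any) -> None:
        # Must be called from a running event loop; handlers run as tasks
        topic = self._topics[type(event)]
        loop = asyncio.get_running_loop()
        for handler in self._handlers.get(topic, []):
            task = loop.create_task(handler(event))
            self._tasks.add(task)  # keep a reference until the task finishes
            task.add_done_callback(self._tasks.discard)

    async def drain(self) -> None:
        # Wait until every scheduled handler, including ones that were
        # triggered by other handlers, has finished
        while self._tasks:
            await asyncio.gather(*list(self._tasks))


class Ping:
    def __init__(self, n: int):
        self.n = n


async def demo() -> List[int]:
    received: List[int] = []

    async def on_ping(event: Ping) -> None:
        received.append(event.n)

    bus = InMemoryEventBus()
    bus.register(Ping, "ping")
    bus.subscribe("ping", on_ping)
    bus.publish(Ping(1))
    bus.publish(Ping(2))
    await bus.drain()
    return received
```

Because delivery here is in-memory and best effort, an event is lost if the process stops before its handler runs. That limitation is one reason choreographed sagas are normally built on a durable broker.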

Two-Phase Commit

When to Use Two-Phase Commit

Two-phase commit (2PC) provides atomic commitment: a coordinator drives all participants to either commit or abort the same transaction. Unlike SAGA, which commits each local transaction independently and accepts eventual consistency, 2PC guarantees that no participant commits unless every participant has voted to commit.

2PC is appropriate when strong consistency is required and the number of participants is small. It is commonly used in financial systems, inventory management, and other domains where temporary inconsistency is unacceptable.

However, 2PC has significant drawbacks. It is a blocking protocol: if the coordinator fails, participants may be left waiting indefinitely. It has high latency because all participants must respond before any can proceed. It does not scale to many participants.

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any
import time

class Vote(Enum):
    YES = "yes"
    NO = "no"

class GlobalState(Enum):
    INIT = "init"
    VOTING = "voting"
    COMMIT = "commit"
    ABORT = "abort"

@dataclass
class ParticipantState:
    participant_id: str
    vote: Vote = None
    has_prepared: bool = False

class TwoPhaseCommit:
    """Two-phase commit coordinator."""
    
    def __init__(self, participants: List[str]):
        self.participants = participants
        self.state = GlobalState.INIT
        self.participant_states: Dict[str, ParticipantState] = {
            p: ParticipantState(p) for p in participants
        }
        self.transaction_id = None
    
    def prepare(self, transaction_id: str) -> bool:
        """Phase 1: Prepare all participants."""
        self.transaction_id = transaction_id
        self.state = GlobalState.VOTING
        
        # Ask all participants to prepare
        all_yes = True
        for participant in self.participants:
            vote = self._send_prepare(participant)
            self.participant_states[participant].vote = vote
            self.participant_states[participant].has_prepared = True
            
            if vote == Vote.NO:
                all_yes = False
        
        if all_yes:
            self.state = GlobalState.COMMIT
            return True
        else:
            self.state = GlobalState.ABORT
            return False
    
    def commit(self) -> bool:
        """Phase 2: Commit or abort based on vote."""
        if self.state == GlobalState.COMMIT:
            for participant in self.participants:
                self._send_commit(participant)
            return True
        elif self.state == GlobalState.ABORT:
            for participant in self.participants:
                self._send_abort(participant)
            return False
        return False
    
    def _send_prepare(self, participant: str) -> Vote:
        """Send prepare request to participant."""
        # In production, this would be a network call
        print(f"Coordinator: Sending PREPARE to {participant}")
        # Simulate participant decision
        return Vote.YES
    
    def _send_commit(self, participant: str):
        """Send commit to participant."""
        print(f"Coordinator: Sending COMMIT to {participant}")
    
    def _send_abort(self, participant: str):
        """Send abort to participant."""
        print(f"Coordinator: Sending ABORT to {participant}")

class Participant:
    """Two-phase commit participant."""
    
    def __init__(self, participant_id: str):
        self.id = participant_id
        self.prepared = False
        self.committed = False
        self.aborted = False
    
    def prepare(self) -> Vote:
        """Prepare to commit. Return YES if ready, NO otherwise."""
        print(f"Participant {self.id}: Preparing")
        # Check if we can commit
        if self._can_commit():
            self.prepared = True
            return Vote.YES
        else:
            return Vote.NO
    
    def commit(self):
        """Commit the transaction."""
        print(f"Participant {self.id}: Committing")
        self.committed = True
    
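    def abort(self):
        """Abort the transaction."""
        print(f"Participant {self.id}: Aborting")
        self.aborted = True
    
    def _can_commit(self) -> bool:
        """Decide the vote. Placeholder: a real participant would validate
        the pending changes and durably log them before voting YES."""
        return True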
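
In the listing above the coordinator simulates its network calls, so the two classes never interact. The following self-contained sketch wires a coordinator function directly to participant objects to show the abort path when one participant votes NO. The names `DemoParticipant` and `run_two_phase_commit` are hypothetical, and the sketch deliberately omits timeouts, durable logging, and coordinator recovery.

```python
from enum import Enum
from typing import List


class Vote(Enum):
    YES = "yes"
    NO = "no"


class DemoParticipant:
    """Participant whose vote is fixed up front for demonstration."""

    def __init__(self, name: str, can_commit: bool = True):
        self.name = name
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self) -> Vote:
        if self.can_commit:
            self.state = "prepared"
            return Vote.YES
        self.state = "aborted"
        return Vote.NO

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.state = "aborted"


def run_two_phase_commit(participants: List[DemoParticipant]) -> bool:
    # Phase 1: collect a vote from every participant
    votes = [p.prepare() for p in participants]
    decision = all(vote is Vote.YES for vote in votes)

    # Phase 2: broadcast the single global decision
    for p in participants:
        if decision:
            p.commit()
        else:
            p.abort()
    return decision


if __name__ == "__main__":
    group = [DemoParticipant("orders"), DemoParticipant("payments", can_commit=False)]
    print(run_two_phase_commit(group), [p.state for p in group])
```

A single NO vote is enough to abort everyone, including participants that had already prepared successfully.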

Limitations of Two-Phase Commit

While 2PC provides strong consistency, it has significant limitations that make it unsuitable for many microservices scenarios.

Blocking: If the coordinator fails after participants have prepared, those participants cannot make a decision. They must wait indefinitely for the coordinator to recover. This can lead to resource locks and system unavailability.

Latency: All participants must respond to the prepare phase before any can proceed to commit. This creates high latency, especially when participants are geographically distributed.

Scalability: 2PC requires all participants to respond, making it impractical for large numbers of participants. Each additional participant increases the probability of failure.

Coupling: All participants must be available for the transaction to proceed. This creates tight coupling between services and reduces system resilience.

For these reasons, 2PC is typically reserved for scenarios where strong consistency is absolutely required and the number of participants is small. Most microservices architectures use the SAGA pattern or eventual consistency instead.
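
The scalability point can be made concrete with simple arithmetic. The sketch below assumes that participants fail independently and that each one is available with the same probability, which real systems only approximate. The function name is hypothetical.

```python
def all_available_probability(per_participant_availability: float, participants: int) -> float:
    """Probability that every participant is up at once, assuming independence."""
    if not 0.0 <= per_participant_availability <= 1.0:
        raise ValueError("availability must be between 0 and 1")
    if participants < 0:
        raise ValueError("participants must not be negative")
    return per_participant_availability ** participants


if __name__ == "__main__":
    for n in (1, 3, 10):
        print(n, round(all_available_probability(0.999, n), 4))
```

Under these assumptions, ten participants that are each available 99.9% of the time are all available together only about 99.0% of the time, so a transaction that needs all of them succeeds less often than any single service does.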

Eventual Consistency Patterns

Embracing Asynchrony

Eventual consistency accepts that data may be temporarily inconsistent across services but will eventually become consistent. This model enables high availability, partition tolerance, and scalability. It requires a different mindset: instead of ensuring consistency at write time, you ensure that the system will eventually reach a consistent state.

Eventual consistency works well for many use cases. When a user creates an order, it is acceptable for the order history to show the order a few seconds later. When inventory is reserved, it is acceptable for the count to update asynchronously. The key is understanding which operations require strong consistency and which can tolerate eventual consistency.

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, List
import asyncio

@dataclass
class InventoryItem:
    product_id: str
    reserved: int
    available: int
    
    @property
    def total(self):
        return self.reserved + self.available

class EventuallyConsistentInventory:
    """Inventory service with eventual consistency."""
    
    def __init__(self, event_publisher):
        self.items: Dict[str, InventoryItem] = {}
        self.pending_updates: List[dict] = []
        self.event_publisher = event_publisher
    
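    async def reserve(self, product_id: str, quantity: int) -> bool:
        """Reserve inventory if enough stock is available.

        The check and the update below run without an intervening await,
        so they are atomic within a single event loop. A service backed by
        a shared database would need a version check (optimistic locking)
        or a conditional update instead.
        """
        if product_id not in self.items:
            return False
        
        item = self.items[product_id]
        
        # Availability check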
        if item.available < quantity:
            return False
        
        # Reserve the items
        item.available -= quantity
        item.reserved += quantity
        
        # Publish event for other services
        await self.event_publisher.publish("inventory.reserved", {
            "product_id": product_id,
            "quantity": quantity,
            "available": item.available,
            "reserved": item.reserved
        })
        
        return True
    
    async def confirm_reservation(self, product_id: str, quantity: int):
        """Confirm reservation after payment succeeds."""
        if product_id not in self.items:
            return
        
        item = self.items[product_id]
        item.reserved -= quantity
        
        await self.event_publisher.publish("inventory.confirmed", {
            "product_id": product_id,
            "quantity": quantity
        })
    
    async def release_reservation(self, product_id: str, quantity: int):
        """Release reservation if payment fails."""
        if product_id not in self.items:
            return
        
        item = self.items[product_id]
        item.reserved -= quantity
        item.available += quantity
        
        await self.event_publisher.publish("inventory.released", {
            "product_id": product_id,
            "quantity": quantity
        })
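
To illustrate the delay between a write and its visibility elsewhere, the following sketch shows a consumer that keeps its own copy of available stock and updates it only when events arrive. The payload fields match the events published above; the class names `StockReadModel` and `QueuePublisher` are hypothetical, and the in-memory queue stands in for a real message broker.

```python
from collections import deque
from typing import Deque, Dict, Tuple


class StockReadModel:
    """Local view of available stock, updated only by consuming events."""

    def __init__(self) -> None:
        self.available: Dict[str, int] = {}

    def apply(self, topic: str, payload: dict) -> None:
        product_id = payload["product_id"]
        if topic == "inventory.reserved":
            # The event carries the new available count
            self.available[product_id] = payload["available"]
        elif topic == "inventory.released":
            self.available[product_id] = self.available.get(product_id, 0) + payload["quantity"]
        # "inventory.confirmed" does not change the available count


class QueuePublisher:
    """Stand-in publisher: buffers events until a consumer drains them."""

    def __init__(self) -> None:
        self.queue: Deque[Tuple[str, dict]] = deque()

    async def publish(self, topic: str, payload: dict) -> None:
        self.queue.append((topic, payload))

    def drain_into(self, view: StockReadModel) -> None:
        while self.queue:
            topic, payload = self.queue.popleft()
            view.apply(topic, payload)
```

Between `publish` and `drain_into` the view still reports the old count. That window is the temporary inconsistency the model accepts, and the view converges once the queued events are applied.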

Handling Conflicts

In eventually consistent systems, conflicts can arise when the same data is modified concurrently. Several strategies exist for handling these conflicts.

Last Writer Wins is the simplest approach: the most recent write succeeds. This is easy to implement but can lead to lost updates. It works well when conflicts are rare or when lost updates are acceptable.

Application-Level Resolution involves the application detecting conflicts and resolving them based on business logic. For example, when merging concurrent updates to a document, the application might combine the changes intelligently.

Conflict-Free Replicated Data Types (CRDTs) are data structures designed to handle concurrent updates automatically. CRDTs can be merged without coordination, always converging to a consistent state.

import time
from typing import Dict, Optional, Set

class CRDTSet:
    """Conflict-free replicated set using LWW (Last-Writer-Wins) timestamps.
    
    If an add and a remove carry the same timestamp, the remove wins.
    """
    
    def __init__(self):
        self._adds: Dict[str, float] = {}  # element -> latest add timestamp
        self._removes: Dict[str, float] = {}  # element -> latest remove timestamp
    
    def add(self, element: str, timestamp: Optional[float] = None):
        """Record an add, keeping the latest timestamp seen for the element."""
        ts = time.time() if timestamp is None else timestamp
        self._adds[element] = max(ts, self._adds.get(element, ts))
    
    def remove(self, element: str, timestamp: Optional[float] = None):
        """Record a remove, keeping the latest timestamp seen for the element."""
        ts = time.time() if timestamp is None else timestamp
        self._removes[element] = max(ts, self._removes.get(element, ts))
    
    def contains(self, element: str) -> bool:
        """Check if element is in the set."""
        if element not in self._adds:
            return False
        # Present only if the latest add is strictly newer than the latest remove
        return element not in self._removes or self._adds[element] > self._removes[element]
    
    def merge(self, other: "CRDTSet"):
        """Merge with another set by taking the newest timestamp per element."""
        for element, timestamp in other._adds.items():
            if element not in self._adds or timestamp > self._adds[element]:
                self._adds[element] = timestamp
        
        for element, timestamp in other._removes.items():
            if element not in self._removes or timestamp > self._removes[element]:
                self._removes[element] = timestamp
    
    def get_all(self) -> Set[str]:
        """Get all elements in the set, using the same rule as contains()."""
        return {element for element in self._adds if self.contains(element)}
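
An LWW set depends on reasonably synchronized clocks. For comparison, the following sketch shows a grow-only counter (G-Counter), a CRDT that needs no timestamps at all. It is offered as an illustration of coordination-free merging, not as part of the original example; the class name `GCounter` and the node identifiers are hypothetical.

```python
from typing import Dict


class GCounter:
    """Grow-only counter: each node increments only its own slot."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts: Dict[str, int] = {}

    def increment(self, amount: int = 1) -> None:
        if amount < 0:
            raise ValueError("a grow-only counter cannot be decremented")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self) -> int:
        return sum(self.counts.values())

    def merge(self, other: "GCounter") -> None:
        # Taking the per-node maximum makes merge commutative,
        # associative, and idempotent
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)


if __name__ == "__main__":
    a, b = GCounter("node-a"), GCounter("node-b")
    a.increment(3)
    b.increment(2)
    a.merge(b)
    b.merge(a)
    print(a.value(), b.value())
```

Both replicas reach the same total regardless of the order or number of merges, which is the convergence property that lets CRDTs avoid coordination.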

Conclusion

Distributed transactions are fundamentally different from local transactions. The SAGA pattern provides a robust mechanism for managing long-running workflows with compensation for failures. Two-phase commit offers strong consistency when needed but at the cost of availability and scalability. Eventual consistency embraces the distributed nature of modern systems, accepting temporary inconsistency for improved resilience.

The choice of pattern depends on your specific requirements. For most microservices architectures, the SAGA pattern provides the right balance of consistency and resilience. For financial transactions and other high-stakes operations, 2PC may be appropriate. For high-scale systems with relaxed consistency requirements, eventual consistency with CRDTs or other conflict resolution mechanisms may be the best choice.

Remember that there is no one-size-fits-all solution. Most production systems use a combination of patterns, applying each where it makes the most sense. The key is understanding the trade-offs and making informed decisions based on your specific requirements.

Resources

  • “Designing Data-Intensive Applications” by Martin Kleppmann
  • “Building Microservices” by Sam Newman
  • “Enterprise Integration Patterns” by Gregor Hohpe and Bobby Woolf
  • Pat Helland: “Life beyond Distributed Transactions: an Apostate’s Opinion”
