Introduction
Distributed transactions are one of the most challenging aspects of microservices architecture. In a monolithic application, transactions are straightforward: a single database handles all operations atomically. In microservices, each service typically manages its own database, and operations span multiple services. The traditional ACID transaction model breaks down, requiring new approaches to maintain data consistency.
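The CAP theorem tells us that a distributed system cannot simultaneously guarantee consistency (every read sees the latest write), availability (every request to a non-failed node gets a response), and partition tolerance. Network partitions cannot be ruled out, so the practical choice arises when one occurs: the system must either reject some requests to stay consistent or keep serving them and risk returning stale data. Many large-scale systems favor availability and accept eventual consistency as the trade-off. This shift requires fundamentally different approaches to maintaining data integrity.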
This guide explores the major patterns for handling distributed transactions: the SAGA pattern for long-running workflows, two-phase commit for strong consistency when needed, and eventual consistency models that embrace the distributed nature of modern systems.
The Challenge of Distributed Transactions
Why Traditional Transactions Fail
In a monolithic application, a single database transaction can ensure that all operations either complete together or fail together. If an error occurs, the database rolls back all changes, maintaining consistency. This model is simple, reliable, and well-understood.
In microservices, each service owns its data and cannot participate in another service’s database transaction. When an operation spans multiple services, there is no global transaction to ensure atomicity. If one service succeeds and another fails, the system can become inconsistent.
Consider an e-commerce order that involves the user service, inventory service, payment service, and shipping service. If the payment succeeds but the inventory service fails, you have a charged customer with no reserved inventory. The traditional solution of rolling back the transaction is not possible because each service has already committed its part.
```python
# The problem: no global transaction in microservices
class OrderService:
    def create_order(self, user_id: str, items: list):
        # Step 1: Create order in the order database (committed locally)
        order = self.order_repo.create(user_id, items)

        # Step 2: Reserve inventory (separate service, separate database)
        try:
            self.inventory_service.reserve(order.id, items)
        except InventoryError:
            # The order is already committed, but inventory failed.
            # There is no rollback to call, so we can only mark it as failed.
            self.order_repo.update_status(order.id, "failed")
            raise

        # Step 3: Process payment (separate service)
        try:
            self.payment_service.charge(order.user_id, order.total)
        except PaymentError:
            # Inventory is reserved but payment failed:
            # release the inventory by hand, then mark the order as failed
            self.inventory_service.release(order.id)
            self.order_repo.update_status(order.id, "failed")
            raise

        # All steps succeeded.
        # Note: if this process crashes between any two steps, none of the
        # cleanup code above runs and the services are left inconsistent.
        self.order_repo.update_status(order.id, "confirmed")
        return order
```
Understanding Consistency Models
Different consistency models offer different trade-offs between consistency, availability, and performance.
Strong Consistency guarantees that any read receives the most recent write. A single-node database provides this naturally. In a replicated or distributed system, achieving strong consistency requires coordination between nodes, which costs latency and, during a network partition, availability.
Eventual Consistency guarantees that if no new updates are made, eventually all reads will return the last written value. This model provides high availability and partition tolerance but allows temporary inconsistencies.
Read Your Writes Consistency guarantees that after a client writes data, it will always see its own writes in subsequent reads. This is important for user experience but does not guarantee consistency across all clients.
Monotonic Reads Consistency guarantees that if a client reads a value, subsequent reads will not return earlier values. This prevents the user from seeing data “go backward” in time.
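Read-your-writes and monotonic reads are usually enforced per client session. One common technique is for the client to remember the highest version it has written or read, and to skip replicas that have not caught up to that version. The sketch below assumes a hypothetical replicated key-value store in which each replica tracks a single, increasing version number; `Replica` and `Session` are illustrative names, not a real library.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Replica:
    """A replica that applies replicated writes and tracks how far it has caught up."""
    name: str
    version: int = 0
    data: Dict[str, str] = field(default_factory=dict)

    def apply(self, version: int, key: str, value: str) -> None:
        # Assumes writes are delivered in version order; stale ones are ignored
        if version > self.version:
            self.data[key] = value
            self.version = version


class Session:
    """Per-client session enforcing read-your-writes and monotonic reads."""

    def __init__(self, replicas: List[Replica]):
        self.replicas = replicas
        self.min_version = 0  # highest version this client has written or read

    def record_write(self, version: int) -> None:
        """Call after a write is acknowledged with its version number."""
        self.min_version = max(self.min_version, version)

    def read(self, key: str) -> Optional[str]:
        for replica in self.replicas:
            # Skip replicas that are behind what this client has already seen
            if replica.version >= self.min_version:
                self.min_version = replica.version  # later reads cannot go backward
                return replica.data.get(key)
        raise RuntimeError("no replica is fresh enough; retry or read from the leader")
```

The same `min_version` check provides both guarantees: recording writes gives read-your-writes, and advancing it on every read gives monotonic reads. Neither says anything about what other clients see.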
SAGA Pattern
Introduction to SAGA
The SAGA pattern provides a way to manage distributed transactions without global locking. Instead of a single atomic transaction, a SAGA breaks the operation into a sequence of local transactions. Each local transaction commits in its own service's database and then triggers the next step, either by publishing an event or by reporting back to a central orchestrator. If a step fails, the SAGA executes compensating transactions to undo the effects of the steps that already completed.
The key insight is that compensating transactions are not true rollbacks but rather inverse operations that restore the system to a consistent state. For example, if you reserved inventory, the compensation is to release it. If you charged a credit card, the compensation is to issue a refund.
```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, List, Optional
from enum import Enum
import uuid


class StepStatus(Enum):
    PENDING = "pending"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"


@dataclass
class SagaStep:
    name: str
    # execute(context) -> dict of outputs; compensate(outputs) -> None
    execute: Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
    compensate: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None
    status: StepStatus = StepStatus.PENDING
    input_data: Optional[Dict[str, Any]] = None
    output_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


@dataclass
class SagaContext:
    saga_id: str
    name: str
    steps: List[SagaStep] = field(default_factory=list)
    current_step: int = 0
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    @classmethod
    def create(cls, name: str) -> "SagaContext":
        return cls(
            saga_id=str(uuid.uuid4()),
            name=name,
            started_at=datetime.now(timezone.utc),
        )


class SagaOrchestrator:
    """Orchestrates SAGA execution with compensation.

    saga_log is a durable log (not defined here) that records every
    transition, so a restarted orchestrator can resume or compensate.
    """

    def __init__(self, saga_log: "SagaLog"):
        self.saga_log = saga_log

    def add_step(self, saga: SagaContext, name: str, execute: Callable,
                 compensate: Optional[Callable] = None) -> None:
        saga.steps.append(SagaStep(name=name, execute=execute, compensate=compensate))

    async def execute(self, saga: SagaContext) -> bool:
        """Execute the saga, compensating completed steps on failure."""
        await self.saga_log.log_saga_start(saga)
        executed_steps: List[int] = []

        try:
            for i, step in enumerate(saga.steps):
                saga.current_step = i  # remember which step is running
                step.status = StepStatus.EXECUTING
                step.input_data = self._get_step_input(saga, i)

                step.output_data = await step.execute(step.input_data)
                step.status = StepStatus.COMPLETED
                executed_steps.append(i)
                await self.saga_log.log_step_completed(saga, step)

        except Exception as e:
            # Record the error on the step that actually failed
            failed_step = saga.steps[saga.current_step]
            failed_step.status = StepStatus.FAILED
            failed_step.error = str(e)

            # Compensate completed steps in reverse order
            for i in reversed(executed_steps):
                step = saga.steps[i]
                if step.compensate is None:
                    continue
                step.status = StepStatus.COMPENSATING
                try:
                    await step.compensate(step.output_data or {})
                    step.status = StepStatus.COMPENSATED
                    await self.saga_log.log_step_compensated(saga, step)
                except Exception as comp_error:
                    step.error = f"Compensation failed: {comp_error}"
                    await self.saga_log.log_compensation_failed(saga, step)
                    # In production, schedule a retry or alert an operator

            await self.saga_log.log_saga_failed(saga)
            return False

        saga.completed_at = datetime.now(timezone.utc)
        await self.saga_log.log_saga_completed(saga)
        return True

    def _get_step_input(self, saga: SagaContext, step_index: int) -> Dict[str, Any]:
        """Merge the outputs of all previous steps into one context dict.

        Later steps often need values produced several steps earlier
        (payment needs order_id and total from the first step), so passing
        only the immediately preceding step's output is not enough.
        """
        context: Dict[str, Any] = {}
        for prev_step in saga.steps[:step_index]:
            context.update(prev_step.output_data or {})
        return context
```
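The orchestrator depends on a `SagaLog` that the text does not define. A minimal in-memory stand-in might look like the following sketch. The method names mirror the calls the orchestrator makes; `InMemorySagaLog` and `sagas_needing_recovery` are hypothetical names. A real log must be durable (a database table or an append-only file), because its whole purpose is to let a restarted orchestrator find sagas that never finished or whose compensation failed.

```python
from typing import List, Optional, Set, Tuple


class InMemorySagaLog:
    """Append-only saga log kept in memory (illustration only: not durable)."""

    def __init__(self) -> None:
        # Each entry is (saga_id, event, step_name or None)
        self.entries: List[Tuple[str, str, Optional[str]]] = []

    def _append(self, saga, event: str, step=None) -> None:
        self.entries.append((saga.saga_id, event, getattr(step, "name", None)))

    async def log_saga_start(self, saga) -> None:
        self._append(saga, "saga_started")

    async def log_step_completed(self, saga, step) -> None:
        self._append(saga, "step_completed", step)

    async def log_step_compensated(self, saga, step) -> None:
        self._append(saga, "step_compensated", step)

    async def log_compensation_failed(self, saga, step) -> None:
        self._append(saga, "compensation_failed", step)

    async def log_saga_failed(self, saga) -> None:
        self._append(saga, "saga_failed")

    async def log_saga_completed(self, saga) -> None:
        self._append(saga, "saga_completed")

    def sagas_needing_recovery(self) -> Set[str]:
        """Sagas with no terminal entry, or whose compensation failed."""
        started, finished, broken = set(), set(), set()
        for saga_id, event, _ in self.entries:
            if event == "saga_started":
                started.add(saga_id)
            elif event in ("saga_completed", "saga_failed"):
                finished.add(saga_id)
            elif event == "compensation_failed":
                broken.add(saga_id)
        return (started - finished) | broken
```

On startup, a recovering orchestrator would call `sagas_needing_recovery()` and, for each result, either resume forward execution or retry compensation, depending on the last entry logged for that saga.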
E-Commerce Order SAGA Example
```python
class CreateOrderSaga:
    """SAGA for creating an order across multiple services."""

    def __init__(self, orchestrator: SagaOrchestrator, order_service,
                 inventory_service, payment_service, notification_service):
        # The orchestrator is injected so that the same instance (and the
        # same saga log) both builds and executes the saga.
        self.orchestrator = orchestrator
        self.order_service = order_service
        self.inventory_service = inventory_service
        self.payment_service = payment_service
        self.notification_service = notification_service

    async def run(self, user_id: str, items: list) -> bool:
        """Build and execute the saga; returns True if every step succeeded."""
        saga = self.build_saga(user_id, items)
        return await self.orchestrator.execute(saga)

    def build_saga(self, user_id: str, items: list) -> SagaContext:
        saga = SagaContext.create("create_order")

        # Step 1: Create order (outputs order_id and total)
        self.orchestrator.add_step(
            saga,
            name="create_order",
            execute=lambda _: self._create_order(user_id, items),
            compensate=self._undo_create_order,
        )
        # Step 2: Reserve inventory (reads order_id from step 1)
        self.orchestrator.add_step(
            saga,
            name="reserve_inventory",
            execute=lambda ctx: self._reserve_inventory(ctx["order_id"], items),
            compensate=self._undo_reserve_inventory,
        )
        # Step 3: Process payment (reads order_id and total from step 1, so the
        # orchestrator must pass the merged outputs of all earlier steps)
        self.orchestrator.add_step(
            saga,
            name="process_payment",
            execute=lambda ctx: self._process_payment(ctx["order_id"], ctx["total"]),
            compensate=self._undo_process_payment,
        )
        # Step 4: Send confirmation
        self.orchestrator.add_step(
            saga,
            name="send_confirmation",
            execute=lambda ctx: self._send_confirmation(ctx["order_id"]),
            compensate=self._undo_send_confirmation,
        )
        return saga

    async def _create_order(self, user_id: str, items: list) -> Dict:
        order = await self.order_service.create_order(user_id, items)
        return {
            "order_id": order.id,
            "user_id": user_id,
            "items": items,
            "total": order.total,
        }

    async def _undo_create_order(self, output: Dict) -> None:
        if "order_id" in output:
            await self.order_service.cancel_order(output["order_id"])

    async def _reserve_inventory(self, order_id: str, items: list) -> Dict:
        result = await self.inventory_service.reserve(order_id, items)
        return {"reservation_id": result.reservation_id, "items": items}

    async def _undo_reserve_inventory(self, output: Dict) -> None:
        if "reservation_id" in output:
            await self.inventory_service.release(output["reservation_id"])

    async def _process_payment(self, order_id: str, total: float) -> Dict:
        result = await self.payment_service.charge(order_id, total)
        return {"transaction_id": result.transaction_id, "amount": total}

    async def _undo_process_payment(self, output: Dict) -> None:
        # The compensation is a refund, not a deletion of the original charge
        if "transaction_id" in output:
            await self.payment_service.refund(output["transaction_id"])

    async def _send_confirmation(self, order_id: str) -> Dict:
        await self.notification_service.send_order_confirmation(order_id)
        return {"notified": True}

    async def _undo_send_confirmation(self, output: Dict) -> None:
        # An email cannot be unsent; a real system might send a follow-up
        # cancellation notice here instead.
        pass
```
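The refund compensation above illustrates the earlier point that a compensating transaction is an inverse operation, not a rollback: the charge stays in the payment history and the refund is a second, offsetting entry. The hypothetical `Ledger` below (not part of any payment API) makes that concrete. It also makes the refund idempotent, an assumption worth adopting because an orchestrator that retries a failed compensation may call it more than once.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Entry:
    entry_id: str
    amount: int  # in cents; negative amounts are refunds
    reason: str


@dataclass
class Ledger:
    entries: List[Entry] = field(default_factory=list)

    def charge(self, entry_id: str, amount: int) -> Entry:
        entry = Entry(entry_id, amount, "charge")
        self.entries.append(entry)
        return entry

    def refund(self, charge_id: str) -> Entry:
        """Compensate a charge by appending an offsetting entry.

        Idempotent: refunding the same charge twice (e.g. when a saga
        retries a compensation) appends only one refund entry.
        """
        refund_id = f"refund-{charge_id}"
        for entry in self.entries:
            if entry.entry_id == refund_id:
                return entry  # already compensated
        original = next(e for e in self.entries if e.entry_id == charge_id)
        entry = Entry(refund_id, -original.amount, f"refund of {charge_id}")
        self.entries.append(entry)
        return entry

    def balance(self) -> int:
        return sum(e.amount for e in self.entries)
```

After compensation the balance is back where it started, but both entries remain visible, which matches what the customer's statement would show.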
Choreography versus Orchestration
SAGAs can be implemented in two ways: choreography and orchestration.
Choreography uses events to coordinate steps. Each step publishes an event when it completes, and the next step subscribes to that event. There is no central coordinator; each service reacts to events.
Orchestration uses a central coordinator (orchestrator) that tells each participant what to do and when. The orchestrator knows the entire workflow and handles success and failure.
Choreography is simpler for small workflows with few participants. Each service only needs to know about the events it produces and consumes. However, as the workflow grows, it becomes difficult to understand the overall flow and debug issues.
Orchestration provides better visibility and control. The orchestrator knows the state of the entire workflow and can make decisions based on the overall context. However, it introduces a central point of failure and requires additional infrastructure.
```python
# Choreography-based SAGA using events
from dataclasses import dataclass
from typing import Dict


@dataclass
class OrderCreatedEvent:
    order_id: str
    user_id: str
    items: list
    total: float


@dataclass
class InventoryReservedEvent:
    order_id: str
    reservation_id: str


@dataclass
class PaymentCompletedEvent:
    order_id: str
    transaction_id: str


@dataclass
class OrderFailedEvent:
    order_id: str
    reason: str


class ChoreographySaga:
    """SAGA using event choreography.

    For brevity all handlers live in one class here. In a real deployment
    each handler belongs to the service that owns the data (inventory
    handlers in the inventory service, payment handlers in the payment
    service), and only the events connect them.
    """

    def __init__(self, event_bus, inventory_service, payment_service):
        # Assumed bus interface: subscribe(topic, handler), await publish(topic, event)
        self.event_bus = event_bus
        self.inventory = inventory_service
        self.payment = payment_service
        self.pending_orders: Dict[str, dict] = {}

    def setup_subscriptions(self):
        self.event_bus.subscribe("order.created", self.on_order_created)
        self.event_bus.subscribe("inventory.reserved", self.on_inventory_reserved)
        self.event_bus.subscribe("payment.completed", self.on_payment_completed)

    async def on_order_created(self, event: OrderCreatedEvent):
        """Handle order creation: reserve inventory."""
        try:
            result = await self.inventory.reserve(event.order_id, event.items)
        except Exception as e:
            # Nothing to compensate yet; just announce the failure
            await self.event_bus.publish(
                "order.failed", OrderFailedEvent(event.order_id, str(e))
            )
            return
        self.pending_orders[event.order_id] = {"order": event, "inventory": result}
        await self.event_bus.publish(
            "inventory.reserved",
            InventoryReservedEvent(event.order_id, result.reservation_id),
        )

    async def on_inventory_reserved(self, event: InventoryReservedEvent):
        """Handle inventory reservation: process payment."""
        order_data = self.pending_orders.get(event.order_id)
        if not order_data:
            return  # unknown or already-handled order
        try:
            result = await self.payment.charge(event.order_id, order_data["order"].total)
        except Exception as e:
            # Payment failed: compensate the inventory reservation
            await self.compensate_inventory(event.order_id, event.reservation_id, str(e))
            return
        await self.event_bus.publish(
            "payment.completed",
            PaymentCompletedEvent(event.order_id, result.transaction_id),
        )

    async def on_payment_completed(self, event: PaymentCompletedEvent):
        """Handle payment completion: finalize the order."""
        # Finalize the order here (e.g. mark it confirmed), then drop local state;
        # pop() also makes a duplicate delivery of this event harmless
        self.pending_orders.pop(event.order_id, None)

    async def compensate_inventory(self, order_id: str, reservation_id: str, reason: str):
        """Release the inventory reservation and announce the failure."""
        await self.inventory.release(reservation_id)
        self.pending_orders.pop(order_id, None)
        await self.event_bus.publish(
            "order.failed", OrderFailedEvent(order_id, f"Payment failed: {reason}")
        )
```
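The choreography code relies on an event bus that the text does not define. For local experiments, a minimal asyncio-based bus could look like the sketch below. The `subscribe(topic, handler)` and `publish(topic, event)` interface is an assumption, not the API of any particular broker, and unlike a real broker this bus delivers each event immediately and in-process, with no persistence or retries, so it only stands in for testing handler logic.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List, Tuple

Handler = Callable[[Any], Awaitable[None]]


class InMemoryEventBus:
    """In-process pub/sub: each published event is awaited by every subscriber."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.published: List[Tuple[str, Any]] = []  # (topic, event) history for tests

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    async def publish(self, topic: str, event: Any) -> None:
        self.published.append((topic, event))
        handlers = list(self._subscribers.get(topic, []))
        if handlers:
            # Run all handlers for this topic concurrently and wait for them
            await asyncio.gather(*(handler(event) for handler in handlers))
```

Because `publish` awaits its handlers, a chain of events runs to completion inside a single `await`, which keeps tests deterministic. A real broker delivers asynchronously and often at least once, so production handlers must also tolerate duplicates.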
Two-Phase Commit
When to Use Two-Phase Commit
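Two-phase commit (2PC) provides atomicity across participants: a coordinator ensures that either every participant commits or every participant aborts. Unlike SAGA, where each step commits locally right away and is undone later by compensation if needed, 2PC keeps each participant's changes in a prepared, uncommitted state (usually holding locks) until the coordinator announces the outcome, so no participant commits unless all of them can.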
2PC is appropriate when strong consistency is required and the number of participants is small. It is commonly used in financial systems, inventory management, and other domains where temporary inconsistency is unacceptable.
However, 2PC has significant drawbacks. It is a blocking protocol: if the coordinator fails, participants may be left waiting indefinitely. It has high latency because all participants must respond before any can proceed. It does not scale to many participants.
```python
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional


class Vote(Enum):
    YES = "yes"
    NO = "no"


class GlobalState(Enum):
    INIT = "init"
    VOTING = "voting"
    COMMIT = "commit"
    ABORT = "abort"


@dataclass
class ParticipantState:
    participant_id: str
    vote: Optional[Vote] = None
    has_prepared: bool = False


class TwoPhaseCommit:
    """Two-phase commit coordinator (network calls are simulated with print)."""

    def __init__(self, participants: List[str]):
        self.participants = participants
        self.state = GlobalState.INIT
        self.participant_states: Dict[str, ParticipantState] = {
            p: ParticipantState(p) for p in participants
        }
        self.transaction_id: Optional[str] = None

    def prepare(self, transaction_id: str) -> bool:
        """Phase 1: ask every participant to prepare, then decide."""
        self.transaction_id = transaction_id
        self.state = GlobalState.VOTING

        all_yes = True
        for participant in self.participants:
            vote = self._send_prepare(participant)
            state = self.participant_states[participant]
            state.vote = vote
            state.has_prepared = vote == Vote.YES
            if vote == Vote.NO:
                all_yes = False
                break  # a single NO decides the outcome; no need to ask the rest

        # The decision is made here. A real coordinator must write it to
        # durable storage BEFORE phase 2 so it can finish the protocol
        # after a crash.
        self.state = GlobalState.COMMIT if all_yes else GlobalState.ABORT
        return all_yes

    def commit(self) -> bool:
        """Phase 2: send COMMIT or ABORT according to the decision."""
        if self.state == GlobalState.COMMIT:
            for participant in self.participants:
                self._send_commit(participant)
            return True
        if self.state == GlobalState.ABORT:
            # Participants that never voted also get ABORT; for them it is a no-op
            for participant in self.participants:
                self._send_abort(participant)
        return False

    def _send_prepare(self, participant: str) -> Vote:
        """Send PREPARE to a participant (simulated: always votes YES)."""
        # In production this is a network call with a timeout;
        # a timeout during phase 1 is treated as a NO vote.
        print(f"Coordinator: Sending PREPARE to {participant}")
        return Vote.YES

    def _send_commit(self, participant: str):
        """Send COMMIT to a participant."""
        print(f"Coordinator: Sending COMMIT to {participant}")

    def _send_abort(self, participant: str):
        """Send ABORT to a participant."""
        print(f"Coordinator: Sending ABORT to {participant}")


class Participant:
    """Two-phase commit participant."""

    def __init__(self, participant_id: str):
        self.id = participant_id
        self.prepared = False
        self.committed = False
        self.aborted = False

    def prepare(self) -> Vote:
        """Prepare to commit. Return YES if ready, NO otherwise."""
        print(f"Participant {self.id}: Preparing")
        if self._can_commit():
            # A YES vote is a promise: the participant must durably record
            # that it is prepared and keep its locks until it hears the outcome.
            self.prepared = True
            return Vote.YES
        return Vote.NO

    def commit(self):
        """Commit the transaction (only valid after voting YES)."""
        if not self.prepared:
            raise RuntimeError(f"Participant {self.id} cannot commit without preparing")
        print(f"Participant {self.id}: Committing")
        self.committed = True

    def abort(self):
        """Abort the transaction and release any held resources."""
        print(f"Participant {self.id}: Aborting")
        self.aborted = True

    def _can_commit(self) -> bool:
        # Placeholder: a real participant would validate constraints and
        # acquire locks on the affected rows here.
        return True
```
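Because `_send_prepare` is simulated, the coordinator above always receives YES. The self-contained sketch below wires a coordinator directly to in-process participants so the abort path can be exercised. It also appends the decision to a dictionary that stands in for the coordinator's durable log before phase 2 begins. `SimParticipant` and `SimCoordinator` are illustrative names.

```python
from typing import Dict, List


class SimParticipant:
    def __init__(self, name: str, can_commit: bool = True):
        self.name = name
        self.can_commit = can_commit
        self.state = "init"  # init -> prepared -> committed / aborted

    def prepare(self) -> bool:
        if self.can_commit:
            self.state = "prepared"  # promise: locks held until the outcome arrives
            return True
        self.state = "aborted"  # a participant voting NO may abort on its own
        return False

    def finish(self, commit: bool) -> None:
        self.state = "committed" if commit else "aborted"


class SimCoordinator:
    def __init__(self, participants: List[SimParticipant]):
        self.participants = participants
        self.decision_log: Dict[str, str] = {}  # stand-in for durable storage

    def run(self, txn_id: str) -> bool:
        # Phase 1: collect votes (all() stops at the first NO)
        commit = all(p.prepare() for p in self.participants)
        # Persist the decision BEFORE telling any participant about it
        self.decision_log[txn_id] = "commit" if commit else "abort"
        # Phase 2: broadcast the outcome to every participant
        for p in self.participants:
            p.finish(commit)
        return commit
```

If the coordinator crashed after phase 1 but before writing to `decision_log`, any participant already in `prepared` could neither commit nor abort on its own; it would have to wait, holding its locks, until the coordinator recovered.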
Limitations of Two-Phase Commit
While 2PC provides strong consistency, it has significant limitations that make it unsuitable for many microservices scenarios.
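Blocking: If the coordinator fails after participants have voted YES, those participants cannot decide on their own: aborting could contradict a commit decision the coordinator already made, and committing could contradict an abort. They must keep their locks and wait for the coordinator to recover, which can leave resources locked and parts of the system unavailable.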
Latency: All participants must respond to the prepare phase before any can proceed to commit. This creates high latency, especially when participants are geographically distributed.
Scalability: 2PC requires every participant to respond, making it impractical for large numbers of participants. Each additional participant increases the chance that one of them is slow or unavailable, which delays or aborts the whole transaction.
Coupling: All participants must be available for the transaction to proceed. This creates tight coupling between services and reduces system resilience.
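The scalability and coupling points can be put into rough numbers. If each participant is independently available with probability p, a 2PC transaction that needs all n participants can proceed with probability p^n. Independence is a simplifying assumption (real failures are often correlated), so treat the figures as illustrative only.

```python
def all_available(p: float, n: int) -> float:
    """Probability that all n independent participants are up, each with probability p."""
    return p ** n


if __name__ == "__main__":
    for n in (2, 5, 10, 20):
        print(f"n={n:2d}: 99.9% each -> {all_available(0.999, n):.2%}, "
              f"99% each -> {all_available(0.99, n):.2%}")
```

For example, ten participants that are each up 99% of the time are all up only about 90.4% of the time (0.99^10 ≈ 0.904), and the probability keeps falling as participants are added.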
For these reasons, 2PC is typically reserved for scenarios where strong consistency is absolutely required and the number of participants is small. Most microservices architectures use the SAGA pattern or eventual consistency instead.
Eventual Consistency Patterns
Embracing Asynchrony
Eventual consistency accepts that data may be temporarily inconsistent across services but will eventually become consistent. This model enables high availability, partition tolerance, and scalability. It requires a different mindset: instead of ensuring consistency at write time, you ensure that the system will eventually reach a consistent state.
Eventual consistency works well for many use cases. When a user creates an order, it is acceptable for the order history to show the order a few seconds later. When inventory is reserved, it is acceptable for the count to update asynchronously. The key is understanding which operations require strong consistency and which can tolerate eventual consistency.
```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class InventoryItem:
    product_id: str
    reserved: int
    available: int

    @property
    def total(self) -> int:
        return self.reserved + self.available


class EventuallyConsistentInventory:
    """Inventory service whose changes reach other services through events."""

    def __init__(self, event_publisher):
        self.items: Dict[str, InventoryItem] = {}
        self.event_publisher = event_publisher

    async def reserve(self, product_id: str, quantity: int) -> bool:
        """Reserve inventory locally, then announce it to other services.

        The check and the update run with no await in between, so they are
        atomic within a single asyncio event loop. Across processes or a
        shared database you would need optimistic locking (e.g. a version
        column) or a conditional UPDATE instead.
        """
        item = self.items.get(product_id)
        if item is None or item.available < quantity:
            return False

        item.available -= quantity
        item.reserved += quantity

        # Other services learn about the change only through this event, so
        # their views are eventually consistent. If the process crashes between
        # the update and the publish, the event is lost; a transactional outbox
        # is the usual remedy.
        await self.event_publisher.publish("inventory.reserved", {
            "product_id": product_id,
            "quantity": quantity,
            "available": item.available,
            "reserved": item.reserved,
        })
        return True

    async def confirm_reservation(self, product_id: str, quantity: int):
        """Confirm a reservation after payment succeeds (the units are sold)."""
        item = self.items.get(product_id)
        if item is None or quantity > item.reserved:
            return
        item.reserved -= quantity
        await self.event_publisher.publish("inventory.confirmed", {
            "product_id": product_id,
            "quantity": quantity,
        })

    async def release_reservation(self, product_id: str, quantity: int):
        """Release a reservation if payment fails (units become available again)."""
        item = self.items.get(product_id)
        if item is None or quantity > item.reserved:
            return
        item.reserved -= quantity
        item.available += quantity
        await self.event_publisher.publish("inventory.released", {
            "product_id": product_id,
            "quantity": quantity,
        })
```
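Other services see changes like these only after they consume the events. The order-history example from the start of this section can be sketched as a projection, a read model fed from an `asyncio.Queue` that stands in for a message broker. `OrderHistoryProjection` and the event fields (`event_id`, `type`, `user_id`, `order_id`) are assumptions for illustration. The sketch shows two properties eventual consistency depends on: the view lags until the queue is drained, and applying a duplicate event (as at-least-once delivery allows) has no extra effect.

```python
import asyncio
from typing import Dict, List, Set


class OrderHistoryProjection:
    """Read model built asynchronously from order events (illustrative)."""

    def __init__(self) -> None:
        self.orders_by_user: Dict[str, List[str]] = {}
        self._seen_event_ids: Set[str] = set()

    def apply(self, event: dict) -> None:
        # Idempotent: at-least-once delivery may hand us the same event twice
        if event["event_id"] in self._seen_event_ids:
            return
        self._seen_event_ids.add(event["event_id"])
        if event["type"] == "order.created":
            self.orders_by_user.setdefault(event["user_id"], []).append(event["order_id"])

    async def consume(self, queue: "asyncio.Queue[dict]") -> None:
        """Apply events forever; run as a background task and cancel on shutdown."""
        while True:
            event = await queue.get()
            self.apply(event)
            queue.task_done()
```

A service would start `consume` with `asyncio.create_task(...)` at startup. Until that task has processed an event, queries against `orders_by_user` simply return the older state.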
Handling Conflicts
In eventually consistent systems, conflicts can arise when the same data is modified concurrently. Several strategies exist for handling these conflicts.
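Last Writer Wins is the simplest approach: each write carries a timestamp, the write with the latest timestamp is kept, and concurrent writes are silently discarded. This is easy to implement but can lose updates, and clock skew between nodes can let an older write win. It works well when conflicts are rare or when lost updates are acceptable.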
Application-Level Resolution involves the application detecting conflicts and resolving them based on business logic. For example, when merging concurrent updates to a document, the application might combine the changes intelligently.
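Conflict-Free Replicated Data Types (CRDTs) are data structures designed to merge concurrent updates automatically. In a state-based CRDT the merge operation is commutative, associative, and idempotent, so replicas can exchange state in any order and without coordination, and all replicas that have received the same updates converge to the same state.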
```python
import time
from typing import Dict, Optional, Set


class CRDTSet:
    """LWW-Element-Set: a conflict-free replicated set (Last-Writer-Wins).

    Each element keeps its latest add timestamp and its latest remove
    timestamp. An element is present if its latest add is at least as new
    as its latest remove (ties favor the add). Timestamps default to the
    local wall clock, so clock skew between replicas can affect the outcome.
    """

    def __init__(self):
        self._adds: Dict[str, float] = {}     # element -> latest add timestamp
        self._removes: Dict[str, float] = {}  # element -> latest remove timestamp

    @staticmethod
    def _record(log: Dict[str, float], element: str, ts: float) -> None:
        # Keep only the maximum timestamp so updates commute in any order
        if element not in log or ts > log[element]:
            log[element] = ts

    def add(self, element: str, timestamp: Optional[float] = None):
        """Add an element."""
        ts = time.time() if timestamp is None else timestamp
        self._record(self._adds, element, ts)

    def remove(self, element: str, timestamp: Optional[float] = None):
        """Remove an element."""
        ts = time.time() if timestamp is None else timestamp
        self._record(self._removes, element, ts)

    def contains(self, element: str) -> bool:
        """Check if element is in the set."""
        if element not in self._adds:
            return False
        return self._adds[element] >= self._removes.get(element, float("-inf"))

    def merge(self, other: "CRDTSet"):
        """Merge another replica's state into this one (element-wise maximum)."""
        for element, ts in other._adds.items():
            self._record(self._adds, element, ts)
        for element, ts in other._removes.items():
            self._record(self._removes, element, ts)

    def get_all(self) -> Set[str]:
        """Get all elements in the set (same rule as contains)."""
        return {element for element in self._adds if self.contains(element)}
```
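To contrast the first two strategies, consider a shopping cart that two replicas update concurrently. The sketch below compares a last-writer-wins merge with an application-level merge that unions the carts; the cart scenario, `Versioned`, `lww_merge`, and `union_merge` are illustrative assumptions. Under LWW one of the two additions is silently lost, while the union keeps both.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class Versioned:
    value: FrozenSet[str]
    timestamp: float


def lww_merge(a: Versioned, b: Versioned) -> Versioned:
    """Last writer wins: keep whichever write has the later timestamp.

    Ties go to the first argument here; real systems usually break ties
    by node ID so that the merge gives the same result in either order.
    """
    return a if a.timestamp >= b.timestamp else b


def union_merge(a: Versioned, b: Versioned) -> Versioned:
    """Application-level resolution: the cart keeps every item either replica added."""
    return Versioned(a.value | b.value, max(a.timestamp, b.timestamp))
```

Union merging has its own caveat: an item removed on one replica reappears after the merge, which is the problem the separate add and remove timestamps in the CRDT set above address.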
Conclusion
Distributed transactions are fundamentally different from local transactions. The SAGA pattern provides a robust mechanism for managing long-running workflows with compensation for failures. Two-phase commit offers strong consistency when needed but at the cost of availability and scalability. Eventual consistency embraces the distributed nature of modern systems, accepting temporary inconsistency for improved resilience.
The choice of pattern depends on your specific requirements. For most microservices architectures, the SAGA pattern provides the right balance of consistency and resilience. For financial transactions and other high-stakes operations, 2PC may be appropriate. For high-scale systems with relaxed consistency requirements, eventual consistency with CRDTs or other conflict resolution mechanisms may be the best choice.
Remember that there is no one-size-fits-all solution. Most production systems use a combination of patterns, applying each where it makes the most sense. The key is understanding the trade-offs and making informed decisions based on your specific requirements.