Introduction
The Two-Phase Commit (2PC) protocol stands as one of the foundational algorithms in distributed systems, providing atomic commit guarantees for transactions that span multiple independent nodes. When a transaction involves multiple databases or services, ensuring that either all changes are applied or none are applied becomes critically important for data consistency.
This comprehensive guide explores the 2PC protocol in depth, examining its mechanics, implementation considerations, failure scenarios, and alternatives. Whether you’re designing a distributed database, building a microservices architecture, or working with any system requiring coordinated commits across multiple nodes, understanding 2PC is essential.
Understanding Distributed Transactions
The Atomicity Challenge
In a single-database system, atomicity is guaranteed by the database’s transaction manager. When you begin a transaction, make several changes, and then commit, the database ensures all changes are applied together or rolled back together. This works because all operations occur within a single, centralized system with shared state.
Distributed transactions face a fundamentally different challenge. Consider an e-commerce system where an order spans three services:
```python
# A single transaction spans multiple services
class OrderService:
    def create_order(self, user_id, items):
        # Transaction begins
        inventory_reserved = self.reserve_inventory(items)  # Service 1
        payment_charged = self.charge_payment(user_id)       # Service 2
        order_created = self.create_order_record(items)      # Service 3
        # Transaction commits or rolls back
```
If reserve_inventory() and charge_payment() succeed but create_order_record() fails, you have a serious problem. The customer has been charged for an order that does not exist, and the reserved inventory stays locked with no order to release it.
What 2PC Achieves
The Two-Phase Commit protocol solves this problem by coordinating all participants to reach a collective decision:
- All participants commit if everyone can commit
- All participants abort if any participant cannot commit
There is no intermediate state where some participants commit and others don’t. This property is called atomicity, and 2PC guarantees it across distributed systems.
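The decision rule itself fits in a few lines. The sketch below assumes votes arrive as a dictionary keyed by participant id. The `global_decision` name is hypothetical. A participant that never answered is treated exactly like one that voted NO.

```python
def global_decision(votes, participant_ids):
    """Return "COMMIT" only if every expected participant voted YES.

    votes: participant id -> "VOTE_COMMIT" or "VOTE_ABORT".
    A missing vote (crash, lost message, timeout) counts as NO.
    """
    if all(votes.get(pid) == "VOTE_COMMIT" for pid in participant_ids):
        return "COMMIT"
    return "ABORT"


print(global_decision({"inventory": "VOTE_COMMIT", "payment": "VOTE_COMMIT"},
                      ["inventory", "payment"]))  # COMMIT
print(global_decision({"inventory": "VOTE_COMMIT"},
                      ["inventory", "payment"]))  # ABORT: payment never voted
```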
Protocol Mechanics
System Architecture
The 2PC protocol involves two roles: a single coordinator and one or more participants. A participant is modeled as follows:
```python
class Participant:
    """
    A participant (also called a cohort) is a node that executes
    part of the distributed transaction.
    """

    def __init__(self, participant_id):
        self.id = participant_id  # the coordinator refers to participant.id
        self.transaction_manager = None
        self.prepared = False
        self.transaction_state = "idle"

    async def execute_operation(self, operation):
        """Execute the actual transaction operation."""
        pass

    async def prepare(self):
        """
        Phase 1: Prepare to commit.
        The participant must:
        1. Ensure it can commit the transaction
        2. Hold the locks the transaction needs
        3. Persist the transaction in a prepare log (for recovery)
        4. Vote YES or NO
        """
        pass

    async def commit(self):
        """
        Phase 2: Commit the transaction.
        The participant:
        1. Applies the changes
        2. Releases locks
        3. Sends acknowledgment
        """
        pass

    async def abort(self):
        """
        Phase 2: Abort the transaction.
        The participant:
        1. Rolls back all changes
        2. Releases locks
        3. Sends acknowledgment
        """
        pass
```
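The participant's `transaction_state` moves through a small set of states. The sketch below encodes the rule that makes 2PC work: once a participant has voted YES, it may leave the prepared state only on the coordinator's instruction. The state names beyond "idle" and the `transition` helper are illustrative assumptions, not part of the class above.

```python
# Allowed state changes for one participant in one transaction.
ALLOWED = {
    "idle": {"prepared", "aborted"},       # vote YES, or abort on its own
    "prepared": {"committed", "aborted"},  # only on GLOBAL_COMMIT / GLOBAL_ABORT
    "committed": set(),                    # terminal
    "aborted": set(),                      # terminal
}


class IllegalTransition(Exception):
    """Raised when a state change would violate the 2PC rules."""


def transition(current, target, from_coordinator=False):
    """Return the new state, or raise if the move would break 2PC."""
    if current == target:
        # A re-delivered message: idempotent, nothing changes
        return current
    if target not in ALLOWED[current]:
        raise IllegalTransition(f"{current} -> {target}")
    if current == "prepared" and not from_coordinator:
        # After voting YES the participant is in doubt and must wait
        raise IllegalTransition("a prepared participant cannot decide alone")
    return target


print(transition("idle", "prepared"))  # prepared
print(transition("prepared", "committed", from_coordinator=True))  # committed
```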
The Coordinator
The coordinator is the central node that orchestrates the protocol:
```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class TwoPhaseCommitCoordinator:
    """
    Coordinator that manages the 2PC protocol.
    """

    def __init__(self, transaction_id, coordinator_log):
        self.transaction_id = transaction_id
        self.coordinator_log = coordinator_log  # durable decision log
        self.participants = []
        self.votes = {}
        self.state = "init"

    def add_participant(self, participant):
        """Add a participant to the transaction."""
        self.participants.append(participant)

    async def execute_transaction(self, operations):
        """
        Execute a distributed transaction using 2PC.
        """
        try:
            # Execute operations on all participants
            for participant in self.participants:
                await participant.execute_operation(operations)
            # Phase 1: Prepare
            await self.prepare_phase()
        except Exception:
            # No decision has been logged yet, so aborting is still safe
            await self.abort_phase()
            raise
        # Phase 2: Commit or Abort based on votes. Once the decision is
        # logged it is final: a failure from here on must be retried,
        # never turned into an abort.
        if self.should_commit():
            await self.commit_phase()
        else:
            await self.abort_phase()
```
Phase 1: Prepare Phase
In the prepare phase, the coordinator asks all participants if they can commit:
```python
async def prepare_phase(self):
    """
    Phase 1: The Prepare (or Voting) Phase
    1. Coordinator sends PREPARE to all participants
    2. Each participant:
       - Checks if it can commit
       - Makes sure it holds the necessary locks
       - Writes PREPARE to its transaction log (for durability)
       - Votes YES (VOTE_COMMIT) or NO (VOTE_ABORT)
    3. Coordinator collects all votes
    """
    self.state = "prepare"
    # Send prepare requests to all participants
    prepare_tasks = []
    for participant in self.participants:
        task = self.send_prepare(participant)
        prepare_tasks.append(task)
    # Wait for all responses
    responses = await asyncio.gather(*prepare_tasks, return_exceptions=True)
    # Collect votes
    for participant, response in zip(self.participants, responses):
        if isinstance(response, Exception):
            self.votes[participant.id] = "VOTE_ABORT"
        else:
            self.votes[participant.id] = response
    return self.should_commit()


async def send_prepare(self, participant):
    """Send PREPARE message to participant and get vote."""
    try:
        vote = await participant.prepare()
        return vote
    except Exception as e:
        logger.error(f"Participant {participant.id} failed to prepare: {e}")
        return "VOTE_ABORT"
```
Participant Prepare Implementation
Each participant must carefully implement the prepare phase:
```python
from datetime import datetime, timezone


class DatabaseParticipant:
    """A database participant that implements 2PC."""

    async def prepare(self):
        """
        Prepare phase for a database participant.
        Critical steps:
        1. Check if transaction can be committed
        2. Make sure all necessary locks are held
        3. Write PREPARE record to WAL (Write-Ahead Log) and flush it
        4. Return VOTE_COMMIT or VOTE_ABORT
        """
        # Check if we can commit
        if not self.can_commit():
            await self.rollback_local_changes()
            return "VOTE_ABORT"
        # Hold locks on all affected rows/tables; they stay held
        # until the coordinator's decision arrives
        await self.acquire_locks()
        # Write PREPARE to WAL - CRITICAL for recovery.
        # The record (and the pending changes) must be on disk before we
        # vote, so we can still honor a COMMIT if we crash after voting.
        await self.wal.write({
            "type": "PREPARE",
            "transaction_id": self.transaction_id,
            "timestamp": datetime.now(timezone.utc),
        })
        # Vote to commit
        return "VOTE_COMMIT"

    def can_commit(self):
        """Check if we can commit this transaction."""
        # Check constraints, resources, etc.
        return True  # Simplified
```
Phase 2: Commit Phase
Based on the votes, the coordinator decides to commit or abort:
```python
def should_commit(self):
    """
    Decision logic:
    - If EVERY participant voted VOTE_COMMIT -> COMMIT
    - If ANY vote is VOTE_ABORT or missing -> ABORT
    """
    return all(
        self.votes.get(p.id) == "VOTE_COMMIT" for p in self.participants
    )


async def commit_phase(self):
    """
    Phase 2: The Commit Phase (successful case)
    1. Coordinator writes COMMIT to its log
    2. Coordinator sends GLOBAL_COMMIT to all participants
    3. Each participant:
       - Writes COMMIT to its log
       - Applies the transaction
       - Releases locks
       - Sends ACK to coordinator
    """
    self.state = "commit"
    # Coordinator writes COMMIT to log first (durability). This record is
    # the commit point: after it, the outcome can no longer change.
    await self.coordinator_log.write({
        "type": "COMMIT",
        "transaction_id": self.transaction_id,
        "participants": [p.id for p in self.participants]
    })
    # Send GLOBAL_COMMIT to all participants
    commit_tasks = []
    for participant in self.participants:
        task = participant.commit()
        commit_tasks.append(task)
    # Wait for acknowledgments; any participant that failed here
    # must be retried until it ACKs
    await asyncio.gather(*commit_tasks, return_exceptions=True)
    self.state = "committed"


async def abort_phase(self):
    """
    Phase 2: The Abort Phase (failure case)
    1. Coordinator writes ABORT to its log
    2. Coordinator sends GLOBAL_ABORT to all participants
    3. Each participant:
       - Writes ABORT to its log
       - Rolls back changes
       - Releases locks
       - Sends ACK to coordinator
    """
    self.state = "abort"
    # Write ABORT to coordinator log
    await self.coordinator_log.write({
        "type": "ABORT",
        "transaction_id": self.transaction_id
    })
    # Send GLOBAL_ABORT to all participants
    abort_tasks = []
    for participant in self.participants:
        task = participant.abort()
        abort_tasks.append(task)
    # Wait for acknowledgments
    await asyncio.gather(*abort_tasks, return_exceptions=True)
    self.state = "aborted"
```
Participant Commit Implementation
```python
class DatabaseParticipant:
    async def commit(self):
        """
        Commit phase for a participant.
        1. Write COMMIT to WAL
        2. Apply all pending changes
        3. Release all locks
        4. Send ACK
        """
        # Write COMMIT to WAL
        await self.wal.write({
            "type": "COMMIT",
            "transaction_id": self.transaction_id
        })
        # Apply changes (make them permanent)
        await self.apply_pending_changes()
        # Release all locks
        await self.release_locks()
        # Clear transaction state
        self.transaction_state = "committed"
        return "ACK"

    async def abort(self):
        """
        Abort phase for a participant.
        1. Write ABORT to WAL
        2. Roll back all changes
        3. Release all locks
        4. Send ACK
        """
        # Write ABORT to WAL
        await self.wal.write({
            "type": "ABORT",
            "transaction_id": self.transaction_id
        })
        # Roll back changes
        await self.rollback_local_changes()
        # Release all locks
        await self.release_locks()
        # Clear transaction state
        self.transaction_state = "aborted"
        return "ACK"
```
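The coordinator and participant fragments above depend on a WAL, locks, and network clients, so they do not run on their own. The following toy simulation is self-contained. It runs both phases end to end with `asyncio`, under two assumptions: participants are in-process objects, and Python lists stand in for the durable logs. `InMemoryParticipant` and `run_two_phase_commit` are hypothetical names.

```python
import asyncio


class InMemoryParticipant:
    """Toy participant; a Python list stands in for the durable WAL."""

    def __init__(self, pid, will_vote_yes=True):
        self.id = pid
        self.will_vote_yes = will_vote_yes
        self.log = []
        self.state = "idle"

    async def prepare(self):
        if not self.will_vote_yes:
            # Before voting YES a participant may abort on its own
            self.state = "aborted"
            return "VOTE_ABORT"
        self.log.append("PREPARE")  # must be durable before the YES vote
        self.state = "prepared"
        return "VOTE_COMMIT"

    async def commit(self):
        if self.state != "committed":  # ignore a re-delivered COMMIT
            self.log.append("COMMIT")
            self.state = "committed"
        return "ACK"

    async def abort(self):
        if self.state != "aborted":  # ignore a re-delivered ABORT
            self.log.append("ABORT")
            self.state = "aborted"
        return "ACK"


async def run_two_phase_commit(participants, coordinator_log):
    # Phase 1: ask everyone in parallel; an exception counts as a NO vote
    votes = await asyncio.gather(
        *(p.prepare() for p in participants), return_exceptions=True
    )
    decision = "COMMIT" if all(v == "VOTE_COMMIT" for v in votes) else "ABORT"
    # The decision is logged before any participant is told about it
    coordinator_log.append(decision)
    # Phase 2: broadcast the decision (a real coordinator retries failures)
    if decision == "COMMIT":
        await asyncio.gather(*(p.commit() for p in participants))
    else:
        await asyncio.gather(*(p.abort() for p in participants))
    return decision


decision_log = []
team = [InMemoryParticipant("inventory"), InMemoryParticipant("payment")]
print(asyncio.run(run_two_phase_commit(team, decision_log)))  # COMMIT
print([p.state for p in team])  # ['committed', 'committed']
```

Because `commit()` and `abort()` ignore repeated deliveries, a retry loop or a recovering participant could safely resend the decision.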
Failure Scenarios and Handling
Coordinator Failure During Protocol
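The most critical failure scenario is the coordinator crashing while participants are prepared. When a participant restarts, it recovers from its log. For any transaction it prepared but never resolved, it asks the coordinator for the outcome: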
```python
class CoordinatorUnavailable(Exception):
    """Raised by the coordinator client when the coordinator is unreachable."""


class CannotRecoverException(Exception):
    """Raised when an in-doubt transaction cannot be resolved yet."""


class RecoveryManager:
    """
    Handles recovery from various failure scenarios.
    """

    async def recover_participant(self, participant_id):
        """
        Recovery logic for a participant.
        When a participant restarts:
        1. Read the transaction log
        2. Find the latest record for each transaction
        3. Ask coordinator for decision if a transaction is still PREPARED
        """
        # Read local transaction log and keep the latest record per
        # transaction: a PREPARE followed by COMMIT/ABORT is resolved
        latest = {}
        for entry in await self.wal.read_for_recovery():
            latest[entry["transaction_id"]] = entry["type"]

        for transaction_id, record_type in latest.items():
            if record_type == "PREPARE":
                # We voted YES but never learned the final decision
                decision = await self.ask_coordinator(transaction_id)
                if decision == "COMMIT":
                    await self.commit(transaction_id)
                else:
                    await self.abort(transaction_id)
            elif record_type == "COMMIT":
                # Already committed - confirm state
                await self.confirm_committed(transaction_id)
            elif record_type == "ABORT":
                # Already aborted - confirm state
                await self.confirm_aborted(transaction_id)

    async def ask_coordinator(self, transaction_id):
        """Ask coordinator for the decision on a transaction."""
        try:
            # Try to contact coordinator
            response = await self.coordinator_client.query_decision(
                transaction_id
            )
            return response["decision"]
        except CoordinatorUnavailable:
            # Coordinator is down - we must wait.
            # This is the BLOCKING problem of 2PC: we cannot abort on our
            # own, because the coordinator may already have decided COMMIT.
            raise CannotRecoverException(
                "Coordinator unavailable - must wait for recovery"
            ) from None
```
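Recovery starts by splitting the log into transactions that are still in doubt and those that already have an outcome. The sketch below assumes log records are plain dictionaries, written in order, with `type` and `transaction_id` keys, as in the WAL writes above. `classify_transactions` is a hypothetical helper.

```python
def classify_transactions(log_entries):
    """Group transactions by the last record the participant logged.

    log_entries: records in write order, each a dict with "type"
    ("PREPARE", "COMMIT" or "ABORT") and "transaction_id".
    Returns (in_doubt_ids, {resolved_id: outcome}).
    """
    last = {}
    for entry in log_entries:
        last[entry["transaction_id"]] = entry["type"]  # later records win
    in_doubt = sorted(t for t, kind in last.items() if kind == "PREPARE")
    resolved = {t: kind for t, kind in last.items() if kind != "PREPARE"}
    return in_doubt, resolved


wal = [
    {"type": "PREPARE", "transaction_id": "tx1"},
    {"type": "PREPARE", "transaction_id": "tx2"},
    {"type": "COMMIT", "transaction_id": "tx1"},
]
in_doubt, resolved = classify_transactions(wal)
print(in_doubt)   # ['tx2']
print(resolved)   # {'tx1': 'COMMIT'}
```

Only `tx2` needs a round-trip to the coordinator; `tx1` can be finished from the local log alone.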
Timeout Handling
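Timeouts are critical in distributed systems. From the outside, a crashed node and a very slow one look the same, so the coordinator must stop waiting for votes at some point: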
```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class TwoPhaseCommitCoordinator:
    """
    2PC coordinator with timeout handling.
    """

    def __init__(self):
        # Also holds transaction_id, participants and votes, as above
        self.prepare_timeout = 30  # seconds
        self.commit_timeout = 30  # seconds

    async def execute_with_timeout(self, operation, timeout):
        """Execute operation (an awaitable) with timeout."""
        try:
            return await asyncio.wait_for(operation, timeout=timeout)
        except asyncio.TimeoutError:
            logger.error(f"Operation timed out after {timeout}s")
            raise

    async def prepare_phase_with_timeout(self):
        """Execute prepare phase with timeout."""
        # Start prepare on all participants
        prepare_tasks = []
        for participant in self.participants:
            task = asyncio.create_task(self.send_prepare(participant))
            prepare_tasks.append((participant, task))
        # Wait for all with timeout
        done, pending = await asyncio.wait(
            [t for _, t in prepare_tasks],
            timeout=self.prepare_timeout
        )
        # Handle timeouts
        for participant, task in prepare_tasks:
            if task in pending:
                # This participant timed out. Cancelling only stops our wait:
                # it may still prepare, so it must later receive GLOBAL_ABORT.
                task.cancel()
                self.votes[participant.id] = "VOTE_ABORT"
                logger.warning(f"Participant {participant.id} timed out in prepare")
            else:
                # Get result
                try:
                    self.votes[participant.id] = task.result()
                except Exception:
                    self.votes[participant.id] = "VOTE_ABORT"
        return self.should_commit()
```
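The same timeout logic can be sketched in a self-contained form that runs without the coordinator class. It assumes each participant's prepare call is a coroutine keyed by participant id. `collect_votes`, `fast_yes` and `slow_yes` are hypothetical names. A timeout or an exception becomes `VOTE_ABORT`.

```python
import asyncio


async def collect_votes(prepare_calls, timeout):
    """prepare_calls: participant id -> coroutine returning a vote.

    Participants that have not answered within `timeout` seconds, or
    that raised, are recorded as VOTE_ABORT.
    """
    tasks = {pid: asyncio.create_task(c) for pid, c in prepare_calls.items()}
    done, pending = await asyncio.wait(tasks.values(), timeout=timeout)
    votes = {}
    for pid, task in tasks.items():
        if task in pending:
            task.cancel()  # stop waiting locally; the node may still prepare
            votes[pid] = "VOTE_ABORT"
        elif task.exception() is not None:
            votes[pid] = "VOTE_ABORT"
        else:
            votes[pid] = task.result()
    return votes


async def fast_yes():
    return "VOTE_COMMIT"


async def slow_yes():
    await asyncio.sleep(1.0)  # simulates a participant that is too slow
    return "VOTE_COMMIT"


votes = asyncio.run(collect_votes({"a": fast_yes(), "b": slow_yes()}, timeout=0.1))
print(votes)  # {'a': 'VOTE_COMMIT', 'b': 'VOTE_ABORT'}
```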
Participant Failure Scenarios
```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class ParticipantFailureHandler:
    """
    Handle various participant failure scenarios.
    """

    async def handle_prepare_timeout(self, participant):
        """
        What if participant doesn't respond to PREPARE?
        Option 1: Abort the transaction
        Option 2: Wait longer (the other prepared participants keep
                  holding their locks in the meantime)
        Typically: Abort if timeout occurs. This is safe because no
        decision has been made yet.
        """
        logger.warning(f"Participant {participant.id} timed out during PREPARE")
        return "VOTE_ABORT"

    async def handle_commit_timeout(self, participant):
        """
        What if participant doesn't respond to COMMIT?
        The participant may have:
        1. Committed, but the ACK was lost
        2. Crashed before receiving COMMIT (it is still prepared)
        3. Crashed after committing but before sending the ACK
        Solution: Retry COMMIT until ACK received. The decision is final,
        so the participant's commit() must be idempotent.
        """
        logger.warning(f"Participant {participant.id} timed out during COMMIT")
        delay = 1
        # Keep retrying until we get ACK, backing off up to 30 seconds
        while True:
            try:
                await participant.commit()
                break
            except Exception:
                await asyncio.sleep(delay)
                delay = min(delay * 2, 30)

    async def handle_abort_timeout(self, participant):
        """
        What if participant doesn't respond to ABORT?
        Similar to commit - keep retrying until ACK
        """
        logger.warning(f"Participant {participant.id} timed out during ABORT")
        delay = 1
        while True:
            try:
                await participant.abort()
                break
            except Exception:
                await asyncio.sleep(delay)
                delay = min(delay * 2, 30)
```
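The pattern behind these retry loops can be tested in isolation. The sketch below shows unbounded delivery with capped exponential back-off, assuming `send` is any async callable that returns `"ACK"` or raises. `deliver_until_ack` and `FlakyParticipant` are hypothetical names.

```python
import asyncio


async def deliver_until_ack(send, base_delay=0.01, max_delay=1.0):
    """Resend a phase-2 message until it is acknowledged.

    Returns the number of attempts it took. There is no attempt limit.
    """
    delay = base_delay
    attempt = 0
    while True:
        attempt += 1
        try:
            if await send() == "ACK":
                return attempt
        except Exception:
            pass  # transient failure: fall through to the back-off
        await asyncio.sleep(delay)
        delay = min(delay * 2, max_delay)  # exponential back-off, capped


class FlakyParticipant:
    """Hypothetical participant whose first `failures` COMMITs are lost."""

    def __init__(self, failures):
        self.failures = failures
        self.committed = False

    async def commit(self):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("COMMIT lost")
        self.committed = True  # idempotent: safe to receive twice
        return "ACK"


p = FlakyParticipant(failures=3)
print(asyncio.run(deliver_until_ack(p.commit)))  # 4
```

The missing attempt limit is deliberate. The decision is already logged, so giving up would leave this participant out of step with the others. The cap on the delay keeps a recovered participant from waiting too long for the next attempt.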
Advantages and Disadvantages
Advantages
The Two-Phase Commit protocol offers several important benefits:
```python
advantages = {
    "atomicity": "Guarantees all-or-nothing commit across distributed nodes",
    "consistency": "Ensures all participants see the same outcome",
    "simplicity": "Relatively straightforward to understand and implement",
    "universality": "Works with any transactional resource that supports a prepare step (databases, message queues, etc.)",
    "standard": "Widely supported by databases and transaction managers (e.g., through the X/Open XA standard)"
}
```
Disadvantages
However, 2PC has significant limitations:
```python
disadvantages = {
    "blocking": "Prepared participants can be blocked waiting for coordinator recovery",
    "single_point_of_failure": "Coordinator failure can stall transactions",
    "latency": "Requires two round-trips between coordinator and participants, plus forced log writes",
    "lock_holding": "Locks are held from execution until the participant learns the decision",
    "scalability": "Every extra participant is another node that can be slow, fail, or vote NO"
}

# Detailed explanation of blocking problem
blocking_explanation = """
The Blocking Problem:
====================
If the coordinator fails after sending PREPARE messages but before
sending the final decision, participants are left in a prepared state.
Example timeline:
1. Coordinator sends PREPARE to Participant A
2. Participant A votes YES and enters prepared state
3. Coordinator sends PREPARE to Participant B
4. Coordinator crashes before getting B's response
5. Participant A is now PREPARED and LOCKED
6. Coordinator is down - A cannot learn the decision from it
7. Participant A must WAIT for coordinator recovery. It cannot abort
   on its own (as far as A knows, B voted YES and the coordinator
   logged COMMIT), and it cannot commit (B may have voted NO).
During this wait:
- Locks are held
- Resources are consumed
- Other transactions may be blocked
This motivates 3PC and consensus-based approaches (Paxos, Raft).
"""
```
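A participant stuck as in step 7 can sometimes avoid waiting by asking its peers, a technique known as the cooperative termination protocol. The sketch below shows when that helps, and why the participant still blocks when every reachable peer is prepared. It assumes each reachable peer can report its own state. The state names and `resolve_in_doubt` are hypothetical.

```python
def resolve_in_doubt(peer_states):
    """What a prepared participant can conclude from its peers' states.

    peer_states: states reported by reachable peers, each one of
    "idle" (has not voted), "prepared", "committed" or "aborted".
    Unreachable peers are simply absent from the list.
    """
    if "committed" in peer_states:
        # Some peer was told COMMIT, so the coordinator decided COMMIT
        return "COMMIT"
    if "aborted" in peer_states or "idle" in peer_states:
        # A peer that never voted YES can abort unilaterally, and then
        # the coordinator can never decide COMMIT
        return "ABORT"
    # Every reachable peer is prepared: only the coordinator knows
    return "BLOCKED"


print(resolve_in_doubt(["idle"]))       # ABORT
print(resolve_in_doubt(["prepared"]))   # BLOCKED
```

In the timeline above, A learns the outcome only if B has not yet voted or already knows the decision. If B is also prepared, A is blocked just as before.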
Real-World Implementations
Database Systems
Many databases implement 2PC or variants:
```python
database_implementations = {
    "PostgreSQL": {
        "name": "PostgreSQL Two-Phase Commit",
        "description": "Full 2PC support with PREPARE TRANSACTION and COMMIT PREPARED "
                       "(requires max_prepared_transactions > 0)",
        "sql_example": """
            BEGIN;
            -- SQL operations
            PREPARE TRANSACTION 'transaction_id';
            -- (later, once the coordinator has decided)
            COMMIT PREPARED 'transaction_id';
            -- or: ROLLBACK PREPARED 'transaction_id';
        """
    },
    "MySQL": {
        "name": "MySQL XA Support",
        "description": "XA (eXtended Architecture) distributed transaction support",
        "sql_example": """
            XA START 'transaction_id';
            -- SQL operations
            XA END 'transaction_id';
            XA PREPARE 'transaction_id';
            XA COMMIT 'transaction_id';
            -- or: XA ROLLBACK 'transaction_id';
        """
    },
    "Oracle": {
        "name": "Oracle Two-Phase Commit",
        "description": "Distributed transaction support across Oracle databases"
    },
    "SQL Server": {
        "name": "MS DTC",
        "description": "Microsoft Distributed Transaction Coordinator"
    }
}
```
Message Queue Implementations
```python
message_queue_2pc = {
    "Kafka": {
        "name": "Exactly-once Semantics",
        "description": "Uses a 2PC-like pattern with transactional producers and read_committed consumers",
        "code_example": """
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
        """
    },
    "RabbitMQ": {
        "name": "Publisher Confirms",
        "description": "Broker acknowledgements for reliable delivery; not an atomic commit protocol"
    }
}
```
Alternatives to 2PC
Given its limitations, several alternatives exist:
Three-Phase Commit (3PC)
```python
three_phase_commit = """
3PC adds a third phase to avoid blocking when nodes crash:
Phase 1: Can Commit? (similar to 2PC prepare)
Phase 2: Pre-Commit (Coordinator sends "I will commit")
Phase 3: Do Commit (Coordinator sends "Commit now")
Key improvement:
- No participant commits while another is still uncertain of the outcome
- If the coordinator fails, participants detect it by timeout, and the
  survivors can reach a decision without waiting for it to recover
However, 3PC assumes no network partitions: during a partition, the two
sides can reach different decisions.
"""
```
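The survivors' decision step can be sketched as the rule a newly elected coordinator applies to the states the surviving participants report. This simplified sketch assumes crash failures only and no partitions. The state names and `three_pc_termination` are illustrative.

```python
def three_pc_termination(states):
    """Outcome chosen by a new 3PC coordinator from survivors' states.

    states: each survivor's state, one of "init" (has not voted),
    "ready" (voted YES, no Pre-Commit yet), "precommitted",
    "committed" or "aborted".
    """
    if "committed" in states:
        return "COMMIT"
    if "aborted" in states:
        return "ABORT"
    if "precommitted" in states:
        # Pre-Commit is only sent after everyone voted YES, so committing
        # is safe; the new coordinator re-sends Pre-Commit, then Do Commit
        return "COMMIT"
    # No survivor reached Pre-Commit, so nobody can have committed
    return "ABORT"


print(three_pc_termination(["ready", "precommitted"]))  # COMMIT
print(three_pc_termination(["ready", "init"]))          # ABORT
```

A network partition breaks this reasoning. Two groups of survivors can each apply the rule to different states and reach different outcomes.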
Saga Pattern
```python
saga_pattern = """
For microservices, the Saga pattern is often preferred:
Instead of ACID transactions across services:
1. Each service executes its local transaction
2. If one fails, other services execute compensating transactions
3. The overall process is eventually consistent
Example:
- Service A: Reserve Inventory (local transaction)
- Service B: Charge Payment (local transaction)
- Service C: Create Order (local transaction)
If Service C fails:
- Service B: Refund Payment (compensating transaction)
- Service A: Release Inventory (compensating transaction)
Trade-off: Eventual consistency instead of strong consistency
"""
```
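The compensation flow in the example can be sketched as a tiny saga executor. This is a minimal, in-process illustration. It assumes each step is a `(name, action, compensation)` triple of plain functions, and `run_saga` is hypothetical. A real saga would persist its progress and retry compensations until they succeed.

```python
def run_saga(steps):
    """Run (name, action, compensation) steps in order.

    If an action raises, run the compensations of the steps that
    already succeeded, newest first. Returns (ok, trace).
    """
    trace = []
    completed = []
    for name, action, compensate in steps:
        try:
            action()
        except Exception:
            trace.append(f"{name} failed")
            for done_name, done_compensate in reversed(completed):
                done_compensate()
                trace.append(f"compensated {done_name}")
            return False, trace
        trace.append(f"{name} ok")
        completed.append((name, compensate))
    return True, trace


def noop():
    pass


def fail():
    raise RuntimeError("order service down")


ok, trace = run_saga([
    ("reserve_inventory", noop, noop),
    ("charge_payment", noop, noop),
    ("create_order", fail, noop),
])
print(ok)  # False
print(trace)
```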
Consensus Algorithms
```python
consensus_alternatives = {
    "Paxos": "Majority-based consensus, widely used (e.g., Google's Chubby); "
             "ZooKeeper uses the related Zab protocol",
    "Raft": "Designed to be easier to understand than Paxos (etcd, Consul)",
    "Multi-Paxos": "Extends Paxos with a stable leader to replicate a log for replicated state machines"
}

# When to use which:
when_to_use = {
    "2PC": "Simple, few participants, blocking is acceptable",
    "3PC": "Want to reduce blocking, network partitions unlikely",
    "Saga": "Microservices, eventual consistency is acceptable",
    "Paxos/Raft": "Critical systems needing strong consistency and fault tolerance; "
                  "often combined with 2PC by replicating coordinator and participants "
                  "(as Google Spanner does)"
}
```
Best Practices
Implementation Guidelines
```python
best_practices = {
    "1_use_wal": "Always force the PREPARE record to the Write-Ahead Log before voting",
    "2_timeout_config": "Configure timeouts appropriately for your network",
    "3_retry_logic": "Retry transient failures; retry COMMIT/ABORT until acknowledged",
    "4_monitoring": "Monitor prepared transactions and coordinator health",
    "5_limit_participants": "Minimize the number of participants",
    "6_consider_sagas": "Consider Saga pattern for microservices"
}

# Configuration example
configuration = {
    "prepare_timeout_seconds": 30,
    "commit_timeout_seconds": 60,
    "max_retry_attempts": 3,  # phase 1 only; COMMIT/ABORT retries are unbounded
    "retry_backoff_ms": 100,
    "enable_recovery": True,
    "log_prepared_transactions": True
}
```
When to Use 2PC
```python
use_2pc_when = [
    "You need strong atomicity guarantees",
    "Number of participants is small (< 10)",
    "Network is reliable",
    "Blocking is acceptable (or rare)",
    "Participants support 2PC natively"
]

avoid_2pc_when = [
    "High availability is critical",
    "Network partitions are common",
    "Many participants involved",
    "Eventual consistency is acceptable",
    "System needs to scale horizontally"
]
```
Monitoring and Observability
Key Metrics to Track
```python
metrics_to_monitor = {
    "transaction_duration": "Time from start to commit/abort",
    "prepare_latency": "Time for prepare phase",
    "commit_latency": "Time for commit phase",
    "abort_rate": "Percentage of aborted transactions",
    "prepared_transactions": "Number of transactions currently prepared (in doubt)",
    "coordinator_failures": "Number of coordinator failures",
    "participant_timeouts": "Number of participant timeouts"
}

# Alerting thresholds
alerting = {
    "prepared_transactions_stuck": "> 0 for > 5 minutes",
    "abort_rate": "> 10%",
    "transaction_duration_p99": "> 10 seconds"
}
```
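The first two alert rules can be evaluated with a small function. The sketch assumes the monitoring system can report two things: when each in-doubt transaction entered the prepared state, and the committed/aborted counts for the window. `check_alerts` and its parameters are hypothetical. The thresholds default to the 5-minute and 10% values above.

```python
def check_alerts(prepared_since, now, committed, aborted,
                 stuck_after=300.0, max_abort_rate=0.10):
    """Evaluate the stuck-prepared and abort-rate alert rules.

    prepared_since: transaction id -> time (seconds) it became prepared.
    Returns a list of alert messages (empty if all is well).
    """
    alerts = []
    stuck = sorted(t for t, since in prepared_since.items()
                   if now - since > stuck_after)
    if stuck:
        alerts.append(f"stuck prepared transactions: {stuck}")
    total = committed + aborted
    if total and aborted / total > max_abort_rate:
        alerts.append(f"abort rate {aborted / total:.0%} above {max_abort_rate:.0%}")
    return alerts


alerts = check_alerts({"tx7": 0.0, "tx8": 590.0}, now=600.0,
                      committed=85, aborted=15)
print(alerts)
```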
Conclusion
The Two-Phase Commit protocol remains a fundamental tool for achieving atomicity in distributed systems. Its blocking nature and single point of failure make it unsuitable for some use cases. Even so, it provides a straightforward solution for many distributed transaction scenarios.
Key takeaways:
- 2PC guarantees atomicity across distributed nodes through a coordinated commit/abort protocol
- The blocking problem is the main limitation: prepared participants must wait for coordinator recovery
- Proper implementation requires WAL logging, timeout handling, and recovery procedures
- Consider alternatives like Saga patterns or consensus algorithms for microservices or high-availability systems
Understanding 2PC provides a foundation for understanding more complex distributed transaction patterns and consensus algorithms.