Introduction
Database transactions are the foundation of reliable data operations. Understanding ACID properties, isolation levels, and concurrency control is essential for building robust applications.
This guide starts with the fundamentals (ACID properties and isolation levels) and then works through concurrency control, distributed transaction patterns, and deadlock handling for high-concurrency systems.
ACID Properties
┌─────────────────────────────────────────────────────────────────────┐
│ ACID PROPERTIES │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ A │ │ C │ │ I │ │ D │ │
│ │ Atomicity│ │Consistency│ │Isolation │ │ Durability│ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Atomicity: All or nothing │
│ Consistency: Valid state to valid state │
│ Isolation: Concurrent transactions appear serial │
│ Durability: Committed data survives crashes │
│ │
└─────────────────────────────────────────────────────────────────────┘
Isolation Levels
Understanding Levels
| Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads |
|---|---|---|---|
| READ UNCOMMITTED | Possible | Possible | Possible |
| READ COMMITTED | ❌ | Possible | Possible |
| REPEATABLE READ | ❌ | ❌ | Possible |
| SERIALIZABLE | ❌ | ❌ | ❌ |
Implementation
# PostgreSQL isolation levels
import psycopg2
from psycopg2 import sql
class TransactionManager:
    """Run queries on a connection under a chosen transaction isolation level.

    NOTE(review): the async helpers delegate to ``self.execute``, which is not
    defined in this class; presumably a subclass or mixin provides it --
    confirm before using this class on its own.
    """

    # The only values accepted by ``SET TRANSACTION ISOLATION LEVEL``.
    _ISOLATION_LEVELS = frozenset(
        {"READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE"}
    )

    def __init__(self, connection):
        self.conn = connection

    def set_isolation_level(self, level: str):
        """Set transaction isolation level.

        Args:
            level: One of READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ or
                SERIALIZABLE (case and extra whitespace are ignored).

        Raises:
            ValueError: If ``level`` is not one of the levels above.
        """
        normalized = " ".join(str(level).upper().split())
        # The level is spliced into the statement text (it cannot be bound as
        # a parameter), so only whitelisted values may reach the database.
        if normalized not in self._ISOLATION_LEVELS:
            raise ValueError(f"Unsupported isolation level: {level!r}")
        cursor = self.conn.cursor()
        try:
            cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {normalized}")
        finally:
            # Release the cursor even when the statement fails.
            cursor.close()

    async def read_committed(self, query: str):
        """READ COMMITTED - default in PostgreSQL"""
        self.set_isolation_level("READ COMMITTED")
        return await self.execute(query)

    async def repeatable_read(self, query: str):
        """REPEATABLE READ - prevents non-repeatable reads"""
        self.set_isolation_level("REPEATABLE READ")
        return await self.execute(query)

    async def serializable(self, query: str):
        """SERIALIZABLE - highest isolation"""
        self.set_isolation_level("SERIALIZABLE")
        return await self.execute(query)
# Example: Serializable transaction
async def transfer_funds_serializable(from_account: str, to_account: str, amount: float):
    """Move ``amount`` between two accounts in one SERIALIZABLE transaction.

    Args:
        from_account: Id of the account to debit.
        to_account: Id of the account to credit.
        amount: Amount to move; must be positive.

    Returns:
        True when both updates were applied and the transaction committed.

    Raises:
        TransferError: If the amount is not positive, the source account is
            missing or has insufficient funds, the destination account is
            missing, or any database error occurs.
    """
    if amount <= 0:
        # A negative amount would silently reverse the direction of the transfer.
        raise TransferError(f"Transfer failed: amount must be positive, got {amount}")

    conn = await get_connection()
    try:
        # Set serializable isolation
        await conn.set_isolation_level("SERIALIZABLE")
        async with conn.transaction():
            # Deduct from source; the balance guard makes this a no-op when
            # the account is missing or cannot cover the amount.
            debit = await conn.execute(
                "\nUPDATE accounts\n"
                "SET balance = balance - %s\n"
                "WHERE id = %s AND balance >= %s\n",
                (amount, from_account, amount),
            )
            # assumes execute() returns a cursor exposing rowcount, as the
            # optimistic-locking example in this file does -- TODO confirm
            if debit.rowcount == 0:
                # Without this check the credit below would still run and
                # create money out of nothing.
                raise TransferError(
                    f"Transfer failed: account {from_account} not found or insufficient funds"
                )
            # Add to destination
            credit = await conn.execute(
                "\nUPDATE accounts\n"
                "SET balance = balance + %s\n"
                "WHERE id = %s\n",
                (amount, to_account),
            )
            if credit.rowcount == 0:
                # Raising inside the transaction block discards the debit too.
                raise TransferError(f"Transfer failed: account {to_account} not found")
        return True
    except TransferError:
        # Already a domain error; do not wrap it a second time.
        raise
    except Exception as e:
        await conn.rollback()
        raise TransferError(f"Transfer failed: {e}") from e
    finally:
        # Release the connection, as the outbox processor in this file does.
        await conn.close()
Concurrency Control
Optimistic Locking
# Optimistic locking with version field
class OptimisticLock:
    """Optimistic concurrency control"""

    async def update_with_lock(self, conn, table: str, data: dict, version: int):
        """Update a row only if its ``version`` column still matches.

        Args:
            conn: Connection whose awaited ``execute(query, values)`` returns
                an object with a ``rowcount`` attribute.
            table: Target table name, optionally schema-qualified.
            data: Column values to write. Must contain ``id``, which selects
                the row. ``id`` and ``version`` are never written from
                ``data``; the version is always bumped by one.
            version: The version the caller last read.

        Returns:
            True when exactly the expected row version was updated.

        Raises:
            KeyError: If ``data`` has no ``id`` entry.
            ValueError: If the table or a column name is not a plain identifier.
            ConcurrentModificationError: If no row matched ``id`` and ``version``.
        """
        row_id = data["id"]
        # "id" is the lookup key and "version" is managed here; putting either
        # in the SET clause is redundant or a duplicate-assignment SQL error.
        columns = [name for name in data if name not in ("id", "version")]

        # Identifiers cannot be sent as bound parameters, so refuse anything
        # that is not a plain name before splicing it into the statement.
        for identifier in (*str(table).split("."), *columns):
            if not str(identifier).isidentifier():
                raise ValueError(f"Invalid SQL identifier: {identifier!r}")

        assignments = [f"{name} = %s" for name in columns]
        assignments.append("version = version + 1")
        query = (
            f"UPDATE {table} "
            f"SET {', '.join(assignments)} "
            "WHERE id = %s AND version = %s"
        )
        values = [data[name] for name in columns] + [row_id, version]

        cursor = await conn.execute(query, values)
        if cursor.rowcount == 0:
            raise ConcurrentModificationError("Record was modified by another transaction")
        return True
# Usage with SQLAlchemy
class UserRepository:
    """Repository that updates ``User`` rows with an optimistic version check."""

    def __init__(self, session):
        self.session = session

    async def update_user(self, user_id: int, updates: dict):
        """Apply ``updates`` to a user if nobody changed it since it was read.

        Args:
            user_id: Primary key of the user to update.
            updates: Column values to write. The dict is not modified.

        Raises:
            LookupError: If no user with ``user_id`` exists.
            ConcurrentModificationError: If the stored version no longer
                matches the version that was read.
        """
        user = await self.session.get(User, user_id)
        if user is None:
            # Fail with a clear error instead of an AttributeError on None.
            raise LookupError(f"User {user_id} not found")

        current_version = user.version
        # Build a new mapping so the caller's dict is left untouched.
        values = {**updates, "version": current_version + 1}

        result = await self.session.execute(
            update(User)
            .where(User.id == user_id, User.version == current_version)
            .values(**values)
        )
        if result.rowcount == 0:
            raise ConcurrentModificationError("User was modified")
        await self.session.commit()
Pessimistic Locking
# Pessimistic locking with SELECT FOR UPDATE
class PessimisticLock:
    """Pessimistic concurrency control"""

    async def transfer_with_lock(self, conn, from_id: str, to_id: str, amount: float):
        """Transfer ``amount`` between two accounts while holding row locks.

        Both rows are locked with ``SELECT ... FOR UPDATE`` in sorted id order,
        so two opposite transfers cannot lock the same pair in opposite order.

        Args:
            conn: Connection providing ``transaction()``, ``fetchrow()`` and
                ``execute()``.
            from_id: Id of the account to debit.
            to_id: Id of the account to credit.
            amount: Amount to move; must be positive.

        Returns:
            True when the transfer was applied.

        Raises:
            ValueError: If ``amount`` is not positive.
            LookupError: If either account does not exist.
            InsufficientFundsError: If the source balance is below ``amount``.
        """
        if amount <= 0:
            raise ValueError(f"amount must be positive, got {amount}")

        async with conn.transaction():
            locked = {}
            # Lock in a consistent order (blocks other transactions on these
            # rows); the set avoids locking twice when both ids are equal.
            for account_id in sorted({from_id, to_id}):
                row = await conn.fetchrow(
                    "SELECT * FROM accounts WHERE id = $1 FOR UPDATE",
                    account_id
                )
                if row is None:
                    # Without this check a missing destination would make the
                    # debited amount vanish.
                    raise LookupError(f"Account {account_id} not found")
                locked[account_id] = row

            # Check balance
            if locked[from_id]['balance'] < amount:
                raise InsufficientFundsError()

            # Perform transfer
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount, to_id
            )
        return True
Transaction Patterns
Saga Pattern
# Saga pattern for distributed transactions
class Saga:
    """Saga pattern for managing distributed transactions"""

    def __init__(self, steps: list):
        self.steps = steps
        self.completed_steps = []

    async def execute(self) -> bool:
        """Run every step in order, compensating completed ones on failure.

        Returns:
            True when all steps finished.

        Raises:
            SagaExecutionError: If a step raised. The steps completed so far
                are compensated first, and the original exception is chained
                as the cause.
        """
        # Start each run from a clean slate so a repeated execute() never
        # compensates steps left over from a previous run.
        self.completed_steps = []
        try:
            for step in self.steps:
                await step.execute()
                self.completed_steps.append(step)
        except Exception as e:
            # Compensate in reverse order
            await self.compensate()
            raise SagaExecutionError(f"Saga failed: {e}") from e
        return True

    async def compensate(self):
        """Rollback completed steps"""
        for step in reversed(self.completed_steps):
            try:
                await step.compensate()
            except Exception as e:
                # Log but continue compensation
                log.error(f"Compensation failed for {step}: {e}")
# Example: Order processing saga
class OrderSaga:
    """Assembles and runs the saga used for order processing."""

    @staticmethod
    async def create_order(order_data: dict):
        """Build one step of each kind from ``order_data`` and run them as a saga.

        Steps run in this order: reserve inventory, process payment, create
        the order, notify the customer. Returns whatever ``Saga.execute``
        returns.
        """
        step_types = (
            ReserveInventoryStep,
            ProcessPaymentStep,
            CreateOrderStep,
            NotifyCustomerStep,
        )
        pipeline = [make_step(order_data) for make_step in step_types]
        return await Saga(pipeline).execute()
class ReserveInventoryStep(SagaStep):
    """Saga step that reserves inventory for an order and can release it again."""

    async def execute(self):
        """Reserve the order's items through the inventory client."""
        reserve = inventory_client.reserve
        order = self.data
        await reserve(items=order['items'], order_id=order['order_id'])

    async def compensate(self):
        """Release whatever was reserved for this order."""
        release = inventory_client.release
        await release(order_id=self.data['order_id'])
Outbox Pattern
# Outbox pattern for reliable messaging
class OutboxPattern:
    """Reliable event publishing via database outbox"""

    async def publish_event(self, conn, event_type: str, payload: dict):
        """Update the order and record the event in one transaction.

        Args:
            conn: Connection providing ``transaction()`` and ``execute()``.
            event_type: Value stored in the outbox ``event_type`` column.
            payload: JSON-serializable event body; must contain ``order_id``.

        Returns:
            True once both statements ran inside the transaction block.

        Raises:
            KeyError: If ``payload`` has no ``order_id``.
            TypeError: If ``payload`` cannot be serialized to JSON.
        """
        # Imported here because the file's import block does not provide it.
        import json

        # Validate and serialize before opening the transaction: bad input
        # fails without touching the database and the transaction stays short.
        order_id = payload['order_id']
        serialized = json.dumps(payload)

        async with conn.transaction():
            # 1. Update business data
            await conn.execute(
                "UPDATE orders SET status = 'PROCESSING' WHERE id = $1",
                order_id
            )
            # 2. Write to outbox (same transaction as the business update)
            await conn.execute(
                "\nINSERT INTO outbox (event_type, payload, created_at)\n"
                "VALUES ($1, $2, NOW())\n",
                event_type, serialized
            )
        return True
# Outbox processor
class OutboxProcessor:
    """Process outbox and publish events"""

    async def process_outbox(self, batch_size: int = 100):
        """Publish up to ``batch_size`` unprocessed outbox rows, oldest first.

        Each row is passed to ``self.publish`` and then flagged as processed.
        If either step raises, the row's ``retry_count`` is incremented and
        the error text stored in ``last_error``; the loop then moves on to
        the next row. The connection is closed when the batch ends.
        """
        select_pending = (
            "\nSELECT * FROM outbox\n"
            "WHERE processed = false\n"
            "ORDER BY created_at\n"
            "LIMIT $1\n"
        )
        mark_processed = "UPDATE outbox SET processed = true WHERE id = $1"
        record_failure = (
            "\nUPDATE outbox\n"
            "SET retry_count = retry_count + 1,\n"
            "last_error = $2\n"
            "WHERE id = $1\n"
        )

        db = await get_connection()
        try:
            pending = await db.fetch(select_pending, batch_size)
            for row in pending:
                try:
                    # Hand the row to the broker, then flag it as done.
                    await self.publish(row)
                    await db.execute(mark_processed, row['id'])
                except Exception as exc:
                    # Keep the failure on the row so it is picked up again later.
                    await db.execute(record_failure, row['id'], str(exc))
        finally:
            await db.close()
Handling Deadlocks
class DeadlockHandler:
    """Strategies for handling deadlocks"""

    @staticmethod
    async def execute_with_retry(func, max_retries: int = 3, base_delay: float = 0.1):
        """Retry on deadlock with exponential backoff.

        Args:
            func: Zero-argument coroutine function to call.
            max_retries: Maximum number of attempts.
            base_delay: Seconds to wait after the first failed attempt; the
                wait doubles after every further failure.

        Returns:
            Whatever ``func`` returns on its first successful attempt.

        Raises:
            DeadlockError: Re-raised when the last attempt also deadlocks.
            MaxRetriesExceededError: If ``max_retries`` is zero or negative,
                so that no attempt is made at all.
        """
        # Imported here because the file's import block does not provide it.
        import asyncio

        for attempt in range(max_retries):
            try:
                return await func()
            except DeadlockError:
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff
                await asyncio.sleep(base_delay * (2 ** attempt))
        # Only reached when the loop body never ran (max_retries <= 0).
        raise MaxRetriesExceededError()
# Prevent deadlocks with consistent lock ordering
async def safe_transfer(conn, from_id: str, to_id: str, amount: float):
    """Transfer ``amount`` between accounts, always locking rows in the same order.

    Both account rows are locked with ``SELECT ... FOR UPDATE`` in sorted id
    order, so two concurrent transfers between the same pair of accounts
    request their locks in the same sequence.

    Args:
        conn: Connection providing ``transaction()``, ``fetchrow()`` and
            ``execute()``.
        from_id: Id of the account to debit.
        to_id: Id of the account to credit.
        amount: Amount to move; must be positive.

    Returns:
        True when the transfer was applied.

    Raises:
        ValueError: If ``amount`` is not positive.
        LookupError: If either account does not exist.
        InsufficientFundsError: If the source balance is below ``amount``.
    """
    if amount <= 0:
        raise ValueError(f"amount must be positive, got {amount}")

    async with conn.transaction():
        locked = {}
        # Sort IDs to prevent deadlock; the set avoids locking the same row
        # twice when both ids are equal.
        for account_id in sorted({from_id, to_id}):
            row = await conn.fetchrow(
                "SELECT * FROM accounts WHERE id = $1 FOR UPDATE",
                account_id
            )
            if row is None:
                raise LookupError(f"Account {account_id} not found")
            locked[account_id] = row

        if locked[from_id]['balance'] < amount:
            raise InsufficientFundsError()

        # Perform transfer
        await conn.execute(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
            amount, from_id
        )
        await conn.execute(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            amount, to_id
        )
    return True
Best Practices
Good: Short Transactions
# Good: Keep transactions short
async def good_example(conn):
    """Keep the transaction short: do the slow work first, then write.

    Anti-pattern, shown for contrast only (do NOT do this). Every slow step
    runs while the transaction is held open:

        async with conn.transaction():
            data = await fetch_data()   # Could be slow
            await process(data)         # More processing
            await more_processing()     # Even more
            # Transaction held open too long!
    """
    # Do this instead: fetch and process outside any transaction...
    data = await fetch_data()
    processed = await process(data)

    # ...and open the transaction only for the quick write.
    async with conn.transaction():
        await save_results(processed)
Bad: Long-Running Transactions
# Bad: Don't hold locks during user interaction
async def bad_example(conn, user_request):
    """Anti-pattern: handle a whole user request inside one open transaction.

    The transaction stays open for as long as ``handle_request`` takes, so
    keep this only as an illustration of what to avoid.
    """
    long_lived_txn = conn.transaction()
    async with long_lived_txn:
        # The user might go to lunch here, and the transaction stays open.
        await handle_request(user_request)
Good: Explicit vs Implicit
# Explicit transaction control
async def explicit_transaction(conn):
    """Move 100 from account 'A' to account 'B' with explicit BEGIN/COMMIT.

    Args:
        conn: Connection whose awaited ``execute(sql)`` runs one statement.

    Raises:
        Whatever ``conn.execute`` raises. If a statement after BEGIN fails, a
        ROLLBACK is issued before the error propagates.
    """
    # BEGIN stays outside the try block: if it fails there is no open
    # transaction, so issuing ROLLBACK would be wrong.
    await conn.execute("BEGIN")
    try:
        await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 'A'")
        await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 'B'")
        await conn.execute("COMMIT")
    except BaseException:
        # Deliberately broad: task cancellation must also undo the partial
        # transfer. The error is always re-raised.
        await conn.execute("ROLLBACK")
        raise
Conclusion
Transaction management is critical:
- Understand ACID: Foundation of data integrity
- Choose isolation level: Balance consistency vs performance
- Use optimistic locking: For low contention
- Use pessimistic locking: For high contention
- Handle deadlocks: Plan for failure
- Saga pattern: For distributed systems
Comments