Introduction
Database transactions are the foundation of reliable data operations. Understanding ACID properties, isolation levels, and concurrency control is essential for building robust applications.
This guide covers transaction management, from basic concepts to advanced patterns for high-concurrency systems.
ACID Properties
┌──────────────────────────────────────────────────────────────────────┐
│                           ACID PROPERTIES                            │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │      A      │  │      C      │  │      I      │  │      D      │  │
│  │  Atomicity  │  │ Consistency │  │  Isolation  │  │ Durability  │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘  │
│                                                                      │
│  Atomicity:    All or nothing                                        │
│  Consistency:  Valid state to valid state                            │
│  Isolation:    Concurrent transactions appear serial                 │
│  Durability:   Committed data survives crashes                       │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
Isolation Levels
Understanding Levels
| Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads |
|---|---|---|---|
| READ UNCOMMITTED | Possible | Possible | Possible |
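| READ COMMITTED | Prevented | Possible | Possible |
| REPEATABLE READ | Prevented | Prevented | Possible |
| SERIALIZABLE | Prevented | Prevented | Prevented |

These are the minimum guarantees required by the SQL standard. PostgreSQL is stricter: its READ UNCOMMITTED behaves like READ COMMITTED, and its REPEATABLE READ also prevents phantom reads.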
Implementation
# PostgreSQL isolation levels
import psycopg2
from psycopg2 import sql
class TransactionManager:
    """Run queries at an explicitly chosen transaction isolation level.

    NOTE(review): ``self.execute`` is not defined in this class; it is
    presumably provided by a subclass or mixin - confirm.
    """

    # Isolation levels accepted by SET TRANSACTION. The level is a keyword,
    # not a value, so it cannot be sent as a bind parameter; allow-listing it
    # is what keeps the interpolated SQL below injection-safe.
    _VALID_LEVELS = frozenset({
        "READ UNCOMMITTED",
        "READ COMMITTED",
        "REPEATABLE READ",
        "SERIALIZABLE",
    })

    def __init__(self, connection):
        self.conn = connection

    def set_isolation_level(self, level: str):
        """Issue ``SET TRANSACTION ISOLATION LEVEL`` on the connection.

        Args:
            level: one of READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ or
                SERIALIZABLE. Case and repeated whitespace are ignored.

        Raises:
            ValueError: if ``level`` is not one of the supported levels; the
                connection is not touched in that case.
        """
        normalized = " ".join(level.upper().split())
        if normalized not in self._VALID_LEVELS:
            raise ValueError(f"Unsupported isolation level: {level!r}")
        cursor = self.conn.cursor()
        try:
            cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {normalized}")
        finally:
            # Always release the cursor, even if the statement fails.
            cursor.close()

    async def read_committed(self, query: str):
        """Run ``query`` at READ COMMITTED (PostgreSQL's default level)."""
        self.set_isolation_level("READ COMMITTED")
        return await self.execute(query)

    async def repeatable_read(self, query: str):
        """Run ``query`` at REPEATABLE READ (prevents non-repeatable reads)."""
        self.set_isolation_level("REPEATABLE READ")
        return await self.execute(query)

    async def serializable(self, query: str):
        """Run ``query`` at SERIALIZABLE (the strictest level)."""
        self.set_isolation_level("SERIALIZABLE")
        return await self.execute(query)
# Example: Serializable transaction
async def transfer_funds_serializable(from_account: str, to_account: str, amount: float):
    """Move ``amount`` from one account to another in a SERIALIZABLE transaction.

    Args:
        from_account: id of the account to debit.
        to_account: id of the account to credit.
        amount: amount to move.

    Returns:
        True once both updates have been applied and the transaction block
        has exited.

    Raises:
        TransferError: if any step fails, including insufficient funds or an
            unknown account. The underlying exception is chained as the cause.
    """
    conn = await get_connection()
    try:
        # NOTE(review): some drivers (e.g. psycopg 3) expect an isolation-level
        # enum here rather than a string - confirm for the driver in use.
        await conn.set_isolation_level("SERIALIZABLE")
        async with conn.transaction():
            # Deduct from source. The balance guard turns this into a no-op
            # (rowcount 0) when funds are insufficient or the id is unknown.
            # NOTE(review): assumes execute() returns a cursor exposing
            # rowcount - confirm for the driver in use.
            debit = await conn.execute("""
                UPDATE accounts
                SET balance = balance - %s
                WHERE id = %s AND balance >= %s
            """, (amount, from_account, amount))
            if debit.rowcount == 0:
                # Raising inside the transaction block aborts it, so the
                # destination is never credited without a matching debit.
                raise ValueError(
                    f"insufficient funds or unknown account {from_account!r}"
                )
            # Add to destination
            credit = await conn.execute("""
                UPDATE accounts
                SET balance = balance + %s
                WHERE id = %s
            """, (amount, to_account))
            if credit.rowcount == 0:
                # Undo the debit rather than lose the money.
                raise ValueError(f"unknown account {to_account!r}")
        return True
    except Exception as e:
        await conn.rollback()
        raise TransferError(f"Transfer failed: {e}") from e
    finally:
        # Release the connection on every path.
        await conn.close()
Concurrency Control
Optimistic Locking
# Optimistic locking with version field
class OptimisticLock:
    """Optimistic concurrency control using an integer ``version`` column."""

    async def update_with_lock(self, conn, table: str, data: dict, version: int):
        """Update row ``data['id']`` of ``table`` only if its version still matches.

        Args:
            conn: connection whose ``execute(query, values)`` is awaited and
                returns an object with ``rowcount``.
            table: table name; a plain identifier, optionally schema-qualified
                (``schema.table``).
            data: column -> new value. Must contain ``'id'``. The ``'id'`` and
                ``'version'`` keys are never written: the id selects the row
                and the version is incremented by the statement itself.
            version: the version the caller originally read.

        Returns:
            True when the row was updated.

        Raises:
            ValueError: if ``table`` or a column name is not a plain identifier.
            KeyError: if ``data`` has no ``'id'`` key.
            ConcurrentModificationError: if no row matched both id and version.
        """
        columns = [key for key in data if key not in ("id", "version")]

        # Identifiers cannot be bound as parameters, so they are interpolated.
        # Reject anything that is not a plain identifier to prevent injection.
        if not all(part.isidentifier() for part in str(table).split(".")):
            raise ValueError(f"Invalid SQL identifier: {table!r}")
        for column in columns:
            if not (isinstance(column, str) and column.isidentifier()):
                raise ValueError(f"Invalid SQL identifier: {column!r}")

        assignments = [f"{column} = %s" for column in columns]
        assignments.append("version = version + 1")
        query = f"""
            UPDATE {table}
            SET {', '.join(assignments)}
            WHERE id = %s AND version = %s
        """
        values = [data[column] for column in columns] + [data["id"], version]

        cursor = await conn.execute(query, values)
        if cursor.rowcount == 0:
            raise ConcurrentModificationError("Record was modified by another transaction")
        return True
# Usage with SQLAlchemy
class UserRepository:
    """Persist ``User`` changes with version-based optimistic locking."""

    def __init__(self, session):
        self.session = session

    async def update_user(self, user_id: int, updates: dict):
        """Apply ``updates`` to user ``user_id`` unless it changed since it was read.

        Args:
            user_id: primary key of the user to update.
            updates: column -> new value. The dict is not modified; the
                version bump is added to a copy.

        Raises:
            LookupError: if no user with ``user_id`` exists.
            ConcurrentModificationError: if the stored version no longer
                matches the version that was read.
        """
        user = await self.session.get(User, user_id)
        if user is None:
            raise LookupError(f"User {user_id!r} not found")

        expected_version = user.version
        # Copy so the caller's dict is not mutated with the version bump.
        values = {**updates, "version": expected_version + 1}
        result = await self.session.execute(
            update(User)
            .where(User.id == user_id, User.version == expected_version)
            .values(**values)
        )
        if result.rowcount == 0:
            raise ConcurrentModificationError("User was modified")
        await self.session.commit()
Pessimistic Locking
# Pessimistic locking with SELECT FOR UPDATE
class PessimisticLock:
    """Pessimistic concurrency control using row locks (SELECT ... FOR UPDATE)."""

    async def transfer_with_lock(self, conn, from_id: str, to_id: str, amount: float):
        """Move ``amount`` from ``from_id`` to ``to_id`` while holding row locks.

        Both rows are locked in sorted-id order regardless of the transfer
        direction. Concurrent opposite transfers (A->B and B->A) therefore
        request the locks in the same order and cannot deadlock on each other.

        Returns:
            True once the transaction block has exited.

        Raises:
            LookupError: if either account does not exist.
            InsufficientFundsError: if the source balance is below ``amount``.
        """
        async with conn.transaction():
            # Lock both rows (blocking other writers) in a fixed order.
            locked = {}
            for account_id in sorted([from_id, to_id]):
                locked[account_id] = await conn.fetchrow(
                    "SELECT * FROM accounts WHERE id = $1 FOR UPDATE",
                    account_id,
                )

            # Verify both accounts exist before moving any money.
            for account_id in (from_id, to_id):
                if locked[account_id] is None:
                    raise LookupError(f"Account not found: {account_id!r}")

            if locked[from_id]['balance'] < amount:
                raise InsufficientFundsError()

            # Perform transfer
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount, to_id
            )
        return True
Transaction Patterns
Saga Pattern
# Saga pattern for distributed transactions
class Saga:
    """Run a sequence of steps, compensating completed ones if any step fails.

    Each step must provide awaitable ``execute()`` and ``compensate()`` methods.
    """

    def __init__(self, steps: list):
        self.steps = steps
        self.completed_steps = []

    async def execute(self) -> bool:
        """Execute every step in order.

        Returns:
            True when all steps succeed.

        Raises:
            SagaExecutionError: if a step raises. The steps completed in this
                run are compensated in reverse order first, and the step's
                exception is chained as the cause.
        """
        # Start each run from a clean slate so a failure never compensates
        # steps that belonged to an earlier run.
        self.completed_steps = []
        try:
            for step in self.steps:
                await step.execute()
                self.completed_steps.append(step)
        except Exception as e:
            # Compensate in reverse order
            await self.compensate()
            raise SagaExecutionError(f"Saga failed: {e}") from e
        return True

    async def compensate(self):
        """Undo completed steps, most recent first.

        A failing compensation is logged and skipped so the remaining steps
        still get compensated. (``log`` is presumably a module-level logger
        defined elsewhere - confirm.)
        """
        for step in reversed(self.completed_steps):
            try:
                await step.compensate()
            except Exception as e:
                log.error(f"Compensation failed for {step}: {e}")
# Example: Order processing saga
class OrderSaga:
    """Builds and runs the order-processing saga."""

    @staticmethod
    async def create_order(order_data: dict):
        """Reserve stock, take payment, create the order and notify the customer.

        Every step receives the same ``order_data``; the steps run in the
        order listed, and the saga's result is returned.
        """
        step_types = (
            ReserveInventoryStep,
            ProcessPaymentStep,
            CreateOrderStep,
            NotifyCustomerStep,
        )
        saga = Saga([step_type(order_data) for step_type in step_types])
        return await saga.execute()
class ReserveInventoryStep(SagaStep):
    """Saga step that holds inventory for an order and releases it on rollback."""

    async def execute(self):
        """Reserve ``self.data['items']`` under ``self.data['order_id']``."""
        reserve = inventory_client.reserve
        await reserve(items=self.data['items'], order_id=self.data['order_id'])

    async def compensate(self):
        """Release the inventory reserved under ``self.data['order_id']``."""
        release = inventory_client.release
        await release(order_id=self.data['order_id'])
Outbox Pattern
# Outbox pattern for reliable messaging
class OutboxPattern:
    """Transactional outbox: store the event in the same transaction as the data change."""

    async def publish_event(self, conn, event_type: str, payload: dict):
        """Mark the order as PROCESSING and enqueue an outbox event, atomically.

        Args:
            conn: connection providing ``transaction()`` and ``execute()``.
            event_type: event name stored in the outbox row.
            payload: event data; must contain ``'order_id'``. It is stored as
                JSON.

        Returns:
            True once the transaction block exits without error.
        """
        status_sql = "UPDATE orders SET status = 'PROCESSING' WHERE id = $1"
        outbox_sql = """
                INSERT INTO outbox (event_type, payload, created_at)
                VALUES ($1, $2, NOW())
                """
        # Both statements share one transaction: either both persist or neither does.
        async with conn.transaction():
            await conn.execute(status_sql, payload['order_id'])
            await conn.execute(outbox_sql, event_type, json.dumps(payload))
        return True
# Outbox processor
class OutboxProcessor:
    """Relay unprocessed outbox rows to the message broker.

    Delivery is at-least-once: an event that is published but whose
    "processed" update fails stays unprocessed and is published again on a
    later run.

    NOTE(review): the selected rows are not locked, so two processors running
    concurrently may publish the same event. Consider
    ``SELECT ... FOR UPDATE SKIP LOCKED`` if that matters. ``self.publish`` is
    presumably supplied by a subclass - confirm.
    """

    async def process_outbox(self, batch_size: int = 100, max_retries: "int | None" = None) -> int:
        """Publish up to ``batch_size`` pending events, oldest first.

        Args:
            batch_size: maximum number of events fetched in this run.
            max_retries: when set, events whose ``retry_count`` has reached
                this value are skipped, so permanently failing events cannot
                occupy the whole batch on every run. ``None`` (the default)
                retries indefinitely.

        Returns:
            The number of events published and marked processed in this run.
        """
        conn = await get_connection()
        try:
            # Get unprocessed events
            if max_retries is None:
                events = await conn.fetch("""
                    SELECT * FROM outbox
                    WHERE processed = false
                    ORDER BY created_at
                    LIMIT $1
                """, batch_size)
            else:
                # COALESCE so rows with a NULL retry_count are still eligible.
                events = await conn.fetch("""
                    SELECT * FROM outbox
                    WHERE processed = false AND COALESCE(retry_count, 0) < $2
                    ORDER BY created_at
                    LIMIT $1
                """, batch_size, max_retries)

            published = 0
            for event in events:
                try:
                    # Publish to message broker
                    await self.publish(event)
                    # Mark as processed
                    await conn.execute(
                        "UPDATE outbox SET processed = true WHERE id = $1",
                        event['id']
                    )
                    published += 1
                except Exception as e:
                    # Record the failure and continue with the rest of the
                    # batch; the row remains unprocessed for a later run.
                    await conn.execute("""
                        UPDATE outbox
                        SET retry_count = retry_count + 1,
                            last_error = $2
                        WHERE id = $1
                    """, event['id'], str(e))
            return published
        finally:
            await conn.close()
Handling Deadlocks
class DeadlockHandler:
    """Strategies for handling deadlocks."""

    @staticmethod
    async def execute_with_retry(func, max_retries: int = 3, base_delay: float = 0.1,
                                 jitter: bool = True):
        """Await ``func()`` and retry it with exponential backoff on ``DeadlockError``.

        Args:
            func: zero-argument callable returning an awaitable.
            max_retries: total number of attempts (including the first one).
            base_delay: seconds to wait before the first retry; the delay
                doubles after every failed attempt.
            jitter: when True, each delay is drawn uniformly from
                ``[0, computed delay]``. Without jitter, transactions that
                deadlocked against each other retry at the same moments and
                tend to collide again.

        Returns:
            Whatever ``func()`` returns.

        Raises:
            DeadlockError: re-raised when the final attempt also deadlocks.
            MaxRetriesExceededError: if ``max_retries`` < 1, so ``func`` is
                never called.
        """
        import asyncio
        import random

        for attempt in range(max_retries):
            try:
                return await func()
            except DeadlockError:
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff
                wait_time = base_delay * (2 ** attempt)
                if jitter:
                    wait_time = random.uniform(0, wait_time)
                await asyncio.sleep(wait_time)
        # Only reachable when the loop never ran (max_retries < 1).
        raise MaxRetriesExceededError()
# Prevent deadlocks with consistent lock ordering
async def safe_transfer(conn, from_id: str, to_id: str, amount: float):
    """Transfer ``amount`` between accounts, acquiring row locks in a fixed order.

    Both rows are locked in sorted-id order regardless of the transfer
    direction, so concurrent A->B and B->A transfers request locks in the same
    order and cannot deadlock with each other.

    Returns:
        True once the transaction block has exited.

    Raises:
        LookupError: if either account does not exist.
        InsufficientFundsError: if the source balance is below ``amount``.
    """
    # Sort IDs to prevent deadlock
    first_id, second_id = sorted([from_id, to_id])
    async with conn.transaction():
        # Always lock in same order
        locked = {}
        for account_id in (first_id, second_id):
            locked[account_id] = await conn.fetchrow(
                "SELECT * FROM accounts WHERE id = $1 FOR UPDATE",
                account_id,
            )

        for account_id in (from_id, to_id):
            if locked[account_id] is None:
                raise LookupError(f"Account not found: {account_id!r}")
        if locked[from_id]['balance'] < amount:
            raise InsufficientFundsError()

        # Perform transfer
        await conn.execute(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
            amount, from_id
        )
        await conn.execute(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            amount, to_id
        )
    return True
Best Practices
Good: Short Transactions
# Good: Keep transactions short
async def good_example(conn):
    """Keep transactions short: do slow work first, then write in a brief transaction.

    Anti-pattern (do NOT do this). The transaction stays open across every
    slow call::

        async with conn.transaction():
            data = await fetch_data()   # could be slow
            await process(data)         # more processing
            await more_processing()     # even more
            # transaction held open too long!
    """
    # Slow work happens outside any transaction.
    data = await fetch_data()
    processed = await process(data)
    # Only the quick write runs inside the transaction.
    async with conn.transaction():
        await save_results(processed)
Bad: Long-Running Transactions
# Bad: Don't hold locks during user interaction
async def bad_example(conn, user_request):
    """Anti-pattern: a transaction left open while a request is handled.

    ``handle_request`` runs entirely inside the transaction. If it waits on a
    user, the transaction and whatever locks it holds stay open for that
    entire wait, blocking other transactions.
    """
    transaction = conn.transaction()
    async with transaction:
        # The user might go to lunch here, and other transactions stay blocked.
        await handle_request(user_request)
Good: Explicit vs Implicit
# Explicit transaction control
async def explicit_transaction(conn):
    """Move 100 from account 'A' to account 'B' with explicit BEGIN/COMMIT/ROLLBACK.

    Raises:
        Whatever ``conn.execute`` raises. If the failure happens after BEGIN
        succeeded, ROLLBACK is issued before the exception propagates.
    """
    # BEGIN stays outside the try: if it fails, there is no transaction to roll back.
    await conn.execute("BEGIN")
    try:
        await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 'A'")
        await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 'B'")
        await conn.execute("COMMIT")
    except BaseException:
        # BaseException (rather than Exception) so cancellation also rolls
        # back. This is what the bare `except:` did, now stated explicitly.
        await conn.execute("ROLLBACK")
        raise
Conclusion
Transaction management is critical:
- Understand ACID: Foundation of data integrity
- Choose isolation level: Balance consistency vs performance
- Use optimistic locking: For low contention
- Use pessimistic locking: For high contention
- Handle deadlocks: Plan for failure
- Saga pattern: For distributed systems
Comments