Introduction
Database transactions are fundamental to data integrity. They ensure that database operations complete atomically, maintain consistency, and don’t interfere with each other. Understanding transactions is essential for building reliable applications that handle concurrent access correctly.
This guide covers ACID properties, isolation levels, transaction patterns, and strategies for managing concurrency in modern applications.
ACID Properties
Atomicity
All operations in a transaction succeed or all fail together:
```python
import psycopg2
from psycopg2 import Error

class TransferError(Exception):
    pass

def transfer_funds(from_account, to_account, amount):
    conn = psycopg2.connect(database="bank")
    try:
        conn.autocommit = False
        cursor = conn.cursor()
        # Debit the source account
        cursor.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, from_account),
        )
        # Credit the destination account
        cursor.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, to_account),
        )
        # Both updates succeed or both are undone
        conn.commit()
    except Error as e:
        conn.rollback()  # Undo all changes
        raise TransferError(f"Transfer failed: {e}")
    finally:
        conn.close()
```
Consistency
Transactions maintain database invariants:
```python
# A database constraint enforces the invariant:
# CREATE TABLE accounts (
#     id SERIAL PRIMARY KEY,
#     balance DECIMAL(15,2) CHECK (balance >= 0)
# );

class InsufficientFundsError(Exception):
    pass

def withdraw(account_id, amount):
    """Withdrawal maintains the balance >= 0 constraint."""
    conn = psycopg2.connect(database="bank")
    try:
        with conn.cursor() as cursor:
            # This fails if the balance would go negative
            cursor.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, account_id),
            )
        conn.commit()
    except Error as e:
        conn.rollback()
        raise InsufficientFundsError(f"Withdrawal failed: {e}")
    finally:
        conn.close()
```
Isolation
Concurrent transactions appear to execute sequentially:
```python
# Transaction 1: read at the default isolation level
def get_account_balance(account_id):
    conn = psycopg2.connect(database="bank")
    # The default isolation level depends on the database
    cursor = conn.cursor()
    cursor.execute(
        "SELECT balance FROM accounts WHERE id = %s",
        (account_id,),
    )
    balance = cursor.fetchone()[0]
    conn.close()
    return balance

# A stricter isolation level prevents anomalies such as dirty reads
def get_balances():
    conn = psycopg2.connect(database="bank")
    conn.isolation_level = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
    cursor = conn.cursor()
    cursor.execute("SELECT id, balance FROM accounts")
    balances = cursor.fetchall()
    conn.close()
    return balances
```
Durability
Committed data survives system failures:
```python
# PostgreSQL: synchronous commit ensures durability
def commit_with_durability(conn):
    cursor = conn.cursor()
    # Synchronous commit - the commit waits for the WAL write to disk
    cursor.execute("SET synchronous_commit = ON")
    cursor.execute("UPDATE accounts SET balance = balance - 100")
    conn.commit()
    # Data is durable once commit() returns
```
Isolation Levels
Read Uncommitted
Lowest isolation - can see uncommitted changes:
```sql
-- MySQL: set the isolation level
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

-- Can read uncommitted data from other transactions
SELECT * FROM accounts WHERE id = 1;
-- Might see balance = 500 even if the other transaction never commits
```
Read Committed
Default in PostgreSQL and Oracle - only see committed changes:
```sql
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- PostgreSQL: each statement sees a snapshot taken at the start of
-- that statement, which allows non-repeatable reads
BEGIN;
SELECT balance FROM accounts WHERE id = 1;  -- Returns 500
-- (Another transaction updates the balance to 400 and commits)
SELECT balance FROM accounts WHERE id = 1;  -- Returns 400!
COMMIT;
```
Repeatable Read
MySQL default - same read returns consistent results:
```sql
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- The SQL standard permits phantom reads at this level.
-- (PostgreSQL's MVCC implementation is stricter and actually prevents them.)
BEGIN;
SELECT balance FROM accounts WHERE id = 1;  -- Returns 500
-- (Another transaction inserts a new row and commits)
SELECT * FROM accounts;  -- Per the standard, may include the new row
COMMIT;
```
Serializable
Highest isolation - transactions appear to run sequentially:
```sql
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- PostgreSQL: uses SSI (Serializable Snapshot Isolation)
BEGIN;
SELECT SUM(balance) FROM accounts;  -- Returns 10000
-- (Another transaction adds a new account)
SELECT SUM(balance) FROM accounts;  -- Same snapshot; if the transactions
-- conflict, one is aborted with a serialization failure and must retry
COMMIT;
```
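Because SERIALIZABLE can abort transactions at any time, callers need a retry loop. Here is a minimal, generic sketch of one; the helper name and backoff values are illustrative, and with psycopg2 the retryable exception would be `psycopg2.errors.SerializationFailure`:

```python
import time

def run_with_retry(txn_func, retryable_exc, max_retries=3, base_delay=0.1):
    """Run txn_func, retrying when it raises a retryable exception.

    txn_func should encapsulate one complete transaction (including
    commit/rollback); retryable_exc is the exception class that signals
    a serialization failure.
    """
    for attempt in range(max_retries):
        try:
            return txn_func()
        except retryable_exc:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff before retrying
            time.sleep(base_delay * (2 ** attempt))

# Usage sketch: a stand-in transaction that fails twice, then succeeds
attempts = []
def flaky_transaction():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("serialization failure")
    return "committed"

result = run_with_retry(flaky_transaction, RuntimeError, base_delay=0)
```

The whole transaction body must be retried, not just the failing statement, since the aborted transaction's work is rolled back.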
Isolation Level Comparison
| Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads |
|---|---|---|---|
| Read Uncommitted | Possible | Possible | Possible |
| Read Committed | Prevented | Possible | Possible |
| Repeatable Read | Prevented | Prevented | Possible |
| Serializable | Prevented | Prevented | Prevented |
Transaction Patterns
Savepoints
```python
def complex_operation():
    conn = psycopg2.connect(database="bank")
    try:
        conn.autocommit = False
        cursor = conn.cursor()
        # Create a savepoint before the risky operation
        cursor.execute("SAVEPOINT step1")
        try:
            # First operation
            cursor.execute("INSERT INTO transactions (type) VALUES ('deposit')")
        except Error:
            # Roll back to the savepoint without aborting the whole transaction
            cursor.execute("ROLLBACK TO SAVEPOINT step1")
        # Continue with other operations in the same transaction
        cursor.execute("INSERT INTO transactions (type) VALUES ('withdrawal')")
        # Commit once at the end; committing earlier would end the
        # transaction and discard the savepoint
        conn.commit()
    finally:
        conn.close()
```
Explicit Locking
```python
# Row-level locks
def transfer_with_lock(from_account, to_account, amount):
    conn = psycopg2.connect(database="bank")
    try:
        conn.autocommit = False
        cursor = conn.cursor()
        # Lock rows in a consistent order to prevent deadlocks
        for account_id in sorted([from_account, to_account]):
            cursor.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (account_id,),
            )
        # Perform the transfer
        cursor.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, from_account),
        )
        cursor.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, to_account),
        )
        conn.commit()
    finally:
        conn.close()
```
```python
# Table-level lock
def archive_old_records():
    conn = psycopg2.connect(database="bank")
    try:
        cursor = conn.cursor()
        # Lock the table exclusively for the rest of the transaction
        cursor.execute("LOCK TABLE accounts IN ACCESS EXCLUSIVE MODE")
        # Move old records
        cursor.execute("""
            INSERT INTO accounts_archive
            SELECT * FROM accounts WHERE created_at < NOW() - INTERVAL '1 year'
        """)
        cursor.execute("""
            DELETE FROM accounts WHERE created_at < NOW() - INTERVAL '1 year'
        """)
        conn.commit()
    finally:
        conn.close()
```
Optimistic Locking
```python
class OptimisticLockError(Exception):
    pass

def update_with_optimistic_lock(cursor, table, record_id, updates, version):
    """Update only if the stored version still matches."""
    # Build the update query. The table and column names are interpolated
    # directly, so they must come from trusted code, never from user input.
    set_clause = ", ".join(f"{k} = %s" for k in updates)
    # Parameter order must match the placeholders: SET values, then id, then version
    values = list(updates.values()) + [record_id, version]
    query = f"""
        UPDATE {table}
        SET {set_clause}, version = version + 1
        WHERE id = %s AND version = %s
    """
    cursor.execute(query, values)
    if cursor.rowcount == 0:
        raise OptimisticLockError(
            f"Record {record_id} was modified by another transaction"
        )
    return cursor.rowcount

# Usage
def update_user_email(user_id, new_email, current_version):
    conn = psycopg2.connect(database="app")
    try:
        cursor = conn.cursor()
        update_with_optimistic_lock(
            cursor,
            "users",
            user_id,
            {"email": new_email},
            current_version,
        )
        conn.commit()
    finally:
        conn.close()
```
Pessimistic Locking
```python
def update_with_pessimistic_lock(cursor, table, record_id):
    """Lock the row and hold the lock until the transaction ends."""
    cursor.execute(
        f"SELECT * FROM {table} WHERE id = %s FOR UPDATE",
        (record_id,),
    )
    return cursor.fetchone()
```
Concurrency Control
Deadlock Detection
```sql
-- PostgreSQL: how long to wait on a lock before checking for a deadlock
SET deadlock_timeout = '1s';

-- View transactions waiting on locks
SELECT * FROM pg_stat_activity
WHERE wait_event_type = 'Lock';

-- Kill a blocking backend by its pid
SELECT pg_terminate_backend(pid);
```
```python
import time

def transfer_with_deadlock_handling(from_account, to_account, amount, max_retries=3):
    for attempt in range(max_retries):
        conn = psycopg2.connect(database="bank")
        try:
            conn.autocommit = False
            cursor = conn.cursor()
            # Lock rows in a consistent order
            first, second = sorted([from_account, to_account])
            cursor.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (first,),
            )
            cursor.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (second,),
            )
            cursor.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, from_account),
            )
            cursor.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (amount, to_account),
            )
            conn.commit()
            return True
        except psycopg2.errors.DeadlockDetected:
            conn.rollback()
            time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
            continue
        finally:
            conn.close()
    raise TransferError("Max retries exceeded due to lock conflicts")
```
Advisory Locks
```python
# PostgreSQL advisory locks
def process_with_advisory_lock(lock_id, process_func):
    conn = psycopg2.connect(database="app")
    try:
        cursor = conn.cursor()
        # Acquire a transaction-scoped advisory lock (blocks until available)
        cursor.execute("SELECT pg_advisory_xact_lock(%s)", (lock_id,))
        # Do the work while holding the lock
        result = process_func()
        conn.commit()  # The lock is released when the transaction ends
        return result
    finally:
        conn.close()

# Try to acquire without blocking
def try_acquire_lock(lock_id):
    conn = psycopg2.connect(database="app")
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT pg_try_advisory_xact_lock(%s)", (lock_id,))
        return cursor.fetchone()[0]
    finally:
        conn.close()
```
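Advisory lock functions take a 64-bit integer key. When the natural key is a string (a job name, a tenant identifier), a common approach is to hash it into the signed 64-bit range so every process derives the same lock id. A sketch; the helper name is illustrative:

```python
import hashlib

def advisory_lock_id(name: str) -> int:
    """Map a string key to a signed 64-bit integer for pg_advisory_* calls."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    # Interpret the first 8 bytes as a signed big-endian integer
    return int.from_bytes(digest[:8], "big", signed=True)

# Deterministic, so all processes agree on the lock id for a given key
lock_id = advisory_lock_id("nightly-billing-job")
```

Distinct keys can in principle collide after hashing, but with a 64-bit space that risk is negligible for typical workloads.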
Distributed Transactions
Two-Phase Commit
```python
class TwoPCError(Exception):
    pass

class TwoPhaseCommit:
    def __init__(self, participants):
        self.participants = participants  # Mapping of name -> database connection
        self.prepared = {}

    def execute(self, operations):
        """Execute a distributed transaction with two-phase commit."""
        # Phase 1: prepare
        for name, conn in self.participants.items():
            try:
                conn.autocommit = False
                cursor = conn.cursor()
                # Execute this participant's operation
                operation = operations[name]
                cursor.execute(operation['sql'], operation['params'])
                # Prepare for commit
                cursor.execute(f"PREPARE TRANSACTION 'tx_{name}'")
                self.prepared[name] = True
            except Exception as e:
                # Roll back this participant and every prepared one
                self._rollback_all()
                raise TwoPCError(f"Prepare failed for {name}: {e}")
        # Phase 2: commit
        for name, conn in self.participants.items():
            try:
                cursor = conn.cursor()
                cursor.execute(f"COMMIT PREPARED 'tx_{name}'")
            except Exception as e:
                # A failure here leaves the system inconsistent and
                # requires manual intervention
                self._mark_inconsistent(name)
                raise TwoPCError(f"Commit failed for {name}: {e}")

    def _rollback_all(self):
        for name, conn in self.participants.items():
            if name in self.prepared:
                try:
                    cursor = conn.cursor()
                    cursor.execute(f"ROLLBACK PREPARED 'tx_{name}'")
                except Exception:
                    pass

    def _mark_inconsistent(self, name):
        # Log for manual resolution
        print(f"WARNING: Transaction inconsistent for {name}")
```
Saga Pattern
```python
class SagaError(Exception):
    pass

class Saga:
    def __init__(self):
        self.steps = []
        self.compensations = []

    def add_step(self, forward, compensation):
        """Add a saga step and its compensating action."""
        self.steps.append(forward)
        self.compensations.append(compensation)

    def execute(self, context):
        """Execute the saga, compensating completed steps on failure."""
        completed = []
        try:
            for i, step in enumerate(self.steps):
                step(context)
                completed.append(i)
        except Exception as e:
            # Compensate in reverse order
            for i in reversed(completed):
                try:
                    self.compensations[i](context)
                except Exception as comp_error:
                    # Log the compensation failure and keep compensating
                    print(f"Compensation failed: {comp_error}")
            raise SagaError(f"Saga failed at step {len(completed)}: {e}")
        return context

# Example: order-processing saga
def create_order_saga():
    saga = Saga()

    # Step 1: create the order
    def create_order(ctx):
        order = db.orders.create(customer_id=ctx['customer_id'])
        ctx['order_id'] = order.id

    def cancel_order(ctx):
        db.orders.cancel(ctx['order_id'])

    saga.add_step(create_order, cancel_order)

    # Step 2: reserve inventory
    def reserve_inventory(ctx):
        db.inventory.reserve(items=ctx['items'], order_id=ctx['order_id'])

    def release_inventory(ctx):
        db.inventory.release(order_id=ctx['order_id'])

    saga.add_step(reserve_inventory, release_inventory)

    # Step 3: process the payment
    def process_payment(ctx):
        db.payments.charge(
            customer_id=ctx['customer_id'],
            amount=ctx['amount'],
            order_id=ctx['order_id'],
        )

    def refund_payment(ctx):
        db.payments.refund(order_id=ctx['order_id'])

    saga.add_step(process_payment, refund_payment)
    return saga
```
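To see the compensation flow in isolation, here is a condensed, self-contained run with in-memory stubs standing in for the database calls; the `Saga` class is a compact restatement of the one above, and all the step names are illustrative:

```python
class SagaError(Exception):
    pass

class Saga:
    """Condensed version of the Saga class above."""
    def __init__(self):
        self.steps, self.compensations = [], []

    def add_step(self, forward, compensation):
        self.steps.append(forward)
        self.compensations.append(compensation)

    def execute(self, context):
        completed = []
        try:
            for i, step in enumerate(self.steps):
                step(context)
                completed.append(i)
        except Exception as e:
            # Only completed steps are compensated, in reverse order
            for i in reversed(completed):
                self.compensations[i](context)
            raise SagaError(f"Saga failed at step {len(completed)}: {e}")
        return context

log = []

def failing_payment(ctx):
    raise RuntimeError("payment declined")

saga = Saga()
saga.add_step(lambda ctx: log.append("order created"),
              lambda ctx: log.append("order cancelled"))
saga.add_step(failing_payment,
              lambda ctx: log.append("payment refunded"))

try:
    saga.execute({})
except SagaError:
    pass
# log is now ["order created", "order cancelled"]: the completed first
# step was compensated, while the failed step never needed a refund
```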
Transaction Monitoring
```python
from prometheus_client import Counter, Histogram

transaction_commits = Counter(
    'db_transactions_commits_total',
    'Total committed transactions',
    ['database', 'isolation_level'],
)
transaction_rollbacks = Counter(
    'db_transactions_rollbacks_total',
    'Total rolled back transactions',
    ['database', 'reason'],
)
transaction_duration = Histogram(
    'db_transaction_duration_seconds',
    'Transaction duration',
    ['database', 'operation'],
)

class InstrumentedConnection:
    def __init__(self, conn, db_name):
        self.conn = conn
        self.db_name = db_name

    def commit(self):
        with transaction_duration.labels(self.db_name, 'commit').time():
            self.conn.commit()
        transaction_commits.labels(self.db_name, 'unknown').inc()

    def rollback(self, reason='unknown'):
        with transaction_duration.labels(self.db_name, 'rollback').time():
            self.conn.rollback()
        transaction_rollbacks.labels(self.db_name, reason).inc()
```
Best Practices
- Keep Transactions Short: Minimize lock hold time
- Access Data in Consistent Order: Prevent deadlocks
- Use Appropriate Isolation Level: Balance consistency vs. performance
- Handle Failures Explicitly: Always rollback on errors
- Avoid Nested Transactions: Use savepoints instead
- Monitor for Long-Running Queries: Identify blocking issues
- Test Concurrency: Use tools to simulate concurrent access
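Several of these practices (explicit failure handling, a single commit point, no forgotten rollbacks) can be packaged in a small context manager. A minimal sketch, assuming a DB-API-style connection object, demonstrated here with a stub connection that records calls:

```python
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    """Commit on success, roll back on any exception."""
    try:
        yield conn.cursor()
        conn.commit()
    except Exception:
        conn.rollback()
        raise

# Stub connection for demonstration; a real psycopg2 connection
# exposes the same cursor/commit/rollback methods
class StubConn:
    def __init__(self):
        self.calls = []
    def cursor(self):
        return self
    def commit(self):
        self.calls.append("commit")
    def rollback(self):
        self.calls.append("rollback")

ok = StubConn()
with transaction(ok):
    pass  # work succeeded -> commit

failed = StubConn()
try:
    with transaction(failed):
        raise ValueError("boom")  # work failed -> rollback, then re-raise
except ValueError:
    pass
```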
Conclusion
Understanding database transactions is essential for building reliable applications. Start with the default isolation level and only change when needed. Use pessimistic locking for short, critical sections and optimistic locking for lower contention scenarios. For distributed systems, consider the Saga pattern over distributed transactions for better resilience.