
Database Transactions: ACID Properties, Isolation Levels, and Concurrency Control

Introduction

Database transactions are fundamental to data integrity. They ensure that database operations complete atomically, maintain consistency, and don’t interfere with each other. Understanding transactions is essential for building reliable applications that handle concurrent access correctly.

This guide covers ACID properties, isolation levels, transaction patterns, and strategies for managing concurrency in modern applications.

ACID Properties

Atomicity

All operations in a transaction succeed or all fail together:

import psycopg2
from psycopg2 import Error

class TransferError(Exception):
    pass

def transfer_funds(from_account, to_account, amount):
    conn = psycopg2.connect(database="bank")
    
    try:
        conn.autocommit = False
        
        cursor = conn.cursor()
        
        # Debit from source account
        cursor.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, from_account)
        )
        
        # Credit to destination account
        cursor.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, to_account)
        )
        
        # Both succeed or both fail
        conn.commit()
        
    except Error as e:
        conn.rollback()  # Undo all changes
        raise TransferError(f"Transfer failed: {e}")
    finally:
        conn.close()

Consistency

Transactions maintain database invariants:

# Database constraint ensures consistency
# CREATE TABLE accounts (
#     id SERIAL PRIMARY KEY,
#     balance DECIMAL(15,2) CHECK (balance >= 0)
# );

class InsufficientFundsError(Exception):
    pass

def withdraw(account_id, amount):
    """Withdrawal maintains balance >= 0 constraint."""
    conn = psycopg2.connect(database="bank")
    
    try:
        with conn.cursor() as cursor:
            # This will fail if balance would go negative
            cursor.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, account_id)
            )
            
            conn.commit()
            
    except Error as e:
        conn.rollback()
        raise InsufficientFundsError(f"Withdrawal failed: {e}")
    finally:
        conn.close()

Isolation

Concurrent transactions appear to execute sequentially:

# Transaction 1: Read isolation
def get_account_balance(account_id):
    conn = psycopg2.connect(database="bank")
    
    # Default isolation level depends on database
    cursor = conn.cursor()
    
    cursor.execute(
        "SELECT balance FROM accounts WHERE id = %s",
        (account_id,)
    )
    
    balance = cursor.fetchone()[0]
    conn.close()
    
    return balance

# Transaction isolation prevents dirty reads
def get_balances():
    conn = psycopg2.connect(database="bank")
    conn.isolation_level = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
    
    cursor = conn.cursor()
    cursor.execute("SELECT id, balance FROM accounts")
    
    balances = cursor.fetchall()
    conn.close()
    
    return balances

Durability

Committed data survives system failures:

# PostgreSQL: Synchronous commit ensures durability
def commit_with_durability(conn):
    cursor = conn.cursor()
    
    # Synchronous commit - waits for WAL write
    cursor.execute("SET synchronous_commit = ON")
    
    cursor.execute("UPDATE accounts SET balance = balance - 100")
    conn.commit()
    
    # Data is durable after commit returns

Isolation Levels

Read Uncommitted

Lowest isolation - can see uncommitted changes:

-- MySQL: Set isolation level
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

-- Can read uncommitted data from other transactions
SELECT * FROM accounts WHERE id = 1;
-- Might see: balance = 500 (even if other tx hasn't committed)

Read Committed

Default in PostgreSQL and Oracle - only see committed changes:

SET TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- PostgreSQL: Each query sees snapshot from start of query
-- This can cause non-repeatable reads
BEGIN;
SELECT balance FROM accounts WHERE id = 1;  -- Returns 500
-- (Another transaction updates balance to 400 and commits)
SELECT balance FROM accounts WHERE id = 1;  -- Returns 400!
COMMIT;

Repeatable Read

MySQL default - same read returns consistent results:

SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- Standard SQL still permits phantom reads at this level, though
-- PostgreSQL's snapshot-based implementation prevents them in practice
BEGIN;
SELECT balance FROM accounts WHERE id = 1;  -- Returns 500
-- (Another transaction inserts a new row and commits)
SELECT * FROM accounts;  -- Standard SQL: may include the new row; PostgreSQL: does not
COMMIT;

Serializable

Highest isolation - transactions appear to run sequentially:

SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- PostgreSQL: Uses SSI (Serializable Snapshot Isolation)
BEGIN;
SELECT SUM(balance) FROM accounts;  -- Returns 10000
-- (A concurrent transaction writes overlapping data and commits)
SELECT SUM(balance) FROM accounts;
COMMIT;  -- May fail with a serialization error (SQLSTATE 40001); retry on failure
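
Serialization failures are expected under SERIALIZABLE and should simply be retried. Below is a minimal, driver-agnostic retry helper; the `SerializationFailure` class is a stand-in for the driver's own error (with psycopg2 you would catch `psycopg2.errors.SerializationFailure` directly):

```python
import random
import time

class SerializationFailure(Exception):
    """Stand-in for the driver's serialization error (SQLSTATE 40001)."""

def run_serializable(txn_func, max_retries=5, base_delay=0.05):
    """Run txn_func, retrying on serialization failures with jittered
    exponential backoff. txn_func must be safe to re-execute."""
    for attempt in range(max_retries):
        try:
            return txn_func()
        except SerializationFailure:
            if attempt == max_retries - 1:
                raise
            # Jitter keeps colliding transactions from retrying in lockstep
            time.sleep(base_delay * 2 ** attempt * random.uniform(0.5, 1.5))
```

The whole transaction body, including any reads, must live inside `txn_func`, since a retry replays it from the beginning.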

Isolation Level Comparison

| Level            | Dirty Reads | Non-Repeatable Reads | Phantom Reads |
|------------------|-------------|----------------------|---------------|
| Read Uncommitted | Possible    | Possible             | Possible      |
| Read Committed   | Prevented   | Possible             | Possible      |
| Repeatable Read  | Prevented   | Prevented            | Possible      |
| Serializable     | Prevented   | Prevented            | Prevented     |
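
The level can also be selected per transaction from application code. A small DB-API sketch that issues the standard `SET TRANSACTION` statement (with psycopg2 you could instead call `conn.set_session(isolation_level=...)`); the whitelist is an assumption for safety:

```python
VALID_LEVELS = {
    "READ UNCOMMITTED", "READ COMMITTED",
    "REPEATABLE READ", "SERIALIZABLE",
}

def begin_with_isolation(cursor, level):
    """Set the isolation level for the current transaction.
    Must run before the transaction's first query."""
    level = level.upper()
    if level not in VALID_LEVELS:
        raise ValueError(f"unknown isolation level: {level}")
    # Level names come from a fixed whitelist, so interpolation is safe
    cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
```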

Transaction Patterns

Savepoints

def complex_operation():
    conn = psycopg2.connect(database="bank")
    
    try:
        conn.autocommit = False
        cursor = conn.cursor()
        
        # Create savepoint
        cursor.execute("SAVEPOINT step1")
        
        try:
            # First operation
            cursor.execute("INSERT INTO transactions (type) VALUES ('deposit')")
        except Error:
            # Undo only this step; the enclosing transaction stays open
            cursor.execute("ROLLBACK TO SAVEPOINT step1")
        
        # Continue with other operations
        cursor.execute("INSERT INTO transactions (type) VALUES ('withdrawal')")
        conn.commit()
        
    finally:
        conn.close()

Explicit Locking

# Row-level lock
def transfer_with_lock(from_account, to_account, amount):
    conn = psycopg2.connect(database="bank")
    
    try:
        conn.autocommit = False
        cursor = conn.cursor()
        
        # Lock rows in consistent order to prevent deadlocks
        accounts = sorted([from_account, to_account])
        
        for account_id in accounts:
            cursor.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (account_id,)
            )
        
        # Perform transfer
        cursor.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, from_account)
        )
        
        cursor.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, to_account)
        )
        
        conn.commit()
        
    finally:
        conn.close()

# Table-level lock
def archive_old_records():
    conn = psycopg2.connect(database="bank")
    
    try:
        cursor = conn.cursor()
        
        # ACCESS EXCLUSIVE blocks all concurrent access, including reads
        cursor.execute("LOCK TABLE accounts IN ACCESS EXCLUSIVE MODE")
        
        # Move old records
        cursor.execute("""
            INSERT INTO accounts_archive 
            SELECT * FROM accounts WHERE created_at < NOW() - INTERVAL '1 year'
        """)
        
        cursor.execute("""
            DELETE FROM accounts WHERE created_at < NOW() - INTERVAL '1 year'
        """)
        
        conn.commit()
        
    finally:
        conn.close()

Optimistic Locking

class OptimisticLockError(Exception):
    pass

def update_with_optimistic_lock(cursor, table, record_id, updates, version):
    """Update only if version matches."""
    
    # Build update query (table/column names must come from trusted code,
    # since they are interpolated into the SQL directly)
    set_clause = ", ".join([f"{k} = %s" for k in updates.keys()])
    values = list(updates.values()) + [record_id, version]
    
    query = f"""
        UPDATE {table}
        SET {set_clause}, version = version + 1
        WHERE id = %s AND version = %s
    """
    
    cursor.execute(query, values)
    
    if cursor.rowcount == 0:
        raise OptimisticLockError(f"Record {record_id} was modified by another transaction")
    
    return cursor.rowcount

# Usage
def update_user_email(user_id, new_email, current_version):
    conn = psycopg2.connect(database="app")
    
    try:
        cursor = conn.cursor()
        
        update_with_optimistic_lock(
            cursor,
            "users",
            user_id,
            {"email": new_email},
            current_version
        )
        
        conn.commit()
        
    finally:
        conn.close()

Pessimistic Locking

def update_with_pessimistic_lock(cursor, table, record_id):
    """Lock and hold until transaction ends."""
    
    cursor.execute(
        f"SELECT * FROM {table} WHERE id = %s FOR UPDATE",
        (record_id,)
    )
    
    return cursor.fetchone()
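
A typical use is a read-modify-write cycle, where holding the lock across both statements prevents lost updates. A sketch against the `accounts` table from earlier (the interest calculation is illustrative):

```python
def apply_interest(conn, account_id, rate):
    """Read a balance under FOR UPDATE, then write back a derived value.
    The row stays locked until the transaction commits or rolls back."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
        (account_id,),
    )
    row = cursor.fetchone()
    if row is None:
        raise LookupError(f"no account {account_id}")
    new_balance = row[0] * (1 + rate)
    cursor.execute(
        "UPDATE accounts SET balance = %s WHERE id = %s",
        (new_balance, account_id),
    )
    conn.commit()
    return new_balance
```

Without the `FOR UPDATE`, two concurrent calls could both read the old balance and one update would silently overwrite the other.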

Concurrency Control

Deadlock Detection

-- PostgreSQL: Set deadlock timeout
SET deadlock_timeout = '1s';

-- View waiting transactions
SELECT * FROM pg_stat_activity 
WHERE wait_event_type = 'Lock';

-- Kill a blocking backend (substitute the pid found above)
SELECT pg_terminate_backend(pid);

Handle deadlocks in application code by retrying the transaction:

import time

def transfer_with_deadlock_handling(from_account, to_account, amount, max_retries=3):
    for attempt in range(max_retries):
        conn = psycopg2.connect(database="bank")
        
        try:
            conn.autocommit = False
            cursor = conn.cursor()
            
            # Lock in consistent order
            first, second = sorted([from_account, to_account])
            
            cursor.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (first,)
            )
            
            cursor.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (second,)
            )
            
            cursor.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, from_account)
            )
            
            cursor.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (amount, to_account)
            )
            
            conn.commit()
            return True
            
        except psycopg2.errors.DeadlockDetected:
            conn.rollback()
            time.sleep(0.1 * 2 ** attempt)  # Exponential backoff
            continue
            
        finally:
            conn.close()
    
    raise TransferError("Max retries exceeded due to lock conflicts")

Advisory Locks

# PostgreSQL advisory locks
def process_with_advisory_lock(lock_id, process_func):
    conn = psycopg2.connect(database="app")
    
    try:
        cursor = conn.cursor()
        
        # Acquire advisory lock
        cursor.execute("SELECT pg_advisory_xact_lock(%s)", (lock_id,))
        
        # Do the work
        result = process_func()
        
        conn.commit()
        return result
        
    finally:
        conn.close()

# Try to acquire without blocking
def try_acquire_lock(lock_id):
    conn = psycopg2.connect(database="app")
    
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT pg_try_advisory_xact_lock(%s)", (lock_id,))
        return cursor.fetchone()[0]
        
    finally:
        conn.close()

Distributed Transactions

Two-Phase Commit

class TwoPCError(Exception):
    pass

class TwoPhaseCommit:
    def __init__(self, coordinators):
        self.coordinators = coordinators  # List of database connections
        self.prepared = {}
    
    def execute(self, operations):
        """Execute distributed transaction with 2PC."""
        
        # Phase 1: Prepare
        for name, conn in self.coordinators.items():
            try:
                conn.autocommit = False
                cursor = conn.cursor()
                
                # Execute operation
                operation = operations[name]
                cursor.execute(operation['sql'], operation['params'])
                
                # Prepare for commit
                cursor.execute("PREPARE TRANSACTION 'tx_" + name + "'")
                self.prepared[name] = True
                
            except Exception as e:
                # Rollback this and all prepared
                self._rollback_all(name)
                raise TwoPCError(f"Prepare failed for {name}: {e}")
        
        # Phase 2: Commit
        for name, conn in self.coordinators.items():
            try:
                cursor = conn.cursor()
                cursor.execute("COMMIT PREPARED 'tx_" + name + "'")
                
            except Exception as e:
                # This is problematic - need manual intervention
                self._mark_inconsistent(name)
                raise TwoPCError(f"Commit failed for {name}: {e}")
    
    def _rollback_all(self, failed_name):
        for name, conn in self.coordinators.items():
            if name in self.prepared:
                try:
                    cursor = conn.cursor()
                    cursor.execute("ROLLBACK PREPARED 'tx_" + name + "'")
                except Exception:
                    # Best effort; orphaned prepared transactions need manual cleanup
                    pass
    
    def _mark_inconsistent(self, name):
        # Log for manual resolution
        print(f"WARNING: Transaction inconsistent for {name}")

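psycopg2 connections also expose the DB-API 2.0 two-phase commit extensions (`xid`, `tpc_begin`, `tpc_prepare`, `tpc_commit`, `tpc_rollback`), which drive PREPARE TRANSACTION / COMMIT PREPARED for you. A sketch of the same transfer across two servers; the `xid` arguments and table layout are illustrative:

```python
def transfer_across_databases(conn_a, conn_b, amount, from_id, to_id):
    """Debit on one server, credit on another, committed atomically
    via the DB-API two-phase commit extensions."""
    xid_a = conn_a.xid(42, "transfer", "bank_a")
    xid_b = conn_b.xid(42, "transfer", "bank_b")

    conn_a.tpc_begin(xid_a)
    conn_b.tpc_begin(xid_b)
    try:
        conn_a.cursor().execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, from_id))
        conn_b.cursor().execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, to_id))
        # Phase 1: both servers durably record the prepared transaction
        conn_a.tpc_prepare()
        conn_b.tpc_prepare()
    except Exception:
        conn_a.tpc_rollback()
        conn_b.tpc_rollback()
        raise
    # Phase 2: commit both prepared transactions
    conn_a.tpc_commit()
    conn_b.tpc_commit()
```

As with any 2PC, a crash between the two `tpc_commit` calls still requires recovery logic that finds and resolves prepared transactions.
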
Saga Pattern

class SagaError(Exception):
    pass

class Saga:
    def __init__(self):
        self.steps = []
        self.compensations = []
    
    def add_step(self, forward, compensation):
        """Add saga step."""
        self.steps.append(forward)
        self.compensations.append(compensation)
    
    def execute(self, context):
        """Execute saga with compensation on failure."""
        completed = []
        
        try:
            for i, step in enumerate(self.steps):
                result = step(context)
                completed.append(i)
                
        except Exception as e:
            # Compensate in reverse order
            for i in reversed(completed):
                try:
                    self.compensations[i](context)
                except Exception as comp_error:
                    # Log compensation failure
                    print(f"Compensation failed: {comp_error}")
            
            raise SagaError(f"Saga failed at step {len(completed)}: {e}")
        
        return context

# Example: Order processing saga
# (`db` stands in for an application data-access layer)
def create_order_saga():
    saga = Saga()
    
    # Step 1: Create order
    def create_order(ctx):
        order = db.orders.create(customer_id=ctx['customer_id'])
        ctx['order_id'] = order.id
        return ctx
    
    def cancel_order(ctx):
        db.orders.cancel(ctx['order_id'])
    
    saga.add_step(create_order, cancel_order)
    
    # Step 2: Reserve inventory
    def reserve_inventory(ctx):
        db.inventory.reserve(
            items=ctx['items'],
            order_id=ctx['order_id']
        )
    
    def release_inventory(ctx):
        db.inventory.release(order_id=ctx['order_id'])
    
    saga.add_step(reserve_inventory, release_inventory)
    
    # Step 3: Process payment
    def process_payment(ctx):
        db.payments.charge(
            customer_id=ctx['customer_id'],
            amount=ctx['amount'],
            order_id=ctx['order_id']
        )
    
    def refund_payment(ctx):
        db.payments.refund(order_id=ctx['order_id'])
    
    saga.add_step(process_payment, refund_payment)
    
    return saga

Transaction Monitoring

from prometheus_client import Counter, Histogram

transaction_commits = Counter(
    'db_transactions_commits_total',
    'Total committed transactions',
    ['database', 'isolation_level']
)

transaction_rollbacks = Counter(
    'db_transactions_rollbacks_total',
    'Total rolled back transactions',
    ['database', 'reason']
)

transaction_duration = Histogram(
    'db_transaction_duration_seconds',
    'Transaction duration',
    ['database', 'operation']
)

class InstrumentedConnection:
    def __init__(self, conn, db_name):
        self.conn = conn
        self.db_name = db_name
    
    def commit(self):
        with transaction_duration.labels(self.db_name, 'commit').time():
            self.conn.commit()
            transaction_commits.labels(self.db_name, 'unknown').inc()
    
    def rollback(self, reason='unknown'):
        with transaction_duration.labels(self.db_name, 'rollback').time():
            self.conn.rollback()
            transaction_rollbacks.labels(self.db_name, reason).inc()

Best Practices

  1. Keep Transactions Short: Minimize lock hold time
  2. Access Data in Consistent Order: Prevent deadlocks
  3. Use Appropriate Isolation Level: Balance consistency vs. performance
  4. Handle Failures Explicitly: Always rollback on errors
  5. Avoid Nested Transactions: Use savepoints instead
  6. Monitor for Long-Running Queries: Identify blocking issues
  7. Test Concurrency: Use tools to simulate concurrent access
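
Several of these practices can be packaged in a small context manager that always commits or rolls back, keeping transaction scope explicit and short. A generic DB-API sketch:

```python
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    """Commit on success, roll back on any exception.
    The transaction lives exactly as long as the with-block."""
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
```

Usage: `with transaction(conn) as cur: cur.execute(...)` — failures propagate after the rollback, so callers still see the original error.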

Conclusion

Understanding database transactions is essential for building reliable applications. Start with the default isolation level and only change when needed. Use pessimistic locking for short, critical sections and optimistic locking for lower contention scenarios. For distributed systems, consider the Saga pattern over distributed transactions for better resilience.
