⚡ Calmops

Database Transactions: ACID Properties and Isolation Levels

Introduction

Database transactions are the foundation of reliable data applications. They ensure that your data remains consistent even when multiple users or applications access it simultaneously. Understanding transactions (how they work, when to use them, and how to avoid pitfalls) is essential for any developer working with databases.

This comprehensive guide covers everything from ACID properties to isolation levels, from locking strategies to distributed transactions, with practical examples in Python and SQL.

ACID Properties

ACID is an acronym that defines the four key properties every reliable database transaction must have:

Atomicity

Atomicity ensures that a transaction is treated as a single unit: all operations succeed, or none do. If any operation fails, the entire transaction is rolled back as if it never happened.

# Atomic transaction example
import psycopg2

class TransferError(Exception):
    pass

def transfer_funds(from_account: int, to_account: int, amount: float):
    """Transfer funds atomically - either both debit and credit succeed."""
    conn = psycopg2.connect(database="bank")
    
    try:
        conn.autocommit = False  # Start transaction
        cursor = conn.cursor()
        
        # Debit from source account
        cursor.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, from_account)
        )
        
        # Credit to destination account
        cursor.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, to_account)
        )
        
        # Commit only if both succeed
        conn.commit()
        
    except Exception as e:
        # Rollback entire transaction on any failure
        conn.rollback()
        raise TransferError(f"Transfer failed: {e}")
    finally:
        conn.close()

Consistency

Consistency ensures that a transaction brings the database from one valid state to another. All constraints, triggers, and rules must be satisfied.

# Consistency enforced by constraints
# This constraint ensures balance never goes negative
"""
CREATE TABLE accounts (
    id SERIAL PRIMARY KEY,
    balance DECIMAL(15,2) NOT NULL,
    CONSTRAINT balance_non_negative CHECK (balance >= 0)
);
"""

# Attempting to violate the constraint automatically fails
class InsufficientFundsError(Exception):
    pass

def withdraw(account_id: int, amount: float):
    conn = psycopg2.connect(database="bank")
    
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, account_id)
        )
        conn.commit()
        
    except psycopg2.errors.CheckViolation:
        conn.rollback()
        raise InsufficientFundsError("Balance would go negative")
    finally:
        conn.close()

Isolation

Isolation controls how much concurrent transactions can see of each other's work. At the strictest level (SERIALIZABLE), transactions behave as if they ran one after another; weaker levels trade some of that separation for performance, but still hide uncommitted intermediate states from other transactions.

# Isolation prevents dirty reads
# Transaction 1 starts
conn1 = psycopg2.connect(database="bank")
conn1.autocommit = False
cursor1 = conn1.cursor()

# Transaction 2 starts
conn2 = psycopg2.connect(database="bank")
conn2.autocommit = False
cursor2 = conn2.cursor()

# Transaction 1: Updates balance but doesn't commit
cursor1.execute(
    "UPDATE accounts SET balance = balance - 100 WHERE id = 1"
)

# Transaction 2: Tries to read balance
# With READ COMMITTED (PostgreSQL's default): sees the original value
#   until T1 commits; a re-read after T1's commit sees the new value
# With REPEATABLE READ: sees the original value for its whole lifetime,
#   even after T1 commits
cursor2.execute("SELECT balance FROM accounts WHERE id = 1")
balance = cursor2.fetchone()[0]  # Doesn't see T1's uncommitted change

Durability

Durability ensures that once a transaction is committed, it survives even system crashes. This is typically achieved through write-ahead logging (WAL).

# PostgreSQL durability settings
def configure_durability(conn):
    cursor = conn.cursor()
    
    # Synchronous commit - wait for WAL write
    cursor.execute("SET synchronous_commit = ON")
    
    # For maximum durability, use replication
    # synchronous_commit = on + multiple replicas
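The write-ahead idea itself is easy to sketch: record the intended change in an append-only log before applying it, so a crash between the two steps can be recovered by replaying the log. Here is a minimal in-memory illustration of that principle (not how PostgreSQL actually implements WAL, which fsyncs the log to disk and tracks log sequence numbers):

```python
# Minimal write-ahead log sketch: log the change, then apply it.
import json

class TinyWAL:
    def __init__(self):
        self.log = []   # stands in for the on-disk log file
        self.data = {}  # the "database" state

    def write(self, key, value):
        # Step 1: append the intent to the log (a real WAL fsyncs here)
        self.log.append(json.dumps({"key": key, "value": value}))
        # Step 2: apply the change to the data files
        self.data[key] = value

    def recover(self):
        # After a crash, replay the log to rebuild consistent state
        recovered = {}
        for entry in self.log:
            record = json.loads(entry)
            recovered[record["key"]] = record["value"]
        return recovered

wal = TinyWAL()
wal.write("balance:1", 900)
wal.write("balance:2", 600)
wal.data.clear()      # simulate losing the in-memory state in a crash
print(wal.recover())  # {'balance:1': 900, 'balance:2': 600}
```

Because the log is written first, a committed transaction is recoverable even if the data files were never updated before the crash.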

Transaction Control Statements

Basic Transaction Control

-- Start a transaction
BEGIN;

-- Or explicitly
START TRANSACTION;

-- Make changes
INSERT INTO accounts (id, name, balance) VALUES (1, 'Alice', 1000);
INSERT INTO accounts (id, name, balance) VALUES (2, 'Bob', 500);

-- Savepoint for partial rollback
SAVEPOINT after_alice;

-- More changes
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;

-- Rollback to savepoint
ROLLBACK TO after_alice;

-- Commit the transaction
COMMIT;

Savepoints

# Using savepoints for complex transactions
def complex_operation(user_id: int, order_data: dict):
    conn = psycopg2.connect(database="app")
    
    try:
        conn.autocommit = False
        cursor = conn.cursor()
        
        # Create order
        cursor.execute(
            "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
            (user_id, order_data['total'])
        )
        order_id = cursor.fetchone()[0]
        
        # Create savepoint
        cursor.execute("SAVEPOINT after_order")
        
        try:
            # Add order items
            for item in order_data['items']:
                cursor.execute(
                    "INSERT INTO order_items (order_id, product_id, quantity) VALUES (%s, %s, %s)",
                    (order_id, item['product_id'], item['quantity'])
                )
                
                # Update inventory
                cursor.execute(
                    "UPDATE inventory SET quantity = quantity - %s WHERE product_id = %s",
                    (item['quantity'], item['product_id'])
                )
            
            conn.commit()
            return {"status": "order_created", "items_failed": False}
            
        except Exception:
            # Rollback to the savepoint: the order survives, the items don't
            cursor.execute("ROLLBACK TO SAVEPOINT after_order")
            conn.commit()
            return {"status": "order_created", "items_failed": True}
        
    except Exception:
        # Order creation itself failed - roll back everything
        conn.rollback()
        raise
    finally:
        conn.close()

Isolation Levels

Isolation levels define how transactions see each other’s data:

Level             Dirty Reads  Non-Repeatable Reads  Phantom Reads
READ UNCOMMITTED  Possible     Possible              Possible
READ COMMITTED    Prevented    Possible              Possible
REPEATABLE READ   Prevented    Prevented             Possible (DB-specific)
SERIALIZABLE      Prevented    Prevented             Prevented
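The "non-repeatable read" column is easiest to understand through snapshots: under REPEATABLE READ, a transaction reads from the state of the database as of its start, so repeated reads return the same value even if another transaction commits a change in between. A toy in-memory sketch of that snapshot behavior (real databases implement this with MVCC and per-row versions, not deep copies):

```python
# Toy snapshot isolation: each "transaction" reads a private snapshot
# of the committed state taken when it began.
import copy

class SnapshotStore:
    def __init__(self):
        self.committed = {}

    def begin(self):
        # A new transaction gets a snapshot of committed state
        return copy.deepcopy(self.committed)

    def commit(self, key, value):
        self.committed[key] = value

store = SnapshotStore()
store.commit("balance", 1000)

txn = store.begin()           # REPEATABLE READ-style snapshot
store.commit("balance", 900)  # another transaction commits a change

print(txn["balance"])            # 1000 - the snapshot still sees the old value
print(store.begin()["balance"])  # 900  - a new transaction sees the commit
```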

Setting Isolation Levels

-- PostgreSQL
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- MySQL
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- SQL Server
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

# In Python with psycopg2
conn = psycopg2.connect(database="bank")

# Set before transaction starts
conn.set_session(isolation_level="SERIALIZABLE")

# Or per-transaction
with conn.cursor() as cursor:
    cursor.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")

Trade-offs

def choose_isolation_level(use_case: str) -> str:
    """
    Choosing the right isolation level depends on your use case.
    """
    levels = {
        # Simple reads, can tolerate some inconsistency
        "analytics": "READ COMMITTED",
        
        # Most business logic
        "business_logic": "REPEATABLE READ",
        
        # Financial transactions, critical data
        "financial": "SERIALIZABLE",
        
        # Simple CRUD, performance critical
        "high_performance": "READ COMMITTED"
    }
    
    return levels.get(use_case, "READ COMMITTED")
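One trade-off the table above hides: under SERIALIZABLE, the database may abort a transaction with a serialization failure when concurrent transactions conflict, and the application is expected to retry it. A generic retry helper can capture that pattern; in PostgreSQL the real exception is `psycopg2.errors.SerializationFailure`, but the sketch parameterizes the exception class so it stays self-contained (`FakeSerializationFailure` and `flaky_transaction` are stand-ins):

```python
# Retry a transaction function when a retryable error (e.g. a
# serialization failure under SERIALIZABLE) occurs.
import time

def run_with_retry(txn_fn, retryable_exc, max_attempts=3, backoff=0.01):
    """Run txn_fn, retrying on retryable_exc with linear backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return txn_fn()
        except retryable_exc:
            if attempt == max_attempts:
                raise
            time.sleep(backoff * attempt)

# Demo with a stub that fails twice, then succeeds
class FakeSerializationFailure(Exception):
    pass

attempts = {"n": 0}
def flaky_transaction():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise FakeSerializationFailure()
    return "committed"

print(run_with_retry(flaky_transaction, FakeSerializationFailure))  # committed
```

In real code, `txn_fn` would open the transaction, do its work, and commit; anything rolled back by the failure is safe to re-run.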

Locking Strategies

Row-Level Locking

# Explicit row locking
def transfer_with_lock(from_id: int, to_id: int, amount: float):
    conn = psycopg2.connect(database="bank")
    
    try:
        conn.autocommit = False
        cursor = conn.cursor()
        
        # Lock rows in consistent order to prevent deadlocks
        first_id, second_id = sorted([from_id, to_id])
        
        # FOR UPDATE prevents other transactions from modifying
        cursor.execute(
            "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
            (first_id,)
        )
        cursor.execute(
            "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
            (second_id,)
        )
        
        # Now perform transfer
        cursor.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, from_id)
        )
        cursor.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, to_id)
        )
        
        conn.commit()
        
    finally:
        conn.close()

Optimistic Locking

# Optimistic locking - assume conflicts are rare
from dataclasses import dataclass
from typing import Optional

@dataclass
class OptimisticLockError(Exception):
    record_id: str
    expected_version: int
    actual_version: Optional[int]

def update_with_optimistic_lock(
    cursor,
    table: str,
    record_id: str,
    updates: dict,
    expected_version: int
) -> bool:
    """Update only if version matches."""
    
    # Build update query with version check
    set_clauses = [f"{k} = %s" for k in updates.keys()]
    set_clauses.append("version = version + 1")
    
    values = list(updates.values()) + [record_id, expected_version]
    
    query = f"""
        UPDATE {table}
        SET {', '.join(set_clauses)}
        WHERE id = %s AND version = %s
    """
    
    cursor.execute(query, values)
    
    if cursor.rowcount == 0:
        # Record missing or version mismatch - look up the real version
        cursor.execute(f"SELECT version FROM {table} WHERE id = %s", (record_id,))
        row = cursor.fetchone()
        raise OptimisticLockError(
            record_id=record_id,
            expected_version=expected_version,
            actual_version=row[0] if row else None
        )
    
    return True
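Stripped of SQL, optimistic locking is just a compare-and-swap on the version column. The same idea shown against a plain dict (record names here are hypothetical):

```python
# Compare-and-swap behind optimistic locking, against an in-memory record.
records = {"acct-1": {"balance": 1000, "version": 1}}

def optimistic_update(record_id, new_balance, expected_version):
    record = records[record_id]
    if record["version"] != expected_version:
        # Someone else updated first - caller should re-read and retry
        return False
    record["balance"] = new_balance
    record["version"] += 1
    return True

assert optimistic_update("acct-1", 900, expected_version=1)      # succeeds
assert not optimistic_update("acct-1", 800, expected_version=1)  # stale version
print(records["acct-1"])  # {'balance': 900, 'version': 2}
```

The second caller loses only because its `expected_version` is stale; it re-reads the record, recomputes, and tries again with version 2.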

Pessimistic vs Optimistic

# When to use which locking strategy

def choose_locking_strategy():
    return {
        "pessimistic": {
            "use_when": [
                "High contention (many concurrent updates to same records)",
                "Conflicts are expensive",
                "Must have the latest data"
            ],
            "implementation": "SELECT ... FOR UPDATE",
            "pros": "Guaranteed consistency",
            "cons": "Can cause deadlocks, slower"
        },
        
        "optimistic": {
            "use_when": [
                "Low contention (updates to different records)",
                "Conflicts are cheap to handle",
                "Throughput is important"
            ],
            "implementation": "Version/check column",
            "pros": "No locks, high throughput",
            "cons": "Retry on conflict"
        }
    }

Common Pitfalls

1. Long-Running Transactions

# โŒ Bad: Long-running transaction holds locks
def bad_example():
    conn = psycopg2.connect(database="app")
    conn.autocommit = False
    
    # Process millions of rows
    for row in process_millions():  # Takes hours!
        cursor.execute("UPDATE ...")
    
    conn.commit()  # Other transactions blocked the whole time!

# ✅ Good: Batch processing with small transactions
def good_example():
    conn = psycopg2.connect(database="app")
    
    for batch in process_in_batches(1000):
        conn.autocommit = False
        cursor = conn.cursor()
        
        cursor.executemany("UPDATE ...", batch)
        
        conn.commit()  # Short commit, minimal blocking
    
    conn.close()
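The `process_in_batches` helper is left undefined above; a chunking generator in its spirit might look like the following (here taking the row source explicitly, which is an assumption about its interface):

```python
# Yield fixed-size lists from any iterable so each batch can be
# committed in its own short transaction.
from itertools import islice

def process_in_batches(rows, batch_size):
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

batches = list(process_in_batches(range(10), 4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```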

2. Nested Transactions

# โŒ Bad: Thinking BEGIN creates nested transactions
def bad_nested():
    conn = psycopg2.connect(database="app")
    conn.autocommit = False
    
    cursor = conn.cursor()
    
    cursor.execute("BEGIN")
    cursor.execute("INSERT INTO orders ...")
    
    cursor.execute("BEGIN")  # This doesn't create nested transaction!
    cursor.execute("INSERT INTO order_items ...")
    cursor.execute("ROLLBACK")  # Rollback goes all the way to first BEGIN!
    
    conn.commit()  # Order also rolled back!

3. Not Handling Exceptions

# โŒ Bad: Missing exception handling
def bad_exception_handling():
    conn = psycopg2.connect(database="app")
    cursor = conn.cursor()
    
    cursor.execute("BEGIN")
    try:
        cursor.execute("INSERT INTO orders ...")
        cursor.execute("UPDATE inventory ...")
        conn.commit()
    except:
        # Nothing! Transaction left hanging
        pass
    
    # Connection closes, transaction still open

# ✅ Good: Proper exception handling
def good_exception_handling():
    conn = psycopg2.connect(database="app")
    
    try:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        
        cursor.execute("INSERT INTO orders ...")
        cursor.execute("UPDATE inventory ...")
        
        conn.commit()
        
    except Exception as e:
        conn.rollback()  # Explicit rollback
        raise
        
    finally:
        conn.close()  # Always close
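The commit-on-success, rollback-on-error dance can be packaged once in a context manager instead of repeated in every function (psycopg2 connections also support this natively via `with conn:`). A sketch, demonstrated with a stub connection so it stays self-contained (`FakeConn` is hypothetical):

```python
# Context manager: commit on success, rollback on any exception.
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    try:
        yield conn.cursor()
        conn.commit()
    except Exception:
        conn.rollback()
        raise

# Stub connection that records which calls were made
class FakeConn:
    def __init__(self):
        self.calls = []
    def cursor(self):
        return self
    def execute(self, sql):
        self.calls.append(sql)
    def commit(self):
        self.calls.append("COMMIT")
    def rollback(self):
        self.calls.append("ROLLBACK")

conn = FakeConn()
with transaction(conn) as cur:
    cur.execute("INSERT INTO orders ...")
print(conn.calls)  # ['INSERT INTO orders ...', 'COMMIT']
```

If the body of the `with` block raises, `rollback()` runs and the exception propagates, so the caller can never forget cleanup.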

Distributed Transactions

Two-Phase Commit

class TwoPCError(Exception):
    pass

class TwoPhaseCommit:
    """Coordinate transactions across multiple databases."""
    
    def __init__(self, databases: dict):
        self.databases = databases  # name -> database handle
        self.prepared = set()
    
    def execute(self, operations: dict):
        """
        Execute distributed transaction using 2PC.
        
        Phase 1: Prepare all databases
        Phase 2: Commit if all prepared successfully
        """
        
        # Phase 1: Prepare
        for db_name, operation in operations.items():
            try:
                db = self.databases[db_name]
                db.prepare(operation)  # Tell DB to prepare
                self.prepared.add(db_name)
            except Exception as e:
                # Phase 1 failed - rollback all
                self._rollback_all()
                raise TwoPCError(f"Prepare failed for {db_name}: {e}")
        
        # Phase 2: Commit
        for db_name in self.prepared:
            try:
                db = self.databases[db_name]
                db.commit_prepared()
            except Exception as e:
                # This is problematic!
                # Need manual intervention
                self._mark_inconsistent(db_name)
                raise TwoPCError(f"Commit failed for {db_name}: {e}")
    
    def _rollback_all(self):
        for db_name in self.prepared:
            try:
                self.databases[db_name].rollback()
            except Exception:
                pass  # Best effort - the participant may already be down
    
    def _mark_inconsistent(self, db_name: str):
        # Log for manual resolution
        print(f"ALERT: Transaction inconsistent for {db_name}")
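The two-phase flow the coordinator implements can be exercised procedurally with in-memory participants (`FakeDB` is a stub matching the prepare/commit_prepared/rollback interface the class assumes, not a real driver):

```python
# Two-phase commit over in-memory participants.
class FakeDB:
    def __init__(self, fail_prepare=False):
        self.fail_prepare = fail_prepare
        self.state = "idle"
    def prepare(self, operation):
        if self.fail_prepare:
            raise RuntimeError("prepare failed")
        self.state = "prepared"
    def commit_prepared(self):
        self.state = "committed"
    def rollback(self):
        self.state = "rolled_back"

dbs = {"orders": FakeDB(), "inventory": FakeDB()}
prepared = []
try:
    for name, db in dbs.items():  # Phase 1: everyone prepares
        db.prepare("op")
        prepared.append(name)
    for name in prepared:         # Phase 2: everyone commits
        dbs[name].commit_prepared()
except RuntimeError:
    for name in prepared:         # Any prepare failure: roll back all
        dbs[name].rollback()

print({name: db.state for name, db in dbs.items()})
# {'orders': 'committed', 'inventory': 'committed'}
```

If either participant were constructed with `fail_prepare=True`, phase 1 would abort and every prepared participant would be rolled back, which is exactly the coordinator's `_rollback_all` path.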

Saga Pattern

# Saga pattern for distributed transactions
import logging

class SagaError(Exception):
    pass

class Saga:
    """Chain local transactions with compensating actions."""
    
    def __init__(self):
        self.steps = []
        self.completed = []
    
    def add_step(self, forward, compensation):
        """Add step with its compensation."""
        self.steps.append((forward, compensation))
    
    def execute(self, context: dict):
        try:
            for forward, compensation in self.steps:
                result = forward(context)
                context.update(result or {})  # steps may return nothing
                self.completed.append((forward, compensation))
                
        except Exception as e:
            # Compensate completed steps in reverse order
            for _, compensation in reversed(self.completed):
                try:
                    compensation(context)
                except Exception as comp_error:
                    logging.error(f"Compensation failed: {comp_error}")
            
            raise SagaError(f"Saga failed: {e}")

# Example: Order processing saga
def create_order_saga():
    saga = Saga()
    
    # Step 1: Create order
    def create_order(ctx):
        order_id = db.orders.create(
            customer_id=ctx['customer_id'],
            total=ctx['total']
        )
        return {'order_id': order_id}
    
    def cancel_order(ctx):
        if 'order_id' in ctx:
            db.orders.cancel(ctx['order_id'])
    
    saga.add_step(create_order, cancel_order)
    
    # Step 2: Reserve inventory
    def reserve_inventory(ctx):
        db.inventory.reserve(
            items=ctx['items'],
            order_id=ctx['order_id']
        )
    
    def release_inventory(ctx):
        db.inventory.release(order_id=ctx['order_id'])
    
    saga.add_step(reserve_inventory, release_inventory)
    
    # Step 3: Process payment
    def process_payment(ctx):
        payment_id = db.payments.charge(
            customer_id=ctx['customer_id'],
            amount=ctx['total']
        )
        return {'payment_id': payment_id}
    
    def refund_payment(ctx):
        if 'payment_id' in ctx:
            db.payments.refund(ctx['payment_id'])
    
    saga.add_step(process_payment, refund_payment)
    
    return saga
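The compensation flow of the saga above can be seen end to end with in-memory steps (the functions below are stand-ins for the `db.*` calls): the payment step fails, so the completed steps are undone in reverse order.

```python
# Saga compensation demo: the third step fails, the first two are undone.
log = []

def create_order(ctx):  log.append("order created")
def cancel_order(ctx):  log.append("order cancelled")
def reserve_stock(ctx): log.append("stock reserved")
def release_stock(ctx): log.append("stock released")
def charge_card(ctx):   raise RuntimeError("card declined")
def refund_card(ctx):   log.append("payment refunded")

steps = [(create_order, cancel_order),
         (reserve_stock, release_stock),
         (charge_card, refund_card)]

completed = []
try:
    for forward, compensate in steps:
        forward({})
        completed.append((forward, compensate))
except RuntimeError:
    # Undo completed steps newest-first; the failed step never ran,
    # so its compensation (refund) is not invoked
    for _, compensate in reversed(completed):
        compensate({})

print(log)
# ['order created', 'stock reserved', 'stock released', 'order cancelled']
```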

Best Practices Summary

  1. Keep Transactions Short

    • Do minimal work inside transactions
    • Avoid user interaction during transactions
    • Use appropriate batch sizes
  2. Choose Right Isolation Level

    • Start with READ COMMITTED
    • Escalate only when necessary
    • Understand the performance impact
  3. Handle Errors Properly

    • Always rollback on error
    • Use finally blocks to ensure cleanup
    • Don’t swallow exceptions
  4. Prevent Deadlocks

    • Access tables in consistent order
    • Keep transactions short
    • Use appropriate lock scope
  5. Test Failure Scenarios

    • Simulate network failures
    • Test rollback behavior
    • Verify data consistency after errors

Conclusion

Database transactions are essential for maintaining data integrity in any application. Understanding ACID properties, isolation levels, and locking strategies helps you build reliable systems that handle concurrent access correctly.

Key takeaways:

  • Use transactions for related operations that must succeed together
  • Choose isolation levels based on your consistency needs
  • Keep transactions short to minimize locking
  • Use savepoints for complex multi-step operations
  • Consider Saga pattern for distributed scenarios
