Database Transaction Management: ACID, Isolation Levels, and Patterns

Introduction

Database transactions are the foundation of reliable data operations. Understanding ACID properties, isolation levels, and concurrency control is essential for building robust applications.

This guide covers transaction management: from basic concepts to advanced patterns for high-concurrency systems.


ACID Properties

  • Atomicity: All or nothing
  • Consistency: Valid state to valid state
  • Isolation: Concurrent transactions appear serial
  • Durability: Committed data survives crashes
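Atomicity and consistency are easiest to see in a failing transfer. The sketch below is an illustration only: it uses Python's built-in `sqlite3` module rather than PostgreSQL so it runs without a server, and the `accounts` schema, the `CHECK` constraint, and the helper names are assumptions made for the demo.

```python
import sqlite3


def make_demo_db():
    """Create an in-memory database with two accounts (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    # The CHECK constraint is the consistency rule: a balance may never go negative
    conn.execute(
        "CREATE TABLE accounts ("
        "id TEXT PRIMARY KEY, "
        "balance INTEGER NOT NULL CHECK (balance >= 0))"
    )
    with conn:
        conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("A", 100), ("B", 0)])
    return conn


def transfer(conn, from_id, to_id, amount):
    """Move `amount` between accounts; both updates commit together or not at all."""
    # `with conn` commits if the block succeeds and rolls back if it raises
    with conn:
        # Credit first, so a failed debit has to undo an already-applied change
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id))
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id))


def balances(conn):
    """Return {account_id: balance} for all accounts."""
    return dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))


conn = make_demo_db()
transfer(conn, "A", "B", 30)        # succeeds: A=70, B=30
try:
    transfer(conn, "A", "B", 500)   # debit violates the CHECK constraint
except sqlite3.IntegrityError:
    pass                            # the credit to B was rolled back too
print(balances(conn))
```

The second transfer credits B before the debit fails, yet B's balance is unchanged afterwards: the rollback discards the partial work.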

Isolation Levels

Understanding Levels

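| Level            | Dirty Reads  | Non-Repeatable Reads | Phantom Reads |
|------------------|--------------|----------------------|---------------|
| READ UNCOMMITTED | Possible     | Possible             | Possible      |
| READ COMMITTED   | Not possible | Possible             | Possible      |
| REPEATABLE READ  | Not possible | Not possible         | Possible      |
| SERIALIZABLE     | Not possible | Not possible         | Not possible  |

These are the minimum guarantees defined by the SQL standard. PostgreSQL is stricter in two places: READ UNCOMMITTED behaves like READ COMMITTED, and REPEATABLE READ also prevents phantom reads.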
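As a rough illustration of what "no dirty reads" means, the sketch below opens two connections to the same database and shows that one cannot see the other's uncommitted update. It uses the standard-library `sqlite3` module with a temporary file, not PostgreSQL, so treat it as an analogy; the table and function names are invented for the demo.

```python
import os
import sqlite3
import tempfile


def read_balance(conn):
    """Read account A's balance; fetchall() finishes the read so no lock lingers."""
    return conn.execute("SELECT balance FROM accounts WHERE id = 'A'").fetchall()[0][0]


def dirty_read_demo():
    """Return the balance a second connection sees before and after a commit."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        writer = sqlite3.connect(path)
        reader = sqlite3.connect(path)
        try:
            writer.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER)")
            writer.execute("INSERT INTO accounts VALUES ('A', 100)")
            writer.commit()

            # The UPDATE implicitly opens a transaction that is not yet committed
            writer.execute("UPDATE accounts SET balance = 50 WHERE id = 'A'")
            seen_before_commit = read_balance(reader)

            writer.commit()
            seen_after_commit = read_balance(reader)
            return seen_before_commit, seen_after_commit
        finally:
            writer.close()
            reader.close()


print(dirty_read_demo())
```

The reader sees the old value until the writer commits, which is the behaviour every level from READ COMMITTED upward guarantees.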

Implementation

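# PostgreSQL isolation levels (synchronous psycopg2)
import psycopg2

class TransactionManager:
    # The level is interpolated into SQL, so only accept known values
    LEVELS = {"READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE"}
    
    def __init__(self, connection):
        self.conn = connection
    
    def execute(self, query: str, params=None, level: str = "READ COMMITTED"):
        """Run one query in its own transaction at the given isolation level"""
        if level not in self.LEVELS:
            raise ValueError(f"Unknown isolation level: {level}")
        try:
            with self.conn.cursor() as cursor:
                # SET TRANSACTION must be the first statement of the transaction
                cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
                cursor.execute(query, params)
                # Only statements that return rows have a description
                rows = cursor.fetchall() if cursor.description else None
            self.conn.commit()
            return rows
        except psycopg2.Error:
            self.conn.rollback()
            raise
    
    def read_committed(self, query: str, params=None):
        """READ COMMITTED - default in PostgreSQL"""
        return self.execute(query, params, "READ COMMITTED")
    
    def repeatable_read(self, query: str, params=None):
        """REPEATABLE READ - prevents non-repeatable reads"""
        return self.execute(query, params, "REPEATABLE READ")
    
    def serializable(self, query: str, params=None):
        """SERIALIZABLE - highest isolation"""
        return self.execute(query, params, "SERIALIZABLE")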


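# Example: Serializable transaction (asyncpg)
# get_connection() and TransferError are assumed to be defined elsewhere
async def transfer_funds_serializable(from_account: str, to_account: str, amount: float):
    conn = await get_connection()
    
    try:
        # The isolation level is chosen when the transaction starts;
        # leaving the block with an exception rolls everything back
        async with conn.transaction(isolation="serializable"):
            # Deduct from source only if the balance covers the amount
            status = await conn.execute("""
                UPDATE accounts 
                SET balance = balance - $1 
                WHERE id = $2 AND balance >= $1
            """, amount, from_account)
            
            # asyncpg returns the command tag, e.g. "UPDATE 1"
            if status != "UPDATE 1":
                raise TransferError("Insufficient funds or unknown source account")
            
            # Add to destination
            await conn.execute("""
                UPDATE accounts 
                SET balance = balance + $1 
                WHERE id = $2
            """, amount, to_account)
        
        return True
        
    except TransferError:
        raise
    except Exception as e:
        raise TransferError(f"Transfer failed: {e}") from e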

Concurrency Control

Optimistic Locking

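# Optimistic locking with version field
# (assumes a psycopg 3 async connection, whose execute() returns a cursor)
from psycopg import sql

class ConcurrentModificationError(Exception):
    """Raised when the row changed after it was read"""

class OptimisticLock:
    """Optimistic concurrency control"""
    
    async def update_with_lock(self, conn, table: str, data: dict, version: int):
        """Update only if version matches"""
        
        # 'id' and 'version' belong in the WHERE clause, not in the SET list
        columns = [k for k in data if k not in ("id", "version")]
        
        # Compose identifiers safely instead of formatting them into the string
        query = sql.SQL(
            "UPDATE {table} SET {assignments}, version = version + 1 "
            "WHERE id = %s AND version = %s"
        ).format(
            table=sql.Identifier(table),
            assignments=sql.SQL(", ").join(
                sql.SQL("{} = %s").format(sql.Identifier(k)) for k in columns
            ),
        )
        
        values = [data[k] for k in columns] + [data["id"], version]
        
        cursor = await conn.execute(query, values)
        
        # No row matched: someone else bumped the version first
        if cursor.rowcount == 0:
            raise ConcurrentModificationError("Record was modified by another transaction")
        
        return True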


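# Usage with SQLAlchemy (AsyncSession; User is a mapped model with a version column)
from sqlalchemy import update

class UserRepository:
    def __init__(self, session):
        self.session = session
    
    async def update_user(self, user_id: int, updates: dict):
        user = await self.session.get(User, user_id)
        if user is None:
            raise LookupError(f"User {user_id} not found")
        
        # Remember the version we read; the caller's dict is left untouched
        expected_version = user.version
        
        # Match on that version and bump it in the same statement
        result = await self.session.execute(
            update(User)
            .where(User.id == user_id, User.version == expected_version)
            .values(**updates, version=expected_version + 1)
        )
        
        if result.rowcount == 0:
            await self.session.rollback()
            raise ConcurrentModificationError("User was modified")
        
        await self.session.commit()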
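To see the version check reject a stale write, here is a minimal self-contained sketch. It swaps PostgreSQL for the standard-library `sqlite3` module so it can run anywhere; the `users` table, `read_user`, and `rename_user` are hypothetical names used only for this demo.

```python
import sqlite3


class ConcurrentModificationError(Exception):
    """Raised when the row changed since it was read."""


def read_user(conn, user_id):
    """Return the user's name together with the version that was read."""
    row = conn.execute(
        "SELECT name, version FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    return {"name": row[0], "version": row[1]}


def rename_user(conn, user_id, new_name, expected_version):
    """Update the name only if the row still has the version we read."""
    with conn:
        cur = conn.execute(
            "UPDATE users SET name = ?, version = version + 1 "
            "WHERE id = ? AND version = ?",
            (new_name, user_id, expected_version),
        )
    # Zero affected rows means another writer got there first
    if cur.rowcount == 0:
        raise ConcurrentModificationError(
            f"user {user_id} changed since version {expected_version}"
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, version INTEGER NOT NULL)")
with conn:
    conn.execute("INSERT INTO users VALUES (1, 'alice', 1)")

# Two clients read the same version, then both try to write
first = read_user(conn, 1)
second = read_user(conn, 1)
rename_user(conn, 1, "alice-a", first["version"])       # wins, version becomes 2
try:
    rename_user(conn, 1, "alice-b", second["version"])  # stale version 1
    conflict = False
except ConcurrentModificationError:
    conflict = True
```

The losing writer typically reloads the row and retries, or reports the conflict to the user.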

Pessimistic Locking

# Pessimistic locking with SELECT FOR UPDATE (asyncpg)
class PessimisticLock:
    """Pessimistic concurrency control"""
    
    async def transfer_with_lock(self, conn, from_id: str, to_id: str, amount: float):
        """Lock rows during transaction"""
        
        # Note: locking source then destination can deadlock against a transfer
        # in the opposite direction; see the lock ordering under Handling Deadlocks
        async with conn.transaction():
            # Lock source account (blocks other transactions)
            source = await conn.fetchrow(
                "SELECT * FROM accounts WHERE id = $1 FOR UPDATE",
                from_id
            )
            if source is None:
                raise LookupError(f"Account {from_id} not found")
            
            # Check balance
            if source['balance'] < amount:
                raise InsufficientFundsError()
            
            # Lock destination account
            dest = await conn.fetchrow(
                "SELECT * FROM accounts WHERE id = $1 FOR UPDATE",
                to_id
            )
            if dest is None:
                raise LookupError(f"Account {to_id} not found")
            
            # Perform transfer
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount, to_id
            )
        
        return True

Transaction Patterns

Saga Pattern

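# Saga pattern for distributed transactions
import logging

log = logging.getLogger(__name__)

class SagaExecutionError(Exception):
    """Raised after a failed saga has been compensated"""

class SagaStep:
    """Base class: one forward action plus the compensation that undoes it"""
    
    def __init__(self, data: dict):
        self.data = data
    
    async def execute(self):
        raise NotImplementedError
    
    async def compensate(self):
        raise NotImplementedError

class Saga:
    """Saga pattern for managing distributed transactions"""
    
    def __init__(self, steps: list):
        self.steps = steps
        self.completed_steps = []
    
    async def execute(self) -> bool:
        try:
            for step in self.steps:
                await step.execute()
                # Record only after success so a failed step is never compensated
                self.completed_steps.append(step)
            
            return True
            
        except Exception as e:
            # Compensate in reverse order
            await self.compensate()
            raise SagaExecutionError(f"Saga failed: {e}") from e
    
    async def compensate(self):
        """Rollback completed steps"""
        for step in reversed(self.completed_steps):
            try:
                await step.compensate()
            except Exception as e:
                # Log but continue compensation
                log.error(f"Compensation failed for {step}: {e}")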

# Example: Order processing saga
class OrderSaga:
    @staticmethod
    async def create_order(order_data: dict):
        steps = [
            ReserveInventoryStep(order_data),
            ProcessPaymentStep(order_data),
            CreateOrderStep(order_data),
            NotifyCustomerStep(order_data)
        ]
        
        saga = Saga(steps)
        return await saga.execute()


class ReserveInventoryStep(SagaStep):
    async def execute(self):
        # Reserve inventory
        await inventory_client.reserve(
            items=self.data['items'],
            order_id=self.data['order_id']
        )
    
    async def compensate(self):
        # Release inventory
        await inventory_client.release(
            order_id=self.data['order_id']
        )
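The compensation order is the part that is easiest to get wrong, so here is a small runnable sketch of it. The steps are in-memory stand-ins that only append to a log; `Step`, `run_saga`, and the step names are hypothetical and replace the real service clients used above.

```python
import asyncio


class Step:
    """One saga step: an action plus the compensation that undoes it."""

    def __init__(self, name, log, fail=False):
        self.name = name
        self.log = log
        self.fail = fail

    async def execute(self):
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        self.log.append(f"do:{self.name}")

    async def compensate(self):
        self.log.append(f"undo:{self.name}")


async def run_saga(steps):
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed = []
    try:
        for step in steps:
            await step.execute()
            completed.append(step)
        return True
    except Exception:
        for step in reversed(completed):
            await step.compensate()
        return False


def demo():
    """Third step fails, so the first two are undone, newest first."""
    log = []
    steps = [
        Step("reserve_inventory", log),
        Step("process_payment", log),
        Step("create_order", log, fail=True),
    ]
    ok = asyncio.run(run_saga(steps))
    return ok, log


print(demo())
```

The failed step itself is not compensated, because it never completed; only the steps before it are undone.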

Outbox Pattern

# Outbox pattern for reliable messaging
import json
class OutboxPattern:
    """Reliable event publishing via database outbox"""
    
    async def publish_event(self, conn, event_type: str, payload: dict):
        """Write to outbox table within transaction"""
        
        async with conn.transaction():
            # 1. Update business data
            await conn.execute(
                "UPDATE orders SET status = 'PROCESSING' WHERE id = $1",
                payload['order_id']
            )
            
            # 2. Write to outbox (atomic!)
            await conn.execute(
                """
                INSERT INTO outbox (event_type, payload, created_at)
                VALUES ($1, $2, NOW())
                """,
                event_type, json.dumps(payload)
            )
        
        return True


# Outbox processor
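class OutboxProcessor:
    """Process outbox and publish events.
    
    Delivery is at-least-once: a crash between publishing and the UPDATE
    republishes the event on the next run, so consumers must be idempotent.
    """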
    
    async def process_outbox(self, batch_size: int = 100):
        conn = await get_connection()
        
        try:
            # Get unprocessed events
            events = await conn.fetch("""
                SELECT * FROM outbox 
                WHERE processed = false 
                ORDER BY created_at 
                LIMIT $1
            """, batch_size)
            
            for event in events:
                try:
                    # Publish to message broker
                    await self.publish(event)
                    
                    # Mark as processed
                    await conn.execute(
                        "UPDATE outbox SET processed = true WHERE id = $1",
                        event['id']
                    )
                    
                except Exception as e:
                    # Increment retry count
                    await conn.execute("""
                        UPDATE outbox 
                        SET retry_count = retry_count + 1,
                            last_error = $2
                        WHERE id = $1
                    """, event['id'], str(e))
                    
        finally:
            await conn.close()

Handling Deadlocks

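import asyncio

import asyncpg

class DeadlockHandler:
    """Strategies for handling deadlocks"""
    
    @staticmethod
    async def execute_with_retry(func, max_retries: int = 3):
        """Retry on deadlock with exponential backoff"""
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        
        for attempt in range(max_retries):
            try:
                return await func()
            except asyncpg.exceptions.DeadlockDetectedError:
                # PostgreSQL aborted this transaction to break the lock cycle;
                # re-raise once the last attempt has failed
                if attempt == max_retries - 1:
                    raise
                
                # Exponential backoff: 0.1s, 0.2s, 0.4s, ...
                wait_time = (2 ** attempt) * 0.1
                await asyncio.sleep(wait_time)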


# Prevent deadlocks with consistent lock ordering
async def safe_transfer(conn, from_id: str, to_id: str, amount: float):
    """Always acquire locks in same order"""
    
    # Sort IDs to prevent deadlock
    first_id, second_id = sorted([from_id, to_id])
    
    async with conn.transaction():
        # Always lock in same order
        await conn.execute(
            "SELECT * FROM accounts WHERE id = $1 FOR UPDATE",
            first_id
        )
        await conn.execute(
            "SELECT * FROM accounts WHERE id = $1 FOR UPDATE", 
            second_id
        )
        
        # Perform transfer
        # ...
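The effect of consistent lock ordering can be reproduced without a database. The sketch below is an in-process analogy, not PostgreSQL row locking: each account gets an `asyncio.Lock`, and two transfers in opposite directions run concurrently. The account data and function names are made up for the demo.

```python
import asyncio


async def transfer(accounts, locks, from_id, to_id, amount):
    """Acquire both account locks in sorted order, then move the money."""
    first_id, second_id = sorted([from_id, to_id])
    async with locks[first_id]:
        # Yield to the event loop so the other transfer runs between acquisitions
        await asyncio.sleep(0)
        async with locks[second_id]:
            accounts[from_id] -= amount
            accounts[to_id] += amount


async def run_opposing_transfers():
    """Run A->B and B->A at the same time; the timeout would expose a deadlock."""
    accounts = {"A": 100, "B": 100}
    locks = {account_id: asyncio.Lock() for account_id in accounts}
    await asyncio.wait_for(
        asyncio.gather(
            transfer(accounts, locks, "A", "B", 30),
            transfer(accounts, locks, "B", "A", 10),
        ),
        timeout=1.0,
    )
    return accounts


print(asyncio.run(run_opposing_transfers()))
```

If each transfer locked its source account first instead of the smaller ID, the first task would hold A while the second held B, and each would wait on the other until the timeout fired.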

Best Practices

Good: Short Transactions

# Good: Keep transactions short
async def good_example(conn):
    # Don't do this: slow work inside the transaction keeps it (and its locks) open
    #
    #     async with conn.transaction():
    #         data = await fetch_data()        # Could be slow
    #         processed = await process(data)  # More processing
    #         await save_results(processed)
    
    # Do this instead:
    data = await fetch_data()  # Outside transaction
    processed = await process(data)  # Outside transaction
    
    async with conn.transaction():
        await save_results(processed)  # Quick write

Bad: Long-Running Transactions

# Bad: Don't hold locks during user interaction
async def bad_example(conn, user_request):
    async with conn.transaction():
        # User might go to lunch here!
        await handle_request(user_request)
        # Other transactions blocked...

Good: Explicit vs Implicit

# Explicit transaction control
async def explicit_transaction(conn):
    try:
        await conn.execute("BEGIN")
        
        await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 'A'")
        await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 'B'")
        
        await conn.execute("COMMIT")
    except BaseException:
        # Also covers task cancellation, so the transaction is never left open
        await conn.execute("ROLLBACK")
        raise

Conclusion

Reliable transaction management comes down to a few practices:

  • Understand ACID: Foundation of data integrity
  • Choose isolation level: Balance consistency vs performance
  • Use optimistic locking: For low contention
  • Use pessimistic locking: For high contention
  • Handle deadlocks: Plan for failure
  • Saga pattern: For distributed systems
