Event Sourcing: Building Systems Around State Changes

Created: March 19, 2026 Larry Qu 14 min read

Introduction

Event sourcing is an architectural pattern where all changes to application state are stored as a sequence of immutable events. Instead of storing the current state, you store the history of state changes. The current state is derived by replaying these events.

This approach provides several powerful capabilities:

  • Complete Audit Trail: Every state change is recorded with full context
  • Temporal Queries: Query the state at any point in time
  • Event Replay: Reconstruct state by replaying events
  • Debugging: Understand exactly how the system reached its current state
  • Event-Driven Architecture: Events can trigger side effects and integrations
  • Compliance: Meet regulatory requirements for audit and traceability
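Replay and temporal queries can be illustrated with a minimal sketch. The `LedgerEvent` type, the `Deposited`/`Withdrawn` event names, and the `balance_at` helper are hypothetical and exist only for this illustration; amounts are assumed to be integer cents.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass(frozen=True)
class LedgerEvent:
    """Hypothetical event type used only in this sketch."""
    event_type: str      # "Deposited" or "Withdrawn"
    amount: int          # amount in cents
    timestamp: datetime


def balance_at(events: List[LedgerEvent], as_of: Optional[datetime] = None) -> int:
    """Derive the balance by replaying events, optionally only up to `as_of`."""
    balance = 0
    for event in sorted(events, key=lambda e: e.timestamp):
        if as_of is not None and event.timestamp > as_of:
            break  # later events had not happened yet at `as_of`
        if event.event_type == "Deposited":
            balance += event.amount
        elif event.event_type == "Withdrawn":
            balance -= event.amount
    return balance


history = [
    LedgerEvent("Deposited", 10_000, datetime(2026, 1, 1)),
    LedgerEvent("Withdrawn", 2_500, datetime(2026, 1, 15)),
    LedgerEvent("Deposited", 4_000, datetime(2026, 2, 1)),
]

current_balance = balance_at(history)                         # replay everything
january_balance = balance_at(history, datetime(2026, 1, 31))  # temporal query
```

The same history answers both questions: the current balance comes from replaying every event, and the end-of-January balance comes from stopping the replay early. No separate history table is needed.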

Event sourcing is particularly valuable in domains like finance, healthcare, e-commerce, and any system where understanding the history of changes is critical.

This guide covers event store implementation, aggregate reconstruction, snapshots for performance, CQRS integration, event versioning, and production patterns.

Core Concepts

  • Event: Immutable record of something that happened (past tense: OrderPlaced, UserRegistered)
  • Event Store: Database optimized for appending and reading events
  • Aggregate: Domain object whose state is derived from events
  • Event Stream: Sequence of events for a specific aggregate
  • Snapshot: Point-in-time state capture for performance optimization
  • Projection: Read model built from events (CQRS pattern)
  • Event Version: Schema version for event evolution

Event Store Implementation

An event store is an append-only database optimized for storing and retrieving events. Each event in a stream carries a version number, which gives readers a strict per-aggregate ordering and lets writers detect conflicting appends.

Basic Event Store

from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Any, Optional
import json
import uuid
import sqlite3

@dataclass
class Event:
    """Immutable event record."""
    id: str
    aggregate_id: str
    aggregate_type: str
    event_type: str
    data: Dict[str, Any]
    metadata: Dict[str, Any]
    timestamp: datetime
    version: int
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            **asdict(self),
            'timestamp': self.timestamp.isoformat()
        }

class EventStore:
    """Append-only event store with optimistic concurrency control."""
    
    def __init__(self, connection_string: str):
        self.conn = sqlite3.connect(connection_string)
        self.conn.row_factory = sqlite3.Row
        self._create_schema()
    
    def _create_schema(self):
        """Create event store schema."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id TEXT PRIMARY KEY,
                aggregate_id TEXT NOT NULL,
                aggregate_type TEXT NOT NULL,
                event_type TEXT NOT NULL,
                data TEXT NOT NULL,
                metadata TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                version INTEGER NOT NULL,
                UNIQUE(aggregate_id, version)
            )
        """)
        
        # Index for efficient stream reads
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_aggregate_stream
            ON events(aggregate_id, version)
        """)
        
        # Index for event type queries
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_event_type
            ON events(event_type, timestamp)
        """)
        
        self.conn.commit()
    
    def append(self, aggregate_id: str, aggregate_type: str, 
               events: List[Event], expected_version: int) -> None:
        """
        Append events to stream with optimistic concurrency control.
        
        Args:
            aggregate_id: Aggregate identifier
            aggregate_type: Type of aggregate
            events: Events to append
            expected_version: Expected current version (for concurrency)
        
        Raises:
            ConcurrencyError: If expected_version doesn't match actual version
        """
        cursor = self.conn.cursor()
        
        try:
            # Check current version
            cursor.execute(
                "SELECT MAX(version) as current_version FROM events WHERE aggregate_id = ?",
                (aggregate_id,)
            )
            row = cursor.fetchone()
            current_version = row['current_version'] if row['current_version'] else 0
            
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, but current is {current_version}"
                )
            
            # Append events
            for i, event in enumerate(events):
                version = expected_version + i + 1
                cursor.execute("""
                    INSERT INTO events 
                    (id, aggregate_id, aggregate_type, event_type, data, metadata, timestamp, version)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    event.id,
                    aggregate_id,
                    aggregate_type,
                    event.event_type,
                    json.dumps(event.data),
                    json.dumps(event.metadata),
                    event.timestamp.isoformat(),
                    version
                ))
            
            self.conn.commit()
            
        except sqlite3.IntegrityError as e:
            self.conn.rollback()
            raise ConcurrencyError(f"Concurrency conflict: {e}")
    
    def get_events(self, aggregate_id: str, from_version: int = 0, 
                   to_version: Optional[int] = None) -> List[Event]:
        """
        Get events for an aggregate.
        
        Args:
            aggregate_id: Aggregate identifier
            from_version: Start version (exclusive)
            to_version: End version (inclusive), None for all
        
        Returns:
            List of events ordered by version
        """
        if to_version is not None:
            query = """
                SELECT * FROM events
                WHERE aggregate_id = ? AND version > ? AND version <= ?
                ORDER BY version
            """
            params = (aggregate_id, from_version, to_version)
        else:
            query = """
                SELECT * FROM events
                WHERE aggregate_id = ? AND version > ?
                ORDER BY version
            """
            params = (aggregate_id, from_version)
        
        cursor = self.conn.execute(query, params)
        return [self._row_to_event(row) for row in cursor.fetchall()]
    
    def get_all_events(self, from_timestamp: Optional[datetime] = None,
                       event_types: Optional[List[str]] = None) -> List[Event]:
        """Get all events, optionally filtered by timestamp and type."""
        query = "SELECT * FROM events WHERE 1=1"
        params = []
        
        if from_timestamp:
            query += " AND timestamp > ?"
            params.append(from_timestamp.isoformat())
        
        if event_types:
            placeholders = ','.join('?' * len(event_types))
            query += f" AND event_type IN ({placeholders})"
            params.extend(event_types)
        
        query += " ORDER BY timestamp, version"
        
        cursor = self.conn.execute(query, params)
        return [self._row_to_event(row) for row in cursor.fetchall()]
    
    def get_current_version(self, aggregate_id: str) -> int:
        """Get current version of aggregate."""
        cursor = self.conn.execute(
            "SELECT MAX(version) as version FROM events WHERE aggregate_id = ?",
            (aggregate_id,)
        )
        row = cursor.fetchone()
        return row['version'] if row['version'] else 0
    
    def _row_to_event(self, row) -> Event:
        """Convert database row to Event."""
        return Event(
            id=row['id'],
            aggregate_id=row['aggregate_id'],
            aggregate_type=row['aggregate_type'],
            event_type=row['event_type'],
            data=json.loads(row['data']),
            metadata=json.loads(row['metadata']),
            timestamp=datetime.fromisoformat(row['timestamp']),
            version=row['version']
        )

class ConcurrencyError(Exception):
    """Raised when optimistic concurrency check fails."""
    pass
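
To see the optimistic concurrency check in isolation, here is a self-contained sketch against an in-memory SQLite database. The `append_event` function, the `VersionConflict` exception, and the reduced three-column table are hypothetical simplifications of the store above, not part of it.

```python
import sqlite3


class VersionConflict(Exception):
    """Hypothetical error type for this sketch (the store above uses ConcurrencyError)."""


def append_event(conn: sqlite3.Connection, stream_id: str,
                 expected_version: int, payload: str) -> int:
    """Append one event if the stream is still at `expected_version`."""
    row = conn.execute(
        "SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = ?",
        (stream_id,),
    ).fetchone()
    if row[0] != expected_version:
        raise VersionConflict(f"expected {expected_version}, found {row[0]}")
    try:
        conn.execute(
            "INSERT INTO events (stream_id, version, payload) VALUES (?, ?, ?)",
            (stream_id, expected_version + 1, payload),
        )
        conn.commit()
    except sqlite3.IntegrityError as exc:
        # The UNIQUE constraint is the final guard if another writer slipped
        # in between the version check and the insert.
        conn.rollback()
        raise VersionConflict(str(exc)) from exc
    return expected_version + 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (stream_id TEXT NOT NULL, version INTEGER NOT NULL, "
    "payload TEXT NOT NULL, UNIQUE(stream_id, version))"
)

# Two writers both loaded the stream at version 0.
writer_a_version = append_event(conn, "acc-1", 0, "deposit 100")  # succeeds
try:
    append_event(conn, "acc-1", 0, "withdraw 50")  # stale expected_version
    conflict_detected = False
except VersionConflict:
    conflict_detected = True  # writer B must reload the stream and retry
```

The losing writer is expected to reload the aggregate, re-run its business rules against the new state, and try again with the updated version.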

PostgreSQL Event Store

import psycopg2
from psycopg2.extras import RealDictCursor

class PostgreSQLEventStore:
    """Production-ready event store using PostgreSQL."""
    
    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)
        self._create_schema()
    
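    def _create_schema(self):
        """Create event store schema with partitioning."""
        with self.conn.cursor() as cursor:
            # Events table partitioned by aggregate_type. PostgreSQL requires
            # primary key and unique constraints on a partitioned table to
            # include the partition key, so aggregate_type is part of both.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id UUID NOT NULL,
                    aggregate_id UUID NOT NULL,
                    aggregate_type VARCHAR(100) NOT NULL,
                    event_type VARCHAR(100) NOT NULL,
                    data JSONB NOT NULL,
                    metadata JSONB NOT NULL,
                    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    version INTEGER NOT NULL,
                    PRIMARY KEY (aggregate_type, id),
                    CONSTRAINT unique_aggregate_version
                        UNIQUE (aggregate_type, aggregate_id, version)
                ) PARTITION BY LIST (aggregate_type)
            """)
            
            # Catch-all partition so inserts succeed before any per-type
            # partitions have been defined
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS events_default
                PARTITION OF events DEFAULT
            """)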
            
            # Indexes
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_aggregate_stream
                ON events(aggregate_id, version)
            """)
            
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_type_timestamp
                ON events(event_type, timestamp)
            """)
            
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_data_gin
                ON events USING GIN (data)
            """)
            
            self.conn.commit()
    
    def append(self, aggregate_id: str, aggregate_type: str,
               events: List[Event], expected_version: int) -> None:
        """Append events with optimistic locking."""
        with self.conn.cursor() as cursor:
            # Read the current stream version. PostgreSQL rejects FOR UPDATE
            # combined with an aggregate such as MAX(), so concurrent writers
            # are caught by the table's unique constraint on the stream
            # version rather than by a row lock.
            cursor.execute("""
                SELECT MAX(version) as current_version
                FROM events
                WHERE aggregate_id = %s
            """, (aggregate_id,))
            
            row = cursor.fetchone()
            current_version = row[0] if row[0] else 0
            
            if current_version != expected_version:
                self.conn.rollback()
                raise ConcurrencyError(
                    f"Expected {expected_version}, got {current_version}"
                )
            
            try:
                # Insert events
                for i, event in enumerate(events):
                    version = expected_version + i + 1
                    cursor.execute("""
                        INSERT INTO events
                        (id, aggregate_id, aggregate_type, event_type, data, metadata, timestamp, version)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """, (
                        event.id,
                        aggregate_id,
                        aggregate_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.timestamp,
                        version
                    ))
                
                self.conn.commit()
            except psycopg2.IntegrityError as e:
                # Another writer appended the same version first
                self.conn.rollback()
                raise ConcurrencyError(f"Concurrency conflict: {e}")

Aggregate Reconstruction

class User:
    """Aggregate reconstructed from events."""
    
    def __init__(self, user_id: str):
        self.id = user_id
        self.email = None
        self.name = None
        self.is_active = False
        self.version = 0
        self._events = []
    
    def load_from_events(self, events: List[Event]):
        for event in events:
            self._apply(event)
            self.version = event.version
    
    def _apply(self, event: Event):
        if event.event_type == "user.created":
            self.email = event.data["email"]
            self.name = event.data["name"]
            self.is_active = True
        elif event.event_type == "user.email_changed":
            self.email = event.data["new_email"]
        elif event.event_type == "user.deactivated":
            self.is_active = False
    
    def change_email(self, new_email: str):
        if not new_email or "@" not in new_email:
            raise ValueError("Invalid email")
        
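        event = Event(
            id=str(uuid.uuid4()),
            aggregate_id=self.id,
            aggregate_type="User",  # required by the Event dataclass
            event_type="user.email_changed",
            data={"old_email": self.email, "new_email": new_email},
            metadata={},  # required by the Event dataclass
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )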
        self._apply(event)
        self._events.append(event)
        self.version += 1
    
    def get_uncommitted_events(self) -> List[Event]:
        events = self._events.copy()
        self._events = []
        return events

Snapshots

class SnapshotStore:
    """Store aggregate snapshots for performance."""
    
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.snapshots = {}
    
    def save_snapshot(self, aggregate_id: str, snapshot: dict, version: int):
        self.snapshots[aggregate_id] = {
            "data": snapshot,
            "version": version
        }
    
    def get_aggregate(self, aggregate_id: str, factory) -> object:
        # Check for snapshot
        if aggregate_id in self.snapshots:
            snapshot = self.snapshots[aggregate_id]
            aggregate = factory(aggregate_id)
            aggregate.load_from_snapshot(snapshot["data"])
            
            # Get events after snapshot
            events = self.event_store.get_events(
                aggregate_id, snapshot["version"]
            )
            aggregate.load_from_events(events)
            return aggregate
        
        # No snapshot, load all events
        events = self.event_store.get_events(aggregate_id)
        aggregate = factory(aggregate_id)
        aggregate.load_from_events(events)
        return aggregate

Conclusion

Event sourcing provides complete audit trails and temporal queries. Store all events, reconstruct aggregates by replaying, use snapshots for performance. Combine with CQRS for read optimization.

Aggregate Reconstruction with Event Sourcing

Aggregates are reconstructed by replaying their event stream. Each event is applied to update the aggregate’s state.

Complete Aggregate Example

from typing import List, Optional
from decimal import Decimal
import uuid

class BankAccount:
    """Bank account aggregate reconstructed from events."""
    
    def __init__(self, account_id: str):
        self.id = account_id
        self.owner_id: Optional[str] = None
        self.balance: Decimal = Decimal('0')
        self.is_active: bool = False
        self.overdraft_limit: Decimal = Decimal('0')
        self.version: int = 0
        self._uncommitted_events: List[Event] = []
    
    @classmethod
    def create(cls, account_id: str, owner_id: str, initial_deposit: Decimal):
        """Factory method to create new account."""
        account = cls(account_id)
        event = Event(
            id=str(uuid.uuid4()),
            aggregate_id=account_id,
            aggregate_type='BankAccount',
            event_type='AccountOpened',
            data={
                'owner_id': owner_id,
                'initial_deposit': str(initial_deposit)
            },
            metadata={'user_id': owner_id},
            timestamp=datetime.utcnow(),
            version=1
        )
        account._apply(event)
        account._uncommitted_events.append(event)
        return account
    
    def load_from_history(self, events: List[Event]):
        """Reconstruct aggregate from event history."""
        for event in events:
            self._apply(event)
            self.version = event.version
    
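    def _apply(self, event: Event):
        """Apply event to update state."""
        # Track the stream version here so that events raised by create()
        # and the command methods advance it too; the repository's save()
        # relies on it to compute expected_version.
        self.version = event.version
        event_type = event.event_type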
        
        if event_type == 'AccountOpened':
            self.owner_id = event.data['owner_id']
            self.balance = Decimal(event.data['initial_deposit'])
            self.is_active = True
        
        elif event_type == 'MoneyDeposited':
            self.balance += Decimal(event.data['amount'])
        
        elif event_type == 'MoneyWithdrawn':
            self.balance -= Decimal(event.data['amount'])
        
        elif event_type == 'OverdraftLimitSet':
            self.overdraft_limit = Decimal(event.data['limit'])
        
        elif event_type == 'AccountClosed':
            self.is_active = False
    
    def deposit(self, amount: Decimal, description: str):
        """Deposit money into account."""
        if not self.is_active:
            raise ValueError("Account is closed")
        
        if amount <= 0:
            raise ValueError("Amount must be positive")
        
        event = self._create_event('MoneyDeposited', {
            'amount': str(amount),
            'description': description,
            'balance_after': str(self.balance + amount)
        })
        
        self._apply(event)
        self._uncommitted_events.append(event)
    
    def withdraw(self, amount: Decimal, description: str):
        """Withdraw money from account."""
        if not self.is_active:
            raise ValueError("Account is closed")
        
        if amount <= 0:
            raise ValueError("Amount must be positive")
        
        if self.balance - amount < -self.overdraft_limit:
            raise ValueError("Insufficient funds")
        
        event = self._create_event('MoneyWithdrawn', {
            'amount': str(amount),
            'description': description,
            'balance_after': str(self.balance - amount)
        })
        
        self._apply(event)
        self._uncommitted_events.append(event)
    
    def set_overdraft_limit(self, limit: Decimal):
        """Set overdraft limit."""
        if not self.is_active:
            raise ValueError("Account is closed")
        
        if limit < 0:
            raise ValueError("Overdraft limit cannot be negative")
        
        event = self._create_event('OverdraftLimitSet', {
            'limit': str(limit),
            'previous_limit': str(self.overdraft_limit)
        })
        
        self._apply(event)
        self._uncommitted_events.append(event)
    
    def close(self):
        """Close account."""
        if not self.is_active:
            raise ValueError("Account already closed")
        
        if self.balance != 0:
            raise ValueError("Cannot close account with non-zero balance")
        
        event = self._create_event('AccountClosed', {
            'final_balance': str(self.balance)
        })
        
        self._apply(event)
        self._uncommitted_events.append(event)
    
    def _create_event(self, event_type: str, data: dict) -> Event:
        """Create new event."""
        return Event(
            id=str(uuid.uuid4()),
            aggregate_id=self.id,
            aggregate_type='BankAccount',
            event_type=event_type,
            data=data,
            metadata={'timestamp': datetime.utcnow().isoformat()},
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )
    
    def get_uncommitted_events(self) -> List[Event]:
        """Get events that haven't been persisted."""
        events = self._uncommitted_events.copy()
        self._uncommitted_events.clear()
        return events

Repository Pattern for Event-Sourced Aggregates

class EventSourcedRepository:
    """Repository for event-sourced aggregates."""
    
    def __init__(self, event_store: EventStore, aggregate_factory):
        self.event_store = event_store
        self.aggregate_factory = aggregate_factory
    
    def get(self, aggregate_id: str):
        """Load aggregate from event store."""
        events = self.event_store.get_events(aggregate_id)
        
        if not events:
            return None
        
        aggregate = self.aggregate_factory(aggregate_id)
        aggregate.load_from_history(events)
        return aggregate
    
    def save(self, aggregate):
        """Save aggregate by persisting uncommitted events."""
        uncommitted_events = aggregate.get_uncommitted_events()
        
        if not uncommitted_events:
            return
        
        self.event_store.append(
            aggregate_id=aggregate.id,
            aggregate_type=aggregate.__class__.__name__,
            events=uncommitted_events,
            expected_version=aggregate.version - len(uncommitted_events)
        )

# Usage
event_store = EventStore('events.db')
account_repo = EventSourcedRepository(event_store, BankAccount)

# Create new account
account = BankAccount.create('acc-123', 'user-456', Decimal('1000'))
account_repo.save(account)

# Load and modify
account = account_repo.get('acc-123')
account.deposit(Decimal('500'), 'Salary')
account.withdraw(Decimal('200'), 'Rent')
account_repo.save(account)

Snapshots for Performance

As event streams grow, replaying thousands of events becomes slow. Snapshots capture aggregate state at a point in time, allowing faster reconstruction.
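
A toy sketch of the idea, assuming a hypothetical aggregate whose state is a running total and whose events are `(version, delta)` pairs. The `replay` and `load` helpers are illustrative names, not part of the implementation that follows.

```python
from typing import Dict, List, Optional, Tuple

CounterEvent = Tuple[int, int]  # (version, delta): hypothetical minimal event shape


def replay(events: List[CounterEvent], state: int = 0) -> int:
    """Fold events into a running total, starting from `state`."""
    for _version, delta in events:
        state += delta
    return state


def load(events: List[CounterEvent],
         snapshot: Optional[Dict[str, int]]) -> Tuple[int, int]:
    """Return (state, events_replayed), using the snapshot when one exists."""
    if snapshot is None:
        return replay(events), len(events)
    # Only events newer than the snapshot need to be applied.
    tail = [e for e in events if e[0] > snapshot["version"]]
    return replay(tail, snapshot["state"]), len(tail)


stream = [(v, 1) for v in range(1, 1001)]  # 1,000 events, each adding 1
snapshot = {"version": 900, "state": replay(stream[:900])}

full_state, full_cost = load(stream, None)      # replays all 1,000 events
fast_state, fast_cost = load(stream, snapshot)  # replays only the last 100
```

Both paths arrive at the same state. The snapshot changes only how many events have to be replayed to get there.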

Snapshot Implementation

@dataclass
class Snapshot:
    """Aggregate state snapshot."""
    aggregate_id: str
    aggregate_type: str
    version: int
    state: Dict[str, Any]
    timestamp: datetime

class SnapshotStore:
    """Store and retrieve snapshots."""
    
    def __init__(self, connection_string: str):
        self.conn = sqlite3.connect(connection_string)
        self._create_schema()
    
    def _create_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS snapshots (
                aggregate_id TEXT PRIMARY KEY,
                aggregate_type TEXT NOT NULL,
                version INTEGER NOT NULL,
                state TEXT NOT NULL,
                timestamp TEXT NOT NULL
            )
        """)
        self.conn.commit()
    
    def save_snapshot(self, snapshot: Snapshot):
        """Save or update snapshot."""
        self.conn.execute("""
            INSERT OR REPLACE INTO snapshots
            (aggregate_id, aggregate_type, version, state, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (
            snapshot.aggregate_id,
            snapshot.aggregate_type,
            snapshot.version,
            json.dumps(snapshot.state),
            snapshot.timestamp.isoformat()
        ))
        self.conn.commit()
    
    def get_snapshot(self, aggregate_id: str) -> Optional[Snapshot]:
        """Get latest snapshot for aggregate."""
        cursor = self.conn.execute(
            "SELECT * FROM snapshots WHERE aggregate_id = ?",
            (aggregate_id,)
        )
        row = cursor.fetchone()
        
        if not row:
            return None
        
        return Snapshot(
            aggregate_id=row[0],
            aggregate_type=row[1],
            version=row[2],
            state=json.loads(row[3]),
            timestamp=datetime.fromisoformat(row[4])
        )

class SnapshotRepository:
    """Repository with snapshot support."""
    
    def __init__(self, event_store: EventStore, snapshot_store: SnapshotStore,
                 aggregate_factory, snapshot_frequency: int = 100):
        self.event_store = event_store
        self.snapshot_store = snapshot_store
        self.aggregate_factory = aggregate_factory
        self.snapshot_frequency = snapshot_frequency
    
    def get(self, aggregate_id: str):
        """Load aggregate using snapshot + events."""
        # Try to load from snapshot
        snapshot = self.snapshot_store.get_snapshot(aggregate_id)
        
        if snapshot:
            # Load from snapshot
            aggregate = self.aggregate_factory(aggregate_id)
            aggregate.load_from_snapshot(snapshot.state)
            aggregate.version = snapshot.version
            
            # Load events after snapshot
            events = self.event_store.get_events(aggregate_id, snapshot.version)
            aggregate.load_from_history(events)
        else:
            # Load all events
            events = self.event_store.get_events(aggregate_id)
            if not events:
                return None
            
            aggregate = self.aggregate_factory(aggregate_id)
            aggregate.load_from_history(events)
        
        return aggregate
    
    def save(self, aggregate):
        """Save aggregate and create snapshot if needed."""
        uncommitted_events = aggregate.get_uncommitted_events()
        
        if not uncommitted_events:
            return
        
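        # Save events
        expected_version = aggregate.version - len(uncommitted_events)
        self.event_store.append(
            aggregate_id=aggregate.id,
            aggregate_type=aggregate.__class__.__name__,
            events=uncommitted_events,
            expected_version=expected_version
        )
        
        # Create snapshot if this save crossed a snapshot boundary. An exact
        # "version % frequency == 0" check would miss the boundary whenever
        # several events are saved at once.
        if (aggregate.version // self.snapshot_frequency
                > expected_version // self.snapshot_frequency):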
            snapshot = Snapshot(
                aggregate_id=aggregate.id,
                aggregate_type=aggregate.__class__.__name__,
                version=aggregate.version,
                state=aggregate.to_snapshot(),
                timestamp=datetime.utcnow()
            )
            self.snapshot_store.save_snapshot(snapshot)

# Add snapshot support to aggregate
class BankAccount:
    # ... existing code ...
    
    def to_snapshot(self) -> Dict[str, Any]:
        """Serialize state for snapshot."""
        return {
            'owner_id': self.owner_id,
            'balance': str(self.balance),
            'is_active': self.is_active,
            'overdraft_limit': str(self.overdraft_limit)
        }
    
    def load_from_snapshot(self, state: Dict[str, Any]):
        """Restore state from snapshot."""
        self.owner_id = state['owner_id']
        self.balance = Decimal(state['balance'])
        self.is_active = state['is_active']
        self.overdraft_limit = Decimal(state['overdraft_limit'])

CQRS: Command Query Responsibility Segregation

CQRS separates write models (commands) from read models (queries). Event sourcing fits CQRS naturally: the write side validates commands and appends events, and projections consume those events to build read models optimized for queries.
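
The split can be sketched in a few lines, with in-memory structures standing in for real storage. The `WriteModel` and `CustomerSpendProjection` classes and the `OrderPlaced` payload shape are hypothetical and chosen only for illustration.

```python
from collections import defaultdict
from typing import Dict, List


class WriteModel:
    """Command side: validates commands and appends events (in-memory log here)."""

    def __init__(self) -> None:
        self.log: List[dict] = []

    def place_order(self, order_id: str, customer: str, total: int) -> dict:
        if total <= 0:
            raise ValueError("total must be positive")
        event = {"type": "OrderPlaced", "order_id": order_id,
                 "customer": customer, "total": total}
        self.log.append(event)
        return event


class CustomerSpendProjection:
    """Query side: a denormalized view updated from events, never from commands."""

    def __init__(self) -> None:
        self.spend_by_customer: Dict[str, int] = defaultdict(int)

    def handle(self, event: dict) -> None:
        if event["type"] == "OrderPlaced":
            self.spend_by_customer[event["customer"]] += event["total"]


write_model = WriteModel()
projection = CustomerSpendProjection()

for args in [("o-1", "alice", 30), ("o-2", "bob", 20), ("o-3", "alice", 15)]:
    projection.handle(write_model.place_order(*args))

# A projection can be discarded and rebuilt at any time by replaying the log.
rebuilt = CustomerSpendProjection()
for event in write_model.log:
    rebuilt.handle(event)
```

Because the read model is derived entirely from events, it can be reshaped or rebuilt without touching the write side.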

CQRS with Projections

class AccountProjection:
    """Read model for account queries."""
    
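    def __init__(self, connection_string: str):
        self.conn = sqlite3.connect(connection_string)
        # Return rows that support both index and column-name access;
        # get_accounts_by_owner relies on this for dict(row)
        self.conn.row_factory = sqlite3.Row
        self._create_schema()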
    
    def _create_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS account_summary (
                account_id TEXT PRIMARY KEY,
                owner_id TEXT NOT NULL,
                balance TEXT NOT NULL,
                is_active BOOLEAN NOT NULL,
                last_transaction_date TEXT,
                transaction_count INTEGER DEFAULT 0
            )
        """)
        self.conn.commit()
    
    def handle_event(self, event: Event):
        """Update read model based on event."""
        if event.event_type == 'AccountOpened':
            self.conn.execute("""
                INSERT INTO account_summary
                (account_id, owner_id, balance, is_active, transaction_count)
                VALUES (?, ?, ?, ?, ?)
            """, (
                event.aggregate_id,
                event.data['owner_id'],
                event.data['initial_deposit'],
                True,
                1
            ))
        
        elif event.event_type == 'MoneyDeposited':
            self.conn.execute("""
                UPDATE account_summary
                SET balance = ?, 
                    last_transaction_date = ?,
                    transaction_count = transaction_count + 1
                WHERE account_id = ?
            """, (
                event.data['balance_after'],
                event.timestamp.isoformat(),
                event.aggregate_id
            ))
        
        elif event.event_type == 'MoneyWithdrawn':
            self.conn.execute("""
                UPDATE account_summary
                SET balance = ?,
                    last_transaction_date = ?,
                    transaction_count = transaction_count + 1
                WHERE account_id = ?
            """, (
                event.data['balance_after'],
                event.timestamp.isoformat(),
                event.aggregate_id
            ))
        
        elif event.event_type == 'AccountClosed':
            self.conn.execute("""
                UPDATE account_summary
                SET is_active = ?
                WHERE account_id = ?
            """, (False, event.aggregate_id))
        
        self.conn.commit()
    
    def get_account_summary(self, account_id: str) -> Optional[Dict]:
        """Query read model."""
        cursor = self.conn.execute(
            "SELECT * FROM account_summary WHERE account_id = ?",
            (account_id,)
        )
        row = cursor.fetchone()
        
        if not row:
            return None
        
        return {
            'account_id': row[0],
            'owner_id': row[1],
            'balance': row[2],
            'is_active': bool(row[3]),
            'last_transaction_date': row[4],
            'transaction_count': row[5]
        }
    
    def get_accounts_by_owner(self, owner_id: str) -> List[Dict]:
        """Query accounts by owner."""
        cursor = self.conn.execute(
            "SELECT * FROM account_summary WHERE owner_id = ?",
            (owner_id,)
        )
        return [dict(row) for row in cursor.fetchall()]

Event Versioning and Schema Evolution

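Events are immutable, but schemas evolve. Handle this with event versioning and upcasting: stored events keep their original shape, and old payloads are converted to the current schema when they are read.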
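
One way to organize upcasting is as a chain of small per-version functions that return new payloads instead of mutating the stored ones. The sketch below assumes a hypothetical version history for `MoneyDeposited` (v2 adds `description`, v3 adds `category`); the registry layout and function names are illustrative.

```python
from typing import Any, Callable, Dict, Tuple

Payload = Dict[str, Any]
Upcaster = Callable[[Payload], Payload]


def deposit_v1_to_v2(data: Payload) -> Payload:
    """v2 added a `description` field; old events get a default."""
    return {**data, "description": data.get("description", "Deposit")}


def deposit_v2_to_v3(data: Payload) -> Payload:
    """v3 added a `category` field (hypothetical, for illustration)."""
    return {**data, "category": "general"}


# (event_type, from_version) -> function producing the next version's payload
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("MoneyDeposited", 1): deposit_v1_to_v2,
    ("MoneyDeposited", 2): deposit_v2_to_v3,
}


def upcast(event_type: str, schema_version: int, data: Payload) -> Tuple[int, Payload]:
    """Apply upcasters step by step until no newer version is registered."""
    while (event_type, schema_version) in UPCASTERS:
        data = UPCASTERS[(event_type, schema_version)](data)
        schema_version += 1
    return schema_version, data


stored = {"amount": "25.00"}  # payload as written under schema v1
version, current = upcast("MoneyDeposited", 1, stored)
```

Each step only needs to know about the version immediately before it, so adding a v4 later means registering one more function. The stored payload is left untouched.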

Event Versioning Pattern

@dataclass
class VersionedEvent(Event):
    """Event with schema version."""
    schema_version: int = 1

class EventUpcaster:
    """Convert old event versions to current schema."""
    
    def upcast(self, event: Event) -> Event:
        """Upcast event to latest version."""
        schema_version = event.metadata.get('schema_version', 1)
        
        if event.event_type == 'MoneyDeposited':
            if schema_version == 1:
                # V1: Only had amount
                # V2: Added description and category
                event.data['description'] = event.data.get('description', 'Deposit')
                event.data['category'] = 'general'
                event.metadata['schema_version'] = 2
        
        return event

# Usage in repository
class EventSourcedRepository:
    def __init__(self, event_store, aggregate_factory, upcaster=None):
        self.event_store = event_store
        self.aggregate_factory = aggregate_factory
        self.upcaster = upcaster or EventUpcaster()
    
    def get(self, aggregate_id: str):
        events = self.event_store.get_events(aggregate_id)
        
        if not events:
            return None
        
        # Upcast events to current schema
        events = [self.upcaster.upcast(e) for e in events]
        
        aggregate = self.aggregate_factory(aggregate_id)
        aggregate.load_from_history(events)
        return aggregate

Best Practices

  1. Events are immutable — never modify or delete events
  2. Events are past tense — OrderPlaced, not PlaceOrder
  3. Events contain all data — no foreign key lookups needed
  4. Use optimistic concurrency — prevent lost updates
  5. Snapshot strategically — every 100-1000 events
  6. Version events — plan for schema evolution
  7. Keep aggregates small — large event streams are slow
  8. Use CQRS for queries — don’t query event store directly
  9. Handle idempotency — same command twice = same result
  10. Monitor event store size — archive old events if needed
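
Idempotency (practice 9) is often handled by giving every command a unique ID and remembering which IDs have already been processed. The sketch below is a minimal in-memory illustration; `IdempotentDepositHandler` and the `command_id` field are hypothetical names, and amounts are assumed to be integers.

```python
from typing import Dict, List


class IdempotentDepositHandler:
    """Remembers processed command IDs so a retried command emits no new events."""

    def __init__(self) -> None:
        self.events: List[dict] = []
        self._results: Dict[str, int] = {}  # command_id -> balance after command
        self.balance = 0

    def handle(self, command_id: str, amount: int) -> int:
        if command_id in self._results:
            # Duplicate delivery: return the original result, record nothing.
            return self._results[command_id]
        self.balance += amount
        self.events.append({"type": "MoneyDeposited", "amount": amount,
                            "command_id": command_id})
        self._results[command_id] = self.balance
        return self.balance


handler = IdempotentDepositHandler()
first = handler.handle("cmd-1", 100)
retry = handler.handle("cmd-1", 100)  # e.g. the client retried after a timeout
second = handler.handle("cmd-2", 50)
```

In a persistent system the set of processed command IDs would need to be stored atomically with the events. Otherwise a crash between the two writes could reintroduce duplicates.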

When to Use Event Sourcing

Use event sourcing when:

  • Complete audit trail is required (finance, healthcare, legal)
  • Temporal queries are needed (state at any point in time)
  • Event-driven architecture is beneficial
  • Domain has complex business rules
  • Debugging requires understanding state changes

Don’t use event sourcing when:

  • Simple CRUD operations suffice
  • Team lacks event sourcing experience
  • Performance of event replay is prohibitive
  • Audit trail isn’t critical
  • Domain is simple and stable

Conclusion

Event sourcing stores all state changes as immutable events, providing complete audit trails, temporal queries, and event replay capabilities. Implement event stores with optimistic concurrency control, reconstruct aggregates by replaying events, use snapshots for performance optimization, and combine with CQRS for efficient queries. Version events for schema evolution, keep aggregates small, and monitor event store growth. Event sourcing excels in domains requiring audit trails, temporal queries, and complex business logic, but adds complexity that may not be justified for simple CRUD applications.
