
Event Sourcing: Building Systems with Complete Audit Trails

Introduction

Event sourcing is an architectural pattern that fundamentally changes how we think about state. Instead of storing only the current state of an entity, it stores every change as an immutable event and derives the current state from that history. This seemingly simple shift unlocks powerful capabilities: complete audit trails, temporal queries, event replay, and natural integration with event-driven systems.

This comprehensive guide covers event sourcing from basics to advanced patterns, including implementation strategies, common challenges, and how event sourcing naturally pairs with CQRS (Command Query Responsibility Segregation).

Understanding Event Sourcing

The Fundamental Difference

Traditional approaches store the current state:

# Traditional: Store current state
class BankAccount:
    def __init__(self):
        self.balance = 0
    
    def deposit(self, amount):
        self.balance += amount
    
    def withdraw(self, amount):
        self.balance -= amount
    
    # State is overwritten on each change

Event sourcing stores every change as an immutable event:

# Event Sourcing: Store events, derive state
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
import uuid

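# Events are immutable facts. kw_only=True (Python 3.10+) lets the required
# aggregate_id coexist with defaulted fields, here and in the subclasses.
@dataclass(kw_only=True)
class Event:
    aggregate_id: str
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    occurred_at: datetime = field(default_factory=datetime.utcnow)
    aggregate_version: int = 0  # assigned by the aggregate when the event is applied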

@dataclass
class AccountCreated(Event):
    initial_balance: float = 0
    owner: str = ""

@dataclass
class MoneyDeposited(Event):
    amount: float
    deposit_method: str = "cash"

@dataclass
class MoneyWithdrawn(Event):
    amount: float
    withdrawal_method: str = "cash"

@dataclass
class TransferCompleted(Event):
    from_account: str
    to_account: str
    amount: float
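
To make "store events, derive state" concrete, here is a minimal sketch that folds a list of events into a balance. The `Deposited` and `Withdrawn` classes and the `replay_balance` helper are simplified, hypothetical stand-ins for illustration, not the event classes defined above.

```python
from dataclasses import dataclass
from functools import reduce
from typing import List, Union


@dataclass(frozen=True)  # frozen: an event cannot be modified after creation
class Deposited:
    amount: float


@dataclass(frozen=True)
class Withdrawn:
    amount: float


AccountEvent = Union[Deposited, Withdrawn]


def apply_event(balance: float, event: AccountEvent) -> float:
    """Return the balance that results from applying one event."""
    if isinstance(event, Deposited):
        return balance + event.amount
    if isinstance(event, Withdrawn):
        return balance - event.amount
    raise TypeError(f"Unknown event type: {type(event).__name__}")


def replay_balance(events: List[AccountEvent]) -> float:
    """Derive the current balance by folding over the full history."""
    return reduce(apply_event, events, 0.0)


history = [Deposited(100.0), Withdrawn(30.0), Deposited(5.0)]
current_balance = replay_balance(history)
```

Replaying only a prefix of the history, for example `replay_balance(history[:2])`, gives the balance as it stood after the second event.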

Why Events Are Different

Aspect       Traditional                 Event Sourcing
Storage      Current state               History of changes
Mutability   Overwrites state            Appends events
Audit        Requires explicit logging   Built-in
Queries      Current state only          Any point in time
Debugging    Limited                     Full replay possible

Implementing Event Sourcing

The Event Store

from typing import List, Callable, Optional
from datetime import datetime
import json

class EventStore:
    """Append-only store for events."""
    
    def __init__(self):
        self._events: List[Event] = []
        self._subscribers: List[Callable[[Event], None]] = []
    
    def append(self, event: Event) -> None:
        """Add an event to the store."""
        # Validate event
        if not event.event_id:
            raise ValueError("Event must have an ID")
        
        self._events.append(event)
        
        # Notify subscribers (for read models, projections)
        for subscriber in self._subscribers:
            subscriber(event)
    
    def get_events_for_aggregate(
        self, 
        aggregate_id: str,
        from_version: Optional[int] = None
    ) -> List[Event]:
        """Get all events for an aggregate."""
        events = [e for e in self._events if e.aggregate_id == aggregate_id]
        
        if from_version is not None:
            events = [e for e in events if e.aggregate_version >= from_version]
        
        # Order by version rather than timestamp: timestamps can tie
        return sorted(events, key=lambda e: e.aggregate_version)
    
    def subscribe(self, handler: Callable[[Event], None]) -> None:
        """Subscribe to new events."""
        self._subscribers.append(handler)
    
    def get_all_events(
        self, 
        since: Optional[datetime] = None
    ) -> List[Event]:
        """Get all events since a given time."""
        if since is not None:
            return [e for e in self._events if e.occurred_at > since]
        return self._events.copy()

Aggregate Root

from abc import ABC, abstractmethod

class AggregateRoot(ABC):
    """Base class for aggregates in event sourcing."""
    
    def __init__(self, aggregate_id: str):
        self.id = aggregate_id
        self._version = 0
        self._pending_events: List[Event] = []
    
    @property
    def version(self) -> int:
        return self._version
    
    def _apply(self, event: Event) -> None:
        """Apply event to aggregate state."""
        self._version += 1
        event.aggregate_version = self._version
        self._pending_events.append(event)
        self._handle_event(event)
    
    @abstractmethod
    def _handle_event(self, event: Event) -> None:
        """Handle event - update internal state."""
        pass
    
    def get_pending_events(self) -> List[Event]:
        """Get uncommitted events."""
        events = self._pending_events.copy()
        self._pending_events.clear()
        return events
    
    def load_from_history(self, events: List[Event]) -> None:
        """Rebuild aggregate from event history."""
        for event in events:
            self._handle_event(event)
            self._version = event.aggregate_version


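class InsufficientFundsError(Exception):
    """Raised when a withdrawal or transfer exceeds the balance."""


class BankAccount(AggregateRoot):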
    """Bank account using event sourcing."""
    
    def __init__(self, account_id: str):
        super().__init__(account_id)
        self.balance = 0
        self.owner = ""
        self.is_active = False
    
    def _handle_event(self, event: Event) -> None:
        if isinstance(event, AccountCreated):
            self.balance = event.initial_balance
            self.owner = event.owner
            self.is_active = True
        elif isinstance(event, MoneyDeposited):
            self.balance += event.amount
        elif isinstance(event, MoneyWithdrawn):
            self.balance -= event.amount
        elif isinstance(event, TransferCompleted):
            if event.from_account == self.id:
                self.balance -= event.amount
            elif event.to_account == self.id:
                self.balance += event.amount
    
    @staticmethod
    def create(account_id: str, owner: str, initial_balance: float = 0) -> 'BankAccount':
        """Factory method to create new account."""
        account = BankAccount(account_id)
        account._apply(AccountCreated(
            aggregate_id=account_id,
            initial_balance=initial_balance,
            owner=owner
        ))
        return account
    
    def deposit(self, amount: float, method: str = "cash") -> None:
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        
        self._apply(MoneyDeposited(
            aggregate_id=self.id,
            amount=amount,
            deposit_method=method
        ))
    
    def withdraw(self, amount: float, method: str = "cash") -> None:
        if amount <= 0:
            raise ValueError("Withdrawal amount must be positive")
        
        if self.balance < amount:
            raise InsufficientFundsError(f"Balance {self.balance} < {amount}")
        
        self._apply(MoneyWithdrawn(
            aggregate_id=self.id,
            amount=amount,
            withdrawal_method=method
        ))
    
    def transfer_to(self, amount: float, to_account: str) -> None:
        if amount <= 0:
            raise ValueError("Transfer amount must be positive")
        
        if self.balance < amount:
            raise InsufficientFundsError(f"Balance {self.balance} < {amount}")
        
        self._apply(TransferCompleted(
            aggregate_id=self.id,
            from_account=self.id,
            to_account=to_account,
            amount=amount
        ))
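
The key property of an aggregate is that commands validate and emit events, while state changes happen only in the event handler, so replaying the events reproduces the state. The following is a compact, self-contained sketch of that round trip. The `Account`, `Deposited`, and `Withdrawn` names are simplified stand-ins chosen for this example, not the classes above.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Deposited:
    amount: float
    version: int


@dataclass(frozen=True)
class Withdrawn:
    amount: float
    version: int


class Account:
    """Compact event-sourced aggregate (illustrative only)."""

    def __init__(self) -> None:
        self.balance = 0.0
        self.version = 0
        self.pending: List[object] = []

    def _mutate(self, event) -> None:
        # State changes happen only here, for new and replayed events alike
        if isinstance(event, Deposited):
            self.balance += event.amount
        elif isinstance(event, Withdrawn):
            self.balance -= event.amount
        self.version = event.version

    def _record(self, event) -> None:
        self._mutate(event)
        self.pending.append(event)

    def deposit(self, amount: float) -> None:
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        self._record(Deposited(amount, self.version + 1))

    def withdraw(self, amount: float) -> None:
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        self._record(Withdrawn(amount, self.version + 1))

    @classmethod
    def from_history(cls, events) -> "Account":
        """Rebuild state by replaying events; nothing becomes pending."""
        account = cls()
        for event in events:
            account._mutate(event)
        return account


original = Account()
original.deposit(100.0)
original.withdraw(40.0)

# Rebuilding from the recorded events yields the same state
rebuilt = Account.from_history(original.pending)
```

Note that a rejected command (for example, withdrawing more than the balance) raises before any event is created, so invalid operations never enter the history.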

Projections and Read Models

Building Projections

class AccountProjection:
    """Read model for account queries."""
    
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self._accounts: dict = {}
        
        # Subscribe to updates
        event_store.subscribe(self._handle_event)
    
    def _handle_event(self, event: Event) -> None:
        if isinstance(event, AccountCreated):
            self._accounts[event.aggregate_id] = {
                "id": event.aggregate_id,
                "owner": event.owner,
                "balance": event.initial_balance,
                "status": "active",
                "created_at": event.occurred_at
            }
        elif isinstance(event, MoneyDeposited):
            if event.aggregate_id in self._accounts:
                self._accounts[event.aggregate_id]["balance"] += event.amount
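        elif isinstance(event, MoneyWithdrawn):
            if event.aggregate_id in self._accounts:
                self._accounts[event.aggregate_id]["balance"] -= event.amount
        elif isinstance(event, TransferCompleted):
            # Debit the sender and credit the recipient, whichever this model tracks
            if event.from_account in self._accounts:
                self._accounts[event.from_account]["balance"] -= event.amount
            if event.to_account in self._accounts:
                self._accounts[event.to_account]["balance"] += event.amount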
    
    def get_account(self, account_id: str) -> Optional[dict]:
        return self._accounts.get(account_id)
    
    def get_all_accounts(self) -> List[dict]:
        return list(self._accounts.values())
    
    def get_accounts_by_owner(self, owner: str) -> List[dict]:
        return [a for a in self._accounts.values() if a["owner"] == owner]
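
Because a projection is derived entirely from events, it can be discarded and rebuilt, and new read models can be added later by replaying the same log. Here is a minimal sketch of a second read model, "total balance per owner", built from scratch. The tuple layout of the events and the `build_owner_totals` function are assumptions made to keep the example short.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Each event is (event_type, account_id, payload); this layout is an
# assumption for the sketch, not the Event classes used elsewhere.
EventRecord = Tuple[str, str, dict]

log: List[EventRecord] = [
    ("AccountCreated", "acc-1", {"owner": "alice", "initial_balance": 50.0}),
    ("AccountCreated", "acc-2", {"owner": "alice", "initial_balance": 0.0}),
    ("MoneyDeposited", "acc-2", {"amount": 25.0}),
    ("AccountCreated", "acc-3", {"owner": "bob", "initial_balance": 10.0}),
    ("MoneyWithdrawn", "acc-1", {"amount": 20.0}),
]


def build_owner_totals(events: List[EventRecord]) -> Dict[str, float]:
    """Replay the whole log into a 'total balance per owner' read model."""
    owner_of: Dict[str, str] = {}
    totals: Dict[str, float] = defaultdict(float)
    for event_type, account_id, payload in events:
        if event_type == "AccountCreated":
            owner_of[account_id] = payload["owner"]
            totals[payload["owner"]] += payload["initial_balance"]
        elif event_type == "MoneyDeposited":
            totals[owner_of[account_id]] += payload["amount"]
        elif event_type == "MoneyWithdrawn":
            totals[owner_of[account_id]] -= payload["amount"]
    return dict(totals)


owner_totals = build_owner_totals(log)
```

The event log is untouched by this; the new view is just another interpretation of the same facts.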

Temporal Queries

class TemporalQueryService:
    """Query state at any point in time."""
    
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def get_balance_at(
        self, 
        account_id: str, 
        as_of: datetime
    ) -> float:
        """Get account balance at a specific time."""
        events = self.event_store.get_events_for_aggregate(account_id)
        
        # Filter events up to the specified time
        past_events = [e for e in events if e.occurred_at <= as_of]
        
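        # Rebuild state, starting from the opening balance
        balance = 0
        for event in past_events:
            if isinstance(event, AccountCreated):
                balance = event.initial_balance
            elif isinstance(event, MoneyDeposited):
                balance += event.amount
            elif isinstance(event, MoneyWithdrawn):
                balance -= event.amount
            elif isinstance(event, TransferCompleted):
                # Outgoing transfers debit the account; incoming ones credit it
                if event.from_account == account_id:
                    balance -= event.amount
                elif event.to_account == account_id:
                    balance += event.amount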
        
        return balance
    
    def get_account_history(
        self, 
        account_id: str,
        from_time: Optional[datetime] = None,
        to_time: Optional[datetime] = None
    ) -> List[Event]:
        """Get all events in a time range."""
        events = self.event_store.get_events_for_aggregate(account_id)
        
        if from_time:
            events = [e for e in events if e.occurred_at >= from_time]
        if to_time:
            events = [e for e in events if e.occurred_at <= to_time]
        
        return events
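
A temporal query is the same replay, cut off at a timestamp. The sketch below uses fixed timestamps so the results are easy to follow. The `BalanceChanged` event and the `balance_as_of` function are hypothetical simplifications, with deposits and withdrawals collapsed into a signed `delta`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass(frozen=True)
class BalanceChanged:
    occurred_at: datetime
    delta: float  # positive for deposits, negative for withdrawals


def balance_as_of(events: List[BalanceChanged], as_of: datetime) -> float:
    """Sum every change that happened at or before `as_of`."""
    return sum(e.delta for e in events if e.occurred_at <= as_of)


events = [
    BalanceChanged(datetime(2024, 1, 1, 9, 0), 100.0),
    BalanceChanged(datetime(2024, 1, 15, 12, 0), -30.0),
    BalanceChanged(datetime(2024, 2, 1, 8, 30), 50.0),
]

end_of_january = balance_as_of(events, datetime(2024, 1, 31, 23, 59))
before_opening = balance_as_of(events, datetime(2023, 12, 31))
latest = balance_as_of(events, datetime(2024, 12, 31))
```

With these inputs, the January cutoff sees only the first two events, while a cutoff before the first event sees none.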

Event Versioning

Schema Evolution

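Events are immutable, but your understanding of the domain changes over time, so stored events must remain readable as their schemas evolve. Handle this with explicit versioning: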

from typing import Dict, Any
import json

class VersionedEvent:
    """Event with version information."""
    
    def __init__(
        self, 
        event_type: str, 
        version: int,
        data: Dict[str, Any]
    ):
        self.event_type = event_type
        self.version = version
        self.data = data
    
    def migrate(self, target_version: int) -> 'VersionedEvent':
        """Migrate event to target version."""
        if self.version == target_version:
            return self
        
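        # Apply migrations sequentially, one version step at a time
        current = self
        for v in range(self.version, target_version):
            migrator = migration_strategies.get(f"{self.event_type}_v{v}")
            if migrator is None:
                # Fail loudly instead of returning a half-migrated event
                raise ValueError(f"No migration for {self.event_type} v{v}")
            current = migrator(current)
        
        return current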

# Migration functions
def migrate_event_v1_to_v2(event: VersionedEvent) -> VersionedEvent:
    """Example migration: add new field."""
    new_data = event.data.copy()
    # Give the new field a default without overwriting an existing value
    new_data.setdefault("new_field", "value")
    
    return VersionedEvent(
        event_type=event.event_type,
        version=2,
        data=new_data
    )

migration_strategies = {
    "UserCreated_v1": migrate_event_v1_to_v2,
}

Upcasting

class EventUpcaster:
    """Transform old event formats to new."""
    
    def __init__(self):
        self._upcasters: Dict[str, Callable] = {}
    
    def register(
        self, 
        event_type: str, 
        from_version: int,
        upcaster: Callable
    ) -> None:
        key = f"{event_type}_v{from_version}"
        self._upcasters[key] = upcaster
    
    def upcast(self, event: Dict) -> Dict:
        """Upcast event to current version."""
        event_type = event.get("type", "")
        version = event.get("version", 1)
        
        key = f"{event_type}_v{version}"
        
        if key in self._upcasters:
            return self._upcasters[key](event)
        
        return event
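
An event stored several versions ago usually needs more than one step to reach the current schema. The sketch below chains upcasters at read time until none is registered for the event's version, leaving the stored event untouched. The `UserCreated` field names (`name`, `first_name`, `last_name`, `locale`) and the `upcast_to_latest` helper are invented for illustration.

```python
from typing import Any, Callable, Dict, Tuple

EventDict = Dict[str, Any]
Upcaster = Callable[[EventDict], EventDict]


def user_created_v1_to_v2(event: EventDict) -> EventDict:
    """v2 splits 'name' into 'first_name' and 'last_name'."""
    first, _, last = event["name"].partition(" ")
    upgraded = {k: v for k, v in event.items() if k != "name"}
    upgraded.update({"first_name": first, "last_name": last, "version": 2})
    return upgraded


def user_created_v2_to_v3(event: EventDict) -> EventDict:
    """v3 adds 'locale', defaulting to 'en' for older events."""
    return {**event, "locale": "en", "version": 3}


# Keyed by (event type, version the upcaster reads)
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("UserCreated", 1): user_created_v1_to_v2,
    ("UserCreated", 2): user_created_v2_to_v3,
}


def upcast_to_latest(event: EventDict) -> EventDict:
    """Apply upcasters one version at a time until none is registered."""
    while True:
        key = (event.get("type", ""), event.get("version", 1))
        upcaster = UPCASTERS.get(key)
        if upcaster is None:
            return event
        upgraded = upcaster(event)
        # Guard against an upcaster that forgets to bump the version
        if upgraded.get("version", 1) <= key[1]:
            raise ValueError(f"Upcaster for {key} did not advance the version")
        event = upgraded


stored = {"type": "UserCreated", "version": 1, "user_id": "u-1", "name": "Ada Lovelace"}
current = upcast_to_latest(stored)
```

Each upcaster returns a new dictionary, so the record as originally stored stays available for auditing.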

CQRS with Event Sourcing

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                      CQRS + Event Sourcing                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Commands                Events                   Queries      │
│   ┌──────────┐            ┌─────────┐              ┌────────┐   │
│   │  Create  │───────────▶│  Event  │─────────────▶│ Read   │   │
│   │  Account │            │  Store  │              │ Model  │   │
│   └──────────┘            └─────────┘              └────────┘   │
│        │                       │                        │       │
│        │                       ▼                        │       │
│        │               ┌──────────────┐                 │       │
│   ┌────▼────┐          │ Projections  │                 │       │
│   │ Command │          │ (denormalize)│◀────────────────┘       │
│   │ Handler │          └──────────────┘                         │
│   └─────────┘                                                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Implementation

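# Command definitions; fields mirror how the handler below uses them
@dataclass
class CreateAccountCommand:
    account_id: str
    owner: str
    initial_balance: float = 0


@dataclass
class DepositCommand:
    account_id: str
    amount: float
    method: str = "cash"


class CommandHandler: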
    """Handle commands, produce events."""
    
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def handle(self, command) -> Event:
        """Process command and return event."""
        
        if isinstance(command, CreateAccountCommand):
            account = BankAccount.create(
                command.account_id,
                command.owner,
                command.initial_balance
            )
            events = account.get_pending_events()
            
            for event in events:
                self.event_store.append(event)
            
            return events[0]
        
        elif isinstance(command, DepositCommand):
            # Load aggregate
            events = self.event_store.get_events_for_aggregate(
                command.account_id
            )
            account = BankAccount(command.account_id)
            account.load_from_history(events)
            
            # Apply command
            account.deposit(command.amount, command.method)
            
            # Persist events
            new_events = account.get_pending_events()
            for event in new_events:
                self.event_store.append(event)
            
            return new_events[-1]
        
        # ... other commands


class QueryHandler:
    """Handle queries using read models."""
    
    def __init__(self, projection: AccountProjection):
        self.projection = projection
    
    def get_account(self, account_id: str) -> Optional[dict]:
        return self.projection.get_account(account_id)
    
    def get_balance(self, account_id: str) -> float:
        account = self.get_account(account_id)
        return account["balance"] if account else 0
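
To see the whole loop in one place, here is a self-contained sketch in which a command produces an event, the store notifies a read model, and a query reads the denormalized result. Every name in it (`Store`, `BalanceReadModel`, `handle_deposit`) is a pared-down stand-in invented for this example, and it updates the read model synchronously, which a production system might not do.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class Deposited:
    account_id: str
    amount: float


@dataclass(frozen=True)
class DepositCommand:
    account_id: str
    amount: float


class Store:
    """Append-only log that pushes each new event to subscribers."""

    def __init__(self) -> None:
        self.events: List[Deposited] = []
        self.subscribers: List[Callable[[Deposited], None]] = []

    def append(self, event: Deposited) -> None:
        self.events.append(event)
        for notify in self.subscribers:
            notify(event)


class BalanceReadModel:
    """Query side: a denormalized balance per account."""

    def __init__(self, store: Store) -> None:
        self.balances: Dict[str, float] = {}
        store.subscribers.append(self.on_event)

    def on_event(self, event: Deposited) -> None:
        previous = self.balances.get(event.account_id, 0.0)
        self.balances[event.account_id] = previous + event.amount


def handle_deposit(store: Store, command: DepositCommand) -> Deposited:
    """Command side: validate, then record the resulting event."""
    if command.amount <= 0:
        raise ValueError("Deposit amount must be positive")
    event = Deposited(command.account_id, command.amount)
    store.append(event)
    return event


store = Store()
read_model = BalanceReadModel(store)
handle_deposit(store, DepositCommand("acc-1", 40.0))
handle_deposit(store, DepositCommand("acc-1", 2.5))
handle_deposit(store, DepositCommand("acc-2", 10.0))
```

The command side never reads from `read_model`, and the query side never writes to the store; the event log is the only thing they share.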

Snapshots

Optimizing Performance

class SnapshotStore:
    """Store snapshots to optimize event replay."""
    
    def __init__(self):
        self._snapshots: dict = {}
    
    def save_snapshot(
        self, 
        aggregate_id: str, 
        state: dict, 
        version: int
    ) -> None:
        # Keep only the latest snapshot per aggregate
        self._snapshots[aggregate_id] = {
            "aggregate_id": aggregate_id,
            "state": state,
            "version": version,
            "saved_at": datetime.utcnow()
        }
    
    def get_snapshot(self, aggregate_id: str) -> Optional[dict]:
        return self._snapshots.get(aggregate_id)


class SnapshottingAggregate(AggregateRoot):
    """Aggregate with snapshot support."""
    
    def __init__(self, aggregate_id: str, snapshot_store: SnapshotStore):
        super().__init__(aggregate_id)
        self.snapshot_store = snapshot_store
    
    def load_from_history(self, events: List[Event]) -> None:
        # Try to load from snapshot
        snapshot = self.snapshot_store.get_snapshot(self.id)
        
        if snapshot:
            self._restore_from_snapshot(snapshot)
            
            # Apply events after snapshot
            remaining_events = [
                e for e in events 
                if e.aggregate_version > snapshot["version"]
            ]
        else:
            remaining_events = events
        
        for event in remaining_events:
            self._handle_event(event)
            # Keep the version in step with the replayed events
            self._version = event.aggregate_version
    
    def _restore_from_snapshot(self, snapshot: dict) -> None:
        self.balance = snapshot["state"]["balance"]
        self.owner = snapshot["state"]["owner"]
        self._version = snapshot["version"]
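
The payoff of a snapshot is that loading replays only the events recorded after it, yet arrives at the same state as a full replay. The sketch below shows this with events reduced to signed balance changes. The `SNAPSHOT_EVERY` interval and both helper functions are hypothetical values and names chosen for illustration.

```python
from typing import List, Optional, Tuple

SNAPSHOT_EVERY = 3  # hypothetical interval; tune for real workloads

# Each event is a signed balance change; its version is its 1-based position
deltas: List[float] = [100.0, -20.0, 5.0, 30.0, -15.0, 10.0, -1.0]

Snapshot = Tuple[int, float]  # (version, balance at that version)


def take_snapshots(changes: List[float]) -> List[Snapshot]:
    """Record a snapshot after every SNAPSHOT_EVERY events."""
    snapshots: List[Snapshot] = []
    balance = 0.0
    for version, delta in enumerate(changes, start=1):
        balance += delta
        if version % SNAPSHOT_EVERY == 0:
            snapshots.append((version, balance))
    return snapshots


def load_balance(
    changes: List[float], snapshot: Optional[Snapshot]
) -> Tuple[float, int]:
    """Return (balance, number of events replayed)."""
    version, balance = snapshot if snapshot else (0, 0.0)
    tail = changes[version:]  # only the events after the snapshot
    return balance + sum(tail), len(tail)


snapshots = take_snapshots(deltas)
full_balance, full_replayed = load_balance(deltas, None)
fast_balance, fast_replayed = load_balance(deltas, snapshots[-1])
```

Both loads produce the same balance, but the snapshot-based one replays a single event instead of all seven.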

Best Practices

Event Design

# Good event design principles

GOOD_EVENTS = """
1. Events represent facts, not commands
   - ✓ AccountCreated, MoneyDeposited
   - ✗ CreateAccount, DepositMoney

2. Events are named in past tense
   - ✓ OrderShipped, PaymentProcessed
   - ✗ ShipOrder, ProcessPayment

3. Events contain what happened, not what to do
   - Include all necessary data
   - Avoid referencing other aggregates by ID when possible

4. Version events from the start
   - Add version field to all events
   - Plan for migrations
"""

Performance Tips

# Optimize event sourcing performance

PERFORMANCE_TIPS = {
    "snapshots": "Save snapshots every N events",
    "projections": "Build read models asynchronously",
    "batching": "Batch event writes",
    "indexing": "Index events by aggregate_id and timestamp",
    "compression": "Compress old events"
}
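
As one illustration of the "indexing" and "batching" tips, the sketch below keeps a per-aggregate index next to the log so that loading an aggregate does not scan every event, and accepts several events per call. The `IndexedEventLog` class and its tuple-based events are assumptions for this example; a real store would typically rely on database indexes and transactions instead.

```python
from collections import defaultdict
from typing import DefaultDict, Iterable, List, Tuple

StoredEvent = Tuple[str, str]  # (aggregate_id, event name), simplified


class IndexedEventLog:
    """Append-only log with a per-aggregate index (in-memory sketch)."""

    def __init__(self) -> None:
        self._log: List[StoredEvent] = []
        # aggregate_id -> positions of that aggregate's events in self._log
        self._by_aggregate: DefaultDict[str, List[int]] = defaultdict(list)

    def append_batch(self, events: Iterable[StoredEvent]) -> None:
        """Append several events in one call, updating the index as we go."""
        for event in events:
            self._by_aggregate[event[0]].append(len(self._log))
            self._log.append(event)

    def events_for(self, aggregate_id: str) -> List[StoredEvent]:
        """Look up one aggregate's events without scanning the whole log."""
        positions = self._by_aggregate.get(aggregate_id, [])
        return [self._log[i] for i in positions]


log = IndexedEventLog()
log.append_batch([
    ("acc-1", "AccountCreated"),
    ("acc-2", "AccountCreated"),
    ("acc-1", "MoneyDeposited"),
])
acc1_events = log.events_for("acc-1")
```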

Challenges and Solutions

Challenge              Solution
Eventual consistency   Accept and design for it
Large event streams    Use snapshots, archiving
Event schema changes   Plan versioning from start
Debugging              Use event replay, logging
Learning curve         Start with simple domains

Conclusion

Event sourcing provides powerful capabilities for building audit-ready, temporal, and event-driven applications. While it introduces complexity, its benefits, including complete audit trails, temporal queries, and powerful analytics, make it invaluable for domains where history matters.

Key takeaways:

  • Events are immutable facts about what happened
  • Store events, derive state through projections
  • Plan for event versioning from the start
  • Use snapshots to optimize replay
  • CQRS pairs naturally with event sourcing
