Introduction
Event sourcing is an architectural pattern that fundamentally changes how we think about state. Instead of storing the current state of an entity, event sourcing stores every change as an immutable event. This seemingly simple shift unlocks powerful capabilities: complete audit trails, temporal queries, event replay, and natural integration with event-driven systems.
This comprehensive guide covers event sourcing from basics to advanced patterns, including implementation strategies, common challenges, and how event sourcing naturally pairs with CQRS (Command Query Responsibility Segregation).
Understanding Event Sourcing
The Fundamental Difference
Traditional approaches store the current state:
# Traditional: Store current state
class BankAccount:
    """Account that keeps nothing but its latest balance."""

    def __init__(self):
        # A new account starts empty; no record of past changes is kept.
        self.balance = 0

    def deposit(self, amount):
        """Increase the stored balance by ``amount``."""
        self.balance += amount

    def withdraw(self, amount):
        """Decrease the stored balance by ``amount`` (no overdraft check)."""
        self.balance -= amount
# State is overwritten on each change
Event sourcing stores every change as an immutable event:
# Event Sourcing: Store events, derive state
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
import uuid
# Events are immutable facts
@dataclass
class Event:
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
occurred_at: datetime = field(default_factory=datetime.utcnow)
aggregate_id: str
@dataclass
class AccountCreated(Event):
initial_balance: float = 0
owner: str = ""
@dataclass
class MoneyDeposited(Event):
amount: float
deposit_method: str = "cash"
@dataclass
class MoneyWithdrawn(Event):
amount: float
withdrawal_method: str = "cash"
@dataclass
class TransferCompleted(Event):
from_account: str
to_account: str
amount: float
Why Events Are Different
| Aspect | Traditional | Event Sourcing |
|---|---|---|
| Storage | Current state | History of changes |
| Mutability | Overwrites state | Appends events |
| Audit | Requires explicit logging | Built-in |
| Queries | Current state only | Any point in time |
| Debugging | Limited | Full replay possible |
Implementing Event Sourcing
The Event Store
from typing import List, Callable, Optional
from datetime import datetime
import json
class EventStore:
    """Append-only, in-memory store for events.

    Events live in one list in append order. Subscribers are plain
    callables invoked synchronously from ``append``.
    """

    def __init__(self):
        self._events: List[Event] = []
        self._subscribers: List[Callable[[Event], None]] = []

    def append(self, event: Event) -> None:
        """Store ``event`` and then hand it to every subscriber.

        Raises:
            ValueError: If ``event.event_id`` is empty.
        """
        if not event.event_id:
            raise ValueError("Event must have an ID")
        self._events.append(event)
        # The event is already stored when subscribers run, so an exception
        # from a subscriber propagates without undoing the append.
        for subscriber in self._subscribers:
            subscriber(event)

    def get_events_for_aggregate(
        self,
        aggregate_id: str,
        from_version: Optional[int] = None
    ) -> List[Event]:
        """Return the events of one aggregate, ordered by ``occurred_at``.

        Args:
            aggregate_id: Aggregate whose events are wanted.
            from_version: When given, keep only events whose
                ``aggregate_version`` is at least this value.

        Returns:
            A new list. Events with equal timestamps keep their append
            order because ``sorted`` is stable.
        """
        matching = [e for e in self._events if e.aggregate_id == aggregate_id]
        if from_version is not None:
            # Events that never went through an aggregate carry no version
            # attribute; treat them as version 0 instead of raising.
            matching = [
                e for e in matching
                if getattr(e, "aggregate_version", 0) >= from_version
            ]
        return sorted(matching, key=lambda e: e.occurred_at)

    def subscribe(self, handler: Callable[[Event], None]) -> None:
        """Register ``handler`` to be called with each newly appended event."""
        self._subscribers.append(handler)

    def get_all_events(
        self,
        since: Optional[datetime] = None
    ) -> List[Event]:
        """Return stored events in append order.

        Args:
            since: When given, keep only events with ``occurred_at``
                strictly later than this time.

        Returns:
            A new list, so callers cannot mutate the store through it.
        """
        if since is not None:
            return [e for e in self._events if e.occurred_at > since]
        return list(self._events)
Aggregate Root
from abc import ABC, abstractmethod
class AggregateRoot(ABC):
    """Common machinery for event-sourced aggregates.

    Tracks the aggregate's version and the events recorded since the last
    call to ``get_pending_events``. Subclasses supply ``_handle_event``.
    """

    def __init__(self, aggregate_id: str):
        self.id = aggregate_id
        self._version = 0
        self._pending_events: List[Event] = []

    @property
    def version(self) -> int:
        """Version of the most recently applied or replayed event."""
        return self._version

    def _apply(self, event: Event) -> None:
        """Record a new event: stamp its version, queue it, update state."""
        next_version = self._version + 1
        self._version = next_version
        event.aggregate_version = next_version
        self._pending_events.append(event)
        self._handle_event(event)

    @abstractmethod
    def _handle_event(self, event: Event) -> None:
        """Mutate the aggregate's state to reflect ``event``."""
        pass

    def get_pending_events(self) -> List[Event]:
        """Return the queued events and empty the queue.

        The returned list is a copy, so a second call yields an empty
        list until further events are applied.
        """
        drained = self._pending_events[:]
        del self._pending_events[:]
        return drained

    def load_from_history(self, events: List[Event]) -> None:
        """Replay ``events`` in order without queuing them as pending."""
        for past_event in events:
            self._handle_event(past_event)
            # Adopt the version recorded on each replayed event.
            self._version = past_event.aggregate_version
class InsufficientFundsError(Exception):
    """Raised when a withdrawal or transfer exceeds the current balance."""


class BankAccount(AggregateRoot):
    """Bank account whose state is derived from its events.

    The command methods (``deposit``, ``withdraw``, ``transfer_to``)
    validate their input and record an event through ``_apply``.
    ``_handle_event`` is the only place that changes ``balance``,
    ``owner`` and ``is_active``.
    """

    def __init__(self, account_id: str):
        super().__init__(account_id)
        self.balance = 0
        self.owner = ""
        self.is_active = False

    def _handle_event(self, event: Event) -> None:
        """Update state from ``event``; other event types are ignored."""
        if isinstance(event, AccountCreated):
            self.balance = event.initial_balance
            self.owner = event.owner
            self.is_active = True
        elif isinstance(event, MoneyDeposited):
            self.balance += event.amount
        elif isinstance(event, MoneyWithdrawn):
            self.balance -= event.amount
        elif isinstance(event, TransferCompleted):
            # The same event type debits the sender and credits the receiver.
            if event.from_account == self.id:
                self.balance -= event.amount
            elif event.to_account == self.id:
                self.balance += event.amount

    @classmethod
    def create(cls, account_id: str, owner: str, initial_balance: float = 0) -> 'BankAccount':
        """Build a new account and record its ``AccountCreated`` event.

        Args:
            account_id: Identifier for the new aggregate.
            owner: Owner stored on the account.
            initial_balance: Opening balance.

        Returns:
            The account, with one pending ``AccountCreated`` event.
        """
        account = cls(account_id)
        account._apply(AccountCreated(
            aggregate_id=account_id,
            initial_balance=initial_balance,
            owner=owner
        ))
        return account

    def deposit(self, amount: float, method: str = "cash") -> None:
        """Record a deposit of ``amount``.

        Raises:
            ValueError: If ``amount`` is not positive.
        """
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        self._apply(MoneyDeposited(
            aggregate_id=self.id,
            amount=amount,
            deposit_method=method
        ))

    def withdraw(self, amount: float, method: str = "cash") -> None:
        """Record a withdrawal of ``amount``.

        Raises:
            ValueError: If ``amount`` is not positive.
            InsufficientFundsError: If ``amount`` exceeds the balance.
        """
        if amount <= 0:
            raise ValueError("Withdrawal amount must be positive")
        if self.balance < amount:
            raise InsufficientFundsError(f"Balance {self.balance} < {amount}")
        self._apply(MoneyWithdrawn(
            aggregate_id=self.id,
            amount=amount,
            withdrawal_method=method
        ))

    def transfer_to(self, amount: float, to_account: str) -> None:
        """Record a transfer of ``amount`` from this account to ``to_account``.

        The event is recorded on this (sending) aggregate only.
        NOTE(review): nothing in this class delivers the event to the
        receiving aggregate; confirm how the destination is credited.

        Raises:
            ValueError: If ``amount`` is not positive.
            InsufficientFundsError: If ``amount`` exceeds the balance.
        """
        # A negative amount would pass the balance check and then increase
        # the sender's balance, so reject it like deposit/withdraw do.
        if amount <= 0:
            raise ValueError("Transfer amount must be positive")
        if self.balance < amount:
            raise InsufficientFundsError(f"Balance {self.balance} < {amount}")
        self._apply(TransferCompleted(
            aggregate_id=self.id,
            from_account=self.id,
            to_account=to_account,
            amount=amount
        ))
Projections and Read Models
Building Projections
class AccountProjection:
    """Read model holding one summary dict per account.

    The projection subscribes to the event store on construction and
    updates its summaries as events are appended. Events appended before
    construction are not replayed.
    """

    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self._accounts: dict = {}
        # Subscribe to updates
        event_store.subscribe(self._handle_event)

    def _adjust_balance(self, account_id: str, delta: float) -> None:
        """Add ``delta`` to a known account's balance; ignore unknown ids."""
        account = self._accounts.get(account_id)
        if account is not None:
            account["balance"] += delta

    def _handle_event(self, event: Event) -> None:
        """Fold one event into the read model; other event types are ignored."""
        if isinstance(event, AccountCreated):
            self._accounts[event.aggregate_id] = {
                "id": event.aggregate_id,
                "owner": event.owner,
                "balance": event.initial_balance,
                "status": "active",
                "created_at": event.occurred_at
            }
        elif isinstance(event, MoneyDeposited):
            self._adjust_balance(event.aggregate_id, event.amount)
        elif isinstance(event, MoneyWithdrawn):
            self._adjust_balance(event.aggregate_id, -event.amount)
        elif isinstance(event, TransferCompleted):
            # Without this branch the read model keeps showing the
            # pre-transfer balance while the aggregate has moved on.
            self._adjust_balance(event.from_account, -event.amount)
            self._adjust_balance(event.to_account, event.amount)

    def get_account(self, account_id: str) -> Optional[dict]:
        """Return the summary for ``account_id``, or ``None`` if unknown."""
        return self._accounts.get(account_id)

    def get_all_accounts(self) -> List[dict]:
        """Return the summaries of every known account."""
        return list(self._accounts.values())

    def get_accounts_by_owner(self, owner: str) -> List[dict]:
        """Return the summaries whose ``owner`` equals ``owner``."""
        return [a for a in self._accounts.values() if a["owner"] == owner]
Temporal Queries
class TemporalQueryService:
    """Answer questions about an account's past by replaying its events."""

    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def get_balance_at(
        self,
        account_id: str,
        as_of: datetime
    ) -> float:
        """Return the balance of ``account_id`` as of ``as_of`` (inclusive).

        Replays the account's events up to ``as_of`` using the same rules
        as ``BankAccount._handle_event``: the opening balance comes from
        ``AccountCreated``, and a ``TransferCompleted`` debits the sender
        and credits the receiver.

        Returns:
            The reconstructed balance; 0 when no event falls in range.
        """
        events = self.event_store.get_events_for_aggregate(account_id)
        balance = 0
        for event in events:
            if event.occurred_at > as_of:
                continue
            if isinstance(event, AccountCreated):
                # The opening balance is part of the account's history.
                balance = event.initial_balance
            elif isinstance(event, MoneyDeposited):
                balance += event.amount
            elif isinstance(event, MoneyWithdrawn):
                balance -= event.amount
            elif isinstance(event, TransferCompleted):
                # Direction matters: only incoming transfers add money.
                if event.from_account == account_id:
                    balance -= event.amount
                elif event.to_account == account_id:
                    balance += event.amount
        return balance

    def get_account_history(
        self,
        account_id: str,
        from_time: Optional[datetime] = None,
        to_time: Optional[datetime] = None
    ) -> List[Event]:
        """Return the account's events within an optional time window.

        Args:
            account_id: Aggregate whose events are wanted.
            from_time: Inclusive lower bound on ``occurred_at``, if given.
            to_time: Inclusive upper bound on ``occurred_at``, if given.
        """
        events = self.event_store.get_events_for_aggregate(account_id)
        if from_time is not None:
            events = [e for e in events if e.occurred_at >= from_time]
        if to_time is not None:
            events = [e for e in events if e.occurred_at <= to_time]
        return events
Event Versioning
Schema Evolution
Events are immutable, but your understanding changes. Handle versioning:
from typing import Dict, Any
import json
class VersionedEvent:
    """An event payload tagged with its type name and schema version."""

    def __init__(
        self,
        event_type: str,
        version: int,
        data: Dict[str, Any]
    ):
        self.event_type = event_type
        self.version = version
        self.data = data

    def migrate(self, target_version: int) -> 'VersionedEvent':
        """Upgrade this event step by step towards ``target_version``.

        For each version from the current one up to (but excluding) the
        target, the migrator registered in ``migration_strategies`` under
        ``"<event_type>_v<version>"`` is applied. Versions without a
        registered migrator are skipped. When the event is already at the
        target version, or the target is lower, ``self`` is returned.
        """
        if target_version == self.version:
            return self
        migrated = self
        for source_version in range(self.version, target_version):
            step = migration_strategies.get(f"{self.event_type}_v{source_version}")
            if not step:
                continue
            migrated = step(migrated)
        return migrated
# Migration functions
def migrate_event_v1_to_v2(event: VersionedEvent) -> VersionedEvent:
    """Return a version-2 copy of ``event`` with ``new_field`` added.

    ``new_field`` takes the payload's ``default_value`` entry when there
    is one, otherwise the string ``"value"``. The input event and its
    payload dict are left unmodified.
    """
    upgraded = dict(event.data)
    upgraded["new_field"] = upgraded.get("default_value", "value")
    return VersionedEvent(event.event_type, 2, upgraded)


# Single-step migrators, keyed "<event_type>_v<source_version>".
migration_strategies = {"UserCreated_v1": migrate_event_v1_to_v2}
Upcasting
class EventUpcaster:
    """Transform events stored in old formats into newer ones.

    Upcasters are registered per event type and source version, and are
    looked up under the key ``"<event_type>_v<from_version>"``.
    """

    def __init__(self):
        self._upcasters: Dict[str, Callable] = {}

    def register(
        self,
        event_type: str,
        from_version: int,
        upcaster: Callable
    ) -> None:
        """Register ``upcaster`` for ``event_type`` at ``from_version``.

        A later registration for the same type and version replaces the
        earlier one.
        """
        key = f"{event_type}_v{from_version}"
        self._upcasters[key] = upcaster

    def upcast(self, event: Dict) -> Dict:
        """Upcast ``event`` through every applicable registered step.

        After each step the result's ``type`` and ``version`` (defaulting
        to ``""`` and ``1``) select the next upcaster, so a chain such as
        v1 -> v2 -> v3 is followed to the end rather than stopping after
        the first hop.

        Returns:
            The upcast event, or ``event`` itself when nothing applies.
        """
        current = event
        applied = set()
        while True:
            key = f"{current.get('type', '')}_v{current.get('version', 1)}"
            upcaster = self._upcasters.get(key)
            # Stop when no step is registered, or when a step has already
            # run: an upcaster that does not advance the version would
            # otherwise loop forever.
            if upcaster is None or key in applied:
                return current
            applied.add(key)
            current = upcaster(current)
CQRS with Event Sourcing
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│                      CQRS + Event Sourcing                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│    Commands               Events                   Queries      │
│  ┌──────────┐           ┌─────────┐               ┌────────┐    │
│  │  Create  │──────────▶│  Event  │──────────────▶│  Read  │    │
│  │ Account  │           │  Store  │               │ Model  │    │
│  └──────────┘           └─────────┘               └────────┘    │
│       │                      │                        │         │
│       │                      ▼                        │         │
│       │               ┌─────────────┐                 │         │
│  ┌────▼────┐          │ Projections │                 │         │
│  │ Command │          │(denormalize)│─────────────────┘         │
│  │ Handler │          └─────────────┘                           │
│  └─────────┘                                                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Implementation
class CommandHandler:
    """Turn commands into events and persist them in the event store."""

    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def handle(self, command) -> Event:
        """Execute ``command`` and return an event it produced.

        ``CreateAccountCommand`` returns the first event recorded for the
        new account; ``DepositCommand`` returns the last event stored.
        A command of any other type falls through and yields ``None``.
        """
        if isinstance(command, CreateAccountCommand):
            new_account = BankAccount.create(
                command.account_id,
                command.owner,
                command.initial_balance
            )
            recorded = new_account.get_pending_events()
            for pending in recorded:
                self.event_store.append(pending)
            return recorded[0]
        elif isinstance(command, DepositCommand):
            # Rebuild the account from its stored history first.
            history = self.event_store.get_events_for_aggregate(
                command.account_id
            )
            account = BankAccount(command.account_id)
            account.load_from_history(history)
            # Run the command against the rebuilt state.
            account.deposit(command.amount, command.method)
            # Store whatever the command recorded.
            for event in account.get_pending_events():
                self.event_store.append(event)
            return event
        # ... other commands
class QueryHandler:
    """Serve queries from the account read model."""

    def __init__(self, projection: AccountProjection):
        self.projection = projection

    def get_account(self, account_id: str) -> Optional[dict]:
        """Return whatever the projection holds for ``account_id``."""
        return self.projection.get_account(account_id)

    def get_balance(self, account_id: str) -> float:
        """Return the account's balance, or 0 when no account data exists."""
        summary = self.get_account(account_id)
        if not summary:
            return 0
        return summary["balance"]
Snapshots
Optimizing Performance
class SnapshotStore:
    """In-memory store holding the latest snapshot per aggregate."""

    def __init__(self):
        self._snapshots: dict = {}

    @staticmethod
    def _key(aggregate_id) -> str:
        """Return the dictionary key used for ``aggregate_id``."""
        return f"{aggregate_id}"

    def save_snapshot(
        self,
        aggregate_id: str,
        state: dict,
        version: int
    ) -> None:
        """Store ``state`` as the snapshot of ``aggregate_id`` at ``version``.

        Any earlier snapshot of the same aggregate is replaced. The save
        time is recorded as a naive UTC timestamp under ``"saved_at"``.
        """
        self._snapshots[self._key(aggregate_id)] = {
            "aggregate_id": aggregate_id,
            "state": state,
            "version": version,
            "saved_at": datetime.utcnow()
        }

    def get_snapshot(self, aggregate_id: str) -> Optional[dict]:
        """Return the stored snapshot for ``aggregate_id``, or ``None``."""
        # Use the same key as save_snapshot; looking up the raw id missed
        # every snapshot saved under a non-string id.
        return self._snapshots.get(self._key(aggregate_id))
class SnapshottingAggregate(AggregateRoot):
    """Aggregate base that can start replay from a stored snapshot.

    ``_handle_event`` is still abstract here, so concrete aggregates must
    subclass this and implement it.
    """

    def __init__(self, aggregate_id: str, snapshot_store: SnapshotStore):
        super().__init__(aggregate_id)
        self.snapshot_store = snapshot_store

    def load_from_history(self, events: List[Event]) -> None:
        """Rebuild state from the snapshot (if any) plus later events.

        When a snapshot exists for this aggregate, state and version are
        restored from it and only events with a higher
        ``aggregate_version`` are replayed; otherwise every event is.
        """
        snapshot = self.snapshot_store.get_snapshot(self.id)
        if snapshot:
            self._restore_from_snapshot(snapshot)
            remaining_events = [
                e for e in events
                if e.aggregate_version > snapshot["version"]
            ]
        else:
            remaining_events = events
        for event in remaining_events:
            self._handle_event(event)
            # Keep the version in step with the replay, as the base class
            # does; otherwise the next _apply reuses a version number.
            self._version = event.aggregate_version

    def _restore_from_snapshot(self, snapshot: dict) -> None:
        """Copy ``balance``, ``owner`` and the version out of ``snapshot``."""
        state = snapshot["state"]
        self.balance = state["balance"]
        self.owner = state["owner"]
        self._version = snapshot["version"]
Best Practices
Event Design
# Good event design principles
# Plain-text checklist; ✅ marks a recommended name, ❌ a name to avoid.
GOOD_EVENTS = """
1. Events represent facts, not commands
- ✅ AccountCreated, MoneyDeposited
- ❌ CreateAccount, DepositMoney
2. Events are named in past tense
- ✅ OrderShipped, PaymentProcessed
- ❌ ShipOrder, ProcessPayment
3. Events contain what happened, not what to do
- Include all necessary data
- Avoid referencing other aggregates by ID when possible
4. Version events from the start
- Add version field to all events
- Plan for migrations
"""
Performance Tips
# Optimize event sourcing performance
# One-line advice per optimisation area, keyed by a short topic name.
PERFORMANCE_TIPS = {
    "snapshots": "Save snapshots every N events",
    "projections": "Build read models asynchronously",
    "batching": "Batch event writes",
    "indexing": "Index events by aggregate_id and timestamp",
    "compression": "Compress old events",
}
Challenges and Solutions
| Challenge | Solution |
|---|---|
| Eventual consistency | Accept and design for it |
| Large event streams | Use snapshots, archiving |
| Event schema changes | Plan versioning from start |
| Debugging | Use event replay, logging |
| Learning curve | Start with simple domains |
Conclusion
Event sourcing provides powerful capabilities for building audit-ready, temporal, and event-driven applications. While it introduces complexity, the benefits—complete audit trails, natural auditability, and powerful analytics—make it invaluable for domains where history matters.
Key takeaways:
- Events are immutable facts about what happened
- Store events, derive state through projections
- Plan for event versioning from the start
- Use snapshots to optimize replay
- CQRS pairs naturally with event sourcing
Resources
- Event Sourcing Martin Fowler - Original event sourcing article
- Event Store Documentation - Event Store DB guide
- Axon Framework - Event sourcing framework for Java
- Eventuate - Event sourcing platform
- Greg Young’s Event Store - Open source event store
Comments