Introduction
Event sourcing is an architectural pattern where all changes to application state are stored as a sequence of immutable events. Instead of storing the current state, you store the history of state changes. The current state is derived by replaying these events.
This approach provides several powerful capabilities:
- Complete Audit Trail: Every state change is recorded with full context
- Temporal Queries: Query the state at any point in time
- Event Replay: Reconstruct state by replaying events
- Debugging: Understand exactly how the system reached its current state
- Event-Driven Architecture: Events can trigger side effects and integrations
- Compliance: Meet regulatory requirements for audit and traceability
Event sourcing is particularly valuable in domains like finance, healthcare, e-commerce, and any system where understanding the history of changes is critical.
This guide covers event store implementation, aggregate reconstruction, snapshots for performance, CQRS integration, event versioning, and production patterns.
Core Concepts
- Event: Immutable record of something that happened (past tense: OrderPlaced, UserRegistered)
- Event Store: Database optimized for appending and reading events
- Aggregate: Domain object whose state is derived from events
- Event Stream: Sequence of events for a specific aggregate
- Snapshot: Point-in-time state capture for performance optimization
- Projection: Read model built from events (CQRS pattern)
- Event Version: Schema version for event evolution
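As a minimal sketch of how these concepts fit together, the snippet below derives current state by folding over an event stream. The `CartEvent` type, the event names, and the `apply` and `replay` functions are assumptions made for illustration; they are not part of the implementation that follows.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Any, Dict, List


@dataclass(frozen=True)
class CartEvent:
    """Hypothetical event type used only for this illustration."""
    event_type: str
    data: Dict[str, Any]


def apply(state: Dict[str, int], event: CartEvent) -> Dict[str, int]:
    """Return a new state with one event applied (the old state is untouched)."""
    items = dict(state)
    sku = event.data["sku"]
    if event.event_type == "ItemAdded":
        items[sku] = items.get(sku, 0) + event.data["quantity"]
    elif event.event_type == "ItemRemoved":
        items.pop(sku, None)
    return items


def replay(events: List[CartEvent]) -> Dict[str, int]:
    """Current state is a left fold of apply() over the stream, starting empty."""
    return reduce(apply, events, {})


stream = [
    CartEvent("ItemAdded", {"sku": "book", "quantity": 1}),
    CartEvent("ItemAdded", {"sku": "pen", "quantity": 3}),
    CartEvent("ItemAdded", {"sku": "book", "quantity": 1}),
    CartEvent("ItemRemoved", {"sku": "pen"}),
]
cart = replay(stream)
```

Nothing here stores the cart itself: only the events are kept, and the cart is recomputed from them whenever it is needed.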
Event Store Implementation
An event store is an append-only database optimized for storing and retrieving events. Writes to a single stream are kept consistent by checking the stream's version before appending (optimistic concurrency), and reads return a stream's events in version order.
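Before the database-backed versions, the version check can be sketched in memory. This is an illustration under simplifying assumptions: `InMemoryStream` and `VersionConflict` are hypothetical names, and the version of a stream is taken to be the number of events it holds.

```python
from typing import Dict, List


class VersionConflict(Exception):
    """Hypothetical error raised when the stream moved on since it was read."""


class InMemoryStream:
    """Toy append-only store: one list of payloads per aggregate ID."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[dict]] = {}

    def append(self, aggregate_id: str, events: List[dict],
               expected_version: int) -> int:
        stream = self._streams.setdefault(aggregate_id, [])
        # The stream's version is simply the number of events stored so far.
        if len(stream) != expected_version:
            raise VersionConflict(
                f"expected version {expected_version}, stream is at {len(stream)}"
            )
        stream.extend(events)  # events are only ever added, never changed
        return len(stream)

    def read(self, aggregate_id: str) -> List[dict]:
        return list(self._streams.get(aggregate_id, []))


store = InMemoryStream()
store.append("order-1", [{"type": "OrderPlaced"}], expected_version=0)

# Two writers both read the stream at version 1 and then try to append.
store.append("order-1", [{"type": "OrderPaid"}], expected_version=1)  # wins
try:
    store.append("order-1", [{"type": "OrderCancelled"}], expected_version=1)
    conflict = False
except VersionConflict:
    conflict = True  # the second writer must reload the stream and retry
```

The losing writer is not blocked while it works; it only finds out at append time that its view of the stream is stale, which is what makes the check optimistic.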
Basic Event Store
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Any, Optional
import json
import uuid
import sqlite3


@dataclass
class Event:
    """Immutable event record."""
    id: str
    aggregate_id: str
    aggregate_type: str
    event_type: str
    data: Dict[str, Any]
    metadata: Dict[str, Any]
    timestamp: datetime
    version: int

    def to_dict(self) -> Dict[str, Any]:
        return {
            **asdict(self),
            'timestamp': self.timestamp.isoformat()
        }


class EventStore:
    """Append-only event store with optimistic concurrency control."""

    def __init__(self, connection_string: str):
        self.conn = sqlite3.connect(connection_string)
        self.conn.row_factory = sqlite3.Row
        self._create_schema()

    def _create_schema(self):
        """Create event store schema."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id TEXT PRIMARY KEY,
                aggregate_id TEXT NOT NULL,
                aggregate_type TEXT NOT NULL,
                event_type TEXT NOT NULL,
                data TEXT NOT NULL,
                metadata TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                version INTEGER NOT NULL,
                UNIQUE(aggregate_id, version)
            )
        """)
        # Index for efficient stream reads
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_aggregate_stream
            ON events(aggregate_id, version)
        """)
        # Index for event type queries
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_event_type
            ON events(event_type, timestamp)
        """)
        self.conn.commit()

    def append(self, aggregate_id: str, aggregate_type: str,
               events: List[Event], expected_version: int) -> None:
        """
        Append events to stream with optimistic concurrency control.

        Args:
            aggregate_id: Aggregate identifier
            aggregate_type: Type of aggregate
            events: Events to append
            expected_version: Expected current version (for concurrency)

        Raises:
            ConcurrencyError: If expected_version doesn't match actual version
        """
        cursor = self.conn.cursor()
        try:
            # Check current version (MAX() is NULL for an empty stream)
            cursor.execute(
                "SELECT MAX(version) as current_version FROM events WHERE aggregate_id = ?",
                (aggregate_id,)
            )
            row = cursor.fetchone()
            current_version = row['current_version'] or 0
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, but current is {current_version}"
                )
            # Append events; the store assigns consecutive versions
            for i, event in enumerate(events):
                version = expected_version + i + 1
                cursor.execute("""
                    INSERT INTO events
                    (id, aggregate_id, aggregate_type, event_type, data, metadata, timestamp, version)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    event.id,
                    aggregate_id,
                    aggregate_type,
                    event.event_type,
                    json.dumps(event.data),
                    json.dumps(event.metadata),
                    event.timestamp.isoformat(),
                    version
                ))
            self.conn.commit()
        except sqlite3.IntegrityError as e:
            # Another writer claimed the same (aggregate_id, version) first
            self.conn.rollback()
            raise ConcurrencyError(f"Concurrency conflict: {e}") from e

    def get_events(self, aggregate_id: str, from_version: int = 0,
                   to_version: Optional[int] = None) -> List[Event]:
        """
        Get events for an aggregate.

        Args:
            aggregate_id: Aggregate identifier
            from_version: Start version (exclusive)
            to_version: End version (inclusive), None for all

        Returns:
            List of events ordered by version
        """
        # Compare against None so that to_version=0 is not mistaken for "no limit"
        if to_version is not None:
            query = """
                SELECT * FROM events
                WHERE aggregate_id = ? AND version > ? AND version <= ?
                ORDER BY version
            """
            params = (aggregate_id, from_version, to_version)
        else:
            query = """
                SELECT * FROM events
                WHERE aggregate_id = ? AND version > ?
                ORDER BY version
            """
            params = (aggregate_id, from_version)
        cursor = self.conn.execute(query, params)
        return [self._row_to_event(row) for row in cursor.fetchall()]

    def get_all_events(self, from_timestamp: Optional[datetime] = None,
                       event_types: Optional[List[str]] = None) -> List[Event]:
        """Get all events, optionally filtered by timestamp and type."""
        query = "SELECT * FROM events WHERE 1=1"
        params = []
        if from_timestamp:
            query += " AND timestamp > ?"
            params.append(from_timestamp.isoformat())
        if event_types:
            placeholders = ','.join('?' * len(event_types))
            query += f" AND event_type IN ({placeholders})"
            params.extend(event_types)
        query += " ORDER BY timestamp, version"
        cursor = self.conn.execute(query, params)
        return [self._row_to_event(row) for row in cursor.fetchall()]

    def get_current_version(self, aggregate_id: str) -> int:
        """Get current version of aggregate (0 if it has no events)."""
        cursor = self.conn.execute(
            "SELECT MAX(version) as version FROM events WHERE aggregate_id = ?",
            (aggregate_id,)
        )
        row = cursor.fetchone()
        return row['version'] or 0

    def _row_to_event(self, row) -> Event:
        """Convert database row to Event."""
        return Event(
            id=row['id'],
            aggregate_id=row['aggregate_id'],
            aggregate_type=row['aggregate_type'],
            event_type=row['event_type'],
            data=json.loads(row['data']),
            metadata=json.loads(row['metadata']),
            timestamp=datetime.fromisoformat(row['timestamp']),
            version=row['version']
        )


class ConcurrencyError(Exception):
    """Raised when optimistic concurrency check fails."""
    pass
PostgreSQL Event Store
import psycopg2


class PostgreSQLEventStore:
    """Production-ready event store using PostgreSQL."""

    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)
        self._create_schema()

    def _create_schema(self):
        """Create event store schema with partitioning."""
        with self.conn.cursor() as cursor:
            # Events table partitioned by aggregate_type. PostgreSQL requires
            # every primary key and unique constraint on a partitioned table
            # to include the partition key, hence aggregate_type in both.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id UUID NOT NULL,
                    aggregate_id UUID NOT NULL,
                    aggregate_type VARCHAR(100) NOT NULL,
                    event_type VARCHAR(100) NOT NULL,
                    data JSONB NOT NULL,
                    metadata JSONB NOT NULL,
                    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    version INTEGER NOT NULL,
                    PRIMARY KEY (id, aggregate_type),
                    CONSTRAINT unique_aggregate_version
                        UNIQUE (aggregate_id, aggregate_type, version)
                ) PARTITION BY LIST (aggregate_type)
            """)
            # Catch-all partition so inserts succeed before per-type
            # partitions are added
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS events_default
                PARTITION OF events DEFAULT
            """)
            # Indexes
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_aggregate_stream
                ON events(aggregate_id, version)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_type_timestamp
                ON events(event_type, timestamp)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_events_data_gin
                ON events USING GIN (data)
            """)
        self.conn.commit()

    def append(self, aggregate_id: str, aggregate_type: str,
               events: List[Event], expected_version: int) -> None:
        """Append events with optimistic locking."""
        try:
            with self.conn.cursor() as cursor:
                # FOR UPDATE cannot be combined with an aggregate such as
                # MAX(), so concurrent writers are caught by the unique
                # constraint on (aggregate_id, aggregate_type, version).
                cursor.execute("""
                    SELECT MAX(version) as current_version
                    FROM events
                    WHERE aggregate_id = %s
                """, (aggregate_id,))
                row = cursor.fetchone()
                current_version = row[0] or 0
                if current_version != expected_version:
                    self.conn.rollback()
                    raise ConcurrencyError(
                        f"Expected {expected_version}, got {current_version}"
                    )
                # Insert events
                for i, event in enumerate(events):
                    version = expected_version + i + 1
                    cursor.execute("""
                        INSERT INTO events
                        (id, aggregate_id, aggregate_type, event_type, data, metadata, timestamp, version)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """, (
                        event.id,
                        aggregate_id,
                        aggregate_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.timestamp,
                        version
                    ))
            self.conn.commit()
        except psycopg2.IntegrityError as e:
            # A concurrent writer inserted the same version first
            self.conn.rollback()
            raise ConcurrencyError(f"Concurrency conflict: {e}") from e
Aggregate Reconstruction
class User:
    """Aggregate reconstructed from events."""

    def __init__(self, user_id: str):
        self.id = user_id
        self.email = None
        self.name = None
        self.is_active = False
        self.version = 0
        self._events = []

    def load_from_events(self, events: List[Event]):
        for event in events:
            self._apply(event)
            self.version = event.version

    def _apply(self, event: Event):
        if event.event_type == "user.created":
            self.email = event.data["email"]
            self.name = event.data["name"]
            self.is_active = True
        elif event.event_type == "user.email_changed":
            self.email = event.data["new_email"]
        elif event.event_type == "user.deactivated":
            self.is_active = False

    def change_email(self, new_email: str):
        if not new_email or "@" not in new_email:
            raise ValueError("Invalid email")
        # Event requires aggregate_type and metadata as well
        event = Event(
            id=str(uuid.uuid4()),
            aggregate_id=self.id,
            aggregate_type="User",
            event_type="user.email_changed",
            data={"old_email": self.email, "new_email": new_email},
            metadata={},
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )
        self._apply(event)
        self._events.append(event)
        self.version += 1

    def get_uncommitted_events(self) -> List[Event]:
        events = self._events.copy()
        self._events = []
        return events
Snapshots
class SnapshotStore:
    """Store aggregate snapshots for performance."""

    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.snapshots = {}

    def save_snapshot(self, aggregate_id: str, snapshot: dict, version: int):
        self.snapshots[aggregate_id] = {
            "data": snapshot,
            "version": version
        }

    def get_aggregate(self, aggregate_id: str, factory) -> object:
        # Check for snapshot
        if aggregate_id in self.snapshots:
            snapshot = self.snapshots[aggregate_id]
            aggregate = factory(aggregate_id)
            aggregate.load_from_snapshot(snapshot["data"])
            # Get events after snapshot
            events = self.event_store.get_events(
                aggregate_id, snapshot["version"]
            )
            aggregate.load_from_events(events)
            return aggregate
        # No snapshot, load all events
        events = self.event_store.get_events(aggregate_id)
        aggregate = factory(aggregate_id)
        aggregate.load_from_events(events)
        return aggregate
Aggregate Reconstruction with Event Sourcing
Aggregates are reconstructed by replaying their event stream. Each event is applied to update the aggregate’s state.
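Because state is a function of the stream, replaying only part of the stream gives the state as of an earlier moment. The sketch below illustrates this temporal query with a simplified, hypothetical `LedgerEvent` type and a `balance_as_of` helper; neither name comes from the implementation in this guide.

```python
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import List


@dataclass(frozen=True)
class LedgerEvent:
    """Hypothetical, simplified event used only in this sketch."""
    event_type: str
    amount: Decimal
    timestamp: datetime


def balance_as_of(events: List[LedgerEvent], as_of: datetime) -> Decimal:
    """Replay only the events recorded at or before `as_of`."""
    balance = Decimal("0")
    for event in events:  # events are assumed to be in stream order
        if event.timestamp > as_of:
            break
        if event.event_type == "MoneyDeposited":
            balance += event.amount
        elif event.event_type == "MoneyWithdrawn":
            balance -= event.amount
    return balance


history = [
    LedgerEvent("MoneyDeposited", Decimal("1000"), datetime(2024, 1, 1)),
    LedgerEvent("MoneyWithdrawn", Decimal("200"), datetime(2024, 2, 1)),
    LedgerEvent("MoneyDeposited", Decimal("500"), datetime(2024, 3, 1)),
]

end_of_january = balance_as_of(history, datetime(2024, 1, 31))
latest = balance_as_of(history, datetime(2024, 12, 31))
```

Here `end_of_january` reflects only the first deposit, while `latest` reflects all three events.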
Complete Aggregate Example
from datetime import datetime
from typing import List, Optional
from decimal import Decimal
import uuid


class BankAccount:
    """Bank account aggregate reconstructed from events."""

    def __init__(self, account_id: str):
        self.id = account_id
        self.owner_id: Optional[str] = None
        self.balance: Decimal = Decimal('0')
        self.is_active: bool = False
        self.overdraft_limit: Decimal = Decimal('0')
        self.version: int = 0
        self._uncommitted_events: List[Event] = []

    @classmethod
    def create(cls, account_id: str, owner_id: str, initial_deposit: Decimal):
        """Factory method to create new account."""
        account = cls(account_id)
        event = Event(
            id=str(uuid.uuid4()),
            aggregate_id=account_id,
            aggregate_type='BankAccount',
            event_type='AccountOpened',
            data={
                'owner_id': owner_id,
                'initial_deposit': str(initial_deposit)
            },
            metadata={'user_id': owner_id},
            timestamp=datetime.utcnow(),
            version=1
        )
        account._apply(event)
        account._uncommitted_events.append(event)
        return account

    def load_from_history(self, events: List[Event]):
        """Reconstruct aggregate from event history."""
        for event in events:
            self._apply(event)

    def _apply(self, event: Event):
        """Apply event to update state."""
        event_type = event.event_type
        if event_type == 'AccountOpened':
            self.owner_id = event.data['owner_id']
            self.balance = Decimal(event.data['initial_deposit'])
            self.is_active = True
        elif event_type == 'MoneyDeposited':
            self.balance += Decimal(event.data['amount'])
        elif event_type == 'MoneyWithdrawn':
            self.balance -= Decimal(event.data['amount'])
        elif event_type == 'OverdraftLimitSet':
            self.overdraft_limit = Decimal(event.data['limit'])
        elif event_type == 'AccountClosed':
            self.is_active = False
        # Advance the version for replayed and new events alike, so that
        # consecutive commands produce consecutive versions
        self.version = event.version

    def deposit(self, amount: Decimal, description: str):
        """Deposit money into account."""
        if not self.is_active:
            raise ValueError("Account is closed")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        event = self._create_event('MoneyDeposited', {
            'amount': str(amount),
            'description': description,
            'balance_after': str(self.balance + amount)
        })
        self._apply(event)
        self._uncommitted_events.append(event)

    def withdraw(self, amount: Decimal, description: str):
        """Withdraw money from account."""
        if not self.is_active:
            raise ValueError("Account is closed")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if self.balance - amount < -self.overdraft_limit:
            raise ValueError("Insufficient funds")
        event = self._create_event('MoneyWithdrawn', {
            'amount': str(amount),
            'description': description,
            'balance_after': str(self.balance - amount)
        })
        self._apply(event)
        self._uncommitted_events.append(event)

    def set_overdraft_limit(self, limit: Decimal):
        """Set overdraft limit."""
        if not self.is_active:
            raise ValueError("Account is closed")
        if limit < 0:
            raise ValueError("Overdraft limit cannot be negative")
        event = self._create_event('OverdraftLimitSet', {
            'limit': str(limit),
            'previous_limit': str(self.overdraft_limit)
        })
        self._apply(event)
        self._uncommitted_events.append(event)

    def close(self):
        """Close account."""
        if not self.is_active:
            raise ValueError("Account already closed")
        if self.balance != 0:
            raise ValueError("Cannot close account with non-zero balance")
        event = self._create_event('AccountClosed', {
            'final_balance': str(self.balance)
        })
        self._apply(event)
        self._uncommitted_events.append(event)

    def _create_event(self, event_type: str, data: dict) -> Event:
        """Create new event."""
        return Event(
            id=str(uuid.uuid4()),
            aggregate_id=self.id,
            aggregate_type='BankAccount',
            event_type=event_type,
            data=data,
            metadata={'timestamp': datetime.utcnow().isoformat()},
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )

    def get_uncommitted_events(self) -> List[Event]:
        """Get events that haven't been persisted."""
        events = self._uncommitted_events.copy()
        self._uncommitted_events.clear()
        return events
Repository Pattern for Event-Sourced Aggregates
class EventSourcedRepository:
    """Repository for event-sourced aggregates."""

    def __init__(self, event_store: EventStore, aggregate_factory):
        self.event_store = event_store
        self.aggregate_factory = aggregate_factory

    def get(self, aggregate_id: str):
        """Load aggregate from event store."""
        events = self.event_store.get_events(aggregate_id)
        if not events:
            return None
        aggregate = self.aggregate_factory(aggregate_id)
        aggregate.load_from_history(events)
        return aggregate

    def save(self, aggregate):
        """Save aggregate by persisting uncommitted events."""
        uncommitted_events = aggregate.get_uncommitted_events()
        if not uncommitted_events:
            return
        self.event_store.append(
            aggregate_id=aggregate.id,
            aggregate_type=aggregate.__class__.__name__,
            events=uncommitted_events,
            # The first new event carries the version that directly follows
            # the last persisted one
            expected_version=uncommitted_events[0].version - 1
        )


# Usage
event_store = EventStore('events.db')
account_repo = EventSourcedRepository(event_store, BankAccount)

# Create new account
account = BankAccount.create('acc-123', 'user-456', Decimal('1000'))
account_repo.save(account)

# Load and modify
account = account_repo.get('acc-123')
account.deposit(Decimal('500'), 'Salary')
account.withdraw(Decimal('200'), 'Rent')
account_repo.save(account)
Snapshots for Performance
As event streams grow, replaying thousands of events becomes slow. Snapshots capture aggregate state at a point in time, allowing faster reconstruction.
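The saving can be illustrated with a toy counter aggregate. This is a sketch under simplifying assumptions: events are plain dicts, a snapshot is a `(version, state)` pair, and the `apply` and `rebuild` helpers are hypothetical names chosen for the example.

```python
from typing import Dict, List, Optional, Tuple


def apply(state: Dict[str, int], event: Dict[str, int]) -> Dict[str, int]:
    """Apply one hypothetical 'Incremented' event to a counter state."""
    return {"total": state["total"] + event["by"]}


def rebuild(
    events: List[Dict[str, int]],
    snapshot: Optional[Tuple[int, Dict[str, int]]] = None,
) -> Tuple[Dict[str, int], int]:
    """Return (state, events_replayed), starting from a snapshot when one exists.

    `snapshot` is a (version, state) pair. Versions are 1-based, so the
    events still to replay are events[version:].
    """
    version, state = snapshot if snapshot else (0, {"total": 0})
    tail = events[version:]
    for event in tail:
        state = apply(state, event)
    return state, len(tail)


events = [{"by": 1} for _ in range(1000)]

# Full replay touches every event in the stream.
full_state, full_cost = rebuild(events)

# Take a snapshot at version 900, then rebuild from it.
snapshot_state, _ = rebuild(events[:900])
fast_state, fast_cost = rebuild(events, snapshot=(900, snapshot_state))
```

Both paths arrive at the same state, but the snapshot path replays only the 100 events recorded after version 900. The snapshot is a cache: the events remain the source of truth, so a snapshot can be discarded and recomputed at any time.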
Snapshot Implementation
@dataclass
class Snapshot:
    """Aggregate state snapshot."""
    aggregate_id: str
    aggregate_type: str
    version: int
    state: Dict[str, Any]
    timestamp: datetime


class SnapshotStore:
    """Store and retrieve snapshots."""

    def __init__(self, connection_string: str):
        self.conn = sqlite3.connect(connection_string)
        self._create_schema()

    def _create_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS snapshots (
                aggregate_id TEXT PRIMARY KEY,
                aggregate_type TEXT NOT NULL,
                version INTEGER NOT NULL,
                state TEXT NOT NULL,
                timestamp TEXT NOT NULL
            )
        """)
        self.conn.commit()

    def save_snapshot(self, snapshot: Snapshot):
        """Save or update snapshot."""
        self.conn.execute("""
            INSERT OR REPLACE INTO snapshots
            (aggregate_id, aggregate_type, version, state, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (
            snapshot.aggregate_id,
            snapshot.aggregate_type,
            snapshot.version,
            json.dumps(snapshot.state),
            snapshot.timestamp.isoformat()
        ))
        self.conn.commit()

    def get_snapshot(self, aggregate_id: str) -> Optional[Snapshot]:
        """Get latest snapshot for aggregate."""
        cursor = self.conn.execute(
            "SELECT * FROM snapshots WHERE aggregate_id = ?",
            (aggregate_id,)
        )
        row = cursor.fetchone()
        if not row:
            return None
        return Snapshot(
            aggregate_id=row[0],
            aggregate_type=row[1],
            version=row[2],
            state=json.loads(row[3]),
            timestamp=datetime.fromisoformat(row[4])
        )


class SnapshotRepository:
    """Repository with snapshot support."""

    def __init__(self, event_store: EventStore, snapshot_store: SnapshotStore,
                 aggregate_factory, snapshot_frequency: int = 100):
        self.event_store = event_store
        self.snapshot_store = snapshot_store
        self.aggregate_factory = aggregate_factory
        self.snapshot_frequency = snapshot_frequency

    def get(self, aggregate_id: str):
        """Load aggregate using snapshot + events."""
        # Try to load from snapshot
        snapshot = self.snapshot_store.get_snapshot(aggregate_id)
        if snapshot:
            # Load from snapshot
            aggregate = self.aggregate_factory(aggregate_id)
            aggregate.load_from_snapshot(snapshot.state)
            aggregate.version = snapshot.version
            # Load events after snapshot
            events = self.event_store.get_events(aggregate_id, snapshot.version)
            aggregate.load_from_history(events)
        else:
            # Load all events
            events = self.event_store.get_events(aggregate_id)
            if not events:
                return None
            aggregate = self.aggregate_factory(aggregate_id)
            aggregate.load_from_history(events)
        return aggregate

    def save(self, aggregate):
        """Save aggregate and create snapshot if needed."""
        uncommitted_events = aggregate.get_uncommitted_events()
        if not uncommitted_events:
            return
        # The first new event carries the version that directly follows
        # the last persisted one
        expected_version = uncommitted_events[0].version - 1
        new_version = expected_version + len(uncommitted_events)
        # Save events
        self.event_store.append(
            aggregate_id=aggregate.id,
            aggregate_type=aggregate.__class__.__name__,
            events=uncommitted_events,
            expected_version=expected_version
        )
        # Create snapshot if this save crossed a multiple of snapshot_frequency.
        # An exact "version % frequency == 0" test would miss the threshold
        # whenever several events are saved at once.
        if (new_version // self.snapshot_frequency
                > expected_version // self.snapshot_frequency):
            snapshot = Snapshot(
                aggregate_id=aggregate.id,
                aggregate_type=aggregate.__class__.__name__,
                version=new_version,
                state=aggregate.to_snapshot(),
                timestamp=datetime.utcnow()
            )
            self.snapshot_store.save_snapshot(snapshot)


# Add snapshot support to aggregate (these methods belong on the existing class)
class BankAccount:
    # ... existing code ...

    def to_snapshot(self) -> Dict[str, Any]:
        """Serialize state for snapshot."""
        return {
            'owner_id': self.owner_id,
            'balance': str(self.balance),
            'is_active': self.is_active,
            'overdraft_limit': str(self.overdraft_limit)
        }

    def load_from_snapshot(self, state: Dict[str, Any]):
        """Restore state from snapshot."""
        self.owner_id = state['owner_id']
        self.balance = Decimal(state['balance'])
        self.is_active = state['is_active']
        self.overdraft_limit = Decimal(state['overdraft_limit'])
CQRS: Command Query Responsibility Segregation
CQRS separates the write model, which handles commands, from the read models, which serve queries. Event sourcing fits CQRS naturally: the write side validates commands and appends events, and projections consume those events to build read models.
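The split can be sketched in memory before introducing a database. The names `AccountCommands` and `DepositStats` are hypothetical and chosen for this illustration; events are plain dicts rather than the `Event` class used elsewhere in this guide.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, List


class AccountCommands:
    """Write side: validates commands and emits events (hypothetical names)."""

    def __init__(self) -> None:
        self.events: List[dict] = []

    def deposit(self, account_id: str, amount: Decimal) -> dict:
        if amount <= 0:
            raise ValueError("Amount must be positive")
        event = {"type": "MoneyDeposited", "account_id": account_id, "amount": amount}
        self.events.append(event)
        return event


class DepositStats:
    """Read side: a denormalized view that is only ever updated from events."""

    def __init__(self) -> None:
        self.deposit_count: Dict[str, int] = defaultdict(int)
        self.total_deposited: Dict[str, Decimal] = defaultdict(Decimal)

    def handle(self, event: dict) -> None:
        if event["type"] == "MoneyDeposited":
            self.deposit_count[event["account_id"]] += 1
            self.total_deposited[event["account_id"]] += event["amount"]


commands = AccountCommands()
stats = DepositStats()
for amount in (Decimal("100"), Decimal("250")):
    stats.handle(commands.deposit("acc-1", amount))

# The read model can be thrown away and rebuilt from the event log at any time.
rebuilt = DepositStats()
for event in commands.events:
    rebuilt.handle(event)
```

Queries never touch the write side: they read `DepositStats`, which can be shaped for whatever the query needs and rebuilt from the event log if its shape changes.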
CQRS with Projections
class AccountProjection:
    """Read model for account queries."""

    def __init__(self, connection_string: str):
        self.conn = sqlite3.connect(connection_string)
        # sqlite3.Row supports both positional access and dict(row)
        self.conn.row_factory = sqlite3.Row
        self._create_schema()

    def _create_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS account_summary (
                account_id TEXT PRIMARY KEY,
                owner_id TEXT NOT NULL,
                balance TEXT NOT NULL,
                is_active BOOLEAN NOT NULL,
                last_transaction_date TEXT,
                transaction_count INTEGER DEFAULT 0
            )
        """)
        self.conn.commit()

    def handle_event(self, event: Event):
        """Update read model based on event."""
        if event.event_type == 'AccountOpened':
            self.conn.execute("""
                INSERT INTO account_summary
                (account_id, owner_id, balance, is_active, transaction_count)
                VALUES (?, ?, ?, ?, ?)
            """, (
                event.aggregate_id,
                event.data['owner_id'],
                event.data['initial_deposit'],
                True,
                1
            ))
        elif event.event_type == 'MoneyDeposited':
            self.conn.execute("""
                UPDATE account_summary
                SET balance = ?,
                    last_transaction_date = ?,
                    transaction_count = transaction_count + 1
                WHERE account_id = ?
            """, (
                event.data['balance_after'],
                event.timestamp.isoformat(),
                event.aggregate_id
            ))
        elif event.event_type == 'MoneyWithdrawn':
            self.conn.execute("""
                UPDATE account_summary
                SET balance = ?,
                    last_transaction_date = ?,
                    transaction_count = transaction_count + 1
                WHERE account_id = ?
            """, (
                event.data['balance_after'],
                event.timestamp.isoformat(),
                event.aggregate_id
            ))
        elif event.event_type == 'AccountClosed':
            self.conn.execute("""
                UPDATE account_summary
                SET is_active = ?
                WHERE account_id = ?
            """, (False, event.aggregate_id))
        self.conn.commit()

    def get_account_summary(self, account_id: str) -> Optional[Dict]:
        """Query read model."""
        cursor = self.conn.execute(
            "SELECT * FROM account_summary WHERE account_id = ?",
            (account_id,)
        )
        row = cursor.fetchone()
        if not row:
            return None
        return {
            'account_id': row[0],
            'owner_id': row[1],
            'balance': row[2],
            'is_active': bool(row[3]),
            'last_transaction_date': row[4],
            'transaction_count': row[5]
        }

    def get_accounts_by_owner(self, owner_id: str) -> List[Dict]:
        """Query accounts by owner."""
        cursor = self.conn.execute(
            "SELECT * FROM account_summary WHERE owner_id = ?",
            (owner_id,)
        )
        # dict(row) relies on the sqlite3.Row factory set in __init__
        return [dict(row) for row in cursor.fetchall()]
Event Versioning and Schema Evolution
Events are immutable, but schemas evolve. Handle this with event versioning and upcasting.
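When an event type has gone through several schema versions, upcasting can be organized as a chain of small steps, each of which upgrades a payload by exactly one version. The sketch below shows one possible arrangement; the registry, the decorator, and the v2 and v3 field changes are all assumptions made for illustration.

```python
from typing import Any, Callable, Dict, Tuple

Payload = Dict[str, Any]
# (event_type, from_schema_version) -> function producing the next version's payload
UPCASTERS: Dict[Tuple[str, int], Callable[[Payload], Payload]] = {}


def upcaster(event_type: str, from_version: int):
    """Register a function that upgrades a payload by exactly one version."""
    def register(fn: Callable[[Payload], Payload]) -> Callable[[Payload], Payload]:
        UPCASTERS[(event_type, from_version)] = fn
        return fn
    return register


@upcaster("MoneyDeposited", 1)
def deposit_v1_to_v2(data: Payload) -> Payload:
    # Hypothetical change: v2 added a free-text description.
    return {**data, "description": "Deposit"}


@upcaster("MoneyDeposited", 2)
def deposit_v2_to_v3(data: Payload) -> Payload:
    # Hypothetical change: v3 added a currency, defaulting to USD here.
    return {**data, "currency": "USD"}


def upcast(event_type: str, data: Payload, version: int) -> Tuple[Payload, int]:
    """Apply registered upcasters until none is left; the input is not mutated."""
    while (event_type, version) in UPCASTERS:
        data = UPCASTERS[(event_type, version)](data)
        version += 1
    return data, version


stored = {"amount": "50.00"}  # as written under schema v1
current, version = upcast("MoneyDeposited", stored, 1)
```

Each step builds a new dict, so the payload read from the store is never modified, and adding a v4 later means writing one more step rather than revisiting the earlier ones.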
Event Versioning Pattern
from dataclasses import dataclass, replace


@dataclass
class VersionedEvent(Event):
    """Event with schema version."""
    schema_version: int = 1


class EventUpcaster:
    """Convert old event versions to current schema."""

    def upcast(self, event: Event) -> Event:
        """Upcast event to latest version."""
        schema_version = event.metadata.get('schema_version', 1)
        if event.event_type == 'MoneyDeposited' and schema_version == 1:
            # V1: Only had amount
            # V2: Added description and category
            data = {
                **event.data,
                'description': event.data.get('description', 'Deposit'),
                'category': 'general',
            }
            metadata = {**event.metadata, 'schema_version': 2}
            # Return an upgraded copy so the loaded event is never mutated
            return replace(event, data=data, metadata=metadata)
        return event


# Usage in repository
class EventSourcedRepository:
    def __init__(self, event_store, aggregate_factory, upcaster=None):
        self.event_store = event_store
        self.aggregate_factory = aggregate_factory
        self.upcaster = upcaster or EventUpcaster()

    def get(self, aggregate_id: str):
        events = self.event_store.get_events(aggregate_id)
        if not events:
            return None
        # Upcast events to current schema
        events = [self.upcaster.upcast(e) for e in events]
        aggregate = self.aggregate_factory(aggregate_id)
        aggregate.load_from_history(events)
        return aggregate
Best Practices
- Events are immutable — never modify or delete events
- Events are past tense — OrderPlaced, not PlaceOrder
- Events contain all data — no foreign key lookups needed
- Use optimistic concurrency — prevent lost updates
- Snapshot strategically — every 100-1000 events
- Version events — plan for schema evolution
- Keep aggregates small — large event streams are slow
- Use CQRS for queries — don’t query event store directly
- Handle idempotency — same command twice = same result
- Monitor event store size — archive old events if needed
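The idempotency point can be made concrete with a small sketch. It assumes every command carries a unique ID chosen by the caller; `IdempotentHandler` and the `caused_by` field are hypothetical names, and a real system would need to keep the processed IDs in durable storage rather than in memory.

```python
from typing import Dict, List


class IdempotentHandler:
    """Remembers processed command IDs so a retry does not emit events twice."""

    def __init__(self) -> None:
        self.events: List[dict] = []
        self._results: Dict[str, dict] = {}

    def handle_deposit(self, command_id: str, account_id: str, amount: int) -> dict:
        if command_id in self._results:
            # Same command seen before: return the original outcome, emit nothing.
            return self._results[command_id]
        event = {
            "type": "MoneyDeposited",
            "account_id": account_id,
            "amount": amount,
            "caused_by": command_id,
        }
        self.events.append(event)
        self._results[command_id] = event
        return event


handler = IdempotentHandler()
first = handler.handle_deposit("cmd-42", "acc-1", 100)
# A client that timed out and retried sends the same command ID again.
retry = handler.handle_deposit("cmd-42", "acc-1", 100)
```

The retry returns the result of the first attempt and leaves the event log with a single deposit.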
When to Use Event Sourcing
Use event sourcing when:
- Complete audit trail is required (finance, healthcare, legal)
- Temporal queries are needed (state at any point in time)
- Event-driven architecture is beneficial
- Domain has complex business rules
- Debugging requires understanding state changes
Don’t use event sourcing when:
- Simple CRUD operations suffice
- Team lacks event sourcing experience
- Performance of event replay is prohibitive
- Audit trail isn’t critical
- Domain is simple and stable
Conclusion
Event sourcing stores all state changes as immutable events, providing complete audit trails, temporal queries, and event replay capabilities. Implement event stores with optimistic concurrency control, reconstruct aggregates by replaying events, use snapshots for performance optimization, and combine with CQRS for efficient queries. Version events for schema evolution, keep aggregates small, and monitor event store growth. Event sourcing excels in domains requiring audit trails, temporal queries, and complex business logic, but adds complexity that may not be justified for simple CRUD applications.
Resources
- Event Sourcing by Martin Fowler
- EventStoreDB Documentation
- Implementing Domain-Driven Design (book)
- CQRS Journey by Microsoft
- Event Sourcing Patterns
- Axon Framework - Event sourcing framework for Java