Introduction
Event sourcing stores all state changes as an immutable event sequence, enabling audit trails, temporal queries, and event replay. This guide covers event sourcing patterns and implementation.
Event Store
import json
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any
@dataclass
class Event:
    """One recorded state change belonging to a single aggregate."""

    # Identifier of this event record.
    id: str
    # Identifier of the aggregate the event belongs to.
    aggregate_id: str
    # Dotted type name that selects how the event is applied,
    # e.g. "user.created".
    event_type: str
    # Event payload; the event store serialises it with json.dumps.
    data: Dict[str, Any]
    # When the event was created.
    timestamp: datetime
    # Position of the event in its aggregate's sequence; the event store
    # orders and filters by this value.
    version: int
class EventStore:
    """Write events to, and read events from, an ``events`` table."""

    def __init__(self, connection):
        # The connection is only used through its execute() and query()
        # methods, both called with the SQL text followed by the bind values.
        self.db = connection

    def append(self, aggregate_id: str, events: List[Event]):
        """Insert every event in ``events`` as one row each.

        The stored aggregate id is the ``aggregate_id`` argument; the
        ``aggregate_id`` attribute of the individual events is not read.
        """
        insert_sql = (
            "INSERT INTO events\n"
            "(id, aggregate_id, event_type, data, timestamp, version)\n"
            "VALUES (?, ?, ?, ?, ?, ?)"
        )
        for item in events:
            bind_values = (
                item.id,
                aggregate_id,
                item.event_type,
                json.dumps(item.data),  # payload is stored as JSON text
                item.timestamp,
                item.version,
            )
            self.db.execute(insert_sql, *bind_values)

    def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Event]:
        """Return the aggregate's events with version above ``from_version``.

        The comparison is strict (``version > from_version``) and the result
        is ordered by version.
        """
        select_sql = (
            "SELECT * FROM events\n"
            "WHERE aggregate_id = ? AND version > ?\n"
            "ORDER BY version"
        )
        fetched = self.db.query(select_sql, aggregate_id, from_version)
        return list(map(self._row_to_event, fetched))

    def _row_to_event(self, row) -> Event:
        """Build an ``Event`` from a row indexable by column name."""
        event_id = row["id"]
        owner_id = row["aggregate_id"]
        kind = row["event_type"]
        payload = json.loads(row["data"])  # undo the json.dumps done on insert
        recorded_at = row["timestamp"]  # passed through exactly as returned
        sequence = row["version"]
        return Event(
            id=event_id,
            aggregate_id=owner_id,
            event_type=kind,
            data=payload,
            timestamp=recorded_at,
            version=sequence,
        )
Aggregate Reconstruction
class User:
    """User aggregate whose state is rebuilt by replaying its events.

    Attributes:
        id: Identifier passed to the constructor.
        email: Current email; ``None`` until a ``user.created`` event is applied.
        name: Current name; ``None`` until a ``user.created`` event is applied.
        is_active: ``True`` after ``user.created``, ``False`` initially and
            after ``user.deactivated``.
        version: Version of the last event applied (0 for a new instance).
    """

    def __init__(self, user_id: str):
        self.id = user_id
        self.email = None
        self.name = None
        self.is_active = False
        self.version = 0
        # Events produced by commands on this instance and not yet handed out
        # through get_uncommitted_events().
        self._events = []

    def load_from_events(self, events: List[Event]):
        """Apply ``events`` in order, tracking the version of the last one."""
        for event in events:
            self._apply(event)
            self.version = event.version

    def to_snapshot(self) -> Dict[str, Any]:
        """Return the current state as a plain dict.

        The result has the keys ``email``, ``name``, ``is_active`` and
        ``version`` and is accepted by ``load_from_snapshot``.
        """
        return {
            "email": self.email,
            "name": self.name,
            "is_active": self.is_active,
            "version": self.version,
        }

    def load_from_snapshot(self, data: Dict[str, Any]):
        """Restore state from a dict shaped like the one ``to_snapshot`` returns.

        Missing ``email``/``name`` become ``None`` and a missing ``is_active``
        becomes ``False``. ``version`` is only overwritten when the snapshot
        carries one, so a caller that tracks the version separately can set it
        afterwards.
        """
        self.email = data.get("email")
        self.name = data.get("name")
        self.is_active = data.get("is_active", False)
        if "version" in data:
            self.version = data["version"]

    def _apply(self, event: Event):
        """Mutate state according to ``event``; unknown types are ignored."""
        if event.event_type == "user.created":
            self.email = event.data["email"]
            self.name = event.data["name"]
            self.is_active = True
        elif event.event_type == "user.email_changed":
            self.email = event.data["new_email"]
        elif event.event_type == "user.deactivated":
            self.is_active = False

    def change_email(self, new_email: str):
        """Record an email change as a new, uncommitted event.

        The event is applied to this instance immediately and queued for
        ``get_uncommitted_events``.

        Raises:
            ValueError: If ``new_email`` is empty or contains no ``@``.
        """
        if not new_email or "@" not in new_email:
            raise ValueError("Invalid email")
        event = Event(
            # Requires the module-level ``import uuid``; without it this line
            # raised NameError.
            id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type="user.email_changed",
            data={"old_email": self.email, "new_email": new_email},
            # NOTE(review): naive UTC timestamp. utcnow() is deprecated since
            # Python 3.12; switching to an aware datetime changes the stored
            # value's type, so that is left for a deliberate migration.
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        )
        self._apply(event)
        self._events.append(event)
        self.version += 1

    def get_uncommitted_events(self) -> List[Event]:
        """Return the queued events and clear the queue."""
        pending = self._events.copy()
        self._events = []
        return pending
Snapshots
class SnapshotStore:
    """Keep the latest snapshot per aggregate in memory to shorten replay.

    Snapshots live in a plain dict on this instance; only one snapshot is
    kept per aggregate id, and a new one replaces the previous one.
    """

    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        # aggregate_id -> {"data": <snapshot dict>, "version": <int>}
        self.snapshots = {}

    def save_snapshot(self, aggregate_id: str, snapshot: dict, version: int):
        """Remember ``snapshot`` as the state of the aggregate at ``version``.

        The dict is stored by reference, not copied.
        """
        self.snapshots[aggregate_id] = {
            "data": snapshot,
            "version": version,
        }

    def get_aggregate(self, aggregate_id: str, factory) -> object:
        """Build an aggregate from its snapshot (if any) plus later events.

        Args:
            aggregate_id: Identifier of the aggregate to load.
            factory: Callable taking the aggregate id and returning a new
                aggregate. The aggregate must provide ``load_from_events``
                and, when a snapshot exists, ``load_from_snapshot``.

        Returns:
            The aggregate after snapshot restore and event replay.
        """
        aggregate = factory(aggregate_id)
        snapshot = self.snapshots.get(aggregate_id)
        if snapshot is None:
            # No snapshot: replay the full history.
            events = self.event_store.get_events(aggregate_id)
        else:
            aggregate.load_from_snapshot(snapshot["data"])
            # Pin the version to the snapshot's. If no events were recorded
            # after the snapshot, load_from_events never touches the version,
            # and the next emitted event would reuse an old version number.
            aggregate.version = snapshot["version"]
            events = self.event_store.get_events(
                aggregate_id, snapshot["version"]
            )
        aggregate.load_from_events(events)
        return aggregate
Conclusion
Event sourcing provides complete audit trails and temporal queries. Store every event, reconstruct aggregates by replaying their events, and use snapshots to keep replay fast. Combine it with CQRS to optimize reads.
Resources
- “Event Sourcing” by Martin Fowler
- EventStoreDB Documentation
Comments