Skip to main content
⚡ Calmops

Event Sourcing: Building Systems Around State Changes

Introduction

Event sourcing stores all state changes as an immutable event sequence, enabling audit trails, temporal queries, and event replay. This guide covers event sourcing patterns and implementation.

Event Store

import json
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any

@dataclass
class Event:
    """A single recorded state change belonging to one aggregate.

    Instances are what ``EventStore`` persists and what aggregates such as
    ``User`` replay to rebuild their state.
    """

    # Identifier of this event (``User.change_email`` uses a UUID4 string).
    id: str
    # Identifier of the aggregate the event belongs to.
    aggregate_id: str
    # Dotted event name, e.g. "user.created" or "user.email_changed".
    event_type: str
    # Event payload; ``EventStore`` serialises it with ``json.dumps``.
    data: Dict[str, Any]
    # When the event was created.
    timestamp: datetime
    # Position of the event in its aggregate's stream; used for ordering.
    version: int

class EventStore:
    """Persist events to, and read them back from, an ``events`` table.

    The wrapped connection object must provide ``execute`` and ``query``;
    both are called with the SQL text followed by positional parameters.
    """

    def __init__(self, connection):
        """Remember the database connection used for all reads and writes."""
        self.db = connection

    def append(self, aggregate_id: str, events: List[Event]):
        """Insert every event in ``events`` under ``aggregate_id``.

        One INSERT is issued per event, in list order. The payload is stored
        as a JSON string.
        """
        insert_sql = """INSERT INTO events
                   (id, aggregate_id, event_type, data, timestamp, version)
                   VALUES (?, ?, ?, ?, ?, ?)"""
        for evt in events:
            params = (
                evt.id,
                aggregate_id,
                evt.event_type,
                json.dumps(evt.data),
                evt.timestamp,
                evt.version,
            )
            self.db.execute(insert_sql, *params)

    def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Event]:
        """Return the aggregate's events with version above ``from_version``.

        Results are ordered by ascending version.
        """
        select_sql = """SELECT * FROM events
               WHERE aggregate_id = ? AND version > ?
               ORDER BY version"""
        records = self.db.query(select_sql, aggregate_id, from_version)
        loaded = []
        for record in records:
            loaded.append(self._row_to_event(record))
        return loaded

    def _row_to_event(self, row) -> Event:
        """Build an ``Event`` from a row addressable by column name."""
        columns = ("id", "aggregate_id", "event_type", "data", "timestamp", "version")
        fields = {column: row[column] for column in columns}
        # The payload is stored as JSON text; decode it back into a dict.
        fields["data"] = json.loads(fields["data"])
        return Event(**fields)

Aggregate Reconstruction

class User:
    """User aggregate whose state is derived from its event history.

    State only changes by applying events: stored events are replayed through
    ``load_from_events``, and commands such as ``change_email`` create a new
    event, apply it immediately and queue it as uncommitted.
    """

    def __init__(self, user_id: str):
        """Create an empty, inactive user at version 0 with no pending events."""
        self.id = user_id
        self.email = None
        self.name = None
        self.is_active = False
        self.version = 0
        # Events produced by commands and not yet handed to the caller.
        self._events = []

    def load_from_events(self, events: List["Event"]):
        """Replay ``events`` in order and track the last applied version."""
        for event in events:
            self._apply(event)
            self.version = event.version

    def load_from_snapshot(self, snapshot: dict):
        """Restore state from a snapshot dict such as ``to_snapshot`` returns.

        ``SnapshotStore.get_aggregate`` calls this method on the aggregate it
        builds. Keys missing from ``snapshot`` leave the corresponding
        attribute unchanged.
        """
        self.email = snapshot.get("email", self.email)
        self.name = snapshot.get("name", self.name)
        self.is_active = snapshot.get("is_active", self.is_active)
        self.version = snapshot.get("version", self.version)

    def to_snapshot(self) -> dict:
        """Return the current state as a plain dict for ``load_from_snapshot``."""
        return {
            "email": self.email,
            "name": self.name,
            "is_active": self.is_active,
            "version": self.version,
        }

    def _apply(self, event: "Event"):
        """Mutate state according to ``event``; unknown event types are ignored."""
        if event.event_type == "user.created":
            self.email = event.data["email"]
            self.name = event.data["name"]
            self.is_active = True
        elif event.event_type == "user.email_changed":
            self.email = event.data["new_email"]
        elif event.event_type == "user.deactivated":
            self.is_active = False

    def change_email(self, new_email: str):
        """Record and apply a ``user.email_changed`` event.

        Raises:
            ValueError: if ``new_email`` is empty or contains no ``@``.
        """
        if not new_email or "@" not in new_email:
            raise ValueError("Invalid email")

        next_version = self.version + 1
        event = Event(
            id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type="user.email_changed",
            data={"old_email": self.email, "new_email": new_email},
            # NOTE(review): naive UTC timestamp; datetime.utcnow() is
            # deprecated since Python 3.12 -- confirm whether stored
            # timestamps may become timezone-aware before switching.
            timestamp=datetime.utcnow(),
            version=next_version,
        )
        self._apply(event)
        self._events.append(event)
        self.version = next_version

    def get_uncommitted_events(self) -> List["Event"]:
        """Return the pending events and clear the internal queue."""
        pending = self._events.copy()
        self._events = []
        return pending

Snapshots

class SnapshotStore:
    """Rebuild aggregates from an in-memory snapshot plus later events.

    Only the most recently saved snapshot per aggregate id is kept. Aggregates
    built by ``factory`` must provide ``load_from_snapshot`` and
    ``load_from_events``.
    """

    def __init__(self, event_store: "EventStore"):
        """Use ``event_store`` to fetch the events that follow a snapshot."""
        self.event_store = event_store
        # aggregate_id -> {"data": snapshot dict, "version": snapshot version}
        self.snapshots = {}

    def save_snapshot(self, aggregate_id: str, snapshot: dict, version: int):
        """Store ``snapshot`` for ``aggregate_id``, replacing any earlier one.

        ``version`` is the version of the last event reflected in the
        snapshot; only events after it are replayed on load.
        """
        self.snapshots[aggregate_id] = {"data": snapshot, "version": version}

    def get_aggregate(self, aggregate_id: str, factory) -> object:
        """Return the aggregate built by ``factory(aggregate_id)``, fully loaded.

        With a snapshot, the aggregate is restored from it and only events
        newer than the snapshot version are replayed. Without one, every
        stored event is replayed.
        """
        aggregate = factory(aggregate_id)
        snapshot = self.snapshots.get(aggregate_id)

        if snapshot is None:
            # No snapshot: replay the whole stream.
            events = self.event_store.get_events(aggregate_id)
        else:
            aggregate.load_from_snapshot(snapshot["data"])
            # Restore the version explicitly: if no events follow the
            # snapshot, load_from_events never runs its loop body and the
            # aggregate would otherwise keep its initial version.
            aggregate.version = snapshot["version"]
            events = self.event_store.get_events(
                aggregate_id, snapshot["version"]
            )

        aggregate.load_from_events(events)
        return aggregate

Conclusion

Event sourcing provides complete audit trails and temporal queries. Store every event, reconstruct aggregates by replaying their events, and use snapshots to keep replay fast. Combine it with CQRS for read optimization.

Resources

  • “Event Sourcing” by Martin Fowler
  • EventStoreDB Documentation

Comments