Event Sourcing and CQRS: Complete Guide to Event-Driven Architecture

Event Sourcing and CQRS (Command Query Responsibility Segregation) are two complementary patterns that change how we think about data persistence and system architecture. With Event Sourcing, instead of storing only the current state, you store the sequence of events that led to that state. CQRS then separates the model that handles writes from the models that serve reads.

Understanding Event Sourcing

The Problem with Traditional Persistence

┌─────────────────────────────────────────────────────────────────┐
│              Traditional CRUD Approach                          │
│                                                                 │
│  User ──► POST /orders ──► UPDATE orders SET status='pending'   │
│                                    │                            │
│                                    ▼                            │
│                           We only store FINAL state             │
│                           All history is LOST!                  │
│                                                                 │
│  Problems:                                                      │
│  ✗ No audit trail                                               │
│  ✗ Can't reconstruct past states                                │
│  ✗ Can't debug what happened                                    │
│  ✗ Can't implement temporal queries                             │
└─────────────────────────────────────────────────────────────────┘

Event Sourcing Solution

┌─────────────────────────────────────────────────────────────────┐
│              Event Sourcing Approach                            │
│                                                                 │
│  User ──► POST /orders ──► APPEND OrderCreatedEvent             │
│                                    │                            │
│                                    ▼                            │
│                           Event Store:                          │
│                           [OrderCreatedEvent]                   │
│                           [OrderItemAddedEvent]                 │
│                           [OrderShippedEvent]                   │
│                           [OrderDeliveredEvent]                 │
│                                                                 │
│  ✓ Complete audit trail                                         │
│  ✓ Can reconstruct any past state                               │
│  ✓ Full event replay for debugging                              │
│  ✓ Temporal queries at any point in time                        │
└─────────────────────────────────────────────────────────────────┘
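
The replay idea in the diagram can be sketched in a few lines. This is a minimal in-memory illustration rather than a real event store; the `state_at` helper and the shape of the event dictionaries are hypothetical names chosen for this example.

```python
from datetime import datetime

# Hypothetical in-memory event log for one order, oldest first.
events = [
    {"type": "OrderCreated", "at": datetime(2026, 1, 5, 9, 0), "data": {"items": 1}},
    {"type": "OrderItemAdded", "at": datetime(2026, 1, 5, 9, 30), "data": {"items": 1}},
    {"type": "OrderShipped", "at": datetime(2026, 1, 6, 14, 0), "data": {}},
]

def state_at(events: list[dict], as_of: datetime) -> dict:
    """Replay events up to and including `as_of` to rebuild the state at that time."""
    state = {"status": None, "item_count": 0}
    for event in events:
        if event["at"] > as_of:
            break  # the log is ordered, so everything after this is in the future
        if event["type"] == "OrderCreated":
            state["status"] = "created"
            state["item_count"] = event["data"]["items"]
        elif event["type"] == "OrderItemAdded":
            state["item_count"] += event["data"]["items"]
        elif event["type"] == "OrderShipped":
            state["status"] = "shipped"
    return state

before_shipping = state_at(events, datetime(2026, 1, 5, 12, 0))
latest = state_at(events, datetime(2026, 1, 7, 0, 0))
```

Because the log is never overwritten, asking for the state "as of" an earlier time is just a shorter replay of the same events.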

Event Store Structure

from dataclasses import dataclass, field
from datetime import datetime
import json

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    version: int  # position within the aggregate's stream, starting at 1
    timestamp: datetime
    payload: dict
    metadata: dict = field(default_factory=dict)

class EventStore:
    def __init__(self, connection):
        # Assumes a DB-API style connection with "?" placeholders whose rows
        # can be read by column name (for example sqlite3 with sqlite3.Row).
        self.connection = connection
    
    def append_event(self, event: Event) -> None:
        # Payload and metadata are stored as JSON text.
        self.connection.execute("""
            INSERT INTO events 
            (event_id, aggregate_id, event_type, version, 
             timestamp, payload, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            event.event_id,
            event.aggregate_id,
            event.event_type,
            event.version,
            event.timestamp.isoformat(),
            json.dumps(event.payload),
            json.dumps(event.metadata),
        ))
    
    def get_events_for_aggregate(
        self, 
        aggregate_id: str, 
        from_version: int = 0
    ) -> list[Event]:
        # Returns events with a version strictly greater than from_version.
        cursor = self.connection.execute("""
            SELECT * FROM events 
            WHERE aggregate_id = ? AND version > ?
            ORDER BY version ASC
        """, (aggregate_id, from_version))
        
        return [self._row_to_event(row) for row in cursor]
    
    def _row_to_event(self, row) -> Event:
        # Inverse of append_event: decode the timestamp and the JSON columns.
        return Event(
            event_id=row["event_id"],
            aggregate_id=row["aggregate_id"],
            event_type=row["event_type"],
            version=row["version"],
            timestamp=datetime.fromisoformat(row["timestamp"]),
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
        )
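
One detail the store above leaves open is what happens when two writers append the same version for one aggregate. A common approach is optimistic concurrency: let the database reject the second write through a uniqueness constraint. The sketch below assumes SQLite and a hypothetical `try_append` helper; the column list mirrors the `INSERT` above, but the schema itself is an assumption.

```python
import sqlite3

def create_events_table(conn: sqlite3.Connection) -> None:
    conn.execute("""
        CREATE TABLE events (
            event_id TEXT PRIMARY KEY,
            aggregate_id TEXT NOT NULL,
            event_type TEXT NOT NULL,
            version INTEGER NOT NULL,
            timestamp TEXT NOT NULL,
            payload TEXT NOT NULL,
            metadata TEXT NOT NULL,
            UNIQUE (aggregate_id, version)  -- one event per version per aggregate
        )
    """)

def try_append(conn, event_id, aggregate_id, event_type, version) -> bool:
    """Return True if the event was stored, False if that version was already taken."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?)",
                (event_id, aggregate_id, event_type, version,
                 "2026-01-01T00:00:00", "{}", "{}"),
            )
        return True
    except sqlite3.IntegrityError:
        return False

conn = sqlite3.connect(":memory:")
create_events_table(conn)
first = try_append(conn, "e1", "order-1", "OrderCreated", 1)
second = try_append(conn, "e2", "order-1", "OrderItemAdded", 2)
conflict = try_append(conn, "e3", "order-1", "OrderShipped", 2)  # stale writer
```

A rejected append usually means the caller should reload the aggregate, re-validate the command, and retry with the next version.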

Defining Domain Events

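from datetime import datetime, timezone
from dataclasses import dataclass, field
import uuid

def utc_now() -> datetime:
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    return datetime.now(timezone.utc)

@dataclass
class OrderCreatedEvent:
    aggregate_id: str
    customer_id: str
    items: list[dict]
    total_amount: float
    timestamp: datetime = field(default_factory=utc_now)
    event_type: str = "OrderCreated"
    
    def to_event(self) -> Event:
        # The creation event is always the first event in the stream.
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=1,
            timestamp=self.timestamp,
            payload={
                "customer_id": self.customer_id,
                "items": self.items,
                "total_amount": self.total_amount
            }
        )

@dataclass
class OrderItemAddedEvent:
    aggregate_id: str
    product_id: str
    quantity: int
    price: float
    version: int
    timestamp: datetime = field(default_factory=utc_now)
    event_type: str = "OrderItemAdded"
    
    def to_event(self) -> Event:
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=self.version,
            timestamp=self.timestamp,
            payload={
                "product_id": self.product_id,
                "quantity": self.quantity,
                "price": self.price
            }
        )

@dataclass
class OrderShippedEvent:
    aggregate_id: str
    tracking_number: str
    carrier: str
    version: int
    timestamp: datetime = field(default_factory=utc_now)
    event_type: str = "OrderShipped"
    
    def to_event(self) -> Event:
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=self.version,
            timestamp=self.timestamp,
            payload={
                "tracking_number": self.tracking_number,
                "carrier": self.carrier
            }
        )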
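
Since events describe facts that have already happened, it can help to make accidental mutation fail loudly. One option in Python is a frozen dataclass. The `OrderShipped` class below is a simplified, hypothetical stand-in for the event classes above.

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class OrderShipped:
    aggregate_id: str
    tracking_number: str
    carrier: str

event = OrderShipped("order-1", "TRACK-123", "ExampleCarrier")

try:
    event.carrier = "OtherCarrier"  # attribute assignment is blocked
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Note that `frozen=True` only blocks attribute reassignment. A mutable field such as a `dict` or `list` can still be changed in place, so nested data needs copying or immutable types if you want stronger guarantees.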

Command Query Responsibility Segregation (CQRS)

The Problem with Monolithic Models

┌─────────────────────────────────────────────────────────────────┐
│              Monolithic Read/Write Model                        │
│                                                                 │
│                    ┌──────────────┐                             │
│                    │  User Model  │                             │
│                    └──────┬───────┘                             │
│                           │                                     │
│              ┌────────────┴────────────┐                        │
│              ▼                         ▼                        │
│         ┌────────┐               ┌────────┐                     │
│         │ WRITE  │               │ READ   │                     │
│         │ Model  │               │ Model  │                     │
│         └────────┘               └────────┘                     │
│              │                         │                        │
│              └────────────┬────────────┘                        │
│                           ▼                                     │
│                  Same database table                            │
│                                                                 │
│  Problems with complex reads:                                   │
│  ✗ Joining 10+ tables for dashboard                             │
│  ✗ Aggregating millions of rows                                 │
│  ✗ Different read/write performance needs                       │
│  ✗ Can't optimize both equally                                  │
└─────────────────────────────────────────────────────────────────┘

CQRS Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    CQRS Architecture                            │
│                                                                 │
│   Commands                   Queries                            │
│   (Writes)                   (Reads)                            │
│       │                         │                               │
│       ▼                         ▼                               │
│  ┌─────────┐               ┌─────────┐                          │
│  │ Command │               │ Query   │                          │
│  │ Handler │               │ Handler │                          │
│  └────┬────┘               └────┬────┘                          │
│       │                         │                               │
│       ▼                         ▼                               │
│  ┌─────────┐               ┌─────────┐                          │
│  │ Write   │               │ Read    │                          │
│  │ Model   │               │ Model   │                          │
│  │ (Events)│               │ (Views) │                          │
│  └────┬────┘               └────┬────┘                          │
│       │                         │                               │
│       └────────┬────────────────┘                               │
│                ▼                                                │
│       ┌─────────────────┐                                       │
│       │ Event Handlers  │                                       │
│       │ (Projections)   │                                       │
│       └────────┬────────┘                                       │
│                ▼                                                │
│       ┌─────────────────┐                                       │
│       │  Read Database  │                                       │
│       │ (PostgreSQL,    │                                       │
│       │  MongoDB,       │                                       │
│       │  Redis, etc.)   │                                       │
│       └─────────────────┘                                       │
└─────────────────────────────────────────────────────────────────┘
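
The flow in the diagram can be reduced to three small functions. This is a deliberately tiny in-memory sketch with hypothetical names (`event_log`, `order_totals`, `project`); real systems would persist both sides and usually run projections asynchronously.

```python
from collections import defaultdict

event_log: list[dict] = []                            # write side: append-only
order_totals: dict[str, float] = defaultdict(float)   # read side: denormalized view

def project(event: dict) -> None:
    """Projection: fold one event into the read model."""
    if event["type"] == "ItemAdded":
        order_totals[event["order_id"]] += event["price"] * event["quantity"]

def handle_add_item(order_id: str, price: float, quantity: int) -> None:
    """Command handler: validate, then append an event."""
    if quantity <= 0:
        raise ValueError("quantity must be positive")
    event = {"type": "ItemAdded", "order_id": order_id,
             "price": price, "quantity": quantity}
    event_log.append(event)
    project(event)  # called inline here for brevity only

def get_order_total(order_id: str) -> float:
    """Query handler: reads only from the read model, never from the event log."""
    return order_totals.get(order_id, 0.0)

handle_add_item("order-1", 10.0, 2)
handle_add_item("order-1", 5.0, 1)
```

The point of the split is that `get_order_total` never replays events: the read model already holds the answer in the shape the query needs.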

Command Handler Implementation

from dataclasses import dataclass
from typing import Callable, Any
import uuid
from datetime import datetime

@dataclass
class CreateOrderCommand:
    customer_id: str
    items: list[dict]
    
@dataclass
class AddItemCommand:
    order_id: str
    product_id: str
    quantity: int

@dataclass
class ShipOrderCommand:
    order_id: str
    tracking_number: str
    carrier: str

class CommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def handle_create_order(self, command: CreateOrderCommand) -> str:
        order_id = str(uuid.uuid4())
        
        total = sum(item["price"] * item["quantity"] for item in command.items)
        
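        # OrderCreatedEvent has no version field: to_event() always emits
        # version 1 for the first event of a new order.
        event = OrderCreatedEvent(
            aggregate_id=order_id,
            customer_id=command.customer_id,
            items=command.items,
            total_amount=total
        ).to_event()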
        
        self.event_store.append_event(event)
        
        return order_id
    
    def handle_add_item(self, command: AddItemCommand) -> None:
        events = self.event_store.get_events_for_aggregate(command.order_id)
        
        if not events:
            raise ValueError(f"Order {command.order_id} not found")
        
        current_version = events[-1].version
        
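        product_price = self._get_product_price(command.product_id)
        
        event = OrderItemAddedEvent(
            aggregate_id=command.order_id,
            product_id=command.product_id,
            quantity=command.quantity,
            price=product_price,
            version=current_version + 1
        ).to_event()
        
        # If the events table enforces uniqueness on (aggregate_id, version),
        # a concurrent writer that computed the same version fails here.
        self.event_store.append_event(event)
    
    def _get_product_price(self, product_id: str) -> float:
        # Placeholder: replace with a lookup against your product catalog.
        raise NotImplementedError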

Query Handler and Projections

class OrderProjection:
    def __init__(self, read_db_connection):
        self.read_db = read_db_connection
    
    def project_order_created(self, event: Event):
        payload = event.payload
        
        self.read_db.execute("""
            INSERT INTO orders (id, customer_id, total_amount, 
                               status, created_at, version)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            event.aggregate_id,
            payload["customer_id"],
            payload["total_amount"],
            "created",
            event.timestamp,
            event.version
        ))
    
    def project_order_item_added(self, event: Event):
        payload = event.payload
        
        self.read_db.execute("""
            INSERT INTO order_items (id, order_id, product_id, 
                                    quantity, price)
            VALUES (?, ?, ?, ?, ?)
        """, (str(uuid.uuid4()), event.aggregate_id,
              payload["product_id"], payload["quantity"],
              payload["price"]))
        
        self.read_db.execute("""
            UPDATE orders 
            SET total_amount = total_amount + ?,
                version = ?
            WHERE id = ?
        """, (payload["price"] * payload["quantity"],
              event.version, event.aggregate_id))

class QueryHandler:
    def __init__(self, read_db_connection):
        self.read_db = read_db_connection
    
    def get_order_summary(self, order_id: str) -> dict | None:
        # Assumes rows can be read by column name (e.g. sqlite3.Row).
        cursor = self.read_db.execute("""
            SELECT * FROM orders WHERE id = ?
        """, (order_id,))
        
        row = cursor.fetchone()
        if row is None:
            return None
        
        return {
            "id": row["id"],
            "customer_id": row["customer_id"],
            "total_amount": row["total_amount"],
            "status": row["status"]
        }
    
    def get_customer_orders(self, customer_id: str) -> list[dict]:
        cursor = self.read_db.execute("""
            SELECT * FROM orders 
            WHERE customer_id = ?
            ORDER BY created_at DESC
        """, (customer_id,))
        
        return [dict(row) for row in cursor]
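
The query handler reads columns by name (`row["id"]`) and converts rows with `dict(row)`. With Python's built-in `sqlite3` module that only works once a row factory is set, because the default rows are plain tuples. A minimal sketch, assuming SQLite as the read database and a cut-down `orders` table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows now support row["column"] and dict(row)

conn.execute(
    "CREATE TABLE orders (id TEXT, customer_id TEXT, total_amount REAL, status TEXT)"
)
conn.execute(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    ("order-1", "cust-1", 25.0, "created"),
)

row = conn.execute("SELECT * FROM orders WHERE id = ?", ("order-1",)).fetchone()
summary = dict(row)
missing = conn.execute("SELECT * FROM orders WHERE id = ?", ("nope",)).fetchone()
```

Other drivers expose similar options under different names, so check the one you use before relying on name-based access.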

Rebuilding State from Events

Aggregate Reconstruction

class OrderAggregate:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def load_from_events(self, order_id: str) -> 'Order':
        events = self.event_store.get_events_for_aggregate(order_id)
        
        order = Order()
        
        for event in events:
            order.apply(event)
        
        return order

@dataclass
class Order:
    order_id: str = ""
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: float = 0.0
    status: str = "created"
    version: int = 0
    
    def apply(self, event: Event):
        if event.event_type == "OrderCreated":
            self.order_id = event.aggregate_id
            self.customer_id = event.payload["customer_id"]
            # Copy the list so later appends do not mutate the stored event.
            self.items = list(event.payload["items"])
            self.total_amount = event.payload["total_amount"]
            self.status = "created"
            self.version = event.version
            
        elif event.event_type == "OrderItemAdded":
            self.items.append({
                "product_id": event.payload["product_id"],
                "quantity": event.payload["quantity"],
                "price": event.payload["price"]
            })
            self.total_amount += (event.payload["price"] * 
                                 event.payload["quantity"])
            self.version = event.version
            
        elif event.event_type == "OrderShipped":
            self.status = "shipped"
            self.version = event.version

Snapshotting for Performance

class SnapshotStore:
    def __init__(self, connection):
        self.connection = connection
    
    def save_snapshot(self, aggregate_id: str, snapshot: dict, version: int):
        self.connection.execute("""
            INSERT OR REPLACE INTO snapshots (aggregate_id, snapshot, version)
            VALUES (?, ?, ?)
        """, (aggregate_id, json.dumps(snapshot), version))
    
    def get_snapshot(self, aggregate_id: str) -> tuple[dict, int] | None:
        cursor = self.connection.execute("""
            SELECT snapshot, version FROM snapshots
            WHERE aggregate_id = ?
            ORDER BY version DESC
            LIMIT 1
        """, (aggregate_id,))
        
        row = cursor.fetchone()
        if row:
            return json.loads(row["snapshot"]), row["version"]
        return None

class SnapshottingOrderAggregate:
    def __init__(self, event_store: EventStore, snapshot_store: SnapshotStore):
        self.event_store = event_store
        self.snapshot_store = snapshot_store
    
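    def load_aggregate(self, order_id: str, snapshot_threshold: int = 100):
        # get_snapshot returns None when no snapshot exists, so check
        # before unpacking the (snapshot, version) pair.
        result = self.snapshot_store.get_snapshot(order_id)
        
        if result is not None:
            snapshot, version = result
            order = Order(**snapshot)
            # The snapshot dict does not carry the version, so restore it here.
            order.version = version
            events = self.event_store.get_events_for_aggregate(
                order_id, from_version=version
            )
        else:
            order = Order()
            events = self.event_store.get_events_for_aggregate(order_id)
        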
        for event in events:
            order.apply(event)
        
        if len(events) > snapshot_threshold:
            self.snapshot_store.save_snapshot(
                order_id,
                {
                    "order_id": order.order_id,
                    "customer_id": order.customer_id,
                    "items": order.items,
                    "total_amount": order.total_amount,
                    "status": order.status
                },
                order.version
            )
        
        return order
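
The property that makes snapshotting safe is that replaying the remaining events on top of a snapshot gives the same result as replaying everything from the start. The sketch below checks that on a toy aggregate; `apply`, `replay`, and the event shape are hypothetical simplifications of the `Order` class above.

```python
def apply(state: dict, event: dict) -> dict:
    """Pure fold step: return a new state with one event applied."""
    new_state = dict(state)
    if event["type"] == "ItemAdded":
        new_state["total"] = state["total"] + event["amount"]
    new_state["version"] = event["version"]
    return new_state

def replay(state: dict, events: list[dict]) -> dict:
    for event in events:
        state = apply(state, event)
    return state

EMPTY = {"total": 0, "version": 0}
events = [{"type": "ItemAdded", "amount": n, "version": n} for n in range(1, 11)]

full = replay(EMPTY, events)                 # replay all 10 events
snapshot = replay(EMPTY, events[:7])         # state captured at version 7
delta = [e for e in events if e["version"] > snapshot["version"]]
from_snapshot = replay(snapshot, delta)      # snapshot + the 3 newer events
```

This equivalence holds only if the snapshot records the version it was taken at, which is why the version must be stored and restored alongside the state.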

Event Handler Patterns

Projecting to Multiple Views

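import logging
from typing import Callable

logger = logging.getLogger(__name__)

class EventRouter:
    def __init__(self):
        self.handlers: dict[str, list[Callable[[Event], None]]] = {}
    
    def register(self, event_type: str, handler: Callable[[Event], None]):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)
    
    def route(self, event: Event):
        handlers = self.handlers.get(event.event_type, [])
        
        for handler in handlers:
            try:
                # Handlers are plain callables, e.g. projector bound methods.
                handler(event)
            except Exception:
                # Log and continue so one failing projection does not block
                # the others. The failed event still needs a retry strategy.
                logger.exception("Handler failed for %s", event.event_type)

class EventProcessor:
    def __init__(self, event_store: EventStore, router: EventRouter,
                 checkpoint_store):
        self.event_store = event_store
        self.router = router
        # checkpoint_store is assumed to expose save(event_id).
        self.checkpoint_store = checkpoint_store
    
    def process_events(self, last_checkpoint: int = 0):
        # get_events_after is assumed to return events in global order after
        # the checkpoint; it is not part of the EventStore shown earlier.
        cursor = self.event_store.get_events_after(last_checkpoint)
        
        for event in cursor:
            self.router.route(event)
            self.checkpoint_store.save(event.event_id)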
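
Checkpointing is what lets a processor resume after a restart without reprocessing the whole log. The sketch below models the log as a list and the checkpoint as a position in that list. `InMemoryCheckpointStore` and `process_new_events` are hypothetical names for illustration only.

```python
class InMemoryCheckpointStore:
    def __init__(self) -> None:
        self.position = 0  # number of events already handled

    def save(self, position: int) -> None:
        self.position = position

    def load(self) -> int:
        return self.position

def process_new_events(log: list[dict], checkpoints, handle) -> int:
    """Handle every event after the saved position; return how many were processed."""
    start = checkpoints.load()
    processed = 0
    for position, event in enumerate(log[start:], start=start + 1):
        handle(event)
        checkpoints.save(position)  # saved after handling: at-least-once delivery
        processed += 1
    return processed

log = [{"type": "OrderCreated"}, {"type": "OrderShipped"}]
seen: list[str] = []
checkpoints = InMemoryCheckpointStore()

first_run = process_new_events(log, checkpoints, lambda e: seen.append(e["type"]))
log.append({"type": "OrderDelivered"})
second_run = process_new_events(log, checkpoints, lambda e: seen.append(e["type"]))
```

Because the checkpoint is written after the handler runs, a crash between the two steps causes the event to be delivered again on restart, which is why handlers should be idempotent.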

Read Model Projections

class OrderReadModelProjector:
    def __init__(self, read_db):
        self.read_db = read_db
    
    def handle_order_created(self, event: Event):
        self.read_db.execute("""
            INSERT INTO order_summaries 
            (order_id, customer_id, total, status, created_at)
            VALUES (?, ?, ?, ?, ?)
        """, (event.aggregate_id, event.payload["customer_id"],
              event.payload["total_amount"], "created", event.timestamp))
    
    def handle_order_shipped(self, event: Event):
        self.read_db.execute("""
            UPDATE order_summaries
            SET status = 'shipped',
                tracking_number = ?,
                carrier = ?,
                shipped_at = ?
            WHERE order_id = ?
        """, (event.payload["tracking_number"], event.payload["carrier"],
              event.timestamp, event.aggregate_id))

class CustomerActivityProjector:
    def __init__(self, read_db):
        self.read_db = read_db
    
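    def handle_order_created(self, event: Event):
        customer_id = event.payload["customer_id"]
        
        # Note: the UPDATE below only touches an existing row. A customer's
        # first order needs the customer_activity row to exist already,
        # or an upsert in place of the UPDATE.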
        self.read_db.execute("""
            UPDATE customer_activity
            SET total_orders = total_orders + 1,
                total_spent = total_spent + ?
            WHERE customer_id = ?
        """, (event.payload["total_amount"], customer_id))
        
        self.read_db.execute("""
            INSERT INTO customer_order_history
            (customer_id, order_id, timestamp)
            VALUES (?, ?, ?)
        """, (customer_id, event.aggregate_id, event.timestamp))

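A projector that only issues `UPDATE` silently does nothing for a customer who has no row yet. One way to handle first-time customers is an upsert. The sketch below uses SQLite's `INSERT ... ON CONFLICT DO UPDATE` syntax (other databases have similar syntax); `record_order` and the trimmed-down table are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE customer_activity (
        customer_id TEXT PRIMARY KEY,
        total_orders INTEGER NOT NULL,
        total_spent REAL NOT NULL
    )
""")

def record_order(conn: sqlite3.Connection, customer_id: str, amount: float) -> None:
    """Insert the customer's first order, or add to the existing counters."""
    conn.execute("""
        INSERT INTO customer_activity (customer_id, total_orders, total_spent)
        VALUES (?, 1, ?)
        ON CONFLICT (customer_id) DO UPDATE SET
            total_orders = total_orders + 1,
            total_spent = total_spent + excluded.total_spent
    """, (customer_id, amount))

record_order(conn, "cust-1", 20.0)  # first order: inserts the row
record_order(conn, "cust-1", 5.5)   # second order: updates the counters
row = conn.execute(
    "SELECT total_orders, total_spent FROM customer_activity WHERE customer_id = ?",
    ("cust-1",),
).fetchone()
```

Note that this counter-style update is not idempotent on its own: delivering the same event twice would count the order twice.
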
Versioning and Migrations

Event Schema Versioning

@dataclass
class EventWithMigration:
    original_event: Event
    migrated_payload: dict

def migrate_if_needed(event: Event) -> EventWithMigration:
    # Events written before versioning was introduced carry no
    # schema_version and are treated as version 1.
    schema_version = event.metadata.get("schema_version", 1)
    
    if schema_version == 1:
        migrated = migrate_v1_to_v2(event.payload)
        return EventWithMigration(
            original_event=event,
            migrated_payload=migrated
        )
    
    return EventWithMigration(
        original_event=event,
        migrated_payload=event.payload
    )

def migrate_v1_to_v2(payload: dict) -> dict:
    migrated = payload.copy()
    
    if "price" in migrated and "amount" not in migrated:
        migrated["amount"] = migrated["price"]
    
    if "items" in migrated:
        migrated["line_items"] = [
            {"product_id": item["product_id"],
             "quantity": item["quantity"],
             "unit_amount": item.get("price", item.get("amount", 0))}
            for item in migrated["items"]
        ]
        del migrated["items"]
    
    return migrated
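
Once there are more than two schema versions, migrations are often applied as a chain of small steps, each upgrading by exactly one version. The sketch below shows that idea with a registry keyed by source version. The `v2_to_v3` step and the `MIGRATIONS` registry are hypothetical, introduced only to illustrate chaining.

```python
from typing import Callable

# Hypothetical migration steps, keyed by the schema version they upgrade FROM.
def v1_to_v2(payload: dict) -> dict:
    migrated = dict(payload)          # copy so the stored payload is untouched
    migrated["amount"] = migrated.pop("price")
    return migrated

def v2_to_v3(payload: dict) -> dict:
    migrated = dict(payload)
    migrated.setdefault("currency", "USD")
    return migrated

MIGRATIONS: dict[int, Callable[[dict], dict]] = {1: v1_to_v2, 2: v2_to_v3}
LATEST_VERSION = 3

def migrate_to_latest(payload: dict, schema_version: int) -> dict:
    """Apply each single-step migration in order until the payload is current."""
    while schema_version < LATEST_VERSION:
        payload = MIGRATIONS[schema_version](payload)
        schema_version += 1
    return payload

old = {"price": 9.99}
new = migrate_to_latest(old, schema_version=1)
```

Each step stays small and testable, and a payload that is already at the latest version passes through unchanged.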

Upcasters

from dataclasses import replace

class Upcaster:
    def upcast(self, event: Event) -> Event:
        raise NotImplementedError

class OrderCreatedUpcaster(Upcaster):
    def upcast(self, event: Event) -> Event:
        if event.event_type != "OrderCreated":
            return event
        
        # Work on a copy so the stored event is never mutated.
        payload = dict(event.payload)
        payload.setdefault("customer_email", "[email protected]")
        payload.setdefault("currency", "USD")
        
        return replace(event, payload=payload)

class UpcasterChain:
    def __init__(self):
        self.upcasters: list[Upcaster] = []
    
    def add_upcaster(self, upcaster: Upcaster):
        self.upcasters.append(upcaster)
    
    def process(self, event: Event) -> Event:
        for upcaster in self.upcasters:
            event = upcaster.upcast(event)
        return event

Best Practices

Good Patterns

GOOD_PATTERNS = {
    "immutable_events": """
# Events should be immutable once created
# Never modify events after appending

✅ Good:
event = OrderCreatedEvent(order_id="123", status="created")
# Store as-is, never change

❌ Bad:
event.payload["status"] = "modified"  # NEVER DO THIS
""",
    
    "eventual_consistency": """
# Accept that read models lag behind write model
# Design UI for eventual consistency

✅ Good:
- Show a "processing" indicator while projecting
- Use WebSocket updates when projection completes
- Implement optimistic UI updates

❌ Bad:
- Blocking UI waiting for all projections
- Synchronous read-after-write on different models
""",
    
    "idempotency": """
# Handle duplicate events gracefully

✅ Good:
def handle_order_created(event: Event):
    if order_exists(event.aggregate_id):
        return  # Idempotent: already handled
    create_order(event.payload)
""",
    
    "snapshotting": """
# Regularly snapshot aggregates to improve performance

✅ Good:
- Snapshot every N events (e.g., 100)
- Store snapshots in a separate store
- Rebuild from snapshot + delta events

❌ Bad:
- Loading all events for every aggregate load
- No performance consideration for large event stores
"""
}
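
The idempotency pattern above checks whether the order already exists. A more general variant records every processed event ID and skips any event seen before, with the marker and the side effect committed in one transaction. This is a sketch under the assumption of a SQLite read database; the `processed_events` and `order_counts` tables are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE order_counts (customer_id TEXT PRIMARY KEY, orders INTEGER NOT NULL)"
)
conn.execute("INSERT INTO order_counts VALUES ('cust-1', 0)")
conn.commit()

def handle_order_created(conn: sqlite3.Connection, event_id: str, customer_id: str) -> bool:
    """Apply the event once; return False if it was a duplicate delivery."""
    try:
        with conn:  # one transaction: the marker and the update commit together
            conn.execute("INSERT INTO processed_events VALUES (?)", (event_id,))
            conn.execute(
                "UPDATE order_counts SET orders = orders + 1 WHERE customer_id = ?",
                (customer_id,),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # event_id already recorded, so the side effect is skipped

applied = handle_order_created(conn, "evt-1", "cust-1")
duplicate = handle_order_created(conn, "evt-1", "cust-1")
count = conn.execute("SELECT orders FROM order_counts").fetchone()[0]
```

Putting both statements in the same transaction matters: if the marker were written separately, a crash between the two writes could either lose the update or apply it twice.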

Bad Patterns to Avoid

BAD_PATTERNS = {
    "query_from_event_store": """
# Don't query current state from event store for reads

โŒ Bad:
def get_order_summary(order_id):
    events = event_store.get_all(order_id)
    order = reconstruct_order(events)
    return format_summary(order)

โœ… Good:
def get_order_summary(order_id):
    return read_db.query(
        "SELECT * FROM order_summaries WHERE id = ?", 
        order_id
    )
""",
    
    "complex_events": """
# Keep events small and focused

โŒ Bad:
class MassiveEvent:
    # 500 fields covering everything
    pass

โœ… Good:
class OrderCreatedEvent: pass
class PaymentReceivedEvent: pass
class OrderShippedEvent: pass
# Small, focused, replayable events
""",
    
    "synchronous_projections": """
# Don't block command completion on all projections

โŒ Bad:
def handle_create_order(cmd):
    append_event(event)
    # Wait for ALL projections to complete
    wait_for_all_projections()
    return order_id

โœ… Good:
def handle_create_order(cmd):
    append_event(event)
    return order_id  # Return immediately
    # Projections run asynchronously
"""
}

Scaling Considerations

Partitioning the Event Store

-- Partition events by time for scaling.
-- PostgreSQL requires the partition key to be part of the primary key.
CREATE TABLE events (
    id UUID NOT NULL,
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    version INT NOT NULL,
    payload JSONB NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE events_2026_01 PARTITION OF events
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');

CREATE TABLE events_2026_02 PARTITION OF events
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
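
Monthly partitions have to exist before events for that month arrive, so teams often generate the DDL rather than writing it by hand. The helper below is a hypothetical sketch that only builds the statement text in the same shape as the examples above; running it against the database is left out.

```python
from datetime import date

def monthly_partition_ddl(year: int, month: int, table: str = "events") -> str:
    """Build the CREATE TABLE statement for one monthly range partition."""
    start = date(year, month, 1)
    # The upper bound is exclusive: the first day of the following month.
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    name = f"{table}_{start:%Y_%m}"
    return (
        f"CREATE TABLE {name} PARTITION OF {table}\n"
        f"    FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )

january = monthly_partition_ddl(2026, 1)
december = monthly_partition_ddl(2026, 12)
```

The December case is the one worth testing, since the upper bound rolls over into the next year.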

Projections Scaling

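from queue import Queue
from threading import Thread

class ProjectionWorker(Thread):
    # Assumed minimal worker: takes events off the shared queue and passes
    # each one to the `project` callable.
    def __init__(self, queue: Queue, project):
        super().__init__(daemon=True)
        self.queue, self.project = queue, project
    
    def run(self):
        while True:
            self.project(self.queue.get())

class ParallelProjectionProcessor:
    def __init__(self, project, num_workers: int = 4):
        # Workers share one queue, so events for the same aggregate may be
        # projected out of order unless they are routed to a fixed worker.
        self.queue = Queue()
        self.workers = [
            ProjectionWorker(self.queue, project) 
            for _ in range(num_workers)
        ]
    
    def start(self):
        for worker in self.workers:
            worker.start()
    
    def process_async(self, event: Event):
        self.queue.put(event)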
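
When several workers project in parallel, events for the same aggregate should usually be handled in order. A common approach is to route each event to a worker chosen by hashing its aggregate ID, so one aggregate always lands on the same worker. The functions below are a hypothetical sketch of that routing step only, without the threads.

```python
import zlib

def worker_for(aggregate_id: str, num_workers: int) -> int:
    """Pick a stable worker index so one aggregate always maps to the same worker."""
    # zlib.crc32 is deterministic across processes, unlike the built-in hash() for str.
    return zlib.crc32(aggregate_id.encode("utf-8")) % num_workers

def partition_events(events: list[dict], num_workers: int) -> list[list[dict]]:
    """Split events into per-worker lists, keeping the original order within each list."""
    buckets: list[list[dict]] = [[] for _ in range(num_workers)]
    for event in events:
        buckets[worker_for(event["aggregate_id"], num_workers)].append(event)
    return buckets

events = [
    {"aggregate_id": "order-1", "version": 1},
    {"aggregate_id": "order-2", "version": 1},
    {"aggregate_id": "order-1", "version": 2},
    {"aggregate_id": "order-1", "version": 3},
]
buckets = partition_events(events, num_workers=4)
```

In a threaded setup, each bucket would correspond to one worker's own queue. The trade-off is that a single very busy aggregate cannot be spread across workers.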

Summary

Event Sourcing and CQRS are powerful patterns for building complex, scalable systems:

  • Event Sourcing stores all state changes as immutable events, providing a complete audit trail and the ability to reconstruct any past state
  • CQRS separates read and write models, so each can be optimized for its own workload
  • Projections transform events into read-optimized views
  • Snapshotting improves load performance for aggregates with many events
  • Versioning and upcasting keep older events readable as your domain evolves

These patterns are particularly valuable for:

  • Systems requiring complete audit history
  • Complex business domains with rich state transitions
  • Systems needing multiple, different read representations
  • Event-driven microservices architectures
