Event Sourcing and CQRS (Command Query Responsibility Segregation) are powerful patterns that change how we think about data persistence and system architecture. Instead of storing the current state, you store a sequence of events that lead to that state.
Understanding Event Sourcing
The Problem with Traditional Persistence
┌─────────────────────────────────────────────────────────────────┐
│ Traditional CRUD Approach │
│ │
│ User ──► POST /orders ──► UPDATE orders SET status='pending' │
│ │ │
│ ▼ │
│ We only store FINAL state │
│ All history is LOST! │
│ │
│ Problems: │
│ ✗ No audit trail │
│ ✗ Can't reconstruct past states │
│ ✗ Can't debug what happened │
│ ✗ Can't implement temporal queries │
└─────────────────────────────────────────────────────────────────┘
Event Sourcing Solution
┌─────────────────────────────────────────────────────────────────┐
│ Event Sourcing Approach │
│ │
│ User ──► POST /orders ──► APPEND OrderCreatedEvent │
│ │ │
│ ▼ │
│ Event Store: │
│ [OrderCreatedEvent] │
│ [OrderItemAddedEvent] │
│ [OrderShippedEvent] │
│ [OrderDeliveredEvent] │
│ │
│ ✓ Complete audit trail │
│ ✓ Can reconstruct any past state │
│ ✓ Full event replay for debugging │
│ ✓ Temporal queries at any point in time │
└─────────────────────────────────────────────────────────────────┘
Event Store Structure
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json
@dataclass
class Event:
    """A single recorded fact about an aggregate, as kept in the event store."""

    event_id: str  # unique id of this event
    aggregate_id: str  # id of the aggregate (e.g. an order) the event belongs to
    event_type: str  # discriminator such as "OrderCreated"
    version: int  # position of the event within its aggregate's stream
    timestamp: datetime  # when the event occurred
    payload: dict  # event-specific data; JSON-encoded when stored
    metadata: dict = field(default_factory=dict)  # auxiliary data, e.g. "schema_version"


class EventStore:
    """Append-only storage of :class:`Event` rows in an ``events`` table.

    ``connection`` must offer a DB-API style ``execute`` that accepts qmark
    (``?``) parameters and returns an iterable cursor (e.g. ``sqlite3``).
    """

    def __init__(self, connection):
        self.connection = connection

    def append_event(self, event: Event) -> None:
        """Insert ``event`` as a new row.

        ``timestamp`` is stored as an ISO-8601 string; ``payload`` and
        ``metadata`` are stored as JSON text.
        """
        # Parameters are listed explicitly, in the same order as the column
        # list of the INSERT, so the two cannot drift apart.
        params = (
            event.event_id,
            event.aggregate_id,
            event.event_type,
            event.version,
            event.timestamp.isoformat(),
            json.dumps(event.payload),
            json.dumps(event.metadata),
        )
        self.connection.execute("""
            INSERT INTO events
            (event_id, aggregate_id, event_type, version,
             timestamp, payload, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, params)

    def get_events_for_aggregate(
        self,
        aggregate_id: str,
        from_version: int = 0
    ) -> list[Event]:
        """Return the events of ``aggregate_id`` with ``version > from_version``.

        Events are returned oldest first (ascending ``version``); an unknown
        aggregate yields an empty list.
        """
        # Columns are selected explicitly (not ``*``) so _row_to_event can
        # decode rows positionally, whether the driver yields plain tuples
        # or sqlite3.Row objects.
        cursor = self.connection.execute("""
            SELECT event_id, aggregate_id, event_type, version,
                   timestamp, payload, metadata
            FROM events
            WHERE aggregate_id = ? AND version > ?
            ORDER BY version ASC
        """, (aggregate_id, from_version))
        return [self._row_to_event(row) for row in cursor]

    @staticmethod
    def _row_to_event(row) -> Event:
        """Rebuild an Event from a row selected by get_events_for_aggregate."""
        (event_id, aggregate_id, event_type, version,
         timestamp, payload, metadata) = row
        # append_event writes text; drivers with native TIMESTAMP/JSON types
        # may already hand back decoded values, which are kept as-is.
        if isinstance(timestamp, str):
            timestamp = datetime.fromisoformat(timestamp)
        if isinstance(payload, (str, bytes, bytearray)):
            payload = json.loads(payload)
        if isinstance(metadata, (str, bytes, bytearray)):
            metadata = json.loads(metadata)
        return Event(
            event_id=event_id,
            aggregate_id=aggregate_id,
            event_type=event_type,
            version=version,
            timestamp=timestamp,
            payload=payload,
            metadata=metadata or {},
        )
Defining Domain Events
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
import uuid
@dataclass
class OrderCreatedEvent:
    """Domain event: a new order was placed.

    ``version`` defaults to 1 because creation starts an order's stream. It
    is declared last so existing positional construction keeps working.
    """

    aggregate_id: str  # id of the new order
    customer_id: str
    items: list[dict]
    total_amount: float
    timestamp: datetime = field(default_factory=datetime.utcnow)  # naive UTC
    event_type: str = "OrderCreated"
    version: int = 1

    def to_event(self) -> Event:
        """Convert to a storable :class:`Event` with a fresh UUID4 ``event_id``."""
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=self.version,
            timestamp=self.timestamp,
            payload={
                "customer_id": self.customer_id,
                "items": self.items,
                "total_amount": self.total_amount,
            },
        )
@dataclass
class OrderItemAddedEvent:
    """Domain event: a product line was added to an existing order."""

    aggregate_id: str  # id of the order the item was added to
    product_id: str
    quantity: int
    price: float  # unit price at the time the item was added
    version: int  # position of this event in the order's stream
    timestamp: datetime = field(default_factory=datetime.utcnow)  # naive UTC
    event_type: str = "OrderItemAdded"

    def to_event(self) -> Event:
        """Convert to a storable :class:`Event` with a fresh UUID4 ``event_id``.

        The payload keys (``product_id``, ``quantity``, ``price``) are the
        ones the aggregate and the projections read back.
        """
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=self.version,
            timestamp=self.timestamp,
            payload={
                "product_id": self.product_id,
                "quantity": self.quantity,
                "price": self.price,
            },
        )
@dataclass
class OrderShippedEvent:
    """Domain event: an order was handed to a carrier."""

    aggregate_id: str  # id of the shipped order
    tracking_number: str
    carrier: str
    version: int  # position of this event in the order's stream
    timestamp: datetime = field(default_factory=datetime.utcnow)  # naive UTC
    event_type: str = "OrderShipped"

    def to_event(self) -> Event:
        """Convert to a storable :class:`Event` with a fresh UUID4 ``event_id``.

        The payload carries ``tracking_number`` and ``carrier``, which the
        read-model projector copies into its summary table.
        """
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=self.version,
            timestamp=self.timestamp,
            payload={
                "tracking_number": self.tracking_number,
                "carrier": self.carrier,
            },
        )
Command Query Responsibility Segregation (CQRS)
The Problem with Monolithic Models
┌─────────────────────────────────────────────────────────────────┐
│ Monolithic Read/Write Model │
│ │
│ ┌──────────────┐ │
│ │ User Model │ │
│ └──────┬───────┘ │
│ │ │
│ ┌────────────┴────────────┐ │
│ ▼ ▼ │
│ ┌────────┐ ┌────────┐ │
│ │ WRITE │ │ READ │ │
│ │ Model │ │ Model │ │
│ └────────┘ └────────┘ │
│ │ │ │
│ └────────────┬────────────┘ │
│ ▼ │
│ Same database table │
│ │
│ Problems with complex reads: │
│ ✗ Joining 10+ tables for dashboard │
│ ✗ Aggregating millions of rows │
│ ✗ Different read/write performance needs │
│ ✗ Can't optimize both equally │
└─────────────────────────────────────────────────────────────────┘
CQRS Architecture
┌─────────────────────────────────────────────────────────────────┐
│ CQRS Architecture │
│ │
│ Commands Queries │
│ (Writes) (Reads) │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │ Command │ │ Query │ │
│ │ Handler │ │ Handler │ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │ Write │ │ Read │ │
│ │ Model │ │ Model │ │
│ │ (Events)│ │ (Views) │ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ └────────┬────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Event Handlers │ │
│ │ (Projections) │ │
│ └────────┬────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Read Database │ │
│ │ (PostgreSQL, │ │
│ │ MongoDB, │ │
│ │ Redis, etc.) │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Command Handler Implementation
from dataclasses import dataclass
from typing import Callable, Any
import uuid
from datetime import datetime
@dataclass
class CreateOrderCommand:
    """Request to open a new order on behalf of a customer."""

    # Customer placing the order.
    customer_id: str
    # Order lines; each dict is expected to carry "price" and "quantity".
    items: list[dict]
@dataclass
class AddItemCommand:
    """Request to add ``quantity`` units of a product to an existing order."""

    # Target order.
    order_id: str
    # Product being added.
    product_id: str
    # Number of units.
    quantity: int
@dataclass
class ShipOrderCommand:
    """Request to mark an order as shipped with the given carrier details."""

    # Order being shipped.
    order_id: str
    # Carrier-issued tracking reference.
    tracking_number: str
    # Name of the shipping carrier.
    carrier: str
class CommandHandler:
    """Turns order commands into domain events appended to the event store.

    Args:
        event_store: store that new events are appended to and existing
            events are read from.
        price_lookup: optional callable mapping a product id to its unit
            price; required by :meth:`handle_add_item` unless a subclass
            overrides ``_get_product_price``.
    """

    def __init__(self, event_store: EventStore,
                 price_lookup: Callable[[str], float] | None = None):
        self.event_store = event_store
        self.price_lookup = price_lookup

    def handle_create_order(self, command: CreateOrderCommand) -> str:
        """Create a new order and return its generated id.

        The order total is the sum of ``price * quantity`` over the command's
        items.

        Raises:
            KeyError: if an item lacks ``"price"`` or ``"quantity"``.
        """
        order_id = str(uuid.uuid4())
        total = sum(item["price"] * item["quantity"] for item in command.items)
        # Creation is always the first event of the stream, and
        # OrderCreatedEvent supplies version 1 itself, so no version is passed.
        event = OrderCreatedEvent(
            aggregate_id=order_id,
            customer_id=command.customer_id,
            items=command.items,
            total_amount=total,
        ).to_event()
        self.event_store.append_event(event)
        return order_id

    def handle_add_item(self, command: AddItemCommand) -> None:
        """Append an OrderItemAdded event to an existing order.

        Raises:
            ValueError: if the order has no events (unknown order id).
            NotImplementedError: if no price source is configured.
        """
        events = self.event_store.get_events_for_aggregate(command.order_id)
        if not events:
            raise ValueError(f"Order {command.order_id} not found")
        # Events come back ordered by ascending version, so the last one
        # holds the current stream version.
        # NOTE(review): two concurrent commands can compute the same next
        # version; nothing here enforces optimistic concurrency.
        next_version = events[-1].version + 1
        event = OrderItemAddedEvent(
            aggregate_id=command.order_id,
            product_id=command.product_id,
            quantity=command.quantity,
            price=self._get_product_price(command.product_id),
            version=next_version,
        ).to_event()
        self.event_store.append_event(event)

    def _get_product_price(self, product_id: str) -> float:
        """Return the unit price of ``product_id`` via ``price_lookup``."""
        if self.price_lookup is None:
            raise NotImplementedError(
                "CommandHandler needs a price_lookup to price added items"
            )
        return self.price_lookup(product_id)
Query Handler and Projections
class OrderProjection:
    """Projects order events into the ``orders`` and ``order_items`` read tables.

    Both handlers tolerate redelivery of an event they already applied, so
    the projection can be replayed without corrupting totals.
    """

    def __init__(self, read_db_connection):
        self.read_db = read_db_connection

    def project_order_created(self, event: Event):
        """Insert the ``orders`` row for a new order; skipped if it already exists."""
        already_projected = self.read_db.execute(
            "SELECT 1 FROM orders WHERE id = ?", (event.aggregate_id,)
        ).fetchone()
        if already_projected is not None:
            return
        payload = event.payload
        self.read_db.execute("""
            INSERT INTO orders (id, customer_id, total_amount,
                                status, created_at, version)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            event.aggregate_id,
            payload["customer_id"],
            payload["total_amount"],
            "created",
            event.timestamp,
            event.version,
        ))

    def project_order_item_added(self, event: Event):
        """Record an added item and bump the order's total and version.

        Raises:
            LookupError: if the order row does not exist yet (its creation
                has not been projected).
        """
        payload = event.payload
        # Guarding on the stored version turns a redelivered event into a
        # no-op instead of adding its amount to the total a second time.
        # Assumes events of one order are projected in version order.
        updated = self.read_db.execute("""
            UPDATE orders
            SET total_amount = total_amount + ?,
                version = ?
            WHERE id = ? AND version < ?
        """, (payload["price"] * payload["quantity"],
              event.version, event.aggregate_id, event.version))
        if updated.rowcount == 0:
            exists = self.read_db.execute(
                "SELECT 1 FROM orders WHERE id = ?", (event.aggregate_id,)
            ).fetchone()
            if exists is not None:
                return  # this event was already applied
            raise LookupError(
                f"Order {event.aggregate_id} has not been projected yet"
            )
        # The event id is unique per event, so using it as the item id makes
        # each item row traceable to the event that produced it.
        self.read_db.execute("""
            INSERT INTO order_items (id, order_id, product_id,
                                     quantity, price)
            VALUES (?, ?, ?, ?, ?)
        """, (event.event_id, event.aggregate_id,
              payload["product_id"], payload["quantity"],
              payload["price"]))
class QueryHandler:
def __init__(self, read_db_connection):
self.read_db = read_db_connection
def get_order_summary(self, order_id: str) -> dict:
cursor = self.read_db.execute("""
SELECT * FROM orders WHERE id = ?
""", (order_id,))
row = cursor.fetchone()
if not row:
return None
return {
"id": row["id"],
"customer_id": row["customer_id"],
"total_amount": row["total_amount"],
"status": row["status"]
}
def get_customer_orders(self, customer_id: str) -> list[dict]:
cursor = self.read_db.execute("""
SELECT * FROM orders
WHERE customer_id = ?
ORDER BY created_at DESC
""", (customer_id,))
return [dict(row) for row in cursor]
Rebuilding State from Events
Aggregate Reconstruction
class OrderAggregate:
    """Rebuilds :class:`Order` instances by replaying their stored events."""

    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def load_from_events(self, order_id: str) -> 'Order':
        """Replay every stored event of ``order_id`` onto a blank Order.

        An id with no events yields a default-constructed ``Order``.
        """
        history = self.event_store.get_events_for_aggregate(order_id)
        rebuilt = Order()
        for past_event in history:
            rebuilt.apply(past_event)
        return rebuilt
@dataclass
class Order:
    """In-memory state of one order, rebuilt by applying its events in order."""

    order_id: str = ""
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: float = 0.0
    status: str = "created"
    version: int = 0  # version of the last applied event

    def apply(self, event: Event) -> None:
        """Fold ``event`` into this order's state.

        Handles ``OrderCreated``, ``OrderItemAdded`` and ``OrderShipped``.
        Any other event type is ignored and leaves ``version`` unchanged.
        """
        payload = event.payload
        if event.event_type == "OrderCreated":
            self.order_id = event.aggregate_id
            self.customer_id = payload["customer_id"]
            # Copy the list and its item dicts. Keeping a reference would let
            # later appends (OrderItemAdded) rewrite the event's own payload.
            self.items = [dict(item) for item in payload["items"]]
            self.total_amount = payload["total_amount"]
            self.status = "created"
        elif event.event_type == "OrderItemAdded":
            self.items.append({
                "product_id": payload["product_id"],
                "quantity": payload["quantity"],
                "price": payload["price"],
            })
            self.total_amount += payload["price"] * payload["quantity"]
        elif event.event_type == "OrderShipped":
            self.status = "shipped"
        else:
            return
        self.version = event.version
Snapshotting for Performance
class SnapshotStore:
def __init__(self, connection):
self.connection = connection
def save_snapshot(self, aggregate_id: str, snapshot: dict, version: int):
self.connection.execute("""
INSERT OR REPLACE INTO snapshots (aggregate_id, snapshot, version)
VALUES (?, ?, ?)
""", (aggregate_id, json.dumps(snapshot), version))
def get_snapshot(self, aggregate_id: str) -> tuple[dict, int] | None:
cursor = self.connection.execute("""
SELECT snapshot, version FROM snapshots
WHERE aggregate_id = ?
ORDER BY version DESC
LIMIT 1
""", (aggregate_id,))
row = cursor.fetchone()
if row:
return json.loads(row["snapshot"]), row["version"]
return None
class SnapshottingOrderAggregate:
    """Loads orders from their latest snapshot plus the events recorded after it.

    A new snapshot is written whenever a load had to replay more than
    ``snapshot_threshold`` events.
    """

    def __init__(self, event_store: EventStore, snapshot_store: SnapshotStore):
        self.event_store = event_store
        self.snapshot_store = snapshot_store

    def load_aggregate(self, order_id: str, snapshot_threshold: int = 100):
        """Return the current :class:`Order` for ``order_id``.

        Args:
            order_id: aggregate id to load.
            snapshot_threshold: replaying more than this many events during
                the load triggers saving a fresh snapshot.

        Returns:
            The rebuilt ``Order``. An id with neither a snapshot nor events
            yields a default ``Order``.
        """
        stored = self.snapshot_store.get_snapshot(order_id)
        if stored is not None:
            state, snapshot_version = stored
            # The stored version is authoritative; snapshots written without a
            # "version" key would otherwise restore Order.version as 0.
            order = Order(**{**state, "version": snapshot_version})
            events = self.event_store.get_events_for_aggregate(
                order_id, from_version=snapshot_version
            )
        else:
            order = Order()
            events = self.event_store.get_events_for_aggregate(order_id)

        for event in events:
            order.apply(event)

        if len(events) > snapshot_threshold:
            self.snapshot_store.save_snapshot(
                order_id,
                {
                    "order_id": order.order_id,
                    "customer_id": order.customer_id,
                    "items": order.items,
                    "total_amount": order.total_amount,
                    "status": order.status,
                    "version": order.version,
                },
                order.version,
            )
        return order
Event Handler Patterns
Projecting to Multiple Views
class EventRouter:
    """Dispatches events to the handlers registered for their ``event_type``."""

    def __init__(self):
        self.handlers: dict[str, list[Callable]] = {}

    def register(self, event_type: str, handler: Callable):
        """Add ``handler`` for ``event_type``; handlers run in registration order."""
        self.handlers.setdefault(event_type, []).append(handler)

    def route(self, event: Event):
        """Invoke every handler registered for ``event.event_type``.

        A callable handler is called as ``handler(event)``. For backward
        compatibility, a non-callable object is invoked through its
        ``handle`` method. A failing handler is logged with its traceback and
        does not stop the remaining handlers.
        """
        import logging

        for handler in self.handlers.get(event.event_type, []):
            try:
                invoke = handler if callable(handler) else handler.handle
                invoke(event)
            except Exception as exc:
                logging.getLogger(__name__).exception(
                    "Handler failed for %s: %s", event.event_type, exc
                )
class EventProcessor:
    """Pulls events from the store, routes each one, and records a checkpoint.

    NOTE(review): ``CheckpointStore`` is not defined in this snippet, and
    the ``EventStore`` shown here has no ``get_events_after`` method —
    confirm where both come from.
    """

    def __init__(self, event_store: EventStore, router: EventRouter):
        self.event_store = event_store
        self.router = router
        self.checkpoint_store = CheckpointStore()

    def process_events(self, last_checkpoint: int = 0):
        """Route every event after ``last_checkpoint``, saving each event's id once routed.

        NOTE(review): the value saved is ``event.event_id`` (a UUID string
        elsewhere in this file), while ``last_checkpoint`` is typed ``int``.
        Confirm which position type ``get_events_after`` expects before
        resuming from a saved checkpoint. Because ``EventRouter.route`` catches
        handler exceptions, the checkpoint also advances past failed events.
        """
        for next_event in self.event_store.get_events_after(last_checkpoint):
            self.router.route(next_event)
            self.checkpoint_store.save(next_event.event_id)
Read Model Projections
class OrderReadModelProjector:
    """Keeps the denormalised ``order_summaries`` table in sync with order events."""

    def __init__(self, read_db):
        self.read_db = read_db

    def handle_order_created(self, event: Event):
        """Insert the summary row for a new order; no-op if it already exists.

        The existence check makes redelivered or replayed events harmless.
        """
        already = self.read_db.execute(
            "SELECT 1 FROM order_summaries WHERE order_id = ?",
            (event.aggregate_id,),
        ).fetchone()
        if already is not None:
            return
        self.read_db.execute("""
            INSERT INTO order_summaries
            (order_id, customer_id, total, status, created_at)
            VALUES (?, ?, ?, ?, ?)
        """, (event.aggregate_id, event.payload["customer_id"],
              event.payload["total_amount"], "created", event.timestamp))

    def handle_order_shipped(self, event: Event):
        """Mark the order shipped and record carrier, tracking number and time.

        Re-applying the same event writes the same values again.
        """
        self.read_db.execute("""
            UPDATE order_summaries
            SET status = 'shipped',
                tracking_number = ?,
                carrier = ?,
                shipped_at = ?
            WHERE order_id = ?
        """, (event.payload["tracking_number"], event.payload["carrier"],
              event.timestamp, event.aggregate_id))
class CustomerActivityProjector:
    """Maintains per-customer order counters and order history from order events."""

    def __init__(self, read_db):
        self.read_db = read_db

    def handle_order_created(self, event: Event):
        """Count a new order towards its customer's activity and history.

        Idempotent: an order already present in ``customer_order_history``
        for this customer is skipped, so redelivered events are not counted
        twice. A customer without a ``customer_activity`` row gets one,
        with this order as the first.
        """
        customer_id = event.payload["customer_id"]
        amount = event.payload["total_amount"]
        seen = self.read_db.execute("""
            SELECT 1 FROM customer_order_history
            WHERE customer_id = ? AND order_id = ?
        """, (customer_id, event.aggregate_id)).fetchone()
        if seen is not None:
            return
        updated = self.read_db.execute("""
            UPDATE customer_activity
            SET total_orders = total_orders + 1,
                total_spent = total_spent + ?
            WHERE customer_id = ?
        """, (amount, customer_id))
        if updated.rowcount == 0:
            # The UPDATE silently matches nothing for a first-time customer,
            # so create their row instead of dropping the order.
            # NOTE(review): assumes any other customer_activity columns are
            # nullable or have defaults.
            self.read_db.execute("""
                INSERT INTO customer_activity
                (customer_id, total_orders, total_spent)
                VALUES (?, 1, ?)
            """, (customer_id, amount))
        self.read_db.execute("""
            INSERT INTO customer_order_history
            (customer_id, order_id, timestamp)
            VALUES (?, ?, ?)
        """, (customer_id, event.aggregate_id, event.timestamp))
Versioning and Migrations
Event Schema Versioning
@dataclass
class EventWithMigration:
    """Pairs a stored event with its payload after schema migration."""

    # The event as it was read from the store.
    original_event: Event
    # Payload upgraded to the current schema (may be the original dict if
    # no migration applied).
    migrated_payload: dict
def migrate_if_needed(event: Event) -> EventWithMigration:
    """Wrap ``event`` together with a payload upgraded to the v2 schema.

    A missing ``schema_version`` in the metadata counts as version 1. Only
    v1 payloads are transformed; any other version passes the original
    payload dict through unchanged.
    """
    is_v1 = event.metadata.get("schema_version", 1) == 1
    payload = migrate_v1_to_v2(event.payload) if is_v1 else event.payload
    return EventWithMigration(original_event=event, migrated_payload=payload)
def migrate_v1_to_v2(payload: dict) -> dict:
    """Return a v2 copy of a v1 event payload; the input dict is not modified.

    * ``price`` is mirrored into ``amount`` unless ``amount`` already exists.
    * An ``items`` list is replaced by ``line_items`` entries holding
      ``product_id``, ``quantity`` and ``unit_amount``. ``unit_amount`` is
      taken from ``price``, else ``amount``, else 0.
    """
    def unit_amount(line: dict):
        if "price" in line:
            return line["price"]
        return line.get("amount", 0)

    upgraded = dict(payload)
    if "amount" not in upgraded and "price" in upgraded:
        upgraded["amount"] = upgraded["price"]
    if "items" in upgraded:
        old_lines = upgraded.pop("items")
        upgraded["line_items"] = [
            {
                "product_id": line["product_id"],
                "quantity": line["quantity"],
                "unit_amount": unit_amount(line),
            }
            for line in old_lines
        ]
    return upgraded
Upcasters
class Upcaster:
    """Base class for steps that rewrite older event formats into the current one.

    Subclasses override :meth:`upcast`; the base implementation always raises.
    """

    def upcast(self, event: Event) -> Event:
        """Return ``event`` converted to the current format (subclass hook)."""
        raise NotImplementedError
class OrderCreatedUpcaster(Upcaster):
    """Fills in ``customer_email`` and ``currency`` on older OrderCreated events.

    The incoming event and its payload dict are never modified. When a field
    is missing, a copy of the event with an extended payload is returned.
    Other event types, and OrderCreated events that already carry both
    fields, are returned as-is.
    """

    def upcast(self, event: Event) -> Event:
        from dataclasses import replace

        if event.event_type != "OrderCreated":
            return event
        payload = event.payload
        if "customer_email" in payload and "currency" in payload:
            return event
        upgraded = dict(payload)
        # NOTE(review): this default address looks like a mangled placeholder
        # (e.g. from e-mail obfuscation); confirm the intended value.
        upgraded.setdefault("customer_email", "[email protected]")
        upgraded.setdefault("currency", "USD")
        return replace(event, payload=upgraded)
class UpcasterChain:
    """Runs an event through a sequence of upcasters, in the order they were added."""

    def __init__(self):
        self.upcasters: list[Upcaster] = []

    def add_upcaster(self, upcaster: Upcaster):
        """Append ``upcaster`` to the end of the chain."""
        self.upcasters += [upcaster]

    def process(self, event: Event) -> Event:
        """Feed each upcaster's output into the next and return the final event."""
        current = event
        for step in self.upcasters:
            current = step.upcast(current)
        return current
Best Practices
Good Patterns
# Reference snippets describing recommended practices, keyed by topic.
# The values are plain text for display; none of the code inside them runs.
GOOD_PATTERNS = dict(
    immutable_events="""
# Events should be immutable once created
# Never modify events after appending
✅ Good:
event = OrderCreatedEvent(order_id="123", status="created")
# Store as-is, never change
❌ Bad:
event.payload["status"] = "modified" # NEVER DO THIS
""",
    eventual_consistency="""
# Accept that read models lag behind write model
# Design UI for eventual consistency
✅ Good:
- Show "processing" indicator while projecting
- Use WebSocket updates when projection completes
- Implement optimistic UI updates
❌ Bad:
- Blocking UI waiting for all projections
- Synchronous read-after-write on different models
""",
    idempotency="""
# Handle duplicate events gracefully
✅ Good:
def handle_order_created(event: Event):
if order_exists(event.aggregate_id):
return # Idempotent: already handled
create_order(event.payload)
""",
    snapshotting="""
# Regularly snapshot aggregates to improve performance
✅ Good:
- Snapshot every N events (e.g., 100)
- Store snapshots in separate store
- Rebuild from snapshot + delta events
❌ Bad:
- Loading all events for every aggregate load
- No performance consideration for large event stores
""",
)
Bad Patterns to Avoid
# Reference snippets describing anti-patterns to avoid, keyed by topic.
# The values are plain text for display; none of the code inside them runs.
BAD_PATTERNS = dict(
    query_from_event_store="""
# Don't query current state from event store for reads
❌ Bad:
def get_order_summary(order_id):
events = event_store.get_all(order_id)
order = reconstruct_order(events)
return format_summary(order)
✅ Good:
def get_order_summary(order_id):
return read_db.query(
"SELECT * FROM order_summaries WHERE id = ?",
order_id
)
""",
    complex_events="""
# Keep events small and focused
❌ Bad:
class MassiveEvent:
# 500 fields covering everything
pass
✅ Good:
class OrderCreatedEvent: pass
class PaymentReceivedEvent: pass
class OrderShippedEvent: pass
# Small, focused, replayable events
""",
    synchronous_projections="""
# Don't block command completion on all projections
❌ Bad:
def handle_create_order(cmd):
append_event(event)
# Wait for ALL projections to complete
wait_for_all_projections()
return order_id
✅ Good:
def handle_create_order(cmd):
append_event(event)
return order_id # Return immediately
# Projections run asynchronously
""",
)
Scaling Considerations
Partitioning the Event Store
-- Partition events by time for scaling.
-- PostgreSQL requires every primary key / unique constraint on a partitioned
-- table to include the partition key, so the key is (event_id, timestamp).
-- Column names match the EventStore INSERT (event_id ... metadata).
-- NOTE(review): uniqueness of (aggregate_id, version) cannot be enforced
-- across partitions by a constraint; enforce it in the write path if needed.
CREATE TABLE events (
    event_id UUID NOT NULL,
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    version INT NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
    timestamp TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (event_id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Supports per-aggregate reads (WHERE aggregate_id = ? ORDER BY version).
CREATE INDEX events_aggregate_version_idx ON events (aggregate_id, version);

-- Create monthly partitions (the upper bound is exclusive)
CREATE TABLE events_2026_01 PARTITION OF events
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE events_2026_02 PARTITION OF events
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
Projections Scaling
class ParallelProjectionProcessor:
    """Distributes events to a pool of projection workers through one shared queue.

    NOTE(review): ``Queue`` and ``ProjectionWorker`` are not defined in this
    snippet. ``Queue`` is presumably ``queue.Queue`` (thread workers) or
    ``multiprocessing.Queue`` (process workers) — confirm.
    NOTE(review): with a single shared queue, events of the same aggregate
    can be taken by different workers and applied out of order; order-
    sensitive projections may need per-aggregate routing.
    """

    def __init__(self, num_workers: int = 4):
        # A pool without workers would accept events forever and never
        # process them, so reject it up front.
        if num_workers < 1:
            raise ValueError(f"num_workers must be at least 1, got {num_workers}")
        self.queue = Queue()
        self.workers = [
            ProjectionWorker(self.queue)
            for _ in range(num_workers)
        ]

    def start(self):
        """Call ``start()`` on every worker."""
        for worker in self.workers:
            worker.start()

    def process_async(self, event: Event):
        """Put ``event`` on the shared queue; does not wait for it to be processed."""
        self.queue.put(event)
Related Articles
Summary
Event Sourcing and CQRS are powerful patterns for building complex, scalable systems:
- Event Sourcing stores all state changes as immutable events, providing a complete audit trail and the ability to reconstruct any past state
- CQRS separates read and write models, allowing each to be optimized for its specific use case
- Projections transform events into read-optimized views
- Snapshotting improves performance for aggregates with many events
- Versioning ensures backward compatibility as your domain evolves
These patterns are particularly valuable for:
- Systems requiring complete audit history
- Complex business domains with rich state transitions
- Systems needing multiple, different read representations
- Event-driven microservices architectures
Comments