Event Sourcing and CQRS (Command Query Responsibility Segregation) are powerful patterns that change how we think about data persistence and system architecture. Instead of storing only the current state, you store the sequence of events that led to it.
Understanding Event Sourcing
The Problem with Traditional Persistence
+------------------------------------------------------------------
| Traditional CRUD Approach
|
|   User --> POST /orders --> UPDATE orders SET status='pending'
|                                         |
|                                         v
|                             We only store the FINAL state
|                             All history is LOST!
|
| Problems:
|   ✗ No audit trail
|   ✗ Can't reconstruct past states -> can't debug what happened
|   ✗ Can't implement temporal queries
+------------------------------------------------------------------
Event Sourcing Solution
+------------------------------------------------------------------
| Event Sourcing Approach
|
|   User --> POST /orders --> APPEND OrderCreatedEvent
|                                      |
|                                      v
|                                Event Store:
|                                [OrderCreatedEvent]
|                                [OrderItemAddedEvent]
|                                [OrderShippedEvent]
|                                [OrderDeliveredEvent]
|
|   ✓ Complete audit trail
|   ✓ Can reconstruct any past state
|   ✓ Full event replay for debugging
|   ✓ Temporal queries at any point in time
+------------------------------------------------------------------
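Before any infrastructure, the core mechanic is worth seeing in isolation: current state is just a left fold over the event history. A minimal, self-contained sketch (the dict-based event shapes here are illustrative, not the classes defined later in this article):

```python
# Current state as a pure fold (reduce) over the event history.
from functools import reduce

def apply_event(state: dict, event: dict) -> dict:
    """Return a new state dict with one event applied; events stay immutable."""
    if event["type"] == "OrderCreated":
        return {"status": "created", "items": list(event["items"])}
    if event["type"] == "OrderItemAdded":
        return {**state, "items": state["items"] + [event["item"]]}
    if event["type"] == "OrderShipped":
        return {**state, "status": "shipped"}
    return state  # unknown events are ignored

history = [
    {"type": "OrderCreated", "items": []},
    {"type": "OrderItemAdded", "item": "book"},
    {"type": "OrderShipped"},
]

state = reduce(apply_event, history, {})
# state == {"status": "shipped", "items": ["book"]}
```

Because `apply_event` is pure, replaying a prefix of `history` reconstructs any past state, which is exactly the property the diagram above advertises.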
Event Store Structure
from dataclasses import dataclass, field
from datetime import datetime
import json


@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    version: int
    timestamp: datetime
    payload: dict
    metadata: dict = field(default_factory=dict)


class EventStore:
    def __init__(self, connection):
        self.connection = connection

    def append_event(self, event: Event) -> None:
        self.connection.execute("""
            INSERT INTO events
                (event_id, aggregate_id, event_type, version,
                 timestamp, payload, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            event.event_id,
            event.aggregate_id,
            event.event_type,
            event.version,
            event.timestamp.isoformat(),
            json.dumps(event.payload),
            json.dumps(event.metadata),
        ))

    def get_events_for_aggregate(
        self,
        aggregate_id: str,
        from_version: int = 0,
    ) -> list[Event]:
        cursor = self.connection.execute("""
            SELECT * FROM events
            WHERE aggregate_id = ? AND version > ?
            ORDER BY version ASC
        """, (aggregate_id, from_version))
        return [self._row_to_event(row) for row in cursor]

    def _row_to_event(self, row) -> Event:
        # Assumes rows are mapping-like (e.g. sqlite3.Row).
        return Event(
            event_id=row["event_id"],
            aggregate_id=row["aggregate_id"],
            event_type=row["event_type"],
            version=row["version"],
            timestamp=datetime.fromisoformat(row["timestamp"]),
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
        )
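To make the store concrete, here is a hedged sketch of the kind of table it assumes, using an in-memory SQLite database. The schema, and in particular the `UNIQUE (aggregate_id, version)` constraint, are assumptions rather than anything the article prescribes:

```python
# Standalone sketch: the events table behind an EventStore, in SQLite.
import sqlite3
import json

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # mapping-style row access
conn.execute("""
    CREATE TABLE events (
        event_id     TEXT PRIMARY KEY,
        aggregate_id TEXT NOT NULL,
        event_type   TEXT NOT NULL,
        version      INTEGER NOT NULL,
        timestamp    TEXT NOT NULL,
        payload      TEXT NOT NULL,
        metadata     TEXT NOT NULL,
        UNIQUE (aggregate_id, version)  -- one event per aggregate version
    )
""")

# Append one event, then read the stream back in version order.
conn.execute(
    "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?)",
    ("e-1", "order-123", "OrderCreated", 1,
     "2026-01-01T00:00:00", json.dumps({"total": 42.0}), json.dumps({})),
)
rows = conn.execute(
    "SELECT * FROM events WHERE aggregate_id = ? AND version > ? ORDER BY version",
    ("order-123", 0),
).fetchall()
payload = json.loads(rows[0]["payload"])
# payload == {"total": 42.0}
```

The `row_factory = sqlite3.Row` line is what makes `row["payload"]`-style access (as used by `_row_to_event` above) work.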
Defining Domain Events
from dataclasses import dataclass, field
from datetime import datetime
import uuid


@dataclass
class OrderCreatedEvent:
    aggregate_id: str
    customer_id: str
    items: list[dict]
    total_amount: float
    version: int = 1
    timestamp: datetime = field(default_factory=datetime.utcnow)
    event_type: str = "OrderCreated"

    def to_event(self) -> Event:
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=self.version,
            timestamp=self.timestamp,
            payload={
                "customer_id": self.customer_id,
                "items": self.items,
                "total_amount": self.total_amount,
            },
        )


@dataclass
class OrderItemAddedEvent:
    aggregate_id: str
    product_id: str
    quantity: int
    price: float
    version: int
    timestamp: datetime = field(default_factory=datetime.utcnow)
    event_type: str = "OrderItemAdded"

    def to_event(self) -> Event:
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=self.version,
            timestamp=self.timestamp,
            payload={
                "product_id": self.product_id,
                "quantity": self.quantity,
                "price": self.price,
            },
        )


@dataclass
class OrderShippedEvent:
    aggregate_id: str
    tracking_number: str
    carrier: str
    version: int
    timestamp: datetime = field(default_factory=datetime.utcnow)
    event_type: str = "OrderShipped"

    def to_event(self) -> Event:
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.aggregate_id,
            event_type=self.event_type,
            version=self.version,
            timestamp=self.timestamp,
            payload={
                "tracking_number": self.tracking_number,
                "carrier": self.carrier,
            },
        )
Command Query Responsibility Segregation (CQRS)
The Problem with Monolithic Models
+------------------------------------------------------------------
| Monolithic Read/Write Model
|
|                  +------------+
|                  | User Model |
|                  +-----+------+
|                        |
|            +-----------+-----------+
|            v                       v
|       +---------+             +---------+
|       |  WRITE  |             |  READ   |
|       |  Model  |             |  Model  |
|       +----+----+             +----+----+
|            |                       |
|            +-----------+-----------+
|                        v
|               Same database table
|
| Problems with complex reads:
|   ✗ Joining 10+ tables for a dashboard
|   ✗ Aggregating millions of rows
|   ✗ Different read/write performance needs
|   ✗ Can't optimize both equally
+------------------------------------------------------------------
CQRS Architecture
+------------------------------------------------------------------
| CQRS Architecture
|
|     Commands                        Queries
|     (Writes)                        (Reads)
|         |                              |
|         v                              v
|   +-----------+                  +-----------+
|   |  Command  |                  |   Query   |
|   |  Handler  |                  |  Handler  |
|   +-----+-----+                  +-----+-----+
|         |                              |
|         v                              v
|   +-----------+                  +-----------+
|   |   Write   |                  |   Read    |
|   |   Model   |                  |   Model   |
|   |  (Events) |                  |  (Views)  |
|   +-----+-----+                  +-----+-----+
|         |                              ^
|         v                              |
|   +-----------------+                  |
|   |  Event Handlers |                  |
|   |  (Projections)  |                  |
|   +--------+--------+                  |
|            v                           |
|   +-----------------+                  |
|   |  Read Database  |------------------+
|   |  (PostgreSQL,   |
|   |   MongoDB,      |
|   |   Redis, etc.)  |
|   +-----------------+
+------------------------------------------------------------------
Command Handler Implementation
from dataclasses import dataclass
from typing import Callable
import uuid


@dataclass
class CreateOrderCommand:
    customer_id: str
    items: list[dict]


@dataclass
class AddItemCommand:
    order_id: str
    product_id: str
    quantity: int


@dataclass
class ShipOrderCommand:
    order_id: str
    tracking_number: str
    carrier: str


class CommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def handle_create_order(self, command: CreateOrderCommand) -> str:
        order_id = str(uuid.uuid4())
        total = sum(item["price"] * item["quantity"] for item in command.items)
        event = OrderCreatedEvent(
            aggregate_id=order_id,
            customer_id=command.customer_id,
            items=command.items,
            total_amount=total,
            version=1,
        ).to_event()
        self.event_store.append_event(event)
        return order_id

    def handle_add_item(self, command: AddItemCommand) -> None:
        events = self.event_store.get_events_for_aggregate(command.order_id)
        if not events:
            raise ValueError(f"Order {command.order_id} not found")
        current_version = events[-1].version
        # Price lookup elided; assumes a product catalog helper.
        product_price = self._get_product_price(command.product_id)
        event = OrderItemAddedEvent(
            aggregate_id=command.order_id,
            product_id=command.product_id,
            quantity=command.quantity,
            price=product_price,
            version=current_version + 1,
        ).to_event()
        self.event_store.append_event(event)
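One detail the handler glosses over is concurrent writers: two commands could both read version N and try to append version N+1. A common remedy is optimistic concurrency via a uniqueness constraint on `(aggregate_id, version)`, so the stale append fails instead of silently overwriting. A self-contained SQLite sketch (the schema is an assumption):

```python
# Optimistic concurrency: the database rejects a duplicate aggregate version.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        aggregate_id TEXT NOT NULL,
        version      INTEGER NOT NULL,
        event_type   TEXT NOT NULL,
        UNIQUE (aggregate_id, version)
    )
""")

def append(aggregate_id: str, version: int, event_type: str) -> bool:
    """Return True on success, False when another writer won the race."""
    try:
        conn.execute("INSERT INTO events VALUES (?, ?, ?)",
                     (aggregate_id, version, event_type))
        return True
    except sqlite3.IntegrityError:
        return False  # version already taken: caller should reload and retry

first = append("order-1", 1, "OrderCreated")       # succeeds
duplicate = append("order-1", 1, "OrderCreated")   # rejected: version 1 taken
next_ok = append("order-1", 2, "OrderItemAdded")   # succeeds
```

A caller that gets `False` typically reloads the aggregate (picking up the competing event) and retries the command against the new version.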
Query Handler and Projections
import uuid


class OrderProjection:
    def __init__(self, read_db_connection):
        self.read_db = read_db_connection

    def project_order_created(self, event: Event):
        payload = event.payload
        self.read_db.execute("""
            INSERT INTO orders (id, customer_id, total_amount,
                                status, created_at, version)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            event.aggregate_id,
            payload["customer_id"],
            payload["total_amount"],
            "created",
            event.timestamp,
            event.version,
        ))

    def project_order_item_added(self, event: Event):
        payload = event.payload
        self.read_db.execute("""
            INSERT INTO order_items (id, order_id, product_id,
                                     quantity, price)
            VALUES (?, ?, ?, ?, ?)
        """, (str(uuid.uuid4()), event.aggregate_id,
              payload["product_id"], payload["quantity"],
              payload["price"]))
        self.read_db.execute("""
            UPDATE orders
            SET total_amount = total_amount + ?,
                version = ?
            WHERE id = ?
        """, (payload["price"] * payload["quantity"],
              event.version, event.aggregate_id))


class QueryHandler:
    def __init__(self, read_db_connection):
        self.read_db = read_db_connection

    def get_order_summary(self, order_id: str) -> dict | None:
        cursor = self.read_db.execute(
            "SELECT * FROM orders WHERE id = ?", (order_id,)
        )
        row = cursor.fetchone()
        if not row:
            return None
        return {
            "id": row["id"],
            "customer_id": row["customer_id"],
            "total_amount": row["total_amount"],
            "status": row["status"],
        }

    def get_customer_orders(self, customer_id: str) -> list[dict]:
        cursor = self.read_db.execute("""
            SELECT * FROM orders
            WHERE customer_id = ?
            ORDER BY created_at DESC
        """, (customer_id,))
        return [dict(row) for row in cursor]
Rebuilding State from Events
Aggregate Reconstruction
class OrderAggregate:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def load_from_events(self, order_id: str) -> 'Order':
        events = self.event_store.get_events_for_aggregate(order_id)
        order = Order()
        for event in events:
            order.apply(event)
        return order


@dataclass
class Order:
    order_id: str = ""
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: float = 0.0
    status: str = "created"
    version: int = 0

    def apply(self, event: Event):
        if event.event_type == "OrderCreated":
            self.order_id = event.aggregate_id
            self.customer_id = event.payload["customer_id"]
            self.items = event.payload["items"]
            self.total_amount = event.payload["total_amount"]
            self.status = "created"
            self.version = event.version
        elif event.event_type == "OrderItemAdded":
            self.items.append({
                "product_id": event.payload["product_id"],
                "quantity": event.payload["quantity"],
                "price": event.payload["price"],
            })
            self.total_amount += (event.payload["price"] *
                                  event.payload["quantity"])
            self.version = event.version
        elif event.event_type == "OrderShipped":
            self.status = "shipped"
            self.version = event.version
Snapshotting for Performance
class SnapshotStore:
    def __init__(self, connection):
        self.connection = connection

    def save_snapshot(self, aggregate_id: str, snapshot: dict, version: int):
        self.connection.execute("""
            INSERT OR REPLACE INTO snapshots (aggregate_id, snapshot, version)
            VALUES (?, ?, ?)
        """, (aggregate_id, json.dumps(snapshot), version))

    def get_snapshot(self, aggregate_id: str) -> tuple[dict, int] | None:
        cursor = self.connection.execute("""
            SELECT snapshot, version FROM snapshots
            WHERE aggregate_id = ?
            ORDER BY version DESC
            LIMIT 1
        """, (aggregate_id,))
        row = cursor.fetchone()
        if row:
            return json.loads(row["snapshot"]), row["version"]
        return None


class SnapshottingOrderAggregate:
    def __init__(self, event_store: EventStore, snapshot_store: SnapshotStore):
        self.event_store = event_store
        self.snapshot_store = snapshot_store

    def load_aggregate(self, order_id: str, snapshot_threshold: int = 100):
        result = self.snapshot_store.get_snapshot(order_id)
        if result:  # guard: get_snapshot may return None
            snapshot, version = result
            order = Order(**snapshot, version=version)
            events = self.event_store.get_events_for_aggregate(
                order_id, from_version=version
            )
        else:
            order = Order()
            events = self.event_store.get_events_for_aggregate(order_id)
        for event in events:
            order.apply(event)
        if len(events) > snapshot_threshold:
            self.snapshot_store.save_snapshot(
                order_id,
                {
                    "order_id": order.order_id,
                    "customer_id": order.customer_id,
                    "items": order.items,
                    "total_amount": order.total_amount,
                    "status": order.status,
                },
                order.version,
            )
        return order
Event Handler Patterns
Projecting to Multiple Views
import logging
from typing import Callable

logger = logging.getLogger(__name__)


class EventRouter:
    def __init__(self):
        self.handlers: dict[str, list[Callable]] = {}

    def register(self, event_type: str, handler: Callable):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    def route(self, event: Event):
        for handler in self.handlers.get(event.event_type, []):
            try:
                handler(event)
            except Exception as e:
                logger.error(f"Handler failed for {event.event_type}: {e}")


class EventProcessor:
    def __init__(self, event_store: EventStore, router: EventRouter):
        self.event_store = event_store
        self.router = router
        self.checkpoint_store = CheckpointStore()

    def process_events(self, last_checkpoint: int = 0):
        # Assumes the store exposes a global, ordered feed of events
        # (get_events_after is not shown above).
        for event in self.event_store.get_events_after(last_checkpoint):
            self.router.route(event)
            self.checkpoint_store.save(event.event_id)
Read Model Projections
class OrderReadModelProjector:
    def __init__(self, read_db):
        self.read_db = read_db

    def handle_order_created(self, event: Event):
        self.read_db.execute("""
            INSERT INTO order_summaries
                (order_id, customer_id, total, status, created_at)
            VALUES (?, ?, ?, ?, ?)
        """, (event.aggregate_id, event.payload["customer_id"],
              event.payload["total_amount"], "created", event.timestamp))

    def handle_order_shipped(self, event: Event):
        self.read_db.execute("""
            UPDATE order_summaries
            SET status = 'shipped',
                tracking_number = ?,
                carrier = ?,
                shipped_at = ?
            WHERE order_id = ?
        """, (event.payload["tracking_number"], event.payload["carrier"],
              event.timestamp, event.aggregate_id))


class CustomerActivityProjector:
    def __init__(self, read_db):
        self.read_db = read_db

    def handle_order_created(self, event: Event):
        customer_id = event.payload["customer_id"]
        self.read_db.execute("""
            UPDATE customer_activity
            SET total_orders = total_orders + 1,
                total_spent = total_spent + ?
            WHERE customer_id = ?
        """, (event.payload["total_amount"], customer_id))
        self.read_db.execute("""
            INSERT INTO customer_order_history
                (customer_id, order_id, timestamp)
            VALUES (?, ?, ?)
        """, (customer_id, event.aggregate_id, event.timestamp))
Versioning and Migrations
Event Schema Versioning
@dataclass
class EventWithMigration:
    original_event: Event
    migrated_payload: dict


def migrate_if_needed(event: Event) -> EventWithMigration:
    schema_version = event.metadata.get("schema_version", 1)
    if schema_version == 1:
        migrated = migrate_v1_to_v2(event.payload)
        return EventWithMigration(
            original_event=event,
            migrated_payload=migrated,
        )
    return EventWithMigration(
        original_event=event,
        migrated_payload=event.payload,
    )


def migrate_v1_to_v2(payload: dict) -> dict:
    migrated = payload.copy()
    if "price" in migrated and "amount" not in migrated:
        migrated["amount"] = migrated["price"]
    if "items" in migrated:
        migrated["line_items"] = [
            {"product_id": item["product_id"],
             "quantity": item["quantity"],
             "unit_amount": item.get("price", item.get("amount", 0))}
            for item in migrated["items"]
        ]
        del migrated["items"]
    return migrated
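As a sanity check, running the v1-to-v2 migration on a small payload (the function is repeated here so the snippet runs standalone):

```python
# The v1 -> v2 migration: rename "price" to "amount" and "items" to
# "line_items" with per-item "unit_amount".
def migrate_v1_to_v2(payload: dict) -> dict:
    migrated = payload.copy()
    if "price" in migrated and "amount" not in migrated:
        migrated["amount"] = migrated["price"]
    if "items" in migrated:
        migrated["line_items"] = [
            {"product_id": item["product_id"],
             "quantity": item["quantity"],
             "unit_amount": item.get("price", item.get("amount", 0))}
            for item in migrated["items"]
        ]
        del migrated["items"]
    return migrated

v1 = {"price": 9.99,
      "items": [{"product_id": "p1", "quantity": 2, "price": 5.0}]}
v2 = migrate_v1_to_v2(v1)
# v2["amount"] == 9.99, and "items" has become "line_items"
```

Note the migration never mutates stored events; it produces a migrated payload at read time, which is why `EventWithMigration` keeps the original event alongside it.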
Upcasters
from dataclasses import replace


class Upcaster:
    def upcast(self, event: Event) -> Event:
        raise NotImplementedError


class OrderCreatedUpcaster(Upcaster):
    def upcast(self, event: Event) -> Event:
        if event.event_type != "OrderCreated":
            return event
        payload = dict(event.payload)  # copy: never mutate a stored event
        payload.setdefault("customer_email", "unknown@example.com")
        payload.setdefault("currency", "USD")
        return replace(event, payload=payload)


class UpcasterChain:
    def __init__(self):
        self.upcasters: list[Upcaster] = []

    def add_upcaster(self, upcaster: Upcaster):
        self.upcasters.append(upcaster)

    def process(self, event: Event) -> Event:
        for upcaster in self.upcasters:
            event = upcaster.upcast(event)
        return event
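A lighter functional variant of the same chain, assuming upcasters can be plain payload-to-payload functions rather than classes (the defaults filled in here are illustrative):

```python
# Compose upcasters as pure functions over the payload.
from typing import Callable

Upcast = Callable[[dict], dict]

def chain(*upcasters: Upcast) -> Upcast:
    """Return a single upcaster that applies each one in order."""
    def run(payload: dict) -> dict:
        for up in upcasters:
            payload = up(payload)
        return payload
    return run

def add_currency(payload: dict) -> dict:
    return {**payload, "currency": payload.get("currency", "USD")}

def add_email(payload: dict) -> dict:
    return {**payload,
            "customer_email": payload.get("customer_email",
                                          "unknown@example.com")}

upcast = chain(add_currency, add_email)
result = upcast({"customer_id": "c1"})
# result gains defaulted "currency" and "customer_email" fields
```

Each step returns a new dict rather than mutating its input, matching the immutability rule in the best practices below.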
Best Practices
Good Patterns
GOOD_PATTERNS = {
    "immutable_events": """
    # Events should be immutable once created
    # Never modify events after appending
    ✅ Good:
    event = OrderCreatedEvent(order_id="123", status="created")
    # Store as-is, never change
    ❌ Bad:
    event.payload["status"] = "modified"  # NEVER DO THIS
    """,
    "eventual_consistency": """
    # Accept that read models lag behind the write model
    # Design the UI for eventual consistency
    ✅ Good:
    - Show a "processing" indicator while projecting
    - Use WebSocket updates when a projection completes
    - Implement optimistic UI updates
    ❌ Bad:
    - Blocking the UI waiting for all projections
    - Synchronous read-after-write on different models
    """,
    "idempotency": """
    # Handle duplicate events gracefully
    ✅ Good:
    def handle_order_created(event: Event):
        if order_exists(event.aggregate_id):
            return  # Idempotent: already handled
        create_order(event.payload)
    """,
    "snapshotting": """
    # Regularly snapshot aggregates to improve performance
    ✅ Good:
    - Snapshot every N events (e.g., 100)
    - Store snapshots in a separate store
    - Rebuild from snapshot + delta events
    ❌ Bad:
    - Loading all events for every aggregate load
    - No performance consideration for large event stores
    """,
}
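The idempotency pattern above can be made concrete by deduplicating on `event_id`. A self-contained SQLite sketch (the `processed` table and its shape are assumptions):

```python
# Idempotent projection: record each event_id; a replay becomes a no-op.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (event_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY)")

def handle_order_created(event_id: str, order_id: str) -> bool:
    """Return True when the event was applied, False when it was a duplicate."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed (event_id) VALUES (?)", (event_id,)
    )
    if cur.rowcount == 0:
        return False  # already handled: do nothing
    conn.execute("INSERT INTO orders (id) VALUES (?)", (order_id,))
    return True

applied = handle_order_created("e-1", "order-1")   # applied
replayed = handle_order_created("e-1", "order-1")  # duplicate: ignored
```

Keeping the dedupe record and the projection write in one transaction is what makes the handler safe under the at-least-once delivery that event processors typically provide.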
Bad Patterns to Avoid
BAD_PATTERNS = {
    "query_from_event_store": """
    # Don't query current state from the event store for reads
    ❌ Bad:
    def get_order_summary(order_id):
        events = event_store.get_all(order_id)
        order = reconstruct_order(events)
        return format_summary(order)
    ✅ Good:
    def get_order_summary(order_id):
        return read_db.query(
            "SELECT * FROM order_summaries WHERE id = ?",
            order_id
        )
    """,
    "complex_events": """
    # Keep events small and focused
    ❌ Bad:
    class MassiveEvent:
        # 500 fields covering everything
        pass
    ✅ Good:
    class OrderCreatedEvent: pass
    class PaymentReceivedEvent: pass
    class OrderShippedEvent: pass
    # Small, focused, replayable events
    """,
    "synchronous_projections": """
    # Don't block command completion on all projections
    ❌ Bad:
    def handle_create_order(cmd):
        append_event(event)
        # Wait for ALL projections to complete
        wait_for_all_projections()
        return order_id
    ✅ Good:
    def handle_create_order(cmd):
        append_event(event)
        return order_id  # Return immediately
        # Projections run asynchronously
    """,
}
Scaling Considerations
Partitioning the Event Store
-- Partition events by time for scaling
CREATE TABLE events (
    id UUID NOT NULL,
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    version INT NOT NULL,
    payload JSONB NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    -- Postgres requires the partition key in any primary key
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE events_2026_01 PARTITION OF events
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE events_2026_02 PARTITION OF events
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
Projections Scaling
from queue import Queue


class ParallelProjectionProcessor:
    def __init__(self, num_workers: int = 4):
        self.queue = Queue()
        # ProjectionWorker (not shown) consumes events from the shared queue.
        self.workers = [
            ProjectionWorker(self.queue)
            for _ in range(num_workers)
        ]

    def start(self):
        for worker in self.workers:
            worker.start()

    def process_async(self, event: Event):
        self.queue.put(event)
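A runnable sketch of the same idea with plain threads. The key design point, which a single shared queue misses, is pinning each `aggregate_id` to one worker via a hash, so per-aggregate event order is preserved while different aggregates project in parallel (all names here are illustrative):

```python
# Partitioned worker pool: one queue per worker, aggregates pinned by hash.
import queue
import threading

NUM_WORKERS = 4
queues = [queue.Queue() for _ in range(NUM_WORKERS)]
results: list[tuple[str, str]] = []
lock = threading.Lock()

def worker(q: queue.Queue) -> None:
    while True:
        event = q.get()
        if event is None:  # sentinel: shut down
            break
        with lock:  # stand-in for applying a projection
            results.append((event["aggregate_id"], event["type"]))

threads = [threading.Thread(target=worker, args=(q,)) for q in queues]
for t in threads:
    t.start()

def dispatch(event: dict) -> None:
    # Same aggregate always hashes to the same queue, so its events are
    # applied in order even though aggregates run in parallel.
    queues[hash(event["aggregate_id"]) % NUM_WORKERS].put(event)

for i in range(8):
    dispatch({"aggregate_id": f"order-{i % 2}", "type": f"e{i}"})

for q in queues:
    q.put(None)
for t in threads:
    t.join()
# results holds all 8 events, each aggregate's events in append order
```

Interleaving between aggregates is nondeterministic, but filtering `results` by one aggregate always yields that aggregate's events in the order they were dispatched.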
Summary
Event Sourcing and CQRS are powerful patterns for building complex, scalable systems:
- Event Sourcing stores all state changes as immutable events, providing a complete audit trail and the ability to reconstruct any past state
- CQRS separates read and write models, allowing optimization of each for their specific use cases
- Projections transform events into read-optimized views
- Snapshotting improves performance for aggregates with many events
- Versioning ensures backward compatibility as your domain evolves
These patterns are particularly valuable for:
- Systems requiring complete audit history
- Complex business domains with rich state transitions
- Systems needing multiple, different read representations
- Event-driven microservices architectures