In the world of distributed systems and domain-driven design, two architectural patterns frequently emerge in conversations about building scalable, maintainable applications: Event Sourcing and CQRS (Command Query Responsibility Segregation). While they often appear together, they solve different problems and can be used independently.
Understanding when to use each pattern, and when to combine them, is crucial for building systems that can evolve, scale, and maintain consistency over time.
The Problem with Traditional Persistence
Traditional applications typically use a CRUD (Create, Read, Update, Delete) approach:
# Traditional CRUD approach
class OrderRepository:
    def create(self, order_data):
        # Insert into database
        db.insert("orders", order_data)

    def read(self, order_id):
        # Select from database
        return db.select("orders", id=order_id)

    def update(self, order_id, changes):
        # Update database
        db.update("orders", id=order_id, data=changes)

    def delete(self, order_id):
        # Delete from database
        db.delete("orders", id=order_id)
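This approach has limitations:
- You lose the history of changes: each update overwrites the previous state
- Concurrent updates can silently overwrite each other (lost updates) unless you add optimistic or pessimistic locking
- Read and write paths compete for the same resources
- Reads and writes cannot be scaled or optimized independently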
Event Sourcing: Storing State as a Sequence of Events
Event Sourcing is a pattern where instead of storing the current state of an entity, you store a sequence of events that led to that state. The current state is derived by replaying all events.
Core Concept
# Instead of storing current state:
order_current_state = {
    "id": "order-123",
    "status": "shipped",
    "items": [{"product": "Widget", "qty": 2}],
    "total": 29.98
}

# Event Sourcing stores events:
order_events = [
    {"type": "OrderCreated", "data": {...}},
    {"type": "ItemAdded", "data": {...}},
    {"type": "PaymentReceived", "data": {...}},
    {"type": "OrderShipped", "data": {...}}
]
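"Derived by replaying all events" is a left fold of a pure transition function over the history. The minimal sketch below folds a list of events into the `order_current_state` shape shown above. The payload fields (`order_id`, `product`, `qty`, `price`) and the name `apply_event` are hypothetical, because the article elides the event data as `{...}`.

```python
from functools import reduce
from typing import Any, Dict, List

# Hypothetical event payloads; the article elides them as {...}.
EventDict = Dict[str, Any]
State = Dict[str, Any]

def apply_event(state: State, event: EventDict) -> State:
    """Return a new state with one event applied; the input is not mutated."""
    kind, data = event["type"], event["data"]
    if kind == "OrderCreated":
        return {"id": data["order_id"], "status": "created", "items": [], "total": 0.0}
    if kind == "ItemAdded":
        items = state["items"] + [{"product": data["product"], "qty": data["qty"]}]
        total = round(state["total"] + data["qty"] * data["price"], 2)
        return {**state, "items": items, "total": total}
    if kind == "PaymentReceived":
        return {**state, "status": "paid"}
    if kind == "OrderShipped":
        return {**state, "status": "shipped"}
    return state  # unknown event types leave the state unchanged

def current_state(events: List[EventDict]) -> State:
    # Current state is a left fold of apply_event over the event history.
    return reduce(apply_event, events, {})

order_events = [
    {"type": "OrderCreated", "data": {"order_id": "order-123"}},
    {"type": "ItemAdded", "data": {"product": "Widget", "qty": 2, "price": 14.99}},
    {"type": "PaymentReceived", "data": {}},
    {"type": "OrderShipped", "data": {}},
]
print(current_state(order_events))
# {'id': 'order-123', 'status': 'shipped', 'items': [{'product': 'Widget', 'qty': 2}], 'total': 29.98}
```

Because `apply_event` never mutates its input, replaying a prefix of the list (for example `order_events[:2]`) yields the state as it stood at that point.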
Event Store Implementation
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    timestamp: datetime
    version: int

class EventStore:
    def __init__(self):
        # aggregate_id -> that aggregate's events, in append order
        self.events: Dict[str, List[Event]] = {}

    def append_event(self, event: Event):
        if event.aggregate_id not in self.events:
            self.events[event.aggregate_id] = []
        self.events[event.aggregate_id].append(event)

    def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]:
        return self.events.get(aggregate_id, [])

    def get_all_events(self, aggregate_type: str) -> List[Event]:
        """Get events across all aggregates of a type."""
        # Assumes aggregate IDs are prefixed with their type, e.g. "order-123"
        return [
            event for events in self.events.values()
            for event in events
            if event.aggregate_id.startswith(aggregate_type)
        ]
Aggregate Reconstruction
class Order:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.status = "draft"
        self.items: List[Dict] = []
        self.total = 0.0
        self.version = 0

    def apply(self, event: Event):
        """Apply event to rebuild state."""
        self.version = event.version
        if event.event_type == "OrderCreated":
            self.order_id = event.data["order_id"]
            self.status = "created"
        elif event.event_type == "ItemAdded":
            self.items.append(event.data["item"])
            self.total += event.data["item"]["price"] * event.data["item"]["qty"]
            self.status = "items_added"
        elif event.event_type == "PaymentReceived":
            self.status = "paid"
        elif event.event_type == "OrderShipped":
            self.status = "shipped"

    @classmethod
    def reconstruct(cls, event_store: EventStore, order_id: str) -> "Order":
        """Reconstruct order from event history."""
        order = cls(order_id)
        events = event_store.get_events_for_aggregate(order_id)
        for event in events:
            order.apply(event)
        return order
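Because state is always rebuilt by replay, a temporal query ("what was the state at time T?") is simply a replay that stops early. The sketch below assumes that each event carries a `timestamp` that never decreases as `version` grows. `OrderState` and `state_as_of` are hypothetical names that do not appear in the code above.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

@dataclass
class Event:
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    timestamp: datetime
    version: int

@dataclass
class OrderState:
    status: str = "draft"
    total: float = 0.0
    version: int = 0

    def apply(self, event: Event) -> None:
        self.version = event.version
        if event.event_type == "OrderCreated":
            self.status = "created"
        elif event.event_type == "ItemAdded":
            self.total += event.data["price"] * event.data["qty"]
        elif event.event_type == "PaymentReceived":
            self.status = "paid"
        elif event.event_type == "OrderShipped":
            self.status = "shipped"

def state_as_of(events: List[Event], as_of: datetime) -> OrderState:
    """Rebuild the order from only the events recorded at or before `as_of`."""
    state = OrderState()
    for event in sorted(events, key=lambda e: e.version):
        # Assumption: timestamps never decrease as versions increase,
        # so the first event after `as_of` ends the replay.
        if event.timestamp > as_of:
            break
        state.apply(event)
    return state

# Illustrative history for one order.
history = [
    Event("order-123", "OrderCreated", {}, datetime(2024, 1, 1, 9, 0), 1),
    Event("order-123", "ItemAdded", {"price": 14.99, "qty": 2}, datetime(2024, 1, 1, 9, 5), 2),
    Event("order-123", "PaymentReceived", {}, datetime(2024, 1, 2, 10, 0), 3),
    Event("order-123", "OrderShipped", {}, datetime(2024, 1, 3, 8, 0), 4),
]
print(state_as_of(history, datetime(2024, 1, 2, 12, 0)).status)  # paid
```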
Command Handling
import uuid
from datetime import datetime

class OrderCommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def handle_create_order(self, command: Dict) -> Event:
        order_id = command["order_id"]
        event = Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="OrderCreated",
            data={
                "order_id": order_id,
                "customer_id": command["customer_id"],
                "created_at": datetime.utcnow().isoformat()
            },
            timestamp=datetime.utcnow(),
            version=1
        )
        self.event_store.append_event(event)
        return event

    def handle_add_item(self, command: Dict) -> Event:
        order_id = command["order_id"]
        # Next version = number of stored events + 1. Without an
        # expected-version check in the store, two concurrent handlers
        # can compute the same version here.
        events = self.event_store.get_events_for_aggregate(order_id)
        version = len(events) + 1
        event = Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="ItemAdded",
            data={
                "item": command["item"],
                "added_at": datetime.utcnow().isoformat()
            },
            timestamp=datetime.utcnow(),
            version=version
        )
        self.event_store.append_event(event)
        return event
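`handle_add_item` derives the next version from `len(events) + 1`, so two concurrent handlers can both compute the same version and both append. A common remedy is optimistic concurrency: the writer states the version it read, and the store rejects the append if the stream has moved on since then. What follows is a minimal in-memory sketch. `VersionedEventStore`, `ConcurrencyError`, and the `expected_version` parameter are hypothetical names. A database-backed store would typically enforce the same rule with a unique constraint on `(aggregate_id, version)` instead of an in-process lock.

```python
import threading
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List

class ConcurrencyError(Exception):
    """The stream was appended to after the caller read its version."""

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    timestamp: datetime
    version: int

class VersionedEventStore:
    """In-memory store with an expected-version check (hypothetical API)."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[Event]] = {}
        self._lock = threading.Lock()  # makes check-then-append atomic in-process

    def current_version(self, aggregate_id: str) -> int:
        with self._lock:
            return len(self._streams.get(aggregate_id, []))

    def append(self, aggregate_id: str, expected_version: int,
               event_type: str, data: Dict[str, Any]) -> Event:
        with self._lock:
            stream = self._streams.setdefault(aggregate_id, [])
            if len(stream) != expected_version:
                raise ConcurrencyError(
                    f"{aggregate_id}: expected version {expected_version}, "
                    f"found {len(stream)}"
                )
            event = Event(str(uuid.uuid4()), aggregate_id, event_type, data,
                          datetime.now(timezone.utc), expected_version + 1)
            stream.append(event)
            return event

store = VersionedEventStore()
store.append("order-123", 0, "OrderCreated", {"customer_id": "cust-1"})

# Two handlers read the same version, then both try to append.
seen = store.current_version("order-123")
store.append("order-123", seen, "ItemAdded", {"sku": "widget"})      # succeeds
try:
    store.append("order-123", seen, "ItemAdded", {"sku": "gadget"})  # stale read
except ConcurrencyError as err:
    print("rejected:", err)
```

When a `ConcurrencyError` is raised, the usual response is to reload the aggregate, re-run the command's validation against the fresh state, and retry.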
When to Use Event Sourcing
# Good for Event Sourcing:
benefits:
  - Complete audit trail: Every change is stored
  - Temporal queries: "What was the state at time T?"
  - Event replay: Rebuild state from any point
  - Feature flexibility: Add new read models easily
  - Debugging: Full history of what happened
use_cases:
  - Financial systems (audit requirements)
  - Inventory management
  - Workflow engines
  - Gaming (replay functionality)
  - Any domain with complex state transitions
Problems with Event Sourcing
# Challenges to consider
challenges = {
    "event_schema_evolution": {
        "problem": "Events stored forever must handle schema changes",
        "solution": "Event upcasters / versioning"
    },
    "read_model_performance": {
        "problem": "Replaying events for every read is slow",
        "solution": "Projections / materialized views"
    },
    "eventual_consistency": {
        "problem": "Read models may be stale",
        "solution": "Accept eventual consistency"
    },
    "snapshotting": {
        "problem": "Too many events to replay",
        "solution": "Periodic snapshots"
    }
}
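The periodic-snapshot remedy can be sketched as follows: every `every` events the repository stores the folded state, and a load replays only the events recorded after the latest snapshot. `SnapshottingRepository`, the single-stream layout, and the snapshot interval are assumptions made for illustration. The snapshot here lives in memory, whereas a real system would persist it alongside the event stream.

```python
from dataclasses import dataclass, replace
from typing import List, Optional

@dataclass(frozen=True)
class Event:
    version: int
    event_type: str
    amount: float = 0.0

@dataclass(frozen=True)
class OrderState:
    version: int = 0
    total: float = 0.0
    status: str = "draft"

def apply(state: OrderState, event: Event) -> OrderState:
    """Pure transition: return the state after one event."""
    if event.event_type == "ItemAdded":
        state = replace(state, total=state.total + event.amount)
    elif event.event_type == "OrderShipped":
        state = replace(state, status="shipped")
    return replace(state, version=event.version)

class SnapshottingRepository:
    """Single-stream repository that snapshots every `every` events (hypothetical)."""

    def __init__(self, every: int = 100) -> None:
        self.every = every
        self.events: List[Event] = []
        self.snapshot: Optional[OrderState] = None
        self.replayed_on_last_load = 0

    def append(self, event: Event) -> None:
        self.events.append(event)
        if event.version % self.every == 0:
            self.snapshot = self.load()  # store the folded state

    def load(self) -> OrderState:
        state = self.snapshot if self.snapshot is not None else OrderState()
        # Replay only events newer than the snapshot. A real store would
        # query "version > N" instead of scanning the whole list.
        tail = [e for e in self.events if e.version > state.version]
        for event in tail:
            state = apply(state, event)
        self.replayed_on_last_load = len(tail)
        return state

repo = SnapshottingRepository(every=10)
for v in range(1, 26):
    repo.append(Event(version=v, event_type="ItemAdded", amount=1.0))
state = repo.load()
print(state.version, state.total, repo.replayed_on_last_load)  # 25 25.0 5
```

With the snapshot taken at version 20, the final load replays only the five newer events rather than all 25.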
CQRS: Separating Read and Write Models
CQRS (Command Query Responsibility Segregation) is a pattern that separates the way you read data from the way you write data. Instead of having a single model for both operations, you have distinct models optimized for each.
Core Concept
┌─────────────────────────────────────────────────────────────┐
│                      Traditional Model                      │
│                                                             │
│  Client ─────► Read/Write Model ─────► Database             │
│                (Same for both)                              │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                         CQRS Model                          │
│                                                             │
│  Client ─────► Command Model ─────► Write DB                │
│    │                                                        │
│    └─────────► Query Model ───────► Read DB                 │
│                (Separate)                                   │
└─────────────────────────────────────────────────────────────┘
Command Side (Write Model)
from datetime import datetime

class OrderCommand:
    def __init__(self, order_id, customer_id, items):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = items
        self.status = "pending"
        self.created_at = datetime.utcnow()

class OrderCommandHandler:
    def __init__(self, write_repository):
        self.repository = write_repository

    def handle_create_order(self, command: OrderCommand):
        # Validate business rules
        if not command.items:
            raise ValueError("Order must have items")

        # Create aggregate. Here Order is a plain write-side entity that
        # stores current state, not the event-sourced Order shown earlier.
        order = Order(
            id=command.order_id,
            customer_id=command.customer_id,
            items=command.items,
            status="created"
        )

        # Persist to write database
        self.repository.save(order)
        return order.id
Query Side (Read Model)
# Different read models for different views
class OrderListView:
    """Lightweight view for list pages."""
    def __init__(self, read_db):
        self.db = read_db

    def get_orders(self, limit=20, offset=0):
        # customer_name is copied into the read table, so no JOIN is needed
        return self.db.query("""
            SELECT order_id, customer_name, total, status, created_at
            FROM orders
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """, [limit, offset])

class OrderDetailView:
    """Detailed view for order detail page."""
    def __init__(self, read_db):
        self.db = read_db

    def get_order_detail(self, order_id):
        order = self.db.query_one(
            "SELECT * FROM orders WHERE order_id = %s",
            [order_id]
        )
        items = self.db.query(
            "SELECT * FROM order_items WHERE order_id = %s",
            [order_id]
        )
        timeline = self.db.query(
            "SELECT * FROM order_timeline WHERE order_id = %s",
            [order_id]
        )
        return {
            "order": order,
            "items": items,
            "timeline": timeline
        }

class OrderAnalyticsView:
    """Aggregated view for analytics."""
    def __init__(self, read_db):
        self.db = read_db

    def get_revenue_by_day(self, start_date, end_date):
        return self.db.query("""
            SELECT DATE(created_at) as day,
                   SUM(total) as revenue,
                   COUNT(*) as order_count
            FROM orders
            WHERE created_at BETWEEN %s AND %s
            GROUP BY DATE(created_at)
        """, [start_date, end_date])
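To see the list and analytics queries run end to end, here is a sketch against an in-memory SQLite database. The schema and rows are invented for illustration. Note that the `%s` placeholders in the views above suit drivers such as psycopg2, while Python's built-in `sqlite3` module uses `?`.

```python
import sqlite3

# In-memory SQLite stands in for the read database (illustrative schema and rows).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE orders (
        order_id      TEXT PRIMARY KEY,
        customer_name TEXT,  -- denormalized copy, so list pages need no JOIN
        total         REAL,
        status        TEXT,
        created_at    TEXT   -- ISO-8601 text
    )
""")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?, ?)",
    [
        ("order-1", "Ada", 29.98, "shipped", "2024-01-01T09:00:00"),
        ("order-2", "Grace", 10.00, "paid", "2024-01-02T10:00:00"),
        ("order-3", "Ada", 5.50, "created", "2024-01-02T15:30:00"),
    ],
)

def get_orders(limit=20, offset=0):
    return conn.execute("""
        SELECT order_id, customer_name, total, status, created_at
        FROM orders
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?
    """, (limit, offset)).fetchall()

def get_revenue_by_day(start_date, end_date):
    return conn.execute("""
        SELECT DATE(created_at) AS day,
               SUM(total) AS revenue,
               COUNT(*) AS order_count
        FROM orders
        WHERE created_at BETWEEN ? AND ?
        GROUP BY DATE(created_at)
        ORDER BY day
    """, (start_date, end_date)).fetchall()

print(get_orders(limit=2))
print(get_revenue_by_day("2024-01-01", "2024-01-03"))
```

Because `created_at` is stored as ISO-8601 text, `BETWEEN` compares strings. An upper bound of `'2024-01-02'` would therefore exclude every order placed on January 2, since any timestamp on that day sorts after the bare date. Pass the following day, or a full timestamp, as the upper bound instead.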
Synchronization Between Models
from datetime import datetime

class ModelSynchronizer:
    def __init__(self, event_store):
        self.event_store = event_store
        # One handle per read model; each backend class is application-specific
        self.read_databases = {
            "default": ReadDatabase(),
            "analytics": AnalyticsDatabase(),
            "search": SearchIndex()
        }

    def synchronize(self, event: Event):
        """Sync event to all read models."""
        if event.event_type == "OrderCreated":
            self._sync_order_created(event)
        elif event.event_type == "OrderShipped":
            self._sync_order_shipped(event)  # analogous handler, omitted here

    def _sync_order_created(self, event: Event):
        data = event.data
        # Default read DB
        self.read_databases["default"].insert("orders", {
            "order_id": data["order_id"],
            "customer_id": data["customer_id"],
            "status": "created",
            "total": 0
        })
        # Analytics DB: created_at is stored as an ISO-8601 string, so parse it
        self.read_databases["analytics"].increment(
            "daily_orders",
            datetime.fromisoformat(data["created_at"]).date()
        )
        # Search index
        self.read_databases["search"].index("orders", {
            "id": data["order_id"],
            "text": f"Order {data['order_id']}"
        })
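Event delivery to read models is often at-least-once, so a projection may receive the same event twice, for example after a consumer restart. One way to make `synchronize`-style handlers safe against this is to track the last applied version per aggregate and skip anything already seen. The sketch below is hypothetical: `IdempotentOrderProjection` does not come from the code above. In a real read database, the row update and the checkpoint should be written in the same transaction.

```python
from dataclasses import dataclass
from typing import Any, Dict

@dataclass(frozen=True)
class Event:
    aggregate_id: str
    version: int
    event_type: str
    data: Dict[str, Any]

class IdempotentOrderProjection:
    """Hypothetical read-model updater that tolerates redelivered events."""

    def __init__(self) -> None:
        self.orders: Dict[str, Dict[str, Any]] = {}
        self.last_version: Dict[str, int] = {}  # checkpoint per aggregate

    def handle(self, event: Event) -> bool:
        """Apply the event once; return False if it was already applied."""
        seen = self.last_version.get(event.aggregate_id, 0)
        if event.version <= seen:
            return False  # duplicate or replayed delivery
        if event.version != seen + 1:
            # A skipped version means an event was lost or reordered.
            raise RuntimeError(
                f"{event.aggregate_id}: expected version {seen + 1}, got {event.version}"
            )
        order = self.orders.setdefault(event.aggregate_id, {"status": "new", "total": 0.0})
        if event.event_type == "OrderCreated":
            order["status"] = "created"
        elif event.event_type == "ItemAdded":
            order["total"] += event.data["price"] * event.data["quantity"]
        elif event.event_type == "OrderShipped":
            order["status"] = "shipped"
        self.last_version[event.aggregate_id] = event.version
        return True

projection = IdempotentOrderProjection()
created = Event("order-1", 1, "OrderCreated", {})
item = Event("order-1", 2, "ItemAdded", {"price": 10.0, "quantity": 2})
for event in (created, item, item):  # the ItemAdded event arrives twice
    projection.handle(event)
print(projection.orders["order-1"])  # {'status': 'created', 'total': 20.0}
```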
When to Use CQRS
# Good for CQRS:
benefits:
  - Optimized read models for specific needs
  - Different scaling strategies for reads/writes
  - Team autonomy (different teams own read/write)
  - Complex query requirements
  - Polyglot persistence (different DBs for read/write)
use_cases:
  - Read-heavy applications (dashboards, reports)
  - Complex domain with many query patterns
  - Multi-tenant SaaS applications
  - Systems with different consistency needs
Problems with CQRS
# CQRS challenges
challenges = {
    "eventual_consistency": {
        "problem": "Read models may lag behind write model",
        "solution": "Accept and communicate eventual consistency"
    },
    "complexity": {
        "problem": "Two models instead of one",
        "solution": "Only use when benefits justify complexity"
    },
    "synchronization": {
        "problem": "Keeping read models in sync",
        "solution": "Reliable event-driven sync, saga patterns"
    },
    "testing": {
        "problem": "More components to test",
        "solution": "Integration tests for sync, unit for each model"
    }
}
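One way to soften the eventual-consistency problem is read-your-writes. The command returns the position of the event it wrote, and the follow-up query waits briefly until the read model has processed that position, falling back to a "still processing" response on timeout. The sketch assumes a single projector that reports its progress through a hypothetical `ProjectionCheckpoint`. It relies only on `threading.Condition` from the standard library.

```python
import threading

class ProjectionCheckpoint:
    """Hypothetical tracker of how far a read model has caught up."""

    def __init__(self) -> None:
        self._position = 0
        self._cond = threading.Condition()

    def advance(self, position: int) -> None:
        """Called by the projector after it applies the event at `position`."""
        with self._cond:
            self._position = max(self._position, position)  # never move backwards
            self._cond.notify_all()

    def wait_for(self, position: int, timeout: float) -> bool:
        """Block until the read model has reached `position`; False on timeout."""
        with self._cond:
            return self._cond.wait_for(lambda: self._position >= position, timeout=timeout)

checkpoint = ProjectionCheckpoint()
# Simulate a projector that applies the command's event (position 42) 50 ms later.
projector = threading.Timer(0.05, checkpoint.advance, args=(42,))
projector.start()
print(checkpoint.wait_for(42, timeout=1.0))   # caught up: safe to query the read model
print(checkpoint.wait_for(100, timeout=0.1))  # still behind: answer "processing" instead
projector.join()
```

The timeout bounds how long a user waits. The read model remains eventually consistent, and only the request that just issued a command pays the wait.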
Event Sourcing + CQRS: A Powerful Combination
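While both patterns can be used independently, they complement each other well: the events that Event Sourcing stores are a natural feed for CQRS projections, and CQRS read models spare you from replaying events on every query.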
Combined Architecture
┌─────────────────────────────────────────────────────────────┐
│                    Event Sourcing + CQRS                    │
│                                                             │
│  Commands ──► Command Handler ──► Event Store               │
│                                        │                    │
│    ┌───────────────────────────────────┘                    │
│    ▼                                                        │
│  Events ───────► Projections ──► Read Models                │
│                                       │                     │
│                           ┌───────────┼───────────┐         │
│                           ▼           ▼           ▼         │
│                       List View  Detail View  Analytics     │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Full Implementation
import uuid
from datetime import datetime

class OrderAggregate:
    """Order aggregate with event sourcing."""
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.events: List[Event] = []  # uncommitted events
        self.status = "draft"
        self.items: List[Dict] = []
        self.total = 0.0
        self.version = 0

    def _new_event(self, event_type: str, data: Dict) -> Event:
        # Fill in the Event fields that every command needs
        return Event(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.order_id,
            event_type=event_type,
            data=data,
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )

    def _record(self, event: Event):
        self._apply(event)
        self.events.append(event)

    def create_order(self, customer_id: str):
        if self.status != "draft":
            raise ValueError("Order already created")
        self._record(self._new_event("OrderCreated", {"customer_id": customer_id}))

    def add_item(self, product_id: str, quantity: int, price: float):
        if self.status == "shipped":
            raise ValueError("Cannot add items to shipped order")
        self._record(self._new_event("ItemAdded", {
            "product_id": product_id,
            "quantity": quantity,
            "price": price
        }))

    def record_payment(self, amount: float):
        if self.status != "created":
            raise ValueError("Can only pay for created orders")
        self._record(self._new_event("PaymentReceived", {"amount": amount}))

    def ship_order(self):
        if self.status != "paid":
            raise ValueError("Can only ship paid orders")
        self._record(self._new_event(
            "OrderShipped", {"shipped_at": datetime.utcnow().isoformat()}
        ))

    def _apply(self, event: Event):
        self.version = event.version
        if event.event_type == "OrderCreated":
            self.status = "created"
        elif event.event_type == "ItemAdded":
            self.items.append(event.data)
            self.total += event.data["price"] * event.data["quantity"]
        elif event.event_type == "PaymentReceived":
            self.status = "paid"
        elif event.event_type == "OrderShipped":
            self.status = "shipped"

    def get_uncommitted_events(self) -> List[Event]:
        return self.events
class ProjectionBuilder:
    """Build read models from events."""
    def __init__(self):
        self.order_list: Dict = {}
        self.order_detail: Dict = {}
        self.daily_stats: Dict = {}

    def project(self, event: Event):
        if event.event_type == "OrderCreated":
            self._project_order_created(event)
        elif event.event_type == "ItemAdded":
            self._project_item_added(event)
        elif event.event_type == "OrderShipped":
            self._project_order_shipped(event)

    def _project_order_created(self, event: Event):
        order_id = event.aggregate_id
        self.order_list[order_id] = {
            "order_id": order_id,
            "customer_id": event.data["customer_id"],
            "status": "created",
            "total": 0,
            "created_at": event.timestamp
        }
        self.order_detail[order_id] = {
            "order_id": order_id,
            "items": [],
            "timeline": [f"Order created at {event.timestamp}"]
        }

    def _project_item_added(self, event: Event):
        order_id = event.aggregate_id
        if order_id in self.order_list:
            self.order_list[order_id]["total"] += (
                event.data["price"] * event.data["quantity"]
            )
        if order_id in self.order_detail:
            self.order_detail[order_id]["items"].append(event.data)

    def _project_order_shipped(self, event: Event):
        order_id = event.aggregate_id
        if order_id in self.order_list:
            self.order_list[order_id]["status"] = "shipped"
        if order_id in self.order_detail:
            self.order_detail[order_id]["timeline"].append(
                f"Order shipped at {event.data['shipped_at']}"
            )
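Because the event log is the source of truth, a new read model can be added later and populated by replaying history from the start. This sketch builds a hypothetical order-value-per-customer view from events shaped like the ones `OrderAggregate` emits: `customer_id` on `OrderCreated`, and `product_id`, `quantity`, `price` on `ItemAdded`. The log contents are invented.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable

@dataclass(frozen=True)
class Event:
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]

def build_revenue_by_customer(events: Iterable[Event]) -> Dict[str, float]:
    """Project a brand-new read model by replaying the full event log."""
    customer_of: Dict[str, str] = {}  # order_id -> customer_id
    revenue: Dict[str, float] = defaultdict(float)
    for event in events:
        if event.event_type == "OrderCreated":
            customer_of[event.aggregate_id] = event.data["customer_id"]
        elif event.event_type == "ItemAdded":
            customer = customer_of[event.aggregate_id]
            revenue[customer] += event.data["price"] * event.data["quantity"]
    return dict(revenue)

# Illustrative log; events from different orders are interleaved.
log = [
    Event("order-1", "OrderCreated", {"customer_id": "cust-a"}),
    Event("order-1", "ItemAdded", {"product_id": "widget", "quantity": 2, "price": 10.0}),
    Event("order-2", "OrderCreated", {"customer_id": "cust-b"}),
    Event("order-2", "ItemAdded", {"product_id": "gadget", "quantity": 1, "price": 5.0}),
    Event("order-1", "ItemAdded", {"product_id": "gadget", "quantity": 1, "price": 5.0}),
]
print(build_revenue_by_customer(log))  # {'cust-a': 25.0, 'cust-b': 5.0}
```

The same replay loop is how an existing projection is rebuilt after a bug fix: drop the read model, then feed it every event again.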
Decision Matrix
Should You Use Event Sourcing?
# Use Event Sourcing when:
use_event_sourcing = (
    need_audit_trail or
    need_temporal_queries or
    complex_state_machine or
    want_event_replay or
    building_event_driven_system
)

# Avoid when:
avoid_event_sourcing = (
    simple_crud_app or
    low_volume or
    strong_agility_objection or
    team_new_to_pattern
)
Should You Use CQRS?
# Use CQRS when:
use_cqrs = (
    read_write_ratio_imbalanced or
    multiple_read_views_needed or
    different_consistency_needs or
    want_polyglot_persistence or
    building_microservices
)

# Avoid when:
avoid_cqrs = (
    simple_read_write_pattern or
    low_complexity_domain or
    strong_consistency_required or
    team_size_small
)
Comparison Summary
| Aspect | Event Sourcing | CQRS |
|---|---|---|
| Primary Benefit | Complete history, audit trail | Optimized read/write paths |
| State Storage | Events (immutable) | Current state |
| Read Model | Replay events | Pre-built projections |
| Consistency | Strong within an aggregate; projections are eventual | Eventual (strong if read models are updated synchronously) |
| Complexity | Higher | Medium |
| Storage | Growing (append-only) | Optimized per view |
| Queries | Rebuild from events | Direct reads |
| Debugging | Full history | Check read model |
Implementation Tools
# Popular frameworks supporting these patterns
event_sourcing:
  - EventStoreDB (formerly Event Store)
  - Axon Framework (Java)
  - EventFlow (.NET)
  - Lagom (Scala/Java)
  - Prooph (PHP)
cqrs_frameworks:
  - Many CQRS implementations are custom
  - Doctrine (PHP) has CQRS support
  - Microservices frameworks generally support it
message_brokers:
  - Kafka (popular for event sourcing)
  - RabbitMQ
  - NATS
Conclusion
Event Sourcing and CQRS are powerful patterns that address different architectural challenges:
- Event Sourcing stores state as a sequence of events, providing complete history and auditability
- CQRS separates read and write models, allowing optimization for different access patterns
- Together, they form a robust foundation for complex, scalable systems
However, they introduce significant complexity. Start with simpler patterns and evolve toward these approaches only when the benefits outweigh the costs.
Remember:
- Don’t use these patterns because they’re trendy
- Use them when your domain genuinely needs the capabilities they provide
- Start small and measure the impact