Introduction
Traditional architectures use a single model for both reading and writing data. This works well for simple applications but creates challenges as systems grow in complexity. CQRS (Command Query Responsibility Segregation) separates read and write operations, and Event Sourcing stores state as a sequence of events rather than as a single current value. Used together, they can improve scalability, flexibility, and auditability.
In 2026, these patterns have matured significantly, with established frameworks and real-world implementations demonstrating their value. This guide explores CQRS and Event Sourcing patterns, implementation strategies, and best practices.
Understanding CQRS
What Is CQRS?
CQRS separates read and write operations into distinct models:
- Commands: Operations that change state (Create, Update, Delete)
- Queries: Operations that read state without modification
This separation allows optimizing each operation type independently.
Traditional vs CQRS
Traditional CRUD:
# Single model for both read and write
class OrderController:
    def get_order(self, order_id):
        return self.db.query(Order).filter_by(id=order_id).first()

    def create_order(self, data):
        order = Order(**data)
        self.db.add(order)
        self.db.commit()
        return order

    def update_order(self, order_id, data):
        order = self.db.query(Order).filter_by(id=order_id).first()
        for key, value in data.items():
            setattr(order, key, value)
        self.db.commit()
        return order
CQRS:
# Separate models for read and write

# Write model - focused on business logic
class OrderCommandHandler:
    def create_order(self, data):
        # Validate business rules
        validator = OrderValidator()
        validator.validate(data)
        # Create aggregate
        order = Order.create(**data)
        # Append the new events to the event store
        self.event_store.append(order.id, order.events)
        return order.id

# Read model - optimized for queries
class OrderQueryHandler:
    def get_order_summary(self, order_id):
        return self.read_db.query(OrderSummaryDTO).filter_by(id=order_id).first()

    def get_orders_by_customer(self, customer_id):
        return self.read_db.query(OrderListDTO).filter_by(customer_id=customer_id).all()
Benefits of CQRS
Optimized read models: Different read models for different views:
# Product list view - minimal data
class ProductListItem:
    id: str
    name: str
    price: float
    thumbnail_url: str

# Product detail view - full data
class ProductDetail:
    id: str
    name: str
    description: str
    price: float
    inventory: int
    images: list[str]
    specifications: dict
    reviews: list[Review]
Optimized write models: Focused on business logic:
# Write model - complex validation
class Order:
    def __init__(self):
        self.items = []
        self.status = "draft"

    def add_item(self, product, quantity):
        if product.inventory < quantity:
            raise InsufficientInventoryError()
        if self.status != "draft":
            raise OrderNotEditableError()
        self.items.append(OrderItem(product, quantity))

    def submit(self):
        if not self.items:
            raise EmptyOrderError()
        self.status = "submitted"
Independent scaling: Read and write workloads scale separately:
# Kubernetes: Scale reads independently
# (illustrative replica counts, not a complete manifest)
resources:
  read-api:
    replicas: 10
  write-api:
    replicas: 2
Event Sourcing
What Is Event Sourcing?
Instead of storing current state, event sourcing stores all state changes as a sequence of events:
# Traditional: Store current state
class BankAccount:
    balance: int = 100  # Current balance

# Event Sourcing: Store all changes
class BankAccountEvents:
    events = [
        AccountCreated(account_id="acc_1", initial_balance=0),
        DepositMade(account_id="acc_1", amount=50, new_balance=50),
        DepositMade(account_id="acc_1", amount=50, new_balance=100),
    ]

# Current state = replay all events
def get_balance(events):
    balance = 0
    for event in events:
        if isinstance(event, AccountCreated):
            balance = event.initial_balance
        elif isinstance(event, (DepositMade, WithdrawalMade)):
            balance = event.new_balance
    return balance
Event Structure
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass
class Event:
    aggregate_id: str
    event_type: str
    payload: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)
    # Generated per event so callers do not have to supply them
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

# A plain subclass: it only supplies the event type and payload shape
class OrderCreated(Event):
    def __init__(self, order_id: str, customer_id: str, items: list):
        super().__init__(
            aggregate_id=order_id,
            event_type="OrderCreated",
            payload={"customer_id": customer_id, "items": items},
        )
Event Store
import json

class EventStore:
    def __init__(self, connection):
        # Assumed: a DB-API connection that uses "?" placeholders, such as sqlite3
        self.connection = connection

    def append(self, aggregate_id: str, events: list[Event]):
        for event in events:
            # DB-API drivers take the parameters as a single sequence
            self.connection.execute(
                """
                INSERT INTO events (aggregate_id, event_type, payload, metadata, created_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    aggregate_id,
                    event.event_type,
                    json.dumps(event.payload),
                    json.dumps(event.metadata),
                    event.timestamp.isoformat(),
                ),
            )
        self.connection.commit()

    def get_events(self, aggregate_id: str) -> list[Event]:
        rows = self.connection.execute(
            "SELECT * FROM events WHERE aggregate_id = ? ORDER BY created_at",
            (aggregate_id,),
        )
        # row_to_event (not shown) maps a database row back to an Event
        return [self.row_to_event(row) for row in rows]

    def get_events_stream(self, from_position: int = 0) -> "EventStream":
        # EventStream (not shown) iterates over all events from a global position
        return EventStream(self.connection, from_position)
Projections
Build read models from events:
# Materialized view projection
class OrderSummaryProjection:
    def __init__(self, read_db):
        self.read_db = read_db

    def project(self, event: Event):
        if event.event_type == "OrderCreated":
            self._handle_order_created(event)
        elif event.event_type == "OrderItemAdded":
            self._handle_item_added(event)
        elif event.event_type == "OrderSubmitted":
            self._handle_order_submitted(event)

    # The other two handlers (not shown) follow the same pattern
    def _handle_order_created(self, event):
        self.read_db.execute(
            """
            INSERT INTO order_summaries (order_id, customer_id, total, status, created_at)
            VALUES (?, ?, 0, 'draft', ?)
            """,
            (
                event.aggregate_id,
                event.payload["customer_id"],
                event.timestamp.isoformat(),
            ),
        )
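Because a projection is derived entirely from events, a read model can be discarded and rebuilt by replaying the stream from the start. The following is a minimal sketch of that idea, not the projection above: the in-memory `InMemoryOrderSummaries` class, the `rebuild` helper, and the dict-shaped events are all assumptions made for illustration.

```python
class InMemoryOrderSummaries:
    """Hypothetical read model kept in a dict instead of a database table."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}

    def project(self, event: dict) -> None:
        order_id, payload = event["aggregate_id"], event["payload"]
        if event["event_type"] == "OrderCreated":
            self.rows[order_id] = {
                "customer_id": payload["customer_id"],
                "item_count": 0,
                "status": "draft",
            }
        elif event["event_type"] == "OrderItemAdded":
            self.rows[order_id]["item_count"] += payload["quantity"]
        elif event["event_type"] == "OrderSubmitted":
            self.rows[order_id]["status"] = "submitted"


def rebuild(events: list[dict]) -> InMemoryOrderSummaries:
    """Start from an empty read model and replay the full stream in order."""
    projection = InMemoryOrderSummaries()
    for event in events:
        projection.project(event)
    return projection


stream = [
    {"aggregate_id": "ord_1", "event_type": "OrderCreated",
     "payload": {"customer_id": "cust_1"}},
    {"aggregate_id": "ord_1", "event_type": "OrderItemAdded",
     "payload": {"product_id": "sku_1", "quantity": 2}},
    {"aggregate_id": "ord_1", "event_type": "OrderItemAdded",
     "payload": {"product_id": "sku_2", "quantity": 1}},
    {"aggregate_id": "ord_1", "event_type": "OrderSubmitted", "payload": {}},
]
summaries = rebuild(stream)
print(summaries.rows["ord_1"])
```

The same replay loop is what makes it possible to add a new read model later: write a new `project` method and run it over the existing events.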
CQRS + Event Sourcing Together
Architecture Overview
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│   Command   │────▶│ Event Store │
└─────────────┘     │   Handler   │     └──────┬──────┘
                    └─────────────┘            │
                                               │ Events
                    ┌─────────────┐     ┌──────▼──────┐
┌─────────────┐     │    Query    │◀────│ Projection  │
│   Client    │◀────│   Handler   │     │   Engine    │
└─────────────┘     └─────────────┘     └─────────────┘
Implementation Example
# Domain Events
import uuid
from datetime import datetime, timezone

class OrderEvent(Event):
    """Base class for order events; fills in every Event field explicitly."""

    def __init__(self, order_id: str, event_type: str, payload: dict):
        super().__init__(
            event_id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type=event_type,
            timestamp=datetime.now(timezone.utc),
            payload=payload,
            metadata={},
        )

class OrderCreated(OrderEvent):
    def __init__(self, order_id: str, customer_id: str):
        super().__init__(order_id, "OrderCreated", {"customer_id": customer_id})

class OrderItemAdded(OrderEvent):
    def __init__(self, order_id: str, product_id: str, quantity: int):
        super().__init__(
            order_id, "OrderItemAdded",
            {"product_id": product_id, "quantity": quantity},
        )
# Aggregate
class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.events = []  # Events raised but not yet persisted
        self.items = []
        self.status = "draft"

    def create(self, customer_id: str):
        self.events.append(OrderCreated(self.order_id, customer_id))
        return self

    def add_item(self, product_id: str, quantity: int):
        # Record the event and keep the in-memory state in step with it
        self.events.append(OrderItemAdded(self.order_id, product_id, quantity))
        self.items.append({"product_id": product_id, "quantity": quantity})
        return self

    def get_uncommitted_events(self) -> list[Event]:
        return self.events

    def mark_events_committed(self):
        self.events = []
# Command Handler
class OrderCommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def handle_create_order(self, command: CreateOrderCommand) -> str:
        # A new order has no history to replay. Commands that target an
        # existing order would first load its events with
        # event_store.get_events(order_id) and replay them to restore state.
        aggregate = OrderAggregate(str(uuid.uuid4()))
        # Execute command
        aggregate.create(command.customer_id)
        for item in command.items:
            aggregate.add_item(item["product_id"], item["quantity"])
        # Save events
        self.event_store.append(
            aggregate.order_id,
            aggregate.get_uncommitted_events()
        )
        aggregate.mark_events_committed()
        return aggregate.order_id
# Query Handler
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_order_summary(self, order_id: str) -> OrderSummaryDTO:
        return self.read_db.query(OrderSummaryDTO).filter_by(
            order_id=order_id
        ).first()
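Commands against an existing order need the aggregate's state restored from its stored events before any business rule can be checked. The sketch below shows one way that replay step might look. The `Order` class, its `apply` and `from_history` methods, and the dict-shaped events are hypothetical and are simpler than the classes used in the example above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Order:
    """Hypothetical aggregate that rebuilds its state from past events."""

    order_id: str
    customer_id: Optional[str] = None
    items: list = field(default_factory=list)
    status: str = "new"
    version: int = 0  # number of events applied so far

    def apply(self, event: dict) -> None:
        # Only mutate state here; the rules were checked when the event was raised.
        kind, payload = event["event_type"], event["payload"]
        if kind == "OrderCreated":
            self.customer_id = payload["customer_id"]
            self.status = "draft"
        elif kind == "OrderItemAdded":
            self.items.append(payload)
        elif kind == "OrderSubmitted":
            self.status = "submitted"
        self.version += 1

    @classmethod
    def from_history(cls, order_id: str, events: list) -> "Order":
        order = cls(order_id)
        for event in events:
            order.apply(event)
        return order


history = [
    {"event_type": "OrderCreated", "payload": {"customer_id": "cust_1"}},
    {"event_type": "OrderItemAdded", "payload": {"product_id": "sku_1", "quantity": 2}},
    {"event_type": "OrderSubmitted", "payload": {}},
]
order = Order.from_history("ord_1", history)
print(order.status, order.version, len(order.items))  # submitted 3 1
```

The `version` counter that falls out of the replay is the value a handler would pass as the expected version when it appends new events.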
Handling Concurrency
# Optimistic concurrency with events
class EventStore:
    def append(self, aggregate_id: str, expected_version: int, events: list[Event]):
        # The check and the inserts must run in one transaction; otherwise
        # two writers can both pass the check before either one inserts.
        current_version = self.get_current_version(aggregate_id)
        if current_version != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, found {current_version}"
            )
        # Store events with version
        for i, event in enumerate(events):
            self.connection.execute(
                "INSERT INTO events VALUES (?, ?, ?, ?, ?)",
                (
                    aggregate_id,
                    expected_version + i + 1,
                    event.event_type,
                    json.dumps(event.payload),
                    event.timestamp.isoformat(),
                ),
            )
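A version check done in application code is only safe if the check and the insert happen atomically. One common alternative is to let the database enforce it with a uniqueness constraint on the aggregate ID and version. The sketch below uses Python's standard `sqlite3` module. The table layout, the `append` function, and the tuple-shaped events are assumptions for illustration, not the schema used elsewhere in this guide.

```python
import json
import sqlite3


class ConcurrencyError(Exception):
    pass


def create_schema(conn: sqlite3.Connection) -> None:
    conn.execute(
        """
        CREATE TABLE events (
            aggregate_id TEXT NOT NULL,
            version      INTEGER NOT NULL,
            event_type   TEXT NOT NULL,
            payload      TEXT NOT NULL,
            PRIMARY KEY (aggregate_id, version)
        )
        """
    )


def append(conn, aggregate_id, expected_version, events):
    """Append (event_type, payload) pairs, failing if a version is already taken."""
    try:
        with conn:  # one transaction: every event is stored, or none is
            for offset, (event_type, payload) in enumerate(events, start=1):
                conn.execute(
                    "INSERT INTO events VALUES (?, ?, ?, ?)",
                    (aggregate_id, expected_version + offset,
                     event_type, json.dumps(payload)),
                )
    except sqlite3.IntegrityError as exc:
        raise ConcurrencyError(
            f"{aggregate_id} has moved past version {expected_version}"
        ) from exc


conn = sqlite3.connect(":memory:")
create_schema(conn)
append(conn, "ord_1", 0, [("OrderCreated", {"customer_id": "cust_1"})])
try:
    # A second writer that also loaded version 0 collides on (ord_1, 1).
    append(conn, "ord_1", 0, [("OrderItemAdded", {"product_id": "sku_1"})])
    conflict = False
except ConcurrencyError:
    conflict = True
print(conflict)  # True
```

The constraint rejects duplicate versions but does not detect gaps, so a caller passing an expected version that is too high would still succeed. A production store would typically combine the constraint with a version check inside the same transaction.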
Practical Considerations
When to Use CQRS
Good candidates:
- Complex domain logic with validation
- Multiple read models needed
- High read-to-write ratio
- Audit requirements
- Event-driven architectures
Probably not needed for:
- Simple CRUD applications
- Low complexity domains
- Single read model
When to Use Event Sourcing
Good candidates:
- Audit requirements (complete history)
- Temporal queries (“what was the state at time X?”)
- Complex business rules requiring history
- Rebuilding state from events
- Event-driven microservices
Probably not needed for:
- Simple state management
- Performance-critical writes
- Systems that cannot tolerate eventually consistent reads
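Temporal queries, listed among the good candidates above, come down to replaying only the events recorded up to the moment of interest. Here is a minimal sketch, assuming events are `(timestamp, event_type, amount)` tuples sorted by time. The `balance_at` function and the sample data are hypothetical.

```python
from datetime import datetime, timezone


def balance_at(events, as_of: datetime) -> int:
    """Replay account events up to and including `as_of`."""
    balance = 0
    for timestamp, event_type, amount in events:
        if timestamp > as_of:
            break  # events are assumed to be sorted by timestamp
        if event_type == "DepositMade":
            balance += amount
        elif event_type == "WithdrawalMade":
            balance -= amount
    return balance


def day(n: int) -> datetime:
    return datetime(2024, 1, n, tzinfo=timezone.utc)


events = [
    (day(1), "DepositMade", 100),
    (day(5), "WithdrawalMade", 30),
    (day(9), "DepositMade", 20),
]
print(balance_at(events, day(6)))   # 70
print(balance_at(events, day(31)))  # 90
```

A store that only keeps the current balance cannot answer the first query at all, which is the main argument for event sourcing in audit-heavy domains.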
Challenges
Complexity: More moving parts than simple CRUD
Event schema evolution: Handling schema changes over time
# Event versioning
from dataclasses import dataclass, field

@dataclass
class OrderCreatedV1:
    event_type: str = "OrderCreated"
    version: int = 1
    payload: dict = field(default_factory=lambda: {
        "customer_id": None
    })

@dataclass
class OrderCreatedV2:
    event_type: str = "OrderCreated"
    version: int = 2
    payload: dict = field(default_factory=lambda: {
        "customer_id": None,
        "shipping_address": None  # Added field
    })

# Upcaster
class EventUpcaster:
    def upcast(self, event):
        if event.version < 2:
            return self._upgrade_to_v2(event)
        return event

    def _upgrade_to_v2(self, event: OrderCreatedV1) -> OrderCreatedV2:
        # Build a new event instead of mutating the stored one;
        # the added field gets a default value
        return OrderCreatedV2(payload={**event.payload, "shipping_address": None})
Performance: Replaying many events can be slow
# Snapshotting for performance
import json

class SnapshotStore:
    def __init__(self, db):
        # Assumed: a DB-API connection that uses "?" placeholders, such as sqlite3
        self.db = db

    def save_snapshot(self, aggregate_id: str, version: int, state: dict):
        self.db.execute(
            "INSERT INTO snapshots VALUES (?, ?, ?)",
            (aggregate_id, version, json.dumps(state)),
        )

    def get_snapshot(self, aggregate_id: str) -> tuple[int, dict] | None:
        row = self.db.execute(
            "SELECT version, state FROM snapshots WHERE aggregate_id = ? "
            "ORDER BY version DESC LIMIT 1",
            (aggregate_id,),
        ).fetchone()
        if row:
            version, state = row
            return version, json.loads(state)
        return None
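A snapshot only saves work if loading then replays just the events recorded after it. The sketch below illustrates that, assuming events are `(version, event_type, amount)` tuples in ascending version order and a snapshot is a `(version, state)` pair. The `load_balance` function is hypothetical.

```python
from typing import Optional


def apply(balance: int, event_type: str, amount: int) -> int:
    if event_type == "DepositMade":
        return balance + amount
    if event_type == "WithdrawalMade":
        return balance - amount
    return balance


def load_balance(events, snapshot: Optional[tuple] = None):
    """Return (balance, version, replayed_count), starting from a snapshot if given."""
    version, balance = 0, 0
    if snapshot is not None:
        version, state = snapshot
        balance = state["balance"]
    replayed = 0
    for event_version, event_type, amount in events:
        if event_version <= version:
            continue  # already folded into the snapshot
        balance = apply(balance, event_type, amount)
        version = event_version
        replayed += 1
    return balance, version, replayed


events = [
    (1, "DepositMade", 50),
    (2, "DepositMade", 50),
    (3, "WithdrawalMade", 30),
    (4, "DepositMade", 10),
]
full = load_balance(events)
fast = load_balance(events, snapshot=(2, {"balance": 100}))
print(full)  # (80, 4, 4)
print(fast)  # (80, 4, 2)
```

Both calls reach the same balance and version, but the second replays half as many events. A real store would push the version filter into the query so the skipped events are never read.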
Tools and Frameworks
Event Sourcing
- Axon Framework: Java/Kotlin CQRS/ES framework
- EventStoreDB: Purpose-built event store
- Marten: .NET event sourcing on PostgreSQL
- AggregateSource: .NET ES library
Read Models
- Elasticsearch: Full-text search and analytics
- PostgreSQL: Relational read models
- Redis: Cached read models
- MongoDB: Document read models
Best Practices
Keep Events Immutable
# Wrong: Mutable event
class BadEvent:
    def __init__(self):
        self.value = 0

    def update(self, new_value):
        self.value = new_value  # Don't do this!

# Right: Immutable events
@dataclass(frozen=True)
class GoodEvent:
    old_value: int
    new_value: int
Design Events Carefully
# Good: Intent-focused event
class OrderSubmitted:
    order_id: str
    submitted_at: datetime
    total: Decimal

# Avoid: Implementation-focused event
class OrderTableUpdated:
    # What does this mean?
    table: str = "orders"
    operation: str = "UPDATE"
Handle Failures
# Idempotent event processing
class IdempotentEventHandler:
    def __init__(self, event_store):
        self.event_store = event_store
        self.processed_events = set()

    def handle(self, event: Event):
        # Check if already processed
        if event.event_id in self.processed_events:
            return
        # Process event
        self._process(event)
        # Mark as processed
        self.processed_events.add(event.event_id)
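An in-memory set of processed IDs is lost on restart, and a crash between processing and marking can apply an event twice. One way to close both gaps is to record the event ID in the same database transaction as the read-model update. The sketch below uses Python's standard `sqlite3` module. The table names and the `handle_order_created` function are hypothetical.

```python
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE customer_order_counts ("
        "customer_id TEXT PRIMARY KEY, orders INTEGER NOT NULL)"
    )


def handle_order_created(conn, event_id: str, customer_id: str) -> bool:
    """Apply the event at most once; return False if it was seen before."""
    try:
        with conn:  # marker row and read-model update commit or roll back together
            conn.execute("INSERT INTO processed_events VALUES (?)", (event_id,))
            conn.execute(
                "INSERT OR IGNORE INTO customer_order_counts VALUES (?, 0)",
                (customer_id,),
            )
            conn.execute(
                "UPDATE customer_order_counts SET orders = orders + 1 "
                "WHERE customer_id = ?",
                (customer_id,),
            )
    except sqlite3.IntegrityError:
        return False  # duplicate event_id: the event was already applied
    return True


conn = sqlite3.connect(":memory:")
create_schema(conn)
first = handle_order_created(conn, "evt_1", "cust_1")
second = handle_order_created(conn, "evt_1", "cust_1")  # redelivery
orders = conn.execute(
    "SELECT orders FROM customer_order_counts WHERE customer_id = ?", ("cust_1",)
).fetchone()[0]
print(first, second, orders)  # True False 1
```

This only works when the marker table and the read model live in the same database. If they do not, the handler's own update has to be idempotent by design.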
Conclusion
CQRS and Event Sourcing provide powerful patterns for building complex applications. CQRS enables optimized read and write models, while Event Sourcing provides complete audit trails and temporal queries.
These patterns add complexity, so adopt them only when the benefits justify it. Start with simpler approaches and evolve as requirements demand.
The combination works particularly well for event-driven microservices, audit-critical systems, and domains with complex state transitions. The initial investment pays dividends in flexibility and understanding.