Introduction
The Command Query Responsibility Segregation (CQRS) pattern is an architectural approach that separates read and write operations for a data store. While traditional CRUD applications treat reads and writes identically, CQRS recognizes that these operations have fundamentally different requirements, performance characteristics, and scaling needs.
CQRS enables you to optimize each side independently, using different data models, different storage technologies, and different scaling strategies for reads versus writes.
Understanding CQRS
The Problem with CRUD
┌──────────────────────────────────────────────────────┐
│            Traditional CRUD Architecture             │
├──────────────────────────────────────────────────────┤
│                                                      │
│     ┌──────────┐                                     │
│     │  Client  │                                     │
│     └────┬─────┘                                     │
│          │                                           │
│          ▼                                           │
│     ┌──────────┐                                     │
│     │ Service  │   Same model for reads and writes   │
│     └────┬─────┘                                     │
│          │                                           │
│          ▼                                           │
│     ┌──────────┐                                     │
│     │ Database │                                     │
│     └──────────┘                                     │
│                                                      │
│  Problems:                                           │
│  - Single model serves both read and write poorly    │
│  - Cannot scale reads and writes independently       │
│  - Complex queries impact write performance          │
│  - Read models often denormalized for performance    │
│                                                      │
└──────────────────────────────────────────────────────┘
CQRS Solution
┌──────────────────────────────────────────────────────┐
│                  CQRS Architecture                   │
├──────────────────────────────────────────────────────┤
│                                                      │
│                ┌─────────┐                           │
│                │ Client  │                           │
│                └────┬────┘                           │
│                     │                                │
│           ┌─────────┴─────────┐                      │
│           │                   │                      │
│           ▼                   ▼                      │
│      ┌─────────┐         ┌─────────┐                 │
│      │ Command │         │  Query  │                 │
│      │  Side   │         │  Side   │                 │
│      └────┬────┘         └────┬────┘                 │
│           │                   │                      │
│           ▼                   ▼                      │
│      ┌─────────┐         ┌─────────┐                 │
│      │  Write  │         │  Read   │                 │
│      │  Model  │         │  Model  │                 │
│      └────┬────┘         └────┬────┘                 │
│           │                   │                      │
│           ▼                   ▼                      │
│      ┌─────────┐         ┌─────────┐                 │
│      │  Write  │         │  Read   │                 │
│      │  Store  │         │  Store  │                 │
│      └─────────┘         └─────────┘                 │
│                                                      │
│  Benefits:                                           │
│  - Independent scaling                               │
│  - Optimized models for each operation               │
│  - Different data stores for different needs         │
│  - Better performance for complex reads              │
│                                                      │
└──────────────────────────────────────────────────────┘
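The split in the diagram can be reduced to a toy example. This sketch assumes in-memory dicts stand in for the write and read stores and that the read model is updated synchronously; the names `InMemoryCQRS`, `CreateOrder`, and `GetOrderSummary` are hypothetical and not part of any library.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CreateOrder:
    """Command: a request to change state."""
    order_id: str
    customer_name: str
    total: float


@dataclass(frozen=True)
class GetOrderSummary:
    """Query: a request for data that changes nothing."""
    order_id: str


class InMemoryCQRS:
    """Toy CQRS: one store for writes, another shaped for reads."""

    def __init__(self) -> None:
        self.write_store: dict[str, dict] = {}  # source of truth
        self.read_store: dict[str, str] = {}    # pre-rendered display strings

    def handle_command(self, cmd: CreateOrder) -> None:
        if cmd.order_id in self.write_store:
            raise ValueError(f"Order already exists: {cmd.order_id}")
        self.write_store[cmd.order_id] = {"customer_name": cmd.customer_name,
                                          "total": cmd.total}
        # Project the change into the read model (synchronously in this sketch)
        self.read_store[cmd.order_id] = f"{cmd.customer_name}: ${cmd.total:.2f}"

    def handle_query(self, query: GetOrderSummary) -> Optional[str]:
        # Queries touch only the read store
        return self.read_store.get(query.order_id)


demo = InMemoryCQRS()
demo.handle_command(CreateOrder("o-1", "Ada", 42.5))
print(demo.handle_query(GetOrderSummary("o-1")))  # Ada: $42.50
```

Commands return nothing and queries change nothing; that asymmetry is what lets each side keep its own model and, later, its own store.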
CQRS Implementation
Simple CQRS with Separate Models
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import uuid


# Write Model (normalized): the source of truth that commands change
@dataclass
class OrderEntity:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: float = 0.0
    status: str = "pending"
    shipping_address_id: str = ""
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
    version: int = 1  # incremented on every change


# Read Model (denormalized for display): precomputed fields, no joins at read time
@dataclass
class OrderView:
    order_id: str = ""
    customer_name: str = ""
    customer_email: str = ""
    items_summary: str = ""
    item_count: int = 0
    total_amount: float = 0.0
    status_display: str = ""
    shipping_address: str = ""
    can_cancel: bool = False
    can_modify: bool = False
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None


class CommandHandler:
    """Handles write operations.

    `db` is a generic data-access object whose collections expose
    insert/find_one/update/upsert; substitute your own persistence layer.
    """

    def __init__(self, db):
        self.db = db

    def create_order(self, customer_id: str, items: list[dict],
                     shipping_address_id: str) -> OrderEntity:
        """Create a new order."""
        total = sum(item['price'] * item['quantity'] for item in items)
        order = OrderEntity(
            customer_id=customer_id,
            items=items,
            total_amount=total,
            shipping_address_id=shipping_address_id,
        )
        # Save to write database
        self.db.orders.insert(order)
        # Update read model
        self._update_read_model(order)
        return order

    def update_order_status(self, order_id: str, new_status: str):
        """Update order status."""
        order = self.db.orders.find_one(id=order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        order.status = new_status
        order.updated_at = datetime.utcnow()
        order.version += 1
        self.db.orders.update(order)
        # Update read model
        self._update_read_model(order)

    def add_item(self, order_id: str, item: dict):
        """Add an item to an order that is still pending."""
        order = self.db.orders.find_one(id=order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        if order.status != "pending":
            raise ValueError("Cannot modify order not in pending status")
        order.items.append(item)
        order.total_amount += item['price'] * item['quantity']
        order.updated_at = datetime.utcnow()
        order.version += 1
        self.db.orders.update(order)
        self._update_read_model(order)

    def _update_read_model(self, order: OrderEntity):
        """Synchronously rebuild the read model for one order.

        Running inside the command keeps the view immediately consistent,
        at the cost of extra lookups on every write.
        """
        # Get customer info
        customer = self.db.customers.find_one(id=order.customer_id)
        # Build read model
        view = OrderView(
            order_id=order.id,
            customer_name=customer.name if customer else "",
            customer_email=customer.email if customer else "",
            items_summary=f"{len(order.items)} items",
            item_count=len(order.items),
            total_amount=order.total_amount,
            status_display=order.status.title(),
            shipping_address=self._format_address(order.shipping_address_id),
            can_cancel=order.status == "pending",
            can_modify=order.status == "pending",
            created_at=order.created_at,
            updated_at=order.updated_at,
        )
        self.db.order_views.upsert(view, key=view.order_id)

    def _format_address(self, address_id: str) -> str:
        address = self.db.addresses.find_one(id=address_id)
        if not address:
            return ""
        return f"{address.street}, {address.city}, {address.country}"


class QueryHandler:
    """Handles read operations against the denormalized read model."""

    def __init__(self, db):
        self.db = db

    def get_order(self, order_id: str) -> Optional[OrderView]:
        """Get order by ID (from read model)."""
        return self.db.order_views.find_one(order_id=order_id)

    def get_orders_by_customer(self, customer_id: str,
                               limit: int = 50) -> list[OrderView]:
        """Get customer's orders."""
        # OrderView stores the customer's email, not customer_id, so look the
        # email up first. This could query a separate read database.
        email = self._get_customer_email(customer_id)
        if not email:
            return []  # unknown customer: don't match views with an empty email
        return self.db.order_views.find(customer_email=email).limit(limit).all()

    def get_orders_by_status(self, status: str,
                             limit: int = 100) -> list[OrderView]:
        """Get orders by status."""
        # Efficient status queries on denormalized model
        return self.db.order_views.find(
            status_display=status.title()
        ).limit(limit).all()

    def get_order_summary(self) -> dict:
        """Get order statistics (count and total amount per status)."""
        return self.db.order_views.aggregate([
            {"$group": {
                "_id": "$status_display",
                "count": {"$sum": 1},
                "total_amount": {"$sum": "$total_amount"},
            }}
        ])

    def _get_customer_email(self, customer_id: str) -> str:
        customer = self.db.customers.find_one(id=customer_id)
        return customer.email if customer else ""
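`OrderEntity.version` is incremented on every change, but the handlers above never check it, so two concurrent updates to the same order can silently overwrite each other. One common use of such a counter is optimistic concurrency: reject a write when the stored version differs from the one the command loaded. A minimal in-memory sketch; `InMemoryOrderStore`, `VersionedOrder`, and `ConcurrencyError` are hypothetical names:

```python
from dataclasses import dataclass, replace


class ConcurrencyError(Exception):
    """Raised when a write is based on a stale version."""


@dataclass
class VersionedOrder:
    id: str
    status: str
    version: int


class InMemoryOrderStore:
    """Hypothetical write store enforcing optimistic concurrency."""

    def __init__(self) -> None:
        self._rows: dict[str, VersionedOrder] = {}

    def insert(self, order: VersionedOrder) -> None:
        self._rows[order.id] = replace(order)  # store a copy

    def get(self, order_id: str) -> VersionedOrder:
        return replace(self._rows[order_id])   # each caller gets its own copy

    def update(self, order: VersionedOrder, expected_version: int) -> None:
        current = self._rows[order.id]
        if current.version != expected_version:
            raise ConcurrencyError(
                f"Order {order.id}: expected v{expected_version}, "
                f"found v{current.version}")
        # Accept the write and bump the stored version
        self._rows[order.id] = replace(order, version=expected_version + 1)


store = InMemoryOrderStore()
store.insert(VersionedOrder("o-1", "pending", 1))
first, second = store.get("o-1"), store.get("o-1")  # two concurrent readers

first.status = "shipped"
store.update(first, expected_version=first.version)  # ok: stored version -> 2

second.status = "cancelled"
try:
    store.update(second, expected_version=second.version)
except ConcurrencyError as err:
    print(err)  # Order o-1: expected v1, found v2
```

Against a SQL write store the same check is usually written as `UPDATE orders SET ..., version = version + 1 WHERE id = %s AND version = %s`, treating an affected-row count of zero as a conflict.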
Asynchronous CQRS with Event Bus
import copy
import logging
import queue
import threading
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional

logger = logging.getLogger(__name__)


# Event definitions
@dataclass
class DomainEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    occurred_at: datetime = field(default_factory=datetime.utcnow)
    aggregate_id: str = ""


@dataclass
class OrderCreatedEvent(DomainEvent):
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: float = 0.0
    shipping_address_id: str = ""


@dataclass
class OrderStatusChangedEvent(DomainEvent):
    old_status: str = ""
    new_status: str = ""


class EventBus:
    """Simple in-memory event bus; handlers run on one background thread."""

    def __init__(self):
        self.subscribers: dict[type, list[Callable]] = {}
        self.event_queue: queue.Queue = queue.Queue()
        self.running = False
        self._thread: Optional[threading.Thread] = None

    def subscribe(self, event_type: type, handler: Callable):
        """Subscribe a handler to one exact event type (subclasses not matched)."""
        self.subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event: DomainEvent):
        """Enqueue an event; the worker thread delivers it later."""
        self.event_queue.put(event)

    def start(self):
        """Start processing events."""
        self.running = True
        self._thread = threading.Thread(target=self._process_events, daemon=True)
        self._thread.start()

    def _process_events(self):
        """Deliver queued events until stop() is called."""
        while self.running:
            try:
                event = self.event_queue.get(timeout=1)
            except queue.Empty:
                continue
            for handler in self.subscribers.get(type(event), []):
                try:
                    handler(event)
                except Exception:
                    # Log and continue so one failing projection doesn't block the rest
                    logger.exception("Error handling %s", type(event).__name__)

    def stop(self):
        """Stop processing; events still queued are not delivered."""
        self.running = False
        if self._thread is not None:
            self._thread.join()


class EventSourcingOrderRepository:
    """Repository with event sourcing: the event list is the source of truth."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.events: list[DomainEvent] = []
        self.snapshots: dict[str, OrderEntity] = {}

    def append(self, event: DomainEvent):
        """Append event to the stream, then publish it to projections."""
        self.events.append(event)
        self.event_bus.publish(event)

    def _stream(self, order_id: str) -> list[DomainEvent]:
        return [e for e in self.events if e.aggregate_id == order_id]

    def get_state(self, order_id: str) -> OrderEntity:
        """Reconstruct state from the latest snapshot plus newer events."""
        # Check snapshot first
        if order_id in self.snapshots:
            events = self._get_events_after_snapshot(order_id)
            return self._reconstruct_from_events(self.snapshots[order_id], events)
        # Reconstruct from all events
        return self._reconstruct_from_events(None, self._stream(order_id))

    def _get_events_after_snapshot(self, order_id: str) -> list[DomainEvent]:
        # snapshot.version counts the events already folded into the snapshot,
        # which is more reliable than comparing timestamps
        return self._stream(order_id)[self.snapshots[order_id].version:]

    def _reconstruct_from_events(self, initial: Optional[OrderEntity],
                                 events: list[DomainEvent]) -> OrderEntity:
        # Copy so that replaying never mutates a stored snapshot
        state = copy.deepcopy(initial) if initial is not None else OrderEntity(version=0)
        for event in events:
            if isinstance(event, OrderCreatedEvent):
                state.id = event.aggregate_id
                state.customer_id = event.customer_id
                state.items = list(event.items)
                state.total_amount = event.total_amount
                state.shipping_address_id = event.shipping_address_id
                state.status = "pending"
                state.created_at = event.occurred_at
            elif isinstance(event, OrderStatusChangedEvent):
                state.status = event.new_status
            state.updated_at = event.occurred_at
            state.version += 1  # one version per applied event
        return state

    def create_snapshot(self, order: OrderEntity):
        """Store a copy of state from get_state() for faster reconstruction."""
        self.snapshots[order.id] = copy.deepcopy(order)


# Projections for different read models
class OrderViewProjection:
    """Project events to order view model."""

    def __init__(self, read_db):
        self.read_db = read_db

    def handle(self, event: DomainEvent):
        """Handle events and update read model."""
        if isinstance(event, OrderCreatedEvent):
            self._handle_created(event)
        elif isinstance(event, OrderStatusChangedEvent):
            self._handle_status_changed(event)

    def _handle_created(self, event: OrderCreatedEvent):
        # Customer and address fields stay empty: the event doesn't carry them
        view = OrderView(
            order_id=event.aggregate_id,
            item_count=len(event.items),
            total_amount=event.total_amount,
            status_display="Pending",
            can_cancel=True,
            can_modify=True,
            created_at=event.occurred_at,
        )
        self.read_db.order_views.insert(view)

    def _handle_status_changed(self, event: OrderStatusChangedEvent):
        view = self.read_db.order_views.find_one(order_id=event.aggregate_id)
        if view:
            view.status_display = event.new_status.title()
            view.can_cancel = event.new_status == "pending"
            view.can_modify = event.new_status == "pending"
            view.updated_at = event.occurred_at
            self.read_db.order_views.update(view)
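Because `EventBus.publish` only enqueues, the read model lags behind the write side until the worker thread delivers the event. The sketch below makes that lag visible by replacing the background thread with an explicit `drain()` call; `DrainableEventBus`, `DictProjection`, and the trimmed event classes are hypothetical stand-ins for the classes above, and the read store is a plain dict.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable


@dataclass
class OrderCreated:
    order_id: str
    total_amount: float


@dataclass
class OrderStatusChanged:
    order_id: str
    new_status: str


class DrainableEventBus:
    """Single-threaded bus: publish() only enqueues, drain() delivers."""

    def __init__(self) -> None:
        self._pending: deque = deque()
        self._subscribers: dict[type, list[Callable]] = {}

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event: object) -> None:
        self._pending.append(event)

    def drain(self) -> int:
        """Deliver all queued events in order; return how many were delivered."""
        delivered = 0
        while self._pending:
            event = self._pending.popleft()
            for handler in self._subscribers.get(type(event), []):
                handler(event)
            delivered += 1
        return delivered


class DictProjection:
    """Maintains a read model: order_id -> view dict."""

    def __init__(self) -> None:
        self.views: dict[str, dict] = {}

    def on_created(self, e: OrderCreated) -> None:
        self.views[e.order_id] = {"total_amount": e.total_amount,
                                  "status_display": "Pending"}

    def on_status_changed(self, e: OrderStatusChanged) -> None:
        view = self.views.get(e.order_id)
        if view is not None:  # ignore events for orders the view never saw
            view["status_display"] = e.new_status.title()


bus = DrainableEventBus()
projection = DictProjection()
bus.subscribe(OrderCreated, projection.on_created)
bus.subscribe(OrderStatusChanged, projection.on_status_changed)

bus.publish(OrderCreated("o-1", 99.0))
bus.publish(OrderStatusChanged("o-1", "shipped"))
print(projection.views.get("o-1"))  # None: the read model has not caught up
bus.drain()
print(projection.views["o-1"])      # {'total_amount': 99.0, 'status_display': 'Shipped'}
```

With the threaded `EventBus` the same window exists, but its length depends on thread scheduling, so clients of an asynchronous read model should expect briefly stale or missing views.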
API Layer with CQRS
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional

app = FastAPI()


# Write models (commands)
class CreateOrderCommand(BaseModel):
    customer_id: str
    items: list[dict]
    shipping_address_id: str


class UpdateStatusCommand(BaseModel):
    # The order ID comes from the URL path, so the body carries only the change
    new_status: str


# Read models (DTOs)
class OrderDTO(BaseModel):
    order_id: str
    customer_name: str
    customer_email: str
    items_summary: str
    total_amount: float
    status_display: str
    can_cancel: bool
    can_modify: bool


# Dependency injection. `db` is assumed to be the data-access object created
# at application startup; it is not defined in this snippet.
def get_command_handler() -> CommandHandler:
    return CommandHandler(db)


def get_query_handler() -> QueryHandler:
    return QueryHandler(db)


# Plain `def` endpoints: FastAPI runs them in a thread pool, so the blocking
# database calls inside the handlers do not stall the event loop.
@app.post("/orders", response_model=OrderDTO, status_code=201)
def create_order(
    command: CreateOrderCommand,
    handler: CommandHandler = Depends(get_command_handler),
    queries: QueryHandler = Depends(get_query_handler),
):
    """Create a new order."""
    try:
        order = handler.create_order(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address_id=command.shipping_address_id,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    # Return the created order from the read model; this relies on
    # CommandHandler updating the view synchronously
    return queries.get_order(order.id)


@app.get("/orders/{order_id}", response_model=OrderDTO)
def get_order(
    order_id: str,
    handler: QueryHandler = Depends(get_query_handler),
):
    """Get order by ID."""
    order = handler.get_order(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return order


@app.get("/orders", response_model=list[OrderDTO])
def list_orders(
    customer_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 50,
    handler: QueryHandler = Depends(get_query_handler),
):
    """List orders filtered by customer or status (no filter returns an empty list)."""
    if customer_id:
        return handler.get_orders_by_customer(customer_id, limit)
    elif status:
        return handler.get_orders_by_status(status, limit)
    return []


@app.put("/orders/{order_id}/status")
def update_order_status(
    order_id: str,
    command: UpdateStatusCommand,
    handler: CommandHandler = Depends(get_command_handler),
):
    """Update order status."""
    try:
        handler.update_order_status(order_id, command.new_status)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {"success": True}
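`create_order` reads the new order back from the read model, which works only because `CommandHandler._update_read_model` runs synchronously. With the asynchronous event bus, the view may not exist yet. One option, sketched here as a framework-independent helper, is to poll the read model briefly; `wait_for_view` is a hypothetical name, and the timeout and interval defaults are arbitrary.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for_view(lookup: Callable[[str], Optional[T]], key: str,
                  timeout: float = 2.0, interval: float = 0.05) -> Optional[T]:
    """Poll a read-model lookup until it returns a value or `timeout` seconds pass.

    Returns None on timeout so the caller can choose a fallback response.
    """
    deadline = time.monotonic() + timeout
    while True:
        view = lookup(key)
        if view is not None:
            return view
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)


# Usage with a simulated read model that only sees the order on the 3rd lookup
calls = {"n": 0}


def lagging_lookup(order_id: str) -> Optional[dict]:
    calls["n"] += 1
    return {"order_id": order_id} if calls["n"] >= 3 else None


print(wait_for_view(lagging_lookup, "o-1", timeout=1.0, interval=0.01))
# {'order_id': 'o-1'}
```

In the endpoint, `wait_for_view(queries.get_order, order.id)` could replace the direct lookup, with a `None` result mapped to `202 Accepted` plus a `Location` header for `/orders/{order_id}`. Returning data from the command result instead avoids the read-back altogether.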
CQRS with Different Data Stores
Write Database: PostgreSQL
# PostgreSQL for normalized writes
from datetime import datetime
from typing import Optional

import psycopg2
import psycopg2.extras


class PostgresWriteRepository:
    """Normalized relational store for writes."""

    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)

    def create_order(self, order: OrderEntity) -> str:
        """Insert the order row and its item rows in one transaction."""
        # `with self.conn` commits on success and rolls back on error
        # (it does not close the connection)
        with self.conn, self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO orders (id, customer_id, total_amount, status,
                                    shipping_address_id, created_at, updated_at, version)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """, (order.id, order.customer_id, order.total_amount, order.status,
                  order.shipping_address_id, order.created_at, order.updated_at,
                  order.version))
            # Insert order items
            cur.executemany("""
                INSERT INTO order_items (order_id, product_id, quantity,
                                         price, created_at)
                VALUES (%s, %s, %s, %s, %s)
            """, [(order.id, item['product_id'], item['quantity'],
                   item['price'], datetime.utcnow()) for item in order.items])
        return order.id

    def get_order(self, order_id: str) -> Optional[OrderEntity]:
        """Load an order together with its items."""
        with self.conn, self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM orders WHERE id = %s", (order_id,))
            row = cur.fetchone()
            if not row:
                return None
            # Get items
            cur.execute("SELECT * FROM order_items WHERE order_id = %s", (order_id,))
            items = [dict(item) for item in cur.fetchall()]
        return OrderEntity(
            id=row['id'],
            customer_id=row['customer_id'],
            items=items,
            total_amount=row['total_amount'],
            status=row['status'],
            shipping_address_id=row['shipping_address_id'],
            created_at=row['created_at'],
            updated_at=row['updated_at'],
            version=row['version'],
        )
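The repository's key property is that the order row and its item rows are written in a single transaction. The sketch below shows the same two-table write using Python's built-in `sqlite3` module in place of PostgreSQL, so it runs without a server; the schema is an assumption inferred from the column lists in the `INSERT` statements above, and `create_order` here is a standalone function, not the repository method.

```python
import sqlite3
import uuid
from datetime import datetime, timezone

# Assumed schema, mirroring the columns used by PostgresWriteRepository
SCHEMA = """
CREATE TABLE orders (
    id TEXT PRIMARY KEY,
    customer_id TEXT NOT NULL,
    total_amount REAL NOT NULL,
    status TEXT NOT NULL,
    shipping_address_id TEXT,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    version INTEGER NOT NULL
);
CREATE TABLE order_items (
    order_id TEXT NOT NULL REFERENCES orders(id),
    product_id TEXT NOT NULL,
    quantity INTEGER NOT NULL,
    price REAL NOT NULL,
    created_at TEXT NOT NULL
);
"""


def create_order(conn: sqlite3.Connection, customer_id: str,
                 items: list[dict], shipping_address_id: str) -> str:
    """Insert an order and its items atomically; return the new order id."""
    order_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    total = sum(i["price"] * i["quantity"] for i in items)
    with conn:  # commits on success, rolls back if any statement raises
        conn.execute(
            "INSERT INTO orders VALUES (?, ?, ?, 'pending', ?, ?, ?, 1)",
            (order_id, customer_id, total, shipping_address_id, now, now))
        conn.executemany(
            "INSERT INTO order_items VALUES (?, ?, ?, ?, ?)",
            [(order_id, i["product_id"], i["quantity"], i["price"], now)
             for i in items])
    return order_id


conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript(SCHEMA)
oid = create_order(conn, "c-1", [{"product_id": "p-1", "quantity": 2, "price": 5.0}], "a-1")
print(conn.execute("SELECT total_amount, status FROM orders WHERE id = ?", (oid,)).fetchone())
# (10.0, 'pending')
```

If any item insert fails, the order row is rolled back too, so the write store never holds an order without its items.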
Read Database: MongoDB
# MongoDB for denormalized reads
from dataclasses import asdict
from typing import Optional

from pymongo import MongoClient


class MongoReadRepository:
    """Denormalized document store for reads."""

    def __init__(self, connection_string: str, database: str):
        self.client = MongoClient(connection_string)
        self.db = self.client[database]
        self.orders = self.db['order_views']

    def upsert_order_view(self, view: OrderView):
        """Insert or update the denormalized order view."""
        self.orders.update_one(
            {'order_id': view.order_id},
            {'$set': asdict(view)},
            upsert=True,
        )

    def get_order_view(self, order_id: str) -> Optional[OrderView]:
        """Get denormalized order view."""
        # Projection {'_id': 0} drops Mongo's internal _id so the document
        # maps directly onto OrderView
        doc = self.orders.find_one({'order_id': order_id}, {'_id': 0})
        return OrderView(**doc) if doc else None

    def get_customer_orders(self, customer_email: str,
                            limit: int = 50) -> list[OrderView]:
        """Get a customer's orders, newest first."""
        cursor = self.orders.find(
            {'customer_email': customer_email}, {'_id': 0}
        ).sort('created_at', -1).limit(limit)
        return [OrderView(**doc) for doc in cursor]
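`OrderView(**doc)` raises `TypeError` if a stored document contains a key the dataclass does not declare, which can happen once a projection starts writing a new field. A tolerant mapper that keeps only declared fields is one way to decouple readers from projection changes; `from_document` and the trimmed `OrderSummaryView` are hypothetical names for this sketch.

```python
from dataclasses import dataclass, fields
from typing import Any, Optional, Type, TypeVar

T = TypeVar("T")


def from_document(cls: Type[T], doc: Optional[dict[str, Any]]) -> Optional[T]:
    """Build dataclass `cls` from a stored document, ignoring undeclared keys."""
    if doc is None:
        return None
    known = {f.name for f in fields(cls)}  # fields() accepts a dataclass type
    return cls(**{k: v for k, v in doc.items() if k in known})


@dataclass
class OrderSummaryView:
    order_id: str = ""
    status_display: str = ""
    total_amount: float = 0.0


doc = {"_id": "abc123", "order_id": "o-1", "status_display": "Pending",
       "total_amount": 12.5, "loyalty_points": 3}  # a field added later
print(from_document(OrderSummaryView, doc))
# OrderSummaryView(order_id='o-1', status_display='Pending', total_amount=12.5)
```

The trade-off is that typos in field names are silently ignored, so this suits read models that evolve independently of their readers.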
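Read Database: Elasticsearch / OpenSearch
# Search store for complex queries. This uses the opensearch-py client for an
# OpenSearch cluster (OpenSearch is a fork of Elasticsearch); with Elasticsearch
# itself, use the official elasticsearch client, whose methods are similar but
# not identical.
from dataclasses import asdict
from typing import Optional

from opensearchpy import OpenSearch


class ElasticsearchReadRepository:
    """Search-optimized read store."""

    def __init__(self, hosts: list[str]):
        self.client = OpenSearch(hosts=hosts)
        self.index = 'orders'

    def index_order(self, order: OrderView, refresh: bool = False):
        """Index (insert or replace) an order view for search.

        refresh=True makes the document searchable immediately but is costly
        under load; otherwise it becomes visible after the next index refresh.
        """
        self.client.index(
            index=self.index,
            id=order.order_id,
            body=asdict(order),
            refresh=refresh,
        )

    def search_orders(self, query: str, filters: Optional[dict] = None,
                      size: int = 100) -> list[dict]:
        """Full-text search on orders, optionally narrowed by exact-match filters."""
        body = {
            'query': {
                'bool': {
                    'must': [
                        {'multi_match': {
                            'query': query,
                            'fields': ['customer_name', 'customer_email',
                                       'items_summary'],
                        }}
                    ]
                }
            },
            'size': size,
        }
        if filters:
            # Filter on keyword fields (e.g. 'status_display.keyword' under the
            # default dynamic mapping); a term query on an analyzed text field
            # usually matches nothing.
            body['query']['bool']['filter'] = [
                {'term': {k: v}} for k, v in filters.items()
            ]
        response = self.client.search(index=self.index, body=body)
        return [hit['_source'] for hit in response['hits']['hits']]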
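Building the request body in a pure function, separate from the client call, lets the query shape be unit-tested without a running cluster. This sketch reproduces the body that `search_orders` constructs; `build_order_search` is a hypothetical name, and the `.keyword` field in the example assumes the default dynamic mapping.

```python
from typing import Any, Optional


def build_order_search(query: str, filters: Optional[dict[str, Any]] = None,
                       size: int = 100) -> dict[str, Any]:
    """Bool query: a scored full-text 'must' clause plus exact-match 'filter' clauses."""
    bool_query: dict[str, Any] = {
        "must": [{
            "multi_match": {
                "query": query,
                "fields": ["customer_name", "customer_email", "items_summary"],
            }
        }]
    }
    if filters:
        # Filter clauses don't affect relevance scores and can be cached
        bool_query["filter"] = [{"term": {k: v}} for k, v in filters.items()]
    return {"query": {"bool": bool_query}, "size": size}


body = build_order_search("ada", {"status_display.keyword": "Pending"}, size=10)
print(body["query"]["bool"]["filter"])
# [{'term': {'status_display.keyword': 'Pending'}}]
```

The repository method could then call `self.client.search(index=self.index, body=build_order_search(...))`.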
CQRS with Materialized Views
class MaterializedViewUpdater:
    """Recompute aggregate views from the write store into the read store.

    write_db is a DB-API connection to PostgreSQL (e.g. psycopg2); read_db is a
    read-store object assumed to offer upsert(record, key=...).
    """

    def __init__(self, write_db, read_db):
        self.write_db = write_db
        self.read_db = read_db

    def refresh_customer_order_summary(self, customer_id: str):
        """Refresh aggregated customer statistics."""
        # Query from write database
        with self.write_db.cursor() as cur:
            cur.execute("""
                SELECT
                    customer_id,
                    COUNT(*) AS total_orders,
                    SUM(total_amount) AS total_spent,
                    AVG(total_amount) AS avg_order_value,
                    MIN(created_at) AS first_order_date,
                    MAX(created_at) AS last_order_date
                FROM orders
                WHERE customer_id = %s
                GROUP BY customer_id
            """, (customer_id,))
            row = cur.fetchone()
        if row:
            # Update read database
            self.read_db.customer_summaries.upsert({
                'customer_id': row[0],
                'total_orders': row[1],
                'total_spent': row[2],
                'avg_order_value': row[3],
                'first_order_date': row[4],
                'last_order_date': row[5],
            }, key='customer_id')

    def refresh_daily_sales(self, date: str):
        """Refresh the sales aggregation for one day (date as 'YYYY-MM-DD')."""
        # A half-open range on created_at, rather than applying DATE() to the
        # column in the WHERE clause, lets PostgreSQL use an index on created_at.
        with self.write_db.cursor() as cur:
            cur.execute("""
                SELECT
                    DATE(created_at) AS date,
                    status,
                    COUNT(*) AS order_count,
                    SUM(total_amount) AS revenue
                FROM orders
                WHERE created_at >= %s::date
                  AND created_at < %s::date + INTERVAL '1 day'
                GROUP BY DATE(created_at), status
            """, (date, date))
            rows = cur.fetchall()
        for row in rows:
            self.read_db.daily_sales.upsert({
                'date': row[0],
                'status': row[1],
                'order_count': row[2],
                'revenue': row[3],
            }, key=['date', 'status'])
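The refresh logic can be exercised end to end with Python's built-in `sqlite3` module standing in for both stores; SQLite 3.24+ supports the same `INSERT ... ON CONFLICT ... DO UPDATE` upsert syntax as PostgreSQL. In this sketch the `customer_summaries` table plays the role of the hypothetical `read_db.customer_summaries`, the table layouts are assumptions, and, unlike the class above, a customer with no remaining orders has the stale summary deleted.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (
    id TEXT PRIMARY KEY, customer_id TEXT, total_amount REAL, created_at TEXT
);
CREATE TABLE customer_summaries (
    customer_id TEXT PRIMARY KEY, total_orders INTEGER, total_spent REAL,
    avg_order_value REAL, first_order_date TEXT, last_order_date TEXT
);
""")


def refresh_customer_summary(conn: sqlite3.Connection, customer_id: str) -> None:
    """Recompute one customer's aggregates and upsert them into the summary table."""
    row = conn.execute("""
        SELECT customer_id, COUNT(*), SUM(total_amount), AVG(total_amount),
               MIN(created_at), MAX(created_at)
        FROM orders WHERE customer_id = ? GROUP BY customer_id
    """, (customer_id,)).fetchone()
    with conn:
        if row is None:
            # No orders left: remove the stale summary instead of keeping it
            conn.execute("DELETE FROM customer_summaries WHERE customer_id = ?",
                         (customer_id,))
            return
        conn.execute("""
            INSERT INTO customer_summaries VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT(customer_id) DO UPDATE SET
                total_orders = excluded.total_orders,
                total_spent = excluded.total_spent,
                avg_order_value = excluded.avg_order_value,
                first_order_date = excluded.first_order_date,
                last_order_date = excluded.last_order_date
        """, row)


conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", [
    ("o-1", "c-1", 10.0, "2024-01-05"),
    ("o-2", "c-1", 30.0, "2024-02-10"),
])
refresh_customer_summary(conn, "c-1")   # first refresh inserts the summary
conn.execute("INSERT INTO orders VALUES ('o-3', 'c-1', 20.0, '2024-03-01')")
refresh_customer_summary(conn, "c-1")   # second refresh takes the UPDATE path
print(conn.execute("SELECT * FROM customer_summaries").fetchall())
# [('c-1', 3, 60.0, 20.0, '2024-01-05', '2024-03-01')]
```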
CQRS Benefits and Trade-offs
Benefits
| Benefit | Description |
|---|---|
| Independent Scaling | Scale read and write infrastructure separately |
| Optimized Models | Different models for different purposes |
| Performance | Optimized read queries without impacting writes |
| Flexibility | Different data stores for different needs |
| Complexity Management | Each model stays focused: the write model enforces business rules, read models only shape data for queries |
Trade-offs
| Trade-off | Description |
|---|---|
| Complexity | More complex than simple CRUD |
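| Consistency | Read models updated asynchronously are eventually consistent, so queries can briefly return stale data |
| Learning Curve | Teams must understand commands, queries, projections, and eventual consistency |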
| Overhead | More infrastructure to manage |
When to Use CQRS
- Complex read queries that impact write performance
- Different read and write scaling requirements
- Event-driven architectures
- Systems requiring multiple read representations
- High-throughput write workloads
- Teams working on reads and writes separately
Conclusion
CQRS is a powerful pattern for building systems with different read and write requirements. By separating these concerns, you can optimize each side independently, use appropriate technologies for each workload, and scale more effectively.
Key takeaways:
- Use CQRS when reads and writes have different requirements
- Consider event sourcing for complex domain models
- Use separate data stores when each workload benefits from a different storage technology
- Design read paths to tolerate eventual consistency when read models update asynchronously
- Start simple and evolve as needed