CQRS Pattern: Command Query Responsibility Segregation

Introduction

The Command Query Responsibility Segregation (CQRS) pattern is an architectural approach that separates read and write operations for a data store. Traditional CRUD applications serve reads and writes through the same model, whereas CQRS recognizes that the two kinds of operation often have different requirements, performance characteristics, and scaling needs.

CQRS enables you to optimize each side independently, using different data models, different storage technologies, and different scaling strategies for reads versus writes.

Understanding CQRS

The Problem with CRUD

Traditional CRUD Architecture

    ┌─────────┐
    │ Client  │
    └────┬────┘
         │
         ▼
    ┌─────────┐
    │ Service │  Same model for reads and writes
    └────┬────┘
         │
         ▼
    ┌─────────┐
    │Database │
    └─────────┘

Problems:
  - Single model serves both read and write poorly
  - Cannot scale reads and writes independently
  - Complex queries impact write performance
  - Read models often denormalized for performance

CQRS Solution

CQRS Architecture

      ┌─────────┐
      │ Client  │
      └────┬────┘
           │
     ┌─────┴─────┐
     │           │
     ▼           ▼
 ┌───────┐   ┌───────┐
 │Command│   │ Query │
 │ Side  │   │ Side  │
 └───┬───┘   └───┬───┘
     │           │
     ▼           ▼
 ┌───────┐   ┌───────┐
 │ Write │   │ Read  │
 │ Model │   │ Model │
 └───┬───┘   └───┬───┘
     │           │
     ▼           ▼
 ┌───────┐   ┌───────┐
 │ Write │   │ Read  │
 │ Store │   │ Store │
 └───────┘   └───────┘

Benefits:
  - Independent scaling
  - Optimized models for each operation
  - Different data stores for different needs
  - Better performance for complex reads

CQRS Implementation

Simple CQRS with Separate Models

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import uuid

# Write Model (normalized)
@dataclass
class OrderEntity:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: float = 0.0
    status: str = "pending"
    shipping_address_id: str = ""
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
    version: int = 1

# Read Model (denormalized for display)
@dataclass
class OrderView:
    order_id: str = ""
    customer_name: str = ""
    customer_email: str = ""
    items_summary: str = ""
    item_count: int = 0
    total_amount: float = 0.0
    status_display: str = ""
    shipping_address: str = ""
    can_cancel: bool = False
    can_modify: bool = False
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None


class CommandHandler:
    """Handles write operations."""
    
    def __init__(self, db):
        self.db = db
    
    def create_order(self, customer_id: str, items: list[dict], 
                    shipping_address_id: str) -> OrderEntity:
        """Create a new order."""
        total = sum(item['price'] * item['quantity'] for item in items)
        
        order = OrderEntity(
            customer_id=customer_id,
            items=items,
            total_amount=total,
            shipping_address_id=shipping_address_id,
        )
        
        # Save to write database
        self.db.orders.insert(order)
        
        # Update read model
        self._update_read_model(order)
        
        return order
    
    def update_order_status(self, order_id: str, new_status: str):
        """Update order status."""
        order = self.db.orders.find_one(id=order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        
        order.status = new_status
        order.updated_at = datetime.utcnow()
        order.version += 1
        
        self.db.orders.update(order)
        
        # Update read model
        self._update_read_model(order)
    
    def add_item(self, order_id: str, item: dict):
        """Add item to order."""
        order = self.db.orders.find_one(id=order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        
        if order.status != "pending":
            raise ValueError("Cannot modify order not in pending status")
        
        order.items.append(item)
        order.total_amount += item['price'] * item['quantity']
        order.updated_at = datetime.utcnow()
        order.version += 1
        
        self.db.orders.update(order)
        self._update_read_model(order)
    
    def _update_read_model(self, order: OrderEntity):
        """Synchronously update read model."""
        # Get customer info
        customer = self.db.customers.find_one(id=order.customer_id)
        
        # Build read model
        view = OrderView(
            order_id=order.id,
            customer_name=customer.name if customer else "",
            customer_email=customer.email if customer else "",
            items_summary=f"{len(order.items)} items",
            item_count=len(order.items),
            total_amount=order.total_amount,
            status_display=order.status.title(),
            shipping_address=self._format_address(order.shipping_address_id),
            can_cancel=order.status == "pending",
            can_modify=order.status == "pending",
            created_at=order.created_at,
            updated_at=order.updated_at,
        )
        
        self.db.order_views.upsert(view, key=view.order_id)
    
    def _format_address(self, address_id: str) -> str:
        address = self.db.addresses.find_one(id=address_id)
        if not address:
            return ""
        return f"{address.street}, {address.city}, {address.country}"


class QueryHandler:
    """Handles read operations."""
    
    def __init__(self, db):
        self.db = db
    
    def get_order(self, order_id: str) -> Optional[OrderView]:
        """Get order by ID (from read model)."""
        return self.db.order_views.find_one(order_id=order_id)
    
    def get_orders_by_customer(self, customer_id: str, 
                               limit: int = 50) -> list[OrderView]:
        """Get customer's orders."""
        # This could query a separate read database
        return self.db.order_views.find(
            customer_email=self._get_customer_email(customer_id)
        ).limit(limit).all()
    
    def get_orders_by_status(self, status: str, 
                            limit: int = 100) -> list[OrderView]:
        """Get orders by status."""
        # Efficient status queries on denormalized model
        return self.db.order_views.find(
            status_display=status.title()
        ).limit(limit).all()
    
    def get_order_summary(self) -> dict:
        """Get order statistics."""
        return self.db.order_views.aggregate([
            {"$group": {
                "_id": "$status_display",
                "count": {"$sum": 1},
                "total_amount": {"$sum": "$total_amount"}
            }}
        ])
    
    def _get_customer_email(self, customer_id: str) -> str:
        customer = self.db.customers.find_one(id=customer_id)
        return customer.email if customer else ""
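
The write model above carries a `version` field that the handlers increment but never check. One common use for such a field is optimistic concurrency control: a write is rejected when the stored version no longer matches the version the caller last read. The following is a minimal sketch under that assumption, using an in-memory store; `InMemoryOrderStore`, `ConcurrencyError`, and the `expected_version` parameter are hypothetical names that do not appear in the code above.

```python
from dataclasses import dataclass


class ConcurrencyError(Exception):
    """Raised when the stored version differs from the caller's expected version."""


@dataclass
class Order:
    id: str
    status: str = "pending"
    version: int = 1


class InMemoryOrderStore:
    """Hypothetical stand-in for the write database."""

    def __init__(self):
        self._orders: dict[str, Order] = {}

    def insert(self, order: Order) -> None:
        self._orders[order.id] = order

    def get(self, order_id: str) -> Order:
        return self._orders[order_id]

    def update_status(self, order_id: str, new_status: str,
                      expected_version: int) -> Order:
        order = self._orders[order_id]
        # Reject the write if another writer updated the order first.
        if order.version != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, found {order.version}"
            )
        order.status = new_status
        order.version += 1
        return order


store = InMemoryOrderStore()
store.insert(Order(id="o-1"))
store.update_status("o-1", "paid", expected_version=1)  # succeeds, version becomes 2

try:
    # A second writer still holds version 1, so this write is rejected.
    store.update_status("o-1", "cancelled", expected_version=1)
except ConcurrencyError as exc:
    conflict = str(exc)
```

In a relational write store the same check is usually expressed as a conditional update (for example, matching on both the ID and the version in the `WHERE` clause) so that the comparison and the write happen atomically.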

Asynchronous CQRS with Event Bus

from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional
import threading
import queue

# OrderEntity and OrderView are the write and read models from the previous example.

# Event definitions
@dataclass
class DomainEvent:
    event_id: str = ""
    occurred_at: datetime = field(default_factory=datetime.utcnow)
    aggregate_id: str = ""


@dataclass
class OrderCreatedEvent(DomainEvent):
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: float = 0.0
    shipping_address_id: str = ""


@dataclass
class OrderStatusChangedEvent(DomainEvent):
    old_status: str = ""
    new_status: str = ""


class EventBus:
    """Simple in-memory event bus."""
    
    def __init__(self):
        self.subscribers: dict[type, list[Callable]] = {}
        self.event_queue = queue.Queue()
        self.running = False
    
    def subscribe(self, event_type: type, handler: Callable):
        """Subscribe to events."""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
    
    def publish(self, event: DomainEvent):
        """Publish event."""
        self.event_queue.put(event)
    
    def start(self):
        """Start processing events."""
        self.running = True
        thread = threading.Thread(target=self._process_events)
        thread.daemon = True
        thread.start()
    
    def _process_events(self):
        """Process events from queue."""
        while self.running:
            try:
                event = self.event_queue.get(timeout=1)
                handlers = self.subscribers.get(type(event), [])
                
                for handler in handlers:
                    try:
                        handler(event)
                    except Exception as e:
                        print(f"Error handling event: {e}")
                        
            except queue.Empty:
                continue
    
    def stop(self):
        """Stop processing."""
        self.running = False


class EventSourcingOrderRepository:
    """Repository with event sourcing."""
    
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.events: list[DomainEvent] = []
        self.snapshots: dict[str, OrderEntity] = {}
    
    def append(self, event: DomainEvent):
        """Append event to stream."""
        self.events.append(event)
        self.event_bus.publish(event)
    
    def get_state(self, order_id: str) -> OrderEntity:
        """Reconstruct state from events."""
        # Check snapshot first
        if order_id in self.snapshots:
            events = self._get_events_after_snapshot(order_id)
            return self._reconstruct_from_events(self.snapshots[order_id], events)
        
        # Reconstruct from all events
        events = [e for e in self.events if e.aggregate_id == order_id]
        return self._reconstruct_from_events(None, events)
    
    def _get_events_after_snapshot(self, order_id: str) -> list[DomainEvent]:
        snapshot = self.snapshots[order_id]
        return [e for e in self.events 
                if e.aggregate_id == order_id 
                and e.occurred_at > snapshot.updated_at]
    
    def _reconstruct_from_events(self, initial: Optional[OrderEntity], 
                                  events: list[DomainEvent]) -> OrderEntity:
        state = initial or OrderEntity()
        
        for event in events:
            if isinstance(event, OrderCreatedEvent):
                state.id = event.aggregate_id
                state.customer_id = event.customer_id
                state.items = event.items
                state.total_amount = event.total_amount
                state.shipping_address_id = event.shipping_address_id
                state.status = "pending"
                
            elif isinstance(event, OrderStatusChangedEvent):
                state.status = event.new_status
                state.updated_at = event.occurred_at
        
        return state
    
    def create_snapshot(self, order: OrderEntity):
        """Create snapshot for faster reconstruction."""
        self.snapshots[order.id] = order


# Projections for different read models
class OrderViewProjection:
    """Project events to order view model."""
    
    def __init__(self, read_db):
        self.read_db = read_db
    
    def handle(self, event: DomainEvent):
        """Handle events and update read model."""
        if isinstance(event, OrderCreatedEvent):
            self._handle_created(event)
        elif isinstance(event, OrderStatusChangedEvent):
            self._handle_status_changed(event)
    
    def _handle_created(self, event: OrderCreatedEvent):
        view = OrderView(
            order_id=event.aggregate_id,
            item_count=len(event.items),
            total_amount=event.total_amount,
            status_display="Pending",
            can_cancel=True,
            can_modify=True,
            created_at=event.occurred_at,
        )
        self.read_db.order_views.insert(view)
    
    def _handle_status_changed(self, event: OrderStatusChangedEvent):
        view = self.read_db.order_views.find_one(order_id=event.aggregate_id)
        if view:
            view.status_display = event.new_status.title()
            view.can_cancel = event.new_status == "pending"
            view.can_modify = event.new_status == "pending"
            view.updated_at = event.occurred_at
            self.read_db.order_views.update(view)
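
Because a projection derives the read model entirely from events, the read model can be discarded and rebuilt by replaying the event log through the same projection logic. The sketch below illustrates that idea with simplified, hypothetical event classes (`OrderCreated`, `StatusChanged`) and a plain dictionary standing in for the read database; it is not the projection class above, only a reduced version of the same technique.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderCreated:
    order_id: str
    total_amount: float


@dataclass(frozen=True)
class StatusChanged:
    order_id: str
    new_status: str


def apply_event(views: dict[str, dict], event) -> None:
    """Apply one event to the in-memory read model."""
    if isinstance(event, OrderCreated):
        views[event.order_id] = {
            "total_amount": event.total_amount,
            "status_display": "Pending",
            "can_cancel": True,
        }
    elif isinstance(event, StatusChanged):
        view = views.get(event.order_id)
        if view is not None:
            view["status_display"] = event.new_status.title()
            view["can_cancel"] = event.new_status == "pending"


def rebuild_views(events: list) -> dict[str, dict]:
    """Rebuild the whole read model from scratch by replaying events in order."""
    views: dict[str, dict] = {}
    for event in events:
        apply_event(views, event)
    return views


event_log = [
    OrderCreated("o-1", 40.0),
    OrderCreated("o-2", 15.5),
    StatusChanged("o-1", "shipped"),
]
views = rebuild_views(event_log)
```

Replaying the same log always yields the same views, which is what makes it safe to add a new read model later or to repair a corrupted one.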

API Layer with CQRS

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Write models (commands)
class CreateOrderCommand(BaseModel):
    customer_id: str
    items: list[dict]
    shipping_address_id: str

class UpdateStatusCommand(BaseModel):
    # The order ID comes from the URL path, so the body carries only the new status.
    new_status: str

# Read models (DTOs)
class OrderDTO(BaseModel):
    order_id: str
    customer_name: str
    customer_email: str
    items_summary: str
    total_amount: float
    status_display: str
    can_cancel: bool
    can_modify: bool


# Dependency injection
# `db` is assumed to be a database handle created at application startup.
def get_command_handler():
    return CommandHandler(db)

def get_query_handler():
    return QueryHandler(db)


@app.post("/orders", response_model=OrderDTO)
async def create_order(
    command: CreateOrderCommand,
    handler: CommandHandler = Depends(get_command_handler)
):
    """Create a new order."""
    try:
        order = handler.create_order(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address_id=command.shipping_address_id,
        )
        
        # Return the created order
        query_handler = get_query_handler()
        return query_handler.get_order(order.id)
        
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/orders/{order_id}", response_model=OrderDTO)
async def get_order(
    order_id: str,
    handler: QueryHandler = Depends(get_query_handler)
):
    """Get order by ID."""
    order = handler.get_order(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return order


@app.get("/orders", response_model=list[OrderDTO])
async def list_orders(
    customer_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 50,
    handler: QueryHandler = Depends(get_query_handler)
):
    """List orders with filters."""
    if customer_id:
        return handler.get_orders_by_customer(customer_id, limit)
    elif status:
        return handler.get_orders_by_status(status, limit)
    return []


@app.put("/orders/{order_id}/status")
async def update_order_status(
    order_id: str,
    command: UpdateStatusCommand,
    handler: CommandHandler = Depends(get_command_handler)
):
    """Update order status."""
    try:
        handler.update_order_status(order_id, command.new_status)
        return {"success": True}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

CQRS with Different Data Stores

Write Database: PostgreSQL

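# PostgreSQL for normalized writes
from datetime import datetime
from typing import Optional

import psycopg2
import psycopg2.extras  # provides RealDictCursor, used in get_order


class PostgresWriteRepository:
    """Normalized relational store for writes."""
    
    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)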
    
    def create_order(self, order: OrderEntity) -> str:
        """Insert normalized order."""
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO orders (id, customer_id, total_amount, status, 
                                  shipping_address_id, created_at, updated_at, version)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (order.id, order.customer_id, order.total_amount, order.status,
                  order.shipping_address_id, order.created_at, order.updated_at, 
                  order.version))
            
            # Insert order items
            for item in order.items:
                cur.execute("""
                    INSERT INTO order_items (order_id, product_id, quantity, 
                                           price, created_at)
                    VALUES (%s, %s, %s, %s, %s)
                """, (order.id, item['product_id'], item['quantity'],
                      item['price'], datetime.utcnow()))
            
            self.conn.commit()
            return order.id
    
    def get_order(self, order_id: str) -> Optional[OrderEntity]:
        """Get order with items."""
        with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT * FROM orders WHERE id = %s
            """, (order_id,))
            
            row = cur.fetchone()
            if not row:
                return None
            
            # Get items
            cur.execute("SELECT * FROM order_items WHERE order_id = %s", (order_id,))
            items = [dict(row) for row in cur.fetchall()]
            
            return OrderEntity(
                id=row['id'],
                customer_id=row['customer_id'],
                items=items,
                total_amount=row['total_amount'],
                status=row['status'],
                shipping_address_id=row['shipping_address_id'],
                created_at=row['created_at'],
                updated_at=row['updated_at'],
                version=row['version'],
            )

Read Database: MongoDB

# MongoDB for denormalized reads
from dataclasses import asdict
from typing import Optional

from pymongo import MongoClient


class MongoReadRepository:
    """Denormalized document store for reads."""
    
    def __init__(self, connection_string: str, database: str):
        self.client = MongoClient(connection_string)
        self.db = self.client[database]
        self.orders = self.db['order_views']
    
    def upsert_order_view(self, view: OrderView):
        """Upsert denormalized order view."""
        self.orders.update_one(
            {'order_id': view.order_id},
            {'$set': asdict(view)},
            upsert=True
        )
    
    def get_order_view(self, order_id: str) -> Optional[OrderView]:
        """Get denormalized order view."""
        doc = self.orders.find_one({'order_id': order_id})
        if doc:
            doc.pop('_id', None)
            return OrderView(**doc)
        return None
    
    def get_customer_orders(self, customer_email: str, 
                           limit: int = 50) -> list[OrderView]:
        """Get customer's orders."""
        cursor = self.orders.find(
            {'customer_email': customer_email}
        ).sort('created_at', -1).limit(limit)
        
        return [OrderView(**{k: v for k, v in doc.items() if k != '_id'}) 
                for doc in cursor]

Read Database: Elasticsearch

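# Search engine for complex search queries. This example uses the OpenSearch
# Python client (opensearch-py); the Elasticsearch client exposes a similar API.
from dataclasses import asdict

from opensearchpy import OpenSearch


class ElasticsearchReadRepository:
    """Search-optimized read store."""
    
    def __init__(self, hosts: list[str]):
        self.client = OpenSearch(hosts=hosts)
        self.index = 'orders'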
    
    def index_order(self, order: OrderView):
        """Index order for search."""
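        # refresh=True makes the document searchable immediately,
        # but forcing a refresh on every write is expensive under load.
        self.client.index(
            index=self.index,
            id=order.order_id,
            body=asdict(order),
            refresh=True
        )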
    
    def search_orders(self, query: str, filters: dict = None, 
                     size: int = 100) -> list[dict]:
        """Full-text search on orders."""
        body = {
            'query': {
                'bool': {
                    'must': [
                        {'multi_match': {
                            'query': query,
                            'fields': ['customer_name', 'customer_email', 
                                      'items_summary']
                        }}
                    ]
                }
            },
            'size': size,
        }
        
        if filters:
            body['query']['bool']['filter'] = [
                {'term': {k: v}} for k, v in filters.items()
            ]
        
        response = self.client.search(index=self.index, body=body)
        
        return [hit['_source'] for hit in response['hits']['hits']]
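
The request body built inside `search_orders` can also be produced by a pure function, which makes the query logic testable without a running cluster. The sketch below assumes the same field names as the code above; `build_order_search_body` is a hypothetical helper, not part of any client library.

```python
from typing import Optional


def build_order_search_body(query: str, filters: Optional[dict] = None,
                            size: int = 100) -> dict:
    """Build the search request body without contacting the cluster."""
    bool_query: dict = {
        "must": [
            {"multi_match": {
                "query": query,
                "fields": ["customer_name", "customer_email", "items_summary"],
            }}
        ]
    }
    if filters:
        # Term filters match exact values and do not affect relevance scoring.
        bool_query["filter"] = [{"term": {k: v}} for k, v in filters.items()]
    return {"query": {"bool": bool_query}, "size": size}


body = build_order_search_body("alice", {"status_display": "Pending"}, size=10)
```

The resulting dictionary can be passed as the `body` argument of the client's `search` call, as in the repository class above.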

CQRS with Materialized Views

class MaterializedViewUpdater:
    """Update materialized views for complex queries."""
    
    def __init__(self, write_db, read_db):
        self.write_db = write_db
        self.read_db = read_db
    
    def refresh_customer_order_summary(self, customer_id: str):
        """Refresh aggregated customer statistics."""
        # Query from write database
        with self.write_db.cursor() as cur:
            cur.execute("""
                SELECT 
                    customer_id,
                    COUNT(*) as total_orders,
                    SUM(total_amount) as total_spent,
                    AVG(total_amount) as avg_order_value,
                    MIN(created_at) as first_order_date,
                    MAX(created_at) as last_order_date
                FROM orders
                WHERE customer_id = %s
                GROUP BY customer_id
            """, (customer_id,))
            
            row = cur.fetchone()
            if row:
                # Update read database
                self.read_db.customer_summaries.upsert({
                    'customer_id': row[0],
                    'total_orders': row[1],
                    'total_spent': row[2],
                    'avg_order_value': row[3],
                    'first_order_date': row[4],
                    'last_order_date': row[5],
                }, key='customer_id')
    
    def refresh_daily_sales(self, date: str):
        """Refresh daily sales aggregation."""
        with self.write_db.cursor() as cur:
            cur.execute("""
                SELECT 
                    DATE(created_at) as date,
                    status,
                    COUNT(*) as order_count,
                    SUM(total_amount) as revenue
                FROM orders
                WHERE DATE(created_at) = %s
                GROUP BY DATE(created_at), status
            """, (date,))
            
            for row in cur.fetchall():
                self.read_db.daily_sales.upsert({
                    'date': row[0],
                    'status': row[1],
                    'order_count': row[2],
                    'revenue': row[3],
                }, key=['date', 'status'])
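
To try the refresh idea without a PostgreSQL server, the same aggregate-then-upsert step can be run against SQLite from the Python standard library. This is a sketch under that substitution: SQLite stands in for both the write and read databases, the schema is reduced to three columns, and `refresh_customer_summary` is a hypothetical function name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (
        id TEXT PRIMARY KEY,
        customer_id TEXT NOT NULL,
        total_amount REAL NOT NULL
    );
    CREATE TABLE customer_summaries (
        customer_id TEXT PRIMARY KEY,
        total_orders INTEGER NOT NULL,
        total_spent REAL NOT NULL
    );
""")
conn.executemany(
    "INSERT INTO orders (id, customer_id, total_amount) VALUES (?, ?, ?)",
    [("o-1", "c-1", 20.0), ("o-2", "c-1", 30.0), ("o-3", "c-2", 5.0)],
)


def refresh_customer_summary(conn: sqlite3.Connection, customer_id: str) -> None:
    """Recompute one customer's summary row from the orders table."""
    conn.execute(
        """
        INSERT OR REPLACE INTO customer_summaries
            (customer_id, total_orders, total_spent)
        SELECT customer_id, COUNT(*), SUM(total_amount)
        FROM orders
        WHERE customer_id = ?
        GROUP BY customer_id
        """,
        (customer_id,),
    )
    conn.commit()


refresh_customer_summary(conn, "c-1")
summary = conn.execute(
    "SELECT total_orders, total_spent FROM customer_summaries WHERE customer_id = ?",
    ("c-1",),
).fetchone()
```

Because the summary row is replaced rather than appended, running the refresh repeatedly for the same customer leaves a single up-to-date row.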

CQRS Benefits and Trade-offs

Benefits

| Benefit | Description |
|---|---|
| Independent Scaling | Scale read and write infrastructure separately |
| Optimized Models | Different models for different purposes |
| Performance | Optimized read queries without impacting writes |
| Flexibility | Different data stores for different needs |
| Complexity Management | Simpler domain models |

Trade-offs

| Trade-off | Description |
|---|---|
| Complexity | More complex than simple CRUD |
| Consistency | Eventual consistency between models |
| Learning Curve | Requires understanding of patterns |
| Overhead | More infrastructure to manage |
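
The consistency trade-off shows up most visibly when a client writes and then immediately reads: the read model may not have caught up yet. One possible mitigation is to have the command return the new version and let the caller poll the read side until it reflects at least that version. The sketch below assumes the read model exposes a `version` field, which the `OrderView` in this article does not; `wait_for_version` and `lagging_read` are hypothetical names.

```python
import time
from typing import Callable, Optional


def wait_for_version(read_view: Callable[[], Optional[dict]],
                     min_version: int,
                     timeout: float = 1.0,
                     interval: float = 0.01) -> Optional[dict]:
    """Poll the read model until it reflects at least `min_version`.

    Returns the view, or None if the read side did not catch up in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        view = read_view()
        if view is not None and view["version"] >= min_version:
            return view
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)


# Simulated read model that lags behind: it catches up on the third read.
reads = {"count": 0}


def lagging_read() -> dict:
    reads["count"] += 1
    version = 2 if reads["count"] >= 3 else 1
    return {"order_id": "o-1", "version": version}


fresh = wait_for_version(lagging_read, min_version=2)

# A read model that never catches up yields None after the timeout.
stale = wait_for_version(lambda: {"order_id": "o-1", "version": 1},
                         min_version=2, timeout=0.05)
```

Polling adds latency to the request that needs fresh data, so it is usually reserved for the few screens where a user expects to see their own write immediately.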

When to Use CQRS

  • Complex read queries that impact write performance
  • Different read and write scaling requirements
  • Event-driven architectures
  • Systems requiring multiple read representations
  • High-throughput write workloads
  • Teams working on reads and writes separately

Conclusion

CQRS is a powerful pattern for building systems with different read and write requirements. By separating these concerns, you can optimize each side independently, use appropriate technologies for each workload, and scale more effectively.

Key takeaways:

  • Use CQRS when reads and writes have different requirements
  • Consider event sourcing for complex domain models
  • Use separate data stores optimized for each operation
  • Plan for eventual consistency between the write side and the read models
  • Start simple and evolve as needed
