Skip to main content
โšก Calmops

CQRS and Event Sourcing: Practical Implementation Patterns 2026

Introduction

Traditional architectures use a single model for both reading and writing data. This works well for simple applications but creates challenges as systems grow in complexity. CQRS (Command Query Responsibility Segregation) and Event Sourcing offer alternative approaches that separate read and write operations, enabling better scalability, flexibility, and auditability.

In 2026, these patterns have matured significantly, with established frameworks and real-world implementations demonstrating their value. This guide explores CQRS and Event Sourcing patterns, implementation strategies, and best practices.

Understanding CQRS

What Is CQRS?

CQRS separates read and write operations into distinct models:

  • Commands: Operations that change state (Create, Update, Delete)
  • Queries: Operations that read state without modification

This separation allows optimizing each operation type independently.

Traditional vs CQRS

Traditional CRUD:

# Single model for both read and write
class OrderController:
    def get_order(self, order_id):
        return self.db.query(Order).filter_by(id=order_id).first()
    
    def create_order(self, data):
        order = Order(**data)
        self.db.add(order)
        self.db.commit()
        return order
    
    def update_order(self, order_id, data):
        order = self.db.query(Order).filter_by(id=order_id).first()
        for key, value in data.items():
            setattr(order, key, value)
        self.db.commit()
        return order

CQRS:

# Separate models for read and write

# Write model - focused on business logic
class OrderCommandHandler:
    def create_order(self, data):
        # Validate business rules
        validator = OrderValidator()
        validator.validate(data)
        
        # Create aggregate
        order = Order.create(**data)
        
        # Save to event store
        self.event_store.save(order.events)
        
        return order.id

# Read model - optimized for queries
class OrderQueryHandler:
    def get_order_summary(self, order_id):
        return self.read_db.query(OrderSummaryDTO).filter_by(id=order_id).first()
    
    def get_orders_by_customer(self, customer_id):
        return self.read_db.query(OrderListDTO).filter_by(customer_id=customer_id).all()

Benefits of CQRS

Optimized read models: Different read models for different views:

# Product list view - minimal data
class ProductListItem:
    id: str
    name: str
    price: float
    thumbnail_url: str

# Product detail view - full data
class ProductDetail:
    id: str
    name: str
    description: str
    price: float
    inventory: int
    images: list[str]
    specifications: dict
    reviews: list[Review]

Optimized write models: Focused on business logic:

# Write model - complex validation
class Order:
    def __init__(self):
        self.items = []
        self.status = "draft"
    
    def add_item(self, product, quantity):
        if product.inventory < quantity:
            raise InsufficientInventoryError()
        
        if self.status != "draft":
            raise OrderNotEditableError()
        
        self.items.append(OrderItem(product, quantity))
    
    def submit(self):
        if not self.items:
            raise EmptyOrderError()
        
        self.status = "submitted"

Independent scaling: Read and write workloads scale separately:

# Kubernetes: Scale reads independently
resources:
  read-api:
    replicas: 10
  write-api:
    replicas: 2

Event Sourcing

What Is Event Sourcing?

Instead of storing current state, event sourcing stores all state changes as a sequence of events:

# Traditional: Store current state
class BankAccount:
    balance: 100  # Current balance

# Event Sourcing: Store all changes
class BankAccountEvents:
    events: [
        AccountCreated(account_id="acc_1", initial_balance=0),
        DepositMade(account_id="acc_1", amount=50, new_balance=50),
        DepositMade(account_id="acc_1", amount=50, new_balance=100),
    ]

# Current state = replay all events
def get_balance(events):
    balance = 0
    for event in events:
        if isinstance(event, DepositMade):
            balance = event.new_balance
        elif isinstance(event, WithdrawalMade):
            balance = event.new_balance
    return balance

Event Structure

from dataclasses import dataclass
from datetime import datetime
from typing import Any

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    timestamp: datetime
    payload: dict[str, Any]
    metadata: dict[str, Any]

@dataclass
class OrderCreated(Event):
    event_type: str = "OrderCreated"
    
    def __init__(self, order_id: str, customer_id: str, items: list):
        self.event_id = str(uuid.uuid4())
        self.aggregate_id = order_id
        self.event_type = "OrderCreated"
        self.timestamp = datetime.now()
        self.payload = {
            "customer_id": customer_id,
            "items": items
        }
        self.metadata = {}

Event Store

class EventStore:
    def __init__(self, connection):
        self.connection = connection
    
    def append(self, aggregate_id: str, events: list[Event]):
        for event in events:
            self.connection.execute(
                """
                INSERT INTO events (aggregate_id, event_type, payload, metadata, created_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                aggregate_id,
                event.event_type,
                json.dumps(event.payload),
                json.dumps(event.metadata),
                event.timestamp
            )
    
    def get_events(self, aggregate_id: str) -> list[Event]:
        rows = self.connection.execute(
            "SELECT * FROM events WHERE aggregate_id = ? ORDER BY created_at",
            aggregate_id
        )
        
        return [self.row_to_event(row) for row in rows]
    
    def get_events_stream(self, from_position: int = 0) -> EventStream:
        return EventStream(self.connection, from_position)

Projections

Build read models from events:

# Materialized view projection
class OrderSummaryProjection:
    def __init__(self, read_db):
        self.read_db = read_db
    
    def project(self, event: Event):
        if event.event_type == "OrderCreated":
            self._handle_order_created(event)
        elif event.event_type == "OrderItemAdded":
            self._handle_item_added(event)
        elif event.event_type == "OrderSubmitted":
            self._handle_order_submitted(event)
    
    def _handle_order_created(self, event):
        self.read_db.execute(
            """
            INSERT INTO order_summaries (order_id, customer_id, total, status, created_at)
            VALUES (?, ?, 0, 'draft', ?)
            """,
            event.aggregate_id,
            event.payload["customer_id"],
            event.timestamp
        )

CQRS + Event Sourcing Together

Architecture Overview

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”     โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”     โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚   Client    โ”‚โ”€โ”€โ”€โ”€โ–ถโ”‚   Command   โ”‚โ”€โ”€โ”€โ”€โ–ถโ”‚ Event Store โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜     โ”‚   Handler   โ”‚     โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜            โ”‚
                                              โ”‚ Events
                    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”     โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”     โ”‚   Query     โ”‚โ—€โ”€โ”€โ”€โ”€โ”‚ Projection  โ”‚
โ”‚   Client    โ”‚โ—€โ”€โ”€โ”€โ”€โ”‚   Handler  โ”‚     โ”‚   Engine    โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜     โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜     โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Implementation Example

# Domain Events
@dataclass
class OrderEvent(Event):
    pass

@dataclass
class OrderCreated(OrderEvent):
    def __init__(self, order_id: str, customer_id: str):
        super().__init__()
        self.aggregate_id = order_id
        self.event_type = "OrderCreated"
        self.payload = {"customer_id": customer_id}

@dataclass
class OrderItemAdded(OrderEvent):
    def __init__(self, order_id: str, product_id: str, quantity: int):
        super().__init__()
        self.aggregate_id = order_id
        self.event_type = "OrderItemAdded"
        self.payload = {"product_id": product_id, "quantity": quantity}

# Aggregate
class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.events = []
        self.items = []
        self.status = "draft"
    
    def create(self, customer_id: str):
        self.events.append(OrderCreated(self.order_id, customer_id))
        return self
    
    def add_item(self, product_id: str, quantity: int):
        self.events.append(OrderItemAdded(self.order_id, product_id, quantity))
        return self
    
    def get_uncommitted_events(self) -> list[Event]:
        return self.events
    
    def mark_events_committed(self):
        self.events = []

# Command Handler
class OrderCommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def handle_create_order(self, command: CreateOrderCommand) -> str:
        aggregate = OrderAggregate(str(uuid.uuid4()))
        
        # Build aggregate from events if updating existing
        existing_events = self.event_store.get_events(command.order_id)
        # ... replay events to restore state
        
        # Execute command
        aggregate.create(command.customer_id)
        for item in command.items:
            aggregate.add_item(item["product_id"], item["quantity"])
        
        # Save events
        self.event_store.append(
            aggregate.order_id,
            aggregate.get_uncommitted_events()
        )
        
        return aggregate.order_id

# Query Handler  
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db
    
    def get_order_summary(self, order_id: str) -> OrderSummaryDTO:
        return self.read_db.query(OrderSummaryDTO).filter_by(
            order_id=order_id
        ).first()

Handling Concurrency

# Optimistic concurrency with events
class EventStore:
    def append(self, aggregate_id: str, expected_version: int, events: list[Event]):
        current_version = self.get_current_version(aggregate_id)
        
        if current_version != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, found {current_version}"
            )
        
        # Store events with version
        for i, event in enumerate(events):
            self.connection.execute(
                "INSERT INTO events VALUES (?, ?, ?, ?, ?)",
                aggregate_id,
                expected_version + i + 1,
                event.event_type,
                json.dumps(event.payload),
                event.timestamp
            )

Practical Considerations

When to Use CQRS

Good candidates:

  • Complex domain logic with validation
  • Multiple read models needed
  • High read-to-write ratio
  • Audit requirements
  • Event-driven architectures

Probably not needed for:

  • Simple CRUD applications
  • Low complexity domains
  • Single read model

When to Use Event Sourcing

Good candidates:

  • Audit requirements (complete history)
  • Temporal queries (“what was the state at time X?”)
  • Complex business rules requiring history
  • Rebuilding state from events
  • Event-driven microservices

Probably not needed for:

  • Simple state management
  • Performance-critical writes
  • Eventually consistent reads acceptable

Challenges

Complexity: More moving parts than simple CRUD

Event schema evolution: Handling schema changes over time

# Event versioning
@dataclass
class OrderCreatedV1:
    event_type: str = "OrderCreated"
    version: int = 1
    payload: dict = field(default_factory=lambda: {
        "customer_id": None
    })

@dataclass  
class OrderCreatedV2:
    event_type: str = "OrderCreated"
    version: int = 2
    payload: dict = field(default_factory=lambda: {
        "customer_id": None,
        "shipping_address": None  # Added field
    })

# Upcaster
class EventUpcaster:
    def upcast(self, event: Event) -> Event:
        if event.version < 2:
            return self._upgrade_to_v2(event)
        return event
    
    def _upgrade_to_v2(self, event):
        event.payload["shipping_address"] = None  # Default value
        event.version = 2
        return event

Performance: Replaying many events can be slow

# Snapshotting for performance
class SnapshotStore:
    def save_snapshot(self, aggregate_id: str, version: int, state: dict):
        self.db.execute(
            "INSERT INTO snapshots VALUES (?, ?, ?)",
            aggregate_id, version, json.dumps(state)
        )
    
    def get_snapshot(self, aggregate_id: str) -> tuple[int, dict] | None:
        row = self.db.query(
            "SELECT version, state FROM snapshots WHERE aggregate_id = ? "
            "ORDER BY version DESC LIMIT 1",
            aggregate_id
        ).first()
        
        if row:
            return row.version, json.loads(row.state)
        return None

Tools and Frameworks

Event Sourcing

  • Axon Framework: Java/Kotlin CQRS/ES framework
  • EventStoreDB: Purpose-built event store
  • Marten: .NET event sourcing on PostgreSQL
  • AggregateSource: .NET ES library

Read Models

  • Elasticsearch: Full-text search and analytics
  • PostgreSQL: Relational read models
  • Redis: Cached read models
  • MongoDB: Document read models

Best Practices

Keep Events Immutable

# Wrong: Mutable event
class BadEvent:
    def __init__(self):
        self.value = 0
    def update(self, new_value):
        self.value = new_value  # Don't do this!

# Right: Immutable events
@dataclass(frozen=True)
class GoodEvent:
    old_value: int
    new_value: int

Design Events Carefully

# Good: Intent-focused event
class OrderSubmitted:
    order_id: str
    submitted_at: datetime
    total: Decimal

# Avoid: Implementation-focused event
class OrderTableUpdated:
    # What does this mean?
    table: str = "orders"
    operation: str = "UPDATE"

Handle Failures

# Idempotent event processing
class IdempotentEventHandler:
    def __init__(self, event_store):
        self.event_store = event_store
        self.processed_events = set()
    
    def handle(self, event: Event):
        # Check if already processed
        if event.event_id in self.processed_events:
            return
        
        # Process event
        self._process(event)
        
        # Mark as processed
        self.processed_events.add(event.event_id)

Conclusion

CQRS and Event Sourcing provide powerful patterns for building complex applications. CQRS enables optimized read and write models, while Event Sourcing provides complete audit trails and temporal queries.

These patterns add complexityโ€”only adopt when the benefits justify it. Start with simpler approaches and evolve as requirements demand.

The combination works particularly well for event-driven microservices, audit-critical systems, and domains with complex state transitions. The initial investment pays dividends in flexibility and understanding.

Comments