Event Sourcing vs CQRS: When to Use Each Pattern

Created: March 6, 2026 Larry Qu 9 min read

In the world of distributed systems and domain-driven design, two architectural patterns frequently emerge in conversations about building scalable, maintainable applications: Event Sourcing and CQRS (Command Query Responsibility Segregation). While they often appear together, they solve different problems and can be used independently.

Understanding when to use each pattern—and when to combine them—is crucial for building systems that can evolve, scale, and maintain consistency over time.

The Problem with Traditional Persistence

Traditional applications typically use a CRUD (Create, Read, Update, Delete) approach:

## Traditional CRUD approach
class OrderRepository:
    def create(self, order_data):
        # Insert into database
        db.insert("orders", order_data)
    
    def read(self, order_id):
        # Select from database
        return db.select("orders", id=order_id)
    
    def update(self, order_id, changes):
        # Update database
        db.update("orders", id=order_id, data=changes)
    
    def delete(self, order_id):
        # Delete from database
        db.delete("orders", id=order_id)

This approach has limitations:

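  • You lose the history of changes: each update overwrites the previous state
  • Concurrent updates can silently overwrite each other (lost updates) unless you add locking or version checks
  • Read and write paths compete for the same tables and indexes
  • Reads and writes cannot be scaled independently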

Event Sourcing: Storing State as a Sequence of Events

Event Sourcing is a pattern where instead of storing the current state of an entity, you store a sequence of events that led to that state. The current state is derived by replaying all events.

Core Concept

## Instead of storing current state:
order_current_state = {
    "id": "order-123",
    "status": "shipped",
    "items": [{"product": "Widget", "qty": 2}],
    "total": 29.98
}

## Event Sourcing stores events:
order_events = [
    {"type": "OrderCreated", "data": {...}},
    {"type": "ItemAdded", "data": {...}},
    {"type": "PaymentReceived", "data": {...}},
    {"type": "OrderShipped", "data": {...}}
]

Event Store Implementation

from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Any
import json

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    timestamp: datetime
    version: int

class EventStore:
    def __init__(self):
        self.events: Dict[str, List[Event]] = {}
    
    def append_event(self, event: Event):
        if event.aggregate_id not in self.events:
            self.events[event.aggregate_id] = []
        
        self.events[event.aggregate_id].append(event)
    
    def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]:
        return self.events.get(aggregate_id, [])
    
    def get_all_events(self, aggregate_type: str) -> List[Event]:
        """Get events across all aggregates of a type."""
        return [
            event for events in self.events.values()
            for event in events
            if event.aggregate_id.startswith(aggregate_type)
        ]
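
The store above appends unconditionally, so two writers working from the same history could both add "version 3". A common guard is an expected-version check on append. The following is a minimal single-process sketch of that idea; `VersionedEventStore`, `StoredEvent`, and `ConcurrencyError` are hypothetical names introduced here, not part of the code above, and a production store would need to enforce the check atomically (for example with a unique constraint on aggregate ID and version).

```python
from dataclasses import dataclass
from typing import Any, Dict, List


class ConcurrencyError(Exception):
    """Raised when another writer appended to the stream first."""


@dataclass(frozen=True)
class StoredEvent:
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    version: int


class VersionedEventStore:
    """In-memory store that rejects appends based on a stale version."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[StoredEvent]] = {}

    def append(self, aggregate_id: str, event_type: str,
               data: Dict[str, Any], expected_version: int) -> StoredEvent:
        stream = self._streams.setdefault(aggregate_id, [])
        current_version = len(stream)
        # The caller states which version it read; a mismatch means
        # someone else has appended since then.
        if current_version != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, "
                f"stream is at {current_version}"
            )
        event = StoredEvent(aggregate_id, event_type, data, current_version + 1)
        stream.append(event)
        return event

    def load(self, aggregate_id: str) -> List[StoredEvent]:
        return list(self._streams.get(aggregate_id, []))
```

A caller that receives `ConcurrencyError` would typically reload the aggregate, re-validate the command against the newer state, and retry.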

Aggregate Reconstruction

class Order:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.status = "draft"
        self.items: List[Dict] = []
        self.total = 0.0
        self.version = 0
    
    def apply(self, event: Event):
        """Apply event to rebuild state."""
        self.version = event.version
        
        if event.event_type == "OrderCreated":
            self.order_id = event.data["order_id"]
            self.status = "created"
        
        elif event.event_type == "ItemAdded":
            self.items.append(event.data["item"])
            self.total += event.data["item"]["price"] * event.data["item"]["qty"]
            self.status = "items_added"
        
        elif event.event_type == "PaymentReceived":
            self.status = "paid"
        
        elif event.event_type == "OrderShipped":
            self.status = "shipped"
    
    @classmethod
    def reconstruct(cls, event_store: EventStore, order_id: str) -> "Order":
        """Reconstruct order from event history."""
        order = cls(order_id)
        events = event_store.get_events_for_aggregate(order_id)
        
        for event in events:
            order.apply(event)
        
        return order

Command Handling

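import uuid

def generate_uuid() -> str:
    """Return a random UUID string to use as an event ID."""
    return str(uuid.uuid4())

class OrderCommandHandler: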
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def handle_create_order(self, command: Dict) -> Event:
        order_id = command["order_id"]
        
        event = Event(
            event_id=generate_uuid(),
            aggregate_id=order_id,
            event_type="OrderCreated",
            data={
                "order_id": order_id,
                "customer_id": command["customer_id"],
                "created_at": datetime.utcnow().isoformat()
            },
            timestamp=datetime.utcnow(),
            version=1
        )
        
        self.event_store.append_event(event)
        return event
    
    def handle_add_item(self, command: Dict) -> Event:
        order_id = command["order_id"]
        
        # Get current version
        events = self.event_store.get_events_for_aggregate(order_id)
        version = len(events) + 1
        
        event = Event(
            event_id=generate_uuid(),
            aggregate_id=order_id,
            event_type="ItemAdded",
            data={
                "item": command["item"],
                "added_at": datetime.utcnow().isoformat()
            },
            timestamp=datetime.utcnow(),
            version=version
        )
        
        self.event_store.append_event(event)
        return event

When to Use Event Sourcing

## Good for Event Sourcing:

benefits:
  - Complete audit trail: Every change is stored
  - Temporal queries: "What was the state at time T?"
  - Event replay: Rebuild state from any point
  - Feature flexibility: Add new read models easily
  - Debugging: Full history of what happened

use_cases:
  - Financial systems (audit requirements)
  - Inventory management
  - Workflow engines
  - Gaming (replay functionality)
  - Any domain with complex state transitions
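
To make the "state at time T" benefit concrete, here is a minimal sketch of a temporal query: it folds only the events whose timestamp is at or before the requested moment. `TimedEvent`, `order_state_at`, and the sample `history` are hypothetical names and data for illustration; the status transitions mirror the `Order.apply` method shown earlier.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Iterable


@dataclass(frozen=True)
class TimedEvent:
    event_type: str
    data: Dict[str, Any]
    timestamp: datetime


def order_state_at(events: Iterable[TimedEvent], as_of: datetime) -> Dict[str, Any]:
    """Fold every event with timestamp <= as_of into a state dict."""
    state: Dict[str, Any] = {"status": "draft", "total": 0.0}
    for event in sorted(events, key=lambda e: e.timestamp):
        if event.timestamp > as_of:
            break  # everything after this point happened later than as_of
        if event.event_type == "OrderCreated":
            state["status"] = "created"
        elif event.event_type == "ItemAdded":
            item = event.data["item"]
            state["total"] += item["price"] * item["qty"]
            state["status"] = "items_added"
        elif event.event_type == "PaymentReceived":
            state["status"] = "paid"
        elif event.event_type == "OrderShipped":
            state["status"] = "shipped"
    return state


# Sample history (illustrative data only)
history = [
    TimedEvent("OrderCreated", {}, datetime(2026, 3, 1, 9, 0)),
    TimedEvent("ItemAdded", {"item": {"price": 10.0, "qty": 2}},
               datetime(2026, 3, 1, 9, 5)),
    TimedEvent("PaymentReceived", {}, datetime(2026, 3, 2, 12, 0)),
    TimedEvent("OrderShipped", {}, datetime(2026, 3, 3, 8, 0)),
]
```

With this data, `order_state_at(history, datetime(2026, 3, 1, 23, 59))` returns `{"status": "items_added", "total": 20.0}`, even though the order was later paid and shipped.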

Problems with Event Sourcing

## Challenges to consider

challenges = {
    "event_schema_evolution": {
        "problem": "Events stored forever must handle schema changes",
        "solution": "Event upcasters / versioning"
    },
    
    "read_model_performance": {
        "problem": "Replaying events for every read is slow",
        "solution": "Projections / materialized views"
    },
    
    "eventual_consistency": {
        "problem": "Read models may be stale",
        "solution": "Accept eventual consistency"
    },
    
    "long_event_streams": {
        "problem": "Too many events to replay when loading an aggregate",
        "solution": "Periodic snapshots, then replay only the events after the snapshot"
    }
}
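
The "periodic snapshots" remedy can be sketched as follows: store the folded state together with the version it covers, then replay only newer events. This is an illustrative sketch, not a prescribed design; `ItemAddedEvent`, `Snapshot`, `load_total`, and `take_snapshot` are hypothetical names, and amounts are kept in integer cents to avoid floating-point noise. For brevity the loop skips old events in memory, whereas a real store would usually query only events after the snapshot version.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class ItemAddedEvent:
    version: int
    amount_cents: int


@dataclass(frozen=True)
class Snapshot:
    version: int       # version of the last event folded into total_cents
    total_cents: int


def load_total(events: List[ItemAddedEvent],
               snapshot: Optional[Snapshot] = None) -> Tuple[int, int]:
    """Return (total_cents, events_replayed), starting from the snapshot if given."""
    total = snapshot.total_cents if snapshot else 0
    start_version = snapshot.version if snapshot else 0
    replayed = 0
    for event in events:
        if event.version <= start_version:
            continue  # already reflected in the snapshot
        total += event.amount_cents
        replayed += 1
    return total, replayed


def take_snapshot(events: List[ItemAddedEvent]) -> Snapshot:
    """Fold all given events and remember the version that was reached."""
    total, _ = load_total(events)
    last_version = events[-1].version if events else 0
    return Snapshot(version=last_version, total_cents=total)
```

For a stream of 100 events with a snapshot taken at version 90, `load_total` replays 10 events instead of 100 and arrives at the same total.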

CQRS: Separating Read and Write Models

CQRS (Command Query Responsibility Segregation) is a pattern that separates the way you read data from the way you write data. Instead of having a single model for both operations, you have distinct models optimized for each.

Core Concept

┌─────────────────────────────────────────────────────────────┐
│                      Traditional Model                       │
│                                                             │
│   Client ─────► Read/Write Model ─────► Database           │
│                (Same for both)                              │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                      CQRS Model                             │
│                                                             │
│   Client ─────► Command Model ─────► Write DB              │
│       │                                                    │
│       └────► Query Model ─────► Read DB                    │
│              (Separate)                                    │
└─────────────────────────────────────────────────────────────┘

Command Side (Write Model)

class OrderCommand:
    def __init__(self, order_id, customer_id, items):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = items
        self.status = "pending"
        self.created_at = datetime.utcnow()

class OrderCommandHandler:
    def __init__(self, write_repository):
        self.repository = write_repository
    
    def handle_create_order(self, command: OrderCommand):
        # Validate business rules
        if not command.items:
            raise ValueError("Order must have items")
        
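        # Create aggregate (here a plain state-based write model, not the
        # event-sourced Order class from the Event Sourcing section)
        order = Order(
            id=command.order_id,
            customer_id=command.customer_id,
            items=command.items,
            status="created"
        )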
        
        # Persist to write database
        self.repository.save(order)
        
        return order.id

Query Side (Read Model)

## Different read models for different views

class OrderListView:
    """Lightweight view for list pages."""
    def __init__(self, read_db):
        self.db = read_db
    
    def get_orders(self, limit=20, offset=0):
        return self.db.query("""
            SELECT order_id, customer_name, total, status, created_at
            FROM orders
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """, [limit, offset])

class OrderDetailView:
    """Detailed view for order detail page."""
    def __init__(self, read_db):
        self.db = read_db
    
    def get_order_detail(self, order_id):
        order = self.db.query_one(
            "SELECT * FROM orders WHERE order_id = %s",
            [order_id]
        )
        
        items = self.db.query(
            "SELECT * FROM order_items WHERE order_id = %s",
            [order_id]
        )
        
        timeline = self.db.query(
            "SELECT * FROM order_timeline WHERE order_id = %s",
            [order_id]
        )
        
        return {
            "order": order,
            "items": items,
            "timeline": timeline
        }

class OrderAnalyticsView:
    """Aggregated view for analytics."""
    def __init__(self, read_db):
        self.db = read_db
    
    def get_revenue_by_day(self, start_date, end_date):
        return self.db.query("""
            SELECT DATE(created_at) as day, 
                   SUM(total) as revenue,
                   COUNT(*) as order_count
            FROM orders
            WHERE created_at BETWEEN %s AND %s
            GROUP BY DATE(created_at)
        """, [start_date, end_date])
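
The view classes above assume a `read_db` object with `query` methods. As a runnable approximation, the sketch below uses Python's built-in `sqlite3` module with an in-memory database. The table schema and the function names `make_read_db`, `list_orders`, and `revenue_by_day` are assumptions made for this example; note that `sqlite3` uses `?` placeholders rather than the `%s` style shown above.

```python
import sqlite3
from typing import List, Tuple


def make_read_db() -> sqlite3.Connection:
    """Create an in-memory read database with a denormalized orders table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE orders (
            order_id TEXT PRIMARY KEY,
            customer_name TEXT NOT NULL,
            total REAL NOT NULL,
            status TEXT NOT NULL,
            created_at TEXT NOT NULL  -- ISO-8601 string
        )
        """
    )
    return conn


def list_orders(conn: sqlite3.Connection, limit: int = 20,
                offset: int = 0) -> List[Tuple]:
    """Lightweight list query, newest orders first."""
    cursor = conn.execute(
        """
        SELECT order_id, customer_name, total, status, created_at
        FROM orders
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?
        """,
        (limit, offset),
    )
    return cursor.fetchall()


def revenue_by_day(conn: sqlite3.Connection, start_date: str,
                   end_date: str) -> List[Tuple]:
    """Aggregate revenue and order count per calendar day."""
    cursor = conn.execute(
        """
        SELECT DATE(created_at) AS day,
               SUM(total) AS revenue,
               COUNT(*) AS order_count
        FROM orders
        WHERE created_at BETWEEN ? AND ?
        GROUP BY DATE(created_at)
        ORDER BY day
        """,
        (start_date, end_date),
    )
    return cursor.fetchall()
```

Because the read table is already denormalized (it carries `customer_name` directly), both queries avoid joins, which is the main point of a dedicated read model.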

Synchronization Between Models

class ModelSynchronizer:
    def __init__(self, event_store):
        self.event_store = event_store
        self.read_databases = {
            "default": ReadDatabase(),
            "analytics": AnalyticsDatabase(),
            "search": SearchIndex()
        }
    
    def synchronize(self, event: Event):
        """Sync event to all read models."""
        if event.event_type == "OrderCreated":
            self._sync_order_created(event)
        elif event.event_type == "OrderShipped":
            self._sync_order_shipped(event)
    
    def _sync_order_created(self, event: Event):
        data = event.data
        
        # Default read DB
        self.read_databases["default"].insert("orders", {
            "order_id": data["order_id"],
            "customer_id": data["customer_id"],
            "status": "created",
            "total": 0
        })
        
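        # Analytics DB: created_at is stored as an ISO-8601 string,
        # so parse it before taking the date
        self.read_databases["analytics"].increment("daily_orders", 
            datetime.fromisoformat(data["created_at"]).date())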
        
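        # Search index
        self.read_databases["search"].index("orders", {
            "id": data["order_id"],
            "text": f"Order {data['order_id']}"
        })
    
    def _sync_order_shipped(self, event: Event):
        # Mark the order as shipped in the default read DB
        # (update() is assumed to mirror the db.update signature used earlier)
        self.read_databases["default"].update(
            "orders", id=event.aggregate_id, data={"status": "shipped"}
        )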

When to Use CQRS

## Good for CQRS:

benefits:
  - Optimized read models for specific needs
  - Different scaling strategies for reads/writes
  - Team autonomy (different teams own read/write)
  - Complex query requirements
  - Polyglot persistence (different DBs for read/write)

use_cases:
  - Read-heavy applications (dashboards, reports)
  - Complex domain with many query patterns
  - Multi-tenant SaaS applications
  - Systems with different consistency needs

Problems with CQRS

## CQRS challenges

challenges = {
    "eventual_consistency": {
        "problem": "Read models may lag behind write model",
        "solution": "Accept and communicate eventual consistency"
    },
    
    "complexity": {
        "problem": "Two models instead of one",
        "solution": "Only use when benefits justify complexity"
    },
    
    "synchronization": {
        "problem": "Keeping read models in sync",
        "solution": "Reliable event-driven sync, saga patterns"
    },
    
    "testing": {
        "problem": "More components to test",
        "solution": "Integration tests for sync, unit for each model"
    }
}
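
One way to make event-driven synchronization more reliable is to make projections idempotent, since many delivery mechanisms can redeliver an event. The sketch below tracks the last applied version per aggregate and ignores duplicates; it raises on a gap so the caller can retry later. `ProjectedEvent` and `IdempotentOrderProjection` are hypothetical names for this illustration, and the event payload shape follows the `ItemAdded` data used elsewhere in this article.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class ProjectedEvent:
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    version: int


class IdempotentOrderProjection:
    """Read model that ignores events it has already applied."""

    def __init__(self) -> None:
        self.orders: Dict[str, Dict[str, Any]] = {}
        self.last_version: Dict[str, int] = {}

    def handle(self, event: ProjectedEvent) -> bool:
        """Apply the event; return False if it was a duplicate."""
        seen = self.last_version.get(event.aggregate_id, 0)
        if event.version <= seen:
            return False  # duplicate or stale delivery, safe to ignore
        if event.version != seen + 1:
            # An earlier event has not arrived yet; let the caller retry.
            raise ValueError(
                f"gap in stream: got version {event.version}, expected {seen + 1}"
            )

        row = self.orders.setdefault(
            event.aggregate_id, {"status": "draft", "total": 0.0}
        )
        if event.event_type == "OrderCreated":
            row["status"] = "created"
        elif event.event_type == "ItemAdded":
            row["total"] += event.data["price"] * event.data["quantity"]
        elif event.event_type == "OrderShipped":
            row["status"] = "shipped"

        self.last_version[event.aggregate_id] = event.version
        return True
```

Delivering the same `ItemAdded` event twice changes the total only once, so the read model stays correct under at-least-once delivery.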

Event Sourcing + CQRS: A Powerful Combination

While both patterns can be used independently, they complement each other exceptionally well.

Combined Architecture

┌─────────────────────────────────────────────────────────────┐
│                  Event Sourcing + CQRS                       │
│                                                             │
│   Commands ──► Command Handler ──► Event Store            │
│                          │                                  │
│                          ▼                                  │
│   Events ──────► Projections ──► Read Models               │
│                          │                                  │
│              ┌───────────┼───────────┐                     │
│              ▼           ▼           ▼                     │
│         List View   Detail View  Analytics                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Full Implementation

class OrderAggregate:
    """Order aggregate with event sourcing."""
    
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.events: List[Event] = []
        self.status = "draft"
        self.items: List[Dict] = []
        self.total = 0.0
        self.version = 0
    
    def create_order(self, customer_id: str):
        if self.status != "draft":
            raise ValueError("Order already created")
        
        event = Event(
            event_id=generate_uuid(),
            aggregate_id=self.order_id,
            event_type="OrderCreated",
            data={"customer_id": customer_id},
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )
        self._apply(event)
        self.events.append(event)
    
    def add_item(self, product_id: str, quantity: int, price: float):
        if self.status == "shipped":
            raise ValueError("Cannot add items to shipped order")
        
        event = Event(
            event_id=generate_uuid(),
            aggregate_id=self.order_id,
            event_type="ItemAdded",
            data={
                "product_id": product_id,
                "quantity": quantity,
                "price": price
            },
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )
        self._apply(event)
        self.events.append(event)
    
    def ship_order(self):
        if self.status != "paid":
            raise ValueError("Can only ship paid orders")
        
        event = Event(
            event_id=generate_uuid(),
            aggregate_id=self.order_id,
            event_type="OrderShipped",
            data={"shipped_at": datetime.utcnow().isoformat()},
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )
        self._apply(event)
        self.events.append(event)
    
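    def _apply(self, event: Event):
        self.version = event.version
        
        if event.event_type == "OrderCreated":
            self.status = "created"
        elif event.event_type == "ItemAdded":
            self.items.append(event.data)
            self.total += event.data["price"] * event.data["quantity"]
        elif event.event_type == "PaymentReceived":
            # Without this branch the status never becomes "paid",
            # so ship_order() would always be rejected
            self.status = "paid"
        elif event.event_type == "OrderShipped":
            self.status = "shipped"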
    
    def get_uncommitted_events(self) -> List[Event]:
        return self.events


class ProjectionBuilder:
    """Build read models from events."""
    
    def __init__(self):
        self.order_list: Dict = {}
        self.order_detail: Dict = {}
        self.daily_stats: Dict = {}
    
    def project(self, event: Event):
        if event.event_type == "OrderCreated":
            self._project_order_created(event)
        elif event.event_type == "ItemAdded":
            self._project_item_added(event)
        elif event.event_type == "OrderShipped":
            self._project_order_shipped(event)
    
    def _project_order_created(self, event: Event):
        order_id = event.aggregate_id
        
        self.order_list[order_id] = {
            "order_id": order_id,
            "customer_id": event.data["customer_id"],
            "status": "created",
            "total": 0,
            "created_at": event.timestamp
        }
        
        self.order_detail[order_id] = {
            "order_id": order_id,
            "items": [],
            "timeline": [f"Order created at {event.timestamp}"]
        }
    
    def _project_item_added(self, event: Event):
        order_id = event.aggregate_id
        
        if order_id in self.order_list:
            self.order_list[order_id]["total"] += (
                event.data["price"] * event.data["quantity"]
            )
        
        if order_id in self.order_detail:
            self.order_detail[order_id]["items"].append(event.data)
    
    def _project_order_shipped(self, event: Event):
        order_id = event.aggregate_id
        
        if order_id in self.order_list:
            self.order_list[order_id]["status"] = "shipped"
        
        if order_id in self.order_detail:
            self.order_detail[order_id]["timeline"].append(
                f"Order shipped at {event.data['shipped_at']}"
            )
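
To show how the pieces in the diagram connect, here is a compact end-to-end sketch: a command is handled by the aggregate, the resulting events are appended to a stream, and each stored event is fed to a projection. It is a simplified, self-contained variant of the classes above rather than the same code; `OrderService`, `OrderListProjection`, and the default-valued `Event` fields are assumptions made for this example, and projection runs synchronously here although it is often asynchronous in practice.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List


@dataclass(frozen=True)
class Event:
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    version: int
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class OrderAggregate:
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id
        self.status = "draft"
        self.total = 0.0
        self.version = 0
        self.uncommitted: List[Event] = []

    def _record(self, event_type: str, data: Dict[str, Any]) -> None:
        event = Event(self.order_id, event_type, data, self.version + 1)
        self.apply(event)
        self.uncommitted.append(event)

    def create_order(self, customer_id: str) -> None:
        if self.status != "draft":
            raise ValueError("Order already created")
        self._record("OrderCreated", {"customer_id": customer_id})

    def add_item(self, product_id: str, quantity: int, price: float) -> None:
        if self.status == "shipped":
            raise ValueError("Cannot add items to shipped order")
        self._record("ItemAdded", {"product_id": product_id,
                                   "quantity": quantity, "price": price})

    def apply(self, event: Event) -> None:
        self.version = event.version
        if event.event_type == "OrderCreated":
            self.status = "created"
        elif event.event_type == "ItemAdded":
            self.total += event.data["price"] * event.data["quantity"]


class OrderListProjection:
    def __init__(self) -> None:
        self.rows: Dict[str, Dict[str, Any]] = {}

    def project(self, event: Event) -> None:
        if event.event_type == "OrderCreated":
            self.rows[event.aggregate_id] = {
                "customer_id": event.data["customer_id"],
                "status": "created",
                "total": 0.0,
            }
        elif event.event_type == "ItemAdded" and event.aggregate_id in self.rows:
            self.rows[event.aggregate_id]["total"] += (
                event.data["price"] * event.data["quantity"]
            )


class OrderService:
    """Runs a command, stores the new events, then projects them."""

    def __init__(self, projection: OrderListProjection) -> None:
        self.streams: Dict[str, List[Event]] = {}
        self.projection = projection

    def _load(self, order_id: str) -> OrderAggregate:
        order = OrderAggregate(order_id)
        for event in self.streams.get(order_id, []):
            order.apply(event)  # rebuild state from history
        return order

    def _commit(self, order: OrderAggregate) -> None:
        stream = self.streams.setdefault(order.order_id, [])
        for event in order.uncommitted:
            stream.append(event)
            self.projection.project(event)
        order.uncommitted.clear()

    def create_order(self, order_id: str, customer_id: str) -> None:
        order = self._load(order_id)
        order.create_order(customer_id)
        self._commit(order)

    def add_item(self, order_id: str, product_id: str,
                 quantity: int, price: float) -> None:
        order = self._load(order_id)
        order.add_item(product_id, quantity, price)
        self._commit(order)
```

Calling `create_order("order-1", "cust-9")` and then `add_item("order-1", "widget", 2, 10.0)` on an `OrderService` leaves two events in the stream and a list row with a total of 20.0. The write side never reads from the projection; it always rebuilds the aggregate from its own events.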

Decision Matrix

Should You Use Event Sourcing?

## Use Event Sourcing when:
use_event_sourcing = (
    need_audit_trail or
    need_temporal_queries or
    complex_state_machine or
    want_event_replay or
    building_event_driven_system
)

## Avoid when:
avoid_event_sourcing = (
    simple_crud_app or
    low_volume or
    strong_agility_objection or
    team_new_to_pattern
)

Should You Use CQRS?

## Use CQRS when:
use_cqrs = (
    read_write_ratio_imbalanced or
    multiple_read_views_needed or
    different_consistency_needs or
    want_polyglot_persistence or
    building_microservices
)

## Avoid when:
avoid_cqrs = (
    simple_read_write_pattern or
    low_complexity_domain or
    strong_consistency_required or
    team_size_small
)

Comparison Summary

Aspect          | Event Sourcing                | CQRS
Primary Benefit | Complete history, audit trail | Optimized read/write paths
State Storage   | Events (immutable)            | Current state
Read Model      | Replay events                 | Pre-built projections
Consistency     | Eventual                      | Eventual (or strong if synced)
Complexity      | Higher                        | Medium
Storage         | Growing (append-only)         | Optimized per view
Queries         | Rebuild from events           | Direct reads
Debugging       | Full history                  | Check read model

Implementation Tools

## Popular frameworks supporting these patterns

event_sourcing:
  - EventStoreDB (formerly Event Store)
  - Axon Framework (Java)
  - EventFlow (.NET)
  - Lagom (Scala/Java)
  - Prooph (PHP)

cqrs_frameworks:
  - Many CQRS implementations are custom
  - Doctrine (PHP) has CQRS support
  - Microservices frameworks generally support it

message_brokers:
  - Kafka (popular for event sourcing)
  - RabbitMQ
  - NATS

Conclusion

Event Sourcing and CQRS are powerful patterns that address different architectural challenges:

  • Event Sourcing stores state as a sequence of events, providing complete history and auditability
  • CQRS separates read and write models, allowing optimization for different access patterns
  • Together, they form a robust foundation for complex, scalable systems

However, they introduce significant complexity. Start with simpler patterns and evolve toward these approaches only when the benefits outweigh the costs.

Remember:

  • Don’t use these patterns because they’re trendy
  • Use them when your domain genuinely needs the capabilities they provide
  • Start small and measure the impact
