Skip to main content
โšก Calmops

Event Sourcing vs CQRS: When to Use Each Pattern

In the world of distributed systems and domain-driven design, two architectural patterns frequently emerge in conversations about building scalable, maintainable applications: Event Sourcing and CQRS (Command Query Responsibility Segregation). While they often appear together, they solve different problems and can be used independently.

Understanding when to use each patternโ€”and when to combine themโ€”is crucial for building systems that can evolve, scale, and maintain consistency over time.

The Problem with Traditional Persistence

Traditional applications typically use a CRUD (Create, Read, Update, Delete) approach:

# Traditional CRUD approach
class OrderRepository:
    def create(self, order_data):
        # Insert into database
        db.insert("orders", order_data)
    
    def read(self, order_id):
        # Select from database
        return db.select("orders", id=order_id)
    
    def update(self, order_id, changes):
        # Update database
        db.update("orders", id=order_id, data=changes)
    
    def delete(self, order_id):
        # Delete from database
        db.delete("orders", id=order_id)

This approach has limitations:

  • You lose history of changes
  • Optimistic locking leads to race conditions
  • Read and write paths compete for the same resources
  • Scaling becomes difficult

Event Sourcing: Storing State as a Sequence of Events

Event Sourcing is a pattern where instead of storing the current state of an entity, you store a sequence of events that led to that state. The current state is derived by replaying all events.

Core Concept

# Instead of storing current state:
order_current_state = {
    "id": "order-123",
    "status": "shipped",
    "items": [{"product": "Widget", "qty": 2}],
    "total": 29.98
}

# Event Sourcing stores events:
order_events = [
    {"type": "OrderCreated", "data": {...}},
    {"type": "ItemAdded", "data": {...}},
    {"type": "PaymentReceived", "data": {...}},
    {"type": "OrderShipped", "data": {...}}
]

Event Store Implementation

from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Any
import json

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    timestamp: datetime
    version: int

class EventStore:
    def __init__(self):
        self.events: Dict[str, List[Event]] = {}
    
    def append_event(self, event: Event):
        if event.aggregate_id not in self.events:
            self.events[event.aggregate_id] = []
        
        self.events[event.aggregate_id].append(event)
    
    def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]:
        return self.events.get(aggregate_id, [])
    
    def get_all_events(self, aggregate_type: str) -> List[Event]:
        """Get events across all aggregates of a type."""
        return [
            event for events in self.events.values()
            for event in events
            if event.aggregate_id.startswith(aggregate_type)
        ]

Aggregate Reconstruction

class Order:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.status = "draft"
        self.items: List[Dict] = []
        self.total = 0.0
        self.version = 0
    
    def apply(self, event: Event):
        """Apply event to rebuild state."""
        self.version = event.version
        
        if event.event_type == "OrderCreated":
            self.order_id = event.data["order_id"]
            self.status = "created"
        
        elif event.event_type == "ItemAdded":
            self.items.append(event.data["item"])
            self.total += event.data["item"]["price"] * event.data["item"]["qty"]
            self.status = "items_added"
        
        elif event.event_type == "PaymentReceived":
            self.status = "paid"
        
        elif event.event_type == "OrderShipped":
            self.status = "shipped"
    
    @classmethod
    def reconstruct(cls, event_store: EventStore, order_id: str) -> "Order":
        """Reconstruct order from event history."""
        order = cls(order_id)
        events = event_store.get_events_for_aggregate(order_id)
        
        for event in events:
            order.apply(event)
        
        return order

Command Handling

class OrderCommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def handle_create_order(self, command: Dict) -> Event:
        order_id = command["order_id"]
        
        event = Event(
            event_id=generate_uuid(),
            aggregate_id=order_id,
            event_type="OrderCreated",
            data={
                "order_id": order_id,
                "customer_id": command["customer_id"],
                "created_at": datetime.utcnow().isoformat()
            },
            timestamp=datetime.utcnow(),
            version=1
        )
        
        self.event_store.append_event(event)
        return event
    
    def handle_add_item(self, command: Dict) -> Event:
        order_id = command["order_id"]
        
        # Get current version
        events = self.event_store.get_events_for_aggregate(order_id)
        version = len(events) + 1
        
        event = Event(
            event_id=generate_uuid(),
            aggregate_id=order_id,
            event_type="ItemAdded",
            data={
                "item": command["item"],
                "added_at": datetime.utcnow().isoformat()
            },
            timestamp=datetime.utcnow(),
            version=version
        )
        
        self.event_store.append_event(event)
        return event

When to Use Event Sourcing

# Good for Event Sourcing:

benefits:
  - Complete audit trail: Every change is stored
  - Temporal queries: "What was the state at time T?"
  - Event replay: Rebuild state from any point
  - Feature flexibility: Add new read models easily
  - Debugging: Full history of what happened

use_cases:
  - Financial systems (audit requirements)
  - Inventory management
  - Workflow engines
  - Gaming (replay functionality)
  - Any domain with complex state transitions

Problems with Event Sourcing

# Challenges to consider

challenges = {
    "event_schema_evolution": {
        "problem": "Events stored forever must handle schema changes",
        "solution": "Event upcasters / versioning"
    },
    
    "read_model_performance": {
        "problem": "Replaying events for every read is slow",
        "solution": "Projections / materialized views"
    },
    
    "eventual_consistency": {
        "problem": "Read models may be stale",
        "solution": "Accept eventual consistency"
    },
    
    "snapshotting": {
        "problem": "Too many events to replay",
        "solution": "Periodic snapshots"
    }
}

CQRS: Separating Read and Write Models

CQRS (Command Query Responsibility Segregation) is a pattern that separates the way you read data from the way you write data. Instead of having a single model for both operations, you have distinct models optimized for each.

Core Concept

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                      Traditional Model                       โ”‚
โ”‚                                                             โ”‚
โ”‚   Client โ”€โ”€โ”€โ”€โ”€โ–บ Read/Write Model โ”€โ”€โ”€โ”€โ”€โ–บ Database           โ”‚
โ”‚                (Same for both)                              โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                      CQRS Model                             โ”‚
โ”‚                                                             โ”‚
โ”‚   Client โ”€โ”€โ”€โ”€โ”€โ–บ Command Model โ”€โ”€โ”€โ”€โ”€โ–บ Write DB              โ”‚
โ”‚       โ”‚                                                    โ”‚
โ”‚       โ””โ”€โ”€โ”€โ”€โ–บ Query Model โ”€โ”€โ”€โ”€โ”€โ–บ Read DB                    โ”‚
โ”‚              (Separate)                                    โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Command Side (Write Model)

class OrderCommand:
    def __init__(self, order_id, customer_id, items):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = items
        self.status = "pending"
        self.created_at = datetime.utcnow()

class OrderCommandHandler:
    def __init__(self, write_repository):
        self.repository = write_repository
    
    def handle_create_order(self, command: OrderCommand):
        # Validate business rules
        if not command.items:
            raise ValueError("Order must have items")
        
        # Create aggregate
        order = Order(
            id=command.order_id,
            customer_id=command.customer_id,
            items=command.items,
            status="created"
        )
        
        # Persist to write database
        self.repository.save(order)
        
        return order.id

Query Side (Read Model)

# Different read models for different views

class OrderListView:
    """Lightweight view for list pages."""
    def __init__(self, read_db):
        self.db = read_db
    
    def get_orders(self, limit=20, offset=0):
        return self.db.query("""
            SELECT order_id, customer_name, total, status, created_at
            FROM orders
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """, [limit, offset])

class OrderDetailView:
    """Detailed view for order detail page."""
    def __init__(self, read_db):
        self.db = read_db
    
    def get_order_detail(self, order_id):
        order = self.db.query_one(
            "SELECT * FROM orders WHERE order_id = %s",
            [order_id]
        )
        
        items = self.db.query(
            "SELECT * FROM order_items WHERE order_id = %s",
            [order_id]
        )
        
        timeline = self.db.query(
            "SELECT * FROM order_timeline WHERE order_id = %s",
            [order_id]
        )
        
        return {
            "order": order,
            "items": items,
            "timeline": timeline
        }

class OrderAnalyticsView:
    """Aggregated view for analytics."""
    def __init__(self, read_db):
        self.db = read_db
    
    def get_revenue_by_day(self, start_date, end_date):
        return self.db.query("""
            SELECT DATE(created_at) as day, 
                   SUM(total) as revenue,
                   COUNT(*) as order_count
            FROM orders
            WHERE created_at BETWEEN %s AND %s
            GROUP BY DATE(created_at)
        """, [start_date, end_date])

Synchronization Between Models

class ModelSynchronizer:
    def __init__(self, event_store):
        self.event_store = event_store
        self.read_databases = {
            "default": ReadDatabase(),
            "analytics": AnalyticsDatabase(),
            "search": SearchIndex()
        }
    
    def synchronize(self, event: Event):
        """Sync event to all read models."""
        if event.event_type == "OrderCreated":
            self._sync_order_created(event)
        elif event.event_type == "OrderShipped":
            self._sync_order_shipped(event)
    
    def _sync_order_created(self, event: Event):
        data = event.data
        
        # Default read DB
        self.read_databases["default"].insert("orders", {
            "order_id": data["order_id"],
            "customer_id": data["customer_id"],
            "status": "created",
            "total": 0
        })
        
        # Analytics DB
        self.read_databases["analytics"].increment("daily_orders", 
            data["created_at"].date())
        
        # Search index
        self.read_databases["search"].index("orders", {
            "id": data["order_id"],
            "text": f"Order {data['order_id']}"
        })

When to Use CQRS

# Good for CQRS:

benefits:
  - Optimized read models for specific needs
  - Different scaling strategies for reads/writes
  - Team autonomy (different teams own read/write)
  - Complex query requirements
  - Polyglot persistence (different DBs for read/write)

use_cases:
  - Read-heavy applications (dashboards, reports)
  - Complex domain with many query patterns
  - Multi-tenant SaaS applications
  - Systems with different consistency needs

Problems with CQRS

# CQRS challenges

challenges = {
    "eventual_consistency": {
        "problem": "Read models may lag behind write model",
        "solution": "Accept and communicate eventual consistency"
    },
    
    "complexity": {
        "problem": "Two models instead of one",
        "solution": "Only use when benefits justify complexity"
    },
    
    "synchronization": {
        "problem": "Keeping read models in sync",
        "solution": "Reliable event-driven sync, saga patterns"
    },
    
    "testing": {
        "problem": "More components to test",
        "solution": "Integration tests for sync, unit for each model"
    }
}

Event Sourcing + CQRS: A Powerful Combination

While both patterns can be used independently, they complement each other exceptionally well.

Combined Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                  Event Sourcing + CQRS                       โ”‚
โ”‚                                                             โ”‚
โ”‚   Commands โ”€โ”€โ–บ Command Handler โ”€โ”€โ–บ Event Store            โ”‚
โ”‚                          โ”‚                                  โ”‚
โ”‚                          โ–ผ                                  โ”‚
โ”‚   Events โ”€โ”€โ”€โ”€โ”€โ”€โ–บ Projections โ”€โ”€โ–บ Read Models               โ”‚
โ”‚                          โ”‚                                  โ”‚
โ”‚              โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                     โ”‚
โ”‚              โ–ผ           โ–ผ           โ–ผ                     โ”‚
โ”‚         List View   Detail View  Analytics                  โ”‚
โ”‚                                                             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Full Implementation

class OrderAggregate:
    """Order aggregate with event sourcing."""
    
    def __init__(self, order_id: str_id = order_id):
        self.order
        self.events: List[Event] = []
        self.status = "draft"
        self.items: List[Dict] = []
        self.total = 0.0
        self.version = 0
    
    def create_order(self, customer_id: str):
        if self.status != "draft":
            raise ValueError("Order already created")
        
        event = Event(
            aggregate_id=self.order_id,
            event_type="OrderCreated",
            data={"customer_id": customer_id},
            version=self.version + 1
        )
        self._apply(event)
        self.events.append(event)
    
    def add_item(self, product_id: str, quantity: int, price: float):
        if self.status == "shipped":
            raise ValueError("Cannot add items to shipped order")
        
        event = Event(
            aggregate_id=self.order_id,
            event_type="ItemAdded",
            data={
                "product_id": product_id,
                "quantity": quantity,
                "price": price
            },
            version=self.version + 1
        )
        self._apply(event)
        self.events.append(event)
    
    def ship_order(self):
        if self.status != "paid":
            raise ValueError("Can only ship paid orders")
        
        event = Event(
            aggregate_id=self.order_id,
            event_type="OrderShipped",
            data={"shipped_at": datetime.utcnow().isoformat()},
            version=self.version + 1
        )
        self._apply(event)
        self.events.append(event)
    
    def _apply(self, event: Event):
        self.version = event.version
        
        if event.event_type == "OrderCreated":
            self.status = "created"
        elif event.event_type == "ItemAdded":
            self.items.append(event.data)
            self.total += event.data["price"] * event.data["quantity"]
        elif event.event_type == "OrderShipped":
            self.status = "shipped"
    
    def get_uncommitted_events(self) -> List[Event]:
        return self.events


class ProjectionBuilder:
    """Build read models from events."""
    
    def __init__(self):
        self.order_list: Dict = {}
        self.order_detail: Dict = {}
        self.daily_stats: Dict = {}
    
    def project(self, event: Event):
        if event.event_type == "OrderCreated":
            self._project_order_created(event)
        elif event.event_type == "ItemAdded":
            self._project_item_added(event)
        elif event.event_type == "OrderShipped":
            self._project_order_shipped(event)
    
    def _project_order_created(self, event: Event):
        order_id = event.aggregate_id
        
        self.order_list[order_id] = {
            "order_id": order_id,
            "customer_id": event.data["customer_id"],
            "status": "created",
            "total": 0,
            "created_at": event.timestamp
        }
        
        self.order_detail[order_id] = {
            "order_id": order_id,
            "items": [],
            "timeline": [f"Order created at {event.timestamp}"]
        }
    
    def _project_item_added(self, event: Event):
        order_id = event.aggregate_id
        
        if order_id in self.order_list:
            self.order_list[order_id]["total"] += (
                event.data["price"] * event.data["quantity"]
            )
        
        if order_id in self.order_detail:
            self.order_detail[order_id]["items"].append(event.data)
    
    def _project_order_shipped(self, event: Event):
        order_id = event.aggregate_id
        
        if order_id in self.order_list:
            self.order_list[order_id]["status"] = "shipped"
        
        if order_id in self.order_detail:
            self.order_detail[order_id]["timeline"].append(
                f"Order shipped at {event.data['shipped_at']}"
            )

Decision Matrix

Should You Use Event Sourcing?

# Use Event Sourcing when:
use_event_sourcing = (
    need_audit_trail or
    need_temporal_queries or
    complex_state_machine or
    want_event_replay or
    building_event_driven_system
)

# Avoid when:
avoid_event_sourcing = (
    simple_crud_app or
    low_volume or
    strong_agility_objection or
    team_new_to_pattern
)

Should You Use CQRS?

# Use CQRS when:
use_cqrs = (
    read_write_ratio_imbalanced or
    multiple_read_views_needed or
    different_consistency_needs or
    want_polyglot_persistence or
    building_microservices
)

# Avoid when:
avoid_cqrs = (
    simple_read_write_pattern or
    low_complexity_domain or
    strong_consistency_required or
    team_size_small
)

Comparison Summary

Aspect Event Sourcing CQRS
Primary Benefit Complete history, audit trail Optimized read/write paths
State Storage Events (immutable) Current state
Read Model Replay events Pre-built projections
Consistency Eventual Eventual (or strong if synced)
Complexity Higher Medium
Storage Growing (append-only) Optimized per view
Queries Rebuild from events Direct reads
Debugging Full history Check read model

Implementation Tools

# Popular frameworks supporting these patterns

event_sourcing:
  - EventStoreDB (formerly Event Store)
  - Axon Framework (Java)
  - EventFlow (.NET)
  - Lagom (Scala/Java)
  - Prooph (PHP)

cqrs_frameworks:
  - Many CQRS implementations are custom
  - Doctrine (PHP) has CQRS support
  - Microservices frameworks generally support it

message_brokers:
  - Kafka (popular for event sourcing)
  - RabbitMQ
  - NATS

Conclusion

Event Sourcing and CQRS are powerful patterns that address different architectural challenges:

  • Event Sourcing stores state as a sequence of events, providing complete history and auditability
  • CQRS separates read and write models, allowing optimization for different access patterns
  • Together, they form a robust foundation for complex, scalable systems

However, they introduce significant complexity. Start with simpler patterns and evolve toward these approaches only when the benefits outweigh the costs.

Remember:

  • Don’t use these patterns because they’re trendy
  • Use them when your domain genuinely needs the capabilities they provide
  • Start small and measure the impact

Comments