
CQRS and Event Sourcing: Practical Implementation Patterns 2026

Created: March 6, 2026 Larry Qu 19 min read

Introduction

Traditional architectures use a single model for both reading and writing data. This works well for simple applications but creates challenges as systems grow in complexity. CQRS (Command Query Responsibility Segregation) and Event Sourcing offer alternative approaches that separate read and write operations, enabling better scalability, flexibility, and auditability.

In 2026, these patterns have matured significantly. EventStoreDB has rebranded to KurrentDB (version 26.1 with SQL access and a new projection engine). Apache Kafka 4.0+ now runs in KRaft mode without ZooKeeper, and frameworks like Marten and Axon Framework continue to evolve. This guide covers CQRS and Event Sourcing patterns, implementation strategies, production considerations, and common anti-patterns to avoid.

For foundational context on why these patterns emerged, see the Event-Driven Architecture Complete Guide.

Understanding CQRS

What Is CQRS?

CQRS separates read and write operations into distinct models:

  • Commands: Operations that change state (Create, Update, Delete)
  • Queries: Operations that read state without modification

This separation allows optimizing each operation type independently. CQRS and Event Sourcing are independent patterns — you can use CQRS without Event Sourcing (using traditional databases for both sides) and Event Sourcing without CQRS (using the event log for both reads and writes). In practice, they’re often combined because Event Sourcing’s event-replay model benefits from CQRS’s read-side projections.

Traditional vs CQRS

Traditional CRUD:

## Single model for both read and write
class OrderController:
    def get_order(self, order_id):
        return self.db.query(Order).filter_by(id=order_id).first()

    def create_order(self, data):
        order = Order(**data)
        self.db.add(order)
        self.db.commit()
        return order

    def update_order(self, order_id, data):
        order = self.db.query(Order).filter_by(id=order_id).first()
        for key, value in data.items():
            setattr(order, key, value)
        self.db.commit()
        return order

CQRS:

## Separate models for read and write

## Write model - focused on business logic
class OrderCommandHandler:
    def create_order(self, data):
        validator = OrderValidator()
        validator.validate(data)
        order = Order.create(**data)
        self.event_store.save(order.events)
        return order.id

## Read model - optimized for queries
class OrderQueryHandler:
    def get_order_summary(self, order_id):
        return self.read_db.query(OrderSummaryDTO).filter_by(id=order_id).first()

    def get_orders_by_customer(self, customer_id):
        return self.read_db.query(OrderListDTO).filter_by(customer_id=customer_id).all()

Benefits of CQRS

Optimized read models: Different read models for different views:

## Product list view - minimal data
class ProductListItem:
    id: str
    name: str
    price: float
    thumbnail_url: str

## Product detail view - full data
class ProductDetail:
    id: str
    name: str
    description: str
    price: float
    inventory: int
    images: list[str]
    specifications: dict
    reviews: list[Review]

Optimized write models: Focused on business logic:

## Write model - complex validation
class Order:
    def __init__(self):
        self.items = []
        self.status = "draft"

    def add_item(self, product, quantity):
        if product.inventory < quantity:
            raise InsufficientInventoryError()
        if self.status != "draft":
            raise OrderNotEditableError()
        self.items.append(OrderItem(product, quantity))

    def submit(self):
        if not self.items:
            raise EmptyOrderError()
        self.status = "submitted"

Independent scaling: Read and write workloads scale separately:

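## Kubernetes: scale read and write Deployments independently
## (selector and pod template omitted for brevity)
apiVersion: apps/v1
kind: Deployment
metadata: {name: read-api}
spec: {replicas: 10}
---
apiVersion: apps/v1
kind: Deployment
metadata: {name: write-api}
spec: {replicas: 2}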

Event Sourcing

What Is Event Sourcing?

Instead of storing current state, event sourcing stores all state changes as a sequence of events:

## Traditional: Store current state
class BankAccount:
    balance: int = 100  # Only the current balance is kept

## Event Sourcing: Store all changes
class BankAccountEvents:
    events = [
        AccountCreated(account_id="acc_1", initial_balance=0),
        DepositMade(account_id="acc_1", amount=50),
        DepositMade(account_id="acc_1", amount=50),
    ]

## Current state = replay (fold) all events in order
def get_balance(events):
    balance = 0
    for event in events:
        if isinstance(event, AccountCreated):
            balance = event.initial_balance
        elif isinstance(event, DepositMade):
            balance += event.amount
        elif isinstance(event, WithdrawalMade):
            balance -= event.amount
    return balance

Event Structure

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass
class Event:
    aggregate_id: str
    event_type: str
    payload: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)
    version: int = 1  # schema version of the payload, used for upcasting
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

@dataclass
class OrderCreated(Event):
    @classmethod
    def new(cls, order_id: str, customer_id: str, items: list) -> "OrderCreated":
        # Fields with defaults (metadata, version, event_id, timestamp) are filled in automatically
        return cls(
            aggregate_id=order_id,
            event_type="OrderCreated",
            payload={"customer_id": customer_id, "items": items},
        )

Always include correlationId and causationId in event metadata. In distributed systems where a single user request generates multiple events, troubleshooting becomes nearly impossible without these two fields for tracing the causal chain.
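A sketch of how the two IDs propagate, assuming hypothetical metadata keys `correlation_id` and `causation_id` (naming conventions vary between stores and frameworks): the correlation ID is copied unchanged from the originating request to every message it triggers, while the causation ID is set to the ID of the message that directly caused the new one.

```python
import uuid

def new_message_id() -> str:
    return str(uuid.uuid4())

def metadata_for_command(command_id: str) -> dict:
    """Metadata for the first event produced by an incoming request/command.

    The command starts the chain, so it is both the correlation root and the cause.
    """
    return {"correlation_id": command_id, "causation_id": command_id}

def metadata_caused_by(cause_id: str, cause_metadata: dict) -> dict:
    """Metadata for a message produced in reaction to an earlier message."""
    return {
        # Same value for every message in the flow: groups the whole request
        "correlation_id": cause_metadata["correlation_id"],
        # Points at the immediate parent: lets you rebuild the causal tree
        "causation_id": cause_id,
    }

# A user request (command) leads to OrderCreated, which triggers PaymentRequested
command_id = new_message_id()
order_created_id = new_message_id()
order_created_meta = metadata_for_command(command_id)

payment_requested_id = new_message_id()
payment_requested_meta = metadata_caused_by(order_created_id, order_created_meta)
```

Querying by correlation ID returns every event of one user request; following causation IDs backwards shows which message produced which.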

Event Design Principles

Designing events is the most important decision in an event-sourced system. Events persist forever and are never modified. Follow these principles:

  • Name events using past-tense verbs: OrderCreated, PaymentProcessed, ItemShipped
  • Capture business intent: RoomBooked rather than ReservationStatusChanged — the first tells you what happened, the second only tells you the resulting state
  • Events must be self-contained: it should be possible to understand what happened from a single event
  • Use only simple types: string, number, boolean, arrays. Do not embed complex domain objects directly in events
  • Include a version field: essential for schema evolution
  • Model around verbs (actions), not nouns (entities)
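The "simple types" and "version field" rules can be enforced at serialization time. A minimal sketch; `RoomBooked`, `is_simple`, and `serialize` are hypothetical names, and the check treats JSON-friendly scalars, lists, and string-keyed dicts as the allowed vocabulary:

```python
import json
from dataclasses import asdict, dataclass

SIMPLE_SCALARS = (str, int, float, bool, type(None))

def is_simple(value) -> bool:
    """True if value is built only from JSON-friendly scalars, lists, and str-keyed dicts."""
    if isinstance(value, SIMPLE_SCALARS):
        return True
    if isinstance(value, list):
        return all(is_simple(v) for v in value)
    if isinstance(value, dict):
        return all(isinstance(k, str) and is_simple(v) for k, v in value.items())
    return False

@dataclass(frozen=True)
class RoomBooked:       # past tense, captures business intent
    booking_id: str
    room_id: str
    guest_id: str
    check_in: str       # ISO-8601 date string rather than a date object
    nights: int
    version: int = 1    # schema version for later evolution

def serialize(event) -> str:
    payload = asdict(event)
    if not is_simple(payload):
        raise TypeError(f"{type(event).__name__} contains non-primitive values")
    return json.dumps({"type": type(event).__name__, "data": payload})
```

Rejecting rich objects at the boundary keeps stored events readable by any consumer, in any language, years later.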

Event Store

import json

class EventStore:
    def __init__(self, connection):
        # sqlite3.Connection assumed: its context manager commits or rolls back
        self.connection = connection

    def append(self, aggregate_id: str, events: list[Event]):
        # One transaction: the whole batch is stored, or none of it is
        with self.connection:
            for event in events:
                self.connection.execute(
                    """
                    INSERT INTO events (aggregate_id, event_type, payload, metadata, created_at)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (
                        aggregate_id,
                        event.event_type,
                        json.dumps(event.payload),
                        json.dumps(event.metadata),
                        event.timestamp.isoformat(),
                    ),
                )

    def get_events(self, aggregate_id: str) -> list[Event]:
        # Order by an insertion sequence (assumed auto-increment "seq" column),
        # not by created_at: timestamps can collide or drift between writers
        rows = self.connection.execute(
            "SELECT * FROM events WHERE aggregate_id = ? ORDER BY seq",
            (aggregate_id,),
        )
        return [self.row_to_event(row) for row in rows]  # row_to_event omitted here

    def get_events_stream(self, from_position: int = 0) -> EventStream:
        return EventStream(self.connection, from_position)
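`get_events_stream(from_position)` is left abstract above. A minimal runnable sketch of the same idea on SQLite, assuming a hypothetical schema in which every event gets a global, auto-incrementing `position`: consumers read in batches after the last position they processed, which is how catch-up readers and projections resume. One caveat: in PostgreSQL a transaction can commit a later sequence value before an earlier one is committed, so a reader polling by position may skip rows; SQLite allows only one writer at a time, which sidesteps that in this sketch.

```python
import json
import sqlite3
from typing import Iterator

SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    position   INTEGER PRIMARY KEY AUTOINCREMENT,  -- global, monotonically increasing
    stream_id  TEXT NOT NULL,
    event_type TEXT NOT NULL,
    payload    TEXT NOT NULL
)
"""

def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn

def append(conn: sqlite3.Connection, stream_id: str, events: list[tuple[str, dict]]) -> None:
    with conn:  # commit all events of the batch together, or none on error
        conn.executemany(
            "INSERT INTO events (stream_id, event_type, payload) VALUES (?, ?, ?)",
            [(stream_id, etype, json.dumps(data)) for etype, data in events],
        )

def read_all(conn: sqlite3.Connection, from_position: int = 0,
             batch_size: int = 100) -> Iterator[tuple[int, str, str, dict]]:
    """Yield (position, stream_id, event_type, payload) after from_position, in order."""
    last = from_position
    while True:
        rows = conn.execute(
            "SELECT position, stream_id, event_type, payload FROM events "
            "WHERE position > ? ORDER BY position LIMIT ?",
            (last, batch_size),
        ).fetchall()
        if not rows:
            return
        for position, stream_id, event_type, payload in rows:
            yield position, stream_id, event_type, json.loads(payload)
        last = rows[-1][0]  # resume after the last row of this batch
```

A consumer stores the last `position` it handled and passes it back as `from_position` after a restart.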

Event Store Solution Comparison

Choosing the right event store depends on your technology stack, operational environment, and requirements:

| Solution | Ecosystem | Storage | Built-in projections | License | Ops complexity |
|---|---|---|---|---|---|
| KurrentDB (formerly EventStoreDB) | Language-agnostic (gRPC) | Dedicated file-based storage | Yes (JavaScript-based) | BSL (free for commercial use) | Medium |
| Axon Server | JVM (Java/Kotlin) | Dedicated storage | No (handled by Axon Framework) | Open source + Enterprise | Medium |
| Marten | .NET (C#) | PostgreSQL | Yes (C#-based) | MIT | Low |
| EventSourcingDB | Language-agnostic (gRPC) | Dedicated engine | No | Open source | Low |
| PostgreSQL + DIY | Language-agnostic | RDBMS | No (build your own) | Open source | High |
| MongoDB + DIY | Language-agnostic | Document DB | No (build your own) | SSPL | High |
| Apache Kafka + ksqlDB | Language-agnostic | Distributed commit log | Yes (ksqlDB) | Apache 2.0 (Kafka); Confluent Community License (ksqlDB) | High |

KurrentDB is the dedicated event store originally created by Greg Young as EventStoreDB. It provides native support for stream-based append-only storage, built-in projections, and catch-up subscriptions. The product has since been rebranded as KurrentDB (and the company as Kurrent), and version 26.1 (released 2026) adds SQL access and a new projection engine.

Apache Kafka is increasingly used as the event store foundation, especially with Kafka 4.0+ running in KRaft mode (no ZooKeeper dependency), exactly-once semantics, and ksqlDB for stream processing and materialized views.

Projections

Build read models from events:

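## Materialized view projection
class OrderSummaryProjection:
    def __init__(self, read_db):
        self.read_db = read_db

    def project(self, event: Event):
        if event.event_type == "OrderCreated":
            self._handle_order_created(event)
        elif event.event_type == "OrderItemAdded":
            self._handle_item_added(event)
        elif event.event_type == "OrderSubmitted":
            self._handle_order_submitted(event)
        # Other event types are ignored: this view does not depend on them

    def _handle_order_created(self, event):
        # ON CONFLICT keeps the insert idempotent when events are replayed
        self.read_db.execute(
            """
            INSERT INTO order_summaries (order_id, customer_id, total, status, created_at)
            VALUES (?, ?, 0, 'draft', ?)
            ON CONFLICT (order_id) DO NOTHING
            """,
            (event.aggregate_id, event.payload["customer_id"], event.timestamp.isoformat()),
        )

    def _handle_item_added(self, event):
        # Updating the total needs price data in the payload; omitted in this example
        pass

    def _handle_order_submitted(self, event):
        self.read_db.execute(
            "UPDATE order_summaries SET status = 'submitted' WHERE order_id = ?",
            (event.aggregate_id,),
        )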
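`ON CONFLICT ... DO NOTHING` only makes inserts idempotent; an increment such as `total = total + ?` would still double count when an event is replayed or redelivered. One way around that, sketched below on an in-memory SQLite read model, is to store the last applied global position on each row (a hypothetical `last_position` column) and skip any event at or below it. The sketch assumes events for a given order arrive in position order and keeps money in integer cents to avoid float rounding; table and event names are illustrative.

```python
import sqlite3

def open_read_model() -> sqlite3.Connection:
    db = sqlite3.connect(":memory:")
    db.execute(
        """CREATE TABLE order_summaries (
               order_id      TEXT PRIMARY KEY,
               customer_id   TEXT NOT NULL,
               total_cents   INTEGER NOT NULL DEFAULT 0,
               status        TEXT NOT NULL,
               last_position INTEGER NOT NULL  -- last event applied to this row
           )"""
    )
    return db

def project(db: sqlite3.Connection, position: int, event_type: str,
            order_id: str, data: dict) -> None:
    """Apply one event; re-applying an event at or below last_position is a no-op."""
    with db:
        if event_type == "OrderCreated":
            db.execute(
                "INSERT INTO order_summaries (order_id, customer_id, status, last_position) "
                "VALUES (?, ?, 'draft', ?) ON CONFLICT (order_id) DO NOTHING",
                (order_id, data["customer_id"], position),
            )
        elif event_type == "OrderItemAdded":
            db.execute(
                "UPDATE order_summaries SET total_cents = total_cents + ?, last_position = ? "
                "WHERE order_id = ? AND last_position < ?",
                (data["quantity"] * data["unit_price_cents"], position, order_id, position),
            )
        elif event_type == "OrderSubmitted":
            db.execute(
                "UPDATE order_summaries SET status = 'submitted', last_position = ? "
                "WHERE order_id = ? AND last_position < ?",
                (position, order_id, position),
            )

events = [
    (1, "OrderCreated", "o1", {"customer_id": "c1"}),
    (2, "OrderItemAdded", "o1", {"quantity": 2, "unit_price_cents": 1500}),
    (3, "OrderSubmitted", "o1", {}),
]
db = open_read_model()
for e in events + events:  # deliver everything twice, as a replay or redelivery would
    project(db, *e)
print(db.execute("SELECT total_cents, status FROM order_summaries").fetchone())
# (3000, 'submitted')
```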

Projection Pattern Classification

Projections can be implemented in three ways, each with different consistency and performance trade-offs:

| Pattern | Consistency | Write performance | Read latency | Use case |
|---|---|---|---|---|
| Inline | Strong | Degraded | Low | Small-scale systems needing strict consistency |
| Async | Eventual | Good | Low | Most production systems |
| Live | Strong (computed on read) | Best | High (replay on each read) | Rare queries, ad-hoc analysis |

Inline Projection: Updates the read model in the same transaction as event storage. Guarantees strong consistency but degrades write performance because the write path waits for projection completion.

Async Projection: Updates the read model asynchronously through subscriptions. Provides eventual consistency but offers good write performance. Most production systems adopt this approach. Events flow through a message broker (Kafka, RabbitMQ) or event store subscriptions to projection handlers.

Live Projection: Replays events for every request. Always guarantees the latest state, but performance drops sharply when there are many events. Suitable for administrative dashboards or debugging tools where latency is acceptable.
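Async projections usually persist a checkpoint: the last global position they have applied. Storing the checkpoint in the same transaction as the read-model change means a crash can never leave the two out of step, so a restarted projector simply resumes after the checkpoint. A minimal sketch under stated assumptions: an in-memory list stands in for the event store subscription, SQLite holds the read model, and all table, event, and function names are hypothetical.

```python
import sqlite3

# Stand-in for an event store subscription: (global_position, event_type, payload)
EVENT_LOG = [
    (1, "CustomerRegistered", {"customer_id": "c1", "name": "Ada"}),
    (2, "CustomerRegistered", {"customer_id": "c2", "name": "Lin"}),
    (3, "CustomerRenamed", {"customer_id": "c1", "name": "Ada L."}),
]

def open_db() -> sqlite3.Connection:
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE customers (customer_id TEXT PRIMARY KEY, name TEXT NOT NULL)")
    db.execute("CREATE TABLE checkpoints (projection TEXT PRIMARY KEY, position INTEGER NOT NULL)")
    return db

def load_checkpoint(db: sqlite3.Connection, projection: str) -> int:
    row = db.execute(
        "SELECT position FROM checkpoints WHERE projection = ?", (projection,)
    ).fetchone()
    return row[0] if row else 0

def run_once(db: sqlite3.Connection, events, projection: str = "customer_names") -> int:
    """Apply every event after the stored checkpoint; return how many were applied."""
    checkpoint = load_checkpoint(db, projection)
    applied = 0
    for position, event_type, data in events:
        if position <= checkpoint:
            continue  # already reflected in the read model
        with db:  # read-model change and checkpoint commit together
            if event_type in ("CustomerRegistered", "CustomerRenamed"):
                db.execute(
                    "INSERT INTO customers (customer_id, name) VALUES (?, ?) "
                    "ON CONFLICT (customer_id) DO UPDATE SET name = excluded.name",
                    (data["customer_id"], data["name"]),
                )
            db.execute(
                "INSERT INTO checkpoints (projection, position) VALUES (?, ?) "
                "ON CONFLICT (projection) DO UPDATE SET position = excluded.position",
                (projection, position),
            )
        applied += 1
    return applied
```

Calling `run_once` again without new events applies nothing; with a broker instead of a list, the same pattern applies per consumed batch.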

CQRS + Event Sourcing Together

Architecture Overview

flowchart LR
    C1[Client] --> CH[Command Handler]
    CH --> ES[Event Store]
    ES --> PE[Projection Engine]
    PE --> QH[Query Handler]
    C2[Client] --> QH

The write path (commands) and read path (queries) are completely separated. The write side appends events to the event store. The read side projects those events into read-optimized models. A message broker or event store subscription fans out events from the write side to multiple projection handlers.
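The whole flow fits in a toy, in-process sketch: command handlers only append events, the store fans each event out to subscribers, and query handlers only read the projections. Everything here is hypothetical and simplified; dispatch is synchronous, whereas a broker or subscription would deliver events asynchronously and make the read side eventually consistent.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class InMemoryEventStore:
    """Append-only log that fans each new event out to every subscriber."""
    log: list = field(default_factory=list)
    subscribers: list = field(default_factory=list)

    def subscribe(self, handler: Callable[[dict], None]) -> None:
        self.subscribers.append(handler)

    def append(self, event: dict) -> None:
        self.log.append(event)
        for handler in self.subscribers:  # synchronous here; a broker would make this async
            handler(event)

# Two independent read models fed by the same events
orders_by_customer: dict[str, list[str]] = defaultdict(list)
order_count_by_status: dict[str, int] = defaultdict(int)

def by_customer_projection(event: dict) -> None:
    if event["type"] == "OrderCreated":
        orders_by_customer[event["customer_id"]].append(event["order_id"])

def status_count_projection(event: dict) -> None:
    if event["type"] == "OrderCreated":
        order_count_by_status["draft"] += 1
    elif event["type"] == "OrderSubmitted":
        order_count_by_status["draft"] -= 1
        order_count_by_status["submitted"] += 1

store = InMemoryEventStore()
store.subscribe(by_customer_projection)
store.subscribe(status_count_projection)

# Command side: handlers only append events
def create_order(order_id: str, customer_id: str) -> None:
    store.append({"type": "OrderCreated", "order_id": order_id, "customer_id": customer_id})

def submit_order(order_id: str) -> None:
    store.append({"type": "OrderSubmitted", "order_id": order_id})

# Query side: handlers only read projections
def get_orders_for(customer_id: str) -> list[str]:
    return list(orders_by_customer.get(customer_id, []))
```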

Implementation Example

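## Domain Events
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class OrderEvent(Event):
    def __init__(self, order_id: str, event_type: str, payload: dict):
        # The dataclass-generated Event.__init__ requires every field as an
        # argument, so set them explicitly here instead
        self.event_id = str(uuid.uuid4())
        self.aggregate_id = order_id
        self.event_type = event_type
        self.timestamp = datetime.now(timezone.utc)
        self.payload = payload
        self.metadata = {}

@dataclass
class OrderCreated(OrderEvent):
    def __init__(self, order_id: str, customer_id: str):
        super().__init__(order_id, "OrderCreated", {"customer_id": customer_id})

@dataclass
class OrderItemAdded(OrderEvent):
    def __init__(self, order_id: str, product_id: str, quantity: int):
        super().__init__(
            order_id, "OrderItemAdded", {"product_id": product_id, "quantity": quantity}
        )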

## Aggregate
class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.events = []
        self.items = []
        self.status = "draft"

    def create(self, customer_id: str):
        self.events.append(OrderCreated(self.order_id, customer_id))
        return self

    def add_item(self, product_id: str, quantity: int):
        self.events.append(OrderItemAdded(self.order_id, product_id, quantity))
        return self

    def get_uncommitted_events(self) -> list[Event]:
        return self.events

    def mark_events_committed(self):
        self.events = []

## Command Handler
class OrderCommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def handle_create_order(self, command: CreateOrderCommand) -> str:
        # A brand-new order has no history, so there is nothing to replay.
        # Commands on an existing order would first load its events with
        # get_events(order_id) and replay them to restore state.
        aggregate = OrderAggregate(str(uuid.uuid4()))

        # Execute command
        aggregate.create(command.customer_id)
        for item in command.items:
            aggregate.add_item(item["product_id"], item["quantity"])

        # Save events, then clear them from the aggregate
        self.event_store.append(
            aggregate.order_id,
            aggregate.get_uncommitted_events()
        )
        aggregate.mark_events_committed()

        return aggregate.order_id

## Query Handler
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_order_summary(self, order_id: str) -> OrderSummaryDTO:
        return self.read_db.query(OrderSummaryDTO).filter_by(
            order_id=order_id
        ).first()
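The command handler above leaves "replay events to restore state" as a comment. One common shape for that step is sketched below, with a simplified stand-in `Event` (a type string plus a dict) and hypothetical method names: `_apply` only mutates state and never fails, so it can replay any history, while commands validate against the rebuilt state first and then record new events through the same `_apply`.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Event:  # simplified stand-in for the Event class above
    type: str
    data: dict

class InvalidCommand(Exception):
    pass

class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.status = "new"
        self.items: dict[str, int] = {}
        self.version = 0               # number of events applied (stream revision)
        self.uncommitted: list[Event] = []

    @classmethod
    def load(cls, order_id: str, history: list[Event]) -> "OrderAggregate":
        """Rehydrate by replaying stored events in order."""
        aggregate = cls(order_id)
        for event in history:
            aggregate._apply(event)
        return aggregate

    @property
    def expected_version(self) -> int:
        """Stream revision the store should have before the uncommitted events are appended."""
        return self.version - len(self.uncommitted)

    def _apply(self, event: Event) -> None:
        """Mutate state from an event; never validates, never fails."""
        if event.type == "OrderCreated":
            self.status = "draft"
        elif event.type == "OrderItemAdded":
            pid = event.data["product_id"]
            self.items[pid] = self.items.get(pid, 0) + event.data["quantity"]
        elif event.type == "OrderSubmitted":
            self.status = "submitted"
        self.version += 1

    def _record(self, event: Event) -> None:
        self._apply(event)
        self.uncommitted.append(event)

    # Commands: validate against current state, then record the resulting event
    def create(self, customer_id: str) -> None:
        if self.status != "new":
            raise InvalidCommand("order already exists")
        self._record(Event("OrderCreated", {"customer_id": customer_id}))

    def add_item(self, product_id: str, quantity: int) -> None:
        if self.status != "draft":
            raise InvalidCommand("only draft orders can be edited")
        if quantity <= 0:
            raise InvalidCommand("quantity must be positive")
        self._record(Event("OrderItemAdded", {"product_id": product_id, "quantity": quantity}))

    def submit(self) -> None:
        if self.status != "draft" or not self.items:
            raise InvalidCommand("only non-empty draft orders can be submitted")
        self._record(Event("OrderSubmitted", {}))
```

`expected_version` is the value a handler would pass to an optimistic-concurrency append such as the one in the next section.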

Handling Concurrency

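## Optimistic concurrency with events
import json
import sqlite3

class ConcurrencyError(Exception):
    pass

class EventStore:
    def append(self, aggregate_id: str, expected_version: int, events: list[Event]):
        current_version = self.get_current_version(aggregate_id)

        if current_version != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, found {current_version}"
            )

        # The check above is only a fast path: two writers can pass it at the
        # same time. A UNIQUE (aggregate_id, version) constraint on the events
        # table is what actually rejects the second writer.
        try:
            with self.connection:  # store the whole batch in one transaction
                for i, event in enumerate(events):
                    self.connection.execute(
                        "INSERT INTO events (aggregate_id, version, event_type, payload, created_at) "
                        "VALUES (?, ?, ?, ?, ?)",
                        (
                            aggregate_id,
                            expected_version + i + 1,
                            event.event_type,
                            json.dumps(event.payload),
                            event.timestamp.isoformat(),
                        ),
                    )
        except sqlite3.IntegrityError as exc:
            raise ConcurrencyError(f"Concurrent append to {aggregate_id}") from exc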

The expectedVersion parameter (called expectedRevision in KurrentDB) is the core of optimistic concurrency control. When two commands run concurrently against the same aggregate, both load the same version; the first to save succeeds, and the second is rejected with a concurrency error (ConcurrencyError in the code above; KurrentDB clients report a wrong-expected-version error). The retry logic must reload the events, re-run the command against the fresh state, and append again.
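A minimal sketch of that reload-and-retry loop, assuming a hypothetical in-memory store whose `append` performs the version check and the write under one lock, and a `decide` function that computes new events purely from the reloaded history:

```python
import threading

class ConcurrencyError(Exception):
    pass

class InMemoryEventStore:
    def __init__(self):
        self._streams: dict[str, list[dict]] = {}
        self._lock = threading.Lock()

    def read(self, stream_id: str) -> list[dict]:
        with self._lock:
            return list(self._streams.get(stream_id, []))

    def append(self, stream_id: str, expected_version: int, events: list[dict]) -> int:
        # Check and write under one lock, so the check cannot race another append
        with self._lock:
            stream = self._streams.setdefault(stream_id, [])
            if len(stream) != expected_version:
                raise ConcurrencyError(f"expected {expected_version}, found {len(stream)}")
            stream.extend(events)
            return len(stream)

def execute_with_retry(store, stream_id, decide, max_attempts: int = 3) -> int:
    """Reload history, decide new events, append with expected version; retry on conflict."""
    for attempt in range(1, max_attempts + 1):
        history = store.read(stream_id)
        new_events = decide(history)  # must depend only on the reloaded history
        try:
            return store.append(stream_id, len(history), new_events)
        except ConcurrencyError:
            if attempt == max_attempts:
                raise
    raise ValueError("max_attempts must be at least 1")

# Usage: a withdrawal whose business rule is re-checked on every attempt
store = InMemoryEventStore()
store.append("acc-1", 0, [{"type": "Deposited", "amount": 100}])

def balance(history: list[dict]) -> int:
    return sum(e["amount"] if e["type"] == "Deposited" else -e["amount"] for e in history)

def withdraw_30(history: list[dict]) -> list[dict]:
    if balance(history) < 30:
        raise ValueError("insufficient funds")
    return [{"type": "Withdrawn", "amount": 30}]
```

Because `decide` runs again on each attempt, invariants such as "no overdraft" are re-checked against the state that actually won the race.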

Snapshot Strategy

When an aggregate accumulates thousands of events, replaying the entire history on every read becomes impractical. Snapshots solve this by periodically saving the current state as a checkpoint.

Instead of replaying 10,000 account transactions, you load the most recent snapshot (taken at event 9,500) and replay only the 500 events since then.

Snapshot Application Criteria

Introduce snapshots only when measurements justify them. Rough rules of thumb:

  • Consider snapshots once aggregates average more than about 50 events
  • Treat them as necessary when replaying a single aggregate takes longer than about 100 ms
  • When snapshotting by count, take one every 100 to 500 events
  • Where the domain has natural boundaries (end of day, monthly close), snapshot at those instead of at arbitrary event counts

## Snapshotting for performance
import json

class SnapshotStore:
    def __init__(self, db):
        self.db = db  # DB-API connection, e.g. sqlite3.Connection

    def save_snapshot(self, aggregate_id: str, version: int, state: dict):
        with self.db:
            self.db.execute(
                "INSERT INTO snapshots (aggregate_id, version, state) VALUES (?, ?, ?)",
                (aggregate_id, version, json.dumps(state)),
            )

    def get_snapshot(self, aggregate_id: str) -> tuple[int, dict] | None:
        row = self.db.execute(
            "SELECT version, state FROM snapshots WHERE aggregate_id = ? "
            "ORDER BY version DESC LIMIT 1",
            (aggregate_id,),
        ).fetchone()
        if row is None:
            return None
        version, state = row
        return version, json.loads(state)
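The store above only persists snapshots. Deciding when to take one, and loading from the newest one plus the events after it, can be sketched as follows; `SNAPSHOT_EVERY`, `Account`, `load`, and `maybe_snapshot` are hypothetical, snapshots are kept in a dict keyed by version, and `events[i]` is assumed to be the event that produced version `i + 1`:

```python
import json

SNAPSHOT_EVERY = 100  # hypothetical threshold; tune from measured replay time

class Account:
    def __init__(self, balance: int = 0, version: int = 0):
        self.balance = balance
        self.version = version  # number of events folded into this state

    def apply(self, event: dict) -> None:
        self.balance += event["amount"]  # deposits positive, withdrawals negative
        self.version += 1

def load(events: list[dict], snapshots: dict[int, str]) -> Account:
    """Start from the newest snapshot (if any) and replay only the events after it."""
    if snapshots:
        latest = max(snapshots)
        account = Account(**json.loads(snapshots[latest]))
    else:
        account = Account()  # no snapshot: full replay from the first event
    for event in events[account.version:]:
        account.apply(event)
    return account

def maybe_snapshot(account: Account, snapshots: dict[int, str]) -> None:
    """Called after each applied event; snapshots every SNAPSHOT_EVERY versions."""
    if account.version and account.version % SNAPSHOT_EVERY == 0:
        snapshots[account.version] = json.dumps(
            {"balance": account.balance, "version": account.version}
        )
```

With 250 events and snapshots at versions 100 and 200, `load` replays only the last 50 events.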

Snapshot Schema Versioning

When the aggregate structure changes, existing snapshots become invalid. Track a schemaVersion on each snapshot to detect this:

class VersionedSnapshotStore:
    SCHEMA_VERSION = 2  # bump whenever the aggregate's state shape changes

    def save_snapshot(self, aggregate_id: str, version: int, state: dict):
        self.db.execute(
            "INSERT INTO snapshots (aggregate_id, version, schema_version, state) "
            "VALUES (?, ?, ?, ?)",
            (aggregate_id, version, self.SCHEMA_VERSION, json.dumps(state)),
        )

    def load_aggregate(self, aggregate_id: str) -> Aggregate:
        # get_snapshot is assumed to return an object with version, schema_version and state
        snapshot = self.get_snapshot(aggregate_id)
        if snapshot and snapshot.schema_version == self.SCHEMA_VERSION:
            # Start from snapshot, replay only subsequent events
            start_version = snapshot.version
            aggregate = self.reconstitute_from_snapshot(snapshot)
        else:
            # Snapshot missing or schema mismatch: full replay
            start_version = 0
            aggregate = self.create_empty_aggregate()

        # Events with version > start_version
        events = self.event_store.get_events_since(aggregate_id, start_version)
        for event in events:
            aggregate.apply(event)
        return aggregate

Snapshots are a performance optimization, not a required component. The system must function correctly without snapshots. Always implement automatic fallback to full event replay when snapshots are corrupted or schema versions mismatch.

Event Versioning

Events are immutable — once stored, they are never modified. As your domain evolves, event schemas change. You need strategies for handling multiple event versions and migrating between schemas.

Versioning Strategy Comparison

| Strategy | Description | Risk | Suitable for |
|---|---|---|---|
| Tolerant deserialization | Ignore unknown fields; use defaults for missing ones | Low | Adding optional fields |
| Event versioning | Include a version identifier; consumers handle each version | Medium | Field name or type changes |
| Upcasting | Transform old versions to the current one at read time, via middleware | Medium | Changes where old events can be converted mechanically (renames, added defaults) |
| In-place migration | Rewrite historical events to the new schema | High | Last resort (breaks immutability) |

Upcasting Example

Upcasting places middleware that converts previous event versions to the current version during deserialization:

## Event versioning
from dataclasses import dataclass, field

@dataclass
class OrderCreatedV1:
    event_type: str = "OrderCreated"
    version: int = 1
    payload: dict = field(default_factory=lambda: {
        "customer_id": None
    })

@dataclass
class OrderCreatedV2:
    event_type: str = "OrderCreated"
    version: int = 2
    payload: dict = field(default_factory=lambda: {
        "customer_id": None,
        "shipping_address": None  # Added field
    })

## Upcaster
class EventUpcaster:
    def upcast(self, event: OrderCreatedV1 | OrderCreatedV2) -> OrderCreatedV2:
        if event.version < 2:
            return self._upgrade_to_v2(event)
        return event

    def _upgrade_to_v2(self, event: OrderCreatedV1) -> OrderCreatedV2:
        # Build a new object rather than mutating the one loaded from the store
        return OrderCreatedV2(payload={**event.payload, "shipping_address": None})
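With more than two versions, a common arrangement is a chain of single-step upcasters keyed by event type and source version, applied to the raw payload before deserialization. A minimal sketch; the registry layout and the v3 rename of `customer_id` to `buyer_id` are hypothetical examples, and the stored payload is never mutated:

```python
from typing import Callable

Upcaster = Callable[[dict], dict]

def order_created_v1_to_v2(payload: dict) -> dict:
    # v2 added an optional shipping address
    return {**payload, "shipping_address": None}

def order_created_v2_to_v3(payload: dict) -> dict:
    # hypothetical v3 renamed customer_id to buyer_id
    new = {k: v for k, v in payload.items() if k != "customer_id"}
    new["buyer_id"] = payload["customer_id"]
    return new

# (event_type, from_version) -> function producing from_version + 1
UPCASTERS: dict[tuple[str, int], Upcaster] = {
    ("OrderCreated", 1): order_created_v1_to_v2,
    ("OrderCreated", 2): order_created_v2_to_v3,
}

def upcast(event_type: str, version: int, payload: dict) -> tuple[int, dict]:
    """Apply single-step upcasters until no newer version is registered."""
    while (event_type, version) in UPCASTERS:
        payload = UPCASTERS[(event_type, version)](payload)
        version += 1
    return version, payload
```

Each step only knows about its neighbor, so adding v4 later means registering one more function rather than touching the existing ones.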

The most important principle: never modify existing events in the store. Transform them at read time (upcasting) or define new event types. If you use Kafka, Confluent Schema Registry (a separate component commonly deployed alongside it) lets producer serializers register schemas and check them against compatibility rules, and lets consumer deserializers fetch the writer's schema using the schema ID embedded in each message.

Event Ordering and Idempotency

In distributed systems, events may arrive out of order or be delivered more than once. Event handlers must handle both scenarios.

  • Sequence numbers: Tag each event in a stream with a monotonically increasing sequence number. Handlers track the last processed sequence and skip earlier events.
  • Idempotency keys: For external integrations, include an idempotency key in event metadata so downstream services can safely handle duplicates.
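  • Exactly-once processing: Kafka's idempotent producers and transactions (available since Kafka 0.11) provide exactly-once semantics for read-process-write pipelines within Kafka. Side effects in external systems, such as database writes or emails, still need idempotent handlers.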
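The sequence-number approach can be sketched as a handler wrapper that drops duplicates and buffers early arrivals until the gap before them is filled. This is a minimal in-memory illustration with hypothetical names, assuming per-stream sequences start at 1; a production version would persist `last_seq` together with the handler's side effects and bound the buffer (or alert) for gaps that never fill.

```python
from collections import defaultdict

class OrderedDeduplicatingHandler:
    """Deliver each stream's events exactly once and in sequence order.

    Duplicates (sequence <= last processed) are dropped; events that arrive
    early are buffered until every earlier sequence has been processed.
    """

    def __init__(self, process):
        self.process = process            # callback(stream_id, seq, event)
        self.last_seq = defaultdict(int)  # stream_id -> last processed seq
        self.pending = defaultdict(dict)  # stream_id -> {seq: buffered event}

    def handle(self, stream_id: str, seq: int, event) -> None:
        if seq <= self.last_seq[stream_id]:
            return  # duplicate delivery: already processed
        buffer = self.pending[stream_id]
        buffer[seq] = event  # re-buffering a not-yet-processed duplicate is harmless
        next_seq = self.last_seq[stream_id] + 1
        while next_seq in buffer:  # drain every contiguous event
            self.process(stream_id, next_seq, buffer.pop(next_seq))
            self.last_seq[stream_id] = next_seq
            next_seq += 1
```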

See the Distributed Systems Fundamentals Guide for consensus and ordering patterns.

Apache Kafka Integration

Apache Kafka's distributed commit log aligns naturally with CQRS and Event Sourcing requirements. Kafka 4.0 (released in 2025) runs only in KRaft mode, removing the ZooKeeper dependency and improving cluster scalability.

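Kafka topics can serve as the event store. Events are written to topics with configurable retention: days, weeks, or indefinitely. Kafka's partitioning enables horizontal scaling: when the aggregate ID is used as the record key (and the partition count stays fixed), all events for that aggregate go to the same partition, which preserves their order. Multiple partitions allow parallel processing across many consumers, which is crucial when rebuilding read models from millions of events. Kafka does not, however, offer a per-aggregate expected-version check on append or an efficient way to read a single aggregate's events, so many teams use it to distribute events while loading aggregates from a dedicated store.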

// Command handler publishes events to Kafka (write side)
public class OrderCommandHandler {
    private final KafkaProducer<String, OrderEvent> producer;

    public void handle(CreateOrderCommand command) {
        validateOrder(command);
        OrderCreatedEvent event = new OrderCreatedEvent(
            UUID.randomUUID(),
            command.getOrderId(),
            command.getCustomerId(),
            command.getItems(),
            Instant.now()
        );
        // Order ID as key ensures partition ordering
        ProducerRecord<String, OrderEvent> record =
            new ProducerRecord<>("order-events", event.getOrderId(), event);
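        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("Event published to partition {} at offset {}",
                    metadata.partition(), metadata.offset());
            } else {
                // Publishing failed: surface it so the command can be retried or rejected
                log.error("Failed to publish event for order {}",
                    command.getOrderId(), exception);
            }
        });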
    }
}

The read side consumes events and builds materialized views:

// Consumer building read model (read side)
public class OrderReadModelConsumer {
    private final OrderRepository repository;

    @KafkaListener(topics = "order-events", groupId = "order-view-builder")
    public void consume(ConsumerRecord<String, OrderEvent> record) {
        OrderEvent event = record.value();
        switch (event.getType()) {
            case ORDER_CREATED:
                OrderCreatedEvent created = (OrderCreatedEvent) event;
                repository.save(new OrderView(
                    created.getOrderId(),
                    created.getCustomerId(),
                    created.getTotalAmount(),
                    "CREATED"
                ));
                break;
            case ORDER_SHIPPED:
                repository.updateStatus(event.getOrderId(), "SHIPPED");
                break;
        }
    }
}

Using Kafka Streams or ksqlDB enables sophisticated projections with joins, aggregations, and windowing — operating directly on Kafka topics as tables and streams. Kafka’s log compaction is useful for snapshot topics, keeping only the most recent state per key while preserving the full event history in the main topic.

Anti-Patterns

Common mistakes that cause event sourcing projects to fail:

Anti-pattern 1: Storing Entire State in Events

## BAD: Putting the entire state in an event is no different from CRUD
from dataclasses import dataclass
from decimal import Decimal

@dataclass
class OrderUpdated:
    data: Order  # Entire Order object: defeats the purpose of events

## GOOD: Record only what changed
@dataclass
class OrderItemAdded:
    order_id: str
    product_id: str
    quantity: int
    unit_price: Decimal

Anti-pattern 2: Calling External Services from Projections

Projections should be pure functions that update read models using only event data. If a projection calls an external API, those side effects will fire during event replay, causing duplicate charges, notifications, or API calls. Separate external integrations into dedicated Saga or Process Manager handlers.

Anti-pattern 3: Applying Event Sourcing to Every Domain

Event sourcing adds complexity. Apply it selectively to domains where change history has business value — financial transactions, order processing, inventory management. For simple CRUD domains like user settings or reference code tables, traditional storage is more appropriate.

Anti-pattern 4: Accumulating 100,000+ Events in a Stream

When a single aggregate stream grows past roughly 100,000 events, loading it without snapshots can take seconds, and even with snapshots such streams make rebuilds, migrations, and archiving painful. The usual root cause is a poorly drawn aggregate boundary. Consider splitting the aggregate into smaller units or reviewing what generates the events. For example, an Order aggregate with a few dozen events is fine; a Customer aggregate that records every login, click, and preference change over years is not.

Anti-pattern 5: Single Mega Aggregate

Having one projector that feeds into a single large aggregate, then transforming that aggregate for every query, creates coupling and increases complexity. Instead, project directly into the shape each query needs. Multiple projections for different read models are a feature, not a bug.

Disaster Recovery and Operations

Disaster recovery in event sourcing differs from traditional systems. Since the event store is the single source of truth, projections can always be rebuilt from scratch.

Recovering from Projection Corruption

  1. Stop the projection service
  2. TRUNCATE or DROP the read model table
  3. Reset the checkpoint position for that projection
  4. Restart the projection service — it rebuilds the read model by replaying all events from the beginning
  5. Verify the projection has caught up to the current position

If you have hundreds of millions of events, design a projection architecture capable of parallel processing, or rebuild by partition.
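Rebuilding by partition works because ordering only matters within a single stream: streams can be hashed into partitions and each partition folded independently. A sketch under stated assumptions: events are in-memory tuples `(position, stream_id, event_type, data)` in global order, the read model is a dict of order totals, and `partition_of`, `rebuild_partition`, and `parallel_rebuild` are hypothetical names. A thread pool keeps the example self-contained; in CPython, CPU-bound folds would realistically run in worker processes, each writing to the read-model database instead of returning a dict.

```python
import zlib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def partition_of(stream_id: str, partitions: int) -> int:
    # crc32 is stable across processes, unlike Python's salted built-in hash() for str
    return zlib.crc32(stream_id.encode()) % partitions

def rebuild_partition(events) -> dict:
    """Fold one partition's events (in global order) into order totals."""
    totals = {}
    for _position, stream_id, event_type, data in events:
        if event_type == "OrderCreated":
            totals[stream_id] = 0
        elif event_type == "OrderItemAdded":
            totals[stream_id] += data["quantity"] * data["unit_price_cents"]
    return totals

def parallel_rebuild(all_events, partitions: int = 4) -> dict:
    buckets = defaultdict(list)
    for event in all_events:  # global order is preserved inside each bucket
        buckets[partition_of(event[1], partitions)].append(event)
    result = {}
    with ThreadPoolExecutor(max_workers=partitions) as pool:
        for partial in pool.map(rebuild_partition, buckets.values()):
            result.update(partial)  # partitions hold disjoint streams, so keys never clash
    return result
```

The partitioned result must equal a single sequential fold, which makes a cheap consistency check after a rebuild.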

Event Store Cluster Failure

KurrentDB uses a leader-follower architecture. When the leader goes down, a follower is automatically promoted:

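  • Write failures: Retry writes that fail during leader transitions, reusing the same expectedRevision (and the same event IDs). The revision check prevents a retried append from being applied twice; if the retry is rejected with a wrong-expected-version error, the first attempt may have succeeded, so re-read the stream before reporting failure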
  • Subscription reconnection: Persistent subscriptions reconnect automatically; catch-up subscriptions must be manually reconnected from the last checkpoint
  • Split-brain prevention: Operate a minimum 3-node cluster for quorum-based leader election

Operational Checklist

Design Phase:

  • Does every event include eventVersion, correlationId, and causationId?
  • Is the aggregate’s invariant validation complete?
  • Is CQRS truly needed for this domain?

Implementation Phase:

  • Are projection handlers idempotent (using ON CONFLICT / upsert semantics)?
  • Is optimistic concurrency control implemented with expectedRevision?
  • Does the upcaster correctly handle all previous event versions?
  • Does the system fall back to full event replay when snapshot schemas mismatch?

Operations Phase:

  • Is the event store cluster configured with 3+ nodes?
  • Is projection consumer lag monitored?
  • Are disk usage alerts configured for the event store?
  • Is the projection rebuild procedure documented?
  • Are snapshot creation frequency and retention policies configured?
  • Is a Dead Letter Queue configured for unprocessable events?
  • Are manual event correction scripts prepared for failure scenarios?

GDPR and Compliance

The append-only, immutable nature of an event store conflicts with data protection regulations that require deletion of personal data (the “right to be forgotten”).

Recommended approach: Store personal data outside the event store and reference it by identifier in events. This allows deletion to occur independently without affecting the event stream.

When personal data cannot be separated: Use crypto-shredding — encrypt personal data in events using a per-subject key. Delete the encryption key to render the data unrecoverable while leaving the event structure intact. This adds encryption overhead on every read and write and requires robust key management.

Practical Considerations

When to Use CQRS

Good candidates:

  • Complex domain logic with validation
  • Multiple read models needed
  • High read-to-write ratio
  • Audit requirements
  • Event-driven architectures

Probably not needed for:

  • Simple CRUD applications
  • Low complexity domains
  • Single read model

When to Use Event Sourcing

Good candidates:

  • Audit requirements (complete history)
  • Temporal queries (“what was the state at time X?”)
  • Complex business rules requiring history
  • Rebuilding state from events
  • Event-driven microservices

Probably not needed for:

  • Simple state management
  • Performance-critical writes
  • Domains where eventually consistent reads are unacceptable

Event Sourcing vs Traditional CRUD

| Criteria | Traditional CRUD | Event Sourcing |
|---|---|---|
| Current state querying | Simple (direct read) | Complex (event replay or projection) |
| Change history | Requires separate audit logs | Captured automatically |
| Schema changes | ALTER TABLE migration | Event versioning + upcasting |
| Debugging | Current state only | Full history replay |
| Data consistency | ACID transactions | Aggregate-level + eventual consistency |
| Storage | Current state only | All events accumulate |
| Read performance | Index optimization | Projection design required |
| Team experience | General | DDD + event modeling required |

Challenges

Complexity: More moving parts than simple CRUD. Teams must manage event stores, projection engines, read model databases, and message brokers simultaneously. Start with a modular event sourcing setup and add CQRS only when read-model optimization justifies it.

Eventual consistency: Read models lag behind write models. The delay (eventual consistency window) depends on projection engine throughput. For systems requiring immediate consistency, CQRS may not be appropriate without compensating mechanisms.

Event schema evolution: Handling schema changes over time requires careful migration planning. Start with versioned events from day one — retrofitting versioning is significantly harder.

Performance: Replaying long event histories can be slow. Snapshots, covered in Snapshot Strategy above, limit the replay to the events recorded after the latest snapshot.

Tools and Frameworks

Event Stores

  • KurrentDB (formerly EventStoreDB): Purpose-built event store with built-in projections, subscriptions, and gRPC client libraries for .NET, Node.js, Python, Java, Go, and Rust. Version 26.1+ adds SQL access and a new projection engine.
  • Apache Kafka: Distributed commit log ideal for event sourcing. Kafka 4.0+ uses KRaft mode (no ZooKeeper). Pair with ksqlDB for stream processing and materialized views.
  • Axon Framework: Java/Kotlin CQRS/ES framework with strong aggregate support, typically paired with Axon Server as its event store.
  • Marten: .NET event sourcing on PostgreSQL with async projection support. MIT licensed, low operational complexity.
  • PostgreSQL (DIY): Roll your own event store on PostgreSQL with JSONB columns for event payloads. High flexibility but requires building projection, concurrency, and snapshot infrastructure yourself.

Read Model Stores

  • Elasticsearch: Full-text search and analytics for complex query models
  • PostgreSQL: Relational read models with materialized views and partial indexes
  • Redis: Cached read models for low-latency queries
  • MongoDB: Document read models for denormalized, nested data

For patterns on connecting CQRS components in microservice architectures, see the Microservices Communication Patterns Guide.

Best Practices

Keep Events Immutable

## Wrong: Mutable event
class BadEvent:
    def __init__(self):
        self.value = 0
    def update(self, new_value):
        self.value = new_value  # Don't do this!

## Right: Immutable events
@dataclass(frozen=True)
class GoodEvent:
    old_value: int
    new_value: int

Design Events Carefully

## Good: Intent-focused event
class OrderSubmitted:
    order_id: str
    submitted_at: datetime
    total: Decimal

## Avoid: Implementation-focused event
class OrderTableUpdated:
    # What does this mean?
    table: str = "orders"
    operation: str = "UPDATE"

Handle Failures

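## Idempotent event processing
class IdempotentEventHandler:
    def __init__(self):
        # In-memory set for illustration only: it is lost on restart and not
        # shared between instances. Production handlers persist processed IDs,
        # ideally in the same transaction as the read-model update.
        self.processed_events: set[str] = set()

    def handle(self, event: Event):
        if event.event_id in self.processed_events:
            return  # duplicate delivery: already handled
        self._process(event)
        # Record only after success, so a failed attempt can be retried
        self.processed_events.add(event.event_id)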

Prefer Intent-Focused Events

Design events to capture business intent, not just state deltas. For example, a seat-reservation system should record “two seats were reserved” (SeatsReserved) rather than “remaining seats changed to 42” (SeatsChanged). The first tells you what happened. The second only tells you the result. Intent-focused events enable richer projections, better audit trails, and more flexible read model evolution.

Conclusion

CQRS and Event Sourcing provide powerful patterns for building complex applications. CQRS enables optimized read and write models, while Event Sourcing provides complete audit trails and temporal queries.

These patterns add complexity — only adopt when the benefits justify it. Start with simpler approaches and evolve as requirements demand. Invest the most time in event schema design — events persist forever. Establish versioning strategies early, implement projections idempotently, and apply these patterns selectively to core domains rather than the entire system.

The combination works particularly well for event-driven microservices, audit-critical systems, and domains with complex state transitions. The initial investment pays dividends in flexibility, observability, and the ability to answer questions about past system states that traditional architectures cannot.
