Introduction
Traditional architectures use a single model for both reading and writing data. This works well for simple applications but creates challenges as systems grow in complexity. CQRS (Command Query Responsibility Segregation) and Event Sourcing offer alternative approaches that separate read and write operations, enabling better scalability, flexibility, and auditability.
In 2026, these patterns have matured significantly. EventStoreDB has rebranded to KurrentDB (version 26.1 with SQL access and a new projection engine). Apache Kafka 4.0+ now runs in KRaft mode without ZooKeeper, and frameworks like Marten and Axon Framework continue to evolve. This guide covers CQRS and Event Sourcing patterns, implementation strategies, production considerations, and common anti-patterns to avoid.
For foundational context on why these patterns emerged, see the Event-Driven Architecture Complete Guide.
Understanding CQRS
What Is CQRS?
CQRS separates read and write operations into distinct models:
- Commands: Operations that change state (Create, Update, Delete)
- Queries: Operations that read state without modification
This separation allows optimizing each operation type independently. CQRS and Event Sourcing are independent patterns — you can use CQRS without Event Sourcing (using traditional databases for both sides) and Event Sourcing without CQRS (using the event log for both reads and writes). In practice, they’re often combined because Event Sourcing’s event-replay model benefits from CQRS’s read-side projections.
Traditional vs CQRS
Traditional CRUD:
# Single model for both read and write
class OrderController:
    def get_order(self, order_id):
        return self.db.query(Order).filter_by(id=order_id).first()

    def create_order(self, data):
        order = Order(**data)
        self.db.add(order)
        self.db.commit()
        return order

    def update_order(self, order_id, data):
        order = self.db.query(Order).filter_by(id=order_id).first()
        for key, value in data.items():
            setattr(order, key, value)
        self.db.commit()
        return order
CQRS:
# Separate models for read and write

# Write model - focused on business logic
class OrderCommandHandler:
    def create_order(self, data):
        validator = OrderValidator()
        validator.validate(data)
        order = Order.create(**data)
        self.event_store.save(order.events)
        return order.id

# Read model - optimized for queries
class OrderQueryHandler:
    def get_order_summary(self, order_id):
        return self.read_db.query(OrderSummaryDTO).filter_by(id=order_id).first()

    def get_orders_by_customer(self, customer_id):
        return self.read_db.query(OrderListDTO).filter_by(customer_id=customer_id).all()
Benefits of CQRS
Optimized read models: Different read models for different views:
# Product list view - minimal data
class ProductListItem:
    id: str
    name: str
    price: float
    thumbnail_url: str

# Product detail view - full data
class ProductDetail:
    id: str
    name: str
    description: str
    price: float
    inventory: int
    images: list[str]
    specifications: dict
    reviews: list[Review]
Optimized write models: Focused on business logic:
# Write model - complex validation
class Order:
    def __init__(self):
        self.items = []
        self.status = "draft"

    def add_item(self, product, quantity):
        if product.inventory < quantity:
            raise InsufficientInventoryError()
        if self.status != "draft":
            raise OrderNotEditableError()
        self.items.append(OrderItem(product, quantity))

    def submit(self):
        if not self.items:
            raise EmptyOrderError()
        self.status = "submitted"
Independent scaling: Read and write workloads scale separately:
# Illustrative replica counts (simplified, not a complete Kubernetes manifest):
# the read API scales out independently of the write API
resources:
  read-api:
    replicas: 10
  write-api:
    replicas: 2
Event Sourcing
What Is Event Sourcing?
Instead of storing current state, event sourcing stores all state changes as a sequence of events:
# Traditional: store current state
class BankAccount:
    balance: int = 100  # Current balance

# Event Sourcing: store all changes
events = [
    AccountCreated(account_id="acc_1", initial_balance=0),
    DepositMade(account_id="acc_1", amount=50, new_balance=50),
    DepositMade(account_id="acc_1", amount=50, new_balance=100),
]

# Current state = replay all events
def get_balance(events):
    balance = 0
    for event in events:
        if isinstance(event, AccountCreated):
            balance = event.initial_balance
        elif isinstance(event, (DepositMade, WithdrawalMade)):
            balance = event.new_balance
    return balance
Event Structure
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    timestamp: datetime
    payload: dict[str, Any]
    metadata: dict[str, Any]

# A plain subclass (no @dataclass) that fills in the base fields. Decorating it
# and giving event_type a default would fail, because the inherited fields that
# follow event_type have no defaults.
class OrderCreated(Event):
    def __init__(self, order_id: str, customer_id: str, items: list):
        super().__init__(
            event_id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="OrderCreated",
            timestamp=datetime.now(timezone.utc),
            payload={"customer_id": customer_id, "items": items},
            metadata={},
        )
Always include correlationId and causationId in event metadata. In distributed systems where a single user request generates multiple events, troubleshooting becomes nearly impossible without these two fields for tracing the causal chain.
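A minimal sketch of how these two fields can be propagated. The `new_metadata` helper and the `messageId` field are assumptions made for illustration, not part of any library: every message in a chain shares the correlation ID of the originating request, while the causation ID points at the message that directly triggered it.

```python
import uuid
from typing import Optional


def new_metadata(message_id: str, cause: Optional[dict] = None) -> dict:
    """Build tracing metadata for a message (hypothetical helper).

    `cause` is the metadata of the message that triggered this one,
    or None when this message starts a new chain (e.g. a user request).
    """
    if cause is None:
        # First message in the chain: it correlates with itself and has no cause.
        return {"messageId": message_id, "correlationId": message_id, "causationId": None}
    return {
        "messageId": message_id,
        "correlationId": cause["correlationId"],  # unchanged along the whole chain
        "causationId": cause["messageId"],        # the direct parent
    }


# A command triggers OrderCreated, which in turn triggers PaymentRequested.
command_meta = new_metadata(str(uuid.uuid4()))
created_meta = new_metadata(str(uuid.uuid4()), cause=command_meta)
payment_meta = new_metadata(str(uuid.uuid4()), cause=created_meta)
```

Filtering a log by `correlationId` then returns everything one request caused, and following `causationId` links reconstructs the order in which it happened.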
Event Design Principles
Designing events is the most important decision in an event-sourced system. Events persist forever and are never modified. Follow these principles:
- Name events using past-tense verbs: OrderCreated, PaymentProcessed, ItemShipped
- Capture business intent: RoomBooked rather than ReservationStatusChanged. The first tells you what happened; the second only tells you the resulting state
- Events must be self-contained: it should be possible to understand what happened from a single event
- Use only simple types: string, number, boolean, arrays. Do not embed complex domain objects directly in events
- Include a version field: essential for schema evolution
- Model around verbs (actions), not nouns (entities)
Event Store
import json

class EventStore:
    def __init__(self, connection):
        self.connection = connection  # sqlite3-style connection (assumption)

    def append(self, aggregate_id: str, events: list[Event]):
        for event in events:
            self.connection.execute(
                """
                INSERT INTO events (aggregate_id, event_type, payload, metadata, created_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    aggregate_id,
                    event.event_type,
                    json.dumps(event.payload),
                    json.dumps(event.metadata),
                    event.timestamp.isoformat(),
                ),
            )
        self.connection.commit()

    def get_events(self, aggregate_id: str) -> list[Event]:
        # Timestamps can collide; a per-stream version column
        # (see Handling Concurrency) gives a stable order.
        rows = self.connection.execute(
            "SELECT * FROM events WHERE aggregate_id = ? ORDER BY created_at",
            (aggregate_id,),
        )
        return [self.row_to_event(row) for row in rows]

    def get_events_stream(self, from_position: int = 0) -> EventStream:
        return EventStream(self.connection, from_position)
Event Store Solution Comparison
Choosing the right event store depends on your technology stack, operational environment, and requirements:
| Solution | Ecosystem | Storage | Built-in Projections | License | Ops Complexity |
|---|---|---|---|---|---|
| KurrentDB (formerly EventStoreDB) | Language-agnostic (gRPC) | Dedicated file system | Yes (JS-based) | BSL (free for commercial) | Medium |
| Axon Server | JVM (Java/Kotlin) | Dedicated storage | No (handled by Framework) | Open Source + Enterprise | Medium |
| Marten | .NET (C#) | PostgreSQL | Yes (C#-based) | MIT | Low |
| EventSourcingDB | Language-agnostic (gRPC) | Dedicated engine | No | Open Source | Low |
| PostgreSQL + DIY | Language-agnostic | RDBMS | No (build your own) | Open Source | High |
| MongoDB + DIY | Language-agnostic | Document DB | No (build your own) | SSPL | High |
| Apache Kafka + ksqlDB | Language-agnostic | Distributed commit log | Yes (ksqlDB) | Apache 2.0 (Kafka), Confluent Community License (ksqlDB) | High |
KurrentDB is a dedicated event store originally created by Greg Young, providing native support for stream-based append-only storage, built-in projections, and catch-up subscriptions. As of 2025, the company behind it (Event Store) has rebranded as Kurrent and EventStoreDB is now named KurrentDB; version 26.1 (released 2026) adds SQL access and a new projection engine.
Apache Kafka is increasingly used as the event store foundation, especially with Kafka 4.0+ running in KRaft mode (no ZooKeeper dependency), exactly-once semantics, and ksqlDB for stream processing and materialized views.
Projections
Build read models from events:
# Materialized view projection
class OrderSummaryProjection:
    def __init__(self, read_db):
        self.read_db = read_db

    def project(self, event: Event):
        if event.event_type == "OrderCreated":
            self._handle_order_created(event)
        elif event.event_type == "OrderItemAdded":
            self._handle_item_added(event)
        elif event.event_type == "OrderSubmitted":
            self._handle_order_submitted(event)

    def _handle_order_created(self, event):
        self.read_db.execute(
            """
            INSERT INTO order_summaries (order_id, customer_id, total, status, created_at)
            VALUES (?, ?, 0, 'draft', ?)
            """,
            (event.aggregate_id, event.payload["customer_id"], event.timestamp),
        )

    # _handle_item_added and _handle_order_submitted update the same row
    # and are omitted here for brevity
Projection Pattern Classification
Projections can be implemented in three ways, each with different consistency and performance trade-offs:
| Pattern | Consistency | Write Performance | Read Latency | Use Case |
|---|---|---|---|---|
| Inline | Strong | Degraded | Low | Small-scale, strict consistency |
| Async | Eventual | Good | Low | Most production systems |
| Live | Strong (on read) | Best | High (replay on each read) | Rare queries, ad-hoc analysis |
Inline Projection: Updates the read model in the same transaction as event storage. Guarantees strong consistency but degrades write performance because the write path waits for projection completion.
Async Projection: Updates the read model asynchronously through subscriptions. Provides eventual consistency but offers good write performance. Most production systems adopt this approach. Events flow through a message broker (Kafka, RabbitMQ) or event store subscriptions to projection handlers.
Live Projection: Replays events for every request. Always guarantees the latest state, but performance drops sharply when there are many events. Suitable for administrative dashboards or debugging tools where latency is acceptable.
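The difference between a live and an async projection can be sketched in a few lines. This is an illustrative in-memory model: `StoredEvent`, `live_order_total`, `AsyncOrderTotal`, and the `unit_price_cents` payload field are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StoredEvent:  # hypothetical minimal event shape for this sketch
    position: int
    event_type: str
    payload: dict


def live_order_total(events: list[StoredEvent]) -> int:
    """Live projection: fold the full history on every read."""
    total = 0
    for event in events:
        if event.event_type == "OrderItemAdded":
            total += event.payload["quantity"] * event.payload["unit_price_cents"]
    return total


class AsyncOrderTotal:
    """Async projection: keep a stored total and a checkpoint, apply only new events."""

    def __init__(self) -> None:
        self.total = 0
        self.checkpoint = 0  # position of the last applied event

    def catch_up(self, events: list[StoredEvent]) -> None:
        for event in events:
            if event.position <= self.checkpoint:
                continue  # already applied
            if event.event_type == "OrderItemAdded":
                self.total += event.payload["quantity"] * event.payload["unit_price_cents"]
            self.checkpoint = event.position


log = [
    StoredEvent(1, "OrderCreated", {"customer_id": "c1"}),
    StoredEvent(2, "OrderItemAdded", {"quantity": 2, "unit_price_cents": 500}),
]
view = AsyncOrderTotal()
view.catch_up(log)                    # read model is now current
log.append(StoredEvent(3, "OrderItemAdded", {"quantity": 1, "unit_price_cents": 250}))
stale_total = view.total              # unchanged until the next catch-up
fresh_total = live_order_total(log)   # the live fold sees the new event at once
view.catch_up(log)
```

Between the append and the second `catch_up`, the async view lags behind the log; that gap is the eventual-consistency window, and the live fold avoids it at the cost of reading the whole stream.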
CQRS + Event Sourcing Together
Architecture Overview
flowchart LR
C1[Client] --> CH[Command Handler]
CH --> ES[Event Store]
ES --> PE[Projection Engine]
PE --> QH[Query Handler]
C2[Client] --> QH
The write path (commands) and read path (queries) are completely separated. The write side appends events to the event store. The read side projects those events into read-optimized models. A message broker or event store subscription fans out events from the write side to multiple projection handlers.
Implementation Example
import uuid
from datetime import datetime, timezone

# Domain events
class OrderEvent(Event):
    """Fills in the common Event fields for every order event."""
    def __init__(self, order_id: str, event_type: str, payload: dict):
        super().__init__(
            event_id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type=event_type,
            timestamp=datetime.now(timezone.utc),
            payload=payload,
            metadata={},
        )

class OrderCreated(OrderEvent):
    def __init__(self, order_id: str, customer_id: str):
        super().__init__(order_id, "OrderCreated", {"customer_id": customer_id})

class OrderItemAdded(OrderEvent):
    def __init__(self, order_id: str, product_id: str, quantity: int):
        super().__init__(
            order_id,
            "OrderItemAdded",
            {"product_id": product_id, "quantity": quantity},
        )

# Aggregate
class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.events = []
        self.items = []
        self.status = "draft"

    def create(self, customer_id: str):
        self.events.append(OrderCreated(self.order_id, customer_id))
        return self

    def add_item(self, product_id: str, quantity: int):
        self.events.append(OrderItemAdded(self.order_id, product_id, quantity))
        return self

    def get_uncommitted_events(self) -> list[Event]:
        return self.events

    def mark_events_committed(self):
        self.events = []

# Command Handler
class OrderCommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def handle_create_order(self, command: CreateOrderCommand) -> str:
        # A new order starts a new stream, so there is no history to replay.
        # Handlers for existing orders would first load the stream with
        # event_store.get_events(order_id) and replay it to restore state.
        aggregate = OrderAggregate(str(uuid.uuid4()))

        # Execute command
        aggregate.create(command.customer_id)
        for item in command.items:
            aggregate.add_item(item["product_id"], item["quantity"])

        # Save events, then clear the uncommitted list
        self.event_store.append(
            aggregate.order_id,
            aggregate.get_uncommitted_events()
        )
        aggregate.mark_events_committed()
        return aggregate.order_id

# Query Handler
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_order_summary(self, order_id: str) -> OrderSummaryDTO:
        return self.read_db.query(OrderSummaryDTO).filter_by(
            order_id=order_id
        ).first()
Handling Concurrency
# Optimistic concurrency with events
class EventStore:
    def append(self, aggregate_id: str, expected_version: int, events: list[Event]):
        current_version = self.get_current_version(aggregate_id)
        if current_version != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, found {current_version}"
            )
        # Store events with version. The check above is not atomic on its own:
        # a UNIQUE (aggregate_id, version) constraint makes the database reject
        # a competing writer that passes the check at the same moment.
        for i, event in enumerate(events):
            self.connection.execute(
                "INSERT INTO events VALUES (?, ?, ?, ?, ?)",
                (
                    aggregate_id,
                    expected_version + i + 1,
                    event.event_type,
                    json.dumps(event.payload),
                    event.timestamp,
                ),
            )
The expected_version parameter (called expectedRevision in KurrentDB) is the core of optimistic concurrency control. When two commands execute simultaneously against the same aggregate, the first to save succeeds and the second is rejected with a concurrency error (ConcurrencyError in the code above; KurrentDB clients report a wrong-expected-version error). The retry logic must reload the events and re-execute the command.
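The reload-and-retry loop described above can be sketched as follows. `InMemoryEventStore` and `execute_with_retry` are hypothetical stand-ins written for this example; the point is that the decision function is re-run against fresh history on every attempt, never replayed from a stale result.

```python
class ConcurrencyError(Exception):
    pass


class InMemoryEventStore:
    """Stand-in store for this sketch; a stream's version is its event count."""

    def __init__(self) -> None:
        self.streams: dict[str, list[dict]] = {}

    def load(self, aggregate_id: str) -> list[dict]:
        return list(self.streams.get(aggregate_id, []))

    def append(self, aggregate_id: str, expected_version: int, events: list[dict]) -> None:
        stream = self.streams.setdefault(aggregate_id, [])
        if len(stream) != expected_version:
            raise ConcurrencyError(f"Expected version {expected_version}, found {len(stream)}")
        stream.extend(events)


def execute_with_retry(store, aggregate_id, decide, max_attempts=3):
    """Reload, re-decide, and re-append until the write wins or attempts run out.

    `decide` takes the current history and returns the new events.
    """
    for attempt in range(max_attempts):
        history = store.load(aggregate_id)
        new_events = decide(history)
        try:
            store.append(aggregate_id, len(history), new_events)
            return new_events
        except ConcurrencyError:
            if attempt == max_attempts - 1:
                raise  # give up and surface the conflict to the caller


store = InMemoryEventStore()
store.append("order-1", 0, [{"type": "OrderCreated"}])


def add_item(history):
    # Simulate a competing writer that commits just before our first append.
    if len(history) == 1:
        store.append("order-1", 1, [{"type": "OrderItemAdded", "sku": "other"}])
    return [{"type": "OrderItemAdded", "sku": "mine"}]


execute_with_retry(store, "order-1", add_item)
```

The first attempt fails because the competing write moved the stream to version 2; the second attempt reloads, decides again, and appends at the correct version.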
Snapshot Strategy
When an aggregate accumulates thousands of events, replaying the entire history on every read becomes impractical. Snapshots solve this by periodically saving the current state as a checkpoint.
Instead of replaying 10,000 account transactions, you load the most recent snapshot (taken at event 9,500) and replay only the 500 events since then.
Snapshot Application Criteria
Introduce snapshots only when justified. Use these guidelines:
- Consider snapshots when the average events per aggregate exceeds 50
- Snapshots are needed when event replay time exceeds 100ms
- Create snapshots every 100 to 500 events
- Base snapshots on business-relevant boundaries (end of day, monthly close) rather than arbitrary event counts
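As a rough sketch, the numeric guidelines above can be folded into a small policy function. `should_snapshot` and its default thresholds are illustrative assumptions chosen from within the stated ranges; business-boundary snapshots would be triggered separately, for example by a scheduled job.

```python
def should_snapshot(events_since_snapshot: int, last_replay_ms: float,
                    every_n_events: int = 200, max_replay_ms: float = 100.0) -> bool:
    """Return True when either guideline threshold is crossed.

    Defaults are illustrative picks within the ranges above
    (100 to 500 events, 100 ms replay time); tune them per aggregate type.
    """
    return events_since_snapshot >= every_n_events or last_replay_ms > max_replay_ms
```

Calling this after each successful append keeps snapshot creation off the read path.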
# Snapshotting for performance
class SnapshotStore:
    def save_snapshot(self, aggregate_id: str, version: int, state: dict):
        self.db.execute(
            "INSERT INTO snapshots VALUES (?, ?, ?)",
            aggregate_id, version, json.dumps(state)
        )

    def get_snapshot(self, aggregate_id: str) -> tuple[int, dict] | None:
        row = self.db.query(
            "SELECT version, state FROM snapshots WHERE aggregate_id = ? "
            "ORDER BY version DESC LIMIT 1",
            aggregate_id
        ).first()
        if row:
            return row.version, json.loads(row.state)
        return None
Snapshot Schema Versioning
When the aggregate structure changes, existing snapshots become invalid. Track a schemaVersion on each snapshot to detect this:
class VersionedSnapshotStore:
    SCHEMA_VERSION = 2

    def save_snapshot(self, aggregate_id: str, version: int, state: dict):
        self.db.execute(
            "INSERT INTO snapshots VALUES (?, ?, ?, ?)",
            aggregate_id, version, self.SCHEMA_VERSION, json.dumps(state)
        )

    def load_aggregate(self, aggregate_id: str) -> Aggregate:
        # get_snapshot is assumed to return an object exposing .version and
        # .schema_version, or None when no snapshot exists
        snapshot = self.get_snapshot(aggregate_id)
        if snapshot and snapshot.schema_version == self.SCHEMA_VERSION:
            # Start from snapshot, replay only subsequent events
            start_version = snapshot.version
            aggregate = self.reconstitute_from_snapshot(snapshot)
        else:
            # Snapshot missing or schema mismatch: full replay
            start_version = 0
            aggregate = self.create_empty_aggregate()
        events = self.event_store.get_events_since(aggregate_id, start_version)
        for event in events:
            aggregate.apply(event)
        return aggregate
Snapshots are a performance optimization, not a required component. The system must function correctly without snapshots. Always implement automatic fallback to full event replay when snapshots are corrupted or schema versions mismatch.
Event Versioning
Events are immutable — once stored, they are never modified. As your domain evolves, event schemas change. You need strategies for handling multiple event versions and migrating between schemas.
Versioning Strategy Comparison
| Strategy | Description | Risk | Suitable For |
|---|---|---|---|
| Tolerant deserialization | Ignore unknown fields, use defaults for missing | Low | Adding optional fields |
| Event versioning | Include a version identifier, consumers handle each version | Medium | Field name or type changes |
| Upcasting | Transform at read time via middleware | Medium | Nonbreaking schema changes |
| In-place migration | Rewrite historical events to new schema | High | Last resort — breaks immutability |
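The lowest-risk row in the table, tolerant deserialization, might look like the following sketch. `OrderCreatedPayload` and `tolerant_load` are hypothetical names for this example; the technique is simply to drop unknown keys and rely on defaults for missing optional ones.

```python
import json
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class OrderCreatedPayload:
    customer_id: str
    shipping_address: Optional[str] = None  # added later; old events lack it


def tolerant_load(raw: str) -> OrderCreatedPayload:
    """Deserialize, ignoring unknown fields and defaulting missing optional ones."""
    data = json.loads(raw)
    known = {f.name for f in fields(OrderCreatedPayload)}
    return OrderCreatedPayload(**{k: v for k, v in data.items() if k in known})


# An event written before shipping_address existed
old = tolerant_load('{"customer_id": "c1"}')
# An event written by a newer producer with a field this reader does not know
newer = tolerant_load(
    '{"customer_id": "c2", "shipping_address": "1 Main St", "gift_wrap": true}'
)
```

This only covers additive, optional changes. A renamed or retyped field still needs explicit versioning or an upcaster.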
Upcasting Example
Upcasting places middleware that converts previous event versions to the current version during deserialization:
from dataclasses import dataclass, field

# Event versioning
@dataclass
class OrderCreatedV1:
    event_type: str = "OrderCreated"
    version: int = 1
    payload: dict = field(default_factory=lambda: {
        "customer_id": None
    })

@dataclass
class OrderCreatedV2:
    event_type: str = "OrderCreated"
    version: int = 2
    payload: dict = field(default_factory=lambda: {
        "customer_id": None,
        "shipping_address": None  # Added field
    })

# Upcaster
class EventUpcaster:
    def upcast(self, event):
        if event.version < 2:
            return self._upgrade_to_v2(event)
        return event

    def _upgrade_to_v2(self, event):
        # Build a new V2 event instead of mutating the stored one;
        # the added field gets a default value
        return OrderCreatedV2(payload={**event.payload, "shipping_address": None})
The most important principle: never modify existing events in the store. Transform at read time (upcasting) or define new event types. In Kafka deployments, a schema registry (for example Confluent Schema Registry, which is a separate component rather than part of Apache Kafka) integrates with producer and consumer serializers to validate schemas before publishing and to deserialize correctly based on schema version.
Event Ordering and Idempotency
In distributed systems, events may arrive out of order or be delivered more than once. Event handlers must handle both scenarios.
- Sequence numbers: Tag each event in a stream with a monotonically increasing sequence number. Handlers track the last processed sequence and skip earlier events.
- Idempotency keys: For external integrations, include an idempotency key in event metadata so downstream services can safely handle duplicates.
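- Exactly-once processing: Kafka's idempotent producers combined with transactions provide exactly-once semantics for event publishing within Kafka. This capability predates Kafka 4.0 (it arrived in 0.11), and side effects in external systems still require idempotent handlers.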
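The sequence-number technique from the list above can be sketched as a handler that remembers the last sequence it applied per stream. `SequencedHandler` is a hypothetical in-memory example; a production handler would persist `last_seen` in the same transaction as the read-model update, and would buffer or re-fetch when it detects a gap rather than silently moving on.

```python
class SequencedHandler:
    """Skips events whose sequence number was already processed for a stream."""

    def __init__(self) -> None:
        self.last_seen: dict[str, int] = {}       # stream id -> last applied sequence
        self.applied: list[tuple[str, int]] = []  # stands in for read-model updates

    def handle(self, stream_id: str, sequence: int, payload: dict) -> bool:
        if sequence <= self.last_seen.get(stream_id, 0):
            return False  # duplicate or stale delivery: ignore
        self.applied.append((stream_id, sequence))
        self.last_seen[stream_id] = sequence
        return True


handler = SequencedHandler()
results = [
    handler.handle("order-1", 1, {}),
    handler.handle("order-1", 1, {}),  # redelivered duplicate
    handler.handle("order-1", 2, {}),
    handler.handle("order-2", 1, {}),  # independent stream, independent counter
]
```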
See the Distributed Systems Fundamentals Guide for consensus and ordering patterns.
Apache Kafka Integration
Apache Kafka’s distributed commit log aligns naturally with CQRS and Event Sourcing requirements. Kafka 4.0 (released in 2025) and later run exclusively in KRaft mode, removing the ZooKeeper dependency and improving cluster scalability.
Kafka topics serve as the event store. Events are written to topics with configurable retention — days, weeks, or indefinitely. Kafka’s partitioning enables horizontal scaling: events with the same aggregate ID go to the same partition, maintaining order. Multiple partitions allow parallel processing across many consumers, which is crucial when rebuilding read models from millions of events.
// Command handler publishes events to Kafka (write side)
public class OrderCommandHandler {
    private final KafkaProducer<String, OrderEvent> producer;

    public OrderCommandHandler(KafkaProducer<String, OrderEvent> producer) {
        this.producer = producer;
    }

    public void handle(CreateOrderCommand command) {
        validateOrder(command);
        OrderCreatedEvent event = new OrderCreatedEvent(
            UUID.randomUUID(),
            command.getOrderId(),
            command.getCustomerId(),
            command.getItems(),
            Instant.now()
        );
        // Order ID as key ensures partition ordering
        ProducerRecord<String, OrderEvent> record =
            new ProducerRecord<>("order-events", event.getOrderId(), event);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("Event published to partition {} at offset {}",
                    metadata.partition(), metadata.offset());
            } else {
                // Do not drop failures silently: surface them for retry or alerting
                log.error("Failed to publish event for order {}", event.getOrderId(), exception);
            }
        });
    }
}
The read side consumes events and builds materialized views:
// Consumer building read model (read side); @KafkaListener comes from Spring for Apache Kafka
public class OrderReadModelConsumer {
    private final OrderRepository repository;

    public OrderReadModelConsumer(OrderRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "order-events", groupId = "order-view-builder")
    public void consume(ConsumerRecord<String, OrderEvent> record) {
        OrderEvent event = record.value();
        switch (event.getType()) {
            case ORDER_CREATED:
                OrderCreatedEvent created = (OrderCreatedEvent) event;
                repository.save(new OrderView(
                    created.getOrderId(),
                    created.getCustomerId(),
                    created.getTotalAmount(),
                    "CREATED"
                ));
                break;
            case ORDER_SHIPPED:
                repository.updateStatus(event.getOrderId(), "SHIPPED");
                break;
            default:
                // Event types this view does not care about are ignored
                break;
        }
    }
}
Using Kafka Streams or ksqlDB enables sophisticated projections with joins, aggregations, and windowing — operating directly on Kafka topics as tables and streams. Kafka’s log compaction is useful for snapshot topics, keeping only the most recent state per key while preserving the full event history in the main topic.
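The effect of log compaction on a snapshot topic can be modeled in a few lines. This is a plain-Python illustration of the end result only, not Kafka's API: real compaction runs in the background on closed log segments and may retain older records for some time. `compact` and the sample records are assumptions for the example.

```python
def compact(records: list[tuple[str, dict]]) -> list[tuple[str, dict]]:
    """Keep only the latest value per key, preserving offset order of survivors."""
    latest: dict[str, tuple[int, dict]] = {}
    for offset, (key, value) in enumerate(records):
        latest[key] = (offset, value)  # a later record replaces the earlier one
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_, value) in survivors]


snapshot_topic = [
    ("order-1", {"status": "CREATED"}),
    ("order-2", {"status": "CREATED"}),
    ("order-1", {"status": "SHIPPED"}),
]
compacted = compact(snapshot_topic)
```

Here the `order-1` record with status `CREATED` is dropped because a newer record exists for the same key, which is why compaction suits state snapshots but not the main event topic.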
Anti-Patterns
Common mistakes that cause event sourcing projects to fail:
Anti-pattern 1: Storing Entire State in Events
# BAD: Putting the entire state in an event is no different from CRUD
class OrderUpdated:
    data: Order  # Entire Order object; defeats the purpose of events

# GOOD: Record only what changed
class OrderItemAdded:
    product_id: str
    quantity: int
    unit_price: Decimal
Anti-pattern 2: Calling External Services from Projections
Projections should be pure functions that update read models using only event data. If a projection calls an external API, those side effects will fire during event replay, causing duplicate charges, notifications, or API calls. Separate external integrations into dedicated Saga or Process Manager handlers.
Anti-pattern 3: Applying Event Sourcing to Every Domain
Event sourcing adds complexity. Apply it selectively to domains where change history has business value — financial transactions, order processing, inventory management. For simple CRUD domains like user settings or reference code tables, traditional storage is more appropriate.
Anti-pattern 4: Accumulating 100,000+ Events in a Stream
When a single aggregate stream exceeds 100,000 events, loading can take several seconds, and snapshots only mask the problem. The root cause is usually poorly designed aggregate boundaries. Consider splitting the aggregate into smaller units or reviewing the event generation pattern. For example, an Order aggregate with a few dozen events is fine; a Customer aggregate that records every login, click, and preference change over years is not.
Anti-pattern 5: Single Mega Aggregate
Having one projector that feeds into a single large aggregate, then transforming that aggregate for every query, creates coupling and increases complexity. Instead, project directly into the shape each query needs. Multiple projections for different read models are a feature, not a bug.
Disaster Recovery and Operations
Disaster recovery in event sourcing differs from traditional systems. Since the event store is the single source of truth, projections can always be rebuilt from scratch.
Recovering from Projection Corruption
- Stop the projection service
- TRUNCATE or DROP the read model table
- Reset the checkpoint position for that projection
- Restart the projection service — it rebuilds the read model by replaying all events from the beginning
- Verify the projection has caught up to the current position
If you have hundreds of millions of events, design a projection architecture capable of parallel processing, or rebuild by partition.
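The rebuild procedure above can be sketched end to end with an in-memory stand-in. `OrderCountProjection` and `rebuild` are hypothetical names; in a real system the read model would be a database table and the event log would come from the event store.

```python
class OrderCountProjection:
    """Hypothetical projection: orders per customer, with a checkpoint."""

    def __init__(self) -> None:
        self.read_model: dict[str, int] = {}
        self.checkpoint = 0  # position of the last applied event

    def apply(self, position: int, event: dict) -> None:
        if event["type"] == "OrderCreated":
            customer = event["customer_id"]
            self.read_model[customer] = self.read_model.get(customer, 0) + 1
        self.checkpoint = position

    def reset(self) -> None:
        self.read_model = {}  # equivalent of TRUNCATE on the read model table
        self.checkpoint = 0   # reset the checkpoint position


def rebuild(projection: OrderCountProjection, event_log: list[dict]) -> bool:
    """Drop the read model, replay from the beginning, verify the final position."""
    projection.reset()
    for position, event in enumerate(event_log, start=1):
        projection.apply(position, event)
    return projection.checkpoint == len(event_log)


event_log = [
    {"type": "OrderCreated", "customer_id": "c1"},
    {"type": "OrderCreated", "customer_id": "c2"},
    {"type": "OrderCreated", "customer_id": "c1"},
]
projection = OrderCountProjection()
projection.read_model = {"c1": 99}  # simulate a corrupted read model
caught_up = rebuild(projection, event_log)
```

Because the projection uses only event data, the replay is safe to repeat as often as needed.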
Event Store Cluster Failure
KurrentDB uses a leader-follower architecture. When the leader goes down, a follower is automatically promoted:
- Write failures: Retry writes that fail during leader transitions. Idempotency is guaranteed by expectedRevision
- Subscription reconnection: Persistent subscriptions reconnect automatically; catch-up subscriptions must be manually reconnected from the last checkpoint
- Split-brain prevention: Operate a minimum 3-node cluster for quorum-based leader election
Operational Checklist
Design Phase:
- Does every event include eventVersion, correlationId, and causationId?
- Is the aggregate’s invariant validation complete?
- Is CQRS truly needed for this domain?
Implementation Phase:
- Are projection handlers idempotent (using ON CONFLICT / upsert semantics)?
- Is optimistic concurrency control implemented with expectedRevision?
- Does the upcaster correctly handle all previous event versions?
- Does the system fall back to full event replay when snapshot schemas mismatch?
Operations Phase:
- Is the event store cluster configured with 3+ nodes?
- Is projection consumer lag monitored?
- Are disk usage alerts configured for the event store?
- Is the projection rebuild procedure documented?
- Are snapshot creation frequency and retention policies configured?
- Is a Dead Letter Queue configured for unprocessable events?
- Are manual event correction scripts prepared for failure scenarios?
GDPR and Compliance
The append-only, immutable nature of an event store conflicts with data protection regulations that require deletion of personal data (the “right to be forgotten”).
Recommended approach: Store personal data outside the event store and reference it by identifier in events. This allows deletion to occur independently without affecting the event stream.
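A minimal sketch of this reference-by-identifier approach, using a hypothetical in-memory `PersonalDataStore`. Events hold only an opaque subject reference, so erasing the personal record leaves the event stream untouched.

```python
from typing import Optional


class PersonalDataStore:
    """Hypothetical mutable store for personal data, kept outside the event store."""

    def __init__(self) -> None:
        self._records: dict[str, dict] = {}

    def put(self, subject_id: str, data: dict) -> None:
        self._records[subject_id] = data

    def get(self, subject_id: str) -> Optional[dict]:
        return self._records.get(subject_id)

    def erase(self, subject_id: str) -> None:
        # Right-to-be-forgotten request: delete here, leave events untouched.
        self._records.pop(subject_id, None)


pii = PersonalDataStore()
pii.put("subject-42", {"name": "Ada Example", "email": "ada@example.com"})

# The event carries only an opaque reference, never the personal data itself.
event_log = [{"type": "OrderCreated", "order_id": "o-1", "customer_ref": "subject-42"}]


def customer_name(event: dict) -> str:
    record = pii.get(event["customer_ref"])
    return record["name"] if record else "[erased]"


before = customer_name(event_log[0])
pii.erase("subject-42")
after = customer_name(event_log[0])
```

Projections and read models that copied the personal data must be cleaned up as well, since they sit outside both stores.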
When personal data cannot be separated: Use crypto-shredding — encrypt personal data in events using a per-subject key. Delete the encryption key to render the data unrecoverable while leaving the event structure intact. This adds encryption overhead on every read and write and requires robust key management.
Practical Considerations
When to Use CQRS
Good candidates:
- Complex domain logic with validation
- Multiple read models needed
- High read-to-write ratio
- Audit requirements
- Event-driven architectures
Probably not needed for:
- Simple CRUD applications
- Low complexity domains
- Single read model
When to Use Event Sourcing
Good candidates:
- Audit requirements (complete history)
- Temporal queries (“what was the state at time X?”)
- Complex business rules requiring history
- Rebuilding state from events
- Event-driven microservices
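The temporal-query case in the list above falls out of replay almost for free: stop replaying at the timestamp of interest. The sketch below assumes a hypothetical event shape with `type`, `amount`, and a timezone-aware `at` field, and a log already in append order.

```python
from datetime import datetime, timezone


def balance_at(events: list[dict], as_of: datetime) -> int:
    """Replay only the events recorded at or before `as_of`."""
    balance = 0
    for event in events:  # events are assumed to be in append order
        if event["at"] > as_of:
            break
        if event["type"] == "DepositMade":
            balance += event["amount"]
        elif event["type"] == "WithdrawalMade":
            balance -= event["amount"]
    return balance


def utc(day: int) -> datetime:
    return datetime(2026, 1, day, tzinfo=timezone.utc)


history = [
    {"type": "DepositMade", "amount": 50, "at": utc(1)},
    {"type": "DepositMade", "amount": 50, "at": utc(2)},
    {"type": "WithdrawalMade", "amount": 30, "at": utc(5)},
]
balance_on_jan_3 = balance_at(history, utc(3))
balance_on_jan_5 = balance_at(history, utc(5))
```

A CRUD table holding only the current balance cannot answer the January 3 question without a separate audit log.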
Probably not needed for:
- Simple state management
- Performance-critical writes
- Domains where eventually consistent reads are not acceptable
Event Sourcing vs Traditional CRUD
| Criteria | Traditional CRUD | Event Sourcing |
|---|---|---|
| Current state querying | Simple (direct read) | Complex (event replay or projection) |
| Change history | Requires separate audit logs | Automatically captured |
| Schema changes | ALTER TABLE migration | Event versioning + upcasting |
| Debugging | Current state only | Full history replay |
| Data consistency | ACID transactions | Aggregate-level + eventual consistency |
| Storage | Current state only | All events accumulate |
| Read performance | Index optimization | Projection design required |
| Team experience | General | DDD + event modeling required |
Challenges
Complexity: More moving parts than simple CRUD. Teams must manage event stores, projection engines, read model databases, and message brokers simultaneously. Start with a modular event sourcing setup and add CQRS only when read-model optimization justifies it.
Eventual consistency: Read models lag behind write models. The delay (eventual consistency window) depends on projection engine throughput. For systems requiring immediate consistency, CQRS may not be appropriate without compensating mechanisms.
Event schema evolution: Handling schema changes over time requires careful migration planning. Start with versioned events from day one — retrofitting versioning is significantly harder.
Performance: Replaying many events can be slow. Snapshots (see Snapshot Strategy above) bound the replay cost by restoring state from a recent checkpoint and replaying only the events recorded after it.
Tools and Frameworks
Event Stores
- KurrentDB (formerly EventStoreDB): Purpose-built event store with built-in projections, subscriptions, and gRPC client libraries for .NET, Node.js, Python, Java, Go, and Rust. Version 26.1+ adds SQL access and a new projection engine.
- Apache Kafka: Distributed commit log ideal for event sourcing. Kafka 4.0+ uses KRaft mode (no ZooKeeper). Pair with ksqlDB for stream processing and materialized views.
- Axon Framework: Java/Kotlin CQRS/ES framework with strong aggregate support.
- Marten: .NET event sourcing on PostgreSQL with async projection support. MIT licensed, low operational complexity.
- PostgreSQL (DIY): Roll your own event store on PostgreSQL with JSONB columns for event payloads. High flexibility but requires building projection, concurrency, and snapshot infrastructure yourself.
Read Model Stores
- Elasticsearch: Full-text search and analytics for complex query models
- PostgreSQL: Relational read models with materialized views and partial indexes
- Redis: Cached read models for low-latency queries
- MongoDB: Document read models for denormalized, nested data
For patterns on connecting CQRS components in microservice architectures, see the Microservices Communication Patterns Guide.
Best Practices
Keep Events Immutable
# Wrong: Mutable event
class BadEvent:
    def __init__(self):
        self.value = 0

    def update(self, new_value):
        self.value = new_value  # Don't do this!

# Right: Immutable events
@dataclass(frozen=True)
class GoodEvent:
    old_value: int
    new_value: int
Design Events Carefully
# Good: Intent-focused event
class OrderSubmitted:
    order_id: str
    submitted_at: datetime
    total: Decimal

# Avoid: Implementation-focused event
class OrderTableUpdated:
    # What does this mean?
    table: str = "orders"
    operation: str = "UPDATE"
Handle Failures
# Idempotent event processing
class IdempotentEventHandler:
    def __init__(self, event_store):
        self.event_store = event_store
        # In-memory for illustration; persist processed IDs in production
        # so that deduplication survives restarts
        self.processed_events = set()

    def handle(self, event: Event):
        if event.event_id in self.processed_events:
            return  # Duplicate delivery: already handled
        self._process(event)
        self.processed_events.add(event.event_id)
Prefer Intent-Focused Events
Design events to capture business intent, not just state deltas. For example, a seat-reservation system should record “two seats were reserved” (SeatsReserved) rather than “remaining seats changed to 42” (SeatsChanged). The first tells you what happened. The second only tells you the result. Intent-focused events enable richer projections, better audit trails, and more flexible read model evolution.
Conclusion
CQRS and Event Sourcing provide powerful patterns for building complex applications. CQRS enables optimized read and write models, while Event Sourcing provides complete audit trails and temporal queries.
These patterns add complexity — only adopt when the benefits justify it. Start with simpler approaches and evolve as requirements demand. Invest the most time in event schema design — events persist forever. Establish versioning strategies early, implement projections idempotently, and apply these patterns selectively to core domains rather than the entire system.
The combination works particularly well for event-driven microservices, audit-critical systems, and domains with complex state transitions. The initial investment pays dividends in flexibility, observability, and the ability to answer questions about past system states that traditional architectures cannot.