⚡ Calmops

Event-Driven Architecture: Patterns, Kafka, and CQRS

Introduction

Event-driven architecture (EDA) organizes systems around events (things that happened) rather than direct service calls. Instead of Service A calling Service B's API, A publishes an event and B (and any other interested service) reacts to it. This decoupling is the core benefit: producers and consumers evolve independently.

When EDA makes sense:

  • Multiple services need to react to the same business event
  • You need audit trails of everything that happened
  • Services have different scaling requirements
  • You want to add new consumers without changing producers

When it doesn’t:

  • Simple CRUD applications
  • When you need synchronous responses
  • Small teams where the operational overhead isn’t justified

Core Concepts

Event:    Something that happened ("OrderPlaced", "PaymentProcessed")
Producer: The service that emits events
Consumer: The service that reacts to events
Broker:   The infrastructure that routes events (Kafka, RabbitMQ, SQS)
Topic:    A named stream of events (like a table in a database)

Events vs Commands:

Command: "ProcessPayment" - a request to do something (can be rejected)
Event:   "PaymentProcessed" - a fact that already happened (immutable)
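
The distinction shows up directly in code: a command handler validates and may reject, while an event is the immutable record of a success. A minimal sketch (the handler name and shapes here are illustrative, not from any library):

```javascript
// Command handler: may reject the request. On success it returns an
// event describing the fact that happened.
function handleProcessPayment(command, accountBalance) {
    if (command.amount <= 0) {
        throw new Error('Amount must be positive');   // commands can fail
    }
    if (command.amount > accountBalance) {
        throw new Error('Insufficient funds');
    }
    return {
        eventType: 'PaymentProcessed',                // events are facts
        paymentId: command.paymentId,
        amount: command.amount,
        timestamp: new Date().toISOString(),
    };
}
```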

Kafka: The Standard Event Broker

Apache Kafka is the most widely used event streaming platform. It stores events durably and allows multiple consumers to read the same events independently.

Docker Compose Setup

# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one for other containers (kafka:29092), one for the host (localhost:9092)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

docker compose up -d
# Kafka UI: http://localhost:8080

Producing Events (Node.js)

npm install kafkajs

// producer.js
import crypto from 'node:crypto';   // for crypto.randomUUID()
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
    clientId: 'order-service',
    brokers: ['localhost:9092'],
});

const producer = kafka.producer();
await producer.connect();  // connect once at startup, not per message

async function publishOrderPlaced(order) {
    await producer.send({
        topic: 'orders',
        messages: [
            {
                key: order.id,           // partition key: same order always goes to same partition
                value: JSON.stringify({
                    eventType: 'OrderPlaced',
                    eventId: crypto.randomUUID(),
                    timestamp: new Date().toISOString(),
                    version: 1,
                    data: {
                        orderId: order.id,
                        userId: order.userId,
                        items: order.items,
                        total: order.total,
                    },
                }),
                headers: {
                    'content-type': 'application/json',
                    'source': 'order-service',
                },
            },
        ],
    });
}

// In your order creation handler
app.post('/api/orders', async (req, res) => {
    const order = await db.createOrder(req.body);
    await publishOrderPlaced(order);
    res.status(201).json(order);
});
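
Note that this handler performs two separate writes (the database insert and the Kafka publish); a crash between them silently loses the event. The transactional outbox pattern is the usual mitigation: write the event to an outbox table in the same transaction as the order, and relay it to Kafka in the background. A minimal in-memory sketch (the `db` object and relay wiring are illustrative stand-ins, not a real driver):

```javascript
// Transactional outbox sketch: the order row and the outbox row are written
// together, so the event cannot be lost between "save order" and "publish".
const db = { orders: [], outbox: [] };

function createOrderWithOutbox(orderData) {
    const order = { id: `ord-${db.orders.length + 1}`, ...orderData };
    // In a real database, these two inserts share one transaction
    db.orders.push(order);
    db.outbox.push({
        topic: 'orders',
        key: order.id,
        value: JSON.stringify({ eventType: 'OrderPlaced', data: order }),
        publishedAt: null,
    });
    return order;
}

// A background relay polls the outbox and publishes pending rows to Kafka
async function relayOutbox(publish) {
    for (const row of db.outbox.filter(r => r.publishedAt === null)) {
        await publish(row);                     // e.g. producer.send(...)
        row.publishedAt = new Date().toISOString();
    }
}
```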

Consuming Events

// consumer.js
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });

const consumer = kafka.consumer({
    groupId: 'notification-service',  // consumer group: each group gets all messages
});

async function startConsumer() {
    await consumer.connect();
    await consumer.subscribe({ topic: 'orders', fromBeginning: false });

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            const event = JSON.parse(message.value.toString());

            console.log(`Processing ${event.eventType} for order ${event.data.orderId}`);

            try {
                switch (event.eventType) {
                    case 'OrderPlaced':
                        await sendOrderConfirmationEmail(event.data);
                        break;
                    case 'OrderShipped':
                        await sendShippingNotification(event.data);
                        break;
                    default:
                        console.log('Unknown event type:', event.eventType);
                }
            } catch (err) {
                // Log error but don't throw: Kafka will retry if you throw
                console.error('Failed to process event:', err);
                // Consider sending to a dead-letter topic
            }
        },
    });
}

Consumer Groups

Multiple consumer groups can read the same topic independently:

Topic: orders
├── Consumer Group: notification-service → sends emails
├── Consumer Group: inventory-service    → updates stock
└── Consumer Group: analytics-service    → updates dashboards

Each group maintains its own offset; they all see all messages.

Event Schema Design

Good event schemas are self-describing and versioned:

// Good event structure
{
    "eventType": "OrderPlaced",
    "eventId": "550e8400-e29b-41d4-a716-446655440000",
    "timestamp": "2026-03-30T10:00:00Z",
    "version": 1,
    "source": "order-service",
    "correlationId": "req-abc123",  // trace requests across services
    "data": {
        "orderId": "ord-123",
        "userId": "user-456",
        "items": [{"productId": "prod-789", "quantity": 2, "price": 29.99}],
        "total": 59.98
    }
}

Schema evolution rules:

  • Adding new fields: safe (consumers ignore unknown fields)
  • Removing fields: breaking change; use versioning
  • Renaming fields: breaking change; add the new field, deprecate the old
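
One practical consequence of these rules: consumers can normalize older event versions into the current shape before handling them (often called upcasting). A hedged sketch, assuming a hypothetical v2 that renamed `total` to `grandTotal` by adding the new field and keeping the old one during a deprecation window:

```javascript
// Upcast older event versions to the current (v2) shape before handling,
// so downstream handler code only ever deals with one shape.
function upcastOrderEvent(event) {
    if (event.version === 1) {
        return {
            ...event,
            version: 2,
            data: {
                ...event.data,
                grandTotal: event.data.grandTotal ?? event.data.total,
            },
        };
    }
    return event;   // already current
}
```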

Event Sourcing

Instead of storing current state, store the sequence of events that led to it:

// Traditional: store current state
// users table: { id, name, email, balance: 150 }

// Event sourcing: store events
// events table:
// { type: "AccountOpened",  data: { userId: 1, initialBalance: 100 } }
// { type: "MoneyDeposited", data: { userId: 1, amount: 100 } }
// { type: "MoneyWithdrawn", data: { userId: 1, amount: 50 } }
// Current balance = 100 + 100 - 50 = 150

// Rebuild state by replaying events
class BankAccount {
    constructor() {
        this.balance = 0;
        this.events = [];
    }

    apply(event) {
        switch (event.type) {
            case 'AccountOpened':
                this.balance = event.data.initialBalance;
                break;
            case 'MoneyDeposited':
                this.balance += event.data.amount;
                break;
            case 'MoneyWithdrawn':
                if (event.data.amount > this.balance) {
                    throw new Error('Insufficient funds');
                }
                this.balance -= event.data.amount;
                break;
        }
        this.events.push(event);
    }

    static fromEvents(events) {
        const account = new BankAccount();
        events.forEach(e => account.apply(e));
        return account;
    }
}

// Load account from event store
const events = await eventStore.getEvents('account-123');
const account = BankAccount.fromEvents(events);
console.log(account.balance);  // current balance
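
The class above only replays history; the command side creates new events by validating against the current state first, then applying and persisting the result. A self-contained sketch of the same idea (the `withdraw` command method is an illustrative addition, not from the original class):

```javascript
// Replayable account with a command method: withdraw() validates against
// current state, then applies and records the resulting event.
class Account {
    constructor() {
        this.balance = 0;
        this.events = [];
    }

    apply(event) {
        if (event.type === 'MoneyDeposited') this.balance += event.data.amount;
        if (event.type === 'MoneyWithdrawn') this.balance -= event.data.amount;
        this.events.push(event);
    }

    // Command: may be rejected; on success, produces and applies an event
    withdraw(amount) {
        if (amount > this.balance) throw new Error('Insufficient funds');
        const event = { type: 'MoneyWithdrawn', data: { amount } };
        this.apply(event);
        return event;   // caller appends this to the event store
    }
}
```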

Benefits of event sourcing:

  • Complete audit trail: you know exactly what happened and when
  • Time travel: reconstruct state at any point in time
  • Event replay: rebuild read models, fix bugs by replaying

Costs:

  • More complex than CRUD
  • Eventual consistency between write and read models
  • Event schema evolution is harder

CQRS: Command Query Responsibility Segregation

Separate the write model (commands) from the read model (queries):

Write side:                    Read side:
POST /orders → OrderService → EventStore → Projector → ReadDB
                                                          ↓
GET /orders/{id} ────────────────────────────────────→ ReadDB

// Write side: command handler
async function handlePlaceOrder(command) {
    // Validate
    if (command.items.length === 0) {
        throw new Error('Order must have at least one item');
    }

    // Create event
    const event = {
        type: 'OrderPlaced',
        orderId: generateId(),
        userId: command.userId,
        items: command.items,
        total: calculateTotal(command.items),
        timestamp: new Date().toISOString(),
    };

    // Store event (source of truth)
    await eventStore.append('orders', event.orderId, event);

    // Publish to Kafka for other services
    // (these two writes are not atomic; in production, consider the outbox pattern)
    await kafka.publish('orders', event);

    return event.orderId;
}

// Read side: projection (builds read model from events)
async function projectOrderSummary(event) {
    if (event.type === 'OrderPlaced') {
        await readDb.upsert('order_summaries', {
            id: event.orderId,
            userId: event.userId,
            status: 'pending',
            total: event.total,
            itemCount: event.items.length,
            createdAt: event.timestamp,
        });
    }

    if (event.type === 'OrderShipped') {
        await readDb.update('order_summaries',
            { id: event.orderId },
            { status: 'shipped', shippedAt: event.timestamp }
        );
    }
}

// Read side: query handler (fast, optimized for reads)
async function getOrderSummary(orderId) {
    return readDb.findOne('order_summaries', { id: orderId });
}
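
Because the read model is derived entirely from events, it can be thrown away and rebuilt at any time by replaying the stream through the projection logic. A sketch with an in-memory stand-in for `readDb` (the Map-based store and event shapes are illustrative):

```javascript
// Rebuild the order_summaries read model by replaying the full event
// stream through projection logic, against an in-memory store.
function rebuildOrderSummaries(events) {
    const summaries = new Map();
    for (const event of events) {
        if (event.type === 'OrderPlaced') {
            summaries.set(event.orderId, {
                id: event.orderId,
                status: 'pending',
                total: event.total,
            });
        }
        if (event.type === 'OrderShipped') {
            const row = summaries.get(event.orderId);
            if (row) row.status = 'shipped';
        }
    }
    return summaries;
}
```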

Saga Pattern: Distributed Transactions

When a business process spans multiple services, use sagas to coordinate:

// Choreography-based saga (services react to events)
// 1. OrderService publishes OrderPlaced
// 2. PaymentService reacts, publishes PaymentProcessed or PaymentFailed
// 3. InventoryService reacts to PaymentProcessed, publishes InventoryReserved
// 4. ShippingService reacts to InventoryReserved, publishes OrderShipped

// Compensation: if any step fails, publish compensating events
// PaymentFailed โ†’ OrderService cancels order
// InventoryFailed โ†’ PaymentService refunds payment
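
The reactions and compensations above can be made explicit as a table-driven handler in the order service. A minimal sketch (the status names and reaction table are illustrative choices, not a standard):

```javascript
// Choreography sketch: the order service reacts to downstream saga events,
// compensating (cancelling the order) when any step fails.
const sagaReactions = {
    PaymentProcessed:  (order) => ({ ...order, status: 'paid' }),
    PaymentFailed:     (order) => ({ ...order, status: 'cancelled' }),     // compensate
    InventoryReserved: (order) => ({ ...order, status: 'ready-to-ship' }),
    InventoryFailed:   (order) => ({ ...order, status: 'cancelled' }),     // compensate (refund handled elsewhere)
    OrderShipped:      (order) => ({ ...order, status: 'shipped' }),
};

function onSagaEvent(order, event) {
    const react = sagaReactions[event.type];
    return react ? react(order) : order;   // ignore unknown events
}
```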

Dead Letter Queue

Handle events that fail processing:

const dlqProducer = kafka.producer();
await dlqProducer.connect();  // connect once at startup

async function processWithDLQ(event) {
    try {
        await processEvent(event);
    } catch (err) {
        console.error('Failed to process event, sending to DLQ:', err);

        await dlqProducer.send({
            topic: 'orders.dlq',
            messages: [{
                key: event.orderId,
                value: JSON.stringify({
                    originalEvent: event,
                    error: err.message,
                    failedAt: new Date().toISOString(),
                    retryCount: (event.retryCount || 0) + 1,
                }),
            }],
        });
    }
}
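
A common refinement is to retry a few times with exponential backoff before giving up and routing to the DLQ. The retry decision itself is pure logic and easy to separate out (the retry budget and delay schedule here are arbitrary example choices):

```javascript
// Decide what to do with a failed event: retry with exponential backoff,
// or route to the DLQ once the retry budget is exhausted.
function nextAction(event, maxRetries = 3, baseDelayMs = 1000) {
    const retryCount = event.retryCount ?? 0;
    if (retryCount >= maxRetries) {
        return { action: 'dlq' };
    }
    return {
        action: 'retry',
        delayMs: baseDelayMs * 2 ** retryCount,   // 1s, 2s, 4s, ...
        event: { ...event, retryCount: retryCount + 1 },
    };
}
```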
