⚡ Calmops

Outbox Pattern: Reliable Event Publishing in Microservices

The Outbox Pattern solves a fundamental problem in microservices: how to reliably publish events when updating a database, without requiring distributed transactions. The database change and the event record are committed in a single local transaction, and a separate relay then publishes the recorded events to the message broker with at-least-once delivery.

The Problem

Without Outbox Pattern

┌───────────────────────────────────────────────────────────┐
│           Race Condition Without Outbox Pattern           │
│                                                           │
│  ┌──────────────────┐      ┌──────────────────┐           │
│  │  Order Service   │      │  Message Queue   │           │
│  │                  │      │                  │           │
│  │  1. UPDATE order │      │                  │           │
│  │     status='paid'│      │                  │           │
│  │                  │      │                  │           │
│  │  2. PUBLISH      │ ────▶│  OrderPaidEvent  │           │
│  │     event        │      │                  │           │
│  └──────────────────┘      └──────────────────┘           │
│         │                                                 │
│         ▼                                                 │
│  Problems:                                                │
│  ✗ If step 2 fails: DB updated but no event               │
│  ✗ If step 2 crashes: Inconsistent state                  │
│  ✗ If step 2 times out: Unknown if event published        │
│  ✗ No atomicity between DB and message queue              │
└───────────────────────────────────────────────────────────┘
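
The failure modes above can be reproduced with in-memory stand-ins (the `FlakyQueue` and dict-as-database below are hypothetical, purely for illustration):

```python
# Hypothetical in-memory stand-ins: a dict as the "database" and a
# queue that can be told to fail, to make the dual-write race concrete.
class FlakyQueue:
    def __init__(self):
        self.events = []
        self.fail = False

    def publish(self, event):
        if self.fail:
            raise ConnectionError("broker unreachable")
        self.events.append(event)

orders = {"123": "pending"}
queue = FlakyQueue()

def mark_paid_naive(order_id):
    orders[order_id] = "paid"  # step 1: the DB write commits
    # step 2: publishing can fail independently of step 1
    queue.publish({"type": "OrderPaidEvent", "order_id": order_id})

queue.fail = True
try:
    mark_paid_naive("123")
except ConnectionError:
    pass

# Inconsistent state: the order is 'paid' but no event was published.
print(orders["123"], len(queue.events))  # paid 0
```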

With Outbox Pattern

┌───────────────────────────────────────────────────────────────┐
│                    Outbox Pattern Solution                    │
│                                                               │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │                Order Service (Single DB)                │  │
│  │                                                         │  │
│  │  ┌───────────────────────────────────────────────────┐  │  │
│  │  │  orders table                                     │  │  │
│  │  │  ─────────────────────────────────────────────    │  │  │
│  │  │  id: 123                                          │  │  │
│  │  │  status: 'paid'                                   │  │  │
│  │  │  ...                                              │  │  │
│  │  └───────────────────────────────────────────────────┘  │  │
│  │                          │                              │  │
│  │  ┌───────────────────────────────────────────────────┐  │  │
│  │  │  outbox table (same transaction!)                 │  │  │
│  │  │  ─────────────────────────────────────────────    │  │  │
│  │  │  id: 1                                            │  │  │
│  │  │  aggregate_type: 'Order'                          │  │  │
│  │  │  aggregate_id: '123'                              │  │  │
│  │  │  event_type: 'OrderPaidEvent'                     │  │  │
│  │  │  payload: {...}                                   │  │  │
│  │  │  created_at: '2026-02-28T10:00:00Z'               │  │  │
│  │  └───────────────────────────────────────────────────┘  │  │
│  └─────────────────────────────────────────────────────────┘  │
│                              │                                │
│                              ▼                                │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │            Outbox Relay (Background Worker)             │  │
│  │                                                         │  │
│  │  1. SELECT * FROM outbox WHERE processed = false        │  │
│  │  2. PUBLISH each event to message broker                │  │
│  │  3. UPDATE outbox SET processed = true                  │  │
│  └─────────────────────────────────────────────────────────┘  │
│                              │                                │
│                              ▼                                │
│  ┌──────────────────┐      ┌──────────────────┐               │
│  │  Inventory       │      │  Notification    │               │
│  │  Service         │      │  Service         │               │
│  └──────────────────┘      └──────────────────┘               │
│                                                               │
│  ✓ Atomic: Both DB update and outbox write in one transaction │
│  ✓ Reliable: Events guaranteed to be published                │
│  ✓ Simple: No distributed transactions needed                 │
└───────────────────────────────────────────────────────────────┘

Implementation

Database Schema

-- PostgreSQL outbox table
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ,
    retry_count INT DEFAULT 0,
    last_error TEXT,
    processed BOOLEAN DEFAULT FALSE
);

CREATE INDEX idx_outbox_unprocessed ON outbox(created_at) 
    WHERE processed = FALSE;

CREATE INDEX idx_outbox_aggregate ON outbox(aggregate_type, aggregate_id);

-- MySQL outbox table
CREATE TABLE outbox (
    id CHAR(36) PRIMARY KEY,
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSON NOT NULL,
    metadata JSON DEFAULT ('{}'),
    created_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    processed_at TIMESTAMP(3),
    retry_count INT DEFAULT 0,
    last_error TEXT,
    processed BOOLEAN DEFAULT FALSE,
    INDEX idx_outbox_unprocessed (created_at)
) ENGINE=InnoDB;
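
The property both schemas are built around is that the business-table write and the outbox insert commit in one local transaction. A minimal runnable sketch of that write path, using SQLite as a stand-in for PostgreSQL/MySQL (column lists abbreviated from the schema above):

```python
# Runnable sketch of the transactional write path. SQLite stands in for
# PostgreSQL/MySQL here so the example runs anywhere.
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id TEXT PRIMARY KEY,
        aggregate_type TEXT NOT NULL,
        aggregate_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        processed INTEGER NOT NULL DEFAULT 0
    );
""")

def mark_paid(order_id: str) -> None:
    # `with conn:` wraps both statements in a single transaction:
    # they commit together or roll back together.
    with conn:
        conn.execute("UPDATE orders SET status = 'paid' WHERE id = ?",
                     (order_id,))
        conn.execute(
            "INSERT INTO outbox (id, aggregate_type, aggregate_id, "
            "event_type, payload) VALUES (?, 'Order', ?, 'OrderPaidEvent', ?)",
            (str(uuid.uuid4()), order_id, json.dumps({"order_id": order_id})),
        )

with conn:
    conn.execute("INSERT INTO orders (id, status) VALUES ('123', 'pending')")
mark_paid("123")
```

If either statement inside `mark_paid` fails, neither the status change nor the outbox row is visible, which is exactly the guarantee the pattern needs.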

Event Publisher Service

import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import aio_pika
import asyncpg

logger = logging.getLogger(__name__)

@dataclass
class OutboxEvent:
    id: str
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: dict
    metadata: dict
    created_at: datetime


class OutboxRepository:
    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool
    
    async def create_event(
        self,
        aggregate_type: str,
        aggregate_id: str,
        event_type: str,
        payload: dict,
        metadata: Optional[dict] = None
    ) -> OutboxEvent:
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow("""
                INSERT INTO outbox 
                (aggregate_type, aggregate_id, event_type, payload, metadata)
                VALUES ($1, $2, $3, $4, $5)
                RETURNING *
            """, aggregate_type, aggregate_id, event_type, 
                 json.dumps(payload), json.dumps(metadata or {}))
            
            return self._row_to_event(row)
    
    async def get_unprocessed_events(
        self, 
        limit: int = 100
    ) -> list[OutboxEvent]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM outbox 
                WHERE processed = FALSE 
                AND retry_count < 5
                ORDER BY created_at ASC
                LIMIT $1
            """, limit)
            
            return [self._row_to_event(row) for row in rows]
    
    async def mark_processed(self, event_id: str):
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE outbox 
                SET processed = TRUE, 
                    processed_at = NOW()
                WHERE id = $1
            """, event_id)
    
    async def mark_failed(
        self, 
        event_id: str, 
        error: str,
        retry_count: int
    ):
        async with self.pool.acquire() as conn:
            # Increment server-side so concurrent relays cannot lose
            # updates; the value passed by the caller acts as a floor.
            await conn.execute("""
                UPDATE outbox 
                SET last_error = $2,
                    retry_count = GREATEST(retry_count + 1, $3)
                WHERE id = $1
            """, event_id, error, retry_count)
    
    def _row_to_event(self, row) -> OutboxEvent:
        return OutboxEvent(
            id=str(row["id"]),
            aggregate_type=row["aggregate_type"],
            aggregate_id=str(row["aggregate_id"]),
            event_type=row["event_type"],
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
            created_at=row["created_at"]
        )


class MessageBrokerPublisher:
    def __init__(self, rabbitmq_url: str):
        self.rabbitmq_url = rabbitmq_url
        self.connection: Optional[aio_pika.Connection] = None
        self.channel: Optional[aio_pika.Channel] = None
    
    async def connect(self):
        self.connection = await aio_pika.connect_robust(self.rabbitmq_url)
        self.channel = await self.connection.channel()
    
    async def publish(
        self,
        exchange: str,
        routing_key: str,
        event: OutboxEvent
    ):
        # Declare the target exchange idempotently and publish to it,
        # so the exchange argument is actually honoured (publishing via
        # default_exchange would ignore it).
        target_exchange = await self.channel.declare_exchange(
            exchange,
            aio_pika.ExchangeType.TOPIC,
            durable=True
        )
        
        message = aio_pika.Message(
            body=json.dumps({
                "event_id": event.id,
                "event_type": event.event_type,
                "aggregate_type": event.aggregate_type,
                "aggregate_id": event.aggregate_id,
                "payload": event.payload,
                "metadata": event.metadata,
                "timestamp": event.created_at.isoformat()
            }).encode(),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            headers={
                "event-type": event.event_type,
                "aggregate-id": event.aggregate_id
            }
        )
        
        await target_exchange.publish(
            message,
            routing_key=routing_key
        )
    
    async def close(self):
        if self.connection:
            await self.connection.close()

Outbox Relay Worker

class OutboxRelay:
    def __init__(
        self,
        outbox_repo: OutboxRepository,
        publisher: MessageBrokerPublisher,
        batch_size: int = 100,
        poll_interval: float = 1.0
    ):
        self.outbox = outbox_repo
        self.publisher = publisher
        self.batch_size = batch_size
        self.poll_interval = poll_interval
        self._running = False
    
    async def start(self):
        self._running = True
        logger.info("Outbox relay started")
        
        while self._running:
            try:
                await self._process_events()
            except Exception as e:
                logger.error(f"Error processing outbox: {e}")
            
            await asyncio.sleep(self.poll_interval)
    
    async def stop(self):
        self._running = False
        logger.info("Outbox relay stopped")
    
    async def _process_events(self):
        events = await self.outbox.get_unprocessed_events(self.batch_size)
        
        if not events:
            return
        
        for event in events:
            try:
                exchange = self._get_exchange(event.aggregate_type)
                routing_key = self._get_routing_key(event.event_type)
                
                await self.publisher.publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    event=event
                )
                
                await self.outbox.mark_processed(event.id)
                
                logger.info(f"Published event {event.id}: {event.event_type}")
                
            except Exception as e:
                logger.error(f"Failed to publish event {event.id}: {e}")
                
                await self.outbox.mark_failed(
                    event.id, 
                    str(e),
                    retry_count=1
                )
    
    def _get_exchange(self, aggregate_type: str) -> str:
        exchange_map = {
            "Order": "orders.events",
            "Payment": "payments.events",
            "User": "users.events",
            "Product": "products.events"
        }
        return exchange_map.get(aggregate_type, "default.events")
    
    def _get_routing_key(self, event_type: str) -> str:
        return event_type.lower().replace("event", "")


class OutboxRelayRunner:
    def __init__(self, config: dict):
        # asyncpg.create_pool is a coroutine, so the pool is created in
        # run() rather than in __init__
        self.config = config
        self.db_pool: Optional[asyncpg.Pool] = None
        self.publisher = MessageBrokerPublisher(config["rabbitmq_url"])
        self.relay: Optional[OutboxRelay] = None
    
    async def run(self):
        self.db_pool = await asyncpg.create_pool(self.config["database_url"])
        self.relay = OutboxRelay(
            OutboxRepository(self.db_pool),
            self.publisher
        )
        
        await self.publisher.connect()
        
        await self.relay.start()
    
    async def shutdown(self):
        if self.relay:
            await self.relay.stop()
        await self.publisher.close()
        if self.db_pool:
            await self.db_pool.close()
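
The `_get_routing_key` helper can be exercised on its own. One caveat worth knowing: `replace("event", "")` strips the substring anywhere in the name, not just the suffix, so a suffix-only variant (a hypothetical alternative, not part of the relay above) is shown alongside:

```python
def get_routing_key(event_type: str) -> str:
    # Mirrors OutboxRelay._get_routing_key: lowercase, drop "event"
    return event_type.lower().replace("event", "")

def get_routing_key_suffix_only(event_type: str) -> str:
    # Hypothetical variant: strip only a trailing "Event"
    # (str.removesuffix, Python 3.9+), so names containing "event"
    # mid-word survive intact
    return event_type.removesuffix("Event").lower()

print(get_routing_key("OrderPaidEvent"))                    # orderpaid
print(get_routing_key("EventLogUpdatedEvent"))              # logupdated
print(get_routing_key_suffix_only("EventLogUpdatedEvent"))  # eventlogupdated
```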

Transactional Outbox in Service

class OrderService:
    def __init__(self, db_pool: asyncpg.Pool, outbox_repo: OutboxRepository):
        self.db = db_pool
        self.outbox = outbox_repo
    
    async def create_order(self, order_data: dict) -> Order:
        async with self.db.acquire() as conn:
            async with conn.transaction():
                order = await conn.fetchrow("""
                    INSERT INTO orders (customer_id, items, total_amount, status)
                    VALUES ($1, $2, $3, 'pending')
                    RETURNING *
                """, order_data["customer_id"], 
                    json.dumps(order_data["items"]),
                    order_data["total_amount"])
                
                await self.outbox.create_event(
                    aggregate_type="Order",
                    aggregate_id=str(order["id"]),
                    event_type="OrderCreatedEvent",
                    payload={
                        "order_id": str(order["id"]),
                        "customer_id": order["customer_id"],
                        "items": order_data["items"],
                        "total_amount": order_data["total_amount"]
                    }
                )
                
                return Order(**dict(order))
    
    async def mark_order_paid(self, order_id: str, payment_id: str):
        async with self.db.acquire() as conn:
            async with conn.transaction():
                await conn.execute("""
                    UPDATE orders 
                    SET status = 'paid', 
                        payment_id = $2,
                        paid_at = NOW()
                    WHERE id = $1
                """, order_id, payment_id)
                
                await self.outbox.create_event(
                    aggregate_type="Order",
                    aggregate_id=order_id,
                    event_type="OrderPaidEvent",
                    payload={
                        "order_id": order_id,
                        "payment_id": payment_id,
                        "paid_at": datetime.utcnow().isoformat()
                    }
                )

Change Data Capture (CDC) Approach

Debezium Integration

# docker-compose.yml for Debezium
version: '3.8'

services:
  postgres:
    image: postgres:15
    # Logical decoding is required for Debezium's PostgreSQL connector
    command: postgres -c wal_level=logical
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: orders
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

  debezium:
    image: debezium/connect:2.4
    depends_on:
      - kafka
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: debezium-group
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_status
    ports:
      - "8083:8083"

volumes:
  postgres_data:

Connector configuration (register it via POST to the Kafka Connect REST API on port 8083):
{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orders",
    "database.server.name": "orders",
    "table.include.list": "public.outbox",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

Building Custom CDC

# Requires the aiokafka client: pip install aiokafka
from aiokafka import AIOKafkaProducer


class PollingCDC:
    """Simple CDC using polling of the outbox table.

    Note: this assumes a monotonically increasing numeric id column
    (e.g. BIGSERIAL) rather than the UUID key from the schema above,
    so that `id > $1` defines a stable cursor.
    """
    
    def __init__(
        self,
        db_pool: asyncpg.Pool,
        kafka_producer: AIOKafkaProducer,
        last_sequence: int = 0
    ):
        self.db = db_pool
        self.kafka = kafka_producer
        self.last_sequence = last_sequence
    
    async def poll_and_publish(self):
        async with self.db.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM outbox 
                WHERE id > $1 
                AND processed = FALSE
                ORDER BY id ASC
                LIMIT 1000
            """, self.last_sequence)
        
        for row in rows:
            await self._publish_to_kafka(row)
            self.last_sequence = row["id"]
        
        return len(rows)
    
    async def _publish_to_kafka(self, row):
        await self.kafka.send(
            topic=f"{row['aggregate_type'].lower()}.events",
            key=row["aggregate_id"].encode(),
            value=json.dumps({
                "event_type": row["event_type"],
                "payload": json.loads(row["payload"]),
                "metadata": json.loads(row["metadata"]),
                "timestamp": row["created_at"].isoformat()
            }).encode()
        )

Handling Failures

Idempotent Processing

# Async Redis client: pip install redis
from redis.asyncio import Redis


class IdempotentEventHandler:
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def process_event(self, event: OutboxEvent) -> bool:
        processed_key = f"event:processed:{event.id}"
        
        already_processed = await self.redis.exists(processed_key)
        if already_processed:
            logger.info(f"Event {event.id} already processed, skipping")
            return True
        
        try:
            await self._handle_event(event)
            
            await self.redis.setex(
                processed_key,
                86400 * 7,  # Keep for 7 days
                "1"
            )
            
            return True
            
        except Exception as e:
            logger.error(f"Failed to handle event {event.id}: {e}")
            raise
    
    async def _handle_event(self, event: OutboxEvent):
        pass  # Implement actual handling
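
The `exists` / `setex` pair above leaves a small window in which two workers can both pass the check. Redis `SET` with the `NX` flag claims the key atomically instead; the claim semantics, sketched with a hypothetical in-memory stand-in:

```python
class FakeRedis:
    # Hypothetical in-memory stand-in modelling `SET key value NX`:
    # only the first caller for a given key wins the claim.
    def __init__(self):
        self.store = {}

    def set_nx(self, key: str, value: str) -> bool:
        if key in self.store:
            return False
        self.store[key] = value
        return True

redis = FakeRedis()
first = redis.set_nx("event:processed:evt-1", "1")
second = redis.set_nx("event:processed:evt-1", "1")
print(first, second)  # True False
```

With the real async client the claim is `await redis.set(processed_key, "1", nx=True, ex=86400 * 7)`, which returns a truthy value only for the winner.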

Retry with Dead Letter Queue

class OutboxWithDLQ:
    def __init__(
        self,
        outbox_repo: OutboxRepository,
        kafka_producer: AIOKafkaProducer,
        dlq_topic: str
    ):
        self.outbox = outbox_repo
        self.kafka = kafka_producer  # used by _send_to_dlq below
        self.dlq_topic = dlq_topic
        self.max_retries = 5
    
    async def process_event(self, event: OutboxEvent) -> bool:
        try:
            await self._publish(event)
            await self.outbox.mark_processed(event.id)
            return True
            
        except Exception as e:
            new_retry_count = event.metadata.get("retry_count", 0) + 1
            
            if new_retry_count >= self.max_retries:
                await self._send_to_dlq(event, str(e))
                await self.outbox.mark_processed(event.id)
                logger.error(f"Event {event.id} sent to DLQ after {new_retry_count} retries")
            else:
                await self.outbox.mark_failed(
                    event.id,
                    str(e),
                    new_retry_count
                )
            
            return False
    
    async def _send_to_dlq(self, event: OutboxEvent, error: str):
        dlq_message = {
            "original_event": {
                "id": event.id,
                "type": event.event_type,
                "aggregate_id": event.aggregate_id,
                "payload": event.payload
            },
            "error": error,
            "failed_at": datetime.utcnow().isoformat()
        }
        
        await self.kafka.send(
            topic=self.dlq_topic,
            key=event.aggregate_id.encode(),
            value=json.dumps(dlq_message).encode()
        )

Best Practices

GOOD_PATTERNS = {
    "use_jsonb_postgres": """
# Use JSONB for payload flexibility

✅ Good:
payload JSONB NOT NULL
# Can query by payload fields
# Fast serialization
# Schema evolution support

❌ Bad:
payload TEXT NOT NULL
# Must parse on every read
# No query capability
""",
    
    "keep_payload_small": """
# Don't store entire objects

✅ Good:
payload: {"order_id": "123", "status": "paid"}

❌ Bad:
payload: {"order": {...entire order object...}}
# Large payload = slow replication
# Data duplication
""",
    
    "index_wisely": """
# Index for polling efficiency

✅ Good:
CREATE INDEX idx_outbox_unprocessed ON outbox(created_at) 
    WHERE processed = FALSE;

# Fast finding of next events to process

❌ Bad:
# No index, full table scan every poll
"""
}

BAD_PATTERNS = {
    "publish_directly": """
❌ Bad:
async def create_order():
    await db.execute("INSERT INTO orders...")
    await message_queue.publish(event)  # Not atomic!

# If publish fails, data is inconsistent

✅ Good:
async def create_order():
    async with transaction():
        await db.execute("INSERT INTO orders...")
        await db.execute("INSERT INTO outbox...")
    # Both succeed or both fail
""",
    
    "no_ordering": """
❌ Bad:
# Process events out of order
SELECT * FROM outbox WHERE processed = FALSE

# Can cause wrong state if events have dependencies

✅ Good:
ORDER BY created_at ASC
# Preserve event ordering for consistency
""",
    
    "infinite_retries": """
❌ Bad:
# Never give up on failed events
# Can block processing of other events

✅ Good:
retry_count < 5
# After max retries, move to DLQ
# Manual intervention for problematic events
"""
}

Summary

The Outbox Pattern provides reliable event publishing:

  • Atomicity - Database changes and event creation happen in a single transaction
  • Reliability - Events are guaranteed to be published (at-least-once)
  • Simplicity - No distributed transactions or complex coordination needed
  • Scalability - Outbox relay can scale horizontally

Two main approaches:

  1. Polling - Simple, works with any database, suitable for low-medium volume
  2. CDC (Debezium) - More complex but more scalable, real-time streaming

The pattern is essential for building reliable event-driven microservices without sacrificing data consistency.
