
Outbox Pattern: Reliable Event Publishing in Microservices

Published: February 28, 2026 Updated: May 11, 2026 Larry Qu 10 min read

The Outbox Pattern solves a fundamental problem in microservices: how to reliably publish events when updating a database, without requiring distributed transactions. It ensures that a database change and the record of the event it produces are committed atomically; a separate process then publishes the event shortly afterwards.

The Dual-Write Problem

The core difficulty is the dual-write problem: a database and a message broker cannot be updated in one atomic step. Without the outbox, you face an impossible dilemma. Write to the database first, and the event may be lost if the publish fails. Publish the event first, and consumers may act on a change that never happened if the subsequent database write fails.

The pattern works in three steps: (1) write the event to an outbox table within the same database transaction as the business data, (2) a separate process reads the outbox table and publishes events to the message broker, and (3) events are deleted or marked as published after successful delivery. A relay can crash after publishing an event but before marking it, in which case the event is published again on restart. The pattern therefore guarantees at-least-once delivery, and consumers must be idempotent.
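The three steps, and the duplicate that at-least-once delivery allows, can be seen in a small simulation. This is a sketch under stated assumptions: sqlite3 stands in for the service database, a Python list stands in for the message broker, and place_order / relay_once are hypothetical names used only for illustration.

```python
import json
import sqlite3

# Assumption: an in-memory SQLite database stands in for the service database.
db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        processed INTEGER NOT NULL DEFAULT 0
    );
""")

published = []  # assumption: stands in for the message broker


def place_order(order_id: int) -> None:
    # Step 1: the business row and the outbox row commit (or roll back) together.
    with db:
        db.execute("INSERT INTO orders (id, status) VALUES (?, 'pending')", (order_id,))
        db.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("OrderCreatedEvent", json.dumps({"order_id": order_id})),
        )


def relay_once(crash_before_mark: bool = False) -> None:
    # Step 2: read unprocessed rows in insertion order and publish them.
    rows = db.execute(
        "SELECT id, event_type, payload FROM outbox WHERE processed = 0 ORDER BY id"
    ).fetchall()
    for row_id, event_type, payload in rows:
        published.append((row_id, event_type, json.loads(payload)))
        if crash_before_mark:
            return  # simulated crash: published, but never marked
        # Step 3: mark the row only after a successful publish.
        with db:
            db.execute("UPDATE outbox SET processed = 1 WHERE id = ?", (row_id,))


place_order(123)
relay_once(crash_before_mark=True)  # relay dies between publish and mark
relay_once()                        # restarted relay publishes the same row again
print(published)
```

The broker ends up with the same event twice, which is exactly why consumers have to deduplicate.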

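Two relay approaches exist, and both rely on the same transactional write of the outbox row. A polling publisher periodically queries the outbox table for unprocessed rows; it works with any database but adds query load and up to one poll interval of latency. Change data capture (CDC) with tools like Debezium and Kafka Connect tails the database's transaction log instead, streaming new outbox rows with lower latency at the cost of running more infrastructure. The outbox table itself should be lightweight: id, aggregate_type, aggregate_id, event_type, payload (JSON), and created_at, plus bookkeeping columns such as processed and retry_count. A common pitfall is publishing events without deleting or marking them in the outbox, which causes endless replays. Monitor the backlog of unprocessed rows: sustained growth indicates that publishing is failing or falling behind.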
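One way to monitor the backlog is to track two numbers: how many rows are still unprocessed, and how old the oldest one is. The sketch below assumes sqlite3 as a stand-in database with ISO-8601 timestamps stored as text; outbox_backlog and the five-minute alert threshold are hypothetical choices, not part of any particular tool.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def outbox_backlog(conn: sqlite3.Connection, now: datetime) -> tuple[int, float]:
    """Return (unprocessed row count, age in seconds of the oldest unprocessed row)."""
    count, oldest = conn.execute(
        "SELECT COUNT(*), MIN(created_at) FROM outbox WHERE processed = 0"
    ).fetchone()
    if oldest is None:
        return 0, 0.0
    age = (now - datetime.fromisoformat(oldest)).total_seconds()
    return count, age


# Assumption: a tiny in-memory outbox with one processed and two pending rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, created_at TEXT NOT NULL, "
    "processed INTEGER NOT NULL DEFAULT 0)"
)
now = datetime(2026, 2, 28, 10, 0, tzinfo=timezone.utc)
for minutes_ago, processed in [(30, 1), (12, 0), (2, 0)]:
    ts = (now - timedelta(minutes=minutes_ago)).isoformat()
    conn.execute("INSERT INTO outbox (created_at, processed) VALUES (?, ?)", (ts, processed))

count, age = outbox_backlog(conn, now)
# Hypothetical alert rule: flag the relay if the oldest pending event is over 5 minutes old.
if age > 300:
    print(f"ALERT: {count} pending events, oldest {age:.0f}s old")
```

Watching the age of the oldest pending row catches a stalled relay even when traffic is low and the row count barely moves.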

The Problem

Without Outbox Pattern

┌─────────────────────────────────────────────────────────────────┐
│         Race Condition Without Outbox Pattern                      │
│                                                                 │
│  ┌──────────────────┐      ┌──────────────────┐               │
│  │  Order Service   │      │  Message Queue   │               │
│  │                  │      │                  │               │
│  │  1. UPDATE order │      │                  │               │
│  │     status='paid'│      │                  │               │
│  │                  │      │                  │               │
│  │  2. PUBLISH     │ ────▶│  OrderPaidEvent  │               │
│  │     event       │      │                  │               │
│  └──────────────────┘      └──────────────────┘               │
│         │                                                       │
│         ▼                                                       │
│  Problems:                                                      │
│  ✗ If step 2 fails: DB updated but no event                    │
│  ✗ If step 2 crashes: Inconsistent state                        │
│  ✗ If step 2 times out: Unknown if event published              │
│  ✗ No atomicity between DB and message queue                    │
└─────────────────────────────────────────────────────────────────┘
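The failure in the diagram can be reproduced in a few lines. This sketch assumes sqlite3 as the order database and a hypothetical publish() function that always fails, standing in for a broker that is down or timing out.

```python
import sqlite3


class BrokerDown(Exception):
    pass


def publish(event: dict) -> None:
    # Hypothetical broker client that fails, e.g. on a network timeout.
    raise BrokerDown("connection refused")


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
db.execute("INSERT INTO orders VALUES (123, 'pending')")
db.commit()

# Dual write: step 1 commits on its own; step 2 targets a separate system.
with db:
    db.execute("UPDATE orders SET status = 'paid' WHERE id = 123")  # 1. UPDATE order
try:
    publish({"type": "OrderPaidEvent", "order_id": 123})             # 2. PUBLISH event
except BrokerDown:
    pass  # nothing can roll back the UPDATE that already committed

status = db.execute("SELECT status FROM orders WHERE id = 123").fetchone()[0]
print(status)  # prints: paid (yet no OrderPaidEvent was ever published)
```

Retrying the publish inside the except block only narrows the window: if the process dies before the retry succeeds, the event is still lost.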

With Outbox Pattern

┌─────────────────────────────────────────────────────────────────┐
│            Outbox Pattern Solution                                 │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                  Order Service (Single DB)                 │  │
│  │                                                            │  │
│  │  ┌─────────────────────────────────────────────────┐    │  │
│  │  │  orders table                                     │    │  │
│  │  │  ─────────────────────────────────────────────    │    │  │
│  │  │  id: 123                                         │    │  │
│  │  │  status: 'paid'                                  │    │  │
│  │  │  ...                                             │    │  │
│  │  └─────────────────────────────────────────────────┘    │  │
│  │                          │                               │  │
│  │  ┌─────────────────────────────────────────────────┐    │  │
│  │  │  outbox table (same transaction!)              │    │  │
│  │  │  ─────────────────────────────────────────────    │    │  │
│  │  │  id: 1                                         │    │  │
│  │  │  aggregate_type: 'Order'                        │    │  │
│  │  │  aggregate_id: '123'                           │    │  │
│  │  │  event_type: 'OrderPaidEvent'                  │    │  │
│  │  │  payload: {...}                                │    │  │
│  │  │  created_at: '2026-02-28T10:00:00Z'          │    │  │
│  │  └─────────────────────────────────────────────────┘    │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                   │
│                              ▼                                   │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │              Outbox Relay (Background Worker)              │  │
│  │                                                            │  │
│  │  1. SELECT * FROM outbox WHERE processed = false          │  │
│  │  2. PUBLISH each event to message broker                   │  │
│  │  3. UPDATE outbox SET processed = true                    │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                   │
│                              ▼                                   │
│  ┌──────────────────┐      ┌──────────────────┐               │
│  │  Inventory       │      │  Notification    │               │
│  │  Service        │      │  Service         │               │
│  └──────────────────┘      └──────────────────┘               │
│                                                                 │
│  ✓ Atomic: Both DB update and outbox write in one transaction   │
│  ✓ Reliable: Events guaranteed to be published                  │
│  ✓ Simple: No distributed transactions needed                  │
└─────────────────────────────────────────────────────────────────┘
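The atomicity claim in the diagram can be checked the same way: when the business update and the outbox insert share one transaction, a failure before commit discards both. This sketch again assumes sqlite3 as a stand-in, where using the connection as a context manager commits on success and rolls back on an exception.

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (id INTEGER PRIMARY KEY, event_type TEXT, payload TEXT);
    INSERT INTO orders VALUES (123, 'pending');
""")

try:
    with db:  # one transaction: commit on success, rollback on any exception
        db.execute("UPDATE orders SET status = 'paid' WHERE id = 123")
        db.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("OrderPaidEvent", json.dumps({"order_id": 123})),
        )
        raise RuntimeError("crash before commit")  # simulated failure
except RuntimeError:
    pass

status = db.execute("SELECT status FROM orders WHERE id = 123").fetchone()[0]
pending = db.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(status, pending)  # prints: pending 0
```

Neither the status change nor the event survives, so the relay can never publish an event for a change that was rolled back.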

Implementation

Database Schema

-- PostgreSQL outbox table
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ,
    retry_count INT DEFAULT 0,
    last_error TEXT,
    processed BOOLEAN DEFAULT FALSE
);

CREATE INDEX idx_outbox_unprocessed ON outbox(created_at) 
    WHERE processed = FALSE;

CREATE INDEX idx_outbox_aggregate ON outbox(aggregate_type, aggregate_id);

-- MySQL outbox table (the expression default on metadata needs MySQL 8.0.13+)
CREATE TABLE outbox (
    id CHAR(36) PRIMARY KEY,  -- no default: the application supplies the UUID
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSON NOT NULL,
    metadata JSON DEFAULT ('{}'),
    created_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    processed_at TIMESTAMP(3),
    retry_count INT DEFAULT 0,
    last_error TEXT,
    processed BOOLEAN DEFAULT FALSE,
    -- MySQL has no partial indexes, so lead with the processed flag instead
    INDEX idx_outbox_unprocessed (processed, created_at)
) ENGINE=InnoDB;
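If the relay marks rows as processed rather than deleting them, the table keeps growing and needs periodic pruning. A minimal sketch, assuming sqlite3 as a stand-in and a hypothetical seven-day retention window; deleting in small batches keeps each transaction short. The same DELETE ... WHERE id IN (SELECT ... LIMIT n) shape also works in PostgreSQL with NOW() - INTERVAL '7 days' as the cutoff.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=7)  # hypothetical retention window


def purge_processed(conn: sqlite3.Connection, now: datetime, batch_size: int = 1000) -> int:
    """Delete processed outbox rows older than RETENTION, one batch per transaction."""
    cutoff = (now - RETENTION).isoformat()
    total = 0
    while True:
        with conn:
            cur = conn.execute(
                "DELETE FROM outbox WHERE id IN ("
                "  SELECT id FROM outbox WHERE processed = 1 AND processed_at < ?"
                "  LIMIT ?)",
                (cutoff, batch_size),
            )
        total += cur.rowcount
        if cur.rowcount < batch_size:  # last (partial or empty) batch
            return total


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, "
    "processed INTEGER NOT NULL DEFAULT 0, processed_at TEXT)"
)
now = datetime(2026, 3, 10, tzinfo=timezone.utc)
conn.executemany(
    "INSERT INTO outbox (processed, processed_at) VALUES (?, ?)",
    [
        (1, (now - timedelta(days=10)).isoformat()),  # old and published: purged
        (1, (now - timedelta(days=1)).isoformat()),   # recently published: kept
        (0, None),                                    # still pending: kept
    ],
)
conn.commit()
deleted = purge_processed(conn, now, batch_size=1)
```

Unprocessed rows are never touched, so pruning cannot drop an event the relay has not yet delivered.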

Event Publisher Service

import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import aio_pika
import asyncpg

logger = logging.getLogger(__name__)

@dataclass
class OutboxEvent:
    id: str
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: dict
    metadata: dict
    created_at: datetime
    retry_count: int = 0  # failed publish attempts recorded so far


class OutboxRepository:
    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool
    
    async def create_event(
        self,
        aggregate_type: str,
        aggregate_id: str,
        event_type: str,
        payload: dict,
        metadata: Optional[dict] = None,
        conn: Optional[asyncpg.Connection] = None
    ) -> OutboxEvent:
        # Pass the caller's connection to join its open transaction. Without
        # it, the INSERT runs on another pooled connection and commits on its
        # own, so the outbox row is no longer atomic with the business change.
        sql = """
            INSERT INTO outbox 
            (aggregate_type, aggregate_id, event_type, payload, metadata)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING *
        """
        args = (aggregate_type, aggregate_id, event_type,
                json.dumps(payload), json.dumps(metadata or {}))
        
        if conn is not None:
            row = await conn.fetchrow(sql, *args)
        else:
            async with self.pool.acquire() as own_conn:
                row = await own_conn.fetchrow(sql, *args)
        
        return self._row_to_event(row)
    
    async def get_unprocessed_events(
        self, 
        limit: int = 100
    ) -> list[OutboxEvent]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM outbox 
                WHERE processed = FALSE 
                AND retry_count < 5
                ORDER BY created_at ASC
                LIMIT $1
            """, limit)
            
            return [self._row_to_event(row) for row in rows]
    
    async def mark_processed(self, event_id: str):
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE outbox 
                SET processed = TRUE, 
                    processed_at = NOW()
                WHERE id = $1
            """, event_id)
    
    async def mark_failed(
        self, 
        event_id: str, 
        error: str,
        retry_count: int
    ):
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE outbox 
                SET last_error = $2,
                    retry_count = $3
                WHERE id = $1
            """, event_id, error, retry_count)
    
    def _row_to_event(self, row) -> OutboxEvent:
        return OutboxEvent(
            id=str(row["id"]),
            aggregate_type=row["aggregate_type"],
            aggregate_id=str(row["aggregate_id"]),
            event_type=row["event_type"],
            # asyncpg returns JSONB columns as str by default
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
            created_at=row["created_at"],
            retry_count=row["retry_count"] or 0
        )


class MessageBrokerPublisher:
    def __init__(self, rabbitmq_url: str):
        self.rabbitmq_url = rabbitmq_url
        self.connection: Optional[aio_pika.Connection] = None
        self.channel: Optional[aio_pika.Channel] = None
        self._exchanges: dict = {}
    
    async def connect(self):
        self.connection = await aio_pika.connect_robust(self.rabbitmq_url)
        # aio_pika enables publisher confirms by default, so publish()
        # waits for the broker to acknowledge each message.
        self.channel = await self.connection.channel()
    
    async def _declare_exchange(self, name: str):
        # Declare each durable topic exchange once and cache it.
        if name not in self._exchanges:
            self._exchanges[name] = await self.channel.declare_exchange(
                name, aio_pika.ExchangeType.TOPIC, durable=True
            )
        return self._exchanges[name]
    
    async def publish(
        self,
        exchange: str,
        routing_key: str,
        event: OutboxEvent
    ):
        message = aio_pika.Message(
            body=json.dumps({
                "event_id": event.id,
                "event_type": event.event_type,
                "aggregate_type": event.aggregate_type,
                "aggregate_id": event.aggregate_id,
                "payload": event.payload,
                "metadata": event.metadata,
                "timestamp": event.created_at.isoformat()
            }).encode(),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            message_id=event.id,  # lets consumers detect redeliveries
            headers={
                "event-type": event.event_type,
                "aggregate-id": event.aggregate_id
            }
        )
        
        # The default exchange routes by queue name and would ignore
        # `exchange`, so publish through the named exchange instead.
        target = await self._declare_exchange(exchange)
        await target.publish(message, routing_key=routing_key)
    
    async def close(self):
        if self.connection:
            await self.connection.close()

Outbox Relay Worker

class OutboxRelay:
    def __init__(
        self,
        outbox_repo: OutboxRepository,
        publisher: MessageBrokerPublisher,
        batch_size: int = 100,
        poll_interval: float = 1.0
    ):
        self.outbox = outbox_repo
        self.publisher = publisher
        self.batch_size = batch_size
        self.poll_interval = poll_interval
        self._running = False
    
    async def start(self):
        self._running = True
        logger.info("Outbox relay started")
        
        while self._running:
            try:
                await self._process_events()
            except Exception:
                logger.exception("Error processing outbox")
            
            await asyncio.sleep(self.poll_interval)
    
    async def stop(self):
        self._running = False
        logger.info("Outbox relay stopped")
    
    async def _process_events(self):
        # Run one relay instance: this query does not lock rows, so two
        # relays polling concurrently would publish the same events.
        events = await self.outbox.get_unprocessed_events(self.batch_size)
        
        if not events:
            return
        
        for event in events:
            try:
                exchange = self._get_exchange(event.aggregate_type)
                routing_key = self._get_routing_key(event.event_type)
                
                await self.publisher.publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    event=event
                )
                
                # A crash between publish and this update republishes the
                # event on restart: at-least-once delivery.
                await self.outbox.mark_processed(event.id)
                
                logger.info(f"Published event {event.id}: {event.event_type}")
                
            except Exception as e:
                logger.error(f"Failed to publish event {event.id}: {e}")
                
                # Increment the stored count; rows that reach 5 are skipped
                # by get_unprocessed_events and need a DLQ or manual action.
                await self.outbox.mark_failed(
                    event.id, 
                    str(e),
                    retry_count=event.retry_count + 1
                )
    
    def _get_exchange(self, aggregate_type: str) -> str:
        exchange_map = {
            "Order": "orders.events",
            "Payment": "payments.events",
            "User": "users.events",
            "Product": "products.events"
        }
        return exchange_map.get(aggregate_type, "default.events")
    
    def _get_routing_key(self, event_type: str) -> str:
        # "OrderPaidEvent" -> "orderpaid"; strips only the trailing suffix
        return event_type.removesuffix("Event").lower()


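class OutboxRelayRunner:
    def __init__(self, config: dict):
        self.config = config
        self.db_pool: Optional[asyncpg.Pool] = None
        self.publisher = MessageBrokerPublisher(config["rabbitmq_url"])
        self.relay: Optional[OutboxRelay] = None
    
    async def run(self):
        # asyncpg.create_pool() must be awaited before the pool can be used,
        # so the pool is created here rather than in __init__.
        self.db_pool = await asyncpg.create_pool(self.config["database_url"])
        await self.publisher.connect()
        
        self.relay = OutboxRelay(
            OutboxRepository(self.db_pool),
            self.publisher
        )
        await self.relay.start()
    
    async def shutdown(self):
        if self.relay:
            await self.relay.stop()
        await self.publisher.close()
        if self.db_pool:
            await self.db_pool.close()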
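The retry bookkeeping the relay depends on (increment the stored count on every failure, stop selecting rows once they reach the retry_count < 5 cap) can be checked without a database or broker. The sketch below is an in-memory simulation, not the service code: Row, FlakyPublisher, and relay_pass are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass

MAX_RETRIES = 5  # mirrors the `retry_count < 5` filter in get_unprocessed_events


@dataclass
class Row:
    id: str
    processed: bool = False
    retry_count: int = 0


class FlakyPublisher:
    """Hypothetical broker client that fails the first `failures` publishes per event."""

    def __init__(self, failures: int):
        self.failures = failures
        self.attempts: dict[str, int] = {}

    async def publish(self, row: Row) -> None:
        n = self.attempts.get(row.id, 0) + 1
        self.attempts[row.id] = n
        if n <= self.failures:
            raise ConnectionError(f"attempt {n} failed")


async def relay_pass(rows: list[Row], publisher: FlakyPublisher) -> None:
    # Same selection rule as the relay: unprocessed and under the retry cap.
    for row in [r for r in rows if not r.processed and r.retry_count < MAX_RETRIES]:
        try:
            await publisher.publish(row)
            row.processed = True
        except ConnectionError:
            row.retry_count += 1  # increment; never reset to a constant


async def main() -> tuple[Row, Row]:
    ok, stuck = Row("evt-ok"), Row("evt-stuck")
    flaky, broken = FlakyPublisher(failures=2), FlakyPublisher(failures=99)
    for _ in range(10):  # ten poll cycles
        await relay_pass([ok], flaky)
        await relay_pass([stuck], broken)
    return ok, stuck


ok, stuck = asyncio.run(main())
print(ok)     # Row(id='evt-ok', processed=True, retry_count=2)
print(stuck)  # Row(id='evt-stuck', processed=False, retry_count=5)
```

The second row shows why a cap alone is not enough: once it reaches five failures it is silently skipped forever, which is the gap a dead letter queue fills.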

Transactional Outbox in Service

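class OrderService:
    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool
    
    @staticmethod
    async def _insert_outbox(
        conn: asyncpg.Connection,
        aggregate_id: str,
        event_type: str,
        payload: dict
    ) -> None:
        # Runs on the caller's connection, so it joins the open transaction:
        # the order change and the outbox row commit or roll back together.
        await conn.execute("""
            INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
            VALUES ('Order', $1, $2, $3)
        """, aggregate_id, event_type, json.dumps(payload))
    
    async def create_order(self, order_data: dict) -> "Order":
        # Order is the service's domain model (not shown here).
        async with self.db.acquire() as conn:
            async with conn.transaction():
                order = await conn.fetchrow("""
                    INSERT INTO orders (customer_id, items, total_amount, status)
                    VALUES ($1, $2, $3, 'pending')
                    RETURNING *
                """, order_data["customer_id"], 
                    json.dumps(order_data["items"]),
                    order_data["total_amount"])
                
                await self._insert_outbox(
                    conn,
                    aggregate_id=str(order["id"]),
                    event_type="OrderCreatedEvent",
                    payload={
                        "order_id": str(order["id"]),
                        "customer_id": order["customer_id"],
                        "items": order_data["items"],
                        "total_amount": order_data["total_amount"]
                    }
                )
                
                return Order(**dict(order))
    
    async def mark_order_paid(self, order_id: str, payment_id: str):
        async with self.db.acquire() as conn:
            async with conn.transaction():
                # RETURNING gives the stored paid_at, so the event carries the
                # same timestamp as the row instead of a second clock reading.
                paid_at = await conn.fetchval("""
                    UPDATE orders 
                    SET status = 'paid', 
                        payment_id = $2,
                        paid_at = NOW()
                    WHERE id = $1
                    RETURNING paid_at
                """, order_id, payment_id)
                
                if paid_at is None:
                    # Raising inside the transaction block rolls it back.
                    raise ValueError(f"Order {order_id} not found")
                
                await self._insert_outbox(
                    conn,
                    aggregate_id=order_id,
                    event_type="OrderPaidEvent",
                    payload={
                        "order_id": order_id,
                        "payment_id": payment_id,
                        "paid_at": paid_at.isoformat()
                    }
                )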

Change Data Capture (CDC) Approach

Debezium Integration

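# docker-compose.yml for Debezium
version: '3.8'

services:
  postgres:
    image: postgres:15
    # Debezium reads the write-ahead log, which needs logical decoding enabled
    command: ["postgres", "-c", "wal_level=logical"]
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: orders
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  debezium:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: debezium-group
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_status
    ports:
      - "8083:8083"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      # A single broker cannot satisfy the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

volumes:
  postgres_data: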
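Register the connector by POSTing the following JSON to the Kafka Connect REST API (http://localhost:8083/connectors). In Debezium 2.x, topic.prefix replaces the older database.server.name setting, so change events land on the topic orders.public.outbox (prefix, schema, table); plugin.name selects PostgreSQL's built-in pgoutput decoder.

{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orders",
    "topic.prefix": "orders",
    "table.include.list": "public.outbox",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}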
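On the consuming side, each Kafka record then carries one outbox row rather than a domain event, so it has to be unpacked. The sketch below assumes the JsonConverter and ExtractNewRecordState settings shown above: with schemas enabled (the JsonConverter default) the row sits under a {"schema": ..., "payload": ...} envelope, the JSONB payload column arrives as a JSON-encoded string, and rewritten deletes carry "__deleted": "true". parse_outbox_record is a hypothetical helper name. Note that Debezium also emits records for UPDATEs, so with CDC the outbox should be treated as insert-only.

```python
import json
from typing import Optional


def parse_outbox_record(raw_value: Optional[bytes]) -> Optional[dict]:
    """Turn one Kafka record value from the outbox connector into an event dict."""
    if raw_value is None:
        return None  # tombstone (kept because drop.tombstones is "false")
    value = json.loads(raw_value)
    # Envelope when schemas are enabled; the bare row otherwise.
    row = value["payload"] if "schema" in value else value
    if row.get("__deleted") == "true":
        return None  # delete rewritten by delete.handling.mode=rewrite
    return {
        "event_id": row["id"],
        "event_type": row["event_type"],
        "aggregate_type": row["aggregate_type"],
        "aggregate_id": row["aggregate_id"],
        # JSONB columns arrive as JSON-encoded strings, so decode once more.
        "payload": json.loads(row["payload"]),
    }


# Assumption: a hand-built record shaped like the connector's output.
sample = json.dumps({
    "schema": {"type": "struct", "optional": False},
    "payload": {
        "id": "7d3e1a52-0000-4000-8000-000000000001",
        "aggregate_type": "Order",
        "aggregate_id": "123",
        "event_type": "OrderPaidEvent",
        "payload": json.dumps({"order_id": "123", "payment_id": "pay_1"}),
        "__deleted": "false",
    },
}).encode()
event = parse_outbox_record(sample)
```

Keeping this unwrapping in one helper means consumers see the same event shape whether it came from the polling relay or from CDC.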

Building Custom CDC

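import json

import asyncpg
from aiokafka import AIOKafkaProducer


class PollingCDC:
    """Simple CDC-style publisher that polls the outbox table (no log tailing)."""
    
    def __init__(
        self,
        db_pool: asyncpg.Pool,
        kafka_producer: AIOKafkaProducer
    ):
        self.db = db_pool
        self.kafka = kafka_producer
    
    async def poll_and_publish(self) -> int:
        # The id column is a random UUID, so it cannot serve as a
        # "last seen" cursor; select by the processed flag instead.
        async with self.db.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM outbox 
                WHERE processed = FALSE
                ORDER BY created_at ASC, id ASC
                LIMIT 1000
            """)
            
            for row in rows:
                await self._publish_to_kafka(row)
                # Mark each row only after Kafka acknowledged it (at-least-once).
                await conn.execute("""
                    UPDATE outbox SET processed = TRUE, processed_at = NOW()
                    WHERE id = $1
                """, row["id"])
        
        return len(rows)
    
    async def _publish_to_kafka(self, row):
        # send_and_wait() returns once the broker acknowledges the record;
        # awaiting send() alone only enqueues it in the producer's buffer.
        await self.kafka.send_and_wait(
            f"{row['aggregate_type'].lower()}.events",
            key=row["aggregate_id"].encode(),  # same aggregate -> same partition
            value=json.dumps({
                "event_id": str(row["id"]),
                "event_type": row["event_type"],
                "payload": json.loads(row["payload"]),
                "metadata": json.loads(row["metadata"]),
                "timestamp": row["created_at"].isoformat()
            }).encode()
        )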

Handling Failures

Idempotent Processing

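from redis.asyncio import Redis


class IdempotentEventHandler:
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def process_event(self, event: OutboxEvent) -> bool:
        processed_key = f"event:processed:{event.id}"
        
        # EXISTS followed by SETEX is not atomic: two consumers receiving the
        # same event at the same time can both pass this check, so
        # _handle_event should still tolerate a rare duplicate.
        already_processed = await self.redis.exists(processed_key)
        if already_processed:
            logger.info(f"Event {event.id} already processed, skipping")
            return True
        
        try:
            await self._handle_event(event)
            
            # Record success only after handling: a crash in between causes a
            # retry (at-least-once) rather than a silently skipped event.
            await self.redis.setex(
                processed_key,
                86400 * 7,  # Keep for 7 days
                "1"
            )
            
            return True
            
        except Exception as e:
            logger.error(f"Failed to handle event {event.id}: {e}")
            raise
    
    async def _handle_event(self, event: OutboxEvent):
        pass  # Implement actual handling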
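A different technique closes the race noted above: record the processed event ID in the consumer's own database, inside the same transaction as the handler's side effects, and let a primary key reject duplicates. This is a sketch of that alternative, not of the Redis handler; it assumes sqlite3 as the consumer database, and the processed_events and inventory tables and handle_once are hypothetical.

```python
import sqlite3


def handle_once(conn: sqlite3.Connection, event_id: str, sku: str, qty: int) -> bool:
    """Apply the event's side effect at most once; return False for a duplicate."""
    try:
        with conn:  # dedupe record and side effect commit or roll back together
            conn.execute("INSERT INTO processed_events (event_id) VALUES (?)", (event_id,))
            conn.execute(
                "UPDATE inventory SET reserved = reserved + ? WHERE sku = ?",
                (qty, sku),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # primary key hit: this event_id was already applied


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE processed_events (event_id TEXT PRIMARY KEY);
    CREATE TABLE inventory (sku TEXT PRIMARY KEY, reserved INTEGER NOT NULL);
    INSERT INTO inventory VALUES ('sku-1', 0);
""")
first = handle_once(conn, "evt-1", "sku-1", 2)
second = handle_once(conn, "evt-1", "sku-1", 2)  # at-least-once redelivery
reserved = conn.execute("SELECT reserved FROM inventory WHERE sku = 'sku-1'").fetchone()[0]
print(first, second, reserved)  # prints: True False 2
```

Because the check and the write are one atomic INSERT, concurrent redeliveries cannot both succeed, and a crash mid-handler rolls back the dedupe record along with the side effect.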

Retry with Dead Letter Queue

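from datetime import datetime, timezone

from aiokafka import AIOKafkaProducer


class OutboxWithDLQ:
    def __init__(
        self,
        outbox_repo: OutboxRepository,
        kafka_producer: AIOKafkaProducer,
        dlq_topic: str,
        max_retries: int = 5  # keep in line with `retry_count < 5` in the relay query
    ):
        self.outbox = outbox_repo
        self.kafka = kafka_producer
        self.dlq_topic = dlq_topic
        self.max_retries = max_retries
    
    async def process_event(self, event: OutboxEvent, retry_count: int) -> bool:
        """retry_count: failed attempts so far, read from the outbox row's retry_count column."""
        try:
            await self._publish(event)
            await self.outbox.mark_processed(event.id)
            return True
            
        except Exception as e:
            new_retry_count = retry_count + 1
            
            if new_retry_count >= self.max_retries:
                await self._send_to_dlq(event, str(e))
                # Mark the row processed so the relay stops retrying it; the
                # DLQ message is now the record of the failure.
                await self.outbox.mark_processed(event.id)
                logger.error(f"Event {event.id} sent to DLQ after {new_retry_count} retries")
            else:
                await self.outbox.mark_failed(
                    event.id,
                    str(e),
                    new_retry_count
                )
            
            return False
    
    async def _publish(self, event: OutboxEvent):
        await self.kafka.send_and_wait(
            f"{event.aggregate_type.lower()}.events",
            key=event.aggregate_id.encode(),
            value=json.dumps({
                "event_id": event.id,
                "event_type": event.event_type,
                "payload": event.payload
            }).encode()
        )
    
    async def _send_to_dlq(self, event: OutboxEvent, error: str):
        dlq_message = {
            "original_event": {
                "id": event.id,
                "type": event.event_type,
                "aggregate_id": event.aggregate_id,
                "payload": event.payload
            },
            "error": error,
            "failed_at": datetime.now(timezone.utc).isoformat()
        }
        
        await self.kafka.send_and_wait(
            self.dlq_topic,
            key=event.aggregate_id.encode(),
            value=json.dumps(dlq_message).encode()
        )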

Best Practices

GOOD_PATTERNS = {
    "use_jsonb_postgres": """
# Use JSONB for payload flexibility

✅ Good:
payload JSONB NOT NULL
# Can query by payload fields
# Validated as JSON on insert
# Schema evolution support

❌ Bad:
payload TEXT NOT NULL
# Must parse on every read
# No query capability
""",
    
    "keep_payload_small": """
# Don't store entire objects

✅ Good:
payload: {"order_id": "123", "status": "paid"}

❌ Bad:
payload: {"order": {...entire order object...}}
# Large payload = slow replication
# Data duplication
""",
    
    "index_wisely": """
# Index for polling efficiency

✅ Good:
CREATE INDEX idx_outbox_unprocessed ON outbox(created_at) 
    WHERE processed = FALSE;

# Fast finding of next events to process

❌ Bad:
# No index, full table scan every poll
"""
}

BAD_PATTERNS = {
    "publish_directly": """
❌ Bad:
async def create_order():
    await db.execute("INSERT INTO orders...")
    await message_queue.publish(event)  # Not atomic!

# If publish fails, data is inconsistent

✅ Good:
async def create_order():
    async with transaction():
        await db.execute("INSERT INTO orders...")
        await db.execute("INSERT INTO outbox...")
    # Both succeed or both fail
""",
    
    "no_ordering": """
❌ Bad:
# Process events out of order
SELECT * FROM outbox WHERE processed = FALSE

# Can cause wrong state if events have dependencies

✅ Good:
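ORDER BY created_at ASC
# Preserve event ordering for consistency
# (created_at = NOW() is the transaction start time, so concurrent
#  transactions can commit in a different order than created_at)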
""",
    
    "infinite_retries": """
❌ Bad:
# Never give up on failed events
# Can block processing of other events

✅ Good:
retry_count < 5
# After max retries, move to DLQ
# Manual intervention for problematic events
"""
}

Summary

The Outbox Pattern provides reliable event publishing:

  • Atomicity - Database changes and event creation happen in a single transaction
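  • Reliability - Events are published at least once, so consumers must be idempotent
  • Simplicity - No distributed transactions or complex coordination needed
  • Scalability - The outbox relay can scale horizontally if instances claim disjoint rows (for example with SELECT ... FOR UPDATE SKIP LOCKED in PostgreSQL); otherwise they publish the same events twice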

Two main approaches:

  1. Polling - Simple, works with any database, suitable for low-medium volume
  2. CDC (Debezium) - More complex but more scalable, real-time streaming

The pattern is essential for building reliable event-driven microservices without sacrificing data consistency.
