The Outbox Pattern solves a fundamental problem in microservices: how to reliably publish events when updating a database, without requiring distributed transactions. It ensures that a database change and the record of its event are committed atomically, so the event is always published eventually.
The Dual-Write Problem
The dual-write problem is the challenge of atomically updating a database and publishing an event to a message broker. Without the outbox, neither ordering is safe. Write to the database first, and the event is lost if the publish fails or the process crashes before it. Publish the event first, and consumers may act on a change that never commits if the subsequent database write fails.
The pattern works in three steps: (1) write the event to an outbox table within the same database transaction as the business data, (2) a separate process reads the outbox table and publishes events to the message broker, and (3) events are deleted or marked as published after successful delivery. This guarantees at-least-once delivery rather than exactly-once: the relay can crash after publishing but before recording the delivery, so an event may be sent twice and consumers must be idempotent.
Two relay approaches exist. A polling publisher periodically queries the outbox table for unpublished rows; it is simple and works with any database, at the cost of polling latency and extra query load. Change data capture (CDC) with tools like Debezium on Kafka Connect tails the database's transaction log and streams outbox inserts directly, offering looser coupling and lower latency at the cost of more infrastructure to operate. The outbox table itself should be lightweight: event_id, aggregate_id, event_type, payload (JSON), and created_at. A common pitfall is publishing events without deleting them or marking them as published, so the relay replays them on every pass. Always monitor outbox table size: sustained growth in unpublished rows indicates a publishing failure.
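One way to monitor the outbox is to track both the number of unpublished rows and the age of the oldest one. The sketch below assumes a `published` flag column and ISO-8601 timestamps with a UTC offset in `created_at`; the function name `outbox_backlog` is hypothetical, and SQLite is used only to keep the example self-contained.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional, Tuple


def outbox_backlog(
    conn: sqlite3.Connection, now: Optional[datetime] = None
) -> Tuple[int, float]:
    """Return (pending_count, oldest_pending_age_in_seconds)."""
    now = now or datetime.now(timezone.utc)
    count, oldest = conn.execute(
        "SELECT COUNT(*), MIN(created_at) FROM outbox WHERE published = 0"
    ).fetchone()
    if oldest is None:
        # Nothing is waiting to be published.
        return 0, 0.0
    age = (now - datetime.fromisoformat(oldest)).total_seconds()
    return count, age
```

Alerting on the age of the oldest pending row tends to be more telling than alerting on row count alone, because a healthy relay under heavy load can have many pending rows that are all only a few seconds old.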
The Problem
Without Outbox Pattern
┌─────────────────────────────────────────────────────────────────┐
│ Race Condition Without Outbox Pattern │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Order Service │ │ Message Queue │ │
│ │ │ │ │ │
│ │ 1. UPDATE order │ │ │ │
│ │ status='paid'│ │ │ │
│ │ │ │ │ │
│ │ 2. PUBLISH │ ────▶│ OrderPaidEvent │ │
│ │ event │ │ │ │
│ └──────────────────┘ └──────────────────┘ │
│ │ │
│ ▼ │
│ Problems: │
│ ✗ If step 2 fails: DB updated but no event │
│ ✗ If step 2 crashes: Inconsistent state │
│ ✗ If step 2 times out: Unknown if event published │
│ ✗ No atomicity between DB and message queue │
└─────────────────────────────────────────────────────────────────┘
With Outbox Pattern
┌─────────────────────────────────────────────────────────────────┐
│ Outbox Pattern Solution │
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Order Service (Single DB) │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ orders table │ │ │
│ │ │ ───────────────────────────────────────────── │ │ │
│ │ │ id: 123 │ │ │
│ │ │ status: 'paid' │ │ │
│ │ │ ... │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ outbox table (same transaction!) │ │ │
│ │ │ ───────────────────────────────────────────── │ │ │
│ │ │ id: 1 │ │ │
│ │ │ aggregate_type: 'Order' │ │ │
│ │ │ aggregate_id: '123' │ │ │
│ │ │ event_type: 'OrderPaidEvent' │ │ │
│ │ │ payload: {...} │ │ │
│ │ │ created_at: '2026-02-28T10:00:00Z' │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Outbox Relay (Background Worker) │ │
│ │ │ │
│ │ 1. SELECT * FROM outbox WHERE processed = false │ │
│ │ 2. PUBLISH each event to message broker │ │
│ │ 3. UPDATE outbox SET processed = true │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Inventory │ │ Notification │ │
│ │ Service │ │ Service │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
│ ✓ Atomic: Both DB update and outbox write in one transaction │
│ ✓ Reliable: Events guaranteed to be published │
│ ✓ Simple: No distributed transactions needed │
└─────────────────────────────────────────────────────────────────┘
Implementation
Database Schema
-- PostgreSQL outbox table
CREATE TABLE outbox (
    id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB NOT NULL,
    metadata       JSONB DEFAULT '{}',
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at   TIMESTAMPTZ,
    retry_count    INT DEFAULT 0,
    last_error     TEXT,
    processed      BOOLEAN DEFAULT FALSE
);

CREATE INDEX idx_outbox_unprocessed ON outbox(created_at)
    WHERE processed = FALSE;

CREATE INDEX idx_outbox_aggregate ON outbox(aggregate_type, aggregate_id);

-- MySQL outbox table
CREATE TABLE outbox (
    id             CHAR(36) PRIMARY KEY,  -- UUID generated by the application
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSON NOT NULL,
    metadata       JSON DEFAULT ('{}'),
    created_at     TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    processed_at   TIMESTAMP(3),
    retry_count    INT DEFAULT 0,
    last_error     TEXT,
    processed      BOOLEAN DEFAULT FALSE,
    -- MySQL has no partial indexes, so lead with the flag the relay filters on
    INDEX idx_outbox_unprocessed (processed, created_at)
) ENGINE=InnoDB;
Event Publisher Service
import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import aio_pika
import asyncpg

logger = logging.getLogger(__name__)


@dataclass
class OutboxEvent:
    id: str
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: dict
    metadata: dict
    created_at: datetime


class OutboxRepository:
    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    async def create_event(
        self,
        aggregate_type: str,
        aggregate_id: str,
        event_type: str,
        payload: dict,
        metadata: Optional[dict] = None,
        conn: Optional[asyncpg.Connection] = None
    ) -> OutboxEvent:
        # Pass the caller's connection so the insert joins its open
        # transaction; a connection acquired here commits on its own and
        # is NOT atomic with the caller's business write.
        query = """
            INSERT INTO outbox
            (aggregate_type, aggregate_id, event_type, payload, metadata)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING *
        """
        args = (aggregate_type, aggregate_id, event_type,
                json.dumps(payload), json.dumps(metadata or {}))
        if conn is not None:
            row = await conn.fetchrow(query, *args)
        else:
            async with self.pool.acquire() as own_conn:
                row = await own_conn.fetchrow(query, *args)
        return self._row_to_event(row)

    async def get_unprocessed_events(
        self,
        limit: int = 100
    ) -> list[OutboxEvent]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM outbox
                WHERE processed = FALSE
                AND retry_count < 5
                ORDER BY created_at ASC
                LIMIT $1
            """, limit)
            return [self._row_to_event(row) for row in rows]

    async def mark_processed(self, event_id: str):
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE outbox
                SET processed = TRUE,
                    processed_at = NOW()
                WHERE id = $1
            """, event_id)

    async def mark_failed(
        self,
        event_id: str,
        error: str,
        retry_count: int
    ):
        async with self.pool.acquire() as conn:
            # Always advance the stored count, even when the caller passes
            # a stale value; otherwise the retry limit is never reached.
            await conn.execute("""
                UPDATE outbox
                SET last_error = $2,
                    retry_count = GREATEST(retry_count + 1, $3)
                WHERE id = $1
            """, event_id, error, retry_count)

    def _row_to_event(self, row) -> OutboxEvent:
        return OutboxEvent(
            id=str(row["id"]),
            aggregate_type=row["aggregate_type"],
            aggregate_id=str(row["aggregate_id"]),
            event_type=row["event_type"],
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
            created_at=row["created_at"]
        )


class MessageBrokerPublisher:
    def __init__(self, rabbitmq_url: str):
        self.rabbitmq_url = rabbitmq_url
        self.connection: Optional[aio_pika.Connection] = None
        self.channel: Optional[aio_pika.Channel] = None

    async def connect(self):
        self.connection = await aio_pika.connect_robust(self.rabbitmq_url)
        self.channel = await self.connection.channel()

    async def publish(
        self,
        exchange: str,
        routing_key: str,
        event: OutboxEvent
    ):
        message = aio_pika.Message(
            body=json.dumps({
                "event_id": event.id,
                "event_type": event.event_type,
                "aggregate_type": event.aggregate_type,
                "aggregate_id": event.aggregate_id,
                "payload": event.payload,
                "metadata": event.metadata,
                "timestamp": event.created_at.isoformat()
            }).encode(),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            headers={
                "event-type": event.event_type,
                "aggregate-id": event.aggregate_id
            }
        )
        # Publish to the named exchange rather than the default exchange,
        # which would ignore the `exchange` argument. The exchange must
        # already be declared on the broker.
        target = await self.channel.get_exchange(exchange)
        await target.publish(
            message,
            routing_key=routing_key
        )

    async def close(self):
        if self.connection:
            await self.connection.close()
Outbox Relay Worker
class OutboxRelay:
    def __init__(
        self,
        outbox_repo: OutboxRepository,
        publisher: MessageBrokerPublisher,
        batch_size: int = 100,
        poll_interval: float = 1.0
    ):
        self.outbox = outbox_repo
        self.publisher = publisher
        self.batch_size = batch_size
        self.poll_interval = poll_interval
        self._running = False

    async def start(self):
        self._running = True
        logger.info("Outbox relay started")
        while self._running:
            try:
                await self._process_events()
            except Exception as e:
                logger.error(f"Error processing outbox: {e}")
            await asyncio.sleep(self.poll_interval)

    async def stop(self):
        self._running = False
        logger.info("Outbox relay stopped")

    async def _process_events(self):
        events = await self.outbox.get_unprocessed_events(self.batch_size)
        if not events:
            return
        for event in events:
            try:
                exchange = self._get_exchange(event.aggregate_type)
                routing_key = self._get_routing_key(event.event_type)
                await self.publisher.publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    event=event
                )
                await self.outbox.mark_processed(event.id)
                logger.info(f"Published event {event.id}: {event.event_type}")
            except Exception as e:
                logger.error(f"Failed to publish event {event.id}: {e}")
                await self.outbox.mark_failed(
                    event.id,
                    str(e),
                    retry_count=1
                )

    def _get_exchange(self, aggregate_type: str) -> str:
        exchange_map = {
            "Order": "orders.events",
            "Payment": "payments.events",
            "User": "users.events",
            "Product": "products.events"
        }
        return exchange_map.get(aggregate_type, "default.events")

    def _get_routing_key(self, event_type: str) -> str:
        return event_type.lower().replace("event", "")
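

class OutboxRelayRunner:
    def __init__(self, config: dict):
        self.config = config
        self.db_pool: Optional[asyncpg.Pool] = None
        self.publisher = MessageBrokerPublisher(config["rabbitmq_url"])
        self.relay: Optional[OutboxRelay] = None

    async def run(self):
        # create_pool must be awaited, so the pool is created here rather
        # than in __init__, where awaiting is not possible.
        self.db_pool = await asyncpg.create_pool(self.config["database_url"])
        self.relay = OutboxRelay(
            OutboxRepository(self.db_pool),
            self.publisher
        )
        await self.publisher.connect()
        await self.relay.start()

    async def shutdown(self):
        if self.relay:
            await self.relay.stop()
        await self.publisher.close()
        if self.db_pool:
            await self.db_pool.close()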
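
When more than one relay instance polls the same outbox table, each instance needs to claim rows so that two workers do not publish the same batch. On PostgreSQL (9.5 and later) one common approach is `SELECT ... FOR UPDATE SKIP LOCKED` inside a transaction. The sketch below is an assumption-laden illustration, not part of the relay above: `relay_batch`, `CLAIM_SQL`, and `MARK_SQL` are hypothetical names, `conn` is assumed to behave like an asyncpg connection, and `publish` is any coroutine that sends one row to the broker.

```python
from typing import Any, Awaitable, Callable

# Rows locked by another worker's open transaction are skipped, not waited on.
CLAIM_SQL = """
    SELECT id, aggregate_type, aggregate_id, event_type, payload
    FROM outbox
    WHERE processed = FALSE
    ORDER BY created_at ASC
    LIMIT $1
    FOR UPDATE SKIP LOCKED
"""

MARK_SQL = """
    UPDATE outbox
    SET processed = TRUE, processed_at = NOW()
    WHERE id = $1
"""


async def relay_batch(
    conn: Any,
    publish: Callable[[Any], Awaitable[None]],
    batch_size: int = 100,
) -> int:
    """Claim, publish, and mark one batch inside a single transaction."""
    async with conn.transaction():
        rows = await conn.fetch(CLAIM_SQL, batch_size)
        for row in rows:
            # If publish raises, the transaction rolls back, the row locks
            # are released, and the whole batch is retried later.
            await publish(row)
            await conn.execute(MARK_SQL, row["id"])
    return len(rows)
```

The trade-off is that the transaction stays open while the batch is published, so batches should be kept small. A rollback after a partial batch re-publishes the events that had already been sent, which is consistent with at-least-once delivery.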
Transactional Outbox in Service
from datetime import datetime, timezone


class OrderService:
    def __init__(self, db_pool: asyncpg.Pool, outbox_repo: OutboxRepository):
        self.db = db_pool
        self.outbox = outbox_repo

    async def _add_outbox_event(self, conn, aggregate_id, event_type, payload):
        # Insert on the caller's connection so the outbox row commits or
        # rolls back together with the business write.
        await conn.execute("""
            INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
            VALUES ('Order', $1, $2, $3)
        """, aggregate_id, event_type, json.dumps(payload))

    async def create_order(self, order_data: dict) -> Order:
        # Order is the service's domain model, defined elsewhere.
        async with self.db.acquire() as conn:
            async with conn.transaction():
                order = await conn.fetchrow("""
                    INSERT INTO orders (customer_id, items, total_amount, status)
                    VALUES ($1, $2, $3, 'pending')
                    RETURNING *
                """, order_data["customer_id"],
                    json.dumps(order_data["items"]),
                    order_data["total_amount"])
                await self._add_outbox_event(
                    conn,
                    aggregate_id=str(order["id"]),
                    event_type="OrderCreatedEvent",
                    payload={
                        "order_id": str(order["id"]),
                        "customer_id": order["customer_id"],
                        "items": order_data["items"],
                        "total_amount": order_data["total_amount"]
                    }
                )
                return Order(**dict(order))

    async def mark_order_paid(self, order_id: str, payment_id: str):
        async with self.db.acquire() as conn:
            async with conn.transaction():
                await conn.execute("""
                    UPDATE orders
                    SET status = 'paid',
                        payment_id = $2,
                        paid_at = NOW()
                    WHERE id = $1
                """, order_id, payment_id)
                await self._add_outbox_event(
                    conn,
                    aggregate_id=order_id,
                    event_type="OrderPaidEvent",
                    payload={
                        "order_id": order_id,
                        "payment_id": payment_id,
                        "paid_at": datetime.now(timezone.utc).isoformat()
                    }
                )
Change Data Capture (CDC) Approach
Debezium Integration
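# docker-compose.yml for Debezium
version: '3.8'
services:
  postgres:
    image: postgres:15
    # Debezium reads the write-ahead log through logical decoding
    command: ["postgres", "-c", "wal_level=logical"]
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: orders
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  debezium:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: debezium-group
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_status
    ports:
      - "8083:8083"
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      # Single broker, so internal topics cannot be replicated
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
  # The kafka service points at zookeeper:2181, so it needs this service
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
volumes:
  postgres_data: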

{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orders",
    "topic.prefix": "orders",
    "table.include.list": "public.outbox",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
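
A connector definition like the one above is registered by sending it to the Kafka Connect REST API with a POST to the /connectors endpoint. The helper below only builds the request, so it can be inspected without a running cluster; `build_register_request` is a hypothetical name, and the base URL is an assumption based on port 8083 in the compose file.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build the POST request that registers a connector with Kafka Connect."""
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Truncated config for illustration; use the full connector JSON in practice.
    connector = {"name": "orders-connector", "config": {"topic.prefix": "orders"}}
    request = build_register_request("http://localhost:8083", connector)
    # Sending requires a running Kafka Connect instance:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status)
    print(request.get_method(), request.full_url)
```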
Building Custom CDC
from aiokafka import AIOKafkaProducer


class PollingCDC:
    """Simple CDC substitute that polls the outbox table."""

    def __init__(
        self,
        db_pool: asyncpg.Pool,
        kafka_producer: AIOKafkaProducer
    ):
        self.db = db_pool
        self.kafka = kafka_producer

    async def poll_and_publish(self) -> int:
        async with self.db.acquire() as conn:
            # The UUID primary key is random, so it cannot act as a cursor;
            # select by the processed flag and order by creation time.
            rows = await conn.fetch("""
                SELECT * FROM outbox
                WHERE processed = FALSE
                ORDER BY created_at ASC
                LIMIT 1000
            """)
            for row in rows:
                await self._publish_to_kafka(row)
                # Mark the row only after Kafka has acknowledged the send
                await conn.execute("""
                    UPDATE outbox
                    SET processed = TRUE,
                        processed_at = NOW()
                    WHERE id = $1
                """, row["id"])
            return len(rows)

    async def _publish_to_kafka(self, row):
        # send_and_wait returns once the broker has acknowledged the record
        await self.kafka.send_and_wait(
            f"{row['aggregate_type'].lower()}.events",
            key=row["aggregate_id"].encode(),
            value=json.dumps({
                "event_type": row["event_type"],
                "payload": json.loads(row["payload"]),
                "metadata": json.loads(row["metadata"]),
                "timestamp": row["created_at"].isoformat()
            }).encode()
        )
Handling Failures
Idempotent Processing
from redis.asyncio import Redis


class IdempotentEventHandler:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def process_event(self, event: OutboxEvent) -> bool:
        processed_key = f"event:processed:{event.id}"
        # Note: this check and the later SETEX are not atomic, so two
        # concurrent deliveries of the same event can both pass the check.
        already_processed = await self.redis.exists(processed_key)
        if already_processed:
            logger.info(f"Event {event.id} already processed, skipping")
            return True
        try:
            await self._handle_event(event)
            await self.redis.setex(
                processed_key,
                86400 * 7,  # Keep for 7 days
                "1"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to handle event {event.id}: {e}")
            raise

    async def _handle_event(self, event: OutboxEvent):
        pass  # Implement actual handling
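
When the consumer's own state lives in a relational database, an alternative to an external deduplication store is to record the event ID in the same transaction as the state change, so the two cannot diverge. The following is a minimal sketch of that idea using SQLite; the `processed_events` table and the `handle_once` function are hypothetical names introduced for the example.

```python
import sqlite3
from typing import Callable


def handle_once(
    conn: sqlite3.Connection,
    event_id: str,
    apply_change: Callable[[sqlite3.Connection], None],
) -> bool:
    """Apply `apply_change` at most once per event_id.

    Returns True if the change was applied, False if the event was a duplicate.
    """
    # `with conn` commits on success and rolls back on any exception, so the
    # dedup record and the state change succeed or fail together.
    with conn:
        cursor = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event_id,),
        )
        if cursor.rowcount == 0:
            # The primary key already existed: this event was handled before.
            return False
        apply_change(conn)
    return True
```

If `apply_change` raises, the rollback also removes the deduplication row, so a redelivery of the same event gets another attempt.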
Retry with Dead Letter Queue
from datetime import datetime, timezone

from aiokafka import AIOKafkaProducer


class OutboxWithDLQ:
    def __init__(
        self,
        outbox_repo: OutboxRepository,
        dlq_topic: str,
        kafka_producer: AIOKafkaProducer
    ):
        self.outbox = outbox_repo
        self.dlq_topic = dlq_topic
        self.kafka = kafka_producer
        self.max_retries = 5

    async def process_event(self, event: OutboxEvent) -> bool:
        try:
            await self._publish(event)
            await self.outbox.mark_processed(event.id)
            return True
        except Exception as e:
            # retry_count is a column on the outbox row, not part of the
            # event metadata, so read the stored value.
            stored = await self.outbox.pool.fetchval(
                "SELECT retry_count FROM outbox WHERE id = $1", event.id
            )
            new_retry_count = (stored or 0) + 1
            if new_retry_count >= self.max_retries:
                await self._send_to_dlq(event, str(e))
                await self.outbox.mark_processed(event.id)
                logger.error(f"Event {event.id} sent to DLQ after {new_retry_count} retries")
            else:
                await self.outbox.mark_failed(
                    event.id,
                    str(e),
                    new_retry_count
                )
            return False

    async def _publish(self, event: OutboxEvent):
        raise NotImplementedError  # Publish to the primary broker here

    async def _send_to_dlq(self, event: OutboxEvent, error: str):
        dlq_message = {
            "original_event": {
                "id": event.id,
                "type": event.event_type,
                "aggregate_id": event.aggregate_id,
                "payload": event.payload
            },
            "error": error,
            "failed_at": datetime.now(timezone.utc).isoformat()
        }
        await self.kafka.send_and_wait(
            self.dlq_topic,
            key=event.aggregate_id.encode(),
            value=json.dumps(dlq_message).encode()
        )
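
Retrying a failed event on every poll can hammer a broker that is already struggling. One option, not part of the code above, is to space retries out with capped exponential backoff derived from the stored retry count. The function name `retry_delay` and its default values are assumptions chosen for illustration.

```python
import random
from typing import Callable


def retry_delay(
    retry_count: int,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: float = 0.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Seconds to wait before the next attempt.

    The delay doubles with each retry (base * 2**retry_count), is capped at
    `cap`, and is stretched by up to `jitter` (a fraction of the delay) so
    that many failing events do not all retry at the same instant.
    """
    delay = min(cap, base * (2 ** retry_count))
    return delay + delay * jitter * rng()
```

A relay could store the resulting time in a hypothetical next_attempt_at column and add it to the polling filter, so that events still waiting out their backoff are skipped.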
Best Practices
GOOD_PATTERNS = {
    "use_jsonb_postgres": """
        # Use JSONB for payload flexibility
        ✅ Good:
            payload JSONB NOT NULL
            # Can query by payload fields
            # Fast serialization
            # Schema evolution support
        ❌ Bad:
            payload TEXT NOT NULL
            # Must parse on every read
            # No query capability
    """,
    "keep_payload_small": """
        # Don't store entire objects
        ✅ Good:
            payload: {"order_id": "123", "status": "paid"}
        ❌ Bad:
            payload: {"order": {...entire order object...}}
            # Large payload = slow replication
            # Data duplication
    """,
    "index_wisely": """
        # Index for polling efficiency
        ✅ Good:
            CREATE INDEX idx_outbox_unprocessed ON outbox(created_at)
            WHERE processed = FALSE;
            # Fast finding of next events to process
        ❌ Bad:
            # No index, full table scan every poll
    """
}

BAD_PATTERNS = {
    "publish_directly": """
        ❌ Bad:
            async def create_order():
                await db.execute("INSERT INTO orders...")
                await message_queue.publish(event)  # Not atomic!
                # If publish fails, data is inconsistent
        ✅ Good:
            async def create_order():
                async with transaction():
                    await db.execute("INSERT INTO orders...")
                    await db.execute("INSERT INTO outbox...")
                    # Both succeed or both fail
    """,
    "no_ordering": """
        ❌ Bad:
            # Process events out of order
            SELECT * FROM outbox WHERE processed = FALSE
            # Can cause wrong state if events have dependencies
        ✅ Good:
            ORDER BY created_at ASC
            # Preserve event ordering for consistency
    """,
    "infinite_retries": """
        ❌ Bad:
            # Never give up on failed events
            # Can block processing of other events
        ✅ Good:
            retry_count < 5
            # After max retries, move to DLQ
            # Manual intervention for problematic events
    """
}
Summary
The Outbox Pattern provides reliable event publishing:
- Atomicity - Database changes and event creation happen in a single transaction
- Reliability - Events are guaranteed to be published (at-least-once)
- Simplicity - No distributed transactions or complex coordination needed
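- Scalability - The outbox relay can scale horizontally, provided each worker claims rows under a lock (for example, SELECT ... FOR UPDATE SKIP LOCKED) so that two workers do not publish the same batch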
Two main approaches:
- Polling - Simple, works with any database, suitable for low-medium volume
- CDC (Debezium) - More complex but more scalable, real-time streaming
The pattern is essential for building reliable event-driven microservices without sacrificing data consistency.