The Outbox Pattern solves a fundamental problem in microservices: how to reliably publish events when updating a database, without resorting to distributed transactions. The state change and the event record are committed atomically in one local transaction; a separate relay then delivers the event to the message broker at least once.
The Problem
Without Outbox Pattern
Race condition without the Outbox Pattern:

    +---------------------+         +---------------------+
    |   Order Service     |         |    Message Queue    |
    |                     |         |                     |
    | 1. UPDATE order     |         |                     |
    |    status='paid'    |         |                     |
    |                     |         |                     |
    | 2. PUBLISH event ---+-------->|   OrderPaidEvent    |
    +---------------------+         +---------------------+

Problems:
- If step 2 fails: DB updated but no event
- If step 2 crashes: inconsistent state
- If step 2 times out: unknown whether the event was published
- No atomicity between the DB and the message queue
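The failure window above is easy to reproduce in a toy model. In the sketch below, plain Python lists stand in for the database and the broker, and all names are illustrative — the point is only that step 1 can succeed while step 2 fails:

```python
# Toy model of the naive dual write: the DB commit and the broker
# publish are two separate operations with no shared transaction.
def naive_flow(db, queue, fail_publish=False):
    db.append("order 123: status='paid'")      # step 1: DB write commits
    if fail_publish:
        raise ConnectionError("broker unreachable")
    queue.append("OrderPaidEvent")             # step 2: publish

db, queue = [], []
try:
    naive_flow(db, queue, fail_publish=True)   # broker goes down mid-flow
except ConnectionError:
    pass

# The order is now marked paid, but no event was ever published.
assert db == ["order 123: status='paid'"] and queue == []
```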
With Outbox Pattern
Outbox Pattern solution:

    +-----------------------------------------------------------+
    | Order Service (single database)                           |
    |                                                           |
    |   orders table                                            |
    |     id: 123                                               |
    |     status: 'paid'                                        |
    |     ...                                                   |
    |                                                           |
    |   outbox table (written in the same transaction!)         |
    |     id: 1                                                 |
    |     aggregate_type: 'Order'                               |
    |     aggregate_id: '123'                                   |
    |     event_type: 'OrderPaidEvent'                          |
    |     payload: {...}                                        |
    |     created_at: '2026-02-28T10:00:00Z'                    |
    +-----------------------------------------------------------+
                                |
                                v
    +-----------------------------------------------------------+
    | Outbox Relay (background worker)                          |
    |                                                           |
    |   1. SELECT * FROM outbox WHERE processed = false         |
    |   2. PUBLISH each event to the message broker             |
    |   3. UPDATE outbox SET processed = true                   |
    +-----------------------------------------------------------+
                                |
                                v
        +-----------------+          +------------------+
        |   Inventory     |          |  Notification    |
        |   Service       |          |  Service         |
        +-----------------+          +------------------+

- Atomic: the DB update and the outbox write happen in one transaction
- Reliable: events are guaranteed to be published
- Simple: no distributed transactions needed
Implementation
Database Schema
-- PostgreSQL outbox table
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ,
    retry_count INT DEFAULT 0,
    last_error TEXT,
    processed BOOLEAN DEFAULT FALSE
);

CREATE INDEX idx_outbox_unprocessed ON outbox(created_at)
    WHERE processed = FALSE;
CREATE INDEX idx_outbox_aggregate ON outbox(aggregate_type, aggregate_id);

-- MySQL outbox table (MySQL has no partial indexes,
-- so the created_at index covers all rows)
CREATE TABLE outbox (
    id CHAR(36) PRIMARY KEY,
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSON NOT NULL,
    metadata JSON DEFAULT ('{}'),
    created_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    processed_at TIMESTAMP(3),
    retry_count INT DEFAULT 0,
    last_error TEXT,
    processed BOOLEAN DEFAULT FALSE,
    INDEX idx_outbox_unprocessed (created_at)
) ENGINE=InnoDB;
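What makes this schema work is not the columns themselves but the write discipline: the business row and the outbox row must go into the same local transaction. A minimal sketch of that discipline — using SQLite in place of PostgreSQL purely so it runs self-contained, with the column list trimmed to the essentials:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
conn.execute("""
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregate_type TEXT, aggregate_id TEXT,
        event_type TEXT, payload TEXT,
        processed INTEGER DEFAULT 0
    )
""")

def create_order_with_event(order_id, crash_between_writes=False):
    # `with conn` wraps one transaction: both inserts commit together,
    # or the whole thing rolls back.
    with conn:
        conn.execute(
            "INSERT INTO orders (id, status) VALUES (?, 'pending')",
            (order_id,))
        if crash_between_writes:
            raise RuntimeError("simulated crash between the two writes")
        conn.execute(
            "INSERT INTO outbox (aggregate_type, aggregate_id, "
            "event_type, payload) VALUES ('Order', ?, 'OrderCreatedEvent', ?)",
            (str(order_id), json.dumps({"order_id": order_id})))

create_order_with_event(1)                       # both rows committed
try:
    create_order_with_event(2, crash_between_writes=True)
except RuntimeError:
    pass                                         # neither row committed
```

A crash between the two writes rolls back the order insert as well, so there is never a state change without its event, nor an event without its state change.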
Event Publisher Service
import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import aio_pika
import asyncpg

logger = logging.getLogger(__name__)


@dataclass
class OutboxEvent:
    id: str
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: dict
    metadata: dict
    created_at: datetime
    retry_count: int = 0


class OutboxRepository:
    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    async def create_event(
        self,
        aggregate_type: str,
        aggregate_id: str,
        event_type: str,
        payload: dict,
        metadata: Optional[dict] = None
    ) -> OutboxEvent:
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow("""
                INSERT INTO outbox
                    (aggregate_type, aggregate_id, event_type, payload, metadata)
                VALUES ($1, $2, $3, $4, $5)
                RETURNING *
            """, aggregate_type, aggregate_id, event_type,
                json.dumps(payload), json.dumps(metadata or {}))
            return self._row_to_event(row)

    async def get_unprocessed_events(
        self,
        limit: int = 100
    ) -> list[OutboxEvent]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM outbox
                WHERE processed = FALSE
                  AND retry_count < 5
                ORDER BY created_at ASC
                LIMIT $1
            """, limit)
            return [self._row_to_event(row) for row in rows]

    async def mark_processed(self, event_id: str):
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE outbox
                SET processed = TRUE,
                    processed_at = NOW()
                WHERE id = $1
            """, event_id)

    async def mark_failed(
        self,
        event_id: str,
        error: str,
        retry_count: int
    ):
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE outbox
                SET last_error = $2,
                    retry_count = $3
                WHERE id = $1
            """, event_id, error, retry_count)

    def _row_to_event(self, row) -> OutboxEvent:
        return OutboxEvent(
            id=str(row["id"]),
            aggregate_type=row["aggregate_type"],
            aggregate_id=str(row["aggregate_id"]),
            event_type=row["event_type"],
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
            created_at=row["created_at"],
            retry_count=row["retry_count"]
        )
class MessageBrokerPublisher:
    def __init__(self, rabbitmq_url: str):
        self.rabbitmq_url = rabbitmq_url
        self.connection: Optional[aio_pika.Connection] = None
        self.channel: Optional[aio_pika.Channel] = None

    async def connect(self):
        self.connection = await aio_pika.connect_robust(self.rabbitmq_url)
        self.channel = await self.connection.channel()

    async def publish(
        self,
        exchange: str,
        routing_key: str,
        event: OutboxEvent
    ):
        # Declare (or reuse) a durable topic exchange; publishing to the
        # default exchange would silently ignore the `exchange` argument.
        exchange_obj = await self.channel.declare_exchange(
            exchange, aio_pika.ExchangeType.TOPIC, durable=True
        )
        message = aio_pika.Message(
            body=json.dumps({
                "event_id": event.id,
                "event_type": event.event_type,
                "aggregate_type": event.aggregate_type,
                "aggregate_id": event.aggregate_id,
                "payload": event.payload,
                "metadata": event.metadata,
                "timestamp": event.created_at.isoformat()
            }).encode(),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            headers={
                "event-type": event.event_type,
                "aggregate-id": event.aggregate_id
            }
        )
        await exchange_obj.publish(
            message,
            routing_key=routing_key
        )

    async def close(self):
        if self.connection:
            await self.connection.close()
Outbox Relay Worker
class OutboxRelay:
    def __init__(
        self,
        outbox_repo: OutboxRepository,
        publisher: MessageBrokerPublisher,
        batch_size: int = 100,
        poll_interval: float = 1.0
    ):
        self.outbox = outbox_repo
        self.publisher = publisher
        self.batch_size = batch_size
        self.poll_interval = poll_interval
        self._running = False

    async def start(self):
        self._running = True
        logger.info("Outbox relay started")
        while self._running:
            try:
                await self._process_events()
            except Exception as e:
                logger.error(f"Error processing outbox: {e}")
            await asyncio.sleep(self.poll_interval)

    async def stop(self):
        self._running = False
        logger.info("Outbox relay stopped")

    async def _process_events(self):
        events = await self.outbox.get_unprocessed_events(self.batch_size)
        if not events:
            return
        for event in events:
            try:
                exchange = self._get_exchange(event.aggregate_type)
                routing_key = self._get_routing_key(event.event_type)
                await self.publisher.publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    event=event
                )
                await self.outbox.mark_processed(event.id)
                logger.info(f"Published event {event.id}: {event.event_type}")
            except Exception as e:
                logger.error(f"Failed to publish event {event.id}: {e}")
                # Increment the stored retry count rather than resetting
                # it to 1 on every failure (getattr keeps this safe if the
                # event object carries no count)
                await self.outbox.mark_failed(
                    event.id,
                    str(e),
                    retry_count=getattr(event, "retry_count", 0) + 1
                )

    def _get_exchange(self, aggregate_type: str) -> str:
        exchange_map = {
            "Order": "orders.events",
            "Payment": "payments.events",
            "User": "users.events",
            "Product": "products.events"
        }
        return exchange_map.get(aggregate_type, "default.events")

    def _get_routing_key(self, event_type: str) -> str:
        # "OrderPaidEvent" -> "orderpaid"
        return event_type.lower().replace("event", "")


class OutboxRelayRunner:
    def __init__(self, config: dict):
        self.config = config
        self.db_pool: Optional[asyncpg.Pool] = None
        self.publisher = MessageBrokerPublisher(config["rabbitmq_url"])
        self.relay: Optional[OutboxRelay] = None

    async def run(self):
        # asyncpg.create_pool returns a coroutine and must be awaited,
        # so the pool is created here rather than in __init__
        self.db_pool = await asyncpg.create_pool(self.config["database_url"])
        self.relay = OutboxRelay(
            OutboxRepository(self.db_pool),
            self.publisher
        )
        await self.publisher.connect()
        await self.relay.start()

    async def shutdown(self):
        if self.relay:
            await self.relay.stop()
        await self.publisher.close()
        if self.db_pool:
            await self.db_pool.close()
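Note the failure mode this loop implies: if the relay crashes after publishing but before `mark_processed`, the event is published again on the next poll. That is why the pattern is at-least-once, and why consumers must be idempotent. A toy in-memory model (lists standing in for the outbox table and the broker, names illustrative) makes the duplicate visible:

```python
class ToyRelay:
    """In-memory stand-in for the relay loop."""

    def __init__(self):
        self.outbox = []      # rows: {"id": ..., "processed": bool}
        self.published = []   # what the "broker" received

    def add(self, event_id):
        self.outbox.append({"id": event_id, "processed": False})

    def poll_once(self, crash_before_mark=False):
        for row in [r for r in self.outbox if not r["processed"]]:
            self.published.append(row["id"])   # step 2: publish
            if crash_before_mark:
                return                         # crash before step 3
            row["processed"] = True            # step 3: mark processed

relay = ToyRelay()
relay.add("evt-1")
relay.poll_once(crash_before_mark=True)  # published, but never marked
relay.poll_once()                        # next poll republishes it
assert relay.published == ["evt-1", "evt-1"]  # duplicate delivery
```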
Transactional Outbox in Service
class OrderService:
    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool

    async def create_order(self, order_data: dict) -> Order:
        async with self.db.acquire() as conn:
            async with conn.transaction():
                order = await conn.fetchrow("""
                    INSERT INTO orders (customer_id, items, total_amount, status)
                    VALUES ($1, $2, $3, 'pending')
                    RETURNING *
                """, order_data["customer_id"],
                    json.dumps(order_data["items"]),
                    order_data["total_amount"])
                # The outbox row MUST be written on the same connection and
                # transaction as the order row. Calling a repository method
                # that acquires its own pool connection would silently break
                # the atomicity guarantee the pattern depends on.
                await conn.execute("""
                    INSERT INTO outbox
                        (aggregate_type, aggregate_id, event_type, payload)
                    VALUES ('Order', $1, 'OrderCreatedEvent', $2)
                """, str(order["id"]), json.dumps({
                    "order_id": str(order["id"]),
                    "customer_id": order["customer_id"],
                    "items": order_data["items"],
                    "total_amount": order_data["total_amount"]
                }))
                return Order(**dict(order))

    async def mark_order_paid(self, order_id: str, payment_id: str):
        async with self.db.acquire() as conn:
            async with conn.transaction():
                await conn.execute("""
                    UPDATE orders
                    SET status = 'paid',
                        payment_id = $2,
                        paid_at = NOW()
                    WHERE id = $1
                """, order_id, payment_id)
                # Same rule: outbox insert on the same connection/transaction
                await conn.execute("""
                    INSERT INTO outbox
                        (aggregate_type, aggregate_id, event_type, payload)
                    VALUES ('Order', $1, 'OrderPaidEvent', $2)
                """, order_id, json.dumps({
                    "order_id": order_id,
                    "payment_id": payment_id,
                    "paid_at": datetime.utcnow().isoformat()
                }))
Change Data Capture (CDC) Approach
Debezium Integration
# docker-compose.yml for Debezium
version: '3.8'

services:
  postgres:
    image: postgres:15
    # Debezium's Postgres connector requires logical decoding
    command: postgres -c wal_level=logical
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: orders
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

  debezium:
    image: debezium/connect:2.4
    depends_on:
      - kafka
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: debezium-group
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_status
    ports:
      - "8083:8083"

volumes:
  postgres_data:
Register the connector by POSTing this configuration to Kafka Connect's REST API on port 8083. (Debezium 2.x uses topic.prefix in place of the older database.server.name, and pgoutput is the plugin that works with an unmodified Postgres image.)

{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orders",
    "plugin.name": "pgoutput",
    "topic.prefix": "orders",
    "table.include.list": "public.outbox",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
Building Custom CDC
from aiokafka import AIOKafkaProducer


class PollingCDC:
    """Simple CDC using polling of the outbox table.

    Note: unlike the UUID schema shown earlier, this variant assumes a
    monotonically increasing BIGSERIAL id column, so that `id > $1`
    defines a stable polling cursor.
    """

    def __init__(
        self,
        db_pool: asyncpg.Pool,
        kafka_producer: AIOKafkaProducer,
        last_sequence: int = 0
    ):
        self.db = db_pool
        self.kafka = kafka_producer
        self.last_sequence = last_sequence

    async def poll_and_publish(self):
        async with self.db.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM outbox
                WHERE id > $1
                  AND processed = FALSE
                ORDER BY id ASC
                LIMIT 1000
            """, self.last_sequence)  # the cursor is an integer, not a string
            for row in rows:
                await self._publish_to_kafka(row)
                self.last_sequence = row["id"]
            return len(rows)

    async def _publish_to_kafka(self, row):
        await self.kafka.send(
            topic=f"{row['aggregate_type'].lower()}.events",
            key=row["aggregate_id"].encode(),
            value=json.dumps({
                "event_type": row["event_type"],
                "payload": json.loads(row["payload"]),
                "metadata": json.loads(row["metadata"]),
                "timestamp": row["created_at"].isoformat()
            }).encode()
        )
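The cursor logic — remember the highest id seen and resume from there — can be checked without a database. A toy in-memory model (tuples standing in for outbox rows, names illustrative):

```python
class ToyPollingCDC:
    """In-memory model of cursor-based polling: remember the highest
    sequence published and fetch only rows beyond it."""

    def __init__(self):
        self.rows = []           # (sequence, event_type) pairs
        self.last_sequence = 0   # polling cursor
        self.out = []            # what reached the "broker"

    def append(self, sequence, event_type):
        self.rows.append((sequence, event_type))

    def poll(self):
        batch = sorted(r for r in self.rows if r[0] > self.last_sequence)
        for sequence, event_type in batch:
            self.out.append(event_type)     # publish
            self.last_sequence = sequence   # advance the cursor
        return len(batch)

cdc = ToyPollingCDC()
cdc.append(1, "OrderCreatedEvent")
cdc.append(2, "OrderPaidEvent")
cdc.poll()                 # publishes both, cursor moves to 2
cdc.append(3, "OrderShippedEvent")
cdc.poll()                 # publishes only the new row
```

Because the cursor only ever advances, already-published rows are never re-fetched, and ordering within the table is preserved.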
Handling Failures
Idempotent Processing
from redis.asyncio import Redis


class IdempotentEventHandler:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def process_event(self, event: OutboxEvent) -> bool:
        processed_key = f"event:processed:{event.id}"
        already_processed = await self.redis.exists(processed_key)
        if already_processed:
            logger.info(f"Event {event.id} already processed, skipping")
            return True
        try:
            await self._handle_event(event)
            await self.redis.setex(
                processed_key,
                86400 * 7,  # keep the marker for 7 days
                "1"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to handle event {event.id}: {e}")
            raise

    async def _handle_event(self, event: OutboxEvent):
        raise NotImplementedError  # subclasses implement the actual handling
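Redis details aside, the core contract is "at most one side effect per event id". A stripped-down in-memory version of the same check (a set standing in for the Redis keys, names illustrative):

```python
class ToyIdempotentHandler:
    """At most one side effect per event id; duplicates are skipped."""

    def __init__(self):
        self.seen = set()
        self.side_effects = 0

    def process(self, event_id):
        if event_id in self.seen:
            return False           # duplicate delivery: skip
        self.side_effects += 1     # the actual handling would go here
        self.seen.add(event_id)    # mark processed only after handling
        return True

handler = ToyIdempotentHandler()
handler.process("evt-1")   # handled
handler.process("evt-1")   # duplicate from an at-least-once relay: skipped
assert handler.side_effects == 1
```

Marking the event only after handling succeeds means a crash mid-handling causes a retry rather than a lost event — the same at-least-once trade-off as the relay itself.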
Retry with Dead Letter Queue
class OutboxWithDLQ:
    def __init__(
        self,
        outbox_repo: OutboxRepository,
        kafka_producer,          # e.g. an aiokafka.AIOKafkaProducer
        dlq_topic: str
    ):
        self.outbox = outbox_repo
        self.kafka = kafka_producer  # was referenced below but never assigned
        self.dlq_topic = dlq_topic
        self.max_retries = 5

    async def process_event(self, event: OutboxEvent) -> bool:
        try:
            await self._publish(event)
            await self.outbox.mark_processed(event.id)
            return True
        except Exception as e:
            new_retry_count = event.metadata.get("retry_count", 0) + 1
            if new_retry_count >= self.max_retries:
                await self._send_to_dlq(event, str(e))
                await self.outbox.mark_processed(event.id)
                logger.error(
                    f"Event {event.id} sent to DLQ after {new_retry_count} retries"
                )
            else:
                await self.outbox.mark_failed(
                    event.id,
                    str(e),
                    new_retry_count
                )
            return False

    async def _publish(self, event: OutboxEvent):
        raise NotImplementedError  # publish to the primary topic/exchange

    async def _send_to_dlq(self, event: OutboxEvent, error: str):
        dlq_message = {
            "original_event": {
                "id": event.id,
                "type": event.event_type,
                "aggregate_id": event.aggregate_id,
                "payload": event.payload
            },
            "error": error,
            "failed_at": datetime.utcnow().isoformat()
        }
        await self.kafka.send(
            topic=self.dlq_topic,
            key=event.aggregate_id.encode(),
            value=json.dumps(dlq_message).encode()
        )
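The retry-or-DLQ decision itself is simple enough to isolate and test on its own. A small sketch of the routing rule used above (max_retries=5 mirrors the class; the function name is illustrative):

```python
def route_failure(retry_count, max_retries=5):
    """Decide what happens to an event whose publish just failed:
    retry it, or move it to the DLQ once retries are exhausted.
    Returns (action, new_retry_count)."""
    next_count = retry_count + 1
    if next_count >= max_retries:
        return ("dlq", next_count)
    return ("retry", next_count)

route_failure(0)   # first failure: retry
route_failure(4)   # fifth failure: give up, send to the DLQ
```

Keeping this rule pure makes the boundary condition (exactly when the fifth failure tips into the DLQ) trivial to unit-test, independent of any broker.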
Best Practices
GOOD_PATTERNS = {
    "use_jsonb_postgres": """
    # Use JSONB for payload flexibility
    ✓ Good:
        payload JSONB NOT NULL
        # Can query by payload fields
        # Fast serialization
        # Schema evolution support
    ✗ Bad:
        payload TEXT NOT NULL
        # Must parse on every read
        # No query capability
    """,
    "keep_payload_small": """
    # Don't store entire objects
    ✓ Good:
        payload: {"order_id": "123", "status": "paid"}
    ✗ Bad:
        payload: {"order": {...entire order object...}}
        # Large payload = slow replication
        # Data duplication
    """,
    "index_wisely": """
    # Index for polling efficiency
    ✓ Good:
        CREATE INDEX idx_outbox_unprocessed ON outbox(created_at)
            WHERE processed = FALSE;
        # Fast lookup of the next events to process
    ✗ Bad:
        # No index: full table scan on every poll
    """
}

BAD_PATTERNS = {
    "publish_directly": """
    ✗ Bad:
        async def create_order():
            await db.execute("INSERT INTO orders...")
            await message_queue.publish(event)  # Not atomic!
        # If publish fails, data is inconsistent
    ✓ Good:
        async def create_order():
            async with transaction():
                await db.execute("INSERT INTO orders...")
                await db.execute("INSERT INTO outbox...")
        # Both succeed or both fail
    """,
    "no_ordering": """
    ✗ Bad:
        # Process events in arbitrary order
        SELECT * FROM outbox WHERE processed = FALSE
        # Can produce wrong state if events have dependencies
    ✓ Good:
        ... ORDER BY created_at ASC
        # Preserve event ordering for consistency
    """,
    "infinite_retries": """
    ✗ Bad:
        # Never give up on failed events
        # Can block processing of other events
    ✓ Good:
        ... WHERE retry_count < 5
        # After max retries, move the event to a DLQ
        # Manual intervention for problematic events
    """
}
Summary
The Outbox Pattern provides reliable event publishing:
- Atomicity: database changes and event creation happen in a single transaction
- Reliability: events are guaranteed to be published (at-least-once delivery)
- Simplicity: no distributed transactions or complex coordination needed
- Scalability: the outbox relay can scale horizontally
Two main approaches:
- Polling: simple, works with any database, suitable for low-to-medium volume
- CDC (Debezium): more complex but more scalable; real-time change streaming
The pattern is essential for building reliable event-driven microservices without sacrificing data consistency.