Introduction
Change Data Capture (CDC) is a pattern that identifies and captures changes made to data in a database and delivers those changes in real-time to downstream systems. Instead of polling or batch processing, CDC enables event-driven architectures where every data change triggers immediate processing.
This comprehensive guide covers CDC fundamentals, implementation approaches, popular tools like Debezium, and practical patterns for building real-time data pipelines. You’ll learn how to detect changes, propagate them reliably, and handle common challenges like schema evolution and exactly-once delivery.
CDC Fundamentals
What is Change Data Capture?
CDC solves a fundamental problem: how to detect and propagate database changes in real-time. Traditional approaches like batch exports or table polling have limitations: they're inefficient, create latency, and can miss changes. CDC captures changes as they happen, enabling:
- Real-time Analytics: Fresh data for dashboards and ML models
- Data Replication: Sync databases across environments
- Event Sourcing: Build event-driven architectures
- Microservices Communication: Share data changes between services
- Audit Trails: Complete history of data modifications
CDC works by reading database transaction logs rather than querying tables directly. This approach is efficient, captures all changes (including deletes), and places minimal load on the source database.
# CDC Architecture Overview
"""
┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│    Source DB     │     │  CDC Connector   │     │  Message Queue   │
│  (PostgreSQL,    │────▶│  (Debezium,      │────▶│  (Kafka,         │
│   MySQL, etc.)   │     │   Oracle CDC)    │     │   Pulsar)        │
└──────────────────┘     └──────────────────┘     └──────────────────┘
                                                          │
                                                          ▼
                                                 ┌──────────────────┐
                                                 │    Downstream    │
                                                 │   (Data Lake,    │
                                                 │    Analytics,    │
                                                 │    Microservices)│
                                                 └──────────────────┘
"""
CDC Implementation Approaches
There are three main approaches to implementing CDC:
1. Log-Based CDC: Reads the database’s transaction log (WAL for PostgreSQL, binlog for MySQL). This is the most comprehensive approach: it captures all changes with minimal overhead.
2. Trigger-Based CDC: Uses database triggers to capture changes in shadow tables. Simpler to implement but adds database overhead.
3. Timestamp-Based CDC: Polls tables for rows with recent update timestamps. Simple but can miss changes and creates database load.
# Compare CDC approaches
CDC_APPROACHES = {
"log_based": {
"description": "Read transaction logs directly",
"pros": [
"Captures all changes (inserts, updates, deletes)",
"Minimal source database impact",
"No missed changes",
"Low latency"
],
"cons": [
"Requires log access (may need config changes)",
"More complex to implement",
"Schema changes can be challenging"
],
"databases": ["PostgreSQL", "MySQL", "Oracle", "SQL Server"]
},
"trigger_based": {
"description": "Use database triggers to capture changes",
"pros": [
"Works with any database",
"Simpler implementation",
"No special database config needed"
],
"cons": [
"Adds overhead to every write",
"Can impact transaction performance",
"Trigger management complexity"
],
"databases": ["All relational databases"]
},
"timestamp_based": {
"description": "Poll for recent changes using timestamps",
"pros": [
"Very simple to implement",
"No database changes required",
"Works with any database"
],
"cons": [
"Can miss updates that don't change timestamps",
"Polling overhead",
"Higher latency",
"Can't capture deletes reliably"
],
"databases": ["All databases with timestamp columns"]
}
}
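To make the trade-offs concrete, here is a minimal sketch of the timestamp-based approach. It uses an in-memory SQLite database purely for illustration; the `orders` table and column names are hypothetical. Note how it inherits the weaknesses listed above: a row deleted between polls simply vanishes, and a write that doesn't bump `updated_at` is missed.

```python
import sqlite3

def fetch_changes_since(conn, last_seen_ts):
    """Poll for rows modified after the last checkpoint (timestamp-based CDC)."""
    cur = conn.execute(
        "SELECT id, status, updated_at FROM orders "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_seen_ts,),
    )
    return cur.fetchall()

# Demo with an in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "pending", "2026-03-07T10:00:00Z"),
     (2, "shipped", "2026-03-07T11:00:00Z")],
)

# Only order 2 changed after the checkpoint; a delete would be invisible here
changes = fetch_changes_since(conn, "2026-03-07T10:30:00Z")
```

ISO-8601 timestamps compare correctly as strings, which keeps the polling query portable across databases.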
Debezium Implementation
Setting Up Debezium
Debezium is an open-source CDC platform built on Kafka Connect. It provides connectors for various databases and handles the complexity of reading transaction logs.
# docker-compose.yml for Debezium with PostgreSQL
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
postgres:
image: debezium/postgres:15
ports:
- "5432:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: inventory
command: >
postgres
-c wal_level=logical
-c max_wal_senders=10
-c max_replication_slots=10
debezium:
image: debezium/connect:2.4
depends_on:
- kafka
- postgres
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: debezium-group
CONFIG_STORAGE_TOPIC: debezium_configs
OFFSET_STORAGE_TOPIC: debezium_offsets
STATUS_STORAGE_TOPIC: debezium_status
CONFIG_STORAGE_REPLICATION_FACTOR: 1
OFFSET_STORAGE_REPLICATION_FACTOR: 1
STATUS_STORAGE_REPLICATION_FACTOR: 1
// Debezium PostgreSQL connector configuration
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"database.server.name": "inventory",
// Schema evolution
"schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
// Publication and slot
"publication.name": "dbz_publication",
"slot.name": "dbz_slot",
// Snapshot mode
"snapshot.mode": "initial",
// Table filters
"table.include.list": "public.orders,public.customers,public.products",
// Precision handling
"decimal.handling.mode": "double",
"time.precision.mode": "adaptive",
// Heartbeat
"heartbeat.interval.ms": "10000",
"heartbeat.topics.prefix": "heartbeat"
}
}
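Connector configurations like the one above are registered by POSTing them to the Kafka Connect REST API. A short sketch, assuming the Connect worker from the compose file is reachable at `localhost:8083` (the config dict is abbreviated here; use the full configuration shown above):

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Connect address from the compose file

def build_registration_request(name: str, config: dict) -> urllib.request.Request:
    """Build the POST request that registers a connector with Kafka Connect."""
    payload = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_registration_request(
    "postgres-connector",
    {"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
     "database.hostname": "postgres"},  # abbreviated for the example
)
# urllib.request.urlopen(req)  # uncomment with the stack running
```

A successful registration returns HTTP 201; re-registering an existing name returns 409, so updates go to `PUT /connectors/<name>/config` instead.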
Debezium with MySQL
// Debezium MySQL connector configuration
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz123",
"database.server.id": "184054",
"database.server.name": "mysql-server",
// Binlog configuration
"database.include.list": "inventory",
"table.include.list": "inventory.orders,inventory.customers",
// Snapshot configuration
"snapshot.mode": "when_needed",
// Schema history
"schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
"schema.history.internal.kafka.topic": "schema-changes.mysql-server",
// Time handling
"time.precision.mode": "adaptive"
}
}
CDC Event Processing
Event Structure
Debezium produces structured events with before/after states:
// Debezium CDC event example
{
"before": null,
"after": {
"id": 1001,
"order_number": "ORD-2026-001",
"customer_id": 42,
"total_amount": 299.99,
"status": "pending",
"created_at": "2026-03-07T10:30:00Z",
"updated_at": "2026-03-07T10:30:00Z"
},
"source": {
"version": "2.4.0.Final",
"connector": "postgresql",
"name": "inventory",
"ts_ms": 1709799000000,
"snapshot": "false",
"db": "inventory",
"table": "orders",
"lsn": 12345678,
"xmin": null
},
"op": "c", // c=create, u=update, d=delete, r=read (snapshot)
"ts_ms": 1709799000123
}
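Given this envelope, the operation code determines where the row data lives: creates, updates, and snapshot reads carry it in `after`, while deletes carry the final row in `before`. A small helper (a sketch, not part of Debezium's API) makes that explicit:

```python
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "snapshot_read"}

def effective_state(event: dict):
    """Return (operation name, row state) for a Debezium change event."""
    op = event.get("op")
    name = OP_NAMES.get(op, "unknown")
    # Deletes carry the row in 'before'; everything else in 'after'
    state = event.get("before") if op == "d" else event.get("after")
    return name, state

sample = {"op": "c", "before": None, "after": {"id": 1001, "status": "pending"}}
# effective_state(sample) -> ("create", {"id": 1001, "status": "pending"})
```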
# Process CDC events
from typing import Dict, Any, Callable
import json
class CDCEventProcessor:
"""Process CDC events from Debezium."""
def __init__(self):
self.handlers: Dict[str, Callable] = {}
def register_handler(self, operation: str, handler: Callable):
"""Register a handler for specific operations."""
self.handlers[operation] = handler
def process_event(self, event: Dict[str, Any]):
"""Process a CDC event."""
operation = event.get('op')
# Get before and after states
before = event.get('before')
after = event.get('after')
source = event.get('source', {})
# Route to appropriate handler
handler = self.handlers.get(operation)
if handler:
return handler(before, after, source)
return None
# Example: Process orders events
def handle_order_create(before, after, source):
"""Handle new order creation."""
print(f"New order created: {after['order_number']}")
# Send notification
notify_customer.delay(
customer_id=after['customer_id'],
message=f"Order {after['order_number']} confirmed"
)
# Update inventory
update_inventory.delay(order_id=after['id'])
return {"status": "processed", "action": "created"}
def handle_order_update(before, after, source):
"""Handle order status updates."""
old_status = before['status'] if before else None
new_status = after['status']
if old_status != new_status:
print(f"Order {after['order_number']} status: {old_status} -> {new_status}")
# Handle specific status transitions
if new_status == 'shipped':
send_shipping_notification(after)
elif new_status == 'delivered':
request_feedback(after)
elif new_status == 'cancelled':
process_refund(after)
return {"status": "processed", "action": "updated"}
def handle_order_delete(before, after, source):
"""Handle order deletion."""
if before:
print(f"Order cancelled: {before['order_number']}")
# Log cancellation for audit
audit_log.log(
action='order_deleted',
order_id=before['id'],
original_data=before
)
return {"status": "processed", "action": "deleted"}
# Register handlers
processor = CDCEventProcessor()
processor.register_handler('c', handle_order_create)
processor.register_handler('u', handle_order_update)
processor.register_handler('d', handle_order_delete)
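Wiring the processor to Kafka is then a consumption loop. The sketch below assumes the `kafka-python` client and the Debezium 2.x topic naming convention (`<topic.prefix>.<schema>.<table>`, e.g. `inventory.public.orders`); note that Debezium emits a `None`-valued tombstone after each delete, which must be skipped before JSON decoding:

```python
import json

def decode_message(raw):
    """Decode a Debezium message value; None is a tombstone (emitted after deletes)."""
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))

def run_consumer(processor):
    """Consume change events and feed them to a CDCEventProcessor.

    Requires kafka-python and a running broker; a sketch, not production code.
    """
    from kafka import KafkaConsumer  # assumed dependency
    consumer = KafkaConsumer(
        "inventory.public.orders",          # topic.prefix.schema.table
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )
    for msg in consumer:
        event = decode_message(msg.value)
        if event is not None:               # skip tombstones
            processor.process_event(event)
        consumer.commit()                   # commit only after processing succeeds
```

Committing offsets manually after processing gives at-least-once delivery, which pairs with the idempotent-consumer pattern shown later.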
Schema Evolution
Handling schema changes is one of CDC’s biggest challenges:
# Handle schema evolution
class SchemaEvolutionHandler:
"""Handle schema changes in CDC streams."""
def __init__(self):
self.schema_versions: Dict[str, int] = {}
self.transforms: Dict[str, Callable] = {}
def handle_schema_change(self, event: Dict) -> Dict:
"""Process schema change event."""
source = event.get('source', {})
table = source.get('table')
# Get schema version from event
version = source.get('version', 1)
if table not in self.schema_versions:
self.schema_versions[table] = version
return event
old_version = self.schema_versions[table]
if version > old_version:
print(f"Schema evolution detected for {table}: v{old_version} -> v{version}")
self.schema_versions[table] = version
# Apply transformation to migrate data
return self.transform_event(event, old_version, version)
return event
def transform_event(self, event: Dict, from_version: int, to_version: int) -> Dict:
"""Transform event between schema versions."""
# Apply sequential migrations
for v in range(from_version + 1, to_version + 1):
transform = self.transforms.get(f"{event['source']['table']}_v{v}")
if transform:
event = transform(event)
return event
# Example: Add default for new column
def transform_v2(event):
"""Migration from v1 to v2: add default priority."""
if event.get('after'):
if 'priority' not in event['after']:
event['after']['priority'] = 'normal'
return event
def transform_v3(event):
"""Migration from v2 to v3: split name into first/last."""
if event.get('after') and 'customer_name' in event['after']:
name = event['after']['customer_name']
parts = name.split(' ', 1)
event['after']['first_name'] = parts[0]
event['after']['last_name'] = parts[1] if len(parts) > 1 else ''
del event['after']['customer_name']
return event
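The sequential-migration idea can be exercised standalone. The snippet below is a self-contained illustration of the same chain (new-column default, then name split); the event contents and version numbers are hypothetical:

```python
def add_priority_default(event):
    """v1 -> v2: new 'priority' column gets a default."""
    if event.get("after") and "priority" not in event["after"]:
        event["after"]["priority"] = "normal"
    return event

def split_customer_name(event):
    """v2 -> v3: 'customer_name' becomes first/last."""
    after = event.get("after")
    if after and "customer_name" in after:
        first, _, last = after.pop("customer_name").partition(" ")
        after["first_name"], after["last_name"] = first, last
    return event

# Migration at index i upgrades events from version i+1 to version i+2
MIGRATIONS = [add_priority_default, split_customer_name]

def migrate(event, from_version, to_version):
    """Apply migrations in order until the event reaches to_version."""
    for step in MIGRATIONS[from_version - 1:to_version - 1]:
        event = step(event)
    return event

event = {"after": {"id": 7, "customer_name": "Ada Lovelace"}}
migrated = migrate(event, 1, 3)
# migrated["after"] now has priority, first_name, and last_name fields
```

Because each step only upgrades one version, old events replayed from the beginning of the topic pass through every migration and arrive in the current shape.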
CDC Patterns and Best Practices
Exactly-Once Processing
Ensuring exactly-once processing requires idempotent consumers:
# Idempotent event processing
import hashlib
import json
from datetime import datetime
class IdempotentProcessor:
"""Process CDC events exactly-once."""
def __init__(self, checkpoint_store):
self.checkpoint_store = checkpoint_store
def generate_event_id(self, event: Dict) -> str:
"""Generate unique ID for deduplication."""
source = event.get('source', {})
# Use LSN/position as unique identifier
return f"{source.get('lsn')}:{source.get('ts_ms')}"
def is_duplicate(self, event_id: str) -> bool:
"""Check if event was already processed."""
return self.checkpoint_store.exists(event_id)
def mark_processed(self, event_id: str):
"""Mark event as processed."""
self.checkpoint_store.set(event_id, datetime.utcnow().isoformat())
def process(self, event: Dict) -> Dict:
"""Process event exactly-once."""
event_id = self.generate_event_id(event)
# Check for duplicate
if self.is_duplicate(event_id):
return {"status": "skipped", "reason": "duplicate"}
# Process event
result = self.do_process(event)
# Mark as processed only after successful processing
if result.get('status') == 'success':
self.mark_processed(event_id)
return result
def do_process(self, event: Dict) -> Dict:
"""Actual event processing logic."""
# Implement your processing logic here
pass
# Using Redis for checkpointing
class RedisCheckpointStore:
def __init__(self, redis_client, ttl_days=7):
self.redis = redis_client
self.ttl = ttl_days * 86400
def exists(self, event_id: str) -> bool:
return self.redis.exists(f"cdc:checkpoint:{event_id}")
def set(self, event_id: str, timestamp: str):
self.redis.setex(f"cdc:checkpoint:{event_id}", self.ttl, timestamp)
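The deduplication flow can be tried without Redis by swapping in any store with the same `exists`/`set` interface. A minimal sketch with an in-memory stand-in (for local testing only, since it loses state on restart):

```python
from datetime import datetime, timezone

class InMemoryCheckpointStore:
    """Drop-in stand-in for RedisCheckpointStore, for local testing."""
    def __init__(self):
        self._seen = {}

    def exists(self, event_id):
        return event_id in self._seen

    def set(self, event_id, timestamp):
        self._seen[event_id] = timestamp

def process_once(event, store):
    """Skip events whose (lsn, ts_ms) position was already processed."""
    source = event.get("source", {})
    event_id = f"{source.get('lsn')}:{source.get('ts_ms')}"
    if store.exists(event_id):
        return {"status": "skipped", "reason": "duplicate"}
    # Real processing would happen here, before the checkpoint is written
    store.set(event_id, datetime.now(timezone.utc).isoformat())
    return {"status": "success"}

store = InMemoryCheckpointStore()
event = {"source": {"lsn": 12345678, "ts_ms": 1772879400000}}
first = process_once(event, store)    # processed
second = process_once(event, store)   # duplicate, skipped
```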
Filtering and Routing
Route events to different topics based on content:
# Event routing based on content
class EventRouter:
"""Route CDC events to different destinations."""
def __init__(self):
self.routes = []
def add_route(self, condition: Callable, destination: str):
"""Add a routing rule."""
self.routes.append((condition, destination))
def route(self, event: Dict) -> list:
"""Determine destinations for an event."""
destinations = []
for condition, destination in self.routes:
if condition(event):
destinations.append(destination)
# Default route if no rules match
if not destinations:
destinations.append('default')
return destinations
# Define routing rules
router = EventRouter()
# Route high-value orders to priority topic
router.add_route(
lambda e: e.get('after', {}).get('total_amount', 0) > 1000,
'orders-high-value'
)
# Route orders with issues to dead letter queue
router.add_route(
lambda e: e.get('after', {}).get('status') == 'cancelled',
'orders-cancelled'
)
# Route events by table
router.add_route(
lambda e: e.get('source', {}).get('table') == 'products',
'products-changes'
)
# Example usage
def handle_event(event):
destinations = router.route(event)
for dest in destinations:
publish_to_topic(dest, event)
Performance Optimization
# Optimize CDC processing performance
class OptimizedProcessor:
"""High-performance CDC event processor."""
def __init__(self, batch_size=100, flush_interval=5):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.buffer = []
self.last_flush = datetime.utcnow()
async def process_async(self, events: list):
"""Batch process events for efficiency."""
# Group events by table for batch operations
by_table = {}
for event in events:
table = event.get('source', {}).get('table', 'unknown')
if table not in by_table:
by_table[table] = []
by_table[table].append(event)
# Process each table's events as a batch
results = []
for table, table_events in by_table.items():
batch_result = await self.process_table_batch(table, table_events)
results.extend(batch_result)
return results
async def process_table_batch(self, table: str, events: list):
"""Process batch of events for a single table."""
# Extract IDs for bulk operations
ids = [e.get('after', {}).get('id') or e.get('before', {}).get('id')
for e in events]
# Bulk fetch current state for comparison
current_states = await self.bulk_fetch_current_states(table, ids)
# Process with current state context
results = []
for event in events:
result = self.process_with_context(event, current_states)
results.append(result)
return results
def should_flush(self) -> bool:
"""Check if buffer should be flushed."""
now = datetime.utcnow()
return (
len(self.buffer) >= self.batch_size or
(now - self.last_flush).total_seconds() >= self.flush_interval
)
CDC with Kafka
Kafka Connect Configuration
# connect-distributed.properties
bootstrap.servers=kafka:29092
# Internal topics
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Replication factors (1 for dev, 3+ for prod)
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
# Serializers
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Transformations
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
# Performance
connector.client.config.override.policy=All
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
Sink Connectors
// Kafka to Elasticsearch sink connector
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "inventory.orders",
"connection.url": "http://elasticsearch:9200",
"type.name": "orders",
"key.ignore": "false",
"schema.ignore": "true",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.[^.]+",
"transforms.route.replacement": "$1",
"write.method": "upsert",
"connection.timeout.ms": "10000",
"batch.size": "100",
"max.batch.size": "500"
}
}
// Kafka to JDBC sink connector
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "inventory.orders,inventory.customers",
"connection.url": "jdbc:postgresql://target-db:5432/warehouse",
"connection.user": "etl",
"connection.password": "etl123",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"auto.create": "true",
"auto.evolve": "true",
"fields.whitelist": "id,order_number,customer_id,total_amount,status,created_at,updated_at",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
Monitoring and Troubleshooting
CDC Monitoring
# Monitor CDC pipeline health
import requests
from typing import Dict
class CDCMonitor:
"""Monitor Debezium CDC connectors."""
def __init__(self, connect_url: str):
self.connect_url = connect_url
def get_connector_status(self, connector: str) -> Dict:
"""Get connector status."""
response = requests.get(f"{self.connect_url}/connectors/{connector}/status")
return response.json()
def check_lag(self, connector: str) -> Dict:
"""Check consumer lag."""
status = self.get_connector_status(connector)
connector_state = status.get('connector', {})
tasks = status.get('tasks', [])
# Get latest offset from source
# Compare with committed offset in Kafka
lag_info = {
'connector_state': connector_state.get('state'),
'task_states': [t.get('state') for t in tasks],
'lag_seconds': self._calculate_lag(tasks)
}
return lag_info
def _calculate_lag(self, tasks: list) -> int:
"""Calculate lag in seconds."""
# Implementation depends on your monitoring setup
# Compare source LSN with consumer offset
return 0
def check_for_errors(self, connector: str) -> list:
"""Check for connector errors."""
status = self.get_connector_status(connector)
errors = []
# Check connector state
if status.get('connector', {}).get('state') == 'FAILED':
errors.append({
'type': 'connector',
'error': status['connector'].get('trace', 'Unknown error')
})
# Check task states
for i, task in enumerate(status.get('tasks', [])):
if task.get('state') == 'FAILED':
errors.append({
'type': 'task',
'task_id': i,
'error': task.get('trace', 'Unknown error')
})
return errors
def alert_on_issues(self, connector: str):
"""Check and alert on issues."""
errors = self.check_for_errors(connector)
if errors:
for error in errors:
print(f"ALERT: {error['type']} - {error.get('error', 'Unknown')}")
return False
lag = self.check_lag(connector)
if lag.get('lag_seconds', 0) > 300: # 5 minutes
print(f"ALERT: High lag detected: {lag['lag_seconds']} seconds")
return False
return True
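The status payload returned by Connect's `GET /connectors/<name>/status` endpoint can be parsed offline, which makes the error-extraction logic easy to unit test. The payload below is a representative example of the response shape, not live output:

```python
def failed_tasks(status: dict) -> list:
    """Collect FAILED connector/task entries from a Connect status payload."""
    errors = []
    if status.get("connector", {}).get("state") == "FAILED":
        errors.append(("connector", status["connector"].get("trace", "")))
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            errors.append((f"task-{task.get('id')}", task.get("trace", "")))
    return errors

# Representative /status response shape (illustrative values)
sample_status = {
    "name": "postgres-connector",
    "connector": {"state": "RUNNING", "worker_id": "debezium:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "trace": "org.postgresql.util.PSQLException"},
        {"id": 1, "state": "RUNNING"},
    ],
}
# failed_tasks(sample_status) finds the one failed task
```

Feeding canned payloads like this through the same code path the monitor uses in production is a cheap way to verify alerting before an incident happens.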
Conclusion
CDC enables powerful real-time data integration patterns. Key takeaways:
- Choose log-based CDC for production systems: it captures all changes with minimal overhead
- Debezium provides robust, battle-tested CDC connectors for major databases
- Handle schema evolution carefully: plan for changes from the start
- Ensure idempotency in consumers for exactly-once processing
- Monitor the pipeline to catch issues before they become problems
CDC is foundational for modern data architectures, enabling real-time analytics, event-driven microservices, and data replication at scale.
Resources
- Debezium Documentation
- Kafka Connect Documentation
- CDC Best Practices - IBM
- Change Data Capture Patterns