
Change Data Capture (CDC) Complete Guide

Introduction

Change Data Capture (CDC) is a pattern that identifies and captures changes made to data in a database and delivers those changes in real time to downstream systems. Instead of polling or batch processing, CDC enables event-driven architectures where every data change triggers immediate processing.

This comprehensive guide covers CDC fundamentals, implementation approaches, popular tools like Debezium, and practical patterns for building real-time data pipelines. You’ll learn how to detect changes, propagate them reliably, and handle common challenges like schema evolution and exactly-once delivery.

CDC Fundamentals

What is Change Data Capture?

CDC solves a fundamental problem: how to detect and propagate database changes in real time. Traditional approaches like batch exports or table polling have limitations: they're inefficient, create latency, and can miss changes. CDC captures changes as they happen, enabling:

  • Real-time Analytics: Fresh data for dashboards and ML models
  • Data Replication: Sync databases across environments
  • Event Sourcing: Build event-driven architectures
  • Microservices Communication: Share data changes between services
  • Audit Trails: Complete history of data modifications

CDC works by reading database transaction logs rather than querying tables directly. This approach is efficient, captures all changes (including deletes), and has minimal impact on source database performance.

# CDC Architecture Overview
"""
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Source DB     │     │  CDC Connector  │     │  Message Queue  │
│  (PostgreSQL,   │────▶│  (Debezium,     │────▶│  (Kafka,        │
│   MySQL, etc.)  │     │   Oracle CDC)   │     │   Pulsar)       │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                                        │
                                                        ▼
                                                ┌──────────────────┐
                                                │   Downstream     │
                                                │  (Data Lake,     │
                                                │   Analytics,     │
                                                │   Microservices) │
                                                └──────────────────┘
"""

CDC Implementation Approaches

There are three main approaches to implementing CDC:

1. Log-Based CDC: Reads the database's transaction log (WAL for PostgreSQL, binlog for MySQL). This is the most comprehensive approach: it captures all changes with minimal overhead.

2. Trigger-Based CDC: Uses database triggers to capture changes in shadow tables. Simpler to implement but adds database overhead.

3. Timestamp-Based CDC: Polls tables for rows with recent update timestamps. Simple but can miss changes and creates database load.

# Compare CDC approaches
CDC_APPROACHES = {
    "log_based": {
        "description": "Read transaction logs directly",
        "pros": [
            "Captures all changes (inserts, updates, deletes)",
            "Minimal source database impact",
            "No missed changes",
            "Low latency"
        ],
        "cons": [
            "Requires log access (may need config changes)",
            "More complex to implement",
            "Schema changes can be challenging"
        ],
        "databases": ["PostgreSQL", "MySQL", "Oracle", "SQL Server"]
    },
    "trigger_based": {
        "description": "Use database triggers to capture changes",
        "pros": [
            "Works with any database",
            "Simpler implementation",
            "No special database config needed"
        ],
        "cons": [
            "Adds overhead to every write",
            "Can impact transaction performance",
            "Trigger management complexity"
        ],
        "databases": ["All relational databases"]
    },
    "timestamp_based": {
        "description": "Poll for recent changes using timestamps",
        "pros": [
            "Very simple to implement",
            "No database changes required",
            "Works with any database"
        ],
        "cons": [
            "Can miss updates that don't change timestamps",
            "Polling overhead",
            "Higher latency",
            "Can't capture deletes reliably"
        ],
        "databases": ["All databases with timestamp columns"]
    }
}
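
To make the timestamp-based trade-offs concrete, here is a minimal polling sketch against an in-memory SQLite table. The `orders` schema, `updated_at` column, and checkpoint value are illustrative, not a real system:

```python
import sqlite3

def poll_changes(conn, last_seen: str, table: str = "orders"):
    """Return rows modified since last_seen (timestamp-based CDC).

    Note the inherent gaps: deletes are invisible, and rows whose
    updated_at was never touched by a write are silently missed.
    """
    cur = conn.execute(
        f"SELECT id, status, updated_at FROM {table} "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_seen,),
    )
    return cur.fetchall()

# Demo with an in-memory database (schema is illustrative)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, updated_at TEXT)")
conn.execute("INSERT INTO orders VALUES (1, 'pending', '2026-03-07T10:00:00Z')")
conn.execute("INSERT INTO orders VALUES (2, 'shipped', '2026-03-07T11:00:00Z')")

# Only order 2 was modified after the checkpoint
changes = poll_changes(conn, last_seen="2026-03-07T10:30:00Z")
```

The checkpoint (`last_seen`) would normally be persisted between polling runs; losing it forces a full re-read.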

Debezium Implementation

Setting Up Debezium

Debezium is an open-source CDC platform built on Kafka Connect. It provides connectors for various databases and handles the complexity of reading transaction logs.

# docker-compose.yml for Debezium with PostgreSQL
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: debezium/postgres:15
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: inventory
    command: >
      postgres
      -c wal_level=logical
      -c max_wal_senders=10
      -c max_replication_slots=10

  debezium:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: debezium-group
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_status
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1

// Debezium PostgreSQL connector configuration
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "topic.prefix": "inventory",
    
    // Logical decoding plugin
    "plugin.name": "pgoutput",
    
    // Publication and slot
    "publication.name": "dbz_publication",
    "slot.name": "dbz_slot",
    
    // Snapshot mode
    "snapshot.mode": "initial",
    
    // Table filters
    "table.include.list": "public.orders,public.customers,public.products",
    
    // Precision handling
    "decimal.handling.mode": "double",
    "time.precision.mode": "adaptive",
    
    // Heartbeat
    "heartbeat.interval.ms": "10000",
    "heartbeat.topics.prefix": "heartbeat"
  }
}
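
With the stack running, the connector JSON above is registered by POSTing it to the Kafka Connect REST API (exposed on port 8083 in the compose file). A minimal sketch using only the standard library; the helper names are ours, and the config shown is abbreviated:

```python
import json
import urllib.request

def build_connector_payload(name: str, config: dict) -> bytes:
    """Serialize a connector definition in the shape Kafka Connect expects."""
    return json.dumps({"name": name, "config": config}).encode("utf-8")

def register_connector(connect_url: str, name: str, config: dict) -> dict:
    """POST the connector definition to Kafka Connect's /connectors endpoint."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=build_connector_payload(name, config),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:  # needs a live Connect cluster
        return json.loads(resp.read())

# Build the payload locally (no cluster required for this step)
payload = build_connector_payload(
    "postgres-connector",
    {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"},
)
```

Equivalently, `curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors` does the same thing.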

Debezium with MySQL

// Debezium MySQL connector configuration
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz123",
    "database.server.id": "184054",
    "topic.prefix": "mysql-server",
    
    // Binlog filtering
    "database.include.list": "inventory",
    "table.include.list": "inventory.orders,inventory.customers",
    
    // Snapshot configuration
    "snapshot.mode": "when_needed",
    
    // Schema history (required for the MySQL connector)
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "schema-changes.mysql-server",
    
    // Time handling
    "time.precision.mode": "adaptive"
  }
}

CDC Event Processing

Event Structure

Debezium produces structured events with before/after states:

// Debezium CDC event example
{
  "before": null,
  "after": {
    "id": 1001,
    "order_number": "ORD-2026-001",
    "customer_id": 42,
    "total_amount": 299.99,
    "status": "pending",
    "created_at": "2026-03-07T10:30:00Z",
    "updated_at": "2026-03-07T10:30:00Z"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "inventory",
    "ts_ms": 1772879400000,
    "snapshot": "false",
    "db": "inventory",
    "table": "orders",
    "lsn": 12345678,
    "xmin": null
  },
  "op": "c",  // c=create, u=update, d=delete, r=read (snapshot)
  "ts_ms": 1772879400123
}
# Process CDC events
from typing import Dict, Any, Callable
import json

class CDCEventProcessor:
    """Process CDC events from Debezium."""
    
    def __init__(self):
        self.handlers: Dict[str, Callable] = {}
    
    def register_handler(self, operation: str, handler: Callable):
        """Register a handler for specific operations."""
        self.handlers[operation] = handler
    
    def process_event(self, event: Dict[str, Any]):
        """Process a CDC event."""
        operation = event.get('op')
        
        # Get before and after states
        before = event.get('before')
        after = event.get('after')
        source = event.get('source', {})
        
        # Route to appropriate handler
        handler = self.handlers.get(operation)
        if handler:
            return handler(before, after, source)
        
        return None

# Example: Process orders events
def handle_order_create(before, after, source):
    """Handle new order creation."""
    print(f"New order created: {after['order_number']}")
    
    # Send notification
    notify_customer.delay(
        customer_id=after['customer_id'],
        message=f"Order {after['order_number']} confirmed"
    )
    
    # Update inventory
    update_inventory.delay(order_id=after['id'])
    
    return {"status": "processed", "action": "created"}

def handle_order_update(before, after, source):
    """Handle order status updates."""
    old_status = before['status'] if before else None
    new_status = after['status']
    
    if old_status != new_status:
        print(f"Order {after['order_number']} status: {old_status} -> {new_status}")
        
        # Handle specific status transitions
        if new_status == 'shipped':
            send_shipping_notification(after)
        elif new_status == 'delivered':
            request_feedback(after)
        elif new_status == 'cancelled':
            process_refund(after)
    
    return {"status": "processed", "action": "updated"}

def handle_order_delete(before, after, source):
    """Handle order deletion."""
    if before:
        print(f"Order cancelled: {before['order_number']}")
        # Log cancellation for audit
        audit_log.log(
            action='order_deleted',
            order_id=before['id'],
            original_data=before
        )
    
    return {"status": "processed", "action": "deleted"}

# Register handlers
processor = CDCEventProcessor()
processor.register_handler('c', handle_order_create)
processor.register_handler('u', handle_order_update)
processor.register_handler('d', handle_order_delete)
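
Wiring the processor to Kafka is then a thin consumer loop. One detail the deserialization step must handle: after a delete, Debezium emits a tombstone (a message with a null value) so Kafka log compaction can eventually drop the key. A sketch of that step; the consumer library itself (e.g. confluent-kafka) is omitted:

```python
import json
from typing import Optional

def parse_cdc_message(raw_value: Optional[bytes]) -> Optional[dict]:
    """Decode one Kafka message value into a CDC event dict.

    Returns None for tombstones (null payloads) so the caller can skip them.
    """
    if raw_value is None:   # tombstone: marks a key for log compaction
        return None
    event = json.loads(raw_value)
    if event is None:       # some converters encode tombstones as JSON null
        return None
    return event

# A consumer loop would look roughly like (library-specific pseudocode):
#   for msg in consumer:
#       event = parse_cdc_message(msg.value())
#       if event is not None:
#           processor.process_event(event)

sample = parse_cdc_message(b'{"op": "c", "after": {"id": 1}}')
```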

Schema Evolution

Handling schema changes is one of CDC’s biggest challenges:

# Handle schema evolution
class SchemaEvolutionHandler:
    """Handle schema changes in CDC streams."""
    
    def __init__(self):
        self.schema_versions: Dict[str, int] = {}
        self.transforms: Dict[str, Callable] = {}
    
    def handle_schema_change(self, event: Dict) -> Dict:
        """Process schema change event."""
        source = event.get('source', {})
        table = source.get('table')
        
        # Schema version for this event (assumes a custom numeric version
        # field; Debezium's own source.version is the connector version string)
        version = source.get('version', 1)
        
        if table not in self.schema_versions:
            self.schema_versions[table] = version
            return event
        
        old_version = self.schema_versions[table]
        
        if version > old_version:
            print(f"Schema evolution detected for {table}: v{old_version} -> v{version}")
            self.schema_versions[table] = version
            
            # Apply transformation to migrate data
            return self.transform_event(event, old_version, version)
        
        return event
    
    def transform_event(self, event: Dict, from_version: int, to_version: int) -> Dict:
        """Transform event between schema versions."""
        # Apply sequential migrations
        for v in range(from_version + 1, to_version + 1):
            transform = self.transforms.get(f"{event['source']['table']}_v{v}")
            if transform:
                event = transform(event)
        
        return event

# Example: Add default for new column
def transform_v2(event):
    """Migration from v1 to v2: add default priority."""
    if event.get('after'):
        if 'priority' not in event['after']:
            event['after']['priority'] = 'normal'
    return event

def transform_v3(event):
    """Migration from v2 to v3: split name into first/last."""
    if event.get('after') and 'customer_name' in event['after']:
        name = event['after']['customer_name']
        parts = name.split(' ', 1)
        event['after']['first_name'] = parts[0]
        event['after']['last_name'] = parts[1] if len(parts) > 1 else ''
        del event['after']['customer_name']
    return event

CDC Patterns and Best Practices

Exactly-Once Processing

Ensuring exactly-once processing requires idempotent consumers:

# Idempotent event processing
from datetime import datetime

class IdempotentProcessor:
    """Process CDC events exactly-once."""
    
    def __init__(self, checkpoint_store):
        self.checkpoint_store = checkpoint_store
    
    def generate_event_id(self, event: Dict) -> str:
        """Generate unique ID for deduplication."""
        source = event.get('source', {})
        # Use LSN/position as unique identifier
        return f"{source.get('lsn')}:{source.get('ts_ms')}"
    
    def is_duplicate(self, event_id: str) -> bool:
        """Check if event was already processed."""
        return self.checkpoint_store.exists(event_id)
    
    def mark_processed(self, event_id: str):
        """Mark event as processed."""
        self.checkpoint_store.set(event_id, datetime.utcnow().isoformat())
    
    def process(self, event: Dict) -> Dict:
        """Process event exactly-once."""
        event_id = self.generate_event_id(event)
        
        # Check for duplicate
        if self.is_duplicate(event_id):
            return {"status": "skipped", "reason": "duplicate"}
        
        # Process event
        result = self.do_process(event)
        
        # Mark as processed only after successful processing
        if result.get('status') == 'success':
            self.mark_processed(event_id)
        
        return result
    
    def do_process(self, event: Dict) -> Dict:
        """Actual event processing logic (override in subclasses)."""
        # Must return a dict with a 'status' key; 'success' marks the
        # event as processed in the checkpoint store.
        return {"status": "success"}

# Using Redis for checkpointing
class RedisCheckpointStore:
    def __init__(self, redis_client, ttl_days=7):
        self.redis = redis_client
        self.ttl = ttl_days * 86400
    
    def exists(self, event_id: str) -> bool:
        return self.redis.exists(f"cdc:checkpoint:{event_id}")
    
    def set(self, event_id: str, timestamp: str):
        self.redis.setex(f"cdc:checkpoint:{event_id}", self.ttl, timestamp)
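
For local testing (or anywhere Redis is unavailable), the same two-method interface can be backed by a plain dict. A sketch showing the deduplication behavior end to end; the event ID follows the `lsn:ts_ms` format used by generate_event_id above:

```python
from datetime import datetime, timezone

class InMemoryCheckpointStore:
    """Dict-backed stand-in for RedisCheckpointStore (no TTL, test use only)."""

    def __init__(self):
        self._seen = {}

    def exists(self, event_id: str) -> bool:
        return event_id in self._seen

    def set(self, event_id: str, timestamp: str):
        self._seen[event_id] = timestamp

store = InMemoryCheckpointStore()
event_id = "12345678:1709799000000"   # lsn:ts_ms

results = []
for _ in range(2):                    # deliver the same event twice
    if store.exists(event_id):
        results.append("skipped")
    else:
        # ... real processing would happen here ...
        store.set(event_id, datetime.now(timezone.utc).isoformat())
        results.append("processed")

print(results)  # ['processed', 'skipped']
```

The second delivery is recognized and dropped, which is exactly the property that makes at-least-once delivery safe downstream.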

Filtering and Routing

Route events to different topics based on content:

# Event routing based on content
class EventRouter:
    """Route CDC events to different destinations."""
    
    def __init__(self):
        self.routes = []
    
    def add_route(self, condition: Callable, destination: str):
        """Add a routing rule."""
        self.routes.append((condition, destination))
    
    def route(self, event: Dict) -> list:
        """Determine destinations for an event."""
        destinations = []
        
        for condition, destination in self.routes:
            if condition(event):
                destinations.append(destination)
        
        # Default route if no rules match
        if not destinations:
            destinations.append('default')
        
        return destinations

# Define routing rules
router = EventRouter()

# Route high-value orders to priority topic
# ('after' is None for delete events, so fall back to an empty dict)
router.add_route(
    lambda e: (e.get('after') or {}).get('total_amount', 0) > 1000,
    'orders-high-value'
)

# Route cancelled orders to a dedicated topic
router.add_route(
    lambda e: (e.get('after') or {}).get('status') == 'cancelled',
    'orders-cancelled'
)

# Route events by table
router.add_route(
    lambda e: e.get('source', {}).get('table') == 'products',
    'products-changes'
)

# Example usage
def handle_event(event):
    destinations = router.route(event)
    
    for dest in destinations:
        publish_to_topic(dest, event)

Performance Optimization

# Optimize CDC processing performance
class OptimizedProcessor:
    """High-performance CDC event processor."""
    
    def __init__(self, batch_size=100, flush_interval=5):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = datetime.utcnow()
    
    async def process_async(self, events: list):
        """Batch process events for efficiency."""
        # Group events by table for batch operations
        by_table = {}
        for event in events:
            table = event.get('source', {}).get('table', 'unknown')
            if table not in by_table:
                by_table[table] = []
            by_table[table].append(event)
        
        # Process each table's events as a batch
        results = []
        for table, table_events in by_table.items():
            batch_result = await self.process_table_batch(table, table_events)
            results.extend(batch_result)
        
        return results
    
    async def process_table_batch(self, table: str, events: list):
        """Process batch of events for a single table."""
        # Extract IDs for bulk operations ('after'/'before' may be None)
        ids = [(e.get('after') or {}).get('id') or (e.get('before') or {}).get('id')
               for e in events]
        
        # Bulk fetch current state for comparison
        current_states = await self.bulk_fetch_current_states(table, ids)
        
        # Process with current state context
        results = []
        for event in events:
            result = self.process_with_context(event, current_states)
            results.append(result)
        
        return results
    
    def should_flush(self) -> bool:
        """Check if buffer should be flushed."""
        now = datetime.utcnow()
        return (
            len(self.buffer) >= self.batch_size or
            (now - self.last_flush).total_seconds() >= self.flush_interval
        )
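
The grouping step inside process_async can be factored out and exercised in isolation; a standalone sketch (the sample events carry only the fields the helper reads):

```python
from collections import defaultdict

def group_by_table(events: list) -> dict:
    """Group CDC events by source table for batched downstream writes."""
    by_table = defaultdict(list)
    for event in events:
        table = event.get('source', {}).get('table', 'unknown')
        by_table[table].append(event)
    return dict(by_table)

grouped = group_by_table([
    {"source": {"table": "orders"}, "op": "c"},
    {"source": {"table": "customers"}, "op": "u"},
    {"source": {"table": "orders"}, "op": "u"},
])
# grouped has two 'orders' events and one 'customers' event
```

Batching by table turns N single-row writes into one bulk statement per table, which is where most of the throughput gain comes from.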

CDC with Kafka

Kafka Connect Configuration

# connect-distributed.properties
bootstrap.servers=kafka:29092

# Internal topics
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

# Replication factors (1 for dev, 3+ for prod)
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# Serializers
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Transformations
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite

# Performance
connector.client.config.override.policy=All
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000

Sink Connectors

// Kafka to Elasticsearch sink connector
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "inventory.orders",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "orders",
    "key.ignore": "false",
    "schema.ignore": "true",
    
    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.[^.]+",
    "transforms.route.replacement": "$1",
    
    "write.method": "upsert",
    "connection.timeout.ms": "10000",
    "batch.size": "100",
    "max.batch.size": "500"
  }
}
// Kafka to JDBC sink connector
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "inventory.orders,inventory.customers",
    "connection.url": "jdbc:postgresql://target-db:5432/warehouse",
    "connection.user": "etl",
    "connection.password": "etl123",
    
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    
    "auto.create": "true",
    "auto.evolve": "true",
    
    "fields.whitelist": "id,order_number,customer_id,total_amount,status,created_at,updated_at",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

Monitoring and Troubleshooting

CDC Monitoring

# Monitor CDC pipeline health
from typing import Dict
import requests

class CDCMonitor:
    """Monitor Debezium CDC connectors."""
    
    def __init__(self, connect_url: str):
        self.connect_url = connect_url
    
    def get_connector_status(self, connector: str) -> Dict:
        """Get connector status."""
        response = requests.get(f"{self.connect_url}/connectors/{connector}/status")
        return response.json()
    
    def check_lag(self, connector: str) -> Dict:
        """Check consumer lag."""
        status = self.get_connector_status(connector)
        
        connector_state = status.get('connector', {})
        tasks = status.get('tasks', [])
        
        # Get latest offset from source
        # Compare with committed offset in Kafka
        lag_info = {
            'connector_state': connector_state.get('state'),
            'task_states': [t.get('state') for t in tasks],
            'lag_seconds': self._calculate_lag(tasks)
        }
        
        return lag_info
    
    def _calculate_lag(self, tasks: list) -> int:
        """Calculate lag in seconds."""
        # Implementation depends on your monitoring setup
        # Compare source LSN with consumer offset
        return 0
    
    def check_for_errors(self, connector: str) -> list:
        """Check for connector errors."""
        status = self.get_connector_status(connector)
        
        errors = []
        
        # Check connector state
        if status.get('connector', {}).get('state') == 'FAILED':
            errors.append({
                'type': 'connector',
                'error': status['connector'].get('trace', 'Unknown error')
            })
        
        # Check task states
        for i, task in enumerate(status.get('tasks', [])):
            if task.get('state') == 'FAILED':
                errors.append({
                    'type': 'task',
                    'task_id': i,
                    'error': task.get('trace', 'Unknown error')
                })
        
        return errors
    
    def alert_on_issues(self, connector: str):
        """Check and alert on issues."""
        errors = self.check_for_errors(connector)
        
        if errors:
            for error in errors:
                print(f"ALERT: {error['type']} - {error.get('error', 'Unknown')}")
            return False
        
        lag = self.check_lag(connector)
        if lag.get('lag_seconds', 0) > 300:  # 5 minutes
            print(f"ALERT: High lag detected: {lag['lag_seconds']} seconds")
            return False
        
        return True
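
The error-extraction logic can be exercised without a live Connect cluster by feeding it a synthetic status payload; the shape below mirrors the Kafka Connect REST API's `GET /connectors/{name}/status` response, with an illustrative stack trace:

```python
def extract_failures(status: dict) -> list:
    """Pull FAILED connector/task states out of a Connect status payload."""
    errors = []
    if status.get('connector', {}).get('state') == 'FAILED':
        errors.append({'type': 'connector',
                       'error': status['connector'].get('trace', 'Unknown error')})
    for i, task in enumerate(status.get('tasks', [])):
        if task.get('state') == 'FAILED':
            errors.append({'type': 'task', 'task_id': i,
                           'error': task.get('trace', 'Unknown error')})
    return errors

# Synthetic payload mimicking GET /connectors/postgres-connector/status
status = {
    "connector": {"state": "RUNNING"},
    "tasks": [
        {"id": 0, "state": "RUNNING"},
        {"id": 1, "state": "FAILED", "trace": "org.postgresql.util.PSQLException: ..."},
    ],
}
failures = extract_failures(status)  # one failed task reported
```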

Conclusion

CDC enables powerful real-time data integration patterns. Key takeaways:

  • Choose log-based CDC for production systems: it captures all changes with minimal overhead
  • Debezium provides robust, battle-tested CDC connectors for major databases
  • Handle schema evolution carefully: plan for changes from the start
  • Ensure idempotency in consumers for exactly-once processing
  • Monitor the pipeline to catch issues before they become problems

CDC is foundational for modern data architectures, enabling real-time analytics, event-driven microservices, and data replication at scale.
