
Real-Time Event Processing with Apache Kafka

Introduction

Apache Kafka has evolved from a simple message queue into the backbone of modern event-driven architectures. It powers real-time data pipelines at many major enterprises, handling billions of events per day. Understanding Kafka, from its architecture to its design patterns and operational best practices, is essential for any software engineer building event-driven systems.

This comprehensive guide explores Apache Kafka from fundamentals to advanced patterns. We cover core concepts, producer and consumer design, stream processing with Kafka Streams, schema management, security, operations, and real-world use cases. By the end, you’ll have the knowledge to design and implement robust, scalable event streaming systems.

Kafka Architecture Fundamentals

Kafka’s architecture is deceptively simple yet incredibly powerful. At its core, Kafka is a distributed, partitioned, replicated commit log that provides durability, ordering, and throughput guarantees difficult to match with traditional messaging systems.

Core Concepts

┌──────────────────────────────────────────────────────────────┐
│                      Kafka Architecture                      │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────────────────────────────────────┐    │
│  │                    Kafka Cluster                     │    │
│  │                                                      │    │
│  │   ┌─────────┐   ┌─────────┐   ┌─────────┐            │    │
│  │   │ Broker 1│   │ Broker 2│   │ Broker 3│  ...       │    │
│  │   │         │   │         │   │         │            │    │
│  │   │Partition│   │Partition│   │Partition│            │    │
│  │   │  0,1,2  │   │  1,2,0  │   │  2,0,1  │            │    │
│  │   └─────────┘   └─────────┘   └─────────┘            │    │
│  │                                                      │    │
│  └──────────────────────────────────────────────────────┘    │
│                              ▲                               │
│                              │                               │
│  ┌───────────────────────────┴──────────────────────────┐    │
│  │                   ZooKeeper / KRaft                  │    │
│  │             (Cluster Metadata Management)            │    │
│  └──────────────────────────────────────────────────────┘    │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Key Components:

  • Broker: A Kafka server that stores topics and serves client requests
  • Topic: A named stream of events, partitioned for scalability
  • Partition: Ordered, immutable sequence of events within a topic
  • Producer: Client that publishes events to topics
  • Consumer: Client that subscribes to topics and processes events
  • Consumer Group: Set of consumers that cooperatively consume a topic
  • Replica: Copies of partitions for fault tolerance
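The relationships above are easiest to internalize with a toy model. The sketch below is plain Python with invented names (ToyTopic is not a Kafka API): each partition is an append-only list, keyed events are routed by a stable hash, and an offset is just a position within one partition's log.

```python
import zlib

class ToyTopic:
    """Illustrative model of a partitioned topic: one append-only log per partition."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]

    def produce(self, key: str, value: dict) -> tuple:
        # A keyed event always lands on the same partition, preserving per-key order
        partition = zlib.crc32(key.encode()) % len(self.partitions)
        self.partitions[partition].append(value)
        offset = len(self.partitions[partition]) - 1
        return partition, offset

    def consume(self, partition: int, offset: int) -> dict:
        # Consumers address events by (partition, offset); the log itself never changes
        return self.partitions[partition][offset]

topic = ToyTopic(num_partitions=3)
p1, o1 = topic.produce('user_42', {'event': 'login'})
p2, o2 = topic.produce('user_42', {'event': 'logout'})
assert p1 == p2          # same key -> same partition
assert o2 == o1 + 1      # offsets grow monotonically within a partition
```

Consumer groups then become simple to picture: each group keeps its own map of committed offsets per partition, independent of every other group.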

Topic and Partition Design

from kafka import KafkaAdminClient
from kafka.admin import NewTopic, ConfigResource, ConfigResourceType

class KafkaTopicManager:
    def __init__(self, bootstrap_servers: list):
        self.admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    
    def create_topic(
        self,
        name: str,
        num_partitions: int = 3,
        replication_factor: int = 3,
        configs: dict = None
    ) -> bool:
        topic = NewTopic(
            name=name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
            topic_configs=configs or {}
        )
        
        try:
            self.admin.create_topics([topic])
            return True
        except Exception as e:
            print(f"Error creating topic: {e}")
            return False
    
    def create_topic_with_retention(
        self,
        name: str,
        retention_bytes: int = -1,
        retention_ms: int = 604800000
    ) -> bool:
        configs = {
            'retention.bytes': str(retention_bytes),
            'retention.ms': str(retention_ms)
        }
        return self.create_topic(name, configs=configs)
    
    def create_compacted_topic(self, name: str) -> bool:
        configs = {
            'cleanup.policy': 'compact',
            'min.cleanable.dirty.ratio': '0.5',
            'min.compaction.lag.ms': '0'
        }
        return self.create_topic(name, configs=configs)
    
    def list_topics(self) -> list:
        return self.admin.list_topics()
    
    def describe_topic(self, name: str) -> dict:
        cluster_metadata = self.admin.describe_configs(
            config_resources=[
                ConfigResource(ConfigResourceType.TOPIC, name)
            ]
        )
        
        configs = {}
        for resource in cluster_metadata.values():
            for config_name, config_value in resource.items():
                configs[config_name] = config_value.value
        
        return configs
    
    def delete_topic(self, name: str) -> bool:
        try:
            self.admin.delete_topics([name])
            return True
        except Exception as e:
            print(f"Error deleting topic: {e}")
            return False
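When picking num_partitions for a new topic, a common rule of thumb is to divide the target topic throughput by the measured per-partition throughput on both the produce and the consume side, then take the larger count. A minimal sketch of that arithmetic (the helper name and the example figures are illustrative, not from any Kafka API):

```python
import math

def suggest_partition_count(
    target_mb_per_sec: float,
    producer_mb_per_sec_per_partition: float,
    consumer_mb_per_sec_per_partition: float,
) -> int:
    """Partitions needed so neither producers nor consumers become the bottleneck."""
    by_producer = math.ceil(target_mb_per_sec / producer_mb_per_sec_per_partition)
    by_consumer = math.ceil(target_mb_per_sec / consumer_mb_per_sec_per_partition)
    return max(by_producer, by_consumer, 1)

# e.g. 100 MB/s target, 10 MB/s per partition producing, 5 MB/s consuming
print(suggest_partition_count(100, 10, 5))  # 20
```

Err on the side of a few extra partitions: adding partitions later reshuffles keyed data, while idle partitions cost comparatively little.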

Replication and Fault Tolerance

class KafkaReplicationManager:
    def __init__(self, admin_client: KafkaAdminClient):
        self.admin = admin_client
    
    def get_replica_assignment(self, topic: str) -> dict:
        # describe_topics returns a list of topic metadata dicts, each with a
        # 'partitions' list of {'partition', 'leader', 'replicas', 'isr', ...}
        topic_metadata = self.admin.describe_topics([topic])[0]
        
        partitions = {}
        for pmeta in topic_metadata['partitions']:
            partitions[pmeta['partition']] = {
                'leader': pmeta['leader'],
                'isr': pmeta['isr'],
                'replicas': pmeta['replicas']
            }
        
        return partitions
    
    def check_replication_health(self, topic: str) -> dict:
        assignment = self.get_replica_assignment(topic)
        
        under_replicated = 0
        offline_replicas = 0
        
        for partition, info in assignment.items():
            if len(info['isr']) < len(info['replicas']):
                under_replicated += 1
            if info['leader'] == -1:
                offline_replicas += 1
        
        return {
            'topic': topic,
            'total_partitions': len(assignment),
            'under_replicated': under_replicated,
            'offline_replicas': offline_replicas,
            'healthy': under_replicated == 0 and offline_replicas == 0
        }
    
    def increase_replication_factor(self, topic: str, new_replication_factor: int):
        current_assignment = self.get_replica_assignment(topic)
        
        new_assignment = {}
        for partition, info in current_assignment.items():
            replicas = list(info['replicas'])
            # Walk broker ids upward until enough distinct replicas exist;
            # the original (max(replicas) + 1) % 10 loops forever once it
            # wraps onto a broker already in the list
            candidate = max(replicas) + 1
            while len(replicas) < new_replication_factor:
                if candidate % 10 not in replicas:
                    replicas.append(candidate % 10)
                candidate += 1
            new_assignment[partition] = replicas
        
        return self.admin.alter_partition_reassignments(
            topic,
            {p: {'replicas': r} for p, r in new_assignment.items()}
        )

Producer Implementation

Kafka producers handle event serialization, partitioning, batching, and reliability. Getting the producer right is crucial for system stability and performance.
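Batching is the heart of producer throughput tuning: a batch is sent when it fills up (batch.size) or when its oldest record has waited long enough (linger.ms), whichever comes first. The toy model below (plain Python, invented names, no broker involved) makes that trade-off concrete:

```python
class ToyBatcher:
    """Models the batch.size / linger.ms trade-off: flush on size or on time."""

    def __init__(self, batch_size: int, linger_ms: int):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.buffer = []
        self.first_append_ms = None
        self.flushed_batches = []

    def append(self, record, now_ms: int):
        if self.first_append_ms is None:
            self.first_append_ms = now_ms
        self.buffer.append(record)
        # Flush when the batch is full...
        if len(self.buffer) >= self.batch_size:
            self._flush()
        # ...or when the oldest buffered record has lingered long enough
        elif now_ms - self.first_append_ms >= self.linger_ms:
            self._flush()

    def _flush(self):
        self.flushed_batches.append(self.buffer)
        self.buffer = []
        self.first_append_ms = None

b = ToyBatcher(batch_size=3, linger_ms=10)
b.append('a', now_ms=0)
b.append('b', now_ms=5)
b.append('c', now_ms=6)   # batch full -> sent as one request
assert b.flushed_batches == [['a', 'b', 'c']]
```

Larger batches and longer linger times amortize request overhead and compress better, at the cost of per-record latency; the real producer settings below tune exactly this dial.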

Basic Producer

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time

class ReliableKafkaProducer:
    def __init__(
        self,
        bootstrap_servers: list,
        acks: str = 'all',
        retries: int = 3,
        max_in_flight_requests_per_connection: int = 5,
        enable_idempotence: bool = True
    ):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            acks=acks,
            retries=retries,
            max_in_flight_requests_per_connection=(
                # idempotence caps in-flight requests at 5
                min(max_in_flight_requests_per_connection, 5)
                if enable_idempotence
                else max_in_flight_requests_per_connection
            ),
            enable_idempotence=enable_idempotence,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            compression_type='lz4',
            batch_size=16384,
            linger_ms=10,
            buffer_memory=33554432,
            max_block_ms=60000,
            request_timeout_ms=30000
        )
    
    def send(
        self,
        topic: str,
        value: dict,
        key: str = None,
        partition: int = None,
        timestamp_ms: int = None
    ) -> str:
        future = self.producer.send(
            topic=topic,
            value=value,
            key=key,
            partition=partition,
            timestamp_ms=timestamp_ms
        )
        
        try:
            record_metadata = future.get(timeout=10)
            return f"{record_metadata.topic}-{record_metadata.partition}-{record_metadata.offset}"
        except KafkaError as e:
            raise e
    
    def send_async(self, topic: str, value: dict, key: str = None):
        def on_success(record_metadata):
            print(f"Success: {record_metadata.topic}-{record_metadata.partition}")
        
        def on_error(exc):
            print(f"Error: {exc}")
        
        future = self.producer.send(topic, value=value, key=key)
        future.add_callback(on_success)
        future.add_errback(on_error)
    
    def flush(self):
        self.producer.flush()
    
    def close(self):
        self.producer.close()

Custom Partitioner

import random
import zlib

# kafka-python does not expose a Partitioner base class; KafkaProducer accepts
# any callable via its `partitioner` config, invoked as
# partitioner(key_bytes, all_partitions, available_partitions), where the
# partition arguments are lists of partition ids.
class CustomPartitioner:
    def __call__(self, key: bytes, all_partitions: list, available: list) -> int:
        if not key:
            # Unkeyed messages: spread across whatever partitions are available
            return random.choice(available or all_partitions)
        
        key_str = key.decode('utf-8')
        
        if key_str.startswith('user_'):
            user_id = int(key_str.split('_')[1])
            return all_partitions[user_id % len(all_partitions)]
        
        if key_str.startswith('order_'):
            order_id = int(key_str.split('_')[1])
            return all_partitions[order_id % len(all_partitions)]
        
        # crc32 is stable across processes, unlike Python's salted hash()
        return all_partitions[zlib.crc32(key) % len(all_partitions)]

class KeyBasedPartitioner:
    @staticmethod
    def consistent_partition(key: bytes, num_partitions: int) -> int:
        if not key:
            raise ValueError("Key is required for consistent partitioning")
        
        # crc32 gives the same answer in every process; the built-in hash()
        # is randomized per interpreter run and would scatter keys
        return zlib.crc32(key) % num_partitions
    
    @staticmethod
    def composite_key_partition(key: bytes, num_partitions: int) -> int:
        key_str = key.decode('utf-8')
        
        if ':' in key_str:
            # Route on the primary component of a "primary:secondary" key
            primary_key = key_str.split(':')[0]
            return zlib.crc32(primary_key.encode('utf-8')) % num_partitions
        
        return zlib.crc32(key) % num_partitions

Batch Producer with Retries

import threading
from collections import deque
from typing import Callable

class BatchKafkaProducer:
    def __init__(
        self,
        bootstrap_servers: list,
        batch_size: int = 100,
        flush_interval_seconds: int = 5,
        max_retries: int = 3
    ):
        self.bootstrap_servers = bootstrap_servers
        self.batch_size = batch_size
        self.flush_interval_seconds = flush_interval_seconds
        self.max_retries = max_retries
        
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            batch_size=batch_size * 1024,
            linger_ms=1000,
            compression_type='zstd'
        )
        
        self.pending = deque()
        self.lock = threading.Lock()
        
        self._start_flush_timer()
    
    def _start_flush_timer(self):
        def flush_periodically():
            while True:
                time.sleep(self.flush_interval_seconds)
                self.flush()
        
        thread = threading.Thread(target=flush_periodically, daemon=True)
        thread.start()
    
    def send(self, topic: str, value: dict, key: str = None):
        with self.lock:
            future = self.producer.send(topic, value=value, key=key)
            self.pending.append({
                'future': future,
                'topic': topic,
                'value': value,
                'key': key,
                'retry_count': 0
            })
    
    def flush(self):
        with self.lock:
            while self.pending:
                item = self.pending[0]
                try:
                    item['future'].get(timeout=1)
                    self.pending.popleft()
                except Exception as e:
                    if item['retry_count'] < self.max_retries:
                        item['retry_count'] += 1
                        item['future'] = self.producer.send(
                            item['topic'],
                            value=item['value'],
                            key=item.get('key')
                        )
                    else:
                        print(f"Failed after {self.max_retries} retries: {e}")
                        self.pending.popleft()
    
    def close(self):
        self.flush()
        self.producer.close()

Consumer Implementation

Kafka consumers must handle partitioning, offset management, rebalancing, and error handling. Robust consumer design is essential for reliable data processing.
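Partition assignment within a group is worth understanding before writing consumer code. The default range assignor hands each consumer a contiguous block of partitions, with the earlier consumers (in sorted order) absorbing any remainder. A simplified sketch of that logic (illustrative, not the client's actual implementation):

```python
def range_assign(partitions: list, consumers: list) -> dict:
    """Contiguous blocks per consumer; earlier consumers absorb the remainder."""
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment

print(range_assign([0, 1, 2, 3, 4], ['c1', 'c2']))
# {'c1': [0, 1, 2], 'c2': [3, 4]}
```

One consequence falls straight out of the arithmetic: with more consumers than partitions, the surplus consumers sit idle, so partition count caps a group's useful parallelism.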

Basic Consumer

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import json

class ReliableKafkaConsumer:
    def __init__(
        self,
        bootstrap_servers: list,
        group_id: str,
        topics: list,
        auto_offset_reset: str = 'earliest',
        enable_auto_commit: bool = False,
        max_poll_records: int = 500,
        max_poll_interval_ms: int = 300000
    ):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=enable_auto_commit,
            max_poll_records=max_poll_records,
            max_poll_interval_ms=max_poll_interval_ms,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            heartbeat_interval_ms=3000,
            session_timeout_ms=10000,
            request_timeout_ms=30000
        )
        
        self.consumer.subscribe(topics)
    
    def consume(self, timeout_ms: int = 1000, max_records: int = None):
        return self.consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    
    def process_messages(self, handler: Callable):
        try:
            while True:
                records = self.consume()
                
                for topic_partition, messages in records.items():
                    for message in messages:
                        try:
                            handler(message.value, message)
                        except Exception as e:
                            print(f"Error processing message: {e}")
                
                if not self.consumer.assignment():
                    continue
                
                self.consumer.commit()
        
        except KeyboardInterrupt:
            self.close()
    
    def commit(self):
        self.consumer.commit()
    
    def close(self):
        self.consumer.close()

Consumer with Error Handling

import logging
from datetime import datetime

class RetryableError(Exception):
    """Transient failure; the message may succeed on retry."""
    pass

class NonRetryableError(Exception):
    """Permanent failure; retrying will not help."""
    pass

class ResilientConsumer:
    def __init__(
        self,
        bootstrap_servers: list,
        group_id: str,
        topics: list,
        dlq_topic: str = None
    ):
        # KafkaConsumer takes topics as positional arguments, not a kwarg
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            enable_auto_commit=False
        )
        
        self.dlq_producer = None
        if dlq_topic:
            self.dlq_producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode('utf-8')
            )
            self.dlq_topic = dlq_topic
        
        self.logger = logging.getLogger(__name__)
        self.max_retries = 3
    
    def process_with_retry(self, handler: Callable, message: dict) -> bool:
        for attempt in range(self.max_retries):
            try:
                handler(message)
                return True
            except RetryableError as e:
                if attempt < self.max_retries - 1:
                    self.logger.warning(f"Retryable error: {e}, attempt {attempt + 1}")
                    time.sleep(2 ** attempt)
                else:
                    raise
            except NonRetryableError as e:
                self.logger.error(f"Non-retryable error: {e}")
                raise
        
        return False
    
    def send_to_dlq(self, message: dict, error: Exception):
        if not self.dlq_producer:
            return
        
        dlq_message = {
            'original_message': message,
            'error': str(error),
            'error_type': type(error).__name__,
            'timestamp': datetime.utcnow().isoformat(),
            'retry_count': 0
        }
        
        self.dlq_producer.send(self.dlq_topic, value=dlq_message)
        self.dlq_producer.flush()
    
    def consume(self, handler: Callable):
        try:
            for message in self.consumer:
                try:
                    self.process_with_retry(handler, message.value)
                    self.consumer.commit()
                except Exception as e:
                    self.logger.error(f"Error processing message: {e}")
                    self.send_to_dlq(message.value, e)
                    self.consumer.commit()
        
        finally:
            self.consumer.close()
            if self.dlq_producer:
                self.dlq_producer.close()

Consumer Groups and Rebalancing

from kafka import ConsumerRebalanceListener

class StatefulConsumer:
    def __init__(self, bootstrap_servers: list, group_id: str, topics: list):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            enable_auto_commit=False
        )
        
        self.topics = topics
        self.state = {}
        self.partition_offsets = {}
        
        self._setup_rebalance_listener()
        self._load_state()
    
    def _setup_rebalance_listener(self):
        outer = self
        
        # kafka-python delivers rebalance callbacks through a
        # ConsumerRebalanceListener passed to subscribe(), not keyword args
        class RebalanceListener(ConsumerRebalanceListener):
            def on_partitions_assigned(self, partitions):
                print(f"Assigned partitions: {partitions}")
                for partition in partitions:
                    outer.partition_offsets[partition] = 0
            
            def on_partitions_revoked(self, partitions):
                print(f"Revoking partitions: {partitions}")
                outer._save_state()
                for partition in partitions:
                    outer.partition_offsets.pop(partition, None)
        
        self.consumer.subscribe(topics=self.topics, listener=RebalanceListener())
    
    def _load_state(self):
        pass
    
    def _save_state(self):
        pass
    
    def get_current_offsets(self) -> dict:
        offsets = {}
        for partition in self.consumer.assignment():
            current_offset = self.consumer.position(partition)
            offsets[partition] = current_offset
        return offsets
    
    def seek_to_beginning(self, partitions: list = None):
        if partitions is None:
            partitions = self.consumer.assignment()
        
        self.consumer.seek_to_beginning(*partitions)
    
    def seek_to_end(self, partitions: list = None):
        if partitions is None:
            partitions = self.consumer.assignment()
        
        self.consumer.seek_to_end(*partitions)
    
    def seek_to_offset(self, topic: str, partition: int, offset: int):
        # subscription() returns an unordered set, so the caller names the topic
        tp = TopicPartition(topic=topic, partition=partition)
        self.consumer.seek(tp, offset)

Kafka Streams

Kafka Streams is a powerful library for building real-time stream processing applications, providing exactly-once processing, stateful operations, and windowing. Note that Kafka Streams itself is a JVM library with no official Python implementation; the snippets in this section are Python-flavored pseudocode mirroring the Java DSL, and Python teams typically reach for faust or ksqlDB instead.
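The windowing idea at the core of Kafka Streams can be stated without any framework: a tumbling window of size W assigns an event with timestamp t to the window starting at (t // W) * W. A plain-Python sketch of a windowed count per key (illustrative only, not the Kafka Streams API):

```python
from collections import defaultdict

def tumbling_window_counts(events: list, window_ms: int) -> dict:
    """Count (key, timestamp_ms) events per key per tumbling window."""
    counts = defaultdict(int)
    for key, timestamp_ms in events:
        # Every event falls into exactly one non-overlapping window
        window_start = (timestamp_ms // window_ms) * window_ms
        counts[(key, window_start)] += 1
    return dict(counts)

events = [('page_a', 100), ('page_a', 900), ('page_a', 1200), ('page_b', 500)]
print(tumbling_window_counts(events, window_ms=1000))
# {('page_a', 0): 2, ('page_a', 1000): 1, ('page_b', 0): 1}
```

Hopping windows relax the "exactly one" property (an event lands in every window whose span covers it), and session windows replace the fixed grid with gaps of inactivity; the DSL examples below express all three.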

Stream Processing Basics

# Pseudocode mirroring the Java Kafka Streams DSL: there is no kafka.streams
# module in the Python clients, so KafkaStreams, StreamBuilder, Materialized
# and friends below are illustrative names only.

class WordCountStream:
    def __init__(self, application_id: str, bootstrap_servers: list):
        self.app_id = application_id
        self.bootstrap_servers = bootstrap_servers
        
        self.streams = KafkaStreams(
            self._get_processor_topology(),
            {
                'application.id': application_id,
                'bootstrap.servers': bootstrap_servers,
                'processing.guarantee': 'exactly_once_v2',
                'commit.interval.ms': 1000,
                'state.dir': '/tmp/kafka-streams'
            }
        )
    
    def _get_processor_topology(self):
        builder = StreamBuilder()
        
        source = builder.stream('text-lines')
        
        # Split each line into lowercase words
        flat_map = source.flat_map_values(
            lambda value: value.lower().split()
        )
        
        # Re-key the stream by word so counts group correctly
        grouped = flat_map.group_by(lambda key, value: value)
        
        # `as` is a Python keyword, so this pseudocode uses as_()
        counts = grouped.count(Materialized.as_('counts-store'))
        
        counts.to_stream().to('word-counts-output')
        
        return builder
    
    def start(self):
        self.streams.start()
    
    def stop(self):
        self.streams.stop()
    
    def clean_up(self):
        self.streams.clean_up()

Stateful Stream Processing

class EnrichmentStream:
    def __init__(self, bootstrap_servers: list):
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'application.id': 'enrichment-stream',
            'default.key.serde': Serdes.String(),
            'default.value.serde': Serdes.Json()
        }
    
    def build_topology(self):
        builder = StreamBuilder()
        
        orders = builder.stream('orders')
        
        products_table = builder.table(
            'products',
            Materialized.as_('products-store')
        )
        
        users_table = builder.table(
            'users',
            Materialized.as_('users-store')
        )
        
        enriched_orders = orders.join(
            products_table,
            lambda order: order['product_id'],
            lambda order, product: {**order, 'product': product}
        )
        
        final_enriched = enriched_orders.join(
            users_table,
            lambda order: order['user_id'],
            lambda order, user: {**order, 'user': user}
        )
        
        final_enriched.to('enriched-orders')
        
        return builder.build(self.config)
    
    def process(self):
        streams = KafkaStreams(self.build_topology(), self.config)
        streams.start()

Windowing Operations

class WindowedAggregation:
    def __init__(self, bootstrap_servers: list):
        self.bootstrap_servers = bootstrap_servers
    
    def tumbling_window(self):
        builder = StreamBuilder()
        
        source = builder.stream('clicks')
        
        windowed = source \
            .group_by_key() \
            .windowed_by(
                TimeWindows.of(
                    Duration.ofSeconds(60)
                ).grace(Duration.ofSeconds(10))
            ) \
            .count() \
            .to_stream() \
            .to('minute-click-counts')
        
        return builder
    
    def hopping_window(self):
        builder = StreamBuilder()
        
        source = builder.stream('events')
        
        windowed = source \
            .group_by_key() \
            .windowed_by(
                WindowedTime.of(Duration.ofMinutes(5))
                    .advanceBy(Duration.ofMinutes(1))
            ) \
            .aggregate(
                initializer=lambda: 0,
                aggregator=lambda key, value, agg: agg + value['value']
            ) \
            .to_stream() \
            .to('five-minute-aggregates')
        
        return builder
    
    def session_window(self):
        builder = StreamBuilder()
        
        source = builder.stream('user-actions')
        
        session_windowed = source \
            .group_by_key() \
            .windowed_by(
                # a session window's single parameter is the inactivity gap
                SessionWindows.with_inactivity_gap(Duration.ofMinutes(5))
            ) \
            .count() \
            .to_stream() \
            .to('session-counts')
        
        return builder

Schema Management

Schema management is critical for Kafka applications. Using a schema registry ensures producers and consumers can evolve schemas without breaking compatibility.
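The intuition behind backward compatibility is that a consumer using the new schema must still be able to read data written with the old one, which means any field the new schema adds needs a default. The toy checker below works over flat field maps (illustrative only; a real registry reasons over full Avro or JSON Schema semantics):

```python
def is_backward_compatible(old_fields: dict, new_fields: dict) -> bool:
    """Each dict maps field name -> {'default': ...} or {} when no default.

    A reader on new_fields can decode old data iff every field it adds
    (absent from old_fields) carries a default value.
    """
    for name, spec in new_fields.items():
        if name not in old_fields and 'default' not in spec:
            return False
    return True

old = {'id': {}, 'email': {}}
new_ok = {'id': {}, 'email': {}, 'plan': {'default': 'free'}}
new_bad = {'id': {}, 'email': {}, 'plan': {}}
print(is_backward_compatible(old, new_ok))   # True
print(is_backward_compatible(old, new_bad))  # False
```

Forward compatibility is the mirror image (old readers must tolerate new data, so removed fields need defaults on the writer's former schema), and FULL compatibility requires both directions.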

Schema Registry Integration

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer

class SchemaRegistryManager:
    def __init__(self, schema_registry_url: str):
        self.client = SchemaRegistryClient({'url': schema_registry_url})
    
    def register_schema(self, subject: str, schema_str: str, schema_type: str = 'AVRO') -> int:
        # register_schema takes a Schema object and returns the schema id
        return self.client.register_schema(subject, Schema(schema_str, schema_type))
    
    def get_schema(self, subject: str):
        # Returns a RegisteredSchema for the subject's latest version
        return self.client.get_latest_version(subject)
    
    def check_compatibility(self, subject: str, new_schema: str, schema_type: str = 'AVRO') -> bool:
        # test_compatibility returns a plain bool in confluent-kafka-python
        return self.client.test_compatibility(subject, Schema(new_schema, schema_type))

class JsonSchemaProducer:
    def __init__(self, bootstrap_servers: str, schema_registry_url: str, schema_str: str):
        self.client = SchemaRegistryClient({'url': schema_registry_url})
        
        # SerializingProducer applies the serializer on every produce() call;
        # JSONSerializer needs the schema it will register and validate against
        self.producer = SerializingProducer({
            'bootstrap.servers': bootstrap_servers,
            'value.serializer': JSONSerializer(schema_str, self.client)
        })
    
    def send(self, topic: str, value: dict, key: str = None):
        self.producer.produce(
            topic=topic,
            value=value,
            key=key
        )
        self.producer.flush()

class JsonSchemaConsumer:
    def __init__(self, bootstrap_servers: str, schema_registry_url: str, group_id: str, topics: list, schema_str: str):
        self.consumer = DeserializingConsumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            'value.deserializer': JSONDeserializer(schema_str)
        })
        
        self.consumer.subscribe(topics)
    
    def consume(self):
        return self.consumer.poll(timeout=1.0)

Schema Evolution

from confluent_kafka.schema_registry import Schema

class SchemaEvolutionManager:
    def __init__(self, schema_registry_client):
        self.client = schema_registry_client
    
    def evolve_schema(self, subject: str, new_schema: str, schema_type: str = 'AVRO') -> dict:
        schema = Schema(new_schema, schema_type)
        
        # test_compatibility returns a bool; the registry does not report
        # which changes were incompatible through this call
        if self.client.test_compatibility(subject, schema):
            schema_id = self.client.register_schema(subject, schema)
            return {
                'success': True,
                'schema_id': schema_id,
                'message': 'Schema registered successfully'
            }
        
        return {
            'success': False,
            'message': 'Schema incompatible with current version'
        }
    
    def rollback_schema(self, subject: str, target_version: int):
        # Re-register an older version's schema as the newest version
        registered = self.client.get_version(subject, target_version)
        return self.client.register_schema(subject, registered.schema)

Security

Kafka security encompasses authentication, authorization, and encryption. Modern deployments require comprehensive security configurations.

Security Configuration

class SecureKafkaProducer:
    def __init__(
        self,
        bootstrap_servers: list,
        security_protocol: str = 'SSL',
        ssl_cafile: str = 'ca.pem',
        ssl_certfile: str = 'service.crt',
        ssl_keyfile: str = 'service.key',
        sasl_mechanism: str = 'SCRAM-SHA-512',
        sasl_username: str = None,
        sasl_password: str = None
    ):
        # kafka-python uses underscore-style keyword arguments, not the
        # dotted config keys of the Java/confluent clients
        config = {
            'bootstrap_servers': bootstrap_servers,
            'security_protocol': security_protocol,
            'ssl_cafile': ssl_cafile,
            'ssl_certfile': ssl_certfile,
            'ssl_keyfile': ssl_keyfile,
        }
        
        if sasl_username and sasl_password:
            config.update({
                'security_protocol': 'SASL_SSL',
                'sasl_mechanism': sasl_mechanism,
                'sasl_plain_username': sasl_username,
                'sasl_plain_password': sasl_password
            })
        
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            **config
        )
    
    def send_with_acl(self, topic: str, value: dict):
        try:
            future = self.producer.send(topic, value=value)
            metadata = future.get(timeout=10)
            return metadata
        except Exception as e:
            raise e

ACL Management

from kafka.admin import (
    ACL, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType
)

class KafkaACLManager:
    def __init__(self, admin_client: KafkaAdminClient):
        self.admin = admin_client
    
    def create_producer_acls(self, principal: str, topic: str):
        # Principals use the 'User:name' form, e.g. 'User:order-service'.
        topic_pattern = ResourcePattern(ResourceType.TOPIC, topic)
        
        acls = [
            ACL(principal=principal, host='*',
                operation=ACLOperation.WRITE,
                permission_type=ACLPermissionType.ALLOW,
                resource_pattern=topic_pattern),
            ACL(principal=principal, host='*',
                operation=ACLOperation.CREATE,
                permission_type=ACLPermissionType.ALLOW,
                resource_pattern=topic_pattern),
            ACL(principal=principal, host='*',
                operation=ACLOperation.DESCRIBE,
                permission_type=ACLPermissionType.ALLOW,
                resource_pattern=topic_pattern),
        ]
        
        # Create all bindings in a single admin request.
        self.admin.create_acls(acls)
    
    def create_consumer_acls(self, principal: str, group: str, topic: str):
        # Consumers need READ on both the topic and the consumer group.
        acls = [
            ACL(principal=principal, host='*',
                operation=ACLOperation.READ,
                permission_type=ACLPermissionType.ALLOW,
                resource_pattern=ResourcePattern(ResourceType.TOPIC, topic)),
            ACL(principal=principal, host='*',
                operation=ACLOperation.READ,
                permission_type=ACLPermissionType.ALLOW,
                resource_pattern=ResourcePattern(ResourceType.GROUP, group)),
        ]
        
        self.admin.create_acls(acls)

Operations and Monitoring

Operating Kafka requires understanding metrics, troubleshooting, and maintenance procedures.

Monitoring Setup

from prometheus_client import start_http_server, Counter, Gauge
import time

class KafkaMetrics:
    def __init__(self):
        self.messages_produced = Counter(
            'kafka_messages_produced_total',
            'Total messages produced',
            ['topic']
        )
        
        self.messages_consumed = Counter(
            'kafka_messages_consumed_total',
            'Total messages consumed',
            ['topic', 'group']
        )
        
        self.producer_latency = Gauge(
            'kafka_producer_latency_seconds',
            'Producer send latency',
            ['topic']
        )
        
        self.consumer_lag = Gauge(
            'kafka_consumer_lag',
            'Consumer lag in messages',
            ['topic', 'partition', 'group']
        )
    
    def record_produced(self, topic: str):
        self.messages_produced.labels(topic=topic).inc()
    
    def record_consumed(self, topic: str, group: str):
        self.messages_consumed.labels(topic=topic, group=group).inc()
    
    def update_producer_latency(self, topic: str, latency: float):
        self.producer_latency.labels(topic=topic).set(latency)
    
    def update_consumer_lag(self, topic: str, partition: int, group: str, lag: int):
        self.consumer_lag.labels(
            topic=topic,
            partition=partition,
            group=group
        ).set(lag)
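The metric objects above can be sanity-checked without a broker. This sketch (the topic and group names are illustrative) exercises the same prometheus_client primitives against a private registry, so it does not clash with metrics already registered in the process:

```python
from prometheus_client import CollectorRegistry, Counter, Gauge

# A private registry keeps the example isolated from the
# process-wide default registry used by start_http_server().
registry = CollectorRegistry()

produced = Counter(
    'kafka_messages_produced_total',
    'Total messages produced',
    ['topic'],
    registry=registry,
)
lag = Gauge(
    'kafka_consumer_lag',
    'Consumer lag in messages',
    ['topic', 'partition', 'group'],
    registry=registry,
)

produced.labels(topic='orders').inc()
produced.labels(topic='orders').inc()
lag.labels(topic='orders', partition='0', group='billing').set(42)

# get_sample_value resolves one labelled time series from the registry.
print(registry.get_sample_value(
    'kafka_messages_produced_total', {'topic': 'orders'}))  # 2.0
```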

class ConsumerMonitor:
    def __init__(self, consumer: KafkaConsumer, metrics: KafkaMetrics):
        self.consumer = consumer
        self.metrics = metrics
    
    def monitor_lag(self):
        while True:
            partitions = list(self.consumer.assignment())
            # True lag is the distance between the partition's log end
            # offset and the group's last committed offset, not the
            # consumer's local position.
            end_offsets = self.consumer.end_offsets(partitions)
            
            for partition in partitions:
                committed = self.consumer.committed(partition)
                
                lag = end_offsets[partition] - committed if committed is not None else 0
                
                self.metrics.update_consumer_lag(
                    topic=partition.topic,
                    partition=partition.partition,
                    group=self.consumer.config['group_id'],
                    lag=lag
                )
            
            time.sleep(10)

Performance Tuning

class KafkaPerformanceTuner:
    @staticmethod
    def tune_producer(latency_ms: int = 10) -> dict:
        # Throughput-oriented defaults: batching plus lz4 compression
        # trades a small linger delay for fewer, larger requests.
        return {
            'batch.size': 16384,
            'linger.ms': latency_ms,
            'buffer.memory': 33554432,
            'compression.type': 'lz4',
            'max.in.flight.requests.per.connection': 5,
            'retries': 3,
            'acks': 'all',
            'max.block.ms': 60000
        }
    
    @staticmethod
    def tune_consumer() -> dict:
        # Favors batch processing with manual commits; heartbeat.interval.ms
        # is kept at one third of session.timeout.ms, per the usual guideline.
        return {
            'fetch.min.bytes': 1,
            'fetch.max.wait.ms': 500,
            'max.poll.records': 500,
            'max.poll.interval.ms': 300000,
            'session.timeout.ms': 10000,
            'heartbeat.interval.ms': 3000,
            'enable.auto.commit': False,
            'auto.offset.reset': 'earliest'
        }
    
    @staticmethod
    def tune_stream_app() -> dict:
        return {
            'processing.guarantee': 'exactly_once_v2',
            'commit.interval.ms': 1000,
            'state.dir': '/var/lib/kafka/streams',
            'num.stream.threads': 2,
            # Set to host:port only when exposing interactive queries.
            'application.server': ''
        }
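The batching knobs above interact: a producer batch is flushed when it fills (batch.size) or when linger.ms expires, whichever comes first. A back-of-envelope helper (the function name is mine, not a Kafka API) shows why modest per-partition rates rarely fill a 16 KB batch:

```python
def batch_fill_ms(batch_size_bytes: int, msg_bytes: int, msgs_per_sec: float) -> float:
    """Time for a producer batch to fill at a steady per-partition rate."""
    msgs_per_batch = batch_size_bytes // msg_bytes
    return 1000.0 * msgs_per_batch / msgs_per_sec

# With batch.size=16384 and 1 KB messages at 500 msgs/s per partition,
# a batch takes 32 ms to fill, so linger.ms=10 sends most batches
# partially full; raise linger.ms or batch.size for better compression.
print(batch_fill_ms(16384, 1024, 500))  # 32.0
```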

Best Practices

Best Practices Checklist

BEST_PRACTICES = {
    "producers": [
        "Use idempotent producers for exactly-once semantics",
        "Set acks='all' for critical data",
        "Implement retry logic with exponential backoff",
        "Use compression to reduce network overhead",
        "Batch messages for higher throughput",
        "Monitor producer metrics: error-rate, queue-time, request-latency"
    ],
    
    "consumers": [
        "Always commit offsets after successful processing",
        "Handle rebalances gracefully",
        "Use consumer groups for parallel processing",
        "Process records in batches for better throughput",
        "Monitor consumer lag - should be near zero",
        "Implement dead letter queue for failed messages"
    ],
    
    "topics": [
        "Use at least 3 partitions for parallelism",
        "Size partitions based on throughput requirements",
        "Use compacted topics for key-value data",
        "Set appropriate retention policies",
        "Monitor partition size and rebalance if needed"
    ],
    
    "security": [
        "Enable SSL/TLS for encryption in transit",
        "Use SASL for authentication",
        "Implement ACLs for authorization",
        "Rotate credentials regularly",
        "Enable audit logging"
    ]
}
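The dead-letter-queue item in the consumer checklist boils down to a routing decision that can be kept separate from any broker I/O; `route_record` and the `attempts` field are hypothetical names used for illustration:

```python
def route_record(record: dict, process, max_retries: int = 3):
    """Decide where a consumed record goes: 'ok', 'retry', or 'dlq'.

    The caller commits the offset on 'ok', republishes to a retry
    topic on 'retry', and publishes to the DLQ topic on 'dlq'.
    """
    try:
        process(record)
        return ('ok', record)
    except Exception as exc:
        attempts = record.get('attempts', 0) + 1
        # Enrich the record so the failure is diagnosable from the DLQ.
        enriched = {**record, 'attempts': attempts, 'last_error': str(exc)}
        if attempts >= max_retries:
            return ('dlq', enriched)
        return ('retry', enriched)
```

Keeping the decision pure makes it trivially unit-testable, independent of consumer wiring.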

Use Cases

Event Sourcing

class EventSourcingStore:
    def __init__(self, bootstrap_servers: list):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # Keys must be bytes unless a key serializer is configured.
            key_serializer=lambda k: k.encode('utf-8')
        )
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id='event-sourcing-store',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest'
        )
    
    def append_event(self, aggregate_id: str, event: dict):
        event['aggregate_id'] = aggregate_id
        event['timestamp'] = time.time()
        
        self.producer.send(
            topic=f'events-{aggregate_id}',
            value=event,
            key=aggregate_id
        )
        self.producer.flush()
    
    def get_events(self, aggregate_id: str):
        self.consumer.subscribe([f'events-{aggregate_id}'])
        
        events = []
        # poll() returns an empty dict once the timeout elapses, so the
        # loop ends when the topic is drained instead of blocking forever.
        while True:
            records = self.consumer.poll(timeout_ms=1000)
            if not records:
                break
            for batch in records.values():
                events.extend(message.value for message in batch)
        
        return events
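The store above only persists and returns events; the other half of event sourcing is rebuilding current state by folding the stream in order. A minimal sketch, using a hypothetical account aggregate:

```python
def replay(events, apply, initial=None):
    """Fold an aggregate's event stream, oldest first, into current state."""
    state = initial if initial is not None else {}
    for event in events:
        state = apply(state, event)
    return state

def apply_account_event(state, event):
    # Hypothetical account aggregate: deposits add, withdrawals subtract.
    balance = state.get('balance', 0)
    if event['type'] == 'deposited':
        return {**state, 'balance': balance + event['amount']}
    if event['type'] == 'withdrawn':
        return {**state, 'balance': balance - event['amount']}
    # Unknown event types are ignored, which keeps old replays working
    # when new event types are introduced later.
    return state

events = [
    {'type': 'deposited', 'amount': 100},
    {'type': 'withdrawn', 'amount': 30},
]
print(replay(events, apply_account_event))  # {'balance': 70}
```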

Change Data Capture

class CDCProcessor:
    def __init__(self, bootstrap_servers: list):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    
    def process_debezium_event(self, event: dict):
        payload = event.get('payload', {})
        op = payload.get('op')
        
        if op in ('c', 'r'):  # insert, or row read during the initial snapshot
            self._handle_create(payload)
        elif op == 'u':
            self._handle_update(payload)
        elif op == 'd':
            self._handle_delete(payload)
    
    def _handle_create(self, payload: dict):
        after = payload.get('after', {})
        self.producer.send('cdc-events', value=after)
    
    def _handle_update(self, payload: dict):
        before = payload.get('before', {})
        after = payload.get('after', {})
        change = {'before': before, 'after': after}
        self.producer.send('cdc-events', value=change)
    
    def _handle_delete(self, payload: dict):
        before = payload.get('before', {})
        self.producer.send('cdc-events', value={'deleted': before})
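Debezium wraps each change in an envelope whose `op` field is 'c' (insert), 'u' (update), 'd' (delete), or 'r' (initial snapshot read). A standalone helper (the name is mine) that flattens this envelope is easy to unit-test without running a connector:

```python
def summarize_debezium(event: dict) -> dict:
    """Flatten a Debezium change envelope into a single change record."""
    payload = event.get('payload', {})
    op = payload.get('op')
    if op in ('c', 'r'):  # insert, or row read during the initial snapshot
        return {'op': 'create', 'row': payload.get('after')}
    if op == 'u':
        return {'op': 'update',
                'before': payload.get('before'),
                'after': payload.get('after')}
    if op == 'd':
        return {'op': 'delete', 'row': payload.get('before')}
    return {'op': 'unknown'}

env = {'payload': {'op': 'u', 'before': {'qty': 1}, 'after': {'qty': 2}}}
print(summarize_debezium(env)['op'])  # update
```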

Conclusion

Apache Kafka has become the backbone of modern event-driven architectures. Understanding its architecture, proper implementation patterns, and operational considerations is essential for building robust, scalable systems.

This guide covered the fundamentals of topics and partitions, producer and consumer implementation, Kafka Streams for stream processing, schema management, security configurations, and operational best practices. With this knowledge, you’re well-equipped to design and implement enterprise-grade event streaming systems.
