Introduction
Apache Kafka has evolved from a simple message queue into the backbone of modern event-driven architectures. In 2026, Kafka powers real-time data pipelines at most major enterprises, handling billions of events per day. Understanding Kafka's architecture, patterns, and best practices is essential for any software engineer building modern, event-driven systems.
This comprehensive guide explores Apache Kafka from fundamentals to advanced patterns. We cover core concepts, producer and consumer design, stream processing with Kafka Streams, schema management, security, operations, and real-world use cases. By the end, you’ll have the knowledge to design and implement robust, scalable event streaming systems.
Kafka Architecture Fundamentals
Kafka’s architecture is deceptively simple yet incredibly powerful. At its core, Kafka is a distributed, partitioned, replicated commit log that provides durability, ordering, and throughput guarantees difficult to match with traditional messaging systems.
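To build intuition for the commit-log model, it can be sketched in a few lines of plain Python. This is a toy in-memory model only, not how brokers are implemented; real Kafka adds replication, segment files, indexes, and retention:

```python
class ToyPartitionedLog:
    """Toy model of a Kafka topic: N append-only partition logs with offsets."""

    def __init__(self, num_partitions: int = 3):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key: str, value: str) -> tuple:
        # The same key always hashes to the same partition, giving per-key order.
        p = hash(key) % len(self.partitions)
        self.partitions[p].append(value)
        offset = len(self.partitions[p]) - 1   # offsets are per-partition
        return p, offset

    def read(self, partition: int, offset: int) -> str:
        # Reads are by (partition, offset); consuming never deletes anything.
        return self.partitions[partition][offset]

log = ToyPartitionedLog()
p1, o1 = log.append("user_42", "login")
p2, o2 = log.append("user_42", "logout")
assert p1 == p2 and o2 == o1 + 1  # per-key ordering within one partition
```

The two properties the assertions check, stable key-to-partition routing and monotonically increasing per-partition offsets, are exactly what the real broker guarantees.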
Core Concepts
┌──────────────────────────────────────────────────────────────────────┐
│                          Kafka Architecture                          │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                         Kafka Cluster                          │  │
│  │                                                                │  │
│  │   ┌──────────┐    ┌──────────┐    ┌──────────┐                 │  │
│  │   │ Broker 1 │    │ Broker 2 │    │ Broker 3 │   ...           │  │
│  │   │          │    │          │    │          │                 │  │
│  │   │ Partition│    │ Partition│    │ Partition│                 │  │
│  │   │    0     │    │    1     │    │    2     │                 │  │
│  │   │    1     │    │    2     │    │    0     │                 │  │
│  │   │    2     │    │    0     │    │    1     │                 │  │
│  │   └──────────┘    └──────────┘    └──────────┘                 │  │
│  │                                                                │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                ▲                                     │
│                                │                                     │
│  ┌─────────────────────────────┴──────────────────────────────────┐  │
│  │                       ZooKeeper / KRaft                        │  │
│  │                  (Cluster Metadata Management)                 │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
Key Components:
- Broker: A Kafka server that stores topics and serves client requests
- Topic: A named stream of events, partitioned for scalability
- Partition: Ordered, immutable sequence of events within a topic
- Producer: Client that publishes events to topics
- Consumer: Client that subscribes to topics and processes events
- Consumer Group: Set of consumers that cooperatively consume a topic
- Replica: Copies of partitions for fault tolerance
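To make the consumer-group idea concrete, here is a toy sketch of how a group might divide partitions among its members. Real Kafka uses pluggable assignors (range, round-robin, cooperative-sticky); this simple round-robin version is illustrative only:

```python
def assign_partitions(partitions: list, consumers: list) -> dict:
    """Toy round-robin assignment of partitions to consumer-group members."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        # Deal partitions out like cards: each partition goes to exactly
        # one member, so no two consumers in a group share a partition.
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

groups = assign_partitions([0, 1, 2, 3, 4, 5], ["consumer-a", "consumer-b"])
# consumer-a gets [0, 2, 4], consumer-b gets [1, 3, 5]; no partition is shared
```

This is also why partition count caps group parallelism: with 6 partitions, a seventh consumer in the group would sit idle.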
Topic and Partition Design
from kafka import KafkaAdminClient
from kafka.admin import NewTopic, ConfigResource, ConfigResourceType
class KafkaTopicManager:
def __init__(self, bootstrap_servers: list):
self.admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
def create_topic(
self,
name: str,
num_partitions: int = 3,
replication_factor: int = 3,
configs: dict = None
) -> bool:
topic = NewTopic(
name=name,
num_partitions=num_partitions,
replication_factor=replication_factor,
topic_configs=configs or {}
)
try:
self.admin.create_topics([topic])
return True
except Exception as e:
print(f"Error creating topic: {e}")
return False
def create_topic_with_retention(
self,
name: str,
retention_bytes: int = -1,
retention_ms: int = 604800000
) -> bool:
configs = {
'retention.bytes': str(retention_bytes),
'retention.ms': str(retention_ms)
}
return self.create_topic(name, configs=configs)
def create_compacted_topic(self, name: str) -> bool:
configs = {
'cleanup.policy': 'compact',
'min.cleanable.dirty.ratio': '0.5',
'min.compaction.lag.ms': '0'
}
return self.create_topic(name, configs=configs)
def list_topics(self) -> list:
return self.admin.list_topics()
def describe_topic(self, name: str) -> dict:
cluster_metadata = self.admin.describe_configs(
config_resources=[
ConfigResource(ConfigResourceType.TOPIC, name)
]
)
configs = {}
for resource in cluster_metadata.values():
for config_name, config_value in resource.items():
configs[config_name] = config_value.value
return configs
def delete_topic(self, name: str) -> bool:
try:
self.admin.delete_topics([name])
return True
except Exception as e:
print(f"Error deleting topic: {e}")
return False
Replication and Fault Tolerance
class KafkaReplicationManager:
def __init__(self, admin_client: KafkaAdminClient):
self.admin = admin_client
def get_replica_assignment(self, topic: str) -> dict:
cluster_metadata = self.admin.describe_topics([topic])
partitions = {}
for tp, meta in cluster_metadata.items():
partitions[tp.partition] = {
'leader': meta.leader,
'isr': meta.isr_nodes,
'replicas': meta.replica_nodes
}
return partitions
def check_replication_health(self, topic: str) -> dict:
assignment = self.get_replica_assignment(topic)
under_replicated = 0
offline_replicas = 0
for partition, info in assignment.items():
if len(info['isr']) < len(info['replicas']):
under_replicated += 1
if info['leader'] == -1:
offline_replicas += 1
        return {
            'topic': topic,
            'total_partitions': len(assignment),
            'under_replicated': under_replicated,
            'offline_replicas': offline_replicas,
            'healthy': under_replicated == 0 and offline_replicas == 0
        }
    def increase_replication_factor(
        self, topic: str, new_replication_factor: int, broker_count: int
    ):
        current_assignment = self.get_replica_assignment(topic)
        new_assignment = {}
        for partition, info in current_assignment.items():
            replicas = list(info['replicas'])
            # Scan real broker ids instead of guessing, so the loop terminates
            for broker in range(broker_count):
                if len(replicas) >= new_replication_factor:
                    break
                if broker not in replicas:
                    replicas.append(broker)
            new_assignment[partition] = replicas
        return self.admin.alter_partition_reassignments(
            topic,
            {p: {'replicas': r} for p, r in new_assignment.items()}
        )
Producer Implementation
Kafka producers handle event serialization, partitioning, batching, and reliability. Getting the producer right is crucial for system stability and performance.
Basic Producer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time
class ReliableKafkaProducer:
def __init__(
self,
bootstrap_servers: list,
acks: str = 'all',
retries: int = 3,
max_in_flight_requests_per_connection: int = 5,
enable_idempotence: bool = True
):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
acks=acks,
retries=retries,
            max_in_flight_requests_per_connection=(
                # Idempotence requires at most 5 in-flight requests
                min(max_in_flight_requests_per_connection, 5)
                if enable_idempotence
                else max_in_flight_requests_per_connection
            ),
enable_idempotence=enable_idempotence,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
compression_type='lz4',
batch_size=16384,
linger_ms=10,
buffer_memory=33554432,
max_block_ms=60000,
request_timeout_ms=30000
)
def send(
self,
topic: str,
value: dict,
key: str = None,
partition: int = None,
timestamp_ms: int = None
) -> str:
future = self.producer.send(
topic=topic,
value=value,
key=key,
partition=partition,
timestamp_ms=timestamp_ms
)
try:
record_metadata = future.get(timeout=10)
return f"{record_metadata.topic}-{record_metadata.partition}-{record_metadata.offset}"
        except KafkaError:
            raise  # re-raise, preserving the original traceback
def send_async(self, topic: str, value: dict, key: str = None):
def on_success(record_metadata):
print(f"Success: {record_metadata.topic}-{record_metadata.partition}")
def on_error(exc):
print(f"Error: {exc}")
future = self.producer.send(topic, value=value, key=key)
future.add_callback(on_success)
future.add_errback(on_error)
def flush(self):
self.producer.flush()
def close(self):
self.producer.close()
Custom Partitioner
# kafka-python's `partitioner` config accepts any callable taking
# (key_bytes, all_partitions, available_partitions), where the partition
# arguments are lists of partition ids.
import random

class CustomPartitioner:
    def __call__(self, key: bytes, all_partitions: list, available: list) -> int:
        if not key:
            # No key: spread unkeyed messages across available partitions
            return random.choice(available or all_partitions)
        key_str = key.decode('utf-8')
        if key_str.startswith('user_'):
            user_id = int(key_str.split('_')[1])
            return all_partitions[user_id % len(all_partitions)]
        if key_str.startswith('order_'):
            order_id = int(key_str.split('_')[1])
            return all_partitions[order_id % len(all_partitions)]
        return all_partitions[abs(hash(key)) % len(all_partitions)]
from kafka.partitioner.default import murmur2

class KeyBasedPartitioner:
    @staticmethod
    def consistent_partition(key: bytes, num_partitions: int) -> int:
        if not key:
            raise ValueError("Key is required for consistent partitioning")
        # Python's built-in hash() is salted per process; murmur2 matches
        # Kafka's own key hashing and is stable across restarts.
        return (murmur2(key) & 0x7fffffff) % num_partitions

    @staticmethod
    def composite_key_partition(key: bytes, num_partitions: int) -> int:
        key_str = key.decode('utf-8')
        if ':' in key_str:
            # Partition on the primary component so related keys co-locate
            primary_key = key_str.split(':')[0].encode('utf-8')
            return (murmur2(primary_key) & 0x7fffffff) % num_partitions
        return (murmur2(key) & 0x7fffffff) % num_partitions
Batch Producer with Retries
import threading
from collections import deque
from typing import Callable
class BatchKafkaProducer:
def __init__(
self,
bootstrap_servers: list,
batch_size: int = 100,
flush_interval_seconds: int = 5,
max_retries: int = 3
):
self.bootstrap_servers = bootstrap_servers
self.batch_size = batch_size
self.flush_interval_seconds = flush_interval_seconds
self.max_retries = max_retries
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            batch_size=batch_size * 1024,
            linger_ms=1000,
            compression_type='zstd'
        )
self.pending = deque()
self.lock = threading.Lock()
self._start_flush_timer()
def _start_flush_timer(self):
def flush_periodically():
while True:
time.sleep(self.flush_interval_seconds)
self.flush()
thread = threading.Thread(target=flush_periodically, daemon=True)
thread.start()
    def send(self, topic: str, value: dict, key: str = None):
        with self.lock:
            future = self.producer.send(topic, value=value, key=key)
            self.pending.append({
                'future': future,
                'topic': topic,
                'value': value,
                'key': key,  # keep the key so retries preserve partitioning
                'retry_count': 0
            })

    def flush(self):
        with self.lock:
            while self.pending:
                item = self.pending[0]
                try:
                    item['future'].get(timeout=1)
                    self.pending.popleft()
                except Exception as e:
                    if item['retry_count'] < self.max_retries:
                        item['retry_count'] += 1
                        item['future'] = self.producer.send(
                            item['topic'],
                            value=item['value'],
                            key=item['key']
                        )
                    else:
                        print(f"Failed after {self.max_retries} retries: {e}")
                        self.pending.popleft()
def close(self):
self.flush()
self.producer.close()
Consumer Implementation
Kafka consumers must handle partitioning, offset management, rebalancing, and error handling. Robust consumer design is essential for reliable data processing.
Basic Consumer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
from typing import Callable
import json
class ReliableKafkaConsumer:
def __init__(
self,
bootstrap_servers: list,
group_id: str,
topics: list,
auto_offset_reset: str = 'earliest',
enable_auto_commit: bool = False,
max_poll_records: int = 500,
max_poll_interval_ms: int = 300000
):
self.consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
max_poll_records=max_poll_records,
max_poll_interval_ms=max_poll_interval_ms,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None,
heartbeat_interval_ms=3000,
session_timeout_ms=10000,
request_timeout_ms=30000
)
self.consumer.subscribe(topics)
def consume(self, timeout_ms: int = 1000, max_records: int = None):
return self.consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
def process_messages(self, handler: Callable):
try:
while True:
records = self.consume()
for topic_partition, messages in records.items():
for message in messages:
try:
handler(message.value, message)
except Exception as e:
print(f"Error processing message: {e}")
                if records:
                    self.consumer.commit()
except KeyboardInterrupt:
self.close()
def commit(self):
self.consumer.commit()
def close(self):
self.consumer.close()
Consumer with Error Handling
import logging
from datetime import datetime
class RetryableError(Exception):
    """Transient failure; the message can be retried."""

class NonRetryableError(Exception):
    """Permanent failure; retrying will not help."""

class ResilientConsumer:
    def __init__(
        self,
        bootstrap_servers: list,
        group_id: str,
        topics: list,
        dlq_topic: str = None
    ):
        # KafkaConsumer takes topics as positional arguments, not a kwarg
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            enable_auto_commit=False
        )
        self.dlq_producer = None
        if dlq_topic:
            self.dlq_producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode('utf-8')
            )
        self.dlq_topic = dlq_topic
        self.logger = logging.getLogger(__name__)
        self.max_retries = 3
def process_with_retry(self, handler: Callable, message: dict) -> bool:
for attempt in range(self.max_retries):
try:
handler(message)
return True
except RetryableError as e:
if attempt < self.max_retries - 1:
self.logger.warning(f"Retryable error: {e}, attempt {attempt + 1}")
time.sleep(2 ** attempt)
else:
raise
except NonRetryableError as e:
self.logger.error(f"Non-retryable error: {e}")
raise
return False
def send_to_dlq(self, message: dict, error: Exception):
if not self.dlq_producer:
return
dlq_message = {
'original_message': message,
'error': str(error),
'error_type': type(error).__name__,
'timestamp': datetime.utcnow().isoformat(),
'retry_count': 0
}
self.dlq_producer.send(self.dlq_topic, value=dlq_message)
self.dlq_producer.flush()
    def consume(self, handler: Callable):
        try:
            for message in self.consumer:
                try:
                    self.process_with_retry(handler, message.value)
                    self.consumer.commit()
                except Exception as e:
                    self.logger.error(f"Error processing message: {e}")
                    self.send_to_dlq(message.value, e)
                    self.consumer.commit()
finally:
self.consumer.close()
if self.dlq_producer:
self.dlq_producer.close()
Consumer Groups and Rebalancing
class StatefulConsumer:
    def __init__(self, bootstrap_servers: list, group_id: str, topics: list):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            enable_auto_commit=False
        )
        self.topics = topics
        self.state = {}
        self.partition_offsets = {}
        self._setup_rebalance_listener()
        self._load_state()

    def _setup_rebalance_listener(self):
        from kafka import ConsumerRebalanceListener

        store = self

        class RebalanceListener(ConsumerRebalanceListener):
            def on_partitions_assigned(self, partitions):
                print(f"Assigned partitions: {partitions}")
                for partition in partitions:
                    store.partition_offsets[partition] = 0

            def on_partitions_revoked(self, partitions):
                print(f"Revoking partitions: {partitions}")
                store._save_state()
                for partition in partitions:
                    store.partition_offsets.pop(partition, None)

        # kafka-python expects a ConsumerRebalanceListener, not bare callbacks
        self.consumer.subscribe(topics=self.topics, listener=RebalanceListener())
def _load_state(self):
pass
def _save_state(self):
pass
def get_current_offsets(self) -> dict:
offsets = {}
for partition in self.consumer.assignment():
current_offset = self.consumer.position(partition)
offsets[partition] = current_offset
return offsets
def seek_to_beginning(self, partitions: list = None):
if partitions is None:
partitions = self.consumer.assignment()
self.consumer.seek_to_beginning(*partitions)
def seek_to_end(self, partitions: list = None):
if partitions is None:
partitions = self.consumer.assignment()
self.consumer.seek_to_end(*partitions)
    def seek_to_offset(self, partition: int, offset: int):
        # subscription() returns a set, so pick a topic rather than indexing it
        topic = next(iter(self.consumer.subscription()))
        tp = TopicPartition(topic=topic, partition=partition)
        self.consumer.seek(tp, offset)
Kafka Streams
Kafka Streams is a powerful library for building real-time stream processing applications, providing exactly-once processing, stateful operations, and windowing. Note that Kafka Streams is a JVM library with no official Python client; the snippets below are Python-flavored pseudocode that mirrors the Java DSL (libraries such as Faust offer comparable stream processing natively in Python).
Stream Processing Basics
# Illustrative pseudocode: there is no kafka.streams Python module.
# StreamBuilder, KafkaStreams, and Materialized mirror the Java DSL classes.
class WordCountStream:
def __init__(self, application_id: str, bootstrap_servers: list):
self.app_id = application_id
self.bootstrap_servers = bootstrap_servers
        self.streams = KafkaStreams(
            self._build_topology(),
            {
                'application.id': application_id,
                'bootstrap.servers': bootstrap_servers,
                'processing.guarantee': 'exactly_once_v2',
                'commit.interval.ms': 1000,
                'state.dir': '/tmp/kafka-streams'
            }
        )

    def _build_topology(self):
        builder = StreamBuilder()
        source = builder.stream('text-lines')
        words = source.flat_map_values(
            lambda value: value.lower().split()
        )
        grouped = words.group_by(lambda key, value: value)
        counts = grouped.count(Materialized.as_('counts-store'))
        counts.to_stream().to('word-counts-output')
        return builder
def start(self):
self.streams.start()
def stop(self):
self.streams.stop()
def clean_up(self):
self.streams.clean_up()
Stateful Stream Processing
class EnrichmentStream:
def __init__(self, bootstrap_servers: list):
self.config = {
'bootstrap.servers': bootstrap_servers,
'application.id': 'enrichment-stream',
'default.key.serde': Serdes.String(),
'default.value.serde': Serdes.Json()
}
def build_topology(self):
builder = StreamBuilder()
orders = builder.stream('orders')
products_table = builder.table(
'products',
Materialized.as('products-store')
)
users_table = builder.table(
'users',
Materialized.as('users-store')
)
        # Stream-table joins match on the record key, so re-key the stream first
        enriched_orders = orders \
            .select_key(lambda key, order: order['product_id']) \
            .join(
                products_table,
                lambda order, product: {**order, 'product': product}
            )
        final_enriched = enriched_orders \
            .select_key(lambda key, order: order['user_id']) \
            .join(
                users_table,
                lambda order, user: {**order, 'user': user}
            )
final_enriched.to('enriched-orders')
return builder.build(self.config)
def process(self):
streams = KafkaStreams(self.build_topology(), self.config)
streams.start()
Windowing Operations
class WindowedAggregation:
def __init__(self, bootstrap_servers: list):
self.bootstrap_servers = bootstrap_servers
def tumbling_window(self):
builder = StreamBuilder()
source = builder.stream('clicks')
        windowed = source \
            .group_by_key() \
            .windowed_by(
                TimeWindows.of(
                    Duration.ofSeconds(60)
                ).grace(Duration.ofSeconds(10))
            ) \
            .count() \
            .to_stream() \
            .to('minute-click-counts')
return builder
def hopping_window(self):
builder = StreamBuilder()
source = builder.stream('events')
windowed = source \
.group_by_key() \
.windowed_by(
                TimeWindows.of(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1))
) \
.aggregate(
initializer=lambda: 0,
aggregator=lambda key, value, agg: agg + value['value']
) \
.to_stream() \
.to('five-minute-aggregates')
return builder
def session_window(self):
builder = StreamBuilder()
source = builder.stream('user-actions')
session_windowed = source \
.group_by_key() \
            .windowed_by(
                SessionWindows.with_(Duration.ofMinutes(5))  # defined by inactivity gap
            ) \
.count() \
.to_stream() \
.to('session-counts')
return builder
Schema Management
Schema management is critical for Kafka applications. Using a schema registry ensures producers and consumers can evolve schemas without breaking compatibility.
Schema Registry Integration
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

class SchemaRegistryManager:
    def __init__(self, schema_registry_url: str):
        self.client = SchemaRegistryClient({'url': schema_registry_url})

    def register_schema(self, subject: str, schema_str: str, schema_type: str = 'AVRO') -> int:
        # register_schema takes a Schema object and returns the schema id
        return self.client.register_schema(
            subject_name=subject,
            schema=Schema(schema_str, schema_type)
        )

    def get_schema(self, subject: str):
        return self.client.get_latest_version(subject)

    def check_compatibility(self, subject: str, new_schema: str, schema_type: str = 'AVRO') -> bool:
        # test_compatibility returns a plain bool
        return self.client.test_compatibility(
            subject_name=subject,
            schema=Schema(new_schema, schema_type)
        )

class AvroProducer:
    def __init__(self, bootstrap_servers: str, schema_registry_url: str, schema_str: str):
        client = SchemaRegistryClient({'url': schema_registry_url})
        self.producer = SerializingProducer({
            'bootstrap.servers': bootstrap_servers,
            'value.serializer': AvroSerializer(client, schema_str)
        })

    def send(self, topic: str, value: dict, key: str = None):
        self.producer.produce(topic=topic, value=value, key=key)
        self.producer.flush()

class AvroConsumer:
    def __init__(self, bootstrap_servers: str, schema_registry_url: str, group_id: str, topics: list):
        client = SchemaRegistryClient({'url': schema_registry_url})
        self.consumer = DeserializingConsumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            'value.deserializer': AvroDeserializer(client)
        })
        self.consumer.subscribe(topics)

    def consume(self):
        return self.consumer.poll(timeout=1.0)
Schema Evolution
class SchemaEvolutionManager:
    def __init__(self, schema_registry_client: SchemaRegistryClient):
        self.client = schema_registry_client

    def evolve_schema(self, subject: str, new_schema: str, schema_type: str = 'AVRO') -> dict:
        schema = Schema(new_schema, schema_type)
        if self.client.test_compatibility(subject_name=subject, schema=schema):
            schema_id = self.client.register_schema(
                subject_name=subject,
                schema=schema
            )
            return {
                'success': True,
                'schema_id': schema_id,
                'message': 'Schema registered successfully'
            }
        return {
            'success': False,
            'message': 'Schema incompatible with current version'
        }

    def rollback_schema(self, subject: str, target_version: int) -> int:
        # Re-registering an old version makes it the latest again
        old = self.client.get_version(subject, target_version)
        return self.client.register_schema(
            subject_name=subject,
            schema=old.schema
        )
Security
Kafka security encompasses authentication, authorization, and encryption. Modern deployments require comprehensive security configurations.
Security Configuration
class SecureKafkaProducer:
    def __init__(
        self,
        bootstrap_servers: list,
        security_protocol: str = 'SSL',
        ssl_cafile: str = 'ca.pem',
        ssl_certfile: str = 'service.crt',
        ssl_keyfile: str = 'service.key',
        sasl_mechanism: str = 'SCRAM-SHA-512',
        sasl_username: str = None,
        sasl_password: str = None
    ):
        # kafka-python takes snake_case keyword arguments, not dotted config keys
        config = {
            'bootstrap_servers': bootstrap_servers,
            'security_protocol': security_protocol,
            'ssl_cafile': ssl_cafile,
            'ssl_certfile': ssl_certfile,
            'ssl_keyfile': ssl_keyfile,
            'value_serializer': lambda v: json.dumps(v).encode('utf-8')
        }
        if sasl_username and sasl_password:
            config.update({
                'security_protocol': 'SASL_SSL',  # SASL credentials over TLS
                'sasl_mechanism': sasl_mechanism,
                'sasl_plain_username': sasl_username,
                'sasl_plain_password': sasl_password
            })
        self.producer = KafkaProducer(**config)

    def send_with_acl(self, topic: str, value: dict):
        future = self.producer.send(topic, value=value)
        return future.get(timeout=10)
ACL Management
from kafka.admin import (
    ACL, ACLOperation, ACLPermissionType,
    ResourcePattern, ResourceType
)

class KafkaACLManager:
    def __init__(self, admin_client: KafkaAdminClient):
        self.admin = admin_client

    def create_producer_acls(self, principal: str, topic: str):
        topic_resource = ResourcePattern(ResourceType.TOPIC, topic)
        # A producer needs WRITE plus CREATE/DESCRIBE on the topic
        acls = [
            ACL(
                principal=principal,
                host='*',
                operation=op,
                permission_type=ACLPermissionType.ALLOW,
                resource_pattern=topic_resource
            )
            for op in (ACLOperation.WRITE, ACLOperation.CREATE, ACLOperation.DESCRIBE)
        ]
        self.admin.create_acls(acls)

    def create_consumer_acls(self, principal: str, group: str, topic: str):
        # A consumer needs READ on both the topic and its consumer group
        acls = [
            ACL(
                principal=principal,
                host='*',
                operation=ACLOperation.READ,
                permission_type=ACLPermissionType.ALLOW,
                resource_pattern=ResourcePattern(ResourceType.TOPIC, topic)
            ),
            ACL(
                principal=principal,
                host='*',
                operation=ACLOperation.READ,
                permission_type=ACLPermissionType.ALLOW,
                resource_pattern=ResourcePattern(ResourceType.GROUP, group)
            )
        ]
        self.admin.create_acls(acls)
Operations and Monitoring
Operating Kafka requires understanding metrics, troubleshooting, and maintenance procedures.
Monitoring Setup
from prometheus_client import start_http_server, Counter, Gauge
import time
class KafkaMetrics:
def __init__(self):
self.messages_produced = Counter(
'kafka_messages_produced_total',
'Total messages produced',
['topic']
)
self.messages_consumed = Counter(
'kafka_messages_consumed_total',
'Total messages consumed',
['topic', 'group']
)
self.producer_latency = Gauge(
'kafka_producer_latency_seconds',
'Producer send latency',
['topic']
)
self.consumer_lag = Gauge(
'kafka_consumer_lag',
'Consumer lag in messages',
['topic', 'partition', 'group']
)
def record_produced(self, topic: str):
self.messages_produced.labels(topic=topic).inc()
def record_consumed(self, topic: str, group: str):
self.messages_consumed.labels(topic=topic, group=group).inc()
def update_producer_latency(self, topic: str, latency: float):
self.producer_latency.labels(topic=topic).set(latency)
def update_consumer_lag(self, topic: str, partition: int, group: str, lag: int):
self.consumer_lag.labels(
topic=topic,
partition=partition,
group=group
).set(lag)
class ConsumerMonitor:
def __init__(self, consumer: KafkaConsumer, metrics: KafkaMetrics):
self.consumer = consumer
self.metrics = metrics
    def monitor_lag(self):
        while True:
            for partition in self.consumer.assignment():
                committed = self.consumer.committed(partition) or 0
                # Lag is the log end offset minus the committed offset
                end_offset = self.consumer.end_offsets([partition])[partition]
                lag = max(end_offset - committed, 0)
                self.metrics.update_consumer_lag(
                    topic=partition.topic,
                    partition=partition.partition,
                    group=self.consumer.config['group_id'],
                    lag=lag
                )
            time.sleep(10)
Performance Tuning
class KafkaPerformanceTuner:
    @staticmethod
    def tune_producer(latency_ms: int = 10) -> dict:
return {
'batch.size': 16384,
'linger.ms': latency_ms,
'buffer.memory': 33554432,
'compression.type': 'lz4',
'max.in.flight.requests.per.connection': 5,
'retries': 3,
'acks': 'all',
'max.block.ms': 60000
}
    @staticmethod
    def tune_consumer() -> dict:
return {
'fetch.min.bytes': 1,
'fetch.max.wait.ms': 500,
'max.poll.records': 500,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 10000,
'heartbeat.interval.ms': 3000,
'enable.auto.commit': False,
'auto.offset.reset': 'earliest'
}
@staticmethod
def tune_stream_app() -> dict:
return {
'processing.guarantee': 'exactly_once_v2',
'commit.interval.ms': 1000,
'state.dir': '/var/lib/kafka/streams',
'num.stream.threads': 2,
'application.server': ''
}
Best Practices
Producer Best Practices
BEST_PRACTICES = {
"producers": [
"Use idempotent producers for exactly-once semantics",
"Set acks='all' for critical data",
"Implement retry logic with exponential backoff",
"Use compression to reduce network overhead",
"Batch messages for higher throughput",
"Monitor producer metrics: error-rate, queue-time, request-latency"
],
"consumers": [
"Always commit offsets after successful processing",
"Handle rebalances gracefully",
"Use consumer groups for parallel processing",
"Process records in batches for better throughput",
"Monitor consumer lag - should be near zero",
"Implement dead letter queue for failed messages"
],
"topics": [
"Use at least 3 partitions for parallelism",
"Size partitions based on throughput requirements",
"Use compacted topics for key-value data",
"Set appropriate retention policies",
"Monitor partition size and rebalance if needed"
],
"security": [
"Enable SSL/TLS for encryption in transit",
"Use SASL for authentication",
"Implement ACLs for authorization",
"Rotate credentials regularly",
"Enable audit logging"
]
}
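As one concrete example of the "retry logic with exponential backoff" item above, a small helper can wrap any synchronous send call. The delay values and the catch-all exception handling here are illustrative placeholders, not a prescription:

```python
import random
import time

def send_with_backoff(send_fn, max_retries: int = 3, base_delay: float = 0.5):
    """Call send_fn, retrying failures with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return send_fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the error to the caller
            # 0.5s, 1s, 2s, ... plus jitter to avoid synchronized retry storms
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            time.sleep(delay)
```

It would wrap a blocking send such as `send_with_backoff(lambda: producer.send(topic, value=event).get(timeout=10))`.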
Use Cases
Event Sourcing
class EventSourcingStore:
def __init__(self, bootstrap_servers: list):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id='event-sourcing-store',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest'
)
def append_event(self, aggregate_id: str, event: dict):
event['aggregate_id'] = aggregate_id
event['timestamp'] = time.time()
self.producer.send(
topic=f'events-{aggregate_id}',
value=event,
key=aggregate_id
)
self.producer.flush()
    def get_events(self, aggregate_id: str) -> list:
        self.consumer.subscribe([f'events-{aggregate_id}'])
        events = []
        # Poll until the topic is drained instead of iterating forever
        while True:
            records = self.consumer.poll(timeout_ms=1000)
            if not records:
                break
            for messages in records.values():
                events.extend(m.value for m in messages)
        return events
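The store above only appends; current state is recovered by replaying the events in order. A broker-free sketch of that fold, using a hypothetical account aggregate:

```python
def rebuild_balance(events: list) -> int:
    """Fold an ordered event stream into the aggregate's current state."""
    balance = 0
    for event in events:
        if event["type"] == "deposited":
            balance += event["amount"]
        elif event["type"] == "withdrawn":
            balance -= event["amount"]
    return balance

history = [
    {"type": "deposited", "amount": 100},
    {"type": "deposited", "amount": 50},
    {"type": "withdrawn", "amount": 30},
]
assert rebuild_balance(history) == 120  # state is derived from the log, never stored
```

Because per-key ordering holds within a partition, replaying an aggregate's events from offset zero always reproduces the same state.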
Change Data Capture
class CDCProcessor:
def __init__(self, bootstrap_servers: list):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def process_debezium_event(self, event: dict):
payload = event.get('payload', {})
if payload.get('op') == 'c':
self._handle_create(payload)
elif payload.get('op') == 'u':
self._handle_update(payload)
elif payload.get('op') == 'd':
self._handle_delete(payload)
def _handle_create(self, payload: dict):
after = payload.get('after', {})
self.producer.send('cdc-events', value=after)
def _handle_update(self, payload: dict):
before = payload.get('before', {})
after = payload.get('after', {})
change = {'before': before, 'after': after}
self.producer.send('cdc-events', value=change)
def _handle_delete(self, payload: dict):
before = payload.get('before', {})
self.producer.send('cdc-events', value={'deleted': before})
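For context, a Debezium change event wraps the operation code and the row images in its payload. The envelope below is hand-written and abbreviated (real events also carry schema, source, and transaction metadata), and the `route` helper mirrors the dispatch in `process_debezium_event`:

```python
# Abbreviated, hand-written Debezium-style update envelope.
update_event = {
    "payload": {
        "op": "u",                                  # c=create, u=update, d=delete
        "before": {"id": 7, "email": "old@example.com"},
        "after":  {"id": 7, "email": "new@example.com"},
        "source": {"table": "users"},
    }
}

def route(payload: dict) -> str:
    """Map a Debezium op code to a handler name, as CDCProcessor does."""
    return {"c": "create", "u": "update", "d": "delete"}.get(payload.get("op"), "skip")

assert route(update_event["payload"]) == "update"
```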
Resources
- Apache Kafka Documentation
- Kafka Streams Documentation
- Confluent Kafka Documentation
- Kafka Improvement Proposals
- Kafka Best Practices
Conclusion
Apache Kafka has become the backbone of modern event-driven architectures. Understanding its architecture, proper implementation patterns, and operational considerations is essential for building robust, scalable systems.
This guide covered the fundamentals of topics and partitions, producer and consumer implementation, Kafka Streams for stream processing, schema management, security configurations, and operational best practices. With this knowledge, you’re well-equipped to design and implement enterprise-grade event streaming systems.