⚡ Calmops

Message Brokers: RabbitMQ, Kafka, and Distributed Messaging

Introduction

Message brokers are the backbone of distributed systems, enabling asynchronous communication between services, decoupling producers from consumers, and providing reliability, scalability, and fault tolerance. Whether you’re building microservices, event-driven architectures, or real-time data pipelines, understanding message brokers is essential.

RabbitMQ and Apache Kafka are two of the most popular message brokers, each with distinct strengths and use cases. RabbitMQ excels at traditional message queuing with complex routing, while Kafka provides high-throughput event streaming with durable persistence.

Understanding Message Brokers

Why Use Message Brokers?

┌──────────────────────────────────────────────────────────┐
│               Synchronous vs Asynchronous                │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  Synchronous:                Asynchronous:               │
│                                                          │
│  Client ──────▶ Server       Client ──────▶ Message      │
│         ◀──────                             Broker       │
│                                               │          │
│                                               ▼          │
│                                             Worker       │
│                                                          │
│  - Blocking                  - Non-blocking              │
│  - Tight coupling            - Loose coupling            │
│  - Coupled availability      - Independent scaling       │
│  - Simple                    - More complex              │
│                                                          │
└──────────────────────────────────────────────────────────┘

Key Concepts

Concept        Description
-------------  -----------------------------------
Producer       Application that sends messages
Consumer       Application that receives messages
Queue          Message storage for FIFO processing
Topic/Subject  Message category for pub/sub
Exchange       Routing mechanism (RabbitMQ)
Partition      Kafka's unit of parallelism
Offset         Consumer position in a partition
Broker         Message server instance

RabbitMQ

RabbitMQ implements the AMQP (Advanced Message Queuing Protocol) and provides sophisticated routing capabilities through exchanges, bindings, and queues.

Architecture

┌──────────────────────────────────────────────────────────┐
│                  RabbitMQ Architecture                   │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  Producer                                                │
│     │                                                    │
│     ▼                                                    │
│  ┌──────────┐  binding   ┌─────────┐                     │
│  │ Exchange │───────────▶│ Queue 1 │──────▶ Consumer 1   │
│  └──────────┘            └─────────┘                     │
│     │                                                    │
│     │          binding   ┌─────────┐                     │
│     └───────────────────▶│ Queue 2 │──────▶ Consumer 2   │
│                          └─────────┘                     │
│                                                          │
│  Durable queues persist messages to disk                 │
│                                                          │
│  Exchange types:                                         │
│  - direct:  exact routing-key match                      │
│  - fanout:  all bound queues                             │
│  - topic:   pattern matching on routing key              │
│  - headers: header attribute matching                    │
│                                                          │
└──────────────────────────────────────────────────────────┘
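The topic exchange's pattern rules are worth internalizing: `*` matches exactly one dot-separated word and `#` matches zero or more. A broker-free sketch of those matching semantics (this is an illustration, not RabbitMQ's actual implementation):

```python
def topic_match(pattern: str, routing_key: str) -> bool:
    """Does a routing key match a topic-exchange binding pattern?
    '*' matches exactly one word; '#' matches zero or more words."""
    def match(p: list, k: list) -> bool:
        if not p:
            return not k
        if p[0] == '#':
            # '#' may swallow zero or more words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return p[0] in ('*', k[0]) and match(p[1:], k[1:])
    return match(pattern.split('.'), routing_key.split('.'))

print(topic_match('orders.*.created', 'orders.eu.created'))  # True
print(topic_match('orders.#', 'orders.eu.created.v2'))       # True
print(topic_match('orders.*', 'orders.eu.created'))          # False
```

A binding of `orders.#` therefore behaves like a per-prefix fanout, while `orders.*` only sees two-word keys.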

Python Implementation

import pika
import json
import time
from typing import Callable, Any
import threading

class RabbitMQClient:
    def __init__(self, host: str = 'localhost', port: int = 5672,
                 username: str = 'guest', password: str = 'guest'):
        self.host = host
        self.port = port
        self.credentials = pika.PlainCredentials(username, password)
        self.connection = None
        self.channel = None
    
    def connect(self):
        """Establish connection to RabbitMQ."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                credentials=self.credentials,
                heartbeat=600,
                blocked_connection_timeout=300,
            )
        )
        self.channel = self.connection.channel()
        return self
    
    def declare_exchange(self, exchange_name: str, exchange_type: str = 'direct',
                         durable: bool = True):
        """Declare an exchange."""
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=exchange_type,
            durable=durable
        )
        return self
    
    def declare_queue(self, queue_name: str, durable: bool = True,
                      exclusive: bool = False, auto_delete: bool = False):
        """Declare a queue."""
        self.channel.queue_declare(
            queue=queue_name,
            durable=durable,
            exclusive=exclusive,
            auto_delete=auto_delete
        )
        return self
    
    def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str = ''):
        """Bind queue to exchange."""
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key
        )
        return self
    
    def publish(self, exchange: str, routing_key: str, message: Any,
                persistent: bool = True):
        """Publish message to exchange."""
        properties = pika.BasicProperties(
            delivery_mode=2 if persistent else 1,  # 2 = persistent, 1 = transient
            content_type='application/json',
        )
        
        if isinstance(message, (dict, list)):
            message = json.dumps(message)
        
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=message,
            properties=properties
        )
        return self
    
    def consume(self, queue_name: str, callback: Callable, auto_ack: bool = False):
        """Start consuming messages."""
        self.channel.basic_qos(prefetch_count=1)
        
        def wrapped_callback(ch, method, properties, body):
            try:
                message = json.loads(body)
                callback(message, ch, method, properties)
                
                if not auto_ack:
                    ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                print(f"Error processing message: {e}")
                if not auto_ack:
                    # requeue=True redelivers immediately; route repeated
                    # failures to a dead-letter exchange to avoid a hot loop
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=wrapped_callback,
            auto_ack=auto_ack
        )
        
        print(f'Started consuming from queue: {queue_name}')
        self.channel.start_consuming()
    
    def close(self):
        """Close connection."""
        if self.connection and not self.connection.is_closed:
            self.connection.close()


class TaskQueue:
    """Simple task queue implementation."""
    
    def __init__(self, client: RabbitMQClient):
        self.client = client
        self.exchange = 'tasks'
        self.queue = 'task_queue'
    
    def setup(self):
        """Setup exchange and queue."""
        self.client.declare_exchange(self.exchange, 'direct') \
                   .declare_queue(self.queue, durable=True) \
                   .bind_queue(self.queue, self.exchange, 'task')
    
    def enqueue(self, task: dict):
        """Add task to queue."""
        task['enqueued_at'] = time.time()
        self.client.publish(self.exchange, 'task', task, persistent=True)
    
    def process(self, callback: Callable):
        """Process tasks from queue."""
        def handler(message, ch, method, properties):
            print(f"Processing task: {message}")
            callback(message)
        
        self.client.consume(self.queue, handler)


# Usage
client = RabbitMQClient(host='localhost')
client.connect()

# Setup task queue
task_queue = TaskQueue(client)
task_queue.setup()

# Publish tasks
for i in range(5):
    task_queue.enqueue({'task_id': i, 'type': 'process_data'})

# Process tasks
task_queue.process(lambda task: print(f"Done: {task}"))

RabbitMQ with Python asyncio

import asyncio
import aio_pika
import json
from typing import Callable

class AsyncRabbitMQ:
    def __init__(self, url: str):
        self.url = url
        self.connection = None
        self.channel = None
    
    async def connect(self):
        self.connection = await aio_pika.connect_robust(self.url)
        self.channel = await self.connection.channel()
        return self
    
    async def declare_queue(self, name: str, durable: bool = True):
        return await self.channel.declare_queue(name, durable=durable)
    
    async def publish(self, queue_name: str, message: dict):
        queue = await self.channel.declare_queue(queue_name, durable=True)
        
        await self.channel.default_exchange.publish(
            aio_pika.Message(
                body=json.dumps(message).encode(),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            ),
            routing_key=queue_name,
        )
    
    async def consume(self, queue_name: str, callback: Callable):
        queue = await self.channel.declare_queue(queue_name, durable=True)
        
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    data = json.loads(message.body)
                    await callback(data)
    
    async def close(self):
        await self.connection.close()


async def main():
    client = await AsyncRabbitMQ('amqp://guest:guest@localhost/').connect()
    
    # Publish
    await client.publish('tasks', {'id': 1, 'action': 'process'})
    
    # Consume in the background: queue.iterator() blocks until cancelled,
    # and the callback must be a coroutine because consume() awaits it
    async def handle(msg):
        print(f"Received: {msg}")
    
    consumer_task = asyncio.create_task(client.consume('tasks', handle))
    
    await asyncio.sleep(10)
    consumer_task.cancel()
    await client.close()

asyncio.run(main())

Apache Kafka

Apache Kafka is a distributed event streaming platform capable of handling trillions of messages per day. It provides durability, horizontal scalability, and exactly-once semantics.

Architecture

┌──────────────────────────────────────────────────────────┐
│                    Kafka Architecture                    │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │                   Kafka Cluster                    │  │
│  │  ┌──────────┐   ┌──────────┐   ┌──────────┐        │  │
│  │  │ Broker 1 │   │ Broker 2 │   │ Broker 3 │        │  │
│  │  └──────────┘   └──────────┘   └──────────┘        │  │
│  │                                                    │  │
│  │  (each partition has one leader broker + replicas) │  │
│  └────────────────────────────────────────────────────┘  │
│                                                          │
│  Topic: orders                                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│  │ Partition 0 │  │ Partition 1 │  │ Partition 2 │       │
│  │ offsets     │  │ offsets     │  │ offsets     │       │
│  │ 0..999      │  │ 0..888      │  │ 0..777      │       │
│  └─────────────┘  └─────────────┘  └─────────────┘       │
│                                                          │
│  Consumer group (each partition is assigned              │
│  to exactly one consumer in the group):                  │
│                                                          │
│  Partition 0 ──────▶ Consumer 1                          │
│  Partition 1 ──────▶ Consumer 2                          │
│  Partition 2 ──────▶ Consumer 3                          │
│                                                          │
└──────────────────────────────────────────────────────────┘
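The "one consumer per partition" rule in the diagram can be made concrete. The sketch below shows only the invariant; Kafka's real assignors (range, round-robin, sticky/cooperative) additionally handle multiple topics and rebalances:

```python
def assign_partitions(partitions: list, consumers: list) -> dict:
    """Assign each partition to exactly one consumer, round-robin.
    Extra consumers beyond the partition count stay idle."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

print(assign_partitions([0, 1, 2], ['c1', 'c2', 'c3']))
# {'c1': [0], 'c2': [1], 'c3': [2]}
print(assign_partitions([0, 1, 2], ['c1', 'c2', 'c3', 'c4']))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []} -- c4 sits idle
```

This is why running more consumers than partitions buys nothing: parallelism is capped by the partition count.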

Python Implementation

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import time
from typing import Callable, Any
from datetime import datetime

class KafkaClient:
    def __init__(self, bootstrap_servers: list[str]):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        self.consumer = None
    
    def create_producer(self, value_serializer=None, key_serializer=None,
                       acks='all', retries=3):
        """Create Kafka producer."""
        if value_serializer is None:
            value_serializer = lambda v: json.dumps(v).encode('utf-8')
        if key_serializer is None:
            key_serializer = lambda k: k.encode('utf-8') if k else None
        
        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=value_serializer,
            key_serializer=key_serializer,
            acks=acks,
            retries=retries,
            max_in_flight_requests_per_connection=5,
            compression_type='gzip',
            linger_ms=10,
            batch_size=16384,
        )
        return self
    
    def send(self, topic: str, value: Any, key: str = None,
             partition: int = None):
        """Send message to topic and block until the broker acknowledges."""
        future = self.producer.send(
            topic=topic,
            value=value,
            key=key,
            partition=partition,
            timestamp_ms=int(time.time() * 1000)
        )
        
        # Wait for send to complete
        try:
            record_metadata = future.get(timeout=10)
            print(f"Sent to {record_metadata.topic}:{record_metadata.partition} "
                  f"offset {record_metadata.offset}")
            return record_metadata
        except KafkaError as e:
            print(f"Send failed: {e}")
            raise
    
    def send_async(self, topic: str, value: Any, key: str = None,
                   on_success: Callable = None, on_error: Callable = None):
        """Send message asynchronously (callbacks fire on completion)."""
        future = self.producer.send(topic=topic, value=value, key=key)
        # kafka-python invokes add_callback with the RecordMetadata and
        # add_errback with the exception -- two separate handlers
        future.add_callback(on_success or (lambda md: print(
            f"Sent to {md.topic}:{md.partition}")))
        future.add_errback(on_error or (lambda exc: print(
            f"Send failed: {exc}")))
    
    def flush(self):
        """Flush pending messages."""
        self.producer.flush()
    
    def close(self):
        """Close producer."""
        if self.producer:
            self.producer.close()


class KafkaConsumerClient:
    def __init__(self, bootstrap_servers: list[str], group_id: str):
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.consumer = None
    
    def create_consumer(self, topics: list[str], value_deserializer=None,
                        auto_offset_reset='earliest', enable_auto_commit=True):
        """Create Kafka consumer."""
        if value_deserializer is None:
            value_deserializer = lambda v: json.loads(v.decode('utf-8'))
        
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id,
            value_deserializer=value_deserializer,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=enable_auto_commit,
            auto_commit_interval_ms=5000,
            consumer_timeout_ms=float('inf'),  # block forever while iterating
            max_poll_records=500,
        )
        return self
    
    def consume(self, callback: Callable):
        """Start consuming messages."""
        try:
            for message in self.consumer:
                print(f"Topic: {message.topic}, "
                      f"Partition: {message.partition}, "
                      f"Offset: {message.offset}, "
                      f"Key: {message.key}, "
                      f"Value: {message.value}")
                
                callback(message.value, message)
        except KeyboardInterrupt:
            print("Stopped consuming")
        finally:
            self.close()
    
    def consume_batch(self, max_records: int = 100):
        """Consume messages in batches."""
        while True:
            records = self.consumer.poll(timeout_ms=1000, max_records=max_records)
            
            for topic_partition, messages in records.items():
                for message in messages:
                    yield message
    
    def seek_to_beginning(self, topic: str, partition: int = 0):
        """Seek to the beginning of a specific partition
        (the partition must currently be assigned to this consumer)."""
        from kafka import TopicPartition
        self.consumer.seek_to_beginning(TopicPartition(topic, partition))
    
    def close(self):
        """Close consumer."""
        if self.consumer:
            self.consumer.close()


# Usage
kafka = KafkaClient(['localhost:9092'])
kafka.create_producer()

# Send messages
for i in range(100):
    kafka.send('orders', {'order_id': i, 'amount': 100 * i}, key=f'order_{i}')

kafka.flush()
kafka.close()

# Consume messages
consumer = KafkaConsumerClient(['localhost:9092'], group_id='order-processor')
consumer.create_consumer(['orders'])

def process_order(order, message):
    print(f"Processing order: {order}")

consumer.consume(process_order)
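The `key=f'order_{i}'` arguments above are what pin a key to a partition: the producer hashes the key and takes it modulo the partition count, so one key always lands on one partition, which is what gives per-key ordering. Real Kafka clients hash with murmur2; the sketch below substitutes CRC-32 only to stay dependency-free and deterministic:

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Simplified keyed partitioner: hash the key, mod the partition count.
    (Kafka's default partitioner uses murmur2, not CRC-32.)"""
    return zlib.crc32(key) % num_partitions

# The same key always maps to the same partition...
assert partition_for(b'order_42', 3) == partition_for(b'order_42', 3)
# ...but changing the partition count remaps keys, which is why
# adding partitions to a topic breaks existing per-key ordering.
```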

Kafka Streams

Kafka Streams is a Java/Scala library; there is no official Python port. The equivalent pattern in Python is a consume-transform-produce loop (or a dedicated framework such as Faust):

from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json

class OrderProcessor:
    """Consume orders, enrich them, and produce to an output topic."""
    
    def __init__(self, bootstrap_servers: list[str]):
        self.consumer = KafkaConsumer(
            'orders',
            bootstrap_servers=bootstrap_servers,
            group_id='order-processor',
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        )
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        )
    
    def run(self):
        """Transform each order and forward it to 'orders-processed'."""
        for message in self.consumer:
            order = message.value
            order['total'] = order['quantity'] * order['price']
            order['processed_at'] = datetime.now().isoformat()
            self.producer.send('orders-processed', order)
    
    def stop(self):
        self.consumer.close()
        self.producer.close()

# Advanced Kafka patterns

class KafkaExactlyOnce:
    """Exactly-once consume-process-produce using transactions.
    
    kafka-python does not implement transactions; this sketch uses the
    confluent-kafka client, which does."""
    
    def __init__(self, bootstrap_servers: str):
        from confluent_kafka import Producer, Consumer
        
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'transactional.id': 'order-processor-1',
            'enable.idempotence': True,
            'acks': 'all',
        })
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'transaction-processor',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,  # offsets commit with the transaction
            'isolation.level': 'read_committed',
        })
    
    def process_with_transactions(self, input_topic: str, output_topic: str):
        """Commit each output record and its input offset atomically."""
        self.consumer.subscribe([input_topic])
        self.producer.init_transactions()
        
        while True:
            msg = self.consumer.poll(1.0)
            if msg is None or msg.error():
                continue
            
            # One transaction per message: the produced record and the
            # consumed offset commit (or abort) together
            self.producer.begin_transaction()
            try:
                self.producer.produce(output_topic, self.process_message(msg.value()))
                self.producer.send_offsets_to_transaction(
                    self.consumer.position(self.consumer.assignment()),
                    self.consumer.consumer_group_metadata(),
                )
                self.producer.commit_transaction()
            except Exception:
                self.producer.abort_transaction()
                raise
    
    def process_message(self, value: bytes) -> bytes:
        return value

Comparison: RabbitMQ vs Kafka

Feature             RabbitMQ                        Apache Kafka
------------------  ------------------------------  --------------------------------
Protocol            AMQP, STOMP, MQTT               Kafka binary protocol
Message ordering    Per-queue                       Per-partition
Message retention   Until acknowledged              Configurable (time/size based)
Delivery semantics  At-least-once, at-most-once     At-least-once, exactly-once
Latency             Low (sub-millisecond possible)  Low (typically a few ms)
Throughput          Up to ~100K msg/s per node      Millions of msg/s across a cluster
Message size        Default limit 128 MB            Default limit 1 MB
Scaling             Clustering, queue sharding      Adding partitions/brokers
Use case            Task queues, RPC                Event streaming, log aggregation
Complexity          Lower                           Higher

Design Patterns

1. Competing Consumers

import threading
from kafka import KafkaConsumer

# Multiple consumers in the same group compete for partitions
class CompetingConsumer:
    def __init__(self, consumer_id: int, topic: str):
        self.consumer_id = consumer_id
        self.topic = topic
    
    def process(self):
        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=['localhost:9092'],
            group_id='order-processors',  # Same group = competing
            auto_offset_reset='latest',
        )
        
        for message in consumer:
            print(f"Consumer {self.consumer_id} processing: {message.value}")

# Run multiple instances
for i in range(3):
    threading.Thread(target=CompetingConsumer(i, 'orders').process).start()

2. Publish-Subscribe

# RabbitMQ pub/sub with exchanges
class PubSub:
    def __init__(self, exchange: str):
        self.exchange = exchange
        self.client = RabbitMQClient()
        self.client.connect()
        self.client.declare_exchange(exchange, 'fanout')
    
    def publish(self, message: dict):
        self.client.publish(self.exchange, '', message)
    
    def subscribe(self, queue: str, callback: Callable):
        self.client.declare_queue(queue, auto_delete=True) \
                   .bind_queue(queue, self.exchange, '')
        self.client.consume(queue, callback)

3. Request-Reply

import json
import time
import uuid
import pika

class RequestReply:
    def __init__(self):
        self.client = RabbitMQClient()
        self.client.connect()
    
    def request(self, queue: str, message: dict, timeout: int = 30) -> dict:
        # Exclusive, server-named queue for replies
        result = self.client.channel.queue_declare(queue='', exclusive=True)
        reply_queue = result.method.queue
        correlation_id = str(uuid.uuid4())
        response = None
        
        def on_response(ch, method, props, body):
            nonlocal response
            if props.correlation_id == correlation_id:
                response = json.loads(body)
        
        self.client.channel.basic_consume(
            queue=reply_queue,
            on_message_callback=on_response,
            auto_ack=True
        )
        
        self.client.channel.basic_publish(
            exchange='',
            routing_key=queue,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                reply_to=reply_queue,
                correlation_id=correlation_id,
            )
        )
        
        # Drive I/O until the reply arrives or the timeout expires
        deadline = time.time() + timeout
        while response is None and time.time() < deadline:
            self.client.connection.process_data_events(time_limit=1)
        return response

4. Event Sourcing

import json
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer

class EventStore:
    """Store events in Kafka for event sourcing."""
    
    def __init__(self, bootstrap_servers: list[str]):
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
    
    def append_event(self, aggregate_id: str, event_type: str, data: dict):
        """Append event to event stream."""
        event = {
            'aggregate_id': aggregate_id,
            'event_type': event_type,
            'data': data,
            'timestamp': datetime.now().isoformat(),
            'version': self.get_version(aggregate_id) + 1,
        }
        
        self.producer.send(
            'events',
            key=aggregate_id.encode(),
            value=event
        )
    
    def get_version(self, aggregate_id: str) -> int:
        """Get current version of aggregate.
        
        Simplified: a real implementation would replay the aggregate's
        events (or consult a snapshot/state store) to find the latest
        version, rather than returning a constant.
        """
        return 0
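Reading the stream back is the other half of event sourcing: current state is rebuilt by folding events in version order. A broker-free sketch (the event types here are hypothetical):

```python
def replay(events: list) -> dict:
    """Rebuild aggregate state by applying events in version order."""
    state = {}
    for event in sorted(events, key=lambda e: e['version']):
        if event['event_type'] == 'OrderCreated':
            state = {'order_id': event['data']['order_id'], 'items': []}
        elif event['event_type'] == 'ItemAdded':
            state['items'].append(event['data']['item'])
        elif event['event_type'] == 'OrderCancelled':
            state['cancelled'] = True
    return state

events = [
    {'version': 1, 'event_type': 'OrderCreated', 'data': {'order_id': 'A1'}},
    {'version': 2, 'event_type': 'ItemAdded', 'data': {'item': 'book'}},
]
print(replay(events))  # {'order_id': 'A1', 'items': ['book']}
```

Because Kafka keys the events by `aggregate_id`, all events for one aggregate land on one partition and are replayed in the order they were appended.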

Monitoring and Operations

Key Metrics

Metric            RabbitMQ                 Kafka
----------------  -----------------------  -------------------------
Queue depth       queue.messages           Consumer lag
Connection count  connections              connected.consumer.count
Message rate      message_stats.publish    messages.in.per.sec
Error rate        channel.channel_errors   failed.fetch.requests
Memory usage      memory                   heap.used

# Prometheus metrics for Kafka
from prometheus_client import Counter, Histogram, start_http_server

orders_processed = Counter('orders_processed_total', 'Total orders processed')
order_processing_time = Histogram('order_processing_seconds', 'Order processing time')

def process_order(order):
    with order_processing_time.time():
        # Process order
        orders_processed.inc()

if __name__ == '__main__':
    start_http_server(8000)
    # Run consumer
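Consumer lag, the single most important Kafka health metric in the table above, is just the gap between the log end offset and the group's committed offset, summed over partitions:

```python
def consumer_lag(end_offsets: dict, committed: dict) -> int:
    """Total lag = sum over partitions of (log end offset - committed offset).
    A partition with no committed offset counts as fully behind."""
    return sum(end - committed.get(p, 0) for p, end in end_offsets.items())

# Partition 0 is 10 messages behind, partition 1 is 8 behind
print(consumer_lag({0: 1000, 1: 888}, {0: 990, 1: 880}))  # 18
```

A lag that grows without bound means consumers cannot keep up and either processing must speed up or partitions (and consumers) must be added.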

Conclusion

Message brokers are essential for building scalable, distributed systems. RabbitMQ provides flexible routing and is ideal for task queues and traditional message processing, while Kafka excels at high-throughput event streaming and log aggregation.

Key takeaways:

  • Choose RabbitMQ for complex routing, low latency, and traditional queuing
  • Choose Kafka for event streaming, log aggregation, and high throughput
  • Implement proper error handling and retry mechanisms
  • Monitor queue depth and consumer lag
  • Consider exactly-once semantics for critical data
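For the retry bullet above, a common approach is exponential backoff with jitter between redelivery attempts, dead-lettering the message once the schedule is exhausted. A minimal schedule generator (the parameter values are illustrative, not a recommendation):

```python
import random

def backoff_delays(base: float = 0.5, factor: float = 2.0,
                   max_delay: float = 30.0, attempts: int = 5) -> list:
    """Exponential backoff schedule, capped and jittered.
    Sleep delays[n] before redelivery attempt n; dead-letter after the last."""
    delays = []
    for attempt in range(attempts):
        delay = min(base * (factor ** attempt), max_delay)
        delays.append(delay * random.uniform(0.5, 1.0))  # jitter spreads retries
    return delays

# Base schedule before jitter: 0.5s, 1s, 2s, 4s, 8s
```

The jitter matters: without it, every consumer that failed on the same broker hiccup retries at the same instant and causes another spike.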
