Skip to main content
⚡ Calmops

Message Queues: RabbitMQ, Kafka, and Async Communication

Introduction

Message queues enable asynchronous communication between services, providing decoupling, buffering, and reliability. This guide covers RabbitMQ patterns and Kafka streaming for building event-driven systems.

RabbitMQ Patterns

import pika
import json
from typing import Callable, Dict, Any

class RabbitMQPublisher:
    """Publish persistent JSON messages to a durable RabbitMQ queue.

    Call connect() before publish(); call close() when finished.
    """

    # Highest priority level the queue supports. Without declaring
    # x-max-priority on the queue, RabbitMQ silently ignores the
    # per-message priority set in publish().
    MAX_PRIORITY = 10

    def __init__(self, host: str, queue: str):
        self.host = host
        self.queue = queue
        self.connection = None
        self.channel = None

    def connect(self):
        """Open a blocking connection and declare the durable queue."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.host)
        )
        self.channel = self.connection.channel()
        # Declare with x-max-priority so the `priority` argument to
        # publish() actually takes effect. NOTE: if the queue already
        # exists with different arguments, the broker rejects this
        # declare (PRECONDITION_FAILED) — delete/recreate the queue.
        self.channel.queue_declare(
            queue=self.queue,
            durable=True,
            arguments={"x-max-priority": self.MAX_PRIORITY},
        )

    def publish(self, message: Dict[str, Any], priority: int = 0):
        """Publish `message` as JSON with priority 0..MAX_PRIORITY.

        Raises RuntimeError if connect() has not been called yet.
        """
        if self.channel is None:
            raise RuntimeError("publish() called before connect()")
        self.channel.basic_publish(
            exchange="",
            routing_key=self.queue,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent: survives a broker restart
                priority=priority
            )
        )

    def close(self):
        """Close the connection if one was opened."""
        if self.connection:
            self.connection.close()

class RabbitMQConsumer:
    """Blocking consumer that reads from a durable RabbitMQ queue."""

    def __init__(self, host: str, queue: str):
        self.host = host
        self.queue = queue
        self.connection = None
        self.channel = None

    def connect(self):
        """Establish the connection and make sure the queue exists."""
        params = pika.ConnectionParameters(self.host)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue, durable=True)

    def consume(self, callback: Callable, prefetch_count: int = 1):
        """Block indefinitely, handing each delivery to `callback`.

        `prefetch_count` caps how many unacknowledged messages the
        broker will push to this consumer at once.
        """
        channel = self.channel
        channel.basic_qos(prefetch_count=prefetch_count)
        channel.basic_consume(
            queue=self.queue,
            on_message_callback=callback,
        )
        channel.start_consuming()

    def close(self):
        """Tear down the connection when one was opened."""
        if self.connection:
            self.connection.close()

# Usage: publish a single order event to the "orders" queue.
publisher = RabbitMQPublisher("localhost", "orders")
publisher.connect()  # opens the connection and declares the durable queue
publisher.publish({"order_id": "123", "total": 99.99})

Kafka Streaming

from kafka import KafkaProducer, KafkaConsumer
import json

class KafkaEventProducer:
    """Kafka producer that sends JSON-encoded events synchronously.

    acks="all" means a send is only confirmed once every in-sync
    replica has appended the record.
    """

    def __init__(self, bootstrap_servers: str):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode(),
            # Keys may be None (unkeyed records) — only encode real keys.
            key_serializer=lambda k: k.encode() if k else None,
            acks="all"
        )

    def send(self, topic: str, key: str, value: Dict, timeout: float = 10):
        """Send one event and block until the broker acknowledges it.

        `timeout` (seconds) bounds the wait for the acknowledgement;
        the previous hard-coded 10s is kept as the default. Raises the
        send error (via future.get) on failure or timeout. Returns the
        broker's record metadata for the appended record.
        """
        future = self.producer.send(topic, key=key, value=value)
        # Blocking here surfaces delivery errors immediately instead of
        # leaving them to be discovered later at flush time.
        return future.get(timeout=timeout)

    def close(self):
        """Flush any pending records, then release the producer."""
        self.producer.flush()
        self.producer.close()

class KafkaEventConsumer:
    """Kafka consumer that participates in a consumer group."""

    def __init__(self, bootstrap_servers: str, group_id: str, topics: list):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda raw: json.loads(raw.decode()),
            auto_offset_reset="earliest",  # new groups start at the beginning
            enable_auto_commit=True
        )

    def consume(self, callback: Callable):
        """Iterate messages forever, forwarding each to `callback`.

        The callback is invoked with keyword arguments: topic,
        partition, offset, key, and value.
        """
        for record in self.consumer:
            callback(
                topic=record.topic,
                partition=record.partition,
                offset=record.offset,
                key=record.key,
                value=record.value
            )

    def close(self):
        """Shut down the consumer and leave the group."""
        self.consumer.close()

# Usage: produce one event, then consume from the same topic.
producer = KafkaEventProducer("localhost:9092")
producer.send("order-events", "order-123", {
    "event": "order.created",
    "order_id": "123",
    "total": 99.99
})

consumer = KafkaEventConsumer(
    "localhost:9092",
    "order-processors",
    ["order-events"]
)
# consume() invokes the callback with keyword arguments (topic,
# partition, offset, key, value), so the callback must accept them;
# the original single-positional-argument lambda raised TypeError.
consumer.consume(lambda **msg: print(f"Received: {msg}"))

Conclusion

Message queues enable async communication and service decoupling. Use RabbitMQ for traditional queuing with complex routing. Use Kafka for high-throughput event streaming. Implement consumer groups for scalability.

Resources

  • RabbitMQ Documentation
  • Kafka Documentation

Comments