Introduction
Message queues enable asynchronous communication between services, providing decoupling, buffering, and reliability. This guide covers RabbitMQ patterns and Kafka streaming for building event-driven systems.
RabbitMQ Patterns
import pika
import json
from typing import Callable, Dict, Any
class RabbitMQPublisher:
"""RabbitMQ message publisher."""
def __init__(self, host: str, queue: str):
self.host = host
self.queue = queue
self.connection = None
self.channel = None
def connect(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(self.host)
)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue, durable=True)
def publish(self, message: Dict[str, Any], priority: int = 0):
self.channel.basic_publish(
exchange="",
routing_key=self.queue,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
priority=priority
)
)
def close(self):
if self.connection:
self.connection.close()
class RabbitMQConsumer:
"""RabbitMQ message consumer."""
def __init__(self, host: str, queue: str):
self.host = host
self.queue = queue
self.connection = None
self.channel = None
def connect(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(self.host)
)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue, durable=True)
def consume(self, callback: Callable, prefetch_count: int = 1):
self.channel.basic_qos(prefetch_count=prefetch_count)
self.channel.basic_consume(
queue=self.queue,
on_message_callback=callback
)
self.channel.start_consuming()
def close(self):
if self.connection:
self.connection.close()
# Usage
publisher = RabbitMQPublisher("localhost", "orders")
publisher.connect()
publisher.publish({"order_id": "123", "total": 99.99})
Kafka Streaming
from kafka import KafkaProducer, KafkaConsumer
import json
class KafkaEventProducer:
"""Kafka event producer."""
def __init__(self, bootstrap_servers: str):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode(),
key_serializer=lambda k: k.encode() if k else None,
acks="all"
)
def send(self, topic: str, key: str, value: Dict):
future = self.producer.send(topic, key=key, value=value)
record = future.get(timeout=10)
return record
def close(self):
self.producer.flush()
self.producer.close()
class KafkaEventConsumer:
"""Kafka consumer with group management."""
def __init__(self, bootstrap_servers: str, group_id: str, topics: list):
self.consumer = KafkaConsumer(
*topics,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
value_deserializer=lambda v: json.loads(v.decode()),
auto_offset_reset="earliest",
enable_auto_commit=True
)
def consume(self, callback: Callable):
for message in self.consumer:
callback(
topic=message.topic,
partition=message.partition,
offset=message.offset,
key=message.key,
value=message.value
)
def close(self):
self.consumer.close()
# Usage
producer = KafkaEventProducer("localhost:9092")
producer.send("order-events", "order-123", {
"event": "order.created",
"order_id": "123",
"total": 99.99
})
consumer = KafkaEventConsumer(
"localhost:9092",
"order-processors",
["order-events"]
)
consumer.consume(lambda msg: print(f"Received: {msg}"))
Conclusion
Message queues enable async communication and service decoupling. Use RabbitMQ for traditional queuing with complex routing. Use Kafka for high-throughput event streaming. Implement consumer groups for scalability.
Resources
- RabbitMQ Documentation
- Kafka Documentation
Comments