Introduction
Message queues are the backbone of asynchronous communication in modern distributed systems. They let services communicate without blocking on each other, absorb load spikes gracefully, and recover from downstream failures. This guide covers message queue concepts, implementation patterns, and how to choose the right tool.
Message queues decouple producers from consumers in time: a producer can publish while the consumer is offline, and the consumer can drain the backlog when it returns, so transient failures degrade throughput instead of losing work.
Core Concepts
Message Queue Patterns
┌───────────────────────────────────────────────────────┐
│                Point-to-Point Pattern                 │
├───────────────────────────────────────────────────────┤
│                                                       │
│   Producer ──────▶ Queue ──────▶ Consumer             │
│                      ▲               │                │
│                      └───────────────┘                │
│                       acknowledgment                  │
│                                                       │
└───────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────┐
│               Publish-Subscribe Pattern               │
├───────────────────────────────────────────────────────┤
│                                                       │
│   Publisher ──▶ Topic ──┬──▶ Subscriber A             │
│                         ├──▶ Subscriber B             │
│                         └──▶ Subscriber C             │
│                                                       │
└───────────────────────────────────────────────────────┘
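The difference is delivery semantics: in point-to-point, each message is consumed by exactly one consumer (competing consumers can share a queue to scale out processing), while in publish-subscribe every subscriber of the topic receives its own copy of the message.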
Message Structure
# Well-structured message
message = {
    "message_id": "msg_123_abc",
    "correlation_id": "corr_456",   # For tracing
    "causation_id": "cmd_789",      # What caused this
    "timestamp": "2026-03-12T10:30:00Z",
    "message_type": "OrderCreated",
    "version": "1.0",
    "source": "order-service",
    "payload": {
        "order_id": "ord_123",
        "customer_id": "cust_456",
        "items": [...],
        "total": 199.99
    },
    "metadata": {
        "trace_id": "trace_abc",
        "user_id": "user_789"
    }
}
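Rather than hand-assembling these fields at every call site, a small factory can stamp each message consistently. A minimal sketch; the build_message helper and its defaults are illustrative, not part of any library:

import uuid
from datetime import datetime, timezone

def build_message(message_type, payload, source,
                  correlation_id=None, causation_id=None):
    """Assemble a message envelope with a generated ID and timestamp."""
    return {
        "message_id": f"msg_{uuid.uuid4().hex}",
        "correlation_id": correlation_id or f"corr_{uuid.uuid4().hex}",
        "causation_id": causation_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "message_type": message_type,
        "version": "1.0",
        "source": source,
        "payload": payload,
        "metadata": {},
    }

# Usage
msg = build_message("OrderCreated", {"order_id": "ord_123"}, "order-service")

Generating IDs and timestamps in one place keeps every producer's envelopes uniform, which pays off when you add tracing or idempotency checks later.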
RabbitMQ
Installation and Configuration
# docker-compose.yml
version: '3'
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

volumes:
  rabbitmq_data:
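The -management image also serves a web UI at http://localhost:15672 (log in with the credentials configured above), which is handy for inspecting queues, bindings, and message rates during development.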
Publishing Messages
import pika
import json
import time

class RabbitMQPublisher:
    def __init__(self, host="localhost", username="guest", password="guest"):
        credentials = pika.PlainCredentials(username, password)
        params = pika.ConnectionParameters(
            host=host,
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()

    def publish(self, exchange, routing_key, message, persistent=True):
        """Publish a message to an exchange."""
        properties = pika.BasicProperties(
            delivery_mode=2 if persistent else 1,  # 2 = persistent, 1 = transient
            content_type="application/json",
            message_id=message.get("message_id"),
            correlation_id=message.get("correlation_id"),
            timestamp=int(time.time())  # AMQP timestamps are Unix epoch seconds
        )
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=properties
        )

    def close(self):
        self.connection.close()

# Usage
publisher = RabbitMQPublisher()
publisher.publish(
    exchange="orders",
    routing_key="order.created",
    message={
        "message_id": "msg_123",
        "order_id": "ord_456",
        "total": 99.99
    }
)
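basic_publish on its own is fire-and-forget: the broker may drop or fail to route the message without the publisher ever knowing. Enabling publisher confirms on pika's BlockingChannel makes each publish block until the broker acknowledges it. A sketch building on the publisher above:

# Enable publisher confirms on the channel (pika BlockingChannel)
publisher.channel.confirm_delivery()

try:
    # With confirms on, basic_publish blocks until the broker confirms;
    # mandatory=True additionally fails if no queue is bound to the key
    publisher.channel.basic_publish(
        exchange="orders",
        routing_key="order.created",
        body=json.dumps({"message_id": "msg_124", "order_id": "ord_457"}),
        mandatory=True
    )
except pika.exceptions.UnroutableError:
    print("Message was returned: no queue bound for this routing key")
except pika.exceptions.NackError:
    print("Broker refused the message (nack)")

Confirms cost one round trip per publish on a blocking channel, so reserve them for messages you cannot afford to lose.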
Consuming Messages
import pika
import json

class RabbitMQConsumer:
    def __init__(self, queue_name, callback):
        self.queue_name = queue_name
        self.callback = callback
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost")
        )
        self.channel = self.connection.channel()
        # Declare queue (idempotent; creates it if missing)
        self.channel.queue_declare(queue=queue_name, durable=True)
        # Limit unacknowledged deliveries to one at a time
        self.channel.basic_qos(prefetch_count=1)

    def start_consuming(self):
        """Start consuming messages."""
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self._process_message,
            auto_ack=False
        )
        print(f"Started consuming from {self.queue_name}")
        self.channel.start_consuming()

    def _process_message(self, channel, method, properties, body):
        """Process an incoming message."""
        try:
            message = json.loads(body)
            self.callback(message)
            # Acknowledge only after successful processing
            channel.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"Error processing message: {e}")
            # requeue=True redelivers immediately; a message that always fails
            # will loop forever. Use requeue=False with a dead-letter exchange
            # (see Reliability Patterns below) to break the cycle.
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=True
            )

# Usage
def handle_order_created(message):
    print(f"Processing order: {message['order_id']}")
    # Process order...

consumer = RabbitMQConsumer("orders.created", handle_order_created)
consumer.start_consuming()
Exchanges and Bindings
# Declare a topic exchange
channel.exchange_declare(
    exchange="orders",
    exchange_type="topic",
    durable=True
)

# Bind queue to exchange. In topic routing keys, "*" matches exactly
# one word and "#" matches zero or more words, so "order.*" matches
# "order.created" but not "order.item.added".
channel.queue_bind(
    queue="orders.created",
    exchange="orders",
    routing_key="order.*"
)
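Topic routing lets one exchange feed several queues at once. For instance (the audit queue here is illustrative), a catch-all queue can receive every order event while specialized queues each receive one event type:

# Hypothetical audit queue that receives every order event
channel.queue_declare(queue="orders.audit", durable=True)
channel.queue_bind(
    queue="orders.audit",
    exchange="orders",
    routing_key="order.#"   # "#" matches zero or more words
)

# A publish with routing key "order.created" now lands in both
# orders.created (bound to "order.*") and orders.audit (bound to "order.#")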
Apache Kafka
Setup
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise a listener the host can actually reach: clients that
      # connect to localhost:9092 must be told "localhost", not "kafka",
      # or their follow-up connections to the broker will fail
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Kafka Producer
from kafka import KafkaProducer
import json

class KafkaPublisher:
    def __init__(self, bootstrap_servers=["localhost:9092"]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks="all",  # Wait for all in-sync replicas
            retries=3,
            max_in_flight_requests_per_connection=1  # Preserve ordering across retries
        )

    def publish(self, topic, message, key=None):
        """Publish message to topic."""
        future = self.producer.send(
            topic,
            key=key,
            value=message
        )
        # Wait for acknowledgment
        record_metadata = future.get(timeout=10)
        return {
            "topic": record_metadata.topic,
            "partition": record_metadata.partition,
            "offset": record_metadata.offset
        }

    def close(self):
        self.producer.flush()
        self.producer.close()

# Usage
publisher = KafkaPublisher()
publisher.publish(
    topic="orders",
    key="order_123",  # Messages with the same key go to the same partition
    message={
        "event_type": "OrderCreated",
        "order_id": "order_123",
        "total": 199.99
    }
)
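Blocking on future.get() for every message caps throughput at one broker round trip per send. kafka-python's futures also accept callbacks, so sends can stay asynchronous; a sketch, with illustrative handler names:

def on_send_success(record_metadata):
    print(f"Delivered to {record_metadata.topic}"
          f"[{record_metadata.partition}] @ {record_metadata.offset}")

def on_send_error(exc):
    print(f"Delivery failed: {exc}")

# Fire-and-forget send; the callbacks run when the broker responds
publisher.producer.send(
    "orders",
    value={"event_type": "OrderCreated", "order_id": "order_124"}
).add_callback(on_send_success).add_errback(on_send_error)

# Flush before shutdown so buffered messages are not lost
publisher.producer.flush()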
Kafka Consumer
from kafka import KafkaConsumer
import json

# Note: the wrapper must not itself be named KafkaConsumer, or it would
# shadow the kafka-python class it wraps and recurse on construction.
class KafkaMessageConsumer:
    def __init__(self, topics, group_id, bootstrap_servers=["localhost:9092"]):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            auto_commit_interval_ms=5000
        )

    def consume(self, callback):
        """Consume messages."""
        for message in self.consumer:
            try:
                callback(message.value, message.key, message.partition)
            except Exception as e:
                print(f"Error processing: {e}")

    def close(self):
        self.consumer.close()

# Usage
def process_order(message, key, partition):
    print(f"Processing order: {message['order_id']}")

consumer = KafkaMessageConsumer(
    topics=["orders"],
    group_id="order-processor"
)
consumer.consume(process_order)
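With enable_auto_commit=True, an offset can be committed before the message it covers has actually been processed, so a crash in between silently drops it. For at-least-once delivery, commit manually after processing; a sketch under the same assumptions as above, reusing process_order:

from kafka import KafkaConsumer
import json

manual_consumer = KafkaConsumer(
    "orders",
    bootstrap_servers=["localhost:9092"],
    group_id="order-processor",
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    enable_auto_commit=False  # Commit only after successful processing
)

for message in manual_consumer:
    process_order(message.value, message.key, message.partition)
    # Synchronous commit: the offset advances only once processing
    # succeeded, so a crash causes redelivery rather than loss
    manual_consumer.commit()

The trade-off is duplicates on redelivery, which is exactly what the idempotency pattern below is for.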
Reliability Patterns
Dead Letter Queues
Messages are dead-lettered when a consumer rejects them with requeue=False, when their TTL expires, or when the queue overflows:

# Main queue: failed messages are routed to the "dlx" exchange
channel.queue_declare(
    queue="orders",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "dlx",
        "x-dead-letter-routing-key": "orders.dead"
    }
)

# Dead letter exchange and queue
channel.exchange_declare(
    exchange="dlx",
    exchange_type="direct"
)
channel.queue_declare(queue="orders.dead", durable=True)
channel.queue_bind(
    queue="orders.dead",
    exchange="dlx",
    routing_key="orders.dead"
)
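For failures to actually reach the DLQ, the consumer must reject with requeue=False instead of the requeue=True loop shown earlier. A sketch reusing the handle_order_created handler from the RabbitMQ consumer example:

def process_or_dead_letter(channel, method, properties, body):
    """Reject permanently failing messages into the DLQ instead of requeueing."""
    try:
        handle_order_created(json.loads(body))
        channel.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Sending message to DLQ: {e}")
        # requeue=False routes the message to the queue's configured
        # dead-letter exchange ("dlx") with routing key "orders.dead"
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

A separate process or alert on orders.dead can then inspect, fix, and replay the failed messages.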
Idempotency
Most brokers guarantee at-least-once delivery, so consumers must tolerate duplicates. Tracking processed message IDs is the standard approach:

class IdempotentConsumer:
    def __init__(self, redis_client):
        self.redis = redis_client

    def process(self, message):
        message_id = message["message_id"]
        # Check if already processed
        if self.redis.exists(f"processed:{message_id}"):
            print(f"Message {message_id} already processed, skipping")
            return
        # Process message
        self._do_process(message)
        # Mark as processed
        self.redis.setex(f"processed:{message_id}", 86400, "1")  # 24h TTL

    def _do_process(self, message):
        pass
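The exists / setex pair leaves a small race: two consumers can both pass the exists check before either marks the message. redis-py's set supports an atomic check-and-set via nx. A sketch (the handler call is illustrative; note that claiming before processing means a crash mid-processing skips the message on redelivery, trading loss for stricter duplicate suppression):

import redis

r = redis.Redis(host="localhost", port=6379)

def process_once(message):
    key = f"processed:{message['message_id']}"
    # SET NX EX is atomic: exactly one consumer wins the claim
    if not r.set(key, "1", nx=True, ex=86400):
        print(f"Message {message['message_id']} already claimed, skipping")
        return
    handle_order_created(message)  # processing happens after the claim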
Choosing Between RabbitMQ and Kafka
| Aspect | RabbitMQ | Kafka |
|---|---|---|
| Use Case | General messaging | Event streaming |
| Ordering | Per-queue | Per-partition |
| Throughput | Medium | Very high |
| Latency | Low | Medium |
| Storage | Short-term (deleted once acknowledged) | Long-term (retained log, replayable) |
| Complexity | Lower | Higher |
Conclusion
Message queues enable building resilient, scalable distributed systems. RabbitMQ excels at traditional message queuing with flexible routing; Kafka is ideal for high-volume event streaming where consumers may need to replay history. Choose based on delivery semantics, throughput requirements, and how long you need messages retained.