Message queues are the backbone of asynchronous, scalable systems. They decouple producers from consumers, enable resilience, and allow systems to handle bursts of traffic. This guide covers architecture patterns, implementation strategies, and best practices for building message-driven systems.
Why Message Queues Matter
Message queues provide:
- Decoupling: Producers don’t need to know about consumers
- Resilience: Messages persist until processed
- Scalability: Consumers can scale independently
- Ordering: Maintain message order within partitions
- Throughput: Handle millions of messages per second
- Backpressure: Buffer during traffic spikes
Kafka Deep Dive
Kafka Architecture
```
┌─────────────────────────────────────────────────────────┐
│                      Kafka Cluster                      │
│                                                         │
│   ┌────────────┐    ┌────────────┐    ┌────────────┐    │
│   │ Broker 1   │    │ Broker 2   │    │ Broker 3   │    │
│   │ Leader: P1 │    │ Leader: P2 │    │ Leader: P3 │    │
│   │ Replica: P3│    │ Replica: P1│    │ Replica: P2│    │
│   └────────────┘    └────────────┘    └────────────┘    │
│                                                         │
│   Topic: orders                                         │
│     Partition 0 ──▶ [msg1, msg4, msg7]                  │
│     Partition 1 ──▶ [msg2, msg5, msg8]                  │
│     Partition 2 ──▶ [msg3, msg6, msg9]                  │
└─────────────────────────────────────────────────────────┘
          │                   │                   │
      Producer            Consumer            Consumer
       Group A             Group B             Group C
```
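The layout above assumes an `orders` topic with three partitions, each replicated across the three brokers. A minimal sketch of creating such a topic with kafka-python's admin client (broker addresses are placeholders matching the examples below):

```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'])

# Three partitions, each replicated to all three brokers
admin.create_topics([
    NewTopic(name='orders', num_partitions=3, replication_factor=3)
])
```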
Producer Implementation
```python
from kafka import KafkaProducer
import json
import logging

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',  # Wait for all in-sync replicas
    retries=3,
    # Note: >1 in-flight request combined with retries can reorder messages on retry
    max_in_flight_requests_per_connection=5,
    compression_type='gzip'
)

# Send messages
def send_order(order_data):
    # Use order ID as key so all events for an order land on the same partition
    future = producer.send(
        'orders',
        key=order_data['order_id'],
        value={
            'event': 'order_created',
            'order_id': order_data['order_id'],
            'customer_id': order_data['customer_id'],
            'total': order_data['total'],
            'items': order_data['items'],
            'timestamp': order_data['created_at']
        }
    )

    # Block until sent (or handle asynchronously with callbacks)
    record_metadata = future.get(timeout=10)
    logging.info(f"Order {order_data['order_id']} sent to "
                 f"{record_metadata.topic}:{record_metadata.partition}:"
                 f"{record_metadata.offset}")
    return record_metadata

# Batch sending for better performance
def send_orders_batch(orders):
    futures = []
    for order in orders:
        futures.append(
            producer.send(
                'orders',
                key=order['order_id'],
                value=order
            )
        )

    # Wait for all sends to complete
    for future in futures:
        future.get(timeout=30)
```
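The `send_order` helper blocks on `future.get()`. For higher throughput the same future supports non-blocking delivery callbacks; a brief sketch (the callback names are illustrative):

```python
def on_send_success(record_metadata):
    logging.info(f"Delivered to {record_metadata.topic}:"
                 f"{record_metadata.partition}:{record_metadata.offset}")

def on_send_error(exc):
    logging.error(f"Send failed: {exc}")

# Fire-and-forget send; callbacks run when the broker acknowledges (or rejects) the record
producer.send('orders', key='ORD-12345', value={'event': 'order_created'}) \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)

# Ensure buffered messages are delivered before shutdown
producer.flush()
```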
Consumer Implementation
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # Commit manually after successful processing
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    max_poll_records=100,
    max_poll_interval_ms=300000
)

# Process messages
for message in consumer:
    try:
        order = message.value

        # Process order (business logic defined elsewhere)
        process_order(order)

        # Commit offset after successful processing
        consumer.commit()
        print(f"Processed order {order['order_id']} from "
              f"partition {message.partition}")
    except Exception as e:
        print(f"Error processing order: {e}")
        # Either skip the commit so the message is re-read after a restart/rebalance,
        # or send it to a DLQ and commit so it isn't retried.
        # send_to_dlq is defined in the Dead Letter Queues section below.
        send_to_dlq(message.value, str(e))
        consumer.commit()
```
Kafka Streams
```python
from kafka import KafkaConsumer, KafkaProducer
from collections import defaultdict
import json

# Simple streams-style processing with a plain consumer
# (Kafka Streams itself is a JVM library)
orders_consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092'],
    group_id='order-analytics',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Create the producer once, outside the consume loop
analytics_producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Aggregate orders by customer
customer_orders = defaultdict(list)

for message in orders_consumer:
    order = message.value
    customer_id = order['customer_id']
    customer_orders[customer_id].append(order)

    # Every 100 orders per customer, emit an analytics event
    if len(customer_orders[customer_id]) % 100 == 0:
        analytics = {
            'customer_id': customer_id,
            'order_count': len(customer_orders[customer_id]),
            'total_spent': sum(o['total'] for o in customer_orders[customer_id]),
            'last_order': order['created_at']
        }
        analytics_producer.send('customer-analytics', value=analytics)
        print(f"Sent analytics for customer {customer_id}")
```
RabbitMQ Patterns
Exchange Types
```python
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='rabbitmq.local')
)
channel = connection.channel()

# Declare a topic exchange
channel.exchange_declare(
    exchange='orders',
    exchange_type='topic',
    durable=True
)

# Declare queue
channel.queue_declare(queue='order.created', durable=True)

# Bind queue to exchange
channel.queue_bind(
    exchange='orders',
    queue='order.created',
    routing_key='order.created'
)
```
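Besides the topic exchange declared above, RabbitMQ also offers direct, fanout, and headers exchanges; a short sketch of declaring each (the exchange names are illustrative):

```python
# Direct exchange: routes on an exact routing-key match
channel.exchange_declare(exchange='orders.direct', exchange_type='direct', durable=True)

# Fanout exchange: broadcasts to every bound queue, routing key ignored
channel.exchange_declare(exchange='orders.fanout', exchange_type='fanout', durable=True)

# Headers exchange: routes on message header values instead of the routing key
channel.exchange_declare(exchange='orders.headers', exchange_type='headers', durable=True)
```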
Publisher Patterns
```python
from pika.exceptions import UnroutableError, NackError

# Direct publishing
def publish_order_created(order_data):
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=json.dumps(order_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
            content_type='application/json',
            message_id=order_data['order_id']
        )
    )

# Confirmed publishing
channel.confirm_delivery()

def publish_with_confirm(order_data):
    # With confirms enabled, pika 1.x raises on failure rather than returning a boolean
    try:
        channel.basic_publish(
            exchange='orders',
            routing_key='order.created',
            body=json.dumps(order_data),
            mandatory=True
        )
        print("Message confirmed")
    except UnroutableError:
        print("Message could not be routed")
    except NackError:
        print("Message was rejected by the broker")
```
Consumer Patterns
```python
# Fair dispatch - don't overwhelm consumers
channel.basic_qos(prefetch_count=10)

def on_order_message(ch, method, properties, body):
    try:
        order = json.loads(body)
        process_order(order)  # Business logic defined elsewhere
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error: {e}")
        # Requeue the message
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='order.created', on_message_callback=on_order_message)

# Run consumer
channel.start_consuming()
```
Work Queues
```python
import time

# Task queue for background processing
def process_task(task_data):
    # Simulate work proportional to the number of items
    time.sleep(len(task_data['items']) * 0.1)
    return f"Processed {len(task_data['items'])} items"

# Producer - submit tasks
def submit_task(task):
    channel.queue_declare(queue='tasks', durable=True)
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=json.dumps(task),
        properties=pika.BasicProperties(
            delivery_mode=2  # Persistent
        )
    )

# Worker - process tasks
def worker(ch, method, properties, body):
    task = json.loads(body)
    result = process_task(task)
    print(result)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=worker)
channel.start_consuming()
```
AWS SQS Patterns
Standard Queue
```python
import boto3
import json

sqs = boto3.client('sqs')

QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789/orders'

def send_order_sqs(order_data):
    # MessageGroupId / MessageDeduplicationId apply only to FIFO queues (see below)
    response = sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(order_data),
        DelaySeconds=0
    )
    return response['MessageId']

def receive_orders():
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,  # Long polling
        VisibilityTimeout=300
    )

    messages = response.get('Messages', [])
    for message in messages:
        order = json.loads(message['Body'])

        # Process order
        process_order(order)

        # Delete message once processed
        sqs.delete_message(
            QueueUrl=QUEUE_URL,
            ReceiptHandle=message['ReceiptHandle']
        )

    return len(messages)
```
FIFO Queue (Ordering + Deduplication)
```python
# FIFO queue ensures ordering and exactly-once processing (within the deduplication window)
sqs_fifo = boto3.client('sqs')

def send_order_fifo(order_data):
    # Use order ID as deduplication ID
    response = sqs_fifo.send_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo',
        MessageBody=json.dumps(order_data),
        MessageGroupId='orders',  # Messages with the same group ID maintain order
        MessageDeduplicationId=order_data['order_id']  # Prevent duplicates
    )
    return response['MessageId']
```
Event-Driven Architecture
Event Schema
```json
{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "event_type": "OrderCreated",
  "event_version": "1.0",
  "timestamp": "2024-01-15T10:30:00Z",
  "producer": "orders-service",
  "data": {
    "order_id": "ORD-12345",
    "customer_id": "CUST-67890",
    "total": 99.99,
    "currency": "USD",
    "items": [
      {"product_id": "PROD-1", "quantity": 2, "price": 49.99}
    ]
  },
  "metadata": {
    "correlation_id": "corr-123",
    "causation_id": "cmd-456"
  }
}
```
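A small helper that builds this envelope keeps producers consistent; a sketch (the `make_event` function is illustrative, not part of any library):

```python
import uuid
from datetime import datetime, timezone

def make_event(event_type, producer, data, correlation_id=None, causation_id=None):
    """Build an event envelope matching the schema above."""
    return {
        'event_id': str(uuid.uuid4()),
        'event_type': event_type,
        'event_version': '1.0',
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'producer': producer,
        'data': data,
        'metadata': {
            'correlation_id': correlation_id,
            'causation_id': causation_id
        }
    }

# Example
event = make_event('OrderCreated', 'orders-service',
                   {'order_id': 'ORD-12345', 'customer_id': 'CUST-67890'},
                   correlation_id='corr-123')
```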
Event Handler Pattern
```python
from abc import ABC, abstractmethod

class EventHandler(ABC):
    @abstractmethod
    def handle(self, event):
        pass

class OrderEventHandler(EventHandler):
    def handle(self, event):
        if event['event_type'] == 'OrderCreated':
            self.handle_order_created(event)
        elif event['event_type'] == 'OrderCancelled':
            self.handle_order_cancelled(event)

    def handle_order_created(self, event):
        # Update inventory
        update_inventory(event['data']['items'])
        # Send confirmation email
        send_email(event['data']['customer_id'], 'order_confirmation')
        # Notify fulfillment
        publish_to_queue('fulfillment', event)

class EventBus:
    def __init__(self):
        self.handlers = {}

    def subscribe(self, event_type, handler):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    def publish(self, event):
        event_type = event['event_type']
        if event_type in self.handlers:
            for handler in self.handlers[event_type]:
                try:
                    handler.handle(event)
                except Exception as e:
                    print(f"Handler error: {e}")

# Usage (AnalyticsEventHandler would be another EventHandler subclass)
event_bus = EventBus()
event_bus.subscribe('OrderCreated', OrderEventHandler())
event_bus.subscribe('OrderCreated', AnalyticsEventHandler())
event_bus.publish(order_event)
```
Saga Pattern
```python
# Orchestrated saga for order processing
class OrderSaga:
    def __init__(self):
        self.steps = [
            self.reserve_inventory,
            self.process_payment,
            self.create_shipment,
            self.send_confirmation
        ]

    def execute(self, order_data):
        completed_steps = []
        for step in self.steps:
            try:
                step(order_data)
                completed_steps.append(step.__name__)
            except Exception as e:
                print(f"Step {step.__name__} failed: {e}")
                self.compensate(completed_steps, order_data)
                raise
        return "Order completed successfully"

    def compensate(self, completed_steps, order_data):
        # Reverse completed steps in the opposite order
        for step_name in reversed(completed_steps):
            if step_name == 'reserve_inventory':
                release_inventory(order_data)
            elif step_name == 'process_payment':
                refund_payment(order_data)
            # ... etc
```
CQRS with Message Queues
```
┌──────────────┐          ┌──────────────┐
│   Commands   │          │   Queries    │
│   (Writes)   │          │   (Reads)    │
└──────┬───────┘          └──────┬───────┘
       │                         │
       ▼                         ▼
┌──────────────┐          ┌──────────────┐
│   Command    │          │    Query     │
│   Handler    │          │   Handler    │
└──────┬───────┘          └──────┬───────┘
       │                         │
       ▼                         ▼
┌──────────────┐   Event  ┌──────────────┐
│    Write     │─────────▶│     Read     │
│   Database   │          │   Database   │
└──────────────┘          └──────────────┘
```
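A minimal sketch of the two sides wired through a queue, assuming hypothetical `write_db`/`read_db` store interfaces and the producer from the Kafka examples:

```python
# Command side: mutate the write store, then publish an event
def handle_create_order(command, write_db, producer):
    order = {
        'order_id': command['order_id'],
        'customer_id': command['customer_id'],
        'total': command['total'],
        'status': 'created'
    }
    write_db.insert('orders', order)  # Write model
    producer.send('orders', key=order['order_id'],
                  value={'event': 'order_created', **order})  # Event for projections

# Query side: a projector consumes events and maintains a denormalized read model
def project_order_created(event, read_db):
    read_db.upsert('orders_by_customer', {
        'customer_id': event['customer_id'],
        'order_id': event['order_id'],
        'total': event['total']
    })
```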
Error Handling and Dead Letters
Dead Letter Queues
```python
# Kafka DLQ
from datetime import datetime

def send_to_dlq(message, error):
    producer.send(
        'orders.dlq',
        value={
            'original_message': message,
            'error': str(error),
            'timestamp': datetime.utcnow().isoformat()
        }
    )

# Consumer with DLQ
# PermanentError / TemporaryError are application-defined exception classes
for message in consumer:
    try:
        process_message(message.value)
        consumer.commit()
    except PermanentError as e:
        # Non-retryable - send to DLQ and move on
        send_to_dlq(message.value, str(e))
        consumer.commit()
    except TemporaryError as e:
        # Retryable - will be redelivered
        raise
```
```yaml
# RabbitMQ DLQ configuration (queue arguments)
arguments:
  x-dead-letter-exchange: orders.dlx
  x-dead-letter-routing-key: orders.dlq
```
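The same arguments can be attached in code when declaring the queue; a sketch with pika, assuming the `orders.dlx` exchange has been declared separately:

```python
# Declare the main queue with dead-letter routing attached
channel.queue_declare(
    queue='order.created',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'orders.dlx',
        'x-dead-letter-routing-key': 'orders.dlq'
    }
)
```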
Retry with Backoff
```python
from functools import wraps
import time

def retry_with_backoff(max_retries=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    print(f"Retry {attempt + 1} after {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, base_delay=1)
def process_message(message):
    # Your processing logic
    pass
```
Message Queue Comparison
| Feature | Kafka | RabbitMQ | AWS SQS |
|---|---|---|---|
| Ordering | Per partition | Per queue | Per FIFO queue |
| Delivery | At-least-once | At-least-once | At-least-once |
| Exactly-once | With idempotency | Via transactions | FIFO only |
| Throughput | Millions/s | Tens of thousands/s | Nearly unlimited (standard) |
| Latency | <1ms | <1ms | ~100ms |
| Persistence | Yes (configurable) | Yes | Yes (managed) |
| Scaling | Partition-based | Clustering | Managed |
| Use case | Event streaming | Task queues | Cloud-native |
Monitoring
```yaml
# Kafka monitoring - Prometheus alerting rules
groups:
  - name: kafka
    rules:
      - alert: ConsumerLagHigh
        expr: kafka_consumer_group_lag > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High consumer lag"
      - alert: BrokerDown
        expr: up{job="kafka"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker down"
```
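Consumer lag can also be checked programmatically by comparing committed offsets with end offsets; a sketch with kafka-python, reusing the group and topic names from the earlier examples:

```python
from kafka import KafkaConsumer, TopicPartition

# No subscription: the group_id is only used to look up committed offsets
lag_consumer = KafkaConsumer(
    bootstrap_servers=['kafka-1:9092'],
    group_id='order-processor',
    enable_auto_commit=False
)

partitions = [TopicPartition('orders', p)
              for p in lag_consumer.partitions_for_topic('orders')]
end_offsets = lag_consumer.end_offsets(partitions)

for tp in partitions:
    committed = lag_consumer.committed(tp) or 0
    print(f"partition={tp.partition} lag={end_offsets[tp] - committed}")
```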
Conclusion
Message queues enable scalable, resilient systems:
- Use Kafka for high-throughput event streaming
- Use RabbitMQ for complex routing and task queues
- Use SQS for simple cloud-native queuing
- Implement proper error handling and DLQs
- Monitor consumer lag and message rates
- Design events carefully with versioning
Start with simple queues, add complexity as needed.
Related Articles
- Event-Driven Architecture - Lambda + SQS patterns
- Observability - Message queue monitoring
- Kubernetes at Scale - Running Kafka on K8s