Introduction
Message queues are the backbone of distributed systems. They enable asynchronous communication, load leveling, and system decoupling.
This guide covers three message queue systems — Apache Kafka, RabbitMQ, and AWS SQS — explaining when to use each, common messaging patterns, and how to implement them in Python.
Comparison
Feature Matrix
| Feature | Kafka | RabbitMQ | SQS |
|---|---|---|---|
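| Delivery | At-least-once; exactly-once with idempotent producers and transactions | At-most-once or at-least-once | At-least-once (Standard); exactly-once processing (FIFO) |
| Ordering | Per partition | Per queue (single consumer) | Per message group (FIFO queues only) |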
| Throughput | Very high | High | Medium |
| Latency | Low | Very low | Medium |
| Storage | Durable log | Memory/disk | Managed |
| Scaling | Partition-based | Clustering | Auto-scaling |
| Complexity | Medium | Medium | Low |
Apache Kafka
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│                         KAFKA ARCHITECTURE                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Producers           ┌──────────────┐            Consumers         │
│   ─────────           │    Broker    │            ─────────         │
│       │               │              │                │             │
│       └──────────────▶│    Topic     │◀───────────────┘             │
│                       │  Partitions  │                              │
│                       │ [0] [1] [2]  │                              │
│                       │              │                              │
│                       │ Replication  │                              │
│                       └──────────────┘                              │
│                                                                     │
│   ZooKeeper / KRaft - Cluster management                            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Python Producer
from kafka import KafkaProducer
import json

# Serialize every value as UTF-8 JSON so consumers can decode it uniformly.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send message. With the default partitioner the key selects the partition,
# so all events for user-123 land on one partition and keep their order.
future = producer.send(
    'user-events',
    key='user-123'.encode('utf-8'),
    value={
        'event': 'user.created',
        'user_id': '123',
        'email': 'user@example.com'
    }
)

# send() is asynchronous: block until the broker acknowledges the write
# (raises if the send fails or does not complete within 10 seconds).
record_metadata = future.get(timeout=10)
print(f"Topic: {record_metadata.topic}")
print(f"Partition: {record_metadata.partition}")
print(f"Offset: {record_metadata.offset}")
Python Consumer
from kafka import KafkaConsumer
import json

# Join 'user-service-group'. When the group has no committed offset yet,
# start from the oldest retained record instead of only new ones.
consumer = KafkaConsumer(
    'user-events',
    group_id='user-service-group',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
)

# Iterating the consumer blocks and yields records as they arrive.
for record in consumer:
    for label, field in (
        ("Topic", record.topic),
        ("Value", record.value),
        ("Partition", record.partition),
    ):
        print(f"{label}: {field}")
Streams API
from collections import Counter

from kafka import KafkaConsumer, KafkaProducer


class WordCountStream:
    """Word count over Kafka topics using the plain kafka-python client.

    kafka-python has no ``kafka.streams`` module: Kafka Streams is a
    Java/JVM library. This class reproduces the classic WordCount topology
    (split lines into lowercase words, count per word, emit running totals)
    as a consume -> transform -> produce loop.

    Counts live in memory only, so they restart from zero when the process
    restarts -- unlike a Kafka Streams state store, which is backed by a
    changelog topic.
    """

    def __init__(self, bootstrap_servers='localhost:9092',
                 input_topic='text-input', output_topic='word-count-output'):
        self.output_topic = output_topic
        self.counts = Counter()
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=bootstrap_servers,
            group_id='wordcount',  # plays the role of Kafka Streams' application.id
            auto_offset_reset='earliest',
        )
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def start(self):
        """Consume forever, publishing a ``word -> running count`` record per word."""
        for message in self.consumer:
            if message.value is None:  # tombstone / empty record: nothing to count
                continue
            text = message.value.decode('utf-8')
            for word in text.lower().split():      # flat_map_values + group_by
                self.counts[word] += 1             # count
                self.producer.send(                # to_stream().to(...)
                    self.output_topic,
                    key=word.encode('utf-8'),
                    value=str(self.counts[word]).encode('utf-8'),
                )


streams = WordCountStream()
streams.start()
RabbitMQ
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│                        RABBITMQ ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────┐     ┌─────────┐     ┌─────────┐                       │
│   │Producer │────▶│ Exchange│────▶│  Queue  │                       │
│   └─────────┘     └─────────┘     └────┬────┘                       │
│                                        │                            │
│                                        ▼                            │
│                                   ┌─────────┐                       │
│                                   │Consumer │                       │
│                                   └─────────┘                       │
│                                                                     │
│   Exchange Types:                                                   │
│   • Direct - Route by routing key                                   │
│   • Fanout - Broadcast to all queues                                │
│   • Topic - Pattern matching                                        │
│   • Headers - Header attribute matching                             │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Python Publisher
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
try:
    channel = connection.channel()

    # Declare exchange and queue. The exchange is durable as well: a
    # non-durable exchange (and its bindings) disappears when the broker
    # restarts, after which publishing to 'user_events' fails even though
    # the durable queue still exists.
    channel.exchange_declare(exchange='user_events', exchange_type='topic',
                             durable=True)
    channel.queue_declare(queue='user_notifications', durable=True)

    # Bind queue to exchange. In a topic pattern '*' matches exactly one
    # word, so 'user.created' matches 'user.*' but 'user.profile.updated'
    # does not.
    channel.queue_bind(
        exchange='user_events',
        queue='user_notifications',
        routing_key='user.*'
    )

    # Publish message
    channel.basic_publish(
        exchange='user_events',
        routing_key='user.created',
        body=json.dumps({
            'event': 'user.created',
            'user_id': '123',
            'email': 'user@example.com'
        }),
        properties=pika.BasicProperties(
            delivery_mode=2,  # persistent: survives a restart in a durable queue
            content_type='application/json'
        )
    )
finally:
    # Release the connection even if a declare or publish raises.
    connection.close()
Python Consumer
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declaring is idempotent when the arguments match the publisher's, and it
# avoids a NOT_FOUND channel error if this consumer starts before the
# publisher has created the queue.
channel.queue_declare(queue='user_notifications', durable=True)


def callback(ch, method, properties, body):
    """Process one delivery and settle it exactly once.

    A valid JSON body is printed and acknowledged. A body that cannot be
    decoded is rejected without requeueing. Otherwise the decode error
    escapes start_consuming(), the consumer dies, the broker requeues the
    message, and the next consumer crashes on it again. A rejected message
    is dropped, or routed to the queue's dead-letter exchange if it has one.
    """
    try:
        message = json.loads(body)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass it
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return
    print(f"Received: {message}")
    # Acknowledge only after processing succeeded
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Deliver at most one unacknowledged message to this consumer at a time.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue='user_notifications',
    on_message_callback=callback
)
print('Waiting for messages...')
channel.start_consuming()
AWS SQS
Queue Types
| Type | Description | Use Case |
|---|---|---|
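| Standard | Nearly unlimited throughput, at-least-once delivery, best-effort ordering | High-volume workloads that tolerate duplicates and reordering |
| FIFO | Ordering within a message group, exactly-once processing | Order-sensitive workflows such as financial transactions |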
Python Boto3
import boto3
import json

sqs = boto3.client('sqs')

# --- Create a FIFO queue ------------------------------------------------
# FIFO queue names must end in ".fifo". With content-based deduplication,
# SQS derives a deduplication ID from the body when a send omits one.
fifo_attributes = {
    'FifoQueue': 'true',
    'ContentBasedDeduplication': 'true',
}
response = sqs.create_queue(QueueName='user-events.fifo', Attributes=fifo_attributes)
queue_url = response['QueueUrl']

# --- Send ---------------------------------------------------------------
event = {'event': 'user.created', 'user_id': '123'}
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps(event),
    MessageGroupId='user-events',               # ordering is per message group
    MessageDeduplicationId='user-123-created',  # explicit ID wins over the body hash
)

# --- Receive (long poll up to 10 s), then delete -------------------------
response = sqs.receive_message(
    QueueUrl=queue_url,
    WaitTimeSeconds=10,
    MaxNumberOfMessages=1,
)
if 'Messages' in response:
    received = response['Messages'][0]
    print(received['Body'])
    # Deleting acknowledges the message; otherwise it becomes visible
    # again once its visibility timeout expires.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=received['ReceiptHandle'])
Message Patterns
Publish/Subscribe
# Kafka pub/sub: every consumer *group* receives each message on a topic,
# while consumers that share a group split the partitions between them.
# Subscribers that must all see every event therefore need distinct group_ids.
# (`producer` is the JSON-serializing producer from the producer example.)
producer.send('notifications', {'type': 'email', 'to': 'user@example.com'})

# Multiple consumers subscribe, each in its own group
consumer1 = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id='email-service')
consumer2 = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id='audit-service')
consumer1.subscribe(['notifications'])
consumer2.subscribe(['notifications'])
Request/Reply
# Using correlation ID
def send_request(queue, request, reply_to='responses'):
    """Send *request* and block until the matching reply arrives.

    Args:
        queue: client used to publish the request; placeholder interface
            with a ``send(body=..., reply_to=..., correlation_id=...)`` method.
        request: the request body.
        reply_to: queue on which the responder should publish its reply.

    Returns:
        Whatever ``wait_for_response`` returns for this request's
        correlation ID.
    """
    import uuid  # local import keeps the snippet self-contained

    # A fresh random ID lets the caller pick its own reply out of a reply
    # queue shared with other in-flight requests.
    correlation_id = str(uuid.uuid4())

    # Send request
    queue.send(
        body=request,
        reply_to=reply_to,
        correlation_id=correlation_id
    )

    # Wait for response. wait_for_response is a placeholder: it should
    # consume from `reply_to` and return the message whose correlation_id
    # matches (ideally with a timeout).
    return wait_for_response(correlation_id)
Saga Pattern
# Orchestration-based saga
class OrderSaga:
    """Orchestrate order creation across inventory, payment and order services.

    Each step is a local transaction in another service. If a step fails,
    the steps that already succeeded are undone by their compensating
    actions in reverse order, and the original error is re-raised.

    Args:
        inventory: service with async ``reserve(items)`` / ``release(items)``.
        payment: service with async ``charge(customer, total)`` /
            ``refund(customer, total)``.
        orders: service with async ``create(order)`` / ``cancel(order)``.
    """

    def __init__(self, inventory, payment, orders):
        self.inventory = inventory
        self.payment = payment
        self.orders = orders

    async def create_order(self, order):
        """Run the saga for *order*; return it once every step has succeeded."""
        # (compensating action, args) for each step completed so far.
        undo_stack = []
        try:
            # Step 1: Reserve inventory
            await self.inventory.reserve(order.items)
            undo_stack.append((self.inventory.release, (order.items,)))
            # Step 2: Process payment
            await self.payment.charge(order.customer, order.total)
            undo_stack.append((self.payment.refund, (order.customer, order.total)))
            # Step 3: Create order (last step, so nothing to undo if it fails)
            await self.orders.create(order)
        except Exception:
            # Compensation on failure: undo only what actually happened,
            # newest first -- never refund a charge that was never made.
            for undo, args in reversed(undo_stack):
                await undo(*args)
            raise
        return order

    async def compensate(self, order):
        """Undo every step of a fully completed saga, in reverse step order."""
        await self.orders.cancel(order)
        await self.payment.refund(order.customer, order.total)
        await self.inventory.release(order.items)
Error Handling
Dead Letter Queues
# Kafka: there is no built-in dead letter queue. Producer `retries` only
# re-sends after transient broker errors; dead-lettering is an application
# pattern in which the consumer forwards records it cannot process to a
# separate topic.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    retries=3  # retry transient send failures -- this is NOT a DLQ
)


def consume_with_dlq(consumer, dlq_producer, handler, dlq_topic='user-events.dlq'):
    """Run *handler* on every record from *consumer*; dead-letter the failures.

    Args:
        consumer: KafkaConsumer created without a value_deserializer, so
            record values are raw bytes that can be re-sent unchanged.
        dlq_producer: KafkaProducer without a value_serializer, such as
            ``producer`` defined just above.
        handler: callable applied to each record's value; raising marks the
            record as unprocessable.
        dlq_topic: topic that receives the failed records.
    """
    for message in consumer:
        try:
            handler(message.value)
        except Exception:
            # Park the poison record instead of crashing the consumer or
            # retrying it forever; the original key is kept.
            dlq_producer.send(dlq_topic, key=message.key, value=message.value)


# RabbitMQ: Dead letter exchange. Messages rejected with requeue=False,
# expired by TTL, or dropped by a queue length limit are republished to the
# 'dlx' exchange with routing key 'dlq'. The DLX must exist and route
# somewhere, or dead-lettered messages are silently discarded.
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='dlq', durable=True)
channel.queue_bind(exchange='dlx', queue='dlq', routing_key='dlq')
channel.queue_declare(
    queue='main_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dlq'
    }
)

# SQS: Redrive policy. The DLQ must exist first and, because queue_url is a
# FIFO queue, must itself be a FIFO queue. After a message has been received
# maxReceiveCount (5) times without being deleted, SQS moves it to the DLQ.
dlq_url = sqs.create_queue(
    QueueName='user-events-dlq.fifo',
    Attributes={'FifoQueue': 'true'}
)['QueueUrl']
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url,
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': 5
        })
    }
)
Choosing a Message Queue
┌─────────────────────────────────────────────────────────────────────┐
│                           DECISION GUIDE                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Need high throughput streaming?                                   │
│   └── Kafka                                                         │
│                                                                     │
│   Need complex routing?                                             │
│   └── RabbitMQ                                                      │
│                                                                     │
│   Need fully managed service?                                       │
│   └── SQS                                                           │
│                                                                     │
│   Need message ordering?                                            │
│   └── FIFO SQS or Kafka                                             │
│                                                                     │
│   Building event-driven microservices?                              │
│   └── Kafka                                                         │
│                                                                     │
│   Simple task queues?                                               │
│   └── RabbitMQ or SQS                                               │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Conclusion
Choose based on requirements:
- Kafka: High-throughput event streaming
- RabbitMQ: Complex routing, flexible patterns
- SQS: Fully managed, AWS integration
Related Articles
- Message Queues in DevOps
- Event-Driven Architecture
- Microservices Communication
- Event Sourcing and CQRS
Comments