Introduction
Message queues are the backbone of distributed systems. They enable asynchronous communication, load leveling, and system decoupling.
This guide covers three message queue systems - Apache Kafka, RabbitMQ, and AWS SQS: when to use each, common messaging patterns, and implementation examples.
Comparison
Feature Matrix
| Feature | Kafka | RabbitMQ | SQS |
|---|---|---|---|
| Delivery | At-least-once; exactly-once with transactions | At-most-once, at-least-once | At-least-once (Standard); exactly-once processing (FIFO) |
| Ordering | Per partition | Per queue | FIFO option |
| Throughput | Very high | High | Medium |
| Latency | Low | Very low | Medium |
| Storage | Durable log | Memory/disk | Managed |
| Scaling | Partition-based | Clustering | Auto-scaling |
| Complexity | Medium | Medium | Low |
Apache Kafka
Architecture
```
┌──────────────────────────────────────────────────────────┐
│                    KAFKA ARCHITECTURE                    │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  Producers          ┌──────────────┐       Consumers     │
│  ─────────   ──────▶│    Broker    │──────▶ ─────────    │
│                     │              │                     │
│                     │    Topic     │                     │
│                     │  Partitions  │                     │
│                     │  [0] [1] [2] │                     │
│                     │              │                     │
│                     │ Replication  │                     │
│                     └──────────────┘                     │
│                                                          │
│  ZooKeeper / KRaft - cluster management                  │
│                                                          │
└──────────────────────────────────────────────────────────┘
```
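Keyed messages are assigned to partitions deterministically, which is what makes per-key ordering possible. The real client hashes keys with murmur2; the helper below is a simplified, hypothetical stand-in (not the kafka-python API) that illustrates the principle:

```python
def pick_partition(key: bytes, num_partitions: int) -> int:
    """Simplified stand-in for Kafka's default partitioner.

    The real client uses murmur2 hashing; Python's built-in hash is
    used here only to show the idea: the same key always maps to the
    same partition, which preserves per-key ordering.
    """
    return (hash(key) & 0x7FFFFFFF) % num_partitions

# A keyed message always lands on the same partition
assert pick_partition(b'user-123', 3) == pick_partition(b'user-123', 3)
```

Because all events for `user-123` land on one partition, consumers see that user's events in the order they were produced.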
Python Producer
```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send message
future = producer.send(
    'user-events',
    key='user-123'.encode('utf-8'),
    value={
        'event': 'user.created',
        'user_id': '123',
        'email': '[email protected]'
    }
)

# Wait for send to complete
record_metadata = future.get(timeout=10)
print(f"Topic: {record_metadata.topic}")
print(f"Partition: {record_metadata.partition}")
print(f"Offset: {record_metadata.offset}")
```
Python Consumer
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='user-service-group',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Value: {message.value}")
    print(f"Partition: {message.partition}")
```
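The consumer above auto-commits offsets. For at-least-once processing you can pass `enable_auto_commit=False` and call `consumer.commit()` only after handling succeeds. A broker-free sketch of that loop, with the commit call injected so it runs anywhere:

```python
def process_batch(messages, handler, commit):
    """At-least-once loop: commit only after successful handling.

    `messages` is any iterable, `handler` processes one message, and
    `commit` stands in for KafkaConsumer.commit(). If the handler
    raises, the commit is skipped, so the message is redelivered
    after a restart or rebalance.
    """
    processed = 0
    for msg in messages:
        handler(msg)   # may raise - then commit below never runs
        commit()       # record progress only after success
        processed += 1
    return processed

# In-memory demo standing in for a real consumer
seen, commits = [], []
n = process_batch([1, 2, 3], seen.append, lambda: commits.append(True))
```

The trade-off: a crash between `handler` and `commit` means the message is processed twice, so handlers should be idempotent.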
Stream Processing
Kafka Streams is a JVM-only library; the `kafka-python` package has no `kafka.streams` module. From Python, the usual route is a stream-processing framework such as Faust. A sketch of the word count, assuming a `text-input` topic carrying plain strings:

```python
import faust

# A Faust app is the Python analogue of a Kafka Streams application
app = faust.App('wordcount', broker='kafka://localhost:9092')
text_topic = app.topic('text-input', value_type=str)
word_counts = app.Table('word-counts', default=int)

@app.agent(text_topic)
async def count_words(lines):
    async for line in lines:
        for word in line.lower().split():
            word_counts[word] += 1   # state is backed by a changelog topic
```

Run it as a Faust worker (e.g. `faust -A <module> worker`); the table is persisted to a Kafka changelog topic, mirroring what Kafka Streams' `count()` store does.
RabbitMQ
Architecture
```
┌──────────────────────────────────────────────────────────┐
│                  RABBITMQ ARCHITECTURE                   │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌──────────┐      ┌──────────┐      ┌──────────┐        │
│  │ Producer │─────▶│ Exchange │─────▶│  Queue   │        │
│  └──────────┘      └──────────┘      └────┬─────┘        │
│                                           │              │
│                                           ▼              │
│                                      ┌──────────┐        │
│                                      │ Consumer │        │
│                                      └──────────┘        │
│                                                          │
│  Exchange Types:                                         │
│  • Direct  - route by exact routing key                  │
│  • Fanout  - broadcast to all bound queues               │
│  • Topic   - routing-key pattern matching                │
│  • Headers - match on header attributes                  │
│                                                          │
└──────────────────────────────────────────────────────────┘
```
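The Topic exchange's pattern matching can be demonstrated without a broker: in a binding key, `*` matches exactly one dot-separated word and `#` matches zero or more. A small matcher mimicking those rules (an illustrative sketch, not RabbitMQ's implementation):

```python
def topic_matches(binding: str, routing: str) -> bool:
    """Mimic topic-exchange matching: '*' = exactly one word,
    '#' = zero or more words, words separated by dots."""
    def match(b, r):
        if not b:
            return not r
        head, rest = b[0], b[1:]
        if head == '#':
            # '#' can absorb any number of words, including none
            return any(match(rest, r[i:]) for i in range(len(r) + 1))
        if not r:
            return False
        if head == '*' or head == r[0]:
            return match(rest, r[1:])
        return False
    return match(binding.split('.'), routing.split('.'))

assert topic_matches('user.*', 'user.created')          # one word after 'user'
assert not topic_matches('user.*', 'user.created.now')  # '*' is exactly one word
assert topic_matches('user.#', 'user')                  # '#' matches zero words
```

This is why the publisher example below binds with `user.*`: it catches `user.created` and `user.deleted`, but not `user.profile.updated`.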
Python Publisher
```python
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(exchange='user_events', exchange_type='topic')
channel.queue_declare(queue='user_notifications', durable=True)

# Bind queue to exchange
channel.queue_bind(
    exchange='user_events',
    queue='user_notifications',
    routing_key='user.*'
)

# Publish message
channel.basic_publish(
    exchange='user_events',
    routing_key='user.created',
    body=json.dumps({
        'event': 'user.created',
        'user_id': '123',
        'email': '[email protected]'
    }),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
        content_type='application/json'
    )
)

connection.close()
```
Python Consumer
```python
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received: {message}")
    # Acknowledge message
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Deliver at most one unacknowledged message at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue='user_notifications',
    on_message_callback=callback
)

print('Waiting for messages...')
channel.start_consuming()
```
AWS SQS
Queue Types
| Type | Characteristics | Typical Use Case |
|---|---|---|
| Standard | High throughput, at-least-once delivery, best-effort ordering | Background jobs, work fan-out |
| FIFO | Guaranteed ordering, exactly-once processing | Financial transactions, ordered workflows |
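Content-based deduplication on a FIFO queue hashes the message body (SHA-256) and drops duplicates seen within a 5-minute window. An in-memory sketch of that behavior (`DedupWindow` is illustrative, not part of boto3):

```python
import hashlib
import time

class DedupWindow:
    """Sketch of SQS FIFO content-based deduplication: the dedup ID
    defaults to a SHA-256 of the body, and duplicates are dropped for
    a 5-minute window measured from the first accepted copy."""

    def __init__(self, window_seconds=300):
        self.window = window_seconds
        self.seen = {}   # dedup_id -> timestamp of first accepted copy

    def accept(self, body: str, now=None) -> bool:
        now = time.time() if now is None else now
        dedup_id = hashlib.sha256(body.encode()).hexdigest()
        first = self.seen.get(dedup_id)
        if first is not None and now - first < self.window:
            return False   # duplicate inside the window: dropped
        self.seen[dedup_id] = now
        return True

w = DedupWindow()
assert w.accept('{"event": "user.created"}', now=0)        # first copy accepted
assert not w.accept('{"event": "user.created"}', now=10)   # duplicate dropped
assert w.accept('{"event": "user.created"}', now=400)      # window expired
```

Passing an explicit `MessageDeduplicationId` (as the code below does) overrides the body hash, which is useful when two distinct messages can carry identical bodies.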
Python Boto3
```python
import boto3
import json

sqs = boto3.client('sqs')

# Create a FIFO queue
response = sqs.create_queue(
    QueueName='user-events.fifo',
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true'
    }
)
queue_url = response['QueueUrl']

# Send message (FIFO queues require a MessageGroupId)
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({
        'event': 'user.created',
        'user_id': '123'
    }),
    MessageGroupId='user-events',
    MessageDeduplicationId='user-123-created'
)

# Receive message (long polling for up to 10 seconds)
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=1,
    WaitTimeSeconds=10
)

if 'Messages' in response:
    message = response['Messages'][0]
    print(message['Body'])

    # Delete message after successful processing
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message['ReceiptHandle']
    )
```
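The receive/process/delete cycle above is the core SQS consumer pattern: a message is deleted only after processing succeeds; otherwise it reappears once its visibility timeout expires. A sketch with the client injected so a stub can stand in for boto3 (`StubSQS` is hypothetical, for demonstration only):

```python
def drain_queue(sqs_client, queue_url, handler, max_batches=10):
    """Receive -> process -> delete. A message is deleted only after
    `handler` succeeds; on failure it becomes visible again after the
    visibility timeout and is retried."""
    handled = 0
    for _ in range(max_batches):
        resp = sqs_client.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=10)
        messages = resp.get('Messages', [])
        if not messages:
            break
        for msg in messages:
            handler(msg['Body'])
            sqs_client.delete_message(
                QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])
            handled += 1
    return handled

class StubSQS:
    """In-memory stand-in for the boto3 SQS client (same call shapes)."""
    def __init__(self, bodies):
        self.pending = [{'Body': b, 'ReceiptHandle': str(i)}
                        for i, b in enumerate(bodies)]
        self.deleted = []

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        batch = self.pending[:MaxNumberOfMessages]
        self.pending = self.pending[MaxNumberOfMessages:]
        return {'Messages': batch} if batch else {}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.deleted.append(ReceiptHandle)

stub = StubSQS(['a', 'b', 'c'])
out = []
n = drain_queue(stub, 'queue-url', out.append)
```

Injecting the client this way also makes the consumer loop unit-testable without AWS credentials.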
Message Patterns
Publish/Subscribe
```python
# Kafka pub/sub: the producer publishes once...
producer.send('notifications', {'type': 'email', 'to': '[email protected]'})

# ...and each consumer group receives its own copy. Consumers in
# different groups each get all messages; consumers in the same
# group split the topic's partitions between them.
consumer1.subscribe(['notifications'])
consumer2.subscribe(['notifications'])
```
Request/Reply
```python
import uuid

# Correlate a reply with its request via a correlation ID
# (pseudocode: `queue.send` and `wait_for_response` stand in for
# your client library)
def send_request(queue, request):
    correlation_id = str(uuid.uuid4())
    # Send the request, telling the server where to reply
    queue.send(
        body=request,
        reply_to='responses',
        correlation_id=correlation_id
    )
    # Block until the reply carrying our correlation ID arrives
    return wait_for_response(correlation_id)
```
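The same correlation-ID handshake can be demonstrated end to end with in-memory queues; broker-specific plumbing (such as RabbitMQ's `reply_to` property) follows the same shape:

```python
import queue
import threading
import uuid

def rpc_call(request_q, reply_q, payload, timeout=5):
    """Send a request tagged with a correlation ID and block until
    the reply carrying the same ID comes back."""
    corr_id = str(uuid.uuid4())
    request_q.put({'correlation_id': corr_id, 'body': payload})
    while True:
        reply = reply_q.get(timeout=timeout)
        if reply['correlation_id'] == corr_id:
            return reply['body']
        # A reply for someone else: a real consumer would requeue it

# Minimal echo "server" running in a background thread
requests, replies = queue.Queue(), queue.Queue()

def serve_one():
    req = requests.get()
    replies.put({'correlation_id': req['correlation_id'],
                 'body': req['body'].upper()})

threading.Thread(target=serve_one, daemon=True).start()
result = rpc_call(requests, replies, 'ping')   # returns 'PING'
```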
Saga Pattern
```python
# Orchestration-based saga (pseudocode - `inventory`, `payment`, and
# `orders` are service clients)
class OrderSaga:
    async def create_order(self, order):
        completed = []
        try:
            # Step 1: Reserve inventory
            await inventory.reserve(order.items)
            completed.append('inventory')
            # Step 2: Process payment
            await payment.charge(order.customer, order.total)
            completed.append('payment')
            # Step 3: Create the order record
            await orders.create(order)
            return order
        except Exception:
            await self.compensate(order, completed)
            raise

    # Compensation on failure: undo completed steps in reverse order
    async def compensate(self, order, completed):
        if 'payment' in completed:
            await payment.refund(order.customer, order.total)
        if 'inventory' in completed:
            await inventory.release(order.items)
```
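The compensation logic can be exercised without real services by injecting stubs and a deliberate failure (all names here are illustrative):

```python
import asyncio

class StubService:
    """Stand-in for a remote service call; records calls, can be told to fail."""
    def __init__(self, fail=False):
        self.calls, self.fail = [], fail

    async def __call__(self, label):
        if self.fail:
            raise RuntimeError(f'{label} failed')
        self.calls.append(label)

async def run_saga(steps, compensations):
    """Run saga steps in order; on failure, compensate the completed
    steps in reverse order, then re-raise the original error."""
    done = []
    try:
        for name, step in steps:
            await step(name)
            done.append(name)
    except Exception:
        for name in reversed(done):
            await compensations[name](f'undo-{name}')
        raise

# Demo: payment fails after inventory was reserved
reserve, charge, undo = StubService(), StubService(fail=True), StubService()
try:
    asyncio.run(run_saga([('reserve', reserve), ('charge', charge)],
                         {'reserve': undo, 'charge': undo}))
except RuntimeError:
    pass
# reserve ran and was compensated; charge never succeeded
```

Running compensations in reverse order mirrors how the steps built up state, the same discipline as unwinding a transaction.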
Error Handling
Dead Letter Queues
```python
# Kafka: no built-in DLQ - configure producer retries, and on repeated
# consumer failure publish the message to a dead-letter topic yourself
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    retries=3
)

# RabbitMQ: dead-letter exchange - rejected or expired messages are
# re-routed to the 'dlx' exchange with routing key 'dlq'
channel.queue_declare(
    queue='main_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dlq'
    }
)

# SQS: redrive policy - after maxReceiveCount failed receives,
# the message moves to the dead-letter queue automatically
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': 5
        })
    }
)
```
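Whatever the broker, the consumer-side shape is the same: retry a bounded number of times, then hand the message to the DLQ with some failure context. A broker-agnostic sketch:

```python
def consume_with_dlq(message, handler, send_to_dlq, max_attempts=3):
    """Try a handler up to max_attempts times; after that, route the
    message to a dead-letter destination with failure context instead
    of retrying forever."""
    for _ in range(max_attempts):
        try:
            return handler(message)
        except Exception as exc:
            last_error = exc
    send_to_dlq({'message': message,
                 'error': str(last_error),
                 'attempts': max_attempts})

dead_letters = []

def always_fails(msg):
    raise ValueError('bad payload')

consume_with_dlq('order-42', always_fails, dead_letters.append)
# dead_letters now holds the poisoned message with its error
```

Capturing the error and attempt count alongside the message makes DLQ triage far easier than storing the raw payload alone.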
Choosing a Message Queue
```
┌──────────────────────────────────────────────────────────┐
│                      DECISION GUIDE                      │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  Need high-throughput streaming?                         │
│  └─▶ Kafka                                               │
│                                                          │
│  Need complex routing?                                   │
│  └─▶ RabbitMQ                                            │
│                                                          │
│  Need a fully managed service?                           │
│  └─▶ SQS                                                 │
│                                                          │
│  Need message ordering?                                  │
│  └─▶ FIFO SQS or Kafka                                   │
│                                                          │
│  Building event-driven microservices?                    │
│  └─▶ Kafka                                               │
│                                                          │
│  Simple task queues?                                     │
│  └─▶ RabbitMQ or SQS                                     │
│                                                          │
└──────────────────────────────────────────────────────────┘
```
Conclusion
Choose based on requirements:
- Kafka: High-throughput event streaming
- RabbitMQ: Complex routing, flexible patterns
- SQS: Fully managed, AWS integration