Introduction
Message queues enable asynchronous communication between services, providing decoupling, buffering, and reliability. They’re fundamental to building scalable, resilient distributed systems where services communicate without blocking or requiring immediate responses.
Message queues solve several critical problems:
- Decoupling: Services don’t need to know about each other’s implementation
- Load Leveling: Absorb traffic spikes without overwhelming downstream services
- Reliability: Messages persist even if consumers are temporarily unavailable
- Scalability: Add more consumers to process messages in parallel
- Ordering: Guarantee message processing order when needed
This guide covers two dominant message queue technologies—RabbitMQ for traditional message queuing and Kafka for high-throughput event streaming—along with practical patterns for building event-driven architectures.
Message Queue Fundamentals
Core Concepts
- Producer: Service that sends messages to the queue
- Consumer: Service that receives and processes messages from the queue
- Queue: Buffer that stores messages between producers and consumers
- Exchange: Routes messages to queues based on rules (RabbitMQ concept)
- Topic: Category of messages (Kafka concept)
- Partition: Ordered, immutable sequence of messages (Kafka concept)
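To make the producer, queue, and consumer roles concrete without a broker, here is a minimal in-process sketch built on Python's standard `queue.Queue`. The names `run_pipeline` and `order_id` are illustrative only. A real broker adds durability, network transport, and acknowledgments, none of which this sketch models.

```python
import queue
import threading
from typing import List


def run_pipeline(n: int, maxsize: int = 5) -> List[int]:
    """Producer -> bounded queue -> consumer, all inside one process."""
    buffer = queue.Queue(maxsize=maxsize)  # stands in for the broker's queue
    processed: List[int] = []

    def consumer() -> None:
        while True:
            message = buffer.get()  # blocks until a message is available
            if message is None:     # sentinel: the producer is finished
                break
            processed.append(message["order_id"])

    worker = threading.Thread(target=consumer)
    worker.start()
    for i in range(n):
        # Blocks while the buffer is full: a crude form of backpressure
        buffer.put({"order_id": i})
    buffer.put(None)
    worker.join()
    return processed


print(run_pipeline(10))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The producer never calls the consumer directly; it only knows about the buffer, which is the decoupling property described above.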
When to Use Message Queues
Use message queues when:
- Processing can happen asynchronously (email sending, report generation)
- You need to decouple services for independent scaling
- Traffic is bursty and you need load leveling
- You require guaranteed delivery and retry logic
- Multiple consumers need to process the same events
Don’t use message queues when:
- You need immediate synchronous responses
- You need strict ordering across all messages (achievable only with a single partition or queue, which rules out parallel consumption)
- Latency requirements are sub-millisecond
- The overhead of queue infrastructure isn’t justified
RabbitMQ: Traditional Message Broker
RabbitMQ is a mature message broker that implements AMQP (Advanced Message Queuing Protocol). It excels at complex routing, message acknowledgments, and traditional queue patterns.
Basic Producer and Consumer
```python
import json
import logging
from typing import Any, Callable, Dict, Optional

import pika

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Publisher and consumer must declare the queue with identical arguments;
# RabbitMQ rejects a redeclaration with different ones (PRECONDITION_FAILED).
QUEUE_ARGUMENTS = {
    'x-max-length': 10000,      # Max queue size
    'x-message-ttl': 86400000,  # 24 hour TTL
    'x-max-priority': 10,       # Needed for message priorities to take effect
}


class RabbitMQPublisher:
    """RabbitMQ message publisher with connection management."""

    def __init__(self, host: str, queue: str, port: int = 5672):
        self.host = host
        self.port = port
        self.queue = queue
        self.connection = None
        self.channel = None

    def connect(self):
        """Establish connection and declare queue."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                heartbeat=600,
                blocked_connection_timeout=300
            )
        )
        self.channel = self.connection.channel()
        # Declare durable queue (survives broker restart)
        self.channel.queue_declare(
            queue=self.queue,
            durable=True,
            arguments=QUEUE_ARGUMENTS
        )
        logger.info(f"Connected to RabbitMQ at {self.host}:{self.port}")

    def publish(self, message: Dict[str, Any], priority: int = 0,
                expiration: Optional[str] = None):
        """Publish message with optional priority and expiration."""
        if not self.channel:
            self.connect()
        properties = pika.BasicProperties(
            delivery_mode=2,  # Persistent message
            priority=priority,
            content_type='application/json',
            expiration=expiration  # Per-message TTL in milliseconds, as a string
        )
        self.channel.basic_publish(
            exchange='',
            routing_key=self.queue,
            body=json.dumps(message),
            properties=properties
        )
        logger.info(f"Published message to {self.queue}: {message}")

    def close(self):
        """Close connection gracefully."""
        if self.connection and not self.connection.is_closed:
            self.connection.close()
            logger.info("Connection closed")


class RabbitMQConsumer:
    """RabbitMQ message consumer with acknowledgments."""

    def __init__(self, host: str, queue: str, port: int = 5672):
        self.host = host
        self.port = port
        self.queue = queue
        self.connection = None
        self.channel = None

    def connect(self):
        """Establish connection and declare queue."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                heartbeat=600
            )
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(
            queue=self.queue,
            durable=True,
            arguments=QUEUE_ARGUMENTS  # Must match the publisher's declaration
        )
        logger.info(f"Consumer connected to {self.queue}")

    def consume(self, callback: Callable, prefetch_count: int = 1):
        """
        Consume messages with manual acknowledgment.
        prefetch_count: Number of unacked messages per consumer.
        """
        if not self.channel:
            self.connect()
        # QoS: Limit unacknowledged messages
        self.channel.basic_qos(prefetch_count=prefetch_count)

        def on_message(ch, method, properties, body):
            try:
                message = json.loads(body)
                logger.info(f"Received message: {message}")
                # Process message
                callback(message)
                # Acknowledge successful processing
                ch.basic_ack(delivery_tag=method.delivery_tag)
                logger.info(f"Message acknowledged: {method.delivery_tag}")
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                # Reject and requeue. A message that always fails ("poison
                # message") will loop forever; see the dead letter pattern below.
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=True
                )

        self.channel.basic_consume(
            queue=self.queue,
            on_message_callback=on_message,
            auto_ack=False  # Manual acknowledgment
        )
        logger.info(f"Waiting for messages on {self.queue}...")
        self.channel.start_consuming()

    def close(self):
        """Close connection gracefully."""
        if self.connection and not self.connection.is_closed:
            self.connection.close()


# Usage example
def process_order(message: Dict[str, Any]):
    """Process order message."""
    order_id = message.get('order_id')
    print(f"Processing order {order_id}")
    # Business logic here


# Producer
publisher = RabbitMQPublisher("localhost", "orders")
publisher.connect()
publisher.publish({
    "order_id": "123",
    "customer_id": "456",
    "total": 99.99,
    "items": [{"sku": "ABC", "qty": 2}]
}, priority=5)
publisher.close()

# Consumer
consumer = RabbitMQConsumer("localhost", "orders")
consumer.consume(process_order, prefetch_count=10)
```
RabbitMQ Exchange Patterns
RabbitMQ uses exchanges to route messages to queues. There are four built-in exchange types (direct, topic, fanout, and headers); the examples below cover direct, topic, and fanout.
Direct Exchange (Exact Routing Key Match)
```python
import pika

# --- Publisher with direct exchange ---
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare direct exchange
channel.exchange_declare(exchange='logs_direct', exchange_type='direct', durable=True)

# Publish with routing key. Messages whose routing key matches no binding are
# discarded, so start the consumer (which creates the bindings) first.
severities = ['info', 'warning', 'error']
for severity in severities:
    message = f"Log message with severity: {severity}"
    channel.basic_publish(
        exchange='logs_direct',
        routing_key=severity,
        body=message
    )
    print(f"Sent {severity}: {message}")
connection.close()

# --- Consumer (separate process) binds queue to specific routing keys ---
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct', exchange_type='direct', durable=True)

# Create queue and bind to 'error' and 'warning' only
channel.queue_declare(queue='critical_logs', durable=True)
channel.queue_bind(exchange='logs_direct', queue='critical_logs', routing_key='error')
channel.queue_bind(exchange='logs_direct', queue='critical_logs', routing_key='warning')


def callback(ch, method, properties, body):
    print(f"[{method.routing_key}] {body.decode()}")


print("Waiting for critical logs...")
channel.basic_consume(queue='critical_logs', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
Topic Exchange (Pattern Matching)
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Topic exchange with wildcard routing
channel.exchange_declare(exchange='logs_topic', exchange_type='topic', durable=True)

# Declare and bind queues before publishing: messages matching no binding are dropped.
# Binding patterns:
# * matches exactly one word
# # matches zero or more words
for queue in ('all_errors', 'app_logs', 'important'):
    channel.queue_declare(queue=queue, durable=True)

# Queue 1: All errors
channel.queue_bind(exchange='logs_topic', queue='all_errors', routing_key='*.error.*')
# Queue 2: All app logs
channel.queue_bind(exchange='logs_topic', queue='app_logs', routing_key='app.#')
# Queue 3: All warnings and errors
channel.queue_bind(exchange='logs_topic', queue='important', routing_key='*.warning.*')
channel.queue_bind(exchange='logs_topic', queue='important', routing_key='*.error.*')

# Publish with hierarchical routing keys
routing_keys = [
    'app.error.database',
    'app.warning.cache',
    'app.info.startup',
    'system.error.disk',
    'system.warning.memory'
]
for routing_key in routing_keys:
    channel.basic_publish(
        exchange='logs_topic',
        routing_key=routing_key,
        body=f"Message for {routing_key}"
    )
connection.close()
```
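Wildcard bindings are easy to get subtly wrong, so it can help to check them offline. The sketch below is a small reference matcher for the `*` and `#` rules, assuming routing keys are dot-separated words; it is not RabbitMQ's own implementation, and `topic_matches`, `route`, and `BINDINGS` are hypothetical names. A queue with several matching bindings still receives only one copy, which `route` models with a set.

```python
from typing import List, Tuple


def topic_matches(pattern: str, routing_key: str) -> bool:
    """True if a dot-separated routing key matches an AMQP-style topic pattern."""
    p = pattern.split('.')
    k = routing_key.split('.')

    def match(i: int, j: int) -> bool:
        if i == len(p):                   # pattern used up: key must be used up too
            return j == len(k)
        if p[i] == '#':                   # '#': consume zero words, or one more word
            return match(i + 1, j) or (j < len(k) and match(i, j + 1))
        if j == len(k):                   # key used up but pattern needs a word
            return False
        if p[i] == '*' or p[i] == k[j]:   # '*' matches exactly one word
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


# The bindings from the topic exchange example: (queue, binding pattern)
BINDINGS: List[Tuple[str, str]] = [
    ('all_errors', '*.error.*'),
    ('app_logs', 'app.#'),
    ('important', '*.warning.*'),
    ('important', '*.error.*'),
]


def route(bindings: List[Tuple[str, str]], routing_key: str) -> List[str]:
    """Queues that would receive a copy (each queue at most once)."""
    return sorted({q for q, pattern in bindings if topic_matches(pattern, routing_key)})


for key in ['app.error.database', 'app.info.startup', 'system.warning.memory']:
    print(key, '->', route(BINDINGS, key))
# app.error.database -> ['all_errors', 'app_logs', 'important']
# app.info.startup -> ['app_logs']
# system.warning.memory -> ['important']
```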
Fanout Exchange (Broadcast)
```python
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Fanout exchange broadcasts to all bound queues
channel.exchange_declare(exchange='notifications', exchange_type='fanout', durable=True)

# Each service gets its own queue bound to the exchange. Bind before publishing:
# a fanout exchange with no bound queues discards the message.
# Consumer 1: Email service
channel.queue_declare(queue='email_queue')
channel.queue_bind(exchange='notifications', queue='email_queue')
# Consumer 2: Analytics service
channel.queue_declare(queue='analytics_queue')
channel.queue_bind(exchange='notifications', queue='analytics_queue')
# Consumer 3: Audit log service
channel.queue_declare(queue='audit_queue')
channel.queue_bind(exchange='notifications', queue='audit_queue')

# Publish once; every bound queue receives its own copy
channel.basic_publish(
    exchange='notifications',
    routing_key='',  # Ignored for fanout
    body=json.dumps({
        "type": "user.registered",
        "user_id": "123",
        "email": "user@example.com"
    })
)
connection.close()
```
Dead Letter Queue (DLQ) Pattern
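```python
import json
import logging

import pika

logger = logging.getLogger(__name__)
MAX_RETRIES = 3

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare dead letter exchange and queue
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='orders.dlq', durable=True)
channel.queue_bind(exchange='dlx', queue='orders.dlq', routing_key='orders.failed')

# Configure queue with dead letter exchange: rejected (requeue=False) or expired
# messages are republished to 'dlx' with routing key 'orders.failed'.
# Classic queues have no built-in retry limit, so the consumer counts retries
# itself (quorum queues offer 'x-delivery-limit' instead).
# If 'orders' already exists with other arguments (as in the first example),
# delete it first: RabbitMQ refuses a redeclaration with different arguments.
channel.queue_declare(
    queue='orders',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'orders.failed',
        'x-message-ttl': 60000,  # Unconsumed after 60 seconds -> dead-lettered
    }
)


# Consumer with retry logic
def process_with_retry(ch, method, properties, body):
    try:
        message = json.loads(body)
        process_order(message)  # Business logic from the first example
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Get retry count from headers
        headers = properties.headers or {}
        retry_count = headers.get('x-retry-count', 0)
        if retry_count < MAX_RETRIES:
            # Republish a copy with an incremented retry count, then ack the original
            headers['x-retry-count'] = retry_count + 1
            ch.basic_publish(
                exchange='',
                routing_key='orders',
                body=body,
                properties=pika.BasicProperties(
                    headers=headers,
                    delivery_mode=2,
                    content_type=properties.content_type
                )
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
            logger.info(f"Requeued message, retry {retry_count + 1}")
        else:
            # Max retries exceeded: reject without requeue so RabbitMQ dead-letters it
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            logger.error(f"Message failed after {retry_count} retries, sent to DLQ")


channel.basic_consume(queue='orders', on_message_callback=process_with_retry)
channel.start_consuming()
```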
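The retry handler above republishes immediately, so a failing dependency gets hammered three times in quick succession. A common refinement, offered here as an assumption rather than part of the original design, is exponential backoff: compute a delay from the retry count and park the message somewhere it waits that long (for example a wait queue whose TTL dead-letters back to `orders`). The pure function below only makes the decision; `retry_decision`, `base_delay_ms`, and `max_delay_ms` are hypothetical names.

```python
from typing import Optional, Tuple


def retry_decision(retry_count: int, max_retries: int = 3,
                   base_delay_ms: int = 1000,
                   max_delay_ms: int = 60000) -> Tuple[str, Optional[int]]:
    """Return ('retry', delay_ms) or ('dead_letter', None) for a failed message.

    retry_count is the 'x-retry-count' header value before this attempt,
    matching the `retry_count < 3` check in the handler above.
    """
    if retry_count >= max_retries:
        return ('dead_letter', None)
    # Exponential backoff: 1s, 2s, 4s, ... capped at max_delay_ms
    delay = min(base_delay_ms * (2 ** retry_count), max_delay_ms)
    return ('retry', delay)


for count in range(5):
    print(count, retry_decision(count))
# 0 ('retry', 1000)
# 1 ('retry', 2000)
# 2 ('retry', 4000)
# 3 ('dead_letter', None)
# 4 ('dead_letter', None)
```

Keeping the decision separate from the pika calls makes the retry policy unit-testable without a broker.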
Apache Kafka: Distributed Event Streaming
Kafka is a distributed streaming platform designed for high-throughput, fault-tolerant event streaming. Unlike traditional message queues, Kafka retains messages for a configurable period, enabling replay and multiple consumer groups.
Kafka Core Concepts
- Topic: Category of messages (like a table in a database)
- Partition: Ordered, immutable log within a topic (enables parallelism)
- Offset: Unique identifier for each message within a partition
- Consumer Group: Set of consumers that coordinate to consume a topic
- Broker: Kafka server that stores and serves messages
- Replication Factor: Number of copies of each partition across brokers
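To show how offsets and consumer groups interact, here is a toy in-memory model of a single partition. It is a simplification under stated assumptions (one partition, no retention limits, no replication, commit-on-read), and `PartitionLog` is a hypothetical name. The key point it illustrates: reading never deletes records; each group only tracks its own offset, which is what makes replay and independent consumer groups possible.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class PartitionLog:
    """Toy model of one Kafka partition: an append-only list plus per-group offsets."""

    def __init__(self) -> None:
        self.records: List[str] = []
        # Committed offset per consumer group = offset of the next record to read
        self.committed: Dict[str, int] = defaultdict(int)

    def append(self, value: str) -> int:
        """Append a record and return its offset."""
        self.records.append(value)
        return len(self.records) - 1

    def poll(self, group: str, max_records: int = 10) -> List[Tuple[int, str]]:
        """Read from the group's offset and commit the new position (like auto-commit)."""
        start = self.committed[group]
        batch = self.records[start:start + max_records]
        self.committed[group] = start + len(batch)
        return [(start + i, value) for i, value in enumerate(batch)]

    def seek(self, group: str, offset: int) -> None:
        """Rewind (replay) or skip ahead by moving only this group's offset."""
        self.committed[group] = offset


log = PartitionLog()
for event in ["created", "paid", "shipped"]:
    log.append(event)

print(log.poll("billing"))    # [(0, 'created'), (1, 'paid'), (2, 'shipped')]
print(log.poll("analytics"))  # the same three records: groups read independently
print(log.poll("billing"))    # [] (nothing new for this group)
log.seek("billing", 1)
print(log.poll("billing"))    # [(1, 'paid'), (2, 'shipped')] (replay)
```

In real Kafka, old records disappear only when retention (time or size) removes log segments, never because a group consumed them.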
Kafka Producer
```python
import json
import logging
from typing import List, Optional, Tuple

from kafka import KafkaProducer
from kafka.errors import KafkaError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class KafkaEventProducer:
    """Kafka producer with acknowledgment settings and error handling."""

    def __init__(self, bootstrap_servers: str, client_id: str = 'producer'):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            client_id=client_id,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # Wait for all in-sync replicas
            retries=3,
            # With retries enabled, more than 1 in-flight request per connection
            # can reorder messages within a partition; 1 preserves ordering.
            max_in_flight_requests_per_connection=1,
            compression_type='gzip',
            linger_ms=10,  # Batch messages for up to 10ms
            batch_size=16384  # 16KB batch size
        )
        logger.info(f"Kafka producer initialized: {bootstrap_servers}")

    def send(self, topic: str, key: str, value: dict,
             headers: Optional[List[Tuple[str, bytes]]] = None) -> bool:
        """
        Send message to Kafka topic and wait for the broker acknowledgment.
        Returns True if successful, False otherwise.
        """
        try:
            future = self.producer.send(
                topic,
                key=key,
                value=value,
                headers=headers or []
            )
            # Block until acknowledged (synchronous send; send_async is faster)
            record_metadata = future.get(timeout=10)
            logger.info(
                f"Message sent to {record_metadata.topic} "
                f"partition {record_metadata.partition} "
                f"offset {record_metadata.offset}"
            )
            return True
        except KafkaError as e:
            logger.error(f"Failed to send message: {e}")
            return False

    def send_async(self, topic: str, key: str, value: dict):
        """Send message asynchronously with callbacks."""
        def on_success(record_metadata):
            logger.info(
                f"Message delivered to {record_metadata.topic} "
                f"[{record_metadata.partition}] @ {record_metadata.offset}"
            )

        def on_error(exception):
            logger.error(f"Message delivery failed: {exception}")

        self.producer.send(topic, key=key, value=value).add_callback(
            on_success
        ).add_errback(on_error)

    def close(self):
        """Flush pending messages and close producer."""
        self.producer.flush()
        self.producer.close()
        logger.info("Producer closed")


# Usage
producer = KafkaEventProducer("localhost:9092", client_id="order-service")

# Send order event
producer.send(
    topic="order-events",
    key="order-123",
    value={
        "event_type": "order.created",
        "order_id": "123",
        "customer_id": "456",
        "total": 99.99,
        "timestamp": "2026-05-05T10:30:00Z"
    },
    headers=[
        ("event_version", b"v1"),
        ("source", b"order-service")
    ]
)
producer.close()
```
Kafka Consumer with Consumer Groups
```python
import json
import logging

from kafka import KafkaConsumer, TopicPartition

logger = logging.getLogger(__name__)


class KafkaEventConsumer:
    """Kafka consumer with consumer group coordination."""

    def __init__(self, bootstrap_servers: str, group_id: str,
                 topics: list, client_id: str = 'consumer'):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            client_id=client_id,
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='earliest',  # Start from beginning if no committed offset
            enable_auto_commit=False,  # Manual commit for reliability
            max_poll_records=500,
            max_poll_interval_ms=300000,  # 5 minutes
            session_timeout_ms=10000,
            heartbeat_interval_ms=3000
        )
        logger.info(f"Consumer joined group {group_id} for topics {topics}")

    def consume(self, callback):
        """
        Consume messages with manual offset commit (at-least-once delivery).
        callback: Function that processes the message.
        """
        try:
            for message in self.consumer:
                try:
                    logger.info(
                        f"Received message from {message.topic} "
                        f"[{message.partition}] @ {message.offset}"
                    )
                    # Process message
                    callback(
                        topic=message.topic,
                        partition=message.partition,
                        offset=message.offset,
                        key=message.key,
                        value=message.value,
                        headers=dict(message.headers) if message.headers else {}
                    )
                    # Commit offset after successful processing
                    # (a synchronous commit per message is simple but slow)
                    self.consumer.commit()
                except Exception as e:
                    logger.error(f"Error processing message: {e}")
                    # The consumer's position is already past this message, so the
                    # next commit() would skip it. Seek back so it is reprocessed
                    # (a message that always fails will be retried indefinitely).
                    self.consumer.seek(
                        TopicPartition(message.topic, message.partition),
                        message.offset
                    )
        except KeyboardInterrupt:
            logger.info("Consumer interrupted")
        finally:
            self.close()

    def consume_batch(self, callback, batch_size: int = 100):
        """Consume messages in batches for better throughput."""
        try:
            while True:
                messages = self.consumer.poll(timeout_ms=1000, max_records=batch_size)
                if not messages:
                    continue
                for topic_partition, records in messages.items():
                    try:
                        # Process batch
                        batch = [
                            {
                                'key': msg.key,
                                'value': msg.value,
                                'offset': msg.offset,
                                'partition': msg.partition
                            }
                            for msg in records
                        ]
                        callback(batch)
                        logger.info(f"Processed batch of {len(records)} messages")
                    except Exception as e:
                        logger.error(f"Error processing batch: {e}")
                        # Rewind this partition to the start of the failed batch so
                        # the commit below does not move past unprocessed records
                        self.consumer.seek(topic_partition, records[0].offset)
                # Commit once per poll, after every partition's batch was handled
                self.consumer.commit()
        except KeyboardInterrupt:
            logger.info("Consumer interrupted")
        finally:
            self.close()

    def seek_to_beginning(self):
        """Reset consumer to beginning of all currently assigned partitions."""
        self.consumer.seek_to_beginning()

    def seek_to_end(self):
        """Skip to end of all currently assigned partitions."""
        self.consumer.seek_to_end()

    def close(self):
        """Close consumer gracefully."""
        self.consumer.close()
        logger.info("Consumer closed")


# Usage
def process_order_event(topic, partition, offset, key, value, headers):
    """Process individual order event."""
    event_type = value.get('event_type')
    order_id = value.get('order_id')
    if event_type == 'order.created':
        print(f"New order created: {order_id}")
    elif event_type == 'order.completed':
        print(f"Order completed: {order_id}")


consumer = KafkaEventConsumer(
    bootstrap_servers="localhost:9092",
    group_id="order-processors",
    topics=["order-events"],
    client_id="processor-1"
)
consumer.consume(process_order_event)
```
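Because the consumer above commits only after processing, a crash or rebalance can deliver the same event again (at-least-once delivery), so handlers should be idempotent. A minimal sketch, assuming each event carries a unique `event_id` field (not present in the original payload) and using an in-memory set where a real service would use a durable store such as a database unique constraint. `make_idempotent` and `add_to_total` are hypothetical names.

```python
from typing import Callable, Dict, Set


def make_idempotent(handler: Callable[[dict], None]) -> Callable[[dict], bool]:
    """Wrap a handler so each event_id is applied at most once per process.

    Returns True if the event was processed, False if it was a duplicate.
    The in-memory set is lost on restart; durable deduplication would record
    processed IDs in the same transaction as the handler's side effect.
    """
    seen: Set[str] = set()

    def wrapper(event: dict) -> bool:
        event_id = event['event_id']  # hypothetical unique ID field
        if event_id in seen:
            return False
        handler(event)         # if this raises, the ID is NOT marked as seen
        seen.add(event_id)
        return True

    return wrapper


totals: Dict[str, float] = {}


def add_to_total(event: dict) -> None:
    """A non-idempotent side effect: adding the same event twice double-counts."""
    totals[event['order_id']] = totals.get(event['order_id'], 0) + event['total']


apply_once = make_idempotent(add_to_total)
event = {'event_id': 'evt-1', 'order_id': '123', 'total': 99.99}
print(apply_once(event))  # True
print(apply_once(event))  # False (redelivery ignored)
print(totals)             # {'123': 99.99}
```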
Kafka Partitioning Strategy
```python
import itertools
import json
import zlib

from kafka import KafkaProducer


# Custom partitioner for specific routing logic
class CustomPartitioner:
    """Route messages based on custom logic."""

    def __init__(self):
        self._counter = itertools.count()

    def __call__(self, key, all_partitions, available_partitions):
        """
        Partition based on key hash.
        key: Message key (bytes, after key serialization)
        all_partitions: List of all partition IDs
        available_partitions: List of available partition IDs
        """
        if key is None:
            # Round-robin over available partitions for messages without keys
            candidates = available_partitions or all_partitions
            return candidates[next(self._counter) % len(candidates)]
        # Use a stable hash. Python's built-in hash() of bytes is randomized per
        # process, so separate producers would disagree on the partition.
        # (crc32 is NOT the murmur2 hash used by Kafka's default partitioner.)
        key_hash = zlib.crc32(key)
        return all_partitions[key_hash % len(all_partitions)]


# Producer with custom partitioner
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    partitioner=CustomPartitioner(),
    value_serializer=lambda v: json.dumps(v).encode()
)

# Messages with same key go to same partition (ordering preserved per key)
for i in range(10):
    producer.send(
        'user-events',
        key=f'user-{i % 3}'.encode(),  # 3 users
        value={'event': f'action-{i}'}
    )
producer.flush()

# All user-0 messages land in one partition, all user-1 messages in one, and
# all user-2 messages in one; two users may share a partition if their hashes collide.
```
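Key-based partitioning is only stable while the partition count stays fixed. This sketch uses a hypothetical `partition_for` helper with crc32 as a stand-in for Kafka's murmur2, and counts how many keys map to a different partition when a topic grows from 6 to 8 partitions. With uniform hashing, about three quarters of keys move in this particular case. Existing records are not relocated, so per-key ordering is not guaranteed across such a change.

```python
import zlib
from typing import Dict, List


def partition_for(key: bytes, num_partitions: int) -> int:
    """Stable key -> partition mapping (crc32 here; Kafka's default uses murmur2)."""
    return zlib.crc32(key) % num_partitions


keys: List[bytes] = [f'user-{i}'.encode() for i in range(1000)]
before: Dict[bytes, int] = {k: partition_for(k, 6) for k in keys}
after: Dict[bytes, int] = {k: partition_for(k, 8) for k in keys}
moved = sum(1 for k in keys if before[k] != after[k])
print(f"{moved} of {len(keys)} keys changed partition going from 6 to 8 partitions")
```

This is one reason the best practices below recommend choosing the partition count carefully up front.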
Kafka Consumer Rebalancing
```python
from kafka import ConsumerRebalanceListener, KafkaConsumer
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor


# Consumer with rebalance listener. subscribe() only accepts listeners that
# subclass ConsumerRebalanceListener.
class RebalanceListener(ConsumerRebalanceListener):
    """Handle partition assignment changes."""

    def on_partitions_revoked(self, revoked):
        """Called before rebalance starts."""
        print(f"Partitions revoked: {revoked}")
        # Commit offsets, cleanup resources

    def on_partitions_assigned(self, assigned):
        """Called after rebalance completes."""
        print(f"Partitions assigned: {assigned}")
        # Initialize resources for new partitions


# No topics in the constructor: subscribe() below registers them with the listener
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='order-processors',
    partition_assignment_strategy=[
        RoundRobinPartitionAssignor  # Distribute partitions evenly
    ]
)

# Subscribe with rebalance listener
consumer.subscribe(['order-events'], listener=RebalanceListener())

# Rebalance callbacks run inside polling, so the consumer must keep iterating
for message in consumer:
    print(message.topic, message.partition, message.offset)
```
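To see what an assignor decides during a rebalance, here is a simplified single-topic model of range and round-robin assignment. It is an illustration under that assumption, not kafka-python's assignor code, and `range_assign` and `round_robin_assign` are hypothetical names. With many topics, range assignment computes remainders per topic, so the extra partitions keep landing on the same first consumers; that is why round-robin often balances better.

```python
from typing import Dict, List


def range_assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Contiguous chunks; the first (len % n) consumers get one extra partition."""
    consumers = sorted(consumers)
    per, extra = divmod(len(partitions), len(consumers))
    result, start = {}, 0
    for i, c in enumerate(consumers):
        size = per + (1 if i < extra else 0)
        result[c] = partitions[start:start + size]
        start += size
    return result


def round_robin_assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Deal partitions out one at a time, like cards."""
    consumers = sorted(consumers)
    result: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        result[consumers[i % len(consumers)]].append(p)
    return result


parts = list(range(7))
print(range_assign(parts, ['c1', 'c2', 'c3']))        # {'c1': [0, 1, 2], 'c2': [3, 4], 'c3': [5, 6]}
print(round_robin_assign(parts, ['c1', 'c2', 'c3']))  # {'c1': [0, 3, 6], 'c2': [1, 4], 'c3': [2, 5]}
# After c3 leaves the group, a rebalance redistributes its partitions:
print(range_assign(parts, ['c1', 'c2']))              # {'c1': [0, 1, 2, 3], 'c2': [4, 5, 6]}
```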
Kafka Streams Processing
```python
# Stateful stream processing with windowing (hand-rolled in Python;
# Kafka Streams itself is a separate Java library)
import json
from collections import defaultdict
from datetime import datetime, timedelta, timezone

from kafka import KafkaConsumer, KafkaProducer


class StreamProcessor:
    """Process Kafka streams with windowing."""

    def __init__(self, input_topic: str, output_topic: str):
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers='localhost:9092',
            group_id='stream-processor',
            value_deserializer=lambda v: json.loads(v.decode())
        )
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.output_topic = output_topic
        self.window_size = timedelta(minutes=5)
        self.windows = defaultdict(list)

    def process_windowed(self):
        """Aggregate events in 5-minute tumbling windows."""
        for message in self.consumer:
            event = message.value
            # Parse timestamps like "2026-05-05T10:30:00Z" as timezone-aware UTC
            # (fromisoformat() accepts a trailing 'Z' only from Python 3.11)
            timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
            if timestamp.tzinfo is None:
                timestamp = timestamp.replace(tzinfo=timezone.utc)
            # Determine window
            window_start = timestamp.replace(
                minute=(timestamp.minute // 5) * 5,
                second=0,
                microsecond=0
            )
            # Add to window
            self.windows[window_start].append(event)
            # Emit every window whose end has passed, not only the current one.
            # Windows are checked only when a message arrives, and an event that
            # arrives after its window was emitted starts a new partial window.
            now = datetime.now(timezone.utc)
            for start in [w for w in self.windows if now - w > self.window_size]:
                self.emit_window(start)

    def emit_window(self, window_start):
        """Emit aggregated window results."""
        events = self.windows.pop(window_start, [])
        if not events:
            return
        # Aggregate
        result = {
            'window_start': window_start.isoformat(),
            'window_end': (window_start + self.window_size).isoformat(),
            'event_count': len(events),
            'total_value': sum(e.get('value', 0) for e in events)
        }
        # Emit to output topic
        self.producer.send(self.output_topic, value=result)
        print(f"Emitted window: {result}")
```
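The window arithmetic is easier to verify in isolation from Kafka. This sketch, with hypothetical helper names `window_start` and `aggregate`, floors each event time to a tumbling-window boundary measured from the Unix epoch, which generalizes the minute rounding above to any window size, and computes the same `event_count` and `total_value` aggregates. It assumes timestamps are ISO-8601 strings with a UTC offset or a trailing `Z`.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime, size: timedelta) -> datetime:
    """Floor an aware timestamp to the start of its tumbling window."""
    return ts - ((ts - EPOCH) % size)


def aggregate(events: List[dict], size: timedelta) -> Dict[datetime, dict]:
    """Group events into tumbling windows and compute count and value totals."""
    buckets: Dict[datetime, dict] = defaultdict(lambda: {'event_count': 0, 'total_value': 0})
    for e in events:
        ts = datetime.fromisoformat(e['timestamp'].replace('Z', '+00:00'))
        bucket = buckets[window_start(ts, size)]
        bucket['event_count'] += 1
        bucket['total_value'] += e.get('value', 0)
    return dict(buckets)


sample_events = [
    {'timestamp': '2026-05-05T10:01:30Z', 'value': 5},
    {'timestamp': '2026-05-05T10:04:59Z', 'value': 7},
    {'timestamp': '2026-05-05T10:05:00Z', 'value': 1},  # first second of the next window
]
for start, agg in sorted(aggregate(sample_events, timedelta(minutes=5)).items()):
    print(start.isoformat(), agg)
# 2026-05-05T10:00:00+00:00 {'event_count': 2, 'total_value': 12}
# 2026-05-05T10:05:00+00:00 {'event_count': 1, 'total_value': 1}
```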
RabbitMQ vs Kafka Comparison
| Feature | RabbitMQ | Kafka |
|---|---|---|
| Model | Message broker (queue) | Distributed log (stream) |
| Message Retention | Deleted once acknowledged | Retained for configured period (time or size) |
| Ordering | Per queue | Per partition |
| Throughput (rough, highly setup-dependent) | ~20K msg/sec | ~1M msg/sec (per cluster) |
| Latency | Typically very low (can be sub-ms) | Typically a few ms (depends on batching settings such as linger_ms) |
| Routing | Complex (exchanges, bindings) | Simple (topics, partitions) |
| Replay | No for classic queues (messages deleted after ack; RabbitMQ Streams add replay) | Yes (seek to offset) |
| Consumer Groups | Competing consumers | Coordinated groups |
| Use Case | Task queues, RPC, routing | Event streaming, logs, metrics |
| Persistence | Optional (durable queues, persistent messages) | Always (written to an on-disk log) |
| Scalability | Clustering, but each queue is led by a single node | Horizontal (partitions spread across brokers) |
Message Queue Patterns
Work Queue Pattern
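```python
# Distribute tasks among multiple workers
# RabbitMQ example
import json
import time

import pika

# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
for i in range(100):
    task = {"task_id": i, "data": f"task-{i}"}
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=json.dumps(task),
        properties=pika.BasicProperties(delivery_mode=2)  # Persistent
    )
connection.close()

# Multiple workers consume from same queue (run one worker per process)
# Worker 1, 2, 3... each get different tasks
def worker(worker_id):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='tasks', durable=True)
    # Fair dispatch: at most one unacknowledged task per worker at a time
    channel.basic_qos(prefetch_count=1)

    def callback(ch, method, properties, body):
        task = json.loads(body)
        print(f"Worker {worker_id} processing {task['task_id']}")
        time.sleep(1)  # Simulate work
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue='tasks', on_message_callback=callback)
    channel.start_consuming()
```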
Pub/Sub Pattern
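```python
# Kafka pub/sub with multiple consumer groups
# Each group gets all messages
import json

from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode()
)
# Producer publishes once
producer.send('notifications', value={'message': 'New user registered'})
producer.flush()

# A brand-new group starts at 'latest' by default and would miss the message
# above; 'earliest' makes it read from the oldest retained message.
# Consumer Group 1: Email service
email_consumer = KafkaConsumer(
    'notifications',
    group_id='email-service',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest'
)
# Consumer Group 2: SMS service
sms_consumer = KafkaConsumer(
    'notifications',
    group_id='sms-service',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest'
)
# Consumer Group 3: Push notification service
push_consumer = KafkaConsumer(
    'notifications',
    group_id='push-service',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest'
)
# All three groups receive the same message once they poll
# (in practice each service runs its own consumer process)
```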
Request/Reply Pattern
```python
# RabbitMQ RPC pattern
import uuid

import pika


class RPCClient:
    """RPC client using RabbitMQ."""

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        # Declare exclusive, server-named callback queue
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        # Ignore replies that belong to other requests
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id
            ),
            body=str(n)
        )
        # Block until the matching reply arrives (no timeout in this example)
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return int(self.response)


# RPC Server (separate process)
def on_request(ch, method, props, body):
    n = int(body)
    response = n * n  # Square the number
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        ),
        body=str(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


server_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = server_connection.channel()
channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
channel.start_consuming()

# Client usage, from another process:
# print(RPCClient().call(7))  # 49
```
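The client above supports one call at a time and waits without a timeout. If several calls share one callback queue, replies can arrive in any order, so the client needs a map from correlation ID to result. A minimal, broker-free sketch of that bookkeeping follows; `PendingCalls` is a hypothetical helper, not part of pika. The idea would be to call `on_reply` from `on_response`, and `cancel` when a call times out so a late reply is discarded.

```python
import uuid
from typing import Dict, Optional


class PendingCalls:
    """Track in-flight RPC requests by correlation ID (hypothetical helper)."""

    def __init__(self) -> None:
        # correlation_id -> reply body (None until the reply arrives)
        self._results: Dict[str, Optional[bytes]] = {}

    def new_call(self) -> str:
        """Register a request and return its correlation ID."""
        corr_id = str(uuid.uuid4())
        self._results[corr_id] = None
        return corr_id

    def on_reply(self, corr_id: Optional[str], body: bytes) -> bool:
        """Record a reply; ignore unknown, cancelled, or duplicate IDs."""
        if corr_id not in self._results or self._results[corr_id] is not None:
            return False
        self._results[corr_id] = body
        return True

    def take(self, corr_id: str) -> Optional[bytes]:
        """Return and forget the reply if it has arrived, else None."""
        body = self._results.get(corr_id)
        if body is not None:
            del self._results[corr_id]
        return body

    def cancel(self, corr_id: str) -> None:
        """Forget a timed-out call so its late reply is ignored."""
        self._results.pop(corr_id, None)


pending = PendingCalls()
first, second = pending.new_call(), pending.new_call()
pending.on_reply(second, b'49')  # replies can arrive out of order
print(pending.take(first))       # None (still waiting)
print(pending.take(second))      # b'49'
```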
Best Practices
RabbitMQ Best Practices
- Use durable queues and persistent messages for reliability
- Enable manual acknowledgments to prevent message loss
- Set prefetch count to control consumer load
- Implement dead letter queues for failed messages
- Use appropriate exchange types for routing needs
- Monitor queue depth to detect consumer lag
- Set message TTL to prevent queue buildup
- Use connection pooling in high-throughput scenarios
Kafka Best Practices
- Choose partition count carefully — affects parallelism and ordering
- Use keys for ordering — messages with same key go to same partition
- Set appropriate retention — balance storage cost and replay needs
- Monitor consumer lag — indicates processing bottlenecks
- Use consumer groups for scalability
- Enable compression (gzip, snappy) to reduce network usage
- Batch messages for better throughput
- Set replication factor ≥ 3 for production
- Use idempotent producers (the enable.idempotence producer setting) to prevent duplicates caused by producer retries
- Commit offsets after processing to ensure at-least-once delivery
Monitoring and Observability
Key Metrics to Monitor
RabbitMQ:
- Queue depth (messages ready)
- Consumer count
- Message rate (publish/deliver)
- Unacknowledged messages
- Connection count
- Memory usage
Kafka:
- Consumer lag (offset difference)
- Throughput (bytes in/out per second)
- Partition count and distribution
- Under-replicated partitions
- Request latency (produce/fetch)
- Disk usage per broker
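Consumer lag, the first Kafka metric listed, is the gap between the newest offset in each partition and the offset the group has committed. A minimal sketch of the arithmetic with a hypothetical `consumer_lag` helper; with kafka-python the inputs could come from `KafkaConsumer.end_offsets(partitions)` and `KafkaConsumer.committed(tp)` (whose `TopicPartition` keys are namedtuples of the same `(topic, partition)` shape), and the result could feed a per-partition gauge.

```python
from typing import Dict, Optional, Tuple

TP = Tuple[str, int]  # (topic, partition)


def consumer_lag(end_offsets: Dict[TP, int],
                 committed: Dict[TP, Optional[int]]) -> Dict[TP, int]:
    """Per-partition lag = log end offset - committed offset.

    Missing or None committed offsets are treated as 0, a simplification that
    overstates lag if retention has already deleted the oldest records.
    """
    return {tp: end - (committed.get(tp) or 0) for tp, end in end_offsets.items()}


end = {('orders', 0): 120, ('orders', 1): 95}
done = {('orders', 0): 100, ('orders', 1): 95}
lag = consumer_lag(end, done)
print(lag)               # {('orders', 0): 20, ('orders', 1): 0}
print(sum(lag.values())) # 20
```

A steadily growing total lag means consumers are falling behind producers; a lag that stays flat at a nonzero value often points to a stuck partition.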
Prometheus Metrics Example
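```python
from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Kafka metrics
messages_produced = Counter(
    'kafka_messages_produced_total',
    'Total messages produced',
    ['topic']
)
messages_consumed = Counter(
    'kafka_messages_consumed_total',
    'Total messages consumed',
    ['topic', 'consumer_group']
)
consumer_lag = Gauge(
    'kafka_consumer_lag',
    'Consumer lag in messages',
    ['topic', 'partition', 'consumer_group']
)
processing_duration = Histogram(
    'message_processing_duration_seconds',
    'Time spent processing messages',
    ['topic']
)

# Expose metrics at http://localhost:8000/metrics for Prometheus to scrape
start_http_server(8000)


def handle_with_metrics(message, process_message):
    """Usage in a consumer: time the handler and count the message.

    `message` is a Kafka record; `process_message` is your handler function.
    """
    with processing_duration.labels(topic=message.topic).time():
        process_message(message.value)
    messages_consumed.labels(topic=message.topic, consumer_group='processors').inc()


# Usage in a producer, after a successful send
messages_produced.labels(topic='orders').inc()
```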
Conclusion
Message queues enable asynchronous communication and service decoupling in distributed systems. Use RabbitMQ for traditional queuing with complex routing, manual acknowledgments, and low-latency requirements. Use Kafka for high-throughput event streaming, message replay, and log aggregation. Implement consumer groups for horizontal scalability, dead letter queues for error handling, and monitoring for operational visibility. Choose partitioning strategies carefully to balance ordering guarantees with parallelism. Always use manual acknowledgments and idempotent processing to ensure reliability.