
Message Queues: Kafka, RabbitMQ, and Event-Driven Architecture

Message queues are the backbone of asynchronous, scalable systems. They decouple producers from consumers, enable resilience, and allow systems to handle bursts of traffic. This guide covers architecture patterns, implementation strategies, and best practices for building message-driven systems.

Why Message Queues Matter

Message queues provide:

  • Decoupling: Producers don’t need to know about consumers
  • Resilience: Messages persist until processed
  • Scalability: Consumers can scale independently
  • Ordering: Maintain message order within partitions
  • Throughput: Handle millions of messages per second
  • Backpressure: Buffer during traffic spikes

Kafka Deep Dive

Kafka Architecture

┌────────────────────────────────────────────────────┐
│                   Kafka Cluster                    │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐    │
│  │  Broker 1  │  │  Broker 2  │  │  Broker 3  │    │
│  │ Leader: P0 │  │ Leader: P1 │  │ Leader: P2 │    │
│  │ Replica: P2│  │ Replica: P0│  │ Replica: P1│    │
│  └────────────┘  └────────────┘  └────────────┘    │
│                                                    │
│  Topic: orders                                     │
│  Partition 0 ──▶ [msg1, msg4, msg7]                │
│  Partition 1 ──▶ [msg2, msg5, msg8]                │
│  Partition 2 ──▶ [msg3, msg6, msg9]                │
└────────────────────────────────────────────────────┘
        ↑                  ↑                  ↑
    Producer        Consumer Group A   Consumer Group B
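
The partition and replica layout above is fixed when the topic is created. A minimal sketch of creating such a topic with kafka-python's admin client (broker addresses and topic name follow the examples below):

from kafka.admin import KafkaAdminClient, NewTopic

# Illustrative cluster addresses; match your environment
admin = KafkaAdminClient(bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'])

# 3 partitions spread load across brokers; replication_factor=2 gives each
# partition a leader plus one follower, matching the layout in the diagram
admin.create_topics([
    NewTopic(name='orders', num_partitions=3, replication_factor=2)
])
admin.close()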

Producer Implementation

from kafka import KafkaProducer
import json
import logging

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',  # Wait for all in-sync replicas to acknowledge
    retries=3,
    # Note: with retries > 0 and more than one in-flight request, retried
    # batches can be reordered; set this to 1 if strict ordering matters
    max_in_flight_requests_per_connection=5,
    compression_type='gzip'
)

# Send messages
def send_order(order_data):
    # Use order ID as key for partitioning
    future = producer.send(
        'orders',
        key=order_data['order_id'],
        value={
            'event': 'order_created',
            'order_id': order_data['order_id'],
            'customer_id': order_data['customer_id'],
            'total': order_data['total'],
            'items': order_data['items'],
            'timestamp': order_data['created_at']
        }
    )
    
    # Block until sent (or handle async)
    record_metadata = future.get(timeout=10)
    
    logging.info(f"Order {order_data['order_id']} sent to "
                 f"{record_metadata.topic}:{record_metadata.partition}:"
                 f"{record_metadata.offset}")
    
    return record_metadata

# Batch sending for better performance
def send_orders_batch(orders):
    batch = []
    for order in orders:
        batch.append(
            producer.send(
                'orders',
                key=order['order_id'],
                value=order
            )
        )
    
    # Wait for all to complete
    for future in batch:
        future.get(timeout=30)
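
Blocking on every future serializes sends. Since send() returns a future, you can also attach callbacks and stay asynchronous; a small sketch reusing the producer above (the callback names are illustrative):

def on_send_success(record_metadata):
    logging.info(f"Delivered to {record_metadata.topic}:"
                 f"{record_metadata.partition}:{record_metadata.offset}")

def on_send_error(exc):
    logging.error(f"Delivery failed: {exc}")

def send_order_async(order_data):
    # Fire-and-forget with delivery callbacks; call producer.flush() before shutdown
    future = producer.send('orders', key=order_data['order_id'], value=order_data)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)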

Consumer Implementation

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    max_poll_records=100,
    max_poll_interval_ms=300000
)

# Process messages
for message in consumer:
    try:
        order = message.value
        
        # Process order
        process_order(order)
        
        # Commit offset after successful processing
        consumer.commit()
        
        print(f"Processed order {order['order_id']} from "
              f"partition {message.partition}")
        
    except Exception as e:
        print(f"Error processing order: {e}")
        # Either skip the commit so the message is reprocessed,
        # or (as here) park it on a dead-letter topic and commit
        send_to_dlq(message.value, str(e))
        consumer.commit()

Stream Processing

Kafka Streams itself is a JVM library; the sketch below approximates simple stateful stream processing in Python with a plain consumer and producer.

from kafka import KafkaConsumer, KafkaProducer
from collections import defaultdict
import json

# Simple streams processing
orders_consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092'],
    group_id='order-analytics',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Producer for the analytics topic, created once rather than per message
analytics_producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Aggregate orders by customer
customer_orders = defaultdict(list)

for message in orders_consumer:
    order = message.value
    customer_id = order['customer_id']
    customer_orders[customer_id].append(order)
    
    # Every 100 orders per customer, send analytics
    if len(customer_orders[customer_id]) % 100 == 0:
        analytics = {
            'customer_id': customer_id,
            'order_count': len(customer_orders[customer_id]),
            'total_spent': sum(o['total'] for o in customer_orders[customer_id]),
            'last_order': order['created_at']
        }
        
        analytics_producer.send('customer-analytics', value=analytics)
        print(f"Sent analytics for customer {customer_id}")

RabbitMQ Patterns

Exchange Types

import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='rabbitmq.local')
)
channel = connection.channel()

# Declare exchange
channel.exchange_declare(
    exchange='orders',
    exchange_type='topic',
    durable=True
)

# Declare queue
channel.queue_declare(queue='order.created', durable=True)

# Bind queue to exchange
channel.queue_bind(
    exchange='orders',
    queue='order.created',
    routing_key='order.created'
)
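
The point of a topic exchange is wildcard routing: '*' matches exactly one word and '#' matches zero or more. A sketch adding an audit queue that receives every order event (the queue name is illustrative):

# Bind a second queue to the same exchange with a wildcard key
channel.queue_declare(queue='order.audit', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='order.audit',
    routing_key='order.#'   # order.created, order.cancelled, order.shipped, ...
)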

Publisher Patterns

# Direct publishing
def publish_order_created(order_data):
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=json.dumps(order_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
            content_type='application/json',
            message_id=order_data['order_id']
        )
    )

# Confirmed publishing (publisher confirms)
channel.confirm_delivery()

def publish_with_confirm(order_data):
    # With confirms enabled, pika (1.x) raises on failure instead of returning False
    try:
        channel.basic_publish(
            exchange='orders',
            routing_key='order.created',
            body=json.dumps(order_data),
            mandatory=True
        )
        print("Message confirmed")
    except pika.exceptions.UnroutableError:
        print("Message could not be routed to any queue")
    except pika.exceptions.NackError:
        print("Message was nacked by the broker")

Consumer Patterns

# Fair dispatch - don't overwhelm consumers
channel.basic_qos(prefetch_count=10)

def on_order_message(ch, method, properties, body):
    try:
        order = json.loads(body)
        process_order(order)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error: {e}")
        # Requeue the message (watch for poison messages looping forever;
        # pair this with a dead-letter exchange for repeated failures)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='order.created', on_message_callback=on_order_message)

# Run consumer
channel.start_consuming()

Work Queues

# Task queue for background processing
def process_task(task_data):
    # Simulate work
    import time
    time.sleep(len(task_data['items']) * 0.1)
    return f"Processed {len(task_data['items'])} items"

# Producer - submit tasks
def submit_task(task):
    channel.queue_declare(queue='tasks', durable=True)
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=json.dumps(task),
        properties=pika.BasicProperties(
            delivery_mode=2
        )
    )

# Worker - process tasks
def worker(ch, method, properties, body):
    task = json.loads(body)
    result = process_task(task)
    print(result)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=worker)
channel.start_consuming()

AWS SQS Patterns

Standard Queue

import boto3
import json

sqs = boto3.client('sqs')

def send_order_sqs(order_data):
    response = sqs.send_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders',
        MessageBody=json.dumps(order_data),
        # MessageGroupId / MessageDeduplicationId are FIFO-only; see the FIFO section below
        DelaySeconds=0
    )
    return response['MessageId']

def receive_orders():
    response = sqs.receive_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders',
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
        VisibilityTimeout=300
    )
    
    messages = response.get('Messages', [])
    
    for message in messages:
        order = json.loads(message['Body'])
        
        # Process order
        process_order(order)
        
        # Delete message
        sqs.delete_message(
            QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders',
            ReceiptHandle=message['ReceiptHandle']
        )
    
    return len(messages)

FIFO Queue (Ordering + Deduplication)

# FIFO queue ensures ordering and exactly-once processing
sqs_fifo = boto3.client('sqs')

def send_order_fifo(order_data):
    # Use order ID as deduplication ID
    response = sqs_fifo.send_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo',
        MessageBody=json.dumps(order_data),
        MessageGroupId='orders',  # Messages with same group ID maintain order
        MessageDeduplicationId=order_data['order_id'],  # Prevent duplicates
    )
    
    return response['MessageId']
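
MessageGroupId is also the unit of parallelism: ordering is only guaranteed within a group. Grouping by customer (an assumption, not part of the example above) keeps each customer's orders in order while letting consumers work on different customers concurrently:

def send_order_fifo_per_customer(order_data):
    # One message group per customer: per-customer ordering, cross-customer parallelism
    response = sqs_fifo.send_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo',
        MessageBody=json.dumps(order_data),
        MessageGroupId=order_data['customer_id'],
        MessageDeduplicationId=order_data['order_id'],
    )
    return response['MessageId']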

Event-Driven Architecture

Event Schema

{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "event_type": "OrderCreated",
  "event_version": "1.0",
  "timestamp": "2024-01-15T10:30:00Z",
  "producer": "orders-service",
  "data": {
    "order_id": "ORD-12345",
    "customer_id": "CUST-67890",
    "total": 99.99,
    "currency": "USD",
    "items": [
      {"product_id": "PROD-1", "quantity": 2, "price": 49.99}
    ]
  },
  "metadata": {
    "correlation_id": "corr-123",
    "causation_id": "cmd-456"
  }
}
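
A small helper keeps every producer emitting the same envelope (a sketch; the field names follow the schema above, and the helper name is illustrative):

import uuid
from datetime import datetime, timezone

def make_event(event_type, data, producer_name, correlation_id=None, causation_id=None):
    # Envelope matches the schema above; bump event_version with schema changes
    return {
        'event_id': str(uuid.uuid4()),
        'event_type': event_type,
        'event_version': '1.0',
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'producer': producer_name,
        'data': data,
        'metadata': {
            'correlation_id': correlation_id or str(uuid.uuid4()),
            'causation_id': causation_id
        }
    }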

Event Handler Pattern

from abc import ABC, abstractmethod

class EventHandler(ABC):
    @abstractmethod
    def handle(self, event):
        pass

class OrderEventHandler(EventHandler):
    def handle(self, event):
        if event['event_type'] == 'OrderCreated':
            self.handle_order_created(event)
        elif event['event_type'] == 'OrderCancelled':
            self.handle_order_cancelled(event)
    
    def handle_order_created(self, event):
        # Update inventory
        update_inventory(event['data']['items'])
        
        # Send confirmation email
        send_email(event['data']['customer_id'], 'order_confirmation')
        
        # Notify fulfillment
        publish_to_queue('fulfillment', event)

class EventBus:
    def __init__(self):
        self.handlers = {}
    
    def subscribe(self, event_type, handler):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)
    
    def publish(self, event):
        event_type = event['event_type']
        if event_type in self.handlers:
            for handler in self.handlers[event_type]:
                try:
                    handler.handle(event)
                except Exception as e:
                    print(f"Handler error: {e}")

# Usage
event_bus = EventBus()
event_bus.subscribe('OrderCreated', OrderEventHandler())
event_bus.subscribe('OrderCreated', AnalyticsEventHandler())  # second handler, defined elsewhere
event_bus.publish(order_event)

Saga Pattern

# Orchestrated saga for order processing
class OrderSaga:
    def __init__(self):
        self.steps = [
            self.reserve_inventory,
            self.process_payment,
            self.create_shipment,
            self.send_confirmation
        ]
    
    def execute(self, order_data):
        completed_steps = []
        
        for step in self.steps:
            try:
                step(order_data)
                completed_steps.append(step.__name__)
            except Exception as e:
                print(f"Step {step.__name__} failed: {e}")
                self.compensate(completed_steps, order_data)
                raise
        
        return "Order completed successfully"
    
    def compensate(self, completed_steps, order_data):
        # Reverse completed steps
        for step_name in reversed(completed_steps):
            if step_name == 'reserve_inventory':
                release_inventory(order_data)
            elif step_name == 'process_payment':
                refund_payment(order_data)
            # ... etc

CQRS with Message Queues

┌──────────────┐     ┌──────────────┐
│   Commands   │     │   Queries    │
│   (Writes)   │     │   (Reads)    │
└──────┬───────┘     └──────┬───────┘
       │                    │
       ▼                    ▼
┌──────────────┐     ┌──────────────┐
│   Command    │     │    Query     │
│   Handler    │     │   Handler    │
└──────┬───────┘     └──────┬───────┘
       │                    │
       ▼                    ▼
┌──────────────┐     ┌──────────────┐
│    Write     │────▶│     Read     │
│   Database   │     │   Database   │
└──────────────┘     └──────────────┘
       │                    ▲
       │  Event             │
       └────────────────────┘
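
The arrow from the write side to the read side is typically carried by events on a queue: the command handler persists state and publishes a fact, and a projection consumer applies it to a denormalized read model. A minimal sketch reusing the make_event helper above (the db/read_db handles, table names, and topic name are assumptions):

def handle_create_order(command, db, producer):
    # Write side: persist the order, then publish the fact
    db.execute(
        "INSERT INTO orders (id, customer_id, total) VALUES (%s, %s, %s)",
        (command['order_id'], command['customer_id'], command['total'])
    )
    producer.send('order-events', value=make_event('OrderCreated', command, 'orders-service'))

def project_order_created(event, read_db):
    # Read side: fold the event into a query-friendly summary table
    read_db.execute(
        "INSERT INTO customer_order_summary (customer_id, order_count, total_spent) "
        "VALUES (%s, 1, %s) "
        "ON CONFLICT (customer_id) DO UPDATE SET "
        "  order_count = customer_order_summary.order_count + 1, "
        "  total_spent = customer_order_summary.total_spent + EXCLUDED.total_spent",
        (event['data']['customer_id'], event['data']['total'])
    )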

Error Handling and Dead Letters

Dead Letter Queues

# Kafka DLQ
from datetime import datetime

def send_to_dlq(message, error):
    producer.send(
        'orders.dlq',
        value={
            'original_message': message,
            'error': str(error),
            'timestamp': datetime.utcnow().isoformat()
        }
    )

# Consumer with DLQ
for message in consumer:
    try:
        process_message(message.value)
        consumer.commit()
    except PermanentError as e:      # illustrative app-specific exception
        # Non-retryable - send to DLQ and commit so it isn't re-read
        send_to_dlq(message.value, str(e))
        consumer.commit()
    except TemporaryError as e:      # illustrative app-specific exception
        # Retryable - offset not committed; the message is re-read after
        # the consumer restarts or the group rebalances
        raise

# RabbitMQ DLQ configuration: declare the queue with dead-letter arguments
# so rejected (requeue=False) or expired messages are routed to the DLX
channel.queue_declare(
    queue='order.created',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'orders.dlx',
        'x-dead-letter-routing-key': 'orders.dlq'
    }
)

Retry with Backoff

from functools import wraps
import time

def retry_with_backoff(max_retries=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    print(f"Retry {attempt + 1} after {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, base_delay=1)
def process_message(message):
    # Your processing logic
    pass

Message Queue Comparison

| Feature      | Kafka                               | RabbitMQ                            | AWS SQS                     |
|--------------|-------------------------------------|-------------------------------------|-----------------------------|
| Ordering     | Per partition                       | Per queue                           | Per message group (FIFO)    |
| Delivery     | At-least-once                       | At-least-once                       | At-least-once               |
| Exactly-once | Idempotent producer + transactions  | Not built-in (idempotent consumers) | FIFO queues only            |
| Throughput   | Millions of msgs/s                  | Tens of thousands of msgs/s         | Thousands of msgs/s         |
| Latency      | <1 ms                               | <1 ms                               | ~100 ms                     |
| Persistence  | Yes (configurable retention)        | Yes (durable queues)                | Yes (managed)               |
| Scaling      | Partition-based                     | Clustering                          | Fully managed               |
| Use case     | Event streaming                     | Task queues, routing                | Cloud-native queuing        |

Monitoring

# Prometheus alert rules for Kafka (metric names depend on the exporter in use)
groups:
  - name: kafka
    rules:
      - alert: ConsumerLagHigh
        expr: kafka_consumer_group_lag > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High consumer lag"

      - alert: BrokerDown
        expr: up{job="kafka"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker down"

Conclusion

Message queues enable scalable, resilient systems:

  • Use Kafka for high-throughput event streaming
  • Use RabbitMQ for complex routing and task queues
  • Use SQS for simple cloud-native queuing
  • Implement proper error handling and DLQs
  • Monitor consumer lag and message rates
  • Design events carefully with versioning

Start with simple queues, add complexity as needed.
