Message Queues: RabbitMQ and Kafka in Go
Introduction
Message queues enable asynchronous communication between microservices, improving system resilience and scalability. This guide covers implementing message queue patterns with RabbitMQ and Kafka in Go.
Message queues decouple services, allowing them to communicate without direct dependencies, making systems more robust and easier to scale.
RabbitMQ Fundamentals
RabbitMQ Publisher
package main
import (
"context"
"encoding/json"
"fmt"
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
// Message is the JSON envelope passed through the queues in this guide.
type Message struct {
	// ID is serialized under the "id" key.
	ID string `json:"id"`
	// Type is serialized under the "type" key.
	Type string `json:"type"`
	// Payload holds an arbitrary value and is serialized under the "payload" key.
	Payload interface{} `json:"payload"`
}
// RabbitMQPublisher bundles the AMQP connection and the channel opened on it
// that the publisher methods operate on.
type RabbitMQPublisher struct {
	conn    *amqp.Connection // underlying broker connection
	channel *amqp.Channel    // channel used for declaring and publishing
}
// NewRabbitMQPublisher dials the broker at url and opens a channel on the
// resulting connection.
//
// It returns a wrapped error if dialing or opening the channel fails. When the
// channel cannot be opened, the connection that was already established is
// closed before returning so that it is not leaked.
func NewRabbitMQPublisher(url string) (*RabbitMQPublisher, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, fmt.Errorf("failed to connect: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup; the channel error is the one worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to open channel: %w", err)
	}

	return &RabbitMQPublisher{
		conn:    conn,
		channel: channel,
	}, nil
}
// DeclareExchange declares an exchange called name of the given kind
// (direct, fanout, topic or headers) on the publisher's channel. The exchange
// is declared durable, not auto-deleted, not internal, without no-wait and
// with no extra arguments.
func (p *RabbitMQPublisher) DeclareExchange(name, kind string) error {
	const (
		durable    = true
		autoDelete = false
		internal   = false
		noWait     = false
	)
	err := p.channel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, nil)
	return err
}
// Publish JSON-encodes message and publishes it to exchange with routingKey.
// The publishing is sent with the mandatory and immediate flags off and a
// content type of "application/json". A marshalling failure is returned
// wrapped; a publish failure is returned as-is.
func (p *RabbitMQPublisher) Publish(ctx context.Context, exchange, routingKey string, message Message) error {
	payload, marshalErr := json.Marshal(message)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal message: %w", marshalErr)
	}

	const (
		mandatory = false
		immediate = false
	)
	out := amqp.Publishing{
		ContentType: "application/json",
		Body:        payload,
	}
	return p.channel.PublishWithContext(ctx, exchange, routingKey, mandatory, immediate, out)
}
// Close closes the publisher's channel and then its connection.
//
// Both are always attempted: a failure to close the channel no longer skips
// closing the connection. The channel error is returned if there is one,
// otherwise the connection error (or nil).
func (p *RabbitMQPublisher) Close() error {
	chErr := p.channel.Close()
	connErr := p.conn.Close()
	if chErr != nil {
		return chErr
	}
	return connErr
}
RabbitMQ Consumer
package main
import (
"context"
"encoding/json"
"fmt"
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
// MessageHandler is the callback the consumers invoke for each decoded
// Message. The consumers in this file treat a non-nil return value as a
// processing failure.
type MessageHandler func(ctx context.Context, message Message) error
// RabbitMQConsumer bundles an AMQP connection, a channel opened on it, and
// the name of the queue the consumer reads from.
type RabbitMQConsumer struct {
	conn    *amqp.Connection // underlying broker connection
	channel *amqp.Channel    // channel used for binding and consuming
	queue   string           // name of the queue to consume
}
// NewRabbitMQConsumer dials the broker at url, opens a channel and declares
// the durable queue queueName on it.
//
// It returns a wrapped error if any step fails. Resources acquired by earlier
// steps (the connection, and the channel if it was opened) are closed before
// returning an error so that they are not leaked.
func NewRabbitMQConsumer(url, queueName string) (*RabbitMQConsumer, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, fmt.Errorf("failed to connect: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup; the original error is the one worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to open channel: %w", err)
	}

	// Declare queue
	_, err = channel.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // auto-delete
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {
		// Best-effort cleanup of everything opened so far.
		_ = channel.Close()
		_ = conn.Close()
		return nil, fmt.Errorf("failed to declare queue: %w", err)
	}

	return &RabbitMQConsumer{
		conn:    conn,
		channel: channel,
		queue:   queueName,
	}, nil
}
// BindQueue binds the consumer's queue to exchange using routingKey. The
// binding is made without no-wait and without extra arguments.
func (c *RabbitMQConsumer) BindQueue(exchange, routingKey string) error {
	const noWait = false
	err := c.channel.QueueBind(c.queue, routingKey, exchange, noWait, nil)
	return err
}
// Consume registers a consumer on the queue with manual acknowledgements and
// dispatches every delivery to handler until ctx is cancelled or the delivery
// channel is closed.
//
// Per delivery:
//   - a body that cannot be decoded as a Message is rejected without requeue,
//     because it would fail the same way on every redelivery;
//   - a handler error causes the delivery to be nacked with requeue;
//   - otherwise the delivery is acked.
//
// It returns ctx.Err() on cancellation, or an error if consuming cannot be
// started or the delivery channel is closed.
func (c *RabbitMQConsumer) Consume(ctx context.Context, handler MessageHandler) error {
	msgs, err := c.channel.Consume(
		c.queue, // queue
		"",      // consumer
		false,   // auto-ack
		false,   // exclusive
		false,   // no-local
		false,   // no-wait
		nil,     // arguments
	)
	if err != nil {
		return fmt.Errorf("failed to consume: %w", err)
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case delivery, ok := <-msgs:
			if !ok {
				// A closed channel would otherwise yield zero-value deliveries
				// in a tight loop.
				return fmt.Errorf("failed to consume: delivery channel closed")
			}

			var message Message
			if err := json.Unmarshal(delivery.Body, &message); err != nil {
				log.Printf("Failed to unmarshal message: %v", err)
				// Do not requeue: a malformed body can never be decoded.
				if nackErr := delivery.Nack(false, false); nackErr != nil {
					log.Printf("Failed to nack message: %v", nackErr)
				}
				continue
			}

			if err := handler(ctx, message); err != nil {
				log.Printf("Failed to handle message: %v", err)
				if nackErr := delivery.Nack(false, true); nackErr != nil { // Requeue
					log.Printf("Failed to nack message: %v", nackErr)
				}
				continue
			}

			if ackErr := delivery.Ack(false); ackErr != nil { // Acknowledge
				log.Printf("Failed to ack message: %v", ackErr)
			}
		}
	}
}
// Close closes the consumer's channel and then its connection.
//
// Both are always attempted: a failure to close the channel no longer skips
// closing the connection. The channel error is returned if there is one,
// otherwise the connection error (or nil).
func (c *RabbitMQConsumer) Close() error {
	chErr := c.channel.Close()
	connErr := c.conn.Close()
	if chErr != nil {
		return chErr
	}
	return connErr
}
Kafka Implementation
Kafka Producer
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
// KafkaProducer wraps the kafka.Writer used to send messages.
type KafkaProducer struct {
	writer *kafka.Writer // writer all produce calls go through
}
// NewKafkaProducer returns a producer whose writer is configured with the
// given brokers and topic.
func NewKafkaProducer(brokers []string, topic string) *KafkaProducer {
	cfg := kafka.WriterConfig{
		Brokers: brokers,
		Topic:   topic,
	}
	return &KafkaProducer{writer: kafka.NewWriter(cfg)}
}
// Produce JSON-encodes message and writes it as a single Kafka message whose
// key is the bytes of key. A marshalling failure is returned wrapped; a write
// failure is returned as-is.
func (p *KafkaProducer) Produce(ctx context.Context, key string, message Message) error {
	payload, marshalErr := json.Marshal(message)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal message: %w", marshalErr)
	}

	record := kafka.Message{
		Key:   []byte(key),
		Value: payload,
	}
	return p.writer.WriteMessages(ctx, record)
}
// ProduceBatch hands all of messages to the writer in a single WriteMessages
// call and returns its result.
func (p *KafkaProducer) ProduceBatch(ctx context.Context, messages []kafka.Message) error {
	err := p.writer.WriteMessages(ctx, messages...)
	return err
}
// Close closes the underlying writer and returns its result.
func (p *KafkaProducer) Close() error {
	err := p.writer.Close()
	return err
}
Kafka Consumer
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
// KafkaConsumer wraps the kafka.Reader used to receive messages.
type KafkaConsumer struct {
	reader *kafka.Reader // reader all consume calls go through
}
// NewKafkaConsumer returns a consumer whose reader is configured with the
// given brokers, topic and consumer group ID.
func NewKafkaConsumer(brokers []string, topic, groupID string) *KafkaConsumer {
	cfg := kafka.ReaderConfig{
		Brokers: brokers,
		Topic:   topic,
		GroupID: groupID,
	}
	return &KafkaConsumer{reader: kafka.NewReader(cfg)}
}
// Consume reads messages from the reader and dispatches each decoded Message
// to handler until ctx is cancelled or reading fails.
//
// Messages are fetched without auto-committing. When the reader belongs to a
// consumer group, the offset is committed only after the message has been
// dealt with, so a crash while the handler is running does not lose it:
//   - a value that cannot be decoded is logged and committed (skipped),
//     because it would fail the same way on every redelivery;
//   - a handler error is logged and the message is left uncommitted; note that
//     a later successful commit on the same partition still advances past it;
//   - otherwise the message is committed.
//
// It returns ctx.Err() on cancellation, or a wrapped error if fetching or
// committing fails.
func (c *KafkaConsumer) Consume(ctx context.Context, handler MessageHandler) error {
	// CommitMessages is only valid for readers that use a consumer group.
	useGroup := c.reader.Config().GroupID != ""
	commit := func(msg kafka.Message) error {
		if !useGroup {
			return nil
		}
		if err := c.reader.CommitMessages(ctx, msg); err != nil {
			return fmt.Errorf("failed to commit message: %w", err)
		}
		return nil
	}

	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		msg, err := c.reader.FetchMessage(ctx)
		if err != nil {
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			return fmt.Errorf("failed to read message: %w", err)
		}

		var message Message
		if err := json.Unmarshal(msg.Value, &message); err != nil {
			log.Printf("Failed to unmarshal message: %v", err)
			if err := commit(msg); err != nil {
				return err
			}
			continue
		}

		if err := handler(ctx, message); err != nil {
			log.Printf("Failed to handle message: %v", err)
			continue
		}

		if err := commit(msg); err != nil {
			return err
		}
	}
}
// Close closes the underlying reader and returns its result.
func (c *KafkaConsumer) Close() error {
	err := c.reader.Close()
	return err
}
Message Patterns
Request-Reply Pattern
package main
import (
"context"
"fmt"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// RequestReplyService holds the state for request-reply exchanges: the AMQP
// channel, and a mutex-guarded map from reply-queue name to the Go channel on
// which that request's reply is awaited.
type RequestReplyService struct {
	channel *amqp.Channel
	replies map[string]chan interface{} // keyed by reply queue name
	mu      sync.RWMutex                // guards replies
}
// SendRequest declares a server-named, exclusive, auto-delete reply queue,
// registers a reply channel for it and waits for a reply.
//
// It returns the reply when one arrives, a "request timeout" error once
// timeout has elapsed, or ctx.Err() if ctx is cancelled first. The reply
// registration is removed before returning. Sending the request itself is
// elided in this snippet, so request is not used yet.
func (s *RequestReplyService) SendRequest(ctx context.Context, request interface{}, timeout time.Duration) (interface{}, error) {
	replyQueue, err := s.channel.QueueDeclare("", false, true, true, false, nil)
	if err != nil {
		return nil, err
	}

	replyChan := make(chan interface{}, 1)
	s.mu.Lock()
	if s.replies == nil {
		// No constructor is visible for this type; avoid a nil-map write panic
		// on a zero-value service.
		s.replies = make(map[string]chan interface{})
	}
	s.replies[replyQueue.Name] = replyChan
	s.mu.Unlock()
	defer func() {
		s.mu.Lock()
		delete(s.replies, replyQueue.Name)
		s.mu.Unlock()
	}()

	// Send request
	// ... implementation ...

	// Wait for reply. An explicit timer is stopped on return so it does not
	// linger after an early reply or cancellation.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case reply := <-replyChan:
		return reply, nil
	case <-timer.C:
		return nil, fmt.Errorf("request timeout")
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
Pub-Sub Pattern
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)
// PubSubService wraps the AMQP channel used for fanout subscriptions.
type PubSubService struct {
	channel *amqp.Channel // channel subscriptions are set up on
}
// Subscribe declares a durable fanout exchange named topic, binds a
// server-named, exclusive, auto-delete queue to it and dispatches every
// delivery to handler until ctx is cancelled or the delivery channel closes.
//
// Deliveries are consumed with auto-ack. A body that cannot be decoded as a
// Message is logged and skipped, and a handler error is logged; neither
// stops the subscription.
//
// It returns ctx.Err() on cancellation, or a wrapped error if a setup step
// fails or the delivery channel is closed.
func (s *PubSubService) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
	// Declare fanout exchange
	err := s.channel.ExchangeDeclare(
		topic,    // name
		"fanout", // kind
		true,     // durable
		false,    // auto-delete
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	if err != nil {
		return fmt.Errorf("failed to declare exchange: %w", err)
	}

	// Declare exclusive queue
	queue, err := s.channel.QueueDeclare("", false, true, true, false, nil)
	if err != nil {
		return fmt.Errorf("failed to declare queue: %w", err)
	}

	// Bind queue to exchange
	if err := s.channel.QueueBind(queue.Name, "", topic, false, nil); err != nil {
		return fmt.Errorf("failed to bind queue: %w", err)
	}

	// Consume messages
	msgs, err := s.channel.Consume(queue.Name, "", true, false, false, false, nil)
	if err != nil {
		return fmt.Errorf("failed to consume: %w", err)
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case delivery, ok := <-msgs:
			if !ok {
				// A closed channel would otherwise yield zero-value deliveries
				// in a tight loop.
				return fmt.Errorf("failed to consume: delivery channel closed")
			}

			var message Message
			if err := json.Unmarshal(delivery.Body, &message); err != nil {
				log.Printf("Failed to unmarshal: %v", err)
				continue
			}

			if err := handler(ctx, message); err != nil {
				log.Printf("Failed to handle message: %v", err)
			}
		}
	}
}
Dead Letter Queues
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
// SetupDeadLetterQueue declares the topology for dead-lettering on channel:
//
//   - the durable direct exchanges "main-exchange" and "dlx-exchange";
//   - the durable queue queueName, configured with "dlx-exchange" as its
//     dead-letter exchange and a one-hour message TTL;
//   - the durable queue queueName+"-dlq";
//   - bindings of queueName to "main-exchange" and of the DLQ to
//     "dlx-exchange", both using queueName as the routing key.
//
// It stops at the first step that fails and returns that error wrapped with
// the name of the step.
func SetupDeadLetterQueue(channel *amqp.Channel, queueName string) error {
	const (
		mainExchange = "main-exchange"
		dlxExchange  = "dlx-exchange"
	)
	dlqName := queueName + "-dlq"

	// Declare main exchange
	if err := channel.ExchangeDeclare(mainExchange, "direct", true, false, false, false, nil); err != nil {
		return fmt.Errorf("failed to declare main exchange: %w", err)
	}

	// Declare DLX (Dead Letter Exchange)
	if err := channel.ExchangeDeclare(dlxExchange, "direct", true, false, false, false, nil); err != nil {
		return fmt.Errorf("failed to declare dead letter exchange: %w", err)
	}

	// Declare main queue with DLX
	mainArgs := amqp.Table{
		"x-dead-letter-exchange": dlxExchange,
		"x-message-ttl":          3600000, // 1 hour
	}
	if _, err := channel.QueueDeclare(queueName, true, false, false, false, mainArgs); err != nil {
		return fmt.Errorf("failed to declare main queue: %w", err)
	}

	// Declare DLQ
	if _, err := channel.QueueDeclare(dlqName, true, false, false, false, nil); err != nil {
		return fmt.Errorf("failed to declare dead letter queue: %w", err)
	}

	// Bind queues
	if err := channel.QueueBind(queueName, queueName, mainExchange, false, nil); err != nil {
		return fmt.Errorf("failed to bind main queue: %w", err)
	}
	if err := channel.QueueBind(dlqName, queueName, dlxExchange, false, nil); err != nil {
		return fmt.Errorf("failed to bind dead letter queue: %w", err)
	}

	return nil
}
Best Practices
1. Message Idempotency
// IdempotentMessage pairs a Message with an identifier so that the message
// can be processed multiple times safely.
type IdempotentMessage struct {
	// ID is the unique identifier.
	ID string
	// Message is the wrapped message.
	Message Message
}
2. Error Handling
// Implement proper error handling and retries
//
// Consume is shown here only as a placeholder for a retry-aware variant of the
// consumer loop: the body is elided in this snippet and contains no return
// statement, so it is illustrative rather than compilable as written.
func (c *RabbitMQConsumer) Consume(ctx context.Context, handler MessageHandler) error {
// ... with retry logic ...
}
3. Monitoring
// QueueMetrics groups the values to monitor for a queue: its depth, the
// processing time, and the error count.
type QueueMetrics struct {
	// Depth is the queue depth.
	Depth int
	// ProcessTime is the processing time.
	ProcessTime time.Duration
	// ErrorCount is the number of errors.
	ErrorCount int
}
4. Graceful Shutdown
// GracefulShutdown shuts the publisher down by delegating to Close and
// returning its result. Waiting for pending messages is only outlined in this
// snippet; ctx is accepted but not consulted.
func (p *RabbitMQPublisher) GracefulShutdown(ctx context.Context) error {
	// Wait for pending messages (not implemented here).
	// Close connections.
	if err := p.Close(); err != nil {
		return err
	}
	return nil
}
Common Pitfalls
1. No Message Ordering
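Kafka preserves message order only within a single partition. RabbitMQ delivers messages from a single queue in FIFO order, but that order is not guaranteed once messages are requeued or several consumers compete on the same queue.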
2. Duplicate Processing
Always implement idempotent message handlers.
3. No Dead Letter Handling
Implement dead letter queues for failed messages.
4. Insufficient Monitoring
Monitor queue depth, processing time, and error rates.
Resources
Summary
Message queues enable scalable, resilient microservices. Key takeaways:
- Use RabbitMQ for complex routing and pub-sub patterns
- Use Kafka for high-throughput, ordered message processing
- Implement idempotent message handlers
- Set up dead letter queues for failed messages
- Monitor queue metrics continuously
- Implement graceful shutdown procedures
- Choose the right pattern for your use case
By mastering message queues, you can build robust, scalable distributed systems.
Comments