Skip to main content
⚡ Calmops

Message Queues: RabbitMQ and Kafka in Go

Message Queues: RabbitMQ and Kafka in Go

Introduction

Message queues enable asynchronous communication between microservices, improving system resilience and scalability. This guide covers implementing message queue patterns with RabbitMQ and Kafka in Go.

Message queues decouple services, allowing them to communicate without direct dependencies, making systems more robust and easier to scale.

RabbitMQ Fundamentals

RabbitMQ Publisher

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// Message represents a message to publish. It round-trips through
// encoding/json on both the publisher and consumer sides.
type Message struct {
	ID      string `json:"id"`   // unique message identifier
	Type    string `json:"type"` // application-defined message type
	Payload interface{} `json:"payload"` // arbitrary JSON-serializable body
}

// RabbitMQPublisher publishes messages to RabbitMQ over a single
// connection and channel. It is not safe for concurrent use unless the
// underlying channel is — NOTE(review): amqp channels are generally not
// concurrency-safe; confirm caller usage.
type RabbitMQPublisher struct {
	conn    *amqp.Connection // underlying broker connection
	channel *amqp.Channel    // channel used for exchange declarations and publishing
}

// NewRabbitMQPublisher creates a new publisher connected to the broker at
// url. If opening the channel fails, the freshly dialed connection is
// closed so no TCP connection is leaked on the error path.
func NewRabbitMQPublisher(url string) (*RabbitMQPublisher, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, fmt.Errorf("failed to connect: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		// Without this, the dialed connection leaks on this error path.
		conn.Close()
		return nil, fmt.Errorf("failed to open channel: %w", err)
	}

	return &RabbitMQPublisher{
		conn:    conn,
		channel: channel,
	}, nil
}

// DeclareExchange declares a durable exchange of the given kind
// (direct, fanout, topic, or headers) on the publisher's channel.
func (p *RabbitMQPublisher) DeclareExchange(name, kind string) error {
	const (
		durable    = true
		autoDelete = false
		internal   = false
		noWait     = false
	)
	return p.channel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, nil)
}

// Publish serializes message as JSON and publishes it to the given
// exchange with the given routing key. Neither mandatory nor immediate
// delivery is requested.
func (p *RabbitMQPublisher) Publish(ctx context.Context, exchange, routingKey string, message Message) error {
	payload, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("failed to marshal message: %w", err)
	}

	pub := amqp.Publishing{
		ContentType: "application/json",
		Body:        payload,
	}
	return p.channel.PublishWithContext(ctx, exchange, routingKey, false, false, pub)
}

// Close closes the channel and then the underlying connection. The
// connection is closed even when closing the channel fails — the original
// early return leaked the TCP connection in that case. The channel error
// takes precedence when both fail.
func (p *RabbitMQPublisher) Close() error {
	chanErr := p.channel.Close()
	connErr := p.conn.Close()
	if chanErr != nil {
		return chanErr
	}
	return connErr
}

RabbitMQ Consumer

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// MessageHandler processes a single decoded message. A non-nil error
// signals the caller that processing failed (e.g. so the delivery can be
// nack'd and requeued).
type MessageHandler func(ctx context.Context, message Message) error

// RabbitMQConsumer consumes messages from a single durable RabbitMQ queue.
type RabbitMQConsumer struct {
	conn    *amqp.Connection // underlying broker connection
	channel *amqp.Channel    // channel used for queue operations and consuming
	queue   string           // name of the declared queue to consume from
}

// NewRabbitMQConsumer creates a consumer connected to the broker at url and
// declares a durable queue named queueName. On any failure after dialing,
// the already-acquired connection (and channel) are closed so nothing leaks.
func NewRabbitMQConsumer(url, queueName string) (*RabbitMQConsumer, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, fmt.Errorf("failed to connect: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		// Release the dialed connection; it would otherwise leak.
		conn.Close()
		return nil, fmt.Errorf("failed to open channel: %w", err)
	}

	// Declare a durable, non-auto-delete, non-exclusive queue.
	_, err = channel.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // auto-delete
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {
		// Release both resources acquired above.
		channel.Close()
		conn.Close()
		return nil, fmt.Errorf("failed to declare queue: %w", err)
	}

	return &RabbitMQConsumer{
		conn:    conn,
		channel: channel,
		queue:   queueName,
	}, nil
}

// BindQueue binds the consumer's queue to the given exchange using the
// given routing key.
func (c *RabbitMQConsumer) BindQueue(exchange, routingKey string) error {
	const noWait = false
	return c.channel.QueueBind(c.queue, routingKey, exchange, noWait, nil)
}

// Consume consumes messages from the queue until ctx is cancelled or the
// delivery channel is closed (e.g. the connection drops). Messages whose
// handler fails are nack'd and requeued; messages that cannot be decoded
// are nack'd WITHOUT requeue, since a malformed payload would otherwise be
// redelivered forever (pair this with a dead-letter exchange to keep them).
func (c *RabbitMQConsumer) Consume(ctx context.Context, handler MessageHandler) error {
	msgs, err := c.channel.Consume(
		c.queue, // queue
		"",      // consumer (broker-generated tag)
		false,   // auto-ack: manual acks so failures can be requeued
		false,   // exclusive
		false,   // no-local
		false,   // no-wait
		nil,     // arguments
	)
	if err != nil {
		return fmt.Errorf("failed to consume: %w", err)
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case delivery, ok := <-msgs:
			// A closed delivery channel means the AMQP channel or
			// connection went away; without this check the loop would
			// spin on zero-value deliveries forever.
			if !ok {
				return fmt.Errorf("delivery channel closed")
			}

			var message Message
			if err := json.Unmarshal(delivery.Body, &message); err != nil {
				log.Printf("Failed to unmarshal message: %v", err)
				// Do not requeue: a poison message never unmarshals.
				if nackErr := delivery.Nack(false, false); nackErr != nil {
					log.Printf("Failed to nack message: %v", nackErr)
				}
				continue
			}

			if err := handler(ctx, message); err != nil {
				log.Printf("Failed to handle message: %v", err)
				// Requeue for a later retry.
				if nackErr := delivery.Nack(false, true); nackErr != nil {
					log.Printf("Failed to nack message: %v", nackErr)
				}
				continue
			}

			if ackErr := delivery.Ack(false); ackErr != nil {
				log.Printf("Failed to ack message: %v", ackErr)
			}
		}
	}
}

// Close closes the channel and then the underlying connection. The
// connection is closed even when closing the channel fails — the original
// early return leaked the TCP connection in that case. The channel error
// takes precedence when both fail.
func (c *RabbitMQConsumer) Close() error {
	chanErr := c.channel.Close()
	connErr := c.conn.Close()
	if chanErr != nil {
		return chanErr
	}
	return connErr
}

Kafka Implementation

Kafka Producer

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

// KafkaProducer produces JSON-encoded messages to a single Kafka topic.
type KafkaProducer struct {
	writer *kafka.Writer // topic-bound writer; safe to share per kafka-go docs — confirm version
}

// NewKafkaProducer creates a producer that writes to topic via the given
// broker addresses.
func NewKafkaProducer(brokers []string, topic string) *KafkaProducer {
	cfg := kafka.WriterConfig{
		Brokers: brokers,
		Topic:   topic,
	}
	return &KafkaProducer{writer: kafka.NewWriter(cfg)}
}

// Produce serializes message as JSON and writes it to the producer's topic
// under the given partition key.
func (p *KafkaProducer) Produce(ctx context.Context, key string, message Message) error {
	value, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("failed to marshal message: %w", err)
	}

	msg := kafka.Message{
		Key:   []byte(key),
		Value: value,
	}
	return p.writer.WriteMessages(ctx, msg)
}

// ProduceBatch produces multiple messages in a single write call. Callers
// construct the kafka.Message values (key, value, headers) themselves.
func (p *KafkaProducer) ProduceBatch(ctx context.Context, messages []kafka.Message) error {
	return p.writer.WriteMessages(ctx, messages...)
}

// Close closes the underlying Kafka writer, releasing its connections.
func (p *KafkaProducer) Close() error {
	return p.writer.Close()
}

Kafka Consumer

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

// KafkaConsumer consumes JSON-encoded messages from a single Kafka topic
// as part of a consumer group.
type KafkaConsumer struct {
	reader *kafka.Reader // group-aware reader bound to one topic
}

// NewKafkaConsumer creates a consumer that reads topic via the given
// brokers as a member of the consumer group groupID.
func NewKafkaConsumer(brokers []string, topic, groupID string) *KafkaConsumer {
	cfg := kafka.ReaderConfig{
		Brokers: brokers,
		Topic:   topic,
		GroupID: groupID,
	}
	return &KafkaConsumer{reader: kafka.NewReader(cfg)}
}

// Consume reads messages and invokes handler for each one until ctx is
// cancelled or a fetch/commit fails.
//
// It uses FetchMessage + CommitMessages so an offset is committed only
// after the message has been handled (at-least-once delivery). The
// original ReadMessage call committed the offset on read, so any message
// whose handler failed was silently lost. Malformed messages are committed
// and skipped so a poison message is not redelivered forever; messages
// whose handler errors are NOT committed and will be redelivered after a
// rebalance or restart.
func (c *KafkaConsumer) Consume(ctx context.Context, handler MessageHandler) error {
	for {
		// FetchMessage honors ctx cancellation, so no separate select is needed.
		msg, err := c.reader.FetchMessage(ctx)
		if err != nil {
			return fmt.Errorf("failed to read message: %w", err)
		}

		var message Message
		if err := json.Unmarshal(msg.Value, &message); err != nil {
			log.Printf("Failed to unmarshal message: %v", err)
			// Commit the malformed message so it is skipped, not replayed.
			if err := c.reader.CommitMessages(ctx, msg); err != nil {
				return fmt.Errorf("failed to commit message: %w", err)
			}
			continue
		}

		if err := handler(ctx, message); err != nil {
			log.Printf("Failed to handle message: %v", err)
			// No commit: the message stays pending for redelivery.
			continue
		}

		if err := c.reader.CommitMessages(ctx, msg); err != nil {
			return fmt.Errorf("failed to commit message: %w", err)
		}
	}
}

// Close closes the underlying Kafka reader, leaving the consumer group.
func (c *KafkaConsumer) Close() error {
	return c.reader.Close()
}

Message Patterns

Request-Reply Pattern

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// RequestReplyService implements the request-reply pattern over AMQP:
// each request registers a per-call reply channel keyed by its temporary
// reply queue's name, so a replier loop can route responses back.
type RequestReplyService struct {
	channel *amqp.Channel // AMQP channel used to declare reply queues
	replies map[string]chan interface{} // reply-queue name -> channel delivering the reply
	mu      sync.RWMutex // guards replies
}

// SendRequest sends a request and blocks until a reply arrives, the
// timeout elapses, or ctx is cancelled.
//
// NOTE(review): the actual publish of `request` is elided below
// ("... implementation ..."), so as written the request is never sent and
// the reply channel can only be fed by external code — confirm against the
// complete implementation.
func (s *RequestReplyService) SendRequest(ctx context.Context, request interface{}, timeout time.Duration) (interface{}, error) {
	// "" lets the broker generate a unique queue name; exclusive +
	// auto-delete ties the reply queue's lifetime to this connection.
	replyQueue, err := s.channel.QueueDeclare("", false, true, true, false, nil)
	if err != nil {
		return nil, err
	}

	replyChan := make(chan interface{}, 1) // buffered so the replier never blocks
	s.mu.Lock()
	s.replies[replyQueue.Name] = replyChan
	s.mu.Unlock()

	// Always deregister the reply channel, whatever the outcome.
	defer func() {
		s.mu.Lock()
		delete(s.replies, replyQueue.Name)
		s.mu.Unlock()
	}()

	// Send request
	// ... implementation ...

	// Wait for whichever comes first: the reply, the timeout, or caller
	// cancellation.
	select {
	case reply := <-replyChan:
		return reply, nil
	case <-time.After(timeout):
		return nil, fmt.Errorf("request timeout")
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

Pub-Sub Pattern

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// PubSubService implements the publish-subscribe pattern: each subscriber
// gets its own exclusive queue bound to a fanout exchange named after the
// topic, so every subscriber receives every message.
type PubSubService struct {
	channel *amqp.Channel // AMQP channel used for exchange/queue operations
}

// Subscribe subscribes to a topic and invokes handler for every message
// received until ctx is cancelled or the delivery channel closes. It
// declares a durable fanout exchange named after the topic, binds a fresh
// broker-named exclusive queue to it, and consumes with auto-ack (handler
// failures are logged but cannot be requeued).
func (s *PubSubService) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
	// Declare fanout exchange: every bound queue receives every message.
	err := s.channel.ExchangeDeclare(
		topic,    // name
		"fanout", // kind
		true,     // durable
		false,    // auto-delete
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	if err != nil {
		return err
	}

	// Declare an exclusive, auto-delete, broker-named queue tied to this
	// connection's lifetime.
	queue, err := s.channel.QueueDeclare("", false, true, true, false, nil)
	if err != nil {
		return err
	}

	// Bind the queue to the exchange; the routing key is ignored by fanout.
	err = s.channel.QueueBind(queue.Name, "", topic, false, nil)
	if err != nil {
		return err
	}

	// Consume with auto-ack.
	msgs, err := s.channel.Consume(queue.Name, "", true, false, false, false, nil)
	if err != nil {
		return err
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case delivery, ok := <-msgs:
			// A closed delivery channel means the AMQP channel or
			// connection is gone; stop instead of spinning on zero values.
			if !ok {
				return fmt.Errorf("delivery channel closed")
			}
			var message Message
			if err := json.Unmarshal(delivery.Body, &message); err != nil {
				log.Printf("Failed to unmarshal: %v", err)
				continue
			}
			// Auto-ack is on, so a handler error cannot requeue; log it
			// instead of silently dropping it.
			if err := handler(ctx, message); err != nil {
				log.Printf("Failed to handle message: %v", err)
			}
		}
	}
}

Dead Letter Queues

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

// SetupDeadLetterQueue wires up a main queue whose expired or rejected
// messages are routed through a dead-letter exchange into a companion
// queue named "<queueName>-dlq". Both queues are bound with the queue
// name as the routing key, which dead-lettered messages retain by default.
func SetupDeadLetterQueue(channel *amqp.Channel, queueName string) error {
	const (
		mainExchange = "main-exchange"
		dlxExchange  = "dlx-exchange"
	)
	dlqName := queueName + "-dlq"

	// Both exchanges are durable direct exchanges.
	if err := channel.ExchangeDeclare(mainExchange, "direct", true, false, false, false, nil); err != nil {
		return err
	}
	if err := channel.ExchangeDeclare(dlxExchange, "direct", true, false, false, false, nil); err != nil {
		return err
	}

	// The main queue dead-letters into dlx-exchange; unconsumed messages
	// expire after one hour and are dead-lettered too.
	mainArgs := amqp.Table{
		"x-dead-letter-exchange": dlxExchange,
		"x-message-ttl":          3600000, // 1 hour, in milliseconds
	}
	if _, err := channel.QueueDeclare(queueName, true, false, false, false, mainArgs); err != nil {
		return err
	}

	// The dead-letter queue itself needs no special arguments.
	if _, err := channel.QueueDeclare(dlqName, true, false, false, false, nil); err != nil {
		return err
	}

	// Bind both queues, routing by the main queue's name.
	if err := channel.QueueBind(queueName, queueName, mainExchange, false, nil); err != nil {
		return err
	}
	return channel.QueueBind(dlqName, queueName, dlxExchange, false, nil)
}

Best Practices

1. Message Idempotency

// Ensure messages can be processed multiple times safely
// IdempotentMessage wraps a Message with a stable unique ID so consumers
// can deduplicate redeliveries (e.g. by recording processed IDs).
type IdempotentMessage struct {
	ID      string // Unique identifier
	Message Message // the wrapped payload
}

2. Error Handling

// Implement proper error handling and retries
// NOTE(review): illustrative stub — a production Consume would wrap the
// handler call with bounded retries/backoff before nack'ing a delivery.
func (c *RabbitMQConsumer) Consume(ctx context.Context, handler MessageHandler) error {
	// ... with retry logic ...
}

3. Monitoring

// Monitor queue depth and processing metrics
// QueueMetrics is a snapshot of consumer health for dashboards/alerts.
type QueueMetrics struct {
	Depth       int // number of messages waiting in the queue
	ProcessTime time.Duration // time taken to process a message
	ErrorCount  int // number of failed processing attempts
}

4. Graceful Shutdown

// Implement graceful shutdown
// GracefulShutdown is meant to drain in-flight work before closing. As
// written it only delegates to Close; ctx is accepted for a deadline but
// is not yet consulted — TODO wire it into the drain step.
func (p *RabbitMQPublisher) GracefulShutdown(ctx context.Context) error {
	// Wait for pending messages
	// Close connections
	return p.Close()
}

Common Pitfalls

1. No Message Ordering

Kafka maintains order per partition; RabbitMQ doesn't guarantee order.

2. Duplicate Processing

Always implement idempotent message handlers.

3. No Dead Letter Handling

Implement dead letter queues for failed messages.

4. Insufficient Monitoring

Monitor queue depth, processing time, and error rates.

Resources

Summary

Message queues enable scalable, resilient microservices. Key takeaways:

  • Use RabbitMQ for complex routing and pub-sub patterns
  • Use Kafka for high-throughput, ordered message processing
  • Implement idempotent message handlers
  • Set up dead letter queues for failed messages
  • Monitor queue metrics continuously
  • Implement graceful shutdown procedures
  • Choose the right pattern for your use case

By mastering message queues, you can build robust, scalable distributed systems.

Comments