Skip to main content
⚡ Calmops

Event-Driven Architecture in Go

Event-Driven Architecture in Go

Introduction

Event-driven architecture decouples services through events, enabling scalable, responsive systems. This guide covers implementing event-driven patterns in Go, including event sourcing and CQRS.

Event-driven systems react to state changes, allowing services to communicate asynchronously and independently, improving resilience and scalability.

Event-Driven Fundamentals

Basic Event Model

package main

import (
	"fmt"
	"time"
)

// Event represents a domain event.
//
// EventType returns the name identifying the kind of event, AggregateID
// returns the identifier of the aggregate the event belongs to, and
// Timestamp returns the time at which the event occurred.
type Event interface {
	EventType() string
	AggregateID() string
	Timestamp() time.Time
}

// BaseEvent provides common event functionality. Concrete events embed it by
// value and inherit its accessors; because the accessors use pointer
// receivers, it is a pointer to the concrete event that carries them.
type BaseEvent struct {
	Type       string    // event type name, returned by EventType
	ID         string    // aggregate identifier, returned by AggregateID
	OccurredAt time.Time // moment the event happened, returned by Timestamp
}

// EventType returns the event's type name (the Type field).
func (e *BaseEvent) EventType() string {
	return e.Type
}

// AggregateID returns the identifier of the aggregate the event belongs to
// (the ID field).
func (e *BaseEvent) AggregateID() string {
	return e.ID
}

// Timestamp returns the time the event occurred (the OccurredAt field).
func (e *BaseEvent) Timestamp() time.Time {
	return e.OccurredAt
}

// OrderCreatedEvent represents an order creation. It embeds BaseEvent and
// adds the ID of the customer and the order total.
// NOTE(review): Total is a float64; if it holds a monetary amount, confirm
// that floating-point rounding is acceptable.
type OrderCreatedEvent struct {
	BaseEvent
	CustomerID string
	Total      float64
}

// OrderShippedEvent represents order shipment. It embeds BaseEvent and adds
// the tracking number supplied when the order was shipped.
type OrderShippedEvent struct {
	BaseEvent
	TrackingNumber string
}

// OrderCancelledEvent represents order cancellation. It embeds BaseEvent and
// adds the reason given for the cancellation.
type OrderCancelledEvent struct {
	BaseEvent
	Reason string
}

Good: Proper Event-Driven Implementation

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// EventStore stores events.
//
// Append records a single event. GetEvents returns the events recorded for
// the given aggregate ID. Both accept a context and return an error on
// failure.
type EventStore interface {
	Append(ctx context.Context, event Event) error
	GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
}

// InMemoryEventStore is a simple in-memory event store. Events are grouped by
// aggregate ID in the order they were appended, and all access goes through a
// read/write mutex. Nothing is persisted.
type InMemoryEventStore struct {
	events map[string][]Event
	mu     sync.RWMutex
}

// NewInMemoryEventStore creates a new, empty event store.
func NewInMemoryEventStore() *InMemoryEventStore {
	return &InMemoryEventStore{
		events: make(map[string][]Event),
	}
}

// Append appends an event to the stream of the aggregate it names.
//
// It returns ctx.Err() when ctx is already cancelled or expired, and an error
// when event is nil; otherwise it stores the event and returns nil.
func (es *InMemoryEventStore) Append(ctx context.Context, event Event) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	// A nil interface would panic on the AggregateID call below.
	if event == nil {
		return fmt.Errorf("event is nil")
	}

	es.mu.Lock()
	defer es.mu.Unlock()

	id := event.AggregateID()
	es.events[id] = append(es.events[id], event)
	return nil
}

// GetEvents retrieves the events for an aggregate, oldest first.
//
// It returns ctx.Err() when ctx is already cancelled or expired. An unknown
// aggregate yields an empty, non-nil slice. The result is always a copy:
// handing out the internal slice would let callers overwrite stored history,
// or append into the store's backing array without holding the lock.
func (es *InMemoryEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	es.mu.RLock()
	defer es.mu.RUnlock()

	stored := es.events[aggregateID]
	out := make([]Event, len(stored))
	copy(out, stored)
	return out, nil
}

// EventHandler is the callback invoked for each published event of a type it
// was subscribed to. A non-nil error is logged by the bus.
type EventHandler func(ctx context.Context, event Event) error

// EventBus fans published events out to the handlers registered for the
// event's type. The handler table is guarded by a read/write mutex.
type EventBus struct {
	handlers map[string][]EventHandler
	mu       sync.RWMutex
}

// NewEventBus returns an event bus with no subscriptions.
func NewEventBus() *EventBus {
	bus := new(EventBus)
	bus.handlers = map[string][]EventHandler{}
	return bus
}

// Subscribe registers handler for events whose EventType equals eventType.
// Handlers accumulate; a type may have any number of them.
func (eb *EventBus) Subscribe(eventType string, handler EventHandler) {
	eb.mu.Lock()
	subscribers := eb.handlers[eventType]
	eb.handlers[eventType] = append(subscribers, handler)
	eb.mu.Unlock()
}

// Publish delivers event to every handler subscribed to its type. Each
// handler runs in its own goroutine and Publish does not wait for any of
// them; handler errors are logged, never returned. Publish itself always
// returns nil.
func (eb *EventBus) Publish(ctx context.Context, event Event) error {
	// Snapshot the subscriber list under the read lock, then dispatch
	// without holding it.
	eb.mu.RLock()
	subscribers, found := eb.handlers[event.EventType()]
	eb.mu.RUnlock()

	if found {
		for i := range subscribers {
			dispatch := subscribers[i]
			go func() {
				err := dispatch(ctx, event)
				if err != nil {
					log.Printf("Error handling event: %v", err)
				}
			}()
		}
	}

	return nil
}

// Order is the order aggregate. Besides its current state it keeps, in
// Events, every event its own methods have produced.
type Order struct {
	ID         string
	CustomerID string
	Status     string
	Total      float64
	Events     []Event
}

// NewOrder builds an order in the "pending" state with an empty (non-nil)
// event list.
func NewOrder(id, customerID string, total float64) *Order {
	order := &Order{Events: []Event{}}
	order.ID = id
	order.CustomerID = customerID
	order.Status = "pending"
	order.Total = total
	return order
}

// newBaseEvent returns the BaseEvent for an event of the given type raised by
// this order, stamped with the current time.
func (o *Order) newBaseEvent(eventType string) BaseEvent {
	return BaseEvent{Type: eventType, ID: o.ID, OccurredAt: time.Now()}
}

// record appends event to the order's history, moves the order to status and
// hands the event back to the caller.
func (o *Order) record(event Event, status string) Event {
	o.Events = append(o.Events, event)
	o.Status = status
	return event
}

// CreateOrder marks the order as "created" and returns the recorded
// *OrderCreatedEvent, which carries the order's customer ID and total.
func (o *Order) CreateOrder() Event {
	created := &OrderCreatedEvent{
		BaseEvent:  o.newBaseEvent("OrderCreated"),
		CustomerID: o.CustomerID,
		Total:      o.Total,
	}
	return o.record(created, "created")
}

// ShipOrder marks the order as "shipped" and returns the recorded
// *OrderShippedEvent carrying trackingNumber. The current status is not
// checked first.
func (o *Order) ShipOrder(trackingNumber string) Event {
	shipped := &OrderShippedEvent{
		BaseEvent:      o.newBaseEvent("OrderShipped"),
		TrackingNumber: trackingNumber,
	}
	return o.record(shipped, "shipped")
}

// CancelOrder marks the order as "cancelled" and returns the recorded
// *OrderCancelledEvent carrying reason. The current status is not checked
// first.
func (o *Order) CancelOrder(reason string) Event {
	cancelled := &OrderCancelledEvent{
		BaseEvent: o.newBaseEvent("OrderCancelled"),
		Reason:    reason,
	}
	return o.record(cancelled, "cancelled")
}

// OrderService manages orders: every operation records an event in the event
// store and then publishes it on the event bus.
type OrderService struct {
	eventStore EventStore
	eventBus   *EventBus
}

// NewOrderService creates a new order service backed by the given event
// store and event bus.
func NewOrderService(eventStore EventStore, eventBus *EventBus) *OrderService {
	return &OrderService{
		eventStore: eventStore,
		eventBus:   eventBus,
	}
}

// CreateOrder creates an order for customerID with the given total, stores
// its creation event and publishes it.
//
// It returns the new order, or nil and the error from the event store or the
// event bus. If publishing fails the event has already been stored.
func (s *OrderService) CreateOrder(ctx context.Context, customerID string, total float64) (*Order, error) {
	// Nanosecond resolution: with a seconds-based ID, two orders created in
	// the same second shared an ID and their event streams were merged.
	// NOTE(review): still time-derived, so not a hard uniqueness guarantee.
	id := fmt.Sprintf("order-%d", time.Now().UnixNano())

	order := NewOrder(id, customerID, total)
	event := order.CreateOrder()

	if err := s.eventStore.Append(ctx, event); err != nil {
		return nil, err
	}

	if err := s.eventBus.Publish(ctx, event); err != nil {
		return nil, err
	}

	return order, nil
}

// ShipOrder ships the order identified by orderID, storing and publishing
// the resulting shipment event.
//
// It returns an error when the event store fails, when no events exist for
// orderID (the order was never created), or when publishing fails.
func (s *OrderService) ShipOrder(ctx context.Context, orderID, trackingNumber string) error {
	events, err := s.eventStore.GetEvents(ctx, orderID)
	if err != nil {
		return err
	}
	if len(events) == 0 {
		return fmt.Errorf("order %q not found", orderID)
	}

	// Work on a private copy: Order.ShipOrder appends to Events, which must
	// not write into a backing array still owned by the event store.
	history := append(events[:0:0], events...)

	order := &Order{ID: orderID, Events: history}
	event := order.ShipOrder(trackingNumber)

	if err := s.eventStore.Append(ctx, event); err != nil {
		return err
	}

	return s.eventBus.Publish(ctx, event)
}

Bad: Improper Event-Driven Implementation

package main

import (
	"context"
)

// BAD: No event store
//
// BadOrderService is the counter-example: it keeps orders in a plain map and
// records no events. The map is never initialised by this type, so
// CreateOrder on a zero-value BadOrderService panics on the nil map.
type BadOrderService struct {
	orders map[string]interface{}
}

// BAD: Direct state mutation without events
//
// CreateOrder stores the customer and total under the fixed key "order-1",
// overwriting whatever was there, and always returns nil. ctx is unused.
func (s *BadOrderService) CreateOrder(ctx context.Context, customerID string, total float64) error {
	// No event recording
	// No event publishing
	// Direct state mutation
	record := make(map[string]interface{}, 2)
	record["customer"] = customerID
	record["total"] = total

	s.orders["order-1"] = record
	return nil
}

// BAD: No event sourcing
//
// ShipOrder does nothing with its arguments and always returns nil.
func (s *BadOrderService) ShipOrder(ctx context.Context, orderID string) error {
	// No event history
	// No audit trail
	// Direct state update
	var err error
	return err
}

Problems:

  • No event store
  • No event history
  • No audit trail
  • Direct state mutation

Event Sourcing

package main

import (
	"context"
	"fmt"
	"time"
)

// Snapshot represents a point-in-time snapshot of one aggregate.
//
// State is untyped; EventSourcingService.ReplayEvents type-asserts it to
// *Order. Version is read by ReplayEvents as the starting version for replay.
// NOTE(review): whether Version counts events or is some other sequence
// number is not established by this file — confirm with the SnapshotStore
// implementation.
type Snapshot struct {
	AggregateID string
	Version     int
	State       interface{}
	Timestamp   time.Time
}

// SnapshotStore stores snapshots.
//
// SaveSnapshot records a snapshot; GetSnapshot returns the snapshot for an
// aggregate ID. ReplayEvents treats a nil *Snapshot from GetSnapshot as
// "no snapshot available".
type SnapshotStore interface {
	SaveSnapshot(ctx context.Context, snapshot Snapshot) error
	GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error)
}

// EventSourcingService implements event sourcing: it rebuilds an Order from
// its stored events, optionally starting from a snapshot.
type EventSourcingService struct {
	eventStore    EventStore
	snapshotStore SnapshotStore
}

// ReplayEvents replays events to rebuild the state of the order identified
// by aggregateID.
//
// When a snapshot is available the order starts from a copy of the snapshot's
// state and only the events after the snapshot's version are applied;
// otherwise every event is applied to an empty order. The snapshot is an
// optimisation, so a missing snapshot store or a failing snapshot lookup
// falls back to a full replay.
//
// It returns an error when the snapshot's State is not a non-nil *Order or
// when the event store fails.
func (es *EventSourcingService) ReplayEvents(ctx context.Context, aggregateID string) (*Order, error) {
	order := &Order{ID: aggregateID}
	startVersion := 0

	// Try to get snapshot first
	if es.snapshotStore != nil {
		snapshot, err := es.snapshotStore.GetSnapshot(ctx, aggregateID)
		if err == nil && snapshot != nil {
			restored, ok := snapshot.State.(*Order)
			if !ok || restored == nil {
				return nil, fmt.Errorf("snapshot for aggregate %q holds %T, want non-nil *Order", aggregateID, snapshot.State)
			}
			// Copy so replaying does not mutate the state held by the
			// snapshot store.
			state := *restored
			order = &state
			startVersion = snapshot.Version
		}
	}

	// The store has no "events since version" query, so fetch the full
	// stream and skip the part the snapshot already covers.
	events, err := es.eventStore.GetEvents(ctx, aggregateID)
	if err != nil {
		return nil, err
	}

	// NOTE(review): assumes Snapshot.Version is the number of events already
	// folded into the snapshot — confirm against the SnapshotStore in use.
	// Out-of-range versions are clamped rather than causing a slice panic.
	if startVersion < 0 {
		startVersion = 0
	}
	if startVersion > len(events) {
		startVersion = len(events)
	}

	// Replay events
	for _, event := range events[startVersion:] {
		es.applyEvent(order, event)
	}

	return order, nil
}

// applyEvent applies an event to the order, updating its status (and, for a
// creation event, its customer ID and total). Event types other than the
// three order events are ignored.
func (es *EventSourcingService) applyEvent(order *Order, event Event) {
	switch e := event.(type) {
	case *OrderCreatedEvent:
		order.Status = "created"
		order.CustomerID = e.CustomerID
		order.Total = e.Total
	case *OrderShippedEvent:
		order.Status = "shipped"
	case *OrderCancelledEvent:
		order.Status = "cancelled"
	}
}

CQRS Pattern

package main

import (
	"context"
	"fmt"
	"sync"
)

// Command represents a command. CommandType returns the name CQRS uses to
// look up the handler registered for it.
type Command interface {
	CommandType() string
}

// Query represents a query. QueryType returns the name CQRS uses to look up
// the handler registered for it.
type Query interface {
	QueryType() string
}

// CommandHandler handles commands. Handle receives the command being
// executed and returns an error on failure; CQRS.ExecuteCommand returns that
// error to its caller unchanged.
type CommandHandler interface {
	Handle(ctx context.Context, command Command) error
}

// QueryHandler handles queries. Handle receives the query being executed and
// returns an untyped result plus an error; CQRS.ExecuteQuery returns both to
// its caller unchanged.
type QueryHandler interface {
	Handle(ctx context.Context, query Query) (interface{}, error)
}

// CQRS implements Command Query Responsibility Segregation as a dispatcher:
// commands and queries are routed, by their type name, to the single handler
// registered for that name. Both handler tables share one read/write mutex.
type CQRS struct {
	commandHandlers map[string]CommandHandler
	queryHandlers   map[string]QueryHandler
	mu              sync.RWMutex
}

// NewCQRS creates a new CQRS dispatcher with no handlers registered.
func NewCQRS() *CQRS {
	return &CQRS{
		commandHandlers: make(map[string]CommandHandler),
		queryHandlers:   make(map[string]QueryHandler),
	}
}

// RegisterCommandHandler registers handler for commandType, replacing any
// handler previously registered for that type.
func (c *CQRS) RegisterCommandHandler(commandType string, handler CommandHandler) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.commandHandlers[commandType] = handler
}

// RegisterQueryHandler registers handler for queryType, replacing any handler
// previously registered for that type.
func (c *CQRS) RegisterQueryHandler(queryType string, handler QueryHandler) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.queryHandlers[queryType] = handler
}

// ExecuteCommand executes a command by passing it to the handler registered
// for its type and returning that handler's error.
//
// It returns an error, rather than panicking, when command is nil or when no
// usable handler is registered for the command's type.
func (c *CQRS) ExecuteCommand(ctx context.Context, command Command) error {
	if command == nil {
		return fmt.Errorf("command is nil")
	}
	commandType := command.CommandType()

	// Hold the lock only for the lookup, not while the handler runs.
	c.mu.RLock()
	handler, exists := c.commandHandlers[commandType]
	c.mu.RUnlock()

	// A nil handler may have been registered; calling it would panic.
	if !exists || handler == nil {
		return fmt.Errorf("no handler for command: %s", commandType)
	}

	return handler.Handle(ctx, command)
}

// ExecuteQuery executes a query by passing it to the handler registered for
// its type and returning that handler's result and error.
//
// It returns an error, rather than panicking, when query is nil or when no
// usable handler is registered for the query's type.
func (c *CQRS) ExecuteQuery(ctx context.Context, query Query) (interface{}, error) {
	if query == nil {
		return nil, fmt.Errorf("query is nil")
	}
	queryType := query.QueryType()

	// Hold the lock only for the lookup, not while the handler runs.
	c.mu.RLock()
	handler, exists := c.queryHandlers[queryType]
	c.mu.RUnlock()

	// A nil handler may have been registered; calling it would panic.
	if !exists || handler == nil {
		return nil, fmt.Errorf("no handler for query: %s", queryType)
	}

	return handler.Handle(ctx, query)
}

// Example commands and queries

// CreateOrderCommand is a sample command carrying the customer ID and total
// of the order to create.
type CreateOrderCommand struct {
	CustomerID string
	Total      float64
}

// CommandType reports the name under which this command's handler is
// registered.
func (c *CreateOrderCommand) CommandType() string {
	const commandType = "CreateOrder"
	return commandType
}

// GetOrderQuery is a sample query carrying the ID of the order to look up.
type GetOrderQuery struct {
	OrderID string
}

// QueryType reports the name under which this query's handler is registered.
func (q *GetOrderQuery) QueryType() string {
	const queryType = "GetOrder"
	return queryType
}

Best Practices

1. Event Versioning

// Version events for backward compatibility
//
// VersionedEvent pairs an Event with an integer version number.
// NOTE(review): presumably Version is the schema version of the event
// payload — nothing in this file reads it, so confirm the intended meaning.
type VersionedEvent struct {
	Event   Event
	Version int
}

2. Event Validation

// ValidateEvent checks an event before it is stored.
//
// It returns an error when event is nil or when its aggregate ID is empty,
// and nil otherwise.
func ValidateEvent(event Event) error {
	// A nil interface would panic on the AggregateID call below.
	if event == nil {
		return fmt.Errorf("event is nil")
	}
	if event.AggregateID() == "" {
		return fmt.Errorf("aggregate ID is required")
	}
	return nil
}

3. Idempotent Handlers

// HandleOrderCreated is a placeholder showing where an idempotent handler for
// order-created events would go. As written it ignores ctx and event and
// always returns nil; the comments in the body mark the steps a real handler
// needs so that redelivery of the same event has no further effect.
func (s *OrderService) HandleOrderCreated(ctx context.Context, event Event) error {
	// Check if already processed
	// Process only once
	return nil
}

4. Monitoring

// EventMetrics holds counters for monitoring event processing: how many
// events were processed, how many failed, and the average latency.
// NOTE(review): nothing in this file updates these fields, and the struct
// has no synchronisation of its own — confirm how concurrent handlers are
// expected to update it.
type EventMetrics struct {
	ProcessedCount int
	FailedCount    int
	AverageLatency time.Duration
}

Common Pitfalls

1. Mutating Stored Events

Never modify an event after it has been stored; treat stored events as immutable.

2. No Event Versioning

Always version events for compatibility.

3. Synchronous Event Handling

Use asynchronous handlers to avoid blocking.

4. No Snapshots

Use snapshots for large event streams.

Resources

Summary

Event-driven architecture enables scalable, responsive systems. Key takeaways:

  • Use events to represent state changes
  • Implement event sourcing for audit trails
  • Use CQRS to separate read and write models
  • Version events for backward compatibility
  • Implement idempotent event handlers
  • Use snapshots for performance
  • Monitor event processing metrics

By mastering event-driven patterns, you can build resilient, scalable systems.

Comments