Event-Driven Architecture in Go
Introduction
Event-driven architecture decouples services through events, enabling scalable, responsive systems. This guide covers implementing event-driven patterns in Go, including event sourcing and CQRS.
Event-driven systems react to state changes, allowing services to communicate asynchronously and independently, improving resilience and scalability.
Event-Driven Fundamentals
Basic Event Model
package main
import (
"fmt"
"time"
)
// Event represents a domain event
type Event interface {
EventType() string
AggregateID() string
Timestamp() time.Time
}
// BaseEvent provides common event functionality
type BaseEvent struct {
Type string
ID string
OccurredAt time.Time
}
func (e *BaseEvent) EventType() string {
return e.Type
}
func (e *BaseEvent) AggregateID() string {
return e.ID
}
func (e *BaseEvent) Timestamp() time.Time {
return e.OccurredAt
}
// OrderCreatedEvent represents an order creation
type OrderCreatedEvent struct {
BaseEvent
CustomerID string
Total float64
}
// OrderShippedEvent represents order shipment
type OrderShippedEvent struct {
BaseEvent
TrackingNumber string
}
// OrderCancelledEvent represents order cancellation
type OrderCancelledEvent struct {
BaseEvent
Reason string
}
Good: Proper Event-Driven Implementation
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// EventStore stores events
type EventStore interface {
Append(ctx context.Context, event Event) error
GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
}
// InMemoryEventStore is a simple in-memory event store
type InMemoryEventStore struct {
events map[string][]Event
mu sync.RWMutex
}
// NewInMemoryEventStore creates a new event store
func NewInMemoryEventStore() *InMemoryEventStore {
return &InMemoryEventStore{
events: make(map[string][]Event),
}
}
// Append appends an event
func (es *InMemoryEventStore) Append(ctx context.Context, event Event) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
es.mu.Lock()
defer es.mu.Unlock()
id := event.AggregateID()
es.events[id] = append(es.events[id], event)
return nil
}
// GetEvents retrieves events for an aggregate
func (es *InMemoryEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
es.mu.RLock()
defer es.mu.RUnlock()
events, exists := es.events[aggregateID]
if !exists {
return []Event{}, nil
}
return events, nil
}
// EventHandler processes events
type EventHandler func(ctx context.Context, event Event) error
// EventBus manages event publishing and subscription
type EventBus struct {
handlers map[string][]EventHandler
mu sync.RWMutex
}
// NewEventBus creates a new event bus
func NewEventBus() *EventBus {
return &EventBus{
handlers: make(map[string][]EventHandler),
}
}
// Subscribe subscribes to events
func (eb *EventBus) Subscribe(eventType string, handler EventHandler) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.handlers[eventType] = append(eb.handlers[eventType], handler)
}
// Publish publishes an event
func (eb *EventBus) Publish(ctx context.Context, event Event) error {
eb.mu.RLock()
handlers, exists := eb.handlers[event.EventType()]
eb.mu.RUnlock()
if !exists {
return nil
}
for _, handler := range handlers {
go func(h EventHandler) {
if err := h(ctx, event); err != nil {
log.Printf("Error handling event: %v", err)
}
}(handler)
}
return nil
}
// Order represents an order aggregate
type Order struct {
ID string
CustomerID string
Status string
Total float64
Events []Event
}
// NewOrder creates a new order
func NewOrder(id, customerID string, total float64) *Order {
return &Order{
ID: id,
CustomerID: customerID,
Status: "pending",
Total: total,
Events: []Event{},
}
}
// CreateOrder creates an order and records event
func (o *Order) CreateOrder() Event {
event := &OrderCreatedEvent{
BaseEvent: BaseEvent{
Type: "OrderCreated",
ID: o.ID,
OccurredAt: time.Now(),
},
CustomerID: o.CustomerID,
Total: o.Total,
}
o.Events = append(o.Events, event)
o.Status = "created"
return event
}
// ShipOrder ships an order
func (o *Order) ShipOrder(trackingNumber string) Event {
event := &OrderShippedEvent{
BaseEvent: BaseEvent{
Type: "OrderShipped",
ID: o.ID,
OccurredAt: time.Now(),
},
TrackingNumber: trackingNumber,
}
o.Events = append(o.Events, event)
o.Status = "shipped"
return event
}
// CancelOrder cancels an order
func (o *Order) CancelOrder(reason string) Event {
event := &OrderCancelledEvent{
BaseEvent: BaseEvent{
Type: "OrderCancelled",
ID: o.ID,
OccurredAt: time.Now(),
},
Reason: reason,
}
o.Events = append(o.Events, event)
o.Status = "cancelled"
return event
}
// OrderService manages orders
type OrderService struct {
eventStore EventStore
eventBus *EventBus
}
// NewOrderService creates a new order service
func NewOrderService(eventStore EventStore, eventBus *EventBus) *OrderService {
return &OrderService{
eventStore: eventStore,
eventBus: eventBus,
}
}
// CreateOrder creates an order
func (s *OrderService) CreateOrder(ctx context.Context, customerID string, total float64) (*Order, error) {
order := NewOrder(fmt.Sprintf("order-%d", time.Now().Unix()), customerID, total)
event := order.CreateOrder()
if err := s.eventStore.Append(ctx, event); err != nil {
return nil, err
}
if err := s.eventBus.Publish(ctx, event); err != nil {
return nil, err
}
return order, nil
}
// ShipOrder ships an order
func (s *OrderService) ShipOrder(ctx context.Context, orderID, trackingNumber string) error {
events, err := s.eventStore.GetEvents(ctx, orderID)
if err != nil {
return err
}
order := &Order{ID: orderID, Events: events}
event := order.ShipOrder(trackingNumber)
if err := s.eventStore.Append(ctx, event); err != nil {
return err
}
return s.eventBus.Publish(ctx, event)
}
Bad: Improper Event-Driven Implementation
package main
import (
"context"
)
// BAD: No event store
type BadOrderService struct {
orders map[string]interface{}
}
// BAD: Direct state mutation without events
func (s *BadOrderService) CreateOrder(ctx context.Context, customerID string, total float64) error {
// No event recording
// No event publishing
// Direct state mutation
s.orders["order-1"] = map[string]interface{}{
"customer": customerID,
"total": total,
}
return nil
}
// BAD: No event sourcing
func (s *BadOrderService) ShipOrder(ctx context.Context, orderID string) error {
// No event history
// No audit trail
// Direct state update
return nil
}
Problems:
- No event store
- No event history
- No audit trail
- Direct state mutation
Event Sourcing
package main
import (
"context"
"fmt"
"time"
)
// Snapshot represents a point-in-time snapshot
type Snapshot struct {
AggregateID string
Version int
State interface{}
Timestamp time.Time
}
// SnapshotStore stores snapshots
type SnapshotStore interface {
SaveSnapshot(ctx context.Context, snapshot Snapshot) error
GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error)
}
// EventSourcingService implements event sourcing
type EventSourcingService struct {
eventStore EventStore
snapshotStore SnapshotStore
}
// ReplayEvents replays events to rebuild state
func (es *EventSourcingService) ReplayEvents(ctx context.Context, aggregateID string) (*Order, error) {
// Try to get snapshot first
snapshot, _ := es.snapshotStore.GetSnapshot(ctx, aggregateID)
var order *Order
var startVersion int
if snapshot != nil {
order = snapshot.State.(*Order)
startVersion = snapshot.Version
} else {
order = &Order{ID: aggregateID}
startVersion = 0
}
// Get events after snapshot
events, err := es.eventStore.GetEvents(ctx, aggregateID)
if err != nil {
return nil, err
}
// Replay events
for _, event := range events {
es.applyEvent(order, event)
}
return order, nil
}
// applyEvent applies an event to the order
func (es *EventSourcingService) applyEvent(order *Order, event Event) {
switch e := event.(type) {
case *OrderCreatedEvent:
order.Status = "created"
order.CustomerID = e.CustomerID
order.Total = e.Total
case *OrderShippedEvent:
order.Status = "shipped"
case *OrderCancelledEvent:
order.Status = "cancelled"
}
}
CQRS Pattern
package main
import (
"context"
"sync"
)
// Command represents a command
type Command interface {
CommandType() string
}
// Query represents a query
type Query interface {
QueryType() string
}
// CommandHandler handles commands
type CommandHandler interface {
Handle(ctx context.Context, command Command) error
}
// QueryHandler handles queries
type QueryHandler interface {
Handle(ctx context.Context, query Query) (interface{}, error)
}
// CQRS implements Command Query Responsibility Segregation
type CQRS struct {
commandHandlers map[string]CommandHandler
queryHandlers map[string]QueryHandler
mu sync.RWMutex
}
// NewCQRS creates a new CQRS
func NewCQRS() *CQRS {
return &CQRS{
commandHandlers: make(map[string]CommandHandler),
queryHandlers: make(map[string]QueryHandler),
}
}
// RegisterCommandHandler registers a command handler
func (c *CQRS) RegisterCommandHandler(commandType string, handler CommandHandler) {
c.mu.Lock()
defer c.mu.Unlock()
c.commandHandlers[commandType] = handler
}
// RegisterQueryHandler registers a query handler
func (c *CQRS) RegisterQueryHandler(queryType string, handler QueryHandler) {
c.mu.Lock()
defer c.mu.Unlock()
c.queryHandlers[queryType] = handler
}
// ExecuteCommand executes a command
func (c *CQRS) ExecuteCommand(ctx context.Context, command Command) error {
c.mu.RLock()
handler, exists := c.commandHandlers[command.CommandType()]
c.mu.RUnlock()
if !exists {
return fmt.Errorf("no handler for command: %s", command.CommandType())
}
return handler.Handle(ctx, command)
}
// ExecuteQuery executes a query
func (c *CQRS) ExecuteQuery(ctx context.Context, query Query) (interface{}, error) {
c.mu.RLock()
handler, exists := c.queryHandlers[query.QueryType()]
c.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("no handler for query: %s", query.QueryType())
}
return handler.Handle(ctx, query)
}
// Example commands and queries
type CreateOrderCommand struct {
CustomerID string
Total float64
}
func (c *CreateOrderCommand) CommandType() string {
return "CreateOrder"
}
type GetOrderQuery struct {
OrderID string
}
func (q *GetOrderQuery) QueryType() string {
return "GetOrder"
}
Best Practices
1. Event Versioning
// Version events for backward compatibility
type VersionedEvent struct {
Event Event
Version int
}
2. Event Validation
// Validate events before storing
func ValidateEvent(event Event) error {
if event.AggregateID() == "" {
return fmt.Errorf("aggregate ID is required")
}
return nil
}
3. Idempotent Handlers
// Ensure event handlers are idempotent
func (s *OrderService) HandleOrderCreated(ctx context.Context, event Event) error {
// Check if already processed
// Process only once
return nil
}
4. Monitoring
// Monitor event processing
type EventMetrics struct {
ProcessedCount int
FailedCount int
AverageLatency time.Duration
}
Common Pitfalls
1. Event Immutability
Never modify events after storing.
2. No Event Versioning
Always version events for compatibility.
3. Synchronous Event Handling
Use asynchronous handlers to avoid blocking.
4. No Snapshots
Use snapshots for large event streams.
Resources
Summary
Event-driven architecture enables scalable, responsive systems. Key takeaways:
- Use events to represent state changes
- Implement event sourcing for audit trails
- Use CQRS to separate read and write models
- Version events for backward compatibility
- Implement idempotent event handlers
- Use snapshots for performance
- Monitor event processing metrics
By mastering event-driven patterns, you can build resilient, scalable systems.
Comments