Skip to main content
⚡ Calmops

Saga Pattern and Distributed Transactions

Saga Pattern and Distributed Transactions

Introduction

The Saga pattern manages distributed transactions across microservices without traditional ACID guarantees. This guide covers implementing sagas in Go using both choreography and orchestration approaches.

Sagas break long-running transactions into smaller, manageable steps with compensating transactions for rollback, enabling reliable distributed operations.

Saga Fundamentals

Saga Step Model

package main

import (
	"context"
	"fmt"
	"time"
)

// SagaStep is one unit of work in a saga: a forward action paired with the
// compensating action that rolls it back.
type SagaStep interface {
	// Execute performs the step's forward action.
	Execute(ctx context.Context) error
	// Compensate is the rollback action for a step whose Execute already
	// succeeded.
	Compensate(ctx context.Context) error
	// Name returns the step's name; the orchestrator uses it in its log and
	// console output.
	Name() string
}

// BaseSagaStep stores a step's name. The concrete steps embed it so that they
// pick up the Name method without re-implementing it.
type BaseSagaStep struct {
	name string
}

// Name reports the name held by the step.
func (b *BaseSagaStep) Name() string { return b.name }

// OrderStep is the saga step that creates an order; its compensation cancels
// that order again.
type OrderStep struct {
	BaseSagaStep
	orderID    string
	customerID string
	total      float64
}

// NewOrderStep builds the "CreateOrder" step for the given order ID, customer
// ID and order total.
func NewOrderStep(orderID, customerID string, total float64) *OrderStep {
	step := new(OrderStep)
	step.name = "CreateOrder"
	step.orderID = orderID
	step.customerID = customerID
	step.total = total
	return step
}

// Execute announces the order creation on stdout. It returns ctx.Err()
// without doing anything if ctx is already cancelled or expired.
func (o *OrderStep) Execute(ctx context.Context) error {
	// Bail out before doing any work if the caller has already given up.
	if err := ctx.Err(); err != nil {
		return err
	}

	// Placeholder for the real call to the order service.
	fmt.Printf("Creating order %s for customer %s\n", o.orderID, o.customerID)
	return nil
}

// Compensate announces the order cancellation on stdout and always returns
// nil. It does not inspect ctx.
func (o *OrderStep) Compensate(ctx context.Context) error {
	// Placeholder for the real cancel call to the order service.
	fmt.Printf("Cancelling order %s\n", o.orderID)
	return nil
}

// PaymentStep is the saga step that charges for an order; its compensation
// refunds the charge.
type PaymentStep struct {
	BaseSagaStep
	orderID string
	amount  float64
}

// NewPaymentStep builds the "ProcessPayment" step for the given order ID and
// amount.
func NewPaymentStep(orderID string, amount float64) *PaymentStep {
	step := new(PaymentStep)
	step.name = "ProcessPayment"
	step.orderID = orderID
	step.amount = amount
	return step
}

// Execute announces the payment on stdout. It returns ctx.Err() without doing
// anything if ctx is already cancelled or expired.
func (p *PaymentStep) Execute(ctx context.Context) error {
	// Bail out before doing any work if the caller has already given up.
	if err := ctx.Err(); err != nil {
		return err
	}

	// Placeholder for the real call to the payment service.
	fmt.Printf("Processing payment of $%.2f for order %s\n", p.amount, p.orderID)
	return nil
}

// Compensate announces the refund on stdout and always returns nil. It does
// not inspect ctx.
func (p *PaymentStep) Compensate(ctx context.Context) error {
	// Placeholder for the real refund call to the payment service.
	fmt.Printf("Refunding payment for order %s\n", p.orderID)
	return nil
}

// ShippingStep is the saga step that arranges shipping for an order; its
// compensation cancels the shipment.
type ShippingStep struct {
	BaseSagaStep
	orderID string
}

// NewShippingStep builds the "ArrangeShipping" step for the given order ID.
func NewShippingStep(orderID string) *ShippingStep {
	step := new(ShippingStep)
	step.name = "ArrangeShipping"
	step.orderID = orderID
	return step
}

// Execute announces the shipping arrangement on stdout. It returns ctx.Err()
// without doing anything if ctx is already cancelled or expired.
func (sh *ShippingStep) Execute(ctx context.Context) error {
	// Bail out before doing any work if the caller has already given up.
	if err := ctx.Err(); err != nil {
		return err
	}

	// Placeholder for the real call to the shipping service.
	fmt.Printf("Arranging shipping for order %s\n", sh.orderID)
	return nil
}

// Compensate announces the shipping cancellation on stdout and always returns
// nil. It does not inspect ctx.
func (sh *ShippingStep) Compensate(ctx context.Context) error {
	// Placeholder for the real cancel call to the shipping service.
	fmt.Printf("Cancelling shipping for order %s\n", sh.orderID)
	return nil
}

Orchestration-Based Saga

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// SagaOrchestrator keeps an ordered list of saga steps. The mutex guards the
// list so that steps can be registered from multiple goroutines.
type SagaOrchestrator struct {
	steps []SagaStep
	mu    sync.Mutex
}

// NewSagaOrchestrator returns an orchestrator with an empty step list.
func NewSagaOrchestrator() *SagaOrchestrator {
	orch := new(SagaOrchestrator)
	orch.steps = make([]SagaStep, 0)
	return orch
}

// AddStep appends step to the end of the saga's step list.
func (so *SagaOrchestrator) AddStep(step SagaStep) {
	so.mu.Lock()
	so.steps = append(so.steps, step)
	so.mu.Unlock()
}

// Execute runs the saga's steps in the order they were added.
//
// The step list is snapshotted under the lock, so steps added while Execute is
// running are not part of this run. If ctx is done before a step starts, or a
// step's Execute returns an error, every step that already succeeded is
// compensated in reverse order and a non-nil error is returned. That error
// wraps the cause (ctx.Err() or the failing step's error), so callers can
// inspect it with errors.Is and errors.As. Execute returns nil only when all
// steps succeed.
func (so *SagaOrchestrator) Execute(ctx context.Context) error {
	so.mu.Lock()
	steps := make([]SagaStep, len(so.steps))
	copy(steps, so.steps)
	so.mu.Unlock()

	executed := make([]SagaStep, 0, len(steps))

	for _, step := range steps {
		// Stop before starting another step once the caller has given up.
		if cause := ctx.Err(); cause != nil {
			rollback := so.compensate(ctx, executed)
			return fmt.Errorf("%v: before step %s: %w", rollback, step.Name(), cause)
		}

		fmt.Printf("Executing step: %s\n", step.Name())
		if err := step.Execute(ctx); err != nil {
			log.Printf("Step %s failed: %v", step.Name(), err)
			// The failed step is not compensated; only the ones that completed.
			rollback := so.compensate(ctx, executed)
			return fmt.Errorf("%v: step %s: %w", rollback, step.Name(), err)
		}

		executed = append(executed, step)
	}

	fmt.Println("Saga completed successfully")
	return nil
}

// compensate rolls back executedSteps by calling Compensate on each one in
// reverse order.
//
// Compensations run with a context detached from ctx's cancellation and
// deadline (ctx's values are kept): a rollback is typically triggered because
// ctx was cancelled or timed out, and passing that context on would make
// context-aware compensations abort immediately. Steps that need a bound on
// their compensation should apply their own timeout.
//
// A failing compensation is logged and the remaining steps are still
// compensated. The returned error is always non-nil; it names the steps whose
// compensation failed, if any, so callers can tell a clean rollback from one
// that needs manual attention.
func (so *SagaOrchestrator) compensate(ctx context.Context, executedSteps []SagaStep) error {
	fmt.Println("Compensating saga...")

	ctx = context.WithoutCancel(ctx)

	var failed []string

	// Compensate in reverse order of execution.
	for i := len(executedSteps) - 1; i >= 0; i-- {
		step := executedSteps[i]
		fmt.Printf("Compensating step: %s\n", step.Name())

		if err := step.Compensate(ctx); err != nil {
			log.Printf("Compensation of %s failed: %v", step.Name(), err)
			// Best effort: keep compensating the remaining steps.
			failed = append(failed, step.Name())
		}
	}

	if len(failed) > 0 {
		return fmt.Errorf("saga failed and compensation failed for steps %q", failed)
	}
	return fmt.Errorf("saga failed and was compensated")
}

// ExampleOrchestration wires the order, payment and shipping steps into one
// orchestrated saga and runs it with a 30-second timeout, logging the error if
// the saga fails.
func ExampleOrchestration() {
	const orderID = "order-001"

	saga := NewSagaOrchestrator()
	for _, step := range []SagaStep{
		NewOrderStep(orderID, "customer-001", 99.99),
		NewPaymentStep(orderID, 99.99),
		NewShippingStep(orderID),
	} {
		saga.AddStep(step)
	}

	// Bound the whole saga so it cannot run indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	err := saga.Execute(ctx)
	if err != nil {
		log.Printf("Saga execution failed: %v", err)
	}
}

Choreography-Based Saga

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// SagaEvent is the common shape of the events passed over the saga event bus.
type SagaEvent interface {
	// EventType returns the event's type name. SagaEventBus.Publish uses it
	// as the key for looking up subscribed handlers.
	EventType() string
	// OrderID returns the ID of the order the event refers to.
	OrderID() string
}

// OrderCreatedEvent announces that an order was created. It carries the order
// ID, the customer ID and the order total.
type OrderCreatedEvent struct {
	orderID    string
	customerID string
	total      float64
}

// EventType returns the routing key "OrderCreated".
func (ev *OrderCreatedEvent) EventType() string { return "OrderCreated" }

// OrderID returns the ID of the created order.
func (ev *OrderCreatedEvent) OrderID() string { return ev.orderID }

// PaymentProcessedEvent announces that an order's payment was processed. It
// carries the order ID and the amount.
type PaymentProcessedEvent struct {
	orderID string
	amount  float64
}

// EventType returns the routing key "PaymentProcessed".
func (ev *PaymentProcessedEvent) EventType() string { return "PaymentProcessed" }

// OrderID returns the ID of the order that was paid for.
func (ev *PaymentProcessedEvent) OrderID() string { return ev.orderID }

// ShippingArrangedEvent announces that shipping was arranged for an order.
type ShippingArrangedEvent struct {
	orderID string
}

// EventType returns the routing key "ShippingArranged".
func (ev *ShippingArrangedEvent) EventType() string { return "ShippingArranged" }

// OrderID returns the ID of the order being shipped.
func (ev *ShippingArrangedEvent) OrderID() string { return ev.orderID }

// SagaEventBus is an in-process publish/subscribe hub. Handlers are stored per
// event type name; the RWMutex guards the handler map.
type SagaEventBus struct {
	handlers map[string][]func(ctx context.Context, event SagaEvent) error
	mu       sync.RWMutex
}

// NewSagaEventBus returns a bus with no subscribers.
func NewSagaEventBus() *SagaEventBus {
	bus := new(SagaEventBus)
	bus.handlers = map[string][]func(ctx context.Context, event SagaEvent) error{}
	return bus
}

// Subscribe registers handler to be called for every published event whose
// EventType equals eventType. Handlers for one type accumulate in
// registration order.
func (seb *SagaEventBus) Subscribe(eventType string, handler func(ctx context.Context, event SagaEvent) error) {
	seb.mu.Lock()
	defer seb.mu.Unlock()

	registered := seb.handlers[eventType]
	seb.handlers[eventType] = append(registered, handler)
}

// Publish hands event to every handler subscribed to its type. Each handler
// runs in its own goroutine and Publish does not wait for any of them; a
// handler's error is only logged. Publish itself always returns nil.
func (seb *SagaEventBus) Publish(ctx context.Context, event SagaEvent) error {
	seb.mu.RLock()
	subscribers := seb.handlers[event.EventType()]
	seb.mu.RUnlock()

	// Ranging over a nil slice is a no-op, which covers the "no subscribers"
	// case.
	for i := range subscribers {
		handle := subscribers[i]
		go func() {
			if err := handle(ctx, event); err != nil {
				log.Printf("Error handling event: %v", err)
			}
		}()
	}

	return nil
}

// OrderService starts the choreographed saga by creating orders and
// publishing an OrderCreatedEvent for each one.
type OrderService struct {
	eventBus *SagaEventBus
}

// NewOrderService returns an OrderService that publishes to eventBus.
func NewOrderService(eventBus *SagaEventBus) *OrderService {
	return &OrderService{eventBus: eventBus}
}

// CreateOrder generates an order ID, announces the order on stdout and
// publishes an OrderCreatedEvent carrying the ID, customerID and total. It
// returns whatever the event bus's Publish returns.
//
// The ID is derived from the current time in nanoseconds. Second resolution
// would give every order created within the same second the same ID. This is
// still a demo-grade scheme; a production service should use a real unique-ID
// generator.
func (svc *OrderService) CreateOrder(ctx context.Context, customerID string, total float64) error {
	orderID := fmt.Sprintf("order-%d", time.Now().UnixNano())
	fmt.Printf("Creating order %s\n", orderID)

	event := &OrderCreatedEvent{
		orderID:    orderID,
		customerID: customerID,
		total:      total,
	}

	return svc.eventBus.Publish(ctx, event)
}

// PaymentService reacts to created orders by processing their payment and
// publishing a PaymentProcessedEvent.
type PaymentService struct {
	eventBus *SagaEventBus
}

// NewPaymentService returns a PaymentService and subscribes its
// HandleOrderCreated method to "OrderCreated" events on eventBus.
func NewPaymentService(eventBus *SagaEventBus) *PaymentService {
	ps := &PaymentService{eventBus: eventBus}

	// Subscribe to order created events
	eventBus.Subscribe("OrderCreated", ps.HandleOrderCreated)

	return ps
}

// HandleOrderCreated processes the payment for the order described by event
// and publishes a PaymentProcessedEvent with the order's ID and total.
//
// event must be a non-nil *OrderCreatedEvent. Any other value yields an error
// rather than a panic: the bus runs handlers in their own goroutines, where a
// panic would take down the whole process.
func (ps *PaymentService) HandleOrderCreated(ctx context.Context, event SagaEvent) error {
	orderEvent, ok := event.(*OrderCreatedEvent)
	if !ok {
		return fmt.Errorf("payment service: unexpected event type %T, want *OrderCreatedEvent", event)
	}
	if orderEvent == nil {
		return fmt.Errorf("payment service: nil *OrderCreatedEvent")
	}

	fmt.Printf("Processing payment for order %s\n", orderEvent.orderID)

	paymentEvent := &PaymentProcessedEvent{
		orderID: orderEvent.orderID,
		amount:  orderEvent.total,
	}

	return ps.eventBus.Publish(ctx, paymentEvent)
}

// ShippingService reacts to processed payments by arranging shipping and
// publishing a ShippingArrangedEvent.
type ShippingService struct {
	eventBus *SagaEventBus
}

// NewShippingService returns a ShippingService and subscribes its
// HandlePaymentProcessed method to "PaymentProcessed" events on eventBus.
func NewShippingService(eventBus *SagaEventBus) *ShippingService {
	ss := &ShippingService{eventBus: eventBus}

	// Subscribe to payment processed events
	eventBus.Subscribe("PaymentProcessed", ss.HandlePaymentProcessed)

	return ss
}

// HandlePaymentProcessed arranges shipping for the order described by event
// and publishes a ShippingArrangedEvent with the order's ID.
//
// event must be a non-nil *PaymentProcessedEvent. Any other value yields an
// error rather than a panic: the bus runs handlers in their own goroutines,
// where a panic would take down the whole process.
func (ss *ShippingService) HandlePaymentProcessed(ctx context.Context, event SagaEvent) error {
	paymentEvent, ok := event.(*PaymentProcessedEvent)
	if !ok {
		return fmt.Errorf("shipping service: unexpected event type %T, want *PaymentProcessedEvent", event)
	}
	if paymentEvent == nil {
		return fmt.Errorf("shipping service: nil *PaymentProcessedEvent")
	}

	fmt.Printf("Arranging shipping for order %s\n", paymentEvent.orderID)

	shippingEvent := &ShippingArrangedEvent{
		orderID: paymentEvent.orderID,
	}

	return ss.eventBus.Publish(ctx, shippingEvent)
}

// ExampleChoreography wires the order, payment and shipping services to one
// event bus, creates an order, and then sleeps for two seconds so the
// asynchronous handlers have time to run.
func ExampleChoreography() {
	bus := NewSagaEventBus()

	// The payment and shipping services are constructed only for their
	// subscription side effect; the returned values are not needed.
	orders := NewOrderService(bus)
	NewPaymentService(bus)
	NewShippingService(bus)

	err := orders.CreateOrder(context.Background(), "customer-001", 99.99)
	if err != nil {
		log.Printf("Failed to create order: %v", err)
	}

	// Publish does not wait for handlers, so give them time to finish.
	time.Sleep(2 * time.Second)
}

Saga State Machine

package main

import (
	"context"
	"fmt"
)

// SagaState names a stage in a saga's lifecycle.
type SagaState string

// Lifecycle states understood by SagaStateMachine.
const (
	// StatePending is the state a new SagaStateMachine starts in.
	StatePending      SagaState = "PENDING"
	// StateExecuting can be entered from StatePending.
	StateExecuting    SagaState = "EXECUTING"
	// StateCompleted can be entered from StateExecuting; no transition leaves it.
	StateCompleted    SagaState = "COMPLETED"
	// StateCompensating can be entered from StateExecuting.
	StateCompensating SagaState = "COMPENSATING"
	// StateFailed can be entered from StateCompensating; no transition leaves it.
	StateFailed       SagaState = "FAILED"
)

// SagaStateMachine tracks a saga's current state and rejects transitions that
// the lifecycle does not allow.
type SagaStateMachine struct {
	state SagaState
}

// NewSagaStateMachine returns a state machine in StatePending.
func NewSagaStateMachine() *SagaStateMachine {
	machine := new(SagaStateMachine)
	machine.state = StatePending
	return machine
}

// Transition moves the machine to newState if the lifecycle allows it from the
// current state, and returns an error, leaving the state unchanged, otherwise.
//
// Allowed moves: PENDING -> EXECUTING, EXECUTING -> COMPLETED or COMPENSATING,
// COMPENSATING -> FAILED. COMPLETED and FAILED have no outgoing transitions.
func (ssm *SagaStateMachine) Transition(newState SagaState) error {
	var allowed bool
	switch ssm.state {
	case StatePending:
		allowed = newState == StateExecuting
	case StateExecuting:
		allowed = newState == StateCompleted || newState == StateCompensating
	case StateCompensating:
		allowed = newState == StateFailed
	}

	if !allowed {
		return fmt.Errorf("invalid transition from %s to %s", ssm.state, newState)
	}

	ssm.state = newState
	return nil
}

// GetState returns the machine's current state.
func (ssm *SagaStateMachine) GetState() SagaState {
	current := ssm.state
	return current
}

Best Practices

1. Idempotent Steps

// Execute is a skeleton showing where idempotency handling belongs so that a
// step can be retried safely. As written it does nothing and returns nil.
func (s *OrderStep) Execute(ctx context.Context) error {
	// Check whether this step has already been executed and, if so, return
	// without repeating the work.
	// Otherwise perform the work exactly once.
	return nil
}

2. Timeout Management

// Bound the operation with a deadline (30 seconds here) and release the
// context's resources with the deferred cancel once the surrounding function
// returns.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

3. Monitoring

// SagaMetrics holds counters for monitoring saga execution. The snippet only
// declares the fields; nothing shown here updates them.
type SagaMetrics struct {
	// SuccessCount is presumably the number of sagas that completed.
	SuccessCount int
	// FailureCount is presumably the number of sagas that failed.
	FailureCount int
	// AverageDuration is presumably the mean run time of a saga.
	AverageDuration time.Duration
}

4. Compensation Logging

// compensate is a skeleton showing where compensation events should be logged
// to build an audit trail. As written it compensates nothing and returns nil.
func (so *SagaOrchestrator) compensate(ctx context.Context, executedSteps []SagaStep) error {
	// Record each compensation attempt and its outcome here.
	return nil
}

Common Pitfalls

1. Non-Idempotent Steps

Always make steps idempotent for safe retries.

2. No Timeout Management

Always set timeouts to prevent hanging.

3. Insufficient Logging

Log all saga transitions for debugging.

4. No Compensation Testing

Test compensation paths thoroughly.

Resources

Summary

The Saga pattern enables reliable distributed transactions. Key takeaways:

  • Use orchestration for complex workflows
  • Use choreography for loosely coupled services
  • Make all steps idempotent
  • Implement proper compensation logic
  • Monitor saga execution
  • Log all transitions
  • Test compensation paths thoroughly

By mastering sagas, you can build reliable distributed systems.

Comments