Saga Pattern and Distributed Transactions
Introduction
The Saga pattern manages distributed transactions across microservices without traditional ACID guarantees. This guide covers implementing sagas in Go using both choreography and orchestration approaches.
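A saga breaks a long-running transaction into a sequence of smaller steps, each paired with a compensating transaction that undoes it. When a step fails, the compensations of the steps that already succeeded are run in reverse order, which is what makes the overall distributed operation reliable.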
Saga Fundamentals
Saga Step Model
package main
import (
"context"
"fmt"
"time"
)
// SagaStep is the contract for one unit of work in a saga: a forward action
// paired with the action that undoes it.
type SagaStep interface {
	// Name returns the label used when reporting on the step.
	Name() string
	// Execute performs the step's forward action.
	Execute(ctx context.Context) error
	// Compensate undoes the effect of an earlier successful Execute.
	Compensate(ctx context.Context) error
}
// BaseSagaStep stores a step's name so that concrete steps can embed it
// rather than each implementing Name themselves.
type BaseSagaStep struct {
	// name is the label handed back by Name.
	name string
}

// Name reports the label this step was given.
func (b *BaseSagaStep) Name() string {
	label := b.name
	return label
}
// OrderStep is the saga step that creates an order and, on rollback,
// cancels it.
type OrderStep struct {
	BaseSagaStep

	// orderID and customerID identify the order and who placed it.
	orderID, customerID string
	// total is the order total.
	total float64
}

// NewOrderStep builds an OrderStep named "CreateOrder" for the given order,
// customer and order total.
func NewOrderStep(orderID, customerID string, total float64) *OrderStep {
	step := &OrderStep{BaseSagaStep: BaseSagaStep{name: "CreateOrder"}}
	step.orderID = orderID
	step.customerID = customerID
	step.total = total
	return step
}
// Execute announces creation of the order. If ctx is already done it returns
// the context's error without printing anything. The call to the real order
// service is still a placeholder, so otherwise it always succeeds.
func (s *OrderStep) Execute(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	msg := fmt.Sprintf("Creating order %s for customer %s\n", s.orderID, s.customerID)
	fmt.Print(msg)
	// Call order service
	return nil
}
// Compensate announces cancellation of the order. The call to the real order
// service is still a placeholder, so it always returns nil.
func (s *OrderStep) Compensate(ctx context.Context) error {
	msg := fmt.Sprintf("Cancelling order %s\n", s.orderID)
	fmt.Print(msg)
	// Call order service to cancel
	return nil
}
// PaymentStep is the saga step that charges for an order and, on rollback,
// refunds it.
type PaymentStep struct {
	BaseSagaStep

	// orderID identifies the order being paid for.
	orderID string
	// amount is the sum to charge.
	amount float64
}

// NewPaymentStep builds a PaymentStep named "ProcessPayment" for the given
// order and amount.
func NewPaymentStep(orderID string, amount float64) *PaymentStep {
	step := &PaymentStep{BaseSagaStep: BaseSagaStep{name: "ProcessPayment"}}
	step.orderID = orderID
	step.amount = amount
	return step
}
// Execute announces the payment being taken. If ctx is already done it
// returns the context's error without printing anything. The call to the real
// payment service is still a placeholder, so otherwise it always succeeds.
func (s *PaymentStep) Execute(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	msg := fmt.Sprintf("Processing payment of $%.2f for order %s\n", s.amount, s.orderID)
	fmt.Print(msg)
	// Call payment service
	return nil
}
// Compensate announces the refund for the order. The call to the real payment
// service is still a placeholder, so it always returns nil.
func (s *PaymentStep) Compensate(ctx context.Context) error {
	msg := fmt.Sprintf("Refunding payment for order %s\n", s.orderID)
	fmt.Print(msg)
	// Call payment service to refund
	return nil
}
// ShippingStep is the saga step that arranges shipping for an order and, on
// rollback, cancels it.
type ShippingStep struct {
	BaseSagaStep

	// orderID identifies the order to ship.
	orderID string
}

// NewShippingStep builds a ShippingStep named "ArrangeShipping" for the
// given order.
func NewShippingStep(orderID string) *ShippingStep {
	step := &ShippingStep{BaseSagaStep: BaseSagaStep{name: "ArrangeShipping"}}
	step.orderID = orderID
	return step
}
// Execute announces that shipping is being arranged. If ctx is already done
// it returns the context's error without printing anything. The call to the
// real shipping service is still a placeholder, so otherwise it always
// succeeds.
func (s *ShippingStep) Execute(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	msg := fmt.Sprintf("Arranging shipping for order %s\n", s.orderID)
	fmt.Print(msg)
	// Call shipping service
	return nil
}
// Compensate announces that shipping for the order is being cancelled. The
// call to the real shipping service is still a placeholder, so it always
// returns nil.
func (s *ShippingStep) Compensate(ctx context.Context) error {
	msg := fmt.Sprintf("Cancelling shipping for order %s\n", s.orderID)
	fmt.Print(msg)
	// Call shipping service to cancel
	return nil
}
Orchestration-Based Saga
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// SagaOrchestrator holds an ordered list of saga steps and drives them from
// a single place.
type SagaOrchestrator struct {
	// mu guards steps.
	mu sync.Mutex
	// steps are kept in the order they were added.
	steps []SagaStep
}

// NewSagaOrchestrator returns an orchestrator with no steps registered.
func NewSagaOrchestrator() *SagaOrchestrator {
	so := new(SagaOrchestrator)
	so.steps = make([]SagaStep, 0)
	return so
}
// AddStep appends step to the end of the saga while holding the
// orchestrator's lock.
func (so *SagaOrchestrator) AddStep(step SagaStep) {
	so.mu.Lock()
	so.steps = append(so.steps, step)
	so.mu.Unlock()
}
// Execute runs the registered steps in the order they were added.
//
// Before each step it checks ctx. If ctx is done, or if a step's Execute
// returns an error, every step that already succeeded is handed to compensate
// (which undoes them in reverse order) and a non-nil error is returned. That
// error starts with the message produced by compensate and wraps the
// underlying cause, so callers can inspect it with errors.Is / errors.As, for
// example to detect context.DeadlineExceeded. The step that failed is not
// itself compensated, only the ones before it.
//
// Execute returns nil only when every step succeeded.
func (so *SagaOrchestrator) Execute(ctx context.Context) error {
	// Work on a snapshot so that AddStep calls made while the saga is running
	// cannot change the steps of this run.
	so.mu.Lock()
	steps := make([]SagaStep, len(so.steps))
	copy(steps, so.steps)
	so.mu.Unlock()

	executedSteps := make([]SagaStep, 0, len(steps))
	for _, step := range steps {
		if err := ctx.Err(); err != nil {
			compErr := so.compensate(ctx, executedSteps)
			return fmt.Errorf("%v: cancelled before step %s: %w", compErr, step.Name(), err)
		}

		fmt.Printf("Executing step: %s\n", step.Name())
		if err := step.Execute(ctx); err != nil {
			log.Printf("Step %s failed: %v", step.Name(), err)
			compErr := so.compensate(ctx, executedSteps)
			// Keep the cause in the chain instead of replacing it with a
			// generic message.
			return fmt.Errorf("%v: step %s: %w", compErr, step.Name(), err)
		}
		executedSteps = append(executedSteps, step)
	}

	fmt.Println("Saga completed successfully")
	return nil
}
// compensate undoes executedSteps in reverse order and always returns a
// non-nil error, because it is only called once the saga has failed.
//
// Execute calls this when ctx is already done, so passing ctx straight
// through would hand a dead context to every Compensate call and any step
// that honours cancellation would skip its rollback. Compensation therefore
// runs on a context that keeps ctx's values but not its cancellation or
// deadline, bounded by its own timeout so it cannot hang forever.
//
// A failing Compensate is logged and the remaining steps are still
// compensated. If any compensation failed, the returned error names those
// steps and wraps the first failure; otherwise it reports that the saga
// failed and was compensated.
func (so *SagaOrchestrator) compensate(ctx context.Context, executedSteps []SagaStep) error {
	// Upper bound for the whole rollback.
	const compensationTimeout = 30 * time.Second

	compCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), compensationTimeout)
	defer cancel()

	fmt.Println("Compensating saga...")

	var (
		failed   []string
		firstErr error
	)
	// Compensate in reverse order
	for i := len(executedSteps) - 1; i >= 0; i-- {
		step := executedSteps[i]
		fmt.Printf("Compensating step: %s\n", step.Name())
		if err := step.Compensate(compCtx); err != nil {
			log.Printf("Compensation of %s failed: %v", step.Name(), err)
			// Continue compensating other steps, but remember the failure so
			// the caller is not told the rollback was clean.
			failed = append(failed, step.Name())
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	if len(failed) > 0 {
		return fmt.Errorf("saga failed and compensation was incomplete (steps not compensated: %v): %w", failed, firstErr)
	}
	return fmt.Errorf("saga failed and was compensated")
}
// ExampleOrchestration wires an order, payment and shipping step into one
// orchestrator and runs the saga with a 30-second budget, logging the error
// if the run fails.
func ExampleOrchestration() {
	const (
		orderID = "order-001"
		amount  = 99.99
	)

	orchestrator := NewSagaOrchestrator()

	// Steps run in the order they are registered.
	orchestrator.AddStep(NewOrderStep(orderID, "customer-001", amount))
	orchestrator.AddStep(NewPaymentStep(orderID, amount))
	orchestrator.AddStep(NewShippingStep(orderID))

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	err := orchestrator.Execute(ctx)
	if err == nil {
		return
	}
	log.Printf("Saga execution failed: %v", err)
}
Choreography-Based Saga
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// SagaEvent is what services exchange in the choreography-based saga.
type SagaEvent interface {
	// OrderID returns the ID of the order the event is about.
	OrderID() string
	// EventType returns the key the event bus uses to pick handlers.
	EventType() string
}
// OrderCreatedEvent announces that an order has been created.
type OrderCreatedEvent struct {
	// orderID and customerID identify the order and who placed it.
	orderID, customerID string
	// total is the order total.
	total float64
}

// EventType returns the routing key "OrderCreated".
func (ev *OrderCreatedEvent) EventType() string {
	const kind = "OrderCreated"
	return kind
}

// OrderID returns the ID of the order that was created.
func (ev *OrderCreatedEvent) OrderID() string {
	id := ev.orderID
	return id
}
// PaymentProcessedEvent announces that payment for an order was processed.
type PaymentProcessedEvent struct {
	// orderID identifies the order that was paid for.
	orderID string
	// amount is the sum that was charged.
	amount float64
}

// EventType returns the routing key "PaymentProcessed".
func (ev *PaymentProcessedEvent) EventType() string {
	const kind = "PaymentProcessed"
	return kind
}

// OrderID returns the ID of the order the payment belongs to.
func (ev *PaymentProcessedEvent) OrderID() string {
	id := ev.orderID
	return id
}
// ShippingArrangedEvent announces that shipping for an order was arranged.
type ShippingArrangedEvent struct {
	// orderID identifies the order being shipped.
	orderID string
}

// EventType returns the routing key "ShippingArranged".
func (ev *ShippingArrangedEvent) EventType() string {
	const kind = "ShippingArranged"
	return kind
}

// OrderID returns the ID of the order being shipped.
func (ev *ShippingArrangedEvent) OrderID() string {
	id := ev.orderID
	return id
}
// SagaEventBus is an in-process publish/subscribe hub that routes saga events
// to handlers by event type.
type SagaEventBus struct {
	// mu guards handlers.
	mu sync.RWMutex
	// handlers maps an event type to the handlers subscribed to it, in
	// subscription order.
	handlers map[string][]func(ctx context.Context, event SagaEvent) error
}

// NewSagaEventBus returns a bus with no subscriptions.
func NewSagaEventBus() *SagaEventBus {
	bus := new(SagaEventBus)
	bus.handlers = map[string][]func(ctx context.Context, event SagaEvent) error{}
	return bus
}
// Subscribe registers handler for events whose EventType equals eventType.
// Handlers for the same type accumulate in subscription order.
func (seb *SagaEventBus) Subscribe(eventType string, handler func(ctx context.Context, event SagaEvent) error) {
	seb.mu.Lock()
	defer seb.mu.Unlock()

	registered := seb.handlers[eventType]
	registered = append(registered, handler)
	seb.handlers[eventType] = registered
}
// Publish hands event to every handler subscribed to its type. Each handler
// runs in its own goroutine and Publish does not wait for any of them; a
// handler error is only logged. Publish itself always returns nil, including
// when nothing is subscribed to the event type.
func (seb *SagaEventBus) Publish(ctx context.Context, event SagaEvent) error {
	seb.mu.RLock()
	subscribers := seb.handlers[event.EventType()]
	seb.mu.RUnlock()

	// Ranging over a missing (nil) entry simply does nothing.
	for i := range subscribers {
		handle := subscribers[i]
		go func() {
			if err := handle(ctx, event); err != nil {
				log.Printf("Error handling event: %v", err)
			}
		}()
	}
	return nil
}
// OrderService starts the choreography by publishing order events.
type OrderService struct {
	// eventBus is where order events are published.
	eventBus *SagaEventBus
}

// NewOrderService returns an OrderService that publishes on eventBus.
func NewOrderService(eventBus *SagaEventBus) *OrderService {
	svc := new(OrderService)
	svc.eventBus = eventBus
	return svc
}
// CreateOrder generates an order ID, announces the order and publishes an
// OrderCreatedEvent carrying customerID and total. It returns whatever the
// event bus's Publish returns.
//
// The ID is derived from the current time in nanoseconds. The previous
// second-resolution timestamp gave two orders created within the same second
// the same ID. A clock-based ID is still not a uniqueness guarantee; a real
// service should use a UUID or a database-issued key.
func (s *OrderService) CreateOrder(ctx context.Context, customerID string, total float64) error {
	// The receiver is named s rather than os so it does not shadow the
	// standard library package of that name.
	orderID := fmt.Sprintf("order-%d", time.Now().UnixNano())
	fmt.Printf("Creating order %s\n", orderID)

	event := &OrderCreatedEvent{
		orderID:    orderID,
		customerID: customerID,
		total:      total,
	}
	return s.eventBus.Publish(ctx, event)
}
// PaymentService reacts to created orders by processing their payment.
type PaymentService struct {
	// eventBus is where the service listens and publishes.
	eventBus *SagaEventBus
}

// NewPaymentService returns a PaymentService that is already subscribed to
// "OrderCreated" events on eventBus.
func NewPaymentService(eventBus *SagaEventBus) *PaymentService {
	ps := new(PaymentService)
	ps.eventBus = eventBus

	// Subscribe to order created events
	const topic = "OrderCreated"
	eventBus.Subscribe(topic, ps.HandleOrderCreated)
	return ps
}
// HandleOrderCreated processes payment for a newly created order and then
// publishes a PaymentProcessedEvent for the same order and amount.
//
// event must be a non-nil *OrderCreatedEvent. Anything else is reported as an
// error instead of panicking: the bus invokes handlers in their own
// goroutines, where an unrecovered panic would take down the whole process.
// On success it returns whatever the event bus's Publish returns.
func (ps *PaymentService) HandleOrderCreated(ctx context.Context, event SagaEvent) error {
	orderEvent, ok := event.(*OrderCreatedEvent)
	if !ok || orderEvent == nil {
		return fmt.Errorf("payment service: expected *OrderCreatedEvent, got %T", event)
	}

	fmt.Printf("Processing payment for order %s\n", orderEvent.orderID)

	paymentEvent := &PaymentProcessedEvent{
		orderID: orderEvent.orderID,
		amount:  orderEvent.total,
	}
	return ps.eventBus.Publish(ctx, paymentEvent)
}
// ShippingService reacts to processed payments by arranging shipping.
type ShippingService struct {
	// eventBus is where the service listens and publishes.
	eventBus *SagaEventBus
}

// NewShippingService returns a ShippingService that is already subscribed to
// "PaymentProcessed" events on eventBus.
func NewShippingService(eventBus *SagaEventBus) *ShippingService {
	ss := new(ShippingService)
	ss.eventBus = eventBus

	// Subscribe to payment processed events
	const topic = "PaymentProcessed"
	eventBus.Subscribe(topic, ss.HandlePaymentProcessed)
	return ss
}
// HandlePaymentProcessed arranges shipping for an order whose payment went
// through and then publishes a ShippingArrangedEvent for that order.
//
// event must be a non-nil *PaymentProcessedEvent. Anything else is reported
// as an error instead of panicking: the bus invokes handlers in their own
// goroutines, where an unrecovered panic would take down the whole process.
// On success it returns whatever the event bus's Publish returns.
func (ss *ShippingService) HandlePaymentProcessed(ctx context.Context, event SagaEvent) error {
	paymentEvent, ok := event.(*PaymentProcessedEvent)
	if !ok || paymentEvent == nil {
		return fmt.Errorf("shipping service: expected *PaymentProcessedEvent, got %T", event)
	}

	fmt.Printf("Arranging shipping for order %s\n", paymentEvent.orderID)

	shippingEvent := &ShippingArrangedEvent{
		orderID: paymentEvent.orderID,
	}
	return ss.eventBus.Publish(ctx, shippingEvent)
}
// ExampleChoreography wires the three services to one event bus, creates an
// order and then sleeps for two seconds so the asynchronously dispatched
// handlers have time to run before the function returns.
func ExampleChoreography() {
	eventBus := NewSagaEventBus()

	// The payment and shipping services subscribe themselves inside their
	// constructors, so the returned values are not needed here.
	_ = NewPaymentService(eventBus)
	_ = NewShippingService(eventBus)
	orderService := NewOrderService(eventBus)

	// Create order
	err := orderService.CreateOrder(context.Background(), "customer-001", 99.99)
	if err != nil {
		log.Printf("Failed to create order: %v", err)
	}

	// Wait for async processing
	const settle = 2 * time.Second
	time.Sleep(settle)
}
Saga State Machine
package main
import (
"context"
"fmt"
)
// SagaState names a phase in the lifecycle of a saga.
type SagaState string

// The saga lifecycle states.
const (
	// StatePending is the state a new state machine starts in.
	StatePending = SagaState("PENDING")
	// StateExecuting follows StatePending.
	StateExecuting = SagaState("EXECUTING")
	// StateCompleted is terminal and is reached from StateExecuting.
	StateCompleted = SagaState("COMPLETED")
	// StateCompensating is reached from StateExecuting.
	StateCompensating = SagaState("COMPENSATING")
	// StateFailed is terminal and is reached from StateCompensating.
	StateFailed = SagaState("FAILED")
)
// SagaStateMachine tracks the current state of one saga.
type SagaStateMachine struct {
	// state is the state the saga is currently in.
	state SagaState
}

// NewSagaStateMachine returns a state machine that starts in StatePending.
func NewSagaStateMachine() *SagaStateMachine {
	machine := new(SagaStateMachine)
	machine.state = StatePending
	return machine
}
// Transition moves the machine to newState when that move is permitted from
// the current state, and returns an error otherwise, leaving the state
// unchanged.
//
// Permitted moves:
//
//	PENDING      -> EXECUTING
//	EXECUTING    -> COMPLETED | COMPENSATING
//	COMPENSATING -> FAILED
//
// COMPLETED and FAILED permit no further moves.
func (ssm *SagaStateMachine) Transition(newState SagaState) error {
	// States not listed here keep a nil list, so every move from them is
	// rejected.
	var allowed []SagaState
	switch ssm.state {
	case StatePending:
		allowed = []SagaState{StateExecuting}
	case StateExecuting:
		allowed = []SagaState{StateCompleted, StateCompensating}
	case StateCompensating:
		allowed = []SagaState{StateFailed}
	}

	for i := range allowed {
		if allowed[i] != newState {
			continue
		}
		ssm.state = newState
		return nil
	}
	return fmt.Errorf("invalid transition from %s to %s", ssm.state, newState)
}
// GetState reports the state the machine is in right now.
func (ssm *SagaStateMachine) GetState() SagaState {
	current := ssm.state
	return current
}
Best Practices
1. Idempotent Steps
// Ensure steps can be retried safely
// Execute here is an illustrative skeleton, not a working implementation: the
// body holds only the two reminders below and returns nil. It shows where an
// idempotency check belongs in a step's Execute.
func (s *OrderStep) Execute(ctx context.Context) error {
// Check if already executed
// Execute only once
return nil
}
2. Timeout Management
// Set appropriate timeouts
// The derived ctx is cancelled automatically 30 seconds after this line runs.
// The deferred cancel releases the context's resources as soon as the
// surrounding function returns, even if that is well before the deadline.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
3. Monitoring
// SagaMetrics is a plain record for monitoring saga execution. Nothing in
// this guide populates it; the field meanings below are taken from their
// names.
type SagaMetrics struct {
	// SuccessCount and FailureCount are presumably tallies of sagas that
	// completed and sagas that failed.
	SuccessCount, FailureCount int
	// AverageDuration is presumably the mean time a saga takes to run.
	AverageDuration time.Duration
}
4. Compensation Logging
// Log all compensations for audit trail
// compensate here is an illustrative skeleton, not a working implementation:
// the body holds only the reminder below and returns nil without touching
// executedSteps. It marks where audit logging belongs in the compensation
// path.
func (so *SagaOrchestrator) compensate(ctx context.Context, executedSteps []SagaStep) error {
// Log compensation events
return nil
}
Common Pitfalls
1. Non-Idempotent Steps
Always make steps idempotent for safe retries.
2. No Timeout Management
Always set timeouts to prevent hanging.
3. Insufficient Logging
Log all saga transitions for debugging.
4. No Compensation Testing
Test compensation paths thoroughly.
Resources
Summary
The Saga pattern enables reliable distributed transactions. Key takeaways:
- Use orchestration for complex workflows
- Use choreography for loosely coupled services
- Make all steps idempotent
- Implement proper compensation logic
- Monitor saga execution
- Log all transitions
- Test compensation paths thoroughly
By mastering sagas, you can build reliable distributed systems.
Comments