Skip to main content
⚡ Calmops

Data Processing Pipelines in Go

Data Processing Pipelines in Go

Introduction

Data processing pipelines are essential for handling large volumes of data efficiently. This guide covers designing and implementing data processing pipelines in Go.

Well-designed pipelines enable efficient data transformation, filtering, and aggregation while maintaining code clarity and performance.

Pipeline Fundamentals

Basic Pipeline Pattern

package main

import (
	"fmt"
	"sync"
)

// Pipeline is an ordered list of stages. Execute applies the stages one
// after another to a single input value.
type Pipeline struct {
	// stages holds the stages in the order they were added with AddStage.
	stages []Stage
}

// Stage is a single step of a Pipeline.
type Stage interface {
	// Process receives the value produced by the previous stage (or the
	// pipeline input, for the first stage) and returns the value passed to
	// the next stage. Pipeline.Execute stops at the first non-nil error.
	Process(input interface{}) (interface{}, error)
}

// NewPipeline returns an empty Pipeline that is ready to have stages added
// to it. The stage list starts out empty but non-nil.
func NewPipeline() *Pipeline {
	p := new(Pipeline)
	p.stages = make([]Stage, 0)
	return p
}

// AddStage appends stage to the end of the pipeline, so it runs after every
// stage that was added before it.
func (p *Pipeline) AddStage(stage Stage) {
	p.stages = append(p.stages, stage)
}

// Execute feeds input through every stage in the order the stages were
// added, handing each stage the previous stage's output. The first stage
// error aborts the run and is returned together with a nil result. With no
// stages, input is returned unchanged.
func (p *Pipeline) Execute(input interface{}) (interface{}, error) {
	current := input

	for i := 0; i < len(p.stages); i++ {
		next, err := p.stages[i].Process(current)
		if err != nil {
			return nil, err
		}
		current = next
	}

	return current, nil
}

// FilterStage is a Stage that keeps only the items accepted by predicate.
type FilterStage struct {
	// predicate reports whether an item should be kept.
	predicate func(interface{}) bool
}

// Process implements Stage. input must be a []interface{}; the result is a
// []interface{} holding, in their original order, the items for which the
// predicate returned true. The result is a nil slice when nothing matches.
//
// An input of any other dynamic type (including a nil interface) is reported
// as an error rather than causing a panic.
func (fs *FilterStage) Process(input interface{}) (interface{}, error) {
	items, ok := input.([]interface{})
	if !ok {
		return nil, fmt.Errorf("filter stage: expected []interface{}, got %T", input)
	}

	var filtered []interface{}
	for _, item := range items {
		if fs.predicate(item) {
			filtered = append(filtered, item)
		}
	}

	return filtered, nil
}

// MapStage is a Stage that replaces every item with the result of transform.
type MapStage struct {
	// transform converts one input item into one output item.
	transform func(interface{}) interface{}
}

// Process implements Stage. input must be a []interface{}; the result is a
// []interface{} of the same length holding transform(item) for every item,
// in order. An empty input yields a nil slice.
//
// An input of any other dynamic type (including a nil interface) is reported
// as an error rather than causing a panic.
func (ms *MapStage) Process(input interface{}) (interface{}, error) {
	items, ok := input.([]interface{})
	if !ok {
		return nil, fmt.Errorf("map stage: expected []interface{}, got %T", input)
	}

	// The output length is known up front, so allocate once. Empty input
	// keeps producing a nil slice, as before.
	var mapped []interface{}
	if len(items) > 0 {
		mapped = make([]interface{}, 0, len(items))
	}
	for _, item := range items {
		mapped = append(mapped, ms.transform(item))
	}

	return mapped, nil
}

Good: Proper Pipeline Implementation

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// DataItem is the unit of data sent over the channels that connect the
// stages of a DataPipeline.
type DataItem struct {
	// ID identifies the item.
	ID    string
	// Value is the payload; its concrete type is chosen by the producer and
	// must match what the stage callbacks expect.
	Value interface{}
	// Error is not read or written by any stage shown here.
	// NOTE(review): presumably meant to carry a per-item failure downstream.
	Error error
}

// PipelineStage is one named step of a DataPipeline.
type PipelineStage struct {
	// name labels the stage; DataPipeline.Execute does not use it.
	name    string
	// process starts the stage: it is given the context and the upstream
	// channel and returns the channel on which the stage emits its results.
	process func(context.Context, <-chan DataItem) <-chan DataItem
}

// DataPipeline chains channel-based stages; Execute connects each stage's
// output channel to the next stage's input.
type DataPipeline struct {
	// stages holds the stages in the order they were added.
	stages []PipelineStage
	// workers is stored by NewDataPipeline; no code shown here reads it.
	workers int
}

// NewDataPipeline returns a DataPipeline without any stages that records the
// requested worker count.
func NewDataPipeline(workers int) *DataPipeline {
	dp := new(DataPipeline)
	dp.workers = workers
	dp.stages = make([]PipelineStage, 0)
	return dp
}

// AddStage registers process under name as the new last stage of the
// pipeline.
func (dp *DataPipeline) AddStage(name string, process func(context.Context, <-chan DataItem) <-chan DataItem) {
	stage := PipelineStage{name: name, process: process}
	dp.stages = append(dp.stages, stage)
}

// Execute wires the stages together: the first stage reads from input, every
// later stage reads from the channel returned by the stage before it, and
// the channel returned by the last stage is handed back to the caller. With
// no stages, input itself is returned.
func (dp *DataPipeline) Execute(ctx context.Context, input <-chan DataItem) <-chan DataItem {
	current := input

	for i := range dp.stages {
		current = dp.stages[i].process(ctx, current)
	}

	return current
}

// FilterStage returns a stage function that forwards only the items for
// which predicate returns true, preserving their order.
//
// The stage runs in its own goroutine. It closes its output channel and
// stops when input is closed or ctx is cancelled.
func FilterStage(predicate func(DataItem) bool) func(context.Context, <-chan DataItem) <-chan DataItem {
	return func(ctx context.Context, input <-chan DataItem) <-chan DataItem {
		output := make(chan DataItem)

		go func() {
			defer close(output)

			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						return
					}
					if !predicate(item) {
						continue
					}

					// The send has to watch ctx as well: if the consumer
					// stops reading after cancellation, a bare send would
					// block this goroutine forever.
					select {
					case output <- item:
					case <-ctx.Done():
						return
					}
				}
			}
		}()

		return output
	}
}

// MapStage returns a stage function that emits transform(item) for every
// item received, in order.
//
// The stage runs in its own goroutine. It closes its output channel and
// stops when input is closed or ctx is cancelled.
func MapStage(transform func(DataItem) DataItem) func(context.Context, <-chan DataItem) <-chan DataItem {
	return func(ctx context.Context, input <-chan DataItem) <-chan DataItem {
		output := make(chan DataItem)

		go func() {
			defer close(output)

			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						return
					}

					result := transform(item)

					// The send has to watch ctx as well: if the consumer
					// stops reading after cancellation, a bare send would
					// block this goroutine forever.
					select {
					case output <- result:
					case <-ctx.Done():
						return
					}
				}
			}
		}()

		return output
	}
}

// AggregateStage returns a stage function that buffers every item it
// receives and, once input is closed, emits the single item produced by
// aggregate over all of them. Nothing is emitted when no items were
// received, or when ctx is cancelled before input is closed.
//
// The stage runs in its own goroutine and closes its output channel when it
// stops. All items are held in memory until input is closed.
func AggregateStage(aggregate func([]DataItem) DataItem) func(context.Context, <-chan DataItem) <-chan DataItem {
	return func(ctx context.Context, input <-chan DataItem) <-chan DataItem {
		output := make(chan DataItem)

		go func() {
			defer close(output)

			var items []DataItem

			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if ok {
						items = append(items, item)
						continue
					}

					// Input is exhausted: emit the aggregate, if any.
					if len(items) == 0 {
						return
					}
					result := aggregate(items)

					// Watch ctx while sending so that an abandoned consumer
					// cannot block this goroutine forever.
					select {
					case output <- result:
					case <-ctx.Done():
					}
					return
				}
			}
		}()

		return output
	}
}

// ParallelStage returns a stage function that applies process to incoming
// items using a fixed number of worker goroutines. Because workers run
// concurrently, output order is not guaranteed to match input order.
//
// workers values below 1 are treated as 1. Without a worker nothing would
// drain the internal queue and the stage would never make progress.
//
// The output channel is closed once input is closed and every queued item
// has been emitted, or once ctx is cancelled. After cancellation, items that
// are still queued or waiting to be emitted may be dropped.
func ParallelStage(workers int, process func(DataItem) DataItem) func(context.Context, <-chan DataItem) <-chan DataItem {
	if workers < 1 {
		workers = 1
	}

	return func(ctx context.Context, input <-chan DataItem) <-chan DataItem {
		output := make(chan DataItem)

		go func() {
			// Deferred calls run in reverse order: first stop feeding the
			// workers, then wait for them, and only then close output so that
			// no worker can send on a closed channel.
			defer close(output)

			var wg sync.WaitGroup
			defer wg.Wait()

			workerChan := make(chan DataItem, workers)
			defer close(workerChan)

			// Start workers.
			for i := 0; i < workers; i++ {
				wg.Add(1)
				go func() {
					defer wg.Done()

					for item := range workerChan {
						result := process(item)

						// Watch ctx while sending so an abandoned consumer
						// cannot keep the worker (and wg.Wait) blocked.
						select {
						case output <- result:
						case <-ctx.Done():
							return
						}
					}
				}()
			}

			// Feed workers until input is exhausted or ctx is cancelled.
			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						return
					}

					// Workers may already have stopped because of
					// cancellation, so this send must not block on them.
					select {
					case workerChan <- item:
					case <-ctx.Done():
						return
					}
				}
			}
		}()

		return output
	}
}

// PipelineExample shows the pipeline end to end: a producer goroutine emits
// the integers 0..99 as DataItems, a filter stage keeps the even values, a
// map stage doubles them, and the results are printed as they arrive. The
// whole run is bounded by a 10 second timeout.
func PipelineExample() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Create input channel
	input := make(chan DataItem)

	go func() {
		// Closing on every exit path lets the stages see the end of input.
		defer close(input)

		for i := 0; i < 100; i++ {
			item := DataItem{
				ID:    fmt.Sprintf("item-%d", i),
				Value: i,
			}

			// Stop producing when the context ends; the stages stop
			// receiving at that point, so a bare send would block forever.
			select {
			case input <- item:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Create pipeline
	pipeline := NewDataPipeline(4)

	pipeline.AddStage("filter", FilterStage(func(item DataItem) bool {
		return item.Value.(int)%2 == 0
	}))

	pipeline.AddStage("map", MapStage(func(item DataItem) DataItem {
		item.Value = item.Value.(int) * 2
		return item
	}))

	// Execute pipeline
	output := pipeline.Execute(ctx, input)

	// Consume output until the last stage closes its channel.
	for item := range output {
		fmt.Printf("Processed: %s = %v\n", item.ID, item.Value)
	}
}

Bad: Improper Pipeline Implementation

package main

// BadPipeline is a deliberately poor example: it returns data unchanged and
// applies none of the practices recommended in this guide.
// BAD: No error handling
func BadPipeline(data []int) []int {
	// No error handling
	// No context management
	// No parallelization
	return data
}

// BadBlockingPipeline is a deliberately poor example with an empty body; the
// comments inside list the problems it stands for.
// BAD: Blocking operations
func BadBlockingPipeline(data []int) {
	// Blocks entire pipeline
	// No streaming
	// No concurrency
}

Problems:

  • No error handling
  • No context management
  • No parallelization
  • Blocking operations

Advanced Pipeline Patterns

Fan-Out/Fan-In Pattern

package main

import (
	"context"
	"sync"
)

// FanOut starts workers goroutines that all receive from input and returns
// one output channel per goroutine. Each item from input is delivered to
// exactly one of the returned channels, namely that of whichever goroutine
// received it.
//
// Every returned channel is closed when input is closed or ctx is cancelled.
// All returned channels must be consumed (or ctx cancelled), otherwise the
// goroutine behind an unread channel stays blocked holding an item.
//
// FanOut panics if workers is negative.
func FanOut(ctx context.Context, input <-chan DataItem, workers int) []<-chan DataItem {
	channels := make([]<-chan DataItem, workers)

	for i := 0; i < workers; i++ {
		ch := make(chan DataItem)
		channels[i] = ch

		go func(out chan<- DataItem) {
			defer close(out)

			for {
				// Receive under select so that cancellation is noticed even
				// while input is idle and never closed.
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						return
					}

					select {
					case out <- item:
					case <-ctx.Done():
						return
					}
				}
			}
		}(ch)
	}

	return channels
}

// FanIn merges the given channels into a single output channel. Items are
// forwarded as they arrive, so no ordering between different input channels
// is guaranteed.
//
// The output channel is closed once every input channel has been closed and
// drained, or once ctx is cancelled. With no input channels it is closed
// immediately.
func FanIn(ctx context.Context, channels ...<-chan DataItem) <-chan DataItem {
	output := make(chan DataItem)

	go func() {
		defer close(output)

		var wg sync.WaitGroup

		for _, ch := range channels {
			wg.Add(1)
			go func(c <-chan DataItem) {
				defer wg.Done()

				for {
					// Receive under select so that cancellation is noticed
					// even while c is idle and never closed.
					select {
					case <-ctx.Done():
						return
					case item, ok := <-c:
						if !ok {
							return
						}

						select {
						case output <- item:
						case <-ctx.Done():
							return
						}
					}
				}
			}(ch)
		}

		// output may only be closed after every forwarder has stopped.
		wg.Wait()
	}()

	return output
}

Batch Processing

package main

import (
	"context"
	"time"
)

// BatchStage returns a stage function that groups incoming items into
// slices. A batch is emitted when it reaches batchSize items, when the
// timeout timer fires while the batch is non-empty, or when input is closed
// with items still pending.
//
// batchSize values below 1 are treated as 1. timeout should be positive: the
// timer is re-armed with it after every firing.
//
// The stage runs in its own goroutine and closes its output channel when
// input is closed or ctx is cancelled. On cancellation a pending partial
// batch is handed over only if a receiver is already waiting; otherwise it
// is dropped so that the goroutine can exit.
func BatchStage(batchSize int, timeout time.Duration) func(context.Context, <-chan DataItem) <-chan []DataItem {
	// A negative capacity would make the slice allocation below panic.
	if batchSize < 1 {
		batchSize = 1
	}

	return func(ctx context.Context, input <-chan DataItem) <-chan []DataItem {
		output := make(chan []DataItem)

		go func() {
			defer close(output)

			batch := make([]DataItem, 0, batchSize)
			timer := time.NewTimer(timeout)
			defer timer.Stop()

			// flush sends the current batch downstream and starts a fresh
			// one. It reports false when ctx was cancelled before a receiver
			// took the batch.
			flush := func() bool {
				select {
				case output <- batch:
					batch = make([]DataItem, 0, batchSize)
					return true
				case <-ctx.Done():
					return false
				}
			}

			for {
				select {
				case <-ctx.Done():
					if len(batch) > 0 {
						// Best effort only: never block after cancellation.
						select {
						case output <- batch:
						default:
						}
					}
					return

				case item, ok := <-input:
					if !ok {
						if len(batch) > 0 {
							flush()
						}
						return
					}

					batch = append(batch, item)
					if len(batch) < batchSize {
						continue
					}
					if !flush() {
						return
					}

					// The timer may have fired while flush was blocked. Stop
					// it and drain any stale tick before re-arming, so the
					// next batch is not flushed prematurely.
					if !timer.Stop() {
						select {
						case <-timer.C:
						default:
						}
					}
					timer.Reset(timeout)

				case <-timer.C:
					if len(batch) > 0 && !flush() {
						return
					}
					// The tick was just received, so the channel is empty
					// and Reset is safe.
					timer.Reset(timeout)
				}
			}
		}()

		return output
	}
}

Best Practices

1. Use Channels for Data Flow

// Good: channel-based pipeline. Execute returns the output channel of the
// last stage; results are consumed by receiving from it.
output := pipeline.Execute(ctx, input)

2. Implement Error Handling

// DataItem pairs a value with an error slot, so a failure can be passed
// along the pipeline together with the item it belongs to.
type DataItem struct {
	Value interface{}
	Error error
}

3. Use Context for Cancellation

// Bound the pipeline's lifetime with a timeout. The deferred cancel runs
// when the surrounding function returns.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

4. Monitor Performance

// PipelineMetrics groups basic measurements for a pipeline run.
// NOTE(review): nothing in the code shown here updates these fields; their
// meaning is inferred from the field names only.
type PipelineMetrics struct {
	ItemsProcessed int
	Duration       time.Duration
	ErrorCount     int
}

Common Pitfalls

1. No Error Handling

Always handle errors in pipelines.

2. Blocking Operations

Run each stage in its own goroutine and connect stages with channels, so that one slow stage does not block the entire pipeline.

3. No Context Management

Always use context for cancellation.

4. Memory Leaks

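Always close a channel from the sending side once no more values will be sent, and make every blocking send or receive also watch ctx.Done(). Otherwise goroutines stay blocked forever and leak.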

Resources

Summary

Data processing pipelines enable efficient data handling. Key takeaways:

  • Design pipelines with clear stages
  • Use channels for data flow
  • Implement error handling
  • Use context for cancellation
  • Parallelize where appropriate
  • Monitor performance
  • Avoid memory leaks

By mastering pipelines, you can process large datasets efficiently.

Comments