Data Processing Pipelines in Go
Introduction
Data processing pipelines are essential for handling large volumes of data efficiently. This guide covers designing and implementing data processing pipelines in Go.
Well-designed pipelines enable efficient data transformation, filtering, and aggregation while maintaining code clarity and performance.
Pipeline Fundamentals
Basic Pipeline Pattern
package main
import (
"fmt"
"sync"
)
// Pipeline chains a sequence of Stage implementations; each stage's
// output feeds the next stage's input (see Execute).
type Pipeline struct {
// stages run in the order they were added via AddStage.
stages []Stage
}
// Stage is one processing step in a Pipeline. Process receives the
// previous stage's output and returns this stage's output, or an error
// that aborts the whole pipeline run.
type Stage interface {
Process(input interface{}) (interface{}, error)
}
// NewPipeline returns an empty pipeline ready to have stages added.
func NewPipeline() *Pipeline {
	p := &Pipeline{stages: make([]Stage, 0)}
	return p
}
// AddStage appends stage to the pipeline; stages execute in insertion order.
func (p *Pipeline) AddStage(stage Stage) {
	updated := append(p.stages, stage)
	p.stages = updated
}
// Execute runs input through every stage in order. The first stage
// error aborts the run and is returned with a nil result; otherwise the
// final stage's output is returned.
func (p *Pipeline) Execute(input interface{}) (interface{}, error) {
	current := input
	for i := range p.stages {
		next, err := p.stages[i].Process(current)
		if err != nil {
			return nil, err
		}
		current = next
	}
	return current, nil
}
// FilterStage keeps only the items for which predicate returns true.
type FilterStage struct {
	predicate func(interface{}) bool
}

// Process implements Stage. It expects input to be a []interface{} and
// returns the subset of items accepted by the predicate, preserving
// order. A wrong input type yields an error instead of the panic the
// unchecked type assertion used to cause.
func (fs *FilterStage) Process(input interface{}) (interface{}, error) {
	items, ok := input.([]interface{})
	if !ok {
		return nil, fmt.Errorf("FilterStage: expected []interface{}, got %T", input)
	}
	var filtered []interface{}
	for _, item := range items {
		if fs.predicate(item) {
			filtered = append(filtered, item)
		}
	}
	return filtered, nil
}
// MapStage applies transform to every item.
type MapStage struct {
	transform func(interface{}) interface{}
}

// Process implements Stage. It expects input to be a []interface{} and
// returns a new slice holding transform(item) for each item, in order.
// A wrong input type yields an error instead of the panic the unchecked
// type assertion used to cause.
func (ms *MapStage) Process(input interface{}) (interface{}, error) {
	items, ok := input.([]interface{})
	if !ok {
		return nil, fmt.Errorf("MapStage: expected []interface{}, got %T", input)
	}
	var mapped []interface{}
	for _, item := range items {
		mapped = append(mapped, ms.transform(item))
	}
	return mapped, nil
}
Good: Proper Pipeline Implementation
package main
import (
"context"
"fmt"
"sync"
"time"
)
// DataItem is the unit of data flowing through the channel-based
// pipeline. Error lets a stage attach a per-item failure so bad items
// can travel downstream without aborting the whole stream.
type DataItem struct {
ID string
Value interface{}
Error error
}
// PipelineStage pairs a human-readable stage name with the function
// that wires the stage into the channel chain: it consumes one stream
// of DataItems and produces another.
type PipelineStage struct {
name string
process func(context.Context, <-chan DataItem) <-chan DataItem
}
// DataPipeline manages an ordered list of channel-based stages.
type DataPipeline struct {
stages []PipelineStage
// workers is stored by NewDataPipeline but not read by any method
// shown here — presumably intended for parallel stages; verify usage.
workers int
}
// NewDataPipeline creates an empty pipeline, recording the requested
// worker count.
func NewDataPipeline(workers int) *DataPipeline {
	dp := new(DataPipeline)
	dp.stages = []PipelineStage{}
	dp.workers = workers
	return dp
}
// AddStage registers a named stage; stages execute in the order added.
func (dp *DataPipeline) AddStage(name string, process func(context.Context, <-chan DataItem) <-chan DataItem) {
	stage := PipelineStage{name: name, process: process}
	dp.stages = append(dp.stages, stage)
}
// Execute chains every stage's channel transformer onto input and
// returns the final output channel; items flow lazily as they arrive.
func (dp *DataPipeline) Execute(ctx context.Context, input <-chan DataItem) <-chan DataItem {
	current := input
	for _, s := range dp.stages {
		current = s.process(ctx, current)
	}
	return current
}
// FilterStage creates a filter stage: items failing predicate are
// dropped. The output channel is closed when the input closes or the
// context is cancelled.
func FilterStage(predicate func(DataItem) bool) func(context.Context, <-chan DataItem) <-chan DataItem {
	return func(ctx context.Context, input <-chan DataItem) <-chan DataItem {
		output := make(chan DataItem)
		go func() {
			defer close(output)
			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						return
					}
					if !predicate(item) {
						continue
					}
					// The send must also honor cancellation: a plain
					// "output <- item" would leak this goroutine if the
					// consumer stops reading after ctx is cancelled.
					select {
					case <-ctx.Done():
						return
					case output <- item:
					}
				}
			}
		}()
		return output
	}
}
// MapStage creates a map stage that emits transform(item) for every
// input item, in order. The output channel is closed when the input
// closes or the context is cancelled.
func MapStage(transform func(DataItem) DataItem) func(context.Context, <-chan DataItem) <-chan DataItem {
	return func(ctx context.Context, input <-chan DataItem) <-chan DataItem {
		output := make(chan DataItem)
		go func() {
			defer close(output)
			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						return
					}
					// The send must also honor cancellation: a plain
					// "output <- ..." would leak this goroutine if the
					// consumer stops reading after ctx is cancelled.
					select {
					case <-ctx.Done():
						return
					case output <- transform(item):
					}
				}
			}
		}()
		return output
	}
}
// AggregateStage creates a stage that buffers every input item and,
// once the input closes, emits a single aggregate of the buffered
// items (nothing is emitted for an empty stream). Cancellation via ctx
// discards any buffered items.
func AggregateStage(aggregate func([]DataItem) DataItem) func(context.Context, <-chan DataItem) <-chan DataItem {
	return func(ctx context.Context, input <-chan DataItem) <-chan DataItem {
		output := make(chan DataItem)
		go func() {
			defer close(output)
			var items []DataItem
			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						if len(items) > 0 {
							// Final flush must honor cancellation too; a
							// bare send would leak this goroutine if the
							// consumer has already stopped reading.
							select {
							case <-ctx.Done():
							case output <- aggregate(items):
							}
						}
						return
					}
					items = append(items, item)
				}
			}
		}()
		return output
	}
}
// ParallelStage creates a stage that processes items with the given
// number of concurrent workers. Output ordering is NOT preserved. The
// output channel is closed only after all workers have exited.
func ParallelStage(workers int, process func(DataItem) DataItem) func(context.Context, <-chan DataItem) <-chan DataItem {
	return func(ctx context.Context, input <-chan DataItem) <-chan DataItem {
		output := make(chan DataItem)
		go func() {
			// Deferred in LIFO order: close workerChan, wait for the
			// workers to drain, then close output.
			defer close(output)
			var wg sync.WaitGroup
			workerChan := make(chan DataItem, workers)
			defer wg.Wait()
			defer close(workerChan)
			// Start workers.
			for i := 0; i < workers; i++ {
				wg.Add(1)
				go func() {
					defer wg.Done()
					for item := range workerChan {
						// The send must honor cancellation: a worker
						// blocked on "output <- ..." after the consumer
						// stops reading would prevent wg.Wait from ever
						// returning, deadlocking the stage.
						select {
						case <-ctx.Done():
							return
						case output <- process(item):
						}
					}
				}()
			}
			// Feed workers.
			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case workerChan <- item:
					}
				}
			}
		}()
		return output
	}
}
// PipelineExample wires together a producer of 100 items, a filter
// stage (keep even values), and a map stage (double the value), then
// drains and prints the results.
func PipelineExample() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// Producer: emits the items, honoring cancellation so it cannot
	// leak if the pipeline stops consuming before all items are sent.
	input := make(chan DataItem)
	go func() {
		defer close(input)
		for i := 0; i < 100; i++ {
			item := DataItem{
				ID:    fmt.Sprintf("item-%d", i),
				Value: i,
			}
			select {
			case <-ctx.Done():
				return
			case input <- item:
			}
		}
	}()
	// Create pipeline.
	pipeline := NewDataPipeline(4)
	pipeline.AddStage("filter", FilterStage(func(item DataItem) bool {
		return item.Value.(int)%2 == 0
	}))
	pipeline.AddStage("map", MapStage(func(item DataItem) DataItem {
		item.Value = item.Value.(int) * 2
		return item
	}))
	// Execute and consume; the range ends when the final stage closes
	// its output channel.
	output := pipeline.Execute(ctx, input)
	for item := range output {
		fmt.Printf("Processed: %s = %v\n", item.ID, item.Value)
	}
}
Bad: Improper Pipeline Implementation
package main
// BAD: No error handling
// BadPipeline is an intentional anti-example for this guide: it returns
// its input untouched, illustrating a "pipeline" with no stages, no
// error reporting, no cancellation, and no concurrency.
func BadPipeline(data []int) []int {
// No error handling
// No context management
// No parallelization
return data
}
// BAD: Blocking operations
// BadBlockingPipeline is an intentional anti-example: a synchronous
// function signature that forces callers to wait, with no streaming,
// no goroutines, and no way to cancel in-flight work.
func BadBlockingPipeline(data []int) {
// Blocks entire pipeline
// No streaming
// No concurrency
}
Problems:
- No error handling
- No context management
- No parallelization
- Blocking operations
Advanced Pipeline Patterns
Fan-Out/Fan-In Pattern
package main
import (
"context"
"sync"
)
// FanOut distributes items from input across the returned worker
// channels; each item goes to exactly one channel. Every channel is
// closed when the input closes or the context is cancelled.
func FanOut(ctx context.Context, input <-chan DataItem, workers int) []<-chan DataItem {
	channels := make([]<-chan DataItem, workers)
	for i := 0; i < workers; i++ {
		ch := make(chan DataItem)
		channels[i] = ch
		go func(out chan<- DataItem) {
			defer close(out)
			for {
				// The receive must honor cancellation: a plain
				// "range input" would block forever if ctx is
				// cancelled while the producer is idle, leaking
				// this goroutine.
				select {
				case <-ctx.Done():
					return
				case item, ok := <-input:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- item:
					}
				}
			}
		}(ch)
	}
	return channels
}
// FanIn merges the given channels into one output channel, which is
// closed after every input channel closes or the context is cancelled.
// No ordering guarantees are made across inputs.
func FanIn(ctx context.Context, channels ...<-chan DataItem) <-chan DataItem {
	output := make(chan DataItem)
	go func() {
		defer close(output)
		var wg sync.WaitGroup
		for _, ch := range channels {
			wg.Add(1)
			go func(c <-chan DataItem) {
				defer wg.Done()
				for {
					// The receive must honor cancellation: a plain
					// "range c" would block forever if ctx is cancelled
					// while the producer is idle, leaking this goroutine
					// and preventing wg.Wait from returning.
					select {
					case <-ctx.Done():
						return
					case item, ok := <-c:
						if !ok {
							return
						}
						select {
						case <-ctx.Done():
							return
						case output <- item:
						}
					}
				}
			}(ch)
		}
		wg.Wait()
	}()
	return output
}
Batch Processing
package main
import (
"context"
"time"
)
// BatchStage creates a batching stage: items are grouped into slices of
// up to batchSize, and a partial batch is flushed when timeout elapses,
// when the input closes, or (best effort) when the context ends.
func BatchStage(batchSize int, timeout time.Duration) func(context.Context, <-chan DataItem) <-chan []DataItem {
	return func(ctx context.Context, input <-chan DataItem) <-chan []DataItem {
		output := make(chan []DataItem)
		go func() {
			defer close(output)
			batch := make([]DataItem, 0, batchSize)
			timer := time.NewTimer(timeout)
			defer timer.Stop()
			// emit sends the current batch, honoring cancellation so a
			// blocked send cannot leak this goroutine; it reports
			// whether the stage should keep running.
			emit := func() bool {
				select {
				case <-ctx.Done():
					return false
				case output <- batch:
					batch = make([]DataItem, 0, batchSize)
					return true
				}
			}
			// resetTimer stops and drains the timer before Reset, per
			// the time.Timer documentation, so a stale expiry left in
			// timer.C cannot trigger a spurious immediate flush.
			resetTimer := func() {
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(timeout)
			}
			for {
				select {
				case <-ctx.Done():
					if len(batch) > 0 {
						// Best-effort final flush: the consumer may
						// already be gone, so never block here.
						select {
						case output <- batch:
						default:
						}
					}
					return
				case item, ok := <-input:
					if !ok {
						if len(batch) > 0 {
							emit()
						}
						return
					}
					batch = append(batch, item)
					if len(batch) >= batchSize {
						if !emit() {
							return
						}
						resetTimer()
					}
				case <-timer.C:
					if len(batch) > 0 {
						if !emit() {
							return
						}
					}
					// Timer already fired, so a plain Reset is safe.
					timer.Reset(timeout)
				}
			}
		}()
		return output
	}
}
Best Practices
1. Use Channels for Data Flow
// Good: Channel-based pipeline
output := pipeline.Execute(ctx, input)
2. Implement Error Handling
// Include error in data item
type DataItem struct {
Value interface{}
Error error
}
3. Use Context for Cancellation
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
4. Monitor Performance
// Track metrics
type PipelineMetrics struct {
ItemsProcessed int
Duration time.Duration
ErrorCount int
}
Common Pitfalls
1. No Error Handling
Always handle errors in pipelines.
2. Blocking Operations
Use goroutines to avoid blocking.
3. No Context Management
Always use context for cancellation.
4. Memory Leaks
Always close channels properly.
Resources: see the Go blog post "Go Concurrency Patterns: Pipelines and cancellation" for the canonical treatment of these patterns, and the `context` package documentation for cancellation semantics.
Summary
Data processing pipelines enable efficient data handling. Key takeaways:
- Design pipelines with clear stages
- Use channels for data flow
- Implement error handling
- Use context for cancellation
- Parallelize where appropriate
- Monitor performance
- Avoid memory leaks
By mastering pipelines, you can process large datasets efficiently.
Comments