Skip to main content
⚡ Calmops

Stream Processing with Go

Stream Processing with Go

Introduction

Stream processing handles continuous flows of data in real time. This guide covers implementing stream processing systems in Go with windowing, aggregation, and stateful operations.

Stream processing enables real-time analytics, monitoring, and event processing at scale.

Stream Processing Basics

Simple Stream Processor

package main

import (
	"context"
	"fmt"
	"time"
)

// Event is one record flowing through a stream: a timestamp, a key used to
// route and group records, and an arbitrary payload.
type Event struct {
	Timestamp time.Time   // time attached by the producer
	Key       string      // routing / grouping key, e.g. "temperature"
	Value     interface{} // payload; callers type-assert it (e.g. to float64)
}

// StreamProcessor dispatches each event to the handlers subscribed to the
// event's Key. The handler map is not guarded by a lock, so register every
// handler with Subscribe before calling Process from other goroutines.
type StreamProcessor struct {
	handlers map[string][]func(Event)
}

// NewStreamProcessor returns a StreamProcessor with no subscribers.
func NewStreamProcessor() *StreamProcessor {
	return &StreamProcessor{
		handlers: make(map[string][]func(Event)),
	}
}

// Subscribe registers handler to be called for every processed event whose
// Key equals eventType. Several handlers may share a key; they are started in
// registration order and run concurrently.
func (sp *StreamProcessor) Subscribe(eventType string, handler func(Event)) {
	sp.handlers[eventType] = append(sp.handlers[eventType], handler)
}

// Process delivers event to every handler subscribed to event.Key. Handlers
// run concurrently. Process waits until all of them have returned, so no
// handler goroutine outlives the call and work is not lost when the caller
// exits right afterwards. Events with no subscribers are ignored.
func (sp *StreamProcessor) Process(event Event) {
	handlers := sp.handlers[event.Key]
	if len(handlers) == 0 {
		return
	}

	// Each goroutine signals completion on a buffered channel, so signalling
	// never blocks. Receiving len(handlers) times is the wait.
	done := make(chan struct{}, len(handlers))
	for _, handler := range handlers {
		go func(h func(Event)) {
			defer func() { done <- struct{}{} }()
			h(event)
		}(handler)
	}
	for range handlers {
		<-done
	}
}

// SimpleStreamExample registers a handler that prints "temperature" events,
// then hands the processor a single reading.
func SimpleStreamExample() {
	sp := NewStreamProcessor()

	printTemperature := func(e Event) {
		fmt.Printf("Temperature: %v at %v\n", e.Value, e.Timestamp)
	}
	sp.Subscribe("temperature", printTemperature)

	reading := Event{
		Timestamp: time.Now(),
		Key:       "temperature",
		Value:     25.5,
	}
	sp.Process(reading)
}

Good: Proper Stream Processing Implementation

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// StreamWindow is the half-open interval [Start, End) together with the
// events collected for one key inside it.
type StreamWindow struct {
	Start  time.Time
	End    time.Time
	Events []Event
}

// windowID identifies one window: the event key plus the exact window start.
// A struct key cannot collide the way a formatted "key-bucket" string can
// (e.g. key "a-" with bucket 5 and key "a" with bucket -5 both give "a--5").
type windowID struct {
	key  string
	sec  int64
	nsec int
}

// makeWindowID builds the map key for the window of key that starts at start.
func makeWindowID(key string, start time.Time) windowID {
	return windowID{key: key, sec: start.Unix(), nsec: start.Nanosecond()}
}

// WindowedStreamProcessor groups events into fixed-size, non-overlapping
// windows per key. All methods are safe for concurrent use.
type WindowedStreamProcessor struct {
	windowSize time.Duration
	windows    map[windowID]*StreamWindow
	mu         sync.RWMutex
}

// NewWindowedStreamProcessor returns a processor whose windows each span
// windowSize. windowSize should be positive. With a non-positive size,
// time.Time.Truncate leaves timestamps unchanged, so every distinct timestamp
// gets its own window.
func NewWindowedStreamProcessor(windowSize time.Duration) *WindowedStreamProcessor {
	return &WindowedStreamProcessor{
		windowSize: windowSize,
		windows:    make(map[windowID]*StreamWindow),
	}
}

// windowStart returns the start of the window that contains ts. Boundaries
// are aligned the way time.Time.Truncate aligns them. The same value feeds
// both the map key and StreamWindow.Start, so the two always agree. It also
// works for sub-second sizes, where an integer-seconds divisor would be 0.
func (wsp *WindowedStreamProcessor) windowStart(ts time.Time) time.Time {
	return ts.Truncate(wsp.windowSize)
}

// AddEvent appends event to the window for event.Key that contains
// event.Timestamp, creating that window if needed.
func (wsp *WindowedStreamProcessor) AddEvent(event Event) {
	start := wsp.windowStart(event.Timestamp)
	id := makeWindowID(event.Key, start)

	wsp.mu.Lock()
	defer wsp.mu.Unlock()

	w, ok := wsp.windows[id]
	if !ok {
		w = &StreamWindow{Start: start, End: start.Add(wsp.windowSize)}
		wsp.windows[id] = w
	}
	w.Events = append(w.Events, event)
}

// GetWindow returns a snapshot of the window for key that contains timestamp,
// or nil if no event has been added to that window. The snapshot owns its own
// Events slice, so callers can read it while other goroutines keep calling
// AddEvent. Later additions are not reflected in it.
func (wsp *WindowedStreamProcessor) GetWindow(key string, timestamp time.Time) *StreamWindow {
	id := makeWindowID(key, wsp.windowStart(timestamp))

	wsp.mu.RLock()
	defer wsp.mu.RUnlock()

	w, ok := wsp.windows[id]
	if !ok {
		return nil
	}
	snapshot := *w
	snapshot.Events = append([]Event(nil), w.Events...)
	return &snapshot
}

// AggregateWindow applies aggregator to the events of window and returns its
// result. A nil window (what GetWindow returns for an unseen window) is
// treated as empty instead of causing a nil-pointer panic.
func (wsp *WindowedStreamProcessor) AggregateWindow(window *StreamWindow, aggregator func([]Event) interface{}) interface{} {
	if window == nil {
		return aggregator(nil)
	}
	return aggregator(window.Events)
}

// EvictBefore deletes every window whose End is at or before cutoff and
// reports how many were removed. Without periodic eviction, windows
// accumulate for the lifetime of the processor.
func (wsp *WindowedStreamProcessor) EvictBefore(cutoff time.Time) int {
	wsp.mu.Lock()
	defer wsp.mu.Unlock()

	removed := 0
	for id, w := range wsp.windows {
		if !w.End.After(cutoff) {
			delete(wsp.windows, id) // deleting during range is allowed in Go
			removed++
		}
	}
	return removed
}

// StatefulStreamProcessor keeps one arbitrary state value per event key.
// All methods are safe for concurrent use.
type StatefulStreamProcessor struct {
	state map[string]interface{}
	mu    sync.RWMutex
}

// NewStatefulStreamProcessor returns a processor with no stored state.
func NewStatefulStreamProcessor() *StatefulStreamProcessor {
	return &StatefulStreamProcessor{
		state: make(map[string]interface{}),
	}
}

// UpdateState stores value as the state for key, replacing any previous value.
func (ssp *StatefulStreamProcessor) UpdateState(key string, value interface{}) {
	ssp.mu.Lock()
	defer ssp.mu.Unlock()
	ssp.state[key] = value
}

// GetState returns the state stored for key, or nil if there is none.
func (ssp *StatefulStreamProcessor) GetState(key string) interface{} {
	ssp.mu.RLock()
	defer ssp.mu.RUnlock()
	return ssp.state[key]
}

// DeleteState discards the state for key so long-running streams can release
// state for keys that will not be seen again.
func (ssp *StatefulStreamProcessor) DeleteState(key string) {
	ssp.mu.Lock()
	defer ssp.mu.Unlock()
	delete(ssp.state, key)
}

// ProcessWithState calls processor(event, current), where current is the
// state for event.Key (nil if none). It stores the result as the new state
// and returns it.
//
// The read, the callback and the write all happen under one write lock, so
// concurrent events for the same key cannot lose updates. As a consequence,
// calls are serialized across all keys. processor must also not call back
// into ssp (GetState, UpdateState, ...): the lock is not reentrant and the
// call would deadlock.
func (ssp *StatefulStreamProcessor) ProcessWithState(event Event, processor func(Event, interface{}) interface{}) interface{} {
	ssp.mu.Lock()
	defer ssp.mu.Unlock()

	result := processor(event, ssp.state[event.Key])
	ssp.state[event.Key] = result
	return result
}

// CountingProcessor demonstrates StatefulStreamProcessor. It keeps a running
// count of events per key and prints the updated count after each event.
func CountingProcessor() {
	counter := NewStatefulStreamProcessor()

	stream := []Event{
		{Key: "user-1", Value: "login"},
		{Key: "user-1", Value: "click"},
		{Key: "user-2", Value: "login"},
		{Key: "user-1", Value: "logout"},
	}

	// increment treats missing state as zero; stored state is always an int.
	increment := func(_ Event, prev interface{}) interface{} {
		if prev == nil {
			return 1
		}
		return prev.(int) + 1
	}

	for i := range stream {
		ev := stream[i]
		total := counter.ProcessWithState(ev, increment)
		fmt.Printf("User %s: %d events\n", ev.Key, total)
	}
}

Bad: Improper Stream Processing

package main

// BAD: No windowing
// BadStreamProcessor is an intentionally empty counter-example: it accepts a
// batch of events and does nothing with them. It has no time windows to bound
// what is grouped together, no aggregation, and no state carried between
// calls.
func BadStreamProcessor(events []Event) {
	// No time windows
	// No aggregation
	// No state management
}

// BAD: No error handling
// BadEventProcessing is an intentionally empty counter-example. Its signature
// returns no error, so failures cannot be reported, recovered from, or
// counted for monitoring.
func BadEventProcessing(event Event) {
	// No error handling
	// No recovery
	// No monitoring
}

Problems:

  • No windowing
  • No state management
  • No error handling
  • No monitoring

Advanced Stream Patterns

Tumbling Windows

package main

import (
	"context"
	"time"
)

// TumblingWindow groups events from input into consecutive, non-overlapping
// batches and emits the current batch every windowSize of processing time.
// Empty batches are never emitted. windowSize must be positive, because
// time.NewTicker panics otherwise.
//
// The returned channel is closed when the goroutine exits:
//   - input closed: the final partial batch is delivered, unless ctx is
//     cancelled while waiting for the consumer;
//   - ctx cancelled: the partial batch is handed over only if a receiver is
//     waiting at that moment, so the goroutine never blocks (and never leaks)
//     after cancellation.
func TumblingWindow(ctx context.Context, input <-chan Event, windowSize time.Duration) <-chan []Event {
	output := make(chan []Event)

	go func() {
		defer close(output)

		var window []Event
		ticker := time.NewTicker(windowSize)
		defer ticker.Stop()

		// send delivers batch unless ctx is cancelled first, reporting success.
		// This keeps a consumer that stopped reading from stranding the goroutine.
		send := func(batch []Event) bool {
			select {
			case output <- batch:
				return true
			case <-ctx.Done():
				return false
			}
		}

		for {
			select {
			case <-ctx.Done():
				if len(window) > 0 {
					select {
					case output <- window: // best effort: only if a reader is ready
					default:
					}
				}
				return

			case event, ok := <-input:
				if !ok {
					if len(window) > 0 {
						send(window)
					}
					return
				}
				window = append(window, event)

			case <-ticker.C:
				if len(window) > 0 {
					if !send(window) {
						return
					}
					window = nil // the consumer now owns the old slice
				}
			}
		}
	}()

	return output
}

Sliding Windows

package main

import (
	"context"
	"time"
)

// SlidingWindow emits a snapshot every slideSize of the events received in
// roughly the last windowSize. Snapshots overlap when slideSize < windowSize,
// and empty snapshots are not emitted. An event's age is measured by
// comparing its Timestamp with time.Now, which assumes Timestamps are close
// to arrival time (NOTE: confirm this holds for your producers). slideSize
// must be positive, because time.NewTicker panics otherwise.
//
// The returned channel is closed when ctx is cancelled or input is closed.
// Events still buffered at that point are not emitted. A pending send is
// abandoned on cancellation, so the goroutine cannot leak.
func SlidingWindow(ctx context.Context, input <-chan Event, windowSize, slideSize time.Duration) <-chan []Event {
	output := make(chan []Event)

	go func() {
		defer close(output)

		var buffer []Event
		ticker := time.NewTicker(slideSize)
		defer ticker.Stop()

		// evict removes, in place, every event older than windowSize. Filtering
		// the whole buffer, rather than trimming a prefix, also drops stale
		// events that arrived out of order behind newer ones.
		evict := func() {
			cutoff := time.Now().Add(-windowSize)
			kept := buffer[:0]
			for _, e := range buffer {
				if !e.Timestamp.Before(cutoff) {
					kept = append(kept, e)
				}
			}
			// Zero the dropped tail so evicted payloads can be garbage collected.
			var zero Event
			for i := len(kept); i < len(buffer); i++ {
				buffer[i] = zero
			}
			buffer = kept
		}

		for {
			select {
			case <-ctx.Done():
				return

			case event, ok := <-input:
				if !ok {
					return
				}
				buffer = append(buffer, event)

			case <-ticker.C:
				// Evict at emission time. Otherwise, when no new input arrives,
				// expired events would keep appearing in every snapshot.
				evict()
				if len(buffer) == 0 {
					continue
				}
				snapshot := make([]Event, len(buffer))
				copy(snapshot, buffer)
				select {
				case output <- snapshot:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return output
}

Session Windows

package main

import (
	"context"
	"time"
)

// SessionWindow groups events from input into sessions. A session stays open
// while events keep arriving. Once sessionTimeout of processing time passes
// with no new event, the session is emitted as one batch, and a new session
// starts with the next event. Empty sessions are never emitted.
//
// The returned channel is closed when the goroutine exits:
//   - input closed: the open session is delivered, unless ctx is cancelled
//     while waiting for the consumer;
//   - ctx cancelled: the open session is handed over only if a receiver is
//     waiting at that moment, so the goroutine never blocks after
//     cancellation.
func SessionWindow(ctx context.Context, input <-chan Event, sessionTimeout time.Duration) <-chan []Event {
	output := make(chan []Event)

	go func() {
		defer close(output)

		var session []Event
		timer := time.NewTimer(sessionTimeout)
		defer timer.Stop()

		// send delivers batch unless ctx is cancelled first, reporting success.
		send := func(batch []Event) bool {
			select {
			case output <- batch:
				return true
			case <-ctx.Done():
				return false
			}
		}

		// restartTimer re-arms the timer for a full sessionTimeout. Before
		// Go 1.23, calling Reset on a timer that had already fired left the stale
		// tick in timer.C, and that tick would end the next session immediately.
		// Stopping and doing a non-blocking drain first is correct on every
		// version.
		restartTimer := func() {
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(sessionTimeout)
		}

		for {
			select {
			case <-ctx.Done():
				if len(session) > 0 {
					select {
					case output <- session: // best effort: only if a reader is ready
					default:
					}
				}
				return

			case event, ok := <-input:
				if !ok {
					if len(session) > 0 {
						send(session)
					}
					return
				}
				session = append(session, event)
				restartTimer()

			case <-timer.C:
				if len(session) > 0 {
					if !send(session) {
						return
					}
					session = nil // the consumer now owns the old slice
				}
			}
		}
	}()

	return output
}

Stream Aggregations

package main

import (
	"fmt"
)

// Sum adds up every event Value that holds a float64. Values of any other
// type are skipped. The result is a float64 (0 for an empty batch) returned
// as interface{}.
func Sum(events []Event) interface{} {
	total := 0.0
	for i := range events {
		v, isFloat := events[i].Value.(float64)
		if !isFloat {
			continue
		}
		total += v
	}
	return total
}

// Average returns the arithmetic mean of the event Values that hold a
// float64, as a float64. Values of other types are ignored entirely. They
// count toward neither the sum nor the divisor, matching how Sum treats
// them. When no event holds a float64 (including an empty batch), it
// returns 0.0.
func Average(events []Event) interface{} {
	var total float64
	n := 0
	for _, event := range events {
		if val, ok := event.Value.(float64); ok {
			total += val
			n++
		}
	}
	if n == 0 {
		return 0.0
	}
	return total / float64(n)
}

// Count reports how many events are in the batch, whatever their Values, as
// an int returned in interface{}.
func Count(events []Event) interface{} {
	n := len(events)
	return n
}

// Max returns the largest event Value that holds a float64, as a float64.
// Values of other types are ignored, consistent with Sum and Average. It
// returns nil when no event holds a float64, including for an empty batch.
// Previously, a non-float64 first value caused a type-assertion panic.
func Max(events []Event) interface{} {
	var best float64
	found := false
	for _, event := range events {
		val, ok := event.Value.(float64)
		if !ok {
			continue
		}
		if !found || val > best {
			best, found = val, true
		}
	}
	if !found {
		return nil
	}
	return best
}

Best Practices

1. Use Appropriate Window Type

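// Choose the window type that fits the use case:
// Tumbling: fixed-size, non-overlapping windows
// Sliding: fixed-size windows that overlap, advancing by a slide interval
// Session: windows that close after a period of inactivity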

2. Handle Late Events

// Allow a grace period for late arrivals: for example, wait this long after
// a window's end before finalizing it, so slightly late events still count.
gracePeriod := 5 * time.Second

3. Monitor Stream Health

// StreamMetrics collects stream-health figures: how many events have been
// processed, the current lag, and how many errors occurred. The caller is
// responsible for updating and interpreting each field.
type StreamMetrics struct {
	EventsProcessed int           // events handled so far
	Lag             time.Duration // lag as measured by the caller
	ErrorCount      int           // errors observed so far
}

4. Implement Backpressure

// Handle slow consumers: block on the send, but give up as soon as the
// context is cancelled, so a stalled consumer cannot leak this goroutine.
select {
case output <- event:
case <-ctx.Done():
	return
}

Common Pitfalls

1. No Window Management

Always define appropriate windows.

2. Ignoring Late Events

Handle events that arrive after their window has closed, for example by keeping windows open for a grace period.

3. No State Cleanup

Clean up old state to prevent memory leaks.

4. Insufficient Monitoring

Monitor stream health continuously.

Resources

Summary

Stream processing enables real-time data handling. Key takeaways:

  • Choose appropriate window types
  • Implement stateful processing
  • Handle late events
  • Monitor stream health
  • Implement backpressure
  • Clean up state regularly
  • Test with realistic data volumes

By mastering stream processing, you can build real-time analytics systems.

Comments