Stream Processing with Go
Introduction
Stream processing handles continuous flows of data in real time, processing events as they arrive rather than in periodic batches. This guide covers implementing stream processing systems in Go with windowing, aggregation, and stateful operations.
Stream processing enables real-time analytics, monitoring, and event processing at scale.
Stream Processing Basics
Simple Stream Processor
package main
import (
"context"
"fmt"
"time"
)
// Event represents a single record flowing through a stream.
type Event struct {
	// Timestamp is the time associated with the event.
	Timestamp time.Time
	// Key identifies the event's stream/type; StreamProcessor dispatches on it.
	Key string
	// Value is the event payload.
	Value interface{}
}

// StreamProcessor dispatches events to handlers registered per key.
//
// Subscribe and Process are not synchronized with each other, so register
// all handlers before events start flowing.
type StreamProcessor struct {
	handlers map[string][]func(Event)
}

// NewStreamProcessor creates a StreamProcessor with no subscriptions.
func NewStreamProcessor() *StreamProcessor {
	return &StreamProcessor{
		handlers: make(map[string][]func(Event)),
	}
}

// Subscribe registers handler for events whose Key equals eventType.
// Several handlers may be registered for the same key.
func (sp *StreamProcessor) Subscribe(eventType string, handler func(Event)) {
	sp.handlers[eventType] = append(sp.handlers[eventType], handler)
}

// Process delivers event to every handler subscribed to event.Key.
//
// Handlers run concurrently with each other, but Process waits for all of
// them to return before returning itself. No handler goroutine outlives the
// call, and a caller that exits right after Process never loses handler work.
// Events with no subscribers are ignored.
func (sp *StreamProcessor) Process(event Event) {
	handlers := sp.handlers[event.Key]
	if len(handlers) == 0 {
		return
	}
	// Buffered so each handler goroutine can signal completion without blocking.
	done := make(chan struct{}, len(handlers))
	for _, handler := range handlers {
		go func(h func(Event)) {
			defer func() { done <- struct{}{} }()
			h(event)
		}(handler)
	}
	for range handlers {
		<-done
	}
}
// SimpleStreamExample subscribes a printer to "temperature" events and then
// feeds the processor a single temperature reading.
func SimpleStreamExample() {
	sp := NewStreamProcessor()
	printTemperature := func(e Event) {
		fmt.Printf("Temperature: %v at %v\n", e.Value, e.Timestamp)
	}
	sp.Subscribe("temperature", printTemperature)

	reading := Event{
		Key:       "temperature",
		Value:     25.5,
		Timestamp: time.Now(),
	}
	sp.Process(reading)
}
Good: Proper Stream Processing Implementation
package main
import (
"context"
"fmt"
"sync"
"time"
)
// StreamWindow holds the events collected for one time window, together with
// the window's Start and End times.
type StreamWindow struct {
	Start, End time.Time
	Events     []Event
}
// WindowedStreamProcessor groups events into fixed-size, non-overlapping
// windows per key, based on each event's Timestamp. Its methods may be called
// concurrently. Callers that read a returned window's Events directly must not
// do so while AddEvent may run; use AggregateWindow, which snapshots under the
// lock.
type WindowedStreamProcessor struct {
	windowSize time.Duration
	windows    map[windowID]*StreamWindow
	mu         sync.RWMutex
}

// windowID identifies one window: the event key plus the window start in
// nanoseconds since the Unix epoch. A struct key cannot collide the way a
// formatted "key-bucket" string can, and it avoids a fmt.Sprintf per event.
type windowID struct {
	key   string
	start int64
}

// NewWindowedStreamProcessor creates a processor whose windows span
// windowSize. windowSize must be positive; sub-second sizes are supported.
func NewWindowedStreamProcessor(windowSize time.Duration) *WindowedStreamProcessor {
	return &WindowedStreamProcessor{
		windowSize: windowSize,
		windows:    make(map[windowID]*StreamWindow),
	}
}

// windowFor returns the identifier and start time of key's window containing
// t. Both come from the same Truncate call, so an event is always filed under
// the window whose [Start, End) range covers its timestamp.
func (wsp *WindowedStreamProcessor) windowFor(key string, t time.Time) (windowID, time.Time) {
	start := t.Truncate(wsp.windowSize)
	return windowID{key: key, start: start.UnixNano()}, start
}

// AddEvent files event into the window for event.Key that covers
// event.Timestamp, creating that window on first use.
func (wsp *WindowedStreamProcessor) AddEvent(event Event) {
	id, start := wsp.windowFor(event.Key, event.Timestamp)

	wsp.mu.Lock()
	defer wsp.mu.Unlock()
	w, ok := wsp.windows[id]
	if !ok {
		w = &StreamWindow{
			Start:  start,
			End:    start.Add(wsp.windowSize),
			Events: []Event{},
		}
		wsp.windows[id] = w
	}
	w.Events = append(w.Events, event)
}

// GetWindow returns the window for key that covers timestamp, or nil if no
// event has been added to it yet. The returned window is shared with the
// processor, not a copy.
func (wsp *WindowedStreamProcessor) GetWindow(key string, timestamp time.Time) *StreamWindow {
	id, _ := wsp.windowFor(key, timestamp)
	wsp.mu.RLock()
	defer wsp.mu.RUnlock()
	return wsp.windows[id]
}

// AggregateWindow applies aggregator to the events in window. A nil window
// (for example, GetWindow found nothing) is treated as empty.
//
// The slice is read under the read lock and capped to its length. The
// aggregator therefore sees a stable snapshot even if AddEvent appends
// concurrently, and an append inside the aggregator cannot write into the
// window's storage. The aggregator runs without the lock held, so it may call
// back into wsp.
func (wsp *WindowedStreamProcessor) AggregateWindow(window *StreamWindow, aggregator func([]Event) interface{}) interface{} {
	if window == nil {
		return aggregator(nil)
	}
	wsp.mu.RLock()
	events := window.Events
	wsp.mu.RUnlock()
	return aggregator(events[:len(events):len(events)])
}
// StatefulStreamProcessor keeps one state value per key and updates it event
// by event. Its methods may be called concurrently.
type StatefulStreamProcessor struct {
	state map[string]interface{}
	mu    sync.RWMutex
}

// NewStatefulStreamProcessor creates a processor with no stored state.
func NewStatefulStreamProcessor() *StatefulStreamProcessor {
	return &StatefulStreamProcessor{
		state: make(map[string]interface{}),
	}
}

// UpdateState replaces the state stored under key with value.
func (ssp *StatefulStreamProcessor) UpdateState(key string, value interface{}) {
	ssp.mu.Lock()
	defer ssp.mu.Unlock()
	ssp.state[key] = value
}

// GetState returns the state stored under key, or nil if none has been set.
func (ssp *StatefulStreamProcessor) GetState(key string) interface{} {
	ssp.mu.RLock()
	defer ssp.mu.RUnlock()
	return ssp.state[key]
}

// ProcessWithState calls processor with event and the current state for
// event.Key (nil if none). It stores processor's result as the new state and
// returns it.
//
// The read, the processor call, and the write all happen under one lock. With
// separate GetState/UpdateState calls, two concurrent callers for the same
// key could both read the same old state, and one update would be lost.
// Because the lock is held during the call, processor must not call back into
// ssp, or it will deadlock.
func (ssp *StatefulStreamProcessor) ProcessWithState(event Event, processor func(Event, interface{}) interface{}) interface{} {
	ssp.mu.Lock()
	defer ssp.mu.Unlock()
	result := processor(event, ssp.state[event.Key])
	ssp.state[event.Key] = result
	return result
}
// CountingProcessor demonstrates StatefulStreamProcessor by keeping a running
// event count per user key and printing the total after each event.
func CountingProcessor() {
	counter := NewStatefulStreamProcessor()
	stream := []Event{
		{Key: "user-1", Value: "login"},
		{Key: "user-1", Value: "click"},
		{Key: "user-2", Value: "login"},
		{Key: "user-1", Value: "logout"},
	}
	increment := func(_ Event, prev interface{}) interface{} {
		n := 0
		if prev != nil {
			n = prev.(int)
		}
		return n + 1
	}
	for i := range stream {
		total := counter.ProcessWithState(stream[i], increment)
		fmt.Printf("User %s: %d events\n", stream[i].Key, total)
	}
}
Bad: Improper Stream Processing
package main
// BadStreamProcessor is an intentionally empty anti-pattern example.
// BAD: No windowing. It receives a batch of events but applies no time
// windows, no aggregation and no state management, so it computes nothing
// from the stream.
func BadStreamProcessor(events []Event) {
// No time windows
// No aggregation
// No state management
}
// BadEventProcessing is an intentionally empty anti-pattern example.
// BAD: No error handling. It accepts an event but has no error path, no
// recovery from failures and no monitoring hooks, so problems while handling
// an event would go unnoticed.
func BadEventProcessing(event Event) {
// No error handling
// No recovery
// No monitoring
}
Problems:
- No windowing
- No state management
- No error handling
- No monitoring
Advanced Stream Patterns
Tumbling Windows
package main
import (
"context"
"time"
)
// TumblingWindow groups events from input into consecutive, non-overlapping
// batches and emits the current batch every windowSize of wall-clock time.
// Batches are cut by processing time (the ticker), not by Event.Timestamp.
// Empty batches are not emitted. windowSize must be positive, because
// time.NewTicker panics otherwise.
//
// The returned channel is closed when input is closed or ctx is cancelled.
// A partial batch is flushed first. On cancellation that flush is best-effort,
// since the consumer may already have stopped reading. Every send also watches
// ctx, so a consumer that walks away after cancelling cannot leave the
// goroutine blocked forever.
func TumblingWindow(ctx context.Context, input <-chan Event, windowSize time.Duration) <-chan []Event {
	output := make(chan []Event)
	go func() {
		defer close(output)
		ticker := time.NewTicker(windowSize)
		defer ticker.Stop()

		// emit hands w to the consumer unless ctx is cancelled first.
		emit := func(w []Event) bool {
			select {
			case output <- w:
				return true
			case <-ctx.Done():
				return false
			}
		}

		var window []Event
		for {
			select {
			case <-ctx.Done():
				if len(window) > 0 {
					emit(window)
				}
				return
			case event, ok := <-input:
				if !ok {
					if len(window) > 0 {
						emit(window)
					}
					return
				}
				window = append(window, event)
			case <-ticker.C:
				if len(window) > 0 {
					if !emit(window) {
						return
					}
					// Start a fresh slice; the emitted one now belongs to the consumer.
					window = nil
				}
			}
		}
	}()
	return output
}
Sliding Windows
package main
import (
"context"
"time"
)
// SlidingWindow emits, every slideSize of wall-clock time, a snapshot of the
// buffered events whose Timestamp lies within the last windowSize, measured
// against time.Now(). Consecutive snapshots overlap when slideSize <
// windowSize. Empty snapshots are not emitted. slideSize must be positive,
// because time.NewTicker panics otherwise.
//
// Expired events are dropped from the front of the buffer, which assumes
// events arrive in Timestamp order. An older event queued behind a newer one
// is kept until everything ahead of it expires. The returned channel is closed
// when input is closed or ctx is cancelled, and sends also watch ctx, so a
// departed consumer cannot strand the goroutine.
func SlidingWindow(ctx context.Context, input <-chan Event, windowSize, slideSize time.Duration) <-chan []Event {
	output := make(chan []Event)
	go func() {
		defer close(output)
		ticker := time.NewTicker(slideSize)
		defer ticker.Stop()

		var buffer []Event
		// evict drops leading events older than windowSize relative to now.
		evict := func(now time.Time) {
			cutoff := now.Add(-windowSize)
			i := 0
			for i < len(buffer) && buffer[i].Timestamp.Before(cutoff) {
				i++
			}
			buffer = buffer[i:]
		}

		for {
			select {
			case <-ctx.Done():
				return
			case event, ok := <-input:
				if !ok {
					return
				}
				buffer = append(buffer, event)
				evict(time.Now())
			case <-ticker.C:
				// Evict on every slide as well. Otherwise a quiet input would keep
				// re-emitting events that have long since left the window.
				evict(time.Now())
				if len(buffer) == 0 {
					continue
				}
				// Copy so the consumer never aliases the buffer being mutated here.
				snapshot := make([]Event, len(buffer))
				copy(snapshot, buffer)
				select {
				case output <- snapshot:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return output
}
Session Windows
package main
import (
"context"
"time"
)
// SessionWindow groups events into sessions. A session stays open while
// events keep arriving and is emitted once sessionTimeout of wall-clock time
// passes with no new event. Gaps are measured by arrival (processing) time,
// not by Event.Timestamp.
//
// The returned channel is closed when input is closed or ctx is cancelled.
// An open session is flushed first. On cancellation that flush is
// best-effort. Sends watch ctx, so a consumer that stops reading after
// cancelling cannot leave the goroutine blocked forever.
func SessionWindow(ctx context.Context, input <-chan Event, sessionTimeout time.Duration) <-chan []Event {
	output := make(chan []Event)
	go func() {
		defer close(output)
		timer := time.NewTimer(sessionTimeout)
		defer timer.Stop()

		// emit hands s to the consumer unless ctx is cancelled first.
		emit := func(s []Event) bool {
			select {
			case output <- s:
				return true
			case <-ctx.Done():
				return false
			}
		}

		// restart re-arms the inactivity timer. Before Go 1.23, Reset on a timer
		// that already fired leaves the stale tick in timer.C. That tick would
		// close the session that just received an event, so stop and drain
		// first. The non-blocking drain is harmless on newer Go versions.
		restart := func() {
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(sessionTimeout)
		}

		var session []Event
		for {
			select {
			case <-ctx.Done():
				if len(session) > 0 {
					emit(session)
				}
				return
			case event, ok := <-input:
				if !ok {
					if len(session) > 0 {
						emit(session)
					}
					return
				}
				session = append(session, event)
				restart()
			case <-timer.C:
				if len(session) > 0 {
					if !emit(session) {
						return
					}
					session = nil
				}
			}
		}
	}()
	return output
}
Stream Aggregations
package main
import (
"fmt"
)
// Sum adds up every float64 Value in events and returns the total as a
// float64 in an interface{}. Values of any other type are ignored, and an
// input without float64 values yields 0.
func Sum(events []Event) interface{} {
	total := 0.0
	for i := range events {
		v, isFloat := events[i].Value.(float64)
		if !isFloat {
			continue
		}
		total += v
	}
	return total
}
// Average returns the mean of the float64 Values in events, as a float64 in
// an interface{}. Values of any other type are ignored entirely: they count
// toward neither the total nor the divisor, so they do not drag the mean
// toward zero. Average returns 0.0 when events contains no float64 values,
// including when events is empty.
func Average(events []Event) interface{} {
	var total float64
	n := 0
	for _, event := range events {
		if val, ok := event.Value.(float64); ok {
			total += val
			n++
		}
	}
	if n == 0 {
		return 0.0
	}
	return total / float64(n)
}
// Count reports how many events are in the slice, whatever their values, as
// an int in an interface{}.
func Count(events []Event) interface{} {
	n := len(events)
	return n
}
// Max returns the largest float64 Value in events, as a float64 in an
// interface{}. Values of any other type are ignored, wherever they appear.
// Max returns nil when events contains no float64 values, including when
// events is empty.
func Max(events []Event) interface{} {
	var best float64
	found := false
	for _, event := range events {
		val, ok := event.Value.(float64)
		if !ok {
			continue
		}
		if !found || val > best {
			best, found = val, true
		}
	}
	if !found {
		return nil
	}
	return best
}
Best Practices
1. Use Appropriate Window Type
// Choose based on use case
// Tumbling: non-overlapping
// Sliding: overlapping
// Session: inactivity-based
2. Handle Late Events
// Allow grace period for late arrivals.
// Illustrative fragment: `:=` is only valid inside a function body. None of
// the windowing code shown in this guide consumes gracePeriod.
// NOTE(review): how it should be applied (e.g. keeping a window open this long
// past its End before emitting it) is left to the window implementation.
gracePeriod := 5 * time.Second
3. Monitor Stream Health
// Track lag, throughput, errors.
// StreamMetrics is a minimal set of stream-health counters: events processed,
// processing lag, and error count.
// NOTE(review): Lag presumably means the delay between an event's Timestamp
// and when it is processed; confirm. The struct does no synchronization
// itself, so concurrent updaters need their own locking or atomics.
type StreamMetrics struct {
// EventsProcessed counts events handled so far.
EventsProcessed int
// Lag is the current processing delay (see note above).
Lag time.Duration
// ErrorCount counts failures observed while processing.
ErrorCount int
}
4. Implement Backpressure
// Handle slow consumers.
// The send blocks until the consumer is ready, and that blocking is what
// pushes backpressure upstream. Selecting on ctx.Done() as well lets the
// producer give up instead of blocking forever once the context is cancelled.
// Fragment: assumes output, event and ctx are in scope inside a producer
// function.
select {
case output <- event:
case <-ctx.Done():
return
}
Common Pitfalls
1. No Window Management
Always define appropriate windows.
2. Ignoring Late Events
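Decide what happens to events that arrive after their window has closed. For example, accept them within a grace period or route them to a separate late-data path instead of silently dropping them.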
3. No State Cleanup
Clean up old state to prevent memory leaks.
4. Insufficient Monitoring
Monitor stream health continuously.
Resources
- Stream Processing Patterns
- Go Concurrency Patterns
- Windowing Strategies
- Stream Processing Best Practices
Summary
Stream processing enables real-time data handling. Key takeaways:
- Choose appropriate window types
- Implement stateful processing
- Handle late events
- Monitor stream health
- Implement backpressure
- Clean up state regularly
- Test with realistic data volumes
By mastering stream processing, you can build real-time analytics systems.
Comments