Skip to main content
⚡ Calmops

Time Series Data Handling in Go

Time Series Data Handling in Go

Introduction

Time series data is fundamental to monitoring, analytics, and financial applications. This guide covers handling, storing, and analyzing time series data in Go.

Proper time series handling enables efficient storage, fast queries, and meaningful analysis of temporal data.

Time Series Fundamentals

Basic Time Series Structure

package main

import (
	"fmt"
	"time"
)

// DataPoint represents a single time series data point
type DataPoint struct {
	Timestamp time.Time         // when the sample was taken
	Value     float64           // the sampled measurement
	Tags      map[string]string // optional key/value labels attached to this point
}

// TimeSeries represents a collection of data points
type TimeSeries struct {
	Name       string      // series identifier, e.g. a metric name like "cpu_usage"
	DataPoints []DataPoint // stored in insertion order; NOT guaranteed sorted by timestamp
}

// NewTimeSeries creates an empty time series with the given name.
// The DataPoints slice is initialized to a non-nil empty slice.
func NewTimeSeries(name string) *TimeSeries {
	ts := &TimeSeries{Name: name}
	ts.DataPoints = []DataPoint{}
	return ts
}

// AddPoint appends one data point with the given timestamp and value.
// The point's Tags map is created empty so callers may populate it later.
func (ts *TimeSeries) AddPoint(timestamp time.Time, value float64) {
	p := DataPoint{
		Timestamp: timestamp,
		Value:     value,
		Tags:      map[string]string{},
	}
	ts.DataPoints = append(ts.DataPoints, p)
}

// GetRange returns the data points whose timestamps fall within
// [start, end], inclusive on both ends.
//
// Fix: the original used strict After(start) && Before(end), which
// silently dropped any point lying exactly on either boundary — e.g.
// a query with end == now never returned a sample taken at now.
func (ts *TimeSeries) GetRange(start, end time.Time) []DataPoint {
	var result []DataPoint

	for _, point := range ts.DataPoints {
		if !point.Timestamp.Before(start) && !point.Timestamp.After(end) {
			result = append(result, point)
		}
	}

	return result
}

// TimeSeriesExample demonstrates basic usage: record ten samples over
// the previous nine minutes, then query the most recent five minutes.
//
// Fix: the original added samples at now, now+1m, ..., now+9m (all in
// the future) and then queried [now-5m, now], so the query matched
// essentially nothing. The samples are now recorded in the past so the
// range query actually finds history.
func TimeSeriesExample() {
	ts := NewTimeSeries("cpu_usage")

	now := time.Now()
	for i := 0; i < 10; i++ {
		// Sample i is (9-i) minutes in the past; the last sample is at now.
		ts.AddPoint(now.Add(-time.Duration(9-i)*time.Minute), float64(50+i*2))
	}

	// Get last 5 minutes
	start := now.Add(-5 * time.Minute)
	end := now
	points := ts.GetRange(start, end)

	fmt.Printf("Found %d points\n", len(points))
}

Good: Proper Time Series Implementation

package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// TimeSeriesStore manages time series data
// and is safe for concurrent use by multiple goroutines.
type TimeSeriesStore struct {
	series map[string]*TimeSeries // all series, keyed by series name
	mu     sync.RWMutex           // guards the series map (and series contents via AddPoint)
}

// NewTimeSeriesStore creates an empty store ready for use.
func NewTimeSeriesStore() *TimeSeriesStore {
	store := new(TimeSeriesStore)
	store.series = map[string]*TimeSeries{}
	return store
}

// CreateSeries registers a fresh, empty time series under the given
// name and returns it. An existing series with that name is replaced.
func (tss *TimeSeriesStore) CreateSeries(name string) *TimeSeries {
	ts := NewTimeSeries(name)

	tss.mu.Lock()
	tss.series[name] = ts
	tss.mu.Unlock()

	return ts
}

// GetSeries returns the series registered under name, or nil if no
// such series exists.
func (tss *TimeSeriesStore) GetSeries(name string) *TimeSeries {
	tss.mu.RLock()
	ts := tss.series[name]
	tss.mu.RUnlock()
	return ts
}

// AddPoint appends one sample to the named series while holding the
// store's write lock. It returns an error if the series does not exist.
func (tss *TimeSeriesStore) AddPoint(seriesName string, timestamp time.Time, value float64) error {
	tss.mu.Lock()
	defer tss.mu.Unlock()

	ts, ok := tss.series[seriesName]
	if !ok {
		return fmt.Errorf("series not found: %s", seriesName)
	}

	ts.AddPoint(timestamp, value)
	return nil
}

// TimeSeriesAnalyzer analyzes time series data
// (average, min/max, resampling, aggregation) for a single series.
type TimeSeriesAnalyzer struct {
	ts *TimeSeries // the series under analysis; shared with the caller, not copied
}

// NewTimeSeriesAnalyzer wraps the given series in an analyzer.
func NewTimeSeriesAnalyzer(ts *TimeSeries) *TimeSeriesAnalyzer {
	a := new(TimeSeriesAnalyzer)
	a.ts = ts
	return a
}

// Average returns the arithmetic mean of all point values, or 0 for
// an empty series.
func (tsa *TimeSeriesAnalyzer) Average() float64 {
	n := len(tsa.ts.DataPoints)
	if n == 0 {
		return 0
	}

	total := 0.0
	for i := range tsa.ts.DataPoints {
		total += tsa.ts.DataPoints[i].Value
	}

	return total / float64(n)
}

// Min returns the smallest point value, or 0 for an empty series.
func (tsa *TimeSeriesAnalyzer) Min() float64 {
	pts := tsa.ts.DataPoints
	if len(pts) == 0 {
		return 0
	}

	lowest := pts[0].Value
	for i := 1; i < len(pts); i++ {
		if pts[i].Value < lowest {
			lowest = pts[i].Value
		}
	}

	return lowest
}

// Max returns the largest point value, or 0 for an empty series.
func (tsa *TimeSeriesAnalyzer) Max() float64 {
	pts := tsa.ts.DataPoints
	if len(pts) == 0 {
		return 0
	}

	highest := pts[0].Value
	for i := 1; i < len(pts); i++ {
		if pts[i].Value > highest {
			highest = pts[i].Value
		}
	}

	return highest
}

// Resample buckets the series into fixed-width windows of the given
// interval and emits one point per non-empty window, holding the mean
// of the values that fall inside [windowStart, windowEnd).
//
// Fixes over the original:
//   - sorts a copy of the points instead of reordering the caller's
//     DataPoints slice in place;
//   - window membership was strict After/Before, which dropped the
//     very first point (its timestamp equals the first window start)
//     and any point lying exactly on a window boundary;
//   - the window loop now runs while current is not after the last
//     timestamp, so a final point sitting exactly on a window
//     boundary is no longer skipped;
//   - a single forward scan over the sorted points replaces the
//     O(points × windows) rescan of the whole slice per window;
//   - a non-positive interval (which previously looped forever)
//     yields an empty result.
func (tsa *TimeSeriesAnalyzer) Resample(interval time.Duration) *TimeSeries {
	resampled := NewTimeSeries(tsa.ts.Name + "_resampled")
	if len(tsa.ts.DataPoints) == 0 || interval <= 0 {
		return resampled
	}

	// Work on a sorted copy; callers keep their original ordering.
	points := make([]DataPoint, len(tsa.ts.DataPoints))
	copy(points, tsa.ts.DataPoints)
	sort.Slice(points, func(i, j int) bool {
		return points[i].Timestamp.Before(points[j].Timestamp)
	})

	start := points[0].Timestamp
	end := points[len(points)-1].Timestamp

	idx := 0 // next unconsumed point; windows are contiguous, so one pass suffices
	for current := start; !current.After(end); current = current.Add(interval) {
		windowEnd := current.Add(interval)
		var sum float64
		var count int

		for idx < len(points) && points[idx].Timestamp.Before(windowEnd) {
			sum += points[idx].Value
			count++
			idx++
		}

		if count > 0 {
			resampled.AddPoint(current, sum/float64(count))
		}
	}

	return resampled
}

// Downsample keeps every factor-th point (indices 0, factor, 2*factor, ...)
// and returns them as a new series.
//
// Fix: a factor below 1 made the original loop spin forever (i += 0)
// or step backwards; it is now clamped to 1, which copies the series
// unchanged.
func (tsa *TimeSeriesAnalyzer) Downsample(factor int) *TimeSeries {
	downsampled := NewTimeSeries(tsa.ts.Name + "_downsampled")

	if factor < 1 {
		factor = 1
	}

	for i := 0; i < len(tsa.ts.DataPoints); i += factor {
		downsampled.DataPoints = append(downsampled.DataPoints, tsa.ts.DataPoints[i])
	}

	return downsampled
}

Bad: Improper Time Series Handling

package main

// BAD: No sorting
// Deliberate anti-pattern for illustration: the start and end
// parameters are ignored entirely, and the raw internal slice is
// returned, so the caller receives unfiltered data that also aliases
// the series' internal storage.
func BadTimeSeriesQuery(ts *TimeSeries, start, end time.Time) []DataPoint {
	// Assumes data is sorted
	// No validation
	return ts.DataPoints
}

// BAD: No aggregation
// Deliberate anti-pattern for illustration: the function computes
// nothing — no summary statistics, no resampling — so callers learn
// nothing about the series.
func BadAnalysis(ts *TimeSeries) {
	// No averaging, min, max
	// No resampling
}

// BAD: No memory management
// Deliberate anti-pattern for illustration: the approach sketched here
// would materialize an entire dataset in memory instead of streaming
// or downsampling it.
func BadLargeTimeSeries() {
	// Loads all data into memory
	// No streaming
}

Problems:

  • No sorting
  • No aggregation
  • No memory management
  • No error handling

Time Series Aggregation

package main

import (
	"sort"
	"time"
)

// AggregationFunc defines aggregation function
// signatures: it reduces one window of raw values to a single summary
// value (sum, mean, percentile, ...).
type AggregationFunc func([]float64) float64

// SumAggregation returns the sum of all values; an empty or nil slice
// sums to 0.
func SumAggregation(values []float64) float64 {
	total := 0.0
	for i := range values {
		total += values[i]
	}
	return total
}

// AverageAggregation returns the arithmetic mean of values, or 0 when
// the slice is empty.
func AverageAggregation(values []float64) float64 {
	n := len(values)
	if n == 0 {
		return 0
	}
	return SumAggregation(values) / float64(n)
}

// PercentileAggregation returns an AggregationFunc that reports the
// value at the given percentile (0–100) of a window, using the same
// index formula as before: floor(len * percentile / 100), clamped to
// the last element.
//
// Fix: the original called sort.Float64s on the caller's slice,
// reordering the window as a side effect and corrupting any
// aggregation applied to the same window afterwards. The sort now
// operates on a private copy. (This snippet also failed to import
// "sort"; see the import block above.)
func PercentileAggregation(percentile float64) AggregationFunc {
	return func(values []float64) float64 {
		if len(values) == 0 {
			return 0
		}

		// Sort a private copy so the caller's window is untouched.
		sorted := make([]float64, len(values))
		copy(sorted, values)
		sort.Float64s(sorted)

		index := int(float64(len(sorted)) * percentile / 100)
		if index >= len(sorted) {
			index = len(sorted) - 1
		}

		return sorted[index]
	}
}

// Aggregate buckets the series into windows of the given interval and
// applies aggFunc to the values in each non-empty window, emitting one
// point per window stamped with the window start.
//
// Fixes over the original:
//   - operates on a sorted copy of the points; the original read
//     DataPoints[0] and DataPoints[len-1] as earliest/latest, which
//     only holds for pre-sorted input;
//   - window membership is [current, windowEnd) instead of strict
//     After/Before, so points sitting exactly on a window start —
//     including the very first point — are counted;
//   - the loop condition includes the last timestamp instead of
//     stopping one window early;
//   - a single forward scan replaces the full rescan per window;
//   - a non-positive interval (previously an infinite loop) yields an
//     empty result.
func (tsa *TimeSeriesAnalyzer) Aggregate(interval time.Duration, aggFunc AggregationFunc) *TimeSeries {
	aggregated := NewTimeSeries(tsa.ts.Name + "_aggregated")

	if len(tsa.ts.DataPoints) == 0 || interval <= 0 {
		return aggregated
	}

	points := make([]DataPoint, len(tsa.ts.DataPoints))
	copy(points, tsa.ts.DataPoints)
	sort.Slice(points, func(i, j int) bool {
		return points[i].Timestamp.Before(points[j].Timestamp)
	})

	start := points[0].Timestamp
	end := points[len(points)-1].Timestamp

	idx := 0 // next unconsumed point; windows are contiguous, so one pass suffices
	for current := start; !current.After(end); current = current.Add(interval) {
		windowEnd := current.Add(interval)
		var values []float64

		for idx < len(points) && points[idx].Timestamp.Before(windowEnd) {
			values = append(values, points[idx].Value)
			idx++
		}

		if len(values) > 0 {
			aggregated.AddPoint(current, aggFunc(values))
		}
	}

	return aggregated
}

Best Practices

1. Sort Data

sort.Slice(points, func(i, j int) bool {
	return points[i].Timestamp.Before(points[j].Timestamp)
})

2. Use Appropriate Intervals

// Choose interval based on data frequency
interval := time.Minute

3. Handle Missing Data

// Interpolate or skip missing points

4. Compress Old Data

// Downsample or aggregate old data

Common Pitfalls

1. Unsorted Data

Always sort time series data.

2. No Aggregation

Aggregate data for different time scales.

3. Memory Issues

Stream or compress large datasets.

4. No Validation

Validate timestamps and values.

Resources

Summary

Proper time series handling is crucial. Key takeaways:

  • Sort data by timestamp
  • Implement aggregation functions
  • Resample to different intervals
  • Downsample for storage efficiency
  • Handle missing data appropriately
  • Monitor data quality
  • Use appropriate storage

By mastering time series handling, you can build effective monitoring and analytics systems.

Comments