Time Series Data Handling in Go
Introduction
Time series data is fundamental to monitoring, analytics, and financial applications. This guide covers handling, storing, and analyzing time series data in Go.
Proper time series handling enables efficient storage, fast queries, and meaningful analysis of temporal data.
Time Series Fundamentals
Basic Time Series Structure
package main
import (
"fmt"
"time"
)
// DataPoint represents a single time series data point.
type DataPoint struct {
	Timestamp time.Time         // when the sample was observed
	Value     float64           // the observed measurement
	Tags      map[string]string // optional key/value metadata (initialized empty by AddPoint)
}
// TimeSeries represents a collection of data points.
//
// DataPoints is kept in insertion order; it is NOT guaranteed to be
// sorted by timestamp, so consumers that need temporal order must
// sort first (as Resample does).
type TimeSeries struct {
	Name       string      // series identifier, e.g. "cpu_usage"
	DataPoints []DataPoint // raw samples in insertion order
}
// NewTimeSeries constructs an empty series with the given name.
// DataPoints starts as a non-nil empty slice.
func NewTimeSeries(name string) *TimeSeries {
	series := &TimeSeries{Name: name}
	series.DataPoints = make([]DataPoint, 0)
	return series
}
// AddPoint appends a sample with the given timestamp and value.
// The point's Tags map is initialized empty (non-nil).
func (ts *TimeSeries) AddPoint(timestamp time.Time, value float64) {
	point := DataPoint{
		Timestamp: timestamp,
		Value:     value,
		Tags:      map[string]string{},
	}
	ts.DataPoints = append(ts.DataPoints, point)
}
// GetRange returns the data points whose timestamps fall within
// [start, end], both endpoints inclusive.
//
// Fix: the original used After(start) && Before(end), which silently
// dropped any point landing exactly on either boundary — a classic
// off-by-one for time-range queries.
func (ts *TimeSeries) GetRange(start, end time.Time) []DataPoint {
	var result []DataPoint
	for _, point := range ts.DataPoints {
		if !point.Timestamp.Before(start) && !point.Timestamp.After(end) {
			result = append(result, point)
		}
	}
	return result
}
// TimeSeriesExample demonstrates creating a series, inserting points,
// and querying a time range.
func TimeSeriesExample() {
	ts := NewTimeSeries("cpu_usage")
	now := time.Now()
	// Insert 10 samples, one per minute, starting at now.
	for i := 0; i < 10; i++ {
		ts.AddPoint(now.Add(time.Duration(i)*time.Minute), float64(50+i*2))
	}
	// Fix: the original queried [now-5m, now), a window that contains
	// none of the inserted samples (they all sit at now or later), so
	// it always reported 0 points. Query a window that overlaps them.
	start := now.Add(-time.Minute)
	end := now.Add(6 * time.Minute)
	points := ts.GetRange(start, end)
	fmt.Printf("Found %d points\n", len(points))
}
Good: Proper Time Series Implementation
package main
import (
"fmt"
"sort"
"sync"
"time"
)
// TimeSeriesStore manages time series data.
//
// It is a thread-safe, in-memory registry of named series: every
// access to the series map goes through mu. Because it embeds a
// mutex, a TimeSeriesStore must not be copied after first use.
type TimeSeriesStore struct {
	series map[string]*TimeSeries // series keyed by name
	mu     sync.RWMutex           // guards series
}
// NewTimeSeriesStore constructs an empty, ready-to-use store.
func NewTimeSeriesStore() *TimeSeriesStore {
	store := &TimeSeriesStore{}
	store.series = make(map[string]*TimeSeries)
	return store
}
// CreateSeries registers a new, empty time series under the given
// name and returns it.
//
// NOTE(review): an existing series with the same name is silently
// replaced — confirm that is the intended behavior.
func (tss *TimeSeriesStore) CreateSeries(name string) *TimeSeries {
	newSeries := NewTimeSeries(name)
	tss.mu.Lock()
	tss.series[name] = newSeries
	tss.mu.Unlock()
	return newSeries
}
// GetSeries returns the series registered under name, or nil when no
// such series exists.
func (tss *TimeSeriesStore) GetSeries(name string) *TimeSeries {
	tss.mu.RLock()
	found := tss.series[name]
	tss.mu.RUnlock()
	return found
}
// AddPoint appends a (timestamp, value) sample to the named series.
// It returns an error when no series with that name is registered.
func (tss *TimeSeriesStore) AddPoint(seriesName string, timestamp time.Time, value float64) error {
	tss.mu.Lock()
	defer tss.mu.Unlock()

	ts, ok := tss.series[seriesName]
	if !ok {
		return fmt.Errorf("series not found: %s", seriesName)
	}

	ts.AddPoint(timestamp, value)
	return nil
}
// TimeSeriesAnalyzer analyzes time series data.
//
// It wraps a single *TimeSeries and provides summary statistics
// (Average, Min, Max) plus resampling. It performs no locking of its
// own; callers must not mutate the series concurrently.
type TimeSeriesAnalyzer struct {
	ts *TimeSeries // the series under analysis
}
// NewTimeSeriesAnalyzer wraps ts in an analyzer.
func NewTimeSeriesAnalyzer(ts *TimeSeries) *TimeSeriesAnalyzer {
	analyzer := &TimeSeriesAnalyzer{ts: ts}
	return analyzer
}
// Average returns the arithmetic mean of all values in the series,
// or 0 when the series has no points.
func (tsa *TimeSeriesAnalyzer) Average() float64 {
	points := tsa.ts.DataPoints
	n := len(points)
	if n == 0 {
		return 0
	}
	total := 0.0
	for i := range points {
		total += points[i].Value
	}
	return total / float64(n)
}
// Min returns the smallest value in the series, or 0 when the series
// has no points.
func (tsa *TimeSeriesAnalyzer) Min() float64 {
	points := tsa.ts.DataPoints
	if len(points) == 0 {
		return 0
	}
	lowest := points[0].Value
	for i := 1; i < len(points); i++ {
		if v := points[i].Value; v < lowest {
			lowest = v
		}
	}
	return lowest
}
// Max returns the largest value in the series, or 0 when the series
// has no points.
func (tsa *TimeSeriesAnalyzer) Max() float64 {
	points := tsa.ts.DataPoints
	if len(points) == 0 {
		return 0
	}
	highest := points[0].Value
	for i := 1; i < len(points); i++ {
		if v := points[i].Value; v > highest {
			highest = v
		}
	}
	return highest
}
// Resample buckets the series into fixed windows of the given
// interval and emits one averaged point per non-empty window.
//
// Fixes over the original:
//   - points are bucketed into half-open windows [current, current+interval)
//     using !Before(current); the old After(current) test excluded any
//     point sitting exactly on a window start — including, always, the
//     very first point, whose timestamp equals the first window start;
//   - the loop runs while !current.After(end) so the last point's
//     window is processed instead of being dropped;
//   - sorting happens on a copy, so the caller's series is no longer
//     reordered as a side effect;
//   - a non-positive interval returns an empty result instead of
//     looping forever (time.Time.Add(0) never advances).
func (tsa *TimeSeriesAnalyzer) Resample(interval time.Duration) *TimeSeries {
	resampled := NewTimeSeries(tsa.ts.Name + "_resampled")
	if len(tsa.ts.DataPoints) == 0 || interval <= 0 {
		return resampled
	}
	// Sort a copy by timestamp; the source series stays untouched.
	points := make([]DataPoint, len(tsa.ts.DataPoints))
	copy(points, tsa.ts.DataPoints)
	sort.Slice(points, func(i, j int) bool {
		return points[i].Timestamp.Before(points[j].Timestamp)
	})
	start := points[0].Timestamp
	end := points[len(points)-1].Timestamp
	for current := start; !current.After(end); current = current.Add(interval) {
		windowEnd := current.Add(interval)
		var sum float64
		var count int
		for _, point := range points {
			if !point.Timestamp.Before(current) && point.Timestamp.Before(windowEnd) {
				sum += point.Value
				count++
			}
		}
		if count > 0 {
			resampled.AddPoint(current, sum/float64(count))
		}
	}
	return resampled
}
// Downsample keeps every factor-th data point, producing a coarser
// copy of the series. A factor of 1 copies every point.
//
// Fix: the original never validated factor, so factor == 0 made the
// `i += factor` loop spin forever and factor < 0 panicked on a
// negative index. Factors below 1 are now clamped to 1.
func (tsa *TimeSeriesAnalyzer) Downsample(factor int) *TimeSeries {
	if factor < 1 {
		factor = 1
	}
	downsampled := NewTimeSeries(tsa.ts.Name + "_downsampled")
	for i := 0; i < len(tsa.ts.DataPoints); i += factor {
		// Append the point as-is so existing Tags are preserved.
		downsampled.DataPoints = append(downsampled.DataPoints, tsa.ts.DataPoints[i])
	}
	return downsampled
}
Bad: Improper Time Series Handling
package main
// BAD: No sorting
//
// BadTimeSeriesQuery ignores its start/end arguments entirely and
// hands back the raw backing slice: the caller receives every point
// regardless of the requested range, and can mutate the series'
// internal storage through the returned slice.
func BadTimeSeriesQuery(ts *TimeSeries, start, end time.Time) []DataPoint {
	// Assumes data is sorted
	// No validation
	return ts.DataPoints
}
// BAD: No aggregation
//
// BadAnalysis is a deliberately empty placeholder illustrating the
// anti-pattern of exposing a series with no summary statistics:
// nothing is averaged, bounded, or resampled, so every consumer must
// rescan the raw points itself.
func BadAnalysis(ts *TimeSeries) {
	// No averaging, min, max
	// No resampling
}
// BAD: No memory management
//
// BadLargeTimeSeries is a deliberately empty placeholder illustrating
// loading an entire dataset into memory at once — with no streaming,
// chunking, or downsampling, memory use grows without bound as the
// series grows.
func BadLargeTimeSeries() {
	// Loads all data into memory
	// No streaming
}
Problems:
- No sorting
- No aggregation
- No memory management
- No error handling
Time Series Aggregation
package main
import (
	"sort"
	"time"
)
// AggregationFunc defines aggregation function.
//
// An AggregationFunc reduces a window of raw values to a single
// summary value (e.g. sum, mean, or a percentile).
type AggregationFunc func([]float64) float64
// SumAggregation returns the sum of all values; a nil or empty slice
// sums to 0.
func SumAggregation(values []float64) float64 {
	total := 0.0
	for i := range values {
		total += values[i]
	}
	return total
}
// AverageAggregation returns the arithmetic mean of values, or 0 for
// an empty slice.
func AverageAggregation(values []float64) float64 {
	n := len(values)
	if n == 0 {
		return 0
	}
	var total float64
	for _, v := range values {
		total += v
	}
	return total / float64(n)
}
// PercentileAggregation returns an AggregationFunc that computes the
// given percentile (0–100) of a window of values.
//
// Fixes over the original:
//   - sorting happens on a copy; sort.Float64s previously reordered
//     the caller's slice in place, a surprising side effect for an
//     aggregation function;
//   - the computed index is clamped at both ends, so out-of-range
//     percentiles (negative inputs) cannot index outside the slice.
func PercentileAggregation(percentile float64) AggregationFunc {
	return func(values []float64) float64 {
		if len(values) == 0 {
			return 0
		}
		// Sort a copy so the caller's data is left untouched.
		sorted := make([]float64, len(values))
		copy(sorted, values)
		sort.Float64s(sorted)
		index := int(float64(len(sorted)) * percentile / 100)
		if index >= len(sorted) {
			index = len(sorted) - 1
		}
		if index < 0 {
			index = 0
		}
		return sorted[index]
	}
}
// Aggregate buckets the series into fixed windows of the given
// interval and reduces each non-empty window with aggFunc, producing
// one point per window timestamped at the window start.
//
// Fixes over the original:
//   - points are bucketed into half-open windows [current, current+interval);
//     the old After(current) test excluded any point sitting exactly
//     on a window start — including, always, the first point;
//   - the loop runs while !current.After(end) so the last point's
//     window is not dropped;
//   - the points are scanned from a sorted copy, since the series is
//     not guaranteed to be in timestamp order and the window span is
//     derived from the first and last elements;
//   - a non-positive interval returns an empty result instead of
//     looping forever.
func (tsa *TimeSeriesAnalyzer) Aggregate(interval time.Duration, aggFunc AggregationFunc) *TimeSeries {
	aggregated := NewTimeSeries(tsa.ts.Name + "_aggregated")
	if len(tsa.ts.DataPoints) == 0 || interval <= 0 {
		return aggregated
	}
	points := make([]DataPoint, len(tsa.ts.DataPoints))
	copy(points, tsa.ts.DataPoints)
	sort.Slice(points, func(i, j int) bool {
		return points[i].Timestamp.Before(points[j].Timestamp)
	})
	start := points[0].Timestamp
	end := points[len(points)-1].Timestamp
	for current := start; !current.After(end); current = current.Add(interval) {
		windowEnd := current.Add(interval)
		var values []float64
		for _, point := range points {
			if !point.Timestamp.Before(current) && point.Timestamp.Before(windowEnd) {
				values = append(values, point.Value)
			}
		}
		if len(values) > 0 {
			aggregated.AddPoint(current, aggFunc(values))
		}
	}
	return aggregated
}
Best Practices
1. Sort Data
sort.Slice(points, func(i, j int) bool {
return points[i].Timestamp.Before(points[j].Timestamp)
})
2. Use Appropriate Intervals
// Choose interval based on data frequency
interval := time.Minute
3. Handle Missing Data
// Interpolate or skip missing points
4. Compress Old Data
// Downsample or aggregate old data
Common Pitfalls
1. Unsorted Data
Always sort time series data by timestamp before running range queries, resampling, or window aggregation — points often arrive out of order.
2. No Aggregation
Aggregate data for different time scales.
3. Memory Issues
Stream or compress large datasets.
4. No Validation
Validate timestamps and values.
Resources
Summary
Proper time series handling is crucial. Key takeaways:
- Sort data by timestamp
- Implement aggregation functions
- Resample to different intervals
- Downsample for storage efficiency
- Handle missing data appropriately
- Monitor data quality
- Use appropriate storage
By mastering time series handling, you can build effective monitoring and analytics systems.
Comments