Skip to main content
⚡ Calmops

TimescaleDB for AI: Machine Learning, Vector Search, and Data Pipelines

Introduction

As artificial intelligence and machine learning become central to modern applications, the role of databases in AI pipelines has expanded significantly. TimescaleDB, built on PostgreSQL, offers unique capabilities for AI workloads that traditional time-series databases cannot match. In this article, we explore how TimescaleDB serves as a foundation for AI applications, from feature engineering and time-series forecasting to storing vector embeddings and building complete ML pipelines.

The combination of time-series data with AI creates powerful applications: predictive maintenance in manufacturing, anomaly detection in monitoring systems, demand forecasting in retail, and many more. TimescaleDB’s SQL interface, PostgreSQL ecosystem, and time-series optimizations make it an excellent choice for these use cases.

Time-Series Feature Engineering

Machine learning models require carefully engineered features from raw time-series data. TimescaleDB provides powerful SQL capabilities for feature engineering directly in the database.

Statistical Features

Computing statistical features is fundamental to time-series ML:

-- Create a feature view for ML training
CREATE MATERIALIZED VIEW sensor_features
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('5 minutes', time) AS bucket,
    sensor_id,
    
    -- Statistical features
    AVG(temperature) AS mean_temp,
    STDDEV(temperature) AS std_temp,
    MIN(temperature) AS min_temp,
    MAX(temperature) AS max_temp,
    MAX(temp) - MIN(temperature) AS temp_range,
    
    -- Count of readings
    COUNT(*) AS reading_count,
    
    -- Missing data indicator
    COUNT(*) FILTER (WHERE temperature IS NULL) AS null_count,
    
    -- Trend indicators
    AVG(temperature) - LAG(AVG(temperature), 1) OVER w AS temp_change,
    
    -- Volatility
    AVG(ABS(temperature - AVG(temperature) OVER w)) AS avg_deviation
    
FROM sensor_data
WHERE time > NOW() - INTERVAL '30 days'
WINDOW w AS (
    PARTITION BY sensor_id 
    ORDER BY time 
    ROWS BETWEEN 11 PRECEDING AND CURRENT ROW
)
GROUP BY bucket, sensor_id;

-- Create refresh policy
SELECT add_continuous_aggregate_policy('sensor_features',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '5 minutes',
    schedule_interval => INTERVAL '10 minutes');

This materialized view automatically maintains features like mean, standard deviation, min/max, trend indicators, and volatility measures—everything needed for ML model training.

Window-Based Features

Window functions enable sophisticated feature extraction:

-- Rolling window features for anomaly detection
CREATE MATERIALIZED VIEW rolling_features
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 minute', time) AS bucket,
    sensor_id,
    
    -- 5-minute rolling statistics
    AVG(value) OVER (
        PARTITION BY sensor_id 
        ORDER BY time 
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
    ) AS rolling_avg_5m,
    
    -- Exponential moving average
    AVG(value) OVER (
        PARTITION BY sensor_id 
        ORDER BY time 
        ROWS BETWEEN 20 PRECEDING AND CURRENT ROW
    ) AS ema_20,
    
    -- Lag features
    LAG(value, 1) OVER (PARTITION BY sensor_id ORDER BY time) AS value_l    LAG(valueag_1,
, 5) OVER (PARTITION BY sensor_id ORDER BY time) AS value_lag_5,
    
    -- Percentage change
    (value - LAG(value, 1) OVER (PARTITION BY sensor_id ORDER BY time)) 
        / NULLIF(LAG(value, 1) OVER (PARTITION BY sensor_id ORDER BY time), 0) 
        * 100 AS pct_change
    
FROM raw_metrics
WHERE time > NOW() - INTERVAL '7 days';

SELECT add_continuous_aggregate_policy('rolling_features',
    start_offset => INTERVAL '10 minutes',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '5 minutes');

These rolling features capture temporal patterns that ML models use to make predictions.

Frequency Domain Features

For certain applications, frequency domain features are valuable:

-- Approximate frequency analysis using discrete Fourier transform
-- This is a simplified example; real implementations may use extensions

CREATE OR REPLACE FUNCTION compute_fft_features(
    sensor_id TEXT,
    start_time TIMESTAMPTZ,
    end_time TIMESTAMPTZ
) RETURNS TABLE (
    frequency_band TEXT,
    power NUMERIC
) AS $$
BEGIN
    -- Simplified FFT approximation using SQL
    -- Real implementation would use PL/Python or extension
    RETURN QUERY
    WITH RECURSIVE fft AS (
        SELECT 
            time_bucket('1 second', time) AS t,
            AVG(value) AS v
        FROM sensor_data
        WHERE sensor_id = compute_fft_features.sensor_id
          AND time BETWEEN compute_fft_features.start_time 
                       AND compute_fft_features.end_time
        GROUP BY t
    )
    SELECT 
        CASE 
            WHEN bucket < 10 THEN 'low'
            WHEN bucket < 50 THEN 'medium'
            ELSE 'high'
        END AS frequency_band,
        SUM(POWER(v, 2)) AS power
    FROM (
        SELECT v, NTILE(100) OVER (ORDER BY t) AS bucket
        FROM fft
    ) sub
    GROUP BY frequency_band;
END;
$$ LANGUAGE plpgsql;

For production frequency analysis, consider using MADlib or PL/R extensions, but the SQL approach demonstrates TimescaleDB’s extensibility.

Storing Vector Embeddings

Modern AI applications often work with vector embeddings. TimescaleDB’s PostgreSQL foundation supports storing and querying vectors efficiently.

Vector Storage

Store embeddings alongside time-series data:

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create table with vector embeddings
CREATE TABLE embeddings (
    time        TIMESTAMPTZ NOT NULL,
    entity_id   TEXT NOT NULL,
    entity_type TEXT NOT NULL,  -- 'sensor', 'event', 'pattern'
    embedding   VECTOR(384),     -- 384-dimensional embeddings
    metadata    JSONB
);

SELECT create_hypertable('embeddings', 'time');

-- Create index for similarity search
CREATE INDEX idx_embeddings_cosine 
ON embeddings USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

The pgvector extension provides efficient vector operations and supports multiple distance metrics: Euclidean, cosine, and inner product.

Combine time filtering with vector similarity:

-- Find similar events from the past week
SELECT 
    e.entity_id,
    e.time,
    e.embedding <=> $query_embedding AS similarity,
    e.metadata
FROM embeddings e
WHERE e.time > NOW() - INTERVAL '7 days'
  AND e.entity_type = 'event'
ORDER BY e.embedding <=> $query_embedding
LIMIT 10;

This hybrid approach filters by time first, then performs similarity search on the reduced dataset—efficient and practical for real-world applications.

Time-Series Pattern Matching

Use embeddings to find similar patterns:

-- Find historical periods similar to current anomaly
WITH current_pattern AS (
    SELECT embedding
    FROM embeddings
    WHERE time > NOW() - INTERVAL '1 hour'
      AND entity_type = 'anomaly'
    ORDER BY time DESC
    LIMIT 1
)
SELECT 
    time_bucket('1 hour', time) AS bucket,
    COUNT(*) AS similar_events,
    AVG((metadata->>'severity')::numeric) AS avg_severity
FROM embeddings e
CROSS JOIN current_pattern cp
WHERE e.entity_type = 'anomaly'
  AND e.embedding <=> cp.embedding < 0.2  -- Similarity threshold
  AND e.time < NOW() - INTERVAL '1 day'  -- Historical only
GROUP BY bucket
ORDER BY bucket DESC;

This pattern matching enables finding historical analogues to current events—a powerful technique for root cause analysis.

ML Model Training Pipelines

TimescaleDB can serve as the complete data platform for ML model development.

Training Data Preparation

Prepare datasets for model training:

-- Create training dataset combining features and labels
CREATE TABLE training_data (
    id          SERIAL PRIMARY KEY,
    bucket      TIMESTAMPTZ NOT NULL,
    sensor_id   TEXT NOT NULL,
    features    JSONB NOT NULL,
    label       BOOLEAN NOT NULL,  -- e.g., anomaly detected
    created_at  TIMESTAMPTZ DEFAULT NOW()
);

-- Populate from continuous aggregates
INSERT INTO training_data (bucket, sensor_id, features, label)
SELECT 
    f.bucket,
    f.sensor_id,
    jsonb_build_object(
        'mean_temp', f.mean_temp,
        'std_temp', f.std_temp,
        'temp_range', f.temp_range,
        'reading_count', f.reading_count,
        'null_count', f.null_count,
        'temp_change', f.temp_change
    ) AS features,
    CASE 
        WHEN f.max_temp > 100 THEN TRUE  -- Label: anomaly
        ELSE FALSE
    END AS label
FROM sensor_features f
WHERE f.bucket > NOW() - INTERVAL '90 days';

-- Export for Python ML training
\copy training_data(features, label) TO '/tmp/training_data.csv' 
    WITH (FORMAT CSV, HEADER);

The training data can be exported in various formats (CSV, Parquet) for Python, R, or other ML frameworks.

Model Storage and Versioning

Track model versions alongside data:

-- Model registry table
CREATE TABLE models (
    model_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    model_name      TEXT NOT NULL,
    version         TEXT NOT NULL,
    model_type      TEXT NOT NULL,  -- 'regression', 'classification', etc.
    hyperparameters JSONB,
    metrics         JSONB,
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    created_by      TEXT DEFAULT CURRENT_USER
);

-- Store trained model metadata
INSERT INTO models (model_name, version, model_type, hyperparameters, metrics)
VALUES (
    'temperature_anomaly_detector',
    'v1.2.0',
    'isolation_forest',
    '{"contamination": 0.05, "n_estimators": 100}',
    '{"precision": 0.92, "recall": 0.88, "f1": 0.90}'
);

-- Link models to training data
ALTER TABLE training_data ADD COLUMN model_id UUID;
ALTER TABLE training_data ADD CONSTRAINT fk_model 
    FOREIGN KEY (model_id) REFERENCES models(model_id);

This model registry approach ensures reproducibility and traceability of ML experiments.

Time-Series Forecasting

TimescaleDB’s time-series functions enable forecasting directly in the database.

Simple Forecasting with Moving Averages

Basic forecasting uses window functions:

-- Forecast using exponential moving average
WITH historical_data AS (
    SELECT 
        time_bucket('1 hour', time) AS bucket,
        AVG(value) AS value
    FROM metrics
    WHERE time > NOW() - INTERVAL '30 days'
      AND metric_name = 'cpu.usage'
    GROUP BY bucket
),
forecast AS (
    SELECT 
        bucket,
        value,
        AVG(value) OVER (
            ORDER BY bucket 
            ROWS BETWEEN 24 PRECEDING AND CURRENT ROW
        ) AS moving_avg,
        -- Simple linear trend
        AVG(value) OVER (
            ORDER BY bucket 
            ROWS BETWEEN 168 PRECEDING AND CURRENT ROW
        ) - AVG(value) OVER (
            ORDER BY bucket 
            ROWS BETWEEN 336 PRECEDING AND 168 PRECEDING
        ) AS hourly_trend
    FROM historical_data
)
SELECT 
    bucket + INTERVAL '1 hour' AS forecast_bucket,
    moving_avg + hourly_trend AS forecast_value,
    'trend' AS method
FROM forecast
ORDER BY bucket DESC
LIMIT 24;

This simple approach provides basic forecasting without external ML libraries.

Integration with Python Forecasting

For sophisticated forecasting, integrate with Python:

-- Create a function to call Python forecasting
CREATE OR REPLACE FUNCTION forecast_series(
    metric_name TEXT,
    forecast_horizon INTERVAL,
    model_type TEXT DEFAULT 'prophet'
) RETURNS TABLE (
    forecast_time TIMESTAMPTZ,
    predicted_value DOUBLE PRECISION,
    lower_bound DOUBLE PRECISION,
    upper_bound DOUBLE PRECISION
) AS $$
    import pandas as pd
    from prophet import Prophet
    import plpy
    
    # Fetch data
    plpy.execute(f"""
        CREATE TEMP TABLE forecast_data AS
        SELECT time_bucket('1 hour', time) AS ds, AVG(value) AS y
        FROM metrics
        WHERE time > NOW() - INTERVAL '30 days'
          AND metric_name = '{metric_name}'
        GROUP BY ds
    """)
    
    df = plpy.execute("SELECT ds, y FROM forecast_data ORDER BY ds")
    df = pd.DataFrame([(r['ds'], r['y']) for r in df], columns=['ds', 'y'])
    
    # Train model
    model = Prophet(yearly_seasonality=True, weekly_seasonality=True)
    model.fit(df)
    
    # Make future dataframe
    future = model.make_future_dataframe(periods=int(forecast_horizon.total_seconds()/3600))
    forecast = model.predict(future)
    
    # Return results
    return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(
        int(forecast_horizon.total_seconds()/3600)
    ).values.tolist()
$$ LANGUAGE plpython3u;

This integration enables using powerful forecasting libraries like Prophet or ARIMA directly from SQL.

Anomaly Detection

TimescaleDB excels at real-time anomaly detection patterns.

Statistical Anomaly Detection

Detect anomalies using statistical thresholds:

-- Real-time anomaly detection query
WITH recent_stats AS (
    SELECT 
        sensor_id,
        AVG(value) AS mean,
        STDDEV(value) AS stddev,
        AVG(value) + 3 * STDDEV(value) AS upper_threshold,
        AVG(value) - 3 * STDDEV(value) AS lower_threshold
    FROM metrics
    WHERE time > NOW() - INTERVAL '1 hour'
    GROUP BY sensor_id
),
current_values AS (
    SELECT 
        time,
        m.sensor_id,
        m.value,
        rs.mean,
        rs.stddev,
        rs.upper_threshold,
        rs.lower_threshold,
        CASE 
            WHEN m.value > rs.upper_threshold THEN 'high'
            WHEN m.value < rs.lower_threshold THEN 'low'
            ELSE 'normal'
        END AS anomaly_status
    FROM metrics m
    JOIN recent_stats rs ON m.sensor_id = rs.sensor_id
    WHERE m.time > NOW() - INTERVAL '5 minutes'
)
SELECT 
    time,
    sensor_id,
    value,
    anomaly_status,
    ROUND((value - mean) / NULLIF(stddev, 0), 2) AS z_score
FROM current_values
WHERE anomaly_status != 'normal'
ORDER BY time DESC;

This statistical approach flags values beyond 3 standard deviations from the mean—simple but effective for many use cases.

Machine Learning Anomaly Detection

For complex patterns, integrate with ML models:

-- Use stored model for anomaly scoring
CREATE OR REPLACE FUNCTION detect_anomaly(
    sensor_id TEXT,
    current_value DOUBLE PRECISION,
    context JSONB
) RETURNS JSONB AS $$
DECLARE
    model_row RECORD;
    prediction JSONB;
BEGIN
    -- Get active model
    SELECT * INTO model_row
    FROM models
    WHERE model_name = 'anomaly_detector'
      AND model_type = 'isolation_forest'
    ORDER BY created_at DESC
    LIMIT 1;
    
    -- In production, load model and predict
    -- This is a simplified example
    prediction := jsonb_build_object(
        'is_anomaly', current_value > 100,
        'anomaly_score', RANDOM(),
        'model_id', model_row.model_id
    );
    
    RETURN prediction;
END;
$$ LANGUAGE plpgsql;

-- Apply to recent data
SELECT 
    time,
    sensor_id,
    value,
    detect_anomaly(sensor_id, value, jsonb_build_object(
        'hour', EXTRACT(HOUR FROM time),
        'day_of_week', EXTRACT(DOW FROM time)
    )) AS anomaly_result
FROM metrics
WHERE time > NOW() - INTERVAL '1 hour'
  AND detect_anomaly(sensor_id, value, '{}'::jsonb)->>'is_anomaly' = 'true';

This pattern enables sophisticated ML-based anomaly detection while maintaining SQL accessibility.

Data Export for AI Frameworks

Export data efficiently for external ML processing.

Pandas Integration

Export to Python Pandas:

import pandas as pd
import psycopg2

# Connect to TimescaleDB
conn = psycopg2.connect(
    host="localhost",
    database="timeseries",
    user="postgres",
    password="password"
)

# Fetch training data
query = """
    SELECT features, label 
    FROM training_data 
    WHERE bucket > NOW() - INTERVAL '90 days'
"""

df = pd.read_sql(query, conn)
print(df.head())

# Prepare features
X = df['features'].apply(lambda x: pd.Series(x)).values
y = df['label'].values

This standard Python approach works with scikit-learn, TensorFlow, PyTorch, and other ML frameworks.

Apache Spark Integration

For large-scale processing:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TimescaleDB Spark") \
    .config("spark.jars", "/path/to/postgresql.jar") \
    .getOrCreate()

# Read from TimescaleDB
df = spark.read.format("jdbc").options(
    url="jdbc:postgresql://localhost:5432/timeseries",
    dbtable="sensor_features",
    user="postgres",
    password="password"
).load()

# Process with Spark ML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Feature assembly
assembler = VectorAssembler(
    inputCols=["mean_temp", "std_temp", "temp_range"],
    outputCol="features"
)
data = assembler.transform(df)

# Train model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
model = rf.fit(data)

Spark integration enables processing at massive scale.

Complete AI Pipeline Example

Here’s a complete pipeline from data ingestion to predictions:

-- 1. Ingest raw sensor data (high throughput)
INSERT INTO sensor_data VALUES 
    (NOW(), 'sensor-001', 22.5, 45.0),
    (NOW(), 'sensor-002', 23.1, 44.8);

-- 2. Features are automatically computed (via continuous aggregate)
-- 3. Anomaly detection runs in real-time
SELECT * FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
  AND detect_anomaly(sensor_id, temperature, '{}'::jsonb)->>'is_anomaly' = 'true';

-- 4. Predictions stored alongside raw data
CREATE TABLE predictions (
    time        TIMESTAMPTZ NOT NULL,
    sensor_id   TEXT NOT NULL,
    prediction  JSONB NOT NULL,
    model_id    UUID NOT NULL
);

SELECT create_hypertable('predictions', 'time');

-- 5. Model feedback loop - compare predictions to actual
CREATE MATERIALIZED VIEW prediction_accuracy
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 hour', p.time) AS bucket,
    p.model_id,
    AVG(CASE 
        WHEN (p.prediction->>'is_anomaly')::boolean = (s.anomaly_label)::boolean 
        THEN 1.0 ELSE 0.0 
    END) AS accuracy
FROM predictions p
JOIN (
    SELECT sensor_id, time, 'true' AS anomaly_label
    FROM sensor_data
    WHERE is_anomaly = true
) s ON p.sensor_id = s.sensor_id 
   AND p.time = s.time
GROUP BY bucket, model_id;

This end-to-end pipeline demonstrates how TimescaleDB supports the complete ML lifecycle.

Conclusion

TimescaleDB provides a powerful platform for AI applications that work with time-series data. From feature engineering using SQL window functions to storing vector embeddings for similarity search, from training data preparation to real-time inference, TimescaleDB can serve as the data foundation for sophisticated ML pipelines.

Key capabilities include:

  • Feature engineering with continuous aggregates and window functions
  • Vector storage via pgvector extension for embedding similarity search
  • Model versioning with built-in model registry patterns
  • Forecasting through SQL extensions and Python integration
  • Anomaly detection using statistical methods and ML models
  • Pipeline integration with Pandas, Spark, and other ML frameworks

As AI continues to transform applications, TimescaleDB’s combination of time-series optimization and PostgreSQL compatibility makes it an excellent choice for AI-powered applications.

In the final article, we’ll explore TimescaleDB use cases across industries, from IoT and finance to DevOps and industrial applications.

Resources

Comments