Introduction
As artificial intelligence and machine learning become central to modern applications, the role of databases in AI pipelines has expanded significantly. TimescaleDB, built on PostgreSQL, offers unique capabilities for AI workloads that traditional time-series databases cannot match. In this article, we explore how TimescaleDB serves as a foundation for AI applications, from feature engineering and time-series forecasting to storing vector embeddings and building complete ML pipelines.
The combination of time-series data with AI creates powerful applications: predictive maintenance in manufacturing, anomaly detection in monitoring systems, demand forecasting in retail, and many more. TimescaleDB’s SQL interface, PostgreSQL ecosystem, and time-series optimizations make it an excellent choice for these use cases.
Time-Series Feature Engineering
Machine learning models require carefully engineered features from raw time-series data. TimescaleDB provides powerful SQL capabilities for feature engineering directly in the database.
Statistical Features
Computing statistical features is fundamental to time-series ML:
-- Create a feature view for ML training
CREATE MATERIALIZED VIEW sensor_features
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', time) AS bucket,
sensor_id,
-- Statistical features
AVG(temperature) AS mean_temp,
STDDEV(temperature) AS std_temp,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
MAX(temperature) - MIN(temperature) AS temp_range,
-- Count of readings
COUNT(*) AS reading_count,
-- Missing data indicator
COUNT(*) FILTER (WHERE temperature IS NULL) AS null_count,
-- Trend indicator (window functions and NOW() are not allowed
-- inside continuous aggregates; TimescaleDB's first/last work instead)
last(temperature, time) - first(temperature, time) AS temp_change
FROM sensor_data
GROUP BY bucket, sensor_id;
-- Create refresh policy
SELECT add_continuous_aggregate_policy('sensor_features',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '5 minutes',
schedule_interval => INTERVAL '10 minutes');
This continuous aggregate automatically maintains features such as mean, standard deviation, min/max, and trend indicators—inputs ready for ML model training.
Window-Based Features
Window functions enable sophisticated feature extraction:
-- Rolling window features for anomaly detection.
-- Continuous aggregates cannot contain window functions, so this is
-- a regular view evaluated at query time.
CREATE VIEW rolling_features AS
SELECT
    time,
    sensor_id,
    value,
    -- 5-row rolling average
    AVG(value) OVER w5 AS rolling_avg_5,
    -- 20-row rolling average (a smoothed trend line)
    AVG(value) OVER w20 AS rolling_avg_20,
    -- Lag features
    LAG(value, 1) OVER w AS value_lag_1,
    LAG(value, 5) OVER w AS value_lag_5,
    -- Percentage change
    (value - LAG(value, 1) OVER w)
        / NULLIF(LAG(value, 1) OVER w, 0)
        * 100 AS pct_change
FROM raw_metrics
WHERE time > NOW() - INTERVAL '7 days'
WINDOW
    w AS (PARTITION BY sensor_id ORDER BY time),
    w5 AS (PARTITION BY sensor_id ORDER BY time
           ROWS BETWEEN 4 PRECEDING AND CURRENT ROW),
    w20 AS (PARTITION BY sensor_id ORDER BY time
            ROWS BETWEEN 19 PRECEDING AND CURRENT ROW);
These rolling features capture temporal patterns that ML models use to make predictions.
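Before committing a feature set to a database view, it can help to prototype the same transformations client-side. The sketch below mirrors the rolling-average, lag, and percentage-change features above with pandas; the column names are illustrative and assume a DataFrame with sensor_id, time, and value columns:

```python
import pandas as pd

def rolling_features(df: pd.DataFrame) -> pd.DataFrame:
    """Compute per-sensor rolling features mirroring the SQL view."""
    df = df.sort_values(["sensor_id", "time"]).copy()
    g = df.groupby("sensor_id")["value"]
    # Rolling average over the last 5 readings per sensor
    df["rolling_avg_5"] = g.transform(lambda s: s.rolling(5, min_periods=1).mean())
    # Lag features
    df["value_lag_1"] = g.shift(1)
    df["value_lag_5"] = g.shift(5)
    # Percentage change versus the previous reading
    df["pct_change"] = (df["value"] - df["value_lag_1"]) / df["value_lag_1"] * 100
    return df

# Example with a single sensor
frame = pd.DataFrame({
    "sensor_id": ["s1"] * 6,
    "time": pd.date_range("2024-01-01", periods=6, freq="min"),
    "value": [10.0, 12.0, 11.0, 13.0, 12.0, 14.0],
})
out = rolling_features(frame)
```

Once the feature definitions stabilize, they can be translated to SQL so that they are computed once, in the database, rather than in every consumer.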
Frequency Domain Features
For certain applications, frequency domain features are valuable:
-- Approximate spectral-energy features in plain SQL
-- This is a simplified example; real implementations should use
-- PL/Python (e.g., NumPy's FFT) or an analytics extension
CREATE OR REPLACE FUNCTION compute_fft_features(
    p_sensor_id TEXT,
    p_start_time TIMESTAMPTZ,
    p_end_time TIMESTAMPTZ
) RETURNS TABLE (
    frequency_band TEXT,
    band_power NUMERIC
) AS $$
BEGIN
    RETURN QUERY
    WITH series AS (
        SELECT
            time_bucket('1 second', sd.time) AS t,
            AVG(sd.value) AS v
        FROM sensor_data sd
        WHERE sd.sensor_id = p_sensor_id
          AND sd.time BETWEEN p_start_time AND p_end_time
        GROUP BY t
    )
    SELECT
        CASE
            WHEN sub.bucket < 10 THEN 'low'
            WHEN sub.bucket < 50 THEN 'medium'
            ELSE 'high'
        END AS frequency_band,
        SUM(POWER(sub.v, 2))::NUMERIC AS band_power
    FROM (
        SELECT v, NTILE(100) OVER (ORDER BY t) AS bucket
        FROM series
    ) sub
    GROUP BY 1;
END;
$$ LANGUAGE plpgsql;
For production frequency analysis, consider using MADlib or PL/R extensions, but the SQL approach demonstrates TimescaleDB’s extensibility.
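As a sketch of what a "real implementation" could look like, the band powers can be computed with NumPy's FFT, either client-side or inside a PL/Python function. The band edges below are illustrative assumptions, not standard values:

```python
import numpy as np

def fft_band_power(values, sample_rate_hz=1.0):
    """Split spectral power into low/medium/high frequency bands."""
    x = np.asarray(values, dtype=float)
    x = x - x.mean()                      # remove the DC component
    spectrum = np.abs(np.fft.rfft(x)) ** 2
    freqs = np.fft.rfftfreq(len(x), d=1.0 / sample_rate_hz)
    # Illustrative band edges in Hz
    bands = {"low": (0.0, 0.05), "medium": (0.05, 0.2), "high": (0.2, np.inf)}
    return {
        name: float(spectrum[(freqs >= lo) & (freqs < hi)].sum())
        for name, (lo, hi) in bands.items()
    }

# A 0.1 Hz sine sampled at 1 Hz: power should land in the 'medium' band
t = np.arange(256)
power = fft_band_power(np.sin(2 * np.pi * 0.1 * t))
```

The same function body drops into a plpython3u function almost unchanged, with plpy supplying the input series.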
Storing Vector Embeddings
Modern AI applications often work with vector embeddings. TimescaleDB’s PostgreSQL foundation supports storing and querying vectors efficiently.
Vector Storage
Store embeddings alongside time-series data:
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Create table with vector embeddings
CREATE TABLE embeddings (
time TIMESTAMPTZ NOT NULL,
entity_id TEXT NOT NULL,
entity_type TEXT NOT NULL, -- 'sensor', 'event', 'pattern'
embedding VECTOR(384), -- 384-dimensional embeddings
metadata JSONB
);
SELECT create_hypertable('embeddings', 'time');
-- Create index for similarity search
CREATE INDEX idx_embeddings_cosine
ON embeddings USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
The pgvector extension provides efficient vector operations and supports multiple distance metrics: Euclidean, cosine, and inner product.
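For reference, pgvector's <=> operator returns cosine distance—1 minus cosine similarity—so lower values mean closer vectors. A minimal pure-Python equivalent is handy for sanity-checking query results:

```python
import math

def cosine_distance(a, b):
    """Cosine distance, as computed by pgvector's <=> operator."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)

# Identical vectors have distance 0; orthogonal vectors have distance 1
print(cosine_distance([1.0, 0.0], [1.0, 0.0]))  # → 0.0
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))  # → 1.0
```

Keeping the distance semantics straight matters when choosing thresholds: a cutoff like distance < 0.2 selects only very similar vectors.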
Hybrid Search
Combine time filtering with vector similarity:
-- Find similar events from the past week
-- ($query_embedding is a parameter bound by the application)
SELECT
e.entity_id,
e.time,
e.embedding <=> $query_embedding AS cosine_distance,
e.metadata
FROM embeddings e
WHERE e.time > NOW() - INTERVAL '7 days'
AND e.entity_type = 'event'
ORDER BY e.embedding <=> $query_embedding
LIMIT 10;
This hybrid approach filters by time first, then performs similarity search on the reduced dataset—efficient and practical for real-world applications.
Time-Series Pattern Matching
Use embeddings to find similar patterns:
-- Find historical periods similar to current anomaly
WITH current_pattern AS (
SELECT embedding
FROM embeddings
WHERE time > NOW() - INTERVAL '1 hour'
AND entity_type = 'anomaly'
ORDER BY time DESC
LIMIT 1
)
SELECT
time_bucket('1 hour', time) AS bucket,
COUNT(*) AS similar_events,
AVG((metadata->>'severity')::numeric) AS avg_severity
FROM embeddings e
CROSS JOIN current_pattern cp
WHERE e.entity_type = 'anomaly'
AND e.embedding <=> cp.embedding < 0.2 -- Cosine distance threshold (lower = more similar)
AND e.time < NOW() - INTERVAL '1 day' -- Historical only
GROUP BY bucket
ORDER BY bucket DESC;
This pattern matching enables finding historical analogues to current events—a powerful technique for root cause analysis.
ML Model Training Pipelines
TimescaleDB can serve as the complete data platform for ML model development.
Training Data Preparation
Prepare datasets for model training:
-- Create training dataset combining features and labels
CREATE TABLE training_data (
id SERIAL PRIMARY KEY,
bucket TIMESTAMPTZ NOT NULL,
sensor_id TEXT NOT NULL,
features JSONB NOT NULL,
label BOOLEAN NOT NULL, -- e.g., anomaly detected
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Populate from continuous aggregates
INSERT INTO training_data (bucket, sensor_id, features, label)
SELECT
f.bucket,
f.sensor_id,
jsonb_build_object(
'mean_temp', f.mean_temp,
'std_temp', f.std_temp,
'temp_range', f.temp_range,
'reading_count', f.reading_count,
'null_count', f.null_count,
'temp_change', f.temp_change
) AS features,
CASE
WHEN f.max_temp > 100 THEN TRUE -- Label: anomaly
ELSE FALSE
END AS label
FROM sensor_features f
WHERE f.bucket > NOW() - INTERVAL '90 days';
-- Export for Python ML training (psql's \copy is a single-line command)
\copy training_data(features, label) TO '/tmp/training_data.csv' WITH (FORMAT CSV, HEADER)
The training data can be exported in various formats (CSV, Parquet) for Python, R, or other ML frameworks.
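Because the features column is JSONB, each exported row arrives in Python as a dict. A small helper (the feature names below are the illustrative ones from the view above) turns those dicts into a fixed-order feature matrix for scikit-learn-style APIs:

```python
# Column order must be fixed so every row lines up; names mirror the
# sensor_features view in this article
FEATURE_ORDER = ["mean_temp", "std_temp", "temp_range",
                 "reading_count", "null_count", "temp_change"]

def to_matrix(feature_dicts, order=FEATURE_ORDER):
    """Convert JSONB feature dicts into a list-of-lists feature matrix,
    substituting 0.0 for missing or NULL values."""
    return [[float(d.get(k) or 0.0) for k in order] for d in feature_dicts]

rows = [
    {"mean_temp": 21.5, "std_temp": 0.8, "temp_range": 3.1,
     "reading_count": 60, "null_count": 0, "temp_change": 0.2},
    {"mean_temp": 22.0, "std_temp": 1.1, "temp_range": 4.0,
     "reading_count": 58, "null_count": 2, "temp_change": None},
]
X = to_matrix(rows)
```

The resulting matrix feeds directly into estimators expecting a 2-D array; in production you would likely want explicit missing-value handling rather than a blanket 0.0 default.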
Model Storage and Versioning
Track model versions alongside data:
-- Model registry table
CREATE TABLE models (
model_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
model_name TEXT NOT NULL,
version TEXT NOT NULL,
model_type TEXT NOT NULL, -- 'regression', 'classification', etc.
hyperparameters JSONB,
metrics JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
created_by TEXT DEFAULT CURRENT_USER
);
-- Store trained model metadata
INSERT INTO models (model_name, version, model_type, hyperparameters, metrics)
VALUES (
'temperature_anomaly_detector',
'v1.2.0',
'isolation_forest',
'{"contamination": 0.05, "n_estimators": 100}',
'{"precision": 0.92, "recall": 0.88, "f1": 0.90}'
);
-- Link models to training data
ALTER TABLE training_data ADD COLUMN model_id UUID;
ALTER TABLE training_data ADD CONSTRAINT fk_model
FOREIGN KEY (model_id) REFERENCES models(model_id);
This model registry approach ensures reproducibility and traceability of ML experiments.
Time-Series Forecasting
TimescaleDB’s time-series functions enable forecasting directly in the database.
Simple Forecasting with Moving Averages
Basic forecasting uses window functions:
-- Forecast using exponential moving average
WITH historical_data AS (
SELECT
time_bucket('1 hour', time) AS bucket,
AVG(value) AS value
FROM metrics
WHERE time > NOW() - INTERVAL '30 days'
AND metric_name = 'cpu.usage'
GROUP BY bucket
),
forecast AS (
SELECT
bucket,
value,
AVG(value) OVER (
ORDER BY bucket
ROWS BETWEEN 24 PRECEDING AND CURRENT ROW
) AS moving_avg,
-- Week-over-week change, scaled to a per-hour trend
(AVG(value) OVER (
ORDER BY bucket
ROWS BETWEEN 167 PRECEDING AND CURRENT ROW
) - AVG(value) OVER (
ORDER BY bucket
ROWS BETWEEN 335 PRECEDING AND 168 PRECEDING
)) / 168 AS hourly_trend
FROM historical_data
)
SELECT
bucket + INTERVAL '1 hour' AS forecast_bucket,
moving_avg + hourly_trend AS forecast_value,
'trend' AS method
FROM forecast
ORDER BY bucket DESC
LIMIT 24;
This simple approach provides basic forecasting without external ML libraries.
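The same idea—a moving average plus a recent per-step trend, projected forward to compensate for the lag of the average—can be replicated in a few lines of Python for quick experimentation. The window size here is illustrative:

```python
def forecast_next(values, window=3):
    """Next-point forecast: moving average plus per-step trend, projected
    forward to offset the (window - 1) / 2 step lag of the average."""
    if len(values) < 2 * window:
        raise ValueError("need at least 2 * window points")
    moving_avg = sum(values[-window:]) / window
    prev_avg = sum(values[-2 * window:-window]) / window
    # Per-step trend from the change between adjacent window averages
    trend = (moving_avg - prev_avg) / window
    return moving_avg + trend * ((window - 1) / 2 + 1)

series = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0]
print(forecast_next(series))  # → 16.0 for a perfectly linear series
```

A forecast like this is trivially cheap and easy to validate, which makes it a useful baseline before reaching for heavier models.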
Integration with Python Forecasting
For sophisticated forecasting, integrate with Python:
-- Call Python forecasting from SQL (requires the plpython3u extension
-- and the prophet package installed in the server's Python environment)
CREATE OR REPLACE FUNCTION forecast_series(
    metric_name TEXT,
    forecast_hours INTEGER,
    model_type TEXT DEFAULT 'prophet'
) RETURNS TABLE (
    forecast_time TIMESTAMPTZ,
    predicted_value DOUBLE PRECISION,
    lower_bound DOUBLE PRECISION,
    upper_bound DOUBLE PRECISION
) AS $$
    import pandas as pd
    from prophet import Prophet

    # Fetch hourly history with a parameterized plan (avoids SQL injection)
    plan = plpy.prepare("""
        SELECT time_bucket('1 hour', time) AS ds, AVG(value) AS y
        FROM metrics
        WHERE time > NOW() - INTERVAL '30 days'
          AND metric_name = $1
        GROUP BY ds
        ORDER BY ds
    """, ["text"])
    rows = plpy.execute(plan, [metric_name])
    df = pd.DataFrame([(r['ds'], r['y']) for r in rows], columns=['ds', 'y'])
    # Prophet expects timezone-naive timestamps
    df['ds'] = pd.to_datetime(df['ds'], utc=True).dt.tz_localize(None)

    # Train model
    model = Prophet(yearly_seasonality=True, weekly_seasonality=True)
    model.fit(df)

    # Forecast the requested horizon at hourly frequency
    future = model.make_future_dataframe(periods=forecast_hours, freq='h')
    forecast = model.predict(future)

    # Return only the future rows
    tail = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(forecast_hours)
    return [(str(r.ds), r.yhat, r.yhat_lower, r.yhat_upper)
            for r in tail.itertuples(index=False)]
$$ LANGUAGE plpython3u;
This integration enables using powerful forecasting libraries like Prophet or ARIMA directly from SQL.
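When Prophet is not available in the server's Python environment, a dependency-free method such as Holt's linear exponential smoothing still produces trend-aware forecasts. A sketch with illustrative smoothing constants:

```python
def holt_forecast(values, horizon, alpha=0.5, beta=0.5):
    """Holt's linear method: smooth level and trend, then extrapolate.
    alpha and beta are illustrative smoothing constants (0..1)."""
    level, trend = values[0], values[1] - values[0]
    for x in values[1:]:
        prev_level = level
        # Update the level toward the new observation
        level = alpha * x + (1 - alpha) * (level + trend)
        # Update the trend toward the latest level change
        trend = beta * (level - prev_level) + (1 - beta) * trend
    return [level + (h + 1) * trend for h in range(horizon)]

history = [10.0, 12.0, 14.0, 16.0, 18.0]  # steady +2 per step
print(holt_forecast(history, horizon=3))  # → [20.0, 22.0, 24.0]
```

Because it needs only a list of floats, this drops directly into a plpython3u function body with no extra packages.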
Anomaly Detection
TimescaleDB excels at real-time anomaly detection patterns.
Statistical Anomaly Detection
Detect anomalies using statistical thresholds:
-- Real-time anomaly detection query
WITH recent_stats AS (
SELECT
sensor_id,
AVG(value) AS mean,
STDDEV(value) AS stddev,
AVG(value) + 3 * STDDEV(value) AS upper_threshold,
AVG(value) - 3 * STDDEV(value) AS lower_threshold
FROM metrics
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY sensor_id
),
current_values AS (
SELECT
m.time,
m.sensor_id,
m.value,
rs.mean,
rs.stddev,
rs.upper_threshold,
rs.lower_threshold,
CASE
WHEN m.value > rs.upper_threshold THEN 'high'
WHEN m.value < rs.lower_threshold THEN 'low'
ELSE 'normal'
END AS anomaly_status
FROM metrics m
JOIN recent_stats rs ON m.sensor_id = rs.sensor_id
WHERE m.time > NOW() - INTERVAL '5 minutes'
)
SELECT
time,
sensor_id,
value,
anomaly_status,
ROUND(((value - mean) / NULLIF(stddev, 0))::numeric, 2) AS z_score
FROM current_values
WHERE anomaly_status != 'normal'
ORDER BY time DESC;
This statistical approach flags values beyond 3 standard deviations from the mean—simple but effective for many use cases.
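The same 3-sigma rule is easy to express in Python, which is useful for tuning thresholds on historical data before deploying the SQL version. This sketch mirrors the query's structure: statistics from a baseline window, applied to new readings:

```python
import statistics

def flag_anomalies(baseline, current, n_sigmas=3.0):
    """Flag current readings beyond n_sigmas standard deviations of the
    baseline, mirroring the SQL pattern (last-hour stats vs. new data)."""
    mean = statistics.fmean(baseline)
    stddev = statistics.stdev(baseline)  # sample stddev, like SQL's STDDEV
    return [
        (x, (x - mean) / stddev)         # (value, z-score)
        for x in current
        if abs(x - mean) > n_sigmas * stddev
    ]

baseline = [20.0, 20.1, 19.9, 20.2, 19.8, 20.0, 20.1, 19.9]
flagged = flag_anomalies(baseline, [20.0, 35.0])  # only 35.0 is anomalous
```

Note that the baseline must exclude the points being scored; computing the mean and standard deviation over a window that contains a large outlier masks that same outlier.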
Machine Learning Anomaly Detection
For complex patterns, integrate with ML models:
-- Use stored model for anomaly scoring
CREATE OR REPLACE FUNCTION detect_anomaly(
sensor_id TEXT,
current_value DOUBLE PRECISION,
context JSONB
) RETURNS JSONB AS $$
DECLARE
model_row RECORD;
prediction JSONB;
BEGIN
-- Get active model
SELECT * INTO model_row
FROM models
WHERE model_name = 'anomaly_detector'
AND model_type = 'isolation_forest'
ORDER BY created_at DESC
LIMIT 1;
-- In production, load model and predict
-- This is a simplified example
prediction := jsonb_build_object(
'is_anomaly', current_value > 100,
'anomaly_score', RANDOM(),
'model_id', model_row.model_id
);
RETURN prediction;
END;
$$ LANGUAGE plpgsql;
-- Apply to recent data (compute the score once via a subquery)
SELECT time, sensor_id, value, anomaly_result
FROM (
    SELECT
        time,
        sensor_id,
        value,
        detect_anomaly(sensor_id, value, jsonb_build_object(
            'hour', EXTRACT(HOUR FROM time),
            'day_of_week', EXTRACT(DOW FROM time)
        )) AS anomaly_result
    FROM metrics
    WHERE time > NOW() - INTERVAL '1 hour'
) scored
WHERE anomaly_result->>'is_anomaly' = 'true';
This pattern enables sophisticated ML-based anomaly detection while maintaining SQL accessibility.
Data Export for AI Frameworks
Export data efficiently for external ML processing.
Pandas Integration
Export to Python Pandas:
import pandas as pd
import psycopg2
# Connect to TimescaleDB
conn = psycopg2.connect(
host="localhost",
database="timeseries",
user="postgres",
password="password"
)
# Fetch training data
query = """
SELECT features, label
FROM training_data
WHERE bucket > NOW() - INTERVAL '90 days'
"""
df = pd.read_sql(query, conn)
print(df.head())
# Prepare features
X = df['features'].apply(lambda x: pd.Series(x)).values
y = df['label'].values
This standard Python approach works with scikit-learn, TensorFlow, PyTorch, and other ML frameworks.
Apache Spark Integration
For large-scale processing:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("TimescaleDB Spark") \
.config("spark.jars", "/path/to/postgresql.jar") \
.getOrCreate()
# Read from TimescaleDB
df = spark.read.format("jdbc").options(
    url="jdbc:postgresql://localhost:5432/timeseries",
    dbtable="sensor_features",
    user="postgres",
    password="password",
    driver="org.postgresql.Driver"
).load()
# Process with Spark ML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
# Feature assembly
assembler = VectorAssembler(
inputCols=["mean_temp", "std_temp", "temp_range"],
outputCol="features"
)
data = assembler.transform(df)
# Train model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
model = rf.fit(data)
Spark integration enables processing at massive scale.
Complete AI Pipeline Example
Here’s a complete pipeline from data ingestion to predictions:
-- 1. Ingest raw sensor data (high throughput)
INSERT INTO sensor_data VALUES
(NOW(), 'sensor-001', 22.5, 45.0),
(NOW(), 'sensor-002', 23.1, 44.8);
-- 2. Features are automatically computed (via continuous aggregate)
-- 3. Anomaly detection runs in real-time
SELECT * FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
AND detect_anomaly(sensor_id, temperature, '{}'::jsonb)->>'is_anomaly' = 'true';
-- 4. Predictions stored alongside raw data
CREATE TABLE predictions (
time TIMESTAMPTZ NOT NULL,
sensor_id TEXT NOT NULL,
prediction JSONB NOT NULL,
model_id UUID NOT NULL
);
SELECT create_hypertable('predictions', 'time');
-- 5. Model feedback loop - compare predictions to actual
-- Continuous aggregates support only limited joins, so this feedback
-- view is a regular materialized view, refreshed on a schedule
CREATE MATERIALIZED VIEW prediction_accuracy AS
SELECT
    time_bucket('1 hour', p.time) AS bucket,
    p.model_id,
    AVG(CASE
        WHEN (p.prediction->>'is_anomaly')::boolean = s.is_anomaly
        THEN 1.0 ELSE 0.0
    END) AS accuracy
FROM predictions p
JOIN sensor_data s
    ON p.sensor_id = s.sensor_id
    AND p.time = s.time
GROUP BY bucket, p.model_id;
This end-to-end pipeline demonstrates how TimescaleDB supports the complete ML lifecycle.
Conclusion
TimescaleDB provides a powerful platform for AI applications that work with time-series data. From feature engineering using SQL window functions to storing vector embeddings for similarity search, from training data preparation to real-time inference, TimescaleDB can serve as the data foundation for sophisticated ML pipelines.
Key capabilities include:
- Feature engineering with continuous aggregates and window functions
- Vector storage via pgvector extension for embedding similarity search
- Model versioning with built-in model registry patterns
- Forecasting through SQL extensions and Python integration
- Anomaly detection using statistical methods and ML models
- Pipeline integration with Pandas, Spark, and other ML frameworks
As AI continues to transform applications, TimescaleDB’s combination of time-series optimization and PostgreSQL compatibility makes it an excellent choice for AI-powered applications.
In the final article, we’ll explore TimescaleDB use cases across industries, from IoT and finance to DevOps and industrial applications.
Resources
- TimescaleDB AI Use Cases
- pgvector Documentation
- Prophet Documentation
- scikit-learn Time-Series
- Apache Spark MLlib