Introduction
Time-series data is the backbone of AI applications for monitoring, forecasting, and anomaly detection. InfluxDB’s efficient storage and query capabilities make it ideal for AI/ML pipelines. This article explores how to leverage InfluxDB for machine learning: feature engineering, forecasting, anomaly detection, and complete ML pipelines.
Time-Series Feature Engineering
Extracting features from time-series data for ML models.
Basic Statistical Features
from influxdb_client import InfluxDBClient, Point
import pandas as pd
# Module-level InfluxDB client shared by the helper functions below.
# NOTE(review): later write calls pass org="my-org" explicitly; presumably
# the client should also be configured with org= here — TODO confirm.
client = InfluxDBClient(url="http://localhost:8086", token="token")
def get_features(bucket, measurement, host, window='1h'):
    """Query the last 24h of a measurement and return summary statistics.

    Args:
        bucket: InfluxDB bucket to query.
        measurement: Measurement name (e.g. "cpu").
        host: Value of the "host" tag to filter on.
        window: Flux aggregateWindow duration (default "1h").

    Returns:
        Dict of summary statistics (mean/std/min/max/median/p95/p05),
        or an empty dict when the query returns no rows (the original
        silently returned a dict full of NaNs).
    """
    # NOTE(review): bucket/measurement/host are interpolated straight into
    # the Flux script — pass trusted values only (Flux string injection).
    query = f'''
    from(bucket: "{bucket}")
      |> range(start: -24h)
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> filter(fn: (r) => r.host == "{host}")
      |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
    '''
    df = client.query_api().query_data_frame(query)
    # Guard: empty result would otherwise produce NaN statistics.
    if len(df) == 0 or '_value' not in df:
        return {}
    values = df['_value']
    return {
        'mean': values.mean(),
        'std': values.std(),
        'min': values.min(),
        'max': values.max(),
        'median': values.median(),
        'p95': values.quantile(0.95),
        'p05': values.quantile(0.05),
    }
# Example usage: summary features for host "server01" from the "metrics" bucket.
features = get_features('metrics', 'cpu', 'server01')
print(features)
Trend Features
# Detect trends using linear regression
import numpy as np


def calculate_trend(df):
    """Fit a degree-1 polynomial to '_value' and classify the trend.

    Args:
        df: DataFrame with a numeric '_value' column, ordered by time.

    Returns:
        Dict with 'slope', 'intercept' (plain Python floats) and
        'trend_direction' ('increasing', 'decreasing' or 'flat').

    Raises:
        ValueError: if fewer than 2 points are given (a line cannot be fit).
    """
    if len(df) < 2:
        raise ValueError("need at least 2 points to estimate a trend")
    x = np.arange(len(df))
    y = df['_value'].values
    # Linear regression (least squares fit of degree 1)
    slope, intercept = np.polyfit(x, y, 1)
    if slope > 0:
        direction = 'increasing'
    elif slope < 0:
        direction = 'decreasing'
    else:
        direction = 'flat'  # fix: slope == 0 was mislabelled 'decreasing'
    return {
        'slope': float(slope),
        'intercept': float(intercept),
        'trend_direction': direction,
    }
# Rolling statistics
def rolling_features(df, window=12):
    """Return the latest rolling mean/std/min/max of '_value' over *window* points."""
    rolling = df['_value'].rolling(window)
    series_by_name = {
        'rolling_mean': rolling.mean(),
        'rolling_std': rolling.std(),
        'rolling_min': rolling.min(),
        'rolling_max': rolling.max(),
    }
    # Keep only the most recent value of each rolling series.
    return {name: series.iloc[-1] for name, series in series_by_name.items()}
Seasonality Features
# Extract hourly patterns
def extract_hourly_pattern(df):
    """Average '_value' by hour of day and flag whether the newest sample
    falls in a peak business hour.

    Fix: the original wrote a 'hour' column into the caller's DataFrame as
    a side effect; this version leaves *df* untouched.

    Returns:
        Dict with 'hourly_avg' ({hour: mean value}) and 'is_peak_hour'.
    """
    hours = df['_time'].dt.hour
    hourly_avg = df['_value'].groupby(hours).mean()
    return {
        'hourly_avg': hourly_avg.to_dict(),
        # Hard-coded business peak hours; adjust per workload.
        'is_peak_hour': hours.iloc[-1] in [9, 10, 11, 14, 15, 16],
    }
# Daily patterns
def extract_daily_pattern(df):
    """Return {day_of_week (0=Monday): mean '_value'} without mutating *df*.

    Fix: the original wrote a 'day_of_week' column into the caller's
    DataFrame as a side effect.
    """
    dow = df['_time'].dt.dayofweek
    return df['_value'].groupby(dow).mean().to_dict()
Time-Series Forecasting
Building forecasting models using InfluxDB data.
Prophet Integration
from prophet import Prophet
import pandas as pd
# Fetch data from InfluxDB
def get_training_data(bucket, measurement):
    """Pull 30 days of a measurement and reshape it for Prophet.

    Returns:
        DataFrame with Prophet's required columns:
          ds - timestamp (tz-naive, see fix below)
          y  - observed value
    """
    query = f'''
    from(bucket: "{bucket}")
      |> range(start: -30d)
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> filter(fn: (r) => r._field == "value")
    '''
    df = client.query_api().query_data_frame(query)
    # Fix: InfluxDB returns tz-aware (UTC) timestamps, but Prophet rejects a
    # tz-aware 'ds' column — strip the timezone before handing data over.
    return pd.DataFrame({
        'ds': df['_time'].dt.tz_localize(None),
        'y': df['_value'],
    })
# Train model
def train_forecast_model(bucket, measurement):
    """Fetch training data and fit a Prophet model with all seasonalities enabled."""
    training_df = get_training_data(bucket, measurement)
    seasonality_config = dict(
        daily_seasonality=True,
        weekly_seasonality=True,
        yearly_seasonality=True,
    )
    model = Prophet(**seasonality_config)
    model.fit(training_df)
    return model
# Make predictions
def predict(model, periods=24):
    """Forecast *periods* hourly steps ahead; return point and interval estimates."""
    horizon = model.make_future_dataframe(periods=periods, freq='H')
    result = model.predict(horizon)
    wanted_columns = ['ds', 'yhat', 'yhat_lower', 'yhat_upper']
    return result[wanted_columns]
# Store predictions in InfluxDB
def store_predictions(forecast, bucket):
    """Write a Prophet forecast frame (ds/yhat/yhat_lower/yhat_upper) to InfluxDB.

    Fix: builds all points first and issues a single batched write instead
    of one HTTP write per forecast row.
    """
    points = [
        Point("cpu_forecast")
        .tag("type", "prediction")
        .field("value", row['yhat'])
        .field("lower", row['yhat_lower'])
        .field("upper", row['yhat_upper'])
        .time(row['ds'])
        for _, row in forecast.iterrows()
    ]
    write_api = client.write_api()
    write_api.write(bucket=bucket, org="my-org", record=points)
ARIMA Forecasting
from statsmodels.tsa.arima.model import ARIMA
import pandas as pd
def arima_forecast(df, steps=24, order=(5, 1, 2)):
    """Fit an ARIMA model on '_value' and forecast *steps* points ahead.

    Args:
        df: DataFrame with a numeric '_value' column.
        steps: Forecast horizon length.
        order: (p, d, q) ARIMA order — previously hard-coded, now a
            backward-compatible parameter (default unchanged).

    Returns:
        Array of forecast values of length *steps*.
    """
    model = ARIMA(df['_value'].values, order=order)
    fitted = model.fit()
    return fitted.forecast(steps=steps)
# Store in InfluxDB
def store_arima_predictions(forecast, host, bucket):
    """Write ARIMA forecast values to InfluxDB with real future timestamps.

    Fix: the original used `.time(i)`, i.e. epoch nanoseconds 0, 1, 2, ...
    (all in 1970). Forecast step i is now stamped i+1 hours after "now",
    matching the hourly forecast horizon, and written as one batch.
    """
    from datetime import datetime, timedelta, timezone

    start = datetime.now(timezone.utc)
    points = []
    for i, value in enumerate(forecast):
        points.append(
            Point("cpu_forecast_arima")
            .tag("host", host)
            .field("value", value)
            .time(start + timedelta(hours=i + 1))
        )
    write_api = client.write_api()
    write_api.write(bucket=bucket, org="my-org", record=points)
Anomaly Detection
Detecting anomalies in time-series data.
Statistical Anomaly Detection
# Z-score based anomaly detection
def detect_anomalies_zscore(df, threshold=3):
    """Flag rows whose '_value' lies more than *threshold* std-devs from the mean.

    Adds a 'zscore' column to *df* (downstream storage code reads it) and
    returns only the anomalous rows. Fix: a zero or NaN std (constant or
    singleton series) now yields a clean empty result with zscore 0 rather
    than NaN/inf comparisons.
    """
    mean = df['_value'].mean()
    std = df['_value'].std()
    if pd.isna(std) or std == 0:
        df['zscore'] = 0.0
        return df[0:0]
    df['zscore'] = (df['_value'] - mean) / std
    return df[df['zscore'].abs() > threshold]
# IQR based detection
def detect_anomalies_iqr(df, multiplier=1.5):
    """Return rows of *df* whose '_value' falls outside the Tukey fences
    (Q1 - multiplier*IQR, Q3 + multiplier*IQR)."""
    q1, q3 = df['_value'].quantile([0.25, 0.75])
    spread = multiplier * (q3 - q1)
    # between() is inclusive, so values exactly on a fence are kept as
    # normal — same as the original's strict < / > tests.
    in_range = df['_value'].between(q1 - spread, q3 + spread)
    return df[~in_range]
# Rolling window detection
def detect_rolling_anomalies(df, window=12, threshold=3):
    """Flag points deviating more than *threshold* rolling std-devs from the
    rolling mean (the window includes the current point). Adds a
    'deviation' column to *df* and returns the anomalous rows."""
    rolling = df['_value'].rolling(window)
    df['deviation'] = (df['_value'] - rolling.mean()).abs() / rolling.std()
    return df[df['deviation'] > threshold]
Isolation Forest
from sklearn.ensemble import IsolationForest
import pandas as pd
def detect_anomalies_isolation_forest(df, contamination=0.1):
    """Detect anomalies with an Isolation Forest over simple rolling features.

    Adds 'anomaly' (-1 = anomaly, 1 = normal) and 'anomaly_score' columns
    to *df* and returns only the anomalous rows.

    Fix: the rolling mean's leading NaNs (first window-1 rows) were never
    filled and IsolationForest rejects NaN input — min_periods=1 makes
    both rolling features defined from the first row.
    """
    features = pd.DataFrame({
        'value': df['_value'],
        'rolling_mean': df['_value'].rolling(12, min_periods=1).mean(),
        # std needs >= 2 points, so the very first row is still NaN; keep
        # the original fillna(0) backstop for it.
        'rolling_std': df['_value'].rolling(12, min_periods=1).std().fillna(0),
    })
    # Train model and score every row
    model = IsolationForest(contamination=contamination)
    df['anomaly'] = model.fit_predict(features)
    df['anomaly_score'] = model.score_samples(features)
    return df[df['anomaly'] == -1]
# Real-time anomaly detection
def real_time_anomaly_check(value, historical_mean, historical_std, threshold=3):
    """Score a single observation against baseline statistics.

    Returns a dict with:
      is_anomaly - True when the z-score exceeds *threshold*
      z_score    - absolute z-score of *value*
      severity   - 'high' (> 3x threshold), 'medium' (> 2x threshold), else 'low'

    Fix: a zero baseline std no longer raises ZeroDivisionError — any
    deviation from the mean is then treated as an infinite z-score.
    """
    deviation = abs(value - historical_mean)
    if historical_std == 0:
        z_score = 0.0 if deviation == 0 else float('inf')
    else:
        z_score = deviation / historical_std
    if z_score > 3 * threshold:
        severity = 'high'
    elif z_score > 2 * threshold:
        severity = 'medium'
    else:
        severity = 'low'
    return {
        'is_anomaly': z_score > threshold,
        'z_score': z_score,
        'severity': severity,
    }
Storing Anomalies in InfluxDB
def store_anomalies(anomalies_df, bucket):
    """Persist detected anomalies to InfluxDB.

    Expects '_value' and '_time' columns; 'zscore' is optional and
    defaults to 0. Fix: writes all points in one batched call instead of
    one HTTP request per row.
    """
    points = [
        Point("cpu_anomalies")
        .tag("type", "anomaly")
        .field("value", row['_value'])
        .field("zscore", row.get('zscore', 0))
        .time(row['_time'])
        for _, row in anomalies_df.iterrows()
    ]
    write_api = client.write_api()
    write_api.write(bucket=bucket, org="my-org", record=points)
Complete ML Pipeline
End-to-end ML pipeline with InfluxDB.
Data Collection and Storage
class TimeSeriesMLPipeline:
    """End-to-end pipeline: query InfluxDB, engineer features, train a
    model, predict, and write predictions back to InfluxDB."""

    def __init__(self, influx_client, bucket):
        self.client = influx_client
        self.bucket = bucket

    def collect_data(self, measurement, host, duration='24h'):
        """Query *duration* of raw points for one measurement/host."""
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -{duration})
          |> filter(fn: (r) => r._measurement == "{measurement}")
          |> filter(fn: (r) => r.host == "{host}")
        '''
        return self.client.query_api().query_data_frame(query)

    def engineer_features(self, df):
        """Extract features for ML training.

        Produces time-of-day/week features, short rolling statistics,
        lag features and a first difference; NaNs from warm-up windows
        and shifts are filled with 0.
        """
        features = pd.DataFrame()
        # Time features
        features['hour'] = df['_time'].dt.hour
        features['day_of_week'] = df['_time'].dt.dayofweek
        # Rolling statistics
        features['rolling_mean_5'] = df['_value'].rolling(5).mean()
        features['rolling_std_5'] = df['_value'].rolling(5).std()
        # Lag features
        for lag in [1, 5, 10]:
            features[f'lag_{lag}'] = df['_value'].shift(lag)
        # Difference features
        features['diff_1'] = df['_value'].diff(1)
        return features.fillna(0)

    def train_model(self, features, labels):
        """Fit a random-forest regressor on the engineered features."""
        from sklearn.ensemble import RandomForestRegressor
        model = RandomForestRegressor(n_estimators=100)
        model.fit(features, labels)
        return model

    def predict(self, model, features):
        """Predict and persist the results; returns the predictions.

        Fix: the original stamped prediction i with `.time(i)` — epoch
        nanoseconds 0, 1, 2, ... (all in 1970). Predictions are now
        stamped i hours after "now" and written in a single batch.
        """
        from datetime import datetime, timedelta, timezone

        predictions = model.predict(features)
        start = datetime.now(timezone.utc)
        points = [
            Point("prediction")
            .field("value", pred)
            .time(start + timedelta(hours=i))
            for i, pred in enumerate(predictions)
        ]
        self.client.write_api().write(bucket=self.bucket, record=points)
        return predictions
Model Monitoring
def monitor_model_performance(actual_df, predicted_df):
    """Compare predictions to actual values and record MAE/MAPE in InfluxDB.

    Both frames must carry '_time' and '_value'; rows are aligned on '_time'.

    Fixes:
      - `.time('now()')` passed a Flux expression string, not a valid
        timestamp — use a real UTC datetime.
      - rows whose actual value is 0 are excluded from MAPE to avoid
        division by zero.
    """
    from datetime import datetime, timezone

    merged = actual_df.merge(predicted_df, on='_time', suffixes=('_actual', '_pred'))
    errors = (merged['_value_actual'] - merged['_value_pred']).abs()
    mae = errors.mean()
    nonzero = merged['_value_actual'] != 0
    mape = (errors[nonzero] / merged['_value_actual'][nonzero]).mean() * 100
    # Store metrics in InfluxDB for dashboarding/alerting
    write_api = client.write_api()
    point = (
        Point("model_metrics")
        .field("mae", mae)
        .field("mape", mape)
        .time(datetime.now(timezone.utc))
    )
    write_api.write(bucket="monitoring", record=point)
    return {'mae': mae, 'mape': mape}
Anomaly Alerting
Real-time alerting on detected anomalies.
from influxdb_client.client.write_api import SYNCHRONOUS
class AnomalyAlerter:
    """Z-score alerter: keeps a 7-day baseline (mean/std) per host and
    fires an alert when an incoming value deviates more than *threshold*
    standard deviations from its baseline."""

    def __init__(self, client, threshold=3):
        self.client = client
        self.threshold = threshold
        # host -> {'mean': float, 'std': float}, populated by update_stats()
        self.historical_stats = {}

    def update_stats(self, measurement, host):
        """Refresh the 7-day baseline mean/std for *host*."""
        query = f'''
        from(bucket: "metrics")
          |> range(start: -7d)
          |> filter(fn: (r) => r._measurement == "{measurement}")
          |> filter(fn: (r) => r.host == "{host}")
          |> filter(fn: (r) => r._field == "value")
        '''
        df = self.client.query_api().query_data_frame(query)
        self.historical_stats[host] = {
            'mean': df['_value'].mean(),
            'std': df['_value'].std(),
        }

    def check_anomaly(self, value, host):
        """Return the z-score of *value* (sending an alert when it exceeds
        the threshold), or None when no baseline exists for *host*.

        Fix: a zero baseline std (flat series) no longer raises
        ZeroDivisionError — any deviation then counts as infinite.
        """
        stats = self.historical_stats.get(host)
        if not stats:
            return None
        deviation = abs(value - stats['mean'])
        if stats['std'] == 0:
            z_score = 0.0 if deviation == 0 else float('inf')
        else:
            z_score = deviation / stats['std']
        if z_score > self.threshold:
            # Trigger alert
            self.send_alert(host, value, z_score)
        return z_score

    def send_alert(self, host, value, z_score):
        """Send alert (email, Slack, PagerDuty, etc.) and record it."""
        # Store in InfluxDB for alert history. No explicit .time(): the
        # point is timestamped server-side at write time.
        write_api = self.client.write_api()
        point = (
            Point("alerts")
            .tag("severity", "critical" if z_score > 3 else "warning")
            .field("value", value)
            .field("z_score", z_score)
        )
        write_api.write(bucket="alerts", record=point)
        print(f"ALERT: Anomaly detected on {host}: value={value}, z_score={z_score}")
Forecasting Pipeline Example
Complete example combining multiple techniques:
class ForecastingPipeline:
    """Daily forecasting job: pull history, fit Prophet, forecast 24h,
    persist the forecast and alert on suspicious predictions."""

    def __init__(self, client, bucket):
        self.client = client
        self.bucket = bucket

    def run_daily_forecast(self, measurement, host):
        """Run the full pipeline for one measurement/host and return the
        forecast DataFrame (ds/yhat/yhat_lower/yhat_upper columns)."""
        # 1. Get historical data
        historical = self.get_historical_data(measurement, host, days=30)
        # 2. Train model
        model = self.train_forecast_model(historical)
        # 3. Generate forecast (next 24 hours)
        # Fix: Prophet has no `predict(steps=...)` API — it predicts over
        # an explicit future DataFrame.
        future = model.make_future_dataframe(periods=24, freq='H')
        forecast = model.predict(future)
        # 4. Store forecast
        self.store_forecast(forecast, measurement)
        # 5. Detect anomalies in forecast
        # Fix: `if anomalies:` on a DataFrame raises ValueError (ambiguous
        # truth value) — test the row count instead.
        anomalies = self.detect_forecast_anomalies(forecast)
        if len(anomalies):
            self.alert_anomalies(anomalies)
        return forecast

    def get_historical_data(self, measurement, host, days):
        """Query *days* days of raw points for measurement/host."""
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -{days}d)
          |> filter(fn: (r) => r._measurement == "{measurement}")
          |> filter(fn: (r) => r.host == "{host}")
        '''
        return self.client.query_api().query_data_frame(query)

    def train_forecast_model(self, df):
        """Fit a Prophet model on the historical frame.

        Fix: drop the timezone from InfluxDB's tz-aware timestamps —
        Prophet rejects a tz-aware 'ds' column.
        """
        from prophet import Prophet
        prophet_df = pd.DataFrame({
            'ds': df['_time'].dt.tz_localize(None),
            'y': df['_value'],
        })
        model = Prophet(daily_seasonality=True)
        model.fit(prophet_df)
        return model

    def store_forecast(self, forecast, measurement):
        """Batch-write forecast rows to InfluxDB in a single call."""
        points = [
            Point(f"{measurement}_forecast")
            .tag("host", "predicted")
            .field("value", row['yhat'])
            .field("lower", row['yhat_lower'])
            .field("upper", row['yhat_upper'])
            .time(row['ds'])
            for _, row in forecast.iterrows()
        ]
        self.client.write_api().write(bucket=self.bucket, record=points)

    def detect_forecast_anomalies(self, forecast):
        """Example check: flag forecast rows whose lower bound is negative
        (impossible for a non-negative metric such as CPU usage)."""
        return forecast[forecast['yhat_lower'] < 0]

    def alert_anomalies(self, anomalies):
        """Notify about suspicious forecast rows (placeholder: print)."""
        print(f"Forecasting anomalies: {len(anomalies)} detected")
Conclusion
InfluxDB provides an excellent foundation for AI/ML applications on time-series data. Key capabilities:
- Feature engineering directly from queries
- Integration with Python ML libraries
- Storing predictions alongside raw data
- Real-time anomaly detection and alerting
- Complete end-to-end ML pipelines
Combine InfluxDB’s efficient storage with your favorite ML frameworks (Prophet, scikit-learn, TensorFlow) for powerful time-series AI applications.
In the final article, we’ll explore real-world InfluxDB use cases across industries.
Resources
- Prophet Documentation
- scikit-learn Time Series
- InfluxDB Python Client
- Time-Series Analysis with Python
Comments