Skip to main content
โšก Calmops

InfluxDB for AI: Machine Learning, Forecasting, and Anomaly Detection

Introduction

Time-series data is the backbone of AI applications for monitoring, forecasting, and anomaly detection. InfluxDB’s efficient storage and query capabilities make it ideal for AI/ML pipelines. This article explores how to leverage InfluxDB for machine learning: feature engineering, forecasting, anomaly detection, and complete ML pipelines.

Time-Series Feature Engineering

Extracting features from time-series data for ML models.

Basic Statistical Features

from influxdb_client import InfluxDBClient, Point
import pandas as pd

# Module-level InfluxDB client shared by the helpers below ("token" is a placeholder).
client = InfluxDBClient(url="http://localhost:8086", token="token")

def get_features(bucket, measurement, host, window='1h'):
    """Query the last 24h of one host's measurement and return summary statistics.

    Values are pre-aggregated server-side into `window`-sized mean buckets,
    then reduced to basic distributional features for ML use.
    """
    flux = f'''
    from(bucket: "{bucket}")
      |> range(start: -24h)
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> filter(fn: (r) => r.host == "{host}")
      |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
    '''
    frame = client.query_api().query_data_frame(flux)

    values = frame['_value']
    # Distributional summary used as model features downstream.
    return {
        'mean': values.mean(),
        'std': values.std(),
        'min': values.min(),
        'max': values.max(),
        'median': values.median(),
        'p95': values.quantile(0.95),
        'p05': values.quantile(0.05),
    }

# Example: hourly-windowed features over the last 24h of CPU metrics for server01.
features = get_features('metrics', 'cpu', 'server01')
print(features)

Trend Features

# Detect trends using linear regression
import numpy as np

def calculate_trend(df):
    """Fit a degree-1 polynomial to `_value` and report slope, intercept, direction."""
    xs = np.arange(len(df))
    ys = df['_value'].values

    # Least-squares straight line through the series.
    slope, intercept = np.polyfit(xs, ys, 1)
    direction = 'increasing' if slope > 0 else 'decreasing'

    return {
        'slope': slope,
        'intercept': intercept,
        'trend_direction': direction,
    }

# Rolling statistics
def rolling_features(df, window=12):
    """Latest rolling mean/std/min/max of `_value` over the trailing `window` samples."""
    roll = df['_value'].rolling(window)
    # .iloc[-1] picks the most recent (fully formed) window's statistic.
    return {
        'rolling_mean': roll.mean().iloc[-1],
        'rolling_std': roll.std().iloc[-1],
        'rolling_min': roll.min().iloc[-1],
        'rolling_max': roll.max().iloc[-1],
    }

Seasonality Features

# Extract hourly patterns
def extract_hourly_pattern(df):
    """Return the average `_value` per hour of day plus a peak-hour flag.

    Fix: derive the hour as a standalone Series instead of writing a 'hour'
    column into the caller's DataFrame (the original mutated its argument).

    Returns:
        dict with 'hourly_avg' ({hour: mean value}) and 'is_peak_hour'
        (whether the most recent row falls in a business peak hour).
    """
    hours = df['_time'].dt.hour
    hourly_avg = df['_value'].groupby(hours).mean()

    return {
        'hourly_avg': hourly_avg.to_dict(),
        # Business-hours heuristic: 9-11 and 14-16 count as peak.
        'is_peak_hour': hours.iloc[-1] in [9, 10, 11, 14, 15, 16]
    }

# Daily patterns
def extract_daily_pattern(df):
    """Return mean `_value` per day of week (0 = Monday) as a dict.

    Fix: group by a derived Series instead of adding a 'day_of_week' column
    to the caller's DataFrame (the original mutated its argument).
    """
    dow = df['_time'].dt.dayofweek
    return df['_value'].groupby(dow).mean().to_dict()

Time-Series Forecasting

Building forecasting models using InfluxDB data.

Prophet Integration

from prophet import Prophet
import pandas as pd

# Fetch data from InfluxDB
def get_training_data(bucket, measurement):
    """Pull 30 days of raw values and reshape them into Prophet's ds/y frame."""
    flux = f'''
    from(bucket: "{bucket}")
      |> range(start: -30d)
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> filter(fn: (r) => r._field == "value")
    '''
    raw = client.query_api().query_data_frame(flux)

    # Prophet requires exactly two columns: 'ds' (timestamp) and 'y' (value).
    return pd.DataFrame({'ds': raw['_time'], 'y': raw['_value']})

# Train model
def train_forecast_model(bucket, measurement):
    """Fetch training data and fit a Prophet model with all seasonalities on."""
    training_df = get_training_data(bucket, measurement)

    forecaster = Prophet(
        daily_seasonality=True,
        weekly_seasonality=True,
        yearly_seasonality=True,
    )
    forecaster.fit(training_df)
    return forecaster

# Make predictions
def predict(model, periods=24):
    """Forecast `periods` hourly steps ahead; return estimates with bounds."""
    horizon = model.make_future_dataframe(periods=periods, freq='H')
    full_forecast = model.predict(horizon)
    # Keep only what consumers need: timestamp, point estimate, interval.
    return full_forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

# Store predictions in InfluxDB
def store_predictions(forecast, bucket, org="my-org"):
    """Write Prophet forecast rows (yhat and its bounds) back to InfluxDB.

    Args:
        forecast: DataFrame with 'ds', 'yhat', 'yhat_lower', 'yhat_upper'.
        bucket: Destination bucket name.
        org: InfluxDB organization (generalized; previously hard-coded).
    """
    write_api = client.write_api()

    for _, row in forecast.iterrows():
        point = (
            Point("cpu_forecast")
            .tag("type", "prediction")
            .field("value", row['yhat'])
            .field("lower", row['yhat_lower'])
            .field("upper", row['yhat_upper'])
            .time(row['ds'])
        )
        write_api.write(bucket=bucket, org=org, record=point)

ARIMA Forecasting

from statsmodels.tsa.arima.model import ARIMA
import pandas as pd

def arima_forecast(df, steps=24):
    """Fit an ARIMA(5,1,2) on `_value` and forecast `steps` points ahead."""
    series = df['_value'].values

    # Order (p=5, d=1, q=2): 5 AR lags, first differencing, 2 MA lags.
    fitted_model = ARIMA(series, order=(5, 1, 2)).fit()
    return fitted_model.forecast(steps=steps)

# Store in InfluxDB
def store_arima_predictions(forecast, host, bucket, start_time=None, step_seconds=3600, org="my-org"):
    """Persist ARIMA forecast values for `host` into InfluxDB.

    Fix/generalization: the original stamped each point with its bare loop
    index, which the client interprets as an epoch offset rather than a real
    future timestamp. Pass `start_time` (a datetime) to emit real timestamps
    spaced `step_seconds` apart; with start_time=None the legacy index
    behavior is preserved for backward compatibility.
    """
    from datetime import timedelta

    write_api = client.write_api()

    for i, value in enumerate(forecast):
        # Real timestamp when a start is supplied; raw index otherwise.
        ts = start_time + timedelta(seconds=i * step_seconds) if start_time is not None else i
        point = (
            Point("cpu_forecast_arima")
            .tag("host", host)
            .field("value", value)
            .time(ts)
        )
        write_api.write(bucket=bucket, org=org, record=point)

Anomaly Detection

Detecting anomalies in time-series data.

Statistical Anomaly Detection

# Z-score based anomaly detection
def detect_anomalies_zscore(df, threshold=3):
    """Return rows whose z-score against the whole series exceeds `threshold`.

    Fix: compute z-scores as a standalone Series instead of writing a
    'zscore' column into the caller's DataFrame (the original mutated its
    argument). The returned rows still carry the 'zscore' column.
    """
    mean = df['_value'].mean()
    std = df['_value'].std()

    zscores = (df['_value'] - mean) / std
    mask = zscores.abs() > threshold

    anomalies = df[mask].copy()
    anomalies['zscore'] = zscores[mask]
    return anomalies

# IQR based detection
def detect_anomalies_iqr(df, multiplier=1.5):
    """Flag rows outside the Tukey fences [Q1 - k*IQR, Q3 + k*IQR]."""
    values = df['_value']
    q1, q3 = values.quantile(0.25), values.quantile(0.75)
    spread = q3 - q1

    lower_fence = q1 - multiplier * spread
    upper_fence = q3 + multiplier * spread

    outside = (values < lower_fence) | (values > upper_fence)
    return df[outside]

# Rolling window detection
def detect_rolling_anomalies(df, window=12, threshold=3):
    """Flag rows deviating more than `threshold` rolling sigmas from the rolling mean.

    Fix: the deviation is computed as a standalone Series instead of writing
    a 'deviation' column into the caller's DataFrame (the original mutated
    its argument). Returned rows still include 'deviation'.

    NOTE(review): the rolling window includes the current sample, which
    dampens the score of a lone spike — confirm that is intended.
    """
    rolling_mean = df['_value'].rolling(window).mean()
    rolling_std = df['_value'].rolling(window).std()

    deviation = (df['_value'] - rolling_mean).abs() / rolling_std
    mask = deviation > threshold

    anomalies = df[mask].copy()
    anomalies['deviation'] = deviation[mask]
    return anomalies

Isolation Forest

from sklearn.ensemble import IsolationForest
import pandas as pd

def detect_anomalies_isolation_forest(df, contamination=0.1):
    """Detect anomalies with an Isolation Forest over value + rolling statistics.

    Fixes vs. the original:
    - the rolling mean left NaNs in the first window-1 rows while only the
      rolling std was filled; IsolationForest raises on NaN input, so the
      mean is now backfilled with the raw value,
    - the input DataFrame is no longer mutated; the 'anomaly' and
      'anomaly_score' columns are attached to a copy.

    Returns the rows the model labels as anomalous (prediction == -1).
    """
    # Feature matrix: raw value plus trailing 12-sample statistics.
    features = pd.DataFrame({
        'value': df['_value'],
        'rolling_mean': df['_value'].rolling(12).mean().fillna(df['_value']),
        'rolling_std': df['_value'].rolling(12).std().fillna(0),
    })

    # fit_predict labels each row: 1 = inlier, -1 = anomaly.
    model = IsolationForest(contamination=contamination)
    predictions = model.fit_predict(features)
    scores = model.score_samples(features)

    result = df.copy()
    result['anomaly'] = predictions
    result['anomaly_score'] = scores
    return result[result['anomaly'] == -1]

# Real-time anomaly detection
def real_time_anomaly_check(value, historical_mean, historical_std, threshold=3):
    """Score a single observation against baseline mean/std.

    Fix: a zero `historical_std` (constant baseline) no longer raises
    ZeroDivisionError — deviation scores 0 when the value equals the mean,
    otherwise infinite.

    Severity bands scale with `threshold`: low (<= 2x), medium (<= 3x), high (> 3x).
    """
    if historical_std == 0:
        z_score = 0.0 if value == historical_mean else float('inf')
    else:
        z_score = abs(value - historical_mean) / historical_std

    if z_score > 3 * threshold:
        severity = 'high'
    elif z_score > 2 * threshold:
        severity = 'medium'
    else:
        severity = 'low'

    return {
        'is_anomaly': z_score > threshold,
        'z_score': z_score,
        'severity': severity
    }

Storing Anomalies in InfluxDB

def store_anomalies(anomalies_df, bucket):
    """Write each anomalous row (value + z-score) to the 'cpu_anomalies' measurement."""
    write_api = client.write_api()

    for _, anomaly in anomalies_df.iterrows():
        # z-score defaults to 0 when the detector did not attach one.
        record = (
            Point("cpu_anomalies")
            .tag("type", "anomaly")
            .field("value", anomaly['_value'])
            .field("zscore", anomaly.get('zscore', 0))
            .time(anomaly['_time'])
        )
        write_api.write(bucket=bucket, org="my-org", record=record)

Complete ML Pipeline

End-to-end ML pipeline with InfluxDB.

Data Collection and Storage

class TimeSeriesMLPipeline:
    """End-to-end pipeline: query InfluxDB, build features, train, predict."""

    def __init__(self, influx_client, bucket):
        self.client = influx_client
        self.bucket = bucket

    def collect_data(self, measurement, host, duration='24h'):
        """Fetch raw rows for one host/measurement over the given lookback."""
        flux = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -{duration})
          |> filter(fn: (r) => r._measurement == "{measurement}")
          |> filter(fn: (r) => r.host == "{host}")
        '''
        return self.client.query_api().query_data_frame(flux)

    def engineer_features(self, df):
        """Extract features for ML training"""
        features = pd.DataFrame()

        # Calendar features.
        features['hour'] = df['_time'].dt.hour
        features['day_of_week'] = df['_time'].dt.dayofweek

        # Short-window rolling statistics.
        rolling = df['_value'].rolling(5)
        features['rolling_mean_5'] = rolling.mean()
        features['rolling_std_5'] = rolling.std()

        # Lagged copies of the target.
        for lag in (1, 5, 10):
            features[f'lag_{lag}'] = df['_value'].shift(lag)

        # First difference.
        features['diff_1'] = df['_value'].diff(1)

        # Leading windows/lags have no history; zero-fill them.
        return features.fillna(0)

    def train_model(self, features, labels):
        """Fit a 100-tree random forest regressor on the feature matrix."""
        from sklearn.ensemble import RandomForestRegressor

        forest = RandomForestRegressor(n_estimators=100)
        forest.fit(features, labels)
        return forest

    def predict(self, model, features):
        """Run the model and persist each prediction (stamped by index)."""
        predictions = model.predict(features)

        writer = self.client.write_api()
        for i, pred in enumerate(predictions):
            writer.write(
                bucket=self.bucket,
                record=Point("prediction").field("value", pred).time(i),
            )

        return predictions

Model Monitoring

def monitor_model_performance(actual_df, predicted_df):
    """Compare predictions to actual values, store MAE/MAPE, and return them.

    Fix: the metric point was stamped with the literal string 'now()', which
    is not a parseable timestamp; use a real UTC datetime instead.
    """
    from datetime import datetime, timezone

    merged = actual_df.merge(predicted_df, on='_time', suffixes=('_actual', '_pred'))

    errors = abs(merged['_value_actual'] - merged['_value_pred'])
    mae = errors.mean()
    # NOTE(review): MAPE divides by the actual value — rows with zero actuals
    # produce inf; confirm upstream data cannot contain zeros.
    mape = (errors / merged['_value_actual']).mean() * 100

    # Store metrics in InfluxDB
    write_api = client.write_api()

    point = (
        Point("model_metrics")
        .field("mae", mae)
        .field("mape", mape)
        .time(datetime.now(timezone.utc))
    )
    write_api.write(bucket="monitoring", record=point)

    return {'mae': mae, 'mape': mape}

Anomaly Alerting

Real-time alerting on detected anomalies.

from influxdb_client.client.write_api import SYNCHRONOUS

class AnomalyAlerter:
    """Z-score anomaly alerter backed by 7-day baseline stats from InfluxDB."""

    def __init__(self, client, threshold=3):
        self.client = client
        self.threshold = threshold  # z-score above which an alert fires
        self.historical_stats = {}  # host -> {'mean': float, 'std': float}

    def update_stats(self, measurement, host):
        """Update baseline statistics"""
        query = f'''
        from(bucket: "metrics")
          |> range(start: -7d)
          |> filter(fn: (r) => r._measurement == "{measurement}")
          |> filter(fn: (r) => r.host == "{host}")
          |> filter(fn: (r) => r._field == "value")
        '''
        df = self.client.query_api().query_data_frame(query)

        self.historical_stats[host] = {
            'mean': df['_value'].mean(),
            'std': df['_value'].std()
        }

    def check_anomaly(self, value, host):
        """Return the z-score for `value` (None if no baseline); alert when high.

        Fix: a zero-std baseline (constant series) no longer raises
        ZeroDivisionError — any deviation from the mean then scores as
        infinite, and an exact match scores 0.
        """
        stats = self.historical_stats.get(host)
        if not stats:
            return None

        if stats['std'] == 0:
            z_score = 0.0 if value == stats['mean'] else float('inf')
        else:
            z_score = abs(value - stats['mean']) / stats['std']

        if z_score > self.threshold:
            # Trigger alert
            self.send_alert(host, value, z_score)

        return z_score

    def send_alert(self, host, value, z_score):
        """Send alert (email, Slack, PagerDuty, etc.)"""
        # Payload stub for a real notifier integration.
        alert = {
            'host': host,
            'value': value,
            'z_score': z_score,
            'timestamp': 'now()'
        }

        # Store in InfluxDB for alert history
        write_api = self.client.write_api()
        point = (
            Point("alerts")
            # NOTE(review): severity cutoff is hard-coded at 3 rather than
            # self.threshold — confirm whether that is intentional.
            .tag("severity", "critical" if z_score > 3 else "warning")
            .field("value", value)
            .field("z_score", z_score)
        )
        write_api.write(bucket="alerts", record=point)

        print(f"ALERT: Anomaly detected on {host}: value={value}, z_score={z_score}")

Forecasting Pipeline Example

Complete example combining multiple techniques:

class ForecastingPipeline:
    """Daily Prophet forecast: fetch history, fit, forecast, store, alert."""

    def __init__(self, client, bucket):
        self.client = client
        self.bucket = bucket

    def run_daily_forecast(self, measurement, host):
        """Train on 30 days of history and produce/store a 24-hour forecast.

        Fixes vs. the original:
        - Prophet's predict() takes a future DataFrame, not a `steps` kwarg;
          build it with make_future_dataframe,
        - `if anomalies:` on a DataFrame raises ValueError (ambiguous truth
          value); test `.empty` instead.
        """
        # 1. Get historical data
        historical = self.get_historical_data(measurement, host, days=30)

        # 2. Train model
        model = self.train_forecast_model(historical)

        # 3. Generate forecast (next 24 hours)
        future = model.make_future_dataframe(periods=24, freq='H')
        forecast = model.predict(future)

        # 4. Store forecast
        self.store_forecast(forecast, measurement)

        # 5. Detect anomalies in forecast
        anomalies = self.detect_forecast_anomalies(forecast)
        if not anomalies.empty:
            self.alert_anomalies(anomalies)

        return forecast

    def get_historical_data(self, measurement, host, days):
        """Query `days` days of raw rows for one host/measurement."""
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -{days}d)
          |> filter(fn: (r) => r._measurement == "{measurement}")
          |> filter(fn: (r) => r.host == "{host}")
        '''
        return self.client.query_api().query_data_frame(query)

    def train_forecast_model(self, df):
        """Fit a Prophet model with daily seasonality on the fetched history."""
        from prophet import Prophet

        prophet_df = pd.DataFrame({
            'ds': df['_time'],
            'y': df['_value']
        })

        model = Prophet(daily_seasonality=True)
        model.fit(prophet_df)

        return model

    def store_forecast(self, forecast, measurement):
        """Write yhat and its bounds to the '<measurement>_forecast' measurement."""
        write_api = self.client.write_api()

        for _, row in forecast.iterrows():
            point = (
                Point(f"{measurement}_forecast")
                .tag("host", "predicted")
                .field("value", row['yhat'])
                .field("lower", row['yhat_lower'])
                .field("upper", row['yhat_upper'])
                .time(row['ds'])
            )
            write_api.write(bucket=self.bucket, record=point)

    def detect_forecast_anomalies(self, forecast):
        # Example rule: flag forecast rows whose lower bound dips below zero.
        return forecast[forecast['yhat_lower'] < 0]

    def alert_anomalies(self, anomalies):
        """Placeholder notifier — log the count of anomalous forecast rows."""
        print(f"Forecasting anomalies: {len(anomalies)} detected")

Conclusion

InfluxDB provides an excellent foundation for AI/ML applications on time-series data. Key capabilities:

  • Feature engineering directly from queries
  • Integration with Python ML libraries
  • Storing predictions alongside raw data
  • Real-time anomaly detection and alerting
  • Complete end-to-end ML pipelines

Combine InfluxDB’s efficient storage with your favorite ML frameworks (Prophet, scikit-learn, TensorFlow) for powerful time-series AI applications.

In the final article, we’ll explore real-world InfluxDB use cases across industries.

Resources

Comments