Skip to main content

Observability Automation: Anomaly Detection, Auto-Remediation

Created: February 18, 2026 3 min read

Introduction

Automated observability uses ML to detect anomalies and trigger auto-remediation, reducing MTTR from hours to seconds.

Key Statistics:

  • Auto-remediation reduces MTTR by 80%
  • ML anomaly detection: 90% accuracy
  • 40% of incidents can be auto-remediated

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Automated Observability Flow                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────┐    ┌─────────────┐    ┌─────────────┐            │
│  │ Metrics │───▶│  Anomaly    │───▶│  Decision  │            │
│  │ Logs   │    │  Detection  │    │  Engine    │            │
│  │ Traces │    │  (ML Model) │    │             │            │
│  └─────────┘    └─────────────┘    └──────┬──────┘            │
│                                           │                    │
│                   ┌───────────────────────┬┘                    │
│                   ▼                       ▼                     │
│          ┌─────────────┐         ┌─────────────┐             │
│          │  Automatic  │         │   Manual   │             │
│          │ Remediation │         │   Alert    │             │
│          └─────────────┘         └─────────────┘             │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Anomaly Detection

#!/usr/bin/env python3
"""Anomaly detection for metrics."""

import numpy as np
from sklearn.ensemble import IsolationForest

class AnomalyDetector:
    """Detect anomalies in time-series metrics."""
    
    def __init__(self, contamination=0.1):
        self.model = IsolationForest(
            contamination=contamination,
            random_state=42
        )
        self.baseline = None
    
    def train(self, historical_data):
        """Train on historical data."""
        
        features = self.extract_features(historical_data)
        self.model.fit(features)
        self.baseline = {
            'mean': np.mean(historical_data),
            'std': np.std(historical_data)
        }
    
    def detect(self, current_value):
        """Detect if current value is anomalous."""
        
        # Z-score based detection
        z_score = (current_value - self.baseline['mean']) / self.baseline['std']
        
        is_anomaly = abs(z_score) > 3
        
        return {
            'anomaly': is_anomaly,
            'z_score': z_score,
            'severity': 'high' if abs(z_score) > 4 else 'medium' if abs(z_score) > 3 else 'low'
        }
    
    def extract_features(self, data):
        """Extract features for ML model."""
        
        features = []
        
        for i in range(len(data) - 10):
            window = data[i:i+10]
            features.append([
                np.mean(window),
                np.std(window),
                np.min(window),
                np.max(window),
                window[-1] - window[0]  # Trend
            ])
        
        return np.array(features)

Auto-Remediation

#!/usr/bin/env python3
"""Automated remediation actions."""

class AutoRemediator:
    """Execute automated remediation."""
    
    def __init__(self, k8s_client, cloud_client):
        self.k8s = k8s_client
        self.cloud = cloud_client
    
    def handle_high_cpu(self, pod):
        """Auto-scale or restart high CPU pod."""
        
        # Check if auto-scaling is enabled
        if self.is_hpa_enabled(pod):
            # Let HPA handle it
            return {"action": "hpa_will_handle"}
        
        # Restart pod if not HPA
        self.k8s.delete_pod(pod)
        
        return {"action": "pod_restarted", "pod": pod}
    
    def handle_high_memory(self, pod):
        """Restart pod with memory issue."""
        
        self.k8s.delete_pod(pod)
        
        return {"action": "pod_restarted", "reasonoom"}
    
    def handle_database_connection(self, issue": "memory_):
        """Handle database connection pool exhaustion."""
        
        # Scale up database connections
        self.cloud.scale_database(
            instance=issue['database'],
            new_size=issue['current_size'] * 2
        )
        
        return {"action": "database_scaled", "new_size": issue['current_size'] * 2}
    
    def handle_redis_full(self):
        """Clear Redis cache."""
        
        # Evict oldest keys
        self.redis.execute('EVICT', 1000)
        
        return {"action": "cache_evicted", "keys": 1000}

Integration

# Automated response rules
automations:
  - name: "High CPU Remediation"
    trigger:
      metric: "container_cpu_usage_seconds_total"
      condition: "> 0.9 for 5m"
    actions:
      - type: "scale"
        target: "deployment"
        action: "scale_up"
        max_replicas: 20
      - type: "notify"
        channel: "#ops"

  - name: "Pod Crash Loop"
    trigger:
      metric: "kube_pod_container_status_restarts"
      condition: "> 5 in 10m"
    actions:
      - type: "diagnose"
        gather_logs: true
      - type: "rollback"
        last_known_good: true

  - name: "Database High Connections"
    trigger:
      metric: "pg_stat_activity_count"
      condition: "> 0.8 * max_connections for 2m"
    actions:
      - type: "scale"
        target: "database"
      - type: "notify"
        channel: "#database-alerts"

Resources

Comments

Share this article

Scan to read on mobile