⚡ Calmops

Observability Cost Optimization: Sampling, Retention, Compression

Introduction

Observability infrastructure can become a major cost driver at scale. This article covers strategies to optimize observability costs while maintaining actionable insights.

Key Statistics:

  • Observability costs: 15-30% of cloud spend at scale
  • Sampling can reduce costs by 70-90%
  • Compression: 5-10x reduction in storage
  • Retention optimization: 40-60% storage savings

Cost Breakdown

Observability Cost Components

Metrics (Prometheus/DataDog)
  • Storage: $0.01-0.03/GB/month
  • Ingestion: $0.01-0.03/10K samples
  • Retention: $0.01-0.02/GB/month per additional 30 days
  • Queries: $0.001-0.005/query

Logs (Loki/ELK/Splunk)
  • Ingestion: $0.50-2.00/GB
  • Storage: $0.02-0.10/GB/month
  • Indexing: $0.50-5.00/GB
  • Retention: $0.01-0.05/GB/month per 30 days

Traces (Jaeger/Zipkin/DataDog)
  • Ingestion: $0.50-5.00/MB
  • Storage: $0.05-0.20/GB/month
  • Retention: $0.02-0.10/GB/month per 30 days

Infrastructure
  • Compute (collectors, processors)
  • Network (data transfer)
  • Management (RBAC, dashboards)
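Plugging the midpoint of each price range into a quick model makes the dominant term obvious: log ingestion usually dwarfs everything else. The rates below are illustrative midpoints from the list above, not vendor quotes:

```python
def estimate_monthly_cost(logs_gb: float, metrics_gb: float,
                          traces_gb: float) -> dict:
    """Rough monthly spend from ingested volumes (illustrative rates)."""
    rates = {
        'logs_ingest_per_gb': 1.25,    # midpoint of $0.50-2.00/GB
        'logs_storage_per_gb': 0.06,   # midpoint of $0.02-0.10/GB/month
        'metrics_storage_per_gb': 0.02,
        'traces_storage_per_gb': 0.12,
    }
    costs = {
        'logs': logs_gb * (rates['logs_ingest_per_gb']
                           + rates['logs_storage_per_gb']),
        'metrics': metrics_gb * rates['metrics_storage_per_gb'],
        'traces': traces_gb * rates['traces_storage_per_gb'],
    }
    costs['total'] = sum(costs.values())
    return costs

# 1 TB of logs, 500 GB of metrics, 200 GB of traces per month
print(estimate_monthly_cost(1000, 500, 200))
```

At these rates logs account for roughly 97% of the bill, which is why log sampling and filtering usually come first.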

Sampling Strategies

Head Sampling

#!/usr/bin/env python3
"""Head sampling - sample at the start of the pipeline."""

import random
import time
from typing import Callable, Any
from dataclasses import dataclass

@dataclass
class HeadSamplingConfig:
    """Head sampling configuration."""
    
    sampling_rate: float = 0.1  # 10% of traces
    min_duration_ms: int = 100  # Always sample >100ms
    priority_sample: bool = True  # Always sample errors/high latency
    random_seed: int = 42

class HeadSampler:
    """Simple probabilistic sampling at ingestion."""
    
    def __init__(self, config: HeadSamplingConfig):
        self.config = config
        random.seed(config.random_seed)
        self.total_seen = 0
        self.total_sampled = 0
    
    def should_sample(self, trace: dict) -> bool:
        """Determine if trace should be sampled."""
        
        self.total_seen += 1
        
        # Always sample if priority
        if self.config.priority_sample:
            if self._is_priority_trace(trace):
                self.total_sampled += 1
                return True
        
        # Always sample long traces
        if self._get_duration_ms(trace) > self.config.min_duration_ms:
            self.total_sampled += 1
            return True
        
        # Probabilistic sampling
        if random.random() < self.config.sampling_rate:
            self.total_sampled += 1
            return True
        
        return False
    
    def _is_priority_trace(self, trace: dict) -> bool:
        """Check if trace is high priority (error, high latency)."""
        
        # Check for errors
        if trace.get('status_code', 200) >= 500:
            return True
        
        # Check for high latency
        if self._get_duration_ms(trace) > 5000:  # > 5s
            return True
        
        # Check for user priority tag
        if trace.get('priority') == 'high':
            return True
        
        return False
    
    def _get_duration_ms(self, trace: dict) -> float:
        """Calculate trace duration in milliseconds."""
        
        if 'start_time' in trace and 'end_time' in trace:
            return (trace['end_time'] - trace['start_time']) * 1000
        
        return 0
    
    def get_stats(self) -> dict:
        """Get sampling statistics."""
        
        return {
            'total_seen': self.total_seen,
            'total_sampled': self.total_sampled,
            'sample_rate': self.total_sampled / max(1, self.total_seen)
        }
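Because priority and long traces bypass the probabilistic check, the effective keep rate is higher than `sampling_rate` alone. A back-of-envelope estimate (the function name is mine):

```python
def effective_sample_rate(priority_fraction: float, base_rate: float) -> float:
    """Priority traces are always kept; the remaining fraction is
    sampled probabilistically at base_rate."""
    return priority_fraction + (1.0 - priority_fraction) * base_rate

# 2% of traffic is errors/slow traces, base rate 10%:
print(effective_sample_rate(0.02, 0.1))  # keeps ~11.8% of traces
```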

Tail-Based Sampling

#!/usr/bin/env python3
"""Tail-based sampling - sample after collecting full trace."""

from typing import List, Dict, Optional
from dataclasses import dataclass, field
from collections import defaultdict
import threading
import time

@dataclass
class TraceData:
    """Complete trace data."""
    
    trace_id: str
    spans: List[Dict] = field(default_factory=list)
    start_time: float = 0
    end_time: float = 0
    tags: Dict = field(default_factory=dict)
    
    @property
    def duration_ms(self) -> float:
        """Calculate trace duration."""
        return (self.end_time - self.start_time) * 1000
    
    @property
    def error_count(self) -> int:
        """Count error spans."""
        return sum(1 for span in self.spans if span.get('status_code', 200) >= 500)

class TailSamplingPolicy:
    """Tail-based sampling policies."""
    
    def __init__(self):
        self.policies = []
    
    def add_error_policy(self):
        """Sample any trace that contains at least one error span."""
        
        def policy(trace: TraceData) -> bool:
            return trace.error_count > 0
        
        self.policies.append(('error', policy))
    
    def add_latency_policy(self, latency_threshold_ms: float):
        """Sample traces exceeding latency threshold."""
        
        def policy(trace: TraceData) -> bool:
            return trace.duration_ms > latency_threshold_ms
        
        self.policies.append(('latency', policy))
    
    def add_probabilistic_policy(self, sample_rate: float):
        """Add probabilistic sampling."""
        
        import random
        
        def policy(trace: TraceData) -> bool:
            return random.random() < sample_rate
        
        self.policies.append(('probabilistic', policy))
    
    def add_rare_attribute_policy(self, attributes: List[str]):
        """Sample traces with rare attribute values."""
        
        seen_values = defaultdict(int)
        
        def policy(trace: TraceData) -> bool:
            for attr in attributes:
                value = trace.tags.get(attr)
                if value:
                    seen_values[value] += 1
                    # Sample the first 10 occurrences of each new value
                    if seen_values[value] < 10:
                        return True
            return False
        
        self.policies.append(('rare_attribute', policy))
    
    def should_sample(self, trace: TraceData) -> bool:
        """Check if any policy matches."""
        
        for name, policy in self.policies:
            if policy(trace):
                return True
        
        return False

class TailSamplingProcessor:
    """Tail-based sampling processor."""
    
    def __init__(self, buffer_size: int = 10000, 
                 flush_interval_sec: int = 10):
        self.buffer: Dict[str, TraceData] = {}
        self.policy = TailSamplingPolicy()
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval_sec
        self.lock = threading.Lock()
        self.sampled_count = 0
        self.dropped_count = 0
        
        # Start background flush
        self._start_flush_thread()
    
    def add_span(self, span: Dict):
        """Add span to trace buffer."""
        
        trace_id = span.get('trace_id')
        if trace_id is None:
            return
        
        with self.lock:
            if trace_id not in self.buffer:
                self.buffer[trace_id] = TraceData(
                    trace_id=trace_id,
                    start_time=span.get('timestamp', time.time())
                )
            
            self.buffer[trace_id].spans.append(span)
            self.buffer[trace_id].end_time = span.get('timestamp', time.time())
            self.buffer[trace_id].tags.update(span.get('tags', {}))
    
    def _start_flush_thread(self):
        """Start background flush thread."""
        
        def flush_loop():
            while True:
                time.sleep(self.flush_interval)
                self._flush_completed_traces()
        
        thread = threading.Thread(target=flush_loop, daemon=True)
        thread.start()
    
    def _flush_completed_traces(self):
        """Flush and sample completed traces."""
        
        current_time = time.time()
        
        with self.lock:
            to_remove = []
            
            for trace_id, trace in self.buffer.items():
                # Consider trace complete if no spans for 5 seconds
                if current_time - trace.end_time > 5:
                    if self.policy.should_sample(trace):
                        self._send_to_backend(trace)
                        self.sampled_count += 1
                    else:
                        self.dropped_count += 1
                    
                    to_remove.append(trace_id)
            
            for trace_id in to_remove:
                del self.buffer[trace_id]
    
    def _send_to_backend(self, trace: TraceData):
        """Send sampled trace to backend."""
        
        # Implementation would send to Jaeger/Zipkin
        pass
    
    def get_stats(self) -> dict:
        """Get sampling statistics."""
        
        return {
            'buffer_size': len(self.buffer),
            'sampled_count': self.sampled_count,
            'dropped_count': self.dropped_count,
            'sample_rate': self.sampled_count / max(1, 
                self.sampled_count + self.dropped_count)
        }
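The sampling decision itself reduces to an OR across policies. A minimal standalone version of that decision, assuming traces are plain dicts with `spans`, `start_time`, and `end_time` as above:

```python
def tail_decision(trace: dict, latency_threshold_ms: float = 1000.0) -> bool:
    """Keep the trace if any span errored OR the whole trace was slow."""
    has_error = any(s.get('status_code', 200) >= 500
                    for s in trace['spans'])
    duration_ms = (trace['end_time'] - trace['start_time']) * 1000
    return has_error or duration_ms > latency_threshold_ms

slow = {'spans': [{'status_code': 200}], 'start_time': 0.0, 'end_time': 2.0}
print(tail_decision(slow))  # kept: 2000 ms exceeds the threshold
```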

OpenTelemetry Sampling

# OpenTelemetry Collector sampling configuration
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # Batch before export (used by the metrics pipeline below)
  batch: {}

  # Probabilistic (head) sampling
  probabilistic_sampler:
    hash_seed: 0
    sampling_percentage: 10.0
    
  # Tail-based sampling
  tail_sampling:
    decision_wait: 10s
    policies:
      - name: errors-policy
        type: status_code
        status_code: {status_codes: [ERROR]}
      - name: slow-traces-policy
        type: latency
        latency: {threshold_ms: 1000}
      - name: probabilistic-policy
        type: probabilistic
        probabilistic: {sampling_percentage: 10}
      - name: errors-and-slow-policy
        type: and
        and:
          and_sub_policy:
            - name: error-sub-policy
              type: status_code
              status_code: {status_codes: [ERROR]}
            - name: latency-sub-policy
              type: latency
              latency: {threshold_ms: 500}

exporters:
  otlp:
    endpoint: "https://tempo.example.com:4317"
    tls:
      insecure: false
      
  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [tail_sampling]
      exporters: [otlp]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus]
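The collector's `probabilistic_sampler` hashes the trace ID rather than rolling a random number, so every collector instance makes the same decision for a given trace without coordination. A simplified sketch of that idea (not the collector's exact hash function):

```python
import hashlib

def sample_by_trace_id(trace_id: str, percentage: float) -> bool:
    """Deterministic sampling: hash the trace ID into [0, 10000)
    and keep it if it falls under percentage * 100."""
    h = int(hashlib.sha256(trace_id.encode()).hexdigest()[:8], 16)
    return (h % 10_000) < percentage * 100

# The same trace ID always gets the same decision:
print(sample_by_trace_id('4bf92f3577b34da6', 10.0))
```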

Log Sampling

#!/usr/bin/env python3
"""Intelligent log sampling strategies."""

import hashlib
import json
import logging
from typing import Optional
from enum import Enum

class LogLevel(Enum):
    """Log severity levels."""
    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50

class LogSampler:
    """Sampling for high-volume log streams."""
    
    def __init__(self, sample_rate: float = 0.1):
        self.sample_rate = sample_rate
    
    def should_sample(self, log: dict) -> bool:
        """Determine if log should be sampled."""
        
        # Use trace_id or message hash for consistent sampling
        content = log.get('trace_id', '') + log.get('message', '')
        hash_val = int(hashlib.md5(content.encode()).hexdigest(), 16)
        
        return (hash_val % 100) < (self.sample_rate * 100)

class AdaptiveLogSampler:
    """Adaptive log sampling based on volume."""
    
    def __init__(self, target_rps: int = 1000):
        self.target_rps = target_rps
        self.current_rps = 0
        self.sample_rate = 1.0
        self.window_size = 60  # 60 second windows
        
        # Track logs per window
        self.log_counts = []
        self.max_history = 100
    
    def update(self, current_rps: int):
        """Update current RPS and adjust sample rate."""
        
        self.current_rps = current_rps
        self.log_counts.append(current_rps)
        
        if len(self.log_counts) > self.max_history:
            self.log_counts.pop(0)
        
        # Calculate average
        avg_rps = sum(self.log_counts) / len(self.log_counts)
        
        # Adjust sample rate
        if avg_rps > self.target_rps:
            # Decrease sample rate
            self.sample_rate = max(0.01, self.sample_rate * 0.9)
        elif avg_rps < self.target_rps * 0.8:
            # Increase sample rate
            self.sample_rate = min(1.0, self.sample_rate * 1.1)
    
    def should_sample(self, log: dict) -> bool:
        """Determine if log should be sampled."""
        
        # Always sample errors
        if log.get('level') in ['ERROR', 'CRITICAL']:
            return True
        
        # Probabilistic sampling based on adjusted rate
        import random
        return random.random() < self.sample_rate
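Closing the loop on the post-sampling rate shows how the multiplicative adjustment settles near the target. A standalone sketch (constants are illustrative):

```python
def adapt_rate(rate: float, sampled_rps: float, target_rps: float) -> float:
    """Multiplicative controller: cut the rate 10% when over target,
    raise it 10% when well under (below 80% of target)."""
    if sampled_rps > target_rps:
        return max(0.01, rate * 0.9)
    if sampled_rps < target_rps * 0.8:
        return min(1.0, rate * 1.1)
    return rate

# Simulate: 20k raw logs/sec, target 1k sampled logs/sec
raw_rps, rate = 20_000, 1.0
for _ in range(50):
    rate = adapt_rate(rate, sampled_rps=raw_rps * rate, target_rps=1_000)
print(f"settled rate {rate:.3f}, sampling ~{raw_rps * rate:.0f} logs/sec")
```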

class StructuredLogFilter:
    """Filter structured logs intelligently."""
    
    def __init__(self):
        # Always keep these
        self.always_keep_keys = {'level', 'timestamp', 'trace_id', 
                                 'service', 'error'}
        
        # Drop these keys if present
        self.drop_keys = {'http_request_body', 'http_response_body'}
        
        # Truncate these keys
        self.truncate_keys = {'stack_trace': 500, 'message': 1000}
    
    def filter_log(self, log: dict) -> Optional[dict]:
        """Filter and transform log entry."""
        
        # Drop DEBUG logs entirely at this stage
        if log.get('level') == 'DEBUG':
            return None
        
        # Create filtered copy
        filtered = {}
        
        for key, value in log.items():
            # Skip dropped keys
            if key in self.drop_keys:
                continue
            
            # Truncate long values
            if key in self.truncate_keys:
                max_len = self.truncate_keys[key]
                if isinstance(value, str) and len(value) > max_len:
                    value = value[:max_len] + '...'
            
            filtered[key] = value
        
        return filtered
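A condensed version of the same filtering, showing the size win on a single entry:

```python
import json

DROP = {'http_request_body', 'http_response_body'}
TRUNCATE = {'stack_trace': 500, 'message': 1000}

def slim(log: dict) -> dict:
    """Drop payload fields and truncate long ones."""
    out = {}
    for k, v in log.items():
        if k in DROP:
            continue
        limit = TRUNCATE.get(k)
        if limit and isinstance(v, str) and len(v) > limit:
            v = v[:limit] + '...'
        out[k] = v
    return out

entry = {'level': 'ERROR', 'message': 'x' * 5000,
         'http_request_body': '{"payload": "..."}'}
print(len(json.dumps(entry)), '->', len(json.dumps(slim(entry))))
```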

Retention Policies

Prometheus Retention

# Prometheus retention configuration
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    cluster: 'production'
    environment: 'prod'

# Note: TSDB retention and compression are set via command-line flags,
# not in prometheus.yml:
#   --storage.tsdb.retention.time=15d
#   --storage.tsdb.max-block-duration=2h
#   --storage.tsdb.wal-compression

# Remote write configuration for long-term storage
remote_write:
  - url: https://remote-storage.example.com/api/v1/write
    tls_config:
      cert_file: /etc/prometheus/certs/client.crt
      key_file: /etc/prometheus/certs/client.key
    basic_auth:
      username: prometheus
      password: secret
    queue_config:
      capacity: 10000
      max_shards: 30
      min_shards: 1
      max_samples_per_send: 500
      batch_send_deadline: 5s

# Rule evaluation and recording
rule_files:
  - /etc/prometheus/rules/*.yml

Loki Retention

# Loki retention configuration
schema_config:
  configs:
    - from: 2024-01-01
      store: boltdb-shipper
      object_store: s3
      index:
        prefix: loki_index_
        period: 24h

limits_config:
  # Retention period
  retention_period: 15d

  # Ingestion and query limits (applied per tenant)
  ingestion_rate_mb: 50
  ingestion_burst_size_mb: 100
  max_global_streams_per_tenant: 10000
  max_chunks_per_query: 2000000
  max_entries_limit_per_query: 5000

storage_config:
  boltdb_shipper:
    active_index_directory: /loki/index
    shared_store: s3

  aws:
    s3: s3://.amazonaws.com/loki-storage
    s3forcepathstyle: false

compactor:
  working_directory: /loki/compactor
  retention_enabled: true
  deletion_mode: filter-and-delete
  retention_delete_worker_count: 15000

Compression Strategies

Time Series Compression

#!/usr/bin/env python3
"""Time series data compression."""

import struct
import zlib
from typing import List, Tuple

class TimeSeriesCompressor:
    """Compress time series data."""
    
    def __init__(self):
        self.delta_encoding = True
        self.zlib_compression = True
    
    def compress(self, timestamps: List[int], 
                 values: List[float]) -> bytes:
        """Compress time series using delta encoding."""
        
        compressed = bytearray()
        
        # First timestamp is absolute
        if timestamps:
            compressed.extend(struct.pack('>Q', timestamps[0]))
            
            # Delta encode subsequent timestamps
            prev_ts = timestamps[0]
            for ts in timestamps[1:]:
                delta = ts - prev_ts
                compressed.extend(self._encode_varint(delta))
                prev_ts = ts
            
            # Encode values
            prev_val = 0
            for val in values:
                # Convert to integer (assuming 4 decimal places)
                val_int = int(val * 10000)
                delta = val_int - prev_val
                compressed.extend(self._encode_varint(delta))
                prev_val = val_int
        
        # Optional zlib compression
        if self.zlib_compression:
            compressed = zlib.compress(bytes(compressed))
        
        return bytes(compressed)
    
    def _encode_varint(self, value: int) -> bytes:
        """Encode a signed delta as a ZigZag varint."""
        
        # ZigZag-map first: value deltas can be negative, and a plain
        # unsigned varint cannot represent them
        value = (value << 1) ^ (value >> 63)
        
        result = bytearray()
        
        while value > 0x7F:
            result.append((value & 0x7F) | 0x80)
            value >>= 7
        
        result.append(value & 0x7F)
        
        return bytes(result)
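Deltas can be negative (counter resets, decreasing gauges), which a plain unsigned varint cannot represent. ZigZag encoding maps signed integers to unsigned ones so small negatives stay small; a standalone round-trip sketch (assumes deltas fit in 64 bits):

```python
def encode_varint(value: int) -> bytes:
    """ZigZag-map a signed delta, then varint-encode it."""
    z = (value << 1) ^ (value >> 63)   # 0,-1,1,-2,... -> 0,1,2,3,...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z & 0x7F)
    return bytes(out)

def decode_varint(buf: bytes, pos: int = 0) -> tuple:
    """Decode one varint from buf at pos; return (value, new_pos)."""
    z = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos    # undo ZigZag

print(decode_varint(encode_varint(-300))[0])  # round-trips to -300
```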

class PrometheusTSDBCompression:
    """Prometheus TSDB compression analysis."""
    
    # Typical compression ratios
    COMPRESSION_RATIOS = {
        'raw': 1.0,
        'xor': 8.0,
        'lz4': 10.0,
        'zstd': 15.0,
        'snappy': 12.0
    }
    
    @staticmethod
    def estimate_storage(metrics: int, 
                         retention_days: int,
                         scrape_interval: str = '15s') -> dict:
        """Estimate storage requirements."""
        
        # Parse scrape interval
        seconds = int(scrape_interval.rstrip('s'))
        
        # Calculate samples per metric
        samples_per_metric = (86400 / seconds) * retention_days
        
        # Assume 16 bytes per sample (typical)
        bytes_per_sample = 16
        raw_size = metrics * samples_per_metric * bytes_per_sample
        
        return {
            'metrics_count': metrics,
            'samples_per_metric': samples_per_metric,
            'raw_size_gb': raw_size / (1024**3),
            'compressed_size_gb': raw_size / (1024**3) / 10,  # 10x compression
            'estimated_cost_monthly': (raw_size / (1024**3) / 10) * 0.02
        }
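A worked example of the same arithmetic for a mid-sized cluster (series count and the 10x compression ratio are assumptions):

```python
# 100k active series, 15s scrape interval, 15-day retention,
# 16 bytes/sample raw, ~10x TSDB compression
series, interval_s, days = 100_000, 15, 15
samples_per_series = 86_400 / interval_s * days
raw_gb = series * samples_per_series * 16 / 1024**3
print(f"{raw_gb:.1f} GB raw -> {raw_gb / 10:.1f} GB compressed")
```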

Cost Monitoring

#!/usr/bin/env python3
"""Monitor and alert on observability costs."""

from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime, timedelta
import json

@dataclass
class CostAlert:
    """Cost alert configuration."""
    
    metric_name: str
    threshold_gb: float
    warning_threshold_pct: float = 0.8
    alert_type: str = "budget"

class ObservabilityCostMonitor:
    """Monitor observability infrastructure costs."""
    
    def __init__(self):
        self.cost_per_gb = {
            'prometheus_metrics': 0.023,  # $ per GB/month
            'loki_logs': 0.10,
            'jaeger_traces': 0.15,
            'tempo_traces': 0.12,
            'datadog': 0.023,
        }
        
        self.budgets = {}
        self.alerts: List[CostAlert] = []
    
    def set_budget(self, component: str, monthly_budget_usd: float):
        """Set monthly budget for component."""
        
        self.budgets[component] = {
            'monthly_budget': monthly_budget_usd,
            'current_spend': 0,
            'start_date': datetime.now()
        }
    
    def record_usage(self, component: str, 
                     gb_used: float, 
                     period_days: int = 30):
        """Record usage and calculate cost."""
        
        # Calculate monthly equivalent
        monthly_gb = gb_used * (30 / period_days)
        
        cost = monthly_gb * self.cost_per_gb.get(component, 0.10)
        
        if component in self.budgets:
            self.budgets[component]['current_spend'] = cost
        
        return cost
    
    def check_alerts(self, component: str) -> List[dict]:
        """Check if budget alerts should fire."""
        
        if component not in self.budgets:
            return []
        
        budget = self.budgets[component]
        spend = budget['current_spend']
        limit = budget['monthly_budget']
        
        alerts = []
        
        if spend > limit:
            alerts.append({
                'severity': 'critical',
                'message': f'{component} over budget: ${spend:.2f} > ${limit:.2f}'
            })
        elif spend > limit * 0.8:
            alerts.append({
                'severity': 'warning',
                'message': f'{component} at 80% budget: ${spend:.2f} / ${limit:.2f}'
            })
        
        return alerts
    
    def generate_report(self) -> dict:
        """Generate cost report."""
        
        report = {
            'generated_at': datetime.now().isoformat(),
            'budgets': {},
            'total_spend': 0
        }
        
        for component, budget in self.budgets.items():
            spend = budget['current_spend']
            limit = budget['monthly_budget']
            
            report['budgets'][component] = {
                'current_spend': spend,
                'monthly_budget': limit,
                'utilization_pct': (spend / limit * 100) if limit > 0 else 0,
                'projected_monthly': spend * (30 / max(1, (datetime.now() -
                    budget['start_date']).days))
            }
            
            report['total_spend'] += spend
        
        return report
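The month-end projection deserves a guard: on a budget's first day the elapsed-days denominator is zero. A small helper showing the guarded linear projection (the function name is mine):

```python
from datetime import datetime

def project_monthly(spend_to_date: float, start: datetime,
                    now: datetime) -> float:
    """Linear month-end projection; max(1, days) avoids a
    divide-by-zero on the budget's first day."""
    days_elapsed = max(1, (now - start).days)
    return spend_to_date * 30 / days_elapsed

# $100 spent in the first 10 days projects to $300/month
print(project_monthly(100.0, datetime(2024, 1, 1), datetime(2024, 1, 11)))
```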

Best Practices Summary

Strategy              Impact                  Implementation Complexity
--------              ------                  -------------------------
Head Sampling         70-90% reduction        Low
Tail Sampling         50-80% reduction        Medium
Log Level Filtering   30-50% reduction        Low
Retention Policies    40-60% reduction        Low
Compression           5-15x storage savings   Medium
Tiered Storage        50-80% reduction        High
Adaptive Sampling     40-70% reduction        High
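When stages are independent, their reductions compound multiplicatively rather than adding, since what survives is the product of what each stage keeps. A quick check (an idealization; real stages overlap):

```python
def combined_reduction(*stage_reductions: float) -> float:
    """Total reduction across independent stages: what remains is
    the product of each stage's kept fraction."""
    kept = 1.0
    for r in stage_reductions:
        kept *= 1.0 - r
    return 1.0 - kept

# 80% from head sampling, then 50% from shorter retention:
print(f"{combined_reduction(0.8, 0.5):.0%} overall reduction")
```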
