Introduction
Observability infrastructure can become a major cost driver at scale. This article covers strategies to optimize observability costs while maintaining actionable insights.
Key Statistics:
- Observability costs: 15-30% of cloud spend at scale
- Sampling can reduce costs by 70-90%
- Compression: 5-10x reduction in storage
- Retention optimization: 40-60% storage savings
Cost Breakdown
Observability Cost Components

Metrics (Prometheus/DataDog)
├── Storage: $0.01-0.03/GB/month
├── Ingestion: $0.01-0.03/10K samples
├── Retention: $0.01-0.02/GB/month per additional 30 days
└── Queries: $0.001-0.005/query

Logs (Loki/ELK/Splunk)
├── Ingestion: $0.50-2.00/GB
├── Storage: $0.02-0.10/GB/month
├── Indexing: $0.50-5.00/GB
└── Retention: $0.01-0.05/GB/month per 30 days

Traces (Jaeger/Zipkin/DataDog)
├── Ingestion: $0.50-5.00/MB
├── Storage: $0.05-0.20/GB/month
└── Retention: $0.02-0.10/GB/month per 30 days

Infrastructure
├── Compute (collectors, processors)
├── Network (data transfer)
└── Management (RBAC, dashboards)
Sampling Strategies
Head Sampling
#!/usr/bin/env python3
"""Head sampling - sample at the start of the pipeline."""
import random
import time
from typing import Callable, Any
from dataclasses import dataclass
@dataclass
class HeadSamplingConfig:
    """Head sampling configuration.

    Tuning knobs read by HeadSampler.should_sample; the defaults keep 10%
    of ordinary traffic while always keeping priority and slow traces.
    """
    sampling_rate: float = 0.1    # fraction of ordinary traces to keep (0.0-1.0)
    min_duration_ms: int = 100    # traces longer than this are always kept
    priority_sample: bool = True  # always keep error/high-latency/tagged traces
    random_seed: int = 42         # seed for reproducible sampling decisions
class HeadSampler:
    """Simple probabilistic sampling at ingestion (head of the pipeline).

    Priority traces (server errors, very slow, explicitly tagged) and
    traces longer than ``config.min_duration_ms`` are always kept; the
    rest are kept with probability ``config.sampling_rate``.
    """

    def __init__(self, config: HeadSamplingConfig):
        self.config = config
        # Fix: the original called random.seed(), which clobbers the
        # process-global RNG for every other user. Use a private instance.
        self._rng = random.Random(config.random_seed)
        self.total_seen = 0
        self.total_sampled = 0

    def should_sample(self, trace: dict) -> bool:
        """Return True if `trace` should be kept; updates the counters."""
        self.total_seen += 1
        # Always sample priority traces (errors, very slow, tagged high).
        if self.config.priority_sample and self._is_priority_trace(trace):
            self.total_sampled += 1
            return True
        # Always sample traces above the duration threshold.
        if self._get_duration_ms(trace) > self.config.min_duration_ms:
            self.total_sampled += 1
            return True
        # Probabilistic sampling for everything else.
        if self._rng.random() < self.config.sampling_rate:
            self.total_sampled += 1
            return True
        return False

    def _is_priority_trace(self, trace: dict) -> bool:
        """Check if trace is high priority (error, high latency, tagged)."""
        if trace.get('status_code', 200) >= 500:  # server error
            return True
        if self._get_duration_ms(trace) > 5000:   # > 5s
            return True
        if trace.get('priority') == 'high':       # explicit user tag
            return True
        return False

    def _get_duration_ms(self, trace: dict) -> float:
        """Trace duration in milliseconds; 0 when timestamps are missing."""
        if 'start_time' in trace and 'end_time' in trace:
            return (trace['end_time'] - trace['start_time']) * 1000
        return 0

    def get_stats(self) -> dict:
        """Sampling statistics (denominator clamped to avoid div-by-zero)."""
        return {
            'total_seen': self.total_seen,
            'total_sampled': self.total_sampled,
            'sample_rate': self.total_sampled / max(1, self.total_seen)
        }
Tail-Based Sampling
#!/usr/bin/env python3
"""Tail-based sampling - sample after collecting full trace."""
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from collections import defaultdict
import threading
import time
@dataclass
class TraceData:
    """All spans collected for one trace, plus aggregate metadata."""
    trace_id: str
    spans: List[Dict] = field(default_factory=list)
    start_time: float = 0
    end_time: float = 0
    tags: Dict = field(default_factory=dict)

    @property
    def duration_ms(self) -> float:
        """Wall-clock trace duration in milliseconds."""
        elapsed_sec = self.end_time - self.start_time
        return elapsed_sec * 1000

    @property
    def error_count(self) -> int:
        """Number of spans that ended with a 5xx status code."""
        errors = 0
        for span in self.spans:
            if span.get('status_code', 200) >= 500:
                errors += 1
        return errors
class TailSamplingPolicy:
    """Composable tail-based sampling policies.

    Each policy is a predicate over a complete trace; a trace is kept if
    ANY registered policy matches (OR semantics).
    """

    def __init__(self):
        # (name, predicate) pairs, evaluated in insertion order.
        self.policies = []

    def add_error_policy(self, error_rate_threshold: float = 0.1):
        """Sample traces whose span error rate meets the threshold.

        Fix: the original ignored `error_rate_threshold` entirely and
        sampled any trace containing at least one error span.
        """
        def policy(trace: TraceData) -> bool:
            if not trace.spans:
                return False
            return (trace.error_count / len(trace.spans)) >= error_rate_threshold
        self.policies.append(('error', policy))

    def add_latency_policy(self, latency_threshold_ms: float):
        """Sample traces exceeding the latency threshold."""
        def policy(trace: TraceData) -> bool:
            return trace.duration_ms > latency_threshold_ms
        self.policies.append(('latency', policy))

    def add_probabilistic_policy(self, sample_rate: float):
        """Keep a uniform random fraction of traces."""
        import random
        def policy(trace: TraceData) -> bool:
            return random.random() < sample_rate
        self.policies.append(('probabilistic', policy))

    def add_rare_attribute_policy(self, attributes: List[str]):
        """Sample traces carrying attribute values seen fewer than 10 times."""
        seen_values = defaultdict(int)
        def policy(trace: TraceData) -> bool:
            for attr in attributes:
                value = trace.tags.get(attr)
                if value:
                    seen_values[value] += 1
                    # Keep while the value is still rare: its first
                    # 10 sightings (absolute count, not a percentage).
                    if seen_values[value] < 10:
                        return True
            return False
        self.policies.append(('rare_attribute', policy))

    def should_sample(self, trace: TraceData) -> bool:
        """Return True if any registered policy matches the trace."""
        return any(policy(trace) for _, policy in self.policies)
class TailSamplingProcessor:
    """Buffers spans per trace and applies tail-based sampling.

    Spans accumulate in `buffer` keyed by trace_id; a daemon thread
    periodically flushes traces that have been idle for a few seconds,
    exporting sampled ones and dropping the rest.
    """

    # A trace is considered complete after this many idle seconds.
    COMPLETION_IDLE_SEC = 5

    def __init__(self, buffer_size: int = 10000,
                 flush_interval_sec: int = 10):
        self.buffer: Dict[str, TraceData] = {}
        self.policy = TailSamplingPolicy()
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval_sec
        self.lock = threading.Lock()
        self.sampled_count = 0
        self.dropped_count = 0
        # Start background flush
        self._start_flush_thread()

    def add_span(self, span: Dict):
        """Add a span to its trace's buffer (thread-safe).

        Fix: spans without a trace_id are now ignored - the original
        buffered them all under a shared None key, silently merging
        unrelated spans into one fake trace.
        """
        trace_id = span.get('trace_id')
        if trace_id is None:
            return
        ts = span.get('timestamp', time.time())
        with self.lock:
            trace = self.buffer.get(trace_id)
            if trace is None:
                trace = TraceData(trace_id=trace_id,
                                  start_time=ts, end_time=ts)
                self.buffer[trace_id] = trace
            trace.spans.append(span)
            # Fix: min/max so out-of-order span arrival cannot corrupt
            # the trace duration (original overwrote end_time blindly).
            trace.start_time = min(trace.start_time, ts)
            trace.end_time = max(trace.end_time, ts)
            trace.tags.update(span.get('tags', {}))

    def _start_flush_thread(self):
        """Start the daemon thread that periodically flushes traces."""
        # (Fix: dropped the redundant `import threading`; the module
        # already imports it at the top.)
        def flush_loop():
            while True:
                time.sleep(self.flush_interval)
                self._flush_completed_traces()
        threading.Thread(target=flush_loop, daemon=True).start()

    def _flush_completed_traces(self):
        """Sample-or-drop every trace idle for COMPLETION_IDLE_SEC."""
        current_time = time.time()
        with self.lock:
            to_remove = []
            for trace_id, trace in self.buffer.items():
                if current_time - trace.end_time > self.COMPLETION_IDLE_SEC:
                    if self.policy.should_sample(trace):
                        self._send_to_backend(trace)
                        self.sampled_count += 1
                    else:
                        self.dropped_count += 1
                    to_remove.append(trace_id)
            for trace_id in to_remove:
                del self.buffer[trace_id]

    def _send_to_backend(self, trace: TraceData):
        """Export a sampled trace (Jaeger/Zipkin/OTLP in a real system)."""
        pass

    def get_stats(self) -> dict:
        """Counters describing sampling behavior so far."""
        decided = self.sampled_count + self.dropped_count
        return {
            'buffer_size': len(self.buffer),
            'sampled_count': self.sampled_count,
            'dropped_count': self.dropped_count,
            'sample_rate': self.sampled_count / max(1, decided)
        }
OpenTelemetry Sampling
# OpenTelemetry Collector sampling configuration
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # Batching reduces export overhead. (Fix: the metrics pipeline below
  # referenced `batch`, but the processor was never defined.)
  batch:
    timeout: 5s
    send_batch_size: 8192

  # Probabilistic (head) sampling
  probabilistic_sampler:
    hash_seed: 0
    sampling_percentage: 10.0

  # Tail-based sampling
  tail_sampling:
    decision_wait: 10s
    policies:
      - name: errors-policy
        type: status_code
        status_code: {status_codes: [ERROR]}
      - name: slow-traces-policy
        type: latency
        latency: {threshold_ms: 1000}
      - name: probabilistic-policy
        type: probabilistic
        probabilistic: {sampling_percentage: 10}
      # Keep traces that are BOTH errored AND slower than 500 ms.
      # (Fix: the original used a duplicated `and_policy` key - invalid
      # YAML; the collector expects a single `and_sub_policy` list of
      # named sub-policies.)
      - name: errors-and-slow-policy
        type: and
        and:
          and_sub_policy:
            - name: errors-sub-policy
              type: status_code
              status_code: {status_codes: [ERROR]}
            - name: slow-sub-policy
              type: latency
              latency: {threshold_ms: 500}

exporters:
  otlp:
    endpoint: "https://tempo.example.com:4317"
    tls:
      insecure: false
  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [tail_sampling]
      exporters: [otlp]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus]
Log Sampling
#!/usr/bin/env python3
"""Intelligent log sampling strategies."""
import hashlib
import json
import logging
from typing import Optional
from enum import Enum
class LogLevel(Enum):
    """Log severity levels.

    Values match the stdlib `logging` module constants so entries can be
    compared against `logging.DEBUG` .. `logging.CRITICAL` directly.
    """
    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50
class LogSampler:
    """Deterministic sampling for high-volume log streams.

    Hash-based, so the decision is stable: the same trace_id/message
    pair is always kept or always dropped, keeping all log lines of a
    given request together across replicas.
    """

    # Hash bucket count. Fix: the original used `% 100`, which rounds
    # any sample rate below 1% up to a whole percent; this supports
    # rates down to 1e-6.
    _BUCKETS = 1_000_000

    def __init__(self, sample_rate: float = 0.1):
        self.sample_rate = sample_rate  # fraction of logs to keep, 0.0-1.0

    def should_sample(self, log: dict) -> bool:
        """Return True if this log entry should be kept."""
        # Use trace_id + message so sampling is consistent per request.
        content = log.get('trace_id', '') + log.get('message', '')
        # md5 here is a cheap, stable hash - not a security mechanism.
        hash_val = int(hashlib.md5(content.encode()).hexdigest(), 16)
        return (hash_val % self._BUCKETS) < (self.sample_rate * self._BUCKETS)
class AdaptiveLogSampler:
    """Adjust the sampling rate to hold log volume near a target RPS."""

    def __init__(self, target_rps: int = 1000):
        self.target_rps = target_rps
        self.current_rps = 0
        self.sample_rate = 1.0
        self.window_size = 60     # measurement window, seconds
        self.log_counts = []      # recent per-window RPS observations
        self.max_history = 100    # observations retained for averaging

    def update(self, current_rps: int):
        """Record an RPS observation and nudge the sample rate."""
        self.current_rps = current_rps
        self.log_counts.append(current_rps)
        while len(self.log_counts) > self.max_history:
            self.log_counts.pop(0)
        avg_rps = sum(self.log_counts) / len(self.log_counts)
        if avg_rps > self.target_rps:
            # Too loud: back off by 10%, floored at 1%.
            self.sample_rate = max(0.01, self.sample_rate * 0.9)
        elif avg_rps < self.target_rps * 0.8:
            # Comfortably under target: ramp back up, capped at 100%.
            self.sample_rate = min(1.0, self.sample_rate * 1.1)

    def should_sample(self, log: dict) -> bool:
        """Keep all ERROR/CRITICAL logs; sample the rest probabilistically."""
        if log.get('level') in ('ERROR', 'CRITICAL'):
            return True
        import random
        return random.random() < self.sample_rate
class StructuredLogFilter:
    """Filter structured logs: drop noisy fields, truncate oversized ones.

    Fix: `always_keep_keys` was declared but never consulted; it now
    exempts essential fields from both dropping and truncation.
    """

    def __init__(self):
        # Essential fields: never dropped or truncated.
        self.always_keep_keys = {'level', 'timestamp', 'trace_id',
                                 'service', 'error'}
        # Payload fields dropped outright (large, rarely useful).
        self.drop_keys = {'http_request_body', 'http_response_body'}
        # Fields truncated to a maximum length (key -> max chars).
        self.truncate_keys = {'stack_trace': 500, 'message': 1000}

    def filter_log(self, log: dict) -> Optional[dict]:
        """Return a filtered copy of `log`, or None to drop it entirely."""
        # DEBUG logs are dropped wholesale.
        if log.get('level') == 'DEBUG':
            return None
        filtered = {}
        for key, value in log.items():
            # Essential keys bypass dropping and truncation.
            if key not in self.always_keep_keys:
                if key in self.drop_keys:
                    continue
                if key in self.truncate_keys:
                    max_len = self.truncate_keys[key]
                    if isinstance(value, str) and len(value) > max_len:
                        value = value[:max_len] + '...'
            filtered[key] = value
        return filtered
Retention Policies
Prometheus Retention
# Prometheus retention configuration
# NOTE(review): in a real deployment the `storage` retention settings below
# are command-line flags (--storage.tsdb.retention.time etc.), not
# prometheus.yml keys - shown inline here for illustration; verify against
# your Prometheus version before copying.
global:
  scrape_interval: 15s          # how often targets are scraped
  evaluation_interval: 15s      # how often rules are evaluated
  external_labels:              # attached to all series on remote write
    cluster: 'production'
    environment: 'prod'
storage:
  # Time series retention window
  tsdb.retention.time: 15d
  # Maximum block size
  tsdb.max-block.duration: 2h
  # WAL (Write-Ahead Log) compression reduces disk usage
  wal_compression: true
# Remote write configuration for long-term storage
remote_write:
  - url: https://remote-storage.example.com/api/v1/write
    tls_config:
      cert_file: /etc/prometheus/certs/client.crt
      key_file: /etc/prometheus/certs/client.key
    basic_auth:
      username: prometheus
      # NOTE(review): prefer `password_file` over an inline secret in
      # version-controlled config.
      password: secret
    queue_config:
      capacity: 10000            # samples buffered per shard
      max_shards: 30
      min_shards: 1
      max_samples_per_send: 500
      batch_send_deadline: 5s
# Rule evaluation and recording
rule_files:
  - /etc/prometheus/rules/*.yml
Loki Retention
# Loki retention configuration
schema_config:
  configs:
    - from: 2024-01-01
      store: boltdb-shipper
      object_store: s3
      index:
        prefix: loki_index_
        period: 24h

limits_config:
  # Retention period for log data
  retention_period: 15d
  # Per-tenant ingestion and query limits.
  # (Fix: these keys live directly under limits_config; the original
  # nested them under a non-existent `per_tenant_limits` key.)
  ingestion_rate_mb: 50
  ingestion_burst_size_mb: 100
  max_global_streams_per_tenant: 10000
  max_chunks_per_query: 2000000
  max_entries_limit_per_query: 5000

storage_config:
  boltdb:
    directory: /loki/index
  aws:
    # Fix: the original URL was garbled ("s3://.amazonaws.com/...").
    # Loki's shorthand format is s3://<region>/<bucket-name>.
    s3: s3://us-east-1/loki-storage
    s3forcepathstyle: false

compactor:
  working_directory: /loki/compactor
  retention_enabled: true
  deletion_mode: filter-and-delete
  # Fix: key name is retention_delete_worker_count (original had
  # "retention_delete_work_count", which Loki would reject).
  retention_delete_worker_count: 150
Compression Strategies
Time Series Compression
#!/usr/bin/env python3
"""Time series data compression."""
import struct
import zlib
from typing import List, Tuple
class TimeSeriesCompressor:
    """Delta + varint compression for time series, with optional zlib.

    Fixes over the original:
    - negative deltas (out-of-order timestamps, decreasing values) are
      handled via zigzag encoding; the old varint loop silently corrupted
      any negative delta,
    - empty input no longer raises IndexError,
    - element counts are stored in a small header so `decompress` can
      invert `compress` exactly (values at 4-decimal precision).
    """

    # Fixed-point scale: values are stored with 4 decimal places.
    _VALUE_SCALE = 10000

    def __init__(self):
        self.delta_encoding = True
        self.zlib_compression = True

    def compress(self, timestamps: List[int],
                 values: List[float]) -> bytes:
        """Compress parallel timestamp/value lists into bytes."""
        out = bytearray()
        # Header: element counts so decompress knows where to split.
        out.extend(self._encode_varint(len(timestamps)))
        out.extend(self._encode_varint(len(values)))
        # Delta-encode timestamps (first delta is vs. 0, i.e. absolute).
        prev_ts = 0
        for ts in timestamps:
            out.extend(self._encode_signed_varint(ts - prev_ts))
            prev_ts = ts
        # Delta-encode fixed-point values (4 decimal places).
        prev_val = 0
        for val in values:
            val_int = round(val * self._VALUE_SCALE)
            out.extend(self._encode_signed_varint(val_int - prev_val))
            prev_val = val_int
        # Optional zlib compression on top of the delta stream.
        if self.zlib_compression:
            return zlib.compress(bytes(out))
        return bytes(out)

    def decompress(self, data: bytes) -> Tuple[List[int], List[float]]:
        """Invert compress(); returns (timestamps, values)."""
        if self.zlib_compression:
            data = zlib.decompress(data)
        pos = 0
        n_ts, pos = self._decode_varint(data, pos)
        n_val, pos = self._decode_varint(data, pos)
        timestamps: List[int] = []
        prev = 0
        for _ in range(n_ts):
            delta, pos = self._decode_signed_varint(data, pos)
            prev += delta
            timestamps.append(prev)
        values: List[float] = []
        prev = 0
        for _ in range(n_val):
            delta, pos = self._decode_signed_varint(data, pos)
            prev += delta
            values.append(prev / self._VALUE_SCALE)
        return timestamps, values

    def _encode_varint(self, value: int) -> bytes:
        """LEB128-style varint for a non-negative integer."""
        result = bytearray()
        while value > 0x7F:
            result.append((value & 0x7F) | 0x80)
            value >>= 7
        result.append(value & 0x7F)
        return bytes(result)

    def _encode_signed_varint(self, value: int) -> bytes:
        """Zigzag-map a signed int to unsigned, then varint-encode it."""
        if value >= 0:
            zigzag = value << 1
        else:
            zigzag = ((-value) << 1) - 1
        return self._encode_varint(zigzag)

    def _decode_varint(self, data: bytes, pos: int) -> Tuple[int, int]:
        """Decode one varint at `pos`; returns (value, next_pos)."""
        result = 0
        shift = 0
        while True:
            byte = data[pos]
            pos += 1
            result |= (byte & 0x7F) << shift
            if not byte & 0x80:
                return result, pos
            shift += 7

    def _decode_signed_varint(self, data: bytes, pos: int) -> Tuple[int, int]:
        """Decode one zigzag-encoded varint; returns (value, next_pos)."""
        zigzag, pos = self._decode_varint(data, pos)
        if zigzag & 1:
            return -((zigzag + 1) >> 1), pos
        return zigzag >> 1, pos
class PrometheusTSDBCompression:
    """Prometheus TSDB compression / storage sizing helpers."""

    # Typical compression ratios relative to raw 16-byte samples.
    COMPRESSION_RATIOS = {
        'raw': 1.0,
        'xor': 8.0,
        'lz4': 10.0,
        'zstd': 15.0,
        'snappy': 12.0
    }

    # Suffix -> seconds multiplier for scrape-interval strings.
    _INTERVAL_UNITS = {'s': 1, 'm': 60, 'h': 3600}

    @staticmethod
    def estimate_storage(metrics: int,
                         retention_days: int,
                         scrape_interval: str = '15s') -> dict:
        """Estimate raw/compressed storage for a metric population.

        Args:
            metrics: number of active time series.
            retention_days: retention window in days.
            scrape_interval: e.g. '15s', '1m', '2h'. (Fix: the original
                only understood an 's' suffix and silently mis-parsed
                anything else.)

        Returns:
            dict with sample counts, raw/compressed sizes in GiB, and a
            rough monthly storage cost at $0.02/GB.

        Raises:
            ValueError: if the interval suffix is not s/m/h.
        """
        unit = scrape_interval[-1:]
        multiplier = PrometheusTSDBCompression._INTERVAL_UNITS.get(unit)
        if multiplier is None:
            raise ValueError(f"unsupported scrape interval: {scrape_interval!r}")
        seconds = int(scrape_interval[:-1]) * multiplier
        # Samples stored per series over the whole retention window.
        samples_per_metric = (86400 / seconds) * retention_days
        bytes_per_sample = 16  # typical pre-compression footprint
        raw_size = metrics * samples_per_metric * bytes_per_sample
        return {
            'metrics_count': metrics,
            'samples_per_metric': samples_per_metric,
            'raw_size_gb': raw_size / (1024**3),
            'compressed_size_gb': raw_size / (1024**3) / 10,  # assume 10x
            'estimated_cost_monthly': (raw_size / (1024**3) / 10) * 0.02
        }
Cost Monitoring
#!/usr/bin/env python3
"""Monitor and alert on observability costs."""
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime, timedelta
import json
@dataclass
class CostAlert:
    """Cost alert configuration."""
    metric_name: str                    # component/metric this alert watches
    threshold_gb: float                 # absolute usage threshold in GB
    warning_threshold_pct: float = 0.8  # warn at this fraction of the threshold
    alert_type: str = "budget"          # alert category label
class ObservabilityCostMonitor:
    """Track observability spend against per-component monthly budgets."""

    def __init__(self):
        # $ per GB per month for each backend.
        self.cost_per_gb = {
            'prometheus_metrics': 0.023,
            'loki_logs': 0.10,
            'jaeger_traces': 0.15,
            'tempo_traces': 0.12,
            'datadog': 0.023,
        }
        self.budgets = {}
        self.alerts: List[CostAlert] = []

    def set_budget(self, component: str, monthly_budget_usd: float):
        """Register a monthly budget for a component."""
        self.budgets[component] = {
            'monthly_budget': monthly_budget_usd,
            'current_spend': 0,
            'start_date': datetime.now()
        }

    def record_usage(self, component: str,
                     gb_used: float,
                     period_days: int = 30):
        """Record GB usage over `period_days`; returns monthly-equivalent cost.

        Unknown components fall back to $0.10/GB/month.
        """
        # Normalize the observed usage to a 30-day month.
        monthly_gb = gb_used * (30 / period_days)
        cost = monthly_gb * self.cost_per_gb.get(component, 0.10)
        if component in self.budgets:
            self.budgets[component]['current_spend'] = cost
        return cost

    def check_alerts(self, component: str) -> List[dict]:
        """Return budget alerts for a component (empty list if none fire)."""
        if component not in self.budgets:
            return []
        budget = self.budgets[component]
        # Fix: the original read budget['current_sudget'] (typo), which
        # raised KeyError on every call.
        spend = budget['current_spend']
        limit = budget['monthly_budget']
        alerts = []
        if spend > limit:
            alerts.append({
                'severity': 'critical',
                'message': f'{component} over budget: ${spend:.2f} > ${limit:.2f}'
            })
        elif spend > limit * 0.8:
            alerts.append({
                'severity': 'warning',
                'message': f'{component} at 80% budget: ${spend:.2f} / ${limit:.2f}'
            })
        return alerts

    def generate_report(self) -> dict:
        """Summarize spend, utilization, and a monthly projection per component."""
        report = {
            'generated_at': datetime.now().isoformat(),
            'budgets': {},
            'total_spend': 0
        }
        for component, budget in self.budgets.items():
            spend = budget['current_spend']
            limit = budget['monthly_budget']
            # Fix: clamp elapsed days to >= 1 so a report generated on the
            # budget's start day no longer divides by zero.
            elapsed_days = max(1, (datetime.now() - budget['start_date']).days)
            report['budgets'][component] = {
                'current_spend': spend,
                'monthly_budget': limit,
                'utilization_pct': (spend / limit * 100) if limit > 0 else 0,
                'projected_monthly': spend * (30 / elapsed_days)
            }
            report['total_spend'] += spend
        return report
Best Practices Summary
| Strategy | Impact | Implementation Complexity |
|---|---|---|
| Head Sampling | 70-90% reduction | Low |
| Tail Sampling | 50-80% reduction | Medium |
| Log Level Filtering | 30-50% reduction | Low |
| Retention Policies | 40-60% reduction | Low |
| Compression | 5-15x storage savings | Medium |
| Tiered Storage | 50-80% reduction | High |
| Adaptive Sampling | 40-70% reduction | High |
External Resources
- OpenTelemetry Sampling Documentation
- Prometheus Storage
- Loki Retention
- Google Cloud Operations Pricing
Related Articles
- Metrics Collection: Prometheus, InfluxDB, Telegraf
- Log Aggregation: ELK Stack, Loki, Splunk
- Alerting Strategy: Alert Fatigue, Runbooks
- SLO Implementation: Error Budgets
Comments