## Introduction
Observability enables understanding of system behavior from its outputs. In distributed systems, where requests span multiple services, observability is essential for debugging, performance optimization, and maintaining reliability. The three pillars of observability—logs, metrics, and traces—provide complementary views into system behavior.
This guide covers structured logging, metrics collection with Prometheus, distributed tracing with OpenTelemetry, and alerting strategies for building observable systems.
## Structured Logging
### JSON Logging
Structured logging produces machine-readable log entries, enabling powerful querying and analysis.
import json
import logging
import sys
from datetime import datetime
from typing import Any, Dict
from contextvars import ContextVar
class StructuredLogger:
    """Structured JSON logger.

    Wraps a named ``logging`` logger so that every call emits one JSON object
    per line on stdout. Each entry carries ``timestamp``, ``level`` and
    ``message``, followed by the key/value pairs stored via
    :meth:`set_context` and the keyword arguments of the logging call
    (call-site keywords override context values with the same key).
    """

    class _JsonFormatter(logging.Formatter):
        """Formatter that passes the pre-serialised JSON message through."""

        def format(self, record):
            return record.getMessage()

    def __init__(self, name: str = "app"):
        """Create (or re-attach to) the logger called *name*.

        Args:
            name: Name handed to ``logging.getLogger``.
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # logging.getLogger() returns the same object for the same name, so a
        # second StructuredLogger(name) must not stack another handler on it;
        # otherwise every entry would be written once per instance.
        already_attached = any(
            isinstance(existing.formatter, self._JsonFormatter)
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(self._JsonFormatter())
            self.logger.addHandler(handler)
        self.context: ContextVar[Dict] = ContextVar("log_context", default={})

    def set_context(self, key: str, value: Any) -> None:
        """Set context variable for logging.

        Args:
            key: Field name added to every subsequent log entry.
            value: Value stored under *key*.
        """
        # Store a fresh dict rather than mutating the one returned by get():
        # that object may be the ContextVar's shared default, and mutating it
        # would leak this key into every context that never set a value.
        self.context.set({**self.context.get(), key: value})

    def clear_context(self) -> None:
        """Clear logging context."""
        self.context.set({})

    def log(self, level: str, message: str, **kwargs) -> None:
        """Log with structured data.

        Args:
            level: Level name such as ``"INFO"``; its lower-cased form selects
                the method called on the underlying logger.
            message: Human-readable message stored under ``"message"``.
            **kwargs: Extra fields merged into the entry.

        Raises:
            AttributeError: If the logger has no method named after *level*.
        """
        # Local import: the file's top-level import only brings in `datetime`.
        from datetime import timezone

        log_data = {
            # Naive UTC timestamp, i.e. the same text datetime.utcnow() gave,
            # without relying on that deprecated call.
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "level": level,
            "message": message,
            **self.context.get(),
            **kwargs,
        }
        # default=str keeps a non-JSON value (datetime, UUID, exception, ...)
        # from turning a log call into a TypeError at the call site.
        getattr(self.logger, level.lower())(json.dumps(log_data, default=str))

    def info(self, message: str, **kwargs) -> None:
        """Log *message* at INFO level."""
        self.log("INFO", message, **kwargs)

    def error(self, message: str, **kwargs) -> None:
        """Log *message* at ERROR level."""
        self.log("ERROR", message, **kwargs)

    def warning(self, message: str, **kwargs) -> None:
        """Log *message* at WARNING level."""
        self.log("WARNING", message, **kwargs)

    def debug(self, message: str, **kwargs) -> None:
        """Log *message* at DEBUG level (dropped while the logger is at INFO)."""
        self.log("DEBUG", message, **kwargs)
# Usage: attach request-scoped fields once, then log an event with extra data.
logger = StructuredLogger("api")
logger.set_context(key="request_id", value="req-123")
logger.set_context(key="user_id", value="user-456")
logger.info(message="User action", action="login", ip="192.168.1.1")
### Log Correlation
import uuid
from functools import wraps
class RequestContext:
    """Manage request-scoped context for logging.

    The identifiers live in class-level ``ContextVar`` objects; each one is
    ``None`` until a value has been set in the current context.
    """

    request_id: ContextVar[str] = ContextVar("request_id", default=None)
    user_id: ContextVar[str] = ContextVar("user_id", default=None)
    span_id: ContextVar[str] = ContextVar("span_id", default=None)

    @classmethod
    def start_request(cls) -> str:
        """Start a new request context.

        Returns:
            The generated id: ``"req-"`` followed by 12 hex characters.
        """
        request_id = f"req-{uuid.uuid4().hex[:12]}"
        cls.request_id.set(request_id)
        return request_id

    @classmethod
    def get_context(cls) -> Dict:
        """Get current context.

        Returns:
            A dict with the keys ``request_id``, ``user_id`` and ``span_id``.
        """
        return {
            "request_id": cls.request_id.get(),
            "user_id": cls.user_id.get(),
            "span_id": cls.span_id.get(),
        }

    @classmethod
    def clear_context(cls) -> None:
        """Reset every identifier to ``None``.

        ``with_request_context`` calls this when the wrapped function
        finishes; without it the decorator failed with ``AttributeError``.
        """
        for variable in (cls.request_id, cls.user_id, cls.span_id):
            variable.set(None)


def with_request_context(func):
    """Decorator to add request context to function calls.

    A new request id is generated before *func* runs and the whole request
    context is cleared afterwards, whether *func* returns or raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        RequestContext.start_request()
        try:
            return func(*args, **kwargs)
        finally:
            # Always clear, so ids never leak into the next unit of work.
            RequestContext.clear_context()
    return wrapper
## Metrics with Prometheus
### Metric Types
from prometheus_client import Counter, Histogram, Gauge, Summary, start_http_server
import random
from time import sleep
# Counter: a value that only ever goes up (here: requests served), split by
# HTTP method, endpoint and response status.
REQUEST_COUNT = Counter(
    name="http_requests_total",
    documentation="Total HTTP requests",
    labelnames=["method", "endpoint", "status"],
)

# Histogram: observations sorted into the fixed buckets below (in seconds).
REQUEST_LATENCY = Histogram(
    name="http_request_duration_seconds",
    documentation="HTTP request latency",
    labelnames=["method", "endpoint"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

# Gauge: a value that can be set to any number at any time.
ACTIVE_USERS = Gauge(
    name="active_users",
    documentation="Number of currently active users",
)

# Summary: also records observations (similar to a histogram), one series
# per order status.
ORDER_PROCESSING_TIME = Summary(
    name="order_processing_seconds",
    documentation="Time to process an order",
    labelnames=["status"],
)
class MetricsCollector:
    """Collect and expose application metrics.

    Thin facade over the module-level Prometheus metrics (``REQUEST_COUNT``,
    ``REQUEST_LATENCY``, ``ACTIVE_USERS``, ``ORDER_PROCESSING_TIME``).
    """

    def __init__(self, port: int = 8000):
        # Port handed to start_http_server() when start_server() is called.
        self.port = port

    def start_server(self):
        """Start the Prometheus metrics HTTP server on ``self.port``."""
        start_http_server(self.port)

    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        """Count one HTTP request and observe its latency.

        Args:
            method: HTTP method label value.
            endpoint: Endpoint label value.
            status: Response status; stored as a string label.
            duration: Value observed on the latency histogram.
        """
        route_labels = {"method": method, "endpoint": endpoint}
        request_counter = REQUEST_COUNT.labels(status=str(status), **route_labels)
        request_counter.inc()
        latency_histogram = REQUEST_LATENCY.labels(**route_labels)
        latency_histogram.observe(duration)

    def set_active_users(self, count: int):
        """Overwrite the active-user gauge with *count*."""
        ACTIVE_USERS.set(count)

    def record_order_processing(self, status: str, duration: float):
        """Observe *duration* on the order-processing summary for *status*."""
        order_timer = ORDER_PROCESSING_TIME.labels(status=status)
        order_timer.observe(duration)
### Custom Business Metrics
from prometheus_client import Counter, Gauge, Info
# Business-specific metrics

# Sign-ups, split by country and acquisition source.
USER_REGISTRATION_COUNT = Counter(
    name="user_registrations_total",
    documentation="Total user registrations",
    labelnames=["country", "source"],
)

# Running total of order value, split by currency and order status.
ORDER_VALUE = Counter(
    name="order_value_total",
    documentation="Total order value",
    labelnames=["currency", "status"],
)

# Latest abandonment rate; a gauge because it is overwritten, not accumulated.
CART_ABANDONMENT_RATE = Gauge(
    name="cart_abandonment_rate",
    documentation="Rate of abandoned shopping carts",
)

# NOTE(review): labelling by product_id yields one time series per product;
# confirm the catalogue is small enough for that cardinality.
PRODUCT_VIEW_COUNT = Counter(
    name="product_views_total",
    documentation="Total product views",
    labelnames=["category", "product_id"],
)
class BusinessMetrics:
    """Business-level metrics collection.

    Each method forwards to one of the module-level business metrics.
    """

    def __init__(self):
        # Not read by any method shown here; kept as an empty dict.
        self._info = {}

    def track_registration(self, country: str, source: str):
        """Count one user registration for *country* / *source*."""
        registrations = USER_REGISTRATION_COUNT.labels(country=country, source=source)
        registrations.inc()

    def track_order_value(self, amount: float, currency: str, status: str):
        """Add *amount* to the order-value total for *currency* / *status*."""
        order_total = ORDER_VALUE.labels(currency=currency, status=status)
        order_total.inc(amount)

    def set_cart_abandonment_rate(self, rate: float):
        """Overwrite the cart-abandonment gauge with *rate*."""
        CART_ABANDONMENT_RATE.set(rate)

    def track_product_view(self, category: str, product_id: str):
        """Count one view of *product_id* within *category*."""
        views = PRODUCT_VIEW_COUNT.labels(category=category, product_id=product_id)
        views.inc()
## Distributed Tracing
### OpenTelemetry Integration
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import requests
# Initialize tracing: describe this service, install the provider globally,
# then obtain the tracer and the W3C trace-context propagator used below.
# NOTE(review): no span processor/exporter is registered in the code shown
# here (BatchSpanProcessor is imported but unused) - confirm spans are
# exported somewhere else.
resource = Resource.create(attributes={"service.name": "order-service"})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(instrumenting_module_name=__name__)
propagator = TraceContextTextMapPropagator()
class TracingClient:
    """HTTP client with distributed tracing.

    Every request runs inside its own span and carries that span's trace
    context in the outgoing headers.
    """

    def __init__(self, base_url: str, timeout=None):
        """Store the connection settings.

        Args:
            base_url: Prefix prepended to every request path.
            timeout: Seconds passed to ``requests.get`` as ``timeout``. The
                default ``None`` keeps the previous behaviour (wait
                indefinitely); set a value to bound slow upstreams.
        """
        self.base_url = base_url
        self.timeout = timeout

    def get(self, path: str, headers: Dict = None) -> Dict:
        """GET request with trace propagation.

        Args:
            path: Path appended to ``base_url``.
            headers: Optional extra request headers; trace headers override
                entries with the same name.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: Whatever ``requests.get`` or ``response.json`` raises;
                it is recorded on the span and re-raised unchanged.
        """
        url = f"{self.base_url}{path}"
        # The exception is recorded explicitly below, so switch off the
        # context manager's own recording to avoid a duplicate span event.
        with tracer.start_as_current_span(
            "http GET",
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            span.set_attribute("http.method", "GET")
            span.set_attribute("http.url", url)
            # Inject only after the span is current: injecting earlier would
            # propagate the caller's context, and the downstream service
            # would not be parented to this HTTP span.
            carrier = {}
            propagator.inject(carrier)
            request_headers = {**(headers or {}), **carrier}
            try:
                response = requests.get(
                    url, headers=request_headers, timeout=self.timeout
                )
                span.set_attribute("http.status_code", response.status_code)
                if response.status_code >= 400:
                    span.set_status(Status(StatusCode.ERROR))
                return response.json()
            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise
### Trace-Based Operations
from contextlib import contextmanager
from typing import Generator
class TracingService:
    """Service for creating and managing traces."""

    def __init__(self, tracer):
        """Keep the tracer that all spans are created from.

        Args:
            tracer: Object providing ``start_as_current_span`` (an
                OpenTelemetry tracer in this document's examples).
        """
        self.tracer = tracer

    @contextmanager
    def create_span(
        self,
        name: str,
        attributes: Dict = None
    ) -> Generator:
        """Create a new span.

        The span is marked OK when the ``with`` body completes and ERROR
        (with the exception recorded) when it raises; the exception is then
        re-raised.

        Args:
            name: Span name.
            attributes: Optional attributes set on the span at creation.

        Yields:
            The active span.
        """
        # Status and exception are recorded explicitly below. Disable the
        # context manager's own handling, which would otherwise record the
        # same exception a second time on the span.
        with self.tracer.start_as_current_span(
            name,
            attributes=attributes or None,
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            try:
                yield span
                span.set_status(Status(StatusCode.OK))
            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise

    def create_child_span(self, name: str, attributes: Dict = None):
        """Create a child span from current context.

        Delegates to :meth:`create_span`; the new span is parented to
        whichever span is current when the context manager is entered.
        """
        return self.create_span(name, attributes)
## Alerting
### Alert Rules and Notification
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, List
@dataclass
class AlertRule:
    """Declarative description of one alert."""

    # Identifier stored as "rule_name" on fired alerts.
    name: str
    # Zero-argument callable; a truthy result fires the alert.
    condition: Callable[[], bool]
    # One of: critical, warning, info.
    severity: str
    # Text copied onto the fired alert.
    message: str
    # Seconds to wait before the same rule may fire again.
    cooldown: int = 300
class AlertManager:
    """Manage alerts and notifications.

    Rules are evaluated on demand by :meth:`check_rules`; every fired alert
    is appended to ``alert_history`` and passed to each notification channel.
    """

    def __init__(self):
        self.rules: List["AlertRule"] = []
        self.alert_history: List[dict] = []
        self.notification_channels: List[Callable] = []

    def add_rule(self, rule: "AlertRule") -> None:
        """Add an alert rule."""
        self.rules.append(rule)

    def add_notification_channel(self, channel: Callable) -> None:
        """Add a notification channel (a callable taking the alert dict)."""
        self.notification_channels.append(channel)

    @staticmethod
    def _utcnow():
        """Return the current UTC time as a naive ``datetime``."""
        # Same value datetime.utcnow() produced, without the deprecated call.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _in_cooldown(self, rule, now) -> bool:
        """Return True if *rule* fired less than ``rule.cooldown`` seconds ago."""
        # total_seconds() covers the whole interval. The `.seconds` attribute
        # is only the sub-day remainder, so an alert from 24h10s ago would
        # look 10 seconds old and suppress the rule.
        return any(
            entry["rule_name"] == rule.name
            and (now - entry["fired_at"]).total_seconds() < rule.cooldown
            for entry in self.alert_history
        )

    def check_rules(self) -> List[dict]:
        """Check all rules and fire alerts.

        A rule is skipped while it is in cooldown. A condition that raises is
        logged and treated as not fired. A channel that raises is logged and
        the remaining channels are still notified.

        Returns:
            The alerts fired by this call, each a dict with ``rule_name``,
            ``severity``, ``message`` and ``fired_at`` (naive UTC).
        """
        log = logging.getLogger(__name__)
        fired = []
        for rule in self.rules:
            if self._in_cooldown(rule, self._utcnow()):
                continue
            try:
                triggered = rule.condition()
            except Exception:
                # Best effort: one broken rule must not stop the others, but
                # the failure has to be visible.
                log.exception("Alert rule %r: condition raised", rule.name)
                continue
            if not triggered:
                continue
            alert = {
                "rule_name": rule.name,
                "severity": rule.severity,
                "message": rule.message,
                "fired_at": self._utcnow(),
            }
            fired.append(alert)
            self.alert_history.append(alert)
            for channel in self.notification_channels:
                try:
                    channel(alert)
                except Exception:
                    log.exception(
                        "Alert rule %r: notification channel failed", rule.name
                    )
        return fired
# Example alert rules
def create_alert_rules(metrics_service) -> List[AlertRule]:
    """Create common alert rules.

    Args:
        metrics_service: Object providing ``get_error_rate()``,
            ``get_p99_latency()`` and ``get_availability()``; each is called
            only when the corresponding rule's condition is evaluated.

    Returns:
        Three rules: high error rate, high latency, low availability.
    """
    def error_rate_too_high():
        return metrics_service.get_error_rate() > 0.05

    def p99_latency_too_high():
        return metrics_service.get_p99_latency() > 2.0

    def availability_too_low():
        return metrics_service.get_availability() < 0.99

    error_rate_rule = AlertRule(
        name="high_error_rate",
        condition=error_rate_too_high,
        severity="critical",
        message="Error rate exceeds 5%",
    )
    latency_rule = AlertRule(
        name="high_latency",
        condition=p99_latency_too_high,
        severity="warning",
        message="P99 latency exceeds 2 seconds",
    )
    availability_rule = AlertRule(
        name="low_availability",
        condition=availability_too_low,
        severity="critical",
        message="Availability below 99%",
    )
    return [error_rate_rule, latency_rule, availability_rule]
## Prometheus Architecture
```text
┌─────────────────────────────────────────────────────────────┐
│                   Prometheus Architecture                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│        ┌──────────────────────────────────────────┐         │
│        │            Prometheus Server             │         │
│        │  ┌─────────┐  ┌─────────┐  ┌─────────┐   │         │
│        │  │ Scraper │  │  TSDB   │  │ Alerts  │   │         │
│        │  └─────────┘  └─────────┘  └─────────┘   │         │
│        └──────────────────────────────────────────┘         │
│            ▲                 ▲                 ▲            │
│            │                 │                 │            │
│      ┌─────┴─────┐     ┌─────┴─────┐     ┌─────┴─────┐      │
│      │  Targets  │     │  Alerts   │     │  Grafana  │      │
│      │(Exporters)│     │  Manager  │     │ Query UI  │      │
│      └───────────┘     └───────────┘     └───────────┘      │
│                                                             │
└─────────────────────────────────────────────────────────────┘
```
### Service Discovery
```yaml
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'kubernetes-applications'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
target_label: __address__
```
## Grafana Dashboards
### Dashboard Configuration
```json
{
"dashboard": {
"title": "Payment Service Overview",
"panels": [
{
"title": "Requests per Second",
"type": "graph",
"targets": [
{
"expr": "rate(http_requests_total[5m])",
"legendFormat": "{{method}} {{endpoint}}"
}
]
},
{
"title": "Error Rate",
"type": "stat",
"targets": [
{
"expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m]))",
"unit": "percentunit"
}
],
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{"value": 0, "color": "green"},
{"value": 0.01, "color": "yellow"},
{"value": 0.05, "color": "red"}
]
}
}
}
},
{
"title": "P99 Latency",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))",
"legendFormat": "P99"
}
]
}
]
}
}
```
### Alert Rules
```yaml
groups:
- name: payment-service
rules:
- alert: HighErrorRate
expr: |
sum(rate(payment_requests_total{status=~"5.."}[5m]))
/ sum(rate(payment_requests_total[5m])) > 0.05
for: 2m
labels:
severity: critical
annotations:
summary: "Payment service error rate is high"
description: "{{ $value | humanizePercentage }} error rate"
- alert: HighLatency
expr: |
histogram_quantile(0.99,
rate(payment_request_duration_seconds_bucket[5m])) > 1
for: 5m
labels:
severity: warning
annotations:
summary: "Payment service latency is high"
description: "P99 latency is {{ $value | humanizeDuration }}"
```
## Log Aggregation with Loki
### Loki Configuration
```yaml
# loki-config.yaml
server:
http_listen_port: 3100
common:
path_prefix: /loki
storage:
filesystem:
chunks_directory: /loki/chunks
rules_directory: /loki/rules
replication_factor: 1
ring:
instance_addr: 127.0.0.1
kvstore:
store: inmemory
schema_config:
configs:
- from: 2026-01-01
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
limits_config:
reject_old_samples: true
reject_old_samples_max_age: 168h
```
### Log Queries
```logql
# Find errors in payment service
{job="payment-service"} |= "ERROR"
# Parse and filter
{job="payment-service"} | json | level="error" | duration > 1s
# Count by level
sum(count_over_time({job="payment-service"}[5m])) by (level)
# Latency percentiles
quantile_over_time(0.99, {job="api"} | json | unwrap duration(duration) [5m]) by (job)
```
## Dashboard Best Practices
### Key Metrics by Service
| Service | Key Metrics |
|---------|-------------|
| API | requests_total, request_duration_seconds, request_size_bytes, errors_total |
| Database | connections_active, query_duration_seconds, queries_total |
| Cache | hits_total, misses_total, evictions_total |
### Dashboard Guidelines
1. **Use red/green/yellow thresholds**: Clear visual indicators
2. **Show trends**: Include time series, not just current values
3. **Link to runbooks**: Include links from alerts to documentation
4. **Set appropriate ranges**: Match to your SLOs
5. **Use meaningful names**: Service + metric description
## Alert Design
### Alert Quality
```text
┌─────────────────────────────────────────────────────────────┐
│                 Good Alert Characteristics                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ✓ Actionable: Clear what to do                             │
│  ✓ Timely: Fires before users notice                        │
│  ✓ Accurate: Low false positive rate                        │
│  ✓ Relevant: Indicates real problem                         │
│  ✓ Prioritized: Severity matches impact                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘
```
### Runbook Integration
```yaml
annotations:
summary: "Database connection pool exhausted"
description: "{{ $value }} active connections"
runbook_url: "https://wiki.example.com/runbooks/db-connections"
# Runbook content
# # Database Connection Pool Exhaustion
#
# ## Symptoms
# - High number of active connections
# - Applications timing out on DB queries
#
# ## Impact
# - Failed database writes
# - User-facing errors
#
# ## Resolution
# 1. Check for long-running queries
# 2. Identify and kill blocking sessions
# 3. Scale up connection pool if needed
```
## Best Practices
1. **Start with SLOs**: Define what good looks like
2. **Alert on symptoms**: Not causes
3. **Use golden signals**: Latency, traffic, errors, saturation
4. **Keep dashboards focused**: One per service
5. **Automate remediation**: Where possible
## Conclusion
Observability is essential for operating distributed systems. Structured logging provides detailed records for debugging. Metrics offer quantitative understanding of system behavior. Traces reveal request flow across services. Together with alerting, they enable proactive monitoring and rapid incident response.
Key practices: implement all three pillars of observability, use consistent context propagation, establish meaningful alert thresholds, and invest in dashboards that provide actionable insights. Observability is not a feature but an operational capability.
## Resources
- "Observability Engineering" by Charity Majors
- Prometheus Documentation
- OpenTelemetry Documentation
- "Site Reliability Engineering" by Google
Comments