Introduction
Observability enables understanding of system behavior from its outputs. In distributed systems, where requests span multiple services, observability is essential for debugging, performance optimization, and maintaining reliability. The three pillars of observability—logs, metrics, and traces—provide complementary views into system behavior.
This guide covers structured logging, metrics collection with Prometheus, distributed tracing with OpenTelemetry, and alerting strategies for building observable systems.
Structured Logging
JSON Logging
Structured logging produces machine-readable log entries, enabling powerful querying and analysis.
import json
import logging
import sys
from datetime import datetime
from typing import Any, Dict
from contextvars import ContextVar
class StructuredLogger:
    """Structured JSON logger with request-scoped context fields.

    Every log line is a single JSON object containing a UTC timestamp,
    the level, the message, any fields set via set_context(), and any
    extra keyword arguments passed to the logging call.
    """

    def __init__(self, name: str = "app"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # logging.getLogger() caches loggers by name; attach our handler
        # only once so repeated construction does not duplicate log lines.
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(self._JsonFormatter())
            self.logger.addHandler(handler)
        self.context: ContextVar[Dict] = ContextVar("log_context", default={})

    def set_context(self, key: str, value: Any) -> None:
        """Bind *key* to *value* in the current logging context.

        Copies the current mapping before mutating: ContextVar values
        (including the shared default dict) must never be changed in
        place, or updates would leak across asyncio tasks / contexts.
        """
        current = dict(self.context.get())
        current[key] = value
        self.context.set(current)

    def clear_context(self) -> None:
        """Drop all context fields for the current context."""
        self.context.set({})

    def log(self, level: str, message: str, **kwargs) -> None:
        """Emit one JSON log entry at *level*, merging context and extras.

        Explicit kwargs win over context fields on key collision (they
        are spread last into the dict).
        """
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": level,
            "message": message,
            **self.context.get(),
            **kwargs,
        }
        # Dispatch to logger.info / logger.error / ... by level name.
        getattr(self.logger, level.lower())(json.dumps(log_data))

    def info(self, message: str, **kwargs) -> None:
        self.log("INFO", message, **kwargs)

    def error(self, message: str, **kwargs) -> None:
        self.log("ERROR", message, **kwargs)

    def warning(self, message: str, **kwargs) -> None:
        self.log("WARNING", message, **kwargs)

    def debug(self, message: str, **kwargs) -> None:
        self.log("DEBUG", message, **kwargs)

    class _JsonFormatter(logging.Formatter):
        """Pass-through formatter: messages are already JSON strings."""

        def format(self, record):
            return record.getMessage()
# Usage: attach correlation fields, then emit one structured log line.
logger = StructuredLogger("api")
for ctx_key, ctx_value in (("request_id", "req-123"), ("user_id", "user-456")):
    logger.set_context(ctx_key, ctx_value)
logger.info("User action", action="login", ip="192.168.1.1")
Log Correlation
import uuid
from functools import wraps
class RequestContext:
    """Request-scoped correlation context (request/user/span ids).

    All fields are ContextVars, so values are isolated per thread and
    per asyncio task. A value of None means "not set for this request".
    """

    request_id: ContextVar = ContextVar("request_id", default=None)
    user_id: ContextVar = ContextVar("user_id", default=None)
    span_id: ContextVar = ContextVar("span_id", default=None)

    @classmethod
    def start_request(cls) -> str:
        """Generate a new request id, install it, and return it."""
        request_id = f"req-{uuid.uuid4().hex[:12]}"
        cls.request_id.set(request_id)
        return request_id

    @classmethod
    def clear_context(cls) -> None:
        """Reset every request-scoped variable to its default.

        Required by the with_request_context decorator, which calls this
        in its finally block; it was previously missing, so the decorator
        raised AttributeError after every wrapped call.
        """
        cls.request_id.set(None)
        cls.user_id.set(None)
        cls.span_id.set(None)

    @classmethod
    def get_context(cls) -> Dict:
        """Return the current correlation fields as a plain dict."""
        return {
            "request_id": cls.request_id.get(),
            "user_id": cls.user_id.get(),
            "span_id": cls.span_id.get(),
        }
def with_request_context(func):
    """Decorator: run *func* inside a fresh request context.

    Generates a new request id before the call and resets every
    request-scoped variable afterwards, so correlation fields never
    leak between consecutive calls in the same context.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        RequestContext.start_request()
        try:
            return func(*args, **kwargs)
        finally:
            # Reset the ContextVars directly. The original called
            # RequestContext.clear_context(), which is not defined on the
            # class and raised AttributeError from this finally block.
            RequestContext.request_id.set(None)
            RequestContext.user_id.set(None)
            RequestContext.span_id.set(None)
    return wrapper
Metrics with Prometheus
Metric Types
from prometheus_client import Counter, Histogram, Gauge, Summary, start_http_server
import random
from time import sleep
# Counter: monotonically increasing value, reset only on process restart.
# Labels (method, endpoint, status) partition the count per route/outcome.
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)
# Histogram: samples observations into fixed buckets (seconds here),
# enabling quantile estimation server-side via histogram_quantile().
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    # Bucket upper bounds chosen from 10 ms to 10 s; a +Inf bucket is
    # added automatically by the client library.
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
# Gauge: a value that can go up or down; set to the current reading.
ACTIVE_USERS = Gauge(
    "active_users",
    "Number of currently active users"
)
# Summary: like a histogram but computes streaming quantiles client-side;
# labelled by the order's final status.
ORDER_PROCESSING_TIME = Summary(
    "order_processing_seconds",
    "Time to process an order",
    ["status"]
)
class MetricsCollector:
    """Expose application metrics over a Prometheus scrape endpoint."""

    def __init__(self, port: int = 8000):
        self.port = port

    def start_server(self):
        """Start the Prometheus metrics HTTP server on the configured port."""
        start_http_server(self.port)

    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        """Count one HTTP request and observe its latency in seconds."""
        route_labels = {"method": method, "endpoint": endpoint}
        REQUEST_COUNT.labels(status=str(status), **route_labels).inc()
        REQUEST_LATENCY.labels(**route_labels).observe(duration)

    def set_active_users(self, count: int):
        """Update the active-user gauge to *count*."""
        ACTIVE_USERS.set(count)

    def record_order_processing(self, status: str, duration: float):
        """Observe one order-processing duration, labelled by outcome."""
        ORDER_PROCESSING_TIME.labels(status=status).observe(duration)
Custom Business Metrics
from prometheus_client import Counter, Gauge, Info
# Business-specific metrics (counters accumulate since process start).
USER_REGISTRATION_COUNT = Counter(
    "user_registrations_total",
    "Total user registrations",
    ["country", "source"]
)
# Monetary counter: incremented by the order amount, not by 1.
ORDER_VALUE = Counter(
    "order_value_total",
    "Total order value",
    ["currency", "status"]
)
# Gauge: externally computed rate, pushed in via set().
CART_ABANDONMENT_RATE = Gauge(
    "cart_abandonment_rate",
    "Rate of abandoned shopping carts"
)
# NOTE(review): product_id as a label can explode cardinality on large
# catalogs — confirm the product set is bounded before shipping this.
PRODUCT_VIEW_COUNT = Counter(
    "product_views_total",
    "Total product views",
    ["category", "product_id"]
)
class BusinessMetrics:
    """Record business-level counters and gauges defined at module scope."""

    def __init__(self):
        # Placeholder for static service info; not exported anywhere yet.
        self._info = {}

    def track_registration(self, country: str, source: str):
        """Count one user registration for the given country/source pair."""
        USER_REGISTRATION_COUNT.labels(country, source).inc()

    def track_order_value(self, amount: float, currency: str, status: str):
        """Add *amount* to the cumulative order-value counter."""
        ORDER_VALUE.labels(currency, status).inc(amount)

    def set_cart_abandonment_rate(self, rate: float):
        """Publish the current cart-abandonment rate."""
        CART_ABANDONMENT_RATE.set(rate)

    def track_product_view(self, category: str, product_id: str):
        """Count one view of *product_id* in *category*."""
        PRODUCT_VIEW_COUNT.labels(category, product_id).inc()
Distributed Tracing
OpenTelemetry Integration
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import requests
# Initialize tracing: register a global TracerProvider tagged with this
# service's name so every span it creates carries service.name.
# NOTE(review): no span processor/exporter is registered here despite the
# BatchSpanProcessor import — spans are created but never exported;
# confirm an exporter is wired up elsewhere.
resource = Resource.create({"service.name": "order-service"})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
# W3C Trace Context propagator: injects/extracts traceparent headers.
propagator = TraceContextTextMapPropagator()
class TracingClient:
    """HTTP client that propagates W3C trace context on outgoing requests."""

    def __init__(self, base_url: str):
        self.base_url = base_url

    def get(self, path: str, headers: Dict = None) -> Dict:
        """GET *path* relative to base_url and return the parsed JSON body.

        Opens a client span, injects its trace context into the outgoing
        headers, marks the span ERROR on HTTP >= 400 or on exception, and
        re-raises any exception after recording it.
        """
        url = f"{self.base_url}{path}"
        with tracer.start_as_current_span("http GET") as span:
            span.set_attribute("http.method", "GET")
            span.set_attribute("http.url", url)
            # Inject *inside* the span: injection captures the current
            # context, so doing it before the span was started (as the
            # original did) propagated the parent context and the
            # downstream service was never parented to this client span.
            carrier = {}
            propagator.inject(carrier)
            request_headers = {**(headers or {}), **carrier}
            try:
                response = requests.get(url, headers=request_headers)
                span.set_attribute("http.status_code", response.status_code)
                if response.status_code >= 400:
                    span.set_status(Status(StatusCode.ERROR))
                return response.json()
            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise
Trace-Based Operations
from contextlib import contextmanager
from typing import Generator
class TracingService:
    """Thin wrapper around an OpenTelemetry tracer for span management."""

    def __init__(self, tracer):
        self.tracer = tracer

    @contextmanager
    def create_span(
        self,
        name: str,
        attributes: Dict = None
    ) -> Generator:
        """Open a span named *name*, apply *attributes*, and set its
        status to OK on success or ERROR (with the exception recorded
        and re-raised) on failure."""
        with self.tracer.start_as_current_span(name) as span:
            for attr_key, attr_value in (attributes or {}).items():
                span.set_attribute(attr_key, attr_value)
            try:
                yield span
            except Exception as exc:
                span.set_status(Status(StatusCode.ERROR, str(exc)))
                span.record_exception(exc)
                raise
            else:
                span.set_status(Status(StatusCode.OK))

    def create_child_span(self, name: str, attributes: Dict = None):
        """Alias for create_span; the new span parents to the current one."""
        return self.create_span(name, attributes)
Alerting
Alert Rules and Notification
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, List
@dataclass
class AlertRule:
    """Declarative alert: fire *message* when *condition* returns True."""
    name: str
    # Zero-arg predicate evaluated on every AlertManager.check_rules() pass.
    condition: Callable[[], bool]
    severity: str  # critical, warning, info
    message: str
    # Seconds to suppress re-firing after this rule last fired.
    cooldown: int = 300  # seconds
class AlertManager:
    """Evaluate alert rules and dispatch notifications with cooldown."""

    def __init__(self):
        self.rules: List["AlertRule"] = []
        self.alert_history: List[dict] = []
        self.notification_channels: List[Callable] = []

    def add_rule(self, rule: "AlertRule") -> None:
        """Register an alert rule."""
        self.rules.append(rule)

    def add_notification_channel(self, channel: Callable) -> None:
        """Register a callable invoked with each fired alert dict."""
        self.notification_channels.append(channel)

    def check_rules(self) -> List[dict]:
        """Evaluate every rule; fire, record, and notify those that trip.

        Rules still inside their cooldown window are skipped. A rule whose
        condition raises is treated as not triggered (best effort), and a
        failing notification channel does not block the remaining ones.
        Returns the list of alerts fired by this call.
        """
        fired = []
        now = datetime.utcnow()
        for rule in self.rules:
            if self._in_cooldown(rule, now):
                continue
            try:
                triggered = rule.condition()
            except Exception:
                # A broken condition must not abort the whole sweep;
                # treat it as "not triggered" and keep checking.
                continue
            if not triggered:
                continue
            alert = {
                "rule_name": rule.name,
                "severity": rule.severity,
                "message": rule.message,
                "fired_at": now,
            }
            fired.append(alert)
            self.alert_history.append(alert)
            for channel in self.notification_channels:
                try:
                    channel(alert)
                except Exception:
                    # Best-effort delivery; keep notifying other channels.
                    continue
        return fired

    def _in_cooldown(self, rule: "AlertRule", now: datetime) -> bool:
        """True if an alert for *rule* fired within its cooldown window."""
        # total_seconds(), not timedelta.seconds: .seconds discards whole
        # days, so the original wrongly suppressed alerts forever once
        # more than a day had passed within the same intra-day window.
        return any(
            a["rule_name"] == rule.name
            and (now - a["fired_at"]).total_seconds() < rule.cooldown
            for a in self.alert_history
        )
# Example alert rules
def create_alert_rules(metrics_service) -> List[AlertRule]:
    """Build the standard rule set (error rate, latency, availability)."""
    rule_specs = [
        ("high_error_rate",
         lambda: metrics_service.get_error_rate() > 0.05,
         "critical",
         "Error rate exceeds 5%"),
        ("high_latency",
         lambda: metrics_service.get_p99_latency() > 2.0,
         "warning",
         "P99 latency exceeds 2 seconds"),
        ("low_availability",
         lambda: metrics_service.get_availability() < 0.99,
         "critical",
         "Availability below 99%"),
    ]
    return [
        AlertRule(name=rule_name, condition=predicate,
                  severity=level, message=text)
        for rule_name, predicate, level, text in rule_specs
    ]
Conclusion
Observability is essential for operating distributed systems. Structured logging provides detailed records for debugging. Metrics offer quantitative understanding of system behavior. Traces reveal request flow across services. Together with alerting, they enable proactive monitoring and rapid incident response.
Key practices: implement all three pillars of observability, use consistent context propagation, establish meaningful alert thresholds, and invest in dashboards that provide actionable insights. Observability is not a feature but an operational capability.
Resources
- “Observability Engineering” by Charity Majors
- Prometheus Documentation
- OpenTelemetry Documentation
- “Site Reliability Engineering” by Google
Comments