# Observability Patterns: Logging, Metrics, and Tracing for Distributed Systems

Published: March 19, 2026 | Updated: May 8, 2026 | Larry Qu | 9 min read

## Introduction

Observability is the ability to understand a system's internal behavior from the outputs it emits. In distributed systems, where a single request can span many services, it is essential for debugging, performance optimization, and maintaining reliability. The three pillars of observability (logs, metrics, and traces) provide complementary views of that behavior.

This guide covers structured logging, metrics collection with Prometheus, distributed tracing with OpenTelemetry, and alerting strategies for building observable systems.

## Structured Logging

### JSON Logging

Structured logging produces machine-readable log entries, enabling powerful querying and analysis.

import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from contextvars import ContextVar

class StructuredLogger:
    """Structured JSON logger."""
    
    def __init__(self, name: str = "app"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # logging.getLogger returns the same object for a given name, so
        # attach the handler only once to avoid duplicate output
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(self._JsonFormatter())
            self.logger.addHandler(handler)
        
        # Default to None: a mutable {} default would be shared (and
        # mutated) by every context that has not set its own value
        self.context: ContextVar[Optional[Dict]] = ContextVar("log_context", default=None)
    
    def set_context(self, key: str, value: Any) -> None:
        """Set context variable for logging."""
        # Copy before updating so other contexts never see this change
        current = dict(self.context.get() or {})
        current[key] = value
        self.context.set(current)
    
    def clear_context(self) -> None:
        """Clear logging context."""
        self.context.set({})
    
    def log(self, level: str, message: str, **kwargs) -> None:
        """Log with structured data."""
        log_data = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": level,
            "message": message,
            **(self.context.get() or {}),
            **kwargs
        }
        
        getattr(self.logger, level.lower())(json.dumps(log_data))
    
    def info(self, message: str, **kwargs) -> None:
        self.log("INFO", message, **kwargs)
    
    def error(self, message: str, **kwargs) -> None:
        self.log("ERROR", message, **kwargs)
    
    def warning(self, message: str, **kwargs) -> None:
        self.log("WARNING", message, **kwargs)
    
    def debug(self, message: str, **kwargs) -> None:
        self.log("DEBUG", message, **kwargs)
    
    class _JsonFormatter(logging.Formatter):
        def format(self, record):
            return record.getMessage()

# Usage
logger = StructuredLogger("api")

logger.set_context("request_id", "req-123")
logger.set_context("user_id", "user-456")

logger.info("User action", action="login", ip="192.168.1.1")
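
Because each entry is a single JSON object, downstream tools can filter on fields instead of matching free text. The following is a minimal sketch of that idea, assuming log lines shaped like the ones `StructuredLogger` emits. The `filter_logs` helper and the sample lines are hypothetical and exist only for illustration.

```python
import json
from typing import Any, Dict, Iterable, List


def filter_logs(lines: Iterable[str], **criteria: Any) -> List[Dict[str, Any]]:
    """Return parsed entries whose fields equal every given criterion."""
    matches = []
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not valid JSON
        if not isinstance(entry, dict):
            continue  # skip JSON values that are not objects
        if all(entry.get(key) == value for key, value in criteria.items()):
            matches.append(entry)
    return matches


# Hypothetical sample output, one JSON object per line
sample_lines = [
    '{"level": "INFO", "message": "User action", "request_id": "req-123", "action": "login"}',
    '{"level": "ERROR", "message": "Payment failed", "request_id": "req-123"}',
    '{"level": "ERROR", "message": "Timeout", "request_id": "req-999"}',
    "not json at all",
]

errors_for_request = filter_logs(sample_lines, level="ERROR", request_id="req-123")
print([entry["message"] for entry in errors_for_request])
```

The same field-based filtering is what a log backend does at scale; with plain-text logs it would require fragile regular expressions.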

### Log Correlation

import uuid
from contextvars import ContextVar
from functools import wraps
from typing import Dict, Optional

class RequestContext:
    """Manage request-scoped context for logging."""
    
    request_id: ContextVar[Optional[str]] = ContextVar("request_id", default=None)
    user_id: ContextVar[Optional[str]] = ContextVar("user_id", default=None)
    span_id: ContextVar[Optional[str]] = ContextVar("span_id", default=None)
    
    @classmethod
    def start_request(cls) -> str:
        """Start a new request context."""
        request_id = f"req-{uuid.uuid4().hex[:12]}"
        cls.request_id.set(request_id)
        return request_id
    
    @classmethod
    def get_context(cls) -> Dict:
        """Get current context."""
        return {
            "request_id": cls.request_id.get(),
            "user_id": cls.user_id.get(),
            "span_id": cls.span_id.get()
        }
    
    @classmethod
    def clear_context(cls) -> None:
        """Reset all request-scoped values once the request finishes."""
        cls.request_id.set(None)
        cls.user_id.set(None)
        cls.span_id.set(None)

def with_request_context(func):
    """Decorator to add request context to function calls."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        RequestContext.start_request()
        try:
            return func(*args, **kwargs)
        finally:
            # Always clear, even on error, so IDs never leak into the next request
            RequestContext.clear_context()
    return wrapper
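
Context variables are what keep request IDs from bleeding between concurrent requests. As a rough illustration, assuming an asyncio-based service, the sketch below runs two handlers concurrently and shows that each one reads back only the ID it set. The `request_id_var` and `handle_request` names are made up for this example.

```python
import asyncio
import uuid
from contextvars import ContextVar
from typing import List, Optional, Tuple

request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)


async def handle_request(name: str, seen: List[Tuple[str, Optional[str]]]) -> None:
    """Set a request ID, yield to other tasks, then read the ID back."""
    request_id_var.set(f"req-{uuid.uuid4().hex[:12]}")
    await asyncio.sleep(0)  # let the other task run and set its own ID
    seen.append((name, request_id_var.get()))


async def main() -> List[Tuple[str, Optional[str]]]:
    seen: List[Tuple[str, Optional[str]]] = []
    # gather() wraps each coroutine in a task, and every task runs in
    # its own copy of the current context
    await asyncio.gather(handle_request("a", seen), handle_request("b", seen))
    return seen


results = asyncio.run(main())
ids = dict(results)
print(ids["a"] != ids["b"])
```

A plain global variable would fail here: the second handler would overwrite the first handler's ID before it was read back.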

## Metrics with Prometheus

### Metric Types

from prometheus_client import Counter, Histogram, Gauge, Summary, start_http_server

# Counter: Monotonically increasing values
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

# Histogram: Distribution of values
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

# Gauge: Current value
ACTIVE_USERS = Gauge(
    "active_users",
    "Number of currently active users"
)

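# Summary: Tracks the count and sum of observations (the Python client
# does not compute quantiles, so prefer a Histogram for percentiles)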
ORDER_PROCESSING_TIME = Summary(
    "order_processing_seconds",
    "Time to process an order",
    ["status"]
)

class MetricsCollector:
    """Collect and expose application metrics."""
    
    def __init__(self, port: int = 8000):
        self.port = port
    
    def start_server(self):
        """Start Prometheus metrics server."""
        start_http_server(self.port)
    
    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        """Record HTTP request metrics."""
        REQUEST_COUNT.labels(
            method=method,
            endpoint=endpoint,
            status=str(status)
        ).inc()
        
        REQUEST_LATENCY.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)
    
    def set_active_users(self, count: int):
        """Set active user count."""
        ACTIVE_USERS.set(count)
    
    def record_order_processing(self, status: str, duration: float):
        """Record order processing time."""
        ORDER_PROCESSING_TIME.labels(status=status).observe(duration)
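
Histogram buckets are cumulative: each bucket counts every observation less than or equal to its upper bound, and percentiles are later estimated from those counts. The sketch below mimics that bookkeeping in plain Python using the bucket bounds from `REQUEST_LATENCY`. It is an approximation for illustration, not the Prometheus implementation, and the function names and sample observations are assumptions.

```python
from bisect import bisect_left
from typing import List, Sequence

BUCKETS = [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]


def cumulative_counts(observations: Sequence[float], bounds: Sequence[float]) -> List[int]:
    """Count observations <= each upper bound, plus a final +Inf bucket."""
    per_bucket = [0] * (len(bounds) + 1)
    for value in observations:
        # bisect_left finds the first bound >= value; values above every
        # bound land in the last slot, which plays the role of +Inf
        per_bucket[bisect_left(bounds, value)] += 1
    totals, running = [], 0
    for count in per_bucket:
        running += count
        totals.append(running)
    return totals


def estimate_quantile(q: float, bounds: Sequence[float], totals: Sequence[int]) -> float:
    """Linearly interpolate inside the bucket that contains rank q * total."""
    if totals[-1] == 0:
        return float("nan")  # no observations recorded
    rank = q * totals[-1]
    for i, upper in enumerate(bounds):
        if totals[i] >= rank and totals[i] > 0:
            lower = bounds[i - 1] if i > 0 else 0.0
            below = totals[i - 1] if i > 0 else 0
            in_bucket = totals[i] - below
            return lower + (upper - lower) * (rank - below) / in_bucket
    return bounds[-1]  # rank falls in +Inf: report the highest finite bound


observations = [0.02, 0.03, 0.04, 0.2, 0.3, 0.7, 0.9, 1.5, 3.0, 12.0]
totals = cumulative_counts(observations, BUCKETS)
print(totals)
print(estimate_quantile(0.5, BUCKETS, totals))
```

The estimate is only as precise as the bucket layout, which is why bucket bounds should bracket the latency thresholds you care about.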

### Custom Business Metrics

from prometheus_client import Counter, Gauge

# Business-specific metrics
USER_REGISTRATION_COUNT = Counter(
    "user_registrations_total",
    "Total user registrations",
    ["country", "source"]
)

ORDER_VALUE = Counter(
    "order_value_total",
    "Total order value",
    ["currency", "status"]
)

CART_ABANDONMENT_RATE = Gauge(
    "cart_abandonment_rate",
    "Rate of abandoned shopping carts"
)

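# Caution: product_id is a high-cardinality label. Every distinct value
# creates a separate time series, so prefer bounded labels in production.
PRODUCT_VIEW_COUNT = Counter(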
    "product_views_total",
    "Total product views",
    ["category", "product_id"]
)

class BusinessMetrics:
    """Business-level metrics collection."""
    
    def __init__(self):
        self._info = {}
    
    def track_registration(self, country: str, source: str):
        """Track user registration."""
        USER_REGISTRATION_COUNT.labels(country=country, source=source).inc()
    
    def track_order_value(self, amount: float, currency: str, status: str):
        """Track order value."""
        ORDER_VALUE.labels(currency=currency, status=status).inc(amount)
    
    def set_cart_abandonment_rate(self, rate: float):
        """Set cart abandonment rate."""
        CART_ABANDONMENT_RATE.set(rate)
    
    def track_product_view(self, category: str, product_id: str):
        """Track product view."""
        PRODUCT_VIEW_COUNT.labels(category=category, product_id=product_id).inc()
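
Every distinct combination of label values becomes its own time series, so label choices directly drive storage and query cost. A quick back-of-the-envelope check, sketched below, multiplies the number of distinct values per label to get an upper bound. The `max_series` helper and the value counts are hypothetical.

```python
from math import prod
from typing import Dict


def max_series(label_values: Dict[str, int]) -> int:
    """Upper bound on time series for one metric: product of distinct values per label."""
    return prod(label_values.values())


# Hypothetical distinct-value counts for the labels used above
registrations = max_series({"country": 50, "source": 5})
product_views = max_series({"category": 20, "product_id": 100_000})

print(registrations)   # 250
print(product_views)   # 2000000
```

Under these assumed counts, `product_views_total` could reach millions of series. Identifiers like `product_id` usually belong in logs or traces, with metrics aggregated by a bounded label such as `category`.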

## Distributed Tracing

### OpenTelemetry Integration

from typing import Dict, Optional

import requests
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

# Initialize tracer
resource = Resource.create({"service.name": "order-service"})
provider = TracerProvider(resource=resource)
# Without a span processor, finished spans are never exported. The console
# exporter is a stand-in here; use an OTLP exporter for a real backend.
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)
propagator = TraceContextTextMapPropagator()

class TracingClient:
    """HTTP client with distributed tracing."""
    
    def __init__(self, base_url: str):
        self.base_url = base_url
    
    def get(self, path: str, headers: Optional[Dict] = None) -> Dict:
        """GET request with trace propagation."""
        url = f"{self.base_url}{path}"
        
        with tracer.start_as_current_span("http GET") as span:
            span.set_attribute("http.method", "GET")
            span.set_attribute("http.url", url)
            
            # Inject inside the span so the downstream service sees this
            # client span, not its parent, as the caller
            carrier: Dict[str, str] = {}
            propagator.inject(carrier)
            request_headers = {**(headers or {}), **carrier}
            
            try:
                # The 10-second timeout is an illustrative value
                response = requests.get(url, headers=request_headers, timeout=10)
                span.set_attribute("http.status_code", response.status_code)
                
                if response.status_code >= 400:
                    span.set_status(Status(StatusCode.ERROR))
                
                return response.json()
            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise
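
The W3C Trace Context propagator carries the trace across services in a `traceparent` header of the form `version-traceid-parentid-flags`. To make that wire format concrete, here is a small standard-library sketch that builds and parses such a header. It covers only version `00`, and the helper names are hypothetical rather than part of OpenTelemetry.

```python
import re
import secrets
from typing import NamedTuple, Optional

# version 00, 32 hex chars of trace ID, 16 hex chars of span ID, 2 hex chars of flags
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class TraceParent(NamedTuple):
    trace_id: str
    parent_span_id: str
    sampled: bool


def build_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Format a version-00 traceparent header value."""
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Parse a header value, returning None when it is malformed."""
    match = TRACEPARENT_RE.match(header.strip().lower())
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    if set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None  # all-zero IDs are invalid
    return TraceParent(trace_id, span_id, bool(int(flags, 16) & 0x01))


trace_id = secrets.token_hex(16)  # 16 random bytes -> 32 hex characters
span_id = secrets.token_hex(8)    # 8 random bytes -> 16 hex characters
header = build_traceparent(trace_id, span_id)
parsed = parse_traceparent(header)
print(header)
```

The receiving service keeps the trace ID, treats the incoming span ID as the parent of its own new span, and forwards an updated header on its outgoing calls.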

### Trace-Based Operations

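from contextlib import contextmanager
from typing import Dict, Iterator, Optional

from opentelemetry.trace import Span, Status, StatusCode

class TracingService:
    """Service for creating and managing traces."""
    
    def __init__(self, tracer):
        self.tracer = tracer
    
    @contextmanager
    def create_span(
        self,
        name: str,
        attributes: Optional[Dict] = None
    ) -> Iterator[Span]:
        """Create a new span."""
        # Turn off the built-in exception handling so the except block
        # below is the only place that records the error (otherwise the
        # exception would be recorded twice)
        with self.tracer.start_as_current_span(
            name,
            attributes=attributes,
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            try:
                yield span
                span.set_status(Status(StatusCode.OK))
            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise
    
    def create_child_span(self, name: str, attributes: Optional[Dict] = None):
        """Create a child span from current context."""
        # start_as_current_span parents the new span to whichever span is
        # current, so nesting create_span calls already yields a child
        return self.create_span(name, attributes)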

## Alerting

### Alert Rules and Notification

import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, List

alert_log = logging.getLogger("alerts")

@dataclass
class AlertRule:
    name: str
    condition: Callable[[], bool]
    severity: str  # critical, warning, info
    message: str
    cooldown: int = 300  # seconds

class AlertManager:
    """Manage alerts and notifications."""
    
    def __init__(self):
        self.rules: List[AlertRule] = []
        self.alert_history: List[dict] = []
        self.notification_channels: List[Callable] = []
    
    def add_rule(self, rule: AlertRule) -> None:
        """Add an alert rule."""
        self.rules.append(rule)
    
    def add_notification_channel(self, channel: Callable) -> None:
        """Add a notification channel."""
        self.notification_channels.append(channel)
    
    def check_rules(self) -> List[dict]:
        """Check all rules and fire alerts."""
        fired = []
        
        for rule in self.rules:
            # Check cooldown. total_seconds() is required here: the
            # .seconds attribute wraps around after one day.
            now = datetime.now(timezone.utc)
            recent = [
                a for a in self.alert_history
                if a["rule_name"] == rule.name
                and (now - a["fired_at"]).total_seconds() < rule.cooldown
            ]
            
            if recent:
                continue
            
            try:
                if rule.condition():
                    alert = {
                        "rule_name": rule.name,
                        "severity": rule.severity,
                        "message": rule.message,
                        "fired_at": now
                    }
                    fired.append(alert)
                    self.alert_history.append(alert)
                    
                    for channel in self.notification_channels:
                        channel(alert)
            except Exception:
                # A failing condition or channel must not stop the other
                # rules, but it should not fail silently either
                alert_log.exception("Alert rule %r failed", rule.name)
        
        return fired

# Example alert rules
def create_alert_rules(metrics_service) -> List[AlertRule]:
    """Create common alert rules."""
    return [
        AlertRule(
            name="high_error_rate",
            condition=lambda: metrics_service.get_error_rate() > 0.05,
            severity="critical",
            message="Error rate exceeds 5%"
        ),
        AlertRule(
            name="high_latency",
            condition=lambda: metrics_service.get_p99_latency() > 2.0,
            severity="warning",
            message="P99 latency exceeds 2 seconds"
        ),
        AlertRule(
            name="low_availability",
            condition=lambda: metrics_service.get_availability() < 0.99,
            severity="critical",
            message="Availability below 99%"
        )
    ]
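
The rules above call `get_error_rate()`, `get_p99_latency()`, and `get_availability()` on a `metrics_service` that the article does not define. One possible shape for it, offered purely as an assumption, is an in-memory sliding window over recent requests; a production system would more likely query Prometheus. `WindowedMetrics` is a hypothetical name.

```python
import math
from collections import deque
from typing import Deque, Tuple


class WindowedMetrics:
    """Hypothetical metrics_service: keeps the last `size` request outcomes."""

    def __init__(self, size: int = 1000):
        # deque(maxlen=...) silently drops the oldest sample when full
        self._samples: Deque[Tuple[int, float]] = deque(maxlen=size)

    def record(self, status: int, duration: float) -> None:
        self._samples.append((status, duration))

    def get_error_rate(self) -> float:
        if not self._samples:
            return 0.0
        errors = sum(1 for status, _ in self._samples if status >= 500)
        return errors / len(self._samples)

    def get_availability(self) -> float:
        return 1.0 - self.get_error_rate()

    def get_p99_latency(self) -> float:
        if not self._samples:
            return 0.0
        durations = sorted(duration for _, duration in self._samples)
        # Nearest-rank percentile: the value at position ceil(0.99 * n)
        rank = math.ceil(0.99 * len(durations))
        return durations[rank - 1]


metrics_service = WindowedMetrics()
for _ in range(94):
    metrics_service.record(200, 0.1)
for _ in range(6):
    metrics_service.record(500, 3.0)

print(metrics_service.get_error_rate() > 0.05)   # True
print(metrics_service.get_p99_latency() > 2.0)   # True
```

With this sample data, both the `high_error_rate` and `high_latency` conditions evaluate to true, and `low_availability` does as well, since availability is 94%.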

## Prometheus Architecture

┌─────────────────────────────────────────────────────────────┐
│                   Prometheus Architecture                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌───────────────────────────────────────────────┐          │
│  │               Prometheus Server               │          │
│  │  ┌─────────┐     ┌─────────┐     ┌─────────┐  │          │
│  │  │ Scraper │     │  TSDB   │     │ Alerts  │  │          │
│  │  └─────────┘     └─────────┘     └─────────┘  │          │
│  └───────────────────────────────────────────────┘          │
│          ▲               ▲               ▲                  │
│          │               │               │                  │
│   ┌──────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐           │
│   │   Targets   │ │Alertmanager │ │   Grafana   │           │
│   │ (Exporters) │ │             │ │  Query UI   │           │
│   └─────────────┘ └─────────────┘ └─────────────┘           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

### Service Discovery

```yaml
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'kubernetes-applications'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
        action: replace
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__
```
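
The last relabel rule is the least obvious one: Prometheus joins the two source labels with `;`, matches the anchored regex against the result, and writes `$1:$2` back to `__address__`. The Python sketch below reproduces that single step to show what the pattern does. `relabel_address` is a hypothetical helper, and the addresses are sample values.

```python
import re
from typing import Optional

# Same pattern as the __address__ rule above. Prometheus anchors relabel
# regexes, which fullmatch() reproduces here.
ADDRESS_RE = re.compile(r"([^:]+)(?::\d+)?;(\d+)")


def relabel_address(address: str, port_annotation: str) -> Optional[str]:
    """Join the source labels with ';' and rewrite to host:annotated_port.

    Returns None when the regex does not match, which corresponds to
    Prometheus leaving __address__ unchanged.
    """
    joined = f"{address};{port_annotation}"
    match = ADDRESS_RE.fullmatch(joined)
    if match is None:
        return None
    return f"{match.group(1)}:{match.group(2)}"


print(relabel_address("10.0.0.5:8080", "9102"))  # 10.0.0.5:9102
print(relabel_address("10.0.0.5", "9102"))       # 10.0.0.5:9102
print(relabel_address("10.0.0.5:8080", ""))      # None
```

The optional `(?::\d+)?` group is what lets the rule replace an existing port as well as add one when the pod address has none.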

## Grafana Dashboards

### Dashboard Configuration

```json
{
  "dashboard": {
    "title": "Payment Service Overview",
    "panels": [
      {
        "title": "Requests per Second",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m]))"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percentunit",
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"value": 0, "color": "green"},
                {"value": 0.01, "color": "yellow"},
                {"value": 0.05, "color": "red"}
              ]
            }
          }
        }
      },
      {
        "title": "P99 Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P99"
          }
        ]
      }
    ]
  }
}
```
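
The panels above lean on `rate()`, which turns an ever-increasing counter into a per-second value and compensates for counter resets after a restart. The sketch below shows the core arithmetic on a handful of samples. It is a simplification: real Prometheus also extrapolates to the edges of the window, which this version does not. `simple_rate` and the sample data are hypothetical.

```python
from typing import List, Tuple

Sample = Tuple[float, float]  # (unix_timestamp_seconds, counter_value)


def simple_rate(samples: List[Sample]) -> float:
    """Average per-second increase across the samples, tolerating counter resets."""
    if len(samples) < 2:
        return 0.0
    increase = 0.0
    for (_, prev), (_, curr) in zip(samples, samples[1:]):
        # A drop means the process restarted and the counter began again
        # at zero, so the whole current value counts as new increase
        increase += (curr - prev) if curr >= prev else curr
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed if elapsed > 0 else 0.0


# Scrapes every 15 seconds; the counter resets between t=30 and t=45
samples = [(0, 100), (15, 130), (30, 160), (45, 10), (60, 40)]
requests_per_second = simple_rate(samples)
print(round(requests_per_second, 3))
```

Dividing the rate of 5xx responses by the rate of all responses, as the Error Rate panel does, yields a ratio that stays meaningful across restarts for the same reason.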

### Alert Rules

```yaml
groups:
  - name: payment-service
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(payment_requests_total{status=~"5.."}[5m])) 
          / sum(rate(payment_requests_total[5m])) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Payment service error rate is high"
          description: "{{ $value | humanizePercentage }} error rate"

      - alert: HighLatency
        expr: |
          histogram_quantile(0.99, 
            rate(payment_request_duration_seconds_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Payment service latency is high"
          description: "P99 latency is {{ $value | humanizeDuration }}"
```
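
The `for:` clause is what keeps brief spikes from paging anyone: an alert moves from inactive to pending when its expression first becomes true, and to firing only if the expression stays true for the whole duration. A minimal state-machine sketch of that behavior follows. `PendingAlert` is an illustrative class, not Prometheus code, and timestamps are passed in explicitly to keep the example deterministic.

```python
from typing import Optional


class PendingAlert:
    """Fire only after the condition has held continuously for `hold_seconds`."""

    def __init__(self, hold_seconds: float):
        self.hold_seconds = hold_seconds
        self._active_since: Optional[float] = None

    def evaluate(self, condition_true: bool, now: float) -> str:
        if not condition_true:
            self._active_since = None  # any false evaluation resets the timer
            return "inactive"
        if self._active_since is None:
            self._active_since = now
        if now - self._active_since >= self.hold_seconds:
            return "firing"
        return "pending"


alert = PendingAlert(hold_seconds=120)  # mirrors "for: 2m"
states = [
    alert.evaluate(True, now=0),
    alert.evaluate(True, now=60),
    alert.evaluate(True, now=120),
    alert.evaluate(False, now=135),
    alert.evaluate(True, now=150),
]
print(states)
```

Choosing the duration is a trade-off: a longer hold suppresses more noise but delays the notification by the same amount.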

## Log Aggregation with Loki

### Loki Configuration

```yaml
# loki-config.yaml
server:
  http_listen_port: 3100

common:
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks
      rules_directory: /loki/rules
  replication_factor: 1
  ring:
    instance_addr: 127.0.0.1
    kvstore:
      store: inmemory

schema_config:
  configs:
    - from: 2026-01-01
      store: tsdb
      object_store: filesystem
      schema: v13
      index:
        prefix: index_
        period: 24h

limits_config:
  reject_old_samples: true
  reject_old_samples_max_age: 168h
```

### Log Queries

```logql
# Find errors in payment service
{job="payment-service"} |= "ERROR"

# Parse and filter
{job="payment-service"} | json | level="error" | duration > 1s

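# Count by level (parse the JSON body so `level` is available as a label)
sum by (level) (count_over_time({job="payment-service"} | json [5m]))

# Latency percentiles (assumes each JSON entry carries a `duration` field)
quantile_over_time(0.99, {job="api"} | json | unwrap duration(duration) [5m]) by (job)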
```

## Dashboard Best Practices

### Key Metrics by Service

| Service | Key Metrics |
|---------|-------------|
| API | requests_total, request_duration_seconds, request_size_bytes, errors_total |
| Database | connections_active, query_duration_seconds, queries_total |
| Cache | hits_total, misses_total, evictions_total |

### Dashboard Guidelines

1. **Use red/green/yellow thresholds**: Clear visual indicators
2. **Show trends**: Include time series, not just current values
3. **Link to runbooks**: Include links from alerts to documentation
4. **Set appropriate ranges**: Match to your SLOs
5. **Use meaningful names**: Service + metric description

## Alert Design

### Alert Quality

Good alert characteristics:

- **Actionable**: Clear what to do
- **Timely**: Fires before users notice
- **Accurate**: Low false positive rate
- **Relevant**: Indicates a real problem
- **Prioritized**: Severity matches impact


### Runbook Integration

```yaml
annotations:
  summary: "Database connection pool exhausted"
  description: "{{ $value }} active connections"
  runbook_url: "https://wiki.example.com/runbooks/db-connections"
  
# Runbook content
# # Database Connection Pool Exhaustion
# 
# ## Symptoms
# - High number of active connections
# - Applications timing out on DB queries
# 
# ## Impact
# - Failed database writes
# - User-facing errors
# 
# ## Resolution
# 1. Check for long-running queries
# 2. Identify and kill blocking sessions
# 3. Scale up connection pool if needed
```

## Best Practices

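1. **Start with SLOs**: Define what good looks like before choosing alerts and dashboards
2. **Alert on symptoms**: Page on user-visible impact such as errors and latency, not on every underlying cause
3. **Use golden signals**: Latency, traffic, errors, saturation
4. **Keep dashboards focused**: One per service
5. **Automate remediation**: Script routine fixes where it is safe to do so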
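
Starting from an SLO makes alert thresholds a matter of arithmetic rather than guesswork. The sketch below shows two common calculations: how much downtime an availability target allows, and how fast the current error ratio is consuming that allowance. The function names, the 30-day window, and the sample numbers are assumptions chosen for illustration.

```python
def error_budget_minutes(slo: float, window_days: int = 30) -> float:
    """Minutes of full unavailability the SLO tolerates over the window."""
    return (1.0 - slo) * window_days * 24 * 60


def burn_rate(observed_error_ratio: float, slo: float) -> float:
    """How many times faster than sustainable the budget is being spent."""
    return observed_error_ratio / (1.0 - slo)


print(round(error_budget_minutes(0.999), 1))   # 43.2
print(round(burn_rate(0.005, 0.999), 1))       # 5.0
```

A burn rate of 1 spends the budget exactly by the end of the window; a sustained burn rate of 5 would exhaust a 30-day budget in about six days, which is the kind of symptom worth alerting on.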

## Conclusion

Observability is essential for operating distributed systems. Structured logging provides detailed records for debugging. Metrics offer quantitative understanding of system behavior. Traces reveal request flow across services. Together with alerting, they enable proactive monitoring and rapid incident response.

Key practices: implement all three pillars of observability, use consistent context propagation, establish meaningful alert thresholds, and invest in dashboards that provide actionable insights. Observability is not a feature but an operational capability.
