⚡ Calmops

Distributed Tracing with OpenTelemetry and Jaeger

Introduction

In microservices architectures, a single user request can flow through dozens of services. When something goes wrong, understanding what happened across this distributed system becomes a significant challenge. Distributed tracing provides visibility into the path each request takes, making it possible to identify bottlenecks, diagnose errors, and optimize performance.

In 2026, OpenTelemetry has emerged as the de facto standard for observability, providing vendor-neutral APIs, SDKs, and tools for collecting traces, metrics, and logs. This guide explores distributed tracing in depth, covering OpenTelemetry fundamentals, Jaeger implementation, trace analysis, and context propagation across distributed systems.

Understanding Distributed Tracing

Distributed tracing tracks a request as it moves through multiple services, capturing timing information, errors, and key events along the way.

Core Concepts

┌──────────────────────────────────────────────────────────────────┐
│                   Distributed Tracing Concepts                   │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Trace:                                                          │
│  ┌────────────────────────────────────────────────────────────┐  │
│  │              Single request across services                │  │
│  │                                                            │  │
│  │  [API Gateway] ──▶ [Auth Service] ──▶ [Order Service]      │  │
│  │       │                 │                   │              │  │
│  │   Span 1.1          Span 2.1            Span 3.1           │  │
│  │       │                 │                   │              │  │
│  │   Span 1.2          Span 2.2            Span 3.2           │  │
│  │                                             │──▶ [Payment] │  │
│  │                                                  Span 4.1  │  │
│  └────────────────────────────────────────────────────────────┘  │
│                                                                  │
│  Span:                                                           │
│  ┌──────────────────────────────────────┐                        │
│  │  operation: GET /api/orders          │                        │
│  │  service:   order-service            │                        │
│  │  duration:  150ms                    │                        │
│  │  start: 2026-01-15T10:00:00.000Z     │                        │
│  │  end:   2026-01-15T10:00:00.150Z     │                        │
│  │                                      │                        │
│  │  Tags:                               │                        │
│  │    http.method: GET                  │                        │
│  │    http.url: /api/orders             │                        │
│  │    http.status_code: 200             │                        │
│  │    error: false                      │                        │
│  │                                      │                        │
│  │  Events:                             │                        │
│  │    - db.query.started (10ms)         │                        │
│  │    - db.query.completed (50ms)       │                        │
│  │    - cache.miss (60ms)               │                        │
│  └──────────────────────────────────────┘                        │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
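The identifiers behind this structure are simple: every span carries the trace ID of the request plus its own span ID and its parent's span ID. A minimal stdlib sketch of those relationships (the dict fields and service names are illustrative, not an OpenTelemetry API):

```python
import secrets

def new_trace_id() -> str:
    # 128-bit trace ID (32 hex chars), shared by every span in one trace
    return secrets.token_hex(16)

def new_span_id() -> str:
    # 64-bit span ID (16 hex chars), unique per operation
    return secrets.token_hex(8)

trace_id = new_trace_id()

root = {"trace_id": trace_id, "span_id": new_span_id(),
        "parent_id": None, "service": "api-gateway"}
child = {"trace_id": trace_id, "span_id": new_span_id(),
         "parent_id": root["span_id"], "service": "auth-service"}

# All spans share the trace ID; parent_id links the tree together
assert child["trace_id"] == root["trace_id"]
assert child["parent_id"] == root["span_id"]
```

A backend reassembles a trace by grouping spans on `trace_id` and rebuilding the tree from `parent_id` links.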

Trace Data Model

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime

@dataclass
class SpanContext:
    trace_id: str
    span_id: str
    trace_flags: int = 1
    trace_state: Optional[Dict] = None

class SpanKind:
    # constants mirroring OpenTelemetry's span kinds; not a data record,
    # so no @dataclass decorator is needed
    INTERNAL = 0
    SERVER = 1
    CLIENT = 2
    PRODUCER = 3
    CONSUMER = 4

@dataclass
class SpanStatus:
    code: int
    message: Optional[str] = None
    
    OK = 0
    ERROR = 1
    UNSET = -1

@dataclass
class SpanEvent:
    name: str
    timestamp: datetime
    attributes: Dict[str, Any] = field(default_factory=dict)

@dataclass
class Span:
    name: str
    context: SpanContext
    parent_id: Optional[str] = None
    kind: int = SpanKind.INTERNAL
    start_time: datetime = field(default_factory=datetime.utcnow)
    end_time: Optional[datetime] = None
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[SpanEvent] = field(default_factory=list)
    status: SpanStatus = field(default_factory=lambda: SpanStatus(SpanStatus.UNSET))
    
    def set_attribute(self, key: str, value: Any):
        self.attributes[key] = value
    
    def add_event(self, name: str, attributes: Dict = None):
        event = SpanEvent(
            name=name,
            timestamp=datetime.utcnow(),
            attributes=attributes or {}
        )
        self.events.append(event)
    
    def set_status(self, code: int, message: str = None):
        self.status = SpanStatus(code, message)
    
    def end(self, end_time: datetime = None):
        self.end_time = end_time or datetime.utcnow()
    
    @property
    def duration(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds() * 1000
        return 0

@dataclass
class Trace:
    spans: List[Span] = field(default_factory=list)
    
    def add_span(self, span: Span):
        self.spans.append(span)
    
    def get_root_span(self) -> Optional[Span]:
        root_spans = [s for s in self.spans if s.parent_id is None]
        return root_spans[0] if root_spans else None
    
    def get_spans_by_service(self, service_name: str) -> List[Span]:
        return [s for s in self.spans if s.attributes.get('service.name') == service_name]

OpenTelemetry Fundamentals

OpenTelemetry provides a unified standard for collecting observability data. It consists of APIs, SDKs, and components for instrumenting applications.

Setting Up OpenTelemetry

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter
)
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

class OpenTelemetrySetup:
    def __init__(self, service_name: str, jaeger_endpoint: str = None):
        self.service_name = service_name
        self.jaeger_endpoint = jaeger_endpoint
    
    def setup(self):
        resource = Resource(attributes={
            SERVICE_NAME: self.service_name,
            "service.version": "1.0.0",
            "deployment.environment": "production"
        })
        
        provider = TracerProvider(resource=resource)
        
        provider.add_span_processor(
            BatchSpanProcessor(
                ConsoleSpanExporter()
            )
        )
        
        if self.jaeger_endpoint:
            provider.add_span_processor(
                BatchSpanProcessor(
                    JaegerExporter(
                        agent_host_name=self.jaeger_endpoint,
                        agent_port=6831
                    )
                )
            )
        
        trace.set_tracer_provider(provider)
        
        return trace.get_tracer(self.service_name)

class AWSLambdaOTelSetup:
    @staticmethod
    def setup():
        from opentelemetry import trace
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.resources import get_aggregated_resources
        from opentelemetry.sdk.extension.aws.resource._lambda import (
            AwsLambdaResourceDetector
        )
        from opentelemetry.sdk.extension.aws.trace import AwsXRayIdGenerator
        
        # Detect Lambda resource attributes (function name, version, region)
        resource = get_aggregated_resources([AwsLambdaResourceDetector()])
        
        # X-Ray-compatible trace IDs let traces correlate with AWS X-Ray
        provider = TracerProvider(
            resource=resource,
            id_generator=AwsXRayIdGenerator()
        )
        
        trace.set_tracer_provider(provider)
        
        return trace.get_tracer_provider()

Manual Instrumentation

import asyncio
import functools
from typing import Dict

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

class TracingDecorator:
    @staticmethod
    def traced_function(span_name: str = None, attributes: Dict = None):
        def decorator(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                name = span_name or func.__name__
                
                with tracer.start_as_current_span(
                    name,
                    attributes=attributes or {}
                ) as span:
                    try:
                        span.set_attribute("function.name", func.__name__)
                        span.set_attribute("function.module", func.__module__)
                        
                        result = await func(*args, **kwargs)
                        
                        span.set_status(Status(StatusCode.OK))
                        return result
                    
                    except Exception as e:
                        span.set_status(Status(StatusCode.ERROR, str(e)))
                        span.record_exception(e)
                        raise
            
            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                name = span_name or func.__name__
                
                with tracer.start_as_current_span(
                    name,
                    attributes=attributes or {}
                ) as span:
                    try:
                        span.set_attribute("function.name", func.__name__)
                        span.set_attribute("function.module", func.__module__)
                        
                        result = func(*args, **kwargs)
                        
                        span.set_status(Status(StatusCode.OK))
                        return result
                    
                    except Exception as e:
                        span.set_status(Status(StatusCode.ERROR, str(e)))
                        span.record_exception(e)
                        raise
            
            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
        
        return decorator
    
    @staticmethod
    def create_span(name: str, kind: trace.SpanKind = trace.SpanKind.INTERNAL):
        return tracer.start_span(name, kind=kind)

class BusinessOperationTracing:
    def __init__(self, tracer):
        self.tracer = tracer
    
    async def process_order(self, order_id: str):
        with self.tracer.start_as_current_span(
            "process_order",
            kind=trace.SpanKind.SERVER,
            attributes={
                "order.id": order_id,
                "order.operation": "process"
            }
        ) as span:
            span.add_event("order_received", {"order_id": order_id})
            
            validated = await self.validate_order(order_id)
            span.add_event("order_validated", {"success": validated})
            
            if not validated:
                span.set_status(Status(StatusCode.ERROR, "Order validation failed"))
                return {"success": False, "error": "Invalid order"}
            
            inventory_reserved = await self.reserve_inventory(order_id)
            span.add_event("inventory_reserved", {"success": inventory_reserved})
            
            if not inventory_reserved:
                span.set_status(Status(StatusCode.ERROR, "Inventory reservation failed"))
                return {"success": False, "error": "Inventory unavailable"}
            
            payment_processed = await self.process_payment(order_id)
            span.add_event("payment_processed", {"success": payment_processed})
            
            if not payment_processed:
                span.set_status(Status(StatusCode.ERROR, "Payment failed"))
                await self.rollback_inventory(order_id)
                return {"success": False, "error": "Payment failed"}
            
            await self.confirm_order(order_id)
            
            span.set_attribute("order.status", "completed")
            
            return {"success": True, "order_id": order_id}
    
    async def validate_order(self, order_id: str):
        with self.tracer.start_as_current_span("validate_order"):
            return True
    
    async def reserve_inventory(self, order_id: str):
        with self.tracer.start_as_current_span("reserve_inventory"):
            return True
    
    async def process_payment(self, order_id: str):
        with self.tracer.start_as_current_span("process_payment"):
            return True
    
    async def rollback_inventory(self, order_id: str):
        with self.tracer.start_as_current_span("rollback_inventory"):
            pass
    
    async def confirm_order(self, order_id: str):
        with self.tracer.start_as_current_span("confirm_order"):
            pass

Automatic Instrumentation

from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

class InstrumentationSetup:
    @staticmethod
    def instrument_flask(app):
        FlaskInstrumentor().instrument_app(app)
    
    @staticmethod
    def instrument_django():
        DjangoInstrumentor().instrument()
    
    @staticmethod
    def instrument_requests():
        RequestsInstrumentor().instrument()
    
    @staticmethod
    def instrument_httpx():
        HTTPXClientInstrumentor().instrument()
    
    @staticmethod
    def instrument_pymongo():
        # PymongoInstrumentor hooks pymongo globally via a command listener
        PymongoInstrumentor().instrument()
    
    @staticmethod
    def instrument_redis():
        RedisInstrumentor().instrument()
    
    @staticmethod
    def instrument_sqlalchemy(engine):
        SQLAlchemyInstrumentor().instrument(engine=engine)
    
    @staticmethod
    def instrument_all(app=None, use_mongo=False, use_redis=False, engine=None):
        if app:
            FlaskInstrumentor().instrument_app(app)
        
        RequestsInstrumentor().instrument()
        HTTPXClientInstrumentor().instrument()
        
        if use_mongo:
            PymongoInstrumentor().instrument()
        
        if use_redis:
            RedisInstrumentor().instrument()
        
        if engine:
            SQLAlchemyInstrumentor().instrument(engine=engine)

Custom Span Exporters

import logging
from typing import List

import httpx

from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.trace import ReadableSpan

class CustomSpanExporter(SpanExporter):
    def __init__(self, backend_url: str, api_key: str):
        self.backend_url = backend_url
        self.api_key = api_key
        # export() runs on the BatchSpanProcessor's worker thread, where no
        # event loop is running, so a synchronous client is the safe choice
        self.client = httpx.Client()
    
    def export(self, spans: List[ReadableSpan]) -> SpanExportResult:
        span_data = []
        
        for span in spans:
            span_data.append({
                "name": span.name,
                "trace_id": format(span.context.trace_id, '032x'),
                "span_id": format(span.context.span_id, '016x'),
                "start_time": span.start_time,
                "end_time": span.end_time,
                "attributes": dict(span.attributes),
                "events": [
                    {"name": e.name, "timestamp": e.timestamp}
                    for e in span.events
                ],
                "status": {
                    "code": span.status.status_code.name,
                    "description": span.status.description
                }
            })
        
        try:
            self.client.post(
                f"{self.backend_url}/api/v1/traces",
                json={"spans": span_data},
                headers={"Authorization": f"Bearer {self.api_key}"}
            )
            return SpanExportResult.SUCCESS
        except Exception as e:
            print(f"Failed to export spans: {e}")
            return SpanExportResult.FAILURE
    
    def shutdown(self):
        self.client.close()

class LoggingSpanExporter(SpanExporter):
    def __init__(self, logger: logging.Logger):
        self.logger = logger
    
    def export(self, spans: List[ReadableSpan]) -> SpanExportResult:
        for span in spans:
            # ReadableSpan timestamps are epoch nanoseconds; convert to ms
            duration_ms = (span.end_time - span.start_time) / 1e6
            self.logger.info(
                f"Trace: {format(span.context.trace_id, '032x')} | "
                f"Span: {span.name} | "
                f"Duration: {duration_ms:.2f}ms | "
                f"Attributes: {dict(span.attributes)}"
            )
        
        return SpanExportResult.SUCCESS
    
    def shutdown(self):
        pass

Jaeger Implementation

Jaeger is a popular open-source distributed tracing system. It provides a complete solution for storing, querying, and visualizing traces.
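For local development, Jaeger ships an all-in-one image that bundles the agent, collector, query service, and UI in a single container. A typical invocation (the ports shown are Jaeger's standard defaults):

```shell
# Run Jaeger all-in-one for local development:
#   6831/udp - agent, accepts spans over UDP (thrift compact)
#   14268    - collector, accepts spans over HTTP
#   16686    - query service and web UI
docker run -d --name jaeger \
  -p 6831:6831/udp \
  -p 14268:14268 \
  -p 16686:16686 \
  jaegertracing/all-in-one:latest
```

Once it is running, the UI is available at http://localhost:16686 and the examples below can point their agent host at localhost.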

Jaeger Collector

from jaeger_client import Config
from jaeger_client.sampler import ProbabilisticSampler, RateLimitingSampler, ConstSampler

class JaegerConfig:
    @staticmethod
    def create_tracer(
        service_name: str,
        agent_host: str = "localhost",
        agent_port: int = 6831,
        sampler_type: str = "const",
        sampler_param: float = 1.0
    ):
        # jaeger_client.Config takes a single nested config dict
        config = Config(
            config={
                "sampler": {
                    "type": sampler_type,
                    "param": sampler_param
                },
                "local_agent": {
                    "reporting_host": agent_host,
                    "reporting_port": agent_port
                },
                "logging": True
            },
            service_name=service_name,
            validate=True
        )
        
        return config.initialize_tracer()

class JaegerSampler:
    @staticmethod
    def probabilistic(probability: float = 0.1):
        return ProbabilisticSampler(probability)
    
    @staticmethod
    def rate_limiting(max_traces_per_second: int = 10):
        return RateLimitingSampler(max_traces_per_second)
    
    @staticmethod
    def const(decision: bool = True):
        return ConstSampler(decision)
    
    @staticmethod
    def adaptive():
        return {
            "strategy": "adaptive",
            "sampling_rate": 0.1
        }

class JaegerReporter:
    @staticmethod
    def create_logging_reporter(logger=None):
        # LoggingReporter logs every finished span; useful for local debugging
        from jaeger_client.reporter import LoggingReporter
        return LoggingReporter(logger)
    
    @staticmethod
    def create_composite_reporter(remote_reporter, logger=None):
        # CompositeReporter fans each span out to several reporters.
        # The remote (agent) reporter itself is normally built for you by
        # Config.initialize_tracer().
        from jaeger_client.reporter import CompositeReporter, LoggingReporter
        
        reporters = [remote_reporter]
        
        if logger:
            reporters.append(LoggingReporter(logger))
        
        return CompositeReporter(*reporters)

Jaeger Query Client

import json

import requests
from typing import Dict, List
from datetime import datetime, timedelta

class JaegerQueryClient:
    def __init__(self, jaeger_url: str = "http://localhost:16686"):
        self.jaeger_url = jaeger_url
        self.api_url = f"{jaeger_url}/api"
    
    def find_traces(
        self,
        service: str,
        operation: str = None,
        start_time: datetime = None,
        end_time: datetime = None,
        tags: Dict = None,
        limit: int = 100
    ) -> List[Dict]:
        params = {
            "service": service,
            "limit": limit
        }
        
        if operation:
            params["operation"] = operation
        
        if start_time:
            # Jaeger's query API expects epoch microseconds
            params["start"] = int(start_time.timestamp() * 1_000_000)
        
        if end_time:
            params["end"] = int(end_time.timestamp() * 1_000_000)
        
        if tags:
            params["tags"] = json.dumps(tags)
        
        response = requests.get(f"{self.api_url}/traces", params=params)
        response.raise_for_status()
        
        return response.json().get("data", [])
    
    def get_trace(self, trace_id: str) -> Dict:
        response = requests.get(f"{self.api_url}/trace/{trace_id}")
        response.raise_for_status()
        
        return response.json()
    
    def get_service_operations(self, service: str) -> List[str]:
        response = requests.get(f"{self.api_url}/services/{service}/operations")
        response.raise_for_status()
        
        data = response.json()
        return data.get("data", [])
    
    def get_services(self) -> List[str]:
        response = requests.get(f"{self.api_url}/services")
        response.raise_for_status()
        
        data = response.json()
        return data.get("data", [])
    
    def get_dependencies(self, end_time: datetime = None, lookback: int = 3_600_000) -> List[Dict]:
        # /api/dependencies expects endTs (epoch ms) and lookback (ms)
        end_time = end_time or datetime.utcnow()
        
        params = {
            "endTs": int(end_time.timestamp() * 1000),
            "lookback": lookback
        }
        
        response = requests.get(f"{self.api_url}/dependencies", params=params)
        response.raise_for_status()
        
        return response.json().get("data", [])
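One detail worth pinning down: the trace query API takes start and end as epoch microseconds, which is easy to get wrong by a factor of 1000. A quick stdlib check of the conversion:

```python
from datetime import datetime, timedelta, timezone

def to_epoch_micros(dt: datetime) -> int:
    # Jaeger's /api/traces start/end parameters are epoch microseconds
    return int(dt.timestamp() * 1_000_000)

end = datetime(2026, 1, 15, 10, 0, tzinfo=timezone.utc)
start = end - timedelta(hours=1)

# One hour is 3.6 billion microseconds
assert to_epoch_micros(end) - to_epoch_micros(start) == 3_600_000_000
```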

Analyzing Traces

class TraceAnalyzer:
    def __init__(self, query_client: JaegerQueryClient):
        self.client = query_client
    
    def analyze_latency(
        self,
        service: str,
        operation: str = None,
        time_range: timedelta = timedelta(hours=1)
    ) -> Dict:
        end_time = datetime.utcnow()
        start_time = end_time - time_range
        
        traces = self.client.find_traces(
            service=service,
            operation=operation,
            start_time=start_time,
            end_time=end_time
        )
        
        if not traces:
            return {}
        
        durations = []
        
        for trace_data in traces:
            # The root span is the one with no CHILD_OF reference
            root_span = next(
                (s for s in trace_data.get("spans", []) if not s.get("references")),
                None
            )
            
            if root_span:
                # Jaeger reports durations in microseconds
                duration = root_span.get("duration", 0)
                durations.append(duration)
        
        if not durations:
            return {}
        
        durations.sort()
        
        return {
            "count": len(durations),
            "min": min(durations),
            "max": max(durations),
            "mean": sum(durations) / len(durations),
            "p50": durations[len(durations) // 2],
            "p95": durations[int(len(durations) * 0.95)],
            "p99": durations[int(len(durations) * 0.99)]
        }
    
    def find_error_traces(
        self,
        service: str,
        time_range: timedelta = timedelta(hours=1)
    ) -> List[Dict]:
        end_time = datetime.utcnow()
        start_time = end_time - time_range
        
        traces = self.client.find_traces(
            service=service,
            tags={"error": "true"},
            start_time=start_time,
            end_time=end_time
        )
        
        return traces
    
    def analyze_dependencies(
        self,
        time_range: timedelta = timedelta(hours=1)
    ) -> Dict:
        end_time = datetime.utcnow()
        
        dependencies = self.client.get_dependencies(
            end_time=end_time,
            lookback=int(time_range.total_seconds() * 1000)
        )
        
        service_map = {}
        
        for dep in dependencies:
            source = dep.get("client", {}).get("serviceName", "unknown")
            target = dep.get("server", {}).get("serviceName", "unknown")
            calls = dep.get("callCount", 0)
            
            key = f"{source}->{target}"
            service_map[key] = {
                "source": source,
                "target": target,
                "calls": calls
            }
        
        return service_map
    
    def find_slow_operations(
        self,
        service: str,
        time_range: timedelta = timedelta(hours=1),
        threshold_ms: int = 1000
    ) -> List[Dict]:
        end_time = datetime.utcnow()
        start_time = end_time - time_range
        
        traces = self.client.find_traces(
            service=service,
            start_time=start_time,
            end_time=end_time
        )
        
        slow_operations = []
        
        for trace_data in traces:
            for span in trace_data.get("spans", []):
                duration = span.get("duration", 0)
                
                # Jaeger durations are microseconds; the threshold is in ms
                if duration > threshold_ms * 1000:
                    slow_operations.append({
                        "operation": span.get("operationName"),
                        "duration": duration,
                        "trace_id": trace_data.get("traceID"),
                        "start_time": span.get("startTime")
                    })
        
        return sorted(
            slow_operations,
            key=lambda x: x["duration"],
            reverse=True
        )[:20]

Context Propagation

Context propagation links spans across service boundaries, enabling end-to-end trace correlation.
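The W3C Trace Context format that carries this correlation encodes everything into a single traceparent header: a version, the trace ID, the parent span ID, and sampling flags, dash-separated. A stdlib sketch of building and parsing that header:

```python
def build_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    # Layout: version(2 hex) - trace_id(32 hex) - parent_id(16 hex) - flags(2 hex)
    flags = "01" if sampled else "00"
    return f"00-{trace_id:032x}-{span_id:016x}-{flags}"

def parse_traceparent(header: str) -> dict:
    version, trace_id, span_id, flags = header.split("-")
    return {
        "trace_id": int(trace_id, 16),
        "span_id": int(span_id, 16),
        "sampled": flags == "01",
    }

header = build_traceparent(trace_id=0xabc123, span_id=0x42)
ctx = parse_traceparent(header)

assert ctx["trace_id"] == 0xabc123
assert ctx["span_id"] == 0x42
assert ctx["sampled"] is True
```

Each service parses the incoming header, starts its spans under the received trace ID, and emits a new traceparent with its own span ID as the parent for downstream calls.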

W3C Trace Context

from typing import Dict

from opentelemetry import context, trace
from opentelemetry.context import Context
from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags
from opentelemetry.trace.propagation import set_span_in_context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.propagate import inject, extract, set_global_textmap
from opentelemetry.propagators.textmap import TextMapPropagator, default_getter, default_setter

class W3CTraceContextPropagator:
    def __init__(self):
        # The W3C traceparent/tracestate propagator ships with the SDK
        self.propagator = TraceContextTextMapPropagator()
        set_global_textmap(self.propagator)
    
    def inject(self, carrier: Dict):
        inject(carrier)
    
    def extract(self, carrier: Dict):
        return extract(carrier)

class CustomPropagator(TextMapPropagator):
    def inject(self, carrier, context: Context = None, setter=default_setter):
        span = trace.get_current_span(context)
        span_context = span.get_span_context()
        
        if span_context.is_valid:
            setter.set(carrier, "x-trace-id", format(span_context.trace_id, '032x'))
            setter.set(carrier, "x-span-id", format(span_context.span_id, '016x'))
    
    def extract(self, carrier, context: Context = None, getter=default_getter) -> Context:
        # textmap getters return a list of values (or None) per key
        trace_ids = getter.get(carrier, "x-trace-id")
        span_ids = getter.get(carrier, "x-span-id")
        
        if trace_ids and span_ids:
            span_context = SpanContext(
                trace_id=int(trace_ids[0], 16),
                span_id=int(span_ids[0], 16),
                is_remote=True,
                trace_flags=TraceFlags(TraceFlags.SAMPLED)
            )
            
            span = NonRecordingSpan(span_context)
            return set_span_in_context(span, context)
        
        return context
    
    @property
    def fields(self):
        return {"x-trace-id", "x-span-id"}

def inject_trace_context(headers: Dict):
    inject(headers)

def extract_trace_context(headers: Dict):
    return extract(headers)

HTTP Context Propagation

import httpx

class TracedHTTPClient:
    def __init__(self, base_url: str = None):
        self.client = httpx.Client(base_url=base_url)
    
    def get(self, url: str, headers: Dict = None):
        headers = headers or {}
        inject(headers)
        
        with tracer.start_as_current_span("http_get") as span:
            span.set_attribute("http.method", "GET")
            span.set_attribute("http.url", url)
            
            response = self.client.get(url, headers=headers)
            
            span.set_attribute("http.status_code", response.status_code)
            
            return response
    
    def post(self, url: str, data: Dict = None, headers: Dict = None):
        headers = headers or {}
        inject(headers)
        
        with tracer.start_as_current_span("http_post") as span:
            span.set_attribute("http.method", "POST")
            span.set_attribute("http.url", url)
            
            response = self.client.post(url, json=data, headers=headers)
            
            span.set_attribute("http.status_code", response.status_code)
            
            return response

class TracedHTTPHandler:
    def __init__(self):
        self.tracer = trace.get_tracer(__name__)
    
    def handle(self, request, process_request):
        # Extract the upstream context, then run the handler inside the span.
        # Returning the span from inside the with-block would hand back an
        # already-ended span, so the work happens here instead.
        ctx = extract(request.headers)
        
        with self.tracer.start_as_current_span(
            request.url.path,
            context=ctx,
            kind=trace.SpanKind.SERVER
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            span.set_attribute("http.host", request.url.netloc)
            span.set_attribute("http.user_agent", request.headers.get("user-agent", ""))
            
            return process_request(request)

Messaging System Propagation

from opentelemetry import context, trace
from opentelemetry.propagate import inject, extract

class KafkaContextPropagation:
    @staticmethod
    def inject_producer_headers(message_headers: Dict):
        inject(message_headers)
    
    @staticmethod
    def extract_consumer_headers(message_headers: Dict) -> context.Context:
        return extract(message_headers)

class RabbitMQContextPropagation:
    @staticmethod
    def inject_message_properties(properties, message):
        inject(properties)
    
    @staticmethod
    def extract_message_properties(properties) -> context.Context:
        return extract(properties)

class AsyncMessagingTracing:
    def __init__(self, tracer):
        self.tracer = tracer
    
    async def send_message(self, topic: str, message: Dict, headers: Dict = None):
        headers = headers or {}
        inject(headers)
        
        with self.tracer.start_as_current_span(
            f"send to {topic}",
            kind=trace.SpanKind.PRODUCER
        ) as span:
            span.set_attribute("messaging.system", "kafka")
            span.set_attribute("messaging.destination", topic)
            
            await self._send_to_kafka(topic, message, headers)
    
    async def receive_message(self, topic: str, message: Dict, headers: Dict):
        ctx = extract(headers)
        
        with self.tracer.start_as_current_span(
            f"receive from {topic}",
            kind=trace.SpanKind.CONSUMER,
            context=ctx
        ) as span:
            span.set_attribute("messaging.system", "kafka")
            span.set_attribute("messaging.destination", topic)
            
            await self._process_message(message)
    
    async def _send_to_kafka(self, topic: str, message: Dict, headers: Dict):
        pass
    
    async def _process_message(self, message: Dict):
        pass

Sampling Strategies

Sampling reduces the volume of traces collected while keeping a representative picture of system behavior. Head-based sampling decides when a trace starts; tail-based sampling decides after the trace completes, which makes it possible to always keep error and high-latency traces.

Sampler Implementations

from typing import Dict

from opentelemetry import context
from opentelemetry.sdk.trace.sampling import (
    Sampler,
    SamplingResult,
    Decision,
    ParentBased,
    TraceIdRatioBased,
    ALWAYS_ON,
    ALWAYS_OFF
)

class CustomSampler(Sampler):
    def __init__(self, sample_rate: float = 0.1):
        self.sample_rate = sample_rate
        self.trace_id_upper_bound = sample_rate * (2 ** 31)
    
    def should_sample(
        self,
        parent_context: context.Context = None,
        trace_id: int = None,
        name: str = None,
        kind=None,
        attributes: Dict = None,
        links=None,
        trace_state=None
    ) -> SamplingResult:
        # Decide from the low bits of the trace ID so every service
        # reaches the same verdict for a given trace
        trace_id_lower = trace_id & (2 ** 31 - 1)
        
        if trace_id_lower < self.trace_id_upper_bound:
            return SamplingResult(
                Decision.RECORD_AND_SAMPLE,
                attributes or {}
            )
        
        return SamplingResult(Decision.DROP, {})
    
    @property
    def description(self) -> str:
        return f"CustomSampler({self.sample_rate})"

class SamplingStrategies:
    @staticmethod
    def probabilistic(rate: float = 0.01):
        return TraceIdRatioBased(rate)
    
    @staticmethod
    def rate_limited(max_traces_per_second: int = 10):
        # The Python SDK ships no rate-limiting sampler; this returns
        # a Jaeger remote-sampling strategy configuration instead
        return {
            "strategy": "ratelimiting",
            "param": max_traces_per_second
        }
    
    @staticmethod
    def parent_based(root: Sampler):
        # Child spans follow their parent's decision; only root
        # spans consult the `root` sampler
        return ParentBased(root=root)
    
    @staticmethod
    def always_on():
        return ALWAYS_ON
    
    @staticmethod
    def always_off():
        return ALWAYS_OFF
    
    @staticmethod
    def adaptive(sample_rate: float = 0.1, error_threshold: float = 0.05):
        # Configuration stub; an adaptive backend adjusts the rate at runtime
        return {
            "strategy": "adaptive",
            "sample_rate": sample_rate,
            "error_threshold": error_threshold
        }

Dynamic Sampling

import random

class DynamicSampler:
    def __init__(self, base_rate: float = 0.01):
        self.base_rate = base_rate
        self.error_rates = {}
        self.latency_rates = {}
    
    def should_sample(
        self,
        service: str,
        operation: str,
        has_error: bool,
        latency_ms: float
    ) -> bool:
        key = f"{service}:{operation}"
        
        # Always keep failed or slow requests
        if has_error or latency_ms > 1000:
            return True
        
        # Boost sampling for operations with a high recent error rate
        if self.error_rates.get(key, 0) > 0.1:
            return True
        
        # Boost sampling for operations with high average latency
        if self.latency_rates.get(key, 0) > 1000:
            return True
        
        return random.random() < self.base_rate
    
    def update_metrics(
        self,
        service: str,
        operation: str,
        has_error: bool,
        latency_ms: float
    ):
        key = f"{service}:{operation}"
        
        if has_error:
            current = self.error_rates.get(key, 0)
            self.error_rates[key] = current * 0.9 + 0.1
        else:
            current = self.error_rates.get(key, 0)
            self.error_rates[key] = current * 0.9
        
        current_latency = self.latency_rates.get(key, 0)
        self.latency_rates[key] = current_latency * 0.9 + latency_ms * 0.1

Visualization and Analysis

Building Trace Dashboards

import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.express as px
import pandas as pd

class TraceDashboard:
    def __init__(self, jaeger_client: JaegerQueryClient):
        self.client = jaeger_client
        self.app = dash.Dash(__name__)
        self._setup_layout()
        self._setup_callbacks()
    
    def _setup_layout(self):
        self.app.layout = html.Div([
            html.H1("Distributed Tracing Dashboard"),
            
            html.Div([
                html.Label("Service:"),
                dcc.Dropdown(
                    id="service-selector",
                    options=self._get_services(),
                    value="api-gateway"
                ),
                
                html.Label("Time Range:"),
                dcc.Dropdown(
                    id="time-range",
                    options=[
                        {"label": "Last 15 minutes", "value": "15m"},
                        {"label": "Last hour", "value": "1h"},
                        {"label": "Last 24 hours", "value": "24h"}
                    ],
                    value="1h"
                )
            ]),
            
            html.Div([
                dcc.Graph(id="latency-chart"),
                dcc.Graph(id="error-chart"),
                dcc.Graph(id="dependency-graph")
            ]),
            
            dcc.Interval(
                id="refresh-interval",
                interval=60000,
                n_intervals=0
            )
        ])
    
    def _setup_callbacks(self):
        @self.app.callback(
            [Output("latency-chart", "figure"),
             Output("error-chart", "figure"),
             Output("dependency-graph", "figure")],
            [Input("service-selector", "value"),
             Input("time-range", "value"),
             Input("refresh-interval", "n_intervals")]
        )
        def update_charts(service, time_range, n):
            latency_data = self._get_latency_data(service, time_range)
            error_data = self._get_error_data(service, time_range)
            dep_data = self._get_dependency_data(time_range)
            
            latency_fig = px.line(
                latency_data,
                x="timestamp",
                y="p95",
                title=f"Latency - {service}"
            )
            
            error_fig = px.bar(
                error_data,
                x="operation",
                y="error_count",
                title=f"Errors - {service}"
            )
            
            dep_fig = px.bar(
                dep_data,
                x="source",
                y="calls",
                barmode="group"
            )
            
            return latency_fig, error_fig, dep_fig
    
    def _get_services(self):
        services = self.client.get_services()
        return [{"label": s, "value": s} for s in services]
    
    def _get_latency_data(self, service: str, time_range: str):
        # Placeholder: query Jaeger and return a DataFrame with
        # "timestamp" and "p95" columns
        return pd.DataFrame()
    
    def _get_error_data(self, service: str, time_range: str):
        # Placeholder: return a DataFrame with "operation" and
        # "error_count" columns
        return pd.DataFrame()
    
    def _get_dependency_data(self, time_range: str):
        # Placeholder: return a DataFrame with "source" and "calls" columns
        return pd.DataFrame()
    
    def run(self, debug: bool = True, port: int = 8050):
        self.app.run(debug=debug, port=port)

Best Practices

OBSERVABILITY_BEST_PRACTICES = {
    "instrumentation": [
        "Instrument at component boundaries (HTTP, database, messaging)",
        "Add business context to spans",
        "Use consistent naming conventions",
        "Include error information in spans",
        "Add relevant tags for filtering"
    ],
    
    "sampling": [
        "Use head sampling for high-volume services",
        "Use tail sampling to capture errors and slow traces",
        "Adjust sampling rates based on load",
        "Ensure representative sampling across services"
    ],
    
    "context_propagation": [
        "Propagate context across all service boundaries",
        "Use standard propagation formats (W3C)",
        "Handle missing context gracefully",
        "Ensure backward compatibility"
    ],
    
    "storage": [
        "Set appropriate retention periods",
        "Implement tiered storage (hot/warm/cold)",
        "Monitor storage costs and growth",
        "Plan for disaster recovery"
    ],
    
    "analysis": [
        "Create dashboards for key metrics",
        "Set up alerts for error rates and latency",
        "Regularly review slow traces",
        "Document investigation procedures"
    ]
}
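One practice above deserves expansion: span names should be low-cardinality, so raw IDs belong in attributes rather than names. A hypothetical helper sketching such a convention (`span_name` is illustrative, not an OpenTelemetry API):

```python
import re

def span_name(method: str, path: str) -> str:
    # Replace numeric and UUID-like segments with placeholders so
    # "/orders/42" and "/orders/97" share one span name
    templated = re.sub(
        r"/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
        "/{uuid}",
        path,
    )
    templated = re.sub(r"/\d+", "/{id}", templated)
    return f"{method.upper()} {templated}"
```

With one name per route template, latency percentiles and error rates aggregate meaningfully instead of fragmenting across millions of unique span names.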

Integration Examples

Flask Integration

from flask import Flask, request
from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

def create_traced_app(service_name: str, jaeger_endpoint: str):
    app = Flask(__name__)
    
    # Attach the service name as a resource so Jaeger groups spans correctly
    tracer_provider = TracerProvider(
        resource=Resource.create({"service.name": service_name})
    )
    tracer_provider.add_span_processor(
        BatchSpanProcessor(
            JaegerExporter(
                agent_host_name=jaeger_endpoint,
                agent_port=6831
            )
        )
    )
    
    trace.set_tracer_provider(tracer_provider)
    
    FlaskInstrumentor().instrument_app(app)
    
    @app.route("/api/<path>")
    def api_handler(path):
        with trace.get_tracer(__name__).start_as_current_span(path) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", request.url)
            span.set_attribute("http.target", path)
            
            return {"status": "ok"}
    
    return app

Django Integration

from opentelemetry import trace
from opentelemetry.instrumentation.django import DjangoInstrumentor

# Auto-instrumentation alternative: call DjangoInstrumentor().instrument()
# at startup instead of writing middleware by hand

class DjangoOTelMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
        self.tracer = trace.get_tracer(__name__)
    
    def __call__(self, request):
        with self.tracer.start_as_current_span(
            request.path,
            kind=trace.SpanKind.SERVER
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", request.get_full_path())
            span.set_attribute("http.host", request.get_host())
            
            response = self.get_response(request)
            
            span.set_attribute("http.status_code", response.status_code)
            
            return response

# Illustrative settings.py entry; these values are not read by
# OpenTelemetry automatically -- wire them into your TracerProvider setup
OPENTELEMETRY = {
    "SERVICE_NAME": "my-django-app",
    "JAEGER_ENDPOINT": "http://localhost:6831",
    "SAMPLER": {
        "type": "probabilistic",
        "param": 0.1
    }
}


Conclusion

Distributed tracing with OpenTelemetry and Jaeger provides the visibility needed to understand and optimize modern microservices architectures. By implementing comprehensive tracing, you can quickly identify performance bottlenecks, diagnose errors, and make data-driven decisions about your system’s architecture.

This guide covered the fundamentals of distributed tracing, OpenTelemetry implementation, Jaeger integration, context propagation, sampling strategies, and practical applications. With these tools and techniques, you can build observable systems that provide deep insights into behavior across your entire infrastructure.
