Introduction
In microservices architectures, a single user request can flow through dozens of services. When something goes wrong, understanding what happened across this distributed system becomes a significant challenge. Distributed tracing provides visibility into the path each request takes, making it possible to identify bottlenecks, diagnose errors, and optimize performance.
In 2026, OpenTelemetry stands as the de facto standard for observability, providing vendor-neutral APIs, SDKs, and tools for collecting traces, metrics, and logs. This guide explores distributed tracing in depth, covering OpenTelemetry fundamentals, Jaeger implementation, trace analysis, and building observable distributed systems.
Understanding Distributed Tracing
Distributed tracing tracks a request as it moves through multiple services, capturing timing information, errors, and key events along the way.
Core Concepts
┌─────────────────────────────────────────────────────────────────────┐
│                     Distributed Tracing Concepts                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│ Trace: a single request across services                             │
│   ┌───────────────────────────────────────────────────────────┐     │
│   │ [API Gateway] ──▶ [Auth Service] ──▶ [Order Service]      │     │
│   │      │                 │                   │              │     │
│   │   Span 1.1          Span 2.1           Span 3.1           │     │
│   │      │                 │                   │              │     │
│   │   Span 1.2          Span 2.2           Span 3.2           │     │
│   │                                        └──▶ [Payment Svc] │     │
│   │                                              Span 4.1     │     │
│   └───────────────────────────────────────────────────────────┘     │
│                                                                     │
│ Span: a single timed operation within a trace                       │
│   ┌────────────────────────────────────────┐                        │
│   │ operation: GET /api/orders             │                        │
│   │ service:   order-service               │                        │
│   │ duration:  150ms                       │                        │
│   │ start: 2026-01-15T10:00:00.000Z        │                        │
│   │ end:   2026-01-15T10:00:00.150Z        │                        │
│   │                                        │                        │
│   │ Tags:                                  │                        │
│   │   http.method: GET                     │                        │
│   │   http.url: /api/orders                │                        │
│   │   http.status_code: 200                │                        │
│   │   error: false                         │                        │
│   │                                        │                        │
│   │ Events:                                │                        │
│   │   - db.query.started   (10ms)          │                        │
│   │   - db.query.completed (50ms)          │                        │
│   │   - cache.miss         (60ms)          │                        │
│   └────────────────────────────────────────┘                        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
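Trace and span identifiers are plain random bit strings: 128 bits for the trace ID and 64 bits for the span ID, conventionally rendered as lowercase hex. A stdlib sketch (function names are illustrative):

```python
import secrets

def new_trace_id() -> str:
    """128-bit trace ID as 32 lowercase hex characters."""
    return secrets.token_bytes(16).hex()

def new_span_id() -> str:
    """64-bit span ID as 16 lowercase hex characters."""
    return secrets.token_bytes(8).hex()

trace_id = new_trace_id()
span_id = new_span_id()
print(len(trace_id), len(span_id))  # 32 16
```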
Trace Data Model
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime
@dataclass
class SpanContext:
trace_id: str
span_id: str
trace_flags: int = 1
trace_state: Optional[Dict] = None
from enum import IntEnum

class SpanKind(IntEnum):
    INTERNAL = 0
    SERVER = 1
    CLIENT = 2
    PRODUCER = 3
    CONSUMER = 4
@dataclass
class SpanStatus:
code: int
message: Optional[str] = None
OK = 0
ERROR = 1
UNSET = -1
@dataclass
class SpanEvent:
name: str
timestamp: datetime
attributes: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Span:
name: str
context: SpanContext
parent_id: Optional[str] = None
kind: SpanKind = SpanKind.INTERNAL
start_time: datetime = field(default_factory=datetime.utcnow)
end_time: Optional[datetime] = None
attributes: Dict[str, Any] = field(default_factory=dict)
events: List[SpanEvent] = field(default_factory=list)
status: SpanStatus = field(default_factory=lambda: SpanStatus(SpanStatus.UNSET))
def set_attribute(self, key: str, value: Any):
self.attributes[key] = value
def add_event(self, name: str, attributes: Dict = None):
event = SpanEvent(
name=name,
timestamp=datetime.utcnow(),
attributes=attributes or {}
)
self.events.append(event)
def set_status(self, code: int, message: str = None):
self.status = SpanStatus(code, message)
def end(self, end_time: datetime = None):
self.end_time = end_time or datetime.utcnow()
@property
def duration(self) -> float:
if self.end_time:
return (self.end_time - self.start_time).total_seconds() * 1000
return 0
@dataclass
class Trace:
spans: List[Span] = field(default_factory=list)
def add_span(self, span: Span):
self.spans.append(span)
def get_root_span(self) -> Optional[Span]:
root_spans = [s for s in self.spans if s.parent_id is None]
return root_spans[0] if root_spans else None
def get_spans_by_service(self, service_name: str) -> List[Span]:
return [s for s in self.spans if s.attributes.get('service.name') == service_name]
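The root-span lookup in the model above — the span with no parent — can be sanity-checked on plain dicts (a self-contained mirror of `get_root_span`, with illustrative span data):

```python
spans = [
    {"span_id": "a1", "parent_id": None, "name": "GET /api/orders"},
    {"span_id": "b2", "parent_id": "a1", "name": "validate_order"},
    {"span_id": "c3", "parent_id": "a1", "name": "reserve_inventory"},
]

def get_root_span(spans):
    # the root span is the only one without a parent
    roots = [s for s in spans if s["parent_id"] is None]
    return roots[0] if roots else None

def children_of(spans, span_id):
    return [s for s in spans if s["parent_id"] == span_id]

root = get_root_span(spans)
print(root["name"])  # GET /api/orders
print([s["name"] for s in children_of(spans, root["span_id"])])
```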
OpenTelemetry Fundamentals
OpenTelemetry provides a unified standard for collecting observability data. It consists of APIs, SDKs, and components for instrumenting applications.
Setting Up OpenTelemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
class OpenTelemetrySetup:
def __init__(self, service_name: str, jaeger_endpoint: str = None):
self.service_name = service_name
self.jaeger_endpoint = jaeger_endpoint
def setup(self):
resource = Resource(attributes={
SERVICE_NAME: self.service_name,
"service.version": "1.0.0",
"deployment.environment": "production"
})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
BatchSpanProcessor(
ConsoleSpanExporter()
)
)
if self.jaeger_endpoint:
provider.add_span_processor(
BatchSpanProcessor(
JaegerExporter(
agent_host_name=self.jaeger_endpoint,
agent_port=6831
)
)
)
trace.set_tracer_provider(provider)
return trace.get_tracer(self.service_name)
class AWSLambdaOTelSetup:
    @staticmethod
    def setup():
        from opentelemetry import trace
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.resources import Resource
        # from the opentelemetry-sdk-extension-aws package
        from opentelemetry.sdk.extension.aws.trace import AwsXRayIdGenerator

        resource = Resource.create({
            "cloud.provider": "aws",
            "cloud.platform": "aws_lambda",
            "aws.log.group.names": "my-log-group",
        })
        provider = TracerProvider(
            resource=resource,
            # X-Ray requires timestamp-prefixed trace IDs
            id_generator=AwsXRayIdGenerator()
        )
        trace.set_tracer_provider(provider)
        return trace.get_tracer_provider()
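The programmatic setup above can also be expressed declaratively: the OpenTelemetry SDK reads standard environment variables, which keeps exporter details out of service code. A sketch (the endpoint value is illustrative):

```shell
export OTEL_SERVICE_NAME="order-service"
export OTEL_RESOURCE_ATTRIBUTES="service.version=1.0.0,deployment.environment=production"
export OTEL_EXPORTER_OTLP_ENDPOINT="http://otel-collector:4317"
export OTEL_TRACES_SAMPLER="parentbased_traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.1"
```

With these set, `TracerProvider()` picks up the resource attributes and sampler without any constructor arguments.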
Manual Instrumentation
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from typing import Dict
import asyncio
import functools

tracer = trace.get_tracer(__name__)
class TracingDecorator:
@staticmethod
def traced_function(span_name: str = None, attributes: Dict = None):
def decorator(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
name = span_name or func.__name__
with tracer.start_as_current_span(
name,
attributes=attributes or {}
) as span:
try:
span.set_attribute("function.name", func.__name__)
span.set_attribute("function.module", func.__module__)
result = await func(*args, **kwargs)
span.set_status(Status(StatusCode.OK))
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
name = span_name or func.__name__
with tracer.start_as_current_span(
name,
attributes=attributes or {}
) as span:
try:
span.set_attribute("function.name", func.__name__)
span.set_attribute("function.module", func.__module__)
result = func(*args, **kwargs)
span.set_status(Status(StatusCode.OK))
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
@staticmethod
def create_span(name: str, kind: trace.SpanKind = trace.SpanKind.INTERNAL):
return tracer.start_span(name, kind=kind)
class BusinessOperationTracing:
def __init__(self, tracer):
self.tracer = tracer
async def process_order(self, order_id: str):
with self.tracer.start_as_current_span(
"process_order",
kind=trace.SpanKind.SERVER,
attributes={
"order.id": order_id,
"order.operation": "process"
}
) as span:
span.add_event("order_received", {"order_id": order_id})
validated = await self.validate_order(order_id)
inventory_reserved = await self.reserve_inventory(order_id)
span.add_event("inventory_reserved", {"success": inventory_reserved})
if not inventory_reserved:
                span.set_status(Status(StatusCode.ERROR, "Inventory reservation failed"))
return {"success": False, "error": "Inventory unavailable"}
payment_processed = await self.process_payment(order_id)
span.add_event("payment_processed", {"success": payment_processed})
if not payment_processed:
                span.set_status(Status(StatusCode.ERROR, "Payment failed"))
await self.rollback_inventory(order_id)
return {"success": False, "error": "Payment failed"}
await self.confirm_order(order_id)
span.set_attribute("order.status", "completed")
return {"success": True, "order_id": order_id}
async def validate_order(self, order_id: str):
with self.tracer.start_as_current_span("validate_order"):
return True
async def reserve_inventory(self, order_id: str):
with self.tracer.start_as_current_span("reserve_inventory"):
return True
async def process_payment(self, order_id: str):
with self.tracer.start_as_current_span("process_payment"):
return True
async def rollback_inventory(self, order_id: str):
with self.tracer.start_as_current_span("rollback_inventory"):
pass
async def confirm_order(self, order_id: str):
with self.tracer.start_as_current_span("confirm_order"):
pass
Automatic Instrumentation
from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
class InstrumentationSetup:
@staticmethod
def instrument_flask(app):
FlaskInstrumentor().instrument_app(app)
@staticmethod
def instrument_django():
DjangoInstrumentor().instrument()
@staticmethod
def instrument_requests():
RequestsInstrumentor().instrument()
@staticmethod
def instrument_httpx():
HTTPXClientInstrumentor().instrument()
@staticmethod
def instrument_pymongo(mongo_client):
PymongoInstrumentor().instrument_client(mongo_client)
@staticmethod
def instrument_redis(redis_client):
RedisInstrumentor().instrument()
@staticmethod
def instrument_sqlalchemy(engine):
SQLAlchemyInstrumentor().instrument_engine(engine)
@staticmethod
def instrument_all(app=None, mongo_client=None, redis_client=None, engine=None):
if app:
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
HTTPXClientInstrumentor().instrument()
if redis_client:
RedisInstrumentor().instrument()
if engine:
SQLAlchemyInstrumentor().instrument_engine(engine)
Custom Span Exporters
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.trace import ReadableSpan
from typing import List
import logging
import httpx
class CustomSpanExporter(SpanExporter):
    def __init__(self, backend_url: str, api_key: str):
        self.backend_url = backend_url
        self.api_key = api_key
        # export() runs on the span processor's worker thread, so a
        # synchronous client avoids event-loop juggling entirely
        self.client = httpx.Client()

    def export(self, spans: List[ReadableSpan]) -> SpanExportResult:
        try:
            self._send_spans(spans)
            return SpanExportResult.SUCCESS
        except Exception as e:
            logging.getLogger(__name__).warning("Failed to export spans: %s", e)
            return SpanExportResult.FAILURE

    def _send_spans(self, spans: List[ReadableSpan]):
        span_data = []
        for span in spans:
            span_data.append({
                "name": span.name,
                "trace_id": format(span.context.trace_id, '032x'),
                "span_id": format(span.context.span_id, '016x'),
                "start_time": span.start_time,  # nanoseconds since epoch
                "end_time": span.end_time,
                "attributes": dict(span.attributes),
                "events": [
                    {"name": e.name, "timestamp": e.timestamp}
                    for e in span.events
                ],
                "status": {
                    "code": str(span.status.status_code),
                    "description": span.status.description
                }
            })
        self.client.post(
            f"{self.backend_url}/api/v1/traces",
            json={"spans": span_data},
            headers={"Authorization": f"Bearer {self.api_key}"}
        )

    def shutdown(self):
        self.client.close()
class LoggingSpanExporter(SpanExporter):
def __init__(self, logger: logging.Logger):
self.logger = logger
    def export(self, spans: List[ReadableSpan]) -> SpanExportResult:
        for span in spans:
            # ReadableSpan times are integer nanoseconds since the epoch
            duration_ms = (span.end_time - span.start_time) / 1e6
            self.logger.info(
                f"Trace: {format(span.context.trace_id, '032x')} | "
                f"Span: {span.name} | "
                f"Duration: {duration_ms:.2f}ms | "
                f"Attributes: {dict(span.attributes)}"
            )
        return SpanExportResult.SUCCESS
def shutdown(self):
pass
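The `BatchSpanProcessor` used with these exporters buffers finished spans and flushes them in groups. Its core behavior can be sketched with a plain buffer (a simplification — the real processor also flushes on a timer from a background thread):

```python
class MiniBatchProcessor:
    def __init__(self, exporter, max_batch_size=3):
        self.exporter = exporter          # any callable taking a list of spans
        self.max_batch_size = max_batch_size
        self.buffer = []

    def on_end(self, span):
        self.buffer.append(span)
        # flush as soon as a full batch has accumulated
        if len(self.buffer) >= self.max_batch_size:
            self.force_flush()

    def force_flush(self):
        if self.buffer:
            self.exporter(self.buffer)
            self.buffer = []

batches = []
processor = MiniBatchProcessor(batches.append, max_batch_size=3)
for name in ["a", "b", "c", "d"]:
    processor.on_end(name)
processor.force_flush()  # flush the partial final batch
print(batches)           # [['a', 'b', 'c'], ['d']]
```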
Jaeger Implementation
Jaeger is a popular open-source distributed tracing system. It provides a complete solution for storing, querying, and visualizing traces.
Jaeger Collector
from jaeger_client import Config
from jaeger_client.reporter import Reporter
from jaeger_client.sampler import ProbabilisticSampler, RateLimitingSampler, ConstSampler
class JaegerConfig:
@staticmethod
def create_tracer(
service_name: str,
agent_host: str = "localhost",
agent_port: int = 6831,
sampler_type: str = "const",
sampler_param: float = 1.0
):
sampler_config = {
"type": sampler_type,
"param": sampler_param
}
reporter_config = {
"log_spans": True,
"max_queue_size": 10000,
"flush_interval": 1.0
}
config = Config(
service_name=service_name,
sampler_config=sampler_config,
reporter_config=reporter_config,
metrics_prefix=service_name
)
return config.initialize_tracer()
class JaegerSampler:
@staticmethod
def probabilistic(probability: float = 0.1):
return ProbabilisticSampler(probability)
@staticmethod
def rate_limiting(max_traces_per_second: int = 10):
return RateLimitingSampler(max_traces_per_second)
@staticmethod
def const(decision: bool = True):
return ConstSampler(decision)
@staticmethod
def adaptive():
return {
"strategy": "adaptive",
"sampling_rate": 0.1
}
class JaegerReporter:
@staticmethod
def create_remote_reporter(agent_host: str = "localhost", agent_port: int = 6831):
return Reporter(
agent_host_name=agent_host,
agent_port=agent_port,
log_spans=True,
max_queue_size=10000,
flush_interval=1.0
)
@staticmethod
def create_composite_reporter(
remote_host: str = "localhost",
remote_port: int = 6831,
log_file: str = None
):
reporters = []
reporters.append(
Reporter(
agent_host_name=remote_host,
agent_port=remote_port
)
)
if log_file:
from jaeger_client.reporter import LogReporter
reporters.append(LogReporter(log_file))
from jaeger_client.reporter import CompositeReporter
return CompositeReporter(reporters)
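Beyond client-side samplers, the Jaeger collector can serve per-service sampling strategies to clients from a JSON file. A minimal example, built here as a Python dict so it can be checked before writing it out (service names and rates are illustrative):

```python
import json

sampling_strategies = {
    "default_strategy": {"type": "probabilistic", "param": 0.01},
    "service_strategies": [
        {"service": "api-gateway", "type": "probabilistic", "param": 0.5},
        {"service": "payment-service", "type": "ratelimiting", "param": 10},
    ],
}

# written to e.g. strategies.json and passed to the collector
# via --sampling.strategies-file
print(json.dumps(sampling_strategies, indent=2))
```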
Jaeger Query Client
import json
import requests
from typing import List, Dict
from datetime import datetime, timedelta
class JaegerQueryClient:
def __init__(self, jaeger_url: str = "http://localhost:16686"):
self.jaeger_url = jaeger_url
self.api_url = f"{jaeger_url}/api"
def find_traces(
self,
service: str,
operation: str = None,
start_time: datetime = None,
end_time: datetime = None,
tags: Dict = None,
limit: int = 100
) -> List[Dict]:
params = {
"service": service,
"limit": limit
}
if operation:
params["operation"] = operation
        if start_time:
            # the Jaeger query API expects epoch microseconds
            params["start"] = int(start_time.timestamp() * 1_000_000)
        if end_time:
            params["end"] = int(end_time.timestamp() * 1_000_000)
if tags:
params["tags"] = json.dumps(tags)
response = requests.get(f"{self.api_url}/traces", params=params)
response.raise_for_status()
return response.json().get("data", [])
def get_trace(self, trace_id: str) -> Dict:
response = requests.get(f"{self.api_url}/trace/{trace_id}")
response.raise_for_status()
return response.json()
def get_service_operations(self, service: str) -> List[str]:
response = requests.get(f"{self.api_url}/services/{service}/operations")
response.raise_for_status()
data = response.json()
return data.get("data", [])
def get_services(self) -> List[str]:
response = requests.get(f"{self.api_url}/services")
response.raise_for_status()
data = response.json()
return data.get("data", [])
    def get_dependencies(self, end_time: datetime = None, lookback: str = "1h") -> List[Dict]:
        params = {"lookback": lookback}
        if end_time:
            # the dependencies endpoint expects endTs in epoch milliseconds
            params["endTs"] = int(end_time.timestamp() * 1000)
response = requests.get(f"{self.api_url}/dependencies", params=params)
response.raise_for_status()
return response.json().get("data", [])
Analyzing Traces
class TraceAnalyzer:
def __init__(self, query_client: JaegerQueryClient):
self.client = query_client
def analyze_latency(
self,
service: str,
operation: str = None,
time_range: timedelta = timedelta(hours=1)
) -> Dict:
end_time = datetime.utcnow()
start_time = end_time - time_range
traces = self.client.find_traces(
service=service,
operation=operation,
start_time=start_time,
end_time=end_time
)
if not traces:
return {}
durations = []
        for trace in traces:
            # in Jaeger's JSON model, the root span has no parent references
            root_span = next(
                (s for s in trace.get("spans", []) if not s.get("references")),
                None
            )
            if root_span:
                # Jaeger reports durations in microseconds
                durations.append(root_span.get("duration", 0))
if not durations:
return {}
        durations.sort()

        def pct(p: float):
            # clamp so small samples cannot index past the end of the list
            return durations[min(len(durations) - 1, int(len(durations) * p))]

        return {
            "count": len(durations),
            "min": min(durations),
            "max": max(durations),
            "mean": sum(durations) / len(durations),
            "p50": pct(0.50),
            "p95": pct(0.95),
            "p99": pct(0.99)
        }
def find_error_traces(
self,
service: str,
time_range: timedelta = timedelta(hours=1)
) -> List[Dict]:
end_time = datetime.utcnow()
start_time = end_time - time_range
traces = self.client.find_traces(
service=service,
tags={"error": "true"},
start_time=start_time,
end_time=end_time
)
return traces
def analyze_dependencies(
self,
time_range: timedelta = timedelta(hours=1)
) -> Dict:
end_time = datetime.utcnow()
        dependencies = self.client.get_dependencies(
            end_time=end_time,
            # total_seconds() handles ranges longer than a day;
            # timedelta.seconds alone would wrap at 24h
            lookback=f"{max(1, int(time_range.total_seconds() // 3600))}h"
        )
service_map = {}
for dep in dependencies:
source = dep.get("client", {}).get("serviceName", "unknown")
target = dep.get("server", {}).get("serviceName", "unknown")
calls = dep.get("callCount", 0)
key = f"{source}->{target}"
service_map[key] = {
"source": source,
"target": target,
"calls": calls
}
return service_map
def find_slow_operations(
self,
service: str,
time_range: timedelta = timedelta(hours=1),
threshold_ms: int = 1000
) -> List[Dict]:
end_time = datetime.utcnow()
start_time = end_time - time_range
traces = self.client.find_traces(
service=service,
start_time=start_time,
end_time=end_time
)
slow_operations = []
for trace in traces:
for span in trace.get("spans", []):
duration = span.get("duration", 0)
if duration > threshold_ms:
slow_operations.append({
"operation": span.get("operationName"),
"duration": duration,
"trace_id": trace.get("traceID"),
"start_time": span.get("startTime")
})
return sorted(
slow_operations,
key=lambda x: x["duration"],
reverse=True
)[:20]
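The index-based percentiles in `analyze_latency` are a coarse approximation; the stdlib offers interpolated quantiles, sketched here on synthetic durations:

```python
import statistics

durations_ms = [12, 15, 18, 20, 22, 25, 30, 45, 80, 250]

# quantiles(n=100) returns the 99 cut points p1..p99;
# "inclusive" treats the data as the whole population
cuts = statistics.quantiles(durations_ms, n=100, method="inclusive")
p50, p95, p99 = cuts[49], cuts[94], cuts[98]

print(p50)  # 23.5
```

Unlike `durations[len(durations) // 2]`, this interpolates between neighboring samples, which matters for small trace counts.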
Context Propagation
Context propagation links spans across service boundaries, enabling end-to-end trace correlation.
W3C Trace Context
from opentelemetry import context, trace
from opentelemetry.context import Context
from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags
from opentelemetry.trace.propagation import set_span_in_context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.propagate import inject, extract, set_global_textmap
from opentelemetry.propagators.textmap import TextMapPropagator
from typing import Dict

class W3CTraceContextPropagator:
    def __init__(self):
        # traceparent/tracestate propagation is the SDK default;
        # registering it explicitly makes the choice visible
        self.propagator = TraceContextTextMapPropagator()
        set_global_textmap(self.propagator)

    def inject(self, carrier: Dict):
        inject(carrier)

    def extract(self, carrier: Dict):
        return extract(carrier)

class CustomPropagator(TextMapPropagator):
    """Propagates context through custom x-trace-id / x-span-id headers."""

    def inject(self, carrier, context: Context = None, setter=None):
        span = trace.get_current_span(context)
        span_context = span.get_span_context()
        if span_context.is_valid:
            carrier["x-trace-id"] = format(span_context.trace_id, '032x')
            carrier["x-span-id"] = format(span_context.span_id, '016x')

    def extract(self, carrier, context: Context = None, getter=None) -> Context:
        trace_id = carrier.get("x-trace-id")
        span_id = carrier.get("x-span-id")
        if trace_id and span_id:
            span_context = SpanContext(
                trace_id=int(trace_id, 16),
                span_id=int(span_id, 16),
                is_remote=True,
                trace_flags=TraceFlags(TraceFlags.SAMPLED)
            )
            return set_span_in_context(NonRecordingSpan(span_context), context)
        return context or Context()

    @property
    def fields(self):
        return {"x-trace-id", "x-span-id"}
def inject_trace_context(headers: Dict):
inject(headers)
def extract_trace_context(headers: Dict):
return extract(headers)
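Under the hood, `inject` writes a single `traceparent` header in the W3C format `version-traceid-spanid-flags`; building and parsing it is mechanical (stdlib sketch, using the example IDs from the W3C spec):

```python
def build_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    # flags bit 0 is the "sampled" flag
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"

def parse_traceparent(header: str):
    version, trace_id, span_id, flags = header.split("-")
    return {
        "version": version,
        "trace_id": int(trace_id, 16),
        "span_id": int(span_id, 16),
        "sampled": bool(int(flags, 16) & 0x01),
    }

header = build_traceparent(0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7)
print(header)
# 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
```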
HTTP Context Propagation
import httpx
class TracedHTTPClient:
def __init__(self, base_url: str = None):
self.client = httpx.Client(base_url=base_url)
def get(self, url: str, headers: Dict = None):
headers = headers or {}
inject(headers)
with tracer.start_as_current_span("http_get") as span:
span.set_attribute("http.method", "GET")
span.set_attribute("http.url", url)
response = self.client.get(url, headers=headers)
span.set_attribute("http.status_code", response.status_code)
return response
def post(self, url: str, data: Dict = None, headers: Dict = None):
headers = headers or {}
inject(headers)
with tracer.start_as_current_span("http_post") as span:
span.set_attribute("http.method", "POST")
span.set_attribute("http.url", url)
response = self.client.post(url, json=data, headers=headers)
span.set_attribute("http.status_code", response.status_code)
return response
class TracedHTTPHandler:
def __init__(self):
self.tracer = trace.get_tracer(__name__)
    def start_server_span(self, request):
        # extract the upstream context, then start a span that the
        # caller is responsible for ending; returning a span from inside
        # a `with` block would hand back an already-ended span
        ctx = extract(request.headers)
        span = self.tracer.start_span(
            request.url.path,
            context=ctx,
            kind=trace.SpanKind.SERVER
        )
        span.set_attribute("http.method", request.method)
        span.set_attribute("http.url", str(request.url))
        span.set_attribute("http.host", request.url.netloc)
        span.set_attribute("http.user_agent", request.headers.get("user-agent", ""))
        return span
Messaging System Propagation
from opentelemetry import context
class KafkaContextPropagation:
@staticmethod
def inject_producer_headers(message_headers: Dict):
inject(message_headers)
@staticmethod
def extract_consumer_headers(message_headers: Dict) -> context.Context:
return extract(message_headers)
class RabbitMQContextPropagation:
@staticmethod
def inject_message_properties(properties, message):
inject(properties)
@staticmethod
def extract_message_properties(properties) -> context.Context:
return extract(properties)
class AsyncMessagingTracing:
def __init__(self, tracer):
self.tracer = tracer
async def send_message(self, topic: str, message: Dict, headers: Dict = None):
headers = headers or {}
inject(headers)
with self.tracer.start_as_current_span(
f"send to {topic}",
kind=trace.SpanKind.PRODUCER
) as span:
span.set_attribute("messaging.system", "kafka")
span.set_attribute("messaging.destination", topic)
await self._send_to_kafka(topic, message, headers)
async def receive_message(self, topic: str, message: Dict, headers: Dict):
ctx = extract(headers)
with self.tracer.start_as_current_span(
f"receive from {topic}",
kind=trace.SpanKind.CONSUMER,
context=ctx
) as span:
span.set_attribute("messaging.system", "kafka")
span.set_attribute("messaging.destination", topic)
await self._process_message(message)
async def _send_to_kafka(self, topic: str, message: Dict, headers: Dict):
pass
async def _process_message(self, message: Dict):
pass
Sampling Strategies
Sampling reduces the volume of traces collected while maintaining representative data.
Sampling Strategies
from typing import Optional
from opentelemetry.context import Context
from opentelemetry.sdk.trace.sampling import (
    Sampler,
    SamplingResult,
    Decision,
    ParentBased,
    TraceIdRatioBased
)

class CustomSampler(Sampler):
    def __init__(self, sample_rate: float = 0.1):
        self.sample_rate = sample_rate
        # same scheme as TraceIdRatioBased: compare the low 64 bits of
        # the trace id against a bound proportional to the sample rate
        self.trace_id_upper_bound = round(sample_rate * (2 ** 64))

    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind=None,
        attributes=None,
        links=None,
        trace_state=None
    ) -> SamplingResult:
        trace_id_lower = trace_id & ((2 ** 64) - 1)
        if trace_id_lower < self.trace_id_upper_bound:
            return SamplingResult(Decision.RECORD_AND_SAMPLE, attributes)
        return SamplingResult(Decision.DROP)

    def get_description(self) -> str:
        return f"CustomSampler({self.sample_rate})"
class SamplingStrategies:
@staticmethod
def probabilistic(rate: float = 0.01):
return TraceIdRatioBased(rate)
    @staticmethod
    def rate_limited(max_traces_per_second: int = 10):
        # the OTel Python SDK ships no rate-limiting sampler;
        # jaeger_client provides a client-side one
        from jaeger_client.sampler import RateLimitingSampler
        return RateLimitingSampler(max_traces_per_second)
    @staticmethod
    def parent_based(root: Sampler):
        # defer to the parent span's decision; `root` decides for root spans
        return ParentBased(root=root)
    @staticmethod
    def always_on():
        from opentelemetry.sdk.trace.sampling import ALWAYS_ON
        return ALWAYS_ON
    @staticmethod
    def always_off():
        from opentelemetry.sdk.trace.sampling import ALWAYS_OFF
        return ALWAYS_OFF
@staticmethod
def adaptive(sample_rate: float = 0.1, error_threshold: float = 0.05):
return {
"strategy": "adaptive",
"sample_rate": sample_rate,
"error_threshold": error_threshold
}
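`TraceIdRatioBased` makes a deterministic decision from the trace ID itself, so every service sampling at the same rate agrees on the same traces. The comparison reduces to a few lines (stdlib sketch of the same idea):

```python
def ratio_sampled(trace_id: int, rate: float) -> bool:
    # compare the low 64 bits of the trace id against a bound
    # proportional to the sampling rate
    bound = round(rate * (2 ** 64))
    return (trace_id & ((2 ** 64) - 1)) < bound

print(ratio_sampled(0, 0.1))             # True  (0 is below any nonzero bound)
print(ratio_sampled(2 ** 64 - 1, 0.5))   # False (max value is above the bound)
```

Because the decision depends only on the ID, no coordination between services is needed: a trace is either sampled everywhere or nowhere.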
Dynamic Sampling
class DynamicSampler:
def __init__(self, base_rate: float = 0.01):
self.base_rate = base_rate
self.error_rates = {}
self.latency_rates = {}
def should_sample(
self,
service: str,
operation: str,
has_error: bool,
latency_ms: float
) -> bool:
key = f"{service}:{operation}"
error_rate = self.error_rates.get(key, 0)
if error_rate > 0.1:
return True
latency_rate = self.latency_rates.get(key, 0)
if latency_rate > 1000:
return True
import random
return random.random() < self.base_rate
def update_metrics(
self,
service: str,
operation: str,
has_error: bool,
latency_ms: float
):
key = f"{service}:{operation}"
if has_error:
current = self.error_rates.get(key, 0)
self.error_rates[key] = current * 0.9 + 0.1
else:
current = self.error_rates.get(key, 0)
self.error_rates[key] = current * 0.9
current_latency = self.latency_rates.get(key, 0)
self.latency_rates[key] = current_latency * 0.9 + latency_ms * 0.1
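The `update_metrics` smoothing above is an exponential moving average with decay 0.9. Its behavior is easy to verify in isolation: repeated errors drive the rate toward 1.0, and error-free traffic decays it back toward 0:

```python
def ewma_update(current: float, has_error: bool, decay: float = 0.9) -> float:
    # new value = decayed history + weighted new observation (1.0 or 0.0)
    return current * decay + (1 - decay) * (1.0 if has_error else 0.0)

rate = 0.0
for _ in range(30):       # a burst of failing requests
    rate = ewma_update(rate, has_error=True)
print(round(rate, 3))     # 0.958

for _ in range(30):       # then healthy traffic decays the rate
    rate = ewma_update(rate, has_error=False)
print(round(rate, 3))     # 0.041
```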
Visualization and Analysis
Building Trace Dashboards
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.express as px
import pandas as pd
class TraceDashboard:
def __init__(self, jaeger_client: JaegerQueryClient):
self.client = jaeger_client
self.app = dash.Dash(__name__)
self._setup_layout()
self._setup_callbacks()
def _setup_layout(self):
self.app.layout = html.Div([
html.H1("Distributed Tracing Dashboard"),
html.Div([
html.Label("Service:"),
dcc.Dropdown(
id="service-selector",
options=self._get_services(),
value="api-gateway"
),
html.Label("Time Range:"),
dcc.Dropdown(
id="time-range",
options=[
{"label": "Last 15 minutes", "value": "15m"},
{"label": "Last hour", "value": "1h"},
{"label": "Last 24 hours", "value": "24h"}
],
value="1h"
)
]),
html.Div([
dcc.Graph(id="latency-chart"),
dcc.Graph(id="error-chart"),
dcc.Graph(id="dependency-graph")
]),
dcc.Interval(
id="refresh-interval",
interval=60000,
n_intervals=0
)
])
def _setup_callbacks(self):
@self.app.callback(
[Output("latency-chart", "figure"),
Output("error-chart", "figure"),
Output("dependency-graph", "figure")],
[Input("service-selector", "value"),
Input("time-range", "value"),
Input("refresh-interval", "n_intervals")]
)
def update_charts(service, time_range, n):
latency_data = self._get_latency_data(service, time_range)
error_data = self._get_error_data(service, time_range)
dep_data = self._get_dependency_data(time_range)
latency_fig = px.line(
latency_data,
x="timestamp",
y="p95",
title=f"Latency - {service}"
)
error_fig = px.bar(
error_data,
x="operation",
y="error_count",
title=f"Errors - {service}"
)
dep_fig = px.bar(
dep_data,
x="source",
y="calls",
barmode="group"
)
return latency_fig, error_fig, dep_fig
def _get_services(self):
services = self.client.get_services()
return [{"label": s, "value": s} for s in services]
def _get_latency_data(self, service: str, time_range: str):
return pd.DataFrame()
def _get_error_data(self, service: str, time_range: str):
return pd.DataFrame()
def _get_dependency_data(self, time_range: str):
return pd.DataFrame()
    def run(self, debug: bool = True, port: int = 8050):
        # Dash 2.x renamed run_server() to run()
        self.app.run(debug=debug, port=port)
Best Practices
OBSERVABILITY_BEST_PRACTICES = {
"instrumentation": [
"Instrument at component boundaries (HTTP, database, messaging)",
"Add business context to spans",
"Use consistent naming conventions",
"Include error information in spans",
"Add relevant tags for filtering"
],
"sampling": [
"Use head sampling for high-volume services",
"Use tail sampling to capture errors and slow traces",
"Adjust sampling rates based on load",
"Ensure representative sampling across services"
],
"context_propagation": [
"Propagate context across all service boundaries",
"Use standard propagation formats (W3C)",
"Handle missing context gracefully",
"Ensure backward compatibility"
],
"storage": [
"Set appropriate retention periods",
"Implement tiered storage (hot/warm/cold)",
"Monitor storage costs and growth",
"Plan for disaster recovery"
],
"analysis": [
"Create dashboards for key metrics",
"Set up alerts for error rates and latency",
"Regularly review slow traces",
"Document investigation procedures"
]
}
Integration Examples
Flask Integration
from flask import Flask, request
from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.trace.propagation import set_span_in_context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
def create_traced_app(service_name: str, jaeger_endpoint: str):
app = Flask(__name__)
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(
BatchSpanProcessor(
JaegerExporter(
agent_host_name=jaeger_endpoint,
agent_port=6831
)
)
)
trace.set_tracer_provider(tracer_provider)
FlaskInstrumentor().instrument_app(app)
@app.route("/api/<path>")
def api_handler(path):
with trace.get_tracer(__name__).start_as_current_span(path) as span:
span.set_attribute("http.method", request.method)
span.set_attribute("http.url", request.url)
span.set_attribute("http.target", path)
return {"status": "ok"}
return app
Django Integration
from opentelemetry.instrumentation.django import DjangoInstrumentor
class DjangoOTelMiddleware:
def __init__(self, get_response):
self.get_response = get_response
self.tracer = trace.get_tracer(__name__)
def __call__(self, request):
with self.tracer.start_as_current_span(
request.path,
kind=trace.SpanKind.SERVER
) as span:
span.set_attribute("http.method", request.method)
span.set_attribute("http.url", request.get_full_path())
span.set_attribute("http.host", request.get_host())
response = self.get_response(request)
span.set_attribute("http.status_code", response.status_code)
return response
class Settings:
OPENTELEMETRY = {
"SERVICE_NAME": "my-django-app",
"JAEGER_ENDPOINT": "http://localhost:6831",
"SAMPLER": {
"type": "probabilistic",
"param": 0.1
}
}
Resources
- OpenTelemetry Documentation
- Jaeger Documentation
- OpenTelemetry Python SDK
- W3C Trace Context
- Distributed Tracing Best Practices
Conclusion
Distributed tracing with OpenTelemetry and Jaeger provides the visibility needed to understand and optimize modern microservices architectures. By implementing comprehensive tracing, you can quickly identify performance bottlenecks, diagnose errors, and make data-driven decisions about your system’s architecture.
This guide covered the fundamentals of distributed tracing, OpenTelemetry implementation, Jaeger integration, context propagation, sampling strategies, and practical applications. With these tools and techniques, you can build observable systems that provide deep insights into behavior across your entire infrastructure.