Introduction
Distributed tracing provides visibility into complex microservices architectures. With requests spanning dozens of services, tracing is essential for debugging and performance optimization.
Why tracing matters:
- Cross-service issues are hard to diagnose without a request-level view that follows a call across process boundaries
- Teams commonly report large MTTR reductions after adopting tracing, because the failing hop is identified directly rather than inferred
- Traces reveal causality and cross-service latency that are invisible to per-service logs and metrics
- Request-scoped timing breakdowns shorten debugging sessions that would otherwise require correlating logs by hand
Tracing Architecture
┌─────────────────────────────────────────────────────────────────┐
│                    Distributed Tracing Flow                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐      │
│   │ Service │───▶│ Service │───▶│ Service │───▶│ Service │      │
│   │    A    │    │    B    │    │    C    │    │    D    │      │
│   │ Span A1 │    │ Span B1 │    │ Span C1 │    │ Span D1 │      │
│   └────┬────┘    └────┬────┘    └────┬────┘    └────┬────┘      │
│        │              │              │              │           │
│        └──────────────┴──────┬───────┴──────────────┘           │
│                              │                                  │
│                              ▼                                  │
│                     ┌─────────────────┐                         │
│                     │ Trace Collector │                         │
│                     │ (Jaeger/Zipkin) │                         │
│                     └────────┬────────┘                         │
│                              │                                  │
│                              ▼                                  │
│                     ┌─────────────────┐                         │
│                     │     Storage     │                         │
│                     │ (ES/Cassandra)  │                         │
│                     └─────────────────┘                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
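Context flows between the services above via HTTP headers. The W3C Trace Context standard defines the `traceparent` header, which carries a version, the 128-bit trace ID, the parent span ID, and a flags byte whose low bit marks the trace as sampled. A minimal stdlib sketch of parsing it (the header value below is the example from the spec):

```python
import re

# version - trace-id - parent span-id - flags, all lowercase hex
TRACEPARENT_RE = re.compile(
    r"^(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<span_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})$"
)

def parse_traceparent(header: str) -> dict:
    """Split a W3C traceparent header into its four fields."""
    m = TRACEPARENT_RE.match(header)
    if not m:
        raise ValueError(f"malformed traceparent: {header!r}")
    fields = m.groupdict()
    # The low bit of the flags byte is the "sampled" flag.
    fields["sampled"] = bool(int(fields["flags"], 16) & 0x01)
    return fields

ctx = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
# ctx["trace_id"] == "4bf92f3577b34da6a3ce929d0e0e4736", ctx["sampled"] is True
```

Instrumentation libraries inject and extract this header automatically; the sketch only makes the wire format concrete.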
OpenTelemetry Instrumentation
Automatic Instrumentation
#!/usr/bin/env python3
"""OpenTelemetry Python instrumentation."""
from flask import Flask

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# There is no generic "database" instrumentor; install the package for
# your driver, e.g. opentelemetry-instrumentation-psycopg2 for PostgreSQL.
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor

app = Flask(__name__)

# Configure tracing
trace.set_tracer_provider(
    TracerProvider(
        resource=Resource.create({
            SERVICE_NAME: "my-service",
            "service.version": "1.0.0",
            "deployment.environment": "production",
        })
    )
)

# Configure Jaeger exporter (the Thrift exporter is deprecated in newer
# OpenTelemetry releases; OTLP is the recommended replacement)
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)

# Add processor
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Instrument libraries
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
Psycopg2Instrumentor().instrument()
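The BatchSpanProcessor registered above buffers finished spans in memory and hands them to the exporter in batches, rather than exporting each span synchronously on the request path. A toy stdlib sketch of that batching behavior (not the real SDK implementation, which also flushes on a timer and drops spans when the queue is full):

```python
from queue import Empty, Queue

class MiniBatchProcessor:
    """Toy batch span processor: buffer spans, flush in batches."""

    def __init__(self, exporter, max_batch_size=3):
        self.queue = Queue()
        self.exporter = exporter          # callable taking a list of spans
        self.max_batch_size = max_batch_size

    def on_end(self, span):
        """Called when a span finishes; flush once the buffer fills."""
        self.queue.put(span)
        if self.queue.qsize() >= self.max_batch_size:
            self.force_flush()

    def force_flush(self):
        """Drain the buffer and hand the whole batch to the exporter."""
        batch = []
        while True:
            try:
                batch.append(self.queue.get_nowait())
            except Empty:
                break
        if batch:
            self.exporter(batch)

batches = []
processor = MiniBatchProcessor(batches.append, max_batch_size=2)
for name in ["span-a", "span-b", "span-c"]:
    processor.on_end(name)
processor.force_flush()
# batches == [["span-a", "span-b"], ["span-c"]]
```

Batching keeps export overhead off the hot path, which is why it is the recommended processor for production services.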
Manual Instrumentation
#!/usr/bin/env python3
"""Manual span creation."""
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

@tracer.start_as_current_span("process_order")
def process_order(order_id):
    # "process_order" is the parent span; each block below is a child span
    with tracer.start_as_current_span("validate_order") as span:
        span.set_attribute("order.id", order_id)
        span.add_event("Order validation started")
        validate_order(order_id)

    with tracer.start_as_current_span("check_inventory") as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("span.kind", "client")
        check_inventory(order_id)

    with tracer.start_as_current_span("process_payment") as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("span.kind", "client")
        process_payment(order_id)

    with tracer.start_as_current_span("update_inventory") as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("span.kind", "client")
        update_inventory(order_id)

    # Error handling: mark the current span failed and re-raise
    try:
        confirm_order(order_id)
    except Exception as e:
        span = trace.get_current_span()
        span.set_status(trace.StatusCode.ERROR, str(e))
        span.record_exception(e)
        raise

# Database instrumentation
@tracer.start_as_current_span("db_query")
def execute_query(query, params):
    span = trace.get_current_span()
    span.set_attribute("db.system", "postgresql")
    span.set_attribute("db.statement", query)
    span.set_attribute("db.operation", "SELECT")
    with tracer.start_as_current_span("db_connection") as db_span:
        db_span.set_attribute("peer.address", "localhost:5432")
        result = db.execute(query, params)
    return result
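The parent/child structure that `start_as_current_span` builds can be mimicked with a plain context manager, which makes the mechanics of span nesting and duration capture concrete (a toy sketch, not the OpenTelemetry SDK):

```python
import time
from contextlib import contextmanager

_stack = []    # ancestry of the currently open spans
finished = []  # completed spans, in the order they end

@contextmanager
def span(name):
    """Record name, parent, and wall-clock duration for a block of code."""
    record = {
        "name": name,
        "parent": _stack[-1]["name"] if _stack else None,
        "start": time.monotonic(),
    }
    _stack.append(record)
    try:
        yield record
    finally:
        _stack.pop()
        record["duration_s"] = time.monotonic() - record["start"]
        finished.append(record)

with span("process_order"):
    with span("validate_order"):
        pass
    with span("check_inventory"):
        pass

# Children end before the parent: finished holds validate_order and
# check_inventory (parent "process_order"), then process_order (parent None)
```

Note that spans end inside-out: the root span completes last, which is also why tail-based samplers (below) wait for the root span before judging a trace.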
Jaeger
Deployment
# Jaeger operator deployment
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: production
spec:
  strategy: production
  collector:
    maxReplicas: 3
    resources:
      limits:
        cpu: 500m
        memory: 512Mi
  query:
    replicas: 2
    options:
      query.max-clock-skew-adjustment: 60s
  storage:
    type: elasticsearch
    elasticsearch:
      nodeCount: 3
      redundancyPolicy: SingleRedundancy
      storage:
        size: 200Gi
  ingress:
    enabled: true
    annotations:
      kubernetes.io/ingress.class: nginx
      cert-manager.io/cluster-issuer: letsencrypt-prod
    hosts:
      - jaeger.example.com
    tls:
      - secretName: jaeger-tls
        hosts:
          - jaeger.example.com
Querying Traces
#!/usr/bin/env python3
"""Query traces from the Jaeger Query service.

Note: the jaeger_client package is a (now-deprecated) OpenTracing tracer
and has no query interface. Traces are fetched from the query service's
HTTP API instead (internal but widely used).
"""
import requests

JAEGER_QUERY = "http://jaeger:16686"

# Find traces for an operation that returned HTTP 500
resp = requests.get(f"{JAEGER_QUERY}/api/traces", params={
    "service": "order-service",
    "operation": "POST /orders",
    "tags": '{"http.status_code":"500"}',
    "start": start_time,  # epoch microseconds
    "limit": 10,
})
traces = resp.json()["data"]

# Get a specific trace
trace = requests.get(f"{JAEGER_QUERY}/api/traces/{trace_id}").json()["data"][0]

# Analyze span timings (Jaeger reports durations in microseconds)
for span in trace["spans"]:
    print(f"{span['operationName']}: {span['duration'] / 1000:.1f}ms")
    tags = {t["key"]: t["value"] for t in span["tags"]}
    if tags.get("error"):
        print(f"  Error tag set on span {span['spanID']}")
Sampling Strategies
#!/usr/bin/env python3
"""Adaptive sampling strategies."""
from opentelemetry.sdk.trace.sampling import (
    SamplingResult,
    Decision,
    TraceIdRatioBased,  # the SDK's ratio sampler (there is no TraceIdRatio)
)

class AdaptiveSampler:
    """Adaptive sampling based on traffic and errors."""

    def __init__(self):
        self.base_rate = 0.1
        self.error_rate = 0.0
        self.target_spans_per_second = 100

    def should_sample(self, parent_context, trace_id, name,
                      kind=None, attributes=None, links=None, trace_state=None):
        """Determine if span should be sampled."""
        attributes = attributes or {}
        # Always sample errors
        if attributes.get("error"):
            return SamplingResult(Decision.RECORD_AND_SAMPLE)
        # Increase rate for high-traffic services
        traffic_multiplier = self._get_traffic_multiplier(name)
        # Adjust for error rate
        if self.error_rate > 0.1:
            traffic_multiplier *= 2
        rate = min(1.0, self.base_rate * traffic_multiplier)
        # Delegate the final, deterministic decision to a ratio sampler
        return TraceIdRatioBased(rate).should_sample(
            parent_context, trace_id, name, kind, attributes, links, trace_state
        )

    def _get_traffic_multiplier(self, service_name):
        """Get sampling multiplier based on service traffic."""
        multipliers = {
            "order-service": 2.0,
            "payment-service": 3.0,
            "notification-service": 0.5,
        }
        return multipliers.get(service_name, 1.0)
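The ratio sampler decides deterministically from the trace ID itself, so every service sampling the same trace at the same rate reaches the same verdict without coordination. The core idea in plain Python (a simplification of the SDK's bound computation):

```python
import random

def ratio_should_sample(trace_id: int, rate: float) -> bool:
    """Sample iff the low 64 bits of the trace ID fall below rate * 2^64."""
    bound = round(rate * (1 << 64))
    return (trace_id & 0xFFFFFFFFFFFFFFFF) < bound

# Deterministic: the same trace ID always gets the same answer
tid = 0x4BF92F3577B34DA6A3CE929D0E0E4736
assert ratio_should_sample(tid, 1.0)      # rate 1.0 keeps everything
assert not ratio_should_sample(tid, 0.0)  # rate 0.0 keeps nothing

# Over many random trace IDs the observed rate approaches the target
random.seed(0)
sampled = sum(ratio_should_sample(random.getrandbits(128), 0.1)
              for _ in range(10_000))
# sampled is close to 1_000 (10% of 10_000)
```

Because the decision is a pure function of the trace ID, upstream and downstream services can sample independently and still keep or drop whole traces consistently.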
Tail-Based Sampling

class TailBasedSampler:
    """Tail-based sampling for complete traces."""

    def __init__(self):
        self.pending_traces = {}
        # Policy objects are assumed to be defined elsewhere
        self.error_policy = ErrorSamplingPolicy()
        self.latency_policy = LatencySamplingPolicy(p99_threshold_ms=5000)

    def process_span(self, span):
        """Process a completed span."""
        trace_id = span.trace_id
        if trace_id not in self.pending_traces:
            self.pending_traces[trace_id] = []
        self.pending_traces[trace_id].append(span)
        # Check if trace is complete
        if self._is_trace_complete(trace_id):
            self._evaluate_and_sample(trace_id)

    def _is_trace_complete(self, trace_id):
        """Check if all spans in the trace are complete."""
        spans = self.pending_traces.get(trace_id, [])
        root_span = next((s for s in spans if s.parent_id is None), None)
        if not root_span:
            return False
        # Treat the trace as complete once the root span has an end time
        return root_span.end_time is not None

    def _evaluate_and_sample(self, trace_id):
        """Evaluate the complete trace and decide sampling."""
        spans = self.pending_traces.pop(trace_id)
        # Check policies
        if self.error_policy.should_sample(spans):
            self._sample_trace(trace_id, spans, 'error')
        elif self.latency_policy.should_sample(spans):
            self._sample_trace(trace_id, spans, 'latency')

    def _sample_trace(self, trace_id, spans, reason):
        """Sample trace for export."""
        print(f"Sampling trace {trace_id} due to {reason}")
        # Send to collector
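The error and latency policies referenced above are not defined in the snippet. Hypothetical minimal versions could look like this, with spans represented as plain dicts for brevity (`parent_id`, `duration_ms`, and `error` keys are assumptions of this sketch):

```python
class ErrorSamplingPolicy:
    """Keep any trace that contains at least one errored span."""

    def should_sample(self, spans):
        return any(span.get("error") for span in spans)

class LatencySamplingPolicy:
    """Keep any trace whose root span exceeds a latency threshold."""

    def __init__(self, p99_threshold_ms):
        self.p99_threshold_ms = p99_threshold_ms

    def should_sample(self, spans):
        root = next((s for s in spans if s.get("parent_id") is None), None)
        return root is not None and root["duration_ms"] > self.p99_threshold_ms

# A slow but error-free trace is kept by the latency policy only
spans = [
    {"parent_id": None,   "duration_ms": 7200, "error": False},
    {"parent_id": "root", "duration_ms": 7000, "error": False},
]
# ErrorSamplingPolicy().should_sample(spans) is False
# LatencySamplingPolicy(5000).should_sample(spans) is True
```

Evaluating whole traces this way is what distinguishes tail-based from head-based sampling: the keep/drop decision can depend on outcomes (errors, total latency) that are unknowable when the trace starts.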