Introduction
You’ve deployed your AI agent to production. Now how do you know what’s happening? Why did the agent make that decision? Where are the bottlenecks?
This is where observability comes in. Just like traditional software, AI agents need comprehensive monitoring, logging, and debugging tools to operate reliably.
This guide covers the core of agent observability: logging, tracing, metrics, and debugging strategies.
The Observability Challenge
Traditional software and AI agents fail in different ways:

| Traditional Software | AI Agents |
|---|---|
| Deterministic | Probabilistic |
| Clear state | Hidden state |
| Easy debugging | Hard to understand |
| Known failure modes | Unexpected behaviors |

What we need to know:

- What did the agent decide?
- Why did it make that decision?
- What tools did it use?
- How long did each step take?
- Where did things go wrong?
- Is it behaving as expected?
Core Components
The Three Pillars
| Logs | Metrics | Traces |
|---|---|---|
| Timestamped events | Aggregated measures | End-to-end flow |
| What happened | How much / how often | How it got there |
Implementation
1. Structured Logging
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict

import structlog

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

class AgentLogger:
    """Specialized logger for agent operations"""

    def log_request(self, agent_id: str, request: Dict):
        logger.info(
            "agent_request",
            agent_id=agent_id,
            request_id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc).isoformat(),
            request_type=request.get("type"),
            input_length=len(request.get("input", ""))
        )

    def log_decision(self, agent_id: str, decision: Dict):
        logger.info(
            "agent_decision",
            agent_id=agent_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            decision_type=decision.get("type"),
            confidence=decision.get("confidence"),
            reasoning=(decision.get("reasoning") or "")[:500],  # Truncate long reasoning; tolerate None
            context_used=decision.get("context_used", [])
        )

    def log_tool_call(
        self,
        agent_id: str,
        tool_name: str,
        params: Dict,
        result: Any,
        duration_ms: float
    ):
        logger.info(
            "tool_call",
            agent_id=agent_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            tool_name=tool_name,
            params=params,
            success=isinstance(result, dict) and result.get("success", True),
            duration_ms=duration_ms,
            result_size=len(str(result))
        )

    def log_error(
        self,
        agent_id: str,
        error: Exception,
        context: Dict
    ):
        logger.error(
            "agent_error",
            agent_id=agent_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            error_type=type(error).__name__,
            error_message=str(error),
            context=context
        )
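A lightweight way to produce `tool_call`-style events is a timing decorator that measures each call and emits one log line. The sketch below uses the standard `logging` module instead of structlog so it runs anywhere; the `timed_tool_call` name is illustrative, not part of any library:

```python
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("agent")

def timed_tool_call(tool_name: str):
    """Decorator: log each tool call with its duration in milliseconds."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            duration_ms = (time.perf_counter() - start) * 1000
            log.info("tool_call tool=%s duration_ms=%.2f", tool_name, duration_ms)
            return result
        return wrapper
    return decorator

@timed_tool_call("search")
def search(query: str) -> dict:
    # Stand-in for a real tool implementation
    return {"success": True, "hits": [query]}
```

The same wrapper can call `AgentLogger.log_tool_call` instead of logging directly, keeping timing logic out of every tool body.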
2. Distributed Tracing
from typing import Dict

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Add Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

class TracedAgent:
    """Agent with full tracing"""

    def __init__(self, agent):
        self.agent = agent
        self.tracer = tracer

    async def execute(self, request: Dict) -> Dict:
        # Create span for entire request
        with self.tracer.start_as_current_span(
            "agent_execution",
            attributes={
                "agent.type": self.agent.type,
                "request.type": request.get("type"),
            }
        ) as span:
            try:
                # Trace: Understanding request
                with self.tracer.start_as_current_span("understand_request") as subspan:
                    subspan.set_attribute("operation", "understand")
                    understanding = await self.agent.understand(request)
                    subspan.set_attribute("understanding.confidence", understanding.confidence)

                # Trace: Decision making
                with self.tracer.start_as_current_span("make_decision") as subspan:
                    subspan.set_attribute("operation", "decide")
                    decision = await self.agent.decide(understanding)
                    subspan.set_attribute("decision.type", decision.type)
                    subspan.set_attribute("decision.confidence", decision.confidence)

                # Trace: Tool execution
                tool_results = []
                for tool_call in decision.tool_calls:
                    with self.tracer.start_as_current_span(f"tool.{tool_call.name}") as subspan:
                        subspan.set_attribute("tool.name", tool_call.name)
                        subspan.set_attribute("tool.input", str(tool_call.params)[:200])
                        result = await self.agent.execute_tool(tool_call)
                        tool_results.append(result)
                        subspan.set_attribute("tool.success", result.success)
                        subspan.set_attribute("tool.output", str(result)[:200])

                # Trace: Response generation
                with self.tracer.start_as_current_span("generate_response"):
                    response = await self.agent.generate_response(decision, tool_results)

                span.set_status(Status(StatusCode.OK))
                return response

            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise
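To see what the spans above capture without standing up Jaeger, it helps to strip the idea to its core: a span is just a name, a set of attributes, and a measured duration, appended when it closes. This toy tracer is purely illustrative (`MiniTracer` is not part of OpenTelemetry):

```python
import time
from contextlib import contextmanager

class MiniTracer:
    """Toy tracer: records name, attributes, and duration for each span."""
    def __init__(self):
        self.spans = []

    @contextmanager
    def span(self, name, **attributes):
        record = {"name": name, "attributes": attributes}
        start = time.perf_counter()
        try:
            yield record
        finally:
            # A span is appended when it closes, so inner spans appear first
            record["duration_s"] = time.perf_counter() - start
            self.spans.append(record)

mini = MiniTracer()
with mini.span("agent_execution", agent_type="chat"):
    with mini.span("understand_request"):
        pass  # work happens here
```

Note the ordering: `understand_request` finishes (and is recorded) before its parent, which is exactly how trace backends reconstruct the call tree from span end times.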
3. Metrics Collection
from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
AGENT_REQUESTS = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['agent_type', 'status']
)

AGENT_LATENCY = Histogram(
    'agent_latency_seconds',
    'Agent request latency',
    ['agent_type', 'operation'],
    buckets=[0.1, 0.5, 1, 2, 5, 10]
)

TOOL_USAGE = Counter(
    'agent_tool_usage_total',
    'Tool usage count',
    ['agent_type', 'tool_name']
)

ACTIVE_AGENTS = Gauge(
    'agent_active_instances',
    'Number of active agent instances',
    ['agent_type']
)

CONTEXT_TOKENS = Histogram(
    'agent_context_tokens',
    'Context tokens used',
    ['agent_type']
)

class MetricsCollector:
    """Collect and track agent metrics"""

    def __init__(self, agent_type: str):
        self.agent_type = agent_type

    def track_request(self, status: str):
        AGENT_REQUESTS.labels(
            agent_type=self.agent_type,
            status=status
        ).inc()

    def track_latency(self, operation: str, duration: float):
        AGENT_LATENCY.labels(
            agent_type=self.agent_type,
            operation=operation
        ).observe(duration)

    def track_tool_usage(self, tool_name: str):
        TOOL_USAGE.labels(
            agent_type=self.agent_type,
            tool_name=tool_name
        ).inc()

    def track_tokens(self, count: int):
        CONTEXT_TOKENS.labels(
            agent_type=self.agent_type
        ).observe(count)

    async def __aenter__(self):
        ACTIVE_AGENTS.labels(agent_type=self.agent_type).inc()
        self.start_time = time.time()
        return self

    async def __aexit__(self, *args):
        duration = time.time() - self.start_time
        self.track_latency("total", duration)
        ACTIVE_AGENTS.labels(agent_type=self.agent_type).dec()
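To make the bucket semantics concrete: each `observe` increments exactly one bucket, and Prometheus exposes the counts cumulatively (bucket `le=x` counts all observations ≤ x). A toy reimplementation of the `AGENT_LATENCY` buckets, for illustration only:

```python
import bisect

class MiniHistogram:
    """Toy histogram with Prometheus-style cumulative buckets."""
    def __init__(self, buckets):
        self.bounds = sorted(buckets)
        # One count slot per bound, plus a final +Inf slot
        self.counts = [0] * (len(self.bounds) + 1)
        self.total = 0.0
        self.n = 0

    def observe(self, value):
        # bisect_left gives the first bound >= value, i.e. the "le" bucket
        idx = bisect.bisect_left(self.bounds, value)
        self.counts[idx] += 1
        self.total += value
        self.n += 1

    def cumulative(self):
        """Running totals, matching what Prometheus exports per bucket."""
        out, running = [], 0
        for c in self.counts:
            running += c
            out.append(running)
        return out

h = MiniHistogram([0.1, 0.5, 1, 2, 5, 10])
for latency_s in (0.05, 0.3, 0.7, 3.0, 12.0):
    h.observe(latency_s)
# h.cumulative() -> [1, 2, 3, 3, 4, 4, 5]
```

The 12.0s observation lands only in the implicit `+Inf` bucket, which is why the last cumulative count always equals the total observation count.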
Observability Platforms
Langfuse Integration
from typing import Any, Dict

from langfuse import Langfuse

class LangfuseObserver:
    """Langfuse integration for agent tracing"""

    def __init__(self, public_key: str, secret_key: str):
        self.langfuse = Langfuse(public_key=public_key, secret_key=secret_key)

    def trace_agent(self, agent_id: str, session_id: str):
        """Create a Langfuse trace"""
        return self.langfuse.trace(
            name=agent_id,
            session_id=session_id,
            metadata={"agent_type": "chat"}
        )

    def log_generation(self, trace, model: str, prompt: str, completion: str):
        """Log an LLM generation"""
        trace.generation(
            model=model,
            input=prompt,
            output=completion,
            model_parameters={
                "temperature": 0.7,
                "max_tokens": 1000
            }
        )

    def log_tool_call(self, trace, tool_name: str, tool_input: Dict, tool_output: Any):
        """Log tool execution"""
        trace.span(
            name=tool_name,
            input=tool_input,
            output=tool_output,
            metadata={"type": "tool_call"}
        )

# Usage
observer = LangfuseObserver(KEY, SECRET)
trace = observer.trace_agent("agent-123", "session-456")

# Log understanding
understanding = await agent.understand(request)
trace.span(name="understand", input=request, output=understanding)

# Log generation
observer.log_generation(
    trace,
    model="gpt-4",
    prompt=prompt,
    completion=response
)
LangSmith Integration
import uuid
from datetime import datetime, timezone
from typing import Dict

from langsmith import Client

class LangSmithObserver:
    """LangSmith integration"""

    def __init__(self, api_key: str):
        self.client = Client(api_key=api_key)
        self.run_id = None

    async def start_run(self, name: str, inputs: Dict):
        """Start a tracked run"""
        self.run_id = uuid.uuid4()
        self.client.create_run(
            id=self.run_id,
            name=name,
            inputs=inputs,
            run_type="chain"  # LangSmith has no "agent" run type; "chain" is the closest fit
        )

    async def end_run(self, outputs: Dict, error: Exception = None):
        """End the run"""
        self.client.update_run(
            self.run_id,
            outputs=outputs,
            error=str(error) if error else None,
            end_time=datetime.now(timezone.utc)
        )

    async def log_tool_event(self, tool_name: str, event: str, data: Dict):
        """Log tool events as feedback on the run"""
        self.client.create_feedback(
            run_id=self.run_id,
            key=f"tool_{event}",
            score=1 if event == "success" else 0,
            comment=str(data)
        )

# Usage
observer = LangSmithObserver(KEY)
await observer.start_run("agent_execution", {"input": user_message})
try:
    result = await agent.execute(user_message)
    await observer.end_run({"result": result})
except Exception as e:
    await observer.end_run({}, error=e)
Debugging Strategies
1. Request Replay
import json
import traceback
from datetime import datetime, timezone
from typing import Dict, List

class AgentDebugger:
    """Debug agents by replaying requests"""

    def __init__(self, agent):
        self.agent = agent
        self.event_log = []

    async def execute_with_logging(self, request: Dict) -> Dict:
        """Execute with detailed logging"""
        self.event_log = []

        # Log initial state
        self._log_event("start", {"request": request})

        try:
            # Step 1: Understand
            understanding = await self.agent.understand(request)
            self._log_event("understand", {
                "understanding": understanding.dict()
            })

            # Step 2: Decide
            decision = await self.agent.decide(understanding)
            self._log_event("decide", {
                "decision": decision.dict()
            })

            # Step 3: Execute tools
            for tool_call in decision.tool_calls:
                result = await self.agent.execute_tool(tool_call)
                self._log_event("tool_call", {
                    "tool": tool_call.name,
                    "input": tool_call.params,
                    "output": result
                })

            # Step 4: Generate response
            response = await self.agent.generate_response(decision)
            self._log_event("complete", {"response": response})
            return response

        except Exception as e:
            self._log_event("error", {
                "error": str(e),
                "traceback": traceback.format_exc()
            })
            raise

    def _log_event(self, event_type: str, data: Dict):
        """Log an event"""
        self.event_log.append({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "type": event_type,
            "data": data
        })

    def get_execution_trace(self) -> List[Dict]:
        """Get full execution trace for debugging"""
        return self.event_log

    def export_for_replay(self, path: str):
        """Export the trace and agent config for later replay"""
        with open(path, 'w') as f:
            json.dump({
                "events": self.event_log,
                "agent_config": self.agent.config
            }, f, indent=2)
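The exported file can then be reloaded for offline inspection. A small helper, hypothetical but matching the export format above, groups events by type so you can quickly see, say, every tool call in a failed run:

```python
import json
import tempfile
from pathlib import Path

def load_replay(path: str) -> dict:
    """Load an exported trace and group its events by event type."""
    data = json.loads(Path(path).read_text())
    by_type = {}
    for event in data["events"]:
        by_type.setdefault(event["type"], []).append(event)
    return by_type

# Round-trip example with a minimal exported trace
sample = {
    "events": [
        {"type": "start", "data": {}},
        {"type": "tool_call", "data": {"tool": "search"}},
        {"type": "tool_call", "data": {"tool": "fetch"}},
        {"type": "complete", "data": {}},
    ],
    "agent_config": {},
}

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(sample, f)
    path = f.name

events = load_replay(path)
```

From here, true replay means feeding the recorded `start` request back through the agent and diffing the new event log against the old one.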
2. Decision Explainability
class DecisionExplainer:
    """Explain agent decisions"""

    def __init__(self, agent):
        self.agent = agent

    async def explain(self, decision: Decision) -> Explanation:
        """Generate human-readable explanation"""
        parts = []

        # What was the goal
        parts.append(f"Goal: {decision.goal}")

        # What context was used
        if decision.context:
            parts.append("\nContext used:")
            for ctx in decision.context[:3]:
                parts.append(f"  - {ctx}")

        # Why this decision
        if decision.reasoning:
            parts.append(f"\nReasoning:\n{decision.reasoning}")

        # What tools were chosen
        if decision.tool_calls:
            parts.append("\nTools selected:")
            for tc in decision.tool_calls:
                parts.append(f"  - {tc.name}: {tc.reasoning}")

        # Confidence
        parts.append(f"\nConfidence: {decision.confidence:.0%}")

        # Alternatives considered
        if decision.alternatives:
            parts.append("\nAlternatives considered:")
            for alt in decision.alternatives:
                parts.append(f"  - {alt.reason} (confidence: {alt.confidence:.0%})")

        return Explanation(
            decision_id=decision.id,
            summary="\n".join(parts),
            confidence=decision.confidence,
            factors=self._extract_factors(decision)
        )

    def _extract_factors(self, decision: Decision) -> Dict:
        """Extract key factors"""
        return {
            "context_quality": decision.context_quality,
            "reasoning_length": len(decision.reasoning),
            "tool_count": len(decision.tool_calls),
            "alternatives_considered": len(decision.alternatives)
        }
3. Failure Analysis
from typing import Dict, List

class FailureAnalyzer:
    """Analyze agent failures"""

    def __init__(self):
        self.failure_log = []

    async def analyze_failure(
        self,
        request: Dict,
        error: Exception,
        execution_trace: List[Dict]
    ) -> FailureReport:
        """Analyze what went wrong"""
        # Categorize failure
        failure_type = self._categorize_failure(error, execution_trace)

        # Find root cause
        root_cause = self._find_root_cause(
            failure_type,
            error,
            execution_trace
        )

        # Suggest fixes
        suggestions = self._suggest_fixes(failure_type, root_cause)

        report = FailureReport(
            failure_type=failure_type,
            root_cause=root_cause,
            suggestions=suggestions,
            execution_trace=execution_trace,
            severity=self._calculate_severity(failure_type)
        )
        self.failure_log.append(report)
        return report

    def _categorize_failure(
        self,
        error: Exception,
        trace: List[Dict]
    ) -> str:
        """Categorize the failure type"""
        error_msg = str(error).lower()

        if "timeout" in error_msg:
            return "timeout"
        elif "rate limit" in error_msg:
            return "rate_limit"
        elif "permission" in error_msg:
            return "permission_error"
        elif "invalid input" in error_msg:
            return "invalid_input"
        elif "api" in error_msg:
            return "api_error"
        else:
            return "unknown_error"

    def _find_root_cause(
        self,
        failure_type: str,
        error: Exception,
        trace: List[Dict]
    ) -> str:
        """Find root cause in trace"""
        if failure_type == "timeout":
            # Find operations that took longer than 10 seconds
            slow_events = [
                event for event in trace
                if event.get("duration_ms", 0) > 10000
            ]
            return f"Timeout in: {slow_events[0]['type'] if slow_events else 'unknown'}"

        # ... more analysis for other failure types

        return str(error)

    # _suggest_fixes and _calculate_severity omitted for brevity
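The `_suggest_fixes` helper is referenced above but not shown. One minimal sketch is a static lookup table keyed by failure type; the table contents here are illustrative suggestions, not prescriptions:

```python
FIX_SUGGESTIONS = {
    "timeout": ["Raise the tool timeout", "Cache or parallelize slow calls"],
    "rate_limit": ["Add exponential backoff", "Lower request concurrency"],
    "permission_error": ["Audit the agent's credentials and scopes"],
    "invalid_input": ["Validate and sanitize inputs before execution"],
    "api_error": ["Check upstream API status and retry policy"],
}

def suggest_fixes(failure_type: str, root_cause: str) -> list:
    """Return canned remediation steps for a categorized failure."""
    return FIX_SUGGESTIONS.get(
        failure_type,
        # Unknown categories fall back to manual investigation
        [f"Inspect the execution trace manually: {root_cause}"]
    )
```

As failure categories accumulate, this table becomes a living runbook: each new incident either matches an existing entry or earns a new one.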
Alerting
Setting Up Alerts
class AgentAlerts:
    """Alerting for agent issues"""

    def __init__(self):
        self.thresholds = {
            "latency_p95": 5000,        # ms
            "error_rate": 0.05,         # 5%
            "tool_failure_rate": 0.1,   # 10%
            "context_token_limit": 0.9  # 90% of max
        }
        self.notifiers = []

    def add_notifier(self, notifier):
        self.notifiers.append(notifier)

    async def check_metrics(self, metrics: MetricsSnapshot):
        """Check metrics against thresholds"""
        alerts = []

        # Latency
        if metrics.latency_p95 > self.thresholds["latency_p95"]:
            alerts.append(Alert(
                severity="warning",
                type="high_latency",
                message=f"P95 latency {metrics.latency_p95}ms exceeds threshold"
            ))

        # Error rate
        if metrics.error_rate > self.thresholds["error_rate"]:
            alerts.append(Alert(
                severity="critical",
                type="high_error_rate",
                message=f"Error rate {metrics.error_rate:.1%} exceeds threshold"
            ))

        # Send alerts
        for alert in alerts:
            for notifier in self.notifiers:
                await notifier.send(alert)
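`AgentAlerts` only requires notifiers to expose an async `send` method. A minimal stand-in that writes to a logger is shown below; in practice you would swap in a Slack or PagerDuty client. `LogNotifier` is a hypothetical name, and plain dicts stand in for the `Alert` objects above:

```python
import asyncio
import logging

class LogNotifier:
    """Minimal notifier: records alerts and writes them to a logger."""
    def __init__(self):
        self.sent = []

    async def send(self, alert: dict):
        self.sent.append(alert)
        logging.getLogger("alerts").warning(
            "[%s] %s: %s", alert["severity"], alert["type"], alert["message"]
        )

async def main():
    notifier = LogNotifier()
    await notifier.send({
        "severity": "warning",
        "type": "high_latency",
        "message": "P95 latency 6200ms exceeds threshold",
    })
    return notifier

notifier = asyncio.run(main())
```

Keeping the notifier interface this small means alert routing (dedup, escalation, on-call rotation) can live behind `send` without touching the threshold logic.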
Best Practices
Good: Comprehensive Logging
# Good: Log at appropriate levels
logger.debug("Detailed reasoning", reasoning=reasoning)
logger.info("Tool call", tool=tool_name, params=params)
logger.warning("Retrying after failure", attempt=attempt)
logger.error("Agent failed", error=str(error))
Bad: Too Much or Too Little
# Bad: Noisy lines that carry no context
logger.info("Agent starting")   # No request ID or agent ID
logger.info("Agent completed")  # Can't tell what happened, or for whom
# Good: Meaningful logging
logger.info("Agent request",
request_type=request.type,
has_context=bool(context),
tool_count=len(tool_calls)
)
Good: Distributed Context
# Good: Include correlation IDs
def log_with_context(logger, request_id: str, **kwargs):
    logger.info(
        "event",
        request_id=request_id,
        **kwargs
    )
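Rather than threading `request_id` through every call, you can bind it once per request with `contextvars`, which survives `await` boundaries in async agents. A sketch, with `log_event` as an illustrative helper rather than a library function:

```python
import contextvars
import uuid

# One context variable per process; each request binds its own value
request_id_var = contextvars.ContextVar("request_id", default=None)

def new_request_context() -> str:
    """Generate a request ID and bind it to the current context."""
    rid = str(uuid.uuid4())
    request_id_var.set(rid)
    return rid

def log_event(event: str, **fields) -> dict:
    """Build a log record that automatically carries the current request ID."""
    return {"event": event, "request_id": request_id_var.get(), **fields}

rid = new_request_context()
record = log_event("tool_call", tool="search")
```

With this in place, every log line emitted during a request shares one correlation ID, which is what lets logs, metrics, and traces be joined per request.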
Tools Comparison
| Tool | Type | Best For | Cost |
|---|---|---|---|
| Langfuse | Platform | Full-stack observability | Free tier + paid |
| LangSmith | Platform | LLM application tracing | Free tier + paid |
| Arize | Platform | Enterprise ML observability | Paid |
| OpenTelemetry | Library | Custom implementation | Free |
| Datadog | Platform | Full-stack monitoring | Paid |
| Grafana | Platform | Metrics + visualization | Free |
Conclusion
Observability is essential for production agents:
- Structured logging - Track events with context
- Distributed tracing - Understand execution flow
- Metrics - Measure performance and health
- Debugging tools - Replay and explain decisions
- Alerting - Proactive issue detection
Invest in observability from day one.
Related Articles
- Building Production AI Agents
- AI Agent Security
- AI Agent Frameworks Comparison
- Introduction to Agentic AI