⚡ Calmops

AI Agent Observability: Monitoring & Debugging Agents

Introduction

You’ve deployed your AI agent to production. Now how do you know what’s happening? Why did the agent make that decision? Where are the bottlenecks?

This is where observability comes in. Just like traditional software, AI agents need comprehensive monitoring, logging, and debugging tools to operate reliably.

This guide covers the essentials of agent observability: logging, tracing, metrics, and debugging strategies.


The Observability Challenge

┌──────────────────────────────────────────────────────────────┐
│              AGENT OBSERVABILITY CHALLENGES                  │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   Traditional Software          AI Agents                    │
│   ────────────────────          ─────────                    │
│                                                              │
│   Deterministic                 Probabilistic                │
│   Clear state                   Hidden state                 │
│   Easy debugging                Hard to understand           │
│   Known failure modes           Unexpected behaviors         │
│                                                              │
│   ┌──────────────────────────────────────────────────┐       │
│   │  What we need to know:                           │       │
│   │                                                  │       │
│   │  • What did the agent decide?                    │       │
│   │  • Why did it make that decision?                │       │
│   │  • What tools did it use?                        │       │
│   │  • How long did each step take?                  │       │
│   │  • Where did things go wrong?                    │       │
│   │  • Is it behaving as expected?                   │       │
│   └──────────────────────────────────────────────────┘       │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Core Components

The Three Pillars

┌──────────────────────────────────────────────────────────────┐
│              THREE PILLARS OF OBSERVABILITY                  │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐        │
│   │    Logs     │   │   Metrics   │   │   Traces    │        │
│   ├─────────────┤   ├─────────────┤   ├─────────────┤        │
│   │ Timestamped │   │ Aggregated  │   │ End-to-end  │        │
│   │ events      │   │ measures    │   │ flow        │        │
│   │             │   │             │   │             │        │
│   │ What        │   │ How much /  │   │ How it      │        │
│   │ happened    │   │ how often   │   │ got there   │        │
│   └─────────────┘   └─────────────┘   └─────────────┘        │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Implementation

1. Structured Logging

import logging
import uuid
from datetime import datetime
from typing import Any, Dict

import structlog

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

class AgentLogger:
    """Specialized logger for agent operations"""
    
    def log_request(self, agent_id: str, request: Dict):
        logger.info(
            "agent_request",
            agent_id=agent_id,
            request_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow().isoformat(),
            request_type=request.get("type"),
            input_length=len(request.get("input", ""))
        )
    
    def log_decision(self, agent_id: str, decision: Dict):
        logger.info(
            "agent_decision",
            agent_id=agent_id,
            timestamp=datetime.utcnow().isoformat(),
            decision_type=decision.get("type"),
            confidence=decision.get("confidence"),
            reasoning=(decision.get("reasoning") or "")[:500],  # Truncate long reasoning
            context_used=decision.get("context_used", [])
        )
    
    def log_tool_call(
        self, 
        agent_id: str, 
        tool_name: str, 
        params: Dict,
        result: Any,
        duration_ms: float
    ):
        logger.info(
            "tool_call",
            agent_id=agent_id,
            timestamp=datetime.utcnow().isoformat(),
            tool_name=tool_name,
            params=params,
            success=isinstance(result, dict) and result.get("success", True),
            duration_ms=duration_ms,
            result_size=len(str(result))
        )
    
    def log_error(
        self, 
        agent_id: str, 
        error: Exception, 
        context: Dict
    ):
        logger.error(
            "agent_error",
            agent_id=agent_id,
            timestamp=datetime.utcnow().isoformat(),
            error_type=type(error).__name__,
            error_message=str(error),
            context=context
        )
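If structlog isn't available, the same one-JSON-object-per-line shape can be produced with the standard library alone. This is a sketch under that assumption; `JsonFormatter` and `log_event` are illustrative helpers, not part of any framework:

```python
import json
import logging
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON line, structlog-style."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # Extra fields arrive via logger.info(..., extra={"fields": {...}})
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)

def log_event(logger: logging.Logger, event: str, **fields):
    """Emit a structured event with arbitrary key-value context."""
    logger.info(event, extra={"fields": fields})
```

Because each line is valid JSON, downstream tools (jq, log aggregators) can filter on any field without regex parsing.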

2. Distributed Tracing

from typing import Dict

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Add Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

class TracedAgent:
    """Agent with full tracing"""
    
    def __init__(self, agent):
        self.agent = agent
        self.tracer = tracer
    
    async def execute(self, request: Dict) -> Dict:
        # Create span for entire request
        with self.tracer.start_as_current_span(
            "agent_execution",
            attributes={
                "agent.type": self.agent.type,
                "request.type": request.get("type"),
            }
        ) as span:
            try:
                # Trace: Understanding request
                with self.tracer.start_as_current_span("understand_request") as subspan:
                    subspan.set_attribute("operation", "understand")
                    understanding = await self.agent.understand(request)
                    subspan.set_attribute("understanding.confidence", understanding.confidence)
                
                # Trace: Decision making
                with self.tracer.start_as_current_span("make_decision") as subspan:
                    subspan.set_attribute("operation", "decide")
                    decision = await self.agent.decide(understanding)
                    subspan.set_attribute("decision.type", decision.type)
                    subspan.set_attribute("decision.confidence", decision.confidence)
                
                # Trace: Tool execution
                tool_results = []
                for tool_call in decision.tool_calls:
                    with self.tracer.start_as_current_span(f"tool.{tool_call.name}") as subspan:
                        subspan.set_attribute("tool.name", tool_call.name)
                        subspan.set_attribute("tool.input", str(tool_call.params)[:200])
                        
                        result = await self.agent.execute_tool(tool_call)
                        tool_results.append(result)
                        
                        subspan.set_attribute("tool.success", result.success)
                        subspan.set_attribute("tool.output", str(result)[:200])
                
                # Trace: Response generation
                with self.tracer.start_as_current_span("generate_response") as subspan:
                    response = await self.agent.generate_response(decision, tool_results)
                
                span.set_status(Status(StatusCode.OK))
                return response
                
            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise

3. Metrics Collection

from prometheus_client import Counter, Histogram, Gauge, Summary
import time

# Define metrics
AGENT_REQUESTS = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['agent_type', 'status']
)

AGENT_LATENCY = Histogram(
    'agent_latency_seconds',
    'Agent request latency',
    ['agent_type', 'operation'],
    buckets=[0.1, 0.5, 1, 2, 5, 10]
)

TOOL_USAGE = Counter(
    'agent_tool_usage_total',
    'Tool usage count',
    ['agent_type', 'tool_name']
)

ACTIVE_AGENTS = Gauge(
    'agent_active_instances',
    'Number of active agent instances',
    ['agent_type']
)

CONTEXT_TOKENS = Histogram(
    'agent_context_tokens',
    'Context tokens used',
    ['agent_type']
)

class MetricsCollector:
    """Collect and track agent metrics"""
    
    def __init__(self, agent_type: str):
        self.agent_type = agent_type
    
    def track_request(self, status: str):
        AGENT_REQUESTS.labels(
            agent_type=self.agent_type,
            status=status
        ).inc()
    
    def track_latency(self, operation: str, duration: float):
        AGENT_LATENCY.labels(
            agent_type=self.agent_type,
            operation=operation
        ).observe(duration)
    
    def track_tool_usage(self, tool_name: str):
        TOOL_USAGE.labels(
            agent_type=self.agent_type,
            tool_name=tool_name
        ).inc()
    
    def track_tokens(self, count: int):
        CONTEXT_TOKENS.labels(
            agent_type=self.agent_type
        ).observe(count)
    
    async def __aenter__(self):
        ACTIVE_AGENTS.labels(agent_type=self.agent_type).inc()
        self.start_time = time.time()
        return self
    
    async def __aexit__(self, *args):
        duration = time.time() - self.start_time
        self.track_latency("total", duration)
        ACTIVE_AGENTS.labels(agent_type=self.agent_type).dec()

Observability Platforms

Langfuse Integration

from typing import Any, Dict

from langfuse import Langfuse

class LangfuseObserver:
    """Langfuse integration for agent tracing"""
    
    def __init__(self, public_key: str, secret_key: str):
        self.langfuse = Langfuse(public_key, secret_key)
    
    def trace_agent(self, agent_id: str, session_id: str):
        """Create a Langfuse trace"""
        return self.langfuse.trace(
            name=agent_id,
            session_id=session_id,
            metadata={"agent_type": "chat"}
        )
    
    def log_generation(self, trace, model: str, prompt: str, completion: str):
        """Log LLM generation"""
        trace.generation(
            model=model,
            prompt=prompt,
            completion=completion,
            model_parameters={
                "temperature": 0.7,
                "max_tokens": 1000
            }
        )
    
    def log_tool_call(self, trace, tool_name: str, input: Dict, output: Any):
        """Log tool execution"""
        trace.span(
            name=tool_name,
            input=input,
            output=output,
            metadata={"type": "tool_call"}
        )

# Usage
observer = LangfuseObserver(KEY, SECRET)

trace = observer.trace_agent("agent-123", "session-456")

# Log understanding
understanding = await agent.understand(request)
trace.span(name="understand", input=request, output=understanding)

# Log generation
observer.log_generation(
    trace,
    model="gpt-4",
    prompt=prompt,
    completion=response
)

LangSmith Integration

import uuid
from datetime import datetime
from typing import Dict

from langsmith import Client

class LangSmithObserver:
    """LangSmith integration"""
    
    def __init__(self, api_key: str):
        self.client = Client(api_key=api_key)
        self.run_id = None
    
    async def start_run(self, name: str, inputs: Dict):
        """Start a tracked run"""
        self.run_id = uuid.uuid4()
        self.client.create_run(
            id=self.run_id,
            name=name,
            inputs=inputs,
            run_type="chain"
        )
    
    async def end_run(self, outputs: Dict, error: Exception = None):
        """End the run"""
        # The client exposes update_run; there is no end_run method
        self.client.update_run(
            self.run_id,
            outputs=outputs,
            error=str(error) if error else None,
            end_time=datetime.utcnow()
        )
    
    async def log_tool_event(self, tool_name: str, event: str, data: Dict):
        """Log tool events as run feedback"""
        self.client.create_feedback(
            run_id=self.run_id,
            key=f"tool_{event}",
            score=1 if event == "success" else 0,
            comment=str(data)
        )

# Usage
observer = LangSmithObserver(KEY)

await observer.start_run("agent_execution", {"input": user_message})

try:
    result = await agent.execute(user_message)
    await observer.end_run({"result": result})
except Exception as e:
    await observer.end_run({}, error=e)

Debugging Strategies

1. Request Replay

import json
import traceback
from datetime import datetime
from typing import Dict, List

class AgentDebugger:
    """Debug agents by replaying requests"""
    
    def __init__(self, agent):
        self.agent = agent
        self.event_log = []
    
    async def execute_with_logging(self, request: Dict) -> Dict:
        """Execute with detailed logging"""
        
        self.event_log = []
        
        # Log initial state
        self._log_event("start", {"request": request})
        
        try:
            # Step 1: Understand
            understanding = await self.agent.understand(request)
            self._log_event("understand", {
                "understanding": understanding.dict()
            })
            
            # Step 2: Decide
            decision = await self.agent.decide(understanding)
            self._log_event("decide", {
                "decision": decision.dict()
            })
            
            # Step 3: Execute tools
            for tool_call in decision.tool_calls:
                result = await self.agent.execute_tool(tool_call)
                self._log_event("tool_call", {
                    "tool": tool_call.name,
                    "input": tool_call.params,
                    "output": result
                })
            
            # Step 4: Generate response
            response = await self.agent.generate_response(decision)
            self._log_event("complete", {"response": response})
            
            return response
            
        except Exception as e:
            self._log_event("error", {
                "error": str(e),
                "traceback": traceback.format_exc()
            })
            raise
    
    def _log_event(self, event_type: str, data: Dict):
        """Log an event"""
        self.event_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "type": event_type,
            "data": data
        })
    
    def get_execution_trace(self) -> List[Dict]:
        """Get full execution trace for debugging"""
        return self.event_log
    
    def export_for_replay(self, path: str):
        """Export for replay"""
        with open(path, 'w') as f:
            json.dump({
                "events": self.event_log,
                "agent_config": self.agent.config
            }, f, indent=2)
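The exported file can then be reloaded offline to step through a failed run. A small loader sketch, assuming the file layout written by `export_for_replay` above (`load_replay` and `summarize_events` are illustrative names):

```python
import json

def load_replay(path: str):
    """Load an exported debug trace and return its event list."""
    with open(path) as f:
        data = json.load(f)
    return data["events"]

def summarize_events(events):
    """One line per event: timestamp, event type, and the data keys recorded."""
    return [
        f"{e['timestamp']} {e['type']} keys={sorted(e['data'])}"
        for e in events
    ]
```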

2. Decision Explainability

class DecisionExplainer:
    """Explain agent decisions"""
    
    def __init__(self, agent):
        self.agent = agent
    
    async def explain(self, decision: Decision) -> Explanation:
        """Generate human-readable explanation"""
        
        parts = []
        
        # What was the goal
        parts.append(f"Goal: {decision.goal}")
        
        # What context was used
        if decision.context:
            parts.append(f"\nContext used:")
            for ctx in decision.context[:3]:
                parts.append(f"  - {ctx}")
        
        # Why this decision
        if decision.reasoning:
            parts.append(f"\nReasoning:\n{decision.reasoning}")
        
        # What tools were chosen
        if decision.tool_calls:
            parts.append(f"\nTools selected:")
            for tc in decision.tool_calls:
                parts.append(f"  - {tc.name}: {tc.reasoning}")
        
        # Confidence
        parts.append(f"\nConfidence: {decision.confidence:.0%}")
        
        # Alternatives considered
        if decision.alternatives:
            parts.append(f"\nAlternatives considered:")
            for alt in decision.alternatives:
                parts.append(f"  - {alt.reason} (confidence: {alt.confidence:.0%})")
        
        return Explanation(
            decision_id=decision.id,
            summary="\n".join(parts),
            confidence=decision.confidence,
            factors=self._extract_factors(decision)
        )
    
    def _extract_factors(self, decision: Decision) -> Dict:
        """Extract key factors"""
        return {
            "context_quality": decision.context_quality,
            "reasoning_length": len(decision.reasoning),
            "tool_count": len(decision.tool_calls),
            "alternatives_considered": len(decision.alternatives)
        }

3. Failure Analysis

class FailureAnalyzer:
    """Analyze agent failures"""
    
    def __init__(self):
        self.failure_log = []
    
    async def analyze_failure(
        self, 
        request: Dict, 
        error: Exception,
        execution_trace: List[Dict]
    ) -> FailureReport:
        """Analyze what went wrong"""
        
        # Categorize failure
        failure_type = self._categorize_failure(error, execution_trace)
        
        # Find root cause
        root_cause = self._find_root_cause(
            failure_type, 
            error, 
            execution_trace
        )
        
        # Suggest fixes
        suggestions = self._suggest_fixes(failure_type, root_cause)
        
        report = FailureReport(
            failure_type=failure_type,
            root_cause=root_cause,
            suggestions=suggestions,
            execution_trace=execution_trace,
            severity=self._calculate_severity(failure_type)
        )
        
        self.failure_log.append(report)
        return report
    
    def _categorize_failure(
        self, 
        error: Exception, 
        trace: List[Dict]
    ) -> str:
        """Categorize the failure type"""
        
        error_msg = str(error).lower()
        
        if "timeout" in error_msg:
            return "timeout"
        elif "rate limit" in error_msg:
            return "rate_limit"
        elif "permission" in error_msg:
            return "permission_error"
        elif "invalid input" in error_msg:
            return "invalid_input"
        elif "api" in error_msg:
            return "api_error"
        else:
            return "unknown_error"
    
    def _find_root_cause(
        self, 
        failure_type: str, 
        error: Exception,
        trace: List[Dict]
    ) -> str:
        """Find root cause in trace"""
        
        if failure_type == "timeout":
            # Find operations that exceeded 10 seconds
            slow_events = [
                e for e in trace
                if e.get("duration_ms", 0) > 10000
            ]
            return f"Timeout in: {slow_events[0]['type'] if slow_events else 'unknown'}"
        
        # ... more analysis
        
        return str(error)
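The keyword-based categorization above is easy to unit-test in isolation. Here is the same mapping as a standalone function (a sketch that mirrors `_categorize_failure`, stdlib only):

```python
def categorize_failure(error: Exception) -> str:
    """Map an exception message onto a coarse failure category."""
    error_msg = str(error).lower()
    if "timeout" in error_msg:
        return "timeout"
    if "rate limit" in error_msg:
        return "rate_limit"
    if "permission" in error_msg:
        return "permission_error"
    if "invalid input" in error_msg:
        return "invalid_input"
    if "api" in error_msg:
        return "api_error"
    return "unknown_error"
```

Note the check order matters: "rate limit" must be tested before the broader "api" substring, or an API rate-limit error would be miscategorized.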

Alerting

Setting Up Alerts

class AgentAlerts:
    """Alerting for agent issues"""
    
    def __init__(self):
        self.thresholds = {
            "latency_p95": 5000,  # ms
            "error_rate": 0.05,  # 5%
            "tool_failure_rate": 0.1,  # 10%
            "context_token_limit": 0.9  # 90% of max
        }
        self.notifiers = []
    
    def add_notifier(self, notifier):
        self.notifiers.append(notifier)
    
    async def check_metrics(self, metrics: MetricsSnapshot):
        """Check metrics against thresholds"""
        
        alerts = []
        
        # Latency
        if metrics.latency_p95 > self.thresholds["latency_p95"]:
            alerts.append(Alert(
                severity="warning",
                type="high_latency",
                message=f"P95 latency {metrics.latency_p95}ms exceeds threshold"
            ))
        
        # Error rate
        if metrics.error_rate > self.thresholds["error_rate"]:
            alerts.append(Alert(
                severity="critical",
                type="high_error_rate",
                message=f"Error rate {metrics.error_rate:.1%} exceeds threshold"
            ))
        
        # Send alerts
        for alert in alerts:
            for notifier in self.notifiers:
                await notifier.send(alert)

Best Practices

Good: Comprehensive Logging

# Good: Log at appropriate levels
logger.debug("Detailed reasoning", reasoning=reasoning)
logger.info("Tool call", tool=tool_name, params=params)
logger.warning("Retrying after failure", attempt=attempt)
logger.error("Agent failed", error=str(error))

Bad: Too Much or Too Little

# Bad: Noisy, context-free logging
logger.info("Agent starting")   # No identifiers, nothing to correlate
logger.info("Agent completed")  # Missing context

# Good: Meaningful logging
logger.info("Agent request", 
    request_type=request.type,
    has_context=bool(context),
    tool_count=len(tool_calls)
)

Good: Distributed Context

# Good: Include correlation IDs
def log_with_context(logger, request_id: str, **kwargs):
    logger.info(
        "event",
        request_id=request_id,
        **kwargs
    )
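Passing the request ID through every call site by hand is error-prone; `contextvars` lets one correlation ID flow implicitly through async code. A stdlib sketch (names like `start_request` and `log_line` are illustrative):

```python
import contextvars
import json
import uuid

# One request ID per logical request, visible to all code in that context
request_id_var = contextvars.ContextVar("request_id", default=None)

def start_request() -> str:
    """Assign a fresh correlation ID for the current request context."""
    rid = str(uuid.uuid4())
    request_id_var.set(rid)
    return rid

def log_line(event: str, **fields) -> str:
    """Build a JSON log line that automatically carries the request ID."""
    return json.dumps({"event": event, "request_id": request_id_var.get(), **fields})
```

Every log line emitted during a request now shares the same `request_id` without threading it through function signatures.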

Tools Comparison

Tool            Type       Best For                      Cost
--------------  ---------  ----------------------------  ----------------
Langfuse        Platform   Full-stack observability      Free tier + paid
LangSmith       Platform   LLM application tracing       Free tier + paid
Arize           Platform   Enterprise ML observability   Paid
OpenTelemetry   Library    Custom implementation         Free
Datadog         Platform   Full-stack monitoring         Paid
Grafana         Platform   Metrics + visualization       Free

Conclusion

Observability is essential for production agents:

  1. Structured logging - Track events with context
  2. Distributed tracing - Understand execution flow
  3. Metrics - Measure performance and health
  4. Debugging tools - Replay and explain decisions
  5. Alerting - Proactive issue detection

Invest in observability from day one.

