Introduction
Enterprise AI agents differ from prototypes in three critical dimensions: reliability (they must handle failures gracefully), observability (you must know what the agent is doing and why), and governance (you must control what tools and data the agent can access). A prototype that works 80% of the time is a research success; an enterprise agent that works 80% of the time is a production incident waiting to happen.
This guide covers a production agent deployment pattern with tool sandboxing, OpenTelemetry integration for tracing and cost monitoring, a YAML-based governance template for tool allowlists and data classification, and a runbook for common production incidents.
Production Agent Architecture
flowchart TD
subgraph AgentRuntime["Agent Runtime"]
A[Agent Orchestrator<br/>LangGraph / Custom]
T[Tool Registry<br/>with sandboxing]
M[Memory Store<br/>Redis / PostgreSQL]
L[LLM Client<br/>Claude / GPT]
end
subgraph Observability["Observability Stack"]
OT[OpenTelemetry Collector]
Trace[(Trace Store<br/>Jaeger / Grafana)]
Metric[(Metrics<br/>Prometheus)]
Log[(Logs<br/>Loki / ELK)]
end
subgraph Governance["Governance Layer"]
Policy[Policy Engine<br/>OPA / Custom]
Audit[Audit Log]
Cost[Cost Tracker]
end
User[User / API] --> A
A --> T
A --> L
T -->|API calls| External[External Systems<br/>CRM, DB, APIs]
A --> M
A -.-> OT
T -.-> OT
OT --> Trace
OT --> Metric
OT --> Log
A -.-> Policy
Policy -.->|Allow/Deny| T
T -.-> Audit
L -.-> Cost
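The Policy Engine to Tool Registry edge in the diagram can be read as a deny-by-default check that runs before every tool call and records the outcome in the audit log. The sketch below only illustrates that flow: `PolicyEngine`, `Decision`, and the in-memory `audit_log` list are hypothetical names, and a real deployment would more likely delegate the decision to OPA or a similar engine and write audit records to durable storage.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass(frozen=True)
class Decision:
    allowed: bool
    reason: str


@dataclass
class PolicyEngine:
    """Hypothetical deny-by-default policy check for tool calls."""
    allowed_tools: Set[str]
    audit_log: List[Dict[str, str]] = field(default_factory=list)

    def check(self, agent: str, tool: str) -> Decision:
        if tool in self.allowed_tools:
            decision = Decision(True, "tool is on the allowlist")
        else:
            decision = Decision(False, "tool is not on the allowlist")
        # Record every decision, allowed or not, so denials are auditable too.
        self.audit_log.append(
            {"agent": agent, "tool": tool, "allowed": str(decision.allowed)}
        )
        return decision


policy = PolicyEngine(allowed_tools={"search_kb", "create_ticket"})
print(policy.check("customer-support-v2", "search_kb").allowed)      # True
print(policy.check("customer-support-v2", "run_sql_query").allowed)  # False
```

Logging denials as well as approvals is a deliberate choice here: blocked calls are often the first sign of a prompt-injection attempt or a misconfigured allowlist.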
Deployment Pattern: Agent with Tool Sandboxing
Base Agent Class with Error Handling
import asyncio
import logging
import time
from typing import Any, Dict, List, Optional

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

logger = logging.getLogger("enterprise-agent")
tracer = trace.get_tracer(__name__)


class EnterpriseAgent:
    """Base class for production AI agents with observability and error handling."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.max_retries = config.get("max_retries", 3)
        self.tools = self._load_tools(config.get("allowed_tools", []))

    def _load_tools(self, tool_names: List[str]) -> Dict[str, Any]:
        """Load only explicitly allowed tools from the registry.

        This is the sandboxing boundary: the agent cannot access
        any tool not in this allowlist.
        """
        # Map tool names to method names and resolve them lazily, so a
        # subclass only has to implement the tools it actually allows.
        registry = {
            "search_kb": "_search_knowledge_base",
            "get_customer": "_get_customer_data",
            "create_ticket": "_create_support_ticket",
            "send_email": "_send_email",
            "run_sql_query": "_run_readonly_sql",
        }
        return {
            name: getattr(self, registry[name])
            for name in tool_names
            if name in registry
        }

    @tracer.start_as_current_span("agent.run")
    async def run(self, task: str, context: Optional[Dict] = None) -> Dict:
        """Execute an agent task with tracing and retry logic."""
        span = trace.get_current_span()
        span.set_attribute("agent.task", task)
        span.set_attribute("agent.config", str(self.config))
        for attempt in range(self.max_retries):
            try:
                with tracer.start_as_current_span(f"agent.attempt.{attempt}") as attempt_span:
                    result = await self._execute(task, context)
                    attempt_span.set_status(Status(StatusCode.OK))
                    return result
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                span.add_event("retry", {"attempt": attempt, "error": str(e)})
                if attempt == self.max_retries - 1:
                    span.set_status(Status(StatusCode.ERROR, str(e)))
                    return {"status": "error", "error": str(e), "task": task}
                # Exponential backoff. asyncio.sleep yields to the event loop;
                # time.sleep would block every other coroutine in the process.
                await asyncio.sleep(2 ** attempt)

    async def _execute(self, task: str, context: Optional[Dict]) -> Dict:
        """Execute the task. Implemented by a subclass or LLM orchestration."""
        raise NotImplementedError
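The retry loop in `run` can also be pulled out into a standalone helper, which makes the backoff behaviour easier to test in isolation. The following is a minimal sketch rather than part of the agent class: `retry_async` and `flaky` are hypothetical names, and the delay cap and random jitter are additions that the loop above does not have.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Call fn until it succeeds or max_retries attempts have failed."""
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error to the caller
            # Exponential backoff capped at max_delay, with full jitter so
            # many agents retrying at once do not hit the backend in lockstep.
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))
    raise ValueError("max_retries must be at least 1")


calls = {"count": 0}


async def flaky() -> str:
    """Hypothetical operation that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(retry_async(flaky, base_delay=0.01))
print(result, calls["count"])  # ok 3
```

Unlike `run`, this helper re-raises the final exception instead of returning an error dictionary. Which behaviour fits better depends on whether callers expect exceptions or status payloads.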
Tool Implementation with Audit Logging
class CustomerSupportAgent(EnterpriseAgent):
    """Enterprise customer support agent with audit-logged tool calls."""

    # kb_client, crm_client, and ticketing_client are assumed to be
    # module-level async clients configured elsewhere.

    @tracer.start_as_current_span("tool.search_kb")
    async def _search_knowledge_base(self, query: str) -> List[Dict]:
        span = trace.get_current_span()
        span.set_attribute("kb.query", query)
        # Log all tool calls for audit
        logger.info(f"TOOL_CALL: search_kb query={query}")
        results = await kb_client.search(query, top_k=5)
        span.set_attribute("kb.result_count", len(results))
        return results

    @tracer.start_as_current_span("tool.get_customer")
    async def _get_customer_data(self, customer_id: str) -> Dict:
        span = trace.get_current_span()
        span.set_attribute("customer.id", customer_id)
        # Data classification check: PII data requires an explicit flag
        if not self.config.get("allow_pii_access", False):
            span.set_status(Status(StatusCode.ERROR, "PII access denied"))
            logger.warning(f"BLOCKED: PII access for customer {customer_id}")
            return {"error": "PII access not permitted for this agent configuration"}
        data = await crm_client.get_customer(customer_id)
        span.set_attribute("customer.exists", data is not None)
        return data

    @tracer.start_as_current_span("tool.create_ticket")
    async def _create_support_ticket(self, customer_id: str, issue: str, priority: str) -> str:
        # Rate-limit ticket creation
        tickets_this_hour = await self._count_recent_tickets(customer_id, 3600)
        if tickets_this_hour >= self.config.get("max_tickets_per_hour", 5):
            raise RuntimeError(f"Rate limit exceeded: {tickets_this_hour} tickets in last hour")
        ticket_id = await ticketing_client.create(customer_id, issue, priority)
        logger.info(f"TOOL_CALL: create_ticket id={ticket_id} priority={priority}")
        return ticket_id
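`_create_support_ticket` calls `_count_recent_tickets`, which the class above does not define. One way to back it is a sliding-window counter, sketched below under stated assumptions: `SlidingWindowCounter` is a hypothetical name, the state lives in process memory, and timestamps are passed in explicitly so the example is deterministic. With several agent replicas the counts would need to live in a shared store such as the Redis memory store from the architecture diagram.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class SlidingWindowCounter:
    """Hypothetical in-memory counter of events per key within a time window."""

    def __init__(self) -> None:
        self._events: Dict[str, Deque[float]] = defaultdict(deque)

    def record(self, key: str, now: Optional[float] = None) -> None:
        self._events[key].append(time.monotonic() if now is None else now)

    def count_recent(self, key: str, window_seconds: float,
                     now: Optional[float] = None) -> int:
        now = time.monotonic() if now is None else now
        events = self._events[key]
        # Drop timestamps that have aged out of the window.
        while events and events[0] <= now - window_seconds:
            events.popleft()
        return len(events)


counter = SlidingWindowCounter()
for t in (0.0, 100.0, 3500.0):
    counter.record("cust-42", now=t)

print(counter.count_recent("cust-42", 3600, now=3599.0))  # 3
print(counter.count_recent("cust-42", 3600, now=3650.0))  # 2
```

Because expired timestamps are discarded on read, this sketch assumes a single window size per key. Mixing windows of different lengths on the same key would need the pruning step removed.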
OpenTelemetry Monitoring
Deploy the OpenTelemetry collector alongside agents to capture traces, metrics, and logs:
# otel-collector-config.yaml
# Assumes a collector distribution that bundles the loki exporter (e.g. contrib).
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
processors:
  batch:
    timeout: 1s
  attributes:
    actions:
      - key: environment
        value: production
        action: upsert
exporters:
  otlp:
    endpoint: jaeger:4317
    tls:
      insecure: true
  prometheus:
    endpoint: 0.0.0.0:8889
  loki:
    endpoint: http://loki:3100/loki/api/v1/push
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, attributes]
      exporters: [otlp]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus]
    # Without a logs pipeline the loki exporter above is never used.
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [loki]
Key Metrics to Track
# Prometheus metrics for agent observability
from prometheus_client import Counter, Histogram, Gauge

agent_tasks_total = Counter(
    "agent_tasks_total", "Total agent tasks executed",
    ["agent_name", "status"],  # status = success/error/timeout
)
agent_task_duration = Histogram(
    "agent_task_duration_seconds", "Agent task execution time",
    ["agent_name", "tool"],
    buckets=[0.1, 0.5, 1, 2.5, 5, 10, 30, 60],
)
# The Python client appends _total to counter names, so this one is
# exposed to Prometheus as agent_cost_total_usd_total.
agent_cost_total = Counter(
    "agent_cost_total_usd", "Total LLM API cost in USD",
    ["agent_name", "model"],
)
agent_active_runs = Gauge(
    "agent_active_runs", "Currently executing agent runs",
    ["agent_name"],
)
Cost Tracking
import os
import time

# Fall back to a fixed label so a missing env var does not produce a "None" label
AGENT_NAME = os.getenv("AGENT_NAME", "unknown")


# anthropic_client is assumed to be an async Anthropic client created elsewhere.
@tracer.start_as_current_span("llm.call")
async def tracked_llm_call(prompt: str, model: str = "claude-sonnet-4-20250514") -> str:
    """LLM call with automatic cost tracking."""
    start = time.time()
    response = await anthropic_client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    duration = time.time() - start
    input_tokens = response.usage.input_tokens
    output_tokens = response.usage.output_tokens
    # Track costs by model pricing (USD per million tokens)
    rates = {"claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00}}
    # Unknown models fall back to the Sonnet rates, so keep this table current
    rate = rates.get(model, {"input": 3.00, "output": 15.00})
    cost = (input_tokens / 1_000_000 * rate["input"] +
            output_tokens / 1_000_000 * rate["output"])
    agent_cost_total.labels(agent_name=AGENT_NAME, model=model).inc(cost)
    agent_task_duration.labels(agent_name=AGENT_NAME, tool="llm").observe(duration)
    logger.info(f"LLM_CALL model={model} input_tokens={input_tokens} output_tokens={output_tokens} cost=${cost:.6f}")
    return response.content[0].text
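The cost formula is easier to sanity-check with concrete numbers. The sketch below isolates the arithmetic in a pure function: `estimate_cost_usd` is a hypothetical helper, the rates are the per-million-token figures used above, and the token counts are made up for illustration.

```python
from typing import Dict


def estimate_cost_usd(input_tokens: int, output_tokens: int,
                      rate: Dict[str, float]) -> float:
    """Cost of one call, with rates expressed in USD per million tokens."""
    return (input_tokens / 1_000_000 * rate["input"]
            + output_tokens / 1_000_000 * rate["output"])


rate = {"input": 3.00, "output": 15.00}
cost = estimate_cost_usd(12_000, 800, rate)
# 12,000 / 1e6 * 3.00 = 0.036 and 800 / 1e6 * 15.00 = 0.012
print(f"${cost:.6f}")  # $0.048000
```

Output tokens are five times more expensive than input tokens at these rates, so a verbose response can cost more than a long prompt.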
Governance Template
# agent-config.yaml: enterprise agent governance policy
agent:
  name: customer-support-v2
  model: claude-sonnet-4-20250514
  max_retries: 3
  max_concurrent_runs: 10

# Tool allowlist: agent CANNOT use unlisted tools
allowed_tools:
  - search_kb
  - get_customer
  - create_ticket
  - send_email
  - run_sql_query

# Data classification: what data this agent can access
data_access:
  allow_pii: false              # No PII access
  allow_financial: false        # No financial data
  allow_internal_docs: true     # Internal knowledge base OK
  max_records_per_query: 50     # Limit data extraction volume

# Rate limiting
rate_limits:
  max_tickets_per_hour: 5
  max_emails_per_hour: 10
  max_searches_per_minute: 30

# Cost controls
cost_controls:
  max_daily_spend_usd: 50.00
  max_cost_per_run_usd: 2.00
  alert_on_threshold: 0.8       # Alert at 80% of daily budget
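The `cost_controls` values only matter if something enforces them. The sketch below shows one possible check, assuming the cost of a run is known or estimated before it is committed to the daily total. `BudgetGuard` and its return values are hypothetical, and the daily reset and alert delivery are left out.

```python
from dataclasses import dataclass


@dataclass
class BudgetGuard:
    """Hypothetical enforcement of the cost_controls section."""
    max_daily_spend_usd: float
    max_cost_per_run_usd: float
    alert_on_threshold: float
    spent_today_usd: float = 0.0

    def check_run(self, run_cost_usd: float) -> str:
        """Return 'deny', 'alert', or 'ok' for a run with the given cost."""
        if run_cost_usd > self.max_cost_per_run_usd:
            return "deny"
        projected = self.spent_today_usd + run_cost_usd
        if projected > self.max_daily_spend_usd:
            return "deny"
        # Only accepted runs count against the daily budget.
        self.spent_today_usd = projected
        if projected >= self.alert_on_threshold * self.max_daily_spend_usd:
            return "alert"
        return "ok"


guard = BudgetGuard(max_daily_spend_usd=50.00, max_cost_per_run_usd=2.00,
                    alert_on_threshold=0.8, spent_today_usd=38.50)
print(guard.check_run(1.00))  # ok    (39.50 is below the 40.00 alert line)
print(guard.check_run(2.50))  # deny  (exceeds the per-run cap)
print(guard.check_run(1.00))  # alert (40.50 crosses 80% of 50.00)
```

An "alert" result still lets the run proceed. Only the per-run cap and the hard daily limit block execution in this sketch.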
Production Runbook
Incident: Agent Returns Consistent Errors
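# 1. Check recent error logs
kubectl logs -l app=agent -n agents --tail=100 | grep ERROR
# 2. Check tool connectivity
kubectl exec deploy/agent -n agents -- curl -sf http://kb-service:8000/health
# 3. Check rate limits
kubectl exec deploy/agent -n agents -- cat /var/log/agent/rate_limit.log
# 4. Check LLM API rate limits (reported in the response headers of a minimal request)
curl -s -o /dev/null -D - https://api.anthropic.com/v1/messages \
  -H "x-api-key: $ANTHROPIC_KEY" \
  -H "anthropic-version: 2023-06-01" \
  -H "content-type: application/json" \
  -d '{"model":"claude-sonnet-4-20250514","max_tokens":1,"messages":[{"role":"user","content":"ping"}]}' \
  | grep -i "anthropic-ratelimit"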
Incident: Agent Cost Spike
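# 1. Check cost metrics (USD spent per agent over the last hour)
# The Python client exports the counter with a _total suffix appended.
curl -s http://prometheus:9090/api/v1/query \
  --data-urlencode 'query=sum(increase(agent_cost_total_usd_total[1h])) by (agent_name)'
# 2. Identify expensive runs (sort numerically on the value after "cost=$")
kubectl logs -l app=agent -n agents | grep "LLM_CALL" | sort -t'$' -k2 -rn | head -10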
# 3. Find repeated failed calls (wasted spend)
kubectl logs -l app=agent -n agents | grep "ERROR" | cut -d' ' -f1 | sort | uniq -c | sort -rn | head -5
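When the shell pipeline is not enough, the `LLM_CALL` log lines can be aggregated in a few lines of Python. This is a rough sketch: it assumes the log format emitted by `tracked_llm_call`, and `cost_by_model`, the `INFO` prefix, and the sample lines are hypothetical.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable

LLM_CALL = re.compile(
    r"LLM_CALL model=(?P<model>\S+) input_tokens=(?P<inp>\d+) "
    r"output_tokens=(?P<out>\d+) cost=\$(?P<cost>[0-9.]+)"
)


def cost_by_model(lines: Iterable[str]) -> Dict[str, float]:
    """Sum the logged cost per model, ignoring lines that do not match."""
    totals: Dict[str, float] = defaultdict(float)
    for line in lines:
        match = LLM_CALL.search(line)
        if match:
            totals[match["model"]] += float(match["cost"])
    return dict(totals)


sample = [
    "INFO LLM_CALL model=model-a input_tokens=1200 output_tokens=300 cost=$0.008100",
    "INFO LLM_CALL model=model-a input_tokens=400 output_tokens=100 cost=$0.002700",
    "ERROR tool.search_kb timed out",
]
print(cost_by_model(sample))
```

Passing `sys.stdin` instead of `sample` would let the function consume the output of `kubectl logs` directly.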