Introduction
The shift from passive AI assistants to autonomous agents represents one of the most significant transformations in enterprise AI. While traditional LLM applications respond to single requests, AI agents can plan, execute, and iterate on complex multi-step tasks. However, this autonomy introduces new challenges: How do you ensure reliability? How do you monitor decision-making? How do you control costs when agents can make unlimited tool calls?
This guide covers everything you need to deploy and operate AI agents in production, from architectural patterns to operational best practices.
Understanding AI Agents in Production
What Makes Production Agents Different
Development-time agents and production agents face fundamentally different challenges:
| Aspect | Dev/Prototype | Production |
|---|---|---|
| Execution | Single turn | Multi-turn, long-running |
| Tools | Limited set | Dynamic, secure access |
| Monitoring | Print statements | Comprehensive observability |
| Error Handling | Manual recovery | Automated fallbacks |
| Cost | Not tracked | Strict budgets |
| Compliance | Ignored | Built-in |
| Reliability | ~80% success acceptable | 99.9%+ required |
Agent Architecture Layers
A production-ready agent system consists of multiple layers:
┌────────────────────────────────────────────────────┐
│                    Agent Layer                     │
│         Planning, Reasoning, Tool Selection        │
├────────────────────────────────────────────────────┤
│                    Memory Layer                    │
│      Short-term, Long-term, Context Management     │
├────────────────────────────────────────────────────┤
│                     Tool Layer                     │
│          Tool Registry, Execution, Safety          │
├────────────────────────────────────────────────────┤
│                   Execution Layer                  │
│    Async Tasks, Rate Limiting, Circuit Breakers    │
├────────────────────────────────────────────────────┤
│                 Observability Layer                │
│        Tracing, Metrics, Logging, Alerting         │
└────────────────────────────────────────────────────┘
Core Agent Components
1. Agent Orchestrator
The orchestrator manages agent lifecycle and decision-making:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from enum import Enum
import asyncio
import structlog
logger = structlog.get_logger()
class AgentState(Enum):
IDLE = "idle"
THINKING = "thinking"
EXECUTING = "executing"
WAITING = "waiting"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class AgentStep:
step_number: int
action: str
tool_name: Optional[str]
input_params: Dict
output: Optional[Any]
error: Optional[str]
duration_ms: float
tokens_used: int
@dataclass
class AgentExecution:
execution_id: str
user_id: str
task: str
state: AgentState
steps: List[AgentStep] = field(default_factory=list)
total_tokens: int = 0
total_cost: float = 0.0
started_at: float = 0.0
completed_at: Optional[float] = None
@property
def duration_seconds(self) -> float:
if self.completed_at:
return self.completed_at - self.started_at
return 0.0
class AgentOrchestrator:
def __init__(self, llm_client, tool_registry, config: Dict):
self.llm = llm_client
self.tools = tool_registry
self.config = config
self.max_steps = config.get('max_steps', 20)
self.max_tokens_per_step = config.get('max_tokens_per_step', 4000)
self.execution_timeout = config.get('execution_timeout', 300)
    async def execute(self, task: str, context: Optional[Dict] = None) -> AgentExecution:
execution = AgentExecution(
execution_id=self._generate_id(),
user_id=context.get('user_id', 'anonymous') if context else 'anonymous',
task=task,
state=AgentState.THINKING,
            started_at=asyncio.get_running_loop().time()
)
current_plan = await self._create_initial_plan(task, context)
for step_num in range(1, self.max_steps + 1):
if execution.state == AgentState.COMPLETED:
break
step = await self._execute_step(
execution, step_num, current_plan, context
)
execution.steps.append(step)
if step.error:
logger.warning("step_failed",
execution_id=execution.execution_id,
step=step_num,
error=step.error)
# Check for termination conditions
if self._should_terminate(step, current_plan):
execution.state = AgentState.COMPLETED
break
        execution.completed_at = asyncio.get_running_loop().time()
if execution.state != AgentState.COMPLETED:
execution.state = AgentState.FAILED
return execution
async def _create_initial_plan(self, task: str,
context: Optional[Dict]) -> List[Dict]:
prompt = f"""Create a detailed plan to accomplish this task:
Task: {task}
Context: {context or 'No additional context'}
Provide a step-by-step plan, identifying:
1. Each step needed
2. The tool/action for each step
3. Dependencies between steps
Format your response as a JSON array of steps."""
response = await self.llm.agenerate(
messages=[{'role': 'user', 'content': prompt}],
max_tokens=2000
)
return self._parse_plan(response.content)
async def _execute_step(self, execution: AgentExecution,
step_num: int,
plan: List[Dict],
context: Optional[Dict]) -> AgentStep:
import time
start_time = time.time()
try:
# Get current state from context
current_state = self._build_current_state(execution, context)
# Determine next action
action_prompt = f"""
Task: {execution.task}
Plan:
{self._format_plan(plan)}
Current execution state:
{current_state}
What is the next action? Respond with:
1. Action type: think, tool_use, or complete
2. If tool_use: tool name and parameters
3. If think: what you're considering
4. Reasoning for your choice
"""
action_response = await self.llm.agenerate(
messages=[{'role': 'user', 'content': action_prompt}],
max_tokens=self.max_tokens_per_step
)
            action = self._parse_action(action_response.content)
            # Count tokens for every step type, not just tool use
            execution.total_tokens += action_response.usage.total_tokens
            if action['type'] == 'complete':
                execution.state = AgentState.COMPLETED
                return AgentStep(
                    step_number=step_num,
                    action='complete',
                    tool_name=None,
                    input_params={},
                    output='Task completed successfully',
                    error=None,
                    duration_ms=(time.time() - start_time) * 1000,
                    tokens_used=action_response.usage.total_tokens
                )
            if action['type'] == 'tool_use':
                tool_name = action['tool_name']
                params = action['parameters']
                # Execute tool with safety checks
                result = await self._execute_tool(
                    tool_name, params, execution
                )
return AgentStep(
step_number=step_num,
action='tool_use',
tool_name=tool_name,
input_params=params,
output=result,
error=None,
duration_ms=(time.time() - start_time) * 1000,
tokens_used=action_response.usage.total_tokens
)
# Think step
return AgentStep(
step_number=step_num,
action='think',
tool_name=None,
input_params={},
output=action.get('thought'),
error=None,
duration_ms=(time.time() - start_time) * 1000,
tokens_used=action_response.usage.total_tokens
)
except Exception as e:
return AgentStep(
step_number=step_num,
action='error',
tool_name=None,
input_params={},
output=None,
error=str(e),
duration_ms=(time.time() - start_time) * 1000,
tokens_used=0
)
2. Tool Registry and Execution
Secure tool management is critical for production agents:
from typing import Callable, Any, Dict, List, Optional
import asyncio
import json
class ToolDefinition:
def __init__(self, name: str, description: str,
parameters: Dict, handler: Callable,
requires_approval: bool = False,
rate_limit: int = 100):
self.name = name
self.description = description
self.parameters = parameters
self.handler = handler
self.requires_approval = requires_approval
self.rate_limit = rate_limit
def validate_params(self, params: Dict) -> bool:
required = self.parameters.get('required', [])
for req in required:
if req not in params:
return False
return True
class ToolRegistry:
def __init__(self):
self.tools: Dict[str, ToolDefinition] = {}
self.usage_counts: Dict[str, int] = {}
def register(self, tool: ToolDefinition):
self.tools[tool.name] = tool
self.usage_counts[tool.name] = 0
def get_tool(self, name: str) -> Optional[ToolDefinition]:
return self.tools.get(name)
def list_tools(self) -> List[Dict]:
return [
{
'name': t.name,
'description': t.description,
'parameters': t.parameters,
'requires_approval': t.requires_approval
}
for t in self.tools.values()
]
class SecureToolExecutor:
def __init__(self, tool_registry: ToolRegistry,
approval_service, audit_logger):
self.registry = tool_registry
self.approval_service = approval_service
self.audit = audit_logger
async def execute(self, tool_name: str, params: Dict,
execution_context: AgentExecution) -> Any:
tool = self.registry.get_tool(tool_name)
if not tool:
raise ValueError(f"Tool '{tool_name}' not found")
# Validate parameters
if not tool.validate_params(params):
raise ValueError(f"Invalid parameters for tool '{tool_name}'")
        # Check rate limit, then record this use against it
        if not self._check_rate_limit(tool):
            raise ValueError(f"Rate limit exceeded for tool '{tool_name}'")
        self.registry.usage_counts[tool.name] = \
            self.registry.usage_counts.get(tool.name, 0) + 1
# Check if approval required
if tool.requires_approval:
approved = await self.approval_service.request(
tool_name=tool_name,
params=params,
user_id=execution_context.user_id,
task=execution_context.task
)
if not approved:
raise PermissionError(
f"Approval denied for tool '{tool_name}'"
)
# Log audit trail
await self.audit.log_tool_execution(
execution_id=execution_context.execution_id,
tool_name=tool_name,
params=self._sanitize_params(params),
user_id=execution_context.user_id
)
# Execute with timeout
try:
result = await asyncio.wait_for(
tool.handler(**params),
timeout=30
)
return result
except asyncio.TimeoutError:
raise TimeoutError(f"Tool '{tool_name}' timed out")
def _check_rate_limit(self, tool: ToolDefinition) -> bool:
current = self.registry.usage_counts.get(tool.name, 0)
return current < tool.rate_limit
def _sanitize_params(self, params: Dict) -> Dict:
# Remove sensitive data before logging
sensitive_keys = ['password', 'token', 'secret', 'api_key']
sanitized = params.copy()
for key in sensitive_keys:
if key in sanitized:
sanitized[key] = '***REDACTED***'
return sanitized
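To make the registration flow concrete, here is a minimal, self-contained sketch. The `ToolDefinition`/`ToolRegistry` stand-ins below only mirror the shape of the classes above (trimmed so the snippet runs on its own), and `lookup_order` is a hypothetical read-only handler:

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Dict

# Trimmed stand-ins mirroring the ToolDefinition/ToolRegistry shapes above,
# repeated here only so this snippet is runnable in isolation.
@dataclass
class ToolDefinition:
    name: str
    description: str
    parameters: Dict
    handler: Callable
    requires_approval: bool = False

    def validate_params(self, params: Dict) -> bool:
        return all(r in params for r in self.parameters.get("required", []))

class ToolRegistry:
    def __init__(self):
        self.tools: Dict[str, ToolDefinition] = {}

    def register(self, tool: ToolDefinition):
        self.tools[tool.name] = tool

async def lookup_order(order_id: str) -> dict:
    # Hypothetical handler; a real one would query a database or API
    return {"order_id": order_id, "status": "shipped"}

registry = ToolRegistry()
registry.register(ToolDefinition(
    name="lookup_order",
    description="Fetch the current status of an order",
    parameters={"required": ["order_id"]},
    handler=lookup_order,
))

tool = registry.tools["lookup_order"]
result = asyncio.run(tool.handler(order_id="A-123"))
```

Keeping `required` parameters in the tool definition lets the executor reject malformed calls before the handler ever runs.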
3. Memory Management
Production agents need robust memory systems:
import json
from datetime import datetime
from typing import Dict, List
class AgentMemory:
def __init__(self, redis_client, vector_store):
self.redis = redis_client
self.vector = vector_store
async def add_to_context(self, execution_id: str,
content: str,
memory_type: str = 'action'):
"""Add to short-term (working) memory"""
key = f"memory:execution:{execution_id}:context"
entry = {
'type': memory_type,
'content': content,
'timestamp': datetime.utcnow().isoformat()
}
await self.redis.rpush(key, json.dumps(entry))
await self.redis.expire(key, 3600) # 1 hour TTL
async def get_context_window(self, execution_id: str,
max_entries: int = 10) -> List[Dict]:
"""Retrieve recent context for agent"""
key = f"memory:execution:{execution_id}:context"
entries = await self.redis.lrange(key, -max_entries, -1)
return [json.loads(e) for e in entries]
async def store_long_term(self, user_id: str,
content: str,
metadata: Dict):
"""Store important information in long-term memory"""
doc_id = f"memory:user:{user_id}:{datetime.utcnow().timestamp()}"
await self.vector.add_documents(
documents=[content],
ids=[doc_id],
metadatas=[{
**metadata,
'user_id': user_id,
'stored_at': datetime.utcnow().isoformat()
}]
)
async def recall_similar(self, user_id: str,
query: str,
limit: int = 5) -> List[Dict]:
"""Retrieve relevant memories"""
results = await self.vector.similarity_search(
query=query,
filter={'user_id': user_id},
k=limit
)
return [
{
'content': doc.page_content,
'metadata': doc.metadata
}
for doc in results
]
Reliability Patterns
Circuit Breaker
Prevent cascading failures:
import asyncio
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitOpenError(Exception):
    """Raised when the breaker rejects a call because the circuit is open."""
    pass

class CircuitBreaker:
def __init__(self, failure_threshold: int = 5,
recovery_timeout: int = 60,
success_threshold: int = 2):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitOpenError("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
    def _should_attempt_reset(self) -> bool:
        if not self.last_failure_time:
            return True
        # total_seconds(), not .seconds, so outages longer than a day count too
        elapsed = (datetime.utcnow() - self.last_failure_time).total_seconds()
        return elapsed > self.recovery_timeout
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
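To see the state machine in action, here is a compact, self-contained stand-in (recovery timing omitted) demonstrating the CLOSED to OPEN transition once the failure threshold is hit:

```python
import asyncio

# Compact stand-in mirroring the CircuitBreaker above, trimmed so this
# snippet runs on its own; recovery/half-open handling is omitted.
class MiniBreaker:
    def __init__(self, failure_threshold: int = 3):
        self.failure_threshold = failure_threshold
        self.failure_count = 0
        self.state = "closed"

    async def call(self, func):
        if self.state == "open":
            raise RuntimeError("circuit open; request rejected")
        try:
            result = await func()
            self.failure_count = 0  # any success resets the count
            return result
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

async def failing_tool():
    # Hypothetical dependency that is currently down
    raise ConnectionError("upstream unavailable")

breaker = MiniBreaker(failure_threshold=3)
for _ in range(3):
    try:
        asyncio.run(breaker.call(failing_tool))
    except ConnectionError:
        pass  # the breaker counts these; the third one trips it
```

After the loop the breaker is open, and further calls are rejected immediately instead of hammering the failing dependency.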
Fallback Strategies
Ensure graceful degradation:
class AgentFallback:
def __init__(self):
self.fallbacks = {}
def register_fallback(self, error_type: type,
fallback_fn: Callable):
self.fallbacks[error_type] = fallback_fn
async def execute_with_fallback(self, primary_fn: Callable,
context: AgentExecution) -> Any:
try:
return await primary_fn()
except Exception as e:
# Find appropriate fallback
for error_type, fallback in self.fallbacks.items():
if isinstance(e, error_type):
logger.warning(
"using_fallback",
error_type=error_type.__name__,
execution_id=context.execution_id
)
return await fallback(context)
# No fallback found, re-raise
raise
async def llm_fallback(context: AgentExecution) -> str:
"""Fallback when LLM is unavailable"""
return "I apologize, but I'm temporarily unable to process your request. Please try again in a few moments."
async def tool_fallback(context: AgentExecution) -> str:
"""Fallback when tool execution fails"""
return "I encountered an issue while trying to complete that action. I've noted this and will try an alternative approach."
async def timeout_fallback(context: AgentExecution) -> str:
"""Fallback when execution times out"""
return "The request is taking longer than expected. I've saved your progress and will continue where I left off."
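Wiring these handlers up might look like the following; the `AgentFallback` stand-in repeats the class above in minimal form so the snippet runs on its own, and `failing_call` is a hypothetical primary operation:

```python
import asyncio

# Minimal stand-in for the AgentFallback class above, repeated here
# only so this snippet is runnable in isolation.
class AgentFallback:
    def __init__(self):
        self.fallbacks = {}

    def register_fallback(self, error_type, fallback_fn):
        self.fallbacks[error_type] = fallback_fn

    async def execute_with_fallback(self, primary_fn, context=None):
        try:
            return await primary_fn()
        except Exception as e:
            for error_type, fallback in self.fallbacks.items():
                if isinstance(e, error_type):
                    return await fallback(context)
            raise  # no fallback registered for this error type

async def timeout_fallback(context):
    return "Saved your progress; continuing shortly."

async def failing_call():
    # Hypothetical primary operation that exceeds its deadline
    raise TimeoutError("LLM call exceeded deadline")

fallbacks = AgentFallback()
fallbacks.register_fallback(TimeoutError, timeout_fallback)
message = asyncio.run(fallbacks.execute_with_fallback(failing_call))
```

Errors with no registered fallback still propagate, which keeps unexpected failure modes visible rather than silently swallowed.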
Monitoring and Observability
Agent-Specific Metrics
from dataclasses import dataclass
from typing import Dict, List
import time
@dataclass
class AgentMetrics:
execution_id: str
user_id: str
task_type: str
# Timing
total_duration_ms: float
planning_duration_ms: float
execution_duration_ms: float
# Token usage
prompt_tokens: int
completion_tokens: int
# Tool usage
tools_used: int
tool_failures: int
# Outcomes
succeeded: bool
terminated_reason: str # completed, max_steps, timeout, error
# Quality (if evaluated)
quality_score: float = 0.0
user_satisfaction: float = 0.0
class AgentMetricsCollector:
def __init__(self, metrics_backend):
self.backend = metrics_backend
async def record_execution(self, metrics: AgentMetrics):
# Record to time-series backend
await self.backend.record({
'measurement': 'agent_executions',
'tags': {
'task_type': metrics.task_type,
'succeeded': metrics.succeeded
},
'fields': {
'duration_ms': metrics.total_duration_ms,
'tools_used': metrics.tools_used,
'tool_failures': metrics.tool_failures,
'prompt_tokens': metrics.prompt_tokens,
'completion_tokens': metrics.completion_tokens,
'quality_score': metrics.quality_score
},
'timestamp': time.time()
})
async def get_agent_health(self, time_window_minutes: int = 60) -> Dict:
query = f"""
SELECT
count(*) as total_executions,
sum(case when succeeded then 1 else 0 end) as successes,
avg(duration_ms) as avg_duration,
sum(tools_used) as total_tools_used,
sum(tool_failures) as total_tool_failures
FROM agent_executions
WHERE time > now() - {time_window_minutes}m
"""
        result = await self.backend.query(query)
        total = max(result['total_executions'], 1)  # avoid division by zero
        return {
            'success_rate': result['successes'] / total,
            'avg_duration_ms': result['avg_duration'],
            'tool_failure_rate': result['total_tool_failures'] /
                max(result['total_tools_used'], 1),
            'total_executions': result['total_executions']
        }
Distributed Tracing for Agents
from opentelemetry import trace
from opentelemetry.trace import SpanKind
tracer = trace.get_tracer(__name__)
class AgentTrace:
def __init__(self):
self.span_map = {}
def start_execution_span(self, execution_id: str,
task: str) -> Any:
        span = tracer.start_span(
            "agent.execution",  # keep execution_id in attributes, not the
                                # name, to avoid high-cardinality span names
kind=SpanKind.INTERNAL,
attributes={
"agent.execution_id": execution_id,
"agent.task": task[:100]
}
)
self.span_map[execution_id] = span
return span
def record_step(self, execution_id: str, step: AgentStep):
span = self.span_map.get(execution_id)
if not span:
return
span.add_event(
f"step.{step.step_number}.{step.action}",
attributes={
"step.tool": step.tool_name or "none",
"step.duration_ms": step.duration_ms,
"step.error": step.error or "none",
"step.tokens": step.tokens_used
}
)
def end_execution_span(self, execution_id: str,
succeeded: bool):
span = self.span_map.pop(execution_id, None)
if span:
span.set_attribute("agent.succeeded", succeeded)
span.end()
Security Considerations
Agent Permission System
from enum import Enum
class Permission(Enum):
ALLOW = "allow"
DENY = "deny"
ASK = "ask"
class PermissionMatrix:
def __init__(self):
self.rules = []
def add_rule(self, user_role: str, tool_pattern: str,
resource_pattern: str, permission: Permission):
self.rules.append({
'user_role': user_role,
'tool_pattern': tool_pattern,
'resource_pattern': resource_pattern,
'permission': permission
})
def check_permission(self, user_role: str, tool_name: str,
resource_id: str) -> Permission:
# Check rules in order (first match wins)
for rule in self.rules:
if self._matches_pattern(user_role, rule['user_role']) and \
self._matches_pattern(tool_name, rule['tool_pattern']) and \
self._matches_pattern(resource_id, rule['resource_pattern']):
return rule['permission']
return Permission.DENY # Default deny
def _matches_pattern(self, value: str, pattern: str) -> bool:
if pattern == '*':
return True
if pattern.endswith('*'):
return value.startswith(pattern[:-1])
return value == pattern
# Example permission matrix
permissions = PermissionMatrix()
permissions.add_rule('admin', '*', '*', Permission.ALLOW)
permissions.add_rule('user', 'read_*', '*', Permission.ALLOW)
permissions.add_rule('user', 'write_*', 'own:*', Permission.ALLOW)
permissions.add_rule('user', 'write_*', '*', Permission.DENY)
permissions.add_rule('user', 'delete_*', '*', Permission.ASK)
Cost Management
Budget Enforcement
class AgentBudget:
def __init__(self, user_id: str, monthly_budget: float):
self.user_id = user_id
self.monthly_budget = monthly_budget
self.current_spend = 0.0
self.current_period_start = datetime.utcnow().replace(
day=1, hour=0, minute=0, second=0
)
async def check_budget(self, estimated_cost: float) -> bool:
await self._refresh_period()
if self.current_spend + estimated_cost > self.monthly_budget:
logger.warning(
"budget_exceeded",
user_id=self.user_id,
current_spend=self.current_spend,
estimated_cost=estimated_cost,
budget=self.monthly_budget
)
return False
return True
async def record_cost(self, actual_cost: float):
await self._refresh_period()
self.current_spend += actual_cost
    async def _refresh_period(self):
        now = datetime.utcnow()
        # Compare year and month together so January-to-January rollovers reset too
        if (now.year, now.month) != (self.current_period_start.year,
                                     self.current_period_start.month):
            self.current_spend = 0.0
            self.current_period_start = now.replace(
                day=1, hour=0, minute=0, second=0, microsecond=0
            )
class BudgetExceededError(Exception):
    """Raised when an execution would push spend over the monthly budget."""
    pass

class CostControlledExecutor:
def __init__(self, executor: SecureToolExecutor,
budget: AgentBudget):
self.executor = executor
self.budget = budget
async def execute(self, tool_name: str, params: Dict,
context: AgentExecution) -> Any:
# Estimate cost before execution
estimated_cost = self._estimate_cost(tool_name, params)
if not await self.budget.check_budget(estimated_cost):
            raise BudgetExceededError(
                f"Estimated cost ${estimated_cost:.4f} would exceed the monthly budget"
            )
result = await self.executor.execute(
tool_name, params, context
)
# Record actual cost
actual_cost = self._calculate_actual_cost(tool_name, result)
await self.budget.record_cost(actual_cost)
return result
Best Practices
1. Start with Clear Scope
- Define what agents can and cannot do
- Set explicit boundaries on tool access
- Implement hard limits on execution
2. Build Comprehensive Monitoring
- Track every decision point
- Log all tool calls and results
- Measure quality and user satisfaction
3. Implement Defense in Depth
- Input validation at every layer
- Output filtering for sensitive data
- Audit logging for compliance
4. Plan for Failure
- Always have fallback responses
- Implement circuit breakers
- Save state for recovery
5. Control Costs Aggressively
- Set strict budgets per user
- Monitor token usage in real-time
- Implement caching strategies
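As one example of a caching strategy, here is a minimal sketch (an assumed design, not a specific library's API): identical model/prompt/temperature triples reuse a stored response instead of paying for a new LLM call:

```python
import hashlib
import json

# Minimal response-cache sketch. In production this would sit in front
# of the LLM client and use Redis or similar instead of a local dict.
class ResponseCache:
    def __init__(self):
        self._store = {}
        self.hits = 0
        self.misses = 0

    def _key(self, model: str, prompt: str, temperature: float) -> str:
        # Deterministic key over everything that affects the output
        payload = json.dumps(
            {"model": model, "prompt": prompt, "temperature": temperature},
            sort_keys=True,
        )
        return hashlib.sha256(payload.encode()).hexdigest()

    def get(self, model, prompt, temperature=0.0):
        key = self._key(model, prompt, temperature)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        return None

    def put(self, model, prompt, response, temperature=0.0):
        self._store[self._key(model, prompt, temperature)] = response

cache = ResponseCache()
if cache.get("gpt-x", "Summarize Q3 report") is None:
    cache.put("gpt-x", "Summarize Q3 report", "Revenue grew 12%...")
cached = cache.get("gpt-x", "Summarize Q3 report")
```

Caching is most effective for deterministic (temperature 0) calls such as planning or classification steps; sampled outputs cache poorly by design.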
Common Pitfalls
1. Unlimited Tool Access
Without strict controls, agents can:
- Make excessive API calls
- Access unauthorized data
- Execute dangerous operations
Always implement permission layers.
2. Ignoring Latency
Agent latency compounds:
- Each tool call adds network overhead
- LLM inference time accumulates
- Long-running tasks frustrate users
Set appropriate timeouts and manage expectations.
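A small sketch of per-step timeout enforcement using asyncio.wait_for: each awaited step gets its own deadline and a fallback value, so one slow tool cannot stall the whole run (`slow_tool` here is a stand-in for any tool call):

```python
import asyncio

# Wrap any awaitable step with a deadline; return a fallback on timeout
# instead of letting the whole execution hang.
async def run_with_timeout(coro, timeout_s: float, fallback):
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        return fallback

async def slow_tool():
    await asyncio.sleep(0.2)  # stand-in for a slow network call
    return "real result"

# Generous deadline: the real result comes back
fast = asyncio.run(run_with_timeout(slow_tool(), 1.0, "timed out"))
# Tight deadline: the fallback is returned instead
slow = asyncio.run(run_with_timeout(slow_tool(), 0.05, "timed out"))
```

Per-step deadlines compose with an overall execution timeout (like `execution_timeout` in the orchestrator config) to bound worst-case latency.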
3. No Version Control
Production agents change constantly:
- Prompt updates can break behavior
- Tool changes affect capabilities
- Model updates alter outputs
Version everything and implement gradual rollouts.
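One possible shape for prompt versioning, sketched under assumed requirements: versions are immutable once registered, and a hash-based rollout percentage routes a stable slice of users to the new version:

```python
import hashlib

# Minimal prompt-versioning sketch: immutable versions plus a
# percentage-based gradual rollout keyed on user_id.
class PromptRegistry:
    def __init__(self):
        self._versions = {}   # (name, version) -> template
        self._rollout = {}    # name -> (new_version, percent)

    def register(self, name, version, template):
        key = (name, version)
        if key in self._versions:
            raise ValueError(f"{name} {version} already registered")
        self._versions[key] = template

    def set_rollout(self, name, new_version, percent):
        self._rollout[name] = (new_version, percent)

    def get(self, name, stable_version, user_id):
        new_version, percent = self._rollout.get(name, (None, 0))
        # Deterministic bucketing: the same user always lands in the same bucket
        bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
        if new_version is not None and bucket < percent:
            return self._versions[(name, new_version)]
        return self._versions[(name, stable_version)]

registry = PromptRegistry()
registry.register("planner", "v1", "Plan the task step by step.")
registry.register("planner", "v2", "Plan the task; cite a tool per step.")
registry.set_rollout("planner", "v2", 10)  # 10% of users see v2
```

Deterministic bucketing matters: a user flip-flopping between prompt versions mid-session is far harder to debug than a clean 10% cohort.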
4. Missing Feedback Loops
Continuous improvement requires:
- User satisfaction tracking
- Success/failure analysis
- A/B testing of strategies
Build feedback collection into the system from day one.
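A minimal in-memory sketch of such a feedback loop (hypothetical structure): every execution records an outcome and an optional rating, and an aggregate summary feeds periodic review:

```python
from dataclasses import dataclass, field

# In-memory feedback collector sketch; production would append to a
# durable store and aggregate in the analytics pipeline instead.
@dataclass
class FeedbackStore:
    events: list = field(default_factory=list)

    def record(self, execution_id, succeeded, rating=None):
        self.events.append({
            "execution_id": execution_id,
            "succeeded": succeeded,
            "rating": rating,  # e.g. a 1-5 user score, when given
        })

    def summary(self):
        total = len(self.events)
        successes = sum(1 for e in self.events if e["succeeded"])
        rated = [e["rating"] for e in self.events if e["rating"] is not None]
        return {
            "success_rate": successes / total if total else 0.0,
            "avg_rating": sum(rated) / len(rated) if rated else None,
            "total": total,
        }

store = FeedbackStore()
store.record("exec-1", succeeded=True, rating=5)
store.record("exec-2", succeeded=False)
store.record("exec-3", succeeded=True, rating=3)
stats = store.summary()
```

Even this coarse signal is enough to flag regressions after a prompt or model change, and it pairs naturally with the AgentMetrics fields above.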
External Resources
- LangChain Agent Documentation
- AutoGen Framework
- CrewAI Agents
- OpenAI Agent SDK
- OpenTelemetry Agent Instrumentation
- Circuit Breaker Pattern
- Agent Evaluation Benchmarks
- Secure AI Agent Design
Conclusion
Deploying AI agents to production requires careful attention to reliability, security, and observability. The patterns and practices outlined in this guide provide a foundation for building production-ready agent systems that are robust, secure, and cost-effective.
Key takeaways:
- Start with clear boundaries and permissions
- Implement comprehensive monitoring from day one
- Plan for failure with circuit breakers and fallbacks
- Control costs with budgets and rate limiting
- Build feedback loops for continuous improvement
The autonomous agent paradigm represents a fundamental shift in AI application development. By applying rigorous operational practices, you can harness this power while managing the associated risks.