Introduction
Building an AI agent prototype is one thing. Running it in production is another. Production AI agents face challenges that don’t appear in development: scaling, reliability, monitoring, security, and cost management.
This guide covers everything you need to deploy and operate AI agents at scale.
Production Architecture
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ PRODUCTION AGENT ARCHITECTURE โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ โ
โ โ Users โโโโโโถโ API โโโโโโถโ Load โโโโโโถโ Agent โ โ
โ โ โ โ Gateway โ โ Balancerโ โ Pool โ โ
โ โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโโโโโโ โโโโโโฌโโโโโ โ
โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโผโโโโโโโโโ โ
โ โ โ โ โ
โ โผ โผ โผ โ
โ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโ โ
โ โ Memory โ โ Tools & โ โ
โ โ (Redis) โ โ Services โ โ
โ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโ โ
โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ OBSERVABILITY โ โ
โ โ Metrics โ Logs โ Traces โ Alerts โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Core Components
1. API Gateway
# API Gateway for agents
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import rate_limit
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["https://your-app.com"],
allow_credentials=True,
)
@app.post("/agent/run")
@rate_limit(max_requests=100, window=60)
async def run_agent(request: Request):
body = await request.json()
# Authenticate
user = await authenticate(request.headers)
# Validate input
validated = validate_input(body)
# Queue for processing
job = await queue.enqueue(
"run_agent",
user_id=user.id,
input=validated
)
return {"job_id": job.id, "status": "queued"}
2. Agent Pool
# Agent pool for scaling
import asyncio
from dataclasses import dataclass
@dataclass
class AgentInstance:
id: str
agent: Any
status: str = "idle"
current_request: Optional[str] = None
class AgentPool:
def __init__(self, size: int = 10):
self.agents = [AgentInstance(id=i, agent=create_agent())
for i in range(size)]
self.lock = asyncio.Lock()
async def acquire(self) -> AgentInstance:
"""Get available agent"""
async with self.lock:
for agent in self.agents:
if agent.status == "idle":
agent.status = "busy"
return agent
# All busy - wait or scale
raise NoAgentsAvailable()
async def release(self, agent: AgentInstance):
"""Release agent back to pool"""
async with self.lock:
agent.status = "idle"
agent.current_request = None
async def scale_up(self, count: int):
"""Add more agents"""
for i in range(count):
self.agents.append(
AgentInstance(id=len(self.agents), agent=create_agent())
)
async def scale_down(self, count: int):
"""Remove agents"""
idle_agents = [a for a in self.agents if a.status == "idle"]
for agent in idle_agents[:count]:
self.agents.remove(agent)
3. Memory Management
# Distributed memory with Redis
import redis
import json
class AgentMemory:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
async def save_context(self, session_id: str, context: dict, ttl: int = 3600):
"""Save conversation context"""
key = f"memory:{session_id}"
await self.redis.setex(
key,
ttl,
json.dumps(context)
)
async def load_context(self, session_id: str) -> dict:
"""Load conversation context"""
key = f"memory:{session_id}"
data = await self.redis.get(key)
return json.loads(data) if data else {}
async def delete_context(self, session_id: str):
"""Clear context"""
key = f"memory:{session_id}"
await self.redis.delete(key)
Reliability Patterns
1. Retry Logic
import asyncio
from typing import TypeVar, Callable
T = TypeVar('T')
async def with_retry(
func: Callable[..., T],
max_retries: int = 3,
base_delay: float = 1.0,
exponential_base: float = 2.0
) -> T:
"""Retry with exponential backoff"""
for attempt in range(max_retries):
try:
return await func()
except RetryableError as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (exponential_base ** attempt)
await asyncio.sleep(delay)
# Log retry
logger.warning(
f"Retry {attempt + 1}/{max_retries} after {delay}s"
)
except NonRetryableError:
raise
# Usage
async def call_agent(input_data):
return await with_retry(
lambda: agent.execute(input_data),
max_retries=3
)
2. Circuit Breaker
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_requests: int = 3
):
self.state = CircuitState.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.last_failure_time = None
self.half_open_successes = 0
self.half_open_requests = half_open_requests
async def call(self, func):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_successes = 0
else:
raise CircuitOpenError()
try:
result = await func()
if self.state == CircuitState.HALF_OPEN:
self.half_open_successes += 1
if self.half_open_successes >= self.half_open_requests:
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
3. Rate Limiting
# Token bucket rate limiter
import time
from dataclasses import dataclass
@dataclass
class RateLimitConfig:
requests_per_minute: int
burst: int
class TokenBucketLimiter:
def __init__(self, config: RateLimitConfig):
self.rate = config.requests_per_minute / 60
self.burst = config.burst
self.tokens = self.burst
self.last_update = time.time()
self.lock = asyncio.Lock()
async def acquire(self) -> bool:
async with self.lock:
now = time.time()
elapsed = now - self.last_update
self.tokens = min(
self.burst,
self.tokens + elapsed * self.rate
)
self.last_update = now
if self.tokens >= 1:
self.tokens -= 1
return True
return False
async def wait_for_token(self, timeout: float = 30):
"""Wait until token available"""
start = time.time()
while time.time() - start < timeout:
if await self.acquire():
return
await asyncio.sleep(0.1)
raise RateLimitExceeded()
Monitoring & Observability
1. Metrics Collection
from prometheus_client import Counter, Histogram, Gauge
import time
# Define metrics
agent_requests = Counter(
'agent_requests_total',
'Total agent requests',
['status', 'agent_type']
)
agent_latency = Histogram(
'agent_latency_seconds',
'Agent request latency',
['agent_type'],
buckets=[0.1, 0.5, 1, 2, 5, 10]
)
active_agents = Gauge(
'active_agents',
'Number of active agents',
['agent_type']
)
# Instrument agent
class InstrumentedAgent:
def __init__(self, agent, agent_type: str = "default"):
self.agent = agent
self.agent_type = agent_type
async def execute(self, input_data):
active_agents.labels(self.agent_type).inc()
start = time.time()
try:
result = await self.agent.execute(input_data)
agent_requests.labels(status="success", agent_type=self.agent_type).inc()
return result
except Exception as e:
agent_requests.labels(status="error", agent_type=self.agent_type).inc()
raise
finally:
agent_latency.labels(agent_type=self.agent_type).observe(
time.time() - start
)
active_agents.labels(self.agent_type).dec()
2. Structured Logging
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
# Log agent requests
async def log_agent_execution(agent, input_data, result):
logger.info(
"agent_execution",
agent_type=agent.type,
input_length=len(input_data),
output_length=len(result),
duration_ms=result.duration,
success=result.success
)
3. Distributed Tracing
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
class TracedAgent:
def __init__(self, agent):
self.agent = agent
async def execute(self, input_data):
with tracer.start_as_current_span("agent_execution") as span:
span.set_attribute("agent.type", self.agent.type)
span.set_attribute("input.size", len(input_data))
try:
result = await self.agent.execute(input_data)
span.set_status(Status(StatusCode.OK))
span.set_attribute("output.size", len(result))
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR), str(e))
span.record_exception(e)
raise
Security
1. Input Validation
from pydantic import BaseModel, validator
import re
class AgentInput(BaseModel):
user_message: str
session_id: str
@validator('user_message')
def validate_message(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError("Message cannot be empty")
if len(v) > 10000:
raise ValueError("Message too long (max 10000 chars)")
# Check for prompt injection
if re.search(r"(ignore previous|ignore all|prompt:", v, re.I):
raise ValueError("Invalid input detected")
return v
# Usage
@app.post("/agent/run")
async def run_agent(input_data: AgentInput):
result = await agent.execute(input_data.user_message)
return {"result": result}
2. Tool Permissions
# Define allowed tools per user role
TOOL_PERMISSIONS = {
"user": ["search", "read_file"],
"admin": ["search", "read_file", "write_file", "execute_command"],
"superadmin": ["*"]
}
class ToolPermissionChecker:
def __init__(self, user_role: str):
self.allowed_tools = TOOL_PERMISSIONS.get(user_role, [])
def can_use_tool(self, tool_name: str) -> bool:
if "*" in self.allowed_tools:
return True
return tool_name in self.allowed_tools
def filter_tools(self, tools: list) -> list:
return [t for t in tools if self.can_use_tool(t["name"])]
3. Audit Logging
# Audit log for compliance
class AuditLogger:
async def log(self, event: str, user_id: str, details: dict):
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event": event,
"user_id": user_id,
"details": details,
"ip_address": get_client_ip()
}
# Write to secure audit log
await self.audit_log.write(entry)
# Also log for security monitoring
await self.security_log.warn(entry)
audit = AuditLogger()
# Usage
async def execute_tool(user_id, tool_name, tool_input):
await audit.log(
"tool_execution",
user_id,
{"tool": tool_name, "input_size": len(tool_input)}
)
Cost Management
1. Token Tracking
# Track token usage per request
class TokenTracker:
def __init__(self):
self.total_tokens = 0
self.prompt_tokens = 0
self.completion_tokens = 0
async def track(self, model: str, prompt_tokens: int, completion_tokens: int):
cost = self.calculate_cost(model, prompt_tokens, completion_tokens)
await self.db.log_usage(
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cost=cost
)
def calculate_cost(self, model, prompt, completion):
rates = {
"gpt-4o": {"prompt": 0.005, "completion": 0.015},
"gpt-4o-mini": {"prompt": 0.0015, "completion": 0.004},
"claude-3.5-sonnet": {"prompt": 0.003, "completion": 0.015}
}
rate = rates.get(model, {"prompt": 0.01, "completion": 0.03})
return (prompt / 1000 * rate["prompt"]) + (completion / 1000 * rate["completion"])
2. Budget Alerts
# Budget management
class BudgetManager:
def __init__(self, monthly_budget: float):
self.budget = monthly_budget
self.spent = 0
self.alert_thresholds = [0.5, 0.75, 0.9, 1.0]
async def check_and_alert(self, amount: float):
self.spent += amount
percentage = self.spent / self.budget
for threshold in self.alert_thresholds:
if percentage >= threshold:
await self.send_alert(threshold, self.spent, self.budget)
async def can_spend(self, amount: float) -> bool:
return (self.spent + amount) <= self.budget
Deployment
Docker
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Kubernetes
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-agent
spec:
replicas: 3
selector:
matchLabels:
app: ai-agent
template:
metadata:
labels:
app: ai-agent
spec:
containers:
- name: agent
image: your-registry/ai-agent:latest
resources:
limits:
memory: "2Gi"
cpu: "1000m"
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: api-keys
key: openai
- name: REDIS_URL
valueFrom:
configMapKeyRef:
name: agent-config
key: redis_url
Best Practices
Good: Health Checks
# Health check endpoint
@app.get("/health")
async def health_check():
# Check all dependencies
checks = {
"redis": await check_redis(),
"llm": await check_llm(),
"agents": await check_agents()
}
all_healthy = all(c["healthy"] for c in checks.values())
return {
"status": "healthy" if all_healthy else "degraded",
"checks": checks
}
Bad: No Error Handling
# Bad: Silent failures
async def run_agent(input_data):
result = await agent.execute(input_data) # No try/catch
return result
Good: Graceful Degradation
# Good: Fallback behavior
async def run_agent(input_data):
try:
# Try primary agent
return await primary_agent.execute(input_data)
except PrimaryAgentError:
# Fallback to simpler agent
try:
return await fallback_agent.execute(input_data)
except FallbackError:
# Last resort - return error message
return {"error": "Service temporarily unavailable"}
Conclusion
Production AI agents require careful planning around:
- Reliability - Retries, circuit breakers, timeouts
- Monitoring - Metrics, logs, traces
- Security - Input validation, tool permissions, audit logs
- Cost - Token tracking, budget alerts
- Scaling - Agent pools, load balancing
Start with the basics, add complexity as needed, and always plan for failure.
Related Articles
- AI Agent Trends 2026
- AI Agent Frameworks Comparison
- Building AI Agents with LangGraph
- Introduction to Agentic AI
Comments