Building Production AI Agents: From Prototype to Production

Introduction

Building an AI agent prototype is one thing. Running it in production is another. Production AI agents face challenges that don’t appear in development: scaling, reliability, monitoring, security, and cost management.

This guide walks through the pieces needed to deploy and operate AI agents at scale: architecture, reliability patterns, observability, security, cost management, and deployment.


Production Architecture

┌────────────────────────────────────────────────────────────────────────┐
│                     PRODUCTION AGENT ARCHITECTURE                      │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│   ┌─────────┐     ┌─────────┐     ┌─────────┐     ┌─────────┐          │
│   │  Users  │────▶│   API   │────▶│  Load   │────▶│  Agent  │          │
│   │         │     │ Gateway │     │ Balancer│     │  Pool   │          │
│   └─────────┘     └─────────┘     └─────────┘     └────┬────┘          │
│                                                        │               │
│          ┌─────────────────────────────────────────────┤               │
│          │                                             │               │
│          ▼                                             ▼               │
│   ┌─────────────┐                              ┌──────────────┐        │
│   │   Memory    │                              │   Tools &    │        │
│   │   (Redis)   │                              │   Services   │        │
│   └─────────────┘                              └──────────────┘        │
│                                                                        │
│   ┌────────────────────────────────────────────────────────────────┐   │
│   │                         OBSERVABILITY                          │   │
│   │                Metrics │ Logs │ Traces │ Alerts                │   │
│   └────────────────────────────────────────────────────────────────┘   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Core Components

1. API Gateway

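# API Gateway for agents
# rate_limit, authenticate, validate_input, and queue are application-specific
# helpers that you supply; they are not part of FastAPI.
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from rate_limit import rate_limit  # hypothetical local module providing the decorator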

app = FastAPI()

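app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://your-app.com"],
    allow_credentials=True,
    # Starlette's CORSMiddleware allows only GET by default, so POST must be listed
    allow_methods=["GET", "POST"],
    allow_headers=["Authorization", "Content-Type"],
)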

@app.post("/agent/run")
@rate_limit(max_requests=100, window=60)
async def run_agent(request: Request):
    body = await request.json()
    
    # Authenticate
    user = await authenticate(request.headers)
    
    # Validate input
    validated = validate_input(body)
    
    # Queue for processing
    job = await queue.enqueue(
        "run_agent",
        user_id=user.id,
        input=validated
    )
    
    return {"job_id": job.id, "status": "queued"}
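
The endpoint above returns a job_id right away and leaves the real work to a queue. As a rough illustration of that hand-off, here is a minimal in-process sketch. `InMemoryJobQueue`, `drain`, and `demo` are hypothetical names, and a production system would more likely use a durable broker so that jobs survive a restart.

```python
import asyncio
import uuid


class InMemoryJobQueue:
    """Hypothetical stand-in for the `queue` object used by the endpoint."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self.jobs = {}  # job_id -> {"status": ..., "result": ...}

    async def enqueue(self, task_name: str, **payload) -> str:
        job_id = uuid.uuid4().hex
        self.jobs[job_id] = {"status": "queued", "result": None}
        await self._queue.put((job_id, task_name, payload))
        return job_id

    async def worker(self, handlers: dict) -> None:
        """Pull jobs until the surrounding task is cancelled."""
        while True:
            job_id, task_name, payload = await self._queue.get()
            job = self.jobs[job_id]
            job["status"] = "running"
            try:
                job["result"] = await handlers[task_name](**payload)
                job["status"] = "done"
            except Exception as exc:
                # Record the failure instead of letting it kill the worker
                job["status"] = "failed"
                job["result"] = str(exc)
            finally:
                self._queue.task_done()

    async def drain(self) -> None:
        """Wait until every queued job has been processed."""
        await self._queue.join()


async def demo() -> dict:
    async def run_agent(user_id: str, input: str) -> str:
        return f"{user_id}: {input.upper()}"  # placeholder for real agent work

    queue = InMemoryJobQueue()
    worker = asyncio.create_task(queue.worker({"run_agent": run_agent}))
    job_id = await queue.enqueue("run_agent", user_id="u1", input="hello")
    await queue.drain()
    worker.cancel()
    return queue.jobs[job_id]
```

A client would poll a status endpoint that reads `jobs[job_id]` until the status is "done" or "failed".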

2. Agent Pool

# Agent pool for scaling
import asyncio
import itertools
from dataclasses import dataclass
from typing import Any, Optional

class NoAgentsAvailable(Exception):
    """Raised when every agent in the pool is busy."""

@dataclass
class AgentInstance:
    id: str
    agent: Any
    status: str = "idle"
    current_request: Optional[str] = None

class AgentPool:
    def __init__(self, size: int = 10):
        self._ids = itertools.count()  # IDs only increase, so they are never reused
        self.lock = asyncio.Lock()
        # create_agent() is the application's own agent factory
        self.agents = [self._new_instance() for _ in range(size)]
    
    def _new_instance(self) -> AgentInstance:
        return AgentInstance(id=str(next(self._ids)), agent=create_agent())
    
    async def acquire(self) -> AgentInstance:
        """Get an idle agent and mark it busy"""
        async with self.lock:
            for agent in self.agents:
                if agent.status == "idle":
                    agent.status = "busy"
                    return agent
            # All busy: the caller can wait, retry, or call scale_up()
            raise NoAgentsAvailable()
    
    async def release(self, agent: AgentInstance):
        """Release agent back to pool"""
        async with self.lock:
            agent.status = "idle"
            agent.current_request = None
    
    async def scale_up(self, count: int):
        """Add more agents"""
        async with self.lock:
            for _ in range(count):
                self.agents.append(self._new_instance())
    
    async def scale_down(self, count: int):
        """Remove up to `count` idle agents; busy agents are left alone"""
        async with self.lock:
            idle_agents = [a for a in self.agents if a.status == "idle"]
            for agent in idle_agents[:count]:
                self.agents.remove(agent)
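
A pool with separate acquire and release calls is easy to leak: if the request raises between the two, the agent stays busy forever. One way to guard against that, sketched here under the assumption of a pool with the same acquire/release shape, is an async context manager. `TinyPool` and `borrowed` are hypothetical names; `TinyPool` hands out integer IDs and waits for an idle agent rather than raising.

```python
import asyncio
from contextlib import asynccontextmanager


class TinyPool:
    """Hypothetical stand-in for AgentPool that hands out integer agent IDs."""

    def __init__(self, size: int):
        self._idle = asyncio.Queue()
        for agent_id in range(size):
            self._idle.put_nowait(agent_id)

    async def acquire(self) -> int:
        # Waits until an agent is idle instead of raising immediately
        return await self._idle.get()

    async def release(self, agent_id: int) -> None:
        self._idle.put_nowait(agent_id)

    def idle_count(self) -> int:
        return self._idle.qsize()


@asynccontextmanager
async def borrowed(pool: TinyPool):
    """Acquire an agent and guarantee its release, even if the body raises."""
    agent_id = await pool.acquire()
    try:
        yield agent_id
    finally:
        await pool.release(agent_id)


async def demo() -> list:
    pool = TinyPool(size=2)

    async def work(n: int) -> int:
        async with borrowed(pool) as agent_id:
            await asyncio.sleep(0.01)  # placeholder for agent.execute(...)
            return agent_id

    # Five requests share two agents; extra requests wait for a release
    return await asyncio.gather(*(work(n) for n in range(5)))
```

Waiting on a queue trades the explicit NoAgentsAvailable error for backpressure, so a caller that needs a hard limit would still wrap the acquire in a timeout.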

3. Memory Management

# Distributed memory with Redis
import json

import redis.asyncio as redis  # async client (redis-py 4.2+), so calls can be awaited

class AgentMemory:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
    
    async def save_context(self, session_id: str, context: dict, ttl: int = 3600):
        """Save conversation context"""
        key = f"memory:{session_id}"
        await self.redis.setex(
            key,
            ttl,
            json.dumps(context)
        )
    
    async def load_context(self, session_id: str) -> dict:
        """Load conversation context"""
        key = f"memory:{session_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else {}
    
    async def delete_context(self, session_id: str):
        """Clear context"""
        key = f"memory:{session_id}"
        await self.redis.delete(key)

Reliability Patterns

1. Retry Logic

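import asyncio
import logging
from typing import Awaitable, Callable, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar('T')

class RetryableError(Exception):
    """Transient failure (timeout, rate limit, 5xx) that is worth retrying."""

class NonRetryableError(Exception):
    """Permanent failure (bad input, auth error) that a retry cannot fix."""

async def with_retry(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    exponential_base: float = 2.0
) -> T:
    """Retry with exponential backoff"""
    
    for attempt in range(max_retries):
        try:
            return await func()
        except RetryableError:
            if attempt == max_retries - 1:
                raise
            
            delay = base_delay * (exponential_base ** attempt)
            
            # Log before sleeping so the message appears when the failure happens
            logger.warning(
                f"Retry {attempt + 1}/{max_retries} after {delay}s"
            )
            await asyncio.sleep(delay)
        # Any other exception, including NonRetryableError, propagates immediately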

# Usage
async def call_agent(input_data):
    return await with_retry(
        lambda: agent.execute(input_data),
        max_retries=3
    )
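
To make the backoff schedule concrete, the sketch below computes the delays that the retry loop would sleep for. The `max_delay` cap and the optional jitter are additions that the original function does not have. Jitter spreads retries from many clients so they do not all hit a recovering service at the same moment. `backoff_delays` is a hypothetical helper name.

```python
import random


def backoff_delays(
    max_retries: int = 3,
    base_delay: float = 1.0,
    exponential_base: float = 2.0,
    max_delay: float = 30.0,
    jitter: bool = False,
    rng: random.Random = None,
) -> list:
    """Return the sleep durations between attempts.

    There is no sleep after the final attempt, so the list has
    max_retries - 1 entries.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries - 1):
        delay = min(max_delay, base_delay * exponential_base ** attempt)
        if jitter:
            # "Full jitter": pick uniformly between 0 and the capped delay
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays
```

With the defaults, three attempts sleep for 1 second and then 2 seconds; eight attempts grow to 16 seconds and are then held at the 30-second cap.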

2. Circuit Breaker

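import time
from enum import Enum

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""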

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        half_open_requests: int = 3
    ):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.half_open_successes = 0
        self.half_open_requests = half_open_requests
    
    async def call(self, func):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
            else:
                raise CircuitOpenError()
        
        try:
            result = await func()
            
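            if self.state == CircuitState.HALF_OPEN:
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_requests:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            else:
                # Count consecutive failures only: a success while closed
                # resets the tally, so sporadic errors never trip the breaker
                self.failure_count = 0
            
            return result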
            
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise
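
The state transitions are easier to follow with a clock you control. The sketch below is a compact synchronous variant, not the class above: it closes after a single successful trial call, counts consecutive failures, and takes an injectable `clock` so the recovery timeout can be exercised without sleeping. `SimpleBreaker` and `CircuitOpen` are hypothetical names.

```python
import time


class CircuitOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleBreaker:
    def __init__(self, failure_threshold=3, recovery_timeout=10.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock  # injectable so tests can move time forward
        self.state = "closed"
        self.failures = 0
        self.opened_at = None

    def call(self, func):
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitOpen("dependency is cooling down")
            self.state = "half_open"  # timeout elapsed: allow one trial call

        try:
            result = func()
        except Exception:
            self.failures += 1
            # A failed trial call, or too many consecutive failures, opens the circuit
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise

        # Any success closes the circuit and clears the failure tally
        self.state = "closed"
        self.failures = 0
        return result
```

Passing `clock=lambda: fake_now[0]` in a test lets you trip the breaker, advance the fake time past `recovery_timeout`, and confirm that the next call is allowed through.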

3. Rate Limiting

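# Token bucket rate limiter
import asyncio
import time
from dataclasses import dataclass

class RateLimitExceeded(Exception):
    """Raised when no token becomes available before the timeout."""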

@dataclass
class RateLimitConfig:
    requests_per_minute: int
    burst: int

class TokenBucketLimiter:
    def __init__(self, config: RateLimitConfig):
        self.rate = config.requests_per_minute / 60
        self.burst = config.burst
        self.tokens = self.burst
        self.last_update = time.time()
        self.lock = asyncio.Lock()
    
    async def acquire(self) -> bool:
        async with self.lock:
            now = time.time()
            elapsed = now - self.last_update
            self.tokens = min(
                self.burst,
                self.tokens + elapsed * self.rate
            )
            self.last_update = now
            
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False
    
    async def wait_for_token(self, timeout: float = 30):
        """Wait until token available"""
        start = time.time()
        while time.time() - start < timeout:
            if await self.acquire():
                return
            await asyncio.sleep(0.1)
        raise RateLimitExceeded()
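
The refill arithmetic is the part of a token bucket that is easiest to get wrong, so it helps to trace it with explicit timestamps. The sketch below is a synchronous simulation of the same refill rule. `SimBucket` is a hypothetical name, and passing `now` as an argument is an assumption made to keep the trace deterministic.

```python
class SimBucket:
    """Token bucket driven by caller-supplied timestamps (in seconds)."""

    def __init__(self, requests_per_minute: int, burst: int):
        self.rate = requests_per_minute / 60  # tokens added per second
        self.burst = burst                    # bucket capacity
        self.tokens = float(burst)            # start full
        self.last_update = 0.0

    def try_acquire(self, now: float) -> bool:
        elapsed = now - self.last_update
        # Refill for the elapsed time, never beyond the burst capacity
        self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
        self.last_update = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


def trace() -> list:
    bucket = SimBucket(requests_per_minute=120, burst=3)  # refills 2 tokens/second
    timeline = [0.0, 0.0, 0.0, 0.0, 0.5, 0.75, 1.0]
    return [bucket.try_acquire(t) for t in timeline]
```

In this trace the first three calls at t=0 drain the burst and the fourth is refused. Half a second later one token has refilled, the call at t=0.75 finds only half a token, and the call at t=1.0 succeeds again.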

Monitoring & Observability

1. Metrics Collection

from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
agent_requests = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['status', 'agent_type']
)

agent_latency = Histogram(
    'agent_latency_seconds',
    'Agent request latency',
    ['agent_type'],
    buckets=[0.1, 0.5, 1, 2, 5, 10]
)

active_agents = Gauge(
    'active_agents',
    'Number of active agents',
    ['agent_type']
)

# Instrument agent
class InstrumentedAgent:
    def __init__(self, agent, agent_type: str = "default"):
        self.agent = agent
        self.agent_type = agent_type
    
    async def execute(self, input_data):
        active_agents.labels(self.agent_type).inc()
        start = time.time()
        
        try:
            result = await self.agent.execute(input_data)
            agent_requests.labels(status="success", agent_type=self.agent_type).inc()
            return result
        except Exception as e:
            agent_requests.labels(status="error", agent_type=self.agent_type).inc()
            raise
        finally:
            agent_latency.labels(agent_type=self.agent_type).observe(
                time.time() - start
            )
            active_agents.labels(self.agent_type).dec()

2. Structured Logging

import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

# Log agent requests
# Assumes `result` exposes `output`, `duration_ms`, and `success` attributes
async def log_agent_execution(agent, input_data, result):
    logger.info(
        "agent_execution",
        agent_type=agent.type,
        input_length=len(input_data),
        output_length=len(result.output),
        duration_ms=result.duration_ms,
        success=result.success
    )

3. Distributed Tracing

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

class TracedAgent:
    def __init__(self, agent):
        self.agent = agent
    
    async def execute(self, input_data):
        with tracer.start_as_current_span("agent_execution") as span:
            span.set_attribute("agent.type", self.agent.type)
            span.set_attribute("input.size", len(input_data))
            
            try:
                result = await self.agent.execute(input_data)
                span.set_status(Status(StatusCode.OK))
                span.set_attribute("output.size", len(result))
                return result
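            except Exception as e:
                # The description belongs inside Status; set_status ignores a
                # separate description when it is given a Status object
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise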

Security

1. Input Validation

from pydantic import BaseModel, validator  # Pydantic v1 style; v2 deprecates this in favor of field_validator
import re

class AgentInput(BaseModel):
    user_message: str
    session_id: str
    
    @validator('user_message')
    def validate_message(cls, v):
        if not v or len(v.strip()) == 0:
            raise ValueError("Message cannot be empty")
        
        if len(v) > 10000:
            raise ValueError("Message too long (max 10000 chars)")
        
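        # Naive keyword screen for prompt injection. It is easy to bypass, so
        # treat it as one layer of defense rather than a complete filter.
        if re.search(r"(ignore previous|ignore all|prompt:)", v, re.I):
            raise ValueError("Invalid input detected")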
        
        return v

# Usage
@app.post("/agent/run")
async def run_agent(input_data: AgentInput):
    result = await agent.execute(input_data.user_message)
    return {"result": result}

2. Tool Permissions

# Define allowed tools per user role
TOOL_PERMISSIONS = {
    "user": ["search", "read_file"],
    "admin": ["search", "read_file", "write_file", "execute_command"],
    "superadmin": ["*"]
}

class ToolPermissionChecker:
    def __init__(self, user_role: str):
        self.allowed_tools = TOOL_PERMISSIONS.get(user_role, [])
    
    def can_use_tool(self, tool_name: str) -> bool:
        if "*" in self.allowed_tools:
            return True
        return tool_name in self.allowed_tools
    
    def filter_tools(self, tools: list) -> list:
        return [t for t in tools if self.can_use_tool(t["name"])]
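
Filtering the tool list controls what the model is offered, but a model can still emit a call to a tool name it was never shown. A second check at execution time closes that gap. The sketch below assumes the same role table; `is_allowed`, `execute_tool`, and `REGISTRY` are hypothetical names, and the registry holds toy functions for illustration.

```python
TOOL_PERMISSIONS = {
    "user": ["search", "read_file"],
    "admin": ["search", "read_file", "write_file", "execute_command"],
    "superadmin": ["*"],
}


def is_allowed(role: str, tool_name: str) -> bool:
    allowed = TOOL_PERMISSIONS.get(role, [])  # unknown roles get no tools
    return "*" in allowed or tool_name in allowed


def execute_tool(role: str, tool_name: str, registry: dict, **kwargs):
    """Run a tool only if the caller's role permits it."""
    if not is_allowed(role, tool_name):
        raise PermissionError(f"role {role!r} may not call {tool_name!r}")
    return registry[tool_name](**kwargs)


# Toy registry for illustration only
REGISTRY = {
    "search": lambda query: f"results for {query}",
    "write_file": lambda path, text: f"wrote {len(text)} bytes to {path}",
}
```

Denying by default matters here: a role that is missing from the table gets an empty list, so a typo in a role name fails closed rather than open.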

3. Audit Logging

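# Audit log for compliance
from datetime import datetime, timezone

class AuditLogger:
    # Assumes self.audit_log and self.security_log are sinks configured elsewhere,
    # and that get_client_ip() is provided by the web framework integration
    async def log(self, event: str, user_id: str, details: dict):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),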
            "event": event,
            "user_id": user_id,
            "details": details,
            "ip_address": get_client_ip()
        }
        
        # Write to secure audit log
        await self.audit_log.write(entry)
        
        # Also log for security monitoring
        await self.security_log.warn(entry)

audit = AuditLogger()

# Usage
async def execute_tool(user_id, tool_name, tool_input):
    await audit.log(
        "tool_execution",
        user_id,
        {"tool": tool_name, "input_size": len(tool_input)}
    )

Cost Management

1. Token Tracking

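# Track token usage per request
class TokenTracker:
    def __init__(self, db):
        self.db = db  # usage store exposing an async log_usage(...) method
        self.total_tokens = 0
        self.prompt_tokens = 0
        self.completion_tokens = 0
    
    async def track(self, model: str, prompt_tokens: int, completion_tokens: int):
        # Keep running totals so the counters above reflect actual usage
        self.prompt_tokens += prompt_tokens
        self.completion_tokens += completion_tokens
        self.total_tokens += prompt_tokens + completion_tokens
        
        cost = self.calculate_cost(model, prompt_tokens, completion_tokens)
        
        await self.db.log_usage(
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            cost=cost
        )
        return cost
    
    def calculate_cost(self, model, prompt, completion):
        # USD per 1,000 tokens. Example figures only: provider pricing changes,
        # so load current rates from configuration in a real deployment.
        rates = {
            "gpt-4o": {"prompt": 0.005, "completion": 0.015},
            "gpt-4o-mini": {"prompt": 0.00015, "completion": 0.0006},
            "claude-3.5-sonnet": {"prompt": 0.003, "completion": 0.015}
        }
        
        # Unknown models fall back to a deliberately high rate
        rate = rates.get(model, {"prompt": 0.01, "completion": 0.03})
        return (prompt / 1000 * rate["prompt"]) + (completion / 1000 * rate["completion"])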
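
A worked example makes the per-1,000-token arithmetic easier to check. The sketch below repeats the same formula with `Decimal`, an assumption made here so that many small charges add up without floating-point drift. The rates are the illustrative figures from the listing, not current provider pricing, and `request_cost` is a hypothetical helper.

```python
from decimal import Decimal

# USD per 1,000 tokens; illustrative figures only
RATES = {
    "gpt-4o": {"prompt": Decimal("0.005"), "completion": Decimal("0.015")},
}
DEFAULT_RATE = {"prompt": Decimal("0.01"), "completion": Decimal("0.03")}


def request_cost(model: str, prompt_tokens: int, completion_tokens: int) -> Decimal:
    """Cost of one request: tokens / 1000 * rate, summed over both directions."""
    rate = RATES.get(model, DEFAULT_RATE)
    prompt_cost = Decimal(prompt_tokens) / 1000 * rate["prompt"]
    completion_cost = Decimal(completion_tokens) / 1000 * rate["completion"]
    return prompt_cost + completion_cost


# 1,200 prompt tokens:  1.2 * 0.005 = 0.0060
#   300 output tokens:  0.3 * 0.015 = 0.0045
# Total per request:                  0.0105 USD
per_request = request_cost("gpt-4o", 1200, 300)
per_day = per_request * 10_000  # 10,000 such requests cost 105 USD
```

At these example rates, a request that looks negligible on its own reaches about 105 USD per day at 10,000 requests, which is why per-request tracking is worth wiring in early.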

2. Budget Alerts

# Budget management
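class BudgetManager:
    def __init__(self, monthly_budget: float):
        self.budget = monthly_budget
        self.spent = 0.0
        self.alert_thresholds = [0.5, 0.75, 0.9, 1.0]
        self.alerted = set()  # thresholds that have already fired this month
    
    async def check_and_alert(self, amount: float):
        self.spent += amount
        
        percentage = self.spent / self.budget
        
        for threshold in self.alert_thresholds:
            # Fire each threshold once, not on every request after it is crossed
            if percentage >= threshold and threshold not in self.alerted:
                self.alerted.add(threshold)
                await self.send_alert(threshold, self.spent, self.budget)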
    
    async def can_spend(self, amount: float) -> bool:
        return (self.spent + amount) <= self.budget

Deployment

Docker

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Kubernetes

# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ai-agent
  template:
    metadata:
      labels:
        app: ai-agent
    spec:
      containers:
      - name: agent
        image: your-registry/ai-agent:1.0.0  # pin a version; :latest makes rollbacks unpredictable
        resources:
          limits:
            memory: "2Gi"
            cpu: "1000m"
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai
        - name: REDIS_URL
          valueFrom:
            configMapKeyRef:
              name: agent-config
              key: redis_url

Best Practices

Good: Health Checks

# Health check endpoint
@app.get("/health")
async def health_check():
    # Check all dependencies
    checks = {
        "redis": await check_redis(),
        "llm": await check_llm(),
        "agents": await check_agents()
    }
    
    all_healthy = all(c["healthy"] for c in checks.values())
    
    return {
        "status": "healthy" if all_healthy else "degraded",
        "checks": checks
    }
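
A health endpoint that awaits its checks one after another is only as fast as the slowest dependency, and a hung dependency can hang the probe. One possible refinement, sketched here with hypothetical names (`run_checks`, `guarded`), runs the checks concurrently and gives each one a timeout. It assumes each check is an async callable that returns a truthy value when healthy.

```python
import asyncio


async def run_checks(checks: dict, timeout_s: float = 2.0) -> dict:
    """Run all dependency checks concurrently, each with its own timeout."""

    async def guarded(name, check):
        try:
            ok = await asyncio.wait_for(check(), timeout=timeout_s)
            return name, {"healthy": bool(ok)}
        except asyncio.TimeoutError:
            return name, {"healthy": False, "error": "timeout"}
        except Exception as exc:
            # A failing check is reported, never raised out of the endpoint
            return name, {"healthy": False, "error": str(exc)}

    pairs = await asyncio.gather(*(guarded(n, c) for n, c in checks.items()))
    results = dict(pairs)
    all_healthy = all(r["healthy"] for r in results.values())
    return {
        "status": "healthy" if all_healthy else "degraded",
        "checks": results,
    }
```

The return value has the same shape as the endpoint above, so it could be returned directly from a `/health` handler.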

Bad: No Error Handling

# Bad: Unhandled failures
async def run_agent(input_data):
    result = await agent.execute(input_data)  # No try/except: any error propagates to the caller
    return result

Good: Graceful Degradation

# Good: Fallback behavior
async def run_agent(input_data):
    try:
        # Try primary agent
        return await primary_agent.execute(input_data)
    except PrimaryAgentError:
        # Fallback to simpler agent
        try:
            return await fallback_agent.execute(input_data)
        except FallbackError:
            # Last resort - return error message
            return {"error": "Service temporarily unavailable"}
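
The fallback above triggers only when the primary agent raises. A primary agent that is slow rather than broken would still block the request, so it can help to bound each attempt with a timeout. The sketch below is one way to combine the two, using hypothetical names (`run_with_fallback`, `timeout_s`) and assuming both agents are async callables that take the input.

```python
import asyncio


async def run_with_fallback(primary, fallback, input_data, timeout_s: float = 5.0) -> dict:
    """Try the primary agent with a deadline, then the fallback, then give up."""
    try:
        result = await asyncio.wait_for(primary(input_data), timeout=timeout_s)
        return {"result": result, "degraded": False}
    except asyncio.TimeoutError:
        reason = "timeout"
    except Exception as exc:
        reason = type(exc).__name__

    try:
        result = await asyncio.wait_for(fallback(input_data), timeout=timeout_s)
        # Flag degraded responses so clients and dashboards can tell them apart
        return {"result": result, "degraded": True, "reason": reason}
    except Exception:
        # Last resort: a stable error payload instead of an unhandled exception
        return {"error": "Service temporarily unavailable", "reason": reason}
```

Recording the reason for the fallback keeps the degradation visible in logs and metrics, so a primary agent that fails all day does not go unnoticed.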

Conclusion

Production AI agents require careful planning around:

  • Reliability - Retries, circuit breakers, timeouts
  • Monitoring - Metrics, logs, traces
  • Security - Input validation, tool permissions, audit logs
  • Cost - Token tracking, budget alerts
  • Scaling - Agent pools, load balancing

Start with the basics, add complexity as needed, and always plan for failure.

