
LLMOps: Operationalizing Large Language Models

Introduction

Large Language Models have transformed software development, enabling applications that were previously impossible. However, the journey from a promising model to a reliable production system is fraught with challenges. LLMOps, the discipline of operationalizing LLMs, addresses these challenges by applying DevOps principles to the unique requirements of language models.

In 2026, LLMOps has matured into a distinct discipline with its own tools, best practices, and organizational patterns. Unlike traditional MLOps, LLMOps must handle the unique characteristics of generative AI: prompt engineering, token economics, model hallucination, and the rapid evolution of base models. This comprehensive guide explores every aspect of LLMOps, from initial model selection to production monitoring.

The LLMOps Lifecycle

The lifecycle of an LLM-powered application encompasses several distinct phases:

┌──────────────────────────────────────────────────────────────────┐
│                         LLMOps Lifecycle                         │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌─────────┐    │
│   │  Model   │───▶│  Prompt  │───▶│  Fine-   │───▶│ Deploy  │    │
│   │ Selection│    │  Design  │    │  Tuning  │    │         │    │
│   └──────────┘    └──────────┘    └──────────┘    └────┬────┘    │
│                                                        │         │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌────┴────┐    │
│   │ Monitor  │◀───│ Evaluate │◀───│   Test   │◀───│  Serve  │    │
│   └──────────┘    └──────────┘    └──────────┘    └─────────┘    │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

Model Selection

Choosing the right model is the foundation of a successful LLM deployment. Weigh context length, latency, cost per token, required capabilities, and data-compliance constraints:

Model Comparison Framework

from dataclasses import dataclass
from typing import List

@dataclass
class ModelRequirements:
    max_tokens: int
    latency_requirement_ms: float
    budget_per_1k_tokens: float
    needed_capabilities: List[str]
    context_length: int
    data_compliance: List[str]

@dataclass
class ModelInfo:
    name: str
    provider: str
    context_length: int
    input_cost_per_1k: float
    output_cost_per_1k: float
    latency_p50_ms: float
    latency_p99_ms: float
    capabilities: List[str]

def select_model(requirements: ModelRequirements, available_models: List[ModelInfo]) -> List[ModelInfo]:
    # Filter on hard constraints (context, latency, budget), then rank by
    # capability coverage. Checks for max_tokens and data_compliance are
    # omitted here for brevity.
    candidates = []
    
    for model in available_models:
        if model.context_length < requirements.context_length:
            continue
            
        if model.latency_p99_ms > requirements.latency_requirement_ms * 2:
            continue
            
        estimated_cost = (model.input_cost_per_1k + model.output_cost_per_1k) / 2
        if estimated_cost > requirements.budget_per_1k_tokens:
            continue
        
        capability_score = sum(
            1 for cap in requirements.needed_capabilities 
            if cap in model.capabilities
        )
        
        candidates.append((model, capability_score))
    
    return [m for m, _ in sorted(candidates, key=lambda x: x[1], reverse=True)]

Model          Provider   Context  Strengths                  Best For
GPT-4o         OpenAI     128K     Balanced, multimodal       General purpose
Claude 4       Anthropic  200K     Long context, safety       Complex reasoning
Gemini 2.5     Google     1M+      Long context, multimodal   Research, analysis
Llama 4        Meta       128K     Open source                Custom fine-tuning
Mistral Large  Mistral    128K     Fast, European             European compliance
Command R+     Cohere     128K     Enterprise, RAG            Business applications
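
To make the framework concrete, here is a minimal usage sketch. The model entries and numbers are hypothetical placeholders, not vendor benchmarks:

models = [
    ModelInfo(
        name="model-a", provider="provider-x", context_length=128_000,
        input_cost_per_1k=0.0025, output_cost_per_1k=0.01,
        latency_p50_ms=400, latency_p99_ms=1200,
        capabilities=["code", "function_calling"],
    ),
    ModelInfo(
        name="model-b", provider="provider-y", context_length=200_000,
        input_cost_per_1k=0.003, output_cost_per_1k=0.015,
        latency_p50_ms=600, latency_p99_ms=1800,
        capabilities=["code", "long_context"],
    ),
]

requirements = ModelRequirements(
    max_tokens=2048,
    latency_requirement_ms=1000,
    budget_per_1k_tokens=0.01,
    needed_capabilities=["code", "long_context"],
    context_length=100_000,
    data_compliance=[],
)

# Returns candidates that satisfy the hard constraints, ranked by how many
# required capabilities they cover.
ranked = select_model(requirements, models)
print([m.name for m in ranked])  # ['model-b', 'model-a']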

Cost Optimization Strategies

def estimate_monthly_cost(
    requests_per_day: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    model: str
) -> float:
    # Illustrative per-1K-token prices; check current provider price sheets.
    pricing = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "claude-4-sonnet": {"input": 0.003, "output": 0.015},
        "gemini-2.5-pro": {"input": 0.00125, "output": 0.005},
    }
    
    daily_input_cost = requests_per_day * avg_input_tokens / 1000 * pricing[model]["input"]
    daily_output_cost = requests_per_day * avg_output_tokens / 1000 * pricing[model]["output"]
    
    # Assumes a 30-day month.
    return (daily_input_cost + daily_output_cost) * 30

print(f"Estimated monthly cost: ${estimate_monthly_cost(10000, 500, 200, 'gpt-4o'):.2f}")

Prompt Engineering

Prompt engineering is the art and science of crafting inputs that elicit desired outputs from LLMs. In production systems, prompts become configuration that must be managed with the same rigor as code.

Prompt Management System

from typing import Dict, Any, List, Optional
from datetime import datetime
import hashlib
import json

class PromptManager:
    def __init__(self, storage_path: str = "./prompts"):
        self.storage_path = storage_path
        self.prompts: Dict[str, Dict] = {}
        self._load_prompts()
    
    def register_prompt(
        self,
        name: str,
        template: str,
        description: str,
        variables: List[str],
        examples: List[Dict[str, str]] = None,
        metadata: Dict = None
    ) -> str:
        version = self._generate_version(template)
        prompt_id = f"{name}:{version}"
        
        self.prompts[prompt_id] = {
            "name": name,
            "version": version,
            "template": template,
            "description": description,
            "variables": variables,
            "examples": examples or [],
            "metadata": metadata or {},
            "created_at": datetime.utcnow().isoformat(),
            "hash": hashlib.sha256(template.encode()).hexdigest()[:8]
        }
        
        return prompt_id
    
    def get_prompt(self, name: str, version: str = None, variables: Dict[str, Any] = None) -> str:
        if version:
            prompt_id = f"{name}:{version}"
        else:
            matching = [k for k in self.prompts.keys() if k.startswith(f"{name}:")]
            if not matching:
                raise ValueError(f"Prompt {name} not found")
            prompt_id = max(matching, key=lambda k: self.prompts[k]["created_at"])
        
        template = self.prompts[prompt_id]["template"]
        
        if variables:
            return self._render_template(template, variables)
        
        return template
    
    def _render_template(self, template: str, variables: Dict[str, Any]) -> str:
        try:
            return template.format(**variables)
        except KeyError as e:
            raise ValueError(f"Missing variable: {e}")
    
    def _generate_version(self, template: str) -> str:
        return hashlib.sha256(template.encode()).hexdigest()[:8]
    
    def _load_prompts(self):
        # Load from storage
        pass
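
A short usage sketch of the manager above; the prompt name and template are illustrative:

manager = PromptManager()

prompt_id = manager.register_prompt(
    name="summarize",
    template="Summarize the following text in {style} style:\n\n{text}",
    description="General-purpose summarization prompt",
    variables=["style", "text"],
)
print(prompt_id)  # e.g. "summarize:3fa8c21d" (name plus content hash)

# Fetch the latest version and render it with concrete variables.
rendered = manager.get_prompt(
    "summarize",
    variables={"style": "bullet-point", "text": "LLMOps applies DevOps principles to LLMs."},
)

Because the version is derived from a hash of the template, any edit to the prompt text produces a new version automatically, which makes rollbacks and A/B comparisons straightforward.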

Prompt Patterns

Few-Shot Learning

def create_few_shot_prompt(task_description: str, examples: List[Dict], query: str) -> str:
    prompt = f"""Task: {task_description}

Examples:"""
    
    for example in examples:
        prompt += f"""

Input: {example['input']}
Output: {example['output']}"""
    
    prompt += f"""

Now complete the following:

Input: {query}
Output:"""
    
    return prompt

Chain-of-Thought

def create_cot_prompt(question: str, include_instructions: bool = True) -> str:
    instructions = """Let's think step by step.""" if include_instructions else ""
    
    return f"""{instructions}

Question: {question}

Solution:"""

Role-Based Prompting

def create_role_prompt(role: str, context: str, task: str, format_instructions: str = None) -> str:
    prompt = f"""You are a {role}.

{context}

Task: {task}"""
    
    if format_instructions:
        prompt += f"\n\nOutput format:\n{format_instructions}"
    
    return prompt

Fine-Tuning

Fine-tuning adapts pre-trained models to specific tasks or domains. While unnecessary for many applications, it can significantly improve performance for specialized use cases.

When to Fine-Tune

Consider fine-tuning when:

  • You need consistent style or format that prompt engineering cannot achieve
  • You have sufficient domain-specific training data
  • Latency or cost constraints make larger models impractical
  • You need proprietary behavior that base models don't provide

Fine-Tuning Process

import json
from typing import List, Dict

def prepare_finetuning_data(
    examples: List[Dict[str, str]],
    format: str = "chatml"
) -> List[Dict]:
    """Prepare training data for fine-tuning."""
    
    if format == "chatml":
        formatted = []
        for ex in examples:
            formatted.append({
                "messages": [
                    {"role": "system", "content": ex.get("system", "You are a helpful assistant.")},
                    {"role": "user", "content": ex["input"]},
                    {"role": "assistant", "content": ex["output"]}
                ]
            })
        return formatted
    
    elif format == "instruction":
        formatted = []
        for ex in examples:
            formatted.append({
                "instruction": ex["instruction"],
                "input": ex.get("input", ""),
                "output": ex["output"]
            })
        return formatted
    
    raise ValueError(f"Unknown format: {format}")

training_data = prepare_finetuning_data([
    {"input": "What is Kubernetes?", "output": "Kubernetes is an open-source container orchestration platform..."},
    {"input": "Explain Docker", "output": "Docker is a platform for developing, shipping, and running applications in containers..."},
])

with open("training_data.jsonl", "w") as f:
    for item in training_data:
        f.write(json.dumps(item) + "\n")
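
With the JSONL file written, the next step is submitting a job to your provider. A sketch using the OpenAI fine-tuning API (the base model name is an assumption; check which models are currently fine-tunable):

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# Upload the training file, then launch the fine-tuning job.
training_file = client.files.create(
    file=open("training_data.jsonl", "rb"),
    purpose="fine-tune",
)

job = client.fine_tuning.jobs.create(
    training_file=training_file.id,
    model="gpt-4o-mini-2024-07-18",  # assumption: a fine-tunable base model
)
print(job.id, job.status)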

LoRA Fine-Tuning

Low-Rank Adaptation (LoRA) enables efficient fine-tuning without modifying all model parameters:

from peft import LoraConfig, get_peft_model, TaskType
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer

model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")

lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type=TaskType.CAUSAL_LM
)

model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

training_args = TrainingArguments(
    output_dir="./lora_output",
    per_device_train_batch_size=4,
    gradient_accumulation_steps=4,
    learning_rate=2e-4,
    num_train_epochs=3,
    logging_steps=10,
    save_steps=500,
)

trainer = Trainer(
    model=model,
    args=training_args,
    # tokenized_dataset is assumed to be a dataset prepared earlier with the
    # model's tokenizer (not shown here).
    train_dataset=tokenized_dataset,
)

trainer.train()
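
After training, you can ship just the adapter or merge it into the base weights; a brief sketch using peft's standard calls:

# Save only the adapter weights (orders of magnitude smaller than the base model).
model.save_pretrained("./lora_output/adapter")

# Or merge the adapter into the base model so inference needs no peft dependency.
merged_model = model.merge_and_unload()
merged_model.save_pretrained("./lora_output/merged")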

Evaluation

Evaluating LLM outputs requires different approaches than those used for traditional ML models: outputs are open-ended text, so there is rarely a single correct answer to compare against.

Evaluation Frameworks

from typing import List, Dict, Any, Callable
import numpy as np

class LLMEvaluator:
    def __init__(self):
        self.metrics = {}
    
    def add_metric(self, name: str, func: Callable):
        self.metrics[name] = func
    
    def evaluate(
        self, 
        predictions: List[str], 
        references: List[str] = None,
        contexts: List[str] = None
    ) -> Dict[str, float]:
        results = {}
        
        for name, metric_func in self.metrics.items():
            try:
                scores = []
                for i, pred in enumerate(predictions):
                    ref = references[i] if references else None
                    ctx = contexts[i] if contexts else None
                    score = metric_func(pred, ref, ctx)
                    scores.append(score)
                
                results[name] = {
                    "mean": np.mean(scores),
                    "std": np.std(scores),
                    "scores": scores
                }
            except Exception as e:
                results[name] = {"error": str(e)}
        
        return results
    
    def evaluate_rag(self, predictions: List[Dict], references: List[Dict]) -> Dict[str, float]:
        # _answer_relevance, _context_relevance, and _faithfulness are scoring
        # helpers to be supplied (e.g. embedding similarity or an LLM-as-judge
        # call); they are not implemented here.
        results = {
            "answer_relevance": [],
            "context_relevance": [],
            "faithfulness": []
        }
        
        for pred, ref in zip(predictions, references):
            results["answer_relevance"].append(
                self._answer_relevance(pred["answer"], ref["question"])
            )
            results["context_relevance"].append(
                self._context_relevance(pred["contexts"], ref["expected_contexts"])
            )
            results["faithfulness"].append(
                self._faithfulness(pred["answer"], pred["contexts"])
            )
        
        return {k: np.mean(v) for k, v in results.items()}

Automated Metrics

import json
import re
from typing import Dict

from rouge_score import rouge_scorer
from bert_score import score as bert_score

def calculate_rouge(prediction: str, reference: str) -> Dict[str, float]:
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    scores = scorer.score(reference, prediction)
    return {k: v.fmeasure for k, v in scores.items()}

def calculate_bert_score(prediction: str, reference: str) -> float:
    P, R, F1 = bert_score([prediction], [reference], lang='en')
    return F1.item()

def calculate_exact_match(prediction: str, reference: str) -> float:
    return float(prediction.strip() == reference.strip())

def calculate_format_score(prediction: str, required_format: str) -> float:
    if required_format == "json":
        try:
            json.loads(prediction)
            return 1.0
        except json.JSONDecodeError:
            return 0.0
    
    if required_format == "xml":
        # Loose structural check; use a real XML parser for strict validation.
        return 1.0 if re.search(r'<.+?>.*</.+?>', prediction, re.DOTALL) else 0.0
    
    return 0.0

def calculate_hallucination_score(prediction: str, context: str) -> float:
    """Crude lexical-overlap grounding check: the fraction of long words in
    the prediction that also appear in the context. Higher means better
    grounded, so low values suggest possible hallucination. Production
    systems typically use NLI models or LLM-as-judge instead."""
    context_lower = context.lower()
    pred_lower = prediction.lower()
    
    pred_terms = [term for term in pred_lower.split() if len(term) > 5]
    contained_terms = sum(1 for term in pred_terms if term in context_lower)
    
    return contained_terms / len(pred_terms) if pred_terms else 0.0
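
These functions plug into the LLMEvaluator defined earlier. The lambda adapters below match the (prediction, reference, context) calling convention the evaluator expects:

evaluator = LLMEvaluator()
evaluator.add_metric("exact_match", lambda p, r, c: calculate_exact_match(p, r))
evaluator.add_metric("rougeL", lambda p, r, c: calculate_rouge(p, r)["rougeL"])
evaluator.add_metric("grounding", lambda p, r, c: calculate_hallucination_score(p, c))

results = evaluator.evaluate(
    predictions=["Paris is the capital of France."],
    references=["The capital of France is Paris."],
    contexts=["France's capital city is Paris."],
)
print(results["rougeL"]["mean"])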

Human Evaluation Integration

from datetime import datetime
from enum import Enum
from typing import Dict

import numpy as np

class Rating(Enum):
    EXCELLENT = 5
    GOOD = 4
    ADEQUATE = 3
    POOR = 2
    VERY_POOR = 1

class HumanEvaluationQueue:
    def __init__(self, db_connection):
        self.db = db_connection
    
    def create_evaluation_task(
        self,
        prompt: str,
        prediction: str,
        reference: str,
        context: str = None,
        evaluation_type: str = "general"
    ) -> str:
        task_id = self.db.insert({
            "prompt": prompt,
            "prediction": prediction,
            "reference": reference,
            "context": context,
            "evaluation_type": evaluation_type,
            "status": "pending"
        })
        return task_id
    
    def record_evaluation(
        self,
        task_id: str,
        ratings: Dict[str, Rating],
        feedback: str = None
    ):
        self.db.update(task_id, {
            "ratings": {k: v.value for k, v in ratings.items()},
            "feedback": feedback,
            "status": "completed",
            "evaluated_at": datetime.utcnow().isoformat()
        })
    
    def get_aggregate_scores(self, evaluation_type: str = None) -> Dict[str, float]:
        query = {"status": "completed"}
        if evaluation_type:
            query["evaluation_type"] = evaluation_type
        
        evaluations = self.db.find(query)
        
        if not evaluations:
            return {}
        
        rating_keys = set()
        for evaluation in evaluations:
            rating_keys.update(evaluation["ratings"].keys())
        
        return {
            key: np.mean([e["ratings"].get(key, 0) for e in evaluations])
            for key in rating_keys
        }

Testing LLM Applications

Testing LLM applications requires specialized approaches: outputs are non-deterministic, so tests assert properties of responses rather than exact strings.

Prompt Testing Suite

import pytest
from typing import Dict, Any, List

class TestPrompts:
    @pytest.fixture
    def llm_client(self):
        # LLMClient is a placeholder for your model client wrapper; point it
        # at a staging key or a recorded-response stub in CI.
        return LLMClient(api_key="test-key")
    
    @pytest.fixture
    def test_cases(self) -> List[Dict[str, Any]]:
        return [
            {
                "name": "basic_question",
                "prompt": "What is Python?",
                "expected_contains": ["programming", "language"],
                "max_length": 500,
                "forbidden_terms": ["Java", "C++"]
            },
            {
                "name": "code_generation",
                "prompt": "Write a function to reverse a string in Python",
                "expected_contains": ["def", "return", "reverse"],
                "should_be_valid_python": True
            }
        ]
    
    def test_prompt_response_contains(self, llm_client, test_cases):
        for case in test_cases:
            if "expected_contains" in case:
                response = llm_client.generate(case["prompt"])
                for term in case["expected_contains"]:
                    assert term.lower() in response.lower(), \
                        f"Expected '{term}' in response for {case['name']}"
    
    def test_response_length(self, llm_client, test_cases):
        for case in test_cases:
            if "max_length" in case:
                response = llm_client.generate(case["prompt"])
                assert len(response) <= case["max_length"], \
                    f"Response too long for {case['name']}"
    
    def test_forbidden_terms(self, llm_client, test_cases):
        for case in test_cases:
            if "forbidden_terms" in case:
                response = llm_client.generate(case["prompt"])
                for term in case["forbidden_terms"]:
                    assert term not in response, \
                        f"Forbidden term '{term}' found in {case['name']}"

Regression Testing

class PromptRegressionTest:
    def __init__(self, production_client, baseline_results: Dict):
        self.client = production_client
        self.baseline = baseline_results
    
    def run_regression_tests(self, test_prompts: List[Dict]) -> Dict[str, Any]:
        results = {
            "passed": [],
            "failed": [],
            "degraded": []
        }
        
        for test in test_prompts:
            new_response = self.client.generate(test["prompt"])
            baseline_response = self.baseline.get(test["name"])
            
            if baseline_response is None:
                continue
            
            similarity = self._calculate_similarity(new_response, baseline_response)
            
            if similarity >= 0.95:
                results["passed"].append({"test": test["name"], "similarity": similarity})
            elif similarity >= 0.8:
                results["degraded"].append({"test": test["name"], "similarity": similarity})
            else:
                results["failed"].append({"test": test["name"], "similarity": similarity})
        
        return results
    
    def _calculate_similarity(self, text1: str, text2: str) -> float:
        # TF-IDF cosine similarity as a cheap proxy; swap in BERTScore or
        # embedding similarity for a more semantic comparison.
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity
        
        vectorizer = TfidfVectorizer()
        tfidf = vectorizer.fit_transform([text1, text2])
        return cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]

Deployment Architecture

Production LLM deployment requires careful architecture design:

Basic Deployment Pattern

import hashlib
import json
import time
from functools import lru_cache
from typing import List, Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# llm_client is assumed to be an async wrapper around your provider's SDK,
# constructed at application startup (not shown here).

class GenerationRequest(BaseModel):
    prompt: str
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 1.0
    stop: Optional[List[str]] = None

class GenerationResponse(BaseModel):
    text: str
    model: str
    usage: dict
    latency_ms: float

@lru_cache(maxsize=1000)
def get_cached_prompt_hash(prompt: str, **kwargs) -> str:
    # Hash helper for keying a response cache (see the caching layer below).
    config = json.dumps(kwargs, sort_keys=True)
    return hashlib.sha256(f"{prompt}:{config}".encode()).hexdigest()

@app.post("/v1/generate", response_model=GenerationResponse)
async def generate(request: GenerationRequest):
    start_time = time.time()
    
    response = await llm_client.generate(
        prompt=request.prompt,
        max_tokens=request.max_tokens,
        temperature=request.temperature,
        top_p=request.top_p,
        stop=request.stop
    )
    
    latency = (time.time() - start_time) * 1000
    
    return GenerationResponse(
        text=response.text,
        model=response.model,
        usage=response.usage,
        latency_ms=latency
    )

Caching Layer

import hashlib
import json
from typing import Optional

import redis

class PromptCache:
    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        self.redis = redis_client
        self.ttl = ttl
    
    def _get_cache_key(self, prompt: str, params: dict) -> str:
        config = json.dumps(params, sort_keys=True)
        hash_input = f"{prompt}:{config}".encode()
        return f"llm:cache:{hashlib.sha256(hash_input).hexdigest()}"
    
    def get(self, prompt: str, params: dict) -> Optional[str]:
        key = self._get_cache_key(prompt, params)
        cached = self.redis.get(key)
        return cached.decode() if cached else None
    
    def set(self, prompt: str, params: dict, response: str):
        key = self._get_cache_key(prompt, params)
        self.redis.setex(key, self.ttl, response)
    
    def invalidate(self, prompt_pattern: str = None):
        if prompt_pattern:
            keys = self.redis.keys(f"llm:cache:{prompt_pattern}*")
            if keys:
                self.redis.delete(*keys)
        else:
            self.redis.flushdb()
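
Wiring the cache into the generation path is a check-then-populate pattern. A sketch, with the same llm_client assumption as above; note that caching only makes sense at deterministic settings (temperature 0) or when serving an identical earlier answer is acceptable:

cache = PromptCache(redis.Redis(), ttl=3600)

async def generate_with_cache(prompt: str, params: dict) -> str:
    # Serve repeated (prompt, params) pairs straight from Redis.
    cached = cache.get(prompt, params)
    if cached is not None:
        return cached
    
    response = await llm_client.generate(prompt=prompt, **params)
    cache.set(prompt, params, response.text)
    return response.text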

Rate Limiting

from fastapi import Request
from fastapi.responses import JSONResponse
import redis

class RateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    async def check_rate_limit(
        self, 
        client_id: str, 
        max_requests: int, 
        window_seconds: int
    ) -> bool:
        key = f"ratelimit:{client_id}"
        
        # Atomic fixed-window counter: increment first, set the TTL only when
        # the key is newly created, then compare against the limit. This
        # avoids the race between a separate read and write.
        count = self.redis.incr(key)
        if count == 1:
            self.redis.expire(key, window_seconds)
        
        return count <= max_requests

rate_limiter = RateLimiter(redis.Redis())

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_id = request.client.host
    
    if not await rate_limiter.check_rate_limit(client_id, 100, 60):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    
    response = await call_next(request)
    return response

Load Balancing Multiple Models

from typing import Optional

import httpx
from fastapi import HTTPException

class ModelLoadBalancer:
    def __init__(self):
        self.models = []
        self.health_status = {}
    
    def register_model(self, model_id: str, endpoint: str, capacity: int):
        self.models.append({
            "id": model_id,
            "endpoint": endpoint,
            "capacity": capacity,
            "current_load": 0
        })
    
    def select_model(self, requirements: dict = None) -> Optional[dict]:
        available = [m for m in self.models if m["current_load"] < m["capacity"]]
        
        if not available:
            return None
        
        selected = min(available, key=lambda m: m["current_load"])
        selected["current_load"] += 1
        
        return selected
    
    def release_model(self, model_id: str):
        for model in self.models:
            if model["id"] == model_id:
                model["current_load"] = max(0, model["current_load"] - 1)
                break
    
    async def generate(self, prompt: str, requirements: dict = None) -> str:
        model = self.select_model(requirements)
        
        if not model:
            raise HTTPException(status_code=503, detail="No available models")
        
        try:
            response = await self._call_model(model["endpoint"], prompt)
            return response
        finally:
            self.release_model(model["id"])
    
    async def _call_model(self, endpoint: str, prompt: str) -> str:
        # Assumes each model is served behind an HTTP endpoint that accepts a
        # JSON body with a "prompt" field; adjust to your serving stack.
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(endpoint, json={"prompt": prompt})
            resp.raise_for_status()
            return resp.json()["text"]

Monitoring and Observability

Monitoring LLM applications requires tracking traditional metrics plus LLM-specific ones:

Key Metrics

from prometheus_client import Counter, Histogram, Gauge

llm_requests_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['model', 'status']
)

llm_token_usage = Histogram(
    'llm_token_usage',
    'Token usage',
    ['model', 'type'],
    buckets=[100, 500, 1000, 2000, 5000, 10000]
)

llm_latency = Histogram(
    'llm_latency_seconds',
    'LLM request latency',
    ['model'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)

llm_errors = Counter(
    'llm_errors_total',
    'Total LLM errors',
    ['model', 'error_type']
)

prompt_cache_hits = Counter(
    'prompt_cache_hits_total',
    'Total cache hits'
)

active_requests = Gauge(
    'llm_active_requests',
    'Currently active requests',
    ['model']
)

class LLMPrometheusMonitor:
    def __init__(self, model_name: str):
        self.model_name = model_name
    
    def track_request(self, status: str = "success"):
        llm_requests_total.labels(
            model=self.model_name,
            status=status
        ).inc()
    
    def track_tokens(self, input_tokens: int, output_tokens: int):
        llm_token_usage.labels(
            model=self.model_name,
            type="input"
        ).observe(input_tokens)
        
        llm_token_usage.labels(
            model=self.model_name,
            type="output"
        ).observe(output_tokens)
    
    def track_latency(self, duration_seconds: float):
        llm_latency.labels(model=self.model_name).observe(duration_seconds)
    
    def track_error(self, error_type: str):
        llm_errors.labels(
            model=self.model_name,
            error_type=error_type
        ).inc()
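
A sketch of instrumenting a request path with these metrics; llm_client and the usage field names follow the same assumptions as the deployment section:

import time

monitor = LLMPrometheusMonitor("gpt-4o")

async def generate_with_metrics(prompt: str) -> str:
    active_requests.labels(model="gpt-4o").inc()
    start = time.monotonic()
    try:
        response = await llm_client.generate(prompt=prompt)
        monitor.track_request(status="success")
        monitor.track_tokens(
            response.usage["input_tokens"],
            response.usage["output_tokens"],
        )
        return response.text
    except Exception as exc:
        monitor.track_request(status="error")
        monitor.track_error(type(exc).__name__)
        raise
    finally:
        monitor.track_latency(time.monotonic() - start)
        active_requests.labels(model="gpt-4o").dec()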

Logging

import structlog

logger = structlog.get_logger()

class LLMLogger:
    def log_request(
        self,
        prompt: str,
        response: str,
        model: str,
        latency_ms: float,
        token_usage: dict,
        metadata: dict = None
    ):
        logger.info(
            "llm_request",
            prompt_length=len(prompt),
            response_length=len(response),
            model=model,
            latency_ms=latency_ms,
            input_tokens=token_usage.get("input_tokens"),
            output_tokens=token_usage.get("output_tokens"),
            metadata=metadata or {}
        )
    
    def log_error(
        self,
        error: Exception,
        prompt: str,
        model: str,
        context: dict = None
    ):
        logger.error(
            "llm_error",
            error_type=type(error).__name__,
            error_message=str(error),
            prompt_length=len(prompt),
            model=model,
            context=context or {}
        )

Cost Tracking

from datetime import datetime

class CostTracker:
    def __init__(self):
        self.daily_costs = {}
        self.monthly_budget = 10000   # USD per month
        self.alert_threshold = 0.8    # alert at 80% of budget
    
    def track_request(self, model: str, input_tokens: int, output_tokens: int):
        pricing = self._get_pricing(model)
        
        cost = (
            (input_tokens / 1000) * pricing["input"] +
            (output_tokens / 1000) * pricing["output"]
        )
        
        today = datetime.now().date().isoformat()
        if today not in self.daily_costs:
            self.daily_costs[today] = 0
        self.daily_costs[today] += cost
        
        self._check_budget_alert()
        
        return cost
    
    def _get_pricing(self, model: str) -> dict:
        # Illustrative prices per 1K tokens; keep this table in sync with
        # your providers' current price sheets.
        pricing_table = {
            "gpt-4o": {"input": 0.0025, "output": 0.01},
            "claude-4": {"input": 0.003, "output": 0.015},
        }
        # Unknown models count as zero cost; consider raising instead.
        return pricing_table.get(model, {"input": 0, "output": 0})
    
    def _check_budget_alert(self):
        total_spent = sum(self.daily_costs.values())
        if total_spent > self.monthly_budget * self.alert_threshold:
            logger.warning(
                "budget_alert",
                spent=total_spent,
                budget=self.monthly_budget,
                percentage=(total_spent / self.monthly_budget) * 100
            )
    
    def get_daily_cost(self, date: str = None) -> float:
        date = date or datetime.now().date().isoformat()
        return self.daily_costs.get(date, 0)
    
    def get_monthly_cost(self) -> float:
        # Assumes the tracker is reset (or re-instantiated) each month.
        return sum(self.daily_costs.values())

Security Considerations

LLM applications require special security considerations:

Prompt Injection Prevention

import re

class PromptInjectionGuard:
    def __init__(self):
        self.injection_patterns = [
            r"ignore.*previous.*instructions",
            r"disregard.*system.*prompt",
            r"forget.*all.*rules",
            r"you.*are.*now.*",
            r"new.*instructions.*:",
        ]
    
    def scan(self, text: str) -> bool:
        for pattern in self.injection_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return True
        return False
    
    def sanitize(self, text: str) -> str:
        sanitized = text
        for pattern in self.injection_patterns:
            sanitized = re.sub(pattern, "[FILTERED]", sanitized, flags=re.IGNORECASE)
        return sanitized

PII Detection

import re

class PIIDetector:
    def __init__(self):
        self.patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
            "credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
        }
    
    def detect(self, text: str) -> dict:
        findings = {}
        for pii_type, pattern in self.patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                findings[pii_type] = matches
        return findings
    
    def redact(self, text: str) -> str:
        redacted = text
        for pii_type, pattern in self.patterns.items():
            redacted = re.sub(pattern, f"[{pii_type.upper()}]", redacted)
        return redacted
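
The two guards compose into a pre-flight check that runs before any user text reaches the model; a minimal sketch:

guard = PromptInjectionGuard()
pii_detector = PIIDetector()

def prepare_user_input(text: str) -> str:
    # Reject likely injection attempts outright rather than silently filtering.
    if guard.scan(text):
        raise ValueError("Potential prompt injection detected")
    
    # Redact PII before the text reaches the model or your logs.
    if pii_detector.detect(text):
        text = pii_detector.redact(text)
    
    return text

Pattern-based guards are a first line of defense only; robust protection layers them with instruction hierarchy, output filtering, and classifier-based detection.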

Best Practices

A/B Testing Prompts

import hashlib
from typing import List

class PromptExperiment:
    def __init__(self, experiment_id: str, variants: List[dict]):
        self.experiment_id = experiment_id
        self.variants = variants
        self.assignments = {}
    
    def get_variant(self, user_id: str) -> dict:
        if user_id in self.assignments:
            variant_index = self.assignments[user_id]
        else:
            # Use a stable hash: Python's built-in hash() is salted per
            # process, which would reassign users on every restart.
            digest = hashlib.sha256(user_id.encode()).hexdigest()
            variant_index = int(digest, 16) % len(self.variants)
            self.assignments[user_id] = variant_index
        
        return self.variants[variant_index]
    
    def track_outcome(self, user_id: str, outcome: dict):
        variant_index = self.assignments.get(user_id)
        if variant_index is not None:
            logger.info(
                "experiment_outcome",
                experiment=self.experiment_id,
                variant=variant_index,
                outcome=outcome
            )
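
Usage is a two-step loop: assign a variant, then record what happened. The names below are illustrative:

experiment = PromptExperiment(
    experiment_id="summarize-v2",
    variants=[
        {"name": "baseline", "template": "Summarize: {text}"},
        {"name": "structured", "template": "Summarize as three bullet points: {text}"},
    ],
)

variant = experiment.get_variant(user_id="user-123")
prompt = variant["template"].format(text="...")
# ... send the prompt to the model, then record the outcome:
experiment.track_outcome("user-123", {"variant": variant["name"], "thumbs_up": True})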

Continuous Improvement

class ContinuousImprovement:
    def __init__(self, evaluator: LLMEvaluator, human_eval: HumanEvaluationQueue):
        self.evaluator = evaluator
        self.human_eval = human_eval
    
    def identify_improvements(self, predictions: List[dict], metrics: dict) -> List[str]:
        improvements = []
        
        if metrics.get("answer_relevance", 0) < 0.7:
            improvements.append("Improve prompt clarity and specificity")
        
        if metrics.get("faithfulness", 0) < 0.8:
            improvements.append("Review context utilization in prompts")
        
        low_rated_count = sum(1 for p in predictions if p.get("rating", 5) < 3)
        if low_rated_count > len(predictions) * 0.1:
            improvements.append("Review edge cases and failure modes")
        
        return improvements
    
    def generate_new_prompts(self, current_prompt: str, improvements: List[str]) -> List[str]:
        new_prompts = [current_prompt]
        
        for improvement in improvements:
            if "clarity" in improvement.lower():
                new_prompts.append(current_prompt + "\n\nBe specific and clear in your response.")
            if "context" in improvement.lower():
                new_prompts.append(current_prompt + "\n\nUse only the provided context to answer.")
            if "structure" in improvement.lower():
                new_prompts.append(current_prompt + "\n\nRespond in the following format: ...")
        
        return new_prompts

Conclusion

LLMOps represents a critical discipline for organizations building production AI applications. By applying rigorous operational practices across model selection, prompt management, testing, deployment, monitoring, and continuous improvement, you can build reliable, scalable, and cost-effective LLM-powered systems.

The field continues to evolve rapidly. New models, tools, and best practices emerge regularly. Stay current by participating in communities, reading research papers, and experimenting with new approaches. The investment in robust LLMOps practices will pay dividends in system reliability, developer productivity, and user satisfaction.
