Introduction
LLMs in production require continuous monitoring to detect quality degradation, cost anomalies, and behavioral drift. Unlike traditional ML models with fixed, versioned weights, hosted LLMs can degrade subtly through provider-side API changes, prompt variations, or shifts in the input distribution. This guide covers building comprehensive observability for production LLM applications, with practical metrics, alerting, and debugging strategies.
Key Statistics:
- 40% of LLM quality issues go undetected for weeks
- Model drift costs average $50k-$500k annually
- Monitoring reduces incident response time by 70%
- Real-time alerts prevent 80% of user-facing issues
Core Concepts & Terminology
1. Quality Metrics: measurements of LLM output quality (accuracy, relevance, toxicity).
2. Drift Detection: identifying when model behavior changes over time.
3. Cost Anomaly Detection: detecting unusual spending patterns or token usage.
4. Latency Monitoring: tracking response times and identifying slowdowns.
5. Token Usage Tracking: monitoring input/output token consumption.
6. Error Rate Monitoring: tracking API errors and failures.
7. User Satisfaction Metrics: measuring user feedback and satisfaction.
8. Hallucination Detection: identifying when models generate false information.
9. Bias Monitoring: detecting discriminatory or biased outputs.
10. Observability Stack: the complete monitoring infrastructure (metrics, logs, traces).
LLM Monitoring Architecture
┌─────────────────────────────────────────────────────────────┐
│                        LLM Requests                         │
└───────────────────┬─────────────────────────────────────────┘
                    │
┌───────────────────▼─────────────────────────────────────────┐
│                  Instrumentation Layer                       │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │   Request    │  │   Response   │  │   Latency    │       │
│  │   Logging    │  │   Logging    │  │   Tracking   │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
└───────────────────┬─────────────────────────────────────────┘
                    │
┌───────────────────▼─────────────────────────────────────────┐
│                Metrics Collection Layer                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │   Quality    │  │     Cost     │  │ Performance  │       │
│  │   Metrics    │  │   Metrics    │  │   Metrics    │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
└───────────────────┬─────────────────────────────────────────┘
                    │
┌───────────────────▼─────────────────────────────────────────┐
│               Analysis & Detection Layer                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │    Drift     │  │   Anomaly    │  │    Trend     │       │
│  │  Detection   │  │  Detection   │  │   Analysis   │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
└───────────────────┬─────────────────────────────────────────┘
                    │
┌───────────────────▼─────────────────────────────────────────┐
│                Alerting & Visualization                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │    Alerts    │  │  Dashboards  │  │   Reports    │       │
│  │ (Slack, etc) │  │  (Grafana)   │  │ (Analytics)  │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
└─────────────────────────────────────────────────────────────┘
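In practice, the instrumentation layer is a thin wrapper around each LLM call. A minimal sketch, assuming an OpenAI-style client; `log_record` is a placeholder sink you would point at your own metrics store:

import time
import uuid
from datetime import datetime

def instrumented_completion(client, model: str, prompt: str, log_record) -> str:
    """Wrap an LLM call with request/response/latency logging.

    `client` is assumed to expose an OpenAI-style chat.completions.create();
    `log_record` is any callable that ships a dict to your metrics store.
    """
    request_id = str(uuid.uuid4())
    start = time.perf_counter()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    latency_ms = int((time.perf_counter() - start) * 1000)
    text = response.choices[0].message.content
    log_record({
        "request_id": request_id,
        "timestamp": datetime.now().isoformat(),
        "model": model,
        "prompt": prompt,
        "response": text,
        "latency_ms": latency_ms,
        "input_tokens": response.usage.prompt_tokens,
        "output_tokens": response.usage.completion_tokens,
    })
    return text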
Quality Metrics Implementation
Core Quality Metrics
from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime
import numpy as np
@dataclass
class QualityMetrics:
"""Quality metrics for LLM outputs"""
timestamp: datetime
request_id: str
model: str
# Relevance metrics
relevance_score: float # 0-1, how relevant to query
completeness_score: float # 0-1, how complete the answer
# Safety metrics
toxicity_score: float # 0-1, toxicity level
bias_score: float # 0-1, bias level
# Accuracy metrics
factuality_score: Optional[float] # 0-1, factual accuracy
hallucination_detected: bool
# User satisfaction
user_rating: Optional[int] # 1-5 stars
user_feedback: Optional[str]
# Performance metrics
latency_ms: int
input_tokens: int
output_tokens: int
cost: float
class QualityMonitor:
"""Monitor LLM output quality"""
def __init__(self, db_connection):
self.db = db_connection
self.metrics_buffer = []
def evaluate_response(self, request_id: str, prompt: str,
response: str, model: str) -> QualityMetrics:
"""Evaluate LLM response quality"""
# Relevance scoring
relevance_score = self._score_relevance(prompt, response)
completeness_score = self._score_completeness(response)
# Safety scoring
toxicity_score = self._detect_toxicity(response)
bias_score = self._detect_bias(response)
# Factuality scoring
factuality_score = self._score_factuality(response)
hallucination_detected = factuality_score < 0.5
metrics = QualityMetrics(
timestamp=datetime.now(),
request_id=request_id,
model=model,
relevance_score=relevance_score,
completeness_score=completeness_score,
toxicity_score=toxicity_score,
bias_score=bias_score,
factuality_score=factuality_score,
hallucination_detected=hallucination_detected,
            user_rating=None,        # filled in later from user feedback
            user_feedback=None,
            latency_ms=0,            # filled in by the instrumentation layer
            input_tokens=len(prompt.split()),    # whitespace split is a rough
            output_tokens=len(response.split()), # proxy for real token counts
            cost=0.0                 # filled in by the cost tracker
)
return metrics
def _score_relevance(self, prompt: str, response: str) -> float:
"""Score response relevance to prompt"""
# Use semantic similarity
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
vectorizer = TfidfVectorizer()
vectors = vectorizer.fit_transform([prompt, response])
similarity = cosine_similarity(vectors[0], vectors[1])[0][0]
return float(similarity)
    def _score_completeness(self, response: str) -> float:
        """Score response completeness"""
        # Indicators must be lowercase, since we match against response.lower()
        incomplete_indicators = ['...', "i don't know", 'unclear', 'not sure']
        response_lower = response.lower()
        has_incomplete = any(indicator in response_lower
                             for indicator in incomplete_indicators)
        # Check response length
        min_length = 50  # minimum reasonable response length, in characters
        is_long_enough = len(response) > min_length
        completeness = 0.5 if has_incomplete else 1.0
        completeness *= (1.0 if is_long_enough else 0.5)
        return min(1.0, completeness)
    def _detect_toxicity(self, response: str) -> float:
        """Detect toxicity in response"""
        # Use a toxicity detection library; in production, load the model
        # once at startup rather than on every call
        try:
            from detoxify import Detoxify
            model = Detoxify("original")
            results = model.predict(response)
            return float(results['toxicity'])
        except ImportError:
            # detoxify not installed; default to non-toxic rather than fail
            return 0.0
    def _detect_bias(self, response: str) -> float:
        """Detect bias in response (crude keyword heuristic; use a trained
        classifier in production)"""
        import re
        biased_terms = {
            'gender': ['he', 'she', 'man', 'woman'],
            'race': ['white', 'black', 'asian'],
            'age': ['old', 'young', 'elderly']
        }
        bias_score = 0.0
        text = response.lower()
        for category, terms in biased_terms.items():
            # Match whole words only; plain substring counting would match
            # 'he' inside 'the', etc.
            term_count = sum(len(re.findall(r'\b' + re.escape(term) + r'\b', text))
                             for term in terms)
            if term_count > 0:
                bias_score += 0.1
        return min(1.0, bias_score)
    def _score_factuality(self, response: str) -> float:
        """Score factual accuracy (toy placeholder)"""
        # A real implementation would use an NLI model, a retrieval-backed
        # fact checker, or an LLM-as-judge; this placeholder only catches
        # canned examples of obvious errors
        factual_errors = 0
        if '2+2=5' in response:
            factual_errors += 1
        if 'Earth is flat' in response:
            factual_errors += 1
        factuality = 1.0 - (factual_errors * 0.5)
        return max(0.0, factuality)
def record_metrics(self, metrics: QualityMetrics):
"""Record quality metrics"""
self.metrics_buffer.append(metrics)
# Flush to database periodically
if len(self.metrics_buffer) >= 100:
self._flush_metrics()
def _flush_metrics(self):
"""Flush metrics to database"""
for metrics in self.metrics_buffer:
self.db.insert('llm_quality_metrics', {
'timestamp': metrics.timestamp,
'request_id': metrics.request_id,
'model': metrics.model,
'relevance_score': metrics.relevance_score,
'completeness_score': metrics.completeness_score,
'toxicity_score': metrics.toxicity_score,
'bias_score': metrics.bias_score,
'factuality_score': metrics.factuality_score,
'hallucination_detected': metrics.hallucination_detected,
'latency_ms': metrics.latency_ms,
'input_tokens': metrics.input_tokens,
'output_tokens': metrics.output_tokens,
'cost': metrics.cost
})
self.metrics_buffer = []
def get_quality_summary(self, model: str,
hours: int = 24) -> dict:
"""Get quality summary for model"""
metrics = self.db.query(
"""SELECT * FROM llm_quality_metrics
WHERE model = ? AND timestamp > datetime('now', '-' || ? || ' hours')""",
(model, hours)
)
if not metrics:
return {}
relevance_scores = [m['relevance_score'] for m in metrics]
toxicity_scores = [m['toxicity_score'] for m in metrics]
hallucinations = sum(1 for m in metrics if m['hallucination_detected'])
return {
'avg_relevance': np.mean(relevance_scores),
'avg_toxicity': np.mean(toxicity_scores),
'hallucination_rate': hallucinations / len(metrics),
'total_requests': len(metrics),
'time_period_hours': hours
}
# Usage
monitor = QualityMonitor(db)
metrics = monitor.evaluate_response(
request_id='req_123',
prompt='What is machine learning?',
response='Machine learning is a subset of AI...',
model='gpt-4'
)
monitor.record_metrics(metrics)
summary = monitor.get_quality_summary('gpt-4', hours=24)
print(f"Average relevance: {summary['avg_relevance']:.2f}")
print(f"Hallucination rate: {summary['hallucination_rate']:.2%}")
Drift Detection
Statistical Drift Detection
from typing import List
import numpy as np
from scipy import stats
class DriftDetector:
"""Detect model drift over time"""
def __init__(self, baseline_window_hours: int = 168):
self.baseline_window = baseline_window_hours
self.baseline_metrics = None
    def establish_baseline(self, metrics: List[QualityMetrics]):
        """Establish baseline metrics"""
        relevance_scores = [m.relevance_score for m in metrics]
        latencies = [m.latency_ms for m in metrics]
        token_usage = [m.input_tokens + m.output_tokens for m in metrics]
        self.baseline_metrics = {
            # Keep the raw samples: the two-sample KS test below needs full
            # distributions, not just summary statistics
            'relevance_samples': relevance_scores,
            'latency_samples': latencies,
            'relevance_mean': np.mean(relevance_scores),
            'relevance_std': np.std(relevance_scores),
            'latency_mean': np.mean(latencies),
            'latency_std': np.std(latencies),
            'token_usage_mean': np.mean(token_usage),
            'token_usage_std': np.std(token_usage)
        }
    def detect_drift(self, current_metrics: List[QualityMetrics],
                     threshold: float = 0.05) -> dict:
        """Detect drift in current metrics"""
        if not self.baseline_metrics:
            return {'drift_detected': False}
        current_relevance = [m.relevance_score for m in current_metrics]
        current_latency = [m.latency_ms for m in current_metrics]
        # Two-sample Kolmogorov-Smirnov test against the stored baseline
        # samples (testing against a constant list of the baseline mean would
        # compare to a degenerate distribution and tell you nothing)
        ks_stat_relevance, p_value_relevance = stats.ks_2samp(
            self.baseline_metrics['relevance_samples'], current_relevance
        )
        ks_stat_latency, p_value_latency = stats.ks_2samp(
            self.baseline_metrics['latency_samples'], current_latency
        )
        drift_detected = (p_value_relevance < threshold or
                          p_value_latency < threshold)
        return {
            'drift_detected': drift_detected,
            'relevance_p_value': p_value_relevance,
            'latency_p_value': p_value_latency,
            'relevance_change': (np.mean(current_relevance) -
                self.baseline_metrics['relevance_mean']) / self.baseline_metrics['relevance_mean'],
            'latency_change': (np.mean(current_latency) -
                self.baseline_metrics['latency_mean']) / self.baseline_metrics['latency_mean']
        }
# Usage
detector = DriftDetector()
# Establish baseline
baseline_metrics = [...] # Historical metrics
detector.establish_baseline(baseline_metrics)
# Check for drift
current_metrics = [...] # Recent metrics
drift_result = detector.detect_drift(current_metrics)
if drift_result['drift_detected']:
print(f"Drift detected! Relevance change: {drift_result['relevance_change']:.2%}")
Cost Tracking & Anomaly Detection
Cost Monitoring
class CostMonitor:
"""Monitor LLM costs and detect anomalies"""
def __init__(self, db_connection, alert_threshold: float = 1.5):
self.db = db_connection
self.alert_threshold = alert_threshold # 1.5x normal cost
def track_request_cost(self, request_id: str, model: str,
input_tokens: int, output_tokens: int,
cost: float):
"""Track individual request cost"""
self.db.insert('llm_costs', {
'request_id': request_id,
'model': model,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'cost': cost,
'timestamp': datetime.now()
})
    def get_daily_cost(self, date: str) -> float:
        """Get total cost for a day (date as 'YYYY-MM-DD')"""
        result = self.db.query(
            """SELECT SUM(cost) as total_cost FROM llm_costs
               WHERE DATE(timestamp) = ?""",
            (date,)
        )
        # SUM() returns NULL when there are no rows, so guard against None
        if not result or result[0]['total_cost'] is None:
            return 0.0
        return result[0]['total_cost']
def get_model_cost_breakdown(self, hours: int = 24) -> dict:
"""Get cost breakdown by model"""
results = self.db.query(
"""SELECT model, SUM(cost) as total_cost, COUNT(*) as request_count
FROM llm_costs
WHERE timestamp > datetime('now', '-' || ? || ' hours')
GROUP BY model""",
(hours,)
)
return {r['model']: {
'total_cost': r['total_cost'],
'request_count': r['request_count'],
'avg_cost_per_request': r['total_cost'] / r['request_count']
} for r in results}
    def detect_cost_anomaly(self, model: str,
                            current_cost: float) -> bool:
        """Flag a per-request cost that exceeds the 7-day average by the alert threshold"""
# Get historical average
historical = self.db.query(
"""SELECT AVG(cost) as avg_cost FROM llm_costs
WHERE model = ? AND timestamp > datetime('now', '-7 days')""",
(model,)
)
if not historical or not historical[0]['avg_cost']:
return False
avg_cost = historical[0]['avg_cost']
# Alert if cost is significantly higher
return current_cost > (avg_cost * self.alert_threshold)
# Usage
cost_monitor = CostMonitor(db)
# Track costs
cost_monitor.track_request_cost(
request_id='req_123',
model='gpt-4',
input_tokens=500,
output_tokens=200,
cost=0.015
)
# Get breakdown
breakdown = cost_monitor.get_model_cost_breakdown(hours=24)
print(f"GPT-4 cost: ${breakdown['gpt-4']['total_cost']:.2f}")
# Detect anomalies
is_anomaly = cost_monitor.detect_cost_anomaly('gpt-4', 0.050)
if is_anomaly:
print("Cost anomaly detected!")
Best Practices
- Continuous Monitoring: Track metrics in real-time
- Establish Baselines: Know normal behavior before detecting drift
- Multi-Metric Approach: Monitor quality, cost, and performance
- User Feedback: Collect and analyze user ratings
- Automated Alerts: Alert on anomalies immediately
- Regular Reviews: Weekly/monthly metric reviews
- Root Cause Analysis: Investigate drift causes
- Version Tracking: Track model versions and changes
- Comparative Analysis: Compare models and prompts
- Documentation: Document all monitoring decisions
Common Pitfalls
- No Baseline: Can’t detect drift without baseline
- Ignoring Quality: Only tracking cost
- Delayed Alerts: Not alerting in real-time
- No User Feedback: Missing user satisfaction signals
- Ignoring Hallucinations: Not detecting false information
- No Cost Tracking: Surprised by bills
- Ignoring Latency: Not monitoring response times
- No Drift Detection: Unaware of model degradation
- Ignoring Safety: Not monitoring toxicity/bias
- No Documentation: Can’t explain metric changes
Monitoring Metrics Summary
| Metric | Target | Alert Threshold |
|---|---|---|
| Relevance Score | >0.85 | <0.75 |
| Toxicity Score | <0.05 | >0.10 |
| Hallucination Rate | <5% | >10% |
| Latency (ms) | <2000 | >5000 |
| Cost per Request | ~7-day average | >1.5x average |
| Error Rate | <1% | >5% |
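Encoded as configuration, these targets plug straight into threshold checks like the `AlertManager` shown later:

# Alert thresholds taken from the table above
ALERT_THRESHOLDS = {
    'relevance_score':    {'target': 0.85, 'alert_below': 0.75},
    'toxicity_score':     {'target': 0.05, 'alert_above': 0.10},
    'hallucination_rate': {'target': 0.05, 'alert_above': 0.10},
    'latency_ms':         {'target': 2000, 'alert_above': 5000},
    'error_rate':         {'target': 0.01, 'alert_above': 0.05},
}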
Advanced Monitoring Techniques
Real-time Quality Scoring
class QualityScorer:
"""Score LLM response quality in real-time"""
def __init__(self, reference_model="gpt-4"):
self.reference_model = reference_model
self.scores = []
def score_response(self, prompt: str, response: str) -> float:
"""Score response quality (0-1)"""
# 1. Relevance score
relevance = self._calculate_relevance(prompt, response)
# 2. Coherence score
coherence = self._calculate_coherence(response)
# 3. Factuality score
factuality = self._calculate_factuality(response)
# 4. Completeness score
completeness = self._calculate_completeness(prompt, response)
# Weighted average
score = (
relevance * 0.3 +
coherence * 0.2 +
factuality * 0.3 +
completeness * 0.2
)
self.scores.append(score)
return score
def _calculate_relevance(self, prompt: str, response: str) -> float:
"""Calculate relevance to prompt"""
# Use semantic similarity
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
vectorizer = TfidfVectorizer()
vectors = vectorizer.fit_transform([prompt, response])
similarity = cosine_similarity(vectors[0], vectors[1])[0][0]
return float(similarity)
    def _calculate_coherence(self, response: str) -> float:
        """Calculate response coherence (crude length-based proxy)"""
        # Counting non-empty sentences stands in for a real coherence model
        sentences = [s for s in response.split('.') if s.strip()]
        if len(sentences) < 2:
            return 0.5
        return min(1.0, len(sentences) / 10)
    def _calculate_factuality(self, response: str) -> float:
        """Calculate factuality score (heuristic)"""
        # Heuristic only: hedged language lowers the score and confident
        # language raises it from a neutral 0.5 baseline; clamped to [0, 1]
        # so it can never go negative. Replace with a real fact-checker.
        confidence_words = ['definitely', 'certainly', 'proven', 'verified']
        uncertainty_words = ['might', 'could', 'possibly', 'perhaps']
        text = response.lower()
        confidence_count = sum(1 for word in confidence_words if word in text)
        uncertainty_count = sum(1 for word in uncertainty_words if word in text)
        return max(0.0, min(1.0, 0.5 + (confidence_count - uncertainty_count) / 10))
    def _calculate_completeness(self, prompt: str, response: str) -> float:
        """Calculate response completeness"""
        # Lexical coverage of prompt terms by the response (stopwords are
        # included, so this over-estimates; good enough as a cheap signal)
        prompt_words = set(prompt.lower().split())
        if not prompt_words:
            return 0.0
        response_words = set(response.lower().split())
        return len(prompt_words & response_words) / len(prompt_words)
def get_average_score(self) -> float:
"""Get average quality score"""
return sum(self.scores) / len(self.scores) if self.scores else 0
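Exercising the scorer directly:

# Usage
scorer = QualityScorer()
score = scorer.score_response(
    prompt='What is machine learning?',
    response='Machine learning is a subfield of AI that learns patterns from data.'
)
print(f"Quality score: {score:.2f}")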
Sliding-Window Drift Detection
import numpy as np
from scipy import stats
class DriftDetector:
"""Detect model drift in LLM outputs"""
def __init__(self, window_size: int = 100):
self.window_size = window_size
self.baseline_metrics = None
self.current_metrics = []
    def set_baseline(self, metrics: list):
        """Set baseline metrics"""
        self.baseline_metrics = {
            # Keep the raw sample for the two-sample KS test below
            'samples': list(metrics),
            'mean': np.mean(metrics),
            'std': np.std(metrics),
            'median': np.median(metrics)
        }
    def add_metric(self, metric: float):
        """Add new metric to the sliding window"""
        self.current_metrics.append(metric)
        if len(self.current_metrics) > self.window_size:
            self.current_metrics.pop(0)
    def detect_drift(self) -> bool:
        """Detect if drift occurred"""
        if not self.baseline_metrics or len(self.current_metrics) < 10:
            return False
        # Two-sample Kolmogorov-Smirnov test against the stored baseline
        # sample, not a constant list of its mean
        ks_stat, p_value = stats.ks_2samp(
            self.current_metrics,
            self.baseline_metrics['samples']
        )
        # Drift detected if p-value < 0.05
        return p_value < 0.05
    def get_drift_severity(self) -> float:
        """Get drift severity (0-1)"""
        if not self.baseline_metrics or not self.current_metrics:
            return 0.0
        current_mean = np.mean(self.current_metrics)
        baseline_mean = self.baseline_metrics['mean']
        baseline_std = self.baseline_metrics['std']
        if baseline_std == 0:
            return 0.0  # avoid division by zero on a constant baseline
        # Z-score of the current mean, mapped to [0, 1] (3 sigma -> 1.0)
        z_score = abs((current_mean - baseline_mean) / baseline_std)
        return min(1.0, z_score / 3)
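A short usage sketch with illustrative score streams (the numbers are made up):

# Usage
detector = DriftDetector(window_size=100)
detector.set_baseline([0.82, 0.85, 0.79, 0.88, 0.84] * 20)  # historical quality scores

# Feed in recent scores as they arrive (e.g. from QualityScorer above)
for score in [0.71, 0.68, 0.74, 0.66, 0.70] * 4:
    detector.add_metric(score)

if detector.detect_drift():
    print(f"Drift detected, severity {detector.get_drift_severity():.2f}")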
User Feedback Integration
class FeedbackCollector:
"""Collect and analyze user feedback"""
def __init__(self):
self.feedback = []
self.ratings = []
def collect_feedback(self, response_id: str, rating: int,
comment: str = None, tags: list = None):
"""Collect user feedback"""
self.feedback.append({
'response_id': response_id,
'rating': rating, # 1-5
'comment': comment,
'tags': tags or [],
'timestamp': datetime.now()
})
self.ratings.append(rating)
def get_satisfaction_rate(self) -> float:
"""Get user satisfaction rate"""
if not self.ratings:
return 0
satisfied = sum(1 for r in self.ratings if r >= 4)
return satisfied / len(self.ratings)
    def get_common_issues(self) -> list:
        """Get most common issues from feedback, most frequent first"""
        issues = {}
        for item in self.feedback:
            for tag in item['tags']:
                issues[tag] = issues.get(tag, 0) + 1
        # Returns a list of (tag, count) tuples sorted by frequency
        return sorted(issues.items(), key=lambda x: x[1], reverse=True)
def analyze_sentiment(self) -> dict:
"""Analyze sentiment of feedback"""
from textblob import TextBlob
sentiments = {'positive': 0, 'neutral': 0, 'negative': 0}
for item in self.feedback:
if item['comment']:
blob = TextBlob(item['comment'])
polarity = blob.sentiment.polarity
if polarity > 0.1:
sentiments['positive'] += 1
elif polarity < -0.1:
sentiments['negative'] += 1
else:
sentiments['neutral'] += 1
return sentiments
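A quick usage sketch:

# Usage
collector = FeedbackCollector()
collector.collect_feedback('resp_001', rating=5, comment='Great answer!')
collector.collect_feedback('resp_002', rating=2, comment='Missed the point',
                           tags=['irrelevant'])
print(f"Satisfaction: {collector.get_satisfaction_rate():.1%}")
print(f"Top issues: {collector.get_common_issues()[:3]}")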
Production Monitoring Dashboard
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()
class MonitoringDashboard:
"""Production monitoring dashboard"""
def __init__(self):
self.quality_scorer = QualityScorer()
self.drift_detector = DriftDetector()
self.feedback_collector = FeedbackCollector()
        self.cost_tracker = CostTracker()  # not defined earlier in this guide; see the sketch below
def get_dashboard_data(self) -> dict:
"""Get all dashboard metrics"""
return {
'quality': {
'average_score': self.quality_scorer.get_average_score(),
'trend': 'up' if self._is_quality_improving() else 'down'
},
'drift': {
'detected': self.drift_detector.detect_drift(),
'severity': self.drift_detector.get_drift_severity()
},
'user_satisfaction': {
'rate': self.feedback_collector.get_satisfaction_rate(),
'common_issues': self.feedback_collector.get_common_issues()[:5]
},
'costs': {
'daily': self.cost_tracker.get_daily_cost(),
'monthly_projection': self.cost_tracker.get_monthly_projection()
}
}
def _is_quality_improving(self) -> bool:
"""Check if quality is improving"""
if len(self.quality_scorer.scores) < 10:
return True
recent = self.quality_scorer.scores[-5:]
older = self.quality_scorer.scores[-10:-5]
return np.mean(recent) > np.mean(older)
# Create the dashboard once at startup; constructing it per request would
# reset all collected metrics on every page load
dashboard = MonitoringDashboard()

@app.get("/dashboard", response_class=HTMLResponse)
async def get_dashboard():
    """Serve monitoring dashboard"""
    data = dashboard.get_dashboard_data()
html = f"""
<html>
<head>
<title>LLM Monitoring Dashboard</title>
<style>
body {{ font-family: Arial; margin: 20px; }}
.metric {{ display: inline-block; margin: 10px; padding: 10px; border: 1px solid #ccc; }}
.good {{ color: green; }}
.warning {{ color: orange; }}
.critical {{ color: red; }}
</style>
</head>
<body>
<h1>LLM Monitoring Dashboard</h1>
<div class="metric">
<h3>Quality Score</h3>
<p class="good">{data['quality']['average_score']:.2f}/1.0</p>
</div>
<div class="metric">
<h3>Drift Detection</h3>
<p class="{'critical' if data['drift']['detected'] else 'good'}">
{'⚠️ Drift Detected' if data['drift']['detected'] else '✅ No Drift'}
</p>
</div>
<div class="metric">
<h3>User Satisfaction</h3>
<p class="good">{data['user_satisfaction']['rate']:.1%}</p>
</div>
<div class="metric">
<h3>Daily Cost</h3>
<p>${data['costs']['daily']:.2f}</p>
</div>
</body>
</html>
"""
return html
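The dashboard above references a `CostTracker` that this guide never defines (the earlier `CostMonitor` is database-backed and has a different interface). A minimal in-memory stand-in might look like this:

class CostTracker:
    """Minimal in-memory cost tracker for the dashboard sketch above."""
    def __init__(self):
        self.daily_costs = {}  # 'YYYY-MM-DD' -> accumulated USD

    def record(self, cost: float):
        key = datetime.now().strftime('%Y-%m-%d')
        self.daily_costs[key] = self.daily_costs.get(key, 0.0) + cost

    def get_daily_cost(self) -> float:
        key = datetime.now().strftime('%Y-%m-%d')
        return self.daily_costs.get(key, 0.0)

    def get_monthly_projection(self) -> float:
        # Naive projection: today's spend x 30
        return self.get_daily_cost() * 30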
Alerting Strategy
class AlertManager:
"""Manage monitoring alerts"""
def __init__(self):
self.alerts = []
self.thresholds = {
'quality_score': 0.7,
'drift_severity': 0.5,
'satisfaction_rate': 0.8,
'daily_cost': 1000
}
def check_metrics(self, metrics: dict):
"""Check metrics against thresholds"""
# Quality alert
if metrics['quality'] < self.thresholds['quality_score']:
self._create_alert('LOW_QUALITY', f"Quality score: {metrics['quality']:.2f}")
# Drift alert
if metrics['drift_severity'] > self.thresholds['drift_severity']:
self._create_alert('DRIFT_DETECTED', f"Drift severity: {metrics['drift_severity']:.2f}")
# Satisfaction alert
if metrics['satisfaction_rate'] < self.thresholds['satisfaction_rate']:
self._create_alert('LOW_SATISFACTION', f"Satisfaction: {metrics['satisfaction_rate']:.1%}")
# Cost alert
if metrics['daily_cost'] > self.thresholds['daily_cost']:
self._create_alert('HIGH_COST', f"Daily cost: ${metrics['daily_cost']:.2f}")
def _create_alert(self, alert_type: str, message: str):
"""Create alert"""
alert = {
'type': alert_type,
'message': message,
'timestamp': datetime.now(),
'severity': self._get_severity(alert_type)
}
self.alerts.append(alert)
self._send_notification(alert)
def _get_severity(self, alert_type: str) -> str:
"""Get alert severity"""
critical_alerts = ['DRIFT_DETECTED', 'HIGH_COST']
return 'critical' if alert_type in critical_alerts else 'warning'
def _send_notification(self, alert: dict):
"""Send alert notification"""
# Send to Slack, email, PagerDuty, etc.
print(f"[{alert['severity'].upper()}] {alert['type']}: {alert['message']}")
Conclusion
Comprehensive LLM monitoring requires tracking quality metrics, detecting drift, collecting user feedback, and monitoring costs. By implementing the patterns in this guide, you can maintain high-quality LLM applications and quickly identify and resolve issues.
Key Takeaways:
- Track quality metrics continuously
- Implement drift detection
- Collect and analyze user feedback
- Monitor costs in real-time
- Set up automated alerts
- Create monitoring dashboards
- Establish baselines for comparison
- Review metrics regularly
Next Steps:
- Implement quality scoring
- Set up drift detection
- Create feedback collection system
- Build monitoring dashboard
- Configure alerts