Introduction: LLM monitoring is essential for maintaining reliable, cost-effective AI applications in production. Unlike traditional software, where errors are usually obvious, LLM failures can be subtle—degraded output quality, increased hallucinations, or slowly rising costs that go unnoticed until the monthly bill arrives. Effective monitoring tracks latency, token usage, error rates, output quality, and cost metrics in real time, enabling teams to detect issues before they impact users. This guide covers the techniques that make LLM monitoring effective: metric collection, quality scoring, anomaly detection, alerting strategies, and dashboard design. Whether you’re running a chatbot, RAG system, or AI-powered feature, these patterns will help you maintain visibility into your LLM operations and respond quickly to problems.

Core Metrics Collection
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime, timedelta
from enum import Enum
class MetricType(Enum):
"""Types of LLM metrics."""
LATENCY = "latency"
TOKENS = "tokens"
COST = "cost"
ERROR = "error"
QUALITY = "quality"
THROUGHPUT = "throughput"
@dataclass
class LLMMetric:
"""A single LLM metric."""
name: str
value: float
metric_type: MetricType
timestamp: datetime = field(default_factory=datetime.now)
tags: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"name": self.name,
"value": self.value,
"type": self.metric_type.value,
"timestamp": self.timestamp.isoformat(),
"tags": self.tags
}
@dataclass
class RequestMetrics:
"""Metrics for a single LLM request."""
request_id: str
model: str
prompt_tokens: int
completion_tokens: int
total_tokens: int
latency_ms: float
time_to_first_token_ms: float = 0
cost_usd: float = 0
success: bool = True
error_type: str = ""
timestamp: datetime = field(default_factory=datetime.now)
class MetricsCollector:
"""Collect LLM metrics."""
def __init__(self):
self.metrics: list[LLMMetric] = []
self.request_metrics: list[RequestMetrics] = []
def record_request(self, metrics: RequestMetrics):
"""Record request metrics."""
self.request_metrics.append(metrics)
# Extract individual metrics
self.metrics.append(LLMMetric(
name="llm_latency_ms",
value=metrics.latency_ms,
metric_type=MetricType.LATENCY,
tags={"model": metrics.model, "request_id": metrics.request_id}
))
self.metrics.append(LLMMetric(
name="llm_tokens_total",
value=metrics.total_tokens,
metric_type=MetricType.TOKENS,
tags={"model": metrics.model, "type": "total"}
))
self.metrics.append(LLMMetric(
name="llm_cost_usd",
value=metrics.cost_usd,
metric_type=MetricType.COST,
tags={"model": metrics.model}
))
if not metrics.success:
self.metrics.append(LLMMetric(
name="llm_error",
value=1,
metric_type=MetricType.ERROR,
tags={"model": metrics.model, "error_type": metrics.error_type}
))
def record_quality(
self,
request_id: str,
quality_score: float,
quality_type: str = "overall"
):
"""Record quality metric."""
self.metrics.append(LLMMetric(
name="llm_quality_score",
value=quality_score,
metric_type=MetricType.QUALITY,
tags={"request_id": request_id, "quality_type": quality_type}
))
def get_metrics(
self,
metric_type: Optional[MetricType] = None,
since: Optional[datetime] = None
) -> list[LLMMetric]:
"""Get filtered metrics."""
result = self.metrics
if metric_type:
result = [m for m in result if m.metric_type == metric_type]
if since:
result = [m for m in result if m.timestamp >= since]
return result
class MetricsAggregator:
"""Aggregate metrics over time windows."""
def __init__(self, collector: MetricsCollector):
self.collector = collector
def aggregate(
self,
metric_name: str,
window_minutes: int = 5,
aggregation: str = "avg"
) -> float:
"""Aggregate metric over time window."""
from datetime import timedelta
since = datetime.now() - timedelta(minutes=window_minutes)
metrics = [
m for m in self.collector.metrics
if m.name == metric_name and m.timestamp >= since
]
if not metrics:
return 0.0
values = [m.value for m in metrics]
if aggregation == "avg":
return sum(values) / len(values)
elif aggregation == "sum":
return sum(values)
elif aggregation == "max":
return max(values)
elif aggregation == "min":
return min(values)
elif aggregation == "p50":
return sorted(values)[len(values) // 2]
elif aggregation == "p95":
return sorted(values)[int(len(values) * 0.95)]
elif aggregation == "p99":
return sorted(values)[int(len(values) * 0.99)]
elif aggregation == "count":
return len(values)
return 0.0
def get_summary(self, window_minutes: int = 60) -> dict:
"""Get summary statistics."""
return {
"latency_avg_ms": self.aggregate("llm_latency_ms", window_minutes, "avg"),
"latency_p95_ms": self.aggregate("llm_latency_ms", window_minutes, "p95"),
"latency_p99_ms": self.aggregate("llm_latency_ms", window_minutes, "p99"),
"tokens_total": self.aggregate("llm_tokens_total", window_minutes, "sum"),
"cost_total_usd": self.aggregate("llm_cost_usd", window_minutes, "sum"),
"error_count": self.aggregate("llm_error", window_minutes, "count"),
"request_count": len([
m for m in self.collector.request_metrics
if m.timestamp >= datetime.now() - timedelta(minutes=window_minutes)
]),
"quality_avg": self.aggregate("llm_quality_score", window_minutes, "avg")
}
class PrometheusExporter:
"""Export metrics to Prometheus format."""
def __init__(self, collector: MetricsCollector):
self.collector = collector
def export(self) -> str:
"""Export metrics in Prometheus format."""
lines = []
# Group by metric name
by_name: dict[str, list[LLMMetric]] = {}
for metric in self.collector.metrics:
if metric.name not in by_name:
by_name[metric.name] = []
by_name[metric.name].append(metric)
for name, metrics in by_name.items():
# Add help and type
lines.append(f"# HELP {name} LLM metric")
lines.append(f"# TYPE {name} gauge")
# Keep only the latest value per label set; duplicate series
# in a single exposition are not valid Prometheus text format
latest: dict[str, float] = {}
for metric in metrics[-100:]:  # bound the scan to recent samples
    tags_str = ",".join(f'{k}="{v}"' for k, v in sorted(metric.tags.items()))
    latest[tags_str] = metric.value
for tags_str, value in latest.items():
    if tags_str:
        lines.append(f"{name}{{{tags_str}}} {value}")
    else:
        lines.append(f"{name} {value}")
return "\n".join(lines)
Quality Monitoring
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import re
@dataclass
class QualityScore:
"""Quality score with breakdown."""
overall: float
relevance: float
coherence: float
completeness: float
safety: float
format_compliance: float
class QualityMonitor:
"""Monitor LLM output quality."""
def __init__(self):
self.scorers: list[Callable] = []
self.history: list[QualityScore] = []
def score(
self,
prompt: str,
output: str,
expected: str = None
) -> QualityScore:
"""Score output quality."""
relevance = self._score_relevance(prompt, output)
coherence = self._score_coherence(output)
completeness = self._score_completeness(prompt, output)
safety = self._score_safety(output)
format_compliance = self._score_format(prompt, output)
overall = (
relevance * 0.3 +
coherence * 0.2 +
completeness * 0.2 +
safety * 0.2 +
format_compliance * 0.1
)
score = QualityScore(
overall=overall,
relevance=relevance,
coherence=coherence,
completeness=completeness,
safety=safety,
format_compliance=format_compliance
)
self.history.append(score)
return score
def _score_relevance(self, prompt: str, output: str) -> float:
"""Score relevance to prompt."""
# Simple keyword overlap
prompt_words = set(prompt.lower().split())
output_words = set(output.lower().split())
# Remove stopwords
stopwords = {"the", "a", "an", "is", "are", "was", "were", "be", "been",
"have", "has", "had", "do", "does", "did", "will", "would",
"could", "should", "may", "might", "must", "shall", "can",
"to", "of", "in", "for", "on", "with", "at", "by", "from"}
prompt_keywords = prompt_words - stopwords
output_keywords = output_words - stopwords
if not prompt_keywords:
return 0.5
overlap = len(prompt_keywords & output_keywords) / len(prompt_keywords)
return min(overlap * 2, 1.0) # Scale up, cap at 1
def _score_coherence(self, output: str) -> float:
"""Score output coherence."""
# Check for complete sentences
sentences = re.split(r'[.!?]+', output)
complete_sentences = [s for s in sentences if len(s.split()) >= 3]
if not sentences:
return 0.0
sentence_ratio = len(complete_sentences) / len(sentences)
# Check for repetition
words = output.split()
if len(words) > 10:
unique_ratio = len(set(words)) / len(words)
else:
unique_ratio = 1.0
return (sentence_ratio + unique_ratio) / 2
def _score_completeness(self, prompt: str, output: str) -> float:
"""Score response completeness."""
# Check output length relative to prompt complexity
prompt_words = len(prompt.split())
output_words = len(output.split())
# Expect at least some response
if output_words < 5:
return 0.2
# Check for truncation indicators
if output.rstrip().endswith("..."):
return 0.5
# Check for complete ending
if output.rstrip()[-1] in ".!?":
return 1.0
return 0.7
def _score_safety(self, output: str) -> float:
"""Score output safety."""
# Check for unsafe patterns
unsafe_patterns = [
r"password",
r"credit card",
r"social security",
r"api key",
r"secret",
r"token"
]
output_lower = output.lower()
for pattern in unsafe_patterns:
if re.search(pattern, output_lower):
return 0.5
return 1.0
def _score_format(self, prompt: str, output: str) -> float:
"""Score format compliance."""
prompt_lower = prompt.lower()
# Check JSON format
if "json" in prompt_lower:
try:
import json
json.loads(output)
return 1.0
except json.JSONDecodeError:
return 0.0
# Check list format
if "list" in prompt_lower:
if re.search(r'^[\-\*\d]', output, re.MULTILINE):
return 1.0
return 0.5
return 1.0
def get_trend(self, window: int = 100) -> dict:
"""Get quality trend."""
if not self.history:
return {}
recent = self.history[-window:]
return {
"overall_avg": sum(s.overall for s in recent) / len(recent),
"overall_min": min(s.overall for s in recent),
"overall_max": max(s.overall for s in recent),
"relevance_avg": sum(s.relevance for s in recent) / len(recent),
"coherence_avg": sum(s.coherence for s in recent) / len(recent),
"safety_avg": sum(s.safety for s in recent) / len(recent)
}
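A quick sketch of the heuristic scorer in use; the prompt and output are made up for illustration:
monitor = QualityMonitor()
result = monitor.score(
    prompt="List three benefits of caching LLM responses.",
    output="Caching reduces latency, lowers cost, and smooths out traffic spikes."
)
print(round(result.overall, 2), round(result.relevance, 2), round(result.coherence, 2))
print(monitor.get_trend(window=100))  # running averages once more scores accumulate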
class LLMQualityJudge:
"""Use LLM to judge output quality."""
def __init__(self, judge_model: Any):
self.judge = judge_model
async def score(
self,
prompt: str,
output: str,
criteria: list[str] = None
) -> dict:
"""Score output using LLM judge."""
criteria = criteria or ["relevance", "accuracy", "helpfulness"]
judge_prompt = f"""Rate the following LLM output on a scale of 1-10 for each criterion.
Original Prompt: {prompt}
Output: {output}
Rate each criterion:
{chr(10).join(f"- {c}" for c in criteria)}
Respond in this format:
{chr(10).join(f"{c}: [score]" for c in criteria)}
OVERALL: [average score]"""
response = await self.judge.generate(judge_prompt)
return self._parse_scores(response, criteria)
def _parse_scores(self, response: str, criteria: list[str]) -> dict:
"""Parse scores from judge response."""
scores = {}
for line in response.split("\n"):
for criterion in criteria + ["OVERALL"]:
if criterion.lower() in line.lower():
try:
score = float(re.search(r"(\d+(?:\.\d+)?)", line).group(1))
scores[criterion.lower()] = score / 10 # Normalize to 0-1
except (AttributeError, ValueError):
pass
return scores
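The judge only needs an object exposing an async generate(prompt) method. The stub below stands in for a real model client, and its scores are hard-coded placeholders:
import asyncio

class StubJudge:
    """Placeholder judge that answers in the expected 'criterion: score' format."""
    async def generate(self, prompt: str) -> str:
        return "relevance: 8\naccuracy: 7\nhelpfulness: 9\nOVERALL: 8"

async def demo():
    judge = LLMQualityJudge(judge_model=StubJudge())
    scores = await judge.score(
        prompt="Explain time to first token.",
        output="Time to first token measures how long the model takes to emit its first token."
    )
    print(scores)  # {'relevance': 0.8, 'accuracy': 0.7, 'helpfulness': 0.9, 'overall': 0.8}

asyncio.run(demo())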
Anomaly Detection
from dataclasses import dataclass
from typing import Any, Optional, List
from datetime import datetime, timedelta
import numpy as np
@dataclass
class Anomaly:
"""A detected anomaly."""
metric_name: str
value: float
expected_range: tuple[float, float]
severity: str # low, medium, high, critical
timestamp: datetime
description: str
class StatisticalAnomalyDetector:
"""Detect anomalies using statistical methods."""
def __init__(
self,
window_size: int = 100,
z_threshold: float = 3.0
):
self.window_size = window_size
self.z_threshold = z_threshold
self.history: dict[str, list[float]] = {}
def add_value(self, metric_name: str, value: float) -> Optional[Anomaly]:
"""Add value and check for anomaly."""
if metric_name not in self.history:
self.history[metric_name] = []
history = self.history[metric_name]
# Need enough history
if len(history) < 10:
history.append(value)
return None
# Calculate statistics
mean = np.mean(history[-self.window_size:])
std = np.std(history[-self.window_size:])
if std == 0:
history.append(value)
return None
# Calculate z-score
z_score = abs(value - mean) / std
history.append(value)
# Trim history
if len(history) > self.window_size * 2:
self.history[metric_name] = history[-self.window_size:]
if z_score > self.z_threshold:
severity = self._get_severity(z_score)
return Anomaly(
metric_name=metric_name,
value=value,
expected_range=(mean - 2*std, mean + 2*std),
severity=severity,
timestamp=datetime.now(),
description=f"Value {value:.2f} is {z_score:.1f} standard deviations from mean {mean:.2f}"
)
return None
def _get_severity(self, z_score: float) -> str:
"""Get severity based on z-score."""
if z_score > 5:
return "critical"
elif z_score > 4:
return "high"
elif z_score > 3.5:
return "medium"
else:
return "low"
class ThresholdAnomalyDetector:
"""Detect anomalies using fixed thresholds."""
def __init__(self):
self.thresholds: dict[str, dict] = {}
def set_threshold(
self,
metric_name: str,
warning: Optional[float] = None,
critical: Optional[float] = None,
direction: str = "above" # above or below
):
"""Set threshold for metric."""
self.thresholds[metric_name] = {
"warning": warning,
"critical": critical,
"direction": direction
}
def check(self, metric_name: str, value: float) -> Optional[Anomaly]:
"""Check value against thresholds."""
if metric_name not in self.thresholds:
return None
threshold = self.thresholds[metric_name]
direction = threshold["direction"]
# Check critical
if threshold["critical"] is not None:
is_critical = (
(direction == "above" and value > threshold["critical"]) or
(direction == "below" and value < threshold["critical"])
)
if is_critical:
return Anomaly(
metric_name=metric_name,
value=value,
expected_range=(0, threshold["critical"]) if direction == "above" else (threshold["critical"], float('inf')),
severity="critical",
timestamp=datetime.now(),
description=f"Value {value:.2f} exceeds critical threshold {threshold['critical']}"
)
# Check warning
if threshold["warning"] is not None:
is_warning = (
(direction == "above" and value > threshold["warning"]) or
(direction == "below" and value < threshold["warning"])
)
if is_warning:
return Anomaly(
metric_name=metric_name,
value=value,
expected_range=(0, threshold["warning"]) if direction == "above" else (threshold["warning"], float('inf')),
severity="medium",
timestamp=datetime.now(),
description=f"Value {value:.2f} exceeds warning threshold {threshold['warning']}"
)
return None
class TrendAnomalyDetector:
"""Detect anomalies in metric trends."""
def __init__(
self,
window_size: int = 50,
trend_threshold: float = 0.1
):
self.window_size = window_size
self.trend_threshold = trend_threshold
self.history: dict[str, list[tuple[datetime, float]]] = {}
def add_value(
self,
metric_name: str,
value: float,
timestamp: datetime = None
) -> Optional[Anomaly]:
"""Add value and check for trend anomaly."""
timestamp = timestamp or datetime.now()
if metric_name not in self.history:
self.history[metric_name] = []
self.history[metric_name].append((timestamp, value))
# Trim history
if len(self.history[metric_name]) > self.window_size * 2:
self.history[metric_name] = self.history[metric_name][-self.window_size:]
# Need enough history
if len(self.history[metric_name]) < self.window_size:
return None
# Calculate trend
recent = self.history[metric_name][-self.window_size:]
values = [v for _, v in recent]
# Simple linear regression
x = np.arange(len(values))
slope = np.polyfit(x, values, 1)[0]
# Normalize slope by mean
mean = np.mean(values)
if mean != 0:
normalized_slope = slope / mean
else:
normalized_slope = 0
if abs(normalized_slope) > self.trend_threshold:
direction = "increasing" if normalized_slope > 0 else "decreasing"
return Anomaly(
metric_name=metric_name,
value=value,
expected_range=(mean * 0.9, mean * 1.1),
severity="medium",
timestamp=timestamp,
description=f"Metric is {direction} at {abs(normalized_slope)*100:.1f}% per window"
)
return None
class CompositeAnomalyDetector:
"""Combine multiple anomaly detectors."""
def __init__(self):
self.detectors: list[Any] = []
self.anomalies: list[Anomaly] = []
def add_detector(self, detector: Any):
"""Add a detector."""
self.detectors.append(detector)
def check(self, metric_name: str, value: float) -> list[Anomaly]:
"""Check all detectors."""
anomalies = []
for detector in self.detectors:
if hasattr(detector, 'add_value'):
anomaly = detector.add_value(metric_name, value)
elif hasattr(detector, 'check'):
anomaly = detector.check(metric_name, value)
else:
continue
if anomaly:
anomalies.append(anomaly)
self.anomalies.append(anomaly)
return anomalies
def get_recent_anomalies(self, hours: int = 24) -> list[Anomaly]:
"""Get recent anomalies."""
since = datetime.now() - timedelta(hours=hours)
return [a for a in self.anomalies if a.timestamp >= since]
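Wiring the detectors together might look like the following; the latency values and thresholds are placeholders:
detector = CompositeAnomalyDetector()
detector.add_detector(StatisticalAnomalyDetector(window_size=100, z_threshold=3.0))

thresholds = ThresholdAnomalyDetector()
thresholds.set_threshold("llm_latency_ms", warning=2000, critical=5000)
detector.add_detector(thresholds)

# Feed normal latencies, then a spike that both detectors should flag
for latency in [800, 850, 790, 820, 810, 830, 805, 795, 815, 825, 840, 6000]:
    for anomaly in detector.check("llm_latency_ms", float(latency)):
        print(anomaly.severity, anomaly.description)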
Alerting System
from dataclasses import dataclass, field
from typing import Any, Optional, List, Callable
from datetime import datetime, timedelta
from enum import Enum
import asyncio
class AlertSeverity(Enum):
"""Alert severity levels."""
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class Alert:
"""An alert."""
alert_id: str
title: str
description: str
severity: AlertSeverity
metric_name: str
value: float
threshold: float
timestamp: datetime = field(default_factory=datetime.now)
acknowledged: bool = False
resolved: bool = False
@dataclass
class AlertRule:
"""A rule for generating alerts."""
name: str
metric_name: str
condition: str # gt, lt, eq, ne
threshold: float
severity: AlertSeverity
cooldown_minutes: int = 5
last_triggered: Optional[datetime] = None
class AlertManager:
"""Manage alerts and notifications."""
def __init__(self):
self.rules: list[AlertRule] = []
self.alerts: list[Alert] = []
self.handlers: list[Callable] = []
def add_rule(self, rule: AlertRule):
"""Add an alert rule."""
self.rules.append(rule)
def add_handler(self, handler: Callable):
"""Add alert handler."""
self.handlers.append(handler)
def check_metric(self, metric_name: str, value: float) -> list[Alert]:
"""Check metric against rules."""
triggered_alerts = []
for rule in self.rules:
if rule.metric_name != metric_name:
continue
# Check cooldown
if rule.last_triggered:
cooldown_end = rule.last_triggered + timedelta(minutes=rule.cooldown_minutes)
if datetime.now() < cooldown_end:
continue
# Check condition
triggered = False
if rule.condition == "gt" and value > rule.threshold:
triggered = True
elif rule.condition == "lt" and value < rule.threshold:
triggered = True
elif rule.condition == "eq" and value == rule.threshold:
triggered = True
elif rule.condition == "ne" and value != rule.threshold:
triggered = True
if triggered:
alert = Alert(
alert_id=f"alert_{len(self.alerts)}",
title=f"{rule.name} triggered",
description=f"{metric_name} is {value:.2f} (threshold: {rule.threshold})",
severity=rule.severity,
metric_name=metric_name,
value=value,
threshold=rule.threshold
)
self.alerts.append(alert)
triggered_alerts.append(alert)
rule.last_triggered = datetime.now()
# Notify handlers
for handler in self.handlers:
try:
handler(alert)
except Exception as e:
print(f"Handler error: {e}")
return triggered_alerts
def acknowledge(self, alert_id: str):
"""Acknowledge an alert."""
for alert in self.alerts:
if alert.alert_id == alert_id:
alert.acknowledged = True
break
def resolve(self, alert_id: str):
"""Resolve an alert."""
for alert in self.alerts:
if alert.alert_id == alert_id:
alert.resolved = True
break
def get_active_alerts(self) -> list[Alert]:
"""Get unresolved alerts."""
return [a for a in self.alerts if not a.resolved]
class SlackNotifier:
"""Send alerts to Slack."""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
async def notify(self, alert: Alert):
"""Send alert to Slack."""
import httpx
color = {
AlertSeverity.INFO: "#36a64f",
AlertSeverity.WARNING: "#ffcc00",
AlertSeverity.ERROR: "#ff6600",
AlertSeverity.CRITICAL: "#ff0000"
}.get(alert.severity, "#808080")
payload = {
"attachments": [{
"color": color,
"title": alert.title,
"text": alert.description,
"fields": [
{"title": "Metric", "value": alert.metric_name, "short": True},
{"title": "Value", "value": f"{alert.value:.2f}", "short": True},
{"title": "Threshold", "value": f"{alert.threshold:.2f}", "short": True},
{"title": "Severity", "value": alert.severity.value, "short": True}
],
"ts": int(alert.timestamp.timestamp())
}]
}
async with httpx.AsyncClient() as client:
await client.post(self.webhook_url, json=payload)
class PagerDutyNotifier:
"""Send alerts to PagerDuty."""
def __init__(self, routing_key: str):
self.routing_key = routing_key
async def notify(self, alert: Alert):
"""Send alert to PagerDuty."""
import httpx
severity = {
AlertSeverity.INFO: "info",
AlertSeverity.WARNING: "warning",
AlertSeverity.ERROR: "error",
AlertSeverity.CRITICAL: "critical"
}.get(alert.severity, "info")
payload = {
"routing_key": self.routing_key,
"event_action": "trigger",
"dedup_key": alert.alert_id,
"payload": {
"summary": alert.title,
"severity": severity,
"source": "llm-monitoring",
"custom_details": {
"metric": alert.metric_name,
"value": alert.value,
"threshold": alert.threshold,
"description": alert.description
}
}
}
async with httpx.AsyncClient() as client:
await client.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload
)
class EmailNotifier:
"""Send alerts via email."""
def __init__(self, smtp_config: dict, recipients: list[str]):
self.smtp_config = smtp_config
self.recipients = recipients
async def notify(self, alert: Alert):
"""Send alert via email."""
import smtplib
from email.mime.text import MIMEText
subject = f"[{alert.severity.value.upper()}] {alert.title}"
body = f"""
Alert: {alert.title}
Severity: {alert.severity.value}
Metric: {alert.metric_name}
Value: {alert.value:.2f}
Threshold: {alert.threshold:.2f}
Time: {alert.timestamp.isoformat()}
{alert.description}
"""
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = self.smtp_config.get("from", "alerts@example.com")
msg["To"] = ", ".join(self.recipients)
with smtplib.SMTP(
self.smtp_config["host"],
self.smtp_config.get("port", 587)
) as server:
server.starttls()
server.login(
self.smtp_config["username"],
self.smtp_config["password"]
)
server.send_message(msg)
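A minimal sketch of the alert flow using a synchronous console handler. The Slack, PagerDuty, and email notifiers above expose async notify methods, so in practice a handler would schedule them on an event loop (for example with asyncio.create_task) rather than call them directly:
manager = AlertManager()
manager.add_rule(AlertRule(
    name="High Latency",
    metric_name="llm_latency_ms",
    condition="gt",
    threshold=5000,   # placeholder threshold
    severity=AlertSeverity.ERROR,
    cooldown_minutes=5
))

# Simple synchronous handler; swap in a notifier wrapper in production
manager.add_handler(lambda a: print(f"[{a.severity.value}] {a.title}: {a.description}"))

manager.check_metric("llm_latency_ms", 7200.0)   # triggers the rule and the handler
manager.check_metric("llm_latency_ms", 7300.0)   # suppressed by the 5-minute cooldown
print(len(manager.get_active_alerts()))          # 1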
Production Monitoring Service
from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel
from typing import Optional, List
import uuid
import time
app = FastAPI()
class MetricRequest(BaseModel):
name: str
value: float
tags: Optional[dict] = None
class AlertRuleRequest(BaseModel):
name: str
metric_name: str
condition: str
threshold: float
severity: str
class DashboardData(BaseModel):
latency_avg: float
latency_p95: float
tokens_total: int
cost_total: float
error_rate: float
quality_avg: float
active_alerts: int
# Initialize components
collector = MetricsCollector()
aggregator = MetricsAggregator(collector)
alert_manager = AlertManager()
anomaly_detector = CompositeAnomalyDetector()
# Add default detectors
anomaly_detector.add_detector(StatisticalAnomalyDetector())
threshold_detector = ThresholdAnomalyDetector()
threshold_detector.set_threshold("llm_latency_ms", warning=2000, critical=5000)
threshold_detector.set_threshold("llm_error", warning=0.05, critical=0.1)
anomaly_detector.add_detector(threshold_detector)
# Add default alert rules
alert_manager.add_rule(AlertRule(
name="High Latency",
metric_name="llm_latency_ms",
condition="gt",
threshold=5000,
severity=AlertSeverity.ERROR
))
alert_manager.add_rule(AlertRule(
name="High Error Rate",
metric_name="llm_error_rate",
condition="gt",
threshold=0.1,
severity=AlertSeverity.CRITICAL
))
@app.post("/v1/metrics")
async def record_metric(request: MetricRequest) -> dict:
"""Record a metric."""
metric = LLMMetric(
name=request.name,
value=request.value,
metric_type=next((t for t in MetricType if t.value in request.name.lower()), MetricType.LATENCY),  # infer type from name
tags=request.tags or {}
)
collector.metrics.append(metric)
# Check for anomalies
anomalies = anomaly_detector.check(request.name, request.value)
# Check alert rules
alerts = alert_manager.check_metric(request.name, request.value)
return {
"recorded": True,
"anomalies": len(anomalies),
"alerts": len(alerts)
}
@app.get("/v1/metrics/summary")
async def get_summary(window_minutes: int = 60) -> dict:
"""Get metrics summary."""
return aggregator.get_summary(window_minutes)
@app.get("/v1/dashboard")
async def get_dashboard() -> DashboardData:
"""Get dashboard data."""
summary = aggregator.get_summary(60)
return DashboardData(
latency_avg=summary.get("latency_avg_ms", 0),
latency_p95=summary.get("latency_p95_ms", 0),
tokens_total=int(summary.get("tokens_total", 0)),
cost_total=summary.get("cost_total_usd", 0),
error_rate=summary.get("error_count", 0) / max(summary.get("request_count", 1), 1),
quality_avg=summary.get("quality_avg", 0),
active_alerts=len(alert_manager.get_active_alerts())
)
@app.get("/v1/alerts")
async def get_alerts(active_only: bool = True) -> list[dict]:
"""Get alerts."""
if active_only:
alerts = alert_manager.get_active_alerts()
else:
alerts = alert_manager.alerts
return [
{
"alert_id": a.alert_id,
"title": a.title,
"severity": a.severity.value,
"metric": a.metric_name,
"value": a.value,
"timestamp": a.timestamp.isoformat(),
"acknowledged": a.acknowledged,
"resolved": a.resolved
}
for a in alerts
]
@app.post("/v1/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str) -> dict:
"""Acknowledge an alert."""
alert_manager.acknowledge(alert_id)
return {"acknowledged": True}
@app.post("/v1/alerts/{alert_id}/resolve")
async def resolve_alert(alert_id: str) -> dict:
"""Resolve an alert."""
alert_manager.resolve(alert_id)
return {"resolved": True}
@app.post("/v1/rules")
async def add_rule(request: AlertRuleRequest) -> dict:
"""Add alert rule."""
rule = AlertRule(
name=request.name,
metric_name=request.metric_name,
condition=request.condition,
threshold=request.threshold,
severity=AlertSeverity(request.severity)
)
alert_manager.add_rule(rule)
return {"added": True}
@app.get("/v1/anomalies")
async def get_anomalies(hours: int = 24) -> list[dict]:
"""Get recent anomalies."""
anomalies = anomaly_detector.get_recent_anomalies(hours)
return [
{
"metric": a.metric_name,
"value": a.value,
"expected_range": a.expected_range,
"severity": a.severity,
"timestamp": a.timestamp.isoformat(),
"description": a.description
}
for a in anomalies
]
@app.get("/metrics")
async def prometheus_metrics() -> str:
"""Export Prometheus metrics."""
exporter = PrometheusExporter(collector)
return exporter.export()
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- LangSmith: https://docs.smith.langchain.com/
- Weights & Biases: https://wandb.ai/site/llm-monitoring
- Helicone: https://www.helicone.ai/
- Prometheus: https://prometheus.io/docs/introduction/overview/
- OpenTelemetry: https://opentelemetry.io/
Conclusion
LLM monitoring requires tracking metrics that traditional software monitoring doesn’t capture—token usage, output quality, and cost alongside latency and errors. Start with the basics: latency percentiles (p50, p95, p99), token counts, error rates, and cost per request. Add quality monitoring using automated scoring for relevance, coherence, and safety; LLM-as-judge provides deeper quality insights but adds cost and latency. Anomaly detection should combine statistical methods (z-score for sudden spikes) with threshold-based rules (hard limits on latency and cost) and trend detection (gradual degradation over time). Alert fatigue is real—tune your thresholds carefully and use cooldown periods to avoid notification storms. Build dashboards that show both real-time metrics and trends over time; a slowly increasing error rate is as important as a sudden spike. Export metrics to Prometheus for integration with existing monitoring infrastructure. The key insight is that LLM failures are often subtle—output quality degrades before errors appear, costs creep up before budgets are exceeded. Proactive monitoring catches these issues early, before they impact users or budgets.
