Introduction
LLM applications fail in ways traditional software doesn’t. A model might return syntactically correct but factually wrong responses. Latency can spike unpredictably. Costs can explode without warning. Token usage varies wildly based on input. Traditional APM tools miss these LLM-specific failure modes. This guide covers comprehensive monitoring for LLM applications: tracking latency, tokens, and costs; implementing distributed tracing across chains and agents; detecting quality degradation; and building alerting systems that catch problems before users notice them.

Metrics Collection
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import time

@dataclass
class LLMMetrics:
    """Metrics for a single LLM call."""
    request_id: str
    model: str
    timestamp: datetime
    # Latency metrics
    latency_ms: float
    time_to_first_token_ms: Optional[float] = None
    # Token metrics
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    # Cost metrics
    cost_usd: float = 0.0
    # Quality metrics
    finish_reason: str = ""
    error: Optional[str] = None
    # Context
    endpoint: str = ""
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    tags: dict = field(default_factory=dict)

class MetricsCollector:
    """Collect and aggregate LLM metrics."""

    def __init__(self):
        self.metrics: list[LLMMetrics] = []
        self.cost_per_1k_tokens = {
            "gpt-4o": {"input": 0.0025, "output": 0.01},
            "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
            "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
            "claude-3-opus": {"input": 0.015, "output": 0.075},
        }

    def calculate_cost(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> float:
        """Calculate cost for a request."""
        if model not in self.cost_per_1k_tokens:
            return 0.0
        rates = self.cost_per_1k_tokens[model]
        input_cost = (prompt_tokens / 1000) * rates["input"]
        output_cost = (completion_tokens / 1000) * rates["output"]
        return input_cost + output_cost

    def record(self, metrics: LLMMetrics):
        """Record metrics."""
        # Calculate cost if not provided
        if metrics.cost_usd == 0.0:
            metrics.cost_usd = self.calculate_cost(
                metrics.model,
                metrics.prompt_tokens,
                metrics.completion_tokens
            )
        self.metrics.append(metrics)

    def get_summary(self, window_minutes: int = 60) -> dict:
        """Get summary statistics for recent metrics."""
        cutoff = datetime.utcnow().timestamp() - (window_minutes * 60)
        recent = [m for m in self.metrics if m.timestamp.timestamp() > cutoff]
        if not recent:
            return {"count": 0}
        latencies = sorted(m.latency_ms for m in recent)
        tokens = [m.total_tokens for m in recent]
        costs = [m.cost_usd for m in recent]
        errors = [m for m in recent if m.error]
        return {
            "count": len(recent),
            "latency_p50_ms": latencies[len(latencies) // 2],
            "latency_p95_ms": latencies[int(len(latencies) * 0.95)],
            "latency_p99_ms": latencies[int(len(latencies) * 0.99)],
            "avg_tokens": sum(tokens) / len(tokens),
            "total_cost_usd": sum(costs),
            "error_rate": len(errors) / len(recent),
            "errors": len(errors)
        }

class InstrumentedLLMClient:
    """LLM client with automatic metrics collection."""

    def __init__(
        self,
        client: Any,
        collector: MetricsCollector,
        default_tags: dict = None
    ):
        self.client = client
        self.collector = collector
        self.default_tags = default_tags or {}

    async def chat_completion(
        self,
        model: str,
        messages: list[dict],
        request_id: str = None,
        user_id: str = None,
        session_id: str = None,
        tags: dict = None,
        **kwargs
    ) -> Any:
        """Make instrumented chat completion request."""
        import uuid
        request_id = request_id or str(uuid.uuid4())
        start_time = time.time()
        error = None
        response = None
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            return response
        except Exception as e:
            error = str(e)
            raise
        finally:
            latency_ms = (time.time() - start_time) * 1000
            metrics = LLMMetrics(
                request_id=request_id,
                model=model,
                timestamp=datetime.utcnow(),
                latency_ms=latency_ms,
                prompt_tokens=response.usage.prompt_tokens if response else 0,
                completion_tokens=response.usage.completion_tokens if response else 0,
                total_tokens=response.usage.total_tokens if response else 0,
                finish_reason=response.choices[0].finish_reason if response else "",
                error=error,
                endpoint="chat.completions",
                user_id=user_id,
                session_id=session_id,
                tags={**self.default_tags, **(tags or {})}
            )
            self.collector.record(metrics)
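To wire this into an application, construct one MetricsCollector per process and route calls through the instrumented client. Below is a minimal sketch, assuming the openai package's AsyncOpenAI client (any client exposing a compatible chat.completions.create coroutine works the same way); the tag and user id values are illustrative:

import asyncio
from openai import AsyncOpenAI

async def main():
    collector = MetricsCollector()
    llm = InstrumentedLLMClient(
        client=AsyncOpenAI(),                    # reads OPENAI_API_KEY from the environment
        collector=collector,
        default_tags={"service": "support-bot"}  # hypothetical service tag
    )
    await llm.chat_completion(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello!"}],
        user_id="user-123"                       # hypothetical user id
    )
    print(collector.get_summary(window_minutes=5))

asyncio.run(main())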
Distributed Tracing
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from contextlib import contextmanager
import uuid
import threading

@dataclass
class Span:
    """A span in a distributed trace."""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    name: str
    start_time: datetime
    end_time: Optional[datetime] = None
    # Span data
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)
    status: str = "ok"
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        if not self.end_time:
            return 0.0
        return (self.end_time - self.start_time).total_seconds() * 1000

class Tracer:
    """Distributed tracing for LLM applications."""

    # NOTE: threading.local tracks the current span per thread; for heavily
    # concurrent asyncio workloads a contextvars.ContextVar is the safer choice.
    _local = threading.local()

    def __init__(self, service_name: str, exporter: Any = None):
        self.service_name = service_name
        self.exporter = exporter
        self.spans: list[Span] = []

    @property
    def current_span(self) -> Optional[Span]:
        return getattr(self._local, 'current_span', None)

    @current_span.setter
    def current_span(self, span: Optional[Span]):
        self._local.current_span = span

    @contextmanager
    def start_span(self, name: str, attributes: dict = None):
        """Start a new span."""
        parent = self.current_span
        span = Span(
            trace_id=parent.trace_id if parent else str(uuid.uuid4()),
            span_id=str(uuid.uuid4()),
            parent_span_id=parent.span_id if parent else None,
            name=name,
            start_time=datetime.utcnow(),
            attributes=attributes or {}
        )
        previous_span = self.current_span
        self.current_span = span
        try:
            yield span
            span.status = "ok"
        except Exception as e:
            span.status = "error"
            span.error = str(e)
            raise
        finally:
            span.end_time = datetime.utcnow()
            self.spans.append(span)
            self.current_span = previous_span
            if self.exporter:
                self.exporter.export(span)

    def add_event(self, name: str, attributes: dict = None):
        """Add event to current span."""
        if self.current_span:
            self.current_span.events.append({
                "name": name,
                "timestamp": datetime.utcnow().isoformat(),
                "attributes": attributes or {}
            })

    def set_attribute(self, key: str, value: Any):
        """Set attribute on current span."""
        if self.current_span:
            self.current_span.attributes[key] = value

class LLMTracer:
    """Specialized tracer for LLM operations."""

    def __init__(self, tracer: Tracer):
        self.tracer = tracer

    @contextmanager
    def trace_llm_call(
        self,
        model: str,
        operation: str = "chat.completion"
    ):
        """Trace an LLM API call."""
        with self.tracer.start_span(
            f"llm.{operation}",
            attributes={
                "llm.model": model,
                "llm.operation": operation
            }
        ) as span:
            yield span

    @contextmanager
    def trace_chain(self, chain_name: str):
        """Trace a chain execution."""
        with self.tracer.start_span(
            f"chain.{chain_name}",
            attributes={"chain.name": chain_name}
        ) as span:
            yield span

    @contextmanager
    def trace_retrieval(self, retriever_name: str):
        """Trace a retrieval operation."""
        with self.tracer.start_span(
            f"retrieval.{retriever_name}",
            attributes={"retriever.name": retriever_name}
        ) as span:
            yield span

    @contextmanager
    def trace_tool_call(self, tool_name: str):
        """Trace a tool/function call."""
        with self.tracer.start_span(
            f"tool.{tool_name}",
            attributes={"tool.name": tool_name}
        ) as span:
            yield span

    def record_tokens(self, prompt_tokens: int, completion_tokens: int):
        """Record token usage on current span."""
        self.tracer.set_attribute("llm.prompt_tokens", prompt_tokens)
        self.tracer.set_attribute("llm.completion_tokens", completion_tokens)
        self.tracer.set_attribute("llm.total_tokens", prompt_tokens + completion_tokens)

    def record_cost(self, cost_usd: float):
        """Record cost on current span."""
        self.tracer.set_attribute("llm.cost_usd", cost_usd)

class TracedRAGPipeline:
    """RAG pipeline with full tracing."""

    def __init__(
        self,
        retriever: Any,
        llm_client: Any,
        llm_tracer: LLMTracer
    ):
        self.retriever = retriever
        self.llm_client = llm_client
        self.llm_tracer = llm_tracer

    async def query(self, question: str) -> str:
        """Execute traced RAG query."""
        with self.llm_tracer.trace_chain("rag_pipeline"):
            # Trace retrieval
            with self.llm_tracer.trace_retrieval("vector_search"):
                docs = await self.retriever.search(question, top_k=5)
                self.llm_tracer.tracer.set_attribute("retrieval.doc_count", len(docs))
            # Build context
            context = "\n\n".join([d.content for d in docs])
            # Trace LLM call
            with self.llm_tracer.trace_llm_call("gpt-4o-mini"):
                response = await self.llm_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": f"Context:\n{context}"},
                        {"role": "user", "content": question}
                    ]
                )
                self.llm_tracer.record_tokens(
                    response.usage.prompt_tokens,
                    response.usage.completion_tokens
                )
        return response.choices[0].message.content
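Outside of a live pipeline, the tracer can be exercised directly to confirm that nested spans share a trace_id and record parent/child relationships. A minimal sketch with stubbed attribute values and no exporter:

tracer = Tracer("demo-service")
llm_tracer = LLMTracer(tracer)

with llm_tracer.trace_chain("demo_chain"):
    with llm_tracer.trace_retrieval("vector_search"):
        tracer.set_attribute("retrieval.doc_count", 3)  # stubbed result count
    with llm_tracer.trace_llm_call("gpt-4o-mini"):
        llm_tracer.record_tokens(prompt_tokens=250, completion_tokens=80)  # stubbed usage

# Spans are appended as they finish: retrieval, llm call, then the enclosing chain.
for span in tracer.spans:
    print(span.name, span.parent_span_id is not None, f"{span.duration_ms:.2f}ms")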
Quality Monitoring
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
import re

@dataclass
class QualityMetrics:
    """Quality metrics for an LLM response."""
    request_id: str
    timestamp: datetime
    # Response quality
    response_length: int
    has_refusal: bool
    has_hallucination_markers: bool
    sentiment_score: float
    # Factual consistency (if ground truth available)
    factual_accuracy: Optional[float] = None
    # User feedback
    user_rating: Optional[int] = None
    user_feedback: Optional[str] = None

class QualityMonitor:
    """Monitor LLM response quality."""

    def __init__(self, client: Any = None):
        self.client = client
        self.refusal_patterns = [
            r"I cannot",
            r"I'm unable to",
            r"I don't have access",
            r"I apologize, but",
            r"As an AI",
            r"I'm not able to"
        ]
        self.hallucination_markers = [
            r"I think",
            r"I believe",
            r"probably",
            r"might be",
            r"I'm not sure",
            r"as far as I know"
        ]

    def analyze_response(
        self,
        request_id: str,
        response: str,
        query: str = None
    ) -> QualityMetrics:
        """Analyze response quality."""
        has_refusal = any(
            re.search(pattern, response, re.IGNORECASE)
            for pattern in self.refusal_patterns
        )
        has_hallucination_markers = any(
            re.search(pattern, response, re.IGNORECASE)
            for pattern in self.hallucination_markers
        )
        # Simple sentiment signal: (positive words - negative words), normalized by length
        positive_words = len(re.findall(r'\b(good|great|excellent|helpful|correct)\b', response, re.IGNORECASE))
        negative_words = len(re.findall(r'\b(bad|wrong|error|incorrect|sorry)\b', response, re.IGNORECASE))
        sentiment = (positive_words - negative_words) / max(len(response.split()), 1)
        return QualityMetrics(
            request_id=request_id,
            timestamp=datetime.utcnow(),
            response_length=len(response),
            has_refusal=has_refusal,
            has_hallucination_markers=has_hallucination_markers,
            sentiment_score=sentiment
        )

    async def evaluate_factual_accuracy(
        self,
        response: str,
        ground_truth: str
    ) -> float:
        """Evaluate factual accuracy using an LLM judge."""
        if not self.client:
            return 0.0
        prompt = f"""Compare the response to the ground truth and rate factual accuracy from 0 to 1.
Only respond with a number.
Response: {response[:1000]}
Ground Truth: {ground_truth[:1000]}
Accuracy (0-1):"""
        result = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=5
        )
        try:
            return float(result.choices[0].message.content.strip())
        except ValueError:
            return 0.5

class DriftDetector:
    """Detect quality drift over time."""

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.baseline_metrics: list[QualityMetrics] = []
        self.current_metrics: list[QualityMetrics] = []

    def set_baseline(self, metrics: list[QualityMetrics]):
        """Set baseline metrics for comparison."""
        self.baseline_metrics = metrics[-self.window_size:]

    def add_metric(self, metric: QualityMetrics):
        """Add new metric and check for drift."""
        self.current_metrics.append(metric)
        if len(self.current_metrics) > self.window_size:
            self.current_metrics.pop(0)

    def check_drift(self) -> dict:
        """Check for quality drift."""
        if len(self.baseline_metrics) < 10 or len(self.current_metrics) < 10:
            return {"drift_detected": False, "reason": "insufficient_data"}
        # Compare refusal rates
        baseline_refusal_rate = sum(1 for m in self.baseline_metrics if m.has_refusal) / len(self.baseline_metrics)
        current_refusal_rate = sum(1 for m in self.current_metrics if m.has_refusal) / len(self.current_metrics)
        # Compare response lengths
        baseline_avg_length = sum(m.response_length for m in self.baseline_metrics) / len(self.baseline_metrics)
        current_avg_length = sum(m.response_length for m in self.current_metrics) / len(self.current_metrics)
        # Compare hallucination markers
        baseline_hallucination_rate = sum(1 for m in self.baseline_metrics if m.has_hallucination_markers) / len(self.baseline_metrics)
        current_hallucination_rate = sum(1 for m in self.current_metrics if m.has_hallucination_markers) / len(self.current_metrics)
        drift_detected = False
        reasons = []
        if abs(current_refusal_rate - baseline_refusal_rate) > 0.1:
            drift_detected = True
            reasons.append(f"refusal_rate_change: {baseline_refusal_rate:.2f} -> {current_refusal_rate:.2f}")
        # Guard against division by zero when the baseline contains only empty responses
        if baseline_avg_length > 0 and abs(current_avg_length - baseline_avg_length) / baseline_avg_length > 0.3:
            drift_detected = True
            reasons.append(f"length_change: {baseline_avg_length:.0f} -> {current_avg_length:.0f}")
        if abs(current_hallucination_rate - baseline_hallucination_rate) > 0.15:
            drift_detected = True
            reasons.append(f"hallucination_rate_change: {baseline_hallucination_rate:.2f} -> {current_hallucination_rate:.2f}")
        return {
            "drift_detected": drift_detected,
            "reasons": reasons,
            "metrics": {
                "baseline_refusal_rate": baseline_refusal_rate,
                "current_refusal_rate": current_refusal_rate,
                "baseline_avg_length": baseline_avg_length,
                "current_avg_length": current_avg_length
            }
        }
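The drift detector needs a baseline before it can flag anything. A minimal offline sketch with toy strings (the baseline responses are confident, the new traffic hedges and refuses, so check_drift should report drift):

monitor = QualityMonitor()
detector = DriftDetector(window_size=50)

# Hypothetical historical responses used as the baseline.
baseline = ["The invoice total is $42.", "Your order ships on Monday."] * 10
detector.set_baseline([
    monitor.analyze_response(f"baseline-{i}", text)
    for i, text in enumerate(baseline)
])

# Hypothetical new traffic with more hedging and refusals.
for i, text in enumerate(["I'm not sure, but it might be $42.", "I cannot help with that."] * 10):
    detector.add_metric(monitor.analyze_response(f"live-{i}", text))

print(detector.check_drift())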
Alerting System
from dataclasses import dataclass
from typing import Any, Callable, Optional
from datetime import datetime, timedelta
from enum import Enum

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    """An alert triggered by monitoring."""
    id: str
    name: str
    severity: AlertSeverity
    message: str
    timestamp: datetime
    metadata: Optional[dict] = None

@dataclass
class AlertRule:
    """A rule for triggering alerts."""
    name: str
    condition: Callable[[dict], bool]
    severity: AlertSeverity
    message_template: str
    cooldown_minutes: int = 5

class AlertManager:
    """Manage alerts for LLM applications."""

    def __init__(self):
        self.rules: list[AlertRule] = []
        self.alerts: list[Alert] = []
        self.last_alert_times: dict[str, datetime] = {}
        self.notifiers: list[Callable[[Alert], None]] = []

    def add_rule(self, rule: AlertRule):
        """Add an alert rule."""
        self.rules.append(rule)

    def add_notifier(self, notifier: Callable[[Alert], None]):
        """Add a notification handler."""
        self.notifiers.append(notifier)

    def check_rules(self, metrics: dict):
        """Check all rules against current metrics."""
        import uuid
        for rule in self.rules:
            # Check cooldown
            last_alert = self.last_alert_times.get(rule.name)
            if last_alert:
                cooldown_end = last_alert + timedelta(minutes=rule.cooldown_minutes)
                if datetime.utcnow() < cooldown_end:
                    continue
            # Check condition
            if rule.condition(metrics):
                alert = Alert(
                    id=str(uuid.uuid4()),
                    name=rule.name,
                    severity=rule.severity,
                    message=rule.message_template.format(**metrics),
                    timestamp=datetime.utcnow(),
                    metadata=metrics
                )
                self.alerts.append(alert)
                self.last_alert_times[rule.name] = datetime.utcnow()
                # Notify
                for notifier in self.notifiers:
                    try:
                        notifier(alert)
                    except Exception as e:
                        print(f"Notifier error: {e}")

    def get_active_alerts(self, hours: int = 24) -> list[Alert]:
        """Get recent alerts."""
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        return [a for a in self.alerts if a.timestamp > cutoff]

def create_default_alert_rules() -> list[AlertRule]:
    """Create default alert rules for LLM monitoring."""
    return [
        AlertRule(
            name="high_latency",
            condition=lambda m: m.get("latency_p95_ms", 0) > 5000,
            severity=AlertSeverity.WARNING,
            message_template="High latency detected: P95 = {latency_p95_ms:.0f}ms"
        ),
        AlertRule(
            name="critical_latency",
            condition=lambda m: m.get("latency_p99_ms", 0) > 10000,
            severity=AlertSeverity.CRITICAL,
            message_template="Critical latency: P99 = {latency_p99_ms:.0f}ms"
        ),
        AlertRule(
            name="high_error_rate",
            condition=lambda m: m.get("error_rate", 0) > 0.05,
            severity=AlertSeverity.WARNING,
            message_template="High error rate: {error_rate:.1%}"
        ),
        AlertRule(
            name="critical_error_rate",
            condition=lambda m: m.get("error_rate", 0) > 0.1,
            severity=AlertSeverity.CRITICAL,
            message_template="Critical error rate: {error_rate:.1%}"
        ),
        AlertRule(
            name="cost_spike",
            condition=lambda m: m.get("total_cost_usd", 0) > 100,
            severity=AlertSeverity.WARNING,
            message_template="Cost spike: ${total_cost_usd:.2f} in monitoring window"
        ),
        AlertRule(
            name="token_spike",
            condition=lambda m: m.get("avg_tokens", 0) > 10000,
            severity=AlertSeverity.WARNING,
            message_template="High token usage: avg {avg_tokens:.0f} tokens per request"
        )
    ]

class SlackNotifier:
    """Send alerts to Slack."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def __call__(self, alert: Alert):
        """Send alert to Slack."""
        import requests
        color = {
            AlertSeverity.INFO: "#36a64f",
            AlertSeverity.WARNING: "#ff9800",
            AlertSeverity.CRITICAL: "#f44336"
        }.get(alert.severity, "#808080")
        payload = {
            "attachments": [{
                "color": color,
                "title": f"[{alert.severity.value.upper()}] {alert.name}",
                "text": alert.message,
                "ts": int(alert.timestamp.timestamp())
            }]
        }
        requests.post(self.webhook_url, json=payload)

class PagerDutyNotifier:
    """Send critical alerts to PagerDuty."""

    def __init__(self, routing_key: str):
        self.routing_key = routing_key

    def __call__(self, alert: Alert):
        """Send alert to PagerDuty."""
        if alert.severity != AlertSeverity.CRITICAL:
            return
        import requests
        payload = {
            "routing_key": self.routing_key,
            "event_action": "trigger",
            "payload": {
                "summary": alert.message,
                "severity": "critical",
                "source": "llm-monitoring",
                "custom_details": alert.metadata
            }
        }
        requests.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload
        )
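Wiring the manager up is mostly registration. A minimal sketch with a console notifier and a hand-built summary dict shaped like MetricsCollector.get_summary() output; in production the summary would come from the collector on a schedule, and the notifier would be SlackNotifier or PagerDutyNotifier with real credentials:

manager = AlertManager()
for rule in create_default_alert_rules():
    manager.add_rule(rule)

# Console notifier for local testing; swap in SlackNotifier("https://hooks.slack.com/...")
# or PagerDutyNotifier("<routing-key>") for real delivery.
manager.add_notifier(lambda alert: print(f"[{alert.severity.value}] {alert.message}"))

manager.check_rules({
    "count": 120,
    "latency_p95_ms": 6200.0,   # trips the high_latency rule
    "latency_p99_ms": 8900.0,
    "error_rate": 0.02,
    "total_cost_usd": 14.75,
    "avg_tokens": 1800.0,
})
# -> [warning] High latency detected: P95 = 6200ms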
Production Monitoring Service
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

app = FastAPI()

# Initialize components
metrics_collector = MetricsCollector()
tracer = Tracer("llm-service")
llm_tracer = LLMTracer(tracer)
quality_monitor = QualityMonitor()
drift_detector = DriftDetector()
alert_manager = AlertManager()

# Add default alert rules
for rule in create_default_alert_rules():
    alert_manager.add_rule(rule)

class MetricsRequest(BaseModel):
    request_id: str
    model: str
    latency_ms: float
    prompt_tokens: int
    completion_tokens: int
    finish_reason: str
    error: Optional[str] = None
    user_id: Optional[str] = None

class QualityRequest(BaseModel):
    request_id: str
    response: str
    query: Optional[str] = None

class FeedbackRequest(BaseModel):
    request_id: str
    rating: int
    feedback: Optional[str] = None

@app.post("/v1/metrics")
async def record_metrics(request: MetricsRequest, background_tasks: BackgroundTasks):
    """Record LLM metrics."""
    metrics = LLMMetrics(
        request_id=request.request_id,
        model=request.model,
        timestamp=datetime.utcnow(),
        latency_ms=request.latency_ms,
        prompt_tokens=request.prompt_tokens,
        completion_tokens=request.completion_tokens,
        total_tokens=request.prompt_tokens + request.completion_tokens,
        finish_reason=request.finish_reason,
        error=request.error,
        user_id=request.user_id
    )
    metrics_collector.record(metrics)
    # Check alerts in background
    background_tasks.add_task(check_alerts)
    return {"status": "recorded", "request_id": request.request_id}

@app.post("/v1/quality")
async def analyze_quality(request: QualityRequest):
    """Analyze response quality."""
    quality_metrics = quality_monitor.analyze_response(
        request.request_id,
        request.response,
        request.query
    )
    drift_detector.add_metric(quality_metrics)
    return {
        "request_id": request.request_id,
        "response_length": quality_metrics.response_length,
        "has_refusal": quality_metrics.has_refusal,
        "has_hallucination_markers": quality_metrics.has_hallucination_markers,
        "sentiment_score": quality_metrics.sentiment_score
    }

@app.post("/v1/feedback")
async def record_feedback(request: FeedbackRequest):
    """Record user feedback."""
    # Find and update quality metrics
    for metric in drift_detector.current_metrics:
        if metric.request_id == request.request_id:
            metric.user_rating = request.rating
            metric.user_feedback = request.feedback
            break
    return {"status": "recorded", "request_id": request.request_id}

@app.get("/v1/summary")
async def get_summary(window_minutes: int = 60):
    """Get metrics summary."""
    return metrics_collector.get_summary(window_minutes)

@app.get("/v1/drift")
async def check_drift():
    """Check for quality drift."""
    return drift_detector.check_drift()

@app.get("/v1/alerts")
async def get_alerts(hours: int = 24):
    """Get recent alerts."""
    alerts = alert_manager.get_active_alerts(hours)
    return {
        "count": len(alerts),
        "alerts": [
            {
                "id": a.id,
                "name": a.name,
                "severity": a.severity.value,
                "message": a.message,
                "timestamp": a.timestamp.isoformat()
            }
            for a in alerts
        ]
    }

@app.get("/v1/traces/{trace_id}")
async def get_trace(trace_id: str):
    """Get trace by ID."""
    spans = [s for s in tracer.spans if s.trace_id == trace_id]
    if not spans:
        return {"error": "trace_not_found"}
    return {
        "trace_id": trace_id,
        "spans": [
            {
                "span_id": s.span_id,
                "parent_span_id": s.parent_span_id,
                "name": s.name,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "attributes": s.attributes
            }
            for s in spans
        ]
    }

async def check_alerts():
    """Background task to check alerts."""
    summary = metrics_collector.get_summary(window_minutes=5)
    alert_manager.check_rules(summary)

@app.get("/health")
async def health():
    return {"status": "healthy"}
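Client applications (or a log processor) report into the service over plain HTTP. A minimal sketch using the requests library, assuming the service is running locally on port 8000; the request id and token counts are illustrative:

import requests

requests.post(
    "http://localhost:8000/v1/metrics",
    json={
        "request_id": "req-123",     # hypothetical request id
        "model": "gpt-4o-mini",
        "latency_ms": 840.0,
        "prompt_tokens": 512,
        "completion_tokens": 128,
        "finish_reason": "stop",
        "user_id": "user-42",
    },
    timeout=5,
)

summary = requests.get("http://localhost:8000/v1/summary", params={"window_minutes": 60}, timeout=5)
print(summary.json())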
References
- OpenTelemetry Python: https://opentelemetry.io/docs/instrumentation/python/
- LangSmith Tracing: https://docs.smith.langchain.com/
- Weights & Biases Prompts: https://docs.wandb.ai/guides/prompts
- Prometheus Python Client: https://prometheus.io/docs/instrumenting/clientlibs/
Conclusion
Comprehensive monitoring is essential for production LLM applications. Track the metrics that matter: latency (including time-to-first-token for streaming), token usage, costs, and error rates. Implement distributed tracing to understand how requests flow through chains, retrievers, and tool calls. Monitor quality through automated analysis of refusals, hallucination markers, and response characteristics. Set up drift detection to catch gradual degradation before it impacts users. Build alerting systems with appropriate thresholds and cooldowns to avoid alert fatigue. The key insight is that LLM monitoring requires domain-specific metrics beyond traditional APM—you need to track tokens, costs, and quality alongside latency and errors. Start with basic metrics collection, add tracing for debugging, then layer in quality monitoring and alerting as your system matures.