Introduction
LLM applications are notoriously difficult to debug. Unlike traditional software, where errors are obvious, LLM issues manifest as subtle quality degradation, unexpected costs, or slow responses. Proper observability is essential for production LLM systems. This guide covers monitoring strategies: tracking latency, tokens, and costs; implementing distributed tracing for complex chains; structured logging for debugging; quality metrics and evaluation; and alerting on anomalies. These patterns help you understand what’s happening inside your LLM application and catch problems before users notice.

Basic Metrics Collection
import time
from dataclasses import dataclass, field
from typing import Optional
from collections import defaultdict
import statistics


@dataclass
class LLMMetrics:
    """Metrics for a single LLM call."""
    model: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    latency_ms: float
    cost_usd: float
    success: bool
    error: Optional[str] = None
    timestamp: float = field(default_factory=time.time)


class MetricsCollector:
    """Collect and aggregate LLM metrics."""

    # Cost per 1K tokens (approximate; check current provider pricing)
    COSTS = {
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
    }

    def __init__(self):
        self.metrics: list[LLMMetrics] = []
        self.by_model: dict[str, list[LLMMetrics]] = defaultdict(list)

    def calculate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> float:
        """Calculate cost for a request."""
        costs = self.COSTS.get(model, {"input": 0.01, "output": 0.03})
        return (
            (prompt_tokens / 1000) * costs["input"] +
            (completion_tokens / 1000) * costs["output"]
        )

    def record(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        latency_ms: float,
        success: bool = True,
        error: Optional[str] = None
    ):
        """Record metrics for an LLM call."""
        cost = self.calculate_cost(model, prompt_tokens, completion_tokens)
        metric = LLMMetrics(
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            latency_ms=latency_ms,
            cost_usd=cost,
            success=success,
            error=error
        )
        self.metrics.append(metric)
        self.by_model[model].append(metric)

    def get_summary(self, window_seconds: int = 3600) -> dict:
        """Get metrics summary for a time window."""
        cutoff = time.time() - window_seconds
        recent = [m for m in self.metrics if m.timestamp > cutoff]
        if not recent:
            return {"total_requests": 0}
        latencies = [m.latency_ms for m in recent]
        return {
            "total_requests": len(recent),
            "success_rate": sum(1 for m in recent if m.success) / len(recent),
            "total_tokens": sum(m.total_tokens for m in recent),
            "total_cost_usd": sum(m.cost_usd for m in recent),
            "avg_latency_ms": statistics.mean(latencies),
            "p50_latency_ms": statistics.median(latencies),
            # Percentiles need enough samples to be meaningful; fall back to max otherwise
            "p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)] if len(latencies) > 20 else max(latencies),
            "p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if len(latencies) > 100 else max(latencies),
            # Count only requests within the same window as the rest of the summary
            "requests_by_model": {
                model: sum(1 for m in model_metrics if m.timestamp > cutoff)
                for model, model_metrics in self.by_model.items()
            }
        }


# Global collector
metrics = MetricsCollector()

# Usage with OpenAI
from openai import OpenAI

client = OpenAI()

def monitored_completion(prompt: str, model: str = "gpt-4o") -> str:
    """Completion with automatic metrics collection."""
    start_time = time.time()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        result = response.choices[0].message.content
        metrics.record(
            model=model,
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            latency_ms=(time.time() - start_time) * 1000,
            success=True
        )
        return result
    except Exception as e:
        metrics.record(
            model=model,
            prompt_tokens=0,
            completion_tokens=0,
            latency_ms=(time.time() - start_time) * 1000,
            success=False,
            error=str(e)
        )
        raise

# Get summary
summary = metrics.get_summary(window_seconds=3600)
print(f"Requests: {summary['total_requests']}")
print(f"Success rate: {summary.get('success_rate', 0):.1%}")
print(f"Total cost: ${summary.get('total_cost_usd', 0):.4f}")
Distributed Tracing
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional, Any
from collections import defaultdict


@dataclass
class Span:
    """A span in a distributed trace."""
    trace_id: str
    span_id: str
    parent_id: Optional[str]
    name: str
    start_time: float
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)
    events: list = field(default_factory=list)
    status: str = "OK"

    @property
    def duration_ms(self) -> Optional[float]:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return None


class Tracer:
    """Simple distributed tracer for LLM applications."""

    def __init__(self):
        self.traces: dict[str, list[Span]] = defaultdict(list)
        self._current_span: Optional[Span] = None
        self._current_trace_id: Optional[str] = None

    @contextmanager
    def start_trace(self, name: str):
        """Start a new trace."""
        trace_id = str(uuid.uuid4())
        self._current_trace_id = trace_id
        try:
            with self.start_span(name) as span:
                yield span
        finally:
            # Reset even if the traced block raised
            self._current_trace_id = None

    @contextmanager
    def start_span(self, name: str, attributes: dict = None):
        """Start a new span within the current trace."""
        trace_id = self._current_trace_id or str(uuid.uuid4())
        parent_id = self._current_span.span_id if self._current_span else None
        span = Span(
            trace_id=trace_id,
            span_id=str(uuid.uuid4()),
            parent_id=parent_id,
            name=name,
            start_time=time.time(),
            attributes=attributes or {}
        )
        previous_span = self._current_span
        self._current_span = span
        try:
            yield span
            span.status = "OK"
        except Exception as e:
            span.status = "ERROR"
            span.attributes["error"] = str(e)
            raise
        finally:
            span.end_time = time.time()
            self.traces[trace_id].append(span)
            self._current_span = previous_span

    def add_event(self, name: str, attributes: dict = None):
        """Add an event to the current span."""
        if self._current_span:
            self._current_span.events.append({
                "name": name,
                "timestamp": time.time(),
                "attributes": attributes or {}
            })

    def set_attribute(self, key: str, value: Any):
        """Set an attribute on the current span."""
        if self._current_span:
            self._current_span.attributes[key] = value

    def get_trace(self, trace_id: str) -> list[Span]:
        """Get all spans for a trace."""
        return self.traces.get(trace_id, [])

    def print_trace(self, trace_id: str):
        """Print a trace in readable format."""
        spans = self.get_trace(trace_id)
        for span in sorted(spans, key=lambda s: s.start_time):
            indent = "  " if span.parent_id else ""
            print(f"{indent}{span.name}: {span.duration_ms:.1f}ms [{span.status}]")
            for key, value in span.attributes.items():
                print(f"{indent}  {key}: {value}")


# Global tracer
tracer = Tracer()

# Usage in a RAG pipeline
def traced_rag_query(query: str) -> str:
    """RAG query with full tracing."""
    with tracer.start_trace("rag_query"):
        tracer.set_attribute("query", query)

        # Embedding generation
        with tracer.start_span("generate_embedding"):
            response = client.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )
            embedding = response.data[0].embedding
            tracer.set_attribute("embedding_dim", len(embedding))

        # Vector search
        with tracer.start_span("vector_search"):
            # Simulated search
            results = [{"text": "Result 1"}, {"text": "Result 2"}]
            tracer.set_attribute("num_results", len(results))

        # LLM generation
        with tracer.start_span("llm_generation"):
            context = "\n".join(r["text"] for r in results)
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": f"Context: {context}"},
                    {"role": "user", "content": query}
                ]
            )
            tracer.set_attribute("prompt_tokens", response.usage.prompt_tokens)
            tracer.set_attribute("completion_tokens", response.usage.completion_tokens)
            return response.choices[0].message.content

# Run a traced query
result = traced_rag_query("What is machine learning?")

# Print the most recent trace. The tracer clears _current_trace_id when the trace
# ends, so look the id up from the stored traces instead.
last_trace_id = list(tracer.traces)[-1]
tracer.print_trace(last_trace_id)
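The hand-rolled Tracer above is useful for understanding the model, but in production you would more likely reach for OpenTelemetry (listed in the references below), which gives you the same trace/span structure plus exporters to real backends. A rough sketch of the equivalent instrumentation, assuming the opentelemetry-sdk package is installed and with the RAG steps elided:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

# Console exporter for local experimentation; swap in an OTLP exporter for a real backend.
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
otel_tracer = trace.get_tracer("rag-api")

def otel_rag_query(query: str) -> str:
    with otel_tracer.start_as_current_span("rag_query") as root:
        root.set_attribute("query", query)
        with otel_tracer.start_as_current_span("generate_embedding"):
            pass  # embedding call as in traced_rag_query
        with otel_tracer.start_as_current_span("vector_search"):
            pass  # retrieval as in traced_rag_query
        with otel_tracer.start_as_current_span("llm_generation") as span:
            span.set_attribute("model", "gpt-4o")
            return "..."  # chat completion as in traced_rag_query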
Structured Logging
import json
import logging
from datetime import datetime, timezone


class StructuredLogger:
    """JSON structured logger for LLM applications."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # Emit the JSON payload as-is; guard against adding duplicate handlers
        # if a logger with the same name is constructed more than once.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)
        self.default_fields = {}

    def set_default_field(self, key: str, value: Any):
        """Set a field that appears in all log entries."""
        self.default_fields[key] = value

    def _log(self, level: str, message: str, **kwargs):
        """Internal log method."""
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": level,
            "message": message,
            **self.default_fields,
            **kwargs
        }
        log_func = getattr(self.logger, level.lower())
        log_func(json.dumps(entry))

    def info(self, message: str, **kwargs):
        self._log("INFO", message, **kwargs)

    def warning(self, message: str, **kwargs):
        self._log("WARNING", message, **kwargs)

    def error(self, message: str, **kwargs):
        self._log("ERROR", message, **kwargs)

    def llm_request(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        latency_ms: float,
        success: bool,
        **kwargs
    ):
        """Log an LLM request with standard fields."""
        self._log(
            "INFO" if success else "ERROR",
            "llm_request",
            event_type="llm_request",
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            latency_ms=latency_ms,
            success=success,
            **kwargs
        )


# Usage
logger = StructuredLogger("llm_app")
logger.set_default_field("service", "rag-api")
logger.set_default_field("environment", "production")

def logged_completion(prompt: str, model: str = "gpt-4o") -> str:
    """Completion with structured logging."""
    request_id = str(uuid.uuid4())
    start_time = time.time()
    logger.info(
        "llm_request_start",
        request_id=request_id,
        model=model,
        prompt_length=len(prompt)
    )
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        latency_ms = (time.time() - start_time) * 1000
        logger.llm_request(
            model=model,
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            latency_ms=latency_ms,
            success=True,
            request_id=request_id
        )
        return response.choices[0].message.content
    except Exception as e:
        latency_ms = (time.time() - start_time) * 1000
        logger.llm_request(
            model=model,
            prompt_tokens=0,
            completion_tokens=0,
            latency_ms=latency_ms,
            success=False,
            request_id=request_id,
            error=str(e),
            error_type=type(e).__name__
        )
        raise
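To make the metrics-traces-logs story hang together, it also helps to stamp the active trace and span ids onto log entries so a slow trace can be joined against its log lines. A small sketch against the simple Tracer defined earlier; the helper name log_with_trace is illustrative and not part of the logger above.

def log_with_trace(message: str, **kwargs):
    """Sketch: attach the current trace/span ids from the simple Tracer to a log entry."""
    span = tracer._current_span  # the Tracer defined in the previous section
    if span is not None:
        kwargs.setdefault("trace_id", span.trace_id)
        kwargs.setdefault("span_id", span.span_id)
    logger.info(message, **kwargs)

# Inside a traced block, log lines now carry the ids needed to join them to the trace:
with tracer.start_trace("rag_query"):
    log_with_trace("retrieval_started", query_length=42)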
Quality Monitoring
from enum import Enum

class QualityDimension(str, Enum):
    RELEVANCE = "relevance"
    COHERENCE = "coherence"
    FACTUALITY = "factuality"
    HELPFULNESS = "helpfulness"


class QualityMonitor:
    """Monitor LLM output quality."""

    def __init__(self, sample_rate: float = 0.1):
        self.sample_rate = sample_rate
        self.evaluations: list[dict] = []

    def should_evaluate(self) -> bool:
        """Determine if this request should be evaluated."""
        import random
        return random.random() < self.sample_rate

    def evaluate_response(
        self,
        query: str,
        response: str,
        context: str = None
    ) -> dict:
        """Evaluate response quality using LLM."""
        eval_prompt = f"""Evaluate this AI response on a scale of 1-5 for each dimension.
Query: {query}
{"Context: " + context if context else ""}
Response: {response}
Rate each dimension (1=poor, 5=excellent):
- relevance: How relevant is the response to the query?
- coherence: How well-structured and clear is the response?
- factuality: How accurate and factual is the information?
- helpfulness: How helpful is the response for the user?
Return JSON: {{"relevance": 1-5, "coherence": 1-5, "factuality": 1-5, "helpfulness": 1-5, "issues": ["list of issues"]}}"""
        eval_response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": eval_prompt}],
            response_format={"type": "json_object"}
        )
        scores = json.loads(eval_response.choices[0].message.content)
        evaluation = {
            "timestamp": time.time(),
            "query": query[:200],
            "response": response[:500],
            "scores": scores,
            "avg_score": sum(scores.get(d.value, 0) for d in QualityDimension) / 4
        }
        self.evaluations.append(evaluation)
        return evaluation

    def get_quality_summary(self, window_seconds: int = 86400) -> dict:
        """Get quality metrics summary."""
        cutoff = time.time() - window_seconds
        recent = [e for e in self.evaluations if e["timestamp"] > cutoff]
        if not recent:
            return {"evaluations": 0}
        avg_scores = {}
        for dim in QualityDimension:
            scores = [e["scores"].get(dim.value, 0) for e in recent]
            avg_scores[dim.value] = sum(scores) / len(scores)
        return {
            "evaluations": len(recent),
            "avg_scores": avg_scores,
            "overall_avg": sum(avg_scores.values()) / len(avg_scores),
            "low_quality_count": sum(1 for e in recent if e["avg_score"] < 3)
        }


# Usage
quality_monitor = QualityMonitor(sample_rate=0.1)

def quality_monitored_completion(query: str, context: str = None) -> str:
    """Completion with quality monitoring."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Context: {context}" if context else "You are helpful."},
            {"role": "user", "content": query}
        ]
    )
    result = response.choices[0].message.content

    # Sample-based quality evaluation
    if quality_monitor.should_evaluate():
        evaluation = quality_monitor.evaluate_response(query, result, context)
        if evaluation["avg_score"] < 3:
            logger.warning(
                "low_quality_response",
                query=query[:100],
                scores=evaluation["scores"]
            )

    return result
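One caveat with the pattern above: the sampled evaluation makes an extra gpt-4o-mini call on the request path, so even a 10% sample adds user-visible latency for those requests. A sketch of pushing the evaluation off the hot path with a background thread; the helper evaluate_async is illustrative, not part of QualityMonitor.

import threading

def evaluate_async(query: str, result: str, context: str = None):
    """Sketch: run the sampled quality evaluation in a background thread."""
    def _run():
        try:
            evaluation = quality_monitor.evaluate_response(query, result, context)
            if evaluation["avg_score"] < 3:
                logger.warning(
                    "low_quality_response",
                    query=query[:100],
                    scores=evaluation["scores"]
                )
        except Exception as e:
            # Never let evaluation failures affect the serving path
            logger.error("quality_eval_failed", error=str(e))
    threading.Thread(target=_run, daemon=True).start()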
Alerting System
from dataclasses import dataclass
from typing import Callable


@dataclass
class AlertRule:
    name: str
    condition: Callable[[dict], bool]
    severity: str  # "warning", "critical"
    message_template: str


class AlertManager:
    """Manage alerts for LLM metrics."""

    def __init__(self):
        self.rules: list[AlertRule] = []
        self.alerts: list[dict] = []
        self.alert_handlers: list[Callable] = []

    def add_rule(self, rule: AlertRule):
        """Add an alert rule."""
        self.rules.append(rule)

    def add_handler(self, handler: Callable):
        """Add an alert handler (e.g., Slack, PagerDuty)."""
        self.alert_handlers.append(handler)

    def check_metrics(self, metrics_summary: dict):
        """Check metrics against all rules."""
        for rule in self.rules:
            try:
                if rule.condition(metrics_summary):
                    alert = {
                        "timestamp": time.time(),
                        "rule": rule.name,
                        "severity": rule.severity,
                        "message": rule.message_template.format(**metrics_summary),
                        "metrics": metrics_summary
                    }
                    self.alerts.append(alert)
                    for handler in self.alert_handlers:
                        handler(alert)
            except Exception as e:
                logger.error(f"Alert rule check failed: {rule.name}", error=str(e))

    def get_recent_alerts(self, window_seconds: int = 3600) -> list[dict]:
        """Get recent alerts."""
        cutoff = time.time() - window_seconds
        return [a for a in self.alerts if a["timestamp"] > cutoff]


# Setup alerts
alert_manager = AlertManager()

# High error rate alert
alert_manager.add_rule(AlertRule(
    name="high_error_rate",
    condition=lambda m: m.get("success_rate", 1) < 0.95,
    severity="critical",
    message_template="LLM success rate is {success_rate:.1%}, below the 95% threshold"
))

# High latency alert
alert_manager.add_rule(AlertRule(
    name="high_latency",
    condition=lambda m: m.get("p95_latency_ms", 0) > 5000,
    severity="warning",
    message_template="P95 latency is {p95_latency_ms:.0f}ms, above the 5s threshold"
))

# Cost spike alert
alert_manager.add_rule(AlertRule(
    name="cost_spike",
    condition=lambda m: m.get("total_cost_usd", 0) > 100,
    severity="warning",
    message_template="Hourly cost is ${total_cost_usd:.2f}, above the $100 threshold"
))

# Low quality alert
alert_manager.add_rule(AlertRule(
    name="low_quality",
    condition=lambda m: m.get("overall_avg", 5) < 3.5,
    severity="warning",
    message_template="Average quality score is {overall_avg:.2f}, below the 3.5 threshold"
))

# Slack handler example
def slack_alert_handler(alert: dict):
    """Send alert to Slack."""
    # In production, post to an actual Slack webhook
    print(f"[ALERT] [{alert['severity'].upper()}] {alert['message']}")

alert_manager.add_handler(slack_alert_handler)

# Check metrics periodically
summary = metrics.get_summary()
alert_manager.check_metrics(summary)
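Note that the low_quality rule reads overall_avg, which lives in the quality monitor's summary rather than the metrics collector's, so the two summaries need to be merged before they are checked. A sketch of running the merged check on a schedule; the asyncio loop and the 60-second interval are assumptions, not part of the code above.

import asyncio

async def periodic_alert_check(interval_seconds: int = 60):
    """Sketch: merge metrics and quality summaries, then evaluate alert rules on a loop."""
    while True:
        combined = {
            **metrics.get_summary(window_seconds=3600),
            **quality_monitor.get_quality_summary(window_seconds=3600),
        }
        alert_manager.check_metrics(combined)
        await asyncio.sleep(interval_seconds)

# e.g. started alongside the FastAPI app below: asyncio.create_task(periodic_alert_check())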
Production Monitoring Service
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class MetricsResponse(BaseModel):
    requests: int
    success_rate: float
    avg_latency_ms: float
    p95_latency_ms: float
    total_cost_usd: float
    tokens_used: int


@app.get("/metrics", response_model=MetricsResponse)
async def get_metrics(window_hours: int = 1):
    """Get LLM metrics for dashboard."""
    summary = metrics.get_summary(window_seconds=window_hours * 3600)
    return MetricsResponse(
        requests=summary.get("total_requests", 0),
        success_rate=summary.get("success_rate", 1.0),
        avg_latency_ms=summary.get("avg_latency_ms", 0),
        p95_latency_ms=summary.get("p95_latency_ms", 0),
        total_cost_usd=summary.get("total_cost_usd", 0),
        tokens_used=summary.get("total_tokens", 0)
    )


@app.get("/quality")
async def get_quality(window_hours: int = 24):
    """Get quality metrics."""
    return quality_monitor.get_quality_summary(window_seconds=window_hours * 3600)


@app.get("/alerts")
async def get_alerts(window_hours: int = 24):
    """Get recent alerts."""
    return alert_manager.get_recent_alerts(window_seconds=window_hours * 3600)


@app.get("/health")
async def health_check():
    """Health check with basic metrics."""
    summary = metrics.get_summary(window_seconds=300)  # Last 5 minutes
    healthy = (
        summary.get("success_rate", 1) > 0.9 and
        summary.get("p95_latency_ms", 0) < 10000
    )
    return {
        "status": "healthy" if healthy else "degraded",
        "checks": {
            "error_rate": summary.get("success_rate", 1) > 0.9,
            "latency": summary.get("p95_latency_ms", 0) < 10000
        }
    }
References
- OpenTelemetry: https://opentelemetry.io/docs/
- LangSmith: https://docs.smith.langchain.com/
- Weights & Biases: https://docs.wandb.ai/guides/prompts
- Helicone: https://docs.helicone.ai/
Conclusion
Observability is essential for production LLM applications. Start with basic metrics—latency, tokens, costs, and error rates. Add distributed tracing to understand complex chains and identify bottlenecks. Use structured logging for debugging and audit trails. Implement quality monitoring with sample-based evaluation to catch degradation. Set up alerts for critical thresholds before users notice problems. The combination of metrics, traces, and logs gives you complete visibility into your LLM system's behavior, enabling you to optimize performance, control costs, and maintain quality at scale.