Introduction
LLM applications are notoriously difficult to debug and monitor. Unlike traditional software where inputs and outputs are deterministic, LLMs produce variable outputs that can fail in subtle ways. Observability—the ability to understand system behavior from external outputs—is essential for production LLM systems. This guide covers practical observability patterns: distributed tracing for complex LLM chains, metrics collection for performance and cost monitoring, structured logging for debugging and analysis, and alerting strategies for catching issues before users do. Whether you’re running simple completions or complex multi-agent systems, robust observability transforms LLM operations from guesswork into engineering.

Distributed Tracing
from dataclasses import dataclass, field
from typing import Any, Optional
from contextlib import contextmanager
import functools
import time
import uuid


@dataclass
class Span:
    """A span in a trace."""
    trace_id: str
    span_id: str
    parent_id: Optional[str]
    name: str
    start_time: float
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)
    status: str = "ok"

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0

@dataclass
class Trace:
    """A complete trace."""
    trace_id: str
    spans: list[Span] = field(default_factory=list)

    @property
    def root_span(self) -> Optional[Span]:
        for span in self.spans:
            if span.parent_id is None:
                return span
        return None

    @property
    def total_duration_ms(self) -> float:
        root = self.root_span
        return root.duration_ms if root else 0

class LLMTracer:
    """Tracer for LLM operations."""

    def __init__(self, service_name: str = "llm-service"):
        self.service_name = service_name
        self._current_trace: Optional[str] = None
        self._current_span: Optional[str] = None
        self._traces: dict[str, Trace] = {}
        self._spans: dict[str, Span] = {}

    def start_trace(self, name: str) -> str:
        """Start a new trace."""
        trace_id = str(uuid.uuid4())
        self._current_trace = trace_id
        self._current_span = None  # reset so the root span has no parent
        self._traces[trace_id] = Trace(trace_id=trace_id)
        # Create root span
        self.start_span(name)
        return trace_id

    def start_span(
        self,
        name: str,
        attributes: Optional[dict] = None
    ) -> str:
        """Start a new span."""
        span_id = str(uuid.uuid4())
        span = Span(
            trace_id=self._current_trace,
            span_id=span_id,
            parent_id=self._current_span,
            name=name,
            start_time=time.time(),
            attributes=attributes or {}
        )
        self._spans[span_id] = span
        self._traces[self._current_trace].spans.append(span)
        self._current_span = span_id
        return span_id

    def end_span(
        self,
        span_id: Optional[str] = None,
        status: str = "ok",
        attributes: Optional[dict] = None
    ):
        """End a span."""
        span_id = span_id or self._current_span
        span = self._spans.get(span_id)
        if span:
            span.end_time = time.time()
            span.status = status
            if attributes:
                span.attributes.update(attributes)
            # Move back to the parent span
            self._current_span = span.parent_id

    def add_event(
        self,
        name: str,
        attributes: Optional[dict] = None
    ):
        """Add an event to the current span."""
        span = self._spans.get(self._current_span)
        if span:
            span.events.append({
                "name": name,
                "timestamp": time.time(),
                "attributes": attributes or {}
            })

    def set_attribute(self, key: str, value: Any):
        """Set an attribute on the current span."""
        span = self._spans.get(self._current_span)
        if span:
            span.attributes[key] = value

    @contextmanager
    def span(self, name: str, attributes: Optional[dict] = None):
        """Context manager for spans."""
        span_id = self.start_span(name, attributes)
        try:
            yield span_id
        except Exception as e:
            self.end_span(span_id, status="error", attributes={"error": str(e)})
            raise
        else:
            self.end_span(span_id)

    def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Get a trace by ID."""
        return self._traces.get(trace_id)

class LLMSpanAttributes:
    """Standard attributes for LLM spans."""

    # Request attributes
    MODEL = "llm.model"
    PROVIDER = "llm.provider"
    TEMPERATURE = "llm.temperature"
    MAX_TOKENS = "llm.max_tokens"

    # Token attributes
    INPUT_TOKENS = "llm.tokens.input"
    OUTPUT_TOKENS = "llm.tokens.output"
    TOTAL_TOKENS = "llm.tokens.total"

    # Cost attributes
    INPUT_COST = "llm.cost.input"
    OUTPUT_COST = "llm.cost.output"
    TOTAL_COST = "llm.cost.total"

    # Response attributes
    FINISH_REASON = "llm.finish_reason"
    RESPONSE_ID = "llm.response_id"

    # RAG attributes
    RETRIEVAL_COUNT = "rag.retrieval.count"
    RETRIEVAL_LATENCY = "rag.retrieval.latency_ms"

    # Chain attributes
    CHAIN_TYPE = "chain.type"
    CHAIN_STEP = "chain.step"

def trace_llm_call(tracer: LLMTracer):
    """Decorator to trace async LLM calls."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with tracer.span(f"llm.{func.__name__}"):
                # Set request attributes
                if "model" in kwargs:
                    tracer.set_attribute(LLMSpanAttributes.MODEL, kwargs["model"])
                if "temperature" in kwargs:
                    tracer.set_attribute(LLMSpanAttributes.TEMPERATURE, kwargs["temperature"])
                result = await func(*args, **kwargs)
                # Set response attributes
                if hasattr(result, "usage"):
                    tracer.set_attribute(LLMSpanAttributes.INPUT_TOKENS, result.usage.prompt_tokens)
                    tracer.set_attribute(LLMSpanAttributes.OUTPUT_TOKENS, result.usage.completion_tokens)
                    tracer.set_attribute(LLMSpanAttributes.TOTAL_TOKENS, result.usage.total_tokens)
                return result
        return wrapper
    return decorator
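
A minimal usage sketch of the tracer above. The span names, attribute values, and token counts are invented for illustration; in practice they would come from your retrieval step and provider response.

# Usage sketch for LLMTracer; names and numbers are illustrative only.
tracer = LLMTracer(service_name="rag-service")
trace_id = tracer.start_trace("answer_question")

with tracer.span("rag.retrieve", {LLMSpanAttributes.RETRIEVAL_COUNT: 5}):
    tracer.add_event("cache_miss")  # events mark points in time within a span

with tracer.span("llm.generate", {LLMSpanAttributes.MODEL: "gpt-4o-mini"}):
    tracer.set_attribute(LLMSpanAttributes.INPUT_TOKENS, 812)
    tracer.set_attribute(LLMSpanAttributes.OUTPUT_TOKENS, 164)

tracer.end_span()  # close the root span
trace = tracer.get_trace(trace_id)
print(trace.total_duration_ms, [s.name for s in trace.spans])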
Metrics Collection
from dataclasses import dataclass, field
from typing import Optional
from collections import defaultdict
import threading


@dataclass
class MetricValue:
    """A single metric sample."""
    name: str
    value: float
    timestamp: float
    labels: dict = field(default_factory=dict)

class Counter:
    """A counter metric."""

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._values: dict[tuple, float] = defaultdict(float)
        self._lock = threading.Lock()

    def inc(self, value: float = 1, labels: Optional[dict] = None):
        """Increment the counter."""
        label_key = tuple(sorted((labels or {}).items()))
        with self._lock:
            self._values[label_key] += value

    def get(self, labels: Optional[dict] = None) -> float:
        """Get the counter value."""
        label_key = tuple(sorted((labels or {}).items()))
        return self._values.get(label_key, 0)

class Gauge:
    """A gauge metric."""

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._values: dict[tuple, float] = {}
        self._lock = threading.Lock()

    def set(self, value: float, labels: Optional[dict] = None):
        """Set the gauge value."""
        label_key = tuple(sorted((labels or {}).items()))
        with self._lock:
            self._values[label_key] = value

    def inc(self, value: float = 1, labels: Optional[dict] = None):
        """Increment the gauge."""
        label_key = tuple(sorted((labels or {}).items()))
        with self._lock:
            self._values[label_key] = self._values.get(label_key, 0) + value

    def dec(self, value: float = 1, labels: Optional[dict] = None):
        """Decrement the gauge."""
        self.inc(-value, labels)

    def get(self, labels: Optional[dict] = None) -> float:
        """Get the gauge value."""
        label_key = tuple(sorted((labels or {}).items()))
        return self._values.get(label_key, 0)

class Histogram:
    """A histogram metric with cumulative (Prometheus-style) buckets."""

    def __init__(
        self,
        name: str,
        description: str = "",
        buckets: Optional[list[float]] = None
    ):
        self.name = name
        self.description = description
        self.buckets = buckets or [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
        self._counts: dict[tuple, dict[float, int]] = defaultdict(lambda: defaultdict(int))
        self._sums: dict[tuple, float] = defaultdict(float)
        self._totals: dict[tuple, int] = defaultdict(int)
        self._lock = threading.Lock()

    def observe(self, value: float, labels: Optional[dict] = None):
        """Record an observation."""
        label_key = tuple(sorted((labels or {}).items()))
        with self._lock:
            self._sums[label_key] += value
            self._totals[label_key] += 1
            # Cumulative buckets: every bucket >= value is incremented
            for bucket in self.buckets:
                if value <= bucket:
                    self._counts[label_key][bucket] += 1

    def get_percentile(self, percentile: float, labels: Optional[dict] = None) -> float:
        """Get an approximate percentile (upper bound of the matching bucket).

        If labels is None, observations from all label sets are aggregated.
        """
        if labels is None:
            keys = list(self._totals.keys())
        else:
            keys = [tuple(sorted(labels.items()))]
        total = sum(self._totals.get(k, 0) for k in keys)
        if total == 0:
            return 0
        target = total * percentile
        for bucket in sorted(self.buckets):
            # Bucket counts are already cumulative, so compare them directly
            cumulative = sum(self._counts.get(k, {}).get(bucket, 0) for k in keys)
            if cumulative >= target:
                return bucket
        return self.buckets[-1]

class LLMMetrics:
    """Metrics for LLM operations."""

    def __init__(self):
        # Request metrics
        self.requests_total = Counter(
            "llm_requests_total",
            "Total LLM requests"
        )
        self.request_errors = Counter(
            "llm_request_errors_total",
            "Total LLM request errors"
        )
        # Latency metrics
        self.request_latency = Histogram(
            "llm_request_latency_seconds",
            "LLM request latency",
            buckets=[0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60]
        )
        self.time_to_first_token = Histogram(
            "llm_ttft_seconds",
            "Time to first token",
            buckets=[0.05, 0.1, 0.25, 0.5, 1, 2, 5]
        )
        # Token metrics
        self.input_tokens = Counter(
            "llm_input_tokens_total",
            "Total input tokens"
        )
        self.output_tokens = Counter(
            "llm_output_tokens_total",
            "Total output tokens"
        )
        # Cost metrics
        self.cost_total = Counter(
            "llm_cost_dollars_total",
            "Total LLM cost in dollars"
        )
        # Active requests
        self.active_requests = Gauge(
            "llm_active_requests",
            "Currently active LLM requests"
        )
        # Cache metrics
        self.cache_hits = Counter(
            "llm_cache_hits_total",
            "Cache hits"
        )
        self.cache_misses = Counter(
            "llm_cache_misses_total",
            "Cache misses"
        )

    def record_request(
        self,
        model: str,
        latency: float,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        success: bool = True
    ):
        """Record a completed request."""
        labels = {"model": model}
        self.requests_total.inc(labels=labels)
        self.request_latency.observe(latency, labels=labels)
        self.input_tokens.inc(input_tokens, labels=labels)
        self.output_tokens.inc(output_tokens, labels=labels)
        self.cost_total.inc(cost, labels=labels)
        if not success:
            self.request_errors.inc(labels=labels)

    def get_summary(self) -> dict:
        """Get a metrics summary across all label sets."""
        return {
            "total_requests": sum(self.requests_total._values.values()),
            "total_errors": sum(self.request_errors._values.values()),
            "total_input_tokens": sum(self.input_tokens._values.values()),
            "total_output_tokens": sum(self.output_tokens._values.values()),
            "total_cost": sum(self.cost_total._values.values()),
            "p50_latency": self.request_latency.get_percentile(0.5),
            "p95_latency": self.request_latency.get_percentile(0.95),
            "p99_latency": self.request_latency.get_percentile(0.99),
        }
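
A short usage sketch of the metrics classes above; the model name, latency, token counts, and cost are made-up values for illustration.

# Usage sketch for LLMMetrics; numbers are invented for illustration.
metrics = LLMMetrics()

metrics.active_requests.inc()
metrics.record_request(
    model="gpt-4o-mini",
    latency=1.8,        # seconds
    input_tokens=950,
    output_tokens=210,
    cost=0.0041,        # dollars
    success=True,
)
metrics.active_requests.dec()

print(metrics.get_summary())
# e.g. {'total_requests': 1.0, ..., 'p95_latency': 2.5, ...}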
Structured Logging
from dataclasses import dataclass, asdict
from typing import Optional
from datetime import datetime, timezone
import json
import logging
from enum import Enum


class LogLevel(Enum):
    """Log levels."""
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"


@dataclass
class LLMLogEntry:
    """Structured log entry for LLM operations."""
    timestamp: str
    level: str
    message: str
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    # Request context
    model: Optional[str] = None
    provider: Optional[str] = None
    # Token info
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    # Timing
    latency_ms: Optional[float] = None
    # Cost
    cost_usd: Optional[float] = None
    # Error info
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    # Custom attributes
    attributes: Optional[dict] = None

    def to_json(self) -> str:
        """Convert to a JSON string, dropping empty fields."""
        data = {k: v for k, v in asdict(self).items() if v is not None}
        return json.dumps(data)

class LLMLogger:
    """Structured logger for LLM operations."""

    def __init__(
        self,
        name: str = "llm",
        level: LogLevel = LogLevel.INFO
    ):
        self.name = name
        self.level = level
        self._logger = logging.getLogger(name)
        self._context: dict = {}

    def set_context(self, **kwargs):
        """Set context included in all subsequent logs."""
        self._context.update(kwargs)

    def clear_context(self):
        """Clear the logging context."""
        self._context = {}

    def _create_entry(
        self,
        level: LogLevel,
        message: str,
        **kwargs
    ) -> LLMLogEntry:
        """Create a log entry from the shared context plus per-call fields."""
        return LLMLogEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            level=level.value,
            message=message,
            **{**self._context, **kwargs}
        )

    def debug(self, message: str, **kwargs):
        """Log a debug message."""
        entry = self._create_entry(LogLevel.DEBUG, message, **kwargs)
        self._logger.debug(entry.to_json())

    def info(self, message: str, **kwargs):
        """Log an info message."""
        entry = self._create_entry(LogLevel.INFO, message, **kwargs)
        self._logger.info(entry.to_json())

    def warning(self, message: str, **kwargs):
        """Log a warning message."""
        entry = self._create_entry(LogLevel.WARNING, message, **kwargs)
        self._logger.warning(entry.to_json())

    def error(self, message: str, **kwargs):
        """Log an error message."""
        entry = self._create_entry(LogLevel.ERROR, message, **kwargs)
        self._logger.error(entry.to_json())

    def log_request(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        cost_usd: Optional[float] = None,
        **kwargs
    ):
        """Log a completed LLM request."""
        self.info(
            "LLM request completed",
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=latency_ms,
            cost_usd=cost_usd,
            **kwargs
        )

    def log_error(
        self,
        error: Exception,
        model: Optional[str] = None,
        **kwargs
    ):
        """Log an LLM error."""
        self.error(
            f"LLM request failed: {error}",
            model=model,
            error_type=type(error).__name__,
            error_message=str(error),
            **kwargs
        )

class PromptLogger:
    """Logger for prompts and responses."""

    def __init__(
        self,
        logger: LLMLogger,
        log_prompts: bool = True,
        log_responses: bool = True,
        max_length: int = 1000
    ):
        self.logger = logger
        self.log_prompts = log_prompts
        self.log_responses = log_responses
        self.max_length = max_length

    def log_prompt(
        self,
        prompt: str,
        model: str,
        trace_id: Optional[str] = None
    ):
        """Log a prompt."""
        if not self.log_prompts:
            return
        truncated = prompt[:self.max_length]
        if len(prompt) > self.max_length:
            truncated += f"... ({len(prompt) - self.max_length} chars truncated)"
        self.logger.debug(
            "LLM prompt",
            model=model,
            trace_id=trace_id,
            attributes={"prompt": truncated}
        )

    def log_response(
        self,
        response: str,
        model: str,
        trace_id: Optional[str] = None
    ):
        """Log a response."""
        if not self.log_responses:
            return
        truncated = response[:self.max_length]
        if len(response) > self.max_length:
            truncated += f"... ({len(response) - self.max_length} chars truncated)"
        self.logger.debug(
            "LLM response",
            model=model,
            trace_id=trace_id,
            attributes={"response": truncated}
        )
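
A usage sketch of the structured loggers above, assuming the standard logging module is configured to emit DEBUG records; the provider name, prompt text, and request numbers are placeholders.

# Usage sketch; handler setup and values are illustrative.
logging.basicConfig(level=logging.DEBUG, format="%(message)s")

llm_logger = LLMLogger(name="llm")
llm_logger.set_context(provider="openai")  # attached to every subsequent entry

prompt_logger = PromptLogger(llm_logger, max_length=500)
prompt_logger.log_prompt("Summarize the quarterly report...", model="gpt-4o-mini")

llm_logger.log_request(
    model="gpt-4o-mini",
    input_tokens=950,
    output_tokens=210,
    latency_ms=1840.0,
    cost_usd=0.0041,
)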
Alerting and Monitoring
from dataclasses import dataclass
from typing import Any, Optional, Callable
from enum import Enum


class AlertSeverity(Enum):
    """Alert severity levels."""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class Alert:
    """An alert."""
    name: str
    severity: AlertSeverity
    message: str
    value: float
    threshold: float
    labels: Optional[dict] = None


@dataclass
class AlertRule:
    """An alerting rule."""
    name: str
    description: str
    condition: Callable[[Any], bool]
    severity: AlertSeverity
    message_template: str

class AlertManager:
    """Manage alerts for LLM operations."""

    def __init__(self):
        self.rules: list[AlertRule] = []
        self.handlers: list[Callable[[Alert], None]] = []
        self._active_alerts: dict[str, Alert] = {}

    def add_rule(self, rule: AlertRule):
        """Add an alerting rule."""
        self.rules.append(rule)

    def add_handler(self, handler: Callable[[Alert], None]):
        """Add an alert handler."""
        self.handlers.append(handler)

    def check(self, metrics: LLMMetrics):
        """Check all rules against current metrics."""
        for rule in self.rules:
            try:
                if rule.condition(metrics):
                    self._fire_alert(rule, metrics)
                else:
                    self._resolve_alert(rule.name)
            except Exception as e:
                print(f"Error checking rule {rule.name}: {e}")

    def _fire_alert(self, rule: AlertRule, metrics: LLMMetrics):
        """Fire an alert."""
        if rule.name in self._active_alerts:
            return  # Already active
        alert = Alert(
            name=rule.name,
            severity=rule.severity,
            message=rule.message_template,
            value=0,  # Would be populated from metrics
            threshold=0
        )
        self._active_alerts[rule.name] = alert
        for handler in self.handlers:
            handler(alert)

    def _resolve_alert(self, name: str):
        """Resolve an alert."""
        if name in self._active_alerts:
            del self._active_alerts[name]

class LLMAlertRules:
    """Standard alert rules for LLM operations."""

    @staticmethod
    def high_error_rate(threshold: float = 0.05) -> AlertRule:
        """Alert on a high error rate."""
        def condition(metrics: LLMMetrics) -> bool:
            total = sum(metrics.requests_total._values.values())
            errors = sum(metrics.request_errors._values.values())
            if total < 100:  # Minimum sample size
                return False
            return (errors / total) > threshold

        return AlertRule(
            name="high_error_rate",
            description="LLM error rate exceeds threshold",
            condition=condition,
            severity=AlertSeverity.CRITICAL,
            message_template=f"LLM error rate exceeds {threshold * 100}%"
        )

    @staticmethod
    def high_latency(threshold_ms: float = 5000) -> AlertRule:
        """Alert on high latency."""
        def condition(metrics: LLMMetrics) -> bool:
            p95 = metrics.request_latency.get_percentile(0.95)  # seconds
            return p95 * 1000 > threshold_ms

        return AlertRule(
            name="high_latency",
            description="LLM latency exceeds threshold",
            condition=condition,
            severity=AlertSeverity.WARNING,
            message_template=f"LLM p95 latency exceeds {threshold_ms}ms"
        )

    @staticmethod
    def cost_spike(threshold_per_hour: float = 100) -> AlertRule:
        """Alert on a cost spike."""
        def condition(metrics: LLMMetrics) -> bool:
            # This would need time-windowed metrics; the cumulative total
            # stands in as an approximation here
            total_cost = sum(metrics.cost_total._values.values())
            return total_cost > threshold_per_hour

        return AlertRule(
            name="cost_spike",
            description="LLM cost exceeds threshold",
            condition=condition,
            severity=AlertSeverity.WARNING,
            message_template=f"LLM cost exceeds ${threshold_per_hour}/hour"
        )

    @staticmethod
    def low_cache_hit_rate(threshold: float = 0.3) -> AlertRule:
        """Alert on a low cache hit rate."""
        def condition(metrics: LLMMetrics) -> bool:
            hits = sum(metrics.cache_hits._values.values())
            misses = sum(metrics.cache_misses._values.values())
            total = hits + misses
            if total < 100:
                return False
            return (hits / total) < threshold

        return AlertRule(
            name="low_cache_hit_rate",
            description="Cache hit rate below threshold",
            condition=condition,
            severity=AlertSeverity.INFO,
            message_template=f"Cache hit rate below {threshold * 100}%"
        )

class SlackAlertHandler:
    """Send alerts to Slack."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def handle(self, alert: Alert):
        """Send an alert to Slack.

        Synchronous so it can be registered directly as an AlertManager handler.
        """
        color = {
            AlertSeverity.INFO: "#36a64f",
            AlertSeverity.WARNING: "#ff9800",
            AlertSeverity.CRITICAL: "#f44336"
        }.get(alert.severity, "#808080")
        payload = {
            "attachments": [{
                "color": color,
                "title": f"[{alert.severity.value.upper()}] {alert.name}",
                "text": alert.message,
                "fields": [
                    {"title": "Value", "value": str(alert.value), "short": True},
                    {"title": "Threshold", "value": str(alert.threshold), "short": True}
                ]
            }]
        }
        # Would POST `payload` to self.webhook_url here
        print(f"Slack alert: {payload}")
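
A wiring sketch for the alerting pieces above, assuming an LLMMetrics instance named metrics already exists; the thresholds and the webhook URL are placeholders.

# Wiring sketch: register rules and a handler, then evaluate against metrics.
alerts = AlertManager()
alerts.add_rule(LLMAlertRules.high_error_rate(threshold=0.05))
alerts.add_rule(LLMAlertRules.high_latency(threshold_ms=5000))

slack = SlackAlertHandler(webhook_url="https://hooks.slack.com/services/...")  # placeholder URL
alerts.add_handler(slack.handle)

# Typically called on a timer or after each recorded request.
alerts.check(metrics)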
Production Observability Service
from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
tracer = LLMTracer()
metrics = LLMMetrics()
logger = LLMLogger()
alert_manager = AlertManager()

# Add default alert rules
alert_manager.add_rule(LLMAlertRules.high_error_rate())
alert_manager.add_rule(LLMAlertRules.high_latency())
alert_manager.add_rule(LLMAlertRules.cost_spike())


class TraceRequest(BaseModel):
    name: str


class SpanRequest(BaseModel):
    trace_id: str
    name: str
    attributes: Optional[dict] = None


class MetricRequest(BaseModel):
    model: str
    latency_seconds: float
    input_tokens: int
    output_tokens: int
    cost_usd: float
    success: bool = True


class LogRequest(BaseModel):
    level: str
    message: str
    model: Optional[str] = None
    trace_id: Optional[str] = None
    attributes: Optional[dict] = None

@app.post("/v1/traces")
async def start_trace(request: TraceRequest):
    """Start a new trace."""
    trace_id = tracer.start_trace(request.name)
    return {
        "trace_id": trace_id,
        "status": "started"
    }


@app.post("/v1/spans")
async def start_span(request: SpanRequest):
    """Start a new span."""
    if not tracer.get_trace(request.trace_id):
        raise HTTPException(404, "Trace not found")
    tracer._current_trace = request.trace_id
    span_id = tracer.start_span(request.name, request.attributes)
    return {
        "span_id": span_id,
        "trace_id": request.trace_id,
        "status": "started"
    }


@app.put("/v1/spans/{span_id}")
async def end_span(span_id: str, status: str = "ok"):
    """End a span."""
    tracer.end_span(span_id, status)
    return {"status": "ended"}


@app.get("/v1/traces/{trace_id}")
async def get_trace(trace_id: str):
    """Get trace details."""
    trace = tracer.get_trace(trace_id)
    if not trace:
        raise HTTPException(404, "Trace not found")
    return {
        "trace_id": trace.trace_id,
        "total_duration_ms": trace.total_duration_ms,
        "spans": [
            {
                "span_id": s.span_id,
                "name": s.name,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "attributes": s.attributes
            }
            for s in trace.spans
        ]
    }

@app.post("/v1/metrics")
async def record_metric(request: MetricRequest):
    """Record LLM metrics."""
    metrics.record_request(
        model=request.model,
        latency=request.latency_seconds,
        input_tokens=request.input_tokens,
        output_tokens=request.output_tokens,
        cost=request.cost_usd,
        success=request.success
    )
    # Check alerts
    alert_manager.check(metrics)
    return {"status": "recorded"}


@app.get("/v1/metrics")
async def get_metrics():
    """Get a metrics summary."""
    return metrics.get_summary()


@app.get("/v1/metrics/prometheus", response_class=PlainTextResponse)
async def get_prometheus_metrics():
    """Expose counters in Prometheus text exposition format."""
    lines = []
    counters = [
        metrics.requests_total,
        metrics.request_errors,
        metrics.input_tokens,
        metrics.output_tokens,
    ]
    for counter in counters:
        for labels, value in counter._values.items():
            label_str = ",".join(f'{k}="{v}"' for k, v in labels)
            lines.append(f"{counter.name}{{{label_str}}} {value}")
    return "\n".join(lines)

@app.post("/v1/logs")
async def create_log(request: LogRequest):
    """Create a log entry."""
    try:
        level = LogLevel(request.level)
    except ValueError:
        raise HTTPException(400, f"Unknown log level: {request.level}")
    if level == LogLevel.DEBUG:
        logger.debug(request.message, model=request.model, trace_id=request.trace_id)
    elif level == LogLevel.INFO:
        logger.info(request.message, model=request.model, trace_id=request.trace_id)
    elif level == LogLevel.WARNING:
        logger.warning(request.message, model=request.model, trace_id=request.trace_id)
    elif level == LogLevel.ERROR:
        logger.error(request.message, model=request.model, trace_id=request.trace_id)
    return {"status": "logged"}


@app.get("/v1/alerts")
async def get_active_alerts():
    """Get active alerts."""
    return {
        "alerts": [
            {
                "name": alert.name,
                "severity": alert.severity.value,
                "message": alert.message
            }
            for alert in alert_manager._active_alerts.values()
        ]
    }


@app.get("/health")
async def health():
    return {"status": "healthy"}
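
A client only needs plain HTTP to exercise the service. A sketch using httpx, assuming the app above is served locally on port 8000 (for example with uvicorn); the model name and numbers are illustrative.

# Client-side sketch against the observability service; values are illustrative.
import httpx

base = "http://localhost:8000"

trace = httpx.post(f"{base}/v1/traces", json={"name": "answer_question"}).json()

httpx.post(f"{base}/v1/metrics", json={
    "model": "gpt-4o-mini",
    "latency_seconds": 1.8,
    "input_tokens": 950,
    "output_tokens": 210,
    "cost_usd": 0.0041,
    "success": True,
})

print(httpx.get(f"{base}/v1/metrics").json())
print(httpx.get(f"{base}/v1/alerts").json())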
References
- OpenTelemetry: https://opentelemetry.io/
- LangSmith: https://smith.langchain.com/
- Weights & Biases: https://wandb.ai/
- Prometheus: https://prometheus.io/
Conclusion
Observability transforms LLM operations from black boxes into understandable systems. Start with distributed tracing—track requests through your entire LLM pipeline, from initial query through retrieval, generation, and post-processing. Capture standard attributes like model, tokens, latency, and cost on every span. Implement comprehensive metrics: request counts, error rates, latency distributions, token usage, and costs. Use histograms for latency to understand percentiles, not just averages. Add structured logging that captures prompts and responses (with appropriate truncation and privacy controls) for debugging. Build alerting on top of your metrics—catch high error rates, latency spikes, and cost anomalies before users complain. Export to standard formats like Prometheus for integration with existing monitoring infrastructure. The key insight is that LLM observability requires domain-specific instrumentation—generic APM tools miss critical details like token counts and model-specific behavior. Invest in observability infrastructure early and you'll debug issues in minutes instead of hours.
