LLM Monitoring and Observability: Metrics, Traces, and Alerts

Introduction

LLM applications are notoriously difficult to debug. Unlike traditional software, where failures tend to surface as explicit errors, LLM issues manifest as subtle quality degradation, unexpected costs, or slow responses. Proper observability is therefore essential for production LLM systems. This guide covers monitoring strategies: tracking latency, tokens, and costs; implementing distributed tracing for complex chains; structured logging for debugging; quality metrics and evaluation; and alerting on anomalies. These patterns help you understand what’s happening inside your LLM application and catch problems before users notice.

[Figure: LLM Observability: Traces, Metrics, and Logs]

Basic Metrics Collection

import time
from dataclasses import dataclass, field
from typing import Optional
from collections import defaultdict
import statistics

@dataclass
class LLMMetrics:
    """Metrics for a single LLM call."""
    
    model: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    latency_ms: float
    cost_usd: float
    success: bool
    error: Optional[str] = None
    timestamp: float = field(default_factory=time.time)

class MetricsCollector:
    """Collect and aggregate LLM metrics."""
    
    # Cost per 1K tokens (approximate)
    COSTS = {
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
    }
    
    def __init__(self):
        self.metrics: list[LLMMetrics] = []
        self.by_model: dict[str, list[LLMMetrics]] = defaultdict(list)
    
    def calculate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> float:
        """Calculate cost for a request."""
        costs = self.COSTS.get(model, {"input": 0.01, "output": 0.03})
        return (
            (prompt_tokens / 1000) * costs["input"] +
            (completion_tokens / 1000) * costs["output"]
        )
    
    def record(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        latency_ms: float,
        success: bool = True,
        error: Optional[str] = None
    ):
        """Record metrics for an LLM call."""
        
        cost = self.calculate_cost(model, prompt_tokens, completion_tokens)
        
        metric = LLMMetrics(
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            latency_ms=latency_ms,
            cost_usd=cost,
            success=success,
            error=error
        )
        
        self.metrics.append(metric)
        self.by_model[model].append(metric)
    
    def get_summary(self, window_seconds: int = 3600) -> dict:
        """Get metrics summary for time window."""
        
        cutoff = time.time() - window_seconds
        recent = [m for m in self.metrics if m.timestamp > cutoff]
        
        if not recent:
            return {"total_requests": 0}
        
        latencies = [m.latency_ms for m in recent]
        
        return {
            "total_requests": len(recent),
            "success_rate": sum(1 for m in recent if m.success) / len(recent),
            "total_tokens": sum(m.total_tokens for m in recent),
            "total_cost_usd": sum(m.cost_usd for m in recent),
            "avg_latency_ms": statistics.mean(latencies),
            "p50_latency_ms": statistics.median(latencies),
            "p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)] if len(latencies) > 20 else max(latencies),
            "p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if len(latencies) > 100 else max(latencies),
            "requests_by_model": {
                model: len(metrics)
                for model, metrics in self.by_model.items()
            }
        }

# Global collector
metrics = MetricsCollector()

# Usage with OpenAI
from openai import OpenAI

client = OpenAI()

def monitored_completion(prompt: str, model: str = "gpt-4o") -> str:
    """Completion with automatic metrics collection."""
    
    start_time = time.time()
    
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        result = response.choices[0].message.content
        
        metrics.record(
            model=model,
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            latency_ms=(time.time() - start_time) * 1000,
            success=True
        )
        
        return result
    
    except Exception as e:
        metrics.record(
            model=model,
            prompt_tokens=0,
            completion_tokens=0,
            latency_ms=(time.time() - start_time) * 1000,
            success=False,
            error=str(e)
        )
        raise

# Get summary
summary = metrics.get_summary(window_seconds=3600)
print(f"Requests: {summary['total_requests']}")
print(f"Success rate: {summary.get('success_rate', 0):.1%}")
print(f"Total cost: ${summary.get('total_cost_usd', 0):.4f}")

Distributed Tracing

import time
import uuid
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional, Any

@dataclass
class Span:
    """A span in a distributed trace."""
    
    trace_id: str
    span_id: str
    parent_id: Optional[str]
    name: str
    start_time: float
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)
    events: list = field(default_factory=list)
    status: str = "OK"
    
    @property
    def duration_ms(self) -> Optional[float]:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return None

class Tracer:
    """Simple distributed tracer for LLM applications."""
    
    def __init__(self):
        self.traces: dict[str, list[Span]] = defaultdict(list)
        self._current_span: Optional[Span] = None
        self._current_trace_id: Optional[str] = None
    
    @contextmanager
    def start_trace(self, name: str):
        """Start a new trace."""
        trace_id = str(uuid.uuid4())
        self._current_trace_id = trace_id
        
        try:
            with self.start_span(name) as span:
                yield span
        finally:
            # Reset even if the traced code raises
            self._current_trace_id = None
    
    @contextmanager
    def start_span(self, name: str, attributes: dict = None):
        """Start a new span within current trace."""
        
        trace_id = self._current_trace_id or str(uuid.uuid4())
        parent_id = self._current_span.span_id if self._current_span else None
        
        span = Span(
            trace_id=trace_id,
            span_id=str(uuid.uuid4()),
            parent_id=parent_id,
            name=name,
            start_time=time.time(),
            attributes=attributes or {}
        )
        
        previous_span = self._current_span
        self._current_span = span
        
        try:
            yield span
            span.status = "OK"
        except Exception as e:
            span.status = "ERROR"
            span.attributes["error"] = str(e)
            raise
        finally:
            span.end_time = time.time()
            self.traces[trace_id].append(span)
            self._current_span = previous_span
    
    def add_event(self, name: str, attributes: dict = None):
        """Add event to current span."""
        if self._current_span:
            self._current_span.events.append({
                "name": name,
                "timestamp": time.time(),
                "attributes": attributes or {}
            })
    
    def set_attribute(self, key: str, value: Any):
        """Set attribute on current span."""
        if self._current_span:
            self._current_span.attributes[key] = value
    
    def get_trace(self, trace_id: str) -> list[Span]:
        """Get all spans for a trace."""
        return self.traces.get(trace_id, [])
    
    def print_trace(self, trace_id: str):
        """Print trace in readable format."""
        spans = self.get_trace(trace_id)
        
        for span in sorted(spans, key=lambda s: s.start_time):
            indent = "  " if span.parent_id else ""
            print(f"{indent}{span.name}: {span.duration_ms:.1f}ms [{span.status}]")
            for key, value in span.attributes.items():
                print(f"{indent}  {key}: {value}")

# Global tracer
tracer = Tracer()

# Usage in RAG pipeline
def traced_rag_query(query: str) -> str:
    """RAG query with full tracing."""
    
    with tracer.start_trace("rag_query") as root:
        tracer.set_attribute("query", query)
        
        # Embedding generation
        with tracer.start_span("generate_embedding"):
            response = client.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )
            embedding = response.data[0].embedding
            tracer.set_attribute("embedding_dim", len(embedding))
        
        # Vector search
        with tracer.start_span("vector_search"):
            # Simulated search
            results = [{"text": "Result 1"}, {"text": "Result 2"}]
            tracer.set_attribute("num_results", len(results))
        
        # LLM generation
        with tracer.start_span("llm_generation"):
            context = "\n".join(r["text"] for r in results)
            
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": f"Context: {context}"},
                    {"role": "user", "content": query}
                ]
            )
            
            tracer.set_attribute("prompt_tokens", response.usage.prompt_tokens)
            tracer.set_attribute("completion_tokens", response.usage.completion_tokens)
            
            return response.choices[0].message.content

# Run traced query
result = traced_rag_query("What is machine learning?")

# Print the most recent trace (the tracer's current trace id is reset once the trace ends)
last_trace_id = list(tracer.traces)[-1]
tracer.print_trace(last_trace_id)
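
The Tracer above is deliberately minimal. If you already run an observability stack, the same spans map directly onto OpenTelemetry; here is a rough sketch using the opentelemetry-sdk package, with a console exporter purely for illustration (in practice you would configure an OTLP exporter pointing at your collector).

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

# Wire up a tracer provider that prints finished spans to stdout
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
otel_tracer = trace.get_tracer("rag-api")

with otel_tracer.start_as_current_span("rag_query") as root_span:
    root_span.set_attribute("query", "What is machine learning?")
    with otel_tracer.start_as_current_span("vector_search") as search_span:
        search_span.set_attribute("num_results", 2)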

Structured Logging

import json
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any

class StructuredLogger:
    """JSON structured logger for LLM applications."""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # JSON formatter
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)
        
        self.default_fields = {}
    
    def set_default_field(self, key: str, value: Any):
        """Set a field that appears in all log entries."""
        self.default_fields[key] = value
    
    def _log(self, level: str, message: str, **kwargs):
        """Internal log method."""
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": level,
            "message": message,
            **self.default_fields,
            **kwargs
        }
        
        log_func = getattr(self.logger, level.lower())
        log_func(json.dumps(entry))
    
    def info(self, message: str, **kwargs):
        self._log("INFO", message, **kwargs)
    
    def warning(self, message: str, **kwargs):
        self._log("WARNING", message, **kwargs)
    
    def error(self, message: str, **kwargs):
        self._log("ERROR", message, **kwargs)
    
    def llm_request(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        latency_ms: float,
        success: bool,
        **kwargs
    ):
        """Log an LLM request with standard fields."""
        self._log(
            "INFO" if success else "ERROR",
            "llm_request",
            event_type="llm_request",
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            latency_ms=latency_ms,
            success=success,
            **kwargs
        )

# Usage
logger = StructuredLogger("llm_app")
logger.set_default_field("service", "rag-api")
logger.set_default_field("environment", "production")

def logged_completion(prompt: str, model: str = "gpt-4o") -> str:
    """Completion with structured logging."""
    
    request_id = str(uuid.uuid4())
    start_time = time.time()
    
    logger.info(
        "llm_request_start",
        request_id=request_id,
        model=model,
        prompt_length=len(prompt)
    )
    
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        latency_ms = (time.time() - start_time) * 1000
        
        logger.llm_request(
            model=model,
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            latency_ms=latency_ms,
            success=True,
            request_id=request_id
        )
        
        return response.choices[0].message.content
    
    except Exception as e:
        latency_ms = (time.time() - start_time) * 1000
        
        logger.llm_request(
            model=model,
            prompt_tokens=0,
            completion_tokens=0,
            latency_ms=latency_ms,
            success=False,
            request_id=request_id,
            error=str(e),
            error_type=type(e).__name__
        )
        raise
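
A successful call through logged_completion emits one JSON object per event, roughly like this (all values are illustrative):

{"timestamp": "2024-01-15T10:32:01.123456+00:00", "level": "INFO", "message": "llm_request",
 "service": "rag-api", "environment": "production", "event_type": "llm_request",
 "model": "gpt-4o", "prompt_tokens": 42, "completion_tokens": 128, "total_tokens": 170,
 "latency_ms": 1843.2, "success": true, "request_id": "<uuid4>"}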

Quality Monitoring

from enum import Enum

class QualityDimension(str, Enum):
    RELEVANCE = "relevance"
    COHERENCE = "coherence"
    FACTUALITY = "factuality"
    HELPFULNESS = "helpfulness"

class QualityMonitor:
    """Monitor LLM output quality."""
    
    def __init__(self, sample_rate: float = 0.1):
        self.sample_rate = sample_rate
        self.evaluations: list[dict] = []
    
    def should_evaluate(self) -> bool:
        """Determine if this request should be evaluated."""
        import random
        return random.random() < self.sample_rate
    
    def evaluate_response(
        self,
        query: str,
        response: str,
        context: str = None
    ) -> dict:
        """Evaluate response quality using LLM."""
        
        eval_prompt = f"""Evaluate this AI response on a scale of 1-5 for each dimension.

Query: {query}
{"Context: " + context if context else ""}
Response: {response}

Rate each dimension (1=poor, 5=excellent):
- relevance: How relevant is the response to the query?
- coherence: How well-structured and clear is the response?
- factuality: How accurate and factual is the information?
- helpfulness: How helpful is the response for the user?

Return JSON: {{"relevance": 1-5, "coherence": 1-5, "factuality": 1-5, "helpfulness": 1-5, "issues": ["list of issues"]}}"""
        
        eval_response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": eval_prompt}],
            response_format={"type": "json_object"}
        )
        
        scores = json.loads(eval_response.choices[0].message.content)
        
        evaluation = {
            "timestamp": time.time(),
            "query": query[:200],
            "response": response[:500],
            "scores": scores,
            "avg_score": sum(scores.get(d.value, 0) for d in QualityDimension) / 4
        }
        
        self.evaluations.append(evaluation)
        return evaluation
    
    def get_quality_summary(self, window_seconds: int = 86400) -> dict:
        """Get quality metrics summary."""
        
        cutoff = time.time() - window_seconds
        recent = [e for e in self.evaluations if e["timestamp"] > cutoff]
        
        if not recent:
            return {"evaluations": 0}
        
        avg_scores = {}
        for dim in QualityDimension:
            scores = [e["scores"].get(dim.value, 0) for e in recent]
            avg_scores[dim.value] = sum(scores) / len(scores)
        
        return {
            "evaluations": len(recent),
            "avg_scores": avg_scores,
            "overall_avg": sum(avg_scores.values()) / len(avg_scores),
            "low_quality_count": sum(1 for e in recent if e["avg_score"] < 3)
        }

# Usage
quality_monitor = QualityMonitor(sample_rate=0.1)

def quality_monitored_completion(query: str, context: str = None) -> str:
    """Completion with quality monitoring."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Context: {context}" if context else "You are helpful."},
            {"role": "user", "content": query}
        ]
    )
    
    result = response.choices[0].message.content
    
    # Sample-based quality evaluation
    if quality_monitor.should_evaluate():
        evaluation = quality_monitor.evaluate_response(query, result, context)
        
        if evaluation["avg_score"] < 3:
            logger.warning(
                "low_quality_response",
                query=query[:100],
                scores=evaluation["scores"]
            )
    
    return result
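
Note that evaluate_response makes an extra LLM call, so running it inline adds latency to every sampled request. One option, sketched here under the assumption that slightly delayed quality metrics are acceptable, is to push the evaluation onto a small background thread pool:

from concurrent.futures import ThreadPoolExecutor

# Small pool so sampled evaluations never block the request path
_eval_pool = ThreadPoolExecutor(max_workers=2)

def evaluate_in_background(query: str, result: str, context: str = None):
    """Submit a quality evaluation without blocking the caller."""
    def _task():
        evaluation = quality_monitor.evaluate_response(query, result, context)
        if evaluation["avg_score"] < 3:
            logger.warning(
                "low_quality_response",
                query=query[:100],
                scores=evaluation["scores"]
            )
    _eval_pool.submit(_task)

quality_monitored_completion could then call evaluate_in_background(query, result, context) instead of evaluating inline.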

Alerting System

from dataclasses import dataclass
from typing import Callable

@dataclass
class AlertRule:
    name: str
    condition: Callable[[dict], bool]
    severity: str  # "warning", "critical"
    message_template: str

class AlertManager:
    """Manage alerts for LLM metrics."""
    
    def __init__(self):
        self.rules: list[AlertRule] = []
        self.alerts: list[dict] = []
        self.alert_handlers: list[Callable] = []
    
    def add_rule(self, rule: AlertRule):
        """Add an alert rule."""
        self.rules.append(rule)
    
    def add_handler(self, handler: Callable):
        """Add alert handler (e.g., Slack, PagerDuty)."""
        self.alert_handlers.append(handler)
    
    def check_metrics(self, metrics_summary: dict):
        """Check metrics against all rules."""
        
        for rule in self.rules:
            try:
                if rule.condition(metrics_summary):
                    alert = {
                        "timestamp": time.time(),
                        "rule": rule.name,
                        "severity": rule.severity,
                        "message": rule.message_template.format(**metrics_summary),
                        "metrics": metrics_summary
                    }
                    
                    self.alerts.append(alert)
                    
                    for handler in self.alert_handlers:
                        handler(alert)
            except Exception as e:
                logger.error(f"Alert rule check failed: {rule.name}", error=str(e))
    
    def get_recent_alerts(self, window_seconds: int = 3600) -> list[dict]:
        """Get recent alerts."""
        cutoff = time.time() - window_seconds
        return [a for a in self.alerts if a["timestamp"] > cutoff]

# Setup alerts
alert_manager = AlertManager()

# High error rate alert
alert_manager.add_rule(AlertRule(
    name="high_error_rate",
    condition=lambda m: m.get("success_rate", 1) < 0.95,
    severity="critical",
    message_template="LLM error rate is {success_rate:.1%}, above 5% threshold"
))

# High latency alert
alert_manager.add_rule(AlertRule(
    name="high_latency",
    condition=lambda m: m.get("p95_latency_ms", 0) > 5000,
    severity="warning",
    message_template="P95 latency is {p95_latency_ms:.0f}ms, above 5s threshold"
))

# Cost spike alert
alert_manager.add_rule(AlertRule(
    name="cost_spike",
    condition=lambda m: m.get("total_cost_usd", 0) > 100,
    severity="warning",
    message_template="Hourly cost is ${total_cost_usd:.2f}, above $100 threshold"
))

# Low quality alert (fires when the quality summary, which contains overall_avg, is passed to check_metrics)
alert_manager.add_rule(AlertRule(
    name="low_quality",
    condition=lambda m: m.get("overall_avg", 5) < 3.5,
    severity="warning",
    message_template="Average quality score is {overall_avg:.2f}, below 3.5 threshold"
))

# Slack handler example
def slack_alert_handler(alert: dict):
    """Send alert to Slack."""
    # In production, use actual Slack webhook
    print(f"[ALERT] [{alert['severity'].upper()}] {alert['message']}")

alert_manager.add_handler(slack_alert_handler)
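
A real handler would post to a webhook instead of printing. Here is a minimal sketch using requests and a hypothetical SLACK_WEBHOOK_URL environment variable:

import os
import requests

SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")  # hypothetical configuration

def slack_webhook_handler(alert: dict):
    """Post an alert to a Slack incoming webhook."""
    if not SLACK_WEBHOOK_URL:
        return
    requests.post(
        SLACK_WEBHOOK_URL,
        json={"text": f"[{alert['severity'].upper()}] {alert['rule']}: {alert['message']}"},
        timeout=5,
    )

alert_manager.add_handler(slack_webhook_handler)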

# Check metrics periodically
summary = metrics.get_summary()
alert_manager.check_metrics(summary)
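
The check above runs only once. To evaluate the rules continuously, a simple background loop is enough; this sketch uses a daemon thread, and the 60-second interval is an assumption:

import threading

def start_alert_loop(interval_seconds: int = 60):
    """Re-check the hourly metrics summary against all alert rules on a schedule."""
    def _loop():
        while True:
            alert_manager.check_metrics(metrics.get_summary(window_seconds=3600))
            time.sleep(interval_seconds)
    threading.Thread(target=_loop, daemon=True).start()

start_alert_loop()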

Production Monitoring Service

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class MetricsResponse(BaseModel):
    requests: int
    success_rate: float
    avg_latency_ms: float
    p95_latency_ms: float
    total_cost_usd: float
    tokens_used: int

@app.get("/metrics", response_model=MetricsResponse)
async def get_metrics(window_hours: int = 1):
    """Get LLM metrics for dashboard."""
    
    summary = metrics.get_summary(window_seconds=window_hours * 3600)
    
    return MetricsResponse(
        requests=summary.get("total_requests", 0),
        success_rate=summary.get("success_rate", 1.0),
        avg_latency_ms=summary.get("avg_latency_ms", 0),
        p95_latency_ms=summary.get("p95_latency_ms", 0),
        total_cost_usd=summary.get("total_cost_usd", 0),
        tokens_used=summary.get("total_tokens", 0)
    )

@app.get("/quality")
async def get_quality(window_hours: int = 24):
    """Get quality metrics."""
    return quality_monitor.get_quality_summary(window_seconds=window_hours * 3600)

@app.get("/alerts")
async def get_alerts(window_hours: int = 24):
    """Get recent alerts."""
    return alert_manager.get_recent_alerts(window_seconds=window_hours * 3600)

@app.get("/health")
async def health_check():
    """Health check with basic metrics."""
    summary = metrics.get_summary(window_seconds=300)  # Last 5 minutes
    
    healthy = (
        summary.get("success_rate", 1) > 0.9 and
        summary.get("p95_latency_ms", 0) < 10000
    )
    
    return {
        "status": "healthy" if healthy else "degraded",
        "checks": {
            "error_rate": summary.get("success_rate", 1) > 0.9,
            "latency": summary.get("p95_latency_ms", 0) < 10000
        }
    }
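
To serve these endpoints, the app can be run with uvicorn; the host and port below are assumptions:

import uvicorn

if __name__ == "__main__":
    # Expose /metrics, /quality, /alerts, and /health for dashboards and probes
    uvicorn.run(app, host="0.0.0.0", port=8000)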

Conclusion

Observability is essential for production LLM applications. Start with basic metrics—latency, tokens, costs, and error rates. Add distributed tracing to understand complex chains and identify bottlenecks. Use structured logging for debugging and audit trails. Implement quality monitoring with sample-based evaluation to catch degradation. Set up alerts for critical thresholds before users notice problems. The combination of metrics, traces, and logs gives you complete visibility into your LLM system's behavior, enabling you to optimize performance, control costs, and maintain quality at scale.

