LLM Application Monitoring: Metrics, Tracing, and Alerting for Production AI Systems

Introduction

LLM applications fail in ways traditional software doesn’t. A model might return syntactically correct but factually wrong responses. Latency can spike unpredictably. Costs can explode without warning. Token usage varies wildly based on input. Traditional APM tools miss these LLM-specific failure modes. This guide covers comprehensive monitoring for LLM applications: tracking latency, tokens, and costs; implementing distributed tracing across chains and agents; detecting quality degradation; and building alerting systems that catch problems before users notice them.

LLM monitoring pipeline: metrics collection, distributed tracing, and alerting.

Metrics Collection

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import time
import json

@dataclass
class LLMMetrics:
    """Metrics for a single LLM call."""
    
    request_id: str
    model: str
    timestamp: datetime
    
    # Latency metrics
    latency_ms: float
    time_to_first_token_ms: Optional[float] = None
    
    # Token metrics
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    
    # Cost metrics
    cost_usd: float = 0.0
    
    # Quality metrics
    finish_reason: str = ""
    error: Optional[str] = None
    
    # Context
    endpoint: str = ""
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    tags: dict = field(default_factory=dict)

class MetricsCollector:
    """Collect and aggregate LLM metrics."""
    
    def __init__(self):
        self.metrics: list[LLMMetrics] = []
        self.cost_per_1k_tokens = {
            "gpt-4o": {"input": 0.0025, "output": 0.01},
            "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
            "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
            "claude-3-opus": {"input": 0.015, "output": 0.075},
        }
    
    def calculate_cost(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> float:
        """Calculate cost for a request."""
        
        if model not in self.cost_per_1k_tokens:
            return 0.0
        
        rates = self.cost_per_1k_tokens[model]
        input_cost = (prompt_tokens / 1000) * rates["input"]
        output_cost = (completion_tokens / 1000) * rates["output"]
        
        return input_cost + output_cost
    
    def record(self, metrics: LLMMetrics):
        """Record metrics."""
        
        # Calculate cost if not provided
        if metrics.cost_usd == 0.0:
            metrics.cost_usd = self.calculate_cost(
                metrics.model,
                metrics.prompt_tokens,
                metrics.completion_tokens
            )
        
        self.metrics.append(metrics)
    
    def get_summary(self, window_minutes: int = 60) -> dict:
        """Get summary statistics for recent metrics."""
        
        cutoff = datetime.utcnow().timestamp() - (window_minutes * 60)
        recent = [m for m in self.metrics if m.timestamp.timestamp() > cutoff]
        
        if not recent:
            return {"count": 0}
        
        latencies = sorted(m.latency_ms for m in recent)
        tokens = [m.total_tokens for m in recent]
        costs = [m.cost_usd for m in recent]
        errors = [m for m in recent if m.error]
        
        return {
            "count": len(recent),
            "latency_p50_ms": latencies[len(latencies) // 2],
            "latency_p95_ms": latencies[int(len(latencies) * 0.95)],
            "latency_p99_ms": latencies[int(len(latencies) * 0.99)],
            "avg_tokens": sum(tokens) / len(tokens),
            "total_cost_usd": sum(costs),
            "error_rate": len(errors) / len(recent),
            "errors": len(errors)
        }

class InstrumentedLLMClient:
    """LLM client with automatic metrics collection."""
    
    def __init__(
        self,
        client: Any,
        collector: MetricsCollector,
        default_tags: Optional[dict] = None
    ):
        self.client = client
        self.collector = collector
        self.default_tags = default_tags or {}
    
    async def chat_completion(
        self,
        model: str,
        messages: list[dict],
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        tags: Optional[dict] = None,
        **kwargs
    ) -> Any:
        """Make instrumented chat completion request."""
        
        import uuid
        request_id = request_id or str(uuid.uuid4())
        start_time = time.time()
        error = None
        response = None
        
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            return response
        
        except Exception as e:
            error = str(e)
            raise
        
        finally:
            latency_ms = (time.time() - start_time) * 1000
            
            metrics = LLMMetrics(
                request_id=request_id,
                model=model,
                timestamp=datetime.utcnow(),
                latency_ms=latency_ms,
                prompt_tokens=response.usage.prompt_tokens if response else 0,
                completion_tokens=response.usage.completion_tokens if response else 0,
                total_tokens=response.usage.total_tokens if response else 0,
                finish_reason=response.choices[0].finish_reason if response else "",
                error=error,
                endpoint="chat.completions",
                user_id=user_id,
                session_id=session_id,
                tags={**self.default_tags, **(tags or {})}
            )
            
            self.collector.record(metrics)
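
A minimal usage sketch, assuming an OpenAI-style async client (here openai.AsyncOpenAI; the model, prompt, and tags are illustrative): wrap the raw client once, and every call is timed, token-counted, and costed automatically.

import asyncio
from openai import AsyncOpenAI  # assumed provider SDK; any client with the same interface works

async def main():
    collector = MetricsCollector()
    llm = InstrumentedLLMClient(
        client=AsyncOpenAI(),
        collector=collector,
        default_tags={"env": "production", "service": "support-bot"}
    )

    # Each call records latency, tokens, and cost into the collector
    await llm.chat_completion(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Summarize our refund policy."}],
        user_id="user-123",
        tags={"feature": "faq"}
    )

    # Rolling summary over the last hour: count, latency percentiles, cost, error rate
    print(collector.get_summary(window_minutes=60))

asyncio.run(main())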

Distributed Tracing

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from contextlib import contextmanager
import uuid
import threading

@dataclass
class Span:
    """A span in a distributed trace."""
    
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    name: str
    start_time: datetime
    end_time: Optional[datetime] = None
    
    # Span data
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)
    status: str = "ok"
    error: Optional[str] = None
    
    @property
    def duration_ms(self) -> float:
        if not self.end_time:
            return 0
        return (self.end_time - self.start_time).total_seconds() * 1000

class Tracer:
    """Distributed tracing for LLM applications."""
    
    _local = threading.local()
    
    def __init__(self, service_name: str, exporter: Any = None):
        self.service_name = service_name
        self.exporter = exporter
        self.spans: list[Span] = []
    
    @property
    def current_span(self) -> Optional[Span]:
        return getattr(self._local, 'current_span', None)
    
    @current_span.setter
    def current_span(self, span: Optional[Span]):
        self._local.current_span = span
    
    @contextmanager
    def start_span(self, name: str, attributes: Optional[dict] = None):
        """Start a new span."""
        
        parent = self.current_span
        
        span = Span(
            trace_id=parent.trace_id if parent else str(uuid.uuid4()),
            span_id=str(uuid.uuid4()),
            parent_span_id=parent.span_id if parent else None,
            name=name,
            start_time=datetime.utcnow(),
            attributes=attributes or {}
        )
        
        previous_span = self.current_span
        self.current_span = span
        
        try:
            yield span
            span.status = "ok"
        except Exception as e:
            span.status = "error"
            span.error = str(e)
            raise
        finally:
            span.end_time = datetime.utcnow()
            self.spans.append(span)
            self.current_span = previous_span
            
            if self.exporter:
                self.exporter.export(span)
    
    def add_event(self, name: str, attributes: Optional[dict] = None):
        """Add event to current span."""
        
        if self.current_span:
            self.current_span.events.append({
                "name": name,
                "timestamp": datetime.utcnow().isoformat(),
                "attributes": attributes or {}
            })
    
    def set_attribute(self, key: str, value: Any):
        """Set attribute on current span."""
        
        if self.current_span:
            self.current_span.attributes[key] = value

class LLMTracer:
    """Specialized tracer for LLM operations."""
    
    def __init__(self, tracer: Tracer):
        self.tracer = tracer
    
    @contextmanager
    def trace_llm_call(
        self,
        model: str,
        operation: str = "chat.completion"
    ):
        """Trace an LLM API call."""
        
        with self.tracer.start_span(
            f"llm.{operation}",
            attributes={
                "llm.model": model,
                "llm.operation": operation
            }
        ) as span:
            yield span
    
    @contextmanager
    def trace_chain(self, chain_name: str):
        """Trace a chain execution."""
        
        with self.tracer.start_span(
            f"chain.{chain_name}",
            attributes={"chain.name": chain_name}
        ) as span:
            yield span
    
    @contextmanager
    def trace_retrieval(self, retriever_name: str):
        """Trace a retrieval operation."""
        
        with self.tracer.start_span(
            f"retrieval.{retriever_name}",
            attributes={"retriever.name": retriever_name}
        ) as span:
            yield span
    
    @contextmanager
    def trace_tool_call(self, tool_name: str):
        """Trace a tool/function call."""
        
        with self.tracer.start_span(
            f"tool.{tool_name}",
            attributes={"tool.name": tool_name}
        ) as span:
            yield span
    
    def record_tokens(self, prompt_tokens: int, completion_tokens: int):
        """Record token usage on current span."""
        
        self.tracer.set_attribute("llm.prompt_tokens", prompt_tokens)
        self.tracer.set_attribute("llm.completion_tokens", completion_tokens)
        self.tracer.set_attribute("llm.total_tokens", prompt_tokens + completion_tokens)
    
    def record_cost(self, cost_usd: float):
        """Record cost on current span."""
        
        self.tracer.set_attribute("llm.cost_usd", cost_usd)

class TracedRAGPipeline:
    """RAG pipeline with full tracing."""
    
    def __init__(
        self,
        retriever: Any,
        llm_client: Any,
        llm_tracer: LLMTracer
    ):
        self.retriever = retriever
        self.llm_client = llm_client
        self.llm_tracer = llm_tracer
    
    async def query(self, question: str) -> str:
        """Execute traced RAG query."""
        
        with self.llm_tracer.trace_chain("rag_pipeline"):
            # Trace retrieval
            with self.llm_tracer.trace_retrieval("vector_search"):
                docs = await self.retriever.search(question, top_k=5)
                self.llm_tracer.tracer.set_attribute("retrieval.doc_count", len(docs))
            
            # Build context
            context = "\n\n".join([d.content for d in docs])
            
            # Trace LLM call
            with self.llm_tracer.trace_llm_call("gpt-4o-mini"):
                response = await self.llm_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": f"Context:\n{context}"},
                        {"role": "user", "content": question}
                    ]
                )
                
                self.llm_tracer.record_tokens(
                    response.usage.prompt_tokens,
                    response.usage.completion_tokens
                )
            
            return response.choices[0].message.content
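
The exporter passed to Tracer only needs an export(span) method, so any backend can be plugged in. Below is a minimal console exporter for local development, plus the wiring for the traced pipeline; the retriever and OpenAI client objects are placeholders, not part of the pipeline code above.

class ConsoleSpanExporter:
    """Print finished spans; stand-in for an OTLP or vendor-specific exporter."""

    def export(self, span: Span):
        indent = "  " if span.parent_span_id else ""
        print(
            f"{indent}{span.name}  "
            f"trace={span.trace_id[:8]}  "
            f"{span.duration_ms:.1f}ms  "
            f"status={span.status}"
        )

tracer = Tracer("rag-service", exporter=ConsoleSpanExporter())
llm_tracer = LLMTracer(tracer)

# Wire the pipeline with your own retriever and client, then:
# answer = await TracedRAGPipeline(retriever, openai_client, llm_tracer).query("What is our SLA?")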

Quality Monitoring

from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
import re

@dataclass
class QualityMetrics:
    """Quality metrics for an LLM response."""
    
    request_id: str
    timestamp: datetime
    
    # Response quality
    response_length: int
    has_refusal: bool
    has_hallucination_markers: bool
    sentiment_score: float
    
    # Factual consistency (if ground truth available)
    factual_accuracy: Optional[float] = None
    
    # User feedback
    user_rating: Optional[int] = None
    user_feedback: Optional[str] = None

class QualityMonitor:
    """Monitor LLM response quality."""
    
    def __init__(self, client: Any = None):
        self.client = client
        self.refusal_patterns = [
            r"I cannot",
            r"I'm unable to",
            r"I don't have access",
            r"I apologize, but",
            r"As an AI",
            r"I'm not able to"
        ]
        self.hallucination_markers = [
            r"I think",
            r"I believe",
            r"probably",
            r"might be",
            r"I'm not sure",
            r"as far as I know"
        ]
    
    def analyze_response(
        self,
        request_id: str,
        response: str,
        query: Optional[str] = None
    ) -> QualityMetrics:
        """Analyze response quality."""
        
        has_refusal = any(
            re.search(pattern, response, re.IGNORECASE)
            for pattern in self.refusal_patterns
        )
        
        has_hallucination_markers = any(
            re.search(pattern, response, re.IGNORECASE)
            for pattern in self.hallucination_markers
        )
        
        # Simple sentiment (positive words - negative words)
        positive_words = len(re.findall(r'\b(good|great|excellent|helpful|correct)\b', response, re.IGNORECASE))
        negative_words = len(re.findall(r'\b(bad|wrong|error|incorrect|sorry)\b', response, re.IGNORECASE))
        sentiment = (positive_words - negative_words) / max(len(response.split()), 1)
        
        return QualityMetrics(
            request_id=request_id,
            timestamp=datetime.utcnow(),
            response_length=len(response),
            has_refusal=has_refusal,
            has_hallucination_markers=has_hallucination_markers,
            sentiment_score=sentiment
        )
    
    async def evaluate_factual_accuracy(
        self,
        response: str,
        ground_truth: str
    ) -> float:
        """Evaluate factual accuracy using LLM."""
        
        if not self.client:
            return 0.0
        
        prompt = f"""Compare the response to the ground truth and rate factual accuracy from 0 to 1.
Only respond with a number.

Response: {response[:1000]}

Ground Truth: {ground_truth[:1000]}

Accuracy (0-1):"""
        
        result = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=5
        )
        
        try:
            return float(result.choices[0].message.content.strip())
        except ValueError:
            return 0.5

class DriftDetector:
    """Detect quality drift over time."""
    
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.baseline_metrics: list[QualityMetrics] = []
        self.current_metrics: list[QualityMetrics] = []
    
    def set_baseline(self, metrics: list[QualityMetrics]):
        """Set baseline metrics for comparison."""
        self.baseline_metrics = metrics[-self.window_size:]
    
    def add_metric(self, metric: QualityMetrics):
        """Add new metric and check for drift."""
        
        self.current_metrics.append(metric)
        if len(self.current_metrics) > self.window_size:
            self.current_metrics.pop(0)
    
    def check_drift(self) -> dict:
        """Check for quality drift."""
        
        if len(self.baseline_metrics) < 10 or len(self.current_metrics) < 10:
            return {"drift_detected": False, "reason": "insufficient_data"}
        
        # Compare refusal rates
        baseline_refusal_rate = sum(1 for m in self.baseline_metrics if m.has_refusal) / len(self.baseline_metrics)
        current_refusal_rate = sum(1 for m in self.current_metrics if m.has_refusal) / len(self.current_metrics)
        
        # Compare response lengths
        baseline_avg_length = sum(m.response_length for m in self.baseline_metrics) / len(self.baseline_metrics)
        current_avg_length = sum(m.response_length for m in self.current_metrics) / len(self.current_metrics)
        
        # Compare hallucination markers
        baseline_hallucination_rate = sum(1 for m in self.baseline_metrics if m.has_hallucination_markers) / len(self.baseline_metrics)
        current_hallucination_rate = sum(1 for m in self.current_metrics if m.has_hallucination_markers) / len(self.current_metrics)
        
        drift_detected = False
        reasons = []
        
        if abs(current_refusal_rate - baseline_refusal_rate) > 0.1:
            drift_detected = True
            reasons.append(f"refusal_rate_change: {baseline_refusal_rate:.2f} -> {current_refusal_rate:.2f}")
        
        if baseline_avg_length and abs(current_avg_length - baseline_avg_length) / baseline_avg_length > 0.3:
            drift_detected = True
            reasons.append(f"length_change: {baseline_avg_length:.0f} -> {current_avg_length:.0f}")
        
        if abs(current_hallucination_rate - baseline_hallucination_rate) > 0.15:
            drift_detected = True
            reasons.append(f"hallucination_rate_change: {baseline_hallucination_rate:.2f} -> {current_hallucination_rate:.2f}")
        
        return {
            "drift_detected": drift_detected,
            "reasons": reasons,
            "metrics": {
                "baseline_refusal_rate": baseline_refusal_rate,
                "current_refusal_rate": current_refusal_rate,
                "baseline_avg_length": baseline_avg_length,
                "current_avg_length": current_avg_length
            }
        }
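
A usage sketch tying the two together: score a reviewed sample of known-good responses to establish the baseline, then score and track every production response. The example strings and the placeholder baseline list are illustrative.

monitor = QualityMonitor()
detector = DriftDetector(window_size=100)

# Placeholder baseline; in practice, use a reviewed sample of production responses
reference_responses = [
    "Refunds are processed within 5 business days.",
    "The storage limit on the free tier is 10 GB.",
]
detector.set_baseline([
    monitor.analyze_response(f"baseline-{i}", text)
    for i, text in enumerate(reference_responses)
])

# In production, score each new response and feed it to the detector
quality = monitor.analyze_response("req-42", "I'm not sure, but the limit is probably 10 GB.")
detector.add_metric(quality)

report = detector.check_drift()
if report["drift_detected"]:
    print("Quality drift:", report["reasons"])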

Alerting System

from dataclasses import dataclass
from typing import Any, Callable, Optional
from datetime import datetime, timedelta
from enum import Enum

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    """An alert triggered by monitoring."""
    
    id: str
    name: str
    severity: AlertSeverity
    message: str
    timestamp: datetime
    metadata: Optional[dict] = None

@dataclass
class AlertRule:
    """A rule for triggering alerts."""
    
    name: str
    condition: Callable[[dict], bool]
    severity: AlertSeverity
    message_template: str
    cooldown_minutes: int = 5

class AlertManager:
    """Manage alerts for LLM applications."""
    
    def __init__(self):
        self.rules: list[AlertRule] = []
        self.alerts: list[Alert] = []
        self.last_alert_times: dict[str, datetime] = {}
        self.notifiers: list[Callable[[Alert], None]] = []
    
    def add_rule(self, rule: AlertRule):
        """Add an alert rule."""
        self.rules.append(rule)
    
    def add_notifier(self, notifier: Callable[[Alert], None]):
        """Add a notification handler."""
        self.notifiers.append(notifier)
    
    def check_rules(self, metrics: dict):
        """Check all rules against current metrics."""
        
        import uuid
        
        for rule in self.rules:
            # Check cooldown
            last_alert = self.last_alert_times.get(rule.name)
            if last_alert:
                cooldown_end = last_alert + timedelta(minutes=rule.cooldown_minutes)
                if datetime.utcnow() < cooldown_end:
                    continue
            
            # Check condition
            if rule.condition(metrics):
                alert = Alert(
                    id=str(uuid.uuid4()),
                    name=rule.name,
                    severity=rule.severity,
                    message=rule.message_template.format(**metrics),
                    timestamp=datetime.utcnow(),
                    metadata=metrics
                )
                
                self.alerts.append(alert)
                self.last_alert_times[rule.name] = datetime.utcnow()
                
                # Notify
                for notifier in self.notifiers:
                    try:
                        notifier(alert)
                    except Exception as e:
                        print(f"Notifier error: {e}")
    
    def get_active_alerts(self, hours: int = 24) -> list[Alert]:
        """Get recent alerts."""
        
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        return [a for a in self.alerts if a.timestamp > cutoff]

def create_default_alert_rules() -> list[AlertRule]:
    """Create default alert rules for LLM monitoring."""
    
    return [
        AlertRule(
            name="high_latency",
            condition=lambda m: m.get("latency_p95_ms", 0) > 5000,
            severity=AlertSeverity.WARNING,
            message_template="High latency detected: P95 = {latency_p95_ms:.0f}ms"
        ),
        AlertRule(
            name="critical_latency",
            condition=lambda m: m.get("latency_p99_ms", 0) > 10000,
            severity=AlertSeverity.CRITICAL,
            message_template="Critical latency: P99 = {latency_p99_ms:.0f}ms"
        ),
        AlertRule(
            name="high_error_rate",
            condition=lambda m: m.get("error_rate", 0) > 0.05,
            severity=AlertSeverity.WARNING,
            message_template="High error rate: {error_rate:.1%}"
        ),
        AlertRule(
            name="critical_error_rate",
            condition=lambda m: m.get("error_rate", 0) > 0.1,
            severity=AlertSeverity.CRITICAL,
            message_template="Critical error rate: {error_rate:.1%}"
        ),
        AlertRule(
            name="cost_spike",
            condition=lambda m: m.get("total_cost_usd", 0) > 100,
            severity=AlertSeverity.WARNING,
            message_template="Cost spike: ${total_cost_usd:.2f} in monitoring window"
        ),
        AlertRule(
            name="token_spike",
            condition=lambda m: m.get("avg_tokens", 0) > 10000,
            severity=AlertSeverity.WARNING,
            message_template="High token usage: avg {avg_tokens:.0f} tokens per request"
        )
    ]

class SlackNotifier:
    """Send alerts to Slack."""
    
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
    
    def __call__(self, alert: Alert):
        """Send alert to Slack."""
        
        import requests
        
        color = {
            AlertSeverity.INFO: "#36a64f",
            AlertSeverity.WARNING: "#ff9800",
            AlertSeverity.CRITICAL: "#f44336"
        }.get(alert.severity, "#808080")
        
        payload = {
            "attachments": [{
                "color": color,
                "title": f"[{alert.severity.value.upper()}] {alert.name}",
                "text": alert.message,
                "ts": int(alert.timestamp.timestamp())
            }]
        }
        
        requests.post(self.webhook_url, json=payload)

class PagerDutyNotifier:
    """Send critical alerts to PagerDuty."""
    
    def __init__(self, routing_key: str):
        self.routing_key = routing_key
    
    def __call__(self, alert: Alert):
        """Send alert to PagerDuty."""
        
        if alert.severity != AlertSeverity.CRITICAL:
            return
        
        import requests
        
        payload = {
            "routing_key": self.routing_key,
            "event_action": "trigger",
            "payload": {
                "summary": alert.message,
                "severity": "critical",
                "source": "llm-monitoring",
                "custom_details": alert.metadata
            }
        }
        
        requests.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload
        )
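
Wiring the alerting pieces together takes a few lines: register the default rules, attach notifiers, and evaluate the rules against a recent metrics summary on a schedule. In this sketch the webhook URL and routing key are placeholders, and the MetricsCollector comes from the metrics collection section above.

alert_manager = AlertManager()
metrics_collector = MetricsCollector()  # from the metrics collection section

for rule in create_default_alert_rules():
    alert_manager.add_rule(rule)

alert_manager.add_notifier(SlackNotifier("https://hooks.slack.com/services/XXX/YYY/ZZZ"))
alert_manager.add_notifier(PagerDutyNotifier("your-pagerduty-routing-key"))

# Run on a timer (or after each recorded request):
# evaluate every rule against the last five minutes of traffic
summary = metrics_collector.get_summary(window_minutes=5)
alert_manager.check_rules(summary)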

Production Monitoring Service

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

app = FastAPI()

# Initialize components
metrics_collector = MetricsCollector()
tracer = Tracer("llm-service")
llm_tracer = LLMTracer(tracer)
quality_monitor = QualityMonitor()
drift_detector = DriftDetector()
alert_manager = AlertManager()

# Add default alert rules
for rule in create_default_alert_rules():
    alert_manager.add_rule(rule)

class MetricsRequest(BaseModel):
    request_id: str
    model: str
    latency_ms: float
    prompt_tokens: int
    completion_tokens: int
    finish_reason: str
    error: Optional[str] = None
    user_id: Optional[str] = None

class QualityRequest(BaseModel):
    request_id: str
    response: str
    query: Optional[str] = None

class FeedbackRequest(BaseModel):
    request_id: str
    rating: int
    feedback: Optional[str] = None

@app.post("/v1/metrics")
async def record_metrics(request: MetricsRequest, background_tasks: BackgroundTasks):
    """Record LLM metrics."""
    
    metrics = LLMMetrics(
        request_id=request.request_id,
        model=request.model,
        timestamp=datetime.utcnow(),
        latency_ms=request.latency_ms,
        prompt_tokens=request.prompt_tokens,
        completion_tokens=request.completion_tokens,
        total_tokens=request.prompt_tokens + request.completion_tokens,
        finish_reason=request.finish_reason,
        error=request.error,
        user_id=request.user_id
    )
    
    metrics_collector.record(metrics)
    
    # Check alerts in background
    background_tasks.add_task(check_alerts)
    
    return {"status": "recorded", "request_id": request.request_id}

@app.post("/v1/quality")
async def analyze_quality(request: QualityRequest):
    """Analyze response quality."""
    
    quality_metrics = quality_monitor.analyze_response(
        request.request_id,
        request.response,
        request.query
    )
    
    drift_detector.add_metric(quality_metrics)
    
    return {
        "request_id": request.request_id,
        "response_length": quality_metrics.response_length,
        "has_refusal": quality_metrics.has_refusal,
        "has_hallucination_markers": quality_metrics.has_hallucination_markers,
        "sentiment_score": quality_metrics.sentiment_score
    }

@app.post("/v1/feedback")
async def record_feedback(request: FeedbackRequest):
    """Record user feedback."""
    
    # Find and update quality metrics
    for metric in drift_detector.current_metrics:
        if metric.request_id == request.request_id:
            metric.user_rating = request.rating
            metric.user_feedback = request.feedback
            break
    
    return {"status": "recorded", "request_id": request.request_id}

@app.get("/v1/summary")
async def get_summary(window_minutes: int = 60):
    """Get metrics summary."""
    
    return metrics_collector.get_summary(window_minutes)

@app.get("/v1/drift")
async def check_drift():
    """Check for quality drift."""
    
    return drift_detector.check_drift()

@app.get("/v1/alerts")
async def get_alerts(hours: int = 24):
    """Get recent alerts."""
    
    alerts = alert_manager.get_active_alerts(hours)
    
    return {
        "count": len(alerts),
        "alerts": [
            {
                "id": a.id,
                "name": a.name,
                "severity": a.severity.value,
                "message": a.message,
                "timestamp": a.timestamp.isoformat()
            }
            for a in alerts
        ]
    }

@app.get("/v1/traces/{trace_id}")
async def get_trace(trace_id: str):
    """Get trace by ID."""
    
    spans = [s for s in tracer.spans if s.trace_id == trace_id]
    
    if not spans:
        return {"error": "trace_not_found"}
    
    return {
        "trace_id": trace_id,
        "spans": [
            {
                "span_id": s.span_id,
                "parent_span_id": s.parent_span_id,
                "name": s.name,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "attributes": s.attributes
            }
            for s in spans
        ]
    }

async def check_alerts():
    """Background task to check alerts."""
    
    summary = metrics_collector.get_summary(window_minutes=5)
    alert_manager.check_rules(summary)

@app.get("/health")
async def health():
    return {"status": "healthy"}
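
Application services report into this API after each LLM call. The sketch below shows the client side using httpx; the service URL is a placeholder and result is assumed to be an OpenAI-style response object.

import httpx

async def report_metrics(result, request_id: str, latency_ms: float):
    """Fire-and-forget report to the monitoring service after an LLM call."""
    async with httpx.AsyncClient() as http:
        await http.post(
            "http://llm-monitoring:8000/v1/metrics",  # placeholder service URL
            json={
                "request_id": request_id,
                "model": result.model,
                "latency_ms": latency_ms,
                "prompt_tokens": result.usage.prompt_tokens,
                "completion_tokens": result.usage.completion_tokens,
                "finish_reason": result.choices[0].finish_reason,
            },
        )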

Conclusion

Comprehensive monitoring is essential for production LLM applications. Track the metrics that matter: latency (including time-to-first-token for streaming), token usage, costs, and error rates. Implement distributed tracing to understand how requests flow through chains, retrievers, and tool calls. Monitor quality through automated analysis of refusals, hallucination markers, and response characteristics. Set up drift detection to catch gradual degradation before it impacts users. Build alerting systems with appropriate thresholds and cooldowns to avoid alert fatigue. The key insight is that LLM monitoring requires domain-specific metrics beyond traditional APM—you need to track tokens, costs, and quality alongside latency and errors. Start with basic metrics collection, add tracing for debugging, then layer in quality monitoring and alerting as your system matures.

