Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

LLM Observability Patterns: Tracing, Metrics, and Logging for Production AI Systems

Introduction: LLM applications are notoriously difficult to debug and monitor. Unlike traditional software where inputs and outputs are deterministic, LLMs produce variable outputs that can fail in subtle ways. Observability—the ability to understand system behavior from external outputs—is essential for production LLM systems. This guide covers practical observability patterns: distributed tracing for complex LLM chains, metrics collection for performance and cost monitoring, structured logging for debugging and analysis, and alerting strategies for catching issues before users do. Whether you’re running simple completions or complex multi-agent systems, robust observability transforms LLM operations from guesswork into engineering.

LLM Observability
Observability: Distributed Tracing, Metrics Collection, Structured Logging

Distributed Tracing

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import uuid
import time
from contextlib import contextmanager

@dataclass
class Span:
    """A span in a trace."""
    
    trace_id: str
    span_id: str
    parent_id: Optional[str]
    name: str
    start_time: float
    end_time: float = None
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)
    status: str = "ok"
    
    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0

@dataclass
class Trace:
    """A complete trace."""
    
    trace_id: str
    spans: list[Span] = field(default_factory=list)
    
    @property
    def root_span(self) -> Optional[Span]:
        for span in self.spans:
            if span.parent_id is None:
                return span
        return None
    
    @property
    def total_duration_ms(self) -> float:
        root = self.root_span
        return root.duration_ms if root else 0

class LLMTracer:
    """Tracer for LLM operations."""
    
    def __init__(self, service_name: str = "llm-service"):
        self.service_name = service_name
        self._current_trace: Optional[str] = None
        self._current_span: Optional[str] = None
        self._traces: dict[str, Trace] = {}
        self._spans: dict[str, Span] = {}
    
    def start_trace(self, name: str) -> str:
        """Start a new trace."""
        
        trace_id = str(uuid.uuid4())
        self._current_trace = trace_id
        self._traces[trace_id] = Trace(trace_id=trace_id)
        
        # Create root span
        self.start_span(name)
        
        return trace_id
    
    def start_span(
        self,
        name: str,
        attributes: dict = None
    ) -> str:
        """Start a new span."""
        
        span_id = str(uuid.uuid4())
        
        span = Span(
            trace_id=self._current_trace,
            span_id=span_id,
            parent_id=self._current_span,
            name=name,
            start_time=time.time(),
            attributes=attributes or {}
        )
        
        self._spans[span_id] = span
        self._traces[self._current_trace].spans.append(span)
        self._current_span = span_id
        
        return span_id
    
    def end_span(
        self,
        span_id: str = None,
        status: str = "ok",
        attributes: dict = None
    ):
        """End a span."""
        
        span_id = span_id or self._current_span
        span = self._spans.get(span_id)
        
        if span:
            span.end_time = time.time()
            span.status = status
            if attributes:
                span.attributes.update(attributes)
            
            # Move to parent span
            self._current_span = span.parent_id
    
    def add_event(
        self,
        name: str,
        attributes: dict = None
    ):
        """Add an event to current span."""
        
        span = self._spans.get(self._current_span)
        if span:
            span.events.append({
                "name": name,
                "timestamp": time.time(),
                "attributes": attributes or {}
            })
    
    def set_attribute(self, key: str, value: Any):
        """Set attribute on current span."""
        
        span = self._spans.get(self._current_span)
        if span:
            span.attributes[key] = value
    
    @contextmanager
    def span(self, name: str, attributes: dict = None):
        """Context manager for spans."""
        
        span_id = self.start_span(name, attributes)
        try:
            yield span_id
        except Exception as e:
            self.end_span(span_id, status="error", attributes={"error": str(e)})
            raise
        else:
            self.end_span(span_id)
    
    def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Get a trace by ID."""
        return self._traces.get(trace_id)

class LLMSpanAttributes:
    """Standard attributes for LLM spans."""
    
    # Request attributes
    MODEL = "llm.model"
    PROVIDER = "llm.provider"
    TEMPERATURE = "llm.temperature"
    MAX_TOKENS = "llm.max_tokens"
    
    # Token attributes
    INPUT_TOKENS = "llm.tokens.input"
    OUTPUT_TOKENS = "llm.tokens.output"
    TOTAL_TOKENS = "llm.tokens.total"
    
    # Cost attributes
    INPUT_COST = "llm.cost.input"
    OUTPUT_COST = "llm.cost.output"
    TOTAL_COST = "llm.cost.total"
    
    # Response attributes
    FINISH_REASON = "llm.finish_reason"
    RESPONSE_ID = "llm.response_id"
    
    # RAG attributes
    RETRIEVAL_COUNT = "rag.retrieval.count"
    RETRIEVAL_LATENCY = "rag.retrieval.latency_ms"
    
    # Chain attributes
    CHAIN_TYPE = "chain.type"
    CHAIN_STEP = "chain.step"

def trace_llm_call(tracer: LLMTracer):
    """Decorator to trace LLM calls."""
    
    def decorator(func):
        async def wrapper(*args, **kwargs):
            with tracer.span(f"llm.{func.__name__}") as span_id:
                # Set request attributes
                if "model" in kwargs:
                    tracer.set_attribute(LLMSpanAttributes.MODEL, kwargs["model"])
                if "temperature" in kwargs:
                    tracer.set_attribute(LLMSpanAttributes.TEMPERATURE, kwargs["temperature"])
                
                result = await func(*args, **kwargs)
                
                # Set response attributes
                if hasattr(result, "usage"):
                    tracer.set_attribute(LLMSpanAttributes.INPUT_TOKENS, result.usage.prompt_tokens)
                    tracer.set_attribute(LLMSpanAttributes.OUTPUT_TOKENS, result.usage.completion_tokens)
                    tracer.set_attribute(LLMSpanAttributes.TOTAL_TOKENS, result.usage.total_tokens)
                
                return result
        
        return wrapper
    return decorator

Metrics Collection

from dataclasses import dataclass, field
from typing import Any, Optional
from collections import defaultdict
import time
import threading

@dataclass
class MetricValue:
    """A metric value."""
    
    name: str
    value: float
    timestamp: float
    labels: dict = field(default_factory=dict)

class Counter:
    """A counter metric."""
    
    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._values: dict[tuple, float] = defaultdict(float)
        self._lock = threading.Lock()
    
    def inc(self, value: float = 1, labels: dict = None):
        """Increment counter."""
        
        label_key = tuple(sorted((labels or {}).items()))
        
        with self._lock:
            self._values[label_key] += value
    
    def get(self, labels: dict = None) -> float:
        """Get counter value."""
        
        label_key = tuple(sorted((labels or {}).items()))
        return self._values.get(label_key, 0)

class Gauge:
    """A gauge metric."""
    
    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._values: dict[tuple, float] = {}
        self._lock = threading.Lock()
    
    def set(self, value: float, labels: dict = None):
        """Set gauge value."""
        
        label_key = tuple(sorted((labels or {}).items()))
        
        with self._lock:
            self._values[label_key] = value
    
    def inc(self, value: float = 1, labels: dict = None):
        """Increment gauge."""
        
        label_key = tuple(sorted((labels or {}).items()))
        
        with self._lock:
            self._values[label_key] = self._values.get(label_key, 0) + value
    
    def dec(self, value: float = 1, labels: dict = None):
        """Decrement gauge."""
        self.inc(-value, labels)
    
    def get(self, labels: dict = None) -> float:
        """Get gauge value."""
        
        label_key = tuple(sorted((labels or {}).items()))
        return self._values.get(label_key, 0)

class Histogram:
    """A histogram metric."""
    
    def __init__(
        self,
        name: str,
        description: str = "",
        buckets: list[float] = None
    ):
        self.name = name
        self.description = description
        self.buckets = buckets or [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
        self._counts: dict[tuple, dict[float, int]] = defaultdict(lambda: defaultdict(int))
        self._sums: dict[tuple, float] = defaultdict(float)
        self._totals: dict[tuple, int] = defaultdict(int)
        self._lock = threading.Lock()
    
    def observe(self, value: float, labels: dict = None):
        """Record an observation."""
        
        label_key = tuple(sorted((labels or {}).items()))
        
        with self._lock:
            self._sums[label_key] += value
            self._totals[label_key] += 1
            
            for bucket in self.buckets:
                if value <= bucket:
                    self._counts[label_key][bucket] += 1
    
    def get_percentile(self, percentile: float, labels: dict = None) -> float:
        """Get approximate percentile value."""
        
        label_key = tuple(sorted((labels or {}).items()))
        total = self._totals.get(label_key, 0)
        
        if total == 0:
            return 0
        
        target = total * percentile
        cumulative = 0
        
        for bucket in sorted(self.buckets):
            cumulative += self._counts[label_key].get(bucket, 0)
            if cumulative >= target:
                return bucket
        
        return self.buckets[-1]

class LLMMetrics:
    """Metrics for LLM operations."""
    
    def __init__(self):
        # Request metrics
        self.requests_total = Counter(
            "llm_requests_total",
            "Total LLM requests"
        )
        self.request_errors = Counter(
            "llm_request_errors_total",
            "Total LLM request errors"
        )
        
        # Latency metrics
        self.request_latency = Histogram(
            "llm_request_latency_seconds",
            "LLM request latency",
            buckets=[0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60]
        )
        self.time_to_first_token = Histogram(
            "llm_ttft_seconds",
            "Time to first token",
            buckets=[0.05, 0.1, 0.25, 0.5, 1, 2, 5]
        )
        
        # Token metrics
        self.input_tokens = Counter(
            "llm_input_tokens_total",
            "Total input tokens"
        )
        self.output_tokens = Counter(
            "llm_output_tokens_total",
            "Total output tokens"
        )
        
        # Cost metrics
        self.cost_total = Counter(
            "llm_cost_dollars_total",
            "Total LLM cost in dollars"
        )
        
        # Active requests
        self.active_requests = Gauge(
            "llm_active_requests",
            "Currently active LLM requests"
        )
        
        # Cache metrics
        self.cache_hits = Counter(
            "llm_cache_hits_total",
            "Cache hits"
        )
        self.cache_misses = Counter(
            "llm_cache_misses_total",
            "Cache misses"
        )
    
    def record_request(
        self,
        model: str,
        latency: float,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        success: bool = True
    ):
        """Record a completed request."""
        
        labels = {"model": model}
        
        self.requests_total.inc(labels=labels)
        self.request_latency.observe(latency, labels=labels)
        self.input_tokens.inc(input_tokens, labels=labels)
        self.output_tokens.inc(output_tokens, labels=labels)
        self.cost_total.inc(cost, labels=labels)
        
        if not success:
            self.request_errors.inc(labels=labels)
    
    def get_summary(self) -> dict:
        """Get metrics summary."""
        
        return {
            "total_requests": sum(self.requests_total._values.values()),
            "total_errors": sum(self.request_errors._values.values()),
            "total_input_tokens": sum(self.input_tokens._values.values()),
            "total_output_tokens": sum(self.output_tokens._values.values()),
            "total_cost": sum(self.cost_total._values.values()),
            "p50_latency": self.request_latency.get_percentile(0.5),
            "p95_latency": self.request_latency.get_percentile(0.95),
            "p99_latency": self.request_latency.get_percentile(0.99),
        }

Structured Logging

from dataclasses import dataclass, asdict
from typing import Any, Optional
from datetime import datetime
import json
import logging
from enum import Enum

class LogLevel(Enum):
    """Log levels."""
    
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"

@dataclass
class LLMLogEntry:
    """Structured log entry for LLM operations."""
    
    timestamp: str
    level: str
    message: str
    trace_id: str = None
    span_id: str = None
    
    # Request context
    model: str = None
    provider: str = None
    
    # Token info
    input_tokens: int = None
    output_tokens: int = None
    
    # Timing
    latency_ms: float = None
    
    # Cost
    cost_usd: float = None
    
    # Error info
    error_type: str = None
    error_message: str = None
    
    # Custom attributes
    attributes: dict = None
    
    def to_json(self) -> str:
        """Convert to JSON string."""
        
        data = {k: v for k, v in asdict(self).items() if v is not None}
        return json.dumps(data)

class LLMLogger:
    """Structured logger for LLM operations."""
    
    def __init__(
        self,
        name: str = "llm",
        level: LogLevel = LogLevel.INFO
    ):
        self.name = name
        self.level = level
        self._logger = logging.getLogger(name)
        self._context: dict = {}
    
    def set_context(self, **kwargs):
        """Set context for all subsequent logs."""
        self._context.update(kwargs)
    
    def clear_context(self):
        """Clear logging context."""
        self._context = {}
    
    def _create_entry(
        self,
        level: LogLevel,
        message: str,
        **kwargs
    ) -> LLMLogEntry:
        """Create a log entry."""
        
        return LLMLogEntry(
            timestamp=datetime.utcnow().isoformat(),
            level=level.value,
            message=message,
            **{**self._context, **kwargs}
        )
    
    def debug(self, message: str, **kwargs):
        """Log debug message."""
        
        entry = self._create_entry(LogLevel.DEBUG, message, **kwargs)
        self._logger.debug(entry.to_json())
    
    def info(self, message: str, **kwargs):
        """Log info message."""
        
        entry = self._create_entry(LogLevel.INFO, message, **kwargs)
        self._logger.info(entry.to_json())
    
    def warning(self, message: str, **kwargs):
        """Log warning message."""
        
        entry = self._create_entry(LogLevel.WARNING, message, **kwargs)
        self._logger.warning(entry.to_json())
    
    def error(self, message: str, **kwargs):
        """Log error message."""
        
        entry = self._create_entry(LogLevel.ERROR, message, **kwargs)
        self._logger.error(entry.to_json())
    
    def log_request(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        cost_usd: float = None,
        **kwargs
    ):
        """Log a completed LLM request."""
        
        self.info(
            "LLM request completed",
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=latency_ms,
            cost_usd=cost_usd,
            **kwargs
        )
    
    def log_error(
        self,
        error: Exception,
        model: str = None,
        **kwargs
    ):
        """Log an LLM error."""
        
        self.error(
            f"LLM request failed: {str(error)}",
            model=model,
            error_type=type(error).__name__,
            error_message=str(error),
            **kwargs
        )

class PromptLogger:
    """Logger for prompts and responses."""
    
    def __init__(
        self,
        logger: LLMLogger,
        log_prompts: bool = True,
        log_responses: bool = True,
        max_length: int = 1000
    ):
        self.logger = logger
        self.log_prompts = log_prompts
        self.log_responses = log_responses
        self.max_length = max_length
    
    def log_prompt(
        self,
        prompt: str,
        model: str,
        trace_id: str = None
    ):
        """Log a prompt."""
        
        if not self.log_prompts:
            return
        
        truncated = prompt[:self.max_length]
        if len(prompt) > self.max_length:
            truncated += f"... ({len(prompt) - self.max_length} chars truncated)"
        
        self.logger.debug(
            "LLM prompt",
            model=model,
            trace_id=trace_id,
            attributes={"prompt": truncated}
        )
    
    def log_response(
        self,
        response: str,
        model: str,
        trace_id: str = None
    ):
        """Log a response."""
        
        if not self.log_responses:
            return
        
        truncated = response[:self.max_length]
        if len(response) > self.max_length:
            truncated += f"... ({len(response) - self.max_length} chars truncated)"
        
        self.logger.debug(
            "LLM response",
            model=model,
            trace_id=trace_id,
            attributes={"response": truncated}
        )

Alerting and Monitoring

from dataclasses import dataclass
from typing import Any, Optional, Callable
from enum import Enum
import asyncio

class AlertSeverity(Enum):
    """Alert severity levels."""
    
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    """An alert."""
    
    name: str
    severity: AlertSeverity
    message: str
    value: float
    threshold: float
    labels: dict = None

@dataclass
class AlertRule:
    """An alerting rule."""
    
    name: str
    description: str
    condition: Callable[[Any], bool]
    severity: AlertSeverity
    message_template: str

class AlertManager:
    """Manage alerts for LLM operations."""
    
    def __init__(self):
        self.rules: list[AlertRule] = []
        self.handlers: list[Callable[[Alert], None]] = []
        self._active_alerts: dict[str, Alert] = {}
    
    def add_rule(self, rule: AlertRule):
        """Add an alerting rule."""
        self.rules.append(rule)
    
    def add_handler(self, handler: Callable[[Alert], None]):
        """Add an alert handler."""
        self.handlers.append(handler)
    
    def check(self, metrics: LLMMetrics):
        """Check all rules against current metrics."""
        
        for rule in self.rules:
            try:
                if rule.condition(metrics):
                    self._fire_alert(rule, metrics)
                else:
                    self._resolve_alert(rule.name)
            except Exception as e:
                print(f"Error checking rule {rule.name}: {e}")
    
    def _fire_alert(self, rule: AlertRule, metrics: LLMMetrics):
        """Fire an alert."""
        
        if rule.name in self._active_alerts:
            return  # Already active
        
        alert = Alert(
            name=rule.name,
            severity=rule.severity,
            message=rule.message_template,
            value=0,  # Would be populated from metrics
            threshold=0
        )
        
        self._active_alerts[rule.name] = alert
        
        for handler in self.handlers:
            handler(alert)
    
    def _resolve_alert(self, name: str):
        """Resolve an alert."""
        
        if name in self._active_alerts:
            del self._active_alerts[name]

class LLMAlertRules:
    """Standard alert rules for LLM operations."""
    
    @staticmethod
    def high_error_rate(threshold: float = 0.05) -> AlertRule:
        """Alert on high error rate."""
        
        def condition(metrics: LLMMetrics) -> bool:
            total = sum(metrics.requests_total._values.values())
            errors = sum(metrics.request_errors._values.values())
            
            if total < 100:  # Minimum sample
                return False
            
            return (errors / total) > threshold
        
        return AlertRule(
            name="high_error_rate",
            description="LLM error rate exceeds threshold",
            condition=condition,
            severity=AlertSeverity.CRITICAL,
            message_template=f"LLM error rate exceeds {threshold*100}%"
        )
    
    @staticmethod
    def high_latency(threshold_ms: float = 5000) -> AlertRule:
        """Alert on high latency."""
        
        def condition(metrics: LLMMetrics) -> bool:
            p95 = metrics.request_latency.get_percentile(0.95)
            return p95 * 1000 > threshold_ms
        
        return AlertRule(
            name="high_latency",
            description="LLM latency exceeds threshold",
            condition=condition,
            severity=AlertSeverity.WARNING,
            message_template=f"LLM p95 latency exceeds {threshold_ms}ms"
        )
    
    @staticmethod
    def cost_spike(threshold_per_hour: float = 100) -> AlertRule:
        """Alert on cost spike."""
        
        def condition(metrics: LLMMetrics) -> bool:
            # This would need time-windowed metrics
            total_cost = sum(metrics.cost_total._values.values())
            return total_cost > threshold_per_hour
        
        return AlertRule(
            name="cost_spike",
            description="LLM cost exceeds threshold",
            condition=condition,
            severity=AlertSeverity.WARNING,
            message_template=f"LLM cost exceeds ${threshold_per_hour}/hour"
        )
    
    @staticmethod
    def low_cache_hit_rate(threshold: float = 0.3) -> AlertRule:
        """Alert on low cache hit rate."""
        
        def condition(metrics: LLMMetrics) -> bool:
            hits = sum(metrics.cache_hits._values.values())
            misses = sum(metrics.cache_misses._values.values())
            total = hits + misses
            
            if total < 100:
                return False
            
            return (hits / total) < threshold
        
        return AlertRule(
            name="low_cache_hit_rate",
            description="Cache hit rate below threshold",
            condition=condition,
            severity=AlertSeverity.INFO,
            message_template=f"Cache hit rate below {threshold*100}%"
        )

class SlackAlertHandler:
    """Send alerts to Slack."""
    
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
    
    async def handle(self, alert: Alert):
        """Send alert to Slack."""
        
        color = {
            AlertSeverity.INFO: "#36a64f",
            AlertSeverity.WARNING: "#ff9800",
            AlertSeverity.CRITICAL: "#f44336"
        }.get(alert.severity, "#808080")
        
        payload = {
            "attachments": [{
                "color": color,
                "title": f"[{alert.severity.value.upper()}] {alert.name}",
                "text": alert.message,
                "fields": [
                    {"title": "Value", "value": str(alert.value), "short": True},
                    {"title": "Threshold", "value": str(alert.threshold), "short": True}
                ]
            }]
        }
        
        # Would send to Slack webhook
        print(f"Slack alert: {payload}")

Production Observability Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
tracer = LLMTracer()
metrics = LLMMetrics()
logger = LLMLogger()
alert_manager = AlertManager()

# Add default alert rules
alert_manager.add_rule(LLMAlertRules.high_error_rate())
alert_manager.add_rule(LLMAlertRules.high_latency())
alert_manager.add_rule(LLMAlertRules.cost_spike())

class TraceRequest(BaseModel):
    name: str

class SpanRequest(BaseModel):
    trace_id: str
    name: str
    attributes: Optional[dict] = None

class MetricRequest(BaseModel):
    model: str
    latency_seconds: float
    input_tokens: int
    output_tokens: int
    cost_usd: float
    success: bool = True

class LogRequest(BaseModel):
    level: str
    message: str
    model: Optional[str] = None
    trace_id: Optional[str] = None
    attributes: Optional[dict] = None

@app.post("/v1/traces")
async def start_trace(request: TraceRequest):
    """Start a new trace."""
    
    trace_id = tracer.start_trace(request.name)
    
    return {
        "trace_id": trace_id,
        "status": "started"
    }

@app.post("/v1/spans")
async def start_span(request: SpanRequest):
    """Start a new span."""
    
    tracer._current_trace = request.trace_id
    span_id = tracer.start_span(request.name, request.attributes)
    
    return {
        "span_id": span_id,
        "trace_id": request.trace_id,
        "status": "started"
    }

@app.put("/v1/spans/{span_id}")
async def end_span(span_id: str, status: str = "ok"):
    """End a span."""
    
    tracer.end_span(span_id, status)
    
    return {"status": "ended"}

@app.get("/v1/traces/{trace_id}")
async def get_trace(trace_id: str):
    """Get trace details."""
    
    trace = tracer.get_trace(trace_id)
    
    if not trace:
        raise HTTPException(404, "Trace not found")
    
    return {
        "trace_id": trace.trace_id,
        "total_duration_ms": trace.total_duration_ms,
        "spans": [
            {
                "span_id": s.span_id,
                "name": s.name,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "attributes": s.attributes
            }
            for s in trace.spans
        ]
    }

@app.post("/v1/metrics")
async def record_metric(request: MetricRequest):
    """Record LLM metrics."""
    
    metrics.record_request(
        model=request.model,
        latency=request.latency_seconds,
        input_tokens=request.input_tokens,
        output_tokens=request.output_tokens,
        cost=request.cost_usd,
        success=request.success
    )
    
    # Check alerts
    alert_manager.check(metrics)
    
    return {"status": "recorded"}

@app.get("/v1/metrics")
async def get_metrics():
    """Get metrics summary."""
    
    return metrics.get_summary()

@app.get("/v1/metrics/prometheus")
async def get_prometheus_metrics():
    """Get metrics in Prometheus format."""
    
    lines = []
    
    # Requests total
    for labels, value in metrics.requests_total._values.items():
        label_str = ",".join(f'{k}="{v}"' for k, v in labels)
        lines.append(f'llm_requests_total{{{label_str}}} {value}')
    
    # Errors total
    for labels, value in metrics.request_errors._values.items():
        label_str = ",".join(f'{k}="{v}"' for k, v in labels)
        lines.append(f'llm_request_errors_total{{{label_str}}} {value}')
    
    # Tokens
    for labels, value in metrics.input_tokens._values.items():
        label_str = ",".join(f'{k}="{v}"' for k, v in labels)
        lines.append(f'llm_input_tokens_total{{{label_str}}} {value}')
    
    for labels, value in metrics.output_tokens._values.items():
        label_str = ",".join(f'{k}="{v}"' for k, v in labels)
        lines.append(f'llm_output_tokens_total{{{label_str}}} {value}')
    
    return "\n".join(lines)

@app.post("/v1/logs")
async def create_log(request: LogRequest):
    """Create a log entry."""
    
    level = LogLevel(request.level)
    
    if level == LogLevel.DEBUG:
        logger.debug(request.message, model=request.model, trace_id=request.trace_id)
    elif level == LogLevel.INFO:
        logger.info(request.message, model=request.model, trace_id=request.trace_id)
    elif level == LogLevel.WARNING:
        logger.warning(request.message, model=request.model, trace_id=request.trace_id)
    elif level == LogLevel.ERROR:
        logger.error(request.message, model=request.model, trace_id=request.trace_id)
    
    return {"status": "logged"}

@app.get("/v1/alerts")
async def get_active_alerts():
    """Get active alerts."""
    
    return {
        "alerts": [
            {
                "name": alert.name,
                "severity": alert.severity.value,
                "message": alert.message
            }
            for alert in alert_manager._active_alerts.values()
        ]
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Observability transforms LLM operations from black boxes into understandable systems. Start with distributed tracing—track requests through your entire LLM pipeline, from initial query through retrieval, generation, and post-processing. Capture standard attributes like model, tokens, latency, and cost on every span. Implement comprehensive metrics: request counts, error rates, latency distributions, token usage, and costs. Use histograms for latency to understand percentiles, not just averages. Add structured logging that captures prompts and responses (with appropriate truncation and privacy controls) for debugging. Build alerting on top of your metrics—catch high error rates, latency spikes, and cost anomalies before users complain. Export to standard formats like Prometheus for integration with existing monitoring infrastructure. The key insight is that LLM observability requires domain-specific instrumentation—generic APM tools miss critical details like token counts and model-specific behavior. Invest in observability infrastructure early and you'll debug issues in minutes instead of hours.