
LLM Monitoring and Alerting: Building Observability for Production AI Systems

Introduction: LLM monitoring is essential for maintaining reliable, cost-effective AI applications in production. Unlike traditional software, where errors are obvious, LLM failures can be subtle—degraded output quality, increased hallucinations, or slowly rising costs that go unnoticed until the monthly bill arrives. Effective monitoring tracks latency, token usage, error rates, output quality, and cost in real time, enabling teams to detect issues before they impact users. This guide covers the techniques that make LLM monitoring effective: metric collection, quality scoring, anomaly detection, alerting strategies, and dashboard design. Whether you're running a chatbot, a RAG system, or an AI-powered feature, these patterns will help you maintain visibility into your LLM operations and respond quickly to problems.


Core Metrics Collection

from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime, timedelta
from enum import Enum

class MetricType(Enum):
    """Types of LLM metrics."""
    
    LATENCY = "latency"
    TOKENS = "tokens"
    COST = "cost"
    ERROR = "error"
    QUALITY = "quality"
    THROUGHPUT = "throughput"

@dataclass
class LLMMetric:
    """A single LLM metric."""
    
    name: str
    value: float
    metric_type: MetricType
    timestamp: datetime = field(default_factory=datetime.now)
    tags: dict = field(default_factory=dict)
    
    def to_dict(self) -> dict:
        return {
            "name": self.name,
            "value": self.value,
            "type": self.metric_type.value,
            "timestamp": self.timestamp.isoformat(),
            "tags": self.tags
        }

@dataclass
class RequestMetrics:
    """Metrics for a single LLM request."""
    
    request_id: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    latency_ms: float
    time_to_first_token_ms: float = 0
    cost_usd: float = 0
    success: bool = True
    error_type: str = ""
    timestamp: datetime = field(default_factory=datetime.now)

class MetricsCollector:
    """Collect LLM metrics."""
    
    def __init__(self):
        self.metrics: list[LLMMetric] = []
        self.request_metrics: list[RequestMetrics] = []
    
    def record_request(self, metrics: RequestMetrics):
        """Record request metrics."""
        
        self.request_metrics.append(metrics)
        
        # Extract individual metrics
        self.metrics.append(LLMMetric(
            name="llm_latency_ms",
            value=metrics.latency_ms,
            metric_type=MetricType.LATENCY,
            tags={"model": metrics.model, "request_id": metrics.request_id}
        ))
        
        self.metrics.append(LLMMetric(
            name="llm_tokens_total",
            value=metrics.total_tokens,
            metric_type=MetricType.TOKENS,
            tags={"model": metrics.model, "type": "total"}
        ))
        
        self.metrics.append(LLMMetric(
            name="llm_cost_usd",
            value=metrics.cost_usd,
            metric_type=MetricType.COST,
            tags={"model": metrics.model}
        ))
        
        if not metrics.success:
            self.metrics.append(LLMMetric(
                name="llm_error",
                value=1,
                metric_type=MetricType.ERROR,
                tags={"model": metrics.model, "error_type": metrics.error_type}
            ))
    
    def record_quality(
        self,
        request_id: str,
        quality_score: float,
        quality_type: str = "overall"
    ):
        """Record quality metric."""
        
        self.metrics.append(LLMMetric(
            name="llm_quality_score",
            value=quality_score,
            metric_type=MetricType.QUALITY,
            tags={"request_id": request_id, "quality_type": quality_type}
        ))
    
    def get_metrics(
        self,
        metric_type: Optional[MetricType] = None,
        since: Optional[datetime] = None
    ) -> list[LLMMetric]:
        """Get filtered metrics."""
        
        result = self.metrics
        
        if metric_type:
            result = [m for m in result if m.metric_type == metric_type]
        
        if since:
            result = [m for m in result if m.timestamp >= since]
        
        return result

class MetricsAggregator:
    """Aggregate metrics over time windows."""
    
    def __init__(self, collector: MetricsCollector):
        self.collector = collector
    
    def aggregate(
        self,
        metric_name: str,
        window_minutes: int = 5,
        aggregation: str = "avg"
    ) -> float:
        """Aggregate metric over time window."""
        
        since = datetime.now() - timedelta(minutes=window_minutes)
        metrics = [
            m for m in self.collector.metrics
            if m.name == metric_name and m.timestamp >= since
        ]
        
        if not metrics:
            return 0.0
        
        values = [m.value for m in metrics]
        
        if aggregation == "avg":
            return sum(values) / len(values)
        elif aggregation == "sum":
            return sum(values)
        elif aggregation == "max":
            return max(values)
        elif aggregation == "min":
            return min(values)
        elif aggregation == "p50":
            return sorted(values)[len(values) // 2]
        elif aggregation == "p95":
            return sorted(values)[int(len(values) * 0.95)]
        elif aggregation == "p99":
            return sorted(values)[int(len(values) * 0.99)]
        elif aggregation == "count":
            return len(values)
        
        return 0.0
    
    def get_summary(self, window_minutes: int = 60) -> dict:
        """Get summary statistics."""
        
        return {
            "latency_avg_ms": self.aggregate("llm_latency_ms", window_minutes, "avg"),
            "latency_p95_ms": self.aggregate("llm_latency_ms", window_minutes, "p95"),
            "latency_p99_ms": self.aggregate("llm_latency_ms", window_minutes, "p99"),
            "tokens_total": self.aggregate("llm_tokens_total", window_minutes, "sum"),
            "cost_total_usd": self.aggregate("llm_cost_usd", window_minutes, "sum"),
            "error_count": self.aggregate("llm_error", window_minutes, "count"),
            "request_count": len([
                m for m in self.collector.request_metrics
                if m.timestamp >= datetime.now() - timedelta(minutes=window_minutes)
            ]),
            "quality_avg": self.aggregate("llm_quality_score", window_minutes, "avg")
        }

class PrometheusExporter:
    """Export metrics to Prometheus format."""
    
    def __init__(self, collector: MetricsCollector):
        self.collector = collector
    
    def export(self) -> str:
        """Export metrics in Prometheus format."""
        
        lines = []
        
        # Group by metric name
        by_name: dict[str, list[LLMMetric]] = {}
        for metric in self.collector.metrics:
            if metric.name not in by_name:
                by_name[metric.name] = []
            by_name[metric.name].append(metric)
        
        for name, metrics in by_name.items():
            # Add help and type
            lines.append(f"# HELP {name} LLM metric")
            lines.append(f"# TYPE {name} gauge")
            
            for metric in metrics[-100:]:  # Last 100
                tags_str = ",".join(f'{k}="{v}"' for k, v in metric.tags.items())
                if tags_str:
                    lines.append(f"{name}{{{tags_str}}} {metric.value}")
                else:
                    lines.append(f"{name} {metric.value}")
        
        return "\n".join(lines)

Quality Monitoring

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import re

@dataclass
class QualityScore:
    """Quality score with breakdown."""
    
    overall: float
    relevance: float
    coherence: float
    completeness: float
    safety: float
    format_compliance: float

class QualityMonitor:
    """Monitor LLM output quality."""
    
    def __init__(self):
        self.scorers: list[Callable] = []
        self.history: list[QualityScore] = []
    
    def score(
        self,
        prompt: str,
        output: str,
        expected: Optional[str] = None
    ) -> QualityScore:
        """Score output quality."""
        
        relevance = self._score_relevance(prompt, output)
        coherence = self._score_coherence(output)
        completeness = self._score_completeness(prompt, output)
        safety = self._score_safety(output)
        format_compliance = self._score_format(prompt, output)
        
        overall = (
            relevance * 0.3 +
            coherence * 0.2 +
            completeness * 0.2 +
            safety * 0.2 +
            format_compliance * 0.1
        )
        
        score = QualityScore(
            overall=overall,
            relevance=relevance,
            coherence=coherence,
            completeness=completeness,
            safety=safety,
            format_compliance=format_compliance
        )
        
        self.history.append(score)
        return score
    
    def _score_relevance(self, prompt: str, output: str) -> float:
        """Score relevance to prompt."""
        
        # Simple keyword overlap
        prompt_words = set(prompt.lower().split())
        output_words = set(output.lower().split())
        
        # Remove stopwords
        stopwords = {"the", "a", "an", "is", "are", "was", "were", "be", "been",
                    "have", "has", "had", "do", "does", "did", "will", "would",
                    "could", "should", "may", "might", "must", "shall", "can",
                    "to", "of", "in", "for", "on", "with", "at", "by", "from"}
        
        prompt_keywords = prompt_words - stopwords
        output_keywords = output_words - stopwords
        
        if not prompt_keywords:
            return 0.5
        
        overlap = len(prompt_keywords & output_keywords) / len(prompt_keywords)
        return min(overlap * 2, 1.0)  # Scale up, cap at 1
    
    def _score_coherence(self, output: str) -> float:
        """Score output coherence."""
        
        # Check for complete sentences (drop empty fragments left by the split)
        sentences = [s for s in re.split(r'[.!?]+', output) if s.strip()]
        complete_sentences = [s for s in sentences if len(s.split()) >= 3]
        
        if not sentences:
            return 0.0
        
        sentence_ratio = len(complete_sentences) / len(sentences)
        
        # Check for repetition
        words = output.split()
        if len(words) > 10:
            unique_ratio = len(set(words)) / len(words)
        else:
            unique_ratio = 1.0
        
        return (sentence_ratio + unique_ratio) / 2
    
    def _score_completeness(self, prompt: str, output: str) -> float:
        """Score response completeness."""
        
        # Base the score on output length and how the response ends
        output_words = len(output.split())
        
        # Expect at least some response
        if output_words < 5:
            return 0.2
        
        # Check for truncation indicators
        if output.rstrip().endswith("..."):
            return 0.5
        
        # Check for complete ending
        if output.rstrip()[-1] in ".!?":
            return 1.0
        
        return 0.7
    
    def _score_safety(self, output: str) -> float:
        """Score output safety."""
        
        # Check for unsafe patterns
        unsafe_patterns = [
            r"password",
            r"credit card",
            r"social security",
            r"api key",
            r"secret",
            r"token"
        ]
        
        output_lower = output.lower()
        
        for pattern in unsafe_patterns:
            if re.search(pattern, output_lower):
                return 0.5
        
        return 1.0
    
    def _score_format(self, prompt: str, output: str) -> float:
        """Score format compliance."""
        
        prompt_lower = prompt.lower()
        
        # Check JSON format
        if "json" in prompt_lower:
            try:
                import json
                json.loads(output)
                return 1.0
            except json.JSONDecodeError:
                return 0.0
        
        # Check list format
        if "list" in prompt_lower:
            if re.search(r'^[\-\*\d]', output, re.MULTILINE):
                return 1.0
            return 0.5
        
        return 1.0
    
    def get_trend(self, window: int = 100) -> dict:
        """Get quality trend."""
        
        if not self.history:
            return {}
        
        recent = self.history[-window:]
        
        return {
            "overall_avg": sum(s.overall for s in recent) / len(recent),
            "overall_min": min(s.overall for s in recent),
            "overall_max": max(s.overall for s in recent),
            "relevance_avg": sum(s.relevance for s in recent) / len(recent),
            "coherence_avg": sum(s.coherence for s in recent) / len(recent),
            "safety_avg": sum(s.safety for s in recent) / len(recent)
        }

class LLMQualityJudge:
    """Use LLM to judge output quality."""
    
    def __init__(self, judge_model: Any):
        self.judge = judge_model
    
    async def score(
        self,
        prompt: str,
        output: str,
        criteria: Optional[list[str]] = None
    ) -> dict:
        """Score output using LLM judge."""
        
        criteria = criteria or ["relevance", "accuracy", "helpfulness"]
        
        judge_prompt = f"""Rate the following LLM output on a scale of 1-10 for each criterion.

Original Prompt: {prompt}

Output: {output}

Rate each criterion:
{chr(10).join(f"- {c}" for c in criteria)}

Respond in this format:
{chr(10).join(f"{c}: [score]" for c in criteria)}
OVERALL: [average score]"""
        
        response = await self.judge.generate(judge_prompt)
        
        return self._parse_scores(response, criteria)
    
    def _parse_scores(self, response: str, criteria: list[str]) -> dict:
        """Parse scores from judge response."""
        
        scores = {}
        
        for line in response.split("\n"):
            for criterion in criteria + ["OVERALL"]:
                if criterion.lower() in line.lower():
                    try:
                        score = float(re.search(r"(\d+(?:\.\d+)?)", line).group(1))
                        scores[criterion.lower()] = score / 10  # Normalize to 0-1
                    except (AttributeError, ValueError):
                        pass
        
        return scores
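
As a quick sanity check, the heuristic monitor can be exercised directly; the prompt and output below are invented examples.

monitor = QualityMonitor()

score = monitor.score(
    prompt="List three benefits of caching LLM responses.",
    output="1. Lower latency. 2. Reduced cost. 3. Less load on the provider API."
)
print(f"overall={score.overall:.2f} relevance={score.relevance:.2f} "
      f"format={score.format_compliance:.2f}")

# Rolling averages over the most recent scores
print(monitor.get_trend(window=100))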

Anomaly Detection

from dataclasses import dataclass
from typing import Any, Optional, List
from datetime import datetime, timedelta
import numpy as np

@dataclass
class Anomaly:
    """A detected anomaly."""
    
    metric_name: str
    value: float
    expected_range: tuple[float, float]
    severity: str  # low, medium, high, critical
    timestamp: datetime
    description: str

class StatisticalAnomalyDetector:
    """Detect anomalies using statistical methods."""
    
    def __init__(
        self,
        window_size: int = 100,
        z_threshold: float = 3.0
    ):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.history: dict[str, list[float]] = {}
    
    def add_value(self, metric_name: str, value: float) -> Optional[Anomaly]:
        """Add value and check for anomaly."""
        
        if metric_name not in self.history:
            self.history[metric_name] = []
        
        history = self.history[metric_name]
        
        # Need enough history
        if len(history) < 10:
            history.append(value)
            return None
        
        # Calculate statistics
        mean = np.mean(history[-self.window_size:])
        std = np.std(history[-self.window_size:])
        
        if std == 0:
            history.append(value)
            return None
        
        # Calculate z-score
        z_score = abs(value - mean) / std
        
        history.append(value)
        
        # Trim history
        if len(history) > self.window_size * 2:
            self.history[metric_name] = history[-self.window_size:]
        
        if z_score > self.z_threshold:
            severity = self._get_severity(z_score)
            
            return Anomaly(
                metric_name=metric_name,
                value=value,
                expected_range=(mean - 2*std, mean + 2*std),
                severity=severity,
                timestamp=datetime.now(),
                description=f"Value {value:.2f} is {z_score:.1f} standard deviations from mean {mean:.2f}"
            )
        
        return None
    
    def _get_severity(self, z_score: float) -> str:
        """Get severity based on z-score."""
        
        if z_score > 5:
            return "critical"
        elif z_score > 4:
            return "high"
        elif z_score > 3.5:
            return "medium"
        else:
            return "low"

class ThresholdAnomalyDetector:
    """Detect anomalies using fixed thresholds."""
    
    def __init__(self):
        self.thresholds: dict[str, dict] = {}
    
    def set_threshold(
        self,
        metric_name: str,
        warning: Optional[float] = None,
        critical: Optional[float] = None,
        direction: str = "above"  # above or below
    ):
        """Set threshold for metric."""
        
        self.thresholds[metric_name] = {
            "warning": warning,
            "critical": critical,
            "direction": direction
        }
    
    def check(self, metric_name: str, value: float) -> Optional[Anomaly]:
        """Check value against thresholds."""
        
        if metric_name not in self.thresholds:
            return None
        
        threshold = self.thresholds[metric_name]
        direction = threshold["direction"]
        
        # Check critical
        if threshold["critical"] is not None:
            is_critical = (
                (direction == "above" and value > threshold["critical"]) or
                (direction == "below" and value < threshold["critical"])
            )
            
            if is_critical:
                return Anomaly(
                    metric_name=metric_name,
                    value=value,
                    expected_range=(0, threshold["critical"]) if direction == "above" else (threshold["critical"], float('inf')),
                    severity="critical",
                    timestamp=datetime.now(),
                    description=f"Value {value:.2f} exceeds critical threshold {threshold['critical']}"
                )
        
        # Check warning
        if threshold["warning"] is not None:
            is_warning = (
                (direction == "above" and value > threshold["warning"]) or
                (direction == "below" and value < threshold["warning"])
            )
            
            if is_warning:
                return Anomaly(
                    metric_name=metric_name,
                    value=value,
                    expected_range=(0, threshold["warning"]) if direction == "above" else (threshold["warning"], float('inf')),
                    severity="medium",
                    timestamp=datetime.now(),
                    description=f"Value {value:.2f} exceeds warning threshold {threshold['warning']}"
                )
        
        return None

class TrendAnomalyDetector:
    """Detect anomalies in metric trends."""
    
    def __init__(
        self,
        window_size: int = 50,
        trend_threshold: float = 0.1
    ):
        self.window_size = window_size
        self.trend_threshold = trend_threshold
        self.history: dict[str, list[tuple[datetime, float]]] = {}
    
    def add_value(
        self,
        metric_name: str,
        value: float,
        timestamp: Optional[datetime] = None
    ) -> Optional[Anomaly]:
        """Add value and check for trend anomaly."""
        
        timestamp = timestamp or datetime.now()
        
        if metric_name not in self.history:
            self.history[metric_name] = []
        
        self.history[metric_name].append((timestamp, value))
        
        # Trim history
        if len(self.history[metric_name]) > self.window_size * 2:
            self.history[metric_name] = self.history[metric_name][-self.window_size:]
        
        # Need enough history
        if len(self.history[metric_name]) < self.window_size:
            return None
        
        # Calculate trend
        recent = self.history[metric_name][-self.window_size:]
        values = [v for _, v in recent]
        
        # Simple linear regression
        x = np.arange(len(values))
        slope = np.polyfit(x, values, 1)[0]
        
        # Normalize slope by mean
        mean = np.mean(values)
        if mean != 0:
            normalized_slope = slope / mean
        else:
            normalized_slope = 0
        
        if abs(normalized_slope) > self.trend_threshold:
            direction = "increasing" if normalized_slope > 0 else "decreasing"
            
            return Anomaly(
                metric_name=metric_name,
                value=value,
                expected_range=(mean * 0.9, mean * 1.1),
                severity="medium",
                timestamp=timestamp,
                description=f"Metric is {direction} at {abs(normalized_slope)*100:.1f}% per window"
            )
        
        return None

class CompositeAnomalyDetector:
    """Combine multiple anomaly detectors."""
    
    def __init__(self):
        self.detectors: list[Any] = []
        self.anomalies: list[Anomaly] = []
    
    def add_detector(self, detector: Any):
        """Add a detector."""
        
        self.detectors.append(detector)
    
    def check(self, metric_name: str, value: float) -> list[Anomaly]:
        """Check all detectors."""
        
        anomalies = []
        
        for detector in self.detectors:
            if hasattr(detector, 'add_value'):
                anomaly = detector.add_value(metric_name, value)
            elif hasattr(detector, 'check'):
                anomaly = detector.check(metric_name, value)
            else:
                continue
            
            if anomaly:
                anomalies.append(anomaly)
                self.anomalies.append(anomaly)
        
        return anomalies
    
    def get_recent_anomalies(self, hours: int = 24) -> list[Anomaly]:
        """Get recent anomalies."""
        
        since = datetime.now() - timedelta(hours=hours)
        return [a for a in self.anomalies if a.timestamp >= since]
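
A short sketch of wiring the detectors together; the latency stream and thresholds are synthetic values chosen to force a detection.

detector = CompositeAnomalyDetector()
detector.add_detector(StatisticalAnomalyDetector(window_size=100, z_threshold=3.0))

thresholds = ThresholdAnomalyDetector()
thresholds.set_threshold("llm_latency_ms", warning=2000, critical=5000)
detector.add_detector(thresholds)

# Feed a stable baseline, then a spike that both detectors should flag
for latency in [800, 790, 815, 805, 798, 810, 802, 795, 808, 801, 6200]:
    for anomaly in detector.check("llm_latency_ms", float(latency)):
        print(anomaly.severity, anomaly.description)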

Alerting System

from dataclasses import dataclass, field
from typing import Any, Optional, List, Callable
from datetime import datetime, timedelta
from enum import Enum
import asyncio

class AlertSeverity(Enum):
    """Alert severity levels."""
    
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class Alert:
    """An alert."""
    
    alert_id: str
    title: str
    description: str
    severity: AlertSeverity
    metric_name: str
    value: float
    threshold: float
    timestamp: datetime = field(default_factory=datetime.now)
    acknowledged: bool = False
    resolved: bool = False

@dataclass
class AlertRule:
    """A rule for generating alerts."""
    
    name: str
    metric_name: str
    condition: str  # gt, lt, eq, ne
    threshold: float
    severity: AlertSeverity
    cooldown_minutes: int = 5
    last_triggered: Optional[datetime] = None

class AlertManager:
    """Manage alerts and notifications."""
    
    def __init__(self):
        self.rules: list[AlertRule] = []
        self.alerts: list[Alert] = []
        self.handlers: list[Callable] = []
    
    def add_rule(self, rule: AlertRule):
        """Add an alert rule."""
        
        self.rules.append(rule)
    
    def add_handler(self, handler: Callable):
        """Add alert handler."""
        
        self.handlers.append(handler)
    
    def check_metric(self, metric_name: str, value: float) -> list[Alert]:
        """Check metric against rules."""
        
        triggered_alerts = []
        
        for rule in self.rules:
            if rule.metric_name != metric_name:
                continue
            
            # Check cooldown
            if rule.last_triggered:
                cooldown_end = rule.last_triggered + timedelta(minutes=rule.cooldown_minutes)
                if datetime.now() < cooldown_end:
                    continue
            
            # Check condition
            triggered = False
            
            if rule.condition == "gt" and value > rule.threshold:
                triggered = True
            elif rule.condition == "lt" and value < rule.threshold:
                triggered = True
            elif rule.condition == "eq" and value == rule.threshold:
                triggered = True
            elif rule.condition == "ne" and value != rule.threshold:
                triggered = True
            
            if triggered:
                alert = Alert(
                    alert_id=f"alert_{len(self.alerts)}",
                    title=f"{rule.name} triggered",
                    description=f"{metric_name} is {value:.2f} (threshold: {rule.threshold})",
                    severity=rule.severity,
                    metric_name=metric_name,
                    value=value,
                    threshold=rule.threshold
                )
                
                self.alerts.append(alert)
                triggered_alerts.append(alert)
                rule.last_triggered = datetime.now()
                
                # Notify handlers
                for handler in self.handlers:
                    try:
                        handler(alert)
                    except Exception as e:
                        print(f"Handler error: {e}")
        
        return triggered_alerts
    
    def acknowledge(self, alert_id: str):
        """Acknowledge an alert."""
        
        for alert in self.alerts:
            if alert.alert_id == alert_id:
                alert.acknowledged = True
                break
    
    def resolve(self, alert_id: str):
        """Resolve an alert."""
        
        for alert in self.alerts:
            if alert.alert_id == alert_id:
                alert.resolved = True
                break
    
    def get_active_alerts(self) -> list[Alert]:
        """Get unresolved alerts."""
        
        return [a for a in self.alerts if not a.resolved]

class SlackNotifier:
    """Send alerts to Slack."""
    
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
    
    async def notify(self, alert: Alert):
        """Send alert to Slack."""
        
        import httpx
        
        color = {
            AlertSeverity.INFO: "#36a64f",
            AlertSeverity.WARNING: "#ffcc00",
            AlertSeverity.ERROR: "#ff6600",
            AlertSeverity.CRITICAL: "#ff0000"
        }.get(alert.severity, "#808080")
        
        payload = {
            "attachments": [{
                "color": color,
                "title": alert.title,
                "text": alert.description,
                "fields": [
                    {"title": "Metric", "value": alert.metric_name, "short": True},
                    {"title": "Value", "value": f"{alert.value:.2f}", "short": True},
                    {"title": "Threshold", "value": f"{alert.threshold:.2f}", "short": True},
                    {"title": "Severity", "value": alert.severity.value, "short": True}
                ],
                "ts": int(alert.timestamp.timestamp())
            }]
        }
        
        async with httpx.AsyncClient() as client:
            await client.post(self.webhook_url, json=payload)

class PagerDutyNotifier:
    """Send alerts to PagerDuty."""
    
    def __init__(self, routing_key: str):
        self.routing_key = routing_key
    
    async def notify(self, alert: Alert):
        """Send alert to PagerDuty."""
        
        import httpx
        
        severity = {
            AlertSeverity.INFO: "info",
            AlertSeverity.WARNING: "warning",
            AlertSeverity.ERROR: "error",
            AlertSeverity.CRITICAL: "critical"
        }.get(alert.severity, "info")
        
        payload = {
            "routing_key": self.routing_key,
            "event_action": "trigger",
            "dedup_key": alert.alert_id,
            "payload": {
                "summary": alert.title,
                "severity": severity,
                "source": "llm-monitoring",
                "custom_details": {
                    "metric": alert.metric_name,
                    "value": alert.value,
                    "threshold": alert.threshold,
                    "description": alert.description
                }
            }
        }
        
        async with httpx.AsyncClient() as client:
            await client.post(
                "https://events.pagerduty.com/v2/enqueue",
                json=payload
            )

class EmailNotifier:
    """Send alerts via email."""
    
    def __init__(self, smtp_config: dict, recipients: list[str]):
        self.smtp_config = smtp_config
        self.recipients = recipients
    
    async def notify(self, alert: Alert):
        """Send alert via email."""
        
        import smtplib
        from email.mime.text import MIMEText
        
        subject = f"[{alert.severity.value.upper()}] {alert.title}"
        
        body = f"""
Alert: {alert.title}
Severity: {alert.severity.value}
Metric: {alert.metric_name}
Value: {alert.value:.2f}
Threshold: {alert.threshold:.2f}
Time: {alert.timestamp.isoformat()}

{alert.description}
"""
        
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = self.smtp_config.get("from", "alerts@example.com")
        msg["To"] = ", ".join(self.recipients)
        
        with smtplib.SMTP(
            self.smtp_config["host"],
            self.smtp_config.get("port", 587)
        ) as server:
            server.starttls()
            server.login(
                self.smtp_config["username"],
                self.smtp_config["password"]
            )
            server.send_message(msg)
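
A minimal sketch of the alert flow: one rule, one handler, and one metric value that crosses the threshold. The rule settings are placeholders; note that the manager calls handlers synchronously, so async notifiers such as SlackNotifier would need to be scheduled on an event loop inside the handler.

manager = AlertManager()

manager.add_rule(AlertRule(
    name="High latency",
    metric_name="llm_latency_ms",
    condition="gt",
    threshold=5000,
    severity=AlertSeverity.ERROR,
    cooldown_minutes=10
))

# A handler is any callable that accepts an Alert
manager.add_handler(lambda alert: print(f"[{alert.severity.value}] {alert.description}"))

triggered = manager.check_metric("llm_latency_ms", 7200.0)
print(len(triggered), "alert(s) triggered")
print(len(manager.get_active_alerts()), "active alert(s)")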

Production Monitoring Service

from fastapi import FastAPI
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class MetricRequest(BaseModel):
    name: str
    value: float
    tags: Optional[dict] = None

class AlertRuleRequest(BaseModel):
    name: str
    metric_name: str
    condition: str
    threshold: float
    severity: str

class DashboardData(BaseModel):
    latency_avg: float
    latency_p95: float
    tokens_total: int
    cost_total: float
    error_rate: float
    quality_avg: float
    active_alerts: int

# Initialize components
collector = MetricsCollector()
aggregator = MetricsAggregator(collector)
alert_manager = AlertManager()
anomaly_detector = CompositeAnomalyDetector()

# Add default detectors
anomaly_detector.add_detector(StatisticalAnomalyDetector())
threshold_detector = ThresholdAnomalyDetector()
threshold_detector.set_threshold("llm_latency_ms", warning=2000, critical=5000)
threshold_detector.set_threshold("llm_error", warning=0.05, critical=0.1)
anomaly_detector.add_detector(threshold_detector)

# Add default alert rules
alert_manager.add_rule(AlertRule(
    name="High Latency",
    metric_name="llm_latency_ms",
    condition="gt",
    threshold=5000,
    severity=AlertSeverity.ERROR
))

alert_manager.add_rule(AlertRule(
    name="High Error Rate",
    metric_name="llm_error_rate",
    condition="gt",
    threshold=0.1,
    severity=AlertSeverity.CRITICAL
))

@app.post("/v1/metrics")
async def record_metric(request: MetricRequest) -> dict:
    """Record a metric."""
    
    # Infer the metric type from the metric name, defaulting to LATENCY
    metric_type = next(
        (mt for mt in MetricType if mt.value in request.name.lower()),
        MetricType.LATENCY
    )
    
    metric = LLMMetric(
        name=request.name,
        value=request.value,
        metric_type=metric_type,
        tags=request.tags or {}
    )
    
    collector.metrics.append(metric)
    
    # Check for anomalies
    anomalies = anomaly_detector.check(request.name, request.value)
    
    # Check alert rules
    alerts = alert_manager.check_metric(request.name, request.value)
    
    return {
        "recorded": True,
        "anomalies": len(anomalies),
        "alerts": len(alerts)
    }

@app.get("/v1/metrics/summary")
async def get_summary(window_minutes: int = 60) -> dict:
    """Get metrics summary."""
    
    return aggregator.get_summary(window_minutes)

@app.get("/v1/dashboard")
async def get_dashboard() -> DashboardData:
    """Get dashboard data."""
    
    summary = aggregator.get_summary(60)
    
    return DashboardData(
        latency_avg=summary.get("latency_avg_ms", 0),
        latency_p95=summary.get("latency_p95_ms", 0),
        tokens_total=int(summary.get("tokens_total", 0)),
        cost_total=summary.get("cost_total_usd", 0),
        error_rate=summary.get("error_count", 0) / max(summary.get("request_count", 1), 1),
        quality_avg=summary.get("quality_avg", 0),
        active_alerts=len(alert_manager.get_active_alerts())
    )

@app.get("/v1/alerts")
async def get_alerts(active_only: bool = True) -> list[dict]:
    """Get alerts."""
    
    if active_only:
        alerts = alert_manager.get_active_alerts()
    else:
        alerts = alert_manager.alerts
    
    return [
        {
            "alert_id": a.alert_id,
            "title": a.title,
            "severity": a.severity.value,
            "metric": a.metric_name,
            "value": a.value,
            "timestamp": a.timestamp.isoformat(),
            "acknowledged": a.acknowledged,
            "resolved": a.resolved
        }
        for a in alerts
    ]

@app.post("/v1/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str) -> dict:
    """Acknowledge an alert."""
    
    alert_manager.acknowledge(alert_id)
    return {"acknowledged": True}

@app.post("/v1/alerts/{alert_id}/resolve")
async def resolve_alert(alert_id: str) -> dict:
    """Resolve an alert."""
    
    alert_manager.resolve(alert_id)
    return {"resolved": True}

@app.post("/v1/rules")
async def add_rule(request: AlertRuleRequest) -> dict:
    """Add alert rule."""
    
    rule = AlertRule(
        name=request.name,
        metric_name=request.metric_name,
        condition=request.condition,
        threshold=request.threshold,
        severity=AlertSeverity(request.severity)
    )
    
    alert_manager.add_rule(rule)
    return {"added": True}

@app.get("/v1/anomalies")
async def get_anomalies(hours: int = 24) -> list[dict]:
    """Get recent anomalies."""
    
    anomalies = anomaly_detector.get_recent_anomalies(hours)
    
    return [
        {
            "metric": a.metric_name,
            "value": a.value,
            "expected_range": a.expected_range,
            "severity": a.severity,
            "timestamp": a.timestamp.isoformat(),
            "description": a.description
        }
        for a in anomalies
    ]

@app.get("/metrics")
async def prometheus_metrics() -> str:
    """Export Prometheus metrics."""
    
    exporter = PrometheusExporter(collector)
    return exporter.export()

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

LLM monitoring requires tracking metrics that traditional software monitoring doesn't capture—token usage, output quality, and cost alongside latency and errors. Start with the basics: latency percentiles (p50, p95, p99), token counts, error rates, and cost per request. Add quality monitoring using automated scoring for relevance, coherence, and safety; LLM-as-judge provides deeper quality insights but adds cost and latency.

Anomaly detection should combine statistical methods (z-score for sudden spikes) with threshold-based rules (hard limits on latency and cost) and trend detection (gradual degradation over time). Alert fatigue is real—tune your thresholds carefully and use cooldown periods to avoid notification storms. Build dashboards that show both real-time metrics and trends over time; a slowly increasing error rate is as important as a sudden spike. Export metrics to Prometheus for integration with existing monitoring infrastructure.

The key insight is that LLM failures are often subtle—output quality degrades before errors appear, and costs creep up before budgets are exceeded. Proactive monitoring catches these issues early, before they impact users or budgets.