Observability Practices in AI Engineering: A Complete Guide to LLM Monitoring

Here’s a truth that took me six months to learn the hard way: traditional observability doesn’t work for AI applications. You can have perfect uptime, sub-100ms latency, and zero errors—and your AI product can still be completely broken.

Why? Because LLMs fail differently. They don’t throw errors; they confidently hallucinate. They don’t crash; they drift. They don’t timeout; they slowly become irrelevant as your data changes.

This guide covers everything you need to know about observability for AI applications: the metrics that matter, the tools available, implementation patterns, and the hard-won lessons from running LLM applications in production.

What you’ll learn: How to build comprehensive observability for AI applications, compare leading tools, track the right metrics, and implement production-grade monitoring from day one.

Why AI Observability is Different

Traditional observability focuses on three pillars: logs, metrics, and traces. For AI applications, we need to add three more:

  • Prompt tracking – What you asked the model
  • Output quality – How good the response was
  • Cost attribution – Who or what is spending the money

Figure 1: The complete LLM observability stack – from application to alerting
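
To make those three additions concrete, here is a minimal sketch of the record you might log for every LLM call. The field names are illustrative, not tied to any particular tool:

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class LLMTraceRecord:
    """One enriched trace: the traditional fields plus the three LLM-specific additions."""
    # Traditional observability
    trace_id: str
    timestamp: datetime
    latency_ms: float
    error: str = None
    # Prompt tracking: what you asked the model
    model: str = ""
    prompt: list = field(default_factory=list)           # full message list sent to the API
    completion: str = ""
    # Output quality: filled in later by evaluators or user feedback
    quality_scores: dict = field(default_factory=dict)   # e.g. {"relevance": 0.92}
    # Cost attribution: who/what is spending the money
    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0
    user_id: str = None
    feature: str = None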

The Metrics That Actually Matter

After a year of running LLM applications in production, these are the metrics I check every single day:

Figure 2: The key metrics for production LLM observability

1. Performance Metrics

Metric | What It Measures | Target | Why It Matters
Latency (P50/P95/P99) | End-to-end response time | P95 < 3s | User experience
Time to First Token (TTFT) | How fast streaming starts | < 500ms | Perceived responsiveness
Tokens per Second | Streaming throughput | > 30 tok/s | Reading speed match
Queue Time | Wait before processing | < 100ms | Capacity planning
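
The latency targets above are percentiles, not averages. Here is a quick sketch of computing them from raw per-request measurements in pure Python; in practice your metrics backend does this for you, and the sample values are made up:

import math

def percentile(values: list, pct: float) -> float:
    """Nearest-rank percentile, e.g. pct=95 for P95."""
    ordered = sorted(values)
    rank = math.ceil(pct / 100 * len(ordered))
    return ordered[max(0, rank - 1)]

# Hypothetical per-request measurements collected by your middleware
latencies_s = [1.2, 0.9, 2.8, 3.4, 1.1, 0.7, 2.2]
ttft_s = [0.3, 0.4, 0.6, 0.2, 0.5, 0.3, 0.4]

print(f"P50 latency: {percentile(latencies_s, 50):.2f}s")      # want the median well under 3s
print(f"P95 latency: {percentile(latencies_s, 95):.2f}s")      # target: < 3s
print(f"P95 TTFT:    {percentile(ttft_s, 95) * 1000:.0f}ms")   # target: < 500ms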

2. Cost Metrics

Metric | What It Measures | Action Threshold
Cost per Request | Average $/request | > 2x baseline
Daily/Monthly Spend | Total API costs | > budget
Token Ratio | Input vs output tokens | > 10:1 (investigate prompts)
Cost per User | Attribution by user | Outliers > 5x avg
Cost per Feature | Which features cost most | ROI analysis
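
The token-ratio and cost-per-request thresholds are easy to check from the same usage log. A small sketch, assuming each log entry carries token counts and a pre-computed cost:

def flag_cost_anomalies(usage_log: list, baseline_cost: float) -> list:
    """Flag requests whose cost or input:output token ratio crosses the thresholds above.

    Each entry is assumed to look like:
    {"request_id": "...", "input_tokens": 1200, "output_tokens": 80, "cost": 0.014}
    """
    warnings = []
    for entry in usage_log:
        ratio = entry["input_tokens"] / max(entry["output_tokens"], 1)
        if ratio > 10:
            warnings.append(f"{entry['request_id']}: token ratio {ratio:.0f}:1 - investigate prompt bloat")
        if entry["cost"] > 2 * baseline_cost:
            warnings.append(f"{entry['request_id']}: cost ${entry['cost']:.4f} exceeds 2x baseline")
    return warnings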

3. Quality Metrics

Metric | How to Measure | Target
Relevance Score | LLM-as-judge or embedding similarity | > 0.85
Faithfulness | Does output match source docs (RAG) | > 0.90
User Feedback | Thumbs up/down, ratings | > 80% positive
Task Completion | Did user achieve their goal | > 70%
Hallucination Rate | Factual errors detected | < 5%
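
For the relevance score, the cheaper alternative to an LLM judge is embedding similarity. A rough sketch using OpenAI embeddings — the model choice is an assumption, and cosine similarity is only a proxy, not ground truth:

import math
from openai import OpenAI

client = OpenAI()

def cosine(a: list, b: list) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def embedding_relevance(question: str, answer: str) -> float:
    """Proxy relevance score: cosine similarity between question and answer embeddings."""
    resp = client.embeddings.create(
        model="text-embedding-3-small",
        input=[question, answer],
    )
    return cosine(resp.data[0].embedding, resp.data[1].embedding)

# Alert if the rolling average drops below the 0.85 target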

4. Operational Metrics

Metric | What It Measures | Alert Threshold
Error Rate | API failures, timeouts | > 1%
Rate Limit Hits | 429 responses | > 0.5%
Cache Hit Rate | Semantic cache effectiveness | < 20% (investigate)
Model Distribution | Which models are being used | Unexpected changes
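
These operational rates fall out of simple counters over per-request outcomes. A minimal sketch, with outcome labels that are purely illustrative:

from collections import Counter

def operational_rates(outcomes: list) -> dict:
    """Derive error/rate-limit/cache rates from per-request outcome labels.

    Each outcome is assumed to be one of: "ok", "error", "rate_limited", "cache_hit".
    """
    counts = Counter(outcomes)
    total = max(len(outcomes), 1)
    return {
        "error_rate": counts["error"] / total,               # alert if > 1%
        "rate_limit_rate": counts["rate_limited"] / total,   # alert if > 0.5%
        "cache_hit_rate": counts["cache_hit"] / total,       # investigate if < 20%
    }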

Observability Tools Comparison

The LLM observability space has exploded in 2024-2025. Here’s an honest comparison of the leading tools:

Figure 3: Comparing the leading LLM observability tools

Langfuse: The Open Source Leader

Best for: Teams wanting self-hosting, full control, and comprehensive features.

from langfuse.decorators import observe, langfuse_context
from langfuse.openai import OpenAI  # drop-in wrapper that auto-logs model, tokens, and latency

client = OpenAI()

@observe()
def retrieval_step(query: str) -> list:
    """Retrieval traced as its own span."""
    docs = retrieve_documents(query)  # your vector-store lookup
    langfuse_context.update_current_observation(
        output={"doc_count": len(docs)},
        metadata={"retriever": "pinecone"},
    )
    return docs

@observe()
def rag_pipeline(query: str) -> str:
    """Complete RAG pipeline with full observability."""
    docs = retrieval_step(query)

    # Generation step: the langfuse.openai wrapper records the prompt,
    # completion, and token usage on this trace automatically
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[
            {"role": "system", "content": f"Context: {docs}"},
            {"role": "user", "content": query},
        ],
    )
    result = response.choices[0].message.content

    # Placeholder score on the trace; updated when the user actually gives feedback
    langfuse_context.score_current_trace(
        name="user-feedback",
        value=1,
        comment="Pending user feedback",
    )
    return result

# Usage
answer = rag_pipeline("What is our refund policy?")

Langfuse Evaluation Framework

from langfuse import Langfuse
from openai import OpenAI

langfuse = Langfuse()
client = OpenAI()

# Create evaluation dataset
langfuse.create_dataset(name="qa-evaluation-v1")

# Add test cases
langfuse.create_dataset_item(
    dataset_name="qa-evaluation-v1",
    input={"query": "What is the return policy?"},
    expected_output="30-day money-back guarantee",
    metadata={"category": "policy"},
)

# LLM-as-judge for relevance
def judge_relevance(query: str, expected: str, actual: str) -> float:
    judge_response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{
            "role": "user",
            "content": f"""Rate the relevance of this answer (0-1):
            Question: {query}
            Expected: {expected}
            Actual: {actual}

            Return only a number between 0 and 1."""
        }],
    )
    return float(judge_response.choices[0].message.content.strip())

# Run the evaluation against the dataset
dataset = langfuse.get_dataset("qa-evaluation-v1")
for item in dataset.items:
    # item.observe() links the rag_pipeline trace to this dataset run
    with item.observe(run_name="qa-eval-run-1") as trace_id:
        output = rag_pipeline(item.input["query"])

    langfuse.score(
        trace_id=trace_id,
        name="relevance",
        value=judge_relevance(item.input["query"], item.expected_output, output),
    )

Helicone: The Simplest Setup

Best for: Quick integration with minimal code changes.

# Just change your base URL - that's it!
import os

from openai import OpenAI

HELICONE_API_KEY = os.environ["HELICONE_API_KEY"]

client = OpenAI(
    base_url="https://oai.helicone.ai/v1",
    default_headers={
        "Helicone-Auth": f"Bearer {HELICONE_API_KEY}",
        "Helicone-User-Id": "user_123",  # Track by user
        "Helicone-Property-Feature": "chat",  # Custom property
        "Helicone-Cache-Enabled": "true",  # Enable caching
    }
)

# Use OpenAI as normal - Helicone proxies and logs everything
response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "Hello!"}]
)

Helicone Custom Properties for Analysis

def chat_with_tracking(user_id: str, feature: str, message: str):
    """Chat with full Helicone tracking."""
    return client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": message}],
        extra_headers={
            "Helicone-User-Id": user_id,
            "Helicone-Property-Feature": feature,
            "Helicone-Property-Environment": "production",
            "Helicone-Property-Version": "v2.1.0",
        }
    )

# Now you can filter and analyze by any property in the dashboard

LangSmith: LangChain Native

Best for: Teams heavily invested in LangChain.

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-ai-app"

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Everything is automatically traced
llm = ChatOpenAI(model="gpt-4-turbo")

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}")
])

chain = prompt | llm | StrOutputParser()

# This trace appears in LangSmith automatically
result = chain.invoke({"input": "What is machine learning?"})

LangSmith Evaluation

from langsmith import Client
from langsmith.evaluation import evaluate

client = Client()

# Create a dataset
dataset = client.create_dataset("qa-eval")
client.create_examples(
    inputs=[{"question": "What is ML?"}],
    outputs=[{"answer": "Machine learning is..."}],
    dataset_id=dataset.id
)

# Define evaluator
def relevance_evaluator(run, example):
    prediction = run.outputs["output"]
    reference = example.outputs["answer"]
    
    # Your evaluation logic
    score = calculate_similarity(prediction, reference)
    return {"score": score, "key": "relevance"}

# Run evaluation
results = evaluate(
    lambda x: chain.invoke(x),
    data=dataset.name,
    evaluators=[relevance_evaluator],
)

Arize Phoenix: ML + LLM Unified

Best for: Teams with both traditional ML and LLM workloads.

import phoenix as px
from phoenix.trace.openai import OpenAIInstrumentor
from openai import OpenAI

# Start Phoenix locally
session = px.launch_app()

# Instrument OpenAI
OpenAIInstrumentor().instrument()

client = OpenAI()

# All calls are now traced
response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "Explain quantum computing"}]
)

# View traces at http://localhost:6006
print(f"Phoenix UI: {session.url}")

Phoenix Embedding Analysis

import phoenix as px
import pandas as pd

# Analyze embedding drift
embeddings_df = pd.DataFrame({
    "embedding": embeddings_list,  # list of embedding vectors
    "text": texts,
    "timestamp": timestamps,
})

# Tell Phoenix which columns hold the embedding vector, raw text, and timestamp
schema = px.Schema(
    timestamp_column_name="timestamp",
    embedding_feature_column_names={
        "text_embedding": px.EmbeddingColumnNames(
            vector_column_name="embedding",
            raw_data_column_name="text",
        ),
    },
)

# Launch with embedding analysis; the UI visualizes embedding clusters and
# drift over time (older Phoenix releases name this px.Dataset rather than px.Inferences)
inferences = px.Inferences(dataframe=embeddings_df, schema=schema, name="production")
session = px.launch_app(primary=inferences)

OpenLLMetry: OpenTelemetry Native

Best for: Teams with existing OTel infrastructure.

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task
from openai import OpenAI

# Initialize with any OTel-compatible backend
Traceloop.init(
    app_name="my-ai-app",
    api_endpoint="https://your-otel-collector:4318"
)

client = OpenAI()

@workflow(name="document_qa")
def answer_question(question: str) -> str:
    
    @task(name="retrieve_context")
    def retrieve(q: str):
        return search_documents(q)
    
    @task(name="generate_answer")
    def generate(q: str, context: str):
        response = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[
                {"role": "system", "content": f"Context: {context}"},
                {"role": "user", "content": q}
            ]
        )
        return response.choices[0].message.content
    
    context = retrieve(question)
    return generate(question, context)

# Traces go to your OTel backend (Jaeger, Grafana, Datadog, etc.)

Implementing Evaluation Pipelines

Quality evaluation is the most underinvested area in AI engineering. Here’s how to build a robust evaluation pipeline:

LLM-as-Judge Pattern

import json

from openai import OpenAI
from pydantic import BaseModel

client = OpenAI()

class EvaluationResult(BaseModel):
    relevance: float  # 0-1
    faithfulness: float  # 0-1
    coherence: float  # 0-1
    reasoning: str

def evaluate_response(
    question: str,
    answer: str,
    context: str = None
) -> EvaluationResult:
    """Use GPT-4 as a judge to evaluate response quality."""
    
    eval_prompt = f"""Evaluate this AI response on three dimensions.
    
Question: {question}
Answer: {answer}
{"Context (for RAG): " + context if context else ""}

Rate each dimension from 0 to 1:
1. Relevance: Does the answer address the question?
2. Faithfulness: Is the answer grounded in the context (if provided)?
3. Coherence: Is the answer well-structured and clear?

Respond in JSON format:
{{"relevance": 0.0-1.0, "faithfulness": 0.0-1.0, "coherence": 0.0-1.0, "reasoning": "..."}}"""

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": eval_prompt}],
        response_format={"type": "json_object"}
    )
    
    result = json.loads(response.choices[0].message.content)
    return EvaluationResult(**result)

# Usage
eval_result = evaluate_response(
    question="What is the refund policy?",
    answer="We offer a 30-day money-back guarantee on all products.",
    context="Our refund policy allows returns within 30 days for a full refund."
)

print(f"Relevance: {eval_result.relevance}")
print(f"Faithfulness: {eval_result.faithfulness}")
print(f"Reasoning: {eval_result.reasoning}")

Automated Evaluation Pipeline

import asyncio
from dataclasses import dataclass
from typing import List, Callable

@dataclass
class TestCase:
    id: str
    input: dict
    expected_output: str = None
    metadata: dict = None

@dataclass
class EvalResult:
    test_id: str
    output: str
    scores: dict
    latency_ms: float

class EvaluationPipeline:
    def __init__(self, evaluators: List[Callable]):
        self.evaluators = evaluators
    
    async def evaluate_single(
        self,
        test_case: TestCase,
        model_fn: Callable
    ) -> EvalResult:
        import time
        
        start = time.time()
        output = await model_fn(test_case.input)
        latency_ms = (time.time() - start) * 1000
        
        scores = {}
        for evaluator in self.evaluators:
            name = evaluator.__name__
            scores[name] = await evaluator(
                test_case.input,
                output,
                test_case.expected_output
            )
        
        return EvalResult(
            test_id=test_case.id,
            output=output,
            scores=scores,
            latency_ms=latency_ms
        )
    
    async def run(
        self,
        test_cases: List[TestCase],
        model_fn: Callable,
        concurrency: int = 5
    ) -> List[EvalResult]:
        semaphore = asyncio.Semaphore(concurrency)
        
        async def bounded_eval(tc):
            async with semaphore:
                return await self.evaluate_single(tc, model_fn)
        
        results = await asyncio.gather(
            *[bounded_eval(tc) for tc in test_cases]
        )
        
        return results
    
    def summary(self, results: List[EvalResult]) -> dict:
        """Generate evaluation summary."""
        all_scores = {}
        for result in results:
            for name, score in result.scores.items():
                if name not in all_scores:
                    all_scores[name] = []
                all_scores[name].append(score)
        
        summary = {
            "total_tests": len(results),
            "avg_latency_ms": sum(r.latency_ms for r in results) / len(results),
            "scores": {
                name: {
                    "mean": sum(scores) / len(scores),
                    "min": min(scores),
                    "max": max(scores)
                }
                for name, scores in all_scores.items()
            }
        }
        
        return summary

# Evaluator functions
async def relevance_eval(input, output, expected):
    # evaluate_response (defined above) is synchronous, so run it in a
    # worker thread instead of awaiting it directly
    result = await asyncio.to_thread(
        evaluate_response, input["query"], output, input.get("context")
    )
    return result.relevance

async def length_eval(input, output, expected):
    # Simple length check
    return 1.0 if 50 < len(output) < 500 else 0.5

# Run evaluation
pipeline = EvaluationPipeline([relevance_eval, length_eval])
results = asyncio.run(pipeline.run(test_cases, my_model_fn))
print(pipeline.summary(results))
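
The test_cases and my_model_fn in that last snippet are whatever fits your application; here is a hypothetical example wiring them up to the rag_pipeline from earlier, wrapped so the async pipeline can call it:

# Hypothetical test fixtures for the pipeline above
test_cases = [
    TestCase(
        id="refund-policy",
        input={"query": "What is the refund policy?"},
        expected_output="30-day money-back guarantee",
    ),
    TestCase(
        id="shipping-time",
        input={"query": "How long does shipping take?"},
    ),
]

async def my_model_fn(input: dict) -> str:
    """Run the synchronous rag_pipeline in a worker thread so evaluation stays async."""
    return await asyncio.to_thread(rag_pipeline, input["query"])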

Cost Tracking and Optimization

Cost is the silent killer of AI projects. Here's how to track and optimize it:

from datetime import datetime
from typing import Dict
import json

# Pricing per 1M tokens (as of Oct 2025)
MODEL_PRICING = {
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    "claude-3-haiku": {"input": 0.25, "output": 1.25},
}

class CostTracker:
    """Track and analyze LLM costs."""
    
    def __init__(self):
        self.usage_log = []
    
    def log_usage(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        user_id: str = None,
        feature: str = None
    ):
        pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
        
        cost = (
            (input_tokens / 1_000_000) * pricing["input"] +
            (output_tokens / 1_000_000) * pricing["output"]
        )
        
        self.usage_log.append({
            "timestamp": datetime.now().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "user_id": user_id,
            "feature": feature
        })
        
        return cost
    
    def daily_summary(self) -> Dict:
        """Get daily cost summary."""
        today = datetime.now().date().isoformat()
        today_logs = [
            l for l in self.usage_log 
            if l["timestamp"].startswith(today)
        ]
        
        total_cost = sum(l["cost"] for l in today_logs)
        by_model = {}
        by_feature = {}
        by_user = {}
        
        for log in today_logs:
            # By model
            model = log["model"]
            by_model[model] = by_model.get(model, 0) + log["cost"]
            
            # By feature (fall back to "unknown" when feature was logged as None)
            feature = log["feature"] or "unknown"
            by_feature[feature] = by_feature.get(feature, 0) + log["cost"]
            
            # By user (fall back to "anonymous" when user_id was logged as None)
            user = log["user_id"] or "anonymous"
            by_user[user] = by_user.get(user, 0) + log["cost"]
        
        return {
            "date": today,
            "total_cost": round(total_cost, 4),
            "request_count": len(today_logs),
            "avg_cost_per_request": round(total_cost / len(today_logs), 6) if today_logs else 0,
            "by_model": by_model,
            "by_feature": by_feature,
            "top_users": dict(sorted(by_user.items(), key=lambda x: -x[1])[:10])
        }
    
    def set_alert(self, daily_budget: float, callback):
        """Alert when daily budget exceeded."""
        summary = self.daily_summary()
        if summary["total_cost"] > daily_budget:
            callback(f"Budget exceeded: ${summary['total_cost']:.2f} > ${daily_budget}")

# Usage
tracker = CostTracker()

# After each LLM call
cost = tracker.log_usage(
    model="gpt-4-turbo",
    input_tokens=response.usage.prompt_tokens,
    output_tokens=response.usage.completion_tokens,
    user_id="user_123",
    feature="document-qa"
)

# Daily report
print(json.dumps(tracker.daily_summary(), indent=2))
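
Rather than calling log_usage by hand after every request, a thin wrapper around the client keeps attribution in one place. A sketch that assumes the OpenAI client and the tracker defined above:

def tracked_chat(user_id: str, feature: str, **create_kwargs):
    """Call the chat API and log token usage/cost for attribution in one place."""
    response = client.chat.completions.create(**create_kwargs)
    tracker.log_usage(
        model=create_kwargs["model"],
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        user_id=user_id,
        feature=feature,
    )
    return response

# Usage
response = tracked_chat(
    user_id="user_123",
    feature="document-qa",
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Summarize our refund policy."}],
)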

Production Alerting Strategy

Set up alerts that actually matter:

from dataclasses import dataclass
from enum import Enum
from typing import Callable, List
import asyncio

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    name: str
    severity: AlertSeverity
    message: str
    value: float
    threshold: float

class AlertManager:
    def __init__(self, notification_handler: Callable):
        self.rules = []
        self.notify = notification_handler
    
    def add_rule(
        self,
        name: str,
        metric_fn: Callable,
        threshold: float,
        comparison: str,  # "gt", "lt", "eq"
        severity: AlertSeverity
    ):
        self.rules.append({
            "name": name,
            "metric_fn": metric_fn,
            "threshold": threshold,
            "comparison": comparison,
            "severity": severity
        })
    
    async def check_all(self) -> List[Alert]:
        alerts = []
        
        for rule in self.rules:
            value = await rule["metric_fn"]()
            triggered = False
            
            if rule["comparison"] == "gt" and value > rule["threshold"]:
                triggered = True
            elif rule["comparison"] == "lt" and value < rule["threshold"]:
                triggered = True
            elif rule["comparison"] == "eq" and value == rule["threshold"]:
                triggered = True
            
            if triggered:
                symbol = {"gt": ">", "lt": "<", "eq": "=="}[rule["comparison"]]
                alert = Alert(
                    name=rule["name"],
                    severity=rule["severity"],
                    message=f"{rule['name']}: {value} {symbol} {rule['threshold']}",
                    value=value,
                    threshold=rule["threshold"]
                )
                alerts.append(alert)
                await self.notify(alert)
        
        return alerts

# Example metric functions
async def get_error_rate():
    # Query your metrics store
    return 0.02  # 2%

async def get_daily_cost():
    return tracker.daily_summary()["total_cost"]

async def get_avg_latency():
    # Query your metrics store
    return 2.5  # seconds

async def get_quality_score():
    # Query your evaluation results
    return 0.82

# Notification handler
async def send_to_slack(alert: Alert):
    # Your Slack webhook logic
    print(f"[{alert.severity.value.upper()}] {alert.message}")

# Setup
alert_manager = AlertManager(send_to_slack)

alert_manager.add_rule(
    name="High Error Rate",
    metric_fn=get_error_rate,
    threshold=0.01,
    comparison="gt",
    severity=AlertSeverity.CRITICAL
)

alert_manager.add_rule(
    name="Daily Budget Exceeded",
    metric_fn=get_daily_cost,
    threshold=200.0,
    comparison="gt",
    severity=AlertSeverity.WARNING
)

alert_manager.add_rule(
    name="Quality Degradation",
    metric_fn=get_quality_score,
    threshold=0.80,
    comparison="lt",
    severity=AlertSeverity.WARNING
)

alert_manager.add_rule(
    name="High Latency",
    metric_fn=get_avg_latency,
    threshold=3.0,
    comparison="gt",
    severity=AlertSeverity.WARNING
)

# Run checks (in production, run on a schedule)
asyncio.run(alert_manager.check_all())
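
In production these checks run on a schedule rather than once; a minimal asyncio loop is enough (the five-minute interval is arbitrary):

async def run_alert_loop(interval_seconds: int = 300):
    """Evaluate every alert rule on a fixed interval, forever."""
    while True:
        try:
            await alert_manager.check_all()
        except Exception as exc:
            # A broken metric query should never kill the monitoring loop
            print(f"Alert check failed: {exc}")
        await asyncio.sleep(interval_seconds)

# asyncio.run(run_alert_loop())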

My Recommended Stack

After evaluating all options, here's what I recommend for different team sizes:

Solo Developer / Small Team (<5 engineers)

  • Primary: Helicone (quick setup, good free tier)
  • Evaluation: Manual + simple LLM-as-judge
  • Cost: ~$0-50/month

Growing Startup (5-20 engineers)

  • Primary: Langfuse (self-hosted or cloud)
  • Evaluation: Langfuse evals + custom pipeline
  • Integration: OpenTelemetry for unified observability
  • Cost: ~$100-500/month

Enterprise (20+ engineers)

  • Primary: Langfuse self-hosted or Arize Phoenix
  • Evaluation: Full evaluation pipeline with datasets
  • Integration: OpenLLMetry → existing OTel infrastructure
  • Alerting: Integration with PagerDuty/OpsGenie
  • Cost: Variable based on scale

Key Takeaways

  • AI observability ≠ traditional observability - Add prompts, quality, and cost tracking
  • Quality metrics are non-negotiable - Use LLM-as-judge patterns for automated evaluation
  • Cost visibility from day one - Track by user, feature, and model
  • Langfuse is the current leader - Open source, full-featured, self-hostable
  • Helicone for quick starts - One-line integration, great for prototypes
  • Build evaluation pipelines early - You can't improve what you don't measure
  • Alert on quality, not just errors - LLMs fail silently

Observability is what separates hobby AI projects from production systems. Start with the basics—tracing and cost tracking—then build up to quality evaluation and alerting. The tools are mature enough now that there's no excuse for flying blind.

Building observable AI systems? I'd love to hear about your setup. Connect on LinkedIn or drop a comment below.

