Here’s a truth that took me six months to learn the hard way: traditional observability doesn’t work for AI applications. You can have perfect uptime, sub-100ms latency, and zero errors—and your AI product can still be completely broken.
Why? Because LLMs fail differently. They don’t throw errors; they confidently hallucinate. They don’t crash; they drift. They don’t timeout; they slowly become irrelevant as your data changes.
This guide covers everything you need to know about observability for AI applications: the metrics that matter, the tools available, implementation patterns, and the hard-won lessons from running LLM applications in production.
What you’ll learn: How to build comprehensive observability for AI applications, compare leading tools, track the right metrics, and implement production-grade monitoring from day one.
Why AI Observability is Different
Traditional observability focuses on three pillars: logs, metrics, and traces. For AI applications, we need to add three more:
- Prompt tracking – What you asked the model
- Output quality – How good the response was
- Cost attribution – Who and what is spending money
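To make those three additions concrete, here is a minimal sketch of what a single LLM call's trace record might carry next to the usual logs, metrics, and traces. The `LLMTraceRecord` shape and its field names are illustrative, not taken from any particular tool:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

@dataclass
class LLMTraceRecord:
    """Illustrative shape of one LLM call's observability record (field names are hypothetical)."""
    # Traditional pillars: correlate with your existing logs, metrics, and traces
    trace_id: str
    latency_ms: float
    error: Optional[str] = None
    # Prompt tracking: what you asked the model and what it answered
    prompt: str = ""
    completion: str = ""
    # Output quality: scores attached after evaluation, e.g. {"relevance": 0.9}
    quality_scores: dict = field(default_factory=dict)
    # Cost attribution: who and what is spending the money
    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0
    user_id: Optional[str] = None
    feature: Optional[str] = None
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
```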
The Metrics That Actually Matter
After running LLM applications for over a year, here are the metrics I check every single day:
1. Performance Metrics
| Metric | What It Measures | Target | Why It Matters |
|---|---|---|---|
| Latency (P50/P95/P99) | End-to-end response time | P95 < 3s | User experience |
| Time to First Token (TTFT) | How fast streaming starts | < 500ms | Perceived responsiveness |
| Tokens per Second | Streaming throughput | > 30 tok/s | Reading speed match |
| Queue Time | Wait before processing | < 100ms | Capacity planning |
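TTFT and tokens per second are easy to measure yourself if your provider streams. Below is a rough sketch against the OpenAI streaming API; `measure_streaming_metrics` is a made-up helper, and it approximates token count by counting streamed chunks:

```python
import time
from openai import OpenAI

client = OpenAI()

def measure_streaming_metrics(prompt: str, model: str = "gpt-4o") -> dict:
    """Measure TTFT and approximate tokens/sec for one streaming request (rough sketch)."""
    start = time.time()
    first_token_at = None
    chunks = []

    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content if chunk.choices else None
        if delta:
            if first_token_at is None:
                first_token_at = time.time()
            chunks.append(delta)
    end = time.time()

    ttft_ms = (first_token_at - start) * 1000 if first_token_at else None
    # Approximation: each streamed chunk is roughly one token
    tokens_per_sec = (
        len(chunks) / (end - first_token_at)
        if first_token_at and end > first_token_at else None
    )
    return {"ttft_ms": ttft_ms, "tokens_per_sec": tokens_per_sec, "total_s": end - start}
```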
2. Cost Metrics
| Metric | What It Measures | Action Threshold |
|---|---|---|
| Cost per Request | Average $/request | > 2x baseline |
| Daily/Monthly Spend | Total API costs | > budget |
| Token Ratio | Input vs output tokens | > 10:1 (investigate prompts) |
| Cost per User | Attribution by user | Outliers > 5x avg |
| Cost per Feature | Which features cost most | ROI analysis |
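One threshold from this table worth automating early is the token ratio, because a bloated system prompt or over-stuffed RAG context shows up there first. A tiny sketch (the helper name is mine):

```python
def check_token_ratio(input_tokens: int, output_tokens: int, threshold: float = 10.0) -> bool:
    """Flag requests whose input:output token ratio suggests prompt bloat."""
    if output_tokens == 0:
        return True  # all input, no output: worth a look regardless
    return (input_tokens / output_tokens) > threshold

# e.g. check_token_ratio(resp.usage.prompt_tokens, resp.usage.completion_tokens)
```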
3. Quality Metrics
| Metric | How to Measure | Target |
|---|---|---|
| Relevance Score | LLM-as-judge or embedding similarity | > 0.85 |
| Faithfulness | Does output match source docs (RAG) | > 0.90 |
| User Feedback | Thumbs up/down, ratings | > 80% positive |
| Task Completion | Did user achieve their goal | > 70% |
| Hallucination Rate | Factual errors detected | < 5% |
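The LLM-as-judge pattern gets its own section later. For a cheaper first pass at relevance, embedding similarity against an expected answer works surprisingly well. A minimal sketch, assuming the OpenAI embeddings API and a hypothetical `embedding_relevance` helper:

```python
import math
from openai import OpenAI

client = OpenAI()

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def embedding_relevance(reference: str, answer: str) -> float:
    """Cheap relevance proxy: cosine similarity between reference and answer embeddings."""
    resp = client.embeddings.create(
        model="text-embedding-3-small",
        input=[reference, answer],
    )
    return cosine_similarity(resp.data[0].embedding, resp.data[1].embedding)

# Example: score a model output against an expected answer
# score = embedding_relevance("30-day money-back guarantee", model_output)
```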
4. Operational Metrics
| Metric | What It Measures | Alert Threshold |
|---|---|---|
| Error Rate | API failures, timeouts | > 1% |
| Rate Limit Hits | 429 responses | > 0.5% |
| Cache Hit Rate | Semantic cache effectiveness | < 20% (investigate) |
| Model Distribution | Which models are being used | Unexpected changes |
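If you don't have a metrics backend yet, a rolling in-process counter is enough to start tracking error rate, 429s, and cache hits. A sketch with illustrative names:

```python
import time
from collections import deque

class OpsCounters:
    """Rolling-window counters for error rate, rate limits, and cache hits (illustrative)."""

    def __init__(self, window_seconds: int = 300):
        self.window = window_seconds
        self.events = deque()  # (timestamp, kind) with kind in {"ok", "error", "rate_limited", "cache_hit"}

    def record(self, kind: str) -> None:
        now = time.time()
        self.events.append((now, kind))
        # Drop events that have fallen out of the window
        while self.events and self.events[0][0] < now - self.window:
            self.events.popleft()

    def rate(self, kind: str) -> float:
        total = len(self.events)
        if total == 0:
            return 0.0
        return sum(1 for _, k in self.events if k == kind) / total

# Usage: counters.record("error") after a failed call, then alert when
# counters.rate("error") > 0.01 or counters.rate("rate_limited") > 0.005
```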
Observability Tools Comparison
The LLM observability space has exploded in 2024-2025. Here’s an honest comparison of the leading tools:
Langfuse: The Open Source Leader
Best for: Teams wanting self-hosting, full control, and comprehensive features.
```python
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from openai import OpenAI

langfuse = Langfuse()
client = OpenAI()

@observe()
def rag_pipeline(query: str) -> str:
    """Complete RAG pipeline with full observability."""
    # Track retrieval step
    with langfuse_context.observe(name="retrieval") as span:
        docs = retrieve_documents(query)
        span.update(
            input=query,
            output={"doc_count": len(docs)},
            metadata={"retriever": "pinecone"}
        )

    # Track generation step
    with langfuse_context.observe(name="generation") as span:
        response = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[
                {"role": "system", "content": f"Context: {docs}"},
                {"role": "user", "content": query}
            ]
        )
        result = response.choices[0].message.content
        span.update(
            input={"query": query, "context_length": len(docs)},
            output=result,
            usage={
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens
            }
        )

    # Score the trace
    langfuse_context.score_current_trace(
        name="user-feedback",
        value=1,  # Will be updated when user provides feedback
        comment="Pending user feedback"
    )
    return result

# Usage
answer = rag_pipeline("What is our refund policy?")
```
Langfuse Evaluation Framework
```python
from langfuse import Langfuse

langfuse = Langfuse()

# Create evaluation dataset
dataset = langfuse.create_dataset(name="qa-evaluation-v1")

# Add test cases
dataset.add_item(
    input={"query": "What is the return policy?"},
    expected_output="30-day money-back guarantee",
    metadata={"category": "policy"}
)

# Run evaluation
def evaluate_model(item):
    output = rag_pipeline(item.input["query"])

    # LLM-as-judge for relevance
    judge_response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{
            "role": "user",
            "content": f"""Rate the relevance of this answer (0-1):
Question: {item.input['query']}
Expected: {item.expected_output}
Actual: {output}
Return only a number between 0 and 1."""
        }]
    )
    score = float(judge_response.choices[0].message.content.strip())

    return {
        "output": output,
        "scores": {"relevance": score}
    }

# Run on dataset
for item in dataset.items:
    result = evaluate_model(item)
    langfuse.score(
        trace_id=langfuse_context.get_current_trace_id(),
        name="relevance",
        value=result["scores"]["relevance"]
    )
```
Helicone: The Simplest Setup
Best for: Quick integration with minimal code changes.
```python
# Just change your base URL - that's it!
from openai import OpenAI

client = OpenAI(
    base_url="https://oai.helicone.ai/v1",
    default_headers={
        "Helicone-Auth": f"Bearer {HELICONE_API_KEY}",
        "Helicone-User-Id": user_id,            # Track by user
        "Helicone-Property-Feature": "chat",    # Custom property
        "Helicone-Cache-Enabled": "true",       # Enable caching
    }
)

# Use OpenAI as normal - Helicone proxies and logs everything
response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "Hello!"}]
)
```
Helicone Custom Properties for Analysis
```python
def chat_with_tracking(user_id: str, feature: str, message: str):
    """Chat with full Helicone tracking."""
    return client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": message}],
        extra_headers={
            "Helicone-User-Id": user_id,
            "Helicone-Property-Feature": feature,
            "Helicone-Property-Environment": "production",
            "Helicone-Property-Version": "v2.1.0",
        }
    )

# Now you can filter and analyze by any property in the dashboard
```
LangSmith: LangChain Native
Best for: Teams heavily invested in LangChain.
```python
import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-ai-app"

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Everything is automatically traced
llm = ChatOpenAI(model="gpt-4-turbo")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}")
])
chain = prompt | llm | StrOutputParser()

# This trace appears in LangSmith automatically
result = chain.invoke({"input": "What is machine learning?"})
```
LangSmith Evaluation
```python
from langsmith import Client
from langsmith.evaluation import evaluate

client = Client()

# Create a dataset
dataset = client.create_dataset("qa-eval")
client.create_examples(
    inputs=[{"question": "What is ML?"}],
    outputs=[{"answer": "Machine learning is..."}],
    dataset_id=dataset.id
)

# Define evaluator
def relevance_evaluator(run, example):
    prediction = run.outputs["output"]
    reference = example.outputs["answer"]
    # Your evaluation logic
    score = calculate_similarity(prediction, reference)
    return {"score": score, "key": "relevance"}

# Run evaluation
results = evaluate(
    lambda x: chain.invoke(x),
    data=dataset.name,
    evaluators=[relevance_evaluator],
)
```
Arize Phoenix: ML + LLM Unified
Best for: Teams with both traditional ML and LLM workloads.
```python
import phoenix as px
from phoenix.trace.openai import OpenAIInstrumentor
from openai import OpenAI

# Start Phoenix locally
session = px.launch_app()

# Instrument OpenAI
OpenAIInstrumentor().instrument()

client = OpenAI()

# All calls are now traced
response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "Explain quantum computing"}]
)

# View traces at http://localhost:6006
print(f"Phoenix UI: {session.url}")
```
Phoenix Embedding Analysis
```python
import phoenix as px
import pandas as pd

# Analyze embedding drift
embeddings_df = pd.DataFrame({
    "embedding": embeddings_list,
    "text": texts,
    "timestamp": timestamps,
})

# Launch with embedding analysis
session = px.launch_app(trace_dataset=None)

# Visualize embedding clusters and drift over time
px.Client().log_embeddings(
    embeddings_df,
    embedding_column="embedding",
    text_column="text",
)
```
OpenLLMetry: OpenTelemetry Native
Best for: Teams with existing OTel infrastructure.
```python
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task
from openai import OpenAI

# Initialize with any OTel-compatible backend
Traceloop.init(
    app_name="my-ai-app",
    api_endpoint="https://your-otel-collector:4318"
)

client = OpenAI()

@workflow(name="document_qa")
def answer_question(question: str) -> str:
    @task(name="retrieve_context")
    def retrieve(q: str):
        return search_documents(q)

    @task(name="generate_answer")
    def generate(q: str, context: str):
        response = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[
                {"role": "system", "content": f"Context: {context}"},
                {"role": "user", "content": q}
            ]
        )
        return response.choices[0].message.content

    context = retrieve(question)
    return generate(question, context)

# Traces go to your OTel backend (Jaeger, Grafana, Datadog, etc.)
```
Implementing Evaluation Pipelines
Quality evaluation is the most underinvested area in AI engineering. Here’s how to build a robust evaluation pipeline:
LLM-as-Judge Pattern
```python
import json

from openai import OpenAI
from pydantic import BaseModel
from typing import Optional

client = OpenAI()

class EvaluationResult(BaseModel):
    relevance: float      # 0-1
    faithfulness: float   # 0-1
    coherence: float      # 0-1
    reasoning: str

def evaluate_response(
    question: str,
    answer: str,
    context: Optional[str] = None
) -> EvaluationResult:
    """Use GPT-4 as a judge to evaluate response quality."""
    eval_prompt = f"""Evaluate this AI response on three dimensions.

Question: {question}
Answer: {answer}
{"Context (for RAG): " + context if context else ""}

Rate each dimension from 0 to 1:
1. Relevance: Does the answer address the question?
2. Faithfulness: Is the answer grounded in the context (if provided)?
3. Coherence: Is the answer well-structured and clear?

Respond in JSON format:
{{"relevance": 0.0-1.0, "faithfulness": 0.0-1.0, "coherence": 0.0-1.0, "reasoning": "..."}}"""

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": eval_prompt}],
        response_format={"type": "json_object"}
    )
    result = json.loads(response.choices[0].message.content)
    return EvaluationResult(**result)

# Usage
eval_result = evaluate_response(
    question="What is the refund policy?",
    answer="We offer a 30-day money-back guarantee on all products.",
    context="Our refund policy allows returns within 30 days for a full refund."
)
print(f"Relevance: {eval_result.relevance}")
print(f"Faithfulness: {eval_result.faithfulness}")
print(f"Reasoning: {eval_result.reasoning}")
```
Automated Evaluation Pipeline
```python
import asyncio
import time
from dataclasses import dataclass
from typing import List, Callable

@dataclass
class TestCase:
    id: str
    input: dict
    expected_output: str = None
    metadata: dict = None

@dataclass
class EvalResult:
    test_id: str
    output: str
    scores: dict
    latency_ms: float

class EvaluationPipeline:
    def __init__(self, evaluators: List[Callable]):
        self.evaluators = evaluators

    async def evaluate_single(
        self,
        test_case: TestCase,
        model_fn: Callable
    ) -> EvalResult:
        start = time.time()
        output = await model_fn(test_case.input)
        latency_ms = (time.time() - start) * 1000

        scores = {}
        for evaluator in self.evaluators:
            name = evaluator.__name__
            scores[name] = await evaluator(
                test_case.input,
                output,
                test_case.expected_output
            )

        return EvalResult(
            test_id=test_case.id,
            output=output,
            scores=scores,
            latency_ms=latency_ms
        )

    async def run(
        self,
        test_cases: List[TestCase],
        model_fn: Callable,
        concurrency: int = 5
    ) -> List[EvalResult]:
        semaphore = asyncio.Semaphore(concurrency)

        async def bounded_eval(tc):
            async with semaphore:
                return await self.evaluate_single(tc, model_fn)

        results = await asyncio.gather(
            *[bounded_eval(tc) for tc in test_cases]
        )
        return results

    def summary(self, results: List[EvalResult]) -> dict:
        """Generate evaluation summary."""
        all_scores = {}
        for result in results:
            for name, score in result.scores.items():
                if name not in all_scores:
                    all_scores[name] = []
                all_scores[name].append(score)

        summary = {
            "total_tests": len(results),
            "avg_latency_ms": sum(r.latency_ms for r in results) / len(results),
            "scores": {
                name: {
                    "mean": sum(scores) / len(scores),
                    "min": min(scores),
                    "max": max(scores)
                }
                for name, scores in all_scores.items()
            }
        }
        return summary

# Evaluator functions
async def relevance_eval(input, output, expected):
    # evaluate_response (the LLM-as-judge above) is synchronous, so no await here
    result = evaluate_response(input["query"], output, input.get("context"))
    return result.relevance

async def length_eval(input, output, expected):
    # Simple length check
    return 1.0 if 50 < len(output) < 500 else 0.5

# Run evaluation
pipeline = EvaluationPipeline([relevance_eval, length_eval])
results = asyncio.run(pipeline.run(test_cases, my_model_fn))
print(pipeline.summary(results))
```
Cost Tracking and Optimization
Cost is the silent killer of AI projects. Here's how to track and optimize it:
```python
from datetime import datetime
from typing import Dict
import json

# Pricing per 1M tokens (as of Oct 2025)
MODEL_PRICING = {
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    "claude-3-haiku": {"input": 0.25, "output": 1.25},
}

class CostTracker:
    """Track and analyze LLM costs."""

    def __init__(self):
        self.usage_log = []

    def log_usage(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        user_id: str = None,
        feature: str = None
    ):
        pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
        cost = (
            (input_tokens / 1_000_000) * pricing["input"] +
            (output_tokens / 1_000_000) * pricing["output"]
        )
        self.usage_log.append({
            "timestamp": datetime.now().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "user_id": user_id,
            "feature": feature
        })
        return cost

    def daily_summary(self) -> Dict:
        """Get daily cost summary."""
        today = datetime.now().date().isoformat()
        today_logs = [
            l for l in self.usage_log
            if l["timestamp"].startswith(today)
        ]
        total_cost = sum(l["cost"] for l in today_logs)

        by_model = {}
        by_feature = {}
        by_user = {}
        for log in today_logs:
            # By model
            model = log["model"]
            by_model[model] = by_model.get(model, 0) + log["cost"]
            # By feature
            feature = log.get("feature") or "unknown"
            by_feature[feature] = by_feature.get(feature, 0) + log["cost"]
            # By user
            user = log.get("user_id") or "anonymous"
            by_user[user] = by_user.get(user, 0) + log["cost"]

        return {
            "date": today,
            "total_cost": round(total_cost, 4),
            "request_count": len(today_logs),
            "avg_cost_per_request": round(total_cost / len(today_logs), 6) if today_logs else 0,
            "by_model": by_model,
            "by_feature": by_feature,
            "top_users": dict(sorted(by_user.items(), key=lambda x: -x[1])[:10])
        }

    def set_alert(self, daily_budget: float, callback):
        """Alert when daily budget exceeded."""
        summary = self.daily_summary()
        if summary["total_cost"] > daily_budget:
            callback(f"Budget exceeded: ${summary['total_cost']:.2f} > ${daily_budget}")

# Usage
tracker = CostTracker()

# After each LLM call
cost = tracker.log_usage(
    model="gpt-4-turbo",
    input_tokens=response.usage.prompt_tokens,
    output_tokens=response.usage.completion_tokens,
    user_id="user_123",
    feature="document-qa"
)

# Daily report
print(json.dumps(tracker.daily_summary(), indent=2))
```
Production Alerting Strategy
Set up alerts that actually matter:
```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    name: str
    severity: AlertSeverity
    message: str
    value: float
    threshold: float

class AlertManager:
    def __init__(self, notification_handler: Callable):
        self.rules = []
        self.notify = notification_handler

    def add_rule(
        self,
        name: str,
        metric_fn: Callable,
        threshold: float,
        comparison: str,  # "gt", "lt", "eq"
        severity: AlertSeverity
    ):
        self.rules.append({
            "name": name,
            "metric_fn": metric_fn,
            "threshold": threshold,
            "comparison": comparison,
            "severity": severity
        })

    async def check_all(self) -> List[Alert]:
        alerts = []
        for rule in self.rules:
            value = await rule["metric_fn"]()
            triggered = False
            if rule["comparison"] == "gt" and value > rule["threshold"]:
                triggered = True
            elif rule["comparison"] == "lt" and value < rule["threshold"]:
                triggered = True
            elif rule["comparison"] == "eq" and value == rule["threshold"]:
                triggered = True

            if triggered:
                alert = Alert(
                    name=rule["name"],
                    severity=rule["severity"],
                    message=f"{rule['name']}: {value} {'>' if rule['comparison'] == 'gt' else '<'} {rule['threshold']}",
                    value=value,
                    threshold=rule["threshold"]
                )
                alerts.append(alert)
                await self.notify(alert)
        return alerts

# Example metric functions
async def get_error_rate():
    # Query your metrics store
    return 0.02  # 2%

async def get_daily_cost():
    return tracker.daily_summary()["total_cost"]

async def get_avg_latency():
    # Query your metrics store
    return 2.5  # seconds

async def get_quality_score():
    # Query your evaluation results
    return 0.82

# Notification handler
async def send_to_slack(alert: Alert):
    # Your Slack webhook logic
    print(f"[{alert.severity.value.upper()}] {alert.message}")

# Setup
alert_manager = AlertManager(send_to_slack)

alert_manager.add_rule(
    name="High Error Rate",
    metric_fn=get_error_rate,
    threshold=0.01,
    comparison="gt",
    severity=AlertSeverity.CRITICAL
)
alert_manager.add_rule(
    name="Daily Budget Exceeded",
    metric_fn=get_daily_cost,
    threshold=200.0,
    comparison="gt",
    severity=AlertSeverity.WARNING
)
alert_manager.add_rule(
    name="Quality Degradation",
    metric_fn=get_quality_score,
    threshold=0.80,
    comparison="lt",
    severity=AlertSeverity.WARNING
)
alert_manager.add_rule(
    name="High Latency",
    metric_fn=get_avg_latency,
    threshold=3.0,
    comparison="gt",
    severity=AlertSeverity.WARNING
)

# Run checks (in production, run on a schedule)
asyncio.run(alert_manager.check_all())
```
My Recommended Stack
After evaluating all options, here's what I recommend for different team sizes:
Solo Developer / Small Team (<5 engineers)
- Primary: Helicone (quick setup, good free tier)
- Evaluation: Manual + simple LLM-as-judge
- Cost: ~$0-50/month
Growing Startup (5-20 engineers)
- Primary: Langfuse (self-hosted or cloud)
- Evaluation: Langfuse evals + custom pipeline
- Integration: OpenTelemetry for unified observability
- Cost: ~$100-500/month
Enterprise (20+ engineers)
- Primary: Langfuse self-hosted or Arize Phoenix
- Evaluation: Full evaluation pipeline with datasets
- Integration: OpenLLMetry → existing OTel infrastructure
- Alerting: Integration with PagerDuty/OpsGenie
- Cost: Variable based on scale
Key Takeaways
- AI observability ≠ traditional observability - Add prompts, quality, and cost tracking
- Quality metrics are non-negotiable - Use LLM-as-judge patterns for automated evaluation
- Cost visibility from day one - Track by user, feature, and model
- Langfuse is the current leader - Open source, full-featured, self-hostable
- Helicone for quick starts - One-line integration, great for prototypes
- Build evaluation pipelines early - You can't improve what you don't measure
- Alert on quality, not just errors - LLMs fail silently
References & Further Reading
- Langfuse Documentation - langfuse.com/docs
- Helicone - helicone.ai
- LangSmith - docs.smith.langchain.com
- Arize Phoenix - docs.arize.com/phoenix
- OpenLLMetry - github.com/traceloop/openllmetry
- Weights & Biases Weave - wandb.ai/weave
- OpenTelemetry - opentelemetry.io
- RAGAS (RAG Evaluation) - docs.ragas.io
Observability is what separates hobby AI projects from production systems. Start with the basics—tracing and cost tracking—then build up to quality evaluation and alerting. The tools are mature enough now that there's no excuse for flying blind.
Building observable AI systems? I'd love to hear about your setup. Connect on LinkedIn or drop a comment below.