Introduction
Production LLM applications require comprehensive logging and tracing to debug issues, monitor performance, and understand user interactions. Unlike traditional applications, LLM systems have unique logging needs: capturing prompts and responses, tracking token usage, measuring latency across chains, and correlating requests through multi-step workflows. This guide covers practical logging patterns: structured request/response logging, distributed tracing for LLM chains, metrics collection, cost tracking, and production-ready observability pipelines that provide visibility into your AI systems.
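Before diving into code, it helps to fix a concrete target: every request/response pair should end up as a single machine-parseable record. The line below is purely illustrative (field names mirror the dataclasses defined in the next section; the values are invented):

{"request_id": "3f2b9c1e", "trace_id": "a1b2c3d4", "timestamp": "2025-01-15T12:34:56", "model": "gpt-4o-mini", "prompt_tokens": 482, "completion_tokens": 96, "total_tokens": 578, "latency_ms": 1240.5, "estimated_cost": 0.00013, "finish_reason": "stop"}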

Structured Request Logging
from dataclasses import dataclass, field, asdict
from typing import Any, Optional
from datetime import datetime
import json
import uuid
import logging
@dataclass
class LLMRequestLog:
"""Structured log for LLM requests."""
request_id: str
timestamp: datetime
model: str
messages: list[dict]
# Request parameters
temperature: Optional[float] = None
max_tokens: Optional[int] = None
# Metadata
user_id: Optional[str] = None
session_id: Optional[str] = None
trace_id: Optional[str] = None
def to_dict(self) -> dict:
"""Convert to dictionary for logging."""
data = asdict(self)
data["timestamp"] = self.timestamp.isoformat()
return data
def to_json(self) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict())
@dataclass
class LLMResponseLog:
"""Structured log for LLM responses."""
request_id: str
timestamp: datetime
# Response data
content: str
finish_reason: str
# Usage metrics
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
# Performance
latency_ms: float = 0.0
# Cost (if calculated)
estimated_cost: Optional[float] = None
# Error info
error: Optional[str] = None
def to_dict(self) -> dict:
data = asdict(self)
data["timestamp"] = self.timestamp.isoformat()
return data
class LLMLogger:
"""Logger for LLM requests and responses."""
def __init__(
self,
logger_name: str = "llm",
log_prompts: bool = True,
log_responses: bool = True,
mask_pii: bool = False
):
self.logger = logging.getLogger(logger_name)
self.log_prompts = log_prompts
self.log_responses = log_responses
self.mask_pii = mask_pii
def log_request(
self,
model: str,
messages: list[dict],
user_id: str = None,
session_id: str = None,
trace_id: str = None,
**kwargs
) -> str:
"""Log an LLM request, return request_id."""
request_id = str(uuid.uuid4())
# Optionally mask PII in messages
logged_messages = messages
if self.mask_pii:
logged_messages = self._mask_pii(messages)
log_entry = LLMRequestLog(
request_id=request_id,
timestamp=datetime.now(),
model=model,
messages=logged_messages if self.log_prompts else [],
temperature=kwargs.get("temperature"),
max_tokens=kwargs.get("max_tokens"),
user_id=user_id,
session_id=session_id,
trace_id=trace_id
)
self.logger.info(
"LLM_REQUEST",
extra={"llm_log": log_entry.to_dict()}
)
return request_id
def log_response(
self,
request_id: str,
response: Any,
latency_ms: float,
error: str = None
):
"""Log an LLM response."""
if error:
log_entry = LLMResponseLog(
request_id=request_id,
timestamp=datetime.now(),
content="",
finish_reason="error",
latency_ms=latency_ms,
error=error
)
else:
content = response.choices[0].message.content
if self.mask_pii:
content = self._mask_pii_text(content)
log_entry = LLMResponseLog(
request_id=request_id,
timestamp=datetime.now(),
content=content if self.log_responses else "[REDACTED]",
finish_reason=response.choices[0].finish_reason,
prompt_tokens=response.usage.prompt_tokens,
completion_tokens=response.usage.completion_tokens,
total_tokens=response.usage.total_tokens,
latency_ms=latency_ms
)
self.logger.info(
"LLM_RESPONSE",
extra={"llm_log": log_entry.to_dict()}
)
def _mask_pii(self, messages: list[dict]) -> list[dict]:
"""Mask PII in messages."""
masked = []
for msg in messages:
content = msg.get("content", "")
masked_content = self._mask_pii_text(content)
masked.append({**msg, "content": masked_content})
return masked
def _mask_pii_text(self, text: str) -> str:
"""Mask PII patterns in text."""
import re
# Email
text = re.sub(
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
'[EMAIL]',
text
)
# Phone
text = re.sub(
r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'[PHONE]',
text
)
# SSN
text = re.sub(
r'\b\d{3}-\d{2}-\d{4}\b',
'[SSN]',
text
)
return text
# Logging client wrapper
class LoggingLLMClient:
"""LLM client with automatic logging."""
def __init__(
self,
client: Any,
logger: LLMLogger = None
):
self.client = client
self.logger = logger or LLMLogger()
async def chat_completion(
self,
model: str,
messages: list[dict],
user_id: str = None,
session_id: str = None,
trace_id: str = None,
**kwargs
) -> Any:
"""Create chat completion with logging."""
request_id = self.logger.log_request(
model=model,
messages=messages,
user_id=user_id,
session_id=session_id,
trace_id=trace_id,
**kwargs
)
start = datetime.now()
try:
response = await self.client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
latency = (datetime.now() - start).total_seconds() * 1000
self.logger.log_response(request_id, response, latency)
return response
except Exception as e:
latency = (datetime.now() - start).total_seconds() * 1000
self.logger.log_response(request_id, None, latency, error=str(e))
raise
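Note that LLMLogger attaches its structured payload through the standard library's extra mechanism, so a default formatter would print only the "LLM_REQUEST"/"LLM_RESPONSE" message and silently drop the fields. The sketch below shows one minimal way to wire it up, assuming the OpenAI Python SDK as the underlying client (any client exposing chat.completions.create works); JsonExtraFormatter is a stdlib-only illustration, not part of any particular logging library.

import asyncio
import json
import logging

from openai import AsyncOpenAI  # assumption: an OpenAI-style async client

class JsonExtraFormatter(logging.Formatter):
    """Emit one JSON object per line, including the structured 'llm_log' extra."""
    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage()}
        if hasattr(record, "llm_log"):
            payload.update(record.llm_log)
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonExtraFormatter())
logging.getLogger("llm").addHandler(handler)
logging.getLogger("llm").setLevel(logging.INFO)

async def main():
    client = LoggingLLMClient(AsyncOpenAI(), LLMLogger(mask_pii=True))
    response = await client.chat_completion(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Summarize our Q3 results."}],
        user_id="user-123",
        session_id="sess-456",
    )
    print(response.choices[0].message.content)

asyncio.run(main())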
Distributed Tracing
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from contextlib import contextmanager
import uuid
@dataclass
class Span:
"""A span in a distributed trace."""
span_id: str
trace_id: str
parent_id: Optional[str]
name: str
start_time: datetime
end_time: Optional[datetime] = None
# Attributes
attributes: dict = field(default_factory=dict)
# Events
events: list[dict] = field(default_factory=list)
# Status
status: str = "ok"
error: Optional[str] = None
@property
def duration_ms(self) -> float:
if not self.end_time:
return 0.0
return (self.end_time - self.start_time).total_seconds() * 1000
def set_attribute(self, key: str, value: Any):
self.attributes[key] = value
def add_event(self, name: str, attributes: dict = None):
self.events.append({
"name": name,
"timestamp": datetime.now().isoformat(),
"attributes": attributes or {}
})
def set_error(self, error: str):
self.status = "error"
self.error = error
def end(self):
self.end_time = datetime.now()
class Tracer:
"""Distributed tracer for LLM applications."""
def __init__(self, service_name: str = "llm-service"):
self.service_name = service_name
self.spans: dict[str, Span] = {}
self.current_span: Optional[Span] = None
def start_trace(self, name: str) -> Span:
"""Start a new trace."""
trace_id = str(uuid.uuid4())
return self.start_span(name, trace_id=trace_id)
def start_span(
self,
name: str,
trace_id: str = None,
parent_id: str = None
) -> Span:
"""Start a new span."""
span = Span(
span_id=str(uuid.uuid4()),
trace_id=trace_id or (self.current_span.trace_id if self.current_span else str(uuid.uuid4())),
parent_id=parent_id or (self.current_span.span_id if self.current_span else None),
name=name,
start_time=datetime.now()
)
self.spans[span.span_id] = span
self.current_span = span
return span
@contextmanager
def span(self, name: str):
"""Context manager for spans."""
span = self.start_span(name)
try:
yield span
except Exception as e:
span.set_error(str(e))
raise
finally:
span.end()
self.current_span = self.spans.get(span.parent_id)
def get_trace(self, trace_id: str) -> list[Span]:
"""Get all spans for a trace."""
return [
span for span in self.spans.values()
if span.trace_id == trace_id
]
class LLMTracer(Tracer):
"""Tracer with LLM-specific instrumentation."""
def __init__(self, service_name: str = "llm-service"):
super().__init__(service_name)
@contextmanager
def llm_call(
self,
model: str,
operation: str = "chat_completion"
):
"""Trace an LLM call."""
with self.span(f"llm.{operation}") as span:
span.set_attribute("llm.model", model)
span.set_attribute("llm.operation", operation)
yield span
@contextmanager
def chain_step(self, step_name: str):
"""Trace a chain step."""
with self.span(f"chain.{step_name}") as span:
span.set_attribute("chain.step", step_name)
yield span
@contextmanager
def retrieval(self, source: str):
"""Trace a retrieval operation."""
with self.span("retrieval") as span:
span.set_attribute("retrieval.source", source)
yield span
# Traced LLM client
class TracedLLMClient:
"""LLM client with distributed tracing."""
def __init__(
self,
client: Any,
tracer: LLMTracer = None
):
self.client = client
self.tracer = tracer or LLMTracer()
async def chat_completion(
self,
model: str,
messages: list[dict],
**kwargs
) -> Any:
"""Create chat completion with tracing."""
with self.tracer.llm_call(model) as span:
span.set_attribute("llm.message_count", len(messages))
try:
response = await self.client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
# Record usage
span.set_attribute("llm.prompt_tokens", response.usage.prompt_tokens)
span.set_attribute("llm.completion_tokens", response.usage.completion_tokens)
span.set_attribute("llm.total_tokens", response.usage.total_tokens)
return response
except Exception as e:
span.set_error(str(e))
raise
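A short usage sketch ties these pieces together: one trace for a question-answering chain, with retrieval, generation, and the underlying LLM call appearing as nested spans. The retrieval step and the OpenAI-style client are assumptions for illustration; substitute your own search and provider.

import asyncio
from openai import AsyncOpenAI  # assumption: an OpenAI-style async client

tracer = LLMTracer()
client = TracedLLMClient(AsyncOpenAI(), tracer)

async def answer_question(question: str) -> str:
    root = tracer.start_trace("answer_question")
    try:
        with tracer.retrieval("vector_store") as span:
            # Hypothetical retrieval step; replace with a real vector search.
            docs = ["doc snippet 1", "doc snippet 2"]
            span.set_attribute("retrieval.doc_count", len(docs))
        with tracer.chain_step("generate"):
            response = await client.chat_completion(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "Answer using only the provided context."},
                    {"role": "user", "content": f"Context: {docs}\n\nQuestion: {question}"},
                ],
            )
        return response.choices[0].message.content
    finally:
        root.end()
        # Every span recorded for this trace, with timing and status.
        for s in tracer.get_trace(root.trace_id):
            print(f"{s.name}: {s.duration_ms:.1f} ms ({s.status})")

# asyncio.run(answer_question("What changed in Q3?"))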
Metrics Collection
from dataclasses import dataclass, field
from typing import Any, Callable
from datetime import datetime, timedelta
from collections import defaultdict
import threading
@dataclass
class MetricValue:
"""A metric value with timestamp."""
value: float
timestamp: datetime
labels: dict = field(default_factory=dict)
class Counter:
"""A counter metric."""
def __init__(self, name: str, description: str = ""):
self.name = name
self.description = description
self._values: dict[tuple, float] = defaultdict(float)
self._lock = threading.Lock()
def inc(self, value: float = 1.0, **labels):
"""Increment counter."""
key = tuple(sorted(labels.items()))
with self._lock:
self._values[key] += value
def get(self, **labels) -> float:
"""Get counter value."""
key = tuple(sorted(labels.items()))
return self._values.get(key, 0.0)
class Histogram:
"""A histogram metric."""
def __init__(
self,
name: str,
description: str = "",
buckets: list[float] = None
):
self.name = name
self.description = description
self.buckets = buckets or [0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
self._counts: dict[tuple, dict[float, int]] = defaultdict(
lambda: {b: 0 for b in self.buckets + [float('inf')]}
)
self._sums: dict[tuple, float] = defaultdict(float)
self._totals: dict[tuple, int] = defaultdict(int)
self._lock = threading.Lock()
def observe(self, value: float, **labels):
"""Record an observation."""
key = tuple(sorted(labels.items()))
with self._lock:
self._sums[key] += value
self._totals[key] += 1
for bucket in self.buckets + [float('inf')]:
if value <= bucket:
self._counts[key][bucket] += 1
def get_percentile(self, percentile: float, **labels) -> float:
"""Get approximate percentile; with no labels, aggregate across all label sets."""
if labels:
keys = [tuple(sorted(labels.items()))]
else:
keys = list(self._totals.keys())
total = sum(self._totals.get(k, 0) for k in keys)
if total == 0:
return 0.0
target = total * percentile
cumulative = 0
for bucket in self.buckets:
cumulative += sum(self._counts[k].get(bucket, 0) for k in keys)
if cumulative >= target:
return bucket
return self.buckets[-1]
class LLMMetrics:
"""Metrics collector for LLM applications."""
def __init__(self):
# Request metrics
self.request_count = Counter(
"llm_requests_total",
"Total LLM requests"
)
self.request_latency = Histogram(
"llm_request_latency_seconds",
"LLM request latency",
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
# Token metrics
self.tokens_used = Counter(
"llm_tokens_total",
"Total tokens used"
)
# Error metrics
self.error_count = Counter(
"llm_errors_total",
"Total LLM errors"
)
# Cost metrics
self.estimated_cost = Counter(
"llm_cost_dollars",
"Estimated cost in dollars"
)
def record_request(
self,
model: str,
latency_seconds: float,
prompt_tokens: int,
completion_tokens: int,
success: bool = True,
error_type: str = None
):
"""Record a request with all metrics."""
# Request count
self.request_count.inc(
model=model,
status="success" if success else "error"
)
# Latency
self.request_latency.observe(
latency_seconds,
model=model
)
# Tokens
self.tokens_used.inc(prompt_tokens, model=model, type="prompt")
self.tokens_used.inc(completion_tokens, model=model, type="completion")
# Errors
if not success:
self.error_count.inc(model=model, error_type=error_type or "unknown")
# Cost estimation
cost = self._estimate_cost(model, prompt_tokens, completion_tokens)
self.estimated_cost.inc(cost, model=model)
def _estimate_cost(
self,
model: str,
prompt_tokens: int,
completion_tokens: int
) -> float:
"""Estimate cost based on model pricing."""
# Pricing per 1M tokens (example rates)
pricing = {
"gpt-4o": {"prompt": 2.50, "completion": 10.00},
"gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
"claude-3-5-sonnet": {"prompt": 3.00, "completion": 15.00},
}
rates = pricing.get(model, {"prompt": 1.0, "completion": 2.0})
prompt_cost = (prompt_tokens / 1_000_000) * rates["prompt"]
completion_cost = (completion_tokens / 1_000_000) * rates["completion"]
return prompt_cost + completion_cost
def get_summary(self) -> dict:
"""Get metrics summary."""
return {
"total_requests": sum(self.request_count._values.values()),
"total_tokens": sum(self.tokens_used._values.values()),
"total_errors": sum(self.error_count._values.values()),
"total_cost": sum(self.estimated_cost._values.values()),
"p50_latency": self.request_latency.get_percentile(0.5),
"p95_latency": self.request_latency.get_percentile(0.95),
"p99_latency": self.request_latency.get_percentile(0.99)
}
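Recording into these metrics is a single call per request. The snippet below shows the API in use; the models, latencies, and token counts are invented purely for illustration.

metrics = LLMMetrics()

# Two successful calls and one timeout (numbers are illustrative).
metrics.record_request("gpt-4o-mini", latency_seconds=0.8, prompt_tokens=512, completion_tokens=128)
metrics.record_request("gpt-4o", latency_seconds=2.3, prompt_tokens=1500, completion_tokens=400)
metrics.record_request(
    "gpt-4o", latency_seconds=6.1, prompt_tokens=900, completion_tokens=0,
    success=False, error_type="timeout",
)

print(metrics.get_summary())
# Per-model views come from passing labels explicitly:
print(metrics.request_latency.get_percentile(0.95, model="gpt-4o"))
print(metrics.error_count.get(model="gpt-4o", error_type="timeout"))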
Log Storage and Export
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
import json
import asyncio
class LogStorage:
"""Base class for log storage."""
async def store(self, log_entry: dict):
raise NotImplementedError
async def query(
self,
start_time: datetime,
end_time: datetime,
filters: dict = None
) -> list[dict]:
raise NotImplementedError
class InMemoryLogStorage(LogStorage):
"""In-memory log storage for development."""
def __init__(self, max_entries: int = 10000):
self.logs: list[dict] = []
self.max_entries = max_entries
async def store(self, log_entry: dict):
self.logs.append(log_entry)
# Trim if over limit
if len(self.logs) > self.max_entries:
self.logs = self.logs[-self.max_entries:]
async def query(
self,
start_time: datetime,
end_time: datetime,
filters: dict = None
) -> list[dict]:
results = []
for log in self.logs:
timestamp = datetime.fromisoformat(log.get("timestamp", ""))
if start_time <= timestamp <= end_time:
if self._matches_filters(log, filters):
results.append(log)
return results
def _matches_filters(self, log: dict, filters: dict) -> bool:
if not filters:
return True
for key, value in filters.items():
if log.get(key) != value:
return False
return True
class FileLogStorage(LogStorage):
"""File-based log storage."""
def __init__(self, log_dir: str):
import os
self.log_dir = log_dir
# Shared lock: creating a fresh asyncio.Lock() per call would not serialize writers.
self._lock = asyncio.Lock()
os.makedirs(log_dir, exist_ok=True)
async def store(self, log_entry: dict):
import os
date = datetime.now().strftime("%Y-%m-%d")
filename = os.path.join(self.log_dir, f"llm_logs_{date}.jsonl")
async with self._lock:
with open(filename, "a") as f:
f.write(json.dumps(log_entry) + "\n")
async def query(
self,
start_time: datetime,
end_time: datetime,
filters: dict = None
) -> list[dict]:
import os
import glob
results = []
pattern = os.path.join(self.log_dir, "llm_logs_*.jsonl")
for filepath in glob.glob(pattern):
with open(filepath) as f:
for line in f:
log = json.loads(line)
timestamp = datetime.fromisoformat(log.get("timestamp", ""))
if start_time <= timestamp <= end_time:
if self._matches_filters(log, filters):
results.append(log)
return results
def _matches_filters(self, log: dict, filters: dict) -> bool:
if not filters:
return True
for key, value in filters.items():
if log.get(key) != value:
return False
return True
# Export to external systems
class LogExporter:
"""Export logs to external systems."""
def __init__(self, storage: LogStorage):
self.storage = storage
async def export_to_json(
self,
start_time: datetime,
end_time: datetime,
output_file: str
):
"""Export logs to JSON file."""
logs = await self.storage.query(start_time, end_time)
with open(output_file, "w") as f:
json.dump(logs, f, indent=2)
async def export_to_csv(
self,
start_time: datetime,
end_time: datetime,
output_file: str
):
"""Export logs to CSV file."""
import csv
logs = await self.storage.query(start_time, end_time)
if not logs:
return
# Get all keys
keys = set()
for log in logs:
keys.update(log.keys())
with open(output_file, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=sorted(keys))
writer.writeheader()
writer.writerows(logs)
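A quick sketch of the storage and export classes in use; the directory and file names here are arbitrary examples.

import asyncio
from datetime import datetime, timedelta

async def demo_storage():
    storage = FileLogStorage("./llm_logs")  # example directory
    await storage.store({
        "timestamp": datetime.now().isoformat(),
        "request_id": "req-1",
        "model": "gpt-4o-mini",
        "total_tokens": 640,
    })
    recent = await storage.query(
        start_time=datetime.now() - timedelta(hours=1),
        end_time=datetime.now(),
        filters={"model": "gpt-4o-mini"},
    )
    print(f"{len(recent)} matching entries")
    exporter = LogExporter(storage)
    await exporter.export_to_csv(
        datetime.now() - timedelta(days=1), datetime.now(), "llm_logs.csv"
    )

asyncio.run(demo_storage())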
Production Logging Service
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel
from typing import Optional
from datetime import datetime, timedelta
app = FastAPI()
# Initialize components
logger = LLMLogger()
tracer = LLMTracer()
metrics = LLMMetrics()
storage = InMemoryLogStorage()
class LogQueryRequest(BaseModel):
start_time: datetime
end_time: datetime
model: Optional[str] = None
user_id: Optional[str] = None
trace_id: Optional[str] = None
@app.get("/v1/logs")
async def query_logs(
start_time: datetime = Query(...),
end_time: datetime = Query(...),
model: Optional[str] = None,
user_id: Optional[str] = None
):
"""Query logs with filters."""
filters = {}
if model:
filters["model"] = model
if user_id:
filters["user_id"] = user_id
logs = await storage.query(start_time, end_time, filters)
return {"logs": logs, "count": len(logs)}
@app.get("/v1/traces/{trace_id}")
async def get_trace(trace_id: str):
"""Get all spans for a trace."""
spans = tracer.get_trace(trace_id)
if not spans:
raise HTTPException(404, "Trace not found")
return {
"trace_id": trace_id,
"spans": [
{
"span_id": s.span_id,
"parent_id": s.parent_id,
"name": s.name,
"start_time": s.start_time.isoformat(),
"end_time": s.end_time.isoformat() if s.end_time else None,
"duration_ms": s.duration_ms,
"attributes": s.attributes,
"status": s.status
}
for s in spans
]
}
@app.get("/v1/metrics")
async def get_metrics():
"""Get current metrics."""
return metrics.get_summary()
@app.get("/v1/metrics/prometheus")
async def get_prometheus_metrics():
"""Export metrics in Prometheus format."""
lines = []
# Request count
lines.append("# HELP llm_requests_total Total LLM requests")
lines.append("# TYPE llm_requests_total counter")
for labels, value in metrics.request_count._values.items():
label_str = ",".join(f'{k}="{v}"' for k, v in labels)
lines.append(f"llm_requests_total{{{label_str}}} {value}")
# Tokens
lines.append("# HELP llm_tokens_total Total tokens used")
lines.append("# TYPE llm_tokens_total counter")
for labels, value in metrics.tokens_used._values.items():
label_str = ",".join(f'{k}="{v}"' for k, v in labels)
lines.append(f"llm_tokens_total{{{label_str}}} {value}")
return "\n".join(lines)
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- OpenTelemetry Python: https://opentelemetry.io/docs/instrumentation/python/
- LangSmith: https://docs.smith.langchain.com/
- Weights & Biases Prompts: https://docs.wandb.ai/guides/prompts
- Prometheus Python Client: https://github.com/prometheus/client_python
Conclusion
Comprehensive logging and tracing are essential for production LLM applications. Structured logging captures the full context of each request—prompts, responses, token usage, and latency—enabling debugging and analysis. Distributed tracing connects multi-step LLM chains, showing how requests flow through your system and where time is spent. Metrics provide aggregate views for monitoring and alerting—track request rates, error rates, latencies, and costs. Consider PII masking for sensitive data and implement log retention policies. Use specialized LLM observability tools like LangSmith or Weights & Biases for advanced features like prompt versioning and evaluation. The goal is complete visibility into your AI systems—you can’t improve what you can’t measure.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.

Leave a Reply