Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

LLM Application Logging and Tracing: Building Observable AI Systems

Introduction: Production LLM applications require comprehensive logging and tracing to debug issues, monitor performance, and understand user interactions. Unlike traditional applications, LLM systems have unique logging needs: capturing prompts and responses, tracking token usage, measuring latency across chains, and correlating requests through multi-step workflows. This guide covers practical logging patterns: structured request/response logging, distributed tracing for LLM chains, metrics collection, cost tracking, and production-ready observability pipelines that provide visibility into your AI systems.

LLM Logging
Logging System: Request Capture, Distributed Tracing, Metrics Collection

Structured Request Logging

from dataclasses import dataclass, field, asdict
from typing import Any, Optional
from datetime import datetime
import json
import uuid
import logging

@dataclass
class LLMRequestLog:
    """Structured log for LLM requests."""
    
    request_id: str
    timestamp: datetime
    model: str
    messages: list[dict]
    
    # Request parameters
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    
    # Metadata
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    trace_id: Optional[str] = None
    
    def to_dict(self) -> dict:
        """Convert to dictionary for logging."""
        
        data = asdict(self)
        data["timestamp"] = self.timestamp.isoformat()
        return data
    
    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict())

@dataclass
class LLMResponseLog:
    """Structured log for LLM responses."""
    
    request_id: str
    timestamp: datetime
    
    # Response data
    content: str
    finish_reason: str
    
    # Usage metrics
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    
    # Performance
    latency_ms: float = 0.0
    
    # Cost (if calculated)
    estimated_cost: Optional[float] = None
    
    # Error info
    error: Optional[str] = None
    
    def to_dict(self) -> dict:
        data = asdict(self)
        data["timestamp"] = self.timestamp.isoformat()
        return data

class LLMLogger:
    """Logger for LLM requests and responses."""
    
    def __init__(
        self,
        logger_name: str = "llm",
        log_prompts: bool = True,
        log_responses: bool = True,
        mask_pii: bool = False
    ):
        self.logger = logging.getLogger(logger_name)
        self.log_prompts = log_prompts
        self.log_responses = log_responses
        self.mask_pii = mask_pii
    
    def log_request(
        self,
        model: str,
        messages: list[dict],
        user_id: str = None,
        session_id: str = None,
        trace_id: str = None,
        **kwargs
    ) -> str:
        """Log an LLM request, return request_id."""
        
        request_id = str(uuid.uuid4())
        
        # Optionally mask PII in messages
        logged_messages = messages
        if self.mask_pii:
            logged_messages = self._mask_pii(messages)
        
        log_entry = LLMRequestLog(
            request_id=request_id,
            timestamp=datetime.now(),
            model=model,
            messages=logged_messages if self.log_prompts else [],
            temperature=kwargs.get("temperature"),
            max_tokens=kwargs.get("max_tokens"),
            user_id=user_id,
            session_id=session_id,
            trace_id=trace_id
        )
        
        self.logger.info(
            "LLM_REQUEST",
            extra={"llm_log": log_entry.to_dict()}
        )
        
        return request_id
    
    def log_response(
        self,
        request_id: str,
        response: Any,
        latency_ms: float,
        error: str = None
    ):
        """Log an LLM response."""
        
        if error:
            log_entry = LLMResponseLog(
                request_id=request_id,
                timestamp=datetime.now(),
                content="",
                finish_reason="error",
                latency_ms=latency_ms,
                error=error
            )
        else:
            content = response.choices[0].message.content
            if self.mask_pii:
                content = self._mask_pii_text(content)
            
            log_entry = LLMResponseLog(
                request_id=request_id,
                timestamp=datetime.now(),
                content=content if self.log_responses else "[REDACTED]",
                finish_reason=response.choices[0].finish_reason,
                prompt_tokens=response.usage.prompt_tokens,
                completion_tokens=response.usage.completion_tokens,
                total_tokens=response.usage.total_tokens,
                latency_ms=latency_ms
            )
        
        self.logger.info(
            "LLM_RESPONSE",
            extra={"llm_log": log_entry.to_dict()}
        )
    
    def _mask_pii(self, messages: list[dict]) -> list[dict]:
        """Mask PII in messages."""
        
        import re
        
        masked = []
        for msg in messages:
            content = msg.get("content", "")
            masked_content = self._mask_pii_text(content)
            masked.append({**msg, "content": masked_content})
        
        return masked
    
    def _mask_pii_text(self, text: str) -> str:
        """Mask PII patterns in text."""
        
        import re
        
        # Email
        text = re.sub(
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            '[EMAIL]',
            text
        )
        
        # Phone
        text = re.sub(
            r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            '[PHONE]',
            text
        )
        
        # SSN
        text = re.sub(
            r'\b\d{3}-\d{2}-\d{4}\b',
            '[SSN]',
            text
        )
        
        return text

# Logging client wrapper
class LoggingLLMClient:
    """LLM client with automatic logging."""
    
    def __init__(
        self,
        client: Any,
        logger: LLMLogger = None
    ):
        self.client = client
        self.logger = logger or LLMLogger()
    
    async def chat_completion(
        self,
        model: str,
        messages: list[dict],
        user_id: str = None,
        session_id: str = None,
        trace_id: str = None,
        **kwargs
    ) -> Any:
        """Create chat completion with logging."""
        
        request_id = self.logger.log_request(
            model=model,
            messages=messages,
            user_id=user_id,
            session_id=session_id,
            trace_id=trace_id,
            **kwargs
        )
        
        start = datetime.now()
        
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            
            latency = (datetime.now() - start).total_seconds() * 1000
            self.logger.log_response(request_id, response, latency)
            
            return response
        
        except Exception as e:
            latency = (datetime.now() - start).total_seconds() * 1000
            self.logger.log_response(request_id, None, latency, error=str(e))
            raise

Distributed Tracing

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from contextlib import contextmanager
import uuid

@dataclass
class Span:
    """A span in a distributed trace."""
    
    span_id: str
    trace_id: str
    parent_id: Optional[str]
    name: str
    start_time: datetime
    end_time: Optional[datetime] = None
    
    # Attributes
    attributes: dict = field(default_factory=dict)
    
    # Events
    events: list[dict] = field(default_factory=list)
    
    # Status
    status: str = "ok"
    error: Optional[str] = None
    
    @property
    def duration_ms(self) -> float:
        if not self.end_time:
            return 0.0
        return (self.end_time - self.start_time).total_seconds() * 1000
    
    def set_attribute(self, key: str, value: Any):
        self.attributes[key] = value
    
    def add_event(self, name: str, attributes: dict = None):
        self.events.append({
            "name": name,
            "timestamp": datetime.now().isoformat(),
            "attributes": attributes or {}
        })
    
    def set_error(self, error: str):
        self.status = "error"
        self.error = error
    
    def end(self):
        self.end_time = datetime.now()

class Tracer:
    """Distributed tracer for LLM applications."""
    
    def __init__(self, service_name: str = "llm-service"):
        self.service_name = service_name
        self.spans: dict[str, Span] = {}
        self.current_span: Optional[Span] = None
    
    def start_trace(self, name: str) -> Span:
        """Start a new trace."""
        
        trace_id = str(uuid.uuid4())
        return self.start_span(name, trace_id=trace_id)
    
    def start_span(
        self,
        name: str,
        trace_id: str = None,
        parent_id: str = None
    ) -> Span:
        """Start a new span."""
        
        span = Span(
            span_id=str(uuid.uuid4()),
            trace_id=trace_id or (self.current_span.trace_id if self.current_span else str(uuid.uuid4())),
            parent_id=parent_id or (self.current_span.span_id if self.current_span else None),
            name=name,
            start_time=datetime.now()
        )
        
        self.spans[span.span_id] = span
        self.current_span = span
        
        return span
    
    @contextmanager
    def span(self, name: str):
        """Context manager for spans."""
        
        span = self.start_span(name)
        
        try:
            yield span
        except Exception as e:
            span.set_error(str(e))
            raise
        finally:
            span.end()
            self.current_span = self.spans.get(span.parent_id)
    
    def get_trace(self, trace_id: str) -> list[Span]:
        """Get all spans for a trace."""
        
        return [
            span for span in self.spans.values()
            if span.trace_id == trace_id
        ]

class LLMTracer(Tracer):
    """Tracer with LLM-specific instrumentation."""
    
    def __init__(self, service_name: str = "llm-service"):
        super().__init__(service_name)
    
    @contextmanager
    def llm_call(
        self,
        model: str,
        operation: str = "chat_completion"
    ):
        """Trace an LLM call."""
        
        with self.span(f"llm.{operation}") as span:
            span.set_attribute("llm.model", model)
            span.set_attribute("llm.operation", operation)
            
            yield span
    
    @contextmanager
    def chain_step(self, step_name: str):
        """Trace a chain step."""
        
        with self.span(f"chain.{step_name}") as span:
            span.set_attribute("chain.step", step_name)
            yield span
    
    @contextmanager
    def retrieval(self, source: str):
        """Trace a retrieval operation."""
        
        with self.span("retrieval") as span:
            span.set_attribute("retrieval.source", source)
            yield span

# Traced LLM client
class TracedLLMClient:
    """LLM client with distributed tracing."""
    
    def __init__(
        self,
        client: Any,
        tracer: LLMTracer = None
    ):
        self.client = client
        self.tracer = tracer or LLMTracer()
    
    async def chat_completion(
        self,
        model: str,
        messages: list[dict],
        **kwargs
    ) -> Any:
        """Create chat completion with tracing."""
        
        with self.tracer.llm_call(model) as span:
            span.set_attribute("llm.message_count", len(messages))
            
            try:
                response = await self.client.chat.completions.create(
                    model=model,
                    messages=messages,
                    **kwargs
                )
                
                # Record usage
                span.set_attribute("llm.prompt_tokens", response.usage.prompt_tokens)
                span.set_attribute("llm.completion_tokens", response.usage.completion_tokens)
                span.set_attribute("llm.total_tokens", response.usage.total_tokens)
                
                return response
            
            except Exception as e:
                span.set_error(str(e))
                raise

Metrics Collection

from dataclasses import dataclass, field
from typing import Any, Callable
from datetime import datetime, timedelta
from collections import defaultdict
import threading

@dataclass
class MetricValue:
    """A metric value with timestamp."""
    
    value: float
    timestamp: datetime
    labels: dict = field(default_factory=dict)

class Counter:
    """A counter metric."""
    
    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._values: dict[tuple, float] = defaultdict(float)
        self._lock = threading.Lock()
    
    def inc(self, value: float = 1.0, **labels):
        """Increment counter."""
        
        key = tuple(sorted(labels.items()))
        
        with self._lock:
            self._values[key] += value
    
    def get(self, **labels) -> float:
        """Get counter value."""
        
        key = tuple(sorted(labels.items()))
        return self._values.get(key, 0.0)

class Histogram:
    """A histogram metric."""
    
    def __init__(
        self,
        name: str,
        description: str = "",
        buckets: list[float] = None
    ):
        self.name = name
        self.description = description
        self.buckets = buckets or [0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
        
        self._counts: dict[tuple, dict[float, int]] = defaultdict(
            lambda: {b: 0 for b in self.buckets + [float('inf')]}
        )
        self._sums: dict[tuple, float] = defaultdict(float)
        self._totals: dict[tuple, int] = defaultdict(int)
        self._lock = threading.Lock()
    
    def observe(self, value: float, **labels):
        """Record an observation."""
        
        key = tuple(sorted(labels.items()))
        
        with self._lock:
            self._sums[key] += value
            self._totals[key] += 1
            
            for bucket in self.buckets + [float('inf')]:
                if value <= bucket:
                    self._counts[key][bucket] += 1
    
    def get_percentile(self, percentile: float, **labels) -> float:
        """Get approximate percentile."""
        
        key = tuple(sorted(labels.items()))
        total = self._totals.get(key, 0)
        
        if total == 0:
            return 0.0
        
        target = total * percentile
        cumulative = 0
        
        for bucket in self.buckets:
            cumulative += self._counts[key].get(bucket, 0)
            if cumulative >= target:
                return bucket
        
        return self.buckets[-1]

class LLMMetrics:
    """Metrics collector for LLM applications."""
    
    def __init__(self):
        # Request metrics
        self.request_count = Counter(
            "llm_requests_total",
            "Total LLM requests"
        )
        
        self.request_latency = Histogram(
            "llm_request_latency_seconds",
            "LLM request latency",
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
        )
        
        # Token metrics
        self.tokens_used = Counter(
            "llm_tokens_total",
            "Total tokens used"
        )
        
        # Error metrics
        self.error_count = Counter(
            "llm_errors_total",
            "Total LLM errors"
        )
        
        # Cost metrics
        self.estimated_cost = Counter(
            "llm_cost_dollars",
            "Estimated cost in dollars"
        )
    
    def record_request(
        self,
        model: str,
        latency_seconds: float,
        prompt_tokens: int,
        completion_tokens: int,
        success: bool = True,
        error_type: str = None
    ):
        """Record a request with all metrics."""
        
        # Request count
        self.request_count.inc(
            model=model,
            status="success" if success else "error"
        )
        
        # Latency
        self.request_latency.observe(
            latency_seconds,
            model=model
        )
        
        # Tokens
        self.tokens_used.inc(prompt_tokens, model=model, type="prompt")
        self.tokens_used.inc(completion_tokens, model=model, type="completion")
        
        # Errors
        if not success:
            self.error_count.inc(model=model, error_type=error_type or "unknown")
        
        # Cost estimation
        cost = self._estimate_cost(model, prompt_tokens, completion_tokens)
        self.estimated_cost.inc(cost, model=model)
    
    def _estimate_cost(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> float:
        """Estimate cost based on model pricing."""
        
        # Pricing per 1M tokens (example rates)
        pricing = {
            "gpt-4o": {"prompt": 2.50, "completion": 10.00},
            "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
            "claude-3-5-sonnet": {"prompt": 3.00, "completion": 15.00},
        }
        
        rates = pricing.get(model, {"prompt": 1.0, "completion": 2.0})
        
        prompt_cost = (prompt_tokens / 1_000_000) * rates["prompt"]
        completion_cost = (completion_tokens / 1_000_000) * rates["completion"]
        
        return prompt_cost + completion_cost
    
    def get_summary(self) -> dict:
        """Get metrics summary."""
        
        return {
            "total_requests": sum(self.request_count._values.values()),
            "total_tokens": sum(self.tokens_used._values.values()),
            "total_errors": sum(self.error_count._values.values()),
            "total_cost": sum(self.estimated_cost._values.values()),
            "p50_latency": self.request_latency.get_percentile(0.5),
            "p95_latency": self.request_latency.get_percentile(0.95),
            "p99_latency": self.request_latency.get_percentile(0.99)
        }

Log Storage and Export

from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
import json
import asyncio

class LogStorage:
    """Base class for log storage."""
    
    async def store(self, log_entry: dict):
        raise NotImplementedError
    
    async def query(
        self,
        start_time: datetime,
        end_time: datetime,
        filters: dict = None
    ) -> list[dict]:
        raise NotImplementedError

class InMemoryLogStorage(LogStorage):
    """In-memory log storage for development."""
    
    def __init__(self, max_entries: int = 10000):
        self.logs: list[dict] = []
        self.max_entries = max_entries
    
    async def store(self, log_entry: dict):
        self.logs.append(log_entry)
        
        # Trim if over limit
        if len(self.logs) > self.max_entries:
            self.logs = self.logs[-self.max_entries:]
    
    async def query(
        self,
        start_time: datetime,
        end_time: datetime,
        filters: dict = None
    ) -> list[dict]:
        results = []
        
        for log in self.logs:
            timestamp = datetime.fromisoformat(log.get("timestamp", ""))
            
            if start_time <= timestamp <= end_time:
                if self._matches_filters(log, filters):
                    results.append(log)
        
        return results
    
    def _matches_filters(self, log: dict, filters: dict) -> bool:
        if not filters:
            return True
        
        for key, value in filters.items():
            if log.get(key) != value:
                return False
        
        return True

class FileLogStorage(LogStorage):
    """File-based log storage."""
    
    def __init__(self, log_dir: str):
        self.log_dir = log_dir
        import os
        os.makedirs(log_dir, exist_ok=True)
    
    async def store(self, log_entry: dict):
        import os
        
        date = datetime.now().strftime("%Y-%m-%d")
        filename = os.path.join(self.log_dir, f"llm_logs_{date}.jsonl")
        
        async with asyncio.Lock():
            with open(filename, "a") as f:
                f.write(json.dumps(log_entry) + "\n")
    
    async def query(
        self,
        start_time: datetime,
        end_time: datetime,
        filters: dict = None
    ) -> list[dict]:
        import os
        import glob
        
        results = []
        pattern = os.path.join(self.log_dir, "llm_logs_*.jsonl")
        
        for filepath in glob.glob(pattern):
            with open(filepath) as f:
                for line in f:
                    log = json.loads(line)
                    timestamp = datetime.fromisoformat(log.get("timestamp", ""))
                    
                    if start_time <= timestamp <= end_time:
                        if self._matches_filters(log, filters):
                            results.append(log)
        
        return results
    
    def _matches_filters(self, log: dict, filters: dict) -> bool:
        if not filters:
            return True
        
        for key, value in filters.items():
            if log.get(key) != value:
                return False
        
        return True

# Export to external systems
class LogExporter:
    """Export logs to external systems."""
    
    def __init__(self, storage: LogStorage):
        self.storage = storage
    
    async def export_to_json(
        self,
        start_time: datetime,
        end_time: datetime,
        output_file: str
    ):
        """Export logs to JSON file."""
        
        logs = await self.storage.query(start_time, end_time)
        
        with open(output_file, "w") as f:
            json.dump(logs, f, indent=2)
    
    async def export_to_csv(
        self,
        start_time: datetime,
        end_time: datetime,
        output_file: str
    ):
        """Export logs to CSV file."""
        
        import csv
        
        logs = await self.storage.query(start_time, end_time)
        
        if not logs:
            return
        
        # Get all keys
        keys = set()
        for log in logs:
            keys.update(log.keys())
        
        with open(output_file, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=sorted(keys))
            writer.writeheader()
            writer.writerows(logs)

Production Logging Service

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from typing import Optional
from datetime import datetime, timedelta

app = FastAPI()

# Initialize components
logger = LLMLogger()
tracer = LLMTracer()
metrics = LLMMetrics()
storage = InMemoryLogStorage()

class LogQueryRequest(BaseModel):
    start_time: datetime
    end_time: datetime
    model: Optional[str] = None
    user_id: Optional[str] = None
    trace_id: Optional[str] = None

@app.get("/v1/logs")
async def query_logs(
    start_time: datetime = Query(...),
    end_time: datetime = Query(...),
    model: Optional[str] = None,
    user_id: Optional[str] = None
):
    """Query logs with filters."""
    
    filters = {}
    if model:
        filters["model"] = model
    if user_id:
        filters["user_id"] = user_id
    
    logs = await storage.query(start_time, end_time, filters)
    
    return {"logs": logs, "count": len(logs)}

@app.get("/v1/traces/{trace_id}")
async def get_trace(trace_id: str):
    """Get all spans for a trace."""
    
    spans = tracer.get_trace(trace_id)
    
    if not spans:
        raise HTTPException(404, "Trace not found")
    
    return {
        "trace_id": trace_id,
        "spans": [
            {
                "span_id": s.span_id,
                "parent_id": s.parent_id,
                "name": s.name,
                "start_time": s.start_time.isoformat(),
                "end_time": s.end_time.isoformat() if s.end_time else None,
                "duration_ms": s.duration_ms,
                "attributes": s.attributes,
                "status": s.status
            }
            for s in spans
        ]
    }

@app.get("/v1/metrics")
async def get_metrics():
    """Get current metrics."""
    
    return metrics.get_summary()

@app.get("/v1/metrics/prometheus")
async def get_prometheus_metrics():
    """Export metrics in Prometheus format."""
    
    lines = []
    
    # Request count
    lines.append("# HELP llm_requests_total Total LLM requests")
    lines.append("# TYPE llm_requests_total counter")
    for labels, value in metrics.request_count._values.items():
        label_str = ",".join(f'{k}="{v}"' for k, v in labels)
        lines.append(f"llm_requests_total{{{label_str}}} {value}")
    
    # Tokens
    lines.append("# HELP llm_tokens_total Total tokens used")
    lines.append("# TYPE llm_tokens_total counter")
    for labels, value in metrics.tokens_used._values.items():
        label_str = ",".join(f'{k}="{v}"' for k, v in labels)
        lines.append(f"llm_tokens_total{{{label_str}}} {value}")
    
    return "\n".join(lines)

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Comprehensive logging and tracing are essential for production LLM applications. Structured logging captures the full context of each request—prompts, responses, token usage, and latency—enabling debugging and analysis. Distributed tracing connects multi-step LLM chains, showing how requests flow through your system and where time is spent. Metrics provide aggregate views for monitoring and alerting—track request rates, error rates, latencies, and costs. Consider PII masking for sensitive data and implement log retention policies. Use specialized LLM observability tools like LangSmith or Weights & Biases for advanced features like prompt versioning and evaluation. The goal is complete visibility into your AI systems—you can’t improve what you can’t measure.