Introduction: Debugging LLM chains is fundamentally different from debugging traditional software. When a chain fails, the problem could be in the prompt, the model’s interpretation, the output parsing, or any of the intermediate steps. The non-deterministic nature of LLMs means the same input can produce different outputs, making reproduction difficult. Effective chain debugging requires comprehensive tracing that captures every step’s inputs, outputs, and timing; inspection tools that let you examine intermediate states; and analysis capabilities that help identify patterns in failures. This guide covers practical techniques for debugging LLM chains: from basic logging and tracing to sophisticated replay systems and automated failure analysis. The goal is to transform the black box of LLM chains into observable, debuggable systems where you can quickly identify and fix issues.

Tracing Infrastructure
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict, Callable
from datetime import datetime
from enum import Enum
import uuid
import json
import time
class StepType(Enum):
"""Types of chain steps."""
LLM_CALL = "llm_call"
RETRIEVAL = "retrieval"
TOOL_CALL = "tool_call"
TRANSFORM = "transform"
PARSE = "parse"
VALIDATE = "validate"
class StepStatus(Enum):
"""Status of a step execution."""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class StepTrace:
"""Trace of a single step execution."""
step_id: str
step_name: str
step_type: StepType
status: StepStatus
input_data: Any
output_data: Any = None
    error: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
duration_ms: float = 0
metadata: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"step_id": self.step_id,
"step_name": self.step_name,
"step_type": self.step_type.value,
"status": self.status.value,
"input_data": self._serialize(self.input_data),
"output_data": self._serialize(self.output_data),
"error": self.error,
"start_time": self.start_time.isoformat() if self.start_time else None,
"end_time": self.end_time.isoformat() if self.end_time else None,
"duration_ms": self.duration_ms,
"metadata": self.metadata
}
def _serialize(self, data: Any) -> Any:
try:
json.dumps(data)
return data
except (TypeError, ValueError):
return str(data)
@dataclass
class ChainTrace:
"""Complete trace of a chain execution."""
trace_id: str
chain_name: str
steps: list[StepTrace] = field(default_factory=list)
input_data: Any = None
output_data: Any = None
status: StepStatus = StepStatus.PENDING
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    total_duration_ms: float = 0
    error: Optional[str] = None
metadata: dict = field(default_factory=dict)
def add_step(self, step: StepTrace):
self.steps.append(step)
def get_failed_steps(self) -> list[StepTrace]:
return [s for s in self.steps if s.status == StepStatus.FAILED]
def get_step_by_name(self, name: str) -> Optional[StepTrace]:
for step in self.steps:
if step.step_name == name:
return step
return None
def to_dict(self) -> dict:
return {
"trace_id": self.trace_id,
"chain_name": self.chain_name,
"steps": [s.to_dict() for s in self.steps],
"input_data": self.input_data,
"output_data": self.output_data,
"status": self.status.value,
"start_time": self.start_time.isoformat() if self.start_time else None,
"end_time": self.end_time.isoformat() if self.end_time else None,
"total_duration_ms": self.total_duration_ms,
"error": self.error,
"metadata": self.metadata
}
class Tracer:
"""Trace chain executions."""
def __init__(self):
self.current_trace: Optional[ChainTrace] = None
self.traces: list[ChainTrace] = []
self.callbacks: list[Callable] = []
def start_chain(self, chain_name: str, input_data: Any) -> ChainTrace:
"""Start tracing a chain."""
self.current_trace = ChainTrace(
trace_id=str(uuid.uuid4()),
chain_name=chain_name,
input_data=input_data,
status=StepStatus.RUNNING,
start_time=datetime.now()
)
return self.current_trace
def start_step(
self,
step_name: str,
step_type: StepType,
input_data: Any
) -> StepTrace:
"""Start tracing a step."""
step = StepTrace(
step_id=str(uuid.uuid4()),
step_name=step_name,
step_type=step_type,
status=StepStatus.RUNNING,
input_data=input_data,
start_time=datetime.now()
)
if self.current_trace:
self.current_trace.add_step(step)
return step
def end_step(
self,
step: StepTrace,
output_data: Any = None,
error: str = None
):
"""End tracing a step."""
step.end_time = datetime.now()
step.duration_ms = (step.end_time - step.start_time).total_seconds() * 1000
step.output_data = output_data
step.error = error
step.status = StepStatus.FAILED if error else StepStatus.SUCCESS
# Notify callbacks
for callback in self.callbacks:
callback("step_end", step)
def end_chain(
self,
output_data: Any = None,
error: str = None
):
"""End tracing a chain."""
if not self.current_trace:
return
self.current_trace.end_time = datetime.now()
self.current_trace.total_duration_ms = (
self.current_trace.end_time - self.current_trace.start_time
).total_seconds() * 1000
self.current_trace.output_data = output_data
self.current_trace.error = error
self.current_trace.status = StepStatus.FAILED if error else StepStatus.SUCCESS
self.traces.append(self.current_trace)
# Notify callbacks
for callback in self.callbacks:
callback("chain_end", self.current_trace)
self.current_trace = None
def add_callback(self, callback: Callable):
"""Add trace callback."""
self.callbacks.append(callback)
class TracingContext:
"""Context manager for tracing."""
def __init__(
self,
tracer: Tracer,
step_name: str,
step_type: StepType,
input_data: Any
):
self.tracer = tracer
self.step_name = step_name
self.step_type = step_type
self.input_data = input_data
self.step: Optional[StepTrace] = None
def __enter__(self) -> StepTrace:
self.step = self.tracer.start_step(
self.step_name,
self.step_type,
self.input_data
)
return self.step
    def __exit__(self, exc_type, exc_val, exc_tb):
        error = str(exc_val) if exc_val else None
        # Pass along any output the caller attached to the step inside the with-block,
        # otherwise end_step would overwrite it with None.
        self.tracer.end_step(self.step, output_data=self.step.output_data, error=error)
        return False
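Before moving on to inspection, here is a minimal usage sketch showing how the tracer and context manager fit together. The chain itself is hypothetical: call_llm is a stub standing in for a real model call, and the step names are illustrative.

def call_llm(prompt: str) -> str:
    # Stub standing in for a real model call
    return f"Summary of: {prompt[:40]}"

tracer = Tracer()
tracer.add_callback(lambda event, obj: print(f"[trace] {event}"))

trace = tracer.start_chain("summarize_chain", {"document": "Long article text..."})
try:
    with TracingContext(tracer, "build_prompt", StepType.TRANSFORM, trace.input_data) as step:
        prompt = f"Summarize: {trace.input_data['document']}"
        step.output_data = prompt
    with TracingContext(tracer, "generate", StepType.LLM_CALL, {"prompt": prompt}) as step:
        summary = call_llm(prompt)
        step.output_data = summary
    tracer.end_chain(output_data=summary)
except Exception as e:
    tracer.end_chain(error=str(e))

print(json.dumps(tracer.traces[-1].to_dict(), indent=2))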
Step Inspection
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import difflib
import json
@dataclass
class InspectionResult:
"""Result of step inspection."""
step_name: str
issues: list[str]
suggestions: list[str]
data_summary: dict
class StepInspector:
"""Inspect chain steps for issues."""
def __init__(self):
self.inspectors: dict[StepType, Callable] = {}
self._register_default_inspectors()
def _register_default_inspectors(self):
"""Register default inspectors."""
self.inspectors[StepType.LLM_CALL] = self._inspect_llm_call
self.inspectors[StepType.RETRIEVAL] = self._inspect_retrieval
self.inspectors[StepType.PARSE] = self._inspect_parse
self.inspectors[StepType.TOOL_CALL] = self._inspect_tool_call
def inspect(self, step: StepTrace) -> InspectionResult:
"""Inspect a step."""
inspector = self.inspectors.get(step.step_type)
if inspector:
return inspector(step)
return self._default_inspect(step)
def _inspect_llm_call(self, step: StepTrace) -> InspectionResult:
"""Inspect LLM call step."""
issues = []
suggestions = []
# Check input
input_data = step.input_data
if isinstance(input_data, dict):
prompt = input_data.get("prompt", "")
# Check prompt length
if len(prompt) > 10000:
issues.append("Prompt is very long (>10000 chars)")
suggestions.append("Consider summarizing or chunking the input")
# Check for empty prompt
if not prompt.strip():
issues.append("Empty prompt")
suggestions.append("Ensure prompt is properly constructed")
# Check output
if step.status == StepStatus.SUCCESS:
output = step.output_data
if isinstance(output, str):
# Check for truncation
if output.endswith("...") or len(output) < 10:
issues.append("Output may be truncated or incomplete")
suggestions.append("Check max_tokens setting")
# Check for refusal
refusal_phrases = ["I cannot", "I'm unable", "I don't have"]
for phrase in refusal_phrases:
if phrase.lower() in output.lower():
issues.append(f"Model may have refused: contains '{phrase}'")
suggestions.append("Review prompt for policy violations")
# Check timing
if step.duration_ms > 30000:
issues.append(f"Slow response: {step.duration_ms:.0f}ms")
suggestions.append("Consider using a faster model or reducing input size")
return InspectionResult(
step_name=step.step_name,
issues=issues,
suggestions=suggestions,
data_summary={
"input_length": len(str(step.input_data)),
"output_length": len(str(step.output_data)) if step.output_data else 0,
"duration_ms": step.duration_ms
}
)
def _inspect_retrieval(self, step: StepTrace) -> InspectionResult:
"""Inspect retrieval step."""
issues = []
suggestions = []
output = step.output_data
if isinstance(output, list):
# Check number of results
if len(output) == 0:
issues.append("No documents retrieved")
suggestions.append("Check query or expand search parameters")
elif len(output) < 3:
issues.append(f"Few documents retrieved: {len(output)}")
suggestions.append("Consider lowering similarity threshold")
# Check relevance scores if available
if output and isinstance(output[0], dict):
scores = [d.get("score", 0) for d in output]
if scores and max(scores) < 0.5:
issues.append(f"Low relevance scores: max={max(scores):.2f}")
suggestions.append("Query may not match document content well")
return InspectionResult(
step_name=step.step_name,
issues=issues,
suggestions=suggestions,
data_summary={
"num_results": len(output) if isinstance(output, list) else 0,
"duration_ms": step.duration_ms
}
)
def _inspect_parse(self, step: StepTrace) -> InspectionResult:
"""Inspect parse step."""
issues = []
suggestions = []
if step.status == StepStatus.FAILED:
issues.append(f"Parse failed: {step.error}")
# Analyze input for common issues
input_str = str(step.input_data)
if "```" in input_str:
suggestions.append("Input contains code blocks - try extracting content first")
if input_str.startswith("I ") or input_str.startswith("Here"):
suggestions.append("LLM added preamble - try stricter output instructions")
return InspectionResult(
step_name=step.step_name,
issues=issues,
suggestions=suggestions,
data_summary={
"input_length": len(str(step.input_data)),
"parse_success": step.status == StepStatus.SUCCESS
}
)
def _inspect_tool_call(self, step: StepTrace) -> InspectionResult:
"""Inspect tool call step."""
issues = []
suggestions = []
if step.status == StepStatus.FAILED:
error = step.error or ""
if "timeout" in error.lower():
issues.append("Tool call timed out")
suggestions.append("Increase timeout or optimize tool")
elif "not found" in error.lower():
issues.append("Tool not found")
suggestions.append("Check tool registration and naming")
elif "invalid" in error.lower():
issues.append("Invalid tool arguments")
suggestions.append("Review argument schema and LLM output")
return InspectionResult(
step_name=step.step_name,
issues=issues,
suggestions=suggestions,
data_summary={
"tool_name": step.metadata.get("tool_name", "unknown"),
"duration_ms": step.duration_ms
}
)
def _default_inspect(self, step: StepTrace) -> InspectionResult:
"""Default inspection."""
issues = []
suggestions = []
if step.status == StepStatus.FAILED:
issues.append(f"Step failed: {step.error}")
return InspectionResult(
step_name=step.step_name,
issues=issues,
suggestions=suggestions,
data_summary={
"duration_ms": step.duration_ms,
"status": step.status.value
}
)
class OutputComparator:
"""Compare outputs between runs."""
def compare(
self,
expected: Any,
actual: Any
) -> dict:
"""Compare expected vs actual output."""
result = {
"match": False,
"similarity": 0.0,
"differences": []
}
if expected == actual:
result["match"] = True
result["similarity"] = 1.0
return result
# String comparison
if isinstance(expected, str) and isinstance(actual, str):
result["similarity"] = self._string_similarity(expected, actual)
result["differences"] = self._string_diff(expected, actual)
# Dict comparison
elif isinstance(expected, dict) and isinstance(actual, dict):
result["similarity"] = self._dict_similarity(expected, actual)
result["differences"] = self._dict_diff(expected, actual)
# List comparison
elif isinstance(expected, list) and isinstance(actual, list):
result["similarity"] = self._list_similarity(expected, actual)
result["differences"] = self._list_diff(expected, actual)
return result
def _string_similarity(self, s1: str, s2: str) -> float:
"""Calculate string similarity."""
return difflib.SequenceMatcher(None, s1, s2).ratio()
def _string_diff(self, s1: str, s2: str) -> list[str]:
"""Get string differences."""
diff = difflib.unified_diff(
s1.splitlines(),
s2.splitlines(),
lineterm=""
)
return list(diff)
def _dict_similarity(self, d1: dict, d2: dict) -> float:
"""Calculate dict similarity."""
all_keys = set(d1.keys()) | set(d2.keys())
if not all_keys:
return 1.0
matching = sum(1 for k in all_keys if d1.get(k) == d2.get(k))
return matching / len(all_keys)
def _dict_diff(self, d1: dict, d2: dict) -> list[str]:
"""Get dict differences."""
diffs = []
all_keys = set(d1.keys()) | set(d2.keys())
for key in all_keys:
if key not in d1:
diffs.append(f"Missing in expected: {key}")
elif key not in d2:
diffs.append(f"Missing in actual: {key}")
elif d1[key] != d2[key]:
diffs.append(f"Different value for {key}: {d1[key]} vs {d2[key]}")
return diffs
def _list_similarity(self, l1: list, l2: list) -> float:
"""Calculate list similarity."""
if not l1 and not l2:
return 1.0
max_len = max(len(l1), len(l2))
matching = sum(1 for a, b in zip(l1, l2) if a == b)
return matching / max_len
def _list_diff(self, l1: list, l2: list) -> list[str]:
"""Get list differences."""
diffs = []
for i, (a, b) in enumerate(zip(l1, l2)):
if a != b:
diffs.append(f"Index {i}: {a} vs {b}")
if len(l1) != len(l2):
diffs.append(f"Length difference: {len(l1)} vs {len(l2)}")
return diffs
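As a quick illustration, inspection and comparison run directly on trace objects. The StepTrace below is hand-built rather than captured by the tracer, and its values are chosen to trip the refusal and slow-response checks.

inspector = StepInspector()
step = StepTrace(
    step_id="s1",
    step_name="generate",
    step_type=StepType.LLM_CALL,
    status=StepStatus.SUCCESS,
    input_data={"prompt": "Summarize the attached report"},
    output_data="I cannot summarize this document.",
    duration_ms=42000,
)
result = inspector.inspect(step)
print(result.issues)        # refusal phrase and slow-response warnings
print(result.suggestions)

comparator = OutputComparator()
diff = comparator.compare(
    expected={"title": "Q3 report", "sentiment": "positive"},
    actual={"title": "Q3 report", "sentiment": "neutral"},
)
print(diff["similarity"], diff["differences"])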
Failure Analysis
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
from collections import Counter
import re
@dataclass
class FailurePattern:
"""A pattern of failures."""
pattern_name: str
description: str
occurrences: int
example_traces: list[str]
suggested_fix: str
class FailureAnalyzer:
"""Analyze chain failures."""
def __init__(self):
self.failure_patterns: list[tuple[str, Callable]] = []
self._register_default_patterns()
def _register_default_patterns(self):
"""Register default failure patterns."""
self.failure_patterns.append(("parse_failure", self._detect_parse_failure))
self.failure_patterns.append(("timeout", self._detect_timeout))
self.failure_patterns.append(("rate_limit", self._detect_rate_limit))
self.failure_patterns.append(("context_overflow", self._detect_context_overflow))
self.failure_patterns.append(("empty_response", self._detect_empty_response))
def analyze(self, traces: list[ChainTrace]) -> list[FailurePattern]:
"""Analyze traces for failure patterns."""
failed_traces = [t for t in traces if t.status == StepStatus.FAILED]
if not failed_traces:
return []
patterns = []
for pattern_name, detector in self.failure_patterns:
matches = detector(failed_traces)
if matches:
patterns.append(matches)
return patterns
def _detect_parse_failure(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
"""Detect parse failures."""
parse_failures = []
for trace in traces:
for step in trace.steps:
if step.step_type == StepType.PARSE and step.status == StepStatus.FAILED:
parse_failures.append(trace.trace_id)
if not parse_failures:
return None
return FailurePattern(
pattern_name="parse_failure",
description="LLM output could not be parsed into expected format",
occurrences=len(parse_failures),
example_traces=parse_failures[:5],
suggested_fix="Add output format examples to prompt, use JSON mode, or implement more robust parsing"
)
def _detect_timeout(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
"""Detect timeout failures."""
timeouts = []
for trace in traces:
if trace.error and "timeout" in trace.error.lower():
timeouts.append(trace.trace_id)
for step in trace.steps:
if step.error and "timeout" in step.error.lower():
timeouts.append(trace.trace_id)
if not timeouts:
return None
return FailurePattern(
pattern_name="timeout",
description="Chain or step timed out",
occurrences=len(set(timeouts)),
example_traces=list(set(timeouts))[:5],
suggested_fix="Increase timeout, reduce input size, or use a faster model"
)
def _detect_rate_limit(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
"""Detect rate limit failures."""
rate_limits = []
for trace in traces:
error = (trace.error or "").lower()
if "rate" in error and "limit" in error:
rate_limits.append(trace.trace_id)
for step in trace.steps:
error = (step.error or "").lower()
if "rate" in error and "limit" in error:
rate_limits.append(trace.trace_id)
if not rate_limits:
return None
return FailurePattern(
pattern_name="rate_limit",
description="API rate limit exceeded",
occurrences=len(set(rate_limits)),
example_traces=list(set(rate_limits))[:5],
suggested_fix="Implement rate limiting, add retries with backoff, or increase API quota"
)
def _detect_context_overflow(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
"""Detect context length overflow."""
overflows = []
for trace in traces:
error = (trace.error or "").lower()
if "context" in error and ("length" in error or "token" in error):
overflows.append(trace.trace_id)
for step in trace.steps:
error = (step.error or "").lower()
if "context" in error and ("length" in error or "token" in error):
overflows.append(trace.trace_id)
if not overflows:
return None
return FailurePattern(
pattern_name="context_overflow",
description="Input exceeded model's context window",
occurrences=len(set(overflows)),
example_traces=list(set(overflows))[:5],
suggested_fix="Implement context compression, chunking, or use a model with larger context"
)
def _detect_empty_response(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
"""Detect empty response failures."""
empty_responses = []
for trace in traces:
for step in trace.steps:
if step.step_type == StepType.LLM_CALL:
output = step.output_data
if output is None or (isinstance(output, str) and not output.strip()):
empty_responses.append(trace.trace_id)
if not empty_responses:
return None
return FailurePattern(
pattern_name="empty_response",
description="LLM returned empty or null response",
occurrences=len(set(empty_responses)),
example_traces=list(set(empty_responses))[:5],
suggested_fix="Check for content filtering, add explicit output instructions, or verify API response handling"
)
class RootCauseAnalyzer:
"""Analyze root cause of failures."""
def analyze(self, trace: ChainTrace) -> dict:
"""Analyze root cause of a failed trace."""
if trace.status != StepStatus.FAILED:
return {"root_cause": None, "analysis": "Chain did not fail"}
# Find first failed step
failed_steps = trace.get_failed_steps()
if not failed_steps:
return {
"root_cause": "chain_level",
"analysis": f"Chain failed without step failure: {trace.error}"
}
first_failure = failed_steps[0]
# Analyze the failure
analysis = {
"root_cause": first_failure.step_name,
"step_type": first_failure.step_type.value,
"error": first_failure.error,
"input_summary": self._summarize_input(first_failure.input_data),
"preceding_steps": self._get_preceding_steps(trace, first_failure),
"suggestions": self._get_suggestions(first_failure)
}
return analysis
def _summarize_input(self, input_data: Any) -> dict:
"""Summarize input data."""
if isinstance(input_data, str):
return {
"type": "string",
"length": len(input_data),
"preview": input_data[:200] + "..." if len(input_data) > 200 else input_data
}
elif isinstance(input_data, dict):
return {
"type": "dict",
"keys": list(input_data.keys()),
"size": len(str(input_data))
}
elif isinstance(input_data, list):
return {
"type": "list",
"length": len(input_data)
}
else:
return {
"type": type(input_data).__name__,
"value": str(input_data)[:200]
}
def _get_preceding_steps(
self,
trace: ChainTrace,
failed_step: StepTrace
) -> list[dict]:
"""Get steps that preceded the failure."""
preceding = []
for step in trace.steps:
if step.step_id == failed_step.step_id:
break
preceding.append({
"name": step.step_name,
"status": step.status.value,
"duration_ms": step.duration_ms
})
return preceding
def _get_suggestions(self, step: StepTrace) -> list[str]:
"""Get suggestions for fixing the failure."""
suggestions = []
error = (step.error or "").lower()
if "json" in error:
suggestions.append("Add JSON output format instructions to prompt")
suggestions.append("Use JSON mode if available")
if "timeout" in error:
suggestions.append("Increase timeout setting")
suggestions.append("Reduce input size")
if "rate" in error:
suggestions.append("Add rate limiting")
suggestions.append("Implement exponential backoff")
if not suggestions:
suggestions.append("Review step input and error message")
suggestions.append("Add more detailed logging")
return suggestions
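Tying the pieces together, a sketch of an offline analysis pass over a batch of traces (for example the tracer.traces list accumulated by the Tracer above, or ChainTrace objects loaded from wherever you persist them) might look like this:

analyzer = FailureAnalyzer()
root_cause = RootCauseAnalyzer()

traces = tracer.traces   # or ChainTrace objects loaded from storage

patterns = analyzer.analyze(traces)
for pattern in patterns:
    print(f"{pattern.pattern_name}: {pattern.occurrences} occurrence(s)")
    print(f"  suggested fix: {pattern.suggested_fix}")

# Drill into the first failed trace, if any
failed = [t for t in traces if t.status == StepStatus.FAILED]
if failed:
    analysis = root_cause.analyze(failed[0])
    print("root cause:", analysis.get("root_cause"))
    for suggestion in analysis.get("suggestions", []):
        print(" -", suggestion)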
Replay and Reproduction
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import json
import copy
@dataclass
class ReplayConfig:
"""Configuration for replay."""
use_cached_llm: bool = True
use_cached_retrieval: bool = True
use_cached_tools: bool = False
stop_on_divergence: bool = False
class TraceReplayer:
"""Replay chain executions from traces."""
def __init__(self, chain: Any, config: ReplayConfig = None):
self.chain = chain
self.config = config or ReplayConfig()
self.cached_responses: dict[str, Any] = {}
def load_trace(self, trace: ChainTrace):
"""Load trace for replay."""
# Cache responses from trace
for step in trace.steps:
if step.status == StepStatus.SUCCESS:
cache_key = self._make_cache_key(step)
self.cached_responses[cache_key] = step.output_data
def replay(
self,
trace: ChainTrace,
modifications: dict = None
) -> ChainTrace:
"""Replay a trace with optional modifications."""
# Load cached responses
self.load_trace(trace)
# Apply modifications to input
input_data = copy.deepcopy(trace.input_data)
if modifications:
input_data = self._apply_modifications(input_data, modifications)
# Create new tracer
tracer = Tracer()
# Inject cache into chain
original_llm = self.chain.llm
if self.config.use_cached_llm:
self.chain.llm = CachedLLM(original_llm, self.cached_responses)
try:
# Run chain
new_trace = tracer.start_chain(trace.chain_name, input_data)
result = self.chain.run(input_data, tracer=tracer)
tracer.end_chain(output_data=result)
except Exception as e:
tracer.end_chain(error=str(e))
finally:
# Restore original LLM
self.chain.llm = original_llm
return tracer.current_trace or new_trace
    def _make_cache_key(self, step: StepTrace) -> str:
        """Create cache key for a step. LLM calls are keyed by prompt so CachedLLM lookups hit the cache."""
        if step.step_type == StepType.LLM_CALL:
            prompt = step.input_data.get("prompt", "") if isinstance(step.input_data, dict) else str(step.input_data)
            return f"llm:{hash(prompt)}"
        input_str = json.dumps(step.input_data, sort_keys=True, default=str)
        return f"{step.step_name}:{hash(input_str)}"
def _apply_modifications(self, data: Any, modifications: dict) -> Any:
"""Apply modifications to data."""
if isinstance(data, dict):
result = dict(data)
for key, value in modifications.items():
if "." in key:
# Nested key
parts = key.split(".")
current = result
for part in parts[:-1]:
current = current.setdefault(part, {})
current[parts[-1]] = value
else:
result[key] = value
return result
return data
class CachedLLM:
"""LLM wrapper that uses cached responses."""
def __init__(self, llm: Any, cache: dict):
self.llm = llm
self.cache = cache
async def generate(self, prompt: str, **kwargs) -> str:
"""Generate with cache lookup."""
cache_key = f"llm:{hash(prompt)}"
if cache_key in self.cache:
return self.cache[cache_key]
return await self.llm.generate(prompt, **kwargs)
class DebugSession:
"""Interactive debug session."""
def __init__(self, trace: ChainTrace):
self.trace = trace
self.current_step_idx = 0
self.breakpoints: set[str] = set()
self.watches: dict[str, Callable] = {}
def set_breakpoint(self, step_name: str):
"""Set breakpoint at step."""
self.breakpoints.add(step_name)
def remove_breakpoint(self, step_name: str):
"""Remove breakpoint."""
self.breakpoints.discard(step_name)
def add_watch(self, name: str, extractor: Callable):
"""Add watch expression."""
self.watches[name] = extractor
def step_forward(self) -> Optional[StepTrace]:
"""Move to next step."""
if self.current_step_idx >= len(self.trace.steps):
return None
step = self.trace.steps[self.current_step_idx]
self.current_step_idx += 1
return step
def step_back(self) -> Optional[StepTrace]:
"""Move to previous step."""
if self.current_step_idx <= 0:
return None
self.current_step_idx -= 1
return self.trace.steps[self.current_step_idx]
def current_step(self) -> Optional[StepTrace]:
"""Get current step."""
if 0 <= self.current_step_idx < len(self.trace.steps):
return self.trace.steps[self.current_step_idx]
return None
def get_watches(self) -> dict[str, Any]:
"""Evaluate watch expressions."""
step = self.current_step()
if not step:
return {}
results = {}
for name, extractor in self.watches.items():
try:
results[name] = extractor(step)
except Exception as e:
results[name] = f"Error: {e}"
return results
def run_to_breakpoint(self) -> Optional[StepTrace]:
"""Run until breakpoint hit."""
while self.current_step_idx < len(self.trace.steps):
step = self.trace.steps[self.current_step_idx]
if step.step_name in self.breakpoints:
return step
self.current_step_idx += 1
return None
def get_state_at_step(self, step_idx: int) -> dict:
"""Get chain state at specific step."""
state = {
"step_idx": step_idx,
"completed_steps": [],
"current_output": None
}
for i, step in enumerate(self.trace.steps[:step_idx + 1]):
state["completed_steps"].append({
"name": step.step_name,
"status": step.status.value
})
if step.status == StepStatus.SUCCESS:
state["current_output"] = step.output_data
return state
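How these classes fit together depends on your chain object. The sketch below assumes a hypothetical chain exposing the llm attribute and run(input, tracer=...) method that TraceReplayer expects, a previously captured failed_trace whose input is a nested dict (which is what the dotted modification key relies on), and an illustrative step name parse_output.

# Step through a captured trace without re-running anything
session = DebugSession(failed_trace)
session.set_breakpoint("parse_output")
session.add_watch("output_preview", lambda s: str(s.output_data)[:80])

step = session.run_to_breakpoint()
if step:
    print("stopped at:", step.step_name, step.status.value)
    print(session.get_watches())

# Re-run the chain with a modified input, reusing cached LLM responses from the trace
replayer = TraceReplayer(chain, ReplayConfig(use_cached_llm=True))
new_trace = replayer.replay(failed_trace, modifications={"options.max_tokens": 1024})
print(new_trace.status.value, new_trace.error)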
Production Debug Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
app = FastAPI()
class TraceQuery(BaseModel):
chain_name: Optional[str] = None
status: Optional[str] = None
start_date: Optional[str] = None
end_date: Optional[str] = None
limit: int = 100
class InspectRequest(BaseModel):
trace_id: str
step_name: Optional[str] = None
class ReplayRequest(BaseModel):
trace_id: str
modifications: Optional[Dict[str, Any]] = None
# Initialize components
tracer = Tracer()
inspector = StepInspector()
failure_analyzer = FailureAnalyzer()
root_cause_analyzer = RootCauseAnalyzer()
# Store traces as raw dicts (would use a database in production)
trace_store: dict[str, dict] = {}
@app.post("/v1/traces")
async def store_trace(trace: dict) -> dict:
"""Store a trace."""
trace_id = trace.get("trace_id")
if not trace_id:
raise HTTPException(status_code=400, detail="trace_id required")
    # Store the raw trace dict; endpoints rebuild ChainTrace/StepTrace objects on demand
    trace_store[trace_id] = trace
return {"stored": True, "trace_id": trace_id}
@app.get("/v1/traces/{trace_id}")
async def get_trace(trace_id: str) -> dict:
"""Get a trace by ID."""
trace = trace_store.get(trace_id)
if not trace:
raise HTTPException(status_code=404, detail="Trace not found")
return trace
@app.post("/v1/traces/search")
async def search_traces(query: TraceQuery) -> list[dict]:
"""Search traces."""
results = []
for trace_id, trace in trace_store.items():
# Apply filters
if query.chain_name and trace.get("chain_name") != query.chain_name:
continue
if query.status and trace.get("status") != query.status:
continue
results.append(trace)
if len(results) >= query.limit:
break
return results
@app.post("/v1/inspect")
async def inspect_trace(request: InspectRequest) -> dict:
"""Inspect a trace."""
trace = trace_store.get(request.trace_id)
if not trace:
raise HTTPException(status_code=404, detail="Trace not found")
# Convert and inspect
results = {
"trace_id": request.trace_id,
"inspections": []
}
steps = trace.get("steps", [])
for step_data in steps:
if request.step_name and step_data.get("step_name") != request.step_name:
continue
# Create StepTrace for inspection
step = StepTrace(
step_id=step_data.get("step_id", ""),
step_name=step_data.get("step_name", ""),
step_type=StepType(step_data.get("step_type", "transform")),
status=StepStatus(step_data.get("status", "success")),
input_data=step_data.get("input_data"),
output_data=step_data.get("output_data"),
error=step_data.get("error"),
duration_ms=step_data.get("duration_ms", 0)
)
inspection = inspector.inspect(step)
results["inspections"].append({
"step_name": inspection.step_name,
"issues": inspection.issues,
"suggestions": inspection.suggestions,
"data_summary": inspection.data_summary
})
return results
@app.post("/v1/analyze/failures")
async def analyze_failures(trace_ids: List[str]) -> dict:
"""Analyze failure patterns."""
traces = []
for trace_id in trace_ids:
trace = trace_store.get(trace_id)
if trace:
# Convert to ChainTrace (simplified)
chain_trace = ChainTrace(
trace_id=trace.get("trace_id", ""),
chain_name=trace.get("chain_name", ""),
status=StepStatus(trace.get("status", "success")),
error=trace.get("error")
)
traces.append(chain_trace)
patterns = failure_analyzer.analyze(traces)
return {
"total_traces": len(traces),
"failed_traces": sum(1 for t in traces if t.status == StepStatus.FAILED),
"patterns": [
{
"name": p.pattern_name,
"description": p.description,
"occurrences": p.occurrences,
"suggested_fix": p.suggested_fix
}
for p in patterns
]
}
@app.post("/v1/analyze/root-cause/{trace_id}")
async def analyze_root_cause(trace_id: str) -> dict:
"""Analyze root cause of failure."""
trace = trace_store.get(trace_id)
if not trace:
raise HTTPException(status_code=404, detail="Trace not found")
# Convert to ChainTrace
chain_trace = ChainTrace(
trace_id=trace.get("trace_id", ""),
chain_name=trace.get("chain_name", ""),
status=StepStatus(trace.get("status", "success")),
error=trace.get("error")
)
# Add steps
for step_data in trace.get("steps", []):
step = StepTrace(
step_id=step_data.get("step_id", ""),
step_name=step_data.get("step_name", ""),
step_type=StepType(step_data.get("step_type", "transform")),
status=StepStatus(step_data.get("status", "success")),
input_data=step_data.get("input_data"),
output_data=step_data.get("output_data"),
error=step_data.get("error"),
duration_ms=step_data.get("duration_ms", 0)
)
chain_trace.add_step(step)
analysis = root_cause_analyzer.analyze(chain_trace)
return analysis
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- LangSmith: https://smith.langchain.com/
- Weights & Biases Prompts: https://docs.wandb.ai/guides/prompts
- Arize Phoenix: https://phoenix.arize.com/
- OpenTelemetry: https://opentelemetry.io/
- LangFuse: https://langfuse.com/
Conclusion
Debugging LLM chains requires a different mindset than traditional software debugging. The non-deterministic nature of LLMs means you can’t rely on exact reproduction—instead, you need comprehensive tracing that captures every step’s inputs, outputs, and timing. Build tracing into your chains from the start, not as an afterthought. Use structured traces with step types, status codes, and metadata that enable automated analysis. Inspection tools should understand the semantics of different step types—LLM calls have different failure modes than retrieval or parsing steps. Failure analysis across multiple traces reveals patterns that single-trace debugging misses: are parse failures concentrated in certain input types? Do timeouts correlate with input length? Replay capabilities let you reproduce issues with cached LLM responses, isolating whether problems are in the chain logic or the model’s behavior. For production systems, store traces in a searchable database and build dashboards that surface failure patterns automatically. The goal is to transform LLM chains from black boxes into observable systems where you can quickly identify, understand, and fix issues.