
LLM Chain Debugging: Tracing, Inspecting, and Fixing Multi-Step AI Workflows

Introduction: Debugging LLM chains differs from debugging traditional software. When a chain fails, the fault may lie in the prompt, the model's interpretation of it, the output parsing, or any intermediate step. Because LLM output is non-deterministic, the same input can produce different outputs, which makes failures hard to reproduce. Effective chain debugging therefore needs three things: tracing that captures every step's inputs, outputs, and timing; inspection tools for examining intermediate state; and analysis that surfaces patterns across failures. This guide covers practical techniques for each, from basic tracing to replay systems and automated failure analysis. The goal is to turn an opaque chain into an observable system in which issues can be found and fixed quickly.

[Figure: Chain Debugging: Trace, Inspect, Analyze]

Tracing Infrastructure

from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict, Callable
from datetime import datetime
from enum import Enum
import uuid
import json
import time

class StepType(Enum):
    """Types of chain steps."""
    
    LLM_CALL = "llm_call"
    RETRIEVAL = "retrieval"
    TOOL_CALL = "tool_call"
    TRANSFORM = "transform"
    PARSE = "parse"
    VALIDATE = "validate"

class StepStatus(Enum):
    """Status of a step execution."""
    
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class StepTrace:
    """Trace of a single step execution."""
    
    step_id: str
    step_name: str
    step_type: StepType
    status: StepStatus
    input_data: Any
    output_data: Any = None
    error: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    duration_ms: float = 0
    metadata: dict = field(default_factory=dict)
    
    def to_dict(self) -> dict:
        return {
            "step_id": self.step_id,
            "step_name": self.step_name,
            "step_type": self.step_type.value,
            "status": self.status.value,
            "input_data": self._serialize(self.input_data),
            "output_data": self._serialize(self.output_data),
            "error": self.error,
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "duration_ms": self.duration_ms,
            "metadata": self.metadata
        }
    
    def _serialize(self, data: Any) -> Any:
        try:
            json.dumps(data)
            return data
        except (TypeError, ValueError):
            return str(data)

@dataclass
class ChainTrace:
    """Complete trace of a chain execution."""
    
    trace_id: str
    chain_name: str
    steps: list[StepTrace] = field(default_factory=list)
    input_data: Any = None
    output_data: Any = None
    status: StepStatus = StepStatus.PENDING
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    total_duration_ms: float = 0
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    
    def add_step(self, step: StepTrace):
        self.steps.append(step)
    
    def get_failed_steps(self) -> list[StepTrace]:
        return [s for s in self.steps if s.status == StepStatus.FAILED]
    
    def get_step_by_name(self, name: str) -> Optional[StepTrace]:
        for step in self.steps:
            if step.step_name == name:
                return step
        return None
    
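    def to_dict(self) -> dict:
        return {
            "trace_id": self.trace_id,
            "chain_name": self.chain_name,
            "steps": [s.to_dict() for s in self.steps],
            # Fall back to str() so non-JSON-serializable inputs or outputs
            # do not break json.dumps() on the result
            "input_data": self._serialize(self.input_data),
            "output_data": self._serialize(self.output_data),
            "status": self.status.value,
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "total_duration_ms": self.total_duration_ms,
            "error": self.error,
            "metadata": self.metadata
        }
    
    @staticmethod
    def _serialize(data: Any) -> Any:
        try:
            json.dumps(data)
            return data
        except (TypeError, ValueError):
            return str(data)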

class Tracer:
    """Trace chain executions."""
    
    def __init__(self):
        self.current_trace: Optional[ChainTrace] = None
        self.traces: list[ChainTrace] = []
        self.callbacks: list[Callable] = []
    
    def start_chain(self, chain_name: str, input_data: Any) -> ChainTrace:
        """Start tracing a chain."""
        
        self.current_trace = ChainTrace(
            trace_id=str(uuid.uuid4()),
            chain_name=chain_name,
            input_data=input_data,
            status=StepStatus.RUNNING,
            start_time=datetime.now()
        )
        
        return self.current_trace
    
    def start_step(
        self,
        step_name: str,
        step_type: StepType,
        input_data: Any
    ) -> StepTrace:
        """Start tracing a step."""
        
        step = StepTrace(
            step_id=str(uuid.uuid4()),
            step_name=step_name,
            step_type=step_type,
            status=StepStatus.RUNNING,
            input_data=input_data,
            start_time=datetime.now()
        )
        
        if self.current_trace:
            self.current_trace.add_step(step)
        
        return step
    
    def end_step(
        self,
        step: StepTrace,
        output_data: Any = None,
        error: str = None
    ):
        """End tracing a step."""
        
        step.end_time = datetime.now()
        step.duration_ms = (step.end_time - step.start_time).total_seconds() * 1000
        step.output_data = output_data
        step.error = error
        step.status = StepStatus.FAILED if error else StepStatus.SUCCESS
        
        # Notify callbacks
        for callback in self.callbacks:
            callback("step_end", step)
    
    def end_chain(
        self,
        output_data: Any = None,
        error: str = None
    ):
        """End tracing a chain."""
        
        if not self.current_trace:
            return
        
        self.current_trace.end_time = datetime.now()
        self.current_trace.total_duration_ms = (
            self.current_trace.end_time - self.current_trace.start_time
        ).total_seconds() * 1000
        self.current_trace.output_data = output_data
        self.current_trace.error = error
        self.current_trace.status = StepStatus.FAILED if error else StepStatus.SUCCESS
        
        self.traces.append(self.current_trace)
        
        # Notify callbacks
        for callback in self.callbacks:
            callback("chain_end", self.current_trace)
        
        self.current_trace = None
    
    def add_callback(self, callback: Callable):
        """Add trace callback."""
        
        self.callbacks.append(callback)

class TracingContext:
    """Context manager for tracing."""
    
    def __init__(
        self,
        tracer: Tracer,
        step_name: str,
        step_type: StepType,
        input_data: Any
    ):
        self.tracer = tracer
        self.step_name = step_name
        self.step_type = step_type
        self.input_data = input_data
        self.step: Optional[StepTrace] = None
    
    def __enter__(self) -> StepTrace:
        self.step = self.tracer.start_step(
            self.step_name,
            self.step_type,
            self.input_data
        )
        return self.step
    
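    def __exit__(self, exc_type, exc_val, exc_tb):
        error = str(exc_val) if exc_val else None
        # Pass along any output the caller attached to the step inside the
        # with-block; otherwise end_step() would overwrite it with None.
        self.tracer.end_step(
            self.step,
            output_data=self.step.output_data,
            error=error
        )
        # Returning False lets the original exception propagate
        return False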
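
The context-manager pattern above can be exercised end to end with a compact stand-in. The sketch below is not the Tracer class itself: `traced_step` and the `records` list are hypothetical names used only for illustration. It records input, output, error, and duration for each step. It also assumes `time.perf_counter()` (a monotonic clock) for durations, which differs from the `datetime.now()` arithmetic used in the listing.

```python
import time
from contextlib import contextmanager
from typing import Any, Iterator

# Hypothetical in-memory sink; a real tracer would persist or export these.
records: list[dict] = []

@contextmanager
def traced_step(name: str, input_data: Any) -> Iterator[dict]:
    """Record input, output, error, and duration for one step."""
    record = {"name": name, "input": input_data, "output": None,
              "error": None, "duration_ms": 0.0}
    start = time.perf_counter()  # monotonic, unaffected by wall-clock changes
    try:
        yield record  # the caller sets record["output"] inside the block
    except Exception as exc:
        record["error"] = str(exc)
        raise  # re-raise so the chain still sees the failure
    finally:
        record["duration_ms"] = (time.perf_counter() - start) * 1000
        records.append(record)

# A successful step: the output is attached inside the block.
with traced_step("build_prompt", {"question": "What is tracing?"}) as rec:
    rec["output"] = f"Answer briefly: {rec['input']['question']}"

# A failing step: the error is captured and the exception still propagates.
try:
    with traced_step("parse_json", "not json") as rec:
        raise ValueError("expected a JSON object")
except ValueError:
    pass

for r in records:
    state = "failed" if r["error"] else "ok"
    print(r["name"], state, f"{r['duration_ms']:.2f}ms")
```

Writing the record in `finally` means a step is logged whether it succeeds or raises, which is the property that matters most when the failing step is the one you need to see.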

Step Inspection

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import difflib
import json

@dataclass
class InspectionResult:
    """Result of step inspection."""
    
    step_name: str
    issues: list[str]
    suggestions: list[str]
    data_summary: dict

class StepInspector:
    """Inspect chain steps for issues."""
    
    def __init__(self):
        self.inspectors: dict[StepType, Callable] = {}
        self._register_default_inspectors()
    
    def _register_default_inspectors(self):
        """Register default inspectors."""
        
        self.inspectors[StepType.LLM_CALL] = self._inspect_llm_call
        self.inspectors[StepType.RETRIEVAL] = self._inspect_retrieval
        self.inspectors[StepType.PARSE] = self._inspect_parse
        self.inspectors[StepType.TOOL_CALL] = self._inspect_tool_call
    
    def inspect(self, step: StepTrace) -> InspectionResult:
        """Inspect a step."""
        
        inspector = self.inspectors.get(step.step_type)
        
        if inspector:
            return inspector(step)
        
        return self._default_inspect(step)
    
    def _inspect_llm_call(self, step: StepTrace) -> InspectionResult:
        """Inspect LLM call step."""
        
        issues = []
        suggestions = []
        
        # Check input
        input_data = step.input_data
        if isinstance(input_data, dict):
            prompt = input_data.get("prompt", "")
            
            # Check prompt length
            if len(prompt) > 10000:
                issues.append("Prompt is very long (>10000 chars)")
                suggestions.append("Consider summarizing or chunking the input")
            
            # Check for empty prompt
            if not prompt.strip():
                issues.append("Empty prompt")
                suggestions.append("Ensure prompt is properly constructed")
        
        # Check output
        if step.status == StepStatus.SUCCESS:
            output = step.output_data
            
            if isinstance(output, str):
                # Check for truncation
                if output.endswith("...") or len(output) < 10:
                    issues.append("Output may be truncated or incomplete")
                    suggestions.append("Check max_tokens setting")
                
                # Check for refusal (report at most once per output)
                refusal_phrases = ["I cannot", "I'm unable", "I don't have"]
                for phrase in refusal_phrases:
                    if phrase.lower() in output.lower():
                        issues.append(f"Model may have refused: contains '{phrase}'")
                        suggestions.append("Review prompt for policy violations")
                        break
        
        # Check timing
        if step.duration_ms > 30000:
            issues.append(f"Slow response: {step.duration_ms:.0f}ms")
            suggestions.append("Consider using a faster model or reducing input size")
        
        return InspectionResult(
            step_name=step.step_name,
            issues=issues,
            suggestions=suggestions,
            data_summary={
                "input_length": len(str(step.input_data)),
                "output_length": len(str(step.output_data)) if step.output_data else 0,
                "duration_ms": step.duration_ms
            }
        )
    
    def _inspect_retrieval(self, step: StepTrace) -> InspectionResult:
        """Inspect retrieval step."""
        
        issues = []
        suggestions = []
        
        output = step.output_data
        
        if isinstance(output, list):
            # Check number of results
            if len(output) == 0:
                issues.append("No documents retrieved")
                suggestions.append("Check query or expand search parameters")
            elif len(output) < 3:
                issues.append(f"Few documents retrieved: {len(output)}")
                suggestions.append("Consider lowering similarity threshold")
            
            # Check relevance scores if available
            if output and isinstance(output[0], dict):
                scores = [d.get("score", 0) for d in output]
                if scores and max(scores) < 0.5:
                    issues.append(f"Low relevance scores: max={max(scores):.2f}")
                    suggestions.append("Query may not match document content well")
        
        return InspectionResult(
            step_name=step.step_name,
            issues=issues,
            suggestions=suggestions,
            data_summary={
                "num_results": len(output) if isinstance(output, list) else 0,
                "duration_ms": step.duration_ms
            }
        )
    
    def _inspect_parse(self, step: StepTrace) -> InspectionResult:
        """Inspect parse step."""
        
        issues = []
        suggestions = []
        
        if step.status == StepStatus.FAILED:
            issues.append(f"Parse failed: {step.error}")
            
            # Analyze input for common issues
            input_str = str(step.input_data)
            
            if "```" in input_str:
                suggestions.append("Input contains code blocks - try extracting content first")
            
            if input_str.startswith("I ") or input_str.startswith("Here"):
                suggestions.append("LLM added preamble - try stricter output instructions")
        
        return InspectionResult(
            step_name=step.step_name,
            issues=issues,
            suggestions=suggestions,
            data_summary={
                "input_length": len(str(step.input_data)),
                "parse_success": step.status == StepStatus.SUCCESS
            }
        )
    
    def _inspect_tool_call(self, step: StepTrace) -> InspectionResult:
        """Inspect tool call step."""
        
        issues = []
        suggestions = []
        
        if step.status == StepStatus.FAILED:
            error = step.error or ""
            
            if "timeout" in error.lower():
                issues.append("Tool call timed out")
                suggestions.append("Increase timeout or optimize tool")
            elif "not found" in error.lower():
                issues.append("Tool not found")
                suggestions.append("Check tool registration and naming")
            elif "invalid" in error.lower():
                issues.append("Invalid tool arguments")
                suggestions.append("Review argument schema and LLM output")
        
        return InspectionResult(
            step_name=step.step_name,
            issues=issues,
            suggestions=suggestions,
            data_summary={
                "tool_name": step.metadata.get("tool_name", "unknown"),
                "duration_ms": step.duration_ms
            }
        )
    
    def _default_inspect(self, step: StepTrace) -> InspectionResult:
        """Default inspection."""
        
        issues = []
        suggestions = []
        
        if step.status == StepStatus.FAILED:
            issues.append(f"Step failed: {step.error}")
        
        return InspectionResult(
            step_name=step.step_name,
            issues=issues,
            suggestions=suggestions,
            data_summary={
                "duration_ms": step.duration_ms,
                "status": step.status.value
            }
        )

class OutputComparator:
    """Compare outputs between runs."""
    
    def compare(
        self,
        expected: Any,
        actual: Any
    ) -> dict:
        """Compare expected vs actual output."""
        
        result = {
            "match": False,
            "similarity": 0.0,
            "differences": []
        }
        
        if expected == actual:
            result["match"] = True
            result["similarity"] = 1.0
            return result
        
        # String comparison
        if isinstance(expected, str) and isinstance(actual, str):
            result["similarity"] = self._string_similarity(expected, actual)
            result["differences"] = self._string_diff(expected, actual)
        
        # Dict comparison
        elif isinstance(expected, dict) and isinstance(actual, dict):
            result["similarity"] = self._dict_similarity(expected, actual)
            result["differences"] = self._dict_diff(expected, actual)
        
        # List comparison
        elif isinstance(expected, list) and isinstance(actual, list):
            result["similarity"] = self._list_similarity(expected, actual)
            result["differences"] = self._list_diff(expected, actual)
        
        return result
    
    def _string_similarity(self, s1: str, s2: str) -> float:
        """Calculate string similarity."""
        
        return difflib.SequenceMatcher(None, s1, s2).ratio()
    
    def _string_diff(self, s1: str, s2: str) -> list[str]:
        """Get string differences."""
        
        diff = difflib.unified_diff(
            s1.splitlines(),
            s2.splitlines(),
            lineterm=""
        )
        return list(diff)
    
    def _dict_similarity(self, d1: dict, d2: dict) -> float:
        """Calculate dict similarity."""
        
        all_keys = set(d1.keys()) | set(d2.keys())
        if not all_keys:
            return 1.0
        
        matching = sum(1 for k in all_keys if d1.get(k) == d2.get(k))
        return matching / len(all_keys)
    
    def _dict_diff(self, d1: dict, d2: dict) -> list[str]:
        """Get dict differences."""
        
        diffs = []
        all_keys = set(d1.keys()) | set(d2.keys())
        
        for key in all_keys:
            if key not in d1:
                diffs.append(f"Missing in expected: {key}")
            elif key not in d2:
                diffs.append(f"Missing in actual: {key}")
            elif d1[key] != d2[key]:
                diffs.append(f"Different value for {key}: {d1[key]} vs {d2[key]}")
        
        return diffs
    
    def _list_similarity(self, l1: list, l2: list) -> float:
        """Calculate list similarity."""
        
        if not l1 and not l2:
            return 1.0
        
        max_len = max(len(l1), len(l2))
        matching = sum(1 for a, b in zip(l1, l2) if a == b)
        
        return matching / max_len
    
    def _list_diff(self, l1: list, l2: list) -> list[str]:
        """Get list differences."""
        
        diffs = []
        
        for i, (a, b) in enumerate(zip(l1, l2)):
            if a != b:
                diffs.append(f"Index {i}: {a} vs {b}")
        
        if len(l1) != len(l2):
            diffs.append(f"Length difference: {len(l1)} vs {len(l2)}")
        
        return diffs
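
Because LLM output varies between runs, exact equality is rarely a useful check; a similarity score with a threshold is often more practical. The sketch below shows the same `difflib` calls the comparator relies on, applied to two runs of a hypothetical summarization step. The `outputs_agree` helper and its 0.8 threshold are illustrative assumptions, not part of the classes above.

```python
import difflib

def similarity(expected: str, actual: str) -> float:
    """Ratio in [0, 1]; 1.0 means the strings are identical."""
    return difflib.SequenceMatcher(None, expected, actual).ratio()

def line_diff(expected: str, actual: str) -> list[str]:
    """Unified diff of the two outputs, one entry per diff line."""
    return list(difflib.unified_diff(
        expected.splitlines(),
        actual.splitlines(),
        fromfile="expected",
        tofile="actual",
        lineterm="",
    ))

def outputs_agree(expected: str, actual: str, threshold: float = 0.8) -> bool:
    """Hypothetical helper: treat near-identical outputs as a match."""
    return similarity(expected, actual) >= threshold

run_a = "The invoice total is 42 USD.\nPayment is due in 30 days."
run_b = "The invoice total is 42 USD.\nPayment is due within 30 days."

print(f"similarity: {similarity(run_a, run_b):.2f}")
print("agree:", outputs_agree(run_a, run_b))
for line in line_diff(run_a, run_b):
    print(line)
```

A character-level ratio is cheap but blind to meaning, so a suitable threshold depends on the step: structured outputs usually warrant a stricter one than free-form text.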

Failure Analysis

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
from collections import Counter
import re

@dataclass
class FailurePattern:
    """A pattern of failures."""
    
    pattern_name: str
    description: str
    occurrences: int
    example_traces: list[str]
    suggested_fix: str

class FailureAnalyzer:
    """Analyze chain failures."""
    
    def __init__(self):
        self.failure_patterns: list[tuple[str, Callable]] = []
        self._register_default_patterns()
    
    def _register_default_patterns(self):
        """Register default failure patterns."""
        
        self.failure_patterns.append(("parse_failure", self._detect_parse_failure))
        self.failure_patterns.append(("timeout", self._detect_timeout))
        self.failure_patterns.append(("rate_limit", self._detect_rate_limit))
        self.failure_patterns.append(("context_overflow", self._detect_context_overflow))
        self.failure_patterns.append(("empty_response", self._detect_empty_response))
    
    def analyze(self, traces: list[ChainTrace]) -> list[FailurePattern]:
        """Analyze traces for failure patterns."""
        
        failed_traces = [t for t in traces if t.status == StepStatus.FAILED]
        
        if not failed_traces:
            return []
        
        patterns = []
        
        for pattern_name, detector in self.failure_patterns:
            matches = detector(failed_traces)
            if matches:
                patterns.append(matches)
        
        return patterns
    
    def _detect_parse_failure(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
        """Detect parse failures."""
        
        parse_failures = []
        
        for trace in traces:
            for step in trace.steps:
                if step.step_type == StepType.PARSE and step.status == StepStatus.FAILED:
                    parse_failures.append(trace.trace_id)
                    break  # count each trace once, as the other detectors do
        
        if not parse_failures:
            return None
        
        return FailurePattern(
            pattern_name="parse_failure",
            description="LLM output could not be parsed into expected format",
            occurrences=len(parse_failures),
            example_traces=parse_failures[:5],
            suggested_fix="Add output format examples to prompt, use JSON mode, or implement more robust parsing"
        )
    
    def _detect_timeout(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
        """Detect timeout failures."""
        
        timeouts = []
        
        for trace in traces:
            if trace.error and "timeout" in trace.error.lower():
                timeouts.append(trace.trace_id)
            
            for step in trace.steps:
                if step.error and "timeout" in step.error.lower():
                    timeouts.append(trace.trace_id)
        
        if not timeouts:
            return None
        
        return FailurePattern(
            pattern_name="timeout",
            description="Chain or step timed out",
            occurrences=len(set(timeouts)),
            example_traces=list(set(timeouts))[:5],
            suggested_fix="Increase timeout, reduce input size, or use a faster model"
        )
    
    def _detect_rate_limit(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
        """Detect rate limit failures."""
        
        rate_limits = []
        
        for trace in traces:
            error = (trace.error or "").lower()
            if "rate" in error and "limit" in error:
                rate_limits.append(trace.trace_id)
            
            for step in trace.steps:
                error = (step.error or "").lower()
                if "rate" in error and "limit" in error:
                    rate_limits.append(trace.trace_id)
        
        if not rate_limits:
            return None
        
        return FailurePattern(
            pattern_name="rate_limit",
            description="API rate limit exceeded",
            occurrences=len(set(rate_limits)),
            example_traces=list(set(rate_limits))[:5],
            suggested_fix="Implement rate limiting, add retries with backoff, or increase API quota"
        )
    
    def _detect_context_overflow(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
        """Detect context length overflow."""
        
        overflows = []
        
        for trace in traces:
            error = (trace.error or "").lower()
            if "context" in error and ("length" in error or "token" in error):
                overflows.append(trace.trace_id)
            
            for step in trace.steps:
                error = (step.error or "").lower()
                if "context" in error and ("length" in error or "token" in error):
                    overflows.append(trace.trace_id)
        
        if not overflows:
            return None
        
        return FailurePattern(
            pattern_name="context_overflow",
            description="Input exceeded model's context window",
            occurrences=len(set(overflows)),
            example_traces=list(set(overflows))[:5],
            suggested_fix="Implement context compression, chunking, or use a model with larger context"
        )
    
    def _detect_empty_response(self, traces: list[ChainTrace]) -> Optional[FailurePattern]:
        """Detect empty response failures."""
        
        empty_responses = []
        
        for trace in traces:
            for step in trace.steps:
                if step.step_type == StepType.LLM_CALL:
                    output = step.output_data
                    if output is None or (isinstance(output, str) and not output.strip()):
                        empty_responses.append(trace.trace_id)
        
        if not empty_responses:
            return None
        
        return FailurePattern(
            pattern_name="empty_response",
            description="LLM returned empty or null response",
            occurrences=len(set(empty_responses)),
            example_traces=list(set(empty_responses))[:5],
            suggested_fix="Check for content filtering, add explicit output instructions, or verify API response handling"
        )

class RootCauseAnalyzer:
    """Analyze root cause of failures."""
    
    def analyze(self, trace: ChainTrace) -> dict:
        """Analyze root cause of a failed trace."""
        
        if trace.status != StepStatus.FAILED:
            return {"root_cause": None, "analysis": "Chain did not fail"}
        
        # Find first failed step
        failed_steps = trace.get_failed_steps()
        
        if not failed_steps:
            return {
                "root_cause": "chain_level",
                "analysis": f"Chain failed without step failure: {trace.error}"
            }
        
        first_failure = failed_steps[0]
        
        # Analyze the failure
        analysis = {
            "root_cause": first_failure.step_name,
            "step_type": first_failure.step_type.value,
            "error": first_failure.error,
            "input_summary": self._summarize_input(first_failure.input_data),
            "preceding_steps": self._get_preceding_steps(trace, first_failure),
            "suggestions": self._get_suggestions(first_failure)
        }
        
        return analysis
    
    def _summarize_input(self, input_data: Any) -> dict:
        """Summarize input data."""
        
        if isinstance(input_data, str):
            return {
                "type": "string",
                "length": len(input_data),
                "preview": input_data[:200] + "..." if len(input_data) > 200 else input_data
            }
        elif isinstance(input_data, dict):
            return {
                "type": "dict",
                "keys": list(input_data.keys()),
                "size": len(str(input_data))
            }
        elif isinstance(input_data, list):
            return {
                "type": "list",
                "length": len(input_data)
            }
        else:
            return {
                "type": type(input_data).__name__,
                "value": str(input_data)[:200]
            }
    
    def _get_preceding_steps(
        self,
        trace: ChainTrace,
        failed_step: StepTrace
    ) -> list[dict]:
        """Get steps that preceded the failure."""
        
        preceding = []
        
        for step in trace.steps:
            if step.step_id == failed_step.step_id:
                break
            
            preceding.append({
                "name": step.step_name,
                "status": step.status.value,
                "duration_ms": step.duration_ms
            })
        
        return preceding
    
    def _get_suggestions(self, step: StepTrace) -> list[str]:
        """Get suggestions for fixing the failure."""
        
        suggestions = []
        error = (step.error or "").lower()
        
        if "json" in error:
            suggestions.append("Add JSON output format instructions to prompt")
            suggestions.append("Use JSON mode if available")
        
        if "timeout" in error:
            suggestions.append("Increase timeout setting")
            suggestions.append("Reduce input size")
        
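        # Require both words so unrelated errors containing "rate"
        # (for example "generate") do not match
        if "rate" in error and "limit" in error:
            suggestions.append("Add rate limiting")
            suggestions.append("Implement exponential backoff")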
        
        if not suggestions:
            suggestions.append("Review step input and error message")
            suggestions.append("Add more detailed logging")
        
        return suggestions
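
The detectors above share one shape: match keywords in an error string, then count the affected traces. A condensed sketch of that idea follows, assuming errors arrive as `(trace_id, error_message)` pairs. The `RULES` table, `classify`, and `summarize` are hypothetical, and the keyword lists are illustrative rather than exhaustive, since real provider error messages vary.

```python
from collections import Counter

# Hypothetical keyword rules: a category matches when every keyword in
# one of its groups appears in the lowercased error message.
RULES: dict[str, list[tuple[str, ...]]] = {
    "timeout": [("timeout",), ("timed out",)],
    "rate_limit": [("rate", "limit")],
    "context_overflow": [("context", "length"), ("context", "token")],
    "parse_failure": [("json",), ("parse",)],
}

def classify(error: str) -> str:
    """Return the first matching category, or 'unknown'."""
    text = error.lower()
    for category, groups in RULES.items():
        if any(all(word in text for word in group) for group in groups):
            return category
    return "unknown"

def summarize(failures: list[tuple[str, str]]) -> Counter:
    """Count distinct traces per category."""
    seen: set[tuple[str, str]] = set()
    counts: Counter = Counter()
    for trace_id, error in failures:
        key = (trace_id, classify(error))
        if key not in seen:  # a trace counts once per category
            seen.add(key)
            counts[key[1]] += 1
    return counts

failures = [
    ("t1", "Request timed out after 30s"),
    ("t2", "Rate limit exceeded, retry later"),
    ("t2", "Rate limit exceeded, retry later"),  # retry of the same trace
    ("t3", "Maximum context length is 8192 tokens"),
    ("t4", "Failed to parse JSON: unexpected token"),
    ("t5", "Connection reset by peer"),
]

for category, count in summarize(failures).most_common():
    print(f"{category}: {count}")
```

Keeping the rules in a table makes it easy to add a category without writing another detector method, and the `unknown` bucket shows how many failures the rules do not yet explain.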

Replay and Reproduction

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import json
import copy

@dataclass
class ReplayConfig:
    """Configuration for replay."""
    
    use_cached_llm: bool = True
    use_cached_retrieval: bool = True
    use_cached_tools: bool = False
    stop_on_divergence: bool = False

class TraceReplayer:
    """Replay chain executions from traces."""
    
    def __init__(self, chain: Any, config: ReplayConfig = None):
        self.chain = chain
        self.config = config or ReplayConfig()
        self.cached_responses: dict[str, Any] = {}
    
    def load_trace(self, trace: ChainTrace):
        """Load trace for replay."""
        
        # Cache responses from trace
        for step in trace.steps:
            if step.status == StepStatus.SUCCESS:
                cache_key = self._make_cache_key(step)
                self.cached_responses[cache_key] = step.output_data
    
    def replay(
        self,
        trace: ChainTrace,
        modifications: dict = None
    ) -> ChainTrace:
        """Replay a trace with optional modifications."""
        
        # Load cached responses
        self.load_trace(trace)
        
        # Apply modifications to input
        input_data = copy.deepcopy(trace.input_data)
        if modifications:
            input_data = self._apply_modifications(input_data, modifications)
        
        # Create new tracer
        tracer = Tracer()
        
        # Inject cache into chain
        original_llm = self.chain.llm
        if self.config.use_cached_llm:
            self.chain.llm = CachedLLM(original_llm, self.cached_responses)
        
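        # Start the trace before entering the try block so new_trace is
        # always bound when we return
        new_trace = tracer.start_chain(trace.chain_name, input_data)
        
        try:
            # Run chain
            result = self.chain.run(input_data, tracer=tracer)
            tracer.end_chain(output_data=result)
        
        except Exception as e:
            tracer.end_chain(error=str(e))
        
        finally:
            # Restore original LLM
            self.chain.llm = original_llm
        
        # end_chain() resets tracer.current_trace to None, so return the
        # trace object captured above
        return new_trace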
    
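    def _make_cache_key(self, step: StepTrace) -> str:
        """Create cache key for step."""
        
        # LLM steps use the same "llm:<hash of prompt>" key that
        # CachedLLM.generate looks up; otherwise cached responses never hit.
        # Note that hash() on strings is only stable within one process.
        if step.step_type == StepType.LLM_CALL:
            data = step.input_data
            prompt = data.get("prompt", "") if isinstance(data, dict) else str(data)
            return f"llm:{hash(prompt)}"
        
        input_str = json.dumps(step.input_data, sort_keys=True, default=str)
        return f"{step.step_name}:{hash(input_str)}"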
    
    def _apply_modifications(self, data: Any, modifications: dict) -> Any:
        """Apply modifications to data."""
        
        if isinstance(data, dict):
            result = dict(data)
            for key, value in modifications.items():
                if "." in key:
                    # Nested key
                    parts = key.split(".")
                    current = result
                    for part in parts[:-1]:
                        current = current.setdefault(part, {})
                    current[parts[-1]] = value
                else:
                    result[key] = value
            return result
        
        return data

class CachedLLM:
    """LLM wrapper that uses cached responses."""
    
    def __init__(self, llm: Any, cache: dict):
        self.llm = llm
        self.cache = cache
    
    async def generate(self, prompt: str, **kwargs) -> str:
        """Generate with cache lookup."""
        
        cache_key = f"llm:{hash(prompt)}"
        
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        return await self.llm.generate(prompt, **kwargs)
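
One caveat with the caching above: Python's built-in `hash()` is salted per process for strings, so keys built from it are only valid within a single run. If traces are saved and replayed later, a content hash is a safer key. The sketch below assumes that requirement. `stable_cache_key`, `ReplayCache`, and `fake_llm` are hypothetical names, and the fake model stands in for a real client.

```python
import asyncio
import hashlib
import json
from typing import Any, Awaitable, Callable

def stable_cache_key(step_name: str, input_data: Any) -> str:
    """Key that is identical across processes for equal inputs."""
    payload = json.dumps(input_data, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{step_name}:{digest[:16]}"

class ReplayCache:
    """Hypothetical wrapper: serve recorded outputs, else call the model."""

    def __init__(self, generate: Callable[[str], Awaitable[str]]):
        self._generate = generate
        self._store: dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def record(self, prompt: str, output: str) -> None:
        """Store an output captured from an earlier trace."""
        self._store[stable_cache_key("llm", {"prompt": prompt})] = output

    async def generate(self, prompt: str) -> str:
        key = stable_cache_key("llm", {"prompt": prompt})
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        return await self._generate(prompt)

async def fake_llm(prompt: str) -> str:
    """Stand-in for a real model call."""
    return f"live answer to: {prompt}"

async def main() -> list[str]:
    cache = ReplayCache(fake_llm)
    cache.record("Summarize the report", "recorded summary")
    replayed = await cache.generate("Summarize the report")  # cache hit
    fresh = await cache.generate("Summarize the appendix")   # falls through
    return [replayed, fresh, f"hits={cache.hits} misses={cache.misses}"]

results = asyncio.run(main())
print(results)
```

Tracking hits and misses during a replay is a quick way to see whether a modified input diverged from the recorded run: any miss means the chain asked the model something the original trace never saw.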

class DebugSession:
    """Interactive debug session."""
    
    def __init__(self, trace: ChainTrace):
        self.trace = trace
        self.current_step_idx = 0
        self.breakpoints: set[str] = set()
        self.watches: dict[str, Callable] = {}
    
    def set_breakpoint(self, step_name: str):
        """Set breakpoint at step."""
        
        self.breakpoints.add(step_name)
    
    def remove_breakpoint(self, step_name: str):
        """Remove breakpoint."""
        
        self.breakpoints.discard(step_name)
    
    def add_watch(self, name: str, extractor: Callable):
        """Add watch expression."""
        
        self.watches[name] = extractor
    
    def step_forward(self) -> Optional[StepTrace]:
        """Move to next step."""
        
        if self.current_step_idx >= len(self.trace.steps):
            return None
        
        step = self.trace.steps[self.current_step_idx]
        self.current_step_idx += 1
        
        return step
    
    def step_back(self) -> Optional[StepTrace]:
        """Move to previous step."""
        
        if self.current_step_idx <= 0:
            return None
        
        self.current_step_idx -= 1
        return self.trace.steps[self.current_step_idx]
    
    def current_step(self) -> Optional[StepTrace]:
        """Get current step."""
        
        if 0 <= self.current_step_idx < len(self.trace.steps):
            return self.trace.steps[self.current_step_idx]
        return None
    
    def get_watches(self) -> dict[str, Any]:
        """Evaluate watch expressions."""
        
        step = self.current_step()
        if not step:
            return {}
        
        results = {}
        for name, extractor in self.watches.items():
            try:
                results[name] = extractor(step)
            except Exception as e:
                results[name] = f"Error: {e}"
        
        return results
    
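    def run_to_breakpoint(self) -> Optional[StepTrace]:
        """Run until breakpoint hit.
        
        The index is left on the breakpoint step so current_step() and
        get_watches() refer to it. Call step_forward() before resuming,
        otherwise the same breakpoint is returned again.
        """
        
        while self.current_step_idx < len(self.trace.steps):
            step = self.trace.steps[self.current_step_idx]
            
            if step.step_name in self.breakpoints:
                return step
            
            self.current_step_idx += 1
        
        # Reached the end of the trace without hitting a breakpoint
        return None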
    
    def get_state_at_step(self, step_idx: int) -> dict:
        """Get chain state at specific step."""
        
        state = {
            "step_idx": step_idx,
            "completed_steps": [],
            "current_output": None
        }
        
        for step in self.trace.steps[:step_idx + 1]:
            state["completed_steps"].append({
                "name": step.step_name,
                "status": step.status.value
            })
            
            if step.status == StepStatus.SUCCESS:
                state["current_output"] = step.output_data
        
        return state
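
The replay cache above builds its keys with Python's built-in hash(), which is salted per process for strings, so those keys only match within a single interpreter run. If cached responses need to survive a restart (for example, to replay a trace recorded yesterday), a content digest is one option. The following is a minimal sketch under that assumption, not part of the classes above; stable_cache_key is a hypothetical helper name.

```python
import hashlib
import json
from typing import Any


def stable_cache_key(step_name: str, input_data: Any) -> str:
    """Build a cache key that is identical across processes and restarts."""
    # sort_keys gives a canonical key order; default=str covers values
    # that json cannot serialize natively (datetimes, enums, ...)
    payload = json.dumps(input_data, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{step_name}:{digest}"


# Two dicts with the same content but different key order
key_a = stable_cache_key("summarize", {"text": "hello", "max_tokens": 50})
key_b = stable_cache_key("summarize", {"max_tokens": 50, "text": "hello"})
print(key_a == key_b)
```

Whatever key scheme is used, the code that writes the cache and the code that reads it need to agree on it, so a helper like this would have to replace both call sites together.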

Production Debug Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict, Any

app = FastAPI()

class TraceQuery(BaseModel):
    chain_name: Optional[str] = None
    status: Optional[str] = None
    start_date: Optional[str] = None
    end_date: Optional[str] = None
    limit: int = 100

class InspectRequest(BaseModel):
    trace_id: str
    step_name: Optional[str] = None

class ReplayRequest(BaseModel):
    trace_id: str
    modifications: Optional[Dict[str, Any]] = None

# Initialize components
tracer = Tracer()
inspector = StepInspector()
failure_analyzer = FailureAnalyzer()
root_cause_analyzer = RootCauseAnalyzer()

# Store raw trace dicts in memory (would use a database in production)
trace_store: dict[str, dict] = {}

@app.post("/v1/traces")
async def store_trace(trace: dict) -> dict:
    """Store a trace."""
    
    trace_id = trace.get("trace_id")
    if not trace_id:
        raise HTTPException(status_code=400, detail="trace_id required")
    
    # Stored as the raw dict (simplified); endpoints build ChainTrace/StepTrace on read
    trace_store[trace_id] = trace
    
    return {"stored": True, "trace_id": trace_id}

@app.get("/v1/traces/{trace_id}")
async def get_trace(trace_id: str) -> dict:
    """Get a trace by ID."""
    
    trace = trace_store.get(trace_id)
    
    if not trace:
        raise HTTPException(status_code=404, detail="Trace not found")
    
    return trace

@app.post("/v1/traces/search")
async def search_traces(query: TraceQuery) -> list[dict]:
    """Search traces."""
    
    results = []
    
    for trace_id, trace in trace_store.items():
        # Apply filters
        if query.chain_name and trace.get("chain_name") != query.chain_name:
            continue
        
        if query.status and trace.get("status") != query.status:
            continue
        
        results.append(trace)
        
        if len(results) >= query.limit:
            break
    
    return results

@app.post("/v1/inspect")
async def inspect_trace(request: InspectRequest) -> dict:
    """Inspect a trace."""
    
    trace = trace_store.get(request.trace_id)
    
    if not trace:
        raise HTTPException(status_code=404, detail="Trace not found")
    
    # Convert and inspect
    results = {
        "trace_id": request.trace_id,
        "inspections": []
    }
    
    steps = trace.get("steps", [])
    
    for step_data in steps:
        if request.step_name and step_data.get("step_name") != request.step_name:
            continue
        
        # Create StepTrace for inspection
        step = StepTrace(
            step_id=step_data.get("step_id", ""),
            step_name=step_data.get("step_name", ""),
            step_type=StepType(step_data.get("step_type", "transform")),
            status=StepStatus(step_data.get("status", "success")),
            input_data=step_data.get("input_data"),
            output_data=step_data.get("output_data"),
            error=step_data.get("error"),
            duration_ms=step_data.get("duration_ms", 0)
        )
        
        inspection = inspector.inspect(step)
        
        results["inspections"].append({
            "step_name": inspection.step_name,
            "issues": inspection.issues,
            "suggestions": inspection.suggestions,
            "data_summary": inspection.data_summary
        })
    
    return results

@app.post("/v1/analyze/failures")
async def analyze_failures(trace_ids: List[str]) -> dict:
    """Analyze failure patterns."""
    
    traces = []
    
    for trace_id in trace_ids:
        trace = trace_store.get(trace_id)
        if trace:
            # Convert to ChainTrace (simplified: steps are not attached here,
            # so only chain-level status and error reach the analyzer)
            chain_trace = ChainTrace(
                trace_id=trace.get("trace_id", ""),
                chain_name=trace.get("chain_name", ""),
                status=StepStatus(trace.get("status", "success")),
                error=trace.get("error")
            )
            traces.append(chain_trace)
    
    patterns = failure_analyzer.analyze(traces)
    
    return {
        "total_traces": len(traces),
        "failed_traces": sum(1 for t in traces if t.status == StepStatus.FAILED),
        "patterns": [
            {
                "name": p.pattern_name,
                "description": p.description,
                "occurrences": p.occurrences,
                "suggested_fix": p.suggested_fix
            }
            for p in patterns
        ]
    }

@app.post("/v1/analyze/root-cause/{trace_id}")
async def analyze_root_cause(trace_id: str) -> dict:
    """Analyze root cause of failure."""
    
    trace = trace_store.get(trace_id)
    
    if not trace:
        raise HTTPException(status_code=404, detail="Trace not found")
    
    # Convert to ChainTrace
    chain_trace = ChainTrace(
        trace_id=trace.get("trace_id", ""),
        chain_name=trace.get("chain_name", ""),
        status=StepStatus(trace.get("status", "success")),
        error=trace.get("error")
    )
    
    # Add steps
    for step_data in trace.get("steps", []):
        step = StepTrace(
            step_id=step_data.get("step_id", ""),
            step_name=step_data.get("step_name", ""),
            step_type=StepType(step_data.get("step_type", "transform")),
            status=StepStatus(step_data.get("status", "success")),
            input_data=step_data.get("input_data"),
            output_data=step_data.get("output_data"),
            error=step_data.get("error"),
            duration_ms=step_data.get("duration_ms", 0)
        )
        chain_trace.add_step(step)
    
    analysis = root_cause_analyzer.analyze(chain_trace)
    
    return analysis

@app.get("/health")
async def health():
    return {"status": "healthy"}
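
The inspect and root-cause endpoints above repeat the same dict-to-StepTrace conversion, and an unrecognized step_type or status string would raise ValueError from the enum lookup inside the handler. One way to centralize this is a small conversion helper. The sketch below uses trimmed stand-ins for the StepType, StepStatus, and StepTrace definitions from the tracing section so that it runs on its own; step_from_dict is a hypothetical name, not part of the service above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class StepType(Enum):
    # Trimmed stand-in for the article's StepType
    LLM_CALL = "llm_call"
    TRANSFORM = "transform"
    PARSE = "parse"


class StepStatus(Enum):
    # Trimmed stand-in for the article's StepStatus
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class StepTrace:
    # Trimmed stand-in for the article's StepTrace
    step_id: str
    step_name: str
    step_type: StepType
    status: StepStatus
    input_data: Any
    output_data: Any = None
    error: Optional[str] = None
    duration_ms: float = 0


def step_from_dict(step_data: dict) -> StepTrace:
    """Build a StepTrace from a stored dict, rejecting unknown enum values."""
    try:
        step_type = StepType(step_data.get("step_type", "transform"))
        status = StepStatus(step_data.get("status", "success"))
    except ValueError as exc:
        # Enum lookup by value raises ValueError for unrecognized strings
        raise ValueError(f"Invalid step record: {exc}") from exc

    return StepTrace(
        step_id=step_data.get("step_id", ""),
        step_name=step_data.get("step_name", ""),
        step_type=step_type,
        status=status,
        input_data=step_data.get("input_data"),
        output_data=step_data.get("output_data"),
        error=step_data.get("error"),
        duration_ms=step_data.get("duration_ms", 0),
    )


# Made-up record in the same shape the endpoints read from trace_store
record = {
    "step_id": "s1",
    "step_name": "parse_json",
    "step_type": "parse",
    "status": "failed",
    "error": "Expecting value",
}
step = step_from_dict(record)
print(step.step_type, step.status)
```

Inside an endpoint, catching the ValueError and raising an HTTPException with a 4xx status code would report a malformed stored trace as a client-visible error instead of an unhandled exception.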

Conclusion

Debugging LLM chains requires a different mindset than traditional software debugging. The non-deterministic nature of LLMs means you can’t rely on exact reproduction—instead, you need comprehensive tracing that captures every step’s inputs, outputs, and timing. Build tracing into your chains from the start, not as an afterthought. Use structured traces with step types, status codes, and metadata that enable automated analysis. Inspection tools should understand the semantics of different step types—LLM calls have different failure modes than retrieval or parsing steps. Failure analysis across multiple traces reveals patterns that single-trace debugging misses: are parse failures concentrated in certain input types? Do timeouts correlate with input length? Replay capabilities let you reproduce issues with cached LLM responses, isolating whether problems are in the chain logic or the model’s behavior. For production systems, store traces in a searchable database and build dashboards that surface failure patterns automatically. The goal is to transform LLM chains from black boxes into observable systems where you can quickly identify, understand, and fix issues.
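
As a small illustration of the cross-trace analysis described above, the following sketch counts failed steps by name and type over a batch of trace dicts. It assumes the same dict shape the debug service stores (a "steps" list with "step_name", "step_type", and "status" keys); failure_counts is a hypothetical helper and the sample traces are made up.

```python
from collections import Counter


def failure_counts(traces: list[dict]) -> Counter:
    """Count failed steps across traces, keyed by (step_name, step_type)."""
    counts: Counter = Counter()
    for trace in traces:
        for step in trace.get("steps", []):
            if step.get("status") == "failed":
                counts[(step.get("step_name"), step.get("step_type"))] += 1
    return counts


# Made-up sample traces for illustration only
traces = [
    {"trace_id": "t1", "steps": [
        {"step_name": "parse_json", "step_type": "parse", "status": "failed"},
    ]},
    {"trace_id": "t2", "steps": [
        {"step_name": "generate", "step_type": "llm_call", "status": "success"},
        {"step_name": "parse_json", "step_type": "parse", "status": "failed"},
    ]},
    {"trace_id": "t3", "steps": [
        {"step_name": "generate", "step_type": "llm_call", "status": "failed"},
    ]},
]

# Most frequent failure sites first
for (name, step_type), count in failure_counts(traces).most_common():
    print(f"{name} ({step_type}): {count}")
```

A single trace would show one parse failure; the aggregate shows that the parse step is the most frequent failure site, which is the kind of pattern that only appears across traces.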



© 2025 Code, Cloud & Context | Built by Nithin Mohan TK | Powered by Passion