LLM Error Handling: Building Resilient AI Applications

Introduction

LLM APIs fail. Rate limits get hit, servers time out, responses get truncated, and models occasionally return garbage. Production applications need robust error handling that gracefully recovers from failures without losing user context or corrupting state. This guide covers practical error handling strategies: detecting and classifying different error types, implementing retry logic with exponential backoff, building fallback chains that try alternative models, and creating resilient systems that maintain quality even when things go wrong.

[Figure: Error handling pipeline: error detection, error classification, recovery strategy]

Error Classification

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class ErrorCategory(Enum):
    """Categories of LLM errors."""
    
    RATE_LIMIT = "rate_limit"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"
    INVALID_REQUEST = "invalid_request"
    CONTENT_FILTER = "content_filter"
    CONTEXT_LENGTH = "context_length"
    AUTHENTICATION = "authentication"
    NETWORK = "network"
    UNKNOWN = "unknown"

@dataclass
class ClassifiedError:
    """A classified LLM error."""
    
    category: ErrorCategory
    message: str
    retryable: bool
    retry_after: Optional[float] = None  # seconds to wait, if the provider said so
    original_exception: Optional[Exception] = None

class ErrorClassifier:
    """Classify LLM errors by type."""
    
    # Error patterns for different providers
    PATTERNS = {
        ErrorCategory.RATE_LIMIT: [
            "rate limit",
            "rate_limit",
            "too many requests",
            "429",
            "quota exceeded"
        ],
        ErrorCategory.TIMEOUT: [
            "timeout",
            "timed out",
            "deadline exceeded",
            "request timeout"
        ],
        ErrorCategory.SERVER_ERROR: [
            "500",
            "502",
            "503",
            "504",
            "internal server error",
            "service unavailable",
            "bad gateway"
        ],
        ErrorCategory.INVALID_REQUEST: [
            "400",
            "invalid request",
            "bad request",
            "validation error",
            "invalid_request_error"
        ],
        ErrorCategory.CONTENT_FILTER: [
            "content filter",
            "content_filter",
            "content policy",
            "safety",
            "moderation"
        ],
        ErrorCategory.CONTEXT_LENGTH: [
            "context length",
            "context_length",
            "maximum context",
            "token limit",
            "too long"
        ],
        ErrorCategory.AUTHENTICATION: [
            "401",
            "403",
            "unauthorized",
            "forbidden",
            "invalid api key",
            "authentication"
        ],
        ErrorCategory.NETWORK: [
            "connection",
            "network",
            "dns",
            "ssl",
            "certificate"
        ]
    }
    
    RETRYABLE = {
        ErrorCategory.RATE_LIMIT,
        ErrorCategory.TIMEOUT,
        ErrorCategory.SERVER_ERROR,
        ErrorCategory.NETWORK
    }
    
    def classify(self, error: Exception) -> ClassifiedError:
        """Classify an error."""
        
        error_str = str(error).lower()
        error_type = type(error).__name__.lower()
        
        # Check patterns
        for category, patterns in self.PATTERNS.items():
            for pattern in patterns:
                if pattern in error_str or pattern in error_type:
                    return ClassifiedError(
                        category=category,
                        message=str(error),
                        retryable=category in self.RETRYABLE,
                        retry_after=self._extract_retry_after(error),
                        original_exception=error
                    )
        
        return ClassifiedError(
            category=ErrorCategory.UNKNOWN,
            message=str(error),
            retryable=False,
            original_exception=error
        )
    
    def _extract_retry_after(self, error: Exception) -> Optional[float]:
        """Extract retry-after from error if available."""
        
        error_str = str(error)
        
        # Try to find retry-after value
        import re
        
        patterns = [
            r"retry.?after[:\s]+(\d+\.?\d*)",
            r"wait[:\s]+(\d+\.?\d*)",
            r"(\d+\.?\d*)\s*seconds?"
        ]
        
        for pattern in patterns:
            match = re.search(pattern, error_str.lower())
            if match:
                return float(match.group(1))
        
        return None

class OpenAIErrorClassifier(ErrorClassifier):
    """Classifier specialized for OpenAI errors."""
    
    def classify(self, error: Exception) -> ClassifiedError:
        """Classify OpenAI-specific errors."""
        
        # Check for OpenAI-specific error types
        error_type = type(error).__name__
        
        if "RateLimitError" in error_type:
            return ClassifiedError(
                category=ErrorCategory.RATE_LIMIT,
                message=str(error),
                retryable=True,
                retry_after=self._extract_retry_after(error),
                original_exception=error
            )
        
        if "APITimeoutError" in error_type:
            return ClassifiedError(
                category=ErrorCategory.TIMEOUT,
                message=str(error),
                retryable=True,
                original_exception=error
            )
        
        if "APIConnectionError" in error_type:
            return ClassifiedError(
                category=ErrorCategory.NETWORK,
                message=str(error),
                retryable=True,
                original_exception=error
            )
        
        if "AuthenticationError" in error_type:
            return ClassifiedError(
                category=ErrorCategory.AUTHENTICATION,
                message=str(error),
                retryable=False,
                original_exception=error
            )
        
        if "BadRequestError" in error_type:
            # Check for context length
            if "context" in str(error).lower() or "token" in str(error).lower():
                return ClassifiedError(
                    category=ErrorCategory.CONTEXT_LENGTH,
                    message=str(error),
                    retryable=False,
                    original_exception=error
                )
            
            return ClassifiedError(
                category=ErrorCategory.INVALID_REQUEST,
                message=str(error),
                retryable=False,
                original_exception=error
            )
        
        # Fall back to pattern matching
        return super().classify(error)
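
One caveat with the substring patterns above: a bare "400" or "500" also matches inside longer numbers, such as "4000 tokens". The following is a minimal sketch of a stricter check, not part of the classifier itself. The names `extract_status_code` and `category_from_status` are hypothetical helpers, and the returned strings are assumed to mirror the `ErrorCategory` values.

```python
import re
from typing import Optional

# Match a 4xx/5xx code only when it is not part of a longer number,
# so "4000 tokens" or "15000" are not mistaken for status codes.
STATUS_CODE = re.compile(r"(?<!\d)([45]\d\d)(?!\d)")


def extract_status_code(message: str) -> Optional[int]:
    """Return the first standalone 4xx/5xx code in the message, if any."""
    match = STATUS_CODE.search(message)
    return int(match.group(1)) if match else None


def category_from_status(code: Optional[int]) -> str:
    """Map a status code to the string value of an ErrorCategory member."""
    if code == 429:
        return "rate_limit"
    if code in (401, 403):
        return "authentication"
    if code == 408:
        return "timeout"
    if code is not None and 500 <= code < 600:
        return "server_error"
    if code is not None and 400 <= code < 500:
        return "invalid_request"
    return "unknown"


print(category_from_status(extract_status_code("Error code: 429 - Too Many Requests")))
print(category_from_status(extract_status_code("prompt is 4000 tokens too long")))
```

This reduces false positives but does not remove them: a message such as "limit is 450 tokens" would still be read as a status code. Where the SDK exposes a structured status attribute, that is likely the more reliable source.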

Retry Strategies

from dataclasses import dataclass
from typing import Any, Optional, Callable
import asyncio
import random
import time

@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    jitter_factor: float = 0.1

class RetryStrategy:
    """Base retry strategy."""
    
    def __init__(self, config: RetryConfig = None):
        self.config = config or RetryConfig()
    
    def get_delay(self, attempt: int, error: ClassifiedError = None) -> float:
        """Calculate delay before next retry."""
        raise NotImplementedError

class ExponentialBackoff(RetryStrategy):
    """Exponential backoff with optional jitter."""
    
    def get_delay(self, attempt: int, error: ClassifiedError = None) -> float:
        """Calculate exponential backoff delay."""
        
        # Use retry-after if provided
        if error and error.retry_after:
            return error.retry_after
        
        # Calculate exponential delay
        delay = self.config.base_delay * (
            self.config.exponential_base ** attempt
        )
        
        # Cap at max delay
        delay = min(delay, self.config.max_delay)
        
        # Add jitter
        if self.config.jitter:
            jitter = delay * self.config.jitter_factor * random.random()
            delay += jitter
        
        return delay

class LinearBackoff(RetryStrategy):
    """Linear backoff strategy."""
    
    def get_delay(self, attempt: int, error: ClassifiedError = None) -> float:
        """Calculate linear backoff delay."""
        
        if error and error.retry_after:
            return error.retry_after
        
        delay = self.config.base_delay * (attempt + 1)
        return min(delay, self.config.max_delay)

class ConstantBackoff(RetryStrategy):
    """Constant delay between retries."""
    
    def get_delay(self, attempt: int, error: ClassifiedError = None) -> float:
        """Return constant delay."""
        
        if error and error.retry_after:
            return error.retry_after
        
        return self.config.base_delay

class AdaptiveBackoff(RetryStrategy):
    """Adaptive backoff based on recent error frequency."""
    
    def __init__(self, config: RetryConfig = None):
        super().__init__(config)
        self._recent_errors: list[float] = []
        self._window_size = 60  # seconds
    
    def get_delay(self, attempt: int, error: ClassifiedError = None) -> float:
        """Calculate adaptive delay based on recent errors."""
        
        if error and error.retry_after:
            return error.retry_after
        
        # Track error
        now = time.time()
        self._recent_errors.append(now)
        
        # Clean old errors
        self._recent_errors = [
            t for t in self._recent_errors
            if now - t < self._window_size
        ]
        
        # Increase delay based on error frequency
        error_rate = len(self._recent_errors) / self._window_size
        
        base_delay = self.config.base_delay * (
            self.config.exponential_base ** attempt
        )
        
        # Scale by error rate
        if error_rate > 0.5:
            base_delay *= 2
        elif error_rate > 0.2:
            base_delay *= 1.5
        
        return min(base_delay, self.config.max_delay)

class RetryExecutor:
    """Execute operations with retry logic."""
    
    def __init__(
        self,
        strategy: RetryStrategy = None,
        classifier: ErrorClassifier = None,
        on_retry: Callable = None
    ):
        self.strategy = strategy or ExponentialBackoff()
        self.classifier = classifier or ErrorClassifier()
        self.on_retry = on_retry
    
    async def execute(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Execute operation with retries."""
        
        last_error = None
        
        for attempt in range(self.strategy.config.max_retries + 1):
            try:
                return await operation(*args, **kwargs)
                
            except Exception as e:
                classified = self.classifier.classify(e)
                last_error = classified
                
                # Don't retry non-retryable errors; a bare raise keeps
                # the original traceback intact
                if not classified.retryable:
                    raise
                
                # Don't retry if out of attempts
                if attempt >= self.strategy.config.max_retries:
                    raise
                
                # Calculate delay
                delay = self.strategy.get_delay(attempt, classified)
                
                # Callback
                if self.on_retry:
                    self.on_retry(attempt, classified, delay)
                
                # Wait
                await asyncio.sleep(delay)
        
        raise last_error.original_exception if last_error else Exception("Retry failed")
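
To make the backoff arithmetic concrete, here is a small self-contained sketch. It mirrors the exponential formula used above (base delay times the exponential base raised to the attempt number, capped at a maximum), then runs a stripped-down retry loop against a stand-in service. `backoff_delay`, `FlakyService`, and `retry` are hypothetical names for illustration only, and the demo uses a tiny base delay so it finishes quickly.

```python
import asyncio
import random


def backoff_delay(attempt, base=1.0, factor=2.0, cap=60.0, jitter_factor=0.0):
    """Delay before retry number `attempt` (0-based): capped exponential plus jitter."""
    delay = min(base * factor ** attempt, cap)
    return delay + delay * jitter_factor * random.random()


# Without jitter the schedule doubles each time and then hits the cap:
# 1, 2, 4, 8, 16, 32, 60 seconds.
schedule = [backoff_delay(a) for a in range(7)]


class FlakyService:
    """Hypothetical stand-in for an LLM call: fails `failures` times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    async def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise TimeoutError("request timed out")
        return "ok"


async def retry(operation, max_retries=3, base=0.01):
    """Retry on TimeoutError only, sleeping with exponential backoff in between."""
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except TimeoutError:
            if attempt >= max_retries:
                raise
            await asyncio.sleep(backoff_delay(attempt, base=base))


service = FlakyService(failures=2)
result = asyncio.run(retry(service))
print(schedule, result, service.calls)
```

With `max_retries=3` the operation runs at most four times in total, which matches the `range(max_retries + 1)` loop in `RetryExecutor`.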

Fallback Chains

from dataclasses import dataclass
from typing import Any, Optional, Callable
import time  # used by SmartFallbackChain for cooldown tracking

@dataclass
class FallbackResult:
    """Result from fallback chain."""
    
    success: bool
    result: Any = None
    provider_used: Optional[str] = None
    attempts: int = 0
    errors: Optional[list[str]] = None

class FallbackProvider:
    """A provider in the fallback chain."""
    
    def __init__(
        self,
        name: str,
        client: Any,
        model: str,
        priority: int = 0
    ):
        self.name = name
        self.client = client
        self.model = model
        self.priority = priority
    
    async def complete(self, messages: list[dict], **kwargs) -> str:
        """Complete using this provider."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            **kwargs
        )
        return response.choices[0].message.content

class FallbackChain:
    """Chain of fallback providers."""
    
    def __init__(
        self,
        providers: list[FallbackProvider],
        classifier: ErrorClassifier = None,
        retry_config: RetryConfig = None
    ):
        # Sort by priority
        self.providers = sorted(providers, key=lambda p: p.priority)
        self.classifier = classifier or ErrorClassifier()
        self.retry_config = retry_config or RetryConfig(max_retries=1)
    
    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> FallbackResult:
        """Try providers in order until one succeeds."""
        
        errors = []
        attempts = 0
        
        for provider in self.providers:
            retry_executor = RetryExecutor(
                strategy=ExponentialBackoff(self.retry_config),
                classifier=self.classifier
            )
            
            try:
                attempts += 1
                result = await retry_executor.execute(
                    provider.complete,
                    messages,
                    **kwargs
                )
                
                return FallbackResult(
                    success=True,
                    result=result,
                    provider_used=provider.name,
                    attempts=attempts,
                    errors=errors
                )
                
            except Exception as e:
                errors.append(f"{provider.name}: {str(e)}")
                continue
        
        return FallbackResult(
            success=False,
            attempts=attempts,
            errors=errors
        )

class SmartFallbackChain:
    """Fallback chain with health tracking."""
    
    def __init__(self, providers: list[FallbackProvider]):
        self.providers = providers
        self.classifier = ErrorClassifier()
        
        # Track provider health
        self._success_counts: dict[str, int] = {p.name: 0 for p in providers}
        self._failure_counts: dict[str, int] = {p.name: 0 for p in providers}
        self._last_failure: dict[str, float] = {}
        self._cooldown_seconds = 60
    
    def _get_ordered_providers(self) -> list[FallbackProvider]:
        """Get providers ordered by health."""
        
        now = time.time()
        
        def health_score(provider: FallbackProvider) -> float:
            name = provider.name
            
            # Check cooldown
            if name in self._last_failure:
                if now - self._last_failure[name] < self._cooldown_seconds:
                    return -1  # In cooldown
            
            total = self._success_counts[name] + self._failure_counts[name]
            if total == 0:
                return 0.5  # Unknown
            
            return self._success_counts[name] / total
        
        return sorted(
            self.providers,
            key=lambda p: (health_score(p), -p.priority),
            reverse=True
        )
    
    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> FallbackResult:
        """Complete with health-aware fallback."""
        
        errors = []
        attempts = 0
        
        for provider in self._get_ordered_providers():
            try:
                attempts += 1
                result = await provider.complete(messages, **kwargs)
                
                # Track success
                self._success_counts[provider.name] += 1
                
                return FallbackResult(
                    success=True,
                    result=result,
                    provider_used=provider.name,
                    attempts=attempts,
                    errors=errors
                )
                
            except Exception as e:
                # Track failure
                self._failure_counts[provider.name] += 1
                self._last_failure[provider.name] = time.time()
                
                errors.append(f"{provider.name}: {str(e)}")
                continue
        
        return FallbackResult(
            success=False,
            attempts=attempts,
            errors=errors
        )
    
    def get_health_stats(self) -> dict:
        """Get health statistics for all providers."""
        
        stats = {}
        for provider in self.providers:
            name = provider.name
            total = self._success_counts[name] + self._failure_counts[name]
            
            stats[name] = {
                "success_count": self._success_counts[name],
                "failure_count": self._failure_counts[name],
                "success_rate": self._success_counts[name] / total if total > 0 else None,
                "in_cooldown": name in self._last_failure and 
                    time.time() - self._last_failure[name] < self._cooldown_seconds
            }
        
        return stats
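
The health-aware ordering is easier to follow with numbers. The sketch below reproduces the scoring rule from `_get_ordered_providers` as a plain function and sorts a few made-up providers, represented as tuples rather than `FallbackProvider` objects. The provider names and counts are hypothetical.

```python
import time


def health_score(successes, failures, last_failure, now, cooldown=60.0):
    """Success rate in [0, 1]; 0.5 with no history; -1 while in cooldown."""
    if last_failure is not None and now - last_failure < cooldown:
        return -1.0
    total = successes + failures
    return 0.5 if total == 0 else successes / total


now = time.time()

# Hypothetical providers: (name, priority, successes, failures, last_failure)
providers = [
    ("backup", 1, 0, 0, None),        # no history yet -> 0.5
    ("flaky", 0, 3, 1, now - 10),     # failed 10 s ago -> in cooldown, -1
    ("primary", 0, 9, 1, now - 300),  # 90% success, cooldown expired -> 0.9
    ("untested", 2, 0, 0, None),      # no history, lower priority -> 0.5
]

# Same key as the class above: healthiest first, and among equal scores
# the smaller priority number wins (hence the negation under reverse=True).
ordered = sorted(
    providers,
    key=lambda p: (health_score(p[2], p[3], p[4], now), -p[1]),
    reverse=True,
)
names = [p[0] for p in ordered]
print(names)
```

Note that a provider in cooldown is moved to the back of the list rather than skipped, so it is still tried when every healthier provider has failed.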

Circuit Breaker

from dataclasses import dataclass
from typing import Any, Optional, Callable
from enum import Enum
import asyncio
import time

class CircuitState(Enum):
    """Circuit breaker states."""
    
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

@dataclass
class CircuitConfig:
    """Circuit breaker configuration."""
    
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout_seconds: float = 30.0
    half_open_max_calls: int = 3

class CircuitBreaker:
    """Circuit breaker for LLM calls."""
    
    def __init__(self, config: CircuitConfig = None):
        self.config = config or CircuitConfig()
        
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._half_open_calls = 0
        self._lock = asyncio.Lock()
    
    @property
    def state(self) -> CircuitState:
        """Get current state, checking for timeout."""
        
        if self._state == CircuitState.OPEN:
            if self._last_failure_time:
                elapsed = time.time() - self._last_failure_time
                if elapsed >= self.config.timeout_seconds:
                    return CircuitState.HALF_OPEN
        
        return self._state
    
    async def call(self, operation: Callable, *args, **kwargs) -> Any:
        """Execute operation through circuit breaker."""
        
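        async with self._lock:
            state = self.state
            
            if state == CircuitState.OPEN:
                raise CircuitOpenError("Circuit is open")
            
            if state == CircuitState.HALF_OPEN:
                # Persist the OPEN -> HALF_OPEN transition. The state property
                # only reports it, and without this _on_success/_on_failure
                # never see HALF_OPEN, so the circuit could never close again.
                if self._state == CircuitState.OPEN:
                    self._state = CircuitState.HALF_OPEN
                    self._reset_counts()
                
                if self._half_open_calls >= self.config.half_open_max_calls:
                    raise CircuitOpenError("Circuit is half-open, max calls reached")
                self._half_open_calls += 1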
        
        try:
            result = await operation(*args, **kwargs)
            await self._on_success()
            return result
            
        except Exception as e:
            await self._on_failure()
            raise
    
    async def _on_success(self):
        """Handle successful call."""
        
        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                
                if self._success_count >= self.config.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._reset_counts()
            
            elif self._state == CircuitState.CLOSED:
                self._failure_count = 0
    
    async def _on_failure(self):
        """Handle failed call."""
        
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()
            
            if self._state == CircuitState.HALF_OPEN:
                self._state = CircuitState.OPEN
                self._reset_counts()
            
            elif self._state == CircuitState.CLOSED:
                if self._failure_count >= self.config.failure_threshold:
                    self._state = CircuitState.OPEN
                    self._reset_counts()
    
    def _reset_counts(self):
        """Reset counters."""
        self._failure_count = 0
        self._success_count = 0
        self._half_open_calls = 0

class CircuitOpenError(Exception):
    """Raised when circuit is open."""
    pass

class ResilientLLMClient:
    """LLM client with circuit breaker and fallback."""
    
    def __init__(
        self,
        primary_client: Any,
        primary_model: str,
        fallback_chain: FallbackChain = None,
        circuit_config: CircuitConfig = None,
        retry_config: RetryConfig = None
    ):
        self.primary_client = primary_client
        self.primary_model = primary_model
        self.fallback_chain = fallback_chain
        
        self.circuit = CircuitBreaker(circuit_config)
        self.retry_executor = RetryExecutor(
            strategy=ExponentialBackoff(retry_config or RetryConfig())
        )
    
    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> str:
        """Complete with full resilience."""
        
        # Try primary with circuit breaker
        primary_error = None
        try:
            return await self.circuit.call(
                self._primary_complete,
                messages,
                **kwargs
            )
        except Exception as e:
            # Covers CircuitOpenError too; keep the cause for error reporting
            primary_error = e
        
        # Fall back to chain
        if self.fallback_chain:
            result = await self.fallback_chain.complete(messages, **kwargs)
            
            if result.success:
                return result.result
            
            raise RuntimeError(
                f"All providers failed: {result.errors}"
            ) from primary_error
        
        raise RuntimeError(
            "Primary failed and no fallback available"
        ) from primary_error
    
    async def _primary_complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> str:
        """Complete using primary provider with retries."""
        
        async def operation():
            response = await self.primary_client.chat.completions.create(
                model=self.primary_model,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        
        return await self.retry_executor.execute(operation)
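
The state transitions are easiest to check with a controllable clock. The following is a simplified, synchronous sketch of the same closed, open, half-open cycle. It is not a drop-in replacement for `CircuitBreaker`: `MiniBreaker` and its `allow`/`record` methods are hypothetical, and it leaves out locking and the half-open call limit.

```python
import time


class MiniBreaker:
    """Simplified circuit breaker with an injectable clock for testing."""

    def __init__(self, failure_threshold=2, success_threshold=1,
                 timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = None

    def allow(self):
        """Return True if a call may proceed; move open -> half_open after timeout."""
        if self.state == "open":
            if self.clock() - self.opened_at < self.timeout:
                return False
            self.state = "half_open"
            self.successes = 0
        return True

    def record(self, ok):
        """Record the outcome of a call and update the state."""
        if ok:
            if self.state == "half_open":
                self.successes += 1
                if self.successes >= self.success_threshold:
                    self.state = "closed"
                    self.failures = 0
            else:
                self.failures = 0
            return
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()
            self.failures = 0


fake_now = [0.0]  # mutable cell so the lambda sees updates
breaker = MiniBreaker(clock=lambda: fake_now[0])
trace = []

breaker.record(ok=False)
breaker.record(ok=False)       # threshold reached -> open
trace.append(breaker.state)
trace.append(breaker.allow())  # still inside the timeout -> rejected
fake_now[0] = 31.0
trace.append(breaker.allow())  # timeout elapsed -> probe call allowed
trace.append(breaker.state)
breaker.record(ok=True)        # probe succeeded -> closed
trace.append(breaker.state)
print(trace)
```

Injecting the clock keeps the test deterministic. The same idea could be applied to the async class by passing in a time source instead of calling `time.time()` directly.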

Production Error Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
resilient_client = None  # Initialize with clients

class CompleteRequest(BaseModel):
    messages: list[dict]
    model: Optional[str] = None
    max_tokens: int = 1000

@app.post("/v1/complete")
async def complete_resilient(request: CompleteRequest):
    """Complete with full error handling."""
    
    try:
        result = await resilient_client.complete(
            messages=request.messages,
            max_tokens=request.max_tokens
        )
        
        return {
            "content": result,
            "success": True
        }
        
    except Exception as e:
        return {
            "content": None,
            "success": False,
            "error": str(e)
        }

@app.get("/v1/health/circuit")
async def circuit_health():
    """Get circuit breaker status."""
    
    return {
        "state": resilient_client.circuit.state.value,
        "failure_count": resilient_client.circuit._failure_count,
        "success_count": resilient_client.circuit._success_count
    }

@app.get("/v1/health/providers")
async def provider_health():
    """Get provider health stats."""
    
    if resilient_client.fallback_chain:
        if hasattr(resilient_client.fallback_chain, 'get_health_stats'):
            return resilient_client.fallback_chain.get_health_stats()
    
    return {"message": "No health stats available"}

@app.post("/v1/circuit/reset")
async def reset_circuit():
    """Manually reset circuit breaker."""
    
    resilient_client.circuit._state = CircuitState.CLOSED
    resilient_client.circuit._reset_counts()
    
    return {"reset": True}

@app.get("/health")
async def health():
    return {"status": "healthy"}
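
The `/v1/complete` handler above reports failures in the response body with `success: False`, which means callers still receive HTTP 200. One possible alternative is to translate the classified error category into a status code. The mapping below is an assumption for illustration, not something the service above defines, and `to_http_error` is a hypothetical helper. It is kept free of FastAPI imports so it can be run on its own.

```python
import math

# Hypothetical mapping from ErrorCategory values to the HTTP status this
# service could return to its own callers.
STATUS_BY_CATEGORY = {
    "rate_limit": 429,
    "timeout": 504,
    "server_error": 502,
    "network": 502,
    "invalid_request": 400,
    "content_filter": 400,
    "context_length": 413,
    "authentication": 500,  # the service's own upstream credentials are wrong
    "unknown": 500,
}

RETRYABLE_STATUSES = (429, 502, 503, 504)


def to_http_error(category, message, retry_after=None):
    """Build keyword arguments suitable for an HTTP error response."""
    status = STATUS_BY_CATEGORY.get(category, 500)
    headers = {}
    if retry_after is not None and status in RETRYABLE_STATUSES:
        # Retry-After takes whole seconds, so round up.
        headers["Retry-After"] = str(math.ceil(retry_after))
    return {"status_code": status, "detail": message, "headers": headers}


print(to_http_error("rate_limit", "slow down", retry_after=2.3))
print(to_http_error("authentication", "invalid api key"))
```

Inside the handler, the result could be passed on as `raise HTTPException(**to_http_error(...))`, since FastAPI's `HTTPException` accepts `status_code`, `detail`, and `headers`. Whether an upstream authentication failure should surface as 500 or something else is a design choice for the service owner.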

Conclusion

Robust error handling is essential for production LLM applications. Start by classifying errors: rate limits and timeouts are retryable, authentication errors are not. Implement exponential backoff with jitter to avoid thundering-herd problems when services recover. Use fallback chains to try alternative providers when your primary fails. Add circuit breakers to prevent cascading failures and give struggling services time to recover. Track provider health to route requests intelligently.

The key insight is that LLM APIs are external dependencies you don't control, so design your system to handle their failures gracefully. Build resilience in layers: retries for transient errors, fallbacks for persistent failures, and circuit breakers to protect your system from repeated failures. Monitor error rates and circuit states to catch problems early and understand your system's reliability characteristics.