Introduction
LLM APIs fail. Rate limits get hit, servers time out, responses get truncated, and models occasionally return garbage. Production applications need robust error handling that gracefully recovers from failures without losing user context or corrupting state. This guide covers practical error handling strategies: detecting and classifying different error types, implementing retry logic with exponential backoff, building fallback chains that try alternative models, and creating resilient systems that maintain quality even when things go wrong.

Error Classification
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import re


class ErrorCategory(Enum):
    """Categories of LLM errors."""
    RATE_LIMIT = "rate_limit"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"
    INVALID_REQUEST = "invalid_request"
    CONTENT_FILTER = "content_filter"
    CONTEXT_LENGTH = "context_length"
    AUTHENTICATION = "authentication"
    NETWORK = "network"
    UNKNOWN = "unknown"


@dataclass
class ClassifiedError:
    """A classified LLM error."""
    category: ErrorCategory
    message: str
    retryable: bool
    retry_after: Optional[float] = None
    original_exception: Optional[Exception] = None


class ErrorClassifier:
    """Classify LLM errors by type."""

    # Error message patterns shared across providers
    PATTERNS = {
        ErrorCategory.RATE_LIMIT: [
            "rate limit",
            "rate_limit",
            "too many requests",
            "429",
            "quota exceeded",
        ],
        ErrorCategory.TIMEOUT: [
            "timeout",
            "timed out",
            "deadline exceeded",
            "request timeout",
        ],
        ErrorCategory.SERVER_ERROR: [
            "500",
            "502",
            "503",
            "504",
            "internal server error",
            "service unavailable",
            "bad gateway",
        ],
        ErrorCategory.INVALID_REQUEST: [
            "400",
            "invalid request",
            "bad request",
            "validation error",
            "invalid_request_error",
        ],
        ErrorCategory.CONTENT_FILTER: [
            "content filter",
            "content_filter",
            "content policy",
            "safety",
            "moderation",
        ],
        ErrorCategory.CONTEXT_LENGTH: [
            "context length",
            "context_length",
            "maximum context",
            "token limit",
            "too long",
        ],
        ErrorCategory.AUTHENTICATION: [
            "401",
            "403",
            "unauthorized",
            "forbidden",
            "invalid api key",
            "authentication",
        ],
        ErrorCategory.NETWORK: [
            "connection",
            "network",
            "dns",
            "ssl",
            "certificate",
        ],
    }

    # Categories that are safe to retry
    RETRYABLE = {
        ErrorCategory.RATE_LIMIT,
        ErrorCategory.TIMEOUT,
        ErrorCategory.SERVER_ERROR,
        ErrorCategory.NETWORK,
    }

    def classify(self, error: Exception) -> ClassifiedError:
        """Classify an error."""
        error_str = str(error).lower()
        error_type = type(error).__name__.lower()

        # Check patterns against both the message and the exception type name
        for category, patterns in self.PATTERNS.items():
            for pattern in patterns:
                if pattern in error_str or pattern in error_type:
                    return ClassifiedError(
                        category=category,
                        message=str(error),
                        retryable=category in self.RETRYABLE,
                        retry_after=self._extract_retry_after(error),
                        original_exception=error,
                    )

        return ClassifiedError(
            category=ErrorCategory.UNKNOWN,
            message=str(error),
            retryable=False,
            original_exception=error,
        )

    def _extract_retry_after(self, error: Exception) -> Optional[float]:
        """Extract a retry-after value from the error message, if present."""
        error_str = str(error).lower()
        patterns = [
            r"retry.?after[:\s]+(\d+\.?\d*)",
            r"wait[:\s]+(\d+\.?\d*)",
            r"(\d+\.?\d*)\s*seconds?",
        ]
        for pattern in patterns:
            match = re.search(pattern, error_str)
            if match:
                return float(match.group(1))
        return None


class OpenAIErrorClassifier(ErrorClassifier):
    """Classifier specialized for OpenAI errors."""

    def classify(self, error: Exception) -> ClassifiedError:
        """Classify OpenAI-specific errors."""
        # Check for OpenAI-specific exception class names first
        error_type = type(error).__name__

        if "RateLimitError" in error_type:
            return ClassifiedError(
                category=ErrorCategory.RATE_LIMIT,
                message=str(error),
                retryable=True,
                retry_after=self._extract_retry_after(error),
                original_exception=error,
            )
        if "APITimeoutError" in error_type:
            return ClassifiedError(
                category=ErrorCategory.TIMEOUT,
                message=str(error),
                retryable=True,
                original_exception=error,
            )
        if "APIConnectionError" in error_type:
            return ClassifiedError(
                category=ErrorCategory.NETWORK,
                message=str(error),
                retryable=True,
                original_exception=error,
            )
        if "AuthenticationError" in error_type:
            return ClassifiedError(
                category=ErrorCategory.AUTHENTICATION,
                message=str(error),
                retryable=False,
                original_exception=error,
            )
        if "BadRequestError" in error_type:
            # Distinguish context-length overflows from other bad requests
            if "context" in str(error).lower() or "token" in str(error).lower():
                return ClassifiedError(
                    category=ErrorCategory.CONTEXT_LENGTH,
                    message=str(error),
                    retryable=False,
                    original_exception=error,
                )
            return ClassifiedError(
                category=ErrorCategory.INVALID_REQUEST,
                message=str(error),
                retryable=False,
                original_exception=error,
            )

        # Fall back to pattern matching
        return super().classify(error)
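
A quick sanity check of how classification behaves. This is a minimal sketch, assuming the classes above are in scope; the error messages are made up, and anything with a recognizable message or exception type name classifies the same way:

classifier = OpenAIErrorClassifier()

# Falls through to pattern matching: a plain exception with a rate-limit message
err = classifier.classify(Exception("429: Too Many Requests, retry after 12 seconds"))
print(err.category)     # ErrorCategory.RATE_LIMIT
print(err.retryable)    # True
print(err.retry_after)  # 12.0, parsed from the message

# Anything unrecognized lands in UNKNOWN and is not retried by default
err = classifier.classify(ValueError("something unexpected"))
print(err.category)     # ErrorCategory.UNKNOWN
print(err.retryable)    # False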
Retry Strategies
from dataclasses import dataclass
from typing import Any, Callable, Optional
import asyncio
import random
import time


@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    jitter_factor: float = 0.1


class RetryStrategy:
    """Base retry strategy."""

    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()

    def get_delay(self, attempt: int, error: Optional[ClassifiedError] = None) -> float:
        """Calculate delay before next retry."""
        raise NotImplementedError


class ExponentialBackoff(RetryStrategy):
    """Exponential backoff with optional jitter."""

    def get_delay(self, attempt: int, error: Optional[ClassifiedError] = None) -> float:
        """Calculate exponential backoff delay."""
        # Honor a server-provided retry-after if we have one
        if error and error.retry_after:
            return error.retry_after

        # Calculate exponential delay
        delay = self.config.base_delay * (
            self.config.exponential_base ** attempt
        )

        # Cap at max delay
        delay = min(delay, self.config.max_delay)

        # Add jitter to avoid thundering-herd retries
        if self.config.jitter:
            jitter = delay * self.config.jitter_factor * random.random()
            delay += jitter

        return delay


class LinearBackoff(RetryStrategy):
    """Linear backoff strategy."""

    def get_delay(self, attempt: int, error: Optional[ClassifiedError] = None) -> float:
        """Calculate linear backoff delay."""
        if error and error.retry_after:
            return error.retry_after
        delay = self.config.base_delay * (attempt + 1)
        return min(delay, self.config.max_delay)


class ConstantBackoff(RetryStrategy):
    """Constant delay between retries."""

    def get_delay(self, attempt: int, error: Optional[ClassifiedError] = None) -> float:
        """Return constant delay."""
        if error and error.retry_after:
            return error.retry_after
        return self.config.base_delay


class AdaptiveBackoff(RetryStrategy):
    """Adaptive backoff based on recent error frequency."""

    def __init__(self, config: Optional[RetryConfig] = None):
        super().__init__(config)
        self._recent_errors: list[float] = []
        self._window_size = 60  # seconds

    def get_delay(self, attempt: int, error: Optional[ClassifiedError] = None) -> float:
        """Calculate adaptive delay based on recent errors."""
        if error and error.retry_after:
            return error.retry_after

        # Track this error
        now = time.time()
        self._recent_errors.append(now)

        # Drop errors that fell out of the window
        self._recent_errors = [
            t for t in self._recent_errors
            if now - t < self._window_size
        ]

        # Increase delay based on error frequency
        error_rate = len(self._recent_errors) / self._window_size
        base_delay = self.config.base_delay * (
            self.config.exponential_base ** attempt
        )

        # Scale by error rate
        if error_rate > 0.5:
            base_delay *= 2
        elif error_rate > 0.2:
            base_delay *= 1.5

        return min(base_delay, self.config.max_delay)


class RetryExecutor:
    """Execute operations with retry logic."""

    def __init__(
        self,
        strategy: Optional[RetryStrategy] = None,
        classifier: Optional[ErrorClassifier] = None,
        on_retry: Optional[Callable] = None
    ):
        self.strategy = strategy or ExponentialBackoff()
        self.classifier = classifier or ErrorClassifier()
        self.on_retry = on_retry

    async def execute(self, operation: Callable, *args, **kwargs) -> Any:
        """Execute operation with retries."""
        last_error = None

        for attempt in range(self.strategy.config.max_retries + 1):
            try:
                return await operation(*args, **kwargs)
            except Exception as e:
                classified = self.classifier.classify(e)
                last_error = classified

                # Don't retry non-retryable errors
                if not classified.retryable:
                    raise

                # Don't retry if out of attempts
                if attempt >= self.strategy.config.max_retries:
                    raise

                # Calculate delay
                delay = self.strategy.get_delay(attempt, classified)

                # Callback for logging/metrics
                if self.on_retry:
                    self.on_retry(attempt, classified, delay)

                # Wait before the next attempt
                await asyncio.sleep(delay)

        raise last_error.original_exception if last_error else Exception("Retry failed")
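
Wiring the executor around an actual call looks like this. A minimal sketch, assuming the AsyncOpenAI client from the official openai package and the classifier defined earlier; the model name and prompt are placeholders:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def call_llm() -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model name
        messages=[{"role": "user", "content": "Summarize circuit breakers in one line."}],
    )
    return response.choices[0].message.content

executor = RetryExecutor(
    strategy=ExponentialBackoff(RetryConfig(max_retries=4, base_delay=1.0)),
    classifier=OpenAIErrorClassifier(),
    on_retry=lambda attempt, err, delay: print(
        f"retry {attempt + 1}: {err.category.value}, sleeping {delay:.1f}s"
    ),
)

result = asyncio.run(executor.execute(call_llm))

The on_retry hook is a good place to emit metrics; everything non-retryable (bad requests, auth failures) surfaces immediately instead of burning attempts.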
Fallback Chains
from dataclasses import dataclass
from typing import Any, Optional
import time


@dataclass
class FallbackResult:
    """Result from a fallback chain."""
    success: bool
    result: Any = None
    provider_used: Optional[str] = None
    attempts: int = 0
    errors: Optional[list[str]] = None


class FallbackProvider:
    """A provider in the fallback chain."""

    def __init__(
        self,
        name: str,
        client: Any,
        model: str,
        priority: int = 0
    ):
        self.name = name
        self.client = client
        self.model = model
        self.priority = priority

    async def complete(self, messages: list[dict], **kwargs) -> str:
        """Complete using this provider."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            **kwargs
        )
        return response.choices[0].message.content


class FallbackChain:
    """Chain of fallback providers."""

    def __init__(
        self,
        providers: list[FallbackProvider],
        classifier: Optional[ErrorClassifier] = None,
        retry_config: Optional[RetryConfig] = None
    ):
        # Sort by priority (lower value = tried first)
        self.providers = sorted(providers, key=lambda p: p.priority)
        self.classifier = classifier or ErrorClassifier()
        self.retry_config = retry_config or RetryConfig(max_retries=1)

    async def complete(self, messages: list[dict], **kwargs) -> FallbackResult:
        """Try providers in order until one succeeds."""
        errors = []
        attempts = 0

        for provider in self.providers:
            retry_executor = RetryExecutor(
                strategy=ExponentialBackoff(self.retry_config),
                classifier=self.classifier
            )
            try:
                attempts += 1
                result = await retry_executor.execute(
                    provider.complete,
                    messages,
                    **kwargs
                )
                return FallbackResult(
                    success=True,
                    result=result,
                    provider_used=provider.name,
                    attempts=attempts,
                    errors=errors
                )
            except Exception as e:
                errors.append(f"{provider.name}: {str(e)}")
                continue

        return FallbackResult(
            success=False,
            attempts=attempts,
            errors=errors
        )


class SmartFallbackChain:
    """Fallback chain with health tracking."""

    def __init__(self, providers: list[FallbackProvider]):
        self.providers = providers
        self.classifier = ErrorClassifier()
        # Track provider health
        self._success_counts: dict[str, int] = {p.name: 0 for p in providers}
        self._failure_counts: dict[str, int] = {p.name: 0 for p in providers}
        self._last_failure: dict[str, float] = {}
        self._cooldown_seconds = 60

    def _get_ordered_providers(self) -> list[FallbackProvider]:
        """Get providers ordered by health."""
        now = time.time()

        def health_score(provider: FallbackProvider) -> float:
            name = provider.name
            # Providers still in cooldown sort last
            if name in self._last_failure:
                if now - self._last_failure[name] < self._cooldown_seconds:
                    return -1
            total = self._success_counts[name] + self._failure_counts[name]
            if total == 0:
                return 0.5  # Unknown, assume average health
            return self._success_counts[name] / total

        return sorted(
            self.providers,
            key=lambda p: (health_score(p), -p.priority),
            reverse=True
        )

    async def complete(self, messages: list[dict], **kwargs) -> FallbackResult:
        """Complete with health-aware fallback."""
        errors = []
        attempts = 0

        for provider in self._get_ordered_providers():
            try:
                attempts += 1
                result = await provider.complete(messages, **kwargs)
                # Track success
                self._success_counts[provider.name] += 1
                return FallbackResult(
                    success=True,
                    result=result,
                    provider_used=provider.name,
                    attempts=attempts,
                    errors=errors
                )
            except Exception as e:
                # Track failure and start the cooldown clock
                self._failure_counts[provider.name] += 1
                self._last_failure[provider.name] = time.time()
                errors.append(f"{provider.name}: {str(e)}")
                continue

        return FallbackResult(
            success=False,
            attempts=attempts,
            errors=errors
        )

    def get_health_stats(self) -> dict:
        """Get health statistics for all providers."""
        stats = {}
        for provider in self.providers:
            name = provider.name
            total = self._success_counts[name] + self._failure_counts[name]
            stats[name] = {
                "success_count": self._success_counts[name],
                "failure_count": self._failure_counts[name],
                "success_rate": self._success_counts[name] / total if total > 0 else None,
                "in_cooldown": name in self._last_failure and
                    time.time() - self._last_failure[name] < self._cooldown_seconds,
            }
        return stats
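
Here is a minimal sketch of wiring two providers into a health-aware chain. FallbackProvider assumes an OpenAI-compatible async client, so the second entry points the AsyncOpenAI client at a hypothetical compatible endpoint via base_url; the endpoint URL and model names are placeholders for whatever you actually run:

from openai import AsyncOpenAI

providers = [
    FallbackProvider(
        name="openai-primary",
        client=AsyncOpenAI(),
        model="gpt-4o",  # placeholder
        priority=0,
    ),
    FallbackProvider(
        name="backup-compatible",
        client=AsyncOpenAI(base_url="https://llm.example.internal/v1", api_key="..."),  # hypothetical endpoint
        model="backup-model",  # placeholder
        priority=1,
    ),
]

chain = SmartFallbackChain(providers)

async def ask(question: str) -> str:
    result = await chain.complete([{"role": "user", "content": question}])
    if not result.success:
        raise RuntimeError(f"All providers failed: {result.errors}")
    print(f"answered by {result.provider_used} after {result.attempts} attempt(s)")
    return result.result

Over time the chain routes around whichever provider has been failing, and chain.get_health_stats() shows the success rates and cooldown state it is using to make that decision.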
Circuit Breaker
from dataclasses import dataclass
from typing import Any, Callable, Optional
from enum import Enum
import asyncio
import time


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered


@dataclass
class CircuitConfig:
    """Circuit breaker configuration."""
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout_seconds: float = 30.0
    half_open_max_calls: int = 3


class CircuitBreaker:
    """Circuit breaker for LLM calls."""

    def __init__(self, config: Optional[CircuitConfig] = None):
        self.config = config or CircuitConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._half_open_calls = 0
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        """Get current state, checking for timeout."""
        if self._state == CircuitState.OPEN:
            if self._last_failure_time:
                elapsed = time.time() - self._last_failure_time
                if elapsed >= self.config.timeout_seconds:
                    return CircuitState.HALF_OPEN
        return self._state

    async def call(self, operation: Callable, *args, **kwargs) -> Any:
        """Execute operation through circuit breaker."""
        async with self._lock:
            state = self.state
            if state == CircuitState.OPEN:
                raise CircuitOpenError("Circuit is open")
            if state == CircuitState.HALF_OPEN:
                # Commit the OPEN -> HALF_OPEN transition so the
                # success/failure handlers below see the right state
                if self._state == CircuitState.OPEN:
                    self._state = CircuitState.HALF_OPEN
                    self._reset_counts()
                if self._half_open_calls >= self.config.half_open_max_calls:
                    raise CircuitOpenError("Circuit is half-open, max calls reached")
                self._half_open_calls += 1

        # Run the operation outside the lock so the handlers below
        # can re-acquire it without deadlocking
        try:
            result = await operation(*args, **kwargs)
            await self._on_success()
            return result
        except Exception:
            await self._on_failure()
            raise

    async def _on_success(self):
        """Handle successful call."""
        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.config.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._reset_counts()
            elif self._state == CircuitState.CLOSED:
                self._failure_count = 0

    async def _on_failure(self):
        """Handle failed call."""
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()
            if self._state == CircuitState.HALF_OPEN:
                self._state = CircuitState.OPEN
                self._reset_counts()
            elif self._state == CircuitState.CLOSED:
                if self._failure_count >= self.config.failure_threshold:
                    self._state = CircuitState.OPEN
                    self._reset_counts()

    def _reset_counts(self):
        """Reset counters."""
        self._failure_count = 0
        self._success_count = 0
        self._half_open_calls = 0


class CircuitOpenError(Exception):
    """Raised when the circuit is open."""
    pass


class ResilientLLMClient:
    """LLM client with circuit breaker and fallback."""

    def __init__(
        self,
        primary_client: Any,
        primary_model: str,
        fallback_chain: Optional[FallbackChain] = None,
        circuit_config: Optional[CircuitConfig] = None,
        retry_config: Optional[RetryConfig] = None
    ):
        self.primary_client = primary_client
        self.primary_model = primary_model
        self.fallback_chain = fallback_chain
        self.circuit = CircuitBreaker(circuit_config)
        self.retry_executor = RetryExecutor(
            strategy=ExponentialBackoff(retry_config or RetryConfig())
        )

    async def complete(self, messages: list[dict], **kwargs) -> str:
        """Complete with full resilience."""
        # Try the primary provider through the circuit breaker
        try:
            return await self.circuit.call(
                self._primary_complete,
                messages,
                **kwargs
            )
        except CircuitOpenError:
            pass
        except Exception:
            pass

        # Fall back to the chain
        if self.fallback_chain:
            result = await self.fallback_chain.complete(messages, **kwargs)
            if result.success:
                return result.result
            raise Exception(f"All providers failed: {result.errors}")

        raise Exception("Primary failed and no fallback available")

    async def _primary_complete(self, messages: list[dict], **kwargs) -> str:
        """Complete using the primary provider with retries."""
        async def operation():
            response = await self.primary_client.chat.completions.create(
                model=self.primary_model,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content

        return await self.retry_executor.execute(operation)
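
Putting the layers together looks like this. A minimal sketch, assuming the AsyncOpenAI client and the classes from the earlier sections; the model names are placeholders, and in practice the fallback provider would point at a different provider or endpoint rather than reusing the primary client:

import asyncio
from openai import AsyncOpenAI

primary = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

resilient = ResilientLLMClient(
    primary_client=primary,
    primary_model="gpt-4o",  # placeholder model name
    fallback_chain=FallbackChain([
        FallbackProvider(name="fallback", client=primary, model="gpt-4o-mini", priority=1),  # placeholder
    ]),
    circuit_config=CircuitConfig(failure_threshold=5, timeout_seconds=30.0),
    retry_config=RetryConfig(max_retries=3, base_delay=1.0),
)

async def main():
    answer = await resilient.complete(
        [{"role": "user", "content": "What is exponential backoff?"}]
    )
    print(answer)

asyncio.run(main())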
Production Error Service
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize with real clients at startup (see the sketch below)
resilient_client = None


class CompleteRequest(BaseModel):
    messages: list[dict]
    model: Optional[str] = None
    max_tokens: int = 1000


@app.post("/v1/complete")
async def complete_resilient(request: CompleteRequest):
    """Complete with full error handling."""
    try:
        result = await resilient_client.complete(
            messages=request.messages,
            max_tokens=request.max_tokens
        )
        return {
            "content": result,
            "success": True
        }
    except Exception as e:
        return {
            "content": None,
            "success": False,
            "error": str(e)
        }


@app.get("/v1/health/circuit")
async def circuit_health():
    """Get circuit breaker status."""
    return {
        "state": resilient_client.circuit.state.value,
        "failure_count": resilient_client.circuit._failure_count,
        "success_count": resilient_client.circuit._success_count
    }


@app.get("/v1/health/providers")
async def provider_health():
    """Get provider health stats."""
    if resilient_client.fallback_chain:
        if hasattr(resilient_client.fallback_chain, 'get_health_stats'):
            return resilient_client.fallback_chain.get_health_stats()
    return {"message": "No health stats available"}


@app.post("/v1/circuit/reset")
async def reset_circuit():
    """Manually reset the circuit breaker."""
    resilient_client.circuit._state = CircuitState.CLOSED
    resilient_client.circuit._reset_counts()
    return {"reset": True}


@app.get("/health")
async def health():
    return {"status": "healthy"}
References
- OpenAI Error Handling: https://platform.openai.com/docs/guides/error-codes
- Circuit Breaker Pattern: https://martinfowler.com/bliki/CircuitBreaker.html
- Exponential Backoff: https://cloud.google.com/storage/docs/exponential-backoff
- Tenacity Library: https://tenacity.readthedocs.io/
Conclusion
Robust error handling is essential for production LLM applications. Start by classifying errors—rate limits and timeouts are retryable, authentication errors are not. Implement exponential backoff with jitter to avoid thundering herd problems when services recover. Use fallback chains to try alternative providers when your primary fails. Add circuit breakers to prevent cascading failures and give struggling services time to recover. Track provider health to route requests intelligently. The key insight is that LLM APIs are external dependencies you don't control—design your system to gracefully handle their failures. Build resilience in layers: retries for transient errors, fallbacks for persistent failures, and circuit breakers to protect your system from repeated failures. Monitor error rates and circuit states to catch problems early and understand your system's reliability characteristics.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.