Introduction
LLM APIs fail. Rate limits hit, services go down, models return errors, and responses sometimes don’t meet quality thresholds. Building reliable AI applications requires robust fallback strategies that handle these failures gracefully without degrading the user experience. A well-designed fallback system tries alternative models, implements retry logic with exponential backoff, caches successful responses, and provides meaningful degraded experiences when all else fails. The key is to detect failures quickly, route to alternatives seamlessly, and maintain response quality across fallback paths. This guide covers practical patterns for implementing LLM fallback strategies, from simple retry logic to sophisticated multi-provider routing with quality-aware failover.
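The sections below focus on failure detection, retries, and routing; the caching piece mentioned above is simple enough to sketch right away. A minimal cache-backed last resort, with the class name, hashing scheme, and TTL policy chosen here purely for illustration:
import hashlib
import time
from typing import Optional
class CachedFallback:
    """Last-resort fallback: serve a previously cached response
    (possibly stale) once every live model has failed."""
    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl = ttl_seconds
        self._cache: dict[str, tuple[float, str]] = {}
    def _key(self, prompt: str) -> str:
        return hashlib.sha256(prompt.encode()).hexdigest()
    def store(self, prompt: str, response: str) -> None:
        """Record a successful response for later reuse."""
        self._cache[self._key(prompt)] = (time.time(), response)
    def lookup(self, prompt: str, allow_stale: bool = False) -> Optional[str]:
        """Return a cached response, or None; stale entries only if allowed."""
        entry = self._cache.get(self._key(prompt))
        if entry is None:
            return None
        stored_at, response = entry
        if time.time() - stored_at <= self.ttl or allow_stale:
            return response
        return None
Serving a stale cached answer is still a product decision; the point is that the option exists before you return an error.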
At its core, every strategy here follows the same flow: send the request to a primary model, detect failures, and fall back to a backup.
Failure Detection
from dataclasses import dataclass, field
from typing import Any, Optional, List, Callable
from enum import Enum
from datetime import datetime, timedelta
import asyncio
class FailureType(Enum):
"""Types of LLM failures."""
RATE_LIMIT = "rate_limit"
TIMEOUT = "timeout"
API_ERROR = "api_error"
INVALID_RESPONSE = "invalid_response"
QUALITY_FAILURE = "quality_failure"
CONTENT_FILTER = "content_filter"
CONTEXT_LENGTH = "context_length"
UNKNOWN = "unknown"
@dataclass
class FailureEvent:
"""A failure event."""
failure_type: FailureType
provider: str
model: str
error_message: str
timestamp: datetime = field(default_factory=datetime.now)
retryable: bool = True
    retry_after: Optional[int] = None  # seconds
class FailureDetector:
"""Detect and classify LLM failures."""
def __init__(self):
self.error_patterns: dict[str, FailureType] = {
"rate limit": FailureType.RATE_LIMIT,
"429": FailureType.RATE_LIMIT,
"timeout": FailureType.TIMEOUT,
"timed out": FailureType.TIMEOUT,
"context length": FailureType.CONTEXT_LENGTH,
"maximum context": FailureType.CONTEXT_LENGTH,
"content filter": FailureType.CONTENT_FILTER,
"content policy": FailureType.CONTENT_FILTER,
"invalid": FailureType.INVALID_RESPONSE,
"500": FailureType.API_ERROR,
"502": FailureType.API_ERROR,
"503": FailureType.API_ERROR
}
def detect(self, error: Exception, provider: str, model: str) -> FailureEvent:
"""Detect failure type from exception."""
error_str = str(error).lower()
failure_type = FailureType.UNKNOWN
retryable = True
retry_after = None
for pattern, ftype in self.error_patterns.items():
if pattern in error_str:
failure_type = ftype
break
# Set retry behavior based on failure type
if failure_type == FailureType.RATE_LIMIT:
retry_after = self._extract_retry_after(error_str)
retryable = True
elif failure_type == FailureType.CONTENT_FILTER:
retryable = False
elif failure_type == FailureType.CONTEXT_LENGTH:
retryable = False
return FailureEvent(
failure_type=failure_type,
provider=provider,
model=model,
error_message=str(error),
retryable=retryable,
retry_after=retry_after
)
def _extract_retry_after(self, error_str: str) -> int:
"""Extract retry-after value from error."""
import re
# Look for retry-after patterns
patterns = [
r'retry.?after[:\s]+(\d+)',
r'wait[:\s]+(\d+)',
r'(\d+)\s*seconds?'
]
for pattern in patterns:
match = re.search(pattern, error_str)
if match:
return int(match.group(1))
return 60 # Default retry after
class QualityChecker:
"""Check response quality."""
def __init__(self):
self.validators: list[Callable[[str], bool]] = []
def add_validator(self, validator: Callable[[str], bool]):
"""Add quality validator."""
self.validators.append(validator)
def check(self, response: str) -> tuple[bool, str]:
"""Check response quality."""
# Basic checks
if not response or not response.strip():
return False, "Empty response"
if len(response) < 10:
return False, "Response too short"
# Run custom validators
for validator in self.validators:
try:
if not validator(response):
return False, "Custom validation failed"
except Exception as e:
return False, f"Validator error: {str(e)}"
return True, "OK"
class CircuitBreaker:
"""Circuit breaker for LLM providers."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_requests: int = 3
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_requests = half_open_requests
self.failures: dict[str, int] = {}
self.last_failure: dict[str, datetime] = {}
self.state: dict[str, str] = {} # closed, open, half-open
self.half_open_successes: dict[str, int] = {}
def is_available(self, provider: str) -> bool:
"""Check if provider is available."""
state = self.state.get(provider, "closed")
if state == "closed":
return True
if state == "open":
# Check if recovery timeout has passed
last = self.last_failure.get(provider)
if last and datetime.now() - last > timedelta(seconds=self.recovery_timeout):
self.state[provider] = "half-open"
self.half_open_successes[provider] = 0
return True
return False
        # half-open: allow trial requests through
        return True
def record_success(self, provider: str):
"""Record successful request."""
state = self.state.get(provider, "closed")
if state == "half-open":
self.half_open_successes[provider] = self.half_open_successes.get(provider, 0) + 1
if self.half_open_successes[provider] >= self.half_open_requests:
self.state[provider] = "closed"
self.failures[provider] = 0
elif state == "closed":
self.failures[provider] = 0
def record_failure(self, provider: str):
"""Record failed request."""
self.failures[provider] = self.failures.get(provider, 0) + 1
self.last_failure[provider] = datetime.now()
state = self.state.get(provider, "closed")
if state == "half-open":
self.state[provider] = "open"
elif state == "closed":
if self.failures[provider] >= self.failure_threshold:
self.state[provider] = "open"
def get_state(self, provider: str) -> str:
"""Get circuit state."""
return self.state.get(provider, "closed")
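A rough sketch of how these pieces compose. The client object and its async generate method are placeholders for whatever provider wrapper you use, not a real SDK:
detector = FailureDetector()
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
async def guarded_call(client, provider: str, model: str, messages: list[dict]) -> str:
    """Call one provider through the failure detector and circuit breaker."""
    if not breaker.is_available(provider):
        raise RuntimeError(f"{provider} circuit is open")
    try:
        response = await client.generate(messages=messages, model=model)
    except Exception as e:
        event = detector.detect(e, provider, model)
        breaker.record_failure(provider)
        print(f"{provider} failed: {event.failure_type.value}, retryable={event.retryable}")
        raise
    breaker.record_success(provider)
    return response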
Retry Strategies
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional, TypeVar
import asyncio
import random
# FailureType comes from the failure-detection section above.
T = TypeVar('T')
@dataclass
class RetryConfig:
"""Configuration for retry logic."""
max_retries: int = 3
base_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
jitter: bool = True
retryable_exceptions: tuple = (Exception,)
class RetryStrategy:
"""Retry strategy with exponential backoff."""
    def __init__(self, config: Optional[RetryConfig] = None):
self.config = config or RetryConfig()
def calculate_delay(self, attempt: int) -> float:
"""Calculate delay for attempt."""
delay = self.config.base_delay * (self.config.exponential_base ** attempt)
delay = min(delay, self.config.max_delay)
if self.config.jitter:
delay = delay * (0.5 + random.random())
return delay
async def execute(
self,
func: Callable[[], T],
        on_retry: Optional[Callable[[int, Exception], None]] = None
) -> T:
"""Execute function with retry."""
last_exception = None
for attempt in range(self.config.max_retries + 1):
try:
result = func()
if asyncio.iscoroutine(result):
result = await result
return result
except self.config.retryable_exceptions as e:
last_exception = e
if attempt < self.config.max_retries:
delay = self.calculate_delay(attempt)
if on_retry:
on_retry(attempt + 1, e)
await asyncio.sleep(delay)
raise last_exception
class AdaptiveRetry:
"""Adaptive retry based on failure patterns."""
def __init__(self):
self.failure_history: list[tuple[datetime, FailureType]] = []
self.base_config = RetryConfig()
def get_config(self, failure_type: FailureType = None) -> RetryConfig:
"""Get retry config based on recent failures."""
# Analyze recent failures
recent = [f for f in self.failure_history
if datetime.now() - f[0] < timedelta(minutes=5)]
config = RetryConfig(
max_retries=self.base_config.max_retries,
base_delay=self.base_config.base_delay,
max_delay=self.base_config.max_delay
)
# Adjust based on failure patterns
rate_limit_count = sum(1 for _, ft in recent if ft == FailureType.RATE_LIMIT)
if rate_limit_count > 3:
config.base_delay *= 2
config.max_retries = min(config.max_retries + 2, 10)
if failure_type == FailureType.RATE_LIMIT:
config.base_delay = max(config.base_delay, 5.0)
return config
def record_failure(self, failure_type: FailureType):
"""Record failure for adaptation."""
self.failure_history.append((datetime.now(), failure_type))
# Keep only recent history
cutoff = datetime.now() - timedelta(hours=1)
self.failure_history = [f for f in self.failure_history if f[0] > cutoff]
class RateLimitAwareRetry:
"""Retry with rate limit awareness."""
def __init__(self):
self.rate_limits: dict[str, dict] = {}
def update_rate_limit(
self,
provider: str,
requests_remaining: int,
reset_time: datetime
):
"""Update rate limit info."""
self.rate_limits[provider] = {
"remaining": requests_remaining,
"reset": reset_time
}
async def wait_if_needed(self, provider: str):
"""Wait if rate limited."""
if provider not in self.rate_limits:
return
info = self.rate_limits[provider]
if info["remaining"] <= 0:
wait_time = (info["reset"] - datetime.now()).total_seconds()
if wait_time > 0:
await asyncio.sleep(wait_time)
def should_retry(self, provider: str, failure: FailureEvent) -> bool:
"""Check if should retry based on rate limits."""
if failure.failure_type != FailureType.RATE_LIMIT:
return True
if failure.retry_after:
return True
if provider in self.rate_limits:
return self.rate_limits[provider]["remaining"] > 0
return True
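Used on its own, the retry strategy wraps any zero-argument callable; the always-failing call_primary_model coroutine below is a stand-in for a real provider call:
async def _demo_retry():
    strategy = RetryStrategy(RetryConfig(max_retries=4, base_delay=0.5))
    async def call_primary_model() -> str:
        raise TimeoutError("request timed out")  # stand-in for a real call
    def log_retry(attempt: int, error: Exception) -> None:
        print(f"retry {attempt} after: {error}")
    try:
        await strategy.execute(call_primary_model, on_retry=log_retry)
    except TimeoutError as e:
        print(f"gave up after retries: {e}")
asyncio.run(_demo_retry())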
Fallback Routing
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
import time
@dataclass
class ModelConfig:
"""Configuration for a model."""
provider: str
model: str
priority: int = 0
max_tokens: int = 4096
cost_per_1k_tokens: float = 0.0
latency_ms: int = 1000
@dataclass
class FallbackResult:
"""Result from fallback execution."""
response: str
provider: str
model: str
attempts: int
fallback_used: bool
latency_ms: float
class FallbackRouter:
"""Route requests with fallback."""
def __init__(self):
self.models: list[ModelConfig] = []
self.clients: dict[str, Any] = {}
self.circuit_breaker = CircuitBreaker()
self.failure_detector = FailureDetector()
def add_model(self, config: ModelConfig, client: Any):
"""Add model to fallback chain."""
self.models.append(config)
self.models.sort(key=lambda m: m.priority)
self.clients[f"{config.provider}:{config.model}"] = client
async def execute(
self,
messages: list[dict],
        max_attempts: Optional[int] = None
) -> FallbackResult:
"""Execute with fallback."""
start_time = time.time()
attempts = 0
last_error = None
max_attempts = max_attempts or len(self.models)
for config in self.models:
if attempts >= max_attempts:
break
# Check circuit breaker
if not self.circuit_breaker.is_available(config.provider):
continue
attempts += 1
client_key = f"{config.provider}:{config.model}"
client = self.clients.get(client_key)
if not client:
continue
try:
response = await client.generate(
messages=messages,
model=config.model,
max_tokens=config.max_tokens
)
self.circuit_breaker.record_success(config.provider)
return FallbackResult(
response=response,
provider=config.provider,
model=config.model,
attempts=attempts,
fallback_used=attempts > 1,
latency_ms=(time.time() - start_time) * 1000
)
except Exception as e:
last_error = e
failure = self.failure_detector.detect(e, config.provider, config.model)
self.circuit_breaker.record_failure(config.provider)
if not failure.retryable:
continue
if failure.retry_after:
await asyncio.sleep(failure.retry_after)
raise last_error or Exception("All fallback models failed")
class CascadeFallback:
"""Cascade through models of decreasing capability."""
def __init__(self):
self.tiers: list[list[ModelConfig]] = []
self.clients: dict[str, Any] = {}
def add_tier(self, models: list[ModelConfig], clients: dict[str, Any]):
"""Add a tier of models."""
self.tiers.append(models)
self.clients.update(clients)
async def execute(
self,
messages: list[dict],
        quality_checker: Optional[QualityChecker] = None
) -> FallbackResult:
"""Execute with cascade fallback."""
start_time = time.time()
attempts = 0
for tier_idx, tier in enumerate(self.tiers):
for config in tier:
attempts += 1
client_key = f"{config.provider}:{config.model}"
client = self.clients.get(client_key)
if not client:
continue
try:
response = await client.generate(
messages=messages,
model=config.model
)
# Check quality
if quality_checker:
is_valid, reason = quality_checker.check(response)
if not is_valid:
continue
return FallbackResult(
response=response,
provider=config.provider,
model=config.model,
attempts=attempts,
fallback_used=tier_idx > 0 or attempts > 1,
latency_ms=(time.time() - start_time) * 1000
)
except Exception:
continue
raise Exception("All cascade tiers failed")
class ParallelFallback:
"""Execute multiple models in parallel, use first success."""
def __init__(self):
self.models: list[ModelConfig] = []
self.clients: dict[str, Any] = {}
def add_model(self, config: ModelConfig, client: Any):
"""Add model for parallel execution."""
self.models.append(config)
self.clients[f"{config.provider}:{config.model}"] = client
async def execute(
self,
messages: list[dict],
timeout: float = 30.0
) -> FallbackResult:
"""Execute all models in parallel."""
start_time = time.time()
async def try_model(config: ModelConfig) -> tuple[ModelConfig, str]:
client_key = f"{config.provider}:{config.model}"
client = self.clients.get(client_key)
if not client:
raise Exception("No client")
response = await client.generate(
messages=messages,
model=config.model
)
return config, response
tasks = [
asyncio.create_task(try_model(config))
for config in self.models
]
        # Wait for completions until one succeeds, the overall timeout
        # expires, or every task has failed. Waiting only for the first
        # completion would give up when the fastest model errors out.
        pending = set(tasks)
        deadline = start_time + timeout
        while pending:
            done, pending = await asyncio.wait(
                pending,
                timeout=max(0.0, deadline - time.time()),
                return_when=asyncio.FIRST_COMPLETED
            )
            if not done:
                break  # overall timeout reached
            for task in done:
                try:
                    config, response = task.result()
                except Exception:
                    continue  # this model failed; keep waiting on the rest
                # First success wins: cancel everything still in flight.
                for t in pending:
                    t.cancel()
                return FallbackResult(
                    response=response,
                    provider=config.provider,
                    model=config.model,
                    attempts=len(self.models),
                    fallback_used=False,
                    latency_ms=(time.time() - start_time) * 1000
                )
        for t in pending:
            t.cancel()
        raise Exception("All parallel models failed")
Production Fallback Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict
app = FastAPI()
class CompletionRequest(BaseModel):
messages: List[Dict]
max_tokens: int = 1024
fallback_strategy: str = "cascade"
timeout: float = 30.0
class CompletionResponse(BaseModel):
response: str
provider: str
model: str
attempts: int
fallback_used: bool
latency_ms: float
# Initialize components. The status and reset endpoints must share the
# router's breaker, not a separate instance that never sees traffic.
fallback_router = FallbackRouter()
cascade_fallback = CascadeFallback()
circuit_breaker = fallback_router.circuit_breaker
@app.post("/v1/completions")
async def create_completion(request: CompletionRequest) -> CompletionResponse:
"""Create completion with fallback."""
try:
if request.fallback_strategy == "cascade":
result = await cascade_fallback.execute(
request.messages
)
else:
result = await fallback_router.execute(
request.messages
)
return CompletionResponse(
response=result.response,
provider=result.provider,
model=result.model,
attempts=result.attempts,
fallback_used=result.fallback_used,
latency_ms=result.latency_ms
)
except Exception as e:
raise HTTPException(status_code=503, detail=str(e))
@app.get("/v1/providers/status")
async def get_provider_status() -> dict:
"""Get status of all providers."""
status = {}
for config in fallback_router.models:
provider = config.provider
status[provider] = {
"available": circuit_breaker.is_available(provider),
"state": circuit_breaker.get_state(provider),
"model": config.model
}
return status
@app.post("/v1/providers/{provider}/reset")
async def reset_provider(provider: str) -> dict:
"""Reset circuit breaker for provider."""
    circuit_breaker.state[provider] = "closed"
    circuit_breaker.failures[provider] = 0
    circuit_breaker.half_open_successes[provider] = 0
return {"provider": provider, "state": "closed"}
@app.get("/health")
async def health():
return {"status": "healthy"}
Robust fallback strategies are essential for production LLM applications. Start with proper failure detection: classify errors to determine whether they’re retryable, and extract retry-after headers when available. Implement circuit breakers to prevent cascading failures; when a provider is consistently failing, stop sending requests until it recovers. Use exponential backoff with jitter for retries to avoid thundering-herd problems. For fallback routing, cascade through models of decreasing capability (GPT-4 to GPT-3.5 to Claude Haiku) to balance quality and reliability. Consider parallel execution for latency-critical applications: race multiple providers and use the first response. Monitor fallback usage in production; high fallback rates indicate provider issues or a misconfigured primary model. Quality-checking responses ensures fallbacks don’t degrade the user experience. The goal is invisible reliability: users should never know when fallbacks are used, only that the system always responds appropriately.
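To make the monitoring point concrete, here is a minimal in-process counter for fallback rate; in production this would feed whatever metrics system you already run:
class FallbackMetrics:
    """Track how often requests succeed on a fallback rather than the primary."""
    def __init__(self):
        self.total = 0
        self.fallbacks = 0
    def record(self, result: FallbackResult) -> None:
        self.total += 1
        if result.fallback_used:
            self.fallbacks += 1
    @property
    def fallback_rate(self) -> float:
        """Fraction of requests served by a non-primary model."""
        return self.fallbacks / self.total if self.total else 0.0
Alerting when the rate crosses a threshold (say 20%) catches both provider outages and a misconfigured primary.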