LLM Fallback Strategies: Building Reliable AI Applications

Introduction

LLM APIs fail. Rate limits hit, services go down, models return errors, and responses sometimes don't meet quality thresholds. Building reliable AI applications requires robust fallback strategies that handle these failures gracefully without degrading the user experience. A well-designed fallback system tries alternative models, implements retry logic with exponential backoff, caches successful responses, and provides meaningful degraded experiences when all else fails. The key is detecting failures quickly, routing to alternatives seamlessly, and maintaining response quality across fallback paths. This guide covers practical patterns for implementing LLM fallback strategies, from simple retry logic to sophisticated multi-provider routing with quality-aware failover.
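
One piece of that design is worth sketching up front: a cache of recent successful responses gives you a degraded-but-useful answer when every provider is down. A minimal in-memory sketch; the hashing scheme and one-hour TTL are illustrative assumptions (a production system would more likely use Redis or similar):

import hashlib
import json
import time
from typing import Optional

class ResponseCache:
    """Cache successful responses as a last-resort degraded experience."""
    
    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, str]] = {}
    
    def _key(self, messages: list[dict]) -> str:
        # Hash the full conversation so equivalent prompts share an entry.
        raw = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()
    
    def put(self, messages: list[dict], response: str):
        self._store[self._key(messages)] = (time.time(), response)
    
    def get(self, messages: list[dict]) -> Optional[str]:
        entry = self._store.get(self._key(messages))
        if entry is None:
            return None
        stored_at, response = entry
        if time.time() - stored_at > self.ttl:
            return None  # entry expired
        return response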

[Diagram: LLM fallback flow: primary model -> failure detection -> backup model]

Failure Detection

from dataclasses import dataclass, field
from typing import Callable, Optional
from enum import Enum
from datetime import datetime, timedelta

class FailureType(Enum):
    """Types of LLM failures."""
    
    RATE_LIMIT = "rate_limit"
    TIMEOUT = "timeout"
    API_ERROR = "api_error"
    INVALID_RESPONSE = "invalid_response"
    QUALITY_FAILURE = "quality_failure"
    CONTENT_FILTER = "content_filter"
    CONTEXT_LENGTH = "context_length"
    UNKNOWN = "unknown"

@dataclass
class FailureEvent:
    """A failure event."""
    
    failure_type: FailureType
    provider: str
    model: str
    error_message: str
    timestamp: datetime = field(default_factory=datetime.now)
    retryable: bool = True
    retry_after: Optional[int] = None  # seconds

class FailureDetector:
    """Detect and classify LLM failures."""
    
    def __init__(self):
        self.error_patterns: dict[str, FailureType] = {
            "rate limit": FailureType.RATE_LIMIT,
            "429": FailureType.RATE_LIMIT,
            "timeout": FailureType.TIMEOUT,
            "timed out": FailureType.TIMEOUT,
            "context length": FailureType.CONTEXT_LENGTH,
            "maximum context": FailureType.CONTEXT_LENGTH,
            "content filter": FailureType.CONTENT_FILTER,
            "content policy": FailureType.CONTENT_FILTER,
            "invalid": FailureType.INVALID_RESPONSE,
            "500": FailureType.API_ERROR,
            "502": FailureType.API_ERROR,
            "503": FailureType.API_ERROR
        }
    
    def detect(self, error: Exception, provider: str, model: str) -> FailureEvent:
        """Detect failure type from exception."""
        
        error_str = str(error).lower()
        
        failure_type = FailureType.UNKNOWN
        retryable = True
        retry_after = None
        
        for pattern, ftype in self.error_patterns.items():
            if pattern in error_str:
                failure_type = ftype
                break
        
        # Set retry behavior based on failure type
        if failure_type == FailureType.RATE_LIMIT:
            retry_after = self._extract_retry_after(error_str)
            retryable = True
        elif failure_type == FailureType.CONTENT_FILTER:
            retryable = False
        elif failure_type == FailureType.CONTEXT_LENGTH:
            retryable = False
        
        return FailureEvent(
            failure_type=failure_type,
            provider=provider,
            model=model,
            error_message=str(error),
            retryable=retryable,
            retry_after=retry_after
        )
    
    def _extract_retry_after(self, error_str: str) -> int:
        """Extract retry-after value from error."""
        
        import re
        
        # Look for retry-after patterns
        patterns = [
            r'retry.?after[:\s]+(\d+)',
            r'wait[:\s]+(\d+)',
            r'(\d+)\s*seconds?'
        ]
        
        for pattern in patterns:
            match = re.search(pattern, error_str)
            if match:
                return int(match.group(1))
        
        return 60  # Default retry after

class QualityChecker:
    """Check response quality."""
    
    def __init__(self):
        self.validators: list[Callable[[str], bool]] = []
    
    def add_validator(self, validator: Callable[[str], bool]):
        """Add quality validator."""
        
        self.validators.append(validator)
    
    def check(self, response: str) -> tuple[bool, str]:
        """Check response quality."""
        
        # Basic checks
        if not response or not response.strip():
            return False, "Empty response"
        
        if len(response) < 10:
            return False, "Response too short"
        
        # Run custom validators
        for validator in self.validators:
            try:
                if not validator(response):
                    return False, "Custom validation failed"
            except Exception as e:
                return False, f"Validator error: {str(e)}"
        
        return True, "OK"

class CircuitBreaker:
    """Circuit breaker for LLM providers."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        
        self.failures: dict[str, int] = {}
        self.last_failure: dict[str, datetime] = {}
        self.state: dict[str, str] = {}  # closed, open, half-open
        self.half_open_successes: dict[str, int] = {}
    
    def is_available(self, provider: str) -> bool:
        """Check if provider is available."""
        
        state = self.state.get(provider, "closed")
        
        if state == "closed":
            return True
        
        if state == "open":
            # Check if recovery timeout has passed
            last = self.last_failure.get(provider)
            if last and datetime.now() - last > timedelta(seconds=self.recovery_timeout):
                self.state[provider] = "half-open"
                self.half_open_successes[provider] = 0
                return True
            return False
        
        if state == "half-open":
            return True
        
        return True
    
    def record_success(self, provider: str):
        """Record successful request."""
        
        state = self.state.get(provider, "closed")
        
        if state == "half-open":
            self.half_open_successes[provider] = self.half_open_successes.get(provider, 0) + 1
            
            if self.half_open_successes[provider] >= self.half_open_requests:
                self.state[provider] = "closed"
                self.failures[provider] = 0
        
        elif state == "closed":
            self.failures[provider] = 0
    
    def record_failure(self, provider: str):
        """Record failed request."""
        
        self.failures[provider] = self.failures.get(provider, 0) + 1
        self.last_failure[provider] = datetime.now()
        
        state = self.state.get(provider, "closed")
        
        if state == "half-open":
            self.state[provider] = "open"
        
        elif state == "closed":
            if self.failures[provider] >= self.failure_threshold:
                self.state[provider] = "open"
    
    def get_state(self, provider: str) -> str:
        """Get circuit state."""
        
        return self.state.get(provider, "closed")
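
How the detector and breaker fit together around a raw provider call, as a rough sketch; call_provider is a hypothetical stand-in for whatever client you use:

detector = FailureDetector()
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

async def guarded_call(provider: str, model: str, messages: list[dict]) -> str:
    """One provider call wrapped with detection and circuit breaking."""
    
    if not breaker.is_available(provider):
        raise Exception(f"circuit open for {provider}")
    
    try:
        # call_provider is a hypothetical client function.
        response = await call_provider(provider, model, messages)
        breaker.record_success(provider)
        return response
    except Exception as e:
        failure = detector.detect(e, provider, model)
        breaker.record_failure(provider)
        # Re-raise with the classification attached for upstream retry logic.
        raise RuntimeError(f"{failure.failure_type.value} on {provider}/{model}") from e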

Retry Strategies

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional, TypeVar
import asyncio
import random

T = TypeVar('T')

@dataclass
class RetryConfig:
    """Configuration for retry logic."""
    
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_exceptions: tuple = (Exception,)

class RetryStrategy:
    """Retry strategy with exponential backoff."""
    
    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()
    
    def calculate_delay(self, attempt: int) -> float:
        """Calculate delay for attempt."""
        
        delay = self.config.base_delay * (self.config.exponential_base ** attempt)
        delay = min(delay, self.config.max_delay)
        
        if self.config.jitter:
            delay = delay * (0.5 + random.random())
        
        return delay
    
    async def execute(
        self,
        func: Callable[[], T],
        on_retry: Optional[Callable[[int, Exception], None]] = None
    ) -> T:
        """Execute function with retry."""
        
        last_exception = None
        
        for attempt in range(self.config.max_retries + 1):
            try:
                result = func()
                if asyncio.iscoroutine(result):
                    result = await result
                return result
            
            except self.config.retryable_exceptions as e:
                last_exception = e
                
                if attempt < self.config.max_retries:
                    delay = self.calculate_delay(attempt)
                    
                    if on_retry:
                        on_retry(attempt + 1, e)
                    
                    await asyncio.sleep(delay)
        
        raise last_exception
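
A minimal usage sketch with a simulated flaky call (flaky_call and its failure pattern are made up for illustration):

async def main():
    retry = RetryStrategy(RetryConfig(max_retries=4, base_delay=0.5))
    attempts = {"count": 0}
    
    async def flaky_call() -> str:
        # Fails twice with a retryable error, then succeeds.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise Exception("503 service unavailable")
        return "ok"
    
    result = await retry.execute(
        flaky_call,
        on_retry=lambda n, err: print(f"retry {n} after: {err}")
    )
    print(result)  # "ok", after two backed-off retries

asyncio.run(main())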

class AdaptiveRetry:
    """Adaptive retry based on failure patterns."""
    
    def __init__(self):
        self.failure_history: list[tuple[datetime, FailureType]] = []
        self.base_config = RetryConfig()
    
    def get_config(self, failure_type: Optional[FailureType] = None) -> RetryConfig:
        """Get retry config based on recent failures."""
        
        # Analyze recent failures
        recent = [f for f in self.failure_history 
                  if datetime.now() - f[0] < timedelta(minutes=5)]
        
        config = RetryConfig(
            max_retries=self.base_config.max_retries,
            base_delay=self.base_config.base_delay,
            max_delay=self.base_config.max_delay
        )
        
        # Adjust based on failure patterns
        rate_limit_count = sum(1 for _, ft in recent if ft == FailureType.RATE_LIMIT)
        
        if rate_limit_count > 3:
            config.base_delay *= 2
            config.max_retries = min(config.max_retries + 2, 10)
        
        if failure_type == FailureType.RATE_LIMIT:
            config.base_delay = max(config.base_delay, 5.0)
        
        return config
    
    def record_failure(self, failure_type: FailureType):
        """Record failure for adaptation."""
        
        self.failure_history.append((datetime.now(), failure_type))
        
        # Keep only recent history
        cutoff = datetime.now() - timedelta(hours=1)
        self.failure_history = [f for f in self.failure_history if f[0] > cutoff]

class RateLimitAwareRetry:
    """Retry with rate limit awareness."""
    
    def __init__(self):
        self.rate_limits: dict[str, dict] = {}
    
    def update_rate_limit(
        self,
        provider: str,
        requests_remaining: int,
        reset_time: datetime
    ):
        """Update rate limit info."""
        
        self.rate_limits[provider] = {
            "remaining": requests_remaining,
            "reset": reset_time
        }
    
    async def wait_if_needed(self, provider: str):
        """Wait if rate limited."""
        
        if provider not in self.rate_limits:
            return
        
        info = self.rate_limits[provider]
        
        if info["remaining"] <= 0:
            wait_time = (info["reset"] - datetime.now()).total_seconds()
            if wait_time > 0:
                await asyncio.sleep(wait_time)
    
    def should_retry(self, provider: str, failure: FailureEvent) -> bool:
        """Check if should retry based on rate limits."""
        
        if failure.failure_type != FailureType.RATE_LIMIT:
            return True
        
        if failure.retry_after:
            return True
        
        if provider in self.rate_limits:
            return self.rate_limits[provider]["remaining"] > 0
        
        return True
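
The rate-limit info has to come from somewhere; most providers expose it via response headers. A sketch of feeding those headers in — the header names are assumptions and vary by provider:

from datetime import datetime

limiter = RateLimitAwareRetry()

def on_response_headers(provider: str, headers: dict[str, str]):
    """Feed provider rate-limit headers into the retry helper.
    
    Header names are illustrative; check your provider's docs.
    """
    remaining = int(headers.get("x-ratelimit-remaining-requests", 1))
    reset_epoch = float(headers.get("x-ratelimit-reset", 0))
    limiter.update_rate_limit(
        provider,
        requests_remaining=remaining,
        reset_time=datetime.fromtimestamp(reset_epoch)
    )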

Fallback Routing

from dataclasses import dataclass
from typing import Any, Optional
import asyncio

@dataclass
class ModelConfig:
    """Configuration for a model."""
    
    provider: str
    model: str
    priority: int = 0
    max_tokens: int = 4096
    cost_per_1k_tokens: float = 0.0
    latency_ms: int = 1000

@dataclass
class FallbackResult:
    """Result from fallback execution."""
    
    response: str
    provider: str
    model: str
    attempts: int
    fallback_used: bool
    latency_ms: float

class FallbackRouter:
    """Route requests with fallback."""
    
    def __init__(self):
        self.models: list[ModelConfig] = []
        self.clients: dict[str, Any] = {}
        self.circuit_breaker = CircuitBreaker()
        self.failure_detector = FailureDetector()
    
    def add_model(self, config: ModelConfig, client: Any):
        """Add model to fallback chain."""
        
        self.models.append(config)
        self.models.sort(key=lambda m: m.priority)
        self.clients[f"{config.provider}:{config.model}"] = client
    
    async def execute(
        self,
        messages: list[dict],
        max_attempts: Optional[int] = None
    ) -> FallbackResult:
        """Execute with fallback."""
        
        import time
        
        start_time = time.time()
        attempts = 0
        last_error = None
        
        max_attempts = max_attempts or len(self.models)
        
        for config in self.models:
            if attempts >= max_attempts:
                break
            
            # Check circuit breaker
            if not self.circuit_breaker.is_available(config.provider):
                continue
            
            attempts += 1
            client_key = f"{config.provider}:{config.model}"
            client = self.clients.get(client_key)
            
            if not client:
                continue
            
            try:
                response = await client.generate(
                    messages=messages,
                    model=config.model,
                    max_tokens=config.max_tokens
                )
                
                self.circuit_breaker.record_success(config.provider)
                
                return FallbackResult(
                    response=response,
                    provider=config.provider,
                    model=config.model,
                    attempts=attempts,
                    fallback_used=attempts > 1,
                    latency_ms=(time.time() - start_time) * 1000
                )
            
            except Exception as e:
                last_error = e
                failure = self.failure_detector.detect(e, config.provider, config.model)
                self.circuit_breaker.record_failure(config.provider)
                
                if not failure.retryable:
                    continue  # e.g. content filter or context length: move straight on
                
                if failure.retry_after:
                    # Honor the provider's back-off hint before the next attempt;
                    # this mainly matters when later models share the same account.
                    await asyncio.sleep(failure.retry_after)
        
        raise last_error or Exception("All fallback models failed")
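
Wiring it up, as a sketch; OpenAIClient and AnthropicClient are hypothetical wrappers assumed to expose the async generate(messages=..., model=..., max_tokens=...) method the router calls:

router = FallbackRouter()

# Primary: highest quality, priority 0 tries first.
router.add_model(
    ModelConfig(provider="openai", model="gpt-4", priority=0),
    client=OpenAIClient()        # hypothetical wrapper
)
# Fallback: cheaper and usually available.
router.add_model(
    ModelConfig(provider="anthropic", model="claude-3-haiku", priority=1),
    client=AnthropicClient()     # hypothetical wrapper
)

async def demo():
    result = await router.execute([{"role": "user", "content": "Hello"}])
    print(result.provider, result.model, result.fallback_used)

# asyncio.run(demo())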

class CascadeFallback:
    """Cascade through models of decreasing capability."""
    
    def __init__(self):
        self.tiers: list[list[ModelConfig]] = []
        self.clients: dict[str, Any] = {}
    
    def add_tier(self, models: list[ModelConfig], clients: dict[str, Any]):
        """Add a tier of models."""
        
        self.tiers.append(models)
        self.clients.update(clients)
    
    async def execute(
        self,
        messages: list[dict],
        quality_checker: QualityChecker = None
    ) -> FallbackResult:
        """Execute with cascade fallback."""
        
        import time
        
        start_time = time.time()
        attempts = 0
        
        for tier_idx, tier in enumerate(self.tiers):
            for config in tier:
                attempts += 1
                client_key = f"{config.provider}:{config.model}"
                client = self.clients.get(client_key)
                
                if not client:
                    continue
                
                try:
                    response = await client.generate(
                        messages=messages,
                        model=config.model
                    )
                    
                    # Check quality
                    if quality_checker:
                        is_valid, reason = quality_checker.check(response)
                        if not is_valid:
                            continue
                    
                    return FallbackResult(
                        response=response,
                        provider=config.provider,
                        model=config.model,
                        attempts=attempts,
                        fallback_used=tier_idx > 0 or attempts > 1,
                        latency_ms=(time.time() - start_time) * 1000
                    )
                
                except Exception:
                    continue
        
        raise Exception("All cascade tiers failed")
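
Tiers group models of similar capability; the cascade drops to the next tier only after every model in the current one fails or is rejected by the quality check. A sketch of a two-tier setup (the client objects are again hypothetical):

cascade = CascadeFallback()

# Tier 0: strongest model.
cascade.add_tier(
    [ModelConfig(provider="openai", model="gpt-4")],
    {"openai:gpt-4": OpenAIClient()}              # hypothetical client
)
# Tier 1: cheaper fallback tried only if tier 0 fails or is rejected.
cascade.add_tier(
    [ModelConfig(provider="anthropic", model="claude-3-haiku")],
    {"anthropic:claude-3-haiku": AnthropicClient()}
)

checker = QualityChecker()
checker.add_validator(lambda r: len(r) > 50)  # e.g. reject truncated answers

async def demo():
    result = await cascade.execute(
        [{"role": "user", "content": "Summarize this document."}],
        quality_checker=checker
    )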

class ParallelFallback:
    """Execute multiple models in parallel, use first success."""
    
    def __init__(self):
        self.models: list[ModelConfig] = []
        self.clients: dict[str, Any] = {}
    
    def add_model(self, config: ModelConfig, client: Any):
        """Add model for parallel execution."""
        
        self.models.append(config)
        self.clients[f"{config.provider}:{config.model}"] = client
    
    async def execute(
        self,
        messages: list[dict],
        timeout: float = 30.0
    ) -> FallbackResult:
        """Execute all models in parallel."""
        
        import time
        
        start_time = time.time()
        
        async def try_model(config: ModelConfig) -> tuple[ModelConfig, str]:
            client_key = f"{config.provider}:{config.model}"
            client = self.clients.get(client_key)
            
            if not client:
                raise Exception("No client")
            
            response = await client.generate(
                messages=messages,
                model=config.model
            )
            
            return config, response
        
        tasks = [
            asyncio.create_task(try_model(config))
            for config in self.models
        ]
        
        # Wait for the first *successful* task. FIRST_COMPLETED alone is not
        # enough: the first task to finish may have finished with an error,
        # so keep waiting on the rest until one succeeds or time runs out.
        pending = set(tasks)
        deadline = asyncio.get_running_loop().time() + timeout
        
        while pending:
            remaining = deadline - asyncio.get_running_loop().time()
            if remaining <= 0:
                break
            
            done, pending = await asyncio.wait(
                pending,
                timeout=remaining,
                return_when=asyncio.FIRST_COMPLETED
            )
            
            for task in done:
                try:
                    config, response = task.result()
                except Exception:
                    continue  # this model failed; keep racing the others
                
                # First success wins: cancel everything still running.
                for t in pending:
                    t.cancel()
                
                return FallbackResult(
                    response=response,
                    provider=config.provider,
                    model=config.model,
                    attempts=len(self.models),
                    fallback_used=False,
                    latency_ms=(time.time() - start_time) * 1000
                )
        
        for task in pending:
            task.cancel()
        
        raise Exception("All parallel models failed")
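
Note that racing providers trades cost for latency: each provider that gets far enough to generate tokens typically bills for them even if its task is cancelled, so parallel fallback is best reserved for latency-critical paths where the extra spend is acceptable.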

Production Fallback Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, List

app = FastAPI()

class CompletionRequest(BaseModel):
    messages: List[Dict]
    max_tokens: int = 1024
    fallback_strategy: str = "cascade"
    timeout: float = 30.0

class CompletionResponse(BaseModel):
    response: str
    provider: str
    model: str
    attempts: int
    fallback_used: bool
    latency_ms: float

# Initialize components
fallback_router = FallbackRouter()
cascade_fallback = CascadeFallback()
# Expose the router's own breaker so the status and reset endpoints
# reflect the circuit state actually used for routing.
circuit_breaker = fallback_router.circuit_breaker

@app.post("/v1/completions")
async def create_completion(request: CompletionRequest) -> CompletionResponse:
    """Create completion with fallback."""
    
    try:
        if request.fallback_strategy == "cascade":
            result = await cascade_fallback.execute(
                request.messages
            )
        else:
            result = await fallback_router.execute(
                request.messages
            )
        
        return CompletionResponse(
            response=result.response,
            provider=result.provider,
            model=result.model,
            attempts=result.attempts,
            fallback_used=result.fallback_used,
            latency_ms=result.latency_ms
        )
    
    except Exception as e:
        raise HTTPException(status_code=503, detail=str(e))

@app.get("/v1/providers/status")
async def get_provider_status() -> dict:
    """Get status of all providers."""
    
    status = {}
    
    for config in fallback_router.models:
        provider = config.provider
        status[provider] = {
            "available": circuit_breaker.is_available(provider),
            "state": circuit_breaker.get_state(provider),
            "model": config.model
        }
    
    return status

@app.post("/v1/providers/{provider}/reset")
async def reset_provider(provider: str) -> dict:
    """Reset circuit breaker for provider."""
    
    circuit_breaker.state[provider] = "closed"
    circuit_breaker.failures[provider] = 0
    
    return {"provider": provider, "state": "closed"}

@app.get("/health")
async def health():
    return {"status": "healthy"}
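
A quick smoke test against the service with httpx, assuming it is running locally on port 8000 (URL and port are assumptions):

import httpx

resp = httpx.post(
    "http://localhost:8000/v1/completions",
    json={
        "messages": [{"role": "user", "content": "Hello"}],
        "fallback_strategy": "cascade"
    },
    timeout=60.0
)
print(resp.status_code, resp.json())

print(httpx.get("http://localhost:8000/v1/providers/status").json())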

Conclusion

Robust fallback strategies are essential for production LLM applications. Start with proper failure detection: classify errors to determine whether they're retryable, and extract retry-after hints when available. Implement circuit breakers to prevent cascading failures; when a provider is consistently failing, stop sending it requests until it recovers. Use exponential backoff with jitter for retries to avoid thundering-herd problems.

For fallback routing, cascade through models of decreasing capability (GPT-4 to GPT-3.5 to Claude Haiku) to balance quality and reliability. Consider parallel execution for latency-critical applications: race multiple providers and use the first response. Monitor fallback usage in production; a high fallback rate indicates provider issues or a misconfigured primary model. Quality-checking responses ensures fallbacks don't degrade the user experience. The goal is invisible reliability: users should never know when fallbacks are used, only that the system always responds appropriately.