LLM Model Selection: Choosing the Right Model for Every Task

Introduction

Choosing the right LLM for your task is one of the most impactful decisions you’ll make. Use a model that’s too small and you’ll get poor quality. Use one that’s too large and you’ll burn through budget while waiting for slow responses. The landscape changes constantly: new models launch monthly, pricing shifts, and capabilities evolve. This guide covers practical model selection: understanding capability tiers, benchmarking models on your specific tasks, optimizing for cost and latency, and building routing systems that automatically select the best model for each request.

Selection pipeline: capability analysis, performance benchmarking, and cost optimization.

Model Capability Mapping

from dataclasses import dataclass
from typing import Optional
from enum import Enum

class TaskType(Enum):
    """Types of LLM tasks."""
    
    SIMPLE_QA = "simple_qa"
    COMPLEX_REASONING = "complex_reasoning"
    CODE_GENERATION = "code_generation"
    CREATIVE_WRITING = "creative_writing"
    SUMMARIZATION = "summarization"
    EXTRACTION = "extraction"
    CLASSIFICATION = "classification"
    TRANSLATION = "translation"
    MATH = "math"
    MULTI_TURN = "multi_turn"

@dataclass
class ModelCapabilities:
    """Capabilities of an LLM model."""
    
    model_id: str
    provider: str
    
    # Context and output limits
    context_window: int
    max_output_tokens: int
    
    # Capabilities (0-1 scores)
    reasoning: float
    coding: float
    creativity: float
    instruction_following: float
    factual_accuracy: float
    math: float
    multilingual: float
    
    # Performance characteristics
    latency_tier: str  # fast, medium, slow
    cost_per_1k_input: float
    cost_per_1k_output: float
    
    # Features
    supports_json_mode: bool = False
    supports_function_calling: bool = False
    supports_vision: bool = False
    supports_streaming: bool = True

class ModelRegistry:
    """Registry of available models and their capabilities."""
    
    def __init__(self):
        self.models: dict[str, ModelCapabilities] = {}
        self._load_default_models()
    
    def _load_default_models(self):
        """Load default model configurations."""
        
        self.models = {
            "gpt-4o": ModelCapabilities(
                model_id="gpt-4o",
                provider="openai",
                context_window=128000,
                max_output_tokens=16384,
                reasoning=0.95,
                coding=0.95,
                creativity=0.90,
                instruction_following=0.95,
                factual_accuracy=0.90,
                math=0.90,
                multilingual=0.90,
                latency_tier="medium",
                cost_per_1k_input=0.0025,
                cost_per_1k_output=0.01,
                supports_json_mode=True,
                supports_function_calling=True,
                supports_vision=True
            ),
            "gpt-4o-mini": ModelCapabilities(
                model_id="gpt-4o-mini",
                provider="openai",
                context_window=128000,
                max_output_tokens=16384,
                reasoning=0.80,
                coding=0.85,
                creativity=0.80,
                instruction_following=0.90,
                factual_accuracy=0.85,
                math=0.80,
                multilingual=0.85,
                latency_tier="fast",
                cost_per_1k_input=0.00015,
                cost_per_1k_output=0.0006,
                supports_json_mode=True,
                supports_function_calling=True,
                supports_vision=True
            ),
            "claude-3-5-sonnet": ModelCapabilities(
                model_id="claude-3-5-sonnet-20241022",
                provider="anthropic",
                context_window=200000,
                max_output_tokens=8192,
                reasoning=0.95,
                coding=0.98,
                creativity=0.92,
                instruction_following=0.95,
                factual_accuracy=0.92,
                math=0.90,
                multilingual=0.88,
                latency_tier="medium",
                cost_per_1k_input=0.003,
                cost_per_1k_output=0.015,
                supports_json_mode=True,
                supports_function_calling=True,
                supports_vision=True
            ),
            "claude-3-haiku": ModelCapabilities(
                model_id="claude-3-haiku-20240307",
                provider="anthropic",
                context_window=200000,
                max_output_tokens=4096,
                reasoning=0.75,
                coding=0.80,
                creativity=0.75,
                instruction_following=0.85,
                factual_accuracy=0.80,
                math=0.70,
                multilingual=0.80,
                latency_tier="fast",
                cost_per_1k_input=0.00025,
                cost_per_1k_output=0.00125,
                supports_json_mode=True,
                supports_function_calling=True,
                supports_vision=True
            ),
            "gemini-1.5-pro": ModelCapabilities(
                model_id="gemini-1.5-pro",
                provider="google",
                context_window=2000000,
                max_output_tokens=8192,
                reasoning=0.92,
                coding=0.90,
                creativity=0.88,
                instruction_following=0.90,
                factual_accuracy=0.88,
                math=0.88,
                multilingual=0.92,
                latency_tier="medium",
                cost_per_1k_input=0.00125,
                cost_per_1k_output=0.005,
                supports_json_mode=True,
                supports_function_calling=True,
                supports_vision=True
            ),
            "gemini-1.5-flash": ModelCapabilities(
                model_id="gemini-1.5-flash",
                provider="google",
                context_window=1000000,
                max_output_tokens=8192,
                reasoning=0.80,
                coding=0.82,
                creativity=0.78,
                instruction_following=0.85,
                factual_accuracy=0.82,
                math=0.78,
                multilingual=0.88,
                latency_tier="fast",
                cost_per_1k_input=0.000075,
                cost_per_1k_output=0.0003,
                supports_json_mode=True,
                supports_function_calling=True,
                supports_vision=True
            )
        }
    
    def get_model(self, model_id: str) -> Optional[ModelCapabilities]:
        """Get model by ID."""
        return self.models.get(model_id)
    
    def filter_by_capability(
        self,
        min_reasoning: float = 0,
        min_coding: float = 0,
        min_context: int = 0,
        max_cost_per_1k: float = float('inf'),
        requires_vision: bool = False,
        requires_function_calling: bool = False
    ) -> list[ModelCapabilities]:
        """Filter models by capability requirements."""
        
        results = []
        for model in self.models.values():
            if model.reasoning < min_reasoning:
                continue
            if model.coding < min_coding:
                continue
            if model.context_window < min_context:
                continue
            if model.cost_per_1k_input > max_cost_per_1k:
                continue
            if requires_vision and not model.supports_vision:
                continue
            if requires_function_calling and not model.supports_function_calling:
                continue
            results.append(model)
        
        return results
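
A brief usage sketch of the registry (the thresholds below are illustrative, not recommendations):

registry = ModelRegistry()

# Find function-calling models strong at coding, with long context and modest input cost.
candidates = registry.filter_by_capability(
    min_coding=0.85,
    min_context=100000,
    max_cost_per_1k=0.005,
    requires_function_calling=True
)

for model in candidates:
    print(model.model_id, model.provider, model.cost_per_1k_input)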

Task-Based Selection

from dataclasses import dataclass

@dataclass
class TaskRequirements:
    """Requirements for a specific task."""
    
    task_type: TaskType
    complexity: str  # low, medium, high
    required_context: int
    latency_sensitive: bool
    cost_sensitive: bool
    requires_json: bool = False
    requires_function_calling: bool = False
    requires_vision: bool = False

class TaskAnalyzer:
    """Analyze tasks to determine requirements."""
    
    def __init__(self):
        self.complexity_indicators = {
            "high": [
                "analyze", "compare", "evaluate", "synthesize",
                "design", "architect", "optimize", "debug complex"
            ],
            "medium": [
                "explain", "summarize", "convert", "refactor",
                "implement", "write", "create"
            ],
            "low": [
                "extract", "classify", "translate", "format",
                "list", "identify", "check"
            ]
        }
    
    def analyze(self, prompt: str, context_length: int = 0) -> TaskRequirements:
        """Analyze a prompt to determine task requirements."""
        
        prompt_lower = prompt.lower()
        
        # Determine task type
        task_type = self._detect_task_type(prompt_lower)
        
        # Determine complexity
        complexity = self._detect_complexity(prompt_lower)
        
        # Estimate required context
        required_context = max(context_length + len(prompt) // 4, 4000)  # rough estimate: ~4 characters per token
        
        # Detect special requirements
        requires_json = any(kw in prompt_lower for kw in ["json", "structured", "schema"])
        requires_function = any(kw in prompt_lower for kw in ["call", "function", "tool", "api"])
        requires_vision = any(kw in prompt_lower for kw in ["image", "picture", "screenshot", "diagram"])
        
        return TaskRequirements(
            task_type=task_type,
            complexity=complexity,
            required_context=required_context,
            latency_sensitive=False,  # Can be overridden
            cost_sensitive=False,  # Can be overridden
            requires_json=requires_json,
            requires_function_calling=requires_function,
            requires_vision=requires_vision
        )
    
    def _detect_task_type(self, prompt: str) -> TaskType:
        """Detect the type of task from prompt."""
        
        if any(kw in prompt for kw in ["code", "function", "class", "implement", "debug", "python", "javascript"]):
            return TaskType.CODE_GENERATION
        if any(kw in prompt for kw in ["summarize", "summary", "tldr", "brief"]):
            return TaskType.SUMMARIZATION
        if any(kw in prompt for kw in ["extract", "find", "identify", "list all"]):
            return TaskType.EXTRACTION
        if any(kw in prompt for kw in ["classify", "categorize", "label", "sentiment"]):
            return TaskType.CLASSIFICATION
        if any(kw in prompt for kw in ["translate", "convert to"]):
            return TaskType.TRANSLATION
        if any(kw in prompt for kw in ["calculate", "solve", "math", "equation"]):
            return TaskType.MATH
        if any(kw in prompt for kw in ["write", "story", "creative", "poem", "essay"]):
            return TaskType.CREATIVE_WRITING
        if any(kw in prompt for kw in ["analyze", "compare", "evaluate", "reason", "think through"]):
            return TaskType.COMPLEX_REASONING
        
        return TaskType.SIMPLE_QA
    
    def _detect_complexity(self, prompt: str) -> str:
        """Detect task complexity."""
        
        for level, indicators in self.complexity_indicators.items():
            if any(ind in prompt for ind in indicators):
                return level
        
        # Default based on length
        if len(prompt) > 2000:
            return "high"
        elif len(prompt) > 500:
            return "medium"
        return "low"

class ModelSelector:
    """Select optimal model for a task."""
    
    def __init__(self, registry: ModelRegistry):
        self.registry = registry
        
        # Task type to capability mapping
        self.task_capability_map = {
            TaskType.CODE_GENERATION: "coding",
            TaskType.COMPLEX_REASONING: "reasoning",
            TaskType.CREATIVE_WRITING: "creativity",
            TaskType.MATH: "math",
            TaskType.SUMMARIZATION: "instruction_following",
            TaskType.EXTRACTION: "instruction_following",
            TaskType.CLASSIFICATION: "instruction_following",
            TaskType.TRANSLATION: "multilingual",
            TaskType.SIMPLE_QA: "factual_accuracy",
            TaskType.MULTI_TURN: "reasoning"
        }
        
        # Complexity to minimum capability score
        self.complexity_thresholds = {
            "low": 0.70,
            "medium": 0.80,
            "high": 0.90
        }
    
    def select(self, requirements: TaskRequirements) -> ModelCapabilities:
        """Select the best model for given requirements."""
        
        candidates = list(self.registry.models.values())
        
        # Filter by hard requirements
        candidates = [m for m in candidates if m.context_window >= requirements.required_context]
        
        if requirements.requires_json:
            candidates = [m for m in candidates if m.supports_json_mode]
        
        if requirements.requires_function_calling:
            candidates = [m for m in candidates if m.supports_function_calling]
        
        if requirements.requires_vision:
            candidates = [m for m in candidates if m.supports_vision]
        
        if not candidates:
            raise ValueError("No models meet the requirements")
        
        # Score candidates
        scored = []
        for model in candidates:
            score = self._score_model(model, requirements)
            scored.append((model, score))
        
        # Sort by score (higher is better)
        scored.sort(key=lambda x: x[1], reverse=True)
        
        return scored[0][0]
    
    def _score_model(self, model: ModelCapabilities, requirements: TaskRequirements) -> float:
        """Score a model for given requirements."""
        
        score = 0.0
        
        # Primary capability score
        primary_capability = self.task_capability_map.get(requirements.task_type, "reasoning")
        capability_score = getattr(model, primary_capability, 0.5)
        
        min_threshold = self.complexity_thresholds[requirements.complexity]
        if capability_score >= min_threshold:
            score += capability_score * 40
        else:
            score += capability_score * 20  # Penalty for not meeting threshold
        
        # Latency score
        if requirements.latency_sensitive:
            latency_scores = {"fast": 30, "medium": 15, "slow": 0}
            score += latency_scores.get(model.latency_tier, 0)
        else:
            score += 15  # Neutral
        
        # Cost score
        if requirements.cost_sensitive:
            # Lower cost = higher score
            max_cost = 0.05  # Normalize against
            cost_ratio = 1 - min(model.cost_per_1k_input / max_cost, 1)
            score += cost_ratio * 30
        else:
            score += 15  # Neutral
        
        return score
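
Putting the analyzer and selector together might look like the following sketch; the prompt and the cost-sensitivity override are illustrative:

registry = ModelRegistry()
analyzer = TaskAnalyzer()
selector = ModelSelector(registry)

# Analyze a prompt, override the cost flag, and pick a model.
requirements = analyzer.analyze("Refactor this Python function to be more readable.")
requirements.cost_sensitive = True

model = selector.select(requirements)
print(f"Selected {model.model_id} for {requirements.task_type.value} ({requirements.complexity})")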

Benchmarking Framework

from dataclasses import dataclass
from typing import Any, Callable, Optional
import time
import asyncio

@dataclass
class BenchmarkResult:
    """Result of a single benchmark run."""
    
    model_id: str
    task_id: str
    
    # Performance metrics
    latency_ms: float
    prompt_tokens: int
    completion_tokens: int
    cost_usd: float
    
    # Quality metrics
    accuracy: Optional[float] = None
    relevance: Optional[float] = None
    
    # Output
    output: str = ""
    error: Optional[str] = None

@dataclass
class BenchmarkTask:
    """A task for benchmarking."""
    
    task_id: str
    prompt: str
    expected_output: Optional[str] = None
    evaluator: Optional[Callable[[str, str], float]] = None
    task_type: TaskType = TaskType.SIMPLE_QA

class ModelBenchmarker:
    """Benchmark models on specific tasks."""
    
    def __init__(self, clients: dict[str, Any], registry: ModelRegistry):
        self.clients = clients  # provider -> client
        self.registry = registry
    
    async def benchmark_model(
        self,
        model_id: str,
        task: BenchmarkTask,
        num_runs: int = 3
    ) -> list[BenchmarkResult]:
        """Benchmark a single model on a task."""
        
        model = self.registry.get_model(model_id)
        if not model:
            raise ValueError(f"Unknown model: {model_id}")
        
        client = self.clients.get(model.provider)
        if not client:
            raise ValueError(f"No client for provider: {model.provider}")
        
        results = []
        
        for _ in range(num_runs):
            result = await self._run_single(client, model, task)
            results.append(result)
        
        return results
    
    async def _run_single(
        self,
        client: Any,
        model: ModelCapabilities,
        task: BenchmarkTask
    ) -> BenchmarkResult:
        """Run a single benchmark."""
        
        start_time = time.time()
        error = None
        output = ""
        prompt_tokens = 0
        completion_tokens = 0
        
        try:
            if model.provider == "openai":
                response = await client.chat.completions.create(
                    model=model.model_id,
                    messages=[{"role": "user", "content": task.prompt}]
                )
                output = response.choices[0].message.content
                prompt_tokens = response.usage.prompt_tokens
                completion_tokens = response.usage.completion_tokens
            
            elif model.provider == "anthropic":
                response = await client.messages.create(
                    model=model.model_id,
                    max_tokens=4096,
                    messages=[{"role": "user", "content": task.prompt}]
                )
                output = response.content[0].text
                prompt_tokens = response.usage.input_tokens
                completion_tokens = response.usage.output_tokens
            
            elif model.provider == "google":
                response = await client.generate_content_async(task.prompt)
                output = response.text
                prompt_tokens = response.usage_metadata.prompt_token_count
                completion_tokens = response.usage_metadata.candidates_token_count
        
        except Exception as e:
            error = str(e)
        
        latency_ms = (time.time() - start_time) * 1000
        
        # Calculate cost
        cost = (
            (prompt_tokens / 1000) * model.cost_per_1k_input +
            (completion_tokens / 1000) * model.cost_per_1k_output
        )
        
        # Evaluate quality
        accuracy = None
        if task.evaluator and output and task.expected_output:
            accuracy = task.evaluator(output, task.expected_output)
        
        return BenchmarkResult(
            model_id=model.model_id,
            task_id=task.task_id,
            latency_ms=latency_ms,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            cost_usd=cost,
            accuracy=accuracy,
            output=output,
            error=error
        )
    
    async def compare_models(
        self,
        model_ids: list[str],
        tasks: list[BenchmarkTask],
        num_runs: int = 3
    ) -> dict[str, list[BenchmarkResult]]:
        """Compare multiple models on multiple tasks."""
        
        results = {}
        
        for model_id in model_ids:
            model_results = []
            for task in tasks:
                task_results = await self.benchmark_model(model_id, task, num_runs)
                model_results.extend(task_results)
            results[model_id] = model_results
        
        return results
    
    def summarize_results(self, results: dict[str, list[BenchmarkResult]]) -> dict:
        """Summarize benchmark results."""
        
        summary = {}
        
        for model_id, model_results in results.items():
            valid_results = [r for r in model_results if r.error is None]
            
            if not valid_results:
                summary[model_id] = {"error": "All runs failed"}
                continue
            
            latencies = [r.latency_ms for r in valid_results]
            costs = [r.cost_usd for r in valid_results]
            accuracies = [r.accuracy for r in valid_results if r.accuracy is not None]
            
            summary[model_id] = {
                "runs": len(valid_results),
                "errors": len(model_results) - len(valid_results),
                "latency_avg_ms": sum(latencies) / len(latencies),
                "latency_p95_ms": sorted(latencies)[int(len(latencies) * 0.95)] if len(latencies) > 1 else latencies[0],
                "cost_avg_usd": sum(costs) / len(costs),
                "cost_total_usd": sum(costs),
                "accuracy_avg": sum(accuracies) / len(accuracies) if accuracies else None
            }
        
        return summary
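
A minimal sketch of a comparison run, assuming async provider clients constructed elsewhere (the openai_client and anthropic_client names below are placeholders) and a simple exact-match evaluator:

def exact_match(output: str, expected: str) -> float:
    """Toy evaluator: 1.0 if the trimmed output equals the expected answer."""
    return 1.0 if output.strip() == expected.strip() else 0.0

async def run_comparison():
    benchmarker = ModelBenchmarker(
        clients={"openai": openai_client, "anthropic": anthropic_client},  # placeholder clients
        registry=ModelRegistry()
    )
    tasks = [
        BenchmarkTask(
            task_id="capital-fr",
            prompt="What is the capital of France? Answer with one word.",
            expected_output="Paris",
            evaluator=exact_match,
            task_type=TaskType.SIMPLE_QA
        )
    ]
    results = await benchmarker.compare_models(["gpt-4o-mini", "claude-3-haiku"], tasks)
    print(benchmarker.summarize_results(results))

# asyncio.run(run_comparison())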

Dynamic Model Routing

from dataclasses import dataclass
from typing import Callable, Optional
import hashlib
import random
import time

@dataclass
class RoutingDecision:
    """A model routing decision."""
    
    model_id: str
    reason: str
    confidence: float
    fallback_model: Optional[str] = None

class ModelRouter:
    """Route requests to optimal models."""
    
    def __init__(
        self,
        registry: ModelRegistry,
        selector: ModelSelector,
        analyzer: TaskAnalyzer
    ):
        self.registry = registry
        self.selector = selector
        self.analyzer = analyzer
        
        # Routing rules
        self.rules: list[tuple[Callable, str]] = []
        
        # Performance history for adaptive routing
        self.performance_history: dict[str, list[float]] = {}
    
    def add_rule(self, condition: Callable[[str, dict], bool], model_id: str):
        """Add a routing rule."""
        self.rules.append((condition, model_id))
    
    def route(
        self,
        prompt: str,
        context: Optional[dict] = None,
        latency_sensitive: bool = False,
        cost_sensitive: bool = False
    ) -> RoutingDecision:
        """Route a request to the optimal model."""
        
        context = context or {}
        
        # Check explicit rules first
        for condition, model_id in self.rules:
            if condition(prompt, context):
                return RoutingDecision(
                    model_id=model_id,
                    reason="explicit_rule",
                    confidence=1.0
                )
        
        # Analyze task
        requirements = self.analyzer.analyze(prompt)
        requirements.latency_sensitive = latency_sensitive
        requirements.cost_sensitive = cost_sensitive
        
        # Select model
        model = self.selector.select(requirements)
        
        # Determine fallback
        fallback = self._get_fallback(model.model_id, requirements)
        
        return RoutingDecision(
            model_id=model.model_id,
            reason=f"task_type={requirements.task_type.value}, complexity={requirements.complexity}",
            confidence=0.8,
            fallback_model=fallback
        )
    
    def _get_fallback(self, primary_model: str, requirements: TaskRequirements) -> Optional[str]:
        """Get fallback model."""
        
        # Get all suitable models
        candidates = list(self.registry.models.values())
        candidates = [m for m in candidates if m.model_id != primary_model]
        candidates = [m for m in candidates if m.context_window >= requirements.required_context]
        
        if not candidates:
            return None
        
        # Prefer faster/cheaper models as fallback
        candidates.sort(key=lambda m: m.cost_per_1k_input)
        
        return candidates[0].model_id
    
    def record_performance(self, model_id: str, latency_ms: float, success: bool):
        """Record model performance for adaptive routing."""
        
        if model_id not in self.performance_history:
            self.performance_history[model_id] = []
        
        # Store success rate (1.0 for success, 0.0 for failure)
        score = 1.0 if success else 0.0
        self.performance_history[model_id].append(score)
        
        # Keep last 100 records
        if len(self.performance_history[model_id]) > 100:
            self.performance_history[model_id].pop(0)
    
    def get_model_health(self, model_id: str) -> float:
        """Get model health score (0-1)."""
        
        history = self.performance_history.get(model_id, [])
        if not history:
            return 1.0  # Assume healthy if no data
        
        return sum(history) / len(history)

class CostOptimizedRouter(ModelRouter):
    """Router that optimizes for cost."""
    
    def __init__(self, *args, budget_per_hour: float = 10.0, **kwargs):
        super().__init__(*args, **kwargs)
        self.budget_per_hour = budget_per_hour
        self.hourly_spend: float = 0.0
        self.last_reset: float = time.time()
    
    def route(self, prompt: str, **kwargs) -> RoutingDecision:
        """Route with cost awareness."""
        
        # Reset hourly spend if needed
        current_time = time.time()
        if current_time - self.last_reset > 3600:
            self.hourly_spend = 0.0
            self.last_reset = current_time
        
        # If over budget, force cheap model
        if self.hourly_spend > self.budget_per_hour * 0.9:
            cheap_models = sorted(
                self.registry.models.values(),
                key=lambda m: m.cost_per_1k_input
            )
            return RoutingDecision(
                model_id=cheap_models[0].model_id,
                reason="budget_constraint",
                confidence=1.0
            )
        
        # Otherwise, use normal routing with cost sensitivity
        kwargs["cost_sensitive"] = True
        return super().route(prompt, **kwargs)
    
    def record_cost(self, cost_usd: float):
        """Record cost for budget tracking."""
        self.hourly_spend += cost_usd

class ABTestRouter(ModelRouter):
    """Router that supports A/B testing."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.experiments: dict[str, dict] = {}
    
    def add_experiment(
        self,
        name: str,
        control_model: str,
        treatment_model: str,
        traffic_percentage: float = 0.1
    ):
        """Add an A/B test experiment."""
        
        self.experiments[name] = {
            "control": control_model,
            "treatment": treatment_model,
            "traffic": traffic_percentage,
            "results": {"control": [], "treatment": []}
        }
    
    def route(self, prompt: str, user_id: Optional[str] = None, **kwargs) -> RoutingDecision:
        """Route with A/B testing."""
        
        # Check active experiments
        for exp_name, exp in self.experiments.items():
            # Deterministic assignment based on user_id; built-in hash() is salted
            # per process, so use a stable hash for consistent bucketing
            if user_id:
                bucket = int(hashlib.md5(f"{user_id}:{exp_name}".encode()).hexdigest(), 16) % 100
                in_treatment = bucket < exp["traffic"] * 100
            else:
                in_treatment = random.random() < exp["traffic"]
            
            if in_treatment:
                return RoutingDecision(
                    model_id=exp["treatment"],
                    reason=f"experiment:{exp_name}:treatment",
                    confidence=1.0
                )
        
        # Default routing
        return super().route(prompt, **kwargs)
    
    def record_experiment_result(self, experiment: str, variant: str, metric: float):
        """Record experiment result."""
        
        if experiment in self.experiments:
            self.experiments[experiment]["results"][variant].append(metric)
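
As a sketch, wiring the routers together could look like this; the rule condition, experiment name, and traffic split are made up for illustration:

registry = ModelRegistry()
analyzer = TaskAnalyzer()
selector = ModelSelector(registry)
router = ABTestRouter(registry, selector, analyzer)

# Explicit rule: anything mentioning a screenshot goes to a vision-capable model.
router.add_rule(lambda prompt, ctx: "screenshot" in prompt.lower(), "gpt-4o")

# A/B test: send 10% of traffic to a cheaper model.
router.add_experiment(
    name="haiku-for-summaries",
    control_model="claude-3-5-sonnet",
    treatment_model="claude-3-haiku",
    traffic_percentage=0.1
)

decision = router.route("Summarize the attached report.", user_id="user-123")
print(decision.model_id, decision.reason)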

Production Selection Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
registry = ModelRegistry()
analyzer = TaskAnalyzer()
selector = ModelSelector(registry)
router = ModelRouter(registry, selector, analyzer)

class SelectionRequest(BaseModel):
    prompt: str
    context_length: int = 0
    latency_sensitive: bool = False
    cost_sensitive: bool = False
    requires_json: bool = False
    requires_function_calling: bool = False
    requires_vision: bool = False

class RoutingRequest(BaseModel):
    prompt: str
    user_id: Optional[str] = None
    latency_sensitive: bool = False
    cost_sensitive: bool = False

class BenchmarkRequest(BaseModel):
    model_ids: list[str]
    prompt: str
    expected_output: Optional[str] = None
    num_runs: int = 3

@app.post("/v1/select")
async def select_model(request: SelectionRequest):
    """Select optimal model for a task."""
    
    requirements = TaskRequirements(
        task_type=analyzer._detect_task_type(request.prompt.lower()),
        complexity=analyzer._detect_complexity(request.prompt.lower()),
        required_context=max(request.context_length + len(request.prompt) // 4, 4000),  # ~4 chars per token
        latency_sensitive=request.latency_sensitive,
        cost_sensitive=request.cost_sensitive,
        requires_json=request.requires_json,
        requires_function_calling=request.requires_function_calling,
        requires_vision=request.requires_vision
    )
    
    try:
        model = selector.select(requirements)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    
    return {
        "model_id": model.model_id,
        "provider": model.provider,
        "task_type": requirements.task_type.value,
        "complexity": requirements.complexity,
        "capabilities": {
            "reasoning": model.reasoning,
            "coding": model.coding,
            "creativity": model.creativity
        },
        "cost_per_1k_input": model.cost_per_1k_input,
        "cost_per_1k_output": model.cost_per_1k_output,
        "latency_tier": model.latency_tier
    }

@app.post("/v1/route")
async def route_request(request: RoutingRequest):
    """Route request to optimal model."""
    
    decision = router.route(
        request.prompt,
        latency_sensitive=request.latency_sensitive,
        cost_sensitive=request.cost_sensitive
    )
    
    model = registry.get_model(decision.model_id)
    
    return {
        "model_id": decision.model_id,
        "reason": decision.reason,
        "confidence": decision.confidence,
        "fallback_model": decision.fallback_model,
        "model_info": {
            "provider": model.provider,
            "context_window": model.context_window,
            "cost_per_1k_input": model.cost_per_1k_input
        } if model else None
    }

@app.get("/v1/models")
async def list_models():
    """List all available models."""
    
    return {
        "models": [
            {
                "model_id": m.model_id,
                "provider": m.provider,
                "context_window": m.context_window,
                "latency_tier": m.latency_tier,
                "cost_per_1k_input": m.cost_per_1k_input,
                "cost_per_1k_output": m.cost_per_1k_output,
                "capabilities": {
                    "reasoning": m.reasoning,
                    "coding": m.coding,
                    "creativity": m.creativity,
                    "math": m.math
                },
                "features": {
                    "json_mode": m.supports_json_mode,
                    "function_calling": m.supports_function_calling,
                    "vision": m.supports_vision
                }
            }
            for m in registry.models.values()
        ]
    }

@app.get("/v1/models/{model_id}")
async def get_model(model_id: str):
    """Get model details."""
    
    model = registry.get_model(model_id)
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")
    
    return {
        "model_id": model.model_id,
        "provider": model.provider,
        "context_window": model.context_window,
        "max_output_tokens": model.max_output_tokens,
        "latency_tier": model.latency_tier,
        "cost_per_1k_input": model.cost_per_1k_input,
        "cost_per_1k_output": model.cost_per_1k_output,
        "capabilities": {
            "reasoning": model.reasoning,
            "coding": model.coding,
            "creativity": model.creativity,
            "instruction_following": model.instruction_following,
            "factual_accuracy": model.factual_accuracy,
            "math": model.math,
            "multilingual": model.multilingual
        },
        "features": {
            "json_mode": model.supports_json_mode,
            "function_calling": model.supports_function_calling,
            "vision": model.supports_vision,
            "streaming": model.supports_streaming
        }
    }

@app.get("/v1/health")
async def health():
    """Get router health."""
    
    return {
        "status": "healthy",
        "models_available": len(registry.models),
        "model_health": {
            model_id: router.get_model_health(model_id)
            for model_id in registry.models.keys()
        }
    }
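
Assuming the service runs locally on port 8000, client calls might look like the following synchronous sketch (the port and payloads are illustrative, and the requests package is assumed to be installed):

import requests

# Ask the service to pick a model for a structured-extraction prompt.
selection = requests.post(
    "http://localhost:8000/v1/select",
    json={
        "prompt": "Extract all email addresses from this text as JSON.",
        "requires_json": True,
        "cost_sensitive": True
    }
).json()
print(selection["model_id"], selection["latency_tier"])

# Route a reasoning-heavy prompt and inspect the fallback choice.
routing = requests.post(
    "http://localhost:8000/v1/route",
    json={"prompt": "Analyze and compare these two system designs."}
).json()
print(routing["model_id"], routing["reason"], routing["fallback_model"])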

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

References

Conclusion

Model selection is not a one-time decision but an ongoing optimization process. Build a registry of models with their capabilities, costs, and performance characteristics. Analyze tasks to understand their requirements—complexity, latency sensitivity, cost constraints, and special features needed. Use benchmarking to validate model performance on your specific use cases rather than relying solely on public benchmarks. Implement dynamic routing that selects models based on task requirements and adapts based on performance history. Consider cost-optimized routing with budget constraints and A/B testing to validate model changes. The key insight is that different tasks have different optimal models—a simple classification task doesn't need GPT-4, while complex reasoning tasks benefit from more capable models. Start with task analysis and basic selection, add benchmarking to validate choices, then implement dynamic routing as your system matures.

