Introduction: Production LLM applications often benefit from using multiple models—routing simple queries to cheaper models, using specialized models for specific tasks, and falling back to alternatives when primary models fail. Multi-model orchestration enables cost optimization, improved reliability, and access to each model’s unique strengths. This guide covers practical orchestration patterns: model routing based on query complexity, parallel model calls for consensus, specialized model pipelines, and unified interfaces that abstract model differences.

Unified Model Interface
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, AsyncGenerator
from enum import Enum


class ModelProvider(str, Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    LOCAL = "local"


@dataclass
class ModelConfig:
    """Configuration for a model."""
    provider: ModelProvider
    model_id: str
    max_tokens: int = 4096
    temperature: float = 0.7
    cost_per_1k_input: float = 0.0
    cost_per_1k_output: float = 0.0


@dataclass
class CompletionResponse:
    """Unified response format."""
    content: str
    model: str
    provider: ModelProvider
    input_tokens: int
    output_tokens: int
    latency_ms: float
    cost: float
class BaseModelClient(ABC):
    """Abstract base for model clients."""

    def __init__(self, config: ModelConfig):
        self.config = config

    @abstractmethod
    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> CompletionResponse:
        """Generate completion."""
        pass

    @abstractmethod
    async def stream(
        self,
        messages: list[dict],
        **kwargs
    ) -> AsyncGenerator[str, None]:
        """Stream completion."""
        pass

    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for token usage."""
        return (
            (input_tokens / 1000) * self.config.cost_per_1k_input +
            (output_tokens / 1000) * self.config.cost_per_1k_output
        )
class OpenAIClient(BaseModelClient):
    """OpenAI model client."""

    def __init__(self, config: ModelConfig):
        super().__init__(config)
        from openai import AsyncOpenAI
        self.client = AsyncOpenAI()

    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> CompletionResponse:
        import time
        start = time.time()
        response = await self.client.chat.completions.create(
            model=self.config.model_id,
            messages=messages,
            max_tokens=kwargs.get("max_tokens", self.config.max_tokens),
            temperature=kwargs.get("temperature", self.config.temperature)
        )
        latency = (time.time() - start) * 1000
        return CompletionResponse(
            content=response.choices[0].message.content,
            model=self.config.model_id,
            provider=ModelProvider.OPENAI,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            latency_ms=latency,
            cost=self.calculate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens
            )
        )

    async def stream(
        self,
        messages: list[dict],
        **kwargs
    ) -> AsyncGenerator[str, None]:
        response = await self.client.chat.completions.create(
            model=self.config.model_id,
            messages=messages,
            stream=True
        )
        async for chunk in response:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
class AnthropicClient(BaseModelClient):
    """Anthropic Claude client."""

    def __init__(self, config: ModelConfig):
        super().__init__(config)
        from anthropic import AsyncAnthropic
        self.client = AsyncAnthropic()

    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> CompletionResponse:
        import time
        start = time.time()
        # Convert messages format: Anthropic takes the system prompt separately
        system = None
        converted = []
        for msg in messages:
            if msg["role"] == "system":
                system = msg["content"]
            else:
                converted.append(msg)
        request: dict = {
            "model": self.config.model_id,
            "messages": converted,
            "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
        }
        if system is not None:  # only send system when one was supplied
            request["system"] = system
        response = await self.client.messages.create(**request)
        latency = (time.time() - start) * 1000
        return CompletionResponse(
            content=response.content[0].text,
            model=self.config.model_id,
            provider=ModelProvider.ANTHROPIC,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency,
            cost=self.calculate_cost(
                response.usage.input_tokens,
                response.usage.output_tokens
            )
        )

    async def stream(
        self,
        messages: list[dict],
        **kwargs
    ) -> AsyncGenerator[str, None]:
        system = None
        converted = []
        for msg in messages:
            if msg["role"] == "system":
                system = msg["content"]
            else:
                converted.append(msg)
        request: dict = {
            "model": self.config.model_id,
            "messages": converted,
            "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
        }
        if system is not None:
            request["system"] = system
        async with self.client.messages.stream(**request) as stream:
            async for text in stream.text_stream:
                yield text
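Here is a minimal usage sketch of the unified interface. It assumes the openai and anthropic SDKs are installed and that OPENAI_API_KEY and ANTHROPIC_API_KEY are set in the environment; the model IDs and per-token prices mirror the configuration used later in this guide.

import asyncio

async def demo():
    gpt_mini = OpenAIClient(ModelConfig(
        provider=ModelProvider.OPENAI,
        model_id="gpt-4o-mini",
        cost_per_1k_input=0.00015,
        cost_per_1k_output=0.0006,
    ))
    sonnet = AnthropicClient(ModelConfig(
        provider=ModelProvider.ANTHROPIC,
        model_id="claude-3-5-sonnet-20241022",
        cost_per_1k_input=0.003,
        cost_per_1k_output=0.015,
    ))
    messages = [{"role": "user", "content": "Explain retrieval-augmented generation in one sentence."}]
    # Same call shape and same response type regardless of provider
    for client in (gpt_mini, sonnet):
        resp = await client.complete(messages)
        print(resp.provider.value, f"${resp.cost:.5f}", f"{resp.latency_ms:.0f}ms")

asyncio.run(demo())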
Model Router
from dataclasses import dataclass
from typing import Callable
import re


@dataclass
class RoutingRule:
    """Rule for routing to a model."""
    name: str
    condition: Callable[[str, list[dict]], bool]
    model_id: str
    priority: int = 0


class ModelRouter:
    """Route requests to appropriate models."""

    def __init__(self):
        self.rules: list[RoutingRule] = []
        self.default_model: str = "gpt-4o-mini"
        self.clients: dict[str, BaseModelClient] = {}

    def register_client(self, model_id: str, client: BaseModelClient):
        """Register a model client."""
        self.clients[model_id] = client

    def add_rule(self, rule: RoutingRule):
        """Add routing rule."""
        self.rules.append(rule)
        self.rules.sort(key=lambda r: r.priority, reverse=True)

    def route(self, prompt: str, messages: list[dict] = None) -> str:
        """Determine which model to use."""
        messages = messages or [{"role": "user", "content": prompt}]
        for rule in self.rules:
            if rule.condition(prompt, messages):
                return rule.model_id
        return self.default_model

    async def complete(
        self,
        prompt: str,
        messages: list[dict] = None,
        **kwargs
    ) -> CompletionResponse:
        """Route and complete."""
        messages = messages or [{"role": "user", "content": prompt}]
        model_id = self.route(prompt, messages)
        client = self.clients.get(model_id)
        if not client:
            raise ValueError(f"No client for model: {model_id}")
        return await client.complete(messages, **kwargs)
# Complexity-based routing
class ComplexityRouter(ModelRouter):
    """Route based on query complexity."""

    def __init__(self):
        super().__init__()
        self._setup_rules()

    def _setup_rules(self):
        """Setup complexity-based rules."""
        # Simple queries -> cheap model
        self.add_rule(RoutingRule(
            name="simple_query",
            condition=self._is_simple,
            model_id="gpt-4o-mini",
            priority=1
        ))
        # Code generation -> capable model
        self.add_rule(RoutingRule(
            name="code_generation",
            condition=self._is_code,
            model_id="gpt-4o",
            priority=2
        ))
        # Complex reasoning -> best model
        self.add_rule(RoutingRule(
            name="complex_reasoning",
            condition=self._is_complex,
            model_id="claude-3-5-sonnet-20241022",
            priority=3
        ))

    def _is_simple(self, prompt: str, messages: list[dict]) -> bool:
        """Check if query is simple."""
        simple_patterns = [
            r"^(what|who|when|where) is",
            r"^define ",
            r"^translate ",
            r"^summarize ",
        ]
        prompt_lower = prompt.lower()
        for pattern in simple_patterns:
            if re.match(pattern, prompt_lower):
                return True
        # Short prompts are often simple
        return len(prompt.split()) < 20

    def _is_code(self, prompt: str, messages: list[dict]) -> bool:
        """Check if query involves code."""
        code_keywords = [
            "code", "function", "class", "implement",
            "python", "javascript", "typescript", "sql",
            "debug", "fix", "refactor", "optimize"
        ]
        prompt_lower = prompt.lower()
        return any(kw in prompt_lower for kw in code_keywords)

    def _is_complex(self, prompt: str, messages: list[dict]) -> bool:
        """Check if query requires complex reasoning."""
        complex_indicators = [
            "analyze", "compare", "evaluate", "critique",
            "step by step", "reasoning", "explain why",
            "pros and cons", "trade-offs"
        ]
        prompt_lower = prompt.lower()
        # Long prompts often need more reasoning
        if len(prompt.split()) > 100:
            return True
        return any(ind in prompt_lower for ind in complex_indicators)
# Cost-optimized routing
class CostOptimizedRouter(ModelRouter):
    """Route to minimize cost while meeting quality."""

    def __init__(self, max_cost_per_request: float = 0.01):
        super().__init__()
        self.max_cost = max_cost_per_request
        self.model_costs = {
            "gpt-4o-mini": 0.00015,
            "gpt-4o": 0.0025,
            "claude-3-5-sonnet-20241022": 0.003,
            "claude-3-opus-20240229": 0.015
        }

    def route(self, prompt: str, messages: list[dict] = None) -> str:
        """Route based on estimated cost."""
        # Rough token estimate from word count
        estimated_tokens = len(prompt.split()) * 2
        # Find cheapest model that fits budget
        for model_id, cost_per_1k in sorted(
            self.model_costs.items(),
            key=lambda x: x[1]
        ):
            estimated_cost = (estimated_tokens / 1000) * cost_per_1k * 2
            if estimated_cost <= self.max_cost:
                return model_id
        return self.default_model
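A minimal wiring sketch for the complexity router follows. The model IDs and prices mirror the MODEL_CONFIGS used later in this guide; instantiating the clients requires the provider SDKs and API keys, but route() itself makes no network calls, so the routing decision is cheap to inspect.

clients = {
    "gpt-4o-mini": OpenAIClient(ModelConfig(
        ModelProvider.OPENAI, "gpt-4o-mini",
        cost_per_1k_input=0.00015, cost_per_1k_output=0.0006)),
    "gpt-4o": OpenAIClient(ModelConfig(
        ModelProvider.OPENAI, "gpt-4o",
        cost_per_1k_input=0.0025, cost_per_1k_output=0.01)),
    "claude-3-5-sonnet-20241022": AnthropicClient(ModelConfig(
        ModelProvider.ANTHROPIC, "claude-3-5-sonnet-20241022",
        cost_per_1k_input=0.003, cost_per_1k_output=0.015)),
}

router = ComplexityRouter()
for model_id, client in clients.items():
    router.register_client(model_id, client)

# route() is synchronous, so routing decisions can be checked without spending tokens
print(router.route("What is a vector database?"))                    # gpt-4o-mini
print(router.route("Implement a rate limiter in Python"))            # gpt-4o
print(router.route("Compare the trade-offs of RAG vs fine-tuning"))  # claude-3-5-sonnet-20241022

# response = await router.complete("Implement a rate limiter in Python")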
Parallel Model Execution
import asyncio
from typing import Callable


class ParallelExecutor:
    """Execute multiple models in parallel."""

    def __init__(self, clients: dict[str, BaseModelClient]):
        self.clients = clients

    async def race(
        self,
        messages: list[dict],
        models: list[str] = None,
        **kwargs
    ) -> CompletionResponse:
        """Return the first successful response, cancelling the rest."""
        models = models or list(self.clients.keys())

        async def call_model(model_id: str):
            client = self.clients[model_id]
            return await client.complete(messages, **kwargs)

        pending = {asyncio.create_task(call_model(m)) for m in models}
        try:
            # Keep waiting until one task succeeds; a model that fails
            # early should not sink the whole race.
            while pending:
                done, pending = await asyncio.wait(
                    pending,
                    return_when=asyncio.FIRST_COMPLETED
                )
                for task in done:
                    if task.exception() is None:
                        return task.result()
            raise Exception("All models failed")
        finally:
            # Cancel whatever is still in flight
            for task in pending:
                task.cancel()
    async def all(
        self,
        messages: list[dict],
        models: list[str] = None,
        **kwargs
    ) -> list[CompletionResponse]:
        """Get responses from all models."""
        models = models or list(self.clients.keys())

        async def call_model(model_id: str):
            try:
                client = self.clients[model_id]
                return await client.complete(messages, **kwargs)
            except Exception:
                return None

        tasks = [call_model(m) for m in models]
        results = await asyncio.gather(*tasks)
        return [r for r in results if r is not None]

    async def consensus(
        self,
        messages: list[dict],
        models: list[str] = None,
        selector: Callable[[list[CompletionResponse]], CompletionResponse] = None,
        **kwargs
    ) -> CompletionResponse:
        """Get consensus from multiple models."""
        responses = await self.all(messages, models, **kwargs)
        if not responses:
            raise Exception("No successful responses")
        if selector:
            return selector(responses)
        # Default: return longest response
        return max(responses, key=lambda r: len(r.content))
# Voting-based consensus
class VotingConsensus:
    """Select response based on voting."""

    def __init__(self, judge_client: BaseModelClient):
        self.judge = judge_client

    async def select_best(
        self,
        query: str,
        responses: list[CompletionResponse]
    ) -> CompletionResponse:
        """Use an LLM judge to select the best response."""
        # Format responses for judging
        formatted = "\n\n".join([
            f"Response {i+1} ({r.model}):\n{r.content}"
            for i, r in enumerate(responses)
        ])
        judge_prompt = f"""Given this query: {query}
And these responses:
{formatted}
Which response is best? Reply with just the number (1, 2, etc.)."""
        result = await self.judge.complete([
            {"role": "user", "content": judge_prompt}
        ])
        # Parse the selection; fall back to the first response if it can't be parsed
        try:
            selection = int(result.content.strip()) - 1
            return responses[selection]
        except (ValueError, IndexError):
            return responses[0]
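Because select_best is async while consensus() expects a synchronous selector, the simplest way to combine the two is to fan out with all() and judge afterwards. A sketch, reusing the clients dict from the routing example above:

executor = ParallelExecutor(clients)
voting = VotingConsensus(judge_client=clients["gpt-4o-mini"])

async def judged_consensus(prompt: str) -> CompletionResponse:
    messages = [{"role": "user", "content": prompt}]
    # Fan out to every registered model, then let the judge pick the winner
    responses = await executor.all(messages)
    if not responses:
        raise Exception("No successful responses")
    return await voting.select_best(prompt, responses)

# best = await judged_consensus("Draft a polite refund-denial email.")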
Specialized Model Pipelines
from dataclasses import dataclass
from typing import Any


@dataclass
class PipelineStep:
    """A step in the pipeline."""
    name: str
    model_id: str
    prompt_template: str
    output_key: str


class ModelPipeline:
    """Chain multiple models for complex tasks."""

    def __init__(self, clients: dict[str, BaseModelClient]):
        self.clients = clients
        self.steps: list[PipelineStep] = []

    def add_step(self, step: PipelineStep):
        """Add step to pipeline."""
        self.steps.append(step)
        return self

    async def execute(
        self,
        initial_input: dict[str, Any]
    ) -> dict[str, Any]:
        """Execute pipeline."""
        context = initial_input.copy()
        for step in self.steps:
            # Format prompt with context
            prompt = step.prompt_template.format(**context)
            # Get client
            client = self.clients.get(step.model_id)
            if not client:
                raise ValueError(f"No client for: {step.model_id}")
            # Execute step
            response = await client.complete([
                {"role": "user", "content": prompt}
            ])
            # Store result
            context[step.output_key] = response.content
        return context
# Example: Research pipeline
def create_research_pipeline(clients: dict[str, BaseModelClient]) -> ModelPipeline:
    """Create a research pipeline."""
    pipeline = ModelPipeline(clients)
    # Step 1: Generate search queries (fast model)
    pipeline.add_step(PipelineStep(
        name="generate_queries",
        model_id="gpt-4o-mini",
        prompt_template="Generate 3 search queries for researching: {topic}",
        output_key="queries"
    ))
    # Step 2: Analyze and synthesize (capable model)
    pipeline.add_step(PipelineStep(
        name="synthesize",
        model_id="gpt-4o",
        prompt_template="""Based on these search queries: {queries}
Provide a comprehensive analysis of: {topic}
Include key findings, different perspectives, and conclusions.""",
        output_key="analysis"
    ))
    # Step 3: Generate summary (fast model)
    pipeline.add_step(PipelineStep(
        name="summarize",
        model_id="gpt-4o-mini",
        prompt_template="Summarize this analysis in 3 bullet points:\n\n{analysis}",
        output_key="summary"
    ))
    return pipeline
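Running the pipeline is a single await; each step writes its output into the shared context under its output_key. A sketch, again reusing the clients dict from the routing example:

import asyncio

async def run_research(topic: str) -> dict:
    pipeline = create_research_pipeline(clients)
    # The initial context must contain every key the first template needs
    return await pipeline.execute({"topic": topic})

result = asyncio.run(run_research("vector databases for RAG"))
print(result["queries"])   # output of step 1
print(result["summary"])   # output of step 3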
# Task-specific routing
class TaskRouter:
    """Route to specialized models by task type."""

    def __init__(self, clients: dict[str, BaseModelClient]):
        self.clients = clients
        # Task to model mapping
        self.task_models = {
            "code": "gpt-4o",
            "creative": "claude-3-5-sonnet-20241022",
            "analysis": "gpt-4o",
            "translation": "gpt-4o-mini",
            "summarization": "gpt-4o-mini",
            "conversation": "gpt-4o-mini"
        }

    def detect_task(self, prompt: str) -> str:
        """Detect task type from prompt."""
        prompt_lower = prompt.lower()
        if any(kw in prompt_lower for kw in ["code", "function", "implement", "debug"]):
            return "code"
        if any(kw in prompt_lower for kw in ["write", "story", "creative", "poem"]):
            return "creative"
        if any(kw in prompt_lower for kw in ["analyze", "compare", "evaluate"]):
            return "analysis"
        if any(kw in prompt_lower for kw in ["translate", "translation"]):
            return "translation"
        if any(kw in prompt_lower for kw in ["summarize", "summary", "tldr"]):
            return "summarization"
        return "conversation"

    async def complete(
        self,
        prompt: str,
        task_type: str = None,
        **kwargs
    ) -> CompletionResponse:
        """Complete with task-appropriate model."""
        task = task_type or self.detect_task(prompt)
        model_id = self.task_models.get(task, "gpt-4o-mini")
        client = self.clients.get(model_id)
        if not client:
            raise ValueError(f"No client for: {model_id}")
        return await client.complete(
            [{"role": "user", "content": prompt}],
            **kwargs
        )
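Task detection is keyword-based and deliberately cheap. A quick sanity check that involves no API calls:

task_router = TaskRouter(clients)
print(task_router.detect_task("Debug this function for me"))          # "code"
print(task_router.detect_task("Write a short poem about autumn"))     # "creative"
print(task_router.detect_task("Translate 'good morning' to French"))  # "translation"
print(task_router.detect_task("How was your day?"))                   # "conversation"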
Production Orchestration Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize clients
MODEL_CONFIGS = {
    "gpt-4o-mini": ModelConfig(
        provider=ModelProvider.OPENAI,
        model_id="gpt-4o-mini",
        cost_per_1k_input=0.00015,
        cost_per_1k_output=0.0006
    ),
    "gpt-4o": ModelConfig(
        provider=ModelProvider.OPENAI,
        model_id="gpt-4o",
        cost_per_1k_input=0.0025,
        cost_per_1k_output=0.01
    ),
    "claude-3-5-sonnet-20241022": ModelConfig(
        provider=ModelProvider.ANTHROPIC,
        model_id="claude-3-5-sonnet-20241022",
        cost_per_1k_input=0.003,
        cost_per_1k_output=0.015
    )
}
clients = {}
for model_id, config in MODEL_CONFIGS.items():
    if config.provider == ModelProvider.OPENAI:
        clients[model_id] = OpenAIClient(config)
    elif config.provider == ModelProvider.ANTHROPIC:
        clients[model_id] = AnthropicClient(config)

# Initialize routers
complexity_router = ComplexityRouter()
for model_id, client in clients.items():
    complexity_router.register_client(model_id, client)

parallel_executor = ParallelExecutor(clients)
task_router = TaskRouter(clients)
class CompletionRequest(BaseModel):
    prompt: str
    routing: str = "auto"  # auto, specific, parallel
    model: Optional[str] = None
    task_type: Optional[str] = None


@app.post("/v1/complete")
async def complete(request: CompletionRequest):
    """Multi-model completion endpoint."""
    try:
        if request.routing == "specific" and request.model:
            # Use specific model
            client = clients.get(request.model)
            if not client:
                raise HTTPException(400, f"Unknown model: {request.model}")
            response = await client.complete([
                {"role": "user", "content": request.prompt}
            ])
        elif request.routing == "parallel":
            # Race all models
            response = await parallel_executor.race([
                {"role": "user", "content": request.prompt}
            ])
        elif request.task_type:
            # Task-based routing
            response = await task_router.complete(
                request.prompt,
                task_type=request.task_type
            )
        else:
            # Auto routing based on complexity
            response = await complexity_router.complete(request.prompt)
        return {
            "content": response.content,
            "model": response.model,
            "provider": response.provider.value,
            "tokens": {
                "input": response.input_tokens,
                "output": response.output_tokens
            },
            "latency_ms": response.latency_ms,
            "cost": response.cost
        }
    except HTTPException:
        # Don't let deliberate 4xx errors get converted into 500s below
        raise
    except Exception as e:
        raise HTTPException(500, str(e))
@app.get("/v1/models")
async def list_models():
"""List available models."""
return {
"models": [
{
"id": model_id,
"provider": config.provider.value,
"cost_per_1k_input": config.cost_per_1k_input,
"cost_per_1k_output": config.cost_per_1k_output
}
for model_id, config in MODEL_CONFIGS.items()
]
}
@app.get("/health")
async def health():
return {"status": "healthy", "models": len(clients)}
References
- OpenAI API: https://platform.openai.com/docs/api-reference
- Anthropic API: https://docs.anthropic.com/en/api
- LiteLLM: https://docs.litellm.ai/
- Router Patterns: https://martinfowler.com/articles/gateway-pattern.html
Conclusion
Multi-model orchestration unlocks significant benefits: cost savings by routing simple queries to cheaper models, improved reliability through fallbacks, and better results by using specialized models for specific tasks. Start with a unified interface that abstracts provider differences—this makes switching models trivial. Implement complexity-based routing to automatically select appropriate models. Use parallel execution for latency-critical applications or consensus-based selection for high-stakes decisions. Build specialized pipelines that chain models for complex workflows. Monitor costs and latency across models to continuously optimize routing rules. The goal is leveraging each model's strengths while minimizing costs and maximizing reliability.