
Multi-Model Orchestration: Routing, Parallel Execution, and Specialized Pipelines

Introduction

Production LLM applications often benefit from using multiple models—routing simple queries to cheaper models, using specialized models for specific tasks, and falling back to alternatives when primary models fail. Multi-model orchestration enables cost optimization, improved reliability, and access to each model's unique strengths. This guide covers practical orchestration patterns: model routing based on query complexity, parallel model calls for consensus, specialized model pipelines, and unified interfaces that abstract model differences.

Figure: Multi-Model Orchestration (Router, Model Pool, and Response Selection)

Unified Model Interface

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncGenerator
from enum import Enum

class ModelProvider(str, Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    LOCAL = "local"

@dataclass
class ModelConfig:
    """Configuration for a model."""
    
    provider: ModelProvider
    model_id: str
    max_tokens: int = 4096
    temperature: float = 0.7
    cost_per_1k_input: float = 0.0
    cost_per_1k_output: float = 0.0

@dataclass
class CompletionResponse:
    """Unified response format."""
    
    content: str
    model: str
    provider: ModelProvider
    input_tokens: int
    output_tokens: int
    latency_ms: float
    cost: float

class BaseModelClient(ABC):
    """Abstract base for model clients."""
    
    def __init__(self, config: ModelConfig):
        self.config = config
    
    @abstractmethod
    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> CompletionResponse:
        """Generate completion."""
        pass
    
    @abstractmethod
    async def stream(
        self,
        messages: list[dict],
        **kwargs
    ) -> AsyncGenerator[str, None]:
        """Stream completion."""
        pass
    
    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for token usage."""
        
        return (
            (input_tokens / 1000) * self.config.cost_per_1k_input +
            (output_tokens / 1000) * self.config.cost_per_1k_output
        )

class OpenAIClient(BaseModelClient):
    """OpenAI model client."""
    
    def __init__(self, config: ModelConfig):
        super().__init__(config)
        from openai import AsyncOpenAI
        self.client = AsyncOpenAI()
    
    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> CompletionResponse:
        import time
        start = time.time()
        
        response = await self.client.chat.completions.create(
            model=self.config.model_id,
            messages=messages,
            max_tokens=kwargs.get("max_tokens", self.config.max_tokens),
            temperature=kwargs.get("temperature", self.config.temperature)
        )
        
        latency = (time.time() - start) * 1000
        
        return CompletionResponse(
            content=response.choices[0].message.content,
            model=self.config.model_id,
            provider=ModelProvider.OPENAI,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            latency_ms=latency,
            cost=self.calculate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens
            )
        )
    
    async def stream(
        self,
        messages: list[dict],
        **kwargs
    ) -> AsyncGenerator[str, None]:
        response = await self.client.chat.completions.create(
            model=self.config.model_id,
            messages=messages,
            max_tokens=kwargs.get("max_tokens", self.config.max_tokens),
            temperature=kwargs.get("temperature", self.config.temperature),
            stream=True
        )
        
        async for chunk in response:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

class AnthropicClient(BaseModelClient):
    """Anthropic Claude client."""
    
    def __init__(self, config: ModelConfig):
        super().__init__(config)
        from anthropic import AsyncAnthropic
        self.client = AsyncAnthropic()
    
    async def complete(
        self,
        messages: list[dict],
        **kwargs
    ) -> CompletionResponse:
        import time
        start = time.time()
        
        # Convert messages format
        system = None
        converted = []
        
        for msg in messages:
            if msg["role"] == "system":
                system = msg["content"]
            else:
                converted.append(msg)
        
        # The Anthropic API rejects a null system prompt, so only pass it when set
        request_args = {
            "model": self.config.model_id,
            "messages": converted,
            "max_tokens": kwargs.get("max_tokens", self.config.max_tokens)
        }
        if system is not None:
            request_args["system"] = system
        
        response = await self.client.messages.create(**request_args)
        
        latency = (time.time() - start) * 1000
        
        return CompletionResponse(
            content=response.content[0].text,
            model=self.config.model_id,
            provider=ModelProvider.ANTHROPIC,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency,
            cost=self.calculate_cost(
                response.usage.input_tokens,
                response.usage.output_tokens
            )
        )
    
    async def stream(
        self,
        messages: list[dict],
        **kwargs
    ) -> AsyncGenerator[str, None]:
        system = None
        converted = []
        
        for msg in messages:
            if msg["role"] == "system":
                system = msg["content"]
            else:
                converted.append(msg)
        
        # Same caveat as complete(): only pass system when it is set
        stream_args = {
            "model": self.config.model_id,
            "messages": converted,
            "max_tokens": kwargs.get("max_tokens", self.config.max_tokens)
        }
        if system is not None:
            stream_args["system"] = system
        
        async with self.client.messages.stream(**stream_args) as stream:
            async for text in stream.text_stream:
                yield text
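
Here is a minimal usage sketch, assuming OPENAI_API_KEY and ANTHROPIC_API_KEY are set in the environment (both SDKs read their keys from there); the pricing figures match the service configuration later in this post:

import asyncio

async def main():
    gpt = OpenAIClient(ModelConfig(
        provider=ModelProvider.OPENAI,
        model_id="gpt-4o-mini",
        cost_per_1k_input=0.00015,
        cost_per_1k_output=0.0006
    ))
    claude = AnthropicClient(ModelConfig(
        provider=ModelProvider.ANTHROPIC,
        model_id="claude-3-5-sonnet-20241022",
        cost_per_1k_input=0.003,
        cost_per_1k_output=0.015
    ))
    
    messages = [{"role": "user", "content": "Explain idempotency in one sentence."}]
    
    # Identical call shape for both providers; only the client differs
    for client in (gpt, claude):
        response = await client.complete(messages)
        print(f"{response.model}: ${response.cost:.6f}, {response.latency_ms:.0f} ms")

asyncio.run(main())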

Model Router

from dataclasses import dataclass
from typing import Callable
import re

@dataclass
class RoutingRule:
    """Rule for routing to a model."""
    
    name: str
    condition: Callable[[str, list[dict]], bool]
    model_id: str
    priority: int = 0

class ModelRouter:
    """Route requests to appropriate models."""
    
    def __init__(self):
        self.rules: list[RoutingRule] = []
        self.default_model: str = "gpt-4o-mini"
        self.clients: dict[str, BaseModelClient] = {}
    
    def register_client(self, model_id: str, client: BaseModelClient):
        """Register a model client."""
        self.clients[model_id] = client
    
    def add_rule(self, rule: RoutingRule):
        """Add routing rule."""
        self.rules.append(rule)
        self.rules.sort(key=lambda r: r.priority, reverse=True)
    
    def route(self, prompt: str, messages: list[dict] = None) -> str:
        """Determine which model to use."""
        
        messages = messages or [{"role": "user", "content": prompt}]
        
        for rule in self.rules:
            if rule.condition(prompt, messages):
                return rule.model_id
        
        return self.default_model
    
    async def complete(
        self,
        prompt: str,
        messages: list[dict] = None,
        **kwargs
    ) -> CompletionResponse:
        """Route and complete."""
        
        messages = messages or [{"role": "user", "content": prompt}]
        model_id = self.route(prompt, messages)
        
        client = self.clients.get(model_id)
        if not client:
            raise ValueError(f"No client for model: {model_id}")
        
        return await client.complete(messages, **kwargs)
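
# Example: a custom rule that pins any prompt mentioning "legal" to the
# strongest model (a hypothetical policy, purely for illustration).
# route() only inspects the prompt, so this runs without any API clients:
router = ModelRouter()
router.add_rule(RoutingRule(
    name="legal_queries",
    condition=lambda prompt, messages: "legal" in prompt.lower(),
    model_id="claude-3-opus-20240229",
    priority=10,
))
assert router.route("Is this clause legal in the EU?") == "claude-3-opus-20240229"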

# Complexity-based routing
class ComplexityRouter(ModelRouter):
    """Route based on query complexity."""
    
    def __init__(self):
        super().__init__()
        self._setup_rules()
    
    def _setup_rules(self):
        """Setup complexity-based rules."""
        
        # Simple queries -> cheap model
        self.add_rule(RoutingRule(
            name="simple_query",
            condition=self._is_simple,
            model_id="gpt-4o-mini",
            priority=1
        ))
        
        # Code generation -> capable model
        self.add_rule(RoutingRule(
            name="code_generation",
            condition=self._is_code,
            model_id="gpt-4o",
            priority=2
        ))
        
        # Complex reasoning -> best model
        self.add_rule(RoutingRule(
            name="complex_reasoning",
            condition=self._is_complex,
            model_id="claude-3-5-sonnet-20241022",
            priority=3
        ))
    
    def _is_simple(self, prompt: str, messages: list[dict]) -> bool:
        """Check if query is simple."""
        
        simple_patterns = [
            r"^(what|who|when|where) is",
            r"^define ",
            r"^translate ",
            r"^summarize ",
        ]
        
        prompt_lower = prompt.lower()
        
        for pattern in simple_patterns:
            if re.match(pattern, prompt_lower):
                return True
        
        # Short prompts are often simple
        return len(prompt.split()) < 20
    
    def _is_code(self, prompt: str, messages: list[dict]) -> bool:
        """Check if query involves code."""
        
        code_keywords = [
            "code", "function", "class", "implement",
            "python", "javascript", "typescript", "sql",
            "debug", "fix", "refactor", "optimize"
        ]
        
        prompt_lower = prompt.lower()
        return any(kw in prompt_lower for kw in code_keywords)
    
    def _is_complex(self, prompt: str, messages: list[dict]) -> bool:
        """Check if query requires complex reasoning."""
        
        complex_indicators = [
            "analyze", "compare", "evaluate", "critique",
            "step by step", "reasoning", "explain why",
            "pros and cons", "trade-offs"
        ]
        
        prompt_lower = prompt.lower()
        
        # Long prompts often need more reasoning
        if len(prompt.split()) > 100:
            return True
        
        return any(ind in prompt_lower for ind in complex_indicators)

# Cost-optimized routing
class CostOptimizedRouter(ModelRouter):
    """Route to minimize cost while meeting quality."""
    
    def __init__(self, max_cost_per_request: float = 0.01):
        super().__init__()
        self.max_cost = max_cost_per_request
        # Approximate input cost per 1K tokens
        self.model_costs = {
            "gpt-4o-mini": 0.00015,
            "gpt-4o": 0.0025,
            "claude-3-5-sonnet-20241022": 0.003,
            "claude-3-opus-20240229": 0.015
        }
    
    def route(self, prompt: str, messages: list[dict] = None) -> str:
        """Route based on estimated cost."""
        
        # Rough heuristic: ~2 tokens per word of prompt
        estimated_tokens = len(prompt.split()) * 2
        
        # Find the cheapest model that fits the budget
        for model_id, cost_per_1k in sorted(
            self.model_costs.items(),
            key=lambda x: x[1]
        ):
            # Doubled to budget for roughly as many output tokens as input
            estimated_cost = (estimated_tokens / 1000) * cost_per_1k * 2
            
            if estimated_cost <= self.max_cost:
                return model_id
        
        return self.default_model
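
Since route() is pure string inspection, the heuristics can be sanity-checked without API keys or registered clients. A few illustrative prompts:

router = ComplexityRouter()

print(router.route("What is a mutex?"))
# -> gpt-4o-mini (matches a simple-query pattern)

print(router.route("Implement a token-bucket rate limiter class in Python"))
# -> gpt-4o (code keywords, priority 2)

print(router.route("Analyze the trade-offs between optimistic and pessimistic locking"))
# -> claude-3-5-sonnet-20241022 (complex-reasoning indicators, priority 3)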

Parallel Model Execution

import asyncio
from typing import Callable

class ParallelExecutor:
    """Execute multiple models in parallel."""
    
    def __init__(self, clients: dict[str, BaseModelClient]):
        self.clients = clients
    
    async def race(
        self,
        messages: list[dict],
        models: list[str] = None,
        **kwargs
    ) -> CompletionResponse:
        """Return first successful response."""
        
        models = models or list(self.clients.keys())
        
        async def call_model(model_id: str):
            client = self.clients[model_id]
            return await client.complete(messages, **kwargs)
        
        pending = {
            asyncio.create_task(call_model(m))
            for m in models
        }
        
        try:
            # Keep waiting while tasks remain: if the fastest task failed,
            # the win passes to the next one to finish
            while pending:
                done, pending = await asyncio.wait(
                    pending,
                    return_when=asyncio.FIRST_COMPLETED
                )
                for task in done:
                    try:
                        return task.result()
                    except Exception:
                        continue
        finally:
            # Cancel whatever is still in flight
            for task in pending:
                task.cancel()
        
        raise RuntimeError("All models failed")
    
    async def all(
        self,
        messages: list[dict],
        models: list[str] = None,
        **kwargs
    ) -> list[CompletionResponse]:
        """Get responses from all models."""
        
        models = models or list(self.clients.keys())
        
        async def call_model(model_id: str):
            try:
                client = self.clients[model_id]
                return await client.complete(messages, **kwargs)
            except Exception:
                # A failed model simply drops out of the result list
                return None
        
        tasks = [call_model(m) for m in models]
        results = await asyncio.gather(*tasks)
        
        return [r for r in results if r is not None]
    
    async def consensus(
        self,
        messages: list[dict],
        models: list[str] = None,
        selector: Callable[[list[CompletionResponse]], CompletionResponse] = None,
        **kwargs
    ) -> CompletionResponse:
        """Get consensus from multiple models."""
        
        responses = await self.all(messages, models, **kwargs)
        
        if not responses:
            raise Exception("No successful responses")
        
        if selector:
            return selector(responses)
        
        # Default: return longest response
        return max(responses, key=lambda r: len(r.content))

# Voting-based consensus
class VotingConsensus:
    """Select response based on voting."""
    
    def __init__(self, judge_client: BaseModelClient):
        self.judge = judge_client
    
    async def select_best(
        self,
        query: str,
        responses: list[CompletionResponse]
    ) -> CompletionResponse:
        """Use LLM to select best response."""
        
        # Format responses for judging
        formatted = "\n\n".join([
            f"Response {i+1} ({r.model}):\n{r.content}"
            for i, r in enumerate(responses)
        ])
        
        judge_prompt = f"""Given this query: {query}

And these responses:
{formatted}

Which response is best? Reply with just the number (1, 2, etc.)."""
        
        result = await self.judge.complete([
            {"role": "user", "content": judge_prompt}
        ])
        
        # Parse selection
        try:
            selection = int(result.content.strip()) - 1
            if 0 <= selection < len(responses):
                return responses[selection]
        except ValueError:
            pass
        
        # Judge reply was unparseable or out of range; fall back to the first
        return responses[0]
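
Tying these together: race() covers latency-sensitive calls (and doubles as a fallback, since a failed fast model just hands the win to the next finisher), while all() plus a judge covers high-stakes ones. Note that select_best is a coroutine, so rather than passing it as the synchronous selector argument to consensus(), this sketch (assuming a clients dict like the one built in the service section below) applies it to the output of all() directly:

async def answer(
    clients: dict[str, BaseModelClient],
    query: str,
    high_stakes: bool = False
) -> CompletionResponse:
    executor = ParallelExecutor(clients)
    messages = [{"role": "user", "content": query}]
    
    if not high_stakes:
        # First successful response wins; stragglers are cancelled
        return await executor.race(messages, models=["gpt-4o-mini", "gpt-4o"])
    
    # Gather every model's answer, then let a judge model pick
    responses = await executor.all(messages)
    voter = VotingConsensus(judge_client=clients["gpt-4o"])
    return await voter.select_best(query, responses)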

Specialized Model Pipelines

from dataclasses import dataclass
from typing import Any

@dataclass
class PipelineStep:
    """A step in the pipeline."""
    
    name: str
    model_id: str
    prompt_template: str
    output_key: str

class ModelPipeline:
    """Chain multiple models for complex tasks."""
    
    def __init__(self, clients: dict[str, BaseModelClient]):
        self.clients = clients
        self.steps: list[PipelineStep] = []
    
    def add_step(self, step: PipelineStep):
        """Add step to pipeline."""
        self.steps.append(step)
        return self
    
    async def execute(
        self,
        initial_input: dict[str, Any]
    ) -> dict[str, Any]:
        """Execute pipeline."""
        
        context = initial_input.copy()
        
        for step in self.steps:
            # Format prompt with context
            prompt = step.prompt_template.format(**context)
            
            # Get client
            client = self.clients.get(step.model_id)
            if not client:
                raise ValueError(f"No client for: {step.model_id}")
            
            # Execute step
            response = await client.complete([
                {"role": "user", "content": prompt}
            ])
            
            # Store result
            context[step.output_key] = response.content
        
        return context

# Example: Research pipeline
def create_research_pipeline(clients: dict[str, BaseModelClient]) -> ModelPipeline:
    """Create a research pipeline."""
    
    pipeline = ModelPipeline(clients)
    
    # Step 1: Generate search queries (fast model)
    pipeline.add_step(PipelineStep(
        name="generate_queries",
        model_id="gpt-4o-mini",
        prompt_template="Generate 3 search queries for researching: {topic}",
        output_key="queries"
    ))
    
    # Step 2: Analyze and synthesize (capable model)
    pipeline.add_step(PipelineStep(
        name="synthesize",
        model_id="gpt-4o",
        prompt_template="""Based on these search queries: {queries}

Provide a comprehensive analysis of: {topic}

Include key findings, different perspectives, and conclusions.""",
        output_key="analysis"
    ))
    
    # Step 3: Generate summary (fast model)
    pipeline.add_step(PipelineStep(
        name="summarize",
        model_id="gpt-4o-mini",
        prompt_template="Summarize this analysis in 3 bullet points:\n\n{analysis}",
        output_key="summary"
    ))
    
    return pipeline
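
# Usage sketch for the research pipeline (hypothetical topic; assumes
# a `clients` dict like the one built in the service section below):
async def run_research(clients: dict[str, BaseModelClient], topic: str) -> str:
    pipeline = create_research_pipeline(clients)
    result = await pipeline.execute({"topic": topic})
    # execute() returns the accumulated context, so every step's output
    # is available under its output_key: queries, analysis, summary
    return result["summary"]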

# Task-specific routing
class TaskRouter:
    """Route to specialized models by task type."""
    
    def __init__(self, clients: dict[str, BaseModelClient]):
        self.clients = clients
        
        # Task to model mapping
        self.task_models = {
            "code": "gpt-4o",
            "creative": "claude-3-5-sonnet-20241022",
            "analysis": "gpt-4o",
            "translation": "gpt-4o-mini",
            "summarization": "gpt-4o-mini",
            "conversation": "gpt-4o-mini"
        }
    
    def detect_task(self, prompt: str) -> str:
        """Detect task type from prompt."""
        
        prompt_lower = prompt.lower()
        
        if any(kw in prompt_lower for kw in ["code", "function", "implement", "debug"]):
            return "code"
        
        if any(kw in prompt_lower for kw in ["write", "story", "creative", "poem"]):
            return "creative"
        
        if any(kw in prompt_lower for kw in ["analyze", "compare", "evaluate"]):
            return "analysis"
        
        if any(kw in prompt_lower for kw in ["translate", "translation"]):
            return "translation"
        
        if any(kw in prompt_lower for kw in ["summarize", "summary", "tldr"]):
            return "summarization"
        
        return "conversation"
    
    async def complete(
        self,
        prompt: str,
        task_type: str = None,
        **kwargs
    ) -> CompletionResponse:
        """Complete with task-appropriate model."""
        
        task = task_type or self.detect_task(prompt)
        model_id = self.task_models.get(task, "gpt-4o-mini")
        
        client = self.clients.get(model_id)
        if not client:
            raise ValueError(f"No client for: {model_id}")
        
        return await client.complete(
            [{"role": "user", "content": prompt}],
            **kwargs
        )
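
TaskRouter usage follows the same shape. detect_task is pure keyword matching, so it can be exercised offline; the prompts below are illustrative:

async def demo(clients: dict[str, BaseModelClient]):
    task_router = TaskRouter(clients)
    
    # No API calls needed to verify detection
    assert task_router.detect_task("Write a poem about consensus protocols") == "creative"
    assert task_router.detect_task("Translate this paragraph to French") == "translation"
    
    # An explicit task_type skips detection entirely
    response = await task_router.complete("Review this diff for bugs", task_type="code")
    print(response.model)  # gpt-4o, per the task_models mapping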

Production Orchestration Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize clients
MODEL_CONFIGS = {
    "gpt-4o-mini": ModelConfig(
        provider=ModelProvider.OPENAI,
        model_id="gpt-4o-mini",
        cost_per_1k_input=0.00015,
        cost_per_1k_output=0.0006
    ),
    "gpt-4o": ModelConfig(
        provider=ModelProvider.OPENAI,
        model_id="gpt-4o",
        cost_per_1k_input=0.0025,
        cost_per_1k_output=0.01
    ),
    "claude-3-5-sonnet-20241022": ModelConfig(
        provider=ModelProvider.ANTHROPIC,
        model_id="claude-3-5-sonnet-20241022",
        cost_per_1k_input=0.003,
        cost_per_1k_output=0.015
    )
}

clients = {}
for model_id, config in MODEL_CONFIGS.items():
    if config.provider == ModelProvider.OPENAI:
        clients[model_id] = OpenAIClient(config)
    elif config.provider == ModelProvider.ANTHROPIC:
        clients[model_id] = AnthropicClient(config)

# Initialize routers
complexity_router = ComplexityRouter()
for model_id, client in clients.items():
    complexity_router.register_client(model_id, client)

parallel_executor = ParallelExecutor(clients)
task_router = TaskRouter(clients)

class CompletionRequest(BaseModel):
    prompt: str
    routing: str = "auto"  # auto, specific, parallel
    model: Optional[str] = None
    task_type: Optional[str] = None

@app.post("/v1/complete")
async def complete(request: CompletionRequest):
    """Multi-model completion endpoint."""
    
    try:
        if request.routing == "specific" and request.model:
            # Use specific model
            client = clients.get(request.model)
            if not client:
                raise HTTPException(400, f"Unknown model: {request.model}")
            
            response = await client.complete([
                {"role": "user", "content": request.prompt}
            ])
        
        elif request.routing == "parallel":
            # Race all models
            response = await parallel_executor.race([
                {"role": "user", "content": request.prompt}
            ])
        
        elif request.task_type:
            # Task-based routing
            response = await task_router.complete(
                request.prompt,
                task_type=request.task_type
            )
        
        else:
            # Auto routing based on complexity
            response = await complexity_router.complete(request.prompt)
        
        return {
            "content": response.content,
            "model": response.model,
            "provider": response.provider.value,
            "tokens": {
                "input": response.input_tokens,
                "output": response.output_tokens
            },
            "latency_ms": response.latency_ms,
            "cost": response.cost
        }
        
    except HTTPException:
        # Don't remap deliberate 4xx errors to 500
        raise
    except Exception as e:
        raise HTTPException(500, str(e))

@app.get("/v1/models")
async def list_models():
    """List available models."""
    
    return {
        "models": [
            {
                "id": model_id,
                "provider": config.provider.value,
                "cost_per_1k_input": config.cost_per_1k_input,
                "cost_per_1k_output": config.cost_per_1k_output
            }
            for model_id, config in MODEL_CONFIGS.items()
        ]
    }

@app.get("/health")
async def health():
    return {"status": "healthy", "models": len(clients)}


Conclusion

Multi-model orchestration unlocks significant benefits: cost savings by routing simple queries to cheaper models, improved reliability through fallbacks, and better results by using specialized models for specific tasks. Start with a unified interface that abstracts provider differences—this makes switching models trivial. Implement complexity-based routing to automatically select appropriate models. Use parallel execution for latency-critical applications or consensus-based selection for high-stakes decisions. Build specialized pipelines that chain models for complex workflows. Monitor costs and latency across models to continuously optimize routing rules. The goal is leveraging each model's strengths while minimizing costs and maximizing reliability.

