
LLM Cost Optimization: Model Routing, Token Reduction, and Budget Management

Introduction: LLM API costs can escalate quickly; at the list prices used below, a GPT-4 Turbo call costs roughly 60x more than the same request through GPT-4o-mini. Effective cost optimization requires a multi-pronged approach: intelligent model routing based on task complexity, caching for repeated queries, prompt optimization to reduce token usage, and batching to cut per-request overhead. This guide covers practical cost optimization strategies: model tiering and routing, token reduction techniques, batch processing, and cost monitoring that helps you understand and control spending.

Cost Optimization: Model Routing, Token Reduction, Batching, and Monitoring

Model Tiering and Routing

from dataclasses import dataclass
from typing import Callable, Optional
from enum import Enum

class ModelTier(str, Enum):
    ECONOMY = "economy"      # GPT-4o-mini, Claude Haiku
    STANDARD = "standard"    # GPT-4o, Claude Sonnet
    PREMIUM = "premium"      # GPT-4, Claude Opus

@dataclass
class ModelConfig:
    """Model configuration with pricing."""
    
    name: str
    tier: ModelTier
    input_cost_per_1m: float   # Cost per 1M input tokens
    output_cost_per_1m: float  # Cost per 1M output tokens
    max_tokens: int            # Context window size (tokens)
    
    def estimate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Estimate cost for request."""
        
        input_cost = (input_tokens / 1_000_000) * self.input_cost_per_1m
        output_cost = (output_tokens / 1_000_000) * self.output_cost_per_1m
        return input_cost + output_cost

# Model configurations (prices as of 2024)
MODELS = {
    "gpt-4o-mini": ModelConfig(
        name="gpt-4o-mini",
        tier=ModelTier.ECONOMY,
        input_cost_per_1m=0.15,
        output_cost_per_1m=0.60,
        max_tokens=128000
    ),
    "gpt-4o": ModelConfig(
        name="gpt-4o",
        tier=ModelTier.STANDARD,
        input_cost_per_1m=2.50,
        output_cost_per_1m=10.00,
        max_tokens=128000
    ),
    "gpt-4-turbo": ModelConfig(
        name="gpt-4-turbo",
        tier=ModelTier.PREMIUM,
        input_cost_per_1m=10.00,
        output_cost_per_1m=30.00,
        max_tokens=128000
    ),
    "claude-3-haiku": ModelConfig(
        name="claude-3-haiku-20240307",
        tier=ModelTier.ECONOMY,
        input_cost_per_1m=0.25,
        output_cost_per_1m=1.25,
        max_tokens=200000
    ),
    "claude-3-sonnet": ModelConfig(
        name="claude-3-5-sonnet-20241022",
        tier=ModelTier.STANDARD,
        input_cost_per_1m=3.00,
        output_cost_per_1m=15.00,
        max_tokens=200000
    ),
}
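
# Quick sanity check (illustrative; numbers follow from estimate_cost):
# the same 2,000-input / 500-output request at both ends of the table above.
print(MODELS["gpt-4o-mini"].estimate_cost(2_000, 500))  # ~0.0006  ($0.0006)
print(MODELS["gpt-4-turbo"].estimate_cost(2_000, 500))  # ~0.035   (~58x more)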

class CostAwareRouter:
    """Route requests to appropriate model tier."""
    
    def __init__(self, client):
        self.client = client
        self.classifiers: list[tuple[Callable, ModelTier]] = []
    
    def add_classifier(
        self,
        condition: Callable[[str], bool],
        tier: ModelTier
    ):
        """Add routing rule."""
        
        self.classifiers.append((condition, tier))
    
    def classify_complexity(self, query: str) -> ModelTier:
        """Classify query complexity."""
        
        # Check custom classifiers first
        for condition, tier in self.classifiers:
            if condition(query):
                return tier
        
        # Default heuristics
        query_lower = query.lower()
        
        # Premium tier indicators
        premium_keywords = [
            "analyze complex", "detailed analysis",
            "multi-step reasoning", "compare and contrast",
            "synthesize", "evaluate critically"
        ]
        
        if any(kw in query_lower for kw in premium_keywords):
            return ModelTier.PREMIUM
        
        # Standard tier indicators
        standard_keywords = [
            "explain", "summarize", "write",
            "describe", "outline", "draft"
        ]
        
        if any(kw in query_lower for kw in standard_keywords):
            return ModelTier.STANDARD
        
        # Default to economy
        return ModelTier.ECONOMY
    
    def get_model(self, tier: ModelTier) -> ModelConfig:
        """Get model for tier."""
        
        tier_models = {
            ModelTier.ECONOMY: MODELS["gpt-4o-mini"],
            ModelTier.STANDARD: MODELS["gpt-4o"],
            ModelTier.PREMIUM: MODELS["gpt-4-turbo"]
        }
        
        return tier_models[tier]
    
    def route(self, query: str) -> ModelConfig:
        """Route query to appropriate model."""
        
        tier = self.classify_complexity(query)
        return self.get_model(tier)

# Budget-constrained routing
class BudgetRouter:
    """Route with budget constraints."""
    
    def __init__(self, daily_budget: float):
        self.daily_budget = daily_budget
        self.daily_spent = 0.0
        self.request_count = 0
    
    def can_afford(self, model: ModelConfig, estimated_tokens: int) -> bool:
        """Check if request fits budget."""
        
        estimated_cost = model.estimate_cost(
            estimated_tokens,
            estimated_tokens // 2  # Assume output is half of input
        )
        
        return self.daily_spent + estimated_cost <= self.daily_budget
    
    def route_within_budget(
        self,
        preferred_model: ModelConfig,
        estimated_tokens: int
    ) -> ModelConfig:
        """Route to affordable model."""
        
        if self.can_afford(preferred_model, estimated_tokens):
            return preferred_model
        
        # Try cheaper alternatives
        cheaper_models = sorted(
            MODELS.values(),
            key=lambda m: m.input_cost_per_1m
        )
        
        for model in cheaper_models:
            if self.can_afford(model, estimated_tokens):
                return model
        
        raise ValueError("No affordable model available")
    
    def record_usage(self, model: ModelConfig, input_tokens: int, output_tokens: int):
        """Record usage for budget tracking."""
        
        cost = model.estimate_cost(input_tokens, output_tokens)
        self.daily_spent += cost
        self.request_count += 1
    
    def reset_daily(self):
        """Reset daily counters."""
        
        self.daily_spent = 0.0
        self.request_count = 0
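
Putting the two routers together, a minimal sketch might look like the following. Note that CostAwareRouter only stores the client, so classification works without one; the query, token counts, and budget here are illustrative.

# Classify, then downgrade if the preferred model doesn't fit the budget
router = CostAwareRouter(client=None)  # classify_complexity needs no API client
budget = BudgetRouter(daily_budget=10.0)

preferred = router.route("Summarize this meeting transcript")  # STANDARD -> gpt-4o
model = budget.route_within_budget(preferred, estimated_tokens=1_200)
print(model.name, model.tier.value)

# After the real API call returns, record actual usage against the budget:
budget.record_usage(model, input_tokens=1_200, output_tokens=300)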

Token Reduction

import tiktoken
from dataclasses import dataclass

@dataclass
class TokenStats:
    """Token usage statistics."""
    
    original_tokens: int
    optimized_tokens: int
    savings_percent: float

class PromptOptimizer:
    """Optimize prompts to reduce token usage."""
    
    def __init__(self, model: str = "gpt-4o"):
        self.encoding = tiktoken.encoding_for_model(model)
    
    def count_tokens(self, text: str) -> int:
        """Count tokens in text."""
        
        return len(self.encoding.encode(text))
    
    def optimize_prompt(self, prompt: str) -> tuple[str, TokenStats]:
        """Optimize prompt for fewer tokens."""
        
        original_tokens = self.count_tokens(prompt)
        
        optimized = prompt
        
        # Remove excessive whitespace
        optimized = ' '.join(optimized.split())
        
        # Remove redundant phrases
        redundant = [
            "Please ", "Could you please ",
            "I would like you to ", "Can you ",
            "I need you to ", "Would you mind "
        ]
        
        for phrase in redundant:
            optimized = optimized.replace(phrase, "")
        
        # Shorten common phrases
        replacements = {
            "in order to": "to",
            "as well as": "and",
            "in addition to": "plus",
            "with respect to": "regarding",
            "in the event that": "if",
            "at this point in time": "now",
            "due to the fact that": "because"
        }
        
        for old, new in replacements.items():
            optimized = optimized.replace(old, new)
        
        optimized_tokens = self.count_tokens(optimized)
        savings = (
            (original_tokens - optimized_tokens) / original_tokens * 100
            if original_tokens else 0.0
        )
        
        return optimized, TokenStats(
            original_tokens=original_tokens,
            optimized_tokens=optimized_tokens,
            savings_percent=savings
        )
    
    def truncate_context(
        self,
        context: str,
        max_tokens: int,
        preserve_start: int = 100,
        preserve_end: int = 100
    ) -> str:
        """Truncate context while preserving start and end."""
        
        tokens = self.encoding.encode(context)
        
        if len(tokens) <= max_tokens:
            return context
        
        # Keep start and end
        start_tokens = tokens[:preserve_start]
        end_tokens = tokens[-preserve_end:]
        
        # Remaining budget for the middle, minus a small safety margin
        middle_budget = max_tokens - preserve_start - preserve_end - 10
        
        if middle_budget > 0:
            # Fill the budget with tokens immediately after the preserved start
            middle_tokens = tokens[preserve_start:preserve_start + middle_budget]
            
            truncated = start_tokens + middle_tokens + end_tokens
        else:
            truncated = start_tokens + end_tokens
        
        return self.encoding.decode(truncated)
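
# Example (illustrative; requires a recent tiktoken): phrase removal in action
optimizer = PromptOptimizer()
optimized, stats = optimizer.optimize_prompt(
    "Could you please summarize the following report in order to help me?"
)
print(optimized)  # "summarize the following report to help me?"
print(f"{stats.savings_percent:.0f}% fewer tokens")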

class OutputLimiter:
    """Limit output tokens for cost control."""
    
    def __init__(self):
        self.task_limits = {
            "classification": 50,
            "extraction": 200,
            "summary": 300,
            "explanation": 500,
            "generation": 1000
        }
    
    def get_limit(self, task_type: str) -> int:
        """Get appropriate output limit for task."""
        
        return self.task_limits.get(task_type, 500)
    
    def add_length_instruction(
        self,
        prompt: str,
        task_type: str
    ) -> str:
        """Add length constraint to prompt."""
        
        limit = self.get_limit(task_type)
        word_limit = int(limit * 0.75)  # Rough token to word conversion
        
        return f"{prompt}\n\nKeep your response under {word_limit} words."

# Compression for context
class ContextCompressor:
    """Compress context to reduce tokens."""
    
    def __init__(self, client):
        self.client = client
        self.optimizer = PromptOptimizer()
    
    def compress(
        self,
        context: str,
        target_ratio: float = 0.5
    ) -> str:
        """Compress context using LLM."""
        
        original_tokens = self.optimizer.count_tokens(context)
        target_tokens = int(original_tokens * target_ratio)
        target_words = int(target_tokens * 0.75)
        
        prompt = f"""Compress the following text to approximately {target_words} words.
Preserve all key facts and important details.

Text:
{context}

Compressed version:"""
        
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",  # Use cheap model for compression
            messages=[{"role": "user", "content": prompt}],
            max_tokens=target_tokens + 100
        )
        
        return response.choices[0].message.content
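
The introduction also mentions caching for repeated queries, and it belongs alongside token reduction: the cheapest tokens are the ones you never send. Here is a minimal sketch of an exact-match response cache; the ResponseCache class and its TTL are illustrative assumptions, not part of the code above. Check it before routing, and on a hit you skip the API call entirely.

from typing import Optional
import hashlib
import time

class ResponseCache:
    """Exact-match response cache for repeated prompts (illustrative sketch)."""
    
    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, str]] = {}
    
    def _key(self, model: str, prompt: str) -> str:
        """Hash model + prompt into a cache key."""
        return hashlib.sha256(f"{model}\x00{prompt}".encode()).hexdigest()
    
    def get(self, model: str, prompt: str) -> Optional[str]:
        """Return a cached response if present and not expired."""
        entry = self._store.get(self._key(model, prompt))
        if entry and time.monotonic() - entry[0] < self.ttl:
            return entry[1]
        return None
    
    def put(self, model: str, prompt: str, response: str):
        """Store a response with the current timestamp."""
        self._store[self._key(model, prompt)] = (time.monotonic(), response)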

Batch Processing

from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
from datetime import datetime

@dataclass
class BatchRequest:
    """A request in a batch."""
    
    id: str
    prompt: str
    callback: Optional[Callable[[str], None]] = None
    created_at: Optional[datetime] = None

@dataclass
class BatchResult:
    """Result of batch processing."""
    
    id: str
    response: str
    tokens_used: int
    cost: float

class BatchProcessor:
    """Process requests in batches for efficiency."""
    
    def __init__(
        self,
        client,
        model: str = "gpt-4o-mini",
        batch_size: int = 20,
        max_wait_seconds: float = 5.0
    ):
        self.client = client
        self.model = model
        self.batch_size = batch_size
        self.max_wait = max_wait_seconds  # Reserved for time-based flushing (see TimedBatchProcessor below)
        
        self.pending: list[BatchRequest] = []
        self.results: dict[str, BatchResult] = {}
    
    async def add_request(self, request: BatchRequest) -> str:
        """Add request to batch queue."""
        
        request.created_at = datetime.now()
        self.pending.append(request)
        
        # Check if batch is ready
        if len(self.pending) >= self.batch_size:
            await self._process_batch()
        
        return request.id
    
    async def _process_batch(self):
        """Process pending batch."""
        
        if not self.pending:
            return
        
        batch = self.pending[:self.batch_size]
        self.pending = self.pending[self.batch_size:]
        
        # Create batch prompt
        batch_prompt = self._create_batch_prompt(batch)
        
        # Sync client call; swap in AsyncOpenAI in production to avoid blocking the loop
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": batch_prompt}],
            response_format={"type": "json_object"}
        )
        
        # Parse batch response
        import json
        results = json.loads(response.choices[0].message.content)
        
        # Split the batch cost across requests using the model's pricing
        config = MODELS.get(self.model)
        batch_cost = (
            config.estimate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens
            )
            if config else 0.0
        )
        
        # Distribute results
        for req in batch:
            result_text = results.get(req.id, "")
            
            self.results[req.id] = BatchResult(
                id=req.id,
                response=result_text,
                tokens_used=response.usage.total_tokens // len(batch),  # Rough per-request share
                cost=batch_cost / len(batch)
            )
            
            if req.callback:
                req.callback(result_text)
    
    def _create_batch_prompt(self, batch: list[BatchRequest]) -> str:
        """Create combined prompt for batch."""
        
        prompts = "\n\n".join([
            f"[{req.id}]\n{req.prompt}"
            for req in batch
        ])
        
        return f"""Process each of the following requests and return a JSON object
with the request IDs as keys and responses as values.

Requests:
{prompts}

Return JSON: {{"id1": "response1", "id2": "response2", ...}}"""
    
    async def flush(self):
        """Process any remaining requests."""
        
        while self.pending:
            await self._process_batch()
    
    def get_result(self, request_id: str) -> Optional[BatchResult]:
        """Get result for request."""
        
        return self.results.get(request_id)

# Async batch with timeout
class TimedBatchProcessor:
    """Batch processor with time-based flushing."""
    
    def __init__(
        self,
        client,
        model: str = "gpt-4o-mini",
        batch_size: int = 20,
        flush_interval: float = 2.0
    ):
        self.processor = BatchProcessor(client, model, batch_size)
        self.flush_interval = flush_interval
        self._flush_task = None
    
    async def start(self):
        """Start background flush task."""
        
        self._flush_task = asyncio.create_task(self._flush_loop())
    
    async def stop(self):
        """Stop and flush remaining."""
        
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass
        
        await self.processor.flush()
    
    async def _flush_loop(self):
        """Periodically flush pending requests."""
        
        while True:
            await asyncio.sleep(self.flush_interval)
            await self.processor.flush()
    
    async def process(self, prompt: str) -> str:
        """Process single request through batch."""
        
        import uuid
        
        request_id = str(uuid.uuid4())
        result_future = asyncio.get_running_loop().create_future()
        
        def callback(response: str):
            result_future.set_result(response)
        
        request = BatchRequest(
            id=request_id,
            prompt=prompt,
            callback=callback
        )
        
        await self.processor.add_request(request)
        
        return await result_future
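
A usage sketch for the timed batcher, assuming an OpenAI client with an API key in the environment; the prompts and sizes are illustrative, and the sync completion call inside _process_batch will briefly block the event loop.

import asyncio
from openai import OpenAI

async def main():
    batcher = TimedBatchProcessor(OpenAI(), batch_size=5, flush_interval=1.0)
    await batcher.start()
    try:
        # Three concurrent callers share one underlying API call
        answers = await asyncio.gather(
            batcher.process("One-word synonym for 'fast'?"),
            batcher.process("One-word synonym for 'happy'?"),
            batcher.process("One-word synonym for 'large'?"),
        )
        print(answers)
    finally:
        await batcher.stop()

asyncio.run(main())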

Cost Monitoring

from dataclasses import dataclass
from datetime import datetime, date
from typing import Optional
from collections import defaultdict

@dataclass
class UsageRecord:
    """Record of API usage."""
    
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    request_type: str = "unknown"

class CostTracker:
    """Track and analyze LLM costs."""
    
    def __init__(self):
        self.records: list[UsageRecord] = []
        self.daily_totals: dict[date, float] = defaultdict(float)
        self.model_totals: dict[str, float] = defaultdict(float)
    
    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        request_type: str = "unknown"
    ):
        """Record usage."""
        
        config = MODELS.get(model)
        
        if config:
            cost = config.estimate_cost(input_tokens, output_tokens)
        else:
            # Default pricing
            cost = (input_tokens + output_tokens) * 0.00001
        
        record = UsageRecord(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            request_type=request_type
        )
        
        self.records.append(record)
        self.daily_totals[date.today()] += cost
        self.model_totals[model] += cost
    
    def get_daily_cost(self, day: Optional[date] = None) -> float:
        """Get cost for specific day."""
        
        day = day or date.today()
        return self.daily_totals.get(day, 0.0)
    
    def get_total_cost(self) -> float:
        """Get total cost across all time."""
        
        return sum(r.cost for r in self.records)
    
    def get_cost_by_model(self) -> dict[str, float]:
        """Get cost breakdown by model."""
        
        return dict(self.model_totals)
    
    def get_cost_by_type(self) -> dict[str, float]:
        """Get cost breakdown by request type."""
        
        by_type = defaultdict(float)
        
        for record in self.records:
            by_type[record.request_type] += record.cost
        
        return dict(by_type)
    
    def get_summary(self) -> dict:
        """Get cost summary."""
        
        total_requests = len(self.records)
        total_cost = self.get_total_cost()
        
        return {
            "total_requests": total_requests,
            "total_cost": total_cost,
            "avg_cost_per_request": total_cost / total_requests if total_requests > 0 else 0,
            "today_cost": self.get_daily_cost(),
            "by_model": self.get_cost_by_model(),
            "by_type": self.get_cost_by_type()
        }

class CostAlertManager:
    """Manage cost alerts and limits."""
    
    def __init__(self, tracker: CostTracker):
        self.tracker = tracker
        self.alerts: list[dict] = []
        
        self.daily_limit: Optional[float] = None
        self.monthly_limit: Optional[float] = None
        self.per_request_limit: Optional[float] = None
    
    def set_limits(
        self,
        daily: Optional[float] = None,
        monthly: Optional[float] = None,
        per_request: Optional[float] = None
    ):
        """Set cost limits."""
        
        self.daily_limit = daily
        self.monthly_limit = monthly
        self.per_request_limit = per_request
    
    def check_limits(self) -> list[str]:
        """Check if any limits are exceeded."""
        
        warnings = []
        
        if self.daily_limit:
            daily_cost = self.tracker.get_daily_cost()
            if daily_cost >= self.daily_limit * 0.8:
                warnings.append(f"Daily cost at {daily_cost:.2f}/{self.daily_limit:.2f}")
        
        if self.monthly_limit:
            # Simplified: uses the all-time total as a stand-in for month-to-date spend
            monthly_cost = self.tracker.get_total_cost()
            if monthly_cost >= self.monthly_limit * 0.8:
                warnings.append(f"Monthly cost at {monthly_cost:.2f}/{self.monthly_limit:.2f}")
        
        return warnings
    
    def should_block_request(self, estimated_cost: float) -> bool:
        """Check if request should be blocked."""
        
        if self.per_request_limit and estimated_cost > self.per_request_limit:
            return True
        
        if self.daily_limit:
            if self.tracker.get_daily_cost() + estimated_cost > self.daily_limit:
                return True
        
        return False
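
Wiring the tracker and alert manager together might look like this; the limits and token counts are illustrative.

tracker = CostTracker()
alerts = CostAlertManager(tracker)
alerts.set_limits(daily=5.0, per_request=0.50)

tracker.record("gpt-4o-mini", input_tokens=1_200, output_tokens=400, request_type="summary")
print(tracker.get_summary())
print(alerts.check_limits())              # empty until spend reaches 80% of a limit
print(alerts.should_block_request(0.75))  # True: above the 0.50 per-request cap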

Production Cost Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
from openai import OpenAI
client = OpenAI()

router = CostAwareRouter(client)
budget_router = BudgetRouter(daily_budget=100.0)
optimizer = PromptOptimizer()
tracker = CostTracker()
alerts = CostAlertManager(tracker)

alerts.set_limits(daily=100.0, per_request=1.0)

class ChatRequest(BaseModel):
    prompt: str
    system_prompt: str = "You are a helpful assistant."
    optimize_prompt: bool = True
    max_tier: Optional[str] = None

class CostEstimateRequest(BaseModel):
    prompt: str
    model: Optional[str] = None

@app.post("/v1/chat")
async def chat_optimized(request: ChatRequest):
    """Chat with cost optimization."""
    
    prompt = request.prompt
    
    # Optimize prompt
    if request.optimize_prompt:
        prompt, stats = optimizer.optimize_prompt(prompt)
    
    # Route to appropriate model
    model_config = router.route(prompt)
    
    # Check tier constraint (rank tiers explicitly; comparing the enum's
    # string values would order "standard" above "premium")
    if request.max_tier:
        tier_rank = {ModelTier.ECONOMY: 0, ModelTier.STANDARD: 1, ModelTier.PREMIUM: 2}
        max_tier = ModelTier(request.max_tier)
        if tier_rank[model_config.tier] > tier_rank[max_tier]:
            model_config = router.get_model(max_tier)
    
    # Estimate cost
    input_tokens = optimizer.count_tokens(prompt)
    estimated_cost = model_config.estimate_cost(input_tokens, input_tokens // 2)
    
    # Check limits
    if alerts.should_block_request(estimated_cost):
        raise HTTPException(429, "Cost limit exceeded")
    
    # Make request
    response = client.chat.completions.create(
        model=model_config.name,
        messages=[
            {"role": "system", "content": request.system_prompt},
            {"role": "user", "content": prompt}
        ]
    )
    
    # Record usage
    tracker.record(
        model=model_config.name,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_type="chat"
    )
    
    actual_cost = model_config.estimate_cost(
        response.usage.prompt_tokens,
        response.usage.completion_tokens
    )
    
    return {
        "response": response.choices[0].message.content,
        "model": model_config.name,
        "tier": model_config.tier.value,
        "cost": actual_cost,
        "tokens": {
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens
        }
    }

@app.post("/v1/estimate")
async def estimate_cost(request: CostEstimateRequest):
    """Estimate cost for request."""
    
    input_tokens = optimizer.count_tokens(request.prompt)
    
    if request.model:
        model_config = MODELS.get(request.model)
        if not model_config:
            raise HTTPException(400, f"Unknown model: {request.model}")
    else:
        model_config = router.route(request.prompt)
    
    # Estimate output as half of input
    estimated_output = input_tokens // 2
    cost = model_config.estimate_cost(input_tokens, estimated_output)
    
    return {
        "model": model_config.name,
        "input_tokens": input_tokens,
        "estimated_output_tokens": estimated_output,
        "estimated_cost": cost
    }

@app.get("/v1/costs")
async def get_costs():
    """Get cost summary."""
    
    return tracker.get_summary()

@app.get("/v1/costs/alerts")
async def get_alerts():
    """Get cost alerts."""
    
    return {
        "warnings": alerts.check_limits(),
        "daily_limit": alerts.daily_limit,
        "daily_spent": tracker.get_daily_cost()
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}


Conclusion

LLM cost optimization is about using the right model for each task. Implement model tiering—route simple queries to cheap models (GPT-4o-mini, Claude Haiku) and reserve expensive models for complex reasoning. Optimize prompts to reduce token count without losing meaning. Use caching aggressively for repeated or similar queries. Batch requests when latency permits to reduce per-request overhead. Monitor costs continuously with alerts for budget thresholds. The goal isn’t minimizing cost at all costs—it’s maximizing value per dollar spent. A well-optimized system can reduce LLM costs by 80%+ while maintaining quality for the tasks that matter.

