LLM Cost Optimization: Caching, Routing, and Compression Strategies

Introduction: LLM costs can spiral quickly in production systems. A single GPT-4 call might cost pennies, but multiply that by millions of requests and you’re looking at substantial monthly bills. The good news is that most LLM applications have significant room for optimization: cost reductions of 50-80% are often achievable without sacrificing quality, depending on the workload. The key strategies are semantic caching (avoid redundant calls), model routing (use cheaper models when possible), prompt compression (reduce token counts), and batching (amortize overhead). Each technique has tradeoffs: caching requires storage and still pays full price on a miss, routing needs quality monitoring, and compression can affect output quality. This guide walks through practical implementations of cost tracking, caching, routing, and compression, with code you can adapt to build LLM applications that are both powerful and economically sustainable.
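
To make the scale effect concrete, here is a back-of-the-envelope calculation. The traffic figures (100,000 requests per day, 500 input and 200 output tokens per request) are assumptions chosen for illustration, and the per-1K-token prices are the ones used in the pricing catalog later in this guide. Check current provider pricing before relying on them.

```python
def monthly_cost(
    requests_per_day: int,
    input_tokens: int,
    output_tokens: int,
    input_price_per_1k: float,
    output_price_per_1k: float,
    days: int = 30,
) -> float:
    """Estimate monthly spend for a fixed per-request token profile."""
    per_request = (
        (input_tokens / 1000) * input_price_per_1k
        + (output_tokens / 1000) * output_price_per_1k
    )
    return per_request * requests_per_day * days


# Assumed workload: 100k requests/day, 500 input + 200 output tokens each.
premium = monthly_cost(100_000, 500, 200, 0.01, 0.03)        # gpt-4-turbo prices
economy = monthly_cost(100_000, 500, 200, 0.00015, 0.0006)   # gpt-4o-mini prices

print(f"premium: ${premium:,.2f}/month")
print(f"economy: ${economy:,.2f}/month")
```

Under these assumptions the premium model comes to roughly $33,000 per month and the economy model to roughly $585, which is why routing and caching are worth the engineering effort.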

Cost Tracking and Analysis

from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from datetime import datetime, timedelta
from enum import Enum
import json

class ModelTier(Enum):
    """Model pricing tiers."""
    
    PREMIUM = "premium"  # GPT-4 Turbo, GPT-4o, Claude 3 Opus
    STANDARD = "standard"  # GPT-3.5 Turbo, Claude 3 Sonnet
    ECONOMY = "economy"  # GPT-4o mini, Claude 3 Haiku

@dataclass
class ModelPricing:
    """Pricing for a model."""
    
    model_name: str
    tier: ModelTier
    input_cost_per_1k: float
    output_cost_per_1k: float
    
    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for token usage."""
        
        input_cost = (input_tokens / 1000) * self.input_cost_per_1k
        output_cost = (output_tokens / 1000) * self.output_cost_per_1k
        
        return input_cost + output_cost

# Model pricing catalog
PRICING_CATALOG = {
    "gpt-4-turbo": ModelPricing("gpt-4-turbo", ModelTier.PREMIUM, 0.01, 0.03),
    "gpt-4o": ModelPricing("gpt-4o", ModelTier.PREMIUM, 0.005, 0.015),
    "gpt-4o-mini": ModelPricing("gpt-4o-mini", ModelTier.ECONOMY, 0.00015, 0.0006),
    "gpt-3.5-turbo": ModelPricing("gpt-3.5-turbo", ModelTier.STANDARD, 0.0005, 0.0015),
    "claude-3-opus": ModelPricing("claude-3-opus", ModelTier.PREMIUM, 0.015, 0.075),
    "claude-3-sonnet": ModelPricing("claude-3-sonnet", ModelTier.STANDARD, 0.003, 0.015),
    "claude-3-haiku": ModelPricing("claude-3-haiku", ModelTier.ECONOMY, 0.00025, 0.00125),
}

@dataclass
class UsageRecord:
    """Record of LLM usage."""
    
    request_id: str
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    timestamp: datetime
    cached: bool = False
    metadata: dict = field(default_factory=dict)

class CostTracker:
    """Track LLM costs."""
    
    def __init__(self):
        self.records: list[UsageRecord] = []
        self.pricing = PRICING_CATALOG
    
    def record_usage(
        self,
        request_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cached: bool = False,
        metadata: dict = None
    ) -> UsageRecord:
        """Record a usage event."""
        
        pricing = self.pricing.get(model)
        
        if pricing:
            cost = 0 if cached else pricing.calculate_cost(input_tokens, output_tokens)
        else:
            cost = 0
        
        record = UsageRecord(
            request_id=request_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            timestamp=datetime.now(),
            cached=cached,
            metadata=metadata or {}
        )
        
        self.records.append(record)
        return record
    
    def get_daily_cost(self, date: datetime = None) -> float:
        """Get total cost for a day."""
        
        date = date or datetime.now()
        start = date.replace(hour=0, minute=0, second=0, microsecond=0)
        end = start + timedelta(days=1)
        
        return sum(
            r.cost for r in self.records
            if start <= r.timestamp < end
        )
    
    def get_monthly_cost(self, year: int = None, month: int = None) -> float:
        """Get total cost for a month."""
        
        now = datetime.now()
        year = year or now.year
        month = month or now.month
        
        return sum(
            r.cost for r in self.records
            if r.timestamp.year == year and r.timestamp.month == month
        )
    
    def get_cost_by_model(self) -> dict[str, float]:
        """Get costs broken down by model."""
        
        costs = {}
        
        for record in self.records:
            if record.model not in costs:
                costs[record.model] = 0
            costs[record.model] += record.cost
        
        return costs
    
    def get_cache_savings(self) -> dict:
        """Calculate savings from caching."""
        
        cached_records = [r for r in self.records if r.cached]
        
        if not cached_records:
            # Include cache_hit_rate so get_optimization_report() can read it
            return {"savings": 0, "cache_hits": 0, "cache_hit_rate": 0.0}
        
        # Calculate what it would have cost without cache
        potential_cost = 0
        for record in cached_records:
            pricing = self.pricing.get(record.model)
            if pricing:
                potential_cost += pricing.calculate_cost(
                    record.input_tokens,
                    record.output_tokens
                )
        
        return {
            "savings": potential_cost,
            "cache_hits": len(cached_records),
            "cache_hit_rate": len(cached_records) / len(self.records) if self.records else 0
        }
    
    def get_optimization_report(self) -> dict:
        """Generate optimization report."""
        
        total_cost = sum(r.cost for r in self.records)
        total_tokens = sum(r.input_tokens + r.output_tokens for r in self.records)
        
        by_model = self.get_cost_by_model()
        cache_stats = self.get_cache_savings()
        
        # Find optimization opportunities
        opportunities = []
        
        # Check for premium model overuse
        premium_cost = sum(
            r.cost for r in self.records
            if self.pricing.get(r.model, ModelPricing("", ModelTier.STANDARD, 0, 0)).tier == ModelTier.PREMIUM
        )
        
        if premium_cost > total_cost * 0.5:
            opportunities.append({
                "type": "model_routing",
                "description": "Over 50% of costs from premium models",
                "potential_savings": premium_cost * 0.3
            })
        
        # Check cache hit rate
        if cache_stats["cache_hit_rate"] < 0.2:
            opportunities.append({
                "type": "caching",
                "description": "Low cache hit rate",
                "potential_savings": total_cost * 0.2
            })
        
        return {
            "total_cost": total_cost,
            "total_tokens": total_tokens,
            "cost_by_model": by_model,
            "cache_stats": cache_stats,
            "opportunities": opportunities
        }
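
The tracker groups spend by model only, although each `UsageRecord` also carries a `metadata` dict. As a minimal sketch, assuming callers store a `feature` key in that dict (a hypothetical convention, not something the code above enforces), costs can be grouped by any metadata field. The records here are plain dicts so the example runs on its own.

```python
from collections import defaultdict


def cost_by_metadata(records: list[dict], key: str) -> dict[str, float]:
    """Sum record costs grouped by a metadata field.

    Records without the field are grouped under "unknown".
    """
    totals: dict[str, float] = defaultdict(float)
    for record in records:
        group = record.get("metadata", {}).get(key, "unknown")
        totals[group] += record["cost"]
    return dict(totals)


# Hypothetical usage records; "feature" is an assumed metadata key.
records = [
    {"model": "gpt-4o", "cost": 0.020, "metadata": {"feature": "search"}},
    {"model": "gpt-4o-mini", "cost": 0.001, "metadata": {"feature": "chat"}},
    {"model": "gpt-4o", "cost": 0.030, "metadata": {"feature": "search"}},
    {"model": "gpt-4o-mini", "cost": 0.002, "metadata": {}},
]

by_feature = cost_by_metadata(records, "feature")
for feature, cost in sorted(by_feature.items()):
    print(f"{feature}: ${cost:.3f}")
```

The same function works for a user-segment key, which covers the per-feature and per-segment monitoring recommended in the conclusion.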

Semantic Caching

from dataclasses import dataclass, field
from typing import Any, Optional, List
import hashlib
import time
import numpy as np

@dataclass
class CacheEntry:
    """A cache entry."""
    
    key: str
    prompt_hash: str
    response: str
    embedding: list[float]
    created_at: float
    ttl: int
    hit_count: int = 0
    metadata: dict = field(default_factory=dict)
    
    def is_expired(self) -> bool:
        return time.time() > self.created_at + self.ttl

class ExactCache:
    """Exact match cache."""
    
    def __init__(self, max_size: int = 10000, ttl: int = 3600):
        self.cache: dict[str, CacheEntry] = {}
        self.max_size = max_size
        self.ttl = ttl
    
    def _hash_prompt(self, prompt: str) -> str:
        """Create hash of prompt."""
        
        return hashlib.sha256(prompt.encode()).hexdigest()
    
    def get(self, prompt: str) -> Optional[str]:
        """Get cached response."""
        
        key = self._hash_prompt(prompt)
        entry = self.cache.get(key)
        
        if entry and not entry.is_expired():
            entry.hit_count += 1
            return entry.response
        
        if entry:
            del self.cache[key]
        
        return None
    
    def set(self, prompt: str, response: str, metadata: dict = None):
        """Cache a response."""
        
        # Evict if at capacity
        if len(self.cache) >= self.max_size:
            self._evict()
        
        key = self._hash_prompt(prompt)
        
        self.cache[key] = CacheEntry(
            key=key,
            prompt_hash=key,
            response=response,
            embedding=[],
            created_at=time.time(),
            ttl=self.ttl,
            metadata=metadata or {}
        )
    
    def _evict(self):
        """Evict expired entries, then the least frequently hit ones."""

        # Remove expired entries first
        expired = [k for k, v in self.cache.items() if v.is_expired()]
        for key in expired:
            del self.cache[key]

        # If still over capacity, remove entries with the lowest hit counts
        if len(self.cache) >= self.max_size:
            sorted_entries = sorted(
                self.cache.items(),
                key=lambda x: x[1].hit_count
            )
            
            to_remove = len(self.cache) - self.max_size + 100
            for key, _ in sorted_entries[:to_remove]:
                del self.cache[key]

class SemanticCache:
    """Semantic similarity cache."""
    
    def __init__(
        self,
        embedding_model: Any,
        similarity_threshold: float = 0.95,
        max_size: int = 10000,
        ttl: int = 3600
    ):
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold
        self.max_size = max_size
        self.ttl = ttl
        self.entries: list[CacheEntry] = []
    
    async def get(self, prompt: str) -> Optional[str]:
        """Get semantically similar cached response."""
        
        # Get embedding for prompt
        prompt_embedding = await self.embedding_model.embed(prompt)
        
        # Find most similar entry
        best_match = None
        best_similarity = 0
        
        for entry in self.entries:
            if entry.is_expired():
                continue
            
            similarity = self._cosine_similarity(prompt_embedding, entry.embedding)
            
            if similarity > best_similarity:
                best_similarity = similarity
                best_match = entry
        
        if best_match and best_similarity >= self.similarity_threshold:
            best_match.hit_count += 1
            return best_match.response
        
        return None
    
    async def set(self, prompt: str, response: str, metadata: dict = None):
        """Cache a response with embedding."""
        
        # Evict if at capacity
        if len(self.entries) >= self.max_size:
            self._evict()
        
        # Get embedding
        embedding = await self.embedding_model.embed(prompt)
        
        entry = CacheEntry(
            key=hashlib.sha256(prompt.encode()).hexdigest(),
            prompt_hash=hashlib.sha256(prompt.encode()).hexdigest(),
            response=response,
            embedding=embedding,
            created_at=time.time(),
            ttl=self.ttl,
            metadata=metadata or {}
        )
        
        self.entries.append(entry)
    
    def _cosine_similarity(self, a: list, b: list) -> float:
        """Calculate cosine similarity (0.0 if either vector has zero norm)."""

        a = np.array(a)
        b = np.array(b)

        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 0.0

        return float(np.dot(a, b) / denom)
    
    def _evict(self):
        """Evict entries."""
        
        # Remove expired
        self.entries = [e for e in self.entries if not e.is_expired()]
        
        # Remove lowest hit count if still over capacity
        if len(self.entries) >= self.max_size:
            self.entries.sort(key=lambda e: e.hit_count)
            self.entries = self.entries[100:]

class HybridCache:
    """Hybrid exact + semantic cache."""
    
    def __init__(
        self,
        embedding_model: Any,
        exact_ttl: int = 3600,
        semantic_ttl: int = 7200,
        similarity_threshold: float = 0.95
    ):
        self.exact_cache = ExactCache(ttl=exact_ttl)
        self.semantic_cache = SemanticCache(
            embedding_model=embedding_model,
            similarity_threshold=similarity_threshold,
            ttl=semantic_ttl
        )
    
    async def get(self, prompt: str) -> tuple[Optional[str], Optional[str]]:
        """Get cached response. Returns (response, cache_type), or (None, None) on a miss."""

        # Try exact match first (faster)
        exact_result = self.exact_cache.get(prompt)
        if exact_result is not None:
            return exact_result, "exact"

        # Try semantic match
        semantic_result = await self.semantic_cache.get(prompt)
        if semantic_result is not None:
            return semantic_result, "semantic"

        return None, None
    
    async def set(self, prompt: str, response: str, metadata: dict = None):
        """Cache response in both caches."""
        
        self.exact_cache.set(prompt, response, metadata)
        await self.semantic_cache.set(prompt, response, metadata)
    
    def get_stats(self) -> dict:
        """Get cache statistics."""
        
        exact_hits = sum(e.hit_count for e in self.exact_cache.cache.values())
        semantic_hits = sum(e.hit_count for e in self.semantic_cache.entries)
        
        return {
            "exact_cache_size": len(self.exact_cache.cache),
            "semantic_cache_size": len(self.semantic_cache.entries),
            "exact_hits": exact_hits,
            "semantic_hits": semantic_hits,
            "total_hits": exact_hits + semantic_hits
        }
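
To see how the similarity threshold behaves, here is a self-contained sketch that swaps the embedding model for a toy bag-of-words vector. That substitution is an assumption made so the example runs without an embedding API; real embeddings capture meaning rather than word overlap. The lookup logic mirrors `SemanticCache.get`: find the most similar stored prompt and return its response only if the similarity clears the threshold.

```python
import math
from collections import Counter
from typing import Optional


def embed(text: str) -> Counter:
    """Toy embedding: word counts. Stands in for a real embedding model."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(count * b[word] for word, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def lookup(prompt: str, cache: dict[str, str], threshold: float) -> Optional[str]:
    """Return the response of the most similar cached prompt, if close enough."""
    query = embed(prompt)
    best_prompt, best_score = None, 0.0
    for cached_prompt in cache:
        score = cosine(query, embed(cached_prompt))
        if score > best_score:
            best_prompt, best_score = cached_prompt, score
    if best_prompt is not None and best_score >= threshold:
        return cache[best_prompt]
    return None


cache = {"what is the capital of france": "Paris"}

# The query adds one word to the cached prompt, giving a similarity of about 0.93.
near = "what is the capital city of france"
print(lookup(near, cache, threshold=0.95))  # miss at the stricter threshold
print(lookup(near, cache, threshold=0.90))  # hit at the looser threshold
```

Lowering the threshold raises the hit rate but also the risk of answering a different question with a cached response, so it is worth tuning against real traffic rather than picking a value up front.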

Model Routing

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
from enum import Enum

class TaskComplexity(Enum):
    """Task complexity levels."""
    
    SIMPLE = "simple"
    MODERATE = "moderate"
    COMPLEX = "complex"

@dataclass
class RoutingRule:
    """A routing rule."""
    
    name: str
    condition: Callable[[str], bool]
    target_model: str
    priority: int = 0

class ComplexityClassifier:
    """Classify task complexity."""
    
    def __init__(self, llm_client: Any = None):
        self.llm = llm_client
        self.simple_patterns = [
            "summarize",
            "translate",
            "extract",
            "format",
            "list",
            "define"
        ]
        self.complex_patterns = [
            "analyze",
            "compare",
            "evaluate",
            "design",
            "architect",
            "optimize",
            "debug"
        ]
    
    def classify(self, prompt: str) -> TaskComplexity:
        """Classify prompt complexity."""
        
        prompt_lower = prompt.lower()
        
        # Check for complex patterns
        for pattern in self.complex_patterns:
            if pattern in prompt_lower:
                return TaskComplexity.COMPLEX
        
        # Check for simple patterns
        for pattern in self.simple_patterns:
            if pattern in prompt_lower:
                return TaskComplexity.SIMPLE
        
        # Check prompt length
        if len(prompt) < 100:
            return TaskComplexity.SIMPLE
        elif len(prompt) > 1000:
            return TaskComplexity.COMPLEX
        
        return TaskComplexity.MODERATE
    
    async def classify_with_llm(self, prompt: str) -> TaskComplexity:
        """Use LLM to classify complexity."""
        
        if not self.llm:
            return self.classify(prompt)
        
        classification_prompt = f"""Classify the complexity of this task as SIMPLE, MODERATE, or COMPLEX.

SIMPLE: Basic tasks like summarization, translation, formatting, simple Q&A
MODERATE: Tasks requiring some reasoning or multi-step processing
COMPLEX: Tasks requiring deep analysis, creative problem-solving, or expert knowledge

Task: {prompt[:500]}

Classification (respond with just the word):"""
        
        response = await self.llm.generate(classification_prompt, max_tokens=10)
        response = response.strip().upper()
        
        if "SIMPLE" in response:
            return TaskComplexity.SIMPLE
        elif "COMPLEX" in response:
            return TaskComplexity.COMPLEX
        
        return TaskComplexity.MODERATE

class ModelRouter:
    """Route requests to appropriate models."""
    
    def __init__(self):
        self.rules: list[RoutingRule] = []
        self.classifier = ComplexityClassifier()
        self.model_map = {
            TaskComplexity.SIMPLE: "gpt-4o-mini",
            TaskComplexity.MODERATE: "gpt-3.5-turbo",
            TaskComplexity.COMPLEX: "gpt-4o"
        }
    
    def add_rule(self, rule: RoutingRule):
        """Add a routing rule."""
        
        self.rules.append(rule)
        self.rules.sort(key=lambda r: r.priority, reverse=True)
    
    def route(self, prompt: str) -> str:
        """Route prompt to model."""
        
        # Check rules first
        for rule in self.rules:
            if rule.condition(prompt):
                return rule.target_model
        
        # Fall back to complexity-based routing
        complexity = self.classifier.classify(prompt)
        return self.model_map.get(complexity, "gpt-3.5-turbo")
    
    async def route_with_classification(self, prompt: str) -> tuple[str, TaskComplexity]:
        """Route with detailed classification."""
        
        # Check rules first
        for rule in self.rules:
            if rule.condition(prompt):
                return rule.target_model, TaskComplexity.MODERATE
        
        # Use LLM classification for better accuracy
        complexity = await self.classifier.classify_with_llm(prompt)
        model = self.model_map.get(complexity, "gpt-3.5-turbo")
        
        return model, complexity

class CascadeRouter:
    """Cascade through models, starting cheap."""
    
    def __init__(
        self,
        models: list[str],
        quality_checker: Callable[[str, str], float]
    ):
        self.models = models  # Ordered from cheapest to most expensive
        self.quality_checker = quality_checker
        self.quality_threshold = 0.8
    
    async def route(
        self,
        prompt: str,
        llm_clients: dict[str, Any]
    ) -> tuple[str, str]:
        """Try models in order until quality threshold met."""
        
        last_response, last_model = None, None

        for model in self.models:
            client = llm_clients.get(model)
            if not client:
                continue

            response = await client.generate(prompt)
            last_response, last_model = response, model

            quality = self.quality_checker(prompt, response)
            if quality >= self.quality_threshold:
                return response, model

        if last_model is None:
            raise ValueError("No client available for any configured model")

        # No model met the threshold: return the answer from the last
        # (most expensive) model tried instead of paying for it a second time
        return last_response, last_model

class AdaptiveRouter:
    """Learn optimal routing from feedback."""
    
    def __init__(self):
        self.routing_history: list[dict] = []
        self.model_performance: dict[str, dict] = {}
    
    def record_result(
        self,
        prompt: str,
        model: str,
        quality_score: float,
        cost: float,
        latency_ms: float
    ):
        """Record routing result."""
        
        self.routing_history.append({
            "prompt_length": len(prompt),
            "model": model,
            "quality": quality_score,
            "cost": cost,
            "latency": latency_ms
        })
        
        # Update model performance stats
        if model not in self.model_performance:
            self.model_performance[model] = {
                "total_quality": 0,
                "total_cost": 0,
                "count": 0
            }
        
        stats = self.model_performance[model]
        stats["total_quality"] += quality_score
        stats["total_cost"] += cost
        stats["count"] += 1
    
    def get_optimal_model(
        self,
        prompt: str,
        quality_weight: float = 0.5,
        cost_weight: float = 0.5
    ) -> str:
        """Get optimal model based on history."""
        
        if not self.model_performance:
            return "gpt-3.5-turbo"
        
        best_model = None
        best_score = -1

        # Highest average cost across models, computed once and used
        # to normalize each model's cost below
        max_cost = max(
            (s["total_cost"] / s["count"]
             for s in self.model_performance.values() if s["count"] > 0),
            default=0
        )

        for model, stats in self.model_performance.items():
            if stats["count"] == 0:
                continue

            avg_quality = stats["total_quality"] / stats["count"]
            avg_cost = stats["total_cost"] / stats["count"]

            # Normalize cost (lower is better)
            normalized_cost = 1 - (avg_cost / max_cost) if max_cost > 0 else 1
            
            score = quality_weight * avg_quality + cost_weight * normalized_cost
            
            if score > best_score:
                best_score = score
                best_model = model
        
        return best_model or "gpt-3.5-turbo"
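
Whether cascading saves money depends on how often the cheap model's answer is accepted, because every rejected attempt is still billed. A rough way to reason about it, assuming each model has a fixed per-request cost and a known acceptance rate (both hypothetical numbers below) and that the last model's answer is always returned:

```python
def expected_cascade_cost(costs: list[float], accept_rates: list[float]) -> float:
    """Expected cost per request when trying models cheapest-first.

    costs[i] is the per-request cost of model i; accept_rates[i] is the
    probability its answer passes the quality check. A model is only
    called if every earlier model was rejected.
    """
    expected = 0.0
    reach_probability = 1.0  # chance that we get as far as this model
    for cost, accept_rate in zip(costs, accept_rates):
        expected += reach_probability * cost
        reach_probability *= 1.0 - accept_rate
    return expected


# Hypothetical numbers: a cheap model at $0.0002/request accepted 70% of
# the time, backed by a premium model at $0.011/request.
cascade = expected_cascade_cost([0.0002, 0.011], [0.7, 1.0])
premium_only = 0.011

print(f"cascade:      ${cascade:.4f} per request")
print(f"premium only: ${premium_only:.4f} per request")
```

With these numbers the cascade averages about $0.0035 per request against $0.011 for the premium model alone. If the cheap model were rarely accepted, the cascade would cost more than calling the premium model directly, so the acceptance rate is worth measuring before adopting this pattern.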

Prompt Compression

from dataclasses import dataclass
from typing import Any, Optional, List
import re

@dataclass
class CompressionResult:
    """Result of prompt compression."""
    
    original_tokens: int
    compressed_tokens: int
    compression_ratio: float
    compressed_text: str

class TokenCounter:
    """Count tokens in text."""
    
    def __init__(self, model: str = "gpt-3.5-turbo"):
        self.model = model
    
    def count(self, text: str) -> int:
        """Count tokens."""
        
        try:
            import tiktoken
            encoding = tiktoken.encoding_for_model(self.model)
            return len(encoding.encode(text))
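        except (ImportError, KeyError):
            # tiktoken is not installed or does not recognize this model:
            # fall back to a rough estimate of about 4 characters per token
            return len(text) // 4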

class WhitespaceCompressor:
    """Compress whitespace in prompts."""
    
    def compress(self, text: str) -> str:
        """Remove excess whitespace."""
        
        # Replace multiple spaces with single space
        text = re.sub(r' +', ' ', text)

        # Strip each line first so whitespace-only lines become empty
        lines = [line.strip() for line in text.split('\n')]
        text = '\n'.join(lines)

        # Collapse runs of newlines (this also removes blank lines between paragraphs)
        text = re.sub(r'\n+', '\n', text)

        return text.strip()

class StopwordRemover:
    """Remove stopwords from prompts."""
    
    def __init__(self):
        self.stopwords = {
            "the", "a", "an", "is", "are", "was", "were", "be", "been",
            "being", "have", "has", "had", "do", "does", "did", "will",
            "would", "could", "should", "may", "might", "must", "shall",
            "can", "need", "dare", "ought", "used", "to", "of", "in",
            "for", "on", "with", "at", "by", "from", "as", "into",
            "through", "during", "before", "after", "above", "below",
            "between", "under", "again", "further", "then", "once"
        }
    
    def compress(self, text: str, preserve_structure: bool = True) -> str:
        """Remove stopwords. Dropping them can change meaning, so use with care."""

        if preserve_structure:
            # Structure-preserving removal is not implemented here;
            # the text is returned unchanged
            return text
        
        words = text.split()
        filtered = [w for w in words if w.lower() not in self.stopwords]
        
        return ' '.join(filtered)

class ContextTruncator:
    """Truncate context to fit token limits."""
    
    def __init__(self, token_counter: TokenCounter):
        self.counter = token_counter
    
    def truncate(
        self,
        text: str,
        max_tokens: int,
        strategy: str = "end"
    ) -> str:
        """Truncate text to max tokens."""
        
        current_tokens = self.counter.count(text)
        
        if current_tokens <= max_tokens:
            return text
        
        if strategy == "end":
            return self._truncate_end(text, max_tokens)
        elif strategy == "start":
            return self._truncate_start(text, max_tokens)
        elif strategy == "middle":
            return self._truncate_middle(text, max_tokens)
        
        return text
    
    def _truncate_end(self, text: str, max_tokens: int) -> str:
        """Truncate from end."""
        
        words = text.split()
        
        while self.counter.count(' '.join(words)) > max_tokens and words:
            words = words[:-1]
        
        return ' '.join(words) + "..."
    
    def _truncate_start(self, text: str, max_tokens: int) -> str:
        """Truncate from start."""
        
        words = text.split()
        
        while self.counter.count(' '.join(words)) > max_tokens and words:
            words = words[1:]
        
        return "..." + ' '.join(words)
    
    def _truncate_middle(self, text: str, max_tokens: int) -> str:
        """Keep start and end, remove middle."""
        
        words = text.split()
        half_tokens = max_tokens // 2
        
        start_words = []
        end_words = []
        
        # Build start
        for word in words:
            if self.counter.count(' '.join(start_words + [word])) <= half_tokens:
                start_words.append(word)
            else:
                break
        
        # Build end
        for word in reversed(words):
            if self.counter.count(' '.join([word] + end_words)) <= half_tokens:
                end_words.insert(0, word)
            else:
                break
        
        return ' '.join(start_words) + " ... " + ' '.join(end_words)

class LLMCompressor:
    """Use LLM to compress prompts."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def compress(
        self,
        text: str,
        target_ratio: float = 0.5
    ) -> str:
        """Compress text using LLM summarization."""
        
        prompt = f"""Compress the following text to approximately {int(target_ratio * 100)}% of its original length.
Preserve all key information, facts, and meaning.
Remove redundancy and verbose language.

Text to compress:
{text}

Compressed version:"""
        
        return await self.llm.generate(prompt)

class PromptCompressor:
    """Comprehensive prompt compression."""
    
    def __init__(self, llm_client: Any = None):
        self.token_counter = TokenCounter()
        self.whitespace = WhitespaceCompressor()
        self.truncator = ContextTruncator(self.token_counter)
        self.llm_compressor = LLMCompressor(llm_client) if llm_client else None
    
    async def compress(
        self,
        prompt: str,
        max_tokens: int = None,
        target_ratio: float = None
    ) -> CompressionResult:
        """Compress prompt."""
        
        original_tokens = self.token_counter.count(prompt)
        compressed = prompt
        
        # Step 1: Whitespace compression (always)
        compressed = self.whitespace.compress(compressed)
        
        # Step 2: Check if we need more compression
        current_tokens = self.token_counter.count(compressed)
        
        if max_tokens and current_tokens > max_tokens:
            # Use truncation
            compressed = self.truncator.truncate(compressed, max_tokens)
        
        elif target_ratio and self.llm_compressor:
            # Use LLM compression
            compressed = await self.llm_compressor.compress(compressed, target_ratio)
        
        final_tokens = self.token_counter.count(compressed)
        
        return CompressionResult(
            original_tokens=original_tokens,
            compressed_tokens=final_tokens,
            compression_ratio=final_tokens / original_tokens if original_tokens > 0 else 1,
            compressed_text=compressed
        )
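
LLM-based compression is itself a billed call, so it only pays off when the tokens it removes are more expensive than the tokens it spends. A back-of-the-envelope check, assuming the compressor reads the full prompt and writes `ratio` times as many tokens, and using per-1K prices from the catalog earlier in this guide:

```python
def compression_net_savings(
    prompt_tokens: int,
    ratio: float,
    compressor_input_per_1k: float,
    compressor_output_per_1k: float,
    target_input_per_1k: float,
) -> float:
    """Net savings (positive is good) from compressing a prompt with an LLM."""
    compressed_tokens = prompt_tokens * ratio
    # Cost of the compression call itself.
    compression_cost = (
        (prompt_tokens / 1000) * compressor_input_per_1k
        + (compressed_tokens / 1000) * compressor_output_per_1k
    )
    # Input tokens the target model no longer has to read.
    tokens_saved = prompt_tokens - compressed_tokens
    input_savings = (tokens_saved / 1000) * target_input_per_1k
    return input_savings - compression_cost


# Compress a 4,000-token prompt to 50% using gpt-4o-mini prices for the compressor.
to_premium = compression_net_savings(4000, 0.5, 0.00015, 0.0006, 0.01)
to_economy = compression_net_savings(4000, 0.5, 0.00015, 0.0006, 0.00015)

print(f"target gpt-4-turbo: {to_premium:+.4f} USD")
print(f"target gpt-4o-mini: {to_economy:+.4f} USD")
```

Under these assumptions, compressing before a premium call saves money, while compressing before an economy call loses money on every request. Whitespace cleanup and truncation have no such cost, which is why `PromptCompressor` applies them first.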

Production Cost Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict, Any

app = FastAPI()

class GenerateRequest(BaseModel):
    prompt: str
    model: Optional[str] = None
    max_tokens: int = 1000
    use_cache: bool = True
    compress: bool = True

class CostReportRequest(BaseModel):
    start_date: Optional[str] = None
    end_date: Optional[str] = None
    group_by: str = "model"

# Initialize components
cost_tracker = CostTracker()
router = ModelRouter()
compressor = PromptCompressor()
cache = ExactCache()

@app.post("/v1/generate")
async def generate(request: GenerateRequest) -> dict:
    """Generate with cost optimization."""
    
    prompt = request.prompt
    
    # Step 1: Check cache
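    if request.use_cache:
        cached = cache.get(prompt)
        if cached is not None:
            # Record the tokens this call would have used (rough estimate)
            # so get_cache_savings() can price the request that was avoided
            cost_tracker.record_usage(
                request_id="cached",
                model=request.model or router.route(prompt),
                input_tokens=len(prompt) // 4,
                output_tokens=len(cached) // 4,
                cached=True
            )
            return {
                "response": cached,
                "cached": True,
                "cost": 0
            }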
    
    # Step 2: Compress prompt
    if request.compress:
        result = await compressor.compress(prompt)
        prompt = result.compressed_text
    
    # Step 3: Route to model
    model = request.model or router.route(prompt)
    
    # Step 4: Generate (mock - would call actual LLM)
    response = f"Generated response for: {prompt[:50]}..."
    input_tokens = len(prompt) // 4
    output_tokens = len(response) // 4
    
    # Step 5: Record usage
    record = cost_tracker.record_usage(
        request_id="req_123",
        model=model,
        input_tokens=input_tokens,
        output_tokens=output_tokens
    )
    
    # Step 6: Cache response
    if request.use_cache:
        cache.set(request.prompt, response)
    
    return {
        "response": response,
        "model": model,
        "cached": False,
        "cost": record.cost,
        "tokens": {
            "input": input_tokens,
            "output": output_tokens
        }
    }

@app.get("/v1/costs/daily")
async def get_daily_costs(days: int = 7) -> list[dict]:
    """Get daily costs."""
    
    from datetime import datetime, timedelta
    
    results = []
    
    for i in range(days):
        date = datetime.now() - timedelta(days=i)
        cost = cost_tracker.get_daily_cost(date)
        results.append({
            "date": date.strftime("%Y-%m-%d"),
            "cost": cost
        })
    
    return results

@app.get("/v1/costs/by-model")
async def get_costs_by_model() -> dict:
    """Get costs by model."""
    
    return cost_tracker.get_cost_by_model()

@app.get("/v1/costs/report")
async def get_cost_report() -> dict:
    """Get optimization report."""
    
    return cost_tracker.get_optimization_report()

@app.get("/v1/cache/stats")
async def get_cache_stats() -> dict:
    """Get cache statistics."""
    
    return {
        "size": len(cache.cache),
        "savings": cost_tracker.get_cache_savings()
    }

@app.post("/v1/estimate")
async def estimate_cost(prompt: str, model: str = "gpt-3.5-turbo") -> dict:
    """Estimate cost for a prompt."""
    
    pricing = PRICING_CATALOG.get(model)
    
    if not pricing:
        raise HTTPException(status_code=404, detail="Model not found")
    
    token_counter = TokenCounter()
    input_tokens = token_counter.count(prompt)
    
    # Estimate output tokens (rough)
    estimated_output = min(input_tokens * 2, 1000)
    
    cost = pricing.calculate_cost(input_tokens, estimated_output)
    
    return {
        "model": model,
        "input_tokens": input_tokens,
        "estimated_output_tokens": estimated_output,
        "estimated_cost": cost
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}
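
Batching, listed in the introduction, is not implemented in the service above. One simple form is to send the shared instructions once for several items instead of once per item. The sketch below is an assumption about how that might look, not part of the service; the prompt layout and the four-characters-per-token estimate are illustrative.

```python
def estimate_tokens(text: str) -> int:
    """Rough token estimate: about four characters per token."""
    return len(text) // 4


def build_batched_prompt(instructions: str, items: list[str]) -> str:
    """Combine shared instructions with a numbered list of items."""
    numbered = "\n".join(f"{i}. {item}" for i, item in enumerate(items, start=1))
    return f"{instructions}\n\nItems:\n{numbered}\n\nAnswer each item by number."


instructions = "Classify the sentiment of each review as positive or negative. " * 5
reviews = [f"Review text number {n}, which was fine overall." for n in range(10)]

# One call per item repeats the instructions every time.
separate = sum(estimate_tokens(f"{instructions}\n\n{r}") for r in reviews)
# One batched call sends the instructions once.
batched = estimate_tokens(build_batched_prompt(instructions, reviews))

print(f"separate calls: ~{separate} input tokens")
print(f"batched call:   ~{batched} input tokens")
```

The saving grows with the length of the shared instructions. Larger batches also produce longer responses that need to be split back into per-item answers, so batch size is a tradeoff rather than something to maximize.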

Conclusion

LLM cost optimization is about applying the right technique at the right time. Start with measurement, because you can't optimize what you don't track: implement cost tracking from day one and monitor spending by model, feature, and user segment. Semantic caching typically provides the highest ROI. Many applications see significant query repetition, and a 30% cache hit rate removes roughly 30% of LLM calls, and a similar share of cost once embedding and storage overhead is subtracted, with minimal quality impact. Model routing is the next lever: use GPT-4 for complex reasoning tasks but route simple queries to GPT-3.5-turbo or even smaller models. The key is building a quality feedback loop so you can verify that cheaper models meet your quality bar.

Prompt compression reduces token counts without changing model selection: remove whitespace, truncate context intelligently, or use LLM-based summarization for long inputs. Batching amortizes API overhead and can reduce costs for high-volume applications. The most effective approach combines all these techniques: cache first, compress what remains, route to the cheapest model that meets quality requirements, and batch where possible. Monitor continuously and adjust thresholds based on quality metrics. With systematic optimization, a 50-80% cost reduction is achievable for many LLM applications.



About the Author

I am a Cloud Architect and Developer passionate about solving complex problems with modern technology. My blog explores the intersection of Cloud Architecture, Artificial Intelligence, and Software Engineering. I share tutorials, deep dives, and insights into building scalable, intelligent systems.

Areas of Expertise

Cloud Architecture (Azure, AWS)
Artificial Intelligence & LLMs
DevOps & Kubernetes
Backend Dev (C#, .NET, Python, Node.js)
© 2025 Code, Cloud & Context | Built by Nithin Mohan TK | Powered by Passion