Introduction
LLM costs can spiral quickly in production systems. A single GPT-4 call might cost pennies, but multiply that by millions of requests and you're looking at substantial monthly bills. The good news is that most LLM applications have significant optimization opportunities: a 50-80% cost reduction is often achievable without sacrificing quality. The key strategies are semantic caching (avoid redundant calls), model routing (use cheaper models when possible), prompt compression (reduce token counts), and batching (amortize overhead). Each technique has tradeoffs: caching needs storage and still pays full price on misses, routing requires quality monitoring, compression can affect output quality, and batching adds latency. This guide covers practical implementations of these cost optimization strategies with production-ready code, helping you build LLM applications that are both powerful and economically sustainable.
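To make the scale concrete, here is a quick back-of-envelope estimate using the GPT-4-turbo list prices from the pricing catalog below; the per-request token counts are illustrative assumptions, not measurements.

# Rough monthly bill for 1M requests/day at gpt-4-turbo list prices ($0.01 / $0.03 per 1K tokens).
# The 500 input / 300 output tokens per request are assumed averages, purely for illustration.
requests_per_day = 1_000_000
input_tokens, output_tokens = 500, 300
cost_per_request = (input_tokens / 1000) * 0.01 + (output_tokens / 1000) * 0.03  # $0.014
print(f"Daily cost:   ${requests_per_day * cost_per_request:,.0f}")       # ~$14,000
print(f"Monthly cost: ${requests_per_day * cost_per_request * 30:,.0f}")  # ~$420,000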

Cost Tracking and Analysis
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from datetime import datetime, timedelta
from enum import Enum
import json
class ModelTier(Enum):
"""Model pricing tiers."""
PREMIUM = "premium" # GPT-4, Claude 3 Opus
STANDARD = "standard" # GPT-3.5, Claude 3 Sonnet
ECONOMY = "economy" # GPT-3.5-turbo, Claude 3 Haiku
@dataclass
class ModelPricing:
"""Pricing for a model."""
model_name: str
tier: ModelTier
input_cost_per_1k: float
output_cost_per_1k: float
def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""Calculate cost for token usage."""
input_cost = (input_tokens / 1000) * self.input_cost_per_1k
output_cost = (output_tokens / 1000) * self.output_cost_per_1k
return input_cost + output_cost
# Model pricing catalog
PRICING_CATALOG = {
"gpt-4-turbo": ModelPricing("gpt-4-turbo", ModelTier.PREMIUM, 0.01, 0.03),
"gpt-4o": ModelPricing("gpt-4o", ModelTier.PREMIUM, 0.005, 0.015),
"gpt-4o-mini": ModelPricing("gpt-4o-mini", ModelTier.ECONOMY, 0.00015, 0.0006),
"gpt-3.5-turbo": ModelPricing("gpt-3.5-turbo", ModelTier.STANDARD, 0.0005, 0.0015),
"claude-3-opus": ModelPricing("claude-3-opus", ModelTier.PREMIUM, 0.015, 0.075),
"claude-3-sonnet": ModelPricing("claude-3-sonnet", ModelTier.STANDARD, 0.003, 0.015),
"claude-3-haiku": ModelPricing("claude-3-haiku", ModelTier.ECONOMY, 0.00025, 0.00125),
}
@dataclass
class UsageRecord:
"""Record of LLM usage."""
request_id: str
model: str
input_tokens: int
output_tokens: int
cost: float
timestamp: datetime
cached: bool = False
metadata: dict = field(default_factory=dict)
class CostTracker:
"""Track LLM costs."""
def __init__(self):
self.records: list[UsageRecord] = []
self.pricing = PRICING_CATALOG
def record_usage(
self,
request_id: str,
model: str,
input_tokens: int,
output_tokens: int,
cached: bool = False,
metadata: dict = None
) -> UsageRecord:
"""Record a usage event."""
pricing = self.pricing.get(model)
if pricing:
cost = 0 if cached else pricing.calculate_cost(input_tokens, output_tokens)
else:
cost = 0
record = UsageRecord(
request_id=request_id,
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost=cost,
timestamp=datetime.now(),
cached=cached,
metadata=metadata or {}
)
self.records.append(record)
return record
def get_daily_cost(self, date: datetime = None) -> float:
"""Get total cost for a day."""
date = date or datetime.now()
start = date.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1)
return sum(
r.cost for r in self.records
if start <= r.timestamp < end
)
def get_monthly_cost(self, year: int = None, month: int = None) -> float:
"""Get total cost for a month."""
now = datetime.now()
year = year or now.year
month = month or now.month
return sum(
r.cost for r in self.records
if r.timestamp.year == year and r.timestamp.month == month
)
def get_cost_by_model(self) -> dict[str, float]:
"""Get costs broken down by model."""
costs = {}
for record in self.records:
if record.model not in costs:
costs[record.model] = 0
costs[record.model] += record.cost
return costs
def get_cache_savings(self) -> dict:
"""Calculate savings from caching."""
cached_records = [r for r in self.records if r.cached]
if not cached_records:
return {"savings": 0, "cache_hits": 0}
# Calculate what it would have cost without cache
potential_cost = 0
for record in cached_records:
pricing = self.pricing.get(record.model)
if pricing:
potential_cost += pricing.calculate_cost(
record.input_tokens,
record.output_tokens
)
return {
"savings": potential_cost,
"cache_hits": len(cached_records),
"cache_hit_rate": len(cached_records) / len(self.records) if self.records else 0
}
def get_optimization_report(self) -> dict:
"""Generate optimization report."""
total_cost = sum(r.cost for r in self.records)
total_tokens = sum(r.input_tokens + r.output_tokens for r in self.records)
by_model = self.get_cost_by_model()
cache_stats = self.get_cache_savings()
# Find optimization opportunities
opportunities = []
# Check for premium model overuse
premium_cost = sum(
r.cost for r in self.records
if self.pricing.get(r.model, ModelPricing("", ModelTier.STANDARD, 0, 0)).tier == ModelTier.PREMIUM
)
if premium_cost > total_cost * 0.5:
opportunities.append({
"type": "model_routing",
"description": "Over 50% of costs from premium models",
"potential_savings": premium_cost * 0.3
})
# Check cache hit rate
if cache_stats["cache_hit_rate"] < 0.2:
opportunities.append({
"type": "caching",
"description": "Low cache hit rate",
"potential_savings": total_cost * 0.2
})
return {
"total_cost": total_cost,
"total_tokens": total_tokens,
"cost_by_model": by_model,
"cache_stats": cache_stats,
"opportunities": opportunities
}
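A short usage sketch of the tracker above; the request IDs, models, and token counts are made up for illustration.

# Record a few calls (one cached) and inspect the breakdown and optimization report.
tracker = CostTracker()
tracker.record_usage("req-001", "gpt-4o", input_tokens=1200, output_tokens=400)
tracker.record_usage("req-002", "gpt-4o", input_tokens=1200, output_tokens=400, cached=True)
tracker.record_usage("req-003", "gpt-4o-mini", input_tokens=800, output_tokens=200)

print(f"Today's spend: ${tracker.get_daily_cost():.5f}")
print(tracker.get_cost_by_model())                          # gpt-4o accounts for nearly all of the spend
print(tracker.get_optimization_report()["opportunities"])   # flags premium-model overuse in this sample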
Semantic Caching
from dataclasses import dataclass, field
from typing import Any, Optional, List
import hashlib
import time
import numpy as np
@dataclass
class CacheEntry:
"""A cache entry."""
key: str
prompt_hash: str
response: str
embedding: list[float]
created_at: float
ttl: int
hit_count: int = 0
metadata: dict = field(default_factory=dict)
def is_expired(self) -> bool:
return time.time() > self.created_at + self.ttl
class ExactCache:
"""Exact match cache."""
def __init__(self, max_size: int = 10000, ttl: int = 3600):
self.cache: dict[str, CacheEntry] = {}
self.max_size = max_size
self.ttl = ttl
def _hash_prompt(self, prompt: str) -> str:
"""Create hash of prompt."""
return hashlib.sha256(prompt.encode()).hexdigest()
def get(self, prompt: str) -> Optional[str]:
"""Get cached response."""
key = self._hash_prompt(prompt)
entry = self.cache.get(key)
if entry and not entry.is_expired():
entry.hit_count += 1
return entry.response
if entry:
del self.cache[key]
return None
def set(self, prompt: str, response: str, metadata: dict = None):
"""Cache a response."""
# Evict if at capacity
if len(self.cache) >= self.max_size:
self._evict()
key = self._hash_prompt(prompt)
self.cache[key] = CacheEntry(
key=key,
prompt_hash=key,
response=response,
embedding=[],
created_at=time.time(),
ttl=self.ttl,
metadata=metadata or {}
)
def _evict(self):
"""Evict least recently used entries."""
# Remove expired entries first
expired = [k for k, v in self.cache.items() if v.is_expired()]
for key in expired:
del self.cache[key]
# If still over capacity, drop the entries with the lowest hit counts
if len(self.cache) >= self.max_size:
sorted_entries = sorted(
self.cache.items(),
key=lambda x: x[1].hit_count
)
to_remove = len(self.cache) - self.max_size + 100
for key, _ in sorted_entries[:to_remove]:
del self.cache[key]
class SemanticCache:
"""Semantic similarity cache."""
def __init__(
self,
embedding_model: Any,
similarity_threshold: float = 0.95,
max_size: int = 10000,
ttl: int = 3600
):
self.embedding_model = embedding_model
self.similarity_threshold = similarity_threshold
self.max_size = max_size
self.ttl = ttl
self.entries: list[CacheEntry] = []
async def get(self, prompt: str) -> Optional[str]:
"""Get semantically similar cached response."""
# Get embedding for prompt
prompt_embedding = await self.embedding_model.embed(prompt)
# Find most similar entry
best_match = None
best_similarity = 0
for entry in self.entries:
if entry.is_expired():
continue
similarity = self._cosine_similarity(prompt_embedding, entry.embedding)
if similarity > best_similarity:
best_similarity = similarity
best_match = entry
if best_match and best_similarity >= self.similarity_threshold:
best_match.hit_count += 1
return best_match.response
return None
async def set(self, prompt: str, response: str, metadata: dict = None):
"""Cache a response with embedding."""
# Evict if at capacity
if len(self.entries) >= self.max_size:
self._evict()
# Get embedding
embedding = await self.embedding_model.embed(prompt)
entry = CacheEntry(
key=hashlib.sha256(prompt.encode()).hexdigest(),
prompt_hash=hashlib.sha256(prompt.encode()).hexdigest(),
response=response,
embedding=embedding,
created_at=time.time(),
ttl=self.ttl,
metadata=metadata or {}
)
self.entries.append(entry)
def _cosine_similarity(self, a: list, b: list) -> float:
"""Calculate cosine similarity."""
a = np.array(a)
b = np.array(b)
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def _evict(self):
"""Evict entries."""
# Remove expired
self.entries = [e for e in self.entries if not e.is_expired()]
# Remove lowest hit count if still over capacity
if len(self.entries) >= self.max_size:
self.entries.sort(key=lambda e: e.hit_count)
self.entries = self.entries[100:]
class HybridCache:
"""Hybrid exact + semantic cache."""
def __init__(
self,
embedding_model: Any,
exact_ttl: int = 3600,
semantic_ttl: int = 7200,
similarity_threshold: float = 0.95
):
self.exact_cache = ExactCache(ttl=exact_ttl)
self.semantic_cache = SemanticCache(
embedding_model=embedding_model,
similarity_threshold=similarity_threshold,
ttl=semantic_ttl
)
async def get(self, prompt: str) -> tuple[Optional[str], Optional[str]]:
"""Get cached response. Returns (response, cache_type)."""
# Try exact match first (faster)
exact_result = self.exact_cache.get(prompt)
if exact_result:
return exact_result, "exact"
# Try semantic match
semantic_result = await self.semantic_cache.get(prompt)
if semantic_result:
return semantic_result, "semantic"
return None, None
async def set(self, prompt: str, response: str, metadata: dict = None):
"""Cache response in both caches."""
self.exact_cache.set(prompt, response, metadata)
await self.semantic_cache.set(prompt, response, metadata)
def get_stats(self) -> dict:
"""Get cache statistics."""
exact_hits = sum(e.hit_count for e in self.exact_cache.cache.values())
semantic_hits = sum(e.hit_count for e in self.semantic_cache.entries)
return {
"exact_cache_size": len(self.exact_cache.cache),
"semantic_cache_size": len(self.semantic_cache.entries),
"exact_hits": exact_hits,
"semantic_hits": semantic_hits,
"total_hits": exact_hits + semantic_hits
}
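A minimal usage sketch for the hybrid cache. The embedding model below is a toy stand-in (character-frequency vectors) so the example is self-contained; in practice you would plug in a real async embedding client.

import asyncio

class StubEmbeddingModel:
    """Toy embedder using character frequencies; a stand-in for a real embedding API."""
    async def embed(self, text: str) -> list[float]:
        vec = [0.0] * 26
        for ch in text.lower():
            if ch.isascii() and ch.isalpha():
                vec[ord(ch) - ord("a")] += 1.0
        return vec

async def demo():
    cache = HybridCache(embedding_model=StubEmbeddingModel(), similarity_threshold=0.95)
    await cache.set("What is the capital of France?", "Paris is the capital of France.")

    print(await cache.get("What is the capital of France?"))     # exact hit
    print(await cache.get("what is  the capital of France ??"))  # near-duplicate: semantic hit
    print(cache.get_stats())

asyncio.run(demo())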
Model Routing
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
from enum import Enum
class TaskComplexity(Enum):
"""Task complexity levels."""
SIMPLE = "simple"
MODERATE = "moderate"
COMPLEX = "complex"
@dataclass
class RoutingRule:
"""A routing rule."""
name: str
condition: Callable[[str], bool]
target_model: str
priority: int = 0
class ComplexityClassifier:
"""Classify task complexity."""
def __init__(self, llm_client: Any = None):
self.llm = llm_client
self.simple_patterns = [
"summarize",
"translate",
"extract",
"format",
"list",
"define"
]
self.complex_patterns = [
"analyze",
"compare",
"evaluate",
"design",
"architect",
"optimize",
"debug"
]
def classify(self, prompt: str) -> TaskComplexity:
"""Classify prompt complexity."""
prompt_lower = prompt.lower()
# Check for complex patterns
for pattern in self.complex_patterns:
if pattern in prompt_lower:
return TaskComplexity.COMPLEX
# Check for simple patterns
for pattern in self.simple_patterns:
if pattern in prompt_lower:
return TaskComplexity.SIMPLE
# Check prompt length
if len(prompt) < 100:
return TaskComplexity.SIMPLE
elif len(prompt) > 1000:
return TaskComplexity.COMPLEX
return TaskComplexity.MODERATE
async def classify_with_llm(self, prompt: str) -> TaskComplexity:
"""Use LLM to classify complexity."""
if not self.llm:
return self.classify(prompt)
classification_prompt = f"""Classify the complexity of this task as SIMPLE, MODERATE, or COMPLEX.
SIMPLE: Basic tasks like summarization, translation, formatting, simple Q&A
MODERATE: Tasks requiring some reasoning or multi-step processing
COMPLEX: Tasks requiring deep analysis, creative problem-solving, or expert knowledge
Task: {prompt[:500]}
Classification (respond with just the word):"""
response = await self.llm.generate(classification_prompt, max_tokens=10)
response = response.strip().upper()
if "SIMPLE" in response:
return TaskComplexity.SIMPLE
elif "COMPLEX" in response:
return TaskComplexity.COMPLEX
return TaskComplexity.MODERATE
class ModelRouter:
"""Route requests to appropriate models."""
def __init__(self):
self.rules: list[RoutingRule] = []
self.classifier = ComplexityClassifier()
self.model_map = {
TaskComplexity.SIMPLE: "gpt-4o-mini",
TaskComplexity.MODERATE: "gpt-3.5-turbo",
TaskComplexity.COMPLEX: "gpt-4o"
}
def add_rule(self, rule: RoutingRule):
"""Add a routing rule."""
self.rules.append(rule)
self.rules.sort(key=lambda r: r.priority, reverse=True)
def route(self, prompt: str) -> str:
"""Route prompt to model."""
# Check rules first
for rule in self.rules:
if rule.condition(prompt):
return rule.target_model
# Fall back to complexity-based routing
complexity = self.classifier.classify(prompt)
return self.model_map.get(complexity, "gpt-3.5-turbo")
async def route_with_classification(self, prompt: str) -> tuple[str, TaskComplexity]:
"""Route with detailed classification."""
# Check rules first
for rule in self.rules:
if rule.condition(prompt):
return rule.target_model, TaskComplexity.MODERATE
# Use LLM classification for better accuracy
complexity = await self.classifier.classify_with_llm(prompt)
model = self.model_map.get(complexity, "gpt-3.5-turbo")
return model, complexity
class CascadeRouter:
"""Cascade through models, starting cheap."""
def __init__(
self,
models: list[str],
quality_checker: Callable[[str, str], float]
):
self.models = models # Ordered from cheapest to most expensive
self.quality_checker = quality_checker
self.quality_threshold = 0.8
async def route(
self,
prompt: str,
llm_clients: dict[str, Any]
) -> tuple[str, str]:
"""Try models in order until quality threshold met."""
for model in self.models:
client = llm_clients.get(model)
if not client:
continue
response = await client.generate(prompt)
quality = self.quality_checker(prompt, response)
if quality >= self.quality_threshold:
return response, model
# Fall back to most expensive model
final_model = self.models[-1]
client = llm_clients.get(final_model)
response = await client.generate(prompt)
return response, final_model
class AdaptiveRouter:
"""Learn optimal routing from feedback."""
def __init__(self):
self.routing_history: list[dict] = []
self.model_performance: dict[str, dict] = {}
def record_result(
self,
prompt: str,
model: str,
quality_score: float,
cost: float,
latency_ms: float
):
"""Record routing result."""
self.routing_history.append({
"prompt_length": len(prompt),
"model": model,
"quality": quality_score,
"cost": cost,
"latency": latency_ms
})
# Update model performance stats
if model not in self.model_performance:
self.model_performance[model] = {
"total_quality": 0,
"total_cost": 0,
"count": 0
}
stats = self.model_performance[model]
stats["total_quality"] += quality_score
stats["total_cost"] += cost
stats["count"] += 1
def get_optimal_model(
self,
prompt: str,
quality_weight: float = 0.5,
cost_weight: float = 0.5
) -> str:
"""Get optimal model based on history."""
if not self.model_performance:
return "gpt-3.5-turbo"
best_model = None
best_score = -1
for model, stats in self.model_performance.items():
if stats["count"] == 0:
continue
avg_quality = stats["total_quality"] / stats["count"]
avg_cost = stats["total_cost"] / stats["count"]
# Normalize cost (lower is better)
max_cost = max(s["total_cost"] / s["count"] for s in self.model_performance.values() if s["count"] > 0)
normalized_cost = 1 - (avg_cost / max_cost) if max_cost > 0 else 1
score = quality_weight * avg_quality + cost_weight * normalized_cost
if score > best_score:
best_score = score
best_model = model
return best_model or "gpt-3.5-turbo"
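A usage sketch for the rule-based router; the code-generation rule and the example prompts are assumptions added for illustration.

# One explicit rule plus the built-in complexity heuristic as a fallback.
router = ModelRouter()
router.add_rule(RoutingRule(
    name="code_generation",
    condition=lambda p: "write a function" in p.lower() or "def " in p,
    target_model="gpt-4o",
    priority=10,
))

print(router.route("Translate this sentence to German: Hello, world"))         # gpt-4o-mini (simple)
print(router.route("Write a function that parses RFC 3339 timestamps"))        # gpt-4o (rule match)
print(router.route("Analyze the tradeoffs between these two schema designs"))  # gpt-4o (complex)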
Prompt Compression
from dataclasses import dataclass
from typing import Any, Optional, List
import re
@dataclass
class CompressionResult:
"""Result of prompt compression."""
original_tokens: int
compressed_tokens: int
compression_ratio: float
compressed_text: str
class TokenCounter:
"""Count tokens in text."""
def __init__(self, model: str = "gpt-3.5-turbo"):
self.model = model
def count(self, text: str) -> int:
"""Count tokens."""
try:
import tiktoken
encoding = tiktoken.encoding_for_model(self.model)
return len(encoding.encode(text))
except Exception:
# Fallback when tiktoken is unavailable: rough estimate of ~4 characters per token
return len(text) // 4
class WhitespaceCompressor:
"""Compress whitespace in prompts."""
def compress(self, text: str) -> str:
"""Remove excess whitespace."""
# Replace multiple spaces with single space
text = re.sub(r' +', ' ', text)
# Replace multiple newlines with single newline
text = re.sub(r'\n+', '\n', text)
# Remove leading/trailing whitespace from lines
lines = [line.strip() for line in text.split('\n')]
text = '\n'.join(lines)
return text.strip()
class StopwordRemover:
"""Remove stopwords from prompts."""
def __init__(self):
self.stopwords = {
"the", "a", "an", "is", "are", "was", "were", "be", "been",
"being", "have", "has", "had", "do", "does", "did", "will",
"would", "could", "should", "may", "might", "must", "shall",
"can", "need", "dare", "ought", "used", "to", "of", "in",
"for", "on", "with", "at", "by", "from", "as", "into",
"through", "during", "before", "after", "above", "below",
"between", "under", "again", "further", "then", "once"
}
def compress(self, text: str, preserve_structure: bool = True) -> str:
"""Remove stopwords while preserving meaning."""
if preserve_structure:
# Aggressive stopword removal can break structured prompts (instructions, few-shot examples), so return the text unchanged by default
return text
words = text.split()
filtered = [w for w in words if w.lower() not in self.stopwords]
return ' '.join(filtered)
class ContextTruncator:
"""Truncate context to fit token limits."""
def __init__(self, token_counter: TokenCounter):
self.counter = token_counter
def truncate(
self,
text: str,
max_tokens: int,
strategy: str = "end"
) -> str:
"""Truncate text to max tokens."""
current_tokens = self.counter.count(text)
if current_tokens <= max_tokens:
return text
if strategy == "end":
return self._truncate_end(text, max_tokens)
elif strategy == "start":
return self._truncate_start(text, max_tokens)
elif strategy == "middle":
return self._truncate_middle(text, max_tokens)
return text
def _truncate_end(self, text: str, max_tokens: int) -> str:
"""Truncate from end."""
words = text.split()
while self.counter.count(' '.join(words)) > max_tokens and words:
words = words[:-1]
return ' '.join(words) + "..."
def _truncate_start(self, text: str, max_tokens: int) -> str:
"""Truncate from start."""
words = text.split()
while self.counter.count(' '.join(words)) > max_tokens and words:
words = words[1:]
return "..." + ' '.join(words)
def _truncate_middle(self, text: str, max_tokens: int) -> str:
"""Keep start and end, remove middle."""
words = text.split()
half_tokens = max_tokens // 2
start_words = []
end_words = []
# Build start
for word in words:
if self.counter.count(' '.join(start_words + [word])) <= half_tokens:
start_words.append(word)
else:
break
# Build end
for word in reversed(words):
if self.counter.count(' '.join([word] + end_words)) <= half_tokens:
end_words.insert(0, word)
else:
break
return ' '.join(start_words) + " ... " + ' '.join(end_words)
class LLMCompressor:
"""Use LLM to compress prompts."""
def __init__(self, llm_client: Any):
self.llm = llm_client
async def compress(
self,
text: str,
target_ratio: float = 0.5
) -> str:
"""Compress text using LLM summarization."""
prompt = f"""Compress the following text to approximately {int(target_ratio * 100)}% of its original length.
Preserve all key information, facts, and meaning.
Remove redundancy and verbose language.
Text to compress:
{text}
Compressed version:"""
return await self.llm.generate(prompt)
class PromptCompressor:
"""Comprehensive prompt compression."""
def __init__(self, llm_client: Any = None):
self.token_counter = TokenCounter()
self.whitespace = WhitespaceCompressor()
self.truncator = ContextTruncator(self.token_counter)
self.llm_compressor = LLMCompressor(llm_client) if llm_client else None
async def compress(
self,
prompt: str,
max_tokens: int = None,
target_ratio: float = None
) -> CompressionResult:
"""Compress prompt."""
original_tokens = self.token_counter.count(prompt)
compressed = prompt
# Step 1: Whitespace compression (always)
compressed = self.whitespace.compress(compressed)
# Step 2: Check if we need more compression
current_tokens = self.token_counter.count(compressed)
if max_tokens and current_tokens > max_tokens:
# Use truncation
compressed = self.truncator.truncate(compressed, max_tokens)
elif target_ratio and self.llm_compressor:
# Use LLM compression
compressed = await self.llm_compressor.compress(compressed, target_ratio)
final_tokens = self.token_counter.count(compressed)
return CompressionResult(
original_tokens=original_tokens,
compressed_tokens=final_tokens,
compression_ratio=final_tokens / original_tokens if original_tokens > 0 else 1,
compressed_text=compressed
)
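A quick sketch of the compressor without an LLM client, so only whitespace cleanup and truncation apply; the prompt and token budget are arbitrary.

import asyncio

async def demo():
    compressor = PromptCompressor()  # no llm_client, so LLM-based compression is skipped
    noisy = """
        Please    summarize   the following    report.


        The report   covers Q3 revenue,   churn, and    hiring plans.
    """
    result = await compressor.compress(noisy, max_tokens=15)
    print(result.original_tokens, "->", result.compressed_tokens,
          f"(ratio {result.compression_ratio:.2f})")
    print(result.compressed_text)

asyncio.run(demo())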
Production Cost Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import uuid
app = FastAPI()
class GenerateRequest(BaseModel):
prompt: str
model: Optional[str] = None
max_tokens: int = 1000
use_cache: bool = True
compress: bool = True
class CostReportRequest(BaseModel):
start_date: Optional[str] = None
end_date: Optional[str] = None
group_by: str = "model"
# Initialize components
cost_tracker = CostTracker()
router = ModelRouter()
compressor = PromptCompressor()
cache = ExactCache()
@app.post("/v1/generate")
async def generate(request: GenerateRequest) -> dict:
"""Generate with cost optimization."""
prompt = request.prompt
# Step 1: Check cache
if request.use_cache:
cached = cache.get(prompt)
if cached:
cost_tracker.record_usage(
request_id="cached",
model=request.model or "cached",
input_tokens=0,
output_tokens=0,
cached=True
)
return {
"response": cached,
"cached": True,
"cost": 0
}
# Step 2: Compress prompt
if request.compress:
result = await compressor.compress(prompt)
prompt = result.compressed_text
# Step 3: Route to model
model = request.model or router.route(prompt)
# Step 4: Generate (mock - would call actual LLM)
response = f"Generated response for: {prompt[:50]}..."
input_tokens = len(prompt) // 4
output_tokens = len(response) // 4
# Step 5: Record usage
record = cost_tracker.record_usage(
request_id="req_123",
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens
)
# Step 6: Cache response
if request.use_cache:
cache.set(request.prompt, response)
return {
"response": response,
"model": model,
"cached": False,
"cost": record.cost,
"tokens": {
"input": input_tokens,
"output": output_tokens
}
}
@app.get("/v1/costs/daily")
async def get_daily_costs(days: int = 7) -> list[dict]:
"""Get daily costs."""
from datetime import datetime, timedelta
results = []
for i in range(days):
date = datetime.now() - timedelta(days=i)
cost = cost_tracker.get_daily_cost(date)
results.append({
"date": date.strftime("%Y-%m-%d"),
"cost": cost
})
return results
@app.get("/v1/costs/by-model")
async def get_costs_by_model() -> dict:
"""Get costs by model."""
return cost_tracker.get_cost_by_model()
@app.get("/v1/costs/report")
async def get_cost_report() -> dict:
"""Get optimization report."""
return cost_tracker.get_optimization_report()
@app.get("/v1/cache/stats")
async def get_cache_stats() -> dict:
"""Get cache statistics."""
return {
"size": len(cache.cache),
"savings": cost_tracker.get_cache_savings()
}
@app.post("/v1/estimate")
async def estimate_cost(prompt: str, model: str = "gpt-3.5-turbo") -> dict:
"""Estimate cost for a prompt."""
pricing = PRICING_CATALOG.get(model)
if not pricing:
raise HTTPException(status_code=404, detail="Model not found")
token_counter = TokenCounter()
input_tokens = token_counter.count(prompt)
# Estimate output tokens (rough)
estimated_output = min(input_tokens * 2, 1000)
cost = pricing.calculate_cost(input_tokens, estimated_output)
return {
"model": model,
"input_tokens": input_tokens,
"estimated_output_tokens": estimated_output,
"estimated_cost": cost
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- OpenAI Pricing: https://openai.com/pricing
- Anthropic Pricing: https://www.anthropic.com/pricing
- LLMLingua: https://github.com/microsoft/LLMLingua
- GPTCache: https://github.com/zilliztech/GPTCache
- Semantic Kernel: https://github.com/microsoft/semantic-kernel
Conclusion
LLM cost optimization is about applying the right technique at the right time. Start with measurement: you can't optimize what you don't track. Implement cost tracking from day one and monitor spending by model, feature, and user segment.

Semantic caching typically provides the highest ROI: many applications have significant query repetition, and a 30% cache hit rate can cut costs by roughly 30% with minimal quality impact. Model routing is the next lever: use GPT-4 for complex reasoning tasks, but route simple queries to GPT-4o-mini, GPT-3.5-turbo, or even smaller models. The key is building a quality feedback loop so you can verify that cheaper models meet your quality bar.

Prompt compression reduces token counts without changing model selection: remove whitespace, truncate context intelligently, or use LLM-based summarization for long inputs. Batching amortizes API overhead and can reduce costs for high-volume applications.

The most effective approach combines all of these techniques: cache first, compress what remains, route to the cheapest model that meets quality requirements, and batch where possible. Monitor continuously and adjust thresholds based on quality metrics. With systematic optimization, a 50-80% cost reduction is achievable for most LLM applications.