Introduction
LLM API costs can escalate quickly: a single GPT-4 call can cost more than 100x what GPT-4o-mini charges for the same tokens. Effective cost optimization is a multi-pronged effort: intelligent model routing based on task complexity, aggressive caching for repeated queries, prompt optimization to cut token usage, and batching to amortize per-request overhead. This guide covers practical strategies for each: model tiering and routing, token reduction, caching, batch processing, and cost monitoring so you can understand and control spending.
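To make that gap concrete, here is the arithmetic for a typical request with 10K input tokens and 1K output tokens, using the 2024 list prices from the model table below. Against GPT-4 Turbo the ratio is roughly 60x; against the original GPT-4 ($30 input / $60 output per 1M tokens) it is well over 100x.

# Back-of-the-envelope cost comparison (prices per 1M tokens, from the table below)
gpt4_turbo = (10_000 / 1_000_000) * 10.00 + (1_000 / 1_000_000) * 30.00  # ~ $0.13
gpt4o_mini = (10_000 / 1_000_000) * 0.15 + (1_000 / 1_000_000) * 0.60    # ~ $0.0021
print(f"ratio: {gpt4_turbo / gpt4o_mini:.0f}x")  # ~ 62x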

Model Tiering and Routing
from dataclasses import dataclass
from typing import Callable, Optional
from enum import Enum
class ModelTier(str, Enum):
ECONOMY = "economy" # GPT-4o-mini, Claude Haiku
STANDARD = "standard" # GPT-4o, Claude Sonnet
PREMIUM = "premium" # GPT-4, Claude Opus
@dataclass
class ModelConfig:
"""Model configuration with pricing."""
name: str
tier: ModelTier
input_cost_per_1m: float # Cost per 1M input tokens
output_cost_per_1m: float # Cost per 1M output tokens
max_tokens: int
def estimate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""Estimate cost for request."""
input_cost = (input_tokens / 1_000_000) * self.input_cost_per_1m
output_cost = (output_tokens / 1_000_000) * self.output_cost_per_1m
return input_cost + output_cost
# Model configurations (prices as of 2024)
MODELS = {
"gpt-4o-mini": ModelConfig(
name="gpt-4o-mini",
tier=ModelTier.ECONOMY,
input_cost_per_1m=0.15,
output_cost_per_1m=0.60,
max_tokens=128000
),
"gpt-4o": ModelConfig(
name="gpt-4o",
tier=ModelTier.STANDARD,
input_cost_per_1m=2.50,
output_cost_per_1m=10.00,
max_tokens=128000
),
"gpt-4-turbo": ModelConfig(
name="gpt-4-turbo",
tier=ModelTier.PREMIUM,
input_cost_per_1m=10.00,
output_cost_per_1m=30.00,
max_tokens=128000
),
"claude-3-haiku": ModelConfig(
name="claude-3-haiku-20240307",
tier=ModelTier.ECONOMY,
input_cost_per_1m=0.25,
output_cost_per_1m=1.25,
max_tokens=200000
),
"claude-3-sonnet": ModelConfig(
name="claude-3-5-sonnet-20241022",
tier=ModelTier.STANDARD,
input_cost_per_1m=3.00,
output_cost_per_1m=15.00,
max_tokens=200000
),
}
class CostAwareRouter:
"""Route requests to appropriate model tier."""
def __init__(self, client):
self.client = client
self.classifiers: list[tuple[Callable, ModelTier]] = []
def add_classifier(
self,
condition: Callable[[str], bool],
tier: ModelTier
):
"""Add routing rule."""
self.classifiers.append((condition, tier))
def classify_complexity(self, query: str) -> ModelTier:
"""Classify query complexity."""
# Check custom classifiers first
for condition, tier in self.classifiers:
if condition(query):
return tier
# Default heuristics
query_lower = query.lower()
# Premium tier indicators
premium_keywords = [
"analyze complex", "detailed analysis",
"multi-step reasoning", "compare and contrast",
"synthesize", "evaluate critically"
]
if any(kw in query_lower for kw in premium_keywords):
return ModelTier.PREMIUM
# Standard tier indicators
standard_keywords = [
"explain", "summarize", "write",
"describe", "outline", "draft"
]
if any(kw in query_lower for kw in standard_keywords):
return ModelTier.STANDARD
# Default to economy
return ModelTier.ECONOMY
def get_model(self, tier: ModelTier) -> ModelConfig:
"""Get model for tier."""
tier_models = {
ModelTier.ECONOMY: MODELS["gpt-4o-mini"],
ModelTier.STANDARD: MODELS["gpt-4o"],
ModelTier.PREMIUM: MODELS["gpt-4-turbo"]
}
return tier_models[tier]
def route(self, query: str) -> ModelConfig:
"""Route query to appropriate model."""
tier = self.classify_complexity(query)
return self.get_model(tier)
# Budget-constrained routing
class BudgetRouter:
"""Route with budget constraints."""
def __init__(self, daily_budget: float):
self.daily_budget = daily_budget
self.daily_spent = 0.0
self.request_count = 0
def can_afford(self, model: ModelConfig, estimated_tokens: int) -> bool:
"""Check if request fits budget."""
estimated_cost = model.estimate_cost(
estimated_tokens,
estimated_tokens // 2 # Assume output is half of input
)
return self.daily_spent + estimated_cost <= self.daily_budget
def route_within_budget(
self,
preferred_model: ModelConfig,
estimated_tokens: int
) -> ModelConfig:
"""Route to affordable model."""
if self.can_afford(preferred_model, estimated_tokens):
return preferred_model
# Try cheaper alternatives
cheaper_models = sorted(
MODELS.values(),
key=lambda m: m.input_cost_per_1m
)
for model in cheaper_models:
if self.can_afford(model, estimated_tokens):
return model
raise ValueError("No affordable model available")
def record_usage(self, model: ModelConfig, input_tokens: int, output_tokens: int):
"""Record usage for budget tracking."""
cost = model.estimate_cost(input_tokens, output_tokens)
self.daily_spent += cost
self.request_count += 1
def reset_daily(self):
"""Reset daily counters."""
self.daily_spent = 0.0
self.request_count = 0
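A minimal usage sketch tying the two routers together. It assumes an OpenAI client; the custom classifier rule, the sample query, and the 2,000-token estimate are illustrative only.

from openai import OpenAI

client = OpenAI()
router = CostAwareRouter(client)
budget = BudgetRouter(daily_budget=50.0)

# Custom rule: anything mentioning "legal" is treated as premium-tier work
router.add_classifier(lambda q: "legal" in q.lower(), ModelTier.PREMIUM)

query = "Summarize this meeting transcript in three bullet points."
preferred = router.route(query)  # "summarize" matches the standard tier -> gpt-4o
model = budget.route_within_budget(preferred, estimated_tokens=2_000)

response = client.chat.completions.create(
    model=model.name,
    messages=[{"role": "user", "content": query}],
)
budget.record_usage(
    model,
    response.usage.prompt_tokens,
    response.usage.completion_tokens,
)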
Token Reduction
import tiktoken
from dataclasses import dataclass
@dataclass
class TokenStats:
"""Token usage statistics."""
original_tokens: int
optimized_tokens: int
savings_percent: float
class PromptOptimizer:
"""Optimize prompts to reduce token usage."""
    def __init__(self, model: str = "gpt-4o"):
        try:
            self.encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            # Older tiktoken releases don't know newer model names; fall back to a generic encoding
            self.encoding = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text: str) -> int:
"""Count tokens in text."""
return len(self.encoding.encode(text))
def optimize_prompt(self, prompt: str) -> tuple[str, TokenStats]:
"""Optimize prompt for fewer tokens."""
original_tokens = self.count_tokens(prompt)
optimized = prompt
# Remove excessive whitespace
optimized = ' '.join(optimized.split())
# Remove redundant phrases
redundant = [
"Please ", "Could you please ",
"I would like you to ", "Can you ",
"I need you to ", "Would you mind "
]
for phrase in redundant:
optimized = optimized.replace(phrase, "")
# Shorten common phrases
replacements = {
"in order to": "to",
"as well as": "and",
"in addition to": "plus",
"with respect to": "regarding",
"in the event that": "if",
"at this point in time": "now",
"due to the fact that": "because"
}
for old, new in replacements.items():
optimized = optimized.replace(old, new)
optimized_tokens = self.count_tokens(optimized)
        savings = (original_tokens - optimized_tokens) / original_tokens * 100 if original_tokens else 0.0
return optimized, TokenStats(
original_tokens=original_tokens,
optimized_tokens=optimized_tokens,
savings_percent=savings
)
def truncate_context(
self,
context: str,
max_tokens: int,
preserve_start: int = 100,
preserve_end: int = 100
) -> str:
"""Truncate context while preserving start and end."""
tokens = self.encoding.encode(context)
if len(tokens) <= max_tokens:
return context
# Keep start and end
start_tokens = tokens[:preserve_start]
end_tokens = tokens[-preserve_end:]
# Calculate middle budget
middle_budget = max_tokens - preserve_start - preserve_end - 10
if middle_budget > 0:
middle_start = preserve_start
middle_end = len(tokens) - preserve_end
middle_tokens = tokens[middle_start:middle_start + middle_budget]
truncated = start_tokens + middle_tokens + end_tokens
else:
truncated = start_tokens + end_tokens
return self.encoding.decode(truncated)
class OutputLimiter:
"""Limit output tokens for cost control."""
def __init__(self):
self.task_limits = {
"classification": 50,
"extraction": 200,
"summary": 300,
"explanation": 500,
"generation": 1000
}
def get_limit(self, task_type: str) -> int:
"""Get appropriate output limit for task."""
return self.task_limits.get(task_type, 500)
def add_length_instruction(
self,
prompt: str,
task_type: str
) -> str:
"""Add length constraint to prompt."""
limit = self.get_limit(task_type)
word_limit = int(limit * 0.75) # Rough token to word conversion
return f"{prompt}\n\nKeep your response under {word_limit} words."
# Compression for context
class ContextCompressor:
"""Compress context to reduce tokens."""
def __init__(self, client):
self.client = client
self.optimizer = PromptOptimizer()
def compress(
self,
context: str,
target_ratio: float = 0.5
) -> str:
"""Compress context using LLM."""
original_tokens = self.optimizer.count_tokens(context)
target_tokens = int(original_tokens * target_ratio)
target_words = int(target_tokens * 0.75)
prompt = f"""Compress the following text to approximately {target_words} words.
Preserve all key facts and important details.
Text:
{context}
Compressed version:"""
response = self.client.chat.completions.create(
model="gpt-4o-mini", # Use cheap model for compression
messages=[{"role": "user", "content": prompt}],
max_tokens=target_tokens + 100
)
return response.choices[0].message.content
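Caching
The introduction and conclusion both call out caching for repeated queries, but none of the classes above implement it, so here is a minimal in-memory sketch. The exact-match keying, the TTL, and the example strings are assumptions for illustration; a production system would more likely back this with Redis and possibly semantic (embedding-based) matching.

import hashlib
import time
from typing import Optional

class ResponseCache:
    """Exact-match, in-memory response cache with TTL (a minimal sketch)."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, str]] = {}

    def _key(self, model: str, prompt: str) -> str:
        return hashlib.sha256(f"{model}:{prompt}".encode()).hexdigest()

    def get(self, model: str, prompt: str) -> Optional[str]:
        entry = self._store.get(self._key(model, prompt))
        if entry is None:
            return None
        stored_at, response = entry
        if time.time() - stored_at > self.ttl:
            return None  # stale entry, treat as a miss
        return response

    def set(self, model: str, prompt: str, response: str):
        self._store[self._key(model, prompt)] = (time.time(), response)

# Usage: check the cache before spending money on a model call
cache = ResponseCache(ttl_seconds=600)
prompt = "What is the refund policy?"
if cache.get("gpt-4o-mini", prompt) is None:
    # ... call the API here, then:
    cache.set("gpt-4o-mini", prompt, "Refunds are available within 30 days.")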
Batch Processing
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
from datetime import datetime
@dataclass
class BatchRequest:
"""A request in a batch."""
id: str
prompt: str
    callback: Optional[Callable[[str], None]] = None
    created_at: Optional[datetime] = None
@dataclass
class BatchResult:
"""Result of batch processing."""
id: str
response: str
tokens_used: int
cost: float
class BatchProcessor:
"""Process requests in batches for efficiency."""
def __init__(
self,
client,
model: str = "gpt-4o-mini",
batch_size: int = 20,
max_wait_seconds: float = 5.0
):
self.client = client
self.model = model
self.batch_size = batch_size
self.max_wait = max_wait_seconds
self.pending: list[BatchRequest] = []
self.results: dict[str, BatchResult] = {}
async def add_request(self, request: BatchRequest) -> str:
"""Add request to batch queue."""
request.created_at = datetime.now()
self.pending.append(request)
# Check if batch is ready
if len(self.pending) >= self.batch_size:
await self._process_batch()
return request.id
async def _process_batch(self):
"""Process pending batch."""
if not self.pending:
return
batch = self.pending[:self.batch_size]
self.pending = self.pending[self.batch_size:]
# Create batch prompt
batch_prompt = self._create_batch_prompt(batch)
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": batch_prompt}],
response_format={"type": "json_object"}
)
# Parse batch response
import json
results = json.loads(response.choices[0].message.content)
        # Distribute results, splitting token usage and cost evenly across the batch
        config = MODELS.get(self.model)
        per_request_cost = (
            config.estimate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens,
            ) / len(batch)
            if config
            else 0.0
        )
        for req in batch:
            result_text = results.get(req.id, "")
            self.results[req.id] = BatchResult(
                id=req.id,
                response=result_text,
                tokens_used=response.usage.total_tokens // len(batch),
                cost=per_request_cost,
            )
            if req.callback:
                req.callback(result_text)
def _create_batch_prompt(self, batch: list[BatchRequest]) -> str:
"""Create combined prompt for batch."""
prompts = "\n\n".join([
f"[{req.id}]\n{req.prompt}"
for req in batch
])
return f"""Process each of the following requests and return a JSON object
with the request IDs as keys and responses as values.
Requests:
{prompts}
Return JSON: {{"id1": "response1", "id2": "response2", ...}}"""
async def flush(self):
"""Process any remaining requests."""
while self.pending:
await self._process_batch()
def get_result(self, request_id: str) -> Optional[BatchResult]:
"""Get result for request."""
return self.results.get(request_id)
# Async batch with timeout
class TimedBatchProcessor:
"""Batch processor with time-based flushing."""
def __init__(
self,
client,
model: str = "gpt-4o-mini",
batch_size: int = 20,
flush_interval: float = 2.0
):
self.processor = BatchProcessor(client, model, batch_size)
self.flush_interval = flush_interval
self._flush_task = None
async def start(self):
"""Start background flush task."""
self._flush_task = asyncio.create_task(self._flush_loop())
async def stop(self):
"""Stop and flush remaining."""
if self._flush_task:
self._flush_task.cancel()
await self.processor.flush()
async def _flush_loop(self):
"""Periodically flush pending requests."""
while True:
await asyncio.sleep(self.flush_interval)
await self.processor.flush()
async def process(self, prompt: str) -> str:
"""Process single request through batch."""
import uuid
request_id = str(uuid.uuid4())
result_future = asyncio.Future()
def callback(response: str):
result_future.set_result(response)
request = BatchRequest(
id=request_id,
prompt=prompt,
callback=callback
)
await self.processor.add_request(request)
return await result_future
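A usage sketch for the timed batch processor. The sentiment prompts and batch parameters are placeholders, and because the OpenAI call inside _process_batch is synchronous, treat this as illustrative rather than production-ready.

import asyncio
from openai import OpenAI

async def main():
    processor = TimedBatchProcessor(
        OpenAI(), model="gpt-4o-mini", batch_size=10, flush_interval=1.0
    )
    await processor.start()
    try:
        prompts = [
            f"Classify the sentiment of this review as positive or negative: review #{i}"
            for i in range(25)
        ]
        # Each call shares a batched request instead of paying per-prompt overhead
        responses = await asyncio.gather(*(processor.process(p) for p in prompts))
        print(f"Got {len(responses)} responses")
    finally:
        await processor.stop()

asyncio.run(main())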
Cost Monitoring
from dataclasses import dataclass, field
from datetime import datetime, date
from collections import defaultdict
@dataclass
class UsageRecord:
"""Record of API usage."""
timestamp: datetime
model: str
input_tokens: int
output_tokens: int
cost: float
request_type: str = "unknown"
class CostTracker:
"""Track and analyze LLM costs."""
def __init__(self):
self.records: list[UsageRecord] = []
self.daily_totals: dict[date, float] = defaultdict(float)
self.model_totals: dict[str, float] = defaultdict(float)
def record(
self,
model: str,
input_tokens: int,
output_tokens: int,
request_type: str = "unknown"
):
"""Record usage."""
config = MODELS.get(model)
if config:
cost = config.estimate_cost(input_tokens, output_tokens)
else:
# Default pricing
cost = (input_tokens + output_tokens) * 0.00001
record = UsageRecord(
timestamp=datetime.now(),
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost=cost,
request_type=request_type
)
self.records.append(record)
self.daily_totals[date.today()] += cost
self.model_totals[model] += cost
def get_daily_cost(self, day: date = None) -> float:
"""Get cost for specific day."""
day = day or date.today()
return self.daily_totals.get(day, 0.0)
def get_total_cost(self) -> float:
"""Get total cost across all time."""
return sum(r.cost for r in self.records)
def get_cost_by_model(self) -> dict[str, float]:
"""Get cost breakdown by model."""
return dict(self.model_totals)
def get_cost_by_type(self) -> dict[str, float]:
"""Get cost breakdown by request type."""
by_type = defaultdict(float)
for record in self.records:
by_type[record.request_type] += record.cost
return dict(by_type)
def get_summary(self) -> dict:
"""Get cost summary."""
total_requests = len(self.records)
total_cost = self.get_total_cost()
return {
"total_requests": total_requests,
"total_cost": total_cost,
"avg_cost_per_request": total_cost / total_requests if total_requests > 0 else 0,
"today_cost": self.get_daily_cost(),
"by_model": self.get_cost_by_model(),
"by_type": self.get_cost_by_type()
}
class CostAlertManager:
"""Manage cost alerts and limits."""
def __init__(self, tracker: CostTracker):
self.tracker = tracker
self.alerts: list[dict] = []
self.daily_limit: float = None
self.monthly_limit: float = None
self.per_request_limit: float = None
def set_limits(
self,
daily: float = None,
monthly: float = None,
per_request: float = None
):
"""Set cost limits."""
self.daily_limit = daily
self.monthly_limit = monthly
self.per_request_limit = per_request
    def check_limits(self) -> list[str]:
        """Check limits and warn at 80% of any configured threshold."""
        warnings = []
        if self.daily_limit:
            daily_cost = self.tracker.get_daily_cost()
            if daily_cost >= self.daily_limit * 0.8:
                warnings.append(f"Daily cost at {daily_cost:.2f}/{self.daily_limit:.2f}")
        if self.monthly_limit:
            today = date.today()
            monthly_cost = sum(
                r.cost for r in self.tracker.records
                if (r.timestamp.year, r.timestamp.month) == (today.year, today.month)
            )
            if monthly_cost >= self.monthly_limit * 0.8:
                warnings.append(f"Monthly cost at {monthly_cost:.2f}/{self.monthly_limit:.2f}")
        return warnings
def should_block_request(self, estimated_cost: float) -> bool:
"""Check if request should be blocked."""
if self.per_request_limit and estimated_cost > self.per_request_limit:
return True
if self.daily_limit:
if self.tracker.get_daily_cost() + estimated_cost > self.daily_limit:
return True
return False
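A short sketch showing how the tracker and alert manager work together; the token counts and limits are made up for illustration.

tracker = CostTracker()
alerts = CostAlertManager(tracker)
alerts.set_limits(daily=25.0, per_request=0.50)

# Record a couple of (made-up) requests
tracker.record("gpt-4o-mini", input_tokens=1_200, output_tokens=300, request_type="classification")
tracker.record("gpt-4o", input_tokens=8_000, output_tokens=1_500, request_type="summary")

print(tracker.get_summary())   # totals plus per-model and per-type breakdowns
print(alerts.check_limits())   # warnings appear once spend crosses 80% of a limit

# Gate an expensive request before sending it
estimated = MODELS["gpt-4-turbo"].estimate_cost(50_000, 5_000)  # ~ $0.65
if alerts.should_block_request(estimated):
    print("Blocked: would exceed the per-request or daily limit")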
Production Cost Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
# Initialize components
from openai import OpenAI
client = OpenAI()
router = CostAwareRouter(client)
budget_router = BudgetRouter(daily_budget=100.0)
optimizer = PromptOptimizer()
tracker = CostTracker()
alerts = CostAlertManager(tracker)
alerts.set_limits(daily=100.0, per_request=1.0)
class ChatRequest(BaseModel):
prompt: str
system_prompt: str = "You are a helpful assistant."
optimize_prompt: bool = True
max_tier: Optional[str] = None
class CostEstimateRequest(BaseModel):
prompt: str
model: Optional[str] = None
@app.post("/v1/chat")
async def chat_optimized(request: ChatRequest):
"""Chat with cost optimization."""
prompt = request.prompt
# Optimize prompt
if request.optimize_prompt:
prompt, stats = optimizer.optimize_prompt(prompt)
# Route to appropriate model
model_config = router.route(prompt)
    # Check tier constraint (comparing the enum's string values doesn't reflect cost order,
    # so rank tiers explicitly from cheapest to most expensive)
    if request.max_tier:
        max_tier = ModelTier(request.max_tier)
        tier_rank = {ModelTier.ECONOMY: 0, ModelTier.STANDARD: 1, ModelTier.PREMIUM: 2}
        if tier_rank[model_config.tier] > tier_rank[max_tier]:
            model_config = router.get_model(max_tier)
# Estimate cost
input_tokens = optimizer.count_tokens(prompt)
estimated_cost = model_config.estimate_cost(input_tokens, input_tokens // 2)
# Check limits
if alerts.should_block_request(estimated_cost):
raise HTTPException(429, "Cost limit exceeded")
# Make request
response = client.chat.completions.create(
model=model_config.name,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": prompt}
]
)
# Record usage
tracker.record(
model=model_config.name,
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
request_type="chat"
)
actual_cost = model_config.estimate_cost(
response.usage.prompt_tokens,
response.usage.completion_tokens
)
return {
"response": response.choices[0].message.content,
"model": model_config.name,
"tier": model_config.tier.value,
"cost": actual_cost,
"tokens": {
"input": response.usage.prompt_tokens,
"output": response.usage.completion_tokens
}
}
@app.post("/v1/estimate")
async def estimate_cost(request: CostEstimateRequest):
"""Estimate cost for request."""
input_tokens = optimizer.count_tokens(request.prompt)
if request.model:
model_config = MODELS.get(request.model)
if not model_config:
raise HTTPException(400, f"Unknown model: {request.model}")
else:
model_config = router.route(request.prompt)
# Estimate output as half of input
estimated_output = input_tokens // 2
cost = model_config.estimate_cost(input_tokens, estimated_output)
return {
"model": model_config.name,
"input_tokens": input_tokens,
"estimated_output_tokens": estimated_output,
"estimated_cost": cost
}
@app.get("/v1/costs")
async def get_costs():
"""Get cost summary."""
return tracker.get_summary()
@app.get("/v1/costs/alerts")
async def get_alerts():
"""Get cost alerts."""
return {
"warnings": alerts.check_limits(),
"daily_limit": alerts.daily_limit,
"daily_spent": tracker.get_daily_cost()
}
@app.get("/health")
async def health():
return {"status": "healthy"}
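A quick smoke test against the service, assuming it is running locally on port 8000; the prompt is a placeholder.

import requests

resp = requests.post(
    "http://localhost:8000/v1/chat",
    json={"prompt": "Summarize the key points of our refund policy.", "max_tier": "standard"},
)
data = resp.json()
print(data["model"], data["tier"], data["cost"])

# Check running totals and any budget warnings
print(requests.get("http://localhost:8000/v1/costs").json())
print(requests.get("http://localhost:8000/v1/costs/alerts").json())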
References
- OpenAI Pricing: https://openai.com/pricing
- Anthropic Pricing: https://www.anthropic.com/pricing
- tiktoken: https://github.com/openai/tiktoken
- LangChain Callbacks: https://python.langchain.com/docs/modules/callbacks/
Conclusion
LLM cost optimization is about using the right model for each task. Implement model tiering—route simple queries to cheap models (GPT-4o-mini, Claude Haiku) and reserve expensive models for complex reasoning. Optimize prompts to reduce token count without losing meaning. Use caching aggressively for repeated or similar queries. Batch requests when latency permits to reduce per-request overhead. Monitor costs continuously with alerts for budget thresholds. The goal isn’t minimizing cost at all costs—it’s maximizing value per dollar spent. A well-optimized system can reduce LLM costs by 80%+ while maintaining quality for the tasks that matter.