Introduction: Context windows are the most valuable resource in LLM applications. Every token matters—waste space on irrelevant content and you lose room for information that could improve responses. Effective context window optimization means fitting the right information in the right amount of space. This guide covers practical strategies: prioritizing content by relevance, chunking documents intelligently, allocating tokens across different content types, and building systems that automatically optimize context for each request.
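To make the budgeting idea concrete before diving into code, here is a back-of-the-envelope sketch; the numbers are illustrative assumptions, not recommendations:

# Illustrative budget for a 128K context window (all numbers are assumptions)
total_limit = 128_000
reserved_output = 1_000   # held back for the model's reply
system_prompt = 500       # instructions
user_query = 200          # the current question
available = total_limit - reserved_output - system_prompt - user_query
print(available)          # 126,300 tokens left for history and retrieved context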

Token Budget Management
from dataclasses import dataclass
from typing import Optional

import tiktoken


@dataclass
class TokenBudget:
    """Token budget for a context window."""
    total_limit: int
    system_prompt: int = 0
    conversation_history: int = 0
    retrieved_context: int = 0
    user_query: int = 0
    reserved_output: int = 1000

    @property
    def used(self) -> int:
        """Total tokens used."""
        return (
            self.system_prompt +
            self.conversation_history +
            self.retrieved_context +
            self.user_query
        )

    @property
    def available(self) -> int:
        """Tokens still available for content."""
        return self.total_limit - self.reserved_output - self.used

    @property
    def utilization(self) -> float:
        """Context utilization as a fraction of usable space."""
        usable = self.total_limit - self.reserved_output
        return self.used / usable if usable > 0 else 0.0


class TokenCounter:
    """Count tokens for different models."""

    # Limits are approximate and change as providers update their models
    MODEL_LIMITS = {
        "gpt-4o": 128000,
        "gpt-4o-mini": 128000,
        "gpt-4-turbo": 128000,
        "gpt-3.5-turbo": 16385,
        "claude-3-5-sonnet": 200000,
        "claude-3-haiku": 200000,
        "gemini-1.5-pro": 1000000,
        "gemini-1.5-flash": 1000000
    }

    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
        try:
            self.encoder = tiktoken.encoding_for_model(model)
        except KeyError:
            # Fall back to a widely compatible encoding for unknown models
            self.encoder = tiktoken.get_encoding("cl100k_base")

    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoder.encode(text))

    def count_messages(self, messages: list[dict]) -> int:
        """Approximate token count for chat messages."""
        total = 0
        for message in messages:
            total += 4  # Per-message overhead (approximate; varies by model)
            total += self.count(message.get("content", ""))
            if message.get("name"):
                total += self.count(message["name"])
        total += 2  # Reply priming
        return total

    def get_limit(self, model: Optional[str] = None) -> int:
        """Get the context limit for a model (conservative default if unknown)."""
        return self.MODEL_LIMITS.get(model or self.model, 8000)

    def truncate(self, text: str, max_tokens: int) -> str:
        """Truncate text to a token limit."""
        tokens = self.encoder.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.encoder.decode(tokens[:max_tokens])


class BudgetAllocator:
    """Allocate a token budget across content types."""

    def __init__(self, counter: TokenCounter):
        self.counter = counter

    def allocate(
        self,
        model: str,
        system_prompt: Optional[str] = None,
        conversation: Optional[list[dict]] = None,
        query: Optional[str] = None,
        reserved_output: int = 1000
    ) -> TokenBudget:
        """Create a token budget allocation."""
        total_limit = self.counter.get_limit(model)
        budget = TokenBudget(
            total_limit=total_limit,
            reserved_output=reserved_output
        )
        if system_prompt:
            budget.system_prompt = self.counter.count(system_prompt)
        if conversation:
            budget.conversation_history = self.counter.count_messages(conversation)
        if query:
            budget.user_query = self.counter.count(query)
        return budget

    def allocate_proportional(
        self,
        budget: TokenBudget,
        content_types: dict[str, float]
    ) -> dict[str, int]:
        """Allocate available tokens proportionally by weight."""
        available = budget.available
        total_weight = sum(content_types.values())
        if total_weight <= 0:
            return {content_type: 0 for content_type in content_types}
        return {
            content_type: int(available * weight / total_weight)
            for content_type, weight in content_types.items()
        }
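A minimal usage sketch of the budget classes above; the 60/40 split between retrieved context and history is an arbitrary illustrative choice:

counter = TokenCounter("gpt-4o-mini")
allocator = BudgetAllocator(counter)

budget = allocator.allocate(
    model="gpt-4o-mini",
    system_prompt="You are a helpful assistant.",
    query="Summarize our refund policy.",
    reserved_output=1000
)

# Split what remains 60/40 between retrieved documents and history
splits = allocator.allocate_proportional(
    budget, {"retrieved_context": 0.6, "conversation_history": 0.4}
)
print(budget.available, splits)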
Content Prioritization
from dataclasses import dataclass
from typing import Any, Optional

import numpy as np


@dataclass
class PrioritizedContent:
    """Content with a priority score."""
    content: str
    score: float
    tokens: int
    source: Optional[str] = None
    metadata: Optional[dict] = None


class ContentPrioritizer:
    """Prioritize content by relevance."""

    def __init__(self, embedding_client: Any, model: str = "text-embedding-3-small"):
        self.embedding_client = embedding_client
        self.model = model

    async def prioritize_by_relevance(
        self,
        query: str,
        contents: list[str],
        counter: TokenCounter
    ) -> list[PrioritizedContent]:
        """Prioritize contents by relevance to the query."""
        # Embed the query and all contents in one batch
        all_texts = [query] + contents
        response = await self.embedding_client.embeddings.create(
            model=self.model,
            input=all_texts
        )
        query_emb = np.array(response.data[0].embedding)
        prioritized = []
        for i, content in enumerate(contents):
            content_emb = np.array(response.data[i + 1].embedding)
            # Cosine similarity between query and content
            similarity = np.dot(query_emb, content_emb) / (
                np.linalg.norm(query_emb) * np.linalg.norm(content_emb)
            )
            prioritized.append(PrioritizedContent(
                content=content,
                score=float(similarity),
                tokens=counter.count(content)
            ))
        # Sort by score, highest first
        prioritized.sort(key=lambda x: x.score, reverse=True)
        return prioritized

    def prioritize_by_recency(
        self,
        contents: list[dict],
        counter: TokenCounter
    ) -> list[PrioritizedContent]:
        """Prioritize by recency (for conversation history in chronological order)."""
        prioritized = []
        n = len(contents)
        for i, item in enumerate(contents):
            # Later in the list = more recent = higher score
            recency_score = (i + 1) / n
            text = item.get("content", str(item))
            prioritized.append(PrioritizedContent(
                content=text,
                score=recency_score,
                tokens=counter.count(text),
                metadata={"index": i}
            ))
        return prioritized

    def prioritize_combined(
        self,
        contents: list[PrioritizedContent],
        relevance_weight: float = 0.7,
        recency_weight: float = 0.3
    ) -> list[PrioritizedContent]:
        """Combine relevance and recency scores.

        Assumes `contents` is in chronological order (oldest first) with
        relevance scores already set, e.g. from prioritize_by_relevance.
        """
        # Normalize relevance scores relative to the maximum
        max_score = max(c.score for c in contents) if contents else 1
        for i, content in enumerate(contents):
            relevance = content.score / max_score if max_score > 0 else 0
            recency = (i + 1) / len(contents)
            content.score = (
                relevance * relevance_weight +
                recency * recency_weight
            )
        contents.sort(key=lambda x: x.score, reverse=True)
        return contents


class GreedySelector:
    """Select content greedily within a token budget."""

    def select(
        self,
        prioritized: list[PrioritizedContent],
        max_tokens: int
    ) -> list[PrioritizedContent]:
        """Select the highest-priority content that fits the budget."""
        selected = []
        used_tokens = 0
        for content in prioritized:
            if used_tokens + content.tokens <= max_tokens:
                selected.append(content)
                used_tokens += content.tokens
        return selected

    def select_with_minimum(
        self,
        prioritized: list[PrioritizedContent],
        max_tokens: int,
        min_items: int = 1
    ) -> list[PrioritizedContent]:
        """Select with a minimum item guarantee."""
        if not prioritized:
            return []
        # Always include the top items up to the minimum, even if over budget
        selected = prioritized[:min_items]
        used_tokens = sum(c.tokens for c in selected)
        # Add more if the budget allows
        for content in prioritized[min_items:]:
            if used_tokens + content.tokens <= max_tokens:
                selected.append(content)
                used_tokens += content.tokens
        return selected
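A brief usage sketch, assuming an OpenAI AsyncOpenAI client with OPENAI_API_KEY set; the sample documents are made up for illustration:

import asyncio
from openai import AsyncOpenAI

async def main():
    counter = TokenCounter()
    prioritizer = ContentPrioritizer(AsyncOpenAI())
    docs = [
        "Refunds are processed within 5 business days.",
        "Our office is closed on public holidays.",
        "Refund requests require an order number.",
    ]
    ranked = await prioritizer.prioritize_by_relevance(
        "How do refunds work?", docs, counter
    )
    # Keep only what fits in a small budget
    top = GreedySelector().select(ranked, max_tokens=50)
    print([c.content for c in top])

asyncio.run(main())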
Smart Chunking
from dataclasses import dataclass
from typing import Optional
import re


@dataclass
class Chunk:
    """A chunk of content."""
    content: str
    tokens: int
    start_index: int = 0
    end_index: int = 0
    metadata: Optional[dict] = None


class SmartChunker:
    """Chunk content intelligently."""

    def __init__(self, counter: TokenCounter):
        self.counter = counter

    def chunk_by_tokens(
        self,
        text: str,
        chunk_size: int = 500,
        overlap: int = 50
    ) -> list[Chunk]:
        """Chunk by token count with overlap."""
        # Guard against an overlap that would prevent forward progress
        overlap = min(overlap, chunk_size - 1)
        tokens = self.counter.encoder.encode(text)
        chunks = []
        start = 0
        while start < len(tokens):
            end = min(start + chunk_size, len(tokens))
            chunk_tokens = tokens[start:end]
            chunks.append(Chunk(
                content=self.counter.encoder.decode(chunk_tokens),
                tokens=len(chunk_tokens),
                start_index=start,
                end_index=end
            ))
            start = end - overlap if end < len(tokens) else end
        return chunks

    def chunk_by_sentences(
        self,
        text: str,
        max_chunk_tokens: int = 500
    ) -> list[Chunk]:
        """Chunk by sentences, respecting the token limit."""
        # Split on sentence-ending punctuation followed by whitespace
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks = []
        current_chunk = []
        current_tokens = 0
        for sentence in sentences:
            sentence_tokens = self.counter.count(sentence)
            if current_tokens + sentence_tokens > max_chunk_tokens:
                if current_chunk:
                    chunks.append(Chunk(
                        content=' '.join(current_chunk),
                        tokens=current_tokens
                    ))
                current_chunk = [sentence]
                current_tokens = sentence_tokens
            else:
                current_chunk.append(sentence)
                current_tokens += sentence_tokens
        # Flush the remainder
        if current_chunk:
            chunks.append(Chunk(
                content=' '.join(current_chunk),
                tokens=current_tokens
            ))
        return chunks

    def chunk_by_paragraphs(
        self,
        text: str,
        max_chunk_tokens: int = 500
    ) -> list[Chunk]:
        """Chunk by paragraphs, falling back to sentences for oversized paragraphs."""
        paragraphs = text.split('\n\n')
        chunks = []
        current_chunk = []
        current_tokens = 0
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            para_tokens = self.counter.count(para)
            if para_tokens > max_chunk_tokens:
                # Paragraph too large on its own: flush, then chunk it by sentences
                if current_chunk:
                    chunks.append(Chunk(
                        content='\n\n'.join(current_chunk),
                        tokens=current_tokens
                    ))
                    current_chunk = []
                    current_tokens = 0
                chunks.extend(self.chunk_by_sentences(para, max_chunk_tokens))
            elif current_tokens + para_tokens > max_chunk_tokens:
                if current_chunk:
                    chunks.append(Chunk(
                        content='\n\n'.join(current_chunk),
                        tokens=current_tokens
                    ))
                current_chunk = [para]
                current_tokens = para_tokens
            else:
                current_chunk.append(para)
                current_tokens += para_tokens
        if current_chunk:
            chunks.append(Chunk(
                content='\n\n'.join(current_chunk),
                tokens=current_tokens
            ))
        return chunks

    def chunk_code(
        self,
        code: str,
        max_chunk_tokens: int = 500
    ) -> list[Chunk]:
        """Chunk code by logical units (a heuristic based on def/class boundaries)."""
        # Split at function/class definitions, keeping the definition lines
        pattern = r'((?:def |class |async def )[^\n]+)'
        parts = re.split(pattern, code)
        chunks = []
        current_chunk = []
        current_tokens = 0
        for part in parts:
            if not part.strip():
                continue
            part_tokens = self.counter.count(part)
            if current_tokens + part_tokens > max_chunk_tokens:
                if current_chunk:
                    chunks.append(Chunk(
                        content=''.join(current_chunk),
                        tokens=current_tokens
                    ))
                current_chunk = [part]
                current_tokens = part_tokens
            else:
                current_chunk.append(part)
                current_tokens += part_tokens
        if current_chunk:
            chunks.append(Chunk(
                content=''.join(current_chunk),
                tokens=current_tokens
            ))
        return chunks
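A quick sketch of the paragraph chunker on a toy document; the tiny max_chunk_tokens value just forces a split for demonstration:

counter = TokenCounter()
chunker = SmartChunker(counter)

doc = (
    "Token budgets keep prompts inside the model limit.\n\n"
    "Chunking splits long documents into retrievable pieces. "
    "Each piece should stand on its own."
)
for chunk in chunker.chunk_by_paragraphs(doc, max_chunk_tokens=20):
    print(chunk.tokens, repr(chunk.content[:40]))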
Context Builder
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class BuiltContext:
    """Built context ready for the LLM."""
    messages: list[dict]
    total_tokens: int
    budget: Optional[TokenBudget]
    included_sources: list[str]


class ContextBuilder:
    """Build optimized context for LLM calls."""

    def __init__(
        self,
        counter: TokenCounter,
        prioritizer: Optional[ContentPrioritizer] = None
    ):
        self.counter = counter
        self.prioritizer = prioritizer
        self.allocator = BudgetAllocator(counter)
        self.selector = GreedySelector()

    async def build(
        self,
        model: str,
        query: str,
        system_prompt: Optional[str] = None,
        conversation: Optional[list[dict]] = None,
        documents: Optional[list[str]] = None,
        reserved_output: int = 1000
    ) -> BuiltContext:
        """Build optimized context."""
        # Create the budget (counts system prompt, history, and query up front)
        budget = self.allocator.allocate(
            model=model,
            system_prompt=system_prompt,
            conversation=conversation,
            query=query,
            reserved_output=reserved_output
        )
        messages = []
        included_sources = []
        # System prompt
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        # Retrieved documents, if available
        if documents and self.prioritizer:
            prioritized = await self.prioritizer.prioritize_by_relevance(
                query, documents, self.counter
            )
            selected = self.selector.select(prioritized, budget.available)
            if selected:
                context_text = "\n\n".join(c.content for c in selected)
                messages.append({
                    "role": "system",
                    "content": f"Relevant context:\n{context_text}"
                })
                budget.retrieved_context = sum(c.tokens for c in selected)
                included_sources = [c.source for c in selected if c.source]
        # Conversation history, truncated to whatever room remains.
        # History was already counted into the budget, so add it back before
        # truncating; otherwise it would be subtracted twice.
        if conversation:
            available_for_history = budget.available + budget.conversation_history
            truncated = self._truncate_conversation(
                conversation, available_for_history
            )
            messages.extend(truncated)
        # User query
        messages.append({"role": "user", "content": query})
        total_tokens = self.counter.count_messages(messages)
        return BuiltContext(
            messages=messages,
            total_tokens=total_tokens,
            budget=budget,
            included_sources=included_sources
        )

    def _truncate_conversation(
        self,
        conversation: list[dict],
        max_tokens: int
    ) -> list[dict]:
        """Truncate a conversation to fit the budget, keeping the most recent messages."""
        result = []
        used_tokens = 0
        for message in reversed(conversation):
            msg_tokens = self.counter.count(message.get("content", "")) + 4
            if used_tokens + msg_tokens > max_tokens:
                break
            result.insert(0, message)
            used_tokens += msg_tokens
        return result
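A minimal usage sketch of the builder, again assuming an AsyncOpenAI client with credentials configured; the query and document are illustrative:

import asyncio
from openai import AsyncOpenAI

async def demo():
    builder = ContextBuilder(
        TokenCounter(), prioritizer=ContentPrioritizer(AsyncOpenAI())
    )
    context = await builder.build(
        model="gpt-4o-mini",
        query="What is our refund window?",
        system_prompt="Answer from the provided context only.",
        documents=["Refunds are accepted within 30 days of purchase."],
    )
    print(context.total_tokens, f"{context.budget.utilization:.1%}")

asyncio.run(demo())

When the right split between history and retrieved context varies by task, an adaptive builder can choose the allocation per request: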
class AdaptiveContextBuilder:
    """Build context with adaptive allocation."""

    def __init__(
        self,
        counter: TokenCounter,
        embedding_client: Any
    ):
        self.counter = counter
        self.prioritizer = ContentPrioritizer(embedding_client)
        self.chunker = SmartChunker(counter)

    async def build_adaptive(
        self,
        model: str,
        query: str,
        system_prompt: Optional[str] = None,
        conversation: Optional[list[dict]] = None,
        documents: Optional[list[str]] = None,
        allocation_strategy: str = "balanced",
        reserved_output: int = 1000
    ) -> BuiltContext:
        """Build with adaptive token allocation."""
        total_limit = self.counter.get_limit(model)
        available = total_limit - reserved_output
        # Subtract the fixed costs first
        system_tokens = self.counter.count(system_prompt) if system_prompt else 0
        query_tokens = self.counter.count(query)
        remaining = available - system_tokens - query_tokens
        # Split the remainder based on the strategy
        if allocation_strategy == "context_heavy":
            # Favor retrieved context
            context_budget = int(remaining * 0.7)
            history_budget = int(remaining * 0.3)
        elif allocation_strategy == "history_heavy":
            # Favor conversation history
            context_budget = int(remaining * 0.3)
            history_budget = int(remaining * 0.7)
        else:
            # Balanced
            context_budget = int(remaining * 0.5)
            history_budget = int(remaining * 0.5)
        messages = []
        # System prompt
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        # Retrieved context
        if documents:
            context_content = await self._build_context_section(
                query, documents, context_budget
            )
            if context_content:
                messages.append({
                    "role": "system",
                    "content": f"Context:\n{context_content}"
                })
        # Conversation history
        if conversation:
            truncated = self._truncate_conversation(conversation, history_budget)
            messages.extend(truncated)
        # Query
        messages.append({"role": "user", "content": query})
        return BuiltContext(
            messages=messages,
            total_tokens=self.counter.count_messages(messages),
            budget=None,
            included_sources=[]
        )

    async def _build_context_section(
        self,
        query: str,
        documents: list[str],
        budget: int
    ) -> str:
        """Build the context section within the budget."""
        # Chunk every document into paragraph-sized pieces
        all_chunks = []
        for doc in documents:
            all_chunks.extend(
                self.chunker.chunk_by_paragraphs(doc, max_chunk_tokens=300)
            )
        # Rank chunks by relevance to the query
        chunk_texts = [c.content for c in all_chunks]
        prioritized = await self.prioritizer.prioritize_by_relevance(
            query, chunk_texts, self.counter
        )
        # Keep the best chunks that fit
        selected = GreedySelector().select(prioritized, budget)
        return "\n\n".join(c.content for c in selected)

    def _truncate_conversation(
        self,
        conversation: list[dict],
        max_tokens: int
    ) -> list[dict]:
        """Truncate a conversation to fit the budget, keeping the most recent messages."""
        result = []
        used = 0
        for msg in reversed(conversation):
            tokens = self.counter.count(msg.get("content", "")) + 4
            if used + tokens > max_tokens:
                break
            result.insert(0, msg)
            used += tokens
        return result
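And a sketch of the adaptive builder in use; the strategy names are the ones defined above, and `history` is a hypothetical list of prior chat messages:

# Inside an async function, with AsyncOpenAI and a prior `history` list in scope
adaptive = AdaptiveContextBuilder(TokenCounter(), AsyncOpenAI())
ctx = await adaptive.build_adaptive(
    model="gpt-4o-mini",
    query="Summarize the design discussion so far.",
    conversation=history,  # hypothetical prior chat messages
    allocation_strategy="history_heavy",
)
print(ctx.total_tokens)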
Production Context Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
counter = TokenCounter()
context_builder: Optional[AdaptiveContextBuilder] = None  # Initialize with an embedding client at startup


class BuildContextRequest(BaseModel):
    model: str = "gpt-4o-mini"
    query: str
    system_prompt: Optional[str] = None
    conversation: Optional[list[dict]] = None
    documents: Optional[list[str]] = None
    reserved_output: int = 1000
    allocation_strategy: str = "balanced"


class CountTokensRequest(BaseModel):
    text: str
    model: str = "gpt-4o-mini"


class TruncateRequest(BaseModel):
    text: str
    max_tokens: int
    model: str = "gpt-4o-mini"


class ChunkRequest(BaseModel):
    text: str
    method: str = "paragraphs"
    max_chunk_tokens: int = 500


@app.post("/v1/context/build")
async def build_context(request: BuildContextRequest):
    """Build optimized context."""
    if context_builder is None:
        raise HTTPException(status_code=503, detail="Context builder not initialized")
    result = await context_builder.build_adaptive(
        model=request.model,
        query=request.query,
        system_prompt=request.system_prompt,
        conversation=request.conversation,
        documents=request.documents,
        allocation_strategy=request.allocation_strategy,
        reserved_output=request.reserved_output
    )
    limit = counter.get_limit(request.model)
    return {
        "messages": result.messages,
        "total_tokens": result.total_tokens,
        "model_limit": limit,
        "utilization": result.total_tokens / limit
    }


@app.post("/v1/tokens/count")
async def count_tokens(request: CountTokensRequest):
    """Count tokens in text."""
    count = counter.count(request.text)
    limit = counter.get_limit(request.model)
    return {
        "tokens": count,
        "model": request.model,
        "limit": limit,
        "percentage": count / limit
    }


@app.post("/v1/tokens/truncate")
async def truncate_text(request: TruncateRequest):
    """Truncate text to a token limit."""
    truncated = counter.truncate(request.text, request.max_tokens)
    return {
        "text": truncated,
        "original_tokens": counter.count(request.text),
        "truncated_tokens": counter.count(truncated)
    }


@app.post("/v1/chunk")
async def chunk_text(request: ChunkRequest):
    """Chunk text into smaller pieces."""
    chunker = SmartChunker(counter)
    if request.method == "tokens":
        chunks = chunker.chunk_by_tokens(request.text, request.max_chunk_tokens)
    elif request.method == "sentences":
        chunks = chunker.chunk_by_sentences(request.text, request.max_chunk_tokens)
    elif request.method == "paragraphs":
        chunks = chunker.chunk_by_paragraphs(request.text, request.max_chunk_tokens)
    elif request.method == "code":
        chunks = chunker.chunk_code(request.text, request.max_chunk_tokens)
    else:
        raise HTTPException(status_code=400, detail=f"Unknown method: {request.method}")
    return {
        "chunks": [
            {"content": c.content, "tokens": c.tokens}
            for c in chunks
        ],
        "total_chunks": len(chunks)
    }


@app.get("/v1/models")
async def list_models():
    """List supported models and their limits."""
    return {
        "models": [
            {"name": name, "limit": limit}
            for name, limit in TokenCounter.MODEL_LIMITS.items()
        ]
    }


@app.get("/health")
async def health():
    return {"status": "healthy"}
References
- tiktoken: https://github.com/openai/tiktoken
- LangChain Text Splitters: https://python.langchain.com/docs/modules/data_connection/document_transformers/
- OpenAI Context Length: https://platform.openai.com/docs/models
- Anthropic Context Windows: https://docs.anthropic.com/en/docs/about-claude/models
Conclusion
Context window optimization is about making every token count. Start with accurate token counting using tiktoken to understand your actual usage. Create explicit token budgets that account for system prompts, conversation history, retrieved context, and reserved output space. Prioritize content by relevance to the query—not all context is equally valuable. Use smart chunking that respects semantic boundaries like sentences and paragraphs rather than arbitrary token splits. Implement greedy selection that picks the highest-value content within your budget. For complex applications, use adaptive allocation that adjusts the balance between conversation history and retrieved context based on the task. The key insight is that context window optimization is a constrained optimization problem—you're maximizing information value within a fixed token budget. Build systems that automatically make these tradeoffs so you get the best possible context for every request.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.