Introduction
Context windows define how much information an LLM can process at once, from 4K tokens in older models to 128K+ in modern ones. Effective context management means fitting the most relevant information within those limits while leaving room for generation. This guide covers practical context window strategies: token counting and budget allocation, content prioritization, compression techniques, dynamic context assembly, and graceful handling of conversations that exceed window limits.

Token Counting and Budget Allocation
```python
import tiktoken
from dataclasses import dataclass


@dataclass
class TokenBudget:
    """Token budget allocation."""
    total: int
    system_prompt: int
    conversation_history: int
    retrieved_context: int
    user_input: int
    reserved_for_output: int

    @property
    def available(self) -> int:
        """Available tokens after allocations."""
        used = (
            self.system_prompt +
            self.conversation_history +
            self.retrieved_context +
            self.user_input
        )
        return self.total - used - self.reserved_for_output


class TokenCounter:
    """Count tokens for different models."""

    def __init__(self, model: str = "gpt-4o"):
        self.model = model
        self.encoding = self._get_encoding(model)

    def _get_encoding(self, model: str):
        """Get tiktoken encoding for model, falling back to cl100k_base."""
        try:
            return tiktoken.encoding_for_model(model)
        except KeyError:
            return tiktoken.get_encoding("cl100k_base")

    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoding.encode(text))

    def count_messages(self, messages: list[dict]) -> int:
        """Count tokens in an OpenAI-style message list."""
        # Per-message overhead in the OpenAI chat format
        tokens_per_message = 3
        tokens_per_name = 1
        total = 0
        for message in messages:
            total += tokens_per_message
            for key, value in message.items():
                total += self.count(str(value))
                if key == "name":
                    total += tokens_per_name
        total += 3  # Reply priming
        return total

    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate text to fit a token limit."""
        tokens = self.encoding.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.encoding.decode(tokens[:max_tokens])


class BudgetAllocator:
    """Allocate a token budget across context components."""

    def __init__(
        self,
        model: str = "gpt-4o",
        max_context: int = 128000,
        output_reserve: int = 4096
    ):
        self.counter = TokenCounter(model)
        self.max_context = max_context
        self.output_reserve = output_reserve

    def allocate(
        self,
        system_prompt: str,
        user_input: str,
        history_priority: float = 0.3,
        context_priority: float = 0.5
    ) -> TokenBudget:
        """Allocate budget based on priorities."""
        # Fixed allocations
        system_tokens = self.counter.count(system_prompt)
        input_tokens = self.counter.count(user_input)

        # Available for dynamic allocation
        available = (
            self.max_context -
            system_tokens -
            input_tokens -
            self.output_reserve
        )

        # Allocate based on priorities
        history_budget = int(available * history_priority)
        context_budget = int(available * context_priority)

        return TokenBudget(
            total=self.max_context,
            system_prompt=system_tokens,
            conversation_history=history_budget,
            retrieved_context=context_budget,
            user_input=input_tokens,
            reserved_for_output=self.output_reserve
        )

    def fit_content(self, content: str, budget: int) -> str:
        """Fit content within a budget, truncating if necessary."""
        if self.counter.count(content) <= budget:
            return content
        return self.counter.truncate_to_tokens(content, budget)
```
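To make the budget math concrete, here is a minimal usage sketch, assuming the classes above and an installed tiktoken (the prompt strings and document are placeholders):

```python
counter = TokenCounter("gpt-4o")
allocator = BudgetAllocator("gpt-4o", max_context=128000, output_reserve=4096)

budget = allocator.allocate(
    system_prompt="You are a concise technical assistant.",
    user_input="Summarize the design review notes."
)
print(budget.conversation_history, budget.retrieved_context, budget.available)

# Squeeze an oversized document into the retrieved-context slice
long_doc = "Design review notes: " + "the cache layer needs invalidation hooks. " * 500
fitted = allocator.fit_content(long_doc, budget.retrieved_context)
print(counter.count(fitted))
```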
Content Prioritization
```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Priority(int, Enum):
    CRITICAL = 100
    HIGH = 75
    MEDIUM = 50
    LOW = 25
    OPTIONAL = 10


@dataclass
class ContextItem:
    """An item that can be included in context."""
    content: str
    priority: Priority
    tokens: int
    source: str
    metadata: Optional[dict] = None


class ContentPrioritizer:
    """Prioritize content for context inclusion."""

    def __init__(self, counter: TokenCounter):
        self.counter = counter

    def prioritize(
        self,
        items: list[ContextItem],
        budget: int
    ) -> list[ContextItem]:
        """Select items that fit within budget, highest priority first."""
        sorted_items = sorted(
            items,
            key=lambda x: x.priority.value,
            reverse=True
        )
        selected = []
        used_tokens = 0
        for item in sorted_items:
            if used_tokens + item.tokens <= budget:
                selected.append(item)
                used_tokens += item.tokens
        return selected

    def prioritize_with_recency(
        self,
        items: list[ContextItem],
        budget: int,
        recency_weight: float = 0.3
    ) -> list[ContextItem]:
        """Prioritize with a recency bonus (later items score higher)."""
        if not items:
            return []

        scored_items = []
        for i, item in enumerate(items):
            recency_score = (i / len(items)) * 100 * recency_weight
            combined_score = item.priority.value + recency_score
            scored_items.append((item, combined_score))

        # Sort by combined score, highest first
        scored_items.sort(key=lambda x: x[1], reverse=True)

        selected = []
        used_tokens = 0
        for item, _ in scored_items:
            if used_tokens + item.tokens <= budget:
                selected.append(item)
                used_tokens += item.tokens
        return selected


class MessagePrioritizer:
    """Prioritize conversation messages."""

    def __init__(self, counter: TokenCounter):
        self.counter = counter

    def select_messages(
        self,
        messages: list[dict],
        budget: int,
        keep_recent: int = 4
    ) -> list[dict]:
        """Select messages within budget, always keeping the most recent."""
        if not messages:
            return []

        # Always keep the most recent messages
        recent = messages[-keep_recent:] if len(messages) >= keep_recent else messages
        recent_tokens = sum(
            self.counter.count(m.get("content", ""))
            for m in recent
        )

        if recent_tokens >= budget:
            # Even the recent messages overflow; truncate them
            return self._truncate_messages(recent, budget)

        # Add older messages, newest first, while budget allows
        remaining_budget = budget - recent_tokens
        older = messages[:-keep_recent] if len(messages) > keep_recent else []
        selected_older = []
        for msg in reversed(older):
            msg_tokens = self.counter.count(msg.get("content", ""))
            if msg_tokens <= remaining_budget:
                selected_older.insert(0, msg)
                remaining_budget -= msg_tokens
        return selected_older + recent

    def _truncate_messages(
        self,
        messages: list[dict],
        budget: int
    ) -> list[dict]:
        """Truncate messages to fit budget, preferring the newest."""
        result = []
        remaining = budget
        for msg in reversed(messages):
            content = msg.get("content", "")
            tokens = self.counter.count(content)
            if tokens <= remaining:
                result.insert(0, msg)
                remaining -= tokens
            else:
                # Partially include this message, then stop
                truncated = self.counter.truncate_to_tokens(content, remaining)
                result.insert(0, {**msg, "content": truncated})
                break
        return result
```
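A small illustration of the greedy selection, with made-up items and budget (the helper is only for this example):

```python
counter = TokenCounter()
prioritizer = ContentPrioritizer(counter)

def make_item(text: str, priority: Priority, source: str) -> ContextItem:
    # Example helper: precompute token counts for each candidate item
    return ContextItem(text, priority, tokens=counter.count(text), source=source)

items = [
    make_item("Current task: refactor the billing module.", Priority.CRITICAL, "task"),
    make_item("Style guide excerpt about error handling.", Priority.MEDIUM, "docs"),
    make_item("Old brainstorming notes from last quarter.", Priority.OPTIONAL, "notes"),
]

chosen = prioritizer.prioritize(items, budget=2000)
print([c.source for c in chosen])
```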
Context Compression
```python
import re
from dataclasses import dataclass


@dataclass
class CompressionResult:
    """Result of context compression."""
    original_tokens: int
    compressed_tokens: int
    content: str
    compression_ratio: float


class LLMCompressor:
    """Compress context using LLM summarization (abstractive)."""

    def __init__(self, client, counter: TokenCounter):
        self.client = client
        self.counter = counter

    def compress(
        self,
        content: str,
        target_tokens: int,
        preserve_key_info: bool = True
    ) -> CompressionResult:
        """Compress content to approximately target_tokens."""
        original_tokens = self.counter.count(content)
        if original_tokens <= target_tokens:
            return CompressionResult(
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                content=content,
                compression_ratio=1.0
            )

        # Rough heuristic: one token is about 0.75 English words
        target_words = int(target_tokens * 0.75)
        instruction = (
            "Preserve all key facts, names, dates, and specific details."
            if preserve_key_info
            else "Focus on main ideas only."
        )
        prompt = f"""Compress the following text to approximately {target_words} words.
{instruction}

Text to compress:
{content}

Compressed version:"""

        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=target_tokens + 100
        )
        compressed = response.choices[0].message.content
        compressed_tokens = self.counter.count(compressed)

        return CompressionResult(
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            content=compressed,
            compression_ratio=compressed_tokens / original_tokens
        )

    def compress_conversation(
        self,
        messages: list[dict],
        target_tokens: int
    ) -> str:
        """Compress conversation history into a summary."""
        formatted = "\n".join(
            f"{m['role'].upper()}: {m['content']}"
            for m in messages
        )
        prompt = f"""Summarize this conversation, preserving:
- Key decisions and conclusions
- Important facts mentioned
- Current topic and context
- Any pending questions or tasks

Target length: approximately {int(target_tokens * 0.75)} words.

Conversation:
{formatted}

Summary:"""

        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=target_tokens + 100
        )
        return response.choices[0].message.content


class ExtractiveCompressor:
    """Compress by extracting key sentences (no LLM call)."""

    def __init__(self, counter: TokenCounter):
        self.counter = counter

    def compress(
        self,
        content: str,
        target_tokens: int
    ) -> CompressionResult:
        """Extract key sentences to fit the budget."""
        original_tokens = self.counter.count(content)
        if original_tokens <= target_tokens:
            return CompressionResult(
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                content=content,
                compression_ratio=1.0
            )

        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', content)

        # Score sentences by position and length
        scored = []
        for i, sent in enumerate(sentences):
            # Prefer opening and closing sentences
            position_score = 1.0
            if i < 3:
                position_score = 1.5
            elif i >= len(sentences) - 2:
                position_score = 1.3
            # Prefer medium-length sentences
            words = len(sent.split())
            length_score = 1.0 if 10 <= words <= 30 else 0.8
            scored.append((sent, position_score * length_score, i))

        # Sort by score, keeping original order for ties
        scored.sort(key=lambda x: (-x[1], x[2]))

        # Select sentences within budget
        selected = []
        used_tokens = 0
        for sent, _, orig_idx in scored:
            sent_tokens = self.counter.count(sent)
            if used_tokens + sent_tokens <= target_tokens:
                selected.append((sent, orig_idx))
                used_tokens += sent_tokens

        # Restore original order
        selected.sort(key=lambda x: x[1])
        compressed = " ".join(s for s, _ in selected)

        return CompressionResult(
            original_tokens=original_tokens,
            compressed_tokens=used_tokens,
            content=compressed,
            compression_ratio=used_tokens / original_tokens
        )
```
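The extractive compressor runs entirely locally, so it is easy to try; a quick sketch with illustrative text:

```python
counter = TokenCounter()
extractive = ExtractiveCompressor(counter)

article = (
    "Context windows are finite, so budgets must be set deliberately. "
    "Most teams over-allocate retrieval and under-reserve output tokens. "
    "A simple greedy selector already fixes the worst failures. "
    "Position and length heuristics keep the opening and closing intact. "
    "Anything fancier should be justified by an evaluation."
)
result = extractive.compress(article, target_tokens=30)
print(f"{result.original_tokens} -> {result.compressed_tokens} tokens "
      f"({result.compression_ratio:.0%} of original)")
print(result.content)
```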
Dynamic Context Assembly
```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ContextSection:
    """A section of the context."""
    name: str
    content: str
    priority: Priority
    min_tokens: int = 0
    max_tokens: Optional[int] = None
    compressible: bool = True


@dataclass
class AssembledContext:
    """Assembled context ready for the LLM."""
    sections: dict[str, str]
    total_tokens: int
    budget_used: float


class DynamicContextAssembler:
    """Assemble context dynamically based on budget."""

    def __init__(
        self,
        client,
        counter: TokenCounter,
        max_context: int = 128000
    ):
        self.client = client
        self.counter = counter
        self.max_context = max_context
        self.compressor = LLMCompressor(client, counter)

    def assemble(
        self,
        sections: list[ContextSection],
        output_reserve: int = 4096
    ) -> AssembledContext:
        """Assemble context from sections in two passes."""
        budget = self.max_context - output_reserve

        # Sort by priority, highest first
        sorted_sections = sorted(
            sections,
            key=lambda x: x.priority.value,
            reverse=True
        )

        # First pass: satisfy minimum allocations
        result = {}
        used_tokens = 0
        for section in sorted_sections:
            tokens = self.counter.count(section.content)
            if section.min_tokens > 0:
                if tokens <= section.min_tokens:
                    result[section.name] = section.content
                    used_tokens += tokens
                else:
                    # Compress down to the guaranteed minimum
                    compressed = self.compressor.compress(
                        section.content,
                        section.min_tokens
                    )
                    result[section.name] = compressed.content
                    used_tokens += compressed.compressed_tokens

        # Second pass: expand or add sections while budget allows
        remaining = budget - used_tokens
        for section in sorted_sections:
            if section.name in result:
                current_tokens = self.counter.count(result[section.name])
                original_tokens = self.counter.count(section.content)
                if current_tokens < original_tokens:
                    # Expand toward the original, capped by max_tokens
                    max_expand = section.max_tokens or original_tokens
                    expand_to = min(max_expand, current_tokens + remaining)
                    if expand_to > current_tokens:
                        if expand_to >= original_tokens:
                            result[section.name] = section.content
                            remaining -= (original_tokens - current_tokens)
                        else:
                            compressed = self.compressor.compress(
                                section.content,
                                expand_to
                            )
                            remaining -= (compressed.compressed_tokens - current_tokens)
                            result[section.name] = compressed.content
            else:
                # Section not yet included
                tokens = self.counter.count(section.content)
                if tokens <= remaining:
                    result[section.name] = section.content
                    remaining -= tokens
                elif section.compressible and remaining > 100:
                    compressed = self.compressor.compress(
                        section.content,
                        remaining
                    )
                    result[section.name] = compressed.content
                    remaining -= compressed.compressed_tokens

        total_used = budget - remaining
        return AssembledContext(
            sections=result,
            total_tokens=total_used,
            budget_used=total_used / budget
        )

    def build_messages(
        self,
        assembled: AssembledContext,
        system_prompt: str,
        user_message: str
    ) -> list[dict]:
        """Build a message list from the assembled context."""
        messages = [{"role": "system", "content": system_prompt}]

        # Add context sections
        if "conversation_summary" in assembled.sections:
            messages.append({
                "role": "system",
                "content": f"Previous conversation summary:\n{assembled.sections['conversation_summary']}"
            })
        if "retrieved_context" in assembled.sections:
            messages.append({
                "role": "system",
                "content": f"Relevant context:\n{assembled.sections['retrieved_context']}"
            })
        if "history" in assembled.sections:
            # Simplified: in practice, keep history as structured messages
            messages.append({
                "role": "system",
                "content": f"Recent conversation:\n{assembled.sections['history']}"
            })

        messages.append({"role": "user", "content": user_message})
        return messages
```
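Wiring it together might look like the sketch below (assumes OPENAI_API_KEY is set; the section contents are placeholders, and the compressor is only invoked when a section overflows its minimum):

```python
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment
counter = TokenCounter("gpt-4o")
assembler = DynamicContextAssembler(client, counter, max_context=128000)

sections = [
    ContextSection("history", "user: ...\nassistant: ...", Priority.HIGH, min_tokens=500),
    ContextSection("retrieved_context", "Doc excerpt ...", Priority.MEDIUM, min_tokens=200),
]
assembled = assembler.assemble(sections, output_reserve=4096)
messages = assembler.build_messages(
    assembled,
    system_prompt="You are a helpful assistant.",
    user_message="What did we decide about the rollout?"
)
print(f"{assembled.total_tokens} tokens, {assembled.budget_used:.0%} of budget")
```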
Production Context Service
```python
from fastapi import Body, FastAPI
from pydantic import BaseModel
from openai import OpenAI

app = FastAPI()

# Initialize components
client = OpenAI()
counter = TokenCounter("gpt-4o")
allocator = BudgetAllocator("gpt-4o", max_context=128000)
assembler = DynamicContextAssembler(client, counter)
compressor = LLMCompressor(client, counter)


class ContextRequest(BaseModel):
    system_prompt: str
    user_message: str
    conversation_history: list[dict] = []
    retrieved_documents: list[str] = []
    max_output_tokens: int = 4096


class CompressRequest(BaseModel):
    content: str
    target_tokens: int
    preserve_key_info: bool = True


@app.post("/v1/context/build")
async def build_context(request: ContextRequest):
    """Build optimized context for the LLM."""
    # Allocate budget
    budget = allocator.allocate(
        system_prompt=request.system_prompt,
        user_input=request.user_message
    )

    # System prompt is always included and never compressed
    sections = [
        ContextSection(
            name="system",
            content=request.system_prompt,
            priority=Priority.CRITICAL,
            compressible=False
        )
    ]

    # Conversation history
    if request.conversation_history:
        history_text = "\n".join(
            f"{m['role']}: {m['content']}"
            for m in request.conversation_history
        )
        sections.append(ContextSection(
            name="history",
            content=history_text,
            priority=Priority.HIGH,
            min_tokens=500,
            max_tokens=budget.conversation_history
        ))

    # Retrieved documents
    if request.retrieved_documents:
        docs_text = "\n\n---\n\n".join(request.retrieved_documents)
        sections.append(ContextSection(
            name="retrieved_context",
            content=docs_text,
            priority=Priority.MEDIUM,
            min_tokens=200,
            max_tokens=budget.retrieved_context
        ))

    # Assemble context and build messages
    assembled = assembler.assemble(
        sections,
        output_reserve=request.max_output_tokens
    )
    messages = assembler.build_messages(
        assembled,
        request.system_prompt,
        request.user_message
    )

    return {
        "messages": messages,
        "total_tokens": assembled.total_tokens,
        "budget_used": assembled.budget_used,
        "sections_included": list(assembled.sections.keys())
    }


@app.post("/v1/context/compress")
async def compress_content(request: CompressRequest):
    """Compress content to a target token count."""
    result = compressor.compress(
        request.content,
        request.target_tokens,
        request.preserve_key_info
    )
    return {
        "original_tokens": result.original_tokens,
        "compressed_tokens": result.compressed_tokens,
        "compression_ratio": result.compression_ratio,
        "content": result.content
    }


@app.post("/v1/context/count")
async def count_tokens(content: str = Body(..., embed=True)):
    """Count tokens in content (sent as a JSON body, not a query param)."""
    return {
        "tokens": counter.count(content),
        "model": counter.model
    }


@app.get("/v1/context/budget")
async def get_budget(
    system_tokens: int = 500,
    input_tokens: int = 100
):
    """Get a token budget allocation for the given sizes."""
    # Build placeholder strings; this is only a crude character-based
    # estimate, since repeated characters do not tokenize realistically
    budget = allocator.allocate(
        system_prompt="x" * (system_tokens * 4),
        user_input="x" * (input_tokens * 4)
    )
    return {
        "total": budget.total,
        "system_prompt": budget.system_prompt,
        "conversation_history": budget.conversation_history,
        "retrieved_context": budget.retrieved_context,
        "user_input": budget.user_input,
        "reserved_for_output": budget.reserved_for_output,
        "available": budget.available
    }


@app.get("/health")
async def health():
    return {"status": "healthy", "model": counter.model}
```
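To smoke-test the service in-process, one option is FastAPI's TestClient, sketched below (assumes httpx is installed and OPENAI_API_KEY is set so the OpenAI client can be constructed; small inputs stay under the section minimums, so no compression calls are made):

```python
from fastapi.testclient import TestClient

test_client = TestClient(app)

resp = test_client.post("/v1/context/build", json={
    "system_prompt": "You are a helpful assistant.",
    "user_message": "What changed in the last release?",
    "retrieved_documents": ["Release notes: minor bug fixes."],
})
print(resp.json()["sections_included"])

resp = test_client.post("/v1/context/count", json={"content": "Hello, world"})
print(resp.json())
```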
References
- OpenAI Tokenizer: https://platform.openai.com/tokenizer
- tiktoken: https://github.com/openai/tiktoken
- LangChain Memory: https://python.langchain.com/docs/modules/memory/
- Anthropic Context Windows: https://docs.anthropic.com/claude/docs/context-windows
Conclusion
Context window management is about making the most of limited space. Start with accurate token counting using tiktoken or model-specific tokenizers. Allocate budget across components based on their importance—system prompts and recent messages typically get priority. Implement content prioritization to select the most relevant information when you can't fit everything. Use compression techniques—both extractive (selecting key sentences) and abstractive (LLM summarization)—to fit more information in less space. Build dynamic context assembly that adapts to varying input sizes. The goal is maximizing information density while maintaining coherence—every token should contribute to better responses.