
Context Window Optimization: Making Every Token Count in LLM Applications

Introduction

Context windows are the most valuable resource in LLM applications. Every token matters: waste space on irrelevant content and you lose room for information that could improve the response. Effective context window optimization means fitting the right information in the right amount of space. This guide covers practical strategies: prioritizing content by relevance, chunking documents intelligently, allocating tokens across different content types, and building systems that automatically optimize context for each request.

Context optimization has three levers: content prioritization, smart chunking, and token allocation.

Token Budget Management

from dataclasses import dataclass, field
from typing import Any, Optional
import tiktoken

@dataclass
class TokenBudget:
    """Token budget for context window."""
    
    total_limit: int
    system_prompt: int = 0
    conversation_history: int = 0
    retrieved_context: int = 0
    user_query: int = 0
    reserved_output: int = 1000
    
    @property
    def used(self) -> int:
        """Total tokens used."""
        return (
            self.system_prompt +
            self.conversation_history +
            self.retrieved_context +
            self.user_query
        )
    
    @property
    def available(self) -> int:
        """Available tokens for content."""
        return self.total_limit - self.reserved_output - self.used
    
    @property
    def utilization(self) -> float:
        """Context utilization percentage."""
        usable = self.total_limit - self.reserved_output
        return self.used / usable if usable > 0 else 0

class TokenCounter:
    """Count tokens for different models."""
    
    MODEL_LIMITS = {
        "gpt-4o": 128000,
        "gpt-4o-mini": 128000,
        "gpt-4-turbo": 128000,
        "gpt-3.5-turbo": 16385,
        "claude-3-5-sonnet": 200000,
        "claude-3-haiku": 200000,
        "gemini-1.5-pro": 1000000,
        "gemini-1.5-flash": 1000000
    }
    
    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
        try:
            self.encoder = tiktoken.encoding_for_model(model)
        except KeyError:
            self.encoder = tiktoken.get_encoding("cl100k_base")
    
    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoder.encode(text))
    
    def count_messages(self, messages: list[dict]) -> int:
        """Count tokens in chat messages."""
        
        total = 0
        for message in messages:
            total += 4  # Approximate per-message overhead (OpenAI chat format)
            total += self.count(message.get("content", ""))
            if message.get("name"):
                total += self.count(message["name"])
        
        total += 2  # Reply priming
        return total
    
    def get_limit(self, model: Optional[str] = None) -> int:
        """Get context limit for model (falls back to 8000 if unknown)."""
        return self.MODEL_LIMITS.get(model or self.model, 8000)
    
    def truncate(self, text: str, max_tokens: int) -> str:
        """Truncate text to token limit."""
        
        tokens = self.encoder.encode(text)
        
        if len(tokens) <= max_tokens:
            return text
        
        return self.encoder.decode(tokens[:max_tokens])

class BudgetAllocator:
    """Allocate token budget across content types."""
    
    def __init__(self, counter: TokenCounter):
        self.counter = counter
    
    def allocate(
        self,
        model: str,
        system_prompt: Optional[str] = None,
        conversation: Optional[list[dict]] = None,
        query: Optional[str] = None,
        reserved_output: int = 1000
    ) -> TokenBudget:
        """Create token budget allocation."""
        
        total_limit = self.counter.get_limit(model)
        
        budget = TokenBudget(
            total_limit=total_limit,
            reserved_output=reserved_output
        )
        
        if system_prompt:
            budget.system_prompt = self.counter.count(system_prompt)
        
        if conversation:
            budget.conversation_history = self.counter.count_messages(conversation)
        
        if query:
            budget.user_query = self.counter.count(query)
        
        return budget
    
    def allocate_proportional(
        self,
        budget: TokenBudget,
        content_types: dict[str, float]
    ) -> dict[str, int]:
        """Allocate available tokens proportionally."""
        
        available = budget.available
        total_weight = sum(content_types.values())
        
        allocations = {}
        for content_type, weight in content_types.items():
            allocations[content_type] = int(available * weight / total_weight)
        
        return allocations
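
A minimal usage sketch (the prompt and message values are illustrative, assuming tiktoken is installed and the classes above are in scope):

counter = TokenCounter("gpt-4o-mini")
allocator = BudgetAllocator(counter)

budget = allocator.allocate(
    model="gpt-4o-mini",
    system_prompt="You are a helpful assistant.",
    conversation=[
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello! How can I help?"},
    ],
    query="Summarize our conversation.",
    reserved_output=1000,
)
print(f"used={budget.used} available={budget.available} "
      f"utilization={budget.utilization:.1%}")

# Split whatever is left 60/40 between retrieved context and extra history.
print(allocator.allocate_proportional(
    budget, {"retrieved_context": 0.6, "history": 0.4}
))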

Content Prioritization

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

@dataclass
class PrioritizedContent:
    """Content with priority score."""
    
    content: str
    score: float
    tokens: int
    source: Optional[str] = None
    metadata: Optional[dict] = None

class ContentPrioritizer:
    """Prioritize content by relevance."""
    
    def __init__(self, embedding_client: Any, model: str = "text-embedding-3-small"):
        self.embedding_client = embedding_client
        self.model = model
    
    async def prioritize_by_relevance(
        self,
        query: str,
        contents: list[str],
        counter: TokenCounter
    ) -> list[PrioritizedContent]:
        """Prioritize contents by relevance to query."""
        
        # Get embeddings
        all_texts = [query] + contents
        
        response = await self.embedding_client.embeddings.create(
            model=self.model,
            input=all_texts
        )
        
        query_emb = np.array(response.data[0].embedding)
        
        prioritized = []
        for i, content in enumerate(contents):
            content_emb = np.array(response.data[i + 1].embedding)
            
            # Cosine similarity
            similarity = np.dot(query_emb, content_emb) / (
                np.linalg.norm(query_emb) * np.linalg.norm(content_emb)
            )
            
            prioritized.append(PrioritizedContent(
                content=content,
                score=float(similarity),
                tokens=counter.count(content)
            ))
        
        # Sort by score descending
        prioritized.sort(key=lambda x: x.score, reverse=True)
        
        return prioritized
    
    def prioritize_by_recency(
        self,
        contents: list[dict],
        counter: TokenCounter
    ) -> list[PrioritizedContent]:
        """Prioritize by recency (for conversation history)."""
        
        prioritized = []
        n = len(contents)
        
        for i, item in enumerate(contents):
            # More recent = higher score
            recency_score = (i + 1) / n
            
            prioritized.append(PrioritizedContent(
                content=item.get("content", str(item)),
                score=recency_score,
                tokens=counter.count(item.get("content", str(item))),
                metadata={"index": i}
            ))
        
        return prioritized
    
    def prioritize_combined(
        self,
        contents: list[PrioritizedContent],
        relevance_weight: float = 0.7,
        recency_weight: float = 0.3
    ) -> list[PrioritizedContent]:
        """Combine relevance and recency scores."""
        
        # Normalize scores
        max_score = max(c.score for c in contents) if contents else 1
        
        for i, content in enumerate(contents):
            relevance = content.score / max_score if max_score > 0 else 0
            recency = (i + 1) / len(contents)
            
            content.score = (
                relevance * relevance_weight +
                recency * recency_weight
            )
        
        contents.sort(key=lambda x: x.score, reverse=True)
        return contents

class GreedySelector:
    """Select content greedily within token budget."""
    
    def select(
        self,
        prioritized: list[PrioritizedContent],
        max_tokens: int
    ) -> list[PrioritizedContent]:
        """Select highest priority content within budget."""
        
        selected = []
        used_tokens = 0
        
        for content in prioritized:
            if used_tokens + content.tokens <= max_tokens:
                selected.append(content)
                used_tokens += content.tokens
        
        return selected
    
    def select_with_minimum(
        self,
        prioritized: list[PrioritizedContent],
        max_tokens: int,
        min_items: int = 1
    ) -> list[PrioritizedContent]:
        """Select with minimum item guarantee."""
        
        if not prioritized:
            return []
        
        # Always include top items up to minimum
        selected = prioritized[:min_items]
        used_tokens = sum(c.tokens for c in selected)
        
        # Add more if budget allows
        for content in prioritized[min_items:]:
            if used_tokens + content.tokens <= max_tokens:
                selected.append(content)
                used_tokens += content.tokens
        
        return selected
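
As a quick illustration (the scores and texts are made up), greedy selection keeps the highest-priority items that still fit the budget:

counter = TokenCounter()
items = [
    PrioritizedContent(content="Highly relevant passage.", score=0.92,
                       tokens=counter.count("Highly relevant passage.")),
    PrioritizedContent(content="Somewhat relevant passage.", score=0.61,
                       tokens=counter.count("Somewhat relevant passage.")),
    PrioritizedContent(content="Barely relevant passage.", score=0.18,
                       tokens=counter.count("Barely relevant passage.")),
]

selected = GreedySelector().select(items, max_tokens=10)
print([c.content for c in selected])  # top items that fit in 10 tokens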

Smart Chunking

from dataclasses import dataclass
from typing import Any, Optional
import re

@dataclass
class Chunk:
    """A chunk of content."""
    
    content: str
    tokens: int
    start_index: int = 0
    end_index: int = 0
    metadata: Optional[dict] = None

class SmartChunker:
    """Chunk content intelligently."""
    
    def __init__(self, counter: TokenCounter):
        self.counter = counter
    
    def chunk_by_tokens(
        self,
        text: str,
        chunk_size: int = 500,
        overlap: int = 50
    ) -> list[Chunk]:
        """Chunk by token count with overlap."""
        
        tokens = self.counter.encoder.encode(text)
        chunks = []
        
        start = 0
        while start < len(tokens):
            end = min(start + chunk_size, len(tokens))
            
            chunk_tokens = tokens[start:end]
            chunk_text = self.counter.encoder.decode(chunk_tokens)
            
            chunks.append(Chunk(
                content=chunk_text,
                tokens=len(chunk_tokens),
                start_index=start,
                end_index=end
            ))
            
            start = end - overlap if end < len(tokens) else end
        
        return chunks
    
    def chunk_by_sentences(
        self,
        text: str,
        max_chunk_tokens: int = 500
    ) -> list[Chunk]:
        """Chunk by sentences, respecting token limit."""
        
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        for sentence in sentences:
            sentence_tokens = self.counter.count(sentence)
            
            if current_tokens + sentence_tokens > max_chunk_tokens:
                if current_chunk:
                    chunk_text = ' '.join(current_chunk)
                    chunks.append(Chunk(
                        content=chunk_text,
                        tokens=current_tokens
                    ))
                
                current_chunk = [sentence]
                current_tokens = sentence_tokens
            else:
                current_chunk.append(sentence)
                current_tokens += sentence_tokens
        
        # Add remaining
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunks.append(Chunk(
                content=chunk_text,
                tokens=current_tokens
            ))
        
        return chunks
    
    def chunk_by_paragraphs(
        self,
        text: str,
        max_chunk_tokens: int = 500
    ) -> list[Chunk]:
        """Chunk by paragraphs."""
        
        paragraphs = text.split('\n\n')
        
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            
            para_tokens = self.counter.count(para)
            
            if para_tokens > max_chunk_tokens:
                # Paragraph too large, chunk by sentences
                if current_chunk:
                    chunk_text = '\n\n'.join(current_chunk)
                    chunks.append(Chunk(
                        content=chunk_text,
                        tokens=current_tokens
                    ))
                    current_chunk = []
                    current_tokens = 0
                
                # Add sentence chunks
                sentence_chunks = self.chunk_by_sentences(para, max_chunk_tokens)
                chunks.extend(sentence_chunks)
            
            elif current_tokens + para_tokens > max_chunk_tokens:
                if current_chunk:
                    chunk_text = '\n\n'.join(current_chunk)
                    chunks.append(Chunk(
                        content=chunk_text,
                        tokens=current_tokens
                    ))
                
                current_chunk = [para]
                current_tokens = para_tokens
            else:
                current_chunk.append(para)
                current_tokens += para_tokens
        
        if current_chunk:
            chunk_text = '\n\n'.join(current_chunk)
            chunks.append(Chunk(
                content=chunk_text,
                tokens=current_tokens
            ))
        
        return chunks
    
    def chunk_code(
        self,
        code: str,
        max_chunk_tokens: int = 500
    ) -> list[Chunk]:
        """Chunk code by logical units."""
        
        # Split by function/class definitions
        pattern = r'((?:def |class |async def )[^\n]+)'
        parts = re.split(pattern, code)
        
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        for part in parts:
            if not part.strip():
                continue
            
            part_tokens = self.counter.count(part)
            
            if current_tokens + part_tokens > max_chunk_tokens:
                if current_chunk:
                    chunk_text = ''.join(current_chunk)
                    chunks.append(Chunk(
                        content=chunk_text,
                        tokens=current_tokens
                    ))
                
                current_chunk = [part]
                current_tokens = part_tokens
            else:
                current_chunk.append(part)
                current_tokens += part_tokens
        
        if current_chunk:
            chunk_text = ''.join(current_chunk)
            chunks.append(Chunk(
                content=chunk_text,
                tokens=current_tokens
            ))
        
        return chunks
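
A short sketch of the paragraph chunker on toy input (exact token counts depend on the tokenizer):

chunker = SmartChunker(TokenCounter())

text = (
    "First paragraph, about token budgets.\n\n"
    "Second paragraph, about chunking strategies.\n\n"
    "Third paragraph, about greedy selection."
)
for chunk in chunker.chunk_by_paragraphs(text, max_chunk_tokens=12):
    print(chunk.tokens, repr(chunk.content))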

Context Builder

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class BuiltContext:
    """Built context ready for LLM."""
    
    messages: list[dict]
    total_tokens: int
    budget: Optional[TokenBudget]
    included_sources: list[str]

class ContextBuilder:
    """Build optimized context for LLM calls."""
    
    def __init__(
        self,
        counter: TokenCounter,
        prioritizer: Optional[ContentPrioritizer] = None
    ):
        self.counter = counter
        self.prioritizer = prioritizer
        self.allocator = BudgetAllocator(counter)
        self.selector = GreedySelector()
    
    async def build(
        self,
        model: str,
        query: str,
        system_prompt: Optional[str] = None,
        conversation: Optional[list[dict]] = None,
        documents: Optional[list[str]] = None,
        reserved_output: int = 1000
    ) -> BuiltContext:
        """Build optimized context."""
        
        # Create budget
        budget = self.allocator.allocate(
            model=model,
            system_prompt=system_prompt,
            conversation=conversation,
            query=query,
            reserved_output=reserved_output
        )
        
        messages = []
        included_sources = []
        
        # Add system prompt
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        
        # Add retrieved documents if available
        if documents and self.prioritizer:
            # Prioritize documents
            prioritized = await self.prioritizer.prioritize_by_relevance(
                query, documents, self.counter
            )
            
            # Select within budget
            selected = self.selector.select(prioritized, budget.available)
            
            if selected:
                context_text = "\n\n".join(c.content for c in selected)
                messages.append({
                    "role": "system",
                    "content": f"Relevant context:\n{context_text}"
                })
                
                budget.retrieved_context = sum(c.tokens for c in selected)
                included_sources = [c.source for c in selected if c.source]
        
        # Add conversation history
        if conversation:
            # budget.available already subtracts the full (untruncated)
            # history, so add it back to get the space history may occupy
            available_for_history = budget.available + budget.conversation_history
            
            truncated = self._truncate_conversation(
                conversation, available_for_history
            )
            messages.extend(truncated)
        
        # Add user query
        messages.append({"role": "user", "content": query})
        
        total_tokens = self.counter.count_messages(messages)
        
        return BuiltContext(
            messages=messages,
            total_tokens=total_tokens,
            budget=budget,
            included_sources=included_sources
        )
    
    def _truncate_conversation(
        self,
        conversation: list[dict],
        max_tokens: int
    ) -> list[dict]:
        """Truncate conversation to fit budget."""
        
        # Keep most recent messages
        result = []
        used_tokens = 0
        
        for message in reversed(conversation):
            msg_tokens = self.counter.count(message.get("content", "")) + 4
            
            if used_tokens + msg_tokens > max_tokens:
                break
            
            result.insert(0, message)
            used_tokens += msg_tokens
        
        return result
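
A minimal end-to-end sketch, assuming an AsyncOpenAI client (any object exposing an async embeddings.create(model=..., input=[...]) call works as the embedding client):

import asyncio
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    counter = TokenCounter("gpt-4o-mini")
    builder = ContextBuilder(counter, ContentPrioritizer(client))
    
    built = await builder.build(
        model="gpt-4o-mini",
        query="What is the refund policy?",
        system_prompt="Answer only from the provided context.",
        documents=["Refunds are accepted within 30 days...",
                   "Shipping takes 3-5 business days..."],
    )
    print(built.total_tokens, f"{built.budget.utilization:.1%}")

asyncio.run(main())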

class AdaptiveContextBuilder:
    """Build context with adaptive allocation."""
    
    def __init__(
        self,
        counter: TokenCounter,
        embedding_client: Any
    ):
        self.counter = counter
        self.prioritizer = ContentPrioritizer(embedding_client)
        self.chunker = SmartChunker(counter)
    
    async def build_adaptive(
        self,
        model: str,
        query: str,
        system_prompt: Optional[str] = None,
        conversation: Optional[list[dict]] = None,
        documents: Optional[list[str]] = None,
        allocation_strategy: str = "balanced"
    ) -> BuiltContext:
        """Build with adaptive token allocation."""
        
        total_limit = self.counter.get_limit(model)
        reserved_output = 1000
        available = total_limit - reserved_output
        
        # Calculate fixed costs
        system_tokens = self.counter.count(system_prompt) if system_prompt else 0
        query_tokens = self.counter.count(query)
        
        remaining = available - system_tokens - query_tokens
        
        # Allocate based on strategy
        if allocation_strategy == "context_heavy":
            # Prioritize retrieved context
            context_budget = int(remaining * 0.7)
            history_budget = int(remaining * 0.3)
        elif allocation_strategy == "history_heavy":
            # Prioritize conversation history
            context_budget = int(remaining * 0.3)
            history_budget = int(remaining * 0.7)
        else:
            # Balanced
            context_budget = int(remaining * 0.5)
            history_budget = int(remaining * 0.5)
        
        messages = []
        
        # System prompt
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        
        # Retrieved context
        if documents:
            context_content = await self._build_context_section(
                query, documents, context_budget
            )
            if context_content:
                messages.append({
                    "role": "system",
                    "content": f"Context:\n{context_content}"
                })
        
        # Conversation history
        if conversation:
            truncated = self._truncate_conversation(conversation, history_budget)
            messages.extend(truncated)
        
        # Query
        messages.append({"role": "user", "content": query})
        
        return BuiltContext(
            messages=messages,
            total_tokens=self.counter.count_messages(messages),
            budget=None,
            included_sources=[]
        )
    
    async def _build_context_section(
        self,
        query: str,
        documents: list[str],
        budget: int
    ) -> str:
        """Build context section within budget."""
        
        # Chunk documents
        all_chunks = []
        for doc in documents:
            chunks = self.chunker.chunk_by_paragraphs(doc, max_chunk_tokens=300)
            all_chunks.extend(chunks)
        
        # Prioritize chunks
        chunk_texts = [c.content for c in all_chunks]
        prioritized = await self.prioritizer.prioritize_by_relevance(
            query, chunk_texts, self.counter
        )
        
        # Select within budget
        selector = GreedySelector()
        selected = selector.select(prioritized, budget)
        
        return "\n\n".join(c.content for c in selected)
    
    def _truncate_conversation(
        self,
        conversation: list[dict],
        max_tokens: int
    ) -> list[dict]:
        """Truncate conversation to fit budget."""
        
        result = []
        used = 0
        
        for msg in reversed(conversation):
            tokens = self.counter.count(msg.get("content", "")) + 4
            if used + tokens > max_tokens:
                break
            result.insert(0, msg)
            used += tokens
        
        return result
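
Usage mirrors ContextBuilder; the allocation strategy string controls the split between retrieved context and history. A sketch (run inside an async function; the AsyncOpenAI embedding client is again an assumption):

from openai import AsyncOpenAI

builder = AdaptiveContextBuilder(TokenCounter("gpt-4o-mini"), AsyncOpenAI())

built = await builder.build_adaptive(
    model="gpt-4o-mini",
    query="Summarize the latest discussion.",
    conversation=[{"role": "user", "content": "Earlier message..."}],
    documents=["Retrieved document text..."],
    allocation_strategy="history_heavy",  # or "context_heavy" / "balanced"
)
print(built.total_tokens)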

Production Context Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components; the builder needs an embedding client, so wire it
# up at startup (AsyncOpenAI is one option; any async embeddings client works)
counter = TokenCounter()
context_builder: Optional[AdaptiveContextBuilder] = None

class BuildContextRequest(BaseModel):
    model: str = "gpt-4o-mini"
    query: str
    system_prompt: Optional[str] = None
    conversation: Optional[list[dict]] = None
    documents: Optional[list[str]] = None
    reserved_output: int = 1000
    allocation_strategy: str = "balanced"

class CountTokensRequest(BaseModel):
    text: str
    model: str = "gpt-4o-mini"

class ChunkRequest(BaseModel):
    text: str
    method: str = "paragraphs"
    max_chunk_tokens: int = 500

@app.post("/v1/context/build")
async def build_context(request: BuildContextRequest):
    """Build optimized context."""
    
    result = await context_builder.build_adaptive(
        model=request.model,
        query=request.query,
        system_prompt=request.system_prompt,
        conversation=request.conversation,
        documents=request.documents,
        allocation_strategy=request.allocation_strategy
    )
    
    return {
        "messages": result.messages,
        "total_tokens": result.total_tokens,
        "model_limit": counter.get_limit(request.model),
        "utilization": result.total_tokens / counter.get_limit(request.model)
    }

@app.post("/v1/tokens/count")
async def count_tokens(request: CountTokensRequest):
    """Count tokens in text."""
    
    count = counter.count(request.text)
    limit = counter.get_limit(request.model)
    
    return {
        "tokens": count,
        "model": request.model,
        "limit": limit,
        "percentage": count / limit
    }

@app.post("/v1/tokens/truncate")
async def truncate_text(text: str, max_tokens: int, model: str = "gpt-4o-mini"):
    """Truncate text to token limit."""
    
    truncated = counter.truncate(text, max_tokens)
    
    return {
        "text": truncated,
        "original_tokens": counter.count(text),
        "truncated_tokens": counter.count(truncated)
    }

@app.post("/v1/chunk")
async def chunk_text(request: ChunkRequest):
    """Chunk text into smaller pieces."""
    
    chunker = SmartChunker(counter)
    
    if request.method == "tokens":
        chunks = chunker.chunk_by_tokens(request.text, request.max_chunk_tokens)
    elif request.method == "sentences":
        chunks = chunker.chunk_by_sentences(request.text, request.max_chunk_tokens)
    elif request.method == "paragraphs":
        chunks = chunker.chunk_by_paragraphs(request.text, request.max_chunk_tokens)
    elif request.method == "code":
        chunks = chunker.chunk_code(request.text, request.max_chunk_tokens)
    else:
        raise HTTPException(status_code=400, detail=f"Unknown method: {request.method}")
    
    return {
        "chunks": [
            {"content": c.content, "tokens": c.tokens}
            for c in chunks
        ],
        "total_chunks": len(chunks)
    }

@app.get("/v1/models")
async def list_models():
    """List supported models and their limits."""
    
    return {
        "models": [
            {"name": name, "limit": limit}
            for name, limit in TokenCounter.MODEL_LIMITS.items()
        ]
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Context window optimization is about making every token count. Start with accurate token counting using tiktoken to understand your actual usage. Create explicit token budgets that account for system prompts, conversation history, retrieved context, and reserved output space. Prioritize content by relevance to the query—not all context is equally valuable. Use smart chunking that respects semantic boundaries like sentences and paragraphs rather than arbitrary token splits. Implement greedy selection that picks the highest-value content within your budget. For complex applications, use adaptive allocation that adjusts the balance between conversation history and retrieved context based on the task. The key insight is that context window optimization is a constrained optimization problem—you're maximizing information value within a fixed token budget. Build systems that automatically make these tradeoffs so you get the best possible context for every request.