Introduction
Context windows are precious real estate. Every token spent on context is a token you cannot spend on output or on additional context. Long prompts hit token limits, increase latency, and cost more money. Prompt compression techniques help you fit more information into less space without losing the signal that matters. This guide covers practical compression strategies: summarization that preserves key information, token pruning that removes redundancy, selective context that includes only relevant content, and compression pipelines that automatically optimize prompts for your token budget.

Token Counting and Budgeting
from dataclasses import dataclass
from typing import Any, Optional

import tiktoken


@dataclass
class TokenBudget:
    """Token budget for a prompt."""
    total_limit: int
    system_tokens: int = 0
    context_tokens: int = 0
    query_tokens: int = 0
    reserved_output: int = 1000

    @property
    def available_context(self) -> int:
        """Tokens available for context."""
        return self.total_limit - self.system_tokens - self.query_tokens - self.reserved_output

    @property
    def remaining(self) -> int:
        """Remaining tokens after current usage."""
        return self.available_context - self.context_tokens


class TokenCounter:
    """Count tokens for different models."""

    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
        self._encoders = {}

    def _get_encoder(self, model: str):
        """Get (and cache) the encoder for a model."""
        if model not in self._encoders:
            try:
                self._encoders[model] = tiktoken.encoding_for_model(model)
            except KeyError:
                # Fall back to a general-purpose encoding for unknown models
                self._encoders[model] = tiktoken.get_encoding("cl100k_base")
        return self._encoders[model]

    def count(self, text: str, model: Optional[str] = None) -> int:
        """Count tokens in text."""
        encoder = self._get_encoder(model or self.model)
        return len(encoder.encode(text))

    def count_messages(self, messages: list[dict], model: Optional[str] = None) -> int:
        """Count tokens in chat messages (approximate)."""
        encoder = self._get_encoder(model or self.model)
        tokens = 0
        for message in messages:
            tokens += 4  # Per-message overhead
            for key, value in message.items():
                tokens += len(encoder.encode(str(value)))
        tokens += 2  # Reply priming
        return tokens

    def truncate_to_limit(self, text: str, max_tokens: int, model: Optional[str] = None) -> str:
        """Truncate text to a token limit."""
        encoder = self._get_encoder(model or self.model)
        tokens = encoder.encode(text)
        if len(tokens) <= max_tokens:
            return text
        truncated_tokens = tokens[:max_tokens]
        return encoder.decode(truncated_tokens)


class BudgetManager:
    """Manage token budgets for prompts."""

    def __init__(self, counter: TokenCounter):
        self.counter = counter
        # Context window limits by model
        self.model_limits = {
            "gpt-4o": 128000,
            "gpt-4o-mini": 128000,
            "gpt-4-turbo": 128000,
            "gpt-3.5-turbo": 16385,
            "claude-3-5-sonnet": 200000,
            "claude-3-haiku": 200000,
        }

    def create_budget(
        self,
        model: str,
        system_prompt: Optional[str] = None,
        query: Optional[str] = None,
        reserved_output: int = 1000,
    ) -> TokenBudget:
        """Create a token budget for a request."""
        total_limit = self.model_limits.get(model, 8000)
        system_tokens = self.counter.count(system_prompt) if system_prompt else 0
        query_tokens = self.counter.count(query) if query else 0
        return TokenBudget(
            total_limit=total_limit,
            system_tokens=system_tokens,
            query_tokens=query_tokens,
            reserved_output=reserved_output,
        )

    def allocate_context(
        self,
        budget: TokenBudget,
        documents: list[str],
    ) -> list[str]:
        """Greedily pack documents into the available context budget."""
        available = budget.available_context
        selected = []
        used = 0
        for doc in documents:
            doc_tokens = self.counter.count(doc)
            if used + doc_tokens <= available:
                selected.append(doc)
                used += doc_tokens
            else:
                # Try to fit a truncated slice of the next document
                remaining = available - used
                if remaining > 100:  # Minimum useful size
                    truncated = self.counter.truncate_to_limit(doc, remaining)
                    selected.append(truncated)
                break
        return selected
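Putting the counter and budget manager together looks like this; a minimal sketch where the system prompt, query, and documents are placeholder strings:

counter = TokenCounter("gpt-4o-mini")
manager = BudgetManager(counter)

budget = manager.create_budget(
    model="gpt-4o-mini",
    system_prompt="You are a concise assistant.",
    query="What changed in the Q3 report?",
    reserved_output=500,
)
print(budget.available_context)  # tokens left over for retrieved context

documents = ["First document ...", "Second document ...", "Third document ..."]
selected = manager.allocate_context(budget, documents)
print(f"{len(selected)} documents fit within the budget")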
Content Summarization
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SummarizationResult:
    """Result of content summarization."""
    original_tokens: int
    compressed_tokens: int
    content: str
    compression_ratio: float


class ContentSummarizer:
    """Summarize content to reduce tokens."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
        self.counter = TokenCounter(model)

    async def summarize(
        self,
        content: str,
        target_tokens: int,
        preserve_key_info: bool = True,
    ) -> SummarizationResult:
        """Summarize content down to a target token count."""
        original_tokens = self.counter.count(content)
        if original_tokens <= target_tokens:
            return SummarizationResult(
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                content=content,
                compression_ratio=1.0,
            )

        # Rough approximation: ~0.75 words per token
        target_words = int(target_tokens * 0.75)
        prompt = f"""Summarize the following content in approximately {target_words} words.
Preserve the most important information, key facts, and main points.
{"Focus on information that would be relevant for answering questions." if preserve_key_info else ""}

Content:
{content}

Summary:"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
        )
        summary = response.choices[0].message.content
        compressed_tokens = self.counter.count(summary)
        return SummarizationResult(
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            content=summary,
            compression_ratio=compressed_tokens / original_tokens,
        )

    async def extractive_summarize(
        self,
        content: str,
        num_sentences: int = 5,
    ) -> SummarizationResult:
        """Extract key sentences without rewriting them."""
        original_tokens = self.counter.count(content)
        prompt = f"""Extract the {num_sentences} most important sentences from this content.
Return only the extracted sentences, one per line.
Do not modify or rewrite the sentences.

Content:
{content}

Key sentences:"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
        )
        extracted = response.choices[0].message.content
        compressed_tokens = self.counter.count(extracted)
        return SummarizationResult(
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            content=extracted,
            compression_ratio=compressed_tokens / original_tokens,
        )

    async def hierarchical_summarize(
        self,
        documents: list[str],
        target_tokens: int,
    ) -> SummarizationResult:
        """Summarize multiple documents hierarchically."""
        # First pass: summarize each document within its share of the budget
        summaries = []
        total_original = 0
        per_doc_target = target_tokens // len(documents)
        for doc in documents:
            total_original += self.counter.count(doc)
            result = await self.summarize(doc, per_doc_target)
            summaries.append(result.content)

        # Combine the per-document summaries
        combined = "\n\n".join(summaries)
        combined_tokens = self.counter.count(combined)

        # Second pass if the combined summary is still over budget
        if combined_tokens > target_tokens:
            final_result = await self.summarize(combined, target_tokens)
            return SummarizationResult(
                original_tokens=total_original,
                compressed_tokens=final_result.compressed_tokens,
                content=final_result.content,
                compression_ratio=final_result.compressed_tokens / total_original,
            )
        return SummarizationResult(
            original_tokens=total_original,
            compressed_tokens=combined_tokens,
            content=combined,
            compression_ratio=combined_tokens / total_original,
        )
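The summarizer only needs a client that exposes the async chat.completions.create interface. A usage sketch assuming the openai package's AsyncOpenAI client and a hypothetical input file:

import asyncio
from openai import AsyncOpenAI  # assumed client; any async OpenAI-compatible client works

async def main():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    summarizer = ContentSummarizer(client, model="gpt-4o-mini")
    long_report = open("report.txt").read()  # hypothetical input document
    result = await summarizer.summarize(long_report, target_tokens=300)
    print(f"{result.original_tokens} -> {result.compressed_tokens} tokens "
          f"({result.compression_ratio:.0%} of original)")

asyncio.run(main())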
Token Pruning
from dataclasses import dataclass
from typing import Any, Optional
import re


@dataclass
class PruningResult:
    """Result of token pruning."""
    original_tokens: int
    pruned_tokens: int
    content: str
    removed_elements: list[str]


class TokenPruner:
    """Prune tokens while preserving meaning."""

    def __init__(self):
        self.counter = TokenCounter()

    def prune_whitespace(self, text: str) -> PruningResult:
        """Remove excessive whitespace."""
        original_tokens = self.counter.count(text)
        removed = []
        # Normalize whitespace: collapse blank runs, repeated spaces, and tabs
        pruned = re.sub(r'\n{3,}', '\n\n', text)
        pruned = re.sub(r' {2,}', ' ', pruned)
        pruned = re.sub(r'\t+', ' ', pruned)
        if pruned != text:
            removed.append("excessive whitespace")
        pruned_tokens = self.counter.count(pruned)
        return PruningResult(
            original_tokens=original_tokens,
            pruned_tokens=pruned_tokens,
            content=pruned,
            removed_elements=removed,
        )

    def prune_formatting(self, text: str) -> PruningResult:
        """Remove unnecessary formatting."""
        original_tokens = self.counter.count(text)
        removed = []
        pruned = text
        # Remove markdown bold markers
        if '**' in pruned or '__' in pruned:
            pruned = re.sub(r'\*\*(.+?)\*\*', r'\1', pruned)
            pruned = re.sub(r'__(.+?)__', r'\1', pruned)
            removed.append("bold formatting")
        # Remove markdown italic markers
        if '*' in pruned or '_' in pruned:
            pruned = re.sub(r'\*(.+?)\*', r'\1', pruned)
            pruned = re.sub(r'_(.+?)_', r'\1', pruned)
            removed.append("italic formatting")
        # Remove bullet points
        if re.search(r'^[-*]\s', pruned, re.MULTILINE):
            pruned = re.sub(r'^[-*]\s+', '', pruned, flags=re.MULTILINE)
            removed.append("bullet points")
        # Remove numbered-list formatting
        pruned = re.sub(r'^\d+\.\s+', '', pruned, flags=re.MULTILINE)
        pruned_tokens = self.counter.count(pruned)
        return PruningResult(
            original_tokens=original_tokens,
            pruned_tokens=pruned_tokens,
            content=pruned,
            removed_elements=removed,
        )

    def prune_redundancy(self, text: str) -> PruningResult:
        """Replace or drop common redundant phrases."""
        original_tokens = self.counter.count(text)
        removed = []
        pruned = text
        # Common redundant phrases and their tighter replacements
        redundant_phrases = [
            (r'\b(very|really|extremely|quite|rather)\s+', ''),
            (r'\b(in order to)\b', 'to'),
            (r'\b(due to the fact that)\b', 'because'),
            (r'\b(at this point in time)\b', 'now'),
            (r'\b(in the event that)\b', 'if'),
            (r'\b(for the purpose of)\b', 'to'),
            (r'\b(in spite of the fact that)\b', 'although'),
            (r'\b(it is important to note that)\b', ''),
            (r'\b(it should be noted that)\b', ''),
            (r'\b(as a matter of fact)\b', ''),
        ]
        for pattern, replacement in redundant_phrases:
            if re.search(pattern, pruned, re.IGNORECASE):
                pruned = re.sub(pattern, replacement, pruned, flags=re.IGNORECASE)
                removed.append(f"redundant phrase: {pattern}")
        pruned_tokens = self.counter.count(pruned)
        return PruningResult(
            original_tokens=original_tokens,
            pruned_tokens=pruned_tokens,
            content=pruned,
            removed_elements=removed,
        )

    def prune_all(self, text: str) -> PruningResult:
        """Apply all pruning strategies in sequence."""
        original_tokens = self.counter.count(text)
        all_removed = []
        result = self.prune_whitespace(text)
        all_removed.extend(result.removed_elements)
        result = self.prune_formatting(result.content)
        all_removed.extend(result.removed_elements)
        result = self.prune_redundancy(result.content)
        all_removed.extend(result.removed_elements)
        return PruningResult(
            original_tokens=original_tokens,
            pruned_tokens=result.pruned_tokens,
            content=result.content,
            removed_elements=all_removed,
        )
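The rule-based pruner runs entirely locally, so it is cheap to try on real text; a quick sketch with a contrived input:

pruner = TokenPruner()
messy = "**Note:** it is important to note that the batch job\n\n\n\nrestarts    every night in order to apply updates."
result = pruner.prune_all(messy)
print(result.content)
# roughly: "Note:  the batch job\n\nrestarts every night to apply updates."
print(result.original_tokens, "->", result.pruned_tokens)
print(result.removed_elements)

Rule-based pruning alone rarely frees enough space on its own; the SelectivePruner class below uses the model itself to drop content that is irrelevant to the query.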
class SelectivePruner:
    """Selectively prune based on relevance to a query."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
        self.counter = TokenCounter()

    async def prune_irrelevant(
        self,
        content: str,
        query: str,
        target_tokens: int,
    ) -> PruningResult:
        """Remove content irrelevant to the query."""
        original_tokens = self.counter.count(content)
        if original_tokens <= target_tokens:
            return PruningResult(
                original_tokens=original_tokens,
                pruned_tokens=original_tokens,
                content=content,
                removed_elements=[],
            )

        prompt = f"""Given this query, remove sentences from the content that are not relevant to answering it.
Keep only the information needed to answer the query.
Target approximately {int(target_tokens * 0.75)} words.

Query: {query}

Content:
{content}

Relevant content only:"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
        )
        pruned = response.choices[0].message.content
        pruned_tokens = self.counter.count(pruned)
        return PruningResult(
            original_tokens=original_tokens,
            pruned_tokens=pruned_tokens,
            content=pruned,
            removed_elements=["irrelevant sentences"],
        )
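As with the summarizer, the selective pruner needs an async client; a sketch using a hypothetical helper function:

async def prune_for_question(client, article: str, question: str) -> str:
    # Hypothetical helper: keep only the sentences relevant to the question
    pruner = SelectivePruner(client, model="gpt-4o-mini")
    result = await pruner.prune_irrelevant(article, question, target_tokens=200)
    print(f"kept {result.pruned_tokens} of {result.original_tokens} tokens")
    return result.content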
Compression Pipeline
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum


class CompressionStrategy(Enum):
    """Compression strategies."""
    TRUNCATE = "truncate"
    SUMMARIZE = "summarize"
    PRUNE = "prune"
    SELECTIVE = "selective"
    HYBRID = "hybrid"


@dataclass
class CompressionResult:
    """Result of the compression pipeline."""
    original_tokens: int
    final_tokens: int
    content: str
    strategy_used: CompressionStrategy
    compression_ratio: float
    stages_applied: list[str]


class CompressionPipeline:
    """Pipeline for compressing prompts."""

    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
    ):
        self.client = client
        self.model = model
        self.counter = TokenCounter(model)
        self.pruner = TokenPruner()
        self.summarizer = ContentSummarizer(client, model)
        self.selective_pruner = SelectivePruner(client, model)

    async def compress(
        self,
        content: str,
        target_tokens: int,
        query: Optional[str] = None,
        strategy: CompressionStrategy = CompressionStrategy.HYBRID,
    ) -> CompressionResult:
        """Compress content to a target token count."""
        original_tokens = self.counter.count(content)
        stages = []
        if original_tokens <= target_tokens:
            return CompressionResult(
                original_tokens=original_tokens,
                final_tokens=original_tokens,
                content=content,
                strategy_used=strategy,
                compression_ratio=1.0,
                stages_applied=[],
            )

        current_content = content
        if strategy == CompressionStrategy.TRUNCATE:
            current_content = self.counter.truncate_to_limit(content, target_tokens)
            stages.append("truncation")
        elif strategy == CompressionStrategy.SUMMARIZE:
            result = await self.summarizer.summarize(content, target_tokens)
            current_content = result.content
            stages.append("summarization")
        elif strategy == CompressionStrategy.PRUNE:
            result = self.pruner.prune_all(content)
            current_content = result.content
            stages.append("pruning")
            # Truncate if still over budget
            if self.counter.count(current_content) > target_tokens:
                current_content = self.counter.truncate_to_limit(current_content, target_tokens)
                stages.append("truncation")
        elif strategy == CompressionStrategy.SELECTIVE:
            if query:
                result = await self.selective_pruner.prune_irrelevant(
                    content, query, target_tokens
                )
                current_content = result.content
                stages.append("selective pruning")
            else:
                # No query to judge relevance against; fall back to summarization
                result = await self.summarizer.summarize(content, target_tokens)
                current_content = result.content
                stages.append("summarization")
        elif strategy == CompressionStrategy.HYBRID:
            # Stage 1: basic pruning (whitespace, formatting, redundancy)
            prune_result = self.pruner.prune_all(content)
            current_content = prune_result.content
            stages.append("basic pruning")
            current_tokens = self.counter.count(current_content)

            # Stage 2: selective pruning if a query is provided
            if current_tokens > target_tokens and query:
                selective_result = await self.selective_pruner.prune_irrelevant(
                    current_content, query, target_tokens
                )
                current_content = selective_result.content
                stages.append("selective pruning")
                current_tokens = self.counter.count(current_content)

            # Stage 3: summarization if still over budget
            if current_tokens > target_tokens:
                summary_result = await self.summarizer.summarize(
                    current_content, target_tokens
                )
                current_content = summary_result.content
                stages.append("summarization")
                current_tokens = self.counter.count(current_content)

            # Stage 4: truncation as a last resort
            if current_tokens > target_tokens:
                current_content = self.counter.truncate_to_limit(
                    current_content, target_tokens
                )
                stages.append("truncation")

        final_tokens = self.counter.count(current_content)
        return CompressionResult(
            original_tokens=original_tokens,
            final_tokens=final_tokens,
            content=current_content,
            strategy_used=strategy,
            compression_ratio=final_tokens / original_tokens,
            stages_applied=stages,
        )

    async def compress_documents(
        self,
        documents: list[str],
        total_budget: int,
        query: Optional[str] = None,
    ) -> list[CompressionResult]:
        """Compress multiple documents within a shared budget."""
        # Split the budget evenly across documents
        per_doc_budget = total_budget // len(documents)
        results = []
        for doc in documents:
            result = await self.compress(
                doc,
                per_doc_budget,
                query,
                CompressionStrategy.HYBRID,
            )
            results.append(result)
        return results
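End to end, the pipeline can be driven by a small helper; a sketch assuming the same AsyncOpenAI-style client as above (the helper name is hypothetical):

async def compress_for_query(client, document: str, query: str) -> str:
    # Hypothetical helper: hybrid compression for a query-specific prompt
    pipeline = CompressionPipeline(client, model="gpt-4o-mini")
    result = await pipeline.compress(
        document,
        target_tokens=800,
        query=query,
        strategy=CompressionStrategy.HYBRID,
    )
    print("stages applied:", result.stages_applied)
    print(f"compression ratio: {result.compression_ratio:.2f}")
    return result.content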
Production Compression Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
counter = TokenCounter()
budget_manager = BudgetManager(counter)
pruner = TokenPruner()
compression_pipeline = None  # Initialize with an LLM client at startup


class CountRequest(BaseModel):
    text: str
    model: str = "gpt-4o-mini"


class BudgetRequest(BaseModel):
    model: str
    system_prompt: Optional[str] = None
    query: Optional[str] = None
    reserved_output: int = 1000


class CompressRequest(BaseModel):
    content: str
    target_tokens: int
    query: Optional[str] = None
    strategy: str = "hybrid"


class CompressDocumentsRequest(BaseModel):
    documents: list[str]
    total_budget: int
    query: Optional[str] = None


@app.post("/v1/tokens/count")
async def count_tokens(request: CountRequest):
    """Count tokens in text."""
    count = counter.count(request.text, request.model)
    return {
        "tokens": count,
        "model": request.model
    }


@app.post("/v1/tokens/budget")
async def create_budget(request: BudgetRequest):
    """Create a token budget."""
    budget = budget_manager.create_budget(
        request.model,
        request.system_prompt,
        request.query,
        request.reserved_output
    )
    return {
        "total_limit": budget.total_limit,
        "system_tokens": budget.system_tokens,
        "query_tokens": budget.query_tokens,
        "reserved_output": budget.reserved_output,
        "available_context": budget.available_context
    }


@app.post("/v1/compress")
async def compress_content(request: CompressRequest):
    """Compress content to target tokens."""
    if compression_pipeline is None:
        raise HTTPException(status_code=503, detail="Compression pipeline not initialized")
    try:
        strategy = CompressionStrategy(request.strategy)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Unknown strategy: {request.strategy}")
    result = await compression_pipeline.compress(
        request.content,
        request.target_tokens,
        request.query,
        strategy
    )
    return {
        "content": result.content,
        "original_tokens": result.original_tokens,
        "final_tokens": result.final_tokens,
        "compression_ratio": result.compression_ratio,
        "strategy": result.strategy_used.value,
        "stages": result.stages_applied
    }


@app.post("/v1/compress/documents")
async def compress_documents(request: CompressDocumentsRequest):
    """Compress multiple documents."""
    if compression_pipeline is None:
        raise HTTPException(status_code=503, detail="Compression pipeline not initialized")
    results = await compression_pipeline.compress_documents(
        request.documents,
        request.total_budget,
        request.query
    )
    return {
        "documents": [
            {
                "content": r.content,
                "original_tokens": r.original_tokens,
                "final_tokens": r.final_tokens,
                "compression_ratio": r.compression_ratio
            }
            for r in results
        ],
        "total_original": sum(r.original_tokens for r in results),
        "total_final": sum(r.final_tokens for r in results)
    }


@app.post("/v1/prune")
async def prune_content(text: str):
    """Prune content without an LLM call."""
    result = pruner.prune_all(text)
    return {
        "content": result.content,
        "original_tokens": result.original_tokens,
        "pruned_tokens": result.pruned_tokens,
        "removed": result.removed_elements
    }


@app.post("/v1/truncate")
async def truncate_content(text: str, max_tokens: int, model: str = "gpt-4o-mini"):
    """Truncate content to a token limit."""
    truncated = counter.truncate_to_limit(text, max_tokens, model)
    return {
        "content": truncated,
        "original_tokens": counter.count(text, model),
        "final_tokens": counter.count(truncated, model)
    }


@app.get("/health")
async def health():
    return {"status": "healthy"}
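Once the service is running (for example via uvicorn, with compression_pipeline wired to a real client), the endpoints can be exercised with any HTTP client; a sketch using httpx against an assumed local instance:

import httpx

BASE = "http://localhost:8000"  # assumed local deployment

# Count tokens
resp = httpx.post(f"{BASE}/v1/tokens/count",
                  json={"text": "How many tokens is this?", "model": "gpt-4o-mini"})
print(resp.json())

# Compress a document (requires compression_pipeline to be initialized)
resp = httpx.post(f"{BASE}/v1/compress",
                  json={"content": "... long document ...", "target_tokens": 500, "strategy": "hybrid"})
print(resp.json()["final_tokens"])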
References
- tiktoken: https://github.com/openai/tiktoken
- LLMLingua: https://github.com/microsoft/LLMLingua
- LangChain Context Compression: https://python.langchain.com/docs/modules/data_connection/retrievers/contextual_compression/
- OpenAI Tokenizer: https://platform.openai.com/tokenizer
Conclusion
Prompt compression is essential for building cost-effective and responsive LLM applications. Start with accurate token counting using tiktoken to understand your actual usage. Create token budgets that account for system prompts, queries, and reserved output space. Apply compression in stages: basic pruning removes whitespace and formatting without changing meaning, selective pruning removes content irrelevant to the query, summarization condenses information while preserving key points, and truncation serves as a last resort. Hybrid strategies that combine these approaches typically achieve the best results—prune first to remove noise, then selectively compress based on relevance, and summarize only when necessary. For production systems, monitor compression ratios and quality to ensure you're not losing critical information. The goal is fitting more useful context into your token budget while maintaining the signal that helps the model generate accurate responses.