Introduction: Context window limits are one of the most frustrating constraints when building LLM applications. You have a 100-page document but only 8K tokens of context. You want to include conversation history but it’s eating into your prompt budget. Context compression techniques solve this by reducing the token count while preserving the information that matters. This isn’t just about summarization—it’s about intelligent extraction, selective pruning, and efficient encoding that maintains semantic fidelity. The best compression strategies are task-aware: they know what information the model needs and ruthlessly eliminate everything else. This guide covers practical techniques from simple truncation to sophisticated extractive compression, helping you fit more relevant context into limited token budgets while maintaining output quality.

Basic Compression Strategies
from dataclasses import dataclass, field
from typing import Any, Optional, List, Callable
from enum import Enum
import re
class CompressionStrategy(Enum):
    """Compression strategy types.

    Each member's value is the lowercase name of the strategy; results carry
    one of these members in ``CompressionResult.strategy``.
    """

    # Token-level cut-off (reported by TruncationCompressor).
    TRUNCATE = "truncate"
    # LLM-written summary (reported by the summarizer classes).
    SUMMARIZE = "summarize"
    # Selection of existing sentences/chunks (reported by the extractors).
    EXTRACT = "extract"
    # Combination of approaches (reported by the adaptive/pipeline classes).
    HYBRID = "hybrid"
@dataclass
class CompressionResult:
    """Result of context compression."""

    # Text exactly as it was handed to the compressor.
    original_text: str
    # Text after compression (identical to original_text when it already fit).
    compressed_text: str
    # Token count of original_text.
    original_tokens: int
    # Token count of compressed_text.
    compressed_tokens: int
    # compressed_tokens / original_tokens; 1.0 when nothing was removed.
    compression_ratio: float
    # Strategy that produced this result.
    strategy: CompressionStrategy
    # Free-form extras set by individual compressors (e.g. "chunks", "query").
    metadata: dict = field(default_factory=dict)
class TokenCounter:
    """Count and truncate text in units of model tokens.

    The tiktoken encoder is created lazily on first use, so constructing a
    counter performs no import and no I/O.
    """

    # Encoding used when tiktoken has no mapping for the requested model name.
    _FALLBACK_ENCODING = "cl100k_base"

    def __init__(self, model: str = "gpt-4"):
        self.model = model
        self._encoder = None

    @property
    def encoder(self):
        """Return the encoder for ``self.model``, building it on first access."""
        if self._encoder is None:
            import tiktoken

            try:
                self._encoder = tiktoken.encoding_for_model(self.model)
            except KeyError:
                # tiktoken raises KeyError for model names it cannot map to an
                # encoding. Fall back to a general-purpose encoding so token
                # counts are approximate instead of the counter being unusable.
                self._encoder = tiktoken.get_encoding(self._FALLBACK_ENCODING)
        return self._encoder

    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoder.encode(text))

    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate text to at most ``max_tokens`` tokens, keeping the start.

        Returns ``text`` unchanged when it already fits and an empty string
        when ``max_tokens`` is zero or negative.
        """
        # A negative bound would otherwise slice from the END of the token
        # list (tokens[:-k]) and return almost the whole text.
        if max_tokens <= 0:
            return ""
        tokens = self.encoder.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.encoder.decode(tokens[:max_tokens])
class TruncationCompressor:
    """Simple truncation-based compression.

    Supports keeping the start ("end" truncation), the end ("start"
    truncation) or both ends with a marker in between ("middle").
    """

    # Marker spliced in where "middle" truncation removed content.
    _GAP_MARKER = "\n\n[...content truncated...]\n\n"

    def __init__(self, token_counter: TokenCounter):
        self.counter = token_counter

    def compress(
        self,
        text: str,
        max_tokens: int,
        strategy: str = "end"
    ) -> CompressionResult:
        """Compress by truncation.

        Args:
            text: Text to shorten.
            max_tokens: Token budget for the result.
            strategy: "end", "start" or "middle" (which part is cut away).
                Any other value behaves like "end".

        Returns:
            A CompressionResult tagged with CompressionStrategy.TRUNCATE.
        """
        original_tokens = self.counter.count(text)
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.TRUNCATE
            )

        truncate = {
            "end": self._truncate_end,
            "start": self._truncate_start,
            "middle": self._truncate_middle,
        }.get(strategy, self._truncate_end)
        compressed = truncate(text, max_tokens)

        compressed_tokens = self.counter.count(compressed)
        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            # original_tokens can only be 0 here for a negative budget.
            compression_ratio=(
                compressed_tokens / original_tokens if original_tokens else 1.0
            ),
            strategy=CompressionStrategy.TRUNCATE
        )

    def _truncate_end(self, text: str, max_tokens: int) -> str:
        """Keep beginning, truncate end."""
        if max_tokens <= 0:
            return ""
        return self.counter.truncate_to_tokens(text, max_tokens)

    def _truncate_start(self, text: str, max_tokens: int) -> str:
        """Keep end, truncate beginning."""
        # tokens[-0:] is the WHOLE list, so a zero budget needs its own exit.
        if max_tokens <= 0:
            return ""
        encoder = self.counter.encoder
        tokens = encoder.encode(text)
        return encoder.decode(tokens[-max_tokens:])

    def _truncate_middle(self, text: str, max_tokens: int) -> str:
        """Keep beginning and end, truncate middle.

        The gap marker is paid for out of the budget. When the budget cannot
        hold the marker plus at least one token on each side, this degrades to
        plain end truncation. Token counts of decoded pieces may differ
        slightly after re-encoding, so the budget is honoured approximately.
        """
        if max_tokens <= 0:
            return ""
        encoder = self.counter.encoder
        tokens = encoder.encode(text)
        if len(tokens) <= max_tokens:
            # Nothing to cut; head and tail would otherwise overlap.
            return text

        budget = max_tokens - len(encoder.encode(self._GAP_MARKER))
        if budget < 2:
            return encoder.decode(tokens[:max_tokens])

        tail = budget // 2
        head = budget - tail  # the odd token goes to the head
        start_text = encoder.decode(tokens[:head])
        end_text = encoder.decode(tokens[-tail:])
        return f"{start_text}{self._GAP_MARKER}{end_text}"
class SentenceCompressor:
"""Compress by selecting important sentences."""
def __init__(self, token_counter: TokenCounter):
self.counter = token_counter
def compress(
self,
text: str,
max_tokens: int,
query: str = None
) -> CompressionResult:
"""Compress by selecting sentences."""
original_tokens = self.counter.count(text)
if original_tokens <= max_tokens:
return CompressionResult(
original_text=text,
compressed_text=text,
original_tokens=original_tokens,
compressed_tokens=original_tokens,
compression_ratio=1.0,
strategy=CompressionStrategy.EXTRACT
)
# Split into sentences
sentences = self._split_sentences(text)
# Score sentences
if query:
scored = self._score_by_relevance(sentences, query)
else:
scored = self._score_by_position(sentences)
# Select sentences within budget
selected = self._select_within_budget(scored, max_tokens)
# Reconstruct in original order
compressed = self._reconstruct(sentences, selected)
compressed_tokens = self.counter.count(compressed)
return CompressionResult(
original_text=text,
compressed_text=compressed,
original_tokens=original_tokens,
compressed_tokens=compressed_tokens,
compression_ratio=compressed_tokens / original_tokens,
strategy=CompressionStrategy.EXTRACT
)
def _split_sentences(self, text: str) -> list[str]:
"""Split text into sentences."""
# Simple sentence splitting
sentences = re.split(r'(?<=[.!?])\s+', text)
return [s.strip() for s in sentences if s.strip()]
def _score_by_relevance(
self,
sentences: list[str],
query: str
) -> list[tuple[int, str, float]]:
"""Score sentences by relevance to query."""
query_words = set(query.lower().split())
scored = []
for i, sentence in enumerate(sentences):
sentence_words = set(sentence.lower().split())
overlap = len(query_words & sentence_words)
score = overlap / max(len(query_words), 1)
scored.append((i, sentence, score))
return scored
def _score_by_position(
self,
sentences: list[str]
) -> list[tuple[int, str, float]]:
"""Score sentences by position (beginning and end higher)."""
n = len(sentences)
scored = []
for i, sentence in enumerate(sentences):
# U-shaped scoring: high at start and end
if i < n * 0.2: # First 20%
score = 1.0 - (i / (n * 0.2)) * 0.3
elif i > n * 0.8: # Last 20%
score = 0.7 + ((i - n * 0.8) / (n * 0.2)) * 0.3
else: # Middle
score = 0.5
scored.append((i, sentence, score))
return scored
def _select_within_budget(
self,
scored: list[tuple[int, str, float]],
max_tokens: int
) -> set[int]:
"""Select sentences within token budget."""
# Sort by score descending
sorted_scored = sorted(scored, key=lambda x: x[2], reverse=True)
selected = set()
current_tokens = 0
for idx, sentence, score in sorted_scored:
sentence_tokens = self.counter.count(sentence)
if current_tokens + sentence_tokens <= max_tokens:
selected.add(idx)
current_tokens += sentence_tokens
return selected
def _reconstruct(
self,
sentences: list[str],
selected: set[int]
) -> str:
"""Reconstruct text from selected sentences."""
result = []
for i, sentence in enumerate(sentences):
if i in selected:
result.append(sentence)
return " ".join(result)
LLM-Based Summarization
from dataclasses import dataclass
from typing import Any, Optional, List
import asyncio
class LLMSummarizer:
    """Use LLM to summarize context."""

    def __init__(self, llm_client: Any, token_counter: TokenCounter):
        # llm_client must expose an awaitable ``generate(prompt)`` that
        # returns the completion text (that is how it is used below).
        self.llm = llm_client
        self.counter = token_counter

    async def summarize(
        self,
        text: str,
        max_tokens: int,
        focus: Optional[str] = None
    ) -> CompressionResult:
        """Summarize text using LLM.

        Args:
            text: Text to summarize.
            max_tokens: Token budget for the summary.
            focus: Optional topic the summary should concentrate on.

        Returns:
            A CompressionResult tagged with CompressionStrategy.SUMMARIZE. The
            text is returned unchanged (no LLM call) when it already fits.
        """
        original_tokens = self.counter.count(text)
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.SUMMARIZE
            )

        # Rough token-to-word ratio used to phrase the length request.
        target_words = max_tokens * 0.75
        prompt = self._build_prompt(text, target_words, focus)
        response = await self.llm.generate(prompt)
        compressed = response.strip()

        # The model may overshoot the requested length; enforce the budget.
        compressed_tokens = self.counter.count(compressed)
        if compressed_tokens > max_tokens:
            compressed = self.counter.truncate_to_tokens(compressed, max_tokens)
            # Recount instead of assuming exactly max_tokens: decoding a token
            # prefix and re-encoding it need not give the same count.
            compressed_tokens = self.counter.count(compressed)

        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=(
                compressed_tokens / original_tokens if original_tokens else 1.0
            ),
            strategy=CompressionStrategy.SUMMARIZE
        )

    def _build_prompt(
        self,
        text: str,
        target_words: float,
        focus: Optional[str] = None
    ) -> str:
        """Build summarization prompt."""
        focus_instruction = ""
        if focus:
            focus_instruction = f"\nFocus on information relevant to: {focus}"
        return (
            f"Summarize the following text in approximately {int(target_words)} words.\n"
            "Preserve key facts, names, numbers, and important details.\n"
            f"Write in a clear, concise style.{focus_instruction}\n"
            "Text to summarize:\n"
            f"{text}\n"
            "Summary:"
        )
class HierarchicalSummarizer:
    """Summarize long documents hierarchically.

    The text is split into chunks of roughly ``chunk_size`` tokens, each
    chunk is summarized concurrently, and the joined chunk summaries are
    summarized once more if they still exceed the budget.
    """

    def __init__(
        self,
        llm_client: Any,
        token_counter: TokenCounter,
        chunk_size: int = 2000
    ):
        self.llm = llm_client
        self.counter = token_counter
        self.chunk_size = chunk_size

    async def summarize(
        self,
        text: str,
        max_tokens: int,
        focus: Optional[str] = None
    ) -> CompressionResult:
        """Hierarchically summarize long text.

        Returns a CompressionResult tagged SUMMARIZE whose metadata carries
        the number of chunks under the key "chunks". Text that already fits
        is returned unchanged without any LLM call.
        """
        original_tokens = self.counter.count(text)
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.SUMMARIZE
            )

        chunks = self._split_into_chunks(text)
        chunk_summaries = await self._summarize_chunks(chunks, focus)
        combined = "\n\n".join(chunk_summaries)

        # Second pass only when the first-level summaries are still too long.
        if self.counter.count(combined) > max_tokens:
            final_summary = await self._final_summarize(combined, max_tokens, focus)
        else:
            final_summary = combined

        compressed_tokens = self.counter.count(final_summary)
        return CompressionResult(
            original_text=text,
            compressed_text=final_summary,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=(
                compressed_tokens / original_tokens if original_tokens else 1.0
            ),
            strategy=CompressionStrategy.SUMMARIZE,
            metadata={"chunks": len(chunks)}
        )

    def _split_into_chunks(self, text: str) -> list[str]:
        """Group paragraphs into chunks of at most ``chunk_size`` tokens.

        Paragraphs are separated by blank lines. Blank paragraphs are dropped,
        and a single paragraph larger than ``chunk_size`` is cut on token
        boundaries so no chunk exceeds the limit.
        """
        chunks = []
        pending = []
        pending_tokens = 0

        for para in text.split("\n\n"):
            if not para.strip():
                continue
            para_tokens = self.counter.count(para)

            if para_tokens > self.chunk_size:
                # Flush what we have, then split the oversized paragraph.
                if pending:
                    chunks.append("\n\n".join(pending))
                    pending, pending_tokens = [], 0
                chunks.extend(self._split_oversized(para))
                continue

            if pending_tokens + para_tokens > self.chunk_size:
                if pending:
                    chunks.append("\n\n".join(pending))
                pending = [para]
                pending_tokens = para_tokens
            else:
                pending.append(para)
                pending_tokens += para_tokens

        if pending:
            chunks.append("\n\n".join(pending))
        return chunks

    def _split_oversized(self, para: str) -> list[str]:
        """Cut one paragraph into pieces of at most ``chunk_size`` tokens."""
        encoder = self.counter.encoder
        tokens = encoder.encode(para)
        size = max(self.chunk_size, 1)
        # NOTE: cuts fall on token boundaries, so a piece may start or end
        # mid-word.
        return [
            encoder.decode(tokens[start:start + size])
            for start in range(0, len(tokens), size)
        ]

    async def _summarize_chunks(
        self,
        chunks: list[str],
        focus: Optional[str] = None
    ) -> list[str]:
        """Summarize all chunks concurrently, preserving chunk order."""
        return await asyncio.gather(
            *(self._summarize_chunk(chunk, focus) for chunk in chunks)
        )

    async def _summarize_chunk(
        self,
        chunk: str,
        focus: Optional[str] = None
    ) -> str:
        """Summarize a single chunk."""
        focus_instruction = f"\nFocus on: {focus}" if focus else ""
        prompt = (
            f"Summarize this text concisely, preserving key information:{focus_instruction}\n"
            f"{chunk}\n"
            "Summary:"
        )
        response = await self.llm.generate(prompt)
        return response.strip()

    async def _final_summarize(
        self,
        text: str,
        max_tokens: int,
        focus: Optional[str] = None
    ) -> str:
        """Final summarization pass over the joined chunk summaries."""
        target_words = max_tokens * 0.75
        focus_instruction = f"\nFocus on: {focus}" if focus else ""
        prompt = (
            f"Create a final summary in approximately {int(target_words)} words.\n"
            f"Combine and synthesize the key points.{focus_instruction}\n"
            f"{text}\n"
            "Final Summary:"
        )
        response = (await self.llm.generate(prompt)).strip()

        # The model may overshoot the requested length; enforce the budget.
        if self.counter.count(response) > max_tokens:
            response = self.counter.truncate_to_tokens(response, max_tokens)
        return response
class QueryFocusedSummarizer:
    """Summarize with focus on answering a query."""

    def __init__(self, llm_client: Any, token_counter: TokenCounter):
        self.llm = llm_client
        self.counter = token_counter

    async def summarize(
        self,
        text: str,
        query: str,
        max_tokens: int
    ) -> CompressionResult:
        """Summarize focusing on query-relevant information.

        Unlike the other summarizers, non-empty text is always sent to the
        LLM, even when it already fits the budget, so the result is filtered
        for the query. Empty text is returned as-is without an LLM call.

        Returns:
            A CompressionResult tagged SUMMARIZE with the query stored in
            ``metadata["query"]``.
        """
        original_tokens = self.counter.count(text)
        if original_tokens == 0:
            # Nothing to summarize; also avoids dividing by zero below.
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=0,
                compressed_tokens=0,
                compression_ratio=1.0,
                strategy=CompressionStrategy.SUMMARIZE,
                metadata={"query": query}
            )

        target_words = max_tokens * 0.75  # rough token-to-word ratio
        prompt = self._build_prompt(text, query, target_words)
        response = await self.llm.generate(prompt)
        compressed = response.strip()

        # The model may overshoot the requested length; enforce the budget.
        if self.counter.count(compressed) > max_tokens:
            compressed = self.counter.truncate_to_tokens(compressed, max_tokens)
        compressed_tokens = self.counter.count(compressed)

        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.SUMMARIZE,
            metadata={"query": query}
        )

    def _build_prompt(self, text: str, query: str, target_words: float) -> str:
        """Build the query-focused summarization prompt."""
        return (
            "Extract and summarize information from the text that is relevant to answering this question:\n"
            f"Question: {query}\n"
            "Text:\n"
            f"{text}\n"
            f"Provide a focused summary (approximately {int(target_words)} words) "
            "containing only information relevant to the question:"
        )
Extractive Compression
from dataclasses import dataclass
from typing import Any, Optional, List
import numpy as np
class EmbeddingExtractor:
    """Extract relevant content using embeddings."""

    def __init__(
        self,
        embedding_model: Any,
        token_counter: TokenCounter
    ):
        # embedding_model must expose awaitable ``embed(text)`` and
        # ``embed_batch(texts)`` (that is how it is used below).
        self.embedder = embedding_model
        self.counter = token_counter

    async def extract(
        self,
        text: str,
        query: str,
        max_tokens: int
    ) -> CompressionResult:
        """Extract query-relevant content.

        Paragraphs are ranked by cosine similarity to the query embedding and
        the best ones that fit the budget are kept in document order.

        Note the parameter order: ``query`` comes BEFORE ``max_tokens``.

        Returns:
            A CompressionResult tagged with CompressionStrategy.EXTRACT.
        """
        original_tokens = self.counter.count(text)
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.EXTRACT
            )

        chunks = self._split_into_chunks(text)
        if chunks:
            query_embedding = await self.embedder.embed(query)
            chunk_embeddings = await self.embedder.embed_batch(
                [chunk for chunk, _ in chunks]
            )
            scores = self._calculate_similarities(query_embedding, chunk_embeddings)
        else:
            # Whitespace-only text: nothing to embed.
            scores = []

        selected = self._select_chunks(chunks, scores, max_tokens)
        compressed = "\n\n".join(selected)
        compressed_tokens = self.counter.count(compressed)
        if compressed_tokens > max_tokens:
            # Per-chunk counts ignore the "\n\n" joiners; clamp any overshoot.
            compressed = self.counter.truncate_to_tokens(compressed, max_tokens)
            compressed_tokens = self.counter.count(compressed)

        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=(
                compressed_tokens / original_tokens if original_tokens else 1.0
            ),
            strategy=CompressionStrategy.EXTRACT
        )

    def _split_into_chunks(
        self,
        text: str,
        chunk_size: int = 200
    ) -> list[tuple[str, int]]:
        """Split text into (paragraph, paragraph_index) pairs.

        Paragraphs are separated by blank lines; blank paragraphs are skipped
        but still consume an index. ``chunk_size`` is accepted for interface
        compatibility and is currently not used.
        """
        return [
            (para.strip(), index)
            for index, para in enumerate(text.split("\n\n"))
            if para.strip()
        ]

    def _calculate_similarities(
        self,
        query_embedding: np.ndarray,
        chunk_embeddings: list[np.ndarray]
    ) -> list[float]:
        """Calculate cosine similarity of each chunk embedding to the query.

        A zero-length vector on either side yields 0.0 instead of NaN, so the
        scores always sort predictably. Plain sequences of numbers are
        accepted as well as arrays.
        """
        query_vec = np.asarray(query_embedding, dtype=float)
        query_norm = np.linalg.norm(query_vec)
        similarities = []
        for chunk_emb in chunk_embeddings:
            chunk_vec = np.asarray(chunk_emb, dtype=float)
            denominator = query_norm * np.linalg.norm(chunk_vec)
            if denominator:
                similarities.append(float(np.dot(query_vec, chunk_vec) / denominator))
            else:
                similarities.append(0.0)
        return similarities

    def _select_chunks(
        self,
        chunks: list[tuple[str, int]],
        scores: list[float],
        max_tokens: int
    ) -> list[str]:
        """Greedily keep the best-scoring chunks that fit the token budget.

        The kept chunks are returned in their original document order.
        """
        ranked = sorted(zip(chunks, scores), key=lambda pair: pair[1], reverse=True)

        kept = []
        used_tokens = 0
        for (chunk, position), _score in ranked:
            cost = self.counter.count(chunk)
            if used_tokens + cost <= max_tokens:
                kept.append((chunk, position))
                used_tokens += cost

        kept.sort(key=lambda pair: pair[1])
        return [chunk for chunk, _ in kept]
class KeyphraseExtractor:
    """Extract key phrases and sentences."""

    def __init__(self, token_counter: TokenCounter):
        self.counter = token_counter

    def extract(
        self,
        text: str,
        max_tokens: int
    ) -> CompressionResult:
        """Extract the highest-scoring sentences that fit the budget.

        Returns:
            A CompressionResult tagged with CompressionStrategy.EXTRACT. Text
            that already fits is returned unchanged.
        """
        original_tokens = self.counter.count(text)
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.EXTRACT
            )

        sentences = self._split_sentences(text)
        scored = self._score_sentences(sentences)
        selected = self._select_within_budget(scored, max_tokens)

        compressed = " ".join(selected)
        compressed_tokens = self.counter.count(compressed)
        if compressed_tokens > max_tokens:
            # Per-sentence counts ignore the joining spaces; clamp overshoot.
            compressed = self.counter.truncate_to_tokens(compressed, max_tokens)
            compressed_tokens = self.counter.count(compressed)

        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=(
                compressed_tokens / original_tokens if original_tokens else 1.0
            ),
            strategy=CompressionStrategy.EXTRACT
        )

    def _split_sentences(self, text: str) -> list[str]:
        """Split into sentences at whitespace following ., ! or ?."""
        import re

        pieces = re.split(r'(?<=[.!?])\s+', text)
        return [piece.strip() for piece in pieces if piece.strip()]

    def _score_sentences(
        self,
        sentences: list[str]
    ) -> list[tuple[str, float, int]]:
        """Score sentences by importance.

        The base score is the mean document frequency of the sentence's
        words. Words are extracted with ``\\w+`` so that "word." and "word"
        count as the same word. The score is then adjusted:

        * x1.2 for sentences in the first 20% of the text,
        * x1.1 for sentences in the last 20%,
        * x0.8 for sentences with fewer than five words.

        Returns (sentence, score, index) tuples; sentences containing no
        words are omitted.
        """
        import re
        from collections import Counter

        tokenized = [re.findall(r"\w+", sentence.lower()) for sentence in sentences]
        word_freq = Counter(word for words in tokenized for word in words)

        total = len(sentences)
        scored = []
        for index, (sentence, words) in enumerate(zip(sentences, tokenized)):
            if not words:
                continue

            score = sum(word_freq[word] for word in words) / len(words)

            if index < total * 0.2:
                score *= 1.2
            elif index > total * 0.8:
                score *= 1.1

            if len(words) < 5:
                score *= 0.8

            scored.append((sentence, score, index))
        return scored

    def _select_within_budget(
        self,
        scored: list[tuple[str, float, int]],
        max_tokens: int
    ) -> list[str]:
        """Greedily keep the best-scoring sentences that fit the budget.

        The kept sentences are returned in their original order.
        """
        ranked = sorted(scored, key=lambda item: item[1], reverse=True)

        kept = []
        used_tokens = 0
        for sentence, _score, position in ranked:
            cost = self.counter.count(sentence)
            if used_tokens + cost <= max_tokens:
                kept.append((sentence, position))
                used_tokens += cost

        kept.sort(key=lambda pair: pair[1])
        return [sentence for sentence, _ in kept]
class LLMExtractor:
    """Use LLM to extract key information."""

    def __init__(self, llm_client: Any, token_counter: TokenCounter):
        self.llm = llm_client
        self.counter = token_counter

    async def extract(
        self,
        text: str,
        max_tokens: int,
        extraction_type: str = "key_points"
    ) -> CompressionResult:
        """Extract key information using LLM.

        Args:
            text: Text to extract from.
            max_tokens: Token budget for the extracted text.
            extraction_type: "key_points", "facts" or "entities". Any other
                value uses the key-points prompt.

        Returns:
            A CompressionResult tagged EXTRACT with the requested type stored
            in ``metadata["extraction_type"]``. Empty text is returned as-is
            without an LLM call.
        """
        original_tokens = self.counter.count(text)
        if original_tokens == 0:
            # Nothing to extract; also avoids dividing by zero below.
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=0,
                compressed_tokens=0,
                compression_ratio=1.0,
                strategy=CompressionStrategy.EXTRACT,
                metadata={"extraction_type": extraction_type}
            )

        target_words = max_tokens * 0.75  # rough token-to-word ratio
        prompt_builders = {
            "key_points": self._key_points_prompt,
            "facts": self._facts_prompt,
            "entities": self._entities_prompt,
        }
        build_prompt = prompt_builders.get(extraction_type, self._key_points_prompt)
        prompt = build_prompt(text, target_words)

        response = await self.llm.generate(prompt)
        compressed = response.strip()

        # The model may overshoot the requested length; enforce the budget.
        if self.counter.count(compressed) > max_tokens:
            compressed = self.counter.truncate_to_tokens(compressed, max_tokens)
        compressed_tokens = self.counter.count(compressed)

        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.EXTRACT,
            metadata={"extraction_type": extraction_type}
        )

    def _key_points_prompt(self, text: str, target_words: float) -> str:
        """Build the prompt asking for concise bullet-point key points."""
        return (
            f"Extract the key points from this text in approximately {int(target_words)} words.\n"
            "List the most important information as concise bullet points.\n"
            "Text:\n"
            f"{text}\n"
            "Key Points:"
        )

    def _facts_prompt(self, text: str, target_words: float) -> str:
        """Build the prompt asking for verifiable facts."""
        return (
            f"Extract factual information from this text in approximately {int(target_words)} words.\n"
            "Include names, dates, numbers, and verifiable facts.\n"
            "Text:\n"
            f"{text}\n"
            "Facts:"
        )

    def _entities_prompt(self, text: str, target_words: float) -> str:
        """Build the prompt asking for named entities and their relationships."""
        return (
            f"Extract named entities and their relationships from this text in approximately {int(target_words)} words.\n"
            "Include people, organizations, locations, and their connections.\n"
            "Text:\n"
            f"{text}\n"
            "Entities and Relationships:"
        )
Hybrid Compression Pipeline
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import asyncio
@dataclass
class CompressionConfig:
    """Configuration for compression pipeline."""

    # Token budget the compressed text must fit into (required).
    max_tokens: int
    # Strategy label; HybridCompressor copies it into the result when the
    # text already fits the budget.
    strategy: CompressionStrategy = CompressionStrategy.HYBRID
    # Optional query used to steer relevance-based compression.
    query: Optional[str] = None
    # NOTE(review): not read by any compressor shown in this file — confirm
    # whether other code consumes it.
    preserve_structure: bool = True
    # NOTE(review): not read by any compressor shown in this file — confirm
    # whether other code consumes it.
    min_compression_ratio: float = 0.1
class HybridCompressor:
    """Combine multiple compression strategies.

    The approach is chosen from how much the text has to shrink
    (``max_tokens / original_tokens``):

    * above 0.7  -> sentence selection,
    * above 0.3  -> embedding-based extraction (with a query) or sentence
      selection (without one),
    * otherwise  -> LLM summarization.
    """

    def __init__(
        self,
        llm_client: Any,
        embedding_model: Any,
        token_counter: TokenCounter
    ):
        self.llm = llm_client
        self.embedder = embedding_model
        self.counter = token_counter

        # Initialize compressors
        self.truncator = TruncationCompressor(token_counter)
        self.sentence_compressor = SentenceCompressor(token_counter)
        self.summarizer = LLMSummarizer(llm_client, token_counter)
        self.extractor = EmbeddingExtractor(embedding_model, token_counter)

    async def compress(
        self,
        text: str,
        config: CompressionConfig
    ) -> CompressionResult:
        """Compress using hybrid strategy.

        Args:
            text: Text to compress.
            config: Budget and optional query; see CompressionConfig.

        Returns:
            The CompressionResult produced by whichever compressor was
            chosen. Text that already fits is returned unchanged and tagged
            with ``config.strategy``.
        """
        original_tokens = self.counter.count(text)
        if original_tokens <= config.max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=config.strategy
            )

        # Fraction of the text that may survive; lower means harsher cuts.
        target_ratio = config.max_tokens / original_tokens

        if target_ratio > 0.7:
            # Light compression: sentence selection
            return self.sentence_compressor.compress(
                text, config.max_tokens, config.query
            )
        if target_ratio > 0.3:
            # Medium compression: extractive
            return await self._medium_compression(text, config)
        # Heavy compression: full summarization
        return await self._heavy_compression(text, config)

    async def _medium_compression(
        self,
        text: str,
        config: CompressionConfig
    ) -> CompressionResult:
        """Medium compression using extraction."""
        if config.query:
            # EmbeddingExtractor.extract takes (text, query, max_tokens).
            # Pass by keyword so the query and the budget cannot be swapped.
            return await self.extractor.extract(
                text=text,
                query=config.query,
                max_tokens=config.max_tokens,
            )
        # No query to embed against: fall back to position-scored sentences.
        return self.sentence_compressor.compress(text, config.max_tokens)

    async def _heavy_compression(
        self,
        text: str,
        config: CompressionConfig
    ) -> CompressionResult:
        """Heavy compression using summarization (query used as focus)."""
        return await self.summarizer.summarize(
            text, config.max_tokens, config.query
        )
class AdaptiveCompressor:
    """Adaptively choose compression strategy.

    ``compressors`` maps a strategy name ("extractive", "summarize",
    "sentence") to an object exposing ``compress(text, max_tokens)``, which
    may be synchronous or asynchronous. When no such object is registered
    for the chosen strategy, plain truncation is used.
    """

    # Whole words that hint at source code; see _analyze_content.
    _TECHNICAL_TERMS = frozenset(
        {"function", "class", "import", "def", "return", "if", "for"}
    )

    def __init__(
        self,
        compressors: dict[str, Any],
        token_counter: TokenCounter
    ):
        self.compressors = compressors
        self.counter = token_counter

    async def compress(
        self,
        text: str,
        max_tokens: int,
        context: Optional[dict] = None
    ) -> CompressionResult:
        """Adaptively compress based on content.

        Args:
            text: Text to compress.
            max_tokens: Token budget for the result.
            context: Optional hints; a truthy ``"query"`` entry forces the
                "extractive" strategy.

        Returns:
            The chosen compressor's result, or a HYBRID-tagged unchanged
            result when the text already fits.
        """
        import inspect

        original_tokens = self.counter.count(text)
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.HYBRID
            )

        content_type = self._analyze_content(text)
        strategy = self._choose_strategy(content_type, context)

        compressor = self.compressors.get(strategy)
        compress_fn = getattr(compressor, "compress", None)
        if not callable(compress_fn):
            # Fallback to truncation
            return TruncationCompressor(self.counter).compress(text, max_tokens)

        result = compress_fn(text, max_tokens)
        # Await whatever is awaitable, so async compressors work even when
        # they are not plain coroutine functions (wrappers, partials, ...).
        if inspect.isawaitable(result):
            result = await result
        return result

    def _analyze_content(self, text: str) -> str:
        """Classify text as "structured", "narrative", "technical" or "general".

        Heuristics, checked in this order:

        * structured: more than 10 newlines and more than 5 colons;
        * narrative: average sentence length above 15 words;
        * technical: at least two distinct code keywords appear as whole
          words (substring matching would hit "for" in "information");
        * general: everything else.
        """
        import re

        if text.count("\n") > 10 and text.count(":") > 5:
            return "structured"

        # Ignore the empty fragment after a trailing period; counting it
        # would drag the average down.
        sentences = [part for part in text.split(".") if part.strip()]
        avg_sentence_len = (
            sum(len(part.split()) for part in sentences) / max(len(sentences), 1)
        )
        if avg_sentence_len > 15:
            return "narrative"

        words = set(re.findall(r"[a-z_]+", text.lower()))
        if len(words & self._TECHNICAL_TERMS) >= 2:
            return "technical"

        return "general"

    def _choose_strategy(
        self,
        content_type: str,
        context: Optional[dict] = None
    ) -> str:
        """Choose compression strategy name for a content type.

        A query in ``context`` always selects "extractive"; otherwise the
        content type decides, with "sentence" as the default.
        """
        if context and context.get("query"):
            return "extractive"

        by_content_type = {
            "structured": "extractive",
            "narrative": "summarize",
            "technical": "extractive",
        }
        return by_content_type.get(content_type, "sentence")
class CompressionPipeline:
    """Pipeline for multi-stage compression.

    Stages are callables taking ``(text, max_tokens)``. They run in the order
    added until the text fits the budget. A stage may be sync or async and
    may return either a plain string or an object with a ``compressed_text``
    attribute.
    """

    def __init__(self, token_counter: TokenCounter):
        self.counter = token_counter
        self.stages: list[Callable] = []

    def add_stage(self, compressor: Callable):
        """Add compression stage."""
        self.stages.append(compressor)

    async def compress(
        self,
        text: str,
        max_tokens: int
    ) -> CompressionResult:
        """Run compression pipeline.

        Returns:
            A HYBRID-tagged CompressionResult. The result can still exceed
            ``max_tokens`` if the stages run out before the text fits.
        """
        import inspect

        original_tokens = self.counter.count(text)
        current_text = text

        for stage in self.stages:
            # Stop as soon as an earlier stage has brought the text in budget.
            if self.counter.count(current_text) <= max_tokens:
                break

            result = stage(current_text, max_tokens)
            # Await whatever is awaitable, so async stages work even when
            # they are not plain coroutine functions.
            if inspect.isawaitable(result):
                result = await result

            current_text = getattr(result, "compressed_text", result)

        compressed_tokens = self.counter.count(current_text)
        return CompressionResult(
            original_text=text,
            compressed_text=current_text,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=self._ratio(compressed_tokens, original_tokens),
            strategy=CompressionStrategy.HYBRID
        )

    @staticmethod
    def _ratio(compressed_tokens: int, original_tokens: int) -> float:
        """Return compressed/original, or 1.0 for empty input (no division by zero)."""
        if not original_tokens:
            return 1.0
        return compressed_tokens / original_tokens
Production Compression Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List
# Application object; the route decorators below register their handlers on it.
app = FastAPI()
class CompressRequest(BaseModel):
    """Request body for POST /v1/compress."""

    # Text to compress.
    text: str
    # Token budget for the compressed text.
    max_tokens: int
    # "truncate" selects truncation; every other value, including the default
    # "hybrid", is served by sentence compression in compress_text.
    strategy: str = "hybrid"
    # Optional query passed to the sentence compressor for relevance scoring.
    query: Optional[str] = None
class CompressResponse(BaseModel):
    """Response body describing one compression result."""

    # Text after compression.
    compressed_text: str
    # Token count of the submitted text.
    original_tokens: int
    # Token count of compressed_text.
    compressed_tokens: int
    # compressed_tokens / original_tokens.
    compression_ratio: float
    # String value of the CompressionStrategy that produced the result.
    strategy: str
# Initialize components once at import time; every request handler below
# shares these instances. TokenCounter() uses its default model ("gpt-4").
token_counter = TokenCounter()
truncator = TruncationCompressor(token_counter)
sentence_compressor = SentenceCompressor(token_counter)
@app.post("/v1/compress", response_model=CompressResponse)
async def compress_text(request: CompressRequest) -> CompressResponse:
    """Compress text.

    ``strategy == "truncate"`` uses token truncation. Every other value
    ("sentence", the default "hybrid", or an unrecognised name) uses sentence
    selection, optionally ranked against ``request.query``.

    Raises:
        HTTPException: 422 when ``max_tokens`` is not positive.
    """
    # A zero or negative budget is a client error. Left unchecked it would
    # reach token-list slicing with a negative bound.
    if request.max_tokens <= 0:
        raise HTTPException(
            status_code=422,
            detail="max_tokens must be a positive integer",
        )

    if request.strategy == "truncate":
        result = truncator.compress(request.text, request.max_tokens)
    else:
        # Default to sentence compression
        result = sentence_compressor.compress(
            request.text, request.max_tokens, request.query
        )

    return CompressResponse(
        compressed_text=result.compressed_text,
        original_tokens=result.original_tokens,
        compressed_tokens=result.compressed_tokens,
        compression_ratio=result.compression_ratio,
        strategy=result.strategy.value
    )
@app.post("/v1/compress/batch")
async def compress_batch(texts: List[str], max_tokens: int) -> List[CompressResponse]:
    """Compress multiple texts with sentence selection.

    All texts share the same ``max_tokens`` budget (applied per text).
    Results are returned in the order of ``texts``.

    Raises:
        HTTPException: 422 when ``max_tokens`` is not positive.
    """
    if max_tokens <= 0:
        raise HTTPException(
            status_code=422,
            detail="max_tokens must be a positive integer",
        )

    compressed = (sentence_compressor.compress(text, max_tokens) for text in texts)
    return [
        CompressResponse(
            compressed_text=result.compressed_text,
            original_tokens=result.original_tokens,
            compressed_tokens=result.compressed_tokens,
            compression_ratio=result.compression_ratio,
            strategy=result.strategy.value
        )
        for result in compressed
    ]
@app.get("/v1/tokens/count")
async def count_tokens(text: str) -> dict:
    """Report the token count and character count of ``text``."""
    return {"tokens": token_counter.count(text), "characters": len(text)}
@app.get("/health")
async def health():
    """Liveness probe: always returns a fixed "healthy" status payload."""
    return {"status": "healthy"}
References
- LLMLingua: https://github.com/microsoft/LLMLingua
- tiktoken: https://github.com/openai/tiktoken
- LangChain Text Splitters: https://python.langchain.com/docs/modules/data_connection/document_transformers/
- Sentence Transformers: https://www.sbert.net/
Conclusion
Context compression is about making intelligent trade-offs between information density and token budget. Start with the simplest approach that works: truncation is fast and predictable, sentence selection preserves key content, and summarization creates the most compact representations. The best strategy depends on your use case—query-focused extraction works well for RAG systems where you know what information you need, while hierarchical summarization handles long documents that need holistic understanding. Hybrid approaches combine multiple strategies: extract relevant sections first, then summarize if still over budget. Always measure compression quality, not just compression ratio—a 10x compression that loses critical information is worse than a 2x compression that preserves it. For production systems, cache compressed versions of frequently-accessed documents and use adaptive strategies that choose compression methods based on content type and query context. The goal isn't minimal tokens—it's maximum relevant information per token.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.
