Context Compression Techniques: Fitting More Information into Limited Token Budgets

Introduction

Context window limits are one of the most frustrating constraints when building LLM applications. You have a 100-page document but only 8K tokens of context. You want to include conversation history, but it's eating into your prompt budget. Context compression techniques solve this by reducing the token count while preserving the information that matters. This isn't just about summarization; it's about intelligent extraction, selective pruning, and efficient encoding that maintains semantic fidelity. The best compression strategies are task-aware: they know what information the model needs and ruthlessly eliminate everything else. This guide covers practical techniques, from simple truncation to sophisticated extractive compression, helping you fit more relevant context into a limited token budget while maintaining output quality.


Basic Compression Strategies

from dataclasses import dataclass, field
from typing import Any, Optional, List, Callable
from enum import Enum
import re

class CompressionStrategy(Enum):
    """Compression strategy types."""
    
    TRUNCATE = "truncate"
    SUMMARIZE = "summarize"
    EXTRACT = "extract"
    HYBRID = "hybrid"

@dataclass
class CompressionResult:
    """Result of context compression."""
    
    original_text: str
    compressed_text: str
    original_tokens: int
    compressed_tokens: int
    compression_ratio: float
    strategy: CompressionStrategy
    metadata: dict = field(default_factory=dict)

class TokenCounter:
    """Count tokens for text."""
    
    def __init__(self, model: str = "gpt-4"):
        self.model = model
        self._encoder = None
    
    @property
    def encoder(self):
        if self._encoder is None:
            import tiktoken
            self._encoder = tiktoken.encoding_for_model(self.model)
        return self._encoder
    
    def count(self, text: str) -> int:
        """Count tokens in text."""
        
        return len(self.encoder.encode(text))
    
    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate text to max tokens."""
        
        tokens = self.encoder.encode(text)
        
        if len(tokens) <= max_tokens:
            return text
        
        return self.encoder.decode(tokens[:max_tokens])

class TruncationCompressor:
    """Simple truncation-based compression."""
    
    def __init__(self, token_counter: TokenCounter):
        self.counter = token_counter
    
    def compress(
        self,
        text: str,
        max_tokens: int,
        strategy: str = "end"
    ) -> CompressionResult:
        """Compress by truncation."""
        
        original_tokens = self.counter.count(text)
        
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.TRUNCATE
            )
        
        if strategy == "end":
            compressed = self._truncate_end(text, max_tokens)
        elif strategy == "start":
            compressed = self._truncate_start(text, max_tokens)
        elif strategy == "middle":
            compressed = self._truncate_middle(text, max_tokens)
        else:
            compressed = self._truncate_end(text, max_tokens)
        
        compressed_tokens = self.counter.count(compressed)
        
        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.TRUNCATE
        )
    
    def _truncate_end(self, text: str, max_tokens: int) -> str:
        """Keep beginning, truncate end."""
        
        return self.counter.truncate_to_tokens(text, max_tokens)
    
    def _truncate_start(self, text: str, max_tokens: int) -> str:
        """Keep end, truncate beginning."""
        
        tokens = self.counter.encoder.encode(text)
        kept = tokens[-max_tokens:]
        return self.counter.encoder.decode(kept)
    
    def _truncate_middle(self, text: str, max_tokens: int) -> str:
        """Keep beginning and end, truncate middle."""
        
        tokens = self.counter.encoder.encode(text)
        
        # The truncation marker adds a few tokens of its own, so the result can
        # slightly exceed max_tokens; shrink the halves if the budget is strict.
        half = max_tokens // 2
        start = tokens[:half]
        end = tokens[-half:]
        
        start_text = self.counter.encoder.decode(start)
        end_text = self.counter.encoder.decode(end)
        
        return f"{start_text}\n\n[...content truncated...]\n\n{end_text}"

class SentenceCompressor:
    """Compress by selecting important sentences."""
    
    def __init__(self, token_counter: TokenCounter):
        self.counter = token_counter
    
    def compress(
        self,
        text: str,
        max_tokens: int,
        query: str = None
    ) -> CompressionResult:
        """Compress by selecting sentences."""
        
        original_tokens = self.counter.count(text)
        
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.EXTRACT
            )
        
        # Split into sentences
        sentences = self._split_sentences(text)
        
        # Score sentences
        if query:
            scored = self._score_by_relevance(sentences, query)
        else:
            scored = self._score_by_position(sentences)
        
        # Select sentences within budget
        selected = self._select_within_budget(scored, max_tokens)
        
        # Reconstruct in original order
        compressed = self._reconstruct(sentences, selected)
        compressed_tokens = self.counter.count(compressed)
        
        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.EXTRACT
        )
    
    def _split_sentences(self, text: str) -> list[str]:
        """Split text into sentences."""
        
        # Simple sentence splitting
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def _score_by_relevance(
        self,
        sentences: list[str],
        query: str
    ) -> list[tuple[int, str, float]]:
        """Score sentences by relevance to query."""
        
        query_words = set(query.lower().split())
        scored = []
        
        for i, sentence in enumerate(sentences):
            sentence_words = set(sentence.lower().split())
            overlap = len(query_words & sentence_words)
            score = overlap / max(len(query_words), 1)
            scored.append((i, sentence, score))
        
        return scored
    
    def _score_by_position(
        self,
        sentences: list[str]
    ) -> list[tuple[int, str, float]]:
        """Score sentences by position (beginning and end higher)."""
        
        n = len(sentences)
        scored = []
        
        for i, sentence in enumerate(sentences):
            # U-shaped scoring: high at start and end
            if i < n * 0.2:  # First 20%
                score = 1.0 - (i / (n * 0.2)) * 0.3
            elif i > n * 0.8:  # Last 20%
                score = 0.7 + ((i - n * 0.8) / (n * 0.2)) * 0.3
            else:  # Middle
                score = 0.5
            
            scored.append((i, sentence, score))
        
        return scored
    
    def _select_within_budget(
        self,
        scored: list[tuple[int, str, float]],
        max_tokens: int
    ) -> set[int]:
        """Select sentences within token budget."""
        
        # Sort by score descending
        sorted_scored = sorted(scored, key=lambda x: x[2], reverse=True)
        
        selected = set()
        current_tokens = 0
        
        for idx, sentence, score in sorted_scored:
            sentence_tokens = self.counter.count(sentence)
            
            if current_tokens + sentence_tokens <= max_tokens:
                selected.add(idx)
                current_tokens += sentence_tokens
        
        return selected
    
    def _reconstruct(
        self,
        sentences: list[str],
        selected: set[int]
    ) -> str:
        """Reconstruct text from selected sentences."""
        
        result = []
        
        for i, sentence in enumerate(sentences):
            if i in selected:
                result.append(sentence)
        
        return " ".join(result)

LLM-Based Summarization

from dataclasses import dataclass
from typing import Any, Optional, List
import asyncio

class LLMSummarizer:
    """Use LLM to summarize context."""
    
    def __init__(self, llm_client: Any, token_counter: TokenCounter):
        self.llm = llm_client
        self.counter = token_counter
    
    async def summarize(
        self,
        text: str,
        max_tokens: int,
        focus: str = None
    ) -> CompressionResult:
        """Summarize text using LLM."""
        
        original_tokens = self.counter.count(text)
        
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.SUMMARIZE
            )
        
        # Calculate target length
        target_words = max_tokens * 0.75  # Rough token to word ratio
        
        prompt = self._build_prompt(text, target_words, focus)
        
        response = await self.llm.generate(prompt)
        compressed = response.strip()
        
        # Ensure within budget
        compressed_tokens = self.counter.count(compressed)
        if compressed_tokens > max_tokens:
            compressed = self.counter.truncate_to_tokens(compressed, max_tokens)
            compressed_tokens = max_tokens
        
        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.SUMMARIZE
        )
    
    def _build_prompt(
        self,
        text: str,
        target_words: float,
        focus: str = None
    ) -> str:
        """Build summarization prompt."""
        
        focus_instruction = ""
        if focus:
            focus_instruction = f"\nFocus on information relevant to: {focus}"
        
        return f"""Summarize the following text in approximately {int(target_words)} words.
Preserve key facts, names, numbers, and important details.
Write in a clear, concise style.{focus_instruction}

Text to summarize:
{text}

Summary:"""

class HierarchicalSummarizer:
    """Summarize long documents hierarchically."""
    
    def __init__(
        self,
        llm_client: Any,
        token_counter: TokenCounter,
        chunk_size: int = 2000
    ):
        self.llm = llm_client
        self.counter = token_counter
        self.chunk_size = chunk_size
    
    async def summarize(
        self,
        text: str,
        max_tokens: int,
        focus: str = None
    ) -> CompressionResult:
        """Hierarchically summarize long text."""
        
        original_tokens = self.counter.count(text)
        
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.SUMMARIZE
            )
        
        # Split into chunks
        chunks = self._split_into_chunks(text)
        
        # Summarize each chunk
        chunk_summaries = await self._summarize_chunks(chunks, focus)
        
        # Combine summaries
        combined = "\n\n".join(chunk_summaries)
        
        # If still too long, summarize again
        if self.counter.count(combined) > max_tokens:
            final_summary = await self._final_summarize(combined, max_tokens, focus)
        else:
            final_summary = combined
        
        compressed_tokens = self.counter.count(final_summary)
        
        return CompressionResult(
            original_text=text,
            compressed_text=final_summary,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.SUMMARIZE,
            metadata={"chunks": len(chunks)}
        )
    
    def _split_into_chunks(self, text: str) -> list[str]:
        """Split text into chunks."""
        
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        paragraphs = text.split("\n\n")
        
        for para in paragraphs:
            para_tokens = self.counter.count(para)
            
            if current_tokens + para_tokens > self.chunk_size:
                if current_chunk:
                    chunks.append("\n\n".join(current_chunk))
                current_chunk = [para]
                current_tokens = para_tokens
            else:
                current_chunk.append(para)
                current_tokens += para_tokens
        
        if current_chunk:
            chunks.append("\n\n".join(current_chunk))
        
        return chunks
    
    async def _summarize_chunks(
        self,
        chunks: list[str],
        focus: str = None
    ) -> list[str]:
        """Summarize all chunks."""
        
        tasks = []
        
        for chunk in chunks:
            task = self._summarize_chunk(chunk, focus)
            tasks.append(task)
        
        return await asyncio.gather(*tasks)
    
    async def _summarize_chunk(
        self,
        chunk: str,
        focus: str = None
    ) -> str:
        """Summarize a single chunk."""
        
        focus_instruction = ""
        if focus:
            focus_instruction = f"\nFocus on: {focus}"
        
        prompt = f"""Summarize this text concisely, preserving key information:{focus_instruction}

{chunk}

Summary:"""
        
        return await self.llm.generate(prompt)
    
    async def _final_summarize(
        self,
        text: str,
        max_tokens: int,
        focus: str = None
    ) -> str:
        """Final summarization pass."""
        
        target_words = max_tokens * 0.75
        
        focus_instruction = ""
        if focus:
            focus_instruction = f"\nFocus on: {focus}"
        
        prompt = f"""Create a final summary in approximately {int(target_words)} words.
Combine and synthesize the key points.{focus_instruction}

{text}

Final Summary:"""
        
        response = await self.llm.generate(prompt)
        
        # Ensure within budget
        if self.counter.count(response) > max_tokens:
            response = self.counter.truncate_to_tokens(response, max_tokens)
        
        return response

class QueryFocusedSummarizer:
    """Summarize with focus on answering a query."""
    
    def __init__(self, llm_client: Any, token_counter: TokenCounter):
        self.llm = llm_client
        self.counter = token_counter
    
    async def summarize(
        self,
        text: str,
        query: str,
        max_tokens: int
    ) -> CompressionResult:
        """Summarize focusing on query-relevant information."""
        
        original_tokens = self.counter.count(text)
        
        target_words = max_tokens * 0.75
        
        prompt = f"""Extract and summarize information from the text that is relevant to answering this question:

Question: {query}

Text:
{text}

Provide a focused summary (approximately {int(target_words)} words) containing only information relevant to the question:"""
        
        response = await self.llm.generate(prompt)
        compressed = response.strip()
        
        if self.counter.count(compressed) > max_tokens:
            compressed = self.counter.truncate_to_tokens(compressed, max_tokens)
        
        compressed_tokens = self.counter.count(compressed)
        
        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.SUMMARIZE,
            metadata={"query": query}
        )
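
These summarizers only assume an llm_client exposing an async generate(prompt) method, so the flow can be exercised end to end with a stub client. The sketch below does exactly that; StubLLM and its canned reply are placeholders, not a real API binding.

import asyncio

class StubLLM:
    """Placeholder client with the async generate(prompt) method the summarizers expect."""
    
    async def generate(self, prompt: str) -> str:
        # A real client would call an LLM API here; this canned reply is for demonstration only.
        return "Key facts from the chunk, condensed into a short summary."

async def demo():
    counter = TokenCounter()
    long_text = "A paragraph describing one part of the system in detail.\n\n" * 300
    
    summarizer = HierarchicalSummarizer(StubLLM(), counter, chunk_size=500)
    result = await summarizer.summarize(long_text, max_tokens=200, focus="system architecture")
    print(result.compression_ratio, result.metadata)

asyncio.run(demo())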

Extractive Compression

from dataclasses import dataclass
from typing import Any, Optional, List
import numpy as np

class EmbeddingExtractor:
    """Extract relevant content using embeddings."""
    
    def __init__(
        self,
        embedding_model: Any,
        token_counter: TokenCounter
    ):
        self.embedder = embedding_model
        self.counter = token_counter
    
    async def extract(
        self,
        text: str,
        query: str,
        max_tokens: int
    ) -> CompressionResult:
        """Extract query-relevant content."""
        
        original_tokens = self.counter.count(text)
        
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.EXTRACT
            )
        
        # Split into chunks
        chunks = self._split_into_chunks(text)
        
        # Embed query and chunks
        query_embedding = await self.embedder.embed(query)
        chunk_embeddings = await self.embedder.embed_batch([c for c, _ in chunks])
        
        # Score chunks by similarity
        scores = self._calculate_similarities(query_embedding, chunk_embeddings)
        
        # Select top chunks within budget
        selected = self._select_chunks(chunks, scores, max_tokens)
        
        compressed = "\n\n".join(selected)
        compressed_tokens = self.counter.count(compressed)
        
        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.EXTRACT
        )
    
    def _split_into_chunks(
        self,
        text: str,
        chunk_size: int = 200
    ) -> list[tuple[str, int]]:
        """Split text into chunks with positions."""
        
        chunks = []
        paragraphs = text.split("\n\n")
        
        for i, para in enumerate(paragraphs):
            if para.strip():
                chunks.append((para.strip(), i))
        
        return chunks
    
    def _calculate_similarities(
        self,
        query_embedding: np.ndarray,
        chunk_embeddings: list[np.ndarray]
    ) -> list[float]:
        """Calculate cosine similarities."""
        
        similarities = []
        
        for chunk_emb in chunk_embeddings:
            sim = np.dot(query_embedding, chunk_emb) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(chunk_emb)
            )
            similarities.append(float(sim))
        
        return similarities
    
    def _select_chunks(
        self,
        chunks: list[tuple[str, int]],
        scores: list[float],
        max_tokens: int
    ) -> list[str]:
        """Select chunks within token budget."""
        
        # Sort by score
        scored = list(zip(chunks, scores))
        scored.sort(key=lambda x: x[1], reverse=True)
        
        selected = []
        current_tokens = 0
        
        for (chunk, pos), score in scored:
            chunk_tokens = self.counter.count(chunk)
            
            if current_tokens + chunk_tokens <= max_tokens:
                selected.append((chunk, pos))
                current_tokens += chunk_tokens
        
        # Sort by original position
        selected.sort(key=lambda x: x[1])
        
        return [chunk for chunk, _ in selected]

class KeyphraseExtractor:
    """Extract key phrases and sentences."""
    
    def __init__(self, token_counter: TokenCounter):
        self.counter = token_counter
    
    def extract(
        self,
        text: str,
        max_tokens: int
    ) -> CompressionResult:
        """Extract key phrases."""
        
        original_tokens = self.counter.count(text)
        
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.EXTRACT
            )
        
        # Extract key sentences
        sentences = self._split_sentences(text)
        scored = self._score_sentences(sentences)
        
        # Select within budget
        selected = self._select_within_budget(scored, max_tokens)
        
        compressed = " ".join(selected)
        compressed_tokens = self.counter.count(compressed)
        
        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.EXTRACT
        )
    
    def _split_sentences(self, text: str) -> list[str]:
        """Split into sentences."""
        
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def _score_sentences(
        self,
        sentences: list[str]
    ) -> list[tuple[str, float, int]]:
        """Score sentences by importance."""
        
        # Calculate word frequencies
        all_words = []
        for sentence in sentences:
            words = sentence.lower().split()
            all_words.extend(words)
        
        word_freq = {}
        for word in all_words:
            word_freq[word] = word_freq.get(word, 0) + 1
        
        # Score sentences
        scored = []
        for i, sentence in enumerate(sentences):
            words = sentence.lower().split()
            
            if not words:
                continue
            
            # TF-based score
            score = sum(word_freq.get(w, 0) for w in words) / len(words)
            
            # Position bonus
            if i < len(sentences) * 0.2:
                score *= 1.2
            elif i > len(sentences) * 0.8:
                score *= 1.1
            
            # Length penalty for very short sentences
            if len(words) < 5:
                score *= 0.8
            
            scored.append((sentence, score, i))
        
        return scored
    
    def _select_within_budget(
        self,
        scored: list[tuple[str, float, int]],
        max_tokens: int
    ) -> list[str]:
        """Select sentences within budget."""
        
        # Sort by score
        sorted_scored = sorted(scored, key=lambda x: x[1], reverse=True)
        
        selected = []
        current_tokens = 0
        
        for sentence, score, pos in sorted_scored:
            tokens = self.counter.count(sentence)
            
            if current_tokens + tokens <= max_tokens:
                selected.append((sentence, pos))
                current_tokens += tokens
        
        # Sort by position
        selected.sort(key=lambda x: x[1])
        
        return [s for s, _ in selected]

class LLMExtractor:
    """Use LLM to extract key information."""
    
    def __init__(self, llm_client: Any, token_counter: TokenCounter):
        self.llm = llm_client
        self.counter = token_counter
    
    async def extract(
        self,
        text: str,
        max_tokens: int,
        extraction_type: str = "key_points"
    ) -> CompressionResult:
        """Extract key information using LLM."""
        
        original_tokens = self.counter.count(text)
        
        target_words = max_tokens * 0.75
        
        if extraction_type == "key_points":
            prompt = self._key_points_prompt(text, target_words)
        elif extraction_type == "facts":
            prompt = self._facts_prompt(text, target_words)
        elif extraction_type == "entities":
            prompt = self._entities_prompt(text, target_words)
        else:
            prompt = self._key_points_prompt(text, target_words)
        
        response = await self.llm.generate(prompt)
        compressed = response.strip()
        
        if self.counter.count(compressed) > max_tokens:
            compressed = self.counter.truncate_to_tokens(compressed, max_tokens)
        
        compressed_tokens = self.counter.count(compressed)
        
        return CompressionResult(
            original_text=text,
            compressed_text=compressed,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.EXTRACT,
            metadata={"extraction_type": extraction_type}
        )
    
    def _key_points_prompt(self, text: str, target_words: float) -> str:
        return f"""Extract the key points from this text in approximately {int(target_words)} words.
List the most important information as concise bullet points.

Text:
{text}

Key Points:"""
    
    def _facts_prompt(self, text: str, target_words: float) -> str:
        return f"""Extract factual information from this text in approximately {int(target_words)} words.
Include names, dates, numbers, and verifiable facts.

Text:
{text}

Facts:"""
    
    def _entities_prompt(self, text: str, target_words: float) -> str:
        return f"""Extract named entities and their relationships from this text in approximately {int(target_words)} words.
Include people, organizations, locations, and their connections.

Text:
{text}

Entities and Relationships:"""
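
The embedding-based extractor only assumes an embedder with async embed(text) and embed_batch(texts) methods that return numpy vectors. The sketch below wires it up with a toy bag-of-words embedder; the stub is purely illustrative and stands in for a real embedding model.

import asyncio
import numpy as np

class StubEmbedder:
    """Toy bag-of-words embedder standing in for a real embedding model."""
    
    def _vector(self, text: str) -> np.ndarray:
        vec = np.zeros(256)
        for word in text.lower().split():
            vec[hash(word) % 256] += 1.0
        return vec
    
    async def embed(self, text: str) -> np.ndarray:
        return self._vector(text)
    
    async def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        return [self._vector(t) for t in texts]

async def demo():
    counter = TokenCounter()
    document = "\n\n".join(
        f"Paragraph {i} discusses topic {i % 5} in some detail." for i in range(200)
    )
    
    extractor = EmbeddingExtractor(StubEmbedder(), counter)
    result = await extractor.extract(document, query="topic 3", max_tokens=150)
    print(result.compressed_tokens, result.compressed_text[:120])

asyncio.run(demo())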

Hybrid Compression Pipeline

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import asyncio

@dataclass
class CompressionConfig:
    """Configuration for compression pipeline."""
    
    max_tokens: int
    strategy: CompressionStrategy = CompressionStrategy.HYBRID
    query: str = None
    preserve_structure: bool = True
    min_compression_ratio: float = 0.1

class HybridCompressor:
    """Combine multiple compression strategies."""
    
    def __init__(
        self,
        llm_client: Any,
        embedding_model: Any,
        token_counter: TokenCounter
    ):
        self.llm = llm_client
        self.embedder = embedding_model
        self.counter = token_counter
        
        # Initialize compressors
        self.truncator = TruncationCompressor(token_counter)
        self.sentence_compressor = SentenceCompressor(token_counter)
        self.summarizer = LLMSummarizer(llm_client, token_counter)
        self.extractor = EmbeddingExtractor(embedding_model, token_counter)
    
    async def compress(
        self,
        text: str,
        config: CompressionConfig
    ) -> CompressionResult:
        """Compress using hybrid strategy."""
        
        original_tokens = self.counter.count(text)
        
        if original_tokens <= config.max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=config.strategy
            )
        
        # Determine compression approach based on ratio needed
        target_ratio = config.max_tokens / original_tokens
        
        if target_ratio > 0.7:
            # Light compression: sentence selection
            result = self.sentence_compressor.compress(
                text, config.max_tokens, config.query
            )
        elif target_ratio > 0.3:
            # Medium compression: extractive + light summarization
            result = await self._medium_compression(text, config)
        else:
            # Heavy compression: full summarization
            result = await self._heavy_compression(text, config)
        
        return result
    
    async def _medium_compression(
        self,
        text: str,
        config: CompressionConfig
    ) -> CompressionResult:
        """Medium compression using extraction."""
        
        if config.query:
            # Query-focused extraction
            result = await self.extractor.extract(
                text, config.query, config.max_tokens
            )
        else:
            # General sentence selection
            result = self.sentence_compressor.compress(
                text, config.max_tokens
            )
        
        return result
    
    async def _heavy_compression(
        self,
        text: str,
        config: CompressionConfig
    ) -> CompressionResult:
        """Heavy compression using summarization."""
        
        result = await self.summarizer.summarize(
            text, config.max_tokens, config.query
        )
        
        return result

class AdaptiveCompressor:
    """Adaptively choose compression strategy."""
    
    def __init__(
        self,
        compressors: dict[str, Any],
        token_counter: TokenCounter
    ):
        self.compressors = compressors
        self.counter = token_counter
    
    async def compress(
        self,
        text: str,
        max_tokens: int,
        context: dict = None
    ) -> CompressionResult:
        """Adaptively compress based on content."""
        
        original_tokens = self.counter.count(text)
        
        if original_tokens <= max_tokens:
            return CompressionResult(
                original_text=text,
                compressed_text=text,
                original_tokens=original_tokens,
                compressed_tokens=original_tokens,
                compression_ratio=1.0,
                strategy=CompressionStrategy.HYBRID
            )
        
        # Analyze content
        content_type = self._analyze_content(text)
        
        # Choose strategy
        strategy = self._choose_strategy(content_type, context)
        
        # Apply compression
        compressor = self.compressors.get(strategy)
        
        if hasattr(compressor, 'compress'):
            if asyncio.iscoroutinefunction(compressor.compress):
                result = await compressor.compress(text, max_tokens)
            else:
                result = compressor.compress(text, max_tokens)
        else:
            # Fallback to truncation
            result = TruncationCompressor(self.counter).compress(text, max_tokens)
        
        return result
    
    def _analyze_content(self, text: str) -> str:
        """Analyze content type."""
        
        # Check for structured content
        if text.count("\n") > 10 and text.count(":") > 5:
            return "structured"
        
        # Check for narrative content
        sentences = text.split(".")
        avg_sentence_len = sum(len(s.split()) for s in sentences) / max(len(sentences), 1)
        
        if avg_sentence_len > 15:
            return "narrative"
        
        # Check for technical content
        technical_terms = ["function", "class", "import", "def", "return", "if", "for"]
        if any(term in text.lower() for term in technical_terms):
            return "technical"
        
        return "general"
    
    def _choose_strategy(
        self,
        content_type: str,
        context: dict = None
    ) -> str:
        """Choose compression strategy."""
        
        if context and context.get("query"):
            return "extractive"
        
        if content_type == "structured":
            return "extractive"
        elif content_type == "narrative":
            return "summarize"
        elif content_type == "technical":
            return "extractive"
        else:
            return "sentence"

class CompressionPipeline:
    """Pipeline for multi-stage compression."""
    
    def __init__(self, token_counter: TokenCounter):
        self.counter = token_counter
        self.stages: list[Callable] = []
    
    def add_stage(self, compressor: Callable):
        """Add compression stage."""
        
        self.stages.append(compressor)
    
    async def compress(
        self,
        text: str,
        max_tokens: int
    ) -> CompressionResult:
        """Run compression pipeline."""
        
        original_tokens = self.counter.count(text)
        current_text = text
        
        for stage in self.stages:
            current_tokens = self.counter.count(current_text)
            
            if current_tokens <= max_tokens:
                break
            
            if asyncio.iscoroutinefunction(stage):
                result = await stage(current_text, max_tokens)
            else:
                result = stage(current_text, max_tokens)
            
            if hasattr(result, 'compressed_text'):
                current_text = result.compressed_text
            else:
                current_text = result
        
        compressed_tokens = self.counter.count(current_text)
        
        return CompressionResult(
            original_text=text,
            compressed_text=current_text,
            original_tokens=original_tokens,
            compressed_tokens=compressed_tokens,
            compression_ratio=compressed_tokens / original_tokens,
            strategy=CompressionStrategy.HYBRID
        )
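
CompressionPipeline stages are just callables that take (text, max_tokens) and return either a CompressionResult or a string, so the compressors defined earlier can be chained directly. A minimal sketch, using only the local (non-LLM) compressors so it runs without external services:

import asyncio

async def demo():
    counter = TokenCounter()
    
    pipeline = CompressionPipeline(counter)
    pipeline.add_stage(SentenceCompressor(counter).compress)    # try extractive selection first
    pipeline.add_stage(TruncationCompressor(counter).compress)  # hard truncation as a last resort
    
    text = "An important finding about the system appears in this sentence. " * 500
    result = await pipeline.compress(text, max_tokens=120)
    print(result.compressed_tokens, result.compression_ratio, result.strategy)

asyncio.run(demo())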

Production Compression Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List

app = FastAPI()

class CompressRequest(BaseModel):
    text: str
    max_tokens: int
    strategy: str = "hybrid"
    query: Optional[str] = None

class CompressResponse(BaseModel):
    compressed_text: str
    original_tokens: int
    compressed_tokens: int
    compression_ratio: float
    strategy: str

# Initialize components
token_counter = TokenCounter()
truncator = TruncationCompressor(token_counter)
sentence_compressor = SentenceCompressor(token_counter)

@app.post("/v1/compress", response_model=CompressResponse)
async def compress_text(request: CompressRequest) -> CompressResponse:
    """Compress text."""
    
    if request.strategy == "truncate":
        result = truncator.compress(request.text, request.max_tokens)
    elif request.strategy == "sentence":
        result = sentence_compressor.compress(
            request.text, request.max_tokens, request.query
        )
    else:
        # Default to sentence compression
        result = sentence_compressor.compress(
            request.text, request.max_tokens, request.query
        )
    
    return CompressResponse(
        compressed_text=result.compressed_text,
        original_tokens=result.original_tokens,
        compressed_tokens=result.compressed_tokens,
        compression_ratio=result.compression_ratio,
        strategy=result.strategy.value
    )

@app.post("/v1/compress/batch")
async def compress_batch(texts: List[str], max_tokens: int) -> List[CompressResponse]:
    """Compress multiple texts."""
    
    results = []
    
    for text in texts:
        result = sentence_compressor.compress(text, max_tokens)
        results.append(CompressResponse(
            compressed_text=result.compressed_text,
            original_tokens=result.original_tokens,
            compressed_tokens=result.compressed_tokens,
            compression_ratio=result.compression_ratio,
            strategy=result.strategy.value
        ))
    
    return results

@app.get("/v1/tokens/count")
async def count_tokens(text: str) -> dict:
    """Count tokens in text."""
    
    count = token_counter.count(text)
    return {"tokens": count, "characters": len(text)}

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Context compression is about making intelligent trade-offs between information density and token budget. Start with the simplest approach that works: truncation is fast and predictable, sentence selection preserves key content, and summarization creates the most compact representations. The best strategy depends on your use case—query-focused extraction works well for RAG systems where you know what information you need, while hierarchical summarization handles long documents that need holistic understanding. Hybrid approaches combine multiple strategies: extract relevant sections first, then summarize if still over budget. Always measure compression quality, not just compression ratio—a 10x compression that loses critical information is worse than a 2x compression that preserves it. For production systems, cache compressed versions of frequently-accessed documents and use adaptive strategies that choose compression methods based on content type and query context. The goal isn't minimal tokens—it's maximum relevant information per token.