Retrieval Reranking Techniques: From Cross-Encoders to LLM-Based Scoring

Introduction: Initial retrieval casts a wide net—vector search or keyword matching returns candidates that might be relevant. Reranking narrows the focus, using more expensive but accurate models to score each candidate against the query. Cross-encoders process query-document pairs together, capturing fine-grained semantic relationships that bi-encoders miss. This two-stage approach balances efficiency with accuracy: fast retrieval gets candidates, slow reranking picks the best. This guide covers practical reranking strategies: implementing cross-encoder scoring, choosing between model-based and LLM-based rerankers, handling latency constraints with batching and caching, and combining multiple reranking signals. Whether you’re building search, RAG, or recommendation systems, reranking is often the difference between good and great retrieval quality.

Reranking Pipeline
Figure: the reranking pipeline flows from cross-encoder scoring to score sorting to threshold filtering.

Cross-Encoder Reranking

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod

@dataclass
class Document:
    """A document to rerank."""
    
    id: str
    content: str
    metadata: dict = field(default_factory=dict)
    initial_score: float = 0.0

@dataclass
class RankedDocument:
    """A reranked document."""
    
    document: Document
    rerank_score: float
    rank: int

class Reranker(ABC):
    """Abstract reranker interface."""
    
    @abstractmethod
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank documents for query."""
        pass

class CrossEncoderReranker(Reranker):
    """Cross-encoder based reranker."""
    
    def __init__(
        self,
        model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    ):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank using cross-encoder."""
        
        if not documents:
            return []
        
        # Create query-document pairs
        pairs = [(query, doc.content) for doc in documents]
        
        # Score all pairs
        scores = self.model.predict(pairs)
        
        # Create ranked results
        ranked = []
        for doc, score in zip(documents, scores):
            ranked.append(RankedDocument(
                document=doc,
                rerank_score=float(score),
                rank=0  # Will be set after sorting
            ))
        
        # Sort by score
        ranked.sort(key=lambda x: x.rerank_score, reverse=True)
        
        # Set ranks
        for i, r in enumerate(ranked):
            r.rank = i + 1
        
        # Apply top_k
        if top_k:
            ranked = ranked[:top_k]
        
        return ranked

class ColBERTReranker(Reranker):
    """ColBERT-based reranker with late interaction."""
    
    def __init__(self, model_path: str):
        # Would load ColBERT model
        self.model_path = model_path
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank using ColBERT late interaction."""
        
        # Encode query tokens
        query_embeddings = self._encode_query(query)
        
        ranked = []
        for doc in documents:
            # Encode document tokens
            doc_embeddings = self._encode_document(doc.content)
            
            # MaxSim scoring
            score = self._maxsim(query_embeddings, doc_embeddings)
            
            ranked.append(RankedDocument(
                document=doc,
                rerank_score=score,
                rank=0
            ))
        
        # Sort and rank
        ranked.sort(key=lambda x: x.rerank_score, reverse=True)
        for i, r in enumerate(ranked):
            r.rank = i + 1
        
        if top_k:
            ranked = ranked[:top_k]
        
        return ranked
    
    def _encode_query(self, query: str) -> list[list[float]]:
        """Encode query to token embeddings."""
        # Would use ColBERT model
        return []
    
    def _encode_document(self, content: str) -> list[list[float]]:
        """Encode document to token embeddings."""
        # Would use ColBERT model
        return []
    
    def _maxsim(
        self,
        query_emb: list[list[float]],
        doc_emb: list[list[float]]
    ) -> float:
        """Compute MaxSim score."""
        
        import numpy as np
        
        if not query_emb or not doc_emb:
            return 0.0
        
        q = np.array(query_emb)
        d = np.array(doc_emb)
        
        # Similarity matrix
        sim = np.dot(q, d.T)
        
        # Max over document tokens for each query token
        max_sim = sim.max(axis=1)
        
        # Sum over query tokens
        return float(max_sim.sum())
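
Putting the listing above to work: below is a minimal usage sketch, assuming the candidate documents have already come back from first-stage retrieval (vector or keyword search). The threshold value is illustrative only; cross-encoder scores are model-specific logits rather than probabilities, so any cutoff should be calibrated on your own data.

import asyncio

async def main():
    # Hypothetical candidates from first-stage retrieval
    candidates = [
        Document(id="d1", content="Cross-encoders jointly encode the query and each document.", initial_score=0.71),
        Document(id="d2", content="Bi-encoders embed the query and documents separately.", initial_score=0.69),
        Document(id="d3", content="Reranking reorders candidates with a slower, more accurate model.", initial_score=0.64),
    ]

    reranker = CrossEncoderReranker()
    ranked = await reranker.rerank("how does cross-encoder reranking work?", candidates, top_k=3)

    # Threshold filtering (the last stage in the pipeline figure); 0.0 is an illustrative cutoff
    MIN_SCORE = 0.0
    kept = [r for r in ranked if r.rerank_score >= MIN_SCORE]

    for r in kept:
        print(f"#{r.rank} {r.document.id} score={r.rerank_score:.3f}")

asyncio.run(main())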

LLM-Based Reranking

from dataclasses import dataclass
from typing import Any, Optional
import json

class LLMReranker(Reranker):
    """LLM-based reranker."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini"
    ):
        self.client = client
        self.model = model
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank using LLM."""
        
        if not documents:
            return []
        
        # Format documents for prompt
        doc_list = "\n\n".join([
            f"[{i+1}] {doc.content[:500]}"
            for i, doc in enumerate(documents)
        ])
        
        prompt = f"""Rank these documents by relevance to the query.

Query: {query}

Documents:
{doc_list}

Return a JSON array of document numbers in order of relevance (most relevant first).
Example: [3, 1, 5, 2, 4]

Only return the JSON array, nothing else."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        # Parse ranking
        content = response.choices[0].message.content
        
        import re
        json_match = re.search(r'\[[\d,\s]+\]', content)
        if not json_match:
            # Return original order
            return [
                RankedDocument(doc, doc.initial_score, i+1)
                for i, doc in enumerate(documents)
            ]
        
        ranking = json.loads(json_match.group(0))
        
        # Create ranked results
        ranked = []
        for rank, doc_idx in enumerate(ranking):
            if 1 <= doc_idx <= len(documents):
                doc = documents[doc_idx - 1]
                ranked.append(RankedDocument(
                    document=doc,
                    rerank_score=len(documents) - rank,  # Higher score for better rank
                    rank=rank + 1
                ))
        
        if top_k:
            ranked = ranked[:top_k]
        
        return ranked

class LLMPointwiseReranker(Reranker):
    """LLM reranker with pointwise scoring."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini"
    ):
        self.client = client
        self.model = model
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Score each document independently."""
        
        import asyncio
        
        # Score all documents in parallel
        tasks = [
            self._score_document(query, doc)
            for doc in documents
        ]
        
        scores = await asyncio.gather(*tasks)
        
        # Create ranked results
        ranked = []
        for doc, score in zip(documents, scores):
            ranked.append(RankedDocument(
                document=doc,
                rerank_score=score,
                rank=0
            ))
        
        # Sort and rank
        ranked.sort(key=lambda x: x.rerank_score, reverse=True)
        for i, r in enumerate(ranked):
            r.rank = i + 1
        
        if top_k:
            ranked = ranked[:top_k]
        
        return ranked
    
    async def _score_document(
        self,
        query: str,
        document: Document
    ) -> float:
        """Score single document."""
        
        prompt = f"""Rate how relevant this document is to the query on a scale of 0-10.

Query: {query}

Document: {document.content[:1000]}

Return only a number between 0 and 10."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        try:
            score = float(response.choices[0].message.content.strip())
            return min(10, max(0, score))
        except ValueError:
            return 5.0  # Default score

class LLMPairwiseReranker(Reranker):
    """LLM reranker with pairwise comparison."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini"
    ):
        self.client = client
        self.model = model
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank using pairwise comparisons."""
        
        n = len(documents)
        if n <= 1:
            return [
                RankedDocument(doc, 1.0, i+1)
                for i, doc in enumerate(documents)
            ]
        
        # Win counts for each document
        wins = [0] * n
        
        # Compare all pairs
        import asyncio
        
        comparisons = []
        for i in range(n):
            for j in range(i + 1, n):
                comparisons.append((i, j))
        
        tasks = [
            self._compare_pair(query, documents[i], documents[j])
            for i, j in comparisons
        ]
        
        results = await asyncio.gather(*tasks)
        
        for (i, j), winner in zip(comparisons, results):
            if winner == 0:
                wins[i] += 1
            else:
                wins[j] += 1
        
        # Create ranked results
        ranked = []
        for i, doc in enumerate(documents):
            ranked.append(RankedDocument(
                document=doc,
                rerank_score=wins[i],
                rank=0
            ))
        
        # Sort by wins
        ranked.sort(key=lambda x: x.rerank_score, reverse=True)
        for i, r in enumerate(ranked):
            r.rank = i + 1
        
        if top_k:
            ranked = ranked[:top_k]
        
        return ranked
    
    async def _compare_pair(
        self,
        query: str,
        doc_a: Document,
        doc_b: Document
    ) -> int:
        """Compare two documents. Returns 0 if A wins, 1 if B wins."""
        
        prompt = f"""Which document is more relevant to the query?

Query: {query}

Document A: {doc_a.content[:500]}

Document B: {doc_b.content[:500]}

Answer with just 'A' or 'B'."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        answer = response.choices[0].message.content.strip().upper()
        return 0 if "A" in answer else 1
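
The three LLM strategies trade cost for granularity: the listwise LLMReranker makes a single call but is bounded by the context window and can be sensitive to how documents are ordered in the prompt; the pointwise variant makes one call per document and parallelizes well; the pairwise variant compares every pair and grows quadratically. A quick back-of-the-envelope helper (not part of the original listing) makes the call counts concrete:

def llm_call_counts(n: int) -> dict[str, int]:
    """LLM calls each strategy above issues for n candidate documents."""
    return {
        "listwise": 1,                 # LLMReranker: one prompt containing all documents
        "pointwise": n,                # LLMPointwiseReranker: one scoring call per document
        "pairwise": n * (n - 1) // 2,  # LLMPairwiseReranker: one call per unordered pair
    }

print(llm_call_counts(20))  # {'listwise': 1, 'pointwise': 20, 'pairwise': 190}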

Ensemble Reranking

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class FusionMethod(Enum):
    """Score fusion methods."""
    
    WEIGHTED_SUM = "weighted_sum"
    RECIPROCAL_RANK = "reciprocal_rank"
    BORDA_COUNT = "borda_count"
    CONDORCET = "condorcet"

@dataclass
class RerankerWeight:
    """Reranker with weight."""
    
    reranker: Reranker
    weight: float = 1.0
    name: str = ""

class EnsembleReranker(Reranker):
    """Ensemble of multiple rerankers."""
    
    def __init__(
        self,
        rerankers: list[RerankerWeight],
        fusion_method: FusionMethod = FusionMethod.RECIPROCAL_RANK
    ):
        self.rerankers = rerankers
        self.fusion_method = fusion_method
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank using ensemble."""
        
        import asyncio
        
        # Get rankings from all rerankers
        tasks = [
            rw.reranker.rerank(query, documents)
            for rw in self.rerankers
        ]
        
        all_rankings = await asyncio.gather(*tasks)
        
        # Fuse rankings
        if self.fusion_method == FusionMethod.WEIGHTED_SUM:
            fused = self._weighted_sum_fusion(all_rankings)
        elif self.fusion_method == FusionMethod.RECIPROCAL_RANK:
            fused = self._rrf_fusion(all_rankings)
        elif self.fusion_method == FusionMethod.BORDA_COUNT:
            fused = self._borda_fusion(all_rankings)
        else:
            fused = self._rrf_fusion(all_rankings)
        
        # Sort and rank
        fused.sort(key=lambda x: x.rerank_score, reverse=True)
        for i, r in enumerate(fused):
            r.rank = i + 1
        
        if top_k:
            fused = fused[:top_k]
        
        return fused
    
    def _weighted_sum_fusion(
        self,
        all_rankings: list[list[RankedDocument]]
    ) -> list[RankedDocument]:
        """Fuse using weighted sum of scores."""
        
        # Normalize scores to [0, 1]
        normalized_rankings = []
        for ranking in all_rankings:
            if not ranking:
                normalized_rankings.append([])
                continue
            
            max_score = max(r.rerank_score for r in ranking)
            min_score = min(r.rerank_score for r in ranking)
            score_range = max_score - min_score or 1
            
            normalized = []
            for r in ranking:
                norm_score = (r.rerank_score - min_score) / score_range
                normalized.append(RankedDocument(
                    document=r.document,
                    rerank_score=norm_score,
                    rank=r.rank
                ))
            normalized_rankings.append(normalized)
        
        # Aggregate scores
        doc_scores: dict[str, float] = {}
        doc_map: dict[str, Document] = {}
        
        for ranking, rw in zip(normalized_rankings, self.rerankers):
            for r in ranking:
                doc_id = r.document.id
                doc_map[doc_id] = r.document
                
                if doc_id not in doc_scores:
                    doc_scores[doc_id] = 0.0
                
                doc_scores[doc_id] += r.rerank_score * rw.weight
        
        # Create results
        results = []
        for doc_id, score in doc_scores.items():
            results.append(RankedDocument(
                document=doc_map[doc_id],
                rerank_score=score,
                rank=0
            ))
        
        return results
    
    def _rrf_fusion(
        self,
        all_rankings: list[list[RankedDocument]],
        k: int = 60
    ) -> list[RankedDocument]:
        """Reciprocal Rank Fusion."""
        
        doc_scores: dict[str, float] = {}
        doc_map: dict[str, Document] = {}
        
        for ranking, rw in zip(all_rankings, self.rerankers):
            for r in ranking:
                doc_id = r.document.id
                doc_map[doc_id] = r.document
                
                if doc_id not in doc_scores:
                    doc_scores[doc_id] = 0.0
                
                # RRF formula: 1 / (k + rank)
                doc_scores[doc_id] += rw.weight / (k + r.rank)
        
        results = []
        for doc_id, score in doc_scores.items():
            results.append(RankedDocument(
                document=doc_map[doc_id],
                rerank_score=score,
                rank=0
            ))
        
        return results
    
    def _borda_fusion(
        self,
        all_rankings: list[list[RankedDocument]]
    ) -> list[RankedDocument]:
        """Borda count fusion."""
        
        doc_scores: dict[str, float] = {}
        doc_map: dict[str, Document] = {}
        
        for ranking, rw in zip(all_rankings, self.rerankers):
            n = len(ranking)
            for r in ranking:
                doc_id = r.document.id
                doc_map[doc_id] = r.document
                
                if doc_id not in doc_scores:
                    doc_scores[doc_id] = 0.0
                
                # Borda: n - rank + 1 points
                doc_scores[doc_id] += (n - r.rank + 1) * rw.weight
        
        results = []
        for doc_id, score in doc_scores.items():
            results.append(RankedDocument(
                document=doc_map[doc_id],
                rerank_score=score,
                rank=0
            ))
        
        return results

class CascadeReranker(Reranker):
    """Cascade of rerankers with progressive filtering."""
    
    def __init__(self, stages: list[tuple[Reranker, int]]):
        """
        stages: list of (reranker, top_k) tuples
        Each stage reranks and keeps top_k for next stage
        """
        self.stages = stages
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank through cascade."""
        
        current_docs = documents
        
        for reranker, stage_top_k in self.stages:
            ranked = await reranker.rerank(query, current_docs, stage_top_k)
            current_docs = [r.document for r in ranked]
        
        # Final ranking
        final_ranked = []
        for i, doc in enumerate(current_docs):
            final_ranked.append(RankedDocument(
                document=doc,
                rerank_score=len(current_docs) - i,
                rank=i + 1
            ))
        
        if top_k:
            final_ranked = final_ranked[:top_k]
        
        return final_ranked
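
To show how these compose, here is a hypothetical wiring sketch; openai_client stands in for whatever AsyncOpenAI-style client you use, and the weights and stage sizes are arbitrary examples, not recommendations.

# Hypothetical wiring; `openai_client` is assumed to be an async OpenAI-style client.
cross_encoder = CrossEncoderReranker()
llm_listwise = LLMReranker(client=openai_client)

# Ensemble: fuse both rankings with Reciprocal Rank Fusion, weighting the LLM higher
ensemble = EnsembleReranker(
    rerankers=[
        RerankerWeight(reranker=cross_encoder, weight=1.0, name="cross_encoder"),
        RerankerWeight(reranker=llm_listwise, weight=2.0, name="llm"),
    ],
    fusion_method=FusionMethod.RECIPROCAL_RANK,
)

# Cascade: the cheap cross-encoder narrows candidates to 20, the LLM picks the final 5
cascade = CascadeReranker(stages=[
    (cross_encoder, 20),
    (llm_listwise, 5),
])

# ranked = await cascade.rerank(query, candidates, top_k=5)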

Caching and Optimization

from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime, timedelta
import hashlib
import json

@dataclass
class CacheEntry:
    """Cache entry for reranking results."""
    
    query: str
    doc_ids: list[str]
    ranked_ids: list[str]
    scores: list[float]
    created_at: datetime
    ttl: timedelta

class RerankerCache:
    """Cache for reranking results."""
    
    def __init__(self, ttl_seconds: int = 3600):
        self._cache: dict[str, CacheEntry] = {}
        self.ttl = timedelta(seconds=ttl_seconds)
    
    def _make_key(self, query: str, doc_ids: list[str]) -> str:
        """Create cache key."""
        
        content = json.dumps({
            "query": query,
            "doc_ids": sorted(doc_ids)
        })
        return hashlib.sha256(content.encode()).hexdigest()
    
    def get(
        self,
        query: str,
        doc_ids: list[str]
    ) -> Optional[list[tuple[str, float]]]:
        """Get cached ranking."""
        
        key = self._make_key(query, doc_ids)
        entry = self._cache.get(key)
        
        if not entry:
            return None
        
        # Check expiry
        if datetime.utcnow() > entry.created_at + entry.ttl:
            del self._cache[key]
            return None
        
        return list(zip(entry.ranked_ids, entry.scores))
    
    def set(
        self,
        query: str,
        doc_ids: list[str],
        ranked: list[RankedDocument]
    ):
        """Cache ranking."""
        
        key = self._make_key(query, doc_ids)
        
        self._cache[key] = CacheEntry(
            query=query,
            doc_ids=doc_ids,
            ranked_ids=[r.document.id for r in ranked],
            scores=[r.rerank_score for r in ranked],
            created_at=datetime.utcnow(),
            ttl=self.ttl
        )
    
    def clear(self):
        """Clear cache."""
        self._cache.clear()

class CachedReranker(Reranker):
    """Reranker with caching."""
    
    def __init__(
        self,
        reranker: Reranker,
        cache: Optional[RerankerCache] = None
    ):
        self.reranker = reranker
        self.cache = cache or RerankerCache()
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank with caching."""
        
        doc_ids = [d.id for d in documents]
        
        # Check cache
        cached = self.cache.get(query, doc_ids)
        if cached:
            # Reconstruct from cache
            id_to_doc = {d.id: d for d in documents}
            ranked = []
            for i, (doc_id, score) in enumerate(cached):
                if doc_id in id_to_doc:
                    ranked.append(RankedDocument(
                        document=id_to_doc[doc_id],
                        rerank_score=score,
                        rank=i + 1
                    ))
            
            if top_k:
                ranked = ranked[:top_k]
            
            return ranked
        
        # Compute ranking
        ranked = await self.reranker.rerank(query, documents, top_k)
        
        # Cache result
        self.cache.set(query, doc_ids, ranked)
        
        return ranked

class BatchReranker(Reranker):
    """Batch reranking for efficiency."""
    
    def __init__(
        self,
        reranker: Reranker,
        batch_size: int = 32
    ):
        self.reranker = reranker
        self.batch_size = batch_size
    
    async def rerank(
        self,
        query: str,
        documents: list[Document],
        top_k: Optional[int] = None
    ) -> list[RankedDocument]:
        """Rerank in batches."""
        
        if len(documents) <= self.batch_size:
            return await self.reranker.rerank(query, documents, top_k)
        
        # Process in batches
        all_ranked = []
        
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            ranked = await self.reranker.rerank(query, batch)
            all_ranked.extend(ranked)
        
        # Re-sort all results
        all_ranked.sort(key=lambda x: x.rerank_score, reverse=True)
        for i, r in enumerate(all_ranked):
            r.rank = i + 1
        
        if top_k:
            all_ranked = all_ranked[:top_k]
        
        return all_ranked
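
These wrappers stack naturally. One caveat with BatchReranker: per-batch scores only merge cleanly when the underlying reranker produces absolute scores (as a cross-encoder does); rank-derived scores like the listwise LLMReranker's are not comparable across batches. A small composition sketch, with an illustrative TTL:

base = CrossEncoderReranker()
batched = BatchReranker(base, batch_size=32)
reranker = CachedReranker(batched, cache=RerankerCache(ttl_seconds=600))

# First call scores in batches of 32 and caches the result; repeating the same
# query over the same document ids within the TTL is served from the cache.
# ranked = await reranker.rerank(query, candidates, top_k=10)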

Production Reranking Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from enum import Enum

app = FastAPI()

# Initialize rerankers
cross_encoder = CrossEncoderReranker()
cache = RerankerCache(ttl_seconds=3600)
cached_reranker = CachedReranker(cross_encoder, cache)

class RerankerType(str, Enum):
    CROSS_ENCODER = "cross_encoder"
    LLM = "llm"
    ENSEMBLE = "ensemble"

class DocumentInput(BaseModel):
    id: str
    content: str
    metadata: Optional[dict] = None
    initial_score: Optional[float] = 0.0

class RerankRequest(BaseModel):
    query: str
    documents: list[DocumentInput]
    top_k: Optional[int] = None
    reranker: RerankerType = RerankerType.CROSS_ENCODER
    use_cache: bool = True

class RankedDocumentOutput(BaseModel):
    id: str
    content: str
    rerank_score: float
    rank: int
    metadata: Optional[dict] = None

class RerankResponse(BaseModel):
    query: str
    results: list[RankedDocumentOutput]
    reranker_used: str
    cached: bool

@app.post("/v1/rerank")
async def rerank_documents(request: RerankRequest) -> RerankResponse:
    """Rerank documents for query."""
    
    # Convert to internal format
    documents = [
        Document(
            id=d.id,
            content=d.content,
            metadata=d.metadata or {},
            initial_score=d.initial_score or 0.0
        )
        for d in request.documents
    ]
    
    # Select reranker
    if request.reranker == RerankerType.CROSS_ENCODER:
        reranker = cached_reranker if request.use_cache else cross_encoder
    elif request.reranker == RerankerType.LLM:
        # Would initialize LLM reranker
        reranker = cross_encoder
    else:
        reranker = cross_encoder
    
    # Check if cached
    cached = False
    if request.use_cache:
        doc_ids = [d.id for d in documents]
        cached_result = cache.get(request.query, doc_ids)
        cached = cached_result is not None
    
    # Rerank
    ranked = await reranker.rerank(
        request.query,
        documents,
        request.top_k
    )
    
    # Format response
    results = [
        RankedDocumentOutput(
            id=r.document.id,
            content=r.document.content,
            rerank_score=r.rerank_score,
            rank=r.rank,
            metadata=r.document.metadata
        )
        for r in ranked
    ]
    
    return RerankResponse(
        query=request.query,
        results=results,
        reranker_used=request.reranker.value,
        cached=cached
    )

class CompareRequest(BaseModel):
    query: str
    document_a: DocumentInput
    document_b: DocumentInput

class CompareResponse(BaseModel):
    query: str
    winner: str
    winner_id: str
    confidence: float

@app.post("/v1/compare")
async def compare_documents(request: CompareRequest) -> CompareResponse:
    """Compare two documents for relevance."""
    
    doc_a = Document(
        id=request.document_a.id,
        content=request.document_a.content
    )
    doc_b = Document(
        id=request.document_b.id,
        content=request.document_b.content
    )
    
    # Rerank both
    ranked = await cross_encoder.rerank(
        request.query,
        [doc_a, doc_b]
    )
    
    winner = ranked[0]
    loser = ranked[1]
    
    # Confidence from the score gap; cross-encoder scores are unbounded
    # logits, so dividing by 10 is only a rough heuristic normalization
    score_diff = winner.rerank_score - loser.rerank_score
    confidence = min(1.0, score_diff / 10.0)
    
    return CompareResponse(
        query=request.query,
        winner="A" if winner.document.id == doc_a.id else "B",
        winner_id=winner.document.id,
        confidence=confidence
    )

class ScoreRequest(BaseModel):
    query: str
    document: DocumentInput

class ScoreResponse(BaseModel):
    query: str
    document_id: str
    relevance_score: float

@app.post("/v1/score")
async def score_document(request: ScoreRequest) -> ScoreResponse:
    """Score single document relevance."""
    
    doc = Document(
        id=request.document.id,
        content=request.document.content
    )
    
    ranked = await cross_encoder.rerank(request.query, [doc])
    
    return ScoreResponse(
        query=request.query,
        document_id=doc.id,
        relevance_score=ranked[0].rerank_score if ranked else 0.0
    )

@app.delete("/v1/cache")
async def clear_cache():
    """Clear reranking cache."""
    
    cache.clear()
    return {"status": "cleared"}

@app.get("/v1/cache/stats")
async def cache_stats():
    """Get cache statistics."""
    
    return {
        "entries": len(cache._cache),
        "ttl_seconds": cache.ttl.total_seconds()
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}
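
A minimal client call against the service, assuming it is running locally (e.g. uvicorn app:app --port 8000; the module name and host are illustrative):

import httpx

payload = {
    "query": "how do cross-encoders work?",
    "documents": [
        {"id": "d1", "content": "Cross-encoders jointly encode the query and document."},
        {"id": "d2", "content": "Unrelated text about gardening."},
    ],
    "top_k": 2,
    "reranker": "cross_encoder",
    "use_cache": True,
}

resp = httpx.post("http://localhost:8000/v1/rerank", json=payload, timeout=30.0)
resp.raise_for_status()
for r in resp.json()["results"]:
    print(r["rank"], r["id"], round(r["rerank_score"], 3))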

Conclusion

Reranking transforms good retrieval into great retrieval. Start with cross-encoders—they process query-document pairs together, capturing semantic relationships that bi-encoders miss during initial retrieval. Choose models based on your latency budget: smaller cross-encoders for real-time applications, larger models or LLM-based rerankers when accuracy matters more than speed. LLM rerankers offer flexibility through listwise, pointwise, or pairwise approaches, each with different cost-accuracy tradeoffs. Combine multiple rerankers using ensemble methods like Reciprocal Rank Fusion to leverage diverse signals. Build cascades that progressively filter candidates through increasingly expensive rerankers. Cache aggressively—reranking results are often stable for the same query-document combinations. The key insight is that reranking is a precision tool: initial retrieval optimizes for recall (don't miss relevant documents), while reranking optimizes for precision (put the best documents first). This two-stage architecture lets you balance computational cost with retrieval quality, scaling expensive reranking only to the candidates that matter.