Introduction

Initial retrieval casts a wide net—vector search or keyword matching returns candidates that might be relevant. Reranking narrows the focus, using more expensive but more accurate models to score each candidate against the query. Cross-encoders process query-document pairs together, capturing fine-grained semantic relationships that bi-encoders miss. This two-stage approach balances efficiency with accuracy: fast retrieval gathers candidates, slower reranking picks the best.

This guide covers practical reranking strategies: implementing cross-encoder scoring, choosing between model-based and LLM-based rerankers, handling latency constraints with batching and caching, and combining multiple reranking signals. Whether you’re building search, RAG, or recommendation systems, reranking is often the difference between good and great retrieval quality.
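As a concrete starting point, here is a minimal two-stage sketch using the sentence-transformers library: a bi-encoder retrieves candidates by embedding similarity, and a cross-encoder rescores them jointly with the query. The corpus, the query, and the `all-MiniLM-L6-v2` bi-encoder model are illustrative choices, not requirements.

# Minimal two-stage sketch: fast bi-encoder retrieval, then cross-encoder reranking
from sentence_transformers import SentenceTransformer, CrossEncoder, util

corpus = [
    "Paris is the capital of France.",
    "Reranking reorders retrieved candidates by relevance.",
    "Cross-encoders score query-document pairs jointly.",
]
query = "How do cross-encoders improve retrieval?"

# Stage 1: cast a wide net with a bi-encoder (optimized for recall)
bi_encoder = SentenceTransformer("all-MiniLM-L6-v2")
corpus_emb = bi_encoder.encode(corpus, convert_to_tensor=True)
query_emb = bi_encoder.encode(query, convert_to_tensor=True)
hits = util.semantic_search(query_emb, corpus_emb, top_k=3)[0]

# Stage 2: narrow the focus with a cross-encoder (optimized for precision)
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
pairs = [(query, corpus[hit["corpus_id"]]) for hit in hits]
scores = cross_encoder.predict(pairs)
for (_, doc), score in sorted(zip(pairs, scores), key=lambda x: x[1], reverse=True):
    print(f"{score:.3f}  {doc}")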

Cross-Encoder Reranking
from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
@dataclass
class Document:
"""A document to rerank."""
id: str
content: str
metadata: dict = field(default_factory=dict)
initial_score: float = 0.0
@dataclass
class RankedDocument:
"""A reranked document."""
document: Document
rerank_score: float
rank: int
class Reranker(ABC):
"""Abstract reranker interface."""
@abstractmethod
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank documents for query."""
pass
class CrossEncoderReranker(Reranker):
"""Cross-encoder based reranker."""
def __init__(
self,
model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank using cross-encoder."""
if not documents:
return []
# Create query-document pairs
pairs = [(query, doc.content) for doc in documents]
# Score all pairs
scores = self.model.predict(pairs)
# Create ranked results
ranked = []
for doc, score in zip(documents, scores):
ranked.append(RankedDocument(
document=doc,
rerank_score=float(score),
rank=0 # Will be set after sorting
))
# Sort by score
ranked.sort(key=lambda x: x.rerank_score, reverse=True)
# Set ranks
for i, r in enumerate(ranked):
r.rank = i + 1
# Apply top_k
if top_k:
ranked = ranked[:top_k]
return ranked
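
A short usage sketch for CrossEncoderReranker; the query and documents are made up, and asyncio.run drives the async interface:

import asyncio

docs = [
    Document(id="1", content="Paris is the capital of France.", initial_score=0.71),
    Document(id="2", content="Berlin is the capital of Germany.", initial_score=0.65),
    Document(id="3", content="The Eiffel Tower is in Paris.", initial_score=0.62),
]
reranker = CrossEncoderReranker()
ranked = asyncio.run(reranker.rerank("What is the capital of France?", docs, top_k=2))
for r in ranked:
    print(r.rank, f"{r.rerank_score:.3f}", r.document.id)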
class ColBERTReranker(Reranker):
"""ColBERT-based reranker with late interaction."""
def __init__(self, model_path: str):
# Would load ColBERT model
self.model_path = model_path
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank using ColBERT late interaction."""
# Encode query tokens
query_embeddings = self._encode_query(query)
ranked = []
for doc in documents:
# Encode document tokens
doc_embeddings = self._encode_document(doc.content)
# MaxSim scoring
score = self._maxsim(query_embeddings, doc_embeddings)
ranked.append(RankedDocument(
document=doc,
rerank_score=score,
rank=0
))
# Sort and rank
ranked.sort(key=lambda x: x.rerank_score, reverse=True)
for i, r in enumerate(ranked):
r.rank = i + 1
if top_k:
ranked = ranked[:top_k]
return ranked
def _encode_query(self, query: str) -> list[list[float]]:
"""Encode query to token embeddings."""
# Would use ColBERT model
return []
def _encode_document(self, content: str) -> list[list[float]]:
"""Encode document to token embeddings."""
# Would use ColBERT model
return []
def _maxsim(
self,
query_emb: list[list[float]],
doc_emb: list[list[float]]
) -> float:
"""Compute MaxSim score."""
import numpy as np
if not query_emb or not doc_emb:
return 0.0
q = np.array(query_emb)
d = np.array(doc_emb)
# Similarity matrix
sim = np.dot(q, d.T)
# Max over document tokens for each query token
max_sim = sim.max(axis=1)
# Sum over query tokens
return float(max_sim.sum())
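
To make MaxSim concrete, here is a tiny worked example on made-up, unit-normalized token embeddings (two query tokens, three document tokens): each query token takes its best match over the document tokens, and the per-token maxima are summed.

import numpy as np

q = np.array([[1.0, 0.0], [0.0, 1.0]])              # 2 query tokens
d = np.array([[0.8, 0.6], [0.6, 0.8], [1.0, 0.0]])  # 3 document tokens

sim = q @ d.T                # similarity matrix, shape (2, 3)
max_sim = sim.max(axis=1)    # best document token per query token -> [1.0, 0.8]
print(max_sim.sum())         # MaxSim score -> 1.8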
LLM-Based Reranking
from dataclasses import dataclass
from typing import Any, Optional
import json
class LLMReranker(Reranker):
"""LLM-based reranker."""
def __init__(
self,
client: Any,
model: str = "gpt-4o-mini"
):
self.client = client
self.model = model
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank using LLM."""
if not documents:
return []
# Format documents for prompt
doc_list = "\n\n".join([
f"[{i+1}] {doc.content[:500]}"
for i, doc in enumerate(documents)
])
prompt = f"""Rank these documents by relevance to the query.
Query: {query}
Documents:
{doc_list}
Return a JSON array of document numbers in order of relevance (most relevant first).
Example: [3, 1, 5, 2, 4]
Only return the JSON array, nothing else."""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0
)
# Parse ranking
content = response.choices[0].message.content
import re
json_match = re.search(r'\[[\d,\s]+\]', content)
if not json_match:
# Return original order
return [
RankedDocument(doc, doc.initial_score, i+1)
for i, doc in enumerate(documents)
]
ranking = json.loads(json_match.group(0))
# Create ranked results
ranked = []
for rank, doc_idx in enumerate(ranking):
if 1 <= doc_idx <= len(documents):
doc = documents[doc_idx - 1]
ranked.append(RankedDocument(
document=doc,
rerank_score=len(documents) - rank, # Higher score for better rank
rank=rank + 1
))
if top_k:
ranked = ranked[:top_k]
return ranked
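
The listwise reranker above only assumes an async, OpenAI-style chat client. A usage sketch under that assumption; the AsyncOpenAI client (which reads OPENAI_API_KEY from the environment) is one compatible option, not a requirement:

import asyncio
from openai import AsyncOpenAI  # assumed client; any compatible async chat client works

async def main():
    client = AsyncOpenAI()
    reranker = LLMReranker(client, model="gpt-4o-mini")
    docs = [
        Document(id="a", content="Reranking reorders retrieved candidates by relevance."),
        Document(id="b", content="Bananas are rich in potassium."),
    ]
    ranked = await reranker.rerank("What does reranking do?", docs, top_k=1)
    print([(r.document.id, r.rerank_score) for r in ranked])

asyncio.run(main())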
class LLMPointwiseReranker(Reranker):
"""LLM reranker with pointwise scoring."""
def __init__(
self,
client: Any,
model: str = "gpt-4o-mini"
):
self.client = client
self.model = model
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Score each document independently."""
import asyncio
# Score all documents in parallel
tasks = [
self._score_document(query, doc)
for doc in documents
]
scores = await asyncio.gather(*tasks)
# Create ranked results
ranked = []
for doc, score in zip(documents, scores):
ranked.append(RankedDocument(
document=doc,
rerank_score=score,
rank=0
))
# Sort and rank
ranked.sort(key=lambda x: x.rerank_score, reverse=True)
for i, r in enumerate(ranked):
r.rank = i + 1
if top_k:
ranked = ranked[:top_k]
return ranked
async def _score_document(
self,
query: str,
document: Document
) -> float:
"""Score single document."""
prompt = f"""Rate how relevant this document is to the query on a scale of 0-10.
Query: {query}
Document: {document.content[:1000]}
Return only a number between 0 and 10."""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0
)
try:
score = float(response.choices[0].message.content.strip())
return min(10, max(0, score))
except ValueError:
return 5.0 # Default score
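
LLMPointwiseReranker launches one LLM call per document in parallel, which can exceed provider rate limits on large candidate sets. A small variant sketch that caps in-flight requests with a semaphore; the limit of 8 is an arbitrary illustration:

import asyncio

class BoundedPointwiseReranker(LLMPointwiseReranker):
    """Pointwise LLM reranker with a cap on concurrent requests."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini", max_concurrency: int = 8):
        super().__init__(client, model)
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def _score_document(self, query: str, document: Document) -> float:
        # Bound the number of simultaneous LLM calls; otherwise identical to the parent
        async with self._semaphore:
            return await super()._score_document(query, document)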
class LLMPairwiseReranker(Reranker):
"""LLM reranker with pairwise comparison."""
def __init__(
self,
client: Any,
model: str = "gpt-4o-mini"
):
self.client = client
self.model = model
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank using pairwise comparisons."""
n = len(documents)
if n <= 1:
return [
RankedDocument(doc, 1.0, i+1)
for i, doc in enumerate(documents)
]
# Win counts for each document
wins = [0] * n
# Compare all pairs
import asyncio
comparisons = []
for i in range(n):
for j in range(i + 1, n):
comparisons.append((i, j))
tasks = [
self._compare_pair(query, documents[i], documents[j])
for i, j in comparisons
]
results = await asyncio.gather(*tasks)
for (i, j), winner in zip(comparisons, results):
if winner == 0:
wins[i] += 1
else:
wins[j] += 1
# Create ranked results
ranked = []
for i, doc in enumerate(documents):
ranked.append(RankedDocument(
document=doc,
rerank_score=wins[i],
rank=0
))
# Sort by wins
ranked.sort(key=lambda x: x.rerank_score, reverse=True)
for i, r in enumerate(ranked):
r.rank = i + 1
if top_k:
ranked = ranked[:top_k]
return ranked
async def _compare_pair(
self,
query: str,
doc_a: Document,
doc_b: Document
) -> int:
"""Compare two documents. Returns 0 if A wins, 1 if B wins."""
prompt = f"""Which document is more relevant to the query?
Query: {query}
Document A: {doc_a.content[:500]}
Document B: {doc_b.content[:500]}
Answer with just 'A' or 'B'."""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0
)
answer = response.choices[0].message.content.strip().upper()
return 0 if "A" in answer else 1
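
Pairwise comparison is the most expensive option: n candidates require n*(n-1)/2 LLM calls, so it is usually reserved for a short list that a cheaper stage has already trimmed. A sketch of that pattern, where the shortlist size of 5 and the `client` object (an async OpenAI-style client as assumed above) are illustrative:

# 100 candidates compared pairwise would need 100 * 99 / 2 = 4,950 LLM calls;
# trimming to 5 with a cross-encoder first leaves only 5 * 4 / 2 = 10.
cheap_stage = CrossEncoderReranker()
pairwise_stage = LLMPairwiseReranker(client)

async def rerank_with_shortlist(query: str, candidates: list[Document]) -> list[RankedDocument]:
    shortlist = await cheap_stage.rerank(query, candidates, top_k=5)
    return await pairwise_stage.rerank(query, [r.document for r in shortlist])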
Ensemble Reranking
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
class FusionMethod(Enum):
"""Score fusion methods."""
WEIGHTED_SUM = "weighted_sum"
RECIPROCAL_RANK = "reciprocal_rank"
BORDA_COUNT = "borda_count"
CONDORCET = "condorcet"
@dataclass
class RerankerWeight:
"""Reranker with weight."""
reranker: Reranker
weight: float = 1.0
name: str = ""
class EnsembleReranker(Reranker):
"""Ensemble of multiple rerankers."""
def __init__(
self,
rerankers: list[RerankerWeight],
fusion_method: FusionMethod = FusionMethod.RECIPROCAL_RANK
):
self.rerankers = rerankers
self.fusion_method = fusion_method
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank using ensemble."""
import asyncio
# Get rankings from all rerankers
tasks = [
rw.reranker.rerank(query, documents)
for rw in self.rerankers
]
all_rankings = await asyncio.gather(*tasks)
# Fuse rankings
if self.fusion_method == FusionMethod.WEIGHTED_SUM:
fused = self._weighted_sum_fusion(all_rankings)
elif self.fusion_method == FusionMethod.RECIPROCAL_RANK:
fused = self._rrf_fusion(all_rankings)
elif self.fusion_method == FusionMethod.BORDA_COUNT:
fused = self._borda_fusion(all_rankings)
else:
# CONDORCET is not implemented here; fall back to reciprocal rank fusion
fused = self._rrf_fusion(all_rankings)
# Sort and rank
fused.sort(key=lambda x: x.rerank_score, reverse=True)
for i, r in enumerate(fused):
r.rank = i + 1
if top_k:
fused = fused[:top_k]
return fused
def _weighted_sum_fusion(
self,
all_rankings: list[list[RankedDocument]]
) -> list[RankedDocument]:
"""Fuse using weighted sum of scores."""
# Normalize scores to [0, 1]
normalized_rankings = []
for ranking in all_rankings:
if not ranking:
normalized_rankings.append([])
continue
max_score = max(r.rerank_score for r in ranking)
min_score = min(r.rerank_score for r in ranking)
score_range = max_score - min_score or 1
normalized = []
for r in ranking:
norm_score = (r.rerank_score - min_score) / score_range
normalized.append(RankedDocument(
document=r.document,
rerank_score=norm_score,
rank=r.rank
))
normalized_rankings.append(normalized)
# Aggregate scores
doc_scores: dict[str, float] = {}
doc_map: dict[str, Document] = {}
for ranking, rw in zip(normalized_rankings, self.rerankers):
for r in ranking:
doc_id = r.document.id
doc_map[doc_id] = r.document
if doc_id not in doc_scores:
doc_scores[doc_id] = 0.0
doc_scores[doc_id] += r.rerank_score * rw.weight
# Create results
results = []
for doc_id, score in doc_scores.items():
results.append(RankedDocument(
document=doc_map[doc_id],
rerank_score=score,
rank=0
))
return results
def _rrf_fusion(
self,
all_rankings: list[list[RankedDocument]],
k: int = 60
) -> list[RankedDocument]:
"""Reciprocal Rank Fusion."""
doc_scores: dict[str, float] = {}
doc_map: dict[str, Document] = {}
for ranking, rw in zip(all_rankings, self.rerankers):
for r in ranking:
doc_id = r.document.id
doc_map[doc_id] = r.document
if doc_id not in doc_scores:
doc_scores[doc_id] = 0.0
# RRF formula: 1 / (k + rank)
doc_scores[doc_id] += rw.weight / (k + r.rank)
results = []
for doc_id, score in doc_scores.items():
results.append(RankedDocument(
document=doc_map[doc_id],
rerank_score=score,
rank=0
))
return results
def _borda_fusion(
self,
all_rankings: list[list[RankedDocument]]
) -> list[RankedDocument]:
"""Borda count fusion."""
doc_scores: dict[str, float] = {}
doc_map: dict[str, Document] = {}
for ranking, rw in zip(all_rankings, self.rerankers):
n = len(ranking)
for r in ranking:
doc_id = r.document.id
doc_map[doc_id] = r.document
if doc_id not in doc_scores:
doc_scores[doc_id] = 0.0
# Borda: n - rank + 1 points
doc_scores[doc_id] += (n - r.rank + 1) * rw.weight
results = []
for doc_id, score in doc_scores.items():
results.append(RankedDocument(
document=doc_map[doc_id],
rerank_score=score,
rank=0
))
return results
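
A worked example shows why Reciprocal Rank Fusion is robust to mismatched score scales: only ranks enter the formula. With k = 60 and two equally weighted rerankers, a document ranked 1st and 3rd narrowly loses to one ranked 2nd and 1st. The ensemble wiring below, pairing two differently sized cross-encoders, is illustrative; the ms-marco-MiniLM-L-12-v2 model name is an assumption.

# RRF by hand, k = 60, two rerankers with weight 1.0
k = 60
d1 = 1 / (k + 1) + 1 / (k + 3)  # ranked 1st and 3rd -> ~0.0323
d2 = 1 / (k + 2) + 1 / (k + 1)  # ranked 2nd and 1st -> ~0.0325 (wins narrowly)

# Wiring the ensemble itself
ensemble = EnsembleReranker(
    rerankers=[
        RerankerWeight(CrossEncoderReranker("cross-encoder/ms-marco-MiniLM-L-6-v2"), 1.0, "minilm-l6"),
        RerankerWeight(CrossEncoderReranker("cross-encoder/ms-marco-MiniLM-L-12-v2"), 1.0, "minilm-l12"),
    ],
    fusion_method=FusionMethod.RECIPROCAL_RANK,
)
# ranked = await ensemble.rerank(query, candidates, top_k=10)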
class CascadeReranker(Reranker):
"""Cascade of rerankers with progressive filtering."""
def __init__(self, stages: list[tuple[Reranker, int]]):
"""
stages: list of (reranker, top_k) tuples
Each stage reranks and keeps top_k for next stage
"""
self.stages = stages
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank through cascade."""
current_docs = documents
for reranker, stage_top_k in self.stages:
ranked = await reranker.rerank(query, current_docs, stage_top_k)
current_docs = [r.document for r in ranked]
# Final ranking
final_ranked = []
for i, doc in enumerate(current_docs):
final_ranked.append(RankedDocument(
document=doc,
rerank_score=len(current_docs) - i,
rank=i + 1
))
if top_k:
final_ranked = final_ranked[:top_k]
return final_ranked
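
A cascade usage sketch: a fast cross-encoder prunes a large pool, then a pointwise LLM stage ranks the survivors. The stage sizes and the `client` object are illustrative assumptions:

cascade = CascadeReranker(stages=[
    (CrossEncoderReranker(), 20),        # stage 1: trim candidates to the top 20
    (LLMPointwiseReranker(client), 5),   # stage 2: keep the top 5 of those
])
# ranked = await cascade.rerank(query, candidates, top_k=5)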
Caching and Optimization
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime, timedelta
import hashlib
import json
@dataclass
class CacheEntry:
"""Cache entry for reranking results."""
query: str
doc_ids: list[str]
ranked_ids: list[str]
scores: list[float]
created_at: datetime
ttl: timedelta
class RerankerCache:
"""Cache for reranking results."""
def __init__(self, ttl_seconds: int = 3600):
self._cache: dict[str, CacheEntry] = {}
self.ttl = timedelta(seconds=ttl_seconds)
def _make_key(self, query: str, doc_ids: list[str]) -> str:
"""Create cache key."""
content = json.dumps({
"query": query,
"doc_ids": sorted(doc_ids)
})
return hashlib.sha256(content.encode()).hexdigest()
def get(
self,
query: str,
doc_ids: list[str]
) -> Optional[list[tuple[str, float]]]:
"""Get cached ranking."""
key = self._make_key(query, doc_ids)
entry = self._cache.get(key)
if not entry:
return None
# Check expiry
if datetime.utcnow() > entry.created_at + entry.ttl:
del self._cache[key]
return None
return list(zip(entry.ranked_ids, entry.scores))
def set(
self,
query: str,
doc_ids: list[str],
ranked: list[RankedDocument]
):
"""Cache ranking."""
key = self._make_key(query, doc_ids)
self._cache[key] = CacheEntry(
query=query,
doc_ids=doc_ids,
ranked_ids=[r.document.id for r in ranked],
scores=[r.rerank_score for r in ranked],
created_at=datetime.utcnow(),
ttl=self.ttl
)
def clear(self):
"""Clear cache."""
self._cache.clear()
class CachedReranker(Reranker):
"""Reranker with caching."""
def __init__(
self,
reranker: Reranker,
cache: Optional[RerankerCache] = None
):
self.reranker = reranker
self.cache = cache or RerankerCache()
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank with caching."""
doc_ids = [d.id for d in documents]
# Check cache
cached = self.cache.get(query, doc_ids)
if cached:
# Reconstruct from cache
id_to_doc = {d.id: d for d in documents}
ranked = []
for i, (doc_id, score) in enumerate(cached):
if doc_id in id_to_doc:
ranked.append(RankedDocument(
document=id_to_doc[doc_id],
rerank_score=score,
rank=i + 1
))
if top_k:
ranked = ranked[:top_k]
return ranked
# Compute the full ranking so the cache holds scores for every document,
# not just a top_k slice that would shortchange later lookups with a larger top_k
ranked = await self.reranker.rerank(query, documents)
# Cache result
self.cache.set(query, doc_ids, ranked)
return ranked[:top_k] if top_k else ranked
class BatchReranker(Reranker):
"""Batch reranking for efficiency."""
def __init__(
self,
reranker: Reranker,
batch_size: int = 32
):
self.reranker = reranker
self.batch_size = batch_size
async def rerank(
self,
query: str,
documents: list[Document],
top_k: Optional[int] = None
) -> list[RankedDocument]:
"""Rerank in batches (assumes scores are comparable across batches, e.g. cross-encoder logits)."""
if len(documents) <= self.batch_size:
return await self.reranker.rerank(query, documents, top_k)
# Process in batches
all_ranked = []
for i in range(0, len(documents), self.batch_size):
batch = documents[i:i + self.batch_size]
ranked = await self.reranker.rerank(query, batch)
all_ranked.extend(ranked)
# Re-sort all results
all_ranked.sort(key=lambda x: x.rerank_score, reverse=True)
for i, r in enumerate(all_ranked):
r.rank = i + 1
if top_k:
all_ranked = all_ranked[:top_k]
return all_ranked
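
The wrappers compose: batching keeps individual cross-encoder calls bounded, and caching skips repeat work for identical query and document sets. A composition sketch, with the batch size and TTL as illustrative values:

base = CrossEncoderReranker()
batched = BatchReranker(base, batch_size=32)
reranker = CachedReranker(batched, RerankerCache(ttl_seconds=600))
# ranked = await reranker.rerank(query, candidates, top_k=10)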
Production Reranking Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from enum import Enum
app = FastAPI()
# Initialize rerankers
cross_encoder = CrossEncoderReranker()
cache = RerankerCache(ttl_seconds=3600)
cached_reranker = CachedReranker(cross_encoder, cache)
class RerankerType(str, Enum):
CROSS_ENCODER = "cross_encoder"
LLM = "llm"
ENSEMBLE = "ensemble"
class DocumentInput(BaseModel):
id: str
content: str
metadata: Optional[dict] = None
initial_score: Optional[float] = 0.0
class RerankRequest(BaseModel):
query: str
documents: list[DocumentInput]
top_k: Optional[int] = None
reranker: RerankerType = RerankerType.CROSS_ENCODER
use_cache: bool = True
class RankedDocumentOutput(BaseModel):
id: str
content: str
rerank_score: float
rank: int
metadata: Optional[dict] = None
class RerankResponse(BaseModel):
query: str
results: list[RankedDocumentOutput]
reranker_used: str
cached: bool
@app.post("/v1/rerank")
async def rerank_documents(request: RerankRequest) -> RerankResponse:
"""Rerank documents for query."""
# Convert to internal format
documents = [
Document(
id=d.id,
content=d.content,
metadata=d.metadata or {},
initial_score=d.initial_score or 0.0
)
for d in request.documents
]
# Select reranker
if request.reranker == RerankerType.CROSS_ENCODER:
reranker = cached_reranker if request.use_cache else cross_encoder
elif request.reranker == RerankerType.LLM:
# No async LLM client is configured in this service; fail honestly rather than
# silently substituting the cross-encoder while reporting "llm" as used
raise HTTPException(status_code=501, detail="LLM reranker not configured")
else:
raise HTTPException(status_code=501, detail="Ensemble reranker not configured")
# Check if cached
cached = False
if request.use_cache:
doc_ids = [d.id for d in documents]
cached_result = cache.get(request.query, doc_ids)
cached = cached_result is not None
# Rerank
ranked = await reranker.rerank(
request.query,
documents,
request.top_k
)
# Format response
results = [
RankedDocumentOutput(
id=r.document.id,
content=r.document.content,
rerank_score=r.rerank_score,
rank=r.rank,
metadata=r.document.metadata
)
for r in ranked
]
return RerankResponse(
query=request.query,
results=results,
reranker_used=request.reranker.value,
cached=cached
)
class CompareRequest(BaseModel):
query: str
document_a: DocumentInput
document_b: DocumentInput
class CompareResponse(BaseModel):
query: str
winner: str
winner_id: str
confidence: float
@app.post("/v1/compare")
async def compare_documents(request: CompareRequest) -> CompareResponse:
"""Compare two documents for relevance."""
doc_a = Document(
id=request.document_a.id,
content=request.document_a.content
)
doc_b = Document(
id=request.document_b.id,
content=request.document_b.content
)
# Rerank both
ranked = await cross_encoder.rerank(
request.query,
[doc_a, doc_b]
)
winner = ranked[0]
loser = ranked[1]
# Confidence heuristic: squash the (unbounded) cross-encoder score gap into [0, 1]
score_diff = winner.rerank_score - loser.rerank_score
confidence = min(1.0, max(0.0, score_diff / 10.0))
return CompareResponse(
query=request.query,
winner="A" if winner.document.id == doc_a.id else "B",
winner_id=winner.document.id,
confidence=confidence
)
class ScoreRequest(BaseModel):
query: str
document: DocumentInput
class ScoreResponse(BaseModel):
query: str
document_id: str
relevance_score: float
@app.post("/v1/score")
async def score_document(request: ScoreRequest) -> ScoreResponse:
"""Score single document relevance."""
doc = Document(
id=request.document.id,
content=request.document.content
)
ranked = await cross_encoder.rerank(request.query, [doc])
return ScoreResponse(
query=request.query,
document_id=doc.id,
relevance_score=ranked[0].rerank_score if ranked else 0.0
)
@app.delete("/v1/cache")
async def clear_cache():
"""Clear reranking cache."""
cache.clear()
return {"status": "cleared"}
@app.get("/v1/cache/stats")
async def cache_stats():
"""Get cache statistics."""
return {
"entries": len(cache._cache),
"ttl_seconds": cache.ttl.total_seconds()
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- Cross-Encoders: https://www.sbert.net/docs/pretrained_cross-encoders.html
- ColBERT: https://github.com/stanford-futuredata/ColBERT
- Cohere Rerank: https://docs.cohere.com/docs/rerank
- Reciprocal Rank Fusion: https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf
Conclusion
Reranking transforms good retrieval into great retrieval. Start with cross-encoders—they process query-document pairs together, capturing semantic relationships that bi-encoders miss during initial retrieval. Choose models based on your latency budget: smaller cross-encoders for real-time applications, larger models or LLM-based rerankers when accuracy matters more than speed.

LLM rerankers offer flexibility through listwise, pointwise, or pairwise approaches, each with different cost-accuracy tradeoffs. Combine multiple rerankers using ensemble methods like Reciprocal Rank Fusion to leverage diverse signals. Build cascades that progressively filter candidates through increasingly expensive rerankers. Cache aggressively—reranking results are often stable for the same query-document combinations.

The key insight is that reranking is a precision tool: initial retrieval optimizes for recall (don't miss relevant documents), while reranking optimizes for precision (put the best documents first). This two-stage architecture lets you balance computational cost with retrieval quality, spending expensive reranking only on the candidates that matter.
