Introduction
Retrieval is the foundation of RAG systems. Poor retrieval means irrelevant context, which leads to hallucinations and wrong answers regardless of how capable your LLM is. Yet many RAG implementations use naive approaches—single-stage vector search with default settings. This guide covers advanced retrieval strategies: query transformation techniques, hybrid search combining dense and sparse methods, multi-stage retrieval pipelines, and reranking approaches that dramatically improve the relevance of retrieved documents.
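To preview how these pieces fit together, here is a rough sketch of the overall pipeline shape; the QueryRewriter, HybridRetriever, and CrossEncoderReranker names refer to classes built later in this guide, and the wiring is illustrative rather than prescriptive.

# Illustrative composition of the components built in the sections below:
# rewrite the query, cast a wide net with hybrid search, then rerank.
async def retrieve(query: str, rewriter, retriever, reranker, top_k: int = 5):
    rewritten = await rewriter.rewrite_for_retrieval(query)
    candidates = await retriever.search(rewritten.transformed, top_k * 4)  # over-retrieve
    return reranker.rerank(query, candidates, top_k)  # rerank against the original query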

Query Transformation
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class TransformedQuery:
"""A transformed query with metadata."""
original: str
transformed: str
method: str
    metadata: Optional[dict] = None
class QueryExpander:
"""Expand queries for better retrieval."""
def __init__(self, client: Any, model: str = "gpt-4o-mini"):
self.client = client
self.model = model
async def expand_with_synonyms(self, query: str) -> TransformedQuery:
"""Expand query with synonyms and related terms."""
prompt = f"""Expand this search query with synonyms and related terms.
Keep the original meaning but add variations that might match relevant documents.
Query: {query}
Return the expanded query (original + expansions) on a single line."""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=200
)
expanded = response.choices[0].message.content.strip()
return TransformedQuery(
original=query,
transformed=expanded,
method="synonym_expansion"
)
async def generate_subqueries(self, query: str) -> list[TransformedQuery]:
"""Break complex query into subqueries."""
prompt = f"""Break this complex query into simpler subqueries.
Each subquery should capture one aspect of the original question.
Query: {query}
Return each subquery on a separate line (2-4 subqueries)."""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=300
)
        subqueries = []
        for line in response.choices[0].message.content.strip().split('\n'):
            # Strip bullet/number prefixes so "- foo" and "1. foo" both become "foo"
            line = line.strip().lstrip('0123456789.-) ')
            if line:
                subqueries.append(TransformedQuery(
                    original=query,
                    transformed=line,
                    method="subquery"
                ))
        return subqueries
async def hypothetical_document(self, query: str) -> TransformedQuery:
"""Generate hypothetical document that would answer the query (HyDE)."""
prompt = f"""Write a short paragraph that would be the perfect answer to this question.
Write as if you're the document that contains the answer.
Question: {query}
Answer paragraph:"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=300
)
hypothetical = response.choices[0].message.content.strip()
return TransformedQuery(
original=query,
transformed=hypothetical,
method="hyde"
)
class QueryRewriter:
"""Rewrite queries for better matching."""
def __init__(self, client: Any, model: str = "gpt-4o-mini"):
self.client = client
self.model = model
async def rewrite_for_retrieval(self, query: str) -> TransformedQuery:
"""Rewrite query to be more retrieval-friendly."""
prompt = f"""Rewrite this query to be better for document retrieval.
- Remove conversational elements
- Focus on key concepts and entities
- Use terms likely to appear in documents
Original: {query}
Rewritten query:"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=100
)
rewritten = response.choices[0].message.content.strip()
return TransformedQuery(
original=query,
transformed=rewritten,
method="retrieval_rewrite"
)
async def step_back(self, query: str) -> TransformedQuery:
"""Generate a more general 'step-back' query."""
prompt = f"""Generate a more general question that would help answer this specific question.
The general question should retrieve background information useful for the specific question.
Specific question: {query}
General question:"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=100
)
general = response.choices[0].message.content.strip()
return TransformedQuery(
original=query,
transformed=general,
method="step_back"
)
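A minimal usage sketch for the query transformers, assuming the openai package's AsyncOpenAI client (any object exposing chat.completions.create with the same shape works); the example question is made up:

import asyncio
from openai import AsyncOpenAI  # assumed client; reads OPENAI_API_KEY from the environment

async def demo_transformations():
    client = AsyncOpenAI()
    expander = QueryExpander(client)
    rewriter = QueryRewriter(client)
    question = "Why did our checkout latency spike after the Redis upgrade?"
    expanded, subqueries, hyde, general = await asyncio.gather(
        expander.expand_with_synonyms(question),
        expander.generate_subqueries(question),
        expander.hypothetical_document(question),
        rewriter.step_back(question),
    )
    for t in [expanded, hyde, general, *subqueries]:
        print(f"[{t.method}] {t.transformed}")

# asyncio.run(demo_transformations())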
Hybrid Search
from dataclasses import dataclass
from typing import Any, Optional
import numpy as np
from collections import Counter
import math
@dataclass
class SearchResult:
"""A search result with score."""
doc_id: str
content: str
score: float
    metadata: Optional[dict] = None
class DenseRetriever:
"""Dense vector retrieval."""
def __init__(
self,
embedding_client: Any,
index: Any, # FAISS, Pinecone, etc.
embedding_model: str = "text-embedding-3-small"
):
self.embedding_client = embedding_client
self.index = index
self.embedding_model = embedding_model
async def search(
self,
query: str,
top_k: int = 10
) -> list[SearchResult]:
"""Search using dense embeddings."""
# Get query embedding
response = await self.embedding_client.embeddings.create(
model=self.embedding_model,
input=query
)
query_embedding = response.data[0].embedding
# Search index
results = self.index.search(query_embedding, top_k)
return [
SearchResult(
doc_id=r["id"],
content=r["content"],
score=r["score"],
metadata=r.get("metadata", {})
)
for r in results
]
class SparseRetriever:
"""Sparse keyword retrieval using BM25."""
def __init__(self, documents: list[dict]):
self.documents = documents
self.doc_freqs = Counter()
self.doc_lengths = []
self.avg_doc_length = 0
self.k1 = 1.5
self.b = 0.75
self._build_index()
def _build_index(self):
"""Build BM25 index."""
total_length = 0
for doc in self.documents:
tokens = self._tokenize(doc["content"])
self.doc_lengths.append(len(tokens))
total_length += len(tokens)
unique_tokens = set(tokens)
for token in unique_tokens:
self.doc_freqs[token] += 1
self.avg_doc_length = total_length / len(self.documents) if self.documents else 0
def _tokenize(self, text: str) -> list[str]:
"""Simple tokenization."""
return text.lower().split()
def _bm25_score(
self,
query_tokens: list[str],
doc_tokens: list[str],
doc_length: int
) -> float:
"""Calculate BM25 score."""
score = 0.0
doc_token_counts = Counter(doc_tokens)
n_docs = len(self.documents)
for token in query_tokens:
if token not in doc_token_counts:
continue
tf = doc_token_counts[token]
df = self.doc_freqs.get(token, 0)
idf = math.log((n_docs - df + 0.5) / (df + 0.5) + 1)
tf_component = (tf * (self.k1 + 1)) / (
tf + self.k1 * (1 - self.b + self.b * doc_length / self.avg_doc_length)
)
score += idf * tf_component
return score
def search(self, query: str, top_k: int = 10) -> list[SearchResult]:
"""Search using BM25."""
query_tokens = self._tokenize(query)
scores = []
for i, doc in enumerate(self.documents):
doc_tokens = self._tokenize(doc["content"])
score = self._bm25_score(query_tokens, doc_tokens, self.doc_lengths[i])
scores.append((i, score))
scores.sort(key=lambda x: x[1], reverse=True)
results = []
for i, score in scores[:top_k]:
doc = self.documents[i]
results.append(SearchResult(
doc_id=doc.get("id", str(i)),
content=doc["content"],
score=score,
metadata=doc.get("metadata", {})
))
return results
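The BM25 retriever is fully self-contained, so you can exercise it directly on a toy corpus (documents and query below are made up for illustration):

docs = [
    {"id": "d1", "content": "PostgreSQL connection pooling with pgbouncer"},
    {"id": "d2", "content": "Tuning BM25 parameters k1 and b for short documents"},
    {"id": "d3", "content": "Vector databases and approximate nearest neighbor search"},
]
sparse = SparseRetriever(docs)
for hit in sparse.search("bm25 tuning", top_k=2):
    print(hit.doc_id, round(hit.score, 3), hit.content)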
class HybridRetriever:
"""Combine dense and sparse retrieval."""
def __init__(
self,
dense_retriever: DenseRetriever,
sparse_retriever: SparseRetriever,
dense_weight: float = 0.5
):
self.dense_retriever = dense_retriever
self.sparse_retriever = sparse_retriever
self.dense_weight = dense_weight
self.sparse_weight = 1 - dense_weight
async def search(
self,
query: str,
top_k: int = 10
) -> list[SearchResult]:
"""Hybrid search with score fusion."""
# Get results from both retrievers
dense_results = await self.dense_retriever.search(query, top_k * 2)
sparse_results = self.sparse_retriever.search(query, top_k * 2)
# Normalize scores
dense_results = self._normalize_scores(dense_results)
sparse_results = self._normalize_scores(sparse_results)
# Combine using weighted sum
combined = {}
for result in dense_results:
combined[result.doc_id] = {
"content": result.content,
"metadata": result.metadata,
"dense_score": result.score * self.dense_weight,
"sparse_score": 0
}
for result in sparse_results:
if result.doc_id in combined:
combined[result.doc_id]["sparse_score"] = result.score * self.sparse_weight
else:
combined[result.doc_id] = {
"content": result.content,
"metadata": result.metadata,
"dense_score": 0,
"sparse_score": result.score * self.sparse_weight
}
# Calculate final scores
final_results = []
for doc_id, data in combined.items():
final_score = data["dense_score"] + data["sparse_score"]
final_results.append(SearchResult(
doc_id=doc_id,
content=data["content"],
score=final_score,
metadata=data["metadata"]
))
# Sort and return top_k
final_results.sort(key=lambda x: x.score, reverse=True)
return final_results[:top_k]
def _normalize_scores(self, results: list[SearchResult]) -> list[SearchResult]:
"""Normalize scores to 0-1 range."""
if not results:
return results
scores = [r.score for r in results]
min_score = min(scores)
max_score = max(scores)
if max_score == min_score:
return results
for result in results:
result.score = (result.score - min_score) / (max_score - min_score)
return results
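Wiring the hybrid retriever requires a dense index object with a search(query_embedding, top_k) method. The in-memory stub below (brute-force cosine similarity over precomputed embeddings) is only a stand-in for FAISS, Pinecone, or similar, and the embed_corpus helper in the commented usage is hypothetical:

import numpy as np

class InMemoryIndex:
    """Toy dense index: brute-force cosine similarity, a stand-in for FAISS/Pinecone."""
    def __init__(self, docs: list[dict], embeddings: list[list[float]]):
        self.docs = docs
        self.matrix = np.array(embeddings)
        self.matrix /= np.linalg.norm(self.matrix, axis=1, keepdims=True)
    def search(self, query_embedding, top_k: int):
        q = np.array(query_embedding)
        q /= np.linalg.norm(q)
        sims = self.matrix @ q
        return [
            {"id": self.docs[i].get("id", str(i)), "content": self.docs[i]["content"],
             "score": float(sims[i]), "metadata": self.docs[i].get("metadata", {})}
            for i in np.argsort(-sims)[:top_k]
        ]

# dense = DenseRetriever(embedding_client, InMemoryIndex(docs, embed_corpus(docs)))  # embed_corpus is hypothetical
# hybrid = HybridRetriever(dense, SparseRetriever(docs), dense_weight=0.6)
# results = await hybrid.search("bm25 tuning", top_k=5)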
Multi-Stage Retrieval
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Optional
@dataclass
class RetrievalStage:
"""A stage in the retrieval pipeline."""
name: str
retriever: Any
top_k: int
    filter_fn: Optional[Callable[[SearchResult], bool]] = None
class MultiStageRetriever:
"""Multi-stage retrieval pipeline."""
def __init__(self):
self.stages: list[RetrievalStage] = []
def add_stage(
self,
name: str,
retriever: Any,
top_k: int,
        filter_fn: Optional[Callable[[SearchResult], bool]] = None
):
"""Add a retrieval stage."""
self.stages.append(RetrievalStage(
name=name,
retriever=retriever,
top_k=top_k,
filter_fn=filter_fn
))
    async def search(self, query: str) -> list[SearchResult]:
        """Execute multi-stage retrieval."""
        results = None
        for stage in self.stages:
            if results is None:
                # First stage: retrieve from the full corpus
                if hasattr(stage.retriever, 'search'):
                    if asyncio.iscoroutinefunction(stage.retriever.search):
                        results = await stage.retriever.search(query, stage.top_k)
                    else:
                        results = stage.retriever.search(query, stage.top_k)
            else:
                # Subsequent stages: rerank or truncate the candidate set
                if hasattr(stage.retriever, 'rerank'):
                    if asyncio.iscoroutinefunction(stage.retriever.rerank):
                        results = await stage.retriever.rerank(query, results, stage.top_k)
                    else:
                        results = stage.retriever.rerank(query, results, stage.top_k)
                else:
                    results = results[:stage.top_k]
            # Apply filter if provided
            if stage.filter_fn and results:
                results = [r for r in results if stage.filter_fn(r)]
        return results or []
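Assembling a pipeline then looks roughly like this sketch, assuming a HybridRetriever instance like the one above and the CrossEncoderReranker defined in the Reranking section below; the "source" metadata filter is purely illustrative:

pipeline = MultiStageRetriever()
pipeline.add_stage("recall", hybrid_retriever, top_k=50)            # wide first pass
pipeline.add_stage("precision", CrossEncoderReranker(), top_k=10)   # rerank the candidates
pipeline.add_stage(
    "filter",
    retriever=None,
    top_k=5,
    filter_fn=lambda r: (r.metadata or {}).get("source") != "archive",
)
# results = await pipeline.search("how do we rotate database credentials?")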
class ParentDocumentRetriever:
    """Retrieve child chunks, return parent documents."""
    def __init__(
        self,
        chunk_retriever: Any,
        parent_store: dict[str, str],  # chunk_id -> parent_id
        parent_documents: Optional[dict[str, str]] = None  # parent_id -> full document text
    ):
        self.chunk_retriever = chunk_retriever
        self.parent_store = parent_store
        self.parent_documents: dict[str, str] = parent_documents or {}
async def search(
self,
query: str,
top_k: int = 5
) -> list[SearchResult]:
"""Search chunks, return parent documents."""
# Retrieve chunks
chunk_results = await self.chunk_retriever.search(query, top_k * 3)
# Map to parents and deduplicate
parent_scores: dict[str, float] = {}
for result in chunk_results:
parent_id = self.parent_store.get(result.doc_id)
if parent_id:
if parent_id not in parent_scores:
parent_scores[parent_id] = result.score
else:
# Take max score among chunks
parent_scores[parent_id] = max(parent_scores[parent_id], result.score)
# Build parent results
results = []
for parent_id, score in sorted(parent_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]:
if parent_id in self.parent_documents:
results.append(SearchResult(
doc_id=parent_id,
content=self.parent_documents[parent_id],
score=score
))
return results
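The two mappings the parent-document retriever expects look roughly like this (identifiers and text are illustrative, and chunk_retriever is assumed to be any retriever returning chunk-level SearchResult objects):

parent_store = {          # chunk_id -> parent_id
    "doc_1_chunk_0": "doc_1",
    "doc_1_chunk_1": "doc_1",
    "doc_2_chunk_0": "doc_2",
}
parent_documents = {      # parent_id -> full document text
    "doc_1": "Full text of the onboarding guide ...",
    "doc_2": "Full text of the incident runbook ...",
}
parent_retriever = ParentDocumentRetriever(chunk_retriever, parent_store, parent_documents)
# results = await parent_retriever.search("how do I get VPN access?", top_k=2)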
class ContextualRetriever:
"""Add surrounding context to retrieved chunks."""
def __init__(
self,
base_retriever: Any,
document_store: dict[str, list[str]], # doc_id -> list of chunks
context_window: int = 1 # chunks before and after
):
self.base_retriever = base_retriever
self.document_store = document_store
self.context_window = context_window
async def search(
self,
query: str,
top_k: int = 5
) -> list[SearchResult]:
"""Search and expand with context."""
results = await self.base_retriever.search(query, top_k)
expanded_results = []
for result in results:
expanded_content = self._expand_context(result)
expanded_results.append(SearchResult(
doc_id=result.doc_id,
content=expanded_content,
score=result.score,
metadata=result.metadata
))
return expanded_results
def _expand_context(self, result: SearchResult) -> str:
"""Expand chunk with surrounding context."""
# Parse doc_id to get document and chunk index
# Assuming format: "doc_123_chunk_5"
parts = result.doc_id.split("_chunk_")
        if len(parts) != 2 or not parts[1].isdigit():
return result.content
doc_id = parts[0]
chunk_idx = int(parts[1])
if doc_id not in self.document_store:
return result.content
chunks = self.document_store[doc_id]
# Get context window
start = max(0, chunk_idx - self.context_window)
end = min(len(chunks), chunk_idx + self.context_window + 1)
return "\n\n".join(chunks[start:end])
Reranking
from dataclasses import dataclass
from typing import Any
class CrossEncoderReranker:
"""Rerank using cross-encoder model."""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(
self,
query: str,
results: list[SearchResult],
top_k: int = 5
) -> list[SearchResult]:
"""Rerank results using cross-encoder."""
# Create query-document pairs
pairs = [(query, r.content) for r in results]
# Score pairs
scores = self.model.predict(pairs)
# Update scores and sort
for i, result in enumerate(results):
result.score = float(scores[i])
results.sort(key=lambda x: x.score, reverse=True)
return results[:top_k]
class CohereReranker:
"""Rerank using Cohere Rerank API."""
def __init__(self, api_key: str):
import cohere
self.client = cohere.Client(api_key)
def rerank(
self,
query: str,
results: list[SearchResult],
top_k: int = 5
) -> list[SearchResult]:
"""Rerank using Cohere."""
documents = [r.content for r in results]
response = self.client.rerank(
query=query,
documents=documents,
top_n=top_k,
model="rerank-english-v2.0"
)
reranked = []
for item in response.results:
original = results[item.index]
reranked.append(SearchResult(
doc_id=original.doc_id,
content=original.content,
score=item.relevance_score,
metadata=original.metadata
))
return reranked
class LLMReranker:
"""Rerank using LLM scoring."""
def __init__(self, client: Any, model: str = "gpt-4o-mini"):
self.client = client
self.model = model
async def rerank(
self,
query: str,
results: list[SearchResult],
top_k: int = 5
) -> list[SearchResult]:
"""Rerank using LLM relevance scoring."""
scored_results = []
for result in results:
score = await self._score_relevance(query, result.content)
scored_results.append(SearchResult(
doc_id=result.doc_id,
content=result.content,
score=score,
metadata=result.metadata
))
scored_results.sort(key=lambda x: x.score, reverse=True)
return scored_results[:top_k]
async def _score_relevance(self, query: str, document: str) -> float:
"""Score document relevance to query."""
prompt = f"""Rate how relevant this document is to the query on a scale of 0-10.
Only respond with a number.
Query: {query}
Document: {document[:1000]}
Relevance score (0-10):"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=5
)
try:
score = float(response.choices[0].message.content.strip())
return score / 10 # Normalize to 0-1
except ValueError:
return 0.5
class RRFReranker:
"""Reciprocal Rank Fusion for combining multiple result lists."""
def __init__(self, k: int = 60):
self.k = k
def fuse(
self,
result_lists: list[list[SearchResult]],
top_k: int = 10
) -> list[SearchResult]:
"""Fuse multiple result lists using RRF."""
doc_scores: dict[str, float] = {}
doc_data: dict[str, SearchResult] = {}
for results in result_lists:
for rank, result in enumerate(results):
rrf_score = 1 / (self.k + rank + 1)
if result.doc_id not in doc_scores:
doc_scores[result.doc_id] = 0
doc_data[result.doc_id] = result
doc_scores[result.doc_id] += rrf_score
# Sort by fused score
sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
results = []
for doc_id, score in sorted_docs[:top_k]:
original = doc_data[doc_id]
results.append(SearchResult(
doc_id=doc_id,
content=original.content,
score=score,
metadata=original.metadata
))
return results
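Because RRF looks only at ranks, it can fuse lists whose raw scores live on completely different scales (cosine similarity vs. BM25); for example:

dense_results = [
    SearchResult("d2", "Tuning BM25 parameters k1 and b", 0.91),
    SearchResult("d3", "Vector databases and ANN search", 0.87),
]
sparse_results = [
    SearchResult("d3", "Vector databases and ANN search", 12.4),
    SearchResult("d1", "PostgreSQL connection pooling", 7.8),
]
fused = RRFReranker(k=60).fuse([dense_results, sparse_results], top_k=3)
for r in fused:
    print(r.doc_id, round(r.score, 4))  # d3 ranks first: it appears in both lists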
Production Retrieval Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
# Initialize components
query_expander = None # Initialize with actual client
hybrid_retriever = None
multi_stage_retriever = None
reranker = None
class SearchRequest(BaseModel):
query: str
top_k: int = 10
use_expansion: bool = False
use_reranking: bool = True
class HybridSearchRequest(BaseModel):
query: str
top_k: int = 10
dense_weight: float = 0.5
class MultiQueryRequest(BaseModel):
query: str
top_k: int = 10
strategy: str = "subqueries" # subqueries, hyde, expansion
@app.post("/v1/search")
async def search(request: SearchRequest):
"""Basic search with optional expansion and reranking."""
query = request.query
# Query expansion
if request.use_expansion and query_expander:
expanded = await query_expander.expand_with_synonyms(query)
query = expanded.transformed
# Search
results = await hybrid_retriever.search(query, request.top_k * 2)
    # Reranking (assumes an async reranker such as LLMReranker)
if request.use_reranking and reranker:
results = await reranker.rerank(request.query, results, request.top_k)
else:
results = results[:request.top_k]
return {
"query": request.query,
"results": [
{
"doc_id": r.doc_id,
"content": r.content[:500],
"score": r.score
}
for r in results
]
}
@app.post("/v1/search/hybrid")
async def hybrid_search(request: HybridSearchRequest):
"""Hybrid dense + sparse search."""
    # Temporarily adjust weights (note: mutating shared retriever state is not safe under concurrent requests)
original_weight = hybrid_retriever.dense_weight
hybrid_retriever.dense_weight = request.dense_weight
hybrid_retriever.sparse_weight = 1 - request.dense_weight
try:
results = await hybrid_retriever.search(request.query, request.top_k)
finally:
hybrid_retriever.dense_weight = original_weight
hybrid_retriever.sparse_weight = 1 - original_weight
return {
"query": request.query,
"dense_weight": request.dense_weight,
"results": [
{
"doc_id": r.doc_id,
"content": r.content[:500],
"score": r.score
}
for r in results
]
}
@app.post("/v1/search/multi-query")
async def multi_query_search(request: MultiQueryRequest):
"""Search with multiple query variations."""
queries = [request.query]
if request.strategy == "subqueries":
subqueries = await query_expander.generate_subqueries(request.query)
queries.extend([sq.transformed for sq in subqueries])
elif request.strategy == "hyde":
hyde = await query_expander.hypothetical_document(request.query)
queries.append(hyde.transformed)
elif request.strategy == "expansion":
expanded = await query_expander.expand_with_synonyms(request.query)
queries.append(expanded.transformed)
# Search with each query
all_results = []
for q in queries:
results = await hybrid_retriever.search(q, request.top_k)
all_results.append(results)
# Fuse results using RRF
rrf = RRFReranker()
fused = rrf.fuse(all_results, request.top_k)
return {
"original_query": request.query,
"queries_used": queries,
"results": [
{
"doc_id": r.doc_id,
"content": r.content[:500],
"score": r.score
}
for r in fused
]
}
@app.post("/v1/search/multi-stage")
async def multi_stage_search(request: SearchRequest):
"""Multi-stage retrieval pipeline."""
results = await multi_stage_retriever.search(request.query)
return {
"query": request.query,
"results": [
{
"doc_id": r.doc_id,
"content": r.content[:500],
"score": r.score
}
for r in results[:request.top_k]
]
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- LlamaIndex Retrieval: https://docs.llamaindex.ai/en/stable/module_guides/querying/retriever/
- LangChain Retrievers: https://python.langchain.com/docs/modules/data_connection/retrievers/
- Cohere Rerank: https://docs.cohere.com/docs/reranking
- HyDE Paper: https://arxiv.org/abs/2212.10496
Conclusion
Advanced retrieval strategies dramatically improve RAG system quality. Query transformation—expansion, subqueries, HyDE—helps bridge the vocabulary gap between user queries and document content. Hybrid search combining dense embeddings with sparse BM25 captures both semantic similarity and keyword matches. Multi-stage pipelines let you cast a wide net initially, then progressively filter and refine. Reranking with cross-encoders or LLMs provides a final quality boost by considering query-document pairs jointly. Parent document retrieval and contextual expansion ensure you return enough context for the LLM to generate accurate answers. Reciprocal Rank Fusion elegantly combines results from multiple retrieval strategies. The key is building pipelines that are robust to query variations while maintaining low latency. Start simple, measure retrieval quality, and add complexity only where it demonstrably improves results.