Introduction: Retrieval Augmented Generation (RAG) grounds LLM responses in your actual data, reducing hallucinations and enabling knowledge that wasn't in the training set. But naive RAG—embed documents, retrieve top-k, stuff into prompt—often disappoints. Retrieval misses relevant documents, context windows overflow, and the model ignores important information buried in long contexts. This guide covers advanced RAG patterns that actually work: query transformation for better retrieval, hybrid search combining dense and sparse methods, reranking to surface the best results, context compression to fit more information within the token budget, and a production service that ties the pipeline together.
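As a point of reference, here is a minimal sketch of that naive pipeline. It assumes an OpenAI-style async client and a vector store exposing an async `search(vector=..., top_k=...)` method; both names are stand-ins for whatever stack you actually use.

```python
# Naive RAG baseline: embed the query, take the top-k chunks, stuff them into the prompt.
# `openai_client` and `vector_store` are assumed stand-ins for your own clients.
async def naive_rag(query: str, openai_client, vector_store, top_k: int = 5) -> str:
    emb = await openai_client.embeddings.create(
        model="text-embedding-3-small", input=query
    )
    hits = await vector_store.search(vector=emb.data[0].embedding, top_k=top_k)
    context = "\n\n".join(h["content"] for h in hits)
    prompt = f"Answer using only this context:\n{context}\n\nQuestion: {query}"
    response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )
    return response.choices[0].message.content
```

The rest of this guide improves each stage of this loop.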

Query Transformation
from dataclasses import dataclass
from typing import Any, Optional
import re


@dataclass
class TransformedQuery:
    """A transformed query for retrieval."""
    original: str
    transformed: str
    sub_queries: Optional[list[str]] = None
    hypothetical_answer: Optional[str] = None


class QueryTransformer:
    """Transform queries for better retrieval."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model

    async def expand_query(self, query: str) -> TransformedQuery:
        """Expand query with related terms and concepts."""
        prompt = f"""Expand this search query by adding related terms, synonyms, and concepts that would help find relevant documents.
Original query: {query}
Provide an expanded version that includes:
- Synonyms for key terms
- Related concepts
- Alternative phrasings
Expanded query:"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return TransformedQuery(
            original=query,
            transformed=response.choices[0].message.content.strip()
        )

    async def decompose_query(self, query: str) -> TransformedQuery:
        """Decompose complex query into sub-queries."""
        prompt = f"""Break down this complex question into simpler sub-questions that can be answered independently.
Question: {query}
List 2-4 sub-questions that together would answer the main question.
Format as a numbered list."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        # Parse the numbered list into individual sub-queries
        text = response.choices[0].message.content
        sub_queries = re.findall(r'\d+\.\s*(.+?)(?=\d+\.|$)', text, re.DOTALL)
        sub_queries = [q.strip() for q in sub_queries if q.strip()]
        return TransformedQuery(
            original=query,
            transformed=query,
            sub_queries=sub_queries
        )

    async def generate_hypothetical_answer(self, query: str) -> TransformedQuery:
        """Generate hypothetical answer for HyDE retrieval."""
        prompt = f"""Generate a hypothetical answer to this question. The answer should be detailed and contain the kind of information that would be in a document answering this question.
Question: {query}
Hypothetical answer:"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        return TransformedQuery(
            original=query,
            transformed=query,
            hypothetical_answer=response.choices[0].message.content.strip()
        )

    async def rewrite_for_retrieval(self, query: str) -> TransformedQuery:
        """Rewrite conversational query for retrieval."""
        prompt = f"""Rewrite this conversational query as a clear, specific search query optimized for document retrieval.
Conversational query: {query}
Search-optimized query:"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return TransformedQuery(
            original=query,
            transformed=response.choices[0].message.content.strip()
        )


class MultiQueryRetriever:
    """Retrieve using multiple query variations."""

    def __init__(
        self,
        transformer: QueryTransformer,
        retriever: Any,
        num_variations: int = 3
    ):
        self.transformer = transformer
        self.retriever = retriever
        self.num_variations = num_variations

    async def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
        """Retrieve using multiple query variations."""
        # Generate query variations
        variations = [query]

        # Add expanded query
        expanded = await self.transformer.expand_query(query)
        variations.append(expanded.transformed)

        # Add hypothetical answer for HyDE
        hyde = await self.transformer.generate_hypothetical_answer(query)
        if hyde.hypothetical_answer:
            variations.append(hyde.hypothetical_answer)

        # Retrieve for each variation, deduplicating by document id
        all_results = []
        seen_ids = set()
        for variation in variations[:self.num_variations]:
            results = await self.retriever.search(variation, top_k=top_k)
            for result in results:
                if result["id"] not in seen_ids:
                    all_results.append(result)
                    seen_ids.add(result["id"])

        return all_results[:top_k * 2]  # Return more for reranking
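A quick usage sketch for the transformer and the multi-query retriever. The in-memory retriever below is a hypothetical stand-in for a real vector store (such as the HybridSearcher in the next section), and the example assumes the official `openai` async client with an API key configured.

```python
import asyncio
from openai import AsyncOpenAI


class InMemoryRetriever:
    """Tiny stand-in retriever: naive word-overlap match over a list of docs."""
    def __init__(self, docs: list[dict]):
        self.docs = docs

    async def search(self, query: str, top_k: int = 10) -> list[dict]:
        words = query.lower().split()
        hits = [d for d in self.docs if any(w in d["content"].lower() for w in words)]
        return hits[:top_k]


async def main():
    client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set
    transformer = QueryTransformer(client)

    # Decompose a multi-hop question into independently answerable parts
    decomposed = await transformer.decompose_query(
        "How did the 2023 pricing change affect enterprise churn compared to SMB churn?"
    )
    print(decomposed.sub_queries)

    # Multi-query retrieval over any object exposing `async def search(query, top_k)`
    docs = [{"id": "1", "content": "Enterprise churn rose after the 2023 pricing change."}]
    retriever = MultiQueryRetriever(transformer, InMemoryRetriever(docs))
    results = await retriever.retrieve("Why is enterprise churn rising?", top_k=5)
    print([r["id"] for r in results])


asyncio.run(main())
```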
Hybrid Search
from dataclasses import dataclass
from typing import Any, Optional
import math
import re


@dataclass
class SearchResult:
    """A search result with scores."""
    id: str
    content: str
    metadata: dict
    dense_score: float = 0.0
    sparse_score: float = 0.0
    combined_score: float = 0.0


class HybridSearcher:
    """Combine dense and sparse search."""

    def __init__(
        self,
        embedding_client: Any,
        vector_store: Any,
        sparse_index: Any = None,
        alpha: float = 0.5
    ):
        self.embedding_client = embedding_client
        self.vector_store = vector_store
        self.sparse_index = sparse_index
        self.alpha = alpha  # Weight for dense vs sparse

    async def search(
        self,
        query: str,
        top_k: int = 10,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Perform hybrid search."""
        # Dense search
        dense_results = await self._dense_search(query, top_k * 2, filter)

        # Sparse search (BM25)
        sparse_results = await self._sparse_search(query, top_k * 2, filter)

        # Combine results
        combined = self._combine_results(dense_results, sparse_results)
        return combined[:top_k]

    async def _dense_search(
        self,
        query: str,
        top_k: int,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Perform dense vector search."""
        # Get query embedding
        response = await self.embedding_client.embeddings.create(
            model="text-embedding-3-small",
            input=query
        )
        query_embedding = response.data[0].embedding

        # Search vector store
        results = await self.vector_store.search(
            vector=query_embedding,
            top_k=top_k,
            filter=filter
        )
        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                metadata=r.get("metadata", {}),
                dense_score=r["score"]
            )
            for r in results
        ]

    async def _sparse_search(
        self,
        query: str,
        top_k: int,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Perform sparse BM25 search."""
        if not self.sparse_index:
            return []
        results = self.sparse_index.search(query, top_k=top_k)
        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                metadata=r.get("metadata", {}),
                sparse_score=r["score"]
            )
            for r in results
        ]

    def _combine_results(
        self,
        dense_results: list[SearchResult],
        sparse_results: list[SearchResult]
    ) -> list[SearchResult]:
        """Combine and rerank results using Reciprocal Rank Fusion (RRF)."""
        k = 60  # RRF constant
        scores = {}
        results_map = {}

        # Score dense results
        for rank, result in enumerate(dense_results):
            rrf_score = 1 / (k + rank + 1)
            scores[result.id] = scores.get(result.id, 0) + self.alpha * rrf_score
            results_map[result.id] = result

        # Score sparse results
        for rank, result in enumerate(sparse_results):
            rrf_score = 1 / (k + rank + 1)
            scores[result.id] = scores.get(result.id, 0) + (1 - self.alpha) * rrf_score
            if result.id not in results_map:
                results_map[result.id] = result

        # Sort by combined score
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        combined = []
        for doc_id in sorted_ids:
            result = results_map[doc_id]
            result.combined_score = scores[doc_id]
            combined.append(result)
        return combined


class BM25Index:
    """Simple BM25 index for sparse search."""

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.documents = []
        self.doc_lengths = []
        self.avg_doc_length = 0
        self.term_frequencies = {}
        self.doc_frequencies = {}
        self.idf = {}

    def add_documents(self, documents: list[dict]) -> None:
        """Add documents to the index."""
        for doc in documents:
            self.documents.append(doc)

            # Tokenize
            tokens = re.findall(r'\w+', doc["content"].lower())
            self.doc_lengths.append(len(tokens))

            # Count term frequencies
            tf = {}
            for token in tokens:
                tf[token] = tf.get(token, 0) + 1
            self.term_frequencies[doc["id"]] = tf

            # Update document frequencies
            for token in set(tokens):
                self.doc_frequencies[token] = self.doc_frequencies.get(token, 0) + 1

        # Calculate average document length
        self.avg_doc_length = sum(self.doc_lengths) / len(self.doc_lengths) if self.doc_lengths else 0

        # Calculate IDF
        n = len(self.documents)
        for term, df in self.doc_frequencies.items():
            self.idf[term] = math.log((n - df + 0.5) / (df + 0.5) + 1)

    def search(self, query: str, top_k: int = 10) -> list[dict]:
        """Search the index."""
        if not self.documents:
            return []
        query_tokens = re.findall(r'\w+', query.lower())
        scores = []
        for i, doc in enumerate(self.documents):
            score = 0
            tf = self.term_frequencies[doc["id"]]
            doc_length = self.doc_lengths[i]
            for token in query_tokens:
                if token in tf:
                    freq = tf[token]
                    idf = self.idf.get(token, 0)
                    # BM25 scoring formula
                    numerator = freq * (self.k1 + 1)
                    denominator = freq + self.k1 * (1 - self.b + self.b * doc_length / self.avg_doc_length)
                    score += idf * numerator / denominator
            scores.append((doc, score))

        # Sort by score
        scores.sort(key=lambda x: x[1], reverse=True)
        return [
            {"id": doc["id"], "content": doc["content"], "metadata": doc.get("metadata", {}), "score": score}
            for doc, score in scores[:top_k]
        ]
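Wiring the two halves together might look like the sketch below. The `FakeVectorStore` is a hypothetical stand-in that satisfies the `search(vector=..., top_k=..., filter=...)` interface HybridSearcher expects; in practice it would wrap Qdrant, pgvector, Pinecone, or similar, and the embedding calls assume an OpenAI API key.

```python
import asyncio
from openai import AsyncOpenAI


class FakeVectorStore:
    """Hypothetical stand-in for a real vector store client."""
    def __init__(self, docs: list[dict]):
        self.docs = docs

    async def search(self, vector, top_k, filter=None):
        # Return every doc with a dummy score; a real store would do ANN search here.
        return [{**d, "score": 1.0} for d in self.docs[:top_k]]


async def main():
    docs = [
        {"id": "a", "content": "Reciprocal rank fusion combines ranked lists.", "metadata": {}},
        {"id": "b", "content": "BM25 is a sparse lexical scoring function.", "metadata": {}},
    ]

    # Sparse side: build the in-process BM25 index
    bm25 = BM25Index()
    bm25.add_documents(docs)

    # Dense side plus fusion
    searcher = HybridSearcher(
        embedding_client=AsyncOpenAI(),
        vector_store=FakeVectorStore(docs),
        sparse_index=bm25,
        alpha=0.5,  # equal weight for the dense and sparse rankings
    )
    results = await searcher.search("what is rank fusion?", top_k=5)
    for r in results:
        print(r.id, round(r.combined_score, 4))


asyncio.run(main())
```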
Reranking
from dataclasses import dataclass
from typing import Any, Optional

# SearchResult is the dataclass defined in the hybrid search section above.


@dataclass
class RerankResult:
    """A reranked result."""
    id: str
    content: str
    original_rank: int
    new_rank: int
    relevance_score: float


class CrossEncoderReranker:
    """Rerank using cross-encoder model."""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: Optional[int] = None
    ) -> list[RerankResult]:
        """Rerank results using cross-encoder."""
        # Create query-document pairs
        pairs = [(query, r.content) for r in results]

        # Score pairs
        scores = self.model.predict(pairs)

        # Create reranked results
        reranked = []
        for i, (result, score) in enumerate(zip(results, scores)):
            reranked.append(RerankResult(
                id=result.id,
                content=result.content,
                original_rank=i,
                new_rank=0,  # Will be set after sorting
                relevance_score=float(score)
            ))

        # Sort by relevance score
        reranked.sort(key=lambda x: x.relevance_score, reverse=True)

        # Update ranks
        for i, result in enumerate(reranked):
            result.new_rank = i

        if top_k:
            reranked = reranked[:top_k]
        return reranked


class LLMReranker:
    """Rerank using LLM as judge."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model

    async def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = 5
    ) -> list[RerankResult]:
        """Rerank using LLM scoring."""
        # Score each result
        scored_results = []
        for i, result in enumerate(results):
            score = await self._score_relevance(query, result.content)
            scored_results.append(RerankResult(
                id=result.id,
                content=result.content,
                original_rank=i,
                new_rank=0,
                relevance_score=score
            ))

        # Sort by score
        scored_results.sort(key=lambda x: x.relevance_score, reverse=True)

        # Update ranks
        for i, result in enumerate(scored_results):
            result.new_rank = i

        return scored_results[:top_k]

    async def _score_relevance(self, query: str, document: str) -> float:
        """Score document relevance to query."""
        prompt = f"""Rate how relevant this document is to the query on a scale of 0-10.
Query: {query}
Document: {document[:1000]}
Respond with only a number from 0-10:"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=10
        )
        try:
            score = float(response.choices[0].message.content.strip())
            return min(max(score, 0), 10) / 10
        except ValueError:
            # Fall back to a neutral score if the model returns something non-numeric
            return 0.5


class CohereReranker:
    """Rerank using Cohere rerank API."""

    def __init__(self, client: Any, model: str = "rerank-english-v3.0"):
        self.client = client
        self.model = model

    async def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = 10
    ) -> list[RerankResult]:
        """Rerank using Cohere."""
        documents = [r.content for r in results]
        response = await self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_k
        )
        reranked = []
        for i, item in enumerate(response.results):
            original_idx = item.index
            reranked.append(RerankResult(
                id=results[original_idx].id,
                content=results[original_idx].content,
                original_rank=original_idx,
                new_rank=i,
                relevance_score=item.relevance_score
            ))
        return reranked
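The usual pattern is to over-fetch candidates cheaply and spend reranker compute on a small set. A short sketch using the cross-encoder variant (requires the sentence-transformers package; the `hybrid_searcher` argument is assumed to be configured as in the previous section):

```python
async def retrieve_and_rerank(query: str, hybrid_searcher, top_k: int = 5):
    # Over-fetch candidates from hybrid search, then rerank down to a small top_k
    candidates = await hybrid_searcher.search(query, top_k=top_k * 4)

    reranker = CrossEncoderReranker()  # downloads the cross-encoder on first use
    reranked = reranker.rerank(query, candidates, top_k=top_k)  # synchronous scoring

    for r in reranked:
        print(f"{r.new_rank:>2}  (was {r.original_rank:>2})  score={r.relevance_score:.3f}")
    return reranked
```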
Context Compression
from dataclasses import dataclass
from typing import Any
import numpy as np


@dataclass
class CompressedContext:
    """Compressed context for LLM."""
    original_length: int
    compressed_length: int
    content: str
    sources: list[str]


class ContextCompressor:
    """Compress retrieved context to fit token limits."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model

    async def compress(
        self,
        query: str,
        documents: list[str],
        max_tokens: int = 2000
    ) -> CompressedContext:
        """Compress documents to fit token limit."""
        original_text = "\n\n".join(documents)
        original_length = len(original_text)

        # If already fits, return as-is (rough heuristic: ~4 characters per token)
        estimated_tokens = len(original_text) // 4
        if estimated_tokens <= max_tokens:
            return CompressedContext(
                original_length=original_length,
                compressed_length=original_length,
                content=original_text,
                sources=[f"doc_{i}" for i in range(len(documents))]
            )

        # Extract relevant sentences
        compressed = await self._extract_relevant(query, documents, max_tokens)
        return CompressedContext(
            original_length=original_length,
            compressed_length=len(compressed),
            content=compressed,
            sources=[f"doc_{i}" for i in range(len(documents))]
        )

    async def _extract_relevant(
        self,
        query: str,
        documents: list[str],
        max_tokens: int
    ) -> str:
        """Extract most relevant sentences."""
        prompt = f"""Given this query and documents, extract only the sentences that are directly relevant to answering the query.
Query: {query}
Documents:
{chr(10).join(f"[Doc {i}]: {doc[:500]}" for i, doc in enumerate(documents))}
Extract the most relevant sentences (aim for about {max_tokens // 4} words):"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=max_tokens
        )
        return response.choices[0].message.content


class ChunkSelector:
    """Select most relevant chunks from documents."""

    def __init__(self, embedding_client: Any):
        self.embedding_client = embedding_client

    async def select_chunks(
        self,
        query: str,
        chunks: list[str],
        max_chunks: int = 5
    ) -> list[str]:
        """Select most relevant chunks."""
        # Embed the query and all chunks in a single call
        all_texts = [query] + chunks
        response = await self.embedding_client.embeddings.create(
            model="text-embedding-3-small",
            input=all_texts
        )
        embeddings = [d.embedding for d in response.data]
        query_embedding = embeddings[0]
        chunk_embeddings = embeddings[1:]

        # Calculate cosine similarities
        similarities = []
        for i, chunk_emb in enumerate(chunk_embeddings):
            sim = np.dot(query_embedding, chunk_emb) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(chunk_emb)
            )
            similarities.append((i, sim))

        # Sort by similarity
        similarities.sort(key=lambda x: x[1], reverse=True)

        # Select top chunks
        selected_indices = [idx for idx, _ in similarities[:max_chunks]]
        selected_indices.sort()  # Maintain original order
        return [chunks[i] for i in selected_indices]


class ContextBuilder:
    """Build optimized context for RAG."""

    def __init__(
        self,
        compressor: ContextCompressor,
        chunk_selector: ChunkSelector,
        max_context_tokens: int = 4000
    ):
        self.compressor = compressor
        self.chunk_selector = chunk_selector
        self.max_context_tokens = max_context_tokens

    async def build_context(
        self,
        query: str,
        documents: list[dict]
    ) -> str:
        """Build optimized context from documents."""
        # Extract all chunks
        all_chunks = []
        for doc in documents:
            chunks = self._chunk_document(doc["content"])
            all_chunks.extend(chunks)

        # Select relevant chunks
        selected = await self.chunk_selector.select_chunks(
            query,
            all_chunks,
            max_chunks=10
        )

        # Compress if needed
        compressed = await self.compressor.compress(
            query,
            selected,
            self.max_context_tokens
        )
        return compressed.content

    def _chunk_document(self, text: str, chunk_size: int = 500) -> list[str]:
        """Split document into sentence-based chunks."""
        sentences = text.replace('\n', ' ').split('. ')
        chunks = []
        current_chunk = []
        current_length = 0
        for sentence in sentences:
            sentence_length = len(sentence)
            if current_length + sentence_length > chunk_size and current_chunk:
                chunks.append('. '.join(current_chunk) + '.')
                current_chunk = []
                current_length = 0
            current_chunk.append(sentence)
            current_length += sentence_length
        if current_chunk:
            chunks.append('. '.join(current_chunk) + '.')
        return chunks
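A short sketch of how these pieces compose, assuming the same OpenAI-style async client used above; `retrieved_docs` would come from the search and reranking stages:

```python
from openai import AsyncOpenAI


async def build_rag_context(query: str, retrieved_docs: list[dict]) -> str:
    client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set
    builder = ContextBuilder(
        compressor=ContextCompressor(client),
        chunk_selector=ChunkSelector(client),
        max_context_tokens=4000,
    )
    # retrieved_docs: [{"id": ..., "content": ...}, ...] from the retrieval stage
    return await builder.build_context(query, retrieved_docs)
```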
Production RAG Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

# Shared pipeline components (QueryTransformer, HybridSearcher, reranker, ContextBuilder
# from the earlier sections), wired up at application startup; see the startup sketch below.
query_transformer = None
hybrid_searcher = None
reranker = None
context_builder = None
llm_client = None


class RAGRequest(BaseModel):
    query: str
    top_k: int = 5
    use_reranking: bool = True
    use_compression: bool = True


class SearchRequest(BaseModel):
    query: str
    top_k: int = 10
    search_type: str = "hybrid"


@app.post("/v1/rag")
async def rag_query(request: RAGRequest):
    """Full RAG pipeline."""
    # Transform query
    transformed = await query_transformer.rewrite_for_retrieval(request.query)

    # Hybrid search
    results = await hybrid_searcher.search(
        transformed.transformed,
        top_k=request.top_k * 2
    )

    # Rerank if enabled
    if request.use_reranking and results:
        reranked = await reranker.rerank(
            request.query,
            results,
            top_k=request.top_k
        )
        documents = [{"content": r.content, "id": r.id} for r in reranked]
    else:
        documents = [{"content": r.content, "id": r.id} for r in results[:request.top_k]]

    # Build context
    if request.use_compression:
        context = await context_builder.build_context(request.query, documents)
    else:
        context = "\n\n".join([d["content"] for d in documents])

    # Generate response
    prompt = f"""Answer the question based on the following context.
Context:
{context}
Question: {request.query}
Answer:"""
    response = await llm_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0
    )
    return {
        "answer": response.choices[0].message.content,
        "sources": [d["id"] for d in documents],
        "query_transformed": transformed.transformed
    }


@app.post("/v1/search")
async def search(request: SearchRequest):
    """Search without generation."""
    if request.search_type == "hybrid":
        results = await hybrid_searcher.search(request.query, request.top_k)
    else:
        # Dense only
        results = await hybrid_searcher._dense_search(request.query, request.top_k, None)
    return {
        "results": [
            {
                "id": r.id,
                "content": r.content[:500],
                "score": r.combined_score or r.dense_score
            }
            for r in results
        ]
    }


@app.post("/v1/rerank")
async def rerank_results(query: str, documents: list[str], top_k: int = 5):
    """Rerank documents."""
    results = [
        SearchResult(id=f"doc_{i}", content=doc, metadata={})
        for i, doc in enumerate(documents)
    ]
    reranked = await reranker.rerank(query, results, top_k)
    return {
        "reranked": [
            {
                "id": r.id,
                "content": r.content[:500],
                "original_rank": r.original_rank,
                "new_rank": r.new_rank,
                "score": r.relevance_score
            }
            for r in reranked
        ]
    }


@app.get("/health")
async def health():
    return {"status": "healthy"}
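The module-level components above are declared as None placeholders. One way to wire them, sketched below under the assumption that the classes from the earlier sections are importable and that you already have concrete vector store and BM25 index instances (the `my_*` names are hypothetical), is a FastAPI lifespan hook:

```python
from contextlib import asynccontextmanager
from openai import AsyncOpenAI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Runs once at startup; pass it to the constructor as FastAPI(lifespan=lifespan)
    # in place of the bare FastAPI() call above.
    global query_transformer, hybrid_searcher, reranker, context_builder, llm_client
    llm_client = AsyncOpenAI()
    query_transformer = QueryTransformer(llm_client)
    hybrid_searcher = HybridSearcher(
        embedding_client=llm_client,
        vector_store=my_vector_store,  # hypothetical: your vector store client
        sparse_index=my_bm25_index,    # hypothetical: a BM25Index built at ingest time
    )
    reranker = LLMReranker(llm_client)  # async, matching the `await reranker.rerank(...)` calls
    context_builder = ContextBuilder(ContextCompressor(llm_client), ChunkSelector(llm_client))
    yield  # shutdown/cleanup would go after the yield
```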
References
- RAG Paper: https://arxiv.org/abs/2005.11401
- HyDE: https://arxiv.org/abs/2212.10496
- Cohere Rerank: https://docs.cohere.com/docs/reranking
- LlamaIndex: https://docs.llamaindex.ai/
Conclusion
Effective RAG requires attention at every stage of the pipeline. Query transformation improves retrieval by expanding queries, decomposing complex questions, and generating hypothetical answers for better embedding matches. Hybrid search combines the precision of keyword matching with the semantic understanding of dense retrieval—neither alone is sufficient for production systems. Reranking with cross-encoders or LLMs surfaces the most relevant results from a larger candidate set, dramatically improving the quality of context provided to the LLM. Context compression ensures you fit the most important information within token limits, using techniques like relevant sentence extraction and chunk selection. For production systems, implement all these stages with appropriate fallbacks, monitor retrieval quality with metrics like MRR and recall, and continuously improve based on user feedback. The goal is grounded, accurate responses that cite their sources—RAG done right makes LLMs genuinely useful for knowledge-intensive tasks.
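As a concrete reference for the retrieval metrics mentioned above, here is a small sketch of recall@k and MRR computed from ranked retrieval results and a labeled set of relevant document ids:

```python
def recall_at_k(retrieved_ids: list[str], relevant_ids: set[str], k: int = 10) -> float:
    """Fraction of relevant documents that appear in the top-k retrieved results."""
    if not relevant_ids:
        return 0.0
    hits = sum(1 for doc_id in retrieved_ids[:k] if doc_id in relevant_ids)
    return hits / len(relevant_ids)


def mean_reciprocal_rank(runs: list[tuple[list[str], set[str]]]) -> float:
    """MRR over (retrieved_ids, relevant_ids) pairs: mean of 1/rank of the first relevant hit."""
    total = 0.0
    for retrieved_ids, relevant_ids in runs:
        for rank, doc_id in enumerate(retrieved_ids, start=1):
            if doc_id in relevant_ids:
                total += 1 / rank
                break
    return total / len(runs) if runs else 0.0


# Example: the first relevant doc appears at rank 2, so MRR = 0.5;
# one of the two relevant docs is in the top 2, so recall@2 = 0.5.
print(mean_reciprocal_rank([(["a", "b", "c"], {"b", "c"})]))  # 0.5
print(recall_at_k(["a", "b", "c"], {"b", "c"}, k=2))          # 0.5
```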
