Introduction

Retrieval-Augmented Generation (RAG) has become the standard pattern for grounding LLM responses in factual, up-to-date information. But basic RAG—retrieve chunks, stuff into prompt, generate—often falls short in production. Queries get misunderstood, irrelevant chunks pollute context, and answers lack coherence. This guide covers advanced RAG patterns that address these challenges: query transformation to improve retrieval, multi-stage retrieval with reranking, iterative refinement for complex questions, and hybrid approaches that combine dense and sparse retrieval. Whether you’re building a document Q&A system, knowledge base assistant, or research tool, these patterns will help you build RAG systems that actually work.
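
Throughout this guide, the code sketches assume a minimal async interface: an embedding client with embed()/embed_batch(), a vector store with search(), and an LLM client whose complete()/chat() return an object with a .content attribute. For contrast, here is what the naive baseline looks like under those assumptions; every pattern below is a response to its failure modes. The embedder, store, and llm names are placeholders, not a specific library.

    # Naive RAG baseline (sketch): embed the query, grab top-k chunks, stuff them into one prompt.
    # `embedder`, `store`, and `llm` are assumed placeholder clients, not a specific library.
    async def naive_rag(query: str, embedder, store, llm, k: int = 5) -> str:
        query_embedding = await embedder.embed(query)
        chunks = await store.search(query_embedding, k)  # expected shape: [{"content": ...}, ...]
        context = "\n\n".join(chunk["content"] for chunk in chunks)
        prompt = f"Answer using only this context:\n\n{context}\n\nQuestion: {query}\nAnswer:"
        response = await llm.complete(prompt)
        return response.content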

Query Transformation
from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
import asyncio


@dataclass
class TransformedQuery:
    """Transformed query result."""
    original: str
    transformed: list[str]
    metadata: dict = field(default_factory=dict)


class QueryTransformer(ABC):
    """Abstract query transformer."""

    @abstractmethod
    async def transform(self, query: str) -> TransformedQuery:
        """Transform query for better retrieval."""
        pass


class QueryExpander(QueryTransformer):
    """Expand query with synonyms and related terms."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def transform(self, query: str) -> TransformedQuery:
        """Expand query using LLM."""
        prompt = f"""Generate 3 alternative phrasings of this query that might help find relevant documents.
Keep the same meaning but use different words and perspectives.
Original query: {query}
Alternative queries (one per line):"""
        response = await self.llm.complete(prompt)
        alternatives = [
            line.strip().lstrip("0123456789.-) ")
            for line in response.content.strip().split("\n")
            if line.strip()
        ][:3]
        return TransformedQuery(
            original=query,
            transformed=[query] + alternatives
        )


class HypotheticalDocumentGenerator(QueryTransformer):
    """Generate hypothetical document (HyDE)."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def transform(self, query: str) -> TransformedQuery:
        """Generate hypothetical answer document."""
        prompt = f"""Write a short paragraph that would be a perfect answer to this question.
Write as if you're an expert providing factual information.
Question: {query}
Answer paragraph:"""
        response = await self.llm.complete(prompt)
        return TransformedQuery(
            original=query,
            transformed=[response.content.strip()],
            metadata={"type": "hyde"}
        )


class QueryDecomposer(QueryTransformer):
    """Decompose complex queries into sub-queries."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def transform(self, query: str) -> TransformedQuery:
        """Decompose into sub-queries."""
        prompt = f"""Break down this complex question into simpler sub-questions that can be answered independently.
If the question is already simple, just return it as-is.
Question: {query}
Sub-questions (one per line):"""
        response = await self.llm.complete(prompt)
        sub_queries = [
            line.strip().lstrip("0123456789.-) ")
            for line in response.content.strip().split("\n")
            if line.strip()
        ]
        return TransformedQuery(
            original=query,
            transformed=sub_queries if sub_queries else [query],
            metadata={"type": "decomposed"}
        )


class StepBackTransformer(QueryTransformer):
    """Generate step-back question for broader context."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def transform(self, query: str) -> TransformedQuery:
        """Generate step-back question."""
        prompt = f"""Given this specific question, generate a more general "step-back" question that would provide useful background context.
Specific question: {query}
Step-back question:"""
        response = await self.llm.complete(prompt)
        step_back = response.content.strip()
        return TransformedQuery(
            original=query,
            transformed=[step_back, query],
            metadata={"type": "step_back", "step_back_query": step_back}
        )


class MultiQueryTransformer(QueryTransformer):
    """Combine multiple transformation strategies."""

    def __init__(
        self,
        transformers: list[QueryTransformer],
        deduplicate: bool = True
    ):
        self.transformers = transformers
        self.deduplicate = deduplicate

    async def transform(self, query: str) -> TransformedQuery:
        """Apply all transformers."""
        results = await asyncio.gather(*[
            t.transform(query) for t in self.transformers
        ])
        all_queries = [query]
        for result in results:
            all_queries.extend(result.transformed)
        if self.deduplicate:
            seen = set()
            unique = []
            for q in all_queries:
                q_lower = q.lower().strip()
                if q_lower not in seen:
                    seen.add(q_lower)
                    unique.append(q)
            all_queries = unique
        return TransformedQuery(
            original=query,
            transformed=all_queries
        )
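
A quick sketch of how these transformers compose; llm_client is a placeholder for any completion client that implements the complete() coroutine assumed above:

    # Hypothetical usage: fan one query out through several strategies at once.
    import asyncio

    transformer = MultiQueryTransformer(
        transformers=[
            QueryExpander(llm_client),
            HypotheticalDocumentGenerator(llm_client),
            StepBackTransformer(llm_client),
        ],
        deduplicate=True,
    )
    result = asyncio.run(transformer.transform("Why did checkout latency spike last week?"))
    print(result.transformed)  # original query + expansions + a HyDE passage + a step-back question
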
Retrieval Strategies
from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
import asyncio

import numpy as np


@dataclass
class RetrievedDocument:
    """Retrieved document."""
    id: str
    content: str
    score: float
    metadata: dict = field(default_factory=dict)


class Retriever(ABC):
    """Abstract retriever."""

    @abstractmethod
    async def retrieve(
        self,
        query: str,
        k: int = 10
    ) -> list[RetrievedDocument]:
        """Retrieve relevant documents."""
        pass


class DenseRetriever(Retriever):
    """Dense vector retrieval."""

    def __init__(
        self,
        embedding_model: Any,
        vector_store: Any
    ):
        self.embedder = embedding_model
        self.store = vector_store

    async def retrieve(
        self,
        query: str,
        k: int = 10
    ) -> list[RetrievedDocument]:
        """Retrieve using dense embeddings."""
        query_embedding = await self.embedder.embed(query)
        results = await self.store.search(query_embedding, k)
        return [
            RetrievedDocument(
                id=r["id"],
                content=r["content"],
                score=r["score"],
                metadata=r.get("metadata", {})
            )
            for r in results
        ]


class SparseRetriever(Retriever):
    """Sparse keyword retrieval (BM25)."""

    def __init__(self, index: Any):
        self.index = index

    async def retrieve(
        self,
        query: str,
        k: int = 10
    ) -> list[RetrievedDocument]:
        """Retrieve using BM25."""
        results = self.index.search(query, k)
        return [
            RetrievedDocument(
                id=r["id"],
                content=r["content"],
                score=r["score"],
                metadata=r.get("metadata", {})
            )
            for r in results
        ]


class HybridRetriever(Retriever):
    """Combine dense and sparse retrieval."""

    def __init__(
        self,
        dense_retriever: DenseRetriever,
        sparse_retriever: SparseRetriever,
        alpha: float = 0.5  # Weight for dense
    ):
        self.dense = dense_retriever
        self.sparse = sparse_retriever
        self.alpha = alpha

    async def retrieve(
        self,
        query: str,
        k: int = 10
    ) -> list[RetrievedDocument]:
        """Hybrid retrieval with score fusion."""
        # Retrieve from both
        dense_results, sparse_results = await asyncio.gather(
            self.dense.retrieve(query, k * 2),
            self.sparse.retrieve(query, k * 2)
        )
        # Normalize scores
        dense_scores = self._normalize_scores(dense_results)
        sparse_scores = self._normalize_scores(sparse_results)
        # Combine
        combined = {}
        for doc, score in zip(dense_results, dense_scores):
            combined[doc.id] = {
                "doc": doc,
                "dense_score": score,
                "sparse_score": 0
            }
        for doc, score in zip(sparse_results, sparse_scores):
            if doc.id in combined:
                combined[doc.id]["sparse_score"] = score
            else:
                combined[doc.id] = {
                    "doc": doc,
                    "dense_score": 0,
                    "sparse_score": score
                }
        # Calculate final scores
        results = []
        for data in combined.values():
            final_score = (
                self.alpha * data["dense_score"] +
                (1 - self.alpha) * data["sparse_score"]
            )
            doc = data["doc"]
            doc.score = final_score
            results.append(doc)
        # Sort and return top k
        results.sort(key=lambda x: x.score, reverse=True)
        return results[:k]

    def _normalize_scores(self, docs: list[RetrievedDocument]) -> list[float]:
        """Min-max normalize scores."""
        if not docs:
            return []
        scores = [d.score for d in docs]
        min_s, max_s = min(scores), max(scores)
        if max_s == min_s:
            return [1.0] * len(scores)
        return [(s - min_s) / (max_s - min_s) for s in scores]


class MultiQueryRetriever(Retriever):
    """Retrieve for multiple query variants."""

    def __init__(
        self,
        base_retriever: Retriever,
        query_transformer: QueryTransformer
    ):
        self.retriever = base_retriever
        self.transformer = query_transformer

    async def retrieve(
        self,
        query: str,
        k: int = 10
    ) -> list[RetrievedDocument]:
        """Retrieve for all query variants."""
        # Transform query
        transformed = await self.transformer.transform(query)
        # Retrieve for each variant
        all_results = await asyncio.gather(*[
            self.retriever.retrieve(q, k)
            for q in transformed.transformed
        ])
        # Reciprocal rank fusion
        doc_scores = {}
        for results in all_results:
            for rank, doc in enumerate(results):
                if doc.id not in doc_scores:
                    doc_scores[doc.id] = {"doc": doc, "rrf_score": 0}
                # RRF formula
                doc_scores[doc.id]["rrf_score"] += 1 / (60 + rank)
        # Sort by RRF score
        ranked = sorted(
            doc_scores.values(),
            key=lambda x: x["rrf_score"],
            reverse=True
        )
        return [item["doc"] for item in ranked[:k]]


class ParentDocumentRetriever(Retriever):
    """Retrieve child chunks, return parent documents."""

    def __init__(
        self,
        chunk_retriever: Retriever,
        document_store: Any
    ):
        self.chunk_retriever = chunk_retriever
        self.doc_store = document_store

    async def retrieve(
        self,
        query: str,
        k: int = 10
    ) -> list[RetrievedDocument]:
        """Retrieve chunks, expand to parents."""
        # Retrieve chunks
        chunks = await self.chunk_retriever.retrieve(query, k * 3)
        # Get unique parent documents
        parent_ids = set()
        parent_scores = {}
        for chunk in chunks:
            parent_id = chunk.metadata.get("parent_id")
            if parent_id:
                parent_ids.add(parent_id)
                # Aggregate scores
                if parent_id not in parent_scores:
                    parent_scores[parent_id] = []
                parent_scores[parent_id].append(chunk.score)
        # Fetch parent documents
        parents = []
        for parent_id in parent_ids:
            doc = await self.doc_store.get(parent_id)
            if doc:
                # Use max chunk score as parent score
                score = max(parent_scores[parent_id])
                parents.append(RetrievedDocument(
                    id=parent_id,
                    content=doc["content"],
                    score=score,
                    metadata=doc.get("metadata", {})
                ))
        # Sort by score
        parents.sort(key=lambda x: x.score, reverse=True)
        return parents[:k]
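
Here is a rough sketch of how these retrievers nest; embedder, vector_store, bm25_index, and llm_client are placeholder objects standing in for whatever embedding model, vector database, BM25 index, and LLM client you actually use. The reciprocal rank fusion above uses the conventional constant of 60, which damps the influence of any single ranking:

    # Hypothetical wiring: hybrid dense+sparse retrieval behind multi-query expansion.
    import asyncio

    hybrid = HybridRetriever(
        dense_retriever=DenseRetriever(embedder, vector_store),
        sparse_retriever=SparseRetriever(bm25_index),
        alpha=0.6,  # lean slightly toward semantic matches
    )
    retriever = MultiQueryRetriever(
        base_retriever=hybrid,
        query_transformer=QueryExpander(llm_client),
    )
    docs = asyncio.run(retriever.retrieve("How does the refund policy handle partial returns?", k=8))
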
Reranking
from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import asyncio

import numpy as np


class Reranker(ABC):
    """Abstract reranker."""

    @abstractmethod
    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Rerank documents."""
        pass


class CrossEncoderReranker(Reranker):
    """Rerank using cross-encoder model."""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.model_name = model_name
        self.model = None

    def _load_model(self):
        """Lazy load model."""
        if self.model is None:
            from sentence_transformers import CrossEncoder
            self.model = CrossEncoder(self.model_name)

    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Rerank using cross-encoder."""
        self._load_model()
        if not documents:
            return []
        # Create query-document pairs
        pairs = [(query, doc.content) for doc in documents]
        # Score pairs
        scores = self.model.predict(pairs)
        # Update document scores
        for doc, score in zip(documents, scores):
            doc.score = float(score)
        # Sort by new scores
        documents.sort(key=lambda x: x.score, reverse=True)
        if k:
            return documents[:k]
        return documents


class CohereReranker(Reranker):
    """Rerank using Cohere API."""

    def __init__(self, api_key: str, model: str = "rerank-english-v2.0"):
        import cohere
        self.client = cohere.Client(api_key)
        self.model = model

    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Rerank using Cohere."""
        if not documents:
            return []
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=[doc.content for doc in documents],
            top_n=k or len(documents)
        )
        # Reorder documents
        reranked = []
        for result in response.results:
            doc = documents[result.index]
            doc.score = result.relevance_score
            reranked.append(doc)
        return reranked


class LLMReranker(Reranker):
    """Rerank using LLM scoring."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Rerank using LLM relevance scoring."""
        if not documents:
            return []

        async def score_document(doc: RetrievedDocument) -> float:
            prompt = f"""Rate the relevance of this document to the query on a scale of 0-10.
Only output a number.
Query: {query}
Document: {doc.content[:1000]}
Relevance score (0-10):"""
            response = await self.llm.complete(prompt)
            try:
                score = float(response.content.strip())
                return min(max(score, 0), 10) / 10
            except ValueError:
                return 0.5

        # Score all documents
        scores = await asyncio.gather(*[score_document(doc) for doc in documents])
        for doc, score in zip(documents, scores):
            doc.score = score
        documents.sort(key=lambda x: x.score, reverse=True)
        if k:
            return documents[:k]
        return documents


class DiversityReranker(Reranker):
    """Rerank for diversity using MMR."""

    def __init__(self, embedding_model: Any, lambda_param: float = 0.5):
        self.embedder = embedding_model
        self.lambda_param = lambda_param

    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Maximal Marginal Relevance reranking."""
        if not documents:
            return []
        k = k or len(documents)
        # Get embeddings
        query_emb = await self.embedder.embed(query)
        doc_embs = await self.embedder.embed_batch([d.content for d in documents])
        # MMR selection
        selected = []
        remaining = list(range(len(documents)))
        while len(selected) < k and remaining:
            best_idx = None
            best_score = float('-inf')
            for idx in remaining:
                # Relevance to query
                relevance = self._cosine_sim(query_emb, doc_embs[idx])
                # Max similarity to already-selected documents (penalizes redundancy)
                if selected:
                    max_sim = max(
                        self._cosine_sim(doc_embs[idx], doc_embs[s])
                        for s in selected
                    )
                else:
                    max_sim = 0
                # MMR score
                mmr = self.lambda_param * relevance - (1 - self.lambda_param) * max_sim
                if mmr > best_score:
                    best_score = mmr
                    best_idx = idx
            if best_idx is not None:
                selected.append(best_idx)
                remaining.remove(best_idx)
        return [documents[i] for i in selected]

    def _cosine_sim(self, a, b) -> float:
        """Compute cosine similarity."""
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
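
A typical two-stage setup retrieves generously and then reranks down to a tight context window. A minimal sketch, reusing the Retriever interface from the previous section and the default cross-encoder checkpoint named above:

    # Hypothetical two-stage pipeline: wide recall first, precise reranking second.
    async def retrieve_and_rerank(
        query: str,
        retriever: Retriever,
        k: int = 5
    ) -> list[RetrievedDocument]:
        candidates = await retriever.retrieve(query, k=30)  # over-retrieve for recall
        reranker = CrossEncoderReranker()  # ms-marco-MiniLM-L-6-v2 by default
        return await reranker.rerank(query, candidates, k=k)  # keep only the best few
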
Generation Patterns
from dataclasses import dataclass
from typing import Any, Optional
import asyncio


@dataclass
class RAGResponse:
    """RAG response with sources."""
    answer: str
    sources: list[RetrievedDocument]
    metadata: Optional[dict] = None


class RAGGenerator:
    """Generate answers from retrieved context."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def generate(
        self,
        query: str,
        documents: list[RetrievedDocument],
        system_prompt: Optional[str] = None
    ) -> RAGResponse:
        """Generate answer from context."""
        # Build context
        context = self._build_context(documents)
        system = system_prompt or """You are a helpful assistant that answers questions based on the provided context.
If the context doesn't contain enough information to answer, say so.
Always cite your sources by referencing the document numbers."""
        prompt = f"""Context:
{context}
Question: {query}
Answer based on the context above:"""
        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": prompt}
        ]
        response = await self.llm.chat(messages)
        return RAGResponse(
            answer=response.content,
            sources=documents
        )

    def _build_context(self, documents: list[RetrievedDocument]) -> str:
        """Build context string from documents."""
        parts = []
        for i, doc in enumerate(documents, 1):
            source = doc.metadata.get("source", "Unknown")
            parts.append(f"[Document {i}] (Source: {source})\n{doc.content}")
        return "\n\n".join(parts)


class RefineGenerator:
    """Iteratively refine answer with each document."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def generate(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> RAGResponse:
        """Refine answer iteratively."""
        if not documents:
            return RAGResponse(answer="No relevant documents found.", sources=[])
        # Initial answer from first document
        current_answer = await self._initial_answer(query, documents[0])
        # Refine with remaining documents
        for doc in documents[1:]:
            current_answer = await self._refine_answer(
                query, current_answer, doc
            )
        return RAGResponse(
            answer=current_answer,
            sources=documents
        )

    async def _initial_answer(
        self,
        query: str,
        document: RetrievedDocument
    ) -> str:
        """Generate initial answer."""
        prompt = f"""Answer this question based on the document.
Document: {document.content}
Question: {query}
Answer:"""
        response = await self.llm.complete(prompt)
        return response.content

    async def _refine_answer(
        self,
        query: str,
        current_answer: str,
        document: RetrievedDocument
    ) -> str:
        """Refine answer with new document."""
        prompt = f"""You have an existing answer to a question.
Refine it using the new document if it provides additional relevant information.
If the new document isn't relevant, keep the original answer.
Question: {query}
Existing answer: {current_answer}
New document: {document.content}
Refined answer:"""
        response = await self.llm.complete(prompt)
        return response.content


class MapReduceGenerator:
    """Map-reduce for long document sets."""

    def __init__(self, llm_client: Any, chunk_size: int = 5):
        self.llm = llm_client
        self.chunk_size = chunk_size

    async def generate(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> RAGResponse:
        """Map-reduce generation."""
        # Map: generate partial answers for each chunk of documents
        chunks = [
            documents[i:i + self.chunk_size]
            for i in range(0, len(documents), self.chunk_size)
        ]
        partial_answers = await asyncio.gather(*[
            self._map_chunk(query, chunk)
            for chunk in chunks
        ])
        # Reduce: combine partial answers
        final_answer = await self._reduce(query, partial_answers)
        return RAGResponse(
            answer=final_answer,
            sources=documents
        )

    async def _map_chunk(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Generate answer for document chunk."""
        context = "\n\n".join(doc.content for doc in documents)
        prompt = f"""Answer this question based on the documents.
Documents:
{context}
Question: {query}
Answer:"""
        response = await self.llm.complete(prompt)
        return response.content

    async def _reduce(self, query: str, partial_answers: list[str]) -> str:
        """Combine partial answers."""
        answers_text = "\n\n---\n\n".join(
            f"Partial answer {i+1}:\n{ans}"
            for i, ans in enumerate(partial_answers)
        )
        prompt = f"""Combine these partial answers into a comprehensive final answer.
Question: {query}
{answers_text}
Final comprehensive answer:"""
        response = await self.llm.complete(prompt)
        return response.content


class CitationGenerator:
    """Generate answers with inline citations."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def generate(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> RAGResponse:
        """Generate with citations."""
        # Number documents
        numbered_context = "\n\n".join(
            f"[{i+1}] {doc.content}"
            for i, doc in enumerate(documents)
        )
        prompt = f"""Answer the question using the provided sources.
Include inline citations like [1], [2] etc. to reference your sources.
Sources:
{numbered_context}
Question: {query}
Answer with citations:"""
        response = await self.llm.complete(prompt)
        return RAGResponse(
            answer=response.content,
            sources=documents,
            metadata={"has_citations": True}
        )
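
Putting retrieval, reranking, and generation together might look like the sketch below; retriever and llm_client are the placeholder objects from the earlier sections:

    # Hypothetical end-to-end pipeline: retrieve -> rerank -> answer with citations.
    import asyncio

    async def answer_question(query: str) -> RAGResponse:
        candidates = await retriever.retrieve(query, k=30)
        reranker = CrossEncoderReranker()
        top_docs = await reranker.rerank(query, candidates, k=6)
        generator = CitationGenerator(llm_client)
        return await generator.generate(query, top_docs)

    response = asyncio.run(answer_question("What changed in the 2024 data retention policy?"))
    print(response.answer)        # answer with [1]-style citations
    print(len(response.sources))  # the documents that backed it
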
Advanced RAG Patterns
from dataclasses import dataclass
from typing import Any, Optional


class SelfRAG:
    """Self-reflective RAG with retrieval decisions."""

    def __init__(
        self,
        llm_client: Any,
        retriever: Retriever
    ):
        self.llm = llm_client
        self.retriever = retriever

    async def generate(self, query: str) -> RAGResponse:
        """Generate with self-reflection."""
        # Decide if retrieval is needed
        needs_retrieval = await self._needs_retrieval(query)
        if needs_retrieval:
            # Retrieve documents
            documents = await self.retriever.retrieve(query)
            # Generate with context
            answer = await self._generate_with_context(query, documents)
            # Check if answer is supported
            is_supported = await self._check_support(answer, documents)
            if not is_supported:
                # Try again with more documents
                more_docs = await self.retriever.retrieve(query, k=20)
                answer = await self._generate_with_context(query, more_docs)
                documents = more_docs
        else:
            # Generate without retrieval
            answer = await self._generate_direct(query)
            documents = []
        return RAGResponse(answer=answer, sources=documents)

    async def _needs_retrieval(self, query: str) -> bool:
        """Decide if retrieval is needed."""
        prompt = f"""Does this question require looking up external information, or can it be answered from general knowledge?
Question: {query}
Answer with just "RETRIEVE" or "DIRECT":"""
        response = await self.llm.complete(prompt)
        return "RETRIEVE" in response.content.upper()

    async def _generate_with_context(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Generate answer with context."""
        context = "\n\n".join(doc.content for doc in documents)
        prompt = f"""Answer based on the context.
Context:
{context}
Question: {query}
Answer:"""
        response = await self.llm.complete(prompt)
        return response.content

    async def _generate_direct(self, query: str) -> str:
        """Generate without retrieval."""
        response = await self.llm.complete(f"Answer this question: {query}")
        return response.content

    async def _check_support(
        self,
        answer: str,
        documents: list[RetrievedDocument]
    ) -> bool:
        """Check if answer is supported by documents."""
        context = "\n\n".join(doc.content for doc in documents)
        prompt = f"""Is this answer fully supported by the provided context?
Context:
{context}
Answer: {answer}
Reply with just "SUPPORTED" or "NOT_SUPPORTED":"""
        response = await self.llm.complete(prompt)
        # Check the negative label first: "NOT_SUPPORTED" contains "SUPPORTED"
        verdict = response.content.upper()
        return "NOT_SUPPORTED" not in verdict and "SUPPORTED" in verdict


class CorrectiveRAG:
    """RAG with retrieval correction."""

    def __init__(
        self,
        llm_client: Any,
        retriever: Retriever,
        web_search: Any = None
    ):
        self.llm = llm_client
        self.retriever = retriever
        self.web_search = web_search

    async def generate(self, query: str) -> RAGResponse:
        """Generate with retrieval correction."""
        # Initial retrieval
        documents = await self.retriever.retrieve(query)
        # Grade documents
        graded = await self._grade_documents(query, documents)
        relevant_docs = [d for d, g in graded if g == "relevant"]
        # If not enough relevant docs, try web search
        if len(relevant_docs) < 2 and self.web_search:
            web_docs = await self._web_search(query)
            relevant_docs.extend(web_docs)
        # Generate answer
        if relevant_docs:
            answer = await self._generate(query, relevant_docs)
        else:
            answer = "I couldn't find relevant information to answer this question."
        return RAGResponse(answer=answer, sources=relevant_docs)

    async def _grade_documents(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> list[tuple[RetrievedDocument, str]]:
        """Grade document relevance."""
        graded = []
        for doc in documents:
            prompt = f"""Is this document relevant to the question?
Question: {query}
Document: {doc.content[:500]}
Answer with just "relevant" or "irrelevant":"""
            response = await self.llm.complete(prompt)
            # Check the negative label first: "irrelevant" contains "relevant"
            verdict = response.content.lower()
            if "irrelevant" in verdict:
                grade = "irrelevant"
            elif "relevant" in verdict:
                grade = "relevant"
            else:
                grade = "irrelevant"
            graded.append((doc, grade))
        return graded

    async def _web_search(self, query: str) -> list[RetrievedDocument]:
        """Fallback to web search."""
        if not self.web_search:
            return []
        results = await self.web_search.search(query)
        return [
            RetrievedDocument(
                id=f"web_{i}",
                content=r["content"],
                score=1.0,
                metadata={"source": r["url"]}
            )
            for i, r in enumerate(results)
        ]

    async def _generate(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Generate final answer."""
        context = "\n\n".join(doc.content for doc in documents)
        prompt = f"""Answer the question based on the context.
Context:
{context}
Question: {query}
Answer:"""
        response = await self.llm.complete(prompt)
        return response.content


class AdaptiveRAG:
    """Adapt retrieval strategy based on query."""

    def __init__(
        self,
        llm_client: Any,
        simple_retriever: Retriever,
        complex_retriever: Retriever
    ):
        self.llm = llm_client
        self.simple_retriever = simple_retriever
        self.complex_retriever = complex_retriever

    async def generate(self, query: str) -> RAGResponse:
        """Adapt strategy based on query complexity."""
        # Classify query
        complexity = await self._classify_query(query)
        if complexity == "simple":
            documents = await self.simple_retriever.retrieve(query, k=3)
            answer = await self._simple_generate(query, documents)
        else:
            documents = await self.complex_retriever.retrieve(query, k=10)
            answer = await self._complex_generate(query, documents)
        return RAGResponse(
            answer=answer,
            sources=documents,
            metadata={"complexity": complexity}
        )

    async def _classify_query(self, query: str) -> str:
        """Classify query complexity."""
        prompt = f"""Classify this query as "simple" or "complex".
Simple: Direct factual questions, single-hop reasoning
Complex: Multi-step reasoning, comparisons, analysis
Query: {query}
Classification:"""
        response = await self.llm.complete(prompt)
        return "complex" if "complex" in response.content.lower() else "simple"

    async def _simple_generate(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Simple generation."""
        context = "\n".join(doc.content for doc in documents)
        prompt = f"""Answer concisely based on the context.
Context: {context}
Question: {query}
Answer:"""
        response = await self.llm.complete(prompt)
        return response.content

    async def _complex_generate(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Complex generation with reasoning."""
        context = "\n\n".join(doc.content for doc in documents)
        prompt = f"""Answer this complex question step by step.
Context:
{context}
Question: {query}
Think through this step by step, then provide your answer:"""
        response = await self.llm.complete(prompt)
        return response.content
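
These classes sit on top of any Retriever from earlier. A sketch of the corrective pattern; llm_client, retriever, and web_search_client are placeholders, and web_search can simply be None if you have no fallback:

    # Hypothetical usage of CorrectiveRAG with an optional web-search fallback.
    import asyncio

    crag = CorrectiveRAG(
        llm_client=llm_client,
        retriever=retriever,
        web_search=web_search_client,  # placeholder; pass None to skip the fallback
    )
    response = asyncio.run(crag.generate("Which regions were affected by the March outage?"))
    for doc in response.sources:
        print(doc.id, doc.metadata.get("source", "internal"))
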
Production RAG Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()


class QueryRequest(BaseModel):
    query: str
    k: int = 5
    use_reranking: bool = True
    strategy: str = "standard"  # standard, hyde, decompose


class RAGRequest(BaseModel):
    query: str
    k: int = 5
    generation_strategy: str = "standard"  # standard, refine, map_reduce


# Placeholder implementations
class MockRetriever:
    async def retrieve(self, query: str, k: int = 10):
        return [
            RetrievedDocument(
                id=f"doc_{i}",
                content=f"Sample content for {query}",
                score=1.0 - i * 0.1
            )
            for i in range(k)
        ]


class MockLLM:
    async def complete(self, prompt: str):
        class Response:
            content = "Sample response"
        return Response()

    async def chat(self, messages: list):
        class Response:
            content = "Sample chat response"
        return Response()


retriever = MockRetriever()
llm = MockLLM()
generator = RAGGenerator(llm)


@app.post("/v1/retrieve")
async def retrieve_documents(request: QueryRequest) -> dict:
    """Retrieve relevant documents."""
    documents = await retriever.retrieve(request.query, request.k)
    return {
        "query": request.query,
        "documents": [
            {
                "id": doc.id,
                "content": doc.content,
                "score": doc.score,
                "metadata": doc.metadata
            }
            for doc in documents
        ]
    }


@app.post("/v1/generate")
async def generate_answer(request: RAGRequest) -> dict:
    """Generate answer using RAG."""
    # Retrieve
    documents = await retriever.retrieve(request.query, request.k)
    # Generate
    if request.generation_strategy == "refine":
        gen = RefineGenerator(llm)
    elif request.generation_strategy == "map_reduce":
        gen = MapReduceGenerator(llm)
    else:
        gen = generator
    response = await gen.generate(request.query, documents)
    return {
        "query": request.query,
        "answer": response.answer,
        "sources": [
            {"id": doc.id, "content": doc.content[:200]}
            for doc in response.sources
        ]
    }


@app.get("/health")
async def health():
    return {"status": "healthy"}
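
With the mocks swapped out for real components, the service runs under any ASGI server (for example uvicorn pointed at whatever module you save this in), and the endpoints can be exercised with a plain HTTP client:

    # Hypothetical client call against a locally running instance (assumes port 8000).
    import httpx

    payload = {"query": "What does the SLA cover?", "k": 5, "generation_strategy": "refine"}
    result = httpx.post("http://localhost:8000/v1/generate", json=payload, timeout=30.0).json()
    print(result["answer"])
    print([src["id"] for src in result["sources"]])
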
References
- RAG Paper: https://arxiv.org/abs/2005.11401
- HyDE: https://arxiv.org/abs/2212.10496
- Self-RAG: https://arxiv.org/abs/2310.11511
- Corrective RAG: https://arxiv.org/abs/2401.15884
- LangChain RAG: https://python.langchain.com/docs/use_cases/question_answering/
Conclusion
Building effective RAG systems requires going beyond basic retrieve-and-generate. Start with query transformation—HyDE, query expansion, and decomposition can dramatically improve retrieval quality for ambiguous or complex queries. Use hybrid retrieval combining dense and sparse methods to capture both semantic similarity and keyword matches. Always rerank your retrieved documents—cross-encoders provide much better relevance scoring than bi-encoders alone.

For generation, choose your strategy based on the task: simple stuffing for short contexts, iterative refinement for accuracy, map-reduce for large document sets. Consider advanced patterns like Self-RAG and Corrective RAG when you need the system to reason about retrieval quality.

Monitor your RAG pipeline end-to-end: track retrieval recall, answer faithfulness, and user satisfaction. The best RAG systems are iteratively improved based on failure analysis—look at queries where retrieval failed, where generation hallucinated, and where users weren’t satisfied. RAG is not a one-size-fits-all solution; the right combination of techniques depends on your specific documents, queries, and quality requirements.
