Advanced RAG Patterns: From Query Rewriting to Self-Reflective Retrieval

Introduction

Basic RAG retrieves documents and stuffs them into context. Advanced RAG transforms retrieval into a sophisticated pipeline that dramatically improves answer quality. This guide covers the techniques that separate production RAG systems from prototypes: query rewriting to improve retrieval, hybrid search combining dense and sparse methods, cross-encoder reranking for precision, contextual compression to fit more relevant information, self-RAG for adaptive retrieval, and corrective RAG for handling retrieval failures. Whether you’re building a knowledge base assistant, document Q&A system, or enterprise search, these patterns will help you achieve the accuracy and reliability that production applications demand.


Query Rewriting and Expansion

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod

@dataclass
class RewrittenQuery:
    """Result of query rewriting."""
    
    original: str
    rewritten: str
    expansions: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

class QueryRewriter(ABC):
    """Abstract query rewriter."""
    
    @abstractmethod
    async def rewrite(self, query: str) -> RewrittenQuery:
        """Rewrite the query."""
        pass

class LLMQueryRewriter(QueryRewriter):
    """Rewrite queries using LLM."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def rewrite(self, query: str) -> RewrittenQuery:
        """Rewrite query for better retrieval."""
        
        prompt = f"""Rewrite this search query to improve retrieval results.
Make it more specific and include relevant synonyms.

Original query: {query}

Rewritten query (just the query, no explanation):"""
        
        response = await self.llm.complete(prompt)
        
        return RewrittenQuery(
            original=query,
            rewritten=response.content.strip()
        )

class HyDERewriter(QueryRewriter):
    """Hypothetical Document Embeddings rewriter."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def rewrite(self, query: str) -> RewrittenQuery:
        """Generate hypothetical document for query."""
        
        prompt = f"""Write a short passage that would answer this question.
The passage should be factual and informative.

Question: {query}

Passage:"""
        
        response = await self.llm.complete(prompt)
        
        return RewrittenQuery(
            original=query,
            rewritten=response.content.strip(),
            metadata={"method": "hyde"}
        )

class MultiQueryRewriter(QueryRewriter):
    """Generate multiple query variations."""
    
    def __init__(self, llm_client: Any, num_queries: int = 3):
        self.llm = llm_client
        self.num_queries = num_queries
    
    async def rewrite(self, query: str) -> RewrittenQuery:
        """Generate multiple query variations."""
        
        prompt = f"""Generate {self.num_queries} different versions of this search query.
Each version should approach the question from a different angle.

Original query: {query}

Variations (one per line):"""
        
        response = await self.llm.complete(prompt)
        
        variations = [
            line.strip().lstrip('0123456789.-) ')
            for line in response.content.split('\n')
            if line.strip()
        ][:self.num_queries]
        
        return RewrittenQuery(
            original=query,
            rewritten=query,  # Keep original as primary
            expansions=variations
        )

class StepBackRewriter(QueryRewriter):
    """Step-back prompting for abstract queries."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def rewrite(self, query: str) -> RewrittenQuery:
        """Generate step-back question."""
        
        prompt = f"""Given this specific question, generate a more general "step-back" question
that would help answer the original question.

Original question: {query}

Step-back question (more general/abstract):"""
        
        response = await self.llm.complete(prompt)
        
        return RewrittenQuery(
            original=query,
            rewritten=query,
            expansions=[response.content.strip()],
            metadata={"method": "step_back"}
        )

class QueryDecomposer(QueryRewriter):
    """Decompose complex queries into sub-queries."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def rewrite(self, query: str) -> RewrittenQuery:
        """Decompose into sub-queries."""
        
        prompt = f"""Break down this complex question into simpler sub-questions
that can be answered independently.

Complex question: {query}

Sub-questions (one per line):"""
        
        response = await self.llm.complete(prompt)
        
        sub_queries = [
            line.strip().lstrip('0123456789.-) ')
            for line in response.content.split('\n')
            if line.strip()
        ]
        
        return RewrittenQuery(
            original=query,
            rewritten=query,
            expansions=sub_queries,
            metadata={"method": "decomposition"}
        )
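
All of the rewriters above assume an llm_client that exposes an async complete(prompt) method returning an object with a .content string; that interface is a placeholder for whichever SDK you use, not a specific vendor API. A minimal sketch of driving MultiQueryRewriter through such a client:

import asyncio

class FakeLLMClient:
    """Stand-in for any async LLM client with a complete(prompt) method."""

    class _Response:
        def __init__(self, content: str):
            self.content = content

    async def complete(self, prompt: str):
        # A real client would call an LLM API here; this returns canned variations.
        return self._Response(
            "1. benefits of hybrid retrieval for RAG accuracy\n"
            "2. dense vs sparse retrieval quality in RAG\n"
            "3. how score fusion improves RAG answers"
        )

async def demo():
    rewriter = MultiQueryRewriter(FakeLLMClient(), num_queries=3)
    result = await rewriter.rewrite("How does hybrid retrieval improve RAG accuracy?")
    print(result.original)
    print(result.expansions)  # up to three cleaned-up variations

asyncio.run(demo())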

Hybrid Retrieval

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
import numpy as np

@dataclass
class RetrievedDocument:
    """A retrieved document."""
    
    id: str
    content: str
    score: float
    metadata: dict = field(default_factory=dict)
    source: str = "unknown"

class Retriever(ABC):
    """Abstract retriever."""
    
    @abstractmethod
    async def retrieve(self, query: str, k: int = 10) -> list[RetrievedDocument]:
        """Retrieve documents."""
        pass

class DenseRetriever(Retriever):
    """Dense vector retrieval."""
    
    def __init__(self, embedding_model: Any, vector_store: Any):
        self.embedder = embedding_model
        self.store = vector_store
    
    async def retrieve(self, query: str, k: int = 10) -> list[RetrievedDocument]:
        """Retrieve using dense embeddings."""
        
        query_embedding = self.embedder.embed(query)
        results = await self.store.search(query_embedding, k=k)
        
        return [
            RetrievedDocument(
                id=r.id,
                content=r.content,
                score=r.score,
                metadata=r.metadata,
                source="dense"
            )
            for r in results
        ]

class SparseRetriever(Retriever):
    """Sparse BM25 retrieval."""
    
    def __init__(self, index: Any):
        self.index = index
    
    async def retrieve(self, query: str, k: int = 10) -> list[RetrievedDocument]:
        """Retrieve using BM25."""
        
        results = self.index.search(query, k=k)
        
        return [
            RetrievedDocument(
                id=r.id,
                content=r.content,
                score=r.score,
                metadata=r.metadata,
                source="sparse"
            )
            for r in results
        ]

class HybridRetriever(Retriever):
    """Combine dense and sparse retrieval."""
    
    def __init__(
        self,
        dense_retriever: DenseRetriever,
        sparse_retriever: SparseRetriever,
        alpha: float = 0.5  # Weight for dense scores
    ):
        self.dense = dense_retriever
        self.sparse = sparse_retriever
        self.alpha = alpha
    
    async def retrieve(self, query: str, k: int = 10) -> list[RetrievedDocument]:
        """Hybrid retrieval with score fusion."""
        
        import asyncio
        
        # Retrieve from both
        dense_results, sparse_results = await asyncio.gather(
            self.dense.retrieve(query, k=k * 2),
            self.sparse.retrieve(query, k=k * 2)
        )
        
        # Normalize scores
        dense_scores = self._normalize_scores([d.score for d in dense_results])
        sparse_scores = self._normalize_scores([d.score for d in sparse_results])
        
        # Build score map
        scores = {}
        docs = {}
        
        for doc, score in zip(dense_results, dense_scores):
            scores[doc.id] = self.alpha * score
            docs[doc.id] = doc
        
        for doc, score in zip(sparse_results, sparse_scores):
            if doc.id in scores:
                scores[doc.id] += (1 - self.alpha) * score
            else:
                scores[doc.id] = (1 - self.alpha) * score
                docs[doc.id] = doc
        
        # Sort by combined score
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        
        return [
            RetrievedDocument(
                id=doc_id,
                content=docs[doc_id].content,
                score=scores[doc_id],
                metadata=docs[doc_id].metadata,
                source="hybrid"
            )
            for doc_id in sorted_ids[:k]
        ]
    
    def _normalize_scores(self, scores: list[float]) -> list[float]:
        """Min-max normalize scores."""
        
        if not scores:
            return []
        
        min_s = min(scores)
        max_s = max(scores)
        
        if max_s == min_s:
            return [1.0] * len(scores)
        
        return [(s - min_s) / (max_s - min_s) for s in scores]

class RRFRetriever(Retriever):
    """Reciprocal Rank Fusion retriever."""
    
    def __init__(self, retrievers: list[Retriever], k_constant: int = 60):
        self.retrievers = retrievers
        self.k = k_constant
    
    async def retrieve(self, query: str, k: int = 10) -> list[RetrievedDocument]:
        """Retrieve with RRF fusion."""
        
        import asyncio
        
        # Get results from all retrievers
        all_results = await asyncio.gather(
            *[r.retrieve(query, k=k * 2) for r in self.retrievers]
        )
        
        # Calculate RRF scores
        rrf_scores = {}
        docs = {}
        
        for results in all_results:
            for rank, doc in enumerate(results):
                rrf_score = 1.0 / (self.k + rank + 1)
                
                if doc.id in rrf_scores:
                    rrf_scores[doc.id] += rrf_score
                else:
                    rrf_scores[doc.id] = rrf_score
                    docs[doc.id] = doc
        
        # Sort by RRF score
        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        
        return [
            RetrievedDocument(
                id=doc_id,
                content=docs[doc_id].content,
                score=rrf_scores[doc_id],
                metadata=docs[doc_id].metadata,
                source="rrf"
            )
            for doc_id in sorted_ids[:k]
        ]

class EnsembleRetriever(Retriever):
    """Ensemble of retrievers with learned weights."""
    
    def __init__(self, retrievers: list[Retriever], weights: Optional[list[float]] = None):
        self.retrievers = retrievers
        self.weights = weights or [1.0 / len(retrievers)] * len(retrievers)
    
    async def retrieve(self, query: str, k: int = 10) -> list[RetrievedDocument]:
        """Weighted ensemble retrieval."""
        
        import asyncio
        
        all_results = await asyncio.gather(
            *[r.retrieve(query, k=k * 2) for r in self.retrievers]
        )
        
        # Weighted score combination
        scores = {}
        docs = {}
        
        for results, weight in zip(all_results, self.weights):
            # Normalize scores within each retriever
            max_score = max(d.score for d in results) if results else 1.0
            
            for doc in results:
                normalized = doc.score / max_score if max_score > 0 else 0
                weighted = normalized * weight
                
                if doc.id in scores:
                    scores[doc.id] += weighted
                else:
                    scores[doc.id] = weighted
                    docs[doc.id] = doc
        
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        
        return [
            RetrievedDocument(
                id=doc_id,
                content=docs[doc_id].content,
                score=scores[doc_id],
                metadata=docs[doc_id].metadata,
                source="ensemble"
            )
            for doc_id in sorted_ids[:k]
        ]
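
DenseRetriever and SparseRetriever assume an embedding model with an embed() method, a vector store with an async search() method, and a BM25 index with a search() method; those are placeholder interfaces for whatever libraries back your system. Reciprocal rank fusion is often the easiest fusion to start with because it combines ranks rather than raw scores, so no normalization is needed. A toy sketch with two in-memory retrievers, purely for illustration:

import asyncio

class StaticRetriever(Retriever):
    """Toy retriever over an in-memory list; a real system would use embeddings or BM25."""

    def __init__(self, docs: list[RetrievedDocument]):
        self.docs = docs

    async def retrieve(self, query: str, k: int = 10) -> list[RetrievedDocument]:
        # Score by naive term overlap, just to produce a ranking.
        terms = set(query.lower().split())
        scored = [
            RetrievedDocument(
                id=d.id,
                content=d.content,
                score=float(len(terms & set(d.content.lower().split()))),
                metadata=d.metadata,
                source="static",
            )
            for d in self.docs
        ]
        return sorted(scored, key=lambda d: d.score, reverse=True)[:k]

corpus_a = [RetrievedDocument(id="1", content="hybrid retrieval combines dense and sparse search", score=0.0)]
corpus_b = [RetrievedDocument(id="2", content="reciprocal rank fusion merges ranked lists by rank", score=0.0)]

fused = RRFRetriever([StaticRetriever(corpus_a), StaticRetriever(corpus_b)])
for doc in asyncio.run(fused.retrieve("how does rank fusion merge search results", k=5)):
    print(doc.id, round(doc.score, 4), doc.source)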

Cross-Encoder Reranking

from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import numpy as np

class Reranker(ABC):
    """Abstract reranker."""
    
    @abstractmethod
    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Rerank documents."""
        pass

class CrossEncoderReranker(Reranker):
    """Rerank using cross-encoder model."""
    
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)
    
    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Rerank with cross-encoder."""
        
        if not documents:
            return []
        
        # Create query-document pairs
        pairs = [(query, doc.content) for doc in documents]
        
        # Get scores
        scores = self.model.predict(pairs)
        
        # Sort by score
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        # Update scores and return
        result = []
        for doc, score in scored_docs[:k]:
            result.append(RetrievedDocument(
                id=doc.id,
                content=doc.content,
                score=float(score),
                metadata={**doc.metadata, "original_score": doc.score},
                source=doc.source
            ))
        
        return result

class CohereReranker(Reranker):
    """Rerank using Cohere API."""
    
    def __init__(self, api_key: str, model: str = "rerank-english-v2.0"):
        import cohere
        self.client = cohere.Client(api_key)
        self.model = model
    
    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Rerank with Cohere."""
        
        if not documents:
            return []
        
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=[doc.content for doc in documents],
            top_n=k or len(documents)
        )
        
        result = []
        for r in response.results:
            doc = documents[r.index]
            result.append(RetrievedDocument(
                id=doc.id,
                content=doc.content,
                score=r.relevance_score,
                metadata={**doc.metadata, "original_score": doc.score},
                source=doc.source
            ))
        
        return result

class LLMReranker(Reranker):
    """Rerank using LLM scoring."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Rerank with LLM relevance scoring."""
        
        if not documents:
            return []
        
        scored_docs = []
        
        for doc in documents:
            prompt = f"""Rate the relevance of this document to the query on a scale of 0-10.

Query: {query}

Document: {doc.content[:500]}

Relevance score (just the number):"""
            
            response = await self.llm.complete(prompt)
            
            try:
                score = float(response.content.strip())
            except ValueError:
                score = 5.0
            
            scored_docs.append((doc, score))
        
        # Sort by score
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        result = []
        for doc, score in scored_docs[:k]:
            result.append(RetrievedDocument(
                id=doc.id,
                content=doc.content,
                score=score / 10.0,
                metadata={**doc.metadata, "original_score": doc.score},
                source=doc.source
            ))
        
        return result

class DiversityReranker(Reranker):
    """Rerank for diversity using MMR."""
    
    def __init__(self, embedding_model: Any, lambda_param: float = 0.5):
        self.embedder = embedding_model
        self.lambda_param = lambda_param
    
    async def rerank(
        self,
        query: str,
        documents: list[RetrievedDocument],
        k: Optional[int] = None
    ) -> list[RetrievedDocument]:
        """Maximal Marginal Relevance reranking."""
        
        if not documents:
            return []
        
        k = k or len(documents)
        
        # Get embeddings
        query_emb = self.embedder.embed(query)
        doc_embs = [self.embedder.embed(doc.content) for doc in documents]
        
        # Calculate query similarities
        query_sims = [
            np.dot(query_emb, doc_emb)
            for doc_emb in doc_embs
        ]
        
        # MMR selection
        selected = []
        remaining = list(range(len(documents)))
        
        while len(selected) < k and remaining:
            mmr_scores = []
            
            for idx in remaining:
                # Relevance to query
                relevance = query_sims[idx]
                
                # Max similarity to already selected
                if selected:
                    max_sim = max(
                        np.dot(doc_embs[idx], doc_embs[s])
                        for s in selected
                    )
                else:
                    max_sim = 0
                
                # MMR score
                mmr = self.lambda_param * relevance - (1 - self.lambda_param) * max_sim
                mmr_scores.append((idx, mmr))
            
            # Select highest MMR
            best_idx = max(mmr_scores, key=lambda x: x[1])[0]
            selected.append(best_idx)
            remaining.remove(best_idx)
        
        return [
            RetrievedDocument(
                id=documents[idx].id,
                content=documents[idx].content,
                score=query_sims[idx],
                metadata=documents[idx].metadata,
                source=documents[idx].source
            )
            for idx in selected
        ]
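
Cross-encoders score each query-document pair jointly, so they are far more expensive per document than a bi-encoder; the usual pattern is to retrieve a broad candidate set cheaply and rerank only that shortlist. A sketch of that two-stage flow, assuming sentence-transformers is installed and reusing any Retriever from the previous section:

import asyncio

async def retrieve_and_rerank(
    query: str,
    retriever: Retriever,
    reranker: Reranker,
    k: int = 5,
) -> list[RetrievedDocument]:
    # Stage 1: cast a wide net with the cheaper first-stage retriever.
    candidates = await retriever.retrieve(query, k=k * 4)
    # Stage 2: let the cross-encoder re-score only the shortlist.
    return await reranker.rerank(query, candidates, k=k)

# Usage with any first-stage retriever, e.g. the HybridRetriever above:
# top_docs = asyncio.run(retrieve_and_rerank(query, hybrid_retriever, CrossEncoderReranker()))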

Contextual Compression

from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import numpy as np

class ContextCompressor(ABC):
    """Abstract context compressor."""
    
    @abstractmethod
    async def compress(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> list[RetrievedDocument]:
        """Compress documents to relevant parts."""
        pass

class LLMExtractor(ContextCompressor):
    """Extract relevant parts using LLM."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def compress(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> list[RetrievedDocument]:
        """Extract relevant sentences."""
        
        compressed = []
        
        for doc in documents:
            prompt = f"""Extract only the sentences from this document that are relevant to the query.
If no sentences are relevant, respond with "NO_RELEVANT_CONTENT".

Query: {query}

Document:
{doc.content}

Relevant sentences:"""
            
            response = await self.llm.complete(prompt)
            
            if "NO_RELEVANT_CONTENT" not in response.content:
                compressed.append(RetrievedDocument(
                    id=doc.id,
                    content=response.content.strip(),
                    score=doc.score,
                    metadata={**doc.metadata, "compressed": True},
                    source=doc.source
                ))
        
        return compressed

class EmbeddingFilter(ContextCompressor):
    """Filter sentences by embedding similarity."""
    
    def __init__(self, embedding_model: Any, threshold: float = 0.5):
        self.embedder = embedding_model
        self.threshold = threshold
    
    async def compress(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> list[RetrievedDocument]:
        """Filter to relevant sentences."""
        
        query_emb = self.embedder.embed(query)
        compressed = []
        
        for doc in documents:
            # Split into sentences
            sentences = self._split_sentences(doc.content)
            
            # Filter by similarity
            relevant = []
            for sentence in sentences:
                sent_emb = self.embedder.embed(sentence)
                similarity = np.dot(query_emb, sent_emb)
                
                if similarity >= self.threshold:
                    relevant.append(sentence)
            
            if relevant:
                compressed.append(RetrievedDocument(
                    id=doc.id,
                    content=" ".join(relevant),
                    score=doc.score,
                    metadata={**doc.metadata, "compressed": True},
                    source=doc.source
                ))
        
        return compressed
    
    def _split_sentences(self, text: str) -> list[str]:
        """Split text into sentences."""
        import re
        return [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]

class ChunkCompressor(ContextCompressor):
    """Compress by selecting relevant chunks."""
    
    def __init__(
        self,
        embedding_model: Any,
        chunk_size: int = 200,
        top_chunks: int = 3
    ):
        self.embedder = embedding_model
        self.chunk_size = chunk_size
        self.top_chunks = top_chunks
    
    async def compress(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> list[RetrievedDocument]:
        """Select top relevant chunks."""
        
        query_emb = self.embedder.embed(query)
        compressed = []
        
        for doc in documents:
            # Split into chunks
            chunks = self._split_chunks(doc.content)
            
            # Score chunks
            scored_chunks = []
            for chunk in chunks:
                chunk_emb = self.embedder.embed(chunk)
                score = np.dot(query_emb, chunk_emb)
                scored_chunks.append((chunk, score))
            
            # Select top chunks
            scored_chunks.sort(key=lambda x: x[1], reverse=True)
            top = scored_chunks[:self.top_chunks]
            
            if top:
                compressed.append(RetrievedDocument(
                    id=doc.id,
                    content=" ... ".join(c[0] for c in top),
                    score=doc.score,
                    metadata={**doc.metadata, "compressed": True},
                    source=doc.source
                ))
        
        return compressed
    
    def _split_chunks(self, text: str) -> list[str]:
        """Split text into chunks."""
        
        words = text.split()
        chunks = []
        
        for i in range(0, len(words), self.chunk_size):
            chunk = " ".join(words[i:i + self.chunk_size])
            chunks.append(chunk)
        
        return chunks

class SummaryCompressor(ContextCompressor):
    """Compress by summarizing documents."""
    
    def __init__(self, llm_client: Any, max_length: int = 200):
        self.llm = llm_client
        self.max_length = max_length
    
    async def compress(
        self,
        query: str,
        documents: list[RetrievedDocument]
    ) -> list[RetrievedDocument]:
        """Summarize documents focused on query."""
        
        compressed = []
        
        for doc in documents:
            prompt = f"""Summarize this document in {self.max_length} words or less,
focusing on information relevant to the query.

Query: {query}

Document:
{doc.content}

Summary:"""
            
            response = await self.llm.complete(prompt)
            
            compressed.append(RetrievedDocument(
                id=doc.id,
                content=response.content.strip(),
                score=doc.score,
                metadata={**doc.metadata, "compressed": True, "method": "summary"},
                source=doc.source
            ))
        
        return compressed
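
Compressors slot in between reranking and generation: rerank first so you only pay compression cost for documents you will keep, then pack the compressed text into a bounded context. The helper below sketches that ordering; the character budget and the choice of compressor are assumptions to tune for your model's context window:

async def build_context(
    query: str,
    documents: list[RetrievedDocument],
    reranker: Reranker,
    compressor: ContextCompressor,
    k: int = 5,
    max_chars: int = 4000,
) -> str:
    """Rerank, compress, then pack the most relevant material into a bounded context string."""
    top_docs = await reranker.rerank(query, documents, k=k)
    compressed = await compressor.compress(query, top_docs)

    parts: list[str] = []
    used = 0
    for doc in compressed:
        if used + len(doc.content) > max_chars:
            break
        parts.append(doc.content)
        used += len(doc.content)
    return "\n\n".join(parts)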

Self-RAG and Corrective RAG

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class RetrievalDecision(Enum):
    """Whether to retrieve."""
    
    RETRIEVE = "retrieve"
    NO_RETRIEVE = "no_retrieve"

class RelevanceGrade(Enum):
    """Document relevance grade."""
    
    RELEVANT = "relevant"
    IRRELEVANT = "irrelevant"

class SupportGrade(Enum):
    """How well response is supported."""
    
    FULLY_SUPPORTED = "fully_supported"
    PARTIALLY_SUPPORTED = "partially_supported"
    NOT_SUPPORTED = "not_supported"

class SelfRAG:
    """Self-reflective RAG with adaptive retrieval."""
    
    def __init__(
        self,
        llm_client: Any,
        retriever: Retriever,
        reranker: Reranker = None
    ):
        self.llm = llm_client
        self.retriever = retriever
        self.reranker = reranker
    
    async def query(self, question: str) -> dict:
        """Process query with self-reflection."""
        
        # Step 1: Decide if retrieval is needed
        decision = await self._should_retrieve(question)
        
        if decision == RetrievalDecision.NO_RETRIEVE:
            # Generate without retrieval
            response = await self._generate_without_context(question)
            return {
                "answer": response,
                "retrieved": False,
                "documents": []
            }
        
        # Step 2: Retrieve documents
        documents = await self.retriever.retrieve(question)
        
        # Step 3: Grade relevance
        relevant_docs = await self._grade_documents(question, documents)
        
        if not relevant_docs:
            # No relevant documents - try web search or generate without context
            response = await self._generate_without_context(question)
            return {
                "answer": response,
                "retrieved": True,
                "documents": [],
                "note": "No relevant documents found"
            }
        
        # Step 4: Rerank if available
        if self.reranker:
            relevant_docs = await self.reranker.rerank(question, relevant_docs, k=5)
        
        # Step 5: Generate response
        response = await self._generate_with_context(question, relevant_docs)
        
        # Step 6: Check if response is supported
        support = await self._check_support(response, relevant_docs)
        
        if support == SupportGrade.NOT_SUPPORTED:
            # Regenerate or add disclaimer
            response = await self._regenerate_with_grounding(question, relevant_docs)
        
        return {
            "answer": response,
            "retrieved": True,
            "documents": [d.content for d in relevant_docs],
            "support_grade": support.value
        }
    
    async def _should_retrieve(self, question: str) -> RetrievalDecision:
        """Decide if retrieval is needed."""
        
        prompt = f"""Determine if external information is needed to answer this question.
Answer "retrieve" if external knowledge is needed, "no_retrieve" if you can answer from general knowledge.

Question: {question}

Decision:"""
        
        response = await self.llm.complete(prompt)
        
        if "no_retrieve" in response.content.lower():
            return RetrievalDecision.NO_RETRIEVE
        return RetrievalDecision.RETRIEVE
    
    async def _grade_documents(
        self,
        question: str,
        documents: list[RetrievedDocument]
    ) -> list[RetrievedDocument]:
        """Grade document relevance."""
        
        relevant = []
        
        for doc in documents:
            prompt = f"""Is this document relevant to the question?
Answer "relevant" or "irrelevant".

Question: {question}

Document: {doc.content[:500]}

Grade:"""
            
            response = await self.llm.complete(prompt)
            
            if "relevant" in response.content.lower() and "irrelevant" not in response.content.lower():
                relevant.append(doc)
        
        return relevant
    
    async def _generate_with_context(
        self,
        question: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Generate response with context."""
        
        context = "\n\n".join(doc.content for doc in documents)
        
        prompt = f"""Answer the question based on the provided context.

Context:
{context}

Question: {question}

Answer:"""
        
        response = await self.llm.complete(prompt)
        return response.content
    
    async def _generate_without_context(self, question: str) -> str:
        """Generate response without context."""
        
        prompt = f"""Answer this question based on your knowledge.

Question: {question}

Answer:"""
        
        response = await self.llm.complete(prompt)
        return response.content
    
    async def _check_support(
        self,
        response: str,
        documents: list[RetrievedDocument]
    ) -> SupportGrade:
        """Check if response is supported by documents."""
        
        context = "\n\n".join(doc.content for doc in documents)
        
        prompt = f"""Is this response supported by the provided documents?
Answer "fully_supported", "partially_supported", or "not_supported".

Documents:
{context}

Response: {response}

Support grade:"""
        
        result = await self.llm.complete(prompt)
        
        if "fully" in result.content.lower():
            return SupportGrade.FULLY_SUPPORTED
        elif "partially" in result.content.lower():
            return SupportGrade.PARTIALLY_SUPPORTED
        return SupportGrade.NOT_SUPPORTED
    
    async def _regenerate_with_grounding(
        self,
        question: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Regenerate with explicit grounding."""
        
        context = "\n\n".join(doc.content for doc in documents)
        
        prompt = f"""Answer the question using ONLY information from the provided documents.
If the documents don't contain enough information, say so.

Documents:
{context}

Question: {question}

Answer (cite specific parts of documents):"""
        
        response = await self.llm.complete(prompt)
        return response.content

class CorrectiveRAG:
    """RAG with retrieval correction."""
    
    def __init__(
        self,
        llm_client: Any,
        primary_retriever: Retriever,
        fallback_retriever: Retriever = None
    ):
        self.llm = llm_client
        self.primary = primary_retriever
        self.fallback = fallback_retriever
    
    async def query(self, question: str) -> dict:
        """Query with corrective retrieval."""
        
        # Initial retrieval
        documents = await self.primary.retrieve(question)
        
        # Evaluate retrieval quality
        evaluation = await self._evaluate_retrieval(question, documents)
        
        if evaluation == "correct":
            # Good retrieval - proceed
            pass
        elif evaluation == "ambiguous":
            # Partially relevant - augment with fallback
            if self.fallback:
                fallback_docs = await self.fallback.retrieve(question)
                documents = self._merge_documents(documents, fallback_docs)
        else:  # incorrect
            # Poor retrieval - use fallback only
            if self.fallback:
                documents = await self.fallback.retrieve(question)
            else:
                # Transform query and retry
                transformed = await self._transform_query(question)
                documents = await self.primary.retrieve(transformed)
        
        # Generate response
        response = await self._generate(question, documents)
        
        return {
            "answer": response,
            "evaluation": evaluation,
            "documents": [d.content for d in documents]
        }
    
    async def _evaluate_retrieval(
        self,
        question: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Evaluate retrieval quality."""
        
        if not documents:
            return "incorrect"
        
        context = "\n\n".join(doc.content[:300] for doc in documents[:3])
        
        prompt = f"""Evaluate if these documents can answer the question.
Answer "correct" if documents are relevant, "ambiguous" if partially relevant, "incorrect" if not relevant.

Question: {question}

Documents:
{context}

Evaluation:"""
        
        response = await self.llm.complete(prompt)
        
        if "correct" in response.content.lower() and "incorrect" not in response.content.lower():
            return "correct"
        elif "ambiguous" in response.content.lower():
            return "ambiguous"
        return "incorrect"
    
    async def _transform_query(self, question: str) -> str:
        """Transform query for better retrieval."""
        
        prompt = f"""Rewrite this question to be more specific and searchable.

Original: {question}

Rewritten:"""
        
        response = await self.llm.complete(prompt)
        return response.content.strip()
    
    def _merge_documents(
        self,
        primary: list[RetrievedDocument],
        fallback: list[RetrievedDocument]
    ) -> list[RetrievedDocument]:
        """Merge document lists."""
        
        seen_ids = {d.id for d in primary}
        merged = list(primary)
        
        for doc in fallback:
            if doc.id not in seen_ids:
                merged.append(doc)
                seen_ids.add(doc.id)
        
        return merged[:10]
    
    async def _generate(
        self,
        question: str,
        documents: list[RetrievedDocument]
    ) -> str:
        """Generate response."""
        
        context = "\n\n".join(doc.content for doc in documents)
        
        prompt = f"""Answer based on the context.

Context:
{context}

Question: {question}

Answer:"""
        
        response = await self.llm.complete(prompt)
        return response.content
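
Both classes consume the building blocks defined earlier. The sketch below wires SelfRAG around a hybrid retriever and a cross-encoder reranker, and falls back to CorrectiveRAG when the generated answer is not grounded; the llm_client is the same assumed async complete() interface used throughout, and the fallback policy shown is one choice among many:

async def answer_question(
    question: str,
    llm_client: Any,
    dense: DenseRetriever,
    sparse: SparseRetriever,
) -> dict:
    hybrid = HybridRetriever(dense, sparse, alpha=0.6)
    reranker = CrossEncoderReranker()

    self_rag = SelfRAG(llm_client, retriever=hybrid, reranker=reranker)
    result = await self_rag.query(question)

    # If the answer was not supported by the retrieved documents, retry with corrective retrieval.
    if result.get("support_grade") == SupportGrade.NOT_SUPPORTED.value:
        corrective = CorrectiveRAG(llm_client, primary_retriever=hybrid)
        result = await corrective.query(question)

    return result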

Production RAG Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any
import asyncio

app = FastAPI()

class QueryRequest(BaseModel):
    query: str
    k: int = 5
    rerank: bool = True
    compress: bool = False

class IndexRequest(BaseModel):
    documents: list[dict]  # [{id, content, metadata}]

# Initialize components (simplified)
class MockRetriever:
    async def retrieve(self, query: str, k: int = 10):
        return []

class MockReranker:
    async def rerank(self, query: str, docs: list, k: int = None):
        return docs[:k] if k else docs

retriever = MockRetriever()
reranker = MockReranker()

@app.post("/v1/query")
async def query_rag(request: QueryRequest) -> dict:
    """Query the RAG system."""
    
    # Retrieve
    documents = await retriever.retrieve(request.query, k=request.k * 2)
    
    if not documents:
        return {
            "answer": "No relevant documents found.",
            "documents": [],
            "query": request.query
        }
    
    # Rerank if requested
    if request.rerank:
        documents = await reranker.rerank(request.query, documents, k=request.k)
    else:
        documents = documents[:request.k]
    
    return {
        "documents": [
            {
                "id": doc.id,
                "content": doc.content,
                "score": doc.score
            }
            for doc in documents
        ],
        "query": request.query
    }

@app.post("/v1/index")
async def index_documents(request: IndexRequest) -> dict:
    """Index documents."""
    
    # Would add to vector store
    return {
        "status": "indexed",
        "count": len(request.documents)
    }

@app.get("/v1/stats")
async def get_stats() -> dict:
    """Get system stats."""
    
    return {
        "documents_indexed": 0,
        "queries_processed": 0
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Advanced RAG is about building retrieval pipelines that match the sophistication of your use case. Start with query rewriting: a simple LLM rewrite can dramatically improve retrieval quality by making queries more specific and adding relevant terms. Implement hybrid retrieval combining dense and sparse methods; they capture different types of relevance and together outperform either alone. Add cross-encoder reranking as your precision layer; bi-encoders are fast but cross-encoders are accurate, so retrieve broadly then rerank precisely.

Use contextual compression when context windows are limited; extracting relevant sentences lets you fit more useful information. Consider Self-RAG when you need adaptive retrieval; not every query needs external knowledge, and not every retrieved document is relevant. Implement Corrective RAG when retrieval quality varies; having fallback strategies and query transformation handles the cases where initial retrieval fails.

The key insight is that RAG is not a single technique but a pipeline of techniques, each addressing a different failure mode. Profile your system to find where quality drops, then add the appropriate technique to address that specific issue. Production RAG systems often combine all these patterns, creating robust pipelines that handle the full diversity of real-world queries.
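
As a closing sketch, here is one way the pieces in this guide might compose into a single pipeline. Every component is a class defined above; the llm_client, embedding model, and stores remain the assumed placeholder interfaces, and the ordering and parameters are illustrative rather than prescriptive:

from typing import Any

async def rag_pipeline(question: str, llm_client: Any, dense: DenseRetriever, sparse: SparseRetriever) -> str:
    # 1. Rewrite the query for better recall.
    rewritten = await LLMQueryRewriter(llm_client).rewrite(question)

    # 2. Retrieve broadly with hybrid (dense + sparse) search.
    retriever = HybridRetriever(dense, sparse, alpha=0.5)
    candidates = await retriever.retrieve(rewritten.rewritten, k=20)

    # 3. Rerank precisely with a cross-encoder and keep the top few.
    top_docs = await CrossEncoderReranker().rerank(question, candidates, k=5)

    # 4. Compress each document to its query-relevant sentences.
    compressed = await LLMExtractor(llm_client).compress(question, top_docs)

    # 5. Generate a grounded answer from the compressed context.
    context = "\n\n".join(doc.content for doc in compressed)
    prompt = f"""Answer the question using only the context below.

Context:
{context}

Question: {question}

Answer:"""
    response = await llm_client.complete(prompt)
    return response.content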

