Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

Advanced Retrieval Strategies for RAG: From Query Transformation to Multi-Stage Pipelines

Introduction: Retrieval is the foundation of RAG systems. Poor retrieval means irrelevant context, which leads to hallucinations and wrong answers regardless of how capable your LLM is. Yet many RAG implementations use naive approaches—single-stage vector search with default settings. This guide covers advanced retrieval strategies: query transformation techniques, hybrid search combining dense and sparse methods, multi-stage retrieval pipelines, and reranking approaches that dramatically improve the relevance of retrieved documents.

Retrieval Strategies
Retrieval Pipeline: Query Embedding, Multi-Stage Search, Reranking

Query Transformation

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class TransformedQuery:
    """A transformed query with metadata."""
    
    original: str
    transformed: str
    method: str
    metadata: dict = None

class QueryExpander:
    """Expand queries for better retrieval."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def expand_with_synonyms(self, query: str) -> TransformedQuery:
        """Expand query with synonyms and related terms."""
        
        prompt = f"""Expand this search query with synonyms and related terms.
Keep the original meaning but add variations that might match relevant documents.

Query: {query}

Return the expanded query (original + expansions) on a single line."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=200
        )
        
        expanded = response.choices[0].message.content.strip()
        
        return TransformedQuery(
            original=query,
            transformed=expanded,
            method="synonym_expansion"
        )
    
    async def generate_subqueries(self, query: str) -> list[TransformedQuery]:
        """Break complex query into subqueries."""
        
        prompt = f"""Break this complex query into simpler subqueries.
Each subquery should capture one aspect of the original question.

Query: {query}

Return each subquery on a separate line (2-4 subqueries)."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=300
        )
        
        subqueries = []
        for line in response.choices[0].message.content.strip().split('\n'):
            line = line.strip()
            if line and not line.startswith('-'):
                line = line.lstrip('0123456789.-) ')
            if line:
                subqueries.append(TransformedQuery(
                    original=query,
                    transformed=line,
                    method="subquery"
                ))
        
        return subqueries
    
    async def hypothetical_document(self, query: str) -> TransformedQuery:
        """Generate hypothetical document that would answer the query (HyDE)."""
        
        prompt = f"""Write a short paragraph that would be the perfect answer to this question.
Write as if you're the document that contains the answer.

Question: {query}

Answer paragraph:"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=300
        )
        
        hypothetical = response.choices[0].message.content.strip()
        
        return TransformedQuery(
            original=query,
            transformed=hypothetical,
            method="hyde"
        )

class QueryRewriter:
    """Rewrite queries for better matching."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def rewrite_for_retrieval(self, query: str) -> TransformedQuery:
        """Rewrite query to be more retrieval-friendly."""
        
        prompt = f"""Rewrite this query to be better for document retrieval.
- Remove conversational elements
- Focus on key concepts and entities
- Use terms likely to appear in documents

Original: {query}

Rewritten query:"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=100
        )
        
        rewritten = response.choices[0].message.content.strip()
        
        return TransformedQuery(
            original=query,
            transformed=rewritten,
            method="retrieval_rewrite"
        )
    
    async def step_back(self, query: str) -> TransformedQuery:
        """Generate a more general 'step-back' query."""
        
        prompt = f"""Generate a more general question that would help answer this specific question.
The general question should retrieve background information useful for the specific question.

Specific question: {query}

General question:"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=100
        )
        
        general = response.choices[0].message.content.strip()
        
        return TransformedQuery(
            original=query,
            transformed=general,
            method="step_back"
        )

Hybrid Search

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np
from collections import Counter
import math

@dataclass
class SearchResult:
    """A search result with score."""
    
    doc_id: str
    content: str
    score: float
    metadata: dict = None

class DenseRetriever:
    """Dense vector retrieval."""
    
    def __init__(
        self,
        embedding_client: Any,
        index: Any,  # FAISS, Pinecone, etc.
        embedding_model: str = "text-embedding-3-small"
    ):
        self.embedding_client = embedding_client
        self.index = index
        self.embedding_model = embedding_model
    
    async def search(
        self,
        query: str,
        top_k: int = 10
    ) -> list[SearchResult]:
        """Search using dense embeddings."""
        
        # Get query embedding
        response = await self.embedding_client.embeddings.create(
            model=self.embedding_model,
            input=query
        )
        query_embedding = response.data[0].embedding
        
        # Search index
        results = self.index.search(query_embedding, top_k)
        
        return [
            SearchResult(
                doc_id=r["id"],
                content=r["content"],
                score=r["score"],
                metadata=r.get("metadata", {})
            )
            for r in results
        ]

class SparseRetriever:
    """Sparse keyword retrieval using BM25."""
    
    def __init__(self, documents: list[dict]):
        self.documents = documents
        self.doc_freqs = Counter()
        self.doc_lengths = []
        self.avg_doc_length = 0
        self.k1 = 1.5
        self.b = 0.75
        
        self._build_index()
    
    def _build_index(self):
        """Build BM25 index."""
        
        total_length = 0
        
        for doc in self.documents:
            tokens = self._tokenize(doc["content"])
            self.doc_lengths.append(len(tokens))
            total_length += len(tokens)
            
            unique_tokens = set(tokens)
            for token in unique_tokens:
                self.doc_freqs[token] += 1
        
        self.avg_doc_length = total_length / len(self.documents) if self.documents else 0
    
    def _tokenize(self, text: str) -> list[str]:
        """Simple tokenization."""
        return text.lower().split()
    
    def _bm25_score(
        self,
        query_tokens: list[str],
        doc_tokens: list[str],
        doc_length: int
    ) -> float:
        """Calculate BM25 score."""
        
        score = 0.0
        doc_token_counts = Counter(doc_tokens)
        n_docs = len(self.documents)
        
        for token in query_tokens:
            if token not in doc_token_counts:
                continue
            
            tf = doc_token_counts[token]
            df = self.doc_freqs.get(token, 0)
            
            idf = math.log((n_docs - df + 0.5) / (df + 0.5) + 1)
            
            tf_component = (tf * (self.k1 + 1)) / (
                tf + self.k1 * (1 - self.b + self.b * doc_length / self.avg_doc_length)
            )
            
            score += idf * tf_component
        
        return score
    
    def search(self, query: str, top_k: int = 10) -> list[SearchResult]:
        """Search using BM25."""
        
        query_tokens = self._tokenize(query)
        
        scores = []
        for i, doc in enumerate(self.documents):
            doc_tokens = self._tokenize(doc["content"])
            score = self._bm25_score(query_tokens, doc_tokens, self.doc_lengths[i])
            scores.append((i, score))
        
        scores.sort(key=lambda x: x[1], reverse=True)
        
        results = []
        for i, score in scores[:top_k]:
            doc = self.documents[i]
            results.append(SearchResult(
                doc_id=doc.get("id", str(i)),
                content=doc["content"],
                score=score,
                metadata=doc.get("metadata", {})
            ))
        
        return results

class HybridRetriever:
    """Combine dense and sparse retrieval."""
    
    def __init__(
        self,
        dense_retriever: DenseRetriever,
        sparse_retriever: SparseRetriever,
        dense_weight: float = 0.5
    ):
        self.dense_retriever = dense_retriever
        self.sparse_retriever = sparse_retriever
        self.dense_weight = dense_weight
        self.sparse_weight = 1 - dense_weight
    
    async def search(
        self,
        query: str,
        top_k: int = 10
    ) -> list[SearchResult]:
        """Hybrid search with score fusion."""
        
        # Get results from both retrievers
        dense_results = await self.dense_retriever.search(query, top_k * 2)
        sparse_results = self.sparse_retriever.search(query, top_k * 2)
        
        # Normalize scores
        dense_results = self._normalize_scores(dense_results)
        sparse_results = self._normalize_scores(sparse_results)
        
        # Combine using weighted sum
        combined = {}
        
        for result in dense_results:
            combined[result.doc_id] = {
                "content": result.content,
                "metadata": result.metadata,
                "dense_score": result.score * self.dense_weight,
                "sparse_score": 0
            }
        
        for result in sparse_results:
            if result.doc_id in combined:
                combined[result.doc_id]["sparse_score"] = result.score * self.sparse_weight
            else:
                combined[result.doc_id] = {
                    "content": result.content,
                    "metadata": result.metadata,
                    "dense_score": 0,
                    "sparse_score": result.score * self.sparse_weight
                }
        
        # Calculate final scores
        final_results = []
        for doc_id, data in combined.items():
            final_score = data["dense_score"] + data["sparse_score"]
            final_results.append(SearchResult(
                doc_id=doc_id,
                content=data["content"],
                score=final_score,
                metadata=data["metadata"]
            ))
        
        # Sort and return top_k
        final_results.sort(key=lambda x: x.score, reverse=True)
        return final_results[:top_k]
    
    def _normalize_scores(self, results: list[SearchResult]) -> list[SearchResult]:
        """Normalize scores to 0-1 range."""
        
        if not results:
            return results
        
        scores = [r.score for r in results]
        min_score = min(scores)
        max_score = max(scores)
        
        if max_score == min_score:
            return results
        
        for result in results:
            result.score = (result.score - min_score) / (max_score - min_score)
        
        return results

Multi-Stage Retrieval

from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class RetrievalStage:
    """A stage in the retrieval pipeline."""
    
    name: str
    retriever: Any
    top_k: int
    filter_fn: Callable[[SearchResult], bool] = None

class MultiStageRetriever:
    """Multi-stage retrieval pipeline."""
    
    def __init__(self):
        self.stages: list[RetrievalStage] = []
    
    def add_stage(
        self,
        name: str,
        retriever: Any,
        top_k: int,
        filter_fn: Callable[[SearchResult], bool] = None
    ):
        """Add a retrieval stage."""
        
        self.stages.append(RetrievalStage(
            name=name,
            retriever=retriever,
            top_k=top_k,
            filter_fn=filter_fn
        ))
    
    async def search(self, query: str) -> list[SearchResult]:
        """Execute multi-stage retrieval."""
        
        results = None
        
        for stage in self.stages:
            if results is None:
                # First stage: retrieve from full corpus
                if hasattr(stage.retriever, 'search'):
                    if asyncio.iscoroutinefunction(stage.retriever.search):
                        results = await stage.retriever.search(query, stage.top_k)
                    else:
                        results = stage.retriever.search(query, stage.top_k)
            else:
                # Subsequent stages: re-rank or filter
                if hasattr(stage.retriever, 'rerank'):
                    results = await stage.retriever.rerank(query, results, stage.top_k)
                else:
                    # Filter and re-score
                    results = results[:stage.top_k]
            
            # Apply filter if provided
            if stage.filter_fn:
                results = [r for r in results if stage.filter_fn(r)]
        
        return results

class ParentDocumentRetriever:
    """Retrieve child chunks, return parent documents."""
    
    def __init__(
        self,
        chunk_retriever: Any,
        parent_store: dict[str, str]  # chunk_id -> parent_id
    ):
        self.chunk_retriever = chunk_retriever
        self.parent_store = parent_store
        self.parent_documents: dict[str, str] = {}
    
    async def search(
        self,
        query: str,
        top_k: int = 5
    ) -> list[SearchResult]:
        """Search chunks, return parent documents."""
        
        # Retrieve chunks
        chunk_results = await self.chunk_retriever.search(query, top_k * 3)
        
        # Map to parents and deduplicate
        parent_scores: dict[str, float] = {}
        
        for result in chunk_results:
            parent_id = self.parent_store.get(result.doc_id)
            if parent_id:
                if parent_id not in parent_scores:
                    parent_scores[parent_id] = result.score
                else:
                    # Take max score among chunks
                    parent_scores[parent_id] = max(parent_scores[parent_id], result.score)
        
        # Build parent results
        results = []
        for parent_id, score in sorted(parent_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]:
            if parent_id in self.parent_documents:
                results.append(SearchResult(
                    doc_id=parent_id,
                    content=self.parent_documents[parent_id],
                    score=score
                ))
        
        return results

class ContextualRetriever:
    """Add surrounding context to retrieved chunks."""
    
    def __init__(
        self,
        base_retriever: Any,
        document_store: dict[str, list[str]],  # doc_id -> list of chunks
        context_window: int = 1  # chunks before and after
    ):
        self.base_retriever = base_retriever
        self.document_store = document_store
        self.context_window = context_window
    
    async def search(
        self,
        query: str,
        top_k: int = 5
    ) -> list[SearchResult]:
        """Search and expand with context."""
        
        results = await self.base_retriever.search(query, top_k)
        
        expanded_results = []
        for result in results:
            expanded_content = self._expand_context(result)
            expanded_results.append(SearchResult(
                doc_id=result.doc_id,
                content=expanded_content,
                score=result.score,
                metadata=result.metadata
            ))
        
        return expanded_results
    
    def _expand_context(self, result: SearchResult) -> str:
        """Expand chunk with surrounding context."""
        
        # Parse doc_id to get document and chunk index
        # Assuming format: "doc_123_chunk_5"
        parts = result.doc_id.split("_chunk_")
        if len(parts) != 2:
            return result.content
        
        doc_id = parts[0]
        chunk_idx = int(parts[1])
        
        if doc_id not in self.document_store:
            return result.content
        
        chunks = self.document_store[doc_id]
        
        # Get context window
        start = max(0, chunk_idx - self.context_window)
        end = min(len(chunks), chunk_idx + self.context_window + 1)
        
        return "\n\n".join(chunks[start:end])

Reranking

from dataclasses import dataclass
from typing import Any

class CrossEncoderReranker:
    """Rerank using cross-encoder model."""
    
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)
    
    def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = 5
    ) -> list[SearchResult]:
        """Rerank results using cross-encoder."""
        
        # Create query-document pairs
        pairs = [(query, r.content) for r in results]
        
        # Score pairs
        scores = self.model.predict(pairs)
        
        # Update scores and sort
        for i, result in enumerate(results):
            result.score = float(scores[i])
        
        results.sort(key=lambda x: x.score, reverse=True)
        
        return results[:top_k]

class CohereReranker:
    """Rerank using Cohere Rerank API."""
    
    def __init__(self, api_key: str):
        import cohere
        self.client = cohere.Client(api_key)
    
    def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = 5
    ) -> list[SearchResult]:
        """Rerank using Cohere."""
        
        documents = [r.content for r in results]
        
        response = self.client.rerank(
            query=query,
            documents=documents,
            top_n=top_k,
            model="rerank-english-v2.0"
        )
        
        reranked = []
        for item in response.results:
            original = results[item.index]
            reranked.append(SearchResult(
                doc_id=original.doc_id,
                content=original.content,
                score=item.relevance_score,
                metadata=original.metadata
            ))
        
        return reranked

class LLMReranker:
    """Rerank using LLM scoring."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = 5
    ) -> list[SearchResult]:
        """Rerank using LLM relevance scoring."""
        
        scored_results = []
        
        for result in results:
            score = await self._score_relevance(query, result.content)
            scored_results.append(SearchResult(
                doc_id=result.doc_id,
                content=result.content,
                score=score,
                metadata=result.metadata
            ))
        
        scored_results.sort(key=lambda x: x.score, reverse=True)
        
        return scored_results[:top_k]
    
    async def _score_relevance(self, query: str, document: str) -> float:
        """Score document relevance to query."""
        
        prompt = f"""Rate how relevant this document is to the query on a scale of 0-10.
Only respond with a number.

Query: {query}

Document: {document[:1000]}

Relevance score (0-10):"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=5
        )
        
        try:
            score = float(response.choices[0].message.content.strip())
            return score / 10  # Normalize to 0-1
        except ValueError:
            return 0.5

class RRFReranker:
    """Reciprocal Rank Fusion for combining multiple result lists."""
    
    def __init__(self, k: int = 60):
        self.k = k
    
    def fuse(
        self,
        result_lists: list[list[SearchResult]],
        top_k: int = 10
    ) -> list[SearchResult]:
        """Fuse multiple result lists using RRF."""
        
        doc_scores: dict[str, float] = {}
        doc_data: dict[str, SearchResult] = {}
        
        for results in result_lists:
            for rank, result in enumerate(results):
                rrf_score = 1 / (self.k + rank + 1)
                
                if result.doc_id not in doc_scores:
                    doc_scores[result.doc_id] = 0
                    doc_data[result.doc_id] = result
                
                doc_scores[result.doc_id] += rrf_score
        
        # Sort by fused score
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
        
        results = []
        for doc_id, score in sorted_docs[:top_k]:
            original = doc_data[doc_id]
            results.append(SearchResult(
                doc_id=doc_id,
                content=original.content,
                score=score,
                metadata=original.metadata
            ))
        
        return results

Production Retrieval Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
query_expander = None  # Initialize with actual client
hybrid_retriever = None
multi_stage_retriever = None
reranker = None

class SearchRequest(BaseModel):
    query: str
    top_k: int = 10
    use_expansion: bool = False
    use_reranking: bool = True

class HybridSearchRequest(BaseModel):
    query: str
    top_k: int = 10
    dense_weight: float = 0.5

class MultiQueryRequest(BaseModel):
    query: str
    top_k: int = 10
    strategy: str = "subqueries"  # subqueries, hyde, expansion

@app.post("/v1/search")
async def search(request: SearchRequest):
    """Basic search with optional expansion and reranking."""
    
    query = request.query
    
    # Query expansion
    if request.use_expansion and query_expander:
        expanded = await query_expander.expand_with_synonyms(query)
        query = expanded.transformed
    
    # Search
    results = await hybrid_retriever.search(query, request.top_k * 2)
    
    # Reranking
    if request.use_reranking and reranker:
        results = await reranker.rerank(request.query, results, request.top_k)
    else:
        results = results[:request.top_k]
    
    return {
        "query": request.query,
        "results": [
            {
                "doc_id": r.doc_id,
                "content": r.content[:500],
                "score": r.score
            }
            for r in results
        ]
    }

@app.post("/v1/search/hybrid")
async def hybrid_search(request: HybridSearchRequest):
    """Hybrid dense + sparse search."""
    
    # Temporarily adjust weights
    original_weight = hybrid_retriever.dense_weight
    hybrid_retriever.dense_weight = request.dense_weight
    hybrid_retriever.sparse_weight = 1 - request.dense_weight
    
    try:
        results = await hybrid_retriever.search(request.query, request.top_k)
    finally:
        hybrid_retriever.dense_weight = original_weight
        hybrid_retriever.sparse_weight = 1 - original_weight
    
    return {
        "query": request.query,
        "dense_weight": request.dense_weight,
        "results": [
            {
                "doc_id": r.doc_id,
                "content": r.content[:500],
                "score": r.score
            }
            for r in results
        ]
    }

@app.post("/v1/search/multi-query")
async def multi_query_search(request: MultiQueryRequest):
    """Search with multiple query variations."""
    
    queries = [request.query]
    
    if request.strategy == "subqueries":
        subqueries = await query_expander.generate_subqueries(request.query)
        queries.extend([sq.transformed for sq in subqueries])
    
    elif request.strategy == "hyde":
        hyde = await query_expander.hypothetical_document(request.query)
        queries.append(hyde.transformed)
    
    elif request.strategy == "expansion":
        expanded = await query_expander.expand_with_synonyms(request.query)
        queries.append(expanded.transformed)
    
    # Search with each query
    all_results = []
    for q in queries:
        results = await hybrid_retriever.search(q, request.top_k)
        all_results.append(results)
    
    # Fuse results using RRF
    rrf = RRFReranker()
    fused = rrf.fuse(all_results, request.top_k)
    
    return {
        "original_query": request.query,
        "queries_used": queries,
        "results": [
            {
                "doc_id": r.doc_id,
                "content": r.content[:500],
                "score": r.score
            }
            for r in fused
        ]
    }

@app.post("/v1/search/multi-stage")
async def multi_stage_search(request: SearchRequest):
    """Multi-stage retrieval pipeline."""
    
    results = await multi_stage_retriever.search(request.query)
    
    return {
        "query": request.query,
        "results": [
            {
                "doc_id": r.doc_id,
                "content": r.content[:500],
                "score": r.score
            }
            for r in results[:request.top_k]
        ]
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Advanced retrieval strategies dramatically improve RAG system quality. Query transformation—expansion, subqueries, HyDE—helps bridge the vocabulary gap between user queries and document content. Hybrid search combining dense embeddings with sparse BM25 captures both semantic similarity and keyword matches. Multi-stage pipelines let you cast a wide net initially, then progressively filter and refine. Reranking with cross-encoders or LLMs provides a final quality boost by considering query-document pairs jointly. Parent document retrieval and contextual expansion ensure you return enough context for the LLM to generate accurate answers. Reciprocal Rank Fusion elegantly combines results from multiple retrieval strategies. The key is building pipelines that are robust to query variations while maintaining low latency. Start simple, measure retrieval quality, and add complexity only where it demonstrably improves results.