
Retrieval Augmented Generation Patterns: Building RAG Systems That Actually Work

Introduction

Retrieval Augmented Generation (RAG) grounds LLM responses in your actual data, reducing hallucinations and enabling knowledge that wasn’t in the training set. But naive RAG—embed documents, retrieve top-k, stuff into prompt—often disappoints. Retrieval misses relevant documents, context windows overflow, and the model ignores important information buried in long contexts. This guide covers advanced RAG patterns that actually work: query transformation for better retrieval, hybrid search combining dense and sparse methods, reranking to surface the best results, context compression to fit more information, and iterative retrieval for complex questions.

Figure: RAG pipeline (document retrieval, reranking, context augmentation)

Query Transformation

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class TransformedQuery:
    """A transformed query for retrieval."""
    
    original: str
    transformed: str
    sub_queries: Optional[list[str]] = None
    hypothetical_answer: Optional[str] = None

class QueryTransformer:
    """Transform queries for better retrieval."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def expand_query(self, query: str) -> TransformedQuery:
        """Expand query with related terms and concepts."""
        
        prompt = f"""Expand this search query by adding related terms, synonyms, and concepts that would help find relevant documents.

Original query: {query}

Provide an expanded version that includes:
- Synonyms for key terms
- Related concepts
- Alternative phrasings

Expanded query:"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        return TransformedQuery(
            original=query,
            transformed=response.choices[0].message.content.strip()
        )
    
    async def decompose_query(self, query: str) -> TransformedQuery:
        """Decompose complex query into sub-queries."""
        
        prompt = f"""Break down this complex question into simpler sub-questions that can be answered independently.

Question: {query}

List 2-4 sub-questions that together would answer the main question.
Format as a numbered list."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        # Parse sub-queries
        import re
        text = response.choices[0].message.content
        sub_queries = re.findall(r'\d+\.\s*(.+?)(?=\d+\.|$)', text, re.DOTALL)
        sub_queries = [q.strip() for q in sub_queries if q.strip()]
        
        return TransformedQuery(
            original=query,
            transformed=query,
            sub_queries=sub_queries
        )
    
    async def generate_hypothetical_answer(self, query: str) -> TransformedQuery:
        """Generate hypothetical answer for HyDE retrieval."""
        
        prompt = f"""Generate a hypothetical answer to this question. The answer should be detailed and contain the kind of information that would be in a document answering this question.

Question: {query}

Hypothetical answer:"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        
        return TransformedQuery(
            original=query,
            transformed=query,
            hypothetical_answer=response.choices[0].message.content.strip()
        )
    
    async def rewrite_for_retrieval(self, query: str) -> TransformedQuery:
        """Rewrite conversational query for retrieval."""
        
        prompt = f"""Rewrite this conversational query as a clear, specific search query optimized for document retrieval.

Conversational query: {query}

Search-optimized query:"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        return TransformedQuery(
            original=query,
            transformed=response.choices[0].message.content.strip()
        )

class MultiQueryRetriever:
    """Retrieve using multiple query variations."""
    
    def __init__(
        self,
        transformer: QueryTransformer,
        retriever: Any,
        num_variations: int = 3
    ):
        self.transformer = transformer
        self.retriever = retriever
        self.num_variations = num_variations
    
    async def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
        """Retrieve using multiple query variations."""
        
        # Generate query variations
        variations = [query]
        
        # Add expanded query
        expanded = await self.transformer.expand_query(query)
        variations.append(expanded.transformed)
        
        # Add hypothetical answer for HyDE
        hyde = await self.transformer.generate_hypothetical_answer(query)
        if hyde.hypothetical_answer:
            variations.append(hyde.hypothetical_answer)
        
        # Retrieve for each variation
        all_results = []
        seen_ids = set()
        
        for variation in variations[:self.num_variations]:
            results = await self.retriever.search(variation, top_k=top_k)
            
            for result in results:
                if result["id"] not in seen_ids:
                    all_results.append(result)
                    seen_ids.add(result["id"])
        
        return all_results[:top_k * 2]  # Return more for reranking
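
A minimal usage sketch, assuming an AsyncOpenAI client (OPENAI_API_KEY set); KeywordRetriever is a hypothetical in-memory stand-in for a real vector store, included only so the pipeline runs end to end:

import asyncio
from openai import AsyncOpenAI

class KeywordRetriever:
    """Toy in-memory retriever; any object with an async search(query, top_k) method works."""
    
    def __init__(self, docs: list[dict]):
        self.docs = docs
    
    async def search(self, query: str, top_k: int = 10) -> list[dict]:
        # Score documents by naive keyword overlap with the query
        terms = set(query.lower().split())
        scored = [(len(terms & set(d["content"].lower().split())), d) for d in self.docs]
        scored.sort(key=lambda s: s[0], reverse=True)
        return [d for _, d in scored[:top_k]]

async def main():
    client = AsyncOpenAI()
    docs = [
        {"id": "1", "content": "RAG grounds answers in retrieved documents."},
        {"id": "2", "content": "BM25 is a sparse lexical ranking function."},
    ]
    retriever = MultiQueryRetriever(QueryTransformer(client), KeywordRetriever(docs))
    for result in await retriever.retrieve("Why does RAG reduce hallucinations?", top_k=5):
        print(result["id"], result["content"])

asyncio.run(main())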

Hybrid Search

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class SearchResult:
    """A search result with scores."""
    
    id: str
    content: str
    metadata: dict
    dense_score: float = 0.0
    sparse_score: float = 0.0
    combined_score: float = 0.0

class HybridSearcher:
    """Combine dense and sparse search."""
    
    def __init__(
        self,
        embedding_client: Any,
        vector_store: Any,
        sparse_index: Any = None,
        alpha: float = 0.5
    ):
        self.embedding_client = embedding_client
        self.vector_store = vector_store
        self.sparse_index = sparse_index
        self.alpha = alpha  # Weight for dense vs sparse
    
    async def search(
        self,
        query: str,
        top_k: int = 10,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Perform hybrid search."""
        
        # Dense search
        dense_results = await self._dense_search(query, top_k * 2, filter)
        
        # Sparse search (BM25)
        sparse_results = await self._sparse_search(query, top_k * 2, filter)
        
        # Combine results
        combined = self._combine_results(dense_results, sparse_results)
        
        return combined[:top_k]
    
    async def _dense_search(
        self,
        query: str,
        top_k: int,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Perform dense vector search."""
        
        # Get query embedding
        response = await self.embedding_client.embeddings.create(
            model="text-embedding-3-small",
            input=query
        )
        query_embedding = response.data[0].embedding
        
        # Search vector store
        results = await self.vector_store.search(
            vector=query_embedding,
            top_k=top_k,
            filter=filter
        )
        
        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                metadata=r.get("metadata", {}),
                dense_score=r["score"]
            )
            for r in results
        ]
    
    async def _sparse_search(
        self,
        query: str,
        top_k: int,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Perform sparse BM25 search."""
        
        if not self.sparse_index:
            return []
        
        results = self.sparse_index.search(query, top_k=top_k)
        
        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                metadata=r.get("metadata", {}),
                sparse_score=r["score"]
            )
            for r in results
        ]
    
    def _combine_results(
        self,
        dense_results: list[SearchResult],
        sparse_results: list[SearchResult]
    ) -> list[SearchResult]:
        """Combine and rerank results using RRF."""
        
        # Reciprocal Rank Fusion
        k = 60  # RRF constant
        
        scores = {}
        results_map = {}
        
        # Score dense results
        for rank, result in enumerate(dense_results):
            rrf_score = 1 / (k + rank + 1)
            scores[result.id] = scores.get(result.id, 0) + self.alpha * rrf_score
            results_map[result.id] = result
        
        # Score sparse results
        for rank, result in enumerate(sparse_results):
            rrf_score = 1 / (k + rank + 1)
            scores[result.id] = scores.get(result.id, 0) + (1 - self.alpha) * rrf_score
            if result.id not in results_map:
                results_map[result.id] = result
        
        # Sort by combined score
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        
        combined = []
        for doc_id in sorted_ids:
            result = results_map[doc_id]
            result.combined_score = scores[doc_id]
            combined.append(result)
        
        return combined

class BM25Index:
    """Simple BM25 index for sparse search."""
    
    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.documents = []
        self.doc_lengths = []
        self.avg_doc_length = 0
        self.term_frequencies = {}
        self.doc_frequencies = {}
        self.idf = {}
    
    def add_documents(self, documents: list[dict]) -> None:
        """Add documents to the index."""
        
        import re
        
        for doc in documents:
            self.documents.append(doc)
            
            # Tokenize
            tokens = re.findall(r'\w+', doc["content"].lower())
            self.doc_lengths.append(len(tokens))
            
            # Count term frequencies
            tf = {}
            for token in tokens:
                tf[token] = tf.get(token, 0) + 1
            
            self.term_frequencies[doc["id"]] = tf
            
            # Update document frequencies
            for token in set(tokens):
                self.doc_frequencies[token] = self.doc_frequencies.get(token, 0) + 1
        
        # Calculate average document length
        self.avg_doc_length = sum(self.doc_lengths) / len(self.doc_lengths) if self.doc_lengths else 0
        
        # Calculate IDF
        import math
        n = len(self.documents)
        for term, df in self.doc_frequencies.items():
            self.idf[term] = math.log((n - df + 0.5) / (df + 0.5) + 1)
    
    def search(self, query: str, top_k: int = 10) -> list[dict]:
        """Search the index."""
        
        import re
        
        query_tokens = re.findall(r'\w+', query.lower())
        
        scores = []
        for i, doc in enumerate(self.documents):
            score = 0
            tf = self.term_frequencies[doc["id"]]
            doc_length = self.doc_lengths[i]
            
            for token in query_tokens:
                if token in tf:
                    freq = tf[token]
                    idf = self.idf.get(token, 0)
                    
                    # BM25 formula
                    numerator = freq * (self.k1 + 1)
                    denominator = freq + self.k1 * (1 - self.b + self.b * doc_length / self.avg_doc_length)
                    score += idf * numerator / denominator
            
            scores.append((doc, score))
        
        # Sort by score
        scores.sort(key=lambda x: x[1], reverse=True)
        
        return [
            {"id": doc["id"], "content": doc["content"], "metadata": doc.get("metadata", {}), "score": score}
            for doc, score in scores[:top_k]
        ]
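
Since BM25Index has no external dependencies, it can be exercised on its own before wiring it into HybridSearcher as the sparse_index; the documents below are purely illustrative:

index = BM25Index()
index.add_documents([
    {"id": "a", "content": "Dense retrieval uses embedding vectors for semantic matching."},
    {"id": "b", "content": "BM25 scores documents by term frequency and inverse document frequency."},
    {"id": "c", "content": "Reranking reorders candidates with a cross-encoder."},
])

for hit in index.search("term frequency scoring", top_k=2):
    print(hit["id"], round(hit["score"], 3))

# Sparse side of hybrid search (embedding_client and vector_store are assumed to exist):
# searcher = HybridSearcher(embedding_client, vector_store, sparse_index=index, alpha=0.5)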

Reranking

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class RerankResult:
    """A reranked result."""
    
    id: str
    content: str
    original_rank: int
    new_rank: int
    relevance_score: float

class CrossEncoderReranker:
    """Rerank using cross-encoder model."""
    
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)
    
    def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: Optional[int] = None
    ) -> list[RerankResult]:
        """Rerank results using cross-encoder."""
        
        # Create query-document pairs
        pairs = [(query, r.content) for r in results]
        
        # Score pairs
        scores = self.model.predict(pairs)
        
        # Create reranked results
        reranked = []
        for i, (result, score) in enumerate(zip(results, scores)):
            reranked.append(RerankResult(
                id=result.id,
                content=result.content,
                original_rank=i,
                new_rank=0,  # Will be set after sorting
                relevance_score=float(score)
            ))
        
        # Sort by relevance score
        reranked.sort(key=lambda x: x.relevance_score, reverse=True)
        
        # Update ranks
        for i, result in enumerate(reranked):
            result.new_rank = i
        
        if top_k:
            reranked = reranked[:top_k]
        
        return reranked

class LLMReranker:
    """Rerank using LLM as judge."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = 5
    ) -> list[RerankResult]:
        """Rerank using LLM scoring."""
        
        # Score each result
        scored_results = []
        
        for i, result in enumerate(results):
            score = await self._score_relevance(query, result.content)
            scored_results.append(RerankResult(
                id=result.id,
                content=result.content,
                original_rank=i,
                new_rank=0,
                relevance_score=score
            ))
        
        # Sort by score
        scored_results.sort(key=lambda x: x.relevance_score, reverse=True)
        
        # Update ranks
        for i, result in enumerate(scored_results):
            result.new_rank = i
        
        return scored_results[:top_k]
    
    async def _score_relevance(self, query: str, document: str) -> float:
        """Score document relevance to query."""
        
        prompt = f"""Rate how relevant this document is to the query on a scale of 0-10.

Query: {query}

Document: {document[:1000]}

Respond with only a number from 0-10:"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=10
        )
        
        try:
            score = float(response.choices[0].message.content.strip())
            return min(max(score, 0), 10) / 10
        except ValueError:
            return 0.5

class CohereReranker:
    """Rerank using Cohere rerank API."""
    
    def __init__(self, client: Any, model: str = "rerank-english-v3.0"):
        self.client = client
        self.model = model
    
    async def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = 10
    ) -> list[RerankResult]:
        """Rerank using Cohere."""
        
        documents = [r.content for r in results]
        
        response = await self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_k
        )
        
        reranked = []
        for i, item in enumerate(response.results):
            original_idx = item.index
            reranked.append(RerankResult(
                id=results[original_idx].id,
                content=results[original_idx].content,
                original_rank=original_idx,
                new_rank=i,
                relevance_score=item.relevance_score
            ))
        
        return reranked
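
A short sketch of the cross-encoder reranker in isolation; it assumes the sentence-transformers package is installed (the first call downloads the model) and reuses SearchResult from the hybrid search section:

results = [
    SearchResult(id="a", content="BM25 is a bag-of-words ranking function.", metadata={}),
    SearchResult(id="b", content="Cross-encoders jointly encode the query and document for scoring.", metadata={}),
]

reranker = CrossEncoderReranker()
for r in reranker.rerank("how do cross-encoders score relevance?", results, top_k=2):
    print(r.new_rank, r.id, round(r.relevance_score, 3))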

Context Compression

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class CompressedContext:
    """Compressed context for LLM."""
    
    original_length: int
    compressed_length: int
    content: str
    sources: list[str]

class ContextCompressor:
    """Compress retrieved context to fit token limits."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def compress(
        self,
        query: str,
        documents: list[str],
        max_tokens: int = 2000
    ) -> CompressedContext:
        """Compress documents to fit token limit."""
        
        original_text = "\n\n".join(documents)
        original_length = len(original_text)
        
        # If it already fits, return as-is (rough heuristic: ~4 characters per token)
        estimated_tokens = len(original_text) // 4
        if estimated_tokens <= max_tokens:
            return CompressedContext(
                original_length=original_length,
                compressed_length=original_length,
                content=original_text,
                sources=[f"doc_{i}" for i in range(len(documents))]
            )
        
        # Extract relevant sentences
        compressed = await self._extract_relevant(query, documents, max_tokens)
        
        return CompressedContext(
            original_length=original_length,
            compressed_length=len(compressed),
            content=compressed,
            sources=[f"doc_{i}" for i in range(len(documents))]
        )
    
    async def _extract_relevant(
        self,
        query: str,
        documents: list[str],
        max_tokens: int
    ) -> str:
        """Extract most relevant sentences."""
        
        prompt = f"""Given this query and documents, extract only the sentences that are directly relevant to answering the query.

Query: {query}

Documents:
{chr(10).join(f"[Doc {i}]: {doc[:500]}" for i, doc in enumerate(documents))}

Extract the most relevant sentences (aim for about {max_tokens // 4} words):"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=max_tokens
        )
        
        return response.choices[0].message.content

class ChunkSelector:
    """Select most relevant chunks from documents."""
    
    def __init__(self, embedding_client: Any):
        self.embedding_client = embedding_client
    
    async def select_chunks(
        self,
        query: str,
        chunks: list[str],
        max_chunks: int = 5
    ) -> list[str]:
        """Select most relevant chunks."""
        
        # Get embeddings
        all_texts = [query] + chunks
        response = await self.embedding_client.embeddings.create(
            model="text-embedding-3-small",
            input=all_texts
        )
        
        embeddings = [d.embedding for d in response.data]
        query_embedding = embeddings[0]
        chunk_embeddings = embeddings[1:]
        
        # Calculate similarities
        import numpy as np
        
        similarities = []
        for i, chunk_emb in enumerate(chunk_embeddings):
            sim = np.dot(query_embedding, chunk_emb) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(chunk_emb)
            )
            similarities.append((i, sim))
        
        # Sort by similarity
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # Select top chunks
        selected_indices = [idx for idx, _ in similarities[:max_chunks]]
        selected_indices.sort()  # Maintain original order
        
        return [chunks[i] for i in selected_indices]

class ContextBuilder:
    """Build optimized context for RAG."""
    
    def __init__(
        self,
        compressor: ContextCompressor,
        chunk_selector: ChunkSelector,
        max_context_tokens: int = 4000
    ):
        self.compressor = compressor
        self.chunk_selector = chunk_selector
        self.max_context_tokens = max_context_tokens
    
    async def build_context(
        self,
        query: str,
        documents: list[dict]
    ) -> str:
        """Build optimized context from documents."""
        
        # Extract all chunks
        all_chunks = []
        for doc in documents:
            chunks = self._chunk_document(doc["content"])
            all_chunks.extend(chunks)
        
        # Select relevant chunks
        selected = await self.chunk_selector.select_chunks(
            query,
            all_chunks,
            max_chunks=10
        )
        
        # Compress if needed
        compressed = await self.compressor.compress(
            query,
            selected,
            self.max_context_tokens
        )
        
        return compressed.content
    
    def _chunk_document(self, text: str, chunk_size: int = 500) -> list[str]:
        """Split document into chunks."""
        
        sentences = text.replace('\n', ' ').split('. ')
        
        chunks = []
        current_chunk = []
        current_length = 0
        
        for sentence in sentences:
            sentence_length = len(sentence)
            
            if current_length + sentence_length > chunk_size and current_chunk:
                chunks.append('. '.join(current_chunk) + '.')
                current_chunk = []
                current_length = 0
            
            current_chunk.append(sentence)
            current_length += sentence_length
        
        if current_chunk:
            chunks.append('. '.join(current_chunk) + '.')
        
        return chunks
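
A minimal wiring sketch, assuming a single AsyncOpenAI client serves as both the chat client for ContextCompressor and the embeddings client for ChunkSelector; the document content is a placeholder:

import asyncio
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    builder = ContextBuilder(
        compressor=ContextCompressor(client),
        chunk_selector=ChunkSelector(client),
        max_context_tokens=2000,
    )
    docs = [{"id": "a", "content": "Hybrid search combines dense and sparse retrieval. "
                                   "Dense search matches meaning; sparse search matches exact terms."}]
    context = await builder.build_context("What is hybrid search?", docs)
    print(context)

asyncio.run(main())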

Production RAG Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Module-level components; initialize these at startup with real clients
# (left as None here for brevity)
query_transformer = None
hybrid_searcher = None
reranker = None
context_builder = None
llm_client = None

class RAGRequest(BaseModel):
    query: str
    top_k: int = 5
    use_reranking: bool = True
    use_compression: bool = True

class SearchRequest(BaseModel):
    query: str
    top_k: int = 10
    search_type: str = "hybrid"

@app.post("/v1/rag")
async def rag_query(request: RAGRequest):
    """Full RAG pipeline."""
    
    # Transform query
    transformed = await query_transformer.rewrite_for_retrieval(request.query)
    
    # Hybrid search
    results = await hybrid_searcher.search(
        transformed.transformed,
        top_k=request.top_k * 2
    )
    
    # Rerank if enabled
    if request.use_reranking and results:
        reranked = await reranker.rerank(
            request.query,
            results,
            top_k=request.top_k
        )
        documents = [{"content": r.content, "id": r.id} for r in reranked]
    else:
        documents = [{"content": r.content, "id": r.id} for r in results[:request.top_k]]
    
    # Build context
    if request.use_compression:
        context = await context_builder.build_context(request.query, documents)
    else:
        context = "\n\n".join([d["content"] for d in documents])
    
    # Generate response
    prompt = f"""Answer the question based on the following context.

Context:
{context}

Question: {request.query}

Answer:"""
    
    response = await llm_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0
    )
    
    return {
        "answer": response.choices[0].message.content,
        "sources": [d["id"] for d in documents],
        "query_transformed": transformed.transformed
    }

@app.post("/v1/search")
async def search(request: SearchRequest):
    """Search without generation."""
    
    if request.search_type == "hybrid":
        results = await hybrid_searcher.search(request.query, request.top_k)
    else:
        # Dense only
        results = await hybrid_searcher._dense_search(request.query, request.top_k, None)
    
    return {
        "results": [
            {
                "id": r.id,
                "content": r.content[:500],
                "score": r.combined_score or r.dense_score
            }
            for r in results
        ]
    }

@app.post("/v1/rerank")
async def rerank_results(query: str, documents: list[str], top_k: int = 5):
    """Rerank documents."""
    
    results = [
        SearchResult(id=f"doc_{i}", content=doc, metadata={})
        for i, doc in enumerate(documents)
    ]
    
    reranked = await reranker.rerank(query, results, top_k)
    
    return {
        "reranked": [
            {
                "id": r.id,
                "content": r.content[:500],
                "original_rank": r.original_rank,
                "new_rank": r.new_rank,
                "score": r.relevance_score
            }
            for r in reranked
        ]
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Effective RAG requires attention at every stage of the pipeline. Query transformation improves retrieval by expanding queries, decomposing complex questions, and generating hypothetical answers for better embedding matches. Hybrid search combines the precision of keyword matching with the semantic understanding of dense retrieval—neither alone is sufficient for production systems. Reranking with cross-encoders or LLMs surfaces the most relevant results from a larger candidate set, dramatically improving the quality of context provided to the LLM. Context compression ensures you fit the most important information within token limits, using techniques like relevant sentence extraction and chunk selection. For production systems, implement all these stages with appropriate fallbacks, monitor retrieval quality with metrics like MRR and recall, and continuously improve based on user feedback. The goal is grounded, accurate responses that cite their sources—RAG done right makes LLMs genuinely useful for knowledge-intensive tasks.
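
As a companion to the monitoring advice above, here is a minimal sketch for computing MRR and recall@k over a labeled evaluation set, where each entry pairs the ranked document ids your retriever returned with the set of ids judged relevant:

def retrieval_metrics(runs: list[tuple[list[str], set[str]]], k: int = 10) -> dict:
    """runs: (ranked_doc_ids, relevant_doc_ids) pairs, one per query."""
    rr_total, recall_total = 0.0, 0.0
    for ranked, relevant in runs:
        top = ranked[:k]
        # Reciprocal rank of the first relevant hit (0 if none in the top k)
        rr_total += next((1 / (i + 1) for i, d in enumerate(top) if d in relevant), 0.0)
        recall_total += len(set(top) & relevant) / len(relevant) if relevant else 0.0
    n = len(runs)
    return {"mrr": rr_total / n, f"recall@{k}": recall_total / n}

# Example with two labeled queries
print(retrieval_metrics([
    (["d3", "d1", "d7"], {"d1", "d2"}),
    (["d2", "d9"], {"d2"}),
], k=3))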