Advanced RAG Patterns: From Naive Retrieval to Production-Grade Systems

Introduction

Retrieval-Augmented Generation (RAG) has become the go-to architecture for building LLM applications that need access to private or current information. By retrieving relevant documents and including them in the prompt, RAG grounds LLM responses in factual content, reducing hallucinations and supplying knowledge that wasn't in the training data. But naive RAG implementations often disappoint: the real skill lies in choosing the right chunking strategy, retrieval method, and generation approach for your use case. This guide covers advanced RAG patterns that separate production systems from prototypes.
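For reference, the naive baseline that the rest of this guide improves on fits in a few lines. This is a hedged sketch: the sample documents are placeholders, and it assumes a sentence-transformers encoder plus the OpenAI client used throughout the examples below.

import numpy as np
from openai import OpenAI
from sentence_transformers import SentenceTransformer

client = OpenAI()
encoder = SentenceTransformer("all-MiniLM-L6-v2")

docs = [
    "RAG retrieves relevant documents and adds them to the prompt.",
    "Vector stores index embeddings for fast similarity search.",
]
doc_embeddings = encoder.encode(docs, normalize_embeddings=True)

def naive_rag(question: str, top_k: int = 2) -> str:
    # Retrieve: cosine similarity between the query and every document
    q = encoder.encode(question, normalize_embeddings=True)
    top = np.argsort(doc_embeddings @ q)[::-1][:top_k]
    context = "\n\n".join(docs[i] for i in top)

    # Generate: stuff the retrieved context into the prompt
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}],
    )
    return response.choices[0].message.content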

RAG Architecture Patterns
Figure: RAG pipeline, from document ingestion to grounded responses.

Document Chunking Strategies

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    TokenTextSplitter
)
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

# 1. Fixed-size chunking (simple but effective)
fixed_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""]
)

# 2. Token-based chunking (respects model limits)
token_splitter = TokenTextSplitter(
    chunk_size=256,
    chunk_overlap=20,
    encoding_name="cl100k_base"  # GPT-4 tokenizer
)

# 3. Markdown-aware chunking (preserves structure)
md_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
    ]
)

# 4. Semantic chunking (groups related content)
semantic_splitter = SemanticChunker(
    OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=95
)

# Custom chunking for code
def chunk_code_file(content: str, language: str) -> list[dict]:
    """Chunk code files by function/class boundaries."""
    import ast
    
    if language == "python":
        tree = ast.parse(content)
        chunks = []
        
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                start_line = node.lineno - 1
                end_line = node.end_lineno
                
                lines = content.split("\n")
                chunk_content = "\n".join(lines[start_line:end_line])
                
                chunks.append({
                    "content": chunk_content,
                    "metadata": {
                        "type": type(node).__name__,
                        "name": node.name,
                        "start_line": start_line,
                        "end_line": end_line
                    }
                })
        
        return chunks
    
    # Fallback to fixed chunking
    return [{"content": c, "metadata": {}} for c in fixed_splitter.split_text(content)]

# Choosing chunk size
def optimal_chunk_size(avg_query_length: int, context_window: int) -> int:
    """Estimate optimal chunk size based on query patterns."""
    # Rule of thumb: chunk should be 2-4x query length
    # Leave room for multiple chunks in context
    max_chunks = 5
    available_tokens = context_window * 0.6  # Reserve 40% for response
    
    return min(
        avg_query_length * 3,
        int(available_tokens / max_chunks)
    )
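A quick way to compare the splitters is to run them over the same small document. The snippet below is a hedged usage sketch; the sample markdown text is a placeholder.

sample = "# Guide\n\n## Setup\nInstall the package and set your API key.\n\n## Usage\nCall the client with a prompt."

# Fixed-size chunks: plain strings, may cut across sections
print(fixed_splitter.split_text(sample))

# Markdown-aware chunks: one Document per section, headers preserved as metadata
for doc in md_splitter.split_text(sample):
    print(doc.metadata, "->", doc.page_content)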

Hybrid Search with Re-ranking

import numpy as np
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder

class HybridRetriever:
    """Combines dense and sparse retrieval with re-ranking."""
    
    def __init__(
        self,
        documents: list[str],
        embeddings: np.ndarray,
        embedding_model,
        reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    ):
        self.documents = documents
        self.embeddings = embeddings
        self.embedding_model = embedding_model
        
        # Initialize BM25 for sparse retrieval
        tokenized = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized)
        
        # Initialize cross-encoder for re-ranking
        self.reranker = CrossEncoder(reranker_model)
    
    def search(
        self,
        query: str,
        top_k: int = 5,
        alpha: float = 0.5,  # Weight for dense vs sparse
        rerank: bool = True,
        rerank_top_k: int = 20
    ) -> list[dict]:
        """Hybrid search with optional re-ranking."""
        
        # Get more candidates for re-ranking
        candidate_k = rerank_top_k if rerank else top_k
        
        # Dense retrieval
        query_embedding = self.embedding_model.encode(query)
        dense_scores = np.dot(self.embeddings, query_embedding)
        dense_scores = (dense_scores - dense_scores.min()) / (dense_scores.max() - dense_scores.min() + 1e-8)
        
        # Sparse retrieval (BM25)
        sparse_scores = np.array(self.bm25.get_scores(query.lower().split()))
        sparse_scores = (sparse_scores - sparse_scores.min()) / (sparse_scores.max() - sparse_scores.min() + 1e-8)
        
        # Combine normalized scores with a weighted sum (a simpler alternative to Reciprocal Rank Fusion)
        hybrid_scores = alpha * dense_scores + (1 - alpha) * sparse_scores
        
        # Get top candidates
        top_indices = np.argsort(hybrid_scores)[::-1][:candidate_k]
        
        if rerank:
            # Re-rank with cross-encoder
            candidates = [(query, self.documents[i]) for i in top_indices]
            rerank_scores = self.reranker.predict(candidates)
            
            # Sort by rerank scores
            reranked = sorted(
                zip(top_indices, rerank_scores),
                key=lambda x: x[1],
                reverse=True
            )[:top_k]
            
            return [
                {
                    "document": self.documents[idx],
                    "score": float(score),
                    "index": int(idx)
                }
                for idx, score in reranked
            ]
        
        return [
            {
                "document": self.documents[idx],
                "score": float(hybrid_scores[idx]),
                "index": int(idx)
            }
            for idx in top_indices[:top_k]
        ]

# Usage
from sentence_transformers import SentenceTransformer

encoder = SentenceTransformer("all-MiniLM-L6-v2")
documents = ["Document 1...", "Document 2...", "Document 3..."]
embeddings = encoder.encode(documents)

retriever = HybridRetriever(documents, embeddings, encoder)
results = retriever.search("What is machine learning?", top_k=3, rerank=True)
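The weighted sum above fuses normalized scores; Reciprocal Rank Fusion (RRF), mentioned in the comment, fuses ranks instead and sidesteps score-scale mismatches entirely. A minimal sketch (k=60 is the commonly used constant):

def reciprocal_rank_fusion(rank_lists: list[list[int]], k: int = 60) -> list[int]:
    """Fuse ranked lists of document indices into a single ranking."""
    scores: dict[int, float] = {}
    for ranking in rank_lists:
        for rank, doc_idx in enumerate(ranking):
            scores[doc_idx] = scores.get(doc_idx, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

# Example: doc 2 leads both rankings, so it wins; docs 0 and 1 split the remaining positions
print(reciprocal_rank_fusion([[2, 0, 1], [2, 1, 0]]))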

Advanced RAG Patterns

from openai import OpenAI

client = OpenAI()

# Pattern 1: Query Expansion
def expand_query(query: str) -> list[str]:
    """Generate multiple query variations for better retrieval."""
    
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{
            "role": "user",
            "content": f"""Generate 3 alternative phrasings of this search query.
Return only the queries, one per line.

Original query: {query}"""
        }],
        temperature=0.7
    )
    
    variations = response.choices[0].message.content.strip().split("\n")
    return [query] + [v.strip() for v in variations if v.strip()]

# Pattern 2: Hypothetical Document Embeddings (HyDE)
def hyde_retrieval(query: str, retriever) -> list[dict]:
    """Generate hypothetical answer, then retrieve similar documents."""
    
    # Generate hypothetical answer
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{
            "role": "user",
            "content": f"Write a detailed paragraph answering: {query}"
        }]
    )
    
    hypothetical_doc = response.choices[0].message.content
    
    # Retrieve using the hypothetical document
    return retriever.search(hypothetical_doc)

# Pattern 3: Self-Query (Metadata Filtering)
def self_query_retrieval(query: str, retriever) -> list[dict]:
    """Extract filters from natural language query."""
    
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{
            "role": "user",
            "content": f"""Extract search parameters from this query.
Return JSON with: {{"search_query": "...", "filters": {{"date_after": null, "category": null, "author": null}}}}

Query: {query}"""
        }],
        response_format={"type": "json_object"}
    )
    
    import json
    params = json.loads(response.choices[0].message.content)
    
    # Apply filters and search
    return retriever.search(
        params["search_query"],
        filters=params["filters"]
    )

# Pattern 4: Contextual Compression
def compress_context(query: str, documents: list[str]) -> list[str]:
    """Extract only relevant portions from retrieved documents."""
    
    compressed = []
    
    for doc in documents:
        response = client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{
                "role": "user",
                "content": f"""Extract only the sentences relevant to the query.
If nothing is relevant, respond with "NOT_RELEVANT".

Query: {query}

Document: {doc}"""
            }],
            max_tokens=500
        )
        
        result = response.choices[0].message.content
        if result != "NOT_RELEVANT":
            compressed.append(result)
    
    return compressed

# Pattern 5: Multi-hop RAG
def multi_hop_rag(query: str, retriever, max_hops: int = 3) -> str:
    """Iteratively retrieve and reason for complex queries."""
    
    context = []
    current_query = query
    
    for hop in range(max_hops):
        # Retrieve for current query
        results = retriever.search(current_query, top_k=3)
        context.extend([r["document"] for r in results])
        
        # Check if we have enough information
        response = client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[
                {"role": "system", "content": "Determine if the context is sufficient to answer the question."},
                {"role": "user", "content": f"""Question: {query}

Context: {' '.join(context[-6:])}

Can you fully answer the question? If yes, provide the answer.
If no, what additional information do you need? Phrase it as a search query."""}
            ]
        )
        
        answer = response.choices[0].message.content
        
        if "additional information" not in answer.lower():
            return answer
        
        # Extract follow-up query
        current_query = answer.split(":")[-1].strip()
    
    # Final answer with all accumulated context
    return generate_answer(query, context)

def generate_answer(question: str, context: list[str]) -> str:
    """Answer the question from the accumulated context chunks."""
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[
            {"role": "system", "content": "Answer the question using only the provided context."},
            {"role": "user", "content": f"Context:\n{' '.join(context)}\n\nQuestion: {question}"}
        ]
    )
    return response.choices[0].message.content
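These patterns compose. The sketch below is hedged (the example question and top_k values are illustrative, and it assumes the HybridRetriever instance built earlier): it expands a query, merges the retrieval results, and compresses the combined context before generation.

question = "How does gradient descent work?"

# Retrieve for every query variation and deduplicate by document index
seen, docs = set(), []
for q in expand_query(question):
    for r in retriever.search(q, top_k=3):
        if r["index"] not in seen:
            seen.add(r["index"])
            docs.append(r)

# Keep only the sentences relevant to the original question, then answer
context = compress_context(question, [d["document"] for d in docs])
answer = generate_answer(question, context)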

Production RAG Pipeline

from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class RAGConfig:
    chunk_size: int = 500
    chunk_overlap: int = 50
    top_k: int = 5
    rerank: bool = True
    use_hyde: bool = False
    max_context_tokens: int = 4000

class ProductionRAG:
    """Production-ready RAG pipeline with caching and monitoring."""
    
    def __init__(self, config: RAGConfig, vector_store, llm_client):
        self.config = config
        self.vector_store = vector_store
        self.llm = llm_client
        self.cache = {}  # Use Redis in production
    
    def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
        # Include filters so filtered and unfiltered queries don't share cache entries
        key = f"{query.lower().strip()}|{filters or {}}"
        return hashlib.md5(key.encode()).hexdigest()
    
    def query(
        self,
        question: str,
        filters: Optional[dict] = None,
        stream: bool = False
    ) -> dict:
        """Execute RAG query with full pipeline."""
        
        # Check cache
        cache_key = self._cache_key(question, filters)
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # Retrieve
        if self.config.use_hyde:
            docs = self._hyde_retrieve(question)
        else:
            docs = self.vector_store.search(
                question,
                top_k=self.config.top_k * 2 if self.config.rerank else self.config.top_k,
                filters=filters
            )
        
        # Rerank
        if self.config.rerank:
            docs = self._rerank(question, docs)[:self.config.top_k]
        
        # Build context
        context = self._build_context(docs)
        
        # Generate
        response = self._generate(question, context, stream)
        
        result = {
            "answer": response,
            "sources": [{"content": d["content"][:200], "metadata": d.get("metadata", {})} for d in docs],
            "query": question
        }
        
        # Cache (skip streamed responses, whose answers are generators)
        if not stream:
            self.cache[cache_key] = result
        
        return result
    
    def _hyde_retrieve(self, question: str) -> list[dict]:
        """Minimal HyDE helper: embed a hypothetical answer instead of the raw question."""
        response = self.llm.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{"role": "user", "content": f"Write a short paragraph answering: {question}"}]
        )
        hypothetical = response.choices[0].message.content
        return self.vector_store.search(hypothetical, top_k=self.config.top_k, filters=None)

    def _rerank(self, question: str, docs: list[dict]) -> list[dict]:
        """Minimal cross-encoder re-ranker; the model is loaded once and reused."""
        if not hasattr(self, "_reranker"):
            from sentence_transformers import CrossEncoder
            self._reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
        scores = self._reranker.predict([(question, d["content"]) for d in docs])
        ranked = sorted(zip(docs, scores), key=lambda pair: pair[1], reverse=True)
        return [doc for doc, _ in ranked]

    def _build_context(self, docs: list[dict]) -> str:
        """Build context string respecting token limits."""
        context_parts = []
        total_tokens = 0
        
        for doc in docs:
            # Rough token estimate
            doc_tokens = len(doc["content"]) // 4
            
            if total_tokens + doc_tokens > self.config.max_context_tokens:
                break
            
            context_parts.append(doc["content"])
            total_tokens += doc_tokens
        
        return "\n\n---\n\n".join(context_parts)
    
    def _generate(self, question: str, context: str, stream: bool) -> str:
        """Generate answer with citation instructions."""
        
        system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Always cite your sources by referencing the relevant context.
If the context doesn't contain enough information, say so clearly.
Do not make up information not present in the context."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"""Context:
{context}

Question: {question}

Provide a comprehensive answer based on the context above."""}
        ]
        
        if stream:
            return self.llm.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=messages,
                stream=True
            )
        
        response = self.llm.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=messages
        )
        
        return response.choices[0].message.content

# Usage
config = RAGConfig(chunk_size=500, top_k=5, rerank=True)
rag = ProductionRAG(config, vector_store, client)
result = rag.query("What are the benefits of RAG?")
print(result["answer"])
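The class docstring promises monitoring, but the pipeline above only implements caching. Here is an illustrative wrapper (the logger name and the metrics chosen are assumptions, not part of the pipeline) that records latency and the number of retrieved sources per query.

import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rag")

def monitored_query(rag: ProductionRAG, question: str) -> dict:
    """Wrap ProductionRAG.query() with latency and source-count logging."""
    start = time.perf_counter()
    result = rag.query(question)
    latency_ms = (time.perf_counter() - start) * 1000
    logger.info("query=%r latency_ms=%.0f sources=%d", question, latency_ms, len(result["sources"]))
    return result

result = monitored_query(rag, "What are the benefits of RAG?")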

Conclusion

RAG is deceptively simple in concept but challenging to optimize in practice. The difference between a prototype and production system lies in the details: semantic chunking that preserves context, hybrid retrieval that combines the strengths of dense and sparse methods, re-ranking that surfaces the most relevant results, and generation prompts that encourage grounded responses. Start with a simple pipeline, measure retrieval quality with tools like RAGAS, and iteratively improve based on failure cases. Remember that RAG is not just about retrieval—the generation step matters too. Experiment with context compression, citation instructions, and multi-hop reasoning for complex queries. The best RAG systems are those that know their limitations and gracefully handle queries outside their knowledge base.

