RAG Query Optimization: Transforming User Questions into Effective Retrieval

Introduction: RAG quality depends heavily on retrieval quality, and retrieval quality depends on query quality. Users often ask vague questions, use different terminology than your documents, or need information that spans multiple topics. Query optimization bridges this gap—transforming user queries into forms that retrieve the most relevant documents. This guide covers practical query optimization techniques: expanding queries with synonyms and related terms, rewriting queries for better semantic matching, routing queries to appropriate indexes, and building systems that consistently find the right information regardless of how users phrase their questions.

RAG Query Optimization: Query Expansion, Query Rewriting, Query Routing

Query Expansion

from dataclasses import dataclass, field
from typing import Any, Optional
import asyncio

@dataclass
class ExpandedQuery:
    """Query-expansion result: the original query plus generated variations."""

    original: str
    expansions: list[str]
    synonyms: dict[str, list[str]] = field(default_factory=dict)

    def get_all_queries(self) -> list[str]:
        """Return the original query followed by every expansion, in order."""
        return [self.original, *self.expansions]

class SynonymExpander:
    """Expand queries with synonyms.

    For each query token with known synonyms, generates query variations
    by swapping that token for each synonym. Substitution is token-based,
    so a synonym never replaces a substring of a longer word (the previous
    `str.replace` approach would turn "rapid api" into "rendpointd ...").
    """

    # Domain-specific synonyms: lowercase keyword -> alternative terms.
    SYNONYMS = {
        "error": ["exception", "failure", "bug", "issue", "problem"],
        "fast": ["quick", "rapid", "speedy", "performant", "efficient"],
        "create": ["make", "build", "generate", "construct", "initialize"],
        "delete": ["remove", "drop", "destroy", "clear", "erase"],
        "update": ["modify", "change", "edit", "alter", "patch"],
        "get": ["fetch", "retrieve", "obtain", "read", "query"],
        "api": ["endpoint", "interface", "service", "rest"],
        "database": ["db", "datastore", "storage", "repository"],
        "config": ["configuration", "settings", "options", "parameters"],
        "auth": ["authentication", "authorization", "login", "credentials"],
    }

    def __init__(self, custom_synonyms: Optional[dict[str, list[str]]] = None):
        """Merge optional custom synonyms over the built-in table.

        Args:
            custom_synonyms: Extra/overriding entries; same shape as SYNONYMS.
        """
        self.synonyms = {**self.SYNONYMS}
        if custom_synonyms:
            self.synonyms.update(custom_synonyms)

    def expand(self, query: str) -> ExpandedQuery:
        """Expand query with synonyms.

        Returns:
            ExpandedQuery with up to 5 whole-word synonym variations and the
            synonym lists that were matched.
        """

        words = query.lower().split()
        found_synonyms: dict[str, list[str]] = {}
        expansions: list[str] = []

        for i, word in enumerate(words):
            if word in self.synonyms:
                found_synonyms[word] = self.synonyms[word]

                # Create variations by swapping only this token.
                for synonym in self.synonyms[word][:3]:  # Limit expansions
                    variant = words[:i] + [synonym] + words[i + 1:]
                    expanded = " ".join(variant)
                    if expanded != " ".join(words):
                        expansions.append(expanded)

        return ExpandedQuery(
            original=query,
            expansions=expansions[:5],  # Limit total expansions
            synonyms=found_synonyms
        )

class LLMQueryExpander:
    """Generate alternative phrasings of a search query via an LLM."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        # Async OpenAI-style client exposing chat.completions.create.
        self.client = client
        self.model = model

    async def expand(self, query: str, num_expansions: int = 3) -> ExpandedQuery:
        """Ask the LLM for paraphrases and wrap them in an ExpandedQuery."""

        prompt = f"""Generate {num_expansions} alternative phrasings of this search query.
Each alternative should:
- Preserve the original intent
- Use different words or phrasing
- Be suitable for semantic search

Original query: {query}

Return only the alternative queries, one per line, without numbering or explanation."""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7
        )

        # One candidate per non-blank response line.
        expansions = []
        for raw_line in response.choices[0].message.content.strip().split('\n'):
            candidate = raw_line.strip()
            if candidate:
                expansions.append(candidate)

        return ExpandedQuery(
            original=query,
            expansions=expansions[:num_expansions]
        )

class HyDEExpander:
    """Hypothetical Document Embeddings (HyDE) expansion.

    Instead of paraphrasing the query, generates a hypothetical answer
    document whose embedding should land near real answer documents.
    """

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        # Async OpenAI-style client exposing chat.completions.create.
        self.client = client
        self.model = model

    async def expand(self, query: str) -> ExpandedQuery:
        """Generate a hypothetical document that would answer the query."""

        prompt = f"""Write a short paragraph that would be the ideal answer to this question.
Write as if you're writing documentation that directly addresses the query.

Query: {query}

Ideal answer paragraph:"""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        # The single "expansion" is the hypothetical answer paragraph itself.
        hypothetical_doc = response.choices[0].message.content.strip()
        return ExpandedQuery(original=query, expansions=[hypothetical_doc])

Query Rewriting

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class RewrittenQuery:
    """A rewritten query."""

    # Query as the user originally phrased it.
    original: str
    # Optimized, more-searchable form produced by a rewriter.
    rewritten: str
    # Optional explanation of the rewrite; no rewriter in this file sets it.
    reasoning: Optional[str] = None

class QueryRewriter:
    """Rewrite queries for better retrieval.

    Uses an LLM to expand abbreviations, add technical terms, and strip
    filler so the query matches document phrasing more closely.
    """

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        # Async OpenAI-style client exposing chat.completions.create.
        self.client = client
        self.model = model

    async def rewrite(
        self,
        query: str,
        context: Optional[str] = None
    ) -> RewrittenQuery:
        """Rewrite query for better semantic matching.

        Args:
            query: Raw user query.
            context: Optional conversation context folded into the prompt.

        Returns:
            RewrittenQuery holding the original and the LLM's rewrite.
        """

        context_section = f"\nConversation context: {context}" if context else ""

        prompt = f"""Rewrite this search query to be more specific and searchable.

Original query: {query}{context_section}

Guidelines:
- Expand abbreviations
- Add relevant technical terms
- Make implicit requirements explicit
- Remove filler words
- Keep the core intent

Return the rewritten query only, no explanation."""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        return RewrittenQuery(
            original=query,
            rewritten=response.choices[0].message.content.strip()
        )

class ConversationalRewriter:
    """Rewrite queries considering conversation history.

    Turns a context-dependent follow-up ("what about the second one?")
    into a standalone, searchable query.
    """

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        # Async OpenAI-style client exposing chat.completions.create.
        self.client = client
        self.model = model

    async def rewrite(
        self,
        query: str,
        history: list[dict]
    ) -> RewrittenQuery:
        """Rewrite query with conversation context.

        Only the last 5 messages are included to bound prompt size.
        Each history entry must have "role" and "content" keys.
        """

        recent = history[-5:]  # Last 5 messages
        history_text = "\n".join(
            f"{msg['role']}: {msg['content']}" for msg in recent
        )

        prompt = f"""Given this conversation history, rewrite the latest query to be standalone.
The rewritten query should include all necessary context from the conversation.

Conversation:
{history_text}

Latest query: {query}

Rewrite the query to be self-contained and searchable.
Return only the rewritten query."""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        standalone = response.choices[0].message.content.strip()
        return RewrittenQuery(original=query, rewritten=standalone)

class StepBackRewriter:
    """Step-back prompting for complex queries.

    Generates broader background questions whose answers give the
    context needed to answer a specific question.
    """

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        # Async OpenAI-style client exposing chat.completions.create.
        self.client = client
        self.model = model

    async def rewrite(self, query: str) -> list[RewrittenQuery]:
        """Generate step-back queries for a complex question.

        Returns:
            One RewrittenQuery per broader question the LLM produced.
        """

        prompt = f"""For this specific question, generate broader questions that would help answer it.
These should be more general questions whose answers provide background knowledge.

Specific question: {query}

Generate 2-3 broader questions that would help answer the specific question.
Return one question per line."""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.5
        )

        # One broader question per non-blank response line.
        broader = []
        for raw_line in response.choices[0].message.content.strip().split('\n'):
            question = raw_line.strip()
            if question:
                broader.append(question)

        return [RewrittenQuery(original=query, rewritten=q) for q in broader]

class DecompositionRewriter:
    """Decompose complex queries into independently-answerable sub-queries."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        # Async OpenAI-style client exposing chat.completions.create.
        self.client = client
        self.model = model

    async def decompose(self, query: str) -> list[RewrittenQuery]:
        """Break a complex query into simpler sub-queries.

        Returns:
            One RewrittenQuery per sub-question the LLM produced.
        """

        prompt = f"""Break this complex question into simpler sub-questions.
Each sub-question should be answerable independently.

Complex question: {query}

Generate 2-4 simpler sub-questions.
Return one question per line."""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        # One sub-question per non-blank response line.
        parts = []
        for raw_line in response.choices[0].message.content.strip().split('\n'):
            sub_question = raw_line.strip()
            if sub_question:
                parts.append(sub_question)

        return [RewrittenQuery(original=query, rewritten=q) for q in parts]

Query Routing

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class QueryType(Enum):
    """Types of queries (categories used by QueryRouter's LLM classifier)."""

    FACTUAL = "factual"  # Looking for specific facts or data
    PROCEDURAL = "procedural"  # Looking for how to do something
    CONCEPTUAL = "conceptual"  # Looking for explanations or understanding
    TROUBLESHOOTING = "troubleshooting"  # Looking to fix a problem
    COMPARISON = "comparison"  # Comparing options or alternatives

@dataclass
class RoutingDecision:
    """Query routing decision."""

    # The query being routed.
    query: str
    # Classified type that drove index and strategy selection.
    query_type: QueryType
    # Index names to search (keys of QueryRouter.indexes).
    target_indexes: list[str]
    # Search strategy label, e.g. "semantic", "hybrid", "keyword_boost".
    search_strategy: str
    # Confidence in the decision (0-1); QueryRouter currently hard-codes 0.8.
    confidence: float

class QueryRouter:
    """Route queries to appropriate indexes.

    Classifies the query with an LLM, then selects target indexes by
    query type and keyword matches, and a search strategy by query type.
    """

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        # Async OpenAI-style client exposing chat.completions.create.
        self.client = client
        self.model = model

        # Index configurations: name -> matching query types and keywords.
        self.indexes = {
            "documentation": {
                "types": [QueryType.PROCEDURAL, QueryType.CONCEPTUAL],
                "keywords": ["how to", "guide", "tutorial", "documentation"]
            },
            "api_reference": {
                "types": [QueryType.FACTUAL],
                "keywords": ["api", "endpoint", "method", "parameter", "function"]
            },
            "troubleshooting": {
                "types": [QueryType.TROUBLESHOOTING],
                "keywords": ["error", "issue", "problem", "fix", "debug", "not working"]
            },
            "examples": {
                "types": [QueryType.PROCEDURAL],
                "keywords": ["example", "sample", "code", "snippet", "demo"]
            }
        }

    async def route(self, query: str) -> RoutingDecision:
        """Classify the query, pick indexes and a strategy, and return the decision."""

        kind = await self._classify_query(query)

        return RoutingDecision(
            query=query,
            query_type=kind,
            target_indexes=self._select_indexes(query, kind),
            search_strategy=self._select_strategy(kind),
            confidence=0.8
        )

    async def _classify_query(self, query: str) -> QueryType:
        """Classify query type using the LLM; fall back to FACTUAL on bad output."""

        prompt = f"""Classify this query into one of these categories:
- FACTUAL: Looking for specific facts or data
- PROCEDURAL: Looking for how to do something
- CONCEPTUAL: Looking for explanations or understanding
- TROUBLESHOOTING: Looking to fix a problem
- COMPARISON: Comparing options or alternatives

Query: {query}

Return only the category name."""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        label = response.choices[0].message.content.strip().upper()

        try:
            return QueryType[label]
        except KeyError:
            # Unrecognized label from the LLM: default conservatively.
            return QueryType.FACTUAL

    def _select_indexes(
        self,
        query: str,
        query_type: QueryType
    ) -> list[str]:
        """Select indexes whose type list or keywords match the query."""

        query_lower = query.lower()
        chosen = []

        for index_name, config in self.indexes.items():
            type_hit = query_type in config["types"]
            keyword_hit = any(kw in query_lower for kw in config["keywords"])
            if type_hit or keyword_hit:
                chosen.append(index_name)

        # Default to documentation if nothing matched.
        return chosen if chosen else ["documentation"]

    def _select_strategy(self, query_type: QueryType) -> str:
        """Map the query type to a search strategy (default: semantic)."""

        strategy_by_type = {
            QueryType.FACTUAL: "semantic",
            QueryType.PROCEDURAL: "hybrid",
            QueryType.CONCEPTUAL: "semantic",
            QueryType.TROUBLESHOOTING: "keyword_boost",
            QueryType.COMPARISON: "multi_query"
        }

        return strategy_by_type.get(query_type, "semantic")

class AdaptiveRouter:
    """Router that learns from feedback.

    Wraps a QueryRouter and records per-query feedback; the recorded data
    is intended to adjust future routing (a production system would fit a
    model on it — currently the decision is passed through unchanged).
    """

    def __init__(self, base_router: QueryRouter):
        self.base_router = base_router
        # Accumulated feedback entries (see record_feedback for shape).
        self._feedback: list[dict] = []

    async def route(self, query: str) -> RoutingDecision:
        """Delegate routing to the base router."""

        decision = await self.base_router.route(query)
        return decision

    def record_feedback(
        self,
        query: str,
        decision: RoutingDecision,
        was_helpful: bool
    ):
        """Record whether a routing decision produced helpful results."""

        entry = {
            "query": query,
            "indexes": decision.target_indexes,
            "strategy": decision.search_strategy,
            "helpful": was_helpful
        }
        self._feedback.append(entry)

Multi-Query Retrieval

from dataclasses import dataclass
from typing import Any, Optional
import asyncio

@dataclass
class RetrievalResult:
    """Result from retrieval."""

    # Query that produced these hits.
    query: str
    # Retrieved document payloads, ranked best-first.
    # NOTE(review): callers in this file populate this with raw content
    # values (d["content"]), typically strings — so the element type is
    # looser than the former `dict` annotation suggested.
    documents: list[Any]
    # Retriever similarity scores, parallel to `documents`.
    scores: list[float]

@dataclass
class FusedResult:
    """Fused results from multiple queries."""

    # Deduplicated document payloads, ranked by fused (RRF) score.
    # NOTE(review): holds the same payloads as RetrievalResult.documents
    # (content values, typically strings), hence `Any` rather than `dict`.
    documents: list[Any]
    # Fused RRF scores, parallel to `documents`.
    scores: list[float]
    # All query variations that contributed to this result.
    source_queries: list[str]

class MultiQueryRetriever:
    """Retrieve using multiple query variations and fuse the ranked lists."""

    def __init__(
        self,
        retriever: Any,
        expander: Optional[LLMQueryExpander] = None,
        rewriter: Optional[QueryRewriter] = None
    ):
        """
        Args:
            retriever: Object exposing ``await retrieve(query, k=...)`` that
                returns dicts with "content" and "score" keys.
            expander: Optional expander contributing paraphrased queries.
            rewriter: Optional rewriter contributing one optimized query.
        """
        self.retriever = retriever
        self.expander = expander
        self.rewriter = rewriter

    async def retrieve(
        self,
        query: str,
        k: int = 10
    ) -> FusedResult:
        """Retrieve with the original query plus variations, then fuse.

        Args:
            query: User query.
            k: Number of fused documents to return.
        """

        queries = [query]

        # Add expansions
        if self.expander:
            expanded = await self.expander.expand(query)
            queries.extend(expanded.expansions)

        # Add rewritten version
        if self.rewriter:
            rewritten = await self.rewriter.rewrite(query)
            queries.append(rewritten.rewritten)

        # Retrieve for each query concurrently
        results = await asyncio.gather(*[
            self._retrieve_single(q, k)
            for q in queries
        ])

        # Fuse results
        return self._fuse_results(results, k)

    async def _retrieve_single(
        self,
        query: str,
        k: int
    ) -> RetrievalResult:
        """Run one retrieval and normalize it into a RetrievalResult."""

        docs = await self.retriever.retrieve(query, k=k)

        return RetrievalResult(
            query=query,
            documents=[d["content"] for d in docs],
            scores=[d["score"] for d in docs]
        )

    def _fuse_results(
        self,
        results: list[RetrievalResult],
        k: int
    ) -> FusedResult:
        """Fuse ranked lists using Reciprocal Rank Fusion (RRF).

        Each document accumulates sum(1 / (rrf_k + rank + 1)) over the
        lists it appears in; only ranks matter, so the per-list similarity
        scores are intentionally ignored here.
        """

        # Standard RRF damping constant from the original paper.
        rrf_k = 60

        # hash(str(doc)) keys deduplicate identical content across lists
        # (stable within one process run, which is all fusion needs).
        doc_scores: dict[int, float] = {}
        doc_contents: dict[int, Any] = {}

        for result in results:
            for rank, doc in enumerate(result.documents):
                doc_id = hash(str(doc))

                if doc_id not in doc_scores:
                    doc_scores[doc_id] = 0.0
                    doc_contents[doc_id] = doc

                # RRF formula: reciprocal of (constant + 1-based rank).
                doc_scores[doc_id] += 1 / (rrf_k + rank + 1)

        # Sort by fused score, best first, and keep the top k.
        sorted_docs = sorted(
            doc_scores.items(),
            key=lambda item: item[1],
            reverse=True
        )[:k]

        return FusedResult(
            documents=[doc_contents[doc_id] for doc_id, _ in sorted_docs],
            scores=[fused for _, fused in sorted_docs],
            source_queries=[r.query for r in results]
        )

class HierarchicalRetriever:
    """Retrieve with a hierarchical query strategy.

    Combines three views of the question — the original query at full
    depth, broader step-back queries for background, and decomposed
    sub-queries for specifics — and fuses everything with RRF.
    """

    def __init__(
        self,
        retriever: Any,
        step_back_rewriter: StepBackRewriter,
        decomposition_rewriter: DecompositionRewriter
    ):
        """
        Args:
            retriever: Object exposing ``await retrieve(query, k=...)`` that
                returns dicts with "content" and "score" keys.
            step_back_rewriter: Produces broader background queries.
            decomposition_rewriter: Produces narrower sub-queries.
        """
        self.retriever = retriever
        self.step_back = step_back_rewriter
        self.decomposition = decomposition_rewriter

    async def retrieve(
        self,
        query: str,
        k: int = 10
    ) -> FusedResult:
        """Retrieve using the hierarchical strategy.

        Args:
            query: User query.
            k: Number of fused documents to return. Auxiliary queries run
                at reduced depth (k//2 and k//3, floored at 1 so small k
                never requests zero documents).
        """

        all_results = []

        # Original query at full depth
        original_result = await self._retrieve_single(query, k)
        all_results.append(original_result)

        # Step-back queries for background (at most 2, shallower depth)
        step_back_queries = await self.step_back.rewrite(query)
        for sq in step_back_queries[:2]:
            result = await self._retrieve_single(sq.rewritten, max(1, k // 2))
            all_results.append(result)

        # Decomposed queries for specifics (at most 3, shallowest depth)
        sub_queries = await self.decomposition.decompose(query)
        for sq in sub_queries[:3]:
            result = await self._retrieve_single(sq.rewritten, max(1, k // 3))
            all_results.append(result)

        # Fuse all results
        return self._fuse_results(all_results, k)

    async def _retrieve_single(self, query: str, k: int) -> RetrievalResult:
        """Run one retrieval and normalize it into a RetrievalResult."""
        docs = await self.retriever.retrieve(query, k=k)
        return RetrievalResult(
            query=query,
            documents=[d["content"] for d in docs],
            scores=[d["score"] for d in docs]
        )

    def _fuse_results(self, results: list[RetrievalResult], k: int) -> FusedResult:
        """Fuse ranked lists with RRF (same scheme as MultiQueryRetriever)."""
        # Standard RRF damping constant.
        rrf_k = 60
        # hash(str(doc)) keys deduplicate identical content across lists.
        doc_scores: dict[int, float] = {}
        doc_contents: dict[int, Any] = {}

        for result in results:
            for rank, doc in enumerate(result.documents):
                doc_id = hash(str(doc))
                if doc_id not in doc_scores:
                    doc_scores[doc_id] = 0.0
                    doc_contents[doc_id] = doc
                # Only rank contributes; per-list scores are ignored.
                doc_scores[doc_id] += 1 / (rrf_k + rank + 1)

        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)[:k]

        return FusedResult(
            documents=[doc_contents[doc_id] for doc_id, _ in sorted_docs],
            scores=[score for _, score in sorted_docs],
            source_queries=[r.query for r in results]
        )

Production Query Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
# NOTE(review): these module-level placeholders must be replaced with real
# instances (each needs an LLM client / retriever) before the endpoints
# below are hit — as written, every endpoint would fail on None.
expander = None  # Initialize with client
rewriter = None
router = None
multi_retriever = None

class ExpandRequest(BaseModel):
    """Request body for POST /v1/query/expand."""
    query: str
    # Maximum number of LLM-generated paraphrases to return.
    num_expansions: int = 3

class RewriteRequest(BaseModel):
    """Request body for POST /v1/query/rewrite."""
    query: str
    # Free-text conversation context; used only when no history is given.
    context: Optional[str] = None
    # Chat messages ({"role", "content"}); when present, a
    # ConversationalRewriter is used instead of the plain rewriter.
    history: Optional[list[dict]] = None

class RouteRequest(BaseModel):
    """Request body for POST /v1/query/route."""
    query: str

class RetrieveRequest(BaseModel):
    """Request body for POST /v1/retrieve."""
    query: str
    # Number of fused documents to return.
    k: int = 10
    # NOTE(review): the /v1/retrieve handler never reads these two flags —
    # it always delegates to multi_retriever as configured. Confirm whether
    # per-request toggling was intended.
    use_expansion: bool = True
    use_rewriting: bool = True

@app.post("/v1/query/expand")
async def expand_query(request: ExpandRequest):
    """Expand a query."""

    expanded = await expander.expand(request.query, request.num_expansions)

    payload = {
        "original": expanded.original,
        "expansions": expanded.expansions,
        "synonyms": expanded.synonyms,
    }
    return payload

@app.post("/v1/query/rewrite")
async def rewrite_query(request: RewriteRequest):
    """Rewrite a query."""

    # With history, build a conversation-aware rewriter on the same
    # client/model; otherwise use the plain rewriter with optional context.
    if request.history:
        history_rewriter = ConversationalRewriter(rewriter.client, rewriter.model)
        result = await history_rewriter.rewrite(request.query, request.history)
    else:
        result = await rewriter.rewrite(request.query, request.context)

    return {"original": result.original, "rewritten": result.rewritten}

@app.post("/v1/query/route")
async def route_query(request: RouteRequest):
    """Route a query to appropriate indexes."""

    routing = await router.route(request.query)

    response = {
        "query": routing.query,
        "query_type": routing.query_type.value,
        "target_indexes": routing.target_indexes,
        "search_strategy": routing.search_strategy,
        "confidence": routing.confidence,
    }
    return response

@app.post("/v1/retrieve")
async def retrieve_documents(request: RetrieveRequest):
    """Retrieve documents with query optimization."""

    fused = await multi_retriever.retrieve(request.query, request.k)

    return {
        "documents": fused.documents,
        "scores": fused.scores,
        "source_queries": fused.source_queries,
    }

@app.get("/health")
async def health():
    """Liveness probe; always reports healthy."""
    return {"status": "healthy"}

References

Conclusion

Query optimization is the key to high-quality RAG retrieval. Start with query expansion to capture different ways users might phrase the same question—synonyms, related terms, and alternative phrasings all help. Use query rewriting to make vague or conversational queries more specific and searchable. For complex questions, decompose them into simpler sub-queries or use step-back prompting to retrieve background context. Route queries to appropriate indexes based on query type—troubleshooting queries need different sources than conceptual questions. Fuse results from multiple query variations using Reciprocal Rank Fusion to get the best of all approaches. The key insight is that users don’t always know how to phrase their questions optimally for your document collection. Query optimization bridges this gap, transforming user intent into effective retrieval queries. Invest in query optimization and your RAG system will find relevant documents even when users ask imperfect questions.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.