Semantic Search Optimization: Building High-Quality Retrieval Systems

Introduction

Semantic search goes beyond keyword matching to understand the meaning and intent behind queries. By converting text to dense vector embeddings, semantic search finds conceptually similar content even when exact words don't match. However, naive implementations often underperform: poor embedding choices, suboptimal indexing, and a lack of reranking lead to irrelevant results. This guide covers practical optimization techniques: selecting the right embedding model for your domain, tuning vector index parameters for speed and accuracy, implementing hybrid search that combines semantic and keyword approaches, and adding reranking to improve precision. Whether you're building a RAG system, document search, or a recommendation engine, these optimizations can dramatically improve search quality.

[Figure: Semantic search pipeline (query embedding, vector retrieval, result reranking)]

Embedding Selection

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
from enum import Enum

class EmbeddingTask(Enum):
    """Embedding task types."""
    
    RETRIEVAL_QUERY = "retrieval_query"
    RETRIEVAL_DOCUMENT = "retrieval_document"
    SEMANTIC_SIMILARITY = "semantic_similarity"
    CLASSIFICATION = "classification"
    CLUSTERING = "clustering"

@dataclass
class EmbeddingConfig:
    """Configuration for embedding model."""
    
    model_name: str
    dimension: int
    max_tokens: int
    normalize: bool = True
    batch_size: int = 32
    task_prefix: Optional[dict[EmbeddingTask, str]] = None

class EmbeddingModel(ABC):
    """Abstract embedding model."""
    
    @abstractmethod
    async def embed(
        self,
        texts: list[str],
        task: Optional[EmbeddingTask] = None
    ) -> list[list[float]]:
        """Generate embeddings for texts."""
        pass
    
    @property
    @abstractmethod
    def dimension(self) -> int:
        """Embedding dimension."""
        pass

class OpenAIEmbedding(EmbeddingModel):
    """OpenAI embedding model."""
    
    def __init__(
        self,
        client: Any,
        model: str = "text-embedding-3-small",
        dimensions: Optional[int] = None
    ):
        self.client = client
        self.model = model
        self._dimensions = dimensions or self._get_default_dimension()
    
    def _get_default_dimension(self) -> int:
        """Get default dimension for model."""
        
        defaults = {
            "text-embedding-3-small": 1536,
            "text-embedding-3-large": 3072,
            "text-embedding-ada-002": 1536
        }
        return defaults.get(self.model, 1536)
    
    @property
    def dimension(self) -> int:
        return self._dimensions
    
    async def embed(
        self,
        texts: list[str],
        task: Optional[EmbeddingTask] = None
    ) -> list[list[float]]:
        """Generate embeddings using OpenAI."""
        
        kwargs = {"model": self.model, "input": texts}
        
        if self._dimensions and self.model.startswith("text-embedding-3"):
            kwargs["dimensions"] = self._dimensions
        
        response = await self.client.embeddings.create(**kwargs)
        
        return [item.embedding for item in response.data]

class CohereEmbedding(EmbeddingModel):
    """Cohere embedding model."""
    
    def __init__(
        self,
        client: Any,
        model: str = "embed-english-v3.0"
    ):
        self.client = client
        self.model = model
        
        self._task_map = {
            EmbeddingTask.RETRIEVAL_QUERY: "search_query",
            EmbeddingTask.RETRIEVAL_DOCUMENT: "search_document",
            EmbeddingTask.CLASSIFICATION: "classification",
            EmbeddingTask.CLUSTERING: "clustering"
        }
    
    @property
    def dimension(self) -> int:
        return 1024
    
    async def embed(
        self,
        texts: list[str],
        task: Optional[EmbeddingTask] = None
    ) -> list[list[float]]:
        """Generate embeddings using Cohere."""
        
        input_type = self._task_map.get(task, "search_document")
        
        response = await self.client.embed(
            texts=texts,
            model=self.model,
            input_type=input_type
        )
        
        return response.embeddings

class VoyageEmbedding(EmbeddingModel):
    """Voyage AI embedding model."""
    
    def __init__(
        self,
        client: Any,
        model: str = "voyage-large-2"
    ):
        self.client = client
        self.model = model
        
        self._dimensions = {
            "voyage-large-2": 1536,
            "voyage-code-2": 1536,
            "voyage-lite-02-instruct": 1024
        }
    
    @property
    def dimension(self) -> int:
        return self._dimensions.get(self.model, 1536)
    
    async def embed(
        self,
        texts: list[str],
        task: Optional[EmbeddingTask] = None
    ) -> list[list[float]]:
        """Generate embeddings using Voyage."""
        
        input_type = "query" if task == EmbeddingTask.RETRIEVAL_QUERY else "document"
        
        response = await self.client.embed(
            texts=texts,
            model=self.model,
            input_type=input_type
        )
        
        return response.embeddings

class SentenceTransformerEmbedding(EmbeddingModel):
    """Local sentence transformer model."""
    
    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        device: str = "cpu"
    ):
        self.model_name = model_name
        self.device = device
        self._model = None
        self._dimension = None
    
    def _load_model(self):
        """Lazy load model."""
        
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.model_name, device=self.device)
            self._dimension = self._model.get_sentence_embedding_dimension()
    
    @property
    def dimension(self) -> int:
        self._load_model()
        return self._dimension
    
    async def embed(
        self,
        texts: list[str],
        task: Optional[EmbeddingTask] = None
    ) -> list[list[float]]:
        """Generate embeddings locally."""
        
        self._load_model()
        
        embeddings = self._model.encode(
            texts,
            normalize_embeddings=True,
            show_progress_bar=False
        )
        
        return embeddings.tolist()

class InstructorEmbedding(EmbeddingModel):
    """Instructor embedding with task instructions."""
    
    def __init__(
        self,
        model_name: str = "hkunlp/instructor-large",
        device: str = "cpu"
    ):
        self.model_name = model_name
        self.device = device
        self._model = None
        
        self._instructions = {
            EmbeddingTask.RETRIEVAL_QUERY: "Represent the question for retrieving relevant documents:",
            EmbeddingTask.RETRIEVAL_DOCUMENT: "Represent the document for retrieval:",
            EmbeddingTask.SEMANTIC_SIMILARITY: "Represent the sentence for semantic similarity:",
            EmbeddingTask.CLASSIFICATION: "Represent the text for classification:",
            EmbeddingTask.CLUSTERING: "Represent the text for clustering:"
        }
    
    def _load_model(self):
        """Lazy load model."""
        
        if self._model is None:
            from InstructorEmbedding import INSTRUCTOR
            self._model = INSTRUCTOR(self.model_name, device=self.device)
    
    @property
    def dimension(self) -> int:
        return 768
    
    async def embed(
        self,
        texts: list[str],
        task: Optional[EmbeddingTask] = None
    ) -> list[list[float]]:
        """Generate embeddings with instructions."""
        
        self._load_model()
        
        instruction = self._instructions.get(
            task,
            self._instructions[EmbeddingTask.RETRIEVAL_DOCUMENT]
        )
        
        inputs = [[instruction, text] for text in texts]
        embeddings = self._model.encode(inputs)
        
        return embeddings.tolist()
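
Because all providers share one interface, swapping models is a one-line change. A minimal usage sketch, assuming the local sentence-transformers backend; the texts and the printed values are illustrative:

import asyncio

async def embed_demo():
    # Local model, no API key needed; all-MiniLM-L6-v2 produces 384-dim vectors
    model = SentenceTransformerEmbedding(model_name="all-MiniLM-L6-v2")
    
    # Use distinct task types so asymmetric providers (Cohere, Voyage,
    # Instructor) can apply the right input type or instruction prefix
    docs = await model.embed(
        ["HNSW builds a layered graph for approximate nearest neighbors."],
        task=EmbeddingTask.RETRIEVAL_DOCUMENT
    )
    queries = await model.embed(
        ["how does HNSW search work"],
        task=EmbeddingTask.RETRIEVAL_QUERY
    )
    
    print(model.dimension, len(docs[0]), len(queries[0]))  # 384 384 384

asyncio.run(embed_demo())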

Vector Index Optimization

from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import numpy as np

@dataclass
class SearchResult:
    """A search result."""
    
    id: str
    score: float
    metadata: Optional[dict] = None
    text: Optional[str] = None

@dataclass
class IndexConfig:
    """Vector index configuration."""
    
    dimension: int
    metric: str = "cosine"  # cosine, euclidean, dot_product
    
    # HNSW parameters
    ef_construction: int = 200
    m: int = 16
    ef_search: int = 100
    
    # IVF parameters
    nlist: int = 100
    nprobe: int = 10
    
    # PQ parameters
    pq_m: int = 8
    pq_bits: int = 8

class VectorIndex(ABC):
    """Abstract vector index."""
    
    @abstractmethod
    async def add(
        self,
        ids: list[str],
        embeddings: list[list[float]],
        metadata: Optional[list[dict]] = None
    ) -> None:
        """Add vectors to index."""
        pass
    
    @abstractmethod
    async def search(
        self,
        query_embedding: list[float],
        k: int = 10,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Search for similar vectors."""
        pass
    
    @abstractmethod
    async def delete(self, ids: list[str]) -> None:
        """Delete vectors by ID."""
        pass

class FAISSIndex(VectorIndex):
    """FAISS vector index."""
    
    def __init__(self, config: IndexConfig):
        self.config = config
        self._index = None
        self._id_map: dict[int, str] = {}
        self._metadata: dict[str, dict] = {}
        self._next_id = 0
        
        self._build_index()
    
    def _build_index(self):
        """Build FAISS index."""
        
        import faiss
        
        d = self.config.dimension
        
        # Choose the coarse quantizer by metric; vectors are normalized
        # at add time, so inner product equals cosine similarity
        if self.config.metric == "cosine":
            quantizer = faiss.IndexFlatIP(d)
        else:
            quantizer = faiss.IndexFlatL2(d)
        
        # Use IVF for larger datasets
        self._index = faiss.IndexIVFFlat(
            quantizer,
            d,
            self.config.nlist,
            faiss.METRIC_INNER_PRODUCT if self.config.metric == "cosine" else faiss.METRIC_L2
        )
    
    async def add(
        self,
        ids: list[str],
        embeddings: list[list[float]],
        metadata: Optional[list[dict]] = None
    ) -> None:
        """Add vectors to FAISS index."""
        
        import faiss
        
        vectors = np.array(embeddings, dtype=np.float32)
        
        # Normalize for cosine similarity
        if self.config.metric == "cosine":
            faiss.normalize_L2(vectors)
        
        # Train the IVF index on the first batch (clustering needs at
        # least `nlist` representative vectors to work well)
        if not self._index.is_trained:
            self._index.train(vectors)
        
        # Add vectors
        self._index.add(vectors)
        
        # Store ID mapping and metadata
        for i, id_ in enumerate(ids):
            internal_id = self._next_id + i
            self._id_map[internal_id] = id_
            
            if metadata and i < len(metadata):
                self._metadata[id_] = metadata[i]
        
        self._next_id += len(ids)
    
    async def search(
        self,
        query_embedding: list[float],
        k: int = 10,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Search FAISS index."""
        
        import faiss
        
        query = np.array([query_embedding], dtype=np.float32)
        
        if self.config.metric == "cosine":
            faiss.normalize_L2(query)
        
        # Set search parameters
        self._index.nprobe = self.config.nprobe
        
        # Search
        distances, indices = self._index.search(query, k)
        
        results = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx == -1:
                continue
            
            id_ = self._id_map.get(idx)
            if not id_:
                continue
            
            # Apply filter if provided
            if filter:
                meta = self._metadata.get(id_, {})
                if not self._matches_filter(meta, filter):
                    continue
            
            results.append(SearchResult(
                id=id_,
                score=float(dist),
                metadata=self._metadata.get(id_)
            ))
        
        return results
    
    def _matches_filter(self, metadata: dict, filter: dict) -> bool:
        """Check if metadata matches filter."""
        
        for key, value in filter.items():
            if key not in metadata:
                return False
            if metadata[key] != value:
                return False
        return True
    
    async def delete(self, ids: list[str]) -> None:
        """Delete vectors (not directly supported in FAISS)."""
        
        # FAISS doesn't support deletion well
        # Would need to rebuild index
        for id_ in ids:
            if id_ in self._metadata:
                del self._metadata[id_]

class HNSWIndex(VectorIndex):
    """HNSW index using hnswlib."""
    
    def __init__(self, config: IndexConfig, max_elements: int = 100000):
        self.config = config
        self.max_elements = max_elements
        self._index = None
        self._metadata: dict[int, dict] = {}
        self._id_to_internal: dict[str, int] = {}
        self._internal_to_id: dict[int, str] = {}
        self._next_id = 0
        
        self._build_index()
    
    def _build_index(self):
        """Build HNSW index."""
        
        import hnswlib
        
        space = "cosine" if self.config.metric == "cosine" else "l2"
        
        self._index = hnswlib.Index(space=space, dim=self.config.dimension)
        self._index.init_index(
            max_elements=self.max_elements,
            ef_construction=self.config.ef_construction,
            M=self.config.m
        )
    
    async def add(
        self,
        ids: list[str],
        embeddings: list[list[float]],
        metadata: Optional[list[dict]] = None
    ) -> None:
        """Add vectors to HNSW index."""
        
        vectors = np.array(embeddings, dtype=np.float32)
        
        internal_ids = list(range(self._next_id, self._next_id + len(ids)))
        
        self._index.add_items(vectors, internal_ids)
        
        for i, (id_, internal_id) in enumerate(zip(ids, internal_ids)):
            self._id_to_internal[id_] = internal_id
            self._internal_to_id[internal_id] = id_
            
            if metadata and i < len(metadata):
                self._metadata[internal_id] = metadata[i]
        
        self._next_id += len(ids)
    
    async def search(
        self,
        query_embedding: list[float],
        k: int = 10,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Search HNSW index."""
        
        query = np.array([query_embedding], dtype=np.float32)
        
        self._index.set_ef(self.config.ef_search)
        
        # Over-fetch when filtering, but never request more neighbors
        # than the index contains (hnswlib raises an error otherwise)
        search_k = k * 3 if filter else k
        search_k = min(search_k, self._index.get_current_count())
        
        internal_ids, distances = self._index.knn_query(query, k=search_k)
        
        results = []
        for internal_id, dist in zip(internal_ids[0], distances[0]):
            id_ = self._internal_to_id.get(internal_id)
            if not id_:
                continue
            
            meta = self._metadata.get(internal_id, {})
            
            if filter and not self._matches_filter(meta, filter):
                continue
            
            results.append(SearchResult(
                id=id_,
                score=1 - float(dist),  # distance -> similarity (assumes cosine space)
                metadata=meta
            ))
            
            if len(results) >= k:
                break
        
        return results
    
    def _matches_filter(self, metadata: dict, filter: dict) -> bool:
        """Check if metadata matches filter."""
        
        for key, value in filter.items():
            if key not in metadata:
                return False
            if metadata[key] != value:
                return False
        return True
    
    async def delete(self, ids: list[str]) -> None:
        """Mark vectors as deleted."""
        
        for id_ in ids:
            internal_id = self._id_to_internal.get(id_)
            if internal_id is not None:
                self._index.mark_deleted(internal_id)
                del self._id_to_internal[id_]
                del self._internal_to_id[internal_id]
                if internal_id in self._metadata:
                    del self._metadata[internal_id]
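
A quick usage sketch for the HNSW index; the IDs, toy vectors, and filter below are illustrative assumptions:

import asyncio

async def hnsw_demo():
    index = HNSWIndex(IndexConfig(dimension=4, metric="cosine"), max_elements=1000)
    
    await index.add(
        ids=["doc-1", "doc-2"],
        embeddings=[[0.1, 0.9, 0.0, 0.0], [0.9, 0.1, 0.0, 0.0]],
        metadata=[{"lang": "en"}, {"lang": "de"}]
    )
    
    # Raising ef_search in IndexConfig improves recall at the cost of
    # latency; ef_construction and M trade build time for graph quality
    hits = await index.search([0.1, 0.8, 0.0, 0.1], k=2, filter={"lang": "en"})
    for hit in hits:
        print(hit.id, round(hit.score, 3))

asyncio.run(hnsw_demo())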

Hybrid Search

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class FusionMethod(Enum):
    """Methods for fusing search results."""
    
    RRF = "rrf"  # Reciprocal Rank Fusion
    LINEAR = "linear"  # Linear combination
    CONVEX = "convex"  # Convex combination

@dataclass
class HybridConfig:
    """Hybrid search configuration."""
    
    semantic_weight: float = 0.7
    keyword_weight: float = 0.3
    fusion_method: FusionMethod = FusionMethod.RRF
    rrf_k: int = 60

class KeywordSearch:
    """BM25-based keyword search."""
    
    def __init__(self):
        self._documents: dict[str, str] = {}
        self._bm25 = None
        self._doc_ids: list[str] = []
    
    def add(self, id_: str, text: str) -> None:
        """Add document to index."""
        
        self._documents[id_] = text
        self._bm25 = None  # Invalidate index
    
    def _build_index(self):
        """Build BM25 index."""
        
        from rank_bm25 import BM25Okapi
        
        self._doc_ids = list(self._documents.keys())
        tokenized = [
            doc.lower().split()
            for doc in self._documents.values()
        ]
        
        self._bm25 = BM25Okapi(tokenized)
    
    def search(self, query: str, k: int = 10) -> list[SearchResult]:
        """Search using BM25."""
        
        if self._bm25 is None:
            self._build_index()
        
        tokenized_query = query.lower().split()
        scores = self._bm25.get_scores(tokenized_query)
        
        # Get top k
        top_indices = scores.argsort()[-k:][::-1]
        
        results = []
        for idx in top_indices:
            if scores[idx] > 0:
                results.append(SearchResult(
                    id=self._doc_ids[idx],
                    score=float(scores[idx]),
                    text=self._documents[self._doc_ids[idx]]
                ))
        
        return results

class HybridSearch:
    """Combine semantic and keyword search."""
    
    def __init__(
        self,
        embedding_model: EmbeddingModel,
        vector_index: VectorIndex,
        config: Optional[HybridConfig] = None
    ):
        self.embedding_model = embedding_model
        self.vector_index = vector_index
        self.config = config or HybridConfig()
        
        self.keyword_search = KeywordSearch()
        self._documents: dict[str, str] = {}
    
    async def add(
        self,
        id_: str,
        text: str,
        metadata: Optional[dict] = None
    ) -> None:
        """Add document to both indexes."""
        
        # Add to keyword index
        self.keyword_search.add(id_, text)
        self._documents[id_] = text
        
        # Add to vector index
        embeddings = await self.embedding_model.embed(
            [text],
            task=EmbeddingTask.RETRIEVAL_DOCUMENT
        )
        
        await self.vector_index.add(
            ids=[id_],
            embeddings=embeddings,
            metadata=[metadata] if metadata else None
        )
    
    async def search(
        self,
        query: str,
        k: int = 10,
        filter: Optional[dict] = None
    ) -> list[SearchResult]:
        """Perform hybrid search."""
        
        # Semantic search
        query_embedding = await self.embedding_model.embed(
            [query],
            task=EmbeddingTask.RETRIEVAL_QUERY
        )
        
        semantic_results = await self.vector_index.search(
            query_embedding[0],
            k=k * 2,
            filter=filter
        )
        
        # Keyword search
        keyword_results = self.keyword_search.search(query, k=k * 2)
        
        # Fuse results
        if self.config.fusion_method == FusionMethod.RRF:
            return self._rrf_fusion(semantic_results, keyword_results, k)
        elif self.config.fusion_method == FusionMethod.LINEAR:
            return self._linear_fusion(semantic_results, keyword_results, k)
        else:
            return self._convex_fusion(semantic_results, keyword_results, k)
    
    def _rrf_fusion(
        self,
        semantic: list[SearchResult],
        keyword: list[SearchResult],
        k: int
    ) -> list[SearchResult]:
        """Reciprocal Rank Fusion."""
        
        rrf_k = self.config.rrf_k
        scores: dict[str, float] = {}
        
        # Score semantic results
        for rank, result in enumerate(semantic):
            scores[result.id] = scores.get(result.id, 0) + \
                self.config.semantic_weight / (rrf_k + rank + 1)
        
        # Score keyword results
        for rank, result in enumerate(keyword):
            scores[result.id] = scores.get(result.id, 0) + \
                self.config.keyword_weight / (rrf_k + rank + 1)
        
        # Sort by combined score
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        
        results = []
        for id_ in sorted_ids[:k]:
            results.append(SearchResult(
                id=id_,
                score=scores[id_],
                text=self._documents.get(id_)
            ))
        
        return results
    
    def _linear_fusion(
        self,
        semantic: list[SearchResult],
        keyword: list[SearchResult],
        k: int
    ) -> list[SearchResult]:
        """Linear combination of scores."""
        
        # Normalize scores
        semantic_scores = self._normalize_scores(semantic)
        keyword_scores = self._normalize_scores(keyword)
        
        # Combine scores
        combined: dict[str, float] = {}
        
        for result in semantic:
            combined[result.id] = \
                self.config.semantic_weight * semantic_scores.get(result.id, 0)
        
        for result in keyword:
            combined[result.id] = combined.get(result.id, 0) + \
                self.config.keyword_weight * keyword_scores.get(result.id, 0)
        
        # Sort and return
        sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True)
        
        return [
            SearchResult(id=id_, score=combined[id_], text=self._documents.get(id_))
            for id_ in sorted_ids[:k]
        ]
    
    def _convex_fusion(
        self,
        semantic: list[SearchResult],
        keyword: list[SearchResult],
        k: int
    ) -> list[SearchResult]:
        """Convex combination (weighted average)."""
        
        # Same as linear but ensures weights sum to 1
        total_weight = self.config.semantic_weight + self.config.keyword_weight
        sem_w = self.config.semantic_weight / total_weight
        key_w = self.config.keyword_weight / total_weight
        
        semantic_scores = self._normalize_scores(semantic)
        keyword_scores = self._normalize_scores(keyword)
        
        combined: dict[str, float] = {}
        all_ids = set(r.id for r in semantic) | set(r.id for r in keyword)
        
        for id_ in all_ids:
            combined[id_] = \
                sem_w * semantic_scores.get(id_, 0) + \
                key_w * keyword_scores.get(id_, 0)
        
        sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True)
        
        return [
            SearchResult(id=id_, score=combined[id_], text=self._documents.get(id_))
            for id_ in sorted_ids[:k]
        ]
    
    def _normalize_scores(self, results: list[SearchResult]) -> dict[str, float]:
        """Normalize scores to 0-1 range."""
        
        if not results:
            return {}
        
        scores = [r.score for r in results]
        min_score = min(scores)
        max_score = max(scores)
        
        if max_score == min_score:
            return {r.id: 1.0 for r in results}
        
        return {
            r.id: (r.score - min_score) / (max_score - min_score)
            for r in results
        }
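
Putting the pieces together, a hedged end-to-end sketch; the documents, weights, and query here are illustrative, and any EmbeddingModel/VectorIndex pair from above would work:

import asyncio

async def hybrid_demo():
    model = SentenceTransformerEmbedding()
    index = HNSWIndex(IndexConfig(dimension=model.dimension, metric="cosine"))
    search = HybridSearch(
        model,
        index,
        HybridConfig(
            semantic_weight=0.7,
            keyword_weight=0.3,
            fusion_method=FusionMethod.RRF
        )
    )
    
    await search.add("a", "Postgres supports HNSW indexes through pgvector.")
    await search.add("b", "BM25 ranks documents by term frequency and rarity.")
    
    # RRF rewards documents that rank well in either list, so no score
    # normalization across BM25 and cosine similarity is needed
    for hit in await search.search("keyword ranking with BM25", k=2):
        print(hit.id, round(hit.score, 4))

asyncio.run(hybrid_demo())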

Query Optimization

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class QueryExpansion:
    """Expanded query with variations."""
    
    original: str
    expanded: list[str]
    hypothetical_answers: Optional[list[str]] = None

class QueryOptimizer:
    """Optimize queries for better retrieval."""
    
    def __init__(
        self,
        llm_client: Any = None,
        model: str = "gpt-4o-mini"
    ):
        self.llm_client = llm_client
        self.model = model
    
    async def expand_query(self, query: str) -> QueryExpansion:
        """Expand query with variations."""
        
        if not self.llm_client:
            return QueryExpansion(original=query, expanded=[query])
        
        prompt = f"""Generate 3 alternative phrasings of this search query that might help find relevant documents:

Query: {query}

Return only the alternative queries, one per line."""
        
        response = await self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7
        )
        
        alternatives = response.choices[0].message.content.strip().split("\n")
        alternatives = [a.strip() for a in alternatives if a.strip()]
        
        return QueryExpansion(
            original=query,
            expanded=[query] + alternatives[:3]
        )
    
    async def generate_hyde(self, query: str) -> QueryExpansion:
        """Generate Hypothetical Document Embeddings."""
        
        if not self.llm_client:
            return QueryExpansion(original=query, expanded=[query])
        
        prompt = f"""Write a short passage that would be a perfect answer to this question:

Question: {query}

Write 2-3 sentences as if from a document that answers this question."""
        
        response = await self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7
        )
        
        hypothetical = response.choices[0].message.content.strip()
        
        return QueryExpansion(
            original=query,
            expanded=[query],
            hypothetical_answers=[hypothetical]
        )
    
    async def decompose_query(self, query: str) -> list[str]:
        """Decompose complex query into sub-queries."""
        
        if not self.llm_client:
            return [query]
        
        prompt = f"""Break down this complex question into simpler sub-questions that together would answer the original:

Question: {query}

Return 2-4 simpler questions, one per line."""
        
        response = await self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        
        sub_queries = response.choices[0].message.content.strip().split("\n")
        sub_queries = [q.strip() for q in sub_queries if q.strip()]
        
        return sub_queries if sub_queries else [query]

class MultiQuerySearch:
    """Search with multiple query variations."""
    
    def __init__(
        self,
        search: HybridSearch,
        optimizer: QueryOptimizer
    ):
        self.search = search
        self.optimizer = optimizer
    
    async def search_expanded(
        self,
        query: str,
        k: int = 10
    ) -> list[SearchResult]:
        """Search with expanded queries."""
        
        expansion = await self.optimizer.expand_query(query)
        
        all_results: dict[str, SearchResult] = {}
        
        for expanded_query in expansion.expanded:
            results = await self.search.search(expanded_query, k=k)
            
            for result in results:
                if result.id not in all_results:
                    all_results[result.id] = result
                else:
                    # Keep higher score
                    if result.score > all_results[result.id].score:
                        all_results[result.id] = result
        
        # Sort by score
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x.score,
            reverse=True
        )
        
        return sorted_results[:k]
    
    async def search_hyde(
        self,
        query: str,
        k: int = 10
    ) -> list[SearchResult]:
        """Search using HyDE."""
        
        expansion = await self.optimizer.generate_hyde(query)
        
        # Search with both original and hypothetical
        all_results: dict[str, SearchResult] = {}
        
        # Original query
        results = await self.search.search(query, k=k)
        for result in results:
            all_results[result.id] = result
        
        # Hypothetical answer
        if expansion.hypothetical_answers:
            for hypo in expansion.hypothetical_answers:
                results = await self.search.search(hypo, k=k)
                for result in results:
                    if result.id not in all_results:
                        all_results[result.id] = result
        
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: x.score,
            reverse=True
        )
        
        return sorted_results[:k]
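
The introduction promised a reranking stage, and it slots in after any of the retrieval strategies above: over-fetch candidates, then rescore each (query, document) pair jointly. A minimal sketch, assuming the sentence-transformers CrossEncoder with an illustrative model name:

from sentence_transformers import CrossEncoder

class CrossEncoderReranker:
    """Rerank retrieved candidates with a cross-encoder."""
    
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.model = CrossEncoder(model_name)
    
    def rerank(
        self,
        query: str,
        candidates: list[SearchResult],
        k: int = 10
    ) -> list[SearchResult]:
        # Cross-encoders read query and document together, which is far
        # more precise than bi-encoder similarity but too slow for the
        # whole corpus, so apply it only to the retrieved candidates
        pairs = [(query, c.text or "") for c in candidates]
        scores = self.model.predict(pairs)
        
        reranked = [
            SearchResult(id=c.id, score=float(s), metadata=c.metadata, text=c.text)
            for c, s in zip(candidates, scores)
        ]
        reranked.sort(key=lambda r: r.score, reverse=True)
        return reranked[:k]

Typical usage is to retrieve several times k candidates (say, k * 5 from HybridSearch) and let the reranker cut the list back down to k.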

Production Search Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class IndexRequest(BaseModel):
    id: str
    text: str
    metadata: Optional[dict] = None

class SearchRequest(BaseModel):
    query: str
    k: int = 10
    filter: Optional[dict] = None
    use_hybrid: bool = True
    expand_query: bool = False
    use_hyde: bool = False

class SearchResultResponse(BaseModel):
    id: str
    score: float
    text: Optional[str] = None
    metadata: Optional[dict] = None

class SearchResponse(BaseModel):
    results: list[SearchResultResponse]
    query: str
    expanded_queries: Optional[list[str]] = None

# Initialize components (would be configured properly)
# embedding_model = OpenAIEmbedding(client)
# vector_index = HNSWIndex(IndexConfig(dimension=1536))
# hybrid_search = HybridSearch(embedding_model, vector_index)
# optimizer = QueryOptimizer(llm_client)

@app.post("/v1/index")
async def index_document(request: IndexRequest):
    """Index a document."""
    
    # await hybrid_search.add(request.id, request.text, request.metadata)
    
    return {
        "status": "indexed",
        "id": request.id
    }

@app.post("/v1/index/batch")
async def index_batch(documents: list[IndexRequest]):
    """Index multiple documents."""
    
    # for doc in documents:
    #     await hybrid_search.add(doc.id, doc.text, doc.metadata)
    
    return {
        "status": "indexed",
        "count": len(documents)
    }

@app.post("/v1/search")
async def search(request: SearchRequest) -> SearchResponse:
    """Search for documents."""
    
    expanded = None
    
    # Would use actual search
    # if request.use_hyde:
    #     results = await multi_query.search_hyde(request.query, request.k)
    # elif request.expand_query:
    #     results = await multi_query.search_expanded(request.query, request.k)
    # elif request.use_hybrid:
    #     results = await hybrid_search.search(request.query, request.k, request.filter)
    # else:
    #     results = await vector_index.search(query_embedding, request.k, request.filter)
    
    # Placeholder results
    results = []
    
    return SearchResponse(
        results=[
            SearchResultResponse(
                id=r.id,
                score=r.score,
                text=r.text,
                metadata=r.metadata
            )
            for r in results
        ],
        query=request.query,
        expanded_queries=expanded
    )

@app.delete("/v1/index/{doc_id}")
async def delete_document(doc_id: str):
    """Delete a document."""
    
    # await vector_index.delete([doc_id])
    
    return {"status": "deleted", "id": doc_id}

@app.get("/v1/stats")
async def get_stats():
    """Get index statistics."""
    
    return {
        "total_documents": 0,
        "index_type": "HNSW",
        "dimension": 1536,
        "metric": "cosine"
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Semantic search quality depends on multiple factors working together. Start with embedding selection—choose models trained for retrieval tasks and consider domain-specific options for specialized content. Optimize your vector index parameters: HNSW’s ef_construction and M values affect build time and recall, while ef_search controls query-time accuracy. Implement hybrid search combining semantic and keyword approaches—BM25 catches exact matches that embeddings might miss, while embeddings find conceptually similar content. Use query optimization techniques like expansion and HyDE to improve recall for ambiguous queries. Add reranking with cross-encoders for precision-critical applications. Monitor your search quality with metrics like MRR, NDCG, and recall@k, and continuously tune based on user feedback. The key insight is that semantic search is not a single algorithm but a pipeline—each component contributes to overall quality, and optimizing the full pipeline yields better results than perfecting any single component.
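
To make that monitoring concrete, here is a small sketch of the offline metrics named above, assuming binary relevance labels; the example judgments are illustrative:

import math

def recall_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    # Fraction of the relevant documents that appear in the top k
    if not relevant:
        return 0.0
    return len(set(retrieved[:k]) & relevant) / len(relevant)

def mrr(retrieved: list[str], relevant: set[str]) -> float:
    # Reciprocal rank of the first relevant hit
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0

def ndcg_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    # DCG of the actual ranking divided by the ideal DCG
    dcg = sum(
        1.0 / math.log2(rank + 1)
        for rank, doc_id in enumerate(retrieved[:k], start=1)
        if doc_id in relevant
    )
    ideal = sum(
        1.0 / math.log2(rank + 1)
        for rank in range(1, min(k, len(relevant)) + 1)
    )
    return dcg / ideal if ideal > 0 else 0.0

print(recall_at_k(["a", "c", "b"], {"a", "b"}, k=2))  # 0.5
print(mrr(["c", "a", "b"], {"a", "b"}))               # 0.5
print(round(ndcg_at_k(["c", "a", "b"], {"a", "b"}, k=3), 3))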