Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

Vector Search Optimization: Embedding Models, Hybrid Search, and Reranking Strategies

Introduction: Vector search is the foundation of modern RAG systems, but naive implementations often deliver poor results. Optimizing vector search requires understanding embedding models, index types, query strategies, and reranking techniques. The difference between a basic similarity search and a well-tuned retrieval pipeline can be dramatic—both in relevance and latency. This guide covers practical vector search optimization: embedding model selection, index configuration, hybrid search combining dense and sparse retrieval, reranking strategies, and production patterns for building high-performance semantic search systems.

Vector Search
Vector Search: Query Embedding, Vector Index, Reranking

Embedding Model Selection

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

@dataclass
class EmbeddingModel:
    """Configuration for an embedding model."""
    
    name: str
    dimension: int
    max_tokens: int
    normalize: bool = True
    
    # Performance characteristics
    latency_ms: float = 0.0
    cost_per_1k: float = 0.0

# Common embedding models
EMBEDDING_MODELS = {
    "text-embedding-3-small": EmbeddingModel(
        name="text-embedding-3-small",
        dimension=1536,
        max_tokens=8191,
        latency_ms=50,
        cost_per_1k=0.00002
    ),
    "text-embedding-3-large": EmbeddingModel(
        name="text-embedding-3-large",
        dimension=3072,
        max_tokens=8191,
        latency_ms=80,
        cost_per_1k=0.00013
    ),
    "voyage-large-2": EmbeddingModel(
        name="voyage-large-2",
        dimension=1536,
        max_tokens=16000,
        latency_ms=100,
        cost_per_1k=0.00012
    ),
    "bge-large-en-v1.5": EmbeddingModel(
        name="bge-large-en-v1.5",
        dimension=1024,
        max_tokens=512,
        latency_ms=20,
        cost_per_1k=0.0  # Self-hosted
    ),
}

class EmbeddingClient:
    """Client for generating embeddings."""
    
    def __init__(
        self,
        client: Any,
        model: str = "text-embedding-3-small"
    ):
        self.client = client
        self.model = model
        self.config = EMBEDDING_MODELS.get(model)
    
    async def embed(self, text: str) -> np.ndarray:
        """Generate embedding for single text."""
        
        response = await self.client.embeddings.create(
            model=self.model,
            input=text
        )
        
        embedding = np.array(response.data[0].embedding)
        
        if self.config and self.config.normalize:
            embedding = embedding / np.linalg.norm(embedding)
        
        return embedding
    
    async def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Generate embeddings for batch of texts."""
        
        response = await self.client.embeddings.create(
            model=self.model,
            input=texts
        )
        
        embeddings = [
            np.array(item.embedding)
            for item in response.data
        ]
        
        if self.config and self.config.normalize:
            embeddings = [
                e / np.linalg.norm(e)
                for e in embeddings
            ]
        
        return embeddings

class DimensionReducer:
    """Reduce embedding dimensions for efficiency."""
    
    def __init__(self, target_dim: int = 256):
        self.target_dim = target_dim
        self.projection_matrix = None
    
    def fit(self, embeddings: np.ndarray):
        """Fit PCA-like projection."""
        
        from sklearn.decomposition import PCA
        
        pca = PCA(n_components=self.target_dim)
        pca.fit(embeddings)
        
        self.projection_matrix = pca.components_.T
    
    def transform(self, embedding: np.ndarray) -> np.ndarray:
        """Reduce dimension of embedding."""
        
        if self.projection_matrix is None:
            raise ValueError("Must fit before transform")
        
        reduced = embedding @ self.projection_matrix
        return reduced / np.linalg.norm(reduced)

class MatryoshkaEmbedding:
    """Use Matryoshka embeddings for variable dimensions."""
    
    def __init__(self, client: Any, model: str = "text-embedding-3-large"):
        self.client = client
        self.model = model
    
    async def embed(
        self,
        text: str,
        dimensions: int = 256
    ) -> np.ndarray:
        """Generate embedding with specified dimensions."""
        
        response = await self.client.embeddings.create(
            model=self.model,
            input=text,
            dimensions=dimensions
        )
        
        embedding = np.array(response.data[0].embedding)
        return embedding / np.linalg.norm(embedding)

Index Configuration

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
import numpy as np

class IndexType(Enum):
    FLAT = "flat"           # Exact search
    HNSW = "hnsw"           # Approximate nearest neighbor
    IVF = "ivf"             # Inverted file index
    PQ = "pq"               # Product quantization

@dataclass
class IndexConfig:
    """Configuration for vector index."""
    
    index_type: IndexType
    dimension: int
    metric: str = "cosine"  # cosine, euclidean, dot_product
    
    # HNSW parameters
    hnsw_m: int = 16        # Number of connections
    hnsw_ef_construction: int = 200
    hnsw_ef_search: int = 100
    
    # IVF parameters
    ivf_nlist: int = 100    # Number of clusters
    ivf_nprobe: int = 10    # Clusters to search

class VectorIndex:
    """In-memory vector index using FAISS."""
    
    def __init__(self, config: IndexConfig):
        self.config = config
        self.index = None
        self.metadata: list[dict] = []
    
    def build(self, embeddings: np.ndarray, metadata: list[dict] = None):
        """Build index from embeddings."""
        
        import faiss
        
        dimension = embeddings.shape[1]
        
        if self.config.index_type == IndexType.FLAT:
            if self.config.metric == "cosine":
                self.index = faiss.IndexFlatIP(dimension)
            else:
                self.index = faiss.IndexFlatL2(dimension)
        
        elif self.config.index_type == IndexType.HNSW:
            self.index = faiss.IndexHNSWFlat(
                dimension,
                self.config.hnsw_m
            )
            self.index.hnsw.efConstruction = self.config.hnsw_ef_construction
            self.index.hnsw.efSearch = self.config.hnsw_ef_search
        
        elif self.config.index_type == IndexType.IVF:
            quantizer = faiss.IndexFlatL2(dimension)
            self.index = faiss.IndexIVFFlat(
                quantizer,
                dimension,
                self.config.ivf_nlist
            )
            self.index.train(embeddings)
            self.index.nprobe = self.config.ivf_nprobe
        
        # Normalize for cosine similarity
        if self.config.metric == "cosine":
            faiss.normalize_L2(embeddings)
        
        self.index.add(embeddings)
        self.metadata = metadata or [{}] * len(embeddings)
    
    def search(
        self,
        query_embedding: np.ndarray,
        k: int = 10
    ) -> list[tuple[int, float, dict]]:
        """Search for nearest neighbors."""
        
        import faiss
        
        query = query_embedding.reshape(1, -1).astype('float32')
        
        if self.config.metric == "cosine":
            faiss.normalize_L2(query)
        
        distances, indices = self.index.search(query, k)
        
        results = []
        for i, (idx, dist) in enumerate(zip(indices[0], distances[0])):
            if idx >= 0:  # Valid index
                results.append((
                    int(idx),
                    float(dist),
                    self.metadata[idx] if idx < len(self.metadata) else {}
                ))
        
        return results

class PineconeIndex:
    """Pinecone vector index wrapper."""
    
    def __init__(
        self,
        api_key: str,
        index_name: str,
        dimension: int
    ):
        from pinecone import Pinecone
        
        self.pc = Pinecone(api_key=api_key)
        self.index = self.pc.Index(index_name)
        self.dimension = dimension
    
    def upsert(
        self,
        ids: list[str],
        embeddings: list[np.ndarray],
        metadata: list[dict] = None
    ):
        """Upsert vectors to index."""
        
        vectors = [
            {
                "id": id,
                "values": emb.tolist(),
                "metadata": meta or {}
            }
            for id, emb, meta in zip(
                ids,
                embeddings,
                metadata or [{}] * len(ids)
            )
        ]
        
        # Batch upsert
        batch_size = 100
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch)
    
    def search(
        self,
        query_embedding: np.ndarray,
        k: int = 10,
        filter: dict = None
    ) -> list[dict]:
        """Search for nearest neighbors."""
        
        results = self.index.query(
            vector=query_embedding.tolist(),
            top_k=k,
            filter=filter,
            include_metadata=True
        )
        
        return [
            {
                "id": match.id,
                "score": match.score,
                "metadata": match.metadata
            }
            for match in results.matches
        ]

Hybrid Search

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

@dataclass
class SearchResult:
    """A search result with score."""
    
    id: str
    score: float
    content: str
    metadata: dict = None

class BM25Index:
    """BM25 sparse retrieval index."""
    
    def __init__(self):
        self.documents: list[str] = []
        self.doc_ids: list[str] = []
        self.bm25 = None
    
    def build(self, documents: list[str], doc_ids: list[str] = None):
        """Build BM25 index."""
        
        from rank_bm25 import BM25Okapi
        
        self.documents = documents
        self.doc_ids = doc_ids or [str(i) for i in range(len(documents))]
        
        # Tokenize documents
        tokenized = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized)
    
    def search(self, query: str, k: int = 10) -> list[SearchResult]:
        """Search using BM25."""
        
        tokenized_query = query.lower().split()
        scores = self.bm25.get_scores(tokenized_query)
        
        # Get top k
        top_indices = np.argsort(scores)[::-1][:k]
        
        results = []
        for idx in top_indices:
            if scores[idx] > 0:
                results.append(SearchResult(
                    id=self.doc_ids[idx],
                    score=float(scores[idx]),
                    content=self.documents[idx]
                ))
        
        return results

class HybridSearcher:
    """Combine dense and sparse retrieval."""
    
    def __init__(
        self,
        dense_index: VectorIndex,
        sparse_index: BM25Index,
        embedding_client: EmbeddingClient,
        alpha: float = 0.5  # Weight for dense vs sparse
    ):
        self.dense_index = dense_index
        self.sparse_index = sparse_index
        self.embedding_client = embedding_client
        self.alpha = alpha
    
    async def search(
        self,
        query: str,
        k: int = 10
    ) -> list[SearchResult]:
        """Hybrid search combining dense and sparse."""
        
        # Dense search
        query_embedding = await self.embedding_client.embed(query)
        dense_results = self.dense_index.search(query_embedding, k * 2)
        
        # Sparse search
        sparse_results = self.sparse_index.search(query, k * 2)
        
        # Combine with reciprocal rank fusion
        return self._reciprocal_rank_fusion(
            dense_results,
            sparse_results,
            k
        )
    
    def _reciprocal_rank_fusion(
        self,
        dense_results: list,
        sparse_results: list[SearchResult],
        k: int,
        rrf_k: int = 60
    ) -> list[SearchResult]:
        """Combine results using RRF."""
        
        scores = {}
        contents = {}
        
        # Score dense results
        for rank, (idx, score, meta) in enumerate(dense_results):
            doc_id = str(idx)
            rrf_score = 1.0 / (rrf_k + rank + 1)
            scores[doc_id] = scores.get(doc_id, 0) + self.alpha * rrf_score
            contents[doc_id] = meta.get("content", "")
        
        # Score sparse results
        for rank, result in enumerate(sparse_results):
            rrf_score = 1.0 / (rrf_k + rank + 1)
            scores[result.id] = scores.get(result.id, 0) + (1 - self.alpha) * rrf_score
            contents[result.id] = result.content
        
        # Sort by combined score
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        
        return [
            SearchResult(
                id=doc_id,
                score=scores[doc_id],
                content=contents.get(doc_id, "")
            )
            for doc_id in sorted_ids[:k]
        ]

class AdaptiveHybridSearcher(HybridSearcher):
    """Hybrid search with adaptive weighting."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_classifier = None
    
    async def search(
        self,
        query: str,
        k: int = 10
    ) -> list[SearchResult]:
        """Search with adaptive alpha based on query type."""
        
        # Classify query
        alpha = self._classify_query(query)
        self.alpha = alpha
        
        return await super().search(query, k)
    
    def _classify_query(self, query: str) -> float:
        """Determine optimal alpha for query."""
        
        # Keyword-heavy queries favor sparse
        words = query.split()
        
        if len(words) <= 3:
            return 0.3  # Favor sparse for short queries
        
        # Questions favor dense
        if query.strip().endswith("?"):
            return 0.7  # Favor dense for questions
        
        return 0.5  # Balanced default

Reranking

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

class CrossEncoderReranker:
    """Rerank using cross-encoder model."""
    
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        
        self.model = CrossEncoder(model_name)
    
    def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = None
    ) -> list[SearchResult]:
        """Rerank results using cross-encoder."""
        
        if not results:
            return results
        
        # Create query-document pairs
        pairs = [(query, r.content) for r in results]
        
        # Score pairs
        scores = self.model.predict(pairs)
        
        # Sort by score
        scored_results = list(zip(results, scores))
        scored_results.sort(key=lambda x: x[1], reverse=True)
        
        # Update scores and return
        reranked = []
        for result, score in scored_results[:top_k]:
            result.score = float(score)
            reranked.append(result)
        
        return reranked

class CohereReranker:
    """Rerank using Cohere's rerank API."""
    
    def __init__(self, api_key: str, model: str = "rerank-english-v3.0"):
        import cohere
        
        self.client = cohere.Client(api_key)
        self.model = model
    
    def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = None
    ) -> list[SearchResult]:
        """Rerank using Cohere API."""
        
        if not results:
            return results
        
        documents = [r.content for r in results]
        
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_k or len(results)
        )
        
        reranked = []
        for item in response.results:
            result = results[item.index]
            result.score = item.relevance_score
            reranked.append(result)
        
        return reranked

class LLMReranker:
    """Rerank using LLM scoring."""
    
    RERANK_PROMPT = """Rate the relevance of the following document to the query on a scale of 0-10.

Query: {query}

Document: {document}

Respond with only a number from 0-10."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = None
    ) -> list[SearchResult]:
        """Rerank using LLM scoring."""
        
        import asyncio
        
        async def score_result(result: SearchResult) -> tuple[SearchResult, float]:
            prompt = self.RERANK_PROMPT.format(
                query=query,
                document=result.content[:1000]  # Truncate
            )
            
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=10
            )
            
            try:
                score = float(response.choices[0].message.content.strip())
            except ValueError:
                score = 5.0
            
            return result, score
        
        # Score all results in parallel
        scored = await asyncio.gather(*[
            score_result(r) for r in results
        ])
        
        # Sort by score
        scored.sort(key=lambda x: x[1], reverse=True)
        
        reranked = []
        for result, score in scored[:top_k]:
            result.score = score / 10.0  # Normalize to 0-1
            reranked.append(result)
        
        return reranked

class MultiStageReranker:
    """Multi-stage reranking pipeline."""
    
    def __init__(self, stages: list):
        self.stages = stages
    
    async def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_k: int = None
    ) -> list[SearchResult]:
        """Apply reranking stages sequentially."""
        
        current_results = results
        
        for i, stage in enumerate(self.stages):
            # Progressively reduce candidates
            stage_k = max(top_k * (len(self.stages) - i), top_k) if top_k else None
            
            if hasattr(stage, 'rerank'):
                if asyncio.iscoroutinefunction(stage.rerank):
                    current_results = await stage.rerank(query, current_results, stage_k)
                else:
                    current_results = stage.rerank(query, current_results, stage_k)
        
        return current_results[:top_k] if top_k else current_results

Production Search Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components (placeholder)
embedding_client = None
vector_index = None
hybrid_searcher = None
reranker = None

class SearchRequest(BaseModel):
    query: str
    k: int = 10
    use_reranking: bool = True
    filter: Optional[dict] = None

class IndexRequest(BaseModel):
    documents: list[str]
    ids: Optional[list[str]] = None
    metadata: Optional[list[dict]] = None

@app.post("/v1/search")
async def search(request: SearchRequest):
    """Search for relevant documents."""
    
    # Get initial results
    if hybrid_searcher:
        results = await hybrid_searcher.search(
            request.query,
            k=request.k * 2 if request.use_reranking else request.k
        )
    else:
        # Dense-only search
        query_embedding = await embedding_client.embed(request.query)
        raw_results = vector_index.search(query_embedding, request.k * 2)
        
        results = [
            SearchResult(
                id=str(idx),
                score=score,
                content=meta.get("content", ""),
                metadata=meta
            )
            for idx, score, meta in raw_results
        ]
    
    # Rerank if enabled
    if request.use_reranking and reranker:
        results = await reranker.rerank(
            request.query,
            results,
            top_k=request.k
        )
    else:
        results = results[:request.k]
    
    return {
        "results": [
            {
                "id": r.id,
                "score": r.score,
                "content": r.content,
                "metadata": r.metadata
            }
            for r in results
        ]
    }

@app.post("/v1/index")
async def index_documents(request: IndexRequest):
    """Index new documents."""
    
    # Generate embeddings
    embeddings = await embedding_client.embed_batch(request.documents)
    
    # Build metadata
    metadata = request.metadata or [{}] * len(request.documents)
    for i, doc in enumerate(request.documents):
        metadata[i]["content"] = doc
    
    # Add to index
    embeddings_array = np.array(embeddings)
    vector_index.build(embeddings_array, metadata)
    
    return {
        "indexed": len(request.documents),
        "dimension": embeddings_array.shape[1]
    }

@app.get("/v1/stats")
async def get_stats():
    """Get index statistics."""
    
    return {
        "total_vectors": vector_index.index.ntotal if vector_index else 0,
        "dimension": vector_index.config.dimension if vector_index else 0,
        "index_type": vector_index.config.index_type.value if vector_index else None
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Vector search optimization transforms basic similarity search into high-quality retrieval. Start with the right embedding model for your domain—general-purpose models work well for most cases, but domain-specific models can significantly improve relevance. Choose index types based on your scale: flat indexes for small datasets, HNSW for balanced performance, IVF for large-scale deployments. Hybrid search combining dense and sparse retrieval often outperforms either alone—use reciprocal rank fusion to combine results effectively. Reranking with cross-encoders or LLMs dramatically improves precision at the cost of latency—use multi-stage pipelines to balance quality and speed. Monitor retrieval metrics (recall, precision, MRR) to guide optimization decisions. The goal is building retrieval systems that consistently surface the most relevant content for your users.