Vector Search Optimization: HNSW, IVF, and Hybrid Retrieval

Introduction: Vector search powers semantic retrieval in RAG systems, recommendation engines, and similarity search applications. But naive vector search doesn’t scale—searching millions of vectors with brute force is too slow for production. This guide covers optimization techniques: HNSW indexes for fast approximate search, IVF partitioning for large datasets, product quantization for memory efficiency, hybrid search combining vectors with keywords, and reranking for improved relevance. These patterns help you build vector search systems that are fast, accurate, and cost-effective at scale.

[Diagram: Vector Search: From Query to Optimized Results]

HNSW Index Fundamentals

# pip install hnswlib numpy

import hnswlib
import numpy as np
from typing import Optional, Tuple

class HNSWIndex:
    """HNSW (Hierarchical Navigable Small World) index for fast ANN search."""
    
    def __init__(
        self,
        dim: int,
        max_elements: int,
        ef_construction: int = 200,
        M: int = 16,
        space: str = "cosine"
    ):
        """
        Args:
            dim: Vector dimension
            max_elements: Maximum number of vectors
            ef_construction: Construction time/accuracy trade-off (higher = better quality)
            M: Number of connections per layer (higher = better recall, more memory)
            space: Distance metric ("cosine", "l2", "ip")
        """
        self.index = hnswlib.Index(space=space, dim=dim)
        self.index.init_index(
            max_elements=max_elements,
            ef_construction=ef_construction,
            M=M
        )
        self.dim = dim
    
    def add(self, vectors: np.ndarray, ids: Optional[np.ndarray] = None):
        """Add vectors to the index."""
        if ids is None:
            ids = np.arange(len(vectors))
        self.index.add_items(vectors, ids)
    
    def search(
        self,
        query: np.ndarray,
        k: int = 10,
        ef: int = 50
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Search for nearest neighbors.
        
        Args:
            query: Query vector(s)
            k: Number of results
            ef: Search time/accuracy trade-off (higher = better recall)
        
        Returns:
            (ids, distances)
        """
        self.index.set_ef(ef)
        ids, distances = self.index.knn_query(query, k=k)
        return ids, distances
    
    def save(self, path: str):
        """Save index to disk."""
        self.index.save_index(path)
    
    def load(self, path: str):
        """Load index from disk."""
        self.index.load_index(path)

# Usage
dim = 1536  # OpenAI embedding dimension
num_vectors = 100000

# Create index
index = HNSWIndex(
    dim=dim,
    max_elements=num_vectors,
    ef_construction=200,  # Higher for better quality
    M=32  # Higher for better recall
)

# Add vectors
vectors = np.random.randn(num_vectors, dim).astype(np.float32)
index.add(vectors)

# Search
query = np.random.randn(1, dim).astype(np.float32)
ids, distances = index.search(query, k=10, ef=100)

print(f"Top 10 results: {ids[0]}")
print(f"Distances: {distances[0]}")

IVF Partitioning with FAISS

# pip install faiss-cpu  # or faiss-gpu for GPU support

import faiss
import numpy as np
from typing import Tuple

class IVFIndex:
    """IVF (Inverted File) index for large-scale vector search."""
    
    def __init__(
        self,
        dim: int,
        nlist: int = 100,
        nprobe: int = 10,
        use_gpu: bool = False
    ):
        """
        Args:
            dim: Vector dimension
            nlist: Number of clusters/partitions
            nprobe: Number of clusters to search (trade-off speed/recall)
            use_gpu: Use GPU acceleration
        """
        self.dim = dim
        self.nlist = nlist
        self.nprobe = nprobe
        
        # Create quantizer and index
        self.quantizer = faiss.IndexFlatL2(dim)
        self.index = faiss.IndexIVFFlat(self.quantizer, dim, nlist)
        
        if use_gpu:
            res = faiss.StandardGpuResources()
            self.index = faiss.index_cpu_to_gpu(res, 0, self.index)
        
        self.is_trained = False
    
    def train(self, vectors: np.ndarray):
        """Train the index on sample vectors."""
        if not self.is_trained:
            self.index.train(vectors.astype(np.float32))
            self.is_trained = True
    
    def add(self, vectors: np.ndarray):
        """Add vectors to the index."""
        if not self.is_trained:
            self.train(vectors)
        self.index.add(vectors.astype(np.float32))
    
    def search(self, query: np.ndarray, k: int = 10) -> Tuple[np.ndarray, np.ndarray]:
        """Search for nearest neighbors."""
        self.index.nprobe = self.nprobe
        distances, ids = self.index.search(query.astype(np.float32), k)
        return ids, distances

# Usage
dim = 1536
num_vectors = 1000000

# Create index with 1000 partitions
index = IVFIndex(
    dim=dim,
    nlist=1000,  # sqrt(num_vectors) is a good starting point
    nprobe=50    # Search 50 partitions (5% of total)
)

# Generate sample data
vectors = np.random.randn(num_vectors, dim).astype(np.float32)

# Train on subset
train_vectors = vectors[:100000]
index.train(train_vectors)

# Add all vectors
index.add(vectors)

# Search
query = np.random.randn(1, dim).astype(np.float32)
ids, distances = index.search(query, k=10)

print(f"Found {len(ids[0])} results")

Product Quantization for Memory Efficiency

class PQIndex:
    """Product Quantization index for memory-efficient search."""
    
    def __init__(
        self,
        dim: int,
        nlist: int = 100,
        m: int = 8,  # Number of subquantizers
        nbits: int = 8  # Bits per subquantizer
    ):
        """
        Args:
            dim: Vector dimension (must be divisible by m)
            nlist: Number of IVF partitions
            m: Number of subquantizers (more = better quality, more memory)
            nbits: Bits per code (8 = 256 centroids per subquantizer)
        """
        self.dim = dim
        self.nlist = nlist
        self.m = m
        
        # IVF + PQ index
        self.quantizer = faiss.IndexFlatL2(dim)
        self.index = faiss.IndexIVFPQ(
            self.quantizer, dim, nlist, m, nbits
        )
        
        self.is_trained = False
    
    def train(self, vectors: np.ndarray):
        """Train the index."""
        if not self.is_trained:
            self.index.train(vectors.astype(np.float32))
            self.is_trained = True
    
    def add(self, vectors: np.ndarray):
        """Add vectors."""
        if not self.is_trained:
            self.train(vectors)
        self.index.add(vectors.astype(np.float32))
    
    def search(self, query: np.ndarray, k: int = 10, nprobe: int = 10):
        """Search with PQ."""
        self.index.nprobe = nprobe
        distances, ids = self.index.search(query.astype(np.float32), k)
        return ids, distances
    
    def memory_usage(self) -> dict:
        """Estimate memory usage."""
        # PQ uses m bytes per vector (with 8-bit codes)
        vectors_memory = self.index.ntotal * self.m
        
        return {
            "vectors_mb": vectors_memory / (1024 * 1024),
            "full_vectors_mb": self.index.ntotal * self.dim * 4 / (1024 * 1024),
            "compression_ratio": (self.dim * 4) / self.m
        }

# Usage: 1M vectors in ~48 MB of PQ codes (m=48 bytes each) instead of ~6 GB of float32
dim = 1536
num_vectors = 1000000

pq_index = PQIndex(
    dim=dim,
    nlist=1000,
    m=48,  # 48 subquantizers for 1536-dim vectors
    nbits=8
)

vectors = np.random.randn(num_vectors, dim).astype(np.float32)
pq_index.train(vectors[:100000])
pq_index.add(vectors)

memory = pq_index.memory_usage()
print(f"Memory: {memory['vectors_mb']:.1f} MB")
print(f"Full vectors would be: {memory['full_vectors_mb']:.1f} MB")
print(f"Compression ratio: {memory['compression_ratio']:.1f}x")

Hybrid Search: Vectors + Keywords

from dataclasses import dataclass
from typing import Optional
import re

@dataclass
class SearchResult:
    id: str
    text: str
    vector_score: float
    keyword_score: float
    combined_score: float

class HybridSearch:
    """Combine vector similarity with keyword matching."""
    
    def __init__(
        self,
        vector_weight: float = 0.7,
        keyword_weight: float = 0.3
    ):
        self.vector_weight = vector_weight
        self.keyword_weight = keyword_weight
        self.documents: dict[str, dict] = {}
        self.vectors: dict[str, np.ndarray] = {}
        self.inverted_index: dict[str, set[str]] = {}
    
    def _tokenize(self, text: str) -> list[str]:
        """Simple tokenization."""
        return re.findall(r'\w+', text.lower())
    
    def add_document(self, doc_id: str, text: str, vector: np.ndarray):
        """Add a document with text and vector."""
        self.documents[doc_id] = {"text": text}
        self.vectors[doc_id] = vector
        
        # Build inverted index for keyword search
        tokens = self._tokenize(text)
        for token in tokens:
            if token not in self.inverted_index:
                self.inverted_index[token] = set()
            self.inverted_index[token].add(doc_id)
    
    def _keyword_search(self, query: str, k: int) -> dict[str, float]:
        """BM25-style keyword search."""
        query_tokens = self._tokenize(query)
        scores: dict[str, float] = {}
        
        for token in query_tokens:
            if token in self.inverted_index:
                # Simple TF-IDF-like scoring
                idf = np.log(len(self.documents) / len(self.inverted_index[token]))
                
                for doc_id in self.inverted_index[token]:
                    doc_tokens = self._tokenize(self.documents[doc_id]["text"])
                    tf = doc_tokens.count(token) / len(doc_tokens)
                    
                    if doc_id not in scores:
                        scores[doc_id] = 0
                    scores[doc_id] += tf * idf
        
        return scores
    
    def _vector_search(self, query_vector: np.ndarray, k: int) -> dict[str, float]:
        """Cosine similarity vector search."""
        scores = {}
        
        for doc_id, doc_vector in self.vectors.items():
            similarity = np.dot(query_vector, doc_vector) / (
                np.linalg.norm(query_vector) * np.linalg.norm(doc_vector)
            )
            scores[doc_id] = float(similarity)
        
        return scores
    
    def search(
        self,
        query: str,
        query_vector: np.ndarray,
        k: int = 10
    ) -> list[SearchResult]:
        """Hybrid search combining vectors and keywords."""
        
        # Get scores from both methods
        keyword_scores = self._keyword_search(query, k * 2)
        vector_scores = self._vector_search(query_vector, k * 2)
        
        # Normalize scores
        def normalize(scores: dict) -> dict:
            if not scores:
                return scores
            max_score = max(scores.values())
            min_score = min(scores.values())
            range_score = max_score - min_score or 1
            return {k: (v - min_score) / range_score for k, v in scores.items()}
        
        keyword_scores = normalize(keyword_scores)
        vector_scores = normalize(vector_scores)
        
        # Combine scores
        all_ids = set(keyword_scores.keys()) | set(vector_scores.keys())
        results = []
        
        for doc_id in all_ids:
            v_score = vector_scores.get(doc_id, 0)
            k_score = keyword_scores.get(doc_id, 0)
            combined = (
                self.vector_weight * v_score +
                self.keyword_weight * k_score
            )
            
            results.append(SearchResult(
                id=doc_id,
                text=self.documents[doc_id]["text"][:100],
                vector_score=v_score,
                keyword_score=k_score,
                combined_score=combined
            ))
        
        # Sort by combined score
        results.sort(key=lambda x: x.combined_score, reverse=True)
        return results[:k]

# Usage
hybrid = HybridSearch(vector_weight=0.6, keyword_weight=0.4)

# Add documents
docs = [
    ("doc1", "Python is a programming language for data science"),
    ("doc2", "JavaScript is used for web development"),
    ("doc3", "Machine learning with Python and TensorFlow"),
]

for doc_id, text in docs:
    vector = np.random.randn(384).astype(np.float32)  # Simulated embedding
    hybrid.add_document(doc_id, text, vector)

# Search
query = "Python programming"
query_vector = np.random.randn(384).astype(np.float32)

results = hybrid.search(query, query_vector, k=3)
for r in results:
    print(f"{r.id}: {r.combined_score:.3f} (v:{r.vector_score:.2f}, k:{r.keyword_score:.2f})")

Reranking for Better Relevance

import json

import numpy as np
from openai import OpenAI

client = OpenAI()

class Reranker:
    """Rerank results using an LLM or embedding (bi-encoder) similarity."""
    
    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
    
    def rerank_with_llm(
        self,
        query: str,
        documents: list[dict],
        top_k: int = 5
    ) -> list[dict]:
        """Rerank using LLM scoring."""
        
        # Format documents for LLM
        doc_list = "\n".join([
            f"[{i}] {doc['text'][:500]}"
            for i, doc in enumerate(documents)
        ])
        
        prompt = f"""Given the query and documents, rank the documents by relevance.
Return a JSON array of document indices in order of relevance (most relevant first).

Query: {query}

Documents:
{doc_list}

Return only the JSON array of indices, e.g., [2, 0, 4, 1, 3]"""
        
        response = client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        
        result = json.loads(response.choices[0].message.content)
        ranking = result.get("ranking", list(range(len(documents))))
        
        # Reorder documents
        reranked = []
        for idx in ranking[:top_k]:
            if 0 <= idx < len(documents):
                doc = documents[idx].copy()
                doc["rerank_position"] = len(reranked)
                reranked.append(doc)
        
        return reranked
    
    def rerank_with_embeddings(
        self,
        query: str,
        documents: list[dict],
        top_k: int = 5
    ) -> list[dict]:
        """Rerank using query-document embedding similarity."""
        
        # Get embeddings for query and all documents
        texts = [query] + [doc["text"] for doc in documents]
        
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )
        
        embeddings = [item.embedding for item in response.data]
        query_embed = np.array(embeddings[0])
        doc_embeds = [np.array(e) for e in embeddings[1:]]
        
        # Calculate similarities
        scores = []
        for i, doc_embed in enumerate(doc_embeds):
            similarity = np.dot(query_embed, doc_embed) / (
                np.linalg.norm(query_embed) * np.linalg.norm(doc_embed)
            )
            scores.append((i, similarity))
        
        # Sort by similarity
        scores.sort(key=lambda x: x[1], reverse=True)
        
        # Return reranked documents
        reranked = []
        for idx, score in scores[:top_k]:
            doc = documents[idx].copy()
            doc["rerank_score"] = float(score)
            reranked.append(doc)
        
        return reranked

# Usage
reranker = Reranker()

# Initial retrieval results
initial_results = [
    {"id": "1", "text": "Python basics for beginners"},
    {"id": "2", "text": "Advanced Python programming patterns"},
    {"id": "3", "text": "JavaScript fundamentals"},
    {"id": "4", "text": "Python machine learning tutorial"},
    {"id": "5", "text": "Web development with Python Django"},
]

query = "Python for machine learning"

# Rerank
reranked = reranker.rerank_with_embeddings(query, initial_results, top_k=3)

for doc in reranked:
    print(f"{doc['id']}: {doc['text']} (score: {doc.get('rerank_score', 'N/A'):.3f})")

Production Vector Search Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import time

app = FastAPI()

class SearchRequest(BaseModel):
    query: str
    query_vector: Optional[list[float]] = None
    k: int = 10
    use_reranking: bool = True
    hybrid_search: bool = True

class SearchResponse(BaseModel):
    results: list[dict]
    latency_ms: float
    total_candidates: int

# Initialize components
hnsw_index = HNSWIndex(dim=1536, max_elements=1000000)
hybrid_searcher = HybridSearch()
reranker = Reranker()

@app.post("/search", response_model=SearchResponse)
async def search(request: SearchRequest):
    """Optimized vector search endpoint."""
    
    start_time = time.time()
    
    # Get query embedding if not provided
    if request.query_vector is None:
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=request.query
        )
        query_vector = np.array(response.data[0].embedding)
    else:
        query_vector = np.array(request.query_vector)
    
    # Stage 1: Fast ANN retrieval (get more candidates for reranking)
    candidate_k = request.k * 3 if request.use_reranking else request.k
    
    if request.hybrid_search:
        results = hybrid_searcher.search(
            request.query,
            query_vector,
            k=candidate_k
        )
        candidates = [{"id": r.id, "text": r.text, "score": r.combined_score} for r in results]
    else:
        ids, distances = hnsw_index.search(
            query_vector.reshape(1, -1).astype(np.float32),
            k=candidate_k,
            ef=100
        )
        candidates = [{"id": str(i), "score": 1 - d} for i, d in zip(ids[0], distances[0])]
    
    total_candidates = len(candidates)
    
    # Stage 2: Reranking
    if request.use_reranking and len(candidates) > 0:
        results = reranker.rerank_with_embeddings(
            request.query,
            candidates,
            top_k=request.k
        )
    else:
        results = candidates[:request.k]
    
    latency_ms = (time.time() - start_time) * 1000
    
    return SearchResponse(
        results=results,
        latency_ms=latency_ms,
        total_candidates=total_candidates
    )

@app.get("/stats")
async def get_stats():
    """Get index statistics."""
    return {
        "total_vectors": hnsw_index.index.get_current_count(),
        # Raw float32 vector bytes only; HNSW graph links add extra overhead
        "index_size_mb": hnsw_index.index.get_current_count() * 1536 * 4 / (1024 * 1024)
    }

Conclusion

Optimizing vector search requires understanding the trade-offs between speed, accuracy, and memory. HNSW provides excellent query performance with tunable recall via the ef parameter. IVF partitioning scales to billions of vectors by limiting search to relevant clusters. Product quantization dramatically reduces memory usage with acceptable accuracy loss. Hybrid search combining vectors with keywords improves relevance for queries with specific terms. Reranking with cross-encoders or LLMs provides the final quality boost for top results. Start with HNSW for most use cases, add PQ when memory is constrained, and implement reranking when precision matters most.

