Latest Articles

Vector Search Algorithms: From Brute Force to HNSW and Beyond

Introduction: Vector search is the foundation of modern semantic retrieval systems, enabling applications to find similar items based on meaning rather than exact keyword matches. Understanding the algorithms behind vector search—from brute-force linear scan to sophisticated approximate nearest neighbor (ANN) methods—is essential for building efficient retrieval systems. This guide covers the core algorithms that power vector databases: HNSW (Hierarchical Navigable Small World), IVF (Inverted File Index), Product Quantization, LSH (Locality Sensitive Hashing), and tree-based methods. Each algorithm makes different tradeoffs between search accuracy, speed, memory usage, and index build time. Whether you’re building a recommendation system, semantic search engine, or RAG application, these patterns will help you choose and implement the right search algorithm for your use case.

Vector Search Algorithms
Vector Search: Index Structure, Similarity Search, Result Ranking

Brute Force and Distance Metrics

from dataclasses import dataclass
from typing import List, Tuple, Callable
import numpy as np
from enum import Enum

class DistanceMetric(Enum):
    """Supported distance metrics."""
    
    COSINE = "cosine"
    EUCLIDEAN = "euclidean"
    DOT_PRODUCT = "dot_product"
    MANHATTAN = "manhattan"

@dataclass
class SearchResult:
    """A search result with score and metadata."""
    
    id: str
    score: float
    vector: np.ndarray
    metadata: dict = None

class DistanceCalculator:
    """Calculate distances between vectors."""
    
    @staticmethod
    def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity (higher is more similar)."""
        
        dot = np.dot(a, b)
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        
        if norm_a == 0 or norm_b == 0:
            return 0.0
        
        return dot / (norm_a * norm_b)
    
    @staticmethod
    def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
        """Euclidean distance (lower is more similar)."""
        
        return np.linalg.norm(a - b)
    
    @staticmethod
    def dot_product(a: np.ndarray, b: np.ndarray) -> float:
        """Dot product (higher is more similar for normalized vectors)."""
        
        return np.dot(a, b)
    
    @staticmethod
    def manhattan_distance(a: np.ndarray, b: np.ndarray) -> float:
        """Manhattan distance (lower is more similar)."""
        
        return np.sum(np.abs(a - b))
    
    @classmethod
    def get_metric(cls, metric: DistanceMetric) -> Callable:
        """Get distance function for metric."""
        
        metrics = {
            DistanceMetric.COSINE: cls.cosine_similarity,
            DistanceMetric.EUCLIDEAN: cls.euclidean_distance,
            DistanceMetric.DOT_PRODUCT: cls.dot_product,
            DistanceMetric.MANHATTAN: cls.manhattan_distance
        }
        
        return metrics[metric]

class BruteForceIndex:
    """Exact nearest neighbor search (brute force)."""
    
    def __init__(self, metric: DistanceMetric = DistanceMetric.COSINE):
        self.metric = metric
        self.distance_fn = DistanceCalculator.get_metric(metric)
        
        self.vectors: list[np.ndarray] = []
        self.ids: list[str] = []
        self.metadata: list[dict] = []
    
    def add(self, id: str, vector: np.ndarray, metadata: dict = None):
        """Add vector to index."""
        
        self.ids.append(id)
        self.vectors.append(vector)
        self.metadata.append(metadata or {})
    
    def add_batch(
        self,
        ids: list[str],
        vectors: list[np.ndarray],
        metadata: list[dict] = None
    ):
        """Add batch of vectors."""
        
        metadata = metadata or [{}] * len(ids)
        
        for id, vector, meta in zip(ids, vectors, metadata):
            self.add(id, vector, meta)
    
    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search for k nearest neighbors."""
        
        if not self.vectors:
            return []
        
        # Calculate all distances
        scores = []
        for i, vector in enumerate(self.vectors):
            score = self.distance_fn(query, vector)
            scores.append((i, score))
        
        # Sort by score
        if self.metric in [DistanceMetric.COSINE, DistanceMetric.DOT_PRODUCT]:
            # Higher is better
            scores.sort(key=lambda x: x[1], reverse=True)
        else:
            # Lower is better
            scores.sort(key=lambda x: x[1])
        
        # Return top k
        results = []
        for i, score in scores[:k]:
            results.append(SearchResult(
                id=self.ids[i],
                score=score,
                vector=self.vectors[i],
                metadata=self.metadata[i]
            ))
        
        return results
    
    def __len__(self) -> int:
        return len(self.vectors)

class OptimizedBruteForce:
    """Optimized brute force using matrix operations."""
    
    def __init__(self, metric: DistanceMetric = DistanceMetric.COSINE):
        self.metric = metric
        
        self.vectors: np.ndarray = None
        self.ids: list[str] = []
        self.metadata: list[dict] = []
        self.normalized = False
    
    def build(
        self,
        ids: list[str],
        vectors: np.ndarray,
        metadata: list[dict] = None
    ):
        """Build index from vectors."""
        
        self.ids = ids
        self.vectors = vectors.astype(np.float32)
        self.metadata = metadata or [{}] * len(ids)
        
        # Normalize for cosine similarity
        if self.metric == DistanceMetric.COSINE:
            norms = np.linalg.norm(self.vectors, axis=1, keepdims=True)
            self.vectors = self.vectors / np.maximum(norms, 1e-10)
            self.normalized = True
    
    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search using matrix multiplication."""
        
        if self.vectors is None:
            return []
        
        query = query.astype(np.float32)
        
        if self.normalized:
            query = query / np.maximum(np.linalg.norm(query), 1e-10)
        
        # Matrix multiplication for all similarities
        if self.metric in [DistanceMetric.COSINE, DistanceMetric.DOT_PRODUCT]:
            scores = np.dot(self.vectors, query)
            top_indices = np.argsort(scores)[::-1][:k]
        else:
            # Euclidean distance
            diff = self.vectors - query
            scores = np.linalg.norm(diff, axis=1)
            top_indices = np.argsort(scores)[:k]
        
        results = []
        for i in top_indices:
            results.append(SearchResult(
                id=self.ids[i],
                score=float(scores[i]),
                vector=self.vectors[i],
                metadata=self.metadata[i]
            ))
        
        return results

HNSW Algorithm

from dataclasses import dataclass, field
from typing import Set, Dict
import numpy as np
import heapq
import random

@dataclass
class HNSWConfig:
    """HNSW configuration."""
    
    M: int = 16  # Max connections per layer
    ef_construction: int = 200  # Search width during construction
    ef_search: int = 50  # Search width during query
    ml: float = 1 / np.log(16)  # Level multiplier

class HNSWNode:
    """A node in the HNSW graph."""
    
    def __init__(self, id: str, vector: np.ndarray, level: int):
        self.id = id
        self.vector = vector
        self.level = level
        
        # Connections at each level
        self.connections: dict[int, set[str]] = {
            l: set() for l in range(level + 1)
        }

class HNSWIndex:
    """Hierarchical Navigable Small World index."""
    
    def __init__(self, config: HNSWConfig = None):
        self.config = config or HNSWConfig()
        
        self.nodes: dict[str, HNSWNode] = {}
        self.entry_point: str = None
        self.max_level: int = -1
    
    def _random_level(self) -> int:
        """Generate random level for new node."""
        
        level = 0
        while random.random() < self.config.ml and level < 32:
            level += 1
        return level
    
    def _distance(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate distance (using negative cosine for min-heap)."""
        
        dot = np.dot(a, b)
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        
        if norm_a == 0 or norm_b == 0:
            return float('inf')
        
        # Return negative similarity for min-heap
        return -dot / (norm_a * norm_b)
    
    def _search_layer(
        self,
        query: np.ndarray,
        entry_points: set[str],
        ef: int,
        level: int
    ) -> list[Tuple[float, str]]:
        """Search a single layer."""
        
        visited = set(entry_points)
        candidates = []
        results = []
        
        # Initialize with entry points
        for ep in entry_points:
            dist = self._distance(query, self.nodes[ep].vector)
            heapq.heappush(candidates, (dist, ep))
            heapq.heappush(results, (-dist, ep))
        
        while candidates:
            dist, current = heapq.heappop(candidates)
            
            # Check if we can stop
            if results and -results[0][0] < dist:
                break
            
            # Explore neighbors
            node = self.nodes[current]
            
            if level in node.connections:
                for neighbor_id in node.connections[level]:
                    if neighbor_id not in visited:
                        visited.add(neighbor_id)
                        
                        neighbor = self.nodes[neighbor_id]
                        neighbor_dist = self._distance(query, neighbor.vector)
                        
                        if len(results) < ef or neighbor_dist < -results[0][0]:
                            heapq.heappush(candidates, (neighbor_dist, neighbor_id))
                            heapq.heappush(results, (-neighbor_dist, neighbor_id))
                            
                            if len(results) > ef:
                                heapq.heappop(results)
        
        # Convert to list sorted by distance
        return [(−score, id) for score, id in sorted(results, reverse=True)]
    
    def _select_neighbors(
        self,
        query: np.ndarray,
        candidates: list[Tuple[float, str]],
        M: int
    ) -> list[str]:
        """Select M best neighbors using heuristic."""
        
        # Simple: just take M closest
        return [id for _, id in candidates[:M]]
    
    def add(self, id: str, vector: np.ndarray):
        """Add vector to index."""
        
        level = self._random_level()
        node = HNSWNode(id, vector, level)
        self.nodes[id] = node
        
        if self.entry_point is None:
            self.entry_point = id
            self.max_level = level
            return
        
        # Find entry point for insertion
        current_ep = {self.entry_point}
        
        # Traverse from top to insertion level
        for l in range(self.max_level, level, -1):
            results = self._search_layer(vector, current_ep, 1, l)
            if results:
                current_ep = {results[0][1]}
        
        # Insert at each level
        for l in range(min(level, self.max_level), -1, -1):
            results = self._search_layer(
                vector, current_ep, self.config.ef_construction, l
            )
            
            neighbors = self._select_neighbors(
                vector, results, self.config.M
            )
            
            # Add bidirectional connections
            for neighbor_id in neighbors:
                node.connections[l].add(neighbor_id)
                self.nodes[neighbor_id].connections[l].add(id)
                
                # Prune if too many connections
                if len(self.nodes[neighbor_id].connections[l]) > self.config.M:
                    # Keep M closest
                    neighbor = self.nodes[neighbor_id]
                    dists = [
                        (self._distance(neighbor.vector, self.nodes[n].vector), n)
                        for n in neighbor.connections[l]
                    ]
                    dists.sort()
                    neighbor.connections[l] = set(n for _, n in dists[:self.config.M])
            
            current_ep = set(neighbors) if neighbors else current_ep
        
        # Update entry point if needed
        if level > self.max_level:
            self.entry_point = id
            self.max_level = level
    
    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search for k nearest neighbors."""
        
        if not self.nodes:
            return []
        
        current_ep = {self.entry_point}
        
        # Traverse from top to bottom
        for l in range(self.max_level, 0, -1):
            results = self._search_layer(query, current_ep, 1, l)
            if results:
                current_ep = {results[0][1]}
        
        # Search bottom layer
        results = self._search_layer(
            query, current_ep, self.config.ef_search, 0
        )
        
        # Convert to SearchResult
        search_results = []
        for dist, id in results[:k]:
            node = self.nodes[id]
            search_results.append(SearchResult(
                id=id,
                score=-dist,  # Convert back to similarity
                vector=node.vector,
                metadata={}
            ))
        
        return search_results

IVF Index

from dataclasses import dataclass
from typing import List, Tuple
import numpy as np
from sklearn.cluster import KMeans

@dataclass
class IVFConfig:
    """IVF configuration."""
    
    n_clusters: int = 100
    n_probe: int = 10  # Number of clusters to search

class IVFIndex:
    """Inverted File Index for approximate search."""
    
    def __init__(self, config: IVFConfig = None):
        self.config = config or IVFConfig()
        
        self.centroids: np.ndarray = None
        self.inverted_lists: dict[int, list[Tuple[str, np.ndarray]]] = {}
        self.trained = False
    
    def train(self, vectors: np.ndarray):
        """Train cluster centroids."""
        
        n_clusters = min(self.config.n_clusters, len(vectors))
        
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        kmeans.fit(vectors)
        
        self.centroids = kmeans.cluster_centers_
        self.inverted_lists = {i: [] for i in range(n_clusters)}
        self.trained = True
    
    def add(self, id: str, vector: np.ndarray):
        """Add vector to index."""
        
        if not self.trained:
            raise ValueError("Index must be trained before adding vectors")
        
        # Find nearest centroid
        distances = np.linalg.norm(self.centroids - vector, axis=1)
        cluster_id = int(np.argmin(distances))
        
        self.inverted_lists[cluster_id].append((id, vector))
    
    def add_batch(self, ids: list[str], vectors: np.ndarray):
        """Add batch of vectors."""
        
        if not self.trained:
            raise ValueError("Index must be trained before adding vectors")
        
        # Find nearest centroids for all
        for i, (id, vector) in enumerate(zip(ids, vectors)):
            distances = np.linalg.norm(self.centroids - vector, axis=1)
            cluster_id = int(np.argmin(distances))
            self.inverted_lists[cluster_id].append((id, vector))
    
    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search using IVF."""
        
        if not self.trained:
            return []
        
        # Find n_probe nearest clusters
        distances = np.linalg.norm(self.centroids - query, axis=1)
        probe_clusters = np.argsort(distances)[:self.config.n_probe]
        
        # Search within selected clusters
        candidates = []
        
        for cluster_id in probe_clusters:
            for id, vector in self.inverted_lists[cluster_id]:
                # Cosine similarity
                similarity = np.dot(query, vector) / (
                    np.linalg.norm(query) * np.linalg.norm(vector)
                )
                candidates.append((similarity, id, vector))
        
        # Sort and return top k
        candidates.sort(reverse=True)
        
        results = []
        for score, id, vector in candidates[:k]:
            results.append(SearchResult(
                id=id,
                score=score,
                vector=vector,
                metadata={}
            ))
        
        return results

class IVFPQIndex:
    """IVF with Product Quantization for memory efficiency."""
    
    def __init__(
        self,
        n_clusters: int = 100,
        n_subvectors: int = 8,
        n_bits: int = 8
    ):
        self.n_clusters = n_clusters
        self.n_subvectors = n_subvectors
        self.n_bits = n_bits
        self.n_centroids_pq = 2 ** n_bits
        
        self.coarse_centroids: np.ndarray = None
        self.pq_centroids: np.ndarray = None  # Shape: (n_subvectors, n_centroids_pq, subvector_dim)
        self.inverted_lists: dict[int, list[Tuple[str, np.ndarray]]] = {}
        
        self.trained = False
        self.dim = None
        self.subvector_dim = None
    
    def train(self, vectors: np.ndarray):
        """Train coarse quantizer and PQ codebooks."""
        
        self.dim = vectors.shape[1]
        self.subvector_dim = self.dim // self.n_subvectors
        
        # Train coarse quantizer
        n_clusters = min(self.n_clusters, len(vectors))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        kmeans.fit(vectors)
        self.coarse_centroids = kmeans.cluster_centers_
        
        # Compute residuals
        assignments = kmeans.predict(vectors)
        residuals = vectors - self.coarse_centroids[assignments]
        
        # Train PQ codebooks
        self.pq_centroids = np.zeros((
            self.n_subvectors,
            self.n_centroids_pq,
            self.subvector_dim
        ))
        
        for i in range(self.n_subvectors):
            start = i * self.subvector_dim
            end = start + self.subvector_dim
            
            subvectors = residuals[:, start:end]
            
            pq_kmeans = KMeans(
                n_clusters=self.n_centroids_pq,
                random_state=42,
                n_init=10
            )
            pq_kmeans.fit(subvectors)
            self.pq_centroids[i] = pq_kmeans.cluster_centers_
        
        self.inverted_lists = {i: [] for i in range(n_clusters)}
        self.trained = True
    
    def _encode_pq(self, residual: np.ndarray) -> np.ndarray:
        """Encode residual using PQ."""
        
        codes = np.zeros(self.n_subvectors, dtype=np.uint8)
        
        for i in range(self.n_subvectors):
            start = i * self.subvector_dim
            end = start + self.subvector_dim
            
            subvector = residual[start:end]
            distances = np.linalg.norm(
                self.pq_centroids[i] - subvector, axis=1
            )
            codes[i] = np.argmin(distances)
        
        return codes
    
    def add(self, id: str, vector: np.ndarray):
        """Add vector with PQ encoding."""
        
        if not self.trained:
            raise ValueError("Index must be trained first")
        
        # Find coarse cluster
        distances = np.linalg.norm(self.coarse_centroids - vector, axis=1)
        cluster_id = int(np.argmin(distances))
        
        # Compute and encode residual
        residual = vector - self.coarse_centroids[cluster_id]
        codes = self._encode_pq(residual)
        
        self.inverted_lists[cluster_id].append((id, codes))
    
    def search(self, query: np.ndarray, k: int = 10, n_probe: int = 10) -> list[SearchResult]:
        """Search using IVF-PQ."""
        
        if not self.trained:
            return []
        
        # Find probe clusters
        distances = np.linalg.norm(self.coarse_centroids - query, axis=1)
        probe_clusters = np.argsort(distances)[:n_probe]
        
        candidates = []
        
        for cluster_id in probe_clusters:
            # Compute query residual for this cluster
            residual = query - self.coarse_centroids[cluster_id]
            
            # Precompute distance tables
            distance_tables = np.zeros((self.n_subvectors, self.n_centroids_pq))
            
            for i in range(self.n_subvectors):
                start = i * self.subvector_dim
                end = start + self.subvector_dim
                
                subvector = residual[start:end]
                distance_tables[i] = np.linalg.norm(
                    self.pq_centroids[i] - subvector, axis=1
                )
            
            # Score all vectors in cluster
            for id, codes in self.inverted_lists[cluster_id]:
                # Asymmetric distance computation
                dist = sum(distance_tables[i, codes[i]] for i in range(self.n_subvectors))
                candidates.append((dist, id))
        
        # Sort and return top k
        candidates.sort()
        
        results = []
        for dist, id in candidates[:k]:
            results.append(SearchResult(
                id=id,
                score=-dist,  # Convert to similarity-like score
                vector=None,  # PQ doesn't store original vectors
                metadata={}
            ))
        
        return results

LSH and Tree-Based Methods

from dataclasses import dataclass
from typing import List, Set, Tuple
import numpy as np
from collections import defaultdict

@dataclass
class LSHConfig:
    """LSH configuration."""
    
    n_tables: int = 10
    n_bits: int = 16

class LSHIndex:
    """Locality Sensitive Hashing for approximate search."""
    
    def __init__(self, dim: int, config: LSHConfig = None):
        self.dim = dim
        self.config = config or LSHConfig()
        
        # Random hyperplanes for each table
        self.hyperplanes = [
            np.random.randn(self.config.n_bits, dim)
            for _ in range(self.config.n_tables)
        ]
        
        # Hash tables
        self.tables: list[dict[int, list[Tuple[str, np.ndarray]]]] = [
            defaultdict(list) for _ in range(self.config.n_tables)
        ]
    
    def _hash(self, vector: np.ndarray, table_idx: int) -> int:
        """Compute hash for vector."""
        
        projections = np.dot(self.hyperplanes[table_idx], vector)
        bits = (projections > 0).astype(int)
        
        # Convert bits to integer
        hash_value = 0
        for bit in bits:
            hash_value = (hash_value << 1) | bit
        
        return hash_value
    
    def add(self, id: str, vector: np.ndarray):
        """Add vector to all hash tables."""
        
        for i in range(self.config.n_tables):
            hash_value = self._hash(vector, i)
            self.tables[i][hash_value].append((id, vector))
    
    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search using LSH."""
        
        candidates = set()
        candidate_vectors = {}
        
        # Collect candidates from all tables
        for i in range(self.config.n_tables):
            hash_value = self._hash(query, i)
            
            for id, vector in self.tables[i][hash_value]:
                candidates.add(id)
                candidate_vectors[id] = vector
        
        # Rank candidates by actual similarity
        scored = []
        for id in candidates:
            vector = candidate_vectors[id]
            similarity = np.dot(query, vector) / (
                np.linalg.norm(query) * np.linalg.norm(vector)
            )
            scored.append((similarity, id, vector))
        
        scored.sort(reverse=True)
        
        results = []
        for score, id, vector in scored[:k]:
            results.append(SearchResult(
                id=id,
                score=score,
                vector=vector,
                metadata={}
            ))
        
        return results

class BallTree:
    """Ball tree for exact nearest neighbor search."""
    
    def __init__(self, leaf_size: int = 40):
        self.leaf_size = leaf_size
        self.root = None
    
    def build(self, ids: list[str], vectors: np.ndarray):
        """Build ball tree."""
        
        indices = list(range(len(ids)))
        self.ids = ids
        self.vectors = vectors
        
        self.root = self._build_node(indices)
    
    def _build_node(self, indices: list[int]) -> dict:
        """Recursively build tree node."""
        
        if len(indices) <= self.leaf_size:
            return {"type": "leaf", "indices": indices}
        
        # Compute centroid and radius
        vectors = self.vectors[indices]
        centroid = np.mean(vectors, axis=0)
        
        distances = np.linalg.norm(vectors - centroid, axis=1)
        radius = np.max(distances)
        
        # Split along dimension with highest variance
        variances = np.var(vectors, axis=0)
        split_dim = np.argmax(variances)
        
        # Partition
        values = vectors[:, split_dim]
        median = np.median(values)
        
        left_indices = [i for i in indices if self.vectors[i, split_dim] <= median]
        right_indices = [i for i in indices if self.vectors[i, split_dim] > median]
        
        # Handle edge case
        if not left_indices or not right_indices:
            mid = len(indices) // 2
            left_indices = indices[:mid]
            right_indices = indices[mid:]
        
        return {
            "type": "internal",
            "centroid": centroid,
            "radius": radius,
            "left": self._build_node(left_indices),
            "right": self._build_node(right_indices)
        }
    
    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search ball tree."""
        
        # Priority queue: (negative_distance, node)
        heap = [(0, self.root)]
        results = []
        
        while heap and len(results) < k * 2:
            _, node = heapq.heappop(heap)
            
            if node["type"] == "leaf":
                for idx in node["indices"]:
                    vector = self.vectors[idx]
                    dist = np.linalg.norm(query - vector)
                    results.append((dist, self.ids[idx], vector))
            else:
                # Check if we can prune
                dist_to_centroid = np.linalg.norm(query - node["centroid"])
                
                # Add children to heap
                heapq.heappush(heap, (dist_to_centroid, node["left"]))
                heapq.heappush(heap, (dist_to_centroid, node["right"]))
        
        # Sort and return top k
        results.sort()
        
        search_results = []
        seen = set()
        
        for dist, id, vector in results:
            if id not in seen:
                seen.add(id)
                search_results.append(SearchResult(
                    id=id,
                    score=-dist,
                    vector=vector,
                    metadata={}
                ))
            
            if len(search_results) >= k:
                break
        
        return search_results

class KDTree:
    """KD-tree for low-dimensional nearest neighbor search."""
    
    def __init__(self, leaf_size: int = 40):
        self.leaf_size = leaf_size
        self.root = None
        self.dim = None
    
    def build(self, ids: list[str], vectors: np.ndarray):
        """Build KD-tree."""
        
        self.ids = ids
        self.vectors = vectors
        self.dim = vectors.shape[1]
        
        indices = list(range(len(ids)))
        self.root = self._build_node(indices, 0)
    
    def _build_node(self, indices: list[int], depth: int) -> dict:
        """Recursively build tree node."""
        
        if len(indices) <= self.leaf_size:
            return {"type": "leaf", "indices": indices}
        
        # Split dimension cycles through all dimensions
        split_dim = depth % self.dim
        
        # Sort by split dimension
        sorted_indices = sorted(
            indices,
            key=lambda i: self.vectors[i, split_dim]
        )
        
        mid = len(sorted_indices) // 2
        
        return {
            "type": "internal",
            "split_dim": split_dim,
            "split_value": self.vectors[sorted_indices[mid], split_dim],
            "left": self._build_node(sorted_indices[:mid], depth + 1),
            "right": self._build_node(sorted_indices[mid:], depth + 1)
        }
    
    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search KD-tree."""
        
        results = []
        self._search_node(self.root, query, k, results)
        
        results.sort()
        
        search_results = []
        for dist, id, vector in results[:k]:
            search_results.append(SearchResult(
                id=id,
                score=-dist,
                vector=vector,
                metadata={}
            ))
        
        return search_results
    
    def _search_node(
        self,
        node: dict,
        query: np.ndarray,
        k: int,
        results: list
    ):
        """Recursively search node."""
        
        if node["type"] == "leaf":
            for idx in node["indices"]:
                vector = self.vectors[idx]
                dist = np.linalg.norm(query - vector)
                results.append((dist, self.ids[idx], vector))
            return
        
        split_dim = node["split_dim"]
        split_value = node["split_value"]
        
        # Decide which subtree to search first
        if query[split_dim] <= split_value:
            first, second = node["left"], node["right"]
        else:
            first, second = node["right"], node["left"]
        
        # Search first subtree
        self._search_node(first, query, k, results)
        
        # Check if we need to search second subtree
        dist_to_split = abs(query[split_dim] - split_value)
        
        if len(results) < k or dist_to_split < results[-1][0]:
            self._search_node(second, query, k, results)

Production Search Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List
import numpy as np
import time

app = FastAPI()

class AddRequest(BaseModel):
    id: str
    vector: list[float]
    metadata: Optional[dict] = None

class SearchRequest(BaseModel):
    vector: list[float]
    k: int = 10
    filter: Optional[dict] = None

class SearchResponse(BaseModel):
    results: list[dict]
    latency_ms: float

class IndexStats(BaseModel):
    size: int
    algorithm: str
    memory_mb: float

# Mock index for demo
class MockIndex:
    def __init__(self):
        self.data = {}
    
    def add(self, id, vector, metadata=None):
        self.data[id] = {"vector": vector, "metadata": metadata}
    
    def search(self, vector, k=10):
        results = []
        for id, item in self.data.items():
            score = np.dot(vector, item["vector"])
            results.append({"id": id, "score": score, "metadata": item["metadata"]})
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:k]
    
    def __len__(self):
        return len(self.data)

index = MockIndex()

@app.post("/v1/vectors/add")
async def add_vector(request: AddRequest) -> dict:
    """Add vector to index."""
    
    vector = np.array(request.vector)
    index.add(request.id, vector, request.metadata)
    
    return {"status": "added", "id": request.id}

@app.post("/v1/vectors/search")
async def search_vectors(request: SearchRequest) -> SearchResponse:
    """Search for similar vectors."""
    
    start = time.time()
    
    vector = np.array(request.vector)
    results = index.search(vector, request.k)
    
    latency = (time.time() - start) * 1000
    
    return SearchResponse(
        results=results,
        latency_ms=latency
    )

@app.get("/v1/index/stats")
async def get_stats() -> IndexStats:
    """Get index statistics."""
    
    return IndexStats(
        size=len(index),
        algorithm="brute_force",
        memory_mb=0.0
    )

@app.delete("/v1/vectors/{vector_id}")
async def delete_vector(vector_id: str) -> dict:
    """Delete vector from index."""
    
    if vector_id in index.data:
        del index.data[vector_id]
        return {"status": "deleted"}
    
    raise HTTPException(status_code=404, detail="Vector not found")

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Vector search algorithm selection depends on your specific requirements for accuracy, speed, memory, and scale. For small datasets under 100K vectors, brute force with matrix operations is often fast enough and guarantees exact results. HNSW is the go-to choice for most production systems—it offers excellent recall (95%+) with sub-millisecond latency and handles dynamic insertions well. IVF is better when you need to control the accuracy-speed tradeoff precisely and have a stable dataset that allows periodic retraining. Product Quantization dramatically reduces memory usage (8-16x compression) at the cost of some accuracy, making it essential for billion-scale deployments. LSH provides theoretical guarantees but is often outperformed by HNSW in practice. Tree-based methods like KD-trees work well for low-dimensional data but suffer from the curse of dimensionality for typical embedding dimensions (768-1536). The key insight is that approximate methods are not just acceptable but preferred—the small accuracy loss is worth the massive speed and memory gains. Start with HNSW using default parameters (M=16, ef_construction=200), measure your recall on a held-out test set, and tune from there based on your latency and accuracy requirements.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

About the Author

I am a Cloud Architect and Developer passionate about solving complex problems with modern technology. My blog explores the intersection of Cloud Architecture, Artificial Intelligence, and Software Engineering. I share tutorials, deep dives, and insights into building scalable, intelligent systems.

Areas of Expertise

Cloud Architecture (Azure, AWS)
Artificial Intelligence & LLMs
DevOps & Kubernetes
Backend Dev (C#, .NET, Python, Node.js)
© 2025 Code, Cloud & Context | Built by Nithin Mohan TK | Powered by Passion