Introduction
Vector search is the foundation of modern semantic retrieval systems, enabling applications to find similar items based on meaning rather than exact keyword matches. Understanding the algorithms behind vector search—from brute-force linear scan to sophisticated approximate nearest neighbor (ANN) methods—is essential for building efficient retrieval systems. This guide covers the core algorithms that power vector databases: HNSW (Hierarchical Navigable Small World), IVF (Inverted File Index), Product Quantization, LSH (Locality Sensitive Hashing), and tree-based methods. Each algorithm makes different tradeoffs between search accuracy, speed, memory usage, and index build time. Whether you’re building a recommendation system, semantic search engine, or RAG application, these patterns will help you choose and implement the right search algorithm for your use case.

Brute Force and Distance Metrics
from dataclasses import dataclass
from typing import Callable
from enum import Enum
import numpy as np

class DistanceMetric(Enum):
    """Supported distance metrics."""
    COSINE = "cosine"
    EUCLIDEAN = "euclidean"
    DOT_PRODUCT = "dot_product"
    MANHATTAN = "manhattan"

@dataclass
class SearchResult:
    """A search result with score and metadata."""
    id: str
    score: float
    vector: np.ndarray
    metadata: dict = None

class DistanceCalculator:
    """Calculate distances between vectors."""

    @staticmethod
    def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity (higher is more similar)."""
        dot = np.dot(a, b)
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return dot / (norm_a * norm_b)

    @staticmethod
    def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
        """Euclidean distance (lower is more similar)."""
        return np.linalg.norm(a - b)

    @staticmethod
    def dot_product(a: np.ndarray, b: np.ndarray) -> float:
        """Dot product (higher is more similar for normalized vectors)."""
        return np.dot(a, b)

    @staticmethod
    def manhattan_distance(a: np.ndarray, b: np.ndarray) -> float:
        """Manhattan distance (lower is more similar)."""
        return np.sum(np.abs(a - b))

    @classmethod
    def get_metric(cls, metric: DistanceMetric) -> Callable:
        """Get distance function for metric."""
        metrics = {
            DistanceMetric.COSINE: cls.cosine_similarity,
            DistanceMetric.EUCLIDEAN: cls.euclidean_distance,
            DistanceMetric.DOT_PRODUCT: cls.dot_product,
            DistanceMetric.MANHATTAN: cls.manhattan_distance
        }
        return metrics[metric]
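To make the metric differences concrete, here is a small sketch (assuming the DistanceCalculator class above is in scope; the vectors are arbitrary) showing how each metric scores the same pair of vectors:

import numpy as np

a = np.array([1.0, 2.0, 3.0])
b = np.array([2.0, 4.0, 6.0])  # Same direction as a, twice the magnitude

# Cosine ignores magnitude, so parallel vectors score a perfect 1.0
print(DistanceCalculator.cosine_similarity(a, b))   # 1.0
# Dot product rewards magnitude as well as direction
print(DistanceCalculator.dot_product(a, b))         # 28.0
# Euclidean and Manhattan treat the magnitude gap as dissimilarity
print(DistanceCalculator.euclidean_distance(a, b))  # ~3.74
print(DistanceCalculator.manhattan_distance(a, b))  # 6.0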
class BruteForceIndex:
    """Exact nearest neighbor search (brute force)."""

    def __init__(self, metric: DistanceMetric = DistanceMetric.COSINE):
        self.metric = metric
        self.distance_fn = DistanceCalculator.get_metric(metric)
        self.vectors: list[np.ndarray] = []
        self.ids: list[str] = []
        self.metadata: list[dict] = []

    def add(self, id: str, vector: np.ndarray, metadata: dict = None):
        """Add vector to index."""
        self.ids.append(id)
        self.vectors.append(vector)
        self.metadata.append(metadata or {})

    def add_batch(
        self,
        ids: list[str],
        vectors: list[np.ndarray],
        metadata: list[dict] = None
    ):
        """Add batch of vectors."""
        metadata = metadata or [{}] * len(ids)
        for id, vector, meta in zip(ids, vectors, metadata):
            self.add(id, vector, meta)

    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search for k nearest neighbors."""
        if not self.vectors:
            return []
        # Calculate all distances
        scores = []
        for i, vector in enumerate(self.vectors):
            score = self.distance_fn(query, vector)
            scores.append((i, score))
        # Sort by score
        if self.metric in [DistanceMetric.COSINE, DistanceMetric.DOT_PRODUCT]:
            # Higher is better
            scores.sort(key=lambda x: x[1], reverse=True)
        else:
            # Lower is better
            scores.sort(key=lambda x: x[1])
        # Return top k
        results = []
        for i, score in scores[:k]:
            results.append(SearchResult(
                id=self.ids[i],
                score=score,
                vector=self.vectors[i],
                metadata=self.metadata[i]
            ))
        return results

    def __len__(self) -> int:
        return len(self.vectors)
class OptimizedBruteForce:
    """Optimized brute force using matrix operations."""

    def __init__(self, metric: DistanceMetric = DistanceMetric.COSINE):
        self.metric = metric
        self.vectors: np.ndarray = None
        self.ids: list[str] = []
        self.metadata: list[dict] = []
        self.normalized = False

    def build(
        self,
        ids: list[str],
        vectors: np.ndarray,
        metadata: list[dict] = None
    ):
        """Build index from vectors."""
        self.ids = ids
        self.vectors = vectors.astype(np.float32)
        self.metadata = metadata or [{}] * len(ids)
        # Normalize for cosine similarity
        if self.metric == DistanceMetric.COSINE:
            norms = np.linalg.norm(self.vectors, axis=1, keepdims=True)
            self.vectors = self.vectors / np.maximum(norms, 1e-10)
            self.normalized = True

    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search using vectorized matrix operations."""
        if self.vectors is None:
            return []
        query = query.astype(np.float32)
        if self.normalized:
            query = query / np.maximum(np.linalg.norm(query), 1e-10)
        # Score every stored vector in a single vectorized operation
        if self.metric in [DistanceMetric.COSINE, DistanceMetric.DOT_PRODUCT]:
            # Matrix multiplication for all similarities
            scores = np.dot(self.vectors, query)
            top_indices = np.argsort(scores)[::-1][:k]
        elif self.metric == DistanceMetric.MANHATTAN:
            scores = np.sum(np.abs(self.vectors - query), axis=1)
            top_indices = np.argsort(scores)[:k]
        else:
            # Euclidean distance
            scores = np.linalg.norm(self.vectors - query, axis=1)
            top_indices = np.argsort(scores)[:k]
        results = []
        for i in top_indices:
            results.append(SearchResult(
                id=self.ids[i],
                score=float(scores[i]),
                vector=self.vectors[i],
                metadata=self.metadata[i]
            ))
        return results
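A quick sanity check of the two implementations (a sketch, assuming both classes above are defined; the dataset, dimensions, and timings are arbitrary):

import numpy as np
import time

rng = np.random.default_rng(0)
dim, n = 128, 5_000
ids = [f"doc_{i}" for i in range(n)]
vectors = rng.normal(size=(n, dim)).astype(np.float32)

naive = BruteForceIndex(DistanceMetric.COSINE)
naive.add_batch(ids, list(vectors))

fast = OptimizedBruteForce(DistanceMetric.COSINE)
fast.build(ids, vectors)

query = rng.normal(size=dim).astype(np.float32)

start = time.time()
naive_top = naive.search(query, k=5)
print("loop-based: ", (time.time() - start) * 1000, "ms")

start = time.time()
fast_top = fast.search(query, k=5)
print("vectorized: ", (time.time() - start) * 1000, "ms")

# Both are exact, so the top ids should agree (up to float precision on near-ties)
print([r.id for r in naive_top])
print([r.id for r in fast_top])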
HNSW Algorithm
from dataclasses import dataclass
from typing import Tuple
import numpy as np
import heapq
import random

# SearchResult is defined in the brute-force listing above

@dataclass
class HNSWConfig:
    """HNSW configuration."""
    M: int = 16                 # Max connections per layer
    ef_construction: int = 200  # Search width during construction
    ef_search: int = 50         # Search width during query
    ml: float = 1 / np.log(16)  # Level multiplier (1 / ln(M))

class HNSWNode:
    """A node in the HNSW graph."""

    def __init__(self, id: str, vector: np.ndarray, level: int):
        self.id = id
        self.vector = vector
        self.level = level
        # Connections at each level
        self.connections: dict[int, set[str]] = {
            l: set() for l in range(level + 1)
        }
class HNSWIndex:
    """Hierarchical Navigable Small World index."""

    def __init__(self, config: HNSWConfig = None):
        self.config = config or HNSWConfig()
        self.nodes: dict[str, HNSWNode] = {}
        self.entry_point: str = None
        self.max_level: int = -1

    def _random_level(self) -> int:
        """Sample the insertion level: floor(-ln(U) * mL), as in the HNSW paper."""
        # Capped so a pathological draw cannot create an extremely tall graph
        return min(int(-np.log(1.0 - random.random()) * self.config.ml), 32)

    def _distance(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate distance (negative cosine similarity, so lower means closer)."""
        dot = np.dot(a, b)
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        if norm_a == 0 or norm_b == 0:
            return float('inf')
        # Return negative similarity so min-heaps order nearest first
        return -dot / (norm_a * norm_b)

    def _search_layer(
        self,
        query: np.ndarray,
        entry_points: set[str],
        ef: int,
        level: int
    ) -> list[Tuple[float, str]]:
        """Greedy best-first search within a single layer."""
        visited = set(entry_points)
        candidates = []  # min-heap of (distance, id): closest candidate first
        results = []     # max-heap of (-distance, id): worst result on top
        # Initialize with entry points
        for ep in entry_points:
            dist = self._distance(query, self.nodes[ep].vector)
            heapq.heappush(candidates, (dist, ep))
            heapq.heappush(results, (-dist, ep))
        while candidates:
            dist, current = heapq.heappop(candidates)
            # Stop when the closest candidate is farther than the worst result
            if results and -results[0][0] < dist:
                break
            # Explore neighbors
            node = self.nodes[current]
            if level in node.connections:
                for neighbor_id in node.connections[level]:
                    if neighbor_id not in visited:
                        visited.add(neighbor_id)
                        neighbor = self.nodes[neighbor_id]
                        neighbor_dist = self._distance(query, neighbor.vector)
                        if len(results) < ef or neighbor_dist < -results[0][0]:
                            heapq.heappush(candidates, (neighbor_dist, neighbor_id))
                            heapq.heappush(results, (-neighbor_dist, neighbor_id))
                            if len(results) > ef:
                                heapq.heappop(results)
        # Convert to a list of (distance, id) sorted nearest-first
        return [(-score, id) for score, id in sorted(results, reverse=True)]

    def _select_neighbors(
        self,
        query: np.ndarray,
        candidates: list[Tuple[float, str]],
        M: int
    ) -> list[str]:
        """Select M best neighbors (simple heuristic: take the M closest)."""
        return [id for _, id in candidates[:M]]
    def add(self, id: str, vector: np.ndarray):
        """Add vector to index."""
        level = self._random_level()
        node = HNSWNode(id, vector, level)
        self.nodes[id] = node
        if self.entry_point is None:
            self.entry_point = id
            self.max_level = level
            return
        # Find entry point for insertion
        current_ep = {self.entry_point}
        # Greedy descent from the top layer down to one above the insertion level
        for l in range(self.max_level, level, -1):
            results = self._search_layer(vector, current_ep, 1, l)
            if results:
                current_ep = {results[0][1]}
        # Insert at each level from min(level, max_level) down to 0
        for l in range(min(level, self.max_level), -1, -1):
            results = self._search_layer(
                vector, current_ep, self.config.ef_construction, l
            )
            neighbors = self._select_neighbors(
                vector, results, self.config.M
            )
            # Add bidirectional connections
            for neighbor_id in neighbors:
                node.connections[l].add(neighbor_id)
                self.nodes[neighbor_id].connections[l].add(id)
                # Prune if the neighbor now has too many connections
                if len(self.nodes[neighbor_id].connections[l]) > self.config.M:
                    # Keep the M closest
                    neighbor = self.nodes[neighbor_id]
                    dists = [
                        (self._distance(neighbor.vector, self.nodes[n].vector), n)
                        for n in neighbor.connections[l]
                    ]
                    dists.sort()
                    neighbor.connections[l] = set(n for _, n in dists[:self.config.M])
            current_ep = set(neighbors) if neighbors else current_ep
        # Update entry point if the new node reaches a new highest level
        if level > self.max_level:
            self.entry_point = id
            self.max_level = level
    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search for k nearest neighbors."""
        if not self.nodes:
            return []
        current_ep = {self.entry_point}
        # Greedy descent from the top layer down to layer 1
        for l in range(self.max_level, 0, -1):
            results = self._search_layer(query, current_ep, 1, l)
            if results:
                current_ep = {results[0][1]}
        # Wider search on the bottom layer
        results = self._search_layer(
            query, current_ep, self.config.ef_search, 0
        )
        # Convert to SearchResult
        search_results = []
        for dist, id in results[:k]:
            node = self.nodes[id]
            search_results.append(SearchResult(
                id=id,
                score=-dist,  # Convert back to cosine similarity
                vector=node.vector,
                metadata={}
            ))
        return search_results
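A minimal usage sketch for the index above (assuming the HNSW classes just shown plus SearchResult from the first listing; the data is random and only for illustration):

import numpy as np

rng = np.random.default_rng(42)
index = HNSWIndex(HNSWConfig(M=16, ef_construction=200, ef_search=50))

# Index 1,000 random 64-dimensional vectors
for i in range(1_000):
    index.add(f"doc_{i}", rng.normal(size=64))

query = rng.normal(size=64)
for result in index.search(query, k=5):
    print(result.id, round(result.score, 3))  # score is cosine similarity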
IVF Index
from dataclasses import dataclass
from typing import Tuple
import numpy as np
from sklearn.cluster import KMeans

# SearchResult is defined in the brute-force listing above

@dataclass
class IVFConfig:
    """IVF configuration."""
    n_clusters: int = 100
    n_probe: int = 10  # Number of clusters to search

class IVFIndex:
    """Inverted File Index for approximate search."""

    def __init__(self, config: IVFConfig = None):
        self.config = config or IVFConfig()
        self.centroids: np.ndarray = None
        self.inverted_lists: dict[int, list[Tuple[str, np.ndarray]]] = {}
        self.trained = False

    def train(self, vectors: np.ndarray):
        """Train cluster centroids."""
        n_clusters = min(self.config.n_clusters, len(vectors))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        kmeans.fit(vectors)
        self.centroids = kmeans.cluster_centers_
        self.inverted_lists = {i: [] for i in range(n_clusters)}
        self.trained = True
    def add(self, id: str, vector: np.ndarray):
        """Add vector to index."""
        if not self.trained:
            raise ValueError("Index must be trained before adding vectors")
        # Find nearest centroid
        distances = np.linalg.norm(self.centroids - vector, axis=1)
        cluster_id = int(np.argmin(distances))
        self.inverted_lists[cluster_id].append((id, vector))

    def add_batch(self, ids: list[str], vectors: np.ndarray):
        """Add batch of vectors."""
        if not self.trained:
            raise ValueError("Index must be trained before adding vectors")
        # Assign each vector to its nearest centroid
        for id, vector in zip(ids, vectors):
            distances = np.linalg.norm(self.centroids - vector, axis=1)
            cluster_id = int(np.argmin(distances))
            self.inverted_lists[cluster_id].append((id, vector))

    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search using IVF."""
        if not self.trained:
            return []
        # Find n_probe nearest clusters
        distances = np.linalg.norm(self.centroids - query, axis=1)
        probe_clusters = np.argsort(distances)[:self.config.n_probe]
        # Search within selected clusters
        candidates = []
        for cluster_id in probe_clusters:
            for id, vector in self.inverted_lists[cluster_id]:
                # Cosine similarity
                similarity = np.dot(query, vector) / (
                    np.linalg.norm(query) * np.linalg.norm(vector)
                )
                candidates.append((similarity, id, vector))
        # Sort and return top k
        candidates.sort(key=lambda x: x[0], reverse=True)
        results = []
        for score, id, vector in candidates[:k]:
            results.append(SearchResult(
                id=id,
                score=score,
                vector=vector,
                metadata={}
            ))
        return results
class IVFPQIndex:
    """IVF with Product Quantization for memory efficiency."""

    def __init__(
        self,
        n_clusters: int = 100,
        n_subvectors: int = 8,
        n_bits: int = 8
    ):
        self.n_clusters = n_clusters
        self.n_subvectors = n_subvectors
        self.n_bits = n_bits
        self.n_centroids_pq = 2 ** n_bits
        self.coarse_centroids: np.ndarray = None
        # Shape: (n_subvectors, n_centroids_pq, subvector_dim)
        self.pq_centroids: np.ndarray = None
        self.inverted_lists: dict[int, list[Tuple[str, np.ndarray]]] = {}
        self.trained = False
        self.dim = None
        self.subvector_dim = None

    def train(self, vectors: np.ndarray):
        """Train coarse quantizer and PQ codebooks."""
        self.dim = vectors.shape[1]
        if self.dim % self.n_subvectors != 0:
            raise ValueError("Vector dimension must be divisible by n_subvectors")
        self.subvector_dim = self.dim // self.n_subvectors
        # Train coarse quantizer
        n_clusters = min(self.n_clusters, len(vectors))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        kmeans.fit(vectors)
        self.coarse_centroids = kmeans.cluster_centers_
        # Compute residuals (each vector minus its coarse centroid)
        assignments = kmeans.predict(vectors)
        residuals = vectors - self.coarse_centroids[assignments]
        # Train one PQ codebook per subvector
        self.pq_centroids = np.zeros((
            self.n_subvectors,
            self.n_centroids_pq,
            self.subvector_dim
        ))
        for i in range(self.n_subvectors):
            start = i * self.subvector_dim
            end = start + self.subvector_dim
            subvectors = residuals[:, start:end]
            pq_kmeans = KMeans(
                n_clusters=self.n_centroids_pq,
                random_state=42,
                n_init=10
            )
            pq_kmeans.fit(subvectors)
            self.pq_centroids[i] = pq_kmeans.cluster_centers_
        self.inverted_lists = {i: [] for i in range(n_clusters)}
        self.trained = True
    def _encode_pq(self, residual: np.ndarray) -> np.ndarray:
        """Encode residual as one codebook index per subvector."""
        codes = np.zeros(self.n_subvectors, dtype=np.uint8)
        for i in range(self.n_subvectors):
            start = i * self.subvector_dim
            end = start + self.subvector_dim
            subvector = residual[start:end]
            distances = np.linalg.norm(
                self.pq_centroids[i] - subvector, axis=1
            )
            codes[i] = np.argmin(distances)
        return codes

    def add(self, id: str, vector: np.ndarray):
        """Add vector with PQ encoding."""
        if not self.trained:
            raise ValueError("Index must be trained first")
        # Find coarse cluster
        distances = np.linalg.norm(self.coarse_centroids - vector, axis=1)
        cluster_id = int(np.argmin(distances))
        # Compute and encode residual
        residual = vector - self.coarse_centroids[cluster_id]
        codes = self._encode_pq(residual)
        self.inverted_lists[cluster_id].append((id, codes))

    def search(self, query: np.ndarray, k: int = 10, n_probe: int = 10) -> list[SearchResult]:
        """Search using IVF-PQ."""
        if not self.trained:
            return []
        # Find probe clusters
        distances = np.linalg.norm(self.coarse_centroids - query, axis=1)
        probe_clusters = np.argsort(distances)[:n_probe]
        candidates = []
        for cluster_id in probe_clusters:
            # Compute query residual for this cluster
            residual = query - self.coarse_centroids[cluster_id]
            # Precompute distance tables: query subvector to every PQ centroid
            distance_tables = np.zeros((self.n_subvectors, self.n_centroids_pq))
            for i in range(self.n_subvectors):
                start = i * self.subvector_dim
                end = start + self.subvector_dim
                subvector = residual[start:end]
                distance_tables[i] = np.linalg.norm(
                    self.pq_centroids[i] - subvector, axis=1
                )
            # Score all vectors in the cluster with table lookups
            for id, codes in self.inverted_lists[cluster_id]:
                # Asymmetric distance computation
                dist = sum(distance_tables[i, codes[i]] for i in range(self.n_subvectors))
                candidates.append((dist, id))
        # Sort and return top k
        candidates.sort()
        results = []
        for dist, id in candidates[:k]:
            results.append(SearchResult(
                id=id,
                score=-dist,  # Convert to similarity-like score
                vector=None,  # PQ does not store original vectors
                metadata={}
            ))
        return results
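A usage sketch for both indexes (assuming the classes above; the data and parameters are arbitrary). The memory argument for PQ: with n_subvectors=8 and n_bits=8, each vector is stored as 8 one-byte codes instead of dim float32 values, so the 64-dimensional toy vectors below shrink from 256 bytes to 8 bytes each (plus the shared codebooks), and a 768-dimensional embedding would shrink from 3,072 bytes to 8 bytes.

import numpy as np

rng = np.random.default_rng(7)
dim, n = 64, 5_000
ids = [f"doc_{i}" for i in range(n)]
vectors = rng.normal(size=(n, dim)).astype(np.float32)

# Plain IVF: exact vectors kept inside each inverted list
ivf = IVFIndex(IVFConfig(n_clusters=50, n_probe=5))
ivf.train(vectors)
ivf.add_batch(ids, vectors)
print(ivf.search(rng.normal(size=dim), k=5)[0].id)

# IVF-PQ: 8 one-byte codes per vector instead of 64 float32 values
ivfpq = IVFPQIndex(n_clusters=50, n_subvectors=8, n_bits=8)
ivfpq.train(vectors)
for id, vector in zip(ids, vectors):
    ivfpq.add(id, vector)
print(ivfpq.search(rng.normal(size=dim), k=5, n_probe=5)[0].id)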
LSH and Tree-Based Methods
from dataclasses import dataclass
from typing import Tuple
from collections import defaultdict
import numpy as np
import heapq

# SearchResult is defined in the brute-force listing above

@dataclass
class LSHConfig:
    """LSH configuration."""
    n_tables: int = 10
    n_bits: int = 16

class LSHIndex:
    """Locality Sensitive Hashing for approximate search."""

    def __init__(self, dim: int, config: LSHConfig = None):
        self.dim = dim
        self.config = config or LSHConfig()
        # Random hyperplanes for each table
        self.hyperplanes = [
            np.random.randn(self.config.n_bits, dim)
            for _ in range(self.config.n_tables)
        ]
        # Hash tables
        self.tables: list[dict[int, list[Tuple[str, np.ndarray]]]] = [
            defaultdict(list) for _ in range(self.config.n_tables)
        ]

    def _hash(self, vector: np.ndarray, table_idx: int) -> int:
        """Compute hash for vector."""
        projections = np.dot(self.hyperplanes[table_idx], vector)
        bits = (projections > 0).astype(int)
        # Pack the sign bits into an integer
        hash_value = 0
        for bit in bits:
            hash_value = (hash_value << 1) | int(bit)
        return hash_value

    def add(self, id: str, vector: np.ndarray):
        """Add vector to all hash tables."""
        for i in range(self.config.n_tables):
            hash_value = self._hash(vector, i)
            self.tables[i][hash_value].append((id, vector))

    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search using LSH."""
        candidates = set()
        candidate_vectors = {}
        # Collect candidates from the matching bucket in every table
        for i in range(self.config.n_tables):
            hash_value = self._hash(query, i)
            for id, vector in self.tables[i][hash_value]:
                candidates.add(id)
                candidate_vectors[id] = vector
        # Rank candidates by actual similarity
        scored = []
        for id in candidates:
            vector = candidate_vectors[id]
            similarity = np.dot(query, vector) / (
                np.linalg.norm(query) * np.linalg.norm(vector)
            )
            scored.append((similarity, id, vector))
        scored.sort(key=lambda x: x[0], reverse=True)
        results = []
        for score, id, vector in scored[:k]:
            results.append(SearchResult(
                id=id,
                score=score,
                vector=vector,
                metadata={}
            ))
        return results
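A usage sketch (assuming the LSHIndex class above; the data and parameters are illustrative). Fewer bits per table means larger buckets and more candidates to re-rank; more tables raises recall at the cost of memory and insert time:

import numpy as np

rng = np.random.default_rng(3)
dim = 64
# With purely random vectors, 16-bit signatures leave most buckets empty,
# so this sketch uses shorter 8-bit signatures to guarantee some candidates
lsh = LSHIndex(dim, LSHConfig(n_tables=10, n_bits=8))

for i in range(2_000):
    lsh.add(f"doc_{i}", rng.normal(size=dim))

for r in lsh.search(rng.normal(size=dim), k=5):
    print(r.id, round(r.score, 3))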
class BallTree:
    """Ball tree for nearest neighbor search."""

    def __init__(self, leaf_size: int = 40):
        self.leaf_size = leaf_size
        self.root = None

    def build(self, ids: list[str], vectors: np.ndarray):
        """Build ball tree."""
        indices = list(range(len(ids)))
        self.ids = ids
        self.vectors = vectors
        self.root = self._build_node(indices)

    def _build_node(self, indices: list[int]) -> dict:
        """Recursively build tree node."""
        if len(indices) <= self.leaf_size:
            return {"type": "leaf", "indices": indices}
        # Compute centroid and radius of the ball enclosing these points
        vectors = self.vectors[indices]
        centroid = np.mean(vectors, axis=0)
        distances = np.linalg.norm(vectors - centroid, axis=1)
        radius = np.max(distances)
        # Split along the dimension with highest variance
        variances = np.var(vectors, axis=0)
        split_dim = np.argmax(variances)
        # Partition around the median
        median = np.median(vectors[:, split_dim])
        left_indices = [i for i in indices if self.vectors[i, split_dim] <= median]
        right_indices = [i for i in indices if self.vectors[i, split_dim] > median]
        # Handle the edge case where all values land on one side of the median
        if not left_indices or not right_indices:
            mid = len(indices) // 2
            left_indices = indices[:mid]
            right_indices = indices[mid:]
        return {
            "type": "internal",
            "centroid": centroid,
            "radius": radius,
            "left": self._build_node(left_indices),
            "right": self._build_node(right_indices)
        }

    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Best-first search; stops after collecting roughly 2k candidates."""
        # Priority queue: (distance_to_parent_centroid, tie_breaker, node)
        counter = 0
        heap = [(0.0, counter, self.root)]
        results = []
        while heap and len(results) < k * 2:
            _, _, node = heapq.heappop(heap)
            if node["type"] == "leaf":
                for idx in node["indices"]:
                    vector = self.vectors[idx]
                    dist = np.linalg.norm(query - vector)
                    results.append((dist, self.ids[idx], vector))
            else:
                # Visit nearer balls first (the stored radius could also be used to prune)
                dist_to_centroid = np.linalg.norm(query - node["centroid"])
                for child in (node["left"], node["right"]):
                    counter += 1
                    heapq.heappush(heap, (dist_to_centroid, counter, child))
        # Sort and return the top k unique ids
        results.sort(key=lambda x: x[0])
        search_results = []
        seen = set()
        for dist, id, vector in results:
            if id not in seen:
                seen.add(id)
                search_results.append(SearchResult(
                    id=id,
                    score=-dist,
                    vector=vector,
                    metadata={}
                ))
            if len(search_results) >= k:
                break
        return search_results
class KDTree:
    """KD-tree for low-dimensional nearest neighbor search."""

    def __init__(self, leaf_size: int = 40):
        self.leaf_size = leaf_size
        self.root = None
        self.dim = None

    def build(self, ids: list[str], vectors: np.ndarray):
        """Build KD-tree."""
        self.ids = ids
        self.vectors = vectors
        self.dim = vectors.shape[1]
        indices = list(range(len(ids)))
        self.root = self._build_node(indices, 0)

    def _build_node(self, indices: list[int], depth: int) -> dict:
        """Recursively build tree node."""
        if len(indices) <= self.leaf_size:
            return {"type": "leaf", "indices": indices}
        # Split dimension cycles through all dimensions
        split_dim = depth % self.dim
        # Sort by the split dimension and split at the median
        sorted_indices = sorted(
            indices,
            key=lambda i: self.vectors[i, split_dim]
        )
        mid = len(sorted_indices) // 2
        return {
            "type": "internal",
            "split_dim": split_dim,
            "split_value": self.vectors[sorted_indices[mid], split_dim],
            "left": self._build_node(sorted_indices[:mid], depth + 1),
            "right": self._build_node(sorted_indices[mid:], depth + 1)
        }

    def search(self, query: np.ndarray, k: int = 10) -> list[SearchResult]:
        """Search KD-tree."""
        results = []
        self._search_node(self.root, query, k, results)
        results.sort(key=lambda x: x[0])
        search_results = []
        for dist, id, vector in results[:k]:
            search_results.append(SearchResult(
                id=id,
                score=-dist,
                vector=vector,
                metadata={}
            ))
        return search_results

    def _search_node(
        self,
        node: dict,
        query: np.ndarray,
        k: int,
        results: list
    ):
        """Recursively search node."""
        if node["type"] == "leaf":
            for idx in node["indices"]:
                vector = self.vectors[idx]
                dist = np.linalg.norm(query - vector)
                results.append((dist, self.ids[idx], vector))
            return
        split_dim = node["split_dim"]
        split_value = node["split_value"]
        # Descend into the subtree containing the query first
        if query[split_dim] <= split_value:
            first, second = node["left"], node["right"]
        else:
            first, second = node["right"], node["left"]
        self._search_node(first, query, k, results)
        # Only cross the splitting plane if it could still hold a closer point
        dist_to_split = abs(query[split_dim] - split_value)
        kth_best = sorted(r[0] for r in results)[k - 1] if len(results) >= k else float("inf")
        if dist_to_split < kth_best:
            self._search_node(second, query, k, results)
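A quick sketch exercising both trees on low-dimensional points, where they work best (assuming the classes above; scores are negated Euclidean distances, so values closer to zero are better):

import numpy as np

rng = np.random.default_rng(11)
ids = [f"pt_{i}" for i in range(10_000)]
vectors = rng.uniform(size=(10_000, 3))  # trees shine in low dimensions

kd = KDTree(leaf_size=40)
kd.build(ids, vectors)

ball = BallTree(leaf_size=40)
ball.build(ids, vectors)

query = rng.uniform(size=3)
print([r.id for r in kd.search(query, k=3)])
print([r.id for r in ball.search(query, k=3)])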
Production Search Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np
import time

app = FastAPI()

class AddRequest(BaseModel):
    id: str
    vector: list[float]
    metadata: Optional[dict] = None

class SearchRequest(BaseModel):
    vector: list[float]
    k: int = 10
    filter: Optional[dict] = None

class SearchResponse(BaseModel):
    results: list[dict]
    latency_ms: float

class IndexStats(BaseModel):
    size: int
    algorithm: str
    memory_mb: float

# Mock index for demo; swap in any of the indexes above in production
class MockIndex:
    def __init__(self):
        self.data = {}

    def add(self, id, vector, metadata=None):
        self.data[id] = {"vector": vector, "metadata": metadata}

    def search(self, vector, k=10):
        results = []
        for id, item in self.data.items():
            # Cast to a plain float so the response is JSON-serializable
            score = float(np.dot(vector, item["vector"]))
            results.append({"id": id, "score": score, "metadata": item["metadata"]})
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:k]

    def __len__(self):
        return len(self.data)

index = MockIndex()
@app.post("/v1/vectors/add")
async def add_vector(request: AddRequest) -> dict:
"""Add vector to index."""
vector = np.array(request.vector)
index.add(request.id, vector, request.metadata)
return {"status": "added", "id": request.id}
@app.post("/v1/vectors/search")
async def search_vectors(request: SearchRequest) -> SearchResponse:
"""Search for similar vectors."""
start = time.time()
vector = np.array(request.vector)
results = index.search(vector, request.k)
latency = (time.time() - start) * 1000
return SearchResponse(
results=results,
latency_ms=latency
)
@app.get("/v1/index/stats")
async def get_stats() -> IndexStats:
"""Get index statistics."""
return IndexStats(
size=len(index),
algorithm="brute_force",
memory_mb=0.0
)
@app.delete("/v1/vectors/{vector_id}")
async def delete_vector(vector_id: str) -> dict:
"""Delete vector from index."""
if vector_id in index.data:
del index.data[vector_id]
return {"status": "deleted"}
raise HTTPException(status_code=404, detail="Vector not found")
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- HNSW Paper: https://arxiv.org/abs/1603.09320
- Faiss Library: https://github.com/facebookresearch/faiss
- Annoy Library: https://github.com/spotify/annoy
- ScaNN: https://github.com/google-research/google-research/tree/master/scann
- Product Quantization: https://lear.inrialpes.fr/pubs/2011/JDS11/jegou_searching_with_quantization.pdf
Conclusion
Vector search algorithm selection depends on your specific requirements for accuracy, speed, memory, and scale. For small datasets under 100K vectors, brute force with matrix operations is often fast enough and guarantees exact results. HNSW is the go-to choice for most production systems—it offers excellent recall (95%+) with sub-millisecond latency and handles dynamic insertions well. IVF is better when you need to control the accuracy-speed tradeoff precisely and have a stable dataset that allows periodic retraining. Product Quantization dramatically reduces memory usage (8-16x compression) at the cost of some accuracy, making it essential for billion-scale deployments. LSH provides theoretical guarantees but is often outperformed by HNSW in practice. Tree-based methods like KD-trees work well for low-dimensional data but suffer from the curse of dimensionality for typical embedding dimensions (768-1536). The key insight is that approximate methods are not just acceptable but preferred—the small accuracy loss is worth the massive speed and memory gains. Start with HNSW using default parameters (M=16, ef_construction=200), measure your recall on a held-out test set, and tune from there based on your latency and accuracy requirements.
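As a concrete way to act on that tuning advice, here is a minimal recall@k measurement sketch (assuming the HNSWIndex and OptimizedBruteForce classes from earlier; the random data stands in for a held-out sample of your real queries):

import numpy as np

rng = np.random.default_rng(0)
dim, n, n_queries, k = 64, 2_000, 50, 10
ids = [f"doc_{i}" for i in range(n)]
vectors = rng.normal(size=(n, dim)).astype(np.float32)
queries = rng.normal(size=(n_queries, dim)).astype(np.float32)

# Ground truth from the exact index
exact = OptimizedBruteForce(DistanceMetric.COSINE)
exact.build(ids, vectors)

# Candidate ANN index with default parameters
ann = HNSWIndex(HNSWConfig(M=16, ef_construction=200, ef_search=50))
for id, vector in zip(ids, vectors):
    ann.add(id, vector)

recalls = []
for q in queries:
    truth = {r.id for r in exact.search(q, k=k)}
    found = {r.id for r in ann.search(q, k=k)}
    recalls.append(len(truth & found) / k)
print(f"recall@{k}: {np.mean(recalls):.3f}")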
