Introduction: Vector databases are the backbone of modern AI applications—powering semantic search, RAG systems, and recommendation engines. But as your vector collection grows from thousands to millions of embeddings, naive approaches break down. Query latency spikes, memory costs explode, and recall accuracy degrades. This guide covers practical optimization strategies: choosing the right index type for your access patterns, tuning HNSW parameters for the latency-recall tradeoff you need, using quantization to cut memory usage by 4-8x with minimal accuracy loss, and implementing sharding strategies for horizontal scale. Whether you’re using Pinecone, Weaviate, Qdrant, or Milvus, these techniques will help you build vector search systems that stay fast and accurate at scale.

Index Selection and Configuration
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
from abc import ABC, abstractmethod
import numpy as np
class IndexType(Enum):
"""Vector index types."""
FLAT = "flat" # Exact search, no index
IVF = "ivf" # Inverted file index
HNSW = "hnsw" # Hierarchical navigable small world
    ANNOY = "annoy" # Approximate Nearest Neighbors Oh Yeah (tree-based)
    SCANN = "scann" # ScaNN: Scalable Nearest Neighbors
@dataclass
class IndexConfig:
"""Index configuration."""
index_type: IndexType
dimension: int
metric: str = "cosine" # cosine, euclidean, dot_product
# IVF parameters
nlist: int = 100 # Number of clusters
nprobe: int = 10 # Clusters to search
# HNSW parameters
m: int = 16 # Connections per node
ef_construction: int = 200 # Build-time search width
ef_search: int = 50 # Query-time search width
# Quantization
use_pq: bool = False # Product quantization
pq_segments: int = 8 # PQ segments
pq_bits: int = 8 # Bits per segment
class IndexOptimizer:
"""Optimize index configuration."""
def __init__(self):
self.benchmarks = {}
def recommend_index(
self,
num_vectors: int,
dimension: int,
qps_target: int,
recall_target: float = 0.95,
        memory_budget_gb: Optional[float] = None
) -> IndexConfig:
"""Recommend index configuration."""
# Small datasets: use flat index
if num_vectors < 10000:
return IndexConfig(
index_type=IndexType.FLAT,
dimension=dimension
)
# Medium datasets: IVF or HNSW
if num_vectors < 1000000:
# HNSW for high recall requirements
if recall_target > 0.95:
return self._configure_hnsw(
num_vectors, dimension, recall_target, qps_target
)
else:
return self._configure_ivf(
num_vectors, dimension, recall_target, qps_target
)
# Large datasets: need quantization
memory_per_vector = dimension * 4 # float32
total_memory = num_vectors * memory_per_vector / 1e9
if memory_budget_gb and total_memory > memory_budget_gb:
return self._configure_with_quantization(
num_vectors, dimension, memory_budget_gb, recall_target
)
return self._configure_hnsw(
num_vectors, dimension, recall_target, qps_target
)
def _configure_hnsw(
self,
num_vectors: int,
dimension: int,
recall_target: float,
qps_target: int
) -> IndexConfig:
"""Configure HNSW index."""
# M: more connections = higher recall, more memory
if recall_target > 0.99:
m = 32
elif recall_target > 0.95:
m = 16
else:
m = 8
# ef_construction: higher = better index, slower build
ef_construction = max(100, m * 10)
# ef_search: higher = better recall, slower search
if recall_target > 0.99:
ef_search = 200
elif recall_target > 0.95:
ef_search = 100
else:
ef_search = 50
return IndexConfig(
index_type=IndexType.HNSW,
dimension=dimension,
m=m,
ef_construction=ef_construction,
ef_search=ef_search
)
def _configure_ivf(
self,
num_vectors: int,
dimension: int,
recall_target: float,
qps_target: int
) -> IndexConfig:
"""Configure IVF index."""
# nlist: sqrt(n) is a good starting point
nlist = int(np.sqrt(num_vectors))
nlist = max(100, min(nlist, 4096))
# nprobe: more probes = higher recall, slower search
if recall_target > 0.95:
nprobe = max(10, nlist // 10)
else:
nprobe = max(5, nlist // 20)
return IndexConfig(
index_type=IndexType.IVF,
dimension=dimension,
nlist=nlist,
nprobe=nprobe
)
def _configure_with_quantization(
self,
num_vectors: int,
dimension: int,
memory_budget_gb: float,
recall_target: float
) -> IndexConfig:
"""Configure index with quantization."""
# Calculate required compression
raw_memory = num_vectors * dimension * 4 / 1e9
compression_ratio = raw_memory / memory_budget_gb
# PQ segments: more = better accuracy, more memory
if compression_ratio > 8:
pq_segments = dimension // 8
pq_bits = 4
elif compression_ratio > 4:
pq_segments = dimension // 4
pq_bits = 8
else:
pq_segments = dimension // 2
pq_bits = 8
config = self._configure_hnsw(
num_vectors, dimension, recall_target, 100
)
config.use_pq = True
config.pq_segments = pq_segments
config.pq_bits = pq_bits
return config
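A quick usage sketch (the workload numbers here are hypothetical): a 5M-vector collection of 768-dim float32 embeddings needs about 15.4 GB raw, so an 8 GB budget pushes the optimizer into the quantized HNSW path.

optimizer = IndexOptimizer()
config = optimizer.recommend_index(
    num_vectors=5_000_000,
    dimension=768,
    qps_target=500,
    recall_target=0.95,
    memory_budget_gb=8.0  # raw float32 would need ~15.4 GB
)
print(config.index_type, config.use_pq, config.pq_segments, config.pq_bits)
# IndexType.HNSW True 384 8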
class HNSWTuner:
"""Tune HNSW parameters."""
def __init__(self, vectors: np.ndarray, queries: np.ndarray):
self.vectors = vectors
self.queries = queries
self.ground_truth = self._compute_ground_truth()
def _compute_ground_truth(self, k: int = 10) -> np.ndarray:
"""Compute exact nearest neighbors."""
from sklearn.neighbors import NearestNeighbors
nn = NearestNeighbors(n_neighbors=k, metric='cosine')
nn.fit(self.vectors)
_, indices = nn.kneighbors(self.queries)
return indices
def tune(
self,
m_values: list[int] = [8, 16, 32],
ef_values: list[int] = [50, 100, 200, 400]
) -> dict:
"""Grid search for optimal parameters."""
results = []
for m in m_values:
for ef in ef_values:
recall, latency = self._evaluate(m, ef)
results.append({
"m": m,
"ef_search": ef,
"recall": recall,
"latency_ms": latency
})
return {
"results": results,
"best_recall": max(results, key=lambda x: x["recall"]),
"best_latency": min(results, key=lambda x: x["latency_ms"])
}
def _evaluate(self, m: int, ef_search: int) -> tuple[float, float]:
"""Evaluate configuration."""
import hnswlib
import time
dim = self.vectors.shape[1]
num_vectors = self.vectors.shape[0]
# Build index
index = hnswlib.Index(space='cosine', dim=dim)
index.init_index(
max_elements=num_vectors,
ef_construction=200,
M=m
)
index.add_items(self.vectors)
index.set_ef(ef_search)
# Query
start = time.time()
labels, _ = index.knn_query(self.queries, k=10)
latency = (time.time() - start) / len(self.queries) * 1000
# Calculate recall
recall = self._calculate_recall(labels)
return recall, latency
def _calculate_recall(self, predictions: np.ndarray, k: int = 10) -> float:
"""Calculate recall@k."""
correct = 0
total = len(self.queries) * k
for i, pred in enumerate(predictions):
correct += len(set(pred[:k]) & set(self.ground_truth[i][:k]))
return correct / total
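A tuning sketch on synthetic data (assumes hnswlib and scikit-learn are installed; in practice you would tune on a sample of your real embeddings and queries):

rng = np.random.default_rng(42)
vectors = rng.standard_normal((10_000, 128)).astype(np.float32)
queries = rng.standard_normal((100, 128)).astype(np.float32)
tuner = HNSWTuner(vectors, queries)
report = tuner.tune(m_values=[8, 16], ef_values=[50, 100])
print(report["best_recall"])   # configuration with the highest recall@10
print(report["best_latency"])  # configuration with the lowest per-query latency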
Vector Quantization
from dataclasses import dataclass
from typing import Any, Optional
import numpy as np
@dataclass
class QuantizationResult:
"""Quantization result."""
compressed_vectors: np.ndarray
    codebook: Optional[np.ndarray] = None
compression_ratio: float = 1.0
reconstruction_error: float = 0.0
class ScalarQuantizer:
"""Scalar quantization (SQ)."""
def __init__(self, bits: int = 8):
self.bits = bits
self.num_levels = 2 ** bits
self.min_val = None
self.max_val = None
def fit(self, vectors: np.ndarray):
"""Fit quantizer to data."""
self.min_val = vectors.min(axis=0)
self.max_val = vectors.max(axis=0)
def encode(self, vectors: np.ndarray) -> np.ndarray:
"""Quantize vectors."""
# Normalize to [0, 1]
normalized = (vectors - self.min_val) / (self.max_val - self.min_val + 1e-8)
# Quantize to integer levels
quantized = np.round(normalized * (self.num_levels - 1)).astype(np.uint8)
return quantized
def decode(self, quantized: np.ndarray) -> np.ndarray:
"""Reconstruct vectors."""
normalized = quantized.astype(np.float32) / (self.num_levels - 1)
reconstructed = normalized * (self.max_val - self.min_val) + self.min_val
return reconstructed
def compress(self, vectors: np.ndarray) -> QuantizationResult:
"""Compress vectors."""
self.fit(vectors)
compressed = self.encode(vectors)
reconstructed = self.decode(compressed)
error = np.mean((vectors - reconstructed) ** 2)
        # Codes are stored as uint8, so this implementation compresses float32 4x
        # regardless of self.bits; sub-byte widths would need explicit bit-packing
compression_ratio = 4.0
return QuantizationResult(
compressed_vectors=compressed,
compression_ratio=compression_ratio,
reconstruction_error=error
)
class ProductQuantizer:
"""Product quantization (PQ)."""
def __init__(self, num_segments: int = 8, num_centroids: int = 256):
self.num_segments = num_segments
self.num_centroids = num_centroids
self.codebooks = None
self.segment_size = None
def fit(self, vectors: np.ndarray):
"""Train PQ codebooks."""
from sklearn.cluster import KMeans
dim = vectors.shape[1]
        # Assumes dimension is divisible by num_segments
        self.segment_size = dim // self.num_segments
self.codebooks = []
for i in range(self.num_segments):
start = i * self.segment_size
end = start + self.segment_size
segment = vectors[:, start:end]
kmeans = KMeans(
n_clusters=self.num_centroids,
n_init=1,
max_iter=20
)
kmeans.fit(segment)
self.codebooks.append(kmeans.cluster_centers_)
def encode(self, vectors: np.ndarray) -> np.ndarray:
"""Encode vectors to PQ codes."""
n = vectors.shape[0]
        codes = np.zeros((n, self.num_segments), dtype=np.uint8)  # uint8 caps num_centroids at 256
for i in range(self.num_segments):
start = i * self.segment_size
end = start + self.segment_size
segment = vectors[:, start:end]
codebook = self.codebooks[i]
# Find nearest centroid
distances = np.linalg.norm(
segment[:, np.newaxis] - codebook,
axis=2
)
codes[:, i] = np.argmin(distances, axis=1)
return codes
def decode(self, codes: np.ndarray) -> np.ndarray:
"""Decode PQ codes to vectors."""
n = codes.shape[0]
dim = self.segment_size * self.num_segments
vectors = np.zeros((n, dim), dtype=np.float32)
for i in range(self.num_segments):
start = i * self.segment_size
end = start + self.segment_size
vectors[:, start:end] = self.codebooks[i][codes[:, i]]
return vectors
def compress(self, vectors: np.ndarray) -> QuantizationResult:
"""Compress vectors using PQ."""
self.fit(vectors)
codes = self.encode(vectors)
reconstructed = self.decode(codes)
error = np.mean((vectors - reconstructed) ** 2)
# Compression: dim * 4 bytes -> num_segments bytes
original_size = vectors.shape[1] * 4
compressed_size = self.num_segments
compression_ratio = original_size / compressed_size
return QuantizationResult(
compressed_vectors=codes,
codebook=np.array(self.codebooks),
compression_ratio=compression_ratio,
reconstruction_error=error
)
class BinaryQuantizer:
"""Binary quantization for extreme compression."""
def __init__(self):
self.thresholds = None
def fit(self, vectors: np.ndarray):
"""Fit thresholds (median per dimension)."""
self.thresholds = np.median(vectors, axis=0)
def encode(self, vectors: np.ndarray) -> np.ndarray:
"""Encode to binary vectors."""
binary = (vectors > self.thresholds).astype(np.uint8)
        # Pack bits into bytes: output shape is (n, ceil(dim / 8))
        packed = np.packbits(binary, axis=1)
return packed
def hamming_distance(
self,
query_packed: np.ndarray,
database_packed: np.ndarray
) -> np.ndarray:
"""Compute Hamming distances."""
# XOR and count bits
xor = np.bitwise_xor(query_packed, database_packed)
        # Count set bits per row (vectorized popcount via unpackbits)
        distances = np.unpackbits(xor, axis=1).sum(axis=1)
return distances
def compress(self, vectors: np.ndarray) -> QuantizationResult:
"""Compress to binary."""
self.fit(vectors)
packed = self.encode(vectors)
# Compression: dim * 4 bytes -> dim / 8 bytes
compression_ratio = 32.0
return QuantizationResult(
compressed_vectors=packed,
compression_ratio=compression_ratio,
reconstruction_error=float('inf') # No reconstruction
)
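A side-by-side sketch on random 768-dim embeddings (a hypothetical size mirroring common sentence-embedding models). The expected ratios follow from the storage math: SQ stores one byte per dimension (4x), PQ with 96 segments stores 96 bytes per vector (768 * 4 / 96 = 32x), and binary stores one bit per dimension (32x).

rng = np.random.default_rng(0)
embeddings = rng.standard_normal((2_000, 768)).astype(np.float32)
for quantizer in (
    ScalarQuantizer(bits=8),
    ProductQuantizer(num_segments=96),  # 768 / 96 = 8 dims per segment
    BinaryQuantizer(),
):
    result = quantizer.compress(embeddings)
    name = type(quantizer).__name__
    print(f"{name}: {result.compression_ratio:.0f}x, mse={result.reconstruction_error:.4f}")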
Sharding and Distribution
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from abc import ABC, abstractmethod
import hashlib
import numpy as np
@dataclass
class Shard:
"""Vector database shard."""
shard_id: str
num_vectors: int = 0
index: Any = None
metadata: dict = field(default_factory=dict)
class ShardingStrategy(ABC):
"""Abstract sharding strategy."""
@abstractmethod
def get_shard(self, vector_id: str, vector: np.ndarray = None) -> str:
"""Determine shard for vector."""
pass
@abstractmethod
def get_query_shards(self, query: np.ndarray) -> list[str]:
"""Determine shards to query."""
pass
class HashSharding(ShardingStrategy):
"""Hash-based sharding."""
def __init__(self, num_shards: int):
self.num_shards = num_shards
self.shard_ids = [f"shard_{i}" for i in range(num_shards)]
def get_shard(self, vector_id: str, vector: np.ndarray = None) -> str:
"""Hash vector ID to shard."""
hash_val = int(hashlib.md5(vector_id.encode()).hexdigest(), 16)
shard_idx = hash_val % self.num_shards
return self.shard_ids[shard_idx]
def get_query_shards(self, query: np.ndarray) -> list[str]:
"""Query all shards."""
return self.shard_ids
class ClusterSharding(ShardingStrategy):
"""Cluster-based sharding for locality."""
def __init__(self, num_shards: int, sample_vectors: np.ndarray = None):
self.num_shards = num_shards
self.shard_ids = [f"shard_{i}" for i in range(num_shards)]
self.centroids = None
if sample_vectors is not None:
self._train_centroids(sample_vectors)
def _train_centroids(self, vectors: np.ndarray):
"""Train cluster centroids."""
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=self.num_shards, n_init=3)
kmeans.fit(vectors)
self.centroids = kmeans.cluster_centers_
def get_shard(self, vector_id: str, vector: np.ndarray = None) -> str:
"""Assign to nearest centroid."""
if vector is None or self.centroids is None:
# Fallback to hash
hash_val = int(hashlib.md5(vector_id.encode()).hexdigest(), 16)
return self.shard_ids[hash_val % self.num_shards]
distances = np.linalg.norm(self.centroids - vector, axis=1)
nearest = np.argmin(distances)
return self.shard_ids[nearest]
def get_query_shards(self, query: np.ndarray, num_shards: int = 3) -> list[str]:
"""Query nearest cluster shards."""
if self.centroids is None:
return self.shard_ids
distances = np.linalg.norm(self.centroids - query, axis=1)
nearest_indices = np.argsort(distances)[:num_shards]
return [self.shard_ids[i] for i in nearest_indices]
class ShardedVectorDB:
"""Sharded vector database."""
def __init__(
self,
strategy: ShardingStrategy,
        shard_factory: Callable[[], Any]
):
self.strategy = strategy
self.shard_factory = shard_factory
self.shards: dict[str, Shard] = {}
def _get_or_create_shard(self, shard_id: str) -> Shard:
"""Get or create shard."""
if shard_id not in self.shards:
self.shards[shard_id] = Shard(
shard_id=shard_id,
index=self.shard_factory()
)
return self.shards[shard_id]
def insert(self, vector_id: str, vector: np.ndarray, metadata: dict = None):
"""Insert vector into appropriate shard."""
shard_id = self.strategy.get_shard(vector_id, vector)
shard = self._get_or_create_shard(shard_id)
shard.index.add(vector_id, vector, metadata)
shard.num_vectors += 1
def search(
self,
query: np.ndarray,
k: int = 10,
filter: dict = None
) -> list[dict]:
"""Search across relevant shards."""
shard_ids = self.strategy.get_query_shards(query)
all_results = []
for shard_id in shard_ids:
if shard_id in self.shards:
shard = self.shards[shard_id]
results = shard.index.search(query, k, filter)
all_results.extend(results)
# Merge and sort by score
all_results.sort(key=lambda x: x["score"], reverse=True)
return all_results[:k]
def get_stats(self) -> dict:
"""Get shard statistics."""
stats = {
"num_shards": len(self.shards),
"total_vectors": sum(s.num_vectors for s in self.shards.values()),
"shards": {}
}
for shard_id, shard in self.shards.items():
stats["shards"][shard_id] = {
"num_vectors": shard.num_vectors
}
return stats
class ReplicationManager:
"""Manage shard replication."""
def __init__(self, replication_factor: int = 2):
self.replication_factor = replication_factor
self.replicas: dict[str, list[str]] = {} # shard_id -> replica_ids
def get_replicas(self, shard_id: str) -> list[str]:
"""Get replica IDs for shard."""
if shard_id not in self.replicas:
self.replicas[shard_id] = [
f"{shard_id}_replica_{i}"
for i in range(self.replication_factor)
]
return self.replicas[shard_id]
def select_replica(self, shard_id: str) -> str:
"""Select replica for read (round-robin)."""
replicas = self.get_replicas(shard_id)
        # Random choice spreads reads across replicas; a production system
        # would track replica health and use round-robin or least-loaded selection
        import random
return random.choice(replicas)
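A wiring sketch tying the pieces together. InMemoryIndex is a hypothetical stand-in for a real per-shard index; it only needs the add/search interface that ShardedVectorDB calls:

class InMemoryIndex:
    """Hypothetical per-shard index: brute-force cosine search."""
    def __init__(self):
        self.items: dict[str, np.ndarray] = {}
    def add(self, vector_id: str, vector: np.ndarray, metadata: dict = None):
        self.items[vector_id] = vector
    def search(self, query: np.ndarray, k: int, filter: dict = None) -> list[dict]:
        scored = [
            {"id": vid, "score": float(np.dot(query, vec) /
                (np.linalg.norm(query) * np.linalg.norm(vec) + 1e-8))}
            for vid, vec in self.items.items()
        ]
        scored.sort(key=lambda x: x["score"], reverse=True)
        return scored[:k]
db = ShardedVectorDB(HashSharding(num_shards=4), InMemoryIndex)
rng = np.random.default_rng(1)
for i in range(1_000):
    db.insert(f"doc_{i}", rng.standard_normal(64).astype(np.float32))
print(db.get_stats())  # 4 shards, 1,000 vectors spread across them
top5 = db.search(rng.standard_normal(64).astype(np.float32), k=5)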
Query Optimization
from dataclasses import dataclass
from typing import Any, Optional
import numpy as np
import time
@dataclass
class QueryPlan:
"""Query execution plan."""
use_prefilter: bool = False
use_reranking: bool = False
oversample_factor: int = 1
    shards_to_query: Optional[list[str]] = None
class QueryOptimizer:
"""Optimize vector queries."""
def __init__(self, db_stats: dict):
self.db_stats = db_stats
def plan_query(
self,
query: np.ndarray,
k: int,
filter: dict = None,
latency_budget_ms: float = 100
) -> QueryPlan:
"""Create query execution plan."""
plan = QueryPlan()
# Determine if prefiltering is beneficial
if filter:
selectivity = self._estimate_selectivity(filter)
if selectivity < 0.1:
# Highly selective filter - prefilter first
plan.use_prefilter = True
else:
# Post-filter after ANN search
plan.use_prefilter = False
plan.oversample_factor = int(1 / selectivity) + 1
# Determine if reranking is needed
total_vectors = self.db_stats.get("total_vectors", 0)
if total_vectors > 100000 and k < 20:
plan.use_reranking = True
plan.oversample_factor = max(plan.oversample_factor, 3)
return plan
def _estimate_selectivity(self, filter: dict) -> float:
"""Estimate filter selectivity."""
# Simplified estimation
# In practice, use statistics from metadata index
selectivity = 1.0
for field, condition in filter.items():
if isinstance(condition, dict):
if "$eq" in condition:
selectivity *= 0.1
elif "$in" in condition:
selectivity *= len(condition["$in"]) * 0.05
elif "$gt" in condition or "$lt" in condition:
selectivity *= 0.3
else:
selectivity *= 0.1
        # Clamp: $in filters with many values can push the estimate above 1.0
        return min(selectivity, 1.0)
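A planner sketch with a hypothetical equality filter over a 1M-vector collection. The $eq heuristic estimates 0.1 selectivity, which sits just outside the prefilter threshold, so the plan post-filters with an 11x oversample and reranks:

planner = QueryOptimizer(db_stats={"total_vectors": 1_000_000})
plan = planner.plan_query(
    query=np.zeros(768, dtype=np.float32),
    k=10,
    filter={"category": {"$eq": "electronics"}}
)
print(plan.use_prefilter, plan.use_reranking, plan.oversample_factor)
# False True 11 -> fetch ~110 candidates, post-filter, rerank down to 10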
class QueryCache:
"""Cache for frequent queries."""
def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
self.max_size = max_size
self.ttl = ttl_seconds
self.cache: dict[str, tuple[list, float]] = {}
def _hash_query(self, query: np.ndarray, k: int, filter: dict = None) -> str:
"""Create cache key."""
query_bytes = query.tobytes()
filter_str = str(sorted(filter.items())) if filter else ""
import hashlib
key = hashlib.md5(query_bytes + f"{k}{filter_str}".encode()).hexdigest()
return key
def get(
self,
query: np.ndarray,
k: int,
filter: dict = None
) -> Optional[list]:
"""Get cached results."""
key = self._hash_query(query, k, filter)
if key in self.cache:
results, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl:
return results
else:
del self.cache[key]
return None
def set(
self,
query: np.ndarray,
k: int,
results: list,
filter: dict = None
):
"""Cache results."""
if len(self.cache) >= self.max_size:
# Evict oldest
            oldest_key = min(self.cache, key=lambda key: self.cache[key][1])
del self.cache[oldest_key]
key = self._hash_query(query, k, filter)
self.cache[key] = (results, time.time())
class BatchQueryExecutor:
"""Execute queries in batches."""
def __init__(self, index: Any, batch_size: int = 100):
self.index = index
self.batch_size = batch_size
def search_batch(
self,
queries: np.ndarray,
k: int = 10
) -> list[list[dict]]:
"""Execute batch of queries."""
all_results = []
for i in range(0, len(queries), self.batch_size):
batch = queries[i:i + self.batch_size]
# Most vector DBs support batch queries
batch_results = self.index.search_batch(batch, k)
all_results.extend(batch_results)
return all_results
class HybridSearcher:
"""Combine vector and keyword search."""
def __init__(
self,
vector_index: Any,
keyword_index: Any,
alpha: float = 0.7 # Weight for vector search
):
self.vector_index = vector_index
self.keyword_index = keyword_index
self.alpha = alpha
def search(
self,
query_vector: np.ndarray,
query_text: str,
k: int = 10
) -> list[dict]:
"""Hybrid search combining vector and keyword."""
# Vector search
vector_results = self.vector_index.search(query_vector, k * 2)
# Keyword search
keyword_results = self.keyword_index.search(query_text, k * 2)
# Combine scores
combined = {}
for result in vector_results:
doc_id = result["id"]
combined[doc_id] = {
"id": doc_id,
"vector_score": result["score"],
"keyword_score": 0,
"metadata": result.get("metadata", {})
}
for result in keyword_results:
doc_id = result["id"]
if doc_id in combined:
combined[doc_id]["keyword_score"] = result["score"]
else:
combined[doc_id] = {
"id": doc_id,
"vector_score": 0,
"keyword_score": result["score"],
"metadata": result.get("metadata", {})
}
# Calculate final scores
for doc_id, data in combined.items():
data["score"] = (
self.alpha * data["vector_score"] +
(1 - self.alpha) * data["keyword_score"]
)
# Sort and return top k
results = sorted(combined.values(), key=lambda x: x["score"], reverse=True)
return results[:k]
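One caveat worth noting: the weighted sum above mixes raw scores, but cosine similarities live in [-1, 1] while keyword scores such as BM25 are unbounded, so the two scales are not directly comparable. A common fix, sketched below, is to min-max normalize each result list before blending:

def min_max_normalize(results: list[dict], key: str = "score") -> list[dict]:
    """Rescale scores in place to [0, 1] so different retrievers are comparable."""
    scores = [r[key] for r in results]
    lo, hi = min(scores), max(scores)
    span = (hi - lo) or 1.0  # all scores equal: avoid divide-by-zero
    for r in results:
        r[key] = (r[key] - lo) / span
    return results
# Applied inside HybridSearcher.search before the merge step:
# vector_results = min_max_normalize(self.vector_index.search(query_vector, k * 2))
# keyword_results = min_max_normalize(self.keyword_index.search(query_text, k * 2))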
Production Vector Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np
app = FastAPI()
class IndexRequest(BaseModel):
num_vectors: int
dimension: int
qps_target: int = 100
recall_target: float = 0.95
memory_budget_gb: Optional[float] = None
class InsertRequest(BaseModel):
id: str
vector: list[float]
metadata: Optional[dict] = None
class SearchRequest(BaseModel):
vector: list[float]
k: int = 10
filter: Optional[dict] = None
class BatchSearchRequest(BaseModel):
vectors: list[list[float]]
k: int = 10
# Initialize components
optimizer = IndexOptimizer()
cache = QueryCache()
# Placeholder index (replace with actual implementation)
class SimpleIndex:
def __init__(self):
self.vectors = {}
self.metadata = {}
def add(self, id: str, vector: np.ndarray, metadata: dict = None):
self.vectors[id] = vector
self.metadata[id] = metadata or {}
def search(self, query: np.ndarray, k: int, filter: dict = None) -> list:
results = []
for id, vec in self.vectors.items():
            score = np.dot(query, vec) / (np.linalg.norm(query) * np.linalg.norm(vec) + 1e-8)  # epsilon guards zero vectors
results.append({"id": id, "score": float(score), "metadata": self.metadata[id]})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:k]
index = SimpleIndex()
@app.post("/v1/index/recommend")
async def recommend_index(request: IndexRequest) -> dict:
"""Get index configuration recommendation."""
config = optimizer.recommend_index(
request.num_vectors,
request.dimension,
request.qps_target,
request.recall_target,
request.memory_budget_gb
)
return {
"index_type": config.index_type.value,
"dimension": config.dimension,
"metric": config.metric,
"hnsw": {
"m": config.m,
"ef_construction": config.ef_construction,
"ef_search": config.ef_search
},
"ivf": {
"nlist": config.nlist,
"nprobe": config.nprobe
},
"quantization": {
"use_pq": config.use_pq,
"pq_segments": config.pq_segments,
"pq_bits": config.pq_bits
}
}
@app.post("/v1/vectors/insert")
async def insert_vector(request: InsertRequest) -> dict:
"""Insert vector."""
vector = np.array(request.vector, dtype=np.float32)
index.add(request.id, vector, request.metadata)
return {"status": "inserted", "id": request.id}
@app.post("/v1/vectors/search")
async def search_vectors(request: SearchRequest) -> dict:
"""Search vectors."""
query = np.array(request.vector, dtype=np.float32)
# Check cache
cached = cache.get(query, request.k, request.filter)
    if cached is not None:
return {"results": cached, "cached": True}
# Search
results = index.search(query, request.k, request.filter)
# Cache results
cache.set(query, request.k, results, request.filter)
return {"results": results, "cached": False}
@app.post("/v1/vectors/search/batch")
async def batch_search(request: BatchSearchRequest) -> dict:
"""Batch search."""
all_results = []
for vec in request.vectors:
query = np.array(vec, dtype=np.float32)
results = index.search(query, request.k)
all_results.append(results)
return {"results": all_results}
@app.get("/v1/stats")
async def get_stats() -> dict:
"""Get index statistics."""
return {
"num_vectors": len(index.vectors),
"cache_size": len(cache.cache)
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- HNSW Paper: https://arxiv.org/abs/1603.09320
- Product Quantization: https://lear.inrialpes.fr/pubs/2011/JDS11/jegou_searching_with_quantization.pdf
- Pinecone Docs: https://docs.pinecone.io/
- Weaviate Docs: https://weaviate.io/developers/weaviate
- Qdrant Docs: https://qdrant.tech/documentation/
Conclusion
Vector database optimization is about finding the right tradeoffs for your specific use case. Start by understanding your requirements: how many vectors, what latency is acceptable, what recall you need, and what your memory budget is. For most applications, HNSW provides the best balance of recall and speed—tune M for the memory/recall tradeoff and ef_search for the latency/recall tradeoff. When memory becomes a constraint, use quantization: scalar quantization gives 4x compression with minimal accuracy loss, while product quantization can achieve 32x or more for large-scale systems. Sharding becomes necessary when you exceed single-node capacity—cluster-based sharding provides better query locality than hash sharding but requires more careful management. Don’t forget query-level optimizations: caching frequent queries, batching for throughput, and hybrid search for better relevance. Monitor your system continuously—track recall against ground truth, measure p99 latencies, and watch for shard imbalance. The best vector database configuration is one that’s continuously tuned against actual usage patterns and evolving requirements.
