Vector Database Optimization: Scaling Semantic Search to Millions of Embeddings

Introduction: Vector databases are the backbone of modern AI applications—powering semantic search, RAG systems, and recommendation engines. But as your vector collection grows from thousands to millions of embeddings, naive approaches break down. Query latency spikes, memory costs explode, and recall accuracy degrades. This guide covers practical optimization strategies: choosing the right index type for your access patterns, tuning HNSW parameters for the latency-recall tradeoff you need, using quantization to cut memory usage by 4-8x with minimal accuracy loss, and implementing sharding strategies for horizontal scale. Whether you’re using Pinecone, Weaviate, Qdrant, or Milvus, these techniques will help you build vector search systems that stay fast and accurate at scale.


Index Selection and Configuration

from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
from abc import ABC, abstractmethod
import numpy as np

class IndexType(Enum):
    """Vector index types."""
    
    FLAT = "flat"           # Exact search, no index
    IVF = "ivf"             # Inverted file index
    HNSW = "hnsw"           # Hierarchical navigable small world
    ANNOY = "annoy"         # Approximate nearest neighbors
    SCANN = "scann"         # Scalable nearest neighbors

@dataclass
class IndexConfig:
    """Index configuration."""
    
    index_type: IndexType
    dimension: int
    metric: str = "cosine"  # cosine, euclidean, dot_product
    
    # IVF parameters
    nlist: int = 100        # Number of clusters
    nprobe: int = 10        # Clusters to search
    
    # HNSW parameters
    m: int = 16             # Connections per node
    ef_construction: int = 200  # Build-time search width
    ef_search: int = 50     # Query-time search width
    
    # Quantization
    use_pq: bool = False    # Product quantization
    pq_segments: int = 8    # PQ segments
    pq_bits: int = 8        # Bits per segment

class IndexOptimizer:
    """Optimize index configuration."""
    
    def __init__(self):
        self.benchmarks = {}
    
    def recommend_index(
        self,
        num_vectors: int,
        dimension: int,
        qps_target: int,
        recall_target: float = 0.95,
        memory_budget_gb: Optional[float] = None
    ) -> IndexConfig:
        """Recommend index configuration."""
        
        # Small datasets: use flat index
        if num_vectors < 10000:
            return IndexConfig(
                index_type=IndexType.FLAT,
                dimension=dimension
            )
        
        # Medium datasets: IVF or HNSW
        if num_vectors < 1000000:
            # HNSW for high recall requirements
            if recall_target > 0.95:
                return self._configure_hnsw(
                    num_vectors, dimension, recall_target, qps_target
                )
            else:
                return self._configure_ivf(
                    num_vectors, dimension, recall_target, qps_target
                )
        
        # Large datasets: need quantization
        memory_per_vector = dimension * 4  # float32
        total_memory = num_vectors * memory_per_vector / 1e9
        
        if memory_budget_gb and total_memory > memory_budget_gb:
            return self._configure_with_quantization(
                num_vectors, dimension, memory_budget_gb, recall_target
            )
        
        return self._configure_hnsw(
            num_vectors, dimension, recall_target, qps_target
        )
    
    def _configure_hnsw(
        self,
        num_vectors: int,
        dimension: int,
        recall_target: float,
        qps_target: int
    ) -> IndexConfig:
        """Configure HNSW index."""
        
        # M: more connections = higher recall, more memory
        if recall_target > 0.99:
            m = 32
        elif recall_target > 0.95:
            m = 16
        else:
            m = 8
        
        # ef_construction: higher = better index, slower build
        ef_construction = max(100, m * 10)
        
        # ef_search: higher = better recall, slower search
        if recall_target > 0.99:
            ef_search = 200
        elif recall_target > 0.95:
            ef_search = 100
        else:
            ef_search = 50
        
        return IndexConfig(
            index_type=IndexType.HNSW,
            dimension=dimension,
            m=m,
            ef_construction=ef_construction,
            ef_search=ef_search
        )
    
    def _configure_ivf(
        self,
        num_vectors: int,
        dimension: int,
        recall_target: float,
        qps_target: int
    ) -> IndexConfig:
        """Configure IVF index."""
        
        # nlist: sqrt(n) is a good starting point
        nlist = int(np.sqrt(num_vectors))
        nlist = max(100, min(nlist, 4096))
        
        # nprobe: more probes = higher recall, slower search
        if recall_target > 0.95:
            nprobe = max(10, nlist // 10)
        else:
            nprobe = max(5, nlist // 20)
        
        return IndexConfig(
            index_type=IndexType.IVF,
            dimension=dimension,
            nlist=nlist,
            nprobe=nprobe
        )
    
    def _configure_with_quantization(
        self,
        num_vectors: int,
        dimension: int,
        memory_budget_gb: float,
        recall_target: float
    ) -> IndexConfig:
        """Configure index with quantization."""
        
        # Calculate required compression
        raw_memory = num_vectors * dimension * 4 / 1e9
        compression_ratio = raw_memory / memory_budget_gb
        
        # PQ segments: more = better accuracy, more memory
        if compression_ratio > 8:
            pq_segments = dimension // 8
            pq_bits = 4
        elif compression_ratio > 4:
            pq_segments = dimension // 4
            pq_bits = 8
        else:
            pq_segments = dimension // 2
            pq_bits = 8
        
        config = self._configure_hnsw(
            num_vectors, dimension, recall_target, 100
        )
        config.use_pq = True
        config.pq_segments = pq_segments
        config.pq_bits = pq_bits
        
        return config
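
To make the memory-budget arithmetic in recommend_index concrete, here is a back-of-envelope comparison. The corpus size, dimension, and the 96-byte PQ code length are illustrative numbers, not recommendations:

```python
# Raw float32 footprint vs. product-quantized footprint (illustrative numbers).
num_vectors = 10_000_000
dimension = 768

raw_gb = num_vectors * dimension * 4 / 1e9  # 4 bytes per float32 component
pq_gb = num_vectors * 96 / 1e9              # e.g. 96 one-byte PQ codes per vector

print(round(raw_gb, 2))       # 30.72 GB uncompressed
print(round(pq_gb, 2))        # 0.96 GB quantized
print(round(raw_gb / pq_gb))  # 32x compression
```

A 30 GB index forces quantization on most single nodes, which is exactly the branch recommend_index takes when the budget is exceeded.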

class HNSWTuner:
    """Tune HNSW parameters."""
    
    def __init__(self, vectors: np.ndarray, queries: np.ndarray):
        self.vectors = vectors
        self.queries = queries
        self.ground_truth = self._compute_ground_truth()
    
    def _compute_ground_truth(self, k: int = 10) -> np.ndarray:
        """Compute exact nearest neighbors."""
        
        from sklearn.neighbors import NearestNeighbors
        
        nn = NearestNeighbors(n_neighbors=k, metric='cosine')
        nn.fit(self.vectors)
        
        _, indices = nn.kneighbors(self.queries)
        return indices
    
    def tune(
        self,
        m_values: list[int] = [8, 16, 32],
        ef_values: list[int] = [50, 100, 200, 400]
    ) -> dict:
        """Grid search for optimal parameters."""
        
        results = []
        
        for m in m_values:
            for ef in ef_values:
                recall, latency = self._evaluate(m, ef)
                
                results.append({
                    "m": m,
                    "ef_search": ef,
                    "recall": recall,
                    "latency_ms": latency
                })
        
        return {
            "results": results,
            "best_recall": max(results, key=lambda x: x["recall"]),
            "best_latency": min(results, key=lambda x: x["latency_ms"])
        }
    
    def _evaluate(self, m: int, ef_search: int) -> tuple[float, float]:
        """Evaluate configuration."""
        
        import hnswlib
        import time
        
        dim = self.vectors.shape[1]
        num_vectors = self.vectors.shape[0]
        
        # Build index
        index = hnswlib.Index(space='cosine', dim=dim)
        index.init_index(
            max_elements=num_vectors,
            ef_construction=200,
            M=m
        )
        index.add_items(self.vectors)
        index.set_ef(ef_search)
        
        # Query
        start = time.time()
        labels, _ = index.knn_query(self.queries, k=10)
        latency = (time.time() - start) / len(self.queries) * 1000
        
        # Calculate recall
        recall = self._calculate_recall(labels)
        
        return recall, latency
    
    def _calculate_recall(self, predictions: np.ndarray, k: int = 10) -> float:
        """Calculate recall@k."""
        
        correct = 0
        total = len(self.queries) * k
        
        for i, pred in enumerate(predictions):
            correct += len(set(pred[:k]) & set(self.ground_truth[i][:k]))
        
        return correct / total
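
The tuner's recall measurement can be illustrated standalone: brute-force cosine search provides ground truth, and any approximate index is scored against it the same way. A minimal sketch on synthetic data (exact results compared against themselves, so recall is trivially 1.0):

```python
import numpy as np

def recall_at_k(approx: np.ndarray, exact: np.ndarray, k: int = 10) -> float:
    """Fraction of true top-k neighbors recovered by the approximate search."""
    hits = sum(len(set(a[:k]) & set(e[:k])) for a, e in zip(approx, exact))
    return hits / (len(exact) * k)

def exact_topk(vectors: np.ndarray, queries: np.ndarray, k: int = 10) -> np.ndarray:
    # Brute-force cosine similarity via normalized dot products.
    vn = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    qn = queries / np.linalg.norm(queries, axis=1, keepdims=True)
    sims = qn @ vn.T
    return np.argsort(-sims, axis=1)[:, :k]

rng = np.random.default_rng(0)
base = rng.normal(size=(1000, 64)).astype(np.float32)
queries = rng.normal(size=(20, 64)).astype(np.float32)

truth = exact_topk(base, queries, k=10)
print(recall_at_k(truth, truth, k=10))  # → 1.0
```

In practice you would substitute the labels returned by an HNSW (or IVF) index for the first argument and watch how recall moves as ef_search changes.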

Vector Quantization

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

@dataclass
class QuantizationResult:
    """Quantization result."""
    
    compressed_vectors: np.ndarray
    codebook: Optional[np.ndarray] = None
    compression_ratio: float = 1.0
    reconstruction_error: float = 0.0

class ScalarQuantizer:
    """Scalar quantization (SQ)."""
    
    def __init__(self, bits: int = 8):
        self.bits = bits
        self.num_levels = 2 ** bits
        self.min_val = None
        self.max_val = None
    
    def fit(self, vectors: np.ndarray):
        """Fit quantizer to data."""
        
        self.min_val = vectors.min(axis=0)
        self.max_val = vectors.max(axis=0)
    
    def encode(self, vectors: np.ndarray) -> np.ndarray:
        """Quantize vectors."""
        
        # Normalize to [0, 1]
        normalized = (vectors - self.min_val) / (self.max_val - self.min_val + 1e-8)
        
        # Quantize to integer levels
        quantized = np.round(normalized * (self.num_levels - 1)).astype(np.uint8)
        
        return quantized
    
    def decode(self, quantized: np.ndarray) -> np.ndarray:
        """Reconstruct vectors."""
        
        normalized = quantized.astype(np.float32) / (self.num_levels - 1)
        reconstructed = normalized * (self.max_val - self.min_val) + self.min_val
        
        return reconstructed
    
    def compress(self, vectors: np.ndarray) -> QuantizationResult:
        """Compress vectors."""
        
        self.fit(vectors)
        compressed = self.encode(vectors)
        reconstructed = self.decode(compressed)
        
        error = np.mean((vectors - reconstructed) ** 2)
        
        # Compression: float32 (4 bytes) -> uint8 (1 byte)
        compression_ratio = 4.0
        
        return QuantizationResult(
            compressed_vectors=compressed,
            compression_ratio=compression_ratio,
            reconstruction_error=error
        )

class ProductQuantizer:
    """Product quantization (PQ)."""
    
    def __init__(self, num_segments: int = 8, num_centroids: int = 256):
        self.num_segments = num_segments
        self.num_centroids = num_centroids
        self.codebooks = None
        self.segment_size = None
    
    def fit(self, vectors: np.ndarray):
        """Train PQ codebooks."""
        
        from sklearn.cluster import KMeans
        
        dim = vectors.shape[1]
        self.segment_size = dim // self.num_segments
        
        self.codebooks = []
        
        for i in range(self.num_segments):
            start = i * self.segment_size
            end = start + self.segment_size
            
            segment = vectors[:, start:end]
            
            kmeans = KMeans(
                n_clusters=self.num_centroids,
                n_init=1,
                max_iter=20
            )
            kmeans.fit(segment)
            
            self.codebooks.append(kmeans.cluster_centers_)
    
    def encode(self, vectors: np.ndarray) -> np.ndarray:
        """Encode vectors to PQ codes."""
        
        n = vectors.shape[0]
        codes = np.zeros((n, self.num_segments), dtype=np.uint8)
        
        for i in range(self.num_segments):
            start = i * self.segment_size
            end = start + self.segment_size
            
            segment = vectors[:, start:end]
            codebook = self.codebooks[i]
            
            # Find nearest centroid
            distances = np.linalg.norm(
                segment[:, np.newaxis] - codebook,
                axis=2
            )
            codes[:, i] = np.argmin(distances, axis=1)
        
        return codes
    
    def decode(self, codes: np.ndarray) -> np.ndarray:
        """Decode PQ codes to vectors."""
        
        n = codes.shape[0]
        dim = self.segment_size * self.num_segments
        vectors = np.zeros((n, dim), dtype=np.float32)
        
        for i in range(self.num_segments):
            start = i * self.segment_size
            end = start + self.segment_size
            
            vectors[:, start:end] = self.codebooks[i][codes[:, i]]
        
        return vectors
    
    def compress(self, vectors: np.ndarray) -> QuantizationResult:
        """Compress vectors using PQ."""
        
        self.fit(vectors)
        codes = self.encode(vectors)
        reconstructed = self.decode(codes)
        
        error = np.mean((vectors - reconstructed) ** 2)
        
        # Compression: dim * 4 bytes -> num_segments bytes
        original_size = vectors.shape[1] * 4
        compressed_size = self.num_segments
        compression_ratio = original_size / compressed_size
        
        return QuantizationResult(
            compressed_vectors=codes,
            codebook=np.array(self.codebooks),
            compression_ratio=compression_ratio,
            reconstruction_error=error
        )

class BinaryQuantizer:
    """Binary quantization for extreme compression."""
    
    def __init__(self):
        self.thresholds = None
    
    def fit(self, vectors: np.ndarray):
        """Fit thresholds (median per dimension)."""
        
        self.thresholds = np.median(vectors, axis=0)
    
    def encode(self, vectors: np.ndarray) -> np.ndarray:
        """Encode to binary vectors."""
        
        binary = (vectors > self.thresholds).astype(np.uint8)
        
        # Pack bits into bytes: dim bits -> ceil(dim / 8) bytes per vector
        packed = np.packbits(binary, axis=1)
        
        return packed
    
    def hamming_distance(
        self,
        query_packed: np.ndarray,
        database_packed: np.ndarray
    ) -> np.ndarray:
        """Compute Hamming distances."""
        
        # XOR and count bits
        xor = np.bitwise_xor(query_packed, database_packed)
        
        # Count set bits
        distances = np.zeros(len(database_packed))
        for i, x in enumerate(xor):
            distances[i] = np.unpackbits(x).sum()
        
        return distances
    
    def compress(self, vectors: np.ndarray) -> QuantizationResult:
        """Compress to binary."""
        
        self.fit(vectors)
        packed = self.encode(vectors)
        
        # Compression: dim * 4 bytes -> dim / 8 bytes
        compression_ratio = 32.0
        
        return QuantizationResult(
            compressed_vectors=packed,
            compression_ratio=compression_ratio,
            reconstruction_error=float('inf')  # No reconstruction
        )
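
A round-trip sketch of the scalar-quantization scheme used by ScalarQuantizer, on synthetic data, showing that 8-bit codes give exactly 4x compression with a small reconstruction error:

```python
import numpy as np

rng = np.random.default_rng(1)
vectors = rng.normal(size=(500, 32)).astype(np.float32)

# Per-dimension min/max, as in ScalarQuantizer.fit above.
lo, hi = vectors.min(axis=0), vectors.max(axis=0)

# Encode: normalize to [0, 1], then round to 256 integer levels.
codes = np.round((vectors - lo) / (hi - lo + 1e-8) * 255).astype(np.uint8)

# Decode: map integer levels back into the original range.
restored = codes.astype(np.float32) / 255 * (hi - lo) + lo

mse = float(np.mean((vectors - restored) ** 2))
ratio = vectors.nbytes / codes.nbytes  # float32 (4 bytes) -> uint8 (1 byte)
print(ratio)       # → 4.0
print(mse < 1e-3)  # → True: error well below typical embedding noise
```

The error scales with the per-dimension value range divided by the number of levels, which is why 8 bits is usually safe while 4 bits starts to cost measurable recall.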

Sharding and Distribution

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
import hashlib
import numpy as np

@dataclass
class Shard:
    """Vector database shard."""
    
    shard_id: str
    num_vectors: int = 0
    index: Any = None
    metadata: dict = field(default_factory=dict)

class ShardingStrategy(ABC):
    """Abstract sharding strategy."""
    
    @abstractmethod
    def get_shard(self, vector_id: str, vector: np.ndarray = None) -> str:
        """Determine shard for vector."""
        pass
    
    @abstractmethod
    def get_query_shards(self, query: np.ndarray) -> list[str]:
        """Determine shards to query."""
        pass

class HashSharding(ShardingStrategy):
    """Hash-based sharding."""
    
    def __init__(self, num_shards: int):
        self.num_shards = num_shards
        self.shard_ids = [f"shard_{i}" for i in range(num_shards)]
    
    def get_shard(self, vector_id: str, vector: np.ndarray = None) -> str:
        """Hash vector ID to shard."""
        
        hash_val = int(hashlib.md5(vector_id.encode()).hexdigest(), 16)
        shard_idx = hash_val % self.num_shards
        
        return self.shard_ids[shard_idx]
    
    def get_query_shards(self, query: np.ndarray) -> list[str]:
        """Query all shards."""
        
        return self.shard_ids

class ClusterSharding(ShardingStrategy):
    """Cluster-based sharding for locality."""
    
    def __init__(self, num_shards: int, sample_vectors: np.ndarray = None):
        self.num_shards = num_shards
        self.shard_ids = [f"shard_{i}" for i in range(num_shards)]
        self.centroids = None
        
        if sample_vectors is not None:
            self._train_centroids(sample_vectors)
    
    def _train_centroids(self, vectors: np.ndarray):
        """Train cluster centroids."""
        
        from sklearn.cluster import KMeans
        
        kmeans = KMeans(n_clusters=self.num_shards, n_init=3)
        kmeans.fit(vectors)
        
        self.centroids = kmeans.cluster_centers_
    
    def get_shard(self, vector_id: str, vector: np.ndarray = None) -> str:
        """Assign to nearest centroid."""
        
        if vector is None or self.centroids is None:
            # Fallback to hash
            hash_val = int(hashlib.md5(vector_id.encode()).hexdigest(), 16)
            return self.shard_ids[hash_val % self.num_shards]
        
        distances = np.linalg.norm(self.centroids - vector, axis=1)
        nearest = np.argmin(distances)
        
        return self.shard_ids[nearest]
    
    def get_query_shards(self, query: np.ndarray, num_shards: int = 3) -> list[str]:
        """Query nearest cluster shards."""
        
        if self.centroids is None:
            return self.shard_ids
        
        distances = np.linalg.norm(self.centroids - query, axis=1)
        nearest_indices = np.argsort(distances)[:num_shards]
        
        return [self.shard_ids[i] for i in nearest_indices]

class ShardedVectorDB:
    """Sharded vector database."""
    
    def __init__(
        self,
        strategy: ShardingStrategy,
        shard_factory: callable
    ):
        self.strategy = strategy
        self.shard_factory = shard_factory
        self.shards: dict[str, Shard] = {}
    
    def _get_or_create_shard(self, shard_id: str) -> Shard:
        """Get or create shard."""
        
        if shard_id not in self.shards:
            self.shards[shard_id] = Shard(
                shard_id=shard_id,
                index=self.shard_factory()
            )
        
        return self.shards[shard_id]
    
    def insert(self, vector_id: str, vector: np.ndarray, metadata: dict = None):
        """Insert vector into appropriate shard."""
        
        shard_id = self.strategy.get_shard(vector_id, vector)
        shard = self._get_or_create_shard(shard_id)
        
        shard.index.add(vector_id, vector, metadata)
        shard.num_vectors += 1
    
    def search(
        self,
        query: np.ndarray,
        k: int = 10,
        filter: dict = None
    ) -> list[dict]:
        """Search across relevant shards."""
        
        shard_ids = self.strategy.get_query_shards(query)
        
        all_results = []
        
        for shard_id in shard_ids:
            if shard_id in self.shards:
                shard = self.shards[shard_id]
                results = shard.index.search(query, k, filter)
                all_results.extend(results)
        
        # Merge and sort by score
        all_results.sort(key=lambda x: x["score"], reverse=True)
        
        return all_results[:k]
    
    def get_stats(self) -> dict:
        """Get shard statistics."""
        
        stats = {
            "num_shards": len(self.shards),
            "total_vectors": sum(s.num_vectors for s in self.shards.values()),
            "shards": {}
        }
        
        for shard_id, shard in self.shards.items():
            stats["shards"][shard_id] = {
                "num_vectors": shard.num_vectors
            }
        
        return stats

class ReplicationManager:
    """Manage shard replication."""
    
    def __init__(self, replication_factor: int = 2):
        self.replication_factor = replication_factor
        self.replicas: dict[str, list[str]] = {}  # shard_id -> replica_ids
    
    def get_replicas(self, shard_id: str) -> list[str]:
        """Get replica IDs for shard."""
        
        if shard_id not in self.replicas:
            self.replicas[shard_id] = [
                f"{shard_id}_replica_{i}"
                for i in range(self.replication_factor)
            ]
        
        return self.replicas[shard_id]
    
    def select_replica(self, shard_id: str) -> str:
        """Select replica for read (random spread)."""
        
        replicas = self.get_replicas(shard_id)
        
        # Random choice spreads reads across replicas without tracking state;
        # a true round-robin would keep a per-shard counter instead.
        import random
        return random.choice(replicas)
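
The scatter-gather merge that ShardedVectorDB.search performs can be sketched in isolation. Because each shard independently returns its own top-k sorted by score, the global top-k is simply the k best results across all shard responses (the ids and scores below are hypothetical):

```python
import heapq

def merge_topk(per_shard_results: list[list[dict]], k: int) -> list[dict]:
    """Scatter-gather merge: take the k highest-scoring hits across shards."""
    flat = (hit for shard in per_shard_results for hit in shard)
    return heapq.nlargest(k, flat, key=lambda hit: hit["score"])

shard_a = [{"id": "a1", "score": 0.9}, {"id": "a2", "score": 0.4}]
shard_b = [{"id": "b1", "score": 0.8}, {"id": "b2", "score": 0.7}]

top = merge_topk([shard_a, shard_b], k=3)
print([hit["id"] for hit in top])  # → ['a1', 'b1', 'b2']
```

Note each shard must return a full k candidates (not k divided by the shard count), since in the worst case all global winners live on one shard.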

Query Optimization

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np
import time

@dataclass
class QueryPlan:
    """Query execution plan."""
    
    use_prefilter: bool = False
    use_reranking: bool = False
    oversample_factor: int = 1
    shards_to_query: Optional[list[str]] = None

class QueryOptimizer:
    """Optimize vector queries."""
    
    def __init__(self, db_stats: dict):
        self.db_stats = db_stats
    
    def plan_query(
        self,
        query: np.ndarray,
        k: int,
        filter: dict = None,
        latency_budget_ms: float = 100
    ) -> QueryPlan:
        """Create query execution plan."""
        
        plan = QueryPlan()
        
        # Determine if prefiltering is beneficial
        if filter:
            selectivity = self._estimate_selectivity(filter)
            
            if selectivity < 0.1:
                # Highly selective filter - prefilter first
                plan.use_prefilter = True
            else:
                # Post-filter after ANN search
                plan.use_prefilter = False
                plan.oversample_factor = int(1 / selectivity) + 1
        
        # Determine if reranking is needed
        total_vectors = self.db_stats.get("total_vectors", 0)
        
        if total_vectors > 100000 and k < 20:
            plan.use_reranking = True
            plan.oversample_factor = max(plan.oversample_factor, 3)
        
        return plan
    
    def _estimate_selectivity(self, filter: dict) -> float:
        """Estimate filter selectivity."""
        
        # Simplified estimation
        # In practice, use statistics from metadata index
        
        selectivity = 1.0
        
        for field, condition in filter.items():
            if isinstance(condition, dict):
                if "$eq" in condition:
                    selectivity *= 0.1
                elif "$in" in condition:
                    selectivity *= min(1.0, len(condition["$in"]) * 0.05)
                elif "$gt" in condition or "$lt" in condition:
                    selectivity *= 0.3
            else:
                selectivity *= 0.1
        
        return selectivity

class QueryCache:
    """Cache for frequent queries."""
    
    def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
        self.max_size = max_size
        self.ttl = ttl_seconds
        self.cache: dict[str, tuple[list, float]] = {}
    
    def _hash_query(self, query: np.ndarray, k: int, filter: dict = None) -> str:
        """Create cache key."""
        
        query_bytes = query.tobytes()
        filter_str = str(sorted(filter.items())) if filter else ""
        
        import hashlib
        key = hashlib.md5(query_bytes + f"{k}{filter_str}".encode()).hexdigest()
        
        return key
    
    def get(
        self,
        query: np.ndarray,
        k: int,
        filter: dict = None
    ) -> Optional[list]:
        """Get cached results."""
        
        key = self._hash_query(query, k, filter)
        
        if key in self.cache:
            results, timestamp = self.cache[key]
            
            if time.time() - timestamp < self.ttl:
                return results
            else:
                del self.cache[key]
        
        return None
    
    def set(
        self,
        query: np.ndarray,
        k: int,
        results: list,
        filter: dict = None
    ):
        """Cache results."""
        
        if len(self.cache) >= self.max_size:
            # Evict oldest
            oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k][1])
            del self.cache[oldest_key]
        
        key = self._hash_query(query, k, filter)
        self.cache[key] = (results, time.time())

class BatchQueryExecutor:
    """Execute queries in batches."""
    
    def __init__(self, index: Any, batch_size: int = 100):
        self.index = index
        self.batch_size = batch_size
    
    def search_batch(
        self,
        queries: np.ndarray,
        k: int = 10
    ) -> list[list[dict]]:
        """Execute batch of queries."""
        
        all_results = []
        
        for i in range(0, len(queries), self.batch_size):
            batch = queries[i:i + self.batch_size]
            
            # Most vector DBs support batch queries
            batch_results = self.index.search_batch(batch, k)
            all_results.extend(batch_results)
        
        return all_results

class HybridSearcher:
    """Combine vector and keyword search."""
    
    def __init__(
        self,
        vector_index: Any,
        keyword_index: Any,
        alpha: float = 0.7  # Weight for vector search
    ):
        self.vector_index = vector_index
        self.keyword_index = keyword_index
        self.alpha = alpha
    
    def search(
        self,
        query_vector: np.ndarray,
        query_text: str,
        k: int = 10
    ) -> list[dict]:
        """Hybrid search combining vector and keyword."""
        
        # Vector search
        vector_results = self.vector_index.search(query_vector, k * 2)
        
        # Keyword search
        keyword_results = self.keyword_index.search(query_text, k * 2)
        
        # Combine scores
        combined = {}
        
        for result in vector_results:
            doc_id = result["id"]
            combined[doc_id] = {
                "id": doc_id,
                "vector_score": result["score"],
                "keyword_score": 0,
                "metadata": result.get("metadata", {})
            }
        
        for result in keyword_results:
            doc_id = result["id"]
            if doc_id in combined:
                combined[doc_id]["keyword_score"] = result["score"]
            else:
                combined[doc_id] = {
                    "id": doc_id,
                    "vector_score": 0,
                    "keyword_score": result["score"],
                    "metadata": result.get("metadata", {})
                }
        
        # Calculate final scores
        for doc_id, data in combined.items():
            data["score"] = (
                self.alpha * data["vector_score"] +
                (1 - self.alpha) * data["keyword_score"]
            )
        
        # Sort and return top k
        results = sorted(combined.values(), key=lambda x: x["score"], reverse=True)
        
        return results[:k]
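
The weighted sum in HybridSearcher assumes vector and keyword scores share a scale, which raw cosine similarities and BM25 scores generally do not. A common scale-free alternative is reciprocal rank fusion (RRF), which combines rank positions instead of scores; a minimal sketch, not part of the class above (the constant k=60 follows the usual RRF convention):

```python
def rrf(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal rank fusion: score each doc by sum of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

vector_rank = ["d1", "d2", "d3"]   # ids ordered by vector similarity
keyword_rank = ["d3", "d1", "d4"]  # ids ordered by keyword relevance

print(rrf([vector_rank, keyword_rank]))  # → ['d1', 'd3', 'd2', 'd4']
```

Because RRF only consumes rankings, it needs no score normalization and no alpha tuning, at the cost of discarding score magnitudes.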

Production Vector Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np

app = FastAPI()

class IndexRequest(BaseModel):
    num_vectors: int
    dimension: int
    qps_target: int = 100
    recall_target: float = 0.95
    memory_budget_gb: Optional[float] = None

class InsertRequest(BaseModel):
    id: str
    vector: list[float]
    metadata: Optional[dict] = None

class SearchRequest(BaseModel):
    vector: list[float]
    k: int = 10
    filter: Optional[dict] = None

class BatchSearchRequest(BaseModel):
    vectors: list[list[float]]
    k: int = 10

# Initialize components
optimizer = IndexOptimizer()
cache = QueryCache()

# Placeholder index (replace with actual implementation)
class SimpleIndex:
    def __init__(self):
        self.vectors = {}
        self.metadata = {}
    
    def add(self, id: str, vector: np.ndarray, metadata: dict = None):
        self.vectors[id] = vector
        self.metadata[id] = metadata or {}
    
    def search(self, query: np.ndarray, k: int, filter: dict = None) -> list:
        results = []
        for id, vec in self.vectors.items():
            score = np.dot(query, vec) / (np.linalg.norm(query) * np.linalg.norm(vec))
            results.append({"id": id, "score": float(score), "metadata": self.metadata[id]})
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:k]

index = SimpleIndex()

@app.post("/v1/index/recommend")
async def recommend_index(request: IndexRequest) -> dict:
    """Get index configuration recommendation."""
    
    config = optimizer.recommend_index(
        request.num_vectors,
        request.dimension,
        request.qps_target,
        request.recall_target,
        request.memory_budget_gb
    )
    
    return {
        "index_type": config.index_type.value,
        "dimension": config.dimension,
        "metric": config.metric,
        "hnsw": {
            "m": config.m,
            "ef_construction": config.ef_construction,
            "ef_search": config.ef_search
        },
        "ivf": {
            "nlist": config.nlist,
            "nprobe": config.nprobe
        },
        "quantization": {
            "use_pq": config.use_pq,
            "pq_segments": config.pq_segments,
            "pq_bits": config.pq_bits
        }
    }

@app.post("/v1/vectors/insert")
async def insert_vector(request: InsertRequest) -> dict:
    """Insert vector."""
    
    vector = np.array(request.vector, dtype=np.float32)
    index.add(request.id, vector, request.metadata)
    
    return {"status": "inserted", "id": request.id}

@app.post("/v1/vectors/search")
async def search_vectors(request: SearchRequest) -> dict:
    """Search vectors."""
    
    query = np.array(request.vector, dtype=np.float32)
    
    # Check cache
    cached = cache.get(query, request.k, request.filter)
    if cached is not None:
        return {"results": cached, "cached": True}
    
    # Search
    results = index.search(query, request.k, request.filter)
    
    # Cache results
    cache.set(query, request.k, results, request.filter)
    
    return {"results": results, "cached": False}

@app.post("/v1/vectors/search/batch")
async def batch_search(request: BatchSearchRequest) -> dict:
    """Batch search."""
    
    all_results = []
    
    for vec in request.vectors:
        query = np.array(vec, dtype=np.float32)
        results = index.search(query, request.k)
        all_results.append(results)
    
    return {"results": all_results}

@app.get("/v1/stats")
async def get_stats() -> dict:
    """Get index statistics."""
    
    return {
        "num_vectors": len(index.vectors),
        "cache_size": len(cache.cache)
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Vector database optimization is about finding the right tradeoffs for your specific use case. Start by understanding your requirements: how many vectors, what latency is acceptable, what recall do you need, and what's your memory budget. For most applications, HNSW provides the best balance of recall and speed: tune M for the memory/recall tradeoff and ef_search for the latency/recall tradeoff.

When memory becomes a constraint, use quantization: scalar quantization gives 4x compression with minimal accuracy loss, while product quantization can achieve 32x+ compression for large-scale systems. Sharding becomes necessary when you exceed single-node capacity; cluster-based sharding provides better query locality than hash sharding but requires more careful management.

Don't forget query-level optimizations: caching frequent queries, batching for throughput, and hybrid search for better relevance. Monitor your system continuously: track recall against ground truth, measure p99 latencies, and watch for shard imbalance. The best vector database configuration is one that's continuously tuned based on actual usage patterns and evolving requirements.