Embedding Dimensionality Reduction: Compressing Vectors Without Losing Semantics

Introduction

High-dimensional embeddings from models like OpenAI’s text-embedding-3-large (3072 dimensions) or Cohere’s embed-v3 (1024 dimensions) deliver excellent semantic understanding, but they come with costs: more storage, slower similarity computations, and higher memory usage. For many applications, you can reduce dimensionality significantly while preserving most of the semantic information. This guide covers practical dimensionality reduction techniques: PCA for linear projections, Matryoshka embeddings designed for truncation, autoencoders for learned compression, and quantization-aware approaches. Whether you’re optimizing for cost, latency, or scale, these techniques help you find the right balance between embedding quality and efficiency.

Dimensionality Reduction: PCA Projection, Matryoshka Truncation, Autoencoder Compression

PCA-Based Reduction

from dataclasses import dataclass, field
from typing import Any, Optional
import numpy as np

@dataclass
class ReductionResult:
    """Dimensionality reduction result."""
    
    reduced_embeddings: np.ndarray
    original_dim: int
    reduced_dim: int
    variance_retained: float = 0.0
    reconstruction_error: float = 0.0

class PCAReducer:
    """PCA-based dimensionality reduction."""
    
    def __init__(self, n_components: int = 256):
        self.n_components = n_components
        self.mean = None
        self.components = None
        self.explained_variance_ratio = None
    
    def fit(self, embeddings: np.ndarray):
        """Fit PCA on embeddings."""
        
        # Center data
        self.mean = embeddings.mean(axis=0)
        centered = embeddings - self.mean
        
        # Compute covariance matrix
        cov = np.cov(centered.T)
        
        # Eigendecomposition
        eigenvalues, eigenvectors = np.linalg.eigh(cov)
        
        # Sort by eigenvalue (descending)
        idx = np.argsort(eigenvalues)[::-1]
        eigenvalues = eigenvalues[idx]
        eigenvectors = eigenvectors[:, idx]
        
        # Keep top components
        self.components = eigenvectors[:, :self.n_components].T
        
        # Explained variance
        total_var = eigenvalues.sum()
        self.explained_variance_ratio = eigenvalues[:self.n_components] / total_var
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Transform embeddings to lower dimension."""
        
        centered = embeddings - self.mean
        return np.dot(centered, self.components.T)
    
    def inverse_transform(self, reduced: np.ndarray) -> np.ndarray:
        """Reconstruct original embeddings."""
        
        return np.dot(reduced, self.components) + self.mean
    
    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform embeddings."""
        
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        
        # Calculate reconstruction error
        reconstructed = self.inverse_transform(reduced)
        error = np.mean((embeddings - reconstructed) ** 2)
        
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components,
            variance_retained=float(self.explained_variance_ratio.sum()),
            reconstruction_error=float(error)
        )

class IncrementalPCAReducer:
    """Incremental PCA for large datasets."""
    
    def __init__(self, n_components: int = 256, batch_size: int = 1000):
        self.n_components = n_components
        self.batch_size = batch_size
        self.pca = None
    
    def fit(self, embeddings: np.ndarray):
        """Fit incrementally on batches."""
        
        from sklearn.decomposition import IncrementalPCA
        
        self.pca = IncrementalPCA(n_components=self.n_components)
        
        n_samples = embeddings.shape[0]
        
        for i in range(0, n_samples, self.batch_size):
            batch = embeddings[i:i + self.batch_size]
            
            # IncrementalPCA.partial_fit requires each batch to contain at least
            # n_components samples, so skip a short trailing batch.
            if batch.shape[0] < self.n_components:
                continue
            
            self.pca.partial_fit(batch)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Transform embeddings."""
        
        return self.pca.transform(embeddings)
    
    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform."""
        
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components,
            variance_retained=float(self.pca.explained_variance_ratio_.sum())
        )

class WhiteningPCAReducer:
    """PCA with whitening for normalized embeddings."""
    
    def __init__(self, n_components: int = 256):
        self.n_components = n_components
        self.mean = None
        self.components = None
        self.scale = None
    
    def fit(self, embeddings: np.ndarray):
        """Fit PCA with whitening."""
        
        from sklearn.decomposition import PCA
        
        pca = PCA(n_components=self.n_components, whiten=True)
        pca.fit(embeddings)
        
        self.mean = pca.mean_
        self.components = pca.components_
        self.scale = np.sqrt(pca.explained_variance_)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Transform with whitening."""
        
        centered = embeddings - self.mean
        projected = np.dot(centered, self.components.T)
        whitened = projected / self.scale
        
        return whitened
    
    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform with whitening."""
        
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components
        )
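
A minimal usage sketch for the PCA reducer above. The arrays are synthetic stand-ins for real model embeddings; shapes and variable names are illustrative.

import numpy as np

# Synthetic stand-ins for real document and query embeddings.
rng = np.random.default_rng(0)
doc_embeddings = rng.standard_normal((5_000, 1024)).astype(np.float32)
query_embeddings = rng.standard_normal((200, 1024)).astype(np.float32)

reducer = PCAReducer(n_components=256)
result = reducer.fit_transform(doc_embeddings)

print(f"{result.original_dim} -> {result.reduced_dim} dims, "
      f"variance retained: {result.variance_retained:.3f}, "
      f"reconstruction MSE: {result.reconstruction_error:.6f}")

# Queries must be projected with the same fitted mean and components.
reduced_queries = reducer.transform(query_embeddings)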

Matryoshka Embeddings

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

class MatryoshkaTruncator:
    """Truncate Matryoshka embeddings to desired dimension."""
    
    def __init__(self, target_dim: int = 256):
        self.target_dim = target_dim
    
    def truncate(self, embeddings: np.ndarray) -> np.ndarray:
        """Simply truncate to first N dimensions."""
        
        if embeddings.shape[1] <= self.target_dim:
            return embeddings
        
        return embeddings[:, :self.target_dim]
    
    def truncate_and_normalize(self, embeddings: np.ndarray) -> np.ndarray:
        """Truncate and re-normalize."""
        
        truncated = self.truncate(embeddings)
        
        # L2 normalize
        norms = np.linalg.norm(truncated, axis=1, keepdims=True)
        normalized = truncated / (norms + 1e-8)
        
        return normalized

class AdaptiveMatryoshka:
    """Adaptively choose dimension based on task."""
    
    def __init__(
        self,
        dimensions: list[int] = [64, 128, 256, 512, 1024]
    ):
        self.dimensions = sorted(dimensions)
    
    def select_dimension(
        self,
        embeddings: np.ndarray,
        accuracy_threshold: float = 0.95,
        ground_truth_pairs: Optional[list[tuple[int, int]]] = None
    ) -> int:
        """Select smallest dimension meeting accuracy threshold."""
        
        if ground_truth_pairs is None:
            # Default to middle dimension
            return self.dimensions[len(self.dimensions) // 2]
        
        full_dim = embeddings.shape[1]
        
        # Calculate accuracy at full dimension
        full_accuracy = self._calculate_accuracy(
            embeddings, ground_truth_pairs
        )
        
        # Find smallest dimension meeting threshold
        for dim in self.dimensions:
            if dim >= full_dim:
                return full_dim
            
            truncated = embeddings[:, :dim]
            truncated = truncated / np.linalg.norm(truncated, axis=1, keepdims=True)
            
            accuracy = self._calculate_accuracy(truncated, ground_truth_pairs)
            relative_accuracy = accuracy / full_accuracy
            
            if relative_accuracy >= accuracy_threshold:
                return dim
        
        return self.dimensions[-1]
    
    def _calculate_accuracy(
        self,
        embeddings: np.ndarray,
        pairs: list[tuple[int, int]]
    ) -> float:
        """Calculate retrieval accuracy on pairs."""
        
        correct = 0
        
        for query_idx, target_idx in pairs:
            query = embeddings[query_idx]
            
            # Find nearest neighbor
            similarities = np.dot(embeddings, query)
            similarities[query_idx] = -np.inf  # Exclude self
            
            nearest = np.argmax(similarities)
            
            if nearest == target_idx:
                correct += 1
        
        return correct / len(pairs)

class MatryoshkaTrainer:
    """Train embeddings with Matryoshka loss."""
    
    def __init__(
        self,
        base_model: Any,
        dimensions: list[int] = [64, 128, 256, 512, 1024]
    ):
        self.base_model = base_model
        self.dimensions = sorted(dimensions)
    
    def compute_loss(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray
    ) -> float:
        """Compute Matryoshka loss across dimensions."""
        
        total_loss = 0.0
        
        for dim in self.dimensions:
            truncated = embeddings[:, :dim]
            
            # Normalize
            truncated = truncated / np.linalg.norm(truncated, axis=1, keepdims=True)
            
            # Contrastive loss at this dimension
            loss = self._contrastive_loss(truncated, labels)
            total_loss += loss
        
        return total_loss / len(self.dimensions)
    
    def _contrastive_loss(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray,
        temperature: float = 0.05
    ) -> float:
        """InfoNCE contrastive loss."""
        
        # Similarity matrix
        similarity = np.dot(embeddings, embeddings.T) / temperature
        
        # Create label mask
        labels = labels.reshape(-1, 1)
        mask = (labels == labels.T).astype(float)
        
        # Remove diagonal
        np.fill_diagonal(mask, 0)
        
        # Softmax
        exp_sim = np.exp(similarity - similarity.max(axis=1, keepdims=True))
        np.fill_diagonal(exp_sim, 0)
        
        # Loss
        pos_sim = (exp_sim * mask).sum(axis=1)
        all_sim = exp_sim.sum(axis=1)
        
        loss = -np.log(pos_sim / (all_sim + 1e-8) + 1e-8).mean()
        
        return float(loss)
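
A short sketch of fixed truncation and adaptive dimension selection. The embeddings and query-target pairs below are synthetic, constructed so that each query has a known nearest neighbor; substitute real Matryoshka-trained embeddings and labeled pairs.

import numpy as np

rng = np.random.default_rng(0)
base = rng.standard_normal((500, 1024)).astype(np.float32)

# Pair each vector i with a noisy copy at index i + 500 so ground truth exists.
embeddings = np.vstack([base, base + 0.1 * rng.standard_normal(base.shape).astype(np.float32)])
embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)
pairs = [(i, i + 500) for i in range(100)]

# Fixed truncation to 256 dimensions, re-normalized for cosine similarity.
truncator = MatryoshkaTruncator(target_dim=256)
compact = truncator.truncate_and_normalize(embeddings)

# Smallest dimension keeping at least 95% of full-dimension accuracy.
selector = AdaptiveMatryoshka(dimensions=[64, 128, 256, 512])
best_dim = selector.select_dimension(
    embeddings, accuracy_threshold=0.95, ground_truth_pairs=pairs
)
print(f"Selected dimension: {best_dim}")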

Autoencoder Compression

import torch
import torch.nn as nn
import numpy as np
from dataclasses import dataclass
from typing import Any, Optional

class EmbeddingAutoencoder(nn.Module):
    """Autoencoder for embedding compression."""
    
    def __init__(
        self,
        input_dim: int,
        latent_dim: int,
        hidden_dims: list[int] = None
    ):
        super().__init__()
        
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        
        hidden_dims = hidden_dims or [512, 256]
        
        # Encoder
        encoder_layers = []
        prev_dim = input_dim
        
        for hidden_dim in hidden_dims:
            encoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.GELU(),
            ])
            prev_dim = hidden_dim
        
        encoder_layers.append(nn.Linear(prev_dim, latent_dim))
        self.encoder = nn.Sequential(*encoder_layers)
        
        # Decoder
        decoder_layers = []
        prev_dim = latent_dim
        
        for hidden_dim in reversed(hidden_dims):
            decoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.GELU(),
            ])
            prev_dim = hidden_dim
        
        decoder_layers.append(nn.Linear(prev_dim, input_dim))
        self.decoder = nn.Sequential(*decoder_layers)
    
    def encode(self, x: torch.Tensor) -> torch.Tensor:
        """Encode to latent space."""
        return self.encoder(x)
    
    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Decode from latent space."""
        return self.decoder(z)
    
    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Forward pass returning latent and reconstruction."""
        z = self.encode(x)
        x_recon = self.decode(z)
        return z, x_recon

class VariationalEmbeddingAutoencoder(nn.Module):
    """VAE for embedding compression with regularization."""
    
    def __init__(
        self,
        input_dim: int,
        latent_dim: int,
        hidden_dim: int = 512
    ):
        super().__init__()
        
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
        )
        
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_var = nn.Linear(hidden_dim, latent_dim)
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, input_dim),
        )
    
    def encode(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Encode to mean and variance."""
        h = self.encoder(x)
        return self.fc_mu(h), self.fc_var(h)
    
    def reparameterize(
        self,
        mu: torch.Tensor,
        log_var: torch.Tensor
    ) -> torch.Tensor:
        """Reparameterization trick."""
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Decode from latent space."""
        return self.decoder(z)
    
    def forward(
        self,
        x: torch.Tensor
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Forward pass."""
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        x_recon = self.decode(z)
        return z, x_recon, mu, log_var

class AutoencoderTrainer:
    """Train autoencoder for embedding compression."""
    
    def __init__(
        self,
        model: nn.Module,
        learning_rate: float = 1e-4,
        similarity_weight: float = 0.1
    ):
        self.model = model
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
        self.similarity_weight = similarity_weight
    
    def train_step(
        self,
        embeddings: torch.Tensor
    ) -> dict[str, float]:
        """Single training step."""
        
        self.model.train()
        self.optimizer.zero_grad()
        
        # Forward pass
        if isinstance(self.model, VariationalEmbeddingAutoencoder):
            z, x_recon, mu, log_var = self.model(embeddings)
            
            # Reconstruction loss
            recon_loss = nn.functional.mse_loss(x_recon, embeddings)
            
            # KL divergence
            kl_loss = -0.5 * torch.mean(1 + log_var - mu.pow(2) - log_var.exp())
            
            loss = recon_loss + 0.001 * kl_loss
        else:
            z, x_recon = self.model(embeddings)
            
            # Reconstruction loss
            recon_loss = nn.functional.mse_loss(x_recon, embeddings)
            
            # Similarity preservation loss
            orig_sim = torch.mm(embeddings, embeddings.t())
            latent_sim = torch.mm(z, z.t())
            sim_loss = nn.functional.mse_loss(latent_sim, orig_sim)
            
            loss = recon_loss + self.similarity_weight * sim_loss
            kl_loss = torch.tensor(0.0)
        
        loss.backward()
        self.optimizer.step()
        
        return {
            "loss": loss.item(),
            "recon_loss": recon_loss.item(),
            "kl_loss": kl_loss.item() if isinstance(kl_loss, torch.Tensor) else 0.0
        }
    
    def train(
        self,
        embeddings: np.ndarray,
        epochs: int = 100,
        batch_size: int = 256
    ) -> list[dict]:
        """Train autoencoder."""
        
        dataset = torch.tensor(embeddings, dtype=torch.float32)
        
        history = []
        
        for epoch in range(epochs):
            # Shuffle
            indices = torch.randperm(len(dataset))
            
            epoch_losses = []
            
            for i in range(0, len(dataset), batch_size):
                batch_indices = indices[i:i + batch_size]
                batch = dataset[batch_indices]
                
                losses = self.train_step(batch)
                epoch_losses.append(losses)
            
            # Average losses
            avg_losses = {
                k: np.mean([l[k] for l in epoch_losses])
                for k in epoch_losses[0].keys()
            }
            
            history.append(avg_losses)
        
        return history
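
A minimal training sketch for the autoencoder path, using synthetic data and a short schedule for illustration; a real run would train on a representative sample of corpus embeddings for more epochs.

import numpy as np
import torch

rng = np.random.default_rng(0)
embeddings = rng.standard_normal((2_000, 1024)).astype(np.float32)

model = EmbeddingAutoencoder(input_dim=1024, latent_dim=128)
trainer = AutoencoderTrainer(model, learning_rate=1e-4, similarity_weight=0.1)

history = trainer.train(embeddings, epochs=10, batch_size=256)
print(f"Final loss: {history[-1]['loss']:.4f}")

# Compress new embeddings with the trained encoder only.
model.eval()
with torch.no_grad():
    compressed = model.encode(torch.tensor(embeddings[:16], dtype=torch.float32))
print(compressed.shape)  # torch.Size([16, 128])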

Random Projection

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

# Note: ReductionResult is the result dataclass defined in the PCA section above
# and is assumed to be in scope here.

class GaussianRandomProjection:
    """Random projection using Gaussian matrix."""
    
    def __init__(self, n_components: int = 256, random_state: int = 42):
        self.n_components = n_components
        self.random_state = random_state
        self.projection_matrix = None
    
    def fit(self, embeddings: np.ndarray):
        """Generate random projection matrix."""
        
        np.random.seed(self.random_state)
        
        input_dim = embeddings.shape[1]
        
        # Gaussian random matrix
        self.projection_matrix = np.random.randn(
            input_dim, self.n_components
        ) / np.sqrt(self.n_components)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Project embeddings."""
        
        return np.dot(embeddings, self.projection_matrix)
    
    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform."""
        
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components
        )

class SparseRandomProjection:
    """Sparse random projection for efficiency."""
    
    def __init__(
        self,
        n_components: int = 256,
        density: float = 0.1,
        random_state: int = 42
    ):
        self.n_components = n_components
        self.density = density
        self.random_state = random_state
        self.projection_matrix = None
    
    def fit(self, embeddings: np.ndarray):
        """Generate sparse projection matrix."""
        
        np.random.seed(self.random_state)
        
        input_dim = embeddings.shape[1]
        
        # Sparse random matrix
        s = 1 / self.density
        
        # Values: +sqrt(s), 0, -sqrt(s) with probabilities 1/2s, 1-1/s, 1/2s
        matrix = np.zeros((input_dim, self.n_components))
        
        for i in range(input_dim):
            for j in range(self.n_components):
                r = np.random.random()
                if r < 1 / (2 * s):
                    matrix[i, j] = np.sqrt(s)
                elif r < 1 / s:
                    matrix[i, j] = -np.sqrt(s)
                # else: 0
        
        self.projection_matrix = matrix / np.sqrt(self.n_components)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Project embeddings."""
        
        return np.dot(embeddings, self.projection_matrix)
    
    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform."""
        
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components
        )

class JohnsonLindenstraussProjection:
    """Optimal random projection based on JL lemma."""
    
    def __init__(self, epsilon: float = 0.1, random_state: int = 42):
        self.epsilon = epsilon
        self.random_state = random_state
        self.projection_matrix = None
        self.n_components = None
    
    def _compute_optimal_dim(self, n_samples: int) -> int:
        """Compute optimal dimension from JL lemma."""
        
        # k >= 4 * ln(n) / (epsilon^2 / 2 - epsilon^3 / 3)
        import math
        
        denominator = (self.epsilon ** 2) / 2 - (self.epsilon ** 3) / 3
        k = int(4 * math.log(n_samples) / denominator) + 1
        
        return k
    
    def fit(self, embeddings: np.ndarray):
        """Generate optimal projection matrix."""
        
        np.random.seed(self.random_state)
        
        n_samples, input_dim = embeddings.shape
        self.n_components = self._compute_optimal_dim(n_samples)
        
        # Gaussian random matrix
        self.projection_matrix = np.random.randn(
            input_dim, self.n_components
        ) / np.sqrt(self.n_components)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Project embeddings."""
        
        return np.dot(embeddings, self.projection_matrix)
    
    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform."""
        
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components
        )
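
A quick sketch of the two projection styles on synthetic data. Note that for a tight epsilon the Johnson-Lindenstrauss bound can exceed the original dimension, so a looser epsilon is used here for illustration.

import numpy as np

rng = np.random.default_rng(0)
embeddings = rng.standard_normal((20_000, 1536)).astype(np.float32)

# Fixed target dimension.
grp = GaussianRandomProjection(n_components=256)
print(grp.fit_transform(embeddings).reduced_dim)  # 256

# Dimension derived from the JL lemma: depends on n_samples and epsilon,
# not on the original dimensionality.
jl = JohnsonLindenstraussProjection(epsilon=0.5)
print(jl.fit_transform(embeddings).reduced_dim)  # a few hundred for 20k points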

Evaluation and Selection

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

@dataclass
class ReductionEvaluation:
    """Evaluation of reduction method."""
    
    method: str
    original_dim: int
    reduced_dim: int
    compression_ratio: float
    recall_at_10: float
    mrr: float
    latency_speedup: float

class ReductionEvaluator:
    """Evaluate dimensionality reduction methods."""
    
    def __init__(
        self,
        queries: np.ndarray,
        documents: np.ndarray,
        ground_truth: list[list[int]]  # Relevant doc indices per query
    ):
        self.queries = queries
        self.documents = documents
        self.ground_truth = ground_truth
    
    def evaluate(
        self,
        reducer: Any,
        method_name: str
    ) -> ReductionEvaluation:
        """Evaluate a reduction method."""
        
        import time
        
        # Reduce embeddings
        reduced_docs = reducer.fit_transform(self.documents)
        reduced_queries = reducer.transform(self.queries)
        
        if isinstance(reduced_docs, ReductionResult):
            reduced_docs = reduced_docs.reduced_embeddings
        
        # Normalize
        reduced_docs = reduced_docs / np.linalg.norm(reduced_docs, axis=1, keepdims=True)
        reduced_queries = reduced_queries / np.linalg.norm(reduced_queries, axis=1, keepdims=True)
        
        # Calculate metrics
        recall = self._calculate_recall(reduced_queries, reduced_docs, k=10)
        mrr = self._calculate_mrr(reduced_queries, reduced_docs)
        
        # Latency comparison
        original_latency = self._measure_latency(self.queries, self.documents)
        reduced_latency = self._measure_latency(reduced_queries, reduced_docs)
        speedup = original_latency / reduced_latency
        
        return ReductionEvaluation(
            method=method_name,
            original_dim=self.documents.shape[1],
            reduced_dim=reduced_docs.shape[1],
            compression_ratio=self.documents.shape[1] / reduced_docs.shape[1],
            recall_at_10=recall,
            mrr=mrr,
            latency_speedup=speedup
        )
    
    def _calculate_recall(
        self,
        queries: np.ndarray,
        documents: np.ndarray,
        k: int = 10
    ) -> float:
        """Calculate recall@k."""
        
        recalls = []
        
        for i, query in enumerate(queries):
            # Get similarities
            similarities = np.dot(documents, query)
            
            # Get top k
            top_k = np.argsort(similarities)[-k:][::-1]
            
            # Calculate recall
            relevant = set(self.ground_truth[i])
            retrieved = set(top_k)
            
            if relevant:
                recall = len(relevant & retrieved) / len(relevant)
                recalls.append(recall)
        
        return np.mean(recalls)
    
    def _calculate_mrr(
        self,
        queries: np.ndarray,
        documents: np.ndarray
    ) -> float:
        """Calculate Mean Reciprocal Rank."""
        
        mrrs = []
        
        for i, query in enumerate(queries):
            similarities = np.dot(documents, query)
            ranking = np.argsort(similarities)[::-1]
            
            # Find rank of first relevant document
            for rank, doc_idx in enumerate(ranking, 1):
                if doc_idx in self.ground_truth[i]:
                    mrrs.append(1 / rank)
                    break
            else:
                mrrs.append(0)
        
        return np.mean(mrrs)
    
    def _measure_latency(
        self,
        queries: np.ndarray,
        documents: np.ndarray,
        n_runs: int = 100
    ) -> float:
        """Measure search latency."""
        
        import time
        
        times = []
        
        for _ in range(n_runs):
            query = queries[np.random.randint(len(queries))]
            
            start = time.time()
            _ = np.dot(documents, query)
            times.append(time.time() - start)
        
        return np.mean(times)

class DimensionSelector:
    """Select optimal dimension for target metrics."""
    
    def __init__(
        self,
        evaluator: ReductionEvaluator,
        reducer_class: type
    ):
        self.evaluator = evaluator
        self.reducer_class = reducer_class
    
    def find_optimal_dimension(
        self,
        dimensions: list[int],
        min_recall: float = 0.95
    ) -> int:
        """Find smallest dimension meeting recall threshold."""
        
        for dim in sorted(dimensions):
            reducer = self.reducer_class(n_components=dim)
            eval_result = self.evaluator.evaluate(reducer, f"dim_{dim}")
            
            if eval_result.recall_at_10 >= min_recall:
                return dim
        
        return dimensions[-1]
    
    def pareto_frontier(
        self,
        dimensions: list[int]
    ) -> list[ReductionEvaluation]:
        """Find Pareto-optimal dimension/recall tradeoffs."""
        
        results = []
        
        for dim in dimensions:
            reducer = self.reducer_class(n_components=dim)
            eval_result = self.evaluator.evaluate(reducer, f"dim_{dim}")
            results.append(eval_result)
        
        # Find Pareto frontier
        pareto = []
        
        for result in results:
            is_dominated = False
            
            for other in results:
                if (other.compression_ratio > result.compression_ratio and
                    other.recall_at_10 >= result.recall_at_10):
                    is_dominated = True
                    break
            
            if not is_dominated:
                pareto.append(result)
        
        return pareto
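
A sketch of tying evaluation and dimension selection together, assuming PCAReducer and GaussianRandomProjection from the earlier sections are in scope. The benchmark below is synthetic (each query is a noisy copy of one document, which is also its only relevant result); substitute your own queries, documents, and relevance labels.

import numpy as np

rng = np.random.default_rng(0)
documents = rng.standard_normal((5_000, 1024)).astype(np.float32)

# Each query is a noisy copy of one document; that document is its relevant hit.
query_ids = rng.integers(0, 5_000, size=200)
queries = documents[query_ids] + 0.1 * rng.standard_normal((200, 1024)).astype(np.float32)
ground_truth = [[int(i)] for i in query_ids]

evaluator = ReductionEvaluator(queries, documents, ground_truth)

# Compare methods at a fixed dimension.
for name, reducer in [("pca_256", PCAReducer(256)), ("random_256", GaussianRandomProjection(256))]:
    report = evaluator.evaluate(reducer, name)
    print(name, f"recall@10={report.recall_at_10:.3f}", f"speedup={report.latency_speedup:.1f}x")

# Or find the smallest dimension that still meets a recall target.
selector = DimensionSelector(evaluator, PCAReducer)
print(selector.find_optimal_dimension([64, 128, 256, 512], min_recall=0.95))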

Production Reduction Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np

# Note: PCAReducer, MatryoshkaTruncator, and GaussianRandomProjection from the
# sections above are assumed to be importable or defined in this module.

app = FastAPI()

class ReduceRequest(BaseModel):
    embeddings: list[list[float]]
    target_dim: int = 256
    method: str = "pca"  # pca, matryoshka, random

class FitRequest(BaseModel):
    embeddings: list[list[float]]
    target_dim: int = 256
    method: str = "pca"

class TransformRequest(BaseModel):
    embeddings: list[list[float]]

# Store fitted reducers
reducers = {}

@app.post("/v1/reduce")
async def reduce_embeddings(request: ReduceRequest) -> dict:
    """Reduce embedding dimensions."""
    
    embeddings = np.array(request.embeddings, dtype=np.float32)
    
    if request.method == "pca":
        reducer = PCAReducer(n_components=request.target_dim)
    elif request.method == "matryoshka":
        reducer = MatryoshkaTruncator(target_dim=request.target_dim)
        reduced = reducer.truncate_and_normalize(embeddings)
        
        return {
            "reduced_embeddings": reduced.tolist(),
            "original_dim": embeddings.shape[1],
            "reduced_dim": request.target_dim
        }
    elif request.method == "random":
        reducer = GaussianRandomProjection(n_components=request.target_dim)
    else:
        raise HTTPException(status_code=400, detail=f"Unknown method: {request.method}")
    
    result = reducer.fit_transform(embeddings)
    
    return {
        "reduced_embeddings": result.reduced_embeddings.tolist(),
        "original_dim": result.original_dim,
        "reduced_dim": result.reduced_dim,
        "variance_retained": result.variance_retained
    }

@app.post("/v1/fit")
async def fit_reducer(request: FitRequest) -> dict:
    """Fit a reducer on training data."""
    
    embeddings = np.array(request.embeddings, dtype=np.float32)
    
    if request.method == "pca":
        reducer = PCAReducer(n_components=request.target_dim)
    elif request.method == "random":
        reducer = GaussianRandomProjection(n_components=request.target_dim)
    else:
        raise HTTPException(status_code=400, detail=f"Unknown method: {request.method}")
    
    reducer.fit(embeddings)
    
    # Store reducer
    reducer_id = f"{request.method}_{request.target_dim}"
    reducers[reducer_id] = reducer
    
    return {
        "reducer_id": reducer_id,
        "method": request.method,
        "target_dim": request.target_dim
    }

@app.post("/v1/transform/{reducer_id}")
async def transform_embeddings(reducer_id: str, request: TransformRequest) -> dict:
    """Transform embeddings using fitted reducer."""
    
    if reducer_id not in reducers:
        raise HTTPException(status_code=404, detail="Reducer not found")
    
    reducer = reducers[reducer_id]
    embeddings = np.array(request.embeddings, dtype=np.float32)
    
    reduced = reducer.transform(embeddings)
    
    return {
        "reduced_embeddings": reduced.tolist()
    }

@app.get("/v1/reducers")
async def list_reducers() -> dict:
    """List available reducers."""
    
    return {
        "reducers": list(reducers.keys())
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}


Conclusion

Embedding dimensionality reduction is a powerful tool for optimizing vector search systems. For Matryoshka-trained models like OpenAI’s text-embedding-3 series, simple truncation is remarkably effective—you can often reduce from 3072 to 256 dimensions with minimal quality loss. For other embeddings, PCA provides a good balance of simplicity and effectiveness, especially when you have representative training data. Random projection is useful when you need a fast, data-independent method with theoretical guarantees. Autoencoders can learn more sophisticated compressions but require training and may not generalize well. Always evaluate on your specific task—the right dimension depends on your data distribution, query patterns, and quality requirements. Start with aggressive reduction (4-8x) and increase dimensions only if metrics suffer. Remember that reduced embeddings also benefit from faster similarity computation and lower memory usage, so the effective speedup is often greater than the compression ratio alone. Monitor your retrieval quality in production and adjust dimensions as your data evolves.