Introduction

High-dimensional embeddings from models like OpenAI’s text-embedding-3-large (3072 dimensions) or Cohere’s embed-v3 (1024 dimensions) deliver excellent semantic understanding but come with costs: more storage, higher memory usage, and slower similarity computations. For many applications, you can reduce dimensions significantly while preserving most of the semantic information. This guide covers practical dimensionality reduction techniques: PCA for linear projections, Matryoshka embeddings that are designed for truncation, autoencoders for learned compression, and random projections with distance-preservation guarantees. Whether you’re optimizing for cost, latency, or scale, these techniques will help you find the right balance between embedding quality and efficiency.
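To make those costs concrete, here is a quick back-of-the-envelope sketch. It assumes plain float32 vectors and ignores index overhead, which varies by vector database:

# Rough storage footprint for 10 million float32 vectors at different dimensions.
BYTES_PER_FLOAT32 = 4
n_vectors = 10_000_000

for dim in (3072, 1024, 256):
    gigabytes = n_vectors * dim * BYTES_PER_FLOAT32 / 1e9
    print(f"{dim:>4} dims: {gigabytes:,.1f} GB")

# 3072 dims: ~122.9 GB, 1024 dims: ~41.0 GB, 256 dims: ~10.2 GB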

PCA-Based Reduction
from dataclasses import dataclass, field
from typing import Any, Optional

import numpy as np


@dataclass
class ReductionResult:
    """Dimensionality reduction result."""
    reduced_embeddings: np.ndarray
    original_dim: int
    reduced_dim: int
    variance_retained: float = 0.0
    reconstruction_error: float = 0.0


class PCAReducer:
    """PCA-based dimensionality reduction."""

    def __init__(self, n_components: int = 256):
        self.n_components = n_components
        self.mean = None
        self.components = None
        self.explained_variance_ratio = None

    def fit(self, embeddings: np.ndarray):
        """Fit PCA on embeddings."""
        # Center data
        self.mean = embeddings.mean(axis=0)
        centered = embeddings - self.mean
        # Compute covariance matrix
        cov = np.cov(centered.T)
        # Eigendecomposition
        eigenvalues, eigenvectors = np.linalg.eigh(cov)
        # Sort by eigenvalue (descending)
        idx = np.argsort(eigenvalues)[::-1]
        eigenvalues = eigenvalues[idx]
        eigenvectors = eigenvectors[:, idx]
        # Keep top components
        self.components = eigenvectors[:, :self.n_components].T
        # Explained variance
        total_var = eigenvalues.sum()
        self.explained_variance_ratio = eigenvalues[:self.n_components] / total_var

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Transform embeddings to lower dimension."""
        centered = embeddings - self.mean
        return np.dot(centered, self.components.T)

    def inverse_transform(self, reduced: np.ndarray) -> np.ndarray:
        """Reconstruct original embeddings."""
        return np.dot(reduced, self.components) + self.mean

    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform embeddings."""
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        # Calculate reconstruction error
        reconstructed = self.inverse_transform(reduced)
        error = np.mean((embeddings - reconstructed) ** 2)
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components,
            variance_retained=float(self.explained_variance_ratio.sum()),
            reconstruction_error=float(error)
        )


class IncrementalPCAReducer:
    """Incremental PCA for large datasets."""

    def __init__(self, n_components: int = 256, batch_size: int = 1000):
        self.n_components = n_components
        self.batch_size = batch_size
        self.pca = None

    def fit(self, embeddings: np.ndarray):
        """Fit incrementally on batches."""
        from sklearn.decomposition import IncrementalPCA
        self.pca = IncrementalPCA(n_components=self.n_components)
        n_samples = embeddings.shape[0]
        for i in range(0, n_samples, self.batch_size):
            batch = embeddings[i:i + self.batch_size]
            # IncrementalPCA requires each partial_fit batch to contain
            # at least n_components samples, so skip a short trailing batch.
            if batch.shape[0] >= self.n_components:
                self.pca.partial_fit(batch)

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Transform embeddings."""
        return self.pca.transform(embeddings)

    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform."""
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components,
            variance_retained=float(self.pca.explained_variance_ratio_.sum())
        )


class WhiteningPCAReducer:
    """PCA with whitening for normalized embeddings."""

    def __init__(self, n_components: int = 256):
        self.n_components = n_components
        self.mean = None
        self.components = None
        self.scale = None

    def fit(self, embeddings: np.ndarray):
        """Fit PCA with whitening."""
        from sklearn.decomposition import PCA
        pca = PCA(n_components=self.n_components, whiten=True)
        pca.fit(embeddings)
        self.mean = pca.mean_
        self.components = pca.components_
        self.scale = np.sqrt(pca.explained_variance_)

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Transform with whitening."""
        centered = embeddings - self.mean
        projected = np.dot(centered, self.components.T)
        whitened = projected / self.scale
        return whitened

    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform with whitening."""
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components
        )
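Here is a minimal usage sketch for the PCAReducer above. The random vectors are only a stand-in for real corpus embeddings; the key point is that the projection is fit once on a representative sample and then applied to both documents and queries:

# Fit PCA on a sample of corpus embeddings, then reuse the projection for queries.
corpus = np.random.randn(5_000, 3072).astype(np.float32)  # stand-in for real embeddings

reducer = PCAReducer(n_components=256)
result = reducer.fit_transform(corpus)
print(result.reduced_dim, round(result.variance_retained, 3))

# Apply the same fitted projection to query embeddings at search time.
queries = np.random.randn(8, 3072).astype(np.float32)
reduced_queries = reducer.transform(queries)
print(reduced_queries.shape)  # (8, 256)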
Matryoshka Embeddings
from dataclasses import dataclass
from typing import Any, Optional

import numpy as np


class MatryoshkaTruncator:
    """Truncate Matryoshka embeddings to desired dimension."""

    def __init__(self, target_dim: int = 256):
        self.target_dim = target_dim

    def truncate(self, embeddings: np.ndarray) -> np.ndarray:
        """Simply truncate to first N dimensions."""
        if embeddings.shape[1] <= self.target_dim:
            return embeddings
        return embeddings[:, :self.target_dim]

    def truncate_and_normalize(self, embeddings: np.ndarray) -> np.ndarray:
        """Truncate and re-normalize."""
        truncated = self.truncate(embeddings)
        # L2 normalize
        norms = np.linalg.norm(truncated, axis=1, keepdims=True)
        normalized = truncated / (norms + 1e-8)
        return normalized


class AdaptiveMatryoshka:
    """Adaptively choose dimension based on task."""

    def __init__(
        self,
        dimensions: list[int] = [64, 128, 256, 512, 1024]
    ):
        self.dimensions = sorted(dimensions)

    def select_dimension(
        self,
        embeddings: np.ndarray,
        accuracy_threshold: float = 0.95,
        ground_truth_pairs: Optional[list[tuple[int, int]]] = None
    ) -> int:
        """Select smallest dimension meeting accuracy threshold."""
        if ground_truth_pairs is None:
            # Default to middle dimension
            return self.dimensions[len(self.dimensions) // 2]
        full_dim = embeddings.shape[1]
        # Calculate accuracy at full dimension
        full_accuracy = self._calculate_accuracy(
            embeddings, ground_truth_pairs
        )
        # Find smallest dimension meeting threshold
        for dim in self.dimensions:
            if dim >= full_dim:
                return full_dim
            truncated = embeddings[:, :dim]
            truncated = truncated / np.linalg.norm(truncated, axis=1, keepdims=True)
            accuracy = self._calculate_accuracy(truncated, ground_truth_pairs)
            relative_accuracy = accuracy / max(full_accuracy, 1e-8)
            if relative_accuracy >= accuracy_threshold:
                return dim
        return self.dimensions[-1]

    def _calculate_accuracy(
        self,
        embeddings: np.ndarray,
        pairs: list[tuple[int, int]]
    ) -> float:
        """Calculate retrieval accuracy on pairs."""
        correct = 0
        for query_idx, target_idx in pairs:
            query = embeddings[query_idx]
            # Find nearest neighbor
            similarities = np.dot(embeddings, query)
            similarities[query_idx] = -np.inf  # Exclude self
            nearest = np.argmax(similarities)
            if nearest == target_idx:
                correct += 1
        return correct / len(pairs)


class MatryoshkaTrainer:
    """Train embeddings with Matryoshka loss."""

    def __init__(
        self,
        base_model: Any,
        dimensions: list[int] = [64, 128, 256, 512, 1024]
    ):
        self.base_model = base_model
        self.dimensions = sorted(dimensions)

    def compute_loss(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray
    ) -> float:
        """Compute Matryoshka loss across dimensions."""
        total_loss = 0.0
        for dim in self.dimensions:
            truncated = embeddings[:, :dim]
            # Normalize
            truncated = truncated / np.linalg.norm(truncated, axis=1, keepdims=True)
            # Contrastive loss at this dimension
            loss = self._contrastive_loss(truncated, labels)
            total_loss += loss
        return total_loss / len(self.dimensions)

    def _contrastive_loss(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray,
        temperature: float = 0.05
    ) -> float:
        """InfoNCE contrastive loss."""
        # Similarity matrix
        similarity = np.dot(embeddings, embeddings.T) / temperature
        # Create label mask
        labels = labels.reshape(-1, 1)
        mask = (labels == labels.T).astype(float)
        # Remove diagonal
        np.fill_diagonal(mask, 0)
        # Softmax
        exp_sim = np.exp(similarity - similarity.max(axis=1, keepdims=True))
        np.fill_diagonal(exp_sim, 0)
        # Loss
        pos_sim = (exp_sim * mask).sum(axis=1)
        all_sim = exp_sim.sum(axis=1)
        loss = -np.log(pos_sim / (all_sim + 1e-8) + 1e-8).mean()
        return float(loss)
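A quick sketch of how truncation is applied in practice. It assumes the embedding model was trained with a Matryoshka objective (for example OpenAI’s text-embedding-3 series); the random vectors here are only placeholders, so their rankings will not actually agree across dimensions:

# Truncate Matryoshka embeddings and compare a query's ranking at two dimensions.
full = np.random.randn(1_000, 3072).astype(np.float32)  # placeholder for real embeddings
full /= np.linalg.norm(full, axis=1, keepdims=True)

truncator = MatryoshkaTruncator(target_dim=256)
small = truncator.truncate_and_normalize(full)

query_full, query_small = full[0], small[0]
top_full = np.argsort(full @ query_full)[::-1][1:6]    # skip the query itself
top_small = np.argsort(small @ query_small)[::-1][1:6]
print(top_full, top_small)  # Matryoshka-trained embeddings keep these largely aligned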
Autoencoder Compression
import numpy as np
import torch
import torch.nn as nn
from dataclasses import dataclass
from typing import Any, Optional


class EmbeddingAutoencoder(nn.Module):
    """Autoencoder for embedding compression."""

    def __init__(
        self,
        input_dim: int,
        latent_dim: int,
        hidden_dims: Optional[list[int]] = None
    ):
        super().__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        hidden_dims = hidden_dims or [512, 256]
        # Encoder
        encoder_layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            encoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.GELU(),
            ])
            prev_dim = hidden_dim
        encoder_layers.append(nn.Linear(prev_dim, latent_dim))
        self.encoder = nn.Sequential(*encoder_layers)
        # Decoder
        decoder_layers = []
        prev_dim = latent_dim
        for hidden_dim in reversed(hidden_dims):
            decoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.GELU(),
            ])
            prev_dim = hidden_dim
        decoder_layers.append(nn.Linear(prev_dim, input_dim))
        self.decoder = nn.Sequential(*decoder_layers)

    def encode(self, x: torch.Tensor) -> torch.Tensor:
        """Encode to latent space."""
        return self.encoder(x)

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Decode from latent space."""
        return self.decoder(z)

    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Forward pass returning latent and reconstruction."""
        z = self.encode(x)
        x_recon = self.decode(z)
        return z, x_recon


class VariationalEmbeddingAutoencoder(nn.Module):
    """VAE for embedding compression with regularization."""

    def __init__(
        self,
        input_dim: int,
        latent_dim: int,
        hidden_dim: int = 512
    ):
        super().__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
        )
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_var = nn.Linear(hidden_dim, latent_dim)
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def encode(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Encode to mean and log-variance."""
        h = self.encoder(x)
        return self.fc_mu(h), self.fc_var(h)

    def reparameterize(
        self,
        mu: torch.Tensor,
        log_var: torch.Tensor
    ) -> torch.Tensor:
        """Reparameterization trick."""
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Decode from latent space."""
        return self.decoder(z)

    def forward(
        self,
        x: torch.Tensor
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Forward pass."""
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        x_recon = self.decode(z)
        return z, x_recon, mu, log_var


class AutoencoderTrainer:
    """Train autoencoder for embedding compression."""

    def __init__(
        self,
        model: nn.Module,
        learning_rate: float = 1e-4,
        similarity_weight: float = 0.1
    ):
        self.model = model
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
        self.similarity_weight = similarity_weight

    def train_step(
        self,
        embeddings: torch.Tensor
    ) -> dict[str, float]:
        """Single training step."""
        self.model.train()
        self.optimizer.zero_grad()
        # Forward pass
        if isinstance(self.model, VariationalEmbeddingAutoencoder):
            z, x_recon, mu, log_var = self.model(embeddings)
            # Reconstruction loss
            recon_loss = nn.functional.mse_loss(x_recon, embeddings)
            # KL divergence
            kl_loss = -0.5 * torch.mean(1 + log_var - mu.pow(2) - log_var.exp())
            loss = recon_loss + 0.001 * kl_loss
        else:
            z, x_recon = self.model(embeddings)
            # Reconstruction loss
            recon_loss = nn.functional.mse_loss(x_recon, embeddings)
            # Similarity preservation loss
            orig_sim = torch.mm(embeddings, embeddings.t())
            latent_sim = torch.mm(z, z.t())
            sim_loss = nn.functional.mse_loss(latent_sim, orig_sim)
            loss = recon_loss + self.similarity_weight * sim_loss
            kl_loss = torch.tensor(0.0)
        loss.backward()
        self.optimizer.step()
        return {
            "loss": loss.item(),
            "recon_loss": recon_loss.item(),
            "kl_loss": kl_loss.item() if isinstance(kl_loss, torch.Tensor) else 0.0
        }

    def train(
        self,
        embeddings: np.ndarray,
        epochs: int = 100,
        batch_size: int = 256
    ) -> list[dict]:
        """Train autoencoder."""
        dataset = torch.tensor(embeddings, dtype=torch.float32)
        history = []
        for epoch in range(epochs):
            # Shuffle
            indices = torch.randperm(len(dataset))
            epoch_losses = []
            for i in range(0, len(dataset), batch_size):
                batch_indices = indices[i:i + batch_size]
                batch = dataset[batch_indices]
                losses = self.train_step(batch)
                epoch_losses.append(losses)
            # Average losses
            avg_losses = {
                k: np.mean([l[k] for l in epoch_losses])
                for k in epoch_losses[0].keys()
            }
            history.append(avg_losses)
        return history
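A training sketch for the plain autoencoder, again with random vectors standing in for real embeddings; the latent dimension of 128 and the ten epochs are illustrative values, not tuned ones:

# Compress 1024-d embeddings to 128-d with the EmbeddingAutoencoder above.
embeddings = np.random.randn(5_000, 1024).astype(np.float32)  # placeholder data

model = EmbeddingAutoencoder(input_dim=1024, latent_dim=128)
trainer = AutoencoderTrainer(model, learning_rate=1e-4, similarity_weight=0.1)
history = trainer.train(embeddings, epochs=10, batch_size=256)
print(history[-1])  # losses from the final epoch

# Encode new embeddings into the latent space for indexing.
model.eval()
with torch.no_grad():
    compressed = model.encode(torch.tensor(embeddings[:100])).numpy()
print(compressed.shape)  # (100, 128)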
Random Projection
from dataclasses import dataclass
from typing import Any, Optional

import numpy as np


class GaussianRandomProjection:
    """Random projection using Gaussian matrix."""

    def __init__(self, n_components: int = 256, random_state: int = 42):
        self.n_components = n_components
        self.random_state = random_state
        self.projection_matrix = None

    def fit(self, embeddings: np.ndarray):
        """Generate random projection matrix."""
        np.random.seed(self.random_state)
        input_dim = embeddings.shape[1]
        # Gaussian random matrix
        self.projection_matrix = np.random.randn(
            input_dim, self.n_components
        ) / np.sqrt(self.n_components)

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Project embeddings."""
        return np.dot(embeddings, self.projection_matrix)

    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform."""
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components
        )


class SparseRandomProjection:
    """Sparse random projection for efficiency."""

    def __init__(
        self,
        n_components: int = 256,
        density: float = 0.1,
        random_state: int = 42
    ):
        self.n_components = n_components
        self.density = density
        self.random_state = random_state
        self.projection_matrix = None

    def fit(self, embeddings: np.ndarray):
        """Generate sparse projection matrix."""
        np.random.seed(self.random_state)
        input_dim = embeddings.shape[1]
        # Sparse random matrix
        s = 1 / self.density
        # Values: +sqrt(s), 0, -sqrt(s) with probabilities 1/2s, 1-1/s, 1/2s
        matrix = np.zeros((input_dim, self.n_components))
        for i in range(input_dim):
            for j in range(self.n_components):
                r = np.random.random()
                if r < 1 / (2 * s):
                    matrix[i, j] = np.sqrt(s)
                elif r < 1 / s:
                    matrix[i, j] = -np.sqrt(s)
                # else: 0
        self.projection_matrix = matrix / np.sqrt(self.n_components)

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Project embeddings."""
        return np.dot(embeddings, self.projection_matrix)

    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform."""
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components
        )


class JohnsonLindenstraussProjection:
    """Random projection sized by the Johnson-Lindenstrauss lemma."""

    def __init__(self, epsilon: float = 0.1, random_state: int = 42):
        self.epsilon = epsilon
        self.random_state = random_state
        self.projection_matrix = None
        self.n_components = None

    def _compute_optimal_dim(self, n_samples: int) -> int:
        """Compute the minimum dimension suggested by the JL lemma."""
        # k >= 4 * ln(n) / (epsilon^2 / 2 - epsilon^3 / 3)
        import math
        denominator = (self.epsilon ** 2) / 2 - (self.epsilon ** 3) / 3
        k = int(4 * math.log(n_samples) / denominator) + 1
        return k

    def fit(self, embeddings: np.ndarray):
        """Generate a Gaussian projection matrix at the JL dimension."""
        np.random.seed(self.random_state)
        n_samples, input_dim = embeddings.shape
        self.n_components = self._compute_optimal_dim(n_samples)
        # Gaussian random matrix
        self.projection_matrix = np.random.randn(
            input_dim, self.n_components
        ) / np.sqrt(self.n_components)

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Project embeddings."""
        return np.dot(embeddings, self.projection_matrix)

    def fit_transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Fit and transform."""
        self.fit(embeddings)
        reduced = self.transform(embeddings)
        return ReductionResult(
            reduced_embeddings=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=self.n_components
        )
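To get a feel for what the JL bound actually demands, here is the same formula used by _compute_optimal_dim evaluated for one million points at a few distortion levels (a small sketch; the constants follow the bound in the code above):

# How the JL target dimension scales with the allowed distortion epsilon.
import math

def jl_min_dim(n_samples: int, eps: float) -> int:
    return int(4 * math.log(n_samples) / (eps ** 2 / 2 - eps ** 3 / 3)) + 1

for eps in (0.1, 0.25, 0.5):
    print(f"epsilon={eps}: k >= {jl_min_dim(1_000_000, eps)}")

# For 1M points: epsilon=0.1 needs ~11,842 dims, 0.25 needs ~2,123, 0.5 needs ~664.

Because the lemma is a worst-case guarantee over all pairwise distances, these numbers are conservative; in practice many teams fix a target such as 256 or 512 dimensions and verify recall empirically instead.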
Evaluation and Selection
from dataclasses import dataclass
from typing import Any, Optional
import time

import numpy as np


@dataclass
class ReductionEvaluation:
    """Evaluation of reduction method."""
    method: str
    original_dim: int
    reduced_dim: int
    compression_ratio: float
    recall_at_10: float
    mrr: float
    latency_speedup: float


class ReductionEvaluator:
    """Evaluate dimensionality reduction methods."""

    def __init__(
        self,
        queries: np.ndarray,
        documents: np.ndarray,
        ground_truth: list[list[int]]  # Relevant doc indices per query
    ):
        self.queries = queries
        self.documents = documents
        self.ground_truth = ground_truth

    def evaluate(
        self,
        reducer: Any,
        method_name: str
    ) -> ReductionEvaluation:
        """Evaluate a reduction method."""
        # Reduce embeddings
        reduced_docs = reducer.fit_transform(self.documents)
        reduced_queries = reducer.transform(self.queries)
        if isinstance(reduced_docs, ReductionResult):
            reduced_docs = reduced_docs.reduced_embeddings
        # Normalize
        reduced_docs = reduced_docs / np.linalg.norm(reduced_docs, axis=1, keepdims=True)
        reduced_queries = reduced_queries / np.linalg.norm(reduced_queries, axis=1, keepdims=True)
        # Calculate metrics
        recall = self._calculate_recall(reduced_queries, reduced_docs, k=10)
        mrr = self._calculate_mrr(reduced_queries, reduced_docs)
        # Latency comparison
        original_latency = self._measure_latency(self.queries, self.documents)
        reduced_latency = self._measure_latency(reduced_queries, reduced_docs)
        speedup = original_latency / reduced_latency
        return ReductionEvaluation(
            method=method_name,
            original_dim=self.documents.shape[1],
            reduced_dim=reduced_docs.shape[1],
            compression_ratio=self.documents.shape[1] / reduced_docs.shape[1],
            recall_at_10=recall,
            mrr=mrr,
            latency_speedup=speedup
        )

    def _calculate_recall(
        self,
        queries: np.ndarray,
        documents: np.ndarray,
        k: int = 10
    ) -> float:
        """Calculate recall@k."""
        recalls = []
        for i, query in enumerate(queries):
            # Get similarities
            similarities = np.dot(documents, query)
            # Get top k
            top_k = np.argsort(similarities)[-k:][::-1]
            # Calculate recall
            relevant = set(self.ground_truth[i])
            retrieved = set(top_k)
            if relevant:
                recall = len(relevant & retrieved) / len(relevant)
                recalls.append(recall)
        return float(np.mean(recalls))

    def _calculate_mrr(
        self,
        queries: np.ndarray,
        documents: np.ndarray
    ) -> float:
        """Calculate Mean Reciprocal Rank."""
        mrrs = []
        for i, query in enumerate(queries):
            similarities = np.dot(documents, query)
            ranking = np.argsort(similarities)[::-1]
            # Find rank of first relevant document
            for rank, doc_idx in enumerate(ranking, 1):
                if doc_idx in self.ground_truth[i]:
                    mrrs.append(1 / rank)
                    break
            else:
                mrrs.append(0)
        return float(np.mean(mrrs))

    def _measure_latency(
        self,
        queries: np.ndarray,
        documents: np.ndarray,
        n_runs: int = 100
    ) -> float:
        """Measure brute-force search latency."""
        times = []
        for _ in range(n_runs):
            query = queries[np.random.randint(len(queries))]
            start = time.perf_counter()
            _ = np.dot(documents, query)
            times.append(time.perf_counter() - start)
        return float(np.mean(times))


class DimensionSelector:
    """Select optimal dimension for target metrics."""

    def __init__(
        self,
        evaluator: ReductionEvaluator,
        reducer_class: type
    ):
        self.evaluator = evaluator
        self.reducer_class = reducer_class

    def find_optimal_dimension(
        self,
        dimensions: list[int],
        min_recall: float = 0.95
    ) -> int:
        """Find smallest dimension meeting recall threshold."""
        dims = sorted(dimensions)
        for dim in dims:
            reducer = self.reducer_class(n_components=dim)
            eval_result = self.evaluator.evaluate(reducer, f"dim_{dim}")
            if eval_result.recall_at_10 >= min_recall:
                return dim
        return dims[-1]

    def pareto_frontier(
        self,
        dimensions: list[int]
    ) -> list[ReductionEvaluation]:
        """Find Pareto-optimal dimension/recall tradeoffs."""
        results = []
        for dim in dimensions:
            reducer = self.reducer_class(n_components=dim)
            eval_result = self.evaluator.evaluate(reducer, f"dim_{dim}")
            results.append(eval_result)
        # Keep results not dominated on both compression and recall
        pareto = []
        for result in results:
            is_dominated = False
            for other in results:
                if (other.compression_ratio >= result.compression_ratio and
                        other.recall_at_10 >= result.recall_at_10 and
                        (other.compression_ratio > result.compression_ratio or
                         other.recall_at_10 > result.recall_at_10)):
                    is_dominated = True
                    break
            if not is_dominated:
                pareto.append(result)
        return pareto
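A usage sketch tying the evaluator and selector together on a toy retrieval set, where each query is a noisy copy of one document; real relevance judgments from your own data should replace this placeholder ground truth:

# Find the smallest PCA dimension that keeps recall@10 above a target on toy data.
rng = np.random.default_rng(0)
documents = rng.standard_normal((2_000, 1024)).astype(np.float32)
queries = documents[:50] + 0.05 * rng.standard_normal((50, 1024)).astype(np.float32)
ground_truth = [[i] for i in range(50)]  # each query's relevant doc is its source

evaluator = ReductionEvaluator(queries, documents, ground_truth)
selector = DimensionSelector(evaluator, PCAReducer)
best_dim = selector.find_optimal_dimension([64, 128, 256, 512], min_recall=0.95)
print("smallest dimension meeting the recall target:", best_dim)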
Production Reduction Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np

# Assumes PCAReducer, MatryoshkaTruncator, and GaussianRandomProjection from the
# sections above are defined in (or imported into) this module.

app = FastAPI()


class ReduceRequest(BaseModel):
    embeddings: list[list[float]]
    target_dim: int = 256
    method: str = "pca"  # pca, matryoshka, random


class FitRequest(BaseModel):
    embeddings: list[list[float]]
    target_dim: int = 256
    method: str = "pca"


class TransformRequest(BaseModel):
    embeddings: list[list[float]]


# Store fitted reducers
reducers = {}


@app.post("/v1/reduce")
async def reduce_embeddings(request: ReduceRequest) -> dict:
    """Reduce embedding dimensions."""
    embeddings = np.array(request.embeddings, dtype=np.float32)
    if request.method == "pca":
        reducer = PCAReducer(n_components=request.target_dim)
    elif request.method == "matryoshka":
        reducer = MatryoshkaTruncator(target_dim=request.target_dim)
        reduced = reducer.truncate_and_normalize(embeddings)
        return {
            "reduced_embeddings": reduced.tolist(),
            "original_dim": embeddings.shape[1],
            "reduced_dim": request.target_dim
        }
    elif request.method == "random":
        reducer = GaussianRandomProjection(n_components=request.target_dim)
    else:
        raise HTTPException(status_code=400, detail=f"Unknown method: {request.method}")
    result = reducer.fit_transform(embeddings)
    return {
        "reduced_embeddings": result.reduced_embeddings.tolist(),
        "original_dim": result.original_dim,
        "reduced_dim": result.reduced_dim,
        "variance_retained": result.variance_retained
    }


@app.post("/v1/fit")
async def fit_reducer(request: FitRequest) -> dict:
    """Fit a reducer on training data."""
    embeddings = np.array(request.embeddings, dtype=np.float32)
    if request.method == "pca":
        reducer = PCAReducer(n_components=request.target_dim)
    elif request.method == "random":
        reducer = GaussianRandomProjection(n_components=request.target_dim)
    else:
        raise HTTPException(status_code=400, detail=f"Unknown method: {request.method}")
    reducer.fit(embeddings)
    # Store reducer
    reducer_id = f"{request.method}_{request.target_dim}"
    reducers[reducer_id] = reducer
    return {
        "reducer_id": reducer_id,
        "method": request.method,
        "target_dim": request.target_dim
    }


@app.post("/v1/transform/{reducer_id}")
async def transform_embeddings(reducer_id: str, request: TransformRequest) -> dict:
    """Transform embeddings using fitted reducer."""
    if reducer_id not in reducers:
        raise HTTPException(status_code=404, detail="Reducer not found")
    reducer = reducers[reducer_id]
    embeddings = np.array(request.embeddings, dtype=np.float32)
    reduced = reducer.transform(embeddings)
    return {
        "reduced_embeddings": reduced.tolist()
    }


@app.get("/v1/reducers")
async def list_reducers() -> dict:
    """List available reducers."""
    return {
        "reducers": list(reducers.keys())
    }


@app.get("/health")
async def health():
    return {"status": "healthy"}
References
- Matryoshka Embeddings: https://arxiv.org/abs/2205.13147
- Johnson-Lindenstrauss Lemma: https://en.wikipedia.org/wiki/Johnson-Lindenstrauss_lemma
- OpenAI Embeddings: https://platform.openai.com/docs/guides/embeddings
- Sentence Transformers: https://www.sbert.net/
Conclusion
Embedding dimensionality reduction is a powerful tool for optimizing vector search systems. For Matryoshka-trained models like OpenAI’s text-embedding-3 series, simple truncation is remarkably effective: you can often reduce from 3072 to 256 dimensions with minimal quality loss. For other embeddings, PCA provides a good balance of simplicity and effectiveness, especially when you have representative training data. Random projection is useful when you need a fast, data-independent method with theoretical guarantees. Autoencoders can learn more sophisticated compressions but require training and may not generalize well beyond their training distribution. Always evaluate on your specific task: the right dimension depends on your data distribution, query patterns, and quality requirements. Start with aggressive reduction (4-8x) and increase dimensions only if metrics suffer. Because reduced embeddings are cheaper to compare and fit better in cache, the end-to-end speedup is often greater than the compression ratio alone. Monitor retrieval quality in production and adjust dimensions as your data evolves.
