Introduction: Understanding embedding spaces is crucial for building effective semantic search, RAG systems, and recommendation engines. Embeddings map text, images, or other data into high-dimensional vector spaces where similar items cluster together. But how do you know if your embeddings are working well? How do you debug retrieval failures or understand why certain queries return unexpected results? Embedding space analysis provides the tools to answer these questions—dimensionality reduction for visualization, clustering to discover natural groupings, distance metrics to measure similarity, and quality metrics to evaluate embedding models. This guide covers practical techniques for analyzing, visualizing, and debugging embedding spaces, helping you build intuition about how your embeddings behave and identify issues before they impact production systems.

Embedding Space Fundamentals
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict, Tuple
import numpy as np
from enum import Enum
@dataclass
class Embedding:
"""A single embedding with metadata."""
vector: np.ndarray
text: str
metadata: dict = field(default_factory=dict)
@property
def dimension(self) -> int:
return len(self.vector)
@property
def norm(self) -> float:
return np.linalg.norm(self.vector)
def normalize(self) -> 'Embedding':
"""Return normalized embedding."""
norm = self.norm
if norm == 0:
return self
return Embedding(
vector=self.vector / norm,
text=self.text,
metadata=self.metadata
)
class DistanceMetric(Enum):
"""Distance metrics for embeddings."""
COSINE = "cosine"
EUCLIDEAN = "euclidean"
DOT_PRODUCT = "dot_product"
MANHATTAN = "manhattan"
class EmbeddingSpace:
"""Manage and analyze embedding space."""
def __init__(self, dimension: int):
self.dimension = dimension
self.embeddings: list[Embedding] = []
def add(self, embedding: Embedding):
"""Add embedding to space."""
if embedding.dimension != self.dimension:
raise ValueError(f"Expected dimension {self.dimension}, got {embedding.dimension}")
self.embeddings.append(embedding)
def add_batch(self, embeddings: list[Embedding]):
"""Add multiple embeddings."""
for emb in embeddings:
self.add(emb)
def get_matrix(self) -> np.ndarray:
"""Get all embeddings as matrix."""
return np.array([e.vector for e in self.embeddings])
def distance(
self,
a: np.ndarray,
b: np.ndarray,
metric: DistanceMetric = DistanceMetric.COSINE
) -> float:
"""Calculate distance between embeddings."""
if metric == DistanceMetric.COSINE:
# Cosine distance = 1 - cosine similarity
dot = np.dot(a, b)
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
return 1.0
return 1 - (dot / (norm_a * norm_b))
elif metric == DistanceMetric.EUCLIDEAN:
return np.linalg.norm(a - b)
elif metric == DistanceMetric.DOT_PRODUCT:
# Negative dot product (higher = more similar)
return -np.dot(a, b)
elif metric == DistanceMetric.MANHATTAN:
return np.sum(np.abs(a - b))
raise ValueError(f"Unknown metric: {metric}")
def similarity(
self,
a: np.ndarray,
b: np.ndarray,
metric: DistanceMetric = DistanceMetric.COSINE
) -> float:
"""Calculate similarity between embeddings."""
if metric == DistanceMetric.COSINE:
dot = np.dot(a, b)
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
elif metric == DistanceMetric.DOT_PRODUCT:
return np.dot(a, b)
# For distance metrics, convert to similarity
dist = self.distance(a, b, metric)
return 1 / (1 + dist)
def nearest_neighbors(
self,
query: np.ndarray,
k: int = 10,
metric: DistanceMetric = DistanceMetric.COSINE
) -> list[tuple[Embedding, float]]:
"""Find k nearest neighbors."""
distances = []
for emb in self.embeddings:
dist = self.distance(query, emb.vector, metric)
distances.append((emb, dist))
distances.sort(key=lambda x: x[1])
return distances[:k]
def pairwise_distances(
self,
metric: DistanceMetric = DistanceMetric.COSINE
) -> np.ndarray:
"""Calculate pairwise distance matrix."""
n = len(self.embeddings)
distances = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
dist = self.distance(
self.embeddings[i].vector,
self.embeddings[j].vector,
metric
)
distances[i, j] = dist
distances[j, i] = dist
return distances
class EmbeddingStatistics:
"""Calculate statistics for embedding space."""
def __init__(self, space: EmbeddingSpace):
self.space = space
def basic_stats(self) -> dict:
"""Get basic statistics."""
matrix = self.space.get_matrix()
return {
"count": len(self.space.embeddings),
"dimension": self.space.dimension,
"mean_norm": np.mean([e.norm for e in self.space.embeddings]),
"std_norm": np.std([e.norm for e in self.space.embeddings]),
"min_norm": np.min([e.norm for e in self.space.embeddings]),
"max_norm": np.max([e.norm for e in self.space.embeddings]),
"mean_vector": np.mean(matrix, axis=0),
"std_per_dim": np.std(matrix, axis=0)
}
def coverage_stats(self) -> dict:
"""Analyze space coverage."""
matrix = self.space.get_matrix()
# Calculate spread in each dimension
dim_ranges = np.ptp(matrix, axis=0) # Peak to peak
# Calculate effective dimensionality using PCA
from sklearn.decomposition import PCA
pca = PCA()
pca.fit(matrix)
# Dimensions needed for 95% variance
cumsum = np.cumsum(pca.explained_variance_ratio_)
        effective_dim = int(np.searchsorted(cumsum, 0.95) + 1)  # plain int, so it serializes cleanly to JSON
return {
"mean_dim_range": np.mean(dim_ranges),
"min_dim_range": np.min(dim_ranges),
"max_dim_range": np.max(dim_ranges),
"effective_dimensionality": effective_dim,
"variance_explained_by_10_dims": cumsum[9] if len(cumsum) > 9 else cumsum[-1],
"variance_explained_by_50_dims": cumsum[49] if len(cumsum) > 49 else cumsum[-1]
}
def density_stats(
self,
metric: DistanceMetric = DistanceMetric.COSINE
) -> dict:
"""Analyze embedding density."""
distances = self.space.pairwise_distances(metric)
# Get upper triangle (excluding diagonal)
upper = distances[np.triu_indices_from(distances, k=1)]
return {
"mean_distance": np.mean(upper),
"std_distance": np.std(upper),
"min_distance": np.min(upper),
"max_distance": np.max(upper),
"median_distance": np.median(upper),
"p10_distance": np.percentile(upper, 10),
"p90_distance": np.percentile(upper, 90)
}
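
A minimal usage sketch of these classes follows. The 384-dimensional random vectors are hypothetical stand-ins for real model output; swap in embeddings from your provider of choice.

import numpy as np

# Hypothetical data: random vectors stand in for real embeddings
rng = np.random.default_rng(0)
space = EmbeddingSpace(dimension=384)
for i in range(100):
    space.add(Embedding(vector=rng.normal(size=384), text=f"doc {i}"))

# k nearest neighbors for an arbitrary query vector
query = rng.normal(size=384)
for emb, dist in space.nearest_neighbors(query, k=5):
    print(f"{emb.text}: cosine distance {dist:.3f}")

# Space-level statistics
stats = EmbeddingStatistics(space)
print("mean norm:", stats.basic_stats()["mean_norm"])
print("mean pairwise distance:", stats.density_stats()["mean_distance"])

Note that pairwise_distances is quadratic in the number of embeddings, so the density statistics are best computed on samples of a few thousand points at most.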
Dimensionality Reduction
from dataclasses import dataclass, field
from typing import Any, Optional, List
import numpy as np
class DimensionalityReducer:
"""Base class for dimensionality reduction."""
def fit(self, embeddings: np.ndarray):
raise NotImplementedError
def transform(self, embeddings: np.ndarray) -> np.ndarray:
raise NotImplementedError
def fit_transform(self, embeddings: np.ndarray) -> np.ndarray:
self.fit(embeddings)
return self.transform(embeddings)
class PCAReducer(DimensionalityReducer):
"""PCA-based dimensionality reduction."""
def __init__(self, n_components: int = 2):
self.n_components = n_components
self.pca = None
def fit(self, embeddings: np.ndarray):
from sklearn.decomposition import PCA
self.pca = PCA(n_components=self.n_components)
self.pca.fit(embeddings)
def transform(self, embeddings: np.ndarray) -> np.ndarray:
return self.pca.transform(embeddings)
def get_explained_variance(self) -> np.ndarray:
"""Get explained variance ratio."""
return self.pca.explained_variance_ratio_
def get_components(self) -> np.ndarray:
"""Get principal components."""
return self.pca.components_
class TSNEReducer(DimensionalityReducer):
"""t-SNE dimensionality reduction."""
def __init__(
self,
n_components: int = 2,
perplexity: float = 30.0,
learning_rate: float = 200.0,
n_iter: int = 1000
):
self.n_components = n_components
self.perplexity = perplexity
self.learning_rate = learning_rate
self.n_iter = n_iter
self.tsne = None
self._embedding = None
def fit(self, embeddings: np.ndarray):
from sklearn.manifold import TSNE
self.tsne = TSNE(
n_components=self.n_components,
perplexity=self.perplexity,
learning_rate=self.learning_rate,
            n_iter=self.n_iter,  # newer scikit-learn versions rename this to max_iter
random_state=42
)
self._embedding = self.tsne.fit_transform(embeddings)
def transform(self, embeddings: np.ndarray) -> np.ndarray:
# t-SNE doesn't support transform, return fitted embedding
return self._embedding
def fit_transform(self, embeddings: np.ndarray) -> np.ndarray:
self.fit(embeddings)
return self._embedding
class UMAPReducer(DimensionalityReducer):
"""UMAP dimensionality reduction."""
def __init__(
self,
n_components: int = 2,
n_neighbors: int = 15,
min_dist: float = 0.1,
metric: str = "cosine"
):
self.n_components = n_components
self.n_neighbors = n_neighbors
self.min_dist = min_dist
self.metric = metric
self.umap = None
def fit(self, embeddings: np.ndarray):
import umap
self.umap = umap.UMAP(
n_components=self.n_components,
n_neighbors=self.n_neighbors,
min_dist=self.min_dist,
metric=self.metric,
random_state=42
)
self.umap.fit(embeddings)
def transform(self, embeddings: np.ndarray) -> np.ndarray:
return self.umap.transform(embeddings)
class HybridReducer(DimensionalityReducer):
"""Two-stage reduction: PCA then t-SNE/UMAP."""
def __init__(
self,
pca_components: int = 50,
final_components: int = 2,
method: str = "umap"
):
self.pca_components = pca_components
self.final_components = final_components
self.method = method
self.pca = PCAReducer(pca_components)
if method == "umap":
self.final = UMAPReducer(final_components)
else:
self.final = TSNEReducer(final_components)
def fit(self, embeddings: np.ndarray):
# First reduce with PCA
pca_result = self.pca.fit_transform(embeddings)
# Then apply final reduction
self.final.fit(pca_result)
def transform(self, embeddings: np.ndarray) -> np.ndarray:
pca_result = self.pca.transform(embeddings)
return self.final.transform(pca_result)
def fit_transform(self, embeddings: np.ndarray) -> np.ndarray:
pca_result = self.pca.fit_transform(embeddings)
return self.final.fit_transform(pca_result)
@dataclass
class ReductionResult:
"""Result of dimensionality reduction."""
reduced: np.ndarray
original_dim: int
reduced_dim: int
method: str
metadata: dict = field(default_factory=dict)
class ReductionPipeline:
"""Pipeline for dimensionality reduction."""
def __init__(self):
self.reducers: dict[str, DimensionalityReducer] = {}
def add_reducer(self, name: str, reducer: DimensionalityReducer):
"""Add a reducer."""
self.reducers[name] = reducer
def reduce(
self,
embeddings: np.ndarray,
method: str = "umap"
) -> ReductionResult:
"""Reduce embeddings."""
if method not in self.reducers:
raise ValueError(f"Unknown method: {method}")
reducer = self.reducers[method]
reduced = reducer.fit_transform(embeddings)
return ReductionResult(
reduced=reduced,
original_dim=embeddings.shape[1],
reduced_dim=reduced.shape[1],
method=method
)
def compare_methods(
self,
embeddings: np.ndarray
) -> dict[str, ReductionResult]:
"""Compare all reduction methods."""
results = {}
for name, reducer in self.reducers.items():
results[name] = self.reduce(embeddings, name)
return results
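
A sketch of the pipeline in use, reusing the matrix from the space built earlier. Note that t-SNE requires perplexity to be smaller than the sample count, and UMAPReducer assumes the umap-learn package is installed.

pipeline = ReductionPipeline()
pipeline.add_reducer("pca", PCAReducer(n_components=2))
pipeline.add_reducer("tsne", TSNEReducer(n_components=2, perplexity=15.0))
pipeline.add_reducer("umap", UMAPReducer(n_components=2))

matrix = space.get_matrix()  # embeddings from the earlier example
for name, result in pipeline.compare_methods(matrix).items():
    print(f"{name}: {result.original_dim}D -> {result.reduced_dim}D")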
Clustering Analysis
from dataclasses import dataclass
from typing import Any, Optional, List
import numpy as np
@dataclass
class ClusterResult:
"""Result of clustering."""
labels: np.ndarray
n_clusters: int
    centroids: Optional[np.ndarray] = None
silhouette_score: float = 0.0
inertia: float = 0.0
class EmbeddingClusterer:
"""Cluster embeddings."""
def __init__(self, method: str = "kmeans"):
self.method = method
self.model = None
def cluster(
self,
embeddings: np.ndarray,
        n_clusters: Optional[int] = None
) -> ClusterResult:
"""Cluster embeddings."""
if self.method == "kmeans":
return self._kmeans(embeddings, n_clusters or 10)
elif self.method == "hdbscan":
return self._hdbscan(embeddings)
elif self.method == "agglomerative":
return self._agglomerative(embeddings, n_clusters or 10)
elif self.method == "dbscan":
return self._dbscan(embeddings)
raise ValueError(f"Unknown method: {self.method}")
def _kmeans(
self,
embeddings: np.ndarray,
n_clusters: int
) -> ClusterResult:
"""K-means clustering."""
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
model = KMeans(n_clusters=n_clusters, random_state=42)
labels = model.fit_predict(embeddings)
sil_score = silhouette_score(embeddings, labels) if n_clusters > 1 else 0
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
centroids=model.cluster_centers_,
silhouette_score=sil_score,
inertia=model.inertia_
)
def _hdbscan(self, embeddings: np.ndarray) -> ClusterResult:
"""HDBSCAN clustering."""
import hdbscan
from sklearn.metrics import silhouette_score
model = hdbscan.HDBSCAN(min_cluster_size=5, metric='euclidean')
labels = model.fit_predict(embeddings)
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
# Calculate silhouette only for non-noise points
mask = labels != -1
if mask.sum() > 1 and n_clusters > 1:
sil_score = silhouette_score(embeddings[mask], labels[mask])
else:
sil_score = 0
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
silhouette_score=sil_score
)
def _agglomerative(
self,
embeddings: np.ndarray,
n_clusters: int
) -> ClusterResult:
"""Agglomerative clustering."""
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
model = AgglomerativeClustering(n_clusters=n_clusters)
labels = model.fit_predict(embeddings)
sil_score = silhouette_score(embeddings, labels) if n_clusters > 1 else 0
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
silhouette_score=sil_score
)
def _dbscan(self, embeddings: np.ndarray) -> ClusterResult:
"""DBSCAN clustering."""
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score
model = DBSCAN(eps=0.5, min_samples=5)
labels = model.fit_predict(embeddings)
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
mask = labels != -1
if mask.sum() > 1 and n_clusters > 1:
sil_score = silhouette_score(embeddings[mask], labels[mask])
else:
sil_score = 0
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
silhouette_score=sil_score
)
class OptimalClusterFinder:
"""Find optimal number of clusters."""
def __init__(self, max_clusters: int = 20):
self.max_clusters = max_clusters
def find_optimal(
self,
embeddings: np.ndarray,
method: str = "silhouette"
) -> int:
"""Find optimal cluster count."""
if method == "silhouette":
return self._silhouette_method(embeddings)
elif method == "elbow":
return self._elbow_method(embeddings)
elif method == "gap":
return self._gap_statistic(embeddings)
raise ValueError(f"Unknown method: {method}")
def _silhouette_method(self, embeddings: np.ndarray) -> int:
"""Find optimal k using silhouette score."""
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
scores = []
for k in range(2, min(self.max_clusters + 1, len(embeddings))):
kmeans = KMeans(n_clusters=k, random_state=42)
labels = kmeans.fit_predict(embeddings)
score = silhouette_score(embeddings, labels)
scores.append((k, score))
# Return k with highest silhouette score
return max(scores, key=lambda x: x[1])[0]
def _elbow_method(self, embeddings: np.ndarray) -> int:
"""Find optimal k using elbow method."""
from sklearn.cluster import KMeans
inertias = []
for k in range(1, min(self.max_clusters + 1, len(embeddings))):
kmeans = KMeans(n_clusters=k, random_state=42)
kmeans.fit(embeddings)
inertias.append(kmeans.inertia_)
# Find elbow using second derivative
diffs = np.diff(inertias)
diffs2 = np.diff(diffs)
# Elbow is where second derivative is maximum
elbow = np.argmax(diffs2) + 2
return elbow
def _gap_statistic(self, embeddings: np.ndarray) -> int:
"""Find optimal k using gap statistic."""
from sklearn.cluster import KMeans
gaps = []
n_refs = 10
for k in range(1, min(self.max_clusters + 1, len(embeddings))):
# Cluster actual data
kmeans = KMeans(n_clusters=k, random_state=42)
kmeans.fit(embeddings)
actual_inertia = kmeans.inertia_
# Cluster random reference data
ref_inertias = []
for _ in range(n_refs):
random_data = np.random.uniform(
embeddings.min(axis=0),
embeddings.max(axis=0),
size=embeddings.shape
)
kmeans_ref = KMeans(n_clusters=k, random_state=42)
kmeans_ref.fit(random_data)
ref_inertias.append(kmeans_ref.inertia_)
gap = np.log(np.mean(ref_inertias)) - np.log(actual_inertia)
gaps.append(gap)
        # Simplified criterion: return the first k where the gap stops increasing
        # (the full gap statistic subtracts the reference std from gap[k+1])
for i in range(len(gaps) - 1):
if gaps[i] >= gaps[i + 1]:
return i + 1
return len(gaps)
class ClusterAnalyzer:
"""Analyze cluster quality and characteristics."""
def __init__(self, embeddings: np.ndarray, labels: np.ndarray):
self.embeddings = embeddings
self.labels = labels
self.n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
def get_cluster_stats(self) -> dict:
"""Get statistics for each cluster."""
stats = {}
for label in set(self.labels):
if label == -1:
continue
mask = self.labels == label
cluster_embeddings = self.embeddings[mask]
centroid = np.mean(cluster_embeddings, axis=0)
# Calculate distances to centroid
distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
stats[label] = {
"size": mask.sum(),
"centroid": centroid,
"mean_distance_to_centroid": np.mean(distances),
"max_distance_to_centroid": np.max(distances),
"std_distance": np.std(distances)
}
return stats
def get_cluster_separation(self) -> np.ndarray:
"""Calculate separation between clusters."""
centroids = []
for label in range(self.n_clusters):
mask = self.labels == label
centroid = np.mean(self.embeddings[mask], axis=0)
centroids.append(centroid)
centroids = np.array(centroids)
# Pairwise distances between centroids
separation = np.zeros((self.n_clusters, self.n_clusters))
for i in range(self.n_clusters):
for j in range(i + 1, self.n_clusters):
dist = np.linalg.norm(centroids[i] - centroids[j])
separation[i, j] = dist
separation[j, i] = dist
return separation
def get_outliers(self, threshold: float = 2.0) -> list[int]:
"""Find outlier embeddings."""
outliers = []
for label in set(self.labels):
if label == -1:
continue
mask = self.labels == label
indices = np.where(mask)[0]
cluster_embeddings = self.embeddings[mask]
centroid = np.mean(cluster_embeddings, axis=0)
distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
mean_dist = np.mean(distances)
std_dist = np.std(distances)
for i, dist in enumerate(distances):
if dist > mean_dist + threshold * std_dist:
outliers.append(indices[i])
return outliers
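
Tying these pieces together: a sketch that picks k with the silhouette method, clusters with k-means, and flags outliers, reusing the matrix from the earlier examples.

finder = OptimalClusterFinder(max_clusters=15)
k = finder.find_optimal(matrix, method="silhouette")

clusterer = EmbeddingClusterer(method="kmeans")
result = clusterer.cluster(matrix, n_clusters=k)
print(f"k={k}, silhouette={result.silhouette_score:.3f}")

analyzer = ClusterAnalyzer(matrix, result.labels)
for label, cluster_stats in analyzer.get_cluster_stats().items():
    print(f"cluster {label}: {cluster_stats['size']} points")
print("outlier indices:", analyzer.get_outliers(threshold=2.0))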
Visualization Tools
from dataclasses import dataclass
from typing import Any, Optional, List
import numpy as np
class EmbeddingVisualizer:
"""Visualize embedding spaces."""
def __init__(self):
self.reducer = None
def plot_2d(
self,
embeddings: np.ndarray,
labels: np.ndarray = None,
texts: list[str] = None,
title: str = "Embedding Space",
method: str = "umap"
):
"""Create 2D scatter plot."""
import matplotlib.pyplot as plt
# Reduce to 2D
if embeddings.shape[1] > 2:
if method == "umap":
reducer = UMAPReducer(n_components=2)
elif method == "tsne":
reducer = TSNEReducer(n_components=2)
else:
reducer = PCAReducer(n_components=2)
reduced = reducer.fit_transform(embeddings)
else:
reduced = embeddings
# Create plot
fig, ax = plt.subplots(figsize=(12, 8))
if labels is not None:
scatter = ax.scatter(
reduced[:, 0],
reduced[:, 1],
c=labels,
cmap='tab10',
alpha=0.7
)
plt.colorbar(scatter, label='Cluster')
else:
ax.scatter(reduced[:, 0], reduced[:, 1], alpha=0.7)
# Add text annotations for first few points
if texts is not None:
for i, txt in enumerate(texts[:20]):
ax.annotate(
txt[:30],
(reduced[i, 0], reduced[i, 1]),
fontsize=8,
alpha=0.7
)
ax.set_title(title)
ax.set_xlabel('Dimension 1')
ax.set_ylabel('Dimension 2')
return fig
def plot_3d(
self,
embeddings: np.ndarray,
labels: np.ndarray = None,
title: str = "Embedding Space 3D"
):
"""Create 3D scatter plot."""
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# Reduce to 3D
if embeddings.shape[1] > 3:
reducer = UMAPReducer(n_components=3)
reduced = reducer.fit_transform(embeddings)
else:
reduced = embeddings
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
if labels is not None:
scatter = ax.scatter(
reduced[:, 0],
reduced[:, 1],
reduced[:, 2],
c=labels,
cmap='tab10',
alpha=0.7
)
plt.colorbar(scatter, label='Cluster')
else:
ax.scatter(
reduced[:, 0],
reduced[:, 1],
reduced[:, 2],
alpha=0.7
)
ax.set_title(title)
ax.set_xlabel('Dimension 1')
ax.set_ylabel('Dimension 2')
ax.set_zlabel('Dimension 3')
return fig
def plot_similarity_heatmap(
self,
embeddings: np.ndarray,
texts: list[str] = None,
title: str = "Similarity Matrix"
):
"""Plot similarity heatmap."""
import matplotlib.pyplot as plt
import seaborn as sns
# Calculate cosine similarity
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
normalized = embeddings / norms
similarity = np.dot(normalized, normalized.T)
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(
similarity,
ax=ax,
cmap='RdYlBu_r',
vmin=-1,
vmax=1,
            # Tick label count must match the matrix size, so pass the full list
            xticklabels=texts if texts else False,
            yticklabels=texts if texts else False
)
ax.set_title(title)
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
return fig
def plot_dimension_distribution(
self,
embeddings: np.ndarray,
dims: list[int] = None,
title: str = "Dimension Distributions"
):
"""Plot distribution of values in each dimension."""
import matplotlib.pyplot as plt
dims = dims or list(range(min(10, embeddings.shape[1])))
fig, axes = plt.subplots(2, 5, figsize=(15, 6))
axes = axes.flatten()
        for i, dim in enumerate(dims[:10]):
            axes[i].hist(embeddings[:, dim], bins=50, alpha=0.7)
            axes[i].set_title(f'Dimension {dim}')
            axes[i].set_xlabel('Value')
            axes[i].set_ylabel('Count')
        # Hide unused subplots when fewer than 10 dimensions are shown
        for ax in axes[len(dims[:10]):]:
            ax.set_visible(False)
plt.suptitle(title)
plt.tight_layout()
return fig
def plot_cluster_sizes(
self,
labels: np.ndarray,
title: str = "Cluster Sizes"
):
"""Plot cluster size distribution."""
import matplotlib.pyplot as plt
unique, counts = np.unique(labels, return_counts=True)
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(unique, counts, alpha=0.7)
ax.set_xlabel('Cluster')
ax.set_ylabel('Size')
ax.set_title(title)
return fig
class InteractiveVisualizer:
"""Interactive embedding visualization."""
def __init__(self):
self.data = None
def create_plotly_scatter(
self,
embeddings: np.ndarray,
labels: np.ndarray = None,
texts: list[str] = None,
title: str = "Embedding Space"
):
"""Create interactive Plotly scatter plot."""
import plotly.express as px
import pandas as pd
# Reduce to 2D
if embeddings.shape[1] > 2:
reducer = UMAPReducer(n_components=2)
reduced = reducer.fit_transform(embeddings)
else:
reduced = embeddings
df = pd.DataFrame({
'x': reduced[:, 0],
'y': reduced[:, 1],
'label': labels if labels is not None else 0,
'text': texts if texts else [f"Point {i}" for i in range(len(reduced))]
})
fig = px.scatter(
df,
x='x',
y='y',
color='label',
hover_data=['text'],
title=title
)
return fig
def create_plotly_3d(
self,
embeddings: np.ndarray,
labels: np.ndarray = None,
texts: list[str] = None,
title: str = "Embedding Space 3D"
):
"""Create interactive 3D Plotly scatter plot."""
import plotly.express as px
import pandas as pd
# Reduce to 3D
if embeddings.shape[1] > 3:
reducer = UMAPReducer(n_components=3)
reduced = reducer.fit_transform(embeddings)
else:
reduced = embeddings
df = pd.DataFrame({
'x': reduced[:, 0],
'y': reduced[:, 1],
'z': reduced[:, 2],
'label': labels if labels is not None else 0,
'text': texts if texts else [f"Point {i}" for i in range(len(reduced))]
})
fig = px.scatter_3d(
df,
x='x',
y='y',
z='z',
color='label',
hover_data=['text'],
title=title
)
return fig
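
Example usage of both visualizers, assuming matplotlib (and plotly for the interactive variant) are installed; the output file names are arbitrary. PCA is used here so the example runs without the optional UMAP dependency.

viz = EmbeddingVisualizer()
fig = viz.plot_2d(matrix, labels=result.labels, method="pca",
                  title="Clusters (PCA projection)")
fig.savefig("embedding_space.png", dpi=150)

iviz = InteractiveVisualizer()
plotly_fig = iviz.create_plotly_scatter(matrix, labels=result.labels)
plotly_fig.write_html("embedding_space.html")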
Production Analysis Service
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional, List
import numpy as np
app = FastAPI()
class AnalyzeRequest(BaseModel):
embeddings: List[List[float]]
texts: Optional[List[str]] = None
method: str = "umap"
n_clusters: Optional[int] = None
class ClusterRequest(BaseModel):
embeddings: List[List[float]]
method: str = "kmeans"
n_clusters: Optional[int] = None
class SimilarityRequest(BaseModel):
query: List[float]
embeddings: List[List[float]]
k: int = 10
# Initialize components
space = EmbeddingSpace(dimension=1536) # Default OpenAI dimension
visualizer = EmbeddingVisualizer()
@app.post("/v1/analyze")
async def analyze_embeddings(request: AnalyzeRequest) -> dict:
"""Analyze embedding space."""
embeddings = np.array(request.embeddings)
# Create temporary space
temp_space = EmbeddingSpace(dimension=embeddings.shape[1])
for i, vec in enumerate(embeddings):
text = request.texts[i] if request.texts else f"embedding_{i}"
temp_space.add(Embedding(vector=np.array(vec), text=text))
# Get statistics
stats = EmbeddingStatistics(temp_space)
basic = stats.basic_stats()
coverage = stats.coverage_stats()
density = stats.density_stats()
return {
"count": basic["count"],
"dimension": basic["dimension"],
"mean_norm": float(basic["mean_norm"]),
"effective_dimensionality": coverage["effective_dimensionality"],
"mean_distance": float(density["mean_distance"]),
"median_distance": float(density["median_distance"])
}
@app.post("/v1/reduce")
async def reduce_dimensions(request: AnalyzeRequest) -> dict:
"""Reduce embedding dimensions."""
embeddings = np.array(request.embeddings)
if request.method == "pca":
reducer = PCAReducer(n_components=2)
elif request.method == "tsne":
reducer = TSNEReducer(n_components=2)
else:
reducer = UMAPReducer(n_components=2)
reduced = reducer.fit_transform(embeddings)
return {
"reduced": reduced.tolist(),
"method": request.method,
"original_dim": embeddings.shape[1],
"reduced_dim": 2
}
@app.post("/v1/cluster")
async def cluster_embeddings(request: ClusterRequest) -> dict:
"""Cluster embeddings."""
embeddings = np.array(request.embeddings)
clusterer = EmbeddingClusterer(method=request.method)
if request.n_clusters:
result = clusterer.cluster(embeddings, request.n_clusters)
else:
# Find optimal
finder = OptimalClusterFinder()
optimal_k = finder.find_optimal(embeddings)
result = clusterer.cluster(embeddings, optimal_k)
return {
"labels": result.labels.tolist(),
"n_clusters": result.n_clusters,
"silhouette_score": float(result.silhouette_score)
}
@app.post("/v1/similarity")
async def find_similar(request: SimilarityRequest) -> dict:
"""Find similar embeddings."""
query = np.array(request.query)
embeddings = np.array(request.embeddings)
# Calculate similarities
similarities = []
for i, emb in enumerate(embeddings):
        # Guard against zero-norm vectors to avoid division by zero
        denom = np.linalg.norm(query) * np.linalg.norm(emb)
        sim = float(np.dot(query, emb) / denom) if denom > 0 else 0.0
        similarities.append((i, sim))
# Sort by similarity
similarities.sort(key=lambda x: x[1], reverse=True)
return {
"results": [
{"index": idx, "similarity": sim}
for idx, sim in similarities[:request.k]
]
}
@app.post("/v1/outliers")
async def find_outliers(request: ClusterRequest) -> dict:
"""Find outlier embeddings."""
embeddings = np.array(request.embeddings)
# Cluster first
clusterer = EmbeddingClusterer(method="kmeans")
result = clusterer.cluster(embeddings, request.n_clusters or 5)
# Find outliers
analyzer = ClusterAnalyzer(embeddings, result.labels)
outliers = analyzer.get_outliers(threshold=2.0)
return {
"outlier_indices": outliers,
"outlier_count": len(outliers),
"total_count": len(embeddings)
}
@app.get("/v1/stats")
async def get_space_stats() -> dict:
"""Get current space statistics."""
if not space.embeddings:
return {"error": "No embeddings in space"}
stats = EmbeddingStatistics(space)
basic = stats.basic_stats()
return {
"count": basic["count"],
"dimension": basic["dimension"],
"mean_norm": float(basic["mean_norm"])
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- UMAP Documentation: https://umap-learn.readthedocs.io/
- t-SNE Paper: https://www.jmlr.org/papers/v9/vandermaaten08a.html
- scikit-learn Clustering: https://scikit-learn.org/stable/modules/clustering.html
- HDBSCAN: https://hdbscan.readthedocs.io/
- Plotly: https://plotly.com/python/
Conclusion
Embedding space analysis transforms opaque vector representations into actionable insights. Start with basic statistics (norm distributions, effective dimensionality, and pairwise distances) to understand your embedding space's structure. Use dimensionality reduction (UMAP for preserving global structure, t-SNE for local clusters, PCA for speed) to visualize high-dimensional spaces in 2D or 3D. Clustering reveals natural groupings in your data; use silhouette scores and the elbow method to find optimal cluster counts.

For production systems, track embedding quality metrics over time: drift in mean vectors, changes in cluster distributions, and outlier rates can all signal model degradation or data distribution shifts. Interactive visualizations with Plotly help debug retrieval failures by showing where queries land relative to document clusters.

The key insight is that embeddings aren't black boxes. They're geometric spaces with structure you can measure, visualize, and optimize, and understanding that structure helps you choose better embedding models, tune retrieval parameters, and diagnose why certain queries fail.
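
As a closing sketch, the drift tracking described above can start as simply as comparing snapshots of the same corpus embedded at different times. The snapshot files and the 0.05 threshold below are illustrative assumptions, not tuned values.

import numpy as np

def embedding_drift(reference: np.ndarray, current: np.ndarray) -> dict:
    """Compare two embedding snapshots; larger values indicate more drift."""
    ref_mean, cur_mean = reference.mean(axis=0), current.mean(axis=0)
    denom = np.linalg.norm(ref_mean) * np.linalg.norm(cur_mean)
    # Cosine distance between the snapshot mean vectors
    mean_shift = 1 - float(ref_mean @ cur_mean / denom) if denom > 0 else 1.0
    norm_delta = abs(
        float(np.linalg.norm(current, axis=1).mean())
        - float(np.linalg.norm(reference, axis=1).mean())
    )
    return {"mean_vector_shift": mean_shift, "norm_delta": norm_delta}

# Hypothetical snapshot files; alert when drift exceeds an illustrative threshold
drift = embedding_drift(np.load("march.npy"), np.load("april.npy"))
if drift["mean_vector_shift"] > 0.05:
    print("warning: embedding distribution may have shifted", drift)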
