
Embedding Space Analysis: Visualizing and Understanding Vector Representations

Introduction: Understanding embedding spaces is crucial for building effective semantic search, RAG systems, and recommendation engines. Embeddings map text, images, or other data into high-dimensional vector spaces where similar items cluster together. But how do you know if your embeddings are working well? How do you debug retrieval failures or understand why certain queries return unexpected results? Embedding space analysis provides the tools to answer these questions—dimensionality reduction for visualization, clustering to discover natural groupings, distance metrics to measure similarity, and quality metrics to evaluate embedding models. This guide covers practical techniques for analyzing, visualizing, and debugging embedding spaces, helping you build intuition about how your embeddings behave and identify issues before they impact production systems.


Embedding Space Fundamentals

from dataclasses import dataclass, field
from enum import Enum

import numpy as np

@dataclass
class Embedding:
    """A single embedding with metadata."""
    
    vector: np.ndarray
    text: str
    metadata: dict = field(default_factory=dict)
    
    @property
    def dimension(self) -> int:
        return len(self.vector)
    
    @property
    def norm(self) -> float:
        return np.linalg.norm(self.vector)
    
    def normalize(self) -> 'Embedding':
        """Return normalized embedding."""
        
        norm = self.norm
        if norm == 0:
            return self
        
        return Embedding(
            vector=self.vector / norm,
            text=self.text,
            metadata=self.metadata
        )

class DistanceMetric(Enum):
    """Distance metrics for embeddings."""
    
    COSINE = "cosine"
    EUCLIDEAN = "euclidean"
    DOT_PRODUCT = "dot_product"
    MANHATTAN = "manhattan"

class EmbeddingSpace:
    """Manage and analyze embedding space."""
    
    def __init__(self, dimension: int):
        self.dimension = dimension
        self.embeddings: list[Embedding] = []
    
    def add(self, embedding: Embedding):
        """Add embedding to space."""
        
        if embedding.dimension != self.dimension:
            raise ValueError(f"Expected dimension {self.dimension}, got {embedding.dimension}")
        
        self.embeddings.append(embedding)
    
    def add_batch(self, embeddings: list[Embedding]):
        """Add multiple embeddings."""
        
        for emb in embeddings:
            self.add(emb)
    
    def get_matrix(self) -> np.ndarray:
        """Get all embeddings as matrix."""
        
        return np.array([e.vector for e in self.embeddings])
    
    def distance(
        self,
        a: np.ndarray,
        b: np.ndarray,
        metric: DistanceMetric = DistanceMetric.COSINE
    ) -> float:
        """Calculate distance between embeddings."""
        
        if metric == DistanceMetric.COSINE:
            # Cosine distance = 1 - cosine similarity
            dot = np.dot(a, b)
            norm_a = np.linalg.norm(a)
            norm_b = np.linalg.norm(b)
            
            if norm_a == 0 or norm_b == 0:
                return 1.0
            
            return 1 - (dot / (norm_a * norm_b))
        
        elif metric == DistanceMetric.EUCLIDEAN:
            return np.linalg.norm(a - b)
        
        elif metric == DistanceMetric.DOT_PRODUCT:
            # Negate so that higher dot-product similarity means smaller distance
            return -np.dot(a, b)
        
        elif metric == DistanceMetric.MANHATTAN:
            return np.sum(np.abs(a - b))
        
        raise ValueError(f"Unknown metric: {metric}")
    
    def similarity(
        self,
        a: np.ndarray,
        b: np.ndarray,
        metric: DistanceMetric = DistanceMetric.COSINE
    ) -> float:
        """Calculate similarity between embeddings."""
        
        if metric == DistanceMetric.COSINE:
            dot = np.dot(a, b)
            norm_a = np.linalg.norm(a)
            norm_b = np.linalg.norm(b)
            
            if norm_a == 0 or norm_b == 0:
                return 0.0
            
            return dot / (norm_a * norm_b)
        
        elif metric == DistanceMetric.DOT_PRODUCT:
            return np.dot(a, b)
        
        # For distance metrics, convert to similarity
        dist = self.distance(a, b, metric)
        return 1 / (1 + dist)
    
    def nearest_neighbors(
        self,
        query: np.ndarray,
        k: int = 10,
        metric: DistanceMetric = DistanceMetric.COSINE
    ) -> list[tuple[Embedding, float]]:
        """Find k nearest neighbors."""
        
        distances = []
        
        for emb in self.embeddings:
            dist = self.distance(query, emb.vector, metric)
            distances.append((emb, dist))
        
        distances.sort(key=lambda x: x[1])
        return distances[:k]
    
    def pairwise_distances(
        self,
        metric: DistanceMetric = DistanceMetric.COSINE
    ) -> np.ndarray:
        """Calculate pairwise distance matrix."""
        
        n = len(self.embeddings)
        distances = np.zeros((n, n))
        
        for i in range(n):
            for j in range(i + 1, n):
                dist = self.distance(
                    self.embeddings[i].vector,
                    self.embeddings[j].vector,
                    metric
                )
                distances[i, j] = dist
                distances[j, i] = dist
        
        return distances

class EmbeddingStatistics:
    """Calculate statistics for embedding space."""
    
    def __init__(self, space: EmbeddingSpace):
        self.space = space
    
    def basic_stats(self) -> dict:
        """Get basic statistics."""
        
        matrix = self.space.get_matrix()
        
        return {
            "count": len(self.space.embeddings),
            "dimension": self.space.dimension,
            "mean_norm": np.mean([e.norm for e in self.space.embeddings]),
            "std_norm": np.std([e.norm for e in self.space.embeddings]),
            "min_norm": np.min([e.norm for e in self.space.embeddings]),
            "max_norm": np.max([e.norm for e in self.space.embeddings]),
            "mean_vector": np.mean(matrix, axis=0),
            "std_per_dim": np.std(matrix, axis=0)
        }
    
    def coverage_stats(self) -> dict:
        """Analyze space coverage."""
        
        matrix = self.space.get_matrix()
        
        # Calculate spread in each dimension
        dim_ranges = np.ptp(matrix, axis=0)  # Peak to peak
        
        # Calculate effective dimensionality using PCA
        from sklearn.decomposition import PCA
        
        pca = PCA()
        pca.fit(matrix)
        
        # Dimensions needed for 95% variance
        cumsum = np.cumsum(pca.explained_variance_ratio_)
        effective_dim = np.searchsorted(cumsum, 0.95) + 1
        
        return {
            "mean_dim_range": np.mean(dim_ranges),
            "min_dim_range": np.min(dim_ranges),
            "max_dim_range": np.max(dim_ranges),
            "effective_dimensionality": effective_dim,
            "variance_explained_by_10_dims": cumsum[9] if len(cumsum) > 9 else cumsum[-1],
            "variance_explained_by_50_dims": cumsum[49] if len(cumsum) > 49 else cumsum[-1]
        }
    
    def density_stats(
        self,
        metric: DistanceMetric = DistanceMetric.COSINE
    ) -> dict:
        """Analyze embedding density."""
        
        distances = self.space.pairwise_distances(metric)
        
        # Get upper triangle (excluding diagonal)
        upper = distances[np.triu_indices_from(distances, k=1)]
        
        return {
            "mean_distance": np.mean(upper),
            "std_distance": np.std(upper),
            "min_distance": np.min(upper),
            "max_distance": np.max(upper),
            "median_distance": np.median(upper),
            "p10_distance": np.percentile(upper, 10),
            "p90_distance": np.percentile(upper, 90)
        }
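
A minimal usage sketch for the classes above (the vectors and texts are made-up toy data, not output from a real embedding model):

import numpy as np

# Build a tiny space and query it by cosine distance.
space = EmbeddingSpace(dimension=4)
space.add_batch([
    Embedding(vector=np.array([1.0, 0.0, 0.0, 0.0]), text="doc a"),
    Embedding(vector=np.array([0.9, 0.1, 0.0, 0.0]), text="doc b"),
    Embedding(vector=np.array([0.0, 0.0, 1.0, 0.0]), text="doc c"),
])

query = np.array([1.0, 0.0, 0.0, 0.0])
for emb, dist in space.nearest_neighbors(query, k=2):
    print(f"{emb.text}: cosine distance = {dist:.3f}")
# "doc a" (distance 0.0) ranks above "doc b" (~0.006).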

Dimensionality Reduction

from dataclasses import dataclass, field

import numpy as np

class DimensionalityReducer:
    """Base class for dimensionality reduction."""
    
    def fit(self, embeddings: np.ndarray):
        raise NotImplementedError
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        raise NotImplementedError
    
    def fit_transform(self, embeddings: np.ndarray) -> np.ndarray:
        self.fit(embeddings)
        return self.transform(embeddings)

class PCAReducer(DimensionalityReducer):
    """PCA-based dimensionality reduction."""
    
    def __init__(self, n_components: int = 2):
        self.n_components = n_components
        self.pca = None
    
    def fit(self, embeddings: np.ndarray):
        from sklearn.decomposition import PCA
        
        self.pca = PCA(n_components=self.n_components)
        self.pca.fit(embeddings)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        return self.pca.transform(embeddings)
    
    def get_explained_variance(self) -> np.ndarray:
        """Get explained variance ratio."""
        
        return self.pca.explained_variance_ratio_
    
    def get_components(self) -> np.ndarray:
        """Get principal components."""
        
        return self.pca.components_

class TSNEReducer(DimensionalityReducer):
    """t-SNE dimensionality reduction."""
    
    def __init__(
        self,
        n_components: int = 2,
        perplexity: float = 30.0,
        learning_rate: float = 200.0,
        n_iter: int = 1000
    ):
        self.n_components = n_components
        self.perplexity = perplexity
        self.learning_rate = learning_rate
        self.n_iter = n_iter
        self.tsne = None
        self._embedding = None
    
    def fit(self, embeddings: np.ndarray):
        from sklearn.manifold import TSNE
        
        self.tsne = TSNE(
            n_components=self.n_components,
            perplexity=self.perplexity,
            learning_rate=self.learning_rate,
            n_iter=self.n_iter,  # note: renamed to max_iter in scikit-learn >= 1.5
            random_state=42
        )
        self._embedding = self.tsne.fit_transform(embeddings)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        # t-SNE has no out-of-sample transform; this returns the embedding
        # produced by fit() and ignores the argument.
        return self._embedding
    
    def fit_transform(self, embeddings: np.ndarray) -> np.ndarray:
        self.fit(embeddings)
        return self._embedding

class UMAPReducer(DimensionalityReducer):
    """UMAP dimensionality reduction."""
    
    def __init__(
        self,
        n_components: int = 2,
        n_neighbors: int = 15,
        min_dist: float = 0.1,
        metric: str = "cosine"
    ):
        self.n_components = n_components
        self.n_neighbors = n_neighbors
        self.min_dist = min_dist
        self.metric = metric
        self.umap = None
    
    def fit(self, embeddings: np.ndarray):
        import umap
        
        self.umap = umap.UMAP(
            n_components=self.n_components,
            n_neighbors=self.n_neighbors,
            min_dist=self.min_dist,
            metric=self.metric,
            random_state=42
        )
        self.umap.fit(embeddings)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        return self.umap.transform(embeddings)

class HybridReducer(DimensionalityReducer):
    """Two-stage reduction: PCA then t-SNE/UMAP."""
    
    def __init__(
        self,
        pca_components: int = 50,
        final_components: int = 2,
        method: str = "umap"
    ):
        self.pca_components = pca_components
        self.final_components = final_components
        self.method = method
        self.pca = PCAReducer(pca_components)
        
        if method == "umap":
            self.final = UMAPReducer(final_components)
        else:
            self.final = TSNEReducer(final_components)
    
    def fit(self, embeddings: np.ndarray):
        # First reduce with PCA
        pca_result = self.pca.fit_transform(embeddings)
        
        # Then apply final reduction
        self.final.fit(pca_result)
    
    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        pca_result = self.pca.transform(embeddings)
        return self.final.transform(pca_result)
    
    def fit_transform(self, embeddings: np.ndarray) -> np.ndarray:
        pca_result = self.pca.fit_transform(embeddings)
        return self.final.fit_transform(pca_result)

@dataclass
class ReductionResult:
    """Result of dimensionality reduction."""
    
    reduced: np.ndarray
    original_dim: int
    reduced_dim: int
    method: str
    metadata: dict = field(default_factory=dict)

class ReductionPipeline:
    """Pipeline for dimensionality reduction."""
    
    def __init__(self):
        self.reducers: dict[str, DimensionalityReducer] = {}
    
    def add_reducer(self, name: str, reducer: DimensionalityReducer):
        """Add a reducer."""
        
        self.reducers[name] = reducer
    
    def reduce(
        self,
        embeddings: np.ndarray,
        method: str = "umap"
    ) -> ReductionResult:
        """Reduce embeddings."""
        
        if method not in self.reducers:
            raise ValueError(f"Unknown method: {method}")
        
        reducer = self.reducers[method]
        reduced = reducer.fit_transform(embeddings)
        
        return ReductionResult(
            reduced=reduced,
            original_dim=embeddings.shape[1],
            reduced_dim=reduced.shape[1],
            method=method
        )
    
    def compare_methods(
        self,
        embeddings: np.ndarray
    ) -> dict[str, ReductionResult]:
        """Compare all reduction methods."""
        
        results = {}
        
        for name, reducer in self.reducers.items():
            results[name] = self.reduce(embeddings, name)
        
        return results
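
A short usage sketch (random vectors stand in for real embeddings here; the UMAP and hybrid reducers require the umap-learn package):

import numpy as np

rng = np.random.default_rng(0)
vectors = rng.normal(size=(200, 128))

pipeline = ReductionPipeline()
pipeline.add_reducer("pca", PCAReducer(n_components=2))
pipeline.add_reducer("umap", UMAPReducer(n_components=2))
pipeline.add_reducer("hybrid", HybridReducer(pca_components=50, final_components=2))

for name, result in pipeline.compare_methods(vectors).items():
    print(f"{name}: {result.original_dim} -> {result.reduced_dim}")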

Clustering Analysis

from dataclasses import dataclass
from typing import Optional

import numpy as np

@dataclass
class ClusterResult:
    """Result of clustering."""
    
    labels: np.ndarray
    n_clusters: int
    centroids: Optional[np.ndarray] = None
    silhouette_score: float = 0.0
    inertia: float = 0.0

class EmbeddingClusterer:
    """Cluster embeddings."""
    
    def __init__(self, method: str = "kmeans"):
        self.method = method
        self.model = None
    
    def cluster(
        self,
        embeddings: np.ndarray,
        n_clusters: int = None
    ) -> ClusterResult:
        """Cluster embeddings."""
        
        if self.method == "kmeans":
            return self._kmeans(embeddings, n_clusters or 10)
        elif self.method == "hdbscan":
            return self._hdbscan(embeddings)
        elif self.method == "agglomerative":
            return self._agglomerative(embeddings, n_clusters or 10)
        elif self.method == "dbscan":
            return self._dbscan(embeddings)
        
        raise ValueError(f"Unknown method: {self.method}")
    
    def _kmeans(
        self,
        embeddings: np.ndarray,
        n_clusters: int
    ) -> ClusterResult:
        """K-means clustering."""
        
        from sklearn.cluster import KMeans
        from sklearn.metrics import silhouette_score
        
        model = KMeans(n_clusters=n_clusters, random_state=42)
        labels = model.fit_predict(embeddings)
        
        sil_score = silhouette_score(embeddings, labels) if n_clusters > 1 else 0
        
        return ClusterResult(
            labels=labels,
            n_clusters=n_clusters,
            centroids=model.cluster_centers_,
            silhouette_score=sil_score,
            inertia=model.inertia_
        )
    
    def _hdbscan(self, embeddings: np.ndarray) -> ClusterResult:
        """HDBSCAN clustering."""
        
        import hdbscan
        from sklearn.metrics import silhouette_score
        
        model = hdbscan.HDBSCAN(min_cluster_size=5, metric='euclidean')
        labels = model.fit_predict(embeddings)
        
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        
        # Calculate silhouette only for non-noise points
        mask = labels != -1
        if mask.sum() > 1 and n_clusters > 1:
            sil_score = silhouette_score(embeddings[mask], labels[mask])
        else:
            sil_score = 0
        
        return ClusterResult(
            labels=labels,
            n_clusters=n_clusters,
            silhouette_score=sil_score
        )
    
    def _agglomerative(
        self,
        embeddings: np.ndarray,
        n_clusters: int
    ) -> ClusterResult:
        """Agglomerative clustering."""
        
        from sklearn.cluster import AgglomerativeClustering
        from sklearn.metrics import silhouette_score
        
        model = AgglomerativeClustering(n_clusters=n_clusters)
        labels = model.fit_predict(embeddings)
        
        sil_score = silhouette_score(embeddings, labels) if n_clusters > 1 else 0
        
        return ClusterResult(
            labels=labels,
            n_clusters=n_clusters,
            silhouette_score=sil_score
        )
    
    def _dbscan(self, embeddings: np.ndarray) -> ClusterResult:
        """DBSCAN clustering."""
        
        from sklearn.cluster import DBSCAN
        from sklearn.metrics import silhouette_score
        
        model = DBSCAN(eps=0.5, min_samples=5)
        labels = model.fit_predict(embeddings)
        
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        
        mask = labels != -1
        if mask.sum() > 1 and n_clusters > 1:
            sil_score = silhouette_score(embeddings[mask], labels[mask])
        else:
            sil_score = 0
        
        return ClusterResult(
            labels=labels,
            n_clusters=n_clusters,
            silhouette_score=sil_score
        )

class OptimalClusterFinder:
    """Find optimal number of clusters."""
    
    def __init__(self, max_clusters: int = 20):
        self.max_clusters = max_clusters
    
    def find_optimal(
        self,
        embeddings: np.ndarray,
        method: str = "silhouette"
    ) -> int:
        """Find optimal cluster count."""
        
        if method == "silhouette":
            return self._silhouette_method(embeddings)
        elif method == "elbow":
            return self._elbow_method(embeddings)
        elif method == "gap":
            return self._gap_statistic(embeddings)
        
        raise ValueError(f"Unknown method: {method}")
    
    def _silhouette_method(self, embeddings: np.ndarray) -> int:
        """Find optimal k using silhouette score."""
        
        from sklearn.cluster import KMeans
        from sklearn.metrics import silhouette_score
        
        scores = []
        
        for k in range(2, min(self.max_clusters + 1, len(embeddings))):
            kmeans = KMeans(n_clusters=k, random_state=42)
            labels = kmeans.fit_predict(embeddings)
            score = silhouette_score(embeddings, labels)
            scores.append((k, score))
        
        # Return k with highest silhouette score
        return max(scores, key=lambda x: x[1])[0]
    
    def _elbow_method(self, embeddings: np.ndarray) -> int:
        """Find optimal k using elbow method."""
        
        from sklearn.cluster import KMeans
        
        inertias = []
        
        for k in range(1, min(self.max_clusters + 1, len(embeddings))):
            kmeans = KMeans(n_clusters=k, random_state=42)
            kmeans.fit(embeddings)
            inertias.append(kmeans.inertia_)
        
        # Find elbow using second derivative
        diffs = np.diff(inertias)
        diffs2 = np.diff(diffs)
        
        # Elbow is where second derivative is maximum
        elbow = np.argmax(diffs2) + 2
        
        return elbow
    
    def _gap_statistic(self, embeddings: np.ndarray) -> int:
        """Find optimal k using gap statistic."""
        
        from sklearn.cluster import KMeans
        
        gaps = []
        n_refs = 10
        
        for k in range(1, min(self.max_clusters + 1, len(embeddings))):
            # Cluster actual data
            kmeans = KMeans(n_clusters=k, random_state=42)
            kmeans.fit(embeddings)
            actual_inertia = kmeans.inertia_
            
            # Cluster random reference data
            ref_inertias = []
            for _ in range(n_refs):
                random_data = np.random.uniform(
                    embeddings.min(axis=0),
                    embeddings.max(axis=0),
                    size=embeddings.shape
                )
                kmeans_ref = KMeans(n_clusters=k, random_state=42)
                kmeans_ref.fit(random_data)
                ref_inertias.append(kmeans_ref.inertia_)
            
            # Gap compares mean log reference inertia to actual (Tibshirani et al.)
            ref_logs = np.log(ref_inertias)
            gap = np.mean(ref_logs) - np.log(actual_inertia)
            sk = np.std(ref_logs) * np.sqrt(1 + 1 / n_refs)
            gaps.append((gap, sk))
        
        # Pick the smallest k with gap[k] >= gap[k+1] - s[k+1]
        for i in range(len(gaps) - 1):
            if gaps[i][0] >= gaps[i + 1][0] - gaps[i + 1][1]:
                return i + 1
        
        return len(gaps)

class ClusterAnalyzer:
    """Analyze cluster quality and characteristics."""
    
    def __init__(self, embeddings: np.ndarray, labels: np.ndarray):
        self.embeddings = embeddings
        self.labels = labels
        self.n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    
    def get_cluster_stats(self) -> dict:
        """Get statistics for each cluster."""
        
        stats = {}
        
        for label in set(self.labels):
            if label == -1:
                continue
            
            mask = self.labels == label
            cluster_embeddings = self.embeddings[mask]
            
            centroid = np.mean(cluster_embeddings, axis=0)
            
            # Calculate distances to centroid
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            
            stats[label] = {
                "size": mask.sum(),
                "centroid": centroid,
                "mean_distance_to_centroid": np.mean(distances),
                "max_distance_to_centroid": np.max(distances),
                "std_distance": np.std(distances)
            }
        
        return stats
    
    def get_cluster_separation(self) -> np.ndarray:
        """Calculate separation between clusters."""
        
        centroids = []
        
        for label in range(self.n_clusters):
            mask = self.labels == label
            centroid = np.mean(self.embeddings[mask], axis=0)
            centroids.append(centroid)
        
        centroids = np.array(centroids)
        
        # Pairwise distances between centroids
        separation = np.zeros((self.n_clusters, self.n_clusters))
        
        for i in range(self.n_clusters):
            for j in range(i + 1, self.n_clusters):
                dist = np.linalg.norm(centroids[i] - centroids[j])
                separation[i, j] = dist
                separation[j, i] = dist
        
        return separation
    
    def get_outliers(self, threshold: float = 2.0) -> list[int]:
        """Find outlier embeddings."""
        
        outliers = []
        
        for label in set(self.labels):
            if label == -1:
                continue
            
            mask = self.labels == label
            indices = np.where(mask)[0]
            cluster_embeddings = self.embeddings[mask]
            
            centroid = np.mean(cluster_embeddings, axis=0)
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            
            mean_dist = np.mean(distances)
            std_dist = np.std(distances)
            
            for i, dist in enumerate(distances):
                if dist > mean_dist + threshold * std_dist:
                    outliers.append(indices[i])
        
        return outliers
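
A usage sketch tying the pieces together (two synthetic blobs stand in for real embeddings):

import numpy as np

rng = np.random.default_rng(0)
vectors = np.vstack([
    rng.normal(0.0, 0.1, size=(50, 32)),   # blob 1
    rng.normal(1.0, 0.1, size=(50, 32)),   # blob 2
])

k = OptimalClusterFinder(max_clusters=8).find_optimal(vectors, method="silhouette")
result = EmbeddingClusterer(method="kmeans").cluster(vectors, n_clusters=k)
print(f"k={result.n_clusters}, silhouette={result.silhouette_score:.3f}")

analyzer = ClusterAnalyzer(vectors, result.labels)
print("outlier indices:", analyzer.get_outliers(threshold=2.0))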

Visualization Tools

import numpy as np

class EmbeddingVisualizer:
    """Visualize embedding spaces."""
    
    def __init__(self):
        self.reducer = None
    
    def plot_2d(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray = None,
        texts: list[str] = None,
        title: str = "Embedding Space",
        method: str = "umap"
    ):
        """Create 2D scatter plot."""
        
        import matplotlib.pyplot as plt
        
        # Reduce to 2D
        if embeddings.shape[1] > 2:
            if method == "umap":
                reducer = UMAPReducer(n_components=2)
            elif method == "tsne":
                reducer = TSNEReducer(n_components=2)
            else:
                reducer = PCAReducer(n_components=2)
            
            reduced = reducer.fit_transform(embeddings)
        else:
            reduced = embeddings
        
        # Create plot
        fig, ax = plt.subplots(figsize=(12, 8))
        
        if labels is not None:
            scatter = ax.scatter(
                reduced[:, 0],
                reduced[:, 1],
                c=labels,
                cmap='tab10',
                alpha=0.7
            )
            plt.colorbar(scatter, label='Cluster')
        else:
            ax.scatter(reduced[:, 0], reduced[:, 1], alpha=0.7)
        
        # Add text annotations for first few points
        if texts is not None:
            for i, txt in enumerate(texts[:20]):
                ax.annotate(
                    txt[:30],
                    (reduced[i, 0], reduced[i, 1]),
                    fontsize=8,
                    alpha=0.7
                )
        
        ax.set_title(title)
        ax.set_xlabel('Dimension 1')
        ax.set_ylabel('Dimension 2')
        
        return fig
    
    def plot_3d(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray = None,
        title: str = "Embedding Space 3D"
    ):
        """Create 3D scatter plot."""
        
        import matplotlib.pyplot as plt
        from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 (registers the 3d projection on older matplotlib)
        
        # Reduce to 3D
        if embeddings.shape[1] > 3:
            reducer = UMAPReducer(n_components=3)
            reduced = reducer.fit_transform(embeddings)
        else:
            reduced = embeddings
        
        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(111, projection='3d')
        
        if labels is not None:
            scatter = ax.scatter(
                reduced[:, 0],
                reduced[:, 1],
                reduced[:, 2],
                c=labels,
                cmap='tab10',
                alpha=0.7
            )
            plt.colorbar(scatter, label='Cluster')
        else:
            ax.scatter(
                reduced[:, 0],
                reduced[:, 1],
                reduced[:, 2],
                alpha=0.7
            )
        
        ax.set_title(title)
        ax.set_xlabel('Dimension 1')
        ax.set_ylabel('Dimension 2')
        ax.set_zlabel('Dimension 3')
        
        return fig
    
    def plot_similarity_heatmap(
        self,
        embeddings: np.ndarray,
        texts: list[str] = None,
        title: str = "Similarity Matrix"
    ):
        """Plot similarity heatmap."""
        
        import matplotlib.pyplot as plt
        import seaborn as sns
        
        # Calculate cosine similarity (guarding against zero-norm rows)
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        normalized = embeddings / norms
        similarity = np.dot(normalized, normalized.T)
        
        fig, ax = plt.subplots(figsize=(12, 10))
        
        sns.heatmap(
            similarity,
            ax=ax,
            cmap='RdYlBu_r',
            vmin=-1,
            vmax=1,
            xticklabels=texts if texts else False,
            yticklabels=texts if texts else False
        )
        
        ax.set_title(title)
        plt.xticks(rotation=45, ha='right')
        plt.yticks(rotation=0)
        
        return fig
    
    def plot_dimension_distribution(
        self,
        embeddings: np.ndarray,
        dims: list[int] = None,
        title: str = "Dimension Distributions"
    ):
        """Plot distribution of values in each dimension."""
        
        import matplotlib.pyplot as plt
        
        dims = dims or list(range(min(10, embeddings.shape[1])))
        
        fig, axes = plt.subplots(2, 5, figsize=(15, 6))
        axes = axes.flatten()
        
        for i, dim in enumerate(dims[:10]):
            axes[i].hist(embeddings[:, dim], bins=50, alpha=0.7)
            axes[i].set_title(f'Dimension {dim}')
            axes[i].set_xlabel('Value')
            axes[i].set_ylabel('Count')
        
        # Hide any unused subplots when fewer than 10 dimensions are shown
        for j in range(len(dims[:10]), len(axes)):
            axes[j].set_visible(False)
        
        plt.suptitle(title)
        plt.tight_layout()
        
        return fig
    
    def plot_cluster_sizes(
        self,
        labels: np.ndarray,
        title: str = "Cluster Sizes"
    ):
        """Plot cluster size distribution."""
        
        import matplotlib.pyplot as plt
        
        unique, counts = np.unique(labels, return_counts=True)
        
        fig, ax = plt.subplots(figsize=(10, 6))
        
        ax.bar(unique, counts, alpha=0.7)
        ax.set_xlabel('Cluster')
        ax.set_ylabel('Size')
        ax.set_title(title)
        
        return fig

class InteractiveVisualizer:
    """Interactive embedding visualization."""
    
    def __init__(self):
        self.data = None
    
    def create_plotly_scatter(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray = None,
        texts: list[str] = None,
        title: str = "Embedding Space"
    ):
        """Create interactive Plotly scatter plot."""
        
        import plotly.express as px
        import pandas as pd
        
        # Reduce to 2D
        if embeddings.shape[1] > 2:
            reducer = UMAPReducer(n_components=2)
            reduced = reducer.fit_transform(embeddings)
        else:
            reduced = embeddings
        
        df = pd.DataFrame({
            'x': reduced[:, 0],
            'y': reduced[:, 1],
            'label': labels if labels is not None else 0,
            'text': texts if texts else [f"Point {i}" for i in range(len(reduced))]
        })
        
        fig = px.scatter(
            df,
            x='x',
            y='y',
            color='label',
            hover_data=['text'],
            title=title
        )
        
        return fig
    
    def create_plotly_3d(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray = None,
        texts: list[str] = None,
        title: str = "Embedding Space 3D"
    ):
        """Create interactive 3D Plotly scatter plot."""
        
        import plotly.express as px
        import pandas as pd
        
        # Reduce to 3D
        if embeddings.shape[1] > 3:
            reducer = UMAPReducer(n_components=3)
            reduced = reducer.fit_transform(embeddings)
        else:
            reduced = embeddings
        
        df = pd.DataFrame({
            'x': reduced[:, 0],
            'y': reduced[:, 1],
            'z': reduced[:, 2],
            'label': labels if labels is not None else 0,
            'text': texts if texts else [f"Point {i}" for i in range(len(reduced))]
        })
        
        fig = px.scatter_3d(
            df,
            x='x',
            y='y',
            z='z',
            color='label',
            hover_data=['text'],
            title=title
        )
        
        return fig
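
A usage sketch for the static plots (PCA is chosen here so the example runs without umap-learn installed; swap in method="umap" if it is available):

import numpy as np

rng = np.random.default_rng(0)
vectors = rng.normal(size=(100, 64))
labels = rng.integers(0, 3, size=100)

viz = EmbeddingVisualizer()
fig = viz.plot_2d(vectors, labels=labels, method="pca", title="Demo space")
fig.savefig("embedding_space.png")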

Production Analysis Service

from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional, List
import numpy as np

app = FastAPI()

class AnalyzeRequest(BaseModel):
    embeddings: List[List[float]]
    texts: Optional[List[str]] = None
    method: str = "umap"
    n_clusters: Optional[int] = None

class ClusterRequest(BaseModel):
    embeddings: List[List[float]]
    method: str = "kmeans"
    n_clusters: Optional[int] = None

class SimilarityRequest(BaseModel):
    query: List[float]
    embeddings: List[List[float]]
    k: int = 10

# Initialize components
space = EmbeddingSpace(dimension=1536)  # OpenAI ada-002 / text-embedding-3-small dimension
visualizer = EmbeddingVisualizer()

@app.post("/v1/analyze")
async def analyze_embeddings(request: AnalyzeRequest) -> dict:
    """Analyze embedding space."""
    
    embeddings = np.array(request.embeddings)
    
    # Create temporary space
    temp_space = EmbeddingSpace(dimension=embeddings.shape[1])
    for i, vec in enumerate(embeddings):
        text = request.texts[i] if request.texts else f"embedding_{i}"
        temp_space.add(Embedding(vector=np.array(vec), text=text))
    
    # Get statistics
    stats = EmbeddingStatistics(temp_space)
    basic = stats.basic_stats()
    coverage = stats.coverage_stats()
    density = stats.density_stats()
    
    return {
        "count": basic["count"],
        "dimension": basic["dimension"],
        "mean_norm": float(basic["mean_norm"]),
        "effective_dimensionality": coverage["effective_dimensionality"],
        "mean_distance": float(density["mean_distance"]),
        "median_distance": float(density["median_distance"])
    }

@app.post("/v1/reduce")
async def reduce_dimensions(request: AnalyzeRequest) -> dict:
    """Reduce embedding dimensions."""
    
    embeddings = np.array(request.embeddings)
    
    if request.method == "pca":
        reducer = PCAReducer(n_components=2)
    elif request.method == "tsne":
        reducer = TSNEReducer(n_components=2)
    else:
        reducer = UMAPReducer(n_components=2)
    
    reduced = reducer.fit_transform(embeddings)
    
    return {
        "reduced": reduced.tolist(),
        "method": request.method,
        "original_dim": embeddings.shape[1],
        "reduced_dim": 2
    }

@app.post("/v1/cluster")
async def cluster_embeddings(request: ClusterRequest) -> dict:
    """Cluster embeddings."""
    
    embeddings = np.array(request.embeddings)
    
    clusterer = EmbeddingClusterer(method=request.method)
    
    if request.n_clusters:
        result = clusterer.cluster(embeddings, request.n_clusters)
    else:
        # Find optimal
        finder = OptimalClusterFinder()
        optimal_k = finder.find_optimal(embeddings)
        result = clusterer.cluster(embeddings, optimal_k)
    
    return {
        "labels": result.labels.tolist(),
        "n_clusters": result.n_clusters,
        "silhouette_score": float(result.silhouette_score)
    }

@app.post("/v1/similarity")
async def find_similar(request: SimilarityRequest) -> dict:
    """Find similar embeddings."""
    
    query = np.array(request.query)
    embeddings = np.array(request.embeddings)
    
    # Calculate cosine similarities (zero-norm vectors get similarity 0)
    similarities = []
    for i, emb in enumerate(embeddings):
        denom = np.linalg.norm(query) * np.linalg.norm(emb)
        sim = float(np.dot(query, emb) / denom) if denom else 0.0
        similarities.append((i, sim))
    
    # Sort by similarity
    similarities.sort(key=lambda x: x[1], reverse=True)
    
    return {
        "results": [
            {"index": idx, "similarity": sim}
            for idx, sim in similarities[:request.k]
        ]
    }

@app.post("/v1/outliers")
async def find_outliers(request: ClusterRequest) -> dict:
    """Find outlier embeddings."""
    
    embeddings = np.array(request.embeddings)
    
    # Cluster first
    clusterer = EmbeddingClusterer(method="kmeans")
    result = clusterer.cluster(embeddings, request.n_clusters or 5)
    
    # Find outliers
    analyzer = ClusterAnalyzer(embeddings, result.labels)
    outliers = analyzer.get_outliers(threshold=2.0)
    
    return {
        "outlier_indices": [int(i) for i in outliers],  # cast numpy ints for JSON
        "outlier_count": len(outliers),
        "total_count": len(embeddings)
    }

@app.get("/v1/stats")
async def get_space_stats() -> dict:
    """Get current space statistics."""
    
    if not space.embeddings:
        return {"error": "No embeddings in space"}
    
    stats = EmbeddingStatistics(space)
    basic = stats.basic_stats()
    
    return {
        "count": basic["count"],
        "dimension": basic["dimension"],
        "mean_norm": float(basic["mean_norm"])
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Embedding space analysis transforms opaque vector representations into actionable insights. Start with basic statistics—norm distributions, effective dimensionality, and pairwise distances—to understand your embedding space’s structure. Use dimensionality reduction (UMAP for preserving global structure, t-SNE for local clusters, PCA for speed) to visualize high-dimensional spaces in 2D or 3D. Clustering reveals natural groupings in your data; use silhouette scores and the elbow method to find optimal cluster counts. For production systems, track embedding quality metrics over time—drift in mean vectors, changes in cluster distributions, and outlier rates can signal model degradation or data distribution shifts. Interactive visualizations with Plotly help debug retrieval failures by showing where queries land relative to document clusters. The key insight is that embeddings aren’t black boxes—they’re geometric spaces with structure you can measure, visualize, and optimize. Understanding this structure helps you choose better embedding models, tune retrieval parameters, and diagnose why certain queries fail.
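
As a concrete example of the drift tracking mentioned above, here is a minimal sketch; the alert threshold is an illustrative value you would tune against your own baseline snapshots:

import numpy as np

def mean_vector_drift(baseline: np.ndarray, current: np.ndarray) -> float:
    """Cosine distance between the mean vectors of two embedding batches."""
    a, b = baseline.mean(axis=0), current.mean(axis=0)
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return 1.0 - float(np.dot(a, b) / denom) if denom else 1.0

# Alert when today's batch drifts from the baseline snapshot, e.g.:
# if mean_vector_drift(baseline_batch, todays_batch) > 0.05: page someone.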