Embedding Model Selection: Choosing the Right Model for Your AI Application

Introduction

Choosing the right embedding model determines the quality of your semantic search, RAG system, or clustering application. Different models excel at different tasks: some optimize for retrieval accuracy, others for speed, and others for specific domains. The wrong choice means poor results regardless of how well you build everything else. This guide covers practical embedding model selection: understanding model characteristics, benchmarking on your specific data, optimizing for latency and cost, and building systems that can switch models as better options emerge.


Model Characteristics

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class EmbeddingTask(Enum):
    """Types of embedding tasks."""
    
    RETRIEVAL = "retrieval"
    CLUSTERING = "clustering"
    CLASSIFICATION = "classification"
    SEMANTIC_SIMILARITY = "semantic_similarity"
    RERANKING = "reranking"

@dataclass
class ModelSpec:
    """Embedding model specification."""
    
    name: str
    provider: str
    dimensions: int
    max_tokens: int
    
    # Performance characteristics
    latency_ms: Optional[float] = None  # Average latency per request
    throughput: Optional[float] = None  # Embeddings per second
    
    # Quality metrics (from MTEB or similar)
    retrieval_score: Optional[float] = None
    clustering_score: Optional[float] = None
    classification_score: Optional[float] = None
    
    # Cost
    cost_per_1k_tokens: Optional[float] = None
    
    # Features
    supports_batching: bool = True
    supports_truncation: bool = True
    multilingual: bool = False

# Popular embedding models
MODELS = {
    "text-embedding-3-small": ModelSpec(
        name="text-embedding-3-small",
        provider="openai",
        dimensions=1536,
        max_tokens=8191,
        retrieval_score=0.62,
        cost_per_1k_tokens=0.00002,
        multilingual=True
    ),
    "text-embedding-3-large": ModelSpec(
        name="text-embedding-3-large",
        provider="openai",
        dimensions=3072,
        max_tokens=8191,
        retrieval_score=0.64,
        cost_per_1k_tokens=0.00013,
        multilingual=True
    ),
    "text-embedding-ada-002": ModelSpec(
        name="text-embedding-ada-002",
        provider="openai",
        dimensions=1536,
        max_tokens=8191,
        retrieval_score=0.58,
        cost_per_1k_tokens=0.0001
    ),
    "voyage-large-2": ModelSpec(
        name="voyage-large-2",
        provider="voyage",
        dimensions=1536,
        max_tokens=16000,
        retrieval_score=0.68,
        cost_per_1k_tokens=0.00012
    ),
    "voyage-code-2": ModelSpec(
        name="voyage-code-2",
        provider="voyage",
        dimensions=1536,
        max_tokens=16000,
        retrieval_score=0.70,  # For code
        cost_per_1k_tokens=0.00012
    ),
    "cohere-embed-english-v3": ModelSpec(
        name="embed-english-v3.0",
        provider="cohere",
        dimensions=1024,
        max_tokens=512,
        retrieval_score=0.64,
        cost_per_1k_tokens=0.0001
    ),
    "bge-large-en-v1.5": ModelSpec(
        name="BAAI/bge-large-en-v1.5",
        provider="huggingface",
        dimensions=1024,
        max_tokens=512,
        retrieval_score=0.63,
        cost_per_1k_tokens=0.0  # Self-hosted
    ),
    "e5-large-v2": ModelSpec(
        name="intfloat/e5-large-v2",
        provider="huggingface",
        dimensions=1024,
        max_tokens=512,
        retrieval_score=0.62,
        cost_per_1k_tokens=0.0
    ),
    "gte-large": ModelSpec(
        name="thenlper/gte-large",
        provider="huggingface",
        dimensions=1024,
        max_tokens=512,
        retrieval_score=0.63,
        cost_per_1k_tokens=0.0
    )
}

class ModelSelector:
    """Select embedding model based on requirements."""
    
    def __init__(self, models: Optional[dict[str, ModelSpec]] = None):
        self.models = models or MODELS
    
    def select(
        self,
        task: EmbeddingTask,
        max_cost_per_1k: Optional[float] = None,
        min_dimensions: Optional[int] = None,
        max_latency_ms: Optional[float] = None,
        require_multilingual: bool = False
    ) -> list[ModelSpec]:
        """Select models matching requirements."""
        
        candidates = []
        
        for name, spec in self.models.items():
            # Filter by cost
            if max_cost_per_1k and spec.cost_per_1k_tokens:
                if spec.cost_per_1k_tokens > max_cost_per_1k:
                    continue
            
            # Filter by dimensions
            if min_dimensions and spec.dimensions < min_dimensions:
                continue
            
            # Filter by latency
            if max_latency_ms and spec.latency_ms:
                if spec.latency_ms > max_latency_ms:
                    continue
            
            # Filter by multilingual
            if require_multilingual and not spec.multilingual:
                continue
            
            candidates.append(spec)
        
        # Sort by task-specific score; tasks without a matching *_score
        # field on ModelSpec (e.g. reranking) simply fall back to 0
        score_attr = f"{task.value}_score"
        
        candidates.sort(
            key=lambda m: getattr(m, score_attr, 0) or 0,
            reverse=True
        )
        
        return candidates
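
A quick usage sketch (the thresholds below are illustrative, not recommendations): filter the catalog for inexpensive multilingual retrieval models and rank them by retrieval score.

selector = ModelSelector()

candidates = selector.select(
    task=EmbeddingTask.RETRIEVAL,
    max_cost_per_1k=0.0001,
    require_multilingual=True
)

for spec in candidates:
    print(f"{spec.name}: retrieval={spec.retrieval_score}, "
          f"${spec.cost_per_1k_tokens}/1k tokens, {spec.dimensions}d")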

Unified Embedding Interface

from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import numpy as np

@dataclass
class EmbeddingResult:
    """Result of embedding operation."""
    
    embeddings: list[list[float]]
    model: str
    dimensions: int
    tokens_used: int = 0
    latency_ms: float = 0

class EmbeddingProvider(ABC):
    """Base class for embedding providers."""
    
    @abstractmethod
    async def embed(
        self,
        texts: list[str],
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings for texts."""
        pass
    
    @abstractmethod
    def get_spec(self) -> ModelSpec:
        """Get model specification."""
        pass

class OpenAIEmbedding(EmbeddingProvider):
    """OpenAI embedding provider."""
    
    def __init__(self, client: Any, model: str = "text-embedding-3-small"):
        self.client = client
        self.model = model
    
    async def embed(
        self,
        texts: list[str],
        dimensions: int = None,
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings using OpenAI."""
        
        import time
        start = time.time()
        
        params = {"model": self.model, "input": texts}
        
        # Only the text-embedding-3 models accept a dimensions parameter
        if dimensions and "3-" in self.model:
            params["dimensions"] = dimensions
        
        response = await self.client.embeddings.create(**params)
        
        latency = (time.time() - start) * 1000
        
        embeddings = [item.embedding for item in response.data]
        
        return EmbeddingResult(
            embeddings=embeddings,
            model=self.model,
            dimensions=len(embeddings[0]),
            tokens_used=response.usage.total_tokens,
            latency_ms=latency
        )
    
    def get_spec(self) -> ModelSpec:
        return MODELS.get(self.model)

class VoyageEmbedding(EmbeddingProvider):
    """Voyage AI embedding provider."""
    
    def __init__(self, client: Any, model: str = "voyage-large-2"):
        self.client = client
        self.model = model
    
    async def embed(
        self,
        texts: list[str],
        input_type: str = None,
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings using Voyage."""
        
        import time
        start = time.time()
        
        params = {"model": self.model, "input": texts}
        
        if input_type:
            params["input_type"] = input_type
        
        response = await self.client.embed(**params)
        
        latency = (time.time() - start) * 1000
        
        return EmbeddingResult(
            embeddings=response.embeddings,
            model=self.model,
            dimensions=len(response.embeddings[0]),
            tokens_used=response.total_tokens,
            latency_ms=latency
        )
    
    def get_spec(self) -> ModelSpec:
        return MODELS.get(self.model)

class HuggingFaceEmbedding(EmbeddingProvider):
    """Local HuggingFace embedding provider."""
    
    def __init__(self, model_name: str = "BAAI/bge-large-en-v1.5"):
        self.model_name = model_name
        self._model = None
        self._tokenizer = None
    
    def _load_model(self):
        """Lazy load model."""
        
        if self._model is None:
            from transformers import AutoModel, AutoTokenizer
            import torch
            
            self._tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self._model = AutoModel.from_pretrained(self.model_name)
            
            if torch.cuda.is_available():
                self._model = self._model.cuda()
    
    async def embed(
        self,
        texts: list[str],
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings locally."""
        
        import time
        import torch
        
        self._load_model()
        
        start = time.time()
        
        # Tokenize
        encoded = self._tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        
        if torch.cuda.is_available():
            encoded = {k: v.cuda() for k, v in encoded.items()}
        
        # Generate embeddings (CLS-token pooling; BGE-style models expect
        # L2-normalized vectors for cosine/dot-product similarity)
        with torch.no_grad():
            outputs = self._model(**encoded)
            embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
            embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
        
        latency = (time.time() - start) * 1000
        
        return EmbeddingResult(
            embeddings=embeddings.tolist(),
            model=self.model_name,
            dimensions=embeddings.shape[1],
            tokens_used=sum(len(self._tokenizer.encode(t)) for t in texts),
            latency_ms=latency
        )
    
    def get_spec(self) -> Optional[ModelSpec]:
        # MODELS is keyed by short names, so match on the full spec name instead
        return next(
            (spec for spec in MODELS.values() if spec.name == self.model_name),
            None
        )

class UnifiedEmbedder:
    """Unified interface for multiple embedding providers."""
    
    def __init__(self):
        self._providers: dict[str, EmbeddingProvider] = {}
        self._default_provider: str = None
    
    def register(self, name: str, provider: EmbeddingProvider, default: bool = False):
        """Register an embedding provider."""
        
        self._providers[name] = provider
        
        if default or self._default_provider is None:
            self._default_provider = name
    
    async def embed(
        self,
        texts: list[str],
        provider: str = None,
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings using specified or default provider."""
        
        provider_name = provider or self._default_provider
        
        if provider_name not in self._providers:
            raise ValueError(f"Unknown provider: {provider_name}")
        
        return await self._providers[provider_name].embed(texts, **kwargs)
    
    def get_providers(self) -> list[str]:
        """List registered providers."""
        return list(self._providers.keys())
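
A wiring sketch, assuming the openai Python SDK's AsyncOpenAI client (with OPENAI_API_KEY set) and the local BGE model above; the provider labels are arbitrary.

import asyncio
from openai import AsyncOpenAI

async def main():
    embedder = UnifiedEmbedder()
    embedder.register(
        "openai-small",
        OpenAIEmbedding(AsyncOpenAI(), model="text-embedding-3-small"),
        default=True
    )
    embedder.register("bge-local", HuggingFaceEmbedding("BAAI/bge-large-en-v1.5"))

    result = await embedder.embed(["How do I rotate an API key?"])
    print(result.model, result.dimensions, f"{result.latency_ms:.1f}ms")

asyncio.run(main())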

Benchmarking

from dataclasses import dataclass, field
from typing import Any, Optional
import numpy as np
import time

@dataclass
class BenchmarkResult:
    """Result of model benchmark."""
    
    model: str
    task: str
    
    # Quality metrics
    accuracy: Optional[float] = None
    precision: Optional[float] = None
    recall: Optional[float] = None
    ndcg: Optional[float] = None
    mrr: Optional[float] = None
    
    # Performance metrics
    avg_latency_ms: Optional[float] = None
    p95_latency_ms: Optional[float] = None
    throughput: Optional[float] = None
    
    # Cost metrics
    total_tokens: int = 0
    estimated_cost: float = 0.0

@dataclass
class BenchmarkDataset:
    """Dataset for benchmarking."""
    
    queries: list[str]
    documents: list[str]
    relevance: dict[int, list[int]]  # query_idx -> relevant_doc_idxs

class EmbeddingBenchmark:
    """Benchmark embedding models."""
    
    def __init__(self, embedder: UnifiedEmbedder):
        self.embedder = embedder
    
    async def benchmark_retrieval(
        self,
        dataset: BenchmarkDataset,
        provider: str,
        k: int = 10
    ) -> BenchmarkResult:
        """Benchmark retrieval performance."""
        
        latencies = []
        total_tokens = 0
        
        # Embed documents
        start = time.time()
        doc_result = await self.embedder.embed(
            dataset.documents,
            provider=provider
        )
        doc_embeddings = np.array(doc_result.embeddings)
        latencies.append(doc_result.latency_ms)
        total_tokens += doc_result.tokens_used
        
        # Embed queries and evaluate
        hits = 0
        total_relevant = 0
        reciprocal_ranks = []
        ndcg_scores = []
        
        for i, query in enumerate(dataset.queries):
            query_result = await self.embedder.embed([query], provider=provider)
            query_embedding = np.array(query_result.embeddings[0])
            latencies.append(query_result.latency_ms)
            total_tokens += query_result.tokens_used
            
            # Calculate similarities
            similarities = np.dot(doc_embeddings, query_embedding)
            top_k_indices = np.argsort(similarities)[-k:][::-1]
            
            # Calculate metrics
            relevant = set(dataset.relevance.get(i, []))
            total_relevant += len(relevant)
            
            # Hits@k
            hits += len(set(top_k_indices) & relevant)
            
            # MRR
            for rank, idx in enumerate(top_k_indices, 1):
                if idx in relevant:
                    reciprocal_ranks.append(1.0 / rank)
                    break
            else:
                reciprocal_ranks.append(0.0)
            
            # NDCG
            dcg = sum(
                1.0 / np.log2(rank + 2)
                for rank, idx in enumerate(top_k_indices)
                if idx in relevant
            )
            ideal_dcg = sum(
                1.0 / np.log2(rank + 2)
                for rank in range(min(len(relevant), k))
            )
            ndcg_scores.append(dcg / ideal_dcg if ideal_dcg > 0 else 0)
        
        # Calculate cost (get_spec may return None for models not listed in MODELS)
        spec = self.embedder._providers[provider].get_spec()
        cost = (total_tokens / 1000) * (spec.cost_per_1k_tokens or 0) if spec else 0.0
        
        return BenchmarkResult(
            model=provider,
            task="retrieval",
            recall=hits / total_relevant if total_relevant > 0 else 0,
            mrr=np.mean(reciprocal_ranks),
            ndcg=np.mean(ndcg_scores),
            avg_latency_ms=np.mean(latencies),
            p95_latency_ms=np.percentile(latencies, 95),
            throughput=len(dataset.queries) / (sum(latencies) / 1000),
            total_tokens=total_tokens,
            estimated_cost=cost
        )
    
    async def benchmark_clustering(
        self,
        texts: list[str],
        labels: list[int],
        provider: str
    ) -> BenchmarkResult:
        """Benchmark clustering performance."""
        
        from sklearn.cluster import KMeans
        from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
        
        # Embed texts
        result = await self.embedder.embed(texts, provider=provider)
        embeddings = np.array(result.embeddings)
        
        # Cluster
        n_clusters = len(set(labels))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        predicted = kmeans.fit_predict(embeddings)
        
        # Evaluate
        ari = adjusted_rand_score(labels, predicted)
        nmi = normalized_mutual_info_score(labels, predicted)
        
        spec = self.embedder._providers[provider].get_spec()
        cost = (result.tokens_used / 1000) * (spec.cost_per_1k_tokens or 0) if spec else 0.0
        
        return BenchmarkResult(
            model=provider,
            task="clustering",
            accuracy=(ari + nmi) / 2,  # Combined score
            avg_latency_ms=result.latency_ms,
            total_tokens=result.tokens_used,
            estimated_cost=cost
        )
    
    async def compare_models(
        self,
        dataset: BenchmarkDataset,
        providers: list[str],
        task: str = "retrieval"
    ) -> list[BenchmarkResult]:
        """Compare multiple models on same dataset."""
        
        results = []
        
        for provider in providers:
            if task == "retrieval":
                result = await self.benchmark_retrieval(dataset, provider)
            elif task == "clustering":
                result = await self.benchmark_clustering(
                    dataset.documents,
                    list(range(len(dataset.documents))),  # Placeholder labels
                    provider
                )
            else:
                raise ValueError(f"Unknown task: {task}")
            
            results.append(result)
        
        return results
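
A usage sketch with a toy dataset (two queries, three documents; a meaningful evaluation needs a few hundred labeled query-document pairs from your own domain). It assumes providers named "openai-small" and "bge-local" were registered on the embedder, as in the earlier wiring sketch, and runs inside an async context.

dataset = BenchmarkDataset(
    queries=["reset a forgotten password", "export billing data"],
    documents=[
        "To reset your password, use the 'Forgot password' link on the login page.",
        "Billing exports are available as CSV under Settings > Billing.",
        "Our office is closed on public holidays."
    ],
    relevance={0: [0], 1: [1]}  # query_idx -> relevant doc indices
)

benchmark = EmbeddingBenchmark(embedder)
results = await benchmark.compare_models(
    dataset=dataset,
    providers=["openai-small", "bge-local"],
    task="retrieval"
)

for r in results:
    print(f"{r.model}: recall={r.recall:.2f} mrr={r.mrr:.2f} "
          f"ndcg={r.ndcg:.2f} cost=${r.estimated_cost:.5f}")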

Dimension Reduction

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

@dataclass
class ReductionResult:
    """Result of dimension reduction."""
    
    embeddings: np.ndarray
    original_dims: int
    reduced_dims: int
    variance_retained: Optional[float] = None

class DimensionReducer:
    """Reduce embedding dimensions."""
    
    def __init__(self, method: str = "pca"):
        self.method = method
        self._reducer = None
    
    def fit(self, embeddings: np.ndarray, target_dims: int) -> "DimensionReducer":
        """Fit reducer on embeddings."""
        
        if self.method == "pca":
            from sklearn.decomposition import PCA
            self._reducer = PCA(n_components=target_dims)
            self._reducer.fit(embeddings)
        
        elif self.method == "umap":
            import umap
            self._reducer = umap.UMAP(n_components=target_dims)
            self._reducer.fit(embeddings)
        
        elif self.method == "random":
            # Random projection
            self._projection_matrix = np.random.randn(
                embeddings.shape[1], target_dims
            ) / np.sqrt(target_dims)
        
        else:
            raise ValueError(f"Unknown method: {self.method}")
        
        return self
    
    def transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Transform embeddings to lower dimensions."""
        
        original_dims = embeddings.shape[1]
        
        if self.method == "random":
            reduced = embeddings @ self._projection_matrix
            variance_retained = None
        else:
            reduced = self._reducer.transform(embeddings)
            
            if self.method == "pca":
                variance_retained = sum(self._reducer.explained_variance_ratio_)
            else:
                variance_retained = None
        
        return ReductionResult(
            embeddings=reduced,
            original_dims=original_dims,
            reduced_dims=reduced.shape[1],
            variance_retained=variance_retained
        )
    
    def fit_transform(
        self,
        embeddings: np.ndarray,
        target_dims: int
    ) -> ReductionResult:
        """Fit and transform in one step."""
        
        self.fit(embeddings, target_dims)
        return self.transform(embeddings)
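
A PCA sketch with synthetic data standing in for real embeddings (PCA needs at least as many samples as target dimensions, so fit it on a representative corpus rather than a single request):

rng = np.random.default_rng(0)
full_embeddings = rng.normal(size=(1000, 1536))  # placeholder for real 1536-d embeddings

reducer = DimensionReducer(method="pca")
reduced = reducer.fit_transform(full_embeddings, target_dims=256)

print(reduced.original_dims, "->", reduced.reduced_dims)
print(f"variance retained: {reduced.variance_retained:.2%}")

# The fitted reducer can project new embeddings into the same space:
more = reducer.transform(rng.normal(size=(10, 1536)))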

class MatryoshkaEmbedding:
    """Flexible-dimension embeddings via truncation.

    Assumes the underlying model was trained with Matryoshka representation
    learning (e.g. OpenAI's text-embedding-3 family); truncating other
    models' embeddings can degrade quality sharply.
    """
    
    def __init__(self, provider: EmbeddingProvider):
        self.provider = provider
    
    async def embed(
        self,
        texts: list[str],
        dimensions: int = None
    ) -> EmbeddingResult:
        """Generate embeddings with optional dimension truncation."""
        
        # Get full embeddings
        result = await self.provider.embed(texts)
        
        if dimensions and dimensions < result.dimensions:
            # Truncate and normalize
            truncated = [
                self._truncate_and_normalize(emb, dimensions)
                for emb in result.embeddings
            ]
            
            return EmbeddingResult(
                embeddings=truncated,
                model=result.model,
                dimensions=dimensions,
                tokens_used=result.tokens_used,
                latency_ms=result.latency_ms
            )
        
        return result
    
    def _truncate_and_normalize(
        self,
        embedding: list[float],
        dimensions: int
    ) -> list[float]:
        """Truncate and L2 normalize."""
        
        truncated = np.array(embedding[:dimensions])
        norm = np.linalg.norm(truncated)
        
        if norm > 0:
            truncated = truncated / norm
        
        return truncated.tolist()
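
A sketch wrapping the OpenAI provider; OpenAI's text-embedding-3 models are trained so truncated prefixes stay useful. It assumes an AsyncOpenAI client and an async context.

provider = OpenAIEmbedding(AsyncOpenAI(), model="text-embedding-3-small")
matryoshka = MatryoshkaEmbedding(provider)

result = await matryoshka.embed(["vector databases compared"], dimensions=512)
print(result.dimensions)  # 512; each vector is an L2-normalized 512-d prefix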

Production Embedding Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np

app = FastAPI()

# Initialize components
embedder = UnifiedEmbedder()
benchmark = EmbeddingBenchmark(embedder)
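
# Note: /v1/embed needs at least one registered provider; nothing is registered
# by default. A hedged wiring sketch (assumes the openai SDK with OPENAI_API_KEY
# set; swap in whichever providers you actually use):
from openai import AsyncOpenAI

embedder.register(
    "openai-small",
    OpenAIEmbedding(AsyncOpenAI(), model="text-embedding-3-small"),
    default=True
)
embedder.register("bge-local", HuggingFaceEmbedding("BAAI/bge-large-en-v1.5"))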

class EmbedRequest(BaseModel):
    texts: list[str]
    provider: Optional[str] = None
    dimensions: Optional[int] = None

class BenchmarkRequest(BaseModel):
    queries: list[str]
    documents: list[str]
    relevance: dict[str, list[int]]
    providers: list[str]

@app.post("/v1/embed")
async def embed_texts(request: EmbedRequest):
    """Generate embeddings."""
    
    try:
        result = await embedder.embed(
            texts=request.texts,
            provider=request.provider
        )
        
        # Optionally reduce dimensions. Note: PCA is fit on this request's
        # embeddings alone, so it needs at least `dimensions` texts; for
        # production, fit a reducer offline or use Matryoshka truncation.
        if request.dimensions and request.dimensions < result.dimensions:
            reducer = DimensionReducer(method="pca")
            reduced = reducer.fit_transform(
                np.array(result.embeddings),
                request.dimensions
            )
            
            return {
                "embeddings": reduced.embeddings.tolist(),
                "model": result.model,
                "dimensions": reduced.reduced_dims,
                "tokens_used": result.tokens_used,
                "latency_ms": result.latency_ms
            }
        
        return {
            "embeddings": result.embeddings,
            "model": result.model,
            "dimensions": result.dimensions,
            "tokens_used": result.tokens_used,
            "latency_ms": result.latency_ms
        }
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/v1/benchmark")
async def run_benchmark(request: BenchmarkRequest):
    """Benchmark embedding models."""
    
    dataset = BenchmarkDataset(
        queries=request.queries,
        documents=request.documents,
        relevance={int(k): v for k, v in request.relevance.items()}
    )
    
    results = await benchmark.compare_models(
        dataset=dataset,
        providers=request.providers
    )
    
    return {
        "results": [
            {
                "model": r.model,
                "recall": r.recall,
                "mrr": r.mrr,
                "ndcg": r.ndcg,
                "avg_latency_ms": r.avg_latency_ms,
                "estimated_cost": r.estimated_cost
            }
            for r in results
        ]
    }

@app.get("/v1/models")
async def list_models():
    """List available embedding models."""
    
    return {
        "models": [
            {
                "name": spec.name,
                "provider": spec.provider,
                "dimensions": spec.dimensions,
                "max_tokens": spec.max_tokens,
                "retrieval_score": spec.retrieval_score,
                "cost_per_1k_tokens": spec.cost_per_1k_tokens
            }
            for spec in MODELS.values()
        ]
    }

@app.get("/v1/providers")
async def list_providers():
    """List registered providers."""
    
    return {"providers": embedder.get_providers()}

@app.get("/health")
async def health():
    return {"status": "healthy"}
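
An illustrative client call, assuming the service is running locally (for example via uvicorn embedding_service:app); the module name, URL, and provider label are placeholders.

import httpx

resp = httpx.post(
    "http://localhost:8000/v1/embed",
    json={"texts": ["hello world", "semantic search"], "provider": "openai-small"},
    timeout=30.0
)
print(resp.json()["dimensions"])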

Conclusion

Embedding model selection significantly impacts your application's quality and cost. Start by understanding your task—retrieval, clustering, and classification have different requirements. Use the MTEB leaderboard as a starting point, but benchmark on your specific data because domain matters. Consider the cost-quality tradeoff: OpenAI's text-embedding-3-small offers excellent value, while Voyage excels for code and specialized domains. For high-volume applications, self-hosted models like BGE or E5 eliminate per-token costs. Use dimension reduction when storage or latency matters—Matryoshka embeddings let you truncate without retraining. Build a unified interface that lets you switch models easily as better options emerge. The key insight is that embedding quality directly determines retrieval quality, which determines RAG quality. Invest time in selecting and benchmarking the right model for your use case.

