Introduction
Choosing the right embedding model determines the quality of your semantic search, RAG system, or clustering application. Different models excel at different tasks: some optimize for retrieval accuracy, others for speed, and others for specific domains. The wrong choice means poor results regardless of how well you build everything else. This guide covers practical embedding model selection: understanding model characteristics, benchmarking on your specific data, optimizing for latency and cost, and building systems that can switch models as better options emerge.

Model Characteristics
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class EmbeddingTask(Enum):
    """Types of embedding tasks."""
    RETRIEVAL = "retrieval"
    CLUSTERING = "clustering"
    CLASSIFICATION = "classification"
    SEMANTIC_SIMILARITY = "semantic_similarity"
    RERANKING = "reranking"


@dataclass
class ModelSpec:
    """Embedding model specification."""
    name: str
    provider: str
    dimensions: int
    max_tokens: int
    # Performance characteristics
    latency_ms: Optional[float] = None    # Average latency per request
    throughput: Optional[float] = None    # Embeddings per second
    # Quality metrics (from MTEB or similar)
    retrieval_score: Optional[float] = None
    clustering_score: Optional[float] = None
    classification_score: Optional[float] = None
    # Cost
    cost_per_1k_tokens: Optional[float] = None
    # Features
    supports_batching: bool = True
    supports_truncation: bool = True
    multilingual: bool = False


# Popular embedding models
MODELS = {
    "text-embedding-3-small": ModelSpec(
        name="text-embedding-3-small",
        provider="openai",
        dimensions=1536,
        max_tokens=8191,
        retrieval_score=0.62,
        cost_per_1k_tokens=0.00002,
        multilingual=True
    ),
    "text-embedding-3-large": ModelSpec(
        name="text-embedding-3-large",
        provider="openai",
        dimensions=3072,
        max_tokens=8191,
        retrieval_score=0.64,
        cost_per_1k_tokens=0.00013,
        multilingual=True
    ),
    "text-embedding-ada-002": ModelSpec(
        name="text-embedding-ada-002",
        provider="openai",
        dimensions=1536,
        max_tokens=8191,
        retrieval_score=0.58,
        cost_per_1k_tokens=0.0001
    ),
    "voyage-large-2": ModelSpec(
        name="voyage-large-2",
        provider="voyage",
        dimensions=1536,
        max_tokens=16000,
        retrieval_score=0.68,
        cost_per_1k_tokens=0.00012
    ),
    "voyage-code-2": ModelSpec(
        name="voyage-code-2",
        provider="voyage",
        dimensions=1536,
        max_tokens=16000,
        retrieval_score=0.70,  # For code
        cost_per_1k_tokens=0.00012
    ),
    "cohere-embed-english-v3": ModelSpec(
        name="embed-english-v3.0",
        provider="cohere",
        dimensions=1024,
        max_tokens=512,
        retrieval_score=0.64,
        cost_per_1k_tokens=0.0001
    ),
    "bge-large-en-v1.5": ModelSpec(
        name="BAAI/bge-large-en-v1.5",
        provider="huggingface",
        dimensions=1024,
        max_tokens=512,
        retrieval_score=0.63,
        cost_per_1k_tokens=0.0  # Self-hosted
    ),
    "e5-large-v2": ModelSpec(
        name="intfloat/e5-large-v2",
        provider="huggingface",
        dimensions=1024,
        max_tokens=512,
        retrieval_score=0.62,
        cost_per_1k_tokens=0.0
    ),
    "gte-large": ModelSpec(
        name="thenlper/gte-large",
        provider="huggingface",
        dimensions=1024,
        max_tokens=512,
        retrieval_score=0.63,
        cost_per_1k_tokens=0.0
    )
}


class ModelSelector:
    """Select embedding models based on requirements."""

    def __init__(self, models: Optional[dict[str, ModelSpec]] = None):
        self.models = models or MODELS

    def select(
        self,
        task: EmbeddingTask,
        max_cost_per_1k: Optional[float] = None,
        min_dimensions: Optional[int] = None,
        max_latency_ms: Optional[float] = None,
        require_multilingual: bool = False
    ) -> list[ModelSpec]:
        """Select models matching requirements, best task score first."""
        candidates = []
        for spec in self.models.values():
            # Filter by cost
            if max_cost_per_1k and spec.cost_per_1k_tokens:
                if spec.cost_per_1k_tokens > max_cost_per_1k:
                    continue
            # Filter by dimensions
            if min_dimensions and spec.dimensions < min_dimensions:
                continue
            # Filter by latency
            if max_latency_ms and spec.latency_ms:
                if spec.latency_ms > max_latency_ms:
                    continue
            # Filter by multilingual support
            if require_multilingual and not spec.multilingual:
                continue
            candidates.append(spec)

        # Sort by task-specific score (models without a score sort last)
        score_attr = f"{task.value}_score"
        candidates.sort(
            key=lambda m: getattr(m, score_attr, 0) or 0,
            reverse=True
        )
        return candidates
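As a quick illustration, here is how the selector might be used to shortlist inexpensive multilingual retrieval models; the scores above are approximate, so treat the ranking as a starting point rather than a verdict.

selector = ModelSelector()

# Shortlist retrieval models that support multiple languages and cost
# at most $0.0001 per 1K tokens
shortlist = selector.select(
    task=EmbeddingTask.RETRIEVAL,
    max_cost_per_1k=0.0001,
    require_multilingual=True
)
for spec in shortlist:
    print(spec.name, spec.dimensions, spec.retrieval_score)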
Unified Embedding Interface
from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
import numpy as np


@dataclass
class EmbeddingResult:
    """Result of an embedding operation."""
    embeddings: list[list[float]]
    model: str
    dimensions: int
    tokens_used: int = 0
    latency_ms: float = 0


class EmbeddingProvider(ABC):
    """Base class for embedding providers."""

    @abstractmethod
    async def embed(
        self,
        texts: list[str],
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings for texts."""
        pass

    @abstractmethod
    def get_spec(self) -> Optional[ModelSpec]:
        """Get the model specification."""
        pass


class OpenAIEmbedding(EmbeddingProvider):
    """OpenAI embedding provider."""

    def __init__(self, client: Any, model: str = "text-embedding-3-small"):
        self.client = client
        self.model = model

    async def embed(
        self,
        texts: list[str],
        dimensions: Optional[int] = None,
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings using OpenAI."""
        import time
        start = time.time()

        params = {"model": self.model, "input": texts}
        # Only the text-embedding-3-* models accept a dimensions parameter
        if dimensions and "text-embedding-3" in self.model:
            params["dimensions"] = dimensions

        response = await self.client.embeddings.create(**params)
        latency = (time.time() - start) * 1000

        embeddings = [item.embedding for item in response.data]
        return EmbeddingResult(
            embeddings=embeddings,
            model=self.model,
            dimensions=len(embeddings[0]),
            tokens_used=response.usage.total_tokens,
            latency_ms=latency
        )

    def get_spec(self) -> Optional[ModelSpec]:
        return MODELS.get(self.model)


class VoyageEmbedding(EmbeddingProvider):
    """Voyage AI embedding provider."""

    def __init__(self, client: Any, model: str = "voyage-large-2"):
        self.client = client
        self.model = model

    async def embed(
        self,
        texts: list[str],
        input_type: Optional[str] = None,
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings using Voyage."""
        import time
        start = time.time()

        params = {"model": self.model, "input": texts}
        if input_type:
            params["input_type"] = input_type

        response = await self.client.embed(**params)
        latency = (time.time() - start) * 1000

        return EmbeddingResult(
            embeddings=response.embeddings,
            model=self.model,
            dimensions=len(response.embeddings[0]),
            tokens_used=response.total_tokens,
            latency_ms=latency
        )

    def get_spec(self) -> Optional[ModelSpec]:
        return MODELS.get(self.model)


class HuggingFaceEmbedding(EmbeddingProvider):
    """Local HuggingFace embedding provider."""

    def __init__(self, model_name: str = "BAAI/bge-large-en-v1.5"):
        self.model_name = model_name
        self._model = None
        self._tokenizer = None

    def _load_model(self):
        """Lazily load the model and tokenizer."""
        if self._model is None:
            from transformers import AutoModel, AutoTokenizer
            import torch
            self._tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self._model = AutoModel.from_pretrained(self.model_name)
            if torch.cuda.is_available():
                self._model = self._model.cuda()

    async def embed(
        self,
        texts: list[str],
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings locally."""
        import time
        import torch
        self._load_model()
        start = time.time()

        # Tokenize
        encoded = self._tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        if torch.cuda.is_available():
            encoded = {k: v.cuda() for k, v in encoded.items()}

        # Generate embeddings: CLS-token pooling (what BGE expects;
        # E5/GTE models use mean pooling instead)
        with torch.no_grad():
            outputs = self._model(**encoded)
            embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
            # L2-normalize so dot products behave like cosine similarity
            embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

        latency = (time.time() - start) * 1000
        return EmbeddingResult(
            embeddings=embeddings.tolist(),
            model=self.model_name,
            dimensions=embeddings.shape[1],
            tokens_used=sum(len(self._tokenizer.encode(t)) for t in texts),
            latency_ms=latency
        )

    def get_spec(self) -> Optional[ModelSpec]:
        # Hub model names include the org prefix, so match on the spec's name field
        for spec in MODELS.values():
            if spec.name == self.model_name:
                return spec
        return None


class UnifiedEmbedder:
    """Unified interface for multiple embedding providers."""

    def __init__(self):
        self._providers: dict[str, EmbeddingProvider] = {}
        self._default_provider: Optional[str] = None

    def register(self, name: str, provider: EmbeddingProvider, default: bool = False):
        """Register an embedding provider."""
        self._providers[name] = provider
        if default or self._default_provider is None:
            self._default_provider = name

    async def embed(
        self,
        texts: list[str],
        provider: Optional[str] = None,
        **kwargs
    ) -> EmbeddingResult:
        """Generate embeddings using the specified or default provider."""
        provider_name = provider or self._default_provider
        if provider_name not in self._providers:
            raise ValueError(f"Unknown provider: {provider_name}")
        return await self._providers[provider_name].embed(texts, **kwargs)

    def get_providers(self) -> list[str]:
        """List registered providers."""
        return list(self._providers.keys())
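A minimal wiring sketch, assuming the openai Python SDK (v1+) with an OPENAI_API_KEY in the environment and enough local memory to load the BGE model; the provider names "openai-small" and "bge-large" are arbitrary labels, not anything the libraries require.

import asyncio
from openai import AsyncOpenAI

async def main():
    embedder = UnifiedEmbedder()
    embedder.register(
        "openai-small",
        OpenAIEmbedding(AsyncOpenAI(), "text-embedding-3-small"),
        default=True
    )
    embedder.register("bge-large", HuggingFaceEmbedding("BAAI/bge-large-en-v1.5"))

    result = await embedder.embed(["What is retrieval-augmented generation?"])
    print(result.model, result.dimensions, f"{result.latency_ms:.0f}ms")

asyncio.run(main())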
Benchmarking
from dataclasses import dataclass
from typing import Optional
import numpy as np


@dataclass
class BenchmarkResult:
    """Result of a model benchmark."""
    model: str
    task: str
    # Quality metrics
    accuracy: Optional[float] = None
    precision: Optional[float] = None
    recall: Optional[float] = None
    ndcg: Optional[float] = None
    mrr: Optional[float] = None
    # Performance metrics
    avg_latency_ms: Optional[float] = None
    p95_latency_ms: Optional[float] = None
    throughput: Optional[float] = None
    # Cost metrics
    total_tokens: int = 0
    estimated_cost: float = 0.0


@dataclass
class BenchmarkDataset:
    """Dataset for benchmarking."""
    queries: list[str]
    documents: list[str]
    relevance: dict[int, list[int]]  # query_idx -> relevant doc_idxs


class EmbeddingBenchmark:
    """Benchmark embedding models."""

    def __init__(self, embedder: UnifiedEmbedder):
        self.embedder = embedder

    async def benchmark_retrieval(
        self,
        dataset: BenchmarkDataset,
        provider: str,
        k: int = 10
    ) -> BenchmarkResult:
        """Benchmark retrieval performance."""
        latencies = []
        total_tokens = 0

        # Embed documents and L2-normalize so dot products are cosine similarities
        doc_result = await self.embedder.embed(
            dataset.documents,
            provider=provider
        )
        doc_embeddings = np.array(doc_result.embeddings)
        doc_embeddings /= np.clip(
            np.linalg.norm(doc_embeddings, axis=1, keepdims=True), 1e-12, None
        )
        latencies.append(doc_result.latency_ms)
        total_tokens += doc_result.tokens_used

        # Embed queries and evaluate
        hits = 0
        total_relevant = 0
        reciprocal_ranks = []
        ndcg_scores = []
        query_latencies = []

        for i, query in enumerate(dataset.queries):
            query_result = await self.embedder.embed([query], provider=provider)
            query_embedding = np.array(query_result.embeddings[0])
            query_embedding /= max(np.linalg.norm(query_embedding), 1e-12)
            latencies.append(query_result.latency_ms)
            query_latencies.append(query_result.latency_ms)
            total_tokens += query_result.tokens_used

            # Cosine similarities against all documents, top-k by score
            similarities = doc_embeddings @ query_embedding
            top_k_indices = np.argsort(similarities)[-k:][::-1]

            relevant = set(dataset.relevance.get(i, []))
            total_relevant += len(relevant)

            # Hits@k
            hits += len(set(top_k_indices) & relevant)

            # MRR: reciprocal rank of the first relevant document
            for rank, idx in enumerate(top_k_indices, 1):
                if idx in relevant:
                    reciprocal_ranks.append(1.0 / rank)
                    break
            else:
                reciprocal_ranks.append(0.0)

            # NDCG@k with binary relevance
            dcg = sum(
                1.0 / np.log2(rank + 2)
                for rank, idx in enumerate(top_k_indices)
                if idx in relevant
            )
            ideal_dcg = sum(
                1.0 / np.log2(rank + 2)
                for rank in range(min(len(relevant), k))
            )
            ndcg_scores.append(dcg / ideal_dcg if ideal_dcg > 0 else 0)

        # Estimate cost (self-hosted models report no per-token cost)
        spec = self.embedder._providers[provider].get_spec()
        cost_per_1k = spec.cost_per_1k_tokens if spec and spec.cost_per_1k_tokens else 0
        cost = (total_tokens / 1000) * cost_per_1k

        return BenchmarkResult(
            model=provider,
            task="retrieval",
            recall=hits / total_relevant if total_relevant > 0 else 0,
            mrr=float(np.mean(reciprocal_ranks)),
            ndcg=float(np.mean(ndcg_scores)),
            avg_latency_ms=float(np.mean(latencies)),
            p95_latency_ms=float(np.percentile(latencies, 95)),
            # Queries per second, excluding the one-time document embedding
            throughput=len(dataset.queries) / (sum(query_latencies) / 1000),
            total_tokens=total_tokens,
            estimated_cost=cost
        )

    async def benchmark_clustering(
        self,
        texts: list[str],
        labels: list[int],
        provider: str
    ) -> BenchmarkResult:
        """Benchmark clustering performance."""
        from sklearn.cluster import KMeans
        from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score

        # Embed texts
        result = await self.embedder.embed(texts, provider=provider)
        embeddings = np.array(result.embeddings)

        # Cluster with the true number of classes
        n_clusters = len(set(labels))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        predicted = kmeans.fit_predict(embeddings)

        # Evaluate against the reference labels
        ari = adjusted_rand_score(labels, predicted)
        nmi = normalized_mutual_info_score(labels, predicted)

        spec = self.embedder._providers[provider].get_spec()
        cost_per_1k = spec.cost_per_1k_tokens if spec and spec.cost_per_1k_tokens else 0
        cost = (result.tokens_used / 1000) * cost_per_1k

        return BenchmarkResult(
            model=provider,
            task="clustering",
            accuracy=(ari + nmi) / 2,  # Combined score
            avg_latency_ms=result.latency_ms,
            total_tokens=result.tokens_used,
            estimated_cost=cost
        )

    async def compare_models(
        self,
        dataset: BenchmarkDataset,
        providers: list[str],
        task: str = "retrieval"
    ) -> list[BenchmarkResult]:
        """Compare multiple models on the same dataset."""
        results = []
        for provider in providers:
            if task == "retrieval":
                result = await self.benchmark_retrieval(dataset, provider)
            elif task == "clustering":
                result = await self.benchmark_clustering(
                    dataset.documents,
                    list(range(len(dataset.documents))),  # Placeholder; supply real labels
                    provider
                )
            else:
                raise ValueError(f"Unknown task: {task}")
            results.append(result)
        return results
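Here is a toy comparison run, under the assumption that the embedder from the previous section already has providers registered as "openai-small" and "bge-large"; the dataset is illustrative and far too small to be statistically meaningful, but it shows the shape of the workflow.

import asyncio

dataset = BenchmarkDataset(
    queries=["reset my password", "refund policy"],
    documents=[
        "How to reset your account password",
        "Our refund and returns policy",
        "Shipping times and carriers"
    ],
    relevance={0: [0], 1: [1]}  # query index -> relevant document indexes
)

benchmark = EmbeddingBenchmark(embedder)
results = asyncio.run(
    benchmark.compare_models(dataset, providers=["openai-small", "bge-large"])
)
for r in results:
    print(f"{r.model}: recall@10={r.recall:.2f}, mrr={r.mrr:.2f}, cost=${r.estimated_cost:.5f}")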
Dimension Reduction
from dataclasses import dataclass
from typing import Optional
import numpy as np


@dataclass
class ReductionResult:
    """Result of dimension reduction."""
    embeddings: np.ndarray
    original_dims: int
    reduced_dims: int
    variance_retained: Optional[float] = None


class DimensionReducer:
    """Reduce embedding dimensions."""

    def __init__(self, method: str = "pca"):
        self.method = method
        self._reducer = None

    def fit(self, embeddings: np.ndarray, target_dims: int) -> "DimensionReducer":
        """Fit the reducer on a sample of embeddings."""
        if self.method == "pca":
            from sklearn.decomposition import PCA
            self._reducer = PCA(n_components=target_dims)
            self._reducer.fit(embeddings)
        elif self.method == "umap":
            import umap
            self._reducer = umap.UMAP(n_components=target_dims)
            self._reducer.fit(embeddings)
        elif self.method == "random":
            # Random projection (Johnson-Lindenstrauss style)
            self._projection_matrix = np.random.randn(
                embeddings.shape[1], target_dims
            ) / np.sqrt(target_dims)
        else:
            raise ValueError(f"Unknown method: {self.method}")
        return self

    def transform(self, embeddings: np.ndarray) -> ReductionResult:
        """Transform embeddings to the lower-dimensional space."""
        original_dims = embeddings.shape[1]

        if self.method == "random":
            reduced = embeddings @ self._projection_matrix
            variance_retained = None
        else:
            reduced = self._reducer.transform(embeddings)
            if self.method == "pca":
                variance_retained = float(sum(self._reducer.explained_variance_ratio_))
            else:
                variance_retained = None

        return ReductionResult(
            embeddings=reduced,
            original_dims=original_dims,
            reduced_dims=reduced.shape[1],
            variance_retained=variance_retained
        )

    def fit_transform(
        self,
        embeddings: np.ndarray,
        target_dims: int
    ) -> ReductionResult:
        """Fit and transform in one step."""
        self.fit(embeddings, target_dims)
        return self.transform(embeddings)
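A short sketch of fitting PCA once on corpus embeddings and reusing the fitted reducer for queries, so both end up in the same reduced space; the random matrix stands in for real embeddings and the 256-dimension target is only an example, so check retained variance before committing to it.

import numpy as np

corpus_embeddings = np.random.randn(1000, 1536)  # stand-in for real embeddings

reducer = DimensionReducer(method="pca")
reduced = reducer.fit_transform(corpus_embeddings, target_dims=256)
print(f"retained variance: {reduced.variance_retained:.2%}")

# Reuse the same fitted reducer for queries so they share the reduced space
query_reduced = reducer.transform(np.random.randn(5, 1536))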
class MatryoshkaEmbedding:
    """Use Matryoshka embeddings for flexible dimensions.

    Truncation only preserves quality for models trained with Matryoshka
    Representation Learning (e.g. OpenAI's text-embedding-3-* models).
    """

    def __init__(self, provider: EmbeddingProvider):
        self.provider = provider

    async def embed(
        self,
        texts: list[str],
        dimensions: Optional[int] = None
    ) -> EmbeddingResult:
        """Generate embeddings with optional dimension truncation."""
        # Get full embeddings
        result = await self.provider.embed(texts)

        if dimensions and dimensions < result.dimensions:
            # Truncate and re-normalize
            truncated = [
                self._truncate_and_normalize(emb, dimensions)
                for emb in result.embeddings
            ]
            return EmbeddingResult(
                embeddings=truncated,
                model=result.model,
                dimensions=dimensions,
                tokens_used=result.tokens_used,
                latency_ms=result.latency_ms
            )
        return result

    def _truncate_and_normalize(
        self,
        embedding: list[float],
        dimensions: int
    ) -> list[float]:
        """Truncate to the leading dimensions and L2-normalize."""
        truncated = np.array(embedding[:dimensions])
        norm = np.linalg.norm(truncated)
        if norm > 0:
            truncated = truncated / norm
        return truncated.tolist()
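For instance, a sketch of truncating text-embedding-3-large output to 512 dimensions through the wrapper, assuming the OpenAIEmbedding provider defined earlier and an OPENAI_API_KEY in the environment:

import asyncio
from openai import AsyncOpenAI

matryoshka = MatryoshkaEmbedding(
    OpenAIEmbedding(AsyncOpenAI(), "text-embedding-3-large")
)
result = asyncio.run(matryoshka.embed(["example sentence"], dimensions=512))
print(result.dimensions)  # 512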
Production Embedding Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np

app = FastAPI()

# Initialize components; providers must be registered at startup (see below)
embedder = UnifiedEmbedder()
benchmark = EmbeddingBenchmark(embedder)


class EmbedRequest(BaseModel):
    texts: list[str]
    provider: Optional[str] = None
    dimensions: Optional[int] = None


class BenchmarkRequest(BaseModel):
    queries: list[str]
    documents: list[str]
    relevance: dict[str, list[int]]
    providers: list[str]


@app.post("/v1/embed")
async def embed_texts(request: EmbedRequest):
    """Generate embeddings."""
    try:
        result = await embedder.embed(
            texts=request.texts,
            provider=request.provider
        )

        # Optionally reduce dimensions. Per-request PCA can only produce
        # min(len(texts), dimensions) components; for production traffic,
        # prefer provider-native dimensions or Matryoshka truncation.
        if request.dimensions and request.dimensions < result.dimensions:
            reducer = DimensionReducer(method="pca")
            reduced = reducer.fit_transform(
                np.array(result.embeddings),
                request.dimensions
            )
            return {
                "embeddings": reduced.embeddings.tolist(),
                "model": result.model,
                "dimensions": reduced.reduced_dims,
                "tokens_used": result.tokens_used,
                "latency_ms": result.latency_ms
            }

        return {
            "embeddings": result.embeddings,
            "model": result.model,
            "dimensions": result.dimensions,
            "tokens_used": result.tokens_used,
            "latency_ms": result.latency_ms
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/v1/benchmark")
async def run_benchmark(request: BenchmarkRequest):
    """Benchmark embedding models."""
    dataset = BenchmarkDataset(
        queries=request.queries,
        documents=request.documents,
        relevance={int(k): v for k, v in request.relevance.items()}
    )
    results = await benchmark.compare_models(
        dataset=dataset,
        providers=request.providers
    )
    return {
        "results": [
            {
                "model": r.model,
                "recall": r.recall,
                "mrr": r.mrr,
                "ndcg": r.ndcg,
                "avg_latency_ms": r.avg_latency_ms,
                "estimated_cost": r.estimated_cost
            }
            for r in results
        ]
    }


@app.get("/v1/models")
async def list_models():
    """List available embedding models."""
    return {
        "models": [
            {
                "name": spec.name,
                "provider": spec.provider,
                "dimensions": spec.dimensions,
                "max_tokens": spec.max_tokens,
                "retrieval_score": spec.retrieval_score,
                "cost_per_1k_tokens": spec.cost_per_1k_tokens
            }
            for spec in MODELS.values()
        ]
    }


@app.get("/v1/providers")
async def list_providers():
    """List registered providers."""
    return {"providers": embedder.get_providers()}


@app.get("/health")
async def health():
    return {"status": "healthy"}
References
- MTEB Leaderboard: https://huggingface.co/spaces/mteb/leaderboard
- OpenAI Embeddings: https://platform.openai.com/docs/guides/embeddings
- Voyage AI: https://docs.voyageai.com/
- Sentence Transformers: https://www.sbert.net/
- Matryoshka Embeddings: https://arxiv.org/abs/2205.13147
Conclusion
Embedding model selection significantly impacts your application's quality and cost. Start by understanding your task—retrieval, clustering, and classification have different requirements. Use the MTEB leaderboard as a starting point, but benchmark on your specific data because domain matters. Consider the cost-quality tradeoff: OpenAI's text-embedding-3-small offers excellent value, while Voyage excels for code and specialized domains. For high-volume applications, self-hosted models like BGE or E5 eliminate per-token costs. Use dimension reduction when storage or latency matters—Matryoshka embeddings let you truncate without retraining. Build a unified interface that lets you switch models easily as better options emerge. The key insight is that embedding quality directly determines retrieval quality, which determines RAG quality. Invest time in selecting and benchmarking the right model for your use case.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.