Introduction
Choosing the right embedding model is one of the most impactful decisions in building semantic search and RAG systems. The embedding model determines how well your system captures the meaning of text, how accurately it retrieves relevant documents, and ultimately how useful your AI application is to users. But the landscape is complex: OpenAI’s text-embedding-3, Cohere’s embed-v3, open-source models like BGE and E5, and specialized models for code or multilingual content, each with different dimensions, performance characteristics, and cost profiles. The right choice depends on your specific use case: document length, language requirements, domain specificity, latency constraints, and budget. This guide covers practical approaches to embedding model selection: understanding the key metrics, benchmarking on your data, and making informed tradeoffs between quality, speed, and cost.
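One tradeoff worth making concrete up front: embedding dimensionality directly drives vector-index size and memory. A quick back-of-the-envelope sketch (assuming plain float32 storage and ignoring index overhead):

# Rough index-size arithmetic: float32 vectors use 4 bytes per dimension
def index_size_gb(num_vectors: int, dimensions: int, bytes_per_dim: int = 4) -> float:
    return num_vectors * dimensions * bytes_per_dim / 1e9

print(index_size_gb(1_000_000, 1536))  # ~6.1 GB for 1M vectors at 1536 dims
print(index_size_gb(1_000_000, 384))   # ~1.5 GB at 384 dims (e.g., all-MiniLM-L6-v2)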

Model Catalog and Comparison
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from enum import Enum
class ModelProvider(Enum):
"""Embedding model providers."""
OPENAI = "openai"
COHERE = "cohere"
VOYAGE = "voyage"
HUGGINGFACE = "huggingface"
SENTENCE_TRANSFORMERS = "sentence_transformers"
GOOGLE = "google"
class ModelType(Enum):
"""Types of embedding models."""
GENERAL = "general"
CODE = "code"
MULTILINGUAL = "multilingual"
DOMAIN_SPECIFIC = "domain_specific"
@dataclass
class EmbeddingModelSpec:
"""Specification for an embedding model."""
name: str
provider: ModelProvider
model_type: ModelType
dimensions: int
max_tokens: int
supports_batching: bool = True
supports_truncation: bool = True
cost_per_1k_tokens: float = 0.0
avg_latency_ms: float = 0.0
mteb_score: float = 0.0
languages: list[str] = field(default_factory=lambda: ["en"])
description: str = ""
# Model catalog
MODEL_CATALOG = {
"text-embedding-3-small": EmbeddingModelSpec(
name="text-embedding-3-small",
provider=ModelProvider.OPENAI,
model_type=ModelType.GENERAL,
dimensions=1536,
max_tokens=8191,
cost_per_1k_tokens=0.00002,
avg_latency_ms=50,
mteb_score=62.3,
languages=["en", "multilingual"],
description="OpenAI's cost-effective embedding model"
),
"text-embedding-3-large": EmbeddingModelSpec(
name="text-embedding-3-large",
provider=ModelProvider.OPENAI,
model_type=ModelType.GENERAL,
dimensions=3072,
max_tokens=8191,
cost_per_1k_tokens=0.00013,
avg_latency_ms=80,
mteb_score=64.6,
languages=["en", "multilingual"],
description="OpenAI's highest quality embedding model"
),
"embed-english-v3.0": EmbeddingModelSpec(
name="embed-english-v3.0",
provider=ModelProvider.COHERE,
model_type=ModelType.GENERAL,
dimensions=1024,
max_tokens=512,
cost_per_1k_tokens=0.0001,
avg_latency_ms=60,
mteb_score=64.5,
languages=["en"],
description="Cohere's English embedding model"
),
"embed-multilingual-v3.0": EmbeddingModelSpec(
name="embed-multilingual-v3.0",
provider=ModelProvider.COHERE,
model_type=ModelType.MULTILINGUAL,
dimensions=1024,
max_tokens=512,
cost_per_1k_tokens=0.0001,
avg_latency_ms=70,
mteb_score=66.3,
languages=["100+ languages"],
description="Cohere's multilingual embedding model"
),
"voyage-large-2": EmbeddingModelSpec(
name="voyage-large-2",
provider=ModelProvider.VOYAGE,
model_type=ModelType.GENERAL,
dimensions=1536,
max_tokens=16000,
cost_per_1k_tokens=0.00012,
avg_latency_ms=100,
mteb_score=68.3,
languages=["en"],
description="Voyage AI's high-quality embedding model"
),
"voyage-code-2": EmbeddingModelSpec(
name="voyage-code-2",
provider=ModelProvider.VOYAGE,
model_type=ModelType.CODE,
dimensions=1536,
max_tokens=16000,
cost_per_1k_tokens=0.00012,
avg_latency_ms=100,
mteb_score=0.0,
languages=["code"],
description="Voyage AI's code-specialized embedding model"
),
"bge-large-en-v1.5": EmbeddingModelSpec(
name="BAAI/bge-large-en-v1.5",
provider=ModelProvider.HUGGINGFACE,
model_type=ModelType.GENERAL,
dimensions=1024,
max_tokens=512,
cost_per_1k_tokens=0.0,
avg_latency_ms=20,
mteb_score=64.2,
languages=["en"],
description="Open-source BGE model from BAAI"
),
"e5-large-v2": EmbeddingModelSpec(
name="intfloat/e5-large-v2",
provider=ModelProvider.HUGGINGFACE,
model_type=ModelType.GENERAL,
dimensions=1024,
max_tokens=512,
cost_per_1k_tokens=0.0,
avg_latency_ms=25,
mteb_score=62.2,
languages=["en"],
description="Microsoft's E5 embedding model"
),
"all-MiniLM-L6-v2": EmbeddingModelSpec(
name="sentence-transformers/all-MiniLM-L6-v2",
provider=ModelProvider.SENTENCE_TRANSFORMERS,
model_type=ModelType.GENERAL,
dimensions=384,
max_tokens=256,
cost_per_1k_tokens=0.0,
avg_latency_ms=5,
mteb_score=56.3,
languages=["en"],
description="Fast, lightweight embedding model"
),
"text-embedding-004": EmbeddingModelSpec(
name="text-embedding-004",
provider=ModelProvider.GOOGLE,
model_type=ModelType.GENERAL,
dimensions=768,
max_tokens=2048,
cost_per_1k_tokens=0.00001,
avg_latency_ms=40,
mteb_score=66.3,
languages=["en", "multilingual"],
description="Google's Gecko embedding model"
)
}
class ModelSelector:
"""Select embedding model based on requirements."""
def __init__(self, catalog: dict[str, EmbeddingModelSpec] = None):
self.catalog = catalog or MODEL_CATALOG
def filter_by_provider(self, provider: ModelProvider) -> list[EmbeddingModelSpec]:
"""Filter models by provider."""
return [m for m in self.catalog.values() if m.provider == provider]
def filter_by_type(self, model_type: ModelType) -> list[EmbeddingModelSpec]:
"""Filter models by type."""
return [m for m in self.catalog.values() if m.model_type == model_type]
def filter_by_dimensions(
self,
min_dims: int = 0,
max_dims: int = 10000
) -> list[EmbeddingModelSpec]:
"""Filter models by dimension range."""
return [
m for m in self.catalog.values()
if min_dims <= m.dimensions <= max_dims
]
def filter_by_cost(self, max_cost: float) -> list[EmbeddingModelSpec]:
"""Filter models by cost."""
return [
m for m in self.catalog.values()
if m.cost_per_1k_tokens <= max_cost
]
def rank_by_quality(self) -> list[EmbeddingModelSpec]:
"""Rank models by MTEB score."""
models = list(self.catalog.values())
return sorted(models, key=lambda m: m.mteb_score, reverse=True)
def rank_by_speed(self) -> list[EmbeddingModelSpec]:
"""Rank models by latency."""
models = list(self.catalog.values())
return sorted(models, key=lambda m: m.avg_latency_ms)
def recommend(
self,
use_case: str,
budget: str = "medium",
latency_requirement: str = "medium"
) -> list[EmbeddingModelSpec]:
"""Recommend models for use case."""
candidates = list(self.catalog.values())
# Filter by use case
if use_case == "code":
candidates = [m for m in candidates if m.model_type == ModelType.CODE or "code" in m.name.lower()]
elif use_case == "multilingual":
candidates = [m for m in candidates if m.model_type == ModelType.MULTILINGUAL or len(m.languages) > 1]
# Filter by budget
if budget == "low":
candidates = [m for m in candidates if m.cost_per_1k_tokens == 0 or m.cost_per_1k_tokens < 0.00005]
elif budget == "medium":
candidates = [m for m in candidates if m.cost_per_1k_tokens < 0.0002]
# Filter by latency
if latency_requirement == "low":
candidates = [m for m in candidates if m.avg_latency_ms < 30]
elif latency_requirement == "medium":
candidates = [m for m in candidates if m.avg_latency_ms < 100]
# Sort by quality
return sorted(candidates, key=lambda m: m.mteb_score, reverse=True)
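As a quick illustration, here is a minimal usage sketch of the selector. It assumes the MODEL_CATALOG and ModelSelector defined in this section are in scope, and the printed figures are just the illustrative catalog values, not measurements.

selector = ModelSelector()
candidates = selector.recommend(
    use_case="multilingual",
    budget="low",
    latency_requirement="medium",
)
for spec in candidates[:3]:
    print(f"{spec.name}: MTEB {spec.mteb_score}, ${spec.cost_per_1k_tokens}/1k tokens, {spec.avg_latency_ms}ms")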
Benchmarking Framework
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import time
import numpy as np
@dataclass
class BenchmarkResult:
"""Result of model benchmark."""
model_name: str
retrieval_accuracy: float
mrr: float # Mean Reciprocal Rank
ndcg: float # Normalized Discounted Cumulative Gain
avg_latency_ms: float
p95_latency_ms: float
throughput_per_sec: float
memory_mb: float
@dataclass
class BenchmarkDataset:
"""Dataset for benchmarking."""
queries: list[str]
documents: list[str]
relevance_labels: list[list[int]] # For each query, list of relevant doc indices
class EmbeddingBenchmark:
"""Benchmark embedding models."""
def __init__(self, dataset: BenchmarkDataset):
self.dataset = dataset
async def benchmark_model(
self,
model: Any,
model_name: str
) -> BenchmarkResult:
"""Run full benchmark on model."""
# Embed documents
        doc_embeddings = await self._embed_documents(
model,
self.dataset.documents
)
# Embed queries and measure latency
latencies = []
query_embeddings = []
for query in self.dataset.queries:
start = time.perf_counter()
emb = await model.embed(query)
latencies.append((time.perf_counter() - start) * 1000)
query_embeddings.append(emb)
# Calculate retrieval metrics
accuracy = self._calculate_accuracy(
query_embeddings,
doc_embeddings,
self.dataset.relevance_labels
)
mrr = self._calculate_mrr(
query_embeddings,
doc_embeddings,
self.dataset.relevance_labels
)
ndcg = self._calculate_ndcg(
query_embeddings,
doc_embeddings,
self.dataset.relevance_labels
)
return BenchmarkResult(
model_name=model_name,
retrieval_accuracy=accuracy,
mrr=mrr,
ndcg=ndcg,
avg_latency_ms=np.mean(latencies),
p95_latency_ms=np.percentile(latencies, 95),
throughput_per_sec=1000 / np.mean(latencies),
memory_mb=0 # Would measure actual memory usage
)
    async def _embed_documents(
        self,
        model: Any,
        texts: list[str]
    ) -> list[list[float]]:
        """Embed a list of texts and return their embeddings."""
embeddings = []
for text in texts:
emb = await model.embed(text)
embeddings.append(emb)
return embeddings
def _calculate_accuracy(
self,
query_embs: list,
doc_embs: list,
relevance: list[list[int]],
k: int = 10
) -> float:
"""Calculate retrieval accuracy at k."""
correct = 0
for i, query_emb in enumerate(query_embs):
# Calculate similarities
similarities = [
self._cosine_similarity(query_emb, doc_emb)
for doc_emb in doc_embs
]
# Get top-k indices
top_k = np.argsort(similarities)[-k:][::-1]
# Check if any relevant doc is in top-k
relevant = set(relevance[i])
if any(idx in relevant for idx in top_k):
correct += 1
return correct / len(query_embs)
def _calculate_mrr(
self,
query_embs: list,
doc_embs: list,
relevance: list[list[int]]
) -> float:
"""Calculate Mean Reciprocal Rank."""
reciprocal_ranks = []
for i, query_emb in enumerate(query_embs):
similarities = [
self._cosine_similarity(query_emb, doc_emb)
for doc_emb in doc_embs
]
ranked = np.argsort(similarities)[::-1]
relevant = set(relevance[i])
for rank, idx in enumerate(ranked, 1):
if idx in relevant:
reciprocal_ranks.append(1 / rank)
break
else:
reciprocal_ranks.append(0)
return np.mean(reciprocal_ranks)
def _calculate_ndcg(
self,
query_embs: list,
doc_embs: list,
relevance: list[list[int]],
k: int = 10
) -> float:
"""Calculate Normalized Discounted Cumulative Gain."""
ndcg_scores = []
for i, query_emb in enumerate(query_embs):
similarities = [
self._cosine_similarity(query_emb, doc_emb)
for doc_emb in doc_embs
]
ranked = np.argsort(similarities)[-k:][::-1]
relevant = set(relevance[i])
# Calculate DCG
dcg = 0
for rank, idx in enumerate(ranked, 1):
if idx in relevant:
dcg += 1 / np.log2(rank + 1)
# Calculate ideal DCG
ideal_dcg = sum(1 / np.log2(r + 1) for r in range(1, min(len(relevant), k) + 1))
if ideal_dcg > 0:
ndcg_scores.append(dcg / ideal_dcg)
else:
ndcg_scores.append(0)
return np.mean(ndcg_scores)
def _cosine_similarity(self, a: list, b: list) -> float:
"""Calculate cosine similarity."""
a = np.array(a)
b = np.array(b)
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
class DomainBenchmark:
"""Benchmark models on domain-specific data."""
def __init__(self, domain: str):
self.domain = domain
self.test_pairs: list[tuple[str, str, float]] = [] # (query, doc, relevance)
def add_test_pair(self, query: str, document: str, relevance: float):
"""Add a test pair."""
self.test_pairs.append((query, document, relevance))
async def evaluate(self, model: Any) -> dict:
"""Evaluate model on domain data."""
predictions = []
labels = []
for query, doc, relevance in self.test_pairs:
query_emb = await model.embed(query)
doc_emb = await model.embed(doc)
similarity = np.dot(query_emb, doc_emb) / (
np.linalg.norm(query_emb) * np.linalg.norm(doc_emb)
)
predictions.append(similarity)
labels.append(relevance)
# Calculate correlation
correlation = np.corrcoef(predictions, labels)[0, 1]
# Calculate ranking accuracy
correct_rankings = 0
total_pairs = 0
for i in range(len(predictions)):
for j in range(i + 1, len(predictions)):
if labels[i] != labels[j]:
total_pairs += 1
if (predictions[i] > predictions[j]) == (labels[i] > labels[j]):
correct_rankings += 1
ranking_accuracy = correct_rankings / total_pairs if total_pairs > 0 else 0
return {
"domain": self.domain,
"correlation": correlation,
"ranking_accuracy": ranking_accuracy,
"num_pairs": len(self.test_pairs)
}
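The benchmark harness only requires an object exposing an async embed method, so it can be smoke-tested without any API keys. A minimal sketch using a toy, hypothetical embedder (not a real model) together with the classes defined above:

import asyncio
import numpy as np

class ToyEmbedder:
    """Hypothetical stand-in: maps each text to a pseudo-random vector."""
    async def embed(self, text: str) -> list[float]:
        rng = np.random.default_rng(abs(hash(text)) % (2**32))
        return rng.normal(size=64).tolist()

dataset = BenchmarkDataset(
    queries=["how do I reset my password"],
    documents=["Resetting your account password", "Q3 revenue summary"],
    relevance_labels=[[0]],
)
result = asyncio.run(EmbeddingBenchmark(dataset).benchmark_model(ToyEmbedder(), "toy"))
print(result.retrieval_accuracy, round(result.mrr, 3), round(result.p95_latency_ms, 2))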
Model Evaluation Pipeline
from dataclasses import dataclass
from typing import Any, Optional, List
import asyncio
import time
import numpy as np
@dataclass
class EvaluationConfig:
"""Configuration for model evaluation."""
test_queries: list[str]
test_documents: list[str]
relevance_labels: list[list[int]]
batch_size: int = 32
num_trials: int = 3
class ModelEvaluator:
"""Evaluate and compare embedding models."""
def __init__(self, config: EvaluationConfig):
self.config = config
async def evaluate_model(
self,
model: Any,
model_name: str
) -> dict:
"""Evaluate a single model."""
results = {
"model_name": model_name,
"quality_metrics": {},
"performance_metrics": {},
"cost_metrics": {}
}
# Quality evaluation
benchmark = EmbeddingBenchmark(
BenchmarkDataset(
queries=self.config.test_queries,
documents=self.config.test_documents,
relevance_labels=self.config.relevance_labels
)
)
bench_result = await benchmark.benchmark_model(model, model_name)
results["quality_metrics"] = {
"retrieval_accuracy": bench_result.retrieval_accuracy,
"mrr": bench_result.mrr,
"ndcg": bench_result.ndcg
}
results["performance_metrics"] = {
"avg_latency_ms": bench_result.avg_latency_ms,
"p95_latency_ms": bench_result.p95_latency_ms,
"throughput_per_sec": bench_result.throughput_per_sec
}
return results
async def compare_models(
self,
models: list[tuple[Any, str]]
) -> list[dict]:
"""Compare multiple models."""
results = []
for model, name in models:
result = await self.evaluate_model(model, name)
results.append(result)
return results
def rank_models(
self,
results: list[dict],
weights: dict = None
) -> list[dict]:
"""Rank models by weighted score."""
weights = weights or {
"retrieval_accuracy": 0.3,
"mrr": 0.2,
"ndcg": 0.2,
"latency": 0.15,
"throughput": 0.15
}
        # Normalization baselines, computed once across all results
        max_latency = max(r["performance_metrics"]["avg_latency_ms"] for r in results)
        max_throughput = max(r["performance_metrics"]["throughput_per_sec"] for r in results)
        for result in results:
            score = 0.0
            # Quality scores (higher is better)
            score += weights["retrieval_accuracy"] * result["quality_metrics"]["retrieval_accuracy"]
            score += weights["mrr"] * result["quality_metrics"]["mrr"]
            score += weights["ndcg"] * result["quality_metrics"]["ndcg"]
            # Performance scores (normalized so higher is better)
            latency_score = 1 - (result["performance_metrics"]["avg_latency_ms"] / max_latency)
            score += weights["latency"] * latency_score
            throughput_score = result["performance_metrics"]["throughput_per_sec"] / max_throughput
            score += weights["throughput"] * throughput_score
            result["overall_score"] = score
return sorted(results, key=lambda r: r["overall_score"], reverse=True)
class ABTestRunner:
"""Run A/B tests between embedding models."""
def __init__(
self,
model_a: Any,
model_b: Any,
retriever: Any
):
self.model_a = model_a
self.model_b = model_b
self.retriever = retriever
self.results_a: list[dict] = []
self.results_b: list[dict] = []
async def run_query(self, query: str, use_model_a: bool) -> dict:
"""Run query with specified model."""
model = self.model_a if use_model_a else self.model_b
start = time.perf_counter()
embedding = await model.embed(query)
results = await self.retriever.search(embedding)
latency = (time.perf_counter() - start) * 1000
result = {
"query": query,
"results": results,
"latency_ms": latency,
"model": "A" if use_model_a else "B"
}
if use_model_a:
self.results_a.append(result)
else:
self.results_b.append(result)
return result
def get_statistics(self) -> dict:
"""Get A/B test statistics."""
if not self.results_a or not self.results_b:
return {"error": "Not enough data"}
latencies_a = [r["latency_ms"] for r in self.results_a]
latencies_b = [r["latency_ms"] for r in self.results_b]
return {
"model_a": {
"num_queries": len(self.results_a),
"avg_latency_ms": np.mean(latencies_a),
"p95_latency_ms": np.percentile(latencies_a, 95)
},
"model_b": {
"num_queries": len(self.results_b),
"avg_latency_ms": np.mean(latencies_b),
"p95_latency_ms": np.percentile(latencies_b, 95)
},
"latency_improvement": (np.mean(latencies_a) - np.mean(latencies_b)) / np.mean(latencies_a) * 100
}
class CostCalculator:
"""Calculate embedding costs."""
def __init__(self, model_spec: EmbeddingModelSpec):
self.spec = model_spec
def estimate_monthly_cost(
self,
queries_per_day: int,
avg_query_tokens: int,
documents_to_embed: int,
avg_doc_tokens: int
) -> dict:
"""Estimate monthly embedding costs."""
# Query costs
daily_query_tokens = queries_per_day * avg_query_tokens
monthly_query_tokens = daily_query_tokens * 30
query_cost = (monthly_query_tokens / 1000) * self.spec.cost_per_1k_tokens
# Document embedding costs (one-time, amortized over 12 months)
doc_tokens = documents_to_embed * avg_doc_tokens
doc_cost = (doc_tokens / 1000) * self.spec.cost_per_1k_tokens / 12
return {
"model": self.spec.name,
"monthly_query_cost": query_cost,
"monthly_doc_cost": doc_cost,
"total_monthly_cost": query_cost + doc_cost,
"cost_per_query": (avg_query_tokens / 1000) * self.spec.cost_per_1k_tokens
}
def compare_costs(
self,
other_spec: EmbeddingModelSpec,
queries_per_day: int,
avg_query_tokens: int
) -> dict:
"""Compare costs with another model."""
this_cost = self.estimate_monthly_cost(queries_per_day, avg_query_tokens, 0, 0)
other_calc = CostCalculator(other_spec)
other_cost = other_calc.estimate_monthly_cost(queries_per_day, avg_query_tokens, 0, 0)
return {
"model_a": self.spec.name,
"model_b": other_spec.name,
"cost_a": this_cost["monthly_query_cost"],
"cost_b": other_cost["monthly_query_cost"],
"savings": other_cost["monthly_query_cost"] - this_cost["monthly_query_cost"],
"savings_percent": (other_cost["monthly_query_cost"] - this_cost["monthly_query_cost"]) / other_cost["monthly_query_cost"] * 100 if other_cost["monthly_query_cost"] > 0 else 0
}
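To make the arithmetic concrete: at the illustrative catalog price above ($0.00002 per 1k tokens for text-embedding-3-small), 50,000 queries a day at roughly 20 tokens each is about 30M tokens a month, i.e. about $0.60 of query embedding spend. A minimal sketch, assuming the catalog and CostCalculator above are in scope:

calc = CostCalculator(MODEL_CATALOG["text-embedding-3-small"])
estimate = calc.estimate_monthly_cost(
    queries_per_day=50_000,
    avg_query_tokens=20,
    documents_to_embed=1_000_000,
    avg_doc_tokens=500,
)
print(estimate["monthly_query_cost"], estimate["total_monthly_cost"])

comparison = calc.compare_costs(MODEL_CATALOG["text-embedding-3-large"], 50_000, 20)
print(comparison["savings_percent"])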
Model Adapters
from abc import ABC, abstractmethod
from typing import Any, Optional, List
import asyncio
class EmbeddingModel(ABC):
"""Abstract base class for embedding models."""
@abstractmethod
async def embed(self, text: str) -> list[float]:
"""Embed a single text."""
pass
@abstractmethod
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Embed a batch of texts."""
pass
@property
@abstractmethod
def dimensions(self) -> int:
"""Get embedding dimensions."""
pass
class OpenAIEmbedding(EmbeddingModel):
"""OpenAI embedding model adapter."""
def __init__(
self,
model: str = "text-embedding-3-small",
api_key: str = None
):
self.model = model
self.api_key = api_key
self._dimensions = 1536 if "small" in model else 3072
async def embed(self, text: str) -> list[float]:
"""Embed single text."""
import openai
client = openai.AsyncOpenAI(api_key=self.api_key)
response = await client.embeddings.create(
model=self.model,
input=text
)
return response.data[0].embedding
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Embed batch of texts."""
import openai
client = openai.AsyncOpenAI(api_key=self.api_key)
response = await client.embeddings.create(
model=self.model,
input=texts
)
return [d.embedding for d in response.data]
@property
def dimensions(self) -> int:
return self._dimensions
class CohereEmbedding(EmbeddingModel):
"""Cohere embedding model adapter."""
def __init__(
self,
model: str = "embed-english-v3.0",
api_key: str = None
):
self.model = model
self.api_key = api_key
self._dimensions = 1024
async def embed(self, text: str) -> list[float]:
"""Embed single text."""
import cohere
client = cohere.AsyncClient(api_key=self.api_key)
response = await client.embed(
texts=[text],
model=self.model,
input_type="search_query"
)
return response.embeddings[0]
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Embed batch of texts."""
import cohere
client = cohere.AsyncClient(api_key=self.api_key)
response = await client.embed(
texts=texts,
model=self.model,
input_type="search_document"
)
return response.embeddings
@property
def dimensions(self) -> int:
return self._dimensions
class HuggingFaceEmbedding(EmbeddingModel):
"""Hugging Face embedding model adapter."""
def __init__(self, model_name: str = "BAAI/bge-large-en-v1.5"):
self.model_name = model_name
self._model = None
self._tokenizer = None
self._dimensions = None
def _load_model(self):
"""Lazy load model."""
if self._model is None:
from transformers import AutoModel, AutoTokenizer
import torch
self._tokenizer = AutoTokenizer.from_pretrained(self.model_name)
self._model = AutoModel.from_pretrained(self.model_name)
self._model.eval()
# Get dimensions from model config
self._dimensions = self._model.config.hidden_size
async def embed(self, text: str) -> list[float]:
"""Embed single text."""
self._load_model()
import torch
inputs = self._tokenizer(
text,
return_tensors="pt",
truncation=True,
max_length=512,
padding=True
)
with torch.no_grad():
outputs = self._model(**inputs)
embedding = outputs.last_hidden_state[:, 0, :].squeeze().tolist()
return embedding
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Embed batch of texts."""
self._load_model()
import torch
inputs = self._tokenizer(
texts,
return_tensors="pt",
truncation=True,
max_length=512,
padding=True
)
with torch.no_grad():
outputs = self._model(**inputs)
embeddings = outputs.last_hidden_state[:, 0, :].tolist()
return embeddings
@property
def dimensions(self) -> int:
self._load_model()
return self._dimensions
class SentenceTransformerEmbedding(EmbeddingModel):
"""Sentence Transformers embedding adapter."""
def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
self.model_name = model_name
self._model = None
def _load_model(self):
"""Lazy load model."""
if self._model is None:
from sentence_transformers import SentenceTransformer
self._model = SentenceTransformer(self.model_name)
async def embed(self, text: str) -> list[float]:
"""Embed single text."""
self._load_model()
return self._model.encode(text).tolist()
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Embed batch of texts."""
self._load_model()
return self._model.encode(texts).tolist()
@property
def dimensions(self) -> int:
self._load_model()
return self._model.get_sentence_embedding_dimension()
class EmbeddingModelFactory:
"""Factory for creating embedding models."""
@staticmethod
def create(
provider: str,
model_name: str,
**kwargs
) -> EmbeddingModel:
"""Create embedding model."""
if provider == "openai":
return OpenAIEmbedding(model=model_name, **kwargs)
elif provider == "cohere":
return CohereEmbedding(model=model_name, **kwargs)
elif provider == "huggingface":
return HuggingFaceEmbedding(model_name=model_name)
elif provider == "sentence_transformers":
return SentenceTransformerEmbedding(model_name=model_name)
else:
raise ValueError(f"Unknown provider: {provider}")
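A quick local sanity check of the adapter layer, assuming the sentence-transformers package is installed; the hosted adapters work the same way once an API key is supplied.

import asyncio

model = EmbeddingModelFactory.create("sentence_transformers", "all-MiniLM-L6-v2")
vector = asyncio.run(model.embed("how do I rotate my API key?"))
print(model.dimensions, len(vector))  # both should be 384 for all-MiniLM-L6-v2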
Production Selection Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict
app = FastAPI()
class SelectionRequest(BaseModel):
use_case: str
budget: str = "medium"
latency_requirement: str = "medium"
languages: Optional[List[str]] = None
min_quality_score: float = 0.0
class BenchmarkRequest(BaseModel):
model_names: List[str]
test_queries: List[str]
test_documents: List[str]
relevance_labels: List[List[int]]
class CostEstimateRequest(BaseModel):
model_name: str
queries_per_day: int
avg_query_tokens: int
documents_to_embed: int = 0
avg_doc_tokens: int = 0
# Initialize components
selector = ModelSelector()
@app.get("/v1/models")
async def list_models() -> list[dict]:
"""List all available models."""
return [
{
"name": spec.name,
"provider": spec.provider.value,
"type": spec.model_type.value,
"dimensions": spec.dimensions,
"max_tokens": spec.max_tokens,
"cost_per_1k_tokens": spec.cost_per_1k_tokens,
"mteb_score": spec.mteb_score,
"languages": spec.languages
}
for spec in MODEL_CATALOG.values()
]
@app.get("/v1/models/{model_name}")
async def get_model(model_name: str) -> dict:
"""Get model details."""
spec = MODEL_CATALOG.get(model_name)
if not spec:
raise HTTPException(status_code=404, detail="Model not found")
return {
"name": spec.name,
"provider": spec.provider.value,
"type": spec.model_type.value,
"dimensions": spec.dimensions,
"max_tokens": spec.max_tokens,
"cost_per_1k_tokens": spec.cost_per_1k_tokens,
"avg_latency_ms": spec.avg_latency_ms,
"mteb_score": spec.mteb_score,
"languages": spec.languages,
"description": spec.description
}
@app.post("/v1/recommend")
async def recommend_models(request: SelectionRequest) -> list[dict]:
"""Get model recommendations."""
recommendations = selector.recommend(
use_case=request.use_case,
budget=request.budget,
latency_requirement=request.latency_requirement
)
# Filter by quality score
recommendations = [
r for r in recommendations
if r.mteb_score >= request.min_quality_score
]
    # Filter by languages (multilingual models pass regardless of how their
    # language list is written in the catalog)
    if request.languages:
        recommendations = [
            r for r in recommendations
            if r.model_type == ModelType.MULTILINGUAL
            or "multilingual" in r.languages
            or any(lang in r.languages for lang in request.languages)
        ]
return [
{
"name": r.name,
"provider": r.provider.value,
"mteb_score": r.mteb_score,
"cost_per_1k_tokens": r.cost_per_1k_tokens,
"avg_latency_ms": r.avg_latency_ms,
"recommendation_reason": _get_recommendation_reason(r, request)
}
for r in recommendations[:5]
]
def _get_recommendation_reason(spec: EmbeddingModelSpec, request: SelectionRequest) -> str:
"""Generate recommendation reason."""
reasons = []
if spec.mteb_score > 65:
reasons.append("high quality")
if spec.cost_per_1k_tokens == 0:
reasons.append("free/open-source")
elif spec.cost_per_1k_tokens < 0.0001:
reasons.append("cost-effective")
if spec.avg_latency_ms < 30:
reasons.append("very fast")
if request.use_case == "code" and spec.model_type == ModelType.CODE:
reasons.append("optimized for code")
if request.use_case == "multilingual" and spec.model_type == ModelType.MULTILINGUAL:
reasons.append("multilingual support")
return ", ".join(reasons) if reasons else "good general-purpose model"
@app.post("/v1/estimate-cost")
async def estimate_cost(request: CostEstimateRequest) -> dict:
"""Estimate embedding costs."""
spec = MODEL_CATALOG.get(request.model_name)
if not spec:
raise HTTPException(status_code=404, detail="Model not found")
calculator = CostCalculator(spec)
return calculator.estimate_monthly_cost(
queries_per_day=request.queries_per_day,
avg_query_tokens=request.avg_query_tokens,
documents_to_embed=request.documents_to_embed,
avg_doc_tokens=request.avg_doc_tokens
)
@app.get("/v1/compare/{model_a}/{model_b}")
async def compare_models(model_a: str, model_b: str) -> dict:
"""Compare two models."""
spec_a = MODEL_CATALOG.get(model_a)
spec_b = MODEL_CATALOG.get(model_b)
if not spec_a or not spec_b:
raise HTTPException(status_code=404, detail="Model not found")
return {
"model_a": {
"name": spec_a.name,
"dimensions": spec_a.dimensions,
"mteb_score": spec_a.mteb_score,
"cost_per_1k_tokens": spec_a.cost_per_1k_tokens,
"avg_latency_ms": spec_a.avg_latency_ms
},
"model_b": {
"name": spec_b.name,
"dimensions": spec_b.dimensions,
"mteb_score": spec_b.mteb_score,
"cost_per_1k_tokens": spec_b.cost_per_1k_tokens,
"avg_latency_ms": spec_b.avg_latency_ms
},
"comparison": {
"quality_winner": model_a if spec_a.mteb_score > spec_b.mteb_score else model_b,
"cost_winner": model_a if spec_a.cost_per_1k_tokens < spec_b.cost_per_1k_tokens else model_b,
"speed_winner": model_a if spec_a.avg_latency_ms < spec_b.avg_latency_ms else model_b,
"quality_diff": abs(spec_a.mteb_score - spec_b.mteb_score),
"cost_diff": abs(spec_a.cost_per_1k_tokens - spec_b.cost_per_1k_tokens),
"latency_diff_ms": abs(spec_a.avg_latency_ms - spec_b.avg_latency_ms)
}
}
@app.get("/health")
async def health():
return {"status": "healthy"}
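With the service running locally (for example via uvicorn main:app, assuming the code above lives in main.py and the default port 8000), recommendations can be fetched with a small client sketch like this one using requests:

import requests

resp = requests.post(
    "http://localhost:8000/v1/recommend",
    json={"use_case": "general", "budget": "low", "latency_requirement": "medium"},
    timeout=10,
)
resp.raise_for_status()
for rec in resp.json():
    print(rec["name"], "-", rec["recommendation_reason"])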
References
- MTEB Leaderboard: https://huggingface.co/spaces/mteb/leaderboard
- OpenAI Embeddings: https://platform.openai.com/docs/guides/embeddings
- Cohere Embed: https://docs.cohere.com/docs/embeddings
- Voyage AI: https://docs.voyageai.com/
- Sentence Transformers: https://www.sbert.net/
Conclusion
Embedding model selection requires balancing quality, speed, and cost for your specific use case. Start by understanding your requirements: What languages do you need? How long are your documents? What's your latency budget? What can you spend? Use the MTEB leaderboard as a starting point, but always benchmark on your own data—domain-specific performance can vary significantly from general benchmarks. For most English-language applications, OpenAI's text-embedding-3-small offers an excellent balance of quality and cost. For multilingual needs, Cohere's embed-multilingual-v3 excels. For code search, Voyage's code-specialized model outperforms general models. If cost is critical and you can self-host, open-source models like BGE and E5 provide competitive quality at zero API cost. For latency-sensitive applications, smaller models like all-MiniLM-L6-v2 offer sub-10ms inference. Build evaluation pipelines that test models on your actual queries and documents, measure retrieval accuracy, and calculate total cost of ownership. The best model is the one that meets your quality threshold at the lowest cost within your latency constraints.