
Embedding Model Selection: Choosing the Right Model for Your Use Case

Introduction: Choosing the right embedding model is one of the most impactful decisions in building semantic search and RAG systems. The embedding model determines how well your system understands the semantic meaning of text, how accurately it retrieves relevant documents, and ultimately how useful your AI application is to users. But the landscape is complex: OpenAI’s text-embedding-3, Cohere’s embed-v3, open-source models like BGE and E5, specialized models for code or multilingual content—each with different dimensions, performance characteristics, and cost profiles. The right choice depends on your specific use case: document length, language requirements, domain specificity, latency constraints, and budget. This guide covers practical approaches to embedding model selection: understanding the key metrics, benchmarking on your data, and making informed tradeoffs between quality, speed, and cost.

Embedding Selection: Evaluate, Benchmark, Compare

Model Catalog and Comparison

from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from enum import Enum

class ModelProvider(Enum):
    """Embedding model providers."""
    
    OPENAI = "openai"
    COHERE = "cohere"
    VOYAGE = "voyage"
    HUGGINGFACE = "huggingface"
    SENTENCE_TRANSFORMERS = "sentence_transformers"
    GOOGLE = "google"

class ModelType(Enum):
    """Types of embedding models."""
    
    GENERAL = "general"
    CODE = "code"
    MULTILINGUAL = "multilingual"
    DOMAIN_SPECIFIC = "domain_specific"

@dataclass
class EmbeddingModelSpec:
    """Specification for an embedding model."""
    
    name: str
    provider: ModelProvider
    model_type: ModelType
    dimensions: int
    max_tokens: int
    supports_batching: bool = True
    supports_truncation: bool = True
    cost_per_1k_tokens: float = 0.0
    avg_latency_ms: float = 0.0
    mteb_score: float = 0.0
    languages: list[str] = field(default_factory=lambda: ["en"])
    description: str = ""

# Model catalog
MODEL_CATALOG = {
    "text-embedding-3-small": EmbeddingModelSpec(
        name="text-embedding-3-small",
        provider=ModelProvider.OPENAI,
        model_type=ModelType.GENERAL,
        dimensions=1536,
        max_tokens=8191,
        cost_per_1k_tokens=0.00002,
        avg_latency_ms=50,
        mteb_score=62.3,
        languages=["en", "multilingual"],
        description="OpenAI's cost-effective embedding model"
    ),
    "text-embedding-3-large": EmbeddingModelSpec(
        name="text-embedding-3-large",
        provider=ModelProvider.OPENAI,
        model_type=ModelType.GENERAL,
        dimensions=3072,
        max_tokens=8191,
        cost_per_1k_tokens=0.00013,
        avg_latency_ms=80,
        mteb_score=64.6,
        languages=["en", "multilingual"],
        description="OpenAI's highest quality embedding model"
    ),
    "embed-english-v3.0": EmbeddingModelSpec(
        name="embed-english-v3.0",
        provider=ModelProvider.COHERE,
        model_type=ModelType.GENERAL,
        dimensions=1024,
        max_tokens=512,
        cost_per_1k_tokens=0.0001,
        avg_latency_ms=60,
        mteb_score=64.5,
        languages=["en"],
        description="Cohere's English embedding model"
    ),
    "embed-multilingual-v3.0": EmbeddingModelSpec(
        name="embed-multilingual-v3.0",
        provider=ModelProvider.COHERE,
        model_type=ModelType.MULTILINGUAL,
        dimensions=1024,
        max_tokens=512,
        cost_per_1k_tokens=0.0001,
        avg_latency_ms=70,
        mteb_score=66.3,
        languages=["multilingual"],  # supports 100+ languages
        description="Cohere's multilingual embedding model"
    ),
    "voyage-large-2": EmbeddingModelSpec(
        name="voyage-large-2",
        provider=ModelProvider.VOYAGE,
        model_type=ModelType.GENERAL,
        dimensions=1536,
        max_tokens=16000,
        cost_per_1k_tokens=0.00012,
        avg_latency_ms=100,
        mteb_score=68.3,
        languages=["en"],
        description="Voyage AI's high-quality embedding model"
    ),
    "voyage-code-2": EmbeddingModelSpec(
        name="voyage-code-2",
        provider=ModelProvider.VOYAGE,
        model_type=ModelType.CODE,
        dimensions=1536,
        max_tokens=16000,
        cost_per_1k_tokens=0.00012,
        avg_latency_ms=100,
        mteb_score=0.0,
        languages=["code"],
        description="Voyage AI's code-specialized embedding model"
    ),
    "bge-large-en-v1.5": EmbeddingModelSpec(
        name="BAAI/bge-large-en-v1.5",
        provider=ModelProvider.HUGGINGFACE,
        model_type=ModelType.GENERAL,
        dimensions=1024,
        max_tokens=512,
        cost_per_1k_tokens=0.0,
        avg_latency_ms=20,
        mteb_score=64.2,
        languages=["en"],
        description="Open-source BGE model from BAAI"
    ),
    "e5-large-v2": EmbeddingModelSpec(
        name="intfloat/e5-large-v2",
        provider=ModelProvider.HUGGINGFACE,
        model_type=ModelType.GENERAL,
        dimensions=1024,
        max_tokens=512,
        cost_per_1k_tokens=0.0,
        avg_latency_ms=25,
        mteb_score=62.2,
        languages=["en"],
        description="Microsoft's E5 embedding model"
    ),
    "all-MiniLM-L6-v2": EmbeddingModelSpec(
        name="sentence-transformers/all-MiniLM-L6-v2",
        provider=ModelProvider.SENTENCE_TRANSFORMERS,
        model_type=ModelType.GENERAL,
        dimensions=384,
        max_tokens=256,
        cost_per_1k_tokens=0.0,
        avg_latency_ms=5,
        mteb_score=56.3,
        languages=["en"],
        description="Fast, lightweight embedding model"
    ),
    "text-embedding-004": EmbeddingModelSpec(
        name="text-embedding-004",
        provider=ModelProvider.GOOGLE,
        model_type=ModelType.GENERAL,
        dimensions=768,
        max_tokens=2048,
        cost_per_1k_tokens=0.00001,
        avg_latency_ms=40,
        mteb_score=66.3,
        languages=["en", "multilingual"],
        description="Google's Gecko embedding model"
    )
}

class ModelSelector:
    """Select embedding model based on requirements."""
    
    def __init__(self, catalog: Optional[dict[str, EmbeddingModelSpec]] = None):
        self.catalog = catalog or MODEL_CATALOG
    
    def filter_by_provider(self, provider: ModelProvider) -> list[EmbeddingModelSpec]:
        """Filter models by provider."""
        
        return [m for m in self.catalog.values() if m.provider == provider]
    
    def filter_by_type(self, model_type: ModelType) -> list[EmbeddingModelSpec]:
        """Filter models by type."""
        
        return [m for m in self.catalog.values() if m.model_type == model_type]
    
    def filter_by_dimensions(
        self,
        min_dims: int = 0,
        max_dims: int = 10000
    ) -> list[EmbeddingModelSpec]:
        """Filter models by dimension range."""
        
        return [
            m for m in self.catalog.values()
            if min_dims <= m.dimensions <= max_dims
        ]
    
    def filter_by_cost(self, max_cost: float) -> list[EmbeddingModelSpec]:
        """Filter models by cost."""
        
        return [
            m for m in self.catalog.values()
            if m.cost_per_1k_tokens <= max_cost
        ]
    
    def rank_by_quality(self) -> list[EmbeddingModelSpec]:
        """Rank models by MTEB score."""
        
        models = list(self.catalog.values())
        return sorted(models, key=lambda m: m.mteb_score, reverse=True)
    
    def rank_by_speed(self) -> list[EmbeddingModelSpec]:
        """Rank models by latency."""
        
        models = list(self.catalog.values())
        return sorted(models, key=lambda m: m.avg_latency_ms)
    
    def recommend(
        self,
        use_case: str,
        budget: str = "medium",
        latency_requirement: str = "medium"
    ) -> list[EmbeddingModelSpec]:
        """Recommend models for use case."""
        
        candidates = list(self.catalog.values())
        
        # Filter by use case
        if use_case == "code":
            candidates = [m for m in candidates if m.model_type == ModelType.CODE or "code" in m.name.lower()]
        elif use_case == "multilingual":
            candidates = [m for m in candidates if m.model_type == ModelType.MULTILINGUAL or len(m.languages) > 1]
        
        # Filter by budget
        if budget == "low":
            candidates = [m for m in candidates if m.cost_per_1k_tokens == 0 or m.cost_per_1k_tokens < 0.00005]
        elif budget == "medium":
            candidates = [m for m in candidates if m.cost_per_1k_tokens < 0.0002]
        
        # Filter by latency
        if latency_requirement == "low":
            candidates = [m for m in candidates if m.avg_latency_ms < 30]
        elif latency_requirement == "medium":
            candidates = [m for m in candidates if m.avg_latency_ms < 100]
        
        # Sort by quality
        return sorted(candidates, key=lambda m: m.mteb_score, reverse=True)
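
A quick usage sketch (assuming the catalog and classes above are in scope): shortlist models for a multilingual use case, then print the trade-offs that matter.

# Minimal selector usage; models and scores come from the catalog above.
selector = ModelSelector()

shortlist = selector.recommend(
    use_case="multilingual",
    budget="medium",
    latency_requirement="medium"
)

for spec in shortlist[:3]:
    print(f"{spec.name}: MTEB={spec.mteb_score}, "
          f"${spec.cost_per_1k_tokens}/1k tokens, {spec.avg_latency_ms}ms")

# Or filter explicitly, e.g. only free/self-hosted models
open_source = selector.filter_by_cost(max_cost=0.0)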

Benchmarking Framework

from dataclasses import dataclass
from typing import Any, Optional, List, Callable
import time
import numpy as np

@dataclass
class BenchmarkResult:
    """Result of model benchmark."""
    
    model_name: str
    retrieval_accuracy: float
    mrr: float  # Mean Reciprocal Rank
    ndcg: float  # Normalized Discounted Cumulative Gain
    avg_latency_ms: float
    p95_latency_ms: float
    throughput_per_sec: float
    memory_mb: float

@dataclass
class BenchmarkDataset:
    """Dataset for benchmarking."""
    
    queries: list[str]
    documents: list[str]
    relevance_labels: list[list[int]]  # For each query, list of relevant doc indices

class EmbeddingBenchmark:
    """Benchmark embedding models."""
    
    def __init__(self, dataset: BenchmarkDataset):
        self.dataset = dataset
    
    async def benchmark_model(
        self,
        model: Any,
        model_name: str
    ) -> BenchmarkResult:
        """Run full benchmark on model."""
        
        # Embed documents
        doc_embeddings = await self._embed_with_timing(
            model,
            self.dataset.documents
        )
        
        # Embed queries and measure latency
        latencies = []
        query_embeddings = []
        
        for query in self.dataset.queries:
            start = time.perf_counter()
            emb = await model.embed(query)
            latencies.append((time.perf_counter() - start) * 1000)
            query_embeddings.append(emb)
        
        # Calculate retrieval metrics
        accuracy = self._calculate_accuracy(
            query_embeddings,
            doc_embeddings,
            self.dataset.relevance_labels
        )
        
        mrr = self._calculate_mrr(
            query_embeddings,
            doc_embeddings,
            self.dataset.relevance_labels
        )
        
        ndcg = self._calculate_ndcg(
            query_embeddings,
            doc_embeddings,
            self.dataset.relevance_labels
        )
        
        return BenchmarkResult(
            model_name=model_name,
            retrieval_accuracy=accuracy,
            mrr=mrr,
            ndcg=ndcg,
            avg_latency_ms=np.mean(latencies),
            p95_latency_ms=np.percentile(latencies, 95),
            throughput_per_sec=1000 / np.mean(latencies),
            memory_mb=0  # Would measure actual memory usage
        )
    
    async def _embed_with_timing(
        self,
        model: Any,
        texts: list[str]
    ) -> list[list[float]]:
        """Embed texts and return embeddings."""
        
        embeddings = []
        
        for text in texts:
            emb = await model.embed(text)
            embeddings.append(emb)
        
        return embeddings
    
    def _calculate_accuracy(
        self,
        query_embs: list,
        doc_embs: list,
        relevance: list[list[int]],
        k: int = 10
    ) -> float:
        """Calculate retrieval accuracy at k."""
        
        correct = 0
        
        for i, query_emb in enumerate(query_embs):
            # Calculate similarities
            similarities = [
                self._cosine_similarity(query_emb, doc_emb)
                for doc_emb in doc_embs
            ]
            
            # Get top-k indices
            top_k = np.argsort(similarities)[-k:][::-1]
            
            # Check if any relevant doc is in top-k
            relevant = set(relevance[i])
            if any(idx in relevant for idx in top_k):
                correct += 1
        
        return correct / len(query_embs)
    
    def _calculate_mrr(
        self,
        query_embs: list,
        doc_embs: list,
        relevance: list[list[int]]
    ) -> float:
        """Calculate Mean Reciprocal Rank."""
        
        reciprocal_ranks = []
        
        for i, query_emb in enumerate(query_embs):
            similarities = [
                self._cosine_similarity(query_emb, doc_emb)
                for doc_emb in doc_embs
            ]
            
            ranked = np.argsort(similarities)[::-1]
            relevant = set(relevance[i])
            
            for rank, idx in enumerate(ranked, 1):
                if idx in relevant:
                    reciprocal_ranks.append(1 / rank)
                    break
            else:
                reciprocal_ranks.append(0)
        
        return np.mean(reciprocal_ranks)
    
    def _calculate_ndcg(
        self,
        query_embs: list,
        doc_embs: list,
        relevance: list[list[int]],
        k: int = 10
    ) -> float:
        """Calculate Normalized Discounted Cumulative Gain."""
        
        ndcg_scores = []
        
        for i, query_emb in enumerate(query_embs):
            similarities = [
                self._cosine_similarity(query_emb, doc_emb)
                for doc_emb in doc_embs
            ]
            
            ranked = np.argsort(similarities)[-k:][::-1]
            relevant = set(relevance[i])
            
            # Calculate DCG
            dcg = 0
            for rank, idx in enumerate(ranked, 1):
                if idx in relevant:
                    dcg += 1 / np.log2(rank + 1)
            
            # Calculate ideal DCG
            ideal_dcg = sum(1 / np.log2(r + 1) for r in range(1, min(len(relevant), k) + 1))
            
            if ideal_dcg > 0:
                ndcg_scores.append(dcg / ideal_dcg)
            else:
                ndcg_scores.append(0)
        
        return np.mean(ndcg_scores)
    
    def _cosine_similarity(self, a: list, b: list) -> float:
        """Calculate cosine similarity."""
        
        a = np.array(a)
        b = np.array(b)
        
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
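
To dry-run the benchmark without calling a paid API, any object with an async embed method will do. The sketch below uses a toy hashing embedder as a stand-in (ToyEmbedder is illustrative only, not a real model), and the tiny dataset is made up for demonstration.

import asyncio
import hashlib

class ToyEmbedder:
    """Illustrative stand-in: maps text to a deterministic pseudo-embedding."""

    async def embed(self, text: str) -> list[float]:
        digest = hashlib.sha256(text.encode()).digest()
        return [b / 255.0 for b in digest[:32]]

dataset = BenchmarkDataset(
    queries=["how do I reset my password"],
    documents=[
        "Password reset instructions",
        "Billing FAQ",
        "API rate limits"
    ],
    relevance_labels=[[0]]  # document 0 is relevant to query 0
)

benchmark = EmbeddingBenchmark(dataset)
result = asyncio.run(benchmark.benchmark_model(ToyEmbedder(), "toy-hash"))
print(result.retrieval_accuracy, result.mrr, round(result.ndcg, 3))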

class DomainBenchmark:
    """Benchmark models on domain-specific data."""
    
    def __init__(self, domain: str):
        self.domain = domain
        self.test_pairs: list[tuple[str, str, float]] = []  # (query, doc, relevance)
    
    def add_test_pair(self, query: str, document: str, relevance: float):
        """Add a test pair."""
        
        self.test_pairs.append((query, document, relevance))
    
    async def evaluate(self, model: Any) -> dict:
        """Evaluate model on domain data."""
        
        predictions = []
        labels = []
        
        for query, doc, relevance in self.test_pairs:
            query_emb = await model.embed(query)
            doc_emb = await model.embed(doc)
            
            similarity = np.dot(query_emb, doc_emb) / (
                np.linalg.norm(query_emb) * np.linalg.norm(doc_emb)
            )
            
            predictions.append(similarity)
            labels.append(relevance)
        
        # Calculate correlation
        correlation = np.corrcoef(predictions, labels)[0, 1]
        
        # Calculate ranking accuracy
        correct_rankings = 0
        total_pairs = 0
        
        for i in range(len(predictions)):
            for j in range(i + 1, len(predictions)):
                if labels[i] != labels[j]:
                    total_pairs += 1
                    if (predictions[i] > predictions[j]) == (labels[i] > labels[j]):
                        correct_rankings += 1
        
        ranking_accuracy = correct_rankings / total_pairs if total_pairs > 0 else 0
        
        return {
            "domain": self.domain,
            "correlation": correlation,
            "ranking_accuracy": ranking_accuracy,
            "num_pairs": len(self.test_pairs)
        }

Model Evaluation Pipeline

from dataclasses import dataclass
from typing import Any, Optional, List
import time
import numpy as np

@dataclass
class EvaluationConfig:
    """Configuration for model evaluation."""
    
    test_queries: list[str]
    test_documents: list[str]
    relevance_labels: list[list[int]]
    batch_size: int = 32
    num_trials: int = 3

class ModelEvaluator:
    """Evaluate and compare embedding models."""
    
    def __init__(self, config: EvaluationConfig):
        self.config = config
    
    async def evaluate_model(
        self,
        model: Any,
        model_name: str
    ) -> dict:
        """Evaluate a single model."""
        
        results = {
            "model_name": model_name,
            "quality_metrics": {},
            "performance_metrics": {},
            "cost_metrics": {}
        }
        
        # Quality evaluation
        benchmark = EmbeddingBenchmark(
            BenchmarkDataset(
                queries=self.config.test_queries,
                documents=self.config.test_documents,
                relevance_labels=self.config.relevance_labels
            )
        )
        
        bench_result = await benchmark.benchmark_model(model, model_name)
        
        results["quality_metrics"] = {
            "retrieval_accuracy": bench_result.retrieval_accuracy,
            "mrr": bench_result.mrr,
            "ndcg": bench_result.ndcg
        }
        
        results["performance_metrics"] = {
            "avg_latency_ms": bench_result.avg_latency_ms,
            "p95_latency_ms": bench_result.p95_latency_ms,
            "throughput_per_sec": bench_result.throughput_per_sec
        }
        
        return results
    
    async def compare_models(
        self,
        models: list[tuple[Any, str]]
    ) -> list[dict]:
        """Compare multiple models."""
        
        results = []
        
        for model, name in models:
            result = await self.evaluate_model(model, name)
            results.append(result)
        
        return results
    
    def rank_models(
        self,
        results: list[dict],
        weights: Optional[dict] = None
    ) -> list[dict]:
        """Rank models by weighted score."""
        
        weights = weights or {
            "retrieval_accuracy": 0.3,
            "mrr": 0.2,
            "ndcg": 0.2,
            "latency": 0.15,
            "throughput": 0.15
        }
        
        for result in results:
            score = 0
            
            # Quality scores (higher is better)
            score += weights["retrieval_accuracy"] * result["quality_metrics"]["retrieval_accuracy"]
            score += weights["mrr"] * result["quality_metrics"]["mrr"]
            score += weights["ndcg"] * result["quality_metrics"]["ndcg"]
            
            # Performance scores (normalized)
            max_latency = max(r["performance_metrics"]["avg_latency_ms"] for r in results)
            latency_score = 1 - (result["performance_metrics"]["avg_latency_ms"] / max_latency)
            score += weights["latency"] * latency_score
            
            max_throughput = max(r["performance_metrics"]["throughput_per_sec"] for r in results)
            throughput_score = result["performance_metrics"]["throughput_per_sec"] / max_throughput
            score += weights["throughput"] * throughput_score
            
            result["overall_score"] = score
        
        return sorted(results, key=lambda r: r["overall_score"], reverse=True)
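
To see how the weighted ranking combines quality and performance, here is a small sketch with made-up results for two hypothetical models (the numbers are illustrative only).

# Sketch: rank two hypothetical evaluation results with the default weights.
mock_results = [
    {
        "model_name": "model-a",
        "quality_metrics": {"retrieval_accuracy": 0.82, "mrr": 0.71, "ndcg": 0.75},
        "performance_metrics": {"avg_latency_ms": 60.0, "p95_latency_ms": 95.0, "throughput_per_sec": 16.7},
    },
    {
        "model_name": "model-b",
        "quality_metrics": {"retrieval_accuracy": 0.78, "mrr": 0.69, "ndcg": 0.72},
        "performance_metrics": {"avg_latency_ms": 12.0, "p95_latency_ms": 20.0, "throughput_per_sec": 83.3},
    },
]

# The config is not used by rank_models, so empty test data is fine here.
evaluator = ModelEvaluator(EvaluationConfig(test_queries=[], test_documents=[], relevance_labels=[]))

for r in evaluator.rank_models(mock_results):
    print(r["model_name"], round(r["overall_score"], 3))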

class ABTestRunner:
    """Run A/B tests between embedding models."""
    
    def __init__(
        self,
        model_a: Any,
        model_b: Any,
        retriever: Any
    ):
        self.model_a = model_a
        self.model_b = model_b
        self.retriever = retriever
        self.results_a: list[dict] = []
        self.results_b: list[dict] = []
    
    async def run_query(self, query: str, use_model_a: bool) -> dict:
        """Run query with specified model."""
        
        model = self.model_a if use_model_a else self.model_b
        
        start = time.perf_counter()
        embedding = await model.embed(query)
        results = await self.retriever.search(embedding)
        latency = (time.perf_counter() - start) * 1000
        
        result = {
            "query": query,
            "results": results,
            "latency_ms": latency,
            "model": "A" if use_model_a else "B"
        }
        
        if use_model_a:
            self.results_a.append(result)
        else:
            self.results_b.append(result)
        
        return result
    
    def get_statistics(self) -> dict:
        """Get A/B test statistics."""
        
        if not self.results_a or not self.results_b:
            return {"error": "Not enough data"}
        
        latencies_a = [r["latency_ms"] for r in self.results_a]
        latencies_b = [r["latency_ms"] for r in self.results_b]
        
        return {
            "model_a": {
                "num_queries": len(self.results_a),
                "avg_latency_ms": np.mean(latencies_a),
                "p95_latency_ms": np.percentile(latencies_a, 95)
            },
            "model_b": {
                "num_queries": len(self.results_b),
                "avg_latency_ms": np.mean(latencies_b),
                "p95_latency_ms": np.percentile(latencies_b, 95)
            },
            "latency_improvement": (np.mean(latencies_a) - np.mean(latencies_b)) / np.mean(latencies_a) * 100
        }
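
A sketch of driving the A/B runner with stub components and a 50/50 traffic split; StubModel and StubRetriever are placeholders standing in for two real embedding models and a real vector store.

import asyncio

class StubModel:
    """Placeholder model: returns a fixed embedding."""

    async def embed(self, text: str) -> list[float]:
        return [0.1, 0.2, 0.3]

class StubRetriever:
    """Placeholder retriever: returns fixed document IDs."""

    async def search(self, embedding: list[float]) -> list[str]:
        return ["doc-1", "doc-2"]

async def run_ab_demo():
    # In practice model_a and model_b would be two different embedding models.
    runner = ABTestRunner(model_a=StubModel(), model_b=StubModel(), retriever=StubRetriever())
    queries = ["q1", "q2", "q3", "q4"]
    for i, q in enumerate(queries):
        await runner.run_query(q, use_model_a=(i % 2 == 0))  # alternate traffic 50/50
    return runner.get_statistics()

print(asyncio.run(run_ab_demo()))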

class CostCalculator:
    """Calculate embedding costs."""
    
    def __init__(self, model_spec: EmbeddingModelSpec):
        self.spec = model_spec
    
    def estimate_monthly_cost(
        self,
        queries_per_day: int,
        avg_query_tokens: int,
        documents_to_embed: int,
        avg_doc_tokens: int
    ) -> dict:
        """Estimate monthly embedding costs."""
        
        # Query costs
        daily_query_tokens = queries_per_day * avg_query_tokens
        monthly_query_tokens = daily_query_tokens * 30
        query_cost = (monthly_query_tokens / 1000) * self.spec.cost_per_1k_tokens
        
        # Document embedding costs (one-time, amortized over 12 months)
        doc_tokens = documents_to_embed * avg_doc_tokens
        doc_cost = (doc_tokens / 1000) * self.spec.cost_per_1k_tokens / 12
        
        return {
            "model": self.spec.name,
            "monthly_query_cost": query_cost,
            "monthly_doc_cost": doc_cost,
            "total_monthly_cost": query_cost + doc_cost,
            "cost_per_query": (avg_query_tokens / 1000) * self.spec.cost_per_1k_tokens
        }
    
    def compare_costs(
        self,
        other_spec: EmbeddingModelSpec,
        queries_per_day: int,
        avg_query_tokens: int
    ) -> dict:
        """Compare costs with another model."""
        
        this_cost = self.estimate_monthly_cost(queries_per_day, avg_query_tokens, 0, 0)
        
        other_calc = CostCalculator(other_spec)
        other_cost = other_calc.estimate_monthly_cost(queries_per_day, avg_query_tokens, 0, 0)
        
        return {
            "model_a": self.spec.name,
            "model_b": other_spec.name,
            "cost_a": this_cost["monthly_query_cost"],
            "cost_b": other_cost["monthly_query_cost"],
            "savings": other_cost["monthly_query_cost"] - this_cost["monthly_query_cost"],
            "savings_percent": (other_cost["monthly_query_cost"] - this_cost["monthly_query_cost"]) / other_cost["monthly_query_cost"] * 100 if other_cost["monthly_query_cost"] > 0 else 0
        }
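
A worked example of the cost arithmetic, using the text-embedding-3-small entry from the catalog above (the traffic numbers are hypothetical).

calc = CostCalculator(MODEL_CATALOG["text-embedding-3-small"])

estimate = calc.estimate_monthly_cost(
    queries_per_day=50_000,
    avg_query_tokens=40,
    documents_to_embed=1_000_000,
    avg_doc_tokens=500
)
# Query side: 50,000 * 40 * 30 = 60M tokens/month -> 60,000 * $0.00002 = $1.20
# Document side: 1M * 500 = 500M tokens -> $10.00 one-time, ~$0.83/month amortized over 12 months
print(estimate["total_monthly_cost"])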

Model Adapters

from abc import ABC, abstractmethod
from typing import Any, Optional, List
import asyncio

class EmbeddingModel(ABC):
    """Abstract base class for embedding models."""
    
    @abstractmethod
    async def embed(self, text: str) -> list[float]:
        """Embed a single text."""
        pass
    
    @abstractmethod
    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed a batch of texts."""
        pass
    
    @property
    @abstractmethod
    def dimensions(self) -> int:
        """Get embedding dimensions."""
        pass

class OpenAIEmbedding(EmbeddingModel):
    """OpenAI embedding model adapter."""
    
    def __init__(
        self,
        model: str = "text-embedding-3-small",
        api_key: Optional[str] = None
    ):
        self.model = model
        self.api_key = api_key
        self._dimensions = 1536 if "small" in model else 3072
    
    async def embed(self, text: str) -> list[float]:
        """Embed single text."""
        
        import openai
        
        client = openai.AsyncOpenAI(api_key=self.api_key)
        
        response = await client.embeddings.create(
            model=self.model,
            input=text
        )
        
        return response.data[0].embedding
    
    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed batch of texts."""
        
        import openai
        
        client = openai.AsyncOpenAI(api_key=self.api_key)
        
        response = await client.embeddings.create(
            model=self.model,
            input=texts
        )
        
        return [d.embedding for d in response.data]
    
    @property
    def dimensions(self) -> int:
        return self._dimensions

class CohereEmbedding(EmbeddingModel):
    """Cohere embedding model adapter."""
    
    def __init__(
        self,
        model: str = "embed-english-v3.0",
        api_key: Optional[str] = None
    ):
        self.model = model
        self.api_key = api_key
        self._dimensions = 1024
    
    async def embed(self, text: str) -> list[float]:
        """Embed single text."""
        
        import cohere
        
        client = cohere.AsyncClient(api_key=self.api_key)
        
        response = await client.embed(
            texts=[text],
            model=self.model,
            input_type="search_query"
        )
        
        return response.embeddings[0]
    
    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed batch of texts."""
        
        import cohere
        
        client = cohere.AsyncClient(api_key=self.api_key)
        
        response = await client.embed(
            texts=texts,
            model=self.model,
            input_type="search_document"
        )
        
        return response.embeddings
    
    @property
    def dimensions(self) -> int:
        return self._dimensions

class HuggingFaceEmbedding(EmbeddingModel):
    """Hugging Face embedding model adapter."""
    
    def __init__(self, model_name: str = "BAAI/bge-large-en-v1.5"):
        self.model_name = model_name
        self._model = None
        self._tokenizer = None
        self._dimensions = None
    
    def _load_model(self):
        """Lazy load model."""
        
        if self._model is None:
            from transformers import AutoModel, AutoTokenizer
            import torch
            
            self._tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self._model = AutoModel.from_pretrained(self.model_name)
            self._model.eval()
            
            # Get dimensions from model config
            self._dimensions = self._model.config.hidden_size
    
    async def embed(self, text: str) -> list[float]:
        """Embed single text."""
        
        self._load_model()
        
        import torch
        
        inputs = self._tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True
        )
        
        with torch.no_grad():
            outputs = self._model(**inputs)
            # Use the [CLS] token representation as the sentence embedding
            embedding = outputs.last_hidden_state[:, 0, :].squeeze().tolist()
        
        return embedding
    
    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed batch of texts."""
        
        self._load_model()
        
        import torch
        
        inputs = self._tokenizer(
            texts,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True
        )
        
        with torch.no_grad():
            outputs = self._model(**inputs)
            embeddings = outputs.last_hidden_state[:, 0, :].tolist()
        
        return embeddings
    
    @property
    def dimensions(self) -> int:
        self._load_model()
        return self._dimensions

class SentenceTransformerEmbedding(EmbeddingModel):
    """Sentence Transformers embedding adapter."""
    
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model_name = model_name
        self._model = None
    
    def _load_model(self):
        """Lazy load model."""
        
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.model_name)
    
    async def embed(self, text: str) -> list[float]:
        """Embed single text."""
        
        self._load_model()
        return self._model.encode(text).tolist()
    
    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed batch of texts."""
        
        self._load_model()
        return self._model.encode(texts).tolist()
    
    @property
    def dimensions(self) -> int:
        self._load_model()
        return self._model.get_sentence_embedding_dimension()

class EmbeddingModelFactory:
    """Factory for creating embedding models."""
    
    @staticmethod
    def create(
        provider: str,
        model_name: str,
        **kwargs
    ) -> EmbeddingModel:
        """Create embedding model."""
        
        if provider == "openai":
            return OpenAIEmbedding(model=model_name, **kwargs)
        elif provider == "cohere":
            return CohereEmbedding(model=model_name, **kwargs)
        elif provider == "huggingface":
            return HuggingFaceEmbedding(model_name=model_name)
        elif provider == "sentence_transformers":
            return SentenceTransformerEmbedding(model_name=model_name)
        else:
            raise ValueError(f"Unknown provider: {provider}")
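
Using the factory is a one-liner per provider. A sketch (assumes the corresponding SDKs are installed, e.g. openai and sentence-transformers, and that API keys are supplied via environment variables where needed):

import asyncio
import os

# Hosted model: constructed here, only calls the API when embed() is awaited.
openai_model = EmbeddingModelFactory.create(
    provider="openai",
    model_name="text-embedding-3-small",
    api_key=os.environ.get("OPENAI_API_KEY")
)
print(openai_model.dimensions)  # 1536

# Local model: runs entirely in-process.
local_model = EmbeddingModelFactory.create(
    provider="sentence_transformers",
    model_name="all-MiniLM-L6-v2"
)

async def demo():
    vec = await local_model.embed("semantic search example")
    print(len(vec), local_model.dimensions)  # 384, 384

asyncio.run(demo())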

Production Selection Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict

app = FastAPI()

class SelectionRequest(BaseModel):
    use_case: str
    budget: str = "medium"
    latency_requirement: str = "medium"
    languages: Optional[List[str]] = None
    min_quality_score: float = 0.0

class BenchmarkRequest(BaseModel):
    model_names: List[str]
    test_queries: List[str]
    test_documents: List[str]
    relevance_labels: List[List[int]]

class CostEstimateRequest(BaseModel):
    model_name: str
    queries_per_day: int
    avg_query_tokens: int
    documents_to_embed: int = 0
    avg_doc_tokens: int = 0

# Initialize components
selector = ModelSelector()

@app.get("/v1/models")
async def list_models() -> list[dict]:
    """List all available models."""
    
    return [
        {
            "name": spec.name,
            "provider": spec.provider.value,
            "type": spec.model_type.value,
            "dimensions": spec.dimensions,
            "max_tokens": spec.max_tokens,
            "cost_per_1k_tokens": spec.cost_per_1k_tokens,
            "mteb_score": spec.mteb_score,
            "languages": spec.languages
        }
        for spec in MODEL_CATALOG.values()
    ]

@app.get("/v1/models/{model_name}")
async def get_model(model_name: str) -> dict:
    """Get model details."""
    
    spec = MODEL_CATALOG.get(model_name)
    
    if not spec:
        raise HTTPException(status_code=404, detail="Model not found")
    
    return {
        "name": spec.name,
        "provider": spec.provider.value,
        "type": spec.model_type.value,
        "dimensions": spec.dimensions,
        "max_tokens": spec.max_tokens,
        "cost_per_1k_tokens": spec.cost_per_1k_tokens,
        "avg_latency_ms": spec.avg_latency_ms,
        "mteb_score": spec.mteb_score,
        "languages": spec.languages,
        "description": spec.description
    }

@app.post("/v1/recommend")
async def recommend_models(request: SelectionRequest) -> list[dict]:
    """Get model recommendations."""
    
    recommendations = selector.recommend(
        use_case=request.use_case,
        budget=request.budget,
        latency_requirement=request.latency_requirement
    )
    
    # Filter by quality score
    recommendations = [
        r for r in recommendations
        if r.mteb_score >= request.min_quality_score
    ]
    
    # Filter by languages
    if request.languages:
        recommendations = [
            r for r in recommendations
            if any(lang in r.languages for lang in request.languages)
            or "multilingual" in r.languages
        ]
    
    return [
        {
            "name": r.name,
            "provider": r.provider.value,
            "mteb_score": r.mteb_score,
            "cost_per_1k_tokens": r.cost_per_1k_tokens,
            "avg_latency_ms": r.avg_latency_ms,
            "recommendation_reason": _get_recommendation_reason(r, request)
        }
        for r in recommendations[:5]
    ]

def _get_recommendation_reason(spec: EmbeddingModelSpec, request: SelectionRequest) -> str:
    """Generate recommendation reason."""
    
    reasons = []
    
    if spec.mteb_score > 65:
        reasons.append("high quality")
    
    if spec.cost_per_1k_tokens == 0:
        reasons.append("free/open-source")
    elif spec.cost_per_1k_tokens < 0.0001:
        reasons.append("cost-effective")
    
    if spec.avg_latency_ms < 30:
        reasons.append("very fast")
    
    if request.use_case == "code" and spec.model_type == ModelType.CODE:
        reasons.append("optimized for code")
    
    if request.use_case == "multilingual" and spec.model_type == ModelType.MULTILINGUAL:
        reasons.append("multilingual support")
    
    return ", ".join(reasons) if reasons else "good general-purpose model"

@app.post("/v1/estimate-cost")
async def estimate_cost(request: CostEstimateRequest) -> dict:
    """Estimate embedding costs."""
    
    spec = MODEL_CATALOG.get(request.model_name)
    
    if not spec:
        raise HTTPException(status_code=404, detail="Model not found")
    
    calculator = CostCalculator(spec)
    
    return calculator.estimate_monthly_cost(
        queries_per_day=request.queries_per_day,
        avg_query_tokens=request.avg_query_tokens,
        documents_to_embed=request.documents_to_embed,
        avg_doc_tokens=request.avg_doc_tokens
    )

@app.get("/v1/compare/{model_a}/{model_b}")
async def compare_models(model_a: str, model_b: str) -> dict:
    """Compare two models."""
    
    spec_a = MODEL_CATALOG.get(model_a)
    spec_b = MODEL_CATALOG.get(model_b)
    
    if not spec_a or not spec_b:
        raise HTTPException(status_code=404, detail="Model not found")
    
    return {
        "model_a": {
            "name": spec_a.name,
            "dimensions": spec_a.dimensions,
            "mteb_score": spec_a.mteb_score,
            "cost_per_1k_tokens": spec_a.cost_per_1k_tokens,
            "avg_latency_ms": spec_a.avg_latency_ms
        },
        "model_b": {
            "name": spec_b.name,
            "dimensions": spec_b.dimensions,
            "mteb_score": spec_b.mteb_score,
            "cost_per_1k_tokens": spec_b.cost_per_1k_tokens,
            "avg_latency_ms": spec_b.avg_latency_ms
        },
        "comparison": {
            "quality_winner": model_a if spec_a.mteb_score > spec_b.mteb_score else model_b,
            "cost_winner": model_a if spec_a.cost_per_1k_tokens < spec_b.cost_per_1k_tokens else model_b,
            "speed_winner": model_a if spec_a.avg_latency_ms < spec_b.avg_latency_ms else model_b,
            "quality_diff": abs(spec_a.mteb_score - spec_b.mteb_score),
            "cost_diff": abs(spec_a.cost_per_1k_tokens - spec_b.cost_per_1k_tokens),
            "latency_diff_ms": abs(spec_a.avg_latency_ms - spec_b.avg_latency_ms)
        }
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}
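
A quick way to exercise the service in-process is FastAPI's TestClient (a sketch; in production you would serve the app with uvicorn instead).

from fastapi.testclient import TestClient

client = TestClient(app)

resp = client.post("/v1/recommend", json={
    "use_case": "general",
    "budget": "low",
    "latency_requirement": "medium"
})
print(resp.json())

resp = client.post("/v1/estimate-cost", json={
    "model_name": "text-embedding-3-small",
    "queries_per_day": 50000,
    "avg_query_tokens": 40
})
print(resp.json())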

Conclusion

Embedding model selection requires balancing quality, speed, and cost for your specific use case. Start by understanding your requirements: What languages do you need? How long are your documents? What's your latency budget? What can you spend? Use the MTEB leaderboard as a starting point, but always benchmark on your own data—domain-specific performance can vary significantly from general benchmarks. For most English-language applications, OpenAI's text-embedding-3-small offers an excellent balance of quality and cost. For multilingual needs, Cohere's embed-multilingual-v3 excels. For code search, Voyage's code-specialized model outperforms general models. If cost is critical and you can self-host, open-source models like BGE and E5 provide competitive quality at zero API cost. For latency-sensitive applications, smaller models like all-MiniLM-L6-v2 offer sub-10ms inference. Build evaluation pipelines that test models on your actual queries and documents, measure retrieval accuracy, and calculate total cost of ownership. The best model is the one that meets your quality threshold at the lowest cost within your latency constraints.


About the Author

I am a Cloud Architect and Developer passionate about solving complex problems with modern technology. My blog explores the intersection of Cloud Architecture, Artificial Intelligence, and Software Engineering. I share tutorials, deep dives, and insights into building scalable, intelligent systems.

Areas of Expertise

Cloud Architecture (Azure, AWS)
Artificial Intelligence & LLMs
DevOps & Kubernetes
Backend Dev (C#, .NET, Python, Node.js)