Introduction
Semantic search goes beyond keyword matching to understand the meaning and intent behind queries. By converting text to dense vector embeddings, it finds conceptually similar content even when the exact words don't match. Naive implementations often underperform, though: poor embedding choices, suboptimal indexing, and a lack of reranking all lead to irrelevant results. This guide covers practical optimization techniques: selecting the right embedding model for your domain, tuning vector index parameters for speed and accuracy, implementing hybrid search that combines semantic and keyword approaches, and adding reranking to improve precision. Whether you're building a RAG system, document search, or a recommendation engine, these optimizations can dramatically improve search quality.

Embedding Selection
from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
from enum import Enum
class EmbeddingTask(Enum):
"""Embedding task types."""
RETRIEVAL_QUERY = "retrieval_query"
RETRIEVAL_DOCUMENT = "retrieval_document"
SEMANTIC_SIMILARITY = "semantic_similarity"
CLASSIFICATION = "classification"
CLUSTERING = "clustering"
@dataclass
class EmbeddingConfig:
"""Configuration for embedding model."""
model_name: str
dimension: int
max_tokens: int
normalize: bool = True
batch_size: int = 32
    task_prefix: Optional[dict[EmbeddingTask, str]] = None
class EmbeddingModel(ABC):
"""Abstract embedding model."""
@abstractmethod
async def embed(
self,
texts: list[str],
        task: Optional[EmbeddingTask] = None
) -> list[list[float]]:
"""Generate embeddings for texts."""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""Embedding dimension."""
pass
class OpenAIEmbedding(EmbeddingModel):
"""OpenAI embedding model."""
def __init__(
self,
client: Any,
model: str = "text-embedding-3-small",
        dimensions: Optional[int] = None
):
self.client = client
self.model = model
self._dimensions = dimensions or self._get_default_dimension()
def _get_default_dimension(self) -> int:
"""Get default dimension for model."""
defaults = {
"text-embedding-3-small": 1536,
"text-embedding-3-large": 3072,
"text-embedding-ada-002": 1536
}
return defaults.get(self.model, 1536)
@property
def dimension(self) -> int:
return self._dimensions
async def embed(
self,
texts: list[str],
        task: Optional[EmbeddingTask] = None
) -> list[list[float]]:
"""Generate embeddings using OpenAI."""
kwargs = {"model": self.model, "input": texts}
if self._dimensions and self.model.startswith("text-embedding-3"):
kwargs["dimensions"] = self._dimensions
response = await self.client.embeddings.create(**kwargs)
return [item.embedding for item in response.data]
class CohereEmbedding(EmbeddingModel):
"""Cohere embedding model."""
def __init__(
self,
client: Any,
model: str = "embed-english-v3.0"
):
self.client = client
self.model = model
self._task_map = {
EmbeddingTask.RETRIEVAL_QUERY: "search_query",
EmbeddingTask.RETRIEVAL_DOCUMENT: "search_document",
EmbeddingTask.CLASSIFICATION: "classification",
EmbeddingTask.CLUSTERING: "clustering"
}
@property
def dimension(self) -> int:
return 1024
async def embed(
self,
texts: list[str],
        task: Optional[EmbeddingTask] = None
) -> list[list[float]]:
"""Generate embeddings using Cohere."""
input_type = self._task_map.get(task, "search_document")
response = await self.client.embed(
texts=texts,
model=self.model,
input_type=input_type
)
return response.embeddings
class VoyageEmbedding(EmbeddingModel):
"""Voyage AI embedding model."""
def __init__(
self,
client: Any,
model: str = "voyage-large-2"
):
self.client = client
self.model = model
self._dimensions = {
"voyage-large-2": 1536,
"voyage-code-2": 1536,
"voyage-lite-02-instruct": 1024
}
@property
def dimension(self) -> int:
return self._dimensions.get(self.model, 1536)
async def embed(
self,
texts: list[str],
        task: Optional[EmbeddingTask] = None
) -> list[list[float]]:
"""Generate embeddings using Voyage."""
input_type = "query" if task == EmbeddingTask.RETRIEVAL_QUERY else "document"
response = await self.client.embed(
texts=texts,
model=self.model,
input_type=input_type
)
return response.embeddings
class SentenceTransformerEmbedding(EmbeddingModel):
"""Local sentence transformer model."""
def __init__(
self,
model_name: str = "all-MiniLM-L6-v2",
device: str = "cpu"
):
self.model_name = model_name
self.device = device
self._model = None
self._dimension = None
def _load_model(self):
"""Lazy load model."""
if self._model is None:
from sentence_transformers import SentenceTransformer
self._model = SentenceTransformer(self.model_name, device=self.device)
self._dimension = self._model.get_sentence_embedding_dimension()
@property
def dimension(self) -> int:
self._load_model()
return self._dimension
async def embed(
self,
texts: list[str],
        task: Optional[EmbeddingTask] = None
) -> list[list[float]]:
"""Generate embeddings locally."""
self._load_model()
embeddings = self._model.encode(
texts,
normalize_embeddings=True,
show_progress_bar=False
)
return embeddings.tolist()
class InstructorEmbedding(EmbeddingModel):
"""Instructor embedding with task instructions."""
def __init__(
self,
model_name: str = "hkunlp/instructor-large",
device: str = "cpu"
):
self.model_name = model_name
self.device = device
self._model = None
self._instructions = {
EmbeddingTask.RETRIEVAL_QUERY: "Represent the question for retrieving relevant documents:",
EmbeddingTask.RETRIEVAL_DOCUMENT: "Represent the document for retrieval:",
EmbeddingTask.SEMANTIC_SIMILARITY: "Represent the sentence for semantic similarity:",
EmbeddingTask.CLASSIFICATION: "Represent the text for classification:",
EmbeddingTask.CLUSTERING: "Represent the text for clustering:"
}
def _load_model(self):
"""Lazy load model."""
if self._model is None:
from InstructorEmbedding import INSTRUCTOR
self._model = INSTRUCTOR(self.model_name, device=self.device)
@property
def dimension(self) -> int:
return 768
async def embed(
self,
texts: list[str],
        task: Optional[EmbeddingTask] = None
) -> list[list[float]]:
"""Generate embeddings with instructions."""
self._load_model()
instruction = self._instructions.get(
task,
self._instructions[EmbeddingTask.RETRIEVAL_DOCUMENT]
)
inputs = [[instruction, text] for text in texts]
embeddings = self._model.encode(inputs)
return embeddings.tolist()
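Model choice interacts with how you call the model: Cohere, Voyage, and Instructor all embed queries and documents differently, so pass the task even to models that ignore it. A quick self-contained check using the local model (the cosine helper and the sample texts are illustrative, not part of the classes above):

import asyncio
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

async def main():
    model = SentenceTransformerEmbedding("all-MiniLM-L6-v2")
    docs = await model.embed(
        ["HNSW trades memory for query speed."],
        task=EmbeddingTask.RETRIEVAL_DOCUMENT
    )
    query = await model.embed(
        ["fast approximate nearest neighbor search"],
        task=EmbeddingTask.RETRIEVAL_QUERY
    )
    print(cosine_similarity(query[0], docs[0]))

asyncio.run(main())

Swap in OpenAIEmbedding or CohereEmbedding with a real client to compare models on your own data; the MTEB leaderboard (see References) is a good source for a shortlist.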
Vector Index Optimization
from dataclasses import dataclass
from typing import Optional
from abc import ABC, abstractmethod
import numpy as np
@dataclass
class SearchResult:
"""A search result."""
id: str
score: float
    metadata: Optional[dict] = None
    text: Optional[str] = None
@dataclass
class IndexConfig:
"""Vector index configuration."""
dimension: int
metric: str = "cosine" # cosine, euclidean, dot_product
# HNSW parameters
ef_construction: int = 200
m: int = 16
ef_search: int = 100
# IVF parameters
nlist: int = 100
nprobe: int = 10
# PQ parameters
pq_m: int = 8
pq_bits: int = 8
class VectorIndex(ABC):
"""Abstract vector index."""
@abstractmethod
async def add(
self,
ids: list[str],
embeddings: list[list[float]],
        metadata: Optional[list[dict]] = None
) -> None:
"""Add vectors to index."""
pass
@abstractmethod
async def search(
self,
query_embedding: list[float],
k: int = 10,
        filter: Optional[dict] = None
) -> list[SearchResult]:
"""Search for similar vectors."""
pass
@abstractmethod
async def delete(self, ids: list[str]) -> None:
"""Delete vectors by ID."""
pass
class FAISSIndex(VectorIndex):
"""FAISS vector index."""
def __init__(self, config: IndexConfig):
self.config = config
self._index = None
self._id_map: dict[int, str] = {}
self._metadata: dict[str, dict] = {}
self._next_id = 0
self._build_index()
def _build_index(self):
"""Build FAISS index."""
import faiss
d = self.config.dimension
        # The quantizer metric must match the index metric
        if self.config.metric == "cosine":
            # Inner product on normalized vectors equals cosine similarity
quantizer = faiss.IndexFlatIP(d)
else:
quantizer = faiss.IndexFlatL2(d)
# Use IVF for larger datasets
self._index = faiss.IndexIVFFlat(
quantizer,
d,
self.config.nlist,
faiss.METRIC_INNER_PRODUCT if self.config.metric == "cosine" else faiss.METRIC_L2
)
async def add(
self,
ids: list[str],
embeddings: list[list[float]],
        metadata: Optional[list[dict]] = None
) -> None:
"""Add vectors to FAISS index."""
import faiss
vectors = np.array(embeddings, dtype=np.float32)
# Normalize for cosine similarity
if self.config.metric == "cosine":
faiss.normalize_L2(vectors)
        # Train index if needed (IVF training requires at least nlist vectors,
        # ideally a representative sample of the corpus)
        if not self._index.is_trained:
            self._index.train(vectors)
# Add vectors
self._index.add(vectors)
# Store ID mapping and metadata
for i, id_ in enumerate(ids):
internal_id = self._next_id + i
self._id_map[internal_id] = id_
if metadata and i < len(metadata):
self._metadata[id_] = metadata[i]
self._next_id += len(ids)
async def search(
self,
query_embedding: list[float],
k: int = 10,
        filter: Optional[dict] = None
) -> list[SearchResult]:
"""Search FAISS index."""
import faiss
query = np.array([query_embedding], dtype=np.float32)
if self.config.metric == "cosine":
faiss.normalize_L2(query)
        # Set search parameters
        self._index.nprobe = self.config.nprobe
        # Over-fetch when filtering so post-filter results can still fill k
        search_k = k * 3 if filter else k
        distances, indices = self._index.search(query, search_k)
results = []
for dist, idx in zip(distances[0], indices[0]):
if idx == -1:
continue
id_ = self._id_map.get(idx)
if not id_:
continue
# Apply filter if provided
if filter:
meta = self._metadata.get(id_, {})
if not self._matches_filter(meta, filter):
continue
            results.append(SearchResult(
                id=id_,
                # Inner-product score for cosine (higher is better); for L2
                # this is a distance (lower is better)
                score=float(dist),
                metadata=self._metadata.get(id_)
            ))
            if len(results) >= k:
                break
        return results
def _matches_filter(self, metadata: dict, filter: dict) -> bool:
"""Check if metadata matches filter."""
for key, value in filter.items():
if key not in metadata:
return False
if metadata[key] != value:
return False
return True
    async def delete(self, ids: list[str]) -> None:
        """Delete vectors (FAISS IVF has no in-place delete)."""
        # Drop the ID mappings so deleted vectors are skipped at search time;
        # rebuild the index periodically to reclaim space
        id_set = set(ids)
        self._id_map = {
            internal: id_ for internal, id_ in self._id_map.items()
            if id_ not in id_set
        }
        for id_ in ids:
            self._metadata.pop(id_, None)
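IndexConfig carries PQ parameters (pq_m, pq_bits) that the flat IVF index above never uses; for large corpora, product quantization compresses vectors at some recall cost. A sketch of the variant build step, assuming the same config (the build_ivfpq_index helper is illustrative, not a drop-in replacement for the class above):

def build_ivfpq_index(config: IndexConfig):
    """Build an IVF-PQ index: nlist coarse cells, each vector compressed
    into pq_m sub-quantizers of pq_bits each (pq_m must divide dimension)."""
    import faiss
    d = config.dimension
    metric = (
        faiss.METRIC_INNER_PRODUCT if config.metric == "cosine"
        else faiss.METRIC_L2
    )
    quantizer = (
        faiss.IndexFlatIP(d) if config.metric == "cosine"
        else faiss.IndexFlatL2(d)
    )
    return faiss.IndexIVFPQ(
        quantizer, d, config.nlist, config.pq_m, config.pq_bits, metric
    )

Train it on a representative sample before adding vectors, exactly as with the flat IVF index above.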
class HNSWIndex(VectorIndex):
"""HNSW index using hnswlib."""
def __init__(self, config: IndexConfig, max_elements: int = 100000):
self.config = config
self.max_elements = max_elements
self._index = None
self._metadata: dict[int, dict] = {}
self._id_to_internal: dict[str, int] = {}
self._internal_to_id: dict[int, str] = {}
self._next_id = 0
self._build_index()
def _build_index(self):
"""Build HNSW index."""
import hnswlib
space = "cosine" if self.config.metric == "cosine" else "l2"
self._index = hnswlib.Index(space=space, dim=self.config.dimension)
self._index.init_index(
max_elements=self.max_elements,
ef_construction=self.config.ef_construction,
M=self.config.m
)
async def add(
self,
ids: list[str],
embeddings: list[list[float]],
        metadata: Optional[list[dict]] = None
) -> None:
"""Add vectors to HNSW index."""
vectors = np.array(embeddings, dtype=np.float32)
internal_ids = list(range(self._next_id, self._next_id + len(ids)))
self._index.add_items(vectors, internal_ids)
for i, (id_, internal_id) in enumerate(zip(ids, internal_ids)):
self._id_to_internal[id_] = internal_id
self._internal_to_id[internal_id] = id_
if metadata and i < len(metadata):
self._metadata[internal_id] = metadata[i]
self._next_id += len(ids)
async def search(
self,
query_embedding: list[float],
k: int = 10,
        filter: Optional[dict] = None
) -> list[SearchResult]:
"""Search HNSW index."""
query = np.array([query_embedding], dtype=np.float32)
self._index.set_ef(self.config.ef_search)
# Search more if filtering
search_k = k * 3 if filter else k
internal_ids, distances = self._index.knn_query(query, k=search_k)
results = []
for internal_id, dist in zip(internal_ids[0], distances[0]):
id_ = self._internal_to_id.get(internal_id)
if not id_:
continue
meta = self._metadata.get(internal_id, {})
if filter and not self._matches_filter(meta, filter):
continue
            results.append(SearchResult(
                id=id_,
                # hnswlib's cosine distance is 1 - cosine similarity, so this
                # recovers a similarity; l2 would need a different transform
                score=1 - float(dist),
                metadata=meta
            ))
if len(results) >= k:
break
return results
def _matches_filter(self, metadata: dict, filter: dict) -> bool:
"""Check if metadata matches filter."""
for key, value in filter.items():
if key not in metadata:
return False
if metadata[key] != value:
return False
return True
async def delete(self, ids: list[str]) -> None:
"""Mark vectors as deleted."""
for id_ in ids:
internal_id = self._id_to_internal.get(id_)
if internal_id is not None:
self._index.mark_deleted(internal_id)
del self._id_to_internal[id_]
del self._internal_to_id[internal_id]
if internal_id in self._metadata:
del self._metadata[internal_id]
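A quick end-to-end check of the HNSW index, with random vectors standing in for real embeddings (all values here are illustrative): higher ef_construction and M raise recall at the cost of build time and memory, while ef_search trades query latency for accuracy at query time.

import asyncio
import numpy as np

async def main():
    config = IndexConfig(dimension=64, metric="cosine", ef_search=128)
    index = HNSWIndex(config, max_elements=1000)
    rng = np.random.default_rng(0)
    vectors = rng.normal(size=(100, 64)).astype(np.float32)
    await index.add(
        ids=[f"doc-{i}" for i in range(100)],
        embeddings=vectors.tolist(),
        metadata=[{"lang": "en"} for _ in range(100)]
    )
    hits = await index.search(vectors[0].tolist(), k=5, filter={"lang": "en"})
    for hit in hits:
        print(hit.id, round(hit.score, 3))

asyncio.run(main())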
Hybrid Search
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class FusionMethod(Enum):
"""Methods for fusing search results."""
RRF = "rrf" # Reciprocal Rank Fusion
LINEAR = "linear" # Linear combination
CONVEX = "convex" # Convex combination
@dataclass
class HybridConfig:
"""Hybrid search configuration."""
semantic_weight: float = 0.7
keyword_weight: float = 0.3
fusion_method: FusionMethod = FusionMethod.RRF
rrf_k: int = 60
class KeywordSearch:
"""BM25-based keyword search."""
def __init__(self):
self._documents: dict[str, str] = {}
self._bm25 = None
self._doc_ids: list[str] = []
def add(self, id_: str, text: str) -> None:
"""Add document to index."""
self._documents[id_] = text
self._bm25 = None # Invalidate index
def _build_index(self):
"""Build BM25 index."""
from rank_bm25 import BM25Okapi
self._doc_ids = list(self._documents.keys())
        # Naive whitespace tokenization; stemming and punctuation stripping
        # would improve BM25 quality
        tokenized = [
            doc.lower().split()
            for doc in self._documents.values()
        ]
self._bm25 = BM25Okapi(tokenized)
def search(self, query: str, k: int = 10) -> list[SearchResult]:
"""Search using BM25."""
if self._bm25 is None:
self._build_index()
tokenized_query = query.lower().split()
scores = self._bm25.get_scores(tokenized_query)
# Get top k
top_indices = scores.argsort()[-k:][::-1]
results = []
for idx in top_indices:
if scores[idx] > 0:
results.append(SearchResult(
id=self._doc_ids[idx],
score=float(scores[idx]),
text=self._documents[self._doc_ids[idx]]
))
return results
class HybridSearch:
"""Combine semantic and keyword search."""
def __init__(
self,
embedding_model: EmbeddingModel,
vector_index: VectorIndex,
        config: Optional[HybridConfig] = None
):
self.embedding_model = embedding_model
self.vector_index = vector_index
self.config = config or HybridConfig()
self.keyword_search = KeywordSearch()
self._documents: dict[str, str] = {}
async def add(
self,
id_: str,
text: str,
        metadata: Optional[dict] = None
) -> None:
"""Add document to both indexes."""
# Add to keyword index
self.keyword_search.add(id_, text)
self._documents[id_] = text
# Add to vector index
embeddings = await self.embedding_model.embed(
[text],
task=EmbeddingTask.RETRIEVAL_DOCUMENT
)
await self.vector_index.add(
ids=[id_],
embeddings=embeddings,
metadata=[metadata] if metadata else None
)
async def search(
self,
query: str,
k: int = 10,
        filter: Optional[dict] = None
) -> list[SearchResult]:
"""Perform hybrid search."""
# Semantic search
query_embedding = await self.embedding_model.embed(
[query],
task=EmbeddingTask.RETRIEVAL_QUERY
)
semantic_results = await self.vector_index.search(
query_embedding[0],
k=k * 2,
filter=filter
)
# Keyword search
keyword_results = self.keyword_search.search(query, k=k * 2)
# Fuse results
if self.config.fusion_method == FusionMethod.RRF:
return self._rrf_fusion(semantic_results, keyword_results, k)
elif self.config.fusion_method == FusionMethod.LINEAR:
return self._linear_fusion(semantic_results, keyword_results, k)
else:
return self._convex_fusion(semantic_results, keyword_results, k)
def _rrf_fusion(
self,
semantic: list[SearchResult],
keyword: list[SearchResult],
k: int
) -> list[SearchResult]:
"""Reciprocal Rank Fusion."""
rrf_k = self.config.rrf_k
scores: dict[str, float] = {}
# Score semantic results
for rank, result in enumerate(semantic):
scores[result.id] = scores.get(result.id, 0) + \
self.config.semantic_weight / (rrf_k + rank + 1)
# Score keyword results
for rank, result in enumerate(keyword):
scores[result.id] = scores.get(result.id, 0) + \
self.config.keyword_weight / (rrf_k + rank + 1)
# Sort by combined score
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
results = []
for id_ in sorted_ids[:k]:
results.append(SearchResult(
id=id_,
score=scores[id_],
text=self._documents.get(id_)
))
return results
def _linear_fusion(
self,
semantic: list[SearchResult],
keyword: list[SearchResult],
k: int
) -> list[SearchResult]:
"""Linear combination of scores."""
# Normalize scores
semantic_scores = self._normalize_scores(semantic)
keyword_scores = self._normalize_scores(keyword)
# Combine scores
combined: dict[str, float] = {}
for result in semantic:
combined[result.id] = \
self.config.semantic_weight * semantic_scores.get(result.id, 0)
for result in keyword:
combined[result.id] = combined.get(result.id, 0) + \
self.config.keyword_weight * keyword_scores.get(result.id, 0)
# Sort and return
sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True)
return [
SearchResult(id=id_, score=combined[id_], text=self._documents.get(id_))
for id_ in sorted_ids[:k]
]
def _convex_fusion(
self,
semantic: list[SearchResult],
keyword: list[SearchResult],
k: int
) -> list[SearchResult]:
"""Convex combination (weighted average)."""
# Same as linear but ensures weights sum to 1
total_weight = self.config.semantic_weight + self.config.keyword_weight
sem_w = self.config.semantic_weight / total_weight
key_w = self.config.keyword_weight / total_weight
semantic_scores = self._normalize_scores(semantic)
keyword_scores = self._normalize_scores(keyword)
combined: dict[str, float] = {}
all_ids = set(r.id for r in semantic) | set(r.id for r in keyword)
for id_ in all_ids:
combined[id_] = \
sem_w * semantic_scores.get(id_, 0) + \
key_w * keyword_scores.get(id_, 0)
sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True)
return [
SearchResult(id=id_, score=combined[id_], text=self._documents.get(id_))
for id_ in sorted_ids[:k]
]
def _normalize_scores(self, results: list[SearchResult]) -> dict[str, float]:
"""Normalize scores to 0-1 range."""
if not results:
return {}
scores = [r.score for r in results]
min_score = min(scores)
max_score = max(scores)
if max_score == min_score:
return {r.id: 1.0 for r in results}
return {
r.id: (r.score - min_score) / (max_score - min_score)
for r in results
}
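Putting the pieces together, a minimal sketch of hybrid search over a tiny corpus (the local model keeps it self-contained; the default 0.7/0.3 weights are a starting point to tune, not a recommendation):

import asyncio

async def main():
    model = SentenceTransformerEmbedding("all-MiniLM-L6-v2")
    index = HNSWIndex(IndexConfig(dimension=model.dimension))
    search = HybridSearch(
        model,
        index,
        HybridConfig(fusion_method=FusionMethod.RRF)
    )
    await search.add("1", "HNSW graphs give fast approximate nearest neighbor search.")
    await search.add("2", "BM25 ranks documents by term frequency and rarity.")
    await search.add("3", "Cross-encoders rerank candidates for higher precision.")
    for hit in await search.search("how does keyword ranking work?", k=1):
        print(hit.id, round(hit.score, 4), hit.text)

asyncio.run(main())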
Query Optimization
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class QueryExpansion:
"""Expanded query with variations."""
original: str
expanded: list[str]
    hypothetical_answers: Optional[list[str]] = None
class QueryOptimizer:
"""Optimize queries for better retrieval."""
def __init__(
self,
llm_client: Any = None,
model: str = "gpt-4o-mini"
):
self.llm_client = llm_client
self.model = model
async def expand_query(self, query: str) -> QueryExpansion:
"""Expand query with variations."""
if not self.llm_client:
return QueryExpansion(original=query, expanded=[query])
prompt = f"""Generate 3 alternative phrasings of this search query that might help find relevant documents:
Query: {query}
Return only the alternative queries, one per line."""
response = await self.llm_client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
        import re
        lines = response.choices[0].message.content.strip().split("\n")
        # Drop blank lines and strip any list numbering the model adds
        alternatives = [
            re.sub(r"^\s*\d+[.)]\s*", "", a).strip()
            for a in lines if a.strip()
        ]
return QueryExpansion(
original=query,
expanded=[query] + alternatives[:3]
)
async def generate_hyde(self, query: str) -> QueryExpansion:
"""Generate Hypothetical Document Embeddings."""
if not self.llm_client:
return QueryExpansion(original=query, expanded=[query])
prompt = f"""Write a short passage that would be a perfect answer to this question:
Question: {query}
Write 2-3 sentences as if from a document that answers this question."""
response = await self.llm_client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
hypothetical = response.choices[0].message.content.strip()
return QueryExpansion(
original=query,
expanded=[query],
hypothetical_answers=[hypothetical]
)
async def decompose_query(self, query: str) -> list[str]:
"""Decompose complex query into sub-queries."""
if not self.llm_client:
return [query]
prompt = f"""Break down this complex question into simpler sub-questions that together would answer the original:
Question: {query}
Return 2-4 simpler questions, one per line."""
response = await self.llm_client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.3
)
        import re
        lines = response.choices[0].message.content.strip().split("\n")
        # Drop blank lines and strip any list numbering the model adds
        sub_queries = [
            re.sub(r"^\s*\d+[.)]\s*", "", q).strip()
            for q in lines if q.strip()
        ]
return sub_queries if sub_queries else [query]
class MultiQuerySearch:
"""Search with multiple query variations."""
def __init__(
self,
search: HybridSearch,
optimizer: QueryOptimizer
):
self.search = search
self.optimizer = optimizer
async def search_expanded(
self,
query: str,
k: int = 10
) -> list[SearchResult]:
"""Search with expanded queries."""
expansion = await self.optimizer.expand_query(query)
all_results: dict[str, SearchResult] = {}
for expanded_query in expansion.expanded:
results = await self.search.search(expanded_query, k=k)
for result in results:
if result.id not in all_results:
all_results[result.id] = result
else:
# Keep higher score
if result.score > all_results[result.id].score:
all_results[result.id] = result
# Sort by score
sorted_results = sorted(
all_results.values(),
key=lambda x: x.score,
reverse=True
)
return sorted_results[:k]
async def search_hyde(
self,
query: str,
k: int = 10
) -> list[SearchResult]:
"""Search using HyDE."""
expansion = await self.optimizer.generate_hyde(query)
# Search with both original and hypothetical
all_results: dict[str, SearchResult] = {}
# Original query
results = await self.search.search(query, k=k)
for result in results:
all_results[result.id] = result
# Hypothetical answer
if expansion.hypothetical_answers:
for hypo in expansion.hypothetical_answers:
results = await self.search.search(hypo, k=k)
for result in results:
if result.id not in all_results:
all_results[result.id] = result
sorted_results = sorted(
all_results.values(),
key=lambda x: x.score,
reverse=True
)
return sorted_results[:k]
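The introduction and conclusion both call out reranking as the precision stage, but no reranker appears above. A minimal cross-encoder sketch that would sit between retrieval and the response (the ms-marco checkpoint name is a common public model, assumed here):

class CrossEncoderReranker:
    """Rerank retrieved results with a cross-encoder."""
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self._model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        results: list[SearchResult],
        top_n: int = 10
    ) -> list[SearchResult]:
        """Score each (query, document) pair jointly and keep the top_n."""
        scored = [r for r in results if r.text]
        if not scored:
            return results[:top_n]
        pairs = [(query, r.text) for r in scored]
        scores = self._model.predict(pairs)
        for r, s in zip(scored, scores):
            r.score = float(s)
        return sorted(scored, key=lambda r: r.score, reverse=True)[:top_n]

Retrieve generously from the hybrid pipeline (k of 50 or so), then keep only the top handful after reranking; the joint scoring is slower than bi-encoder retrieval but noticeably more precise.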
Production Search Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
class IndexRequest(BaseModel):
id: str
text: str
metadata: Optional[dict] = None
class SearchRequest(BaseModel):
query: str
k: int = 10
filter: Optional[dict] = None
use_hybrid: bool = True
expand_query: bool = False
use_hyde: bool = False
class SearchResultResponse(BaseModel):
id: str
score: float
text: Optional[str] = None
metadata: Optional[dict] = None
class SearchResponse(BaseModel):
results: list[SearchResultResponse]
query: str
expanded_queries: Optional[list[str]] = None
# Initialize components (would be configured properly)
# embedding_model = OpenAIEmbedding(client)
# vector_index = HNSWIndex(IndexConfig(dimension=1536))
# hybrid_search = HybridSearch(embedding_model, vector_index)
# optimizer = QueryOptimizer(llm_client)
@app.post("/v1/index")
async def index_document(request: IndexRequest):
"""Index a document."""
# await hybrid_search.add(request.id, request.text, request.metadata)
return {
"status": "indexed",
"id": request.id
}
@app.post("/v1/index/batch")
async def index_batch(documents: list[IndexRequest]):
"""Index multiple documents."""
# for doc in documents:
# await hybrid_search.add(doc.id, doc.text, doc.metadata)
return {
"status": "indexed",
"count": len(documents)
}
@app.post("/v1/search")
async def search(request: SearchRequest) -> SearchResponse:
"""Search for documents."""
expanded = None
# Would use actual search
# if request.use_hyde:
# results = await multi_query.search_hyde(request.query, request.k)
# elif request.expand_query:
# results = await multi_query.search_expanded(request.query, request.k)
# elif request.use_hybrid:
# results = await hybrid_search.search(request.query, request.k, request.filter)
# else:
# results = await vector_index.search(query_embedding, request.k, request.filter)
# Placeholder results
results = []
return SearchResponse(
results=[
SearchResultResponse(
id=r.id,
score=r.score,
text=r.text,
metadata=r.metadata
)
for r in results
],
query=request.query,
expanded_queries=expanded
)
@app.delete("/v1/index/{doc_id}")
async def delete_document(doc_id: str):
"""Delete a document."""
# await vector_index.delete([doc_id])
return {"status": "deleted", "id": doc_id}
@app.get("/v1/stats")
async def get_stats():
"""Get index statistics."""
return {
"total_documents": 0,
"index_type": "HNSW",
"dimension": 1536,
"metric": "cosine"
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- MTEB Leaderboard: https://huggingface.co/spaces/mteb/leaderboard
- FAISS: https://github.com/facebookresearch/faiss
- HyDE Paper: https://arxiv.org/abs/2212.10496
- Sentence Transformers: https://www.sbert.net/
Conclusion
Semantic search quality depends on multiple factors working together. Start with embedding selection—choose models trained for retrieval tasks and consider domain-specific options for specialized content. Optimize your vector index parameters: HNSW’s ef_construction and M values affect build time and recall, while ef_search controls query-time accuracy. Implement hybrid search combining semantic and keyword approaches—BM25 catches exact matches that embeddings might miss, while embeddings find conceptually similar content. Use query optimization techniques like expansion and HyDE to improve recall for ambiguous queries. Add reranking with cross-encoders for precision-critical applications. Monitor your search quality with metrics like MRR, NDCG, and recall@k, and continuously tune based on user feedback. The key insight is that semantic search is not a single algorithm but a pipeline—each component contributes to overall quality, and optimizing the full pipeline yields better results than perfecting any single component.
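As a concrete starting point for that monitoring, recall@k and MRR each fit in a few lines (a sketch; "relevant" is assumed to be a set of known-relevant document IDs per query):

def recall_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of relevant IDs that appear in the top k results."""
    if not relevant:
        return 0.0
    return len(set(retrieved[:k]) & relevant) / len(relevant)

def mrr(retrieved: list[str], relevant: set[str]) -> float:
    """Reciprocal rank of the first relevant result (0 if none found)."""
    for rank, id_ in enumerate(retrieved, start=1):
        if id_ in relevant:
            return 1.0 / rank
    return 0.0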
