Introduction: Hybrid search pairs the semantic understanding of vector search with the precision of keyword matching. Pure vector search excels at finding conceptually similar content but can miss exact matches; pure keyword search finds exact terms but misses semantic relationships. Hybrid search fuses these approaches, using vector similarity for semantic relevance and BM25 or TF-IDF for lexical matching. The challenge lies in combining scores from ranking systems whose score scales are not comparable—Reciprocal Rank Fusion (RRF) and weighted linear combination are the most common approaches. This guide covers practical patterns for implementing hybrid search: from basic score fusion to sophisticated reranking strategies that deliver the best of both retrieval paradigms.

Search Components
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from abc import ABC, abstractmethod
import math
@dataclass
class SearchResult:
    """One hit returned by a search backend or by a fusion strategy."""

    # Document identifier; fusion strategies use it to match hits across result sets.
    id: str
    # Text of the matched document.
    content: str
    # Relevance score; its scale depends on whichever component produced the hit.
    score: float
    # Free-form extra data; every instance gets its own dict by default.
    metadata: dict = field(default_factory=dict)
    # Label of the producing component (backends and fusers in this file set it).
    source: str = "unknown"
class SearchBackend(ABC):
    """Interface every search backend implements."""

    @abstractmethod
    async def search(self, query: str, limit: int = 10) -> list[SearchResult]:
        """Run ``query`` and return the matching results.

        Args:
            query: Search text.
            limit: Number of results requested (default 10).

        Returns:
            A list of ``SearchResult`` objects.
        """
        ...
class VectorSearchBackend(SearchBackend):
    """Search backend that ranks documents by embedding similarity.

    ``embedding_model`` must provide an awaitable ``embed(query)`` and
    ``vector_store`` an awaitable ``search(embedding=..., limit=...)`` whose
    items support ``["id"]``, ``["content"]``, ``["score"]`` and
    ``.get("metadata", ...)``.
    """

    def __init__(self, embedding_model: Any, vector_store: Any):
        self.embedding_model = embedding_model
        self.vector_store = vector_store

    async def search(self, query: str, limit: int = 10) -> list[SearchResult]:
        """Embed ``query`` and return the store's hits as ``SearchResult`` objects.

        Args:
            query: Search text passed to the embedding model.
            limit: Number of hits requested from the vector store.

        Returns:
            The store's hits, in the order the store returned them, each
            tagged with ``source="vector"``.
        """
        vector = await self.embedding_model.embed(query)
        hits = await self.vector_store.search(embedding=vector, limit=limit)

        converted = []
        for hit in hits:
            converted.append(
                SearchResult(
                    id=hit["id"],
                    content=hit["content"],
                    score=hit["score"],
                    metadata=hit.get("metadata", {}),
                    source="vector",
                )
            )
        return converted
class BM25SearchBackend(SearchBackend):
    """In-memory BM25 keyword search.

    Documents are dicts with a ``"content"`` string and optional ``"id"`` and
    ``"metadata"`` entries. Call :meth:`index` before :meth:`search`.
    """

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        """Create an empty index.

        Args:
            k1: Term-frequency saturation parameter of BM25.
            b: Document-length normalization parameter of BM25.
        """
        self.k1 = k1
        self.b = b
        self.documents: list[dict] = []
        self.doc_lengths: list[int] = []
        self.avg_doc_length: float = 0
        # term -> {doc position -> occurrences of term in that document}
        self.term_frequencies: dict[str, dict[int, int]] = {}
        # term -> number of documents containing the term
        self.doc_frequencies: dict[str, int] = {}
        self.N: int = 0

    def index(self, documents: list[dict]):
        """Build the index from ``documents``, replacing any previous index.

        Args:
            documents: Dicts that each carry a ``"content"`` string.
        """
        self.documents = documents
        self.N = len(documents)
        # Start from a clean slate: document positions restart at 0 on every
        # call, so statistics left over from an earlier call would be merged
        # into (and corrupt) the new index.
        self.doc_lengths = []
        self.term_frequencies = {}
        self.doc_frequencies = {}

        for doc_id, doc in enumerate(documents):
            tokens = self._tokenize(doc["content"])
            self.doc_lengths.append(len(tokens))

            term_counts: dict[str, int] = {}
            for token in tokens:
                term_counts[token] = term_counts.get(token, 0) + 1

            for term, count in term_counts.items():
                self.term_frequencies.setdefault(term, {})[doc_id] = count
                self.doc_frequencies[term] = self.doc_frequencies.get(term, 0) + 1

        self.avg_doc_length = (
            sum(self.doc_lengths) / len(self.doc_lengths) if self.doc_lengths else 0
        )

    def _tokenize(self, text: str) -> list[str]:
        """Lower-case ``text`` and split it into runs of word characters."""
        import re

        return re.findall(r'\w+', text.lower())

    def _idf(self, term: str) -> float:
        """Return the inverse document frequency of ``term`` (0 if unseen)."""
        df = self.doc_frequencies.get(term, 0)
        if df == 0:
            return 0
        # The "+ 1" inside the log keeps the value positive even for terms
        # that occur in most documents.
        return math.log((self.N - df + 0.5) / (df + 0.5) + 1)

    def _score_document(self, query_terms: list[str], doc_id: int) -> float:
        """Return the BM25 score of document ``doc_id`` for ``query_terms``.

        Repeated query terms contribute once per occurrence.
        """
        score = 0
        doc_length = self.doc_lengths[doc_id]
        for term in query_terms:
            postings = self.term_frequencies.get(term)
            if postings is None:
                continue
            tf = postings.get(doc_id, 0)
            if tf == 0:
                continue
            idf = self._idf(term)
            numerator = tf * (self.k1 + 1)
            # tf > 0 implies a non-empty document, hence avg_doc_length > 0.
            denominator = tf + self.k1 * (
                1 - self.b + self.b * doc_length / self.avg_doc_length
            )
            score += idf * numerator / denominator
        return score

    async def search(self, query: str, limit: int = 10) -> list[SearchResult]:
        """Return up to ``limit`` documents ranked by BM25 score.

        Args:
            query: Search text; tokenized the same way as indexed content.
            limit: Maximum number of results.

        Returns:
            Results with a positive score, best first, tagged
            ``source="bm25"``. Ties keep index order.
        """
        query_terms = self._tokenize(query)

        # Only documents containing at least one query term can score above
        # zero, so gather candidates from the postings instead of scanning
        # the whole corpus.
        candidates: set[int] = set()
        for term in query_terms:
            candidates.update(self.term_frequencies.get(term, ()))

        scores = []
        # Ascending position order keeps tie-breaking identical to a full scan.
        for doc_id in sorted(candidates):
            score = self._score_document(query_terms, doc_id)
            if score > 0:
                scores.append((doc_id, score))

        scores.sort(key=lambda pair: pair[1], reverse=True)

        results = []
        for doc_id, score in scores[:limit]:
            doc = self.documents[doc_id]
            results.append(SearchResult(
                id=doc.get("id", str(doc_id)),
                content=doc["content"],
                score=score,
                metadata=doc.get("metadata", {}),
                source="bm25"
            ))
        return results
class ElasticsearchBackend(SearchBackend):
    """Keyword search delegated to an Elasticsearch index.

    ``client`` must provide an awaitable ``search(index=..., body=...)``.
    """

    def __init__(self, client: Any, index_name: str):
        self.client = client
        self.index_name = index_name

    async def search(self, query: str, limit: int = 10) -> list[SearchResult]:
        """Run a ``multi_match`` query and convert the hits.

        Args:
            query: Search text.
            limit: Sent to Elasticsearch as the ``size`` of the response.

        Returns:
            One ``SearchResult`` per hit, tagged ``source="elasticsearch"``;
            the hit's whole ``_source`` document becomes the metadata.
        """
        multi_match = {
            "query": query,
            # "title^2" is the field-boost syntax: title matches weigh double.
            "fields": ["content", "title^2"],
            "type": "best_fields"
        }
        request_body = {
            "query": {"multi_match": multi_match},
            "size": limit
        }
        response = await self.client.search(index=self.index_name, body=request_body)

        return [
            SearchResult(
                id=hit["_id"],
                content=hit["_source"].get("content", ""),
                score=hit["_score"],
                metadata=hit["_source"],
                source="elasticsearch",
            )
            for hit in response["hits"]["hits"]
        ]
Score Fusion Strategies
from dataclasses import dataclass
from typing import Any, Optional, List
from abc import ABC, abstractmethod
class FusionStrategy(ABC):
    """Interface for merging several ranked result lists into one."""

    @abstractmethod
    def fuse(self, result_sets: list[list[SearchResult]]) -> list[SearchResult]:
        """Merge ``result_sets`` into a single list of results.

        Args:
            result_sets: One list of results per retrieval source.

        Returns:
            A single list of ``SearchResult`` objects.
        """
        ...
class ReciprocalRankFusion(FusionStrategy):
    """Reciprocal Rank Fusion (RRF).

    A document's fused score is the sum, over every result set that contains
    it, of ``1 / (k + rank)`` where ``rank`` is its 1-based position in that
    set. Only positions are used; the input scores are ignored.
    """

    def __init__(self, k: int = 60):
        """Store the rank-smoothing constant ``k``."""
        self.k = k

    def fuse(self, result_sets: list[list[SearchResult]]) -> list[SearchResult]:
        """Fuse ``result_sets`` by reciprocal rank.

        Args:
            result_sets: Ranked lists, best result first.

        Returns:
            New ``SearchResult`` objects (one per distinct id), best first,
            with ``score`` set to the RRF score and ``source`` set to
            ``"rrf"``. The objects in ``result_sets`` are left untouched.
        """
        from dataclasses import replace

        rrf_scores: dict[str, float] = {}
        first_seen: dict[str, SearchResult] = {}

        for results in result_sets:
            for rank, result in enumerate(results, start=1):
                rrf_scores[result.id] = rrf_scores.get(result.id, 0) + 1 / (self.k + rank)
                first_seen.setdefault(result.id, result)

        ranked_ids = sorted(rrf_scores, key=rrf_scores.get, reverse=True)

        # Return copies rather than overwriting score/source on the caller's
        # objects: the same results may still be inspected or fused again.
        return [
            replace(first_seen[result_id], score=rrf_scores[result_id], source="rrf")
            for result_id in ranked_ids
        ]
class WeightedFusion(FusionStrategy):
    """Weighted linear combination of min-max normalized scores."""

    def __init__(self, weights: Optional[list[float]] = None):
        """Store one weight per result set.

        Args:
            weights: Relative weights; ``None`` or an empty list means
                ``[0.5, 0.5]``. They are rescaled to sum to 1 when fusing.
        """
        self.weights = weights or [0.5, 0.5]

    def fuse(self, result_sets: list[list[SearchResult]]) -> list[SearchResult]:
        """Fuse ``result_sets`` by weighted sum of normalized scores.

        Scores are min-max normalized within each result set, multiplied by
        that set's weight and summed per document id. Result sets beyond the
        number of configured weights reuse the last weight.

        Args:
            result_sets: One list of results per retrieval source.

        Returns:
            New ``SearchResult`` objects (one per distinct id), best first,
            with ``source`` set to ``"weighted"``.

        Raises:
            ValueError: If the configured weights sum to zero.
        """
        from dataclasses import replace

        total = sum(self.weights)
        if total == 0:
            # Previously surfaced as an unexplained ZeroDivisionError.
            raise ValueError("WeightedFusion weights must not sum to zero")
        weights = [w / total for w in self.weights]

        combined_scores: dict[str, float] = {}
        first_seen: dict[str, SearchResult] = {}

        for i, results in enumerate(result_sets):
            if not results:
                continue
            weight = weights[i] if i < len(weights) else weights[-1]

            raw_scores = [r.score for r in results]
            min_score = min(raw_scores)
            score_range = max(raw_scores) - min_score

            for r in results:
                if score_range:
                    normalized_score = (r.score - min_score) / score_range
                else:
                    # A single hit, or hits with identical scores, carry no
                    # ranking information; give them full credit instead of 0
                    # so this backend's matches are not silently discarded.
                    normalized_score = 1.0
                combined_scores[r.id] = combined_scores.get(r.id, 0) + weight * normalized_score
                first_seen.setdefault(r.id, r)

        ranked_ids = sorted(combined_scores, key=combined_scores.get, reverse=True)
        return [
            replace(first_seen[result_id], score=combined_scores[result_id], source="weighted")
            for result_id in ranked_ids
        ]
class ConvexCombinationFusion(FusionStrategy):
    """Convex combination ``alpha * vector + (1 - alpha) * keyword``."""

    def __init__(self, alpha: float = 0.5):
        """Store ``alpha``, the weight given to the vector result set."""
        self.alpha = alpha  # Weight for vector search

    @staticmethod
    def _normalize(scores: dict) -> dict:
        """Min-max normalize the values of an ``id -> score`` mapping."""
        if not scores:
            return {}
        min_s = min(scores.values())
        range_s = max(scores.values()) - min_s
        if not range_s:
            # One hit or all-equal scores: give full credit rather than 0 so
            # the set still contributes to the combination.
            return {k: 1.0 for k in scores}
        return {k: (v - min_s) / range_s for k, v in scores.items()}

    def fuse(self, result_sets: list[list[SearchResult]]) -> list[SearchResult]:
        """Fuse a vector and a keyword result set.

        Args:
            result_sets: Exactly two lists, ``[vector_results, keyword_results]``.

        Returns:
            New ``SearchResult`` objects (one per distinct id), best first,
            with ``source`` set to ``"convex"``. A document missing from one
            set scores 0 for that set. Ties keep first-appearance order
            (vector results, then keyword-only results).

        Raises:
            ValueError: If ``result_sets`` does not hold exactly two lists.
        """
        from dataclasses import replace

        if len(result_sets) != 2:
            raise ValueError("Convex combination requires exactly 2 result sets")
        vector_results, keyword_results = result_sets

        vector_norm = self._normalize({r.id: r.score for r in vector_results})
        keyword_norm = self._normalize({r.id: r.score for r in keyword_results})

        # A dict keeps first-insertion order, so iterating it is deterministic
        # (unlike a set of ids, whose order varies between interpreter runs).
        # For ids present in both sets the keyword-side object is kept.
        result_map: dict[str, SearchResult] = {}
        for r in vector_results:
            result_map[r.id] = r
        for r in keyword_results:
            result_map[r.id] = r

        # Build copies so the caller's result objects are not overwritten.
        combined = [
            replace(
                result,
                score=(
                    self.alpha * vector_norm.get(result_id, 0)
                    + (1 - self.alpha) * keyword_norm.get(result_id, 0)
                ),
                source="convex",
            )
            for result_id, result in result_map.items()
        ]
        combined.sort(key=lambda r: r.score, reverse=True)
        return combined
class DistributionBasedFusion(FusionStrategy):
    """Score fusion based on per-set z-score normalization."""

    def fuse(self, result_sets: list[list[SearchResult]]) -> list[SearchResult]:
        """Fuse ``result_sets`` by summing z-scores per document id.

        Each set's scores are standardized with that set's mean and sample
        standard deviation; a document's fused score is the sum of its
        z-scores across sets.

        Args:
            result_sets: One list of results per retrieval source.

        Returns:
            New ``SearchResult`` objects (one per distinct id), best first,
            with ``source`` set to ``"distribution"``.
        """
        import statistics
        from dataclasses import replace

        combined_scores: dict[str, float] = {}
        first_seen: dict[str, SearchResult] = {}

        for results in result_sets:
            if not results:
                continue
            scores = [r.score for r in results]
            if len(scores) > 1:
                mean = statistics.mean(scores)
                stdev = statistics.stdev(scores)
            else:
                # A lone result sits exactly at its own mean, so its z-score
                # is 0. Passing its raw score through would add an
                # un-normalized value to other sets' z-scores.
                mean, stdev = scores[0], 0

            for r in results:
                z_score = (r.score - mean) / stdev if stdev > 0 else 0
                combined_scores[r.id] = combined_scores.get(r.id, 0) + z_score
                first_seen.setdefault(r.id, r)

        ranked_ids = sorted(combined_scores, key=combined_scores.get, reverse=True)
        # Copies keep the caller's result objects unmodified.
        return [
            replace(first_seen[result_id], score=combined_scores[result_id], source="distribution")
            for result_id in ranked_ids
        ]
Hybrid Search Implementation
from dataclasses import dataclass
from typing import Any, Optional, List
import asyncio
@dataclass
class HybridSearchConfig:
    """Tunable settings for a hybrid search."""

    # Weight of the vector results (used by the "weighted" and "convex" strategies).
    vector_weight: float = 0.5
    # Weight of the keyword results (used by the "weighted" strategy).
    keyword_weight: float = 0.5
    # Name of the fusion strategy to use.
    fusion_strategy: str = "rrf"
    # Number of candidates requested from the vector backend.
    vector_limit: int = 20
    # Number of candidates requested from the keyword backend.
    keyword_limit: int = 20
    # Number of fused results returned when the caller gives no limit.
    final_limit: int = 10
class HybridSearch:
    """Hybrid search combining a vector backend and a keyword backend."""

    def __init__(
        self,
        vector_backend: SearchBackend,
        keyword_backend: SearchBackend,
        config: Optional[HybridSearchConfig] = None
    ):
        """Wire up the backends and pick the fusion strategy.

        Args:
            vector_backend: Backend queried for semantic matches.
            keyword_backend: Backend queried for lexical matches.
            config: Settings; defaults to ``HybridSearchConfig()``.
        """
        self.vector = vector_backend
        self.keyword = keyword_backend
        self.config = config or HybridSearchConfig()

        strategy = self.config.fusion_strategy
        if strategy == "weighted":
            self.fusion = WeightedFusion([
                self.config.vector_weight,
                self.config.keyword_weight
            ])
        elif strategy == "convex":
            self.fusion = ConvexCombinationFusion(self.config.vector_weight)
        elif strategy == "distribution":
            # Defined alongside the other strategies but previously not
            # selectable through the config.
            self.fusion = DistributionBasedFusion()
        else:
            # "rrf" and any unrecognized name use Reciprocal Rank Fusion.
            self.fusion = ReciprocalRankFusion()

    async def search(
        self,
        query: str,
        limit: Optional[int] = None
    ) -> list[SearchResult]:
        """Query both backends concurrently and fuse their results.

        Args:
            query: Search text sent to both backends.
            limit: Maximum number of fused results; ``None`` means
                ``config.final_limit``.

        Returns:
            The fused results, truncated to ``limit``.
        """
        # Compare against None so an explicit limit of 0 is honored rather
        # than being replaced by the configured default.
        if limit is None:
            limit = self.config.final_limit

        vector_results, keyword_results = await asyncio.gather(
            self.vector.search(query, self.config.vector_limit),
            self.keyword.search(query, self.config.keyword_limit),
        )

        fused = self.fusion.fuse([vector_results, keyword_results])
        return fused[:limit]
class AdaptiveHybridSearch:
    """Hybrid search that picks the vector/keyword balance per query."""

    def __init__(
        self,
        vector_backend: SearchBackend,
        keyword_backend: SearchBackend
    ):
        """Store the two backends; no query classifier is set initially."""
        self.vector = vector_backend
        self.keyword = keyword_backend
        self.query_classifier = None

    def set_query_classifier(self, classifier: Any):
        """Store ``classifier`` on the instance.

        NOTE(review): nothing in this class reads ``query_classifier`` yet;
        ``_determine_alpha`` relies on built-in heuristics only.
        """
        self.query_classifier = classifier

    async def search(
        self,
        query: str,
        limit: int = 10
    ) -> list[SearchResult]:
        """Run both searches and fuse them with a query-dependent weight.

        Args:
            query: Search text sent to both backends.
            limit: Maximum number of fused results; each backend is asked
                for ``limit * 2`` candidates.

        Returns:
            The fused results, truncated to ``limit``.
        """
        alpha = await self._determine_alpha(query)

        # The two backends are independent, so query them concurrently
        # instead of paying both latencies back to back.
        vector_results, keyword_results = await asyncio.gather(
            self.vector.search(query, limit * 2),
            self.keyword.search(query, limit * 2),
        )

        fusion = ConvexCombinationFusion(alpha)
        fused = fusion.fuse([vector_results, keyword_results])
        return fused[:limit]

    async def _determine_alpha(self, query: str) -> float:
        """Return the vector weight (0.2 to 0.8) chosen for ``query``.

        Heuristics: queries of up to 3 words start at 0.3, 10 or more words
        at 0.7, anything else at 0.5; a trailing ``?`` adds 0.1 (capped at
        0.8); a double quote anywhere subtracts 0.2 (floored at 0.2).
        """
        word_count = len(query.split())
        if word_count <= 3:
            alpha = 0.3  # short queries favor keyword search
        elif word_count >= 10:
            alpha = 0.7  # long queries favor semantic search
        else:
            alpha = 0.5  # balanced default

        # Ignore trailing whitespace so "what is bm25? " still counts as a question.
        if query.rstrip().endswith("?"):
            alpha = min(alpha + 0.1, 0.8)

        # Quoted terms signal that an exact match is wanted.
        if '"' in query:
            alpha = max(alpha - 0.2, 0.2)
        return alpha
class MultiIndexHybridSearch:
    """Runs one query against several named ``HybridSearch`` instances."""

    def __init__(self):
        self.indices: dict[str, HybridSearch] = {}

    def add_index(self, name: str, hybrid_search: HybridSearch):
        """Register ``hybrid_search`` under ``name`` (replacing any previous entry)."""
        self.indices[name] = hybrid_search

    async def search(
        self,
        query: str,
        indices: list[str] = None,
        limit: int = 10
    ) -> dict[str, list[SearchResult]]:
        """Query the selected indices and return their results by name.

        Args:
            query: Search text.
            indices: Names to query; ``None`` or an empty list means every
                registered index. Unregistered names are skipped.
            limit: Result limit passed to each index.

        Returns:
            A mapping of index name to that index's results.
        """
        wanted = indices if indices else list(self.indices.keys())

        # Start every search before awaiting any of them so they overlap.
        pending = {}
        for name in wanted:
            if name not in self.indices:
                continue
            pending[name] = asyncio.create_task(
                self.indices[name].search(query, limit)
            )

        return {name: await task for name, task in pending.items()}

    async def search_unified(
        self,
        query: str,
        indices: list[str] = None,
        limit: int = 10
    ) -> list[SearchResult]:
        """Query the selected indices and merge everything into one ranking.

        The per-index result lists are merged with Reciprocal Rank Fusion
        and the merged list is truncated to ``limit``.
        """
        by_index = await self.search(query, indices, limit)
        ranked_lists = list(by_index.values())
        merged = ReciprocalRankFusion().fuse(ranked_lists)
        return merged[:limit]
Production Hybrid Search Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict
# FastAPI application object; the route decorators below register handlers on it.
app = FastAPI()
class SearchRequest(BaseModel):
    """Request body accepted by the hybrid search endpoint."""

    # Search text.
    query: str
    # Number of results requested.
    limit: int = 10
    # Weight of the vector side; the endpoint derives the keyword weight as 1 - vector_weight.
    vector_weight: float = 0.5
    # Name of the fusion strategy to apply.
    fusion_strategy: str = "rrf"
    # Optional list of index names.
    indices: Optional[List[str]] = None
class SearchResponse(BaseModel):
    """Response body returned by the hybrid search endpoint."""

    # The query text echoed back from the request.
    query: str
    # One dict per result.
    results: List[Dict]
    # Number of entries in ``results``.
    total: int
    # Fusion strategy name echoed back from the request.
    fusion_strategy: str
# Initialize components (would be configured in production)
# hybrid_search = HybridSearch(vector_backend, keyword_backend)
@app.post("/v1/search")
async def hybrid_search_endpoint(request: SearchRequest) -> SearchResponse:
    """Execute a hybrid search for the posted request.

    Args:
        request: Query text plus limit, vector weight and fusion strategy.

    Returns:
        The serialized results together with the echoed query and strategy.

    Raises:
        HTTPException: 422 when ``vector_weight`` is outside [0, 1] or
            ``limit`` is not positive.
    """
    # The keyword weight is derived as 1 - vector_weight, so a value outside
    # [0, 1] would produce a negative weight on one side.
    if not 0.0 <= request.vector_weight <= 1.0:
        raise HTTPException(
            status_code=422,
            detail="vector_weight must be between 0 and 1"
        )
    # A non-positive limit would be used as a slice bound on the fused list.
    if request.limit < 1:
        raise HTTPException(
            status_code=422,
            detail="limit must be a positive integer"
        )

    # Built for the production wiring sketched below; unused while the
    # backends are stubbed out.
    config = HybridSearchConfig(
        vector_weight=request.vector_weight,
        keyword_weight=1 - request.vector_weight,
        fusion_strategy=request.fusion_strategy,
        final_limit=request.limit
    )
    # Would use actual backends in production
    results = []  # await hybrid_search.search(request.query, request.limit)
    return SearchResponse(
        query=request.query,
        results=[
            {
                "id": r.id,
                "content": r.content,
                "score": r.score,
                "source": r.source,
                "metadata": r.metadata
            }
            for r in results
        ],
        total=len(results),
        fusion_strategy=request.fusion_strategy
    )
@app.post("/v1/search/vector")
async def vector_search_endpoint(
    query: str,
    limit: int = 10
) -> list[dict]:
    """Vector-only search (placeholder: no backend is wired in, so the list is empty)."""
    # Production wiring: hits = await vector_backend.search(query, limit)
    hits = []

    payload = []
    for hit in hits:
        payload.append({
            "id": hit.id,
            "content": hit.content,
            "score": hit.score
        })
    return payload
@app.post("/v1/search/keyword")
async def keyword_search_endpoint(
    query: str,
    limit: int = 10
) -> list[dict]:
    """Keyword-only search (placeholder: no backend is wired in, so the list is empty)."""
    # Production wiring: hits = await keyword_backend.search(query, limit)
    hits = []

    payload = []
    for hit in hits:
        payload.append({
            "id": hit.id,
            "content": hit.content,
            "score": hit.score
        })
    return payload
@app.get("/v1/search/explain")
async def explain_search(
    query: str,
    limit: int = 5
) -> dict:
    """Return a breakdown of a search (placeholder with fixed values)."""
    # Production wiring:
    #   vector_results = await vector_backend.search(query, limit)
    #   keyword_results = await keyword_backend.search(query, limit)
    explanation = {
        "vector_contribution": 0.5,
        "keyword_contribution": 0.5,
        "fusion_method": "rrf"
    }
    report = {"query": query}
    report["vector_results"] = []
    report["keyword_results"] = []
    report["fused_results"] = []
    report["explanation"] = explanation
    return report
@app.get("/health")
async def health():
    """Liveness probe; always reports a healthy status."""
    status_report = {"status": "healthy"}
    return status_report
References
- Reciprocal Rank Fusion: https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf
- BM25 Algorithm: https://en.wikipedia.org/wiki/Okapi_BM25
- Weaviate Hybrid Search: https://weaviate.io/developers/weaviate/search/hybrid
- Pinecone Hybrid Search: https://docs.pinecone.io/docs/hybrid-search
- Elasticsearch Vector Search: https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html
Conclusion
Hybrid search delivers the best retrieval quality by combining semantic understanding with lexical precision. Reciprocal Rank Fusion (RRF) is the most robust fusion method—it uses only ranks, so it needs no score normalization or weight tuning (its single constant k, commonly 60, rarely needs adjusting) and it works well across different score distributions. Weighted fusion gives you more control but requires tuning weights for your specific use case. For production systems, run vector and keyword searches in parallel to minimize latency, then fuse results. Consider adaptive weighting based on query characteristics: short keyword-like queries benefit from higher keyword weight, while longer natural language questions benefit from higher vector weight. Monitor both precision and recall separately for each search type to understand where improvements are needed. The optimal balance depends on your data and use case—start with equal weights and adjust based on user feedback and relevance metrics. Many vector databases now support hybrid search natively (Weaviate, Pinecone, Qdrant), which simplifies implementation and improves performance by avoiding separate round trips.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.