Hybrid Search Implementation: Combining Vector and Keyword Retrieval

Introduction: Hybrid search combines the best of both worlds: the semantic understanding of vector search with the precision of keyword matching. Pure vector search excels at finding conceptually similar content but can miss exact matches; pure keyword search finds exact terms but misses semantic relationships. Hybrid search fuses these approaches, using vector similarity for semantic relevance and BM25 or TF-IDF for lexical matching. The challenge lies in combining scores from different ranking systems—Reciprocal Rank Fusion (RRF) and weighted linear combination are the most common approaches. This guide covers practical patterns for implementing hybrid search: from basic score fusion to sophisticated reranking strategies that deliver the best of both retrieval paradigms.

Hybrid Search
Hybrid Search: Vector + Keyword + Fusion

Search Components

from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from abc import ABC, abstractmethod
import math

@dataclass
class SearchResult:
    """A single hit produced by a search backend or a fusion strategy.

    Attributes:
        id: Document identifier. The fusion strategies in this file use it
            to recognise the same document across result sets.
        content: Text content of the document.
        score: Relevance score. Its scale depends on the producer (a
            backend's own score, or a fused score).
        metadata: Extra data attached to the document; defaults to a fresh
            empty dict per instance.
        source: Label of the producer, e.g. "vector", "bm25" or "rrf".
    """
    
    id: str
    content: str
    score: float
    # default_factory gives every instance its own dict instead of a shared one.
    metadata: dict = field(default_factory=dict)
    source: str = "unknown"

class SearchBackend(ABC):
    """Abstract search backend.

    Concrete backends implement the asynchronous :meth:`search` coroutine
    and return their hits as ``SearchResult`` objects.
    """
    
    @abstractmethod
    async def search(
        self,
        query: str,
        limit: int = 10
    ) -> list[SearchResult]:
        """Execute a search.

        Args:
            query: Query text.
            limit: Maximum number of results requested.

        Returns:
            The matching results. ReciprocalRankFusion treats list position
            as rank, so implementations are expected to return the best
            match first.
        """
        pass

class VectorSearchBackend(SearchBackend):
    """Backend that embeds the query and looks it up in a vector store."""

    def __init__(self, embedding_model: Any, vector_store: Any):
        self.embedding_model = embedding_model
        self.vector_store = vector_store

    async def search(
        self,
        query: str,
        limit: int = 10
    ) -> list[SearchResult]:
        """Embed ``query`` and return the vector store's hits.

        Args:
            query: Query text passed to ``embedding_model.embed``.
            limit: Forwarded to ``vector_store.search`` as ``limit``.

        Returns:
            One ``SearchResult`` per hit, in the order the store returned
            them, labelled with ``source="vector"``.
        """
        embedding = await self.embedding_model.embed(query)
        hits = await self.vector_store.search(embedding=embedding, limit=limit)

        found = []
        for hit in hits:
            found.append(
                SearchResult(
                    id=hit["id"],
                    content=hit["content"],
                    score=hit["score"],
                    # Hits without metadata get an empty dict.
                    metadata=hit.get("metadata", {}),
                    source="vector",
                )
            )
        return found

class BM25SearchBackend(SearchBackend):
    """In-memory BM25 keyword search over a list of document dicts.

    Call :meth:`index` first, then :meth:`search`. Each document is a dict
    with a ``"content"`` string and optional ``"id"`` and ``"metadata"``
    keys.

    Attributes:
        k1: BM25 ``k1`` parameter (scales the term-frequency component).
        b: BM25 ``b`` parameter (strength of document-length normalization).
        documents: The indexed documents, in index order.
        doc_lengths: Token count of each document, by position.
        avg_doc_length: Mean of ``doc_lengths`` (0 when nothing is indexed).
        term_frequencies: term -> {document position -> occurrence count}.
        doc_frequencies: term -> number of documents containing the term.
        N: Number of indexed documents.
    """

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.documents: list[dict] = []
        self.doc_lengths: list[int] = []
        self.avg_doc_length: float = 0
        self.term_frequencies: dict[str, dict[int, int]] = {}
        self.doc_frequencies: dict[str, int] = {}
        self.N: int = 0

    def index(self, documents: list[dict]):
        """Build the BM25 statistics for ``documents``.

        Previously indexed state is discarded first, so calling this method
        again re-indexes from scratch instead of mixing the lengths and
        frequencies of the old corpus into the new one.

        Args:
            documents: Dicts with at least a ``"content"`` string.
        """
        self.documents = documents
        self.N = len(documents)
        # Reset per-corpus statistics; without this a second index() call
        # would append to doc_lengths and double-count document frequencies.
        self.doc_lengths = []
        self.term_frequencies = {}
        self.doc_frequencies = {}

        for doc_id, doc in enumerate(documents):
            tokens = self._tokenize(doc["content"])
            self.doc_lengths.append(len(tokens))

            term_counts: dict[str, int] = {}
            for token in tokens:
                term_counts[token] = term_counts.get(token, 0) + 1

            for term, count in term_counts.items():
                self.term_frequencies.setdefault(term, {})[doc_id] = count
                self.doc_frequencies[term] = self.doc_frequencies.get(term, 0) + 1

        self.avg_doc_length = (
            sum(self.doc_lengths) / len(self.doc_lengths) if self.doc_lengths else 0
        )

    def _tokenize(self, text: str) -> list[str]:
        """Lower-case ``text`` and split it into ``\\w+`` tokens."""
        import re

        return re.findall(r'\w+', text.lower())

    def _idf(self, term: str) -> float:
        """Return the inverse document frequency of ``term``.

        Terms that occur in no indexed document get 0.
        """
        df = self.doc_frequencies.get(term, 0)
        if df == 0:
            return 0

        # The "+ 1" inside the log keeps the value positive even for terms
        # that appear in most documents.
        return math.log((self.N - df + 0.5) / (df + 0.5) + 1)

    def _score_document(self, query_terms: list[str], doc_id: int) -> float:
        """Return the BM25 score of document ``doc_id`` for ``query_terms``.

        Query terms missing from the index or from this document contribute
        nothing; a term repeated in the query is counted once per
        occurrence.
        """
        score = 0
        doc_length = self.doc_lengths[doc_id]

        for term in query_terms:
            postings = self.term_frequencies.get(term)
            if postings is None:
                continue

            tf = postings.get(doc_id, 0)
            if tf == 0:
                continue

            idf = self._idf(term)

            numerator = tf * (self.k1 + 1)
            denominator = tf + self.k1 * (
                1 - self.b + self.b * doc_length / self.avg_doc_length
            )

            score += idf * numerator / denominator

        return score

    async def search(
        self,
        query: str,
        limit: int = 10
    ) -> list[SearchResult]:
        """Return the best BM25 matches for ``query``.

        Args:
            query: Query text; tokenized the same way as the documents.
            limit: Maximum number of results. Values below 1 yield an empty
                list.

        Returns:
            Results with a positive score, best first. Ties keep index
            order. ``id`` falls back to the document's position (as a
            string) when the document has no ``"id"`` key.
        """
        query_terms = self._tokenize(query)

        # Only documents that contain at least one query term can score
        # above zero, so gather candidates from the postings instead of
        # scoring the whole corpus.
        candidates: set[int] = set()
        for term in query_terms:
            candidates.update(self.term_frequencies.get(term, ()))

        scores = []
        # Visiting candidates in index order keeps tie-breaking identical to
        # a full scan, because the sort below is stable.
        for doc_id in sorted(candidates):
            score = self._score_document(query_terms, doc_id)
            if score > 0:
                scores.append((doc_id, score))

        scores.sort(key=lambda pair: pair[1], reverse=True)

        results = []
        # max() guards against a negative limit slicing from the end.
        for doc_id, score in scores[:max(limit, 0)]:
            doc = self.documents[doc_id]
            results.append(SearchResult(
                id=doc.get("id", str(doc_id)),
                content=doc["content"],
                score=score,
                metadata=doc.get("metadata", {}),
                source="bm25"
            ))

        return results

class ElasticsearchBackend(SearchBackend):
    """Keyword backend that delegates to an Elasticsearch client."""

    def __init__(self, client: Any, index_name: str):
        self.client = client
        self.index_name = index_name

    async def search(
        self,
        query: str,
        limit: int = 10
    ) -> list[SearchResult]:
        """Run a ``multi_match`` query and convert the hits.

        Args:
            query: Query text matched against ``content`` and ``title``
                (the ``title^2`` field spec in the request body).
            limit: Sent as the request's ``size``.

        Returns:
            One ``SearchResult`` per hit in response order, labelled with
            ``source="elasticsearch"``; the hit's whole ``_source`` document
            is used as metadata.
        """
        request_body = {
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["content", "title^2"],
                    "type": "best_fields",
                },
            },
            "size": limit,
        }
        response = await self.client.search(
            index=self.index_name,
            body=request_body,
        )

        return [
            SearchResult(
                id=hit["_id"],
                content=hit["_source"].get("content", ""),
                score=hit["_score"],
                metadata=hit["_source"],
                source="elasticsearch",
            )
            for hit in response["hits"]["hits"]
        ]

Score Fusion Strategies

from dataclasses import dataclass
from typing import Any, Optional, List
from abc import ABC, abstractmethod

class FusionStrategy(ABC):
    """Abstract fusion strategy.

    A strategy merges several result lists into a single ranked list.
    """
    
    @abstractmethod
    def fuse(
        self,
        result_sets: list[list[SearchResult]]
    ) -> list[SearchResult]:
        """Fuse multiple result sets.

        Args:
            result_sets: One list of results per retrieval source.

        Returns:
            A single merged list of results.
        """
        pass

class ReciprocalRankFusion(FusionStrategy):
    """Reciprocal Rank Fusion (RRF).

    Each document scores ``sum(1 / (k + rank))`` over the result sets that
    contain it, where ``rank`` is its 1-based position in that set. Only
    positions are used, so the input scores need no normalization.

    Attributes:
        k: Constant added to the rank; larger values flatten the difference
            between top and lower positions.
    """

    def __init__(self, k: int = 60):
        self.k = k

    def fuse(
        self,
        result_sets: list[list[SearchResult]]
    ) -> list[SearchResult]:
        """Fuse ``result_sets`` with RRF.

        The input results are left untouched: the returned list holds new
        ``SearchResult`` objects, so callers can keep using the per-backend
        scores afterwards.

        Args:
            result_sets: Ranked result lists, best result first.

        Returns:
            One result per distinct id, highest RRF score first, with
            ``source="rrf"``. Content and metadata come from the first
            occurrence of the id; ties keep first-seen order.
        """
        rrf_scores: dict[str, float] = {}
        first_seen: dict[str, SearchResult] = {}

        for results in result_sets:
            for rank, result in enumerate(results):
                first_seen.setdefault(result.id, result)
                # enumerate() is 0-based, hence the "+ 1".
                rrf_scores[result.id] = (
                    rrf_scores.get(result.id, 0) + 1 / (self.k + rank + 1)
                )

        ranked_ids = sorted(rrf_scores, key=rrf_scores.get, reverse=True)

        # Build fresh objects instead of overwriting score/source on the
        # caller's results.
        return [
            SearchResult(
                id=result_id,
                content=first_seen[result_id].content,
                score=rrf_scores[result_id],
                metadata=first_seen[result_id].metadata,
                source="rrf",
            )
            for result_id in ranked_ids
        ]

class WeightedFusion(FusionStrategy):
    """Weighted linear combination of min-max normalized scores.

    Attributes:
        weights: One weight per result set, in the same order as the sets
            passed to :meth:`fuse`. They are rescaled to sum to 1 at fuse
            time.
    """

    def __init__(self, weights: Optional[list[float]] = None):
        # A missing or empty list falls back to two equal weights.
        self.weights = weights or [0.5, 0.5]

    def fuse(
        self,
        result_sets: list[list[SearchResult]]
    ) -> list[SearchResult]:
        """Fuse ``result_sets`` by a weighted sum of normalized scores.

        Scores are min-max normalized to [0, 1] within each set. When all
        scores in a set are equal (including a set with one result), each
        result gets 1.0, so a lone hit is not scored the same as a document
        the backend did not return at all.

        Args:
            result_sets: One list of results per source. If there are more
                sets than weights, the last weight is reused.

        Returns:
            One new result per distinct id, highest combined score first,
            with ``source="weighted"``. Content and metadata come from the
            first occurrence of the id.

        Raises:
            ZeroDivisionError: If the weights sum to zero.
        """
        total = sum(self.weights)
        weights = [w / total for w in self.weights]

        combined_scores: dict[str, float] = {}
        first_seen: dict[str, SearchResult] = {}

        for i, results in enumerate(result_sets):
            if not results:
                continue

            weight = weights[i] if i < len(weights) else weights[-1]

            low = min(r.score for r in results)
            high = max(r.score for r in results)
            span = high - low

            for r in results:
                # span == 0 means every score in this set is identical.
                normalized = (r.score - low) / span if span else 1.0
                first_seen.setdefault(r.id, r)
                combined_scores[r.id] = (
                    combined_scores.get(r.id, 0) + weight * normalized
                )

        ranked_ids = sorted(combined_scores, key=combined_scores.get, reverse=True)

        return [
            SearchResult(
                id=result_id,
                content=first_seen[result_id].content,
                score=combined_scores[result_id],
                metadata=first_seen[result_id].metadata,
                source="weighted",
            )
            for result_id in ranked_ids
        ]

class ConvexCombinationFusion(FusionStrategy):
    """Convex combination of vector and keyword scores.

    The fused score is ``alpha * vector + (1 - alpha) * keyword`` on min-max
    normalized scores.

    Attributes:
        alpha: Weight of the vector score. Values in [0, 1] are intended;
            the value is not validated here.
    """

    def __init__(self, alpha: float = 0.5):
        self.alpha = alpha  # Weight for vector search

    @staticmethod
    def _normalize(scores: dict) -> dict:
        """Min-max normalize ``scores``; identical scores all map to 1.0."""
        if not scores:
            return {}
        low = min(scores.values())
        high = max(scores.values())
        span = high - low
        if not span:
            # A lone hit (or an all-equal set) is a top hit in its own set,
            # not a zero that looks like "not retrieved".
            return {key: 1.0 for key in scores}
        return {key: (value - low) / span for key, value in scores.items()}

    def fuse(
        self,
        result_sets: list[list[SearchResult]]
    ) -> list[SearchResult]:
        """Fuse exactly two result sets: vector first, keyword second.

        The input results are not modified; new ``SearchResult`` objects are
        returned. A document missing from one set scores 0 for that set.

        Args:
            result_sets: ``[vector_results, keyword_results]``.

        Returns:
            One result per distinct id, highest combined score first, with
            ``source="convex"``. Ties keep first-seen order (vector results
            before keyword-only results), so ordering is reproducible across
            runs. When an id appears in both sets, content and metadata come
            from the keyword result.

        Raises:
            ValueError: If ``result_sets`` does not contain exactly two
                lists.
        """
        if len(result_sets) != 2:
            raise ValueError("Convex combination requires exactly 2 result sets")

        vector_results, keyword_results = result_sets

        vector_norm = self._normalize({r.id: r.score for r in vector_results})
        keyword_norm = self._normalize({r.id: r.score for r in keyword_results})

        # A dict keeps the position of a key's first insertion while later
        # values replace earlier ones: iteration order is first-seen, and
        # the stored result is the last one with that id.
        result_map = {r.id: r for r in [*vector_results, *keyword_results]}

        combined = []
        for result_id, original in result_map.items():
            v_score = vector_norm.get(result_id, 0)
            k_score = keyword_norm.get(result_id, 0)

            combined.append(SearchResult(
                id=result_id,
                content=original.content,
                score=self.alpha * v_score + (1 - self.alpha) * k_score,
                metadata=original.metadata,
                source="convex"
            ))

        combined.sort(key=lambda r: r.score, reverse=True)

        return combined

class DistributionBasedFusion(FusionStrategy):
    """Fusion by summing per-set z-scores."""

    def fuse(
        self,
        result_sets: list[list[SearchResult]]
    ) -> list[SearchResult]:
        """Fuse ``result_sets`` using z-score normalization.

        Within each set a score becomes ``(score - mean) / stdev`` (sample
        standard deviation). A set with fewer than two results, or with zero
        spread, has no usable distribution, so its results contribute a
        z-score of 0 rather than their raw score, which would be on a
        different scale from the other sets' z-scores.

        The input results are not modified; new ``SearchResult`` objects are
        returned.

        Args:
            result_sets: One list of results per source.

        Returns:
            One result per distinct id, highest summed z-score first, with
            ``source="distribution"``. Content and metadata come from the
            first occurrence of the id. A document absent from a set gets no
            contribution from that set.
        """
        import statistics

        combined_scores: dict[str, float] = {}
        first_seen: dict[str, SearchResult] = {}

        for results in result_sets:
            scores = [r.score for r in results]
            if len(scores) >= 2:
                mean = statistics.mean(scores)
                stdev = statistics.stdev(scores)
            else:
                # stdev() needs at least two data points.
                mean, stdev = 0, 0

            for r in results:
                z_score = (r.score - mean) / stdev if stdev > 0 else 0
                first_seen.setdefault(r.id, r)
                combined_scores[r.id] = combined_scores.get(r.id, 0) + z_score

        ranked_ids = sorted(combined_scores, key=combined_scores.get, reverse=True)

        return [
            SearchResult(
                id=result_id,
                content=first_seen[result_id].content,
                score=combined_scores[result_id],
                metadata=first_seen[result_id].metadata,
                source="distribution",
            )
            for result_id in ranked_ids
        ]

Hybrid Search Implementation

from dataclasses import dataclass
from typing import Any, Optional, List
import asyncio

@dataclass
class HybridSearchConfig:
    """Configuration for hybrid search.

    Attributes:
        vector_weight: Weight of the vector results; used by the "weighted"
            and "convex" strategies.
        keyword_weight: Weight of the keyword results; used by the
            "weighted" strategy.
        fusion_strategy: Name of the fusion strategy. HybridSearch maps the
            name to a FusionStrategy and uses RRF for names it does not
            recognise.
        vector_limit: Number of results requested from the vector backend.
        keyword_limit: Number of results requested from the keyword backend.
        final_limit: Default number of fused results returned by
            HybridSearch.search when no limit is passed.
    """
    
    vector_weight: float = 0.5
    keyword_weight: float = 0.5
    fusion_strategy: str = "rrf"
    vector_limit: int = 20
    keyword_limit: int = 20
    final_limit: int = 10

class HybridSearch:
    """Hybrid search combining a vector backend and a keyword backend.

    Attributes:
        vector: Backend queried for vector results.
        keyword: Backend queried for keyword results.
        config: Limits, weights and fusion strategy name.
        fusion: FusionStrategy instance selected from
            ``config.fusion_strategy``.
    """

    def __init__(
        self,
        vector_backend: SearchBackend,
        keyword_backend: SearchBackend,
        config: Optional[HybridSearchConfig] = None
    ):
        """Store the backends and pick the fusion strategy.

        Recognised strategy names are "rrf", "weighted", "convex" and
        "distribution"; any other name falls back to RRF.
        """
        self.vector = vector_backend
        self.keyword = keyword_backend
        self.config = config or HybridSearchConfig()

        strategy = self.config.fusion_strategy
        if strategy == "weighted":
            self.fusion = WeightedFusion([
                self.config.vector_weight,
                self.config.keyword_weight
            ])
        elif strategy == "convex":
            self.fusion = ConvexCombinationFusion(self.config.vector_weight)
        elif strategy == "distribution":
            self.fusion = DistributionBasedFusion()
        else:
            # "rrf" and unrecognised names.
            self.fusion = ReciprocalRankFusion()

    async def search(
        self,
        query: str,
        limit: Optional[int] = None
    ) -> list[SearchResult]:
        """Query both backends concurrently and fuse their results.

        Args:
            query: Query text sent to both backends.
            limit: Maximum number of fused results. ``None`` means
                ``config.final_limit``; an explicit ``0`` returns no
                results.

        Returns:
            The fused results, truncated to ``limit``.
        """
        # "is None" rather than "or": 0 is a valid limit, not "unset".
        if limit is None:
            limit = self.config.final_limit

        # gather() schedules both coroutines and runs them concurrently.
        vector_results, keyword_results = await asyncio.gather(
            self.vector.search(query, self.config.vector_limit),
            self.keyword.search(query, self.config.keyword_limit),
        )

        fused = self.fusion.fuse([vector_results, keyword_results])

        return fused[:limit]

class AdaptiveHybridSearch:
    """Hybrid search that picks the vector/keyword balance per query.

    Attributes:
        vector: Backend queried for vector results.
        keyword: Backend queried for keyword results.
        query_classifier: Optional classifier set via
            :meth:`set_query_classifier`.
    """

    def __init__(
        self,
        vector_backend: SearchBackend,
        keyword_backend: SearchBackend
    ):
        self.vector = vector_backend
        self.keyword = keyword_backend
        # NOTE(review): stored by set_query_classifier() but not consulted
        # by _determine_alpha(), which uses fixed heuristics only.
        self.query_classifier = None

    def set_query_classifier(self, classifier: Any):
        """Store a query classifier for adaptive weighting."""
        self.query_classifier = classifier

    async def search(
        self,
        query: str,
        limit: int = 10
    ) -> list[SearchResult]:
        """Search both backends and fuse with a query-dependent weight.

        Each backend is asked for ``limit * 2`` results so that the fusion
        has more candidates than it finally returns. The two searches run
        concurrently.

        Args:
            query: Query text.
            limit: Maximum number of fused results.

        Returns:
            The top ``limit`` results of a ConvexCombinationFusion.
        """
        alpha = await self._determine_alpha(query)

        # The searches are independent, so run them concurrently rather
        # than paying both latencies back to back.
        vector_results, keyword_results = await asyncio.gather(
            self.vector.search(query, limit * 2),
            self.keyword.search(query, limit * 2),
        )

        fusion = ConvexCombinationFusion(alpha)
        fused = fusion.fuse([vector_results, keyword_results])

        return fused[:limit]

    async def _determine_alpha(self, query: str) -> float:
        """Return the vector weight (alpha) for ``query``.

        Heuristics, applied in order:
            * 3 words or fewer -> 0.3; 10 words or more -> 0.7; else 0.5.
            * Ends with "?" -> +0.1, capped at 0.8.
            * Contains a double quote -> -0.2, floored at 0.2.
        """
        word_count = len(query.split())

        if word_count <= 3:
            # Short queries favor keyword search.
            alpha = 0.3
        elif word_count >= 10:
            # Long queries favor semantic search.
            alpha = 0.7
        else:
            alpha = 0.5

        # Questions favor semantic search.
        if query.endswith("?"):
            alpha = min(alpha + 0.1, 0.8)

        # Quoted terms favor keyword search.
        if '"' in query:
            alpha = max(alpha - 0.2, 0.2)

        return alpha

class MultiIndexHybridSearch:
    """Hybrid search across multiple named indices.

    Attributes:
        indices: Index name -> HybridSearch instance.
    """

    def __init__(self):
        self.indices: dict[str, HybridSearch] = {}

    def add_index(self, name: str, hybrid_search: HybridSearch):
        """Register ``hybrid_search`` under ``name``, replacing any previous entry."""
        self.indices[name] = hybrid_search

    async def search(
        self,
        query: str,
        indices: list[str] = None,
        limit: int = 10
    ) -> dict[str, list[SearchResult]]:
        """Search the selected indices concurrently.

        Args:
            query: Query text sent to every selected index.
            indices: Names of the indices to search. ``None`` or an empty
                list selects all registered indices; unknown names are
                skipped.
            limit: Maximum number of results per index.

        Returns:
            Index name -> results of that index, in selection order.
        """
        requested = indices or list(self.indices)
        # dict.fromkeys() de-duplicates while keeping order, so a name
        # listed twice does not start a second search for the same index.
        names = list(dict.fromkeys(
            name for name in requested if name in self.indices
        ))

        # gather() awaits all searches together; if one fails, the
        # exceptions of the others are still retrieved instead of being
        # left on tasks nobody awaits.
        result_lists = await asyncio.gather(*(
            self.indices[name].search(query, limit) for name in names
        ))

        return dict(zip(names, result_lists))

    async def search_unified(
        self,
        query: str,
        indices: list[str] = None,
        limit: int = 10
    ) -> list[SearchResult]:
        """Search the selected indices and merge everything into one list.

        The per-index lists are merged with ReciprocalRankFusion, which
        matches documents by ``id``.

        Args:
            query: Query text.
            indices: Same meaning as in :meth:`search`.
            limit: Maximum number of results per index and of the merged
                list.

        Returns:
            The top ``limit`` merged results.
        """
        per_index_results = await self.search(query, indices, limit)

        # NOTE(review): results from different indices that share an id are
        # merged into one entry; confirm ids are unique across indices.
        fusion = ReciprocalRankFusion()

        return fusion.fuse(list(per_index_results.values()))[:limit]

Production Hybrid Search Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict

# Application object; the route decorators below register their handlers on it.
app = FastAPI()

class SearchRequest(BaseModel):
    # Request body of POST /v1/search.
    query: str  # Query text.
    limit: int = 10  # Used as HybridSearchConfig.final_limit by the endpoint.
    vector_weight: float = 0.5  # The endpoint derives the keyword weight as 1 - vector_weight.
    fusion_strategy: str = "rrf"  # Passed to HybridSearchConfig and echoed in the response.
    indices: Optional[List[str]] = None  # NOTE(review): not read by the endpoint in this file.

class SearchResponse(BaseModel):
    # Response body of POST /v1/search.
    query: str  # Echo of the request's query.
    results: List[Dict]  # One dict per hit: id, content, score, source, metadata.
    total: int  # Number of entries in results.
    fusion_strategy: str  # Echo of the request's fusion_strategy.

# Initialize components (would be configured in production)
# hybrid_search = HybridSearch(vector_backend, keyword_backend)

@app.post("/v1/search")
async def hybrid_search_endpoint(request: SearchRequest) -> SearchResponse:
    """Execute hybrid search.

    The request values come from untrusted clients, so they are checked
    before being turned into a search configuration: a non-positive limit
    would slice the fused list from the wrong end, and a vector weight
    outside [0, 1] would produce a negative weight for one of the backends.

    Raises:
        HTTPException: 422 if ``limit`` is below 1 or ``vector_weight`` is
            outside [0, 1].
    """
    if request.limit < 1:
        raise HTTPException(status_code=422, detail="limit must be at least 1")
    # Written as "not (0 <= w <= 1)" so that NaN is rejected as well.
    if not 0.0 <= request.vector_weight <= 1.0:
        raise HTTPException(
            status_code=422,
            detail="vector_weight must be between 0 and 1",
        )

    config = HybridSearchConfig(
        vector_weight=request.vector_weight,
        keyword_weight=1 - request.vector_weight,
        fusion_strategy=request.fusion_strategy,
        final_limit=request.limit
    )

    # Would use actual backends in production
    results = []  # await hybrid_search.search(request.query, request.limit)

    return SearchResponse(
        query=request.query,
        results=[
            {
                "id": r.id,
                "content": r.content,
                "score": r.score,
                "source": r.source,
                "metadata": r.metadata
            }
            for r in results
        ],
        total=len(results),
        fusion_strategy=request.fusion_strategy
    )

@app.post("/v1/search/vector")
async def vector_search_endpoint(
    query: str,
    limit: int = 10
) -> list[dict]:
    """Vector-only search (placeholder: no backend is wired in, so the list is empty)."""

    # hits = await vector_backend.search(query, limit)
    hits = []

    payload = []
    for hit in hits:
        payload.append({
            "id": hit.id,
            "content": hit.content,
            "score": hit.score,
        })
    return payload

@app.post("/v1/search/keyword")
async def keyword_search_endpoint(
    query: str,
    limit: int = 10
) -> list[dict]:
    """Keyword-only search (placeholder: no backend is wired in, so the list is empty)."""

    # hits = await keyword_backend.search(query, limit)
    hits = []

    payload = []
    for hit in hits:
        payload.append({
            "id": hit.id,
            "content": hit.content,
            "score": hit.score,
        })
    return payload

@app.get("/v1/search/explain")
async def explain_search(
    query: str,
    limit: int = 5
) -> dict:
    """Explain search results (placeholder: returns fixed, empty data)."""

    # vector_results = await vector_backend.search(query, limit)
    # keyword_results = await keyword_backend.search(query, limit)

    # Fixed values until the backends above are wired in.
    explanation = {
        "vector_contribution": 0.5,
        "keyword_contribution": 0.5,
        "fusion_method": "rrf",
    }

    report = {"query": query}
    report["vector_results"] = []
    report["keyword_results"] = []
    report["fused_results"] = []
    report["explanation"] = explanation
    return report

@app.get("/health")
async def health():
    """Liveness probe: always reports a healthy status."""
    status_payload = {"status": "healthy"}
    return status_payload

References

Conclusion

Hybrid search delivers the best retrieval quality by combining semantic understanding with lexical precision. Reciprocal Rank Fusion (RRF) is the most robust fusion method: it uses only rank positions, so it needs no score normalization and works well across different score distributions, and its single constant k (60 in the implementation above) rarely needs tuning. Weighted fusion gives you more control but requires tuning weights for your specific use case. For production systems, run vector and keyword searches in parallel to minimize latency, then fuse results. Consider adaptive weighting based on query characteristics: short keyword-like queries benefit from higher keyword weight, while longer natural language questions benefit from higher vector weight. Monitor both precision and recall separately for each search type to understand where improvements are needed. The optimal balance depends on your data and use case—start with equal weights and adjust based on user feedback and relevance metrics. Many vector databases now support hybrid search natively (Weaviate, Pinecone, Qdrant), which simplifies implementation and improves performance by avoiding separate round trips.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.