Introduction
Semantic caching revolutionizes how we handle LLM requests by recognizing that similar questions deserve similar answers. Unlike traditional exact-match caching, semantic caching uses embeddings to find queries that are semantically equivalent, returning cached responses even when the wording differs. This can reduce LLM API costs by 30-70% while dramatically improving response latency for common queries. This guide covers the techniques that make semantic caching effective: embedding-based similarity search, threshold tuning for cache hit decisions, cache invalidation strategies, hybrid caching approaches, and production deployment patterns. Whether you’re building a customer support bot or a search assistant, these patterns will help you serve faster responses at lower cost.
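The core mechanism can be stated in a few lines: embed the incoming query, compare it against the embeddings of previously answered queries, and reuse a stored response only when cosine similarity clears a threshold. Below is a minimal, self-contained sketch using toy vectors in place of a real sentence-embedding model; the full cache implementation follows in the next section.

import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two embedding vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy 3-dimensional vectors standing in for real sentence embeddings
cached_query_vec = np.array([0.71, 0.69, 0.12])  # "How do I reset my password?"
new_query_vec = np.array([0.70, 0.70, 0.14])     # "What's the way to reset my password?"

# Serve the cached answer only when the queries are near-identical in meaning
if cosine_similarity(cached_query_vec, new_query_vec) >= 0.95:
    print("cache hit: reuse the stored response")
else:
    print("cache miss: call the LLM, then cache the new pair")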
Semantic Cache: Query Embedding, Similarity Search, Cache Decision
Basic Semantic Cache
from dataclasses import dataclass, field
from typing import Any, Optional, Tuple
from datetime import datetime, timedelta
import numpy as np
import hashlib
@dataclass
class CacheEntry:
"""A cached query-response pair."""
query: str
response: str
embedding: np.ndarray
created_at: datetime = field(default_factory=datetime.now)
access_count: int = 0
last_accessed: datetime = field(default_factory=datetime.now)
metadata: dict = field(default_factory=dict)
class SemanticCache:
"""Basic semantic cache using embeddings."""
def __init__(
self,
embedding_model: Any,
similarity_threshold: float = 0.95,
max_entries: int = 10000
):
self.embedder = embedding_model
self.threshold = similarity_threshold
self.max_entries = max_entries
self.entries: list[CacheEntry] = []
        self.embedding_matrix: Optional[np.ndarray] = None
def get(self, query: str) -> Optional[str]:
"""Get cached response for query."""
if not self.entries:
return None
# Embed query
query_embedding = self.embedder.embed(query).vector
# Find most similar
similarities = self._compute_similarities(query_embedding)
best_idx = np.argmax(similarities)
best_similarity = similarities[best_idx]
if best_similarity >= self.threshold:
entry = self.entries[best_idx]
entry.access_count += 1
entry.last_accessed = datetime.now()
return entry.response
return None
def set(self, query: str, response: str, metadata: dict = None):
"""Cache a query-response pair."""
# Embed query
embedding = self.embedder.embed(query).vector
# Check for duplicate
if self.entries:
similarities = self._compute_similarities(embedding)
if np.max(similarities) >= self.threshold:
# Update existing entry
idx = np.argmax(similarities)
self.entries[idx].response = response
self.entries[idx].last_accessed = datetime.now()
return
# Create new entry
entry = CacheEntry(
query=query,
response=response,
embedding=embedding,
metadata=metadata or {}
)
self.entries.append(entry)
self._update_embedding_matrix()
# Evict if over capacity
if len(self.entries) > self.max_entries:
self._evict()
def _compute_similarities(self, query_embedding: np.ndarray) -> np.ndarray:
"""Compute cosine similarities with all entries."""
if self.embedding_matrix is None:
return np.array([])
# Normalize
query_norm = query_embedding / np.linalg.norm(query_embedding)
# Cosine similarity
similarities = np.dot(self.embedding_matrix, query_norm)
return similarities
def _update_embedding_matrix(self):
"""Update the embedding matrix for fast similarity search."""
embeddings = [e.embedding for e in self.entries]
self.embedding_matrix = np.array(embeddings)
# Normalize rows
norms = np.linalg.norm(self.embedding_matrix, axis=1, keepdims=True)
self.embedding_matrix = self.embedding_matrix / norms
def _evict(self):
"""Evict least valuable entries."""
# Score by recency and access count
now = datetime.now()
scores = []
for entry in self.entries:
age_hours = (now - entry.last_accessed).total_seconds() / 3600
score = entry.access_count / (1 + age_hours)
scores.append(score)
        # Remove the lowest-scoring entries, plus a 100-entry buffer so the
        # cache doesn't hit capacity again on the very next insert
        num_to_remove = len(self.entries) - self.max_entries + 100
        indices_to_remove = set(np.argsort(scores)[:num_to_remove])
self.entries = [
e for i, e in enumerate(self.entries)
if i not in indices_to_remove
]
self._update_embedding_matrix()
def clear(self):
"""Clear the cache."""
self.entries = []
self.embedding_matrix = None
def stats(self) -> dict:
"""Get cache statistics."""
if not self.entries:
return {"size": 0}
return {
"size": len(self.entries),
"total_accesses": sum(e.access_count for e in self.entries),
"avg_accesses": np.mean([e.access_count for e in self.entries]),
"oldest_entry": min(e.created_at for e in self.entries).isoformat(),
"newest_entry": max(e.created_at for e in self.entries).isoformat()
}
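A typical integration is a read-through wrapper: check the cache, fall back to the LLM on a miss, and store the new pair for future paraphrases. The sketch below is illustrative only; call_llm is a hypothetical stand-in for your LLM client, and the embedder passed to SemanticCache is assumed to expose the .embed(text).vector interface used above.

def answer(query: str, cache: SemanticCache, call_llm) -> str:
    """Serve from cache when possible; otherwise call the model and cache the result."""
    cached = cache.get(query)
    if cached is not None:
        return cached               # hit: no LLM call, near-instant response
    response = call_llm(query)      # miss: pay for exactly one LLM call
    cache.set(query, response)      # future paraphrases can now be served from cache
    return response

# cache = SemanticCache(embedder, similarity_threshold=0.95)
# print(answer("How do I reset my password?", cache, call_llm))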
Advanced Similarity Strategies
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional, Tuple
import numpy as np
@dataclass
class SimilarityResult:
"""Result of similarity search."""
entry: CacheEntry
similarity: float
rank: int
class AdaptiveThresholdCache(SemanticCache):
"""Cache with adaptive similarity threshold."""
def __init__(
self,
embedding_model: Any,
base_threshold: float = 0.90,
min_threshold: float = 0.85,
max_threshold: float = 0.98
):
super().__init__(embedding_model, base_threshold)
self.base_threshold = base_threshold
self.min_threshold = min_threshold
self.max_threshold = max_threshold
# Track hit/miss patterns
self.hit_similarities: list[float] = []
self.miss_similarities: list[float] = []
def get(self, query: str) -> Optional[str]:
"""Get with adaptive threshold."""
if not self.entries:
return None
query_embedding = self.embedder.embed(query).vector
similarities = self._compute_similarities(query_embedding)
best_idx = np.argmax(similarities)
best_similarity = similarities[best_idx]
# Adaptive threshold based on query characteristics
threshold = self._compute_threshold(query, best_similarity)
if best_similarity >= threshold:
self.hit_similarities.append(best_similarity)
            entry = self.entries[best_idx]
            entry.access_count += 1
            entry.last_accessed = datetime.now()
return entry.response
self.miss_similarities.append(best_similarity)
return None
def _compute_threshold(self, query: str, best_similarity: float) -> float:
"""Compute adaptive threshold."""
threshold = self.base_threshold
# Adjust based on query length
query_words = len(query.split())
if query_words < 5:
# Short queries need higher threshold
threshold += 0.02
elif query_words > 20:
# Long queries can use lower threshold
threshold -= 0.02
# Adjust based on historical patterns
if len(self.hit_similarities) > 100:
avg_hit = np.mean(self.hit_similarities[-100:])
if avg_hit > 0.97:
# Hits are very confident, can lower threshold
threshold -= 0.01
return np.clip(threshold, self.min_threshold, self.max_threshold)
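The hit and miss similarities that AdaptiveThresholdCache logs can also be analyzed offline. One simple heuristic, sketched below under the assumption that you have hand-labeled a sample of borderline lookups as correct or incorrect, is to place the threshold midway between the weakest accepted match and the strongest rejected one.

import numpy as np

def suggest_threshold(hit_sims: list[float], miss_sims: list[float]) -> float:
    """Place the threshold between weak-but-correct hits and strong-but-wrong misses."""
    hit_floor = float(np.percentile(hit_sims, 5))       # weakest similarity among verified hits
    miss_ceiling = float(np.percentile(miss_sims, 95))  # strongest similarity among verified misses
    return round((hit_floor + miss_ceiling) / 2, 3)

# Illustrative numbers only
print(suggest_threshold([0.97, 0.96, 0.98, 0.95], [0.88, 0.91, 0.85, 0.90]))  # ~0.93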
class MultiLevelCache:
"""Multi-level cache with different similarity thresholds."""
def __init__(
self,
embedding_model: Any,
levels: list[dict] = None
):
self.embedder = embedding_model
# Default levels
self.levels = levels or [
{"name": "exact", "threshold": 0.99, "ttl_hours": 168},
{"name": "high", "threshold": 0.95, "ttl_hours": 24},
{"name": "medium", "threshold": 0.90, "ttl_hours": 1}
]
self.caches = {
level["name"]: SemanticCache(embedding_model, level["threshold"])
for level in self.levels
}
    def get(self, query: str) -> Tuple[Optional[str], Optional[str]]:
"""Get from appropriate cache level."""
query_embedding = self.embedder.embed(query).vector
# Check each level
for level in self.levels:
cache = self.caches[level["name"]]
if cache.entries:
similarities = cache._compute_similarities(query_embedding)
best_idx = np.argmax(similarities)
best_similarity = similarities[best_idx]
if best_similarity >= level["threshold"]:
entry = cache.entries[best_idx]
# Check TTL
age_hours = (datetime.now() - entry.created_at).total_seconds() / 3600
if age_hours <= level["ttl_hours"]:
return entry.response, level["name"]
return None, None
def set(self, query: str, response: str, level: str = "high"):
"""Set in specific cache level."""
if level in self.caches:
self.caches[level].set(query, response)
class ClusteredCache:
"""Cache with query clustering for faster lookup."""
def __init__(
self,
embedding_model: Any,
num_clusters: int = 100,
similarity_threshold: float = 0.95
):
self.embedder = embedding_model
self.num_clusters = num_clusters
self.threshold = similarity_threshold
self.clusters: dict[int, list[CacheEntry]] = {}
        self.cluster_centroids: Optional[np.ndarray] = None
def _assign_cluster(self, embedding: np.ndarray) -> int:
"""Assign embedding to nearest cluster."""
        if self.cluster_centroids is None:
            # No centroids yet: everything shares one cluster until rebuild_clusters() runs
            return 0
similarities = np.dot(self.cluster_centroids, embedding)
return int(np.argmax(similarities))
def get(self, query: str) -> Optional[str]:
"""Get with cluster-based lookup."""
query_embedding = self.embedder.embed(query).vector
query_embedding = query_embedding / np.linalg.norm(query_embedding)
# Find cluster
cluster_id = self._assign_cluster(query_embedding)
if cluster_id not in self.clusters:
return None
# Search within cluster
cluster_entries = self.clusters[cluster_id]
best_entry = None
best_similarity = 0
for entry in cluster_entries:
entry_norm = entry.embedding / np.linalg.norm(entry.embedding)
similarity = np.dot(query_embedding, entry_norm)
if similarity > best_similarity:
best_similarity = similarity
best_entry = entry
if best_similarity >= self.threshold:
return best_entry.response
return None
def set(self, query: str, response: str):
"""Set with cluster assignment."""
embedding = self.embedder.embed(query).vector
embedding = embedding / np.linalg.norm(embedding)
cluster_id = self._assign_cluster(embedding)
entry = CacheEntry(
query=query,
response=response,
embedding=embedding
)
if cluster_id not in self.clusters:
self.clusters[cluster_id] = []
self.clusters[cluster_id].append(entry)
def rebuild_clusters(self):
"""Rebuild cluster centroids."""
from sklearn.cluster import KMeans
# Collect all embeddings
all_embeddings = []
for entries in self.clusters.values():
for entry in entries:
all_embeddings.append(entry.embedding)
if len(all_embeddings) < self.num_clusters:
return
# Cluster
embeddings_matrix = np.array(all_embeddings)
kmeans = KMeans(n_clusters=self.num_clusters, random_state=42)
kmeans.fit(embeddings_matrix)
self.cluster_centroids = kmeans.cluster_centers_
self.cluster_centroids = self.cluster_centroids / np.linalg.norm(
self.cluster_centroids, axis=1, keepdims=True
)
# Reassign entries to new clusters
new_clusters = {}
for entries in self.clusters.values():
for entry in entries:
new_cluster = self._assign_cluster(entry.embedding)
if new_cluster not in new_clusters:
new_clusters[new_cluster] = []
new_clusters[new_cluster].append(entry)
self.clusters = new_clusters
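ClusteredCache only stays fast if rebuild_clusters is actually called as the cache grows. One possible policy, shown as a hypothetical subclass below, is to rebuild the centroids after every N inserts so cluster assignments do not drift too far from the data.

class AutoRebuildingClusteredCache(ClusteredCache):
    """Hypothetical wrapper that refreshes centroids every `rebuild_every` inserts."""
    def __init__(self, embedding_model, rebuild_every: int = 500, **kwargs):
        super().__init__(embedding_model, **kwargs)
        self.rebuild_every = rebuild_every
        self._inserts_since_rebuild = 0

    def set(self, query: str, response: str):
        super().set(query, response)
        self._inserts_since_rebuild += 1
        if self._inserts_since_rebuild >= self.rebuild_every:
            self.rebuild_clusters()
            self._inserts_since_rebuild = 0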
Cache Invalidation
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from datetime import datetime, timedelta
from enum import Enum
class InvalidationStrategy(Enum):
"""Cache invalidation strategies."""
TTL = "ttl"
LRU = "lru"
LFU = "lfu"
ADAPTIVE = "adaptive"
@dataclass
class InvalidationConfig:
"""Configuration for cache invalidation."""
strategy: InvalidationStrategy = InvalidationStrategy.TTL
ttl_seconds: int = 3600
max_entries: int = 10000
min_accesses: int = 2
class TTLInvalidator:
"""Time-based cache invalidation."""
def __init__(self, ttl_seconds: int = 3600):
self.ttl = timedelta(seconds=ttl_seconds)
def is_valid(self, entry: CacheEntry) -> bool:
"""Check if entry is still valid."""
age = datetime.now() - entry.created_at
return age < self.ttl
def invalidate(self, entries: list[CacheEntry]) -> list[CacheEntry]:
"""Remove expired entries."""
return [e for e in entries if self.is_valid(e)]
class LRUInvalidator:
"""Least Recently Used invalidation."""
def __init__(self, max_entries: int = 10000):
self.max_entries = max_entries
def invalidate(self, entries: list[CacheEntry]) -> list[CacheEntry]:
"""Remove least recently used entries."""
if len(entries) <= self.max_entries:
return entries
# Sort by last accessed
sorted_entries = sorted(
entries,
key=lambda e: e.last_accessed,
reverse=True
)
return sorted_entries[:self.max_entries]
class LFUInvalidator:
"""Least Frequently Used invalidation."""
def __init__(self, max_entries: int = 10000, decay_factor: float = 0.9):
self.max_entries = max_entries
self.decay_factor = decay_factor
def invalidate(self, entries: list[CacheEntry]) -> list[CacheEntry]:
"""Remove least frequently used entries."""
if len(entries) <= self.max_entries:
return entries
# Calculate frequency score with time decay
now = datetime.now()
scores = []
for entry in entries:
age_hours = (now - entry.created_at).total_seconds() / 3600
decay = self.decay_factor ** age_hours
score = entry.access_count * decay
scores.append((entry, score))
# Sort by score
scores.sort(key=lambda x: x[1], reverse=True)
return [e for e, _ in scores[:self.max_entries]]
class AdaptiveInvalidator:
"""Adaptive invalidation based on query patterns."""
def __init__(
self,
max_entries: int = 10000,
min_hit_rate: float = 0.1
):
self.max_entries = max_entries
self.min_hit_rate = min_hit_rate
def invalidate(self, entries: list[CacheEntry]) -> list[CacheEntry]:
"""Adaptively invalidate based on performance."""
if len(entries) <= self.max_entries:
return entries
# Calculate value score
now = datetime.now()
scored_entries = []
for entry in entries:
# Factors: access count, recency, response length
age_hours = max(1, (now - entry.created_at).total_seconds() / 3600)
recency_hours = max(1, (now - entry.last_accessed).total_seconds() / 3600)
# Hit rate approximation
hit_rate = entry.access_count / age_hours
# Value score
score = (
hit_rate * 10 +
entry.access_count * 0.5 +
1 / recency_hours
)
scored_entries.append((entry, score))
# Keep high-value entries
scored_entries.sort(key=lambda x: x[1], reverse=True)
return [e for e, _ in scored_entries[:self.max_entries]]
class ContentAwareInvalidator:
"""Invalidate based on content changes."""
def __init__(self, change_detector: Callable = None):
self.change_detector = change_detector
self.content_versions: dict[str, str] = {}
def register_content(self, content_id: str, version: str):
"""Register content version."""
self.content_versions[content_id] = version
def invalidate_for_content(
self,
entries: list[CacheEntry],
content_id: str,
new_version: str
) -> list[CacheEntry]:
"""Invalidate entries related to changed content."""
old_version = self.content_versions.get(content_id)
if old_version == new_version:
return entries
# Update version
self.content_versions[content_id] = new_version
# Remove entries related to this content
return [
e for e in entries
if e.metadata.get("content_id") != content_id
]
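None of the invalidators above run on their own; they need to be applied periodically. The sketch below shows one possible maintenance pass over the basic SemanticCache from earlier: TTL expiry first, then frequency-based pruning to the size budget, followed by a rebuild of the similarity matrix. The composition order is an assumption, not a requirement.

def maintain(cache: SemanticCache, ttl_seconds: int = 3600, max_entries: int = 10000) -> None:
    """Drop expired entries, prune to the size budget, then refresh the matrix."""
    ttl = TTLInvalidator(ttl_seconds=ttl_seconds)
    lfu = LFUInvalidator(max_entries=max_entries)
    cache.entries = lfu.invalidate(ttl.invalidate(cache.entries))
    # Rebuild the normalized embedding matrix so lookups only see surviving entries
    if cache.entries:
        cache._update_embedding_matrix()
    else:
        cache.embedding_matrix = None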
Hybrid Caching
from datetime import datetime
from typing import Any, Optional, Tuple
import hashlib
import numpy as np
class HybridCache:
"""Combine exact and semantic caching."""
def __init__(
self,
embedding_model: Any,
semantic_threshold: float = 0.95
):
self.embedder = embedding_model
self.semantic_threshold = semantic_threshold
# Exact match cache (hash-based)
self.exact_cache: dict[str, CacheEntry] = {}
# Semantic cache
self.semantic_cache = SemanticCache(
embedding_model,
semantic_threshold
)
def _hash_query(self, query: str) -> str:
"""Create hash for exact matching."""
# Normalize query
normalized = query.lower().strip()
return hashlib.md5(normalized.encode()).hexdigest()
    def get(self, query: str) -> Tuple[Optional[str], Optional[str]]:
"""Get from cache (exact first, then semantic)."""
# Try exact match first
query_hash = self._hash_query(query)
if query_hash in self.exact_cache:
entry = self.exact_cache[query_hash]
entry.access_count += 1
entry.last_accessed = datetime.now()
return entry.response, "exact"
# Try semantic match
response = self.semantic_cache.get(query)
if response:
return response, "semantic"
return None, None
def set(self, query: str, response: str, metadata: dict = None):
"""Set in both caches."""
# Exact cache
query_hash = self._hash_query(query)
embedding = self.embedder.embed(query).vector
entry = CacheEntry(
query=query,
response=response,
embedding=embedding,
metadata=metadata or {}
)
self.exact_cache[query_hash] = entry
# Semantic cache
self.semantic_cache.set(query, response, metadata)
def stats(self) -> dict:
"""Get combined statistics."""
return {
"exact_cache_size": len(self.exact_cache),
"semantic_cache": self.semantic_cache.stats()
}
class TieredCache:
"""Multi-tier caching with different backends."""
def __init__(
self,
embedding_model: Any,
redis_client: Any = None,
vector_store: Any = None
):
self.embedder = embedding_model
self.redis = redis_client
self.vector_store = vector_store
# L1: In-memory (fast, small)
self.l1_cache = SemanticCache(embedding_model, max_entries=1000)
# L2: Redis (medium speed, medium size)
# L3: Vector store (slower, large)
    async def get(self, query: str) -> Tuple[Optional[str], Optional[str]]:
"""Get from tiered cache."""
# L1: In-memory
response = self.l1_cache.get(query)
if response:
return response, "l1"
# L2: Redis
if self.redis:
response = await self._get_from_redis(query)
if response:
# Promote to L1
self.l1_cache.set(query, response)
return response, "l2"
# L3: Vector store
if self.vector_store:
response = await self._get_from_vector_store(query)
if response:
# Promote to L1 and L2
self.l1_cache.set(query, response)
if self.redis:
await self._set_in_redis(query, response)
return response, "l3"
return None, None
async def set(self, query: str, response: str):
"""Set in all tiers."""
# L1
self.l1_cache.set(query, response)
# L2
if self.redis:
await self._set_in_redis(query, response)
# L3
if self.vector_store:
await self._set_in_vector_store(query, response)
async def _get_from_redis(self, query: str) -> Optional[str]:
"""Get from Redis cache."""
query_hash = hashlib.md5(query.encode()).hexdigest()
return await self.redis.get(f"cache:{query_hash}")
async def _set_in_redis(self, query: str, response: str):
"""Set in Redis cache."""
query_hash = hashlib.md5(query.encode()).hexdigest()
await self.redis.set(f"cache:{query_hash}", response, ex=3600)
async def _get_from_vector_store(self, query: str) -> Optional[str]:
"""Get from vector store."""
embedding = self.embedder.embed(query).vector
results = await self.vector_store.search(embedding, k=1)
if results and results[0].score >= 0.95:
return results[0].metadata.get("response")
return None
async def _set_in_vector_store(self, query: str, response: str):
"""Set in vector store."""
embedding = self.embedder.embed(query).vector
await self.vector_store.add(
embedding=embedding,
metadata={"query": query, "response": response}
)
class ContextAwareCache:
"""Cache that considers conversation context."""
def __init__(
self,
embedding_model: Any,
context_weight: float = 0.3
):
self.embedder = embedding_model
self.context_weight = context_weight
self.cache = SemanticCache(embedding_model)
def get(
self,
query: str,
context: list[str] = None
) -> Optional[str]:
"""Get considering context."""
# Embed query
query_embedding = self.embedder.embed(query).vector
# Embed context if provided
if context:
context_text = " ".join(context[-3:]) # Last 3 messages
context_embedding = self.embedder.embed(context_text).vector
# Combine embeddings
combined = (
(1 - self.context_weight) * query_embedding +
self.context_weight * context_embedding
)
combined = combined / np.linalg.norm(combined)
        else:
            # Normalize so the dot product below is a true cosine similarity
            combined = query_embedding / np.linalg.norm(query_embedding)
# Search cache
if not self.cache.entries:
return None
similarities = np.dot(
self.cache.embedding_matrix,
combined
)
best_idx = np.argmax(similarities)
if similarities[best_idx] >= self.cache.threshold:
return self.cache.entries[best_idx].response
return None
def set(
self,
query: str,
response: str,
context: list[str] = None
):
"""Set with context."""
metadata = {}
if context:
metadata["context_hash"] = hashlib.md5(
"".join(context).encode()
).hexdigest()
self.cache.set(query, response, metadata)
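Calling the hybrid cache looks the same as the basic cache, except the second return value reports which path answered. A hedged usage sketch, again assuming an embedder object that exposes .embed(text).vector:

cache = HybridCache(embedder, semantic_threshold=0.95)
cache.set("How do I cancel my subscription?", "Go to Settings > Billing > Cancel.")

# Identical wording (after lowercasing and stripping) hits the hash-based path
print(cache.get("how do i cancel my subscription?"))    # -> (response, "exact")

# A paraphrase falls through to the embedding-based path; whether it hits
# depends on the embedding model and the 0.95 threshold
print(cache.get("What's the way to cancel my plan?"))    # -> (response, "semantic") or (None, None)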
Production Cache Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any
import time
app = FastAPI()
class CacheRequest(BaseModel):
query: str
context: Optional[list[str]] = None
class CacheResponse(BaseModel):
response: Optional[str]
cache_hit: bool
cache_level: Optional[str]
latency_ms: float
class SetCacheRequest(BaseModel):
query: str
response: str
metadata: Optional[dict] = None
class CacheMetrics:
"""Track cache performance."""
def __init__(self):
self.hits = 0
self.misses = 0
self.hit_latencies = []
self.miss_latencies = []
def record_hit(self, latency: float):
self.hits += 1
self.hit_latencies.append(latency)
def record_miss(self, latency: float):
self.misses += 1
self.miss_latencies.append(latency)
def stats(self) -> dict:
total = self.hits + self.misses
return {
"total_requests": total,
"hits": self.hits,
"misses": self.misses,
"hit_rate": self.hits / total if total > 0 else 0,
"avg_hit_latency_ms": sum(self.hit_latencies) / len(self.hit_latencies) * 1000 if self.hit_latencies else 0,
"avg_miss_latency_ms": sum(self.miss_latencies) / len(self.miss_latencies) * 1000 if self.miss_latencies else 0
}
metrics = CacheMetrics()
# Mock exact-match cache for the demo; in production, swap in SemanticCache, HybridCache, or TieredCache
class MockCache:
def __init__(self):
self.data = {}
def get(self, query):
return self.data.get(query)
def set(self, query, response, metadata=None):
self.data[query] = response
cache = MockCache()
@app.post("/v1/cache/get")
async def get_cached(request: CacheRequest) -> CacheResponse:
"""Get from cache."""
start = time.time()
response = cache.get(request.query)
latency = time.time() - start
if response:
metrics.record_hit(latency)
return CacheResponse(
response=response,
cache_hit=True,
cache_level="semantic",
latency_ms=latency * 1000
)
metrics.record_miss(latency)
return CacheResponse(
response=None,
cache_hit=False,
cache_level=None,
latency_ms=latency * 1000
)
@app.post("/v1/cache/set")
async def set_cached(request: SetCacheRequest) -> dict:
"""Set in cache."""
cache.set(request.query, request.response, request.metadata)
return {"status": "cached"}
@app.get("/v1/cache/stats")
async def get_stats() -> dict:
"""Get cache statistics."""
return metrics.stats()
@app.delete("/v1/cache")
async def clear_cache() -> dict:
"""Clear cache."""
cache.data = {}
return {"status": "cleared"}
@app.get("/health")
async def health():
return {"status": "healthy"}
Conclusion
Semantic caching transforms LLM economics by recognizing that users often ask similar questions in different ways. The similarity threshold is the critical parameter—too high and you miss valid cache hits, too low and you return irrelevant responses. Start with 0.95 and adjust based on your domain; specialized domains can often use lower thresholds because the query space is more constrained. Hybrid caching combining exact and semantic matching gives you the best of both worlds—instant responses for repeated queries and intelligent matching for paraphrases. Multi-tier caching with in-memory, Redis, and vector store layers balances speed and capacity. Cache invalidation is as important as caching itself; time-based TTL works for most cases, but content-aware invalidation is essential when underlying data changes. Monitor your hit rate and latency distributions—a well-tuned cache should achieve 30-50% hit rates for typical applications. The key insight is that semantic caching is not just about saving money—it's about providing faster, more consistent responses to your users while reducing load on your LLM infrastructure.