
Semantic Caching Strategies: Reducing LLM Costs Through Intelligent Query Matching

Introduction: Semantic caching revolutionizes how we handle LLM requests by recognizing that similar questions deserve similar answers. Unlike traditional exact-match caching, semantic caching uses embeddings to find queries that are semantically equivalent, returning cached responses even when the wording differs. This can reduce LLM API costs by 30-70% while dramatically improving response latency for common queries. This guide covers the techniques that make semantic caching effective: embedding-based similarity search, threshold tuning for cache hit decisions, cache invalidation strategies, hybrid caching approaches, and production deployment patterns. Whether you’re building a customer support bot or a search assistant, these patterns will help you serve faster responses at lower cost.
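
To make the moving parts concrete, here is a minimal caller-side sketch of the pattern: check the cache first, and only call the LLM on a miss. It assumes the SemanticCache class built in the next section, plus hypothetical embedder and llm_client objects standing in for whatever embedding model and LLM SDK you actually use.

def cached_completion(query: str, cache: "SemanticCache", llm_client) -> str:
    """Serve from the semantic cache when possible; fall back to the LLM."""
    
    # 1. Cache lookup: returns a response if a semantically similar query was stored.
    cached = cache.get(query)
    if cached is not None:
        return cached
    
    # 2. Cache miss: pay for the LLM call (replace with your provider's SDK call).
    response = llm_client.complete(query)
    
    # 3. Store the pair so future paraphrases of this query become cache hits.
    cache.set(query, response)
    return response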

[Figure: semantic cache flow: query embedding → similarity search → cache decision]

Basic Semantic Cache

from dataclasses import dataclass, field
from typing import Any, Optional, Tuple
from datetime import datetime, timedelta
import numpy as np
import hashlib

@dataclass
class CacheEntry:
    """A cached query-response pair."""
    
    query: str
    response: str
    embedding: np.ndarray
    created_at: datetime = field(default_factory=datetime.now)
    access_count: int = 0
    last_accessed: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)

class SemanticCache:
    """Basic semantic cache using embeddings."""
    
    def __init__(
        self,
        embedding_model: Any,
        similarity_threshold: float = 0.95,
        max_entries: int = 10000
    ):
        self.embedder = embedding_model
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        
        self.entries: list[CacheEntry] = []
        self.embedding_matrix: Optional[np.ndarray] = None
    
    def get(self, query: str) -> Optional[str]:
        """Get cached response for query."""
        
        if not self.entries:
            return None
        
        # Embed query
        query_embedding = self.embedder.embed(query).vector
        
        # Find most similar
        similarities = self._compute_similarities(query_embedding)
        best_idx = np.argmax(similarities)
        best_similarity = similarities[best_idx]
        
        if best_similarity >= self.threshold:
            entry = self.entries[best_idx]
            entry.access_count += 1
            entry.last_accessed = datetime.now()
            return entry.response
        
        return None
    
    def set(self, query: str, response: str, metadata: dict = None):
        """Cache a query-response pair."""
        
        # Embed query
        embedding = self.embedder.embed(query).vector
        
        # Check for duplicate
        if self.entries:
            similarities = self._compute_similarities(embedding)
            if np.max(similarities) >= self.threshold:
                # Update existing entry
                idx = np.argmax(similarities)
                self.entries[idx].response = response
                self.entries[idx].last_accessed = datetime.now()
                return
        
        # Create new entry
        entry = CacheEntry(
            query=query,
            response=response,
            embedding=embedding,
            metadata=metadata or {}
        )
        
        self.entries.append(entry)
        self._update_embedding_matrix()
        
        # Evict if over capacity
        if len(self.entries) > self.max_entries:
            self._evict()
    
    def _compute_similarities(self, query_embedding: np.ndarray) -> np.ndarray:
        """Compute cosine similarities with all entries."""
        
        if self.embedding_matrix is None:
            return np.array([])
        
        # Normalize
        query_norm = query_embedding / np.linalg.norm(query_embedding)
        
        # Cosine similarity
        similarities = np.dot(self.embedding_matrix, query_norm)
        
        return similarities
    
    def _update_embedding_matrix(self):
        """Update the embedding matrix for fast similarity search."""
        
        embeddings = [e.embedding for e in self.entries]
        self.embedding_matrix = np.array(embeddings)
        
        # Normalize rows
        norms = np.linalg.norm(self.embedding_matrix, axis=1, keepdims=True)
        self.embedding_matrix = self.embedding_matrix / norms
    
    def _evict(self):
        """Evict least valuable entries."""
        
        # Score by recency and access count
        now = datetime.now()
        scores = []
        
        for entry in self.entries:
            age_hours = (now - entry.last_accessed).total_seconds() / 3600
            score = entry.access_count / (1 + age_hours)
            scores.append(score)
        
        # Remove lowest scoring entries
        num_to_remove = len(self.entries) - self.max_entries + 100
        indices_to_remove = np.argsort(scores)[:num_to_remove]
        
        self.entries = [
            e for i, e in enumerate(self.entries)
            if i not in indices_to_remove
        ]
        
        self._update_embedding_matrix()
    
    def clear(self):
        """Clear the cache."""
        
        self.entries = []
        self.embedding_matrix = None
    
    def stats(self) -> dict:
        """Get cache statistics."""
        
        if not self.entries:
            return {"size": 0}
        
        return {
            "size": len(self.entries),
            "total_accesses": sum(e.access_count for e in self.entries),
            "avg_accesses": np.mean([e.access_count for e in self.entries]),
            "oldest_entry": min(e.created_at for e in self.entries).isoformat(),
            "newest_entry": max(e.created_at for e in self.entries).isoformat()
        }

Advanced Similarity Strategies

from dataclasses import dataclass
from typing import Any, Optional, Tuple
from datetime import datetime
import numpy as np

@dataclass
class SimilarityResult:
    """Result of similarity search."""
    
    entry: CacheEntry
    similarity: float
    rank: int

class AdaptiveThresholdCache(SemanticCache):
    """Cache with adaptive similarity threshold."""
    
    def __init__(
        self,
        embedding_model: Any,
        base_threshold: float = 0.90,
        min_threshold: float = 0.85,
        max_threshold: float = 0.98
    ):
        super().__init__(embedding_model, base_threshold)
        self.base_threshold = base_threshold
        self.min_threshold = min_threshold
        self.max_threshold = max_threshold
        
        # Track hit/miss patterns
        self.hit_similarities: list[float] = []
        self.miss_similarities: list[float] = []
    
    def get(self, query: str) -> Optional[str]:
        """Get with adaptive threshold."""
        
        if not self.entries:
            return None
        
        query_embedding = self.embedder.embed(query).vector
        similarities = self._compute_similarities(query_embedding)
        
        best_idx = np.argmax(similarities)
        best_similarity = similarities[best_idx]
        
        # Adaptive threshold based on query characteristics
        threshold = self._compute_threshold(query, best_similarity)
        
        if best_similarity >= threshold:
            self.hit_similarities.append(best_similarity)
            entry = self.entries[best_idx]
            entry.access_count += 1
            return entry.response
        
        self.miss_similarities.append(best_similarity)
        return None
    
    def _compute_threshold(self, query: str, best_similarity: float) -> float:
        """Compute adaptive threshold."""
        
        threshold = self.base_threshold
        
        # Adjust based on query length
        query_words = len(query.split())
        if query_words < 5:
            # Short queries need higher threshold
            threshold += 0.02
        elif query_words > 20:
            # Long queries can use lower threshold
            threshold -= 0.02
        
        # Adjust based on historical patterns
        if len(self.hit_similarities) > 100:
            avg_hit = np.mean(self.hit_similarities[-100:])
            if avg_hit > 0.97:
                # Hits are very confident, can lower threshold
                threshold -= 0.01
        
        return np.clip(threshold, self.min_threshold, self.max_threshold)

class MultiLevelCache:
    """Multi-level cache with different similarity thresholds."""
    
    def __init__(
        self,
        embedding_model: Any,
        levels: list[dict] = None
    ):
        self.embedder = embedding_model
        
        # Default levels
        self.levels = levels or [
            {"name": "exact", "threshold": 0.99, "ttl_hours": 168},
            {"name": "high", "threshold": 0.95, "ttl_hours": 24},
            {"name": "medium", "threshold": 0.90, "ttl_hours": 1}
        ]
        
        self.caches = {
            level["name"]: SemanticCache(embedding_model, level["threshold"])
            for level in self.levels
        }
    
    def get(self, query: str) -> Tuple[Optional[str], Optional[str]]:
        """Get from appropriate cache level."""
        
        query_embedding = self.embedder.embed(query).vector
        
        # Check each level
        for level in self.levels:
            cache = self.caches[level["name"]]
            
            if cache.entries:
                similarities = cache._compute_similarities(query_embedding)
                best_idx = np.argmax(similarities)
                best_similarity = similarities[best_idx]
                
                if best_similarity >= level["threshold"]:
                    entry = cache.entries[best_idx]
                    
                    # Check TTL
                    age_hours = (datetime.now() - entry.created_at).total_seconds() / 3600
                    if age_hours <= level["ttl_hours"]:
                        return entry.response, level["name"]
        
        return None, None
    
    def set(self, query: str, response: str, level: str = "high"):
        """Set in specific cache level."""
        
        if level in self.caches:
            self.caches[level].set(query, response)

class ClusteredCache:
    """Cache with query clustering for faster lookup."""
    
    def __init__(
        self,
        embedding_model: Any,
        num_clusters: int = 100,
        similarity_threshold: float = 0.95
    ):
        self.embedder = embedding_model
        self.num_clusters = num_clusters
        self.threshold = similarity_threshold
        
        self.clusters: dict[int, list[CacheEntry]] = {}
        self.cluster_centroids: np.ndarray = None
    
    def _assign_cluster(self, embedding: np.ndarray) -> int:
        """Assign embedding to nearest cluster."""
        
        if self.cluster_centroids is None:
            return 0
        
        similarities = np.dot(self.cluster_centroids, embedding)
        return int(np.argmax(similarities))
    
    def get(self, query: str) -> Optional[str]:
        """Get with cluster-based lookup."""
        
        query_embedding = self.embedder.embed(query).vector
        query_embedding = query_embedding / np.linalg.norm(query_embedding)
        
        # Find cluster
        cluster_id = self._assign_cluster(query_embedding)
        
        if cluster_id not in self.clusters:
            return None
        
        # Search within cluster
        cluster_entries = self.clusters[cluster_id]
        
        best_entry = None
        best_similarity = 0
        
        for entry in cluster_entries:
            entry_norm = entry.embedding / np.linalg.norm(entry.embedding)
            similarity = np.dot(query_embedding, entry_norm)
            
            if similarity > best_similarity:
                best_similarity = similarity
                best_entry = entry
        
        if best_similarity >= self.threshold:
            return best_entry.response
        
        return None
    
    def set(self, query: str, response: str):
        """Set with cluster assignment."""
        
        embedding = self.embedder.embed(query).vector
        embedding = embedding / np.linalg.norm(embedding)
        
        cluster_id = self._assign_cluster(embedding)
        
        entry = CacheEntry(
            query=query,
            response=response,
            embedding=embedding
        )
        
        if cluster_id not in self.clusters:
            self.clusters[cluster_id] = []
        
        self.clusters[cluster_id].append(entry)
    
    def rebuild_clusters(self):
        """Rebuild cluster centroids."""
        
        from sklearn.cluster import KMeans
        
        # Collect all embeddings
        all_embeddings = []
        for entries in self.clusters.values():
            for entry in entries:
                all_embeddings.append(entry.embedding)
        
        if len(all_embeddings) < self.num_clusters:
            return
        
        # Cluster
        embeddings_matrix = np.array(all_embeddings)
        kmeans = KMeans(n_clusters=self.num_clusters, random_state=42)
        kmeans.fit(embeddings_matrix)
        
        self.cluster_centroids = kmeans.cluster_centers_
        self.cluster_centroids = self.cluster_centroids / np.linalg.norm(
            self.cluster_centroids, axis=1, keepdims=True
        )
        
        # Reassign entries to new clusters
        new_clusters = {}
        for entries in self.clusters.values():
            for entry in entries:
                new_cluster = self._assign_cluster(entry.embedding)
                if new_cluster not in new_clusters:
                    new_clusters[new_cluster] = []
                new_clusters[new_cluster].append(entry)
        
        self.clusters = new_clusters

Cache Invalidation

from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from datetime import datetime, timedelta
from enum import Enum

class InvalidationStrategy(Enum):
    """Cache invalidation strategies."""
    
    TTL = "ttl"
    LRU = "lru"
    LFU = "lfu"
    ADAPTIVE = "adaptive"

@dataclass
class InvalidationConfig:
    """Configuration for cache invalidation."""
    
    strategy: InvalidationStrategy = InvalidationStrategy.TTL
    ttl_seconds: int = 3600
    max_entries: int = 10000
    min_accesses: int = 2

class TTLInvalidator:
    """Time-based cache invalidation."""
    
    def __init__(self, ttl_seconds: int = 3600):
        self.ttl = timedelta(seconds=ttl_seconds)
    
    def is_valid(self, entry: CacheEntry) -> bool:
        """Check if entry is still valid."""
        
        age = datetime.now() - entry.created_at
        return age < self.ttl
    
    def invalidate(self, entries: list[CacheEntry]) -> list[CacheEntry]:
        """Remove expired entries."""
        
        return [e for e in entries if self.is_valid(e)]

class LRUInvalidator:
    """Least Recently Used invalidation."""
    
    def __init__(self, max_entries: int = 10000):
        self.max_entries = max_entries
    
    def invalidate(self, entries: list[CacheEntry]) -> list[CacheEntry]:
        """Remove least recently used entries."""
        
        if len(entries) <= self.max_entries:
            return entries
        
        # Sort by last accessed
        sorted_entries = sorted(
            entries,
            key=lambda e: e.last_accessed,
            reverse=True
        )
        
        return sorted_entries[:self.max_entries]

class LFUInvalidator:
    """Least Frequently Used invalidation."""
    
    def __init__(self, max_entries: int = 10000, decay_factor: float = 0.9):
        self.max_entries = max_entries
        self.decay_factor = decay_factor
    
    def invalidate(self, entries: list[CacheEntry]) -> list[CacheEntry]:
        """Remove least frequently used entries."""
        
        if len(entries) <= self.max_entries:
            return entries
        
        # Calculate frequency score with time decay
        now = datetime.now()
        scores = []
        
        for entry in entries:
            age_hours = (now - entry.created_at).total_seconds() / 3600
            decay = self.decay_factor ** age_hours
            score = entry.access_count * decay
            scores.append((entry, score))
        
        # Sort by score
        scores.sort(key=lambda x: x[1], reverse=True)
        
        return [e for e, _ in scores[:self.max_entries]]

class AdaptiveInvalidator:
    """Adaptive invalidation based on query patterns."""
    
    def __init__(
        self,
        max_entries: int = 10000,
        min_hit_rate: float = 0.1
    ):
        self.max_entries = max_entries
        self.min_hit_rate = min_hit_rate
    
    def invalidate(self, entries: list[CacheEntry]) -> list[CacheEntry]:
        """Adaptively invalidate based on performance."""
        
        if len(entries) <= self.max_entries:
            return entries
        
        # Calculate value score
        now = datetime.now()
        scored_entries = []
        
        for entry in entries:
            # Factors: access count, recency, response length
            age_hours = max(1, (now - entry.created_at).total_seconds() / 3600)
            recency_hours = max(1, (now - entry.last_accessed).total_seconds() / 3600)
            
            # Hit rate approximation
            hit_rate = entry.access_count / age_hours
            
            # Value score
            score = (
                hit_rate * 10 +
                entry.access_count * 0.5 +
                1 / recency_hours
            )
            
            scored_entries.append((entry, score))
        
        # Keep high-value entries
        scored_entries.sort(key=lambda x: x[1], reverse=True)
        
        return [e for e, _ in scored_entries[:self.max_entries]]

class ContentAwareInvalidator:
    """Invalidate based on content changes."""
    
    def __init__(self, change_detector: Callable = None):
        self.change_detector = change_detector
        self.content_versions: dict[str, str] = {}
    
    def register_content(self, content_id: str, version: str):
        """Register content version."""
        
        self.content_versions[content_id] = version
    
    def invalidate_for_content(
        self,
        entries: list[CacheEntry],
        content_id: str,
        new_version: str
    ) -> list[CacheEntry]:
        """Invalidate entries related to changed content."""
        
        old_version = self.content_versions.get(content_id)
        
        if old_version == new_version:
            return entries
        
        # Update version
        self.content_versions[content_id] = new_version
        
        # Remove entries related to this content
        return [
            e for e in entries
            if e.metadata.get("content_id") != content_id
        ]

Hybrid Caching

from typing import Any, Optional, Tuple
from datetime import datetime
import numpy as np
import hashlib

class HybridCache:
    """Combine exact and semantic caching."""
    
    def __init__(
        self,
        embedding_model: Any,
        semantic_threshold: float = 0.95
    ):
        self.embedder = embedding_model
        self.semantic_threshold = semantic_threshold
        
        # Exact match cache (hash-based)
        self.exact_cache: dict[str, CacheEntry] = {}
        
        # Semantic cache
        self.semantic_cache = SemanticCache(
            embedding_model,
            semantic_threshold
        )
    
    def _hash_query(self, query: str) -> str:
        """Create hash for exact matching."""
        
        # Normalize query
        normalized = query.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()
    
    def get(self, query: str) -> Tuple[Optional[str], Optional[str]]:
        """Get from cache (exact first, then semantic)."""
        
        # Try exact match first
        query_hash = self._hash_query(query)
        
        if query_hash in self.exact_cache:
            entry = self.exact_cache[query_hash]
            entry.access_count += 1
            entry.last_accessed = datetime.now()
            return entry.response, "exact"
        
        # Try semantic match
        response = self.semantic_cache.get(query)
        
        if response:
            return response, "semantic"
        
        return None, None
    
    def set(self, query: str, response: str, metadata: dict = None):
        """Set in both caches."""
        
        # Exact cache
        query_hash = self._hash_query(query)
        embedding = self.embedder.embed(query).vector
        
        entry = CacheEntry(
            query=query,
            response=response,
            embedding=embedding,
            metadata=metadata or {}
        )
        
        self.exact_cache[query_hash] = entry
        
        # Semantic cache
        self.semantic_cache.set(query, response, metadata)
    
    def stats(self) -> dict:
        """Get combined statistics."""
        
        return {
            "exact_cache_size": len(self.exact_cache),
            "semantic_cache": self.semantic_cache.stats()
        }

class TieredCache:
    """Multi-tier caching with different backends."""
    
    def __init__(
        self,
        embedding_model: Any,
        redis_client: Any = None,
        vector_store: Any = None
    ):
        self.embedder = embedding_model
        self.redis = redis_client
        self.vector_store = vector_store
        
        # L1: In-memory (fast, small)
        self.l1_cache = SemanticCache(embedding_model, max_entries=1000)
        
        # L2: Redis (medium speed, medium size)
        # L3: Vector store (slower, large)
    
    async def get(self, query: str) -> Tuple[Optional[str], Optional[str]]:
        """Get from tiered cache."""
        
        # L1: In-memory
        response = self.l1_cache.get(query)
        if response:
            return response, "l1"
        
        # L2: Redis
        if self.redis:
            response = await self._get_from_redis(query)
            if response:
                # Promote to L1
                self.l1_cache.set(query, response)
                return response, "l2"
        
        # L3: Vector store
        if self.vector_store:
            response = await self._get_from_vector_store(query)
            if response:
                # Promote to L1 and L2
                self.l1_cache.set(query, response)
                if self.redis:
                    await self._set_in_redis(query, response)
                return response, "l3"
        
        return None, None
    
    async def set(self, query: str, response: str):
        """Set in all tiers."""
        
        # L1
        self.l1_cache.set(query, response)
        
        # L2
        if self.redis:
            await self._set_in_redis(query, response)
        
        # L3
        if self.vector_store:
            await self._set_in_vector_store(query, response)
    
    async def _get_from_redis(self, query: str) -> Optional[str]:
        """Get from Redis cache."""
        
        query_hash = hashlib.md5(query.encode()).hexdigest()
        return await self.redis.get(f"cache:{query_hash}")
    
    async def _set_in_redis(self, query: str, response: str):
        """Set in Redis cache."""
        
        query_hash = hashlib.md5(query.encode()).hexdigest()
        await self.redis.set(f"cache:{query_hash}", response, ex=3600)
    
    async def _get_from_vector_store(self, query: str) -> Optional[str]:
        """Get from vector store."""
        
        embedding = self.embedder.embed(query).vector
        results = await self.vector_store.search(embedding, k=1)
        
        if results and results[0].score >= 0.95:
            return results[0].metadata.get("response")
        
        return None
    
    async def _set_in_vector_store(self, query: str, response: str):
        """Set in vector store."""
        
        embedding = self.embedder.embed(query).vector
        await self.vector_store.add(
            embedding=embedding,
            metadata={"query": query, "response": response}
        )

class ContextAwareCache:
    """Cache that considers conversation context."""
    
    def __init__(
        self,
        embedding_model: Any,
        context_weight: float = 0.3
    ):
        self.embedder = embedding_model
        self.context_weight = context_weight
        self.cache = SemanticCache(embedding_model)
    
    def get(
        self,
        query: str,
        context: list[str] = None
    ) -> Optional[str]:
        """Get considering context."""
        
        # Embed query
        query_embedding = self.embedder.embed(query).vector
        
        # Embed context if provided
        if context:
            context_text = " ".join(context[-3:])  # Last 3 messages
            context_embedding = self.embedder.embed(context_text).vector
            
            # Combine embeddings
            combined = (
                (1 - self.context_weight) * query_embedding +
                self.context_weight * context_embedding
            )
            combined = combined / np.linalg.norm(combined)
        else:
            combined = query_embedding
        
        # Search cache
        if not self.cache.entries:
            return None
        
        similarities = np.dot(
            self.cache.embedding_matrix,
            combined
        )
        
        best_idx = np.argmax(similarities)
        
        if similarities[best_idx] >= self.cache.threshold:
            return self.cache.entries[best_idx].response
        
        return None
    
    def set(
        self,
        query: str,
        response: str,
        context: list[str] = None
    ):
        """Set with context."""
        
        metadata = {}
        if context:
            metadata["context_hash"] = hashlib.md5(
                "".join(context).encode()
            ).hexdigest()
        
        self.cache.set(query, response, metadata)

Production Cache Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any
import time

app = FastAPI()

class CacheRequest(BaseModel):
    query: str
    context: Optional[list[str]] = None

class CacheResponse(BaseModel):
    response: Optional[str]
    cache_hit: bool
    cache_level: Optional[str]
    latency_ms: float

class SetCacheRequest(BaseModel):
    query: str
    response: str
    metadata: Optional[dict] = None

class CacheMetrics:
    """Track cache performance."""
    
    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.hit_latencies = []
        self.miss_latencies = []
    
    def record_hit(self, latency: float):
        self.hits += 1
        self.hit_latencies.append(latency)
    
    def record_miss(self, latency: float):
        self.misses += 1
        self.miss_latencies.append(latency)
    
    def stats(self) -> dict:
        total = self.hits + self.misses
        return {
            "total_requests": total,
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": self.hits / total if total > 0 else 0,
            "avg_hit_latency_ms": sum(self.hit_latencies) / len(self.hit_latencies) * 1000 if self.hit_latencies else 0,
            "avg_miss_latency_ms": sum(self.miss_latencies) / len(self.miss_latencies) * 1000 if self.miss_latencies else 0
        }

metrics = CacheMetrics()

# Mock cache for demo
class MockCache:
    def __init__(self):
        self.data = {}
    
    def get(self, query):
        return self.data.get(query)
    
    def set(self, query, response, metadata=None):
        self.data[query] = response

cache = MockCache()

@app.post("/v1/cache/get")
async def get_cached(request: CacheRequest) -> CacheResponse:
    """Get from cache."""
    
    start = time.time()
    
    response = cache.get(request.query)
    
    latency = time.time() - start
    
    if response:
        metrics.record_hit(latency)
        return CacheResponse(
            response=response,
            cache_hit=True,
            cache_level="semantic",
            latency_ms=latency * 1000
        )
    
    metrics.record_miss(latency)
    return CacheResponse(
        response=None,
        cache_hit=False,
        cache_level=None,
        latency_ms=latency * 1000
    )

@app.post("/v1/cache/set")
async def set_cached(request: SetCacheRequest) -> dict:
    """Set in cache."""
    
    cache.set(request.query, request.response, request.metadata)
    
    return {"status": "cached"}

@app.get("/v1/cache/stats")
async def get_stats() -> dict:
    """Get cache statistics."""
    return metrics.stats()

@app.delete("/v1/cache")
async def clear_cache() -> dict:
    """Clear cache."""
    
    cache.data = {}
    return {"status": "cleared"}

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Semantic caching transforms LLM economics by recognizing that users often ask similar questions in different ways. The similarity threshold is the critical parameter—too high and you miss valid cache hits, too low and you return irrelevant responses. Start with 0.95 and adjust based on your domain; specialized domains can often use lower thresholds because the query space is more constrained. Hybrid caching combining exact and semantic matching gives you the best of both worlds—instant responses for repeated queries and intelligent matching for paraphrases. Multi-tier caching with in-memory, Redis, and vector store layers balances speed and capacity. Cache invalidation is as important as caching itself; time-based TTL works for most cases, but content-aware invalidation is essential when underlying data changes. Monitor your hit rate and latency distributions—a well-tuned cache should achieve 30-50% hit rates for typical applications. The key insight is that semantic caching is not just about saving money—it's about providing faster, more consistent responses to your users while reducing load on your LLM infrastructure.
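
As a starting point for that tuning, one low-effort approach is an offline sweep: label a small set of query pairs as paraphrases or non-paraphrases, then measure how each candidate threshold trades valid hits against false hits. The sketch below assumes an embedder with the same .embed(text).vector interface used throughout this post; the labeled pairs are something you would collect from your own query logs.

import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two embedding vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def sweep_thresholds(pairs, embedder, candidates=(0.90, 0.93, 0.95, 0.97)) -> dict:
    """pairs: list of (query_a, query_b, should_match) tuples labeled by hand."""
    
    sims = [
        (cosine(embedder.embed(a).vector, embedder.embed(b).vector), should_match)
        for a, b, should_match in pairs
    ]
    
    results = {}
    for t in candidates:
        true_hits = sum(1 for s, match in sims if match and s >= t)
        false_hits = sum(1 for s, match in sims if not match and s >= t)
        positives = sum(1 for _, match in sims if match) or 1
        results[t] = {
            "hit_rate": true_hits / positives,          # valid paraphrases caught
            "false_hit_rate": false_hits / len(sims),   # unrelated queries wrongly served
        }
    return results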