
LLM Latency Optimization: Techniques for Sub-Second Response Times

Introduction: LLM latency is the silent killer of user experience. Even the most accurate model becomes frustrating when users wait seconds for each response. The challenge is that LLM inference is inherently slow—autoregressive generation means each token depends on all previous tokens. This guide covers practical techniques for reducing perceived and actual latency: streaming responses to show progress immediately, semantic caching to skip inference entirely for similar queries, prompt optimization to reduce input tokens, model selection strategies that balance speed and quality, and infrastructure optimizations like batching and GPU utilization. Whether you’re building a chatbot, code assistant, or real-time application, these techniques will help you deliver snappy responses without sacrificing quality.

Techniques covered: response streaming, semantic caching, prompt optimization, model selection and routing, speculative execution, and infrastructure optimization.

Response Streaming

from dataclasses import dataclass
from typing import AsyncIterator, Callable, Optional
import asyncio
from abc import ABC, abstractmethod

@dataclass
class StreamChunk:
    """A chunk of streamed response."""
    
    content: str
    is_final: bool = False
    latency_ms: float = 0.0
    token_count: int = 0

class StreamingClient(ABC):
    """Abstract streaming LLM client."""
    
    @abstractmethod
    async def stream(
        self,
        prompt: str,
        **kwargs
    ) -> AsyncIterator[StreamChunk]:
        """Stream response tokens."""
        pass

class OpenAIStreamingClient(StreamingClient):
    """OpenAI streaming client."""
    
    def __init__(self, api_key: str, model: str = "gpt-4"):
        import openai
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model
    
    async def stream(
        self,
        prompt: str,
        **kwargs
    ) -> AsyncIterator[StreamChunk]:
        """Stream from OpenAI."""
        
        import time
        start_time = time.time()
        token_count = 0
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            **kwargs
        )
        
        async for chunk in response:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                token_count += 1
                
                yield StreamChunk(
                    content=content,
                    is_final=False,
                    latency_ms=(time.time() - start_time) * 1000,
                    token_count=token_count
                )
        
        yield StreamChunk(
            content="",
            is_final=True,
            latency_ms=(time.time() - start_time) * 1000,
            token_count=token_count
        )

class StreamBuffer:
    """Buffer for accumulating streamed content."""
    
    def __init__(self):
        self.chunks: list[str] = []
        self.total_tokens = 0
        self.first_token_latency = None
        self.total_latency = None
    
    def add_chunk(self, chunk: StreamChunk):
        """Add chunk to buffer."""
        
        if chunk.content:
            self.chunks.append(chunk.content)
            self.total_tokens = chunk.token_count
            
            if self.first_token_latency is None:
                self.first_token_latency = chunk.latency_ms
        
        if chunk.is_final:
            self.total_latency = chunk.latency_ms
    
    def get_content(self) -> str:
        """Get accumulated content."""
        return "".join(self.chunks)
    
    def get_metrics(self) -> dict:
        """Get streaming metrics."""
        
        return {
            "first_token_latency_ms": self.first_token_latency,
            "total_latency_ms": self.total_latency,
            "total_tokens": self.total_tokens,
            "tokens_per_second": (
                self.total_tokens / (self.total_latency / 1000)
                if self.total_latency else 0
            )
        }

class StreamingResponseHandler:
    """Handle streaming responses with callbacks."""
    
    def __init__(
        self,
        on_token: Optional[Callable] = None,
        on_sentence: Optional[Callable] = None,
        on_complete: Optional[Callable] = None
    ):
        self.on_token = on_token
        self.on_sentence = on_sentence
        self.on_complete = on_complete
        
        self.buffer = StreamBuffer()
        self.sentence_buffer = ""
    
    async def handle_stream(
        self,
        stream: AsyncIterator[StreamChunk]
    ) -> str:
        """Process stream with callbacks."""
        
        async for chunk in stream:
            self.buffer.add_chunk(chunk)
            
            if chunk.content:
                # Token callback
                if self.on_token:
                    await self._call_async(self.on_token, chunk.content)
                
                # Sentence detection
                self.sentence_buffer += chunk.content
                
                if self._ends_sentence(self.sentence_buffer):
                    if self.on_sentence:
                        await self._call_async(self.on_sentence, self.sentence_buffer)
                    self.sentence_buffer = ""
            
            if chunk.is_final:
                # Flush remaining sentence buffer
                if self.sentence_buffer and self.on_sentence:
                    await self._call_async(self.on_sentence, self.sentence_buffer)
                
                if self.on_complete:
                    await self._call_async(
                        self.on_complete,
                        self.buffer.get_content(),
                        self.buffer.get_metrics()
                    )
        
        return self.buffer.get_content()
    
    def _ends_sentence(self, text: str) -> bool:
        """Check if text ends with sentence terminator."""
        
        terminators = ['. ', '! ', '? ', '.\n', '!\n', '?\n']
        return any(text.endswith(t) for t in terminators)
    
    async def _call_async(self, func: Callable, *args):
        """Call function, handling both sync and async."""
        
        result = func(*args)
        if asyncio.iscoroutine(result):
            await result
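
Wiring these pieces together is straightforward. The sketch below is a minimal usage example under a few assumptions: the OpenAI API key is read from an OPENAI_API_KEY environment variable, and the callbacks simply print to the console. It streams a completion, emits tokens as they arrive, and reports the first-token and total latency at the end.

import os

async def demo_streaming():
    """Minimal usage sketch for the streaming classes above."""
    client = OpenAIStreamingClient(api_key=os.environ["OPENAI_API_KEY"])
    
    handler = StreamingResponseHandler(
        on_token=lambda tok: print(tok, end="", flush=True),
        on_complete=lambda text, metrics: print(f"\n\nMetrics: {metrics}")
    )
    
    # handle_stream drains the async generator and fires the callbacks
    await handler.handle_stream(client.stream("Explain time-to-first-token in one sentence."))

# asyncio.run(demo_streaming())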

Semantic Caching

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime, timedelta
import hashlib
import numpy as np

@dataclass
class CacheEntry:
    """Cache entry for LLM response."""
    
    query: str
    response: str
    embedding: Optional[np.ndarray] = None
    created_at: datetime = field(default_factory=datetime.now)
    hit_count: int = 0
    metadata: dict = field(default_factory=dict)

class ExactCache:
    """Exact match cache."""
    
    def __init__(self, max_size: int = 10000, ttl_hours: int = 24):
        self.max_size = max_size
        self.ttl = timedelta(hours=ttl_hours)
        self.cache: dict[str, CacheEntry] = {}
    
    def _hash_query(self, query: str) -> str:
        """Create cache key from query."""
        
        normalized = query.lower().strip()
        return hashlib.sha256(normalized.encode()).hexdigest()
    
    def get(self, query: str) -> Optional[str]:
        """Get cached response."""
        
        key = self._hash_query(query)
        
        if key in self.cache:
            entry = self.cache[key]
            
            # Check TTL
            if datetime.now() - entry.created_at < self.ttl:
                entry.hit_count += 1
                return entry.response
            else:
                del self.cache[key]
        
        return None
    
    def set(self, query: str, response: str):
        """Cache response."""
        
        # Evict if at capacity
        if len(self.cache) >= self.max_size:
            self._evict_oldest()
        
        key = self._hash_query(query)
        self.cache[key] = CacheEntry(query=query, response=response)
    
    def _evict_oldest(self):
        """Evict oldest entry."""
        
        if not self.cache:
            return
        
        oldest_key = min(
            self.cache.keys(),
            key=lambda k: self.cache[k].created_at
        )
        del self.cache[oldest_key]

class SemanticCache:
    """Semantic similarity-based cache."""
    
    def __init__(
        self,
        embedding_model: Any,
        similarity_threshold: float = 0.95,
        max_size: int = 10000
    ):
        self.embedder = embedding_model
        self.threshold = similarity_threshold
        self.max_size = max_size
        
        self.entries: list[CacheEntry] = []
        self.embeddings: np.ndarray = None
    
    async def get(self, query: str) -> Optional[str]:
        """Get semantically similar cached response."""
        
        if not self.entries:
            return None
        
        # Embed query
        query_embedding = await self.embedder.embed(query)
        
        # Find most similar via cosine similarity (normalize both sides so the
        # threshold is independent of embedding scale)
        query_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
        matrix_norm = self.embeddings / (np.linalg.norm(self.embeddings, axis=1, keepdims=True) + 1e-10)
        similarities = matrix_norm @ query_norm
        max_idx = int(np.argmax(similarities))
        max_sim = similarities[max_idx]
        
        if max_sim >= self.threshold:
            entry = self.entries[max_idx]
            entry.hit_count += 1
            return entry.response
        
        return None
    
    async def set(self, query: str, response: str):
        """Cache response with embedding."""
        
        # Embed query
        embedding = await self.embedder.embed(query)
        
        # Evict if at capacity
        if len(self.entries) >= self.max_size:
            self._evict_lru()
        
        # Add entry
        entry = CacheEntry(
            query=query,
            response=response,
            embedding=embedding
        )
        self.entries.append(entry)
        
        # Update embedding matrix
        if self.embeddings is None:
            self.embeddings = embedding.reshape(1, -1)
        else:
            self.embeddings = np.vstack([self.embeddings, embedding])
    
    def _evict_lru(self):
        """Evict least recently used entry."""
        
        if not self.entries:
            return
        
        # Find entry with lowest hit count
        min_idx = min(
            range(len(self.entries)),
            key=lambda i: self.entries[i].hit_count
        )
        
        del self.entries[min_idx]
        self.embeddings = np.delete(self.embeddings, min_idx, axis=0)

class HybridCache:
    """Combine exact and semantic caching."""
    
    def __init__(
        self,
        embedding_model: Any,
        exact_cache_size: int = 10000,
        semantic_cache_size: int = 5000,
        similarity_threshold: float = 0.95
    ):
        self.exact_cache = ExactCache(max_size=exact_cache_size)
        self.semantic_cache = SemanticCache(
            embedding_model=embedding_model,
            similarity_threshold=similarity_threshold,
            max_size=semantic_cache_size
        )
    
    async def get(self, query: str) -> tuple[Optional[str], Optional[str]]:
        """Get cached response, returning (response, cache_type)."""
        
        # Try exact match first (faster)
        exact_result = self.exact_cache.get(query)
        if exact_result:
            return exact_result, "exact"
        
        # Try semantic match
        semantic_result = await self.semantic_cache.get(query)
        if semantic_result:
            return semantic_result, "semantic"
        
        return None, None
    
    async def set(self, query: str, response: str):
        """Cache in both caches."""
        
        self.exact_cache.set(query, response)
        await self.semantic_cache.set(query, response)

class CachedLLMClient:
    """LLM client with caching."""
    
    def __init__(
        self,
        llm_client: Any,
        cache: HybridCache
    ):
        self.llm = llm_client
        self.cache = cache
        
        self.stats = {
            "hits": 0,
            "misses": 0,
            "exact_hits": 0,
            "semantic_hits": 0
        }
    
    async def complete(self, prompt: str, **kwargs) -> tuple[str, dict]:
        """Complete with caching."""
        
        # Check cache
        cached, cache_type = await self.cache.get(prompt)
        
        if cached:
            self.stats["hits"] += 1
            if cache_type == "exact":
                self.stats["exact_hits"] += 1
            else:
                self.stats["semantic_hits"] += 1
            
            return cached, {"cached": True, "cache_type": cache_type}
        
        # Cache miss - call LLM
        self.stats["misses"] += 1
        
        response = await self.llm.complete(prompt, **kwargs)
        
        # Cache response
        await self.cache.set(prompt, response.content)
        
        return response.content, {"cached": False}
    
    def get_stats(self) -> dict:
        """Get cache statistics."""
        
        total = self.stats["hits"] + self.stats["misses"]
        
        return {
            **self.stats,
            "hit_rate": self.stats["hits"] / max(total, 1),
            "total_requests": total
        }
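
A rough usage sketch, assuming an embedding client with an async embed(text) method that returns a NumPy vector and an LLM client whose complete() returns an object with a .content attribute (both are placeholders here, not part of the classes above):

async def demo_caching(llm_client, embedding_model):
    """Sketch: wrap an LLM client with the hybrid cache defined above."""
    cache = HybridCache(
        embedding_model=embedding_model,
        similarity_threshold=0.95  # tune per workload; higher means fewer false hits
    )
    client = CachedLLMClient(llm_client=llm_client, cache=cache)
    
    # First call misses and populates both caches
    answer, meta = await client.complete("What is the capital of France?")
    
    # A close paraphrase can be served from the semantic cache
    answer2, meta2 = await client.complete("What's France's capital city?")
    
    print(meta, meta2, client.get_stats())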

Prompt Optimization

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class OptimizedPrompt:
    """Optimized prompt result."""
    
    original: str
    optimized: str
    original_tokens: int
    optimized_tokens: int
    reduction_percent: float

class PromptOptimizer:
    """Optimize prompts for lower latency."""
    
    def __init__(self, token_counter: Any):
        self.counter = token_counter
    
    def optimize(self, prompt: str) -> OptimizedPrompt:
        """Apply all optimizations."""
        
        original_tokens = self.counter.count(prompt)
        
        optimized = prompt
        optimized = self._remove_redundancy(optimized)
        optimized = self._compress_examples(optimized)
        optimized = self._simplify_instructions(optimized)
        
        optimized_tokens = self.counter.count(optimized)
        
        return OptimizedPrompt(
            original=prompt,
            optimized=optimized,
            original_tokens=original_tokens,
            optimized_tokens=optimized_tokens,
            reduction_percent=(
                (original_tokens - optimized_tokens) / max(original_tokens, 1) * 100
            )
        )
    
    def _remove_redundancy(self, prompt: str) -> str:
        """Remove redundant phrases."""
        
        redundant_phrases = [
            "Please note that",
            "It's important to remember that",
            "Keep in mind that",
            "As mentioned earlier",
            "As I said before",
            "In other words",
        ]
        
        result = prompt
        for phrase in redundant_phrases:
            result = result.replace(phrase, "")
        
        # Remove multiple spaces
        import re
        result = re.sub(r'\s+', ' ', result)
        
        return result.strip()
    
    def _compress_examples(self, prompt: str) -> str:
        """Compress verbose examples."""
        
        # Reduce example verbosity
        import re
        
        # Find example blocks
        example_pattern = r'Example \d+:.*?(?=Example \d+:|$)'
        
        def compress_example(match):
            example = match.group(0)
            # Keep first 200 chars of each example
            if len(example) > 250:
                return example[:200] + "..."
            return example
        
        return re.sub(example_pattern, compress_example, prompt, flags=re.DOTALL)
    
    def _simplify_instructions(self, prompt: str) -> str:
        """Simplify verbose instructions."""
        
        simplifications = {
            "You are an AI assistant that helps users": "You help users",
            "Please provide a detailed response": "Respond in detail",
            "Make sure to include": "Include",
            "It would be helpful if you could": "Please",
            "I would like you to": "Please",
        }
        
        result = prompt
        for verbose, simple in simplifications.items():
            result = result.replace(verbose, simple)
        
        return result

class DynamicPromptSelector:
    """Select prompt based on query complexity."""
    
    def __init__(self, prompts: dict[str, str]):
        self.prompts = prompts  # complexity -> prompt template
    
    def select(self, query: str) -> str:
        """Select appropriate prompt for query."""
        
        complexity = self._estimate_complexity(query)
        
        if complexity == "simple":
            return self.prompts.get("simple", self.prompts["default"])
        elif complexity == "complex":
            return self.prompts.get("complex", self.prompts["default"])
        else:
            return self.prompts["default"]
    
    def _estimate_complexity(self, query: str) -> str:
        """Estimate query complexity."""
        
        # Simple heuristics
        word_count = len(query.split())
        
        complex_indicators = [
            "compare", "analyze", "explain why",
            "step by step", "detailed", "comprehensive"
        ]
        
        has_complex_indicator = any(
            ind in query.lower() for ind in complex_indicators
        )
        
        if word_count < 10 and not has_complex_indicator:
            return "simple"
        elif word_count > 50 or has_complex_indicator:
            return "complex"
        else:
            return "medium"

class ContextWindowOptimizer:
    """Optimize context window usage."""
    
    def __init__(self, max_tokens: int, token_counter: Any):
        self.max_tokens = max_tokens
        self.counter = token_counter
    
    def optimize_context(
        self,
        system_prompt: str,
        context: str,
        query: str,
        reserved_for_output: int = 1000
    ) -> tuple[str, str, str]:
        """Optimize to fit in context window."""
        
        available = self.max_tokens - reserved_for_output
        
        # Count fixed tokens
        system_tokens = self.counter.count(system_prompt)
        query_tokens = self.counter.count(query)
        
        fixed_tokens = system_tokens + query_tokens
        context_budget = available - fixed_tokens
        
        if context_budget <= 0:
            # Need to truncate system prompt
            system_prompt = self._truncate(system_prompt, available // 2)
            return system_prompt, "", query
        
        # Truncate context if needed
        context_tokens = self.counter.count(context)
        
        if context_tokens > context_budget:
            context = self._truncate(context, context_budget)
        
        return system_prompt, context, query
    
    def _truncate(self, text: str, max_tokens: int) -> str:
        """Truncate text to token limit."""
        
        words = text.split()
        
        # Binary search for cutoff
        low, high = 0, len(words)
        
        while low < high:
            mid = (low + high + 1) // 2
            candidate = " ".join(words[:mid])
            
            if self.counter.count(candidate) <= max_tokens:
                low = mid
            else:
                high = mid - 1
        
        return " ".join(words[:low]) + "..."

Model Selection and Routing

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class ModelTier(Enum):
    """Model performance tiers."""
    
    FAST = "fast"       # GPT-3.5, Claude Instant
    BALANCED = "balanced"  # GPT-4-turbo, Claude 3 Sonnet
    QUALITY = "quality"    # GPT-4, Claude 3 Opus

@dataclass
class ModelConfig:
    """Model configuration."""
    
    name: str
    tier: ModelTier
    avg_latency_ms: float
    cost_per_1k_tokens: float
    max_tokens: int

class LatencyAwareRouter:
    """Route to fastest suitable model."""
    
    def __init__(self, models: list[ModelConfig]):
        self.models = {m.name: m for m in models}
        self.latency_history: dict[str, list[float]] = {
            m.name: [] for m in models
        }
    
    def select_model(
        self,
        query: str,
        max_latency_ms: float = None,
        min_tier: ModelTier = ModelTier.FAST
    ) -> str:
        """Select model based on latency requirements."""
        
        # Filter by tier (explicit rank; comparing the enum's string values
        # would be lexicographic, not fast -> balanced -> quality)
        tier_rank = {ModelTier.FAST: 0, ModelTier.BALANCED: 1, ModelTier.QUALITY: 2}
        candidates = [
            m for m in self.models.values()
            if tier_rank[m.tier] >= tier_rank[min_tier]
        ]
        
        # Filter by latency
        if max_latency_ms:
            candidates = [
                m for m in candidates
                if self._get_avg_latency(m.name) <= max_latency_ms
            ]
        
        if not candidates:
            # Fallback to fastest
            return min(self.models.values(), key=lambda m: m.avg_latency_ms).name
        
        # Select fastest among candidates
        return min(candidates, key=lambda m: self._get_avg_latency(m.name)).name
    
    def _get_avg_latency(self, model_name: str) -> float:
        """Get average latency for model."""
        
        history = self.latency_history.get(model_name, [])
        
        if history:
            return sum(history[-100:]) / len(history[-100:])
        
        return self.models[model_name].avg_latency_ms
    
    def record_latency(self, model_name: str, latency_ms: float):
        """Record observed latency."""
        
        if model_name in self.latency_history:
            self.latency_history[model_name].append(latency_ms)
            
            # Keep last 1000 observations
            if len(self.latency_history[model_name]) > 1000:
                self.latency_history[model_name] = self.latency_history[model_name][-1000:]

class AdaptiveModelSelector:
    """Adaptively select model based on query."""
    
    def __init__(
        self,
        fast_model: str,
        quality_model: str,
        classifier: Any = None
    ):
        self.fast_model = fast_model
        self.quality_model = quality_model
        self.classifier = classifier
    
    async def select(self, query: str) -> str:
        """Select model based on query complexity."""
        
        if self.classifier:
            complexity = await self.classifier.classify(query)
        else:
            complexity = self._heuristic_complexity(query)
        
        if complexity == "simple":
            return self.fast_model
        else:
            return self.quality_model
    
    def _heuristic_complexity(self, query: str) -> str:
        """Heuristic complexity estimation."""
        
        # Simple queries
        simple_patterns = [
            "what is", "who is", "when did",
            "define", "list", "name"
        ]
        
        query_lower = query.lower()
        
        if any(p in query_lower for p in simple_patterns):
            if len(query.split()) < 15:
                return "simple"
        
        # Complex queries
        complex_patterns = [
            "explain", "analyze", "compare",
            "why", "how does", "what are the implications"
        ]
        
        if any(p in query_lower for p in complex_patterns):
            return "complex"
        
        # Default based on length
        return "simple" if len(query.split()) < 20 else "complex"

class SpeculativeExecutor:
    """Speculatively execute on fast model, verify with quality model."""
    
    def __init__(
        self,
        fast_client: Any,
        quality_client: Any,
        verifier: Any = None
    ):
        self.fast = fast_client
        self.quality = quality_client
        self.verifier = verifier
    
    async def execute(self, prompt: str) -> tuple[str, dict]:
        """Execute with speculation."""
        
        # Try the fast model first; fall back to the quality model
        # only if verification rejects its answer
        fast_response = await self.fast.complete(prompt)
        
        # Verify if needed
        if self.verifier:
            is_valid = await self.verifier.verify(prompt, fast_response.content)
            
            if is_valid:
                return fast_response.content, {"model": "fast", "verified": True}
            
            # Fall back to quality model
            quality_response = await self.quality.complete(prompt)
            return quality_response.content, {"model": "quality", "verified": False}
        
        return fast_response.content, {"model": "fast"}
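
A short sketch of the router in use. The model names, latency figures, and costs below are illustrative placeholders, not measurements:

router = LatencyAwareRouter(models=[
    ModelConfig("fast-model", ModelTier.FAST, avg_latency_ms=400,
                cost_per_1k_tokens=0.0005, max_tokens=16_000),
    ModelConfig("balanced-model", ModelTier.BALANCED, avg_latency_ms=1200,
                cost_per_1k_tokens=0.01, max_tokens=128_000),
    ModelConfig("quality-model", ModelTier.QUALITY, avg_latency_ms=3000,
                cost_per_1k_tokens=0.03, max_tokens=128_000),
])

# Pick the fastest model that is at least BALANCED and under 2 seconds
chosen = router.select_model(
    "Compare these two contracts",
    max_latency_ms=2000,
    min_tier=ModelTier.BALANCED
)

# Feed observed latencies back so routing adapts to real conditions
router.record_latency(chosen, 1350.0)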

Infrastructure Optimization

from dataclasses import dataclass
from typing import Any, Optional
import asyncio
from collections import deque
import time

@dataclass
class BatchedRequest:
    """Request in batch queue."""
    
    prompt: str
    future: asyncio.Future
    created_at: float = None
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = time.time()

class DynamicBatcher:
    """Dynamically batch requests for efficiency."""
    
    def __init__(
        self,
        llm_client: Any,
        max_batch_size: int = 8,
        max_wait_ms: float = 50
    ):
        self.llm = llm_client
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        
        self.queue: deque[BatchedRequest] = deque()
        self.processing = False
        self._lock = asyncio.Lock()
    
    async def complete(self, prompt: str) -> str:
        """Add request to batch queue."""
        
        future = asyncio.Future()
        request = BatchedRequest(prompt=prompt, future=future)
        
        async with self._lock:
            self.queue.append(request)
            
            if not self.processing:
                self.processing = True
                asyncio.create_task(self._process_batches())
        
        return await future
    
    async def _process_batches(self):
        """Process batches from queue."""
        
        while True:
            async with self._lock:
                if not self.queue:
                    self.processing = False
                    return
                
                # Collect batch
                batch = []
                while self.queue and len(batch) < self.max_batch_size:
                    batch.append(self.queue.popleft())
            
            # Wait for more requests if batch is small
            if len(batch) < self.max_batch_size:
                await asyncio.sleep(self.max_wait_ms / 1000)
                
                async with self._lock:
                    while self.queue and len(batch) < self.max_batch_size:
                        batch.append(self.queue.popleft())
            
            # Process batch
            await self._execute_batch(batch)
    
    async def _execute_batch(self, batch: list[BatchedRequest]):
        """Execute batch of requests."""
        
        prompts = [r.prompt for r in batch]
        
        try:
            responses = await self.llm.complete_batch(prompts)
            
            for request, response in zip(batch, responses):
                request.future.set_result(response)
        except Exception as e:
            for request in batch:
                request.future.set_exception(e)

class ConnectionPool:
    """Pool of LLM client connections."""
    
    def __init__(
        self,
        client_factory: callable,
        pool_size: int = 10
    ):
        self.factory = client_factory
        self.pool_size = pool_size
        
        self.available: asyncio.Queue = asyncio.Queue()
        self.in_use = 0
        self._initialized = False
    
    async def initialize(self):
        """Initialize connection pool."""
        
        if self._initialized:
            return
        
        for _ in range(self.pool_size):
            client = self.factory()
            await self.available.put(client)
        
        self._initialized = True
    
    async def acquire(self) -> Any:
        """Acquire client from pool."""
        
        await self.initialize()
        
        client = await self.available.get()
        self.in_use += 1
        
        return client
    
    async def release(self, client: Any):
        """Release client back to pool."""
        
        self.in_use -= 1
        await self.available.put(client)
    
    async def execute(self, func: callable, *args, **kwargs):
        """Execute function with pooled client."""
        
        client = await self.acquire()
        
        try:
            return await func(client, *args, **kwargs)
        finally:
            await self.release(client)

class RequestDeduplicator:
    """Deduplicate concurrent identical requests."""
    
    def __init__(self):
        self.pending: dict[str, asyncio.Future] = {}
        self._lock = asyncio.Lock()
    
    async def execute(
        self,
        key: str,
        func: callable,
        *args,
        **kwargs
    ) -> Any:
        """Execute with deduplication."""
        
        existing: Optional[asyncio.Future] = None
        
        async with self._lock:
            if key in self.pending:
                existing = self.pending[key]
            else:
                # Register a future that duplicate callers can await
                future = asyncio.get_running_loop().create_future()
                self.pending[key] = future
        
        if existing is not None:
            # Await the in-flight identical request outside the lock,
            # so requests for other keys are not blocked
            return await existing
        
        try:
            result = await func(*args, **kwargs)
            future.set_result(result)
            return result
        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            async with self._lock:
                del self.pending[key]

class PrewarmingClient:
    """Client that prewarms connections."""
    
    def __init__(self, llm_client: Any, prewarm_prompts: list[str] = None):
        self.llm = llm_client
        self.prewarm_prompts = prewarm_prompts or ["Hello"]
        self.warmed = False
    
    async def warm(self):
        """Prewarm the client."""
        
        if self.warmed:
            return
        
        # Send lightweight requests to warm up
        for prompt in self.prewarm_prompts:
            try:
                await self.llm.complete(prompt, max_tokens=1)
            except Exception:
                pass
        
        self.warmed = True
    
    async def complete(self, prompt: str, **kwargs) -> Any:
        """Complete with automatic warming."""
        
        if not self.warmed:
            await self.warm()
        
        return await self.llm.complete(prompt, **kwargs)
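
These utilities compose naturally. The sketch below assumes an LLM client that exposes complete() and complete_batch(), as the batcher expects; it deduplicates identical concurrent prompts and batches the rest.

async def demo_infrastructure(llm_client):
    """Sketch: combine request deduplication with dynamic batching."""
    batcher = DynamicBatcher(llm_client, max_batch_size=8, max_wait_ms=50)
    deduper = RequestDeduplicator()
    
    async def ask(prompt: str) -> str:
        # Identical concurrent prompts share one underlying call
        return await deduper.execute(prompt, batcher.complete, prompt)
    
    prompts = ["What is RAG?", "What is RAG?", "Explain KV caching"]
    results = await asyncio.gather(*[ask(p) for p in prompts])
    return results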

Production Latency Service

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import asyncio
import json

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    stream: bool = False
    use_cache: bool = True
    max_latency_ms: Optional[float] = None

class BatchRequest(BaseModel):
    prompts: list[str]

# Initialize components (placeholders)
class MockLLM:
    async def complete(self, prompt: str, **kwargs):
        class Response:
            content = f"Response to: {prompt[:50]}"
        await asyncio.sleep(0.1)
        return Response()
    
    async def complete_batch(self, prompts: list[str]):
        return [f"Response to: {p[:50]}" for p in prompts]

class MockEmbedder:
    async def embed(self, text: str):
        import numpy as np
        return np.random.randn(384)

llm = MockLLM()
embedder = MockEmbedder()
cache = HybridCache(embedder)
batcher = DynamicBatcher(llm)

@app.post("/v1/completions")
async def complete(request: CompletionRequest) -> dict:
    """Complete with latency optimizations."""
    
    import time
    start = time.time()
    
    # Check cache
    if request.use_cache:
        cached, cache_type = await cache.get(request.prompt)
        if cached:
            return {
                "response": cached,
                "cached": True,
                "cache_type": cache_type,
                "latency_ms": (time.time() - start) * 1000
            }
    
    # Execute
    response = await llm.complete(request.prompt)
    
    # Cache result
    if request.use_cache:
        await cache.set(request.prompt, response.content)
    
    return {
        "response": response.content,
        "cached": False,
        "latency_ms": (time.time() - start) * 1000
    }

@app.post("/v1/completions/stream")
async def stream_completion(request: CompletionRequest):
    """Stream completion response."""
    
    async def generate():
        # Simulate streaming
        response = f"Response to: {request.prompt[:50]}"
        
        for char in response:
            yield f"data: {json.dumps({'content': char})}\n\n"
            await asyncio.sleep(0.01)
        
        yield f"data: {json.dumps({'done': True})}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

@app.post("/v1/completions/batch")
async def batch_complete(request: BatchRequest) -> dict:
    """Batch completion."""
    
    import time
    start = time.time()
    
    responses = await asyncio.gather(*[
        batcher.complete(prompt) for prompt in request.prompts
    ])
    
    return {
        "responses": responses,
        "count": len(responses),
        "latency_ms": (time.time() - start) * 1000
    }

@app.get("/v1/cache/stats")
async def cache_stats() -> dict:
    """Get cache statistics."""
    
    return {
        "exact_cache_size": len(cache.exact_cache.cache),
        "semantic_cache_size": len(cache.semantic_cache.entries)
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

LLM latency optimization requires attacking the problem from multiple angles. Start with streaming—it doesn't reduce total latency but dramatically improves perceived responsiveness by showing users progress immediately. Implement semantic caching for queries that are similar to previous ones; even a 10% cache hit rate can significantly reduce average latency and costs. Optimize your prompts to reduce input tokens—every token saved is latency saved. Use model routing to send simple queries to faster models while reserving expensive models for complex tasks. At the infrastructure level, batch requests when possible, maintain connection pools, and deduplicate concurrent identical requests. Monitor your latency distribution, not just averages—p99 latency often matters more than mean latency for user experience. Consider prewarming connections during low-traffic periods. The key insight is that latency optimization is about the entire pipeline, not just model inference. A well-optimized system with caching, routing, and streaming can feel 10x faster than a naive implementation, even with the same underlying models.
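
As a final note on monitoring, percentile latencies are cheap to compute from raw observations; a minimal NumPy sketch:

import numpy as np

def latency_percentiles(latencies_ms: list[float]) -> dict:
    """Summarize a latency distribution; p99 usually drives perceived slowness."""
    arr = np.asarray(latencies_ms)
    return {
        "p50_ms": float(np.percentile(arr, 50)),
        "p95_ms": float(np.percentile(arr, 95)),
        "p99_ms": float(np.percentile(arr, 99)),
        "mean_ms": float(arr.mean()),
    }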


