Introduction: LLM latency is the silent killer of user experience. Even the most accurate model becomes frustrating when users wait seconds for each response. The challenge is that LLM inference is inherently slow—autoregressive generation means each token depends on all previous tokens. This guide covers practical techniques for reducing perceived and actual latency: streaming responses to show progress immediately, semantic caching to skip inference entirely for similar queries, prompt optimization to reduce input tokens, model selection strategies that balance speed and quality, and infrastructure optimizations like batching and GPU utilization. Whether you’re building a chatbot, code assistant, or real-time application, these techniques will help you deliver snappy responses without sacrificing quality.

Response Streaming
from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator
import asyncio
from abc import ABC, abstractmethod
@dataclass
class StreamChunk:
"""A chunk of streamed response."""
content: str
is_final: bool = False
latency_ms: float = 0.0
token_count: int = 0
class StreamingClient(ABC):
"""Abstract streaming LLM client."""
@abstractmethod
async def stream(
self,
prompt: str,
**kwargs
) -> AsyncIterator[StreamChunk]:
"""Stream response tokens."""
pass
class OpenAIStreamingClient(StreamingClient):
"""OpenAI streaming client."""
def __init__(self, api_key: str, model: str = "gpt-4"):
import openai
self.client = openai.AsyncOpenAI(api_key=api_key)
self.model = model
async def stream(
self,
prompt: str,
**kwargs
) -> AsyncIterator[StreamChunk]:
"""Stream from OpenAI."""
import time
start_time = time.time()
token_count = 0
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
stream=True,
**kwargs
)
async for chunk in response:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
token_count += 1
yield StreamChunk(
content=content,
is_final=False,
latency_ms=(time.time() - start_time) * 1000,
token_count=token_count
)
yield StreamChunk(
content="",
is_final=True,
latency_ms=(time.time() - start_time) * 1000,
token_count=token_count
)
class StreamBuffer:
"""Buffer for accumulating streamed content."""
def __init__(self):
self.chunks: list[str] = []
self.total_tokens = 0
self.first_token_latency = None
self.total_latency = None
def add_chunk(self, chunk: StreamChunk):
"""Add chunk to buffer."""
if chunk.content:
self.chunks.append(chunk.content)
self.total_tokens = chunk.token_count
if self.first_token_latency is None:
self.first_token_latency = chunk.latency_ms
if chunk.is_final:
self.total_latency = chunk.latency_ms
def get_content(self) -> str:
"""Get accumulated content."""
return "".join(self.chunks)
def get_metrics(self) -> dict:
"""Get streaming metrics."""
return {
"first_token_latency_ms": self.first_token_latency,
"total_latency_ms": self.total_latency,
"total_tokens": self.total_tokens,
"tokens_per_second": (
self.total_tokens / (self.total_latency / 1000)
if self.total_latency else 0
)
}
class StreamingResponseHandler:
"""Handle streaming responses with callbacks."""
def __init__(
self,
on_token: callable = None,
on_sentence: callable = None,
on_complete: callable = None
):
self.on_token = on_token
self.on_sentence = on_sentence
self.on_complete = on_complete
self.buffer = StreamBuffer()
self.sentence_buffer = ""
async def handle_stream(
self,
stream: AsyncIterator[StreamChunk]
) -> str:
"""Process stream with callbacks."""
async for chunk in stream:
self.buffer.add_chunk(chunk)
if chunk.content:
# Token callback
if self.on_token:
await self._call_async(self.on_token, chunk.content)
# Sentence detection
self.sentence_buffer += chunk.content
if self._ends_sentence(self.sentence_buffer):
if self.on_sentence:
await self._call_async(self.on_sentence, self.sentence_buffer)
self.sentence_buffer = ""
if chunk.is_final:
# Flush remaining sentence buffer
if self.sentence_buffer and self.on_sentence:
await self._call_async(self.on_sentence, self.sentence_buffer)
if self.on_complete:
await self._call_async(
self.on_complete,
self.buffer.get_content(),
self.buffer.get_metrics()
)
return self.buffer.get_content()
def _ends_sentence(self, text: str) -> bool:
"""Check if text ends with sentence terminator."""
terminators = ['. ', '! ', '? ', '.\n', '!\n', '?\n']
return any(text.endswith(t) for t in terminators)
async def _call_async(self, func: callable, *args):
"""Call function, handling both sync and async."""
result = func(*args)
if asyncio.iscoroutine(result):
await result
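Here is a minimal usage sketch tying the pieces above together. It assumes the openai package is installed and an OPENAI_API_KEY environment variable is set; the callbacks are only illustrations of what you might do with tokens and the final metrics:

import asyncio
import os

async def main():
    client = OpenAIStreamingClient(api_key=os.environ["OPENAI_API_KEY"], model="gpt-4")
    handler = StreamingResponseHandler(
        on_token=lambda tok: print(tok, end="", flush=True),       # echo tokens as they arrive
        on_complete=lambda text, metrics: print("\n", metrics),    # full text plus latency metrics
    )
    await handler.handle_stream(client.stream("Explain caching in one paragraph."))

asyncio.run(main())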
Semantic Caching
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime, timedelta
import hashlib
import numpy as np
@dataclass
class CacheEntry:
"""Cache entry for LLM response."""
query: str
response: str
embedding: Optional[np.ndarray] = None
created_at: datetime = field(default_factory=datetime.now)
hit_count: int = 0
metadata: dict = field(default_factory=dict)
class ExactCache:
"""Exact match cache."""
def __init__(self, max_size: int = 10000, ttl_hours: int = 24):
self.max_size = max_size
self.ttl = timedelta(hours=ttl_hours)
self.cache: dict[str, CacheEntry] = {}
def _hash_query(self, query: str) -> str:
"""Create cache key from query."""
normalized = query.lower().strip()
return hashlib.sha256(normalized.encode()).hexdigest()
def get(self, query: str) -> Optional[str]:
"""Get cached response."""
key = self._hash_query(query)
if key in self.cache:
entry = self.cache[key]
# Check TTL
if datetime.now() - entry.created_at < self.ttl:
entry.hit_count += 1
return entry.response
else:
del self.cache[key]
return None
def set(self, query: str, response: str):
"""Cache response."""
# Evict if at capacity
if len(self.cache) >= self.max_size:
self._evict_oldest()
key = self._hash_query(query)
self.cache[key] = CacheEntry(query=query, response=response)
def _evict_oldest(self):
"""Evict oldest entry."""
if not self.cache:
return
oldest_key = min(
self.cache.keys(),
key=lambda k: self.cache[k].created_at
)
del self.cache[oldest_key]
class SemanticCache:
"""Semantic similarity-based cache."""
def __init__(
self,
embedding_model: Any,
similarity_threshold: float = 0.95,
max_size: int = 10000
):
self.embedder = embedding_model
self.threshold = similarity_threshold
self.max_size = max_size
self.entries: list[CacheEntry] = []
self.embeddings: np.ndarray = None
async def get(self, query: str) -> Optional[str]:
"""Get semantically similar cached response."""
if not self.entries:
return None
# Embed and L2-normalize the query so the dot product below is a cosine similarity
query_embedding = await self.embedder.embed(query)
query_embedding = query_embedding / (np.linalg.norm(query_embedding) + 1e-12)
# Find most similar cached entry
similarities = np.dot(self.embeddings, query_embedding)
max_idx = np.argmax(similarities)
max_sim = similarities[max_idx]
if max_sim >= self.threshold:
entry = self.entries[max_idx]
entry.hit_count += 1
return entry.response
return None
async def set(self, query: str, response: str):
"""Cache response with embedding."""
# Embed and L2-normalize the query; stored embeddings must be normalized for the cosine check in get()
embedding = await self.embedder.embed(query)
embedding = embedding / (np.linalg.norm(embedding) + 1e-12)
# Evict if at capacity
if len(self.entries) >= self.max_size:
self._evict_lru()
# Add entry
entry = CacheEntry(
query=query,
response=response,
embedding=embedding
)
self.entries.append(entry)
# Update embedding matrix
if self.embeddings is None:
self.embeddings = embedding.reshape(1, -1)
else:
self.embeddings = np.vstack([self.embeddings, embedding])
def _evict_lru(self):
"""Evict least recently used entry."""
if not self.entries:
return
# Find entry with lowest hit count
min_idx = min(
range(len(self.entries)),
key=lambda i: self.entries[i].hit_count
)
del self.entries[min_idx]
self.embeddings = np.delete(self.embeddings, min_idx, axis=0)
class HybridCache:
"""Combine exact and semantic caching."""
def __init__(
self,
embedding_model: Any,
exact_cache_size: int = 10000,
semantic_cache_size: int = 5000,
similarity_threshold: float = 0.95
):
self.exact_cache = ExactCache(max_size=exact_cache_size)
self.semantic_cache = SemanticCache(
embedding_model=embedding_model,
similarity_threshold=similarity_threshold,
max_size=semantic_cache_size
)
async def get(self, query: str) -> tuple[Optional[str], Optional[str]]:
"""Get cached response, returning (response, cache_type)."""
# Try exact match first (faster)
exact_result = self.exact_cache.get(query)
if exact_result:
return exact_result, "exact"
# Try semantic match
semantic_result = await self.semantic_cache.get(query)
if semantic_result:
return semantic_result, "semantic"
return None, None
async def set(self, query: str, response: str):
"""Cache in both caches."""
self.exact_cache.set(query, response)
await self.semantic_cache.set(query, response)
class CachedLLMClient:
"""LLM client with caching."""
def __init__(
self,
llm_client: Any,
cache: HybridCache
):
self.llm = llm_client
self.cache = cache
self.stats = {
"hits": 0,
"misses": 0,
"exact_hits": 0,
"semantic_hits": 0
}
async def complete(self, prompt: str, **kwargs) -> tuple[str, dict]:
"""Complete with caching."""
# Check cache
cached, cache_type = await self.cache.get(prompt)
if cached:
self.stats["hits"] += 1
if cache_type == "exact":
self.stats["exact_hits"] += 1
else:
self.stats["semantic_hits"] += 1
return cached, {"cached": True, "cache_type": cache_type}
# Cache miss - call LLM
self.stats["misses"] += 1
response = await self.llm.complete(prompt, **kwargs)
# Cache response
await self.cache.set(prompt, response.content)
return response.content, {"cached": False}
def get_stats(self) -> dict:
"""Get cache statistics."""
total = self.stats["hits"] + self.stats["misses"]
return {
**self.stats,
"hit_rate": self.stats["hits"] / max(total, 1),
"total_requests": total
}
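A usage sketch for the cached client. The StubEmbedder and StubLLM below are stand-ins invented for this example; any objects exposing an async embed() and an async complete() that returns an object with a .content attribute will do:

import asyncio
import numpy as np

class StubEmbedder:
    """Stand-in embedder; replace with a real embedding model or API."""
    async def embed(self, text: str) -> np.ndarray:
        vec = np.random.randn(384)
        return vec / np.linalg.norm(vec)

class StubLLM:
    """Stand-in LLM client with the async complete() interface used above."""
    async def complete(self, prompt: str, **kwargs):
        class Response:
            content = "stub answer"
        return Response()

async def main():
    cache = HybridCache(embedding_model=StubEmbedder())
    client = CachedLLMClient(llm_client=StubLLM(), cache=cache)
    await client.complete("What is semantic caching?")   # miss: goes to the LLM
    await client.complete("What is semantic caching?")   # exact-cache hit
    print(client.get_stats())                            # e.g. hit_rate 0.5

asyncio.run(main())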
Prompt Optimization
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class OptimizedPrompt:
"""Optimized prompt result."""
original: str
optimized: str
original_tokens: int
optimized_tokens: int
reduction_percent: float
class PromptOptimizer:
"""Optimize prompts for lower latency."""
def __init__(self, token_counter: Any):
self.counter = token_counter
def optimize(self, prompt: str) -> OptimizedPrompt:
"""Apply all optimizations."""
original_tokens = self.counter.count(prompt)
optimized = prompt
optimized = self._remove_redundancy(optimized)
optimized = self._compress_examples(optimized)
optimized = self._simplify_instructions(optimized)
optimized_tokens = self.counter.count(optimized)
return OptimizedPrompt(
original=prompt,
optimized=optimized,
original_tokens=original_tokens,
optimized_tokens=optimized_tokens,
reduction_percent=(
(original_tokens - optimized_tokens) / original_tokens * 100
)
)
def _remove_redundancy(self, prompt: str) -> str:
"""Remove redundant phrases."""
redundant_phrases = [
"Please note that",
"It's important to remember that",
"Keep in mind that",
"As mentioned earlier",
"As I said before",
"In other words",
]
result = prompt
for phrase in redundant_phrases:
result = result.replace(phrase, "")
# Remove multiple spaces
import re
result = re.sub(r'\s+', ' ', result)
return result.strip()
def _compress_examples(self, prompt: str) -> str:
"""Compress verbose examples."""
# Reduce example verbosity
import re
# Find example blocks
example_pattern = r'Example \d+:.*?(?=Example \d+:|$)'
def compress_example(match):
example = match.group(0)
# Keep first 200 chars of each example
if len(example) > 250:
return example[:200] + "..."
return example
return re.sub(example_pattern, compress_example, prompt, flags=re.DOTALL)
def _simplify_instructions(self, prompt: str) -> str:
"""Simplify verbose instructions."""
simplifications = {
"You are an AI assistant that helps users": "You help users",
"Please provide a detailed response": "Respond in detail",
"Make sure to include": "Include",
"It would be helpful if you could": "Please",
"I would like you to": "Please",
}
result = prompt
for verbose, simple in simplifications.items():
result = result.replace(verbose, simple)
return result
class DynamicPromptSelector:
"""Select prompt based on query complexity."""
def __init__(self, prompts: dict[str, str]):
self.prompts = prompts # complexity -> prompt template
def select(self, query: str) -> str:
"""Select appropriate prompt for query."""
complexity = self._estimate_complexity(query)
if complexity == "simple":
return self.prompts.get("simple", self.prompts["default"])
elif complexity == "complex":
return self.prompts.get("complex", self.prompts["default"])
else:
return self.prompts["default"]
def _estimate_complexity(self, query: str) -> str:
"""Estimate query complexity."""
# Simple heuristics
word_count = len(query.split())
complex_indicators = [
"compare", "analyze", "explain why",
"step by step", "detailed", "comprehensive"
]
has_complex_indicator = any(
ind in query.lower() for ind in complex_indicators
)
if word_count < 10 and not has_complex_indicator:
return "simple"
elif word_count > 50 or has_complex_indicator:
return "complex"
else:
return "medium"
class ContextWindowOptimizer:
"""Optimize context window usage."""
def __init__(self, max_tokens: int, token_counter: Any):
self.max_tokens = max_tokens
self.counter = token_counter
def optimize_context(
self,
system_prompt: str,
context: str,
query: str,
reserved_for_output: int = 1000
) -> tuple[str, str, str]:
"""Optimize to fit in context window."""
available = self.max_tokens - reserved_for_output
# Count fixed tokens
system_tokens = self.counter.count(system_prompt)
query_tokens = self.counter.count(query)
fixed_tokens = system_tokens + query_tokens
context_budget = available - fixed_tokens
if context_budget <= 0:
# Need to truncate system prompt
system_prompt = self._truncate(system_prompt, available // 2)
return system_prompt, "", query
# Truncate context if needed
context_tokens = self.counter.count(context)
if context_tokens > context_budget:
context = self._truncate(context, context_budget)
return system_prompt, context, query
def _truncate(self, text: str, max_tokens: int) -> str:
"""Truncate text to token limit."""
words = text.split()
# Binary search for cutoff
low, high = 0, len(words)
while low < high:
mid = (low + high + 1) // 2
candidate = " ".join(words[:mid])
if self.counter.count(candidate) <= max_tokens:
low = mid
else:
high = mid - 1
return " ".join(words[:low]) + "..."
Model Selection and Routing
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
class ModelTier(Enum):
"""Model performance tiers."""
FAST = "fast" # GPT-3.5, Claude Instant
BALANCED = "balanced" # GPT-4-turbo, Claude 3 Sonnet
QUALITY = "quality" # GPT-4, Claude 3 Opus
@dataclass
class ModelConfig:
"""Model configuration."""
name: str
tier: ModelTier
avg_latency_ms: float
cost_per_1k_tokens: float
max_tokens: int
class LatencyAwareRouter:
"""Route to fastest suitable model."""
def __init__(self, models: list[ModelConfig]):
self.models = {m.name: m for m in models}
self.latency_history: dict[str, list[float]] = {
m.name: [] for m in models
}
def select_model(
self,
query: str,
max_latency_ms: float = None,
min_tier: ModelTier = ModelTier.FAST
) -> str:
"""Select model based on latency requirements."""
# Filter by tier using an explicit rank order; comparing the raw string
# values would sort alphabetically and give the wrong ordering
tier_rank = {ModelTier.FAST: 0, ModelTier.BALANCED: 1, ModelTier.QUALITY: 2}
candidates = [
m for m in self.models.values()
if tier_rank[m.tier] >= tier_rank[min_tier]
]
# Filter by latency
if max_latency_ms:
candidates = [
m for m in candidates
if self._get_avg_latency(m.name) <= max_latency_ms
]
if not candidates:
# Fallback to fastest
return min(self.models.values(), key=lambda m: m.avg_latency_ms).name
# Select fastest among candidates
return min(candidates, key=lambda m: self._get_avg_latency(m.name)).name
def _get_avg_latency(self, model_name: str) -> float:
"""Get average latency for model."""
history = self.latency_history.get(model_name, [])
if history:
return sum(history[-100:]) / len(history[-100:])
return self.models[model_name].avg_latency_ms
def record_latency(self, model_name: str, latency_ms: float):
"""Record observed latency."""
if model_name in self.latency_history:
self.latency_history[model_name].append(latency_ms)
# Keep last 1000 observations
if len(self.latency_history[model_name]) > 1000:
self.latency_history[model_name] = self.latency_history[model_name][-1000:]
class AdaptiveModelSelector:
"""Adaptively select model based on query."""
def __init__(
self,
fast_model: str,
quality_model: str,
classifier: Any = None
):
self.fast_model = fast_model
self.quality_model = quality_model
self.classifier = classifier
async def select(self, query: str) -> str:
"""Select model based on query complexity."""
if self.classifier:
complexity = await self.classifier.classify(query)
else:
complexity = self._heuristic_complexity(query)
if complexity == "simple":
return self.fast_model
else:
return self.quality_model
def _heuristic_complexity(self, query: str) -> str:
"""Heuristic complexity estimation."""
# Simple queries
simple_patterns = [
"what is", "who is", "when did",
"define", "list", "name"
]
query_lower = query.lower()
if any(p in query_lower for p in simple_patterns):
if len(query.split()) < 15:
return "simple"
# Complex queries
complex_patterns = [
"explain", "analyze", "compare",
"why", "how does", "what are the implications"
]
if any(p in query_lower for p in complex_patterns):
return "complex"
# Default based on length
return "simple" if len(query.split()) < 20 else "complex"
class SpeculativeExecutor:
"""Speculatively execute on fast model, verify with quality model."""
def __init__(
self,
fast_client: Any,
quality_client: Any,
verifier: Any = None
):
self.fast = fast_client
self.quality = quality_client
self.verifier = verifier
async def execute(self, prompt: str) -> tuple[str, dict]:
"""Execute with speculation."""
# Get the fast model's answer first; the quality model is only called if verification fails
fast_response = await self.fast.complete(prompt)
# Verify if needed
if self.verifier:
is_valid = await self.verifier.verify(prompt, fast_response.content)
if is_valid:
return fast_response.content, {"model": "fast", "verified": True}
# Fall back to quality model
quality_response = await self.quality.complete(prompt)
return quality_response.content, {"model": "quality", "verified": False}
return fast_response.content, {"model": "fast"}
Infrastructure Optimization
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
from collections import deque
import time
@dataclass
class BatchedRequest:
"""Request in batch queue."""
prompt: str
future: asyncio.Future
created_at: float = None
def __post_init__(self):
if self.created_at is None:
self.created_at = time.time()
class DynamicBatcher:
"""Dynamically batch requests for efficiency."""
def __init__(
self,
llm_client: Any,
max_batch_size: int = 8,
max_wait_ms: float = 50
):
self.llm = llm_client
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.queue: deque[BatchedRequest] = deque()
self.processing = False
self._lock = asyncio.Lock()
async def complete(self, prompt: str) -> str:
"""Add request to batch queue."""
future = asyncio.get_running_loop().create_future()
request = BatchedRequest(prompt=prompt, future=future)
async with self._lock:
self.queue.append(request)
if not self.processing:
self.processing = True
asyncio.create_task(self._process_batches())
return await future
async def _process_batches(self):
"""Process batches from queue."""
while True:
async with self._lock:
if not self.queue:
self.processing = False
return
# Collect batch
batch = []
while self.queue and len(batch) < self.max_batch_size:
batch.append(self.queue.popleft())
# Wait for more requests if batch is small
if len(batch) < self.max_batch_size:
await asyncio.sleep(self.max_wait_ms / 1000)
async with self._lock:
while self.queue and len(batch) < self.max_batch_size:
batch.append(self.queue.popleft())
# Process batch
await self._execute_batch(batch)
async def _execute_batch(self, batch: list[BatchedRequest]):
"""Execute batch of requests."""
prompts = [r.prompt for r in batch]
try:
responses = await self.llm.complete_batch(prompts)
for request, response in zip(batch, responses):
request.future.set_result(response)
except Exception as e:
for request in batch:
request.future.set_exception(e)
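A quick sketch of the batcher under concurrent load, using a stand-in client that implements the complete_batch() interface the batcher expects:

import asyncio

class StubBatchLLM:
    """Stand-in client exposing the complete_batch() call the batcher needs."""
    async def complete_batch(self, prompts: list[str]) -> list[str]:
        await asyncio.sleep(0.1)                    # one "model call" for the whole batch
        return [f"answer to: {p}" for p in prompts]

async def main():
    batcher = DynamicBatcher(StubBatchLLM(), max_batch_size=8, max_wait_ms=50)
    # 20 concurrent callers collapse into roughly 3 batched model calls
    answers = await asyncio.gather(*[
        batcher.complete(f"question {i}") for i in range(20)
    ])
    print(len(answers), answers[0])

asyncio.run(main())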
class ConnectionPool:
"""Pool of LLM client connections."""
def __init__(
self,
client_factory: callable,
pool_size: int = 10
):
self.factory = client_factory
self.pool_size = pool_size
self.available: asyncio.Queue = asyncio.Queue()
self.in_use = 0
self._initialized = False
async def initialize(self):
"""Initialize connection pool."""
if self._initialized:
return
for _ in range(self.pool_size):
client = self.factory()
await self.available.put(client)
self._initialized = True
async def acquire(self) -> Any:
"""Acquire client from pool."""
await self.initialize()
client = await self.available.get()
self.in_use += 1
return client
async def release(self, client: Any):
"""Release client back to pool."""
self.in_use -= 1
await self.available.put(client)
async def execute(self, func: callable, *args, **kwargs):
"""Execute function with pooled client."""
client = await self.acquire()
try:
return await func(client, *args, **kwargs)
finally:
await self.release(client)
class RequestDeduplicator:
"""Deduplicate concurrent identical requests."""
def __init__(self):
self.pending: dict[str, asyncio.Future] = {}
self._lock = asyncio.Lock()
async def execute(
self,
key: str,
func: callable,
*args,
**kwargs
) -> Any:
"""Execute with deduplication."""
async with self._lock:
existing = self.pending.get(key)
if existing is None:
# First caller for this key: register a future other callers can await
future = asyncio.get_running_loop().create_future()
self.pending[key] = future
if existing is not None:
# Await the in-flight result outside the lock so other callers are not blocked
return await existing
try:
result = await func(*args, **kwargs)
future.set_result(result)
return result
except Exception as e:
future.set_exception(e)
raise
finally:
async with self._lock:
del self.pending[key]
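And a sketch of deduplication in action: several concurrent callers asking the same question share one underlying call. The key here is simply the prompt string; hashing it the way ExactCache does would work just as well:

import asyncio

call_count = 0

async def expensive_call(prompt: str) -> str:
    global call_count
    call_count += 1
    await asyncio.sleep(0.2)          # pretend this is a slow LLM request
    return f"answer to: {prompt}"

async def main():
    dedup = RequestDeduplicator()
    results = await asyncio.gather(*[
        dedup.execute("same question", expensive_call, "same question")
        for _ in range(5)
    ])
    print(call_count, len(set(results)))  # 1 underlying call, 1 distinct answer

asyncio.run(main())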
class PrewarmingClient:
"""Client that prewarms connections."""
def __init__(self, llm_client: Any, prewarm_prompts: list[str] = None):
self.llm = llm_client
self.prewarm_prompts = prewarm_prompts or ["Hello"]
self.warmed = False
async def warm(self):
"""Prewarm the client."""
if self.warmed:
return
# Send lightweight requests to warm up
for prompt in self.prewarm_prompts:
try:
await self.llm.complete(prompt, max_tokens=1)
except Exception:
pass
self.warmed = True
async def complete(self, prompt: str, **kwargs) -> Any:
"""Complete with automatic warming."""
if not self.warmed:
await self.warm()
return await self.llm.complete(prompt, **kwargs)
Production Latency Service
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import asyncio
import json
app = FastAPI()
class CompletionRequest(BaseModel):
prompt: str
stream: bool = False
use_cache: bool = True
max_latency_ms: Optional[float] = None
class BatchRequest(BaseModel):
prompts: list[str]
# Initialize components (placeholders)
class MockLLM:
async def complete(self, prompt: str, **kwargs):
class Response:
content = f"Response to: {prompt[:50]}"
await asyncio.sleep(0.1)
return Response()
async def complete_batch(self, prompts: list[str]):
return [f"Response to: {p[:50]}" for p in prompts]
class MockEmbedder:
async def embed(self, text: str):
import numpy as np
return np.random.randn(384)
llm = MockLLM()
embedder = MockEmbedder()
cache = HybridCache(embedder)
batcher = DynamicBatcher(llm)
@app.post("/v1/completions")
async def complete(request: CompletionRequest) -> dict:
"""Complete with latency optimizations."""
import time
start = time.time()
# Check cache
if request.use_cache:
cached, cache_type = await cache.get(request.prompt)
if cached:
return {
"response": cached,
"cached": True,
"cache_type": cache_type,
"latency_ms": (time.time() - start) * 1000
}
# Execute
response = await llm.complete(request.prompt)
# Cache result
if request.use_cache:
await cache.set(request.prompt, response.content)
return {
"response": response.content,
"cached": False,
"latency_ms": (time.time() - start) * 1000
}
@app.post("/v1/completions/stream")
async def stream_completion(request: CompletionRequest):
"""Stream completion response."""
async def generate():
# Simulate streaming
response = f"Response to: {request.prompt[:50]}"
for char in response:
yield f"data: {json.dumps({'content': char})}\n\n"
await asyncio.sleep(0.01)
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
@app.post("/v1/completions/batch")
async def batch_complete(request: BatchRequest) -> dict:
"""Batch completion."""
import time
start = time.time()
responses = await asyncio.gather(*[
batcher.complete(prompt) for prompt in request.prompts
])
return {
"responses": responses,
"count": len(responses),
"latency_ms": (time.time() - start) * 1000
}
@app.get("/v1/cache/stats")
async def cache_stats() -> dict:
"""Get cache statistics."""
return {
"exact_cache_size": len(cache.exact_cache.cache),
"semantic_cache_size": len(cache.semantic_cache.entries)
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- OpenAI Streaming: https://platform.openai.com/docs/api-reference/streaming
- GPTCache: https://github.com/zilliztech/GPTCache
- Speculative Decoding: https://arxiv.org/abs/2211.17192
- vLLM: https://github.com/vllm-project/vllm
Conclusion
LLM latency optimization requires attacking the problem from multiple angles. Start with streaming—it doesn't reduce total latency but dramatically improves perceived responsiveness by showing users progress immediately. Implement semantic caching for queries that are similar to previous ones; even a 10% cache hit rate noticeably lowers average latency and cost, since every hit skips inference entirely. Optimize your prompts to reduce input tokens—every token saved is latency saved. Use model routing to send simple queries to faster models while reserving expensive models for complex tasks. At the infrastructure level, batch requests when possible, maintain connection pools, and deduplicate concurrent identical requests. Monitor your latency distribution, not just averages—p99 latency often matters more than mean latency for user experience. Consider prewarming connections during low-traffic periods. The key insight is that latency optimization is about the entire pipeline, not just model inference. A well-optimized system with caching, routing, and streaming can feel 10x faster than a naive implementation, even with the same underlying models.
