Introduction
Deploying LLMs to production is fundamentally different from deploying traditional ML models. The models are massive, inference is computationally expensive, and latency requirements are stringent. This guide covers the strategies that make LLM deployment practical: model optimization techniques like quantization and pruning, inference serving with batching and caching, containerization with GPU support, auto-scaling based on queue depth and latency, and monitoring for both performance and quality. Whether you’re deploying open-source models like Llama or fine-tuned versions of commercial models, these patterns will help you build reliable, cost-effective inference infrastructure that scales with demand.

Model Optimization
from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
import torch
@dataclass
class OptimizationConfig:
"""Configuration for model optimization."""
quantization: Optional[str] = None # "int8", "int4", "fp16"
pruning_ratio: float = 0.0
use_flash_attention: bool = True
use_kv_cache: bool = True
max_batch_size: int = 32
class ModelOptimizer(ABC):
"""Abstract model optimizer."""
@abstractmethod
def optimize(self, model: Any, config: OptimizationConfig) -> Any:
"""Optimize the model."""
pass
class QuantizationOptimizer(ModelOptimizer):
"""Quantize model weights."""
def optimize(self, model: Any, config: OptimizationConfig) -> Any:
"""Apply quantization."""
if config.quantization == "int8":
return self._quantize_int8(model)
elif config.quantization == "int4":
return self._quantize_int4(model)
elif config.quantization == "fp16":
return self._quantize_fp16(model)
return model
def _quantize_int8(self, model: Any) -> Any:
"""INT8 quantization using bitsandbytes."""
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0,
llm_int8_has_fp16_weight=False
)
return model # bitsandbytes quantizes at load time; pass quantization_config to from_pretrained (see the loading sketch below)
def _quantize_int4(self, model: Any) -> Any:
"""INT4 quantization using GPTQ or AWQ."""
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4"
)
return model # as with INT8, applied when the model is (re)loaded with this quantization_config
def _quantize_fp16(self, model: Any) -> Any:
"""FP16 conversion."""
return model.half()
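The BitsAndBytesConfig objects above only take effect when the model is loaded, so the optimizer methods are best read as documentation of the settings. A minimal loading sketch, assuming transformers, bitsandbytes, and accelerate are installed; the model id is a placeholder for any causal LM on the Hub:
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import torch

model_id = "meta-llama/Llama-2-7b-hf"  # placeholder model id

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
)

# Quantization happens here, while the weights are being loaded
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config,
    device_map="auto",
)
tokenizer = AutoTokenizer.from_pretrained(model_id)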
class AWQQuantizer:
"""Activation-aware Weight Quantization."""
def __init__(self, calibration_data: list[str]):
self.calibration_data = calibration_data
def quantize(self, model_path: str, output_path: str):
"""Quantize model using AWQ."""
from awq import AutoAWQForCausalLM
from transformers import AutoTokenizer
# Load model
model = AutoAWQForCausalLM.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
# Quantize
quant_config = {
"zero_point": True,
"q_group_size": 128,
"w_bit": 4,
"version": "GEMM"
}
model.quantize(
tokenizer,
quant_config=quant_config,
calib_data=self.calibration_data
)
# Save
model.save_quantized(output_path)
tokenizer.save_pretrained(output_path)
class GPTQQuantizer:
"""GPTQ quantization."""
def __init__(self, bits: int = 4, group_size: int = 128):
self.bits = bits
self.group_size = group_size
def quantize(self, model_path: str, output_path: str, dataset: str = "c4"):
"""Quantize using GPTQ."""
from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path)
quantize_config = BaseQuantizeConfig(
bits=self.bits,
group_size=self.group_size,
desc_act=False
)
model = AutoGPTQForCausalLM.from_pretrained(
model_path,
quantize_config=quantize_config
)
# Quantize with calibration data
model.quantize(self._get_calibration_data(tokenizer, dataset))
# Save
model.save_quantized(output_path)
tokenizer.save_pretrained(output_path)
def _get_calibration_data(self, tokenizer, dataset: str) -> list:
"""Get calibration data."""
from datasets import load_dataset
data = load_dataset(dataset, split="train[:1000]") # note: the Hub "c4" dataset requires a config, e.g. load_dataset("allenai/c4", "en", ...)
return [tokenizer(d["text"]) for d in data]
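Both quantizers run offline, before deployment, and need a GPU with enough memory to hold the unquantized model. A usage sketch with placeholder paths and calibration prompts:
calibration = [
    "Large language models generate text one token at a time.",
    "Quantization trades a little accuracy for much lower memory use.",
]

# AWQ: activation-aware, driven by representative calibration prompts
AWQQuantizer(calibration_data=calibration).quantize(
    model_path="/models/llama-7b",        # placeholder paths
    output_path="/models/llama-7b-awq",
)

# GPTQ: layer-by-layer quantization against a calibration dataset
GPTQQuantizer(bits=4, group_size=128).quantize(
    model_path="/models/llama-7b",
    output_path="/models/llama-7b-gptq",
)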
class ModelPruner:
"""Prune model weights."""
def __init__(self, pruning_ratio: float = 0.3):
self.ratio = pruning_ratio
def prune(self, model: Any) -> Any:
"""Apply magnitude pruning."""
import torch.nn.utils.prune as prune
for name, module in model.named_modules():
if isinstance(module, torch.nn.Linear):
prune.l1_unstructured(module, name='weight', amount=self.ratio)
prune.remove(module, 'weight')
return model
class KVCacheOptimizer:
"""Optimize KV cache for inference."""
def __init__(self, max_cache_size: int = 4096):
self.max_cache_size = max_cache_size
def configure(self, model: Any) -> Any:
"""Configure KV cache."""
# Enable KV cache
model.config.use_cache = True
# Set cache parameters
if hasattr(model.config, 'max_position_embeddings'):
model.config.max_position_embeddings = min(
model.config.max_position_embeddings,
self.max_cache_size
)
return model
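Tying the pieces together, one possible optimization pipeline, assuming model is a transformers causal LM already loaded in memory:
config = OptimizationConfig(quantization="fp16", pruning_ratio=0.2)

model = QuantizationOptimizer().optimize(model, config)  # fp16 cast
if config.pruning_ratio > 0:
    model = ModelPruner(pruning_ratio=config.pruning_ratio).prune(model)
if config.use_kv_cache:
    model = KVCacheOptimizer(max_cache_size=4096).configure(model)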
Inference Serving
from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator
import asyncio
from abc import ABC, abstractmethod
@dataclass
class InferenceRequest:
"""Request for inference."""
id: str
prompt: str
max_tokens: int = 256
temperature: float = 0.7
stream: bool = False
priority: int = 0
@dataclass
class InferenceResponse:
"""Response from inference."""
id: str
text: str
tokens_generated: int
latency_ms: float
finish_reason: str = "stop"
class InferenceEngine(ABC):
"""Abstract inference engine."""
@abstractmethod
async def generate(self, request: InferenceRequest) -> InferenceResponse:
"""Generate response."""
pass
@abstractmethod
async def generate_stream(self, request: InferenceRequest) -> AsyncIterator[str]:
"""Stream response."""
pass
class VLLMEngine(InferenceEngine):
"""vLLM inference engine."""
def __init__(
self,
model_path: str,
tensor_parallel_size: int = 1,
gpu_memory_utilization: float = 0.9
):
from vllm import LLM, SamplingParams
self.llm = LLM(
model=model_path,
tensor_parallel_size=tensor_parallel_size,
gpu_memory_utilization=gpu_memory_utilization,
trust_remote_code=True
)
self.SamplingParams = SamplingParams
async def generate(self, request: InferenceRequest) -> InferenceResponse:
"""Generate with vLLM."""
import time
start = time.time()
sampling_params = self.SamplingParams(
max_tokens=request.max_tokens,
temperature=request.temperature
)
outputs = self.llm.generate([request.prompt], sampling_params)
output = outputs[0]
return InferenceResponse(
id=request.id,
text=output.outputs[0].text,
tokens_generated=len(output.outputs[0].token_ids),
latency_ms=(time.time() - start) * 1000,
finish_reason=output.outputs[0].finish_reason
)
async def generate_stream(self, request: InferenceRequest) -> AsyncIterator[str]:
"""Stream with vLLM."""
# The offline vllm.LLM API has no streaming call; true token streaming needs
# vllm.AsyncLLMEngine or the OpenAI-compatible server. As a simple fallback,
# generate the full completion and yield it as a single chunk.
response = await self.generate(request)
yield response.text
class TGIEngine(InferenceEngine):
"""Text Generation Inference engine."""
def __init__(self, endpoint: str):
self.endpoint = endpoint
async def generate(self, request: InferenceRequest) -> InferenceResponse:
"""Generate with TGI."""
import aiohttp
import time
start = time.time()
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.endpoint}/generate",
json={
"inputs": request.prompt,
"parameters": {
"max_new_tokens": request.max_tokens,
"temperature": request.temperature
}
}
) as resp:
data = await resp.json()
return InferenceResponse(
id=request.id,
text=data["generated_text"],
tokens_generated=data.get("details", {}).get("generated_tokens", 0),
latency_ms=(time.time() - start) * 1000
)
async def generate_stream(self, request: InferenceRequest) -> AsyncIterator[str]:
"""Stream with TGI."""
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.endpoint}/generate_stream",
json={
"inputs": request.prompt,
"parameters": {
"max_new_tokens": request.max_tokens,
"temperature": request.temperature
}
}
) as resp:
async for line in resp.content:
if line:
data = line.decode().strip()
if data.startswith("data:"):
import json
chunk = json.loads(data[5:])
yield chunk.get("token", {}).get("text", "")
class BatchingEngine:
"""Continuous batching for inference."""
def __init__(
self,
engine: InferenceEngine,
max_batch_size: int = 32,
max_wait_ms: float = 50
):
self.engine = engine
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.queue: asyncio.Queue = asyncio.Queue()
self.results: dict[str, asyncio.Future] = {}
self._running = False
async def start(self):
"""Start batch processing."""
self._running = True
asyncio.create_task(self._process_batches())
async def stop(self):
"""Stop batch processing."""
self._running = False
async def generate(self, request: InferenceRequest) -> InferenceResponse:
"""Submit request for batched processing."""
future = asyncio.Future()
self.results[request.id] = future
await self.queue.put(request)
return await future
async def _process_batches(self):
"""Process requests in batches."""
while self._running:
batch = []
# Collect batch
try:
# Wait for first request
request = await asyncio.wait_for(
self.queue.get(),
timeout=0.1
)
batch.append(request)
# Collect more requests up to batch size or timeout
deadline = asyncio.get_event_loop().time() + self.max_wait_ms / 1000
while len(batch) < self.max_batch_size:
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
break
try:
request = await asyncio.wait_for(
self.queue.get(),
timeout=remaining
)
batch.append(request)
except asyncio.TimeoutError:
break
except asyncio.TimeoutError:
continue
# Process batch
if batch:
await self._process_batch(batch)
async def _process_batch(self, batch: list[InferenceRequest]):
"""Process a batch of requests."""
# Process in parallel (engine handles actual batching)
tasks = [self.engine.generate(req) for req in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Return results
for request, result in zip(batch, results):
future = self.results.pop(request.id, None)
if future:
if isinstance(result, Exception):
future.set_exception(result)
else:
future.set_result(result)
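A sketch of how the batching layer wraps an engine; the TGIEngine is used here because it only needs an HTTP endpoint, and the URL is a placeholder:
import asyncio
import uuid

async def main():
    engine = TGIEngine(endpoint="http://localhost:8080")  # placeholder endpoint
    batcher = BatchingEngine(engine, max_batch_size=16, max_wait_ms=25)
    await batcher.start()

    # Concurrent callers share the same batching loop
    requests = [
        InferenceRequest(id=str(uuid.uuid4()), prompt=p, max_tokens=64)
        for p in ["Explain KV caching.", "What is continuous batching?"]
    ]
    responses = await asyncio.gather(*(batcher.generate(r) for r in requests))
    for r in responses:
        print(r.id, round(r.latency_ms), r.text[:60])

    await batcher.stop()

asyncio.run(main())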
Containerization and GPU Support
# Dockerfile for LLM serving
"""
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04
# Install Python and curl (curl is used by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
python3.10 \
python3-pip \
curl \
&& rm -rf /var/lib/apt/lists/*
# Install dependencies
COPY requirements.txt /app/
RUN pip3 install --no-cache-dir -r /app/requirements.txt
# Install vLLM
RUN pip3 install vllm
# Copy application
COPY . /app/
WORKDIR /app
# Set environment variables
ENV CUDA_VISIBLE_DEVICES=0
ENV TRANSFORMERS_CACHE=/app/cache
ENV HF_HOME=/app/cache
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s \
CMD curl -f http://localhost:8000/health || exit 1
# Run server
CMD ["python3", "-m", "vllm.entrypoints.openai.api_server", \
"--model", "/app/model", \
"--host", "0.0.0.0", \
"--port", "8000"]
"""
# Kubernetes deployment
"""
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-server
spec:
replicas: 2
selector:
matchLabels:
app: llm-server
template:
metadata:
labels:
app: llm-server
spec:
containers:
- name: llm-server
image: llm-server:latest
ports:
- containerPort: 8000
resources:
limits:
nvidia.com/gpu: 1
memory: "32Gi"
cpu: "8"
requests:
nvidia.com/gpu: 1
memory: "24Gi"
cpu: "4"
env:
- name: CUDA_VISIBLE_DEVICES
value: "0"
- name: MODEL_PATH
value: "/models/llama-7b"
volumeMounts:
- name: model-storage
mountPath: /models
- name: cache-storage
mountPath: /app/cache
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 120
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 10
volumes:
- name: model-storage
persistentVolumeClaim:
claimName: model-pvc
- name: cache-storage
emptyDir: {}
nodeSelector:
nvidia.com/gpu.product: NVIDIA-A100-SXM4-40GB
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
---
apiVersion: v1
kind: Service
metadata:
name: llm-service
spec:
selector:
app: llm-server
ports:
- port: 80
targetPort: 8000
type: ClusterIP
"""
# Docker Compose for local development
"""
version: '3.8'
services:
llm-server:
build: .
ports:
- "8000:8000"
volumes:
- ./models:/app/model
- ./cache:/app/cache
environment:
- CUDA_VISIBLE_DEVICES=0
- MODEL_PATH=/app/model
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 120s
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- llm-server
"""
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class ContainerConfig:
"""Container configuration."""
image: str
gpu_count: int = 1
memory_limit: str = "32Gi"
cpu_limit: str = "8"
model_path: str = "/models"
port: int = 8000
class ContainerBuilder:
"""Build container configurations."""
def __init__(self, config: ContainerConfig):
self.config = config
def generate_dockerfile(self) -> str:
"""Generate Dockerfile."""
return f"""FROM nvidia/cuda:12.1-runtime-ubuntu22.04
RUN apt-get update && apt-get install -y python3.10 python3-pip curl
RUN pip3 install vllm transformers torch
COPY . /app/
WORKDIR /app
ENV CUDA_VISIBLE_DEVICES=0
EXPOSE {self.config.port}
HEALTHCHECK --interval=30s --timeout=10s CMD curl -f http://localhost:{self.config.port}/health || exit 1
CMD ["python3", "-m", "vllm.entrypoints.openai.api_server", "--model", "{self.config.model_path}", "--port", "{self.config.port}"]
"""
def generate_k8s_deployment(self) -> dict:
"""Generate Kubernetes deployment."""
return {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {"name": "llm-server"},
"spec": {
"replicas": 1,
"selector": {"matchLabels": {"app": "llm-server"}},
"template": {
"metadata": {"labels": {"app": "llm-server"}},
"spec": {
"containers": [{
"name": "llm-server",
"image": self.config.image,
"ports": [{"containerPort": self.config.port}],
"resources": {
"limits": {
"nvidia.com/gpu": self.config.gpu_count,
"memory": self.config.memory_limit,
"cpu": self.config.cpu_limit
}
}
}]
}
}
}
}
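Usage is straightforward; the image name and output paths below are placeholders, and the JSON manifest can be applied directly since kubectl accepts JSON as well as YAML:
import json

config = ContainerConfig(image="registry.example.com/llm-server:0.1", gpu_count=1)
builder = ContainerBuilder(config)

with open("Dockerfile", "w") as f:
    f.write(builder.generate_dockerfile())

with open("deployment.json", "w") as f:
    json.dump(builder.generate_k8s_deployment(), f, indent=2)  # kubectl apply -f deployment.json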
Auto-Scaling
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
from abc import ABC, abstractmethod
@dataclass
class ScalingMetrics:
"""Metrics for scaling decisions."""
queue_depth: int
avg_latency_ms: float
requests_per_second: float
gpu_utilization: float
memory_utilization: float
@dataclass
class ScalingConfig:
"""Auto-scaling configuration."""
min_replicas: int = 1
max_replicas: int = 10
target_queue_depth: int = 10
target_latency_ms: float = 1000
scale_up_threshold: float = 0.8
scale_down_threshold: float = 0.3
cooldown_seconds: int = 60
class AutoScaler(ABC):
"""Abstract auto-scaler."""
@abstractmethod
async def get_metrics(self) -> ScalingMetrics:
"""Get current metrics."""
pass
@abstractmethod
async def scale(self, replicas: int):
"""Scale to target replicas."""
pass
class KubernetesAutoScaler(AutoScaler):
"""Kubernetes-based auto-scaler."""
def __init__(
self,
namespace: str,
deployment: str,
config: ScalingConfig
):
self.namespace = namespace
self.deployment = deployment
self.config = config
self.last_scale_time = 0
async def get_metrics(self) -> ScalingMetrics:
"""Get metrics from Prometheus/metrics server."""
# Would query Prometheus or metrics API
return ScalingMetrics(
queue_depth=0,
avg_latency_ms=0,
requests_per_second=0,
gpu_utilization=0,
memory_utilization=0
)
async def scale(self, replicas: int):
"""Scale deployment."""
from kubernetes import client, config
config.load_incluster_config()
apps_v1 = client.AppsV1Api()
# Clamp to limits
replicas = max(self.config.min_replicas, min(replicas, self.config.max_replicas))
# Patch deployment
apps_v1.patch_namespaced_deployment_scale(
name=self.deployment,
namespace=self.namespace,
body={"spec": {"replicas": replicas}}
)
async def run(self):
"""Run auto-scaling loop."""
import time
while True:
metrics = await self.get_metrics()
current_replicas = await self._get_current_replicas()
# Calculate desired replicas
desired = self._calculate_desired_replicas(metrics, current_replicas)
# Check cooldown
if time.time() - self.last_scale_time < self.config.cooldown_seconds:
await asyncio.sleep(10)
continue
# Scale if needed
if desired != current_replicas:
await self.scale(desired)
self.last_scale_time = time.time()
await asyncio.sleep(10)
def _calculate_desired_replicas(
self,
metrics: ScalingMetrics,
current: int
) -> int:
"""Calculate desired replica count."""
# Queue-based scaling
if metrics.queue_depth > self.config.target_queue_depth * self.config.scale_up_threshold:
return min(current + 1, self.config.max_replicas)
# Latency-based scaling
if metrics.avg_latency_ms > self.config.target_latency_ms * self.config.scale_up_threshold:
return min(current + 1, self.config.max_replicas)
# Scale down if underutilized
if (metrics.queue_depth < self.config.target_queue_depth * self.config.scale_down_threshold and
metrics.avg_latency_ms < self.config.target_latency_ms * self.config.scale_down_threshold):
return max(current - 1, self.config.min_replicas)
return current
async def _get_current_replicas(self) -> int:
"""Get current replica count."""
from kubernetes import client, config
config.load_incluster_config()
apps_v1 = client.AppsV1Api()
deployment = apps_v1.read_namespaced_deployment(
name=self.deployment,
namespace=self.namespace
)
return deployment.spec.replicas
class QueueBasedScaler:
"""Scale based on request queue depth."""
def __init__(
self,
queue: asyncio.Queue,
scaler: AutoScaler,
requests_per_replica: int = 10
):
self.queue = queue
self.scaler = scaler
self.requests_per_replica = requests_per_replica
async def run(self):
"""Run queue-based scaling."""
while True:
queue_size = self.queue.qsize()
# Calculate needed replicas (round up so a partial batch still gets capacity)
needed = max(1, -(-queue_size // self.requests_per_replica))
await self.scaler.scale(needed)
await asyncio.sleep(5)
class PredictiveScaler:
"""Predictive auto-scaling based on patterns."""
def __init__(self, scaler: AutoScaler, history_hours: int = 24):
self.scaler = scaler
self.history_hours = history_hours
self.traffic_history: list[tuple[float, float]] = [] # (timestamp, rps)
def record_traffic(self, rps: float):
"""Record traffic data point."""
import time
self.traffic_history.append((time.time(), rps))
# Trim old data
cutoff = time.time() - self.history_hours * 3600
self.traffic_history = [
(t, r) for t, r in self.traffic_history if t > cutoff
]
def predict_traffic(self, minutes_ahead: int = 15) -> float:
"""Predict traffic for scaling."""
import time
from datetime import datetime
if len(self.traffic_history) < 10:
return 0
# Simple: look at same time yesterday
target_time = time.time() + minutes_ahead * 60
yesterday = target_time - 24 * 3600
# Find closest historical point
closest = min(
self.traffic_history,
key=lambda x: abs(x[0] - yesterday)
)
return closest[1]
async def run(self):
"""Run predictive scaling."""
while True:
predicted_rps = self.predict_traffic(minutes_ahead=15)
# Scale based on prediction
# Assume 10 RPS per replica
needed_replicas = max(1, int(predicted_rps / 10))
await self.scaler.scale(needed_replicas)
await asyncio.sleep(60)
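If you would rather let Kubernetes own the scaling loop, the same queue-depth signal can drive a HorizontalPodAutoscaler. This sketch assumes the llm_queue_depth gauge defined in the monitoring section below is surfaced as a custom per-pod metric via the Prometheus Adapter:
# HorizontalPodAutoscaler on queue depth (sketch)
"""
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-server
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Pods
    pods:
      metric:
        name: llm_queue_depth
      target:
        type: AverageValue
        averageValue: "10"   # target ~10 queued requests per replica
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
"""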
Monitoring and Observability
from dataclasses import dataclass, field
from typing import Any, Optional
import time
from datetime import datetime
@dataclass
class InferenceMetrics:
"""Metrics for a single inference."""
request_id: str
model: str
prompt_tokens: int
completion_tokens: int
latency_ms: float
time_to_first_token_ms: float = 0
tokens_per_second: float = 0
timestamp: datetime = field(default_factory=datetime.now)
class MetricsCollector:
"""Collect and export metrics."""
def __init__(self):
self.metrics: list[InferenceMetrics] = []
self._setup_prometheus()
def _setup_prometheus(self):
"""Setup Prometheus metrics."""
from prometheus_client import Counter, Histogram, Gauge
self.request_counter = Counter(
'llm_requests_total',
'Total LLM requests',
['model', 'status']
)
self.latency_histogram = Histogram(
'llm_request_latency_seconds',
'Request latency',
['model'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
self.tokens_histogram = Histogram(
'llm_tokens_generated',
'Tokens generated per request',
['model'],
buckets=[10, 50, 100, 200, 500, 1000, 2000]
)
self.ttft_histogram = Histogram(
'llm_time_to_first_token_seconds',
'Time to first token',
['model'],
buckets=[0.05, 0.1, 0.2, 0.5, 1.0, 2.0]
)
self.active_requests = Gauge(
'llm_active_requests',
'Currently processing requests',
['model']
)
self.queue_depth = Gauge(
'llm_queue_depth',
'Requests waiting in queue',
['model']
)
def record(self, metrics: InferenceMetrics, status: str = "success"):
"""Record inference metrics."""
self.metrics.append(metrics)
# Update Prometheus
self.request_counter.labels(model=metrics.model, status=status).inc()
self.latency_histogram.labels(model=metrics.model).observe(metrics.latency_ms / 1000)
self.tokens_histogram.labels(model=metrics.model).observe(metrics.completion_tokens)
if metrics.time_to_first_token_ms > 0:
self.ttft_histogram.labels(model=metrics.model).observe(
metrics.time_to_first_token_ms / 1000
)
def get_stats(self, window_minutes: int = 5) -> dict:
"""Get aggregated stats."""
cutoff = datetime.now().timestamp() - window_minutes * 60
recent = [m for m in self.metrics if m.timestamp.timestamp() > cutoff]
if not recent:
return {}
latencies = [m.latency_ms for m in recent]
tokens = [m.completion_tokens for m in recent]
return {
"request_count": len(recent),
"avg_latency_ms": sum(latencies) / len(latencies),
"p50_latency_ms": sorted(latencies)[len(latencies) // 2],
"p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)],
"avg_tokens": sum(tokens) / len(tokens),
"total_tokens": sum(tokens)
}
class QualityMonitor:
"""Monitor output quality."""
def __init__(self, llm_client: Any = None):
self.llm = llm_client
self.quality_scores: list[tuple[str, float]] = []
async def evaluate(self, prompt: str, response: str) -> float:
"""Evaluate response quality."""
if not self.llm:
return 0.5
eval_prompt = f"""Rate the quality of this response on a scale of 0-10.
Consider: relevance, accuracy, completeness, and clarity.
Prompt: {prompt[:200]}
Response: {response[:500]}
Score (just the number):"""
result = await self.llm.complete(eval_prompt)
try:
score = float(result.content.strip()) / 10
except ValueError:
score = 0.5
self.quality_scores.append((response[:100], score))
return score
def get_quality_stats(self) -> dict:
"""Get quality statistics."""
if not self.quality_scores:
return {}
scores = [s for _, s in self.quality_scores]
return {
"avg_quality": sum(scores) / len(scores),
"min_quality": min(scores),
"max_quality": max(scores),
"samples": len(scores)
}
class AlertManager:
"""Manage alerts for LLM service."""
def __init__(self):
self.alert_rules: list[dict] = []
self.active_alerts: list[dict] = []
def add_rule(
self,
name: str,
condition: callable,
severity: str = "warning",
message: str = ""
):
"""Add alert rule."""
self.alert_rules.append({
"name": name,
"condition": condition,
"severity": severity,
"message": message
})
def check_alerts(self, metrics: dict):
"""Check all alert rules."""
for rule in self.alert_rules:
if rule["condition"](metrics):
alert = {
"name": rule["name"],
"severity": rule["severity"],
"message": rule["message"],
"timestamp": datetime.now().isoformat()
}
# Check if already active
if not any(a["name"] == alert["name"] for a in self.active_alerts):
self.active_alerts.append(alert)
self._send_alert(alert)
else:
# Clear alert if condition no longer met
self.active_alerts = [
a for a in self.active_alerts if a["name"] != rule["name"]
]
def _send_alert(self, alert: dict):
"""Send alert notification."""
# Would integrate with PagerDuty, Slack, etc.
print(f"ALERT [{alert['severity']}]: {alert['name']} - {alert['message']}")
# Setup common alerts
def setup_alerts(alert_manager: AlertManager):
"""Setup common LLM service alerts."""
alert_manager.add_rule(
name="high_latency",
condition=lambda m: m.get("p99_latency_ms", 0) > 5000,
severity="warning",
message="P99 latency exceeds 5 seconds"
)
alert_manager.add_rule(
name="error_rate",
condition=lambda m: m.get("error_rate", 0) > 0.05,
severity="critical",
message="Error rate exceeds 5%"
)
alert_manager.add_rule(
name="queue_depth",
condition=lambda m: m.get("queue_depth", 0) > 100,
severity="warning",
message="Request queue depth exceeds 100"
)
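A small loop ties the collector and alert manager together; the 30-second interval is arbitrary:
import asyncio

async def monitor_loop(collector: MetricsCollector, alerts: AlertManager):
    """Periodically evaluate alert rules against recent stats."""
    setup_alerts(alerts)
    while True:
        stats = collector.get_stats(window_minutes=5)
        alerts.check_alerts(stats)
        await asyncio.sleep(30)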
Production Deployment Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any
import asyncio
from prometheus_client import make_asgi_app
app = FastAPI()
# Mount Prometheus metrics
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
class GenerateRequest(BaseModel):
prompt: str
max_tokens: int = 256
temperature: float = 0.7
stream: bool = False
class GenerateResponse(BaseModel):
text: str
tokens: int
latency_ms: float
# Initialize components
metrics_collector = MetricsCollector()
@app.post("/v1/generate")
async def generate(request: GenerateRequest) -> GenerateResponse:
"""Generate text."""
import time
import uuid
start = time.time()
request_id = str(uuid.uuid4())
# Would call actual inference engine
text = f"Response to: {request.prompt[:50]}..."
tokens = len(text.split())
latency = (time.time() - start) * 1000
# Record metrics
metrics_collector.record(InferenceMetrics(
request_id=request_id,
model="default",
prompt_tokens=len(request.prompt.split()),
completion_tokens=tokens,
latency_ms=latency
))
return GenerateResponse(
text=text,
tokens=tokens,
latency_ms=latency
)
@app.get("/v1/stats")
async def get_stats() -> dict:
"""Get service statistics."""
return metrics_collector.get_stats()
@app.get("/health")
async def health():
"""Health check."""
return {
"status": "healthy",
"model_loaded": True
}
@app.get("/ready")
async def ready():
"""Readiness check."""
return {"ready": True}
References
- vLLM: https://github.com/vllm-project/vllm
- Text Generation Inference: https://github.com/huggingface/text-generation-inference
- GPTQ: https://github.com/IST-DASLab/gptq
- AWQ: https://github.com/mit-han-lab/llm-awq
Conclusion
LLM deployment requires balancing latency, throughput, and cost. Start with quantization—INT4 quantization can reduce memory by 4x with minimal quality loss, making larger models practical on smaller GPUs. Use inference engines like vLLM or TGI that implement continuous batching and PagedAttention; they dramatically improve throughput compared to naive implementations. Containerize with proper GPU support and resource limits; LLMs are memory-hungry and will crash without proper constraints. Implement auto-scaling based on queue depth and latency rather than just CPU utilization; LLM workloads have different scaling characteristics than traditional services. Monitor both performance metrics (latency, throughput, GPU utilization) and quality metrics (output coherence, task success rate); degraded quality often precedes performance issues. For production, implement health checks that verify the model can actually generate responses, not just that the process is running. The key insight is that LLM deployment is infrastructure engineering—the same principles of reliability, observability, and scalability apply, but the specific techniques differ due to the unique characteristics of large model inference.