LLM Deployment Strategies: From Model Optimization to Production Scaling

Introduction: Deploying LLMs to production is fundamentally different from deploying traditional ML models. The models are massive, inference is computationally expensive, and latency requirements are stringent. This guide covers the strategies that make LLM deployment practical: model optimization techniques like quantization and pruning, inference serving with batching and caching, containerization with GPU support, auto-scaling based on queue depth and latency, and monitoring for both performance and quality. Whether you’re deploying open-source models like Llama or fine-tuned versions of commercial models, these patterns will help you build reliable, cost-effective inference infrastructure that scales with demand.

Deployment Pipeline: Model Optimization, Inference Serving, Auto-Scaling

Model Optimization

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
import torch

@dataclass
class OptimizationConfig:
    """Configuration for model optimization."""
    
    quantization: Optional[str] = None  # "int8", "int4", or "fp16"
    pruning_ratio: float = 0.0
    use_flash_attention: bool = True
    use_kv_cache: bool = True
    max_batch_size: int = 32

class ModelOptimizer(ABC):
    """Abstract model optimizer."""
    
    @abstractmethod
    def optimize(self, model: Any, config: OptimizationConfig) -> Any:
        """Optimize the model."""
        pass

class QuantizationOptimizer(ModelOptimizer):
    """Quantize model weights."""
    
    def optimize(self, model: Any, config: OptimizationConfig) -> Any:
        """Apply quantization."""
        
        if config.quantization == "int8":
            return self._quantize_int8(model)
        elif config.quantization == "int4":
            return self._quantize_int4(model)
        elif config.quantization == "fp16":
            return self._quantize_fp16(model)
        
        return model
    
    def _quantize_int8(self, model: Any) -> Any:
        """INT8 quantization using bitsandbytes."""
        
        from transformers import BitsAndBytesConfig
        
        quantization_config = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_threshold=6.0,
            llm_int8_has_fp16_weight=False
        )
        
        # bitsandbytes quantization is applied at load time, so in practice the
        # model would be reloaded with from_pretrained(..., quantization_config=...)
        return model
    
    def _quantize_int4(self, model: Any) -> Any:
        """INT4 quantization using GPTQ or AWQ."""
        
        from transformers import BitsAndBytesConfig
        
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4"
        )
        
        # As with INT8, this config takes effect when the model is reloaded
        # with from_pretrained(..., quantization_config=...)
        return model
    
    def _quantize_fp16(self, model: Any) -> Any:
        """FP16 conversion."""
        return model.half()
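
The INT8 and INT4 helpers above build a BitsAndBytesConfig but return the model unchanged, because bitsandbytes quantization is applied when the weights are loaded. A minimal sketch of that load step, assuming a Hugging Face causal LM; the model id in the commented call is a placeholder:

# Minimal sketch: apply the requested quantization at load time via transformers.
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

def load_optimized_model(model_id: str, config: OptimizationConfig):
    """Load a causal LM with the quantization requested in OptimizationConfig."""
    
    quantization_config = None
    if config.quantization == "int8":
        quantization_config = BitsAndBytesConfig(load_in_8bit=True)
    elif config.quantization == "int4":
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4"
        )
    
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        quantization_config=quantization_config,
        torch_dtype=torch.float16 if config.quantization == "fp16" else None,
        device_map="auto"  # spread layers across the available GPUs
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    return model, tokenizer

# model, tokenizer = load_optimized_model("meta-llama/Llama-2-7b-hf", OptimizationConfig(quantization="int4"))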

class AWQQuantizer:
    """Activation-aware Weight Quantization."""
    
    def __init__(self, calibration_data: list[str]):
        self.calibration_data = calibration_data
    
    def quantize(self, model_path: str, output_path: str):
        """Quantize model using AWQ."""
        
        from awq import AutoAWQForCausalLM
        from transformers import AutoTokenizer
        
        # Load model
        model = AutoAWQForCausalLM.from_pretrained(model_path)
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        
        # Quantize
        quant_config = {
            "zero_point": True,
            "q_group_size": 128,
            "w_bit": 4,
            "version": "GEMM"
        }
        
        model.quantize(
            tokenizer,
            quant_config=quant_config,
            calib_data=self.calibration_data
        )
        
        # Save
        model.save_quantized(output_path)
        tokenizer.save_pretrained(output_path)

class GPTQQuantizer:
    """GPTQ quantization."""
    
    def __init__(self, bits: int = 4, group_size: int = 128):
        self.bits = bits
        self.group_size = group_size
    
    def quantize(self, model_path: str, output_path: str, dataset: str = "c4"):
        """Quantize using GPTQ."""
        
        from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
        from transformers import AutoTokenizer
        
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        
        quantize_config = BaseQuantizeConfig(
            bits=self.bits,
            group_size=self.group_size,
            desc_act=False
        )
        
        model = AutoGPTQForCausalLM.from_pretrained(
            model_path,
            quantize_config=quantize_config
        )
        
        # Quantize with calibration data
        model.quantize(self._get_calibration_data(tokenizer, dataset))
        
        # Save
        model.save_quantized(output_path)
        tokenizer.save_pretrained(output_path)
    
    def _get_calibration_data(self, tokenizer, dataset: str) -> list:
        """Get tokenized calibration examples."""
        from datasets import load_dataset
        
        # Note: some datasets (e.g. "c4") also require a config name such as "en"
        data = load_dataset(dataset, split="train[:1000]")
        return [tokenizer(d["text"], return_tensors="pt") for d in data]

class ModelPruner:
    """Prune model weights."""
    
    def __init__(self, pruning_ratio: float = 0.3):
        self.ratio = pruning_ratio
    
    def prune(self, model: Any) -> Any:
        """Apply magnitude pruning."""
        
        import torch.nn.utils.prune as prune
        
        for name, module in model.named_modules():
            if isinstance(module, torch.nn.Linear):
                # Zero the smallest-magnitude weights, then make the mask permanent.
                # Note: unstructured sparsity only pays off with sparse kernels/storage.
                prune.l1_unstructured(module, name='weight', amount=self.ratio)
                prune.remove(module, 'weight')
        
        return model

class KVCacheOptimizer:
    """Optimize KV cache for inference."""
    
    def __init__(self, max_cache_size: int = 4096):
        self.max_cache_size = max_cache_size
    
    def configure(self, model: Any) -> Any:
        """Configure KV cache."""
        
        # Enable KV cache
        model.config.use_cache = True
        
        # Cap the context length so the KV cache cannot grow past max_cache_size tokens
        if hasattr(model.config, 'max_position_embeddings'):
            model.config.max_position_embeddings = min(
                model.config.max_position_embeddings,
                self.max_cache_size
            )
        
        return model

Inference Serving

from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator
import asyncio
from abc import ABC, abstractmethod

@dataclass
class InferenceRequest:
    """Request for inference."""
    
    id: str
    prompt: str
    max_tokens: int = 256
    temperature: float = 0.7
    stream: bool = False
    priority: int = 0

@dataclass
class InferenceResponse:
    """Response from inference."""
    
    id: str
    text: str
    tokens_generated: int
    latency_ms: float
    finish_reason: str = "stop"

class InferenceEngine(ABC):
    """Abstract inference engine."""
    
    @abstractmethod
    async def generate(self, request: InferenceRequest) -> InferenceResponse:
        """Generate response."""
        pass
    
    @abstractmethod
    async def generate_stream(self, request: InferenceRequest) -> AsyncIterator[str]:
        """Stream response."""
        pass

class VLLMEngine(InferenceEngine):
    """vLLM inference engine."""
    
    def __init__(
        self,
        model_path: str,
        tensor_parallel_size: int = 1,
        gpu_memory_utilization: float = 0.9
    ):
        from vllm import LLM, SamplingParams
        
        self.llm = LLM(
            model=model_path,
            tensor_parallel_size=tensor_parallel_size,
            gpu_memory_utilization=gpu_memory_utilization,
            trust_remote_code=True
        )
        self.SamplingParams = SamplingParams
    
    async def generate(self, request: InferenceRequest) -> InferenceResponse:
        """Generate with vLLM."""
        
        import time
        start = time.time()
        
        sampling_params = self.SamplingParams(
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )
        
        # Note: LLM.generate() is synchronous and will block the event loop; a
        # production async server should use vLLM's AsyncLLMEngine or a worker thread.
        outputs = self.llm.generate([request.prompt], sampling_params)
        
        output = outputs[0]
        
        return InferenceResponse(
            id=request.id,
            text=output.outputs[0].text,
            tokens_generated=len(output.outputs[0].token_ids),
            latency_ms=(time.time() - start) * 1000,
            finish_reason=output.outputs[0].finish_reason
        )
    
    async def generate_stream(self, request: InferenceRequest) -> AsyncIterator[str]:
        """Stream with vLLM."""
        
        sampling_params = self.SamplingParams(
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )
        
        # The offline LLM class does not expose token streaming; this assumes a
        # streaming helper. With vLLM's AsyncLLMEngine, streaming is done by
        # iterating its asynchronous generate() output instead.
        async for output in self.llm.generate_stream(request.prompt, sampling_params):
            yield output.outputs[0].text

class TGIEngine(InferenceEngine):
    """Text Generation Inference engine."""
    
    def __init__(self, endpoint: str):
        self.endpoint = endpoint
    
    async def generate(self, request: InferenceRequest) -> InferenceResponse:
        """Generate with TGI."""
        
        import aiohttp
        import time
        
        start = time.time()
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.endpoint}/generate",
                json={
                    "inputs": request.prompt,
                    "parameters": {
                        "max_new_tokens": request.max_tokens,
                        "temperature": request.temperature
                    }
                }
            ) as resp:
                data = await resp.json()
        
        return InferenceResponse(
            id=request.id,
            text=data["generated_text"],
            tokens_generated=data.get("details", {}).get("generated_tokens", 0),
            latency_ms=(time.time() - start) * 1000
        )
    
    async def generate_stream(self, request: InferenceRequest) -> AsyncIterator[str]:
        """Stream with TGI."""
        
        import aiohttp
        import json
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.endpoint}/generate_stream",
                json={
                    "inputs": request.prompt,
                    "parameters": {
                        "max_new_tokens": request.max_tokens,
                        "temperature": request.temperature
                    }
                }
            ) as resp:
                async for line in resp.content:
                    if line:
                        data = line.decode().strip()
                        if data.startswith("data:"):
                            import json
                            chunk = json.loads(data[5:])
                            yield chunk.get("token", {}).get("text", "")

class BatchingEngine:
    """Continuous batching for inference."""
    
    def __init__(
        self,
        engine: InferenceEngine,
        max_batch_size: int = 32,
        max_wait_ms: float = 50
    ):
        self.engine = engine
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        
        self.queue: asyncio.Queue = asyncio.Queue()
        self.results: dict[str, asyncio.Future] = {}
        self._running = False
        self._task: Optional[asyncio.Task] = None
    
    async def start(self):
        """Start batch processing."""
        
        self._running = True
        # Keep a reference so the background task is not garbage-collected
        self._task = asyncio.create_task(self._process_batches())
    
    async def stop(self):
        """Stop batch processing."""
        self._running = False
    
    async def generate(self, request: InferenceRequest) -> InferenceResponse:
        """Submit request for batched processing."""
        
        future = asyncio.Future()
        self.results[request.id] = future
        
        await self.queue.put(request)
        
        return await future
    
    async def _process_batches(self):
        """Process requests in batches."""
        
        while self._running:
            batch = []
            
            # Collect batch
            try:
                # Wait for first request
                request = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=0.1
                )
                batch.append(request)
                
                # Collect more requests up to batch size or timeout
                deadline = asyncio.get_running_loop().time() + self.max_wait_ms / 1000
                
                while len(batch) < self.max_batch_size:
                    remaining = deadline - asyncio.get_running_loop().time()
                    if remaining <= 0:
                        break
                    
                    try:
                        request = await asyncio.wait_for(
                            self.queue.get(),
                            timeout=remaining
                        )
                        batch.append(request)
                    except asyncio.TimeoutError:
                        break
                
            except asyncio.TimeoutError:
                continue
            
            # Process batch
            if batch:
                await self._process_batch(batch)
    
    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process a batch of requests."""
        
        # Process in parallel (engine handles actual batching)
        tasks = [self.engine.generate(req) for req in batch]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Return results
        for request, result in zip(batch, results):
            future = self.results.pop(request.id, None)
            if future:
                if isinstance(result, Exception):
                    future.set_exception(result)
                else:
                    future.set_result(result)
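
As a quick wiring sketch, the batching layer sits in front of a concrete engine and exposes the same generate() call. The TGI endpoint URL below is a placeholder:

# Wiring sketch: BatchingEngine in front of a TGI server (assumed endpoint URL).
async def serve_example():
    engine = TGIEngine(endpoint="http://localhost:8080")
    batcher = BatchingEngine(engine, max_batch_size=16, max_wait_ms=25)
    await batcher.start()
    
    requests = [
        InferenceRequest(id=f"req-{i}", prompt=f"Summarize document {i}", max_tokens=64)
        for i in range(4)
    ]
    responses = await asyncio.gather(*(batcher.generate(r) for r in requests))
    for resp in responses:
        print(resp.id, f"{resp.latency_ms:.0f}ms", resp.text[:60])
    
    await batcher.stop()

# asyncio.run(serve_example())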

Containerization and GPU Support

# Dockerfile for LLM serving
"""
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

# Install Python and curl (curl is used by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies
COPY requirements.txt /app/
RUN pip3 install --no-cache-dir -r /app/requirements.txt

# Install vLLM
RUN pip3 install vllm

# Copy application
COPY . /app/
WORKDIR /app

# Set environment variables
ENV CUDA_VISIBLE_DEVICES=0
ENV TRANSFORMERS_CACHE=/app/cache
ENV HF_HOME=/app/cache

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s \
    CMD curl -f http://localhost:8000/health || exit 1

# Run server
CMD ["python3", "-m", "vllm.entrypoints.openai.api_server", \
     "--model", "/app/model", \
     "--host", "0.0.0.0", \
     "--port", "8000"]
"""

# Kubernetes deployment
"""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-server
spec:
  replicas: 2
  selector:
    matchLabels:
      app: llm-server
  template:
    metadata:
      labels:
        app: llm-server
    spec:
      containers:
      - name: llm-server
        image: llm-server:latest
        ports:
        - containerPort: 8000
        resources:
          limits:
            nvidia.com/gpu: 1
            memory: "32Gi"
            cpu: "8"
          requests:
            nvidia.com/gpu: 1
            memory: "24Gi"
            cpu: "4"
        env:
        - name: CUDA_VISIBLE_DEVICES
          value: "0"
        - name: MODEL_PATH
          value: "/models/llama-7b"
        volumeMounts:
        - name: model-storage
          mountPath: /models
        - name: cache-storage
          mountPath: /app/cache
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 120
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 60
          periodSeconds: 10
      volumes:
      - name: model-storage
        persistentVolumeClaim:
          claimName: model-pvc
      - name: cache-storage
        emptyDir: {}
      nodeSelector:
        nvidia.com/gpu.product: NVIDIA-A100-SXM4-40GB
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
---
apiVersion: v1
kind: Service
metadata:
  name: llm-service
spec:
  selector:
    app: llm-server
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP
"""

# Docker Compose for local development
"""
version: '3.8'

services:
  llm-server:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/model
      - ./cache:/app/cache
    environment:
      - CUDA_VISIBLE_DEVICES=0
      - MODEL_PATH=/app/model
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 120s

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - llm-server
"""

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class ContainerConfig:
    """Container configuration."""
    
    image: str
    gpu_count: int = 1
    memory_limit: str = "32Gi"
    cpu_limit: str = "8"
    model_path: str = "/models"
    port: int = 8000

class ContainerBuilder:
    """Build container configurations."""
    
    def __init__(self, config: ContainerConfig):
        self.config = config
    
    def generate_dockerfile(self) -> str:
        """Generate Dockerfile."""
        
        return f"""FROM nvidia/cuda:12.1-runtime-ubuntu22.04

RUN apt-get update && apt-get install -y python3.10 python3-pip curl
RUN pip3 install vllm transformers torch

COPY . /app/
WORKDIR /app

ENV CUDA_VISIBLE_DEVICES=0
EXPOSE {self.config.port}

HEALTHCHECK --interval=30s --timeout=10s CMD curl -f http://localhost:{self.config.port}/health || exit 1

CMD ["python3", "-m", "vllm.entrypoints.openai.api_server", "--model", "{self.config.model_path}", "--port", "{self.config.port}"]
"""
    
    def generate_k8s_deployment(self) -> dict:
        """Generate Kubernetes deployment."""
        
        return {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {"name": "llm-server"},
            "spec": {
                "replicas": 1,
                "selector": {"matchLabels": {"app": "llm-server"}},
                "template": {
                    "metadata": {"labels": {"app": "llm-server"}},
                    "spec": {
                        "containers": [{
                            "name": "llm-server",
                            "image": self.config.image,
                            "ports": [{"containerPort": self.config.port}],
                            "resources": {
                                "limits": {
                                    "nvidia.com/gpu": self.config.gpu_count,
                                    "memory": self.config.memory_limit,
                                    "cpu": self.config.cpu_limit
                                }
                            }
                        }]
                    }
                }
            }
        }
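
A short usage sketch for ContainerBuilder: write the generated Dockerfile to disk and serialize the deployment manifest to YAML. This assumes PyYAML is installed; the image name is a placeholder.

# Usage sketch: materialize the generated artifacts.
import yaml

config = ContainerConfig(image="registry.example.com/llm-server:latest", gpu_count=1)
builder = ContainerBuilder(config)

with open("Dockerfile", "w") as f:
    f.write(builder.generate_dockerfile())

with open("deployment.yaml", "w") as f:
    yaml.safe_dump(builder.generate_k8s_deployment(), f, sort_keys=False)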

Auto-Scaling

from dataclasses import dataclass
from typing import Any, Optional
import asyncio
from abc import ABC, abstractmethod

@dataclass
class ScalingMetrics:
    """Metrics for scaling decisions."""
    
    queue_depth: int
    avg_latency_ms: float
    requests_per_second: float
    gpu_utilization: float
    memory_utilization: float

@dataclass
class ScalingConfig:
    """Auto-scaling configuration."""
    
    min_replicas: int = 1
    max_replicas: int = 10
    target_queue_depth: int = 10
    target_latency_ms: float = 1000
    scale_up_threshold: float = 0.8
    scale_down_threshold: float = 0.3
    cooldown_seconds: int = 60

class AutoScaler(ABC):
    """Abstract auto-scaler."""
    
    @abstractmethod
    async def get_metrics(self) -> ScalingMetrics:
        """Get current metrics."""
        pass
    
    @abstractmethod
    async def scale(self, replicas: int):
        """Scale to target replicas."""
        pass

class KubernetesAutoScaler(AutoScaler):
    """Kubernetes-based auto-scaler."""
    
    def __init__(
        self,
        namespace: str,
        deployment: str,
        config: ScalingConfig
    ):
        self.namespace = namespace
        self.deployment = deployment
        self.config = config
        self.last_scale_time = 0
    
    async def get_metrics(self) -> ScalingMetrics:
        """Get metrics from Prometheus/metrics server."""
        
        # Would query Prometheus or metrics API
        return ScalingMetrics(
            queue_depth=0,
            avg_latency_ms=0,
            requests_per_second=0,
            gpu_utilization=0,
            memory_utilization=0
        )
    
    async def scale(self, replicas: int):
        """Scale deployment."""
        
        from kubernetes import client, config
        
        config.load_incluster_config()
        apps_v1 = client.AppsV1Api()
        
        # Clamp to limits
        replicas = max(self.config.min_replicas, min(replicas, self.config.max_replicas))
        
        # Patch deployment
        apps_v1.patch_namespaced_deployment_scale(
            name=self.deployment,
            namespace=self.namespace,
            body={"spec": {"replicas": replicas}}
        )
    
    async def run(self):
        """Run auto-scaling loop."""
        
        import time
        
        while True:
            metrics = await self.get_metrics()
            current_replicas = await self._get_current_replicas()
            
            # Calculate desired replicas
            desired = self._calculate_desired_replicas(metrics, current_replicas)
            
            # Check cooldown
            if time.time() - self.last_scale_time < self.config.cooldown_seconds:
                await asyncio.sleep(10)
                continue
            
            # Scale if needed
            if desired != current_replicas:
                await self.scale(desired)
                self.last_scale_time = time.time()
            
            await asyncio.sleep(10)
    
    def _calculate_desired_replicas(
        self,
        metrics: ScalingMetrics,
        current: int
    ) -> int:
        """Calculate desired replica count."""
        
        # Queue-based scaling
        if metrics.queue_depth > self.config.target_queue_depth * self.config.scale_up_threshold:
            return min(current + 1, self.config.max_replicas)
        
        # Latency-based scaling
        if metrics.avg_latency_ms > self.config.target_latency_ms * self.config.scale_up_threshold:
            return min(current + 1, self.config.max_replicas)
        
        # Scale down if underutilized
        if (metrics.queue_depth < self.config.target_queue_depth * self.config.scale_down_threshold and
            metrics.avg_latency_ms < self.config.target_latency_ms * self.config.scale_down_threshold):
            return max(current - 1, self.config.min_replicas)
        
        return current
    
    async def _get_current_replicas(self) -> int:
        """Get current replica count."""
        
        from kubernetes import client, config
        
        config.load_incluster_config()
        apps_v1 = client.AppsV1Api()
        
        deployment = apps_v1.read_namespaced_deployment(
            name=self.deployment,
            namespace=self.namespace
        )
        
        return deployment.spec.replicas

class QueueBasedScaler:
    """Scale based on request queue depth."""
    
    def __init__(
        self,
        queue: asyncio.Queue,
        scaler: AutoScaler,
        requests_per_replica: int = 10
    ):
        self.queue = queue
        self.scaler = scaler
        self.requests_per_replica = requests_per_replica
    
    async def run(self):
        """Run queue-based scaling."""
        
        while True:
            queue_size = self.queue.qsize()
            
            # Calculate needed replicas
            needed = max(1, queue_size // self.requests_per_replica)
            
            await self.scaler.scale(needed)
            await asyncio.sleep(5)

class PredictiveScaler:
    """Predictive auto-scaling based on patterns."""
    
    def __init__(self, scaler: AutoScaler, history_hours: int = 24):
        self.scaler = scaler
        self.history_hours = history_hours
        self.traffic_history: list[tuple[float, float]] = []  # (timestamp, rps)
    
    def record_traffic(self, rps: float):
        """Record traffic data point."""
        
        import time
        self.traffic_history.append((time.time(), rps))
        
        # Trim old data
        cutoff = time.time() - self.history_hours * 3600
        self.traffic_history = [
            (t, r) for t, r in self.traffic_history if t > cutoff
        ]
    
    def predict_traffic(self, minutes_ahead: int = 15) -> float:
        """Predict traffic for scaling."""
        
        import time
        
        if len(self.traffic_history) < 10:
            return 0
        
        # Simple: look at same time yesterday
        target_time = time.time() + minutes_ahead * 60
        yesterday = target_time - 24 * 3600
        
        # Find closest historical point
        closest = min(
            self.traffic_history,
            key=lambda x: abs(x[0] - yesterday)
        )
        
        return closest[1]
    
    async def run(self):
        """Run predictive scaling."""
        
        while True:
            predicted_rps = self.predict_traffic(minutes_ahead=15)
            
            # Scale based on prediction
            # Assume 10 RPS per replica
            needed_replicas = max(1, int(predicted_rps / 10))
            
            await self.scaler.scale(needed_replicas)
            await asyncio.sleep(60)

Monitoring and Observability

from dataclasses import dataclass, field
from typing import Any, Callable, Optional
import time
from datetime import datetime

@dataclass
class InferenceMetrics:
    """Metrics for a single inference."""
    
    request_id: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float
    time_to_first_token_ms: float = 0
    tokens_per_second: float = 0
    timestamp: datetime = field(default_factory=datetime.now)

class MetricsCollector:
    """Collect and export metrics."""
    
    def __init__(self):
        self.metrics: list[InferenceMetrics] = []
        self._setup_prometheus()
    
    def _setup_prometheus(self):
        """Setup Prometheus metrics."""
        
        from prometheus_client import Counter, Histogram, Gauge
        
        self.request_counter = Counter(
            'llm_requests_total',
            'Total LLM requests',
            ['model', 'status']
        )
        
        self.latency_histogram = Histogram(
            'llm_request_latency_seconds',
            'Request latency',
            ['model'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
        )
        
        self.tokens_histogram = Histogram(
            'llm_tokens_generated',
            'Tokens generated per request',
            ['model'],
            buckets=[10, 50, 100, 200, 500, 1000, 2000]
        )
        
        self.ttft_histogram = Histogram(
            'llm_time_to_first_token_seconds',
            'Time to first token',
            ['model'],
            buckets=[0.05, 0.1, 0.2, 0.5, 1.0, 2.0]
        )
        
        self.active_requests = Gauge(
            'llm_active_requests',
            'Currently processing requests',
            ['model']
        )
        
        self.queue_depth = Gauge(
            'llm_queue_depth',
            'Requests waiting in queue',
            ['model']
        )
    
    def record(self, metrics: InferenceMetrics, status: str = "success"):
        """Record inference metrics."""
        
        self.metrics.append(metrics)
        
        # Update Prometheus
        self.request_counter.labels(model=metrics.model, status=status).inc()
        self.latency_histogram.labels(model=metrics.model).observe(metrics.latency_ms / 1000)
        self.tokens_histogram.labels(model=metrics.model).observe(metrics.completion_tokens)
        
        if metrics.time_to_first_token_ms > 0:
            self.ttft_histogram.labels(model=metrics.model).observe(
                metrics.time_to_first_token_ms / 1000
            )
    
    def get_stats(self, window_minutes: int = 5) -> dict:
        """Get aggregated stats."""
        
        cutoff = datetime.now().timestamp() - window_minutes * 60
        recent = [m for m in self.metrics if m.timestamp.timestamp() > cutoff]
        
        if not recent:
            return {}
        
        latencies = [m.latency_ms for m in recent]
        tokens = [m.completion_tokens for m in recent]
        
        return {
            "request_count": len(recent),
            "avg_latency_ms": sum(latencies) / len(latencies),
            "p50_latency_ms": sorted(latencies)[len(latencies) // 2],
            "p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)],
            "avg_tokens": sum(tokens) / len(tokens),
            "total_tokens": sum(tokens)
        }

class QualityMonitor:
    """Monitor output quality."""
    
    def __init__(self, llm_client: Any = None):
        self.llm = llm_client
        self.quality_scores: list[tuple[str, float]] = []
    
    async def evaluate(self, prompt: str, response: str) -> float:
        """Evaluate response quality."""
        
        if not self.llm:
            return 0.5
        
        eval_prompt = f"""Rate the quality of this response on a scale of 0-10.
Consider: relevance, accuracy, completeness, and clarity.

Prompt: {prompt[:200]}

Response: {response[:500]}

Score (just the number):"""
        
        result = await self.llm.complete(eval_prompt)
        
        try:
            score = float(result.content.strip()) / 10
        except ValueError:
            score = 0.5
        
        self.quality_scores.append((response[:100], score))
        return score
    
    def get_quality_stats(self) -> dict:
        """Get quality statistics."""
        
        if not self.quality_scores:
            return {}
        
        scores = [s for _, s in self.quality_scores]
        
        return {
            "avg_quality": sum(scores) / len(scores),
            "min_quality": min(scores),
            "max_quality": max(scores),
            "samples": len(scores)
        }

class AlertManager:
    """Manage alerts for LLM service."""
    
    def __init__(self):
        self.alert_rules: list[dict] = []
        self.active_alerts: list[dict] = []
    
    def add_rule(
        self,
        name: str,
        condition: Callable[[dict], bool],
        severity: str = "warning",
        message: str = ""
    ):
        """Add alert rule."""
        
        self.alert_rules.append({
            "name": name,
            "condition": condition,
            "severity": severity,
            "message": message
        })
    
    def check_alerts(self, metrics: dict):
        """Check all alert rules."""
        
        for rule in self.alert_rules:
            if rule["condition"](metrics):
                alert = {
                    "name": rule["name"],
                    "severity": rule["severity"],
                    "message": rule["message"],
                    "timestamp": datetime.now().isoformat()
                }
                
                # Check if already active
                if not any(a["name"] == alert["name"] for a in self.active_alerts):
                    self.active_alerts.append(alert)
                    self._send_alert(alert)
            else:
                # Clear alert if condition no longer met
                self.active_alerts = [
                    a for a in self.active_alerts if a["name"] != rule["name"]
                ]
    
    def _send_alert(self, alert: dict):
        """Send alert notification."""
        
        # Would integrate with PagerDuty, Slack, etc.
        print(f"ALERT [{alert['severity']}]: {alert['name']} - {alert['message']}")

# Setup common alerts
def setup_alerts(alert_manager: AlertManager):
    """Setup common LLM service alerts."""
    
    alert_manager.add_rule(
        name="high_latency",
        condition=lambda m: m.get("p99_latency_ms", 0) > 5000,
        severity="warning",
        message="P99 latency exceeds 5 seconds"
    )
    
    alert_manager.add_rule(
        name="error_rate",
        condition=lambda m: m.get("error_rate", 0) > 0.05,
        severity="critical",
        message="Error rate exceeds 5%"
    )
    
    alert_manager.add_rule(
        name="queue_depth",
        condition=lambda m: m.get("queue_depth", 0) > 100,
        severity="warning",
        message="Request queue depth exceeds 100"
    )
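
To tie the monitoring pieces together, the aggregated stats from MetricsCollector can be fed into AlertManager on a timer. A minimal sketch; the 30-second interval is an arbitrary choice:

# Wiring sketch: periodically evaluate alert rules against recent stats.
import asyncio

async def alert_loop(collector: MetricsCollector, alert_manager: AlertManager):
    while True:
        stats = collector.get_stats(window_minutes=5)
        if stats:
            alert_manager.check_alerts(stats)
        await asyncio.sleep(30)  # arbitrary evaluation interval

# collector = MetricsCollector()
# alert_manager = AlertManager()
# setup_alerts(alert_manager)
# asyncio.create_task(alert_loop(collector, alert_manager))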

Production Deployment Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any
import asyncio
from prometheus_client import make_asgi_app

app = FastAPI()

# Mount Prometheus metrics
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

class GenerateRequest(BaseModel):
    prompt: str
    max_tokens: int = 256
    temperature: float = 0.7
    stream: bool = False

class GenerateResponse(BaseModel):
    text: str
    tokens: int
    latency_ms: float

# Initialize components
metrics_collector = MetricsCollector()

@app.post("/v1/generate")
async def generate(request: GenerateRequest) -> GenerateResponse:
    """Generate text."""
    
    import time
    import uuid
    
    start = time.time()
    request_id = str(uuid.uuid4())
    
    # Would call actual inference engine
    text = f"Response to: {request.prompt[:50]}..."
    tokens = len(text.split())
    
    latency = (time.time() - start) * 1000
    
    # Record metrics
    metrics_collector.record(InferenceMetrics(
        request_id=request_id,
        model="default",
        prompt_tokens=len(request.prompt.split()),
        completion_tokens=tokens,
        latency_ms=latency
    ))
    
    return GenerateResponse(
        text=text,
        tokens=tokens,
        latency_ms=latency
    )

@app.get("/v1/stats")
async def get_stats() -> dict:
    """Get service statistics."""
    
    return metrics_collector.get_stats()

@app.get("/health")
async def health():
    """Health check."""
    
    return {
        "status": "healthy",
        "model_loaded": True
    }

@app.get("/ready")
async def ready():
    """Readiness check."""
    
    return {"ready": True}

Conclusion

LLM deployment requires balancing latency, throughput, and cost. Start with quantization—INT4 quantization can reduce memory by 4x with minimal quality loss, making larger models practical on smaller GPUs. Use inference engines like vLLM or TGI that implement continuous batching and PagedAttention; they dramatically improve throughput compared to naive implementations. Containerize with proper GPU support and resource limits; LLMs are memory-hungry and will crash without proper constraints. Implement auto-scaling based on queue depth and latency rather than just CPU utilization; LLM workloads have different scaling characteristics than traditional services. Monitor both performance metrics (latency, throughput, GPU utilization) and quality metrics (output coherence, task success rate); degraded quality often precedes performance issues. For production, implement health checks that verify the model can actually generate responses, not just that the process is running. The key insight is that LLM deployment is infrastructure engineering—the same principles of reliability, observability, and scalability apply, but the specific techniques differ due to the unique characteristics of large model inference.

