Introduction

Batch inference optimization is critical for cost-effective LLM deployment at scale. Processing requests one at a time wastes GPU resources: each forward pass streams the full model weights through the GPU whether it is serving one sequence or thirty-two, so a lone request leaves most of that work unamortized. Batching multiple requests together spreads this fixed cost across the batch, dramatically improving throughput and reducing per-request cost. This guide covers the techniques that make batch inference efficient: dynamic batching strategies, request queuing, padding optimization, continuous batching, and async processing patterns. Understanding these patterns helps you build systems that handle high request volumes while maintaining acceptable latency. Whether you're running inference on your own GPUs or optimizing API costs, these techniques will help you serve more requests with the same resources.
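A rough back-of-the-envelope model makes the amortization argument concrete. During decoding, each step is typically dominated by streaming the model weights from GPU memory, and that cost is paid once per step regardless of how many sequences are in the batch. The sketch below is illustrative only; the weight size and bandwidth figures are assumptions, not measurements.

```python
# Illustrative roofline-style estimate: decode is memory-bandwidth bound,
# so the per-step cost of reading the weights is shared by the whole batch.
WEIGHT_BYTES = 14e9            # assumed: ~7B parameters at fp16
BANDWIDTH_BYTES_PER_S = 2e12   # assumed: ~2 TB/s HBM bandwidth

step_time_s = WEIGHT_BYTES / BANDWIDTH_BYTES_PER_S  # one decode step

for batch_size in (1, 4, 16, 64):
    tokens_per_s = batch_size / step_time_s
    print(f"batch={batch_size:>3}: ~{tokens_per_s:,.0f} tokens/s")
```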

Basic Batching
```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from datetime import datetime
import asyncio
import time


@dataclass
class InferenceRequest:
    """A single inference request."""
    request_id: str
    prompt: str
    max_tokens: int = 256
    temperature: float = 0.7
    created_at: datetime = field(default_factory=datetime.now)
    callback: Optional[Callable] = None


@dataclass
class InferenceResponse:
    """Response for an inference request."""
    request_id: str
    output: str
    tokens_generated: int
    latency_ms: float

class SimpleBatcher:
    """Simple fixed-size batching.

    Note: in this simplified example only the request that fills the batch
    (or that times out waiting) receives a response; outputs for the other
    requests in the batch are discarded. The futures-based batchers below
    fix this.
    """

    def __init__(
        self,
        model: Any,
        batch_size: int = 8,
        max_wait_ms: int = 100
    ):
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: list[InferenceRequest] = []
        self.lock = asyncio.Lock()

    async def add_request(self, request: InferenceRequest) -> Optional[InferenceResponse]:
        """Add request and wait for response."""
        async with self.lock:
            self.queue.append(request)

            # Check if batch is ready
            if len(self.queue) >= self.batch_size:
                batch = self.queue[:self.batch_size]
                self.queue = self.queue[self.batch_size:]
                return await self._process_batch(batch, request.request_id)

        # Wait for batch to fill or timeout
        await asyncio.sleep(self.max_wait_ms / 1000)

        async with self.lock:
            if request in self.queue:
                # Process partial batch
                batch = self.queue.copy()
                self.queue = []
                return await self._process_batch(batch, request.request_id)

        # Request was processed in another batch
        return None

    async def _process_batch(
        self,
        batch: list[InferenceRequest],
        target_id: str
    ) -> Optional[InferenceResponse]:
        """Process a batch of requests."""
        start = time.time()

        # Prepare batch inputs
        prompts = [r.prompt for r in batch]

        # Run inference
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000

        # Find target response
        for i, request in enumerate(batch):
            if request.request_id == target_id:
                return InferenceResponse(
                    request_id=target_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                )
        return None

class TimeBasedBatcher:
    """Batch requests based on time windows."""

    def __init__(
        self,
        model: Any,
        window_ms: int = 50,
        max_batch_size: int = 32
    ):
        self.model = model
        self.window_ms = window_ms
        self.max_batch_size = max_batch_size
        self.queue: list[InferenceRequest] = []
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()
        self.processing = False

    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request and wait for response."""
        future = asyncio.Future()

        async with self.lock:
            self.queue.append(request)
            self.futures[request.request_id] = future

            if not self.processing:
                self.processing = True
                asyncio.create_task(self._process_window())

        return await future

    async def _process_window(self):
        """Process requests accumulated during one time window."""
        # Wait for the window to fill
        await asyncio.sleep(self.window_ms / 1000)

        async with self.lock:
            if not self.queue:
                self.processing = False
                return

            # Take a batch off the queue
            batch = self.queue[:self.max_batch_size]
            self.queue = self.queue[self.max_batch_size:]

            # Continue with another window if requests remain
            if self.queue:
                asyncio.create_task(self._process_window())
            else:
                self.processing = False

        # Run inference outside the lock so new requests can keep queuing
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000

        # Resolve futures
        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future:
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))

class AdaptiveBatcher:
    """Adaptive batching based on load.

    Note: requests are only processed once the queue reaches the current
    batch size; a production version would also flush partial batches on
    a timer, as TimeBasedBatcher does.
    """

    def __init__(
        self,
        model: Any,
        min_batch_size: int = 1,
        max_batch_size: int = 32,
        target_latency_ms: int = 100
    ):
        self.model = model
        self.min_batch_size = min_batch_size
        self.max_batch_size = max_batch_size
        self.target_latency = target_latency_ms
        self.queue: list[InferenceRequest] = []
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()

        # Adaptive parameters
        self.current_batch_size = min_batch_size
        self.latency_history: list[float] = []

    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request with adaptive batching."""
        future = asyncio.Future()

        async with self.lock:
            self.queue.append(request)
            self.futures[request.request_id] = future

            if len(self.queue) >= self.current_batch_size:
                batch = self.queue[:self.current_batch_size]
                self.queue = self.queue[self.current_batch_size:]
                asyncio.create_task(self._process_batch(batch))

        return await future

    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process batch and adapt size."""
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000

        # Update latency history (keep the last 10 batches)
        self.latency_history.append(latency)
        if len(self.latency_history) > 10:
            self.latency_history = self.latency_history[-10:]

        # Adapt batch size toward the latency target
        avg_latency = sum(self.latency_history) / len(self.latency_history)
        if avg_latency < self.target_latency * 0.8:
            # Headroom available: grow the batch
            self.current_batch_size = min(
                self.current_batch_size + 1,
                self.max_batch_size
            )
        elif avg_latency > self.target_latency * 1.2:
            # Over budget: shrink the batch
            self.current_batch_size = max(
                self.current_batch_size - 1,
                self.min_batch_size
            )

        # Resolve futures
        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future:
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))
```
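To see the batchers in action, here is a minimal usage sketch. EchoModel is a made-up stub whose generate_batch simply echoes prompts after a short sleep; swap in a real inference backend.

```python
import asyncio

class EchoModel:
    """Stub backend for illustration; replace with a real model client."""
    async def generate_batch(self, prompts: list[str]) -> list[str]:
        await asyncio.sleep(0.05)  # pretend to run a forward pass
        return [f"echo: {p}" for p in prompts]

async def main():
    batcher = TimeBasedBatcher(EchoModel(), window_ms=50, max_batch_size=8)
    requests = [
        InferenceRequest(request_id=f"req-{i}", prompt=f"prompt {i}")
        for i in range(10)
    ]
    # Submit concurrently; the batcher groups them into time windows
    responses = await asyncio.gather(*(batcher.add_request(r) for r in requests))
    for resp in responses:
        print(resp.request_id, round(resp.latency_ms, 1), resp.output)

asyncio.run(main())
```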
Dynamic Batching
```python
from dataclasses import dataclass
from typing import Any
import asyncio
import heapq
import time

# InferenceRequest and InferenceResponse are defined in the Basic Batching section

@dataclass
class PrioritizedRequest:
    """Request with priority."""
    priority: int
    request: InferenceRequest

    def __lt__(self, other):
        return self.priority < other.priority


class PriorityBatcher:
    """Batch with priority queue."""

    def __init__(
        self,
        model: Any,
        batch_size: int = 16,
        max_wait_ms: int = 100
    ):
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: list[PrioritizedRequest] = []
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()

    async def add_request(
        self,
        request: InferenceRequest,
        priority: int = 5
    ) -> InferenceResponse:
        """Add request with priority (lower = higher priority)."""
        future = asyncio.Future()

        async with self.lock:
            heapq.heappush(
                self.queue,
                PrioritizedRequest(priority, request)
            )
            self.futures[request.request_id] = future

        # Schedule a flush after the wait window; each submission schedules
        # one, but _maybe_process is a no-op once the queue has been drained
        asyncio.create_task(self._maybe_process())

        return await future

    async def _maybe_process(self):
        """Process if batch is ready."""
        await asyncio.sleep(self.max_wait_ms / 1000)

        async with self.lock:
            if not self.queue:
                return

            # Take the highest priority requests
            batch = []
            while self.queue and len(batch) < self.batch_size:
                item = heapq.heappop(self.queue)
                batch.append(item.request)

        # Run inference outside the lock
        if batch:
            await self._process_batch(batch)

    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process batch."""
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000

        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future:
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))

class LengthAwareBatcher:
    """Batch requests by similar lengths.

    Note: buckets are only flushed when they hit the size or token
    threshold; a production version would also flush on a timer.
    """

    def __init__(
        self,
        model: Any,
        max_batch_tokens: int = 4096,
        max_batch_size: int = 32
    ):
        self.model = model
        self.max_batch_tokens = max_batch_tokens
        self.max_batch_size = max_batch_size

        # Buckets by length
        self.buckets: dict[int, list[InferenceRequest]] = {}
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()

    def _get_bucket(self, length: int) -> int:
        """Get bucket for length."""
        # Buckets: 0-128, 128-256, 256-512, 512-1024, 1024+
        if length < 128:
            return 128
        elif length < 256:
            return 256
        elif length < 512:
            return 512
        elif length < 1024:
            return 1024
        else:
            return 2048

    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request to appropriate bucket."""
        future = asyncio.Future()
        length = len(request.prompt.split())
        bucket = self._get_bucket(length)

        async with self.lock:
            if bucket not in self.buckets:
                self.buckets[bucket] = []
            self.buckets[bucket].append(request)
            self.futures[request.request_id] = future

            # Check if bucket is ready
            bucket_requests = self.buckets[bucket]
            total_tokens = sum(len(r.prompt.split()) for r in bucket_requests)

            if (len(bucket_requests) >= self.max_batch_size or
                    total_tokens >= self.max_batch_tokens):
                batch = bucket_requests[:self.max_batch_size]
                self.buckets[bucket] = bucket_requests[self.max_batch_size:]
                asyncio.create_task(self._process_batch(batch))

        return await future

    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process batch."""
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000

        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future:
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))

class ContinuousBatcher:
    """Continuous batching for streaming inference.

    New requests can join the batch between generation steps and finished
    sequences leave it, so the batch stays full.
    """

    def __init__(
        self,
        model: Any,
        max_batch_size: int = 32,
        max_sequence_length: int = 2048
    ):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_sequence_length = max_sequence_length
        self.active_sequences: dict[str, dict] = {}
        self.pending_requests: list[InferenceRequest] = []
        self.lock = asyncio.Lock()
        self.running = False

    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request to continuous batch."""
        future = asyncio.Future()

        async with self.lock:
            self.pending_requests.append(request)
            self.active_sequences[request.request_id] = {
                "request": request,
                "future": future,
                "tokens": [],
                "done": False
            }
            if not self.running:
                self.running = True
                asyncio.create_task(self._run_continuous())

        return await future

    async def _run_continuous(self):
        """Run continuous batching loop."""
        while True:
            async with self.lock:
                # Drain pending requests into the active batch; the sequence
                # state itself was already created in add_request
                while (self.pending_requests and
                       len(self.active_sequences) < self.max_batch_size):
                    self.pending_requests.pop(0)

                # Get sequences that still need tokens
                active = [
                    seq for seq in self.active_sequences.values()
                    if not seq["done"]
                ]

                if not active:
                    self.running = False
                    return

            # Run one step of generation for every active sequence
            prompts = [seq["request"].prompt for seq in active]
            partial_outputs = [seq["tokens"] for seq in active]

            # generate_step is assumed to return one new token per sequence
            next_tokens = await self.model.generate_step(prompts, partial_outputs)

            # Update sequences
            async with self.lock:
                for i, seq in enumerate(active):
                    token = next_tokens[i]
                    seq["tokens"].append(token)

                    # A sequence is done on an empty token or at max_tokens
                    if (token == "" or
                            len(seq["tokens"]) >= seq["request"].max_tokens):
                        seq["done"] = True

                        # Resolve the future and retire the sequence
                        output = " ".join(seq["tokens"])
                        seq["future"].set_result(InferenceResponse(
                            request_id=seq["request"].request_id,
                            output=output,
                            tokens_generated=len(seq["tokens"]),
                            latency_ms=0  # would track actual latency
                        ))
                        del self.active_sequences[seq["request"].request_id]
```
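A quick, hypothetical illustration of priority submission, reusing the EchoModel stub from the earlier sketch: interactive requests are tagged with priority 1 and background requests with priority 9, so each flush drains the interactive ones first.

```python
import asyncio

async def main():
    batcher = PriorityBatcher(EchoModel(), batch_size=4, max_wait_ms=50)

    async def submit(i: int, priority: int):
        req = InferenceRequest(request_id=f"req-{i}", prompt=f"prompt {i}")
        resp = await batcher.add_request(req, priority=priority)
        print(f"{resp.request_id} (priority {priority}): {resp.output}")

    # Interleave latency-sensitive and background requests
    await asyncio.gather(*(
        submit(i, priority=1 if i % 2 == 0 else 9) for i in range(8)
    ))

asyncio.run(main())
```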
Padding and Memory Optimization
```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple
import numpy as np

@dataclass
class PaddingConfig:
    """Configuration for padding."""
    pad_token_id: int = 0
    padding_side: str = "right"  # or "left"
    max_length: int = 2048


class PaddingOptimizer:
    """Optimize padding for batched inference."""

    def __init__(self, config: Optional[PaddingConfig] = None):
        self.config = config or PaddingConfig()

    def pad_batch(
        self,
        sequences: list[list[int]],
        max_length: Optional[int] = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Pad sequences to the same length and build an attention mask."""
        max_len = max_length or max(len(s) for s in sequences)
        max_len = min(max_len, self.config.max_length)

        batch_size = len(sequences)
        padded = np.full((batch_size, max_len), self.config.pad_token_id)
        attention_mask = np.zeros((batch_size, max_len))

        for i, seq in enumerate(sequences):
            seq_len = min(len(seq), max_len)
            if self.config.padding_side == "right":
                padded[i, :seq_len] = seq[:seq_len]
                attention_mask[i, :seq_len] = 1
            else:
                padded[i, -seq_len:] = seq[:seq_len]
                attention_mask[i, -seq_len:] = 1

        return padded, attention_mask

    def compute_padding_efficiency(
        self,
        sequences: list[list[int]]
    ) -> float:
        """Fraction of batch positions holding real (non-padding) tokens."""
        if not sequences:
            return 0.0

        total_tokens = sum(len(s) for s in sequences)
        max_len = max(len(s) for s in sequences)
        padded_tokens = len(sequences) * max_len

        return total_tokens / padded_tokens if padded_tokens > 0 else 0.0

class BucketedPadding:
    """Bucket sequences by length to minimize padding."""

    def __init__(
        self,
        bucket_boundaries: Optional[list[int]] = None,
        pad_token_id: int = 0
    ):
        self.boundaries = bucket_boundaries or [64, 128, 256, 512, 1024, 2048]
        self.pad_token_id = pad_token_id

    def get_bucket(self, length: int) -> int:
        """Get the smallest bucket that fits a sequence length."""
        for boundary in self.boundaries:
            if length <= boundary:
                return boundary
        return self.boundaries[-1]

    def bucket_sequences(
        self,
        sequences: list[list[int]]
    ) -> dict[int, list[list[int]]]:
        """Group sequences by bucket."""
        buckets: dict[int, list[list[int]]] = {}
        for seq in sequences:
            bucket = self.get_bucket(len(seq))
            if bucket not in buckets:
                buckets[bucket] = []
            buckets[bucket].append(seq)
        return buckets

    def pad_bucket(
        self,
        sequences: list[list[int]],
        bucket_size: int
    ) -> np.ndarray:
        """Pad sequences in a bucket to the bucket size."""
        batch_size = len(sequences)
        padded = np.full((batch_size, bucket_size), self.pad_token_id)
        for i, seq in enumerate(sequences):
            seq_len = min(len(seq), bucket_size)
            padded[i, :seq_len] = seq[:seq_len]
        return padded

class PackedSequences:
    """Pack multiple sequences into one row without padding."""

    def __init__(self, max_length: int = 2048):
        self.max_length = max_length

    def pack(
        self,
        sequences: list[list[int]],
        separator_token: int = -1
    ) -> Tuple[list[int], list[Tuple[int, int]]]:
        """Pack sequences into a single sequence.

        Returns the packed token list and (start, end) positions for each
        packed sequence; the end position excludes the separator token.
        """
        packed: list[int] = []
        positions: list[Tuple[int, int]] = []

        for seq in sequences:
            if len(packed) + len(seq) + 1 > self.max_length:
                break
            start = len(packed)
            packed.extend(seq)
            positions.append((start, len(packed)))
            if separator_token >= 0:
                packed.append(separator_token)

        return packed, positions

    def unpack(
        self,
        packed_output: list[int],
        positions: list[Tuple[int, int]]
    ) -> list[list[int]]:
        """Unpack outputs back to individual sequences."""
        return [packed_output[start:end] for start, end in positions]

class MemoryEfficientBatcher:
    """Size batches to fit within a fixed memory budget."""

    def __init__(
        self,
        model: Any,
        max_memory_mb: int = 8000,
        tokens_per_mb: int = 1000
    ):
        self.model = model
        self.max_memory = max_memory_mb
        self.tokens_per_mb = tokens_per_mb

    def estimate_memory(
        self,
        batch_size: int,
        sequence_length: int
    ) -> float:
        """Estimate memory usage in MB (simple tokens-per-MB model)."""
        total_tokens = batch_size * sequence_length
        return total_tokens / self.tokens_per_mb

    def find_optimal_batch_size(
        self,
        sequences: list[list[int]]
    ) -> int:
        """Find the largest batch size that fits in memory."""
        if not sequences:
            return 0

        max_len = max(len(s) for s in sequences)

        # Binary search for the largest batch size within budget
        low, high = 1, len(sequences)
        optimal = 1

        while low <= high:
            mid = (low + high) // 2
            memory = self.estimate_memory(mid, max_len)
            if memory <= self.max_memory:
                optimal = mid
                low = mid + 1
            else:
                high = mid - 1

        return optimal

    async def process_with_memory_limit(
        self,
        sequences: list[list[int]]
    ) -> list[list[int]]:
        """Process sequences in memory-sized chunks."""
        results = []
        while sequences:
            batch_size = self.find_optimal_batch_size(sequences)
            batch = sequences[:batch_size]
            sequences = sequences[batch_size:]

            # Process one memory-bounded batch
            outputs = await self.model.generate_batch(batch)
            results.extend(outputs)

        return results
```
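The payoff of bucketing is easy to quantify with compute_padding_efficiency. A minimal sketch with made-up lengths: fifteen 128-token sequences batched with one 2048-token outlier keep only about 12% of the batch positions busy, while bucketing the short sequences separately brings each bucket to 100%.

```python
# Illustrative only: the token ids are dummies, the lengths are what matter
optimizer = PaddingOptimizer()
bucketer = BucketedPadding()

short_seqs = [[1] * 128 for _ in range(15)]
long_seq = [[1] * 2048]

# Naive batching pads everything to the longest sequence (2048)
naive = optimizer.compute_padding_efficiency(short_seqs + long_seq)
print(f"naive efficiency: {naive:.2%}")  # ~12%

# Bucketed batching keeps similar lengths together
for bucket, seqs in bucketer.bucket_sequences(short_seqs + long_seq).items():
    eff = optimizer.compute_padding_efficiency(seqs)
    print(f"bucket {bucket:>4}: {len(seqs):>2} seqs, efficiency {eff:.2%}")
```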
Async Processing
```python
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional, Tuple
import asyncio
import time

# InferenceRequest and InferenceResponse are defined in the Basic Batching section

@dataclass
class AsyncBatchConfig:
    """Configuration for async batching."""
    max_concurrent_batches: int = 4
    batch_size: int = 16
    timeout_seconds: float = 30.0


class AsyncBatchProcessor:
    """Process batches asynchronously."""

    def __init__(
        self,
        model: Any,
        config: Optional[AsyncBatchConfig] = None
    ):
        self.model = model
        self.config = config or AsyncBatchConfig()
        self.semaphore = asyncio.Semaphore(self.config.max_concurrent_batches)
        self.queue: asyncio.Queue = asyncio.Queue()
        self.results: dict[str, asyncio.Future] = {}

    async def submit(self, request: InferenceRequest) -> InferenceResponse:
        """Submit request for processing."""
        future = asyncio.Future()
        self.results[request.request_id] = future
        await self.queue.put(request)

        try:
            return await asyncio.wait_for(
                future,
                timeout=self.config.timeout_seconds
            )
        except asyncio.TimeoutError:
            # Drop the stale future so it is not resolved later
            self.results.pop(request.request_id, None)
            raise TimeoutError(f"Request {request.request_id} timed out")

    async def start_workers(self, num_workers: int = 2):
        """Start batch processing workers."""
        workers = [
            asyncio.create_task(self._worker())
            for _ in range(num_workers)
        ]
        return workers

    async def _worker(self):
        """Worker that collects requests into batches and processes them."""
        while True:
            batch = []

            # Collect a batch; stop early if the queue goes quiet
            try:
                while len(batch) < self.config.batch_size:
                    request = await asyncio.wait_for(
                        self.queue.get(),
                        timeout=0.1
                    )
                    batch.append(request)
            except asyncio.TimeoutError:
                pass

            if batch:
                async with self.semaphore:
                    await self._process_batch(batch)

    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process a batch."""
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000

        for i, request in enumerate(batch):
            future = self.results.pop(request.request_id, None)
            if future and not future.done():
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))

class ParallelBatchProcessor:
    """Process batches in parallel across multiple model replicas."""

    def __init__(
        self,
        models: list[Any],
        batch_size: int = 16
    ):
        self.models = models
        self.batch_size = batch_size
        self.model_queues = [asyncio.Queue() for _ in models]
        self.current_model = 0

    async def submit(self, request: InferenceRequest) -> InferenceResponse:
        """Submit a request, distributing across replicas round-robin."""
        model_idx = self.current_model
        self.current_model = (self.current_model + 1) % len(self.models)

        future = asyncio.Future()
        await self.model_queues[model_idx].put((request, future))
        return await future

    async def start(self):
        """Start all model workers."""
        workers = []
        for i, model in enumerate(self.models):
            worker = asyncio.create_task(
                self._model_worker(i, model)
            )
            workers.append(worker)
        return workers

    async def _model_worker(self, model_idx: int, model: Any):
        """Worker loop for a single model replica."""
        queue = self.model_queues[model_idx]

        while True:
            batch = []
            futures = []

            # Collect a batch; give up quickly if the queue is idle
            while len(batch) < self.batch_size:
                try:
                    request, future = await asyncio.wait_for(
                        queue.get(),
                        timeout=0.05
                    )
                    batch.append(request)
                    futures.append(future)
                except asyncio.TimeoutError:
                    break

            if batch:
                start = time.time()
                prompts = [r.prompt for r in batch]
                outputs = await model.generate_batch(prompts)
                latency = (time.time() - start) * 1000

                for i, (request, future) in enumerate(zip(batch, futures)):
                    if not future.done():
                        future.set_result(InferenceResponse(
                            request_id=request.request_id,
                            output=outputs[i],
                            tokens_generated=len(outputs[i].split()),
                            latency_ms=latency
                        ))

class StreamingBatchProcessor:
    """Process batches with streaming output."""

    def __init__(self, model: Any, batch_size: int = 8):
        self.model = model
        self.batch_size = batch_size

    async def process_streaming(
        self,
        requests: list[InferenceRequest]
    ) -> AsyncIterator[Tuple[str, str]]:
        """Yield (request_id, token) pairs as tokens are generated."""
        # Group into batches
        for i in range(0, len(requests), self.batch_size):
            batch = requests[i:i + self.batch_size]
            prompts = [r.prompt for r in batch]

            # Stream tokens; the model yields one token per sequence per step
            async for tokens in self.model.generate_stream_batch(prompts):
                for j, token in enumerate(tokens):
                    if token:
                        yield batch[j].request_id, token
```
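A hypothetical end-to-end sketch of the worker pattern, again using the EchoModel stub: start two workers, fan in concurrent submissions, then cancel the workers on shutdown.

```python
import asyncio

async def main():
    processor = AsyncBatchProcessor(EchoModel(), AsyncBatchConfig(batch_size=4))
    workers = await processor.start_workers(num_workers=2)

    requests = [
        InferenceRequest(request_id=f"req-{i}", prompt=f"prompt {i}")
        for i in range(12)
    ]
    responses = await asyncio.gather(*(processor.submit(r) for r in requests))
    for resp in responses:
        print(resp.request_id, resp.output)

    # Workers run forever; cancel them on shutdown
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())
```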
Production Batch Service
```python
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import uuid
import time

app = FastAPI()

class BatchRequest(BaseModel):
    prompts: list[str]
    max_tokens: Optional[int] = 256
    temperature: Optional[float] = 0.7


class BatchResponse(BaseModel):
    batch_id: str
    outputs: list[str]
    total_tokens: int
    latency_ms: float
    throughput_tokens_per_sec: float


class AsyncBatchRequest(BaseModel):
    prompts: list[str]
    callback_url: Optional[str] = None


class BatchStatus(BaseModel):
    batch_id: str
    status: str  # pending, processing, completed, failed
    progress: float
    outputs: Optional[list[str]] = None


# Mock model; replace with a real inference backend
class MockModel:
    async def generate_batch(self, prompts):
        await asyncio.sleep(0.1 * len(prompts))
        return [f"Response to: {p[:30]}..." for p in prompts]


model = MockModel()
batch_jobs: dict[str, BatchStatus] = {}

@app.post("/v1/batch/sync")
async def batch_sync(request: BatchRequest) -> BatchResponse:
    """Synchronous batch inference."""
    start = time.time()

    outputs = await model.generate_batch(request.prompts)

    latency = (time.time() - start) * 1000
    total_tokens = sum(len(o.split()) for o in outputs)
    throughput = total_tokens / (latency / 1000) if latency > 0 else 0

    return BatchResponse(
        batch_id=str(uuid.uuid4()),
        outputs=outputs,
        total_tokens=total_tokens,
        latency_ms=latency,
        throughput_tokens_per_sec=throughput
    )

@app.post("/v1/batch/async")
async def batch_async(
    request: AsyncBatchRequest,
    background_tasks: BackgroundTasks
) -> dict:
    """Asynchronous batch inference."""
    batch_id = str(uuid.uuid4())

    batch_jobs[batch_id] = BatchStatus(
        batch_id=batch_id,
        status="pending",
        progress=0.0
    )

    background_tasks.add_task(
        process_batch_async,
        batch_id,
        request.prompts,
        request.callback_url
    )

    return {"batch_id": batch_id, "status": "pending"}


async def process_batch_async(
    batch_id: str,
    prompts: list[str],
    callback_url: Optional[str] = None
):
    """Process batch in background."""
    batch_jobs[batch_id].status = "processing"

    try:
        outputs = await model.generate_batch(prompts)

        batch_jobs[batch_id].status = "completed"
        batch_jobs[batch_id].progress = 1.0
        batch_jobs[batch_id].outputs = outputs

        # Call webhook if provided
        if callback_url:
            import httpx
            async with httpx.AsyncClient() as client:
                await client.post(callback_url, json={
                    "batch_id": batch_id,
                    "outputs": outputs
                })
    except Exception:
        batch_jobs[batch_id].status = "failed"

# Register the static /stats route before the parameterized route so that
# "stats" is not captured as a batch_id
@app.get("/v1/batch/stats")
async def get_stats() -> dict:
    """Get batching statistics."""
    total = len(batch_jobs)
    completed = sum(1 for j in batch_jobs.values() if j.status == "completed")
    pending = sum(1 for j in batch_jobs.values() if j.status == "pending")
    processing = sum(1 for j in batch_jobs.values() if j.status == "processing")

    return {
        "total_batches": total,
        "completed": completed,
        "pending": pending,
        "processing": processing
    }


@app.get("/v1/batch/{batch_id}")
async def get_batch_status(batch_id: str) -> BatchStatus:
    """Get batch job status."""
    if batch_id not in batch_jobs:
        raise HTTPException(status_code=404, detail="Batch not found")
    return batch_jobs[batch_id]


@app.get("/health")
async def health():
    return {"status": "healthy"}
```
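A hypothetical client for the service above; it assumes the app is running locally, for example via uvicorn with whatever module name you saved the service under, and uses httpx for both the synchronous call and the submit-then-poll flow.

```python
# Hypothetical client; assumes the service above is running on localhost:8000,
# e.g. uvicorn batch_service:app --port 8000 (module name is an assumption)
import asyncio
import httpx

async def main():
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        # Synchronous batch: blocks until all prompts are processed
        sync_resp = await client.post("/v1/batch/sync", json={
            "prompts": ["What is batching?", "Explain continuous batching."]
        })
        print(sync_resp.json()["throughput_tokens_per_sec"])

        # Asynchronous batch: submit, then poll for completion
        submit = await client.post("/v1/batch/async", json={
            "prompts": ["Summarize this document."] * 50
        })
        batch_id = submit.json()["batch_id"]

        while True:
            status = (await client.get(f"/v1/batch/{batch_id}")).json()
            if status["status"] in ("completed", "failed"):
                break
            await asyncio.sleep(0.5)
        print(status["status"], len(status.get("outputs") or []))

asyncio.run(main())
```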
References
- vLLM: https://github.com/vllm-project/vllm
- TensorRT-LLM: https://github.com/NVIDIA/TensorRT-LLM
- Text Generation Inference: https://github.com/huggingface/text-generation-inference
- Orca Paper: https://www.usenix.org/conference/osdi22/presentation/yu
Conclusion
Batch inference optimization is essential for cost-effective LLM deployment. The key insight is that GPU utilization improves dramatically with batching: a single request might use only 10% of available compute, while a batch of 16 can approach 80%+ utilization. Dynamic batching adapts to traffic patterns, collecting requests during low-traffic periods and processing immediately during high-traffic bursts. Length-aware batching groups similar-length sequences to minimize padding waste; a batch of mostly 128-token sequences padded out to a single 2048-token outlier spends close to 90% of its compute on padding. Continuous batching takes this further by allowing new requests to join mid-generation, maximizing GPU utilization for streaming workloads.

Memory optimization through bucketed padding and sequence packing reduces memory pressure, allowing larger batch sizes. Async processing with multiple workers enables parallel batch execution across multiple GPUs or model replicas.

For production systems, implement priority queues so latency-sensitive requests get processed quickly while background tasks wait for fuller batches. Monitor your padding efficiency and batch utilization metrics; they translate directly into cost savings. The goal is to maximize tokens processed per GPU-second while meeting latency SLAs for different request priorities.
