Batch Inference Optimization: Maximizing Throughput and Minimizing Costs

Introduction: Batch inference optimization is critical for cost-effective LLM deployment at scale. Processing requests one at a time wastes GPU resources: every forward pass reads the model weights from GPU memory, yet produces output for only a single sequence. Batching multiple requests together amortizes that cost across the whole batch, raising throughput and lowering per-request cost. This guide covers the techniques that make batch inference efficient: dynamic batching strategies, request queuing, padding optimization, continuous batching, and async processing patterns. Understanding these patterns helps you build systems that handle high request volumes while maintaining acceptable latency. Whether you’re running inference on your own GPUs or optimizing API costs, these techniques will help you serve more requests with the same resources.
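
As a rough illustration of the amortization argument, the sketch below uses a toy cost model in which each forward pass pays a fixed cost (reading the weights) plus a small per-sequence cost. The function names and the numbers `fixed_ms=20.0` and `per_seq_ms=1.0` are made-up assumptions for illustration, not measurements; real GPUs behave less linearly, especially once compute becomes the bottleneck.

```python
def step_time_ms(batch_size: int, fixed_ms: float = 20.0, per_seq_ms: float = 1.0) -> float:
    """Toy model: one forward pass = fixed weight-read cost + per-sequence cost."""
    return fixed_ms + per_seq_ms * batch_size


def per_request_ms(batch_size: int) -> float:
    """Time attributed to each request when the pass is shared by the batch."""
    return step_time_ms(batch_size) / batch_size


if __name__ == "__main__":
    for bs in (1, 4, 16, 32):
        print(f"batch={bs:>2}  step={step_time_ms(bs):5.1f} ms  "
              f"per-request={per_request_ms(bs):5.2f} ms")
```

Under these assumed numbers, the step time grows from 21 ms to 36 ms at batch size 16 while the cost per request falls from 21 ms to 2.25 ms. That is the throughput versus latency trade-off the batchers in this guide try to tune.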

[Figure: Batch Processing: Request Queue, Dynamic Batching, Batch Inference]

Basic Batching

from dataclasses import dataclass, field
from typing import Any, Optional, List, Callable
from datetime import datetime
import asyncio
import time

@dataclass
class InferenceRequest:
    """A single inference request."""
    
    request_id: str
    prompt: str
    max_tokens: int = 256
    temperature: float = 0.7
    created_at: datetime = field(default_factory=datetime.now)
    callback: Optional[Callable] = None

@dataclass
class InferenceResponse:
    """Response for an inference request."""
    
    request_id: str
    output: str
    tokens_generated: int
    latency_ms: float

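class SimpleBatcher:
    """Simple fixed-size batching with a timeout for partial batches."""
    
    def __init__(
        self,
        model: Any,
        batch_size: int = 8,
        max_wait_ms: int = 100
    ):
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        
        self.queue: list[InferenceRequest] = []
        # One future per queued request, so every request in a batch gets
        # its own response, not only the request that triggered the batch
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()
    
    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request and wait for response."""
        
        future = asyncio.get_running_loop().create_future()
        batch = None
        
        async with self.lock:
            self.queue.append(request)
            self.futures[request.request_id] = future
            
            # Check if batch is ready
            if len(self.queue) >= self.batch_size:
                batch = self.queue[:self.batch_size]
                self.queue = self.queue[self.batch_size:]
        
        if batch is None:
            # Wait for another caller to fill the batch, or time out
            try:
                return await asyncio.wait_for(
                    asyncio.shield(future),
                    timeout=self.max_wait_ms / 1000
                )
            except asyncio.TimeoutError:
                pass
            
            async with self.lock:
                if request in self.queue:
                    # Process partial batch
                    batch = self.queue.copy()
                    self.queue = []
        
        if batch is not None:
            # Run inference outside the lock so new requests can still queue
            await self._process_batch(batch)
        
        # Resolved by this call or by whichever call processed our batch
        return await future
    
    async def _process_batch(self, batch: list[InferenceRequest]) -> None:
        """Process a batch and resolve the future of every request in it."""
        
        start = time.time()
        
        # Prepare batch inputs
        prompts = [r.prompt for r in batch]
        
        # Run inference; on failure, propagate the error to every waiter
        try:
            outputs = await self.model.generate_batch(prompts)
        except Exception as exc:
            for request in batch:
                future = self.futures.pop(request.request_id, None)
                if future and not future.done():
                    future.set_exception(exc)
            return
        
        latency = (time.time() - start) * 1000
        
        # Resolve every waiting request, not just the caller's
        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future and not future.done():
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))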

class TimeBasedBatcher:
    """Batch requests based on time windows."""
    
    def __init__(
        self,
        model: Any,
        window_ms: int = 50,
        max_batch_size: int = 32
    ):
        self.model = model
        self.window_ms = window_ms
        self.max_batch_size = max_batch_size
        
        self.queue: list[InferenceRequest] = []
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()
        self.processing = False
    
    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request and wait for response."""
        
        future = asyncio.Future()
        
        async with self.lock:
            self.queue.append(request)
            self.futures[request.request_id] = future
            
            if not self.processing:
                self.processing = True
                asyncio.create_task(self._process_window())
        
        return await future
    
    async def _process_window(self):
        """Process requests in time window."""
        
        # Wait for window
        await asyncio.sleep(self.window_ms / 1000)
        
        async with self.lock:
            if not self.queue:
                self.processing = False
                return
            
            # Take batch
            batch = self.queue[:self.max_batch_size]
            self.queue = self.queue[self.max_batch_size:]
            
            # Continue processing if more requests
            if self.queue:
                asyncio.create_task(self._process_window())
            else:
                self.processing = False
        
        # Process batch
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000
        
        # Resolve futures
        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future:
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))

class AdaptiveBatcher:
    """Adaptive batching based on load."""
    
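    def __init__(
        self,
        model: Any,
        min_batch_size: int = 1,
        max_batch_size: int = 32,
        target_latency_ms: int = 100,
        max_wait_ms: int = 50
    ):
        self.model = model
        self.min_batch_size = min_batch_size
        self.max_batch_size = max_batch_size
        self.target_latency = target_latency_ms
        self.max_wait_ms = max_wait_ms
        
        self.queue: list[InferenceRequest] = []
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()
        
        # Adaptive parameters
        self.current_batch_size = min_batch_size
        self.latency_history: list[float] = []
    
    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request with adaptive batching."""
        
        future = asyncio.Future()
        
        async with self.lock:
            self.queue.append(request)
            self.futures[request.request_id] = future
            
            if len(self.queue) >= self.current_batch_size:
                batch = self.queue[:self.current_batch_size]
                self.queue = self.queue[self.current_batch_size:]
                asyncio.create_task(self._process_batch(batch))
            elif len(self.queue) == 1:
                # First request of a new partial batch: schedule a flush so
                # it is not stranded if the batch never fills
                asyncio.create_task(self._flush_after_wait())
        
        return await future
    
    async def _flush_after_wait(self):
        """Process whatever is queued once max_wait_ms has passed."""
        
        await asyncio.sleep(self.max_wait_ms / 1000)
        
        async with self.lock:
            batch, self.queue = self.queue, []
        
        if batch:
            await self._process_batch(batch)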
    
    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process batch and adapt size."""
        
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000
        
        # Update latency history
        self.latency_history.append(latency)
        if len(self.latency_history) > 10:
            self.latency_history = self.latency_history[-10:]
        
        # Adapt batch size
        avg_latency = sum(self.latency_history) / len(self.latency_history)
        
        if avg_latency < self.target_latency * 0.8:
            # Can increase batch size
            self.current_batch_size = min(
                self.current_batch_size + 1,
                self.max_batch_size
            )
        elif avg_latency > self.target_latency * 1.2:
            # Need to decrease batch size
            self.current_batch_size = max(
                self.current_batch_size - 1,
                self.min_batch_size
            )
        
        # Resolve futures
        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future:
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))
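
The adaptation rule in `AdaptiveBatcher` can be isolated as a pure function, which makes it easier to reason about and test. This is a sketch of the same one-step rule with the 0.8 and 1.2 thresholds used above; the function name `next_batch_size` is introduced here for illustration only.

```python
def next_batch_size(
    current: int,
    avg_latency_ms: float,
    target_latency_ms: float,
    min_size: int = 1,
    max_size: int = 32,
) -> int:
    """Grow by one when comfortably under target, shrink by one when over."""
    if avg_latency_ms < target_latency_ms * 0.8:
        return min(current + 1, max_size)
    if avg_latency_ms > target_latency_ms * 1.2:
        return max(current - 1, min_size)
    return current  # inside the dead band: leave the size alone


if __name__ == "__main__":
    size = 4
    for observed in (50.0, 60.0, 95.0, 130.0, 150.0):
        size = next_batch_size(size, observed, target_latency_ms=100.0)
        print(f"avg latency {observed:5.1f} ms -> batch size {size}")
```

The dead band between 80% and 120% of the target keeps the batch size from oscillating on every small latency fluctuation.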

Dynamic Batching

from dataclasses import dataclass, field
from typing import Any, Optional, List
import asyncio
import heapq
import time

# InferenceRequest and InferenceResponse are defined in the Basic Batching section

@dataclass
class PrioritizedRequest:
    """Request with priority."""
    
    priority: int
    request: InferenceRequest
    
    def __lt__(self, other):
        return self.priority < other.priority

class PriorityBatcher:
    """Batch with priority queue."""
    
    def __init__(
        self,
        model: Any,
        batch_size: int = 16,
        max_wait_ms: int = 100
    ):
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        
        self.queue: list[PrioritizedRequest] = []
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()
    
    async def add_request(
        self,
        request: InferenceRequest,
        priority: int = 5
    ) -> InferenceResponse:
        """Add request with priority (lower = higher priority)."""
        
        future = asyncio.Future()
        
        async with self.lock:
            heapq.heappush(
                self.queue,
                PrioritizedRequest(priority, request)
            )
            self.futures[request.request_id] = future
        
        # Start processing if needed
        asyncio.create_task(self._maybe_process())
        
        return await future
    
    async def _maybe_process(self):
        """Process if batch is ready."""
        
        await asyncio.sleep(self.max_wait_ms / 1000)
        
        async with self.lock:
            if not self.queue:
                return
            
            # Take highest priority requests
            batch = []
            while self.queue and len(batch) < self.batch_size:
                item = heapq.heappop(self.queue)
                batch.append(item.request)
        
        if batch:
            await self._process_batch(batch)
    
    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process batch."""
        
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000
        
        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future:
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))

class LengthAwareBatcher:
    """Batch requests by similar lengths."""
    
    def __init__(
        self,
        model: Any,
        max_batch_tokens: int = 4096,
        max_batch_size: int = 32,
        max_wait_ms: int = 100
    ):
        self.model = model
        self.max_batch_tokens = max_batch_tokens
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        
        # Buckets by length
        self.buckets: dict[int, list[InferenceRequest]] = {}
        self.futures: dict[str, asyncio.Future] = {}
        self.lock = asyncio.Lock()
    
    def _get_bucket(self, length: int) -> int:
        """Get bucket for length (returns the bucket's upper bound)."""
        
        # Buckets: 0-127, 128-255, 256-511, 512-1023, 1024+
        if length < 128:
            return 128
        elif length < 256:
            return 256
        elif length < 512:
            return 512
        elif length < 1024:
            return 1024
        else:
            return 2048
    
    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request to appropriate bucket."""
        
        future = asyncio.Future()
        # Whitespace word count is a cheap stand-in for the tokenizer's count
        length = len(request.prompt.split())
        bucket = self._get_bucket(length)
        
        async with self.lock:
            bucket_requests = self.buckets.setdefault(bucket, [])
            bucket_requests.append(request)
            self.futures[request.request_id] = future
            
            # Check if bucket is ready
            total_tokens = sum(len(r.prompt.split()) for r in bucket_requests)
            
            if (len(bucket_requests) >= self.max_batch_size or
                total_tokens >= self.max_batch_tokens):
                batch = bucket_requests[:self.max_batch_size]
                self.buckets[bucket] = bucket_requests[self.max_batch_size:]
                asyncio.create_task(self._process_batch(batch))
            elif len(bucket_requests) == 1:
                # First request in an empty bucket: schedule a flush so a
                # bucket that never fills does not strand its requests
                asyncio.create_task(self._flush_bucket(bucket))
        
        return await future
    
    async def _flush_bucket(self, bucket: int):
        """Process a partial bucket once max_wait_ms has passed."""
        
        await asyncio.sleep(self.max_wait_ms / 1000)
        
        async with self.lock:
            batch = self.buckets.pop(bucket, [])
        
        if batch:
            await self._process_batch(batch)
    
    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process batch."""
        
        start = time.time()
        prompts = [r.prompt for r in batch]
        outputs = await self.model.generate_batch(prompts)
        latency = (time.time() - start) * 1000
        
        for i, request in enumerate(batch):
            future = self.futures.pop(request.request_id, None)
            if future:
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))

class ContinuousBatcher:
    """Continuous batching for streaming inference."""
    
    def __init__(
        self,
        model: Any,
        max_batch_size: int = 32,
        max_sequence_length: int = 2048
    ):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_sequence_length = max_sequence_length
        
        # Sequences currently occupying a batch slot, keyed by request_id
        self.active_sequences: dict[str, dict] = {}
        # (request, future, submit time) tuples waiting for a free slot
        self.pending_requests: list[tuple[InferenceRequest, asyncio.Future, float]] = []
        self.lock = asyncio.Lock()
        self.running = False
    
    async def add_request(self, request: InferenceRequest) -> InferenceResponse:
        """Add request to continuous batch."""
        
        future = asyncio.Future()
        
        async with self.lock:
            self.pending_requests.append((request, future, time.time()))
            
            if not self.running:
                self.running = True
                asyncio.create_task(self._run_continuous())
        
        return await future
    
    async def _run_continuous(self):
        """Run continuous batching loop."""
        
        while True:
            async with self.lock:
                # Admit pending requests into free slots, up to max_batch_size
                while (self.pending_requests and 
                       len(self.active_sequences) < self.max_batch_size):
                    request, future, submitted = self.pending_requests.pop(0)
                    self.active_sequences[request.request_id] = {
                        "request": request,
                        "future": future,
                        "tokens": [],
                        "submitted": submitted
                    }
                
                # Get active sequences
                active = list(self.active_sequences.values())
                
                if not active:
                    self.running = False
                    return
            
            # Run one step of generation
            prompts = [seq["request"].prompt for seq in active]
            partial_outputs = [seq["tokens"] for seq in active]
            
            # Generate next tokens (generate_step is assumed to return one
            # token string per sequence, with "" marking end of sequence)
            next_tokens = await self.model.generate_step(prompts, partial_outputs)
            
            # Update sequences
            async with self.lock:
                for i, seq in enumerate(active):
                    token = next_tokens[i]
                    if token != "":
                        seq["tokens"].append(token)
                    
                    # Check if done
                    if (token == "" or 
                        len(seq["tokens"]) >= seq["request"].max_tokens):
                        # Resolve future; finished sequences free their slot
                        output = " ".join(seq["tokens"])
                        seq["future"].set_result(InferenceResponse(
                            request_id=seq["request"].request_id,
                            output=output,
                            tokens_generated=len(seq["tokens"]),
                            latency_ms=(time.time() - seq["submitted"]) * 1000
                        ))
                        
                        del self.active_sequences[seq["request"].request_id]
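
To see why refilling slots as soon as a sequence finishes helps, the toy simulation below counts decode steps for static batching versus continuous batching. It is a sketch under simplifying assumptions: every step produces one token per active sequence, prefill cost and memory limits are ignored, and output lengths are known up front. The function names are hypothetical.

```python
from collections import deque


def static_batch_steps(output_lengths: list[int], batch_size: int) -> int:
    """Decode steps when each batch runs until its longest sequence finishes."""
    steps = 0
    for i in range(0, len(output_lengths), batch_size):
        steps += max(output_lengths[i:i + batch_size])
    return steps


def continuous_batch_steps(output_lengths: list[int], batch_size: int) -> int:
    """Decode steps when a finished sequence's slot is refilled immediately."""
    waiting = deque(output_lengths)
    active: list[int] = []  # remaining tokens for each running sequence
    steps = 0
    while waiting or active:
        # Admit waiting requests into any free slots
        while waiting and len(active) < batch_size:
            active.append(waiting.popleft())
        steps += 1  # one decode step yields one token per active sequence
        active = [r - 1 for r in active if r > 1]
    return steps


if __name__ == "__main__":
    lengths = [100, 10, 10, 10, 100, 10, 10, 10]
    print("static:    ", static_batch_steps(lengths, batch_size=4))
    print("continuous:", continuous_batch_steps(lengths, batch_size=4))
```

With this workload the static schedule needs 200 steps because each batch waits for its 100-token sequence, while the continuous schedule needs 110 because short requests leave early and the second long request starts as soon as a slot frees up.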

Padding and Memory Optimization

from dataclasses import dataclass
from typing import Any, List, Tuple
import numpy as np

@dataclass
class PaddingConfig:
    """Configuration for padding."""
    
    pad_token_id: int = 0
    padding_side: str = "right"  # or "left"
    max_length: int = 2048

class PaddingOptimizer:
    """Optimize padding for batched inference."""
    
    def __init__(self, config: PaddingConfig = None):
        self.config = config or PaddingConfig()
    
    def pad_batch(
        self,
        sequences: list[list[int]],
        max_length: int = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Pad sequences to same length."""
        
        max_len = max_length or max(len(s) for s in sequences)
        max_len = min(max_len, self.config.max_length)
        
        batch_size = len(sequences)
        padded = np.full((batch_size, max_len), self.config.pad_token_id, dtype=np.int64)
        attention_mask = np.zeros((batch_size, max_len), dtype=np.int64)
        
        for i, seq in enumerate(sequences):
            seq_len = min(len(seq), max_len)
            
            if self.config.padding_side == "right":
                padded[i, :seq_len] = seq[:seq_len]
                attention_mask[i, :seq_len] = 1
            else:
                # Index from max_len - seq_len rather than -seq_len: a -0
                # slice would select the whole row for an empty sequence
                padded[i, max_len - seq_len:] = seq[:seq_len]
                attention_mask[i, max_len - seq_len:] = 1
        
        return padded, attention_mask
    
    def compute_padding_efficiency(
        self,
        sequences: list[list[int]]
    ) -> float:
        """Compute padding efficiency."""
        
        if not sequences:
            return 0.0
        
        total_tokens = sum(len(s) for s in sequences)
        max_len = max(len(s) for s in sequences)
        padded_tokens = len(sequences) * max_len
        
        return total_tokens / padded_tokens if padded_tokens > 0 else 0.0

class BucketedPadding:
    """Bucket sequences by length to minimize padding."""
    
    def __init__(
        self,
        bucket_boundaries: list[int] = None,
        pad_token_id: int = 0
    ):
        self.boundaries = bucket_boundaries or [64, 128, 256, 512, 1024, 2048]
        self.pad_token_id = pad_token_id
    
    def get_bucket(self, length: int) -> int:
        """Get bucket for sequence length."""
        
        for boundary in self.boundaries:
            if length <= boundary:
                return boundary
        
        return self.boundaries[-1]
    
    def bucket_sequences(
        self,
        sequences: list[list[int]]
    ) -> dict[int, list[list[int]]]:
        """Group sequences by bucket."""
        
        buckets = {}
        
        for seq in sequences:
            bucket = self.get_bucket(len(seq))
            
            if bucket not in buckets:
                buckets[bucket] = []
            
            buckets[bucket].append(seq)
        
        return buckets
    
    def pad_bucket(
        self,
        sequences: list[list[int]],
        bucket_size: int
    ) -> np.ndarray:
        """Pad sequences in a bucket."""
        
        batch_size = len(sequences)
        padded = np.full((batch_size, bucket_size), self.pad_token_id)
        
        for i, seq in enumerate(sequences):
            seq_len = min(len(seq), bucket_size)
            padded[i, :seq_len] = seq[:seq_len]
        
        return padded

class PackedSequences:
    """Pack multiple sequences without padding."""
    
    def __init__(self, max_length: int = 2048):
        self.max_length = max_length
    
    def pack(
        self,
        sequences: list[list[int]],
        separator_token: int = -1
    ) -> Tuple[list[int], list[Tuple[int, int]]]:
        """Pack sequences into single sequence."""
        
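        packed = []
        positions = []  # (start, end) for each sequence, separator excluded
        use_separator = separator_token >= 0
        
        for seq in sequences:
            needed = len(seq) + (1 if use_separator else 0)
            if len(packed) + needed > self.max_length:
                # Out of room: sequences from here on are not packed.
                # len(positions) tells the caller how many were consumed.
                break
            
            start = len(packed)
            packed.extend(seq)
            positions.append((start, len(packed)))
            
            if use_separator:
                packed.append(separator_token)
        
        return packed, positions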
    
    def unpack(
        self,
        packed_output: list[int],
        positions: list[Tuple[int, int]]
    ) -> list[list[int]]:
        """Unpack outputs back to individual sequences."""
        
        outputs = []
        
        for start, end in positions:
            outputs.append(packed_output[start:end])
        
        return outputs

class MemoryEfficientBatcher:
    """Memory-aware batching that sizes each batch to fit a memory budget."""
    
    def __init__(
        self,
        model: Any,
        max_memory_mb: int = 8000,
        tokens_per_mb: int = 1000
    ):
        self.model = model
        self.max_memory = max_memory_mb
        self.tokens_per_mb = tokens_per_mb
    
    def estimate_memory(
        self,
        batch_size: int,
        sequence_length: int
    ) -> float:
        """Estimate memory usage in MB."""
        
        total_tokens = batch_size * sequence_length
        return total_tokens / self.tokens_per_mb
    
    def find_optimal_batch_size(
        self,
        sequences: list[list[int]]
    ) -> int:
        """Find optimal batch size for memory."""
        
        if not sequences:
            return 0
        
        max_len = max(len(s) for s in sequences)
        
        # Binary search for optimal batch size
        low, high = 1, len(sequences)
        optimal = 1
        
        while low <= high:
            mid = (low + high) // 2
            memory = self.estimate_memory(mid, max_len)
            
            if memory <= self.max_memory:
                optimal = mid
                low = mid + 1
            else:
                high = mid - 1
        
        return optimal
    
    async def process_with_memory_limit(
        self,
        sequences: list[list[int]]
    ) -> list[list[int]]:
        """Process sequences within memory limit."""
        
        results = []
        
        while sequences:
            batch_size = self.find_optimal_batch_size(sequences)
            batch = sequences[:batch_size]
            sequences = sequences[batch_size:]
            
            # Process batch
            outputs = await self.model.generate_batch(batch)
            results.extend(outputs)
        
        return results
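
The `tokens_per_mb` heuristic above has to come from somewhere. For decoder-only transformers, a large share of per-token memory during generation is typically the key/value cache, which can be estimated from the model shape. The sketch below assumes a standard attention layout and treats 1 MB as 1024 × 1024 bytes; the helper names and the example dimensions (32 layers, 32 KV heads, head dimension 128, 2-byte values) are illustrative assumptions, not values from this article.

```python
def kv_cache_bytes_per_token(
    num_layers: int,
    num_kv_heads: int,
    head_dim: int,
    bytes_per_value: int = 2,  # 2 bytes for fp16/bf16
) -> int:
    """Bytes of KV cache per token: one key and one value vector per layer."""
    return 2 * num_layers * num_kv_heads * head_dim * bytes_per_value


def max_cached_tokens(budget_mb: float, bytes_per_token: int) -> int:
    """How many tokens (summed over the batch) fit in the cache budget."""
    return int(budget_mb * 1024 * 1024) // bytes_per_token


if __name__ == "__main__":
    per_token = kv_cache_bytes_per_token(num_layers=32, num_kv_heads=32, head_dim=128)
    print(per_token)                           # 524288 bytes per token
    print(max_cached_tokens(8000, per_token))  # 16000 tokens in an 8000 MB budget
```

Under these assumed dimensions the cache holds about 2 tokens per MB, far below the default `tokens_per_mb=1000`, so it is worth deriving the figure for your own model rather than relying on a guess.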

Async Processing

from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional, List, Callable, Tuple
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

@dataclass
class AsyncBatchConfig:
    """Configuration for async batching."""
    
    max_concurrent_batches: int = 4
    batch_size: int = 16
    timeout_seconds: float = 30.0

class AsyncBatchProcessor:
    """Process batches asynchronously."""
    
    def __init__(
        self,
        model: Any,
        config: AsyncBatchConfig = None
    ):
        self.model = model
        self.config = config or AsyncBatchConfig()
        
        self.semaphore = asyncio.Semaphore(self.config.max_concurrent_batches)
        self.queue: asyncio.Queue = asyncio.Queue()
        self.results: dict[str, asyncio.Future] = {}
    
    async def submit(self, request: InferenceRequest) -> InferenceResponse:
        """Submit request for processing."""
        
        future = asyncio.Future()
        self.results[request.request_id] = future
        
        await self.queue.put(request)
        
        try:
            return await asyncio.wait_for(
                future,
                timeout=self.config.timeout_seconds
            )
        except asyncio.TimeoutError:
            raise TimeoutError(f"Request {request.request_id} timed out")
    
    async def start_workers(self, num_workers: int = 2):
        """Start batch processing workers."""
        
        workers = [
            asyncio.create_task(self._worker())
            for _ in range(num_workers)
        ]
        
        return workers
    
    async def _worker(self):
        """Worker that processes batches."""
        
        while True:
            batch = []
            
            # Collect batch
            try:
                while len(batch) < self.config.batch_size:
                    request = await asyncio.wait_for(
                        self.queue.get(),
                        timeout=0.1
                    )
                    batch.append(request)
            except asyncio.TimeoutError:
                pass
            
            if batch:
                async with self.semaphore:
                    await self._process_batch(batch)
    
    async def _process_batch(self, batch: list[InferenceRequest]):
        """Process a batch."""
        
        start = time.time()
        prompts = [r.prompt for r in batch]
        
        outputs = await self.model.generate_batch(prompts)
        
        latency = (time.time() - start) * 1000
        
        for i, request in enumerate(batch):
            future = self.results.pop(request.request_id, None)
            if future and not future.done():
                future.set_result(InferenceResponse(
                    request_id=request.request_id,
                    output=outputs[i],
                    tokens_generated=len(outputs[i].split()),
                    latency_ms=latency
                ))

class ParallelBatchProcessor:
    """Process multiple batches in parallel."""
    
    def __init__(
        self,
        models: list[Any],
        batch_size: int = 16
    ):
        self.models = models
        self.batch_size = batch_size
        self.model_queues = [asyncio.Queue() for _ in models]
        self.current_model = 0
    
    async def submit(self, request: InferenceRequest) -> InferenceResponse:
        """Submit to the next model replica in round-robin order."""
        
        # Round-robin distribution
        model_idx = self.current_model
        self.current_model = (self.current_model + 1) % len(self.models)
        
        future = asyncio.Future()
        await self.model_queues[model_idx].put((request, future))
        
        return await future
    
    async def start(self):
        """Start all model workers."""
        
        workers = []
        for i, model in enumerate(self.models):
            worker = asyncio.create_task(
                self._model_worker(i, model)
            )
            workers.append(worker)
        
        return workers
    
    async def _model_worker(self, model_idx: int, model: Any):
        """Worker for a single model."""
        
        queue = self.model_queues[model_idx]
        
        while True:
            batch = []
            futures = []
            
            # Collect batch
            while len(batch) < self.batch_size:
                try:
                    request, future = await asyncio.wait_for(
                        queue.get(),
                        timeout=0.05
                    )
                    batch.append(request)
                    futures.append(future)
                except asyncio.TimeoutError:
                    break
            
            if batch:
                start = time.time()
                prompts = [r.prompt for r in batch]
                outputs = await model.generate_batch(prompts)
                latency = (time.time() - start) * 1000
                
                for i, (request, future) in enumerate(zip(batch, futures)):
                    if not future.done():
                        future.set_result(InferenceResponse(
                            request_id=request.request_id,
                            output=outputs[i],
                            tokens_generated=len(outputs[i].split()),
                            latency_ms=latency
                        ))

class StreamingBatchProcessor:
    """Process batches with streaming output."""
    
    def __init__(self, model: Any, batch_size: int = 8):
        self.model = model
        self.batch_size = batch_size
    
    async def process_streaming(
        self,
        requests: list[InferenceRequest]
    ) -> AsyncIterator[Tuple[str, str]]:
        """Process with streaming output."""
        
        # Group into batches
        for i in range(0, len(requests), self.batch_size):
            batch = requests[i:i + self.batch_size]
            prompts = [r.prompt for r in batch]
            
            # Stream tokens
            async for tokens in self.model.generate_stream_batch(prompts):
                for j, token in enumerate(tokens):
                    if token:
                        yield batch[j].request_id, token
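
In the workers above, the timeout applies to each `queue.get()` separately, so a slow trickle of requests can hold a batch open for up to `batch_size` times the timeout. One alternative is a single deadline for the whole batch, measured from the arrival of its first request. The sketch below shows that pattern; `collect_batch` and its parameters are hypothetical names, not part of the classes above.

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, max_size: int, max_wait_s: float) -> list:
    """Block for the first item, then gather more until the batch is full
    or max_wait_s has elapsed since the first item arrived."""
    loop = asyncio.get_running_loop()
    batch = [await queue.get()]
    deadline = loop.time() + max_wait_s
    while len(batch) < max_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch


async def demo() -> list[list[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    first = await collect_batch(queue, max_size=4, max_wait_s=0.05)
    second = await collect_batch(queue, max_size=4, max_wait_s=0.05)
    return [first, second]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first call returns a full batch immediately; the second returns the single leftover item once the 50 ms deadline passes, so no request waits longer than `max_wait_s` for batching.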

Production Batch Service

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
import asyncio
import uuid
import time

app = FastAPI()

class BatchRequest(BaseModel):
    prompts: list[str]
    max_tokens: Optional[int] = 256
    temperature: Optional[float] = 0.7

class BatchResponse(BaseModel):
    batch_id: str
    outputs: list[str]
    total_tokens: int
    latency_ms: float
    throughput_tokens_per_sec: float

class AsyncBatchRequest(BaseModel):
    prompts: list[str]
    callback_url: Optional[str] = None

class BatchStatus(BaseModel):
    batch_id: str
    status: str  # pending, processing, completed, failed
    progress: float
    outputs: Optional[list[str]] = None

# Mock model
class MockModel:
    async def generate_batch(self, prompts):
        await asyncio.sleep(0.1 * len(prompts))
        return [f"Response to: {p[:30]}..." for p in prompts]

model = MockModel()
batch_jobs: dict[str, BatchStatus] = {}

@app.post("/v1/batch/sync")
async def batch_sync(request: BatchRequest) -> BatchResponse:
    """Synchronous batch inference."""
    
    start = time.time()
    
    outputs = await model.generate_batch(request.prompts)
    
    latency = (time.time() - start) * 1000
    total_tokens = sum(len(o.split()) for o in outputs)
    throughput = total_tokens / (latency / 1000) if latency > 0 else 0
    
    return BatchResponse(
        batch_id=str(uuid.uuid4()),
        outputs=outputs,
        total_tokens=total_tokens,
        latency_ms=latency,
        throughput_tokens_per_sec=throughput
    )

@app.post("/v1/batch/async")
async def batch_async(
    request: AsyncBatchRequest,
    background_tasks: BackgroundTasks
) -> dict:
    """Asynchronous batch inference."""
    
    batch_id = str(uuid.uuid4())
    
    batch_jobs[batch_id] = BatchStatus(
        batch_id=batch_id,
        status="pending",
        progress=0.0
    )
    
    background_tasks.add_task(
        process_batch_async,
        batch_id,
        request.prompts,
        request.callback_url
    )
    
    return {"batch_id": batch_id, "status": "pending"}

async def process_batch_async(
    batch_id: str,
    prompts: list[str],
    callback_url: str = None
):
    """Process batch in background."""
    
    batch_jobs[batch_id].status = "processing"
    
    try:
        outputs = await model.generate_batch(prompts)
        
        batch_jobs[batch_id].status = "completed"
        batch_jobs[batch_id].progress = 1.0
        batch_jobs[batch_id].outputs = outputs
        
        # Call webhook if provided
        if callback_url:
            import httpx
            async with httpx.AsyncClient() as client:
                await client.post(callback_url, json={
                    "batch_id": batch_id,
                    "outputs": outputs
                })
    
    except Exception as e:
        batch_jobs[batch_id].status = "failed"

# Registered before /v1/batch/{batch_id}: FastAPI matches routes in
# declaration order, so the parameterized route would otherwise
# capture "stats" as a batch_id and return 404
@app.get("/v1/batch/stats")
async def get_stats() -> dict:
    """Get batching statistics."""
    
    total = len(batch_jobs)
    completed = sum(1 for j in batch_jobs.values() if j.status == "completed")
    pending = sum(1 for j in batch_jobs.values() if j.status == "pending")
    processing = sum(1 for j in batch_jobs.values() if j.status == "processing")
    
    return {
        "total_batches": total,
        "completed": completed,
        "pending": pending,
        "processing": processing
    }

@app.get("/v1/batch/{batch_id}")
async def get_batch_status(batch_id: str) -> BatchStatus:
    """Get batch job status."""
    
    if batch_id not in batch_jobs:
        raise HTTPException(status_code=404, detail="Batch not found")
    
    return batch_jobs[batch_id]

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Batch inference optimization is essential for cost-effective LLM deployment. The key insight is that GPU utilization improves dramatically with batching. As a rough illustration, a single request might use only around 10% of available compute, while a batch of 16 can approach 80% or more; the exact figures depend on the model, the hardware, and the sequence lengths. Dynamic batching adapts to traffic patterns, waiting briefly to collect requests during low-traffic periods and dispatching immediately during high-traffic bursts. Length-aware batching groups similar-length sequences to minimize padding waste: a 128-token sequence padded to match a 2048-token one spends about 94% of its positions on padding. Continuous batching takes this further by allowing new requests to join mid-generation, maximizing GPU utilization for streaming workloads.

Memory optimization through bucketed padding and sequence packing reduces memory pressure, allowing larger batch sizes. Async processing with multiple workers enables parallel batch execution across multiple GPUs or model replicas. For production systems, implement priority queues so that latency-sensitive requests are processed quickly while background tasks wait for fuller batches. Monitor your padding efficiency and batch utilization metrics, since these translate directly into cost savings. The goal is to maximize tokens processed per GPU-second while meeting latency SLAs for different request priorities.
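
The padding cost of mixing sequence lengths can be worked out directly, and it depends heavily on the mix. The sketch below is a small helper (the name `padding_waste` is hypothetical) that computes the fraction of padded positions when every sequence in a batch is padded to the longest one.

```python
def padding_waste(lengths: list[int]) -> float:
    """Fraction of positions that are padding when all sequences
    are padded to the longest sequence in the batch."""
    if not lengths:
        return 0.0
    padded = len(lengths) * max(lengths)
    return 1.0 - sum(lengths) / padded


if __name__ == "__main__":
    mixed = [128] * 15 + [2048]
    print(f"15 short + 1 long in one batch: {padding_waste(mixed):.1%}")
    print(f"short sequences only:           {padding_waste([128] * 15):.1%}")
    print(f"even 1:1 mix:                   {padding_waste([128, 2048]):.1%}")
```

A batch of fifteen 128-token sequences and one 2048-token sequence is about 88% padding, an even mix is about 47%, and a bucket holding only the 128-token sequences has none. This is the metric to watch when tuning bucket boundaries.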


About the Author

I am a Cloud Architect and Developer passionate about solving complex problems with modern technology. My blog explores the intersection of Cloud Architecture, Artificial Intelligence, and Software Engineering. I share tutorials, deep dives, and insights into building scalable, intelligent systems.

Areas of Expertise

Cloud Architecture (Azure, AWS)
Artificial Intelligence & LLMs
DevOps & Kubernetes
Backend Dev (C#, .NET, Python, Node.js)
© 2025 Code, Cloud & Context | Built by Nithin Mohan TK | Powered by Passion