LLM Request Batching: Maximizing Throughput with Parallel Processing

Introduction

Processing LLM requests one at a time is inefficient. When you have multiple independent requests, sequential processing wastes time waiting for each response before starting the next. Batching groups requests together for parallel processing, dramatically improving throughput. But batching LLM requests isn't straightforward: you need to handle rate limits, manage concurrent connections, deal with partial failures, and ensure responses map back to the correct requests. This guide covers practical batching strategies: request queuing, parallel execution with rate limiting, batch formation policies, and building batching systems that maximize throughput while respecting API constraints.
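
To make the throughput difference concrete, here is a minimal sketch (separate from the pipeline built below) that issues the same ten requests sequentially and then concurrently with asyncio.gather. It assumes the official openai Python SDK's AsyncOpenAI client, an OPENAI_API_KEY in the environment, and placeholder prompts and model name.

# Illustration only: compare sequential vs. concurrent request handling.
import asyncio
import time
from openai import AsyncOpenAI

client = AsyncOpenAI()  # Reads OPENAI_API_KEY from the environment
prompts = [f"Summarize the number {i} in one sentence." for i in range(10)]

async def ask(prompt: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content

async def main():
    start = time.time()
    for p in prompts:
        await ask(p)  # Each request waits for the previous one to finish
    sequential_s = time.time() - start

    start = time.time()
    await asyncio.gather(*(ask(p) for p in prompts))  # All requests in flight at once
    parallel_s = time.time() - start

    print(f"sequential: {sequential_s:.1f}s, parallel: {parallel_s:.1f}s")

asyncio.run(main())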

Request Batching
Batching Pipeline: Request Queue, Batch Formation, Parallel Processing

Request Queue

from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from datetime import datetime
import asyncio
import uuid

@dataclass
class BatchRequest:
    """A request in the batch queue."""
    
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    prompt: str = ""
    messages: Optional[list[dict]] = None
    model: str = "gpt-4o-mini"
    priority: int = 0  # Higher = more urgent
    created_at: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)
    
    # For tracking
    future: Optional[asyncio.Future] = None

@dataclass
class BatchResponse:
    """Response for a batched request."""
    
    request_id: str
    content: Optional[str] = None
    error: Optional[str] = None
    latency_ms: float = 0
    success: bool = True

class RequestQueue:
    """Queue for batching requests."""
    
    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=max_size)
        self._pending: dict[str, BatchRequest] = {}
        self._counter = 0  # Monotonic tie-breaker for the priority queue
    
    async def enqueue(self, request: BatchRequest) -> asyncio.Future:
        """Add request to queue."""
        
        if self._queue.full():
            raise RuntimeError("Queue is full")
        
        # Create future for response
        request.future = asyncio.get_running_loop().create_future()
        
        # Priority queue entries are (priority, sequence, request).
        # Negative priority so higher values are processed first; the sequence
        # counter breaks ties so BatchRequest objects are never compared.
        self._counter += 1
        await self._queue.put((
            -request.priority,
            self._counter,
            request
        ))
        
        self._pending[request.id] = request
        
        return request.future
    
    async def dequeue(self, count: int = 1) -> list[BatchRequest]:
        """Get requests from queue."""
        
        requests = []
        
        for _ in range(count):
            if self._queue.empty():
                break
            
            _, _, request = await self._queue.get()
            requests.append(request)
        
        return requests
    
    def complete(self, request_id: str, response: BatchResponse):
        """Complete a request with response."""
        
        if request_id in self._pending:
            request = self._pending.pop(request_id)
            
            if request.future and not request.future.done():
                request.future.set_result(response)
    
    def fail(self, request_id: str, error: str):
        """Fail a request with error."""
        
        if request_id in self._pending:
            request = self._pending.pop(request_id)
            
            if request.future and not request.future.done():
                request.future.set_exception(Exception(error))
    
    @property
    def size(self) -> int:
        """Current queue size."""
        return self._queue.qsize()
    
    @property
    def pending_count(self) -> int:
        """Number of pending requests."""
        return len(self._pending)

class TimedBatchQueue:
    """Queue that forms batches based on time or size."""
    
    def __init__(
        self,
        max_batch_size: int = 10,
        max_wait_ms: int = 100
    ):
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        
        self._requests: list[BatchRequest] = []
        self._lock = asyncio.Lock()
        self._batch_ready = asyncio.Event()
        self._last_add_time: Optional[datetime] = None
    
    async def add(self, request: BatchRequest) -> asyncio.Future:
        """Add request to batch."""
        
        request.future = asyncio.get_running_loop().create_future()
        
        async with self._lock:
            self._requests.append(request)
            self._last_add_time = datetime.utcnow()
            
            if len(self._requests) >= self.max_batch_size:
                self._batch_ready.set()
        
        return request.future
    
    async def get_batch(self) -> list[BatchRequest]:
        """Get next batch of requests."""
        
        while True:
            # Wait for batch to be ready or timeout
            try:
                await asyncio.wait_for(
                    self._batch_ready.wait(),
                    timeout=self.max_wait_ms / 1000
                )
            except asyncio.TimeoutError:
                pass
            
            async with self._lock:
                if self._requests:
                    batch = self._requests[:self.max_batch_size]
                    self._requests = self._requests[self.max_batch_size:]
                    self._batch_ready.clear()
                    return batch
            
            # Small sleep to prevent busy loop
            await asyncio.sleep(0.01)
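
Before wiring this into an executor, a quick usage sketch (illustrative names and prompts, no LLM calls) shows how the TimedBatchQueue behaves: a producer adds requests while a consumer receives a batch as soon as max_batch_size is reached or max_wait_ms elapses.

# Illustration only: exercise TimedBatchQueue without calling any LLM.
import asyncio

async def demo_timed_queue():
    queue = TimedBatchQueue(max_batch_size=3, max_wait_ms=50)
    
    async def producer():
        for i in range(7):
            await queue.add(BatchRequest(prompt=f"question {i}"))
            await asyncio.sleep(0.01)
    
    async def consumer():
        collected = 0
        while collected < 7:
            batch = await queue.get_batch()
            collected += len(batch)
            print(f"got batch of {len(batch)}: {[r.prompt for r in batch]}")
            # In the full pipeline these requests would now go to the executor,
            # and each request.future would be resolved with its BatchResponse.
    
    await asyncio.gather(producer(), consumer())

asyncio.run(demo_timed_queue())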

Parallel Execution

from dataclasses import dataclass
from typing import Any, Optional
import asyncio
import time

@dataclass
class ExecutionResult:
    """Result of batch execution."""
    
    responses: list[BatchResponse]
    total_time_ms: float
    success_count: int
    failure_count: int

class ParallelExecutor:
    """Execute requests in parallel with rate limiting."""
    
    def __init__(
        self,
        client: Any,
        max_concurrent: int = 10,
        requests_per_minute: int = 500
    ):
        self.client = client
        self.max_concurrent = max_concurrent
        self.requests_per_minute = requests_per_minute
        
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._rate_limiter = RateLimiter(requests_per_minute)
    
    async def execute_batch(
        self,
        requests: list[BatchRequest]
    ) -> ExecutionResult:
        """Execute batch of requests in parallel."""
        
        start_time = time.time()
        
        # Create tasks for all requests
        tasks = [
            self._execute_single(request)
            for request in requests
        ]
        
        # Execute in parallel
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        batch_responses = []
        success_count = 0
        failure_count = 0
        
        for request, response in zip(requests, responses):
            if isinstance(response, Exception):
                batch_responses.append(BatchResponse(
                    request_id=request.id,
                    error=str(response),
                    success=False
                ))
                failure_count += 1
            else:
                batch_responses.append(response)
                if response.success:
                    success_count += 1
                else:
                    failure_count += 1
        
        total_time = (time.time() - start_time) * 1000
        
        return ExecutionResult(
            responses=batch_responses,
            total_time_ms=total_time,
            success_count=success_count,
            failure_count=failure_count
        )
    
    async def _execute_single(self, request: BatchRequest) -> BatchResponse:
        """Execute a single request with rate limiting."""
        
        async with self._semaphore:
            await self._rate_limiter.acquire()
            
            start_time = time.time()
            
            try:
                if request.messages:
                    response = await self.client.chat.completions.create(
                        model=request.model,
                        messages=request.messages
                    )
                else:
                    response = await self.client.chat.completions.create(
                        model=request.model,
                        messages=[{"role": "user", "content": request.prompt}]
                    )
                
                latency = (time.time() - start_time) * 1000
                
                return BatchResponse(
                    request_id=request.id,
                    content=response.choices[0].message.content,
                    latency_ms=latency,
                    success=True
                )
            
            except Exception as e:
                latency = (time.time() - start_time) * 1000
                
                return BatchResponse(
                    request_id=request.id,
                    error=str(e),
                    latency_ms=latency,
                    success=False
                )

class RateLimiter:
    """Token bucket rate limiter."""
    
    def __init__(self, requests_per_minute: int):
        self.rate = requests_per_minute / 60  # Requests per second
        self.tokens = requests_per_minute
        self.max_tokens = requests_per_minute
        self.last_update = time.time()
        self._lock = asyncio.Lock()
    
    async def acquire(self):
        """Acquire a token, waiting if necessary."""
        
        async with self._lock:
            now = time.time()
            
            # Refill tokens
            elapsed = now - self.last_update
            self.tokens = min(
                self.max_tokens,
                self.tokens + elapsed * self.rate
            )
            self.last_update = now
            
            if self.tokens < 1:
                # Not enough tokens: sleep until one accrues. Holding the lock
                # while sleeping intentionally serializes waiting callers.
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0  # The token that accrued during the wait is consumed here
            else:
                self.tokens -= 1

class AdaptiveExecutor:
    """Executor that adapts concurrency based on errors."""
    
    def __init__(
        self,
        client: Any,
        initial_concurrent: int = 10,
        min_concurrent: int = 1,
        max_concurrent: int = 50
    ):
        self.client = client
        self.concurrent = initial_concurrent
        self.min_concurrent = min_concurrent
        self.max_concurrent = max_concurrent
        
        self._error_count = 0
        self._success_count = 0
        self._window_size = 100
    
    async def execute_batch(
        self,
        requests: list[BatchRequest]
    ) -> ExecutionResult:
        """Execute with adaptive concurrency."""
        
        # A fresh executor (and rate limiter) is created per batch using the
        # current concurrency setting.
        executor = ParallelExecutor(
            self.client,
            max_concurrent=self.concurrent
        )
        
        result = await executor.execute_batch(requests)
        
        # Update stats
        self._success_count += result.success_count
        self._error_count += result.failure_count
        
        # Adapt concurrency
        self._adapt_concurrency(result)
        
        return result
    
    def _adapt_concurrency(self, result: ExecutionResult):
        """Adapt concurrency based on results."""
        
        total = result.success_count + result.failure_count
        if total == 0:
            return
        
        error_rate = result.failure_count / total
        
        if error_rate > 0.1:
            # Too many errors, reduce concurrency
            self.concurrent = max(
                self.min_concurrent,
                int(self.concurrent * 0.8)
            )
        elif error_rate < 0.01 and self.concurrent < self.max_concurrent:
            # Low errors, increase concurrency
            self.concurrent = min(
                self.max_concurrent,
                int(self.concurrent * 1.2)
            )
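
A short usage sketch for the executor follows. It assumes the openai SDK's AsyncOpenAI client with an OPENAI_API_KEY in the environment; the prompts, limits, and model are placeholders.

# Sketch: run a batch of prompts through ParallelExecutor.
import asyncio
from openai import AsyncOpenAI

async def run_parallel_demo():
    client = AsyncOpenAI()
    executor = ParallelExecutor(client, max_concurrent=5, requests_per_minute=300)
    
    requests = [
        BatchRequest(prompt=f"Give one fun fact about the number {i}.")
        for i in range(20)
    ]
    
    result = await executor.execute_batch(requests)
    print(f"{result.success_count} ok, {result.failure_count} failed "
          f"in {result.total_time_ms:.0f} ms")
    
    for response in result.responses[:3]:
        print(response.request_id, (response.content or response.error or "")[:60])

asyncio.run(run_parallel_demo())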

Batch Processor

from dataclasses import dataclass
from typing import Any, Optional, Callable
import asyncio

@dataclass
class BatchConfig:
    """Configuration for batch processing."""
    
    max_batch_size: int = 10
    max_wait_ms: int = 100
    max_concurrent: int = 10
    requests_per_minute: int = 500
    retry_failed: bool = True
    max_retries: int = 3

class BatchProcessor:
    """Process requests in batches."""
    
    def __init__(
        self,
        client: Any,
        config: BatchConfig = None
    ):
        self.client = client
        self.config = config or BatchConfig()
        
        self._queue = TimedBatchQueue(
            max_batch_size=self.config.max_batch_size,
            max_wait_ms=self.config.max_wait_ms
        )
        
        self._executor = ParallelExecutor(
            client,
            max_concurrent=self.config.max_concurrent,
            requests_per_minute=self.config.requests_per_minute
        )
        
        self._running = False
        self._processor_task: asyncio.Task = None
        
        # Stats
        self._total_processed = 0
        self._total_failed = 0
    
    async def start(self):
        """Start the batch processor."""
        
        if self._running:
            return
        
        self._running = True
        self._processor_task = asyncio.create_task(self._process_loop())
    
    async def stop(self):
        """Stop the batch processor."""
        
        self._running = False
        
        if self._processor_task:
            self._processor_task.cancel()
            try:
                await self._processor_task
            except asyncio.CancelledError:
                pass
    
    async def submit(
        self,
        prompt: Optional[str] = None,
        messages: Optional[list[dict]] = None,
        model: str = "gpt-4o-mini",
        priority: int = 0,
        **metadata
    ) -> BatchResponse:
        """Submit a request for batch processing."""
        
        request = BatchRequest(
            prompt=prompt,
            messages=messages,
            model=model,
            priority=priority,
            metadata=metadata
        )
        
        future = await self._queue.add(request)
        return await future
    
    async def _process_loop(self):
        """Main processing loop."""
        
        while self._running:
            try:
                # Get next batch
                batch = await self._queue.get_batch()
                
                if not batch:
                    continue
                
                # Execute batch
                result = await self._executor.execute_batch(batch)
                
                # Handle results. Successes resolve their futures immediately;
                # a failed request keeps its future open if it will be retried,
                # so the retry result (not this failure) is what the caller receives.
                for response in result.responses:
                    request = next(
                        (r for r in batch if r.id == response.request_id),
                        None
                    )
                    
                    if not request or not request.future:
                        continue
                    
                    will_retry = (
                        not response.success
                        and self.config.retry_failed
                        and request.metadata.get("retry_count", 0) < self.config.max_retries
                    )
                    
                    if not will_retry and not request.future.done():
                        request.future.set_result(response)
                
                # Update stats
                self._total_processed += result.success_count
                self._total_failed += result.failure_count
                
                # Re-queue failed requests that still have retries left
                if self.config.retry_failed:
                    await self._retry_failed(batch, result)
            
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Log error but continue processing
                print(f"Batch processing error: {e}")
                await asyncio.sleep(1)
    
    async def _retry_failed(
        self,
        batch: list[BatchRequest],
        result: ExecutionResult
    ):
        """Retry failed requests."""
        
        for response in result.responses:
            if not response.success:
                request = next(
                    (r for r in batch if r.id == response.request_id),
                    None
                )
                
                if request:
                    retry_count = request.metadata.get("retry_count", 0)
                    
                    if retry_count < self.config.max_retries:
                        # Re-queue with incremented retry count
                        new_request = BatchRequest(
                            prompt=request.prompt,
                            messages=request.messages,
                            model=request.model,
                            priority=request.priority + 1,  # Higher priority
                            metadata={
                                **request.metadata,
                                "retry_count": retry_count + 1
                            }
                        )
                        # Reuse the caller's future so the retry result reaches them
                        new_request.future = request.future
                        await self._queue.add(new_request)
    
    @property
    def stats(self) -> dict:
        """Get processor statistics."""
        
        total = self._total_processed + self._total_failed
        return {
            "queue_size": len(getattr(self._queue, "_requests", [])),
            "total_processed": self._total_processed,
            "total_failed": self._total_failed,
            "success_rate": self._total_processed / total if total > 0 else 0
        }
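
Tying the pieces together, here is a usage sketch for the processor. As before, it assumes AsyncOpenAI with an OPENAI_API_KEY in the environment; the prompts and config values are placeholders.

# Sketch: submit requests through the BatchProcessor and let it batch them.
import asyncio
from openai import AsyncOpenAI

async def run_processor_demo():
    client = AsyncOpenAI()
    processor = BatchProcessor(client, BatchConfig(max_batch_size=5, max_wait_ms=50))
    await processor.start()
    
    try:
        # Concurrent submits are collected into batches behind the scenes;
        # each call returns once its own BatchResponse is ready.
        responses = await asyncio.gather(*[
            processor.submit(prompt=f"Write a one-line haiku about topic {i}")
            for i in range(12)
        ])
        
        for r in responses[:3]:
            print(r.success, (r.content or r.error or "")[:60])
        print(processor.stats)
    finally:
        await processor.stop()

asyncio.run(run_processor_demo())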

OpenAI Batch API

from dataclasses import dataclass
from typing import Any, Optional
import json
import asyncio

@dataclass
class BatchJob:
    """OpenAI batch job."""
    
    id: str
    status: str
    input_file_id: str
    output_file_id: Optional[str] = None
    error_file_id: Optional[str] = None
    created_at: int = 0
    completed_at: Optional[int] = None
    request_counts: Optional[dict] = None

class OpenAIBatchClient:
    """Client for OpenAI Batch API."""
    
    def __init__(self, client: Any):
        self.client = client
    
    async def create_batch(
        self,
        requests: list[dict],
        description: str = None
    ) -> BatchJob:
        """Create a batch job."""
        
        # Format requests for batch API
        batch_requests = []
        for i, req in enumerate(requests):
            batch_requests.append({
                "custom_id": req.get("custom_id", f"request-{i}"),
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": req.get("model", "gpt-4o-mini"),
                    "messages": req.get("messages", []),
                    "max_tokens": req.get("max_tokens", 1000)
                }
            })
        
        # Create JSONL content
        jsonl_content = "\n".join(json.dumps(r) for r in batch_requests)
        
        # Upload input file as JSONL (filename + bytes tuple)
        input_file = await self.client.files.create(
            file=("batch_input.jsonl", jsonl_content.encode()),
            purpose="batch"
        )
        
        # Create batch
        batch = await self.client.batches.create(
            input_file_id=input_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
            metadata={"description": description} if description else None
        )
        
        return BatchJob(
            id=batch.id,
            status=batch.status,
            input_file_id=input_file.id,
            created_at=batch.created_at
        )
    
    async def get_batch_status(self, batch_id: str) -> BatchJob:
        """Get batch job status."""
        
        batch = await self.client.batches.retrieve(batch_id)
        
        return BatchJob(
            id=batch.id,
            status=batch.status,
            input_file_id=batch.input_file_id,
            output_file_id=batch.output_file_id,
            error_file_id=batch.error_file_id,
            created_at=batch.created_at,
            completed_at=batch.completed_at,
            request_counts=batch.request_counts
        )
    
    async def wait_for_completion(
        self,
        batch_id: str,
        poll_interval: int = 60
    ) -> BatchJob:
        """Wait for batch to complete."""
        
        while True:
            job = await self.get_batch_status(batch_id)
            
            if job.status in ["completed", "failed", "expired", "cancelled"]:
                return job
            
            await asyncio.sleep(poll_interval)
    
    async def get_results(self, batch_id: str) -> list[dict]:
        """Get batch results."""
        
        job = await self.get_batch_status(batch_id)
        
        if job.status != "completed":
            raise RuntimeError(f"Batch not completed: {job.status}")
        
        if not job.output_file_id:
            return []
        
        # Download output file
        content = await self.client.files.content(job.output_file_id)
        
        # Parse JSONL
        results = []
        for line in content.text.split("\n"):
            if line.strip():
                results.append(json.loads(line))
        
        return results
    
    async def cancel_batch(self, batch_id: str) -> BatchJob:
        """Cancel a batch job."""
        
        batch = await self.client.batches.cancel(batch_id)
        
        return BatchJob(
            id=batch.id,
            status=batch.status,
            input_file_id=batch.input_file_id
        )

class BatchAPIProcessor:
    """Process large batches using OpenAI Batch API."""
    
    def __init__(self, client: Any):
        self.batch_client = OpenAIBatchClient(client)
    
    async def process_batch(
        self,
        requests: list[dict],
        wait_for_completion: bool = True
    ) -> dict:
        """Process batch of requests."""
        
        # Create batch job
        job = await self.batch_client.create_batch(requests)
        
        if not wait_for_completion:
            return {"job_id": job.id, "status": job.status}
        
        # Wait for completion
        completed_job = await self.batch_client.wait_for_completion(job.id)
        
        if completed_job.status != "completed":
            return {
                "job_id": job.id,
                "status": completed_job.status,
                "error": "Batch did not complete successfully"
            }
        
        # Get results
        results = await self.batch_client.get_results(job.id)
        
        return {
            "job_id": job.id,
            "status": "completed",
            "results": results,
            "request_counts": completed_job.request_counts
        }
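
A usage sketch for the offline path: submit a job and poll for it later rather than blocking. It assumes AsyncOpenAI with an OPENAI_API_KEY in the environment; the custom_id scheme, prompts, and sizes are placeholders.

# Sketch: submit an offline job through the Batch API wrapper above.
import asyncio
from openai import AsyncOpenAI

async def run_batch_api_demo():
    client = AsyncOpenAI()
    processor = BatchAPIProcessor(client)
    
    requests = [
        {
            "custom_id": f"doc-{i}",
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": f"Summarize document {i}."}],
            "max_tokens": 200,
        }
        for i in range(100)
    ]
    
    # Fire and forget: check status later via get_batch_status / wait_for_completion.
    submitted = await processor.process_batch(requests, wait_for_completion=False)
    print(submitted)  # e.g. {"job_id": "...", "status": "validating"}

asyncio.run(run_batch_api_demo())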

Production Batching Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncio

app = FastAPI()

# Initialize components (wire these up with an AsyncOpenAI client before startup)
batch_processor = None       # BatchProcessor(client)
batch_api_processor = None   # BatchAPIProcessor(client), used for Batch API submissions
openai_batch_client = None   # OpenAIBatchClient(client), used for status/result lookups

class SubmitRequest(BaseModel):
    prompt: Optional[str] = None
    messages: Optional[list[dict]] = None
    model: str = "gpt-4o-mini"
    priority: int = 0

class BatchSubmitRequest(BaseModel):
    requests: list[dict]
    use_batch_api: bool = False

@app.on_event("startup")
async def startup():
    """Start batch processor on startup."""
    await batch_processor.start()

@app.on_event("shutdown")
async def shutdown():
    """Stop batch processor on shutdown."""
    await batch_processor.stop()

@app.post("/v1/submit")
async def submit_request(request: SubmitRequest):
    """Submit a single request for batch processing."""
    
    response = await batch_processor.submit(
        prompt=request.prompt,
        messages=request.messages,
        model=request.model,
        priority=request.priority
    )
    
    return {
        "request_id": response.request_id,
        "content": response.content,
        "success": response.success,
        "error": response.error,
        "latency_ms": response.latency_ms
    }

@app.post("/v1/batch/submit")
async def submit_batch(request: BatchSubmitRequest):
    """Submit multiple requests."""
    
    if request.use_batch_api:
        # Use the OpenAI Batch API for large offline batches
        result = await batch_api_processor.process_batch(
            request.requests,
            wait_for_completion=False
        )
        return result
    
    # Use real-time batching
    tasks = [
        batch_processor.submit(
            prompt=req.get("prompt"),
            messages=req.get("messages"),
            model=req.get("model", "gpt-4o-mini")
        )
        for req in request.requests
    ]
    
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    
    results = []
    for i, response in enumerate(responses):
        if isinstance(response, Exception):
            results.append({
                "index": i,
                "success": False,
                "error": str(response)
            })
        else:
            results.append({
                "index": i,
                "success": response.success,
                "content": response.content,
                "error": response.error
            })
    
    return {"results": results}

@app.get("/v1/batch/{job_id}")
async def get_batch_status(job_id: str):
    """Get batch job status."""
    
    job = await openai_batch_client.get_batch_status(job_id)
    
    return {
        "job_id": job.id,
        "status": job.status,
        "request_counts": job.request_counts
    }

@app.get("/v1/batch/{job_id}/results")
async def get_batch_results(job_id: str):
    """Get batch job results."""
    
    try:
        results = await openai_batch_client.get_results(job_id)
        return {"results": results}
    except RuntimeError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/v1/stats")
async def get_stats():
    """Get processor statistics."""
    
    return batch_processor.stats

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Request batching is essential for high-throughput LLM applications. For real-time use cases, implement timed batch queues that collect requests and process them together with parallel execution. Use rate limiters to stay within API limits while maximizing throughput. Implement adaptive concurrency that backs off when errors increase and scales up when the API is healthy. For large offline workloads, use OpenAI's Batch API which offers 50% cost savings and higher rate limits in exchange for longer processing times. The key insight is that batching trades latency for throughput—individual requests may wait slightly longer, but overall system capacity increases dramatically. Design your batching strategy based on your latency requirements: aggressive batching for background processing, lighter batching for interactive applications. Monitor queue depths and processing times to tune batch sizes and concurrency for your specific workload patterns.

