Introduction
Processing LLM requests one at a time is inefficient. When you have multiple independent requests, sequential processing wastes time waiting for each response before starting the next. Batching groups requests together for parallel processing, dramatically improving throughput. But batching LLM requests isn't straightforward. You need to handle rate limits, manage concurrent connections, deal with partial failures, and ensure responses map back to the correct requests. This guide covers practical batching strategies: request queuing, parallel execution with rate limiting, batch formation policies, and building batching systems that maximize throughput while respecting API constraints.
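To make the throughput difference concrete, here is a minimal sketch (assuming an AsyncOpenAI-style async client; prompts is a hypothetical list of user prompts) comparing sequential awaits with concurrent execution via asyncio.gather:

import asyncio

async def run_sequential(client, prompts: list[str]) -> list[str]:
    # Each request waits for the previous one to finish.
    results = []
    for prompt in prompts:
        resp = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        results.append(resp.choices[0].message.content)
    return results

async def run_concurrent(client, prompts: list[str]) -> list[str]:
    # All requests are in flight at once; total time is roughly the
    # slowest single request rather than the sum of all of them.
    async def one(prompt: str) -> str:
        resp = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        return resp.choices[0].message.content

    return await asyncio.gather(*(one(p) for p in prompts))

The rest of the guide adds the pieces this sketch ignores: queuing, rate limits, retries, and routing responses back to callers.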

Request Queue
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from datetime import datetime
import asyncio
import uuid
@dataclass
class BatchRequest:
"""A request in the batch queue."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    prompt: str = ""
    messages: Optional[list[dict]] = None
    model: str = "gpt-4o-mini"
    priority: int = 0  # Higher = more urgent
    created_at: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)
    # Future used to deliver the response back to the caller
    future: Optional[asyncio.Future] = None
@dataclass
class BatchResponse:
"""Response for a batched request."""
    request_id: str
    content: Optional[str] = None
    error: Optional[str] = None
    latency_ms: float = 0
    success: bool = True
class RequestQueue:
"""Queue for batching requests."""
def __init__(self, max_size: int = 1000):
self.max_size = max_size
self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=max_size)
        self._pending: dict[str, BatchRequest] = {}
        self._seq = 0  # Monotonic tiebreaker so BatchRequest objects are never compared
async def enqueue(self, request: BatchRequest) -> asyncio.Future:
"""Add request to queue."""
if self._queue.full():
raise RuntimeError("Queue is full")
        # Create future for response
        request.future = asyncio.get_running_loop().create_future()
        # Priority queue orders by (-priority, arrival order);
        # negative priority so higher values are processed first,
        # and the counter keeps FIFO order within a priority level
        self._seq += 1
        await self._queue.put((
            -request.priority,
            self._seq,
            request
        ))
self._pending[request.id] = request
return request.future
async def dequeue(self, count: int = 1) -> list[BatchRequest]:
"""Get requests from queue."""
requests = []
for _ in range(count):
if self._queue.empty():
break
_, _, request = await self._queue.get()
requests.append(request)
return requests
def complete(self, request_id: str, response: BatchResponse):
"""Complete a request with response."""
if request_id in self._pending:
request = self._pending.pop(request_id)
if request.future and not request.future.done():
request.future.set_result(response)
def fail(self, request_id: str, error: str):
"""Fail a request with error."""
if request_id in self._pending:
request = self._pending.pop(request_id)
if request.future and not request.future.done():
request.future.set_exception(Exception(error))
@property
def size(self) -> int:
"""Current queue size."""
return self._queue.qsize()
@property
def pending_count(self) -> int:
"""Number of pending requests."""
return len(self._pending)
class TimedBatchQueue:
"""Queue that forms batches based on time or size."""
def __init__(
self,
max_batch_size: int = 10,
max_wait_ms: int = 100
):
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self._requests: list[BatchRequest] = []
self._lock = asyncio.Lock()
self._batch_ready = asyncio.Event()
self._last_add_time: datetime = None
async def add(self, request: BatchRequest) -> asyncio.Future:
"""Add request to batch."""
        if request.future is None:
            # Don't overwrite an existing future (retried requests reuse the caller's)
            request.future = asyncio.get_running_loop().create_future()
async with self._lock:
self._requests.append(request)
self._last_add_time = datetime.utcnow()
if len(self._requests) >= self.max_batch_size:
self._batch_ready.set()
return request.future
async def get_batch(self) -> list[BatchRequest]:
"""Get next batch of requests."""
while True:
# Wait for batch to be ready or timeout
try:
await asyncio.wait_for(
self._batch_ready.wait(),
timeout=self.max_wait_ms / 1000
)
except asyncio.TimeoutError:
pass
            async with self._lock:
                if self._requests:
                    batch = self._requests[:self.max_batch_size]
                    self._requests = self._requests[self.max_batch_size:]
                    # Only clear the event if a full batch isn't still waiting
                    if len(self._requests) < self.max_batch_size:
                        self._batch_ready.clear()
                    return batch
# Small sleep to prevent busy loop
await asyncio.sleep(0.01)
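A quick usage sketch for the timed queue (hypothetical, using only the classes above): a consumer task pulls a batch and resolves each request's future, while producers simply await the future returned by add.

async def demo_timed_queue():
    queue = TimedBatchQueue(max_batch_size=3, max_wait_ms=50)

    async def consumer():
        # Pull one batch and resolve each request with a dummy response.
        batch = await queue.get_batch()
        for req in batch:
            req.future.set_result(
                BatchResponse(request_id=req.id, content=f"echo: {req.prompt}")
            )

    consumer_task = asyncio.create_task(consumer())
    futures = [await queue.add(BatchRequest(prompt=f"question {i}")) for i in range(3)]
    responses = await asyncio.gather(*futures)
    await consumer_task
    for r in responses:
        print(r.request_id, r.content)

# asyncio.run(demo_timed_queue())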
Parallel Execution
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
import time
@dataclass
class ExecutionResult:
"""Result of batch execution."""
responses: list[BatchResponse]
total_time_ms: float
success_count: int
failure_count: int
class ParallelExecutor:
"""Execute requests in parallel with rate limiting."""
def __init__(
self,
client: Any,
max_concurrent: int = 10,
requests_per_minute: int = 500
):
self.client = client
self.max_concurrent = max_concurrent
self.requests_per_minute = requests_per_minute
self._semaphore = asyncio.Semaphore(max_concurrent)
self._rate_limiter = RateLimiter(requests_per_minute)
async def execute_batch(
self,
requests: list[BatchRequest]
) -> ExecutionResult:
"""Execute batch of requests in parallel."""
start_time = time.time()
# Create tasks for all requests
tasks = [
self._execute_single(request)
for request in requests
]
# Execute in parallel
responses = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
batch_responses = []
success_count = 0
failure_count = 0
for request, response in zip(requests, responses):
if isinstance(response, Exception):
batch_responses.append(BatchResponse(
request_id=request.id,
error=str(response),
success=False
))
failure_count += 1
else:
batch_responses.append(response)
if response.success:
success_count += 1
else:
failure_count += 1
total_time = (time.time() - start_time) * 1000
return ExecutionResult(
responses=batch_responses,
total_time_ms=total_time,
success_count=success_count,
failure_count=failure_count
)
async def _execute_single(self, request: BatchRequest) -> BatchResponse:
"""Execute a single request with rate limiting."""
async with self._semaphore:
await self._rate_limiter.acquire()
start_time = time.time()
try:
if request.messages:
response = await self.client.chat.completions.create(
model=request.model,
messages=request.messages
)
else:
response = await self.client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.prompt}]
)
latency = (time.time() - start_time) * 1000
return BatchResponse(
request_id=request.id,
content=response.choices[0].message.content,
latency_ms=latency,
success=True
)
except Exception as e:
latency = (time.time() - start_time) * 1000
return BatchResponse(
request_id=request.id,
error=str(e),
latency_ms=latency,
success=False
)
class RateLimiter:
"""Token bucket rate limiter."""
def __init__(self, requests_per_minute: int):
self.rate = requests_per_minute / 60 # Requests per second
self.tokens = requests_per_minute
self.max_tokens = requests_per_minute
self.last_update = time.time()
self._lock = asyncio.Lock()
async def acquire(self):
"""Acquire a token, waiting if necessary."""
async with self._lock:
now = time.time()
# Refill tokens
elapsed = now - self.last_update
self.tokens = min(
self.max_tokens,
self.tokens + elapsed * self.rate
)
self.last_update = now
            if self.tokens < 1:
                # Wait until a full token has accrued, then consume it.
                # Advance last_update so the sleep time isn't counted as
                # refill again on the next acquire.
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.last_update = time.time()
                self.tokens = 0
            else:
                self.tokens -= 1
class AdaptiveExecutor:
"""Executor that adapts concurrency based on errors."""
def __init__(
self,
client: Any,
initial_concurrent: int = 10,
min_concurrent: int = 1,
max_concurrent: int = 50
):
self.client = client
self.concurrent = initial_concurrent
self.min_concurrent = min_concurrent
self.max_concurrent = max_concurrent
self._error_count = 0
self._success_count = 0
self._window_size = 100
async def execute_batch(
self,
requests: list[BatchRequest]
) -> ExecutionResult:
"""Execute with adaptive concurrency."""
executor = ParallelExecutor(
self.client,
max_concurrent=self.concurrent
)
result = await executor.execute_batch(requests)
# Update stats
self._success_count += result.success_count
self._error_count += result.failure_count
# Adapt concurrency
self._adapt_concurrency(result)
return result
def _adapt_concurrency(self, result: ExecutionResult):
"""Adapt concurrency based on results."""
total = result.success_count + result.failure_count
if total == 0:
return
error_rate = result.failure_count / total
if error_rate > 0.1:
# Too many errors, reduce concurrency
self.concurrent = max(
self.min_concurrent,
int(self.concurrent * 0.8)
)
elif error_rate < 0.01 and self.concurrent < self.max_concurrent:
# Low errors, increase concurrency
self.concurrent = min(
self.max_concurrent,
int(self.concurrent * 1.2)
)
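A usage sketch for the parallel executor, assuming the official openai package's AsyncOpenAI client (any async client exposing a compatible chat.completions.create would work):

from openai import AsyncOpenAI  # assumed dependency

async def demo_parallel_executor():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    executor = ParallelExecutor(client, max_concurrent=5, requests_per_minute=300)
    requests = [BatchRequest(prompt=f"Summarize item {i}") for i in range(20)]
    result = await executor.execute_batch(requests)
    print(
        f"{result.success_count} succeeded, {result.failure_count} failed "
        f"in {result.total_time_ms:.0f} ms"
    )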
Batch Processor
from dataclasses import dataclass
from typing import Any, Optional, Callable
import asyncio
@dataclass
class BatchConfig:
"""Configuration for batch processing."""
max_batch_size: int = 10
max_wait_ms: int = 100
max_concurrent: int = 10
requests_per_minute: int = 500
retry_failed: bool = True
max_retries: int = 3
class BatchProcessor:
"""Process requests in batches."""
def __init__(
self,
client: Any,
config: BatchConfig = None
):
self.client = client
self.config = config or BatchConfig()
self._queue = TimedBatchQueue(
max_batch_size=self.config.max_batch_size,
max_wait_ms=self.config.max_wait_ms
)
self._executor = ParallelExecutor(
client,
max_concurrent=self.config.max_concurrent,
requests_per_minute=self.config.requests_per_minute
)
self._running = False
self._processor_task: asyncio.Task = None
# Stats
self._total_processed = 0
self._total_failed = 0
async def start(self):
"""Start the batch processor."""
if self._running:
return
self._running = True
self._processor_task = asyncio.create_task(self._process_loop())
async def stop(self):
"""Stop the batch processor."""
self._running = False
if self._processor_task:
self._processor_task.cancel()
try:
await self._processor_task
except asyncio.CancelledError:
pass
async def submit(
self,
prompt: str = None,
messages: list[dict] = None,
model: str = "gpt-4o-mini",
priority: int = 0,
**metadata
) -> BatchResponse:
"""Submit a request for batch processing."""
request = BatchRequest(
prompt=prompt,
messages=messages,
model=model,
priority=priority,
metadata=metadata
)
future = await self._queue.add(request)
return await future
async def _process_loop(self):
"""Main processing loop."""
while self._running:
try:
# Get next batch
batch = await self._queue.get_batch()
if not batch:
continue
# Execute batch
result = await self._executor.execute_batch(batch)
                # Handle results. Futures for failures that will be retried
                # stay pending so callers only ever see the final outcome.
                for response in result.responses:
                    request = next(
                        (r for r in batch if r.id == response.request_id),
                        None
                    )
                    if not request or not request.future:
                        continue
                    will_retry = (
                        self.config.retry_failed
                        and not response.success
                        and request.metadata.get("retry_count", 0) < self.config.max_retries
                    )
                    if not will_retry and not request.future.done():
                        request.future.set_result(response)
# Update stats
self._total_processed += result.success_count
self._total_failed += result.failure_count
# Retry failed if configured
if self.config.retry_failed:
await self._retry_failed(batch, result)
except asyncio.CancelledError:
break
except Exception as e:
# Log error but continue processing
print(f"Batch processing error: {e}")
await asyncio.sleep(1)
async def _retry_failed(
self,
batch: list[BatchRequest],
result: ExecutionResult
):
"""Retry failed requests."""
for response in result.responses:
if not response.success:
request = next(
(r for r in batch if r.id == response.request_id),
None
)
if request:
retry_count = request.metadata.get("retry_count", 0)
if retry_count < self.config.max_retries:
# Re-queue with incremented retry count
new_request = BatchRequest(
prompt=request.prompt,
messages=request.messages,
model=request.model,
priority=request.priority + 1, # Higher priority
metadata={
**request.metadata,
"retry_count": retry_count + 1
}
)
new_request.future = request.future
await self._queue.add(new_request)
    @property
    def stats(self) -> dict:
        """Get processor statistics."""
        total = self._total_processed + self._total_failed
        return {
            "queue_size": len(self._queue._requests),
            "total_processed": self._total_processed,
            "total_failed": self._total_failed,
            "success_rate": self._total_processed / total if total else 0
        }
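Putting the processor together, a usage sketch (again assuming the openai AsyncOpenAI client; submit returns once the request's batch has been executed):

async def demo_batch_processor():
    from openai import AsyncOpenAI  # assumed dependency
    processor = BatchProcessor(AsyncOpenAI(), BatchConfig(max_batch_size=5, max_wait_ms=100))
    await processor.start()
    try:
        response = await processor.submit(prompt="Translate 'hello' to French")
        print(response.success, response.content, f"{response.latency_ms:.0f} ms")
        print(processor.stats)
    finally:
        await processor.stop()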
OpenAI Batch API
from dataclasses import dataclass
from typing import Any, Optional
import json
import asyncio
@dataclass
class BatchJob:
"""OpenAI batch job."""
    id: str
    status: str
    input_file_id: str
    output_file_id: Optional[str] = None
    error_file_id: Optional[str] = None
    created_at: int = 0
    completed_at: Optional[int] = None
    request_counts: Optional[dict] = None
class OpenAIBatchClient:
"""Client for OpenAI Batch API."""
def __init__(self, client: Any):
self.client = client
async def create_batch(
self,
requests: list[dict],
description: str = None
) -> BatchJob:
"""Create a batch job."""
# Format requests for batch API
batch_requests = []
for i, req in enumerate(requests):
batch_requests.append({
"custom_id": req.get("custom_id", f"request-{i}"),
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": req.get("model", "gpt-4o-mini"),
"messages": req.get("messages", []),
"max_tokens": req.get("max_tokens", 1000)
}
})
# Create JSONL content
jsonl_content = "\n".join(json.dumps(r) for r in batch_requests)
        # Upload input file (the Batch API expects a .jsonl file)
        input_file = await self.client.files.create(
            file=("batch_input.jsonl", jsonl_content.encode()),
            purpose="batch"
        )
# Create batch
batch = await self.client.batches.create(
input_file_id=input_file.id,
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={"description": description} if description else None
)
return BatchJob(
id=batch.id,
status=batch.status,
input_file_id=input_file.id,
created_at=batch.created_at
)
async def get_batch_status(self, batch_id: str) -> BatchJob:
"""Get batch job status."""
batch = await self.client.batches.retrieve(batch_id)
return BatchJob(
id=batch.id,
status=batch.status,
input_file_id=batch.input_file_id,
output_file_id=batch.output_file_id,
error_file_id=batch.error_file_id,
created_at=batch.created_at,
completed_at=batch.completed_at,
request_counts=batch.request_counts
)
async def wait_for_completion(
self,
batch_id: str,
poll_interval: int = 60
) -> BatchJob:
"""Wait for batch to complete."""
while True:
job = await self.get_batch_status(batch_id)
if job.status in ["completed", "failed", "expired", "cancelled"]:
return job
await asyncio.sleep(poll_interval)
async def get_results(self, batch_id: str) -> list[dict]:
"""Get batch results."""
job = await self.get_batch_status(batch_id)
if job.status != "completed":
raise RuntimeError(f"Batch not completed: {job.status}")
if not job.output_file_id:
return []
# Download output file
content = await self.client.files.content(job.output_file_id)
# Parse JSONL
results = []
for line in content.text.split("\n"):
if line.strip():
results.append(json.loads(line))
return results
async def cancel_batch(self, batch_id: str) -> BatchJob:
"""Cancel a batch job."""
batch = await self.client.batches.cancel(batch_id)
return BatchJob(
id=batch.id,
status=batch.status,
input_file_id=batch.input_file_id
)
class BatchAPIProcessor:
"""Process large batches using OpenAI Batch API."""
def __init__(self, client: Any):
self.batch_client = OpenAIBatchClient(client)
async def process_batch(
self,
requests: list[dict],
wait_for_completion: bool = True
) -> dict:
"""Process batch of requests."""
# Create batch job
job = await self.batch_client.create_batch(requests)
if not wait_for_completion:
return {"job_id": job.id, "status": job.status}
# Wait for completion
completed_job = await self.batch_client.wait_for_completion(job.id)
if completed_job.status != "completed":
return {
"job_id": job.id,
"status": completed_job.status,
"error": "Batch did not complete successfully"
}
# Get results
results = await self.batch_client.get_results(job.id)
return {
"job_id": job.id,
"status": "completed",
"results": results,
"request_counts": completed_job.request_counts
}
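A usage sketch for the offline path (assuming the official openai AsyncOpenAI client). Batch jobs can take up to 24 hours, so in practice you store the job id and poll from a background task rather than blocking on wait_for_completion:

async def demo_batch_api():
    from openai import AsyncOpenAI  # assumed dependency
    processor = BatchAPIProcessor(AsyncOpenAI())
    requests = [
        {
            "custom_id": f"doc-{i}",
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": f"Summarize document {i}"}],
        }
        for i in range(100)
    ]
    submitted = await processor.process_batch(requests, wait_for_completion=False)
    print(submitted["job_id"], submitted["status"])  # poll the status later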
Production Batching Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncio
app = FastAPI()
# Initialize these with a real async client at startup, e.g.
# batch_processor = BatchProcessor(AsyncOpenAI()) and
# openai_batch_client = OpenAIBatchClient(AsyncOpenAI())
batch_processor = None
openai_batch_client = None
class SubmitRequest(BaseModel):
prompt: Optional[str] = None
messages: Optional[list[dict]] = None
model: str = "gpt-4o-mini"
priority: int = 0
class BatchSubmitRequest(BaseModel):
requests: list[dict]
use_batch_api: bool = False
@app.on_event("startup")
async def startup():
"""Start batch processor on startup."""
await batch_processor.start()
@app.on_event("shutdown")
async def shutdown():
"""Stop batch processor on shutdown."""
await batch_processor.stop()
@app.post("/v1/submit")
async def submit_request(request: SubmitRequest):
"""Submit a single request for batch processing."""
response = await batch_processor.submit(
prompt=request.prompt,
messages=request.messages,
model=request.model,
priority=request.priority
)
return {
"request_id": response.request_id,
"content": response.content,
"success": response.success,
"error": response.error,
"latency_ms": response.latency_ms
}
@app.post("/v1/batch/submit")
async def submit_batch(request: BatchSubmitRequest):
"""Submit multiple requests."""
    if request.use_batch_api:
        # Use the OpenAI Batch API for large, latency-tolerant batches;
        # return the job id so the caller can poll /v1/batch/{job_id}
        job = await openai_batch_client.create_batch(request.requests)
        return {"job_id": job.id, "status": job.status}
# Use real-time batching
tasks = [
batch_processor.submit(
prompt=req.get("prompt"),
messages=req.get("messages"),
model=req.get("model", "gpt-4o-mini")
)
for req in request.requests
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
results.append({
"index": i,
"success": False,
"error": str(response)
})
else:
results.append({
"index": i,
"success": response.success,
"content": response.content,
"error": response.error
})
return {"results": results}
@app.get("/v1/batch/{job_id}")
async def get_batch_status(job_id: str):
"""Get batch job status."""
job = await openai_batch_client.get_batch_status(job_id)
return {
"job_id": job.id,
"status": job.status,
"request_counts": job.request_counts
}
@app.get("/v1/batch/{job_id}/results")
async def get_batch_results(job_id: str):
"""Get batch job results."""
try:
results = await openai_batch_client.get_results(job_id)
return {"results": results}
except RuntimeError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/v1/stats")
async def get_stats():
"""Get processor statistics."""
return batch_processor.stats
@app.get("/health")
async def health():
return {"status": "healthy"}
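To exercise the service end to end, a small client sketch using httpx (assumes the app runs on localhost:8000; timeouts are generous because submitted requests wait for their batch to execute):

import asyncio
import httpx

async def demo_client():
    async with httpx.AsyncClient(base_url="http://localhost:8000", timeout=60) as http:
        single = await http.post("/v1/submit", json={"prompt": "Say hi"})
        print(single.json())

        batch = await http.post("/v1/batch/submit", json={
            "requests": [{"prompt": f"Question {i}"} for i in range(5)],
            "use_batch_api": False,
        })
        print(batch.json())

# asyncio.run(demo_client())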
References
- OpenAI Batch API: https://platform.openai.com/docs/guides/batch
- asyncio Documentation: https://docs.python.org/3/library/asyncio.html
- Rate Limiting Patterns: https://cloud.google.com/architecture/rate-limiting-strategies-techniques
- Token Bucket Algorithm: https://en.wikipedia.org/wiki/Token_bucket
Conclusion
Request batching is essential for high-throughput LLM applications. For real-time use cases, implement timed batch queues that collect requests and process them together with parallel execution. Use rate limiters to stay within API limits while maximizing throughput, and implement adaptive concurrency that backs off when errors increase and scales up when the API is healthy. For large offline workloads, use OpenAI's Batch API, which offers 50% cost savings and higher rate limits in exchange for longer processing times. The key insight is that batching trades latency for throughput: individual requests may wait slightly longer, but overall system capacity increases dramatically. Design your batching strategy around your latency requirements, using aggressive batching for background processing and lighter batching for interactive applications. Monitor queue depths and processing times to tune batch sizes and concurrency for your specific workload patterns.
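As a starting point for that tuning, here are two illustrative BatchConfig presets; the numbers are assumptions to adjust against your own latency and rate-limit measurements, not recommendations:

# Interactive traffic: small batches, short waits, modest concurrency
interactive_config = BatchConfig(
    max_batch_size=5,
    max_wait_ms=50,
    max_concurrent=10,
    requests_per_minute=500,
    retry_failed=False,
)

# Background traffic: larger batches, longer waits, retries enabled
background_config = BatchConfig(
    max_batch_size=50,
    max_wait_ms=2000,
    max_concurrent=25,
    requests_per_minute=500,
    retry_failed=True,
    max_retries=3,
)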
