Introduction: Processing thousands of LLM requests efficiently requires batch processing strategies that maximize throughput while respecting rate limits and managing costs. Individual API calls are inefficient for bulk operations—batch processing enables parallel execution, request queuing, and optimized resource utilization. This guide covers practical batch processing patterns: async concurrent execution, request queuing with backpressure, rate-limited batch schedulers, result aggregation, and production-ready batch pipelines for processing large datasets through LLM APIs.

Async Concurrent Execution
from dataclasses import dataclass
from typing import Any, Callable, Optional
import asyncio
from datetime import datetime


@dataclass
class BatchRequest:
    """A single request in a batch."""
    id: str
    messages: list[dict]
    metadata: dict = None

    def __post_init__(self):
        self.metadata = self.metadata or {}


@dataclass
class BatchResult:
    """Result of a batch request."""
    id: str
    content: Optional[str] = None
    error: Optional[str] = None
    latency_ms: float = 0.0

    @property
    def success(self) -> bool:
        return self.error is None


class AsyncBatchProcessor:
    """Process batches with async concurrency."""

    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        max_concurrency: int = 10
    ):
        self.client = client
        self.model = model
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def process_batch(
        self,
        requests: list[BatchRequest],
        **kwargs
    ) -> list[BatchResult]:
        """Process all requests concurrently."""
        tasks = [
            self._process_single(request, **kwargs)
            for request in requests
        ]
        return await asyncio.gather(*tasks)

    async def _process_single(
        self,
        request: BatchRequest,
        **kwargs
    ) -> BatchResult:
        """Process a single request with semaphore."""
        async with self.semaphore:
            start = datetime.now()
            try:
                response = await self.client.chat.completions.create(
                    model=self.model,
                    messages=request.messages,
                    **kwargs
                )
                latency = (datetime.now() - start).total_seconds() * 1000
                return BatchResult(
                    id=request.id,
                    content=response.choices[0].message.content,
                    latency_ms=latency
                )
            except Exception as e:
                latency = (datetime.now() - start).total_seconds() * 1000
                return BatchResult(
                    id=request.id,
                    error=str(e),
                    latency_ms=latency
                )


class ChunkedBatchProcessor:
    """Process large batches in chunks."""

    def __init__(
        self,
        processor: AsyncBatchProcessor,
        chunk_size: int = 100
    ):
        self.processor = processor
        self.chunk_size = chunk_size

    async def process_batch(
        self,
        requests: list[BatchRequest],
        progress_callback: Callable[[int, int], None] = None,
        **kwargs
    ) -> list[BatchResult]:
        """Process in chunks with progress tracking."""
        all_results = []
        total = len(requests)
        for i in range(0, total, self.chunk_size):
            chunk = requests[i:i + self.chunk_size]
            results = await self.processor.process_batch(chunk, **kwargs)
            all_results.extend(results)
            if progress_callback:
                progress_callback(len(all_results), total)
        return all_results


# Streaming batch processor
class StreamingBatchProcessor:
    """Process batch with streaming results."""

    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        max_concurrency: int = 10
    ):
        self.client = client
        self.model = model
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def process_batch_stream(
        self,
        requests: list[BatchRequest],
        **kwargs
    ):
        """Yield results as they complete."""
        async def process_and_yield(request: BatchRequest):
            async with self.semaphore:
                start = datetime.now()
                try:
                    response = await self.client.chat.completions.create(
                        model=self.model,
                        messages=request.messages,
                        **kwargs
                    )
                    latency = (datetime.now() - start).total_seconds() * 1000
                    return BatchResult(
                        id=request.id,
                        content=response.choices[0].message.content,
                        latency_ms=latency
                    )
                except Exception as e:
                    latency = (datetime.now() - start).total_seconds() * 1000
                    return BatchResult(
                        id=request.id,
                        error=str(e),
                        latency_ms=latency
                    )

        # Create tasks
        tasks = [
            asyncio.create_task(process_and_yield(req))
            for req in requests
        ]
        # Yield as completed
        for coro in asyncio.as_completed(tasks):
            result = await coro
            yield result
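As a usage sketch (not part of the classes above), the driver below shows how the three processors might be wired together. It assumes the openai package's AsyncOpenAI client with an OPENAI_API_KEY in the environment; the request IDs, prompts, and concurrency settings are illustrative.

# Usage sketch: assumes the openai package's AsyncOpenAI client is available.
import asyncio
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    requests = [
        BatchRequest(id=f"req-{i}", messages=[{"role": "user", "content": f"Summarize item {i}"}])
        for i in range(25)
    ]

    # Bounded-concurrency batch: all 25 requests, at most 5 in flight at once
    processor = AsyncBatchProcessor(client, max_concurrency=5)
    results = await processor.process_batch(requests, max_tokens=200)
    print(sum(r.success for r in results), "succeeded")

    # Chunked execution with a progress callback
    chunked = ChunkedBatchProcessor(processor, chunk_size=10)
    await chunked.process_batch(
        requests,
        progress_callback=lambda done, total: print(f"{done}/{total}")
    )

    # Streaming: handle each result as soon as it completes
    streamer = StreamingBatchProcessor(client, max_concurrency=5)
    async for result in streamer.process_batch_stream(requests, max_tokens=200):
        print(result.id, "ok" if result.success else result.error)

asyncio.run(main())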
Rate-Limited Batch Scheduler
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime, timedelta
import asyncio
from collections import deque


@dataclass
class RateLimitConfig:
    """Rate limit configuration."""
    requests_per_minute: int = 60
    tokens_per_minute: int = 100000

    @property
    def request_interval(self) -> float:
        """Minimum seconds between requests."""
        return 60.0 / self.requests_per_minute


class TokenBucket:
    """Token bucket rate limiter."""

    def __init__(
        self,
        capacity: int,
        refill_rate: float  # tokens per second
    ):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = datetime.now()

    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = datetime.now()
        elapsed = (now - self.last_refill).total_seconds()
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now

    async def acquire(self, tokens: int = 1) -> float:
        """Acquire tokens, return wait time."""
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return 0.0
        # Calculate wait time
        needed = tokens - self.tokens
        wait_time = needed / self.refill_rate
        await asyncio.sleep(wait_time)
        self._refill()
        self.tokens -= tokens
        return wait_time


class RateLimitedBatchProcessor:
    """Batch processor with rate limiting."""

    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        config: RateLimitConfig = None
    ):
        self.client = client
        self.model = model
        self.config = config or RateLimitConfig()
        # Request rate limiter
        self.request_bucket = TokenBucket(
            capacity=self.config.requests_per_minute,
            refill_rate=self.config.requests_per_minute / 60.0
        )
        # Token rate limiter
        self.token_bucket = TokenBucket(
            capacity=self.config.tokens_per_minute,
            refill_rate=self.config.tokens_per_minute / 60.0
        )

    async def process_batch(
        self,
        requests: list[BatchRequest],
        max_tokens: int = 1000,
        **kwargs
    ) -> list[BatchResult]:
        """Process batch with rate limiting."""
        results = []
        for request in requests:
            # Wait for rate limits
            await self.request_bucket.acquire(1)
            await self.token_bucket.acquire(max_tokens)
            result = await self._process_single(request, max_tokens=max_tokens, **kwargs)
            results.append(result)
        return results

    async def _process_single(
        self,
        request: BatchRequest,
        **kwargs
    ) -> BatchResult:
        """Process single request."""
        start = datetime.now()
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=request.messages,
                **kwargs
            )
            latency = (datetime.now() - start).total_seconds() * 1000
            return BatchResult(
                id=request.id,
                content=response.choices[0].message.content,
                latency_ms=latency
            )
        except Exception as e:
            latency = (datetime.now() - start).total_seconds() * 1000
            return BatchResult(
                id=request.id,
                error=str(e),
                latency_ms=latency
            )


# Adaptive rate limiter
class AdaptiveRateLimiter:
    """Adjust rate based on API responses."""

    def __init__(self, initial_rpm: int = 60):
        self.current_rpm = initial_rpm
        self.min_rpm = 10
        self.max_rpm = 1000
        self.consecutive_successes = 0
        self.consecutive_failures = 0

    def record_success(self):
        """Record successful request."""
        self.consecutive_successes += 1
        self.consecutive_failures = 0
        # Increase rate after 10 consecutive successes
        if self.consecutive_successes >= 10:
            self.current_rpm = min(
                self.max_rpm,
                int(self.current_rpm * 1.2)
            )
            self.consecutive_successes = 0

    def record_rate_limit(self):
        """Record rate limit error."""
        self.consecutive_failures += 1
        self.consecutive_successes = 0
        # Decrease rate immediately
        self.current_rpm = max(
            self.min_rpm,
            int(self.current_rpm * 0.5)
        )

    @property
    def request_interval(self) -> float:
        return 60.0 / self.current_rpm
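A minimal configuration sketch for the rate-limited processor, again assuming an AsyncOpenAI client; the limits, prompts, and request counts are illustrative, and the comment works through which of the two buckets ends up binding. The AdaptiveRateLimiter above is standalone: you would call record_success() and record_rate_limit() from your own request loop and sleep for request_interval between calls.

# Usage sketch: cap throughput at 100 requests/min and 80k tokens/min.
# Assumes an AsyncOpenAI client as in the earlier example.
import asyncio
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    config = RateLimitConfig(requests_per_minute=100, tokens_per_minute=80_000)
    processor = RateLimitedBatchProcessor(client, config=config)

    requests = [
        BatchRequest(id=f"doc-{i}", messages=[{"role": "user", "content": f"Classify document {i}"}])
        for i in range(300)
    ]
    # With max_tokens=500, the token bucket admits at most ~160 requests/min
    # (80_000 / 500), so here the 100 requests/min bucket is the binding limit.
    results = await processor.process_batch(requests, max_tokens=500)
    print(f"{sum(r.success for r in results)}/{len(results)} succeeded")

asyncio.run(main())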
Request Queue with Backpressure
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from datetime import datetime, timedelta
import asyncio
from enum import Enum


class QueueStatus(Enum):
    ACCEPTING = "accepting"
    BACKPRESSURE = "backpressure"
    STOPPED = "stopped"


@dataclass
class QueuedRequest:
    """A request in the queue."""
    request: BatchRequest
    future: asyncio.Future
    priority: int = 0
    submitted_at: datetime = field(default_factory=datetime.now)


class BatchQueue:
    """Queue with backpressure for batch processing."""

    def __init__(
        self,
        processor: AsyncBatchProcessor,
        max_queue_size: int = 1000,
        batch_size: int = 10,
        batch_timeout: float = 1.0
    ):
        self.processor = processor
        self.max_queue_size = max_queue_size
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.status = QueueStatus.ACCEPTING
        self.current_size = 0
        self._worker_task = None
        self._sequence = 0  # tie-breaker so equal priorities never compare QueuedRequest objects

    async def start(self):
        """Start the queue worker."""
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self):
        """Stop the queue worker."""
        self.status = QueueStatus.STOPPED
        if self._worker_task:
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass

    async def submit(
        self,
        request: BatchRequest,
        priority: int = 0
    ) -> asyncio.Future:
        """Submit request to queue."""
        if self.status == QueueStatus.STOPPED:
            raise Exception("Queue is stopped")
        if self.current_size >= self.max_queue_size:
            self.status = QueueStatus.BACKPRESSURE
            raise Exception("Queue is full - backpressure")
        future = asyncio.Future()
        queued = QueuedRequest(
            request=request,
            future=future,
            priority=priority
        )
        # Priority queue orders by (priority, sequence); the sequence breaks ties
        self._sequence += 1
        await self.queue.put((priority, self._sequence, queued))
        self.current_size += 1
        return future

    async def _worker(self):
        """Process queue in batches."""
        while self.status != QueueStatus.STOPPED:
            batch = await self._collect_batch()
            if not batch:
                continue
            # Process batch
            requests = [q.request for q in batch]
            results = await self.processor.process_batch(requests)
            # Resolve futures
            for queued, result in zip(batch, results):
                if result.success:
                    queued.future.set_result(result)
                else:
                    queued.future.set_exception(Exception(result.error))
            self.current_size -= len(batch)
            # Check if we can accept again
            if self.current_size < self.max_queue_size * 0.8:
                self.status = QueueStatus.ACCEPTING

    async def _collect_batch(self) -> list[QueuedRequest]:
        """Collect a batch of requests."""
        batch = []
        deadline = datetime.now() + timedelta(seconds=self.batch_timeout)
        while len(batch) < self.batch_size:
            try:
                remaining = (deadline - datetime.now()).total_seconds()
                if remaining <= 0:
                    break
                _, _, queued = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=remaining
                )
                batch.append(queued)
            except asyncio.TimeoutError:
                break
        return batch

    def get_stats(self) -> dict:
        """Get queue statistics."""
        return {
            "status": self.status.value,
            "current_size": self.current_size,
            "max_size": self.max_queue_size,
            "utilization": self.current_size / self.max_queue_size
        }


# Priority queue processor
class PriorityBatchQueue(BatchQueue):
    """Queue with priority levels."""
    PRIORITY_HIGH = 0
    PRIORITY_NORMAL = 1
    PRIORITY_LOW = 2

    async def submit_high_priority(self, request: BatchRequest) -> asyncio.Future:
        return await self.submit(request, priority=self.PRIORITY_HIGH)

    async def submit_low_priority(self, request: BatchRequest) -> asyncio.Future:
        return await self.submit(request, priority=self.PRIORITY_LOW)
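A usage sketch for the queue: start the worker, submit requests from independent callers, and await each caller's future. It reuses the AsyncBatchProcessor and AsyncOpenAI assumptions from the first section; the sizes and prompts are illustrative.

# Usage sketch: run the queue worker and await individual futures.
import asyncio
from openai import AsyncOpenAI

async def main():
    processor = AsyncBatchProcessor(AsyncOpenAI(), max_concurrency=5)
    queue = PriorityBatchQueue(processor, max_queue_size=500, batch_size=20, batch_timeout=0.5)
    await queue.start()
    try:
        # Each caller submits independently and awaits its own result.
        futures = [
            await queue.submit(
                BatchRequest(id=f"q-{i}", messages=[{"role": "user", "content": f"Question {i}"}]),
                priority=PriorityBatchQueue.PRIORITY_NORMAL,
            )
            for i in range(50)
        ]
        results = await asyncio.gather(*futures, return_exceptions=True)
        print(queue.get_stats())
    finally:
        await queue.stop()

asyncio.run(main())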
Result Aggregation
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from datetime import datetime


@dataclass
class BatchStats:
    """Statistics for a batch run."""
    total_requests: int = 0
    successful: int = 0
    failed: int = 0
    total_latency_ms: float = 0.0
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    @property
    def success_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.successful / self.total_requests

    @property
    def avg_latency_ms(self) -> float:
        if self.successful == 0:
            return 0.0
        return self.total_latency_ms / self.successful

    @property
    def duration_seconds(self) -> float:
        if not self.start_time or not self.end_time:
            return 0.0
        return (self.end_time - self.start_time).total_seconds()

    @property
    def throughput(self) -> float:
        """Requests per second."""
        if self.duration_seconds == 0:
            return 0.0
        return self.total_requests / self.duration_seconds


class BatchResultAggregator:
    """Aggregate and analyze batch results."""

    def __init__(self):
        self.results: list[BatchResult] = []
        self.stats = BatchStats()

    def start_batch(self):
        """Mark batch start."""
        self.stats.start_time = datetime.now()

    def add_result(self, result: BatchResult):
        """Add a result to aggregation."""
        self.results.append(result)
        self.stats.total_requests += 1
        if result.success:
            self.stats.successful += 1
            self.stats.total_latency_ms += result.latency_ms
        else:
            self.stats.failed += 1

    def add_results(self, results: list[BatchResult]):
        """Add multiple results."""
        for result in results:
            self.add_result(result)

    def end_batch(self):
        """Mark batch end."""
        self.stats.end_time = datetime.now()

    def get_successful(self) -> list[BatchResult]:
        """Get successful results."""
        return [r for r in self.results if r.success]

    def get_failed(self) -> list[BatchResult]:
        """Get failed results."""
        return [r for r in self.results if not r.success]

    def get_summary(self) -> dict:
        """Get summary of batch run."""
        return {
            "total_requests": self.stats.total_requests,
            "successful": self.stats.successful,
            "failed": self.stats.failed,
            "success_rate": f"{self.stats.success_rate:.2%}",
            "avg_latency_ms": f"{self.stats.avg_latency_ms:.2f}",
            "duration_seconds": f"{self.stats.duration_seconds:.2f}",
            "throughput_rps": f"{self.stats.throughput:.2f}"
        }

    def export_results(self, format: str = "json") -> str:
        """Export results to format."""
        if format == "json":
            import json
            return json.dumps([
                {
                    "id": r.id,
                    "content": r.content,
                    "error": r.error,
                    "latency_ms": r.latency_ms
                }
                for r in self.results
            ], indent=2)
        elif format == "csv":
            lines = ["id,content,error,latency_ms"]
            for r in self.results:
                content = (r.content or "").replace('"', '""')
                error = (r.error or "").replace('"', '""')
                lines.append(f'"{r.id}","{content}","{error}",{r.latency_ms}')
            return "\n".join(lines)
        raise ValueError(f"Unknown format: {format}")
# Retry failed results
class BatchRetryHandler:
    """Handle retries for failed batch results."""

    def __init__(
        self,
        processor: AsyncBatchProcessor,
        max_retries: int = 3
    ):
        self.processor = processor
        self.max_retries = max_retries

    async def retry_failed(
        self,
        original_requests: list[BatchRequest],
        results: list[BatchResult],
        **kwargs
    ) -> list[BatchResult]:
        """Retry failed requests, up to max_retries passes."""
        # Build request and result maps
        request_map = {r.id: r for r in original_requests}
        result_map = {r.id: r for r in results}
        for _ in range(self.max_retries):
            # Identify failed
            failed_ids = [r.id for r in result_map.values() if not r.success]
            if not failed_ids:
                break
            # Retry
            retry_requests = [request_map[id] for id in failed_ids]
            retry_results = await self.processor.process_batch(retry_requests, **kwargs)
            # Merge successful retries back into the result map
            for retry_result in retry_results:
                if retry_result.success:
                    result_map[retry_result.id] = retry_result
        return list(result_map.values())
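The sketch below ties the aggregator and retry handler to a processor run, reusing the AsyncOpenAI assumption from earlier; the prompts, limits, and output file name are illustrative.

# Usage sketch: aggregate a run, retry failures, then export a CSV report.
import asyncio
from openai import AsyncOpenAI

async def main():
    processor = AsyncBatchProcessor(AsyncOpenAI(), max_concurrency=8)
    requests = [
        BatchRequest(id=f"item-{i}", messages=[{"role": "user", "content": f"Extract entities from record {i}"}])
        for i in range(200)
    ]

    aggregator = BatchResultAggregator()
    aggregator.start_batch()
    results = await processor.process_batch(requests, max_tokens=300)

    # Retry anything that failed before aggregating the merged results.
    retry_handler = BatchRetryHandler(processor, max_retries=2)
    final_results = await retry_handler.retry_failed(requests, results, max_tokens=300)

    aggregator.add_results(final_results)
    aggregator.end_batch()
    print(aggregator.get_summary())
    with open("batch_results.csv", "w") as f:
        f.write(aggregator.export_results(format="csv"))

asyncio.run(main())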
Production Batch Service
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import uuid

app = FastAPI()

# Initialize components
processor = None  # Initialize with actual client
queue = None
aggregator = BatchResultAggregator()

# In-memory job storage
jobs: dict[str, dict] = {}


class BatchSubmitRequest(BaseModel):
    requests: list[dict]  # List of {id, messages}
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.7


class JobStatus(BaseModel):
    job_id: str
    status: str
    total: int
    completed: int
    failed: int


@app.post("/v1/batch/submit")
async def submit_batch(
    request: BatchSubmitRequest,
    background_tasks: BackgroundTasks
):
    """Submit a batch job."""
    job_id = str(uuid.uuid4())
    # Convert to BatchRequest objects
    batch_requests = [
        BatchRequest(
            id=r.get("id", str(uuid.uuid4())),
            messages=r["messages"]
        )
        for r in request.requests
    ]
    # Initialize job
    jobs[job_id] = {
        "status": "pending",
        "total": len(batch_requests),
        "completed": 0,
        "failed": 0,
        "results": []
    }
    # Process in background
    background_tasks.add_task(
        process_batch_job,
        job_id,
        batch_requests,
        request.max_tokens,
        request.temperature
    )
    return {"job_id": job_id, "status": "submitted"}


async def process_batch_job(
    job_id: str,
    requests: list[BatchRequest],
    max_tokens: int,
    temperature: float
):
    """Process batch job in background."""
    jobs[job_id]["status"] = "processing"
    try:
        results = await processor.process_batch(
            requests,
            max_tokens=max_tokens,
            temperature=temperature
        )
        jobs[job_id]["results"] = results
        jobs[job_id]["completed"] = sum(1 for r in results if r.success)
        jobs[job_id]["failed"] = sum(1 for r in results if not r.success)
        jobs[job_id]["status"] = "completed"
    except Exception as e:
        jobs[job_id]["status"] = "failed"
        jobs[job_id]["error"] = str(e)


@app.get("/v1/batch/{job_id}/status")
async def get_job_status(job_id: str) -> JobStatus:
    """Get job status."""
    if job_id not in jobs:
        raise HTTPException(404, "Job not found")
    job = jobs[job_id]
    return JobStatus(
        job_id=job_id,
        status=job["status"],
        total=job["total"],
        completed=job["completed"],
        failed=job["failed"]
    )


@app.get("/v1/batch/{job_id}/results")
async def get_job_results(job_id: str):
    """Get job results."""
    if job_id not in jobs:
        raise HTTPException(404, "Job not found")
    job = jobs[job_id]
    if job["status"] != "completed":
        raise HTTPException(400, f"Job not completed: {job['status']}")
    return {
        "job_id": job_id,
        "results": [
            {
                "id": r.id,
                "content": r.content,
                "error": r.error,
                "latency_ms": r.latency_ms
            }
            for r in job["results"]
        ]
    }


@app.get("/v1/queue/stats")
async def get_queue_stats():
    """Get queue statistics."""
    if queue:
        return queue.get_stats()
    return {"status": "queue not initialized"}


@app.get("/health")
async def health():
    return {"status": "healthy"}
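For completeness, a hypothetical client for this service. It assumes the app is served locally with uvicorn at http://localhost:8000, that processor was initialized with a real client at startup, and that httpx is installed; the payload contents are illustrative.

# Client sketch: submit a job, poll until it finishes, then fetch results.
import time
import httpx

payload = {
    "requests": [
        {"id": f"r{i}", "messages": [{"role": "user", "content": f"Translate sentence {i} to French"}]}
        for i in range(20)
    ],
    "max_tokens": 200,
}

with httpx.Client(base_url="http://localhost:8000") as client:
    job_id = client.post("/v1/batch/submit", json=payload).json()["job_id"]
    while True:
        status = client.get(f"/v1/batch/{job_id}/status").json()
        if status["status"] in ("completed", "failed"):
            break
        time.sleep(2)
    if status["status"] == "completed":
        results = client.get(f"/v1/batch/{job_id}/results").json()["results"]
        print(len(results), "results retrieved")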
References
- OpenAI Batch API: https://platform.openai.com/docs/guides/batch
- AsyncIO Documentation: https://docs.python.org/3/library/asyncio.html
- Rate Limiting Patterns: https://cloud.google.com/architecture/rate-limiting-strategies-techniques
- Token Bucket Algorithm: https://en.wikipedia.org/wiki/Token_bucket
Conclusion
Batch processing transforms LLM API usage from inefficient sequential calls to optimized parallel execution. Use async concurrency with semaphores to control parallelism without overwhelming the API. Implement rate limiting with token buckets to stay within provider limits while maximizing throughput. Add request queues with backpressure to handle traffic spikes gracefully. Track batch statistics for monitoring and optimization—success rates, latencies, and throughput metrics reveal bottlenecks. For large-scale processing, use chunked execution with progress tracking and automatic retry of failed requests. The goal is predictable, efficient processing that scales with your workload while respecting API constraints.