Batch Processing for LLMs: Maximizing Throughput with Async Execution and Rate Limiting

Introduction

Processing thousands of LLM requests efficiently requires batch processing strategies that maximize throughput while respecting rate limits and managing costs. Individual API calls are inefficient for bulk operations; batch processing enables parallel execution, request queuing, and optimized resource utilization. This guide covers practical batch processing patterns: async concurrent execution, request queuing with backpressure, rate-limited batch schedulers, result aggregation, and production-ready batch pipelines for processing large datasets through LLM APIs.

Figure: batch processing pipeline (queue, scheduler, parallel executor).

Async Concurrent Execution

from dataclasses import dataclass
from typing import Any, Callable, Optional
import asyncio
from datetime import datetime

@dataclass
class BatchRequest:
    """A single request in a batch."""
    
    id: str
    messages: list[dict]
    metadata: Optional[dict] = None
    
    def __post_init__(self):
        self.metadata = self.metadata or {}

@dataclass
class BatchResult:
    """Result of a batch request."""
    
    id: str
    content: Optional[str] = None
    error: Optional[str] = None
    latency_ms: float = 0.0
    
    @property
    def success(self) -> bool:
        return self.error is None

class AsyncBatchProcessor:
    """Process batches with async concurrency."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        max_concurrency: int = 10
    ):
        self.client = client
        self.model = model
        self.semaphore = asyncio.Semaphore(max_concurrency)
    
    async def process_batch(
        self,
        requests: list[BatchRequest],
        **kwargs
    ) -> list[BatchResult]:
        """Process all requests concurrently."""
        
        tasks = [
            self._process_single(request, **kwargs)
            for request in requests
        ]
        
        return await asyncio.gather(*tasks)
    
    async def _process_single(
        self,
        request: BatchRequest,
        **kwargs
    ) -> BatchResult:
        """Process a single request with semaphore."""
        
        async with self.semaphore:
            start = datetime.now()
            
            try:
                response = await self.client.chat.completions.create(
                    model=self.model,
                    messages=request.messages,
                    **kwargs
                )
                
                latency = (datetime.now() - start).total_seconds() * 1000
                
                return BatchResult(
                    id=request.id,
                    content=response.choices[0].message.content,
                    latency_ms=latency
                )
            
            except Exception as e:
                latency = (datetime.now() - start).total_seconds() * 1000
                
                return BatchResult(
                    id=request.id,
                    error=str(e),
                    latency_ms=latency
                )

class ChunkedBatchProcessor:
    """Process large batches in chunks."""
    
    def __init__(
        self,
        processor: AsyncBatchProcessor,
        chunk_size: int = 100
    ):
        self.processor = processor
        self.chunk_size = chunk_size
    
    async def process_batch(
        self,
        requests: list[BatchRequest],
        progress_callback: Optional[Callable[[int, int], None]] = None,
        **kwargs
    ) -> list[BatchResult]:
        """Process in chunks with progress tracking."""
        
        all_results = []
        total = len(requests)
        
        for i in range(0, total, self.chunk_size):
            chunk = requests[i:i + self.chunk_size]
            
            results = await self.processor.process_batch(chunk, **kwargs)
            all_results.extend(results)
            
            if progress_callback:
                progress_callback(len(all_results), total)
        
        return all_results

# Streaming batch processor
class StreamingBatchProcessor:
    """Process batch with streaming results."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        max_concurrency: int = 10
    ):
        self.client = client
        self.model = model
        self.semaphore = asyncio.Semaphore(max_concurrency)
    
    async def process_batch_stream(
        self,
        requests: list[BatchRequest],
        **kwargs
    ):
        """Yield results as they complete."""
        
        async def process_and_yield(request: BatchRequest):
            async with self.semaphore:
                start = datetime.now()
                
                try:
                    response = await self.client.chat.completions.create(
                        model=self.model,
                        messages=request.messages,
                        **kwargs
                    )
                    
                    latency = (datetime.now() - start).total_seconds() * 1000
                    
                    return BatchResult(
                        id=request.id,
                        content=response.choices[0].message.content,
                        latency_ms=latency
                    )
                
                except Exception as e:
                    latency = (datetime.now() - start).total_seconds() * 1000
                    
                    return BatchResult(
                        id=request.id,
                        error=str(e),
                        latency_ms=latency
                    )
        
        # Create tasks
        tasks = [
            asyncio.create_task(process_and_yield(req))
            for req in requests
        ]
        
        # Yield as completed
        for coro in asyncio.as_completed(tasks):
            result = await coro
            yield result
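
Usage sketch for the processors above, assuming the async client from the official openai package (any client that exposes an async chat.completions.create call works the same way); the request contents, model name, and concurrency values are placeholders:

import asyncio
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    requests = [
        BatchRequest(
            id=f"req-{i}",
            messages=[{"role": "user", "content": f"Summarize item {i}"}]
        )
        for i in range(50)
    ]

    # Fire everything with bounded concurrency and collect results in order
    processor = AsyncBatchProcessor(client, model="gpt-4o-mini", max_concurrency=10)
    results = await processor.process_batch(requests, max_tokens=200)
    print(sum(1 for r in results if r.success), "of", len(results), "succeeded")

    # Or consume results as soon as each request finishes
    streaming = StreamingBatchProcessor(client, max_concurrency=10)
    async for result in streaming.process_batch_stream(requests, max_tokens=200):
        print(result.id, "ok" if result.success else result.error)

asyncio.run(main())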

Rate-Limited Batch Scheduler

from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
import asyncio

@dataclass
class RateLimitConfig:
    """Rate limit configuration."""
    
    requests_per_minute: int = 60
    tokens_per_minute: int = 100000
    
    @property
    def request_interval(self) -> float:
        """Minimum seconds between requests."""
        return 60.0 / self.requests_per_minute

class TokenBucket:
    """Token bucket rate limiter."""
    
    def __init__(
        self,
        capacity: int,
        refill_rate: float  # tokens per second
    ):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = datetime.now()
    
    def _refill(self):
        """Refill tokens based on elapsed time."""
        
        now = datetime.now()
        elapsed = (now - self.last_refill).total_seconds()
        
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now
    
    async def acquire(self, tokens: int = 1) -> float:
        """Acquire tokens, return wait time."""
        
        self._refill()
        
        if self.tokens >= tokens:
            self.tokens -= tokens
            return 0.0
        
        # Calculate wait time
        needed = tokens - self.tokens
        wait_time = needed / self.refill_rate
        
        await asyncio.sleep(wait_time)
        
        self._refill()
        self.tokens -= tokens
        return wait_time

class RateLimitedBatchProcessor:
    """Batch processor with rate limiting."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        config: Optional[RateLimitConfig] = None
    ):
        self.client = client
        self.model = model
        self.config = config or RateLimitConfig()
        
        # Request rate limiter
        self.request_bucket = TokenBucket(
            capacity=self.config.requests_per_minute,
            refill_rate=self.config.requests_per_minute / 60.0
        )
        
        # Token rate limiter
        self.token_bucket = TokenBucket(
            capacity=self.config.tokens_per_minute,
            refill_rate=self.config.tokens_per_minute / 60.0
        )
    
    async def process_batch(
        self,
        requests: list[BatchRequest],
        max_tokens: int = 1000,
        **kwargs
    ) -> list[BatchResult]:
        """Process batch with rate limiting."""
        
        results = []
        
        for request in requests:
            # Wait for rate limits (max_tokens is a rough upper bound on
            # completion tokens; prompt tokens are not estimated here)
            await self.request_bucket.acquire(1)
            await self.token_bucket.acquire(max_tokens)
            
            result = await self._process_single(request, max_tokens=max_tokens, **kwargs)
            results.append(result)
        
        return results
    
    async def _process_single(
        self,
        request: BatchRequest,
        **kwargs
    ) -> BatchResult:
        """Process single request."""
        
        start = datetime.now()
        
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=request.messages,
                **kwargs
            )
            
            latency = (datetime.now() - start).total_seconds() * 1000
            
            return BatchResult(
                id=request.id,
                content=response.choices[0].message.content,
                latency_ms=latency
            )
        
        except Exception as e:
            latency = (datetime.now() - start).total_seconds() * 1000
            
            return BatchResult(
                id=request.id,
                error=str(e),
                latency_ms=latency
            )

# Adaptive rate limiter
class AdaptiveRateLimiter:
    """Adjust rate based on API responses."""
    
    def __init__(self, initial_rpm: int = 60):
        self.current_rpm = initial_rpm
        self.min_rpm = 10
        self.max_rpm = 1000
        self.consecutive_successes = 0
        self.consecutive_failures = 0
    
    def record_success(self):
        """Record successful request."""
        
        self.consecutive_successes += 1
        self.consecutive_failures = 0
        
        # Increase rate after 10 consecutive successes
        if self.consecutive_successes >= 10:
            self.current_rpm = min(
                self.max_rpm,
                int(self.current_rpm * 1.2)
            )
            self.consecutive_successes = 0
    
    def record_rate_limit(self):
        """Record rate limit error."""
        
        self.consecutive_failures += 1
        self.consecutive_successes = 0
        
        # Decrease rate immediately
        self.current_rpm = max(
            self.min_rpm,
            int(self.current_rpm * 0.5)
        )
    
    @property
    def request_interval(self) -> float:
        return 60.0 / self.current_rpm
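
A sketch of putting the rate-limiting pieces together: the token buckets enforce fixed requests-per-minute and tokens-per-minute caps, and the adaptive limiter adds pacing based on observed errors. The client and model are the same assumptions as above, and the rate-limit detection is a naive string match on the error message rather than an inspection of the exception type:

import asyncio
from openai import AsyncOpenAI

async def process_with_limits(requests: list[BatchRequest]) -> list[BatchResult]:
    client = AsyncOpenAI()
    config = RateLimitConfig(requests_per_minute=120, tokens_per_minute=200_000)
    processor = RateLimitedBatchProcessor(client, model="gpt-4o-mini", config=config)
    adaptive = AdaptiveRateLimiter(initial_rpm=config.requests_per_minute)

    results: list[BatchResult] = []
    for request in requests:
        result = (await processor.process_batch([request], max_tokens=500))[0]
        results.append(result)

        if result.success:
            adaptive.record_success()
        elif "429" in (result.error or "") or "rate limit" in (result.error or "").lower():
            adaptive.record_rate_limit()

        # Extra pacing on top of the token buckets, driven by observed errors
        await asyncio.sleep(adaptive.request_interval)

    return results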

Request Queue with Backpressure

from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
from enum import Enum

class QueueStatus(Enum):
    ACCEPTING = "accepting"
    BACKPRESSURE = "backpressure"
    STOPPED = "stopped"

@dataclass
class QueuedRequest:
    """A request in the queue."""
    
    request: BatchRequest
    future: asyncio.Future
    priority: int = 0
    submitted_at: datetime = field(default_factory=datetime.now)

class BatchQueue:
    """Queue with backpressure for batch processing."""
    
    def __init__(
        self,
        processor: AsyncBatchProcessor,
        max_queue_size: int = 1000,
        batch_size: int = 10,
        batch_timeout: float = 1.0
    ):
        self.processor = processor
        self.max_queue_size = max_queue_size
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.status = QueueStatus.ACCEPTING
        self.current_size = 0
        self._sequence = 0  # tie-breaker so equal-priority entries never compare QueuedRequest objects
        
        self._worker_task = None
    
    async def start(self):
        """Start the queue worker."""
        
        self._worker_task = asyncio.create_task(self._worker())
    
    async def stop(self):
        """Stop the queue worker."""
        
        self.status = QueueStatus.STOPPED
        
        if self._worker_task:
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass
    
    async def submit(
        self,
        request: BatchRequest,
        priority: int = 0
    ) -> asyncio.Future:
        """Submit request to queue."""
        
        if self.status == QueueStatus.STOPPED:
            raise Exception("Queue is stopped")
        
        if self.current_size >= self.max_queue_size:
            self.status = QueueStatus.BACKPRESSURE
            raise Exception("Queue is full - backpressure")
        
        future = asyncio.Future()
        
        queued = QueuedRequest(
            request=request,
            future=future,
            priority=priority
        )
        
        # Priority queue entries are (priority, sequence, item);
        # the sequence number breaks ties between equal priorities
        await self.queue.put((priority, self._sequence, queued))
        self._sequence += 1
        self.current_size += 1
        
        return future
    
    async def _worker(self):
        """Process queue in batches."""
        
        while self.status != QueueStatus.STOPPED:
            batch = await self._collect_batch()
            
            if not batch:
                continue
            
            # Process batch
            requests = [q.request for q in batch]
            results = await self.processor.process_batch(requests)
            
            # Resolve futures
            for queued, result in zip(batch, results):
                if result.success:
                    queued.future.set_result(result)
                else:
                    queued.future.set_exception(Exception(result.error))
            
            self.current_size -= len(batch)
            
            # Check if we can accept again
            if self.current_size < self.max_queue_size * 0.8:
                self.status = QueueStatus.ACCEPTING
    
    async def _collect_batch(self) -> list[QueuedRequest]:
        """Collect a batch of requests."""
        
        batch = []
        deadline = datetime.now() + timedelta(seconds=self.batch_timeout)
        
        while len(batch) < self.batch_size:
            try:
                remaining = (deadline - datetime.now()).total_seconds()
                
                if remaining <= 0:
                    break
                
                _, _, queued = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=remaining
                )
                batch.append(queued)
            
            except asyncio.TimeoutError:
                break
        
        return batch
    
    def get_stats(self) -> dict:
        """Get queue statistics."""
        
        return {
            "status": self.status.value,
            "current_size": self.current_size,
            "max_size": self.max_queue_size,
            "utilization": self.current_size / self.max_queue_size
        }

# Priority queue processor
class PriorityBatchQueue(BatchQueue):
    """Queue with priority levels."""
    
    PRIORITY_HIGH = 0
    PRIORITY_NORMAL = 1
    PRIORITY_LOW = 2
    
    async def submit_high_priority(self, request: BatchRequest) -> asyncio.Future:
        return await self.submit(request, priority=self.PRIORITY_HIGH)
    
    async def submit_low_priority(self, request: BatchRequest) -> asyncio.Future:
        return await self.submit(request, priority=self.PRIORITY_LOW)
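
A sketch of driving the queue end to end, under the same client assumptions as above. Callers submit individual requests and await the returned future; the background worker drains the queue in batches:

import asyncio
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    processor = AsyncBatchProcessor(client, max_concurrency=10)

    queue = PriorityBatchQueue(
        processor,
        max_queue_size=500,
        batch_size=20,
        batch_timeout=0.5
    )
    await queue.start()

    try:
        request = BatchRequest(
            id="urgent-1",
            messages=[{"role": "user", "content": "Classify this support ticket"}]
        )
        future = await queue.submit_high_priority(request)
        result = await future  # resolves once the worker processes the batch containing it
        print(result.content)
        print(queue.get_stats())
    finally:
        await queue.stop()

asyncio.run(main())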

Result Aggregation

from dataclasses import dataclass
from typing import Optional
from datetime import datetime

@dataclass
class BatchStats:
    """Statistics for a batch run."""
    
    total_requests: int = 0
    successful: int = 0
    failed: int = 0
    total_latency_ms: float = 0.0
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    
    @property
    def success_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.successful / self.total_requests
    
    @property
    def avg_latency_ms(self) -> float:
        if self.successful == 0:
            return 0.0
        return self.total_latency_ms / self.successful
    
    @property
    def duration_seconds(self) -> float:
        if not self.start_time or not self.end_time:
            return 0.0
        return (self.end_time - self.start_time).total_seconds()
    
    @property
    def throughput(self) -> float:
        """Requests per second."""
        if self.duration_seconds == 0:
            return 0.0
        return self.total_requests / self.duration_seconds

class BatchResultAggregator:
    """Aggregate and analyze batch results."""
    
    def __init__(self):
        self.results: list[BatchResult] = []
        self.stats = BatchStats()
    
    def start_batch(self):
        """Mark batch start."""
        self.stats.start_time = datetime.now()
    
    def add_result(self, result: BatchResult):
        """Add a result to aggregation."""
        
        self.results.append(result)
        self.stats.total_requests += 1
        
        if result.success:
            self.stats.successful += 1
            self.stats.total_latency_ms += result.latency_ms
        else:
            self.stats.failed += 1
    
    def add_results(self, results: list[BatchResult]):
        """Add multiple results."""
        for result in results:
            self.add_result(result)
    
    def end_batch(self):
        """Mark batch end."""
        self.stats.end_time = datetime.now()
    
    def get_successful(self) -> list[BatchResult]:
        """Get successful results."""
        return [r for r in self.results if r.success]
    
    def get_failed(self) -> list[BatchResult]:
        """Get failed results."""
        return [r for r in self.results if not r.success]
    
    def get_summary(self) -> dict:
        """Get summary of batch run."""
        
        return {
            "total_requests": self.stats.total_requests,
            "successful": self.stats.successful,
            "failed": self.stats.failed,
            "success_rate": f"{self.stats.success_rate:.2%}",
            "avg_latency_ms": f"{self.stats.avg_latency_ms:.2f}",
            "duration_seconds": f"{self.stats.duration_seconds:.2f}",
            "throughput_rps": f"{self.stats.throughput:.2f}"
        }
    
    def export_results(self, format: str = "json") -> str:
        """Export results to format."""
        
        if format == "json":
            import json
            return json.dumps([
                {
                    "id": r.id,
                    "content": r.content,
                    "error": r.error,
                    "latency_ms": r.latency_ms
                }
                for r in self.results
            ], indent=2)
        
        elif format == "csv":
            lines = ["id,content,error,latency_ms"]
            for r in self.results:
                content = (r.content or "").replace('"', '""')
                error = (r.error or "").replace('"', '""')
                lines.append(f'"{r.id}","{content}","{error}",{r.latency_ms}')
            return "\n".join(lines)
        
        raise ValueError(f"Unknown format: {format}")

# Retry failed results
class BatchRetryHandler:
    """Handle retries for failed batch results."""
    
    def __init__(
        self,
        processor: AsyncBatchProcessor,
        max_retries: int = 3
    ):
        self.processor = processor
        self.max_retries = max_retries
    
    async def retry_failed(
        self,
        original_requests: list[BatchRequest],
        results: list[BatchResult],
        **kwargs
    ) -> list[BatchResult]:
        """Retry failed requests."""
        
        # Build request map
        request_map = {r.id: r for r in original_requests}
        
        # Identify failed
        failed_ids = [r.id for r in results if not r.success]
        
        if not failed_ids:
            return results
        
        # Retry
        retry_requests = [request_map[id] for id in failed_ids]
        retry_results = await self.processor.process_batch(retry_requests, **kwargs)
        
        # Merge results
        result_map = {r.id: r for r in results}
        for retry_result in retry_results:
            if retry_result.success:
                result_map[retry_result.id] = retry_result
        
        return list(result_map.values())
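
Putting aggregation, retries, and export together (a sketch, with the same processor assumptions as earlier and a hypothetical output path):

async def run_batch_with_retries(
    processor: AsyncBatchProcessor,
    requests: list[BatchRequest]
) -> dict:
    aggregator = BatchResultAggregator()
    retry_handler = BatchRetryHandler(processor, max_retries=3)

    aggregator.start_batch()
    results = await processor.process_batch(requests, max_tokens=300)

    # Retry whatever failed on the first attempt
    results = await retry_handler.retry_failed(requests, results, max_tokens=300)

    aggregator.add_results(results)
    aggregator.end_batch()

    # Persist raw results and return the summary for logging/monitoring
    with open("batch_results.json", "w") as f:
        f.write(aggregator.export_results(format="json"))

    return aggregator.get_summary()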

Production Batch Service

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import uuid

app = FastAPI()

# Initialize components at startup (e.g., an AsyncBatchProcessor built around a real async client)
processor: Optional[AsyncBatchProcessor] = None
queue: Optional[BatchQueue] = None
aggregator = BatchResultAggregator()

# In-memory job storage
jobs: dict[str, dict] = {}

class BatchSubmitRequest(BaseModel):
    requests: list[dict]  # List of {id, messages}
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.7

class JobStatus(BaseModel):
    job_id: str
    status: str
    total: int
    completed: int
    failed: int

@app.post("/v1/batch/submit")
async def submit_batch(
    request: BatchSubmitRequest,
    background_tasks: BackgroundTasks
):
    """Submit a batch job."""
    
    job_id = str(uuid.uuid4())
    
    # Convert to BatchRequest objects
    batch_requests = [
        BatchRequest(
            id=r.get("id", str(uuid.uuid4())),
            messages=r["messages"]
        )
        for r in request.requests
    ]
    
    # Initialize job
    jobs[job_id] = {
        "status": "pending",
        "total": len(batch_requests),
        "completed": 0,
        "failed": 0,
        "results": []
    }
    
    # Process in background
    background_tasks.add_task(
        process_batch_job,
        job_id,
        batch_requests,
        request.max_tokens,
        request.temperature
    )
    
    return {"job_id": job_id, "status": "submitted"}

async def process_batch_job(
    job_id: str,
    requests: list[BatchRequest],
    max_tokens: int,
    temperature: float
):
    """Process batch job in background."""
    
    jobs[job_id]["status"] = "processing"
    
    try:
        results = await processor.process_batch(
            requests,
            max_tokens=max_tokens,
            temperature=temperature
        )
        
        jobs[job_id]["results"] = results
        jobs[job_id]["completed"] = sum(1 for r in results if r.success)
        jobs[job_id]["failed"] = sum(1 for r in results if not r.success)
        jobs[job_id]["status"] = "completed"
    
    except Exception as e:
        jobs[job_id]["status"] = "failed"
        jobs[job_id]["error"] = str(e)

@app.get("/v1/batch/{job_id}/status")
async def get_job_status(job_id: str) -> JobStatus:
    """Get job status."""
    
    if job_id not in jobs:
        raise HTTPException(404, "Job not found")
    
    job = jobs[job_id]
    
    return JobStatus(
        job_id=job_id,
        status=job["status"],
        total=job["total"],
        completed=job["completed"],
        failed=job["failed"]
    )

@app.get("/v1/batch/{job_id}/results")
async def get_job_results(job_id: str):
    """Get job results."""
    
    if job_id not in jobs:
        raise HTTPException(404, "Job not found")
    
    job = jobs[job_id]
    
    if job["status"] != "completed":
        raise HTTPException(400, f"Job not completed: {job['status']}")
    
    return {
        "job_id": job_id,
        "results": [
            {
                "id": r.id,
                "content": r.content,
                "error": r.error,
                "latency_ms": r.latency_ms
            }
            for r in job["results"]
        ]
    }

@app.get("/v1/queue/stats")
async def get_queue_stats():
    """Get queue statistics."""
    
    if queue:
        return queue.get_stats()
    return {"status": "queue not initialized"}

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Batch processing transforms LLM API usage from inefficient sequential calls to optimized parallel execution. Use async concurrency with semaphores to control parallelism without overwhelming the API. Implement rate limiting with token buckets to stay within provider limits while maximizing throughput. Add request queues with backpressure to handle traffic spikes gracefully. Track batch statistics for monitoring and optimization—success rates, latencies, and throughput metrics reveal bottlenecks. For large-scale processing, use chunked execution with progress tracking and automatic retry of failed requests. The goal is predictable, efficient processing that scales with your workload while respecting API constraints.

