LLM Batch Processing: Scaling AI Workloads from Hundreds to Millions

Introduction

Processing thousands or millions of items through LLMs requires different patterns than single-request applications. Naive sequential processing is too slow, while uncontrolled parallelism hits rate limits and wastes money on retries. This guide covers production batch processing patterns: chunking strategies, parallel execution with rate limiting, progress tracking, checkpoint/resume for long jobs, cost estimation, and handling partial failures gracefully. These patterns apply whether you’re processing documents, generating embeddings, or running classification at scale.

LLM Batch Processing: From Dataset to Processed Results

Basic Batch Processing

import asyncio
from openai import AsyncOpenAI
from typing import Any, TypeVar, Callable, Awaitable
from dataclasses import dataclass

T = TypeVar('T')
R = TypeVar('R')

client = AsyncOpenAI()

@dataclass
class BatchResult:
    successful: list[tuple[int, Any]]
    failed: list[tuple[int, Exception]]
    
    @property
    def success_rate(self) -> float:
        total = len(self.successful) + len(self.failed)
        return len(self.successful) / total if total > 0 else 0

async def process_batch(
    items: list[T],
    processor: Callable[[T], Awaitable[R]],
    max_concurrent: int = 10
) -> BatchResult:
    """Process items with controlled concurrency."""
    
    semaphore = asyncio.Semaphore(max_concurrent)
    successful = []
    failed = []
    
    async def process_one(idx: int, item: T):
        async with semaphore:
            try:
                result = await processor(item)
                successful.append((idx, result))
            except Exception as e:
                failed.append((idx, e))
    
    tasks = [process_one(i, item) for i, item in enumerate(items)]
    await asyncio.gather(*tasks)
    
    return BatchResult(successful=successful, failed=failed)

# Example: Classify documents
async def classify_document(text: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Classify the document into: tech, business, science, other"},
            {"role": "user", "content": text[:2000]}
        ]
    )
    return response.choices[0].message.content

# Usage
documents = ["Document 1...", "Document 2...", ...]  # 1000 documents

result = await process_batch(documents, classify_document, max_concurrent=20)

print(f"Success rate: {result.success_rate:.1%}")
print(f"Failed: {len(result.failed)}")

Rate-Limited Batch Processor

import time
from collections import deque

class RateLimitedBatchProcessor:
    """Batch processor with token bucket rate limiting."""
    
    def __init__(
        self,
        requests_per_minute: int = 500,
        tokens_per_minute: int = 150000,
        max_concurrent: int = 50
    ):
        self.rpm = requests_per_minute
        self.tpm = tokens_per_minute
        self.max_concurrent = max_concurrent
        
        self.request_times: deque = deque()
        self.token_usage: deque = deque()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.lock = asyncio.Lock()
    
    async def _wait_for_capacity(self, estimated_tokens: int):
        """Wait until we have capacity for the request."""
        
        while True:
            async with self.lock:
                now = time.time()
                minute_ago = now - 60
                
                # Clean old entries
                while self.request_times and self.request_times[0] < minute_ago:
                    self.request_times.popleft()
                while self.token_usage and self.token_usage[0][0] < minute_ago:
                    self.token_usage.popleft()
                
                # Check capacity
                current_requests = len(self.request_times)
                current_tokens = sum(t[1] for t in self.token_usage)
                
                if current_requests < self.rpm and current_tokens + estimated_tokens < self.tpm:
                    self.request_times.append(now)
                    self.token_usage.append((now, estimated_tokens))
                    return
            
            # Wait and retry
            await asyncio.sleep(0.1)
    
    async def process(
        self,
        items: list[T],
        processor: Callable[[T], Awaitable[R]],
        estimate_tokens: Callable[[T], int] = lambda x: 500
    ) -> BatchResult:
        """Process items with rate limiting."""
        
        successful = []
        failed = []
        
        async def process_one(idx: int, item: T):
            async with self.semaphore:
                tokens = estimate_tokens(item)
                await self._wait_for_capacity(tokens)
                
                try:
                    result = await processor(item)
                    successful.append((idx, result))
                except Exception as e:
                    failed.append((idx, e))
        
        tasks = [process_one(i, item) for i, item in enumerate(items)]
        await asyncio.gather(*tasks)
        
        return BatchResult(successful=successful, failed=failed)

# Usage
processor = RateLimitedBatchProcessor(
    requests_per_minute=500,
    tokens_per_minute=150000,
    max_concurrent=50
)

def estimate_tokens(text: str) -> int:
    return len(text) // 4 + 100  # Rough estimate

result = await processor.process(
    documents,
    classify_document,
    estimate_tokens=estimate_tokens
)
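
Even with client-side limiting, occasional 429s and transient network errors will slip through, so it helps to wrap the processor with exponential backoff before handing it to the rate limiter. The with_backoff helper below is an illustrative sketch with no external dependencies; the attempt count and delays are assumptions to tune for your provider.

import random

def with_backoff(
    processor: Callable[[T], Awaitable[R]],
    max_attempts: int = 4,
    base_delay: float = 1.0
) -> Callable[[T], Awaitable[R]]:
    """Wrap an async processor with exponential backoff plus jitter."""
    async def wrapped(item: T) -> R:
        for attempt in range(max_attempts):
            try:
                return await processor(item)
            except Exception:
                if attempt == max_attempts - 1:
                    raise  # give up; the batch processor records the failure
                # Exponential backoff with jitter: ~1s, ~2s, ~4s, ...
                delay = base_delay * (2 ** attempt) * (0.5 + random.random())
                await asyncio.sleep(delay)
    return wrapped

# Usage
result = await processor.process(
    documents,
    with_backoff(classify_document),
    estimate_tokens=estimate_tokens
)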

Checkpoint and Resume

import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Any

@dataclass
class BatchCheckpoint:
    job_id: str
    total_items: int
    processed_indices: set[int]
    results: dict[int, Any]
    errors: dict[int, str]
    
    def save(self, path: Path):
        data = {
            "job_id": self.job_id,
            "total_items": self.total_items,
            "processed_indices": list(self.processed_indices),
            "results": self.results,
            "errors": self.errors
        }
        path.write_text(json.dumps(data))
    
    @classmethod
    def load(cls, path: Path) -> "BatchCheckpoint":
        data = json.loads(path.read_text())
        return cls(
            job_id=data["job_id"],
            total_items=data["total_items"],
            processed_indices=set(data["processed_indices"]),
            # JSON stores dict keys as strings; convert them back to int indices
            results={int(k): v for k, v in data["results"].items()},
            errors={int(k): v for k, v in data["errors"].items()}
        )

class ResumableBatchProcessor:
    """Batch processor with checkpoint/resume support."""
    
    def __init__(
        self,
        job_id: str,
        checkpoint_dir: Path = Path("./checkpoints"),
        checkpoint_interval: int = 100
    ):
        self.job_id = job_id
        self.checkpoint_dir = checkpoint_dir
        self.checkpoint_interval = checkpoint_interval
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        
        self.checkpoint_path = checkpoint_dir / f"{job_id}.json"
    
    async def process(
        self,
        items: list[T],
        processor: Callable[[T], Awaitable[R]],
        max_concurrent: int = 20
    ) -> BatchCheckpoint:
        """Process with automatic checkpointing."""
        
        # Load or create checkpoint
        if self.checkpoint_path.exists():
            checkpoint = BatchCheckpoint.load(self.checkpoint_path)
            print(f"Resuming from checkpoint: {len(checkpoint.processed_indices)}/{checkpoint.total_items}")
        else:
            checkpoint = BatchCheckpoint(
                job_id=self.job_id,
                total_items=len(items),
                processed_indices=set(),
                results={},
                errors={}
            )
        
        # Find unprocessed items
        pending = [
            (i, item) for i, item in enumerate(items)
            if i not in checkpoint.processed_indices
        ]
        
        semaphore = asyncio.Semaphore(max_concurrent)
        lock = asyncio.Lock()
        processed_count = 0
        
        async def process_one(idx: int, item: T):
            nonlocal processed_count
            
            async with semaphore:
                try:
                    result = await processor(item)
                    
                    async with lock:
                        checkpoint.results[idx] = result
                        checkpoint.processed_indices.add(idx)
                        processed_count += 1
                        
                        # Checkpoint periodically
                        if processed_count % self.checkpoint_interval == 0:
                            checkpoint.save(self.checkpoint_path)
                            print(f"Checkpoint saved: {len(checkpoint.processed_indices)}/{checkpoint.total_items}")
                
                except Exception as e:
                    async with lock:
                        checkpoint.errors[idx] = str(e)
                        checkpoint.processed_indices.add(idx)
        
        tasks = [process_one(i, item) for i, item in pending]
        await asyncio.gather(*tasks)
        
        # Final checkpoint
        checkpoint.save(self.checkpoint_path)
        
        return checkpoint

# Usage
processor = ResumableBatchProcessor(
    job_id="classify_docs_2024",
    checkpoint_interval=50
)

checkpoint = await processor.process(documents, classify_document)

print(f"Completed: {len(checkpoint.results)}")
print(f"Errors: {len(checkpoint.errors)}")

Cost Estimation and Tracking

from dataclasses import dataclass

@dataclass
class ModelPricing:
    input_per_1k: float
    output_per_1k: float

PRICING = {
    "gpt-4o": ModelPricing(0.0025, 0.01),
    "gpt-4o-mini": ModelPricing(0.00015, 0.0006),
    "gpt-4-turbo": ModelPricing(0.01, 0.03),
    "claude-3-5-sonnet": ModelPricing(0.003, 0.015),
}

class CostTracker:
    """Track and estimate LLM costs."""
    
    def __init__(self, model: str):
        self.model = model
        self.pricing = PRICING.get(model, ModelPricing(0.01, 0.03))
        
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.request_count = 0
    
    def estimate_batch_cost(
        self,
        items: list[str],
        avg_output_tokens: int = 100
    ) -> float:
        """Estimate cost before running."""
        
        total_input = sum(len(item) // 4 for item in items)
        total_output = len(items) * avg_output_tokens
        
        input_cost = (total_input / 1000) * self.pricing.input_per_1k
        output_cost = (total_output / 1000) * self.pricing.output_per_1k
        
        return input_cost + output_cost
    
    def record_usage(self, input_tokens: int, output_tokens: int):
        """Record actual usage."""
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.request_count += 1
    
    @property
    def total_cost(self) -> float:
        input_cost = (self.total_input_tokens / 1000) * self.pricing.input_per_1k
        output_cost = (self.total_output_tokens / 1000) * self.pricing.output_per_1k
        return input_cost + output_cost
    
    def summary(self) -> str:
        return f"""
Cost Summary:
  Requests: {self.request_count:,}
  Input tokens: {self.total_input_tokens:,}
  Output tokens: {self.total_output_tokens:,}
  Total cost: ${self.total_cost:.4f}
  Avg cost/request: ${self.total_cost / max(self.request_count, 1):.6f}
"""

# Usage with batch processor
tracker = CostTracker("gpt-4o-mini")

# Estimate before running
estimated = tracker.estimate_batch_cost(documents)
print(f"Estimated cost: ${estimated:.2f}")

# Track during processing
async def classify_with_tracking(text: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Classify: tech, business, science, other"},
            {"role": "user", "content": text[:2000]}
        ]
    )
    
    tracker.record_usage(
        response.usage.prompt_tokens,
        response.usage.completion_tokens
    )
    
    return response.choices[0].message.content

result = await process_batch(documents, classify_with_tracking)
print(tracker.summary())
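
The len(text) // 4 heuristic is only a rough approximation. For tighter pre-run estimates you can count real tokens with a tokenizer such as tiktoken; the sketch below assumes tiktoken is installed and uses the cl100k_base encoding as an approximation (newer models like gpt-4o use a different encoding, so treat the result as an estimate).

import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(encoding.encode(text))

def estimate_batch_cost_precise(
    items: list[str],
    pricing: ModelPricing,
    avg_output_tokens: int = 100
) -> float:
    """Estimate batch cost from actual token counts instead of character length."""
    total_input = sum(count_tokens(item) for item in items)
    total_output = len(items) * avg_output_tokens
    return (total_input / 1000) * pricing.input_per_1k + (total_output / 1000) * pricing.output_per_1k

# Usage
print(f"Token-based estimate: ${estimate_batch_cost_precise(documents, PRICING['gpt-4o-mini']):.2f}")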

OpenAI Batch API

import json
from openai import OpenAI

client = OpenAI()

def create_batch_file(items: list[dict], output_path: str) -> str:
    """Create JSONL file for batch API."""
    
    with open(output_path, 'w') as f:
        for i, item in enumerate(items):
            request = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": item["messages"],
                    "max_tokens": 1000
                }
            }
            f.write(json.dumps(request) + "\n")
    
    return output_path

def submit_batch(file_path: str) -> str:
    """Submit batch job to OpenAI."""
    
    # Upload file
    with open(file_path, 'rb') as f:
        file = client.files.create(file=f, purpose="batch")
    
    # Create batch
    batch = client.batches.create(
        input_file_id=file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )
    
    return batch.id

def check_batch_status(batch_id: str) -> dict:
    """Check batch job status."""
    
    batch = client.batches.retrieve(batch_id)
    
    return {
        "status": batch.status,
        "completed": batch.request_counts.completed,
        "failed": batch.request_counts.failed,
        "total": batch.request_counts.total
    }

def get_batch_results(batch_id: str) -> list[dict]:
    """Download batch results."""
    
    batch = client.batches.retrieve(batch_id)
    
    if batch.status != "completed":
        raise ValueError(f"Batch not complete: {batch.status}")
    
    # Download output file
    content = client.files.content(batch.output_file_id)
    
    results = []
    for line in content.text.split("\n"):
        if line:
            results.append(json.loads(line))
    
    return results

# Usage - 50% cheaper than real-time API
items = [
    {"messages": [{"role": "user", "content": doc}]}
    for doc in documents
]

# Create and submit
file_path = create_batch_file(items, "batch_input.jsonl")
batch_id = submit_batch(file_path)

print(f"Batch submitted: {batch_id}")

# Poll for completion
import time
while True:
    status = check_batch_status(batch_id)
    print(f"Status: {status['status']} - {status['completed']}/{status['total']}")
    
    if status["status"] in ("completed", "failed", "expired"):
        break
    
    time.sleep(60)

# Get results
results = get_batch_results(batch_id)
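
Each output line carries the custom_id written in create_batch_file, so results can be mapped back to the original document order even though the output file is not guaranteed to preserve it. The helper below is a sketch that assumes the standard Batch API output shape, with the chat completion under response.body and a non-null error field on failed requests.

def map_results_by_index(results: list[dict]) -> dict[int, str]:
    """Map batch output lines back to document indices via custom_id."""
    mapped = {}
    for line in results:
        # custom_id was written as "request-{i}" in create_batch_file
        idx = int(line["custom_id"].split("-")[1])
        if line.get("error"):
            continue  # collect failed requests separately and retry if needed
        body = line["response"]["body"]
        mapped[idx] = body["choices"][0]["message"]["content"]
    return mapped

classified = map_results_by_index(results)
print(f"Mapped {len(classified)}/{len(documents)} results")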

Conclusion

Batch processing with LLMs at scale requires careful orchestration of concurrency, rate limiting, and error handling. Start with controlled parallelism using semaphores to avoid overwhelming APIs. Implement token bucket rate limiting to stay within provider limits. Add checkpoint/resume for long-running jobs so failures don’t lose progress. Track costs throughout to avoid surprises. For large batches, consider OpenAI’s Batch API, which offers 50% cost savings for non-urgent workloads. The patterns here scale from hundreds to millions of items; the key is building robust infrastructure that handles failures gracefully and provides visibility into progress. Always estimate costs before running large batches, and start with a small sample to validate your processing logic before committing to the full dataset.

