Async LLM Patterns: Maximizing Throughput with Concurrent Processing

Introduction

LLM API calls are slow—often 1-10 seconds per request. Sequential processing kills throughput. Async patterns let you process multiple requests concurrently, dramatically improving performance for batch operations, parallel tool calls, and high-traffic applications. This guide covers async LLM patterns in Python: using asyncio with OpenAI and Anthropic clients, managing concurrency with semaphores, implementing retry logic, and building production-ready async pipelines that maximize throughput while respecting rate limits.

Async LLM: Concurrent Processing with Rate Control

Basic Async with OpenAI

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def get_completion(prompt: str) -> str:
    """Get a single completion asynchronously."""
    
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.choices[0].message.content

async def process_prompts(prompts: list[str]) -> list[str]:
    """Process multiple prompts concurrently."""
    
    tasks = [get_completion(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    
    return results

# Usage
prompts = [
    "What is Python?",
    "What is JavaScript?",
    "What is Rust?",
    "What is Go?",
    "What is TypeScript?"
]

# Sequential: ~10 seconds (2s per request)
# Concurrent: ~2 seconds (all at once)
results = asyncio.run(process_prompts(prompts))

for prompt, result in zip(prompts, results):
    print(f"Q: {prompt[:30]}...")
    print(f"A: {result[:100]}...\n")

Controlled Concurrency with Semaphores

import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')
R = TypeVar('R')

class AsyncLLMProcessor:
    """Process LLM requests with controlled concurrency."""
    
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = AsyncOpenAI()
    
    async def process_one(
        self,
        item: T,
        processor: Callable[[T], Awaitable[R]]
    ) -> R:
        """Process a single item with semaphore control."""
        
        # Acquire the semaphore once per item. Processors should call
        # self.complete(), which does not re-acquire it: re-acquiring a
        # non-reentrant asyncio.Semaphore here would deadlock the batch.
        async with self.semaphore:
            return await processor(item)
    
    async def process_batch(
        self,
        items: list[T],
        processor: Callable[[T], Awaitable[R]]
    ) -> list[R]:
        """Process a batch with controlled concurrency."""
        
        tasks = [
            self.process_one(item, processor)
            for item in items
        ]
        
        return await asyncio.gather(*tasks)
    
    async def complete(self, prompt: str, model: str = "gpt-4o-mini") -> str:
        """Get a completion. Concurrency is gated by process_one/process_batch."""
        
        response = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

# Usage
processor = AsyncLLMProcessor(max_concurrent=20)

async def classify_text(text: str) -> str:
    return await processor.complete(
        f"Classify this text as positive, negative, or neutral:\n{text}"
    )

texts = ["Great product!", "Terrible service", "It's okay", ...] * 100  # 300 texts

# Process 300 texts with max 20 concurrent requests
results = asyncio.run(processor.process_batch(texts, classify_text))

Async Streaming

from typing import AsyncIterator

async def stream_completion(prompt: str) -> AsyncIterator[str]:
    """Stream completion tokens."""
    
    client = AsyncOpenAI()
    
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

async def stream_to_console(prompt: str):
    """Stream response to console."""
    
    print("Response: ", end="", flush=True)
    
    async for token in stream_completion(prompt):
        print(token, end="", flush=True)
    
    print()  # Newline at end

# Multiple concurrent streams
async def parallel_streams(prompts: list[str]):
    """Run multiple streams in parallel."""
    
    async def stream_one(idx: int, prompt: str):
        tokens = []
        async for token in stream_completion(prompt):
            tokens.append(token)
        return idx, "".join(tokens)
    
    tasks = [stream_one(i, p) for i, p in enumerate(prompts)]
    results = await asyncio.gather(*tasks)
    
    # Sort by original index
    return [r[1] for r in sorted(results, key=lambda x: x[0])]

# Usage
asyncio.run(stream_to_console("Write a haiku about programming"))
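parallel_streams can be driven the same way; a short usage sketch that collects several streamed responses concurrently:

# Collect three streamed responses concurrently (results keep the input order)
haikus = asyncio.run(parallel_streams([
    "Write a haiku about Python",
    "Write a haiku about Rust",
    "Write a haiku about Go"
]))

for haiku in haikus:
    print(haiku, "\n")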

Async Retry with Backoff

import asyncio
import random
from functools import wraps
from openai import RateLimitError, APIError

def async_retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
):
    """Decorator for async retry with exponential backoff."""
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                
                except RateLimitError as e:
                    last_exception = e
                    
                    if attempt == max_retries:
                        raise
                    
                    # Calculate delay with jitter
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )
                    jitter = random.uniform(0, delay * 0.1)
                    
                    print(f"Rate limited. Retrying in {delay + jitter:.1f}s...")
                    await asyncio.sleep(delay + jitter)
                
                except APIError as e:
                    last_exception = e
                    
                    if attempt == max_retries:
                        raise
                    
                    delay = base_delay * (attempt + 1)
                    print(f"API error: {e}. Retrying in {delay:.1f}s...")
                    await asyncio.sleep(delay)
            
            raise last_exception
        
        return wrapper
    return decorator

class ResilientAsyncClient:
    """Async client with built-in retry logic."""
    
    def __init__(self, max_concurrent: int = 10, max_retries: int = 3):
        self.client = AsyncOpenAI()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.max_retries = max_retries
    
    @async_retry(max_retries=3, base_delay=1.0)
    async def _call_api(self, messages: list[dict], model: str) -> str:
        """Make API call with retry."""
        
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages
        )
        return response.choices[0].message.content
    
    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4o-mini",
        system: str | None = None
    ) -> str:
        """Get completion with retry and concurrency control."""
        
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        
        async with self.semaphore:
            return await self._call_api(messages, model)

# Usage
client = ResilientAsyncClient(max_concurrent=20, max_retries=3)

async def process_with_retries(prompts: list[str]) -> list[str]:
    tasks = [client.complete(p) for p in prompts]
    return await asyncio.gather(*tasks, return_exceptions=True)

results = asyncio.run(process_with_retries(prompts))
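Because return_exceptions=True returns failures in-line as exception objects, here is a short sketch of splitting them out, using the results list from above:

# Separate successful completions from errors surfaced by gather
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]

print(f"{len(successes)} succeeded, {len(failures)} failed")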

Async Pipeline Pattern

import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class PipelineItem:
    id: str
    input: str
    stage1_result: Optional[str] = None
    stage2_result: Optional[str] = None
    final_result: Optional[str] = None
    error: Optional[str] = None

class AsyncPipeline:
    """Multi-stage async processing pipeline."""
    
    def __init__(self, max_concurrent_per_stage: int = 10):
        self.client = AsyncOpenAI()
        self.stage_semaphores = {
            "stage1": asyncio.Semaphore(max_concurrent_per_stage),
            "stage2": asyncio.Semaphore(max_concurrent_per_stage),
            "stage3": asyncio.Semaphore(max_concurrent_per_stage)
        }
    
    async def stage1_extract(self, item: PipelineItem) -> PipelineItem:
        """Stage 1: Extract key information."""
        
        async with self.stage_semaphores["stage1"]:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{
                        "role": "user",
                        "content": f"Extract key facts from:\n{item.input}"
                    }]
                )
                item.stage1_result = response.choices[0].message.content
            except Exception as e:
                item.error = f"Stage 1 error: {e}"
        
        return item
    
    async def stage2_analyze(self, item: PipelineItem) -> PipelineItem:
        """Stage 2: Analyze extracted information."""
        
        if item.error:
            return item
        
        async with self.stage_semaphores["stage2"]:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{
                        "role": "user",
                        "content": f"Analyze these facts:\n{item.stage1_result}"
                    }]
                )
                item.stage2_result = response.choices[0].message.content
            except Exception as e:
                item.error = f"Stage 2 error: {e}"
        
        return item
    
    async def stage3_summarize(self, item: PipelineItem) -> PipelineItem:
        """Stage 3: Generate final summary."""
        
        if item.error:
            return item
        
        async with self.stage_semaphores["stage3"]:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o",  # Use better model for final output
                    messages=[{
                        "role": "user",
                        "content": f"Summarize this analysis:\n{item.stage2_result}"
                    }]
                )
                item.final_result = response.choices[0].message.content
            except Exception as e:
                item.error = f"Stage 3 error: {e}"
        
        return item
    
    async def process_item(self, item: PipelineItem) -> PipelineItem:
        """Process single item through all stages."""
        
        item = await self.stage1_extract(item)
        item = await self.stage2_analyze(item)
        item = await self.stage3_summarize(item)
        
        return item
    
    async def process_batch(self, items: list[PipelineItem]) -> list[PipelineItem]:
        """Process batch through pipeline."""
        
        tasks = [self.process_item(item) for item in items]
        return await asyncio.gather(*tasks)

# Usage
pipeline = AsyncPipeline(max_concurrent_per_stage=10)

items = [
    PipelineItem(id=f"doc_{i}", input=f"Document {i} content...")
    for i in range(50)
]

results = asyncio.run(pipeline.process_batch(items))

successful = [r for r in results if not r.error]
failed = [r for r in results if r.error]

print(f"Processed: {len(successful)} successful, {len(failed)} failed")

Production Async Service

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
import uuid

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o-mini"

class BatchRequest(BaseModel):
    prompts: list[str]
    model: str = "gpt-4o-mini"

class BatchStatus(BaseModel):
    job_id: str
    status: str
    completed: int
    total: int
    results: list[str] | None = None

# In-memory job storage (use Redis in production)
jobs: dict[str, BatchStatus] = {}

client = ResilientAsyncClient(max_concurrent=50)

@app.post("/complete")
async def complete(request: CompletionRequest):
    """Single completion endpoint."""
    
    result = await client.complete(request.prompt, request.model)
    return {"result": result}

@app.post("/batch")
async def start_batch(request: BatchRequest, background_tasks: BackgroundTasks):
    """Start batch processing job."""
    
    job_id = str(uuid.uuid4())
    
    jobs[job_id] = BatchStatus(
        job_id=job_id,
        status="processing",
        completed=0,
        total=len(request.prompts)
    )
    
    background_tasks.add_task(
        process_batch_job,
        job_id,
        request.prompts,
        request.model
    )
    
    return {"job_id": job_id}

async def process_batch_job(job_id: str, prompts: list[str], model: str):
    """Background batch processing."""
    
    async def process_one(prompt: str) -> str:
        result = await client.complete(prompt, model)
        # Tasks finish out of order, so count completions rather than using an index
        jobs[job_id].completed += 1
        return result
    
    tasks = [process_one(p) for p in prompts]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Convert exceptions to error strings
    results = [
        str(r) if isinstance(r, Exception) else r
        for r in results
    ]
    
    jobs[job_id].status = "completed"
    jobs[job_id].results = results

@app.get("/batch/{job_id}")
async def get_batch_status(job_id: str):
    """Get batch job status."""
    
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    
    return jobs[job_id]

# WebSocket streaming
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/stream")
async def websocket_stream(websocket: WebSocket):
    """Stream completions over WebSocket."""
    
    await websocket.accept()
    
    try:
        while True:
            data = await websocket.receive_json()
            prompt = data.get("prompt", "")
            
            async for token in stream_completion(prompt):
                await websocket.send_json({"token": token})
            
            await websocket.send_json({"done": True})
    except WebSocketDisconnect:
        # Client disconnected; end the session cleanly
        pass

Conclusion

Async patterns are essential for production LLM applications. The difference between sequential and concurrent processing can be 10-50x in throughput. Use asyncio with the async OpenAI client for concurrent requests. Control concurrency with semaphores to respect rate limits and avoid overwhelming APIs. Implement retry logic with exponential backoff for resilience. Build pipelines for multi-stage processing where each stage can run concurrently. For web applications, use FastAPI’s async support and background tasks for long-running operations. Stream responses over WebSockets for real-time user feedback. The patterns here scale from simple scripts to high-traffic production services. Start with basic async/await, add concurrency control, then layer in retry logic and monitoring as your needs grow.

