Introduction

LLM API calls are slow, often 1-10 seconds per request. Sequential processing kills throughput. Async patterns let you process multiple requests concurrently, dramatically improving performance for batch operations, parallel tool calls, and high-traffic applications. This guide covers async LLM patterns in Python: using asyncio with the OpenAI and Anthropic clients, managing concurrency with semaphores, implementing retry logic, and building production-ready async pipelines that maximize throughput while respecting rate limits.

Basic Async with OpenAI
```python
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def get_completion(prompt: str) -> str:
    """Get a single completion asynchronously."""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_prompts(prompts: list[str]) -> list[str]:
    """Process multiple prompts concurrently."""
    tasks = [get_completion(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results

# Usage
prompts = [
    "What is Python?",
    "What is JavaScript?",
    "What is Rust?",
    "What is Go?",
    "What is TypeScript?",
]

# Sequential: ~10 seconds (2s per request)
# Concurrent: ~2 seconds (all requests in flight at once)
results = asyncio.run(process_prompts(prompts))

for prompt, result in zip(prompts, results):
    print(f"Q: {prompt[:30]}...")
    print(f"A: {result[:100]}...\n")
```

Controlled Concurrency with Semaphores
```python
import asyncio
from typing import Awaitable, Callable, TypeVar

from openai import AsyncOpenAI

T = TypeVar("T")
R = TypeVar("R")

class AsyncLLMProcessor:
    """Process LLM requests with controlled concurrency."""

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = AsyncOpenAI()

    async def process_one(
        self,
        item: T,
        processor: Callable[[T], Awaitable[R]]
    ) -> R:
        """Process a single item. The concurrency limit is enforced inside
        complete(); acquiring the semaphore here as well would make each task
        hold two permits, halving the effective concurrency (asyncio.Semaphore
        is not reentrant)."""
        return await processor(item)

    async def process_batch(
        self,
        items: list[T],
        processor: Callable[[T], Awaitable[R]]
    ) -> list[R]:
        """Process a batch with controlled concurrency."""
        tasks = [
            self.process_one(item, processor)
            for item in items
        ]
        return await asyncio.gather(*tasks)

    async def complete(self, prompt: str, model: str = "gpt-4o-mini") -> str:
        """Get a completion with concurrency control."""
        async with self.semaphore:
            response = await self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content

# Usage
processor = AsyncLLMProcessor(max_concurrent=20)

async def classify_text(text: str) -> str:
    return await processor.complete(
        f"Classify this text as positive, negative, or neutral:\n{text}"
    )

texts = ["Great product!", "Terrible service", "It's okay"] * 100  # 300 texts

# Process 300 texts with at most 20 concurrent requests
results = asyncio.run(processor.process_batch(texts, classify_text))
```

Async Streaming
```python
import asyncio
from typing import AsyncIterator

from openai import AsyncOpenAI

async def stream_completion(prompt: str) -> AsyncIterator[str]:
    """Stream completion tokens."""
    client = AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

async def stream_to_console(prompt: str):
    """Stream response to console."""
    print("Response: ", end="", flush=True)
    async for token in stream_completion(prompt):
        print(token, end="", flush=True)
    print()  # Newline at end

# Multiple concurrent streams
async def parallel_streams(prompts: list[str]) -> list[str]:
    """Run multiple streams in parallel."""
    async def stream_one(idx: int, prompt: str):
        tokens = []
        async for token in stream_completion(prompt):
            tokens.append(token)
        return idx, "".join(tokens)

    tasks = [stream_one(i, p) for i, p in enumerate(prompts)]
    results = await asyncio.gather(*tasks)
    # Sort by original index
    return [r[1] for r in sorted(results, key=lambda x: x[0])]

# Usage
asyncio.run(stream_to_console("Write a haiku about programming"))
```

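`parallel_streams` is defined above but the usage line only exercises `stream_to_console`; driving it looks like this (the second prompt is just an illustration):

```python
drafts = asyncio.run(parallel_streams([
    "Write a haiku about programming",
    "Write a haiku about debugging",
]))
for draft in drafts:
    print(draft)
    print("---")
```
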
Async Retry with Backoff
```python
import asyncio
import random
from functools import wraps

from openai import APIError, AsyncOpenAI, RateLimitError

def async_retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
):
    """Decorator for async retry with exponential backoff."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except RateLimitError as e:
                    last_exception = e
                    if attempt == max_retries:
                        raise
                    # Exponential backoff with jitter
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )
                    jitter = random.uniform(0, delay * 0.1)
                    print(f"Rate limited. Retrying in {delay + jitter:.1f}s...")
                    await asyncio.sleep(delay + jitter)
                except APIError as e:
                    last_exception = e
                    if attempt == max_retries:
                        raise
                    # Linear backoff for other API errors
                    delay = base_delay * (attempt + 1)
                    print(f"API error: {e}. Retrying in {delay:.1f}s...")
                    await asyncio.sleep(delay)
            raise last_exception
        return wrapper
    return decorator

class ResilientAsyncClient:
    """Async client with built-in retry logic."""

    def __init__(self, max_concurrent: int = 10, max_retries: int = 3):
        self.client = AsyncOpenAI()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.max_retries = max_retries  # informational; the decorator below sets the actual retry count

    @async_retry(max_retries=3, base_delay=1.0)
    async def _call_api(self, messages: list[dict], model: str) -> str:
        """Make the API call with retry."""
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages
        )
        return response.choices[0].message.content

    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4o-mini",
        system: str | None = None
    ) -> str:
        """Get a completion with retry and concurrency control."""
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        async with self.semaphore:
            return await self._call_api(messages, model)

# Usage
client = ResilientAsyncClient(max_concurrent=20, max_retries=3)

async def process_with_retries(prompts: list[str]) -> list[str]:
    tasks = [client.complete(p) for p in prompts]
    return await asyncio.gather(*tasks, return_exceptions=True)

results = asyncio.run(process_with_retries(prompts))
```

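Because `return_exceptions=True` hands back exception objects in place of results, it helps to separate the two after the call; a small sketch over the `results` list returned above:

```python
# Split successful completions from requests that still failed after retries
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
print(f"{len(successes)} succeeded, {len(failures)} failed after retries")
for err in failures[:3]:
    print(f"  example failure: {type(err).__name__}: {err}")
```
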
Async Pipeline Pattern
```python
import asyncio
from dataclasses import dataclass
from typing import Optional

from openai import AsyncOpenAI

@dataclass
class PipelineItem:
    id: str
    input: str
    stage1_result: Optional[str] = None
    stage2_result: Optional[str] = None
    final_result: Optional[str] = None
    error: Optional[str] = None

class AsyncPipeline:
    """Multi-stage async processing pipeline."""

    def __init__(self, max_concurrent_per_stage: int = 10):
        self.client = AsyncOpenAI()
        self.stage_semaphores = {
            "stage1": asyncio.Semaphore(max_concurrent_per_stage),
            "stage2": asyncio.Semaphore(max_concurrent_per_stage),
            "stage3": asyncio.Semaphore(max_concurrent_per_stage)
        }

    async def stage1_extract(self, item: PipelineItem) -> PipelineItem:
        """Stage 1: Extract key information."""
        async with self.stage_semaphores["stage1"]:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{
                        "role": "user",
                        "content": f"Extract key facts from:\n{item.input}"
                    }]
                )
                item.stage1_result = response.choices[0].message.content
            except Exception as e:
                item.error = f"Stage 1 error: {e}"
        return item

    async def stage2_analyze(self, item: PipelineItem) -> PipelineItem:
        """Stage 2: Analyze extracted information."""
        if item.error:
            return item
        async with self.stage_semaphores["stage2"]:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{
                        "role": "user",
                        "content": f"Analyze these facts:\n{item.stage1_result}"
                    }]
                )
                item.stage2_result = response.choices[0].message.content
            except Exception as e:
                item.error = f"Stage 2 error: {e}"
        return item

    async def stage3_summarize(self, item: PipelineItem) -> PipelineItem:
        """Stage 3: Generate final summary."""
        if item.error:
            return item
        async with self.stage_semaphores["stage3"]:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o",  # Use a stronger model for the final output
                    messages=[{
                        "role": "user",
                        "content": f"Summarize this analysis:\n{item.stage2_result}"
                    }]
                )
                item.final_result = response.choices[0].message.content
            except Exception as e:
                item.error = f"Stage 3 error: {e}"
        return item

    async def process_item(self, item: PipelineItem) -> PipelineItem:
        """Process a single item through all stages."""
        item = await self.stage1_extract(item)
        item = await self.stage2_analyze(item)
        item = await self.stage3_summarize(item)
        return item

    async def process_batch(self, items: list[PipelineItem]) -> list[PipelineItem]:
        """Process a batch through the pipeline."""
        tasks = [self.process_item(item) for item in items]
        return await asyncio.gather(*tasks)

# Usage
pipeline = AsyncPipeline(max_concurrent_per_stage=10)

items = [
    PipelineItem(id=f"doc_{i}", input=f"Document {i} content...")
    for i in range(50)
]

results = asyncio.run(pipeline.process_batch(items))

successful = [r for r in results if not r.error]
failed = [r for r in results if r.error]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
```

Production Async Service
```python
import asyncio
import uuid

from fastapi import BackgroundTasks, FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o-mini"

class BatchRequest(BaseModel):
    prompts: list[str]
    model: str = "gpt-4o-mini"

class BatchStatus(BaseModel):
    job_id: str
    status: str
    completed: int
    total: int
    results: list[str] | None = None

# In-memory job storage (use Redis in production)
jobs: dict[str, BatchStatus] = {}

# ResilientAsyncClient and stream_completion come from the earlier sections
client = ResilientAsyncClient(max_concurrent=50)

@app.post("/complete")
async def complete(request: CompletionRequest):
    """Single completion endpoint."""
    result = await client.complete(request.prompt, request.model)
    return {"result": result}

@app.post("/batch")
async def start_batch(request: BatchRequest, background_tasks: BackgroundTasks):
    """Start a batch processing job."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = BatchStatus(
        job_id=job_id,
        status="processing",
        completed=0,
        total=len(request.prompts)
    )
    background_tasks.add_task(
        process_batch_job,
        job_id,
        request.prompts,
        request.model
    )
    return {"job_id": job_id}

async def process_batch_job(job_id: str, prompts: list[str], model: str):
    """Background batch processing."""
    async def process_one(prompt: str) -> str:
        result = await client.complete(prompt, model)
        # Tasks finish out of order, so increment rather than assign an index
        jobs[job_id].completed += 1
        return result

    tasks = [process_one(p) for p in prompts]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Convert exceptions to error strings
    results = [
        str(r) if isinstance(r, Exception) else r
        for r in results
    ]
    jobs[job_id].status = "completed"
    jobs[job_id].results = results

@app.get("/batch/{job_id}")
async def get_batch_status(job_id: str):
    """Get batch job status."""
    if job_id not in jobs:
        return {"error": "Job not found"}
    return jobs[job_id]

# WebSocket streaming
@app.websocket("/stream")
async def websocket_stream(websocket: WebSocket):
    """Stream completions over WebSocket."""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            prompt = data.get("prompt", "")
            async for token in stream_completion(prompt):
                await websocket.send_json({"token": token})
            await websocket.send_json({"done": True})
    except WebSocketDisconnect:
        pass  # Client closed the connection
```

References
- OpenAI Async Client: https://github.com/openai/openai-python
- asyncio Documentation: https://docs.python.org/3/library/asyncio.html
- FastAPI: https://fastapi.tiangolo.com/
- aiohttp: https://docs.aiohttp.org/
Conclusion
Async patterns are essential for production LLM applications. The difference between sequential and concurrent processing can be 10-50x in throughput. Use asyncio with the async OpenAI client for concurrent requests. Control concurrency with semaphores to respect rate limits and avoid overwhelming APIs. Implement retry logic with exponential backoff for resilience. Build pipelines for multi-stage processing where each stage can run concurrently. For web applications, use FastAPI’s async support and background tasks for long-running operations. Stream responses over WebSockets for real-time user feedback. The patterns here scale from simple scripts to high-traffic production services. Start with basic async/await, add concurrency control, then layer in retry logic and monitoring as your needs grow.