Introduction: LLM API calls are inherently I/O-bound—waiting for network responses dominates execution time. Async programming transforms this bottleneck into an opportunity for massive parallelism. Instead of waiting sequentially for each response, async patterns enable concurrent execution of hundreds of requests while efficiently managing resources. This guide covers practical async patterns for LLM applications: concurrent request handling, semaphore-based rate limiting, streaming with async generators, task queues, and production-ready async architectures that maximize throughput while maintaining reliability.
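To make the payoff concrete, here is a minimal sketch comparing a sequential loop against a gather-based concurrent run. It assumes the official openai package's AsyncOpenAI client and an OPENAI_API_KEY in the environment; the concurrent version finishes in roughly the time of the slowest single call rather than the sum of all calls.

import asyncio
import time

from openai import AsyncOpenAI  # assumed client; any awaitable chat-completions client works

async def ask(client: AsyncOpenAI, prompt: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content

async def main():
    client = AsyncOpenAI()
    prompts = [f"Give me one fact about the number {i}" for i in range(10)]

    # Sequential: total time is roughly the sum of all call latencies
    start = time.time()
    for p in prompts:
        await ask(client, p)
    print(f"sequential: {time.time() - start:.1f}s")

    # Concurrent: total time is roughly the latency of the slowest single call
    start = time.time()
    await asyncio.gather(*(ask(client, p) for p in prompts))
    print(f"concurrent: {time.time() - start:.1f}s")

asyncio.run(main())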

Basic Async Patterns
import asyncio
import time
from dataclasses import dataclass
from typing import Any


async def simple_concurrent_calls(
    client: Any,
    prompts: list[str],
    model: str = "gpt-4o-mini"
) -> list[str]:
    """Execute multiple LLM calls concurrently."""
    async def call_llm(prompt: str) -> str:
        response = await client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

    # Execute all calls concurrently
    tasks = [call_llm(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results


async def concurrent_with_semaphore(
    client: Any,
    prompts: list[str],
    max_concurrent: int = 10,
    model: str = "gpt-4o-mini"
) -> list[str]:
    """Limit concurrent calls with a semaphore."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def call_with_limit(prompt: str) -> str:
        async with semaphore:
            response = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content

    tasks = [call_with_limit(prompt) for prompt in prompts]
    return await asyncio.gather(*tasks)


async def as_completed_pattern(
    client: Any,
    prompts: list[str],
    model: str = "gpt-4o-mini"
):
    """Process results as they complete."""
    async def call_llm(idx: int, prompt: str) -> tuple[int, str]:
        response = await client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return idx, response.choices[0].message.content

    tasks = [
        asyncio.create_task(call_llm(i, prompt))
        for i, prompt in enumerate(prompts)
    ]
    # Yield results as they complete
    for coro in asyncio.as_completed(tasks):
        idx, result = await coro
        yield idx, result


@dataclass
class AsyncResult:
    """Result with metadata."""
    index: int
    content: str
    latency_ms: float
    success: bool
    error: str | None = None


async def robust_concurrent_calls(
    client: Any,
    prompts: list[str],
    max_concurrent: int = 10,
    timeout: float = 30.0,
    model: str = "gpt-4o-mini"
) -> list[AsyncResult]:
    """Concurrent calls with error handling and timeouts."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def call_with_handling(idx: int, prompt: str) -> AsyncResult:
        start = time.time()
        async with semaphore:
            try:
                response = await asyncio.wait_for(
                    client.chat.completions.create(
                        model=model,
                        messages=[{"role": "user", "content": prompt}]
                    ),
                    timeout=timeout
                )
                latency = (time.time() - start) * 1000
                return AsyncResult(
                    index=idx,
                    content=response.choices[0].message.content,
                    latency_ms=latency,
                    success=True
                )
            except asyncio.TimeoutError:
                latency = (time.time() - start) * 1000
                return AsyncResult(
                    index=idx,
                    content="",
                    latency_ms=latency,
                    success=False,
                    error="Timeout"
                )
            except Exception as e:
                latency = (time.time() - start) * 1000
                return AsyncResult(
                    index=idx,
                    content="",
                    latency_ms=latency,
                    success=False,
                    error=str(e)
                )

    tasks = [
        call_with_handling(i, prompt)
        for i, prompt in enumerate(prompts)
    ]
    return await asyncio.gather(*tasks)
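The helpers above are client-agnostic. A small driver like the following, a sketch assuming the AsyncOpenAI client and the definitions above in the same module, exercises each pattern, including consuming the as_completed_pattern generator as results arrive out of order.

import asyncio
from openai import AsyncOpenAI  # assumed client

async def main():
    client = AsyncOpenAI()
    prompts = ["Summarize asyncio in one line.", "What is a semaphore?", "Define backpressure."]

    # Bounded concurrency: at most 2 requests in flight at a time
    answers = await concurrent_with_semaphore(client, prompts, max_concurrent=2)
    print(len(answers), "answers")

    # Handle results as they finish, out of order
    async for idx, text in as_completed_pattern(client, prompts):
        print(f"[{idx}] {text[:60]}")

    # The robust variant returns per-call metadata instead of raising
    for r in await robust_concurrent_calls(client, prompts, timeout=20.0):
        print(r.index, r.success, f"{r.latency_ms:.0f}ms", r.error)

asyncio.run(main())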
Rate-Limited Async Client
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class RateLimitConfig:
    """Rate limit configuration."""
    requests_per_minute: int = 60
    tokens_per_minute: int = 100000


class AsyncTokenBucket:
    """Async token bucket rate limiter."""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.last_refill = datetime.now()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1):
        """Acquire tokens, waiting if necessary."""
        async with self._lock:
            await self._refill()
            while self.tokens < tokens:
                # Calculate wait time for the remaining deficit
                needed = tokens - self.tokens
                wait_time = needed / self.refill_rate
                await asyncio.sleep(wait_time)
                await self._refill()
            self.tokens -= tokens

    async def _refill(self):
        """Refill tokens based on elapsed time."""
        now = datetime.now()
        elapsed = (now - self.last_refill).total_seconds()
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now


class RateLimitedAsyncClient:
    """Async LLM client with rate limiting."""

    def __init__(
        self,
        client: Any,
        config: RateLimitConfig | None = None
    ):
        self.client = client
        self.config = config or RateLimitConfig()
        # Request rate limiter
        self.request_limiter = AsyncTokenBucket(
            capacity=self.config.requests_per_minute,
            refill_rate=self.config.requests_per_minute / 60.0
        )
        # Token rate limiter
        self.token_limiter = AsyncTokenBucket(
            capacity=self.config.tokens_per_minute,
            refill_rate=self.config.tokens_per_minute / 60.0
        )

    async def chat_completion(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini",
        max_tokens: int = 1000,
        **kwargs
    ) -> Any:
        """Create chat completion with rate limiting."""
        # Estimate input tokens (rough approximation: ~4 characters per token)
        input_tokens = sum(len(m.get("content", "")) // 4 for m in messages)
        total_tokens = input_tokens + max_tokens
        # Acquire rate limit budget before making the request
        await self.request_limiter.acquire(1)
        await self.token_limiter.acquire(total_tokens)
        # Make request
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            **kwargs
        )
        return response


class AdaptiveRateLimiter:
    """Adapt request rate based on API responses."""

    def __init__(self, initial_rpm: int = 60):
        self.current_rpm = initial_rpm
        self.min_rpm = 10
        self.max_rpm = 500
        self._lock = asyncio.Lock()
        self.consecutive_successes = 0

    async def record_success(self):
        """Record a successful request; ramp up 10% after 10 in a row."""
        async with self._lock:
            self.consecutive_successes += 1
            if self.consecutive_successes >= 10:
                self.current_rpm = min(
                    self.max_rpm,
                    int(self.current_rpm * 1.1)
                )
                self.consecutive_successes = 0

    async def record_rate_limit(self):
        """Record a rate limit error; halve the rate."""
        async with self._lock:
            self.current_rpm = max(
                self.min_rpm,
                int(self.current_rpm * 0.5)
            )
            self.consecutive_successes = 0

    @property
    def request_interval(self) -> float:
        """Seconds to wait between requests at the current rate."""
        return 60.0 / self.current_rpm
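A rough usage sketch, assuming the classes above are in scope and an AsyncOpenAI client underneath: the token bucket client throttles each call, while the adaptive limiter widens or halves the pace based on outcomes. In a real integration you would back off only on HTTP 429 responses rather than on any exception.

import asyncio
from openai import AsyncOpenAI  # assumed underlying async client

async def main():
    client = RateLimitedAsyncClient(
        AsyncOpenAI(),
        config=RateLimitConfig(requests_per_minute=120, tokens_per_minute=200_000),
    )
    limiter = AdaptiveRateLimiter(initial_rpm=60)

    for prompt in ["one", "two", "three"]:
        try:
            response = await client.chat_completion(
                messages=[{"role": "user", "content": prompt}],
                max_tokens=100,
            )
            await limiter.record_success()
            print(response.choices[0].message.content)
        except Exception as e:
            # Simplification: treat any failure as a rate limit signal
            print(f"request failed: {e}")
            await limiter.record_rate_limit()
        # Space requests according to the adaptive rate
        await asyncio.sleep(limiter.request_interval)

asyncio.run(main())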
Async Streaming
import asyncio
from typing import Any, AsyncIterator


async def stream_completion(
    client: Any,
    messages: list[dict],
    model: str = "gpt-4o-mini"
) -> AsyncIterator[str]:
    """Stream completion tokens."""
    stream = await client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content


async def parallel_streams(
    client: Any,
    prompts: list[str],
    model: str = "gpt-4o-mini"
) -> AsyncIterator[tuple[int, str]]:
    """Stream multiple completions in parallel."""
    async def stream_one(idx: int, prompt: str):
        messages = [{"role": "user", "content": prompt}]
        async for token in stream_completion(client, messages, model):
            yield idx, token

    # Create all streams
    streams = [stream_one(i, p) for i, p in enumerate(prompts)]

    # Merge streams
    async def merge_streams():
        tasks = {
            asyncio.create_task(stream.__anext__()): stream
            for stream in streams
        }
        while tasks:
            done, _ = await asyncio.wait(
                tasks.keys(),
                return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                stream = tasks.pop(task)
                try:
                    result = task.result()
                    yield result
                    # Schedule next item from this stream
                    tasks[asyncio.create_task(stream.__anext__())] = stream
                except StopAsyncIteration:
                    pass

    async for item in merge_streams():
        yield item


class AsyncStreamBuffer:
    """Buffer for async stream processing."""

    def __init__(self, max_size: int = 100):
        self.queue = asyncio.Queue(maxsize=max_size)
        self.done = False

    async def put(self, item: Any):
        """Add item to buffer."""
        await self.queue.put(item)

    async def get(self) -> Any:
        """Get item from buffer."""
        return await self.queue.get()

    def mark_done(self):
        """Mark stream as complete."""
        self.done = True

    async def __aiter__(self):
        """Iterate over buffered items."""
        while not self.done or not self.queue.empty():
            try:
                item = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=0.1
                )
                yield item
            except asyncio.TimeoutError:
                if self.done:
                    break


async def buffered_stream(
    client: Any,
    messages: list[dict],
    buffer_size: int = 100,
    model: str = "gpt-4o-mini"
) -> AsyncIterator[str]:
    """Stream with buffering for backpressure handling."""
    buffer = AsyncStreamBuffer(max_size=buffer_size)

    async def producer():
        try:
            async for token in stream_completion(client, messages, model):
                await buffer.put(token)
        finally:
            buffer.mark_done()

    # Start producer
    producer_task = asyncio.create_task(producer())

    # Consume from buffer
    async for token in buffer:
        yield token
    await producer_task
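To see how these compose, here is a short consumer sketch, assuming the streaming helpers above and the AsyncOpenAI client: a single buffered stream printed token by token, then interleaved tokens from several prompts tagged with their prompt index.

import asyncio
from openai import AsyncOpenAI  # assumed client

async def main():
    client = AsyncOpenAI()

    # Single buffered stream: print tokens as they arrive
    messages = [{"role": "user", "content": "Write a haiku about event loops."}]
    async for token in buffered_stream(client, messages):
        print(token, end="", flush=True)
    print()

    # Interleaved tokens from several prompts, tagged with their index
    prompts = ["Define coroutine.", "Define awaitable."]
    async for idx, token in parallel_streams(client, prompts):
        print(f"[{idx}] {token!r}")

asyncio.run(main())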
Async Task Queue
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from datetime import datetime
import uuid


@dataclass
class Task:
    """A task in the queue."""
    id: str
    payload: Any
    priority: int = 0
    created_at: datetime = field(default_factory=datetime.now)

    def __lt__(self, other):
        return self.priority < other.priority


@dataclass
class TaskResult:
    """Result of task execution."""
    task_id: str
    result: Any = None
    error: Optional[str] = None
    latency_ms: float = 0.0


class AsyncTaskQueue:
    """Async task queue with workers."""

    def __init__(
        self,
        processor: Callable[[Any], Any],
        num_workers: int = 5,
        max_queue_size: int = 1000
    ):
        self.processor = processor
        self.num_workers = num_workers
        self.queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        self.results: dict[str, asyncio.Future] = {}
        self._workers = []
        self._running = False

    async def start(self):
        """Start worker tasks."""
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.num_workers)
        ]

    async def stop(self):
        """Stop all workers."""
        self._running = False
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)

    async def submit(
        self,
        payload: Any,
        priority: int = 0
    ) -> asyncio.Future:
        """Submit task to queue."""
        task_id = str(uuid.uuid4())
        task = Task(id=task_id, payload=payload, priority=priority)
        future = asyncio.get_running_loop().create_future()
        self.results[task_id] = future
        await self.queue.put((priority, task))
        return future

    async def _worker(self, worker_id: int):
        """Worker coroutine."""
        while self._running:
            try:
                _, task = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
                start = datetime.now()
                try:
                    if asyncio.iscoroutinefunction(self.processor):
                        result = await self.processor(task.payload)
                    else:
                        result = self.processor(task.payload)
                    latency = (datetime.now() - start).total_seconds() * 1000
                    task_result = TaskResult(
                        task_id=task.id,
                        result=result,
                        latency_ms=latency
                    )
                except Exception as e:
                    latency = (datetime.now() - start).total_seconds() * 1000
                    task_result = TaskResult(
                        task_id=task.id,
                        error=str(e),
                        latency_ms=latency
                    )
                # Resolve future and drop the reference to avoid unbounded growth
                future = self.results.pop(task.id, None)
                if future and not future.done():
                    future.set_result(task_result)
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break


class LLMTaskQueue(AsyncTaskQueue):
    """Task queue specialized for LLM calls."""

    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        num_workers: int = 10,
        max_queue_size: int = 1000
    ):
        self.client = client
        self.model = model
        super().__init__(
            processor=self._process_llm_task,
            num_workers=num_workers,
            max_queue_size=max_queue_size
        )

    async def _process_llm_task(self, payload: dict) -> str:
        """Process LLM task."""
        messages = payload.get("messages", [])
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            **payload.get("kwargs", {})
        )
        return response.choices[0].message.content

    async def submit_prompt(
        self,
        prompt: str,
        priority: int = 0,
        **kwargs
    ) -> asyncio.Future:
        """Submit a prompt for processing."""
        payload = {
            "messages": [{"role": "user", "content": prompt}],
            "kwargs": kwargs
        }
        return await self.submit(payload, priority)
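A minimal end-to-end sketch, assuming the queue classes above and an AsyncOpenAI client: start the workers, submit a few prompts, await the returned futures, and shut down cleanly. Note that with this PriorityQueue ordering, lower priority numbers are dequeued first.

import asyncio
from openai import AsyncOpenAI  # assumed client

async def main():
    queue = LLMTaskQueue(AsyncOpenAI(), num_workers=4)
    await queue.start()
    try:
        # Lower priority numbers are processed first
        futures = [
            await queue.submit_prompt(f"Summarize topic {i} in one sentence.", priority=i)
            for i in range(5)
        ]
        results = await asyncio.gather(*futures)
        for r in results:
            print(r.task_id[:8], f"{r.latency_ms:.0f}ms", r.error or r.result)
    finally:
        await queue.stop()

asyncio.run(main())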
Production Async Service
import asyncio
import uuid

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

# Initialize these at startup with an actual async client (see the startup sketch below)
rate_limited_client = None
task_queue = None


class BatchRequest(BaseModel):
    prompts: list[str]
    model: str = "gpt-4o-mini"
    max_concurrent: int = 10


class StreamRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o-mini"


class QueueRequest(BaseModel):
    prompt: str
    priority: int = 0


# In-memory job storage
jobs: dict[str, dict] = {}


@app.post("/v1/batch")
async def batch_completion(request: BatchRequest):
    """Execute batch of completions concurrently."""
    results = await robust_concurrent_calls(
        client=rate_limited_client.client,
        prompts=request.prompts,
        max_concurrent=request.max_concurrent,
        model=request.model
    )
    return {
        "results": [
            {
                "index": r.index,
                "content": r.content,
                "success": r.success,
                "error": r.error,
                "latency_ms": r.latency_ms
            }
            for r in results
        ],
        "summary": {
            "total": len(results),
            "successful": sum(1 for r in results if r.success),
            "failed": sum(1 for r in results if not r.success)
        }
    }


@app.post("/v1/stream")
async def stream_completion_endpoint(request: StreamRequest):
    """Stream completion response as server-sent events."""
    async def generate():
        messages = [{"role": "user", "content": request.prompt}]
        async for token in stream_completion(
            rate_limited_client.client,
            messages,
            request.model
        ):
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )


@app.post("/v1/queue/submit")
async def submit_to_queue(request: QueueRequest):
    """Submit task to queue."""
    future = await task_queue.submit_prompt(
        request.prompt,
        priority=request.priority
    )
    # Store job reference
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"future": future, "status": "pending"}
    return {"job_id": job_id}


@app.get("/v1/queue/{job_id}")
async def get_queue_result(job_id: str):
    """Get result from queue."""
    if job_id not in jobs:
        raise HTTPException(404, "Job not found")
    job = jobs[job_id]
    future = job["future"]
    if not future.done():
        return {"status": "pending"}
    result = future.result()
    return {
        "status": "completed",
        "result": result.result,
        "error": result.error,
        "latency_ms": result.latency_ms
    }


@app.get("/v1/stats")
async def get_stats():
    """Get async processing stats."""
    return {
        "queue_size": task_queue.queue.qsize() if task_queue else 0,
        "pending_jobs": sum(1 for j in jobs.values() if not j["future"].done()),
        "completed_jobs": sum(1 for j in jobs.values() if j["future"].done())
    }


@app.get("/health")
async def health():
    return {"status": "healthy"}
References
- Python AsyncIO: https://docs.python.org/3/library/asyncio.html
- OpenAI Async Client: https://github.com/openai/openai-python
- HTTPX Async: https://www.python-httpx.org/async/
- FastAPI Async: https://fastapi.tiangolo.com/async/
Conclusion
Async patterns are essential for high-throughput LLM applications. Use asyncio.gather for simple concurrent execution when you need all results together. Semaphores control concurrency to respect rate limits and prevent overwhelming the API. asyncio.as_completed enables processing results as they arrive—useful for streaming progress to users. Token bucket rate limiters provide smooth, adaptive rate control that maximizes throughput while avoiding 429 errors. Async streaming enables real-time token delivery to users while processing continues. Task queues with worker pools handle variable load and provide backpressure when the system is overwhelmed. Design your async architecture around your specific latency and throughput requirements—the patterns here provide building blocks for production-ready systems.