Introduction
LLM APIs are inherently slow—even fast models take hundreds of milliseconds per request. When you need to process multiple prompts, make parallel API calls, or handle high-throughput workloads, synchronous code becomes a bottleneck. Async patterns let you overlap I/O wait times, dramatically improving throughput with only modest added complexity. This guide covers practical async patterns for LLM applications: concurrent request handling, batching strategies, streaming with async generators, retry logic with exponential backoff, and production-ready patterns for building responsive AI applications. Whether you’re building a chatbot handling multiple users, a batch processing pipeline, or a real-time agent, these patterns will help you maximize throughput while keeping your code maintainable.
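To make the payoff concrete, here is a minimal, self-contained sketch (no real API calls; asyncio.sleep stands in for network latency) showing how asyncio.gather overlaps the wait time of several slow requests:

import asyncio
import time

async def fake_llm_call(prompt: str) -> str:
    # Stand-in for an LLM request that spends ~0.5 s waiting on the network
    await asyncio.sleep(0.5)
    return f"response to: {prompt}"

async def main():
    prompts = [f"prompt {i}" for i in range(10)]

    start = time.perf_counter()
    for p in prompts:  # sequential: roughly 5 s total
        await fake_llm_call(p)
    print(f"sequential: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    # concurrent: roughly 0.5 s total, since the waits overlap
    await asyncio.gather(*(fake_llm_call(p) for p in prompts))
    print(f"concurrent: {time.perf_counter() - start:.2f}s")

asyncio.run(main())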

Basic Async LLM Client
import asyncio
import json
import time
import aiohttp
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator


@dataclass
class LLMResponse:
    """LLM response."""
    content: str
    model: str
    usage: dict = field(default_factory=dict)
    latency_ms: float = 0


@dataclass
class LLMConfig:
    """LLM configuration."""
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 1000
    timeout: float = 60.0


class AsyncLLMClient(ABC):
    """Abstract async LLM client."""

    @abstractmethod
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        pass

    @abstractmethod
    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        pass


class AsyncOpenAIClient(AsyncLLMClient):
    """Async OpenAI client."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create session."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )
        return self._session

    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        config = config or LLMConfig()
        session = await self._get_session()
        start = time.time()
        async with session.post(
            f"{self.base_url}/chat/completions",
            json={
                "model": config.model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": config.temperature,
                "max_tokens": config.max_tokens
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            # Surface HTTP errors as aiohttp.ClientError so retry logic can catch them
            response.raise_for_status()
            data = await response.json()
            latency = (time.time() - start) * 1000
            return LLMResponse(
                content=data["choices"][0]["message"]["content"],
                model=config.model,
                usage=data.get("usage", {}),
                latency_ms=latency
            )

    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        config = config or LLMConfig()
        session = await self._get_session()
        async with session.post(
            f"{self.base_url}/chat/completions",
            json={
                "model": config.model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": config.temperature,
                "max_tokens": config.max_tokens,
                "stream": True
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            response.raise_for_status()
            async for line in response.content:
                line = line.decode().strip()
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    chunk = json.loads(data)
                    if chunk["choices"][0].get("delta", {}).get("content"):
                        yield chunk["choices"][0]["delta"]["content"]

    async def close(self):
        """Close session."""
        if self._session and not self._session.closed:
            await self._session.close()


class AsyncAnthropicClient(AsyncLLMClient):
    """Async Anthropic client."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1"
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create session."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "x-api-key": self.api_key,
                    "Content-Type": "application/json",
                    "anthropic-version": "2023-06-01"
                }
            )
        return self._session

    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        config = config or LLMConfig()
        session = await self._get_session()
        # Map OpenAI-style model names to a Claude model
        model = config.model
        if model.startswith("gpt"):
            model = "claude-3-sonnet-20240229"
        start = time.time()
        async with session.post(
            f"{self.base_url}/messages",
            json={
                "model": model,
                "max_tokens": config.max_tokens,
                "messages": [{"role": "user", "content": prompt}]
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            response.raise_for_status()
            data = await response.json()
            latency = (time.time() - start) * 1000
            return LLMResponse(
                content=data["content"][0]["text"],
                model=model,
                usage=data.get("usage", {}),
                latency_ms=latency
            )

    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        config = config or LLMConfig()
        session = await self._get_session()
        model = config.model
        if model.startswith("gpt"):
            model = "claude-3-sonnet-20240229"
        async with session.post(
            f"{self.base_url}/messages",
            json={
                "model": model,
                "max_tokens": config.max_tokens,
                "messages": [{"role": "user", "content": prompt}],
                "stream": True
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            response.raise_for_status()
            async for line in response.content:
                line = line.decode().strip()
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    if data["type"] == "content_block_delta":
                        yield data["delta"]["text"]

    async def close(self):
        """Close session."""
        if self._session and not self._session.closed:
            await self._session.close()
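A minimal usage sketch for the clients above. The API key handling is an assumption here (read from an OPENAI_API_KEY environment variable); adapt it to your own configuration.

import asyncio
import os

async def main():
    client = AsyncOpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
    try:
        # One-shot completion
        response = await client.complete("Explain asyncio in one sentence.")
        print(response.content, f"({response.latency_ms:.0f} ms)")

        # Streaming: print chunks as they arrive
        async for chunk in client.stream("Write a haiku about event loops."):
            print(chunk, end="", flush=True)
        print()
    finally:
        await client.close()

asyncio.run(main())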
Concurrent Request Patterns
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ConcurrentResult:
    """Result from concurrent execution."""
    results: list[LLMResponse]
    total_time_ms: float
    avg_latency_ms: float
    success_count: int
    error_count: int


class ConcurrentExecutor:
    """Execute multiple LLM requests concurrently."""

    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def _execute_one(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> tuple[Optional[LLMResponse], Optional[Exception]]:
        """Execute single request with semaphore."""
        async with self.semaphore:
            try:
                result = await self.client.complete(prompt, config)
                return result, None
            except Exception as e:
                return None, e

    async def execute_all(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> ConcurrentResult:
        """Execute all prompts concurrently."""
        start = time.time()
        tasks = [
            self._execute_one(prompt, config)
            for prompt in prompts
        ]
        results = await asyncio.gather(*tasks)
        total_time = (time.time() - start) * 1000
        successful = [r for r, e in results if r is not None]
        errors = [e for r, e in results if e is not None]
        avg_latency = (
            sum(r.latency_ms for r in successful) / len(successful)
            if successful else 0
        )
        return ConcurrentResult(
            results=successful,
            total_time_ms=total_time,
            avg_latency_ms=avg_latency,
            success_count=len(successful),
            error_count=len(errors)
        )

    async def execute_with_progress(
        self,
        prompts: list[str],
        config: LLMConfig = None,
        progress_callback: callable = None
    ) -> ConcurrentResult:
        """Execute with progress tracking."""
        start = time.time()
        completed = 0

        async def execute_with_tracking(prompt: str, index: int):
            nonlocal completed
            result, error = await self._execute_one(prompt, config)
            completed += 1
            if progress_callback:
                await progress_callback(completed, len(prompts), result, error)
            return result, error

        tasks = [
            execute_with_tracking(prompt, i)
            for i, prompt in enumerate(prompts)
        ]
        task_results = await asyncio.gather(*tasks)
        total_time = (time.time() - start) * 1000
        successful = [r for r, e in task_results if r is not None]
        avg_latency = (
            sum(r.latency_ms for r in successful) / len(successful)
            if successful else 0
        )
        return ConcurrentResult(
            results=successful,
            total_time_ms=total_time,
            avg_latency_ms=avg_latency,
            success_count=len(successful),
            error_count=len(task_results) - len(successful)
        )


class MapReduceExecutor:
    """Map-reduce pattern for LLM processing."""

    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.executor = ConcurrentExecutor(client, max_concurrency)

    async def map_reduce(
        self,
        items: list[str],
        map_prompt_template: str,
        reduce_prompt_template: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Map items through LLM, then reduce results."""
        # Map phase
        map_prompts = [
            map_prompt_template.format(item=item)
            for item in items
        ]
        map_results = await self.executor.execute_all(map_prompts, config)
        # Reduce phase
        mapped_outputs = "\n\n".join(
            f"Item {i+1}:\n{r.content}"
            for i, r in enumerate(map_results.results)
        )
        reduce_prompt = reduce_prompt_template.format(
            mapped_outputs=mapped_outputs
        )
        return await self.client.complete(reduce_prompt, config)

    async def parallel_chains(
        self,
        input_data: str,
        chain_prompts: list[list[str]],
        config: LLMConfig = None
    ) -> list[LLMResponse]:
        """Execute multiple prompt chains in parallel."""
        async def execute_chain(prompts: list[str]) -> LLMResponse:
            """Execute a single chain sequentially."""
            context = input_data
            result = None
            for prompt_template in prompts:
                prompt = prompt_template.format(context=context)
                result = await self.client.complete(prompt, config)
                context = result.content
            return result

        tasks = [execute_chain(chain) for chain in chain_prompts]
        return await asyncio.gather(*tasks)
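Putting the executors to work might look like the sketch below. It reuses the classes defined above; the prompts, templates, and OPENAI_API_KEY environment variable are illustrative assumptions.

import asyncio
import os

async def main():
    client = AsyncOpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
    executor = ConcurrentExecutor(client, max_concurrency=5)

    # Fan out a batch of independent prompts, bounded by the semaphore
    prompts = [f"Summarize the number {i} in one word." for i in range(20)]
    result = await executor.execute_all(prompts)
    print(f"{result.success_count} ok, {result.error_count} failed "
          f"in {result.total_time_ms:.0f} ms (avg {result.avg_latency_ms:.0f} ms)")

    # Map-reduce over a list of documents
    mr = MapReduceExecutor(client, max_concurrency=5)
    summary = await mr.map_reduce(
        items=["doc one text...", "doc two text...", "doc three text..."],
        map_prompt_template="Summarize this document:\n{item}",
        reduce_prompt_template="Combine these summaries into one:\n{mapped_outputs}",
    )
    print(summary.content)

    await client.close()

asyncio.run(main())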
Batching and Queuing
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class BatchRequest:
    """Request in batch queue."""
    prompt: str
    config: LLMConfig
    future: asyncio.Future
    created_at: float = field(default_factory=time.time)


class BatchProcessor:
    """Process requests in batches."""

    def __init__(
        self,
        client: AsyncLLMClient,
        batch_size: int = 10,
        batch_timeout: float = 0.1,
        max_queue_size: int = 1000
    ):
        self.client = client
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.max_queue_size = max_queue_size
        self.queue: deque[BatchRequest] = deque()
        self._running = False
        self._processor_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start batch processor."""
        self._running = True
        self._processor_task = asyncio.create_task(self._process_loop())

    async def stop(self):
        """Stop batch processor."""
        self._running = False
        if self._processor_task:
            self._processor_task.cancel()
            try:
                await self._processor_task
            except asyncio.CancelledError:
                pass

    async def submit(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Submit request to batch queue."""
        if len(self.queue) >= self.max_queue_size:
            raise RuntimeError("Queue full")
        future = asyncio.get_running_loop().create_future()
        request = BatchRequest(
            prompt=prompt,
            config=config or LLMConfig(),
            future=future
        )
        self.queue.append(request)
        return await future

    async def _process_loop(self):
        """Main processing loop."""
        while self._running:
            batch = await self._collect_batch()
            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(0.01)

    async def _collect_batch(self) -> list[BatchRequest]:
        """Collect batch of requests."""
        batch = []
        deadline = time.time() + self.batch_timeout
        while len(batch) < self.batch_size:
            if self.queue:
                batch.append(self.queue.popleft())
            elif batch:
                # Have some items, check timeout
                if time.time() >= deadline:
                    break
                await asyncio.sleep(0.01)
            else:
                # No items, wait a bit
                await asyncio.sleep(0.01)
                break
        return batch

    async def _process_batch(self, batch: list[BatchRequest]):
        """Process batch of requests."""
        # Execute all requests concurrently
        tasks = [
            self.client.complete(req.prompt, req.config)
            for req in batch
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Resolve futures
        for request, result in zip(batch, results):
            if isinstance(result, Exception):
                request.future.set_exception(result)
            else:
                request.future.set_result(result)


class PriorityQueue:
    """Priority queue for LLM requests."""

    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.semaphore = asyncio.Semaphore(max_concurrency)
        # Priority queues (0 = highest)
        self.queues: dict[int, deque] = {
            0: deque(),  # Critical
            1: deque(),  # High
            2: deque(),  # Normal
            3: deque()   # Low
        }
        self._running = False

    async def submit(
        self,
        prompt: str,
        priority: int = 2,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Submit request with priority."""
        future = asyncio.get_running_loop().create_future()
        request = BatchRequest(
            prompt=prompt,
            config=config or LLMConfig(),
            future=future
        )
        self.queues[priority].append(request)
        return await future

    async def start(self):
        """Start processing."""
        self._running = True
        asyncio.create_task(self._process_loop())

    async def _process_loop(self):
        """Process requests by priority."""
        while self._running:
            request = self._get_next_request()
            if request:
                asyncio.create_task(self._process_request(request))
            else:
                await asyncio.sleep(0.01)

    def _get_next_request(self) -> Optional[BatchRequest]:
        """Get highest priority request."""
        for priority in sorted(self.queues.keys()):
            if self.queues[priority]:
                return self.queues[priority].popleft()
        return None

    async def _process_request(self, request: BatchRequest):
        """Process single request."""
        async with self.semaphore:
            try:
                result = await self.client.complete(
                    request.prompt,
                    request.config
                )
                request.future.set_result(result)
            except Exception as e:
                request.future.set_exception(e)
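A sketch of the batch processor in use: callers simply await submit(), and the background loop groups pending requests into batches. The prompts and the OPENAI_API_KEY environment variable are illustrative assumptions.

import asyncio
import os

async def main():
    client = AsyncOpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
    processor = BatchProcessor(client, batch_size=5, batch_timeout=0.05)
    await processor.start()
    try:
        # Many callers can submit concurrently; each awaits only its own result
        responses = await asyncio.gather(*(
            processor.submit(f"Define term #{i} briefly.") for i in range(12)
        ))
        for r in responses[:3]:
            print(r.content[:60])
    finally:
        await processor.stop()
        await client.close()

asyncio.run(main())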
Retry and Error Handling
import asyncio
import random
import time
import aiohttp
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class RetryStrategy(Enum):
    """Retry strategies."""
    EXPONENTIAL = "exponential"
    LINEAR = "linear"
    CONSTANT = "constant"


@dataclass
class RetryConfig:
    """Retry configuration."""
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    jitter: bool = True
    retryable_errors: tuple = (
        aiohttp.ClientError,
        asyncio.TimeoutError
    )


class RetryableClient:
    """LLM client with retry logic."""

    def __init__(
        self,
        client: AsyncLLMClient,
        config: RetryConfig = None
    ):
        self.client = client
        self.config = config or RetryConfig()

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay for attempt."""
        if self.config.strategy == RetryStrategy.EXPONENTIAL:
            delay = self.config.base_delay * (2 ** attempt)
        elif self.config.strategy == RetryStrategy.LINEAR:
            delay = self.config.base_delay * (attempt + 1)
        else:
            delay = self.config.base_delay
        delay = min(delay, self.config.max_delay)
        if self.config.jitter:
            delay = delay * (0.5 + random.random())
        return delay

    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Complete with retries."""
        last_error = None
        for attempt in range(self.config.max_retries + 1):
            try:
                return await self.client.complete(prompt, config)
            except self.config.retryable_errors as e:
                last_error = e
                if attempt < self.config.max_retries:
                    delay = self._calculate_delay(attempt)
                    await asyncio.sleep(delay)
            except Exception:
                # Non-retryable error
                raise
        raise last_error


class CircuitBreaker:
    """Circuit breaker for LLM calls."""

    def __init__(
        self,
        client: AsyncLLMClient,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 3
    ):
        self.client = client
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.failures = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half_open
        self._lock = asyncio.Lock()

    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Complete with circuit breaker."""
        async with self._lock:
            await self._check_state()
            if self.state == "open":
                raise RuntimeError("Circuit breaker is open")
        try:
            result = await self.client.complete(prompt, config)
            async with self._lock:
                self._on_success()
            return result
        except Exception:
            async with self._lock:
                self._on_failure()
            raise

    async def _check_state(self):
        """Check and update circuit state."""
        if self.state == "open":
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = "half_open"
                self.failures = 0

    def _on_success(self):
        """Handle successful call."""
        if self.state == "half_open":
            self.failures = 0
            self.state = "closed"

    def _on_failure(self):
        """Handle failed call."""
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"


class FallbackClient:
    """Client with fallback providers."""

    def __init__(self, clients: list[AsyncLLMClient]):
        self.clients = clients

    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Try clients in order until one succeeds."""
        errors = []
        for client in self.clients:
            try:
                return await client.complete(prompt, config)
            except Exception as e:
                errors.append(e)
        raise RuntimeError(f"All providers failed: {errors}")
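To see what the backoff schedule looks like, this small sketch prints the jittered delays RetryableClient would sleep between attempts under the default exponential strategy. Passing client=None is fine here because only the delay helper is exercised; no requests are made.

# Illustrative only: inspect the delay schedule without making any requests
retry = RetryableClient(client=None, config=RetryConfig(base_delay=1.0, max_delay=60.0))
for attempt in range(5):
    print(f"attempt {attempt}: sleep ~{retry._calculate_delay(attempt):.2f}s")
# With jitter disabled the schedule is exactly 1, 2, 4, 8, 16 seconds, capped at max_delay.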
Streaming Patterns
import asyncio
import re
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator


class StreamingAggregator:
    """Aggregate multiple streams."""

    def __init__(self, client: AsyncLLMClient):
        self.client = client

    async def stream_first(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> AsyncIterator[tuple[int, str]]:
        """Stream from first responding prompt."""
        queues = [asyncio.Queue() for _ in prompts]
        done_event = asyncio.Event()

        async def stream_to_queue(index: int, prompt: str):
            try:
                async for chunk in self.client.stream(prompt, config):
                    if done_event.is_set():
                        return
                    await queues[index].put((index, chunk))
                await queues[index].put((index, None))  # Signal done
            except Exception as e:
                await queues[index].put((index, e))

        # Start all streams
        tasks = [
            asyncio.create_task(stream_to_queue(i, p))
            for i, p in enumerate(prompts)
        ]
        # Yield from first responding stream
        first_index = None
        while True:
            # Check all queues
            for i, queue in enumerate(queues):
                if not queue.empty():
                    index, item = await queue.get()
                    if first_index is None:
                        first_index = index
                    if index == first_index:
                        if item is None:
                            done_event.set()
                            return
                        if isinstance(item, Exception):
                            done_event.set()
                            raise item
                        yield index, item
            await asyncio.sleep(0.01)

    async def stream_merge(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> AsyncIterator[tuple[int, str]]:
        """Merge all streams interleaved."""
        queue = asyncio.Queue()
        active_streams = len(prompts)

        async def stream_to_queue(index: int, prompt: str):
            nonlocal active_streams
            try:
                async for chunk in self.client.stream(prompt, config):
                    await queue.put((index, chunk))
            except Exception as e:
                await queue.put((index, e))
            finally:
                active_streams -= 1
                if active_streams == 0:
                    await queue.put(None)  # Signal all done

        # Start all streams
        for i, prompt in enumerate(prompts):
            asyncio.create_task(stream_to_queue(i, prompt))
        # Yield merged results
        while True:
            item = await queue.get()
            if item is None:
                return
            index, chunk = item
            if isinstance(chunk, Exception):
                continue  # Skip errors, continue with others
            yield index, chunk


class StreamBuffer:
    """Buffer streaming output."""

    def __init__(
        self,
        client: AsyncLLMClient,
        buffer_size: int = 10
    ):
        self.client = client
        self.buffer_size = buffer_size

    async def stream_buffered(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream with buffering."""
        buffer = []
        async for chunk in self.client.stream(prompt, config):
            buffer.append(chunk)
            if len(buffer) >= self.buffer_size:
                yield "".join(buffer)
                buffer = []
        if buffer:
            yield "".join(buffer)

    async def stream_sentences(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream complete sentences."""
        buffer = ""
        sentence_pattern = re.compile(r'[.!?]+\s*')
        async for chunk in self.client.stream(prompt, config):
            buffer += chunk
            # Find complete sentences
            while True:
                match = sentence_pattern.search(buffer)
                if match:
                    sentence = buffer[:match.end()]
                    buffer = buffer[match.end():]
                    yield sentence
                else:
                    break
        # Yield remaining
        if buffer.strip():
            yield buffer
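Sentence-level streaming is handy for driving text-to-speech or incremental UI updates. A usage sketch assuming the client defined earlier and an OPENAI_API_KEY environment variable:

import asyncio
import os

async def main():
    client = AsyncOpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
    buffer = StreamBuffer(client)
    try:
        # Emit whole sentences instead of raw token chunks
        async for sentence in buffer.stream_sentences("Tell me a short story about a robot."):
            print("SENTENCE:", sentence.strip())
    finally:
        await client.close()

asyncio.run(main())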
Production Async Service
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import json

app = FastAPI()


class CompletionRequest(BaseModel):
    prompt: str
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 1000
    stream: bool = False
    priority: int = 2


class BatchCompletionRequest(BaseModel):
    # Named to avoid clashing with the BatchRequest dataclass defined earlier
    prompts: list[str]
    model: str = "gpt-4"
    max_concurrency: int = 10


# Initialize clients
openai_client = AsyncOpenAIClient(api_key="your-key")
anthropic_client = AsyncAnthropicClient(api_key="your-key")
# Fallback client
fallback_client = FallbackClient([openai_client, anthropic_client])
# Retryable client
retry_client = RetryableClient(fallback_client)
# Circuit breaker
circuit_breaker = CircuitBreaker(retry_client)
# Batch processor
batch_processor = BatchProcessor(circuit_breaker)
# Concurrent executor
executor = ConcurrentExecutor(circuit_breaker)


@app.on_event("startup")
async def startup():
    await batch_processor.start()


@app.on_event("shutdown")
async def shutdown():
    await batch_processor.stop()
    await openai_client.close()
    await anthropic_client.close()


@app.post("/v1/completions")
async def create_completion(request: CompletionRequest):
    """Create completion."""
    config = LLMConfig(
        model=request.model,
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )
    if request.stream:
        async def generate():
            # The retry/fallback wrappers only implement complete(),
            # so stream directly from the underlying provider client.
            async for chunk in openai_client.stream(
                request.prompt,
                config
            ):
                yield f"data: {json.dumps({'content': chunk})}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            generate(),
            media_type="text/event-stream"
        )
    result = await circuit_breaker.complete(request.prompt, config)
    return {
        "content": result.content,
        "model": result.model,
        "usage": result.usage,
        "latency_ms": result.latency_ms
    }


@app.post("/v1/completions/batch")
async def create_batch_completion(request: BatchCompletionRequest):
    """Create batch completions."""
    config = LLMConfig(model=request.model)
    local_executor = ConcurrentExecutor(
        circuit_breaker,
        max_concurrency=request.max_concurrency
    )
    result = await local_executor.execute_all(request.prompts, config)
    return {
        "results": [
            {
                "content": r.content,
                "latency_ms": r.latency_ms
            }
            for r in result.results
        ],
        "total_time_ms": result.total_time_ms,
        "avg_latency_ms": result.avg_latency_ms,
        "success_count": result.success_count,
        "error_count": result.error_count
    }


@app.get("/v1/health")
async def health():
    """Health check."""
    return {
        "status": "healthy",
        "circuit_breaker_state": circuit_breaker.state,
        "queue_size": len(batch_processor.queue)
    }


@app.get("/v1/metrics")
async def metrics():
    """Get metrics."""
    return {
        "circuit_breaker": {
            "state": circuit_breaker.state,
            "failures": circuit_breaker.failures
        },
        "batch_processor": {
            "queue_size": len(batch_processor.queue)
        }
    }
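On the consumer side, the streaming endpoint speaks plain server-sent events, so a client only needs to read lines and stop at [DONE]. A sketch using aiohttp, assuming the service above is running locally on port 8000:

import asyncio
import json

import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://localhost:8000/v1/completions",
            json={"prompt": "Explain backpressure in one paragraph.", "stream": True},
        ) as response:
            async for line in response.content:
                line = line.decode().strip()
                if not line.startswith("data: "):
                    continue
                payload = line[6:]
                if payload == "[DONE]":
                    break
                print(json.loads(payload)["content"], end="", flush=True)
    print()

asyncio.run(main())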
References
- Python asyncio: https://docs.python.org/3/library/asyncio.html
- aiohttp: https://docs.aiohttp.org/
- OpenAI Async: https://platform.openai.com/docs/api-reference
- Anthropic SDK: https://docs.anthropic.com/en/api/client-sdks
Conclusion
Async patterns are essential for building responsive, high-throughput LLM applications. Start with basic async clients using aiohttp or httpx—the performance gains from overlapping I/O are immediate. Use semaphores to control concurrency and prevent overwhelming API rate limits. Implement retry logic with exponential backoff for transient failures, and circuit breakers to fail fast when providers are down. For high-volume workloads, batch requests to amortize overhead and use priority queues to ensure critical requests get processed first. Streaming responses improve perceived latency—users see output immediately rather than waiting for complete responses. The fallback pattern across multiple providers improves reliability, though watch for subtle differences in model behavior. Monitor queue depths, latency percentiles, and error rates to tune concurrency limits. The key insight is that async isn’t just about performance—it’s about building resilient systems that handle failures gracefully and scale with demand. These patterns form the foundation for production LLM services that can handle thousands of concurrent users while maintaining responsiveness.