Async LLM Patterns: Building High-Throughput AI Applications

Introduction: LLM APIs are inherently slow—even fast models take hundreds of milliseconds per request. When you need to process multiple prompts, make parallel API calls, or handle high-throughput workloads, synchronous code becomes a bottleneck. Async patterns let you overlap I/O wait times, dramatically improving throughput for only a modest amount of extra code. This guide covers practical async patterns for LLM applications: concurrent request handling, batching strategies, streaming with async generators, retry logic with exponential backoff, and production-ready patterns for building responsive AI applications. Whether you’re building a chatbot handling multiple users, a batch processing pipeline, or a real-time agent, these patterns will help you maximize throughput while keeping your code maintainable.

Async LLM Processing
Figure: async LLM pipeline with a request queue, batch processor, and parallel execution

Basic Async LLM Client

import asyncio
import json
import aiohttp
from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator
from abc import ABC, abstractmethod
import time

@dataclass
class LLMResponse:
    """LLM response."""
    
    content: str
    model: str
    usage: dict = field(default_factory=dict)
    latency_ms: float = 0
    
@dataclass
class LLMConfig:
    """LLM configuration."""
    
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 1000
    timeout: float = 60.0

class AsyncLLMClient(ABC):
    """Abstract async LLM client."""
    
    @abstractmethod
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        pass
    
    @abstractmethod
    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        pass

class AsyncOpenAIClient(AsyncLLMClient):
    """Async OpenAI client."""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create session."""
        
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )
        return self._session
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        
        config = config or LLMConfig()
        session = await self._get_session()
        
        start = time.time()
        
        async with session.post(
            f"{self.base_url}/chat/completions",
            json={
                "model": config.model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": config.temperature,
                "max_tokens": config.max_tokens
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            # Surface HTTP errors (e.g. 429 / 5xx) so retry wrappers can catch them
            response.raise_for_status()
            data = await response.json()
            
            latency = (time.time() - start) * 1000
            
            return LLMResponse(
                content=data["choices"][0]["message"]["content"],
                model=config.model,
                usage=data.get("usage", {}),
                latency_ms=latency
            )
    
    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        
        config = config or LLMConfig()
        session = await self._get_session()
        
        async with session.post(
            f"{self.base_url}/chat/completions",
            json={
                "model": config.model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": config.temperature,
                "max_tokens": config.max_tokens,
                "stream": True
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            async for line in response.content:
                line = line.decode().strip()
                
                if line.startswith("data: "):
                    data = line[6:]
                    
                    if data == "[DONE]":
                        break
                    
                    chunk = json.loads(data)
                    
                    if chunk["choices"][0].get("delta", {}).get("content"):
                        yield chunk["choices"][0]["delta"]["content"]
    
    async def close(self):
        """Close session."""
        
        if self._session and not self._session.closed:
            await self._session.close()

class AsyncAnthropicClient(AsyncLLMClient):
    """Async Anthropic client."""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1"
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create session."""
        
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "x-api-key": self.api_key,
                    "Content-Type": "application/json",
                    "anthropic-version": "2023-06-01"
                }
            )
        return self._session
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        
        config = config or LLMConfig()
        session = await self._get_session()
        
        # Map model names
        model = config.model
        if model.startswith("gpt"):
            model = "claude-3-sonnet-20240229"
        
        start = time.time()
        
        async with session.post(
            f"{self.base_url}/messages",
            json={
                "model": model,
                "max_tokens": config.max_tokens,
                "messages": [{"role": "user", "content": prompt}]
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            # Surface HTTP errors (e.g. 429 / 5xx) so retry wrappers can catch them
            response.raise_for_status()
            data = await response.json()
            
            latency = (time.time() - start) * 1000
            
            return LLMResponse(
                content=data["content"][0]["text"],
                model=model,
                usage=data.get("usage", {}),
                latency_ms=latency
            )
    
    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        
        config = config or LLMConfig()
        session = await self._get_session()
        
        model = config.model
        if model.startswith("gpt"):
            model = "claude-3-sonnet-20240229"
        
        async with session.post(
            f"{self.base_url}/messages",
            json={
                "model": model,
                "max_tokens": config.max_tokens,
                "messages": [{"role": "user", "content": prompt}],
                "stream": True
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            async for line in response.content:
                line = line.decode().strip()
                
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    
                    if data["type"] == "content_block_delta":
                        yield data["delta"]["text"]
    
    async def close(self):
        """Close session."""
        
        if self._session and not self._session.closed:
            await self._session.close()
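
A minimal usage sketch for the clients above (the environment variable name and the prompts are illustrative assumptions): because complete is a coroutine, asyncio.gather overlaps the HTTP round trips, so the total wall-clock time is roughly that of the slowest single request.

import asyncio
import os

async def main():
    # Assumes OPENAI_API_KEY is set in the environment (illustrative choice)
    client = AsyncOpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
    
    try:
        prompts = [
            "Summarize asyncio in one sentence.",
            "Name one use case for streaming LLM output.",
            "What does a semaphore control?"
        ]
        # Fire all requests at once; gather preserves the input order
        responses = await asyncio.gather(
            *(client.complete(p) for p in prompts)
        )
        for r in responses:
            print(f"[{r.latency_ms:.0f} ms] {r.content[:60]}")
    finally:
        await client.close()

asyncio.run(main())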

Concurrent Request Patterns

import asyncio
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class ConcurrentResult:
    """Result from concurrent execution."""
    
    results: list[LLMResponse]
    total_time_ms: float
    avg_latency_ms: float
    success_count: int
    error_count: int

class ConcurrentExecutor:
    """Execute multiple LLM requests concurrently."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.semaphore = asyncio.Semaphore(max_concurrency)
    
    async def _execute_one(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> tuple[Optional[LLMResponse], Optional[Exception]]:
        """Execute single request with semaphore."""
        
        async with self.semaphore:
            try:
                result = await self.client.complete(prompt, config)
                return result, None
            except Exception as e:
                return None, e
    
    async def execute_all(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> ConcurrentResult:
        """Execute all prompts concurrently."""
        
        start = time.time()
        
        tasks = [
            self._execute_one(prompt, config)
            for prompt in prompts
        ]
        
        results = await asyncio.gather(*tasks)
        
        total_time = (time.time() - start) * 1000
        
        successful = [r for r, e in results if r is not None]
        errors = [e for r, e in results if e is not None]
        
        avg_latency = (
            sum(r.latency_ms for r in successful) / len(successful)
            if successful else 0
        )
        
        return ConcurrentResult(
            results=successful,
            total_time_ms=total_time,
            avg_latency_ms=avg_latency,
            success_count=len(successful),
            error_count=len(errors)
        )
    
    async def execute_with_progress(
        self,
        prompts: list[str],
        config: LLMConfig = None,
        progress_callback: callable = None
    ) -> ConcurrentResult:
        """Execute with progress tracking."""
        
        start = time.time()
        completed = 0
        
        async def execute_with_tracking(prompt: str, index: int):
            nonlocal completed
            
            result, error = await self._execute_one(prompt, config)
            
            completed += 1
            
            if progress_callback:
                await progress_callback(completed, len(prompts), result, error)
            
            return result, error
        
        tasks = [
            execute_with_tracking(prompt, i)
            for i, prompt in enumerate(prompts)
        ]
        
        task_results = await asyncio.gather(*tasks)
        
        total_time = (time.time() - start) * 1000
        
        successful = [r for r, e in task_results if r is not None]
        
        avg_latency = (
            sum(r.latency_ms for r in successful) / len(successful)
            if successful else 0
        )
        
        return ConcurrentResult(
            results=successful,
            total_time_ms=total_time,
            avg_latency_ms=avg_latency,
            success_count=len(successful),
            error_count=len(task_results) - len(successful)
        )

class MapReduceExecutor:
    """Map-reduce pattern for LLM processing."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.executor = ConcurrentExecutor(client, max_concurrency)
    
    async def map_reduce(
        self,
        items: list[str],
        map_prompt_template: str,
        reduce_prompt_template: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Map items through LLM, then reduce results."""
        
        # Map phase
        map_prompts = [
            map_prompt_template.format(item=item)
            for item in items
        ]
        
        map_results = await self.executor.execute_all(map_prompts, config)
        
        # Reduce phase
        mapped_outputs = "\n\n".join(
            f"Item {i+1}:\n{r.content}"
            for i, r in enumerate(map_results.results)
        )
        
        reduce_prompt = reduce_prompt_template.format(
            mapped_outputs=mapped_outputs
        )
        
        return await self.client.complete(reduce_prompt, config)
    
    async def parallel_chains(
        self,
        input_data: str,
        chain_prompts: list[list[str]],
        config: LLMConfig = None
    ) -> list[LLMResponse]:
        """Execute multiple prompt chains in parallel."""
        
        async def execute_chain(prompts: list[str]) -> LLMResponse:
            """Execute a single chain sequentially."""
            
            context = input_data
            result = None
            
            for prompt_template in prompts:
                prompt = prompt_template.format(context=context)
                result = await self.client.complete(prompt, config)
                context = result.content
            
            return result
        
        tasks = [execute_chain(chain) for chain in chain_prompts]
        return await asyncio.gather(*tasks)
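
A hedged usage sketch for the executors above; the document list and prompt wording are made up. Note that the templates must expose the {item} and {mapped_outputs} placeholders that map_reduce fills via str.format.

async def summarize_documents(client: AsyncLLMClient, documents: list[str]) -> str:
    executor = MapReduceExecutor(client, max_concurrency=5)
    
    # Map: summarize each document independently (runs concurrently up to the limit)
    # Reduce: merge the per-document summaries in one final call
    result = await executor.map_reduce(
        items=documents,
        map_prompt_template="Summarize the following text in two sentences:\n\n{item}",
        reduce_prompt_template="Combine these summaries into one coherent paragraph:\n\n{mapped_outputs}",
    )
    return result.content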

Batching and Queuing

import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Optional
from collections import deque

@dataclass
class BatchRequest:
    """Request in batch queue."""
    
    prompt: str
    config: LLMConfig
    future: asyncio.Future
    created_at: float = field(default_factory=time.time)

class BatchProcessor:
    """Process requests in batches."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        batch_size: int = 10,
        batch_timeout: float = 0.1,
        max_queue_size: int = 1000
    ):
        self.client = client
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.max_queue_size = max_queue_size
        
        self.queue: deque[BatchRequest] = deque()
        self._running = False
        self._processor_task: Optional[asyncio.Task] = None
    
    async def start(self):
        """Start batch processor."""
        
        self._running = True
        self._processor_task = asyncio.create_task(self._process_loop())
    
    async def stop(self):
        """Stop batch processor."""
        
        self._running = False
        
        if self._processor_task:
            self._processor_task.cancel()
            try:
                await self._processor_task
            except asyncio.CancelledError:
                pass
    
    async def submit(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Submit request to batch queue."""
        
        if len(self.queue) >= self.max_queue_size:
            raise RuntimeError("Queue full")
        
        future = asyncio.get_running_loop().create_future()
        
        request = BatchRequest(
            prompt=prompt,
            config=config or LLMConfig(),
            future=future
        )
        
        self.queue.append(request)
        
        return await future
    
    async def _process_loop(self):
        """Main processing loop."""
        
        while self._running:
            batch = await self._collect_batch()
            
            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(0.01)
    
    async def _collect_batch(self) -> list[BatchRequest]:
        """Collect batch of requests."""
        
        batch = []
        deadline = time.time() + self.batch_timeout
        
        while len(batch) < self.batch_size:
            if self.queue:
                batch.append(self.queue.popleft())
            elif batch:
                # Have some items, check timeout
                if time.time() >= deadline:
                    break
                await asyncio.sleep(0.01)
            else:
                # No items, wait a bit
                await asyncio.sleep(0.01)
                break
        
        return batch
    
    async def _process_batch(self, batch: list[BatchRequest]):
        """Process batch of requests."""
        
        # Execute all requests concurrently
        tasks = [
            self.client.complete(req.prompt, req.config)
            for req in batch
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Resolve futures
        for request, result in zip(batch, results):
            if isinstance(result, Exception):
                request.future.set_exception(result)
            else:
                request.future.set_result(result)

class PriorityQueue:
    """Priority queue for LLM requests."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.semaphore = asyncio.Semaphore(max_concurrency)
        
        # Priority queues (0 = highest)
        self.queues: dict[int, deque] = {
            0: deque(),  # Critical
            1: deque(),  # High
            2: deque(),  # Normal
            3: deque()   # Low
        }
        
        self._running = False
    
    async def submit(
        self,
        prompt: str,
        priority: int = 2,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Submit request with priority."""
        
        future = asyncio.get_running_loop().create_future()
        
        request = BatchRequest(
            prompt=prompt,
            config=config or LLMConfig(),
            future=future
        )
        
        self.queues[priority].append(request)
        
        return await future
    
    async def start(self):
        """Start processing."""
        
        self._running = True
        # Keep a reference so the processing task isn't garbage collected
        self._processor_task = asyncio.create_task(self._process_loop())
    
    async def _process_loop(self):
        """Process requests by priority."""
        
        while self._running:
            request = self._get_next_request()
            
            if request:
                asyncio.create_task(self._process_request(request))
            else:
                await asyncio.sleep(0.01)
    
    def _get_next_request(self) -> Optional[BatchRequest]:
        """Get highest priority request."""
        
        for priority in sorted(self.queues.keys()):
            if self.queues[priority]:
                return self.queues[priority].popleft()
        return None
    
    async def _process_request(self, request: BatchRequest):
        """Process single request."""
        
        async with self.semaphore:
            try:
                result = await self.client.complete(
                    request.prompt,
                    request.config
                )
                request.future.set_result(result)
            except Exception as e:
                request.future.set_exception(e)
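
A minimal sketch of driving the BatchProcessor (the batch size and timeout values here are arbitrary). Callers simply await submit() as if it were a direct call, while the background loop groups queued requests and executes each group concurrently.

async def run_batched(client: AsyncLLMClient, prompts: list[str]) -> list[LLMResponse]:
    processor = BatchProcessor(client, batch_size=8, batch_timeout=0.05)
    await processor.start()
    
    try:
        # Each submit() awaits a future that the background loop resolves;
        # gather drives all of them concurrently
        return await asyncio.gather(*(processor.submit(p) for p in prompts))
    finally:
        await processor.stop()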

Retry and Error Handling

import asyncio
import random
import aiohttp
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class RetryStrategy(Enum):
    """Retry strategies."""
    
    EXPONENTIAL = "exponential"
    LINEAR = "linear"
    CONSTANT = "constant"

@dataclass
class RetryConfig:
    """Retry configuration."""
    
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    jitter: bool = True
    retryable_errors: tuple = (
        aiohttp.ClientError,
        asyncio.TimeoutError
    )

class RetryableClient:
    """LLM client with retry logic."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        config: RetryConfig = None
    ):
        self.client = client
        self.config = config or RetryConfig()
    
    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay for attempt."""
        
        if self.config.strategy == RetryStrategy.EXPONENTIAL:
            delay = self.config.base_delay * (2 ** attempt)
        elif self.config.strategy == RetryStrategy.LINEAR:
            delay = self.config.base_delay * (attempt + 1)
        else:
            delay = self.config.base_delay
        
        delay = min(delay, self.config.max_delay)
        
        if self.config.jitter:
            delay = delay * (0.5 + random.random())
        
        return delay
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Complete with retries."""
        
        last_error = None
        
        for attempt in range(self.config.max_retries + 1):
            try:
                return await self.client.complete(prompt, config)
            
            except self.config.retryable_errors as e:
                last_error = e
                
                if attempt < self.config.max_retries:
                    delay = self._calculate_delay(attempt)
                    await asyncio.sleep(delay)
            
            except Exception as e:
                # Non-retryable error
                raise
        
        raise last_error

class CircuitBreaker:
    """Circuit breaker for LLM calls."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 3
    ):
        self.client = client
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        
        self.failures = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half_open
        self._lock = asyncio.Lock()
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Complete with circuit breaker."""
        
        async with self._lock:
            await self._check_state()
        
        if self.state == "open":
            raise RuntimeError("Circuit breaker is open")
        
        try:
            result = await self.client.complete(prompt, config)
            
            async with self._lock:
                self._on_success()
            
            return result
        
        except Exception as e:
            async with self._lock:
                self._on_failure()
            raise
    
    async def _check_state(self):
        """Check and update circuit state."""
        
        if self.state == "open":
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = "half_open"
                self.failures = 0
    
    def _on_success(self):
        """Handle successful call."""
        
        # Reset the failure count on any success so sporadic errors don't accumulate
        self.failures = 0
        
        if self.state == "half_open":
            self.state = "closed"
    
    def _on_failure(self):
        """Handle failed call."""
        
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.failures >= self.failure_threshold:
            self.state = "open"

class FallbackClient:
    """Client with fallback providers."""
    
    def __init__(self, clients: list[AsyncLLMClient]):
        self.clients = clients
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Try clients in order until one succeeds."""
        
        errors = []
        
        for client in self.clients:
            try:
                return await client.complete(prompt, config)
            except Exception as e:
                errors.append(e)
        
        raise RuntimeError(f"All providers failed: {errors}")

Streaming Patterns

import asyncio
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator

class StreamingAggregator:
    """Aggregate multiple streams."""
    
    def __init__(self, client: AsyncLLMClient):
        self.client = client
    
    async def stream_first(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> AsyncIterator[tuple[int, str]]:
        """Stream from first responding prompt."""
        
        queues = [asyncio.Queue() for _ in prompts]
        done_event = asyncio.Event()
        
        async def stream_to_queue(index: int, prompt: str):
            try:
                async for chunk in self.client.stream(prompt, config):
                    if done_event.is_set():
                        return
                    await queues[index].put((index, chunk))
                await queues[index].put((index, None))  # Signal done
            except Exception as e:
                await queues[index].put((index, e))
        
        # Start all streams
        tasks = [
            asyncio.create_task(stream_to_queue(i, p))
            for i, p in enumerate(prompts)
        ]
        
        # Yield from first responding stream
        first_index = None
        
        while True:
            # Check all queues
            for i, queue in enumerate(queues):
                if not queue.empty():
                    index, item = await queue.get()
                    
                    if first_index is None:
                        first_index = index
                    
                    if index == first_index:
                        if item is None:
                            done_event.set()
                            return
                        if isinstance(item, Exception):
                            done_event.set()
                            raise item
                        yield index, item
            
            await asyncio.sleep(0.01)
    
    async def stream_merge(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> AsyncIterator[tuple[int, str]]:
        """Merge all streams interleaved."""
        
        queue = asyncio.Queue()
        active_streams = len(prompts)
        
        async def stream_to_queue(index: int, prompt: str):
            nonlocal active_streams
            
            try:
                async for chunk in self.client.stream(prompt, config):
                    await queue.put((index, chunk))
            except Exception as e:
                await queue.put((index, e))
            finally:
                active_streams -= 1
                if active_streams == 0:
                    await queue.put(None)  # Signal all done
        
        # Start all streams (hold references so the tasks aren't garbage collected)
        tasks = [
            asyncio.create_task(stream_to_queue(i, prompt))
            for i, prompt in enumerate(prompts)
        ]
        
        # Yield merged results
        while True:
            item = await queue.get()
            
            if item is None:
                return
            
            index, chunk = item
            
            if isinstance(chunk, Exception):
                continue  # Skip errors, continue with others
            
            yield index, chunk

class StreamBuffer:
    """Buffer streaming output."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        buffer_size: int = 10
    ):
        self.client = client
        self.buffer_size = buffer_size
    
    async def stream_buffered(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream with buffering."""
        
        buffer = []
        
        async for chunk in self.client.stream(prompt, config):
            buffer.append(chunk)
            
            if len(buffer) >= self.buffer_size:
                yield "".join(buffer)
                buffer = []
        
        if buffer:
            yield "".join(buffer)
    
    async def stream_sentences(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream complete sentences."""
        
        import re
        
        buffer = ""
        sentence_pattern = re.compile(r'[.!?]+\s*')
        
        async for chunk in self.client.stream(prompt, config):
            buffer += chunk
            
            # Find complete sentences
            while True:
                match = sentence_pattern.search(buffer)
                
                if match:
                    sentence = buffer[:match.end()]
                    buffer = buffer[match.end():]
                    yield sentence
                else:
                    break
        
        # Yield remaining
        if buffer.strip():
            yield buffer
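
A short sketch of consuming the sentence-level stream above, for example to feed a text-to-speech or UI update queue; the print handler is just a stand-in for whatever the application does with each sentence.

async def print_sentences(client: AsyncLLMClient, prompt: str):
    buffer = StreamBuffer(client)
    
    # Emit whole sentences as soon as they complete, rather than raw token chunks
    async for sentence in buffer.stream_sentences(prompt):
        print(sentence.strip())  # e.g. hand off to a TTS engine or UI queue here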

Production Async Service

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import json

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 1000
    stream: bool = False
    priority: int = 2

# Distinct name so it doesn't shadow the BatchRequest dataclass used by BatchProcessor
class BatchCompletionRequest(BaseModel):
    prompts: list[str]
    model: str = "gpt-4"
    max_concurrency: int = 10

# Initialize clients
openai_client = AsyncOpenAIClient(api_key="your-key")
anthropic_client = AsyncAnthropicClient(api_key="your-key")

# Fallback client
fallback_client = FallbackClient([openai_client, anthropic_client])

# Retryable client
retry_client = RetryableClient(fallback_client)

# Circuit breaker
circuit_breaker = CircuitBreaker(retry_client)

# Batch processor
batch_processor = BatchProcessor(circuit_breaker)

# Concurrent executor
executor = ConcurrentExecutor(circuit_breaker)

@app.on_event("startup")
async def startup():
    await batch_processor.start()

@app.on_event("shutdown")
async def shutdown():
    await batch_processor.stop()
    await openai_client.close()
    await anthropic_client.close()

@app.post("/v1/completions")
async def create_completion(request: CompletionRequest):
    """Create completion."""
    
    config = LLMConfig(
        model=request.model,
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )
    
    if request.stream:
        async def generate():
            # Stream straight from the base client: the retry/fallback/circuit
            # breaker wrappers above only implement complete(), not stream()
            async for chunk in openai_client.stream(request.prompt, config):
                yield f"data: {json.dumps({'content': chunk})}\n\n"
            yield "data: [DONE]\n\n"
        
        return StreamingResponse(
            generate(),
            media_type="text/event-stream"
        )
    
    result = await circuit_breaker.complete(request.prompt, config)
    
    return {
        "content": result.content,
        "model": result.model,
        "usage": result.usage,
        "latency_ms": result.latency_ms
    }

@app.post("/v1/completions/batch")
async def create_batch_completion(request: BatchCompletionRequest):
    """Create batch completions."""
    
    config = LLMConfig(model=request.model)
    
    local_executor = ConcurrentExecutor(
        circuit_breaker,
        max_concurrency=request.max_concurrency
    )
    
    result = await local_executor.execute_all(request.prompts, config)
    
    return {
        "results": [
            {
                "content": r.content,
                "latency_ms": r.latency_ms
            }
            for r in result.results
        ],
        "total_time_ms": result.total_time_ms,
        "avg_latency_ms": result.avg_latency_ms,
        "success_count": result.success_count,
        "error_count": result.error_count
    }

@app.get("/v1/health")
async def health():
    """Health check."""
    
    return {
        "status": "healthy",
        "circuit_breaker_state": circuit_breaker.state,
        "queue_size": len(batch_processor.queue)
    }

@app.get("/v1/metrics")
async def metrics():
    """Get metrics."""
    
    return {
        "circuit_breaker": {
            "state": circuit_breaker.state,
            "failures": circuit_breaker.failures
        },
        "batch_processor": {
            "queue_size": len(batch_processor.queue)
        }
    }
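
To round things out, a sketch of calling the service from a client, assuming it is running locally on port 8000 (the host, port, and payloads are assumptions, not part of the service above).

import asyncio
import httpx

async def call_service():
    async with httpx.AsyncClient(base_url="http://localhost:8000") as http:
        # Single (non-streaming) completion
        resp = await http.post(
            "/v1/completions",
            json={"prompt": "Hello!", "stream": False},
            timeout=120,
        )
        print(resp.json()["content"])
        
        # Batch completion
        batch = await http.post(
            "/v1/completions/batch",
            json={"prompts": ["One", "Two", "Three"], "max_concurrency": 3},
            timeout=300,
        )
        print(batch.json()["success_count"], "requests succeeded")

asyncio.run(call_service())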

Conclusion

Async patterns are essential for building responsive, high-throughput LLM applications. Start with basic async clients using aiohttp or httpx—the performance gains from overlapping I/O are immediate. Use semaphores to control concurrency and prevent overwhelming API rate limits. Implement retry logic with exponential backoff for transient failures, and circuit breakers to fail fast when providers are down. For high-volume workloads, batch requests to amortize overhead and use priority queues to ensure critical requests get processed first. Streaming responses improve perceived latency—users see output immediately rather than waiting for complete responses. The fallback pattern across multiple providers improves reliability, though watch for subtle differences in model behavior. Monitor queue depths, latency percentiles, and error rates to tune concurrency limits. The key insight is that async isn’t just about performance—it’s about building resilient systems that handle failures gracefully and scale with demand. These patterns form the foundation for production LLM services that can handle thousands of concurrent users while maintaining responsiveness.

