
Async LLM Patterns: Concurrent Execution, Rate Limiting, and Task Queues for High-Throughput AI Applications

Introduction

LLM API calls are inherently I/O-bound: waiting for network responses dominates execution time. Async programming turns this bottleneck into an opportunity for massive parallelism. Instead of waiting for each response sequentially, async patterns allow hundreds of requests to run concurrently while resources are managed efficiently. This guide covers practical async patterns for LLM applications: concurrent request handling, semaphore-based rate limiting, streaming with async generators, task queues, and production-ready async architectures that maximize throughput while maintaining reliability.

[Diagram: async processing pipeline with task queue, workers, and result gathering]

Basic Async Patterns

import asyncio
from typing import Any
from dataclasses import dataclass

async def simple_concurrent_calls(
    client: Any,
    prompts: list[str],
    model: str = "gpt-4o-mini"
) -> list[str]:
    """Execute multiple LLM calls concurrently."""
    
    async def call_llm(prompt: str) -> str:
        response = await client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    # Execute all calls concurrently
    tasks = [call_llm(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    
    return results

async def concurrent_with_semaphore(
    client: Any,
    prompts: list[str],
    max_concurrent: int = 10,
    model: str = "gpt-4o-mini"
) -> list[str]:
    """Limit concurrent calls with semaphore."""
    
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def call_with_limit(prompt: str) -> str:
        async with semaphore:
            response = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content
    
    tasks = [call_with_limit(prompt) for prompt in prompts]
    return await asyncio.gather(*tasks)

async def as_completed_pattern(
    client: Any,
    prompts: list[str],
    model: str = "gpt-4o-mini"
):
    """Process results as they complete."""
    
    async def call_llm(idx: int, prompt: str) -> tuple[int, str]:
        response = await client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return idx, response.choices[0].message.content
    
    tasks = [
        asyncio.create_task(call_llm(i, prompt))
        for i, prompt in enumerate(prompts)
    ]
    
    # Yield results as they complete
    for coro in asyncio.as_completed(tasks):
        idx, result = await coro
        yield idx, result

@dataclass
class AsyncResult:
    """Result with metadata."""
    
    index: int
    content: str
    latency_ms: float
    success: bool
    error: str | None = None

async def robust_concurrent_calls(
    client: Any,
    prompts: list[str],
    max_concurrent: int = 10,
    timeout: float = 30.0,
    model: str = "gpt-4o-mini"
) -> list[AsyncResult]:
    """Concurrent calls with error handling and timeouts."""
    
    import time
    
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def call_with_handling(idx: int, prompt: str) -> AsyncResult:
        start = time.time()
        
        async with semaphore:
            try:
                response = await asyncio.wait_for(
                    client.chat.completions.create(
                        model=model,
                        messages=[{"role": "user", "content": prompt}]
                    ),
                    timeout=timeout
                )
                
                latency = (time.time() - start) * 1000
                
                return AsyncResult(
                    index=idx,
                    content=response.choices[0].message.content,
                    latency_ms=latency,
                    success=True
                )
            
            except asyncio.TimeoutError:
                latency = (time.time() - start) * 1000
                return AsyncResult(
                    index=idx,
                    content="",
                    latency_ms=latency,
                    success=False,
                    error="Timeout"
                )
            
            except Exception as e:
                latency = (time.time() - start) * 1000
                return AsyncResult(
                    index=idx,
                    content="",
                    latency_ms=latency,
                    success=False,
                    error=str(e)
                )
    
    tasks = [
        call_with_handling(i, prompt)
        for i, prompt in enumerate(prompts)
    ]
    
    return await asyncio.gather(*tasks)
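
To see these helpers in action, here is a minimal driver sketch. It assumes the openai package's AsyncOpenAI client; any SDK whose chat.completions.create call is awaitable works the same way.

import asyncio
from openai import AsyncOpenAI  # assumed; any client with an awaitable chat.completions.create works

async def main():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    prompts = [f"Summarize item {i} of a reading list in one sentence." for i in range(20)]

    # Bounded concurrency: at most 5 requests in flight at once
    answers = await concurrent_with_semaphore(client, prompts, max_concurrent=5)
    print(f"Received {len(answers)} answers")

    # Or handle results as they finish, regardless of submission order
    async for idx, text in as_completed_pattern(client, prompts):
        print(f"[{idx}] {text[:60]}")

if __name__ == "__main__":
    asyncio.run(main())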

Rate-Limited Async Client

import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Any

@dataclass
class RateLimitConfig:
    """Rate limit configuration."""
    
    requests_per_minute: int = 60
    tokens_per_minute: int = 100000

class AsyncTokenBucket:
    """Async token bucket rate limiter."""
    
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.last_refill = datetime.now()
        self._lock = asyncio.Lock()
    
    async def acquire(self, tokens: int = 1):
        """Acquire tokens, waiting if necessary."""
        
        async with self._lock:
            await self._refill()
            
            while self.tokens < tokens:
                # Calculate wait time
                needed = tokens - self.tokens
                wait_time = needed / self.refill_rate
                
                await asyncio.sleep(wait_time)
                await self._refill()
            
            self.tokens -= tokens
    
    async def _refill(self):
        """Refill tokens based on elapsed time."""
        
        now = datetime.now()
        elapsed = (now - self.last_refill).total_seconds()
        
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now

class RateLimitedAsyncClient:
    """Async LLM client with rate limiting."""
    
    def __init__(
        self,
        client: Any,
        config: RateLimitConfig | None = None
    ):
        self.client = client
        self.config = config or RateLimitConfig()
        
        # Request rate limiter
        self.request_limiter = AsyncTokenBucket(
            capacity=self.config.requests_per_minute,
            refill_rate=self.config.requests_per_minute / 60.0
        )
        
        # Token rate limiter
        self.token_limiter = AsyncTokenBucket(
            capacity=self.config.tokens_per_minute,
            refill_rate=self.config.tokens_per_minute / 60.0
        )
    
    async def chat_completion(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini",
        max_tokens: int = 1000,
        **kwargs
    ) -> Any:
        """Create chat completion with rate limiting."""
        
        # Estimate input tokens (rough approximation)
        input_tokens = sum(len(m.get("content", "")) // 4 for m in messages)
        total_tokens = input_tokens + max_tokens
        
        # Acquire rate limit tokens
        await self.request_limiter.acquire(1)
        await self.token_limiter.acquire(total_tokens)
        
        # Make request
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            **kwargs
        )
        
        return response

class AdaptiveRateLimiter:
    """Adapt rate based on API responses."""
    
    def __init__(self, initial_rpm: int = 60):
        self.current_rpm = initial_rpm
        self.min_rpm = 10
        self.max_rpm = 500
        self._lock = asyncio.Lock()
        self.consecutive_successes = 0
    
    async def record_success(self):
        """Record successful request."""
        
        async with self._lock:
            self.consecutive_successes += 1
            
            if self.consecutive_successes >= 10:
                self.current_rpm = min(
                    self.max_rpm,
                    int(self.current_rpm * 1.1)
                )
                self.consecutive_successes = 0
    
    async def record_rate_limit(self):
        """Record rate limit error."""
        
        async with self._lock:
            self.current_rpm = max(
                self.min_rpm,
                int(self.current_rpm * 0.5)
            )
            self.consecutive_successes = 0
    
    @property
    def request_interval(self) -> float:
        return 60.0 / self.current_rpm
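
A possible way to wire the wrapper into an application, again assuming AsyncOpenAI: fifty callers run concurrently, but the two token buckets pace the traffic that actually reaches the API.

import asyncio
from openai import AsyncOpenAI  # assumed client

async def main():
    limited = RateLimitedAsyncClient(
        client=AsyncOpenAI(),
        config=RateLimitConfig(requests_per_minute=120, tokens_per_minute=200_000),
    )

    async def ask(question: str) -> str:
        response = await limited.chat_completion(
            messages=[{"role": "user", "content": question}],
            max_tokens=200,
        )
        return response.choices[0].message.content

    # 50 concurrent callers; the request and token buckets throttle as needed
    questions = [f"Give one notable fact about the year {1970 + i}." for i in range(50)]
    answers = await asyncio.gather(*(ask(q) for q in questions))
    print(f"Completed {len(answers)} rate-limited calls")

if __name__ == "__main__":
    asyncio.run(main())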

Async Streaming

import asyncio
from typing import Any, AsyncIterator

async def stream_completion(
    client: Any,
    messages: list[dict],
    model: str = "gpt-4o-mini"
) -> AsyncIterator[str]:
    """Stream completion tokens."""
    
    stream = await client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

async def parallel_streams(
    client: Any,
    prompts: list[str],
    model: str = "gpt-4o-mini"
) -> AsyncIterator[tuple[int, str]]:
    """Stream multiple completions in parallel."""
    
    async def stream_one(idx: int, prompt: str):
        messages = [{"role": "user", "content": prompt}]
        
        async for token in stream_completion(client, messages, model):
            yield idx, token
    
    # Create all streams
    streams = [stream_one(i, p) for i, p in enumerate(prompts)]
    
    # Merge streams
    async def merge_streams():
        tasks = {
            asyncio.create_task(stream.__anext__()): stream
            for stream in streams
        }
        
        while tasks:
            done, _ = await asyncio.wait(
                tasks.keys(),
                return_when=asyncio.FIRST_COMPLETED
            )
            
            for task in done:
                stream = tasks.pop(task)
                
                try:
                    result = task.result()
                    yield result
                    
                    # Schedule next item from this stream
                    tasks[asyncio.create_task(stream.__anext__())] = stream
                
                except StopAsyncIteration:
                    pass
    
    async for item in merge_streams():
        yield item

class AsyncStreamBuffer:
    """Buffer for async stream processing."""
    
    def __init__(self, max_size: int = 100):
        self.queue = asyncio.Queue(maxsize=max_size)
        self.done = False
    
    async def put(self, item: Any):
        """Add item to buffer."""
        await self.queue.put(item)
    
    async def get(self) -> Any:
        """Get item from buffer."""
        return await self.queue.get()
    
    def mark_done(self):
        """Mark stream as complete."""
        self.done = True
    
    async def __aiter__(self):
        """Iterate over buffered items."""
        while not self.done or not self.queue.empty():
            try:
                item = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=0.1
                )
                yield item
            except asyncio.TimeoutError:
                if self.done:
                    break

async def buffered_stream(
    client: Any,
    messages: list[dict],
    buffer_size: int = 100,
    model: str = "gpt-4o-mini"
) -> AsyncIterator[str]:
    """Stream with buffering for backpressure handling."""
    
    buffer = AsyncStreamBuffer(max_size=buffer_size)
    
    async def producer():
        try:
            async for token in stream_completion(client, messages, model):
                await buffer.put(token)
        finally:
            buffer.mark_done()
    
    # Start producer
    producer_task = asyncio.create_task(producer())
    
    # Consume from buffer
    async for token in buffer:
        yield token
    
    await producer_task
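
A minimal consumer sketch for the streaming helpers, assuming AsyncOpenAI: a single stream printed token by token, then two interleaved streams whose tokens arrive tagged with their prompt index.

import asyncio
from openai import AsyncOpenAI  # assumed client

async def main():
    client = AsyncOpenAI()

    # Single stream: print tokens as they arrive
    messages = [{"role": "user", "content": "Write a haiku about event loops."}]
    async for token in stream_completion(client, messages):
        print(token, end="", flush=True)
    print()

    # Interleaved streams: each token carries the index of its prompt
    prompts = ["Define asyncio in one line.", "Define a semaphore in one line."]
    async for idx, token in parallel_streams(client, prompts):
        print(f"[{idx}]{token}", end="", flush=True)
    print()

if __name__ == "__main__":
    asyncio.run(main())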

Async Task Queue

import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable
from datetime import datetime
import uuid

@dataclass
class Task:
    """A task in the queue."""
    
    id: str
    payload: Any
    priority: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    
    def __lt__(self, other):
        return self.priority < other.priority

@dataclass
class TaskResult:
    """Result of task execution."""
    
    task_id: str
    result: Any = None
    error: str | None = None
    latency_ms: float = 0.0

class AsyncTaskQueue:
    """Async task queue with workers."""
    
    def __init__(
        self,
        processor: Callable[[Any], Any],
        num_workers: int = 5,
        max_queue_size: int = 1000
    ):
        self.processor = processor
        self.num_workers = num_workers
        self.queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        self.results: dict[str, asyncio.Future] = {}
        self._workers = []
        self._running = False
    
    async def start(self):
        """Start worker tasks."""
        
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.num_workers)
        ]
    
    async def stop(self):
        """Stop all workers."""
        
        self._running = False
        
        for worker in self._workers:
            worker.cancel()
        
        await asyncio.gather(*self._workers, return_exceptions=True)
    
    async def submit(
        self,
        payload: Any,
        priority: int = 0
    ) -> asyncio.Future:
        """Submit task to queue."""
        
        task_id = str(uuid.uuid4())
        task = Task(id=task_id, payload=payload, priority=priority)
        
        future = asyncio.Future()
        self.results[task_id] = future
        
        await self.queue.put((priority, task))
        
        return future
    
    async def _worker(self, worker_id: int):
        """Worker coroutine."""
        
        while self._running:
            try:
                _, task = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
                
                start = datetime.now()
                
                try:
                    if asyncio.iscoroutinefunction(self.processor):
                        result = await self.processor(task.payload)
                    else:
                        result = self.processor(task.payload)
                    
                    latency = (datetime.now() - start).total_seconds() * 1000
                    
                    task_result = TaskResult(
                        task_id=task.id,
                        result=result,
                        latency_ms=latency
                    )
                
                except Exception as e:
                    latency = (datetime.now() - start).total_seconds() * 1000
                    
                    task_result = TaskResult(
                        task_id=task.id,
                        error=str(e),
                        latency_ms=latency
                    )
                
                # Resolve the future and drop the stored reference so results don't accumulate
                future = self.results.pop(task.id, None)
                if future and not future.done():
                    future.set_result(task_result)
            
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break

class LLMTaskQueue(AsyncTaskQueue):
    """Task queue specialized for LLM calls."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        num_workers: int = 10,
        max_queue_size: int = 1000
    ):
        self.client = client
        self.model = model
        
        super().__init__(
            processor=self._process_llm_task,
            num_workers=num_workers,
            max_queue_size=max_queue_size
        )
    
    async def _process_llm_task(self, payload: dict) -> str:
        """Process LLM task."""
        
        messages = payload.get("messages", [])
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            **payload.get("kwargs", {})
        )
        
        return response.choices[0].message.content
    
    async def submit_prompt(
        self,
        prompt: str,
        priority: int = 0,
        **kwargs
    ) -> asyncio.Future:
        """Submit a prompt for processing."""
        
        payload = {
            "messages": [{"role": "user", "content": prompt}],
            "kwargs": kwargs
        }
        
        return await self.submit(payload, priority)
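
Driving the queue end to end looks roughly like this, again assuming AsyncOpenAI: start the workers, submit prompts with different priorities, gather the resulting futures, then stop the workers.

import asyncio
from openai import AsyncOpenAI  # assumed client

async def main():
    queue = LLMTaskQueue(client=AsyncOpenAI(), num_workers=5)
    await queue.start()
    try:
        # Lower priority values are dequeued first (PriorityQueue ordering)
        futures = [
            await queue.submit_prompt(f"One-sentence summary of topic {i}", priority=i % 3)
            for i in range(12)
        ]
        results = await asyncio.gather(*futures)
        for r in results:
            status = "ok" if r.error is None else f"error: {r.error}"
            print(f"{r.task_id[:8]} {status} ({r.latency_ms:.0f} ms)")
    finally:
        await queue.stop()

if __name__ == "__main__":
    asyncio.run(main())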

Production Async Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
import uuid

app = FastAPI()

# Initialize components
rate_limited_client = None  # Initialize with actual client
task_queue = None

class BatchRequest(BaseModel):
    prompts: list[str]
    model: str = "gpt-4o-mini"
    max_concurrent: int = 10

class StreamRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o-mini"

class QueueRequest(BaseModel):
    prompt: str
    priority: int = 0

# In-memory job storage
jobs: dict[str, dict] = {}

@app.post("/v1/batch")
async def batch_completion(request: BatchRequest):
    """Execute batch of completions concurrently."""
    
    results = await robust_concurrent_calls(
        client=rate_limited_client.client,
        prompts=request.prompts,
        max_concurrent=request.max_concurrent,
        model=request.model
    )
    
    return {
        "results": [
            {
                "index": r.index,
                "content": r.content,
                "success": r.success,
                "error": r.error,
                "latency_ms": r.latency_ms
            }
            for r in results
        ],
        "summary": {
            "total": len(results),
            "successful": sum(1 for r in results if r.success),
            "failed": sum(1 for r in results if not r.success)
        }
    }

@app.post("/v1/stream")
async def stream_completion_endpoint(request: StreamRequest):
    """Stream completion response."""
    
    from fastapi.responses import StreamingResponse
    
    async def generate():
        messages = [{"role": "user", "content": request.prompt}]
        
        async for token in stream_completion(
            rate_limited_client.client,
            messages,
            request.model
        ):
            yield f"data: {token}\n\n"
        
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

@app.post("/v1/queue/submit")
async def submit_to_queue(request: QueueRequest):
    """Submit task to queue."""
    
    future = await task_queue.submit_prompt(
        request.prompt,
        priority=request.priority
    )
    
    # Store job reference
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"future": future, "status": "pending"}
    
    return {"job_id": job_id}

@app.get("/v1/queue/{job_id}")
async def get_queue_result(job_id: str):
    """Get result from queue."""
    
    if job_id not in jobs:
        raise HTTPException(404, "Job not found")
    
    job = jobs[job_id]
    future = job["future"]
    
    if not future.done():
        return {"status": "pending"}
    
    result = future.result()
    
    return {
        "status": "completed",
        "result": result.result,
        "error": result.error,
        "latency_ms": result.latency_ms
    }

@app.get("/v1/stats")
async def get_stats():
    """Get async processing stats."""
    
    return {
        "queue_size": task_queue.queue.qsize() if task_queue else 0,
        "pending_jobs": sum(1 for j in jobs.values() if not j["future"].done()),
        "completed_jobs": sum(1 for j in jobs.values() if j["future"].done())
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Async patterns are essential for high-throughput LLM applications. Use asyncio.gather for simple concurrent execution when you need all results together. Semaphores control concurrency to respect rate limits and prevent overwhelming the API. asyncio.as_completed enables processing results as they arrive—useful for streaming progress to users. Token bucket rate limiters provide smooth, adaptive rate control that maximizes throughput while avoiding 429 errors. Async streaming enables real-time token delivery to users while processing continues. Task queues with worker pools handle variable load and provide backpressure when the system is overwhelmed. Design your async architecture around your specific latency and throughput requirements—the patterns here provide building blocks for production-ready systems.

