
Getting Started with Azure CLI 2.0

In the older days we used to manage Azure resources through the AzureRM PowerShell modules. That gave Azure administrators and developers plenty of flexibility to run automated deployments against Azure Resource Manager resources.

The Azure CLI is the next, improved version, with simplified commands that make life easier, and it is cross-platform.

You can use Azure CLI in two ways:

  1. Azure Portal – through Azure Cloud Shell
  2. Local installation – run from your command prompt or PowerShell

Installation Steps:

  • Download the Azure CLI installer for Linux, Windows, or macOS, based on your operating system.
  • Run the installer and follow the steps.

 


  • Verify the installation by executing:

az --version

Running the Azure CLI from PowerShell has some advantages over running it from the Windows command prompt, such as additional tab-completion features.

Now let us try logging in to Azure using the Azure CLI. There are various ways to log in; for this article I will use the simple web login with the az login command.

Execute the following command to log in to Azure:

az login

The Azure CLI will launch your default browser and open the Azure sign-in page. After a successful sign-in, you’ll be connected to your Azure subscription. If that fails, follow the command-line instructions and enter the authorization code at https://aka.ms/devicelogin.
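
If a browser is not available (for example, when you are connected over SSH), you can request the device-code flow explicitly. The flag below is a standard Azure CLI option; the snippet is only an illustration of that alternative:

az login --use-device-code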

Create an Azure resource group and verify it:

az group create --name "thingx-dev" --location "southcentralus"
az group list --output table
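
To inspect or clean up the resource group after experimenting, the following commands are a reasonable next step (the group name is just the example used above):

az group show --name "thingx-dev" --output table
az group delete --name "thingx-dev" --yes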

 

Hope that helps you get started with the Azure CLI. To learn more about Azure CLI commands, see: https://github.com/Azure/azure-cli

NDepend–VSTS/Azure DevOps Integration–Part 01

In my previous article I wrote an introduction to NDepend and how it is useful for an Agile team to ensure code quality.

In that article we saw how to use NDepend on a developer machine. In this article we will familiarize ourselves with using NDepend in the build automation pipeline on your VSTS/Azure DevOps build agent.

There are two types of integration possible for NDepend:

  1. Directly using the NDepend package extension from the VSTS Marketplace
  2. Manual integration using the NDepend command-line tool. (This gives you more control over licensing, by setting up the license on your own on-premises VSTS build agent.)

For the purposes of this article I will cover using the VSTS package extension and the NDepend build task in a VSTS build pipeline.

Installation of the NDepend Extension for VSTS/Azure DevOps:

1.) Go to the Azure DevOps Marketplace: https://marketplace.visualstudio.com/items?itemName=ndepend.ndependextension


2.) Click Get to install this extension into your Azure DevOps account and follow the steps. For demo purposes I am starting with the 30-day free trial; otherwise you can go ahead and buy the full license.


3.) Now when you go back to your Azure DevOps project, you can see the NDepend side menu enabled; this is where you will see the report summary of your project.


Integrating NDepend into the Azure DevOps Pipeline:

1.) Select the “NDepend Task” and add it to the pipeline.


Note:

  • You can choose to stop the build when at least one quality gate fails.
  • You also need to specify an NDepend project file customized for your project; otherwise NDepend will use its default project file configuration. Having your own NDepend project file gives you more control over the policies applied during the scan.

Queue a new build and wait for it to complete. You can now see that the build artifacts include all the NDepend report files.


Now go back to the NDepend menu on the left side and open the Summary tab. This gives you a detailed view of the technical debt in your project.


In the next article I will cover the manual integration steps.

New Microsoft Azure Certifications

Microsoft has recently announced new certification exam tracks for Azure Administrators, Developers and Architects. Here are the lineups that should help you move your career forward with the right certifications.

The three new Microsoft Azure Certifications are:

  • Microsoft Certified Azure Developer
  • Microsoft Certified Azure Administrator
  • Microsoft Certified Azure Architect

These certifications essentially split the previous MCSA/MCSE: Cloud Platform and Infrastructure track and introduce new exams for each individual certification track.

So far I have only limited information about the exam numbers for each individual track, as Microsoft has only recently made beta exams available for the Microsoft Certified Azure Administrator track.

These exams are still in beta and will reach general availability in the coming months. I will keep you posted about newer exams for the other tracks as we learn more.

References: https://www.microsoft.com/en-us/learning/exam-list.aspx 

Async LLM Patterns: Building High-Throughput AI Applications

Introduction: LLM APIs are inherently slow—even fast models take hundreds of milliseconds per request. When you need to process multiple prompts, make parallel API calls, or handle high-throughput workloads, synchronous code becomes a bottleneck. Async patterns let you overlap I/O wait times, dramatically improving throughput without adding complexity. This guide covers practical async patterns for LLM applications: concurrent request handling, batching strategies, streaming with async generators, retry logic with exponential backoff, and production-ready patterns for building responsive AI applications. Whether you’re building a chatbot handling multiple users, a batch processing pipeline, or a real-time agent, these patterns will help you maximize throughput while keeping your code maintainable.

Figure: Async LLM processing (request queue, batch processor, parallel execution)

Basic Async LLM Client

import asyncio
import aiohttp
from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator
from abc import ABC, abstractmethod
import time

@dataclass
class LLMResponse:
    """LLM response."""
    
    content: str
    model: str
    usage: dict = field(default_factory=dict)
    latency_ms: float = 0
    
@dataclass
class LLMConfig:
    """LLM configuration."""
    
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 1000
    timeout: float = 60.0

class AsyncLLMClient(ABC):
    """Abstract async LLM client."""
    
    @abstractmethod
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        pass
    
    @abstractmethod
    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        pass

class AsyncOpenAIClient(AsyncLLMClient):
    """Async OpenAI client."""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create session."""
        
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )
        return self._session
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        
        config = config or LLMConfig()
        session = await self._get_session()
        
        start = time.time()
        
        async with session.post(
            f"{self.base_url}/chat/completions",
            json={
                "model": config.model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": config.temperature,
                "max_tokens": config.max_tokens
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            data = await response.json()
            
            latency = (time.time() - start) * 1000
            
            return LLMResponse(
                content=data["choices"][0]["message"]["content"],
                model=config.model,
                usage=data.get("usage", {}),
                latency_ms=latency
            )
    
    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        
        config = config or LLMConfig()
        session = await self._get_session()
        
        async with session.post(
            f"{self.base_url}/chat/completions",
            json={
                "model": config.model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": config.temperature,
                "max_tokens": config.max_tokens,
                "stream": True
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            async for line in response.content:
                line = line.decode().strip()
                
                if line.startswith("data: "):
                    data = line[6:]
                    
                    if data == "[DONE]":
                        break
                    
                    import json
                    chunk = json.loads(data)
                    
                    if chunk["choices"][0].get("delta", {}).get("content"):
                        yield chunk["choices"][0]["delta"]["content"]
    
    async def close(self):
        """Close session."""
        
        if self._session and not self._session.closed:
            await self._session.close()

class AsyncAnthropicClient(AsyncLLMClient):
    """Async Anthropic client."""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1"
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create session."""
        
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "x-api-key": self.api_key,
                    "Content-Type": "application/json",
                    "anthropic-version": "2023-06-01"
                }
            )
        return self._session
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Generate completion."""
        
        config = config or LLMConfig()
        session = await self._get_session()
        
        # Map model names
        model = config.model
        if model.startswith("gpt"):
            model = "claude-3-sonnet-20240229"
        
        start = time.time()
        
        async with session.post(
            f"{self.base_url}/messages",
            json={
                "model": model,
                "max_tokens": config.max_tokens,
                "messages": [{"role": "user", "content": prompt}]
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            data = await response.json()
            
            latency = (time.time() - start) * 1000
            
            return LLMResponse(
                content=data["content"][0]["text"],
                model=model,
                usage=data.get("usage", {}),
                latency_ms=latency
            )
    
    async def stream(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream completion."""
        
        config = config or LLMConfig()
        session = await self._get_session()
        
        model = config.model
        if model.startswith("gpt"):
            model = "claude-3-sonnet-20240229"
        
        async with session.post(
            f"{self.base_url}/messages",
            json={
                "model": model,
                "max_tokens": config.max_tokens,
                "messages": [{"role": "user", "content": prompt}],
                "stream": True
            },
            timeout=aiohttp.ClientTimeout(total=config.timeout)
        ) as response:
            async for line in response.content:
                line = line.decode().strip()
                
                if line.startswith("data: "):
                    import json
                    data = json.loads(line[6:])
                    
                    if data["type"] == "content_block_delta":
                        yield data["delta"]["text"]
    
    async def close(self):
        """Close session."""
        
        if self._session and not self._session.closed:
            await self._session.close()
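
A minimal usage sketch for these clients follows, assuming the classes defined above. The demo_basic_client helper is just an illustrative name, and the API key handling (an OPENAI_API_KEY environment variable) and prompts are assumptions for the example:

import os

async def demo_basic_client():
    """Illustrative helper: one completion and one streamed completion."""
    client = AsyncOpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
    try:
        # Single completion with the default LLMConfig
        response = await client.complete("Say hello in one sentence.")
        print(f"{response.content} ({response.latency_ms:.0f} ms)")
        
        # Streamed completion, printed chunk by chunk
        async for chunk in client.stream("Count from 1 to 5."):
            print(chunk, end="", flush=True)
        print()
    finally:
        await client.close()

# asyncio.run(demo_basic_client())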

Concurrent Request Patterns

import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class ConcurrentResult:
    """Result from concurrent execution."""
    
    results: list[LLMResponse]
    total_time_ms: float
    avg_latency_ms: float
    success_count: int
    error_count: int

class ConcurrentExecutor:
    """Execute multiple LLM requests concurrently."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.semaphore = asyncio.Semaphore(max_concurrency)
    
    async def _execute_one(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> tuple[Optional[LLMResponse], Optional[Exception]]:
        """Execute single request with semaphore."""
        
        async with self.semaphore:
            try:
                result = await self.client.complete(prompt, config)
                return result, None
            except Exception as e:
                return None, e
    
    async def execute_all(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> ConcurrentResult:
        """Execute all prompts concurrently."""
        
        start = time.time()
        
        tasks = [
            self._execute_one(prompt, config)
            for prompt in prompts
        ]
        
        results = await asyncio.gather(*tasks)
        
        total_time = (time.time() - start) * 1000
        
        successful = [r for r, e in results if r is not None]
        errors = [e for r, e in results if e is not None]
        
        avg_latency = (
            sum(r.latency_ms for r in successful) / len(successful)
            if successful else 0
        )
        
        return ConcurrentResult(
            results=successful,
            total_time_ms=total_time,
            avg_latency_ms=avg_latency,
            success_count=len(successful),
            error_count=len(errors)
        )
    
    async def execute_with_progress(
        self,
        prompts: list[str],
        config: LLMConfig = None,
        progress_callback: callable = None
    ) -> ConcurrentResult:
        """Execute with progress tracking."""
        
        start = time.time()
        results = []
        errors = []
        completed = 0
        
        async def execute_with_tracking(prompt: str, index: int):
            nonlocal completed
            
            result, error = await self._execute_one(prompt, config)
            
            completed += 1
            
            if progress_callback:
                await progress_callback(completed, len(prompts), result, error)
            
            return result, error
        
        tasks = [
            execute_with_tracking(prompt, i)
            for i, prompt in enumerate(prompts)
        ]
        
        task_results = await asyncio.gather(*tasks)
        
        total_time = (time.time() - start) * 1000
        
        successful = [r for r, e in task_results if r is not None]
        
        avg_latency = (
            sum(r.latency_ms for r in successful) / len(successful)
            if successful else 0
        )
        
        return ConcurrentResult(
            results=successful,
            total_time_ms=total_time,
            avg_latency_ms=avg_latency,
            success_count=len(successful),
            error_count=len(task_results) - len(successful)
        )

class MapReduceExecutor:
    """Map-reduce pattern for LLM processing."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.executor = ConcurrentExecutor(client, max_concurrency)
    
    async def map_reduce(
        self,
        items: list[str],
        map_prompt_template: str,
        reduce_prompt_template: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Map items through LLM, then reduce results."""
        
        # Map phase
        map_prompts = [
            map_prompt_template.format(item=item)
            for item in items
        ]
        
        map_results = await self.executor.execute_all(map_prompts, config)
        
        # Reduce phase
        mapped_outputs = "\n\n".join(
            f"Item {i+1}:\n{r.content}"
            for i, r in enumerate(map_results.results)
        )
        
        reduce_prompt = reduce_prompt_template.format(
            mapped_outputs=mapped_outputs
        )
        
        return await self.client.complete(reduce_prompt, config)
    
    async def parallel_chains(
        self,
        input_data: str,
        chain_prompts: list[list[str]],
        config: LLMConfig = None
    ) -> list[LLMResponse]:
        """Execute multiple prompt chains in parallel."""
        
        async def execute_chain(prompts: list[str]) -> LLMResponse:
            """Execute a single chain sequentially."""
            
            context = input_data
            result = None
            
            for prompt_template in prompts:
                prompt = prompt_template.format(context=context)
                result = await self.client.complete(prompt, config)
                context = result.content
            
            return result
        
        tasks = [execute_chain(chain) for chain in chain_prompts]
        return await asyncio.gather(*tasks)
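
A small usage sketch for the concurrent executor, assuming the classes above; the demo_concurrent helper name, the prompts, and the concurrency limit are arbitrary examples:

async def demo_concurrent(client: AsyncLLMClient):
    """Illustrative helper: fan out several prompts and report aggregate timing."""
    executor = ConcurrentExecutor(client, max_concurrency=5)
    
    prompts = [f"Summarize the number {i} in one sentence." for i in range(10)]
    result = await executor.execute_all(prompts)
    
    print(f"{result.success_count} succeeded, {result.error_count} failed")
    print(f"total: {result.total_time_ms:.0f} ms, avg latency: {result.avg_latency_ms:.0f} ms")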

Batching and Queuing

import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Optional
from collections import deque

@dataclass
class BatchRequest:
    """Request in batch queue."""
    
    prompt: str
    config: LLMConfig
    future: asyncio.Future
    created_at: float = field(default_factory=time.time)

class BatchProcessor:
    """Process requests in batches."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        batch_size: int = 10,
        batch_timeout: float = 0.1,
        max_queue_size: int = 1000
    ):
        self.client = client
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.max_queue_size = max_queue_size
        
        self.queue: deque[BatchRequest] = deque()
        self._running = False
        self._processor_task: Optional[asyncio.Task] = None
    
    async def start(self):
        """Start batch processor."""
        
        self._running = True
        self._processor_task = asyncio.create_task(self._process_loop())
    
    async def stop(self):
        """Stop batch processor."""
        
        self._running = False
        
        if self._processor_task:
            self._processor_task.cancel()
            try:
                await self._processor_task
            except asyncio.CancelledError:
                pass
    
    async def submit(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Submit request to batch queue."""
        
        if len(self.queue) >= self.max_queue_size:
            raise RuntimeError("Queue full")
        
        future = asyncio.get_running_loop().create_future()
        
        request = BatchRequest(
            prompt=prompt,
            config=config or LLMConfig(),
            future=future
        )
        
        self.queue.append(request)
        
        return await future
    
    async def _process_loop(self):
        """Main processing loop."""
        
        while self._running:
            batch = await self._collect_batch()
            
            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(0.01)
    
    async def _collect_batch(self) -> list[BatchRequest]:
        """Collect batch of requests."""
        
        batch = []
        deadline = time.time() + self.batch_timeout
        
        while len(batch) < self.batch_size:
            if self.queue:
                batch.append(self.queue.popleft())
            elif batch:
                # Have some items, check timeout
                if time.time() >= deadline:
                    break
                await asyncio.sleep(0.01)
            else:
                # No items, wait a bit
                await asyncio.sleep(0.01)
                break
        
        return batch
    
    async def _process_batch(self, batch: list[BatchRequest]):
        """Process batch of requests."""
        
        # Execute all requests concurrently
        tasks = [
            self.client.complete(req.prompt, req.config)
            for req in batch
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Resolve futures
        for request, result in zip(batch, results):
            if isinstance(result, Exception):
                request.future.set_exception(result)
            else:
                request.future.set_result(result)

class PriorityQueue:
    """Priority queue for LLM requests."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        max_concurrency: int = 10
    ):
        self.client = client
        self.semaphore = asyncio.Semaphore(max_concurrency)
        
        # Priority queues (0 = highest)
        self.queues: dict[int, deque] = {
            0: deque(),  # Critical
            1: deque(),  # High
            2: deque(),  # Normal
            3: deque()   # Low
        }
        
        self._running = False
    
    async def submit(
        self,
        prompt: str,
        priority: int = 2,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Submit request with priority."""
        
        future = asyncio.get_running_loop().create_future()
        
        request = BatchRequest(
            prompt=prompt,
            config=config or LLMConfig(),
            future=future
        )
        
        self.queues[priority].append(request)
        
        return await future
    
    async def start(self):
        """Start processing."""
        
        self._running = True
        asyncio.create_task(self._process_loop())
    
    async def _process_loop(self):
        """Process requests by priority."""
        
        while self._running:
            request = self._get_next_request()
            
            if request:
                asyncio.create_task(self._process_request(request))
            else:
                await asyncio.sleep(0.01)
    
    def _get_next_request(self) -> Optional[BatchRequest]:
        """Get highest priority request."""
        
        for priority in sorted(self.queues.keys()):
            if self.queues[priority]:
                return self.queues[priority].popleft()
        return None
    
    async def _process_request(self, request: BatchRequest):
        """Process single request."""
        
        async with self.semaphore:
            try:
                result = await self.client.complete(
                    request.prompt,
                    request.config
                )
                request.future.set_result(result)
            except Exception as e:
                request.future.set_exception(e)
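
As a rough sketch, using the batch processor looks like this; the demo_batching helper name, batch size, timeout, and prompts are illustrative values:

async def demo_batching(client: AsyncLLMClient):
    """Illustrative helper: submit requests through the batch queue and await results."""
    processor = BatchProcessor(client, batch_size=5, batch_timeout=0.05)
    await processor.start()
    
    try:
        # Each submit() resolves once its batch has been processed
        responses = await asyncio.gather(*[
            processor.submit(f"Give me one fact about the number {i}.")
            for i in range(12)
        ])
        for response in responses:
            print(response.content[:60])
    finally:
        await processor.stop()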

Retry and Error Handling

import asyncio
import aiohttp
import random
import time
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class RetryStrategy(Enum):
    """Retry strategies."""
    
    EXPONENTIAL = "exponential"
    LINEAR = "linear"
    CONSTANT = "constant"

@dataclass
class RetryConfig:
    """Retry configuration."""
    
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    jitter: bool = True
    retryable_errors: tuple = (
        aiohttp.ClientError,
        asyncio.TimeoutError
    )

class RetryableClient:
    """LLM client with retry logic."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        config: RetryConfig = None
    ):
        self.client = client
        self.config = config or RetryConfig()
    
    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay for attempt."""
        
        if self.config.strategy == RetryStrategy.EXPONENTIAL:
            delay = self.config.base_delay * (2 ** attempt)
        elif self.config.strategy == RetryStrategy.LINEAR:
            delay = self.config.base_delay * (attempt + 1)
        else:
            delay = self.config.base_delay
        
        delay = min(delay, self.config.max_delay)
        
        if self.config.jitter:
            delay = delay * (0.5 + random.random())
        
        return delay
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Complete with retries."""
        
        last_error = None
        
        for attempt in range(self.config.max_retries + 1):
            try:
                return await self.client.complete(prompt, config)
            
            except self.config.retryable_errors as e:
                last_error = e
                
                if attempt < self.config.max_retries:
                    delay = self._calculate_delay(attempt)
                    await asyncio.sleep(delay)
            
            except Exception as e:
                # Non-retryable error
                raise
        
        raise last_error

class CircuitBreaker:
    """Circuit breaker for LLM calls."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 3
    ):
        self.client = client
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        
        self.failures = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half_open
        self._lock = asyncio.Lock()
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Complete with circuit breaker."""
        
        async with self._lock:
            await self._check_state()
        
        if self.state == "open":
            raise RuntimeError("Circuit breaker is open")
        
        try:
            result = await self.client.complete(prompt, config)
            
            async with self._lock:
                self._on_success()
            
            return result
        
        except Exception as e:
            async with self._lock:
                self._on_failure()
            raise
    
    async def _check_state(self):
        """Check and update circuit state."""
        
        if self.state == "open":
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = "half_open"
                self.failures = 0
    
    def _on_success(self):
        """Handle successful call."""
        
        if self.state == "half_open":
            self.failures = 0
            self.state = "closed"
    
    def _on_failure(self):
        """Handle failed call."""
        
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.failures >= self.failure_threshold:
            self.state = "open"

class FallbackClient:
    """Client with fallback providers."""
    
    def __init__(self, clients: list[AsyncLLMClient]):
        self.clients = clients
    
    async def complete(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> LLMResponse:
        """Try clients in order until one succeeds."""
        
        errors = []
        
        for client in self.clients:
            try:
                return await client.complete(prompt, config)
            except Exception as e:
                errors.append(e)
        
        raise RuntimeError(f"All providers failed: {errors}")

Streaming Patterns

import asyncio
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator

class StreamingAggregator:
    """Aggregate multiple streams."""
    
    def __init__(self, client: AsyncLLMClient):
        self.client = client
    
    async def stream_first(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> AsyncIterator[tuple[int, str]]:
        """Stream from first responding prompt."""
        
        queues = [asyncio.Queue() for _ in prompts]
        done_event = asyncio.Event()
        
        async def stream_to_queue(index: int, prompt: str):
            try:
                async for chunk in self.client.stream(prompt, config):
                    if done_event.is_set():
                        return
                    await queues[index].put((index, chunk))
                await queues[index].put((index, None))  # Signal done
            except Exception as e:
                await queues[index].put((index, e))
        
        # Start all streams
        tasks = [
            asyncio.create_task(stream_to_queue(i, p))
            for i, p in enumerate(prompts)
        ]
        
        # Yield from first responding stream
        first_index = None
        
        while True:
            # Check all queues
            for i, queue in enumerate(queues):
                if not queue.empty():
                    index, item = await queue.get()
                    
                    if first_index is None:
                        first_index = index
                    
                    if index == first_index:
                        if item is None:
                            done_event.set()
                            return
                        if isinstance(item, Exception):
                            done_event.set()
                            raise item
                        yield index, item
            
            await asyncio.sleep(0.01)
    
    async def stream_merge(
        self,
        prompts: list[str],
        config: LLMConfig = None
    ) -> AsyncIterator[tuple[int, str]]:
        """Merge all streams interleaved."""
        
        queue = asyncio.Queue()
        active_streams = len(prompts)
        
        async def stream_to_queue(index: int, prompt: str):
            nonlocal active_streams
            
            try:
                async for chunk in self.client.stream(prompt, config):
                    await queue.put((index, chunk))
            except Exception as e:
                await queue.put((index, e))
            finally:
                active_streams -= 1
                if active_streams == 0:
                    await queue.put(None)  # Signal all done
        
        # Start all streams
        for i, prompt in enumerate(prompts):
            asyncio.create_task(stream_to_queue(i, prompt))
        
        # Yield merged results
        while True:
            item = await queue.get()
            
            if item is None:
                return
            
            index, chunk = item
            
            if isinstance(chunk, Exception):
                continue  # Skip errors, continue with others
            
            yield index, chunk

class StreamBuffer:
    """Buffer streaming output."""
    
    def __init__(
        self,
        client: AsyncLLMClient,
        buffer_size: int = 10
    ):
        self.client = client
        self.buffer_size = buffer_size
    
    async def stream_buffered(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream with buffering."""
        
        buffer = []
        
        async for chunk in self.client.stream(prompt, config):
            buffer.append(chunk)
            
            if len(buffer) >= self.buffer_size:
                yield "".join(buffer)
                buffer = []
        
        if buffer:
            yield "".join(buffer)
    
    async def stream_sentences(
        self,
        prompt: str,
        config: LLMConfig = None
    ) -> AsyncIterator[str]:
        """Stream complete sentences."""
        
        import re
        
        buffer = ""
        sentence_pattern = re.compile(r'[.!?]+\s*')
        
        async for chunk in self.client.stream(prompt, config):
            buffer += chunk
            
            # Find complete sentences
            while True:
                match = sentence_pattern.search(buffer)
                
                if match:
                    sentence = buffer[:match.end()]
                    buffer = buffer[match.end():]
                    yield sentence
                else:
                    break
        
        # Yield remaining
        if buffer.strip():
            yield buffer
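
A quick sketch of streaming output sentence by sentence with the buffer helper above; the demo_sentence_stream name and the prompt are just examples:

async def demo_sentence_stream(client: AsyncLLMClient):
    """Illustrative helper: print one complete sentence at a time instead of token by token."""
    buffered = StreamBuffer(client)
    
    async for sentence in buffered.stream_sentences(
        "Explain asyncio event loops in three short sentences."
    ):
        print(sentence.strip())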

Production Async Service

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import json

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 1000
    stream: bool = False
    priority: int = 2

class BatchCompletionRequest(BaseModel):
    prompts: list[str]
    model: str = "gpt-4"
    max_concurrency: int = 10

# Initialize clients
openai_client = AsyncOpenAIClient(api_key="your-key")
anthropic_client = AsyncAnthropicClient(api_key="your-key")

# Fallback client
fallback_client = FallbackClient([openai_client, anthropic_client])

# Retryable client
retry_client = RetryableClient(fallback_client)

# Circuit breaker
circuit_breaker = CircuitBreaker(retry_client)

# Batch processor
batch_processor = BatchProcessor(circuit_breaker)

# Concurrent executor
executor = ConcurrentExecutor(circuit_breaker)

@app.on_event("startup")
async def startup():
    await batch_processor.start()

@app.on_event("shutdown")
async def shutdown():
    await batch_processor.stop()
    await openai_client.close()
    await anthropic_client.close()

@app.post("/v1/completions")
async def create_completion(request: CompletionRequest):
    """Create completion."""
    
    config = LLMConfig(
        model=request.model,
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )
    
    if request.stream:
        async def generate():
            # Note: the retry/circuit-breaker wrappers only wrap complete(),
            # so stream directly from the underlying provider client.
            async for chunk in openai_client.stream(
                request.prompt,
                config
            ):
                yield f"data: {json.dumps({'content': chunk})}\n\n"
            yield "data: [DONE]\n\n"
        
        return StreamingResponse(
            generate(),
            media_type="text/event-stream"
        )
    
    result = await circuit_breaker.complete(request.prompt, config)
    
    return {
        "content": result.content,
        "model": result.model,
        "usage": result.usage,
        "latency_ms": result.latency_ms
    }

@app.post("/v1/completions/batch")
async def create_batch_completion(request: BatchCompletionRequest):
    """Create batch completions."""
    
    config = LLMConfig(model=request.model)
    
    local_executor = ConcurrentExecutor(
        circuit_breaker,
        max_concurrency=request.max_concurrency
    )
    
    result = await local_executor.execute_all(request.prompts, config)
    
    return {
        "results": [
            {
                "content": r.content,
                "latency_ms": r.latency_ms
            }
            for r in result.results
        ],
        "total_time_ms": result.total_time_ms,
        "avg_latency_ms": result.avg_latency_ms,
        "success_count": result.success_count,
        "error_count": result.error_count
    }

@app.get("/v1/health")
async def health():
    """Health check."""
    
    return {
        "status": "healthy",
        "circuit_breaker_state": circuit_breaker.state,
        "queue_size": len(batch_processor.queue)
    }

@app.get("/v1/metrics")
async def metrics():
    """Get metrics."""
    
    return {
        "circuit_breaker": {
            "state": circuit_breaker.state,
            "failures": circuit_breaker.failures
        },
        "batch_processor": {
            "queue_size": len(batch_processor.queue)
        }
    }
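
To exercise the service locally, you could run it with uvicorn and call the batch endpoint. The call_batch_endpoint helper, module name, host, and port below are assumptions for illustration only:

import aiohttp

async def call_batch_endpoint():
    """Illustrative helper: call the batch endpoint of a locally running instance (e.g. uvicorn main:app)."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://localhost:8000/v1/completions/batch",
            json={"prompts": ["Hello!", "What is asyncio?"], "max_concurrency": 5}
        ) as response:
            print(await response.json())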


Conclusion

Async patterns are essential for building responsive, high-throughput LLM applications. Start with basic async clients using aiohttp or httpx—the performance gains from overlapping I/O are immediate. Use semaphores to control concurrency and prevent overwhelming API rate limits. Implement retry logic with exponential backoff for transient failures, and circuit breakers to fail fast when providers are down. For high-volume workloads, batch requests to amortize overhead and use priority queues to ensure critical requests get processed first. Streaming responses improve perceived latency—users see output immediately rather than waiting for complete responses. The fallback pattern across multiple providers improves reliability, though watch for subtle differences in model behavior. Monitor queue depths, latency percentiles, and error rates to tune concurrency limits. The key insight is that async isn’t just about performance—it’s about building resilient systems that handle failures gracefully and scale with demand. These patterns form the foundation for production LLM services that can handle thousands of concurrent users while maintaining responsiveness.