
LLM Response Streaming: Building Real-Time AI Experiences

Introduction: Streaming LLM responses transforms the user experience from waiting for a complete response to seeing text appear in real time, dramatically improving perceived latency. Instead of staring at a loading spinner for 5-10 seconds, users see the first tokens within a few hundred milliseconds and can start reading while generation continues. But implementing streaming properly involves more than just enabling the stream parameter—you need to handle connection management, error recovery, token buffering, and client-side rendering. Server-Sent Events (SSE) and WebSockets each have tradeoffs for different use cases. This guide covers practical patterns for implementing LLM response streaming: from basic SSE endpoints to buffering strategies and production-ready streaming services that handle errors gracefully and provide excellent user experiences.

Figure: the LLM response streaming pipeline (generate, buffer, deliver).

Basic Streaming Implementation
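
Before adding structure around events and transports, it helps to see the raw loop that everything below wraps. A minimal sketch, assuming the openai>=1.0 SDK and an OPENAI_API_KEY set in the environment:

from openai import OpenAI

client = OpenAI()

# With stream=True the API returns chunks as they are generated,
# instead of a single complete response
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Explain streaming in one paragraph."}],
    stream=True,
)

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

The classes below wrap this loop in a provider-agnostic event model.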

from dataclasses import dataclass, field
from typing import Optional, AsyncIterator
from enum import Enum
import json

class StreamEventType(Enum):
    """Types of stream events."""
    
    TOKEN = "token"
    CHUNK = "chunk"
    DONE = "done"
    ERROR = "error"
    METADATA = "metadata"

@dataclass
class StreamEvent:
    """A streaming event."""
    
    event_type: StreamEventType
    data: str
    metadata: dict = field(default_factory=dict)
    
    def to_sse(self) -> str:
        """Convert to SSE format."""
        
        event_data = {
            "type": self.event_type.value,
            "data": self.data,
            "metadata": self.metadata
        }
        
        return f"data: {json.dumps(event_data)}\n\n"
    
    def to_json(self) -> str:
        """Convert to JSON."""
        
        return json.dumps({
            "type": self.event_type.value,
            "data": self.data,
            "metadata": self.metadata
        })

class OpenAIStreamer:
    """Stream responses from OpenAI."""
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
    
    async def stream(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini",
        **kwargs
    ) -> AsyncIterator[StreamEvent]:
        """Stream completion from OpenAI."""
        
        import openai
        
        client = openai.AsyncOpenAI(api_key=self.api_key)
        
        try:
            stream = await client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True,
                **kwargs
            )
            
            async for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content:
                    yield StreamEvent(
                        event_type=StreamEventType.TOKEN,
                        data=chunk.choices[0].delta.content
                    )
            
            yield StreamEvent(
                event_type=StreamEventType.DONE,
                data=""
            )
            
        except Exception as e:
            yield StreamEvent(
                event_type=StreamEventType.ERROR,
                data=str(e)
            )

class AnthropicStreamer:
    """Stream responses from Anthropic."""
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
    
    async def stream(
        self,
        messages: list[dict],
        model: str = "claude-3-haiku-20240307",
        **kwargs
    ) -> AsyncIterator[StreamEvent]:
        """Stream completion from Anthropic."""
        
        import anthropic
        
        client = anthropic.AsyncAnthropic(api_key=self.api_key)
        
        try:
            async with client.messages.stream(
                model=model,
                messages=messages,
                max_tokens=kwargs.get("max_tokens", 1024)
            ) as stream:
                async for text in stream.text_stream:
                    yield StreamEvent(
                        event_type=StreamEventType.TOKEN,
                        data=text
                    )
            
            yield StreamEvent(
                event_type=StreamEventType.DONE,
                data=""
            )
            
        except Exception as e:
            yield StreamEvent(
                event_type=StreamEventType.ERROR,
                data=str(e)
            )

class UnifiedStreamer:
    """Unified streaming interface."""
    
    def __init__(self):
        self.streamers = {
            "openai": OpenAIStreamer(),
            "anthropic": AnthropicStreamer()
        }
    
    async def stream(
        self,
        provider: str,
        messages: list[dict],
        model: str,
        **kwargs
    ) -> AsyncIterator[StreamEvent]:
        """Stream from any provider."""
        
        streamer = self.streamers.get(provider)
        
        if not streamer:
            yield StreamEvent(
                event_type=StreamEventType.ERROR,
                data=f"Unknown provider: {provider}"
            )
            return
        
        async for event in streamer.stream(messages, model, **kwargs):
            yield event
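
A quick usage sketch for the unified interface, assuming the relevant provider API key is set in the environment:

import asyncio

async def demo():
    streamer = UnifiedStreamer()
    messages = [{"role": "user", "content": "Write a haiku about rivers."}]
    
    async for event in streamer.stream("openai", messages, "gpt-4o-mini"):
        if event.event_type == StreamEventType.TOKEN:
            print(event.data, end="", flush=True)
        elif event.event_type == StreamEventType.ERROR:
            print(f"\n[error] {event.data}")

asyncio.run(demo())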

Stream Buffering and Processing

from dataclasses import dataclass
from typing import Optional, AsyncIterator, Callable
import re
import time

@dataclass
class BufferConfig:
    """Configuration for stream buffering."""
    
    min_chunk_size: int = 1
    max_chunk_size: int = 100
    flush_interval_ms: int = 50
    buffer_by_sentence: bool = False
    buffer_by_word: bool = False

class StreamBuffer:
    """Buffer streaming tokens."""
    
    def __init__(self, config: Optional[BufferConfig] = None):
        self.config = config or BufferConfig()
        self.buffer = ""
        self.last_flush = 0.0
    
    async def process(
        self,
        stream: AsyncIterator[StreamEvent]
    ) -> AsyncIterator[StreamEvent]:
        """Process stream with buffering."""
        
        import time
        
        async for event in stream:
            if event.event_type == StreamEventType.TOKEN:
                self.buffer += event.data
                
                # Check flush conditions
                should_flush = False
                
                # Size-based flush
                if len(self.buffer) >= self.config.max_chunk_size:
                    should_flush = True
                
                # Sentence-based flush
                elif self.config.buffer_by_sentence:
                    if re.search(r'[.!?]\s*$', self.buffer):
                        should_flush = True
                
                # Word-based flush
                elif self.config.buffer_by_word:
                    if self.buffer.endswith(' '):
                        should_flush = True
                
                # Time-based flush
                current_time = time.time() * 1000
                if current_time - self.last_flush >= self.config.flush_interval_ms:
                    if len(self.buffer) >= self.config.min_chunk_size:
                        should_flush = True
                
                if should_flush:
                    yield StreamEvent(
                        event_type=StreamEventType.CHUNK,
                        data=self.buffer
                    )
                    self.buffer = ""
                    self.last_flush = current_time
            
            elif event.event_type == StreamEventType.DONE:
                # Flush remaining buffer
                if self.buffer:
                    yield StreamEvent(
                        event_type=StreamEventType.CHUNK,
                        data=self.buffer
                    )
                    self.buffer = ""
                
                yield event
            
            else:
                yield event

class StreamTransformer:
    """Transform streaming content."""
    
    def __init__(self):
        self.transformers: list[Callable[[str], str]] = []
    
    def add_transformer(self, func: Callable[[str], str]):
        """Add a transformer function."""
        
        self.transformers.append(func)
    
    async def process(
        self,
        stream: AsyncIterator[StreamEvent]
    ) -> AsyncIterator[StreamEvent]:
        """Transform stream content."""
        
        async for event in stream:
            if event.event_type in [StreamEventType.TOKEN, StreamEventType.CHUNK]:
                data = event.data
                
                for transformer in self.transformers:
                    data = transformer(data)
                
                yield StreamEvent(
                    event_type=event.event_type,
                    data=data,
                    metadata=event.metadata
                )
            else:
                yield event

class StreamAccumulator:
    """Accumulate stream for post-processing."""
    
    def __init__(self):
        self.accumulated = ""
        self.events: list[StreamEvent] = []
    
    async def process(
        self,
        stream: AsyncIterator[StreamEvent]
    ) -> AsyncIterator[StreamEvent]:
        """Accumulate and pass through stream."""
        
        async for event in stream:
            self.events.append(event)
            
            if event.event_type in [StreamEventType.TOKEN, StreamEventType.CHUNK]:
                self.accumulated += event.data
            
            yield event
    
    def get_full_response(self) -> str:
        """Get accumulated response."""
        
        return self.accumulated
    
    def get_token_count(self) -> int:
        """Estimate token count (rough heuristic: ~4 characters per token)."""
        
        return len(self.accumulated) // 4

class StreamValidator:
    """Validate streaming content."""
    
    def __init__(self):
        self.validators: list[Callable[[str], bool]] = []
        self.accumulated = ""
    
    def add_validator(self, func: Callable[[str], bool]):
        """Add validator function."""
        
        self.validators.append(func)
    
    async def process(
        self,
        stream: AsyncIterator[StreamEvent]
    ) -> AsyncIterator[StreamEvent]:
        """Validate stream content."""
        
        async for event in stream:
            if event.event_type in [StreamEventType.TOKEN, StreamEventType.CHUNK]:
                self.accumulated += event.data
                
                # Run validators on accumulated content
                for validator in self.validators:
                    if not validator(self.accumulated):
                        yield StreamEvent(
                            event_type=StreamEventType.ERROR,
                            data="Validation failed",
                            metadata={"accumulated": self.accumulated}
                        )
                        return
            
            yield event
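
These processors compose by wrapping one stream in another. A sketch of a chained pipeline using the classes above:

async def run_pipeline():
    streamer = UnifiedStreamer()
    messages = [{"role": "user", "content": "Summarize SSE in two sentences."}]
    
    # Raw token stream from the provider
    stream = streamer.stream("openai", messages, "gpt-4o-mini")
    
    # Buffer tokens into word-sized chunks for smoother rendering
    buffer = StreamBuffer(BufferConfig(buffer_by_word=True))
    stream = buffer.process(stream)
    
    # Accumulate the full response for logging and metrics
    accumulator = StreamAccumulator()
    stream = accumulator.process(stream)
    
    async for event in stream:
        if event.event_type == StreamEventType.CHUNK:
            print(event.data, end="", flush=True)
    
    print(f"\n~{accumulator.get_token_count()} tokens")

# Run with: asyncio.run(run_pipeline())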

Server-Sent Events (SSE)

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from typing import AsyncIterator
import asyncio
import json

app = FastAPI()

async def generate_sse_stream(
    streamer: UnifiedStreamer,
    provider: str,
    messages: list[dict],
    model: str
) -> AsyncIterator[str]:
    """Generate SSE stream."""
    
    async for event in streamer.stream(provider, messages, model):
        yield event.to_sse()
        
        # Optional pacing delay: smooths client rendering at the cost
        # of ~10ms extra latency per event; remove for lowest latency
        await asyncio.sleep(0.01)

@app.post("/v1/stream/sse")
async def stream_sse(request: Request):
    """Stream response via SSE."""
    
    body = await request.json()
    
    streamer = UnifiedStreamer()
    
    return StreamingResponse(
        generate_sse_stream(
            streamer,
            body.get("provider", "openai"),
            body.get("messages", []),
            body.get("model", "gpt-4o-mini")
        ),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

class SSEClient:
    """Client for consuming SSE streams."""
    
    def __init__(self, base_url: str):
        self.base_url = base_url
    
    async def stream(
        self,
        endpoint: str,
        payload: dict
    ) -> AsyncIterator[StreamEvent]:
        """Consume SSE stream."""
        
        import httpx
        
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{self.base_url}{endpoint}",
                json=payload,
                headers={"Accept": "text/event-stream"}
            ) as response:
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data = json.loads(line[6:])
                        
                        yield StreamEvent(
                            event_type=StreamEventType(data["type"]),
                            data=data["data"],
                            metadata=data.get("metadata", {})
                        )

class SSEConnectionManager:
    """Manage SSE connections."""
    
    def __init__(self):
        self.active_connections: dict[str, asyncio.Queue] = {}
    
    async def connect(self, connection_id: str) -> asyncio.Queue:
        """Create new connection."""
        
        queue = asyncio.Queue()
        self.active_connections[connection_id] = queue
        
        return queue
    
    async def disconnect(self, connection_id: str):
        """Remove connection."""
        
        if connection_id in self.active_connections:
            del self.active_connections[connection_id]
    
    async def send_event(self, connection_id: str, event: StreamEvent):
        """Send event to connection."""
        
        if connection_id in self.active_connections:
            await self.active_connections[connection_id].put(event)
    
    async def broadcast(self, event: StreamEvent):
        """Broadcast to all connections."""
        
        for queue in self.active_connections.values():
            await queue.put(event)
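
Consuming the SSE endpoint with the client above, assuming the service is running locally on port 8000:

async def consume_sse():
    client = SSEClient("http://localhost:8000")
    
    payload = {
        "provider": "openai",
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "Hello!"}]
    }
    
    async for event in client.stream("/v1/stream/sse", payload):
        if event.event_type == StreamEventType.TOKEN:
            print(event.data, end="", flush=True)
        elif event.event_type == StreamEventType.DONE:
            break

# Run with: asyncio.run(consume_sse())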

WebSocket Streaming

from fastapi import WebSocket, WebSocketDisconnect
from typing import AsyncIterator
import json

class WebSocketStreamer:
    """Stream via WebSocket."""
    
    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
    
    async def connect(self, websocket: WebSocket, client_id: str):
        """Accept WebSocket connection."""
        
        await websocket.accept()
        self.connections[client_id] = websocket
    
    def disconnect(self, client_id: str):
        """Remove connection."""
        
        if client_id in self.connections:
            del self.connections[client_id]
    
    async def send_event(self, client_id: str, event: StreamEvent):
        """Send event to client."""
        
        if client_id in self.connections:
            await self.connections[client_id].send_text(event.to_json())
    
    async def stream_to_client(
        self,
        client_id: str,
        stream: AsyncIterator[StreamEvent]
    ):
        """Stream events to client."""
        
        async for event in stream:
            await self.send_event(client_id, event)

ws_streamer = WebSocketStreamer()

@app.websocket("/v1/stream/ws/{client_id}")
async def websocket_stream(websocket: WebSocket, client_id: str):
    """WebSocket streaming endpoint."""
    
    await ws_streamer.connect(websocket, client_id)
    
    try:
        while True:
            # Receive request
            data = await websocket.receive_text()
            request = json.loads(data)
            
            # Create stream
            streamer = UnifiedStreamer()
            stream = streamer.stream(
                request.get("provider", "openai"),
                request.get("messages", []),
                request.get("model", "gpt-4o-mini")
            )
            
            # Stream response
            await ws_streamer.stream_to_client(client_id, stream)
    
    except WebSocketDisconnect:
        ws_streamer.disconnect(client_id)

class WebSocketClient:
    """WebSocket client for streaming."""
    
    def __init__(self, url: str):
        self.url = url
        self.websocket = None
    
    async def connect(self):
        """Connect to WebSocket."""
        
        import websockets
        
        self.websocket = await websockets.connect(self.url)
    
    async def disconnect(self):
        """Disconnect from WebSocket."""
        
        if self.websocket:
            await self.websocket.close()
    
    async def stream(self, request: dict) -> AsyncIterator[StreamEvent]:
        """Send request and stream response."""
        
        if not self.websocket:
            await self.connect()
        
        # Send request
        await self.websocket.send(json.dumps(request))
        
        # Receive stream
        while True:
            try:
                message = await self.websocket.recv()
                data = json.loads(message)
                
                event = StreamEvent(
                    event_type=StreamEventType(data["type"]),
                    data=data["data"],
                    metadata=data.get("metadata", {})
                )
                
                yield event
                
                if event.event_type == StreamEventType.DONE:
                    break
            
            except Exception as e:
                yield StreamEvent(
                    event_type=StreamEventType.ERROR,
                    data=str(e)
                )
                break
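
Using the client against the endpoint above, assuming the service runs on localhost:8000 and the websockets package is installed:

async def consume_ws():
    client = WebSocketClient("ws://localhost:8000/v1/stream/ws/client-1")
    
    request = {
        "provider": "openai",
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "Hello!"}]
    }
    
    async for event in client.stream(request):
        if event.event_type == StreamEventType.TOKEN:
            print(event.data, end="", flush=True)
    
    await client.disconnect()

# Run with: asyncio.run(consume_ws())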

Error Handling and Recovery

from dataclasses import dataclass
from typing import Optional, AsyncIterator, Callable
import asyncio
import time

@dataclass
class RetryConfig:
    """Configuration for retry logic."""
    
    max_retries: int = 3
    base_delay_ms: int = 1000
    max_delay_ms: int = 10000
    exponential_base: float = 2.0

class StreamRetryHandler:
    """Handle stream errors with retry."""
    
    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()
    
    async def with_retry(
        self,
        stream_factory: Callable[[], AsyncIterator[StreamEvent]]
    ) -> AsyncIterator[StreamEvent]:
        """Wrap stream with retry logic."""
        
        retries = 0
        accumulated = ""
        
        while retries <= self.config.max_retries:
            try:
                stream = stream_factory()
                
                async for event in stream:
                    if event.event_type == StreamEventType.ERROR:
                        raise Exception(event.data)
                    
                    if event.event_type in [StreamEventType.TOKEN, StreamEventType.CHUNK]:
                        accumulated += event.data
                    
                    yield event
                
                # Success - exit retry loop
                return
            
            except Exception as e:
                retries += 1
                
                if retries > self.config.max_retries:
                    yield StreamEvent(
                        event_type=StreamEventType.ERROR,
                        data=f"Max retries exceeded: {str(e)}",
                        metadata={"accumulated": accumulated}
                    )
                    return
                
                # Calculate delay
                delay = min(
                    self.config.base_delay_ms * (self.config.exponential_base ** (retries - 1)),
                    self.config.max_delay_ms
                )
                
                yield StreamEvent(
                    event_type=StreamEventType.METADATA,
                    data="",
                    metadata={
                        "retry": retries,
                        "delay_ms": delay,
                        "error": str(e)
                    }
                )
                
                await asyncio.sleep(delay / 1000)

class StreamTimeoutHandler:
    """Handle stream timeouts."""
    
    def __init__(self, timeout_seconds: float = 30.0):
        self.timeout = timeout_seconds
    
    async def with_timeout(
        self,
        stream: AsyncIterator[StreamEvent]
    ) -> AsyncIterator[StreamEvent]:
        """Wrap stream with a per-event timeout."""
        
        iterator = stream.__aiter__()
        
        while True:
            try:
                # Enforce the timeout while waiting for each next event
                event = await asyncio.wait_for(
                    iterator.__anext__(),
                    timeout=self.timeout
                )
            except StopAsyncIteration:
                break
            except asyncio.TimeoutError:
                yield StreamEvent(
                    event_type=StreamEventType.ERROR,
                    data="Stream timeout"
                )
                return
            
            yield event

class StreamHealthMonitor:
    """Monitor stream health."""
    
    def __init__(
        self,
        heartbeat_interval_ms: int = 5000,
        max_silence_ms: int = 30000
    ):
        self.heartbeat_interval = heartbeat_interval_ms
        self.max_silence = max_silence_ms
        self.last_event_time = 0
    
    async def monitor(
        self,
        stream: AsyncIterator[StreamEvent]
    ) -> AsyncIterator[StreamEvent]:
        """Monitor stream activity and emit an error if it goes silent."""
        
        queue: asyncio.Queue = asyncio.Queue()
        self.last_event_time = time.time() * 1000
        
        async def pump():
            # Forward stream events into the queue, tracking activity
            try:
                async for event in stream:
                    self.last_event_time = time.time() * 1000
                    await queue.put(event)
            except Exception as e:
                await queue.put(StreamEvent(
                    event_type=StreamEventType.ERROR,
                    data=str(e)
                ))
            finally:
                await queue.put(None)  # Sentinel: stream finished
        
        async def watchdog():
            # Periodically check for prolonged silence
            while True:
                await asyncio.sleep(self.heartbeat_interval / 1000)
                silence = time.time() * 1000 - self.last_event_time
                
                if silence > self.max_silence:
                    await queue.put(StreamEvent(
                        event_type=StreamEventType.ERROR,
                        data="Stream silence timeout"
                    ))
                    await queue.put(None)
                    return
        
        pump_task = asyncio.create_task(pump())
        watchdog_task = asyncio.create_task(watchdog())
        
        try:
            while True:
                event = await queue.get()
                if event is None:
                    break
                yield event
        finally:
            pump_task.cancel()
            watchdog_task.cancel()
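
The handlers stack the same way the buffering processors do. A sketch of wiring retry, timeout, and health monitoring around a raw stream:

async def resilient_stream():
    streamer = UnifiedStreamer()
    messages = [{"role": "user", "content": "Hello!"}]
    
    # Factory so the retry handler can restart the stream from scratch
    def make_stream():
        return streamer.stream("openai", messages, "gpt-4o-mini")
    
    stream = StreamRetryHandler(RetryConfig(max_retries=2)).with_retry(make_stream)
    stream = StreamTimeoutHandler(timeout_seconds=30.0).with_timeout(stream)
    stream = StreamHealthMonitor().monitor(stream)
    
    async for event in stream:
        if event.event_type == StreamEventType.TOKEN:
            print(event.data, end="", flush=True)
        elif event.event_type == StreamEventType.ERROR:
            print(f"\n[error] {event.data}")

# Run with: asyncio.run(resilient_stream())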

Production Streaming Service

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, AsyncIterator

app = FastAPI()

class StreamRequest(BaseModel):
    provider: str = "openai"
    model: str = "gpt-4o-mini"
    messages: list[dict]
    max_tokens: int = 1024
    buffer_config: Optional[dict] = None

# Initialize components
streamer = UnifiedStreamer()
buffer_config = BufferConfig()

async def create_stream_pipeline(
    request: StreamRequest
) -> AsyncIterator[str]:
    """Create full streaming pipeline."""
    
    # Create base stream
    stream = streamer.stream(
        request.provider,
        request.messages,
        request.model,
        max_tokens=request.max_tokens
    )
    
    # Add buffering
    if request.buffer_config:
        config = BufferConfig(**request.buffer_config)
    else:
        config = buffer_config
    
    buffer = StreamBuffer(config)
    stream = buffer.process(stream)
    
    # Add accumulator for metrics
    accumulator = StreamAccumulator()
    stream = accumulator.process(stream)
    
    # Convert to SSE
    async for event in stream:
        yield event.to_sse()

@app.post("/v1/chat/stream")
async def stream_chat(request: StreamRequest):
    """Stream chat completion."""
    
    return StreamingResponse(
        create_stream_pipeline(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

@app.post("/v1/chat/stream/json")
async def stream_chat_json(request: StreamRequest):
    """Stream chat as JSON lines."""
    
    async def generate():
        async for event in streamer.stream(
            request.provider,
            request.messages,
            request.model
        ):
            yield event.to_json() + "\n"
    
    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson"
    )

# WebSocket endpoint
@app.websocket("/v1/chat/ws")
async def websocket_chat(websocket: WebSocket):
    """WebSocket chat endpoint."""
    
    await websocket.accept()
    
    try:
        while True:
            data = await websocket.receive_json()
            
            request = StreamRequest(**data)
            
            async for event in streamer.stream(
                request.provider,
                request.messages,
                request.model
            ):
                await websocket.send_text(event.to_json())
    
    except WebSocketDisconnect:
        pass

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Streaming transforms LLM applications from sluggish to responsive. The key is choosing the right transport: Server-Sent Events (SSE) are simpler and work well for unidirectional streaming from server to client, while WebSockets enable bidirectional communication for interactive applications. Buffering strategy matters for user experience—streaming individual tokens can feel jittery, while buffering by words or sentences creates smoother reading. Implement proper error handling with retry logic and timeouts; streams can fail mid-response, and users expect graceful recovery. For production systems, add health monitoring to detect stalled streams, implement connection management for scalability, and consider message queues for high-throughput scenarios. On the client side, handle reconnection gracefully and provide visual feedback during streaming. The goal is to make AI responses feel instantaneous while remaining reliable. With streaming implemented properly, users start seeing responses within a few hundred milliseconds instead of waiting many seconds, dramatically improving the perceived quality of your AI application.

