Introduction
Streaming LLM responses transforms the user experience from waiting for a complete response to seeing text appear in real time, dramatically improving perceived latency. Instead of staring at a loading spinner for 5-10 seconds, users see the first tokens as soon as the model produces them and can start reading while generation continues. But implementing streaming properly involves more than enabling the stream parameter: you need to handle connection management, error recovery, token buffering, and client-side rendering. Server-Sent Events (SSE) and WebSockets each have tradeoffs for different use cases. This guide covers practical patterns for implementing LLM response streaming, from basic SSE endpoints to buffering strategies and production-ready streaming services that handle errors gracefully and keep the user experience responsive.

Basic Streaming Implementation
from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator, Callable
from enum import Enum
import asyncio
import json
class StreamEventType(Enum):
"""Types of stream events."""
TOKEN = "token"
CHUNK = "chunk"
DONE = "done"
ERROR = "error"
METADATA = "metadata"
@dataclass
class StreamEvent:
"""A streaming event."""
event_type: StreamEventType
data: str
metadata: dict = field(default_factory=dict)
def to_sse(self) -> str:
"""Convert to SSE format."""
event_data = {
"type": self.event_type.value,
"data": self.data,
"metadata": self.metadata
}
return f"data: {json.dumps(event_data)}\n\n"
def to_json(self) -> str:
"""Convert to JSON."""
return json.dumps({
"type": self.event_type.value,
"data": self.data,
"metadata": self.metadata
})
class OpenAIStreamer:
"""Stream responses from OpenAI."""
def __init__(self, api_key: str = None):
self.api_key = api_key
async def stream(
self,
messages: list[dict],
model: str = "gpt-4o-mini",
**kwargs
) -> AsyncIterator[StreamEvent]:
"""Stream completion from OpenAI."""
import openai
client = openai.AsyncOpenAI(api_key=self.api_key)
try:
stream = await client.chat.completions.create(
model=model,
messages=messages,
stream=True,
**kwargs
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
yield StreamEvent(
event_type=StreamEventType.TOKEN,
data=chunk.choices[0].delta.content
)
yield StreamEvent(
event_type=StreamEventType.DONE,
data=""
)
except Exception as e:
yield StreamEvent(
event_type=StreamEventType.ERROR,
data=str(e)
)
class AnthropicStreamer:
"""Stream responses from Anthropic."""
def __init__(self, api_key: str = None):
self.api_key = api_key
async def stream(
self,
messages: list[dict],
model: str = "claude-3-haiku-20240307",
**kwargs
) -> AsyncIterator[StreamEvent]:
"""Stream completion from Anthropic."""
import anthropic
client = anthropic.AsyncAnthropic(api_key=self.api_key)
try:
async with client.messages.stream(
model=model,
messages=messages,
max_tokens=kwargs.get("max_tokens", 1024)
) as stream:
async for text in stream.text_stream:
yield StreamEvent(
event_type=StreamEventType.TOKEN,
data=text
)
yield StreamEvent(
event_type=StreamEventType.DONE,
data=""
)
except Exception as e:
yield StreamEvent(
event_type=StreamEventType.ERROR,
data=str(e)
)
class UnifiedStreamer:
"""Unified streaming interface."""
def __init__(self):
self.streamers = {
"openai": OpenAIStreamer(),
"anthropic": AnthropicStreamer()
}
async def stream(
self,
provider: str,
messages: list[dict],
model: str,
**kwargs
) -> AsyncIterator[StreamEvent]:
"""Stream from any provider."""
streamer = self.streamers.get(provider)
if not streamer:
yield StreamEvent(
event_type=StreamEventType.ERROR,
data=f"Unknown provider: {provider}"
)
return
async for event in streamer.stream(messages, model, **kwargs):
yield event
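Before adding buffering, here is a minimal sketch of how this unified interface might be consumed from application code. It assumes the relevant provider API key is available in the environment and simply prints tokens as they arrive; the prompt text is illustrative.
async def demo():
    """Print tokens from a unified stream as they arrive (illustrative only)."""
    streamer = UnifiedStreamer()
    messages = [{"role": "user", "content": "Explain streaming in one sentence."}]
    async for event in streamer.stream("openai", messages, "gpt-4o-mini"):
        if event.event_type == StreamEventType.TOKEN:
            print(event.data, end="", flush=True)
        elif event.event_type == StreamEventType.ERROR:
            print(f"\n[stream error] {event.data}")

# asyncio.run(demo())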
Stream Buffering and Processing
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator, Callable
import asyncio
import re
@dataclass
class BufferConfig:
"""Configuration for stream buffering."""
min_chunk_size: int = 1
max_chunk_size: int = 100
flush_interval_ms: int = 50
buffer_by_sentence: bool = False
buffer_by_word: bool = False
class StreamBuffer:
"""Buffer streaming tokens."""
def __init__(self, config: BufferConfig = None):
self.config = config or BufferConfig()
self.buffer = ""
self.last_flush = 0
async def process(
self,
stream: AsyncIterator[StreamEvent]
) -> AsyncIterator[StreamEvent]:
"""Process stream with buffering."""
import time
async for event in stream:
if event.event_type == StreamEventType.TOKEN:
self.buffer += event.data
# Check flush conditions
should_flush = False
# Size-based flush
if len(self.buffer) >= self.config.max_chunk_size:
should_flush = True
# Sentence-based flush
elif self.config.buffer_by_sentence:
if re.search(r'[.!?]\s*$', self.buffer):
should_flush = True
# Word-based flush
elif self.config.buffer_by_word:
if self.buffer.endswith(' '):
should_flush = True
# Time-based flush
current_time = time.time() * 1000
if current_time - self.last_flush >= self.config.flush_interval_ms:
if len(self.buffer) >= self.config.min_chunk_size:
should_flush = True
if should_flush:
yield StreamEvent(
event_type=StreamEventType.CHUNK,
data=self.buffer
)
self.buffer = ""
self.last_flush = current_time
elif event.event_type == StreamEventType.DONE:
# Flush remaining buffer
if self.buffer:
yield StreamEvent(
event_type=StreamEventType.CHUNK,
data=self.buffer
)
self.buffer = ""
yield event
else:
yield event
class StreamTransformer:
"""Transform streaming content."""
def __init__(self):
self.transformers: list[Callable[[str], str]] = []
def add_transformer(self, func: Callable[[str], str]):
"""Add a transformer function."""
self.transformers.append(func)
async def process(
self,
stream: AsyncIterator[StreamEvent]
) -> AsyncIterator[StreamEvent]:
"""Transform stream content."""
async for event in stream:
if event.event_type in [StreamEventType.TOKEN, StreamEventType.CHUNK]:
data = event.data
for transformer in self.transformers:
data = transformer(data)
yield StreamEvent(
event_type=event.event_type,
data=data,
metadata=event.metadata
)
else:
yield event
class StreamAccumulator:
"""Accumulate stream for post-processing."""
def __init__(self):
self.accumulated = ""
self.events: list[StreamEvent] = []
async def process(
self,
stream: AsyncIterator[StreamEvent]
) -> AsyncIterator[StreamEvent]:
"""Accumulate and pass through stream."""
async for event in stream:
self.events.append(event)
if event.event_type in [StreamEventType.TOKEN, StreamEventType.CHUNK]:
self.accumulated += event.data
yield event
def get_full_response(self) -> str:
"""Get accumulated response."""
return self.accumulated
    def get_token_count(self) -> int:
        """Estimate token count (rough heuristic: about 4 characters per token of English text)."""
        return len(self.accumulated) // 4
class StreamValidator:
"""Validate streaming content."""
def __init__(self):
self.validators: list[Callable[[str], bool]] = []
self.accumulated = ""
def add_validator(self, func: Callable[[str], bool]):
"""Add validator function."""
self.validators.append(func)
async def process(
self,
stream: AsyncIterator[StreamEvent]
) -> AsyncIterator[StreamEvent]:
"""Validate stream content."""
async for event in stream:
if event.event_type in [StreamEventType.TOKEN, StreamEventType.CHUNK]:
self.accumulated += event.data
# Run validators on accumulated content
for validator in self.validators:
if not validator(self.accumulated):
yield StreamEvent(
event_type=StreamEventType.ERROR,
data="Validation failed",
metadata={"accumulated": self.accumulated}
)
return
yield event
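These processors are designed to compose: each consumes an AsyncIterator[StreamEvent] and yields one, so they can be chained. Here is one possible pipeline as a sketch; the tab-expanding transformer is purely illustrative.
async def buffered_pipeline(raw_stream: AsyncIterator[StreamEvent]) -> AsyncIterator[StreamEvent]:
    """Chain buffering, transformation, and accumulation over a raw token stream."""
    buffer = StreamBuffer(BufferConfig(buffer_by_word=True, flush_interval_ms=50))
    transformer = StreamTransformer()
    transformer.add_transformer(lambda text: text.replace("\t", "    "))  # illustrative transform
    accumulator = StreamAccumulator()

    stream = buffer.process(raw_stream)
    stream = transformer.process(stream)
    stream = accumulator.process(stream)
    async for event in stream:
        yield event
    # Once the stream completes, the accumulator holds the full response for logging or metrics
    print("total characters:", len(accumulator.get_full_response()))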
Server-Sent Events (SSE)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from typing import AsyncIterator
import asyncio
import json
app = FastAPI()
async def generate_sse_stream(
streamer: UnifiedStreamer,
provider: str,
messages: list[dict],
model: str
) -> AsyncIterator[str]:
"""Generate SSE stream."""
async for event in streamer.stream(provider, messages, model):
yield event.to_sse()
# Small delay to prevent overwhelming client
await asyncio.sleep(0.01)
@app.post("/v1/stream/sse")
async def stream_sse(request: Request):
"""Stream response via SSE."""
body = await request.json()
streamer = UnifiedStreamer()
return StreamingResponse(
generate_sse_stream(
streamer,
body.get("provider", "openai"),
body.get("messages", []),
body.get("model", "gpt-4o-mini")
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
class SSEClient:
"""Client for consuming SSE streams."""
def __init__(self, base_url: str):
self.base_url = base_url
async def stream(
self,
endpoint: str,
payload: dict
) -> AsyncIterator[StreamEvent]:
"""Consume SSE stream."""
import httpx
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
f"{self.base_url}{endpoint}",
json=payload,
headers={"Accept": "text/event-stream"}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
yield StreamEvent(
event_type=StreamEventType(data["type"]),
data=data["data"],
metadata=data.get("metadata", {})
)
class SSEConnectionManager:
"""Manage SSE connections."""
def __init__(self):
self.active_connections: dict[str, asyncio.Queue] = {}
async def connect(self, connection_id: str) -> asyncio.Queue:
"""Create new connection."""
queue = asyncio.Queue()
self.active_connections[connection_id] = queue
return queue
async def disconnect(self, connection_id: str):
"""Remove connection."""
if connection_id in self.active_connections:
del self.active_connections[connection_id]
async def send_event(self, connection_id: str, event: StreamEvent):
"""Send event to connection."""
if connection_id in self.active_connections:
await self.active_connections[connection_id].put(event)
async def broadcast(self, event: StreamEvent):
"""Broadcast to all connections."""
for queue in self.active_connections.values():
await queue.put(event)
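On the consuming side, the SSEClient above can be pointed at the endpoint defined earlier. A sketch, assuming the FastAPI app is running locally on port 8000:
async def consume_sse():
    """Consume the /v1/stream/sse endpoint and print tokens (assumes a local dev server)."""
    client = SSEClient("http://localhost:8000")
    payload = {
        "provider": "openai",
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "Hello!"}]
    }
    async for event in client.stream("/v1/stream/sse", payload):
        if event.event_type == StreamEventType.TOKEN:
            print(event.data, end="", flush=True)
        elif event.event_type == StreamEventType.DONE:
            break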
WebSocket Streaming
from fastapi import WebSocket, WebSocketDisconnect
from typing import Any, Optional
import asyncio
import json
class WebSocketStreamer:
"""Stream via WebSocket."""
def __init__(self):
self.connections: dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, client_id: str):
"""Accept WebSocket connection."""
await websocket.accept()
self.connections[client_id] = websocket
def disconnect(self, client_id: str):
"""Remove connection."""
if client_id in self.connections:
del self.connections[client_id]
async def send_event(self, client_id: str, event: StreamEvent):
"""Send event to client."""
if client_id in self.connections:
await self.connections[client_id].send_text(event.to_json())
async def stream_to_client(
self,
client_id: str,
stream: AsyncIterator[StreamEvent]
):
"""Stream events to client."""
async for event in stream:
await self.send_event(client_id, event)
ws_streamer = WebSocketStreamer()
@app.websocket("/v1/stream/ws/{client_id}")
async def websocket_stream(websocket: WebSocket, client_id: str):
"""WebSocket streaming endpoint."""
await ws_streamer.connect(websocket, client_id)
try:
while True:
# Receive request
data = await websocket.receive_text()
request = json.loads(data)
# Create stream
streamer = UnifiedStreamer()
stream = streamer.stream(
request.get("provider", "openai"),
request.get("messages", []),
request.get("model", "gpt-4o-mini")
)
# Stream response
await ws_streamer.stream_to_client(client_id, stream)
except WebSocketDisconnect:
ws_streamer.disconnect(client_id)
class WebSocketClient:
"""WebSocket client for streaming."""
def __init__(self, url: str):
self.url = url
self.websocket = None
async def connect(self):
"""Connect to WebSocket."""
import websockets
self.websocket = await websockets.connect(self.url)
async def disconnect(self):
"""Disconnect from WebSocket."""
if self.websocket:
await self.websocket.close()
async def stream(self, request: dict) -> AsyncIterator[StreamEvent]:
"""Send request and stream response."""
if not self.websocket:
await self.connect()
# Send request
await self.websocket.send(json.dumps(request))
# Receive stream
while True:
try:
message = await self.websocket.recv()
data = json.loads(message)
event = StreamEvent(
event_type=StreamEventType(data["type"]),
data=data["data"],
metadata=data.get("metadata", {})
)
yield event
if event.event_type == StreamEventType.DONE:
break
except Exception as e:
yield StreamEvent(
event_type=StreamEventType.ERROR,
data=str(e)
)
break
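A corresponding client-side sketch, assuming the server above is reachable at ws://localhost:8000 and using an arbitrary client ID:
async def consume_ws():
    """Send one request over the WebSocket endpoint and print streamed tokens."""
    client = WebSocketClient("ws://localhost:8000/v1/stream/ws/client-123")  # assumed local address
    await client.connect()
    try:
        request = {
            "provider": "openai",
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "Hello over WebSocket"}]
        }
        async for event in client.stream(request):
            if event.event_type == StreamEventType.TOKEN:
                print(event.data, end="", flush=True)
    finally:
        await client.disconnect()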
Error Handling and Recovery
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator, Callable
import asyncio
@dataclass
class RetryConfig:
"""Configuration for retry logic."""
max_retries: int = 3
base_delay_ms: int = 1000
max_delay_ms: int = 10000
exponential_base: float = 2.0
class StreamRetryHandler:
"""Handle stream errors with retry."""
def __init__(self, config: RetryConfig = None):
self.config = config or RetryConfig()
async def with_retry(
self,
stream_factory: Callable[[], AsyncIterator[StreamEvent]]
) -> AsyncIterator[StreamEvent]:
"""Wrap stream with retry logic."""
retries = 0
accumulated = ""
while retries <= self.config.max_retries:
try:
stream = stream_factory()
async for event in stream:
if event.event_type == StreamEventType.ERROR:
raise Exception(event.data)
if event.event_type in [StreamEventType.TOKEN, StreamEventType.CHUNK]:
accumulated += event.data
yield event
# Success - exit retry loop
return
except Exception as e:
retries += 1
if retries > self.config.max_retries:
yield StreamEvent(
event_type=StreamEventType.ERROR,
data=f"Max retries exceeded: {str(e)}",
metadata={"accumulated": accumulated}
)
return
# Calculate delay
delay = min(
self.config.base_delay_ms * (self.config.exponential_base ** (retries - 1)),
self.config.max_delay_ms
)
yield StreamEvent(
event_type=StreamEventType.METADATA,
data="",
metadata={
"retry": retries,
"delay_ms": delay,
"error": str(e)
}
)
await asyncio.sleep(delay / 1000)
class StreamTimeoutHandler:
    """Handle stream timeouts."""
    def __init__(self, timeout_seconds: float = 30.0):
        self.timeout = timeout_seconds
    async def with_timeout(
        self,
        stream: AsyncIterator[StreamEvent]
    ) -> AsyncIterator[StreamEvent]:
        """Wrap a stream with a per-event timeout."""
        iterator = stream.__aiter__()
        while True:
            try:
                # Bound how long we wait for each event from the upstream iterator
                event = await asyncio.wait_for(iterator.__anext__(), timeout=self.timeout)
            except StopAsyncIteration:
                break
            except asyncio.TimeoutError:
                yield StreamEvent(
                    event_type=StreamEventType.ERROR,
                    data="Stream timeout"
                )
                return
            yield event
class StreamHealthMonitor:
    """Monitor stream health."""
    def __init__(
        self,
        heartbeat_interval_ms: int = 5000,
        max_silence_ms: int = 30000
    ):
        self.heartbeat_interval = heartbeat_interval_ms
        self.max_silence = max_silence_ms
        self.last_event_time = 0
    async def monitor(
        self,
        stream: AsyncIterator[StreamEvent]
    ) -> AsyncIterator[StreamEvent]:
        """Monitor the stream: emit heartbeats during silence and an error if silence exceeds the limit."""
        import time
        self.last_event_time = time.time() * 1000
        queue: asyncio.Queue = asyncio.Queue()
        done = object()  # sentinel marking the end of the upstream stream
        async def pump():
            # Forward upstream events into a queue so waits can time out
            # without cancelling the underlying generator.
            try:
                async for event in stream:
                    await queue.put(event)
            finally:
                await queue.put(done)
        pump_task = asyncio.create_task(pump())
        try:
            while True:
                try:
                    item = await asyncio.wait_for(
                        queue.get(),
                        timeout=self.heartbeat_interval / 1000
                    )
                except asyncio.TimeoutError:
                    silence = time.time() * 1000 - self.last_event_time
                    if silence > self.max_silence:
                        yield StreamEvent(
                            event_type=StreamEventType.ERROR,
                            data="Stream silence timeout"
                        )
                        return
                    # Emit a heartbeat so downstream consumers know the connection is alive
                    yield StreamEvent(
                        event_type=StreamEventType.METADATA,
                        data="",
                        metadata={"heartbeat": True, "silence_ms": silence}
                    )
                    continue
                if item is done:
                    break
                self.last_event_time = time.time() * 1000
                yield item
        finally:
            pump_task.cancel()
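These wrappers compose the same way as the buffering processors. One possible layering, from the provider stream on the inside to health monitoring on the outside, might look like this sketch:
async def resilient_stream(
    provider: str,
    messages: list[dict],
    model: str
) -> AsyncIterator[StreamEvent]:
    """Layer retry, timeout, and health monitoring around a provider stream."""
    streamer = UnifiedStreamer()
    retry = StreamRetryHandler(RetryConfig(max_retries=2))
    timeout = StreamTimeoutHandler(timeout_seconds=30.0)
    monitor = StreamHealthMonitor(heartbeat_interval_ms=5000, max_silence_ms=30000)

    def factory():
        return streamer.stream(provider, messages, model)

    stream = retry.with_retry(factory)
    stream = timeout.with_timeout(stream)
    stream = monitor.monitor(stream)
    async for event in stream:
        yield event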
Production Streaming Service
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, List, Dict, AsyncIterator
import asyncio
app = FastAPI()
class StreamRequest(BaseModel):
provider: str = "openai"
model: str = "gpt-4o-mini"
messages: List[Dict]
max_tokens: int = 1024
buffer_config: Optional[Dict] = None
# Initialize components
streamer = UnifiedStreamer()
buffer_config = BufferConfig()
async def create_stream_pipeline(
request: StreamRequest
) -> AsyncIterator[str]:
"""Create full streaming pipeline."""
# Create base stream
stream = streamer.stream(
request.provider,
request.messages,
request.model,
max_tokens=request.max_tokens
)
# Add buffering
if request.buffer_config:
config = BufferConfig(**request.buffer_config)
else:
config = buffer_config
buffer = StreamBuffer(config)
stream = buffer.process(stream)
# Add accumulator for metrics
accumulator = StreamAccumulator()
stream = accumulator.process(stream)
# Convert to SSE
async for event in stream:
yield event.to_sse()
@app.post("/v1/chat/stream")
async def stream_chat(request: StreamRequest):
"""Stream chat completion."""
return StreamingResponse(
create_stream_pipeline(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
@app.post("/v1/chat/stream/json")
async def stream_chat_json(request: StreamRequest):
"""Stream chat as JSON lines."""
async def generate():
async for event in streamer.stream(
request.provider,
request.messages,
request.model
):
yield event.to_json() + "\n"
return StreamingResponse(
generate(),
media_type="application/x-ndjson"
)
# WebSocket endpoint
@app.websocket("/v1/chat/ws")
async def websocket_chat(websocket: WebSocket):
"""WebSocket chat endpoint."""
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
request = StreamRequest(**data)
async for event in streamer.stream(
request.provider,
request.messages,
request.model
):
await websocket.send_text(event.to_json())
except WebSocketDisconnect:
pass
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- OpenAI Streaming: https://platform.openai.com/docs/api-reference/streaming
- Anthropic Streaming: https://docs.anthropic.com/claude/reference/streaming
- Server-Sent Events: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
- FastAPI StreamingResponse: https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
- WebSockets: https://websockets.readthedocs.io/
Conclusion
Streaming transforms LLM applications from sluggish to responsive. The key is choosing the right transport: Server-Sent Events (SSE) are simpler and work well for unidirectional streaming from server to client, while WebSockets enable bidirectional communication for interactive applications. Buffering strategies matter for user experience: streaming individual tokens can feel jittery, while buffering by words or sentences creates smoother reading. Implement proper error handling with retry logic and timeouts; streams can fail mid-response, and users expect graceful recovery. For production systems, add health monitoring to detect stalled streams, implement connection management for scalability, and consider message queues for high-throughput scenarios. On the client side, handle reconnection gracefully and provide visual feedback during streaming. The goal is to make AI responses feel instantaneous while maintaining reliability. With proper streaming, users start seeing output almost as soon as generation begins instead of waiting for the full response, dramatically improving the perceived quality of your AI application.
