Introduction: Waiting for complete LLM responses creates poor user experiences. Users stare at loading spinners while models generate hundreds of tokens. Streaming delivers tokens as they’re generated, showing users immediate progress and reducing perceived latency dramatically. But streaming introduces complexity: you need to handle partial responses, buffer tokens for processing, manage connection failures mid-stream, and coordinate streaming with your application logic. This guide covers practical streaming patterns: consuming streaming APIs, buffering and processing tokens in real-time, handling errors during streams, and building streaming systems that feel responsive while remaining robust.

Basic Streaming
from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator, Callable
import asyncio
@dataclass
class StreamChunk:
"""A chunk from a streaming response."""
content: str
    finish_reason: Optional[str] = None
    index: int = 0
    model: Optional[str] = None
@dataclass
class StreamResult:
"""Complete result after streaming."""
content: str
chunks: list[StreamChunk]
finish_reason: str
total_tokens: int = 0
class StreamingClient:
"""Client for streaming LLM responses."""
def __init__(self, client: Any, model: str = "gpt-4o-mini"):
self.client = client
self.model = model
async def stream(
self,
messages: list[dict],
        on_chunk: Optional[Callable[[StreamChunk], None]] = None,
**kwargs
) -> StreamResult:
"""Stream response and optionally process chunks."""
chunks = []
content_parts = []
finish_reason = None
        # Pop any caller-supplied model so it doesn't collide with the default keyword
        response = await self.client.chat.completions.create(
            model=kwargs.pop("model", self.model),
messages=messages,
stream=True,
**kwargs
)
async for chunk in response:
if chunk.choices:
choice = chunk.choices[0]
delta = choice.delta
if delta.content:
stream_chunk = StreamChunk(
content=delta.content,
finish_reason=choice.finish_reason,
index=len(chunks),
model=chunk.model
)
chunks.append(stream_chunk)
content_parts.append(delta.content)
if on_chunk:
on_chunk(stream_chunk)
if choice.finish_reason:
finish_reason = choice.finish_reason
return StreamResult(
content="".join(content_parts),
chunks=chunks,
finish_reason=finish_reason
)
async def stream_iter(
self,
messages: list[dict],
**kwargs
) -> AsyncIterator[StreamChunk]:
"""Yield chunks as async iterator."""
        # Pop any caller-supplied model so it doesn't collide with the default keyword
        response = await self.client.chat.completions.create(
            model=kwargs.pop("model", self.model),
messages=messages,
stream=True,
**kwargs
)
index = 0
async for chunk in response:
if chunk.choices:
choice = chunk.choices[0]
delta = choice.delta
if delta.content:
yield StreamChunk(
content=delta.content,
finish_reason=choice.finish_reason,
index=index,
model=chunk.model
)
index += 1
# Usage example
async def print_streaming():
client = StreamingClient(openai_client)
async for chunk in client.stream_iter([
{"role": "user", "content": "Write a short poem"}
]):
print(chunk.content, end="", flush=True)
print() # Newline at end
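Streams can drop partway through a response, so production callers usually wrap the basic client with retries. Below is a minimal sketch, assuming the official openai SDK's APIConnectionError/APIError exception types and the StreamingClient defined above; it retries from scratch and falls back to the longest partial output if every attempt fails.
# Sketch: retry a stream that fails mid-response (assumes the openai SDK's exception types).
import openai

async def stream_with_retry(
    client: StreamingClient,
    messages: list[dict],
    max_attempts: int = 3,
) -> str:
    last_partial = ""
    for attempt in range(max_attempts):
        parts: list[str] = []
        try:
            async for chunk in client.stream_iter(messages):
                parts.append(chunk.content)
            return "".join(parts)  # Stream finished cleanly
        except (openai.APIConnectionError, openai.APIError):
            # Remember the longest partial result in case every attempt fails
            last_partial = max(last_partial, "".join(parts), key=len)
            await asyncio.sleep(2 ** attempt)  # Simple exponential backoff
    return last_partial  # Best effort after exhausting retries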
Stream Buffering
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator, Callable
import asyncio
import re
class StreamBuffer:
"""Buffer for accumulating stream chunks."""
def __init__(self):
self._buffer = []
self._complete = False
        self._error: Optional[Exception] = None
def append(self, chunk: StreamChunk):
"""Add chunk to buffer."""
self._buffer.append(chunk)
def mark_complete(self):
"""Mark stream as complete."""
self._complete = True
def mark_error(self, error: Exception):
"""Mark stream as failed."""
self._error = error
self._complete = True
@property
def content(self) -> str:
"""Get accumulated content."""
return "".join(c.content for c in self._buffer)
@property
def is_complete(self) -> bool:
"""Check if stream is complete."""
return self._complete
@property
def chunk_count(self) -> int:
"""Number of chunks received."""
return len(self._buffer)
class WordBuffer:
"""Buffer that yields complete words."""
def __init__(self):
self._partial = ""
def add(self, text: str) -> list[str]:
"""Add text and return complete words."""
self._partial += text
# Split on whitespace
parts = self._partial.split()
if self._partial.endswith((' ', '\n', '\t')):
# All parts are complete
self._partial = ""
return parts
else:
# Last part might be incomplete
self._partial = parts[-1] if parts else ""
return parts[:-1] if len(parts) > 1 else []
def flush(self) -> str:
"""Get remaining partial content."""
result = self._partial
self._partial = ""
return result
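To make the buffering behaviour concrete, here is what WordBuffer does at a chunk boundary that splits a word (the values follow directly from the implementation above):
# WordBuffer holds back the trailing partial word until more text arrives.
buf = WordBuffer()
print(buf.add("Hello wor"))  # ['Hello']          -- "wor" kept as the partial word
print(buf.add("ld and "))    # ['world', 'and']   -- trailing space completes both words
print(buf.flush())           # ''                 -- nothing left to flush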
class SentenceBuffer:
"""Buffer that yields complete sentences."""
SENTENCE_ENDINGS = re.compile(r'([.!?])\s+')
def __init__(self):
self._partial = ""
def add(self, text: str) -> list[str]:
"""Add text and return complete sentences."""
self._partial += text
sentences = []
while True:
match = self.SENTENCE_ENDINGS.search(self._partial)
if not match:
break
end_pos = match.end()
sentences.append(self._partial[:end_pos].strip())
self._partial = self._partial[end_pos:]
return sentences
def flush(self) -> str:
"""Get remaining partial content."""
result = self._partial.strip()
self._partial = ""
return result
class LineBuffer:
"""Buffer that yields complete lines."""
def __init__(self):
self._partial = ""
def add(self, text: str) -> list[str]:
"""Add text and return complete lines."""
self._partial += text
lines = self._partial.split('\n')
if self._partial.endswith('\n'):
self._partial = ""
return lines[:-1] # Exclude empty string after final newline
else:
self._partial = lines[-1]
return lines[:-1]
def flush(self) -> str:
"""Get remaining partial content."""
result = self._partial
self._partial = ""
return result
class BufferedStreamProcessor:
"""Process stream with buffering."""
def __init__(
self,
buffer_type: str = "word" # "word", "sentence", "line"
):
if buffer_type == "word":
self.buffer = WordBuffer()
elif buffer_type == "sentence":
self.buffer = SentenceBuffer()
elif buffer_type == "line":
self.buffer = LineBuffer()
else:
raise ValueError(f"Unknown buffer type: {buffer_type}")
async def process_stream(
self,
stream: AsyncIterator[StreamChunk],
on_item: Callable[[str], None]
) -> str:
"""Process stream with buffering."""
full_content = []
async for chunk in stream:
full_content.append(chunk.content)
items = self.buffer.add(chunk.content)
for item in items:
on_item(item)
# Flush remaining
remaining = self.buffer.flush()
if remaining:
on_item(remaining)
return "".join(full_content)
Stream Processing
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator, Callable
import asyncio
import json
import re
class StreamParser:
"""Parse structured content from streams."""
def __init__(self):
self._buffer = ""
def add(self, text: str) -> list[Any]:
"""Add text and return parsed items."""
raise NotImplementedError
class JSONStreamParser(StreamParser):
"""Parse JSON objects from stream."""
def __init__(self):
super().__init__()
self._brace_count = 0
self._in_string = False
self._escape_next = False
def add(self, text: str) -> list[dict]:
"""Add text and return complete JSON objects."""
objects = []
        for char in text:
            # Ignore any text outside an object (e.g. prose around the JSON)
            if self._brace_count == 0 and char != '{':
                continue
            self._buffer += char
if self._escape_next:
self._escape_next = False
continue
if char == '\\':
self._escape_next = True
continue
if char == '"':
self._in_string = not self._in_string
continue
if self._in_string:
continue
if char == '{':
self._brace_count += 1
elif char == '}':
self._brace_count -= 1
if self._brace_count == 0:
# Complete object
try:
obj = json.loads(self._buffer.strip())
objects.append(obj)
except json.JSONDecodeError:
pass
self._buffer = ""
return objects
class CodeBlockParser(StreamParser):
"""Parse code blocks from stream."""
CODE_BLOCK_START = re.compile(r'```(\w*)\n?')
CODE_BLOCK_END = '```'
def __init__(self):
super().__init__()
self._in_code_block = False
self._language = None
self._code_buffer = ""
def add(self, text: str) -> list[dict]:
"""Add text and return complete code blocks."""
self._buffer += text
blocks = []
while True:
if not self._in_code_block:
# Look for start
match = self.CODE_BLOCK_START.search(self._buffer)
if match:
self._in_code_block = True
self._language = match.group(1) or None
self._buffer = self._buffer[match.end():]
self._code_buffer = ""
else:
break
else:
# Look for end
end_pos = self._buffer.find(self.CODE_BLOCK_END)
if end_pos >= 0:
self._code_buffer += self._buffer[:end_pos]
blocks.append({
"language": self._language,
"code": self._code_buffer.strip()
})
self._buffer = self._buffer[end_pos + 3:]
self._in_code_block = False
else:
self._code_buffer += self._buffer
self._buffer = ""
break
return blocks
class StreamTransformer:
"""Transform stream content in real-time."""
def __init__(self, transform: Callable[[str], str]):
self.transform = transform
async def transform_stream(
self,
stream: AsyncIterator[StreamChunk]
) -> AsyncIterator[StreamChunk]:
"""Transform each chunk."""
async for chunk in stream:
transformed = self.transform(chunk.content)
yield StreamChunk(
content=transformed,
finish_reason=chunk.finish_reason,
index=chunk.index,
model=chunk.model
)
class StreamAggregator:
"""Aggregate multiple streams."""
async def merge_streams(
self,
streams: list[AsyncIterator[StreamChunk]]
) -> AsyncIterator[tuple[int, StreamChunk]]:
"""Merge multiple streams, yielding (stream_index, chunk)."""
async def read_stream(index: int, stream: AsyncIterator[StreamChunk]):
async for chunk in stream:
yield (index, chunk)
# Create tasks for all streams
iterators = [read_stream(i, s) for i, s in enumerate(streams)]
# Use asyncio to merge
pending = set()
for it in iterators:
task = asyncio.create_task(it.__anext__())
pending.add((task, it))
while pending:
done, _ = await asyncio.wait(
[t for t, _ in pending],
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
# Find the iterator for this task
it = next(i for t, i in pending if t == task)
pending = {(t, i) for t, i in pending if t != task}
try:
result = task.result()
yield result
# Schedule next read
new_task = asyncio.create_task(it.__anext__())
pending.add((new_task, it))
except StopAsyncIteration:
pass
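The parsers above slot into a stream the same way as the buffers; this sketch (again assuming StreamingClient and openai_client from earlier) surfaces each fenced code block the moment it closes, while the rest of the answer keeps streaming.
# Usage sketch: extract completed code blocks from an in-flight stream.
async def stream_code_blocks():
    client = StreamingClient(openai_client)
    parser = CodeBlockParser()
    async for chunk in client.stream_iter([
        {"role": "user", "content": "Write a Python function and show an example call"}
    ]):
        for block in parser.add(chunk.content):
            print(f"--- complete {block['language'] or 'code'} block ---")
            print(block["code"])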
Server-Sent Events
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator
import asyncio
import json
@dataclass
class SSEEvent:
"""Server-Sent Event."""
data: str
    event: Optional[str] = None
    id: Optional[str] = None
    retry: Optional[int] = None
def encode(self) -> str:
"""Encode as SSE format."""
lines = []
if self.event:
lines.append(f"event: {self.event}")
if self.id:
lines.append(f"id: {self.id}")
if self.retry:
lines.append(f"retry: {self.retry}")
# Data can be multiline
for line in self.data.split('\n'):
lines.append(f"data: {line}")
return '\n'.join(lines) + '\n\n'
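For reference, the wire format produced by encode() for a typical chunk event looks like this:
# SSEEvent(data='{"content": "Hello"}', event="chunk", id="0").encode() yields:
#
#   event: chunk
#   id: 0
#   data: {"content": "Hello"}
#
# followed by a blank line that terminates the event.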
class SSEStreamAdapter:
"""Adapt LLM stream to SSE format."""
def __init__(self, streaming_client: StreamingClient):
self.client = streaming_client
async def stream_sse(
self,
messages: list[dict],
**kwargs
) -> AsyncIterator[SSEEvent]:
"""Stream as SSE events."""
event_id = 0
async for chunk in self.client.stream_iter(messages, **kwargs):
yield SSEEvent(
data=json.dumps({
"content": chunk.content,
"finish_reason": chunk.finish_reason
}),
event="chunk",
id=str(event_id)
)
event_id += 1
# Send done event
yield SSEEvent(
data=json.dumps({"done": True}),
event="done",
id=str(event_id)
)
# FastAPI SSE endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/v1/stream")
async def stream_completion(messages: list[dict]):
"""Stream completion as SSE."""
adapter = SSEStreamAdapter(streaming_client)
async def generate():
async for event in adapter.stream_sse(messages):
yield event.encode()
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
class SSEClient:
"""Client for consuming SSE streams."""
def __init__(self, http_client: Any):
self.http_client = http_client
    async def stream(
        self, url: str, method: str = "GET", **kwargs
    ) -> AsyncIterator[SSEEvent]:
        """Consume SSE stream (pass method="POST" for the endpoints in this guide)."""
        async with self.http_client.stream(method, url, **kwargs) as response:
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while '\n\n' in buffer:
event_str, buffer = buffer.split('\n\n', 1)
event = self._parse_event(event_str)
if event:
yield event
def _parse_event(self, event_str: str) -> Optional[SSEEvent]:
"""Parse SSE event from string."""
data_lines = []
event = None
event_id = None
retry = None
for line in event_str.split('\n'):
if line.startswith('data: '):
data_lines.append(line[6:])
elif line.startswith('event: '):
event = line[7:]
elif line.startswith('id: '):
event_id = line[4:]
elif line.startswith('retry: '):
retry = int(line[7:])
if data_lines:
return SSEEvent(
data='\n'.join(data_lines),
event=event,
id=event_id,
retry=retry
)
return None
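A consumption sketch for the endpoint above, assuming httpx (any async HTTP client exposing a compatible .stream() context manager and .aiter_text() will do) and SSEClient.stream accepting an HTTP method as in the version above:
# Usage sketch: consume the POST /v1/stream endpoint with httpx.
import httpx

async def consume_sse(messages: list[dict]):
    async with httpx.AsyncClient(base_url="http://localhost:8000", timeout=None) as http:
        sse = SSEClient(http)
        async for event in sse.stream("/v1/stream", method="POST", json=messages):
            if event.event == "done":
                break
            payload = json.loads(event.data)
            print(payload["content"], end="", flush=True)
        print()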
WebSocket Streaming
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator
import asyncio
import json
@dataclass
class WSMessage:
"""WebSocket message."""
type: str
data: Any
def to_json(self) -> str:
return json.dumps({"type": self.type, "data": self.data})
@classmethod
def from_json(cls, text: str) -> "WSMessage":
obj = json.loads(text)
return cls(type=obj["type"], data=obj["data"])
class WebSocketStreamHandler:
"""Handle streaming over WebSocket."""
def __init__(self, streaming_client: StreamingClient):
self.client = streaming_client
    async def handle_connection(self, websocket: Any):
        """Handle a FastAPI/Starlette WebSocket connection."""
        try:
            # Starlette WebSockets expose iter_text()/send_text() rather than the
            # bare send() and iteration of the `websockets` package used client-side.
            async for message in websocket.iter_text():
                msg = WSMessage.from_json(message)
                if msg.type == "complete":
                    await self._handle_completion(websocket, msg.data)
                elif msg.type == "cancel":
                    # Handle cancellation
                    pass
        except Exception as e:
            await websocket.send_text(WSMessage(
                type="error",
                data={"message": str(e)}
            ).to_json())
    async def _handle_completion(self, websocket: Any, data: dict):
        """Handle completion request."""
        messages = data.get("messages", [])
        # Send start event
        await websocket.send_text(WSMessage(
            type="start",
            data={}
        ).to_json())
        # Stream chunks
        async for chunk in self.client.stream_iter(messages):
            await websocket.send_text(WSMessage(
                type="chunk",
                data={
                    "content": chunk.content,
                    "index": chunk.index
                }
            ).to_json())
        # Send done event
        await websocket.send_text(WSMessage(
            type="done",
            data={}
        ).to_json())
# FastAPI WebSocket endpoint
from fastapi import WebSocket
@app.websocket("/v1/ws/stream")
async def websocket_stream(websocket: WebSocket):
"""WebSocket streaming endpoint."""
await websocket.accept()
handler = WebSocketStreamHandler(streaming_client)
await handler.handle_connection(websocket)
class WebSocketStreamClient:
"""Client for WebSocket streaming."""
def __init__(self, url: str):
self.url = url
self.websocket = None
async def connect(self):
"""Connect to WebSocket."""
import websockets
self.websocket = await websockets.connect(self.url)
async def disconnect(self):
"""Disconnect from WebSocket."""
if self.websocket:
await self.websocket.close()
async def stream_completion(
self,
messages: list[dict]
) -> AsyncIterator[str]:
"""Stream completion over WebSocket."""
# Send request
await self.websocket.send(WSMessage(
type="complete",
data={"messages": messages}
).to_json())
# Receive chunks
async for message in self.websocket:
msg = WSMessage.from_json(message)
if msg.type == "chunk":
yield msg.data["content"]
elif msg.type == "done":
break
elif msg.type == "error":
raise Exception(msg.data["message"])
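Putting the WebSocket client to work (assuming the websockets package is installed and a server is running the endpoint above):
# Usage sketch: stream a completion over the WebSocket endpoint above.
async def chat_over_websocket():
    ws_client = WebSocketStreamClient("ws://localhost:8000/v1/ws/stream")
    await ws_client.connect()
    try:
        async for token in ws_client.stream_completion([
            {"role": "user", "content": "Stream me a haiku"}
        ]):
            print(token, end="", flush=True)
        print()
    finally:
        await ws_client.disconnect()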
Production Streaming Service
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import asyncio
import json
app = FastAPI()
# Initialize components
streaming_client = None # Initialize with client
class StreamRequest(BaseModel):
messages: list[dict]
model: str = "gpt-4o-mini"
buffer_type: Optional[str] = None # "word", "sentence", "line"
@app.post("/v1/stream/raw")
async def stream_raw(request: StreamRequest):
"""Stream raw chunks."""
async def generate():
async for chunk in streaming_client.stream_iter(
request.messages,
model=request.model
):
yield f"data: {json.dumps({'content': chunk.content})}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
@app.post("/v1/stream/buffered")
async def stream_buffered(request: StreamRequest):
"""Stream with buffering."""
buffer_type = request.buffer_type or "word"
processor = BufferedStreamProcessor(buffer_type)
async def generate():
items_queue = asyncio.Queue()
async def collect_items():
stream = streaming_client.stream_iter(
request.messages,
model=request.model
)
await processor.process_stream(
stream,
lambda item: items_queue.put_nowait(item)
)
await items_queue.put(None) # Signal done
# Start collection task
task = asyncio.create_task(collect_items())
# Yield items as they arrive
while True:
item = await items_queue.get()
if item is None:
break
yield f"data: {json.dumps({'item': item})}\n\n"
await task
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
@app.post("/v1/stream/json")
async def stream_json_objects(request: StreamRequest):
"""Stream parsed JSON objects."""
parser = JSONStreamParser()
async def generate():
async for chunk in streaming_client.stream_iter(
request.messages,
model=request.model
):
objects = parser.add(chunk.content)
for obj in objects:
yield f"data: {json.dumps({'object': obj})}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- OpenAI Streaming: https://platform.openai.com/docs/api-reference/streaming
- Server-Sent Events: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
- WebSocket Protocol: https://datatracker.ietf.org/doc/html/rfc6455
- FastAPI Streaming: https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
Conclusion
Streaming transforms the user experience of LLM applications from waiting to watching. Start with basic streaming that yields tokens as they arrive—even this simple change dramatically improves perceived responsiveness. Use buffering when you need to process complete units like words, sentences, or lines rather than raw tokens. Implement parsers for structured content like JSON or code blocks that need to be complete before processing. Choose your transport wisely: Server-Sent Events work well for simple one-way streaming, WebSockets enable bidirectional communication for interactive applications. Handle errors gracefully—streams can fail mid-response, and your application needs to recover without losing context. The key insight is that streaming isn’t just about speed—it’s about feedback. Users who see progress feel engaged; users who stare at spinners feel frustrated. Build streaming into your LLM applications from the start, and your users will thank you.
