Introduction: Waiting 10-30 seconds for an LLM response feels like an eternity. Streaming changes everything—users see tokens appear in real-time, creating the illusion of instant response even when generation takes just as long. Beyond UX, streaming enables early termination (stop generating when you have enough), progressive processing (start working with partial responses), and better error handling (detect issues before waiting for full completion). This guide covers implementing streaming across OpenAI, Anthropic, and other providers, handling Server-Sent Events, building streaming UIs, and advanced patterns like streaming structured output.

Basic Streaming with OpenAI
from openai import OpenAI

client = OpenAI()

def stream_completion(prompt: str, system_prompt: str = "") -> str:
    """Stream a completion and print tokens as they arrive."""
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})

    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True
    )

    full_response = ""
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content
    print()  # Newline at end
    return full_response
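One practical gap with stream=True is that, by default, you don't get token usage back. The Chat Completions API can attach a usage object to the final chunk via stream_options; here is a minimal sketch, assuming stream_options={"include_usage": True} is supported by your model and SDK version:

def stream_with_usage(prompt: str):
    """Stream tokens and also capture usage from the final chunk."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        stream_options={"include_usage": True},  # assumption: supported by your SDK version
    )
    full_response = ""
    usage = None
    for chunk in stream:
        # With include_usage, the final chunk carries usage and an empty choices list
        if chunk.choices and chunk.choices[0].delta.content is not None:
            full_response += chunk.choices[0].delta.content
        if chunk.usage is not None:
            usage = chunk.usage
    return full_response, usage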
# Async streaming
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def async_stream_completion(prompt: str) -> str:
    """Async streaming for better concurrency."""
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    full_response = ""
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content
    return full_response
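Where the async client pays off is in running several streamed requests concurrently; each coroutine yields to the event loop while waiting for the next chunk. A short sketch (the prompts are placeholders); note that printed tokens from concurrent streams will interleave, so in practice you would collect rather than print:

async def stream_many(prompts: list[str]) -> list[str]:
    """Stream several completions concurrently and collect the results."""
    return await asyncio.gather(*(async_stream_completion(p) for p in prompts))

# results = asyncio.run(stream_many(["Explain DNS", "Explain TCP", "Explain HTTP/2"]))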
# Stream with callback
def stream_with_callback(
    prompt: str,
    on_token: callable,
    on_complete: callable = None
) -> str:
    """Stream with callbacks for each token."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    full_response = ""
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            on_token(content)
            full_response += content
    if on_complete:
        on_complete(full_response)
    return full_response

# Usage with callback
def print_token(token: str):
    print(token, end="", flush=True)

def on_done(response: str):
    print(f"\n\nTotal length: {len(response)} chars")

stream_with_callback("Explain quantum computing", print_token, on_done)
Streaming with Anthropic
import anthropic

client = anthropic.Anthropic()

def stream_claude(prompt: str) -> str:
    """Stream from Claude."""
    full_response = ""
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
            full_response += text
    return full_response
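If you also want the usage and stop reason without handling raw events yourself, the SDK can hand back the fully assembled message once the stream finishes. A small sketch, assuming the stream helper's get_final_message() method is available in your anthropic SDK version:

def stream_claude_with_final(prompt: str):
    """Stream text, then fetch the assembled message for usage and stop_reason."""
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        final = stream.get_final_message()  # assumption: available in current SDK versions
    print(f"\n[stop_reason={final.stop_reason}, output_tokens={final.usage.output_tokens}]")
    return final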
# With events for more control
def stream_claude_events(prompt: str) -> dict:
    """Stream with event handling."""
    result = {
        "content": "",
        "input_tokens": 0,
        "output_tokens": 0,
        "stop_reason": None
    }
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if hasattr(event.delta, "text"):
                    print(event.delta.text, end="", flush=True)
                    result["content"] += event.delta.text
            elif event.type == "message_delta":
                result["stop_reason"] = event.delta.stop_reason
                result["output_tokens"] = event.usage.output_tokens
            elif event.type == "message_start":
                result["input_tokens"] = event.message.usage.input_tokens
    return result
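The returned dict is convenient for per-request logging; for example:

info = stream_claude_events("Summarize the benefits of streaming in two sentences")
print(f"\n{info['input_tokens']} in / {info['output_tokens']} out, stop={info['stop_reason']}")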
# Async streaming
async def async_stream_claude(prompt: str) -> str:
    """Async Claude streaming."""
    async_client = anthropic.AsyncAnthropic()
    full_response = ""
    async with async_client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
            full_response += text
    return full_response
FastAPI Streaming Endpoint
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

def generate_stream(prompt: str):
    """Generator for the streaming response.

    A plain (sync) generator is fine here: StreamingResponse iterates it in a
    threadpool, so the blocking OpenAI stream doesn't stall the event loop.
    """
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            # Format as a Server-Sent Event
            yield f"data: {json.dumps({'content': content})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(request: Request):
    """Streaming chat endpoint."""
    body = await request.json()
    prompt = body.get("prompt", "")
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
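One thing to plan for: if the provider call fails partway through, the 200 status and the early tokens have already been sent, so the server can only signal the failure in-band. A sketch of one common approach, wrapping the generator and emitting an error event the client can render (swap it into StreamingResponse in place of generate_stream):

def generate_stream_safe(prompt: str):
    """Wrap generate_stream and report mid-stream failures as an SSE error event."""
    try:
        yield from generate_stream(prompt)
    except Exception as exc:  # in practice, catch the provider's specific exception types
        yield f"data: {json.dumps({'error': str(exc)})}\n\n"
        yield "data: [DONE]\n\n"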
# Client-side JavaScript to consume the stream.
# Note: the browser's EventSource API only supports GET requests, so for a POST
# endpoint like this one, read the response body with fetch() instead.
"""
async function streamChat(prompt) {
  const response = await fetch('/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt })
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // SSE lines can be split across network chunks, so keep the partial tail
    const lines = buffer.split('\\n');
    buffer = lines.pop();
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data === '[DONE]') return;
        const parsed = JSON.parse(data);
        document.getElementById('output').textContent += parsed.content;
      }
    }
  }
}
"""
Streaming Structured Output
import json
from typing import Generator

def stream_json_objects(prompt: str) -> Generator[dict, None, None]:
    """Stream and parse JSON objects as they complete."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "Return a JSON array of objects. Each object on its own line."
            },
            {"role": "user", "content": prompt}
        ],
        stream=True
    )
    buffer = ""
    in_object = False
    brace_count = 0
    for chunk in stream:
        if chunk.choices[0].delta.content is None:
            continue
        content = chunk.choices[0].delta.content
        for char in content:
            buffer += char
            if char == '{':
                in_object = True
                brace_count += 1
            elif char == '}':
                brace_count -= 1
                if in_object and brace_count == 0:
                    # Complete object found: it starts at the first '{' in the buffer
                    try:
                        start = buffer.find('{')
                        obj_str = buffer[start:]
                        obj = json.loads(obj_str)
                        yield obj
                        buffer = ""
                    except json.JSONDecodeError:
                        pass
                    in_object = False
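Consuming the generator, each object is available as soon as its closing brace arrives rather than after the whole array finishes; the prompt below is only illustrative:

for item in stream_json_objects("List five programming languages as JSON objects with name and year fields"):
    print(f"Got item: {item}")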
# Partial JSON streaming with incremental parsing
class StreamingJSONParser:
    """Parse JSON incrementally as it streams."""

    def __init__(self):
        self.buffer = ""
        self.parsed_keys = set()

    def feed(self, chunk: str) -> dict:
        """Feed a chunk and return any newly completed fields."""
        self.buffer += chunk
        new_fields = {}
        # Try to parse what we have
        try:
            # Add closing brackets/braces to make the partial document valid JSON
            test_json = self.buffer
            open_braces = test_json.count('{') - test_json.count('}')
            open_brackets = test_json.count('[') - test_json.count(']')
            test_json += ']' * open_brackets + '}' * open_braces
            parsed = json.loads(test_json)
            # Find new complete fields
            for key, value in parsed.items():
                if key not in self.parsed_keys:
                    # Check if value looks complete
                    if isinstance(value, str) and not value.endswith('...'):
                        new_fields[key] = value
                        self.parsed_keys.add(key)
                    elif isinstance(value, (int, float, bool, list)):
                        new_fields[key] = value
                        self.parsed_keys.add(key)
        except json.JSONDecodeError:
            pass
        return new_fields
# Usage
def stream_with_partial_json(prompt: str):
    """Stream and emit partial JSON as fields complete."""
    parser = StreamingJSONParser()
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Return JSON with fields: title, summary, tags, score"},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"},
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            new_fields = parser.feed(chunk.choices[0].delta.content)
            for key, value in new_fields.items():
                print(f"Completed field '{key}': {value}")
                yield key, value
Early Termination and Cancellation
import threading

class CancellableStream:
    """Stream that can be cancelled mid-generation."""

    def __init__(self):
        self.cancelled = False
        self.content = ""

    def cancel(self):
        """Cancel the stream (typically called from another thread or a UI handler)."""
        self.cancelled = True

    def stream(self, prompt: str, max_tokens: int = None) -> str:
        """Stream with cancellation support."""
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        for chunk in stream:
            if self.cancelled:
                print("\n[Cancelled]")
                break
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                self.content += content
                print(content, end="", flush=True)
                # Optional: stop at an approximate limit (word count, not true tokens)
                if max_tokens and len(self.content.split()) >= max_tokens:
                    print("\n[Token limit reached]")
                    break
        return self.content
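The threading import comes into play when cancellation originates outside the streaming loop, such as a timeout or a user clicking stop. A small sketch using threading.Timer to cancel after a few seconds:

# Usage: cancel the stream automatically after 5 seconds
canceller = CancellableStream()
threading.Timer(5.0, canceller.cancel).start()
partial = canceller.stream("Write a detailed history of the internet")
print(f"\nReceived {len(partial)} chars before cancellation")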
# Early termination based on content
def stream_until_condition(prompt: str, stop_condition: callable) -> str:
    """Stream until a condition is met."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    content = ""
    for chunk in stream:
        if chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            content += token
            print(token, end="", flush=True)
            if stop_condition(content):
                print("\n[Condition met, stopping]")
                break
    return content

# Example: Stop when we have a complete code block
def has_complete_code_block(text: str) -> bool:
    """Check if text contains a complete code block."""
    return text.count("```") >= 2

result = stream_until_condition(
    "Write a Python function to sort a list",
    has_complete_code_block
)
References
- OpenAI Streaming: https://platform.openai.com/docs/api-reference/streaming
- Anthropic Streaming: https://docs.anthropic.com/claude/reference/streaming
- Server-Sent Events: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
- FastAPI StreamingResponse: https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
Conclusion
Streaming transforms LLM applications from frustrating wait-and-see experiences into responsive, interactive tools. The perceived latency drops from seconds to milliseconds as users see the first token almost immediately. Beyond UX, streaming enables powerful patterns: early termination saves tokens and cost, progressive parsing lets you act on partial results, and cancellation gives users control. Implementation is straightforward—most providers support streaming with a simple flag. The complexity comes in building robust streaming infrastructure: handling disconnections, parsing partial JSON, managing concurrent streams, and building responsive UIs. Start with basic token streaming, add Server-Sent Events for web applications, then explore advanced patterns like streaming structured output. The investment pays dividends in user satisfaction and application responsiveness.