
Streaming LLM Responses: Building Real-Time AI Applications

Introduction: Waiting 10-30 seconds for an LLM response feels like an eternity. Streaming changes everything—users see tokens appear in real-time, creating the illusion of instant response even when generation takes just as long. Beyond UX, streaming enables early termination (stop generating when you have enough), progressive processing (start working with partial responses), and better error handling (detect issues before waiting for full completion). This guide covers implementing streaming across OpenAI, Anthropic, and other providers, handling Server-Sent Events, building streaming UIs, and advanced patterns like streaming structured output.

[Figure: LLM streaming, real-time token delivery to the UI]

Basic Streaming with OpenAI

from openai import OpenAI

client = OpenAI()

def stream_completion(prompt: str, system_prompt: str = "") -> str:
    """Stream a completion and print tokens as they arrive."""
    
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})
    
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True
    )
    
    full_response = ""
    
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content
    
    print()  # Newline at end
    return full_response

# Async streaming
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def async_stream_completion(prompt: str) -> str:
    """Async streaming for better concurrency."""
    
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content
    
    return full_response

# Stream with callback
from typing import Callable, Optional

def stream_with_callback(
    prompt: str,
    on_token: Callable[[str], None],
    on_complete: Optional[Callable[[str], None]] = None
) -> str:
    """Stream with callbacks for each token."""
    
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            on_token(content)
            full_response += content
    
    if on_complete:
        on_complete(full_response)
    
    return full_response

# Usage with callback
def print_token(token: str):
    print(token, end="", flush=True)

def on_done(response: str):
    print(f"\n\nTotal length: {len(response)} chars")

stream_with_callback("Explain quantum computing", print_token, on_done)

Streaming with Anthropic

import anthropic

client = anthropic.Anthropic()

def stream_claude(prompt: str) -> str:
    """Stream from Claude."""
    
    full_response = ""
    
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
            full_response += text
    
    return full_response

# With events for more control
def stream_claude_events(prompt: str) -> dict:
    """Stream with event handling."""
    
    result = {
        "content": "",
        "input_tokens": 0,
        "output_tokens": 0,
        "stop_reason": None
    }
    
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if hasattr(event.delta, "text"):
                    print(event.delta.text, end="", flush=True)
                    result["content"] += event.delta.text
            
            elif event.type == "message_delta":
                result["stop_reason"] = event.delta.stop_reason
                result["output_tokens"] = event.usage.output_tokens
            
            elif event.type == "message_start":
                result["input_tokens"] = event.message.usage.input_tokens
    
    return result

# Async streaming
async def async_stream_claude(prompt: str) -> str:
    """Async Claude streaming."""
    
    async_client = anthropic.AsyncAnthropic()
    
    full_response = ""
    
    async with async_client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
            full_response += text
    
    return full_response
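
The event-based variant also returns token counts and the stop reason, which is useful for per-request cost logging. A quick usage sketch:

result = stream_claude_events("Explain streaming in two sentences.")
print(f"\nStop reason: {result['stop_reason']}")
print(f"Tokens: {result['input_tokens']} in, {result['output_tokens']} out")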

FastAPI Streaming Endpoint

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json

app = FastAPI()
client = AsyncOpenAI()

async def generate_stream(prompt: str):
    """Async generator for the streaming response."""
    
    # Use the async client and `async for` so the event loop is not blocked
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            # Format as Server-Sent Event
            yield f"data: {json.dumps({'content': content})}\n\n"
    
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(request: Request):
    """Streaming chat endpoint."""
    
    body = await request.json()
    prompt = body.get("prompt", "")
    
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

# Client-side JavaScript to consume
"""
const eventSource = new EventSource('/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt: 'Hello!' })
});

// Or with fetch for POST
async function streamChat(prompt) {
    const response = await fetch('/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ prompt })
    });
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value, { stream: true });  // stream:true handles multi-byte chars split across reads
        const lines = chunk.split('\\n');
        
        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6);
                if (data === '[DONE]') return;
                
                const parsed = JSON.parse(data);
                document.getElementById('output').textContent += parsed.content;
            }
        }
    }
}
"""

Streaming Structured Output

import json
from typing import Generator

def stream_json_objects(prompt: str) -> Generator[dict, None, None]:
    """Stream and parse JSON objects as they complete."""
    
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "Return a JSON array of objects. Each object on its own line."
            },
            {"role": "user", "content": prompt}
        ],
        stream=True
    )
    
    buffer = ""
    in_object = False
    brace_count = 0
    
    for chunk in stream:
        if chunk.choices[0].delta.content is None:
            continue
        
        content = chunk.choices[0].delta.content
        
        for char in content:
            buffer += char
            
            if char == '{':
                in_object = True
                brace_count += 1
            elif char == '}':
                brace_count -= 1
                
                if in_object and brace_count == 0:
                    # Complete object found
                    try:
                        # Find the start of the object
                        start = buffer.rfind('{')
                        obj_str = buffer[start:]
                        obj = json.loads(obj_str)
                        yield obj
                        buffer = ""
                    except json.JSONDecodeError:
                        pass
                    
                    in_object = False

# Partial JSON streaming with incremental parsing
class StreamingJSONParser:
    """Parse JSON incrementally as it streams."""
    
    def __init__(self):
        self.buffer = ""
        self.parsed_keys = set()
    
    def feed(self, chunk: str) -> dict:
        """Feed a chunk and return any newly completed fields."""
        self.buffer += chunk
        
        new_fields = {}
        
        # Try to parse what we have
        try:
            # Add closing braces to make valid JSON
            test_json = self.buffer
            open_braces = test_json.count('{') - test_json.count('}')
            open_brackets = test_json.count('[') - test_json.count(']')
            
            test_json += ']' * open_brackets + '}' * open_braces
            
            parsed = json.loads(test_json)
            
            # Find new complete fields
            for key, value in parsed.items():
                if key not in self.parsed_keys:
                    # Check if value looks complete
                    if isinstance(value, str) and not value.endswith('...'):
                        new_fields[key] = value
                        self.parsed_keys.add(key)
                    elif isinstance(value, (int, float, bool, list)):
                        new_fields[key] = value
                        self.parsed_keys.add(key)
        
        except json.JSONDecodeError:
            pass
        
        return new_fields

# Usage
def stream_with_partial_json(prompt: str):
    """Stream and emit partial JSON as fields complete."""
    
    parser = StreamingJSONParser()
    
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Return JSON with fields: title, summary, tags, score"},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"},
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            new_fields = parser.feed(chunk.choices[0].delta.content)
            
            for key, value in new_fields.items():
                print(f"Completed field '{key}': {value}")
                yield key, value
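
Yielding fields as they complete lets a UI fill in sections independently instead of waiting for the whole object. A usage sketch (update_ui_panel is a hypothetical rendering helper):

def update_ui_panel(field: str, value):
    """Hypothetical helper standing in for a real UI update."""
    print(f"[UI] {field} -> {value}")

for field, value in stream_with_partial_json("Review this article about solar power"):
    update_ui_panel(field, value)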

Early Termination and Cancellation

# cancel() is meant to be called from another thread (see the timer example below)

class CancellableStream:
    """Stream that can be cancelled mid-generation."""
    
    def __init__(self):
        self.cancelled = False
        self.content = ""
    
    def cancel(self):
        """Cancel the stream."""
        self.cancelled = True
    
    def stream(self, prompt: str, max_tokens: int | None = None) -> str:
        """Stream with cancellation support."""
        
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        
        for chunk in stream:
            if self.cancelled:
                print("\n[Cancelled]")
                break
            
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                self.content += content
                print(content, end="", flush=True)
                
                # Optional: stop at an approximate limit (counts words, not true tokens)
                if max_tokens and len(self.content.split()) >= max_tokens:
                    print("\n[Token limit reached]")
                    break
        
        return self.content

# Early termination based on content
from typing import Callable

def stream_until_condition(prompt: str, stop_condition: Callable[[str], bool]) -> str:
    """Stream until a condition is met."""
    
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    content = ""
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            content += token
            print(token, end="", flush=True)
            
            if stop_condition(content):
                print("\n[Condition met, stopping]")
                break
    
    return content

# Example: Stop when we have a complete code block
def has_complete_code_block(text: str) -> bool:
    """Check if text contains a complete code block."""
    return text.count("```") >= 2

result = stream_until_condition(
    "Write a Python function to sort a list",
    has_complete_code_block
)
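
Because cancel() only flips a flag that the streaming loop checks, it is safe to call from another thread. A sketch using threading.Timer to cancel after a timeout:

import threading

cancellable = CancellableStream()
timer = threading.Timer(5.0, cancellable.cancel)  # cancel after 5 seconds
timer.start()

text = cancellable.stream("Write a long essay on the history of computing")
timer.cancel()  # no-op if the timer already fired
print(f"\nReceived {len(text)} chars before stopping")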


Conclusion

Streaming transforms LLM applications from frustrating wait-and-see experiences into responsive, interactive tools. The perceived latency drops from seconds to milliseconds as users see the first token almost immediately. Beyond UX, streaming enables powerful patterns: early termination saves tokens and cost, progressive parsing lets you act on partial results, and cancellation gives users control. Implementation is straightforward—most providers support streaming with a simple flag. The complexity comes in building robust streaming infrastructure: handling disconnections, parsing partial JSON, managing concurrent streams, and building responsive UIs. Start with basic token streaming, add Server-Sent Events for web applications, then explore advanced patterns like streaming structured output. The investment pays dividends in user satisfaction and application responsiveness.

