Streaming LLM Responses: SSE, WebSockets, and Real-Time Token Delivery

Introduction: Streaming responses dramatically improve perceived latency in LLM applications. Instead of waiting seconds for a complete response, users see tokens appear in real-time, creating a more engaging experience. Implementing streaming correctly requires understanding Server-Sent Events (SSE), handling partial tokens, managing connection lifecycle, and gracefully handling errors mid-stream. This guide covers practical streaming patterns: basic streaming with OpenAI, buffering strategies, SSE implementation with FastAPI, WebSocket alternatives, and client-side consumption.

[Figure: streaming pipeline from the LLM API through a token buffer to the SSE stream]

Basic Streaming with OpenAI

from openai import OpenAI
from typing import Generator, AsyncGenerator

client = OpenAI()

def stream_completion(
    prompt: str,
    model: str = "gpt-4o-mini"
) -> Generator[str, None, None]:
    """Stream completion tokens synchronously."""
    
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    for chunk in response:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# Usage
for token in stream_completion("Explain quantum computing"):
    print(token, end="", flush=True)

# Async streaming
async def async_stream_completion(
    prompt: str,
    model: str = "gpt-4o-mini"
) -> AsyncGenerator[str, None]:
    """Stream completion tokens asynchronously."""
    
    from openai import AsyncOpenAI
    async_client = AsyncOpenAI()
    
    response = await async_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    async for chunk in response:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
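
# Usage (async, minimal sketch): drive the async generator from an event loop
import asyncio

async def _demo_async():
    async for token in async_stream_completion("Explain quantum computing"):
        print(token, end="", flush=True)

asyncio.run(_demo_async())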

# Streaming with metadata
from dataclasses import dataclass
from typing import Optional

@dataclass
class StreamChunk:
    """A chunk from the stream with metadata."""
    
    content: str
    finish_reason: Optional[str] = None
    model: Optional[str] = None
    index: int = 0

def stream_with_metadata(
    prompt: str,
    model: str = "gpt-4o-mini"
) -> Generator[StreamChunk, None, None]:
    """Stream with full chunk metadata."""
    
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    index = 0
    
    for chunk in response:
        choice = chunk.choices[0]
        
        yield StreamChunk(
            content=choice.delta.content or "",
            finish_reason=choice.finish_reason,
            model=chunk.model,
            index=index
        )
        
        index += 1

# Collect full response while streaming
def stream_and_collect(
    prompt: str,
    model: str = "gpt-4o-mini"
) -> tuple[Generator[str, None, None], list[str]]:
    """Stream tokens and collect full response."""
    
    collected = []
    
    def generator():
        for token in stream_completion(prompt, model):
            collected.append(token)
            yield token
    
    return generator(), collected
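
# Usage (sketch): the collected list fills only as the generator is consumed
gen, collected = stream_and_collect("Summarize the theory of relativity")
for token in gen:
    print(token, end="", flush=True)
full_response = "".join(collected)  # complete text, available once iteration finishes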

Token Buffering Strategies

from typing import Callable, Optional

class TokenBuffer:
    """Buffer tokens for processing."""
    
    def __init__(self, flush_on: Optional[list[str]] = None):
        self.buffer = ""
        self.flush_triggers = flush_on or [". ", "! ", "? ", "\n"]
    
    def add(self, token: str) -> list[str]:
        """Add token and return complete segments."""
        
        self.buffer += token
        segments = []
        
        # Check for flush triggers
        for trigger in self.flush_triggers:
            while trigger in self.buffer:
                idx = self.buffer.index(trigger) + len(trigger)
                segments.append(self.buffer[:idx])
                self.buffer = self.buffer[idx:]
        
        return segments
    
    def flush(self) -> str:
        """Flush remaining buffer."""
        
        remaining = self.buffer
        self.buffer = ""
        return remaining
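
# Usage (sketch): flush buffered text whenever a trigger substring appears
token_buffer = TokenBuffer()
for token in stream_completion("Explain photosynthesis"):
    for segment in token_buffer.add(token):
        print(f"[Segment]: {segment}")
remaining = token_buffer.flush()
if remaining:
    print(f"[Segment]: {remaining}")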

class SentenceBuffer:
    """Buffer that yields complete sentences."""
    
    def __init__(self):
        self.buffer = ""
        self.sentence_endings = ".!?"
    
    def add(self, token: str) -> list[str]:
        """Add token and return complete sentences."""
        
        self.buffer += token
        sentences = []
        
        i = 0
        while i < len(self.buffer):
            if self.buffer[i] in self.sentence_endings:
                # Check for sentence end (followed by space or end)
                if i + 1 >= len(self.buffer) or self.buffer[i + 1] in " \n":
                    sentence = self.buffer[:i + 1].strip()
                    if sentence:
                        sentences.append(sentence)
                    self.buffer = self.buffer[i + 1:].lstrip()
                    i = 0
                    continue
            i += 1
        
        return sentences
    
    def flush(self) -> str:
        remaining = self.buffer.strip()
        self.buffer = ""
        return remaining

class WordBuffer:
    """Buffer that yields complete words."""
    
    def __init__(self, min_words: int = 1):
        self.buffer = ""
        self.min_words = min_words
    
    def add(self, token: str) -> list[str]:
        """Add token and return complete words."""
        
        self.buffer += token
        words = []
        
        # Split on whitespace (note: this normalizes spacing between words)
        parts = self.buffer.split()
        
        if len(parts) > self.min_words:
            # Keep the last part, which may still be an incomplete word
            words = parts[:-1]
            self.buffer = parts[-1]
        
        return words
    
    def flush(self) -> str:
        remaining = self.buffer.strip()
        self.buffer = ""
        return remaining
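
# Usage (sketch): emit words as soon as they are complete
word_buffer = WordBuffer(min_words=1)
for token in stream_completion("List three primary colors"):
    for word in word_buffer.add(token):
        print(word)
last_word = word_buffer.flush()
if last_word:
    print(last_word)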

# Streaming with buffering
def stream_sentences(
    prompt: str,
    model: str = "gpt-4o-mini"
) -> Generator[str, None, None]:
    """Stream complete sentences."""
    
    buffer = SentenceBuffer()
    
    for token in stream_completion(prompt, model):
        sentences = buffer.add(token)
        for sentence in sentences:
            yield sentence
    
    # Flush remaining
    remaining = buffer.flush()
    if remaining:
        yield remaining

# Usage
for sentence in stream_sentences("Tell me a short story"):
    print(f"[Sentence]: {sentence}")

Server-Sent Events (SSE) with FastAPI

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import json
import asyncio

app = FastAPI()

async def generate_sse_stream(prompt: str):
    """Generate SSE events from LLM stream."""
    
    async for token in async_stream_completion(prompt):
        # SSE format: data: {json}\n\n
        data = json.dumps({"token": token})
        yield f"data: {data}\n\n"
    
    # Send done event
    yield f"data: {json.dumps({'done': True})}\n\n"

@app.get("/stream")
async def stream_endpoint(prompt: str):
    """SSE streaming endpoint."""
    
    return StreamingResponse(
        generate_sse_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )

# With EventSourceResponse for better SSE support
@app.get("/stream/v2")
async def stream_v2(prompt: str):
    """SSE streaming with EventSourceResponse."""
    
    async def event_generator():
        async for token in async_stream_completion(prompt):
            yield {
                "event": "token",
                "data": json.dumps({"content": token})
            }
        
        yield {
            "event": "done",
            "data": json.dumps({"finished": True})
        }
    
    return EventSourceResponse(event_generator())

# Streaming with progress events
@app.get("/stream/progress")
async def stream_with_progress(prompt: str):
    """Stream with progress updates."""
    
    async def event_generator():
        tokens = []
        
        yield {
            "event": "start",
            "data": json.dumps({"status": "generating"})
        }
        
        async for token in async_stream_completion(prompt):
            tokens.append(token)
            
            yield {
                "event": "token",
                "data": json.dumps({
                    "content": token,
                    "total_tokens": len(tokens)
                })
            }
        
        yield {
            "event": "complete",
            "data": json.dumps({
                "status": "done",
                "total_tokens": len(tokens),
                "full_response": "".join(tokens)
            })
        }
    
    return EventSourceResponse(event_generator())

# Handle client disconnection
@app.get("/stream/safe")
async def safe_stream(request: Request, prompt: str):
    """Stream with disconnection handling."""
    
    async def event_generator():
        try:
            async for token in async_stream_completion(prompt):
                # Check if client disconnected
                if await request.is_disconnected():
                    print("Client disconnected")
                    break
                
                yield {
                    "event": "token",
                    "data": json.dumps({"content": token})
                }
                
        except asyncio.CancelledError:
            print("Stream cancelled")
        
        finally:
            yield {
                "event": "done",
                "data": json.dumps({"finished": True})
            }
    
    return EventSourceResponse(event_generator())

WebSocket Streaming

from fastapi import WebSocket, WebSocketDisconnect
import json

@app.websocket("/ws/stream")
async def websocket_stream(websocket: WebSocket):
    """WebSocket streaming endpoint."""
    
    await websocket.accept()
    
    try:
        while True:
            # Receive prompt from client
            data = await websocket.receive_json()
            prompt = data.get("prompt", "")
            
            if not prompt:
                await websocket.send_json({"error": "No prompt provided"})
                continue
            
            # Stream response
            await websocket.send_json({"type": "start"})
            
            full_response = []
            
            async for token in async_stream_completion(prompt):
                full_response.append(token)
                await websocket.send_json({
                    "type": "token",
                    "content": token
                })
            
            await websocket.send_json({
                "type": "done",
                "full_response": "".join(full_response)
            })
            
    except WebSocketDisconnect:
        print("WebSocket disconnected")

# WebSocket with conversation history
class ConversationWebSocket:
    """WebSocket handler with conversation state."""
    
    def __init__(self):
        self.conversations: dict[str, list[dict]] = {}
    
    async def handle(self, websocket: WebSocket, conversation_id: str):
        """Handle WebSocket connection."""
        
        await websocket.accept()
        
        if conversation_id not in self.conversations:
            self.conversations[conversation_id] = []
        
        messages = self.conversations[conversation_id]
        
        try:
            while True:
                data = await websocket.receive_json()
                user_message = data.get("message", "")
                
                messages.append({"role": "user", "content": user_message})
                
                # Stream response
                assistant_response = []
                
                async for token in self._stream_with_history(messages):
                    assistant_response.append(token)
                    await websocket.send_json({
                        "type": "token",
                        "content": token
                    })
                
                full_response = "".join(assistant_response)
                messages.append({"role": "assistant", "content": full_response})
                
                await websocket.send_json({
                    "type": "done",
                    "message_count": len(messages)
                })
                
        except WebSocketDisconnect:
            pass
    
    async def _stream_with_history(
        self,
        messages: list[dict]
    ) -> AsyncGenerator[str, None]:
        """Stream with conversation history."""
        
        from openai import AsyncOpenAI
        client = AsyncOpenAI()
        
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            stream=True
        )
        
        async for chunk in response:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

conversation_handler = ConversationWebSocket()

@app.websocket("/ws/chat/{conversation_id}")
async def chat_websocket(websocket: WebSocket, conversation_id: str):
    await conversation_handler.handle(websocket, conversation_id)

Client-Side Consumption

# Python client for SSE
import httpx

async def consume_sse_stream(url: str, prompt: str):
    """Consume SSE stream from server."""
    
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "GET",
            url,
            params={"prompt": prompt},
            timeout=None
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    
                    if data.get("done"):
                        break
                    
                    token = data.get("token", "")
                    print(token, end="", flush=True)

# JavaScript client example (for reference)
"""
// EventSource (SSE): the named "token"/"done" events are emitted by the /stream/v2 endpoint
const eventSource = new EventSource(`/stream/v2?prompt=${encodeURIComponent(prompt)}`);

eventSource.addEventListener('token', (event) => {
    const data = JSON.parse(event.data);
    document.getElementById('output').textContent += data.content;
});

eventSource.addEventListener('done', (event) => {
    eventSource.close();
});

eventSource.onerror = (error) => {
    console.error('SSE error:', error);
    eventSource.close();
};

// WebSocket
const ws = new WebSocket('ws://localhost:8000/ws/stream');

ws.onopen = () => {
    ws.send(JSON.stringify({ prompt: 'Hello' }));
};

ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === 'token') {
        document.getElementById('output').textContent += data.content;
    } else if (data.type === 'done') {
        console.log('Stream complete');
    }
};

// Fetch with ReadableStream
async function streamFetch(prompt) {
    const response = await fetch(`/stream?prompt=${encodeURIComponent(prompt)}`);
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const text = decoder.decode(value);
        // Parse SSE format
        const lines = text.split('\\n');
        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = JSON.parse(line.slice(6));
                console.log(data.token);
            }
        }
    }
}
"""

# Streaming with retry logic
class StreamingClient:
    """Client with retry and reconnection."""
    
    def __init__(self, base_url: str, max_retries: int = 3):
        self.base_url = base_url
        self.max_retries = max_retries
    
    async def stream(
        self,
        prompt: str,
        on_token: Optional[Callable[[str], None]] = None
    ) -> str:
        """Stream with automatic retry."""
        
        collected: list[str] = []
        retries = 0
        
        while retries < self.max_retries:
            # Start each attempt fresh so a retry does not duplicate tokens
            collected = []
            
            try:
                async with httpx.AsyncClient() as client:
                    async with client.stream(
                        "GET",
                        f"{self.base_url}/stream",
                        params={"prompt": prompt},
                        timeout=httpx.Timeout(60.0, connect=10.0)
                    ) as response:
                        async for line in response.aiter_lines():
                            if line.startswith("data: "):
                                data = json.loads(line[6:])
                                
                                if data.get("done"):
                                    return "".join(collected)
                                
                                token = data.get("token", "")
                                collected.append(token)
                                
                                if on_token:
                                    on_token(token)
                
                # Stream ended without a "done" event; treat what we have as complete
                return "".join(collected)
                
            except (httpx.ReadTimeout, httpx.ConnectError):
                retries += 1
                if retries >= self.max_retries:
                    raise
                
                # Exponential backoff before reconnecting
                await asyncio.sleep(2 ** retries)
        
        return "".join(collected)

Production Streaming Service

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import time
import uuid

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

class StreamRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o-mini"
    max_tokens: int = 1000

# Metrics tracking
stream_metrics = {
    "active_streams": 0,
    "total_streams": 0,
    "total_tokens": 0
}

@app.post("/v1/stream")
async def stream_completion_endpoint(
    request: Request,
    body: StreamRequest
):
    """Production streaming endpoint."""
    
    stream_id = str(uuid.uuid4())
    start_time = time.time()
    
    stream_metrics["active_streams"] += 1
    stream_metrics["total_streams"] += 1
    
    async def event_generator():
        tokens_generated = 0
        
        try:
            # Send stream start
            yield {
                "event": "stream_start",
                "data": json.dumps({
                    "stream_id": stream_id,
                    "model": body.model
                })
            }
            
            async for token in async_stream_completion(body.prompt, body.model):
                # Check client connection
                if await request.is_disconnected():
                    break
                
                tokens_generated += 1
                
                yield {
                    "event": "token",
                    "data": json.dumps({
                        "content": token,
                        "index": tokens_generated
                    })
                }
                
                # Respect max_tokens
                if tokens_generated >= body.max_tokens:
                    break
            
            # Send completion
            duration = time.time() - start_time
            
            yield {
                "event": "stream_end",
                "data": json.dumps({
                    "stream_id": stream_id,
                    "tokens_generated": tokens_generated,
                    "duration_ms": duration * 1000,
                    "tokens_per_second": tokens_generated / duration if duration > 0 else 0
                })
            }
            
        except Exception as e:
            yield {
                "event": "error",
                "data": json.dumps({
                    "error": str(e),
                    "stream_id": stream_id
                })
            }
        
        finally:
            stream_metrics["active_streams"] -= 1
            stream_metrics["total_tokens"] += tokens_generated
    
    return EventSourceResponse(
        event_generator(),
        headers={
            "X-Stream-ID": stream_id,
            "Cache-Control": "no-cache"
        }
    )

@app.get("/v1/stream/metrics")
async def get_stream_metrics():
    """Get streaming metrics."""
    return stream_metrics

@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "active_streams": stream_metrics["active_streams"]
    }
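
# Consuming the production endpoint (sketch): it is a POST route, so the browser
# EventSource API cannot call it directly; use a streaming HTTP client such as httpx.
# Assumes the app runs locally, e.g. `uvicorn main:app --port 8000`.
async def consume_production_stream(prompt: str):
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/v1/stream",
            json={"prompt": prompt},
            timeout=None
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    payload = json.loads(line[6:])
                    print(payload)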

Conclusion

Streaming dramatically improves user experience in LLM applications by showing responses as they generate. Use Server-Sent Events (SSE) for simple one-way streaming—it’s well-supported and easy to implement. WebSockets are better for bidirectional communication like chat applications. Implement token buffering when you need to process complete sentences or words rather than raw tokens. Handle client disconnections gracefully to avoid wasted API calls. For production, track streaming metrics, implement timeouts, and consider rate limiting per-stream. The key is balancing responsiveness with reliability—users should see tokens immediately while the system handles errors and disconnections gracefully.

