Streaming Response Patterns: Building Responsive LLM Applications

Introduction: Waiting for complete LLM responses creates poor user experiences. Users stare at loading spinners while models generate hundreds of tokens. Streaming delivers tokens as they’re generated, showing users immediate progress and reducing perceived latency dramatically. But streaming introduces complexity: you need to handle partial responses, buffer tokens for processing, manage connection failures mid-stream, and coordinate streaming with your application logic. This guide covers practical streaming patterns: consuming streaming APIs, buffering and processing tokens in real-time, handling errors during streams, and building streaming systems that feel responsive while remaining robust.

Streaming Response
Streaming Pipeline: Token Chunks, Stream Buffer, Real-time Processing

Basic Streaming

from dataclasses import dataclass, field
from typing import Any, Optional, AsyncIterator, Callable
import asyncio

@dataclass
class StreamChunk:
    """A chunk from a streaming response."""
    
    content: str
    finish_reason: str = None
    index: int = 0
    model: str = None

@dataclass
class StreamResult:
    """Complete result after streaming."""
    
    content: str
    chunks: list[StreamChunk]
    finish_reason: str
    total_tokens: int = 0

class StreamingClient:
    """Client for streaming LLM responses."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
    
    async def stream(
        self,
        messages: list[dict],
        on_chunk: Callable[[StreamChunk], None] = None,
        **kwargs
    ) -> StreamResult:
        """Stream response and optionally process chunks."""
        
        chunks = []
        content_parts = []
        finish_reason = None
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            stream=True,
            **kwargs
        )
        
        async for chunk in response:
            if chunk.choices:
                choice = chunk.choices[0]
                delta = choice.delta
                
                if delta.content:
                    stream_chunk = StreamChunk(
                        content=delta.content,
                        finish_reason=choice.finish_reason,
                        index=len(chunks),
                        model=chunk.model
                    )
                    
                    chunks.append(stream_chunk)
                    content_parts.append(delta.content)
                    
                    if on_chunk:
                        on_chunk(stream_chunk)
                
                if choice.finish_reason:
                    finish_reason = choice.finish_reason
        
        return StreamResult(
            content="".join(content_parts),
            chunks=chunks,
            finish_reason=finish_reason
        )
    
    async def stream_iter(
        self,
        messages: list[dict],
        **kwargs
    ) -> AsyncIterator[StreamChunk]:
        """Yield chunks as async iterator."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            stream=True,
            **kwargs
        )
        
        index = 0
        async for chunk in response:
            if chunk.choices:
                choice = chunk.choices[0]
                delta = choice.delta
                
                if delta.content:
                    yield StreamChunk(
                        content=delta.content,
                        finish_reason=choice.finish_reason,
                        index=index,
                        model=chunk.model
                    )
                    index += 1

# Usage example
async def print_streaming():
    client = StreamingClient(openai_client)
    
    async for chunk in client.stream_iter([
        {"role": "user", "content": "Write a short poem"}
    ]):
        print(chunk.content, end="", flush=True)
    
    print()  # Newline at end

Stream Buffering

from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator, Callable
import asyncio
import re

class StreamBuffer:
    """Buffer for accumulating stream chunks."""
    
    def __init__(self):
        self._buffer = []
        self._complete = False
        self._error: Exception = None
    
    def append(self, chunk: StreamChunk):
        """Add chunk to buffer."""
        self._buffer.append(chunk)
    
    def mark_complete(self):
        """Mark stream as complete."""
        self._complete = True
    
    def mark_error(self, error: Exception):
        """Mark stream as failed."""
        self._error = error
        self._complete = True
    
    @property
    def content(self) -> str:
        """Get accumulated content."""
        return "".join(c.content for c in self._buffer)
    
    @property
    def is_complete(self) -> bool:
        """Check if stream is complete."""
        return self._complete
    
    @property
    def chunk_count(self) -> int:
        """Number of chunks received."""
        return len(self._buffer)

class WordBuffer:
    """Buffer that yields complete words."""
    
    def __init__(self):
        self._partial = ""
    
    def add(self, text: str) -> list[str]:
        """Add text and return complete words."""
        
        self._partial += text
        
        # Split on whitespace
        parts = self._partial.split()
        
        if self._partial.endswith((' ', '\n', '\t')):
            # All parts are complete
            self._partial = ""
            return parts
        else:
            # Last part might be incomplete
            self._partial = parts[-1] if parts else ""
            return parts[:-1] if len(parts) > 1 else []
    
    def flush(self) -> str:
        """Get remaining partial content."""
        result = self._partial
        self._partial = ""
        return result

class SentenceBuffer:
    """Buffer that yields complete sentences."""
    
    SENTENCE_ENDINGS = re.compile(r'([.!?])\s+')
    
    def __init__(self):
        self._partial = ""
    
    def add(self, text: str) -> list[str]:
        """Add text and return complete sentences."""
        
        self._partial += text
        
        sentences = []
        
        while True:
            match = self.SENTENCE_ENDINGS.search(self._partial)
            if not match:
                break
            
            end_pos = match.end()
            sentences.append(self._partial[:end_pos].strip())
            self._partial = self._partial[end_pos:]
        
        return sentences
    
    def flush(self) -> str:
        """Get remaining partial content."""
        result = self._partial.strip()
        self._partial = ""
        return result

class LineBuffer:
    """Buffer that yields complete lines."""
    
    def __init__(self):
        self._partial = ""
    
    def add(self, text: str) -> list[str]:
        """Add text and return complete lines."""
        
        self._partial += text
        
        lines = self._partial.split('\n')
        
        if self._partial.endswith('\n'):
            self._partial = ""
            return lines[:-1]  # Exclude empty string after final newline
        else:
            self._partial = lines[-1]
            return lines[:-1]
    
    def flush(self) -> str:
        """Get remaining partial content."""
        result = self._partial
        self._partial = ""
        return result

class BufferedStreamProcessor:
    """Process stream with buffering."""
    
    def __init__(
        self,
        buffer_type: str = "word"  # "word", "sentence", "line"
    ):
        if buffer_type == "word":
            self.buffer = WordBuffer()
        elif buffer_type == "sentence":
            self.buffer = SentenceBuffer()
        elif buffer_type == "line":
            self.buffer = LineBuffer()
        else:
            raise ValueError(f"Unknown buffer type: {buffer_type}")
    
    async def process_stream(
        self,
        stream: AsyncIterator[StreamChunk],
        on_item: Callable[[str], None]
    ) -> str:
        """Process stream with buffering."""
        
        full_content = []
        
        async for chunk in stream:
            full_content.append(chunk.content)
            
            items = self.buffer.add(chunk.content)
            for item in items:
                on_item(item)
        
        # Flush remaining
        remaining = self.buffer.flush()
        if remaining:
            on_item(remaining)
        
        return "".join(full_content)

Stream Processing

from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator, Callable
import asyncio
import json
import re

class StreamParser:
    """Parse structured content from streams."""
    
    def __init__(self):
        self._buffer = ""
    
    def add(self, text: str) -> list[Any]:
        """Add text and return parsed items."""
        raise NotImplementedError

class JSONStreamParser(StreamParser):
    """Parse JSON objects from stream."""
    
    def __init__(self):
        super().__init__()
        self._brace_count = 0
        self._in_string = False
        self._escape_next = False
    
    def add(self, text: str) -> list[dict]:
        """Add text and return complete JSON objects."""
        
        objects = []
        
        for char in text:
            self._buffer += char
            
            if self._escape_next:
                self._escape_next = False
                continue
            
            if char == '\\':
                self._escape_next = True
                continue
            
            if char == '"':
                self._in_string = not self._in_string
                continue
            
            if self._in_string:
                continue
            
            if char == '{':
                self._brace_count += 1
            elif char == '}':
                self._brace_count -= 1
                
                if self._brace_count == 0:
                    # Complete object
                    try:
                        obj = json.loads(self._buffer.strip())
                        objects.append(obj)
                    except json.JSONDecodeError:
                        pass
                    
                    self._buffer = ""
        
        return objects

class CodeBlockParser(StreamParser):
    """Parse code blocks from stream."""
    
    CODE_BLOCK_START = re.compile(r'```(\w*)\n?')
    CODE_BLOCK_END = '```'
    
    def __init__(self):
        super().__init__()
        self._in_code_block = False
        self._language = None
        self._code_buffer = ""
    
    def add(self, text: str) -> list[dict]:
        """Add text and return complete code blocks."""
        
        self._buffer += text
        blocks = []
        
        while True:
            if not self._in_code_block:
                # Look for start
                match = self.CODE_BLOCK_START.search(self._buffer)
                if match:
                    self._in_code_block = True
                    self._language = match.group(1) or None
                    self._buffer = self._buffer[match.end():]
                    self._code_buffer = ""
                else:
                    break
            else:
                # Look for end
                end_pos = self._buffer.find(self.CODE_BLOCK_END)
                if end_pos >= 0:
                    self._code_buffer += self._buffer[:end_pos]
                    blocks.append({
                        "language": self._language,
                        "code": self._code_buffer.strip()
                    })
                    self._buffer = self._buffer[end_pos + 3:]
                    self._in_code_block = False
                else:
                    self._code_buffer += self._buffer
                    self._buffer = ""
                    break
        
        return blocks

class StreamTransformer:
    """Transform stream content in real-time."""
    
    def __init__(self, transform: Callable[[str], str]):
        self.transform = transform
    
    async def transform_stream(
        self,
        stream: AsyncIterator[StreamChunk]
    ) -> AsyncIterator[StreamChunk]:
        """Transform each chunk."""
        
        async for chunk in stream:
            transformed = self.transform(chunk.content)
            yield StreamChunk(
                content=transformed,
                finish_reason=chunk.finish_reason,
                index=chunk.index,
                model=chunk.model
            )

class StreamAggregator:
    """Aggregate multiple streams."""
    
    async def merge_streams(
        self,
        streams: list[AsyncIterator[StreamChunk]]
    ) -> AsyncIterator[tuple[int, StreamChunk]]:
        """Merge multiple streams, yielding (stream_index, chunk)."""
        
        async def read_stream(index: int, stream: AsyncIterator[StreamChunk]):
            async for chunk in stream:
                yield (index, chunk)
        
        # Create tasks for all streams
        iterators = [read_stream(i, s) for i, s in enumerate(streams)]
        
        # Use asyncio to merge
        pending = set()
        for it in iterators:
            task = asyncio.create_task(it.__anext__())
            pending.add((task, it))
        
        while pending:
            done, _ = await asyncio.wait(
                [t for t, _ in pending],
                return_when=asyncio.FIRST_COMPLETED
            )
            
            for task in done:
                # Find the iterator for this task
                it = next(i for t, i in pending if t == task)
                pending = {(t, i) for t, i in pending if t != task}
                
                try:
                    result = task.result()
                    yield result
                    
                    # Schedule next read
                    new_task = asyncio.create_task(it.__anext__())
                    pending.add((new_task, it))
                    
                except StopAsyncIteration:
                    pass

Server-Sent Events

from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator
import asyncio
import json

@dataclass
class SSEEvent:
    """Server-Sent Event."""
    
    data: str
    event: str = None
    id: str = None
    retry: int = None
    
    def encode(self) -> str:
        """Encode as SSE format."""
        
        lines = []
        
        if self.event:
            lines.append(f"event: {self.event}")
        
        if self.id:
            lines.append(f"id: {self.id}")
        
        if self.retry:
            lines.append(f"retry: {self.retry}")
        
        # Data can be multiline
        for line in self.data.split('\n'):
            lines.append(f"data: {line}")
        
        return '\n'.join(lines) + '\n\n'

class SSEStreamAdapter:
    """Adapt LLM stream to SSE format."""
    
    def __init__(self, streaming_client: StreamingClient):
        self.client = streaming_client
    
    async def stream_sse(
        self,
        messages: list[dict],
        **kwargs
    ) -> AsyncIterator[SSEEvent]:
        """Stream as SSE events."""
        
        event_id = 0
        
        async for chunk in self.client.stream_iter(messages, **kwargs):
            yield SSEEvent(
                data=json.dumps({
                    "content": chunk.content,
                    "finish_reason": chunk.finish_reason
                }),
                event="chunk",
                id=str(event_id)
            )
            event_id += 1
        
        # Send done event
        yield SSEEvent(
            data=json.dumps({"done": True}),
            event="done",
            id=str(event_id)
        )

# FastAPI SSE endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/v1/stream")
async def stream_completion(messages: list[dict]):
    """Stream completion as SSE."""
    
    adapter = SSEStreamAdapter(streaming_client)
    
    async def generate():
        async for event in adapter.stream_sse(messages):
            yield event.encode()
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )

class SSEClient:
    """Client for consuming SSE streams."""
    
    def __init__(self, http_client: Any):
        self.http_client = http_client
    
    async def stream(self, url: str, **kwargs) -> AsyncIterator[SSEEvent]:
        """Consume SSE stream."""
        
        async with self.http_client.stream("GET", url, **kwargs) as response:
            buffer = ""
            
            async for chunk in response.aiter_text():
                buffer += chunk
                
                while '\n\n' in buffer:
                    event_str, buffer = buffer.split('\n\n', 1)
                    event = self._parse_event(event_str)
                    if event:
                        yield event
    
    def _parse_event(self, event_str: str) -> Optional[SSEEvent]:
        """Parse SSE event from string."""
        
        data_lines = []
        event = None
        event_id = None
        retry = None
        
        for line in event_str.split('\n'):
            if line.startswith('data: '):
                data_lines.append(line[6:])
            elif line.startswith('event: '):
                event = line[7:]
            elif line.startswith('id: '):
                event_id = line[4:]
            elif line.startswith('retry: '):
                retry = int(line[7:])
        
        if data_lines:
            return SSEEvent(
                data='\n'.join(data_lines),
                event=event,
                id=event_id,
                retry=retry
            )
        
        return None

WebSocket Streaming

from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator
import asyncio
import json

@dataclass
class WSMessage:
    """WebSocket message."""
    
    type: str
    data: Any
    
    def to_json(self) -> str:
        return json.dumps({"type": self.type, "data": self.data})
    
    @classmethod
    def from_json(cls, text: str) -> "WSMessage":
        obj = json.loads(text)
        return cls(type=obj["type"], data=obj["data"])

class WebSocketStreamHandler:
    """Handle streaming over WebSocket."""
    
    def __init__(self, streaming_client: StreamingClient):
        self.client = streaming_client
    
    async def handle_connection(self, websocket: Any):
        """Handle WebSocket connection."""
        
        try:
            async for message in websocket:
                msg = WSMessage.from_json(message)
                
                if msg.type == "complete":
                    await self._handle_completion(websocket, msg.data)
                elif msg.type == "cancel":
                    # Handle cancellation
                    pass
                    
        except Exception as e:
            await websocket.send(WSMessage(
                type="error",
                data={"message": str(e)}
            ).to_json())
    
    async def _handle_completion(self, websocket: Any, data: dict):
        """Handle completion request."""
        
        messages = data.get("messages", [])
        
        # Send start event
        await websocket.send(WSMessage(
            type="start",
            data={}
        ).to_json())
        
        # Stream chunks
        async for chunk in self.client.stream_iter(messages):
            await websocket.send(WSMessage(
                type="chunk",
                data={
                    "content": chunk.content,
                    "index": chunk.index
                }
            ).to_json())
        
        # Send done event
        await websocket.send(WSMessage(
            type="done",
            data={}
        ).to_json())

# FastAPI WebSocket endpoint
from fastapi import WebSocket

@app.websocket("/v1/ws/stream")
async def websocket_stream(websocket: WebSocket):
    """WebSocket streaming endpoint."""
    
    await websocket.accept()
    
    handler = WebSocketStreamHandler(streaming_client)
    await handler.handle_connection(websocket)

class WebSocketStreamClient:
    """Client for WebSocket streaming."""
    
    def __init__(self, url: str):
        self.url = url
        self.websocket = None
    
    async def connect(self):
        """Connect to WebSocket."""
        import websockets
        self.websocket = await websockets.connect(self.url)
    
    async def disconnect(self):
        """Disconnect from WebSocket."""
        if self.websocket:
            await self.websocket.close()
    
    async def stream_completion(
        self,
        messages: list[dict]
    ) -> AsyncIterator[str]:
        """Stream completion over WebSocket."""
        
        # Send request
        await self.websocket.send(WSMessage(
            type="complete",
            data={"messages": messages}
        ).to_json())
        
        # Receive chunks
        async for message in self.websocket:
            msg = WSMessage.from_json(message)
            
            if msg.type == "chunk":
                yield msg.data["content"]
            elif msg.type == "done":
                break
            elif msg.type == "error":
                raise Exception(msg.data["message"])

Production Streaming Service

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import asyncio

app = FastAPI()

# Initialize components
streaming_client = None  # Initialize with client

class StreamRequest(BaseModel):
    messages: list[dict]
    model: str = "gpt-4o-mini"
    buffer_type: Optional[str] = None  # "word", "sentence", "line"

@app.post("/v1/stream/raw")
async def stream_raw(request: StreamRequest):
    """Stream raw chunks."""
    
    async def generate():
        async for chunk in streaming_client.stream_iter(
            request.messages,
            model=request.model
        ):
            yield f"data: {json.dumps({'content': chunk.content})}\n\n"
        
        yield f"data: {json.dumps({'done': True})}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

@app.post("/v1/stream/buffered")
async def stream_buffered(request: StreamRequest):
    """Stream with buffering."""
    
    buffer_type = request.buffer_type or "word"
    processor = BufferedStreamProcessor(buffer_type)
    
    async def generate():
        items_queue = asyncio.Queue()
        
        async def collect_items():
            stream = streaming_client.stream_iter(
                request.messages,
                model=request.model
            )
            
            await processor.process_stream(
                stream,
                lambda item: items_queue.put_nowait(item)
            )
            await items_queue.put(None)  # Signal done
        
        # Start collection task
        task = asyncio.create_task(collect_items())
        
        # Yield items as they arrive
        while True:
            item = await items_queue.get()
            if item is None:
                break
            yield f"data: {json.dumps({'item': item})}\n\n"
        
        await task
        yield f"data: {json.dumps({'done': True})}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

@app.post("/v1/stream/json")
async def stream_json_objects(request: StreamRequest):
    """Stream parsed JSON objects."""
    
    parser = JSONStreamParser()
    
    async def generate():
        async for chunk in streaming_client.stream_iter(
            request.messages,
            model=request.model
        ):
            objects = parser.add(chunk.content)
            for obj in objects:
                yield f"data: {json.dumps({'object': obj})}\n\n"
        
        yield f"data: {json.dumps({'done': True})}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Streaming transforms the user experience of LLM applications from waiting to watching. Start with basic streaming that yields tokens as they arrive—even this simple change dramatically improves perceived responsiveness. Use buffering when you need to process complete units like words, sentences, or lines rather than raw tokens. Implement parsers for structured content like JSON or code blocks that need to be complete before processing. Choose your transport wisely: Server-Sent Events work well for simple one-way streaming, WebSockets enable bidirectional communication for interactive applications. Handle errors gracefully—streams can fail mid-response, and your application needs to recover without losing context. The key insight is that streaming isn’t just about speed—it’s about feedback. Users who see progress feel engaged; users who stare at spinners feel frustrated. Build streaming into your LLM applications from the start, and your users will thank you.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.