Categories

Archives

A sample text widget

Etiam pulvinar consectetur dolor sed malesuada. Ut convallis euismod dolor nec pretium. Nunc ut tristique massa.

Nam sodales mi vitae dolor ullamcorper et vulputate enim accumsan. Morbi orci magna, tincidunt vitae molestie nec, molestie at mi. Nulla nulla lorem, suscipit in posuere in, interdum non magna.

Multi-turn Conversation Design: Building Natural Dialogue Systems with LLMs

Introduction: Multi-turn conversations are where LLM applications become truly useful. Users don’t just ask single questions—they refine, follow up, reference previous context, and expect the assistant to remember what was discussed. Building effective multi-turn systems requires careful attention to context management, history compression, turn-taking logic, and graceful handling of topic changes. This guide covers practical patterns for designing conversations that feel natural: managing conversation state, compressing long histories, handling context switches, and building conversation flows that guide users toward their goals.

Conversation Design
Conversation Management: Context, History, Response Generation

Conversation State

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from enum import Enum
import uuid

class MessageRole(Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"

@dataclass
class Message:
    """A single message in a conversation."""
    
    role: MessageRole
    content: str
    timestamp: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)
    
    def to_dict(self) -> dict:
        """Convert to API format."""
        return {"role": self.role.value, "content": self.content}

@dataclass
class ConversationState:
    """State of a conversation."""
    
    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    messages: list[Message] = field(default_factory=list)
    system_prompt: str = ""
    context: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
    
    def add_message(self, role: MessageRole, content: str, metadata: dict = None):
        """Add a message to the conversation."""
        
        message = Message(
            role=role,
            content=content,
            metadata=metadata or {}
        )
        self.messages.append(message)
        self.updated_at = datetime.utcnow()
    
    def add_user_message(self, content: str):
        """Add a user message."""
        self.add_message(MessageRole.USER, content)
    
    def add_assistant_message(self, content: str):
        """Add an assistant message."""
        self.add_message(MessageRole.ASSISTANT, content)
    
    def get_messages_for_api(self) -> list[dict]:
        """Get messages in API format."""
        
        messages = []
        
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        
        for msg in self.messages:
            messages.append(msg.to_dict())
        
        return messages
    
    def get_last_n_turns(self, n: int) -> list[Message]:
        """Get last n conversation turns (user + assistant pairs)."""
        
        turns = []
        count = 0
        
        for msg in reversed(self.messages):
            turns.insert(0, msg)
            if msg.role == MessageRole.USER:
                count += 1
                if count >= n:
                    break
        
        return turns
    
    @property
    def turn_count(self) -> int:
        """Count conversation turns."""
        return sum(1 for m in self.messages if m.role == MessageRole.USER)

class ConversationManager:
    """Manage multiple conversations."""
    
    def __init__(self):
        self.conversations: dict[str, ConversationState] = {}
    
    def create(self, system_prompt: str = "") -> ConversationState:
        """Create a new conversation."""
        
        conversation = ConversationState(system_prompt=system_prompt)
        self.conversations[conversation.conversation_id] = conversation
        return conversation
    
    def get(self, conversation_id: str) -> Optional[ConversationState]:
        """Get a conversation by ID."""
        return self.conversations.get(conversation_id)
    
    def delete(self, conversation_id: str):
        """Delete a conversation."""
        if conversation_id in self.conversations:
            del self.conversations[conversation_id]
    
    def list_active(self, max_age_hours: int = 24) -> list[ConversationState]:
        """List recently active conversations."""
        
        cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)
        
        return [
            conv for conv in self.conversations.values()
            if conv.updated_at > cutoff
        ]

History Compression

from dataclasses import dataclass
from typing import Any

@dataclass
class CompressedHistory:
    """Compressed conversation history."""
    
    summary: str
    key_facts: list[str]
    recent_messages: list[Message]
    original_turn_count: int

class HistoryCompressor:
    """Compress conversation history to fit context windows."""
    
    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        max_recent_turns: int = 5,
        max_summary_tokens: int = 500
    ):
        self.client = client
        self.model = model
        self.max_recent_turns = max_recent_turns
        self.max_summary_tokens = max_summary_tokens
    
    async def compress(self, conversation: ConversationState) -> CompressedHistory:
        """Compress conversation history."""
        
        if conversation.turn_count <= self.max_recent_turns:
            # No compression needed
            return CompressedHistory(
                summary="",
                key_facts=[],
                recent_messages=conversation.messages.copy(),
                original_turn_count=conversation.turn_count
            )
        
        # Split into old and recent
        recent = conversation.get_last_n_turns(self.max_recent_turns)
        old_messages = conversation.messages[:-len(recent)]
        
        # Summarize old messages
        summary, key_facts = await self._summarize(old_messages)
        
        return CompressedHistory(
            summary=summary,
            key_facts=key_facts,
            recent_messages=recent,
            original_turn_count=conversation.turn_count
        )
    
    async def _summarize(self, messages: list[Message]) -> tuple[str, list[str]]:
        """Summarize messages and extract key facts."""
        
        conversation_text = "\n".join([
            f"{m.role.value}: {m.content}" for m in messages
        ])
        
        prompt = f"""Summarize this conversation and extract key facts.

Conversation:
{conversation_text}

Provide:
1. A brief summary (2-3 sentences)
2. Key facts the assistant should remember (bullet points)

Format:
SUMMARY: 
KEY FACTS:
- 
- 
..."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.max_summary_tokens
        )
        
        content = response.choices[0].message.content
        
        # Parse response
        summary = ""
        key_facts = []
        
        if "SUMMARY:" in content:
            summary_part = content.split("SUMMARY:")[1]
            if "KEY FACTS:" in summary_part:
                summary = summary_part.split("KEY FACTS:")[0].strip()
            else:
                summary = summary_part.strip()
        
        if "KEY FACTS:" in content:
            facts_part = content.split("KEY FACTS:")[1]
            for line in facts_part.split("\n"):
                line = line.strip()
                if line.startswith("-"):
                    key_facts.append(line[1:].strip())
        
        return summary, key_facts
    
    def build_context_prompt(self, compressed: CompressedHistory) -> str:
        """Build context prompt from compressed history."""
        
        parts = []
        
        if compressed.summary:
            parts.append(f"Previous conversation summary: {compressed.summary}")
        
        if compressed.key_facts:
            facts_text = "\n".join(f"- {f}" for f in compressed.key_facts)
            parts.append(f"Key facts from conversation:\n{facts_text}")
        
        return "\n\n".join(parts)

class SlidingWindowCompressor:
    """Simple sliding window compression."""
    
    def __init__(self, max_messages: int = 20):
        self.max_messages = max_messages
    
    def compress(self, conversation: ConversationState) -> list[Message]:
        """Keep only recent messages."""
        
        if len(conversation.messages) <= self.max_messages:
            return conversation.messages.copy()
        
        return conversation.messages[-self.max_messages:]

class TokenBudgetCompressor:
    """Compress to fit token budget."""
    
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
    
    def compress(self, conversation: ConversationState) -> list[Message]:
        """Keep messages within token budget."""
        
        # Estimate tokens (4 chars per token)
        def estimate_tokens(msg: Message) -> int:
            return len(msg.content) // 4
        
        result = []
        total_tokens = 0
        
        # Work backwards, keeping recent messages
        for msg in reversed(conversation.messages):
            msg_tokens = estimate_tokens(msg)
            
            if total_tokens + msg_tokens > self.max_tokens:
                break
            
            result.insert(0, msg)
            total_tokens += msg_tokens
        
        return result

Context Switching

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class TopicChangeType(Enum):
    CONTINUATION = "continuation"
    REFINEMENT = "refinement"
    NEW_TOPIC = "new_topic"
    RETURN_TO_PREVIOUS = "return_to_previous"

@dataclass
class TopicAnalysis:
    """Analysis of topic change."""
    
    change_type: TopicChangeType
    current_topic: str
    previous_topic: Optional[str]
    confidence: float

class TopicTracker:
    """Track conversation topics."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
        self.topic_history: list[str] = []
    
    async def analyze_message(
        self,
        message: str,
        conversation: ConversationState
    ) -> TopicAnalysis:
        """Analyze if message changes topic."""
        
        if not conversation.messages:
            # First message
            topic = await self._extract_topic(message)
            self.topic_history.append(topic)
            
            return TopicAnalysis(
                change_type=TopicChangeType.NEW_TOPIC,
                current_topic=topic,
                previous_topic=None,
                confidence=1.0
            )
        
        # Get recent context
        recent = conversation.get_last_n_turns(3)
        context = "\n".join([f"{m.role.value}: {m.content}" for m in recent])
        
        prompt = f"""Analyze if this new message changes the conversation topic.

Recent conversation:
{context}

New message: {message}

Previous topics discussed: {', '.join(self.topic_history[-5:]) if self.topic_history else 'None'}

Classify the topic change:
- CONTINUATION: Same topic, continuing discussion
- REFINEMENT: Same topic, but narrowing/clarifying
- NEW_TOPIC: Completely different topic
- RETURN_TO_PREVIOUS: Returning to an earlier topic

Respond with:
CHANGE_TYPE: 
CURRENT_TOPIC: 
CONFIDENCE: <0.0-1.0>"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        content = response.choices[0].message.content
        
        # Parse response
        change_type = TopicChangeType.CONTINUATION
        current_topic = ""
        confidence = 0.8
        
        for line in content.split("\n"):
            if "CHANGE_TYPE:" in line:
                type_str = line.split("CHANGE_TYPE:")[1].strip().upper()
                try:
                    change_type = TopicChangeType[type_str]
                except KeyError:
                    pass
            elif "CURRENT_TOPIC:" in line:
                current_topic = line.split("CURRENT_TOPIC:")[1].strip()
            elif "CONFIDENCE:" in line:
                try:
                    confidence = float(line.split("CONFIDENCE:")[1].strip())
                except ValueError:
                    pass
        
        # Update topic history
        if change_type in [TopicChangeType.NEW_TOPIC, TopicChangeType.REFINEMENT]:
            self.topic_history.append(current_topic)
        
        previous_topic = self.topic_history[-2] if len(self.topic_history) > 1 else None
        
        return TopicAnalysis(
            change_type=change_type,
            current_topic=current_topic,
            previous_topic=previous_topic,
            confidence=confidence
        )
    
    async def _extract_topic(self, message: str) -> str:
        """Extract topic from a message."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "user",
                "content": f"What is the main topic of this message in 3-5 words?\n\n{message}"
            }],
            max_tokens=20
        )
        
        return response.choices[0].message.content.strip()

class ContextSwitchHandler:
    """Handle context switches gracefully."""
    
    def __init__(self, topic_tracker: TopicTracker):
        self.topic_tracker = topic_tracker
        self.saved_contexts: dict[str, dict] = {}
    
    async def handle_switch(
        self,
        message: str,
        conversation: ConversationState
    ) -> dict:
        """Handle potential context switch."""
        
        analysis = await self.topic_tracker.analyze_message(message, conversation)
        
        result = {
            "analysis": analysis,
            "action": "continue",
            "context_update": None
        }
        
        if analysis.change_type == TopicChangeType.NEW_TOPIC:
            # Save current context
            if analysis.previous_topic:
                self.saved_contexts[analysis.previous_topic] = {
                    "messages": conversation.messages.copy(),
                    "context": conversation.context.copy()
                }
            
            result["action"] = "new_topic"
            result["context_update"] = {
                "topic": analysis.current_topic,
                "cleared_history": True
            }
        
        elif analysis.change_type == TopicChangeType.RETURN_TO_PREVIOUS:
            # Restore previous context if available
            if analysis.current_topic in self.saved_contexts:
                saved = self.saved_contexts[analysis.current_topic]
                result["action"] = "restore_context"
                result["context_update"] = {
                    "topic": analysis.current_topic,
                    "restored_messages": len(saved["messages"])
                }
        
        return result

Conversation Flows

from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from enum import Enum

class FlowState(Enum):
    ACTIVE = "active"
    COMPLETED = "completed"
    ABANDONED = "abandoned"

@dataclass
class FlowStep:
    """A step in a conversation flow."""
    
    name: str
    prompt: str
    validator: Optional[Callable[[str], bool]] = None
    extractor: Optional[Callable[[str], Any]] = None
    next_step: Optional[str] = None
    on_invalid: str = "I didn't understand that. Could you try again?"

@dataclass
class ConversationFlow:
    """A guided conversation flow."""
    
    name: str
    steps: dict[str, FlowStep]
    start_step: str
    current_step: str = ""
    collected_data: dict = field(default_factory=dict)
    state: FlowState = FlowState.ACTIVE
    
    def __post_init__(self):
        self.current_step = self.start_step
    
    def get_current_prompt(self) -> str:
        """Get prompt for current step."""
        step = self.steps.get(self.current_step)
        return step.prompt if step else ""
    
    def process_response(self, response: str) -> tuple[bool, str]:
        """Process user response and advance flow."""
        
        step = self.steps.get(self.current_step)
        if not step:
            return False, "Flow error: unknown step"
        
        # Validate response
        if step.validator and not step.validator(response):
            return False, step.on_invalid
        
        # Extract data
        if step.extractor:
            self.collected_data[step.name] = step.extractor(response)
        else:
            self.collected_data[step.name] = response
        
        # Move to next step
        if step.next_step:
            self.current_step = step.next_step
            next_prompt = self.get_current_prompt()
            return True, next_prompt
        else:
            self.state = FlowState.COMPLETED
            return True, "Flow completed"
    
    def is_complete(self) -> bool:
        return self.state == FlowState.COMPLETED

class FlowBuilder:
    """Build conversation flows."""
    
    def __init__(self, name: str):
        self.name = name
        self.steps: dict[str, FlowStep] = {}
        self.start_step: str = ""
    
    def add_step(
        self,
        name: str,
        prompt: str,
        validator: Callable[[str], bool] = None,
        extractor: Callable[[str], Any] = None,
        next_step: str = None,
        on_invalid: str = None
    ) -> 'FlowBuilder':
        """Add a step to the flow."""
        
        step = FlowStep(
            name=name,
            prompt=prompt,
            validator=validator,
            extractor=extractor,
            next_step=next_step,
            on_invalid=on_invalid or "Please try again."
        )
        
        self.steps[name] = step
        
        if not self.start_step:
            self.start_step = name
        
        return self
    
    def build(self) -> ConversationFlow:
        """Build the flow."""
        return ConversationFlow(
            name=self.name,
            steps=self.steps,
            start_step=self.start_step
        )

# Example: Order flow
def create_order_flow() -> ConversationFlow:
    """Create an order-taking flow."""
    
    def validate_email(s: str) -> bool:
        return "@" in s and "." in s
    
    def validate_quantity(s: str) -> bool:
        try:
            return int(s) > 0
        except ValueError:
            return False
    
    return (
        FlowBuilder("order")
        .add_step(
            name="greeting",
            prompt="Welcome! What product would you like to order?",
            next_step="quantity"
        )
        .add_step(
            name="quantity",
            prompt="How many would you like?",
            validator=validate_quantity,
            extractor=lambda s: int(s),
            next_step="email",
            on_invalid="Please enter a valid number."
        )
        .add_step(
            name="email",
            prompt="What's your email address for the confirmation?",
            validator=validate_email,
            next_step="confirm",
            on_invalid="Please enter a valid email address."
        )
        .add_step(
            name="confirm",
            prompt="Great! I'll send the confirmation to your email. Is there anything else?"
        )
        .build()
    )

class FlowManager:
    """Manage conversation flows."""
    
    def __init__(self):
        self.active_flows: dict[str, ConversationFlow] = {}
        self.flow_templates: dict[str, Callable[[], ConversationFlow]] = {}
    
    def register_flow(self, name: str, factory: Callable[[], ConversationFlow]):
        """Register a flow template."""
        self.flow_templates[name] = factory
    
    def start_flow(self, conversation_id: str, flow_name: str) -> Optional[str]:
        """Start a flow for a conversation."""
        
        if flow_name not in self.flow_templates:
            return None
        
        flow = self.flow_templates[flow_name]()
        self.active_flows[conversation_id] = flow
        
        return flow.get_current_prompt()
    
    def process_message(
        self,
        conversation_id: str,
        message: str
    ) -> tuple[bool, str, Optional[dict]]:
        """Process message in active flow."""
        
        flow = self.active_flows.get(conversation_id)
        if not flow:
            return False, "", None
        
        success, response = flow.process_response(message)
        
        if flow.is_complete():
            data = flow.collected_data.copy()
            del self.active_flows[conversation_id]
            return True, response, data
        
        return success, response, None

Production Conversation Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Any

app = FastAPI()

# Initialize components
conversation_manager = ConversationManager()
history_compressor = None  # Initialize with actual client
flow_manager = FlowManager()

# Register flows
flow_manager.register_flow("order", create_order_flow)

llm_client = None  # Initialize with actual client

class CreateConversationRequest(BaseModel):
    system_prompt: str = ""

class SendMessageRequest(BaseModel):
    message: str

class StartFlowRequest(BaseModel):
    flow_name: str

@app.post("/v1/conversations")
async def create_conversation(request: CreateConversationRequest):
    """Create a new conversation."""
    
    conversation = conversation_manager.create(request.system_prompt)
    
    return {
        "conversation_id": conversation.conversation_id,
        "created_at": conversation.created_at.isoformat()
    }

@app.post("/v1/conversations/{conversation_id}/messages")
async def send_message(conversation_id: str, request: SendMessageRequest):
    """Send a message in a conversation."""
    
    conversation = conversation_manager.get(conversation_id)
    if not conversation:
        raise HTTPException(404, "Conversation not found")
    
    # Check for active flow
    in_flow, flow_response, flow_data = flow_manager.process_message(
        conversation_id, request.message
    )
    
    if in_flow:
        conversation.add_user_message(request.message)
        conversation.add_assistant_message(flow_response)
        
        return {
            "response": flow_response,
            "in_flow": True,
            "flow_completed": flow_data is not None,
            "flow_data": flow_data
        }
    
    # Regular conversation
    conversation.add_user_message(request.message)
    
    # Compress history if needed
    if conversation.turn_count > 10 and history_compressor:
        compressed = await history_compressor.compress(conversation)
        context_prompt = history_compressor.build_context_prompt(compressed)
        
        messages = [{"role": "system", "content": conversation.system_prompt}]
        if context_prompt:
            messages.append({"role": "system", "content": context_prompt})
        messages.extend([m.to_dict() for m in compressed.recent_messages])
    else:
        messages = conversation.get_messages_for_api()
    
    # Generate response
    response = await llm_client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    
    assistant_message = response.choices[0].message.content
    conversation.add_assistant_message(assistant_message)
    
    return {
        "response": assistant_message,
        "turn_count": conversation.turn_count,
        "in_flow": False
    }

@app.post("/v1/conversations/{conversation_id}/flows")
async def start_flow(conversation_id: str, request: StartFlowRequest):
    """Start a guided flow."""
    
    conversation = conversation_manager.get(conversation_id)
    if not conversation:
        raise HTTPException(404, "Conversation not found")
    
    prompt = flow_manager.start_flow(conversation_id, request.flow_name)
    
    if not prompt:
        raise HTTPException(400, f"Unknown flow: {request.flow_name}")
    
    conversation.add_assistant_message(prompt)
    
    return {
        "flow_started": request.flow_name,
        "prompt": prompt
    }

@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Get conversation details."""
    
    conversation = conversation_manager.get(conversation_id)
    if not conversation:
        raise HTTPException(404, "Conversation not found")
    
    return {
        "conversation_id": conversation.conversation_id,
        "turn_count": conversation.turn_count,
        "created_at": conversation.created_at.isoformat(),
        "updated_at": conversation.updated_at.isoformat(),
        "messages": [
            {
                "role": m.role.value,
                "content": m.content,
                "timestamp": m.timestamp.isoformat()
            }
            for m in conversation.messages
        ]
    }

@app.get("/v1/conversations/{conversation_id}/history")
async def get_compressed_history(conversation_id: str):
    """Get compressed conversation history."""
    
    conversation = conversation_manager.get(conversation_id)
    if not conversation:
        raise HTTPException(404, "Conversation not found")
    
    if history_compressor:
        compressed = await history_compressor.compress(conversation)
        
        return {
            "summary": compressed.summary,
            "key_facts": compressed.key_facts,
            "recent_turn_count": len([m for m in compressed.recent_messages if m.role == MessageRole.USER]),
            "original_turn_count": compressed.original_turn_count
        }
    
    return {
        "summary": "",
        "key_facts": [],
        "recent_turn_count": conversation.turn_count,
        "original_turn_count": conversation.turn_count
    }

@app.delete("/v1/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation."""
    
    conversation_manager.delete(conversation_id)
    return {"status": "deleted"}

@app.get("/health")
async def health():
    return {"status": "healthy"}

References

Conclusion

Effective multi-turn conversations require thoughtful state management. Track conversation state explicitly—don’t rely on the LLM to remember everything. Compress history intelligently when conversations grow long, preserving key facts and recent context while summarizing older exchanges. Handle topic changes gracefully—detect when users switch topics and decide whether to save context for potential return. Guided flows work well for structured tasks like order-taking or troubleshooting, where you need to collect specific information in sequence. Build your conversation system to be stateless at the API level but stateful in storage, enabling horizontal scaling. Monitor conversation metrics like turn count, topic changes, and flow completion rates to understand how users interact with your system. The goal is conversations that feel natural while reliably accomplishing user goals.