Introduction

Multi-turn conversations are where LLM applications become truly useful. Users don’t just ask single questions—they refine, follow up, reference previous context, and expect the assistant to remember what was discussed. Building effective multi-turn systems requires careful attention to context management, history compression, turn-taking logic, and graceful handling of topic changes. This guide covers practical patterns for designing conversations that feel natural: managing conversation state, compressing long histories, handling context switches, and building conversation flows that guide users toward their goals.

Conversation State

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
import uuid


class MessageRole(Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"


@dataclass
class Message:
    """A single message in a conversation."""
    role: MessageRole
    content: str
    timestamp: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Convert to API format."""
        return {"role": self.role.value, "content": self.content}


@dataclass
class ConversationState:
    """State of a conversation."""
    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    messages: list[Message] = field(default_factory=list)
    system_prompt: str = ""
    context: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)

    def add_message(self, role: MessageRole, content: str, metadata: Optional[dict] = None):
        """Add a message to the conversation."""
        message = Message(role=role, content=content, metadata=metadata or {})
        self.messages.append(message)
        self.updated_at = datetime.utcnow()

    def add_user_message(self, content: str):
        """Add a user message."""
        self.add_message(MessageRole.USER, content)

    def add_assistant_message(self, content: str):
        """Add an assistant message."""
        self.add_message(MessageRole.ASSISTANT, content)

    def get_messages_for_api(self) -> list[dict]:
        """Get messages in API format, with the system prompt first."""
        messages = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        for msg in self.messages:
            messages.append(msg.to_dict())
        return messages

    def get_last_n_turns(self, n: int) -> list[Message]:
        """Get the last n conversation turns (a turn starts with a user message)."""
        turns = []
        count = 0
        for msg in reversed(self.messages):
            turns.insert(0, msg)
            if msg.role == MessageRole.USER:
                count += 1
                if count >= n:
                    break
        return turns

    @property
    def turn_count(self) -> int:
        """Count conversation turns (one per user message)."""
        return sum(1 for m in self.messages if m.role == MessageRole.USER)


class ConversationManager:
    """Manage multiple conversations."""

    def __init__(self):
        self.conversations: dict[str, ConversationState] = {}

    def create(self, system_prompt: str = "") -> ConversationState:
        """Create a new conversation."""
        conversation = ConversationState(system_prompt=system_prompt)
        self.conversations[conversation.conversation_id] = conversation
        return conversation

    def get(self, conversation_id: str) -> Optional[ConversationState]:
        """Get a conversation by ID."""
        return self.conversations.get(conversation_id)

    def delete(self, conversation_id: str):
        """Delete a conversation."""
        self.conversations.pop(conversation_id, None)

    def list_active(self, max_age_hours: int = 24) -> list[ConversationState]:
        """List recently active conversations."""
        cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)
        return [
            conv for conv in self.conversations.values()
            if conv.updated_at > cutoff
        ]
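
These pieces compose without any LLM in the loop, which makes them easy to unit test. A minimal usage sketch (the message text is illustrative):

manager = ConversationManager()
conv = manager.create(system_prompt="You are a helpful support agent.")
conv.add_user_message("My order #1234 hasn't arrived.")
conv.add_assistant_message("Sorry to hear that. Let me check order #1234.")
conv.add_user_message("It was supposed to arrive Tuesday.")

print(conv.turn_count)                # 2
print(len(conv.get_last_n_turns(1)))  # 1 -- the trailing user message opens the last turn
print(conv.get_messages_for_api()[0]) # {'role': 'system', 'content': 'You are a helpful support agent.'}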
History Compression

from dataclasses import dataclass
from typing import Any


@dataclass
class CompressedHistory:
    """Compressed conversation history."""
    summary: str
    key_facts: list[str]
    recent_messages: list[Message]
    original_turn_count: int


class HistoryCompressor:
    """Compress conversation history to fit context windows."""

    def __init__(
        self,
        client: Any,
        model: str = "gpt-4o-mini",
        max_recent_turns: int = 5,
        max_summary_tokens: int = 500
    ):
        self.client = client
        self.model = model
        self.max_recent_turns = max_recent_turns
        self.max_summary_tokens = max_summary_tokens

    async def compress(self, conversation: ConversationState) -> CompressedHistory:
        """Compress conversation history."""
        if conversation.turn_count <= self.max_recent_turns:
            # No compression needed
            return CompressedHistory(
                summary="",
                key_facts=[],
                recent_messages=conversation.messages.copy(),
                original_turn_count=conversation.turn_count
            )

        # Split into old and recent
        recent = conversation.get_last_n_turns(self.max_recent_turns)
        old_messages = conversation.messages[:-len(recent)]

        # Summarize old messages
        summary, key_facts = await self._summarize(old_messages)

        return CompressedHistory(
            summary=summary,
            key_facts=key_facts,
            recent_messages=recent,
            original_turn_count=conversation.turn_count
        )

    async def _summarize(self, messages: list[Message]) -> tuple[str, list[str]]:
        """Summarize messages and extract key facts."""
        conversation_text = "\n".join(
            f"{m.role.value}: {m.content}" for m in messages
        )
        prompt = f"""Summarize this conversation and extract key facts.

Conversation:
{conversation_text}

Provide:
1. A brief summary (2-3 sentences)
2. Key facts the assistant should remember (bullet points)

Format:
SUMMARY: <summary>
KEY FACTS:
- <fact>
- <fact>
..."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.max_summary_tokens
        )
        content = response.choices[0].message.content

        # Parse response
        summary = ""
        key_facts = []
        if "SUMMARY:" in content:
            summary_part = content.split("SUMMARY:")[1]
            if "KEY FACTS:" in summary_part:
                summary = summary_part.split("KEY FACTS:")[0].strip()
            else:
                summary = summary_part.strip()
        if "KEY FACTS:" in content:
            facts_part = content.split("KEY FACTS:")[1]
            for line in facts_part.split("\n"):
                line = line.strip()
                if line.startswith("-"):
                    key_facts.append(line[1:].strip())
        return summary, key_facts

    def build_context_prompt(self, compressed: CompressedHistory) -> str:
        """Build a context prompt from compressed history."""
        parts = []
        if compressed.summary:
            parts.append(f"Previous conversation summary: {compressed.summary}")
        if compressed.key_facts:
            facts_text = "\n".join(f"- {f}" for f in compressed.key_facts)
            parts.append(f"Key facts from conversation:\n{facts_text}")
        return "\n\n".join(parts)


class SlidingWindowCompressor:
    """Simple sliding-window compression: keep only the most recent messages."""

    def __init__(self, max_messages: int = 20):
        self.max_messages = max_messages

    def compress(self, conversation: ConversationState) -> list[Message]:
        """Keep only recent messages."""
        if len(conversation.messages) <= self.max_messages:
            return conversation.messages.copy()
        return conversation.messages[-self.max_messages:]


class TokenBudgetCompressor:
    """Keep as many recent messages as fit a token budget."""

    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens

    def compress(self, conversation: ConversationState) -> list[Message]:
        """Keep messages within the token budget, newest first."""

        def estimate_tokens(msg: Message) -> int:
            # Rough heuristic: ~4 characters per token
            return len(msg.content) // 4

        result = []
        total_tokens = 0
        # Work backwards, keeping recent messages
        for msg in reversed(conversation.messages):
            msg_tokens = estimate_tokens(msg)
            if total_tokens + msg_tokens > self.max_tokens:
                break
            result.insert(0, msg)
            total_tokens += msg_tokens
        return result
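
Wiring the summarizing compressor to a real client — a sketch assuming the openai package's AsyncOpenAI client, though any object exposing an async chat.completions.create call works:

import asyncio
from openai import AsyncOpenAI  # assumed client; reads OPENAI_API_KEY from the environment

async def main():
    compressor = HistoryCompressor(AsyncOpenAI(), max_recent_turns=5)
    # `conv` is the conversation built in the earlier usage sketch
    compressed = await compressor.compress(conv)
    # With only a couple of turns this is a no-op; past max_recent_turns the
    # older messages are summarized and the summary becomes a system prompt.
    print(compressed.original_turn_count, len(compressed.recent_messages))
    print(compressor.build_context_prompt(compressed) or "(no compression needed)")

asyncio.run(main())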
Context Switching

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum


class TopicChangeType(Enum):
    CONTINUATION = "continuation"
    REFINEMENT = "refinement"
    NEW_TOPIC = "new_topic"
    RETURN_TO_PREVIOUS = "return_to_previous"


@dataclass
class TopicAnalysis:
    """Analysis of a potential topic change."""
    change_type: TopicChangeType
    current_topic: str
    previous_topic: Optional[str]
    confidence: float


class TopicTracker:
    """Track conversation topics."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
        self.topic_history: list[str] = []

    async def analyze_message(
        self,
        message: str,
        conversation: ConversationState
    ) -> TopicAnalysis:
        """Analyze whether a message changes the topic."""
        if not conversation.messages:
            # First message: everything is a new topic
            topic = await self._extract_topic(message)
            self.topic_history.append(topic)
            return TopicAnalysis(
                change_type=TopicChangeType.NEW_TOPIC,
                current_topic=topic,
                previous_topic=None,
                confidence=1.0
            )

        # Get recent context
        recent = conversation.get_last_n_turns(3)
        context = "\n".join(f"{m.role.value}: {m.content}" for m in recent)

        prompt = f"""Analyze if this new message changes the conversation topic.

Recent conversation:
{context}

New message: {message}

Previous topics discussed: {', '.join(self.topic_history[-5:]) if self.topic_history else 'None'}

Classify the topic change:
- CONTINUATION: Same topic, continuing discussion
- REFINEMENT: Same topic, but narrowing/clarifying
- NEW_TOPIC: Completely different topic
- RETURN_TO_PREVIOUS: Returning to an earlier topic

Respond with:
CHANGE_TYPE: <one of the four labels>
CURRENT_TOPIC: <3-5 word topic>
CONFIDENCE: <0.0-1.0>"""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        )
        content = response.choices[0].message.content

        # Parse response, falling back to safe defaults
        change_type = TopicChangeType.CONTINUATION
        current_topic = ""
        confidence = 0.8
        for line in content.split("\n"):
            if "CHANGE_TYPE:" in line:
                type_str = line.split("CHANGE_TYPE:")[1].strip().upper()
                try:
                    change_type = TopicChangeType[type_str]
                except KeyError:
                    pass
            elif "CURRENT_TOPIC:" in line:
                current_topic = line.split("CURRENT_TOPIC:")[1].strip()
            elif "CONFIDENCE:" in line:
                try:
                    confidence = float(line.split("CONFIDENCE:")[1].strip())
                except ValueError:
                    pass

        # Update topic history
        if change_type in (TopicChangeType.NEW_TOPIC, TopicChangeType.REFINEMENT):
            self.topic_history.append(current_topic)
        previous_topic = self.topic_history[-2] if len(self.topic_history) > 1 else None

        return TopicAnalysis(
            change_type=change_type,
            current_topic=current_topic,
            previous_topic=previous_topic,
            confidence=confidence
        )

    async def _extract_topic(self, message: str) -> str:
        """Extract the topic of a message in a few words."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "user",
                "content": f"What is the main topic of this message in 3-5 words?\n\n{message}"
            }],
            max_tokens=20
        )
        return response.choices[0].message.content.strip()


class ContextSwitchHandler:
    """Handle context switches gracefully."""

    def __init__(self, topic_tracker: TopicTracker):
        self.topic_tracker = topic_tracker
        self.saved_contexts: dict[str, dict] = {}

    async def handle_switch(
        self,
        message: str,
        conversation: ConversationState
    ) -> dict:
        """Handle a potential context switch."""
        analysis = await self.topic_tracker.analyze_message(message, conversation)
        result = {
            "analysis": analysis,
            "action": "continue",
            "context_update": None
        }

        if analysis.change_type == TopicChangeType.NEW_TOPIC:
            # Save the current context so the user can return to it later
            if analysis.previous_topic:
                self.saved_contexts[analysis.previous_topic] = {
                    "messages": conversation.messages.copy(),
                    "context": conversation.context.copy()
                }
            result["action"] = "new_topic"
            result["context_update"] = {
                "topic": analysis.current_topic,
                "cleared_history": True
            }
        elif analysis.change_type == TopicChangeType.RETURN_TO_PREVIOUS:
            # Restore the previous context if available
            if analysis.current_topic in self.saved_contexts:
                saved = self.saved_contexts[analysis.current_topic]
                result["action"] = "restore_context"
                result["context_update"] = {
                    "topic": analysis.current_topic,
                    "restored_messages": len(saved["messages"])
                }
        return result
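
A usage sketch, again assuming an AsyncOpenAI-style client. The classification comes from the model, so the printed change type is likely but not guaranteed:

import asyncio
from openai import AsyncOpenAI  # assumed client, as above

async def main():
    handler = ContextSwitchHandler(TopicTracker(AsyncOpenAI()))

    conv = ConversationState()
    conv.add_user_message("How do I reset my router?")
    conv.add_assistant_message("Hold the reset button for 10 seconds.")

    # This message should classify as NEW_TOPIC and save the router context
    result = await handler.handle_switch("Actually, what's your refund policy?", conv)
    print(result["analysis"].change_type, result["action"])

asyncio.run(main())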
Conversation Flows

from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from enum import Enum


class FlowState(Enum):
    ACTIVE = "active"
    COMPLETED = "completed"
    ABANDONED = "abandoned"


@dataclass
class FlowStep:
    """A step in a conversation flow."""
    name: str
    prompt: str
    validator: Optional[Callable[[str], bool]] = None
    extractor: Optional[Callable[[str], Any]] = None
    next_step: Optional[str] = None
    on_invalid: str = "I didn't understand that. Could you try again?"


@dataclass
class ConversationFlow:
    """A guided conversation flow."""
    name: str
    steps: dict[str, FlowStep]
    start_step: str
    current_step: str = ""
    collected_data: dict = field(default_factory=dict)
    state: FlowState = FlowState.ACTIVE

    def __post_init__(self):
        self.current_step = self.start_step

    def get_current_prompt(self) -> str:
        """Get the prompt for the current step."""
        step = self.steps.get(self.current_step)
        return step.prompt if step else ""

    def process_response(self, response: str) -> tuple[bool, str]:
        """Process a user response and advance the flow."""
        step = self.steps.get(self.current_step)
        if not step:
            return False, "Flow error: unknown step"

        # Validate response
        if step.validator and not step.validator(response):
            return False, step.on_invalid

        # Extract data
        if step.extractor:
            self.collected_data[step.name] = step.extractor(response)
        else:
            self.collected_data[step.name] = response

        # Move to the next step, or finish
        if step.next_step:
            self.current_step = step.next_step
            return True, self.get_current_prompt()
        self.state = FlowState.COMPLETED
        return True, "Flow completed"

    def is_complete(self) -> bool:
        return self.state == FlowState.COMPLETED


class FlowBuilder:
    """Build conversation flows."""

    def __init__(self, name: str):
        self.name = name
        self.steps: dict[str, FlowStep] = {}
        self.start_step: str = ""

    def add_step(
        self,
        name: str,
        prompt: str,
        validator: Optional[Callable[[str], bool]] = None,
        extractor: Optional[Callable[[str], Any]] = None,
        next_step: Optional[str] = None,
        on_invalid: Optional[str] = None
    ) -> 'FlowBuilder':
        """Add a step to the flow. The first step added becomes the start step."""
        self.steps[name] = FlowStep(
            name=name,
            prompt=prompt,
            validator=validator,
            extractor=extractor,
            next_step=next_step,
            on_invalid=on_invalid or "Please try again."
        )
        if not self.start_step:
            self.start_step = name
        return self

    def build(self) -> ConversationFlow:
        """Build the flow."""
        return ConversationFlow(
            name=self.name,
            steps=self.steps,
            start_step=self.start_step
        )


# Example: order-taking flow
def create_order_flow() -> ConversationFlow:
    """Create an order-taking flow."""

    def validate_email(s: str) -> bool:
        return "@" in s and "." in s

    def validate_quantity(s: str) -> bool:
        try:
            return int(s) > 0
        except ValueError:
            return False

    return (
        FlowBuilder("order")
        .add_step(
            name="greeting",
            prompt="Welcome! What product would you like to order?",
            next_step="quantity"
        )
        .add_step(
            name="quantity",
            prompt="How many would you like?",
            validator=validate_quantity,
            extractor=lambda s: int(s),
            next_step="email",
            on_invalid="Please enter a valid number."
        )
        .add_step(
            name="email",
            prompt="What's your email address for the confirmation?",
            validator=validate_email,
            next_step="confirm",
            on_invalid="Please enter a valid email address."
        )
        .add_step(
            name="confirm",
            prompt="Great! I'll send the confirmation to your email. Is there anything else?"
        )
        .build()
    )


class FlowManager:
    """Manage active conversation flows."""

    def __init__(self):
        self.active_flows: dict[str, ConversationFlow] = {}
        self.flow_templates: dict[str, Callable[[], ConversationFlow]] = {}

    def register_flow(self, name: str, factory: Callable[[], ConversationFlow]):
        """Register a flow template."""
        self.flow_templates[name] = factory

    def start_flow(self, conversation_id: str, flow_name: str) -> Optional[str]:
        """Start a flow for a conversation; returns the first prompt."""
        if flow_name not in self.flow_templates:
            return None
        flow = self.flow_templates[flow_name]()
        self.active_flows[conversation_id] = flow
        return flow.get_current_prompt()

    def process_message(
        self,
        conversation_id: str,
        message: str
    ) -> tuple[bool, str, Optional[dict]]:
        """Process a message in an active flow.

        Returns (handled, response, collected_data). `handled` is True whenever
        a flow is active -- even if the response failed validation -- so callers
        can distinguish "in a flow" from "no flow"; collected_data is non-None
        only when the flow has just completed.
        """
        flow = self.active_flows.get(conversation_id)
        if not flow:
            return False, "", None
        _success, response = flow.process_response(message)
        if flow.is_complete():
            data = flow.collected_data.copy()
            del self.active_flows[conversation_id]
            return True, response, data
        return True, response, None
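
The flow machinery is deterministic, so you can drive the example order flow directly and inspect what it collects:

flow = create_order_flow()
print(flow.get_current_prompt())      # Welcome! What product would you like to order?

ok, reply = flow.process_response("A desk lamp")
ok, reply = flow.process_response("two")   # fails quantity validation
print(ok, reply)                           # False Please enter a valid number.
ok, reply = flow.process_response("2")
ok, reply = flow.process_response("jo@example.com")
ok, reply = flow.process_response("No, that's all")

print(flow.is_complete())                  # True
print(flow.collected_data)
# {'greeting': 'A desk lamp', 'quantity': 2, 'email': 'jo@example.com', 'confirm': "No, that's all"}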
Production Conversation Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

# Initialize components
conversation_manager = ConversationManager()
history_compressor = None  # Initialize with an actual client, e.g. HistoryCompressor(client)
llm_client = None  # Initialize with an actual client
flow_manager = FlowManager()

# Register flows
flow_manager.register_flow("order", create_order_flow)


class CreateConversationRequest(BaseModel):
    system_prompt: str = ""


class SendMessageRequest(BaseModel):
    message: str


class StartFlowRequest(BaseModel):
    flow_name: str


@app.post("/v1/conversations")
async def create_conversation(request: CreateConversationRequest):
    """Create a new conversation."""
    conversation = conversation_manager.create(request.system_prompt)
    return {
        "conversation_id": conversation.conversation_id,
        "created_at": conversation.created_at.isoformat()
    }


@app.post("/v1/conversations/{conversation_id}/messages")
async def send_message(conversation_id: str, request: SendMessageRequest):
    """Send a message in a conversation."""
    conversation = conversation_manager.get(conversation_id)
    if not conversation:
        raise HTTPException(404, "Conversation not found")

    # If a flow is active, it handles the message (including invalid responses)
    in_flow, flow_response, flow_data = flow_manager.process_message(
        conversation_id, request.message
    )
    if in_flow:
        conversation.add_user_message(request.message)
        conversation.add_assistant_message(flow_response)
        return {
            "response": flow_response,
            "in_flow": True,
            "flow_completed": flow_data is not None,
            "flow_data": flow_data
        }

    # Regular conversation
    conversation.add_user_message(request.message)

    # Compress history once the conversation grows long
    if conversation.turn_count > 10 and history_compressor:
        compressed = await history_compressor.compress(conversation)
        context_prompt = history_compressor.build_context_prompt(compressed)
        messages = []
        if conversation.system_prompt:
            messages.append({"role": "system", "content": conversation.system_prompt})
        if context_prompt:
            messages.append({"role": "system", "content": context_prompt})
        messages.extend(m.to_dict() for m in compressed.recent_messages)
    else:
        messages = conversation.get_messages_for_api()

    # Generate response
    response = await llm_client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    assistant_message = response.choices[0].message.content
    conversation.add_assistant_message(assistant_message)

    return {
        "response": assistant_message,
        "turn_count": conversation.turn_count,
        "in_flow": False
    }


@app.post("/v1/conversations/{conversation_id}/flows")
async def start_flow(conversation_id: str, request: StartFlowRequest):
    """Start a guided flow."""
    conversation = conversation_manager.get(conversation_id)
    if not conversation:
        raise HTTPException(404, "Conversation not found")
    prompt = flow_manager.start_flow(conversation_id, request.flow_name)
    if not prompt:
        raise HTTPException(400, f"Unknown flow: {request.flow_name}")
    conversation.add_assistant_message(prompt)
    return {
        "flow_started": request.flow_name,
        "prompt": prompt
    }


@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Get conversation details."""
    conversation = conversation_manager.get(conversation_id)
    if not conversation:
        raise HTTPException(404, "Conversation not found")
    return {
        "conversation_id": conversation.conversation_id,
        "turn_count": conversation.turn_count,
        "created_at": conversation.created_at.isoformat(),
        "updated_at": conversation.updated_at.isoformat(),
        "messages": [
            {
                "role": m.role.value,
                "content": m.content,
                "timestamp": m.timestamp.isoformat()
            }
            for m in conversation.messages
        ]
    }


@app.get("/v1/conversations/{conversation_id}/history")
async def get_compressed_history(conversation_id: str):
    """Get compressed conversation history."""
    conversation = conversation_manager.get(conversation_id)
    if not conversation:
        raise HTTPException(404, "Conversation not found")
    if history_compressor:
        compressed = await history_compressor.compress(conversation)
        return {
            "summary": compressed.summary,
            "key_facts": compressed.key_facts,
            "recent_turn_count": sum(
                1 for m in compressed.recent_messages if m.role == MessageRole.USER
            ),
            "original_turn_count": compressed.original_turn_count
        }
    return {
        "summary": "",
        "key_facts": [],
        "recent_turn_count": conversation.turn_count,
        "original_turn_count": conversation.turn_count
    }


@app.delete("/v1/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation."""
    conversation_manager.delete(conversation_id)
    return {"status": "deleted"}


@app.get("/health")
async def health():
    return {"status": "healthy"}
References
- OpenAI Chat Completions: https://platform.openai.com/docs/guides/chat
- LangChain Memory: https://python.langchain.com/docs/modules/memory/
- Anthropic Conversation Design: https://docs.anthropic.com/claude/docs/conversation-design
- Rasa Conversation Patterns: https://rasa.com/docs/rasa/conversation-driven-development
Conclusion
Effective multi-turn conversations require thoughtful state management. Track conversation state explicitly—don’t rely on the LLM to remember everything. Compress history intelligently when conversations grow long, preserving key facts and recent context while summarizing older exchanges. Handle topic changes gracefully—detect when users switch topics and decide whether to save context for potential return. Guided flows work well for structured tasks like order-taking or troubleshooting, where you need to collect specific information in sequence. Build your conversation system to be stateless at the API level but stateful in storage, enabling horizontal scaling. Monitor conversation metrics like turn count, topic changes, and flow completion rates to understand how users interact with your system. The goal is conversations that feel natural while reliably accomplishing user goals.
