Introduction

Chatbots and conversational AI need memory. Without conversation history, every message exists in isolation: the model can't reference what was said before, follow up on previous topics, or maintain coherent multi-turn dialogues. But history management is tricky: context windows are limited, old messages may be irrelevant, and naive approaches quickly hit token limits. This guide covers practical strategies for managing conversation history: storing messages efficiently, truncating to fit context windows, summarizing old conversations to preserve key information, and building memory systems that scale.

Message Storage
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import uuid
import json


@dataclass
class Message:
    """A single conversation message."""
    role: str  # "user", "assistant", "system"
    content: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)
    token_count: int = 0

    def to_dict(self) -> dict:
        """Convert to API format."""
        return {"role": self.role, "content": self.content}

    def to_json(self) -> str:
        """Serialize to JSON."""
        return json.dumps({
            "id": self.id,
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
            "token_count": self.token_count
        })

    @classmethod
    def from_json(cls, data: str) -> "Message":
        """Deserialize from JSON."""
        obj = json.loads(data)
        return cls(
            id=obj["id"],
            role=obj["role"],
            content=obj["content"],
            timestamp=datetime.fromisoformat(obj["timestamp"]),
            metadata=obj.get("metadata", {}),
            token_count=obj.get("token_count", 0)
        )
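
Serialization should round-trip losslessly, including the generated ID and timestamp. A quick, illustrative check:

msg = Message(role="user", content="What's the weather like?")
restored = Message.from_json(msg.to_json())
assert restored.id == msg.id and restored.content == msg.content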
@dataclass
class Conversation:
    """A conversation with message history."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    messages: list[Message] = field(default_factory=list)
    system_prompt: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)

    def add_message(self, role: str, content: str, **kwargs) -> Message:
        """Add a message to the conversation."""
        message = Message(role=role, content=content, **kwargs)
        self.messages.append(message)
        self.updated_at = datetime.utcnow()
        return message

    def get_messages_for_api(self) -> list[dict]:
        """Get messages in API format."""
        result = []
        if self.system_prompt:
            result.append({"role": "system", "content": self.system_prompt})
        for msg in self.messages:
            result.append(msg.to_dict())
        return result

    @property
    def total_tokens(self) -> int:
        """Total tokens in the conversation."""
        return sum(m.token_count for m in self.messages)


class ConversationStore:
    """In-memory store for conversations."""

    def __init__(self):
        self._conversations: dict[str, Conversation] = {}

    def create(self, system_prompt: Optional[str] = None, **metadata) -> Conversation:
        """Create a new conversation."""
        conv = Conversation(system_prompt=system_prompt, metadata=metadata)
        self._conversations[conv.id] = conv
        return conv

    def get(self, conversation_id: str) -> Optional[Conversation]:
        """Get a conversation by ID."""
        return self._conversations.get(conversation_id)

    def delete(self, conversation_id: str) -> bool:
        """Delete a conversation."""
        if conversation_id in self._conversations:
            del self._conversations[conversation_id]
            return True
        return False

    def list_conversations(
        self,
        limit: int = 100,
        offset: int = 0
    ) -> list[Conversation]:
        """List conversations, most recently updated first."""
        convs = sorted(
            self._conversations.values(),
            key=lambda c: c.updated_at,
            reverse=True
        )
        return convs[offset:offset + limit]
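
Wiring these pieces together is straightforward. A short, illustrative session with the in-memory store (the prompt and metadata are made up):

store = ConversationStore()
conv = store.create(system_prompt="You are a helpful assistant.", source="web")
conv.add_message("user", "Hi, I'm planning a trip to Japan.")
conv.add_message("assistant", "Great! When are you traveling?")
print(conv.get_messages_for_api())
# [{'role': 'system', ...}, {'role': 'user', ...}, {'role': 'assistant', ...}]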
class RedisConversationStore:
    """Redis-backed conversation store."""

    def __init__(self, redis_client: Any, ttl: int = 86400):
        self.redis = redis_client
        self.ttl = ttl

    async def save(self, conversation: Conversation):
        """Save conversation to Redis."""
        key = f"conv:{conversation.id}"
        data = {
            "id": conversation.id,
            # Redis hashes can't store None; use an empty string as a sentinel
            "system_prompt": conversation.system_prompt or "",
            "metadata": json.dumps(conversation.metadata),
            "created_at": conversation.created_at.isoformat(),
            "updated_at": conversation.updated_at.isoformat()
        }
        await self.redis.hset(key, mapping=data)
        # Store messages as a Redis list
        msg_key = f"conv:{conversation.id}:messages"
        await self.redis.delete(msg_key)
        for msg in conversation.messages:
            await self.redis.rpush(msg_key, msg.to_json())
        await self.redis.expire(key, self.ttl)
        await self.redis.expire(msg_key, self.ttl)

    async def load(self, conversation_id: str) -> Optional[Conversation]:
        """Load conversation from Redis."""
        key = f"conv:{conversation_id}"
        data = await self.redis.hgetall(key)
        if not data:
            return None
        conv = Conversation(
            id=data[b"id"].decode(),
            system_prompt=data.get(b"system_prompt", b"").decode() or None,
            metadata=json.loads(data.get(b"metadata", b"{}").decode()),
            created_at=datetime.fromisoformat(data[b"created_at"].decode()),
            updated_at=datetime.fromisoformat(data[b"updated_at"].decode())
        )
        # Load messages
        msg_key = f"conv:{conversation_id}:messages"
        messages = await self.redis.lrange(msg_key, 0, -1)
        conv.messages = [Message.from_json(m.decode()) for m in messages]
        return conv
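
A usage sketch with redis-py's asyncio client; the host, port, and TTL are assumptions for illustration:

import asyncio
import redis.asyncio as aioredis

async def main():
    client = aioredis.Redis(host="localhost", port=6379)
    store = RedisConversationStore(client, ttl=3600)
    conv = Conversation(system_prompt="You are a helpful assistant.")
    conv.add_message("user", "Hello!")
    await store.save(conv)
    loaded = await store.load(conv.id)
    print(loaded.messages[0].content)  # "Hello!"

asyncio.run(main())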
Window Truncation
from dataclasses import dataclass
from typing import Any, Optional
import tiktoken


@dataclass
class TruncationResult:
    """Result of history truncation."""
    messages: list[Message]
    truncated_count: int
    total_tokens: int
    within_limit: bool


class TokenCounter:
    """Count tokens for messages."""

    def __init__(self, model: str = "gpt-4o-mini"):
        try:
            self.encoder = tiktoken.encoding_for_model(model)
        except KeyError:
            # Fall back to a reasonable default encoding
            self.encoder = tiktoken.get_encoding("cl100k_base")

    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoder.encode(text))

    def count_message(self, message: Message) -> int:
        """Count tokens in a message, including role/formatting overhead."""
        overhead = 4
        return overhead + self.count(message.content)

    def count_messages(self, messages: list[Message]) -> int:
        """Count total tokens in a list of messages."""
        total = 2  # Reply priming
        for msg in messages:
            total += self.count_message(msg)
        return total
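
The per-message overhead of 4 tokens and the 2-token reply priming mirror the heuristics in OpenAI's token-counting cookbook; treat the totals as close estimates rather than exact values, especially for newer models. For example:

counter = TokenCounter()
msg = Message(role="user", content="How many tokens is this?")
print(counter.count(msg.content))   # raw content tokens
print(counter.count_message(msg))   # content tokens + 4 overhead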
class HistoryTruncator:
    """Truncate conversation history to fit a context window."""

    def __init__(self, counter: Optional[TokenCounter] = None):
        self.counter = counter or TokenCounter()

    def truncate_to_limit(
        self,
        messages: list[Message],
        max_tokens: int,
        keep_system: bool = True,
        keep_recent: int = 2
    ) -> TruncationResult:
        """Truncate messages to fit a token limit."""
        if not messages:
            return TruncationResult(
                messages=[],
                truncated_count=0,
                total_tokens=0,
                within_limit=True
            )

        # Separate system messages from the rest
        system_msgs = [m for m in messages if m.role == "system"]
        other_msgs = [m for m in messages if m.role != "system"]

        # Budget remaining after system messages
        system_tokens = sum(self.counter.count_message(m) for m in system_msgs)
        available = max_tokens - system_tokens
        if available <= 0:
            return TruncationResult(
                messages=system_msgs if keep_system else [],
                truncated_count=len(other_msgs),
                total_tokens=system_tokens,
                within_limit=False
            )

        # Always try to keep the most recent messages
        recent = other_msgs[-keep_recent:] if keep_recent else []
        recent_tokens = sum(self.counter.count_message(m) for m in recent)
        if recent_tokens > available:
            # Even the recent messages don't fit
            result_msgs = system_msgs if keep_system else []
            return TruncationResult(
                messages=result_msgs,
                truncated_count=len(other_msgs),
                total_tokens=system_tokens,
                within_limit=False
            )

        # Add older messages, newest first, while they fit
        remaining = available - recent_tokens
        older = other_msgs[:-keep_recent] if keep_recent else other_msgs
        kept_older = []
        for msg in reversed(older):
            msg_tokens = self.counter.count_message(msg)
            if msg_tokens <= remaining:
                kept_older.insert(0, msg)
                remaining -= msg_tokens
            else:
                break

        # Combine system + older + recent in original order
        result_msgs = []
        if keep_system:
            result_msgs.extend(system_msgs)
        result_msgs.extend(kept_older)
        result_msgs.extend(recent)
        total_tokens = self.counter.count_messages(result_msgs)
        truncated = len(messages) - len(result_msgs)
        return TruncationResult(
            messages=result_msgs,
            truncated_count=truncated,
            total_tokens=total_tokens,
            within_limit=total_tokens <= max_tokens
        )
    def truncate_by_turns(
        self,
        messages: list[Message],
        max_turns: int
    ) -> TruncationResult:
        """Truncate to keep only recent turns.

        A turn starts with a user message and includes the responses
        that follow it.
        """
        system_msgs = [m for m in messages if m.role == "system"]
        other_msgs = [m for m in messages if m.role != "system"]

        # Count turns from the end; stop once we exceed max_turns
        turns = 0
        keep_from = len(other_msgs)
        for i in range(len(other_msgs) - 1, -1, -1):
            if other_msgs[i].role == "user":
                turns += 1
                if turns > max_turns:
                    break
            keep_from = i

        kept = other_msgs[keep_from:]
        result_msgs = system_msgs + kept
        return TruncationResult(
            messages=result_msgs,
            truncated_count=len(messages) - len(result_msgs),
            total_tokens=self.counter.count_messages(result_msgs),
            within_limit=True
        )
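
A small demonstration with a deliberately tight budget (the numbers are illustrative):

truncator = HistoryTruncator()
history = [Message(role="system", content="You are terse.")]
for i in range(20):
    history.append(Message(role="user", content=f"Question {i}?"))
    history.append(Message(role="assistant", content=f"Answer {i}."))

result = truncator.truncate_to_limit(history, max_tokens=150, keep_recent=4)
print(result.truncated_count, result.total_tokens, result.within_limit)
# Older messages are dropped first; the system message and the
# most recent messages always survive.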
History Summarization
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SummaryResult:
    """Result of history summarization."""
    summary: str
    original_tokens: int
    summary_tokens: int
    messages_summarized: int


class HistorySummarizer:
    """Summarize conversation history."""

    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
        self.counter = TokenCounter()

    async def summarize(
        self,
        messages: list[Message],
        max_summary_tokens: int = 500
    ) -> SummaryResult:
        """Summarize conversation messages."""
        if not messages:
            return SummaryResult(
                summary="",
                original_tokens=0,
                summary_tokens=0,
                messages_summarized=0
            )

        # Format messages for the summarization prompt
        formatted = "\n".join(
            f"{m.role.upper()}: {m.content}"
            for m in messages
        )
        original_tokens = self.counter.count(formatted)

        prompt = f"""Summarize this conversation, preserving key information:
- Main topics discussed
- Important decisions or conclusions
- Any commitments or action items
- Key facts mentioned

Keep the summary concise (under {max_summary_tokens} tokens).

Conversation:
{formatted}

Summary:"""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_summary_tokens
        )
        summary = response.choices[0].message.content
        return SummaryResult(
            summary=summary,
            original_tokens=original_tokens,
            summary_tokens=self.counter.count(summary),
            messages_summarized=len(messages)
        )

    async def progressive_summarize(
        self,
        messages: list[Message],
        existing_summary: Optional[str] = None,
        chunk_size: int = 10
    ) -> SummaryResult:
        """Progressively summarize, folding new messages into an existing summary."""
        if not messages:
            return SummaryResult(
                summary=existing_summary or "",
                original_tokens=0,
                summary_tokens=self.counter.count(existing_summary or ""),
                messages_summarized=0
            )

        formatted = "\n".join(
            f"{m.role.upper()}: {m.content}"
            for m in messages
        )
        if existing_summary:
            prompt = f"""Update this conversation summary with new messages.

Previous summary:
{existing_summary}

New messages:
{formatted}

Updated summary (preserve important information from both):"""
        else:
            prompt = f"""Summarize this conversation:

{formatted}

Summary:"""

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500
        )
        summary = response.choices[0].message.content
        return SummaryResult(
            summary=summary,
            original_tokens=self.counter.count(formatted),
            summary_tokens=self.counter.count(summary),
            messages_summarized=len(messages)
        )
class SummarizingHistoryManager:
    """Manage history with automatic summarization."""

    def __init__(
        self,
        client: Any,
        max_tokens: int = 4000,
        summarize_threshold: int = 3000
    ):
        self.client = client
        self.max_tokens = max_tokens
        self.summarize_threshold = summarize_threshold
        self.counter = TokenCounter()
        self.truncator = HistoryTruncator(self.counter)
        self.summarizer = HistorySummarizer(client)

    async def prepare_context(
        self,
        conversation: Conversation,
        reserved_tokens: int = 1000
    ) -> list[dict]:
        """Prepare conversation context for an API call."""
        available = self.max_tokens - reserved_tokens

        # If everything fits, send the full history
        total_tokens = self.counter.count_messages(conversation.messages)
        if total_tokens <= available:
            return conversation.get_messages_for_api()

        # Otherwise keep the recent turns verbatim and summarize the rest
        messages = conversation.messages
        recent_count = 4  # Keep the last 2 turns
        recent = messages[-recent_count:]
        older = messages[:-recent_count]

        if older:
            # Summarize older messages into a single system message
            summary_result = await self.summarizer.summarize(older)
            summary_msg = Message(
                role="system",
                content=f"Previous conversation summary: {summary_result.summary}"
            )
            result = []
            if conversation.system_prompt:
                result.append({"role": "system", "content": conversation.system_prompt})
            result.append(summary_msg.to_dict())
            for msg in recent:
                result.append(msg.to_dict())
            return result

        # No older messages to summarize; just truncate
        truncation = self.truncator.truncate_to_limit(
            messages, available, keep_recent=recent_count
        )
        result = []
        if conversation.system_prompt:
            result.append({"role": "system", "content": conversation.system_prompt})
        for msg in truncation.messages:
            if msg.role != "system":
                result.append(msg.to_dict())
        return result
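
End to end with the OpenAI Python SDK. This is a sketch: it assumes the openai package is installed and OPENAI_API_KEY is set, and the conversation content is made up.

import asyncio
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    manager = SummarizingHistoryManager(client, max_tokens=4000)
    conv = Conversation(system_prompt="You are a travel assistant.")
    conv.add_message("user", "We discussed ryokans near Kyoto earlier.")
    conv.add_message("assistant", "Yes, I suggested staying in Arashiyama.")
    conv.add_message("user", "Remind me why you picked that area?")
    # Summarizes or truncates only if the history exceeds the budget
    context = await manager.prepare_context(conv, reserved_tokens=1000)
    response = await client.chat.completions.create(
        model="gpt-4o-mini", messages=context
    )
    print(response.choices[0].message.content)

asyncio.run(main())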
Memory Patterns
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import numpy as np


@dataclass
class MemoryEntry:
    """A memory entry."""
    content: str
    importance: float = 0.5
    timestamp: datetime = field(default_factory=datetime.utcnow)
    access_count: int = 0
    last_accessed: Optional[datetime] = None
    metadata: dict = field(default_factory=dict)


class WorkingMemory:
    """Short-term working memory."""

    def __init__(self, capacity: int = 10):
        self.capacity = capacity
        self.entries: list[MemoryEntry] = []

    def add(self, content: str, importance: float = 0.5, **metadata):
        """Add an entry to working memory."""
        entry = MemoryEntry(
            content=content,
            importance=importance,
            metadata=metadata
        )
        self.entries.append(entry)
        # Evict if over capacity
        if len(self.entries) > self.capacity:
            self._evict()

    def _evict(self):
        """Evict the least important entry."""
        if not self.entries:
            return
        # Sort by importance, then recency, highest first
        self.entries.sort(
            key=lambda e: (e.importance, e.timestamp.timestamp()),
            reverse=True
        )
        # Remove the lowest-priority entry
        self.entries.pop()

    def get_context(self) -> str:
        """Get working memory as a context string."""
        if not self.entries:
            return ""
        return "\n".join(
            f"- {e.content}"
            for e in sorted(self.entries, key=lambda e: e.timestamp)
        )
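
Eviction keeps the highest-importance entries, breaking ties by recency. A quick illustration:

wm = WorkingMemory(capacity=3)
wm.add("User's name is Ada", importance=0.9)
wm.add("Prefers metric units", importance=0.8)
wm.add("Asked about the weather", importance=0.2)
wm.add("Said hello", importance=0.1)  # over capacity: lowest importance is evicted
print(wm.get_context())
# - User's name is Ada
# - Prefers metric units
# - Asked about the weather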
class LongTermMemory:
    """Long-term memory with embedding-based retrieval."""

    def __init__(self, client: Any, model: str = "text-embedding-3-small"):
        self.client = client
        self.model = model
        self.entries: list[MemoryEntry] = []
        self._embeddings: dict[int, list[float]] = {}

    async def store(self, content: str, importance: float = 0.5, **metadata):
        """Store an entry in long-term memory."""
        entry = MemoryEntry(
            content=content,
            importance=importance,
            metadata=metadata
        )
        # Generate an embedding for later retrieval
        response = await self.client.embeddings.create(
            model=self.model,
            input=content
        )
        idx = len(self.entries)
        self.entries.append(entry)
        self._embeddings[idx] = response.data[0].embedding

    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        min_similarity: float = 0.5
    ) -> list[MemoryEntry]:
        """Retrieve relevant memories by cosine similarity."""
        if not self.entries:
            return []
        # Embed the query
        response = await self.client.embeddings.create(
            model=self.model,
            input=query
        )
        query_emb = response.data[0].embedding

        # Cosine similarity against every stored embedding
        similarities = []
        for idx, entry in enumerate(self.entries):
            emb = self._embeddings[idx]
            sim = np.dot(query_emb, emb) / (
                np.linalg.norm(query_emb) * np.linalg.norm(emb)
            )
            similarities.append((idx, sim))
        similarities.sort(key=lambda x: x[1], reverse=True)

        # Return the top-k entries above the threshold
        results = []
        for idx, sim in similarities[:top_k]:
            if sim >= min_similarity:
                entry = self.entries[idx]
                entry.access_count += 1
                entry.last_accessed = datetime.utcnow()
                results.append(entry)
        return results
class HybridMemory:
    """Hybrid memory combining working and long-term memory."""

    def __init__(self, client: Any):
        self.working = WorkingMemory(capacity=10)
        self.long_term = LongTermMemory(client)

    async def process_message(self, message: Message):
        """Process a message and update both memories."""
        # Everything goes into working memory
        self.working.add(
            content=f"{message.role}: {message.content}",
            importance=0.7 if message.role == "user" else 0.5
        )
        # Only important information goes to long-term memory
        if self._is_important(message):
            await self.long_term.store(
                content=message.content,
                importance=0.8,
                role=message.role,
                timestamp=message.timestamp.isoformat()
            )

    def _is_important(self, message: Message) -> bool:
        """Determine if a message should go to long-term memory."""
        # Simple keyword heuristics; a classifier could replace this
        important_keywords = [
            "remember", "important", "don't forget",
            "my name is", "i prefer", "always"
        ]
        content_lower = message.content.lower()
        return any(kw in content_lower for kw in important_keywords)

    async def get_relevant_context(self, query: str) -> str:
        """Get relevant context from both memories."""
        working_context = self.working.get_context()
        memories = await self.long_term.retrieve(query, top_k=3)
        long_term_context = "\n".join(
            f"- {m.content}" for m in memories
        ) if memories else ""
        parts = []
        if working_context:
            parts.append(f"Recent context:\n{working_context}")
        if long_term_context:
            parts.append(f"Relevant memories:\n{long_term_context}")
        return "\n\n".join(parts)
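
In a chat loop, the memory is updated after every message and queried before each model call. A sketch, assuming an AsyncOpenAI-compatible client (the model name and prompt wiring are assumptions):

async def chat_turn(memory: HybridMemory, client, user_text: str) -> str:
    # Record the user message in both memories
    await memory.process_message(Message(role="user", content=user_text))
    # Pull relevant context before calling the model
    context = await memory.get_relevant_context(user_text)
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"Context that may help:\n{context}"},
            {"role": "user", "content": user_text},
        ],
    )
    reply = response.choices[0].message.content
    # Record the assistant reply as well
    await memory.process_message(Message(role="assistant", content=reply))
    return reply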
Production History Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
store = ConversationStore()
history_manager = None  # Initialize with an LLM client at startup


class CreateConversationRequest(BaseModel):
    system_prompt: Optional[str] = None
    metadata: Optional[dict] = None


class AddMessageRequest(BaseModel):
    role: str
    content: str
    metadata: Optional[dict] = None


class PrepareContextRequest(BaseModel):
    max_tokens: int = 4000
    reserved_tokens: int = 1000


@app.post("/v1/conversations")
async def create_conversation(request: CreateConversationRequest):
    """Create a new conversation."""
    conv = store.create(
        system_prompt=request.system_prompt,
        **(request.metadata or {})
    )
    return {
        "conversation_id": conv.id,
        "created_at": conv.created_at.isoformat()
    }


@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Get conversation details."""
    conv = store.get(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return {
        "id": conv.id,
        "system_prompt": conv.system_prompt,
        "message_count": len(conv.messages),
        "total_tokens": conv.total_tokens,
        "created_at": conv.created_at.isoformat(),
        "updated_at": conv.updated_at.isoformat()
    }


@app.post("/v1/conversations/{conversation_id}/messages")
async def add_message(conversation_id: str, request: AddMessageRequest):
    """Add a message to a conversation."""
    conv = store.get(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    counter = TokenCounter()
    token_count = counter.count(request.content)
    # Pass metadata as a dict, not as keyword arguments, since Message
    # only accepts its declared fields
    message = conv.add_message(
        role=request.role,
        content=request.content,
        token_count=token_count,
        metadata=request.metadata or {}
    )
    return {
        "message_id": message.id,
        "token_count": token_count,
        "total_messages": len(conv.messages)
    }


@app.get("/v1/conversations/{conversation_id}/messages")
async def get_messages(
    conversation_id: str,
    limit: int = 100,
    offset: int = 0
):
    """Get conversation messages."""
    conv = store.get(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    messages = conv.messages[offset:offset + limit]
    return {
        "messages": [
            {
                "id": m.id,
                "role": m.role,
                "content": m.content,
                "timestamp": m.timestamp.isoformat(),
                "token_count": m.token_count
            }
            for m in messages
        ],
        "total": len(conv.messages)
    }


@app.post("/v1/conversations/{conversation_id}/context")
async def prepare_context(
    conversation_id: str,
    request: PrepareContextRequest
):
    """Prepare optimized context for an API call."""
    conv = store.get(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    messages = await history_manager.prepare_context(
        conv,
        reserved_tokens=request.reserved_tokens
    )
    counter = TokenCounter()
    total_tokens = sum(counter.count(m["content"]) for m in messages)
    return {
        "messages": messages,
        "message_count": len(messages),
        "total_tokens": total_tokens
    }


@app.delete("/v1/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation."""
    if store.delete(conversation_id):
        return {"deleted": True}
    raise HTTPException(status_code=404, detail="Conversation not found")


@app.get("/health")
async def health():
    return {"status": "healthy"}
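
Exercising the service with httpx, assuming the app is running locally on port 8000 (note that the /context endpoint additionally requires history_manager to be initialized with a real client):

import httpx

BASE = "http://localhost:8000/v1"

with httpx.Client() as http:
    conv_id = http.post(f"{BASE}/conversations", json={
        "system_prompt": "You are a helpful assistant."
    }).json()["conversation_id"]
    http.post(f"{BASE}/conversations/{conv_id}/messages", json={
        "role": "user", "content": "Hello!"
    })
    messages = http.get(f"{BASE}/conversations/{conv_id}/messages").json()
    print(messages["total"])  # 1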
References
- LangChain Memory: https://python.langchain.com/docs/modules/memory/
- MemGPT: https://memgpt.ai/
- OpenAI Chat Completions: https://platform.openai.com/docs/guides/chat
- tiktoken: https://github.com/openai/tiktoken
Conclusion
Effective conversation history management is essential for building coherent multi-turn AI applications. Start with proper message storage that tracks metadata like timestamps and token counts. Implement truncation strategies that preserve recent context while staying within token limits—keeping the last few turns is usually more important than keeping everything. Use summarization to compress older history when simple truncation loses too much information. For sophisticated applications, implement hybrid memory systems that combine working memory for recent context with long-term memory for important facts that should persist across sessions. The key insight is that not all history is equally valuable—recent messages, user preferences, and key decisions matter more than routine exchanges. Design your history management to prioritize what matters while gracefully handling the constraints of context windows.