Conversation History Management: Building Memory for Multi-Turn AI Applications

Introduction

Chatbots and conversational AI need memory. Without conversation history, every message exists in isolation; the model can't reference what was said before, follow up on previous topics, or maintain coherent multi-turn dialogues. But history management is tricky: context windows are limited, old messages may be irrelevant, and naive approaches quickly hit token limits. This guide covers practical strategies for managing conversation history: storing messages efficiently, truncating to fit context windows, summarizing old conversations to preserve key information, and building memory systems that scale.


Message Storage

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import uuid
import json

@dataclass
class Message:
    """A single conversation message."""
    
    role: str  # "user", "assistant", "system"
    content: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)
    token_count: int = 0
    
    def to_dict(self) -> dict:
        """Convert to API format."""
        return {"role": self.role, "content": self.content}
    
    def to_json(self) -> str:
        """Serialize to JSON."""
        return json.dumps({
            "id": self.id,
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
            "token_count": self.token_count
        })
    
    @classmethod
    def from_json(cls, data: str) -> "Message":
        """Deserialize from JSON."""
        obj = json.loads(data)
        return cls(
            id=obj["id"],
            role=obj["role"],
            content=obj["content"],
            timestamp=datetime.fromisoformat(obj["timestamp"]),
            metadata=obj.get("metadata", {}),
            token_count=obj.get("token_count", 0)
        )

@dataclass
class Conversation:
    """A conversation with message history."""
    
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    messages: list[Message] = field(default_factory=list)
    system_prompt: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
    
    def add_message(self, role: str, content: str, **kwargs) -> Message:
        """Add a message to the conversation."""
        
        message = Message(role=role, content=content, **kwargs)
        self.messages.append(message)
        self.updated_at = datetime.utcnow()
        return message
    
    def get_messages_for_api(self) -> list[dict]:
        """Get messages in API format."""
        
        result = []
        
        if self.system_prompt:
            result.append({"role": "system", "content": self.system_prompt})
        
        for msg in self.messages:
            result.append(msg.to_dict())
        
        return result
    
    @property
    def total_tokens(self) -> int:
        """Total tokens in conversation."""
        return sum(m.token_count for m in self.messages)

class ConversationStore:
    """Store and retrieve conversations."""
    
    def __init__(self):
        self._conversations: dict[str, Conversation] = {}
    
    def create(self, system_prompt: Optional[str] = None, **metadata) -> Conversation:
        """Create a new conversation."""
        
        conv = Conversation(system_prompt=system_prompt, metadata=metadata)
        self._conversations[conv.id] = conv
        return conv
    
    def get(self, conversation_id: str) -> Optional[Conversation]:
        """Get a conversation by ID."""
        return self._conversations.get(conversation_id)
    
    def delete(self, conversation_id: str) -> bool:
        """Delete a conversation."""
        
        if conversation_id in self._conversations:
            del self._conversations[conversation_id]
            return True
        return False
    
    def list_conversations(
        self,
        limit: int = 100,
        offset: int = 0
    ) -> list[Conversation]:
        """List conversations."""
        
        convs = sorted(
            self._conversations.values(),
            key=lambda c: c.updated_at,
            reverse=True
        )
        return convs[offset:offset + limit]
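
A quick usage sketch for the in-memory store (everything here uses only the classes defined above; the channel metadata key is just an example):

# Create a store, start a conversation, and record a two-turn exchange
store = ConversationStore()
conv = store.create(system_prompt="You are a helpful assistant.", channel="web")

conv.add_message("user", "What's the capital of France?")
conv.add_message("assistant", "The capital of France is Paris.")

# Messages in the shape a chat completions API expects
print(conv.get_messages_for_api())
# [{'role': 'system', 'content': 'You are a helpful assistant.'},
#  {'role': 'user', 'content': "What's the capital of France?"}, ...]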

class RedisConversationStore:
    """Redis-backed conversation store."""
    
    def __init__(self, redis_client: Any, ttl: int = 86400):
        self.redis = redis_client
        self.ttl = ttl
    
    async def save(self, conversation: Conversation):
        """Save conversation to Redis."""
        
        key = f"conv:{conversation.id}"
        
        data = {
            "id": conversation.id,
            # Redis can't store None; an empty string round-trips
            # back to None in load()
            "system_prompt": conversation.system_prompt or "",
            "metadata": json.dumps(conversation.metadata),
            "created_at": conversation.created_at.isoformat(),
            "updated_at": conversation.updated_at.isoformat()
        }
        
        await self.redis.hset(key, mapping=data)
        
        # Store messages as list
        msg_key = f"conv:{conversation.id}:messages"
        await self.redis.delete(msg_key)
        
        for msg in conversation.messages:
            await self.redis.rpush(msg_key, msg.to_json())
        
        await self.redis.expire(key, self.ttl)
        await self.redis.expire(msg_key, self.ttl)
    
    async def load(self, conversation_id: str) -> Optional[Conversation]:
        """Load conversation from Redis."""
        
        key = f"conv:{conversation_id}"
        data = await self.redis.hgetall(key)
        
        if not data:
            return None
        
        conv = Conversation(
            id=data[b"id"].decode(),
            system_prompt=data.get(b"system_prompt", b"").decode() or None,
            metadata=json.loads(data.get(b"metadata", b"{}").decode()),
            created_at=datetime.fromisoformat(data[b"created_at"].decode()),
            updated_at=datetime.fromisoformat(data[b"updated_at"].decode())
        )
        
        # Load messages
        msg_key = f"conv:{conversation_id}:messages"
        messages = await self.redis.lrange(msg_key, 0, -1)
        
        conv.messages = [Message.from_json(m.decode()) for m in messages]
        
        return conv
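
And a sketch of the Redis-backed store, assuming redis-py's asyncio client. One caveat: load() decodes raw bytes itself, so the client must not be created with decode_responses=True.

import asyncio
import redis.asyncio as aioredis

async def demo():
    # decode_responses stays False: load() calls .decode() on raw bytes
    client = aioredis.from_url("redis://localhost:6379")
    store = RedisConversationStore(client, ttl=3600)  # expire after one hour

    conv = Conversation(system_prompt="You are a helpful assistant.")
    conv.add_message("user", "Remember that my name is Ada.")
    await store.save(conv)

    loaded = await store.load(conv.id)
    print(loaded.messages[0].content)  # "Remember that my name is Ada."

asyncio.run(demo())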

Window Truncation

from dataclasses import dataclass
from typing import Any, Optional
import tiktoken

@dataclass
class TruncationResult:
    """Result of history truncation."""
    
    messages: list[Message]
    truncated_count: int
    total_tokens: int
    within_limit: bool

class TokenCounter:
    """Count tokens for messages."""
    
    def __init__(self, model: str = "gpt-4o-mini"):
        try:
            self.encoder = tiktoken.encoding_for_model(model)
        except KeyError:
            self.encoder = tiktoken.get_encoding("cl100k_base")
    
    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoder.encode(text))
    
    def count_message(self, message: Message) -> int:
        """Count tokens in a message."""
        
        # Approximate per-message overhead (role and framing tokens);
        # the exact amount varies by model and API
        overhead = 4
        return overhead + self.count(message.content)
    
    def count_messages(self, messages: list[Message]) -> int:
        """Count total tokens in messages."""
        
        total = 2  # Reply priming
        for msg in messages:
            total += self.count_message(msg)
        return total
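
The overhead constants above are rough approximations of how chat APIs frame messages, so treat the results as estimates rather than exact billing figures. A minimal check:

counter = TokenCounter()
print(counter.count("Hello, world!"))  # raw text tokens
print(counter.count_message(Message(role="user", content="Hello, world!")))  # plus per-message overhead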

class HistoryTruncator:
    """Truncate conversation history to fit context window."""
    
    def __init__(self, counter: Optional[TokenCounter] = None):
        self.counter = counter or TokenCounter()
    
    def truncate_to_limit(
        self,
        messages: list[Message],
        max_tokens: int,
        keep_system: bool = True,
        keep_recent: int = 2
    ) -> TruncationResult:
        """Truncate messages to fit token limit."""
        
        if not messages:
            return TruncationResult(
                messages=[],
                truncated_count=0,
                total_tokens=0,
                within_limit=True
            )
        
        # Separate system messages
        system_msgs = [m for m in messages if m.role == "system"]
        other_msgs = [m for m in messages if m.role != "system"]
        
        # Calculate system tokens
        system_tokens = sum(self.counter.count_message(m) for m in system_msgs)
        available = max_tokens - system_tokens
        
        if available <= 0:
            kept = system_msgs if keep_system else []
            return TruncationResult(
                messages=kept,
                truncated_count=len(other_msgs),
                total_tokens=system_tokens if keep_system else 0,
                within_limit=False
            )
        
        # Keep recent messages
        recent = other_msgs[-keep_recent:] if keep_recent else []
        recent_tokens = sum(self.counter.count_message(m) for m in recent)
        
        if recent_tokens > available:
            # Even the most recent messages don't fit
            kept = system_msgs if keep_system else []
            return TruncationResult(
                messages=kept,
                truncated_count=len(other_msgs),
                total_tokens=system_tokens if keep_system else 0,
                within_limit=False
            )
        
        # Add older messages that fit
        remaining = available - recent_tokens
        older = other_msgs[:-keep_recent] if keep_recent else other_msgs
        
        kept_older = []
        for msg in reversed(older):
            msg_tokens = self.counter.count_message(msg)
            if msg_tokens <= remaining:
                kept_older.insert(0, msg)
                remaining -= msg_tokens
            else:
                break
        
        # Combine results
        result_msgs = []
        if keep_system:
            result_msgs.extend(system_msgs)
        result_msgs.extend(kept_older)
        result_msgs.extend(recent)
        
        total_tokens = self.counter.count_messages(result_msgs)
        truncated = len(messages) - len(result_msgs)
        
        return TruncationResult(
            messages=result_msgs,
            truncated_count=truncated,
            total_tokens=total_tokens,
            within_limit=total_tokens <= max_tokens
        )
    
    def truncate_by_turns(
        self,
        messages: list[Message],
        max_turns: int
    ) -> TruncationResult:
        """Truncate to keep only recent turns."""
        
        # A turn is a user message + assistant response
        system_msgs = [m for m in messages if m.role == "system"]
        other_msgs = [m for m in messages if m.role != "system"]
        
        # Count turns from the end
        turns = 0
        keep_from = len(other_msgs)
        
        for i in range(len(other_msgs) - 1, -1, -1):
            if other_msgs[i].role == "user":
                turns += 1
                if turns > max_turns:
                    break
                keep_from = i
        
        kept = other_msgs[keep_from:]
        result_msgs = system_msgs + kept
        
        return TruncationResult(
            messages=result_msgs,
            truncated_count=len(messages) - len(result_msgs),
            total_tokens=self.counter.count_messages(result_msgs),
            within_limit=True
        )
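
A sketch of both truncation modes against a small synthetic history (the 60-token budget is deliberately tight to force truncation):

truncator = HistoryTruncator()

history = [
    Message(role="system", content="You are a helpful assistant."),
    Message(role="user", content="Tell me about Python."),
    Message(role="assistant", content="Python is a general-purpose language known for readability."),
    Message(role="user", content="And its type system?"),
    Message(role="assistant", content="It is dynamically typed, with optional static hints."),
]

# Token-based: keep the system prompt and the last two messages,
# then pull in older messages only while they fit
result = truncator.truncate_to_limit(history, max_tokens=60, keep_recent=2)
print(result.truncated_count, result.total_tokens, result.within_limit)

# Turn-based: keep only the most recent user/assistant turn
by_turns = truncator.truncate_by_turns(history, max_turns=1)
print([m.role for m in by_turns.messages])  # ['system', 'user', 'assistant']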

History Summarization

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class SummaryResult:
    """Result of history summarization."""
    
    summary: str
    original_tokens: int
    summary_tokens: int
    messages_summarized: int

class HistorySummarizer:
    """Summarize conversation history."""
    
    def __init__(self, client: Any, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model
        self.counter = TokenCounter()
    
    async def summarize(
        self,
        messages: list[Message],
        max_summary_tokens: int = 500
    ) -> SummaryResult:
        """Summarize conversation messages."""
        
        if not messages:
            return SummaryResult(
                summary="",
                original_tokens=0,
                summary_tokens=0,
                messages_summarized=0
            )
        
        # Format messages for summarization
        formatted = "\n".join(
            f"{m.role.upper()}: {m.content}"
            for m in messages
        )
        
        original_tokens = self.counter.count(formatted)
        
        prompt = f"""Summarize this conversation, preserving key information:
- Main topics discussed
- Important decisions or conclusions
- Any commitments or action items
- Key facts mentioned

Keep the summary concise (under {max_summary_tokens} tokens).

Conversation:
{formatted}

Summary:"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_summary_tokens
        )
        
        summary = response.choices[0].message.content
        
        return SummaryResult(
            summary=summary,
            original_tokens=original_tokens,
            summary_tokens=self.counter.count(summary),
            messages_summarized=len(messages)
        )
    
    async def progressive_summarize(
        self,
        messages: list[Message],
        existing_summary: Optional[str] = None,
        chunk_size: int = 10
    ) -> SummaryResult:
        """Progressively summarize in chunks, folding each into the summary."""
        
        if not messages:
            return SummaryResult(
                summary=existing_summary or "",
                original_tokens=0,
                summary_tokens=self.counter.count(existing_summary or ""),
                messages_summarized=0
            )
        
        summary = existing_summary
        original_tokens = 0
        
        # Process chunk_size messages per model call, so each call sees
        # only the running summary plus one chunk of new messages
        for start in range(0, len(messages), chunk_size):
            chunk = messages[start:start + chunk_size]
            formatted = "\n".join(
                f"{m.role.upper()}: {m.content}"
                for m in chunk
            )
            original_tokens += self.counter.count(formatted)
            
            if summary:
                prompt = f"""Update this conversation summary with new messages.

Previous summary:
{summary}

New messages:
{formatted}

Updated summary (preserve important information from both):"""
            else:
                prompt = f"""Summarize this conversation:

{formatted}

Summary:"""
            
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=500
            )
            summary = response.choices[0].message.content
        
        return SummaryResult(
            summary=summary,
            original_tokens=original_tokens,
            summary_tokens=self.counter.count(summary),
            messages_summarized=len(messages)
        )
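
A usage sketch, assuming the official openai package's AsyncOpenAI client (which matches the client.chat.completions.create calls above) and an OPENAI_API_KEY in the environment:

from typing import Optional
from openai import AsyncOpenAI

async def update_summary(
    conversation: Conversation,
    prior_summary: Optional[str] = None
) -> str:
    # Fold the full history into a rolling summary, ten messages per call
    summarizer = HistorySummarizer(AsyncOpenAI())
    result = await summarizer.progressive_summarize(
        conversation.messages,
        existing_summary=prior_summary,
        chunk_size=10
    )
    print(f"{result.messages_summarized} messages -> {result.summary_tokens} summary tokens")
    return result.summary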

class SummarizingHistoryManager:
    """Manage history with automatic summarization."""
    
    def __init__(
        self,
        client: Any,
        max_tokens: int = 4000,
        summarize_threshold: int = 3000
    ):
        self.client = client
        self.max_tokens = max_tokens
        self.summarize_threshold = summarize_threshold
        self.counter = TokenCounter()
        self.truncator = HistoryTruncator(self.counter)
        self.summarizer = HistorySummarizer(client)
    
    async def prepare_context(
        self,
        conversation: Conversation,
        reserved_tokens: int = 1000
    ) -> list[dict]:
        """Prepare conversation context for API call."""
        
        available = self.max_tokens - reserved_tokens
        
        # Check if we need to summarize
        total_tokens = self.counter.count_messages(conversation.messages)
        
        if total_tokens <= available:
            return conversation.get_messages_for_api()
        
        # Summarize older messages
        messages = conversation.messages
        
        # Keep recent messages
        recent_count = 4  # Keep last 2 turns
        recent = messages[-recent_count:]
        older = messages[:-recent_count]
        
        if older:
            # Summarize older messages
            summary_result = await self.summarizer.summarize(older)
            
            # Create summary message
            summary_msg = Message(
                role="system",
                content=f"Previous conversation summary: {summary_result.summary}"
            )
            
            # Build result
            result = []
            
            if conversation.system_prompt:
                result.append({"role": "system", "content": conversation.system_prompt})
            
            result.append(summary_msg.to_dict())
            
            for msg in recent:
                result.append(msg.to_dict())
            
            return result
        
        # Just truncate if no older messages
        truncation = self.truncator.truncate_to_limit(
            messages, available, keep_recent=recent_count
        )
        
        result = []
        if conversation.system_prompt:
            result.append({"role": "system", "content": conversation.system_prompt})
        
        for msg in truncation.messages:
            if msg.role != "system":
                result.append(msg.to_dict())
        
        return result
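
Putting the manager into a chat loop looks roughly like this (again assuming AsyncOpenAI; the model name is illustrative, and client and manager are created per call only for brevity):

from openai import AsyncOpenAI

async def chat_turn(conversation: Conversation, user_input: str) -> str:
    client = AsyncOpenAI()
    manager = SummarizingHistoryManager(client, max_tokens=4000)

    conversation.add_message("user", user_input)

    # Summarizes or truncates as needed so the request fits the window
    context = await manager.prepare_context(conversation, reserved_tokens=1000)

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=context,
        max_tokens=800
    )
    reply = response.choices[0].message.content
    conversation.add_message("assistant", reply)
    return reply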

Memory Patterns

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime

import numpy as np

@dataclass
class MemoryEntry:
    """A memory entry."""
    
    content: str
    importance: float = 0.5
    timestamp: datetime = field(default_factory=datetime.utcnow)
    access_count: int = 0
    last_accessed: Optional[datetime] = None
    metadata: dict = field(default_factory=dict)

class WorkingMemory:
    """Short-term working memory."""
    
    def __init__(self, capacity: int = 10):
        self.capacity = capacity
        self.entries: list[MemoryEntry] = []
    
    def add(self, content: str, importance: float = 0.5, **metadata):
        """Add entry to working memory."""
        
        entry = MemoryEntry(
            content=content,
            importance=importance,
            metadata=metadata
        )
        
        self.entries.append(entry)
        
        # Evict if over capacity
        if len(self.entries) > self.capacity:
            self._evict()
    
    def _evict(self):
        """Evict least important entry."""
        
        if not self.entries:
            return
        
        # Sort by importance and recency
        self.entries.sort(
            key=lambda e: (e.importance, e.timestamp.timestamp()),
            reverse=True
        )
        
        # Remove lowest priority
        self.entries.pop()
    
    def get_context(self) -> str:
        """Get working memory as context string."""
        
        if not self.entries:
            return ""
        
        return "\n".join(
            f"- {e.content}"
            for e in sorted(self.entries, key=lambda e: e.timestamp)
        )
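
A short demonstration of importance-based eviction (capacity of three, four entries added):

wm = WorkingMemory(capacity=3)
wm.add("User likes green tea", importance=0.4)
wm.add("User lives in Lyon", importance=0.4)
wm.add("User is allergic to nuts", importance=0.9)
wm.add("User prefers email follow-ups", importance=0.4)

# The oldest low-importance entry ("likes green tea") has been evicted;
# the high-importance allergy note survives
print(wm.get_context())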

class LongTermMemory:
    """Long-term memory with retrieval."""
    
    def __init__(self, client: Any, model: str = "text-embedding-3-small"):
        self.client = client
        self.model = model
        self.entries: list[MemoryEntry] = []
        self._embeddings: dict[int, list[float]] = {}
    
    async def store(self, content: str, importance: float = 0.5, **metadata):
        """Store entry in long-term memory."""
        
        entry = MemoryEntry(
            content=content,
            importance=importance,
            metadata=metadata
        )
        
        # Generate embedding
        response = await self.client.embeddings.create(
            model=self.model,
            input=content
        )
        
        idx = len(self.entries)
        self.entries.append(entry)
        self._embeddings[idx] = response.data[0].embedding
    
    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        min_similarity: float = 0.5
    ) -> list[MemoryEntry]:
        """Retrieve relevant memories."""
        
        if not self.entries:
            return []
        
        # Get query embedding
        response = await self.client.embeddings.create(
            model=self.model,
            input=query
        )
        query_emb = response.data[0].embedding
        
        # Calculate cosine similarity against each stored embedding
        similarities = []
        for idx, entry in enumerate(self.entries):
            emb = self._embeddings[idx]
            sim = np.dot(query_emb, emb) / (
                np.linalg.norm(query_emb) * np.linalg.norm(emb)
            )
            similarities.append((idx, sim))
        
        # Sort by similarity
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # Return top-k above threshold
        results = []
        for idx, sim in similarities[:top_k]:
            if sim >= min_similarity:
                entry = self.entries[idx]
                entry.access_count += 1
                entry.last_accessed = datetime.utcnow()
                results.append(entry)
        
        return results

class HybridMemory:
    """Hybrid memory combining working and long-term memory."""
    
    def __init__(self, client: Any):
        self.working = WorkingMemory(capacity=10)
        self.long_term = LongTermMemory(client)
    
    async def process_message(self, message: Message):
        """Process a message and update memories."""
        
        # Add to working memory
        self.working.add(
            content=f"{message.role}: {message.content}",
            importance=0.7 if message.role == "user" else 0.5
        )
        
        # Store important information in long-term
        if self._is_important(message):
            await self.long_term.store(
                content=message.content,
                importance=0.8,
                role=message.role,
                timestamp=message.timestamp.isoformat()
            )
    
    def _is_important(self, message: Message) -> bool:
        """Determine if message should go to long-term memory."""
        
        # Simple heuristics
        important_keywords = [
            "remember", "important", "don't forget",
            "my name is", "i prefer", "always"
        ]
        
        content_lower = message.content.lower()
        return any(kw in content_lower for kw in important_keywords)
    
    async def get_relevant_context(self, query: str) -> str:
        """Get relevant context from both memories."""
        
        # Get working memory
        working_context = self.working.get_context()
        
        # Retrieve from long-term
        memories = await self.long_term.retrieve(query, top_k=3)
        long_term_context = "\n".join(
            f"- {m.content}" for m in memories
        ) if memories else ""
        
        parts = []
        if working_context:
            parts.append(f"Recent context:\n{working_context}")
        if long_term_context:
            parts.append(f"Relevant memories:\n{long_term_context}")
        
        return "\n\n".join(parts)
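
And a sketch tying both memories together, again assuming AsyncOpenAI for the embedding calls:

from openai import AsyncOpenAI

async def memory_demo():
    memory = HybridMemory(AsyncOpenAI())

    # "my name is" trips the importance heuristic, so this message lands
    # in long-term memory (with an embedding) as well as working memory
    msg = Message(role="user", content="Remember, my name is Ada and I prefer concise answers.")
    await memory.process_message(msg)

    # Later, pull context relevant to a new query from both memories
    print(await memory.get_relevant_context("What's my name?"))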

Production History Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
store = ConversationStore()

# Wire in a real LLM client at startup, for example:
#   history_manager = SummarizingHistoryManager(AsyncOpenAI())
history_manager: Optional[SummarizingHistoryManager] = None

class CreateConversationRequest(BaseModel):
    system_prompt: Optional[str] = None
    metadata: Optional[dict] = None

class AddMessageRequest(BaseModel):
    role: str
    content: str
    metadata: Optional[dict] = None

class PrepareContextRequest(BaseModel):
    max_tokens: int = 4000
    reserved_tokens: int = 1000

@app.post("/v1/conversations")
async def create_conversation(request: CreateConversationRequest):
    """Create a new conversation."""
    
    conv = store.create(
        system_prompt=request.system_prompt,
        **(request.metadata or {})
    )
    
    return {
        "conversation_id": conv.id,
        "created_at": conv.created_at.isoformat()
    }

@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Get conversation details."""
    
    conv = store.get(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    
    return {
        "id": conv.id,
        "system_prompt": conv.system_prompt,
        "message_count": len(conv.messages),
        "total_tokens": conv.total_tokens,
        "created_at": conv.created_at.isoformat(),
        "updated_at": conv.updated_at.isoformat()
    }

@app.post("/v1/conversations/{conversation_id}/messages")
async def add_message(conversation_id: str, request: AddMessageRequest):
    """Add a message to conversation."""
    
    conv = store.get(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    
    counter = TokenCounter()
    token_count = counter.count(request.content)
    
    message = conv.add_message(
        role=request.role,
        content=request.content,
        token_count=token_count,
        **(request.metadata or {})
    )
    
    return {
        "message_id": message.id,
        "token_count": token_count,
        "total_messages": len(conv.messages)
    }

@app.get("/v1/conversations/{conversation_id}/messages")
async def get_messages(
    conversation_id: str,
    limit: int = 100,
    offset: int = 0
):
    """Get conversation messages."""
    
    conv = store.get(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    
    messages = conv.messages[offset:offset + limit]
    
    return {
        "messages": [
            {
                "id": m.id,
                "role": m.role,
                "content": m.content,
                "timestamp": m.timestamp.isoformat(),
                "token_count": m.token_count
            }
            for m in messages
        ],
        "total": len(conv.messages)
    }

@app.post("/v1/conversations/{conversation_id}/context")
async def prepare_context(
    conversation_id: str,
    request: PrepareContextRequest
):
    """Prepare optimized context for API call."""
    
    conv = store.get(conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")
    
    if history_manager is None:
        raise HTTPException(status_code=503, detail="History manager not configured")
    
    messages = await history_manager.prepare_context(
        conv,
        reserved_tokens=request.reserved_tokens
    )
    
    counter = TokenCounter()
    total_tokens = sum(counter.count(m["content"]) for m in messages)
    
    return {
        "messages": messages,
        "message_count": len(messages),
        "total_tokens": total_tokens
    }

@app.delete("/v1/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation."""
    
    if store.delete(conversation_id):
        return {"deleted": True}
    
    raise HTTPException(status_code=404, detail="Conversation not found")

@app.get("/health")
async def health():
    return {"status": "healthy"}
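
A quick smoke test of the service with httpx, assuming it runs locally on port 8000:

import httpx

with httpx.Client(base_url="http://localhost:8000") as http:
    conv_id = http.post("/v1/conversations", json={
        "system_prompt": "You are a helpful assistant."
    }).json()["conversation_id"]

    http.post(f"/v1/conversations/{conv_id}/messages", json={
        "role": "user", "content": "Hello!"
    })

    print(http.get(f"/v1/conversations/{conv_id}/messages").json()["total"])  # 1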

Conclusion

Effective conversation history management is essential for building coherent multi-turn AI applications. Start with proper message storage that tracks metadata like timestamps and token counts. Implement truncation strategies that preserve recent context while staying within token limits—keeping the last few turns is usually more important than keeping everything. Use summarization to compress older history when simple truncation loses too much information. For sophisticated applications, implement hybrid memory systems that combine working memory for recent context with long-term memory for important facts that should persist across sessions. The key insight is that not all history is equally valuable—recent messages, user preferences, and key decisions matter more than routine exchanges. Design your history management to prioritize what matters while gracefully handling the constraints of context windows.

