Conversation Memory Patterns: Building Stateful LLM Applications

Introduction: LLMs are stateless—each request starts fresh with no memory of previous interactions. Building conversational applications requires implementing memory systems that maintain context across turns while staying within token limits. The challenge is balancing completeness (keeping all relevant context) with efficiency (not wasting tokens on irrelevant history). This guide covers practical memory patterns: buffer memory for recent messages, summary memory for compressing long conversations, vector memory for semantic retrieval of relevant past context, and hybrid approaches that combine multiple strategies. Whether you’re building a chatbot, assistant, or multi-turn reasoning system, effective memory management is essential for coherent, contextual responses.

Conversation Memory
Conversation Memory: Buffer, Summary, and Vector Memory Patterns

Buffer Memory

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from abc import ABC, abstractmethod
from enum import Enum

class MessageRole(Enum):
    """Speaker roles that a conversation message can carry.

    Each member's value is the lowercase role string that
    ``Message.to_dict`` places under the ``"role"`` key.
    """

    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    FUNCTION = "function"
    TOOL = "tool"

@dataclass
class Message:
    """One conversation turn together with its bookkeeping fields."""

    role: MessageRole
    content: str
    # Defaults to the (naive) UTC time at which the instance is created.
    timestamp: datetime = field(default_factory=datetime.utcnow)
    # Free-form extra data; every instance gets its own dict.
    metadata: dict = field(default_factory=dict)
    # 0 is the "not counted" default; TokenBufferMemory.add fills it in.
    token_count: int = 0

    def to_dict(self) -> dict:
        """Return the ``role``/``content`` mapping used as API format."""
        payload = {"role": self.role.value}
        payload["content"] = self.content
        return payload

class Memory(ABC):
    """Contract shared by every conversation memory implementation."""

    @abstractmethod
    def add(self, message: Message) -> None:
        """Record ``message`` in the memory."""
        ...

    @abstractmethod
    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return the messages held by the memory.

        ``max_tokens`` is an optional budget; how (and whether) it is
        applied is up to the concrete implementation.
        """
        ...

    @abstractmethod
    def clear(self) -> None:
        """Discard everything the memory currently holds."""
        ...

class BufferMemory(Memory):
    """Fixed-capacity buffer that retains only the newest messages."""

    def __init__(self, max_messages: int = 20):
        self._messages: list[Message] = []
        self.max_messages = max_messages

    def add(self, message: Message) -> None:
        """Append ``message`` and drop from the front while over capacity."""
        store = self._messages
        store.append(message)
        while len(store) > self.max_messages:
            store.pop(0)

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return buffered messages in chronological order.

        Without ``max_tokens`` a copy of the whole buffer is returned.
        With a budget, messages are taken newest-first while the running
        sum of their ``token_count`` stays within ``max_tokens``; selection
        stops at the first message that would exceed it.
        """
        if max_tokens is None:
            return self._messages[:]

        newest_first = []
        used = 0
        for candidate in reversed(self._messages):
            projected = used + candidate.token_count
            if projected > max_tokens:
                break
            newest_first.append(candidate)
            used = projected

        # Collected newest-first; flip back to chronological order.
        newest_first.reverse()
        return newest_first

    def clear(self) -> None:
        """Empty the buffer in place."""
        del self._messages[:]

class TokenBufferMemory(Memory):
    """Buffer memory bounded by a total token budget instead of a count.

    Args:
        max_tokens: Token budget for the whole buffer.
        tokenizer: Optional object with an ``encode(text)`` method whose
            result supports ``len()``. When omitted, tokens are estimated
            as four characters per token.
    """

    def __init__(
        self,
        max_tokens: int = 4000,
        tokenizer: Any = None
    ):
        self.max_tokens = max_tokens
        self.tokenizer = tokenizer
        self._messages: list[Message] = []
        self._total_tokens = 0

    def _count_tokens(self, text: str) -> int:
        """Return the token count of ``text`` (tokenizer or 4-chars estimate)."""
        # Compare against None so a tokenizer object that happens to be
        # falsy (e.g. defines __len__) is still used.
        if self.tokenizer is not None:
            return len(self.tokenizer.encode(text))
        # Rough estimate: 4 chars per token
        return len(text) // 4

    def add(self, message: Message) -> None:
        """Add ``message`` and evict the oldest ones to respect the budget.

        A ``token_count`` of 0 is treated as "not counted yet" and is filled
        in on the message itself. The most recent message is always kept,
        even if it alone exceeds ``max_tokens``.
        """
        if message.token_count == 0:
            message.token_count = self._count_tokens(message.content)

        self._messages.append(message)
        self._total_tokens += message.token_count

        while self._total_tokens > self.max_tokens and len(self._messages) > 1:
            removed = self._messages.pop(0)
            self._total_tokens -= removed.token_count

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return the newest messages that fit the effective token limit.

        The effective limit is ``self.max_tokens`` when ``max_tokens`` is
        ``None``, otherwise the smaller of the two. An explicit ``0`` is
        honoured as a zero budget rather than being treated as "unset".
        Messages are returned in chronological order.
        """
        if max_tokens is None:
            limit = self.max_tokens
        else:
            limit = min(max_tokens, self.max_tokens)

        newest_first = []
        total = 0
        for msg in reversed(self._messages):
            if total + msg.token_count > limit:
                break
            newest_first.append(msg)
            total += msg.token_count

        # Append + single reverse avoids the quadratic insert(0, ...) pattern.
        newest_first.reverse()
        return newest_first

    def clear(self) -> None:
        """Drop all messages and reset the running token total."""
        self._messages.clear()
        self._total_tokens = 0

class WindowBufferMemory(Memory):
    """Sliding window buffer with configurable overlap.

    Messages accumulate in the current window. Once it holds
    ``window_size`` messages, everything except the last ``overlap``
    messages is archived as a completed window and the current window
    restarts from those ``overlap`` messages.

    Args:
        window_size: Number of messages that triggers a roll-over.
        overlap: Number of trailing messages carried into the next window.
            ``0`` means windows do not overlap.
    """

    def __init__(
        self,
        window_size: int = 10,
        overlap: int = 2
    ):
        self.window_size = window_size
        self.overlap = overlap
        self._messages: list[Message] = []
        self._windows: list[list[Message]] = []

    def add(self, message: Message) -> None:
        """Add ``message`` to the current window, rolling over when full."""
        self._messages.append(message)

        if len(self._messages) < self.window_size:
            return

        # Use an explicit split index: slicing with ``-overlap`` breaks for
        # overlap == 0, because ``[:-0]`` is empty and ``[-0:]`` is the
        # whole list, so the window would never roll over.
        carried = max(self.overlap, 0)
        split = max(len(self._messages) - carried, 0)

        # When overlap >= the current length there is nothing to archive;
        # skip instead of recording an empty window.
        if split > 0:
            self._windows.append(self._messages[:split])
            self._messages = self._messages[split:]

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return a copy of the current window (``max_tokens`` is not applied)."""
        return list(self._messages)

    def get_all_windows(self) -> list[list[Message]]:
        """Return the archived windows followed by the current window."""
        return self._windows + [self._messages]

    def clear(self) -> None:
        """Drop the current window and all archived windows."""
        self._messages.clear()
        self._windows.clear()

Summary Memory

from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime

@dataclass
class ConversationSummary:
    """Condensed record of messages that were folded into a summary."""

    # Summary text.
    content: str
    # Number of messages the summary accounts for.
    message_count: int
    # When this summary was produced.
    created_at: datetime
    # Token size of ``content``; 0 unless the creator supplies a value.
    token_count: int = 0

class SummaryMemory(Memory):
    """Memory that folds older messages into an LLM-written summary.

    New messages go into a buffer. When the buffer grows beyond
    ``buffer_size``, the oldest part is summarized (together with any
    previous summary) and only the most recent half is kept verbatim.

    Args:
        llm_client: Client exposing ``chat.completions.create(model=...,
            messages=..., max_tokens=...)``. The call may return the
            response directly or an awaitable resolving to it.
        model: Model name passed to the client.
        buffer_size: Buffer length that triggers summarization when exceeded.
        summary_max_tokens: ``max_tokens`` passed to the summarization call.
    """

    def __init__(
        self,
        llm_client: Any,
        model: str = "gpt-4o-mini",
        buffer_size: int = 10,
        summary_max_tokens: int = 500
    ):
        self.llm_client = llm_client
        self.model = model
        self.buffer_size = buffer_size
        self.summary_max_tokens = summary_max_tokens

        self._buffer: list[Message] = []
        self._summary: Optional[ConversationSummary] = None

    def add(self, message: Message) -> None:
        """Add ``message``, summarizing when the buffer exceeds its limit."""
        self._buffer.append(message)

        if len(self._buffer) > self.buffer_size:
            self._summarize_buffer()

    @staticmethod
    def _run_awaitable(awaitable: Any) -> Any:
        """Drive ``awaitable`` to completion from synchronous code.

        Raises:
            RuntimeError: If an event loop is already running in this
                thread, since blocking on the awaitable is impossible there.
        """
        import asyncio

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_running = False
        else:
            loop_running = True

        if loop_running:
            # Close an un-awaited coroutine so it does not emit a
            # "coroutine was never awaited" warning.
            close = getattr(awaitable, "close", None)
            if callable(close):
                close()
            raise RuntimeError(
                "SummaryMemory cannot summarize synchronously while an "
                "event loop is running; use a synchronous client or call "
                "add() from outside the event loop."
            )

        async def _await() -> Any:
            return await awaitable

        return asyncio.run(_await())

    def _summarize_buffer(self) -> None:
        """Summarize the oldest buffered messages and keep the recent half.

        The buffer and summary are only updated after the LLM call
        succeeds, so a failed call does not lose messages.
        """
        import inspect

        # Keep ceil(buffer_size / 2) recent messages, as the original
        # ``-buffer_size // 2`` slice did. An explicit split index also
        # handles buffer_size == 0 (everything gets summarized).
        keep = (max(self.buffer_size, 0) + 1) // 2
        split = max(len(self._buffer) - keep, 0)
        to_summarize = self._buffer[:split]
        if not to_summarize:
            return
        remaining = self._buffer[split:]

        existing_summary = ""
        if self._summary:
            existing_summary = f"Previous summary:\n{self._summary.content}\n\n"

        messages_text = "\n".join(
            f"{m.role.value}: {m.content}"
            for m in to_summarize
        )

        prompt = f"""{existing_summary}New messages to incorporate:
{messages_text}

Provide a concise summary of the conversation so far, capturing:
- Key topics discussed
- Important decisions or conclusions
- Any pending questions or tasks
- Relevant context for continuing the conversation

Summary:"""

        response = self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.summary_max_tokens
        )
        # Async clients hand back an awaitable; sync clients the response.
        if inspect.isawaitable(response):
            response = self._run_awaitable(response)

        # ``content`` may be None; normalise so len() below cannot fail.
        summary_text = response.choices[0].message.content or ""

        previous_count = self._summary.message_count if self._summary else 0
        self._summary = ConversationSummary(
            content=summary_text,
            message_count=previous_count + len(to_summarize),
            created_at=datetime.utcnow(),
            # Rough estimate: 4 chars per token
            token_count=len(summary_text) // 4
        )
        # Commit the truncation only now that the summary exists.
        self._buffer = remaining

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return the summary (as a system message) plus buffered messages.

        ``max_tokens`` is accepted for interface compatibility and is not
        applied.
        """
        messages = []

        if self._summary:
            messages.append(Message(
                role=MessageRole.SYSTEM,
                content=f"Conversation summary:\n{self._summary.content}",
                token_count=self._summary.token_count
            ))

        messages.extend(self._buffer)

        return messages

    def clear(self) -> None:
        """Drop the buffer and the summary."""
        self._buffer.clear()
        self._summary = None

class IncrementalSummaryMemory(Memory):
    """Memory that refreshes a running summary every few messages.

    Args:
        llm_client: Client exposing ``chat.completions.create(model=...,
            messages=..., max_tokens=...)``; the result may be the response
            itself or an awaitable resolving to it.
        model: Model name passed to the client.
        summarize_every: Refresh the summary after this many added
            messages. Values below 1 disable summarization.
    """

    def __init__(
        self,
        llm_client: Any,
        model: str = "gpt-4o-mini",
        summarize_every: int = 5
    ):
        self.llm_client = llm_client
        self.model = model
        self.summarize_every = summarize_every

        self._messages: list[Message] = []
        self._running_summary: str = ""
        self._message_count = 0

    def add(self, message: Message) -> None:
        """Add ``message`` and refresh the summary on every N-th message."""
        self._messages.append(message)
        self._message_count += 1

        # Guard against summarize_every <= 0, which would otherwise raise
        # ZeroDivisionError (0) or slice unexpectedly (negative).
        if self.summarize_every > 0 and self._message_count % self.summarize_every == 0:
            self._update_summary()

    async def _update_summary_async(self) -> None:
        """Fold the last ``summarize_every`` messages into the summary.

        Afterwards only those most recent messages stay in the buffer.
        """
        import inspect

        recent = self._messages[-self.summarize_every:]
        recent_text = "\n".join(
            f"{m.role.value}: {m.content}"
            for m in recent
        )

        prompt = f"""Current summary:
{self._running_summary or "No previous summary."}

Recent messages:
{recent_text}

Update the summary to incorporate the recent messages. Keep it concise but comprehensive."""

        response = self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=300
        )
        # Async clients return an awaitable; sync clients the response.
        if inspect.isawaitable(response):
            response = await response

        # Keep the attribute a string even if the model returns no content.
        self._running_summary = response.choices[0].message.content or ""

        # Keep only recent messages after summarizing
        self._messages = self._messages[-self.summarize_every:]

    def _update_summary(self) -> None:
        """Run ``_update_summary_async`` to completion from sync code.

        Raises:
            RuntimeError: If an event loop is already running in this
                thread. In that case await ``_update_summary_async()``
                directly.
        """
        import asyncio

        # Detect a running loop before creating the coroutine, so that no
        # un-awaited coroutine is left behind on the error path.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_running = False
        else:
            loop_running = True

        if loop_running:
            raise RuntimeError(
                "IncrementalSummaryMemory cannot update its summary "
                "synchronously while an event loop is running; await "
                "_update_summary_async() instead."
            )

        asyncio.run(self._update_summary_async())

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return the running summary (as a system message) plus the buffer.

        ``max_tokens`` is accepted for interface compatibility and is not
        applied.
        """
        messages = []

        if self._running_summary:
            messages.append(Message(
                role=MessageRole.SYSTEM,
                content=f"Conversation context:\n{self._running_summary}"
            ))

        messages.extend(self._messages)
        return messages

    def clear(self) -> None:
        """Drop buffered messages, the summary, and the message counter."""
        self._messages.clear()
        self._running_summary = ""
        self._message_count = 0

Vector Memory

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import hashlib

@dataclass
class MemoryEntry:
    """A stored message together with its retrieval bookkeeping."""

    # Key under which VectorMemory stores the entry.
    id: str
    message: Message
    # None until an embedding has been computed for the message content.
    embedding: Optional[list[float]] = None
    # Weight used by VectorMemory when ranking entries for pruning.
    importance: float = 1.0
    # Incremented each time the entry is returned by a retrieval.
    access_count: int = 0
    # Time of the most recent retrieval; None if never retrieved.
    last_accessed: Optional[datetime] = None

class VectorMemory(Memory):
    """Memory with semantic retrieval over stored messages.

    Every added message is stored as a ``MemoryEntry`` keyed by a content
    hash, and the last few messages are also kept in a small recency
    buffer. ``retrieve`` ranks entries by cosine similarity to a query.

    Args:
        embedding_model: Object with an awaitable ``embed(list_of_texts)``
            whose result is indexable per input text.
        max_entries: Maximum number of stored entries before pruning.
        retrieval_k: Default number of messages returned by ``retrieve``.
    """

    def __init__(
        self,
        embedding_model: Any,
        max_entries: int = 1000,
        retrieval_k: int = 5
    ):
        self.embedding_model = embedding_model
        self.max_entries = max_entries
        self.retrieval_k = retrieval_k

        self._entries: dict[str, MemoryEntry] = {}
        self._recent_buffer: list[Message] = []
        self._buffer_size = 5

    def _generate_id(self, message: Message) -> str:
        """Return an MD5 hex digest of the role, content, and timestamp."""
        content = f"{message.role.value}:{message.content}:{message.timestamp.isoformat()}"
        return hashlib.md5(content.encode()).hexdigest()

    def _remember_recent(self, message: Message) -> None:
        """Push ``message`` onto the recency buffer, dropping the oldest."""
        self._recent_buffer.append(message)
        if len(self._recent_buffer) > self._buffer_size:
            self._recent_buffer.pop(0)

    def _store(self, message: Message, embedding: Optional[list[float]]) -> None:
        """Create the entry for ``message`` and enforce ``max_entries``."""
        entry = MemoryEntry(
            id=self._generate_id(message),
            message=message,
            embedding=embedding,
            importance=self._calculate_importance(message)
        )
        self._entries[entry.id] = entry

        if len(self._entries) > self.max_entries:
            self._prune_entries()

    async def add_async(self, message: Message) -> None:
        """Add ``message`` and compute its embedding immediately."""
        self._remember_recent(message)
        embedding = await self.embedding_model.embed([message.content])
        self._store(message, embedding[0])

    def add(self, message: Message) -> None:
        """Add ``message`` without embedding it.

        The embedding is computed lazily by ``retrieve``. Pruning is
        applied here as well, so ``max_entries`` holds on the sync path.
        """
        self._remember_recent(message)
        self._store(message, None)

    def _calculate_importance(self, message: Message) -> float:
        """Score ``message`` heuristically, from 1.0 up to a cap of 2.0."""
        importance = 1.0

        # Questions are important
        if "?" in message.content:
            importance += 0.3

        # Longer messages may be more important
        if len(message.content) > 200:
            importance += 0.2

        # User messages slightly more important
        if message.role == MessageRole.USER:
            importance += 0.1

        return min(importance, 2.0)

    def _prune_entries(self) -> None:
        """Keep the ``max_entries`` best entries.

        Entries are scored by importance, recency, and access count.
        """
        now = datetime.utcnow()

        def score(entry: MemoryEntry) -> float:
            age_hours = (now - entry.message.timestamp).total_seconds() / 3600
            recency_score = 1.0 / (1.0 + age_hours * 0.1)
            access_score = entry.access_count * 0.1
            return entry.importance + recency_score + access_score

        ranked = sorted(self._entries.values(), key=score, reverse=True)
        keep_ids = {entry.id for entry in ranked[:self.max_entries]}

        self._entries = {
            entry_id: entry
            for entry_id, entry in self._entries.items()
            if entry_id in keep_ids
        }

    async def retrieve(
        self,
        query: str,
        k: int = None
    ) -> list[Message]:
        """Return up to ``k`` stored messages most similar to ``query``.

        Args:
            query: Text to embed and compare against stored messages.
            k: Number of results; ``None`` uses ``retrieval_k``.

        Returned entries get ``access_count`` incremented and
        ``last_accessed`` set to the current UTC time.
        """
        if k is None:
            k = self.retrieval_k

        query_embedding = (await self.embedding_model.embed([query]))[0]

        # Work on a snapshot: the loop below awaits, and iterating the live
        # dict would fail if another task added or pruned entries meanwhile.
        entries = list(self._entries.values())

        for entry in entries:
            if entry.embedding is None:
                emb = await self.embedding_model.embed([entry.message.content])
                entry.embedding = emb[0]

        similarities = [
            (entry, self._cosine_similarity(query_embedding, entry.embedding))
            for entry in entries
        ]
        similarities.sort(key=lambda pair: pair[1], reverse=True)

        result = []
        for entry, _ in similarities[:k]:
            entry.access_count += 1
            entry.last_accessed = datetime.utcnow()
            result.append(entry.message)

        return result

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Return the cosine similarity of ``a`` and ``b``.

        Returns 0.0 if either vector has zero norm.
        """
        import math

        dot = sum(x * y for x, y in zip(a, b))
        norm_a = math.sqrt(sum(x * x for x in a))
        norm_b = math.sqrt(sum(x * x for x in b))

        return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return a copy of the recency buffer (``max_tokens`` is not applied)."""
        return list(self._recent_buffer)

    def clear(self) -> None:
        """Drop all entries and the recency buffer."""
        self._entries.clear()
        self._recent_buffer.clear()

class HybridMemory(Memory):
    """Combines buffer, summary, and vector memory.

    Args:
        llm_client: Passed to the internal ``SummaryMemory``.
        embedding_model: Passed to the internal ``VectorMemory``.
        buffer_size: Capacity of the recent-message buffer.
        summary_threshold: ``buffer_size`` of the internal summary memory.
        vector_k: Number of messages retrieved semantically per query.
    """

    def __init__(
        self,
        llm_client: Any,
        embedding_model: Any,
        buffer_size: int = 10,
        summary_threshold: int = 20,
        vector_k: int = 3
    ):
        self.buffer = BufferMemory(max_messages=buffer_size)
        self.summary = SummaryMemory(
            llm_client=llm_client,
            buffer_size=summary_threshold
        )
        self.vector = VectorMemory(
            embedding_model=embedding_model,
            retrieval_k=vector_k
        )

        self._message_count = 0

    def add(self, message: Message) -> None:
        """Add ``message`` to all three underlying memories."""
        self.buffer.add(message)
        self.summary.add(message)
        self.vector.add(message)
        self._message_count += 1

    async def get_context(
        self,
        current_query: str,
        max_tokens: int = 4000
    ) -> list[Message]:
        """Assemble context from the summary, semantic recall, and buffer.

        Order of the result: summary system message(s), then retrieved
        messages wrapped as system messages, then the full recent buffer.

        Args:
            current_query: Text used to retrieve related past messages.
            max_tokens: Budget for the summary and retrieved messages. A
                retrieved message is only added while more than 1000
                tokens would remain. The buffer is always included in full.
        """
        messages = []
        token_budget = max_tokens

        # 1. Summary: only the system message(s) from the summary memory.
        for msg in self.summary.get_messages():
            if msg.role == MessageRole.SYSTEM:
                messages.append(msg)
                token_budget -= msg.token_count or len(msg.content) // 4

        # 2. Semantic recall. Passing no ``k`` lets the vector memory use
        # its retrieval_k, i.e. the vector_k given to the constructor,
        # instead of a hard-coded 3.
        relevant = []
        if self._message_count > 10:
            relevant = await self.vector.retrieve(current_query)

        # Read the buffer once after the await; it serves both the
        # duplicate check and the final extend.
        buffer_messages = self.buffer.get_messages()

        for msg in relevant:
            if msg in buffer_messages:
                continue
            tokens = msg.token_count or len(msg.content) // 4
            if token_budget - tokens > 1000:  # Reserve for buffer
                messages.append(Message(
                    role=MessageRole.SYSTEM,
                    content=f"Relevant past context:\n{msg.role.value}: {msg.content}"
                ))
                token_budget -= tokens

        # 3. Recent buffer (always included).
        messages.extend(buffer_messages)

        return messages

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return the recent buffer only (sync version)."""
        return self.buffer.get_messages(max_tokens)

    def clear(self) -> None:
        """Clear all three memories and reset the message counter."""
        self.buffer.clear()
        self.summary.clear()
        self.vector.clear()
        self._message_count = 0

Entity Memory

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime

@dataclass
class Entity:
    """Something named in the conversation, plus what is known about it."""

    name: str
    entity_type: str
    description: str
    # Timestamps of messages in which the entity was mentioned.
    mentions: list[datetime] = field(default_factory=list)
    # Free-form attributes; each instance gets its own dict.
    attributes: dict = field(default_factory=dict)
    # Pairs of strings describing relationships; each instance gets its own list.
    relationships: list[tuple[str, str]] = field(default_factory=list)

class EntityMemory(Memory):
    """Memory that tracks entities mentioned in conversation.

    Messages are kept in a short buffer. Entities are stored by
    lower-cased name and are only populated by ``extract_entities``;
    ``add`` itself never calls the LLM.

    Args:
        llm_client: Client exposing an awaitable
            ``chat.completions.create(model=..., messages=..., temperature=...)``.
        model: Model name passed to the client.
    """

    def __init__(
        self,
        llm_client: Any,
        model: str = "gpt-4o-mini"
    ):
        self.llm_client = llm_client
        self.model = model

        self._entities: dict[str, Entity] = {}
        self._buffer: list[Message] = []
        self._buffer_size = 10

    def add(self, message: Message) -> None:
        """Buffer ``message``; entity extraction is left to the caller."""
        self._buffer.append(message)
        if len(self._buffer) > self._buffer_size:
            self._buffer.pop(0)

        # Extraction is async (see extract_entities), so it is not done here.

    async def extract_entities(self, message: Message) -> list[Entity]:
        """Ask the LLM for entities in ``message`` and record them.

        Returns the entities found (new or already known), each with the
        message timestamp appended to ``mentions``. Returns an empty list
        when the reply has no JSON array or the array cannot be parsed;
        malformed array items are skipped.
        """
        import json
        import re

        prompt = f"""Extract entities from this message:

"{message.content}"

For each entity, provide:
- name: The entity name
- type: person, organization, product, concept, location, etc.
- description: Brief description based on context

Return JSON array:
[{{"name": "...", "type": "...", "description": "..."}}]

If no entities, return empty array: []"""

        response = await self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        content = response.choices[0].message.content or ""
        # The reply may wrap the JSON in prose; take the outermost [...] span.
        json_match = re.search(r'\[[\s\S]*\]', content)
        if not json_match:
            return []

        try:
            data = json.loads(json_match.group(0))
        except json.JSONDecodeError:
            # Best-effort extraction: an unparsable reply yields no entities.
            return []

        entities = []
        for item in data:
            # The LLM output is untrusted; skip anything that is not an
            # object with a string name and type instead of raising.
            if not isinstance(item, dict):
                continue
            name = item.get("name")
            entity_type = item.get("type")
            if not isinstance(name, str) or not name or not isinstance(entity_type, str):
                continue
            description = item.get("description", "")
            if not isinstance(description, str):
                description = ""

            entity = self._get_or_create_entity(name, entity_type, description)
            entity.mentions.append(message.timestamp)
            entities.append(entity)

        return entities

    def _get_or_create_entity(
        self,
        name: str,
        entity_type: str,
        description: str
    ) -> Entity:
        """Return the entity stored under ``name`` (case-insensitive).

        A missing entity is created. An existing one keeps its type; its
        description is replaced only by a longer one.
        """
        key = name.lower()

        entity = self._entities.get(key)
        if entity is not None:
            if len(description) > len(entity.description):
                entity.description = description
            return entity

        entity = Entity(
            name=name,
            entity_type=entity_type,
            description=description
        )
        self._entities[key] = entity
        return entity

    def get_entity(self, name: str) -> Optional[Entity]:
        """Return the entity stored under ``name`` (case-insensitive), or None."""
        return self._entities.get(name.lower())

    def get_entities_by_type(self, entity_type: str) -> list[Entity]:
        """Return all entities whose ``entity_type`` equals ``entity_type``."""
        return [
            e for e in self._entities.values()
            if e.entity_type == entity_type
        ]

    def get_entity_context(self) -> str:
        """Return a bulleted list of known entities, or "" if there are none."""
        if not self._entities:
            return ""

        lines = ["Known entities:"]
        for entity in self._entities.values():
            # The dataclass field is ``entity_type``; ``entity.type`` does
            # not exist and raised AttributeError.
            lines.append(f"- {entity.name} ({entity.entity_type}): {entity.description}")

        return "\n".join(lines)

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return the entity context (as a system message) plus the buffer.

        ``max_tokens`` is accepted for interface compatibility and is not
        applied.
        """
        messages = []

        entity_context = self.get_entity_context()
        if entity_context:
            messages.append(Message(
                role=MessageRole.SYSTEM,
                content=entity_context
            ))

        messages.extend(self._buffer)
        return messages

    def clear(self) -> None:
        """Drop all entities and buffered messages."""
        self._entities.clear()
        self._buffer.clear()

class KnowledgeGraphMemory(Memory):
    """Memory with knowledge graph structure.

    Holds a short message buffer plus a graph of nodes and
    ``(from, relation, to)`` edges that callers fill in through
    ``add_node`` and ``add_edge``.

    Args:
        llm_client: Stored on the instance; not used by the methods of
            this class.
        buffer_size: Maximum number of buffered messages (default 10).
    """

    def __init__(self, llm_client: Any, buffer_size: int = 10):
        self.llm_client = llm_client

        self._nodes: dict[str, dict] = {}  # id -> node data
        self._edges: list[tuple[str, str, str]] = []  # (from, relation, to)
        self._buffer: list[Message] = []
        self._buffer_size = buffer_size

    def add(self, message: Message) -> None:
        """Buffer ``message``, dropping the oldest when over capacity."""
        self._buffer.append(message)
        if len(self._buffer) > self._buffer_size:
            self._buffer.pop(0)

    def add_node(
        self,
        node_id: str,
        node_type: str,
        properties: dict = None
    ) -> None:
        """Add (or overwrite) the node stored under ``node_id``."""
        self._nodes[node_id] = {
            "type": node_type,
            "properties": properties or {}
        }

    def add_edge(
        self,
        from_id: str,
        relation: str,
        to_id: str
    ) -> None:
        """Append the edge ``from_id --relation--> to_id``."""
        self._edges.append((from_id, relation, to_id))

    def get_related(self, node_id: str, depth: int = 1) -> dict:
        """Collect nodes and edges reachable from ``node_id``.

        Edges are followed in both directions, breadth-first, visiting
        nodes up to ``depth`` hops away.

        Returns:
            ``{"nodes": {id: node_data}, "edges": [(from, relation, to)]}``.
            ``nodes`` holds the visited ids that exist in the graph.
            ``edges`` holds every stored edge touching a visited node, each
            stored edge listed once.
        """
        from collections import deque

        related = {"nodes": {}, "edges": []}
        visited = set()
        # Track reported edges by position so an edge whose two endpoints
        # are both visited is not listed twice, while edges that were
        # deliberately stored twice are still both reported.
        reported_edges = set()
        queue = deque([(node_id, 0)])

        while queue:
            current_id, current_depth = queue.popleft()

            if current_id in visited or current_depth > depth:
                continue

            visited.add(current_id)

            if current_id in self._nodes:
                related["nodes"][current_id] = self._nodes[current_id]

            for index, (from_id, relation, to_id) in enumerate(self._edges):
                if from_id == current_id:
                    neighbor = to_id
                elif to_id == current_id:
                    neighbor = from_id
                else:
                    continue

                if index not in reported_edges:
                    reported_edges.add(index)
                    related["edges"].append((from_id, relation, to_id))
                if neighbor not in visited:
                    queue.append((neighbor, current_depth + 1))

        return related

    def get_graph_context(self, relevant_nodes: list[str] = None) -> str:
        """Render nodes and relationships as text for a prompt.

        Lists ``relevant_nodes`` (or the first 10 stored nodes when that
        is empty or None) followed by the first 20 stored edges. Returns
        "" when the graph has no nodes.
        """
        if not self._nodes:
            return ""

        lines = ["Knowledge graph:"]

        nodes_to_include = relevant_nodes or list(self._nodes.keys())[:10]

        for node_id in nodes_to_include:
            if node_id in self._nodes:
                node = self._nodes[node_id]
                lines.append(f"- {node_id} ({node['type']})")

        lines.append("\nRelationships:")
        for from_id, relation, to_id in self._edges[:20]:
            lines.append(f"- {from_id} --[{relation}]--> {to_id}")

        return "\n".join(lines)

    def get_messages(self, max_tokens: int = None) -> list[Message]:
        """Return the graph context (as a system message) plus the buffer.

        ``max_tokens`` is accepted for interface compatibility and is not
        applied.
        """
        messages = []

        graph_context = self.get_graph_context()
        if graph_context:
            messages.append(Message(
                role=MessageRole.SYSTEM,
                content=graph_context
            ))

        messages.extend(self._buffer)
        return messages

    def clear(self) -> None:
        """Drop all nodes, edges, and buffered messages."""
        self._nodes.clear()
        self._edges.clear()
        self._buffer.clear()

Production Memory Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

# FastAPI application on which the memory endpoints below are registered.
app = FastAPI()

# In-memory storage (use Redis/database in production)
# Maps conversation_id -> the Memory instance for that conversation.
conversations: dict[str, Memory] = {}

# Request body for POST /v1/memory/add.
class AddMessageRequest(BaseModel):
    conversation_id: str
    role: str  # converted with MessageRole(...) by the add handler
    content: str
    # Only used when the conversation is first created by get_or_create_memory.
    memory_type: str = "buffer"  # buffer, token_buffer, summary, vector (anything else falls back to buffer)

# Request body for POST /v1/memory/context.
class GetContextRequest(BaseModel):
    conversation_id: str
    query: Optional[str] = None  # not read by the /v1/memory/context handler in this file
    max_tokens: int = 4000  # forwarded to Memory.get_messages(max_tokens=...)

# One message as returned inside ContextResponse.
class MessageResponse(BaseModel):
    role: str  # the MessageRole value string
    content: str

# Response body of POST /v1/memory/context.
class ContextResponse(BaseModel):
    messages: list[MessageResponse]
    total_tokens: int  # sum of token_count, or len(content) // 4 where token_count is 0
    memory_type: str  # class name of the Memory implementation in use

def get_or_create_memory(
    conversation_id: str,
    memory_type: str
) -> Memory:
    """Return the conversation's memory, creating it on first use.

    ``memory_type`` only matters when the conversation does not exist yet.
    "token_buffer" creates a TokenBufferMemory; every other value creates
    a BufferMemory whose capacity depends on the requested type.
    """
    if conversation_id in conversations:
        return conversations[conversation_id]

    if memory_type == "token_buffer":
        created = TokenBufferMemory(max_tokens=4000)
    else:
        # "summary" would need an LLM client and "vector" an embedding
        # model, so both fall back to larger plain buffers.
        capacity_by_type = {"buffer": 20, "summary": 50, "vector": 100}
        capacity = capacity_by_type.get(memory_type, 20)
        created = BufferMemory(max_messages=capacity)

    conversations[conversation_id] = created
    return created

@app.post("/v1/memory/add")
async def add_message(request: AddMessageRequest):
    """Add message to conversation memory."""

    # Validate the role before touching storage: MessageRole(...) raises
    # ValueError for unknown values, which would otherwise surface as an
    # HTTP 500 and leave an empty conversation behind.
    try:
        role = MessageRole(request.role)
    except ValueError:
        valid_roles = ", ".join(r.value for r in MessageRole)
        raise HTTPException(
            status_code=422,
            detail=f"Invalid role {request.role!r}; expected one of: {valid_roles}"
        ) from None

    memory = get_or_create_memory(
        request.conversation_id,
        request.memory_type
    )

    message = Message(
        role=role,
        content=request.content,
        timestamp=datetime.utcnow()
    )

    memory.add(message)

    # message_count reflects what get_messages() currently returns, which
    # for bounded memories can be fewer than the number of messages added.
    return {
        "status": "added",
        "conversation_id": request.conversation_id,
        "message_count": len(memory.get_messages())
    }

@app.post("/v1/memory/context")
async def get_context(request: GetContextRequest) -> ContextResponse:
    """Get conversation context."""

    conversation_id = request.conversation_id
    if conversation_id not in conversations:
        raise HTTPException(404, "Conversation not found")

    memory = conversations[conversation_id]
    selected = memory.get_messages(max_tokens=request.max_tokens)

    # Use the stored token_count, or the 4-chars-per-token estimate when
    # it is 0.
    token_total = 0
    for item in selected:
        token_total += item.token_count or len(item.content) // 4

    payload = []
    for item in selected:
        payload.append(MessageResponse(role=item.role.value, content=item.content))

    return ContextResponse(
        messages=payload,
        total_tokens=token_total,
        memory_type=type(memory).__name__
    )

@app.delete("/v1/memory/{conversation_id}")
async def clear_memory(conversation_id: str):
    """Clear conversation memory."""

    outcome = {"status": "cleared"}

    # Unknown conversations are reported as cleared as well.
    if conversation_id not in conversations:
        return outcome

    # Empty the memory first, then forget the conversation itself.
    conversations[conversation_id].clear()
    del conversations[conversation_id]
    return outcome

@app.get("/v1/memory/{conversation_id}/stats")
async def get_stats(conversation_id: str):
    """Get memory statistics."""

    if conversation_id not in conversations:
        raise HTTPException(404, "Conversation not found")

    memory = conversations[conversation_id]
    messages = memory.get_messages()

    # Use the stored token_count, or the 4-chars-per-token estimate when
    # it is 0.
    token_total = 0
    for m in messages:
        token_total += m.token_count or len(m.content) // 4

    # One entry per role, including roles with no messages.
    per_role = {}
    for role in MessageRole:
        per_role[role.value] = len([m for m in messages if m.role == role])

    return {
        "conversation_id": conversation_id,
        "memory_type": type(memory).__name__,
        "message_count": len(messages),
        "total_tokens": token_total,
        "roles": per_role
    }

@app.get("/v1/conversations")
async def list_conversations():
    """List all conversations."""

    overview = []
    for conv_id, memory in conversations.items():
        entry = {
            "id": conv_id,
            "memory_type": type(memory).__name__,
            # Number of messages get_messages() currently returns.
            "message_count": len(memory.get_messages())
        }
        overview.append(entry)

    return {"conversations": overview}

@app.get("/health")
async def health():
    # Liveness probe: always answers with a fixed payload.
    payload = {"status": "healthy"}
    return payload

References

Conclusion

Effective conversation memory is essential for building coherent multi-turn LLM applications. Start with buffer memory for simple use cases—it’s fast and predictable. As conversations grow longer, add summary memory to compress older context while preserving key information. For applications requiring recall of specific past interactions, vector memory enables semantic retrieval of relevant history. The most robust approach combines all three: buffer for recent context, summary for compressed history, and vector for semantic recall. Entity memory adds another dimension by tracking people, concepts, and relationships mentioned throughout the conversation. The key insight is that memory management is a tradeoff between completeness and efficiency—you can’t keep everything, so you must be strategic about what context to preserve. Monitor your token usage, measure response quality, and tune your memory parameters based on your specific use case. A well-designed memory system makes the difference between a chatbot that forgets everything and an assistant that maintains coherent, contextual conversations over extended interactions.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.