Introduction: LLMs are stateless—each request starts fresh with no memory of previous interactions. Building conversational applications requires implementing memory systems that maintain context across turns while staying within token limits. The challenge is balancing completeness (keeping all relevant context) with efficiency (not wasting tokens on irrelevant history). This guide covers practical memory patterns: buffer memory for recent messages, summary memory for compressing long conversations, vector memory for semantic retrieval of relevant past context, and hybrid approaches that combine multiple strategies. Whether you’re building a chatbot, assistant, or multi-turn reasoning system, effective memory management is essential for coherent, contextual responses.
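To see where memory fits before diving into the patterns, here is a minimal chat-loop sketch. It assumes the Message, MessageRole, and BufferMemory classes defined in the sections below and an OpenAI-style client; the model name and loop structure are illustrative, not a fixed API.

from openai import OpenAI  # any chat-completions-compatible client works here

client = OpenAI()
memory = BufferMemory(max_messages=20)  # defined in the Buffer Memory section below

def chat_turn(user_input: str) -> str:
    # Record the user's message, then build the prompt from what memory retains
    memory.add(Message(role=MessageRole.USER, content=user_input))
    context = [m.to_dict() for m in memory.get_messages()]
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": "You are a helpful assistant."}] + context,
    )
    reply = response.choices[0].message.content
    # Record the reply so the next turn sees it
    memory.add(Message(role=MessageRole.ASSISTANT, content=reply))
    return reply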

Buffer Memory
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from abc import ABC, abstractmethod
from enum import Enum
class MessageRole(Enum):
"""Message roles."""
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
FUNCTION = "function"
TOOL = "tool"
@dataclass
class Message:
"""A conversation message."""
role: MessageRole
content: str
timestamp: datetime = field(default_factory=datetime.utcnow)
metadata: dict = field(default_factory=dict)
token_count: int = 0
def to_dict(self) -> dict:
"""Convert to API format."""
return {
"role": self.role.value,
"content": self.content
}
class Memory(ABC):
"""Abstract memory interface."""
@abstractmethod
def add(self, message: Message) -> None:
"""Add message to memory."""
pass
@abstractmethod
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get messages from memory."""
pass
@abstractmethod
def clear(self) -> None:
"""Clear memory."""
pass
class BufferMemory(Memory):
"""Simple buffer memory - keeps last N messages."""
def __init__(self, max_messages: int = 20):
self.max_messages = max_messages
self._messages: list[Message] = []
def add(self, message: Message) -> None:
"""Add message, removing oldest if at capacity."""
self._messages.append(message)
# Remove oldest messages if over limit
while len(self._messages) > self.max_messages:
self._messages.pop(0)
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get all buffered messages."""
if max_tokens is None:
return list(self._messages)
# Return messages that fit within token limit
result = []
total_tokens = 0
for msg in reversed(self._messages):
if total_tokens + msg.token_count > max_tokens:
break
result.insert(0, msg)
total_tokens += msg.token_count
return result
def clear(self) -> None:
"""Clear buffer."""
self._messages.clear()
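# Note: when a max_tokens budget is passed to get_messages(), BufferMemory relies on
# Message.token_count, which defaults to 0, so either set it explicitly or use
# TokenBufferMemory below, which counts tokens on add. Sketch:
#
#   memory = BufferMemory(max_messages=50)
#   memory.add(Message(role=MessageRole.USER, content="hello there", token_count=3))
#   recent = memory.get_messages(max_tokens=1000)  # newest messages that fit the budget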
class TokenBufferMemory(Memory):
"""Buffer memory with token limit."""
def __init__(
self,
max_tokens: int = 4000,
tokenizer: Any = None
):
self.max_tokens = max_tokens
self.tokenizer = tokenizer
self._messages: list[Message] = []
self._total_tokens = 0
def _count_tokens(self, text: str) -> int:
"""Count tokens in text."""
if self.tokenizer:
return len(self.tokenizer.encode(text))
# Rough estimate: 4 chars per token
return len(text) // 4
def add(self, message: Message) -> None:
"""Add message, removing oldest to stay within token limit."""
# Count tokens if not already set
if message.token_count == 0:
message.token_count = self._count_tokens(message.content)
self._messages.append(message)
self._total_tokens += message.token_count
# Remove oldest messages to stay within limit
while self._total_tokens > self.max_tokens and len(self._messages) > 1:
removed = self._messages.pop(0)
self._total_tokens -= removed.token_count
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get messages within token limit."""
limit = min(max_tokens or self.max_tokens, self.max_tokens)
result = []
total = 0
for msg in reversed(self._messages):
if total + msg.token_count > limit:
break
result.insert(0, msg)
total += msg.token_count
return result
def clear(self) -> None:
"""Clear buffer."""
self._messages.clear()
self._total_tokens = 0
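# The tokenizer argument only needs an encode(text) -> list interface. A sketch with
# tiktoken (an optional dependency; without it the rough 4-characters-per-token
# estimate above is used):
#
#   import tiktoken
#   encoding = tiktoken.encoding_for_model("gpt-4o-mini")
#   memory = TokenBufferMemory(max_tokens=4000, tokenizer=encoding)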
class WindowBufferMemory(Memory):
"""Sliding window buffer with configurable overlap."""
def __init__(
self,
window_size: int = 10,
overlap: int = 2
):
self.window_size = window_size
self.overlap = overlap
self._messages: list[Message] = []
self._windows: list[list[Message]] = []
def add(self, message: Message) -> None:
"""Add message to current window."""
self._messages.append(message)
        # Close the current window when full, carrying `overlap` messages forward
        if len(self._messages) >= self.window_size:
            if self.overlap > 0:
                self._windows.append(self._messages[:-self.overlap])
                self._messages = self._messages[-self.overlap:]
            else:
                self._windows.append(self._messages)
                self._messages = []
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get current window messages."""
return list(self._messages)
def get_all_windows(self) -> list[list[Message]]:
"""Get all windows including current."""
return self._windows + [self._messages]
def clear(self) -> None:
"""Clear all windows."""
self._messages.clear()
self._windows.clear()
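A quick sketch of the sliding-window behavior with small illustrative values; get_messages() returns only the current window, while earlier windows remain available via get_all_windows().

memory = WindowBufferMemory(window_size=4, overlap=1)
for i in range(6):
    memory.add(Message(role=MessageRole.USER, content=f"message {i}"))

print(len(memory.get_all_windows()))               # 2: one closed window plus the current one
print([m.content for m in memory.get_messages()])  # only the current window (messages 3-5)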
Summary Memory
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
@dataclass
class ConversationSummary:
"""A conversation summary."""
content: str
message_count: int
created_at: datetime
token_count: int = 0
class SummaryMemory(Memory):
"""Memory that summarizes old messages."""
def __init__(
self,
llm_client: Any,
model: str = "gpt-4o-mini",
buffer_size: int = 10,
summary_max_tokens: int = 500
):
self.llm_client = llm_client
self.model = model
self.buffer_size = buffer_size
self.summary_max_tokens = summary_max_tokens
self._buffer: list[Message] = []
self._summary: Optional[ConversationSummary] = None
def add(self, message: Message) -> None:
"""Add message, summarizing when buffer is full."""
self._buffer.append(message)
# Summarize when buffer exceeds limit
if len(self._buffer) > self.buffer_size:
self._summarize_buffer()
def _summarize_buffer(self) -> None:
"""Summarize oldest messages in buffer."""
import asyncio
# Get messages to summarize (keep recent ones)
to_summarize = self._buffer[:-self.buffer_size // 2]
self._buffer = self._buffer[-self.buffer_size // 2:]
# Build summary prompt
existing_summary = ""
if self._summary:
existing_summary = f"Previous summary:\n{self._summary.content}\n\n"
messages_text = "\n".join([
f"{m.role.value}: {m.content}"
for m in to_summarize
])
prompt = f"""{existing_summary}New messages to incorporate:
{messages_text}
Provide a concise summary of the conversation so far, capturing:
- Key topics discussed
- Important decisions or conclusions
- Any pending questions or tasks
- Relevant context for continuing the conversation
Summary:"""
        # Generate summary (sync wrapper around the async client call)
        coro = self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.summary_max_tokens
        )
        try:
            loop = asyncio.get_event_loop()
            response = loop.run_until_complete(coro)
        except RuntimeError:
            # No usable event loop in this thread; run the coroutine directly
            response = asyncio.run(coro)
summary_text = response.choices[0].message.content
self._summary = ConversationSummary(
content=summary_text,
message_count=(self._summary.message_count if self._summary else 0) + len(to_summarize),
created_at=datetime.utcnow(),
token_count=len(summary_text) // 4
)
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get summary + recent messages."""
messages = []
# Add summary as system context
if self._summary:
messages.append(Message(
role=MessageRole.SYSTEM,
content=f"Conversation summary:\n{self._summary.content}",
token_count=self._summary.token_count
))
# Add recent buffer messages
messages.extend(self._buffer)
return messages
def clear(self) -> None:
"""Clear memory."""
self._buffer.clear()
self._summary = None
class IncrementalSummaryMemory(Memory):
"""Memory with incremental summarization."""
def __init__(
self,
llm_client: Any,
model: str = "gpt-4o-mini",
summarize_every: int = 5
):
self.llm_client = llm_client
self.model = model
self.summarize_every = summarize_every
self._messages: list[Message] = []
self._running_summary: str = ""
self._message_count = 0
def add(self, message: Message) -> None:
"""Add message with incremental summarization."""
self._messages.append(message)
self._message_count += 1
# Incrementally update summary
if self._message_count % self.summarize_every == 0:
self._update_summary()
async def _update_summary_async(self) -> None:
"""Update running summary asynchronously."""
recent = self._messages[-self.summarize_every:]
recent_text = "\n".join([
f"{m.role.value}: {m.content}"
for m in recent
])
prompt = f"""Current summary:
{self._running_summary or "No previous summary."}
Recent messages:
{recent_text}
Update the summary to incorporate the recent messages. Keep it concise but comprehensive."""
response = await self.llm_client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=300
)
self._running_summary = response.choices[0].message.content
# Keep only recent messages after summarizing
self._messages = self._messages[-self.summarize_every:]
def _update_summary(self) -> None:
"""Sync wrapper for summary update."""
import asyncio
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(self._update_summary_async())
except RuntimeError:
asyncio.run(self._update_summary_async())
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get summary + recent messages."""
messages = []
if self._running_summary:
messages.append(Message(
role=MessageRole.SYSTEM,
content=f"Conversation context:\n{self._running_summary}"
))
messages.extend(self._messages)
return messages
def clear(self) -> None:
"""Clear memory."""
self._messages.clear()
self._running_summary = ""
self._message_count = 0
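A sketch of what SummaryMemory hands back once the buffer has overflowed at least once. It assumes an AsyncOpenAI-style client and a plain synchronous script (the summarizer runs through a sync wrapper internally); the client and buffer size are assumptions for illustration.

from openai import AsyncOpenAI

memory = SummaryMemory(llm_client=AsyncOpenAI(), buffer_size=6)
for i in range(8):  # enough turns to trigger one summarization pass
    memory.add(Message(role=MessageRole.USER, content=f"turn {i}"))

context = memory.get_messages()
print(context[0].role)    # MessageRole.SYSTEM: the running conversation summary
print(len(context) - 1)   # the recent messages still kept verbatim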
Vector Memory
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import hashlib
@dataclass
class MemoryEntry:
"""A memory entry with embedding."""
id: str
message: Message
    embedding: Optional[list[float]] = None
    importance: float = 1.0
    access_count: int = 0
    last_accessed: Optional[datetime] = None
class VectorMemory(Memory):
"""Memory with semantic retrieval."""
def __init__(
self,
embedding_model: Any,
max_entries: int = 1000,
retrieval_k: int = 5
):
self.embedding_model = embedding_model
self.max_entries = max_entries
self.retrieval_k = retrieval_k
self._entries: dict[str, MemoryEntry] = {}
self._recent_buffer: list[Message] = []
self._buffer_size = 5
def _generate_id(self, message: Message) -> str:
"""Generate unique ID for message."""
content = f"{message.role.value}:{message.content}:{message.timestamp.isoformat()}"
return hashlib.md5(content.encode()).hexdigest()
async def add_async(self, message: Message) -> None:
"""Add message with embedding."""
# Add to recent buffer
self._recent_buffer.append(message)
if len(self._recent_buffer) > self._buffer_size:
self._recent_buffer.pop(0)
# Generate embedding
embedding = await self.embedding_model.embed([message.content])
entry = MemoryEntry(
id=self._generate_id(message),
message=message,
embedding=embedding[0],
importance=self._calculate_importance(message)
)
self._entries[entry.id] = entry
# Prune if over limit
if len(self._entries) > self.max_entries:
self._prune_entries()
def add(self, message: Message) -> None:
"""Sync add (embedding deferred)."""
self._recent_buffer.append(message)
if len(self._recent_buffer) > self._buffer_size:
self._recent_buffer.pop(0)
entry = MemoryEntry(
id=self._generate_id(message),
message=message,
embedding=None, # Will be computed on retrieval
importance=self._calculate_importance(message)
)
self._entries[entry.id] = entry
def _calculate_importance(self, message: Message) -> float:
"""Calculate message importance."""
importance = 1.0
# Questions are important
if "?" in message.content:
importance += 0.3
# Longer messages may be more important
if len(message.content) > 200:
importance += 0.2
# User messages slightly more important
if message.role == MessageRole.USER:
importance += 0.1
return min(importance, 2.0)
def _prune_entries(self) -> None:
"""Remove least important entries."""
# Score entries by importance and recency
scored = []
now = datetime.utcnow()
for entry in self._entries.values():
age_hours = (now - entry.message.timestamp).total_seconds() / 3600
recency_score = 1.0 / (1.0 + age_hours * 0.1)
access_score = entry.access_count * 0.1
score = entry.importance + recency_score + access_score
scored.append((entry.id, score))
# Sort by score and keep top entries
scored.sort(key=lambda x: x[1], reverse=True)
keep_ids = set(id for id, _ in scored[:self.max_entries])
self._entries = {
id: entry for id, entry in self._entries.items()
if id in keep_ids
}
async def retrieve(
self,
query: str,
k: int = None
) -> list[Message]:
"""Retrieve relevant messages."""
k = k or self.retrieval_k
# Get query embedding
query_embedding = await self.embedding_model.embed([query])
query_embedding = query_embedding[0]
# Ensure all entries have embeddings
for entry in self._entries.values():
if entry.embedding is None:
emb = await self.embedding_model.embed([entry.message.content])
entry.embedding = emb[0]
# Calculate similarities
similarities = []
for entry in self._entries.values():
sim = self._cosine_similarity(query_embedding, entry.embedding)
similarities.append((entry, sim))
# Sort by similarity
similarities.sort(key=lambda x: x[1], reverse=True)
# Update access counts and return top k
result = []
for entry, _ in similarities[:k]:
entry.access_count += 1
entry.last_accessed = datetime.utcnow()
result.append(entry.message)
return result
def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
"""Compute cosine similarity."""
import math
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get recent buffer messages."""
return list(self._recent_buffer)
def clear(self) -> None:
"""Clear memory."""
self._entries.clear()
self._recent_buffer.clear()
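# VectorMemory only assumes an embedding model exposing an async embed(texts) method that
# returns one vector per input text. A minimal adapter over the OpenAI embeddings endpoint
# might look like this (a sketch; the class name and model choice are assumptions):
#
#   from openai import AsyncOpenAI
#
#   class OpenAIEmbedder:
#       def __init__(self, model: str = "text-embedding-3-small"):
#           self.client = AsyncOpenAI()
#           self.model = model
#
#       async def embed(self, texts: list[str]) -> list[list[float]]:
#           response = await self.client.embeddings.create(model=self.model, input=texts)
#           return [item.embedding for item in response.data]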
class HybridMemory(Memory):
"""Combines buffer, summary, and vector memory."""
def __init__(
self,
llm_client: Any,
embedding_model: Any,
buffer_size: int = 10,
summary_threshold: int = 20,
vector_k: int = 3
):
self.buffer = BufferMemory(max_messages=buffer_size)
self.summary = SummaryMemory(
llm_client=llm_client,
buffer_size=summary_threshold
)
self.vector = VectorMemory(
embedding_model=embedding_model,
retrieval_k=vector_k
)
self._message_count = 0
def add(self, message: Message) -> None:
"""Add to all memory systems."""
self.buffer.add(message)
self.summary.add(message)
self.vector.add(message)
self._message_count += 1
async def get_context(
self,
current_query: str,
max_tokens: int = 4000
) -> list[Message]:
"""Get optimized context combining all memory types."""
messages = []
token_budget = max_tokens
# 1. Add summary (if exists)
summary_messages = self.summary.get_messages()
for msg in summary_messages:
if msg.role == MessageRole.SYSTEM:
messages.append(msg)
token_budget -= msg.token_count or len(msg.content) // 4
# 2. Add relevant vector memories
if self._message_count > 10:
relevant = await self.vector.retrieve(current_query, k=3)
for msg in relevant:
                if msg not in self.buffer.get_messages():
tokens = msg.token_count or len(msg.content) // 4
if token_budget - tokens > 1000: # Reserve for buffer
messages.append(Message(
role=MessageRole.SYSTEM,
content=f"Relevant past context:\n{msg.role.value}: {msg.content}"
))
token_budget -= tokens
# 3. Add recent buffer (always include)
buffer_messages = self.buffer.get_messages()
messages.extend(buffer_messages)
return messages
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get buffer messages (sync version)."""
return self.buffer.get_messages(max_tokens)
def clear(self) -> None:
"""Clear all memory."""
self.buffer.clear()
self.summary.clear()
self.vector.clear()
self._message_count = 0
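Because retrieval is query-aware, assembling context with HybridMemory is asynchronous. A sketch of a single turn, assuming an embedding adapter like the one sketched above and an AsyncOpenAI-style client; the model name is illustrative.

from openai import AsyncOpenAI

async def hybrid_turn(memory: HybridMemory, client: AsyncOpenAI, user_input: str) -> str:
    memory.add(Message(role=MessageRole.USER, content=user_input))
    # Summary + semantically relevant history + recent buffer, within the token budget
    context = await memory.get_context(current_query=user_input, max_tokens=4000)
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[m.to_dict() for m in context],
    )
    reply = response.choices[0].message.content
    memory.add(Message(role=MessageRole.ASSISTANT, content=reply))
    return reply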
Entity Memory
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
@dataclass
class Entity:
"""An entity mentioned in conversation."""
name: str
entity_type: str
description: str
mentions: list[datetime] = field(default_factory=list)
attributes: dict = field(default_factory=dict)
relationships: list[tuple[str, str]] = field(default_factory=list)
class EntityMemory(Memory):
"""Memory that tracks entities mentioned in conversation."""
def __init__(
self,
llm_client: Any,
model: str = "gpt-4o-mini"
):
self.llm_client = llm_client
self.model = model
self._entities: dict[str, Entity] = {}
self._buffer: list[Message] = []
self._buffer_size = 10
def add(self, message: Message) -> None:
"""Add message and extract entities."""
self._buffer.append(message)
if len(self._buffer) > self._buffer_size:
self._buffer.pop(0)
        # Entity extraction is async; call extract_entities() separately after
        # adding (see the sketch below)
async def extract_entities(self, message: Message) -> list[Entity]:
"""Extract entities from message."""
prompt = f"""Extract entities from this message:
"{message.content}"
For each entity, provide:
- name: The entity name
- type: person, organization, product, concept, location, etc.
- description: Brief description based on context
Return JSON array:
[{{"name": "...", "type": "...", "description": "..."}}]
If no entities, return empty array: []"""
response = await self.llm_client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0
)
import json
import re
content = response.choices[0].message.content
json_match = re.search(r'\[[\s\S]*\]', content)
entities = []
if json_match:
try:
data = json.loads(json_match.group(0))
for item in data:
entity = self._get_or_create_entity(
item["name"],
item["type"],
item.get("description", "")
)
entity.mentions.append(message.timestamp)
entities.append(entity)
except json.JSONDecodeError:
pass
return entities
def _get_or_create_entity(
self,
name: str,
entity_type: str,
description: str
) -> Entity:
"""Get existing entity or create new one."""
key = name.lower()
if key in self._entities:
entity = self._entities[key]
# Update description if new one is longer
if len(description) > len(entity.description):
entity.description = description
return entity
entity = Entity(
name=name,
entity_type=entity_type,
description=description
)
self._entities[key] = entity
return entity
def get_entity(self, name: str) -> Optional[Entity]:
"""Get entity by name."""
return self._entities.get(name.lower())
def get_entities_by_type(self, entity_type: str) -> list[Entity]:
"""Get all entities of a type."""
return [
e for e in self._entities.values()
if e.entity_type == entity_type
]
def get_entity_context(self) -> str:
"""Get entity context for prompts."""
if not self._entities:
return ""
lines = ["Known entities:"]
for entity in self._entities.values():
lines.append(f"- {entity.name} ({entity.type}): {entity.description}")
return "\n".join(lines)
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get buffer with entity context."""
messages = []
entity_context = self.get_entity_context()
if entity_context:
messages.append(Message(
role=MessageRole.SYSTEM,
content=entity_context
))
messages.extend(self._buffer)
return messages
def clear(self) -> None:
"""Clear memory."""
self._entities.clear()
self._buffer.clear()
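# Entity extraction is decoupled from add(): a caller runs it after each turn, for
# example (a sketch, assuming an AsyncOpenAI-style client):
#
#   memory = EntityMemory(llm_client=AsyncOpenAI())
#   msg = Message(role=MessageRole.USER, content="Acme Corp wants the beta shipped by March.")
#   memory.add(msg)
#   entities = await memory.extract_entities(msg)
#   print(memory.get_entity("acme corp"))   # lookup is case-insensitive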
class KnowledgeGraphMemory(Memory):
"""Memory with knowledge graph structure."""
def __init__(self, llm_client: Any):
self.llm_client = llm_client
self._nodes: dict[str, dict] = {} # id -> node data
self._edges: list[tuple[str, str, str]] = [] # (from, relation, to)
self._buffer: list[Message] = []
def add(self, message: Message) -> None:
"""Add message to buffer."""
self._buffer.append(message)
if len(self._buffer) > 10:
self._buffer.pop(0)
def add_node(
self,
node_id: str,
node_type: str,
properties: dict = None
) -> None:
"""Add node to graph."""
self._nodes[node_id] = {
"type": node_type,
"properties": properties or {}
}
def add_edge(
self,
from_id: str,
relation: str,
to_id: str
) -> None:
"""Add edge to graph."""
self._edges.append((from_id, relation, to_id))
def get_related(self, node_id: str, depth: int = 1) -> dict:
"""Get related nodes up to depth."""
related = {"nodes": {}, "edges": []}
visited = set()
queue = [(node_id, 0)]
while queue:
current_id, current_depth = queue.pop(0)
if current_id in visited or current_depth > depth:
continue
visited.add(current_id)
if current_id in self._nodes:
related["nodes"][current_id] = self._nodes[current_id]
# Find connected edges
for from_id, relation, to_id in self._edges:
if from_id == current_id:
related["edges"].append((from_id, relation, to_id))
if to_id not in visited:
queue.append((to_id, current_depth + 1))
elif to_id == current_id:
related["edges"].append((from_id, relation, to_id))
if from_id not in visited:
queue.append((from_id, current_depth + 1))
return related
def get_graph_context(self, relevant_nodes: list[str] = None) -> str:
"""Get graph context for prompts."""
if not self._nodes:
return ""
lines = ["Knowledge graph:"]
nodes_to_include = relevant_nodes or list(self._nodes.keys())[:10]
for node_id in nodes_to_include:
if node_id in self._nodes:
node = self._nodes[node_id]
lines.append(f"- {node_id} ({node['type']})")
lines.append("\nRelationships:")
for from_id, relation, to_id in self._edges[:20]:
lines.append(f"- {from_id} --[{relation}]--> {to_id}")
return "\n".join(lines)
def get_messages(self, max_tokens: int = None) -> list[Message]:
"""Get buffer with graph context."""
messages = []
graph_context = self.get_graph_context()
if graph_context:
messages.append(Message(
role=MessageRole.SYSTEM,
content=graph_context
))
messages.extend(self._buffer)
return messages
def clear(self) -> None:
"""Clear memory."""
self._nodes.clear()
self._edges.clear()
self._buffer.clear()
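A small sketch of populating the graph by hand and pulling a node's neighborhood; the llm_client argument is not used by the manual population shown here, and the node names are illustrative.

graph = KnowledgeGraphMemory(llm_client=None)  # client not needed for manual population
graph.add_node("alice", "person", {"role": "engineer"})
graph.add_node("project_x", "project")
graph.add_node("acme", "organization")
graph.add_edge("alice", "works_on", "project_x")
graph.add_edge("project_x", "owned_by", "acme")

related = graph.get_related("alice", depth=2)
print(sorted(related["nodes"]))     # ['acme', 'alice', 'project_x']
print(graph.get_graph_context())    # formatted graph context for the prompt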
Production Memory Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
app = FastAPI()
# In-memory storage (use Redis/database in production)
conversations: dict[str, Memory] = {}
class AddMessageRequest(BaseModel):
conversation_id: str
role: str
content: str
memory_type: str = "buffer" # buffer, summary, vector, hybrid
class GetContextRequest(BaseModel):
conversation_id: str
query: Optional[str] = None
max_tokens: int = 4000
class MessageResponse(BaseModel):
role: str
content: str
class ContextResponse(BaseModel):
messages: list[MessageResponse]
total_tokens: int
memory_type: str
def get_or_create_memory(
conversation_id: str,
memory_type: str
) -> Memory:
"""Get or create memory for conversation."""
if conversation_id not in conversations:
if memory_type == "buffer":
conversations[conversation_id] = BufferMemory(max_messages=20)
elif memory_type == "token_buffer":
conversations[conversation_id] = TokenBufferMemory(max_tokens=4000)
elif memory_type == "summary":
# Would need LLM client
conversations[conversation_id] = BufferMemory(max_messages=50)
elif memory_type == "vector":
# Would need embedding model
conversations[conversation_id] = BufferMemory(max_messages=100)
else:
conversations[conversation_id] = BufferMemory(max_messages=20)
return conversations[conversation_id]
@app.post("/v1/memory/add")
async def add_message(request: AddMessageRequest):
"""Add message to conversation memory."""
memory = get_or_create_memory(
request.conversation_id,
request.memory_type
)
    try:
        role = MessageRole(request.role)
    except ValueError:
        raise HTTPException(400, f"Invalid role: {request.role}")
    message = Message(
        role=role,
        content=request.content,
        timestamp=datetime.utcnow()
    )
memory.add(message)
return {
"status": "added",
"conversation_id": request.conversation_id,
"message_count": len(memory.get_messages())
}
@app.post("/v1/memory/context")
async def get_context(request: GetContextRequest) -> ContextResponse:
"""Get conversation context."""
if request.conversation_id not in conversations:
raise HTTPException(404, "Conversation not found")
memory = conversations[request.conversation_id]
messages = memory.get_messages(max_tokens=request.max_tokens)
total_tokens = sum(
m.token_count or len(m.content) // 4
for m in messages
)
return ContextResponse(
messages=[
MessageResponse(role=m.role.value, content=m.content)
for m in messages
],
total_tokens=total_tokens,
memory_type=type(memory).__name__
)
@app.delete("/v1/memory/{conversation_id}")
async def clear_memory(conversation_id: str):
"""Clear conversation memory."""
if conversation_id in conversations:
conversations[conversation_id].clear()
del conversations[conversation_id]
return {"status": "cleared"}
@app.get("/v1/memory/{conversation_id}/stats")
async def get_stats(conversation_id: str):
"""Get memory statistics."""
if conversation_id not in conversations:
raise HTTPException(404, "Conversation not found")
memory = conversations[conversation_id]
messages = memory.get_messages()
return {
"conversation_id": conversation_id,
"memory_type": type(memory).__name__,
"message_count": len(messages),
"total_tokens": sum(
m.token_count or len(m.content) // 4
for m in messages
),
"roles": {
role.value: sum(1 for m in messages if m.role == role)
for role in MessageRole
}
}
@app.get("/v1/conversations")
async def list_conversations():
"""List all conversations."""
return {
"conversations": [
{
"id": conv_id,
"memory_type": type(memory).__name__,
"message_count": len(memory.get_messages())
}
for conv_id, memory in conversations.items()
]
}
@app.get("/health")
async def health():
return {"status": "healthy"}
Conclusion
Effective conversation memory is essential for building coherent multi-turn LLM applications. Start with buffer memory for simple use cases—it’s fast and predictable. As conversations grow longer, add summary memory to compress older context while preserving key information. For applications requiring recall of specific past interactions, vector memory enables semantic retrieval of relevant history.
The most robust approach combines all three: buffer for recent context, summary for compressed history, and vector for semantic recall. Entity memory adds another dimension by tracking people, concepts, and relationships mentioned throughout the conversation.
The key insight is that memory management is a tradeoff between completeness and efficiency—you can’t keep everything, so you must be strategic about what context to preserve. Monitor your token usage, measure response quality, and tune your memory parameters based on your specific use case. A well-designed memory system makes the difference between a chatbot that forgets everything and an assistant that maintains coherent, contextual conversations over extended interactions.