Part 5 of the Microsoft Agent Framework Series
Real-world AI applications rarely consist of single-turn interactions. Customer support, technical assistance, research workflows — they all require ongoing conversations with context preserved across multiple exchanges.
In this article, we’ll explore Agent Threads: the state management mechanism in Microsoft Agent Framework that carries conversation history from one turn to the next.
Thread Lifecycle Management
| Phase | Description | Code |
|---|---|---|
| Creation | Start a new conversation | `thread = agent.get_new_thread()` |
| Active | Ongoing conversation | `await agent.run(msg, thread=thread)` |
| Persistence | Save for later | `await store.save_thread(session_id, thread)` |
| Resume | Continue later | `thread = await store.load_thread(session_id, agent)` |
| Cleanup | End session | `await store.delete_thread(session_id)` |
Context Window Management
As conversations grow, you may exceed the model’s context window. Strategies to manage this:
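The simplest strategy is a sliding window that keeps only the newest messages fitting a token budget. The sketch below is a minimal illustration, not part of the framework: `estimate_tokens`, `trim_to_budget`, and the four-characters-per-token heuristic are all assumptions, and messages are assumed to be plain dictionaries with `role` and `content` keys, as in the examples in this article.

```python
from typing import Dict, List

Message = Dict[str, str]


def estimate_tokens(text: str) -> int:
    """Rough heuristic (an assumption): about four characters per token."""
    return max(1, len(text) // 4)


def trim_to_budget(messages: List[Message], max_tokens: int) -> List[Message]:
    """Keep the newest messages whose estimated size fits within max_tokens.

    The most recent message is always kept, even if it alone exceeds the budget.
    """
    kept: List[Message] = []
    used = 0
    # Walk backwards so the most recent turns are considered first.
    for message in reversed(messages):
        cost = estimate_tokens(message["content"])
        if kept and used + cost > max_tokens:
            break
        kept.append(message)
        used += cost
    kept.reverse()  # restore chronological order
    return kept
```

A sliding window is cheap but forgets everything it drops, including any leading system message; a real tokenizer would also give tighter estimates than the character heuristic.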
Summarization Strategy
async def manage_long_conversation(agent, thread, max_messages=20):
    """Summarize old messages when thread gets too long."""
    original_count = len(thread.messages)
    if original_count > max_messages:
        # Get older messages for summarization
        old_messages = thread.messages[:-10]  # Keep last 10 intact
        old_content = "\n".join(f"{m['role']}: {m['content']}" for m in old_messages)
        # Create summary using a separate call (no thread, so the history is untouched)
        summary_result = await agent.run(
            f"Summarize this conversation context concisely: {old_content}"
        )
        # Replace old messages with summary
        thread.messages = [
            {"role": "system", "content": f"Previous context summary: {summary_result.text}"}
        ] + thread.messages[-10:]
        print(f"Context summarized. Messages reduced from {original_count} to {len(thread.messages)}")
    return thread

# Usage in conversation loop
async def conversation_with_summarization():
    # `client` is assumed to be an already-configured chat client
    agent = client.create_agent(name="LongConversationBot", instructions="...")
    thread = agent.get_new_thread()
    while True:
        user_input = input("You: ")
        if user_input.lower() == "exit":
            break
        # Manage context before each response
        thread = await manage_long_conversation(agent, thread)
        result = await agent.run(user_input, thread=thread)
        print(f"Assistant: {result.text}")
.NET / C# Implementation (Multi-Turn Demo)
using Azure.AI.OpenAI; // AzureOpenAIClient lives in this namespace
using Azure.Identity;
using OpenAI;
using Microsoft.Agents.AI;

namespace MAF.Part05.MultiTurn;

/// <summary>
/// Part 5: Multi-Turn Conversations in .NET
/// </summary>
public class MultiTurnDemo
{
    public static async Task Main(string[] args)
    {
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
            ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT not set");

        var agent = new AzureOpenAIClient(
                new Uri(endpoint),
                new DefaultAzureCredential())
            .GetOpenAIResponseClient("gpt-4o")
            .CreateAIAgent(
                name: "AssistantBot",
                instructions: "You are a helpful assistant. Remember context from the conversation.");

        // Create a thread for multi-turn conversation
        var thread = agent.GetNewThread();
        Console.WriteLine("=== Multi-Turn Conversation Demo ===\n");

        // First turn
        Console.WriteLine("User: I'm Bob, and I need help with Azure.");
        var result1 = await agent.RunAsync("I'm Bob, and I need help with Azure.", thread);
        Console.WriteLine($"Assistant: {result1}\n");

        // Second turn - agent remembers the context
        Console.WriteLine("User: What cloud platform am I asking about?");
        var result2 = await agent.RunAsync("What cloud platform am I asking about?", thread);
        Console.WriteLine($"Assistant: {result2}\n"); // Should mention Azure

        // Third turn - agent still has context
        Console.WriteLine("User: Remind me of my name?");
        var result3 = await agent.RunAsync("Remind me of my name?", thread);
        Console.WriteLine($"Assistant: {result3}\n"); // Should say Bob

        Console.WriteLine("=== Demo Complete ===");
    }
}
Thread Persistence
For production applications, persist threads to storage:
import json
from datetime import datetime

import redis.asyncio as redis  # async client, so awaited calls do not block the event loop


class RedisThreadStore:
    """Persist agent threads to Redis for recovery and session management."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.ttl = 86400 * 7  # 7 days expiration

    async def save_thread(self, session_id: str, thread) -> None:
        """Save thread state to Redis."""
        now = datetime.now().isoformat()
        data = {
            "messages": thread.messages,
            "metadata": {
                "created": now,  # reset on every save; read the stored value first to preserve it
                "message_count": len(thread.messages),
                "last_updated": now,
            },
        }
        await self.redis.setex(
            f"agent:thread:{session_id}",
            self.ttl,
            json.dumps(data),
        )
        print(f"Thread saved: {session_id} ({len(thread.messages)} messages)")

    async def load_thread(self, session_id: str, agent) -> object:
        """Load thread from Redis, or create new if not found."""
        data = await self.redis.get(f"agent:thread:{session_id}")
        if data:
            parsed = json.loads(data)
            thread = agent.get_new_thread()
            thread.messages = parsed["messages"]
            print(f"Thread loaded: {session_id} ({len(thread.messages)} messages)")
            return thread
        print(f"No existing thread found for {session_id}, creating new")
        return agent.get_new_thread()

    async def delete_thread(self, session_id: str) -> bool:
        """Delete a thread from Redis."""
        result = await self.redis.delete(f"agent:thread:{session_id}")
        return result > 0

    async def list_sessions(self, pattern: str = "agent:thread:*") -> list:
        """List all active session IDs."""
        # SCAN iterates incrementally instead of blocking the server like KEYS.
        # split(":", 2) keeps session IDs that themselves contain colons intact.
        return [
            key.decode().split(":", 2)[-1]
            async for key in self.redis.scan_iter(match=pattern)
        ]

# Usage example
async def persistent_conversation():
    store = RedisThreadStore()
    # `client` is assumed to be an already-configured chat client
    agent = client.create_agent(name="PersistentBot", instructions="...")
    session_id = "user-12345"
    # Load existing thread or create new
    thread = await store.load_thread(session_id, agent)
    # Run conversation
    result = await agent.run("Continue where we left off", thread=thread)
    print(f"Assistant: {result.text}")
    # Save after each interaction
    await store.save_thread(session_id, thread)
.NET / C# Implementation (Summarization Strategy)
using Microsoft.Agents.AI;

public async Task ManageLongConversation(IAgent agent, IThread thread, int maxMessages = 20)
{
    // Retrieve messages (hypothetical accessor)
    var messages = await thread.GetMessagesAsync();
    if (messages.Count > maxMessages)
    {
        Console.WriteLine($"Summarizing history... ({messages.Count} messages)");

        // Keep last 10 messages intact
        var recentMessages = messages.TakeLast(10).ToList();
        var olderMessages = messages.Take(messages.Count - 10).ToList();

        // Create summarization prompt
        var oldContent = string.Join("\n", olderMessages.Select(m => $"{m.Role}: {m.Content}"));
        var summaryPrompt = $"Summarize this conversation context concisely:\n{oldContent}";

        // Generate summary using the agent
        var summaryResult = await agent.RunAsync(summaryPrompt);

        // Update Thread: Clear and Replace
        await thread.ClearAsync();
        await thread.AddMessageAsync(new Message(Role.System, $"Previous context summary: {summaryResult}"));

        // Re-add recent messages
        foreach (var msg in recentMessages)
        {
            await thread.AddMessageAsync(msg);
        }
    }
}
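For local development and unit tests, the same save/load/delete interface as the Python `RedisThreadStore` can be backed by a dictionary, so no Redis server is needed. This is a hedged sketch rather than anything shipped with the framework: `InMemoryThreadStore` and its `clock` parameter are hypothetical names, and, like the Redis example, it assumes the thread exposes a `messages` list and the agent exposes `get_new_thread()`.

```python
import copy
import time
from typing import Any, Callable, Dict


class InMemoryThreadStore:
    """Dictionary-backed stand-in for a thread store (tests and local runs only)."""

    def __init__(
        self,
        ttl_seconds: float = 86400 * 7,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._items: Dict[str, Dict[str, Any]] = {}

    async def save_thread(self, session_id: str, thread) -> None:
        """Store a copy of the messages together with an expiry timestamp."""
        self._items[session_id] = {
            "messages": copy.deepcopy(thread.messages),
            "expires_at": self._clock() + self._ttl,
        }

    async def load_thread(self, session_id: str, agent):
        """Return the saved thread, or a fresh one if missing or expired."""
        thread = agent.get_new_thread()
        item = self._items.get(session_id)
        if item is not None and item["expires_at"] > self._clock():
            thread.messages = copy.deepcopy(item["messages"])
        else:
            self._items.pop(session_id, None)  # drop expired entries lazily
        return thread

    async def delete_thread(self, session_id: str) -> bool:
        """Remove a session; True if something was deleted."""
        return self._items.pop(session_id, None) is not None

    async def list_sessions(self) -> list:
        """List session IDs that have not expired yet."""
        now = self._clock()
        return [sid for sid, item in self._items.items() if item["expires_at"] > now]
```

Because the method names match, code written against `RedisThreadStore` can take either store as a parameter. The deep copies mimic the serialization boundary of a real store, so later mutations of a live thread do not silently change what was saved.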
Human-in-the-Loop Patterns
Threads enable approval workflows where human review is required:
from dataclasses import dataclass
from enum import Enum


class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"


@dataclass
class ActionRequest:
    action: str
    params: dict
    requires_approval: bool
    reason: str


# In-memory store for pending approvals (use Redis/DB in production)
pending_approvals = {}


async def check_if_approval_needed(action: str, params: dict) -> ActionRequest:
    """Determine if an action requires human approval."""
    high_risk_actions = ["delete_account", "refund_over_100", "modify_permissions"]
    requires_approval = action in high_risk_actions
    reason = f"Action '{action}' is classified as high-risk" if requires_approval else ""
    return ActionRequest(
        action=action,
        params=params,
        requires_approval=requires_approval,
        reason=reason,
    )


async def request_human_approval(
    session_id: str,
    action_request: ActionRequest,
    thread,
    thread_store,
) -> dict:
    """Pause workflow and request human approval."""
    # Save thread state
    await thread_store.save_thread(session_id, thread)
    # Store pending approval
    approval_id = f"approval-{session_id}-{action_request.action}"
    pending_approvals[approval_id] = {
        "session_id": session_id,
        "action": action_request.action,
        "params": action_request.params,
        "status": ApprovalStatus.PENDING,
        "reason": action_request.reason,
    }
    # Notify approver (email, Slack, Teams, etc.)
    await notify_approver(approval_id, action_request)
    return {
        "status": "pending_approval",
        "approval_id": approval_id,
        "message": f"Action '{action_request.action}' requires approval. ID: {approval_id}",
    }


async def notify_approver(approval_id: str, action_request: ActionRequest):
    """Send notification to human approver."""
    print("\n🔔 APPROVAL REQUIRED")
    print(f"   ID: {approval_id}")
    print(f"   Action: {action_request.action}")
    print(f"   Params: {action_request.params}")
    print(f"   Reason: {action_request.reason}")
    print(f"   Approve with: handle_approval('{approval_id}', True, agent, thread_store)")


async def handle_approval(
    approval_id: str,
    approved: bool,
    agent,
    thread_store,
) -> str:
    """Process human approval decision and resume workflow."""
    if approval_id not in pending_approvals:
        return f"Approval {approval_id} not found"
    approval = pending_approvals[approval_id]
    session_id = approval["session_id"]
    # Load the saved thread
    thread = await thread_store.load_thread(session_id, agent)
    if approved:
        # Execute the approved action (placeholder: call the real action handler here)
        result = f"Action '{approval['action']}' executed successfully"
        approval["status"] = ApprovalStatus.APPROVED
        # Inform agent of approval
        response = await agent.run(
            f"The action was approved and executed. Result: {result}",
            thread=thread,
        )
    else:
        approval["status"] = ApprovalStatus.REJECTED
        response = await agent.run(
            "The action was rejected by the approver. Please suggest alternatives.",
            thread=thread,
        )
    # Save updated thread
    await thread_store.save_thread(session_id, thread)
    # Cleanup
    del pending_approvals[approval_id]
    return response.text
.NET / C# Implementation
using Microsoft.Agents.AI;

public class HumanInLoopWorkflow
{
    public enum ApprovalStatus { Pending, Approved, Rejected }

    public record ApprovalRequest(string Id, string Action, object Data, ApprovalStatus Status = ApprovalStatus.Pending);

    private readonly Dictionary<string, ApprovalRequest> _pending = new();

    public async Task<string> ProcessAction(object agent, string request, object thread)
    {
        // ... (Implementation detailing approval logic)
        return "NEEDS_APPROVAL";
    }
}
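The Python example above builds the approval ID from the session ID and the action name, so two pending requests for the same action in one session would overwrite each other, and nothing stops a decision from being recorded twice. One way to tighten this is sketched below; `PendingApproval` and `ApprovalRegistry` are hypothetical names introduced for illustration, not framework types.

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict


class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"


@dataclass
class PendingApproval:
    session_id: str
    action: str
    params: dict
    status: ApprovalStatus = ApprovalStatus.PENDING
    # A random ID avoids collisions between identical actions in one session.
    approval_id: str = field(default_factory=lambda: f"approval-{uuid.uuid4().hex}")


class ApprovalRegistry:
    """Tracks approvals and allows each one to be decided exactly once."""

    def __init__(self) -> None:
        self._items: Dict[str, PendingApproval] = {}

    def create(self, session_id: str, action: str, params: dict) -> PendingApproval:
        item = PendingApproval(session_id, action, params)
        self._items[item.approval_id] = item
        return item

    def decide(self, approval_id: str, approved: bool) -> PendingApproval:
        item = self._items.get(approval_id)
        if item is None:
            raise KeyError(f"Approval {approval_id} not found")
        if item.status is not ApprovalStatus.PENDING:
            raise ValueError(f"Approval {approval_id} was already {item.status.value}")
        item.status = ApprovalStatus.APPROVED if approved else ApprovalStatus.REJECTED
        return item
```

Keeping decided approvals in the registry, instead of deleting them, also leaves a simple audit trail; in production the dictionary would be replaced by Redis or a database, as the comment in the original example suggests.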
.NET / C# Implementation (Redis Thread Persistence)
using System.Text.Json;
using StackExchange.Redis;

namespace MAF.Part05.Persistence;

/// <summary>
/// Part 5: Redis Thread Persistence in .NET
/// </summary>
public class RedisThreadStore
{
    private readonly IDatabase _redis;
    private readonly TimeSpan _ttl = TimeSpan.FromDays(7);

    public RedisThreadStore(string connectionString = "localhost:6379")
    {
        var connection = ConnectionMultiplexer.Connect(connectionString);
        _redis = connection.GetDatabase();
    }

    public async Task SaveThreadAsync(string sessionId, object thread)
    {
        var data = new
        {
            Messages = GetMessages(thread), // helper not shown in this excerpt
            Metadata = new
            {
                Created = DateTime.UtcNow,
                LastUpdated = DateTime.UtcNow
            }
        };
        var json = JsonSerializer.Serialize(data);
        await _redis.StringSetAsync($"agent:thread:{sessionId}", json, _ttl);
        Console.WriteLine($"Thread saved: {sessionId}");
    }

    public async Task
Best Practices
| Practice | Recommendation |
|---|---|
| Thread per session | Create one thread per user session, not per message |
| Context limits | Monitor message count; summarize when approaching limits |
| Persistence | Use Redis, Cosmos DB, or SQL for production |
| TTL | Set expiration for inactive threads (e.g., 7 days) |
| Cleanup | Clear threads on logout or session end |
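The first and last rows of the table can be combined into a small registry that hands out one thread per session and drops it when the session ends. This is only a sketch under stated assumptions: `SessionThreads` is a hypothetical helper, and the thread factory is passed in as a plain callable so the example does not depend on any framework type.

```python
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class SessionThreads(Generic[T]):
    """Hand out exactly one thread object per session ID."""

    def __init__(self, new_thread: Callable[[], T]) -> None:
        self._new_thread = new_thread
        self._threads: Dict[str, T] = {}

    def get(self, session_id: str) -> T:
        """Return the session's thread, creating it on first use."""
        if session_id not in self._threads:
            self._threads[session_id] = self._new_thread()
        return self._threads[session_id]

    def end(self, session_id: str) -> bool:
        """Drop the thread on logout or session end; True if one existed."""
        return self._threads.pop(session_id, None) is not None
```

With an agent like the ones in this article, the registry could be created as `SessionThreads(agent.get_new_thread)`, and every message in a session would then call `get(session_id)` instead of creating a new thread.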
📦 Source Code
All code examples from this article series are available on GitHub:
👉 https://github.com/nithinmohantk/microsoft-agent-framework-series-examples
Clone the repository to follow along:
git clone https://github.com/nithinmohantk/microsoft-agent-framework-series-examples.git
cd microsoft-agent-framework-series-examples
Series Navigation
- Part 1: Introduction
- Part 2: First Agent (.NET)
- Part 3: First Agent (Python)
- Part 4: Tools & Function Calling
- Part 5: Multi-Turn Conversations & Threads ← You are here
- Part 6: Workflows — Coming next