Introduction: Agentic workflows represent a paradigm shift from simple prompt-response patterns to autonomous, goal-directed AI systems. Unlike traditional LLM applications where the model responds once and stops, agentic systems can plan multi-step solutions, execute actions, observe results, and iterate until the goal is achieved. This guide covers the core patterns that make agentic systems work: planning and decomposition, tool use and action execution, reflection and self-correction, memory and state management, and multi-agent coordination. Whether you’re building a code assistant that can debug its own errors, a research agent that can gather and synthesize information, or an automation system that can handle complex workflows, these patterns provide the foundation for building reliable, capable AI agents.

Planning and Task Decomposition
from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
from enum import Enum
class TaskStatus(Enum):
    """Lifecycle states of a planned task; each member's value is its lowercase name."""

    # Initial state of a newly created task.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    # COMPLETED and FAILED are the two states treated as finished.
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"
@dataclass
class Task:
    """A single unit of work inside a plan.

    Attributes:
        id: Identifier other tasks use to refer to this one.
        description: What needs to be done.
        status: Current lifecycle state; starts as ``TaskStatus.PENDING``.
        dependencies: IDs of tasks that must be completed first.
        result: Whatever executing the task produced, if anything.
        error: Failure message, or ``None`` when no error was recorded.
        subtasks: Finer-grained tasks this task was decomposed into.
    """

    id: str
    description: str
    status: TaskStatus = TaskStatus.PENDING
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    # Optional because the default is None (was annotated as plain ``str``).
    error: Optional[str] = None
    subtasks: list['Task'] = field(default_factory=list)
@dataclass
class Plan:
    """Execution plan: a goal plus the tasks meant to achieve it.

    Attributes:
        goal: The objective the plan works toward.
        tasks: Tasks in planning order.
        current_task_idx: Positional cursor; not read by any method of this class.
    """

    goal: str
    tasks: list[Task] = field(default_factory=list)
    current_task_idx: int = 0

    def get_next_task(self) -> Optional[Task]:
        """Return the first pending task whose dependencies are all completed.

        A dependency ID that matches no task in the plan counts as unmet, so
        the dependent task is simply never selected. Previously such an ID
        raised ``AttributeError`` because the lookup returned ``None``.

        Returns:
            The next runnable task, or ``None`` if nothing can run right now.
        """
        for task in self.tasks:
            if task.status != TaskStatus.PENDING:
                continue
            if all(self._is_completed(dep_id) for dep_id in task.dependencies):
                return task
        return None

    def _get_task(self, task_id: str) -> Optional[Task]:
        """Return the task with the given ID, or ``None`` if there is none."""
        return next((task for task in self.tasks if task.id == task_id), None)

    def _is_completed(self, task_id: str) -> bool:
        """Tell whether ``task_id`` names an existing, completed task."""
        dependency = self._get_task(task_id)
        return dependency is not None and dependency.status == TaskStatus.COMPLETED

    def is_complete(self) -> bool:
        """Return ``True`` when every task is either completed or failed.

        An empty plan is considered complete.
        """
        finished_states = (TaskStatus.COMPLETED, TaskStatus.FAILED)
        return all(task.status in finished_states for task in self.tasks)
class Planner(ABC):
    """Interface every planner implements: build a plan, and rebuild it after a failure."""

    @abstractmethod
    async def create_plan(self, goal: str, context: dict = None) -> Plan:
        """Produce an execution plan for ``goal``.

        Args:
            goal: What the plan should achieve.
            context: Optional extra information for the planner.
        """

    @abstractmethod
    async def replan(self, plan: Plan, error: str) -> Plan:
        """Produce a replacement for ``plan`` after it failed with ``error``."""
class LLMPlanner(Planner):
    """Planner that asks an LLM for a flat, dependency-annotated task list.

    The client is expected to expose ``await complete(prompt)`` returning an
    object with a ``content`` attribute holding the JSON reply.
    """

    # Plain-text copy of the reply format, for prompts built with f-strings
    # (``planning_prompt`` carries its own brace-escaped copy for ``str.format``).
    _PLAN_SCHEMA = """{
"tasks": [
{"id": "t1", "description": "...", "dependencies": []},
{"id": "t2", "description": "...", "dependencies": ["t1"]}
]
}"""

    def __init__(self, llm_client: Any):
        self.llm = llm_client
        self.planning_prompt = """You are a task planning agent. Given a goal, break it down into concrete, actionable tasks.
Goal: {goal}
Context: {context}
Create a plan with the following format for each task:
- Task ID (t1, t2, etc.)
- Description (what needs to be done)
- Dependencies (which tasks must complete first)
Output as JSON:
{{
"tasks": [
{{"id": "t1", "description": "...", "dependencies": []}},
{{"id": "t2", "description": "...", "dependencies": ["t1"]}}
]
}}"""

    async def create_plan(self, goal: str, context: dict = None) -> Plan:
        """Ask the LLM for a plan that achieves ``goal``.

        Args:
            goal: What the plan should achieve.
            context: Optional JSON-serialisable context; ``None`` is sent as ``{}``.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
            KeyError: If the reply lacks ``tasks`` or a task lacks ``id``/``description``.
        """
        import json

        prompt = self.planning_prompt.format(
            goal=goal,
            context=json.dumps(context or {}),
        )
        response = await self.llm.complete(prompt)
        return self._parse_plan(goal, response.content)

    async def replan(self, plan: Plan, error: str) -> Plan:
        """Ask the LLM for a revised plan after ``plan`` failed with ``error``.

        The expected JSON format is spelled out in the prompt: each
        ``complete`` call is built from this prompt alone, so a bare
        "same format" instruction gave the model nothing to refer to.
        """
        replan_prompt = f"""The following plan failed:
Goal: {plan.goal}
Failed task: {self._get_failed_task(plan)}
Error: {error}
Create a revised plan that addresses this failure.
Output as JSON in this format:
{self._PLAN_SCHEMA}"""
        response = await self.llm.complete(replan_prompt)
        return self._parse_plan(plan.goal, response.content)

    @staticmethod
    def _parse_plan(goal: str, raw_reply: str) -> Plan:
        """Turn the LLM's JSON reply into a ``Plan`` for ``goal``."""
        import json

        plan_data = json.loads(raw_reply)
        tasks = [
            Task(
                id=item["id"],
                description=item["description"],
                dependencies=item.get("dependencies", []),
            )
            for item in plan_data["tasks"]
        ]
        return Plan(goal=goal, tasks=tasks)

    def _get_failed_task(self, plan: Plan) -> str:
        """Describe the first failed task as ``"id: description"``, or ``"Unknown"``."""
        for task in plan.tasks:
            if task.status == TaskStatus.FAILED:
                return f"{task.id}: {task.description}"
        return "Unknown"
class HierarchicalPlanner(Planner):
    """Planner that builds a high-level plan and recursively breaks tasks down.

    The client is expected to expose ``await complete(prompt)`` returning an
    object with a ``content`` attribute holding the JSON reply.
    """

    def __init__(self, llm_client: Any, max_depth: int = 3):
        self.llm = llm_client
        # Decomposition stops once the recursion depth reaches this value.
        self.max_depth = max_depth

    async def create_plan(self, goal: str, context: dict = None) -> Plan:
        """Create a top-level plan, then decompose the tasks that look complex.

        Args:
            goal: What the plan should achieve.
            context: Optional extra information, forwarded into the
                high-level planning prompt when non-empty.
        """
        high_level = await self._plan_level(goal, context, depth=0)
        for task in high_level.tasks:
            if self._needs_decomposition(task):
                task.subtasks = await self._decompose_task(task, depth=1)
        return high_level

    async def _plan_level(
        self,
        goal: str,
        context: Optional[dict],
        depth: int
    ) -> Plan:
        """Ask the LLM for a plan at the given depth (0 means high-level).

        ``context`` used to be accepted but never sent, which made
        ``replan``'s error context a no-op. It is now added as a ``Context:``
        line; with no context the prompt is unchanged.
        """
        import json

        level = "high-level" if depth == 0 else "detailed"
        max_tasks = 5 if depth == 0 else 3
        # default=str keeps non-JSON values in the context from aborting planning.
        context_line = f"Context: {json.dumps(context, default=str)}\n" if context else ""
        prompt = f"""Create a plan for: {goal}
{context_line}Level: {level}
Max tasks: {max_tasks}
Output JSON with tasks."""
        response = await self.llm.complete(prompt)
        data = json.loads(response.content)
        return Plan(
            goal=goal,
            tasks=[
                Task(id=item["id"], description=item["description"])
                for item in data["tasks"]
            ],
        )

    def _needs_decomposition(self, task: Task) -> bool:
        """Heuristic: a task is complex if its description contains a trigger word."""
        complex_indicators = (
            "implement", "build", "create", "develop",
            "analyze", "research", "design",
        )
        description = task.description.lower()
        return any(indicator in description for indicator in complex_indicators)

    async def _decompose_task(self, task: Task, depth: int) -> list[Task]:
        """Break ``task`` into subtasks, recursing until ``max_depth`` is reached.

        Accepts either ``{"subtasks": [...]}`` or a bare JSON list, since the
        prompt does not pin the reply to one shape. Subtask IDs are
        ``"<parent id>.<1-based position>"``.
        """
        if depth >= self.max_depth:
            return []

        import json

        prompt = f"""Break down this task into smaller steps:
Task: {task.description}
Output 2-4 subtasks as JSON."""
        response = await self.llm.complete(prompt)
        data = json.loads(response.content)
        raw_subtasks = data if isinstance(data, list) else data.get("subtasks", [])
        subtasks = [
            Task(id=f"{task.id}.{position}", description=item["description"])
            for position, item in enumerate(raw_subtasks, start=1)
        ]
        for subtask in subtasks:
            if self._needs_decomposition(subtask):
                subtask.subtasks = await self._decompose_task(subtask, depth + 1)
        return subtasks

    async def replan(self, plan: Plan, error: str) -> Plan:
        """Build a fresh plan for the same goal, passing the error as context."""
        return await self.create_plan(
            plan.goal,
            {"previous_error": error}
        )
Tool Use and Action Execution
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from abc import ABC, abstractmethod
import json
@dataclass
class ToolResult:
    """Outcome of one tool invocation.

    Attributes:
        success: Whether the tool ran without error.
        output: Value the tool produced; ``None`` by default.
        error: Failure message, or ``None`` when no error was recorded.
    """

    success: bool
    output: Any = None
    # Optional because the default is None (was annotated as plain ``str``).
    error: Optional[str] = None
@dataclass
class Tool:
    """Description of a callable the agent may invoke.

    Attributes:
        name: Identifier the agent uses to select the tool.
        description: Human-readable explanation of what the tool does.
        parameters: JSON Schema describing the tool's arguments.
        function: The callable that performs the work.
    """

    name: str
    description: str
    parameters: dict  # JSON Schema
    function: Callable

    def to_schema(self) -> dict:
        """Return the function-calling schema: name, description and parameters."""
        exported = ("name", "description", "parameters")
        return {attr: getattr(self, attr) for attr in exported}
class ToolRegistry:
    """Name-indexed collection of tools, with guarded execution."""

    def __init__(self):
        self.tools: dict[str, Tool] = {}

    def register(self, tool: Tool):
        """Add ``tool`` under its name, replacing any tool already registered there."""
        self.tools[tool.name] = tool

    def get(self, name: str) -> Optional[Tool]:
        """Return the tool registered as ``name``, or ``None``."""
        return self.tools.get(name)

    def list_tools(self) -> list[dict]:
        """Return the function-calling schema of every registered tool."""
        return [tool.to_schema() for tool in self.tools.values()]

    async def execute(self, name: str, **kwargs) -> ToolResult:
        """Run the named tool with ``kwargs`` and wrap the outcome.

        Both coroutine functions and plain synchronous callables are
        supported: the return value is awaited only if it is awaitable.
        Previously a synchronous tool ran and then failed on ``await``.

        NOTE(review): a tool argument literally called ``name`` collides
        with this method's own ``name`` parameter.

        Returns:
            A successful ``ToolResult`` carrying the output, or a failed one
            carrying the error text (unknown tool, or any exception raised).
        """
        import inspect

        tool = self.get(name)
        if tool is None:
            return ToolResult(success=False, error=f"Tool not found: {name}")
        try:
            outcome = tool.function(**kwargs)
            if inspect.isawaitable(outcome):
                outcome = await outcome
        except Exception as exc:
            # Failures are reported as data so the agent loop can react to them.
            return ToolResult(success=False, error=str(exc))
        return ToolResult(success=True, output=outcome)
class ActionExecutor:
    """Runs agent-chosen actions through a tool registry and keeps an audit log."""

    def __init__(self, tool_registry: "ToolRegistry"):
        self.tools = tool_registry
        self.action_history: list[dict] = []

    async def execute(self, action: dict) -> "ToolResult":
        """Execute ``action`` and record it, with its result, in the history.

        Args:
            action: Mapping with a ``"tool"`` name and optional ``"parameters"``.

        Returns:
            The result object returned by the registry.
        """
        tool_name = action.get("tool")
        parameters = action.get("parameters", {})

        # Hold on to this call's own record: indexing the history with [-1]
        # after the await could hit another call's entry when calls interleave.
        record = {
            "tool": tool_name,
            "parameters": parameters,
            "timestamp": self._now(),
        }
        self.action_history.append(record)

        result = await self.tools.execute(tool_name, **parameters)

        # Test for None explicitly so falsy outputs (0, "", False, []) are still
        # logged. The stored text is capped at 500 characters.
        logged_output = None if result.output is None else str(result.output)[:500]
        record["result"] = {
            "success": result.success,
            "output": logged_output,
            "error": result.error,
        }
        return result

    def _now(self) -> str:
        """Return the current local time as an ISO-8601 string."""
        from datetime import datetime
        return datetime.now().isoformat()

    def get_history(self) -> list[dict]:
        """Return the recorded actions (the live list, not a copy)."""
        return self.action_history
class ReActAgent:
    """Agent that alternates LLM reasoning ("Thought") with tool calls ("Action").

    Each iteration sends the goal, the tool schemas and the transcript so far
    to the LLM, then either returns the final answer or executes the parsed
    action and appends the observation to the transcript.
    """

    def __init__(
        self,
        llm_client: Any,
        tool_registry: "ToolRegistry",
        max_iterations: int = 10
    ):
        self.llm = llm_client
        self.tools = tool_registry
        self.executor = ActionExecutor(tool_registry)
        self.max_iterations = max_iterations
        self.react_prompt = """You are an AI agent that reasons and acts to accomplish goals.
Available tools:
{tools}
Use this format:
Thought: [your reasoning about what to do next]
Action: [tool name]
Action Input: [JSON parameters]
After receiving an observation, continue with another Thought/Action or provide:
Thought: [final reasoning]
Final Answer: [your response to the user]
Goal: {goal}
{history}"""

    async def run(self, goal: str) -> str:
        """Drive the reason/act loop until a final answer or the iteration cap.

        Returns:
            The text after ``Final Answer:``, or a fixed message when
            ``max_iterations`` is exhausted first.
        """
        history = []
        for _ in range(self.max_iterations):
            prompt = self.react_prompt.format(
                tools=json.dumps(self.tools.list_tools(), indent=2),
                goal=goal,
                history="\n".join(history)
            )
            response = await self.llm.complete(prompt)
            content = response.content

            # A final answer takes precedence over any action in the same reply.
            if "Final Answer:" in content:
                return self._extract_final_answer(content)

            action = self._parse_action(content)
            history.append(content)
            if action:
                result = await self.executor.execute(action)
                history.append(
                    f"Observation: {result.output if result.success else result.error}"
                )
            else:
                history.append("Observation: Could not parse action. Please use the correct format.")
        return "Max iterations reached without completing goal."

    def _parse_action(self, content: str) -> Optional[dict]:
        """Extract ``{"tool": ..., "parameters": ...}`` from an LLM reply.

        The JSON after ``Action Input:`` is decoded with a real JSON parser
        starting at its opening brace, so nested objects and ``}`` inside
        strings parse correctly. The previous non-greedy regex cut the text
        at the first ``}``.

        Returns:
            The parsed action, or ``None`` if the reply has no tool name, no
            ``Action Input: {``, or the JSON is invalid.
        """
        import re

        action_match = re.search(r"Action:\s*(\w+)", content)
        # The lookahead leaves the match ending right on the opening brace.
        input_match = re.search(r"Action Input:\s*(?=\{)", content)
        if not (action_match and input_match):
            return None
        try:
            parameters, _ = json.JSONDecoder().raw_decode(content, input_match.end())
        except json.JSONDecodeError:
            return None
        return {"tool": action_match.group(1), "parameters": parameters}

    def _extract_final_answer(self, content: str) -> str:
        """Return the stripped text after ``Final Answer:``, or ``content`` if absent."""
        import re

        match = re.search(r"Final Answer:\s*(.*)", content, re.DOTALL)
        return match.group(1).strip() if match else content
class FunctionCallingAgent:
    """Agent that relies on the LLM's native function-calling interface.

    The client is expected to expose ``await complete_with_functions(messages=,
    functions=)`` returning an object with ``function_call`` (having ``name``
    and ``arguments``) and ``content``.
    """

    def __init__(
        self,
        llm_client: Any,
        tool_registry: "ToolRegistry",
        max_iterations: int = 10
    ):
        self.llm = llm_client
        self.tools = tool_registry
        self.executor = ActionExecutor(tool_registry)
        self.max_iterations = max_iterations

    async def run(self, goal: str) -> str:
        """Loop: let the LLM call tools until it answers in plain text.

        Malformed function arguments (invalid JSON, or JSON that is not an
        object) no longer abort the run. The parse error is reported back to
        the model as the function result so it can retry. Tool outputs that
        are not JSON-serialisable are stringified rather than raising.

        Returns:
            The LLM's plain-text reply, or ``"Max iterations reached."``.
        """
        messages = [
            {"role": "system", "content": "You are a helpful assistant that uses tools to accomplish goals."},
            {"role": "user", "content": goal}
        ]
        for _ in range(self.max_iterations):
            response = await self.llm.complete_with_functions(
                messages=messages,
                functions=self.tools.list_tools()
            )
            call = response.function_call
            if not call:
                # No function call: the model has produced its answer.
                return response.content

            payload = await self._invoke(call)
            messages.append({
                "role": "assistant",
                "function_call": {
                    "name": call.name,
                    "arguments": call.arguments
                }
            })
            messages.append({
                "role": "function",
                "name": call.name,
                "content": json.dumps(payload, default=str)
            })
        return "Max iterations reached."

    async def _invoke(self, call: Any) -> Any:
        """Decode the call's arguments, run the tool, and return the reply payload."""
        try:
            parameters = json.loads(call.arguments)
        except (json.JSONDecodeError, TypeError) as exc:
            return {"error": f"Invalid function arguments: {exc}"}
        if not isinstance(parameters, dict):
            return {"error": "Invalid function arguments: expected a JSON object"}

        result = await self.executor.execute({
            "tool": call.name,
            "parameters": parameters
        })
        return result.output if result.success else {"error": result.error}
Reflection and Self-Correction
from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod
@dataclass
class Reflection:
    """Assessment of one agent action and its result.

    Attributes:
        action: The action that was taken, as text.
        result: The result that was observed, as text.
        assessment: Free-text judgement of the action.
        should_retry: Whether the agent should try again.
        improvement: Suggested change for the next attempt, or ``None``.
    """

    action: str
    result: str
    assessment: str
    should_retry: bool
    # Optional because the default is None (was annotated as plain ``str``).
    improvement: Optional[str] = None
class Reflector(ABC):
    """Interface for components that judge an action after it has run."""

    @abstractmethod
    async def reflect(
        self,
        action: dict,
        result: Any,
        goal: str
    ) -> Reflection:
        """Assess ``action`` and its ``result`` with respect to ``goal``."""
class LLMReflector(Reflector):
    """Reflector that delegates the assessment to an LLM.

    The client is expected to expose ``await complete(prompt)`` returning an
    object whose ``content`` is a JSON object with ``assessment`` and
    ``should_retry`` (``improvement`` is optional).
    """

    def __init__(self, llm_client: Any):
        self.llm = llm_client
        self.reflection_prompt = """Reflect on this agent action:
Goal: {goal}
Action: {action}
Result: {result}
Assess:
1. Did this action make progress toward the goal?
2. Were there any errors or issues?
3. Should the agent retry with a different approach?
4. What improvement could be made?
Output JSON:
{{
"assessment": "...",
"made_progress": true/false,
"should_retry": true/false,
"improvement": "..."
}}"""

    async def reflect(
        self,
        action: dict,
        result: Any,
        goal: str
    ) -> Reflection:
        """Ask the LLM to assess the action and parse its JSON verdict.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
            KeyError: If ``assessment`` or ``should_retry`` is missing.
        """
        import json

        action_text = json.dumps(action)
        result_text = str(result)
        filled_prompt = self.reflection_prompt.format(
            goal=goal,
            action=action_text,
            result=result_text,
        )
        reply = await self.llm.complete(filled_prompt)
        verdict = json.loads(reply.content)
        return Reflection(
            action=action_text,
            result=result_text,
            assessment=verdict["assessment"],
            should_retry=verdict["should_retry"],
            improvement=verdict.get("improvement"),
        )
class SelfCorrectionAgent:
    """Agent that executes an action, reflects on it, and retries with fixes."""

    def __init__(
        self,
        llm_client: Any,
        tool_registry: Any,
        reflector: "Reflector",
        max_retries: int = 3
    ):
        self.llm = llm_client
        self.tools = tool_registry
        self.reflector = reflector
        self.max_retries = max_retries

    async def execute_with_correction(
        self,
        task: str,
        action: dict
    ) -> tuple[Any, list["Reflection"]]:
        """Run ``action``, reflecting after each attempt, for up to ``max_retries`` attempts.

        Stops early when the tool succeeded and the reflection does not ask
        for a retry. Otherwise the action is revised from the reflection's
        suggestion before the next attempt.

        Returns:
            ``(last_result, reflections)``. ``last_result`` is ``None`` when
            ``max_retries`` is below 1 and nothing was executed (previously
            this raised ``UnboundLocalError``).
        """
        reflections = []
        result = None
        final_attempt = self.max_retries - 1
        for attempt in range(self.max_retries):
            result = await self.tools.execute(
                action["tool"],
                **action.get("parameters", {})
            )
            reflection = await self.reflector.reflect(
                action=action,
                result=result,
                goal=task
            )
            reflections.append(reflection)
            if result.success and not reflection.should_retry:
                break
            # An improved action would never run after the last attempt,
            # so skip that LLM call.
            if reflection.improvement and attempt < final_attempt:
                action = await self._improve_action(
                    action,
                    reflection.improvement
                )
        return result, reflections

    async def _improve_action(
        self,
        action: dict,
        improvement: str
    ) -> dict:
        """Ask the LLM for a revised action.

        If the reply is not a JSON object naming a ``"tool"``, the current
        action is kept, so the next attempt does not fail on a missing key.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        import json

        prompt = f"""Improve this action based on feedback:
Current action: {json.dumps(action)}
Improvement needed: {improvement}
Output the improved action as JSON."""
        response = await self.llm.complete(prompt)
        improved = json.loads(response.content)
        if not isinstance(improved, dict) or "tool" not in improved:
            return action
        return improved
class CriticAgent:
    """LLM-backed reviewer that scores an output against a list of criteria."""

    def __init__(self, llm_client: Any):
        self.llm = llm_client

    async def critique(
        self,
        goal: str,
        output: str,
        criteria: list[str] = None
    ) -> dict:
        """Ask the LLM to critique ``output`` and return its parsed JSON reply.

        Args:
            goal: What the output was supposed to achieve.
            output: The text under review.
            criteria: Criteria to evaluate; when omitted or empty, defaults to
                Correctness, Completeness, Clarity and Efficiency.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        import json

        if not criteria:
            criteria = ["Correctness", "Completeness", "Clarity", "Efficiency"]
        bullet_list = "\n".join(f"- {criterion}" for criterion in criteria)
        prompt = f"""Critique this output:
Goal: {goal}
Output: {output}
Evaluate on these criteria:
{bullet_list}
For each criterion, provide:
- Score (1-5)
- Explanation
Also provide:
- Overall assessment
- Suggestions for improvement
Output as JSON."""
        reply = await self.llm.complete(prompt)
        return json.loads(reply.content)
class IterativeRefinementAgent:
    """Agent that drafts an output, has it critiqued, and refines it in a loop."""

    def __init__(
        self,
        llm_client: Any,
        critic: "CriticAgent",
        max_iterations: int = 3,
        quality_threshold: float = 4.0
    ):
        self.llm = llm_client
        self.critic = critic
        self.max_iterations = max_iterations
        # Average critique score at or above which refinement stops.
        self.quality_threshold = quality_threshold

    async def generate(self, goal: str) -> tuple[str, list[dict]]:
        """Generate an output for ``goal`` and refine it until it scores well enough.

        Returns:
            ``(output, iterations)`` where each iteration records its
            1-based number, the output that was critiqued, and the critique.
            If the loop ends by hitting ``max_iterations``, the returned
            output is the last refinement, which was not critiqued.
        """
        iterations = []
        output = await self._generate_initial(goal)
        for round_index in range(self.max_iterations):
            critique = await self.critic.critique(goal, output)
            iterations.append({
                "iteration": round_index + 1,
                "output": output,
                "critique": critique
            })
            if self._average_score(critique) >= self.quality_threshold:
                break
            output = await self._refine(goal, output, critique)
        return output, iterations

    async def _generate_initial(self, goal: str) -> str:
        """Produce the first draft by sending the goal to the LLM as the prompt."""
        response = await self.llm.complete(goal)
        return response.content

    async def _refine(
        self,
        goal: str,
        output: str,
        critique: dict
    ) -> str:
        """Ask the LLM for an improved version that addresses ``critique``."""
        import json

        prompt = f"""Refine this output based on critique:
Goal: {goal}
Current output: {output}
Critique: {json.dumps(critique)}
Provide an improved version that addresses the feedback."""
        response = await self.llm.complete(prompt)
        return response.content

    def _average_score(self, critique: dict) -> float:
        """Average the ``score`` of every top-level entry that carries one.

        Scores arrive in LLM-produced JSON, so they may be strings. Values
        that ``float()`` accepts are used and anything else is skipped,
        where previously a string score raised ``TypeError``. A critique that
        is not a dict, or has no usable scores, averages to ``0.0``.
        """
        if not isinstance(critique, dict):
            return 0.0
        scores = []
        for section in critique.values():
            if not (isinstance(section, dict) and "score" in section):
                continue
            try:
                scores.append(float(section["score"]))
            except (TypeError, ValueError):
                continue
        return sum(scores) / len(scores) if scores else 0.0
Memory and State Management
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from abc import ABC, abstractmethod
import json
@dataclass
class MemoryEntry:
    """One remembered item.

    Attributes:
        content: The remembered text.
        memory_type: Category label such as "observation", "action",
            "thought" or "fact".
        timestamp: Creation time; defaults to ``datetime.now()`` at construction.
        importance: Weight of the entry; defaults to 0.5.
        metadata: Free-form extra data; each entry gets its own dict.
    """

    content: str
    memory_type: str  # "observation", "action", "thought", "fact"
    timestamp: datetime = field(default_factory=datetime.now)
    importance: float = 0.5
    metadata: dict = field(default_factory=dict)
class AgentMemory(ABC):
    """Interface shared by the agent's memory stores."""

    @abstractmethod
    def add(self, entry: MemoryEntry):
        """Store ``entry``."""

    @abstractmethod
    def retrieve(self, query: str, k: int = 5) -> list[MemoryEntry]:
        """Return up to ``k`` entries relevant to ``query``."""

    @abstractmethod
    def get_recent(self, n: int = 10) -> list[MemoryEntry]:
        """Return the ``n`` most recent entries."""
class ShortTermMemory(AgentMemory):
    """Bounded working memory that keeps only the newest entries."""

    def __init__(self, max_size: int = 100):
        self.max_size = max_size
        self.entries: list[MemoryEntry] = []

    def add(self, entry: MemoryEntry):
        """Append ``entry``, dropping the oldest entries beyond ``max_size``."""
        self.entries.append(entry)
        if len(self.entries) > self.max_size:
            self.entries = self._tail(self.max_size)

    def retrieve(self, query: str, k: int = 5) -> list[MemoryEntry]:
        """Return the ``k`` newest entries; ``query`` is ignored by this simple store."""
        return self._tail(k)

    def get_recent(self, n: int = 10) -> list[MemoryEntry]:
        """Return the ``n`` newest entries, oldest first."""
        return self._tail(n)

    def clear(self):
        """Forget everything."""
        self.entries = []

    def _tail(self, count: int) -> list[MemoryEntry]:
        """Return the last ``count`` entries, or ``[]`` when ``count`` is not positive.

        ``entries[-0:]`` is the whole list, so a bare negative slice returned
        every entry when zero were requested.
        """
        if count <= 0:
            return []
        return self.entries[-count:]
class LongTermMemory(AgentMemory):
    """Memory store that retrieves entries through an embedding search.

    ``embedding_model`` must provide ``embed(text)``. ``vector_store`` must
    provide ``add(id=, embedding=, metadata=)`` and ``search(embedding, k=)``
    yielding hits that have an ``id`` attribute.
    """

    def __init__(self, embedding_model: Any, vector_store: Any):
        self.embedder = embedding_model
        self.store = vector_store
        # Entries by generated ID; the vector store holds only IDs and metadata.
        self.entries: dict[str, MemoryEntry] = {}

    def add(self, entry: MemoryEntry):
        """Store ``entry`` under a fresh UUID and index its embedding."""
        import uuid

        new_id = str(uuid.uuid4())
        self.entries[new_id] = entry

        vector = self.embedder.embed(entry.content)
        details = {
            "type": entry.memory_type,
            "importance": entry.importance,
            "timestamp": entry.timestamp.isoformat(),
        }
        self.store.add(id=new_id, embedding=vector, metadata=details)

    def retrieve(self, query: str, k: int = 5) -> list[MemoryEntry]:
        """Return the entries behind the store's top ``k`` hits for ``query``.

        Hits whose ID is not held by this memory are skipped.
        """
        hits = self.store.search(self.embedder.embed(query), k=k)
        matched = []
        for hit in hits:
            if hit.id in self.entries:
                matched.append(self.entries[hit.id])
        return matched

    def get_recent(self, n: int = 10) -> list[MemoryEntry]:
        """Return the ``n`` entries with the latest timestamps, newest first."""
        newest_first = list(self.entries.values())
        newest_first.sort(key=lambda item: item.timestamp, reverse=True)
        return newest_first[:n]
class EpisodicMemory:
    """Memory grouped into named episodes (sessions)."""

    def __init__(self):
        self.episodes: dict[str, list[MemoryEntry]] = {}
        self.current_episode: Optional[str] = None

    def start_episode(self, episode_id: str):
        """Make ``episode_id`` the current episode, starting it empty.

        Re-using an existing ID discards that episode's stored entries.
        """
        self.current_episode = episode_id
        self.episodes[episode_id] = []

    def add(self, entry: "MemoryEntry"):
        """Append ``entry`` to the current episode; ignored when no episode is active."""
        if self.current_episode:
            self.episodes[self.current_episode].append(entry)

    def get_episode(self, episode_id: str) -> "list[MemoryEntry]":
        """Return the entries of ``episode_id``, or ``[]`` if it does not exist."""
        return self.episodes.get(episode_id, [])

    def summarize_episode(self, episode_id: str, llm: Any) -> str:
        """Summarise an episode with a synchronous LLM client.

        Args:
            episode_id: Episode to summarise.
            llm: Client whose ``complete(prompt)`` returns an object with ``content``.

        Returns:
            The summary text, or ``""`` for an unknown or empty episode.

        Raises:
            TypeError: If ``llm.complete`` returns an awaitable (an async
                client). Use ``asummarize_episode`` for those. Previously
                this surfaced as an ``AttributeError`` on the coroutine.
        """
        import inspect

        entries = self.get_episode(episode_id)
        if not entries:
            return ""
        response = llm.complete(self._summary_prompt(entries))
        if inspect.isawaitable(response):
            # Close an un-awaited coroutine so it does not emit a warning.
            if hasattr(response, "close"):
                response.close()
            raise TypeError(
                "llm.complete() returned an awaitable; "
                "use 'await asummarize_episode(...)' with async clients"
            )
        return response.content

    async def asummarize_episode(self, episode_id: str, llm: Any) -> str:
        """Async variant of ``summarize_episode``; accepts sync or async clients."""
        import inspect

        entries = self.get_episode(episode_id)
        if not entries:
            return ""
        response = llm.complete(self._summary_prompt(entries))
        if inspect.isawaitable(response):
            response = await response
        return response.content

    @staticmethod
    def _summary_prompt(entries: list) -> str:
        """Build the summarisation prompt from the entries' contents."""
        content = "\n".join(e.content for e in entries)
        return f"""Summarize this agent session:
{content}
Provide a concise summary of:
1. What was accomplished
2. Key decisions made
3. Important information learned"""
class HierarchicalMemory:
    """Two-level memory: a short-term buffer backed by an optional long-term store.

    Entries are promoted to long-term memory either immediately (importance
    above 0.7) or during consolidation (importance above 0.5). Each entry is
    promoted at most once.
    """

    def __init__(
        self,
        short_term: "ShortTermMemory",
        long_term: "Optional[LongTermMemory]",
        consolidation_threshold: int = 50,
        retain_recent: int = 20
    ):
        """
        Args:
            short_term: Working memory; must expose ``entries``, ``add`` and ``retrieve``.
            long_term: Persistent store exposing ``add`` and ``retrieve``, or
                ``None`` to run with short-term memory only.
            consolidation_threshold: Short-term size that triggers consolidation.
            retain_recent: How many of the newest short-term entries survive
                a consolidation.
        """
        self.short_term = short_term
        self.long_term = long_term
        self.threshold = consolidation_threshold
        self.retain_recent = retain_recent
        # Promoted entries keyed by id(). Holding the entry keeps its id()
        # from being reused while it is tracked.
        self._promoted: dict = {}

    def add(self, entry: "MemoryEntry"):
        """Add ``entry`` to short-term memory, promoting and consolidating as needed."""
        self.short_term.add(entry)
        if entry.importance > 0.7:
            self._promote(entry)
        if len(self.short_term.entries) >= self.threshold:
            self._consolidate()

    def retrieve(self, query: str, k: int = 5) -> "list[MemoryEntry]":
        """Return up to ``k`` distinct entries from both stores, most important first.

        The budget is split between the stores (long-term gets the extra
        slot when ``k`` is odd). A store with a zero share is not queried.
        An entry present in both stores is returned once.
        """
        if k <= 0:
            return []

        candidates = []
        if self.long_term is None:
            candidates.extend(self.short_term.retrieve(query, k=k))
        else:
            short_share = k // 2
            long_share = k - short_share
            if short_share:
                candidates.extend(self.short_term.retrieve(query, k=short_share))
            candidates.extend(self.long_term.retrieve(query, k=long_share))

        # Deduplicate by identity: the same object can live in both stores.
        seen = set()
        unique = []
        for entry in candidates:
            if id(entry) not in seen:
                seen.add(id(entry))
                unique.append(entry)

        # Importance stands in for relevance.
        unique.sort(key=lambda e: e.importance, reverse=True)
        return unique[:k]

    def _promote(self, entry: "MemoryEntry"):
        """Copy ``entry`` into long-term memory unless it is already there."""
        if self.long_term is None or id(entry) in self._promoted:
            return
        self.long_term.add(entry)
        self._promoted[id(entry)] = entry

    def _consolidate(self):
        """Promote moderately important entries, then trim short-term memory."""
        for entry in list(self.short_term.entries):
            if entry.importance > 0.5:
                self._promote(entry)
        if self.retain_recent > 0:
            self.short_term.entries = self.short_term.entries[-self.retain_recent:]
        else:
            # A [-0:] slice would keep everything instead of nothing.
            self.short_term.entries = []
class AgentState:
    """Bundle of everything the agent tracks: memory, goal, plan and context."""

    def __init__(self):
        working_memory = ShortTermMemory()
        # None is a placeholder for the long-term store, which would be a
        # LongTermMemory built with an embedder.
        self.memory = HierarchicalMemory(working_memory, None)
        self.current_goal: str = None
        self.current_plan: Any = None
        self.context: dict = {}
        self.tool_results: list[dict] = []

    def update(self, key: str, value: Any):
        """Set ``key`` to ``value`` in the context."""
        self.context[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        """Return the context value for ``key``, or ``default`` when absent."""
        return self.context.get(key, default)

    def to_dict(self) -> dict:
        """Serialise the goal, the context and the 10 most recent short-term memories.

        Only each memory's content and type are included. The plan and
        tool results are not serialised.
        """
        recent = []
        for memory in self.memory.short_term.get_recent(10):
            recent.append({"content": memory.content, "type": memory.memory_type})
        return {
            "goal": self.current_goal,
            "context": self.context,
            "recent_memories": recent,
        }

    def from_dict(self, data: dict):
        """Restore the goal and context from ``data``; memories are not restored."""
        self.current_goal = data.get("goal")
        self.context = data.get("context", {})
Multi-Agent Coordination
from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
from enum import Enum
import asyncio
class AgentRole(Enum):
    """Roles an agent can take in the multi-agent system; values are lowercase names."""

    COORDINATOR = "coordinator"
    WORKER = "worker"
    CRITIC = "critic"
    SPECIALIST = "specialist"
@dataclass
class AgentMessage:
    """Envelope for communication between agents.

    Attributes:
        sender: Name of the sending agent.
        recipient: Name of the receiving agent.
        content: Message body.
        message_type: "request" (default), "response" or "broadcast".
        metadata: Free-form extra data; each message gets its own dict.
    """

    sender: str
    recipient: str
    content: str
    message_type: str = "request"  # request, response, broadcast
    metadata: dict = field(default_factory=dict)
class Agent(ABC):
    """Base class for agents: a name, a role and an inbox queue of messages."""

    def __init__(self, name: str, role: AgentRole):
        self.name, self.role = name, role
        self.inbox: asyncio.Queue = asyncio.Queue()

    @abstractmethod
    async def process(self, message: AgentMessage) -> Optional[AgentMessage]:
        """Handle ``message``; return a reply message, or ``None`` for no reply."""

    async def send(self, message: AgentMessage, router: 'MessageRouter'):
        """Hand ``message`` to ``router`` for delivery."""
        await router.route(message)

    async def receive(self) -> AgentMessage:
        """Wait for the next message in the inbox and return it."""
        incoming = await self.inbox.get()
        return incoming
class MessageRouter:
    """Delivers messages to the inboxes of registered agents."""

    def __init__(self):
        self.agents: dict = {}

    def register(self, agent: "Agent"):
        """Register ``agent`` under its name, replacing any agent with that name."""
        self.agents[agent.name] = agent

    async def route(self, message: "AgentMessage") -> int:
        """Deliver ``message`` and report how many inboxes received it.

        A recipient of ``"broadcast"`` goes to every registered agent except
        the sender. Any other recipient must be a registered agent name.

        Returns:
            The number of inboxes the message was put into. ``0`` means it
            was dropped (unknown recipient, or a broadcast with no other
            agents), so callers waiting for a reply can detect the drop.
        """
        if message.recipient == "broadcast":
            targets = [
                agent
                for name, agent in self.agents.items()
                if name != message.sender
            ]
        else:
            target = self.agents.get(message.recipient)
            targets = [] if target is None else [target]

        for agent in targets:
            await agent.inbox.put(message)
        return len(targets)
class CoordinatorAgent(Agent):
    """Agent that splits a task into subtasks and hands them to workers."""

    def __init__(self, name: str, llm_client: Any):
        super().__init__(name, AgentRole.COORDINATOR)
        self.llm = llm_client
        self.worker_agents: list[str] = []
        # Maps each subtask text to the worker it was sent to.
        self.task_assignments: dict[str, str] = {}

    def add_worker(self, worker_name: str):
        """Make ``worker_name`` available for delegation."""
        self.worker_agents.append(worker_name)

    async def delegate(
        self,
        task: str,
        router: MessageRouter,
        timeout: Optional[float] = None
    ) -> dict[str, str]:
        """Split ``task`` into subtasks, send them out round-robin, and gather replies.

        Replies are read from this agent's inbox, one per subtask sent, and
        keyed by the ``"task"`` entry of each reply's metadata.

        Args:
            task: The task to split up.
            router: Router used to deliver the requests.
            timeout: Seconds to wait for each reply; ``None`` (the default)
                waits indefinitely, as before.

        Raises:
            ValueError: If there are subtasks but no workers (previously a
                ``ZeroDivisionError``), or the decomposition is not a list.
            asyncio.TimeoutError: If a reply does not arrive within ``timeout``.
        """
        subtasks = await self._decompose_task(task)
        if subtasks and not self.worker_agents:
            raise ValueError("No worker agents registered; call add_worker() first")

        worker_count = len(self.worker_agents)
        for position, subtask in enumerate(subtasks):
            worker = self.worker_agents[position % worker_count]
            request = AgentMessage(
                sender=self.name,
                recipient=worker,
                content=subtask,
                message_type="request"
            )
            await router.route(request)
            self.task_assignments[subtask] = worker

        results = {}
        for _ in range(len(subtasks)):
            response = await asyncio.wait_for(self.receive(), timeout)
            results[response.metadata.get("task")] = response.content
        return results

    async def _decompose_task(self, task: str) -> list[str]:
        """Ask the LLM for a JSON list of subtask descriptions.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
            ValueError: If the reply is valid JSON but not a list. Iterating
                a dict reply would silently send its keys out as subtasks.
        """
        import json

        prompt = f"""Decompose this task into subtasks for parallel execution:
Task: {task}
Output as JSON list of subtask descriptions."""
        response = await self.llm.complete(prompt)
        subtasks = json.loads(response.content)
        if not isinstance(subtasks, list):
            raise ValueError(
                f"Expected a JSON list of subtasks, got {type(subtasks).__name__}"
            )
        return subtasks

    async def process(self, message: AgentMessage) -> Optional[AgentMessage]:
        """Accept a message without replying; every message type yields ``None``."""
        if message.message_type == "response":
            # Store result
            return None
        return None
class WorkerAgent(Agent):
    """Agent that carries out task requests by prompting its LLM."""

    def __init__(self, name: str, llm_client: Any, specialty: str = None):
        super().__init__(name, AgentRole.WORKER)
        self.llm = llm_client
        self.specialty = specialty

    async def process(self, message: AgentMessage) -> Optional[AgentMessage]:
        """Execute a request and build the reply for its sender.

        Returns:
            A "response" message addressed to the original sender, with the
            request text under ``metadata["task"]``; ``None`` for any other
            message type.
        """
        if message.message_type != "request":
            return None
        outcome = await self._execute_task(message.content)
        return AgentMessage(
            sender=self.name,
            recipient=message.sender,
            content=outcome,
            message_type="response",
            metadata={"task": message.content},
        )

    async def _execute_task(self, task: str) -> str:
        """Send ``task`` to the LLM and return the reply text.

        The prompt carries a ``Specialty:`` line only when a specialty is set.
        """
        specialty_line = "Specialty: " + self.specialty if self.specialty else ""
        prompt = f"""Execute this task:
Task: {task}
{specialty_line}
Provide the result."""
        reply = await self.llm.complete(prompt)
        return reply.content
class DebateSystem:
    """Structured debate: agents argue over several rounds, a moderator concludes.

    Every participant, the moderator included, must expose ``name`` and an
    ``llm`` whose ``complete(prompt)`` is awaitable.
    """

    def __init__(
        self,
        agents: list[Agent],
        moderator: Agent,
        rounds: int = 3
    ):
        self.agents = agents
        self.moderator = moderator
        self.rounds = rounds

    async def debate(self, topic: str) -> str:
        """Run the debate on ``topic`` and return the moderator's synthesis."""
        # Opening statements.
        stances = {}
        opening_prompt = f"Take a position on: {topic}\n\nProvide your argument."
        for debater in self.agents:
            opening = await debater.llm.complete(opening_prompt)
            stances[debater.name] = opening.content

        # In each round every debater sees the others' stances from the
        # previous round, not the ones being written in this round.
        for round_number in range(1, self.rounds + 1):
            revised = {}
            for debater in self.agents:
                rivals = {
                    name: stance
                    for name, stance in stances.items()
                    if name != debater.name
                }
                rebuttal_prompt = f"""Topic: {topic}
Your current position: {stances[debater.name]}
Other positions:
{self._format_positions(rivals)}
Round {round_number}: Respond to other arguments and refine your position."""
                reply = await debater.llm.complete(rebuttal_prompt)
                revised[debater.name] = reply.content
            stances = revised

        closing_prompt = f"""Synthesize the debate on: {topic}
Final positions:
{self._format_positions(stances)}
Provide a balanced conclusion."""
        verdict = await self.moderator.llm.complete(closing_prompt)
        return verdict.content

    def _format_positions(self, positions: dict[str, str]) -> str:
        """Render positions as ``name:`` / text blocks separated by blank lines."""
        blocks = [f"{name}:\n{pos}" for name, pos in positions.items()]
        return "\n\n".join(blocks)
class ConsensusSystem:
    """Asks several agents the same question and measures how much they agree.

    Every agent must expose ``name`` and an ``llm`` whose ``complete(prompt)``
    is awaitable.
    """

    def __init__(self, agents: "list[Agent]", threshold: float = 0.8):
        self.agents = agents
        # Agreement level (0..1) at which the first round counts as consensus.
        self.threshold = threshold

    async def reach_consensus(self, question: str) -> tuple[str, float]:
        """Collect answers, run one deliberation round if agreement is too low.

        Returns:
            ``(answer, agreement)``: the synthesised answer and the final
            agreement level. With no agents this is ``("", 1.0)``.
        """
        answers = {}
        for agent in self.agents:
            response = await agent.llm.complete(question)
            answers[agent.name] = response.content

        agreement = await self._calculate_agreement(answers)
        if agreement >= self.threshold:
            return self._synthesize_answer(answers), agreement

        # Answers are updated in place, so later agents in this round already
        # see the revised answers of earlier ones.
        for agent in self.agents:
            prompt = f"""Question: {question}
Your answer: {answers[agent.name]}
Other answers:
{self._format_answers(answers, exclude=agent.name)}
Agreement level: {agreement:.0%}
Revise your answer considering others' perspectives."""
            response = await agent.llm.complete(prompt)
            answers[agent.name] = response.content

        agreement = await self._calculate_agreement(answers)
        return self._synthesize_answer(answers), agreement

    async def _calculate_agreement(self, answers: dict[str, str]) -> float:
        """Score agreement as shared words over all words across the answers.

        Words are lowercased and whitespace-separated. Fewer than two answers
        count as full agreement (``1.0``); answers with no words give ``0.0``.
        """
        if len(answers) < 2:
            return 1.0
        word_sets = [set(text.lower().split()) for text in answers.values()]
        shared = set.intersection(*word_sets)
        combined = set.union(*word_sets)
        return len(shared) / len(combined) if combined else 0.0

    def _synthesize_answer(self, answers: dict[str, str]) -> str:
        """Return the most frequent answer, or ``""`` when there are none.

        Ties, including the case where all answers differ, go to the answer
        that appears first. The previous code always returned the first
        answer and raised ``IndexError`` on an empty mapping.
        """
        candidates = list(answers.values())
        if not candidates:
            return ""
        # max() keeps the first of equally frequent answers.
        return max(candidates, key=candidates.count)

    def _format_answers(
        self,
        answers: dict[str, str],
        exclude: str = None
    ) -> str:
        """Render answers as ``name: text`` blocks, leaving out ``exclude``."""
        return "\n\n".join(
            f"{name}: {ans}"
            for name, ans in answers.items()
            if name != exclude
        )
Production Agentic Service
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import uuid
# FastAPI application object; the @app.get / @app.post decorators in this
# file register their handlers on it.
app = FastAPI()
class TaskRequest(BaseModel):
    """Request body for creating an agent task."""

    # What the agent should accomplish.
    goal: str
    # Upper bound on agent iterations; defaults to 10.
    max_iterations: int = 10
    # Defaults to True.
    use_reflection: bool = True
class TaskStatus(BaseModel):
    """Status record of one submitted agent task, as returned by the API.

    NOTE(review): this rebinds the name ``TaskStatus``, which earlier in this
    file is the planning ``Enum``. If both really live in one module, code
    that uses the enum after this point will see this model instead. Confirm
    whether the sections are meant to be separate modules.
    """

    task_id: str
    # Lifecycle label, stored as a plain string.
    status: str
    # Fraction of the work done so far.
    progress: float
    result: Optional[str] = None
    error: Optional[str] = None
# Task storage: module-level dict of status records keyed by task ID.
# It lives only in this process's memory.
tasks: dict[str, TaskStatus] = {}
# Mock components
class MockLLM:
    """Stand-in LLM client that echoes the start of the prompt after a short delay."""

    async def complete(self, prompt: str):
        """Return an object whose ``content`` echoes the first 100 characters of ``prompt``."""
        echoed = f"Response to: {prompt[:100]}"

        class Response:
            content = echoed

        # Simulated latency.
        await asyncio.sleep(0.1)
        return Response()
# Module-level mock client instance.
# NOTE(review): nothing in the visible service code reads this — confirm it is needed.
llm = MockLLM()
async def run_agent_task(task_id: str, goal: str, max_iterations: int):
    """Run the (simulated) agent for one task in the background.

    Updates the task's record in ``tasks`` as it goes. A status of
    ``"cancelled"`` is honoured: the run stops at the next check and never
    overwrites it (previously cancellation was ignored and replaced by
    ``"completed"``). On success, progress is set to 1.0.

    Args:
        task_id: Key of the task's record in ``tasks``.
        goal: Goal text echoed into the result.
        max_iterations: Upper bound on simulated iterations.
    """
    record = tasks.get(task_id)
    if record is None:
        # Nothing to update; the task was never stored.
        return
    try:
        if record.status == "cancelled":
            return
        record.status = "running"
        # Simulate agent execution
        for i in range(max_iterations):
            if record.status == "cancelled":
                return
            record.progress = (i + 1) / max_iterations
            await asyncio.sleep(0.5)
            # Check for completion
            if i >= 3:  # Simulate early completion
                break
        # The task may have been cancelled during the last sleep.
        if record.status == "cancelled":
            return
        record.status = "completed"
        record.progress = 1.0
        record.result = f"Completed goal: {goal}"
    except Exception as e:
        # Top-level boundary of the background job: surface the failure on the record.
        record.status = "failed"
        record.error = str(e)
@app.post("/v1/tasks")
async def create_task(
    request: TaskRequest,
    background_tasks: BackgroundTasks
) -> dict:
    """Register a new task as pending and schedule it to run in the background.

    Returns:
        The generated task ID together with the initial ``"pending"`` status.
    """
    new_id = str(uuid.uuid4())
    initial_status = "pending"
    tasks[new_id] = TaskStatus(task_id=new_id, status=initial_status, progress=0.0)
    background_tasks.add_task(
        run_agent_task,
        new_id,
        request.goal,
        request.max_iterations,
    )
    return {"task_id": new_id, "status": initial_status}
@app.get("/v1/tasks/{task_id}")
async def get_task(task_id: str) -> TaskStatus:
    """Return the status record of ``task_id``; responds 404 if it is unknown."""
    record = tasks.get(task_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return record
@app.post("/v1/tasks/{task_id}/cancel")
async def cancel_task(task_id: str) -> dict:
    """Mark a task as cancelled unless it has already finished.

    A task that is already ``"completed"`` or ``"failed"`` keeps its status.
    Previously its outcome was overwritten with ``"cancelled"``.

    Returns:
        The task ID and the status the task has after the call.

    Raises:
        HTTPException: 404 if the task ID is unknown.
    """
    record = tasks.get(task_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Task not found")
    if record.status not in ("completed", "failed"):
        record.status = "cancelled"
    return {"task_id": task_id, "status": record.status}
@app.get("/v1/tasks")
async def list_tasks(status: Optional[str] = None) -> list[TaskStatus]:
    """Return all task records, or only those whose status equals ``status``.

    An omitted or empty ``status`` disables the filter.
    """
    all_records = list(tasks.values())
    if not status:
        return all_records
    return [record for record in all_records if record.status == status]
@app.get("/health")
async def health():
    """Health-check endpoint; always reports a healthy status."""
    payload = {"status": "healthy"}
    return payload
References
- ReAct Pattern: https://arxiv.org/abs/2210.03629
- LangChain Agents: https://python.langchain.com/docs/modules/agents/
- AutoGPT: https://github.com/Significant-Gravitas/AutoGPT
- CrewAI: https://github.com/joaomdmoura/crewAI
Conclusion
Agentic workflows transform LLMs from passive responders into active problem solvers. The key patterns are planning (breaking goals into executable steps), tool use (extending capabilities beyond text generation), reflection (learning from actions and self-correcting), memory (maintaining context across interactions), and multi-agent coordination (leveraging specialized agents for complex tasks). Start simple with the ReAct pattern—it’s surprisingly effective for many use cases. Add reflection when you need reliability; agents that can recognize and correct their mistakes are far more robust than those that can’t. Implement hierarchical memory when conversations span multiple sessions or when agents need to learn from past experiences. Consider multi-agent systems for complex tasks that benefit from specialized expertise or diverse perspectives. The most important insight is that agentic systems require different evaluation criteria than traditional LLM applications—you’re measuring goal completion, not just response quality. Build in observability from the start, log every action and decision, and design for graceful failure. The future of AI applications is agentic, and these patterns provide the foundation for building systems that can truly act on behalf of users.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.