Agentic Workflow Patterns: Building Autonomous AI Systems That Plan, Act, and Learn

Introduction

Agentic workflows represent a paradigm shift from simple prompt-response patterns to autonomous, goal-directed AI systems. Unlike traditional LLM applications where the model responds once and stops, agentic systems can plan multi-step solutions, execute actions, observe results, and iterate until the goal is achieved. This guide covers the core patterns that make agentic systems work: planning and decomposition, tool use and action execution, reflection and self-correction, memory and state management, and multi-agent coordination.

Whether you’re building a code assistant that can debug its own errors, a research agent that can gather and synthesize information, or an automation system that can handle complex workflows, these patterns provide the foundation for building reliable, capable AI agents.

Figure: The agentic workflow loop of planning, execution, and reflection.

Planning and Task Decomposition

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
from enum import Enum

class TaskStatus(Enum):
    """Task execution status."""
    
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"

@dataclass
class Task:
    """A task in the plan."""
    
    id: str
    description: str
    status: TaskStatus = TaskStatus.PENDING
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    error: Optional[str] = None
    subtasks: list['Task'] = field(default_factory=list)

@dataclass
class Plan:
    """Execution plan with tasks."""
    
    goal: str
    tasks: list[Task] = field(default_factory=list)
    current_task_idx: int = 0
    
    def get_next_task(self) -> Optional[Task]:
        """Get next executable task."""
        
        for task in self.tasks:
            if task.status == TaskStatus.PENDING:
                # Check dependencies; unknown IDs count as unmet
                deps_met = all(
                    (dep_task := self._get_task(dep)) is not None
                    and dep_task.status == TaskStatus.COMPLETED
                    for dep in task.dependencies
                )
                if deps_met:
                    return task
        return None
    
    def _get_task(self, task_id: str) -> Optional[Task]:
        """Get task by ID."""
        
        for task in self.tasks:
            if task.id == task_id:
                return task
        return None
    
    def is_complete(self) -> bool:
        """Check if plan is complete."""
        
        return all(
            t.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]
            for t in self.tasks
        )
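
The dependency check makes execution order emerge naturally: a task becomes eligible only once everything it depends on has completed. A minimal sketch of draining a plan (execution itself is stubbed out):

plan = Plan(
    goal="Ship a bugfix",
    tasks=[
        Task(id="t1", description="Reproduce the bug"),
        Task(id="t2", description="Write a failing test", dependencies=["t1"]),
        Task(id="t3", description="Fix and verify", dependencies=["t2"]),
    ],
)

while not plan.is_complete():
    task = plan.get_next_task()
    if task is None:
        break  # remaining tasks are blocked or have failed dependencies
    task.status = TaskStatus.IN_PROGRESS
    # ... real work would happen here ...
    task.status = TaskStatus.COMPLETED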

class Planner(ABC):
    """Abstract planner interface."""
    
    @abstractmethod
    async def create_plan(self, goal: str, context: Optional[dict] = None) -> Plan:
        """Create execution plan for goal."""
        pass
    
    @abstractmethod
    async def replan(self, plan: Plan, error: str) -> Plan:
        """Replan after failure."""
        pass

class LLMPlanner(Planner):
    """LLM-based planner."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
        
        self.planning_prompt = """You are a task planning agent. Given a goal, break it down into concrete, actionable tasks.

Goal: {goal}

Context: {context}

Create a plan with the following format for each task:
- Task ID (t1, t2, etc.)
- Description (what needs to be done)
- Dependencies (which tasks must complete first)

Output as JSON:
{{
    "tasks": [
        {{"id": "t1", "description": "...", "dependencies": []}},
        {{"id": "t2", "description": "...", "dependencies": ["t1"]}}
    ]
}}"""
    
    async def create_plan(self, goal: str, context: Optional[dict] = None) -> Plan:
        """Create plan using LLM."""
        
        import json
        
        prompt = self.planning_prompt.format(
            goal=goal,
            context=json.dumps(context or {})
        )
        
        response = await self.llm.complete(prompt)
        
        # Parse response
        plan_data = json.loads(response.content)
        
        tasks = [
            Task(
                id=t["id"],
                description=t["description"],
                dependencies=t.get("dependencies", [])
            )
            for t in plan_data["tasks"]
        ]
        
        return Plan(goal=goal, tasks=tasks)
    
    async def replan(self, plan: Plan, error: str) -> Plan:
        """Replan after failure."""
        
        replan_prompt = f"""The following plan failed:

Goal: {plan.goal}

Failed task: {self._get_failed_task(plan)}
Error: {error}

Create a revised plan that addresses this failure.
Output as JSON with the same format."""
        
        response = await self.llm.complete(replan_prompt)
        
        import json
        plan_data = json.loads(response.content)
        
        tasks = [
            Task(
                id=t["id"],
                description=t["description"],
                dependencies=t.get("dependencies", [])
            )
            for t in plan_data["tasks"]
        ]
        
        return Plan(goal=plan.goal, tasks=tasks)
    
    def _get_failed_task(self, plan: Plan) -> str:
        """Get description of failed task."""
        
        for task in plan.tasks:
            if task.status == TaskStatus.FAILED:
                return f"{task.id}: {task.description}"
        return "Unknown"

class HierarchicalPlanner(Planner):
    """Planner that creates hierarchical task structures."""
    
    def __init__(self, llm_client: Any, max_depth: int = 3):
        self.llm = llm_client
        self.max_depth = max_depth
    
    async def create_plan(self, goal: str, context: Optional[dict] = None) -> Plan:
        """Create hierarchical plan."""
        
        # Create high-level plan
        high_level = await self._plan_level(goal, context, depth=0)
        
        # Decompose each task
        for task in high_level.tasks:
            if self._needs_decomposition(task):
                subtasks = await self._decompose_task(task, depth=1)
                task.subtasks = subtasks
        
        return high_level
    
    async def _plan_level(
        self,
        goal: str,
        context: dict,
        depth: int
    ) -> Plan:
        """Plan at a specific level."""
        
        prompt = f"""Create a plan for: {goal}
        
Level: {"high-level" if depth == 0 else "detailed"}
Max tasks: {5 if depth == 0 else 3}

Output JSON with tasks."""
        
        response = await self.llm.complete(prompt)
        
        import json
        data = json.loads(response.content)
        
        return Plan(
            goal=goal,
            tasks=[
                Task(id=t["id"], description=t["description"])
                for t in data["tasks"]
            ]
        )
    
    def _needs_decomposition(self, task: Task) -> bool:
        """Check if task needs further decomposition."""
        
        complex_indicators = [
            "implement", "build", "create", "develop",
            "analyze", "research", "design"
        ]
        
        return any(
            ind in task.description.lower()
            for ind in complex_indicators
        )
    
    async def _decompose_task(self, task: Task, depth: int) -> list[Task]:
        """Decompose task into subtasks."""
        
        if depth >= self.max_depth:
            return []
        
        prompt = f"""Break down this task into smaller steps:
Task: {task.description}

Output 2-4 subtasks as JSON: {{"subtasks": [{{"description": "..."}}]}}"""
        
        response = await self.llm.complete(prompt)
        
        import json
        data = json.loads(response.content)
        
        subtasks = [
            Task(
                id=f"{task.id}.{i+1}",
                description=st["description"]
            )
            for i, st in enumerate(data.get("subtasks", []))
        ]
        
        # Recursively decompose
        for subtask in subtasks:
            if self._needs_decomposition(subtask):
                subtask.subtasks = await self._decompose_task(subtask, depth + 1)
        
        return subtasks
    
    async def replan(self, plan: Plan, error: str) -> Plan:
        """Replan with error context."""
        
        return await self.create_plan(
            plan.goal,
            {"previous_error": error}
        )
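
Wiring the planner end to end only requires a client object with an async complete() method returning something with a .content string of JSON; everything else is plumbing. A minimal sketch with a scripted stand-in client (MockPlannerLLM is illustrative, not a real SDK):

import asyncio
import json
from types import SimpleNamespace

class MockPlannerLLM:
    """Stand-in client that always returns a fixed two-task plan."""
    
    async def complete(self, prompt: str):
        return SimpleNamespace(content=json.dumps({
            "tasks": [
                {"id": "t1", "description": "Gather requirements", "dependencies": []},
                {"id": "t2", "description": "Draft the design doc", "dependencies": ["t1"]},
            ]
        }))

async def main():
    planner = LLMPlanner(MockPlannerLLM())
    plan = await planner.create_plan("Write a design doc")
    while (task := plan.get_next_task()) is not None:
        print(f"Executing {task.id}: {task.description}")
        task.status = TaskStatus.COMPLETED  # real execution would go here

asyncio.run(main())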

Tool Use and Action Execution

from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from abc import ABC, abstractmethod
import json

@dataclass
class ToolResult:
    """Result from tool execution."""
    
    success: bool
    output: Any = None
    error: Optional[str] = None

@dataclass
class Tool:
    """A tool the agent can use."""
    
    name: str
    description: str
    parameters: dict  # JSON Schema
    function: Callable
    
    def to_schema(self) -> dict:
        """Convert to function calling schema."""
        
        return {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters
        }

class ToolRegistry:
    """Registry of available tools."""
    
    def __init__(self):
        self.tools: dict[str, Tool] = {}
    
    def register(self, tool: Tool):
        """Register a tool."""
        self.tools[tool.name] = tool
    
    def get(self, name: str) -> Optional[Tool]:
        """Get tool by name."""
        return self.tools.get(name)
    
    def list_tools(self) -> list[dict]:
        """List all tools as schemas."""
        return [t.to_schema() for t in self.tools.values()]
    
    async def execute(self, name: str, **kwargs) -> ToolResult:
        """Execute a tool."""
        
        tool = self.get(name)
        if not tool:
            return ToolResult(success=False, error=f"Tool not found: {name}")
        
        try:
            result = await tool.function(**kwargs)
            return ToolResult(success=True, output=result)
        except Exception as e:
            return ToolResult(success=False, error=str(e))
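
Registering and invoking a tool looks like this (note that the registry awaits the tool, so tool functions are assumed to be async; the JSON Schema follows standard function-calling conventions):

import asyncio

async def add_numbers(a: float, b: float) -> float:
    """Toy tool: add two numbers."""
    return a + b

registry = ToolRegistry()
registry.register(Tool(
    name="add_numbers",
    description="Add two numbers and return the sum.",
    parameters={
        "type": "object",
        "properties": {
            "a": {"type": "number"},
            "b": {"type": "number"},
        },
        "required": ["a", "b"],
    },
    function=add_numbers,
))

result = asyncio.run(registry.execute("add_numbers", a=2, b=3))
print(result.success, result.output)  # True 5.0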

class ActionExecutor:
    """Execute actions from agent decisions."""
    
    def __init__(self, tool_registry: ToolRegistry):
        self.tools = tool_registry
        self.action_history: list[dict] = []
    
    async def execute(self, action: dict) -> ToolResult:
        """Execute an action."""
        
        tool_name = action.get("tool")
        parameters = action.get("parameters", {})
        
        # Record action
        self.action_history.append({
            "tool": tool_name,
            "parameters": parameters,
            "timestamp": self._now()
        })
        
        # Execute
        result = await self.tools.execute(tool_name, **parameters)
        
        # Record result
        self.action_history[-1]["result"] = {
            "success": result.success,
            "output": str(result.output)[:500] if result.output else None,
            "error": result.error
        }
        
        return result
    
    def _now(self) -> str:
        """Get current timestamp."""
        from datetime import datetime
        return datetime.now().isoformat()
    
    def get_history(self) -> list[dict]:
        """Get action history."""
        return self.action_history

class ReActAgent:
    """Reasoning and Acting agent pattern."""
    
    def __init__(
        self,
        llm_client: Any,
        tool_registry: ToolRegistry,
        max_iterations: int = 10
    ):
        self.llm = llm_client
        self.tools = tool_registry
        self.executor = ActionExecutor(tool_registry)
        self.max_iterations = max_iterations
        
        self.react_prompt = """You are an AI agent that reasons and acts to accomplish goals.

Available tools:
{tools}

Use this format:
Thought: [your reasoning about what to do next]
Action: [tool name]
Action Input: [JSON parameters]

After receiving an observation, continue with another Thought/Action or provide:
Thought: [final reasoning]
Final Answer: [your response to the user]

Goal: {goal}

{history}"""
    
    async def run(self, goal: str) -> str:
        """Run agent to accomplish goal."""
        
        history = []
        
        for i in range(self.max_iterations):
            # Get next action
            prompt = self.react_prompt.format(
                tools=json.dumps(self.tools.list_tools(), indent=2),
                goal=goal,
                history="\n".join(history)
            )
            
            response = await self.llm.complete(prompt)
            content = response.content
            
            # Check for final answer
            if "Final Answer:" in content:
                return self._extract_final_answer(content)
            
            # Parse and execute action
            action = self._parse_action(content)
            
            if action:
                result = await self.executor.execute(action)
                
                observation = f"Observation: {result.output if result.success else result.error}"
                history.append(content)
                history.append(observation)
            else:
                history.append(content)
                history.append("Observation: Could not parse action. Please use the correct format.")
        
        return "Max iterations reached without completing goal."
    
    def _parse_action(self, content: str) -> Optional[dict]:
        """Parse action from LLM response."""
        
        import re
        
        action_match = re.search(r"Action:\s*(\w+)", content)
        # Non-greedy match handles flat JSON arguments only; nested objects would need a real parser
        input_match = re.search(r"Action Input:\s*({.*?})", content, re.DOTALL)
        
        if action_match and input_match:
            try:
                return {
                    "tool": action_match.group(1),
                    "parameters": json.loads(input_match.group(1))
                }
            except json.JSONDecodeError:
                return None
        
        return None
    
    def _extract_final_answer(self, content: str) -> str:
        """Extract final answer from response."""
        
        import re
        match = re.search(r"Final Answer:\s*(.*)", content, re.DOTALL)
        return match.group(1).strip() if match else content
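
The loop can be exercised offline with a client that replays canned Thought/Action turns. This is a test-harness sketch, not a production setup: ScriptedLLM is a stand-in, and registry is the one built above:

import asyncio
from types import SimpleNamespace

class ScriptedLLM:
    """Replays canned ReAct turns so the loop runs without a real model."""
    
    def __init__(self, turns: list[str]):
        self.turns = iter(turns)
    
    async def complete(self, prompt: str):
        return SimpleNamespace(content=next(self.turns))

llm = ScriptedLLM([
    'Thought: I should add the numbers.\nAction: add_numbers\nAction Input: {"a": 2, "b": 3}',
    "Thought: I have the sum.\nFinal Answer: The sum is 5.",
])

agent = ReActAgent(llm, registry)
print(asyncio.run(agent.run("What is 2 + 3?")))  # The sum is 5.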

class FunctionCallingAgent:
    """Agent using native function calling."""
    
    def __init__(
        self,
        llm_client: Any,
        tool_registry: ToolRegistry,
        max_iterations: int = 10
    ):
        self.llm = llm_client
        self.tools = tool_registry
        self.executor = ActionExecutor(tool_registry)
        self.max_iterations = max_iterations
    
    async def run(self, goal: str) -> str:
        """Run agent with function calling."""
        
        messages = [
            {"role": "system", "content": "You are a helpful assistant that uses tools to accomplish goals."},
            {"role": "user", "content": goal}
        ]
        
        for i in range(self.max_iterations):
            response = await self.llm.complete_with_functions(
                messages=messages,
                functions=self.tools.list_tools()
            )
            
            if response.function_call:
                # Execute function
                result = await self.executor.execute({
                    "tool": response.function_call.name,
                    "parameters": json.loads(response.function_call.arguments)
                })
                
                # Add to messages
                messages.append({
                    "role": "assistant",
                    "function_call": {
                        "name": response.function_call.name,
                        "arguments": response.function_call.arguments
                    }
                })
                messages.append({
                    "role": "function",
                    "name": response.function_call.name,
                    "content": json.dumps(result.output if result.success else {"error": result.error})
                })
            else:
                # No function call - return response
                return response.content
        
        return "Max iterations reached."

Reflection and Self-Correction

from dataclasses import dataclass
from typing import Any, Optional
from abc import ABC, abstractmethod

@dataclass
class Reflection:
    """Reflection on agent action."""
    
    action: str
    result: str
    assessment: str
    should_retry: bool
    improvement: Optional[str] = None

class Reflector(ABC):
    """Abstract reflector interface."""
    
    @abstractmethod
    async def reflect(
        self,
        action: dict,
        result: Any,
        goal: str
    ) -> Reflection:
        """Reflect on action and result."""
        pass

class LLMReflector(Reflector):
    """LLM-based reflection."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
        
        self.reflection_prompt = """Reflect on this agent action:

Goal: {goal}
Action: {action}
Result: {result}

Assess:
1. Did this action make progress toward the goal?
2. Were there any errors or issues?
3. Should the agent retry with a different approach?
4. What improvement could be made?

Output JSON:
{{
    "assessment": "...",
    "made_progress": true/false,
    "should_retry": true/false,
    "improvement": "..."
}}"""
    
    async def reflect(
        self,
        action: dict,
        result: Any,
        goal: str
    ) -> Reflection:
        """Reflect using LLM."""
        
        import json
        
        prompt = self.reflection_prompt.format(
            goal=goal,
            action=json.dumps(action),
            result=str(result)
        )
        
        response = await self.llm.complete(prompt)
        data = json.loads(response.content)
        
        return Reflection(
            action=json.dumps(action),
            result=str(result),
            assessment=data["assessment"],
            should_retry=data["should_retry"],
            improvement=data.get("improvement")
        )

class SelfCorrectionAgent:
    """Agent with self-correction capability."""
    
    def __init__(
        self,
        llm_client: Any,
        tool_registry: Any,
        reflector: Reflector,
        max_retries: int = 3
    ):
        self.llm = llm_client
        self.tools = tool_registry
        self.reflector = reflector
        self.max_retries = max_retries
    
    async def execute_with_correction(
        self,
        task: str,
        action: dict
    ) -> tuple[Any, list[Reflection]]:
        """Execute action with self-correction."""
        
        reflections = []
        
        for attempt in range(self.max_retries):
            # Execute action
            result = await self.tools.execute(
                action["tool"],
                **action.get("parameters", {})
            )
            
            # Reflect
            reflection = await self.reflector.reflect(
                action=action,
                result=result,
                goal=task
            )
            reflections.append(reflection)
            
            if result.success and not reflection.should_retry:
                return result, reflections
            
            # Correct action based on reflection
            if reflection.improvement:
                action = await self._improve_action(
                    action,
                    reflection.improvement
                )
        
        return result, reflections
    
    async def _improve_action(
        self,
        action: dict,
        improvement: str
    ) -> dict:
        """Improve action based on reflection."""
        
        import json
        
        prompt = f"""Improve this action based on feedback:

Current action: {json.dumps(action)}
Improvement needed: {improvement}

Output the improved action as JSON."""
        
        response = await self.llm.complete(prompt)
        return json.loads(response.content)

class CriticAgent:
    """Critic that evaluates agent outputs."""
    
    def __init__(self, llm_client: Any):
        self.llm = llm_client
    
    async def critique(
        self,
        goal: str,
        output: str,
        criteria: Optional[list[str]] = None
    ) -> dict:
        """Critique agent output."""
        
        criteria = criteria or [
            "Correctness",
            "Completeness",
            "Clarity",
            "Efficiency"
        ]
        
        prompt = f"""Critique this output:

Goal: {goal}
Output: {output}

Evaluate on these criteria:
{chr(10).join(f"- {c}" for c in criteria)}

For each criterion, provide:
- Score (1-5)
- Explanation

Also provide:
- Overall assessment
- Suggestions for improvement

Output as JSON."""
        
        response = await self.llm.complete(prompt)
        
        import json
        return json.loads(response.content)

class IterativeRefinementAgent:
    """Agent that iteratively refines output."""
    
    def __init__(
        self,
        llm_client: Any,
        critic: CriticAgent,
        max_iterations: int = 3,
        quality_threshold: float = 4.0
    ):
        self.llm = llm_client
        self.critic = critic
        self.max_iterations = max_iterations
        self.quality_threshold = quality_threshold
    
    async def generate(self, goal: str) -> tuple[str, list[dict]]:
        """Generate output with iterative refinement."""
        
        iterations = []
        
        # Initial generation
        output = await self._generate_initial(goal)
        
        for i in range(self.max_iterations):
            # Critique
            critique = await self.critic.critique(goal, output)
            
            iterations.append({
                "iteration": i + 1,
                "output": output,
                "critique": critique
            })
            
            # Check quality
            avg_score = self._average_score(critique)
            
            if avg_score >= self.quality_threshold:
                break
            
            # Refine based on critique
            output = await self._refine(goal, output, critique)
        
        return output, iterations
    
    async def _generate_initial(self, goal: str) -> str:
        """Generate initial output."""
        
        response = await self.llm.complete(goal)
        return response.content
    
    async def _refine(
        self,
        goal: str,
        output: str,
        critique: dict
    ) -> str:
        """Refine output based on critique."""
        
        import json
        
        prompt = f"""Refine this output based on critique:

Goal: {goal}
Current output: {output}
Critique: {json.dumps(critique)}

Provide an improved version that addresses the feedback."""
        
        response = await self.llm.complete(prompt)
        return response.content
    
    def _average_score(self, critique: dict) -> float:
        """Calculate average score from critique."""
        
        scores = []
        for key, value in critique.items():
            if isinstance(value, dict) and "score" in value:
                scores.append(value["score"])
        
        return sum(scores) / len(scores) if scores else 0
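
The scoring helper expects the critique JSON to map each criterion to an object carrying a numeric score; other keys are skipped. For example (illustrative values):

critique = {
    "Correctness": {"score": 5, "explanation": "Matches the spec."},
    "Completeness": {"score": 3, "explanation": "Edge cases missing."},
    "overall_assessment": "Solid draft; cover the edge cases.",
}
# _average_score ignores "overall_assessment" and returns (5 + 3) / 2 = 4.0,
# which meets the default quality_threshold of 4.0 and stops the loop.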

Memory and State Management

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from abc import ABC, abstractmethod
import json

@dataclass
class MemoryEntry:
    """Entry in agent memory."""
    
    content: str
    memory_type: str  # "observation", "action", "thought", "fact"
    timestamp: datetime = field(default_factory=datetime.now)
    importance: float = 0.5
    metadata: dict = field(default_factory=dict)

class AgentMemory(ABC):
    """Abstract agent memory interface."""
    
    @abstractmethod
    def add(self, entry: MemoryEntry):
        """Add memory entry."""
        pass
    
    @abstractmethod
    def retrieve(self, query: str, k: int = 5) -> list[MemoryEntry]:
        """Retrieve relevant memories."""
        pass
    
    @abstractmethod
    def get_recent(self, n: int = 10) -> list[MemoryEntry]:
        """Get recent memories."""
        pass

class ShortTermMemory(AgentMemory):
    """Short-term working memory."""
    
    def __init__(self, max_size: int = 100):
        self.max_size = max_size
        self.entries: list[MemoryEntry] = []
    
    def add(self, entry: MemoryEntry):
        """Add to short-term memory."""
        
        self.entries.append(entry)
        
        # Evict oldest if at capacity
        if len(self.entries) > self.max_size:
            self.entries = self.entries[-self.max_size:]
    
    def retrieve(self, query: str, k: int = 5) -> list[MemoryEntry]:
        """Retrieve by recency (simple implementation)."""
        
        return self.entries[-k:]
    
    def get_recent(self, n: int = 10) -> list[MemoryEntry]:
        """Get n most recent entries."""
        
        return self.entries[-n:]
    
    def clear(self):
        """Clear short-term memory."""
        self.entries = []

class LongTermMemory(AgentMemory):
    """Long-term memory with semantic retrieval."""
    
    def __init__(self, embedding_model: Any, vector_store: Any):
        self.embedder = embedding_model
        self.store = vector_store
        self.entries: dict[str, MemoryEntry] = {}
    
    def add(self, entry: MemoryEntry):
        """Add to long-term memory."""
        
        import uuid
        
        # Generate ID
        entry_id = str(uuid.uuid4())
        
        # Store entry
        self.entries[entry_id] = entry
        
        # Add to vector store
        embedding = self.embedder.embed(entry.content)
        self.store.add(
            id=entry_id,
            embedding=embedding,
            metadata={
                "type": entry.memory_type,
                "importance": entry.importance,
                "timestamp": entry.timestamp.isoformat()
            }
        )
    
    def retrieve(self, query: str, k: int = 5) -> list[MemoryEntry]:
        """Retrieve semantically similar memories."""
        
        query_embedding = self.embedder.embed(query)
        results = self.store.search(query_embedding, k=k)
        
        return [
            self.entries[r.id]
            for r in results
            if r.id in self.entries
        ]
    
    def get_recent(self, n: int = 10) -> list[MemoryEntry]:
        """Get n most recent entries."""
        
        sorted_entries = sorted(
            self.entries.values(),
            key=lambda e: e.timestamp,
            reverse=True
        )
        return sorted_entries[:n]
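
LongTermMemory leans on two assumed interfaces: an embedder exposing embed(text) and a vector store exposing add(id, embedding, metadata) plus search(embedding, k) returning hits that carry an id. A toy in-memory stand-in (cosine similarity, no persistence) shows the expected shape:

import math
from dataclasses import dataclass

@dataclass
class Hit:
    id: str
    score: float

class InMemoryVectorStore:
    """Minimal store matching the interface LongTermMemory expects."""
    
    def __init__(self):
        self.vectors: dict[str, list[float]] = {}
    
    def add(self, id: str, embedding: list[float], metadata: dict):
        self.vectors[id] = embedding  # metadata is ignored in this stub
    
    def search(self, embedding: list[float], k: int = 5) -> list[Hit]:
        def cosine(a: list[float], b: list[float]) -> float:
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
            return dot / norm if norm else 0.0
        
        hits = [Hit(id=i, score=cosine(embedding, v)) for i, v in self.vectors.items()]
        return sorted(hits, key=lambda h: h.score, reverse=True)[:k]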

class EpisodicMemory:
    """Memory organized by episodes/sessions."""
    
    def __init__(self):
        self.episodes: dict[str, list[MemoryEntry]] = {}
        self.current_episode: Optional[str] = None
    
    def start_episode(self, episode_id: str):
        """Start a new episode."""
        
        self.current_episode = episode_id
        self.episodes[episode_id] = []
    
    def add(self, entry: MemoryEntry):
        """Add to current episode."""
        
        if self.current_episode:
            self.episodes[self.current_episode].append(entry)
    
    def get_episode(self, episode_id: str) -> list[MemoryEntry]:
        """Get all memories from episode."""
        
        return self.episodes.get(episode_id, [])
    
    async def summarize_episode(self, episode_id: str, llm: Any) -> str:
        """Summarize an episode."""
        
        entries = self.get_episode(episode_id)
        
        if not entries:
            return ""
        
        content = "\n".join(e.content for e in entries)
        
        prompt = f"""Summarize this agent session:

{content}

Provide a concise summary of:
1. What was accomplished
2. Key decisions made
3. Important information learned"""
        
        response = await llm.complete(prompt)
        return response.content

class HierarchicalMemory:
    """Memory with multiple levels."""
    
    def __init__(
        self,
        short_term: ShortTermMemory,
        long_term: Optional[LongTermMemory],
        consolidation_threshold: int = 50
    ):
        self.short_term = short_term
        self.long_term = long_term
        self.threshold = consolidation_threshold
    
    def add(self, entry: MemoryEntry):
        """Add to short-term, consolidate if needed."""
        
        self.short_term.add(entry)
        
        # Consolidate important memories to long-term (if one is attached)
        if self.long_term is not None and entry.importance > 0.7:
            self.long_term.add(entry)
        
        # Periodic consolidation
        if len(self.short_term.entries) >= self.threshold:
            self._consolidate()
    
    def retrieve(self, query: str, k: int = 5) -> list[MemoryEntry]:
        """Retrieve from both memory systems."""
        
        # Split the budget so an odd k still yields up to k results
        short_term_results = self.short_term.retrieve(query, k=k - k // 2)
        long_term_results = (
            self.long_term.retrieve(query, k=k // 2)
            if self.long_term is not None else []
        )
        
        # Combine and deduplicate
        all_results = short_term_results + long_term_results
        
        # Sort by relevance (using importance as proxy)
        all_results.sort(key=lambda e: e.importance, reverse=True)
        
        return all_results[:k]
    
    def _consolidate(self):
        """Consolidate short-term to long-term."""
        
        # Move important memories to long-term, if one is attached
        if self.long_term is not None:
            for entry in self.short_term.entries:
                if entry.importance > 0.5:
                    self.long_term.add(entry)
        
        # Keep only recent in short-term
        self.short_term.entries = self.short_term.entries[-20:]

class AgentState:
    """Complete agent state."""
    
    def __init__(self):
        self.memory = HierarchicalMemory(
            ShortTermMemory(),
            None  # Would be LongTermMemory with embedder
        )
        self.current_goal: Optional[str] = None
        self.current_plan: Any = None
        self.context: dict = {}
        self.tool_results: list[dict] = []
    
    def update(self, key: str, value: Any):
        """Update state."""
        self.context[key] = value
    
    def get(self, key: str, default: Any = None) -> Any:
        """Get state value."""
        return self.context.get(key, default)
    
    def to_dict(self) -> dict:
        """Serialize state."""
        
        return {
            "goal": self.current_goal,
            "context": self.context,
            "recent_memories": [
                {"content": m.content, "type": m.memory_type}
                for m in self.memory.short_term.get_recent(10)
            ]
        }
    
    def from_dict(self, data: dict):
        """Deserialize state."""
        
        self.current_goal = data.get("goal")
        self.context = data.get("context", {})

Multi-Agent Coordination

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
from enum import Enum
import asyncio

class AgentRole(Enum):
    """Agent roles in multi-agent system."""
    
    COORDINATOR = "coordinator"
    WORKER = "worker"
    CRITIC = "critic"
    SPECIALIST = "specialist"

@dataclass
class AgentMessage:
    """Message between agents."""
    
    sender: str
    recipient: str
    content: str
    message_type: str = "request"  # request, response, broadcast
    metadata: dict = field(default_factory=dict)

class Agent(ABC):
    """Base agent class."""
    
    def __init__(self, name: str, role: AgentRole):
        self.name = name
        self.role = role
        self.inbox: asyncio.Queue = asyncio.Queue()
    
    @abstractmethod
    async def process(self, message: AgentMessage) -> Optional[AgentMessage]:
        """Process incoming message."""
        pass
    
    async def send(self, message: AgentMessage, router: 'MessageRouter'):
        """Send message to another agent."""
        await router.route(message)
    
    async def receive(self) -> AgentMessage:
        """Receive message from inbox."""
        return await self.inbox.get()

class MessageRouter:
    """Route messages between agents."""
    
    def __init__(self):
        self.agents: dict[str, Agent] = {}
    
    def register(self, agent: Agent):
        """Register an agent."""
        self.agents[agent.name] = agent
    
    async def route(self, message: AgentMessage):
        """Route message to recipient."""
        
        if message.recipient == "broadcast":
            # Send to all agents except sender
            for name, agent in self.agents.items():
                if name != message.sender:
                    await agent.inbox.put(message)
        elif message.recipient in self.agents:
            await self.agents[message.recipient].inbox.put(message)

class CoordinatorAgent(Agent):
    """Agent that coordinates other agents."""
    
    def __init__(self, name: str, llm_client: Any):
        super().__init__(name, AgentRole.COORDINATOR)
        self.llm = llm_client
        self.worker_agents: list[str] = []
        self.task_assignments: dict[str, str] = {}
    
    def add_worker(self, worker_name: str):
        """Add worker agent."""
        self.worker_agents.append(worker_name)
    
    async def delegate(
        self,
        task: str,
        router: MessageRouter
    ) -> dict[str, str]:
        """Delegate task to workers."""
        
        # Decompose task
        subtasks = await self._decompose_task(task)
        
        # Assign to workers
        results = {}
        
        for i, subtask in enumerate(subtasks):
            worker = self.worker_agents[i % len(self.worker_agents)]
            
            message = AgentMessage(
                sender=self.name,
                recipient=worker,
                content=subtask,
                message_type="request"
            )
            
            await router.route(message)
            self.task_assignments[subtask] = worker
        
        # Collect results
        for _ in range(len(subtasks)):
            response = await self.receive()
            results[response.metadata.get("task")] = response.content
        
        return results
    
    async def _decompose_task(self, task: str) -> list[str]:
        """Decompose task into subtasks."""
        
        prompt = f"""Decompose this task into subtasks for parallel execution:

Task: {task}

Output as JSON list of subtask descriptions."""
        
        response = await self.llm.complete(prompt)
        
        import json
        return json.loads(response.content)
    
    async def process(self, message: AgentMessage) -> Optional[AgentMessage]:
        """Process message."""
        
        # Responses are collected inside delegate(), so nothing to do here
        return None

class WorkerAgent(Agent):
    """Agent that executes tasks."""
    
    def __init__(self, name: str, llm_client: Any, specialty: Optional[str] = None):
        super().__init__(name, AgentRole.WORKER)
        self.llm = llm_client
        self.specialty = specialty
    
    async def process(self, message: AgentMessage) -> Optional[AgentMessage]:
        """Process task request."""
        
        if message.message_type == "request":
            result = await self._execute_task(message.content)
            
            return AgentMessage(
                sender=self.name,
                recipient=message.sender,
                content=result,
                message_type="response",
                metadata={"task": message.content}
            )
        
        return None
    
    async def _execute_task(self, task: str) -> str:
        """Execute task."""
        
        prompt = f"""Execute this task:

Task: {task}
{"Specialty: " + self.specialty if self.specialty else ""}

Provide the result."""
        
        response = await self.llm.complete(prompt)
        return response.content
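
Putting coordinator and workers together: the coordinator needs workers actively draining their inboxes, so each worker gets a small pump task. MockLLM is a scripted stand-in that returns a fixed decomposition and echoes everything else:

import asyncio
import json
from types import SimpleNamespace

class MockLLM:
    """Scripted stand-in: fixed decomposition, echo for all other prompts."""
    
    async def complete(self, prompt: str):
        if prompt.startswith("Decompose"):
            return SimpleNamespace(
                content=json.dumps(["Research the topic", "Write a summary"])
            )
        return SimpleNamespace(content=f"[mock result for: {prompt[:40]}...]")

async def main():
    llm = MockLLM()
    router = MessageRouter()
    coordinator = CoordinatorAgent("coordinator", llm)
    router.register(coordinator)
    
    workers = [WorkerAgent(f"worker-{i}", llm) for i in (1, 2)]
    for worker in workers:
        router.register(worker)
        coordinator.add_worker(worker.name)
    
    async def pump(worker: WorkerAgent):
        """Drain a worker's inbox, routing each response back out."""
        while True:
            reply = await worker.process(await worker.receive())
            if reply:
                await router.route(reply)
    
    pumps = [asyncio.create_task(pump(w)) for w in workers]
    results = await coordinator.delegate("Produce a research brief", router)
    for subtask, output in results.items():
        print(f"{subtask} -> {output}")
    for p in pumps:
        p.cancel()

asyncio.run(main())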

class DebateSystem:
    """Multi-agent debate for better reasoning."""
    
    def __init__(
        self,
        agents: list[Agent],
        moderator: Agent,
        rounds: int = 3
    ):
        self.agents = agents
        self.moderator = moderator
        self.rounds = rounds
    
    async def debate(self, topic: str) -> str:
        """Run debate on topic."""
        
        positions = {}
        
        # Initial positions
        for agent in self.agents:
            response = await agent.llm.complete(
                f"Take a position on: {topic}\n\nProvide your argument."
            )
            positions[agent.name] = response.content
        
        # Debate rounds
        for round_num in range(self.rounds):
            new_positions = {}
            
            for agent in self.agents:
                # Show other positions
                other_positions = {
                    name: pos
                    for name, pos in positions.items()
                    if name != agent.name
                }
                
                prompt = f"""Topic: {topic}

Your current position: {positions[agent.name]}

Other positions:
{self._format_positions(other_positions)}

Round {round_num + 1}: Respond to other arguments and refine your position."""
                
                response = await agent.llm.complete(prompt)
                new_positions[agent.name] = response.content
            
            positions = new_positions
        
        # Moderator synthesizes
        synthesis = await self.moderator.llm.complete(
            f"""Synthesize the debate on: {topic}

Final positions:
{self._format_positions(positions)}

Provide a balanced conclusion."""
        )
        
        return synthesis.content
    
    def _format_positions(self, positions: dict[str, str]) -> str:
        """Format positions for prompt."""
        
        return "\n\n".join(
            f"{name}:\n{pos}"
            for name, pos in positions.items()
        )
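
Note that DebateSystem reaches into each agent's llm attribute, which the Agent base class does not declare; any subclass that carries a client (like WorkerAgent above) fits. A wiring sketch reusing MockLLM from the coordinator example:

agents = [WorkerAgent(f"debater-{i}", MockLLM()) for i in (1, 2, 3)]
moderator = WorkerAgent("moderator", MockLLM())

conclusion = asyncio.run(
    DebateSystem(agents, moderator, rounds=2).debate(
        "Should agents replan after every failed action?"
    )
)
print(conclusion)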

class ConsensusSystem:
    """Multi-agent consensus building."""
    
    def __init__(self, agents: list[Agent], threshold: float = 0.8):
        self.agents = agents
        self.threshold = threshold
    
    async def reach_consensus(self, question: str) -> tuple[str, float]:
        """Try to reach consensus on question."""
        
        # Get initial answers
        answers = {}
        for agent in self.agents:
            response = await agent.llm.complete(question)
            answers[agent.name] = response.content
        
        # Check agreement
        agreement = await self._calculate_agreement(answers)
        
        if agreement >= self.threshold:
            # Consensus reached
            return self._synthesize_answer(answers), agreement
        
        # Deliberation round
        for agent in self.agents:
            prompt = f"""Question: {question}

Your answer: {answers[agent.name]}

Other answers:
{self._format_answers(answers, exclude=agent.name)}

Agreement level: {agreement:.0%}

Revise your answer considering others' perspectives."""
            
            response = await agent.llm.complete(prompt)
            answers[agent.name] = response.content
        
        agreement = await self._calculate_agreement(answers)
        return self._synthesize_answer(answers), agreement
    
    async def _calculate_agreement(self, answers: dict[str, str]) -> float:
        """Calculate agreement level."""
        
        # Simple heuristic: check for similar key phrases
        all_answers = list(answers.values())
        
        if len(all_answers) < 2:
            return 1.0
        
        # Count common words
        word_sets = [set(a.lower().split()) for a in all_answers]
        
        common = word_sets[0]
        for ws in word_sets[1:]:
            common = common.intersection(ws)
        
        total = word_sets[0]
        for ws in word_sets[1:]:
            total = total.union(ws)
        
        return len(common) / len(total) if total else 0
    
    def _synthesize_answer(self, answers: dict[str, str]) -> str:
        """Synthesize final answer."""
        
        # Simplest strategy: return the first answer; a production
        # system might vote or merge answers instead
        return next(iter(answers.values()))
    
    def _format_answers(
        self,
        answers: dict[str, str],
        exclude: Optional[str] = None
    ) -> str:
        """Format answers for prompt."""
        
        return "\n\n".join(
            f"{name}: {ans}"
            for name, ans in answers.items()
            if name != exclude
        )
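
The agreement heuristic is a Jaccard similarity over word sets, which is crude but cheap. For intuition:

answers = {
    "agent-1": "use retry with exponential backoff",
    "agent-2": "retry with backoff and a cap",
}
# Intersection {retry, with, backoff} has 3 words and the union has 8,
# so agreement = 3 / 8 = 0.375, far below the default 0.8 threshold,
# which triggers a deliberation round. An embedding-based similarity
# would be a natural upgrade.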

Production Agentic Service

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import asyncio
import uuid

app = FastAPI()

class TaskRequest(BaseModel):
    goal: str
    max_iterations: int = 10
    use_reflection: bool = True

class TaskStatus(BaseModel):
    task_id: str
    status: str
    progress: float
    result: Optional[str] = None
    error: Optional[str] = None

# Task storage
tasks: dict[str, TaskStatus] = {}

# Mock components
class MockLLM:
    async def complete(self, prompt: str):
        class Response:
            content = f"Response to: {prompt[:100]}"
        await asyncio.sleep(0.1)
        return Response()

llm = MockLLM()

async def run_agent_task(task_id: str, goal: str, max_iterations: int):
    """Run agent task in background."""
    
    try:
        tasks[task_id].status = "running"
        
        # Simulate agent execution
        for i in range(max_iterations):
            tasks[task_id].progress = (i + 1) / max_iterations
            await asyncio.sleep(0.5)
            
            # Check for completion
            if i >= 3:  # Simulate early completion
                break
        
        tasks[task_id].status = "completed"
        tasks[task_id].result = f"Completed goal: {goal}"
        
    except Exception as e:
        tasks[task_id].status = "failed"
        tasks[task_id].error = str(e)

@app.post("/v1/tasks")
async def create_task(
    request: TaskRequest,
    background_tasks: BackgroundTasks
) -> dict:
    """Create and start agent task."""
    
    task_id = str(uuid.uuid4())
    
    tasks[task_id] = TaskStatus(
        task_id=task_id,
        status="pending",
        progress=0.0
    )
    
    background_tasks.add_task(
        run_agent_task,
        task_id,
        request.goal,
        request.max_iterations
    )
    
    return {"task_id": task_id, "status": "pending"}

@app.get("/v1/tasks/{task_id}")
async def get_task(task_id: str) -> TaskStatus:
    """Get task status."""
    
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    
    return tasks[task_id]

@app.post("/v1/tasks/{task_id}/cancel")
async def cancel_task(task_id: str) -> dict:
    """Cancel running task."""
    
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    
    # Mark as cancelled; this sketch does not interrupt the running coroutine
    tasks[task_id].status = "cancelled"
    
    return {"task_id": task_id, "status": "cancelled"}

@app.get("/v1/tasks")
async def list_tasks(status: Optional[str] = None) -> list[TaskStatus]:
    """List all tasks."""
    
    result = list(tasks.values())
    
    if status:
        result = [t for t in result if t.status == status]
    
    return result

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Agentic workflows transform LLMs from passive responders into active problem solvers. The key patterns are planning (breaking goals into executable steps), tool use (extending capabilities beyond text generation), reflection (learning from actions and self-correcting), memory (maintaining context across interactions), and multi-agent coordination (leveraging specialized agents for complex tasks).

Start simple with the ReAct pattern; it’s surprisingly effective for many use cases. Add reflection when you need reliability; agents that can recognize and correct their mistakes are far more robust than those that can’t. Implement hierarchical memory when conversations span multiple sessions or when agents need to learn from past experiences. Consider multi-agent systems for complex tasks that benefit from specialized expertise or diverse perspectives.

The most important insight is that agentic systems require different evaluation criteria than traditional LLM applications: you’re measuring goal completion, not just response quality. Build in observability from the start, log every action and decision, and design for graceful failure. The future of AI applications is agentic, and these patterns provide the foundation for building systems that can truly act on behalf of users.

