Introduction: Single agents hit limits—they can’t be experts at everything, they struggle with complex multi-step tasks, and they lack the ability to parallelize work. Multi-agent systems solve these problems by coordinating multiple specialized agents, each with distinct capabilities and roles. This guide covers practical multi-agent patterns: orchestrator agents that delegate and coordinate, specialist agents with focused expertise, communication protocols for agent collaboration, and coordination strategies for complex workflows. Whether you’re building research assistants, code generation pipelines, or autonomous systems, multi-agent coordination unlocks capabilities that single agents cannot achieve alone.

Agent Definition
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from enum import Enum
from abc import ABC, abstractmethod
import asyncio
class AgentRole(Enum):
"""Agent roles in multi-agent system."""
ORCHESTRATOR = "orchestrator"
RESEARCHER = "researcher"
CODER = "coder"
REVIEWER = "reviewer"
WRITER = "writer"
ANALYST = "analyst"
@dataclass
class AgentConfig:
"""Configuration for an agent."""
name: str
role: AgentRole
model: str = "gpt-4o"
system_prompt: str = ""
tools: list[Callable] = field(default_factory=list)
max_iterations: int = 10
temperature: float = 0.7
@dataclass
class AgentMessage:
"""Message between agents."""
sender: str
recipient: str
content: str
message_type: str = "task" # task, result, question, feedback
metadata: dict = field(default_factory=dict)
@dataclass
class AgentState:
"""Current state of an agent."""
name: str
status: str = "idle" # idle, working, waiting, completed
current_task: str = None
history: list[AgentMessage] = field(default_factory=list)
results: list[Any] = field(default_factory=list)
class BaseAgent(ABC):
"""Base class for all agents."""
def __init__(self, config: AgentConfig, client: Any):
self.config = config
self.client = client
self.state = AgentState(name=config.name)
self._message_queue: asyncio.Queue = asyncio.Queue()
@abstractmethod
async def process(self, message: AgentMessage) -> AgentMessage:
"""Process a message and return response."""
pass
async def send(self, message: AgentMessage):
"""Send message to another agent."""
await self._message_queue.put(message)
async def receive(self) -> AgentMessage:
"""Receive message from queue."""
return await self._message_queue.get()
async def _call_llm(self, messages: list[dict]) -> str:
"""Call LLM with messages."""
response = await self.client.chat.completions.create(
model=self.config.model,
messages=messages,
temperature=self.config.temperature
)
return response.choices[0].message.content
class SpecialistAgent(BaseAgent):
"""Agent specialized in a specific domain."""
async def process(self, message: AgentMessage) -> AgentMessage:
"""Process task with specialization."""
self.state.status = "working"
self.state.current_task = message.content
messages = [
{"role": "system", "content": self.config.system_prompt},
{"role": "user", "content": message.content}
]
result = await self._call_llm(messages)
self.state.status = "completed"
self.state.results.append(result)
return AgentMessage(
sender=self.config.name,
recipient=message.sender,
content=result,
message_type="result"
)
# Specialist agent configurations
RESEARCHER_CONFIG = AgentConfig(
name="researcher",
role=AgentRole.RESEARCHER,
system_prompt="""You are a research specialist. Your job is to:
- Gather and synthesize information
- Find relevant sources and references
- Provide comprehensive research summaries
- Identify key facts and insights
Be thorough and cite your reasoning."""
)
CODER_CONFIG = AgentConfig(
name="coder",
role=AgentRole.CODER,
system_prompt="""You are a coding specialist. Your job is to:
- Write clean, efficient code
- Follow best practices and patterns
- Include error handling and edge cases
- Add helpful comments and documentation
Write production-quality code."""
)
REVIEWER_CONFIG = AgentConfig(
name="reviewer",
role=AgentRole.REVIEWER,
system_prompt="""You are a code review specialist. Your job is to:
- Review code for bugs and issues
- Check for security vulnerabilities
- Suggest improvements and optimizations
- Ensure code follows best practices
Be constructive and specific in feedback."""
)
WRITER_CONFIG = AgentConfig(
name="writer",
role=AgentRole.WRITER,
system_prompt="""You are a technical writing specialist. Your job is to:
- Write clear, concise documentation
- Explain complex concepts simply
- Structure content logically
- Use appropriate technical terminology
Write for your target audience."""
)
Orchestrator Agent
from dataclasses import dataclass
from typing import Any, Optional
import json
@dataclass
class TaskPlan:
"""Plan for executing a complex task."""
goal: str
steps: list[dict] # {agent, task, dependencies}
current_step: int = 0
class OrchestratorAgent(BaseAgent):
"""Agent that coordinates other agents."""
def __init__(
self,
config: AgentConfig,
client: Any,
agents: dict[str, BaseAgent]
):
super().__init__(config, client)
self.agents = agents
self.current_plan: TaskPlan = None
async def process(self, message: AgentMessage) -> AgentMessage:
"""Orchestrate task execution."""
self.state.status = "working"
# Create execution plan
plan = await self._create_plan(message.content)
self.current_plan = plan
# Execute plan
results = await self._execute_plan(plan)
# Synthesize results
final_result = await self._synthesize_results(
plan.goal,
results
)
self.state.status = "completed"
return AgentMessage(
sender=self.config.name,
recipient=message.sender,
content=final_result,
message_type="result"
)
async def _create_plan(self, task: str) -> TaskPlan:
"""Create execution plan for task."""
available_agents = ", ".join(self.agents.keys())
prompt = f"""Create an execution plan for this task.
Task: {task}
Available agents: {available_agents}
Return a JSON plan:
{{
"goal": "the overall goal",
"steps": [
{{"agent": "agent_name", "task": "specific task", "dependencies": []}},
{{"agent": "agent_name", "task": "specific task", "dependencies": [0]}}
]
}}
Dependencies are indices of steps that must complete first.
Parallelize where possible."""
messages = [
{"role": "system", "content": self.config.system_prompt},
{"role": "user", "content": prompt}
]
response = await self._call_llm(messages)
# Parse plan
plan_data = json.loads(response)
return TaskPlan(
goal=plan_data["goal"],
steps=plan_data["steps"]
)
async def _execute_plan(self, plan: TaskPlan) -> list[dict]:
"""Execute plan steps."""
results = [None] * len(plan.steps)
completed = set()
while len(completed) < len(plan.steps):
# Find ready steps
ready = []
for i, step in enumerate(plan.steps):
if i in completed:
continue
deps = step.get("dependencies", [])
if all(d in completed for d in deps):
ready.append(i)
# Execute ready steps in parallel
tasks = []
for i in ready:
step = plan.steps[i]
# Include dependency results in task
dep_context = ""
for dep_idx in step.get("dependencies", []):
dep_context += f"\nPrevious result: {results[dep_idx]}"
task_content = step["task"] + dep_context
tasks.append(self._execute_step(
step["agent"],
task_content,
i
))
step_results = await asyncio.gather(*tasks)
for i, result in step_results:
results[i] = result
completed.add(i)
return results
async def _execute_step(
self,
agent_name: str,
task: str,
step_idx: int
) -> tuple[int, str]:
"""Execute a single step."""
agent = self.agents.get(agent_name)
if not agent:
return step_idx, f"Error: Agent {agent_name} not found"
message = AgentMessage(
sender=self.config.name,
recipient=agent_name,
content=task,
message_type="task"
)
response = await agent.process(message)
return step_idx, response.content
async def _synthesize_results(
self,
goal: str,
results: list[str]
) -> str:
"""Synthesize results into final output."""
results_text = "\n\n".join([
f"Step {i+1} result:\n{r}"
for i, r in enumerate(results)
])
prompt = f"""Synthesize these results into a final response.
Goal: {goal}
Results:
{results_text}
Provide a comprehensive final answer that addresses the original goal."""
messages = [
{"role": "system", "content": "You synthesize multiple agent outputs into coherent responses."},
{"role": "user", "content": prompt}
]
return await self._call_llm(messages)
ORCHESTRATOR_CONFIG = AgentConfig(
name="orchestrator",
role=AgentRole.ORCHESTRATOR,
system_prompt="""You are an orchestrator agent. Your job is to:
- Break complex tasks into subtasks
- Assign tasks to appropriate specialist agents
- Coordinate parallel execution where possible
- Synthesize results into coherent outputs
Plan efficiently and delegate effectively."""
)
Communication Protocols
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import asyncio
from collections import defaultdict
class MessagePriority(Enum):
"""Message priority levels."""
LOW = 0
NORMAL = 1
HIGH = 2
URGENT = 3
@dataclass
class PrioritizedMessage:
"""Message with priority."""
message: AgentMessage
priority: MessagePriority = MessagePriority.NORMAL
timestamp: float = 0
def __lt__(self, other):
return (self.priority.value, -self.timestamp) < (other.priority.value, -other.timestamp)
class MessageBus:
"""Central message bus for agent communication."""
def __init__(self):
self._queues: dict[str, asyncio.PriorityQueue] = defaultdict(asyncio.PriorityQueue)
self._subscribers: dict[str, list[Callable]] = defaultdict(list)
self._message_log: list[AgentMessage] = []
async def send(
self,
message: AgentMessage,
priority: MessagePriority = MessagePriority.NORMAL
):
"""Send message to recipient."""
self._message_log.append(message)
prioritized = PrioritizedMessage(
message=message,
priority=priority,
timestamp=asyncio.get_event_loop().time()
)
await self._queues[message.recipient].put(prioritized)
# Notify subscribers
for callback in self._subscribers[message.recipient]:
await callback(message)
async def receive(self, agent_name: str) -> AgentMessage:
"""Receive next message for agent."""
prioritized = await self._queues[agent_name].get()
return prioritized.message
def subscribe(self, agent_name: str, callback: Callable):
"""Subscribe to messages for an agent."""
self._subscribers[agent_name].append(callback)
def get_history(self, agent_name: str = None) -> list[AgentMessage]:
"""Get message history."""
if agent_name:
return [
m for m in self._message_log
if m.sender == agent_name or m.recipient == agent_name
]
return self._message_log
class ConversationProtocol:
"""Protocol for multi-turn agent conversations."""
def __init__(self, bus: MessageBus):
self.bus = bus
self._conversations: dict[str, list[AgentMessage]] = {}
async def start_conversation(
self,
initiator: str,
participants: list[str],
topic: str
) -> str:
"""Start a new conversation."""
conv_id = f"conv_{asyncio.get_event_loop().time()}"
self._conversations[conv_id] = []
# Notify participants
for participant in participants:
message = AgentMessage(
sender=initiator,
recipient=participant,
content=f"Starting conversation about: {topic}",
message_type="conversation_start",
metadata={"conversation_id": conv_id}
)
await self.bus.send(message)
return conv_id
async def send_in_conversation(
self,
conv_id: str,
message: AgentMessage
):
"""Send message in conversation context."""
message.metadata["conversation_id"] = conv_id
self._conversations[conv_id].append(message)
await self.bus.send(message)
def get_conversation(self, conv_id: str) -> list[AgentMessage]:
"""Get conversation history."""
return self._conversations.get(conv_id, [])
class ConsensusProtocol:
"""Protocol for reaching consensus among agents."""
def __init__(self, bus: MessageBus, agents: list[str]):
self.bus = bus
self.agents = agents
async def propose(
self,
proposer: str,
proposal: str
) -> dict[str, str]:
"""Propose something and collect votes."""
votes = {}
# Send proposal to all agents
for agent in self.agents:
if agent != proposer:
message = AgentMessage(
sender=proposer,
recipient=agent,
content=proposal,
message_type="proposal"
)
await self.bus.send(message, MessagePriority.HIGH)
# Collect votes (simplified - in practice would use async gathering)
# This is a placeholder for the voting mechanism
return votes
async def vote(
self,
voter: str,
proposer: str,
vote: str, # "approve", "reject", "abstain"
reason: str = None
):
"""Cast a vote on a proposal."""
message = AgentMessage(
sender=voter,
recipient=proposer,
content=vote,
message_type="vote",
metadata={"reason": reason}
)
await self.bus.send(message)
Coordination Patterns
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
class PipelineCoordinator:
"""Coordinate agents in a pipeline pattern."""
def __init__(self, agents: list[BaseAgent]):
self.agents = agents
async def execute(self, initial_input: str) -> str:
"""Execute pipeline sequentially."""
current_input = initial_input
for agent in self.agents:
message = AgentMessage(
sender="pipeline",
recipient=agent.config.name,
content=current_input,
message_type="task"
)
response = await agent.process(message)
current_input = response.content
return current_input
class ParallelCoordinator:
"""Coordinate agents in parallel."""
def __init__(self, agents: list[BaseAgent]):
self.agents = agents
async def execute(self, task: str) -> list[str]:
"""Execute task on all agents in parallel."""
tasks = []
for agent in self.agents:
message = AgentMessage(
sender="parallel",
recipient=agent.config.name,
content=task,
message_type="task"
)
tasks.append(agent.process(message))
responses = await asyncio.gather(*tasks)
return [r.content for r in responses]
class DebateCoordinator:
"""Coordinate agents in a debate pattern."""
def __init__(
self,
agents: list[BaseAgent],
judge: BaseAgent,
max_rounds: int = 3
):
self.agents = agents
self.judge = judge
self.max_rounds = max_rounds
async def debate(self, topic: str) -> str:
"""Run a debate and get final verdict."""
positions = []
# Get initial positions
for agent in self.agents:
message = AgentMessage(
sender="debate",
recipient=agent.config.name,
content=f"State your position on: {topic}",
message_type="task"
)
response = await agent.process(message)
positions.append({
"agent": agent.config.name,
"position": response.content
})
# Debate rounds
for round_num in range(self.max_rounds):
new_positions = []
for i, agent in enumerate(self.agents):
# Show other positions
other_positions = [
p for j, p in enumerate(positions)
if j != i
]
context = "\n".join([
f"{p['agent']}: {p['position']}"
for p in other_positions
])
message = AgentMessage(
sender="debate",
recipient=agent.config.name,
content=f"Round {round_num + 1}. Other positions:\n{context}\n\nRespond and refine your position.",
message_type="task"
)
response = await agent.process(message)
new_positions.append({
"agent": agent.config.name,
"position": response.content
})
positions = new_positions
# Judge decides
all_positions = "\n\n".join([
f"{p['agent']}:\n{p['position']}"
for p in positions
])
judge_message = AgentMessage(
sender="debate",
recipient=self.judge.config.name,
content=f"Topic: {topic}\n\nFinal positions:\n{all_positions}\n\nProvide your verdict.",
message_type="task"
)
verdict = await self.judge.process(judge_message)
return verdict.content
class HierarchicalCoordinator:
"""Coordinate agents in a hierarchy."""
def __init__(
self,
leader: BaseAgent,
teams: dict[str, list[BaseAgent]]
):
self.leader = leader
self.teams = teams
async def execute(self, task: str) -> str:
"""Execute task through hierarchy."""
# Leader creates team assignments
team_names = ", ".join(self.teams.keys())
assignment_message = AgentMessage(
sender="hierarchy",
recipient=self.leader.config.name,
content=f"Task: {task}\n\nAvailable teams: {team_names}\n\nAssign subtasks to teams.",
message_type="task"
)
assignments = await self.leader.process(assignment_message)
# Parse and execute assignments (simplified)
team_results = {}
for team_name, agents in self.teams.items():
# Execute within team
coordinator = ParallelCoordinator(agents)
results = await coordinator.execute(task)
team_results[team_name] = results
# Leader synthesizes
results_text = "\n".join([
f"{team}: {results}"
for team, results in team_results.items()
])
synthesis_message = AgentMessage(
sender="hierarchy",
recipient=self.leader.config.name,
content=f"Team results:\n{results_text}\n\nSynthesize final answer.",
message_type="task"
)
final = await self.leader.process(synthesis_message)
return final.content
Production Multi-Agent Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
# Initialize components
client = None # Initialize with OpenAI client
message_bus = MessageBus()
# Create agents
agents = {}
class CreateAgentRequest(BaseModel):
name: str
role: str
system_prompt: str
model: str = "gpt-4o"
class TaskRequest(BaseModel):
task: str
coordinator: str = "orchestrator" # orchestrator, pipeline, parallel, debate
class MessageRequest(BaseModel):
sender: str
recipient: str
content: str
message_type: str = "task"
@app.post("/v1/agents")
async def create_agent(request: CreateAgentRequest):
"""Create a new agent."""
config = AgentConfig(
name=request.name,
role=AgentRole(request.role),
system_prompt=request.system_prompt,
model=request.model
)
agent = SpecialistAgent(config, client)
agents[request.name] = agent
return {
"name": request.name,
"role": request.role,
"status": "created"
}
@app.post("/v1/execute")
async def execute_task(request: TaskRequest):
"""Execute a task with coordination."""
if request.coordinator == "orchestrator":
orchestrator = OrchestratorAgent(
ORCHESTRATOR_CONFIG,
client,
agents
)
message = AgentMessage(
sender="user",
recipient="orchestrator",
content=request.task,
message_type="task"
)
result = await orchestrator.process(message)
elif request.coordinator == "pipeline":
coordinator = PipelineCoordinator(list(agents.values()))
result_content = await coordinator.execute(request.task)
result = AgentMessage(
sender="pipeline",
recipient="user",
content=result_content,
message_type="result"
)
elif request.coordinator == "parallel":
coordinator = ParallelCoordinator(list(agents.values()))
results = await coordinator.execute(request.task)
result = AgentMessage(
sender="parallel",
recipient="user",
content="\n\n".join(results),
message_type="result"
)
else:
raise HTTPException(400, f"Unknown coordinator: {request.coordinator}")
return {
"task": request.task,
"coordinator": request.coordinator,
"result": result.content
}
@app.post("/v1/messages")
async def send_message(request: MessageRequest):
"""Send a message between agents."""
message = AgentMessage(
sender=request.sender,
recipient=request.recipient,
content=request.content,
message_type=request.message_type
)
await message_bus.send(message)
return {"status": "sent", "message_id": id(message)}
@app.get("/v1/agents/{name}/state")
async def get_agent_state(name: str):
"""Get agent state."""
if name not in agents:
raise HTTPException(404, f"Agent not found: {name}")
agent = agents[name]
return {
"name": agent.state.name,
"status": agent.state.status,
"current_task": agent.state.current_task,
"results_count": len(agent.state.results)
}
@app.get("/v1/messages/history")
async def get_message_history(agent: Optional[str] = None):
"""Get message history."""
history = message_bus.get_history(agent)
return {
"messages": [
{
"sender": m.sender,
"recipient": m.recipient,
"content": m.content[:100] + "..." if len(m.content) > 100 else m.content,
"type": m.message_type
}
for m in history
]
}
@app.get("/health")
async def health():
return {"status": "healthy", "agents": len(agents)}
References
- AutoGen Framework: https://github.com/microsoft/autogen
- CrewAI: https://github.com/joaomdmoura/crewAI
- LangGraph: https://github.com/langchain-ai/langgraph
- Multi-Agent Systems: https://arxiv.org/abs/2308.08155
Conclusion
Multi-agent systems unlock capabilities that single agents cannot achieve. Start with clear agent definitions—each agent should have a focused role with a specific system prompt that guides its behavior. Use orchestrator agents to coordinate complex tasks, breaking them into subtasks and delegating to specialists. Implement proper communication protocols—message buses, conversation tracking, and consensus mechanisms enable rich agent collaboration. Choose coordination patterns based on your task structure: pipelines for sequential processing, parallel execution for independent subtasks, debates for exploring multiple perspectives, and hierarchies for complex organizational structures. The key insight is that agents work best when they have clear boundaries and well-defined interfaces. Build your multi-agent system incrementally—start with two agents and add complexity as needed. With proper coordination, multiple specialized agents can tackle problems that would overwhelm any single agent.
Discover more from Code, Cloud & Context
Subscribe to get the latest posts sent to your email.