Multi-Agent Coordination: Building Systems Where AI Agents Collaborate

Introduction: Single agents hit limits—they can’t be experts at everything, they struggle with complex multi-step tasks, and they lack the ability to parallelize work. Multi-agent systems solve these problems by coordinating multiple specialized agents, each with distinct capabilities and roles. This guide covers practical multi-agent patterns: orchestrator agents that delegate and coordinate, specialist agents with focused expertise, communication protocols for agent collaboration, and coordination strategies for complex workflows. Whether you’re building research assistants, code generation pipelines, or autonomous systems, multi-agent coordination unlocks capabilities that single agents cannot achieve alone.

Multi-Agent Coordination
Agent Coordination: Orchestrator Agent, Specialist Agents, Coordinated Results

Agent Definition

from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from enum import Enum
from abc import ABC, abstractmethod
import asyncio

class AgentRole(Enum):
    """Agent roles in multi-agent system."""
    
    ORCHESTRATOR = "orchestrator"
    RESEARCHER = "researcher"
    CODER = "coder"
    REVIEWER = "reviewer"
    WRITER = "writer"
    ANALYST = "analyst"

@dataclass
class AgentConfig:
    """Configuration for an agent."""
    
    name: str
    role: AgentRole
    model: str = "gpt-4o"
    system_prompt: str = ""
    tools: list[Callable] = field(default_factory=list)
    max_iterations: int = 10
    temperature: float = 0.7

@dataclass
class AgentMessage:
    """Message between agents."""
    
    sender: str
    recipient: str
    content: str
    message_type: str = "task"  # task, result, question, feedback
    metadata: dict = field(default_factory=dict)

@dataclass
class AgentState:
    """Current state of an agent."""
    
    name: str
    status: str = "idle"  # idle, working, waiting, completed
    current_task: str = None
    history: list[AgentMessage] = field(default_factory=list)
    results: list[Any] = field(default_factory=list)

class BaseAgent(ABC):
    """Base class for all agents."""
    
    def __init__(self, config: AgentConfig, client: Any):
        self.config = config
        self.client = client
        self.state = AgentState(name=config.name)
        self._message_queue: asyncio.Queue = asyncio.Queue()
    
    @abstractmethod
    async def process(self, message: AgentMessage) -> AgentMessage:
        """Process a message and return response."""
        pass
    
    async def send(self, message: AgentMessage):
        """Send message to another agent."""
        await self._message_queue.put(message)
    
    async def receive(self) -> AgentMessage:
        """Receive message from queue."""
        return await self._message_queue.get()
    
    async def _call_llm(self, messages: list[dict]) -> str:
        """Call LLM with messages."""
        
        response = await self.client.chat.completions.create(
            model=self.config.model,
            messages=messages,
            temperature=self.config.temperature
        )
        
        return response.choices[0].message.content

class SpecialistAgent(BaseAgent):
    """Agent specialized in a specific domain."""
    
    async def process(self, message: AgentMessage) -> AgentMessage:
        """Process task with specialization."""
        
        self.state.status = "working"
        self.state.current_task = message.content
        
        messages = [
            {"role": "system", "content": self.config.system_prompt},
            {"role": "user", "content": message.content}
        ]
        
        result = await self._call_llm(messages)
        
        self.state.status = "completed"
        self.state.results.append(result)
        
        return AgentMessage(
            sender=self.config.name,
            recipient=message.sender,
            content=result,
            message_type="result"
        )

# Specialist agent configurations
RESEARCHER_CONFIG = AgentConfig(
    name="researcher",
    role=AgentRole.RESEARCHER,
    system_prompt="""You are a research specialist. Your job is to:
- Gather and synthesize information
- Find relevant sources and references
- Provide comprehensive research summaries
- Identify key facts and insights

Be thorough and cite your reasoning."""
)

CODER_CONFIG = AgentConfig(
    name="coder",
    role=AgentRole.CODER,
    system_prompt="""You are a coding specialist. Your job is to:
- Write clean, efficient code
- Follow best practices and patterns
- Include error handling and edge cases
- Add helpful comments and documentation

Write production-quality code."""
)

REVIEWER_CONFIG = AgentConfig(
    name="reviewer",
    role=AgentRole.REVIEWER,
    system_prompt="""You are a code review specialist. Your job is to:
- Review code for bugs and issues
- Check for security vulnerabilities
- Suggest improvements and optimizations
- Ensure code follows best practices

Be constructive and specific in feedback."""
)

WRITER_CONFIG = AgentConfig(
    name="writer",
    role=AgentRole.WRITER,
    system_prompt="""You are a technical writing specialist. Your job is to:
- Write clear, concise documentation
- Explain complex concepts simply
- Structure content logically
- Use appropriate technical terminology

Write for your target audience."""
)

Orchestrator Agent

from dataclasses import dataclass
from typing import Any, Optional
import json

@dataclass
class TaskPlan:
    """Plan for executing a complex task."""
    
    goal: str
    steps: list[dict]  # {agent, task, dependencies}
    current_step: int = 0

class OrchestratorAgent(BaseAgent):
    """Agent that coordinates other agents."""
    
    def __init__(
        self,
        config: AgentConfig,
        client: Any,
        agents: dict[str, BaseAgent]
    ):
        super().__init__(config, client)
        self.agents = agents
        self.current_plan: TaskPlan = None
    
    async def process(self, message: AgentMessage) -> AgentMessage:
        """Orchestrate task execution."""
        
        self.state.status = "working"
        
        # Create execution plan
        plan = await self._create_plan(message.content)
        self.current_plan = plan
        
        # Execute plan
        results = await self._execute_plan(plan)
        
        # Synthesize results
        final_result = await self._synthesize_results(
            plan.goal,
            results
        )
        
        self.state.status = "completed"
        
        return AgentMessage(
            sender=self.config.name,
            recipient=message.sender,
            content=final_result,
            message_type="result"
        )
    
    async def _create_plan(self, task: str) -> TaskPlan:
        """Create execution plan for task."""
        
        available_agents = ", ".join(self.agents.keys())
        
        prompt = f"""Create an execution plan for this task.

Task: {task}

Available agents: {available_agents}

Return a JSON plan:
{{
    "goal": "the overall goal",
    "steps": [
        {{"agent": "agent_name", "task": "specific task", "dependencies": []}},
        {{"agent": "agent_name", "task": "specific task", "dependencies": [0]}}
    ]
}}

Dependencies are indices of steps that must complete first.
Parallelize where possible."""
        
        messages = [
            {"role": "system", "content": self.config.system_prompt},
            {"role": "user", "content": prompt}
        ]
        
        response = await self._call_llm(messages)
        
        # Parse plan
        plan_data = json.loads(response)
        
        return TaskPlan(
            goal=plan_data["goal"],
            steps=plan_data["steps"]
        )
    
    async def _execute_plan(self, plan: TaskPlan) -> list[dict]:
        """Execute plan steps."""
        
        results = [None] * len(plan.steps)
        completed = set()
        
        while len(completed) < len(plan.steps):
            # Find ready steps
            ready = []
            for i, step in enumerate(plan.steps):
                if i in completed:
                    continue
                
                deps = step.get("dependencies", [])
                if all(d in completed for d in deps):
                    ready.append(i)
            
            # Execute ready steps in parallel
            tasks = []
            for i in ready:
                step = plan.steps[i]
                
                # Include dependency results in task
                dep_context = ""
                for dep_idx in step.get("dependencies", []):
                    dep_context += f"\nPrevious result: {results[dep_idx]}"
                
                task_content = step["task"] + dep_context
                
                tasks.append(self._execute_step(
                    step["agent"],
                    task_content,
                    i
                ))
            
            step_results = await asyncio.gather(*tasks)
            
            for i, result in step_results:
                results[i] = result
                completed.add(i)
        
        return results
    
    async def _execute_step(
        self,
        agent_name: str,
        task: str,
        step_idx: int
    ) -> tuple[int, str]:
        """Execute a single step."""
        
        agent = self.agents.get(agent_name)
        if not agent:
            return step_idx, f"Error: Agent {agent_name} not found"
        
        message = AgentMessage(
            sender=self.config.name,
            recipient=agent_name,
            content=task,
            message_type="task"
        )
        
        response = await agent.process(message)
        
        return step_idx, response.content
    
    async def _synthesize_results(
        self,
        goal: str,
        results: list[str]
    ) -> str:
        """Synthesize results into final output."""
        
        results_text = "\n\n".join([
            f"Step {i+1} result:\n{r}"
            for i, r in enumerate(results)
        ])
        
        prompt = f"""Synthesize these results into a final response.

Goal: {goal}

Results:
{results_text}

Provide a comprehensive final answer that addresses the original goal."""
        
        messages = [
            {"role": "system", "content": "You synthesize multiple agent outputs into coherent responses."},
            {"role": "user", "content": prompt}
        ]
        
        return await self._call_llm(messages)

ORCHESTRATOR_CONFIG = AgentConfig(
    name="orchestrator",
    role=AgentRole.ORCHESTRATOR,
    system_prompt="""You are an orchestrator agent. Your job is to:
- Break complex tasks into subtasks
- Assign tasks to appropriate specialist agents
- Coordinate parallel execution where possible
- Synthesize results into coherent outputs

Plan efficiently and delegate effectively."""
)

Communication Protocols

from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import asyncio
from collections import defaultdict

class MessagePriority(Enum):
    """Message priority levels."""
    
    LOW = 0
    NORMAL = 1
    HIGH = 2
    URGENT = 3

@dataclass
class PrioritizedMessage:
    """Message with priority."""
    
    message: AgentMessage
    priority: MessagePriority = MessagePriority.NORMAL
    timestamp: float = 0
    
    def __lt__(self, other):
        return (self.priority.value, -self.timestamp) < (other.priority.value, -other.timestamp)

class MessageBus:
    """Central message bus for agent communication."""
    
    def __init__(self):
        self._queues: dict[str, asyncio.PriorityQueue] = defaultdict(asyncio.PriorityQueue)
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)
        self._message_log: list[AgentMessage] = []
    
    async def send(
        self,
        message: AgentMessage,
        priority: MessagePriority = MessagePriority.NORMAL
    ):
        """Send message to recipient."""
        
        self._message_log.append(message)
        
        prioritized = PrioritizedMessage(
            message=message,
            priority=priority,
            timestamp=asyncio.get_event_loop().time()
        )
        
        await self._queues[message.recipient].put(prioritized)
        
        # Notify subscribers
        for callback in self._subscribers[message.recipient]:
            await callback(message)
    
    async def receive(self, agent_name: str) -> AgentMessage:
        """Receive next message for agent."""
        
        prioritized = await self._queues[agent_name].get()
        return prioritized.message
    
    def subscribe(self, agent_name: str, callback: Callable):
        """Subscribe to messages for an agent."""
        self._subscribers[agent_name].append(callback)
    
    def get_history(self, agent_name: str = None) -> list[AgentMessage]:
        """Get message history."""
        
        if agent_name:
            return [
                m for m in self._message_log
                if m.sender == agent_name or m.recipient == agent_name
            ]
        return self._message_log

class ConversationProtocol:
    """Protocol for multi-turn agent conversations."""
    
    def __init__(self, bus: MessageBus):
        self.bus = bus
        self._conversations: dict[str, list[AgentMessage]] = {}
    
    async def start_conversation(
        self,
        initiator: str,
        participants: list[str],
        topic: str
    ) -> str:
        """Start a new conversation."""
        
        conv_id = f"conv_{asyncio.get_event_loop().time()}"
        self._conversations[conv_id] = []
        
        # Notify participants
        for participant in participants:
            message = AgentMessage(
                sender=initiator,
                recipient=participant,
                content=f"Starting conversation about: {topic}",
                message_type="conversation_start",
                metadata={"conversation_id": conv_id}
            )
            await self.bus.send(message)
        
        return conv_id
    
    async def send_in_conversation(
        self,
        conv_id: str,
        message: AgentMessage
    ):
        """Send message in conversation context."""
        
        message.metadata["conversation_id"] = conv_id
        self._conversations[conv_id].append(message)
        await self.bus.send(message)
    
    def get_conversation(self, conv_id: str) -> list[AgentMessage]:
        """Get conversation history."""
        return self._conversations.get(conv_id, [])

class ConsensusProtocol:
    """Protocol for reaching consensus among agents."""
    
    def __init__(self, bus: MessageBus, agents: list[str]):
        self.bus = bus
        self.agents = agents
    
    async def propose(
        self,
        proposer: str,
        proposal: str
    ) -> dict[str, str]:
        """Propose something and collect votes."""
        
        votes = {}
        
        # Send proposal to all agents
        for agent in self.agents:
            if agent != proposer:
                message = AgentMessage(
                    sender=proposer,
                    recipient=agent,
                    content=proposal,
                    message_type="proposal"
                )
                await self.bus.send(message, MessagePriority.HIGH)
        
        # Collect votes (simplified - in practice would use async gathering)
        # This is a placeholder for the voting mechanism
        
        return votes
    
    async def vote(
        self,
        voter: str,
        proposer: str,
        vote: str,  # "approve", "reject", "abstain"
        reason: str = None
    ):
        """Cast a vote on a proposal."""
        
        message = AgentMessage(
            sender=voter,
            recipient=proposer,
            content=vote,
            message_type="vote",
            metadata={"reason": reason}
        )
        await self.bus.send(message)

Coordination Patterns

from dataclasses import dataclass
from typing import Any, Optional
import asyncio

class PipelineCoordinator:
    """Coordinate agents in a pipeline pattern."""
    
    def __init__(self, agents: list[BaseAgent]):
        self.agents = agents
    
    async def execute(self, initial_input: str) -> str:
        """Execute pipeline sequentially."""
        
        current_input = initial_input
        
        for agent in self.agents:
            message = AgentMessage(
                sender="pipeline",
                recipient=agent.config.name,
                content=current_input,
                message_type="task"
            )
            
            response = await agent.process(message)
            current_input = response.content
        
        return current_input

class ParallelCoordinator:
    """Coordinate agents in parallel."""
    
    def __init__(self, agents: list[BaseAgent]):
        self.agents = agents
    
    async def execute(self, task: str) -> list[str]:
        """Execute task on all agents in parallel."""
        
        tasks = []
        
        for agent in self.agents:
            message = AgentMessage(
                sender="parallel",
                recipient=agent.config.name,
                content=task,
                message_type="task"
            )
            tasks.append(agent.process(message))
        
        responses = await asyncio.gather(*tasks)
        
        return [r.content for r in responses]

class DebateCoordinator:
    """Coordinate agents in a debate pattern."""
    
    def __init__(
        self,
        agents: list[BaseAgent],
        judge: BaseAgent,
        max_rounds: int = 3
    ):
        self.agents = agents
        self.judge = judge
        self.max_rounds = max_rounds
    
    async def debate(self, topic: str) -> str:
        """Run a debate and get final verdict."""
        
        positions = []
        
        # Get initial positions
        for agent in self.agents:
            message = AgentMessage(
                sender="debate",
                recipient=agent.config.name,
                content=f"State your position on: {topic}",
                message_type="task"
            )
            response = await agent.process(message)
            positions.append({
                "agent": agent.config.name,
                "position": response.content
            })
        
        # Debate rounds
        for round_num in range(self.max_rounds):
            new_positions = []
            
            for i, agent in enumerate(self.agents):
                # Show other positions
                other_positions = [
                    p for j, p in enumerate(positions)
                    if j != i
                ]
                
                context = "\n".join([
                    f"{p['agent']}: {p['position']}"
                    for p in other_positions
                ])
                
                message = AgentMessage(
                    sender="debate",
                    recipient=agent.config.name,
                    content=f"Round {round_num + 1}. Other positions:\n{context}\n\nRespond and refine your position.",
                    message_type="task"
                )
                
                response = await agent.process(message)
                new_positions.append({
                    "agent": agent.config.name,
                    "position": response.content
                })
            
            positions = new_positions
        
        # Judge decides
        all_positions = "\n\n".join([
            f"{p['agent']}:\n{p['position']}"
            for p in positions
        ])
        
        judge_message = AgentMessage(
            sender="debate",
            recipient=self.judge.config.name,
            content=f"Topic: {topic}\n\nFinal positions:\n{all_positions}\n\nProvide your verdict.",
            message_type="task"
        )
        
        verdict = await self.judge.process(judge_message)
        
        return verdict.content

class HierarchicalCoordinator:
    """Coordinate agents in a hierarchy."""
    
    def __init__(
        self,
        leader: BaseAgent,
        teams: dict[str, list[BaseAgent]]
    ):
        self.leader = leader
        self.teams = teams
    
    async def execute(self, task: str) -> str:
        """Execute task through hierarchy."""
        
        # Leader creates team assignments
        team_names = ", ".join(self.teams.keys())
        
        assignment_message = AgentMessage(
            sender="hierarchy",
            recipient=self.leader.config.name,
            content=f"Task: {task}\n\nAvailable teams: {team_names}\n\nAssign subtasks to teams.",
            message_type="task"
        )
        
        assignments = await self.leader.process(assignment_message)
        
        # Parse and execute assignments (simplified)
        team_results = {}
        
        for team_name, agents in self.teams.items():
            # Execute within team
            coordinator = ParallelCoordinator(agents)
            results = await coordinator.execute(task)
            team_results[team_name] = results
        
        # Leader synthesizes
        results_text = "\n".join([
            f"{team}: {results}"
            for team, results in team_results.items()
        ])
        
        synthesis_message = AgentMessage(
            sender="hierarchy",
            recipient=self.leader.config.name,
            content=f"Team results:\n{results_text}\n\nSynthesize final answer.",
            message_type="task"
        )
        
        final = await self.leader.process(synthesis_message)
        
        return final.content

Production Multi-Agent Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# Initialize components
client = None  # Initialize with OpenAI client
message_bus = MessageBus()

# Create agents
agents = {}

class CreateAgentRequest(BaseModel):
    name: str
    role: str
    system_prompt: str
    model: str = "gpt-4o"

class TaskRequest(BaseModel):
    task: str
    coordinator: str = "orchestrator"  # orchestrator, pipeline, parallel, debate

class MessageRequest(BaseModel):
    sender: str
    recipient: str
    content: str
    message_type: str = "task"

@app.post("/v1/agents")
async def create_agent(request: CreateAgentRequest):
    """Create a new agent."""
    
    config = AgentConfig(
        name=request.name,
        role=AgentRole(request.role),
        system_prompt=request.system_prompt,
        model=request.model
    )
    
    agent = SpecialistAgent(config, client)
    agents[request.name] = agent
    
    return {
        "name": request.name,
        "role": request.role,
        "status": "created"
    }

@app.post("/v1/execute")
async def execute_task(request: TaskRequest):
    """Execute a task with coordination."""
    
    if request.coordinator == "orchestrator":
        orchestrator = OrchestratorAgent(
            ORCHESTRATOR_CONFIG,
            client,
            agents
        )
        
        message = AgentMessage(
            sender="user",
            recipient="orchestrator",
            content=request.task,
            message_type="task"
        )
        
        result = await orchestrator.process(message)
        
    elif request.coordinator == "pipeline":
        coordinator = PipelineCoordinator(list(agents.values()))
        result_content = await coordinator.execute(request.task)
        result = AgentMessage(
            sender="pipeline",
            recipient="user",
            content=result_content,
            message_type="result"
        )
        
    elif request.coordinator == "parallel":
        coordinator = ParallelCoordinator(list(agents.values()))
        results = await coordinator.execute(request.task)
        result = AgentMessage(
            sender="parallel",
            recipient="user",
            content="\n\n".join(results),
            message_type="result"
        )
    
    else:
        raise HTTPException(400, f"Unknown coordinator: {request.coordinator}")
    
    return {
        "task": request.task,
        "coordinator": request.coordinator,
        "result": result.content
    }

@app.post("/v1/messages")
async def send_message(request: MessageRequest):
    """Send a message between agents."""
    
    message = AgentMessage(
        sender=request.sender,
        recipient=request.recipient,
        content=request.content,
        message_type=request.message_type
    )
    
    await message_bus.send(message)
    
    return {"status": "sent", "message_id": id(message)}

@app.get("/v1/agents/{name}/state")
async def get_agent_state(name: str):
    """Get agent state."""
    
    if name not in agents:
        raise HTTPException(404, f"Agent not found: {name}")
    
    agent = agents[name]
    
    return {
        "name": agent.state.name,
        "status": agent.state.status,
        "current_task": agent.state.current_task,
        "results_count": len(agent.state.results)
    }

@app.get("/v1/messages/history")
async def get_message_history(agent: Optional[str] = None):
    """Get message history."""
    
    history = message_bus.get_history(agent)
    
    return {
        "messages": [
            {
                "sender": m.sender,
                "recipient": m.recipient,
                "content": m.content[:100] + "..." if len(m.content) > 100 else m.content,
                "type": m.message_type
            }
            for m in history
        ]
    }

@app.get("/health")
async def health():
    return {"status": "healthy", "agents": len(agents)}

References

Conclusion

Multi-agent systems unlock capabilities that single agents cannot achieve. Start with clear agent definitions—each agent should have a focused role with a specific system prompt that guides its behavior. Use orchestrator agents to coordinate complex tasks, breaking them into subtasks and delegating to specialists. Implement proper communication protocols—message buses, conversation tracking, and consensus mechanisms enable rich agent collaboration. Choose coordination patterns based on your task structure: pipelines for sequential processing, parallel execution for independent subtasks, debates for exploring multiple perspectives, and hierarchies for complex organizational structures. The key insight is that agents work best when they have clear boundaries and well-defined interfaces. Build your multi-agent system incrementally—start with two agents and add complexity as needed. With proper coordination, multiple specialized agents can tackle problems that would overwhelm any single agent.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.