Series Navigation: Part 1: Introduction | Part 2: Tools & Memory | Part 3: Multi-Agent Systems | Part 4: Production Deployment (Coming Soon)
Single agents excel at focused tasks, but complex problems demand collaboration. Multi-agent systems enable specialization: one agent searches, another summarizes, a third validates facts. Each agent becomes an expert in its domain, and the coordinator orchestrates the symphony.
This article shifts to reference-style documentation for architects building production multi-agent systems. We'll cover proven patterns, communication protocols, and a complete research assistant implementation, along with where parallel execution reduces its end-to-end latency.
Multi-Agent Architecture Patterns
Choose your architecture based on problem characteristics:
Pattern 1: Coordinator-Worker (Hub-and-Spoke)
Use When: Tasks can be parallelized, workers are stateless, results need aggregation.
Architecture:
- Central coordinator agent receives requests
- Distributes work to specialized worker agents
- Aggregates results and returns unified response
"""
Coordinator-Worker Pattern Implementation
"""
from typing import List, Dict, Any
from adk import Agent, AgentConfig
import asyncio
class WorkerAgent:
"""Specialized worker for specific tasks."""
def __init__(self, name: str, specialty: str):
self.agent = Agent(
config=AgentConfig(
name=name,
description=f"Specialized agent for {specialty}",
model_config={
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-flash', # Fast workers
'parameters': {'temperature': 0.3},
},
),
)
self.name = name # used in process() results
self.specialty = specialty
async def process(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""Process a task and return results."""
response = await self.agent.run(task['prompt'])
return {
'worker': self.name,
'specialty': self.specialty,
'result': response.content,
'metadata': task.get('metadata', {}),
}
class CoordinatorAgent:
"""Orchestrates work distribution and result aggregation."""
def __init__(self, workers: List[WorkerAgent]):
self.workers = {w.specialty: w for w in workers}
self.coordinator = Agent(
config=AgentConfig(
name="coordinator",
description="Orchestration agent for multi-agent system",
model_config={
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-pro-002', # Smart coordinator
'parameters': {'temperature': 0.5},
},
),
)
async def delegate_work(
self,
request: str,
) -> Dict[str, Any]:
"""
Analyze request, delegate to workers, aggregate results.
Pattern:
1. Coordinator analyzes request
2. Determines which workers to involve
3. Creates tasks for each worker
4. Executes tasks in parallel
5. Aggregates results
6. Synthesizes final response
"""
# Step 1: Analyze request and plan delegation
planning_prompt = f"""Analyze this request and determine which specialized
workers should handle it:
Request: {request}
Available workers:
{', '.join(self.workers.keys())}
Return a JSON plan with worker assignments."""
plan_response = await self.coordinator.run(planning_prompt)
# Parse plan (simplified - production would use structured output)
tasks = self._parse_plan(plan_response.content, request)
# Step 2: Execute tasks in parallel
worker_tasks = []
for task in tasks:
worker = self.workers.get(task['specialty'])
if worker:
worker_tasks.append(worker.process(task))
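# return_exceptions=True keeps one failing worker from aborting the batch;
# failed tasks are dropped so synthesis uses only successful results
gathered = await asyncio.gather(*worker_tasks, return_exceptions=True)
results = [r for r in gathered if not isinstance(r, BaseException)]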
# Step 3: Aggregate and synthesize
synthesis_prompt = f"""Synthesize these worker results into a cohesive answer:
Original Request: {request}
Worker Results:
{self._format_results(results)}
Provide a comprehensive, well-structured response."""
final_response = await self.coordinator.run(synthesis_prompt)
return {
'request': request,
'plan': tasks,
'worker_results': results,
'final_answer': final_response.content,
}
def _parse_plan(self, plan: str, request: str) -> List[Dict]:
"""Parse coordination plan into worker tasks."""
# Simplified - production uses structured output
return [
{
'specialty': specialty,
'prompt': f"Handle this aspect: {request}",
'metadata': {'original_request': request},
}
for specialty in self.workers.keys()
]
def _format_results(self, results: List[Dict]) -> str:
"""Format worker results for synthesis."""
formatted = []
for r in results:
formatted.append(f"**{r['specialty']}:** {r['result']}")
return '\n\n'.join(formatted)
# Example: Document analysis system
async def document_analysis_example():
"""Coordinator-Worker for document analysis."""
# Create specialized workers
workers = [
WorkerAgent('summarizer', 'summarization'),
WorkerAgent('sentiment', 'sentiment_analysis'),
WorkerAgent('entities', 'entity_extraction'),
WorkerAgent('topics', 'topic_modeling'),
]
# Create coordinator
coordinator = CoordinatorAgent(workers=workers)
# Process document
document = """
Q3 earnings exceeded expectations with revenue up 23% YoY.
Customer churn decreased to 2.1% from 3.8% last quarter.
New product launch received positive feedback from early adopters.
"""
result = await coordinator.delegate_work(
f"Analyze this quarterly report:\n\n{document}"
)
print("="*80)
print("COORDINATOR-WORKER PATTERN DEMO")
print("="*80)
print(f"\nOriginal Request: {result['request']}")
print(f"\nPlan: {len(result['plan'])} workers assigned")
print(f"\nWorker Results: {len(result['worker_results'])} completed")
print(f"\nFinal Answer:\n{result['final_answer']}")
if __name__ == "__main__":
asyncio.run(document_analysis_example())
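Benefits:
- Parallelization: Independent worker calls run concurrently, so latency approaches that of the slowest worker rather than the sum of all workers
- Specialization: Each worker is optimized for a specific domain
- Scalability: Add workers without changing the coordinator
- Fault Tolerance: A failed worker need not block the entire system, provided worker errors are captured (for example with asyncio.gather(..., return_exceptions=True))
Trade-offs: The coordinator is a single point of failure, and its extra planning and synthesis calls add overhead that can outweigh the gains on small tasks.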
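To see where the parallelization and fault-tolerance benefits come from, here is a minimal, self-contained sketch that needs no ADK. The hypothetical `fake_worker` coroutine stands in for `WorkerAgent.process`, with `asyncio.sleep` simulating model latency. Wall-clock time tracks the slowest worker rather than the sum, and `return_exceptions=True` keeps one failure from discarding the other results.

```python
import asyncio
import time
from typing import Any, Dict, List, Tuple


async def fake_worker(specialty: str, delay: float, fail: bool = False) -> Dict[str, Any]:
    """Hypothetical stand-in for WorkerAgent.process(); sleeps to simulate latency."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{specialty} worker failed")
    return {"specialty": specialty, "result": f"{specialty} done"}


async def fan_out(
    jobs: List[Tuple[str, float, bool]],
) -> Tuple[List[Dict[str, Any]], List[BaseException], float]:
    """Run all workers concurrently and split successes from failures."""
    start = time.perf_counter()
    gathered = await asyncio.gather(
        *(fake_worker(name, delay, fail) for name, delay, fail in jobs),
        return_exceptions=True,  # failures come back as values instead of raising
    )
    elapsed = time.perf_counter() - start
    ok = [r for r in gathered if not isinstance(r, BaseException)]
    errors = [r for r in gathered if isinstance(r, BaseException)]
    return ok, errors, elapsed


if __name__ == "__main__":
    jobs = [
        ("summarization", 0.2, False),
        ("sentiment_analysis", 0.2, False),
        ("entity_extraction", 0.2, True),  # simulated failure
        ("topic_modeling", 0.2, False),
    ]
    ok, errors, elapsed = asyncio.run(fan_out(jobs))
    print(f"{len(ok)} succeeded, {len(errors)} failed in {elapsed:.2f}s")
```

With four 0.2 s workers the run takes roughly 0.2 s instead of roughly 0.8 s sequentially. The achievable speedup is bounded by the number of independent tasks and by provider rate limits, so it grows with the worker count rather than being a fixed multiple.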
Pattern 2: Pipeline (Sequential Processing)
Use When: Tasks require sequential transformation, each stage builds on previous output.
"""
Pipeline Pattern Implementation
"""
import asyncio
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from adk import Agent, AgentConfig
@dataclass
class PipelineStage:
"""Single stage in processing pipeline."""
name: str
agent: Agent
description: str
required: bool = True
class AgentPipeline:
"""
Sequential agent pipeline for multi-stage processing.
Each agent processes output from previous stage.
Supports optional stages and error recovery.
"""
def __init__(self, stages: List[PipelineStage]):
self.stages = stages
async def execute(
self,
initial_input: str,
context: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Execute pipeline stages sequentially.
Returns:
Dict with results from each stage and final output
"""
results = {
'input': initial_input,
'context': context or {},
'stages': [],
'final_output': None,
}
current_input = initial_input
for i, stage in enumerate(self.stages):
try:
# Build stage-specific prompt
stage_prompt = f"""Previous stage output: {current_input}
Context: {results['context']}
Stage: {stage.name}
Task: {stage.description}
Process this input and provide output for next stage."""
# Execute stage
response = await stage.agent.run(stage_prompt)
# Record result
stage_result = {
'stage': stage.name,
'input': current_input,
'output': response.content,
'status': 'success',
}
results['stages'].append(stage_result)
# Output becomes input for next stage
current_input = response.content
except Exception as e:
# Handle stage failure
stage_result = {
'stage': stage.name,
'input': current_input,
'error': str(e),
'status': 'failed',
}
results['stages'].append(stage_result)
if stage.required:
# Required stage failed, abort pipeline
results['final_output'] = None
results['error'] = f"Required stage '{stage.name}' failed: {e}"
return results
# Optional stage failed, continue with previous input
continue
results['final_output'] = current_input
return results
# Example: Content creation pipeline
async def content_pipeline_example():
"""Pipeline for automated content creation."""
stages = [
PipelineStage(
name="research",
agent=Agent(
config=AgentConfig(
name="researcher",
description="Research topic and gather information",
model_config={'provider': 'vertex-ai', 'model_name': 'gemini-1.5-pro'},
),
),
description="Research the topic and compile key facts",
required=True,
),
PipelineStage(
name="outline",
agent=Agent(
config=AgentConfig(
name="outliner",
description="Create structured outline",
model_config={'provider': 'vertex-ai', 'model_name': 'gemini-1.5-flash'},
),
),
description="Create a detailed outline based on research",
required=True,
),
PipelineStage(
name="writing",
agent=Agent(
config=AgentConfig(
name="writer",
description="Write full content",
model_config={
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-pro',
'parameters': {'temperature': 0.7},
},
),
),
description="Write comprehensive article following the outline",
required=True,
),
PipelineStage(
name="editing",
agent=Agent(
config=AgentConfig(
name="editor",
description="Edit and refine content",
model_config={'provider': 'vertex-ai', 'model_name': 'gemini-1.5-flash'},
),
),
description="Edit for clarity, grammar, and style",
required=False, # Optional refinement
),
]
pipeline = AgentPipeline(stages=stages)
result = await pipeline.execute(
initial_input="Multi-agent AI systems in production",
context={'target_audience': 'architects', 'word_count': 1000},
)
print("="*80)
print("PIPELINE PATTERN DEMO")
print("="*80)
print(f"\nStages executed: {len(result['stages'])}")
for stage in result['stages']:
print(f"\n{stage['stage'].upper()}: {stage['status']}")
print(f"Output length: {len(stage.get('output', ''))}")
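# final_output is None when a required stage failed
if result['final_output']:
print(f"\nFinal output:\n{result['final_output'][:500]}...")
else:
print(f"\nPipeline aborted: {result.get('error')}")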
if __name__ == "__main__":
asyncio.run(content_pipeline_example())
Best Practices:
- Stage Independence: Each stage should be testable in isolation
- Error Propagation: Decide which stages are required and which are optional
- Checkpointing: Save intermediate results for debugging and resumption
- Idempotency: Re-running a stage on the same input should be safe, so a failed run can resume from a checkpoint
- Monitoring: Track latency and success rate per stage
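Checkpointing and idempotency work together: if each stage's output is persisted, a rerun can skip completed stages and resume where the last run stopped. The following is a minimal sketch, assuming stages are plain async callables (hypothetical stand-ins for `stage.agent.run`) and a local JSON file as the checkpoint store.

```python
import asyncio
import json
import tempfile
from pathlib import Path
from typing import Awaitable, Callable, Dict, List, Tuple

# Hypothetical stage signature: takes the previous output, returns the new output
StageFn = Callable[[str], Awaitable[str]]


async def run_with_checkpoints(
    stages: List[Tuple[str, StageFn]],
    initial_input: str,
    checkpoint_path: Path,
) -> str:
    """Run stages in order, persisting each output so a rerun can resume."""
    saved: Dict[str, str] = {}
    if checkpoint_path.exists():
        saved = json.loads(checkpoint_path.read_text())

    current = initial_input
    for name, fn in stages:
        if name in saved:
            current = saved[name]  # completed earlier: reuse instead of recomputing
            continue
        current = await fn(current)
        saved[name] = current
        # Write after every stage so a crash loses at most one stage of work
        checkpoint_path.write_text(json.dumps(saved))
    return current


if __name__ == "__main__":
    async def outline(text: str) -> str:
        return f"outline({text})"

    async def draft(text: str) -> str:
        return f"draft({text})"

    path = Path(tempfile.mkdtemp()) / "pipeline.json"
    stages = [("outline", outline), ("draft", draft)]
    print(asyncio.run(run_with_checkpoints(stages, "topic", path)))  # draft(outline(topic))
```

This sketch keys checkpoints by stage name only. A real store should also key on the pipeline input (for example a hash of the topic and context) so a new request never reuses stale outputs.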
Pattern 3: Hierarchical (Tree Structure)
Use When: Complex tasks require delegation, supervision, and aggregation at multiple levels.
"""
Hierarchical Multi-Agent Pattern
"""
import asyncio
from enum import Enum
from typing import Any, Dict, List, Optional
from adk import Agent, AgentConfig
class AgentRole(Enum):
MANAGER = "manager"
SUPERVISOR = "supervisor"
WORKER = "worker"
class HierarchicalAgent:
"""Agent with role-based capabilities in hierarchy."""
def __init__(
self,
name: str,
role: AgentRole,
subordinates: Optional[List['HierarchicalAgent']] = None,
):
self.name = name
self.role = role
self.subordinates = subordinates or []
# Configure agent based on role
model_config = self._get_model_config()
self.agent = Agent(
config=AgentConfig(
name=name,
description=f"{role.value} agent in hierarchical system",
model_config=model_config,
),
)
def _get_model_config(self) -> Dict:
"""Configure model based on role."""
if self.role == AgentRole.MANAGER:
# Manager needs strategic thinking
return {
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-pro-002',
'parameters': {'temperature': 0.6},
}
elif self.role == AgentRole.SUPERVISOR:
# Supervisor needs tactical planning
return {
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-pro',
'parameters': {'temperature': 0.4},
}
else: # WORKER
# Worker needs fast execution
return {
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-flash',
'parameters': {'temperature': 0.2},
}
async def process_task(
self,
task: str,
depth: int = 0,
) -> Dict[str, Any]:
"""
Process task following hierarchical delegation.
- Managers delegate to supervisors
- Supervisors delegate to workers
- Workers execute and report back
"""
if self.role == AgentRole.WORKER:
# Workers execute directly
response = await self.agent.run(task)
return {
'agent': self.name,
'role': self.role.value,
'result': response.content,
}
# Managers and supervisors delegate
delegation_prompt = f"""Break down this task for delegation:
Task: {task}
You have {len(self.subordinates)} subordinates.
Assign subtasks to each subordinate."""
plan = await self.agent.run(delegation_prompt)
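# Execute subtasks via subordinates; each one receives the delegation
# plan so it can carry out its own assignment
subtask_results = []
for subordinate in self.subordinates:
subtask = (
f"Delegation plan from {self.name}:\n{plan.content}\n\n"
f"As {subordinate.name}, complete your assigned part of this task: {task}"
)
result = await subordinate.process_task(subtask, depth + 1)
subtask_results.append(result)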
# Aggregate results
aggregation_prompt = f"""Aggregate these subordinate results:
Original Task: {task}
Subordinate Results:
{self._format_subordinate_results(subtask_results)}
Provide consolidated output."""
final_result = await self.agent.run(aggregation_prompt)
return {
'agent': self.name,
'role': self.role.value,
'subtasks': len(subtask_results),
'subordinate_results': subtask_results,
'result': final_result.content,
}
def _format_subordinate_results(self, results: List[Dict]) -> str:
"""Format results from subordinates."""
formatted = []
for r in results:
formatted.append(f"**{r['agent']}** ({r['role']}): {r['result']}")
return '\n\n'.join(formatted)
# Example: Software development team hierarchy
async def hierarchical_example():
"""Hierarchical agents for software development."""
# Create hierarchy
# Workers
backend_dev = HierarchicalAgent("Backend_Dev", AgentRole.WORKER)
frontend_dev = HierarchicalAgent("Frontend_Dev", AgentRole.WORKER)
testing_engineer = HierarchicalAgent("Test_Engineer", AgentRole.WORKER)
# Supervisor
tech_lead = HierarchicalAgent(
"Tech_Lead",
AgentRole.SUPERVISOR,
subordinates=[backend_dev, frontend_dev, testing_engineer],
)
# Manager
engineering_manager = HierarchicalAgent(
"Engineering_Manager",
AgentRole.MANAGER,
subordinates=[tech_lead],
)
# Execute project
project_task = """
Implement a user authentication feature with:
- REST API endpoints
- React frontend
- Unit and integration tests
"""
result = await engineering_manager.process_task(project_task)
print("="*80)
print("HIERARCHICAL PATTERN DEMO")
print("="*80)
print(f"\nTask delegated from: {result['agent']}")
print(f"Subtasks: {result['subtasks']}")
print(f"\nFinal Result:\n{result['result']}")
if __name__ == "__main__":
asyncio.run(hierarchical_example())
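`process_task` passes `depth` down the recursion but never checks it. A depth limit is a cheap safeguard against runaway delegation in deeper trees. The sketch below shows the same delegate-then-aggregate recursion with plain functions in place of model calls; the `Node` class, the `max_depth` parameter, and the string "results" are illustrative assumptions.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Hypothetical hierarchy member; leaves (no subordinates) do the work."""
    name: str
    subordinates: List["Node"] = field(default_factory=list)


def process(node: Node, task: str, depth: int = 0, max_depth: int = 3) -> str:
    """Delegate down the tree, then aggregate results on the way back up."""
    # Leaves execute directly; so does any node at the depth limit,
    # which stops further delegation instead of recursing indefinitely
    if not node.subordinates or depth >= max_depth:
        return f"{node.name}:{task}"
    child_results = [
        process(child, task, depth + 1, max_depth) for child in node.subordinates
    ]
    # Stand-in for the aggregation prompt: combine the children's outputs
    return f"{node.name}[" + ", ".join(child_results) + "]"


if __name__ == "__main__":
    lead = Node("Tech_Lead", [Node("Backend_Dev"), Node("Frontend_Dev")])
    manager = Node("Engineering_Manager", [lead])
    # Engineering_Manager[Tech_Lead[Backend_Dev:auth, Frontend_Dev:auth]]
    print(process(manager, "auth"))
    # Engineering_Manager[Tech_Lead:auth]  (Tech_Lead hits the limit and executes)
    print(process(manager, "auth", max_depth=1))
```

Each non-leaf level costs two model calls in the ADK version (plan and aggregate), so capping depth also caps cost and latency.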
Agent-to-Agent Communication
Message Bus Pattern
"""
Message bus for agent communication
"""
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timezone
import asyncio
from enum import Enum
class MessagePriority(Enum):
LOW = 0
NORMAL = 1
HIGH = 2
CRITICAL = 3
@dataclass
class Message:
"""Message passed between agents."""
sender: str
recipient: str
content: Any
message_type: str
priority: MessagePriority = MessagePriority.NORMAL
timestamp: Optional[datetime] = None
correlation_id: Optional[str] = None
def __post_init__(self):
if self.timestamp is None:
# Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
self.timestamp = datetime.now(timezone.utc)
class MessageBus:
"""
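Central message bus for agent communication.
Features:
- Direct, type-based, and broadcast ('all') subscriptions
- In-memory message history
- Dead letter queue for undeliverable messages
Priority is recorded on each message but does not reorder delivery here;
messages are dispatched as soon as they are published.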
"""
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = {}
self.message_history: List[Message] = []
self.dead_letter_queue: List[Message] = []
def subscribe(self, topic: str, handler: Callable):
"""Subscribe agent to topic."""
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(handler)
async def publish(self, message: Message):
"""Publish message to bus."""
self.message_history.append(message)
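# Determine topics. Broadcasts are addressed with recipient='all', so they
# reach 'all' subscribers through the recipient topic; direct messages must
# not also be copied to every broadcast subscriber.
topics = [
message.recipient, # Direct message (or 'all' for broadcast)
message.message_type, # Type-based
]
# Collect handlers, skipping duplicates so no handler runs twice per message
handlers = []
for topic in topics:
for handler in self.subscribers.get(topic, []):
if handler not in handlers:
handlers.append(handler)
if not handlers:
# No subscribers, move to dead letter queue
self.dead_letter_queue.append(message)
return
# Execute handlers concurrently (delivery order is not priority-based)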
try:
tasks = [handler(message) for handler in handlers]
await asyncio.gather(*tasks)
except Exception as e:
print(f"Error delivering message: {e}")
self.dead_letter_queue.append(message)
# Example: Collaborative agents
class CollaborativeAgent:
"""Agent that communicates via message bus."""
def __init__(self, name: str, bus: MessageBus):
self.name = name
self.bus = bus
self.inbox: List[Message] = []
# Subscribe to messages
self.bus.subscribe(self.name, self.receive_message)
self.bus.subscribe('all', self.receive_broadcast)
async def receive_message(self, message: Message):
"""Handle direct message."""
self.inbox.append(message)
print(f"{self.name} received from {message.sender}: {message.content}")
async def receive_broadcast(self, message: Message):
"""Handle broadcast message."""
if message.sender != self.name:
print(f"{self.name} saw broadcast from {message.sender}")
async def send_message(
self,
recipient: str,
content: Any,
priority: MessagePriority = MessagePriority.NORMAL,
):
"""Send message to another agent."""
message = Message(
sender=self.name,
recipient=recipient,
content=content,
message_type='direct',
priority=priority,
)
await self.bus.publish(message)
async def message_bus_example():
"""Demonstrate agent communication via message bus."""
bus = MessageBus()
# Create agents
alice = CollaborativeAgent("Alice", bus)
bob = CollaborativeAgent("Bob", bus)
charlie = CollaborativeAgent("Charlie", bus)
# Send messages
await alice.send_message("Bob", "Can you review my code?", MessagePriority.HIGH)
await bob.send_message("Alice", "Sure, sending feedback", MessagePriority.NORMAL)
await charlie.send_message("all", "Team meeting at 2pm", MessagePriority.CRITICAL)
# publish() awaits its handlers, so delivery has already completed here
print(f"\nMessage history: {len(bus.message_history)} messages")
print(f"Dead letter queue: {len(bus.dead_letter_queue)} messages")
if __name__ == "__main__":
asyncio.run(message_bus_example())
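`MessageBus.publish` delivers each message as soon as it is published, so `priority` is recorded but never affects ordering. One way to make priority matter is to buffer messages in an `asyncio.PriorityQueue` and drain it in a dispatcher. This is a minimal sketch using a simplified, hypothetical `QueuedMessage` rather than the `Message` class above; the integer `priority` mirrors `MessagePriority.value`.

```python
import asyncio
import itertools
from dataclasses import dataclass, field
from typing import Any, List


@dataclass(order=True)
class QueuedMessage:
    # Compared first: priority is negated so higher values are dequeued first
    sort_key: int
    # Tie-breaker that preserves FIFO order among equal priorities
    seq: int
    content: Any = field(compare=False)


class PriorityDispatcher:
    """Buffers messages and hands them out highest-priority first."""

    def __init__(self) -> None:
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._counter = itertools.count()

    async def publish(self, content: Any, priority: int) -> None:
        await self.queue.put(QueuedMessage(-priority, next(self._counter), content))

    async def drain(self) -> List[Any]:
        """Deliver everything currently queued, in priority order."""
        delivered = []
        while not self.queue.empty():
            msg = await self.queue.get()
            delivered.append(msg.content)
            self.queue.task_done()
        return delivered


async def priority_demo() -> List[Any]:
    bus = PriorityDispatcher()
    await bus.publish("review request", priority=2)  # HIGH
    await bus.publish("feedback", priority=1)        # NORMAL
    await bus.publish("team meeting", priority=3)    # CRITICAL
    await bus.publish("second high", priority=2)     # HIGH
    return await bus.drain()


if __name__ == "__main__":
    # ['team meeting', 'review request', 'second high', 'feedback']
    print(asyncio.run(priority_demo()))
```

A long-running dispatcher would loop on `await queue.get()` instead of stopping when the queue is empty. Note that strict priority ordering can starve low-priority messages under sustained load.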
Communication Patterns Reference: Google Cloud Pub/Sub for Agent Coordination
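The `Message` dataclass carries a `correlation_id` that the examples never use. Its usual purpose is request/reply over an asynchronous channel: the requester tags a request with a fresh ID and parks a future under that ID, and the reply carrying the same ID resolves it. What follows is a minimal in-process sketch; the `RequestReplyClient` class, its methods, and the simulated transport are hypothetical.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Dict


class RequestReplyClient:
    """Matches incoming replies to pending requests by correlation ID."""

    def __init__(self) -> None:
        self.pending: Dict[str, asyncio.Future] = {}

    async def request(
        self,
        send: Callable[[str], Awaitable[None]],
        timeout: float = 5.0,
    ) -> Any:
        """Send a request via send(correlation_id) and await the matching reply."""
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = future
        try:
            await send(correlation_id)
            return await asyncio.wait_for(future, timeout)
        finally:
            # Clean up on success, timeout, or error alike
            self.pending.pop(correlation_id, None)

    def on_reply(self, correlation_id: str, content: Any) -> None:
        """Called by the transport when a reply arrives."""
        future = self.pending.get(correlation_id)
        # Late or duplicate replies (unknown or finished IDs) are ignored
        if future is not None and not future.done():
            future.set_result(content)


async def request_reply_demo() -> Any:
    client = RequestReplyClient()

    async def send(correlation_id: str) -> None:
        # Stand-in transport: a reviewer agent replies 50 ms later,
        # echoing the correlation ID it received
        loop = asyncio.get_running_loop()
        loop.call_later(0.05, client.on_reply, correlation_id, "LGTM")

    return await client.request(send, timeout=1.0)


if __name__ == "__main__":
    print(asyncio.run(request_reply_demo()))  # LGTM
```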
Complete Case Study: Research Assistant System
Let's build a production-grade multi-agent research assistant that combines the coordinator and pipeline patterns:
"""
research_assistant_system.py
Production multi-agent research assistant with ADK.
Architecture:
- Coordinator Agent (orchestration)
- Search Agent (web + academic papers)
- Summarization Agent (extract key points)
- Synthesis Agent (generate comprehensive report)
- Quality Checker Agent (fact validation)
- Formatting Agent (markdown output)
"""
from typing import Dict, List, Any
from dataclasses import dataclass
from adk import Agent, AgentConfig
from adk.tools import GoogleSearchTool
from adk.memory import ConversationMemory, VectorMemory
import asyncio
@dataclass
class ResearchQuery:
"""Research request from user."""
topic: str
depth: str = "comprehensive" # 'quick', 'medium', 'comprehensive'
max_sources: int = 10
include_academic: bool = True
output_format: str = "markdown"
class SearchAgent:
"""Specialized agent for search operations."""
def __init__(self):
self.search_tool = GoogleSearchTool(max_results=10)
self.agent = Agent(
config=AgentConfig(
name="search-agent",
description="Expert at finding relevant information",
model_config={
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-flash', # Fast search
},
),
tools=[self.search_tool],
)
async def search(self, query: ResearchQuery) -> Dict[str, Any]:
"""Execute search strategy."""
prompt = f"""Find the most relevant sources for: {query.topic}
Search for:
1. Recent developments (last 6 months)
2. Authoritative sources
3. Technical depth matching '{query.depth}' level
{'4. Academic papers and research' if query.include_academic else ''}
Return top {query.max_sources} sources with:
- Title
- URL
- Brief summary
- Relevance score"""
response = await self.agent.run(prompt)
return {
'query': query.topic,
'sources_requested': query.max_sources, # requested count, not verified results
'search_results': response.content,
}
class SummarizationAgent:
"""Extract key points from sources."""
def __init__(self):
self.agent = Agent(
config=AgentConfig(
name="summarization-agent",
description="Expert at extracting key information",
model_config={
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-pro',
'parameters': {'temperature': 0.3}, # Factual
},
),
)
async def summarize(self, search_results: str) -> Dict[str, Any]:
"""Summarize search results."""
prompt = f"""Extract key points from these sources:
{search_results}
For each source:
1. Main thesis/finding
2. Supporting evidence
3. Relevance to topic
4. Quality assessment
Format as structured bullet points."""
response = await self.agent.run(prompt)
return {
'summaries': response.content,
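# Approximate: count lines that start with a common bullet marker
'key_points_count': sum(
1 for line in response.content.splitlines()
if line.lstrip().startswith(('-', '*', '•'))
),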
}
class SynthesisAgent:
"""Generate comprehensive research report."""
def __init__(self):
self.agent = Agent(
config=AgentConfig(
name="synthesis-agent",
description="Expert at synthesizing information into reports",
model_config={
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-pro-002',
'parameters': {'temperature': 0.7}, # Creative synthesis
},
),
)
async def synthesize(
self,
topic: str,
summaries: str,
depth: str,
) -> Dict[str, Any]:
"""Synthesize report from summaries."""
prompt = f"""Create a {depth} research report on: {topic}
Based on these key points:
{summaries}
Structure:
1. Executive Summary
2. Background & Context
3. Current State
4. Key Findings (organized by theme)
5. Implications
6. Conclusion
Style: Technical but accessible
Length: {'3000 words' if depth == 'comprehensive' else '1500 words'}"""
response = await self.agent.run(prompt)
return {
'report': response.content,
'word_count': len(response.content.split()),
}
class QualityCheckerAgent:
"""Validate facts and check quality."""
def __init__(self):
self.agent = Agent(
config=AgentConfig(
name="quality-checker",
description="Expert at fact-checking and quality assurance",
model_config={
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-pro',
'parameters': {'temperature': 0.1}, # Very factual
},
),
)
async def validate(self, report: str) -> Dict[str, Any]:
"""Validate report quality."""
prompt = f"""Review this research report for:
{report}
Check:
1. Factual accuracy (flag unverified claims)
2. Logical consistency
3. Citation quality
4. Bias detection
5. Completeness
Provide:
- Quality score (1-10)
- Issues found
- Recommendations"""
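response = await self.agent.run(prompt)
# Parse the numeric score rather than matching exact strings such as
# "Quality score: 8", which miss variants like "**Quality score:** 8/10"
import re
match = re.search(r'quality score[^:\n]*:\W*(\d+)', response.content, re.IGNORECASE)
score = int(match.group(1)) if match else 0 # unparseable -> not approved
return {
'validation_report': response.content,
'quality_score': score,
'approved': score >= 8,
}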
class FormattingAgent:
"""Format report with citations and structure."""
def __init__(self):
self.agent = Agent(
config=AgentConfig(
name="formatting-agent",
description="Expert at document formatting",
model_config={
'provider': 'vertex-ai',
'model_name': 'gemini-1.5-flash',
},
),
)
async def format_report(
self,
report: str,
format_type: str,
) -> Dict[str, Any]:
"""Format final report."""
prompt = f"""Format this report in {format_type}:
{report}
Add:
- Proper heading hierarchy
- Table of contents
- Citations in [Source: ...] format
- Code blocks where appropriate
- Tables for data
- Clean, professional formatting"""
response = await self.agent.run(prompt)
return {
'formatted_report': response.content,
'format': format_type,
}
class ResearchAssistantCoordinator:
"""
Coordinates multi-agent research system.
Workflow:
1. Search for sources (web, plus academic papers when requested)
2. Summarize the search results
3. Synthesize into report
4. Quality check
5. Format output
"""
def __init__(self):
# Initialize specialized agents
self.search_agent = SearchAgent()
self.summarizer = SummarizationAgent()
self.synthesizer = SynthesisAgent()
self.quality_checker = QualityCheckerAgent()
self.formatter = FormattingAgent()
# Vector memory intended for caching (not yet used by research() below)
self.memory = VectorMemory(
index_endpoint='your-vertex-ai-index',
embedding_model='textembedding-gecko@003',
)
async def research(self, query: ResearchQuery) -> Dict[str, Any]:
"""Execute complete research workflow."""
print(f"🔍 Starting research on: {query.topic}")
start_time = asyncio.get_event_loop().time()
workflow = {
'query': query,
'stages': [],
'final_report': None,
}
try:
# Stage 1: Search
print(" Stage 1/5: Searching sources...")
stage_start = asyncio.get_event_loop().time()
search_results = await self.search_agent.search(query)
workflow['stages'].append({
'name': 'search',
'duration_seconds': asyncio.get_event_loop().time() - stage_start,
'result': search_results,
})
# Stage 2: Summarization
print(" Stage 2/5: Summarizing sources...")
stage_start = asyncio.get_event_loop().time()
summaries = await self.summarizer.summarize(
search_results['search_results']
)
workflow['stages'].append({
'name': 'summarization',
'duration_seconds': asyncio.get_event_loop().time() - stage_start,
'result': summaries,
})
# Stage 3: Synthesis
print(" Stage 3/5: Synthesizing report...")
stage_start = asyncio.get_event_loop().time()
synthesis = await self.synthesizer.synthesize(
topic=query.topic,
summaries=summaries['summaries'],
depth=query.depth,
)
workflow['stages'].append({
'name': 'synthesis',
'duration_seconds': asyncio.get_event_loop().time() - stage_start,
'result': synthesis,
})
# Stage 4: Quality Check
print(" Stage 4/5: Quality validation...")
stage_start = asyncio.get_event_loop().time()
validation = await self.quality_checker.validate(
synthesis['report']
)
workflow['stages'].append({
'name': 'quality_check',
'duration_seconds': asyncio.get_event_loop().time() - stage_start,
'result': validation,
})
if not validation['approved']:
print(" ⚠️ Quality check failed, report needs revision")
# In production: retry synthesis with feedback
# Stage 5: Formatting
print(" Stage 5/5: Formatting output...")
stage_start = asyncio.get_event_loop().time()
formatted = await self.formatter.format_report(
report=synthesis['report'],
format_type=query.output_format,
)
workflow['stages'].append({
'name': 'formatting',
'duration_seconds': asyncio.get_event_loop().time() - stage_start,
'result': formatted,
})
workflow['final_report'] = formatted['formatted_report']
except Exception as e:
workflow['error'] = str(e)
print(f" ❌ Error: {e}")
total_duration = asyncio.get_event_loop().time() - start_time
workflow['total_duration_seconds'] = total_duration
print(f"✅ Research complete in {total_duration:.2f}s")
return workflow
# Example usage
async def main():
"""Demonstrate research assistant system."""
# Create coordinator
assistant = ResearchAssistantCoordinator()
# Define research query
query = ResearchQuery(
topic="Multi-agent AI systems in production: patterns and challenges",
depth="comprehensive",
max_sources=8,
include_academic=True,
output_format="markdown",
)
# Execute research
result = await assistant.research(query)
# Display results
print("\n" + "="*80)
print("RESEARCH ASSISTANT SYSTEM - RESULTS")
print("="*80)
print(f"\nQuery: {result['query'].topic}")
print(f"\nWorkflow Stages:")
for stage in result['stages']:
print(f" {stage['name']}: {stage['duration_seconds']:.2f}s")
print(f"\nTotal Duration: {result['total_duration_seconds']:.2f}s")
if result['final_report']:
print(f"\nFinal Report ({len(result['final_report'])} chars):")
print(result['final_report'][:500] + "...")
else:
print(f"\nError: {result.get('error')}")
if __name__ == "__main__":
asyncio.run(main())
| Stage | Sequential | Optimized |
|---|---|---|
| Search | 3.0s | 3.0s |
| Summarization | 2.5s | 0.8s (parallel) |
| Synthesis | 4.0s | 4.0s |
| Quality Check | 2.0s | 1.5s (cached) |
| Formatting | 1.0s | 0.5s |
| Total | 12.5s | 9.8s (22% faster) |
Further optimization: Cache search results, parallelize summarization across sources, and use Gemini 1.5 Flash for non-critical stages.
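Parallelizing summarization across sources can be sketched as a bounded fan-out: each source is summarized concurrently, with a semaphore capping in-flight calls so a burst of sources stays within the model provider's rate limits. `summarize_source` is a hypothetical stand-in for a per-source `SummarizationAgent` call, and the limit of 4 is an arbitrary example value.

```python
import asyncio
import time
from typing import List


async def summarize_source(source: str) -> str:
    """Hypothetical stand-in for a per-source SummarizationAgent call."""
    await asyncio.sleep(0.05)  # simulated model latency
    return f"summary of {source}"


async def summarize_all(sources: List[str], max_concurrency: int = 4) -> List[str]:
    """Summarize sources concurrently, never more than max_concurrency at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(source: str) -> str:
        async with semaphore:
            return await summarize_source(source)

    # gather preserves input order, so summaries line up with their sources
    return await asyncio.gather(*(bounded(s) for s in sources))


if __name__ == "__main__":
    sources = [f"source-{i}" for i in range(8)]
    start = time.perf_counter()
    summaries = asyncio.run(summarize_all(sources))
    print(f"{len(summaries)} summaries in {time.perf_counter() - start:.2f}s")
```

With eight sources and a limit of four, the calls run in two waves instead of eight sequential steps. The per-source summaries would then be joined and passed to the synthesis stage.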
Production Orchestration Patterns
Load Balancing Agents
"""
Load balancing for agent pools
"""
from typing import Any, List
from adk import Agent
class AgentPool:
"""Pool of agents with load balancing."""
def __init__(self, agents: List[Agent], strategy: str = "round_robin"):
self.agents = agents
self.strategy = strategy
self.current_index = 0
self.agent_loads = {id(agent): 0 for agent in agents}
async def execute(self, task: str) -> Any:
"""Execute task using load balancing."""
if self.strategy == "round_robin":
agent = self.agents[self.current_index]
self.current_index = (self.current_index + 1) % len(self.agents)
elif self.strategy == "least_loaded":
agent = min(self.agents, key=lambda a: self.agent_loads[id(a)])
else:
agent = self.agents[0]
# Track load
agent_id = id(agent)
self.agent_loads[agent_id] += 1
try:
result = await agent.run(task)
return result
finally:
self.agent_loads[agent_id] -= 1
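Round-robin ignores how busy each agent is, while least-loaded reads the in-flight counters and so adapts when some agents respond more slowly than others. The sketch below isolates the least-loaded selection rule and exercises it with hypothetical `StubAgent` objects (an assumed `async run(task)` with simulated latency) in place of ADK agents.

```python
import asyncio
from typing import Dict, List


class StubAgent:
    """Hypothetical agent: run() sleeps for a fixed latency and counts calls."""

    def __init__(self, name: str, latency: float) -> None:
        self.name = name
        self.latency = latency
        self.calls = 0

    async def run(self, task: str) -> str:
        self.calls += 1
        await asyncio.sleep(self.latency)
        return f"{self.name}:{task}"


class LeastLoadedPool:
    """Sends each task to the agent with the fewest in-flight tasks."""

    def __init__(self, agents: List[StubAgent]) -> None:
        self.agents = agents
        self.in_flight: Dict[int, int] = {id(a): 0 for a in agents}

    async def execute(self, task: str) -> str:
        # min() returns the first agent on ties, so list order breaks ties
        agent = min(self.agents, key=lambda a: self.in_flight[id(a)])
        self.in_flight[id(agent)] += 1
        try:
            return await agent.run(task)
        finally:
            self.in_flight[id(agent)] -= 1


async def least_loaded_demo() -> Dict[str, int]:
    fast, slow = StubAgent("fast", 0.005), StubAgent("slow", 0.2)
    pool = LeastLoadedPool([fast, slow])
    tasks = []
    for i in range(10):
        # Stagger submissions so completed calls free capacity between tasks
        tasks.append(asyncio.create_task(pool.execute(f"t{i}")))
        await asyncio.sleep(0.03)
    await asyncio.gather(*tasks)
    return {"fast": fast.calls, "slow": slow.calls}


if __name__ == "__main__":
    print(asyncio.run(least_loaded_demo()))
```

Because the fast agent is usually idle again before the next task arrives, least-loaded typically routes all or nearly all tasks to it, whereas round-robin over two agents would split ten tasks 5/5 regardless of latency.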
Health Monitoring
"""
Agent health monitoring
"""
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
import time
from adk import Agent
@dataclass
class AgentHealth:
agent_name: str
status: str # 'healthy', 'degraded', 'unhealthy'
last_success: Optional[datetime]
error_rate: float
avg_latency_ms: float
class HealthMonitor:
"""Monitor agent health and performance."""
def __init__(self):
# Most recent successful check per agent name
self.metrics = {}
async def check_agent(self, agent: Agent) -> AgentHealth:
"""Check single agent health with a one-off probe request."""
name = agent.config.name
try:
start = time.perf_counter()
await agent.run("health check")
latency = (time.perf_counter() - start) * 1000
now = datetime.now(timezone.utc)
# Record the success so a later failed check can still report it
self.metrics[name] = {'last_success': now}
return AgentHealth(
agent_name=name,
status='healthy',
last_success=now,
error_rate=0.0,
avg_latency_ms=latency,
)
except Exception:
return AgentHealth(
agent_name=name,
status='unhealthy',
last_success=self.metrics.get(name, {}).get('last_success'),
error_rate=1.0, # a single probe can only report 0.0 or 1.0
avg_latency_ms=0.0,
)
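A single probe can only report an error rate of 0 or 1, and the `'degraded'` status is never produced. Tracking outcomes over a sliding window yields a real error rate and average latency from which all three statuses can be derived. The following is a minimal sketch; the `RollingHealth` class, the window size, and the 10% and 50% thresholds are illustrative assumptions, not recommended values.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, Tuple


@dataclass
class HealthSnapshot:
    status: str  # 'healthy', 'degraded', 'unhealthy'
    error_rate: float
    avg_latency_ms: float


class RollingHealth:
    """Keeps the last `window` call outcomes for one agent."""

    def __init__(self, window: int = 20, degraded_at: float = 0.1, unhealthy_at: float = 0.5) -> None:
        # Each entry is (succeeded, latency_ms); deque evicts the oldest automatically
        self.outcomes: Deque[Tuple[bool, float]] = deque(maxlen=window)
        self.degraded_at = degraded_at
        self.unhealthy_at = unhealthy_at

    def record(self, succeeded: bool, latency_ms: float) -> None:
        self.outcomes.append((succeeded, latency_ms))

    def snapshot(self) -> HealthSnapshot:
        if not self.outcomes:
            return HealthSnapshot("healthy", 0.0, 0.0)  # no data yet: assume healthy
        failures = sum(1 for ok, _ in self.outcomes if not ok)
        error_rate = failures / len(self.outcomes)
        # Average latency over successful calls only
        latencies = [ms for ok, ms in self.outcomes if ok]
        avg_latency = sum(latencies) / len(latencies) if latencies else 0.0
        if error_rate >= self.unhealthy_at:
            status = "unhealthy"
        elif error_rate >= self.degraded_at:
            status = "degraded"
        else:
            status = "healthy"
        return HealthSnapshot(status, error_rate, avg_latency)


if __name__ == "__main__":
    monitor = RollingHealth(window=10)
    for i in range(10):
        monitor.record(succeeded=(i % 5 != 0), latency_ms=120.0)  # 2 of 10 fail
    print(monitor.snapshot())  # status='degraded', error_rate=0.2
```

A `HealthMonitor` could feed `record()` from every real agent call rather than only from synthetic probes, which gives a truer picture of production behavior.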
Monitoring Reference: Cloud Monitoring for Agent Systems
Key Takeaways
🎓 Advanced Multi-Agent Patterns
Architecture Selection:
- Coordinator-Worker: Parallelizable tasks; latency tracks the slowest worker rather than the sum of all workers
- Pipeline: Sequential transformation, quality at each stage
- Hierarchical: Complex delegation, multi-level aggregation
- Mesh: Peer-to-peer collaboration, decentralized
Communication:
- Message bus for async, decoupled agents
- Priority-based delivery for critical tasks
- Topic subscriptions for broadcast patterns
Production:
- Load balancing (round-robin, least-loaded)
- Health monitoring and auto-recovery
- Observability across agent network
Discover more from C4: Container, Code, Cloud & Context