Introduction
Single agents are powerful. Multi-agent systems are transformative. When multiple AI agents work together, they can tackle problems no single agent could solve - dividing complex tasks, specializing in different domains, and collaborating like a team.
This guide covers everything about building multi-agent systems: architectures, communication patterns, orchestration strategies, and real-world implementations.
Why Multi-Agent Systems?
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ SINGLE vs MULTI-AGENT โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ SINGLE AGENT MULTI-AGENT โ
โ โโโโโโโโโโโโโ โโโโโโโโโโโโโโ โ
โ โ
โ โโโโโโโโโโโ โโโโโโโโโโโ โ
โ โ Agent โ โ Coord โ โ
โ โโโโโโฌโโโโโ โโโโโโฌโโโโโ โ
โ โ โ โ
โ โผ โผ โ
โ โโโโโโโโโโโ โโโโโโโโฌโโโโโโโฌโโโโโโโ โ
โ โ Complex โ โ Ag1 โ Ag2 โ Ag3 โ โ
โ โ Task โ โโโโฌโโโโโโโฌโโโโโโโฌโโโโ โ
โ โ โ โ โ โ โ
โ โ Fails! โ โผ โผ โผ โ
โ โโโโโโโโโโโ โโโโโโโโโโโโโโโโ โ
โ โ Combined โ โ
โ โ Result โ โ
โ โโโโโโโโโโโโโโโโ โ
โ โ
โ Limited by: Benefits: โ
โ โข Context window โข Specialization โ
โ โข Single expertise โข Parallel execution โ
โ โข Single viewpoint โข Redundancy โ
โ โข No collaboration โข Scalability โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Architecture Patterns
1. Hierarchical Architecture
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ HIERARCHICAL AGENT SYSTEM โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโ โ
โ โ CEO โ (Strategic) โ
โ โ Agent โ โ
โ โโโโโโฌโโโโโ โ
โ โ โ
โ โโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโโ โ
โ โ โ โ โ
โ โผ โผ โผ โ
โ โโโโโโโโโโ โโโโโโโโโโ โโโโโโโโโโ โ
โ โ Eng โ โ Prod โ โ Ops โ (Tactical) โ
โ โ Lead โ โ Lead โ โ Lead โ โ
โ โโโโโฌโโโโโ โโโโโโฌโโโโโ โโโโโโฌโโโโโ โ
โ โ โ โ โ
โ โโโโโโโโโโผโโโโโโโโโ โ โโโโโโโโผโโโโโโโ โ
โ โ โ โ โ โ โ โ โ
โ โผ โผ โผ โผ โผ โผ โผ โ
โ โโโโโโ โโโโโโ โโโโโโ โโโโโโ โโโโโโ โโโโโโ โโโโโโ โ
โ โAg1 โ โAg2 โ โAg3 โ โAg4 โ โAg5 โ โAg6 โ โAg7 โ (Operational)
โ โโโโโโ โโโโโโ โโโโโโ โโโโโโ โโโโโโ โโโโโโ โโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# Hierarchical agent implementation
class HierarchicalMultiAgent:
def __init__(self):
# Define hierarchy
self.ceo = Agent(name="CEO", role="strategic")
self.department_leads = {
"engineering": Agent(name="EngLead", role="tactical"),
"product": Agent(name="ProdLead", role="tactical"),
"operations": Agent(name="OpsLead", role="tactical")
}
self.teams = {
"engineering": [Agent(f"Engineer-{i}") for i in range(3)],
"product": [Agent(f"PM-{i}") for i in range(2)],
"operations": [Agent(f"Ops-{i}") for i in range(2)]
}
async def process_request(self, request: str) -> Response:
# CEO determines strategy
strategy = await self.ceo.analyze(request)
# Route to appropriate department
if strategy.department == "engineering":
return await self.handle_engineering(strategy)
elif strategy.department == "product":
return await self.handle_product(strategy)
# ...
async def handle_engineering(self, strategy):
lead = self.department_leads["engineering"]
# Lead breaks into tasks
tasks = await lead.decompose(strategy)
# Team executes in parallel
results = await asyncio.gather(*[
agent.execute(task)
for agent, task in zip(self.teams["engineering"], tasks)
])
# Lead synthesizes results
return await lead.synthesize(results)
2. Network Architecture
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ NETWORK AGENT SYSTEM โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโ โ
โ โโโโ Hub โโโโ โ
โ โ โ Agent โ โ โ
โ โ โโโโโโโโโโโ โ โ
โ โ โ โ
โ โโโโโโโโโโโโโโผโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโ โ
โ โ โ โ โ โ
โ โผ โผ โผ โผ โ
โ โโโโโโโโโโ โโโโโโโโโโ โโโโโโโโโโ โโโโโโโโโโ โ
โ โ Search โ โ Code โ โ Data โ โ Web โ โ
โ โ Agent โ โ Agent โโโโโโถโ Agent โโโโ Agent โ โ
โ โโโโโโโโโโ โโโโโโโโโโ โโโโโโโโโโ โโโโโโโโโโ โ
โ โ โ โ โ โ
โ โโโโโโโโโโโโโโดโโโโโโโโโโโโโโโโดโโโโโโโโโโโโโ โ
โ โ โ
โ โผ โ
โ โโโโโโโโโโโ โ
โ โ Result โ โ
โ โ Aggregatโ โ
โ โโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
3. Committee Architecture
# Multiple agents vote on decisions
class CommitteeMultiAgent:
def __init__(self, agents: list):
self.agents = agents
async def make_decision(self, question: str) -> Decision:
# Get opinions from all agents
opinions = await asyncio.gather(*[
agent.think(question) for agent in self.agents
])
# Aggregate opinions
return self.vote(opinions)
def vote(self, opinions: list) -> Decision:
# Majority vote
votes = {}
for opinion in opinions:
votes[opinion.choice] = votes.get(opinion.choice, 0) + 1
winner = max(votes, key=votes.get)
return Decision(
choice=winner,
confidence=votes[winner] / len(opinions),
opinions=opinions
)
Communication Patterns
1. Message Passing
# Agent message passing
from dataclasses import dataclass
from typing import Optional
@dataclass
class AgentMessage:
sender: str
receiver: str
content: Any
type: str # request, response, broadcast
thread_id: str
class MessageBus:
def __init__(self):
self.inbox = defaultdict(list)
async def send(self, message: AgentMessage):
self.inbox[message.receiver].append(message)
async def broadcast(self, sender: str, content: Any):
message = AgentMessage(
sender=sender,
receiver="*", # broadcast
content=content,
type="broadcast",
thread_id=uuid.uuid4()
)
# Send to all except sender
for agent_name in self.agents:
if agent_name != sender:
await self.send(message)
async def receive(self, receiver: str) -> list:
messages = self.inbox[receiver]
self.inbox[receiver] = []
return messages
# Agent using message bus
class CommunicatingAgent:
def __init__(self, name: str, bus: MessageBus):
self.name = name
self.bus = bus
async def request_help(self, target: str, request: str):
msg = AgentMessage(
sender=self.name,
receiver=target,
content=request,
type="request"
)
await self.bus.send(msg)
async def respond_to(self, request: AgentMessage, response: str):
msg = AgentMessage(
sender=self.name,
receiver=request.sender,
content=response,
type="response",
thread_id=request.thread_id
)
await self.bus.send(msg)
2. Shared State
# Agents share state via distributed store
class SharedStateManager:
def __init__(self):
self.state = {}
self.lock = asyncio.Lock()
async def read(self, key: str) -> Any:
return self.state.get(key)
async def write(self, key: str, value: Any):
async with self.lock:
self.state[key] = value
async def update(self, key: str, updater: callable):
async with self.lock:
old = self.state.get(key)
new = updater(old)
self.state[key] = new
# Usage
state = SharedStateManager()
class SharedAgent:
def __init__(self, name: str, state: SharedStateManager):
self.name = name
self.state = state
async def contribute(self, key: str, data: dict):
await self.state.update(key, lambda current: {
**(current or {}),
self.name: data
})
3. Blackboard Pattern
# Shared blackboard for problem solving
class Blackboard:
def __init__(self):
self.content = {}
self.subscribers = []
def subscribe(self, agent, interest: str):
self.subscribers.append((agent, interest))
async def post(self, source: str, content: dict):
self.content[source] = content
# Notify interested agents
for agent, interest in self.subscribers:
if interest in content:
await agent.notify(content)
class BlackboardAgent:
def __init__(self, name: str, blackboard: Blackboard, interests: list):
self.name = name
self.blackboard = blackboard
for interest in interests:
blackboard.subscribe(self, interest)
async def notify(self, content: dict):
# Process new information
result = await self.process(content)
if result:
await self.blackboard.post(self.name, result)
Specialization Strategies
1. Role-Based Agents
# Define specialized agents
specialists = {
"researcher": Agent(
name="Researcher",
role="Find information",
system_prompt="""You are a research specialist.
Your job is to find accurate, relevant information.""",
tools=["web_search", "browse", "read_pdf"]
),
"coder": Agent(
name="Coder",
role="Write code",
system_prompt="""You are a coding specialist.
Your job is to write clean, correct code.""",
tools=["read_file", "write_file", "execute_code"]
),
"reviewer": Agent(
name="Reviewer",
role="Review and critique",
system_prompt="""You are a review specialist.
Your job is to find issues and improve quality.""",
tools=["analyze_code", "run_tests"]
),
"writer": Agent(
name="Writer",
role="Create content",
system_prompt="""You are a writing specialist.
Your job is to create clear, engaging content.""",
tools=["write_file", "format_markdown"]
)
}
# Workflow with specialists
async def code_review_pipeline(code: str):
# Research
context = await specialists["researcher"].execute(
"Find best practices for this code pattern"
)
# Write with context
code = await specialists["coder"].execute(
f"Write code considering: {context}"
)
# Review
issues = await specialists["reviewer"].execute(code)
# Fix
if issues:
code = await specialists["coder"].execute(
f"Fix these issues: {issues}"
)
# Document
docs = await specialists["writer"].execute(
f"Document this code: {code}"
)
return {"code": code, "docs": docs}
2. Dynamic Agent Selection
class DynamicRouter:
def __init__(self, agents: dict):
self.agents = agents
async def route(self, task: str) -> Agent:
# Analyze task requirements
requirements = await self.analyze_task(task)
# Score each agent
scores = {}
for name, agent in self.agents.items():
score = await self.score_agent(agent, requirements)
scores[name] = score
# Select best agent
best = max(scores, key=scores.get)
return self.agents[best]
async def score_agent(self, agent: Agent, requirements: dict) -> float:
# Simple heuristic scoring
score = 0
# Check tool match
for req_tool in requirements.get("tools", []):
if req_tool in agent.tools:
score += 1
# Check domain match
for keyword in requirements.get("keywords", []):
if keyword in agent.system_prompt.lower():
score += 0.5
return score
Collaboration Patterns
1. Sequential Pipeline
# Agents work in sequence
async def sequential_pipeline(task: str, agents: list):
result = task
for agent in agents:
result = await agent.execute(result)
return result
# Example: Research -> Write -> Edit -> Publish
workflow = sequential_pipeline(
"Write about AI agents",
agents=[
research_agent, # Gather information
outline_agent, # Create structure
writer_agent, # Write content
editor_agent, # Edit and refine
publisher_agent # Format and publish
]
)
2. Parallel Execution
# Agents work simultaneously
async def parallel_pipeline(task: str, agents: list):
# All agents work on same task
results = await asyncio.gather(*[
agent.execute(task) for agent in agents
])
# Combine results
return combine_results(results)
# Example: Multiple perspectives
perspectives = await parallel_pipeline(
"Analyze this investment",
agents=[
risk_agent, # Analyze risks
opportunity_agent, # Find opportunities
compliance_agent, # Check compliance
financial_agent # Model financials
]
)
3. Iterative Refinement
# Agents refine each other's work
async def iterative_refinement(task: str, agent_a: Agent, agent_b: Agent, iterations: int = 3):
current = await agent_a.execute(task)
for _ in range(iterations):
# Agent B critiques
feedback = await agent_b.review(current)
# Agent A improves
current = await agent_a.improve(current, feedback)
# Check if converged
if feedback.is_acceptable:
break
return current
# Example: Writer/Editor
final_draft = await iterative_refinement(
article,
writer=writer_agent,
editor=editor_agent,
iterations=5
)
Real-World Examples
1. Customer Support Team
support_team = {
"triage": Agent(
name="Triage Agent",
role="Route inquiries",
tools=["classify_intent", "extract_entities"]
),
"technical": Agent(
name="Technical Support",
role="Solve technical issues",
tools=["search_kb", "run_diagnostics", "reset_password"]
),
"billing": Agent(
name="Billing Support",
role="Handle payments",
tools=["check_balance", "process_refund", "update_subscription"]
),
"escalation": Agent(
name="Escalation Manager",
role="Handle complex cases",
tools=["summarize_case", "notify_human"]
)
}
async def handle_support_ticket(ticket):
# Triage first
category = await support_team["triage"].classify(ticket)
# Route to specialist
if category == "technical":
result = await support_team["technical"].solve(ticket)
elif category == "billing":
result = await support_team["billing"].resolve(ticket)
# Escalate if needed
if result.needs_escalation:
await support_team["escalation"].notify(result)
return result
2. Development Team
dev_team = {
"architect": Agent(
name="System Architect",
role="Design systems"
),
"backend": Agent(
name="Backend Developer",
role="Build APIs"
),
"frontend": Agent(
name="Frontend Developer",
role="Build UI"
),
"qa": Agent(
name="QA Engineer",
role="Test"
)
}
async def build_feature(feature_spec):
# Design
design = await dev_team["architect"].design(feature_spec)
# Split work
backend_spec, frontend_spec = design.split()
# Parallel development
backend, frontend = await asyncio.gather(
dev_team["backend"].implement(backend_spec),
dev_team["frontend"].implement(frontend_spec)
)
# Integration
integrated = await dev_team["backend"].integrate(backend, frontend)
# Test
test_results = await dev_team["qa"].test(integrated)
return test_results
Coordination Mechanisms
1. Task Allocation
class TaskAllocator:
def __init__(self, agents: list):
self.agents = agents
self.assignments = {}
async def allocate(self, tasks: list) -> dict:
# Score each task for each agent
scores = []
for task in tasks:
for agent in self.agents:
score = await self.score(task, agent)
scores.append((task, agent, score))
# Greedy assignment
scores.sort(key=lambda x: x[2], reverse=True)
assignments = {}
used_agents = set()
for task, agent, score in scores:
if agent not in used_agents:
assignments[task] = agent
used_agents.add(agent)
return assignments
async def score(self, task: Task, agent: Agent) -> float:
# Consider: skill match, availability, past performance
return 1.0 # Simplified
2. Conflict Resolution
class ConflictResolver:
def resolve(self, agent_outputs: list) -> Any:
# Different strategies
# 1. Voting
return self.vote(agent_outputs)
# 2. Weighted voting
return self.weighted_vote(agent_outputs)
# 3. Consensus
return self.consensus(agent_outputs)
# 4. Arbitration (designated agent decides)
return self.arbitrate(agent_outputs)
def vote(self, outputs: list) -> Any:
counts = {}
for output in outputs:
counts[output] = counts.get(output, 0) + 1
return max(counts, key=counts.get)
Scaling Considerations
1. Agent Pool Management
class AgentPool:
def __init__(self, agent_factory, size: int):
self.pool = asyncio.Queue()
self.size = size
# Pre-populate
for _ in range(size):
self.pool.put_nowait(agent_factory())
async def acquire(self, timeout: float = 30) -> Agent:
try:
return await asyncio.wait_for(
self.pool.get(),
timeout=timeout
)
except asyncio.TimeoutError:
# Scale up
agent = await self.create_agent()
return agent
async def release(self, agent: Agent):
await self.pool.put(agent)
2. Load Balancing
class LoadBalancer:
def __init__(self, agents: list):
self.agents = agents
self.current = 0
def get_next(self) -> Agent:
# Round-robin
agent = self.agents[self.current]
self.current = (self.current + 1) % len(self.agents)
return agent
def get_least_loaded(self) -> Agent:
# Pick agent with fewest active tasks
return min(self.agents, key=lambda a: a.active_tasks)
Conclusion
Multi-agent systems unlock capabilities beyond single agents:
- Specialization - Agents excel at specific tasks
- Collaboration - Agents work together effectively
- Scalability - Add more agents for more throughput
- Robustness - Redundancy prevents single points of failure
- Flexibility - Dynamic routing and task allocation
Choose your architecture based on your use case: hierarchical for structured organizations, network for flexible collaboration, committee for critical decisions.
Related Articles
- Agent-to-Agent Protocol: A2A
- Model Context Protocol: MCP
- AI Agent Frameworks Comparison
- Introduction to Agentic AI
Comments