Introduction
Single AI agents are powerful, but multi-agent systems extend what they can do through collaboration, specialization, and emergent behavior. This guide covers fundamental multi-agent concepts through production implementation patterns.
The shift is dramatic. Gartner reports a 1,445% surge in multi-agent system inquiries from early 2024 to mid-2025. By the end of 2026, 40% of enterprise applications are expected to feature task-specific AI agents, up from less than 5% in 2025. AI agents are projected to generate $450 billion in economic value by 2028, yet only 2% of organizations have deployed them at full scale. The infrastructure for coordinated agents has matured: three protocols (MCP, A2A, and ACP) now make interoperable agent communication practical.
Understanding Multi-Agent AI Systems
What are Multi-Agent AI Systems?
Multi-agent AI systems consist of multiple AI agents that work together to solve complex problems. Each agent has a specific role and set of capabilities, and can communicate with other agents to share information and coordinate actions.
graph TB
subgraph "Multi-Agent System"
User[User Request]
subgraph "Agent Network"
PM[Project Manager Agent]
Dev[Developer Agent]
QA[QA Tester Agent]
Docs[Documentation Agent]
end
DB[(Shared Knowledge)]
User --> PM
PM --> Dev
PM --> QA
PM --> Docs
Dev <--> DB
QA <--> DB
Docs <--> DB
Dev --> PM
QA --> PM
Docs --> PM
end
Why Multi-Agent Systems?
| Aspect | Single Agent | Multi-Agent |
|---|---|---|
| Complexity | Limited to single capability | Handles diverse tasks |
| Expertise | Generalist | Specialists working together |
| Scalability | Hard to scale | Add agents for capacity |
| Reliability | Single point of failure | Fault tolerance |
| Emergence | Fixed behavior | Adaptive collaboration |
Key Multi-Agent Concepts
| Concept | Description |
|---|---|
| Agent Role | Specific function or expertise (e.g., researcher, coder, reviewer) |
| Agent Communication | Message passing between agents |
| Shared Memory | Common knowledge base all agents can access |
| Orchestration | Coordinating agent workflows |
| Tool Sharing | Agents can use shared tools and resources |
Protocol Standards for Multi-Agent Systems
Three standards have converged to make production multi-agent systems practical in 2026:
MCP (Model Context Protocol): Introduced by Anthropic. Defines how agents discover and invoke external tools, databases, and APIs. Instead of each framework building custom integrations, MCP provides a single interface: tools implement it once, and agents consume it uniformly. For a deeper look, see our MCP Complete Guide 2026.
A2A (Agent-to-Agent): Developed by Google. Enables agents to discover each other, delegate tasks, negotiate, and share results without a central orchestrator. Supports ad-hoc agent teams that form and dissolve around specific tasks.
ACP (Agent Communication Protocol): From IBM. Provides governance frameworks for enterprise deployment: authentication, authorization, audit trails, and compliance validation built into inter-agent communication.
A2A Message Format
{
  "jsonrpc": "2.0",
  "id": "task-123",
  "method": "agents/tasks/create",
  "params": {
    "agent_id": "research-agent",
    "task": {
      "id": "task-123",
      "description": "Research the latest AI trends",
      "context": {
        "user_id": "user-456",
        "session_id": "session-789"
      },
      "priority": "high"
    },
    "input_data": {
      "topic": "AI agents 2026",
      "depth": "detailed"
    }
  }
}
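The request above uses the JSON-RPC 2.0 envelope (`jsonrpc`, `id`, `method`, `params`). As a minimal sketch, the envelope can be built and sanity-checked with the standard library alone. The helper names `build_task_request` and `validate_envelope` are illustrative assumptions, and the `agents/tasks/create` method name is copied from the example above rather than taken from a published specification.

```python
import json
from typing import Any

REQUIRED_FIELDS = ("jsonrpc", "id", "method", "params")


def build_task_request(task_id: str, agent_id: str, description: str,
                       input_data: dict[str, Any]) -> dict[str, Any]:
    """Build a request shaped like the example message (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": task_id,
        "method": "agents/tasks/create",  # method name copied from the example
        "params": {
            "agent_id": agent_id,
            "task": {"id": task_id, "description": description},
            "input_data": input_data,
        },
    }


def validate_envelope(message: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the envelope looks well-formed."""
    problems = [f"missing field: {name}"
                for name in REQUIRED_FIELDS if name not in message]
    if message.get("jsonrpc") != "2.0":
        problems.append("jsonrpc must be the string '2.0'")
    if "params" in message and not isinstance(message["params"], dict):
        problems.append("params must be an object")
    return problems


request = build_task_request(
    "task-123", "research-agent",
    "Research the latest AI trends", {"topic": "AI agents 2026"},
)
wire = json.dumps(request)  # serialize for transport
```

Validating on receipt (`validate_envelope(json.loads(wire))`) lets an agent reject malformed messages before it tries to interpret the task.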
A2A Agent Base Implementation
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List


class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class AgentTask:
    id: str
    agent_id: str
    description: str
    input_data: Dict[str, Any]
    # default_factory gives every task its own context dict
    context: Dict[str, Any] = field(default_factory=dict)
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None


class A2AAgent:
    def __init__(self, agent_id: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.message_queue: List[Dict[str, Any]] = []

    async def _send_message(self, message: Dict[str, Any]) -> str:
        """Placeholder transport: queue the message and return its ID.
        Swap in a real HTTP or message-bus call for production use."""
        self.message_queue.append(message)
        return message["id"]

    async def send_task(self, target_agent: str, task: AgentTask) -> str:
        message = {
            "jsonrpc": "2.0", "id": task.id,
            "method": "agents/tasks/create",
            "params": {
                "agent_id": target_agent,
                "task": {
                    "id": task.id,
                    "description": task.description,
                    "input_data": task.input_data,
                    "context": task.context
                }
            }
        }
        return await self._send_message(message)

    async def receive_task(self, message: Dict[str, Any]) -> AgentTask:
        params = message.get("params", {})
        task_data = params.get("task", {})
        return AgentTask(
            id=task_data.get("id"),
            agent_id=params.get("agent_id"),
            description=task_data.get("description"),
            input_data=task_data.get("input_data", {}),
            context=task_data.get("context") or {}
        )

    async def send_response(self, task_id: str, result: Any) -> str:
        message = {
            "jsonrpc": "2.0", "id": task_id,
            "method": "agents/tasks/result",
            "params": {"task_id": task_id, "status": "completed", "result": result}
        }
        return await self._send_message(message)
Multi-Agent Architectures
Architecture 1: Hierarchical
# Hierarchical Multi-Agent Architecture
import asyncio
import json


class ProjectManagerAgent:
    """Top-level agent that coordinates sub-agents"""

    def __init__(self, llm):
        # llm: any client exposing `async complete(prompt) -> str`
        self.llm = llm
        self.sub_agents = {
            'researcher': ResearcherAgent(),
            'developer': DeveloperAgent(),
            'reviewer': ReviewerAgent(),
            'writer': DocumentationAgent()
        }

    async def process_request(self, user_request: str) -> dict:
        # Analyze request and decompose into tasks
        tasks = await self.decompose_request(user_request)
        # Assign tasks to specialized agents
        results = await self.execute_tasks_parallel(tasks)
        # Synthesize results
        return await self.synthesize_results(results)

    async def decompose_request(self, request: str) -> list[dict]:
        """Break down request into executable tasks"""
        # Use LLM to decompose
        decomposition_prompt = f"""
        Decompose this request into discrete tasks:
        {request}
        Return a JSON list of tasks with:
        - task_id: unique identifier
        - description: what to do
        - required_role: which agent should handle it
        - dependencies: which tasks must complete first
        """
        tasks = await self.llm.complete(decomposition_prompt)
        return json.loads(tasks)

    def _can_execute(self, task: dict, tasks: list[dict]) -> bool:
        """A task is ready when it lists no dependencies"""
        return not task.get('dependencies')

    async def execute_tasks_parallel(self, tasks: list[dict]) -> dict:
        """Execute tasks with appropriate agents"""
        # Identify independent tasks
        ready_tasks = [t for t in tasks if self._can_execute(t, tasks)]
        # Execute ready tasks in parallel
        coroutines = []
        for task in ready_tasks:
            agent = self.sub_agents[task['required_role']]
            coroutines.append(agent.execute(task))
        results = await asyncio.gather(*coroutines)
        return {t['task_id']: r for t, r in zip(ready_tasks, results)}
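`execute_tasks_parallel` above runs only the tasks that are ready when it is called. One way to honor the `dependencies` field is to run tasks in waves until none remain. The sketch below assumes each task is a dict with `task_id` and `dependencies` keys, as in the decomposition prompt. `run_in_waves` and `fake_agent` are hypothetical names, with `fake_agent` standing in for a real `agent.execute(task)` call.

```python
import asyncio
from typing import Any, Awaitable, Callable

Task = dict[str, Any]


async def run_in_waves(tasks: list[Task],
                       run_task: Callable[[Task], Awaitable[Any]]) -> dict[str, Any]:
    """Run tasks concurrently, wave by wave, respecting their dependencies."""
    pending = {t["task_id"]: t for t in tasks}
    results: dict[str, Any] = {}
    while pending:
        # A task is ready once every dependency already has a result.
        ready = [t for t in pending.values()
                 if all(dep in results for dep in t.get("dependencies", []))]
        if not ready:
            # Nothing can run: a cycle or a reference to an unknown task.
            raise ValueError(f"Unresolvable dependencies: {sorted(pending)}")
        outputs = await asyncio.gather(*(run_task(t) for t in ready))
        for task, output in zip(ready, outputs):
            results[task["task_id"]] = output
            del pending[task["task_id"]]
    return results


async def fake_agent(task: Task) -> str:
    """Stand-in for agent.execute(task); a real agent would call an LLM here."""
    await asyncio.sleep(0)
    return f"done:{task['task_id']}"


demo_tasks = [
    {"task_id": "write", "dependencies": ["research"]},
    {"task_id": "research", "dependencies": []},
    {"task_id": "review", "dependencies": ["write"]},
]

if __name__ == "__main__":
    print(asyncio.run(run_in_waves(demo_tasks, fake_agent)))
```

Tasks within a wave run concurrently and waves run in order, so the approach trades some parallelism for simplicity. A scheduler that starts each task the moment its own dependencies finish would be faster but needs more bookkeeping.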
Architecture 2: Collaborative/Chat-Based
# Collaborative Multi-Agent Chat Architecture
class ChatBasedMultiAgent:
"""Agents collaborate through a shared chat"""
def __init__(self, agents: list[BaseAgent]):
self.agents = agents
self.message_queue = asyncio.Queue()
self.shared_context = SharedContext()
async def run_discussion(self, topic: str, max_turns: int = 10):
"""Run agent discussion on a topic"""
# Initialize discussion
await self.broadcast(Message(
type=MessageType.SYSTEM,
content=f"Discussion topic: {topic}"
))
# Run discussion rounds
for turn in range(max_turns):
for agent in self.agents:
# Agent considers current context
response = await agent.respond(
topic=topic,
context=self.shared_context.get_context(agent.id),
message_history=self.shared_context.get_history()
)
# Broadcast response
await self.broadcast(Message(
type=MessageType.AGENT,
sender=agent.id,
content=response
))
# Update shared context
self.shared_context.add_message(agent.id, response)
# Check for consensus
if self._check_consensus():
return self.shared_context.get_consensus()
return self.shared_context.get_final_summary()
async def broadcast(self, message: Message):
"""Send message to all agents"""
for agent in self.agents:
await agent.receive_message(message)
Architecture 3: Sequential Pipeline
# Sequential Pipeline Architecture
import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)


class PipelineError(Exception):
    """Raised when a stage produces invalid output"""


class PipelineMultiAgent:
    """Agents process data in a pipeline"""

    def __init__(self, pipeline: list[tuple[str, "BaseAgent"]]):
        """
        Args:
            pipeline: List of (stage_name, agent) tuples
        """
        self.pipeline = pipeline

    def _validate_output(self, data: Any) -> bool:
        """Minimal check: a stage must return something non-empty"""
        return data is not None and data != ""

    async def process(self, input_data: Any) -> Any:
        """Process input through pipeline"""
        current_data = input_data
        for stage_name, agent in self.pipeline:
            # Log pipeline stage
            logger.info(f"Pipeline stage: {stage_name}")
            # Process with agent
            current_data = await agent.process(current_data)
            # Validate output
            if not self._validate_output(current_data):
                raise PipelineError(
                    f"Agent {stage_name} produced invalid output"
                )
        return current_data


# Example: Content Creation Pipeline
async def main():
    content_pipeline = PipelineMultiAgent([
        ('research', ResearcherAgent()),
        ('outline', OutlinerAgent()),
        ('write', WriterAgent()),
        ('edit', EditorAgent()),
        ('format', FormatterAgent()),
    ])
    # Run pipeline
    result = await content_pipeline.process("Write about quantum computing")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
Architecture 4: Network (Hub-and-Spoke)
              NETWORK AGENT SYSTEM

                  ┌───────────┐
                  │ Hub Agent │
                  └─────┬─────┘
      ┌───────────┬─────┴─────┬───────────┐
      ▼           ▼           ▼           ▼
  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐
  │ Search │  │  Code  │  │  Data  │  │  Web   │
  │ Agent  │  │ Agent  │─▶│ Agent  │──│ Agent  │
  └───┬────┘  └───┬────┘  └───┬────┘  └───┬────┘
      └───────────┴─────┬─────┴───────────┘
                        ▼
              ┌───────────────────┐
              │ Result Aggregator │
              └───────────────────┘
A central hub agent routes requests to specialized agents and aggregates results. Agents can also communicate directly with each other for peer-to-peer collaboration. This pattern balances centralized coordination with flexible inter-agent communication, making it suitable for systems where agents need both structure and autonomy.
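The routing and aggregation half of this pattern can be sketched in a few lines. The example assumes a deliberately naive routing rule (a spoke is selected when its keyword appears in the request), and `Hub`, `search_agent`, and `code_agent` are hypothetical names. A production hub would more likely use an LLM or a capability registry to choose spokes.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]


class Hub:
    """Routes a request to every spoke whose keyword appears in it, then aggregates."""

    def __init__(self) -> None:
        self.spokes: dict[str, Handler] = {}

    def register(self, keyword: str, handler: Handler) -> None:
        self.spokes[keyword] = handler

    async def handle(self, request: str) -> dict[str, str]:
        text = request.lower()
        selected = {k: h for k, h in self.spokes.items() if k in text}
        if not selected:
            raise LookupError(f"No agent can handle: {request!r}")
        # Fan out to the selected spokes concurrently, then collect by keyword.
        answers = await asyncio.gather(*(h(request) for h in selected.values()))
        return dict(zip(selected.keys(), answers))


async def search_agent(request: str) -> str:
    return f"search results for '{request}'"


async def code_agent(request: str) -> str:
    return f"code draft for '{request}'"


hub = Hub()
hub.register("search", search_agent)
hub.register("code", code_agent)

if __name__ == "__main__":
    print(asyncio.run(hub.handle("Search the docs and write code")))
```

The sketch leaves out the direct agent-to-agent links mentioned above. Those could be added by handing each spoke a reference to the peers it is allowed to call.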
Architecture 5: Committee (Voting-Based)
# Committee architecture - multiple agents vote on decisions
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Decision:
    choice: Any
    confidence: float  # share of agents that voted for the winning choice
    opinions: list


class CommitteeMultiAgent:
    def __init__(self, agents: list):
        self.agents = agents

    async def make_decision(self, question: str) -> Decision:
        # Get opinions from all agents
        opinions = await asyncio.gather(*[
            agent.think(question) for agent in self.agents
        ])
        # Aggregate opinions
        return self.vote(opinions)

    def vote(self, opinions: list) -> Decision:
        if not opinions:
            raise ValueError("Cannot vote without opinions")
        # Plurality vote; ties go to the choice that was seen first
        votes = {}
        for opinion in opinions:
            votes[opinion.choice] = votes.get(opinion.choice, 0) + 1
        winner = max(votes, key=votes.get)
        return Decision(
            choice=winner,
            confidence=votes[winner] / len(opinions),
            opinions=opinions
        )
Architecture 6: Role-Based Specialization
# Define specialized agents by role
specialists = {
"researcher": Agent(
name="Researcher",
role="Find information",
system_prompt="""You are a research specialist.
Your job is to find accurate, relevant information.""",
tools=["web_search", "browse", "read_pdf"]
),
"coder": Agent(
name="Coder",
role="Write code",
system_prompt="""You are a coding specialist.
Your job is to write clean, correct code.""",
tools=["read_file", "write_file", "execute_code"]
),
"reviewer": Agent(
name="Reviewer",
role="Review and critique",
system_prompt="""You are a review specialist.
Your job is to find issues and improve quality.""",
tools=["analyze_code", "run_tests"]
),
"writer": Agent(
name="Writer",
role="Create content",
system_prompt="""You are a writing specialist.
Your job is to create clear, engaging content.""",
tools=["write_file", "format_markdown"]
)
}
# Workflow with specialists
async def code_review_pipeline(task: str):
    # Research best practices
    context = await specialists["researcher"].execute(
        f"Find best practices for: {task}"
    )
    # Write code with context
    code = await specialists["coder"].execute(
        f"Write code for: {task}\nConsider: {context}"
    )
    # Review
    issues = await specialists["reviewer"].execute(code)
    # Fix issues
    if issues:
        code = await specialists["coder"].execute(
            f"Fix these issues: {issues}\nIn this code: {code}"
        )
    # Document
    docs = await specialists["writer"].execute(
        f"Document this code: {code}"
    )
    return {"code": code, "docs": docs}
Architecture 7: Decentralized (Choreography)
Agents communicate peer-to-peer with no central orchestrator. This is more resilient than centralized orchestration because there is no single point of failure, but it is harder to debug. Google’s A2A protocol supports this pattern natively.
Agent A ◄────► Agent B
   ▲              ▲
   │              │
   ▼              ▼
Agent C ◄────► Agent D
Best for systems where agents must operate autonomously without a central coordinator, such as distributed sensor networks or peer-to-peer marketplaces.
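As a rough illustration of coordination without a coordinator, the sketch below wires four peers in the same ring as the diagram and lets a task travel from neighbor to neighbor until some peer has the required skill. The `Peer` class and the skill names are assumptions made for this example. It uses direct method calls in one process, not the A2A protocol itself.

```python
from typing import Optional


class Peer:
    """An agent that knows only its direct neighbors."""

    def __init__(self, name: str, skills: set[str]) -> None:
        self.name = name
        self.skills = skills
        self.neighbors: list["Peer"] = []

    def connect(self, other: "Peer") -> None:
        """Create a bidirectional link between two peers."""
        self.neighbors.append(other)
        other.neighbors.append(self)

    def handle(self, skill: str, visited: Optional[set[str]] = None) -> Optional[str]:
        """Handle the task locally or forward it to neighbors not yet visited."""
        visited = set() if visited is None else visited
        visited.add(self.name)
        if skill in self.skills:
            return f"{self.name} handled '{skill}'"
        for neighbor in self.neighbors:
            if neighbor.name not in visited:
                answer = neighbor.handle(skill, visited)
                if answer is not None:
                    return answer
        return None  # no reachable peer offers this skill


a, b = Peer("A", {"plan"}), Peer("B", {"search"})
c, d = Peer("C", {"code"}), Peer("D", {"review"})
a.connect(b)
a.connect(c)
b.connect(d)
c.connect(d)

if __name__ == "__main__":
    print(a.handle("review"))
```

The `visited` set stops a request from circulating forever in the ring. It also hints at why debugging is harder: the path a task takes depends on the topology, not on a central plan.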
Architecture 8: Event-Driven
Agents coordinate through events and messages, reacting to changes in real-time. Agents subscribe to specific event types and respond when relevant events occur, enabling loose coupling and dynamic scalability.
from typing import Any


class EventDrivenCoordinator:
    def __init__(self):
        self.subscribers: dict[str, list] = {}

    def subscribe(self, event_type: str, agent):
        self.subscribers.setdefault(event_type, []).append(agent)

    async def emit(self, event_type: str, data: Any):
        # Handlers run one after another, in subscription order
        for agent in self.subscribers.get(event_type, []):
            await agent.handle_event(event_type, data)
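The coordinator above awaits each subscriber in turn, so a slow or failing handler delays or blocks the others. A possible variation, sketched here with plain async callables instead of agent objects, dispatches handlers concurrently and isolates failures. `EventBus` and the handler signature are assumptions for illustration.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[str, Any], Awaitable[Any]]


class EventBus:
    """Dispatches each event to all of its subscribers concurrently."""

    def __init__(self) -> None:
        self.subscribers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self.subscribers[event_type].append(handler)

    async def emit(self, event_type: str, data: Any) -> list[Any]:
        handlers = list(self.subscribers.get(event_type, []))
        # return_exceptions=True keeps one failing handler from hiding the
        # results of the others; failures come back as exception objects.
        return await asyncio.gather(
            *(handler(event_type, data) for handler in handlers),
            return_exceptions=True,
        )


async def count_records(event_type: str, data: Any) -> str:
    return f"{event_type}:{data['records']}"


async def broken_handler(event_type: str, data: Any) -> str:
    raise ValueError("boom")


bus = EventBus()
bus.subscribe("new_data", count_records)
bus.subscribe("new_data", broken_handler)
```

Callers can inspect the returned list and log or retry any entry that is an exception instance.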
Popular Multi-Agent Frameworks
AutoGen
Microsoft’s AutoGen enables multi-agent conversation:
#!/usr/bin/env python3
"""AutoGen Multi-Agent Example"""
import asyncio
import os

from autogen import AssistantAgent, UserProxyAgent
# Configure LLM
llm_config = {
"model": "gpt-4",
"api_key": os.environ["OPENAI_API_KEY"],
}
# Create assistant agent (expert developer)
coder = AssistantAgent(
name="coder",
llm_config=llm_config,
system_message="""You are an expert Python developer.
Write clean, well-documented code.
Always consider error handling and edge cases."""
)
# Create assistant agent (code reviewer)
reviewer = AssistantAgent(
name="reviewer",
llm_config=llm_config,
system_message="""You are a senior code reviewer.
Review code for:
- Security vulnerabilities
- Performance issues
- Code quality
- Test coverage
Provide constructive feedback."""
)
# Create user proxy (human in the loop)
user_proxy = UserProxyAgent(
name="user",
human_input_mode="TERMINATE",
max_consecutive_auto_reply=10,
)
# Define conversation flow
@user_proxy.register_for_execution()
@coder.register_for_llm(description="Write Python code for a given task")
def write_code(task: str) -> str:
    """Generate Python code for the given task."""
    return f"# Code for: {task}\n\n# Implementation here..."


@user_proxy.register_for_execution()
@reviewer.register_for_llm(description="Review Python code")
def review_code(code: str) -> str:
    """Review the provided code."""
    return f"# Review of:\n{code}\n\n# Feedback here..."


# Start conversation
async def main():
    # Initiate chat
    await user_proxy.a_initiate_chat(
        coder,
        message="""Create a Python function that:
        1. Fetches data from an API
        2. Processes the data
        3. Saves to database
        Include error handling and logging."""
    )
    # Get code from coder
    code = user_proxy.chat_messages[coder][-1]["content"]
    # Send to reviewer
    await user_proxy.a_initiate_chat(
        reviewer,
        message=f"Review this code:\n{code}"
    )


if __name__ == "__main__":
    asyncio.run(main())
LangGraph: Graph-Based Orchestration
LangGraph models agents as nodes in a stateful directed graph. Transitions depend on dynamic logic, giving fine-grained control over workflow execution with explicit state management and traceable decision paths. Best for complex, multi-step workflows requiring audit trails.
from langgraph.graph import StateGraph, END
from typing import TypedDict


class AgentState(TypedDict):
    messages: list
    task: str
    research_results: str
    code: str
    analysis: str


# perform_web_search, generate_code, and analyze_code are placeholders
# for your own functions


def research_agent(state: AgentState):
    return {"research_results": perform_web_search(state["task"])}


def coding_agent(state: AgentState):
    return {"code": generate_code(state["research_results"])}


def analysis_agent(state: AgentState):
    return {"analysis": analyze_code(state.get("code", ""))}


workflow = StateGraph(AgentState)
workflow.add_node("research", research_agent)
workflow.add_node("code", coding_agent)
workflow.add_node("analyze", analysis_agent)
workflow.set_entry_point("research")
workflow.add_edge("research", "code")
workflow.add_edge("code", "analyze")
workflow.add_edge("analyze", END)
app = workflow.compile()
LangGraph provides in-thread memory (within a single task) and cross-thread memory (across sessions) natively. For regulated industries that require strict audit trails, it is a strong choice.
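To make the idea of state-dependent transitions concrete without depending on any library, here is a toy state machine in plain Python. It is not LangGraph's API: `MiniGraph`, its methods, and the `write`/`review` nodes are all invented for this sketch. Each node returns a partial state update, and a router function inspects the merged state to pick the next node.

```python
from typing import Callable, Optional

State = dict
Node = Callable[[State], State]
Router = Callable[[State], Optional[str]]


class MiniGraph:
    """Tiny state machine: nodes update state, routers choose the next node."""

    def __init__(self) -> None:
        self.nodes: dict[str, Node] = {}
        self.routers: dict[str, Router] = {}

    def add_node(self, name: str, node: Node, router: Router) -> None:
        self.nodes[name] = node
        self.routers[name] = router

    def run(self, entry: str, state: State, max_steps: int = 20) -> State:
        current: Optional[str] = entry
        trace: list[str] = []
        for _ in range(max_steps):  # hard cap guards against endless loops
            if current is None:
                break
            trace.append(current)
            state = {**state, **self.nodes[current](state)}  # merge partial update
            current = self.routers[current](state)
        return {**state, "trace": trace}


def write(state: State) -> State:
    return {"attempts": state.get("attempts", 0) + 1}


def review(state: State) -> State:
    # Toy rule: approve once the draft has been written twice.
    return {"approved": state["attempts"] >= 2}


graph = MiniGraph()
graph.add_node("write", write, lambda s: "review")
graph.add_node("review", review, lambda s: None if s["approved"] else "write")
final = graph.run("write", {})
```

The recorded `trace` is the audit trail in miniature: every visited node is listed in order, which is what makes graph-based workflows easy to inspect after the fact.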
CrewAI
CrewAI provides role-based multi-agent orchestration:
#!/usr/bin/env python3
"""CrewAI Multi-Agent Example"""
from crewai import Agent, Task, Crew, Process
# Define agents
researcher = Agent(
role="Research Analyst",
goal="Find comprehensive information on the topic",
backstory="""You are an expert research analyst with
years of experience in gathering and synthesizing information.""",
verbose=True,
allow_delegation=False,
)
writer = Agent(
role="Content Writer",
goal="Create engaging, well-structured content",
backstory="""You are a skilled content writer known for
creating compelling narratives that resonate with readers.""",
verbose=True,
allow_delegation=False,
)
editor = Agent(
role="Senior Editor",
goal="Ensure content quality and accuracy",
backstory="""You are a senior editor with sharp attention
to detail and high standards for quality.""",
verbose=True,
allow_delegation=True, # Can delegate to writer
)
# Define tasks
research_task = Task(
    description="Research the latest trends in {topic}",  # filled from kickoff inputs
    agent=researcher,
    expected_output="Comprehensive research report with sources",
)
write_task = Task(
description="Write an article based on the research",
agent=writer,
expected_output="Well-structured article (2000-3000 words)",
context=[research_task], # Depends on research
)
edit_task = Task(
description="Review and edit the article for publication",
agent=editor,
expected_output="Final polished article ready for publication",
context=[write_task], # Depends on writing
)
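# Create crew with hierarchical process
# CrewAI expects the manager to be a separate agent that is not listed in `agents`
manager = Agent(
    role="Editorial Manager",
    goal="Coordinate the research, writing, and editing tasks",
    backstory="""You are an experienced managing editor who assigns work
    and checks that each deliverable meets the brief.""",
    allow_delegation=True,
)
crew = Crew(
    agents=[researcher, writer, editor],
    tasks=[research_task, write_task, edit_task],
    process=Process.hierarchical,  # Manager coordinates
    manager_agent=manager,
    verbose=True,
)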
# Execute
result = crew.kickoff(
inputs={"topic": "Quantum Computing in 2026"}
)
print(f"Crew Result: {result}")
Custom Implementation
#!/usr/bin/env python3
"""Custom Multi-Agent Framework"""
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum
import json
import uuid
class MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
BROADCAST = "broadcast"
SYSTEM = "system"
@dataclass
class Message:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
type: MessageType = MessageType.REQUEST
sender: str = ""
receiver: str = ""
content: Any = None
metadata: dict = field(default_factory=dict)
class BaseAgent(ABC):
"""Base class for all agents"""
def __init__(self, agent_id: str, role: str, description: str):
self.id = agent_id
self.role = role
self.description = description
self.inbox = asyncio.Queue()
self.tools = {}
self.memory = []
@abstractmethod
async def process(self, message: Message) -> Message:
"""Process incoming message"""
pass
async def receive_message(self, message: Message):
"""Receive and queue message"""
await self.inbox.put(message)
async def run(self):
"""Main agent loop"""
while True:
message = await self.inbox.get()
if message.type == MessageType.SYSTEM and message.content == "STOP":
break
response = await self.process(message)
if response:
await self.send_message(response)
async def send_message(self, message: Message):
"""Send message to another agent or broadcast"""
message.sender = self.id
await self.messenger.send(message)
def add_tool(self, name: str, func: Callable):
"""Register a tool for this agent"""
self.tools[name] = func
def remember(self, item: Any):
"""Store in agent memory"""
self.memory.append({
"content": item,
"timestamp": asyncio.get_event_loop().time()
})
class SpecializedAgent(BaseAgent):
"""Agent with specialized capabilities"""
def __init__(self, agent_id: str, role: str, description: str, system_prompt: str):
super().__init__(agent_id, role, description)
self.system_prompt = system_prompt
self.llm = None # Initialize with your LLM
async def process(self, message: Message) -> Message:
"""Process using LLM and tools"""
# Build context from memory
context = "\n".join([
f"[{m['timestamp']}]: {m['content']}"
for m in self.memory[-10:]
])
# Construct prompt
prompt = f"""
Role: {self.role}
Description: {self.description}
Context:
{context}
Current Request:
{message.content}
Available Tools:
{', '.join(self.tools.keys())}
Respond with your action and any tool calls.
"""
# Get LLM response
llm_response = await self.llm.complete(prompt)
# Parse and execute tool if needed
response_content = llm_response
if self._needs_tool(llm_response):
tool_name, tool_args = self._parse_tool_call(llm_response)
tool_result = await self.tools[tool_name](**tool_args)
response_content = f"{llm_response}\n\nTool Result: {tool_result}"
# Remember interaction
self.remember(f"Request: {message.content}\nResponse: {response_content}")
return Message(
type=MessageType.RESPONSE,
receiver=message.sender,
content=response_content
)
class AgentMessenger:
"""Message broker between agents"""
def __init__(self):
self.agents: dict[str, BaseAgent] = {}
self.message_log: list[Message] = []
def register_agent(self, agent: BaseAgent):
"""Register agent in network"""
agent.messenger = self
self.agents[agent.id] = agent
async def send(self, message: Message):
"""Route message to recipient"""
self.message_log.append(message)
if message.receiver:
# Direct message
await self.agents[message.receiver].receive_message(message)
else:
# Broadcast
for agent in self.agents.values():
if agent.id != message.sender:
await agent.receive_message(message)
class MultiAgentSystem:
"""Complete multi-agent orchestration system"""
def __init__(self):
self.messenger = AgentMessenger()
self.agents: dict[str, BaseAgent] = {}
def add_agent(self, agent: BaseAgent):
"""Add agent to system"""
self.agents[agent.id] = agent
self.messenger.register_agent(agent)
def create_agent(
self,
role: str,
description: str,
system_prompt: str,
tools: dict[str, Callable] = None
) -> SpecializedAgent:
"""Factory method to create specialized agent"""
agent = SpecializedAgent(
agent_id=f"{role.lower().replace(' ', '_')}_{len(self.agents)}",
role=role,
description=description,
system_prompt=system_prompt
)
if tools:
for name, func in tools.items():
agent.add_tool(name, func)
self.add_agent(agent)
return agent
async def run(self, initial_message: str, timeout: int = 60) -> Any:
"""Run multi-agent system with initial message"""
# Start all agents
tasks = [
asyncio.create_task(agent.run())
for agent in self.agents.values()
]
# Send initial message to all agents
broadcast = Message(
type=MessageType.BROADCAST,
content=initial_message
)
await self.messenger.send(broadcast)
# Wait for completion or timeout
try:
await asyncio.wait_for(
self._wait_for_completion(),
timeout=timeout
)
except asyncio.TimeoutError:
pass
# Stop all agents
stop_msg = Message(type=MessageType.SYSTEM, content="STOP")
for agent in self.agents.values():
await agent.receive_message(stop_msg)
# Wait for graceful shutdown
await asyncio.gather(*tasks, return_exceptions=True)
# Return aggregated results
return self._aggregate_results()
async def _wait_for_completion(self):
"""Wait for agents to finish processing"""
# Check if all agents have processed the message
while True:
await asyncio.sleep(1)
# Add completion logic based on your requirements
if len(self.messenger.message_log) > 10:
break
def _aggregate_results(self) -> dict:
"""Aggregate results from all agents"""
return {
agent.id: {
"role": agent.role,
"memory": agent.memory,
"message_count": len([
m for m in self.messenger.message_log
if m.sender == agent.id
])
}
for agent in self.agents.values()
}
# Example: Research and Writing Team
async def main():
system = MultiAgentSystem()
# Create research agent
researcher = system.create_agent(
role="Researcher",
description="Expert at gathering and analyzing information",
system_prompt="You research topics thoroughly and provide comprehensive information.",
tools={
"search_web": lambda query: f"Search results for: {query}",
"read_file": lambda path: f"Content of: {path}",
}
)
# Create writer agent
writer = system.create_agent(
role="Writer",
description="Skilled content creator",
system_prompt="You create clear, engaging content from research.",
)
# Create editor agent
editor = system.create_agent(
role="Editor",
description="Quality assurance specialist",
system_prompt="You ensure content is accurate, well-structured, and error-free.",
)
# Run system
results = await system.run(
"Write a comprehensive guide to machine learning"
)
print(json.dumps(results, indent=2))
if __name__ == "__main__":
asyncio.run(main())
Framework Comparison
| Framework | Architecture | Best For | Learning Curve | Production Ready |
|---|---|---|---|---|
| LangGraph | Stateful graphs | Complex workflows, regulated industries | Medium | Yes |
| CrewAI | Role-based teams | Rapid prototyping | Low | Yes |
| AutoGen | Conversation loops | Research, experimentation | High | Limited |
| Google ADK | GCP-integrated | Enterprise Google Cloud | Medium | Yes |
Agent Communication Patterns
1. Request-Response
# Direct request-response between agents
class RequestResponsePattern:
    """Simple request-response pattern"""

    def __init__(self, messenger):
        # Assumes a messenger that also offers add_listener(agent_id, callback);
        # the AgentMessenger shown earlier would need that method added.
        self.messenger = messenger

    async def request(self, from_agent: str, to_agent: str, request: str) -> str:
        """Send request and wait for response"""
        message = Message(
            type=MessageType.REQUEST,
            sender=from_agent,
            receiver=to_agent,
            content=request
        )
        # Send and wait for response
        response_queue = asyncio.Queue()

        # Register one-time listener
        def listener(msg):
            if msg.sender == to_agent:
                response_queue.put_nowait(msg)

        self.messenger.add_listener(from_agent, listener)
        await self.messenger.send(message)
        # Wait for response with timeout
        try:
            response = await asyncio.wait_for(
                response_queue.get(),
                timeout=30
            )
            return response.content
        except asyncio.TimeoutError:
            return "Request timed out"
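Matching on the sender alone can pair a response with the wrong request when several requests to the same agent are in flight. A common remedy is a correlation ID per request. The sketch below is one way to do that with `asyncio` futures. `RequestBus`, its methods, and the handler signature are hypothetical and independent of the `AgentMessenger` class in this guide.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


class RequestBus:
    """Matches each response to its request through a correlation ID."""

    def __init__(self) -> None:
        self.handlers: dict[str, Handler] = {}
        self.pending: dict[str, asyncio.Future] = {}

    def register(self, agent_id: str, handler: Handler) -> None:
        self.handlers[agent_id] = handler

    async def request(self, to_agent: str, payload: Any, timeout: float = 30.0) -> Any:
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = future
        # Deliver in the background so the caller can wait with a timeout.
        delivery = asyncio.create_task(self._deliver(to_agent, correlation_id, payload))
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self.pending.pop(correlation_id, None)
            delivery.cancel()  # no effect if delivery already finished

    async def _deliver(self, to_agent: str, correlation_id: str, payload: Any) -> None:
        future = self.pending[correlation_id]
        try:
            result = await self.handlers[to_agent](payload)
        except Exception as exc:  # includes KeyError for an unknown agent
            if not future.done():
                future.set_exception(exc)
        else:
            if not future.done():
                future.set_result(result)


async def echo_agent(payload: Any) -> str:
    return f"echo:{payload}"


async def demo() -> str:
    bus = RequestBus()
    bus.register("echo", echo_agent)
    return await bus.request("echo", "hi", timeout=1.0)
```

Raising `asyncio.TimeoutError` to the caller, instead of returning a string such as "Request timed out", keeps failures distinguishable from legitimate responses.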
2. Publish-Subscribe
# Pub-Sub pattern for event-driven communication
class PubSubMessenger(AgentMessenger):
    """Enhanced messenger with pub-sub"""

    def __init__(self):
        super().__init__()
        self.subscriptions: dict[str, set[str]] = {}  # topic -> set of agent_ids

    def subscribe(self, agent_id: str, topic: str):
        """Subscribe agent to topic"""
        if topic not in self.subscriptions:
            self.subscriptions[topic] = set()
        self.subscriptions[topic].add(agent_id)

    def unsubscribe(self, agent_id: str, topic: str):
        """Unsubscribe from topic"""
        if topic in self.subscriptions:
            self.subscriptions[topic].discard(agent_id)

    async def publish(self, topic: str, content: Any, sender: str):
        """Publish message to topic subscribers"""
        message = Message(
            type=MessageType.BROADCAST,
            sender=sender,
            content=content,
            metadata={"topic": topic}
        )
        # Send to all subscribers that are registered with this messenger
        for agent_id in self.subscriptions.get(topic, set()):
            agent = self.agents.get(agent_id)
            if agent is not None and agent_id != sender:
                await agent.receive_message(message)


# Usage
async def event_driven_example():
    messenger = PubSubMessenger()
    # Agents must first be registered with messenger.register_agent(agent);
    # subscriptions for unregistered IDs are skipped at publish time.
    # Subscribe agents to topics
    messenger.subscribe("data_agent", "new_data")
    messenger.subscribe("analysis_agent", "new_data")
    messenger.subscribe("alert_agent", "anomaly_detected")
    messenger.subscribe("dashboard_agent", "anomaly_detected")
    # Publish events
    await messenger.publish("new_data", {"records": 1000}, "ingestion_agent")
    await messenger.publish("anomaly_detected", {"score": 0.95}, "analysis_agent")
3. Blackboard System
# Blackboard pattern - shared knowledge base
class Blackboard:
    """Shared knowledge base for agents"""

    def __init__(self):
        self.data: dict[str, Any] = {}
        self.metadata: dict[str, dict] = {}
        self.lock = asyncio.Lock()

    async def write(self, key: str, value: Any, agent_id: str, description: str = ""):
        """Write to blackboard"""
        async with self.lock:
            self.data[key] = value
            self.metadata[key] = {
                "author": agent_id,
                "description": description,
                "timestamp": asyncio.get_event_loop().time()
            }

    async def read(self, key: str) -> Any:
        """Read from blackboard"""
        async with self.lock:
            return self.data.get(key)

    async def query(self, pattern: str) -> dict[str, Any]:
        """Query blackboard with pattern"""
        async with self.lock:
            return {
                k: v for k, v in self.data.items()
                if pattern.lower() in k.lower()
            }


class BlackboardAgent(BaseAgent):
    """Agent that interacts via blackboard"""

    def __init__(self, *args, blackboard: Blackboard, **kwargs):
        super().__init__(*args, **kwargs)
        self.blackboard = blackboard

    async def post_findings(self, key: str, findings: Any):
        """Post findings to blackboard"""
        await self.blackboard.write(
            key, findings, self.id,
            f"{self.role} findings"
        )

    async def read_findings(self, key: str) -> Any:
        """Read findings from blackboard"""
        return await self.blackboard.read(key)
4. Shared State Pattern
# Agents share state through a common store
# (kept in process here; a production system would use a distributed store)
import asyncio
from typing import Any, Callable


class SharedStateManager:
    def __init__(self):
        self.state = {}
        self.lock = asyncio.Lock()

    async def read(self, key: str) -> Any:
        return self.state.get(key)

    async def write(self, key: str, value: Any):
        async with self.lock:
            self.state[key] = value

    async def update(self, key: str, updater: Callable[[Any], Any]):
        # Read-modify-write under the lock so concurrent updates are not lost
        async with self.lock:
            old = self.state.get(key)
            new = updater(old)
            self.state[key] = new


# Usage
state = SharedStateManager()


class SharedAgent:
    def __init__(self, name: str, state: SharedStateManager):
        self.name = name
        self.state = state

    async def contribute(self, key: str, data: dict):
        await self.state.update(key, lambda current: {
            **(current or {}),
            self.name: data
        })
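The lock in `update` starts to matter as soon as a read-modify-write spans an `await`, for example when the new value depends on a network call. The small experiment below is a sketch with a hypothetical `Counter` class. It uses `asyncio.sleep(0)` to stand in for that I/O and compares an unguarded increment with a guarded one.

```python
import asyncio


class Counter:
    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def unsafe_increment(self) -> None:
        current = self.value
        await asyncio.sleep(0)      # simulated I/O: other tasks run here
        self.value = current + 1    # may overwrite another task's update

    async def safe_increment(self) -> None:
        async with self.lock:       # the whole read-modify-write is exclusive
            current = self.value
            await asyncio.sleep(0)
            self.value = current + 1


async def run_many(method_name: str, n: int = 50) -> int:
    """Run n concurrent increments with the named method and return the total."""
    counter = Counter()
    await asyncio.gather(*(getattr(counter, method_name)() for _ in range(n)))
    return counter.value


if __name__ == "__main__":
    print("without lock:", asyncio.run(run_many("unsafe_increment")))
    print("with lock:   ", asyncio.run(run_many("safe_increment")))
```

Without the lock, several tasks read the same starting value and most increments are lost. With it, every increment is counted.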
Tool and Resource Sharing
Shared Tool Registry
# Centralized tool registry
import inspect
from typing import Any, Callable, Optional


class ToolRegistry:
    """Registry for shared tools across agents"""

    def __init__(self):
        self.tools: dict[str, Callable] = {}
        self.tool_metadata: dict[str, dict] = {}
        self.agent_permissions: dict[str, set[str]] = {}

    def register_tool(
        self,
        name: str,
        func: Callable,
        description: str = "",
        required_permissions: Optional[list[str]] = None
    ):
        """Register a tool"""
        self.tools[name] = func
        self.tool_metadata[name] = {
            "description": description,
            "permissions": required_permissions or [],
            "registered_by": None
        }

    def grant_access(self, agent_id: str, tool_names: list[str]):
        """Grant agent access to tools"""
        if agent_id not in self.agent_permissions:
            self.agent_permissions[agent_id] = set()
        self.agent_permissions[agent_id].update(tool_names)

    def has_access(self, agent_id: str, tool_name: str) -> bool:
        """Check if agent has tool access"""
        # Admin always has access
        if agent_id.startswith("admin"):
            return True
        return tool_name in self.agent_permissions.get(agent_id, set())

    async def execute(self, agent_id: str, tool_name: str, **kwargs) -> Any:
        """Execute tool if agent has permission"""
        if not self.has_access(agent_id, tool_name):
            raise PermissionError(f"Agent {agent_id} cannot access {tool_name}")
        tool = self.tools.get(tool_name)
        if not tool:
            raise ValueError(f"Tool {tool_name} not found")
        # Tools may be plain functions or coroutines; await only when needed
        result = tool(**kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result


# Global registry
tool_registry = ToolRegistry()

# Register shared tools
tool_registry.register_tool(
    "search_web",
    lambda query: f"Search: {query}",
    "Search the web for information"
)
tool_registry.register_tool(
    "execute_code",
    lambda code: f"Executed: {code[:50]}...",
    "Execute Python code",
    required_permissions=["code_execution"]
)
tool_registry.register_tool(
    "read_file",
    lambda path: f"Content: {path}",
    "Read file from filesystem",
    required_permissions=["file_read"]
)
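The registry above records `required_permissions` for each tool, but `has_access` consults only the per-tool grants. One possible way to enforce the recorded permissions is sketched below. `PermissionedRegistry`, `grant`, and the agent IDs are assumptions for illustration: an agent may run a tool only when it holds every permission the tool requires.

```python
import asyncio
import inspect
from typing import Any, Callable


class PermissionedRegistry:
    """Runs a tool only if the agent holds every permission the tool requires."""

    def __init__(self) -> None:
        self.tools: dict[str, tuple[Callable[..., Any], frozenset[str]]] = {}
        self.grants: dict[str, set[str]] = {}

    def register_tool(self, name: str, func: Callable[..., Any],
                      required: tuple[str, ...] = ()) -> None:
        self.tools[name] = (func, frozenset(required))

    def grant(self, agent_id: str, *permissions: str) -> None:
        self.grants.setdefault(agent_id, set()).update(permissions)

    async def execute(self, agent_id: str, tool_name: str, **kwargs: Any) -> Any:
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")
        func, required = self.tools[tool_name]
        missing = required - self.grants.get(agent_id, set())
        if missing:
            raise PermissionError(f"{agent_id} lacks: {sorted(missing)}")
        result = func(**kwargs)
        # Accept both plain functions and coroutine functions as tools.
        return await result if inspect.isawaitable(result) else result


registry = PermissionedRegistry()
registry.register_tool("search_web", lambda query: f"Search: {query}")
registry.register_tool("execute_code", lambda code: f"Executed: {code}",
                       required=("code_execution",))
registry.grant("coder_1", "code_execution")

if __name__ == "__main__":
    print(asyncio.run(registry.execute("coder_1", "execute_code", code="1 + 1")))
```

Tools with no required permissions stay open to every agent, which suits read-only helpers such as search. Anything that executes code or touches files is denied by default.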
Resource Pools
# Shared resource pools
import asyncio
import inspect
from typing import Any, Callable

from langchain_openai import ChatOpenAI


class ResourcePool:
    """Pool of reusable resources"""

    def __init__(self, factory: Callable, max_size: int = 10):
        self.factory = factory
        self.max_size = max_size
        self.available: asyncio.Queue = asyncio.Queue()
        self.in_use: set = set()

    async def _create(self) -> Any:
        """Call the factory, awaiting it if it returns an awaitable"""
        resource = self.factory()
        if inspect.isawaitable(resource):
            resource = await resource
        return resource

    async def acquire(self) -> Any:
        """Acquire resource from pool"""
        # Try to get from available
        try:
            resource = self.available.get_nowait()
        except asyncio.QueueEmpty:
            if len(self.in_use) < self.max_size:
                # Create new if under limit
                resource = await self._create()
            else:
                # Wait for availability
                resource = await self.available.get()
        self.in_use.add(id(resource))
        return resource

    def release(self, resource: Any):
        """Release resource back to pool"""
        resource_id = id(resource)
        if resource_id in self.in_use:
            self.in_use.discard(resource_id)
            self.available.put_nowait(resource)


# Example: LLM pool
llm_pool = ResourcePool(
    factory=lambda: ChatOpenAI(model="gpt-4"),
    max_size=5
)


# Usage in agents
async def use_llm():
    llm = await llm_pool.acquire()
    try:
        result = await llm.agenerate([...])  # placeholder: pass your message batches
        return result
    finally:
        llm_pool.release(llm)
Coordination and Orchestration
Task Planning
# Multi-agent task planning
class TaskPlanner:
    """Plan and coordinate tasks across agents"""

    def __init__(self, agents: list[BaseAgent]):
        self.agents = {a.id: a for a in agents}
        self.tasks: dict[str, dict] = {}
        self.task_id_counter = 0

    def create_plan(self, goal: str, constraints: dict = None) -> list[dict]:
        """Create execution plan for goal"""
        # Decompose goal into tasks
        tasks = self._decompose_goal(goal)
        # Assign tasks to agents
        plan = []
        for task in tasks:
            assigned_agent = self._select_agent(task)
            plan.append({
                "task_id": f"task_{self.task_id_counter}",
                "description": task,
                "assigned_to": assigned_agent.id,
                "dependencies": self._get_dependencies(task, plan)
            })
            self.task_id_counter += 1
        return plan

    def _decompose_goal(self, goal: str) -> list[str]:
        """Decompose goal into subtasks"""
        # Prompt to send to an LLM in production (unused in this simplified version)
        prompt = f"""
        Decompose this goal into 4-8 discrete subtasks:
        Goal: {goal}
        Return as a JSON array of task descriptions.
        """
        # Simplified - in production use LLM
        return [
            f"Research: {goal}",
            f"Plan: {goal}",
            f"Execute: {goal}",
            f"Review: {goal}"
        ]

    def _select_agent(self, task: str) -> BaseAgent:
        """Select best agent for task"""
        # Match task requirements to agent capabilities
        for agent in self.agents.values():
            if any(keyword in task.lower() for keyword in agent.role.lower().split()):
                return agent
        # No keyword match: fall back to the first registered agent
        return next(iter(self.agents.values()))

    def _get_dependencies(self, task: str, plan: list[dict]) -> list[str]:
        """Identify task dependencies"""
        # Simple: depend on every earlier task, so the plan runs strictly in order
        return [p["task_id"] for p in plan]

    async def execute_plan(self, plan: list[dict]) -> dict[str, Any]:
        """Execute plan respecting dependencies"""
        results = {}
        completed = set()
        while len(completed) < len(plan):
            # Find ready tasks
            ready_tasks = [
                t for t in plan
                if t["task_id"] not in completed
                and all(dep in completed for dep in t["dependencies"])
            ]
            if not ready_tasks:
                # Remaining tasks can never run (e.g. a dependency cycle);
                # stop and return the partial results
                break
            # Execute ready tasks concurrently
            coroutines = []
            for task in ready_tasks:
                agent = self.agents[task["assigned_to"]]
                coroutines.append(agent.process(Message(
                    content=task["description"]
                )))
            task_results = await asyncio.gather(*coroutines)
            for task, result in zip(ready_tasks, task_results):
                results[task["task_id"]] = result
                completed.add(task["task_id"])
        return results
Dynamic Task Assignment
# Dynamic load balancing between agents
class DynamicTaskQueue:
    """Dynamically assign tasks to available agents"""

    def __init__(self, agents: list[BaseAgent]):
        self.agents = {a.id: a for a in agents}
        self.task_queues: dict[str, asyncio.Queue] = {
            a.id: asyncio.Queue() for a in agents
        }
        self.agent_loads: dict[str, int] = {a.id: 0 for a in agents}

    async def submit_task(self, task: Any) -> str:
        """Submit task, auto-assign to least loaded agent"""
        # Find agent with least load
        min_load_agent = min(
            self.agent_loads.items(),
            key=lambda x: x[1]
        )[0]
        # Assign task
        await self.task_queues[min_load_agent].put(task)
        self.agent_loads[min_load_agent] += 1
        return min_load_agent

    async def process_all(self):
        """Process all queued tasks"""
        async def process_agent(agent_id: str, queue: asyncio.Queue):
            while not queue.empty():
                task = await queue.get()
                agent = self.agents[agent_id]
                try:
                    await agent.process(task)
                finally:
                    self.agent_loads[agent_id] -= 1

        # Process all agents concurrently
        await asyncio.gather(*[
            process_agent(aid, q)
            for aid, q in self.task_queues.items()
        ])
Dynamic Router
class DynamicRouter:
    def __init__(self, agents: dict):
        self.agents = agents

    async def route(self, task: str) -> Agent:
        # Analyze task requirements.
        # analyze_task is not shown here; it is assumed to return a dict
        # with "tools" and "keywords" lists.
        requirements = await self.analyze_task(task)
        # Score each agent
        scores = {}
        for name, agent in self.agents.items():
            score = await self.score_agent(agent, requirements)
            scores[name] = score
        # Select best agent
        best = max(scores, key=scores.get)
        return self.agents[best]

    async def score_agent(self, agent: Agent, requirements: dict) -> float:
        score = 0.0
        # Check tool match
        for req_tool in requirements.get("tools", []):
            if req_tool in agent.tools:
                score += 1
        # Check domain match
        for keyword in requirements.get("keywords", []):
            if keyword in agent.system_prompt.lower():
                score += 0.5
        return score
Memory Management
Production multi-agent systems require layered memory:
- Working memory — Current task context, shared across agents in the same workflow
- Short-term memory — Session-level state that persists across a conversation
- Long-term memory — Cross-session knowledge retained for repeated use
- Episodic memory — Records of past agent decisions and outcomes for learning
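The four layers above can be sketched as a single container. This is a minimal illustration rather than a production design: the class name `LayeredMemory`, its fields, and the `end_task` promotion step are all hypothetical, and a real system would back the long-term and episodic layers with persistent storage.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Iterable


@dataclass
class LayeredMemory:
    """Hypothetical in-process container for the four memory layers."""
    session_limit: int = 50
    # Working memory: current task context shared within one workflow
    working: dict = field(default_factory=dict)
    # Short-term memory: bounded log of turns in the current session
    short_term: deque = field(default_factory=deque, init=False)
    # Long-term memory: knowledge kept across sessions
    long_term: dict = field(default_factory=dict)
    # Episodic memory: past decisions and their outcomes
    episodes: list = field(default_factory=list)

    def __post_init__(self):
        # Oldest turns are dropped once the session limit is reached
        self.short_term = deque(maxlen=self.session_limit)

    def remember_turn(self, agent_id: str, text: str) -> None:
        self.short_term.append((agent_id, text))

    def record_episode(self, agent_id: str, decision: str, outcome: Any) -> None:
        self.episodes.append(
            {"agent_id": agent_id, "decision": decision, "outcome": outcome}
        )

    def end_task(self, promote: Iterable[str] = ()) -> None:
        """Clear working memory, copying selected keys into long-term memory."""
        for key in promote:
            if key in self.working:
                self.long_term[key] = self.working[key]
        self.working.clear()
```

Keeping the layers separate makes retention policy explicit: working memory is discarded per task, short-term memory is bounded, and only deliberately promoted items reach long-term storage.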
Shared Knowledge Base
# Shared memory for multi-agent systems
# Note: `vectorstore` is a placeholder module; substitute your vector store client
import vectorstore
from datetime import datetime, timedelta


class SharedKnowledgeBase:
    """Shared memory with vector search"""

    def __init__(self):
        self.vector_store = vectorstore.InMemoryVectorStore()
        self.metadata_store = {}

    async def add(
        self,
        content: str,
        agent_id: str,
        tags: list[str] = None,
        metadata: dict = None
    ):
        """Add to knowledge base"""
        entry_id = f"kb_{len(self.metadata_store)}"
        # Store in vector DB
        await self.vector_store.add(
            id=entry_id,
            text=content,
            metadata={
                "agent_id": agent_id,
                "tags": tags or [],
                **(metadata or {})
            }
        )
        # Store metadata
        self.metadata_store[entry_id] = {
            "content": content,
            "agent_id": agent_id,
            "tags": tags or [],
            # Keep the sharing flag so search() can enforce access control
            "shared": (metadata or {}).get("shared", False),
            "created_at": datetime.now(),
            "access_count": 0
        }

    async def search(
        self,
        query: str,
        agent_id: str = None,
        tags: list[str] = None,
        top_k: int = 5
    ) -> list[dict]:
        """Search knowledge base"""
        results = await self.vector_store.search(
            query=query,
            top_k=top_k
        )
        # Filter by agent and tags
        filtered = []
        for r in results:
            entry = self.metadata_store.get(r["id"])
            if not entry:
                continue
            # Access control: other agents' entries are visible only if shared
            if agent_id and entry["agent_id"] != agent_id:
                if not entry.get("shared", False):
                    continue
            # Tag filter
            if tags and not any(t in entry.get("tags", []) for t in tags):
                continue
            entry["access_count"] += 1
            filtered.append(entry)
        return filtered[:top_k]

    async def get_recent(self, agent_id: str = None, minutes: int = 60) -> list[dict]:
        """Get recent entries"""
        cutoff = datetime.now() - timedelta(minutes=minutes)
        return [
            e for e in self.metadata_store.values()
            if e["created_at"] > cutoff
            and (agent_id is None or e["agent_id"] == agent_id)
        ]


# Global knowledge base
knowledge_base = SharedKnowledgeBase()
Scaling Considerations
Agent Pool Management
class AgentPool:
    def __init__(self, agent_factory, size: int):
        # Keep the factory so the pool can create extra agents later
        self.agent_factory = agent_factory
        self.pool = asyncio.Queue()
        self.size = size
        # Pre-populate
        for _ in range(size):
            self.pool.put_nowait(agent_factory())

    async def acquire(self, timeout: float = 30) -> Agent:
        try:
            return await asyncio.wait_for(
                self.pool.get(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # Scale up dynamically: no agent became free within the timeout
            return self.agent_factory()

    async def release(self, agent: Agent):
        await self.pool.put(agent)
Load Balancing
class LoadBalancer:
    def __init__(self, agents: list):
        self.agents = agents
        self.current = 0

    def get_next(self) -> Agent:
        # Round-robin
        agent = self.agents[self.current]
        self.current = (self.current + 1) % len(self.agents)
        return agent

    def get_least_loaded(self) -> Agent:
        # Pick agent with fewest active tasks
        return min(self.agents, key=lambda a: a.active_tasks)
Error Handling and Resilience
Fault Tolerance
# Error handling in multi-agent systems
import logging

logger = logging.getLogger(__name__)


class ResilientMultiAgentSystem:
    """Multi-agent system with error handling"""

    def __init__(self, agents: list[BaseAgent]):
        self.agents = {a.id: a for a in agents}
        self.retry_policy = RetryPolicy(max_retries=3, backoff_factor=2)
        self.fallback_agents: dict[str, str] = {}  # agent_id -> fallback_id

    async def execute_with_retry(
        self,
        agent_id: str,
        message: Message,
        retry_count: int = 0
    ) -> Message:
        """Execute with retry logic"""
        agent = self.agents[agent_id]
        try:
            return await asyncio.wait_for(
                agent.process(message),
                timeout=30
            )
        except asyncio.TimeoutError:
            logger.warning(f"Agent {agent_id} timed out")
        except Exception as e:
            logger.error(f"Agent {agent_id} error: {e}")
        # Both failure modes share the same policy: back off, retry, then fall back
        if retry_count < self.retry_policy.max_retries:
            await asyncio.sleep(
                self.retry_policy.backoff_factor ** retry_count
            )
            return await self.execute_with_retry(
                agent_id, message, retry_count + 1
            )
        return await self.execute_with_fallback(agent_id, message)

    async def execute_with_fallback(self, failed_agent_id: str, message: Message) -> Message:
        """Execute with fallback agent"""
        # Note: fallback chains must not form a cycle (A -> B -> A), or this never ends
        fallback_id = self.fallback_agents.get(failed_agent_id)
        if fallback_id and fallback_id in self.agents:
            logger.info(f"Falling back from {failed_agent_id} to {fallback_id}")
            return await self.execute_with_retry(fallback_id, message)
        # No fallback - return error
        return Message(
            type=MessageType.RESPONSE,
            content=f"Failed to process after retries. Agent: {failed_agent_id}",
            metadata={"error": "max_retries_exceeded"}
        )

    def set_fallback(self, agent_id: str, fallback_id: str):
        """Set fallback agent"""
        self.fallback_agents[agent_id] = fallback_id


class RetryPolicy:
    def __init__(self, max_retries: int = 3, backoff_factor: float = 2):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
Monitoring and Observability
Agent Metrics
# Multi-agent monitoring and metrics
import time

from prometheus_client import Counter, Histogram, Gauge

# Metrics
agent_requests = Counter(
    'agent_requests_total',
    'Total requests to agent',
    ['agent_id', 'status']
)
agent_duration = Histogram(
    'agent_duration_seconds',
    'Agent processing duration',
    ['agent_id']
)
agent_queue_size = Gauge(
    'agent_queue_size',
    'Current queue size',
    ['agent_id']
)
tool_usage = Counter(
    'tool_usage_total',
    'Tool usage count',
    ['agent_id', 'tool_name']
)


class MonitoredAgent(BaseAgent):
    """Agent with metrics collection"""

    async def process(self, message: Message) -> Message:
        # perf_counter is monotonic, so durations are unaffected by clock changes
        start_time = time.perf_counter()
        try:
            result = await super().process(message)
            agent_requests.labels(agent_id=self.id, status="success").inc()
            return result
        except Exception:
            agent_requests.labels(agent_id=self.id, status="error").inc()
            raise
        finally:
            duration = time.perf_counter() - start_time
            agent_duration.labels(agent_id=self.id).observe(duration)

    async def receive_message(self, message: Message):
        # Gauge tracks messages currently being received by this agent
        agent_queue_size.labels(agent_id=self.id).inc()
        try:
            await super().receive_message(message)
        finally:
            agent_queue_size.labels(agent_id=self.id).dec()
Collaboration Patterns
1. Sequential Pipeline
# Agents work in sequence: each agent receives the previous agent's output
async def sequential_pipeline(task: str, agents: list):
    result = task
    for agent in agents:
        result = await agent.execute(result)
    return result

# Example: Research -> Outline -> Write -> Edit -> Publish
workflow = await sequential_pipeline(
    "Write about AI agents",
    agents=[
        research_agent,
        outline_agent,
        writer_agent,
        editor_agent,
        publisher_agent
    ]
)
2. Parallel Execution
# Agents work simultaneously on the same task
async def parallel_pipeline(task: str, agents: list):
    results = await asyncio.gather(*[
        agent.execute(task) for agent in agents
    ])
    return combine_results(results)

# Example: Multiple analysis perspectives
perspectives = await parallel_pipeline(
    "Analyze this investment",
    agents=[
        risk_agent,
        opportunity_agent,
        compliance_agent,
        financial_agent
    ]
)
3. Iterative Refinement
# Agents refine each other's work
async def iterative_refinement(task: str, agent_a, agent_b, iterations: int = 3):
    current = await agent_a.execute(task)
    for _ in range(iterations):
        feedback = await agent_b.review(current)
        # Stop before revising again once the reviewer accepts the draft
        if feedback.is_acceptable:
            break
        current = await agent_a.improve(current, feedback)
    return current

# Example: Writer/Editor loop
final_draft = await iterative_refinement(
    article,
    writer_agent,
    editor_agent,
    iterations=5
)
Real-World Applications
1. Customer Support Team
support_team = {
    "triage": Agent(
        name="Triage Agent",
        role="Route inquiries",
        tools=["classify_intent", "extract_entities"]
    ),
    "technical": Agent(
        name="Technical Support",
        role="Solve technical issues",
        tools=["search_kb", "run_diagnostics", "reset_password"]
    ),
    "billing": Agent(
        name="Billing Support",
        role="Handle payments",
        tools=["check_balance", "process_refund", "update_subscription"]
    ),
    "escalation": Agent(
        name="Escalation Manager",
        role="Handle complex cases",
        tools=["summarize_case", "notify_human"]
    )
}

async def handle_support_ticket(ticket):
    # Triage first
    category = await support_team["triage"].classify(ticket)
    # Route to specialist
    if category == "technical":
        result = await support_team["technical"].solve(ticket)
    elif category == "billing":
        result = await support_team["billing"].resolve(ticket)
    else:
        # Unknown category: escalate to a human instead of failing on an unset result
        await support_team["escalation"].notify(ticket)
        return None
    # Escalate if needed
    if result.needs_escalation:
        await support_team["escalation"].notify(result)
    return result
2. Development Team
dev_team = {
    "architect": Agent(
        name="System Architect",
        role="Design systems"
    ),
    "backend": Agent(
        name="Backend Developer",
        role="Build APIs"
    ),
    "frontend": Agent(
        name="Frontend Developer",
        role="Build UI"
    ),
    "qa": Agent(
        name="QA Engineer",
        role="Test"
    )
}

async def build_feature(feature_spec):
    # Design
    design = await dev_team["architect"].design(feature_spec)
    # Split work
    backend_spec, frontend_spec = design.split()
    # Parallel development
    backend, frontend = await asyncio.gather(
        dev_team["backend"].implement(backend_spec),
        dev_team["frontend"].implement(frontend_spec)
    )
    # Integration
    integrated = await dev_team["backend"].integrate(backend, frontend)
    # Test
    test_results = await dev_team["qa"].test(integrated)
    return test_results
Best Practices
1. Clear Agent Roles
# Define clear, focused roles
RESEARCHER_ROLE = """
You are a Research Agent.
Your responsibility: Find and synthesize information.
You do NOT:
- Write content directly
- Make decisions about tone
- Execute code
You DO:
- Search for relevant information
- Extract key insights
- Cite sources
- Provide comprehensive overviews
"""
2. Structured Communication
# Use structured message formats
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StructuredMessage:
    intent: str  # ask, inform, delegate, escalate
    content: Any
    context: dict
    expected_response: Optional[str] = None
    priority: str = "normal"  # low, normal, high, critical
3. Graceful Degradation
# Design for partial failures
async def process_with_fallback(agents: list[BaseAgent], task: dict):
    """Try agents in order until one succeeds"""
    errors = []
    for agent in agents:
        try:
            return await agent.process(task)
        except Exception as e:
            # Record the failure and move on to the next agent
            errors.append({"agent": agent.id, "error": str(e)})
    # All failed - return a structured error summary instead of raising
    return {
        "status": "partial_failure",
        "errors": errors,
        "attempted_agents": [a.id for a in agents]
    }
4. Production Checklist
- Start small: 2–3 agents solving one specific problem. Prove value before expanding.
- Team size: 3–7 agents per workflow. Beyond 7, use hierarchical structures with team leads.
- Observability: Track which agent handled each decision and why. Log inter-agent messages with correlation IDs.
- Cost control: Match model size to task complexity. Cache repeated queries. Set token budgets per agent.
- Failure handling: Implement retry with exponential backoff and circuit breakers. Always have fallback agents.
- Governance: Gartner predicts that over 40% of agentic AI projects will be canceled by the end of 2027 because of escalating costs, unclear business value, or inadequate risk controls. Set clear operational limits per agent.
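The failure-handling item in the checklist mentions circuit breakers, which can be sketched as follows. This is a minimal illustration under stated assumptions: the names `CircuitBreaker` and `call_with_breaker`, the default thresholds, and the injectable `clock` parameter are all hypothetical, and `primary` and `fallback` stand in for any async callables such as an agent's processing method.

```python
import time


class CircuitBreaker:
    """Open after N consecutive failures; allow calls again after a cooldown."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock          # injectable so the cooldown can be tested
        self.failures = 0
        self.opened_at = None       # None means the circuit is closed

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        # Once the cooldown has elapsed, let calls through again as a trial
        return self.clock() - self.opened_at >= self.reset_timeout

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            # (Re)open the circuit and restart the cooldown
            self.opened_at = self.clock()


async def call_with_breaker(breaker, primary, fallback, task):
    """Try the primary callable unless the circuit is open, else use the fallback."""
    if breaker.allow():
        try:
            result = await primary(task)
        except Exception:
            breaker.record_failure()
        else:
            breaker.record_success()
            return result
    return await fallback(task)
```

Unlike plain retries, the breaker stops sending work to an agent that keeps failing, so a degraded agent does not absorb the timeout budget of every request.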
5. Common Pitfalls
- Too many agents (coordination overhead grows quickly: n agents have n(n-1)/2 possible pairwise channels, so 5 agents have 10 and 10 agents have 45)
- Overlapping agent responsibilities (duplicate work, conflicting decisions)
- Circular dependencies (agents waiting on each other)
- Shared state without synchronization (race conditions)
- No human escalation path (unrecoverable failures)
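Circular dependencies in particular can be caught before any agent runs. The sketch below uses Python's standard-library `graphlib` (Python 3.9+) to order tasks and reject cycles. The function name `execution_order` and the task names in the usage note are hypothetical; the input maps each task to the set of tasks it depends on.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict) -> list:
    """Return a valid task order, or raise ValueError describing the cycle."""
    try:
        # static_order() yields each task only after all of its dependencies
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # The second element of args holds the nodes that form the cycle
        cycle = " -> ".join(exc.args[1])
        raise ValueError(f"Circular dependency: {cycle}") from exc
```

For example, `execution_order({"research": set(), "write": {"research"}, "review": {"write"}})` returns the tasks in dependency order, while a plan in which two tasks depend on each other raises `ValueError` at planning time instead of deadlocking at run time.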
The Future of Multi-Agent Systems
Trends
| Trend | Impact | Timeline |
|---|---|---|
| A2A Standardization | Universal agent communication | 2026 |
| MCP Adoption | Standardized tool integration | 2026 |
| Agent Marketplaces | Reusable agent templates | 2026–2027 |
| Self-Composing Agents | Agents that design other agents | 2027 |
| Agent Governance Frameworks | Auth, audit, compliance | 2026 |
Emerging Standards
- A2A (Agent-to-Agent) — Google. Peer-to-peer agent collaboration
- MCP (Model Context Protocol) — Anthropic. Standardized tool access
- ACP (Agent Communication Protocol) — IBM. Enterprise governance
- AgentCard — Agent discovery and capability advertisement
Conclusion
Multi-agent AI systems represent the next frontier in AI development. Key takeaways:
- Frameworks: AutoGen, CrewAI, and custom implementations offer different trade-offs
- Architecture: Choose hierarchical, hub-and-spoke, committee, collaborative, or pipeline based on use case
- Communication: Use request-response, pub-sub, blackboard, or shared state patterns
- Resilience: Implement retry logic, fallbacks, and monitoring
- Memory: Shared knowledge bases enable effective collaboration
- Scaling: Use agent pools and load balancers for production workloads
- Specialization: Role-based agents with focused capabilities outperform generalist agents
Start with simple two-agent systems and expand as you understand the coordination patterns needed for your specific use case.
External Resources
- AutoGen Documentation
- CrewAI Documentation
- LangChain Agents
- LangGraph Documentation
- A2A Protocol (Google)
- MCP Specification (Anthropic)
- Multi-Agent Systems Paper