Introduction
Single AI agents are powerful, but multi-agent systems unlock greater capability through collaboration, specialization, and emergent behavior. This guide covers everything from fundamental multi-agent concepts to production implementation patterns.
Understanding Multi-Agent AI Systems
What are Multi-Agent AI Systems?
Multi-agent AI systems consist of multiple AI agents that work together to solve complex problems. Each agent has specific roles, capabilities, and can communicate with other agents to share information and coordinate actions.
graph TB
subgraph "Multi-Agent System"
User[User Request]
subgraph "Agent Network"
PM[Project Manager Agent]
Dev[Developer Agent]
QA[QA Tester Agent]
Docs[Documentation Agent]
end
DB[(Shared Knowledge)]
User --> PM
PM --> Dev
PM --> QA
PM --> Docs
Dev <--> DB
QA <--> DB
Docs <--> DB
Dev --> PM
QA --> PM
Docs --> PM
end
Why Multi-Agent Systems?
| Aspect | Single Agent | Multi-Agent |
|---|---|---|
| Complexity | Limited to single capability | Handles diverse tasks |
| Expertise | Generalist | Specialists working together |
| Scalability | Hard to scale | Add agents for capacity |
| Reliability | Single point of failure | Fault tolerance |
| Emergence | Fixed behavior | Adaptive collaboration |
Key Multi-Agent Concepts
| Concept | Description |
|---|---|
| Agent Role | Specific function or expertise (e.g., researcher, coder, reviewer) |
| Agent Communication | Message passing between agents |
| Shared Memory | Common knowledge base all agents can access |
| Orchestration | Coordinating agent workflows |
| Tool Sharing | Agents can use shared tools and resources |
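These concepts can be made concrete with a minimal sketch: two agents with distinct roles exchange messages and record what they see in a shared memory, with a trivial orchestration step chaining them together. All names here (MiniAgent and so on) are illustrative, not taken from any framework.

```python
import asyncio

class MiniAgent:
    """Toy agent: a role, a message handler, and access to shared memory."""
    def __init__(self, name: str, role: str, shared_memory: dict):
        self.name = name
        self.role = role
        self.shared_memory = shared_memory  # common knowledge base

    async def handle(self, message: str) -> str:
        # Record the incoming message in the shared knowledge base
        self.shared_memory[self.name] = message
        return f"{self.role} processed: {message}"

async def demo() -> list[str]:
    shared_memory: dict[str, str] = {}
    researcher = MiniAgent("r1", "researcher", shared_memory)
    writer = MiniAgent("w1", "writer", shared_memory)
    # Orchestration: the researcher's output becomes the writer's input
    research = await researcher.handle("find facts about topic X")
    draft = await writer.handle(research)
    return [research, draft]

results = asyncio.run(demo())
```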
Multi-Agent Architectures
Architecture 1: Hierarchical
# Hierarchical Multi-Agent Architecture
import asyncio
import json
class ProjectManagerAgent:
"""Top-level agent that coordinates sub-agents"""
def __init__(self):
self.sub_agents = {
'researcher': ResearcherAgent(),
'developer': DeveloperAgent(),
'reviewer': ReviewerAgent(),
'writer': DocumentationAgent()
}
async def process_request(self, user_request: str) -> dict:
# Analyze request and decompose into tasks
tasks = await self.decompose_request(user_request)
# Assign tasks to specialized agents
results = await self.execute_tasks_parallel(tasks)
# Synthesize results
return await self.synthesize_results(results)
async def decompose_request(self, request: str) -> list[dict]:
"""Break down request into executable tasks"""
# Use LLM to decompose
decomposition_prompt = f"""
Decompose this request into discrete tasks:
{request}
Return a list of tasks with:
- task_id: unique identifier
- description: what to do
- required_role: which agent should handle it
- dependencies: which tasks must complete first
"""
tasks = await self.llm.complete(decomposition_prompt)
return json.loads(tasks)
async def execute_tasks_parallel(self, tasks: list[dict]) -> dict:
"""Execute tasks with appropriate agents"""
# Identify independent tasks
ready_tasks = [t for t in tasks if self._can_execute(t, tasks)]
# Execute ready tasks in parallel
coroutines = []
for task in ready_tasks:
agent = self.sub_agents[task['required_role']]
coroutines.append(agent.execute(task))
results = await asyncio.gather(*coroutines)
return {t['task_id']: r for t, r in zip(ready_tasks, results)}
def _can_execute(self, task: dict, tasks: list[dict]) -> bool:
"""Simplified readiness check: a task with no dependencies is ready; production code should track completed task_ids across waves"""
return not task.get('dependencies')
Architecture 2: Collaborative/Chat-Based
# Collaborative Multi-Agent Chat Architecture
class ChatBasedMultiAgent:
"""Agents collaborate through a shared chat"""
def __init__(self, agents: list[BaseAgent]):
self.agents = agents
self.message_queue = asyncio.Queue()
self.shared_context = SharedContext()
async def run_discussion(self, topic: str, max_turns: int = 10):
"""Run agent discussion on a topic"""
# Initialize discussion
await self.broadcast(Message(
type=MessageType.SYSTEM,
content=f"Discussion topic: {topic}"
))
# Run discussion rounds
for turn in range(max_turns):
for agent in self.agents:
# Agent considers current context
response = await agent.respond(
topic=topic,
context=self.shared_context.get_context(agent.id),
message_history=self.shared_context.get_history()
)
# Broadcast response
await self.broadcast(Message(
type=MessageType.AGENT,
sender=agent.id,
content=response
))
# Update shared context
self.shared_context.add_message(agent.id, response)
# Check for consensus
if self._check_consensus():
return self.shared_context.get_consensus()
return self.shared_context.get_final_summary()
async def broadcast(self, message: Message):
"""Send message to all agents"""
for agent in self.agents:
await agent.receive_message(message)
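The `_check_consensus` call above is referenced but never defined. One simple heuristic (an assumption on my part, not part of the original class) is to declare consensus once every agent's most recent message contains an explicit agreement marker such as "AGREE":

```python
def check_consensus(last_messages: dict[str, str], marker: str = "AGREE") -> bool:
    """True when every agent's latest message contains the agreement marker."""
    return bool(last_messages) and all(
        marker in message for message in last_messages.values()
    )

# One agent still objects -> no consensus; both agree -> consensus
partial = check_consensus({"coder": "AGREE", "reviewer": "needs work"})
reached = check_consensus({"coder": "AGREE", "reviewer": "AGREE, ship it"})
```

In practice you would likely ask an LLM to judge agreement rather than string-match, but a cheap marker check is a reasonable first cut.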
Architecture 3: Sequential Pipeline
# Sequential Pipeline Architecture
class PipelineMultiAgent:
"""Agents process data in a pipeline"""
def __init__(self, pipeline: list[tuple[str, BaseAgent]]):
"""
Args:
pipeline: List of (stage_name, agent) tuples
"""
self.pipeline = pipeline
async def process(self, input_data: Any) -> Any:
"""Process input through pipeline"""
current_data = input_data
for stage_name, agent in self.pipeline:
# Log pipeline stage
logger.info(f"Pipeline stage: {stage_name}")
# Process with agent
current_data = await agent.process(current_data)
# Validate output
if not self._validate_output(current_data):
raise PipelineError(
f"Agent {stage_name} produced invalid output"
)
return current_data
# Example: Content Creation Pipeline
content_pipeline = PipelineMultiAgent([
('research', ResearcherAgent()),
('outline', OutlinerAgent()),
('write', WriterAgent()),
('edit', EditorAgent()),
('format', FormatterAgent()),
])
# Run pipeline
result = await content_pipeline.process("Write about quantum computing")
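The `_validate_output` hook in the pipeline above is left unspecified. A minimal stand-in (an assumption, not a rule from any framework) simply rejects `None` and empty results, so a stage that silently produces nothing fails fast instead of propagating downstream:

```python
def validate_output(data) -> bool:
    """Reject None and empty strings/lists/dicts; accept everything else."""
    if data is None:
        return False
    if isinstance(data, (str, list, dict)) and len(data) == 0:
        return False
    return True

checks = [validate_output(None), validate_output(""), validate_output("draft text")]
```

Real pipelines would validate against a per-stage schema (e.g. "the outline stage must emit a non-empty list of headings").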
Popular Multi-Agent Frameworks
AutoGen
Microsoft’s AutoGen enables multi-agent conversation:
#!/usr/bin/env python3
"""AutoGen Multi-Agent Example"""
from autogen import ConversableAgent, AssistantAgent, UserProxyAgent
import asyncio  # needed for asyncio.run(main()) below
import os
# Configure LLM
llm_config = {
"model": "gpt-4",
"api_key": os.environ["OPENAI_API_KEY"],
}
# Create assistant agent (expert developer)
coder = AssistantAgent(
name="coder",
llm_config=llm_config,
system_message="""You are an expert Python developer.
Write clean, well-documented code.
Always consider error handling and edge cases."""
)
# Create assistant agent (code reviewer)
reviewer = AssistantAgent(
name="reviewer",
llm_config=llm_config,
system_message="""You are a senior code reviewer.
Review code for:
- Security vulnerabilities
- Performance issues
- Code quality
- Test coverage
Provide constructive feedback."""
)
# Create user proxy (human in the loop)
user_proxy = UserProxyAgent(
name="user",
human_input_mode="TERMINATE",
max_consecutive_auto_reply=10,
)
# Define conversation flow
@user_proxy.register_for_execution()
@coder.register_for_llm(description="Write Python code for a given task")
def write_code(task: str) -> str:
"""Generate Python code for the given task."""
return f"# Code for: {task}\n\n# Implementation here..."
@user_proxy.register_for_execution()
@reviewer.register_for_llm(description="Review Python code")
def review_code(code: str) -> str:
"""Review the provided code."""
return f"# Review of:\n{code}\n\n# Feedback here..."
# Start conversation
async def main():
# Initiate chat
await user_proxy.a_initiate_chat(
coder,
message="""Create a Python function that:
1. Fetches data from an API
2. Processes the data
3. Saves to database
Include error handling and logging."""
)
# Get code from coder
code = user_proxy.chat_messages[coder][-1]["content"]
# Send to reviewer
await user_proxy.a_initiate_chat(
reviewer,
message=f"Review this code:\n{code}"
)
if __name__ == "__main__":
asyncio.run(main())
CrewAI
CrewAI provides role-based multi-agent orchestration:
#!/usr/bin/env python3
"""CrewAI Multi-Agent Example"""
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
# Define agents
researcher = Agent(
role="Research Analyst",
goal="Find comprehensive information on the topic",
backstory="""You are an expert research analyst with
years of experience in gathering and synthesizing information.""",
verbose=True,
allow_delegation=False,
)
writer = Agent(
role="Content Writer",
goal="Create engaging, well-structured content",
backstory="""You are a skilled content writer known for
creating compelling narratives that resonate with readers.""",
verbose=True,
allow_delegation=False,
)
editor = Agent(
role="Senior Editor",
goal="Ensure content quality and accuracy",
backstory="""You are a senior editor with sharp attention
to detail and high standards for quality.""",
verbose=True,
allow_delegation=True, # Can delegate to writer
)
# Define tasks
research_task = Task(
description="Research the latest trends in quantum computing",
agent=researcher,
expected_output="Comprehensive research report with sources",
)
write_task = Task(
description="Write an article based on the research",
agent=writer,
expected_output="Well-structured article (2000-3000 words)",
context=[research_task], # Depends on research
)
edit_task = Task(
description="Review and edit the article for publication",
agent=editor,
expected_output="Final polished article ready for publication",
context=[write_task], # Depends on writing
)
# Create crew with hierarchical process
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, edit_task],
process=Process.hierarchical, # Manager coordinates
manager_agent=editor, # Manager must not also appear in the agents list
verbose=True,
)
# Execute
result = crew.kickoff(
inputs={"topic": "Quantum Computing in 2026"}
)
print(f"Crew Result: {result}")
Custom Implementation
#!/usr/bin/env python3
"""Custom Multi-Agent Framework"""
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum
import json
import uuid
class MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
BROADCAST = "broadcast"
SYSTEM = "system"
@dataclass
class Message:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
type: MessageType = MessageType.REQUEST
sender: str = ""
receiver: str = ""
content: Any = None
metadata: dict = field(default_factory=dict)
class BaseAgent(ABC):
"""Base class for all agents"""
def __init__(self, agent_id: str, role: str, description: str):
self.id = agent_id
self.role = role
self.description = description
self.inbox = asyncio.Queue()
self.tools = {}
self.memory = []
self.messenger = None # set when registered with an AgentMessenger
@abstractmethod
async def process(self, message: Message) -> Message:
"""Process incoming message"""
pass
async def receive_message(self, message: Message):
"""Receive and queue message"""
await self.inbox.put(message)
async def run(self):
"""Main agent loop"""
while True:
message = await self.inbox.get()
if message.type == MessageType.SYSTEM and message.content == "STOP":
break
response = await self.process(message)
if response:
await self.send_message(response)
async def send_message(self, message: Message):
"""Send message to another agent or broadcast"""
message.sender = self.id
await self.messenger.send(message)
def add_tool(self, name: str, func: Callable):
"""Register a tool for this agent"""
self.tools[name] = func
def remember(self, item: Any):
"""Store in agent memory"""
self.memory.append({
"content": item,
"timestamp": asyncio.get_event_loop().time()
})
class SpecializedAgent(BaseAgent):
"""Agent with specialized capabilities"""
def __init__(self, agent_id: str, role: str, description: str, system_prompt: str):
super().__init__(agent_id, role, description)
self.system_prompt = system_prompt
self.llm = None # Initialize with your LLM
async def process(self, message: Message) -> Message:
"""Process using LLM and tools"""
# Build context from memory
context = "\n".join([
f"[{m['timestamp']}]: {m['content']}"
for m in self.memory[-10:]
])
# Construct prompt
prompt = f"""
Role: {self.role}
Description: {self.description}
Context:
{context}
Current Request:
{message.content}
Available Tools:
{', '.join(self.tools.keys())}
Respond with your action and any tool calls.
"""
# Get LLM response
llm_response = await self.llm.complete(prompt)
# Parse and execute tool if needed
response_content = llm_response
if self._needs_tool(llm_response):
tool_name, tool_args = self._parse_tool_call(llm_response)
tool_result = await self.tools[tool_name](**tool_args)
response_content = f"{llm_response}\n\nTool Result: {tool_result}"
# Remember interaction
self.remember(f"Request: {message.content}\nResponse: {response_content}")
return Message(
type=MessageType.RESPONSE,
receiver=message.sender,
content=response_content
)
class AgentMessenger:
"""Message broker between agents"""
def __init__(self):
self.agents: dict[str, BaseAgent] = {}
self.message_log: list[Message] = []
def register_agent(self, agent: BaseAgent):
"""Register agent in network"""
agent.messenger = self
self.agents[agent.id] = agent
async def send(self, message: Message):
"""Route message to recipient"""
self.message_log.append(message)
if message.receiver:
# Direct message
await self.agents[message.receiver].receive_message(message)
else:
# Broadcast
for agent in self.agents.values():
if agent.id != message.sender:
await agent.receive_message(message)
class MultiAgentSystem:
"""Complete multi-agent orchestration system"""
def __init__(self):
self.messenger = AgentMessenger()
self.agents: dict[str, BaseAgent] = {}
def add_agent(self, agent: BaseAgent):
"""Add agent to system"""
self.agents[agent.id] = agent
self.messenger.register_agent(agent)
def create_agent(
self,
role: str,
description: str,
system_prompt: str,
tools: dict[str, Callable] = None
) -> SpecializedAgent:
"""Factory method to create specialized agent"""
agent = SpecializedAgent(
agent_id=f"{role.lower().replace(' ', '_')}_{len(self.agents)}",
role=role,
description=description,
system_prompt=system_prompt
)
if tools:
for name, func in tools.items():
agent.add_tool(name, func)
self.add_agent(agent)
return agent
async def run(self, initial_message: str, timeout: int = 60) -> Any:
"""Run multi-agent system with initial message"""
# Start all agents
tasks = [
asyncio.create_task(agent.run())
for agent in self.agents.values()
]
# Send initial message to all agents
broadcast = Message(
type=MessageType.BROADCAST,
content=initial_message
)
await self.messenger.send(broadcast)
# Wait for completion or timeout
try:
await asyncio.wait_for(
self._wait_for_completion(),
timeout=timeout
)
except asyncio.TimeoutError:
pass
# Stop all agents
stop_msg = Message(type=MessageType.SYSTEM, content="STOP")
for agent in self.agents.values():
await agent.receive_message(stop_msg)
# Wait for graceful shutdown
await asyncio.gather(*tasks, return_exceptions=True)
# Return aggregated results
return self._aggregate_results()
async def _wait_for_completion(self):
"""Wait for agents to finish processing"""
# Check if all agents have processed the message
while True:
await asyncio.sleep(1)
# Add completion logic based on your requirements
if len(self.messenger.message_log) > 10:
break
def _aggregate_results(self) -> dict:
"""Aggregate results from all agents"""
return {
agent.id: {
"role": agent.role,
"memory": agent.memory,
"message_count": len([
m for m in self.messenger.message_log
if m.sender == agent.id
])
}
for agent in self.agents.values()
}
# Example: Research and Writing Team
async def main():
system = MultiAgentSystem()
# Create research agent
researcher = system.create_agent(
role="Researcher",
description="Expert at gathering and analyzing information",
system_prompt="You research topics thoroughly and provide comprehensive information.",
tools={
# Note: SpecializedAgent.process awaits tool calls, so production tools
# should be coroutine functions; these string-returning stubs are illustrative
"search_web": lambda query: f"Search results for: {query}",
"read_file": lambda path: f"Content of: {path}",
}
)
# Create writer agent
writer = system.create_agent(
role="Writer",
description="Skilled content creator",
system_prompt="You create clear, engaging content from research.",
)
# Create editor agent
editor = system.create_agent(
role="Editor",
description="Quality assurance specialist",
system_prompt="You ensure content is accurate, well-structured, and error-free.",
)
# Run system
results = await system.run(
"Write a comprehensive guide to machine learning"
)
print(json.dumps(results, indent=2))
if __name__ == "__main__":
asyncio.run(main())
Agent Communication Patterns
1. Request-Response
# Direct request-response between agents
class RequestResponsePattern:
"""Simple request-response pattern"""
async def request(self, from_agent: str, to_agent: str, request: str) -> str:
"""Send request and wait for response"""
message = Message(
type=MessageType.REQUEST,
sender=from_agent,
receiver=to_agent,
content=request
)
# Send and wait for response
response_queue = asyncio.Queue()
# Register a one-time listener (add_listener is assumed here; the AgentMessenger above would need it added)
def listener(msg):
if msg.sender == to_agent:
response_queue.put_nowait(msg)
self.messenger.add_listener(from_agent, listener)
await self.messenger.send(message)
# Wait for response with timeout
try:
response = await asyncio.wait_for(
response_queue.get(),
timeout=30
)
return response.content
except asyncio.TimeoutError:
return "Request timed out"
2. Publish-Subscribe
# Pub-Sub pattern for event-driven communication
class PubSubMessenger(AgentMessenger):
"""Enhanced messenger with pub-sub"""
def __init__(self):
super().__init__()
self.subscriptions: dict[str, set[str]] = {} # topic -> set of agent_ids
def subscribe(self, agent_id: str, topic: str):
"""Subscribe agent to topic"""
if topic not in self.subscriptions:
self.subscriptions[topic] = set()
self.subscriptions[topic].add(agent_id)
def unsubscribe(self, agent_id: str, topic: str):
"""Unsubscribe from topic"""
if topic in self.subscriptions:
self.subscriptions[topic].discard(agent_id)
async def publish(self, topic: str, content: Any, sender: str):
"""Publish message to topic subscribers"""
message = Message(
type=MessageType.BROADCAST,
sender=sender,
content=content,
metadata={"topic": topic}
)
# Send to all subscribers
for agent_id in self.subscriptions.get(topic, []):
if agent_id != sender:
await self.agents[agent_id].receive_message(message)
# Usage
async def event_driven_example():
messenger = PubSubMessenger()
# Subscribe agents to topics (each agent must already be registered with the messenger)
messenger.subscribe("data_agent", "new_data")
messenger.subscribe("analysis_agent", "new_data")
messenger.subscribe("alert_agent", "anomaly_detected")
messenger.subscribe("dashboard_agent", "anomaly_detected")
# Publish events
await messenger.publish("new_data", {"records": 1000}, "ingestion_agent")
await messenger.publish("anomaly_detected", {"score": 0.95}, "analysis_agent")
3. Blackboard System
# Blackboard pattern - shared knowledge base
class Blackboard:
"""Shared knowledge base for agents"""
def __init__(self):
self.data: dict[str, Any] = {}
self.metadata: dict[str, dict] = {}
self.lock = asyncio.Lock()
async def write(self, key: str, value: Any, agent_id: str, description: str = ""):
"""Write to blackboard"""
async with self.lock:
self.data[key] = value
self.metadata[key] = {
"author": agent_id,
"description": description,
"timestamp": asyncio.get_event_loop().time()
}
async def read(self, key: str) -> Any:
"""Read from blackboard"""
async with self.lock:
return self.data.get(key)
async def query(self, pattern: str) -> dict[str, Any]:
"""Query blackboard with pattern"""
async with self.lock:
return {
k: v for k, v in self.data.items()
if pattern.lower() in k.lower()
}
class BlackboardAgent(BaseAgent):
"""Agent that interacts via blackboard"""
def __init__(self, *args, blackboard: Blackboard, **kwargs):
super().__init__(*args, **kwargs)
self.blackboard = blackboard
async def post_findings(self, key: str, findings: Any):
"""Post findings to blackboard"""
await self.blackboard.write(
key, findings, self.id,
f"{self.role} findings"
)
async def read_findings(self, key: str) -> Any:
"""Read findings from blackboard"""
return await self.blackboard.read(key)
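The blackboard flow above can be exercised end to end: one agent posts findings under a key, another picks them up. This runnable sketch uses a trimmed copy of the `Blackboard` class from the text (write/read only, same lock discipline):

```python
import asyncio

class Blackboard:
    """Trimmed copy of the blackboard above: a locked, shared dict."""
    def __init__(self):
        self.data = {}
        self.lock = asyncio.Lock()

    async def write(self, key, value):
        async with self.lock:
            self.data[key] = value

    async def read(self, key):
        async with self.lock:
            return self.data.get(key)

async def demo():
    board = Blackboard()
    # Researcher posts findings; writer picks them up by key
    await board.write("research/quantum", {"facts": ["f1", "f2"]})
    return await board.read("research/quantum")

findings = asyncio.run(demo())
```

The key convention (`research/quantum`) is my own; any agreed naming scheme works, as long as producers and consumers share it.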
Tool and Resource Sharing
Shared Tool Registry
# Centralized tool registry
class ToolRegistry:
"""Registry for shared tools across agents"""
def __init__(self):
self.tools: dict[str, Callable] = {}
self.tool_metadata: dict[str, dict] = {}
self.agent_permissions: dict[str, set[str]] = {}
def register_tool(
self,
name: str,
func: Callable,
description: str = "",
required_permissions: list[str] = None
):
"""Register a tool"""
self.tools[name] = func
self.tool_metadata[name] = {
"description": description,
"permissions": required_permissions or [],
"registered_by": None
}
def grant_access(self, agent_id: str, tool_names: list[str]):
"""Grant agent access to tools"""
if agent_id not in self.agent_permissions:
self.agent_permissions[agent_id] = set()
self.agent_permissions[agent_id].update(tool_names)
def has_access(self, agent_id: str, tool_name: str) -> bool:
"""Check if agent has tool access"""
# Admin always has access
if agent_id.startswith("admin"):
return True
return tool_name in self.agent_permissions.get(agent_id, set())
async def execute(self, agent_id: str, tool_name: str, **kwargs) -> Any:
"""Execute tool if agent has permission"""
if not self.has_access(agent_id, tool_name):
raise PermissionError(f"Agent {agent_id} cannot access {tool_name}")
tool = self.tools.get(tool_name)
if not tool:
raise ValueError(f"Tool {tool_name} not found")
return await tool(**kwargs)
# Global registry
tool_registry = ToolRegistry()
# Register shared tools (stubs: ToolRegistry.execute awaits tools, so production tools should be coroutine functions)
tool_registry.register_tool(
"search_web",
lambda query: f"Search: {query}",
"Search the web for information"
)
tool_registry.register_tool(
"execute_code",
lambda code: f"Executed: {code[:50]}...",
"Execute Python code",
required_permissions=["code_execution"]
)
tool_registry.register_tool(
"read_file",
lambda path: f"Content: {path}",
"Read file from filesystem",
required_permissions=["file_read"]
)
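The grant-then-execute flow can be shown in a self-contained way. This is a trimmed, synchronous version of the registry above (same permission check, no async), with illustrative agent names:

```python
class ToolRegistry:
    """Trimmed, synchronous version of the registry above."""
    def __init__(self):
        self.tools = {}
        self.perms = {}

    def register_tool(self, name, func):
        self.tools[name] = func

    def grant_access(self, agent_id, names):
        self.perms.setdefault(agent_id, set()).update(names)

    def execute(self, agent_id, name, **kwargs):
        # Enforce per-agent permissions before running the tool
        if name not in self.perms.get(agent_id, set()):
            raise PermissionError(f"{agent_id} cannot access {name}")
        return self.tools[name](**kwargs)

registry = ToolRegistry()
registry.register_tool("search_web", lambda query: f"Search: {query}")
registry.grant_access("researcher_0", ["search_web"])

result = registry.execute("researcher_0", "search_web", query="quantum")
try:
    registry.execute("writer_0", "search_web", query="quantum")
    denied = False
except PermissionError:
    denied = True
```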
Resource Pools
# Shared resource pools
class ResourcePool:
"""Pool of reusable resources"""
def __init__(self, factory: Callable, max_size: int = 10):
self.factory = factory
self.max_size = max_size
self.available: asyncio.Queue = asyncio.Queue()
self.in_use: set = set()
async def acquire(self) -> Any:
"""Acquire resource from pool"""
# Try to get from available
try:
resource = self.available.get_nowait()
self.in_use.add(id(resource))
return resource
except asyncio.QueueEmpty:
# Create new if under limit
if len(self.in_use) < self.max_size:
resource = await self.factory()
self.in_use.add(id(resource))
return resource
else:
# Wait for availability
resource = await self.available.get()
self.in_use.add(id(resource))
return resource
def release(self, resource: Any):
"""Release resource back to pool"""
resource_id = id(resource)
if resource_id in self.in_use:
self.in_use.discard(resource_id)
self.available.put_nowait(resource)
# Example: LLM client pool. acquire() awaits the factory, so it must be a
# coroutine function rather than a plain lambda
async def make_llm():
    return ChatOpenAI(model="gpt-4")

llm_pool = ResourcePool(factory=make_llm, max_size=5)
# Usage in agents
async def use_llm():
    llm = await llm_pool.acquire()
    try:
        return await llm.agenerate([...])
    finally:
        llm_pool.release(llm)
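The acquire/try/finally dance is easy to get wrong when repeated across many agents; wrapping the pool in a small async context manager (an addition of mine, not part of the pool API shown above) guarantees release even when the body raises. `FakePool` is a stand-in with the same acquire/release surface:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def pooled(pool):
    """Acquire from the pool and always release, even on exceptions."""
    resource = await pool.acquire()
    try:
        yield resource
    finally:
        pool.release(resource)

class FakePool:
    """Stand-in exposing the same acquire/release surface as ResourcePool."""
    def __init__(self):
        self.released = []
    async def acquire(self):
        return "resource"
    def release(self, resource):
        self.released.append(resource)

async def demo():
    pool = FakePool()
    async with pooled(pool) as resource:
        assert resource == "resource"  # use the resource here
    return pool.released  # released exactly once on exit

released = asyncio.run(demo())
```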
Coordination and Orchestration
Task Planning
# Multi-agent task planning
class TaskPlanner:
"""Plan and coordinate tasks across agents"""
def __init__(self, agents: list[BaseAgent]):
self.agents = {a.id: a for a in agents}
self.tasks: dict[str, dict] = {}
self.task_id_counter = 0
def create_plan(self, goal: str, constraints: dict = None) -> list[dict]:
"""Create execution plan for goal"""
# Decompose goal into tasks
tasks = self._decompose_goal(goal)
# Assign tasks to agents
plan = []
for task in tasks:
assigned_agent = self._select_agent(task)
plan.append({
"task_id": f"task_{self.task_id_counter}",
"description": task,
"assigned_to": assigned_agent.id,
"dependencies": self._get_dependencies(task, plan)
})
self.task_id_counter += 1
return plan
def _decompose_goal(self, goal: str) -> list[str]:
"""Decompose goal into subtasks"""
# Use LLM to decompose
prompt = f"""
Decompose this goal into 4-8 discrete subtasks:
Goal: {goal}
Return as a JSON array of task descriptions.
"""
# Simplified - in production use LLM
return [
f"Research: {goal}",
f"Plan: {goal}",
f"Execute: {goal}",
f"Review: {goal}"
]
def _select_agent(self, task: str) -> BaseAgent:
"""Select best agent for task"""
# Match task requirements to agent capabilities
for agent in self.agents.values():
if any(keyword in task.lower() for keyword in agent.role.lower().split()):
return agent
return list(self.agents.values())[0]
def _get_dependencies(self, task: str, plan: list[dict]) -> list[str]:
"""Identify task dependencies"""
# Simple: previous tasks
return [p["task_id"] for p in plan]
async def execute_plan(self, plan: list[dict]) -> dict[str, Any]:
"""Execute plan respecting dependencies"""
results = {}
completed = set()
while len(completed) < len(plan):
# Find ready tasks
ready_tasks = [
t for t in plan
if t["task_id"] not in completed
and all(dep in completed for dep in t["dependencies"])
]
if not ready_tasks:
break
# Execute ready tasks
coroutines = []
for task in ready_tasks:
agent = self.agents[task["assigned_to"]]
coroutines.append(agent.process(Message(
content=task["description"]
)))
task_results = await asyncio.gather(*coroutines)
for task, result in zip(ready_tasks, task_results):
results[task["task_id"]] = result
completed.add(task["task_id"])
return results
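The readiness rule inside `execute_plan` (a task runs only once all its dependencies have completed) can be exercised standalone. Note that the article's `_get_dependencies` makes every task depend on all prior tasks, which forces strictly sequential waves; the hand-written plan below shows how independent branches run in the same wave:

```python
def execution_waves(plan):
    """Group tasks into waves; a task joins a wave once all deps completed."""
    completed, waves = set(), []
    while len(completed) < len(plan):
        ready = [
            t for t in plan
            if t["task_id"] not in completed
            and all(dep in completed for dep in t["dependencies"])
        ]
        if not ready:
            raise ValueError("cyclic or unsatisfiable dependencies")
        waves.append([t["task_id"] for t in ready])
        completed.update(t["task_id"] for t in ready)
    return waves

plan = [
    {"task_id": "research", "dependencies": []},
    {"task_id": "outline", "dependencies": ["research"]},
    {"task_id": "write", "dependencies": ["outline"]},
    {"task_id": "fact_check", "dependencies": ["research"]},
]
waves = execution_waves(plan)
```

Here `outline` and `fact_check` both become ready once `research` completes, so they share a wave and could run in parallel via `asyncio.gather`.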
Dynamic Task Assignment
# Dynamic load balancing between agents
class DynamicTaskQueue:
"""Dynamically assign tasks to available agents"""
def __init__(self, agents: list[BaseAgent]):
self.agents = {a.id: a for a in agents}
self.task_queues: dict[str, asyncio.Queue] = {
a.id: asyncio.Queue() for a in agents
}
self.agent_loads: dict[str, int] = {a.id: 0 for a in agents}
async def submit_task(self, task: Any) -> str:
"""Submit task, auto-assign to least loaded agent"""
# Find agent with least load
min_load_agent = min(
self.agent_loads.items(),
key=lambda x: x[1]
)[0]
# Assign task
await self.task_queues[min_load_agent].put(task)
self.agent_loads[min_load_agent] += 1
return min_load_agent
async def process_all(self):
"""Process all queued tasks"""
async def process_agent(agent_id: str, queue: asyncio.Queue):
while not queue.empty():
task = await queue.get()
agent = self.agents[agent_id]
try:
await agent.process(task)
finally:
self.agent_loads[agent_id] -= 1
# Process all agents concurrently
await asyncio.gather(*[
process_agent(aid, q)
for aid, q in self.task_queues.items()
])
Memory Management
Shared Knowledge Base
# Shared memory for multi-agent systems
import vectorstore  # placeholder module; substitute your vector store client
from datetime import datetime, timedelta
class SharedKnowledgeBase:
"""Shared memory with vector search"""
def __init__(self):
self.vector_store = vectorstore.InMemoryVectorStore()
self.metadata_store = {}
async def add(
self,
content: str,
agent_id: str,
tags: list[str] = None,
metadata: dict = None
):
"""Add to knowledge base"""
entry_id = f"kb_{len(self.metadata_store)}"
# Store in vector DB
await self.vector_store.add(
id=entry_id,
text=content,
metadata={
"agent_id": agent_id,
"tags": tags or [],
**(metadata or {})
}
)
# Store metadata
self.metadata_store[entry_id] = {
"content": content,
"agent_id": agent_id,
"tags": tags or [],
"created_at": datetime.now(),
"access_count": 0
}
async def search(
self,
query: str,
agent_id: str = None,
tags: list[str] = None,
top_k: int = 5
) -> list[dict]:
"""Search knowledge base"""
results = await self.vector_store.search(
query=query,
top_k=top_k
)
# Filter by agent and tags
filtered = []
for r in results:
entry = self.metadata_store.get(r["id"])
if not entry:
continue
# Access control
if agent_id and entry["agent_id"] != agent_id:
# Check if shared
if not self.metadata_store.get(r["id"], {}).get("shared", False):
continue
# Tag filter
if tags and not any(t in entry.get("tags", []) for t in tags):
continue
entry["access_count"] += 1
filtered.append(entry)
return filtered[:top_k]
async def get_recent(self, agent_id: str = None, minutes: int = 60) -> list[dict]:
"""Get recent entries"""
cutoff = datetime.now() - timedelta(minutes=minutes)
return [
e for e in self.metadata_store.values()
if e["created_at"] > cutoff
and (agent_id is None or e["agent_id"] == agent_id)
]
# Global knowledge base
knowledge_base = SharedKnowledgeBase()
Error Handling and Resilience
Fault Tolerance
# Error handling in multi-agent systems
class ResilientMultiAgentSystem:
"""Multi-agent system with error handling"""
def __init__(self, agents: list[BaseAgent]):
self.agents = {a.id: a for a in agents}
self.retry_policy = RetryPolicy(max_retries=3, backoff_factor=2)
self.fallback_agents: dict[str, str] = {} # agent_id -> fallback_id
async def execute_with_retry(
self,
agent_id: str,
message: Message,
retry_count: int = 0
) -> Message:
"""Execute with retry logic"""
agent = self.agents[agent_id]
try:
return await asyncio.wait_for(
agent.process(message),
timeout=30
)
except asyncio.TimeoutError:
if retry_count < self.retry_policy.max_retries:
await asyncio.sleep(
self.retry_policy.backoff_factor ** retry_count
)
return await self.execute_with_retry(
agent_id, message, retry_count + 1
)
# Fallback
return await self.execute_with_fallback(agent.id, message)
except Exception as e:
logger.error(f"Agent {agent_id} error: {e}")
if retry_count < self.retry_policy.max_retries:
return await self.execute_with_retry(
agent_id, message, retry_count + 1
)
return await self.execute_with_fallback(agent.id, message)
async def execute_with_fallback(self, failed_agent_id: str, message: Message) -> Message:
"""Execute with fallback agent"""
fallback_id = self.fallback_agents.get(failed_agent_id)
if fallback_id and fallback_id in self.agents:
logger.info(f"Falling back from {failed_agent_id} to {fallback_id}")
return await self.execute_with_retry(fallback_id, message)
# No fallback - return error
return Message(
type=MessageType.RESPONSE,
content=f"Failed to process after retries. Agent: {failed_agent_id}",
metadata={"error": "max_retries_exceeded"}
)
def set_fallback(self, agent_id: str, fallback_id: str):
"""Set fallback agent"""
self.fallback_agents[agent_id] = fallback_id
class RetryPolicy:
def __init__(self, max_retries: int = 3, backoff_factor: float = 2):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
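The sleep in `execute_with_retry` is `backoff_factor ** retry_count`, so with a factor of 2 the retries wait 1s, 2s, then 4s. A small helper (illustrative, not part of the class) makes that schedule explicit:

```python
def backoff_schedule(max_retries: int, factor: float) -> list[float]:
    """Sleep durations for attempts 0..max_retries-1 under exponential backoff."""
    return [factor ** attempt for attempt in range(max_retries)]

schedule = backoff_schedule(3, 2)  # waits before retry 1, 2, and 3
```

Production systems usually add jitter to these waits so that many agents retrying a shared dependency do not all hit it at the same instant.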
Monitoring and Observability
Agent Metrics
# Multi-agent monitoring and metrics
import time

from prometheus_client import Counter, Histogram, Gauge
# Metrics
agent_requests = Counter(
'agent_requests_total',
'Total requests to agent',
['agent_id', 'status']
)
agent_duration = Histogram(
'agent_duration_seconds',
'Agent processing duration',
['agent_id']
)
agent_queue_size = Gauge(
'agent_queue_size',
'Current queue size',
['agent_id']
)
tool_usage = Counter(
'tool_usage_total',
'Tool usage count',
['agent_id', 'tool_name']
)
class MonitoredAgent(BaseAgent):
"""Metrics-collecting mixin: pair it with a concrete agent class (e.g. class MyAgent(MonitoredAgent, SpecializedAgent)) so super().process() resolves to a real implementation"""
async def process(self, message: Message) -> Message:
start_time = time.time()
try:
result = await super().process(message)
agent_requests.labels(agent_id=self.id, status="success").inc()
return result
except Exception as e:
agent_requests.labels(agent_id=self.id, status="error").inc()
raise
finally:
duration = time.time() - start_time
agent_duration.labels(agent_id=self.id).observe(duration)
async def receive_message(self, message: Message):
agent_queue_size.labels(agent_id=self.id).inc()
try:
await super().receive_message(message)
finally:
agent_queue_size.labels(agent_id=self.id).dec()
Best Practices
1. Clear Agent Roles
# Define clear, focused roles
RESEARCHER_ROLE = """
You are a Research Agent.
Your responsibility: Find and synthesize information.
You do NOT:
- Write content directly
- Make decisions about tone
- Execute code
You DO:
- Search for relevant information
- Extract key insights
- Cite sources
- Provide comprehensive overviews
"""
2. Structured Communication
# Use structured message formats
@dataclass
class StructuredMessage:
intent: str # ask, inform, delegate, escalate
content: Any
context: dict
expected_response: str = None
priority: str = "normal" # low, normal, high, critical
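A structured `intent` field makes routing mechanical: a dispatcher maps each intent to a handler. The handlers below are illustrative stubs of my own, and the dataclass is a trimmed copy of the one above:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class StructuredMessage:
    intent: str               # ask, inform, delegate, escalate
    content: Any
    context: dict = field(default_factory=dict)
    priority: str = "normal"  # low, normal, high, critical

def route(message: StructuredMessage) -> str:
    """Dispatch on intent; unknown intents fail loudly instead of silently."""
    handlers = {
        "ask": lambda m: f"answering: {m.content}",
        "inform": lambda m: f"recorded: {m.content}",
        "delegate": lambda m: f"delegated: {m.content}",
        "escalate": lambda m: f"escalated ({m.priority}): {m.content}",
    }
    handler = handlers.get(message.intent)
    if handler is None:
        raise ValueError(f"unknown intent: {message.intent}")
    return handler(message)

routed = route(StructuredMessage(intent="escalate", content="deploy failed", priority="high"))
```

Failing on unknown intents is deliberate: in multi-agent systems a typo'd intent that is silently dropped is much harder to debug than an immediate error.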
3. Graceful Degradation
# Design for partial failures
async def process_with_fallback(agents: list[BaseAgent], task: dict):
"""Try agents in order until one succeeds"""
errors = []
for agent in agents:
try:
return await agent.process(task)
except Exception as e:
errors.append({"agent": agent.id, "error": str(e)})
continue
# All failed - return partial results
return {
"status": "partial_failure",
"errors": errors,
"attempted_agents": [a.id for a in agents]
}
Conclusion
Multi-agent AI systems represent the next frontier in AI development. Key takeaways:
- Frameworks: AutoGen, CrewAI, and custom implementations offer different trade-offs
- Architecture: Choose hierarchical, collaborative, or pipeline based on use case
- Communication: Use request-response, pub-sub, or blackboard patterns
- Resilience: Implement retry logic, fallbacks, and monitoring
- Memory: Shared knowledge bases enable effective collaboration
Start with simple two-agent systems and expand as you understand the coordination patterns needed for your specific use case.