Skip to main content
โšก Calmops

Multi-Agent AI Systems: Complete Guide to Building Collaborative AI Agent Networks

Introduction

Single AI agents are powerful, but multi-agent systems unlock exponentially greater capabilities through collaboration, specialization, and emergent behavior. This comprehensive guide covers everything from fundamental multi-agent concepts to production implementation patterns.

Understanding Multi-Agent AI Systems

What are Multi-Agent AI Systems?

Multi-agent AI systems consist of multiple AI agents that work together to solve complex problems. Each agent has specific roles, capabilities, and can communicate with other agents to share information and coordinate actions.

graph TB
    subgraph "Multi-Agent System"
        User[User Request]
        
        subgraph "Agent Network"
            PM[Project Manager Agent]
            Dev[Developer Agent]
            QA[QA Tester Agent]
            Docs[Documentation Agent]
        end
        
        DB[(Shared Knowledge)]
        
        User --> PM
        PM --> Dev
        PM --> QA
        PM --> Docs
        
        Dev <--> DB
        QA <--> DB
        Docs <--> DB
        
        Dev --> PM
        QA --> PM
        Docs --> PM
    end

Why Multi-Agent Systems?

Aspect Single Agent Multi-Agent
Complexity Limited to single capability Handles diverse tasks
Expertise Generalist Specialists working together
Scalability Hard to scale Add agents for capacity
Reliability Single point of failure Fault tolerance
Emergence Fixed behavior Adaptive collaboration

Key Multi-Agent Concepts

Concept Description
Agent Role Specific function or expertise (e.g., researcher, coder, reviewer)
Agent Communication Message passing between agents
Shared Memory Common knowledge base all agents can access
Orchestration Coordinating agent workflows
Tool Sharing Agents can use shared tools and resources

Multi-Agent Architectures

Architecture 1: Hierarchical

# Hierarchical Multi-Agent Architecture

class ProjectManagerAgent:
    """Top-level agent that coordinates sub-agents"""
    
    def __init__(self):
        self.sub_agents = {
            'researcher': ResearcherAgent(),
            'developer': DeveloperAgent(),
            'reviewer': ReviewerAgent(),
            'writer': DocumentationAgent()
        }
    
    async def process_request(self, user_request: str) -> dict:
        # Analyze request and decompose into tasks
        tasks = await self.decompose_request(user_request)
        
        # Assign tasks to specialized agents
        results = await self.execute_tasks_parallel(tasks)
        
        # Synthesize results
        return await self.synthesize_results(results)
    
    async def decompose_request(self, request: str) -> list[dict]:
        """Break down request into executable tasks"""
        
        # Use LLM to decompose
        decomposition_prompt = f"""
        Decompose this request into discrete tasks:
        {request}
        
        Return a list of tasks with:
        - task_id: unique identifier
        - description: what to do
        - required_role: which agent should handle it
        - dependencies: which tasks must complete first
        """
        
        tasks = await self.llm.complete(decomposition_prompt)
        return json.loads(tasks)
    
    async def execute_tasks_parallel(self, tasks: list[dict]) -> dict:
        """Execute tasks with appropriate agents"""
        
        # Identify independent tasks
        ready_tasks = [t for t in tasks if self._can_execute(t, tasks)]
        
        # Execute ready tasks in parallel
        coroutines = []
        for task in ready_tasks:
            agent = self.sub_agents[task['required_role']]
            coroutines.append(agent.execute(task))
        
        results = await asyncio.gather(*coroutines)
        
        return {t['task_id']: r for t, r in zip(ready_tasks, results)}

Architecture 2: Collaborative/Chat-Based

# Collaborative Multi-Agent Chat Architecture

class ChatBasedMultiAgent:
    """Agents collaborate through a shared chat"""
    
    def __init__(self, agents: list[BaseAgent]):
        self.agents = agents
        self.message_queue = asyncio.Queue()
        self.shared_context = SharedContext()
    
    async def run_discussion(self, topic: str, max_turns: int = 10):
        """Run agent discussion on a topic"""
        
        # Initialize discussion
        await self.broadcast(Message(
            type=MessageType.SYSTEM,
            content=f"Discussion topic: {topic}"
        ))
        
        # Run discussion rounds
        for turn in range(max_turns):
            for agent in self.agents:
                # Agent considers current context
                response = await agent.respond(
                    topic=topic,
                    context=self.shared_context.get_context(agent.id),
                    message_history=self.shared_context.get_history()
                )
                
                # Broadcast response
                await self.broadcast(Message(
                    type=MessageType.AGENT,
                    sender=agent.id,
                    content=response
                ))
                
                # Update shared context
                self.shared_context.add_message(agent.id, response)
                
                # Check for consensus
                if self._check_consensus():
                    return self.shared_context.get_consensus()
        
        return self.shared_context.get_final_summary()
    
    async def broadcast(self, message: Message):
        """Send message to all agents"""
        for agent in self.agents:
            await agent.receive_message(message)

Architecture 3: Sequential Pipeline

# Sequential Pipeline Architecture

class PipelineMultiAgent:
    """Agents process data in a pipeline"""
    
    def __init__(self, pipeline: list[tuple[str, BaseAgent]]):
        """
        Args:
            pipeline: List of (stage_name, agent) tuples
        """
        self.pipeline = pipeline
    
    async def process(self, input_data: Any) -> Any:
        """Process input through pipeline"""
        
        current_data = input_data
        
        for stage_name, agent in self.pipeline:
            # Log pipeline stage
            logger.info(f"Pipeline stage: {stage_name}")
            
            # Process with agent
            current_data = await agent.process(current_data)
            
            # Validate output
            if not self._validate_output(current_data):
                raise PipelineError(
                    f"Agent {stage_name} produced invalid output"
                )
        
        return current_data

# Example: Content Creation Pipeline
content_pipeline = PipelineMultiAgent([
    ('research', ResearcherAgent()),
    ('outline', OutlinerAgent()),
    ('write', WriterAgent()),
    ('edit', EditorAgent()),
    ('format', FormatterAgent()),
])

# Run pipeline
result = await content_pipeline.process("Write about quantum computing")

AutoGen

Microsoft’s AutoGen enables multi-agent conversation:

#!/usr/bin/env python3
"""AutoGen Multi-Agent Example"""

from autogen import ConversableAgent, AssistantAgent, UserProxyAgent
import os

# Configure LLM
llm_config = {
    "model": "gpt-4",
    "api_key": os.environ["OPENAI_API_KEY"],
}

# Create assistant agent (expert developer)
coder = AssistantAgent(
    name="coder",
    llm_config=llm_config,
    system_message="""You are an expert Python developer.
    Write clean, well-documented code.
    Always consider error handling and edge cases."""
)

# Create assistant agent (code reviewer)
reviewer = AssistantAgent(
    name="reviewer",
    llm_config=llm_config,
    system_message="""You are a senior code reviewer.
    Review code for:
    - Security vulnerabilities
    - Performance issues
    - Code quality
    - Test coverage
    
    Provide constructive feedback."""
)

# Create user proxy (human in the loop)
user_proxy = UserProxyAgent(
    name="user",
    human_input_mode="TERMINATE",
    max_consecutive_auto_reply=10,
)

# Define conversation flow
@user_proxy.register_for_execution()
@coder.register_for_llm(description="Write Python code for a given task")
def write_code(task: str) -> str:
    """Generate Python code for the given task."""
    return f"# Code for: {task}\n\n# Implementation here..."

@user_proxy.register_for_execution()
@reviewer.register_for_llm(description="Review Python code")
def review_code(code: str) -> str:
    """Review the provided code."""
    return f"# Review of:\n{code}\n\n# Feedback here..."

# Start conversation
async def main():
    # Initiate chat
    await user_proxy.a_initiate_chat(
        coder,
        message="""Create a Python function that:
        1. Fetches data from an API
        2. Processes the data
        3. Saves to database
        
        Include error handling and logging."""
    )
    
    # Get code from coder
    code = user_proxy.chat_messages[coder][-1]["content"]
    
    # Send to reviewer
    await user_proxy.a_initiate_chat(
        reviewer,
        message=f"Review this code:\n{code}"
    )

if __name__ == "__main__":
    asyncio.run(main())

CrewAI

CrewAI provides role-based multi-agent orchestration:

#!/usr/bin/env python3
"""CrewAI Multi-Agent Example"""

from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI

# Define agents
researcher = Agent(
    role="Research Analyst",
    goal="Find comprehensive information on the topic",
    backstory="""You are an expert research analyst with 
    years of experience in gathering and synthesizing information.""",
    verbose=True,
    allow_delegation=False,
)

writer = Agent(
    role="Content Writer",
    goal="Create engaging, well-structured content",
    backstory="""You are a skilled content writer known for
    creating compelling narratives that resonate with readers.""",
    verbose=True,
    allow_delegation=False,
)

editor = Agent(
    role="Senior Editor",
    goal="Ensure content quality and accuracy",
    backstory="""You are a senior editor with sharp attention
    to detail and high standards for quality.""",
    verbose=True,
    allow_delegation=True,  # Can delegate to writer
)

# Define tasks
research_task = Task(
    description="Research the latest trends in quantum computing",
    agent=researcher,
    expected_output="Comprehensive research report with sources",
)

write_task = Task(
    description="Write an article based on the research",
    agent=writer,
    expected_output="Well-structured article (2000-3000 words)",
    context=[research_task],  # Depends on research
)

edit_task = Task(
    description="Review and edit the article for publication",
    agent=editor,
    expected_output="Final polished article ready for publication",
    context=[write_task],  # Depends on writing
)

# Create crew with hierarchical process
crew = Crew(
    agents=[researcher, writer, editor],
    tasks=[research_task, write_task, edit_task],
    process=Process.hierarchical,  # Manager coordinates
    manager_agent=editor,  # Editor manages the workflow
    verbose=True,
)

# Execute
result = crew.kickoff(
    inputs={"topic": "Quantum Computing in 2026"}
)

print(f"Crew Result: {result}")

Custom Implementation

#!/usr/bin/env python3
"""Custom Multi-Agent Framework"""

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum
import json
import uuid

class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    SYSTEM = "system"

@dataclass
class Message:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    type: MessageType = MessageType.REQUEST
    sender: str = ""
    receiver: str = ""
    content: Any = None
    metadata: dict = field(default_factory=dict)

class BaseAgent(ABC):
    """Base class for all agents"""
    
    def __init__(self, agent_id: str, role: str, description: str):
        self.id = agent_id
        self.role = role
        self.description = description
        self.inbox = asyncio.Queue()
        self.tools = {}
        self.memory = []
    
    @abstractmethod
    async def process(self, message: Message) -> Message:
        """Process incoming message"""
        pass
    
    async def receive_message(self, message: Message):
        """Receive and queue message"""
        await self.inbox.put(message)
    
    async def run(self):
        """Main agent loop"""
        while True:
            message = await self.inbox.get()
            if message.type == MessageType.SYSTEM and message.content == "STOP":
                break
            
            response = await self.process(message)
            if response:
                await self.send_message(response)
    
    async def send_message(self, message: Message):
        """Send message to another agent or broadcast"""
        message.sender = self.id
        await self.messenger.send(message)
    
    def add_tool(self, name: str, func: Callable):
        """Register a tool for this agent"""
        self.tools[name] = func
    
    def remember(self, item: Any):
        """Store in agent memory"""
        self.memory.append({
            "content": item,
            "timestamp": asyncio.get_event_loop().time()
        })

class SpecializedAgent(BaseAgent):
    """Agent with specialized capabilities"""
    
    def __init__(self, agent_id: str, role: str, description: str, system_prompt: str):
        super().__init__(agent_id, role, description)
        self.system_prompt = system_prompt
        self.llm = None  # Initialize with your LLM
    
    async def process(self, message: Message) -> Message:
        """Process using LLM and tools"""
        
        # Build context from memory
        context = "\n".join([
            f"[{m['timestamp']}]: {m['content']}"
            for m in self.memory[-10:]
        ])
        
        # Construct prompt
        prompt = f"""
        Role: {self.role}
        Description: {self.description}
        
        Context:
        {context}
        
        Current Request:
        {message.content}
        
        Available Tools:
        {', '.join(self.tools.keys())}
        
        Respond with your action and any tool calls.
        """
        
        # Get LLM response
        llm_response = await self.llm.complete(prompt)
        
        # Parse and execute tool if needed
        response_content = llm_response
        if self._needs_tool(llm_response):
            tool_name, tool_args = self._parse_tool_call(llm_response)
            tool_result = await self.tools[tool_name](**tool_args)
            response_content = f"{llm_response}\n\nTool Result: {tool_result}"
        
        # Remember interaction
        self.remember(f"Request: {message.content}\nResponse: {response_content}")
        
        return Message(
            type=MessageType.RESPONSE,
            receiver=message.sender,
            content=response_content
        )

class AgentMessenger:
    """Message broker between agents"""
    
    def __init__(self):
        self.agents: dict[str, BaseAgent] = {}
        self.message_log: list[Message] = []
    
    def register_agent(self, agent: BaseAgent):
        """Register agent in network"""
        agent.messenger = self
        self.agents[agent.id] = agent
    
    async def send(self, message: Message):
        """Route message to recipient"""
        self.message_log.append(message)
        
        if message.receiver:
            # Direct message
            await self.agents[message.receiver].receive_message(message)
        else:
            # Broadcast
            for agent in self.agents.values():
                if agent.id != message.sender:
                    await agent.receive_message(message)

class MultiAgentSystem:
    """Complete multi-agent orchestration system"""
    
    def __init__(self):
        self.messenger = AgentMessenger()
        self.agents: dict[str, BaseAgent] = {}
    
    def add_agent(self, agent: BaseAgent):
        """Add agent to system"""
        self.agents[agent.id] = agent
        self.messenger.register_agent(agent)
    
    def create_agent(
        self,
        role: str,
        description: str,
        system_prompt: str,
        tools: dict[str, Callable] = None
    ) -> SpecializedAgent:
        """Factory method to create specialized agent"""
        
        agent = SpecializedAgent(
            agent_id=f"{role.lower().replace(' ', '_')}_{len(self.agents)}",
            role=role,
            description=description,
            system_prompt=system_prompt
        )
        
        if tools:
            for name, func in tools.items():
                agent.add_tool(name, func)
        
        self.add_agent(agent)
        return agent
    
    async def run(self, initial_message: str, timeout: int = 60) -> Any:
        """Run multi-agent system with initial message"""
        
        # Start all agents
        tasks = [
            asyncio.create_task(agent.run())
            for agent in self.agents.values()
        ]
        
        # Send initial message to all agents
        broadcast = Message(
            type=MessageType.BROADCAST,
            content=initial_message
        )
        await self.messenger.send(broadcast)
        
        # Wait for completion or timeout
        try:
            await asyncio.wait_for(
                self._wait_for_completion(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            pass
        
        # Stop all agents
        stop_msg = Message(type=MessageType.SYSTEM, content="STOP")
        for agent in self.agents.values():
            await agent.receive_message(stop_msg)
        
        # Wait for graceful shutdown
        await asyncio.gather(*tasks, return_exceptions=True)
        
        # Return aggregated results
        return self._aggregate_results()
    
    async def _wait_for_completion(self):
        """Wait for agents to finish processing"""
        # Check if all agents have processed the message
        while True:
            await asyncio.sleep(1)
            # Add completion logic based on your requirements
            if len(self.messenger.message_log) > 10:
                break
    
    def _aggregate_results(self) -> dict:
        """Aggregate results from all agents"""
        return {
            agent.id: {
                "role": agent.role,
                "memory": agent.memory,
                "message_count": len([
                    m for m in self.messenger.message_log
                    if m.sender == agent.id
                ])
            }
            for agent in self.agents.values()
        }

# Example: Research and Writing Team
async def main():
    system = MultiAgentSystem()
    
    # Create research agent
    researcher = system.create_agent(
        role="Researcher",
        description="Expert at gathering and analyzing information",
        system_prompt="You research topics thoroughly and provide comprehensive information.",
        tools={
            "search_web": lambda query: f"Search results for: {query}",
            "read_file": lambda path: f"Content of: {path}",
        }
    )
    
    # Create writer agent
    writer = system.create_agent(
        role="Writer",
        description="Skilled content creator",
        system_prompt="You create clear, engaging content from research.",
    )
    
    # Create editor agent
    editor = system.create_agent(
        role="Editor",
        description="Quality assurance specialist",
        system_prompt="You ensure content is accurate, well-structured, and error-free.",
    )
    
    # Run system
    results = await system.run(
        "Write a comprehensive guide to machine learning"
    )
    
    print(json.dumps(results, indent=2))

if __name__ == "__main__":
    asyncio.run(main())

Agent Communication Patterns

1. Request-Response

# Direct request-response between agents

class RequestResponsePattern:
    """Simple request-response pattern"""
    
    async def request(self, from_agent: str, to_agent: str, request: str) -> str:
        """Send request and wait for response"""
        
        message = Message(
            type=MessageType.REQUEST,
            sender=from_agent,
            receiver=to_agent,
            content=request
        )
        
        # Send and wait for response
        response_queue = asyncio.Queue()
        
        # Register one-time listener
        def listener(msg):
            if msg.sender == to_agent:
                response_queue.put_nowait(msg)
        
        self.messenger.add_listener(from_agent, listener)
        
        await self.messenger.send(message)
        
        # Wait for response with timeout
        try:
            response = await asyncio.wait_for(
                response_queue.get(),
                timeout=30
            )
            return response.content
        except asyncio.TimeoutError:
            return "Request timed out"

2. Publish-Subscribe

# Pub-Sub pattern for event-driven communication

class PubSubMessenger(AgentMessenger):
    """Enhanced messenger with pub-sub"""
    
    def __init__(self):
        super().__init__()
        self.subscriptions: dict[str, set[str]] = {}  # topic -> set of agent_ids
    
    def subscribe(self, agent_id: str, topic: str):
        """Subscribe agent to topic"""
        if topic not in self.subscriptions:
            self.subscriptions[topic] = set()
        self.subscriptions[topic].add(agent_id)
    
    def unsubscribe(self, agent_id: str, topic: str):
        """Unsubscribe from topic"""
        if topic in self.subscriptions:
            self.subscriptions[topic].discard(agent_id)
    
    async def publish(self, topic: str, content: Any, sender: str):
        """Publish message to topic subscribers"""
        
        message = Message(
            type=MessageType.BROADCAST,
            sender=sender,
            content=content,
            metadata={"topic": topic}
        )
        
        # Send to all subscribers
        for agent_id in self.subscriptions.get(topic, []):
            if agent_id != sender:
                await self.agents[agent_id].receive_message(message)

# Usage
async def event_driven_example():
    messenger = PubSubMessenger()
    
    # Subscribe agents to topics
    messenger.subscribe("data_agent", "new_data")
    messenger.subscribe("analysis_agent", "new_data")
    messenger.subscribe("alert_agent", "anomaly_detected")
    messenger.subscribe("dashboard_agent", "anomaly_detected")
    
    # Publish events
    await messenger.publish("new_data", {"records": 1000}, "ingestion_agent")
    await messenger.publish("anomaly_detected", {"score": 0.95}, "analysis_agent")

3. Blackboard System

# Blackboard pattern - shared knowledge base

class Blackboard:
    """Shared knowledge base for agents"""
    
    def __init__(self):
        self.data: dict[str, Any] = {}
        self.metadata: dict[str, dict] = {}
        self.lock = asyncio.Lock()
    
    async def write(self, key: str, value: Any, agent_id: str, description: str = ""):
        """Write to blackboard"""
        async with self.lock:
            self.data[key] = value
            self.metadata[key] = {
                "author": agent_id,
                "description": description,
                "timestamp": asyncio.get_event_loop().time()
            }
    
    async def read(self, key: str) -> Any:
        """Read from blackboard"""
        async with self.lock:
            return self.data.get(key)
    
    async def query(self, pattern: str) -> dict[str, Any]:
        """Query blackboard with pattern"""
        async with self.lock:
            return {
                k: v for k, v in self.data.items()
                if pattern.lower() in k.lower()
            }

class BlackboardAgent(BaseAgent):
    """Agent that interacts via blackboard"""
    
    def __init__(self, *args, blackboard: Blackboard, **kwargs):
        super().__init__(*args, **kwargs)
        self.blackboard = blackboard
    
    async def post_findings(self, key: str, findings: Any):
        """Post findings to blackboard"""
        await self.blackboard.write(
            key, findings, self.id,
            f"{self.role} findings"
        )
    
    async def read_findings(self, key: str) -> Any:
        """Read findings from blackboard"""
        return await self.blackboard.read(key)

Tool and Resource Sharing

Shared Tool Registry

# Centralized tool registry

class ToolRegistry:
    """Registry for shared tools across agents"""
    
    def __init__(self):
        self.tools: dict[str, Callable] = {}
        self.tool_metadata: dict[str, dict] = {}
        self.agent_permissions: dict[str, set[str]] = {}
    
    def register_tool(
        self,
        name: str,
        func: Callable,
        description: str = "",
        required_permissions: list[str] = None
    ):
        """Register a tool"""
        self.tools[name] = func
        self.tool_metadata[name] = {
            "description": description,
            "permissions": required_permissions or [],
            "registered_by": None
        }
    
    def grant_access(self, agent_id: str, tool_names: list[str]):
        """Grant agent access to tools"""
        if agent_id not in self.agent_permissions:
            self.agent_permissions[agent_id] = set()
        self.agent_permissions[agent_id].update(tool_names)
    
    def has_access(self, agent_id: str, tool_name: str) -> bool:
        """Check if agent has tool access"""
        # Admin always has access
        if agent_id.startswith("admin"):
            return True
        
        return tool_name in self.agent_permissions.get(agent_id, set())
    
    async def execute(self, agent_id: str, tool_name: str, **kwargs) -> Any:
        """Execute tool if agent has permission"""
        if not self.has_access(agent_id, tool_name):
            raise PermissionError(f"Agent {agent_id} cannot access {tool_name}")
        
        tool = self.tools.get(tool_name)
        if not tool:
            raise ValueError(f"Tool {tool_name} not found")
        
        return await tool(**kwargs)

# Global registry
tool_registry = ToolRegistry()

# Register shared tools
tool_registry.register_tool(
    "search_web",
    lambda query: f"Search: {query}",
    "Search the web for information"
)

tool_registry.register_tool(
    "execute_code",
    lambda code: f"Executed: {code[:50]}...",
    "Execute Python code",
    required_permissions=["code_execution"]
)

tool_registry.register_tool(
    "read_file",
    lambda path: f"Content: {path}",
    "Read file from filesystem",
    required_permissions=["file_read"]
)

Resource Pools

# Shared resource pools

class ResourcePool:
    """Pool of reusable resources"""
    
    def __init__(self, factory: Callable, max_size: int = 10):
        self.factory = factory
        self.max_size = max_size
        self.available: asyncio.Queue = asyncio.Queue()
        self.in_use: set = set()
    
    async def acquire(self) -> Any:
        """Acquire resource from pool"""
        # Try to get from available
        try:
            resource = self.available.get_nowait()
            self.in_use.add(id(resource))
            return resource
        except asyncio.QueueEmpty:
            # Create new if under limit
            if len(self.in_use) < self.max_size:
                resource = await self.factory()
                self.in_use.add(id(resource))
                return resource
            else:
                # Wait for availability
                resource = await self.available.get()
                self.in_use.add(id(resource))
                return resource
    
    def release(self, resource: Any):
        """Release resource back to pool"""
        resource_id = id(resource)
        if resource_id in self.in_use:
            self.in_use.discard(resource_id)
            self.available.put_nowait(resource)

# Example: LLM pool
llm_pool = ResourcePool(
    factory=lambda: ChatOpenAI(model="gpt-4"),
    max_size=5
)

# Usage in agents
async def use_llm():
    llm = await llm_pool.acquire()
    try:
        result = await llm.agenerate([...])
    finally:
        llm_pool.release(llm)

Coordination and Orchestration

Task Planning

# Multi-agent task planning

class TaskPlanner:
    """Plan and coordinate tasks across agents"""
    
    def __init__(self, agents: list[BaseAgent]):
        self.agents = {a.id: a for a in agents}
        self.tasks: dict[str, dict] = {}
        self.task_id_counter = 0
    
    def create_plan(self, goal: str, constraints: dict = None) -> list[dict]:
        """Create execution plan for goal"""
        
        # Decompose goal into tasks
        tasks = self._decompose_goal(goal)
        
        # Assign tasks to agents
        plan = []
        for task in tasks:
            assigned_agent = self._select_agent(task)
            plan.append({
                "task_id": f"task_{self.task_id_counter}",
                "description": task,
                "assigned_to": assigned_agent.id,
                "dependencies": self._get_dependencies(task, plan)
            })
            self.task_id_counter += 1
        
        return plan
    
    def _decompose_goal(self, goal: str) -> list[str]:
        """Decompose goal into subtasks"""
        # Use LLM to decompose
        prompt = f"""
        Decompose this goal into 4-8 discrete subtasks:
        Goal: {goal}
        
        Return as a JSON array of task descriptions.
        """
        
        # Simplified - in production use LLM
        return [
            f"Research: {goal}",
            f"Plan: {goal}",
            f"Execute: {goal}",
            f"Review: {goal}"
        ]
    
    def _select_agent(self, task: str) -> BaseAgent:
        """Select best agent for task"""
        # Match task requirements to agent capabilities
        for agent in self.agents.values():
            if any(keyword in task.lower() for keyword in agent.role.lower().split()):
                return agent
        return list(self.agents.values())[0]
    
    def _get_dependencies(self, task: str, plan: list[dict]) -> list[str]:
        """Identify task dependencies"""
        # Simple: previous tasks
        return [p["task_id"] for p in plan]
    
    async def execute_plan(self, plan: list[dict]) -> dict[str, Any]:
        """Execute plan respecting dependencies"""
        
        results = {}
        completed = set()
        
        while len(completed) < len(plan):
            # Find ready tasks
            ready_tasks = [
                t for t in plan
                if t["task_id"] not in completed
                and all(dep in completed for dep in t["dependencies"])
            ]
            
            if not ready_tasks:
                break
            
            # Execute ready tasks
            coroutines = []
            for task in ready_tasks:
                agent = self.agents[task["assigned_to"]]
                coroutines.append(agent.process(Message(
                    content=task["description"]
                )))
            
            task_results = await asyncio.gather(*coroutines)
            
            for task, result in zip(ready_tasks, task_results):
                results[task["task_id"]] = result
                completed.add(task["task_id"])
        
        return results

Dynamic Task Assignment

# Dynamic load balancing between agents

class DynamicTaskQueue:
    """Dynamically assign tasks to available agents"""
    
    def __init__(self, agents: list[BaseAgent]):
        self.agents = {a.id: a for a in agents}
        self.task_queues: dict[str, asyncio.Queue] = {
            a.id: asyncio.Queue() for a in agents
        }
        self.agent_loads: dict[str, int] = {a.id: 0 for a in agents}
    
    async def submit_task(self, task: Any) -> str:
        """Submit task, auto-assign to least loaded agent"""
        
        # Find agent with least load
        min_load_agent = min(
            self.agent_loads.items(),
            key=lambda x: x[1]
        )[0]
        
        # Assign task
        await self.task_queues[min_load_agent].put(task)
        self.agent_loads[min_load_agent] += 1
        
        return min_load_agent
    
    async def process_all(self):
        """Process all queued tasks"""
        
        async def process_agent(agent_id: str, queue: asyncio.Queue):
            while not queue.empty():
                task = await queue.get()
                agent = self.agents[agent_id]
                
                try:
                    await agent.process(task)
                finally:
                    self.agent_loads[agent_id] -= 1
        
        # Process all agents concurrently
        await asyncio.gather(*[
            process_agent(aid, q)
            for aid, q in self.task_queues.items()
        ])

Memory Management

Shared Knowledge Base

# Shared memory for multi-agent systems

import vectorstore
from datetime import datetime, timedelta

class SharedKnowledgeBase:
    """Shared memory with vector search"""
    
    def __init__(self):
        self.vector_store = vectorstore.InMemoryVectorStore()
        self.metadata_store = {}
    
    async def add(
        self,
        content: str,
        agent_id: str,
        tags: list[str] = None,
        metadata: dict = None
    ):
        """Add to knowledge base"""
        
        entry_id = f"kb_{len(self.metadata_store)}"
        
        # Store in vector DB
        await self.vector_store.add(
            id=entry_id,
            text=content,
            metadata={
                "agent_id": agent_id,
                "tags": tags or [],
                **(metadata or {})
            }
        )
        
        # Store metadata
        self.metadata_store[entry_id] = {
            "content": content,
            "agent_id": agent_id,
            "tags": tags or [],
            "created_at": datetime.now(),
            "access_count": 0
        }
    
    async def search(
        self,
        query: str,
        agent_id: str = None,
        tags: list[str] = None,
        top_k: int = 5
    ) -> list[dict]:
        """Search knowledge base"""
        
        results = await self.vector_store.search(
            query=query,
            top_k=top_k
        )
        
        # Filter by agent and tags
        filtered = []
        for r in results:
            entry = self.metadata_store.get(r["id"])
            if not entry:
                continue
            
            # Access control
            if agent_id and entry["agent_id"] != agent_id:
                # Check if shared
                if not metadata_store.get(r["id"], {}).get("shared", False):
                    continue
            
            # Tag filter
            if tags and not any(t in entry.get("tags", []) for t in tags):
                continue
            
            entry["access_count"] += 1
            filtered.append(entry)
        
        return filtered[:top_k]
    
    async def get_recent(self, agent_id: str = None, minutes: int = 60) -> list[dict]:
        """Get recent entries"""
        
        cutoff = datetime.now() - timedelta(minutes=minutes)
        
        return [
            e for e in self.metadata_store.values()
            if e["created_at"] > cutoff
            and (agent_id is None or e["agent_id"] == agent_id)
        ]

# Global knowledge base
knowledge_base = SharedKnowledgeBase()

Error Handling and Resilience

Fault Tolerance

# Error handling in multi-agent systems

class ResilientMultiAgentSystem:
    """Multi-agent system with error handling"""
    
    def __init__(self, agents: list[BaseAgent]):
        self.agents = {a.id: a for a in agents}
        self.retry_policy = RetryPolicy(max_retries=3, backoff_factor=2)
        self.fallback_agents: dict[str, str] = {}  # agent_id -> fallback_id
    
    async def execute_with_retry(
        self,
        agent_id: str,
        message: Message,
        retry_count: int = 0
    ) -> Message:
        """Execute with retry logic"""
        
        agent = self.agents[agent_id]
        
        try:
            return await asyncio.wait_for(
                agent.process(message),
                timeout=30
            )
        except asyncio.TimeoutError:
            if retry_count < self.retry_policy.max_retries:
                await asyncio.sleep(
                    self.retry_policy.backoff_factor ** retry_count
                )
                return await self.execute_with_retry(
                    agent_id, message, retry_count + 1
                )
            # Fallback
            return await self.execute_with_fallback(agent.id, message)
        
        except Exception as e:
            logger.error(f"Agent {agent_id} error: {e}")
            if retry_count < self.retry_policy.max_retries:
                return await self.execute_with_retry(
                    agent_id, message, retry_count + 1
                )
            return await self.execute_with_fallback(agent.id, message)
    
    async def execute_with_fallback(self, failed_agent_id: str, message: Message) -> Message:
        """Execute with fallback agent"""
        
        fallback_id = self.fallback_agents.get(failed_agent_id)
        
        if fallback_id and fallback_id in self.agents:
            logger.info(f"Falling back from {failed_agent_id} to {fallback_id}")
            return await self.execute_with_retry(fallback_id, message)
        
        # No fallback - return error
        return Message(
            type=MessageType.RESPONSE,
            content=f"Failed to process after retries. Agent: {failed_agent_id}",
            metadata={"error": "max_retries_exceeded"}
        )
    
    def set_fallback(self, agent_id: str, fallback_id: str):
        """Set fallback agent"""
        self.fallback_agents[agent_id] = fallback_id

class RetryPolicy:
    def __init__(self, max_retries: int = 3, backoff_factor: float = 2):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

Monitoring and Observability

Agent Metrics

# Multi-agent monitoring and metrics

from prometheus_client import Counter, Histogram, Gauge

# Metrics
agent_requests = Counter(
    'agent_requests_total',
    'Total requests to agent',
    ['agent_id', 'status']
)

agent_duration = Histogram(
    'agent_duration_seconds',
    'Agent processing duration',
    ['agent_id']
)

agent_queue_size = Gauge(
    'agent_queue_size',
    'Current queue size',
    ['agent_id']
)

tool_usage = Counter(
    'tool_usage_total',
    'Tool usage count',
    ['agent_id', 'tool_name']
)

class MonitoredAgent(BaseAgent):
    """Agent with metrics collection"""
    
    async def process(self, message: Message) -> Message:
        start_time = time.time()
        
        try:
            result = await super().process(message)
            agent_requests.labels(agent_id=self.id, status="success").inc()
            return result
        except Exception as e:
            agent_requests.labels(agent_id=self.id, status="error").inc()
            raise
        finally:
            duration = time.time() - start_time
            agent_duration.labels(agent_id=self.id).observe(duration)
    
    async def receive_message(self, message: Message):
        agent_queue_size.labels(agent_id=self.id).inc()
        try:
            await super().receive_message(message)
        finally:
            agent_queue_size.labels(agent_id=self.id).dec()

Best Practices

1. Clear Agent Roles

# Define clear, focused roles

RESEARCHER_ROLE = """
You are a Research Agent.
Your responsibility: Find and synthesize information.

You do NOT:
- Write content directly
- Make decisions about tone
- Execute code

You DO:
- Search for relevant information
- Extract key insights
- Cite sources
- Provide comprehensive overviews
"""

2. Structured Communication

# Use structured message formats

@dataclass
class StructuredMessage:
    intent: str  # ask, inform, delegate, escalate
    content: Any
    context: dict
    expected_response: str = None
    priority: str = "normal"  # low, normal, high, critical

3. Graceful Degradation

# Design for partial failures

async def process_with_fallback(agents: list[BaseAgent], task: dict):
    """Try agents in order until one succeeds"""
    
    errors = []
    for agent in agents:
        try:
            return await agent.process(task)
        except Exception as e:
            errors.append({"agent": agent.id, "error": str(e)})
            continue
    
    # All failed - return partial results
    return {
        "status": "partial_failure",
        "errors": errors,
        "attempted_agents": [a.id for a in agents]
    }

Conclusion

Multi-agent AI systems represent the next frontier in AI development. Key takeaways:

  • Frameworks: AutoGen, CrewAI, and custom implementations offer different trade-offs
  • Architecture: Choose hierarchical, collaborative, or pipeline based on use case
  • Communication: Use request-response, pub-sub, or blackboard patterns
  • Resilience: Implement retry logic, fallbacks, and monitoring
  • Memory: Shared knowledge bases enable effective collaboration

Start with simple two-agent systems and expand as you understand the coordination patterns needed for your specific use case.


External Resources

Comments