
A2A Protocol: Agent-to-Agent Communication Complete Guide 2026

Created: March 15, 2026 | Larry Qu | 8 min read

Introduction

As AI agents become more capable, a new challenge emerges: how do agents from different companies, built on different platforms, communicate and collaborate? The A2A (Agent-to-Agent) protocol addresses this problem. Proposed by Google and now governed by the Linux Foundation, A2A lets independently built agents work together by sharing tasks, exchanging information, and coordinating complex workflows.

In 2026, A2A has emerged as a critical standard for building agentic AI systems at scale. This guide covers the protocol's architecture, a sample implementation, and how to build multi-agent systems on top of it.

Understanding A2A Protocol

What is A2A?

A2A is a protocol that enables agents to:

  • Discover each other’s capabilities
  • Negotiate task execution
  • Exchange context and results
  • Collaborate on complex workflows

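Think of A2A as a shared application protocol for agents. Web browsers and servers interoperate because they both speak HTTP; A2A-compliant agents interoperate because they exchange the same JSON-RPC 2.0 messages, typically carried over HTTP.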

Why A2A Matters

## Before A2A
- Agents were isolated
- No standard communication
- Limited collaboration
- Vendor lock-in

## After A2A
- Agents can discover each other
- Standard communication protocol
- Seamless collaboration
- Interoperability across platforms

A2A Protocol Architecture

Core Concepts

class A2AAgent:
    """Base A2A-compatible agent."""
    
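    def __init__(self, agent_id, name, endpoint, capabilities, skills=None):
        self.agent_id = agent_id
        self.name = name
        self.endpoint = endpoint  # URL where this agent accepts A2A requests
        self.capabilities = capabilities
        self.skills = skills or []  # list of skill descriptors (id, name, tags)
        self.message_queue = []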
    
    def advertise_capabilities(self):
        """Announce what this agent can do."""
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "version": "1.0",
            "capabilities": self.capabilities,
            "endpoint": self.endpoint,
            "skills": self.skills
        }

Agent Card

{
  "agent_id": "research-agent-001",
  "name": "Research Assistant",
  "description": "Advanced research and analysis agent",
  "url": "https://api.example.com/agents/research-agent",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransition": true
  },
  "skills": [
    {
      "id": "web-search",
      "name": "Web Search",
      "description": "Search the web for information",
      "tags": ["search", "research", "web"]
    },
    {
      "id": "document-analysis",
      "name": "Document Analysis",
      "description": "Analyze and summarize documents",
      "tags": ["analysis", "documents", "summary"]
    }
  ],
  "authentication": {
    "schemes": ["Bearer", "APIKey"]
  }
}
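
Because the card is plain JSON, a client can sanity-check it before relying on it. The following is a minimal sketch, assuming the field names used in this guide's example card (`agent_id`, `skills`, `tags`). `validate_card` and `skills_with_tag` are hypothetical helpers written for illustration, not functions from any A2A SDK.

```python
import json

# Fields this guide's example card always carries (an assumption, not a spec rule).
REQUIRED_FIELDS = ("agent_id", "name", "url", "version", "skills")

def validate_card(card: dict) -> list:
    """Return a list of problems; an empty list means the card looks usable."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in card]
    for index, skill in enumerate(card.get("skills", [])):
        if "id" not in skill:
            problems.append(f"skill {index} has no id")
    return problems

def skills_with_tag(card: dict, tag: str) -> list:
    """Return the ids of skills that carry the given tag (case-insensitive)."""
    wanted = tag.lower()
    return [
        skill["id"]
        for skill in card.get("skills", [])
        if wanted in (t.lower() for t in skill.get("tags", []))
    ]

card = json.loads("""
{
  "agent_id": "research-agent-001",
  "name": "Research Assistant",
  "url": "https://api.example.com/agents/research-agent",
  "version": "1.0.0",
  "skills": [
    {"id": "web-search", "tags": ["search", "research", "web"]},
    {"id": "document-analysis", "tags": ["analysis", "documents", "summary"]}
  ]
}
""")

print(validate_card(card))             # problems found in the example card
print(skills_with_tag(card, "search")) # skill ids tagged "search"
```

Returning a list of problems instead of raising on the first one makes it easier to log everything that is wrong with a third-party card in a single pass.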

Message Format

{
  "jsonrpc": "2.0",
  "id": "msg-123",
  "method": "tasks/send",
  "params": {
    "sessionId": "session-456",
    "message": {
      "role": "user",
      "parts": [
        {
          "type": "text",
          "text": "Research the latest developments in quantum computing"
        },
        {
          "type": "file",
          "file": {
            "name": "requirements.pdf",
            "mimeType": "application/pdf"
          }
        }
      ],
      "metadata": {
        "priority": "high",
        "deadline": "2026-03-20T12:00:00Z"
      }
    },
    "pushNotification": {
      "endpoint": "https://client.example.com/notifications",
      "authentication": {
        "schemes": ["Bearer"]
      }
    }
  }
}
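
A request like the one above can be assembled with nothing but the standard library. This sketch assumes the envelope shape shown in this guide (`tasks/send`, `sessionId`, text parts). `build_send_request` and `extract_text` are hypothetical helper names, and the generated `msg-` id format is only an illustration.

```python
import json
import uuid
from typing import Optional

def build_send_request(text: str, session_id: str, request_id: Optional[str] = None) -> dict:
    """Wrap a plain-text message in the JSON-RPC envelope used in this guide."""
    return {
        "jsonrpc": "2.0",
        # Generate an id when the caller does not supply one.
        "id": request_id or f"msg-{uuid.uuid4().hex[:8]}",
        "method": "tasks/send",
        "params": {
            "sessionId": session_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }

def extract_text(request: dict) -> str:
    """Join the text parts of a request, skipping file parts."""
    parts = request["params"]["message"]["parts"]
    return "\n".join(p["text"] for p in parts if p.get("type") == "text")

request = build_send_request("Research quantum computing", "session-456", "msg-123")
wire = json.dumps(request)  # the string that would go into the HTTP POST body
print(extract_text(json.loads(wire)))
```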

Building A2A Agents

Simple A2A Agent

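# Note: `a2a.protocol` and `a2a.transport` are illustrative module names;
# check your A2A SDK for the actual import paths.
from a2a.protocol import Agent, Message, Task
from a2a.transport import HTTPTransport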

class ResearchAgent(Agent):
    """Research agent implementing A2A protocol."""
    
    def __init__(self):
        super().__init__(
            agent_id="research-agent",
            name="Research Assistant",
            capabilities=["web_search", "document_analysis", "summarization"]
        )
    
    async def handle_message(self, message: Message) -> Message:
        """Process incoming message."""
        user_request = message.content
        
        if "search" in user_request.lower():
            result = await self.web_search(user_request)
        elif "analyze" in user_request.lower():
            result = await self.analyze_document(user_request)
        else:
            result = await self.general_research(user_request)
        
        return Message(
            role="assistant",
            content=result,
            task_id=message.task_id
        )
    
    async def web_search(self, query):
        """Perform web search."""
        # Implementation
        return f"Search results for: {query}"
    
    async def analyze_document(self, document):
        """Analyze document."""
        # Implementation
        return "Analysis complete"
    
    async def general_research(self, query):
        """General research."""
        # Implementation
        return f"Research on: {query}"
    
    def get_agent_card(self):
        """Return agent capabilities."""
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "capabilities": self.capabilities,
            "endpoint": self.endpoint,
            "skills": [
                {"id": "web_search", "name": "Web Search"},
                {"id": "analyze", "name": "Document Analysis"},
                {"id": "research", "name": "General Research"}
            ]
        }

A2A Server

from starlette.requests import Request

from a2a.server import A2AServer
from a2a.transport import StarletteTransport

# ResearchAgent is the class defined in the previous section.
app = StarletteTransport()

server = A2AServer(
    agent=ResearchAgent(),
    transport=app
)

@app.get("/agent.json")
async def agent_card():
    """Serve agent card for discovery."""
    return server.agent.get_agent_card()

@app.post("/")
async def handle_message(request: Request):
    """Handle A2A messages."""
    # Request.json() is a coroutine in Starlette, so it must be awaited
    payload = await request.json()
    return await server.handle(payload)
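
Underneath any web framework, the POST handler boils down to routing one JSON-RPC request to the agent. The sketch below shows that routing as a pure function so it can be tested without a server. It assumes the `tasks/send` method and text parts used in this guide; `echo_agent` is a hypothetical stand-in for real agent logic, and the shape of `result` is an illustrative assumption. The two error codes are the standard JSON-RPC 2.0 values.

```python
import asyncio

INVALID_REQUEST = -32600   # JSON-RPC 2.0: the request object is malformed
METHOD_NOT_FOUND = -32601  # JSON-RPC 2.0: the method does not exist

def _error(request_id, code: int, message: str) -> dict:
    """Build a JSON-RPC error response."""
    return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}}

async def echo_agent(text: str) -> str:
    """Stand-in for a real agent; replace with actual work."""
    return f"Research on: {text}"

async def dispatch(request: dict) -> dict:
    """Route one JSON-RPC request to the agent and build the response."""
    request_id = request.get("id")
    if request.get("jsonrpc") != "2.0" or "method" not in request:
        return _error(request_id, INVALID_REQUEST, "Invalid Request")
    if request["method"] != "tasks/send":
        return _error(request_id, METHOD_NOT_FOUND, "Method not found")

    # Collect only the text parts; file parts are ignored in this sketch.
    parts = request.get("params", {}).get("message", {}).get("parts", [])
    text = " ".join(p.get("text", "") for p in parts if p.get("type") == "text")

    reply = await echo_agent(text)
    return {
        "jsonrpc": "2.0",
        "id": request_id,  # echo the caller's id so it can match the reply
        "result": {"role": "agent", "parts": [{"type": "text", "text": reply}]},
    }

example = {
    "jsonrpc": "2.0",
    "id": "msg-123",
    "method": "tasks/send",
    "params": {"message": {"role": "user", "parts": [{"type": "text", "text": "quantum"}]}},
}
print(asyncio.run(dispatch(example)))
```

Keeping the dispatch logic separate from the HTTP layer means the same function can sit behind any framework's route handler.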

A2A Client

from a2a.client import A2AClient

class AgentOrchestrator:
    """Orchestrate multiple agents via A2A."""
    
    def __init__(self):
        self.agents = {}
    
    async def discover_agent(self, url: str) -> dict:
        """Discover agent capabilities."""
        async with A2AClient(url) as client:
            card = await client.get_agent_card()
            self.agents[card["agent_id"]] = {
                "url": url,
                "card": card
            }
            return card
    
    async def send_task(self, agent_id: str, task: str) -> str:
        """Send task to specific agent."""
        agent = self.agents.get(agent_id)
        if not agent:
            raise ValueError(f"Agent {agent_id} not found")
        
        async with A2AClient(agent["url"]) as client:
            task_result = await client.send_task(task)
            return task_result
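
One way to make an orchestrator like this testable is to inject the transport instead of opening a network connection inside each method. The sketch below is an assumption-laden illustration: `Orchestrator`, `TaskError`, and `fake_transport` are hypothetical names, and the request envelope follows the message format shown earlier in this guide. It also surfaces JSON-RPC error objects as exceptions rather than returning them as if they were results.

```python
import asyncio
from typing import Awaitable, Callable

# A transport takes (url, payload) and returns the decoded JSON-RPC response.
Transport = Callable[[str, dict], Awaitable[dict]]

class TaskError(RuntimeError):
    """Raised when the remote agent answers with a JSON-RPC error object."""

class Orchestrator:
    def __init__(self, transport: Transport):
        self._transport = transport
        self.agents = {}
        self._next_id = 0

    def register(self, card: dict) -> None:
        """Remember an agent card, keyed by its agent_id."""
        self.agents[card["agent_id"]] = card

    async def send_task(self, agent_id: str, text: str) -> dict:
        card = self.agents.get(agent_id)
        if card is None:
            raise ValueError(f"Agent {agent_id} not found")
        self._next_id += 1
        payload = {
            "jsonrpc": "2.0",
            "id": f"msg-{self._next_id}",
            "method": "tasks/send",
            "params": {"message": {"role": "user", "parts": [{"type": "text", "text": text}]}},
        }
        response = await self._transport(card["url"], payload)
        if "error" in response:
            raise TaskError(response["error"].get("message", "unknown error"))
        return response["result"]

async def fake_transport(url: str, payload: dict) -> dict:
    """In-memory stand-in for an HTTP POST, so the sketch runs offline."""
    text = payload["params"]["message"]["parts"][0]["text"]
    if not text:
        return {"jsonrpc": "2.0", "id": payload["id"],
                "error": {"code": -32602, "message": "Invalid params"}}
    return {"jsonrpc": "2.0", "id": payload["id"], "result": {"echo": text, "from": url}}

orchestrator = Orchestrator(fake_transport)
orchestrator.register({"agent_id": "research", "url": "https://api.example.com/agents/research-agent"})
print(asyncio.run(orchestrator.send_task("research", "hello")))
```

In production the fake transport would be swapped for a function that performs the real HTTP POST; the orchestrator code itself would not need to change.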

Multi-Agent Collaboration

Task Delegation

import asyncio

class MultiAgentCoordinator:
    """Coordinate tasks across multiple A2A agents."""
    
    def __init__(self):
        self.agents = {}
        self.task_graph = {}
    
    async def decompose_task(self, task: str) -> list:
        """Break task into subtasks."""
        prompt = f"""Break this task into subtasks that can be executed by different agents.
        
Task: {task}

Return a list of subtasks in order."""

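        # Placeholder: a real implementation would send `prompt` to an LLM
        # and parse its reply. A fixed plan is returned here for illustration.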
        subtasks = [
            {"agent": "research", "task": "Research background"},
            {"agent": "analysis", "task": "Analyze data"},
            {"agent": "writing", "task": "Write report"}
        ]
        
        return subtasks
    
    async def execute_parallel(self, subtasks: list) -> list:
        """Execute subtasks in parallel."""
        results = await asyncio.gather(*[
            self.send_to_agent(st["agent"], st["task"])
            for st in subtasks
        ])
        return results
    
    async def execute_sequential(self, subtasks: list) -> list:
        """Execute subtasks sequentially."""
        results = []
        context = ""
        
        for subtask in subtasks:
            # Build the prompt locally so the caller's subtask dicts are not mutated
            task = subtask["task"]
            if results:
                # Add previous results to context
                task += f"\n\nPrevious results:\n{context}"
            
            result = await self.send_to_agent(subtask["agent"], task)
            results.append(result)
            context += f"\n{result}"
        
        return results
    
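    async def send_to_agent(self, agent_type: str, task: str) -> str:
        """Send task to agent of specific type."""
        agent = self._get_agent_by_type(agent_type)
        return await agent.execute(task)
    
    def _get_agent_by_type(self, agent_type: str):
        """Look up a registered agent client by type (keys of self.agents)."""
        try:
            return self.agents[agent_type]
        except KeyError:
            raise ValueError(f"No agent registered for type: {agent_type}") from None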
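
The difference between the two execution modes is easiest to see with stub agents. In this sketch, `stub_agent` is a hypothetical stand-in for a remote A2A call that reports how many earlier results it received as context. Parallel execution gives every agent an empty context, while sequential execution passes each agent the results that came before it.

```python
import asyncio

SEPARATOR = "\n\nPrevious results:\n"

async def stub_agent(name: str, task: str) -> str:
    """Pretend to be a remote agent; report how much context arrived."""
    await asyncio.sleep(0.01)  # simulate network latency
    head, _, context = task.partition(SEPARATOR)
    seen = len(context.splitlines())
    return f"{name} finished '{head}' with {seen} prior result(s)"

async def run_parallel(subtasks: list) -> list:
    """All subtasks start at once; none sees another's output."""
    return list(await asyncio.gather(
        *(stub_agent(st["agent"], st["task"]) for st in subtasks)
    ))

async def run_sequential(subtasks: list) -> list:
    """Each subtask receives the results of the ones before it."""
    results = []
    for st in subtasks:
        task = st["task"]  # copy into a local; the input list stays untouched
        if results:
            task += SEPARATOR + "\n".join(results)
        results.append(await stub_agent(st["agent"], task))
    return results

SUBTASKS = [
    {"agent": "research", "task": "Research background"},
    {"agent": "analysis", "task": "Analyze data"},
    {"agent": "writing", "task": "Write report"},
]

parallel = asyncio.run(run_parallel(SUBTASKS))
sequential = asyncio.run(run_sequential(SUBTASKS))
for line in parallel + sequential:
    print(line)
```

Parallel execution suits independent subtasks; sequential execution is the safer default when a later step, such as writing a report, depends on earlier findings.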

Agent Discovery

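class NoAgentFoundError(LookupError):
    """Raised when no registered agent offers the requested capability."""


class AgentDirectory: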
    """Directory of available A2A agents."""
    
    def __init__(self):
        self.agents = {}
    
    async def register_agent(self, agent_card: dict):
        """Register new agent."""
        self.agents[agent_card["agent_id"]] = agent_card
    
    async def find_agents(self, capability: str) -> list:
        """Find agents with specific capability."""
        matching = []
        
        for agent_id, card in self.agents.items():
            for skill in card.get("skills", []):
                if capability.lower() in skill["name"].lower():
                    matching.append(card)
                    break  # one matching skill is enough; avoids duplicate cards
        
        return matching
    
    async def select_best_agent(self, capability: str) -> dict:
        """Select best agent for capability."""
        candidates = await self.find_agents(capability)
        
        if not candidates:
            raise NoAgentFoundError(f"No agent found for: {capability}")
        
        # Placeholder selection: take the first match. A fuller version could
        # rank candidates by match score, latency, or reliability.
        return candidates[0]
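
Selecting by match score could look like the sketch below. The scoring rule is an assumption made for illustration: it is the fraction of words in the requested capability that appear in a skill's name or tags. `match_score` and `rank_agents` are hypothetical helpers, and a real directory would likely weigh latency and reliability as well.

```python
def match_score(card: dict, capability: str) -> float:
    """Best fraction of requested words found in any one skill's name or tags."""
    words = set(capability.lower().split())
    if not words:
        return 0.0
    best = 0.0
    for skill in card.get("skills", []):
        vocabulary = set(skill.get("name", "").lower().split())
        vocabulary |= {tag.lower() for tag in skill.get("tags", [])}
        best = max(best, len(words & vocabulary) / len(words))
    return best

def rank_agents(cards: list, capability: str) -> list:
    """Return cards with a non-zero score, best match first."""
    scored = [(match_score(card, capability), card) for card in cards]
    scored = [pair for pair in scored if pair[0] > 0]
    scored.sort(key=lambda pair: pair[0], reverse=True)  # stable: ties keep input order
    return [card for _, card in scored]

cards = [
    {"agent_id": "docs", "skills": [{"name": "Document Search", "tags": ["documents"]}]},
    {"agent_id": "web", "skills": [{"name": "Web Search", "tags": ["search", "web"]}]},
]

print([card["agent_id"] for card in rank_agents(cards, "web search")])
```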

A2A with MCP

Combining A2A and MCP

class HybridAgent:
    """Agent using both A2A and MCP."""
    
    def __init__(self):
        self.a2a = A2AAgentTransport()
        self.mcp = MCPClient()
    
    async def handle_message(self, message: Message):
        """Handle message with both A2A and MCP."""
        
        # Use MCP for tool calls within agent
        tools = await self.mcp.discover_tools()
        
        # Use A2A to collaborate with other agents
        if self._needs_collaboration(message):
            collaborators = await self._find_collaborators(message)
            return await self._collaborate(collaborators, message)
        
        # Process locally with MCP tools
        return await self._process_with_tools(message, tools)
    
    async def _collaborate(self, collaborators, message):
        """Collaborate with other agents via A2A."""
        results = []
        
        for agent in collaborators:
            async with A2AClient(agent["url"]) as client:
                result = await client.send_task(message.content)
                results.append(result)
        
        return self._synthesize_results(results)
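
The collaboration loop above contacts agents one at a time and stops at the first failure. A possible variation, sketched here under the assumption that collaborators are independent, queries them concurrently and keeps whatever succeeds. `collaborate` and `fake_ask` are hypothetical names; `fake_ask` stands in for a real A2A client call.

```python
import asyncio

async def collaborate(collaborators: list, ask) -> tuple:
    """Query all collaborators concurrently; separate successes from failures."""
    outcomes = await asyncio.gather(
        *(ask(agent) for agent in collaborators),
        return_exceptions=True,  # collect exceptions instead of aborting the batch
    )
    results, failures = [], []
    for agent, outcome in zip(collaborators, outcomes):
        if isinstance(outcome, Exception):
            failures.append((agent["url"], str(outcome)))
        else:
            results.append(outcome)
    return results, failures

async def fake_ask(agent: dict) -> str:
    """Stand-in for an A2A request; URLs ending in /down simulate an outage."""
    if agent["url"].endswith("/down"):
        raise ConnectionError("unreachable")
    return f"answer from {agent['url']}"

collaborators = [
    {"url": "https://a.example.com"},
    {"url": "https://b.example.com/down"},
]
results, failures = asyncio.run(collaborate(collaborators, fake_ask))
print(results, failures)
```

Whether a partial answer is acceptable depends on the workflow; some tasks should fail outright if any collaborator is missing.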

Use Cases

Enterprise Workflow Automation

class EnterpriseWorkflowAgent:
    """Enterprise workflow using A2A agents."""
    
    def __init__(self):
        self.email_agent = None
        self.documents_agent = None
        self.calendar_agent = None
        self.analytics_agent = None
    
    async def setup(self):
        """Initialize all agents."""
        self.email_agent = await self.discover("https://email.example.com/agent.json")
        self.documents_agent = await self.discover("https://docs.example.com/agent.json")
        self.calendar_agent = await self.discover("https://calendar.example.com/agent.json")
        self.analytics_agent = await self.discover("https://analytics.example.com/agent.json")
    
    async def handle_meeting_request(self, request):
        """Handle complete meeting workflow."""
        
        # Step 1: Check calendar
        calendar_result = await self.send_task(
            self.calendar_agent,
            f"Find available time slots for {request.date}"
        )
        
        # Step 2: Create document
        doc_result = await self.send_task(
            self.documents_agent,
            f"Create agenda for: {request.topic}"
        )
        
        # Step 3: Send invites
        email_result = await self.send_task(
            self.email_agent,
            f"Send meeting invites to: {request.attendees}"
        )
        
        # Step 4: Schedule follow-up
        follow_up = await self.send_task(
            self.calendar_agent,
            "Schedule follow-up meeting"
        )
        
        return {
            "calendar": calendar_result,
            "document": doc_result,
            "email": email_result,
            "follow_up": follow_up
        }

Research Assistant Network

import asyncio

class ResearchNetwork:
    """Network of research agents."""
    
    def __init__(self):
        self.search_agent = None
        self.analysis_agent = None
        self.writing_agent = None
    
    async def research_topic(self, topic: str) -> dict:
        """Full research workflow."""
        
        # Parallel: search multiple sources
        search_results = await asyncio.gather(
            self.search_agent.query(f"Academic: {topic}"),
            self.search_agent.query(f"Industry: {topic}"),
            self.search_agent.query(f"News: {topic}")
        )
        
        # Analyze results
        analysis = await self.analysis_agent.analyze(search_results)
        
        # Write report
        report = await self.writing_agent.write(
            topic=topic,
            analysis=analysis,
            format="research_paper"
        )
        
        return {
            "sources": search_results,
            "analysis": analysis,
            "report": report
        }

Security and Authentication

A2A Authentication

class A2AAuth:
    """A2A authentication handling."""
    
    SUPPORTED_SCHEMES = ["Bearer", "APIKey", "OAuth2"]
    
    async def authenticate(self, request: dict, agent_card: dict) -> bool:
        """Authenticate A2A request."""
        auth_header = request.get("headers", {}).get("Authorization")
        
        if not auth_header:
            return False
        
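        # partition() never raises, unlike unpacking split() on a header with no space
        scheme, _, credentials = auth_header.partition(" ")
        if not credentials:
            return False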
        
        if scheme not in agent_card.get("authentication", {}).get("schemes", []):
            return False
        
        if scheme == "Bearer":
            return await self._verify_jwt(credentials)
        elif scheme == "APIKey":
            return await self._verify_api_key(credentials)
        
        return False
    
    async def authorize_task(self, task: dict, permissions: list) -> bool:
        """Check if task is authorized."""
        # TODO: check user permissions for task, verify scope,
        # and validate resource access.
        # Deny by default so an unimplemented check never grants access.
        return False
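
The pieces that do not depend on a token library can be sketched with the standard library alone. The example below assumes a scope-based permission model, which the original does not specify. `parse_authorization`, `verify_api_key`, and `is_authorized` are hypothetical helpers, and the scope names are made up for illustration. JWT verification is deliberately left out because it needs a dedicated library.

```python
import hmac

def parse_authorization(header):
    """Split 'Scheme credentials'; return None if the header is missing or malformed."""
    if not header:
        return None
    scheme, separator, credentials = header.strip().partition(" ")
    credentials = credentials.strip()
    if not separator or not credentials:
        return None
    return scheme, credentials

def verify_api_key(presented: str, expected: str) -> bool:
    """Compare keys in constant time to avoid leaking information via timing."""
    return hmac.compare_digest(presented.encode(), expected.encode())

def is_authorized(required_scopes, granted_scopes) -> bool:
    """A task is allowed only if every required scope has been granted."""
    return set(required_scopes) <= set(granted_scopes)

print(parse_authorization("APIKey secret-123"))
print(verify_api_key("secret-123", "secret-123"))
print(is_authorized(["tasks:send"], ["tasks:send", "tasks:read"]))
```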

Best Practices

Agent Design

## A2A Agent Best Practices

### 1. Clear Capabilities
- Define skills precisely
- Use descriptive names
- Include relevant tags

### 2. Error Handling
- Graceful degradation
- Clear error messages
- Retry logic

### 3. State Management
- Use session IDs
- Handle reconnections
- Maintain context

### 4. Security
- Authenticate requests
- Validate inputs
- Encrypt sensitive data

### 5. Monitoring
- Log all interactions
- Track performance
- Alert on failures
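
As one illustration of the retry logic mentioned under error handling, here is a minimal sketch of exponential backoff with jitter around an async call. The name `with_retries`, the default attempt count, and the choice of retryable exceptions are all assumptions; a real client would tune them to its transport.

```python
import asyncio
import random

async def with_retries(call, attempts=3, base_delay=0.01,
                       retry_on=(ConnectionError, TimeoutError)):
    """Await call(); on a retryable error, wait and try again up to `attempts` times."""
    for attempt in range(attempts):
        try:
            return await call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller see the last error
            # Double the delay on each attempt and add jitter so many
            # clients do not retry in lockstep.
            delay = base_delay * (2 ** attempt) * (1 + random.random())
            await asyncio.sleep(delay)

calls = {"count": 0}

async def flaky_agent():
    """Hypothetical agent call that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("agent temporarily unreachable")
    return "ok"

print(asyncio.run(with_retries(flaky_agent)))
```

Only errors that are plausibly transient should be retried; validation or authentication failures will not improve on a second attempt.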

Future of A2A

  • Agent Marketplaces: Discovery and selection of agents
  • Federated Learning: Agents learning from each other
  • Agent Governance: Standards for agent behavior
  • Cross-Platform: A2A spanning cloud, edge, and devices

Conclusion

A2A represents a fundamental shift in how AI agents collaborate. By providing a standard protocol for agent communication, A2A enables the creation of sophisticated multi-agent systems that can work across organizational boundaries. As the agent ecosystem grows, A2A will become increasingly critical for building interconnected AI systems.

Start by exposing a single agent over A2A, then expand to multi-agent workflows as you gain experience. Because the protocol is built from a small set of concepts (agent cards, messages, and tasks), it is approachable for projects of any size.
