A2A Protocol: Agent-to-Agent Communication Complete Guide 2026

Introduction

As AI agents become more capable, a new challenge emerges: how do agents from different companies, built on different platforms, communicate and collaborate? The A2A (Agent-to-Agent) protocol addresses this exact problem. Proposed by Google and now under Linux Foundation governance, A2A enables different AI agents to work together by sharing tasks, exchanging information, and coordinating complex workflows.

In 2026, A2A has emerged as a critical standard for building agentic AI systems at scale. This guide covers the A2A protocol architecture, a sample implementation, and how to build multi-agent systems on top of it.

Understanding A2A Protocol

What is A2A?

A2A is a protocol that enables agents to:

  • Discover each other’s capabilities
  • Negotiate task execution
  • Exchange context and results
  • Collaborate on complex workflows

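Think of A2A as HTTP for agents: just as web servers and browsers use HTTP to communicate, AI agents use A2A. A2A itself is carried over HTTP, with requests encoded as JSON-RPC 2.0, as the message format later in this guide shows.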

Why A2A Matters

## Before A2A
- Agents were isolated
- No standard communication
- Limited collaboration
- Vendor lock-in

## After A2A
- Agents can discover each other
- Standard communication protocol
- Seamless collaboration
- Interoperability across platforms

A2A Protocol Architecture

Core Concepts

class A2AAgent:
    """Base A2A-compatible agent."""
    
    def __init__(self, agent_id, name, endpoint, capabilities, skills=None):
        self.agent_id = agent_id
        self.name = name                  # human-readable name shown in the card
        self.endpoint = endpoint          # URL where this agent accepts requests
        self.capabilities = capabilities
        self.skills = skills or []        # skill descriptors advertised to peers
        self.message_queue = []
    
    def advertise_capabilities(self):
        """Announce what this agent can do."""
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "version": "1.0",
            "capabilities": self.capabilities,
            "endpoint": self.endpoint,
            "skills": self.skills
        }

Agent Card

{
  "agent_id": "research-agent-001",
  "name": "Research Assistant",
  "description": "Advanced research and analysis agent",
  "url": "https://api.example.com/agents/research-agent",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "skills": [
    {
      "id": "web-search",
      "name": "Web Search",
      "description": "Search the web for information",
      "tags": ["search", "research", "web"]
    },
    {
      "id": "document-analysis",
      "name": "Document Analysis",
      "description": "Analyze and summarize documents",
      "tags": ["analysis", "documents", "summary"]
    }
  ],
  "authentication": {
    "schemes": ["Bearer", "APIKey"]
  }
}
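
As a rough sketch of how a client might consume a card like the one above, the snippet below parses the JSON and checks for the fields this guide's example uses. The `AgentCard` dataclass, the `parse_agent_card` helper, and the choice of required fields are illustrative assumptions, not part of any official SDK.

```python
import json
from dataclasses import dataclass, field

# Fields this sketch treats as mandatory (an assumption based on the example card).
REQUIRED_FIELDS = ("agent_id", "name", "url", "version")


@dataclass
class AgentCard:
    agent_id: str
    name: str
    url: str
    version: str
    skills: list = field(default_factory=list)
    auth_schemes: list = field(default_factory=list)

    def skill_ids(self) -> list:
        """Return the ids of all advertised skills."""
        return [skill["id"] for skill in self.skills]


def parse_agent_card(raw: str) -> AgentCard:
    """Parse a JSON agent card; raise ValueError if a required field is missing."""
    data = json.loads(raw)
    missing = [name for name in REQUIRED_FIELDS if name not in data]
    if missing:
        raise ValueError(f"agent card is missing fields: {missing}")
    return AgentCard(
        agent_id=data["agent_id"],
        name=data["name"],
        url=data["url"],
        version=data["version"],
        skills=data.get("skills", []),
        auth_schemes=data.get("authentication", {}).get("schemes", []),
    )
```

Rejecting incomplete cards at parse time keeps malformed entries out of any directory built on top of them.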

Message Format

{
  "jsonrpc": "2.0",
  "id": "msg-123",
  "method": "tasks/send",
  "params": {
    "sessionId": "session-456",
    "message": {
      "role": "user",
      "parts": [
        {
          "type": "text",
          "text": "Research the latest developments in quantum computing"
        },
        {
          "type": "file",
          "file": {
            "name": "requirements.pdf",
            "mimeType": "application/pdf"
          }
        }
      ],
      "metadata": {
        "priority": "high",
        "deadline": "2026-03-20T12:00:00Z"
      }
    },
    "pushNotification": {
      "endpoint": "https://client.example.com/notifications",
      "authentication": {
        "schemes": ["Bearer"]
      }
    }
  }
}
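
The envelope above can be assembled and unpacked with a few lines of code. The following is a minimal sketch that mirrors the field names in this guide's example; `build_task_request` and `extract_result` are hypothetical helper names, and the response shape assumed here is the generic JSON-RPC 2.0 one (`result` or `error`).

```python
import uuid
from typing import Optional


def build_task_request(text: str, session_id: str, metadata: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request shaped like the example message."""
    message = {"role": "user", "parts": [{"type": "text", "text": text}]}
    if metadata:
        message["metadata"] = metadata
    return {
        "jsonrpc": "2.0",
        "id": f"msg-{uuid.uuid4().hex[:8]}",  # request id, echoed back in the response
        "method": "tasks/send",
        "params": {"sessionId": session_id, "message": message},
    }


def extract_result(response: dict, request_id: str):
    """Return the `result` of a JSON-RPC response; raise on error or id mismatch."""
    if response.get("id") != request_id:
        raise ValueError("response id does not match request id")
    if "error" in response:
        err = response["error"]
        raise RuntimeError(f"JSON-RPC error {err.get('code')}: {err.get('message')}")
    return response["result"]
```

Checking the echoed `id` matters once several requests are in flight on the same connection.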

Building A2A Agents

Simple A2A Agent

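# Note: the `a2a.protocol` and `a2a.transport` module names are illustrative;
# they may not match the layout of the SDK you install, so adapt as needed.
from a2a.protocol import Agent, Message, Task
from a2a.transport import HTTPTransport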

class ResearchAgent(Agent):
    """Research agent implementing A2A protocol."""
    
    def __init__(self):
        super().__init__(
            agent_id="research-agent",
            name="Research Assistant",
            capabilities=["web_search", "document_analysis", "summarization"]
        )
    
    async def handle_message(self, message: Message) -> Message:
        """Process incoming message."""
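        user_request = message.content
        # Match whole words: a substring test would send "research ..." to
        # web_search, because "research" contains "search".
        words = {w.strip(".,!?:;") for w in user_request.lower().split()}
        
        if "search" in words:
            result = await self.web_search(user_request)
        elif "analyze" in words:
            result = await self.analyze_document(user_request)
        else:
            result = await self.general_research(user_request)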
        
        return Message(
            role="assistant",
            content=result,
            task_id=message.task_id
        )
    
    async def web_search(self, query):
        """Perform web search."""
        # Implementation
        return f"Search results for: {query}"
    
    async def analyze_document(self, document):
        """Analyze document."""
        # Implementation
        return "Analysis complete"
    
    async def general_research(self, query):
        """General research."""
        # Implementation
        return f"Research on: {query}"
    
    def get_agent_card(self):
        """Return agent capabilities."""
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "capabilities": self.capabilities,
            "endpoint": self.endpoint,
            "skills": [
                {"id": "web_search", "name": "Web Search"},
                {"id": "analyze", "name": "Document Analysis"},
                {"id": "research", "name": "General Research"}
            ]
        }
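
Keyword routing of this kind is easy to unit-test once the decision is pulled into a pure function. The sketch below does that with a hypothetical `pick_skill` helper that matches whole words, since a plain substring test for "search" would also fire on "research". The skill names are taken from the agent's capability list; everything else is an assumption for illustration.

```python
import re


def pick_skill(request: str) -> str:
    """Choose a skill name by whole-word keyword match."""
    words = set(re.findall(r"[a-z]+", request.lower()))
    if "search" in words:
        return "web_search"
    if words & {"analyze", "analyse"}:
        return "document_analysis"
    return "summarization" if "summarize" in words else "general_research"


print(pick_skill("Research the latest developments in quantum computing"))
print(pick_skill("Search the web for A2A adoption figures"))
```

A production agent would more likely route on the skill id supplied by the caller or on an LLM classification, but the same separation between routing and execution applies.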

A2A Server

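from starlette.requests import Request

# Note: A2AServer and StarletteTransport are used here as illustrative names;
# check your SDK for the actual classes.
from a2a.server import A2AServer
from a2a.transport import StarletteTransport

app = StarletteTransport()

server = A2AServer(
    agent=ResearchAgent(),
    transport=app
)

@app.get("/agent.json")
async def agent_card():
    """Serve agent card for discovery."""
    return server.agent.get_agent_card()

@app.post("/")
async def handle_message(request: Request):
    """Handle A2A messages."""
    # Starlette's Request.json() is a coroutine and must be awaited.
    payload = await request.json()
    return await server.handle(payload)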
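
What a handler such as `server.handle` does internally can be sketched without any framework: look at the JSON-RPC `method`, call the matching coroutine, and wrap the outcome in a response envelope. The version below is a standalone illustration, not the behavior of any particular SDK; `handle_tasks_send`, `HANDLERS`, and the `status`/`output` result fields are hypothetical. The error codes are the standard JSON-RPC 2.0 ones.

```python
import asyncio

# Standard JSON-RPC 2.0 error codes.
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601


async def handle_tasks_send(params: dict) -> dict:
    """Hypothetical handler: answers using the first text part of the message."""
    parts = params.get("message", {}).get("parts", [])
    text = next((p["text"] for p in parts if p.get("type") == "text"), "")
    return {"status": "completed", "output": f"Research on: {text}"}


HANDLERS = {"tasks/send": handle_tasks_send}


def _error(request_id, code: int, message: str) -> dict:
    """Build a JSON-RPC error response."""
    return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}}


async def dispatch(payload: dict) -> dict:
    """Route one JSON-RPC request to its handler and wrap the outcome."""
    request_id = payload.get("id")
    if payload.get("jsonrpc") != "2.0" or "method" not in payload:
        return _error(request_id, INVALID_REQUEST, "Invalid Request")
    handler = HANDLERS.get(payload["method"])
    if handler is None:
        return _error(request_id, METHOD_NOT_FOUND, "Method not found")
    result = await handler(payload.get("params", {}))
    return {"jsonrpc": "2.0", "id": request_id, "result": result}
```

Keeping dispatch separate from the HTTP layer lets the same logic be tested with plain dictionaries.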

A2A Client

from a2a.client import A2AClient

class AgentOrchestrator:
    """Orchestrate multiple agents via A2A."""
    
    def __init__(self):
        self.agents = {}
    
    async def discover_agent(self, url: str) -> dict:
        """Discover agent capabilities."""
        async with A2AClient(url) as client:
            card = await client.get_agent_card()
            self.agents[card["agent_id"]] = {
                "url": url,
                "card": card
            }
            return card
    
    async def send_task(self, agent_id: str, task: str) -> str:
        """Send task to specific agent."""
        agent = self.agents.get(agent_id)
        if not agent:
            raise ValueError(f"Agent {agent_id} not found")
        
        async with A2AClient(agent["url"]) as client:
            task_result = await client.send_task(task)
            return task_result

Multi-Agent Collaboration

Task Delegation

import asyncio

class MultiAgentCoordinator:
    """Coordinate tasks across multiple A2A agents."""
    
    def __init__(self):
        self.agents = {}
        self.task_graph = {}
    
    async def decompose_task(self, task: str) -> list:
        """Break task into subtasks."""
        prompt = f"""Break this task into subtasks that can be executed by different agents.
        
Task: {task}

Return a list of subtasks in order."""

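        # Placeholder: a real implementation would send `prompt` to an LLM and
        # parse its reply. The subtasks below are hard-coded for illustration.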
        subtasks = [
            {"agent": "research", "task": "Research background"},
            {"agent": "analysis", "task": "Analyze data"},
            {"agent": "writing", "task": "Write report"}
        ]
        
        return subtasks
    
    async def execute_parallel(self, subtasks: list) -> list:
        """Execute subtasks in parallel."""
        results = await asyncio.gather(*[
            self.send_to_agent(st["agent"], st["task"])
            for st in subtasks
        ])
        return results
    
    async def execute_sequential(self, subtasks: list) -> list:
        """Execute subtasks sequentially."""
        results = []
        context = ""
        
        for subtask in subtasks:
            # Build the prompt without mutating the caller's subtask dict
            task_text = subtask["task"]
            if results:
                task_text += f"\n\nPrevious results:\n{context}"
            
            result = await self.send_to_agent(subtask["agent"], task_text)
            results.append(result)
            context += f"\n{result}"
        
        return results
    
    async def send_to_agent(self, agent_type: str, task: str) -> str:
        """Send task to agent of specific type."""
        agent = self._get_agent_by_type(agent_type)
        return await agent.execute(task)
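
One caveat with running subtasks in parallel is that a single failing agent can abort the whole batch. A possible way to isolate failures, sketched here under the assumption that each call is an independent coroutine, is to pass `return_exceptions=True` to `asyncio.gather`. `run_parallel` and `fake_send` are hypothetical names; `fake_send` is a stub standing in for a real A2A call.

```python
import asyncio


async def run_parallel(subtasks: list, send) -> list:
    """Run all subtasks concurrently; one failure does not cancel the others."""
    outcomes = await asyncio.gather(
        *(send(st["agent"], st["task"]) for st in subtasks),
        return_exceptions=True,  # exceptions are returned in place of results
    )
    return [
        {"agent": st["agent"], "ok": not isinstance(out, Exception), "result": str(out)}
        for st, out in zip(subtasks, outcomes)
    ]


async def fake_send(agent: str, task: str) -> str:
    """Stub transport: the 'analysis' agent always fails."""
    if agent == "analysis":
        raise RuntimeError("agent unavailable")
    return f"{agent} finished: {task}"
```

Because `gather` preserves input order, each outcome can be paired with its subtask by position.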

Agent Discovery

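class NoAgentFoundError(Exception):
    """Raised when no registered agent offers the requested capability."""


class AgentDirectory: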
    """Directory of available A2A agents."""
    
    def __init__(self):
        self.agents = {}
    
    async def register_agent(self, agent_card: dict):
        """Register new agent."""
        self.agents[agent_card["agent_id"]] = agent_card
    
    async def find_agents(self, capability: str) -> list:
        """Find agents with specific capability."""
        matching = []
        
        for agent_id, card in self.agents.items():
            for skill in card.get("skills", []):
                if capability.lower() in skill["name"].lower():
                    matching.append(card)
                    break  # one match is enough; avoid listing the card twice
        
        return matching
    
    async def select_best_agent(self, capability: str) -> dict:
        """Select best agent for capability."""
        candidates = await self.find_agents(capability)
        
        # Select based on capability match score
        # Could also consider latency, reliability, etc.
        if not candidates:
            raise NoAgentFoundError(f"No agent found for: {capability}")
        
        return candidates[0]
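
The directory above returns the first candidate, while its comment mentions selecting by match score. One possible scoring scheme is sketched below; the weights (3 for a skill id match, 2 for a name match, 1 for an exact tag match) and the names `score_card` and `rank_agents` are arbitrary assumptions chosen for illustration.

```python
def score_card(card: dict, capability: str) -> int:
    """Score a card: 3 per skill id match, 2 per name match, 1 per exact tag match."""
    needle = capability.lower()
    score = 0
    for skill in card.get("skills", []):
        if needle in skill.get("id", "").lower():
            score += 3
        if needle in skill.get("name", "").lower():
            score += 2
        score += sum(1 for tag in skill.get("tags", []) if needle == tag.lower())
    return score


def rank_agents(cards: list, capability: str) -> list:
    """Return cards with a non-zero score, best match first."""
    scored = [(score_card(card, capability), card) for card in cards]
    scored = [pair for pair in scored if pair[0] > 0]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [card for _, card in scored]
```

Latency or reliability statistics, if the directory tracks them, could be folded into the same score as additional terms.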

A2A with MCP

Combining A2A and MCP

class HybridAgent:
    """Agent using both A2A and MCP."""
    
    def __init__(self):
        self.a2a = A2AAgentTransport()
        self.mcp = MCPClient()
    
    async def handle_message(self, message: Message):
        """Handle message with both A2A and MCP."""
        
        # Use MCP for tool calls within agent
        tools = await self.mcp.discover_tools()
        
        # Use A2A to collaborate with other agents
        if self._needs_collaboration(message):
            collaborators = await self._find_collaborators(message)
            return await self._collaborate(collaborators, message)
        
        # Process locally with MCP tools
        return await self._process_with_tools(message, tools)
    
    async def _collaborate(self, collaborators, message):
        """Collaborate with other agents via A2A."""
        results = []
        
        for agent in collaborators:
            async with A2AClient(agent["url"]) as client:
                result = await client.send_task(message.content)
                results.append(result)
        
        return self._synthesize_results(results)

Use Cases

Enterprise Workflow Automation

class EnterpriseWorkflowAgent:
    """Enterprise workflow using A2A agents."""
    
    def __init__(self):
        self.email_agent = None
        self.documents_agent = None
        self.calendar_agent = None
        self.analytics_agent = None
    
    async def setup(self):
        """Initialize all agents.

        Assumes `discover` and `send_task` helpers like those on AgentOrchestrator.
        """
        self.email_agent = await self.discover("https://email.example.com/agent.json")
        self.documents_agent = await self.discover("https://docs.example.com/agent.json")
        self.calendar_agent = await self.discover("https://calendar.example.com/agent.json")
        self.analytics_agent = await self.discover("https://analytics.example.com/agent.json")
    
    async def handle_meeting_request(self, request):
        """Handle complete meeting workflow."""
        
        # Step 1: Check calendar
        calendar_result = await self.send_task(
            self.calendar_agent,
            f"Find available time slots for {request.date}"
        )
        
        # Step 2: Create document
        doc_result = await self.send_task(
            self.documents_agent,
            f"Create agenda for: {request.topic}"
        )
        
        # Step 3: Send invites
        email_result = await self.send_task(
            self.email_agent,
            f"Send meeting invites to: {request.attendees}"
        )
        
        # Step 4: Schedule follow-up
        follow_up = await self.send_task(
            self.calendar_agent,
            "Schedule follow-up meeting"
        )
        
        return {
            "calendar": calendar_result,
            "document": doc_result,
            "email": email_result,
            "follow_up": follow_up
        }

Research Assistant Network

import asyncio

class ResearchNetwork:
    """Network of research agents."""
    
    def __init__(self):
        self.search_agent = None
        self.analysis_agent = None
        self.writing_agent = None
    
    async def research_topic(self, topic: str) -> dict:
        """Full research workflow."""
        
        # Parallel: search multiple sources
        search_results = await asyncio.gather(
            self.search_agent.query(f"Academic: {topic}"),
            self.search_agent.query(f"Industry: {topic}"),
            self.search_agent.query(f"News: {topic}")
        )
        
        # Analyze results
        analysis = await self.analysis_agent.analyze(search_results)
        
        # Write report
        report = await self.writing_agent.write(
            topic=topic,
            analysis=analysis,
            format="research_paper"
        )
        
        return {
            "sources": search_results,
            "analysis": analysis,
            "report": report
        }

Security and Authentication

A2A Authentication

class A2AAuth:
    """A2A authentication handling."""
    
    SUPPORTED_SCHEMES = ["Bearer", "APIKey", "OAuth2"]
    
    async def authenticate(self, request: dict, agent_card: dict) -> bool:
        """Authenticate A2A request."""
        auth_header = request.get("headers", {}).get("Authorization")
        
        if not auth_header:
            return False
        
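        # partition() never raises, unlike unpacking split() on a header with no space
        scheme, _, credentials = auth_header.partition(" ")
        if not credentials:
            return False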
        
        if scheme not in agent_card.get("authentication", {}).get("schemes", []):
            return False
        
        if scheme == "Bearer":
            return await self._verify_jwt(credentials)
        elif scheme == "APIKey":
            return await self._verify_api_key(credentials)
        
        return False
    
    async def authorize_task(self, task: dict, permissions: list) -> bool:
        """Check if task is authorized."""
        # Check user permissions for task
        # Verify scope
        # Validate resource access
        raise NotImplementedError("authorization policy is application-specific")
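
The header handling can be exercised on its own. Below is a minimal sketch of the API-key path only, assuming the key is compared against a single expected value; `parse_authorization`, `verify_api_key`, and `authenticate` are hypothetical standalone functions, and JWT verification for the Bearer scheme is deliberately left out.

```python
import hmac


def parse_authorization(header):
    """Split 'Scheme credentials' safely; return None for malformed input."""
    if not header:
        return None
    scheme, _, credentials = header.strip().partition(" ")
    if not scheme or not credentials.strip():
        return None
    return scheme, credentials.strip()


def verify_api_key(presented: str, expected: str) -> bool:
    """Compare keys in constant time to avoid leaking timing information."""
    return hmac.compare_digest(presented.encode(), expected.encode())


def authenticate(header, allowed_schemes, expected_key: str) -> bool:
    """Accept the request only if the scheme is allowed and the key matches."""
    parsed = parse_authorization(header)
    if parsed is None:
        return False
    scheme, credentials = parsed
    if scheme not in allowed_schemes:
        return False
    if scheme == "APIKey":
        return verify_api_key(credentials, expected_key)
    return False  # other schemes (e.g. Bearer/JWT) are out of scope for this sketch
```

Using `hmac.compare_digest` rather than `==` is a common precaution when comparing secrets.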

Best Practices

Agent Design

## A2A Agent Best Practices

### 1. Clear Capabilities
- Define skills precisely
- Use descriptive names
- Include relevant tags

### 2. Error Handling
- Graceful degradation
- Clear error messages
- Retry logic

### 3. State Management
- Use session IDs
- Handle reconnections
- Maintain context

### 4. Security
- Authenticate requests
- Validate inputs
- Encrypt sensitive data

### 5. Monitoring
- Log all interactions
- Track performance
- Alert on failures
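
The "Retry logic" item under Error Handling can be illustrated with exponential backoff. This is a sketch under the assumption that transient failures surface as `ConnectionError`; the `with_retries` helper and its default values are hypothetical.

```python
import asyncio


async def with_retries(call, attempts: int = 3, base_delay_s: float = 0.1):
    """Retry `call()` on ConnectionError, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return await call()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            await asyncio.sleep(base_delay_s * (2 ** attempt))
```

Only errors that are plausibly transient should be retried; an authentication failure or a malformed request will fail the same way every time.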

Future of A2A

  • Agent Marketplaces: Discovery and selection of agents
  • Federated Learning: Agents learning from each other
  • Agent Governance: Standards for agent behavior
  • Cross-Platform: A2A spanning cloud, edge, and devices

Conclusion

A2A represents a fundamental shift in how AI agents collaborate. By providing a standard protocol for agent communication, A2A enables the creation of sophisticated multi-agent systems that can work across organizational boundaries. As the agent ecosystem grows, A2A will become increasingly critical for building interconnected AI systems.

Start by implementing A2A in single-agent applications, then expand to multi-agent workflows as you gain experience. The protocol’s simplicity and power make it accessible for projects of any size.