Python: Reorganize A2A samples and use package A2AExecutor (#6165)

* Reorganize A2A samples: client demos in 02-agents, use package A2AExecutor

- Move client samples (agent_with_a2a, a2a_agent_as_function_tools) to samples/02-agents/a2a/
- Add new concept samples: polling, stream reconnection, protocol selection
- Replace sample agent_executor.py with package-level A2AExecutor (stream=True)
- Update 04-hosting/a2a to focus on server-side, point to 02-agents for clients
- Add README.md for the new 02-agents/a2a/ sample collection

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix streaming artifact coalescing and address PR review feedback

A2AExecutor fix:
- Generate a stable artifact_id per stream in _run_stream so all streaming
  chunks share the same ID, enabling proper append=True coalescing per the
  A2A spec (TaskArtifactUpdateEvent with same artifactId).
- Previously, item.message_id was None for OpenAI/Foundry streaming updates,
  causing the SDK to generate a new random UUID per token (100+ separate
  artifacts instead of 1 appended artifact).

Sample improvements:
- Replace join workaround with response.text now that coalescing works
- Add background=True to stream reconnection resume call (required for
  continuation token emission on in-progress tasks)
- Fix type ignore specificity in polling sample

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Giles Odigwe
2026-06-01 00:09:11 -07:00
committed by GitHub
Unverified
parent edcc786651
commit 5affc9c333
10 changed files with 409 additions and 138 deletions
+54
View File
@@ -0,0 +1,54 @@
# A2A Client Samples
These samples demonstrate how to **consume** remote A2A-compliant agents using the Agent Framework's `A2AAgent` class.
For hosting your own agents as A2A servers, see [`samples/04-hosting/a2a/`](../../04-hosting/a2a/).
## Samples
| Sample | Concept |
|--------|---------|
| [`agent_with_a2a.py`](agent_with_a2a.py) | Basic consumption — non-streaming and streaming |
| [`a2a_agent_as_function_tools.py`](a2a_agent_as_function_tools.py) | Expose A2A skills as function tools for a host agent |
| [`a2a_polling.py`](a2a_polling.py) | Poll a long-running task with continuation tokens |
| [`a2a_stream_reconnection.py`](a2a_stream_reconnection.py) | Resume an interrupted stream via continuation token |
| [`a2a_protocol_selection.py`](a2a_protocol_selection.py) | Configure preferred protocol bindings (JSONRPC, GRPC, HTTP+JSON) |
## Prerequisites
- A running A2A-compliant agent server (see `samples/04-hosting/a2a/` to start one)
- Set `A2A_AGENT_HOST` environment variable to the server URL
- For `a2a_agent_as_function_tools.py`: also set `FOUNDRY_PROJECT_ENDPOINT` and `FOUNDRY_MODEL`
## Running
```bash
cd python/samples/02-agents/a2a
# Start an A2A server in another terminal first:
# cd python/samples/04-hosting/a2a && uv run python a2a_server.py
export A2A_AGENT_HOST="http://localhost:5001/"
uv run python agent_with_a2a.py
```
## Key APIs
```python
from agent_framework.a2a import A2AAgent
# Connect to a remote agent
async with A2AAgent(url="http://localhost:5001/", agent_card=card) as agent:
# Non-streaming
response = await agent.run("Hello")
# Streaming
stream = agent.run("Hello", stream=True)
async for update in stream:
print(update.text)
# Background + polling
response = await agent.run("Long task", background=True)
while response.continuation_token:
response = await agent.poll_task(response.continuation_token)
```
@@ -0,0 +1,150 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import re
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
"""
A2A Agent Skills as Function Tools
This sample demonstrates how to represent an A2A agent's skills as individual
function tools and register them with a host agent. Each skill advertised in the
remote agent's AgentCard becomes a separate tool that the host agent can invoke.
Key concepts demonstrated:
- Resolving an AgentCard from a remote A2A endpoint
- Converting each skill into a FunctionTool via as_tool()
- Registering those tools with a host agent
- Having the host agent autonomously select and invoke A2A skills
Prerequisites:
- Set A2A_AGENT_HOST to the URL of a running A2A server
- Set FOUNDRY_PROJECT_ENDPOINT to your Azure AI Foundry project endpoint
- Set FOUNDRY_MODEL to the model deployment name (e.g. gpt-4o)
To run this sample:
cd python/samples/02-agents/a2a
uv run python a2a_agent_as_function_tools.py
"""
async def main() -> None:
"""Discover A2A agent skills and register them as tools on a host agent."""
# 1. Read environment configuration.
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
project_endpoint = os.getenv("FOUNDRY_PROJECT_ENDPOINT")
model = os.getenv("FOUNDRY_MODEL")
if not project_endpoint or not model:
raise ValueError("FOUNDRY_PROJECT_ENDPOINT and FOUNDRY_MODEL must be set")
print(f"Connecting to A2A agent at: {a2a_agent_host}")
# 2. Resolve the remote agent card to discover its skills.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
print(f"Found agent: {agent_card.name} ({len(agent_card.skills)} skill(s))")
for skill in agent_card.skills:
print(f" - {skill.name}: {skill.description}")
# 3. Create the A2AAgent that wraps the remote endpoint.
async with A2AAgent(
name=agent_card.name,
description=agent_card.description,
agent_card=agent_card,
url=a2a_agent_host,
) as a2a_agent:
# 4. Convert each A2A skill into a FunctionTool.
# Skill names may contain spaces or special characters, so we
# sanitize them into valid tool identifiers before passing to as_tool().
skill_tools = [
a2a_agent.as_tool(
name=re.sub(r"[^0-9A-Za-z]+", "_", skill.name),
description=skill.description or "",
)
for skill in agent_card.skills
]
# 5. Create the host agent with the skill tools.
credential = AzureCliCredential()
client = FoundryChatClient(
project_endpoint=project_endpoint,
model=model,
credential=credential,
)
host_agent = client.as_agent(
name="assistant",
instructions="You are a helpful assistant. Use your tools to answer questions.",
tools=skill_tools,
)
# 6. Run the host agent — it will select and invoke the appropriate A2A skill tools.
query = "Show me all invoices for Contoso"
print(f"\nUser: {query}\n")
response = await host_agent.run(query)
print(f"Agent: {response}")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Connecting to A2A agent at: http://localhost:5000/
Found agent: InvoiceAgent (1 skill(s))
- InvoiceQuery: Handles requests relating to invoices.
User: Show me all invoices for Contoso
Agent: Here are the invoices for Contoso:
1. **Invoice ID:** INV789
- **Date:** 2026-02-15
- **Products:**
- T-Shirts: 150 units @ $10.00 = $1,500.00
- Hats: 200 units @ $15.00 = $3,000.00
- Glasses: 300 units @ $5.00 = $1,500.00
- **Total:** $6,000.00
2. **Invoice ID:** INV333
- **Date:** 2026-03-14
- **Products:**
- T-Shirts: 400 units @ $11.00 = $4,400.00
- Hats: 600 units @ $15.00 = $9,000.00
- Glasses: 700 units @ $5.00 = $3,500.00
- **Total:** $16,900.00
3. **Invoice ID:** INV666
- **Date:** 2026-02-06
- **Products:**
- T-Shirts: 2,500 units @ $8.00 = $20,000.00
- Hats: 1,200 units @ $10.00 = $12,000.00
- Glasses: 1,000 units @ $6.00 = $6,000.00
- **Total:** $38,000.00
4. **Invoice ID:** INV999
- **Date:** 2026-03-19
- **Products:**
- T-Shirts: 1,400 units @ $10.50 = $14,700.00
- Hats: 1,100 units @ $9.00 = $9,900.00
- Glasses: 950 units @ $12.00 = $11,400.00
- **Total:** $36,000.00
If you need more details or a specific invoice, please let me know!
"""
@@ -0,0 +1,96 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from dotenv import load_dotenv
load_dotenv()
"""
A2A Polling for Task Completion
This sample demonstrates how to poll a long-running A2A task for completion
using continuation tokens. When `background=True`, the agent returns immediately
with a continuation token that you can use to check progress later.
Key concepts demonstrated:
- Starting a background A2A task with `background=True`
- Receiving a continuation token for in-progress tasks
- Polling with `poll_task()` until the task reaches a terminal state
This is the A2A equivalent of the .NET A2AAgent_PollingForTaskCompletion sample.
Prerequisites:
- Set A2A_AGENT_HOST to the URL of a running A2A server
To run this sample:
cd python/samples/02-agents/a2a
uv run python a2a_polling.py
"""
async def main() -> None:
"""Demonstrates polling a long-running A2A task for completion."""
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
# 1. Resolve agent card and create agent.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
async with A2AAgent(
name=agent_card.name,
agent_card=agent_card,
url=a2a_agent_host,
) as agent:
# 2. Start a background task — the agent returns immediately.
print("Starting background task...")
response = await agent.run(
"Write a detailed research report on quantum computing advances in 2025",
background=True,
)
# 3. Check if we got a continuation token (task still in progress).
if response.continuation_token is None:
# Task completed immediately — no polling needed.
print("Task completed immediately:")
print(f" {response.text}")
return
# 4. Poll until the task completes.
token = response.continuation_token
poll_count = 0
while token is not None:
poll_count += 1
print(f" Poll #{poll_count} — task still in progress, waiting 2s...")
await asyncio.sleep(2)
response = await agent.poll_task(token) # type: ignore[arg-type]
token = response.continuation_token
# 5. Task is done — print the final response.
print(f"\nTask completed after {poll_count} poll(s):")
print(f" {response.text[:200]}...")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Starting background task...
Poll #1 — task still in progress, waiting 2s...
Poll #2 — task still in progress, waiting 2s...
Poll #3 — task still in progress, waiting 2s...
Task completed after 3 poll(s):
Quantum computing has seen remarkable progress in 2025, with breakthroughs in...
"""
@@ -0,0 +1,84 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from dotenv import load_dotenv
load_dotenv()
"""
A2A Protocol Selection
This sample demonstrates how to configure which protocol binding the A2A client
uses when connecting to a remote agent. The A2A specification defines three
standard bindings: JSONRPC, GRPC, and HTTP+JSON. Agents declare their supported
bindings in their AgentCard, and clients can express a preference.
Key concepts demonstrated:
- Configuring `supported_protocol_bindings` on A2AAgent
- The client selects a binding that matches the remote agent's capabilities
- Fallback behavior when preferred binding is unavailable
This is the A2A equivalent of the .NET A2AAgent_ProtocolSelection sample.
Prerequisites:
- Set A2A_AGENT_HOST to the URL of a running A2A server
To run this sample:
cd python/samples/02-agents/a2a
uv run python a2a_protocol_selection.py
"""
async def main() -> None:
"""Demonstrates configuring A2A protocol binding preferences."""
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
# 1. Resolve agent card to see what bindings are available.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
print(f"Agent: {agent_card.name}")
print("Supported interfaces:")
for interface in agent_card.supported_interfaces:
print(f" - {interface.protocol_binding} @ {interface.url}")
# 2. Create agent with explicit protocol binding preference.
# The list is ordered by preference — the SDK will select the first
# binding that matches a supported interface on the agent card.
#
# This matters when a server exposes multiple interfaces (e.g. JSONRPC
# on / and HTTP+JSON on /api/). If only one binding is available, the
# client uses it regardless of your preference list.
async with A2AAgent(
name=agent_card.name,
agent_card=agent_card,
url=a2a_agent_host,
supported_protocol_bindings=["HTTP+JSON", "JSONRPC"],
) as agent:
print("\nConfigured bindings: ['HTTP+JSON', 'JSONRPC']")
response = await agent.run("Tell me a short joke")
print(f"Response: {response.text}")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Agent: PolicyAgent
Supported interfaces:
- JSONRPC @ http://localhost:5001/
Configured bindings: ['HTTP+JSON', 'JSONRPC']
Response: Here's a short joke for you...
"""
@@ -0,0 +1,124 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from dotenv import load_dotenv
load_dotenv()
"""
A2A Stream Reconnection
This sample demonstrates how to reconnect to an interrupted A2A stream
using a continuation token. When streaming a long-running task, you can
capture the continuation token from any update and use it to resume the
stream later if the connection is lost.
Key concepts demonstrated:
- Streaming an A2A response with `stream=True`
- Capturing continuation tokens from in-progress updates
- Simulating a stream interruption (break)
- Resuming the stream with `run(continuation_token=..., stream=True)`
This is the A2A equivalent of the .NET A2AAgent_StreamReconnection sample.
Prerequisites:
- Set A2A_AGENT_HOST to the URL of a running A2A server
To run this sample:
cd python/samples/02-agents/a2a
uv run python a2a_stream_reconnection.py
"""
async def main() -> None:
"""Demonstrates reconnecting to an interrupted A2A stream."""
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
# 1. Resolve agent card and create agent.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
async with A2AAgent(
name=agent_card.name,
agent_card=agent_card,
url=a2a_agent_host,
) as agent:
# 2. Start a streaming background task.
print("Starting streaming task...")
stream = agent.run(
"Write a long essay about the history of artificial intelligence",
stream=True,
background=True,
)
# 3. Read a few updates, capture the continuation token, then "disconnect".
saved_token = None
update_count = 0
async for update in stream:
update_count += 1
if update.continuation_token:
saved_token = update.continuation_token
for content in update.contents:
if content.text:
print(content.text, end="", flush=True)
# Simulate a disconnect after receiving 3 updates.
if update_count >= 3:
print("\n\n--- Connection interrupted! ---\n")
break
if saved_token is None:
print("No continuation token received — task may have completed before interruption.")
return
# 4. Reconnect using the saved continuation token.
# background=True is required so that in-progress task updates
# surface continuation tokens (matching the A2AAgent contract).
print(f"Reconnecting with continuation token (task_id={saved_token['task_id']})...")
resumed_stream = agent.run(
continuation_token=saved_token,
stream=True,
background=True,
)
# 5. Continue receiving updates from where we left off.
async for update in resumed_stream:
update_count += 1
for content in update.contents:
if content.text:
print(content.text, end="", flush=True)
print() # newline after streaming completes
response = await resumed_stream.get_final_response()
print(f"\nStream completed. Total updates: {update_count}")
print(f"Final response: {len(response.messages)} message(s)")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Starting streaming task...
Policy:
--- Connection interrupted! ---
Reconnecting with continuation token (task_id=task-abc123)...
Short Shipment Dispute Handling Policy V2.1
Summary: "For short shipments reported by customers, first verify internal..."
Stream completed. Total updates: 106
Final response: 103 message(s)
"""
@@ -0,0 +1,109 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
"""
Agent2Agent (A2A) Protocol Integration Sample
This sample demonstrates how to connect to and communicate with external agents using
the A2A protocol. A2A is a standardized communication protocol that enables interoperability
between different agent systems, allowing agents built with different frameworks and
technologies to communicate seamlessly.
By default the A2AAgent waits for the remote agent to finish before returning (background=False).
This means long-running A2A tasks are handled transparently — the caller simply awaits the result.
For advanced scenarios where you need to poll or resubscribe to in-progress tasks, see the
a2a_polling and a2a_stream_reconnection samples in this folder.
For more information about the A2A protocol specification, visit: https://a2a-protocol.org/latest/
Key concepts demonstrated:
- Discovering A2A-compliant agents using AgentCard resolution
- Creating A2AAgent instances to wrap external A2A endpoints
- Non-streaming request/response
- Streaming responses to receive incremental updates via SSE
To run this sample:
1. Set the A2A_AGENT_HOST environment variable to point to an A2A-compliant agent endpoint
Example: export A2A_AGENT_HOST="https://your-a2a-agent.example.com"
2. Ensure the target agent exposes its AgentCard at /.well-known/agent.json
3. Run: uv run python agent_with_a2a.py
Visit the README.md for more details on setting up and running A2A agents.
"""
async def main():
"""Demonstrates connecting to and communicating with an A2A-compliant agent."""
# 1. Get A2A agent host from environment.
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
print(f"Connecting to A2A agent at: {a2a_agent_host}")
# 2. Resolve the agent card to discover capabilities.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
print(f"Found agent: {agent_card.name} - {agent_card.description}")
# 3. Create A2A agent instance.
async with A2AAgent(
name=agent_card.name,
description=agent_card.description,
agent_card=agent_card,
url=a2a_agent_host,
) as agent:
# 4. Simple request/response — the agent waits for completion internally.
# Even if the remote agent takes a while, background=False (the default)
# means the call blocks until a terminal state is reached.
print("\n--- Non-streaming response ---")
response = await agent.run("What are your capabilities?")
print(f"Agent Response:\n {response.text}")
# 5. Stream a response — the natural model for A2A.
# Updates arrive as Server-Sent Events, letting you observe
# progress in real time as the remote agent works.
print("\n--- Streaming response ---")
stream = agent.run("Tell me about yourself", stream=True)
async for update in stream:
for content in update.contents:
if content.text:
print(content.text, end="", flush=True)
print() # newline after streaming completes
response = await stream.get_final_response()
print(f"\nFinal response:\n {response.text}")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Connecting to A2A agent at: http://localhost:5001/
Found agent: MyAgent - A helpful AI assistant
--- Non-streaming response ---
Agent Response:
I can help with code generation, analysis, and general Q&A.
--- Streaming response ---
I am an AI assistant built to help with various tasks.
Final response:
I am an AI assistant built to help with various tasks.
"""