From 5affc9c33392b2636e5a7b860a3d9628f4cd6e1e Mon Sep 17 00:00:00 2001 From: Giles Odigwe <79032838+giles17@users.noreply.github.com> Date: Mon, 1 Jun 2026 00:09:11 -0700 Subject: [PATCH] Python: Reorganize A2A samples and use package A2AExecutor (#6165) * Reorganize A2A samples: client demos in 02-agents, use package A2AExecutor - Move client samples (agent_with_a2a, a2a_agent_as_function_tools) to samples/02-agents/a2a/ - Add new concept samples: polling, stream reconnection, protocol selection - Replace sample agent_executor.py with package-level A2AExecutor (stream=True) - Update 04-hosting/a2a to focus on server-side, point to 02-agents for clients - Add README.md for the new 02-agents/a2a/ sample collection Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix streaming artifact coalescing and address PR review feedback A2AExecutor fix: - Generate a stable artifact_id per stream in _run_stream so all streaming chunks share the same ID, enabling proper append=True coalescing per the A2A spec (TaskArtifactUpdateEvent with same artifactId). - Previously, item.message_id was None for OpenAI/Foundry streaming updates, causing the SDK to generate a new random UUID per token (100+ separate artifacts instead of 1 appended artifact). Sample improvements: - Replace join workaround with response.text now that coalescing works - Add background=True to stream reconnection resume call (required for continuation token emission on in-progress tasks) - Fix type ignore specificity in polling sample Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../a2a/agent_framework_a2a/_a2a_executor.py | 35 ++++- python/samples/02-agents/a2a/README.md | 54 ++++++++ .../a2a/a2a_agent_as_function_tools.py | 2 +- python/samples/02-agents/a2a/a2a_polling.py | 96 ++++++++++++++ .../02-agents/a2a/a2a_protocol_selection.py | 84 ++++++++++++ .../02-agents/a2a/a2a_stream_reconnection.py | 124 ++++++++++++++++++ .../a2a/agent_with_a2a.py | 17 +-- python/samples/04-hosting/a2a/README.md | 48 ++----- python/samples/04-hosting/a2a/a2a_server.py | 4 +- .../samples/04-hosting/a2a/agent_executor.py | 83 ------------ 10 files changed, 409 insertions(+), 138 deletions(-) create mode 100644 python/samples/02-agents/a2a/README.md rename python/samples/{04-hosting => 02-agents}/a2a/a2a_agent_as_function_tools.py (99%) create mode 100644 python/samples/02-agents/a2a/a2a_polling.py create mode 100644 python/samples/02-agents/a2a/a2a_protocol_selection.py create mode 100644 python/samples/02-agents/a2a/a2a_stream_reconnection.py rename python/samples/{04-hosting => 02-agents}/a2a/agent_with_a2a.py (88%) delete mode 100644 python/samples/04-hosting/a2a/agent_executor.py diff --git a/python/packages/a2a/agent_framework_a2a/_a2a_executor.py b/python/packages/a2a/agent_framework_a2a/_a2a_executor.py index 949ce10167..3685e82742 100644 --- a/python/packages/a2a/agent_framework_a2a/_a2a_executor.py +++ b/python/packages/a2a/agent_framework_a2a/_a2a_executor.py @@ -2,6 +2,7 @@ import base64 import logging +import uuid from asyncio import CancelledError from collections.abc import Mapping from functools import partial @@ -181,9 +182,18 @@ class A2AExecutor(AgentExecutor): """Run the agent in streaming mode and publish updates to the task updater.""" response_stream = self._agent.run(query, session=session, stream=True, **self._run_kwargs) streamed_artifact_ids: set[str] = set() + # Generate a stable artifact ID for the entire stream so all chunks share the same ID. + # This ensures clients can coalesce streaming tokens into a single artifact/message + # per the A2A spec (TaskArtifactUpdateEvent with append=True on same artifactId). + default_artifact_id = str(uuid.uuid4()) await ( response_stream.with_transform_hook( - partial(self.handle_events, updater=updater, streamed_artifact_ids=streamed_artifact_ids) + partial( + self.handle_events, + updater=updater, + streamed_artifact_ids=streamed_artifact_ids, + default_artifact_id=default_artifact_id, + ) ) ).get_final_response() @@ -199,7 +209,11 @@ class A2AExecutor(AgentExecutor): await self.handle_events(message, updater) async def handle_events( - self, item: Message | AgentResponseUpdate, updater: TaskUpdater, streamed_artifact_ids: set[str] | None = None + self, + item: Message | AgentResponseUpdate, + updater: TaskUpdater, + streamed_artifact_ids: set[str] | None = None, + default_artifact_id: str | None = None, ) -> None: """Convert agent response items (Messages or Updates) to A2A protocol events. @@ -213,7 +227,10 @@ class A2AExecutor(AgentExecutor): item: The agent response item (Message or AgentResponseUpdate) to process. updater: The task updater to publish events to. streamed_artifact_ids: A set of artifact IDs that have already been streamed. - Used to prevent duplicate updates for the same artifact. + Used to track which artifacts need append=True on subsequent chunks. + default_artifact_id: A stable artifact ID to use when the item does not provide one. + This ensures all streaming chunks for a single response share the same artifact ID, + allowing clients to coalesce them into a single message. Example: .. code-block:: python @@ -224,6 +241,7 @@ class A2AExecutor(AgentExecutor): item: Message | AgentResponseUpdate, updater: TaskUpdater, streamed_artifact_ids: set[str] | None = None, + default_artifact_id: str | None = None, ) -> None: # Custom logic to transform item contents if item.role == "assistant" and item.contents: @@ -260,19 +278,22 @@ class A2AExecutor(AgentExecutor): if parts: if isinstance(item, AgentResponseUpdate): + # Resolve artifact ID: use item's message_id if available, otherwise fall back + # to the stable default_artifact_id so all streaming chunks share the same ID. + artifact_id = item.message_id or default_artifact_id # For streaming updates, we send TaskArtifactUpdateEvent via add_artifact await updater.add_artifact( parts=parts, - artifact_id=item.message_id, + artifact_id=artifact_id, metadata=metadata, append=( True - if streamed_artifact_ids is not None and item.message_id in (streamed_artifact_ids or set()) + if streamed_artifact_ids is not None and artifact_id in streamed_artifact_ids else None ), ) - if item.message_id and streamed_artifact_ids is not None: - streamed_artifact_ids.add(item.message_id) + if artifact_id and streamed_artifact_ids is not None: + streamed_artifact_ids.add(artifact_id) else: # For final messages, we send TaskStatusUpdateEvent with 'working' state await updater.update_status( diff --git a/python/samples/02-agents/a2a/README.md b/python/samples/02-agents/a2a/README.md new file mode 100644 index 0000000000..e4247a95e9 --- /dev/null +++ b/python/samples/02-agents/a2a/README.md @@ -0,0 +1,54 @@ +# A2A Client Samples + +These samples demonstrate how to **consume** remote A2A-compliant agents using the Agent Framework's `A2AAgent` class. + +For hosting your own agents as A2A servers, see [`samples/04-hosting/a2a/`](../../04-hosting/a2a/). + +## Samples + +| Sample | Concept | +|--------|---------| +| [`agent_with_a2a.py`](agent_with_a2a.py) | Basic consumption — non-streaming and streaming | +| [`a2a_agent_as_function_tools.py`](a2a_agent_as_function_tools.py) | Expose A2A skills as function tools for a host agent | +| [`a2a_polling.py`](a2a_polling.py) | Poll a long-running task with continuation tokens | +| [`a2a_stream_reconnection.py`](a2a_stream_reconnection.py) | Resume an interrupted stream via continuation token | +| [`a2a_protocol_selection.py`](a2a_protocol_selection.py) | Configure preferred protocol bindings (JSONRPC, GRPC, HTTP+JSON) | + +## Prerequisites + +- A running A2A-compliant agent server (see `samples/04-hosting/a2a/` to start one) +- Set `A2A_AGENT_HOST` environment variable to the server URL +- For `a2a_agent_as_function_tools.py`: also set `FOUNDRY_PROJECT_ENDPOINT` and `FOUNDRY_MODEL` + +## Running + +```bash +cd python/samples/02-agents/a2a + +# Start an A2A server in another terminal first: +# cd python/samples/04-hosting/a2a && uv run python a2a_server.py + +export A2A_AGENT_HOST="http://localhost:5001/" +uv run python agent_with_a2a.py +``` + +## Key APIs + +```python +from agent_framework.a2a import A2AAgent + +# Connect to a remote agent +async with A2AAgent(url="http://localhost:5001/", agent_card=card) as agent: + # Non-streaming + response = await agent.run("Hello") + + # Streaming + stream = agent.run("Hello", stream=True) + async for update in stream: + print(update.text) + + # Background + polling + response = await agent.run("Long task", background=True) + while response.continuation_token: + response = await agent.poll_task(response.continuation_token) +``` diff --git a/python/samples/04-hosting/a2a/a2a_agent_as_function_tools.py b/python/samples/02-agents/a2a/a2a_agent_as_function_tools.py similarity index 99% rename from python/samples/04-hosting/a2a/a2a_agent_as_function_tools.py rename to python/samples/02-agents/a2a/a2a_agent_as_function_tools.py index ca753f2d42..c441fe77e1 100644 --- a/python/samples/04-hosting/a2a/a2a_agent_as_function_tools.py +++ b/python/samples/02-agents/a2a/a2a_agent_as_function_tools.py @@ -33,7 +33,7 @@ Prerequisites: - Set FOUNDRY_MODEL to the model deployment name (e.g. gpt-4o) To run this sample: - cd python/samples/04-hosting/a2a + cd python/samples/02-agents/a2a uv run python a2a_agent_as_function_tools.py """ diff --git a/python/samples/02-agents/a2a/a2a_polling.py b/python/samples/02-agents/a2a/a2a_polling.py new file mode 100644 index 0000000000..cccce69d72 --- /dev/null +++ b/python/samples/02-agents/a2a/a2a_polling.py @@ -0,0 +1,96 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import os + +import httpx +from a2a.client import A2ACardResolver +from agent_framework.a2a import A2AAgent +from dotenv import load_dotenv + +load_dotenv() + +""" +A2A Polling for Task Completion + +This sample demonstrates how to poll a long-running A2A task for completion +using continuation tokens. When `background=True`, the agent returns immediately +with a continuation token that you can use to check progress later. + +Key concepts demonstrated: +- Starting a background A2A task with `background=True` +- Receiving a continuation token for in-progress tasks +- Polling with `poll_task()` until the task reaches a terminal state + +This is the A2A equivalent of the .NET A2AAgent_PollingForTaskCompletion sample. + +Prerequisites: +- Set A2A_AGENT_HOST to the URL of a running A2A server + +To run this sample: + cd python/samples/02-agents/a2a + uv run python a2a_polling.py +""" + + +async def main() -> None: + """Demonstrates polling a long-running A2A task for completion.""" + a2a_agent_host = os.getenv("A2A_AGENT_HOST") + if not a2a_agent_host: + raise ValueError("A2A_AGENT_HOST environment variable is not set") + + # 1. Resolve agent card and create agent. + async with httpx.AsyncClient(timeout=60.0) as http_client: + resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host) + agent_card = await resolver.get_agent_card() + + async with A2AAgent( + name=agent_card.name, + agent_card=agent_card, + url=a2a_agent_host, + ) as agent: + # 2. Start a background task — the agent returns immediately. + print("Starting background task...") + response = await agent.run( + "Write a detailed research report on quantum computing advances in 2025", + background=True, + ) + + # 3. Check if we got a continuation token (task still in progress). + if response.continuation_token is None: + # Task completed immediately — no polling needed. + print("Task completed immediately:") + print(f" {response.text}") + return + + # 4. Poll until the task completes. + token = response.continuation_token + poll_count = 0 + while token is not None: + poll_count += 1 + print(f" Poll #{poll_count} — task still in progress, waiting 2s...") + await asyncio.sleep(2) + + response = await agent.poll_task(token) # type: ignore[arg-type] + token = response.continuation_token + + # 5. Task is done — print the final response. + print(f"\nTask completed after {poll_count} poll(s):") + print(f" {response.text[:200]}...") + + +if __name__ == "__main__": + asyncio.run(main()) + + +""" +Sample output: + +Starting background task... + Poll #1 — task still in progress, waiting 2s... + Poll #2 — task still in progress, waiting 2s... + Poll #3 — task still in progress, waiting 2s... + +Task completed after 3 poll(s): + Quantum computing has seen remarkable progress in 2025, with breakthroughs in... +""" diff --git a/python/samples/02-agents/a2a/a2a_protocol_selection.py b/python/samples/02-agents/a2a/a2a_protocol_selection.py new file mode 100644 index 0000000000..c005d7365b --- /dev/null +++ b/python/samples/02-agents/a2a/a2a_protocol_selection.py @@ -0,0 +1,84 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import os + +import httpx +from a2a.client import A2ACardResolver +from agent_framework.a2a import A2AAgent +from dotenv import load_dotenv + +load_dotenv() + +""" +A2A Protocol Selection + +This sample demonstrates how to configure which protocol binding the A2A client +uses when connecting to a remote agent. The A2A specification defines three +standard bindings: JSONRPC, GRPC, and HTTP+JSON. Agents declare their supported +bindings in their AgentCard, and clients can express a preference. + +Key concepts demonstrated: +- Configuring `supported_protocol_bindings` on A2AAgent +- The client selects a binding that matches the remote agent's capabilities +- Fallback behavior when preferred binding is unavailable + +This is the A2A equivalent of the .NET A2AAgent_ProtocolSelection sample. + +Prerequisites: +- Set A2A_AGENT_HOST to the URL of a running A2A server + +To run this sample: + cd python/samples/02-agents/a2a + uv run python a2a_protocol_selection.py +""" + + +async def main() -> None: + """Demonstrates configuring A2A protocol binding preferences.""" + a2a_agent_host = os.getenv("A2A_AGENT_HOST") + if not a2a_agent_host: + raise ValueError("A2A_AGENT_HOST environment variable is not set") + + # 1. Resolve agent card to see what bindings are available. + async with httpx.AsyncClient(timeout=60.0) as http_client: + resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host) + agent_card = await resolver.get_agent_card() + + print(f"Agent: {agent_card.name}") + print("Supported interfaces:") + for interface in agent_card.supported_interfaces: + print(f" - {interface.protocol_binding} @ {interface.url}") + + # 2. Create agent with explicit protocol binding preference. + # The list is ordered by preference — the SDK will select the first + # binding that matches a supported interface on the agent card. + # + # This matters when a server exposes multiple interfaces (e.g. JSONRPC + # on / and HTTP+JSON on /api/). If only one binding is available, the + # client uses it regardless of your preference list. + async with A2AAgent( + name=agent_card.name, + agent_card=agent_card, + url=a2a_agent_host, + supported_protocol_bindings=["HTTP+JSON", "JSONRPC"], + ) as agent: + print("\nConfigured bindings: ['HTTP+JSON', 'JSONRPC']") + response = await agent.run("Tell me a short joke") + print(f"Response: {response.text}") + + +if __name__ == "__main__": + asyncio.run(main()) + + +""" +Sample output: + +Agent: PolicyAgent +Supported interfaces: + - JSONRPC @ http://localhost:5001/ + +Configured bindings: ['HTTP+JSON', 'JSONRPC'] +Response: Here's a short joke for you... +""" diff --git a/python/samples/02-agents/a2a/a2a_stream_reconnection.py b/python/samples/02-agents/a2a/a2a_stream_reconnection.py new file mode 100644 index 0000000000..c9fd0a8891 --- /dev/null +++ b/python/samples/02-agents/a2a/a2a_stream_reconnection.py @@ -0,0 +1,124 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import os + +import httpx +from a2a.client import A2ACardResolver +from agent_framework.a2a import A2AAgent +from dotenv import load_dotenv + +load_dotenv() + +""" +A2A Stream Reconnection + +This sample demonstrates how to reconnect to an interrupted A2A stream +using a continuation token. When streaming a long-running task, you can +capture the continuation token from any update and use it to resume the +stream later if the connection is lost. + +Key concepts demonstrated: +- Streaming an A2A response with `stream=True` +- Capturing continuation tokens from in-progress updates +- Simulating a stream interruption (break) +- Resuming the stream with `run(continuation_token=..., stream=True)` + +This is the A2A equivalent of the .NET A2AAgent_StreamReconnection sample. + +Prerequisites: +- Set A2A_AGENT_HOST to the URL of a running A2A server + +To run this sample: + cd python/samples/02-agents/a2a + uv run python a2a_stream_reconnection.py +""" + + +async def main() -> None: + """Demonstrates reconnecting to an interrupted A2A stream.""" + a2a_agent_host = os.getenv("A2A_AGENT_HOST") + if not a2a_agent_host: + raise ValueError("A2A_AGENT_HOST environment variable is not set") + + # 1. Resolve agent card and create agent. + async with httpx.AsyncClient(timeout=60.0) as http_client: + resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host) + agent_card = await resolver.get_agent_card() + + async with A2AAgent( + name=agent_card.name, + agent_card=agent_card, + url=a2a_agent_host, + ) as agent: + # 2. Start a streaming background task. + print("Starting streaming task...") + stream = agent.run( + "Write a long essay about the history of artificial intelligence", + stream=True, + background=True, + ) + + # 3. Read a few updates, capture the continuation token, then "disconnect". + saved_token = None + update_count = 0 + async for update in stream: + update_count += 1 + if update.continuation_token: + saved_token = update.continuation_token + for content in update.contents: + if content.text: + print(content.text, end="", flush=True) + + # Simulate a disconnect after receiving 3 updates. + if update_count >= 3: + print("\n\n--- Connection interrupted! ---\n") + break + + if saved_token is None: + print("No continuation token received — task may have completed before interruption.") + return + + # 4. Reconnect using the saved continuation token. + # background=True is required so that in-progress task updates + # surface continuation tokens (matching the A2AAgent contract). + print(f"Reconnecting with continuation token (task_id={saved_token['task_id']})...") + resumed_stream = agent.run( + continuation_token=saved_token, + stream=True, + background=True, + ) + + # 5. Continue receiving updates from where we left off. + async for update in resumed_stream: + update_count += 1 + for content in update.contents: + if content.text: + print(content.text, end="", flush=True) + print() # newline after streaming completes + + response = await resumed_stream.get_final_response() + print(f"\nStream completed. Total updates: {update_count}") + print(f"Final response: {len(response.messages)} message(s)") + + +if __name__ == "__main__": + asyncio.run(main()) + + +""" +Sample output: + +Starting streaming task... +Policy: + +--- Connection interrupted! --- + +Reconnecting with continuation token (task_id=task-abc123)... + Short Shipment Dispute Handling Policy V2.1 + +Summary: "For short shipments reported by customers, first verify internal..." + +Stream completed. Total updates: 106 +Final response: 103 message(s) +""" diff --git a/python/samples/04-hosting/a2a/agent_with_a2a.py b/python/samples/02-agents/a2a/agent_with_a2a.py similarity index 88% rename from python/samples/04-hosting/a2a/agent_with_a2a.py rename to python/samples/02-agents/a2a/agent_with_a2a.py index 58415b038c..c35fbb234c 100644 --- a/python/samples/04-hosting/a2a/agent_with_a2a.py +++ b/python/samples/02-agents/a2a/agent_with_a2a.py @@ -22,7 +22,7 @@ technologies to communicate seamlessly. By default the A2AAgent waits for the remote agent to finish before returning (background=False). This means long-running A2A tasks are handled transparently — the caller simply awaits the result. For advanced scenarios where you need to poll or resubscribe to in-progress tasks, see the -background_responses sample: samples/concepts/background_responses.py +a2a_polling and a2a_stream_reconnection samples in this folder. For more information about the A2A protocol specification, visit: https://a2a-protocol.org/latest/ @@ -70,9 +70,7 @@ async def main(): print("\n--- Non-streaming response ---") response = await agent.run("What are your capabilities?") - print("Agent Response:") - for message in response.messages: - print(f" {message.text}") + print(f"Agent Response:\n {response.text}") # 5. Stream a response — the natural model for A2A. # Updates arrive as Server-Sent Events, letting you observe @@ -82,12 +80,11 @@ async def main(): async for update in stream: for content in update.contents: if content.text: - print(f" {content.text}") + print(content.text, end="", flush=True) + print() # newline after streaming completes response = await stream.get_final_response() - print(f"\nFinal response ({len(response.messages)} message(s)):") - for message in response.messages: - print(f" {message.text}") + print(f"\nFinal response:\n {response.text}") if __name__ == "__main__": @@ -105,8 +102,8 @@ Agent Response: I can help with code generation, analysis, and general Q&A. --- Streaming response --- - I am an AI assistant built to help with various tasks. +I am an AI assistant built to help with various tasks. -Final response (1 message(s)): +Final response: I am an AI assistant built to help with various tasks. """ diff --git a/python/samples/04-hosting/a2a/README.md b/python/samples/04-hosting/a2a/README.md index bc76d124f6..803bab3f9e 100644 --- a/python/samples/04-hosting/a2a/README.md +++ b/python/samples/04-hosting/a2a/README.md @@ -1,39 +1,30 @@ -# A2A Agent Examples +# A2A Server Hosting Examples -This sample demonstrates how to host and consume agents using the [A2A (Agent2Agent) protocol](https://a2a-protocol.org/latest/) with the `agent_framework` package. There are three runnable entry points: +This sample demonstrates how to **host** Agent Framework agents as A2A-compliant servers using the [A2A (Agent2Agent) protocol](https://a2a-protocol.org/latest/). + +> **Looking for client samples?** See [`samples/02-agents/a2a/`](../../02-agents/a2a/) for consuming remote A2A agents. + +## Server Samples | Run this file | To... | |---------------|-------| -| **[`a2a_server.py`](a2a_server.py)** | Host an Agent Framework agent as an A2A-compliant server. | -| **[`agent_with_a2a.py`](agent_with_a2a.py)** | Connect to an A2A server and send requests (non-streaming and streaming). | -| **[`a2a_agent_as_function_tools.py`](a2a_agent_as_function_tools.py)** | Convert A2A agent skills into function tools for a host agent. | +| **[`a2a_server.py`](a2a_server.py)** | Host an Agent Framework agent as an A2A-compliant server (multi-agent). | +| **[`agent_framework_to_a2a.py`](agent_framework_to_a2a.py)** | Minimal example: expose a single agent as an A2A server. | -The remaining files are supporting modules used by the server: +## Supporting Modules | File | Description | |------|-------------| -| [`agent_framework_to_a2a.py`](agent_framework_to_a2a.py) | Exposes an agent_framework agent as an A2A-compliant server. Demonstrates how to wrap an agent_framework agent and expose it as an A2A service that other A2A clients can discover and communicate with. | | [`agent_definitions.py`](agent_definitions.py) | Agent and AgentCard factory definitions for invoice, policy, and logistics agents. | -| [`agent_executor.py`](agent_executor.py) | Bridges the a2a-sdk `AgentExecutor` interface to Agent Framework agents. | | [`invoice_data.py`](invoice_data.py) | Mock invoice data and tool functions for the invoice agent. | | [`a2a_server.http`](a2a_server.http) | REST Client requests for testing the server directly from VS Code. | ## Environment Variables -Make sure to set the following environment variables before running the examples: - ### Required (Server) - `FOUNDRY_PROJECT_ENDPOINT` — Your Azure AI Foundry project endpoint - `FOUNDRY_MODEL` — Model deployment name (e.g. `gpt-4o`) -### Required (Client) -- `A2A_AGENT_HOST` — URL of the A2A server (e.g. `http://localhost:5001/`) - -### Required (Function Tools Sample) -- `A2A_AGENT_HOST` — URL of the A2A server (e.g. `http://localhost:5000/`) -- `FOUNDRY_PROJECT_ENDPOINT` — Your Azure AI Foundry project endpoint -- `FOUNDRY_MODEL` — Model deployment name (e.g. `gpt-4o`) - ## Quick Start All commands below should be run from this directory: @@ -67,7 +58,7 @@ uv run python a2a_server.py --agent-type policy ### 1. Start the A2A Server -> **Note (Option A — pip users):** Replace `uv run python` with `python` in all `uv run` commands below (e.g. `python a2a_server.py ...`). `uv` is not required once the virtual environment is activated. +> **Note (Option A — pip users):** Replace `uv run python` with `python` in all `uv run` commands below. `uv` is not required once the virtual environment is activated. Pick an agent type and start the server (each in its own terminal): @@ -79,25 +70,12 @@ uv run python a2a_server.py --agent-type logistics --port 5002 You can run one agent or all three — each listens on its own port. -### 2. Run the A2A Client +### 2. Run a Client -In a separate terminal (from the same directory), point the client at a running server: +Once a server is running, use any of the client samples in [`samples/02-agents/a2a/`](../../02-agents/a2a/): ```powershell +cd python/samples/02-agents/a2a $env:A2A_AGENT_HOST = "http://localhost:5001/" uv run python agent_with_a2a.py - -# A2A server exposing an agent_framework agent -uv run python agent_framework_to_a2a.py -``` - -### 3. Run the Function Tools Sample - -This sample resolves the remote agent's skills and registers each one as a function tool -on a host Foundry-backed agent. The host agent then autonomously selects the right skill -to handle the user's request. - -```powershell -$env:A2A_AGENT_HOST = "http://localhost:5000/" -uv run python a2a_agent_as_function_tools.py ``` diff --git a/python/samples/04-hosting/a2a/a2a_server.py b/python/samples/04-hosting/a2a/a2a_server.py index 8eaf3a7363..185d9da048 100644 --- a/python/samples/04-hosting/a2a/a2a_server.py +++ b/python/samples/04-hosting/a2a/a2a_server.py @@ -9,7 +9,7 @@ from a2a.server.request_handlers import DefaultRequestHandler from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes from a2a.server.tasks import InMemoryTaskStore from agent_definitions import AGENT_CARD_FACTORIES, AGENT_FACTORIES -from agent_executor import AgentFrameworkExecutor +from agent_framework.a2a import A2AExecutor from agent_framework.foundry import FoundryChatClient from azure.identity import AzureCliCredential from dotenv import load_dotenv @@ -92,7 +92,7 @@ def main() -> None: # Build the A2A server components url = f"http://{args.host}:{args.port}/" agent_card = AGENT_CARD_FACTORIES[args.agent_type](url) - executor = AgentFrameworkExecutor(agent) + executor = A2AExecutor(agent, stream=True) task_store = InMemoryTaskStore() request_handler = DefaultRequestHandler( agent_executor=executor, diff --git a/python/samples/04-hosting/a2a/agent_executor.py b/python/samples/04-hosting/a2a/agent_executor.py deleted file mode 100644 index 3dcefff09f..0000000000 --- a/python/samples/04-hosting/a2a/agent_executor.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. - -"""AgentExecutor bridge between the a2a-sdk server and Agent Framework agents. - -Implements the a2a-sdk ``AgentExecutor`` interface so that incoming A2A -requests are forwarded to an Agent Framework agent and the response is -published back through the a2a-sdk event queue. -""" - -from __future__ import annotations - -import asyncio -from typing import TYPE_CHECKING - -from a2a.helpers import new_task_from_user_message -from a2a.server.agent_execution.agent_executor import AgentExecutor -from a2a.server.tasks import TaskUpdater -from a2a.types import Part, TaskState - -if TYPE_CHECKING: - from a2a.server.agent_execution.context import RequestContext - from a2a.server.events.event_queue import EventQueue - from agent_framework import Agent - - -class AgentFrameworkExecutor(AgentExecutor): - """Bridges A2A protocol requests to an Agent Framework agent. - - For each incoming ``execute`` call the executor: - 1. Extracts the user's text from the A2A ``RequestContext``. - 2. Runs the Agent Framework agent (non-streaming). - 3. Publishes the result as an A2A ``Message`` to the ``EventQueue``. - """ - - def __init__(self, agent: Agent) -> None: - self.agent = agent - - async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: - """Run the agent and publish the response.""" - user_text = context.get_user_input() - if not user_text: - user_text = "Hello" - - # v1.0 requires a Task object in the queue before any TaskStatusUpdateEvent - task = context.current_task - if not task and context.message: - task = new_task_from_user_message(context.message) - await event_queue.enqueue_event(task) - - task_id = task.id if task else context.task_id - updater = TaskUpdater(event_queue, task_id, context.context_id) - - # Signal that the agent is working - await updater.start_work() - - try: - response = await self.agent.run(user_text) - - # Build response text from agent messages - response_parts: list[Part] = [] - for msg in response.messages: - if msg.text: - response_parts.append(Part(text=msg.text)) - - if not response_parts: - response_parts.append(Part(text=str(response))) - - # Publish the agent's response and mark as completed - await updater.complete( - message=updater.new_agent_message(response_parts), - ) - except asyncio.CancelledError: - raise - except Exception as e: - await updater.update_status( - state=TaskState.TASK_STATE_FAILED, - message=updater.new_agent_message([Part(text=f"Agent error: {e}")]), - ) - - async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: - """Handle cancellation by publishing a canceled status.""" - updater = TaskUpdater(event_queue, context.task_id, context.context_id) - await updater.update_status(state=TaskState.TASK_STATE_CANCELED)