Files
Giles Odigwe 5affc9c333 Python: Reorganize A2A samples and use package A2AExecutor (#6165)
* Reorganize A2A samples: client demos in 02-agents, use package A2AExecutor

- Move client samples (agent_with_a2a, a2a_agent_as_function_tools) to samples/02-agents/a2a/
- Add new concept samples: polling, stream reconnection, protocol selection
- Replace sample agent_executor.py with package-level A2AExecutor (stream=True)
- Update 04-hosting/a2a to focus on server-side, point to 02-agents for clients
- Add README.md for the new 02-agents/a2a/ sample collection

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix streaming artifact coalescing and address PR review feedback

A2AExecutor fix:
- Generate a stable artifact_id per stream in _run_stream so all streaming
  chunks share the same ID, enabling proper append=True coalescing per the
  A2A spec (TaskArtifactUpdateEvent with same artifactId).
- Previously, item.message_id was None for OpenAI/Foundry streaming updates,
  causing the SDK to generate a new random UUID per token (100+ separate
  artifacts instead of 1 appended artifact).

Sample improvements:
- Replace join workaround with response.text now that coalescing works
- Add background=True to stream reconnection resume call (required for
  continuation token emission on in-progress tasks)
- Fix type ignore specificity in polling sample

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-01 07:09:11 +00:00

125 lines
3.9 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from dotenv import load_dotenv
load_dotenv()
"""
A2A Stream Reconnection
This sample demonstrates how to reconnect to an interrupted A2A stream
using a continuation token. When streaming a long-running task, you can
capture the continuation token from any update and use it to resume the
stream later if the connection is lost.
Key concepts demonstrated:
- Streaming an A2A response with `stream=True`
- Capturing continuation tokens from in-progress updates
- Simulating a stream interruption (break)
- Resuming the stream with `run(continuation_token=..., stream=True)`
This is the A2A equivalent of the .NET A2AAgent_StreamReconnection sample.
Prerequisites:
- Set A2A_AGENT_HOST to the URL of a running A2A server
To run this sample:
cd python/samples/02-agents/a2a
uv run python a2a_stream_reconnection.py
"""
async def main() -> None:
"""Demonstrates reconnecting to an interrupted A2A stream."""
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
# 1. Resolve agent card and create agent.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
async with A2AAgent(
name=agent_card.name,
agent_card=agent_card,
url=a2a_agent_host,
) as agent:
# 2. Start a streaming background task.
print("Starting streaming task...")
stream = agent.run(
"Write a long essay about the history of artificial intelligence",
stream=True,
background=True,
)
# 3. Read a few updates, capture the continuation token, then "disconnect".
saved_token = None
update_count = 0
async for update in stream:
update_count += 1
if update.continuation_token:
saved_token = update.continuation_token
for content in update.contents:
if content.text:
print(content.text, end="", flush=True)
# Simulate a disconnect after receiving 3 updates.
if update_count >= 3:
print("\n\n--- Connection interrupted! ---\n")
break
if saved_token is None:
print("No continuation token received — task may have completed before interruption.")
return
# 4. Reconnect using the saved continuation token.
# background=True is required so that in-progress task updates
# surface continuation tokens (matching the A2AAgent contract).
print(f"Reconnecting with continuation token (task_id={saved_token['task_id']})...")
resumed_stream = agent.run(
continuation_token=saved_token,
stream=True,
background=True,
)
# 5. Continue receiving updates from where we left off.
async for update in resumed_stream:
update_count += 1
for content in update.contents:
if content.text:
print(content.text, end="", flush=True)
print() # newline after streaming completes
response = await resumed_stream.get_final_response()
print(f"\nStream completed. Total updates: {update_count}")
print(f"Final response: {len(response.messages)} message(s)")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Starting streaming task...
Policy:
--- Connection interrupted! ---
Reconnecting with continuation token (task_id=task-abc123)...
Short Shipment Dispute Handling Policy V2.1
Summary: "For short shipments reported by customers, first verify internal..."
Stream completed. Total updates: 106
Final response: 103 message(s)
"""