Files
agent-framework/python/samples/04-hosting/a2a/agent_executor.py
T
Giles Odigwe ded32f3ff8 Python: Add A2A server sample (#4528)
* Python: Add A2A server sample and fix client streaming bug

Add a pure Python A2A server sample so testing the A2A client no longer
requires running the .NET server. The server uses the a2a-sdk's
A2AStarletteApplication with uvicorn and supports three agent types
(invoice, policy, logistics) backed by AzureOpenAIResponsesClient.

New files:
- a2a_server.py: Main server entry point with CLI args
- agent_executor.py: Bridges a2a-sdk AgentExecutor to Agent Framework
- agent_definitions.py: Agent and AgentCard factory definitions
- invoice_data.py: Mock invoice data and query tool functions
- a2a_server.http: REST Client requests for testing

Also fixes a streaming bug in agent_with_a2a.py where async with was
used on ResponseStream which does not support the async context manager
protocol. Changed to async for to match all other samples.

Closes #4045

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR review: handle CancelledError and fix end_date filtering

- Re-raise asyncio.CancelledError before the broad exception handler
  so cooperative cancellation is not swallowed.
- Make end_date filter inclusive of the full day by comparing with
  < end + timedelta(days=1) instead of <= midnight.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-10 00:00:49 +00:00

124 lines
4.1 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""AgentExecutor bridge between the a2a-sdk server and Agent Framework agents.
Implements the a2a-sdk ``AgentExecutor`` interface so that incoming A2A
requests are forwarded to an Agent Framework agent and the response is
published back through the a2a-sdk event queue.
"""
from __future__ import annotations
import asyncio
import uuid
from typing import TYPE_CHECKING
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.types import (
Message,
Part,
Role,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
TextPart,
)
if TYPE_CHECKING:
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from agent_framework import Agent
class AgentFrameworkExecutor(AgentExecutor):
    """Bridges A2A protocol requests to an Agent Framework agent.

    For each incoming ``execute`` call the executor:

    1. Extracts the user's text from the A2A ``RequestContext``.
    2. Publishes a non-final ``working`` status update.
    3. Runs the Agent Framework agent (non-streaming).
    4. Publishes a final ``completed`` status update whose ``Message``
       carries the agent's text, or a final ``failed`` status update when
       the agent raises.
    """

    def __init__(self, agent: Agent) -> None:
        """Store the Agent Framework agent that answers incoming requests."""
        self.agent = agent

    @staticmethod
    def _resolve_ids(context: RequestContext) -> tuple[str, str]:
        """Return ``(task_id, context_id)``, minting a UUID for any missing id."""
        task_id = context.task_id or str(uuid.uuid4())
        context_id = context.context_id or str(uuid.uuid4())
        return task_id, context_id

    @staticmethod
    def _agent_text_message(texts: list[str]) -> Message:
        """Build an agent-role ``Message`` with one text part per entry in *texts*."""
        # ``Message.parts`` is a list of ``Part`` wrappers, so every ``TextPart``
        # is wrapped explicitly instead of relying on implicit model coercion.
        return Message(
            message_id=str(uuid.uuid4()),
            role=Role.agent,
            parts=[Part(root=TextPart(text=text)) for text in texts],
        )

    @staticmethod
    def _status_event(
        task_id: str,
        context_id: str,
        state: TaskState,
        *,
        final: bool,
        message: Message | None = None,
    ) -> TaskStatusUpdateEvent:
        """Build a ``TaskStatusUpdateEvent`` for *state*, optionally carrying *message*."""
        # Only pass ``message`` when there is one, so message-less statuses are
        # constructed exactly as ``TaskStatus(state=...)``.
        if message is None:
            status = TaskStatus(state=state)
        else:
            status = TaskStatus(state=state, message=message)
        return TaskStatusUpdateEvent(
            task_id=task_id,
            context_id=context_id,
            status=status,
            final=final,
        )

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Run the agent on the request's text and publish the outcome.

        Args:
            context: The A2A request context; supplies the user input and,
                when present, the task and context ids.
            event_queue: Queue on which status updates are published.

        Raises:
            asyncio.CancelledError: Re-raised untouched so that cooperative
                cancellation is not converted into a ``failed`` status.
        """
        # An empty or missing input falls back to the literal prompt "Hello".
        user_text = context.get_user_input() or "Hello"
        task_id, context_id = self._resolve_ids(context)

        # Signal that the agent is working.
        await event_queue.enqueue_event(
            self._status_event(task_id, context_id, TaskState.working, final=False)
        )

        try:
            response = await self.agent.run(user_text)

            # Collect the text of every response message that has any.
            texts = [msg.text for msg in response.messages if msg.text]
            if not texts:
                # No message carried text: fall back to the response's string form.
                texts.append(str(response))

            # Publish the agent's response as a completed status.
            await event_queue.enqueue_event(
                self._status_event(
                    task_id,
                    context_id,
                    TaskState.completed,
                    final=True,
                    message=self._agent_text_message(texts),
                )
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # Any other failure (including one raised while publishing the
            # completed event) is reported to the client as a failed status.
            await event_queue.enqueue_event(
                self._status_event(
                    task_id,
                    context_id,
                    TaskState.failed,
                    final=True,
                    message=self._agent_text_message([f"Agent error: {e}"]),
                )
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Handle cancellation by publishing a final ``canceled`` status.

        Args:
            context: The A2A request context; supplies the task and context
                ids when present.
            event_queue: Queue on which the status update is published.
        """
        task_id, context_id = self._resolve_ids(context)
        await event_queue.enqueue_event(
            self._status_event(task_id, context_id, TaskState.canceled, final=True)
        )