Files
agent-framework/python/samples/04-hosting/a2a/agent_executor.py
Giles Odigwe 4ad96b64e7 Python: [BREAKING] Migrate agent-framework-a2a to a2a-sdk v1.0 (#5752)
* Python: Migrate agent-framework-a2a to a2a-sdk v1.0

Upgrade the a2a-sdk dependency from v0.3.x to v1.0.0 and migrate all
source, tests, samples, and documentation to the v1.0 API.

Key changes:
- Dependency: a2a-sdk>=1.0.0,<2 (was >=0.3.5,<0.3.24)
- Types are now protobuf-based: Part replaces TextPart/FilePart/DataPart
- Enums use SCREAMING_SNAKE_CASE (e.g. TaskState.TASK_STATE_COMPLETED)
- Roles: Role.ROLE_AGENT, Role.ROLE_USER
- Client: SendMessageRequest wrapper, subscribe() replaces resubscribe()
- Server: A2AStarletteApplication replaced by Starlette + route factories
- DefaultRequestHandler now requires agent_card parameter
- TaskUpdater: final parameter removed, add_artifact gains last_chunk
- AgentCard.url removed; use supported_interfaces with AgentInterface
- Stream yields StreamResponse with WhichOneof('payload')

Closes #5661

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR review: validate fallback URL, remove unused task_id vars

- Raise ValueError with clear message when transport negotiation fails
  and no fallback URL is available (neither url arg nor supported_interfaces)
- Remove unused task_id local in status_update branch
- Inline artifact_event.task_id directly in artifact_update branch

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-11 22:46:12 +00:00

119 lines
4.0 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""AgentExecutor bridge between the a2a-sdk server and Agent Framework agents.
Implements the a2a-sdk ``AgentExecutor`` interface so that incoming A2A
requests are forwarded to an Agent Framework agent and the response is
published back through the a2a-sdk event queue.
"""
from __future__ import annotations
import asyncio
import uuid
from typing import TYPE_CHECKING
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.types import (
Message,
Part,
Role,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)
if TYPE_CHECKING:
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from agent_framework import Agent
class AgentFrameworkExecutor(AgentExecutor):
"""Bridges A2A protocol requests to an Agent Framework agent.
For each incoming ``execute`` call the executor:
1. Extracts the user's text from the A2A ``RequestContext``.
2. Runs the Agent Framework agent (non-streaming).
3. Publishes the result as an A2A ``Message`` to the ``EventQueue``.
"""
def __init__(self, agent: Agent) -> None:
self.agent = agent
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Run the agent and publish the response."""
user_text = context.get_user_input()
if not user_text:
user_text = "Hello"
task_id = context.task_id or str(uuid.uuid4())
context_id = context.context_id or str(uuid.uuid4())
# Signal that the agent is working
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_WORKING),
)
)
try:
response = await self.agent.run(user_text)
# Build response text from agent messages
response_parts: list[Part] = []
for msg in response.messages:
if msg.text:
response_parts.append(Part(text=msg.text))
if not response_parts:
response_parts.append(Part(text=str(response)))
# Publish the agent's response as a completed message
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(
state=TaskState.TASK_STATE_COMPLETED,
message=Message(
message_id=str(uuid.uuid4()),
role=Role.ROLE_AGENT,
parts=response_parts,
),
),
)
)
except asyncio.CancelledError:
raise
except Exception as e:
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(
state=TaskState.TASK_STATE_FAILED,
message=Message(
message_id=str(uuid.uuid4()),
role=Role.ROLE_AGENT,
parts=[Part(text=f"Agent error: {e}")],
),
),
)
)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Handle cancellation by publishing a canceled status."""
task_id = context.task_id or str(uuid.uuid4())
context_id = context.context_id or str(uuid.uuid4())
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_CANCELED),
)
)