# Copyright (c) Microsoft. All rights reserved. """AgentExecutor bridge between the a2a-sdk server and Agent Framework agents. Implements the a2a-sdk ``AgentExecutor`` interface so that incoming A2A requests are forwarded to an Agent Framework agent and the response is published back through the a2a-sdk event queue. """ from __future__ import annotations import asyncio from typing import TYPE_CHECKING from a2a.helpers import new_task_from_user_message from a2a.server.agent_execution.agent_executor import AgentExecutor from a2a.server.tasks import TaskUpdater from a2a.types import Part, TaskState if TYPE_CHECKING: from a2a.server.agent_execution.context import RequestContext from a2a.server.events.event_queue import EventQueue from agent_framework import Agent class AgentFrameworkExecutor(AgentExecutor): """Bridges A2A protocol requests to an Agent Framework agent. For each incoming ``execute`` call the executor: 1. Extracts the user's text from the A2A ``RequestContext``. 2. Runs the Agent Framework agent (non-streaming). 3. Publishes the result as an A2A ``Message`` to the ``EventQueue``. """ def __init__(self, agent: Agent) -> None: self.agent = agent async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: """Run the agent and publish the response.""" user_text = context.get_user_input() if not user_text: user_text = "Hello" # v1.0 requires a Task object in the queue before any TaskStatusUpdateEvent task = context.current_task if not task and context.message: task = new_task_from_user_message(context.message) await event_queue.enqueue_event(task) task_id = task.id if task else context.task_id updater = TaskUpdater(event_queue, task_id, context.context_id) # Signal that the agent is working await updater.start_work() try: response = await self.agent.run(user_text) # Build response text from agent messages response_parts: list[Part] = [] for msg in response.messages: if msg.text: response_parts.append(Part(text=msg.text)) if not response_parts: response_parts.append(Part(text=str(response))) # Publish the agent's response and mark as completed await updater.complete( message=updater.new_agent_message(response_parts), ) except asyncio.CancelledError: raise except Exception as e: await updater.update_status( state=TaskState.TASK_STATE_FAILED, message=updater.new_agent_message([Part(text=f"Agent error: {e}")]), ) async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: """Handle cancellation by publishing a canceled status.""" updater = TaskUpdater(event_queue, context.task_id, context.context_id) await updater.update_status(state=TaskState.TASK_STATE_CANCELED)