mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
68357b0250
- Fix non-streaming empty response by accumulating intermediate WORKING status updates and flushing them when an empty terminal event arrives - Fix sample agent_executor.py to enqueue Task before status events (required by v1.0 ActiveTask validation) - Fix create_jsonrpc_routes() calls to include required rpc_url param - Fix TYPE_CHECKING imports in sample agent_definitions.py - Add tests for non-streaming content accumulation behavior Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
84 lines
3.1 KiB
Python
84 lines
3.1 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""AgentExecutor bridge between the a2a-sdk server and Agent Framework agents.
|
|
|
|
Implements the a2a-sdk ``AgentExecutor`` interface so that incoming A2A
|
|
requests are forwarded to an Agent Framework agent and the response is
|
|
published back through the a2a-sdk event queue.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import TYPE_CHECKING
|
|
|
|
from a2a.helpers import new_task_from_user_message
|
|
from a2a.server.agent_execution.agent_executor import AgentExecutor
|
|
from a2a.server.tasks import TaskUpdater
|
|
from a2a.types import Part, TaskState
|
|
|
|
if TYPE_CHECKING:
|
|
from a2a.server.agent_execution.context import RequestContext
|
|
from a2a.server.events.event_queue import EventQueue
|
|
from agent_framework import Agent
|
|
|
|
|
|
class AgentFrameworkExecutor(AgentExecutor):
|
|
"""Bridges A2A protocol requests to an Agent Framework agent.
|
|
|
|
For each incoming ``execute`` call the executor:
|
|
1. Extracts the user's text from the A2A ``RequestContext``.
|
|
2. Runs the Agent Framework agent (non-streaming).
|
|
3. Publishes the result as an A2A ``Message`` to the ``EventQueue``.
|
|
"""
|
|
|
|
def __init__(self, agent: Agent) -> None:
|
|
self.agent = agent
|
|
|
|
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
|
|
"""Run the agent and publish the response."""
|
|
user_text = context.get_user_input()
|
|
if not user_text:
|
|
user_text = "Hello"
|
|
|
|
# v1.0 requires a Task object in the queue before any TaskStatusUpdateEvent
|
|
task = context.current_task
|
|
if not task and context.message:
|
|
task = new_task_from_user_message(context.message)
|
|
await event_queue.enqueue_event(task)
|
|
|
|
task_id = task.id if task else context.task_id
|
|
updater = TaskUpdater(event_queue, task_id, context.context_id)
|
|
|
|
# Signal that the agent is working
|
|
await updater.start_work()
|
|
|
|
try:
|
|
response = await self.agent.run(user_text)
|
|
|
|
# Build response text from agent messages
|
|
response_parts: list[Part] = []
|
|
for msg in response.messages:
|
|
if msg.text:
|
|
response_parts.append(Part(text=msg.text))
|
|
|
|
if not response_parts:
|
|
response_parts.append(Part(text=str(response)))
|
|
|
|
# Publish the agent's response and mark as completed
|
|
await updater.complete(
|
|
message=updater.new_agent_message(response_parts),
|
|
)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as e:
|
|
await updater.update_status(
|
|
state=TaskState.TASK_STATE_FAILED,
|
|
message=updater.new_agent_message([Part(text=f"Agent error: {e}")]),
|
|
)
|
|
|
|
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
|
|
"""Handle cancellation by publishing a canceled status."""
|
|
updater = TaskUpdater(event_queue, context.task_id, context.context_id)
|
|
await updater.update_status(state=TaskState.TASK_STATE_CANCELED)
|