mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Address PR review feedback on intermediate output forwarding
- Switch workflow.as_agent() forwarding to an explicit allowlist of {output,
intermediate, data, request_info} so orchestration-internal events
(group_chat, handoff_sent, magentic_orchestrator) stay inside the workflow
instead of leaking into agent responses via str(data) coercion.
- Stop raising on intermediate AgentResponseUpdate in non-streaming run();
surface the partial as a Message with text_reasoning content. The defensive
raise still applies to terminal output events, where Update payloads would
corrupt message ordering.
- Extend the DevUI workflow-event mapper so intermediate yields wrapping
plain strings, Messages, and list[Message] render as visible output items
instead of generic completed-trace events.
- Add orchestration coverage for GroupChat, Handoff, and Magentic builders
(default vs intermediate_outputs=True; structural where end-to-end is heavy).
This commit is contained in:
@@ -32,7 +32,7 @@ from .._types import (
|
||||
from ..exceptions import AgentInvalidRequestException, AgentInvalidResponseException
|
||||
from ._checkpoint import CheckpointStorage
|
||||
from ._events import (
|
||||
_LIFECYCLE_EVENT_TYPES,
|
||||
_AGENT_FORWARDED_EVENT_TYPES,
|
||||
WorkflowEvent,
|
||||
)
|
||||
from ._message_utils import normalize_messages_input
|
||||
@@ -321,7 +321,7 @@ class WorkflowAgent(BaseAgent):
|
||||
function_invocation_kwargs=function_invocation_kwargs,
|
||||
client_kwargs=client_kwargs,
|
||||
):
|
||||
if event.type not in _LIFECYCLE_EVENT_TYPES:
|
||||
if event.type in _AGENT_FORWARDED_EVENT_TYPES:
|
||||
output_events.append(event)
|
||||
|
||||
result = self._convert_workflow_events_to_agent_response(response_id, output_events)
|
||||
@@ -578,12 +578,27 @@ class WorkflowAgent(BaseAgent):
|
||||
)
|
||||
|
||||
if isinstance(data, AgentResponseUpdate):
|
||||
# AgentResponseUpdate in non-streaming mode would break message ordering
|
||||
# if interleaved with non-streaming AgentResponses.
|
||||
raise AgentInvalidRequestException(
|
||||
"Output event with AgentResponseUpdate data cannot be emitted in non-streaming mode. "
|
||||
"Please ensure executors emit AgentResponse for non-streaming workflows."
|
||||
if not is_intermediate:
|
||||
# Terminal AgentResponseUpdate in non-streaming mode would break
|
||||
# message ordering if interleaved with non-streaming AgentResponses.
|
||||
raise AgentInvalidRequestException(
|
||||
"Output event with AgentResponseUpdate data cannot be emitted in non-streaming mode. "
|
||||
"Please ensure executors emit AgentResponse for non-streaming workflows."
|
||||
)
|
||||
# Intermediate updates surface as reasoning content on a synthesized
|
||||
# Message; this preserves the partial signal without altering the
|
||||
# terminal response shape.
|
||||
messages.append(
|
||||
Message(
|
||||
contents=_to_text_reasoning(list(data.contents)),
|
||||
role=data.role or "assistant",
|
||||
author_name=data.author_name or output_event.executor_id,
|
||||
message_id=data.message_id or str(uuid.uuid4()),
|
||||
raw_representation=data.raw_representation,
|
||||
)
|
||||
)
|
||||
raw_representations.append(data.raw_representation)
|
||||
continue
|
||||
|
||||
if isinstance(data, AgentResponse):
|
||||
inner_msgs = [_mark_msg(m) for m in data.messages] if is_intermediate else list(data.messages)
|
||||
@@ -673,16 +688,17 @@ class WorkflowAgent(BaseAgent):
|
||||
Forwarding rule:
|
||||
|
||||
- ``type='output'`` — terminal user-facing emission. Forwarded as-is.
|
||||
- ``type='intermediate'`` (and the deprecated ``type='data'``) — forwarded
|
||||
as intermediate. Text payloads are rewritten to ``text_reasoning`` content;
|
||||
non-text content types (function_call, function_result, data, uri, …)
|
||||
pass through unchanged since their ``Content.type`` already discriminates
|
||||
them.
|
||||
- ``type='request_info'`` — request-info translation (unchanged).
|
||||
- Any other typed event (``intermediate``, the deprecated ``data``,
|
||||
orchestration-specific types) — forwarded as intermediate. Text payloads
|
||||
are rewritten to ``text_reasoning`` content; non-text content types
|
||||
(function_call, function_result, data, uri, …) pass through unchanged
|
||||
since their ``Content.type`` already discriminates them.
|
||||
- Lifecycle / diagnostic events (``started``/``status``/``failed``/
|
||||
``warning``/``error``/``superstep_*``/``executor_*``) are dropped.
|
||||
- Everything else (lifecycle, diagnostics, executor bookkeeping,
|
||||
orchestration-internal events like ``group_chat``/``handoff_sent``/
|
||||
``magentic_orchestrator``) is dropped.
|
||||
"""
|
||||
if event.type in _LIFECYCLE_EVENT_TYPES:
|
||||
if event.type not in _AGENT_FORWARDED_EVENT_TYPES:
|
||||
return []
|
||||
|
||||
if event.type != "request_info":
|
||||
|
||||
@@ -130,21 +130,16 @@ WorkflowEventType = Literal[
|
||||
]
|
||||
|
||||
|
||||
# Framework-managed event types — workflow lifecycle, diagnostics, and executor bookkeeping
|
||||
# — that carry no user-facing payload and are not forwarded through the
|
||||
# ``workflow.as_agent()`` boundary. Internal to the ``_workflows`` package.
|
||||
_LIFECYCLE_EVENT_TYPES: frozenset[str] = frozenset({
|
||||
"started",
|
||||
"status",
|
||||
"failed",
|
||||
"warning",
|
||||
"error",
|
||||
"superstep_started",
|
||||
"superstep_completed",
|
||||
"executor_invoked",
|
||||
"executor_completed",
|
||||
"executor_failed",
|
||||
"executor_bypassed",
|
||||
# Event types forwarded across the ``workflow.as_agent()`` boundary. Anything not
|
||||
# in this set — lifecycle events, diagnostics, executor bookkeeping, and
|
||||
# orchestration-internal events (``group_chat``, ``handoff_sent``,
|
||||
# ``magentic_orchestrator``) — stays inside the workflow and is not surfaced to
|
||||
# agent callers. Internal to the ``_workflows`` package.
|
||||
_AGENT_FORWARDED_EVENT_TYPES: frozenset[str] = frozenset({
|
||||
"output",
|
||||
"intermediate",
|
||||
"data", # deprecated alias for intermediate; retained for backward compat
|
||||
"request_info",
|
||||
})
|
||||
|
||||
|
||||
|
||||
@@ -156,3 +156,74 @@ async def test_workflow_agent_terminal_text_stays_text_not_reasoning() -> None:
|
||||
assert response.text == "the-answer"
|
||||
# No text_reasoning content because everything from `only` is terminal.
|
||||
assert all(c.type != "text_reasoning" for m in response.messages for c in m.contents)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_workflow_agent_non_streaming_accepts_intermediate_update() -> None:
|
||||
"""An intermediate event carrying AgentResponseUpdate must not raise in non-streaming
|
||||
mode. It surfaces as a Message with text_reasoning content."""
|
||||
|
||||
@executor
|
||||
async def emit(messages: list[Message], ctx: WorkflowContext[Never, AgentResponseUpdate]) -> None:
|
||||
await ctx.add_event(
|
||||
WorkflowEvent.intermediate(
|
||||
"emit",
|
||||
AgentResponseUpdate(contents=[Content.from_text(text="partial-thought")], role="assistant"),
|
||||
)
|
||||
)
|
||||
await ctx.yield_output("FINAL")
|
||||
|
||||
workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build()
|
||||
agent = workflow.as_agent("test")
|
||||
|
||||
response = await agent.run("hi")
|
||||
reasoning = " ".join(c.text for m in response.messages for c in m.contents if c.type == "text_reasoning")
|
||||
assert "partial-thought" in reasoning
|
||||
assert response.text == "FINAL"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_workflow_agent_drops_orchestration_internal_events() -> None:
|
||||
"""Orchestration-internal event types (group_chat / handoff_sent / magentic_orchestrator)
|
||||
must not surface through workflow.as_agent(). Their dataclass payloads would otherwise
|
||||
be stringified by the generic fallback path and leak into response history."""
|
||||
|
||||
@executor
|
||||
async def emit(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None:
|
||||
# Construct typed orchestration-internal events directly to assert they get
|
||||
# dropped at the agent boundary regardless of payload.
|
||||
await ctx.add_event(WorkflowEvent("group_chat", data={"orchestrator": "details"})) # type: ignore[arg-type]
|
||||
await ctx.add_event(WorkflowEvent("handoff_sent", data={"target": "agent_b"})) # type: ignore[arg-type]
|
||||
await ctx.add_event(WorkflowEvent("magentic_orchestrator", data={"plan": "..."})) # type: ignore[arg-type]
|
||||
await ctx.yield_output("FINAL")
|
||||
|
||||
workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build()
|
||||
agent = workflow.as_agent("test")
|
||||
|
||||
response = await agent.run("hi")
|
||||
all_text = " ".join(c.text for m in response.messages for c in m.contents if hasattr(c, "text"))
|
||||
assert "orchestrator" not in all_text
|
||||
assert "agent_b" not in all_text
|
||||
assert "plan" not in all_text
|
||||
assert response.text == "FINAL"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_workflow_agent_drops_orchestration_internal_events_streaming() -> None:
|
||||
"""Streaming counterpart — orchestration-internal events stay inside the workflow."""
|
||||
|
||||
@executor
|
||||
async def emit(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None:
|
||||
await ctx.add_event(WorkflowEvent("group_chat", data={"orchestrator": "details"})) # type: ignore[arg-type]
|
||||
await ctx.yield_output("FINAL")
|
||||
|
||||
workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build()
|
||||
agent = workflow.as_agent("test")
|
||||
|
||||
updates: list[AgentResponseUpdate] = []
|
||||
async for update in agent.run("hi", stream=True):
|
||||
updates.append(update)
|
||||
|
||||
all_text = " ".join(c.text for u in updates for c in u.contents if hasattr(c, "text"))
|
||||
assert "orchestrator" not in all_text
|
||||
assert "FINAL" in all_text
|
||||
|
||||
@@ -901,8 +901,11 @@ class MessageMapper:
|
||||
|
||||
return events
|
||||
|
||||
# Handle output events separately to preserve output data
|
||||
if event_type == "output":
|
||||
# Handle yield events (output / intermediate / data) by extracting visible
|
||||
# text from the payload. All three render as a visible message item so the
|
||||
# gap that previously dropped intermediate yields into generic completed-
|
||||
# trace events is closed.
|
||||
if event_type in ("output", "intermediate", "data"):
|
||||
output_data = getattr(event, "data", None)
|
||||
executor_id = getattr(event, "executor_id", "unknown")
|
||||
|
||||
|
||||
@@ -615,6 +615,42 @@ async def test_workflow_intermediate_event_with_agent_response_update_dispatched
|
||||
assert text_events[0].delta == "intermediate progress"
|
||||
|
||||
|
||||
async def test_workflow_intermediate_event_with_string_payload_renders_visible_text(
|
||||
mapper: MessageMapper, test_request: AgentFrameworkRequest
|
||||
) -> None:
|
||||
"""A WorkflowEvent with type='intermediate' wrapping a plain string surfaces as a
|
||||
visible output item — not a generic completed-trace event. Without this, executors
|
||||
that ``await ctx.yield_output("plan: …")`` from non-designated nodes are silently
|
||||
dropped in DevUI."""
|
||||
from agent_framework._workflows._events import WorkflowEvent
|
||||
|
||||
event = WorkflowEvent.intermediate(executor_id="planner", data="plan: starting work")
|
||||
events = await mapper.convert_event(event, test_request)
|
||||
|
||||
assert len(events) == 1
|
||||
assert events[0].type == "response.output_item.added"
|
||||
item = events[0].item
|
||||
assert item.type == "message"
|
||||
assert any("plan: starting work" in str(c) for c in item.content)
|
||||
|
||||
|
||||
async def test_workflow_intermediate_event_with_message_payload_renders_visible_text(
|
||||
mapper: MessageMapper, test_request: AgentFrameworkRequest
|
||||
) -> None:
|
||||
"""type='intermediate' wrapping a Message surfaces visibly — same path as type='output'."""
|
||||
from agent_framework import Message
|
||||
from agent_framework._workflows._events import WorkflowEvent
|
||||
|
||||
msg = Message(role="assistant", contents=[Content.from_text(text="research note")])
|
||||
event = WorkflowEvent.intermediate(executor_id="researcher", data=msg)
|
||||
events = await mapper.convert_event(event, test_request)
|
||||
|
||||
assert len(events) == 1
|
||||
assert events[0].type == "response.output_item.added"
|
||||
item = events[0].item
|
||||
assert any("research note" in str(c) for c in item.content)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# failed event (type='failed') Tests
|
||||
# =============================================================================
|
||||
|
||||
+197
-3
@@ -14,8 +14,8 @@ Verifies that under the strict-output model:
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import AsyncIterable, Awaitable
|
||||
from typing import Any, Literal, overload
|
||||
from collections.abc import AsyncIterable, Awaitable, Callable
|
||||
from typing import Any, ClassVar, Literal, overload
|
||||
|
||||
import pytest
|
||||
from agent_framework import (
|
||||
@@ -28,7 +28,18 @@ from agent_framework import (
|
||||
Message,
|
||||
ResponseStream,
|
||||
)
|
||||
from agent_framework.orchestrations import ConcurrentBuilder, SequentialBuilder
|
||||
from agent_framework.orchestrations import (
|
||||
ConcurrentBuilder,
|
||||
GroupChatBuilder,
|
||||
GroupChatState,
|
||||
HandoffBuilder,
|
||||
MagenticBuilder,
|
||||
MagenticContext,
|
||||
MagenticManagerBase,
|
||||
MagenticProgressLedger,
|
||||
MagenticProgressLedgerItem,
|
||||
SequentialBuilder,
|
||||
)
|
||||
|
||||
|
||||
class _EchoAgent(BaseAgent):
|
||||
@@ -257,3 +268,186 @@ async def test_concurrent_default_as_agent_participants_are_text_reasoning() ->
|
||||
|
||||
# The aggregator's default-yielded AgentResponse passes through as text content.
|
||||
assert text_contents, "expected at least one terminal text content from the aggregator"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GroupChat
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _two_step_selector() -> Callable[[GroupChatState], str]:
|
||||
"""Selector that picks each participant once, then keeps the first to keep tests bounded."""
|
||||
counter = {"n": 0}
|
||||
|
||||
def _select(state: GroupChatState) -> str:
|
||||
participants = list(state.participants.keys())
|
||||
step = counter["n"]
|
||||
counter["n"] = step + 1
|
||||
if step == 0:
|
||||
return participants[0]
|
||||
if step == 1 and len(participants) > 1:
|
||||
return participants[1]
|
||||
return participants[0]
|
||||
|
||||
return _select
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_group_chat_default_only_orchestrator_is_output() -> None:
|
||||
"""Default GroupChat: only the orchestrator is designated; participant replies surface
|
||||
as type='intermediate'."""
|
||||
alpha = _EchoAgent(name="alpha")
|
||||
beta = _EchoAgent(name="beta")
|
||||
|
||||
workflow = GroupChatBuilder(
|
||||
participants=[alpha, beta],
|
||||
max_rounds=2,
|
||||
selection_func=_two_step_selector(),
|
||||
).build()
|
||||
|
||||
output_executors: set[str] = set()
|
||||
intermediate_executors: set[str] = set()
|
||||
async for event in workflow.run("kickoff", stream=True):
|
||||
if event.type == "output" and event.executor_id is not None:
|
||||
output_executors.add(event.executor_id)
|
||||
elif event.type == "intermediate" and event.executor_id is not None:
|
||||
intermediate_executors.add(event.executor_id)
|
||||
|
||||
assert "group_chat_orchestrator" in output_executors
|
||||
assert "alpha" in intermediate_executors
|
||||
assert "beta" in intermediate_executors
|
||||
# Participants must NOT appear among designated outputs in the default contract.
|
||||
assert "alpha" not in output_executors
|
||||
assert "beta" not in output_executors
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_group_chat_intermediate_outputs_true_designates_all() -> None:
|
||||
"""GroupChat with intermediate_outputs=True designates orchestrator + every participant —
|
||||
each reply surfaces as type='output'."""
|
||||
alpha = _EchoAgent(name="alpha")
|
||||
beta = _EchoAgent(name="beta")
|
||||
|
||||
workflow = GroupChatBuilder(
|
||||
participants=[alpha, beta],
|
||||
max_rounds=2,
|
||||
selection_func=_two_step_selector(),
|
||||
intermediate_outputs=True,
|
||||
).build()
|
||||
|
||||
output_executors: set[str] = set()
|
||||
async for event in workflow.run("kickoff", stream=True):
|
||||
if event.type == "output" and event.executor_id is not None:
|
||||
output_executors.add(event.executor_id)
|
||||
|
||||
assert {"group_chat_orchestrator", "alpha", "beta"}.issubset(output_executors)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Handoff
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_handoff_builder_designates_every_participant_as_output() -> None:
|
||||
"""Handoff has no intermediate channel — every participant's reply is a primary
|
||||
output. The builder must designate all participants in the workflow's
|
||||
``_output_executors`` set so each per-agent yield surfaces as type='output'.
|
||||
|
||||
Structural assertion (vs end-to-end) because Handoff agents require a full
|
||||
chat-client/middleware stack that we don't want to reproduce in this contract test.
|
||||
"""
|
||||
from agent_framework import Agent
|
||||
from agent_framework._clients import BaseChatClient
|
||||
from agent_framework._middleware import ChatMiddlewareLayer
|
||||
from agent_framework._tools import FunctionInvocationLayer
|
||||
|
||||
class _StubClient(FunctionInvocationLayer[Any], ChatMiddlewareLayer[Any], BaseChatClient[Any]):
|
||||
def __init__(self) -> None:
|
||||
ChatMiddlewareLayer.__init__(self)
|
||||
FunctionInvocationLayer.__init__(self)
|
||||
BaseChatClient.__init__(self)
|
||||
|
||||
def _inner_get_response(self, **kwargs: Any) -> Any: # pragma: no cover - never called
|
||||
raise NotImplementedError
|
||||
|
||||
alpha = Agent(
|
||||
name="alpha",
|
||||
id="alpha",
|
||||
client=_StubClient(),
|
||||
require_per_service_call_history_persistence=True,
|
||||
)
|
||||
beta = Agent(
|
||||
name="beta",
|
||||
id="beta",
|
||||
client=_StubClient(),
|
||||
require_per_service_call_history_persistence=True,
|
||||
)
|
||||
|
||||
workflow = HandoffBuilder(participants=[alpha, beta]).with_start_agent(alpha).build()
|
||||
|
||||
designated = set(workflow._output_executors or []) # type: ignore[arg-type]
|
||||
assert "alpha" in designated, f"alpha must be designated; got {designated}"
|
||||
assert "beta" in designated, f"beta must be designated; got {designated}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Magentic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _StubMagenticManager(MagenticManagerBase):
|
||||
"""Deterministic manager that finishes after one round with a fixed final answer."""
|
||||
|
||||
FINAL_ANSWER: ClassVar[str] = "MAGENTIC_FINAL"
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(max_stall_count=3)
|
||||
self.name = "magentic_manager"
|
||||
self.next_speaker_name = "alpha"
|
||||
|
||||
async def plan(self, magentic_context: MagenticContext) -> Message:
|
||||
return Message("assistant", ["Plan: do the thing."], author_name=self.name)
|
||||
|
||||
async def replan(self, magentic_context: MagenticContext) -> Message:
|
||||
return Message("assistant", ["Replan."], author_name=self.name)
|
||||
|
||||
async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger:
|
||||
is_satisfied = len(magentic_context.chat_history) > 1
|
||||
return MagenticProgressLedger(
|
||||
is_request_satisfied=MagenticProgressLedgerItem(reason="t", answer=is_satisfied),
|
||||
is_in_loop=MagenticProgressLedgerItem(reason="t", answer=False),
|
||||
is_progress_being_made=MagenticProgressLedgerItem(reason="t", answer=True),
|
||||
next_speaker=MagenticProgressLedgerItem(reason="t", answer=self.next_speaker_name),
|
||||
instruction_or_question=MagenticProgressLedgerItem(reason="t", answer="Go."),
|
||||
)
|
||||
|
||||
async def prepare_final_answer(self, magentic_context: MagenticContext) -> Message:
|
||||
return Message("assistant", [self.FINAL_ANSWER], author_name=self.name)
|
||||
|
||||
|
||||
def test_magentic_builder_default_only_manager_designated() -> None:
|
||||
"""Default Magentic: only the orchestrator (manager) is designated for terminal output;
|
||||
participant replies surface as type='intermediate'.
|
||||
|
||||
Structural assertion on ``workflow._output_executors`` because exercising a Magentic
|
||||
plan/replan loop end-to-end is heavy and orthogonal to this contract.
|
||||
"""
|
||||
manager = _StubMagenticManager()
|
||||
alpha = _EchoAgent(name="alpha")
|
||||
|
||||
workflow = MagenticBuilder(participants=[alpha], manager=manager).build()
|
||||
|
||||
designated = set(workflow._output_executors or []) # type: ignore[arg-type]
|
||||
assert "magentic_orchestrator" in designated, f"manager must be designated; got {designated}"
|
||||
assert "alpha" not in designated, f"participant must not be designated by default; got {designated}"
|
||||
|
||||
|
||||
def test_magentic_builder_intermediate_outputs_true_designates_all() -> None:
|
||||
"""Magentic with intermediate_outputs=True designates orchestrator + every participant."""
|
||||
manager = _StubMagenticManager()
|
||||
alpha = _EchoAgent(name="alpha")
|
||||
|
||||
workflow = MagenticBuilder(participants=[alpha], manager=manager, intermediate_outputs=True).build()
|
||||
|
||||
designated = set(workflow._output_executors or []) # type: ignore[arg-type]
|
||||
assert {"magentic_orchestrator", "alpha"}.issubset(designated)
|
||||
|
||||
Reference in New Issue
Block a user