Address PR review feedback on intermediate output forwarding

- Switch workflow.as_agent() forwarding to an explicit allowlist of {output,
  intermediate, data, request_info} so orchestration-internal events
  (group_chat, handoff_sent, magentic_orchestrator) stay inside the workflow
  instead of leaking into agent responses via str(data) coercion.
- Stop raising on intermediate AgentResponseUpdate in non-streaming run();
  surface the partial as a Message with text_reasoning content. The defensive
  raise still applies to terminal output events, where Update payloads would
  corrupt message ordering.
- Extend the DevUI workflow-event mapper so intermediate yields wrapping
  plain strings, Messages, and list[Message] render as visible output items
  instead of generic completed-trace events.
- Add orchestration coverage for the GroupChat, Handoff, and Magentic builders,
  comparing the default with intermediate_outputs=True where the builder supports
  it; Handoff and Magentic use structural assertions because end-to-end runs are heavy.
This commit is contained in:
Evan Mattson
2026-05-06 08:47:58 +09:00
Unverified
parent ca0ef3b188
commit 394bcd6071
6 changed files with 350 additions and 35 deletions
@@ -32,7 +32,7 @@ from .._types import (
from ..exceptions import AgentInvalidRequestException, AgentInvalidResponseException
from ._checkpoint import CheckpointStorage
from ._events import (
_LIFECYCLE_EVENT_TYPES,
_AGENT_FORWARDED_EVENT_TYPES,
WorkflowEvent,
)
from ._message_utils import normalize_messages_input
@@ -321,7 +321,7 @@ class WorkflowAgent(BaseAgent):
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=client_kwargs,
):
if event.type not in _LIFECYCLE_EVENT_TYPES:
if event.type in _AGENT_FORWARDED_EVENT_TYPES:
output_events.append(event)
result = self._convert_workflow_events_to_agent_response(response_id, output_events)
@@ -578,12 +578,27 @@ class WorkflowAgent(BaseAgent):
)
if isinstance(data, AgentResponseUpdate):
# AgentResponseUpdate in non-streaming mode would break message ordering
# if interleaved with non-streaming AgentResponses.
raise AgentInvalidRequestException(
"Output event with AgentResponseUpdate data cannot be emitted in non-streaming mode. "
"Please ensure executors emit AgentResponse for non-streaming workflows."
if not is_intermediate:
# Terminal AgentResponseUpdate in non-streaming mode would break
# message ordering if interleaved with non-streaming AgentResponses.
raise AgentInvalidRequestException(
"Output event with AgentResponseUpdate data cannot be emitted in non-streaming mode. "
"Please ensure executors emit AgentResponse for non-streaming workflows."
)
# Intermediate updates surface as reasoning content on a synthesized
# Message; this preserves the partial signal without altering the
# terminal response shape.
messages.append(
Message(
contents=_to_text_reasoning(list(data.contents)),
role=data.role or "assistant",
author_name=data.author_name or output_event.executor_id,
message_id=data.message_id or str(uuid.uuid4()),
raw_representation=data.raw_representation,
)
)
raw_representations.append(data.raw_representation)
continue
if isinstance(data, AgentResponse):
inner_msgs = [_mark_msg(m) for m in data.messages] if is_intermediate else list(data.messages)
@@ -673,16 +688,17 @@ class WorkflowAgent(BaseAgent):
Forwarding rule:
- ``type='output'`` — terminal user-facing emission. Forwarded as-is.
- ``type='intermediate'`` (and the deprecated ``type='data'``) — forwarded
as intermediate. Text payloads are rewritten to ``text_reasoning`` content;
non-text content types (function_call, function_result, data, uri, …)
pass through unchanged since their ``Content.type`` already discriminates
them.
- ``type='request_info'`` — request-info translation (unchanged).
- Any other typed event (``intermediate``, the deprecated ``data``,
orchestration-specific types) — forwarded as intermediate. Text payloads
are rewritten to ``text_reasoning`` content; non-text content types
(function_call, function_result, data, uri, …) pass through unchanged
since their ``Content.type`` already discriminates them.
- Lifecycle / diagnostic events (``started``/``status``/``failed``/
``warning``/``error``/``superstep_*``/``executor_*``) are dropped.
- Everything else (lifecycle, diagnostics, executor bookkeeping,
orchestration-internal events like ``group_chat``/``handoff_sent``/
``magentic_orchestrator``) is dropped.
"""
if event.type in _LIFECYCLE_EVENT_TYPES:
if event.type not in _AGENT_FORWARDED_EVENT_TYPES:
return []
if event.type != "request_info":
@@ -130,21 +130,16 @@ WorkflowEventType = Literal[
]
# Framework-managed event types — workflow lifecycle, diagnostics, and executor bookkeeping
# that carry no user-facing payload and are not forwarded through the
# ``workflow.as_agent()`` boundary. Internal to the ``_workflows`` package.
_LIFECYCLE_EVENT_TYPES: frozenset[str] = frozenset({
"started",
"status",
"failed",
"warning",
"error",
"superstep_started",
"superstep_completed",
"executor_invoked",
"executor_completed",
"executor_failed",
"executor_bypassed",
# Allowlist of event types that may cross the ``workflow.as_agent()`` boundary.
# Every other type stays inside the workflow and is never surfaced to agent
# callers: lifecycle events, diagnostics, executor bookkeeping, and the
# orchestration-internal events (``group_chat``, ``handoff_sent``,
# ``magentic_orchestrator``). Internal to the ``_workflows`` package.
_AGENT_FORWARDED_EVENT_TYPES: frozenset[str] = frozenset((
    "output",
    "intermediate",
    "data",  # deprecated alias for intermediate; retained for backward compat
    "request_info",
))
@@ -156,3 +156,74 @@ async def test_workflow_agent_terminal_text_stays_text_not_reasoning() -> None:
assert response.text == "the-answer"
# No text_reasoning content because everything from `only` is terminal.
assert all(c.type != "text_reasoning" for m in response.messages for c in m.contents)
@pytest.mark.asyncio
async def test_workflow_agent_non_streaming_accepts_intermediate_update() -> None:
    """Non-streaming ``run()`` tolerates an intermediate ``AgentResponseUpdate``.

    The executor adds one ``intermediate`` event wrapping an ``AgentResponseUpdate``
    and then yields the string ``"FINAL"``. The run must not raise, the partial text
    must appear as ``text_reasoning`` content, and ``response.text`` must equal the
    terminal output only.
    """

    # The workflow-output type argument is ``str`` because the only value passed to
    # ``yield_output`` is a string; the update is emitted through ``add_event`` and
    # is not a yielded output.
    @executor
    async def emit(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None:
        await ctx.add_event(
            WorkflowEvent.intermediate(
                "emit",
                AgentResponseUpdate(contents=[Content.from_text(text="partial-thought")], role="assistant"),
            )
        )
        await ctx.yield_output("FINAL")

    workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build()
    agent = workflow.as_agent("test")

    response = await agent.run("hi")

    reasoning = " ".join(c.text for m in response.messages for c in m.contents if c.type == "text_reasoning")
    assert "partial-thought" in reasoning
    assert response.text == "FINAL"
@pytest.mark.asyncio
async def test_workflow_agent_drops_orchestration_internal_events() -> None:
    """Orchestration-internal events must not surface through ``workflow.as_agent()``.

    Covers ``group_chat``, ``handoff_sent`` and ``magentic_orchestrator``. If these
    were forwarded, their payloads would be stringified by the generic fallback path
    and leak into the response history.
    """

    @executor
    async def emit(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None:
        # Construct typed orchestration-internal events directly to assert they get
        # dropped at the agent boundary regardless of payload.
        await ctx.add_event(WorkflowEvent("group_chat", data={"orchestrator": "details"}))  # type: ignore[arg-type]
        await ctx.add_event(WorkflowEvent("handoff_sent", data={"target": "agent_b"}))  # type: ignore[arg-type]
        await ctx.add_event(WorkflowEvent("magentic_orchestrator", data={"plan": "..."}))  # type: ignore[arg-type]
        await ctx.yield_output("FINAL")

    workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build()
    agent = workflow.as_agent("test")

    response = await agent.run("hi")

    # Skip contents with no text (missing attribute or ``text=None``): joining a
    # ``None`` would raise TypeError and hide the assertion that actually matters.
    all_text = " ".join(c.text for m in response.messages for c in m.contents if getattr(c, "text", None))
    assert "orchestrator" not in all_text
    assert "agent_b" not in all_text
    assert "plan" not in all_text
    assert response.text == "FINAL"
@pytest.mark.asyncio
async def test_workflow_agent_drops_orchestration_internal_events_streaming() -> None:
    """Streaming counterpart: orchestration-internal events stay inside the workflow.

    Emits the same three internal event types as the non-streaming test so both
    code paths are checked against an identical set of payloads.
    """

    @executor
    async def emit(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None:
        await ctx.add_event(WorkflowEvent("group_chat", data={"orchestrator": "details"}))  # type: ignore[arg-type]
        await ctx.add_event(WorkflowEvent("handoff_sent", data={"target": "agent_b"}))  # type: ignore[arg-type]
        await ctx.add_event(WorkflowEvent("magentic_orchestrator", data={"plan": "..."}))  # type: ignore[arg-type]
        await ctx.yield_output("FINAL")

    workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build()
    agent = workflow.as_agent("test")

    updates: list[AgentResponseUpdate] = [update async for update in agent.run("hi", stream=True)]

    # Skip contents with no text (missing attribute or ``text=None``) so the join
    # cannot fail with TypeError before the assertions run.
    all_text = " ".join(c.text for u in updates for c in u.contents if getattr(c, "text", None))
    assert "orchestrator" not in all_text
    assert "agent_b" not in all_text
    assert "plan" not in all_text
    assert "FINAL" in all_text
@@ -901,8 +901,11 @@ class MessageMapper:
return events
# Handle output events separately to preserve output data
if event_type == "output":
# Handle yield events (output / intermediate / data) by extracting visible
# text from the payload. All three render as a visible message item so the
# gap that previously dropped intermediate yields into generic completed-
# trace events is closed.
if event_type in ("output", "intermediate", "data"):
output_data = getattr(event, "data", None)
executor_id = getattr(event, "executor_id", "unknown")
@@ -615,6 +615,42 @@ async def test_workflow_intermediate_event_with_agent_response_update_dispatched
assert text_events[0].delta == "intermediate progress"
async def test_workflow_intermediate_event_with_string_payload_renders_visible_text(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """An ``intermediate`` event wrapping a plain string maps to one visible message item.

    The mapper must return exactly one ``response.output_item.added`` event whose
    item is a ``message`` containing the original string.
    """
    from agent_framework._workflows._events import WorkflowEvent

    source_event = WorkflowEvent.intermediate(executor_id="planner", data="plan: starting work")

    converted = await mapper.convert_event(source_event, test_request)

    assert len(converted) == 1
    first = converted[0]
    assert first.type == "response.output_item.added"
    output_item = first.item
    assert output_item.type == "message"
    assert any("plan: starting work" in str(part) for part in output_item.content)
async def test_workflow_intermediate_event_with_message_payload_renders_visible_text(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """An ``intermediate`` event wrapping a ``Message`` maps to one visible output item.

    The mapper must return exactly one ``response.output_item.added`` event whose
    item content includes the message text.
    """
    from agent_framework import Message
    from agent_framework._workflows._events import WorkflowEvent

    note = Message(role="assistant", contents=[Content.from_text(text="research note")])
    source_event = WorkflowEvent.intermediate(executor_id="researcher", data=note)

    converted = await mapper.convert_event(source_event, test_request)

    assert len(converted) == 1
    first = converted[0]
    assert first.type == "response.output_item.added"
    output_item = first.item
    assert any("research note" in str(part) for part in output_item.content)
# =============================================================================
# failed event (type='failed') Tests
# =============================================================================
@@ -14,8 +14,8 @@ Verifies that under the strict-output model:
from __future__ import annotations
from collections.abc import AsyncIterable, Awaitable
from typing import Any, Literal, overload
from collections.abc import AsyncIterable, Awaitable, Callable
from typing import Any, ClassVar, Literal, overload
import pytest
from agent_framework import (
@@ -28,7 +28,18 @@ from agent_framework import (
Message,
ResponseStream,
)
from agent_framework.orchestrations import ConcurrentBuilder, SequentialBuilder
from agent_framework.orchestrations import (
ConcurrentBuilder,
GroupChatBuilder,
GroupChatState,
HandoffBuilder,
MagenticBuilder,
MagenticContext,
MagenticManagerBase,
MagenticProgressLedger,
MagenticProgressLedgerItem,
SequentialBuilder,
)
class _EchoAgent(BaseAgent):
@@ -257,3 +268,186 @@ async def test_concurrent_default_as_agent_participants_are_text_reasoning() ->
# The aggregator's default-yielded AgentResponse passes through as text content.
assert text_contents, "expected at least one terminal text content from the aggregator"
# ---------------------------------------------------------------------------
# GroupChat
# ---------------------------------------------------------------------------
def _two_step_selector() -> Callable[[GroupChatState], str]:
    """Build a selector that picks each participant once, then sticks to the first.

    The returned callable keeps its own call counter. On call ``i`` (0-based) it
    returns the ``i``-th participant name while ``i`` is within the participant
    list, and the first participant on every later call, which keeps tests bounded.
    For one or two participants this is the sequence ``first, second, first, ...``.

    Returns:
        A stateful selection function; each call to this factory yields an
        independent counter.

    Raises:
        IndexError: from the returned selector if ``state.participants`` is empty.
    """
    step = 0

    def _select(state: GroupChatState) -> str:
        nonlocal step
        names = list(state.participants.keys())
        current, step = step, step + 1
        # First pass: one turn per participant, in the mapping's iteration order.
        if current < len(names):
            return names[current]
        # Every participant has had a turn; fall back to the first one.
        return names[0]

    return _select
@pytest.mark.asyncio
async def test_group_chat_default_only_orchestrator_is_output() -> None:
    """Default GroupChat emits ``output`` from the orchestrator only.

    Participant replies must arrive as ``type='intermediate'`` events and must not
    appear among the executors that produced ``type='output'`` events.
    """
    workflow = GroupChatBuilder(
        participants=[_EchoAgent(name="alpha"), _EchoAgent(name="beta")],
        max_rounds=2,
        selection_func=_two_step_selector(),
    ).build()

    # Executor ids seen per event type; other event types are ignored.
    seen: dict[str, set[str]] = {"output": set(), "intermediate": set()}
    async for event in workflow.run("kickoff", stream=True):
        if event.executor_id is None:
            continue
        bucket = seen.get(event.type)
        if bucket is not None:
            bucket.add(event.executor_id)

    output_executors = seen["output"]
    intermediate_executors = seen["intermediate"]

    assert "group_chat_orchestrator" in output_executors
    assert "alpha" in intermediate_executors
    assert "beta" in intermediate_executors
    # Participants must NOT appear among designated outputs in the default contract.
    assert "alpha" not in output_executors
    assert "beta" not in output_executors
@pytest.mark.asyncio
async def test_group_chat_intermediate_outputs_true_designates_all() -> None:
    """With ``intermediate_outputs=True`` every GroupChat executor emits ``output``.

    The orchestrator and both participants must each appear as the source of at
    least one ``type='output'`` event.
    """
    builder = GroupChatBuilder(
        participants=[_EchoAgent(name="alpha"), _EchoAgent(name="beta")],
        max_rounds=2,
        selection_func=_two_step_selector(),
        intermediate_outputs=True,
    )
    workflow = builder.build()

    output_executors: set[str] = {
        event.executor_id
        async for event in workflow.run("kickoff", stream=True)
        if event.type == "output" and event.executor_id is not None
    }

    assert output_executors >= {"group_chat_orchestrator", "alpha", "beta"}
# ---------------------------------------------------------------------------
# Handoff
# ---------------------------------------------------------------------------
def test_handoff_builder_designates_every_participant_as_output() -> None:
    """Every Handoff participant is designated as an output executor.

    Handoff has no intermediate channel, so each participant's reply is a primary
    output; the built workflow's ``_output_executors`` must therefore list all of
    them.

    This is a structural check rather than an end-to-end run: Handoff agents need
    a full chat-client/middleware stack, so a stub client that is never invoked is
    enough here.
    """
    from agent_framework import Agent
    from agent_framework._clients import BaseChatClient
    from agent_framework._middleware import ChatMiddlewareLayer
    from agent_framework._tools import FunctionInvocationLayer

    class _StubClient(FunctionInvocationLayer[Any], ChatMiddlewareLayer[Any], BaseChatClient[Any]):
        def __init__(self) -> None:
            ChatMiddlewareLayer.__init__(self)
            FunctionInvocationLayer.__init__(self)
            BaseChatClient.__init__(self)

        def _inner_get_response(self, **kwargs: Any) -> Any:  # pragma: no cover - never called
            raise NotImplementedError

    def _stub_agent(agent_id: str) -> Agent:
        # Each agent gets its own stub client instance.
        return Agent(
            name=agent_id,
            id=agent_id,
            client=_StubClient(),
            require_per_service_call_history_persistence=True,
        )

    alpha = _stub_agent("alpha")
    beta = _stub_agent("beta")

    workflow = HandoffBuilder(participants=[alpha, beta]).with_start_agent(alpha).build()

    designated = set(workflow._output_executors or [])  # type: ignore[arg-type]
    assert "alpha" in designated, f"alpha must be designated; got {designated}"
    assert "beta" in designated, f"beta must be designated; got {designated}"
# ---------------------------------------------------------------------------
# Magentic
# ---------------------------------------------------------------------------
class _StubMagenticManager(MagenticManagerBase):
    """Deterministic manager stub that returns fixed plans, ledgers and a fixed final answer.

    The progress ledger reports the request as satisfied once the chat history
    holds more than one entry, and always nominates ``next_speaker_name``.
    """

    FINAL_ANSWER: ClassVar[str] = "MAGENTIC_FINAL"

    def __init__(self) -> None:
        super().__init__(max_stall_count=3)
        self.name = "magentic_manager"
        self.next_speaker_name = "alpha"

    def _stub_reply(self, text: str) -> Message:
        """Wrap ``text`` in an assistant message authored by this manager."""
        return Message("assistant", [text], author_name=self.name)

    async def plan(self, magentic_context: MagenticContext) -> Message:
        return self._stub_reply("Plan: do the thing.")

    async def replan(self, magentic_context: MagenticContext) -> Message:
        return self._stub_reply("Replan.")

    async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger:
        def _item(answer: Any) -> MagenticProgressLedgerItem:
            return MagenticProgressLedgerItem(reason="t", answer=answer)

        request_done = len(magentic_context.chat_history) > 1
        return MagenticProgressLedger(
            is_request_satisfied=_item(request_done),
            is_in_loop=_item(False),
            is_progress_being_made=_item(True),
            next_speaker=_item(self.next_speaker_name),
            instruction_or_question=_item("Go."),
        )

    async def prepare_final_answer(self, magentic_context: MagenticContext) -> Message:
        return self._stub_reply(self.FINAL_ANSWER)
def test_magentic_builder_default_only_manager_designated() -> None:
    """Default Magentic designates the orchestrator, and not the participant, for output.

    The check is structural, on ``workflow._output_executors``, because driving a
    Magentic plan/replan loop end-to-end is heavy and orthogonal to this contract.
    """
    workflow = MagenticBuilder(
        participants=[_EchoAgent(name="alpha")],
        manager=_StubMagenticManager(),
    ).build()

    designated = set(workflow._output_executors or [])  # type: ignore[arg-type]

    assert "magentic_orchestrator" in designated, f"manager must be designated; got {designated}"
    assert "alpha" not in designated, f"participant must not be designated by default; got {designated}"
def test_magentic_builder_intermediate_outputs_true_designates_all() -> None:
    """With ``intermediate_outputs=True`` Magentic designates the orchestrator and the participant."""
    builder = MagenticBuilder(
        participants=[_EchoAgent(name="alpha")],
        manager=_StubMagenticManager(),
        intermediate_outputs=True,
    )
    workflow = builder.build()

    designated = set(workflow._output_executors or [])  # type: ignore[arg-type]

    assert designated >= {"magentic_orchestrator", "alpha"}