diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index c20ef4da95..4329c5ce96 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -32,7 +32,7 @@ from .._types import ( from ..exceptions import AgentInvalidRequestException, AgentInvalidResponseException from ._checkpoint import CheckpointStorage from ._events import ( - _LIFECYCLE_EVENT_TYPES, + _AGENT_FORWARDED_EVENT_TYPES, WorkflowEvent, ) from ._message_utils import normalize_messages_input @@ -321,7 +321,7 @@ class WorkflowAgent(BaseAgent): function_invocation_kwargs=function_invocation_kwargs, client_kwargs=client_kwargs, ): - if event.type not in _LIFECYCLE_EVENT_TYPES: + if event.type in _AGENT_FORWARDED_EVENT_TYPES: output_events.append(event) result = self._convert_workflow_events_to_agent_response(response_id, output_events) @@ -578,12 +578,27 @@ class WorkflowAgent(BaseAgent): ) if isinstance(data, AgentResponseUpdate): - # AgentResponseUpdate in non-streaming mode would break message ordering - # if interleaved with non-streaming AgentResponses. - raise AgentInvalidRequestException( - "Output event with AgentResponseUpdate data cannot be emitted in non-streaming mode. " - "Please ensure executors emit AgentResponse for non-streaming workflows." + if not is_intermediate: + # Terminal AgentResponseUpdate in non-streaming mode would break + # message ordering if interleaved with non-streaming AgentResponses. + raise AgentInvalidRequestException( + "Output event with AgentResponseUpdate data cannot be emitted in non-streaming mode. " + "Please ensure executors emit AgentResponse for non-streaming workflows." + ) + # Intermediate updates surface as reasoning content on a synthesized + # Message; this preserves the partial signal without altering the + # terminal response shape. + messages.append( + Message( + contents=_to_text_reasoning(list(data.contents)), + role=data.role or "assistant", + author_name=data.author_name or output_event.executor_id, + message_id=data.message_id or str(uuid.uuid4()), + raw_representation=data.raw_representation, + ) ) + raw_representations.append(data.raw_representation) + continue if isinstance(data, AgentResponse): inner_msgs = [_mark_msg(m) for m in data.messages] if is_intermediate else list(data.messages) @@ -673,16 +688,17 @@ class WorkflowAgent(BaseAgent): Forwarding rule: - ``type='output'`` — terminal user-facing emission. Forwarded as-is. + - ``type='intermediate'`` (and the deprecated ``type='data'``) — forwarded + as intermediate. Text payloads are rewritten to ``text_reasoning`` content; + non-text content types (function_call, function_result, data, uri, …) + pass through unchanged since their ``Content.type`` already discriminates + them. - ``type='request_info'`` — request-info translation (unchanged). - - Any other typed event (``intermediate``, the deprecated ``data``, - orchestration-specific types) — forwarded as intermediate. Text payloads - are rewritten to ``text_reasoning`` content; non-text content types - (function_call, function_result, data, uri, …) pass through unchanged - since their ``Content.type`` already discriminates them. - - Lifecycle / diagnostic events (``started``/``status``/``failed``/ - ``warning``/``error``/``superstep_*``/``executor_*``) are dropped. + - Everything else (lifecycle, diagnostics, executor bookkeeping, + orchestration-internal events like ``group_chat``/``handoff_sent``/ + ``magentic_orchestrator``) is dropped. """ - if event.type in _LIFECYCLE_EVENT_TYPES: + if event.type not in _AGENT_FORWARDED_EVENT_TYPES: return [] if event.type != "request_info": diff --git a/python/packages/core/agent_framework/_workflows/_events.py b/python/packages/core/agent_framework/_workflows/_events.py index 5de6431fb5..e39df9a3a1 100644 --- a/python/packages/core/agent_framework/_workflows/_events.py +++ b/python/packages/core/agent_framework/_workflows/_events.py @@ -130,21 +130,16 @@ WorkflowEventType = Literal[ ] -# Framework-managed event types — workflow lifecycle, diagnostics, and executor bookkeeping -# — that carry no user-facing payload and are not forwarded through the -# ``workflow.as_agent()`` boundary. Internal to the ``_workflows`` package. -_LIFECYCLE_EVENT_TYPES: frozenset[str] = frozenset({ - "started", - "status", - "failed", - "warning", - "error", - "superstep_started", - "superstep_completed", - "executor_invoked", - "executor_completed", - "executor_failed", - "executor_bypassed", +# Event types forwarded across the ``workflow.as_agent()`` boundary. Anything not +# in this set — lifecycle events, diagnostics, executor bookkeeping, and +# orchestration-internal events (``group_chat``, ``handoff_sent``, +# ``magentic_orchestrator``) — stays inside the workflow and is not surfaced to +# agent callers. Internal to the ``_workflows`` package. +_AGENT_FORWARDED_EVENT_TYPES: frozenset[str] = frozenset({ + "output", + "intermediate", + "data", # deprecated alias for intermediate; retained for backward compat + "request_info", }) diff --git a/python/packages/core/tests/workflow/test_workflow_agent_intermediate.py b/python/packages/core/tests/workflow/test_workflow_agent_intermediate.py index bdd1c9dc3e..da4f0c8d5a 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent_intermediate.py +++ b/python/packages/core/tests/workflow/test_workflow_agent_intermediate.py @@ -156,3 +156,74 @@ async def test_workflow_agent_terminal_text_stays_text_not_reasoning() -> None: assert response.text == "the-answer" # No text_reasoning content because everything from `only` is terminal. assert all(c.type != "text_reasoning" for m in response.messages for c in m.contents) + + +@pytest.mark.asyncio +async def test_workflow_agent_non_streaming_accepts_intermediate_update() -> None: + """An intermediate event carrying AgentResponseUpdate must not raise in non-streaming + mode. It surfaces as a Message with text_reasoning content.""" + + @executor + async def emit(messages: list[Message], ctx: WorkflowContext[Never, AgentResponseUpdate]) -> None: + await ctx.add_event( + WorkflowEvent.intermediate( + "emit", + AgentResponseUpdate(contents=[Content.from_text(text="partial-thought")], role="assistant"), + ) + ) + await ctx.yield_output("FINAL") + + workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build() + agent = workflow.as_agent("test") + + response = await agent.run("hi") + reasoning = " ".join(c.text for m in response.messages for c in m.contents if c.type == "text_reasoning") + assert "partial-thought" in reasoning + assert response.text == "FINAL" + + +@pytest.mark.asyncio +async def test_workflow_agent_drops_orchestration_internal_events() -> None: + """Orchestration-internal event types (group_chat / handoff_sent / magentic_orchestrator) + must not surface through workflow.as_agent(). Their dataclass payloads would otherwise + be stringified by the generic fallback path and leak into response history.""" + + @executor + async def emit(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None: + # Construct typed orchestration-internal events directly to assert they get + # dropped at the agent boundary regardless of payload. + await ctx.add_event(WorkflowEvent("group_chat", data={"orchestrator": "details"})) # type: ignore[arg-type] + await ctx.add_event(WorkflowEvent("handoff_sent", data={"target": "agent_b"})) # type: ignore[arg-type] + await ctx.add_event(WorkflowEvent("magentic_orchestrator", data={"plan": "..."})) # type: ignore[arg-type] + await ctx.yield_output("FINAL") + + workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build() + agent = workflow.as_agent("test") + + response = await agent.run("hi") + all_text = " ".join(c.text for m in response.messages for c in m.contents if hasattr(c, "text")) + assert "orchestrator" not in all_text + assert "agent_b" not in all_text + assert "plan" not in all_text + assert response.text == "FINAL" + + +@pytest.mark.asyncio +async def test_workflow_agent_drops_orchestration_internal_events_streaming() -> None: + """Streaming counterpart — orchestration-internal events stay inside the workflow.""" + + @executor + async def emit(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None: + await ctx.add_event(WorkflowEvent("group_chat", data={"orchestrator": "details"})) # type: ignore[arg-type] + await ctx.yield_output("FINAL") + + workflow = WorkflowBuilder(start_executor=emit, output_executors=[emit]).build() + agent = workflow.as_agent("test") + + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("hi", stream=True): + updates.append(update) + + all_text = " ".join(c.text for u in updates for c in u.contents if hasattr(c, "text")) + assert "orchestrator" not in all_text + assert "FINAL" in all_text diff --git a/python/packages/devui/agent_framework_devui/_mapper.py b/python/packages/devui/agent_framework_devui/_mapper.py index 198407e126..303bb8838e 100644 --- a/python/packages/devui/agent_framework_devui/_mapper.py +++ b/python/packages/devui/agent_framework_devui/_mapper.py @@ -901,8 +901,11 @@ class MessageMapper: return events - # Handle output events separately to preserve output data - if event_type == "output": + # Handle yield events (output / intermediate / data) by extracting visible + # text from the payload. All three render as a visible message item so the + # gap that previously dropped intermediate yields into generic completed- + # trace events is closed. + if event_type in ("output", "intermediate", "data"): output_data = getattr(event, "data", None) executor_id = getattr(event, "executor_id", "unknown") diff --git a/python/packages/devui/tests/devui/test_mapper.py b/python/packages/devui/tests/devui/test_mapper.py index 5f828b5d90..5b70c7efe6 100644 --- a/python/packages/devui/tests/devui/test_mapper.py +++ b/python/packages/devui/tests/devui/test_mapper.py @@ -615,6 +615,42 @@ async def test_workflow_intermediate_event_with_agent_response_update_dispatched assert text_events[0].delta == "intermediate progress" +async def test_workflow_intermediate_event_with_string_payload_renders_visible_text( + mapper: MessageMapper, test_request: AgentFrameworkRequest +) -> None: + """A WorkflowEvent with type='intermediate' wrapping a plain string surfaces as a + visible output item — not a generic completed-trace event. Without this, executors + that ``await ctx.yield_output("plan: …")`` from non-designated nodes are silently + dropped in DevUI.""" + from agent_framework._workflows._events import WorkflowEvent + + event = WorkflowEvent.intermediate(executor_id="planner", data="plan: starting work") + events = await mapper.convert_event(event, test_request) + + assert len(events) == 1 + assert events[0].type == "response.output_item.added" + item = events[0].item + assert item.type == "message" + assert any("plan: starting work" in str(c) for c in item.content) + + +async def test_workflow_intermediate_event_with_message_payload_renders_visible_text( + mapper: MessageMapper, test_request: AgentFrameworkRequest +) -> None: + """type='intermediate' wrapping a Message surfaces visibly — same path as type='output'.""" + from agent_framework import Message + from agent_framework._workflows._events import WorkflowEvent + + msg = Message(role="assistant", contents=[Content.from_text(text="research note")]) + event = WorkflowEvent.intermediate(executor_id="researcher", data=msg) + events = await mapper.convert_event(event, test_request) + + assert len(events) == 1 + assert events[0].type == "response.output_item.added" + item = events[0].item + assert any("research note" in str(c) for c in item.content) + + # ============================================================================= # failed event (type='failed') Tests # ============================================================================= diff --git a/python/packages/orchestrations/tests/test_orchestration_intermediate_vs_terminal.py b/python/packages/orchestrations/tests/test_orchestration_intermediate_vs_terminal.py index 8f7d194406..62b9f385b6 100644 --- a/python/packages/orchestrations/tests/test_orchestration_intermediate_vs_terminal.py +++ b/python/packages/orchestrations/tests/test_orchestration_intermediate_vs_terminal.py @@ -14,8 +14,8 @@ Verifies that under the strict-output model: from __future__ import annotations -from collections.abc import AsyncIterable, Awaitable -from typing import Any, Literal, overload +from collections.abc import AsyncIterable, Awaitable, Callable +from typing import Any, ClassVar, Literal, overload import pytest from agent_framework import ( @@ -28,7 +28,18 @@ from agent_framework import ( Message, ResponseStream, ) -from agent_framework.orchestrations import ConcurrentBuilder, SequentialBuilder +from agent_framework.orchestrations import ( + ConcurrentBuilder, + GroupChatBuilder, + GroupChatState, + HandoffBuilder, + MagenticBuilder, + MagenticContext, + MagenticManagerBase, + MagenticProgressLedger, + MagenticProgressLedgerItem, + SequentialBuilder, +) class _EchoAgent(BaseAgent): @@ -257,3 +268,186 @@ async def test_concurrent_default_as_agent_participants_are_text_reasoning() -> # The aggregator's default-yielded AgentResponse passes through as text content. assert text_contents, "expected at least one terminal text content from the aggregator" + + +# --------------------------------------------------------------------------- +# GroupChat +# --------------------------------------------------------------------------- + + +def _two_step_selector() -> Callable[[GroupChatState], str]: + """Selector that picks each participant once, then keeps the first to keep tests bounded.""" + counter = {"n": 0} + + def _select(state: GroupChatState) -> str: + participants = list(state.participants.keys()) + step = counter["n"] + counter["n"] = step + 1 + if step == 0: + return participants[0] + if step == 1 and len(participants) > 1: + return participants[1] + return participants[0] + + return _select + + +@pytest.mark.asyncio +async def test_group_chat_default_only_orchestrator_is_output() -> None: + """Default GroupChat: only the orchestrator is designated; participant replies surface + as type='intermediate'.""" + alpha = _EchoAgent(name="alpha") + beta = _EchoAgent(name="beta") + + workflow = GroupChatBuilder( + participants=[alpha, beta], + max_rounds=2, + selection_func=_two_step_selector(), + ).build() + + output_executors: set[str] = set() + intermediate_executors: set[str] = set() + async for event in workflow.run("kickoff", stream=True): + if event.type == "output" and event.executor_id is not None: + output_executors.add(event.executor_id) + elif event.type == "intermediate" and event.executor_id is not None: + intermediate_executors.add(event.executor_id) + + assert "group_chat_orchestrator" in output_executors + assert "alpha" in intermediate_executors + assert "beta" in intermediate_executors + # Participants must NOT appear among designated outputs in the default contract. + assert "alpha" not in output_executors + assert "beta" not in output_executors + + +@pytest.mark.asyncio +async def test_group_chat_intermediate_outputs_true_designates_all() -> None: + """GroupChat with intermediate_outputs=True designates orchestrator + every participant — + each reply surfaces as type='output'.""" + alpha = _EchoAgent(name="alpha") + beta = _EchoAgent(name="beta") + + workflow = GroupChatBuilder( + participants=[alpha, beta], + max_rounds=2, + selection_func=_two_step_selector(), + intermediate_outputs=True, + ).build() + + output_executors: set[str] = set() + async for event in workflow.run("kickoff", stream=True): + if event.type == "output" and event.executor_id is not None: + output_executors.add(event.executor_id) + + assert {"group_chat_orchestrator", "alpha", "beta"}.issubset(output_executors) + + +# --------------------------------------------------------------------------- +# Handoff +# --------------------------------------------------------------------------- + + +def test_handoff_builder_designates_every_participant_as_output() -> None: + """Handoff has no intermediate channel — every participant's reply is a primary + output. The builder must designate all participants in the workflow's + ``_output_executors`` set so each per-agent yield surfaces as type='output'. + + Structural assertion (vs end-to-end) because Handoff agents require a full + chat-client/middleware stack that we don't want to reproduce in this contract test. + """ + from agent_framework import Agent + from agent_framework._clients import BaseChatClient + from agent_framework._middleware import ChatMiddlewareLayer + from agent_framework._tools import FunctionInvocationLayer + + class _StubClient(FunctionInvocationLayer[Any], ChatMiddlewareLayer[Any], BaseChatClient[Any]): + def __init__(self) -> None: + ChatMiddlewareLayer.__init__(self) + FunctionInvocationLayer.__init__(self) + BaseChatClient.__init__(self) + + def _inner_get_response(self, **kwargs: Any) -> Any: # pragma: no cover - never called + raise NotImplementedError + + alpha = Agent( + name="alpha", + id="alpha", + client=_StubClient(), + require_per_service_call_history_persistence=True, + ) + beta = Agent( + name="beta", + id="beta", + client=_StubClient(), + require_per_service_call_history_persistence=True, + ) + + workflow = HandoffBuilder(participants=[alpha, beta]).with_start_agent(alpha).build() + + designated = set(workflow._output_executors or []) # type: ignore[arg-type] + assert "alpha" in designated, f"alpha must be designated; got {designated}" + assert "beta" in designated, f"beta must be designated; got {designated}" + + +# --------------------------------------------------------------------------- +# Magentic +# --------------------------------------------------------------------------- + + +class _StubMagenticManager(MagenticManagerBase): + """Deterministic manager that finishes after one round with a fixed final answer.""" + + FINAL_ANSWER: ClassVar[str] = "MAGENTIC_FINAL" + + def __init__(self) -> None: + super().__init__(max_stall_count=3) + self.name = "magentic_manager" + self.next_speaker_name = "alpha" + + async def plan(self, magentic_context: MagenticContext) -> Message: + return Message("assistant", ["Plan: do the thing."], author_name=self.name) + + async def replan(self, magentic_context: MagenticContext) -> Message: + return Message("assistant", ["Replan."], author_name=self.name) + + async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger: + is_satisfied = len(magentic_context.chat_history) > 1 + return MagenticProgressLedger( + is_request_satisfied=MagenticProgressLedgerItem(reason="t", answer=is_satisfied), + is_in_loop=MagenticProgressLedgerItem(reason="t", answer=False), + is_progress_being_made=MagenticProgressLedgerItem(reason="t", answer=True), + next_speaker=MagenticProgressLedgerItem(reason="t", answer=self.next_speaker_name), + instruction_or_question=MagenticProgressLedgerItem(reason="t", answer="Go."), + ) + + async def prepare_final_answer(self, magentic_context: MagenticContext) -> Message: + return Message("assistant", [self.FINAL_ANSWER], author_name=self.name) + + +def test_magentic_builder_default_only_manager_designated() -> None: + """Default Magentic: only the orchestrator (manager) is designated for terminal output; + participant replies surface as type='intermediate'. + + Structural assertion on ``workflow._output_executors`` because exercising a Magentic + plan/replan loop end-to-end is heavy and orthogonal to this contract. + """ + manager = _StubMagenticManager() + alpha = _EchoAgent(name="alpha") + + workflow = MagenticBuilder(participants=[alpha], manager=manager).build() + + designated = set(workflow._output_executors or []) # type: ignore[arg-type] + assert "magentic_orchestrator" in designated, f"manager must be designated; got {designated}" + assert "alpha" not in designated, f"participant must not be designated by default; got {designated}" + + +def test_magentic_builder_intermediate_outputs_true_designates_all() -> None: + """Magentic with intermediate_outputs=True designates orchestrator + every participant.""" + manager = _StubMagenticManager() + alpha = _EchoAgent(name="alpha") + + workflow = MagenticBuilder(participants=[alpha], manager=manager, intermediate_outputs=True).build() + + designated = set(workflow._output_executors or []) # type: ignore[arg-type] + assert {"magentic_orchestrator", "alpha"}.issubset(designated)