Python: Improve the handling of intermediate outputs for workflows and orchestrations (#5623)

* Improve the handling of intermediate outputs for workflows and orchestrations

* Address PR review feedback on intermediate output forwarding

- Switch workflow.as_agent() forwarding to an explicit allowlist of {output,
  intermediate, data, request_info} so orchestration-internal events
  (group_chat, handoff_sent, magentic_orchestrator) stay inside the workflow
  instead of leaking into agent responses via str(data) coercion.
- Stop raising on intermediate AgentResponseUpdate in non-streaming run();
  surface the partial as a Message with text_reasoning content. The defensive
  raise still applies to terminal output events, where Update payloads would
  corrupt message ordering.
- Extend the DevUI workflow-event mapper so intermediate yields wrapping
  plain strings, Messages, and list[Message] render as visible output items
  instead of generic completed-trace events.
- Add orchestration coverage for GroupChat, Handoff, and Magentic builders
  (default vs intermediate_outputs=True; structural where end-to-end is heavy).

* Lift output-designation policy into a value type

Replace the ``Workflow._output_executors`` list and the
``RunnerContext.should_label_as_intermediate`` Protocol method with a single
immutable ``OutputDesignation`` value type owned by ``Workflow``. Thread the
designation as a parameter through the existing call chain (Runner ->
EdgeRunner -> Executor -> WorkflowContext) so ``yield_output`` consults the
threaded snapshot directly rather than calling back into the runner context.

Removes the ``InProcRunnerContext._workflow`` back-reference and the
``WorkflowBuilder.build()`` assignment that wired it up. Adds the public
predicate ``Workflow.is_terminal_executor(executor_id)`` for external
observers; ``OutputDesignation`` itself stays package-internal.

Key decisions
- ``OutputDesignation.designated`` is ``frozenset[str] | None`` -- ``None``
  preserves legacy "every yield is type='output'" behavior, any frozenset
  (including empty) opts into strict mode. The ``DeprecationWarning`` for
  legacy mode at build time is unchanged.
- ``output_designation`` is an optional parameter on ``Runner``,
  ``EdgeRunner.send_message``, ``EdgeRunner._execute_on_target``,
  ``Executor.execute``, ``Executor._create_context_for_handler``, and
  ``WorkflowContext.__init__``. Each defaults to legacy ``OutputDesignation()``
  so direct callers (Azure Functions ``CapturingRunnerContext``,
  ``test_runner`` recording fixtures) keep working without ceremony.
- The workflow-level filter in ``_run_core`` reads ``self._output_designation``
  live, preserving today's semantics where mutating the designation after
  build still affects subsequent runs (used by two existing tests).
- ``Workflow.to_dict()`` continues to emit ``"output_executors":
  list[str] | None`` (sorted from the frozenset). Checkpoint format unchanged.

Files changed
- _workflow.py: add ``OutputDesignation`` dataclass; replace
  ``_output_executors`` with ``_output_designation``; add
  ``is_terminal_executor``; delete ``_should_yield_output_event``.
- _runner_context.py: drop ``should_label_as_intermediate`` Protocol method
  and ``InProcRunnerContext`` impl; drop ``_workflow`` back-reference.
- _workflow_builder.py: remove ``context._workflow = workflow`` assignment.
- _runner.py, _edge_runner.py, _executor.py, _workflow_context.py: thread
  ``output_designation`` parameter through the call chain.
- tests/workflow/test_output_designation.py (new): three-state coverage of
  the value type plus the public predicate delegation.
- tests/workflow/test_workflow_builder.py, test_validation.py,
  test_workflow.py, test_runner.py and
  orchestrations/tests/test_orchestration_intermediate_vs_terminal.py:
  switch probes from ``_output_executors`` set checks to
  ``get_output_executors`` / ``is_terminal_executor``; update two
  post-build mutation tests to set ``_output_designation`` instead.

Verification
- core/tests/workflow/, orchestrations/tests/, azurefunctions/tests/:
  1119 passed, 42 skipped, 2 xfailed.
- ``uv run poe lint``: clean.
- ``uv run poe typing``: only the pre-existing
  ``_AGENT_FORWARDED_EVENT_TYPES`` pyright warning from 394bcd607 remains.

Notes for next iteration
- The builder's own ``_output_executors`` attribute (``list[Executor |
  SupportsAgentRun]``) is intentionally untouched; the issue scoped the
  rename to the workflow attribute.
- Adjacent review candidates (twin ``WorkflowAgent`` translators,
  ``_AGENT_FORWARDED_EVENT_TYPES`` kind classifier,
  ``_event_origin_context`` ContextVar removal, ``WorkflowEvent`` ADT
  split, legacy-mode removal) remain out of scope.

* Add explicit workflow output designation

Key decisions

- Extend the internal OutputDesignation value type from terminal-only membership to output/intermediate/hidden classification. Legacy mode remains outputs=None, so workflows built without output_executors or intermediate_executors still label every yield_output as type='output'.

- WorkflowBuilder now accepts intermediate_executors. Providing either designation enters explicit mode; output executors emit output, intermediate executors emit intermediate, and unlisted yield_output payloads are hidden from caller-facing events while remaining in executor_completed data.

- Empty explicit designation, duplicate entries, overlaps, unknown executors, and designated executors without workflow output annotations fail build validation. Existing orchestration builders pass intermediate-capable participants through intermediate_executors to preserve current intermediate_outputs behavior until participant-oriented designation lands.

Files changed

- packages/core/agent_framework/_workflows/_workflow.py, _workflow_builder.py, _workflow_context.py, _validation.py, _events.py

- packages/core/tests/workflow/test_output_designation.py, test_output_executors_contract.py, test_strict_mode_event_labeling.py, test_validation.py, test_workflow.py, test_workflow_agent_intermediate.py

- packages/orchestrations/agent_framework_orchestrations/_sequential.py, _concurrent.py, _group_chat.py, _magentic.py

- packages/core/AGENTS.md

Verification

- uv run pytest packages/core/tests/workflow packages/orchestrations/tests packages/devui/tests/devui/test_mapper.py -q

- uv run pytest packages/azurefunctions/tests -q

- uv run poe lint

- uv run poe typing fails only on pre-existing packages/core/agent_framework/_workflows/_agent.py _AGENT_FORWARDED_EVENT_TYPES private-use pyright error.

Notes for next iteration

- issues/03-core-workflow-explicit-designation.md was moved to issues/done but issues/ remains untracked and intentionally excluded from this commit.

- Slice 4 should tighten workflow.as_agent() mapping for hidden emissions and streaming-only update payloads; Slice 5 should replace orchestration intermediate_outputs with participant-oriented designation.

* Tighten workflow-as-agent output mapping

Key decisions

- Treat AgentResponseUpdate as a streaming-only payload across the workflow.as_agent() adapter, so non-streaming agent runs now reject both terminal output and intermediate workflow events carrying updates.
- Keep streaming classification behavior explicit: terminal update payloads remain normal text content, while intermediate update payloads are rewritten to text_reasoning content.
- Add explicit-mode coverage proving hidden yield_output emissions do not appear in non-streaming AgentResponse messages or streaming AgentResponseUpdate chunks.

Files changed

- packages/core/agent_framework/_workflows/_agent.py
- packages/core/tests/workflow/test_workflow_agent_intermediate.py

Verification

- uv run pytest packages/core/tests/workflow/test_workflow_agent_intermediate.py -q
- uv run pytest packages/core/tests/workflow/test_workflow_agent.py packages/core/tests/workflow/test_workflow_agent_intermediate.py -q
- uv run pytest packages/core/tests/workflow packages/orchestrations/tests packages/devui/tests/devui/test_mapper.py -q
- uv run poe lint
- uv run poe typing fails only on the pre-existing packages/core/agent_framework/_workflows/_agent.py _AGENT_FORWARDED_EVENT_TYPES private-use pyright error.

Blockers or notes for next iteration

- issues/04-workflow-as-agent-output-mapping.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.
- Slice 5 should replace orchestration intermediate_outputs with participant-oriented designation.

* Add orchestration participant output designation

Key decisions

- Replace orchestration intermediate_outputs with participant-oriented output_participants and intermediate_participants across Sequential, Concurrent, GroupChat, Magentic, and Handoff builders.
- Keep synthetic final executors terminal by default for Concurrent, GroupChat, and Magentic; keep Sequential's final participant terminal by default; keep Handoff participants terminal by default.
- Centralize participant designation validation for empty explicit designation, duplicates, overlaps, and unknown participants, then map validated participants to workflow output/intermediate executors.

Files changed

- packages/orchestrations/agent_framework_orchestrations/_participant_designation.py
- packages/orchestrations/agent_framework_orchestrations/_sequential.py
- packages/orchestrations/agent_framework_orchestrations/_concurrent.py
- packages/orchestrations/agent_framework_orchestrations/_group_chat.py
- packages/orchestrations/agent_framework_orchestrations/_magentic.py
- packages/orchestrations/agent_framework_orchestrations/_handoff.py
- packages/orchestrations/tests/test_orchestration_intermediate_vs_terminal.py
- packages/orchestrations/tests/test_magentic.py

Blockers or notes for next iteration

- issues/05-orchestration-participant-designation.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.
- Slice 7 should migrate samples and docs away from intermediate_outputs to the new participant designation API.
- uv run poe typing still fails only on the pre-existing packages/core/agent_framework/_workflows/_agent.py _AGENT_FORWARDED_EVENT_TYPES private-use pyright error.

* Migrate samples to explicit output designation

Key decisions

- Replace sample usage of the removed orchestration intermediate_outputs boolean with participant-oriented intermediate_participants designation.
- Update raw workflow guidance to show output_executors together with intermediate_executors, and document that unlisted yields are hidden in explicit designation mode.
- Keep orchestration final outputs terminal while streaming designated participant responses as intermediate progress, including workflow.as_agent() samples where intermediates map to text_reasoning content.
- Refresh workflow and orchestration README guidance plus the changelog reference so public docs no longer point users at intermediate_outputs.

Files changed

- CHANGELOG.md
- packages/orchestrations/README.md
- samples/README.md
- samples/03-workflows/README.md
- samples/03-workflows/control-flow/intermediate_vs_terminal_outputs.py
- samples/03-workflows/orchestrations/README.md
- samples/03-workflows/orchestrations/group_chat_agent_manager.py
- samples/03-workflows/orchestrations/group_chat_philosophical_debate.py
- samples/03-workflows/orchestrations/group_chat_simple_selector.py
- samples/03-workflows/orchestrations/magentic.py
- samples/03-workflows/orchestrations/magentic_human_plan_review.py
- samples/03-workflows/orchestrations/sequential_chain_only_agent_responses.py
- samples/03-workflows/agents/group_chat_workflow_as_agent.py
- samples/03-workflows/agents/magentic_workflow_as_agent.py
- samples/03-workflows/agents/sequential_workflow_as_agent.py
- samples/semantic-kernel-migration/orchestrations/group_chat.py
- samples/semantic-kernel-migration/orchestrations/magentic.py

Blockers or notes for next iteration

- issues/07-samples-and-docs-explicit-output-designation.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.
- issues/06-devui-intermediate-event-rendering.md remains present and appears already satisfied by existing DevUI mapper/tests from the prior implementation slice.
- PRD-explicit-workflow-output-designation.md remains untracked and intentionally excluded from this commit.

* Render DevUI intermediate workflow outputs

Key decisions

- Preserve workflow output designation metadata on visible DevUI output messages and text deltas so intermediate/data emissions remain distinguishable from terminal output.
- Render intermediate workflow message items in the execution timeline using executor metadata, while excluding them from the final workflow result aggregation.
- Keep terminal output message rendering unchanged and retain legacy data events on the intermediate compatibility path.

Files changed

- packages/devui/agent_framework_devui/_mapper.py
- packages/devui/frontend/src/components/features/workflow/execution-timeline.tsx
- packages/devui/frontend/src/components/features/workflow/workflow-view.tsx
- packages/devui/frontend/src/types/openai.ts
- packages/devui/tests/devui/test_mapper.py

Blockers or notes for next iteration

- issues/06-devui-intermediate-event-rendering.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.
- PRD-explicit-workflow-output-designation.md remains untracked and intentionally excluded from this commit.
- uv run poe typing still fails only on the pre-existing packages/core/agent_framework/_workflows/_agent.py _AGENT_FORWARDED_EVENT_TYPES private-use pyright error.

* Fix mypy

* Clarify orchestration participant output config

* Rename participant output kwargs for clarity

output_participants -> final_output_from, intermediate_participants ->
intermediate_output_from. The old names read like categories of
participant; the new names make it clear the kwarg designates which
participants' outputs surface as final vs. intermediate events.

* Rename core workflow output kwargs with deprecation shim

Adds final_output_from / intermediate_output_from as canonical kwargs on
Workflow and WorkflowBuilder. Old output_executors / intermediate_executors
kwargs continue to work but emit DeprecationWarning via a shared coalesce
helper that also rejects supplying both. Wire-format keys in to_dict()
stay as output_executors / intermediate_executors so checkpoint
compatibility is preserved.

Internal call sites in orchestrations and samples updated to the new
names so users following sample code learn the canonical vocabulary;
legacy callers still work with a one-shot warning.

* Suppress pyright reportPrivateUsage on cross-module sentinel import

* Update docstrings

* Propagate sub-workflow intermediate outputs, fix handoff/sequential intermediate-only designation, and shore up tests, sample, and docstrings around the intermediate output contract.

* Add canonical workflow output_from selection

Key decisions:\n- Make output_from the canonical workflow-output allow-list and keep output_executors/final_output_from as deprecated compatibility aliases.\n- Treat empty output_from/intermediate_output_from lists as explicit selections and keep validation responsible for empty, duplicate, overlap, and unknown selections.\n- Remove the branch-only public intermediate_executors WorkflowBuilder kwarg while preserving legacy wire keys in to_dict().\n\nFiles changed:\n- packages/core/agent_framework/_workflows/_workflow.py\n- packages/core/agent_framework/_workflows/_workflow_builder.py\n- packages/core/agent_framework/_workflows/_workflow_context.py\n- packages/core/agent_framework/_workflows/_agent.py\n- packages/core/agent_framework/_workflows/_agent_executor.py\n- packages/core/tests/workflow/* output-selection coverage updates\n- packages/core/AGENTS.md\n- issues/done/001-canonical-list-based-output-selection.md\n\nBlockers/notes:\n- Orchestration builders still pass final_output_from internally; follow-up issue 004 should migrate them to output_from.\n- Legacy omitted-selection behavior and explicit all/all_other literals are left for issues 002 and 003.

* Add explicit all workflow output selection

Key decisions:
- Treat output_from='all' as an explicit workflow-output selection sentinel and expand it at build time to executors with declared workflow output types.
- Keep omitted output selections in legacy all-output mode with a deprecation warning that names output_from and intermediate_output_from and points to output_from='all'.
- Reject intermediate_output_from='all' at construction because the all-output literal is output-only for this issue.

Files changed:
- packages/core/agent_framework/_workflows/_workflow_builder.py
- packages/core/tests/workflow/test_output_executors_contract.py
- issues/done/002-explicit-all-output-and-legacy-migration.md

Blockers/notes:
- all_other intermediate-output selection remains for issue 003.
- Workflow-as-agent/orchestration parity remains for issue 004.

* Add all-other intermediate output selection

Key decisions:
- Treat intermediate_output_from='all_other' as an explicit intermediate-output selection sentinel and expand it at build time after the workflow graph is complete.
- Expand all_other to output-capable executors not selected by output_from; omitted or empty output_from selects no workflow outputs, while output_from='all' leaves an empty intermediate selection.
- Keep output_from='all_other' invalid so all_other remains intermediate-output-only and runtime classification still receives concrete executor-id sets.

Files changed:
- packages/core/agent_framework/_workflows/_workflow_builder.py
- packages/core/tests/workflow/test_output_executors_contract.py
- issues/done/003-all-other-intermediate-output-selection.md

Blockers/notes:
- Workflow-as-agent and orchestration parity remains for issue 004.
- Full documentation updates remain for issue 005.

* Add orchestration output selection parity

Key decisions:
- Expose output_from on sequential, concurrent, group chat, handoff, and magentic builders while keeping final_output_from as a deprecated compatibility alias.
- Resolve orchestration participant selections through the same explicit rules as workflows: output_from='all', intermediate_output_from='all_other', hidden unselected participant payloads, and overlap/duplicate/unknown/invalid-literal validation.
- Continue preserving documented orchestration defaults by always designating each pattern's terminal internal executor where applicable.

Files changed:
- packages/orchestrations/agent_framework_orchestrations/_participant_output_config.py
- packages/orchestrations/agent_framework_orchestrations/_sequential.py
- packages/orchestrations/agent_framework_orchestrations/_concurrent.py
- packages/orchestrations/agent_framework_orchestrations/_group_chat.py
- packages/orchestrations/agent_framework_orchestrations/_handoff.py
- packages/orchestrations/agent_framework_orchestrations/_magentic.py
- packages/orchestrations/agent_framework_orchestrations/_orchestration_request_info.py
- packages/orchestrations/tests/test_orchestration_intermediate_vs_terminal.py
- issues/done/004-workflow-as-agent-and-orchestration-parity.md

Blockers/notes:
- Full documentation and sample migration wording remains for issue 005.
- Existing tests that intentionally use final_output_from now emit the new deprecation warning.

* Document workflow output selection contract

Key decisions:
- Use Workflow Output and Intermediate Output as the developer-facing terms for selected caller-facing emissions.
- Document output_from and intermediate_output_from as the canonical API, with output_from as an allow-list and unselected payloads hidden unless explicitly selected as intermediate.
- Add scenario and invalid-selection tables for workflow and orchestration docs, including legacy omission warnings, output_from='all', intermediate_output_from='all_other', list selections, invalid literals, overlap, duplicates, unknown selections, and empty explicit selections.
- Migrate samples away from final_output_from and output_executors except where compatibility aliases are explicitly documented.

Files changed:
- packages/core/AGENTS.md
- packages/orchestrations/README.md
- packages/orchestrations/agent_framework_orchestrations/_handoff.py
- packages/orchestrations/agent_framework_orchestrations/_sequential.py
- samples/03-workflows/README.md
- samples/03-workflows/control-flow/intermediate_vs_terminal_outputs.py
- samples/03-workflows/human-in-the-loop/agents_with_approval_requests.py
- samples/03-workflows/orchestrations/README.md
- samples/04-hosting/foundry-hosted-agents/responses/05_workflows/main.py
- scripts/sample_validation/create_dynamic_workflow_executor.py
- issues/done/005-document-output-selection-contract.md

Blockers/notes:
- Direct full Ruff on scripts/sample_validation/create_dynamic_workflow_executor.py still reports pre-existing docstring/print/line-length issues outside this docs migration; syntax-focused checks for changed files pass.
- No remaining AFK issue files are present under issues/.

* Latest updates

* Typing fixes

* Cleanup
This commit is contained in:
Evan Mattson
2026-05-19 09:15:25 +09:00
committed by GitHub
Unverified
parent 3ebbdb01b4
commit 3bbc81554b
68 changed files with 3480 additions and 325 deletions
@@ -72,6 +72,17 @@ def _stringify_name(value: Any) -> str:
return value if isinstance(value, str) else str(value)
def _workflow_output_metadata(event_type: Any, executor_id: Any) -> dict[str, Any] | None:
"""Return metadata that preserves workflow yield designation on visible output."""
if event_type not in ("output", "intermediate", "data"):
return None
return {
"workflow_event_type": event_type,
"workflow_output_kind": "terminal" if event_type == "output" else "intermediate",
"executor_id": executor_id,
}
def _serialize_content_recursive(value: Any) -> Any:
"""Recursively serialize Agent Framework Content objects to JSON-compatible values.
@@ -200,15 +211,21 @@ class MessageMapper:
try:
from agent_framework import AgentResponse, AgentResponseUpdate, WorkflowEvent
# Handle WorkflowEvent with type='output' or 'data' wrapping AgentResponseUpdate
# This must be checked BEFORE generic WorkflowEvent check
# Note: AgentExecutor uses type='output' for streaming updates
if isinstance(raw_event, WorkflowEvent) and raw_event.type in ("output", "data"):
# Handle WorkflowEvent with type='output', 'intermediate', or 'data' wrapping
# AgentResponseUpdate. This must be checked BEFORE generic WorkflowEvent check.
# Note: AgentExecutor uses type='output' for streaming updates from designated
# executors and type='intermediate' from non-designated executors. type='data'
# is the deprecated legacy variant retained for backward compat.
if isinstance(raw_event, WorkflowEvent) and raw_event.type in ("output", "intermediate", "data"):
event_data = getattr(cast(Any, raw_event), "data", None)
if isinstance(event_data, AgentResponseUpdate):
# Preserve executor_id in context for proper output routing
context["current_executor_id"] = getattr(cast(Any, raw_event), "executor_id", None)
return await self._convert_agent_update(event_data, context)
context["current_workflow_event_type"] = raw_event.type
try:
return await self._convert_agent_update(event_data, context)
finally:
context.pop("current_workflow_event_type", None)
# Handle complete agent response (AgentResponse) - for non-streaming agent execution
if isinstance(raw_event, AgentResponse):
@@ -633,6 +650,13 @@ class MessageMapper:
# Check if we're in an executor context with an existing item
executor_id = context.get("current_executor_id")
executor_item_key = f"exec_item_{executor_id}" if executor_id else None
workflow_metadata = _workflow_output_metadata(context.get("current_workflow_event_type"), executor_id)
if has_text_content and workflow_metadata is not None:
current_metadata = context.get("current_message_workflow_metadata")
if current_metadata != workflow_metadata:
context.pop("current_message_id", None)
context["current_message_workflow_metadata"] = workflow_metadata
# If we have an executor item, use it for deltas instead of creating a message
if has_text_content and executor_item_key and executor_item_key in context:
@@ -644,6 +668,15 @@ class MessageMapper:
message_id = f"msg_{uuid4().hex[:8]}"
context["current_message_id"] = message_id
context["output_index"] = context.get("output_index", -1) + 1
message_item = ResponseOutputMessage(
type="message",
id=message_id,
role="assistant",
content=[],
status="in_progress",
)
if workflow_metadata is not None:
cast(Any, message_item).metadata = workflow_metadata
# Add message output item
events.append(
@@ -651,9 +684,7 @@ class MessageMapper:
type="response.output_item.added",
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
item=ResponseOutputMessage(
type="message", id=message_id, role="assistant", content=[], status="in_progress"
),
item=message_item,
)
)
@@ -675,17 +706,18 @@ class MessageMapper:
# Special handling for TextContent to use proper delta events
if content.type == "text" and "current_message_id" in context:
# Stream text content via proper delta events
events.append(
ResponseTextDeltaEvent(
type="response.output_text.delta",
output_index=context["output_index"],
content_index=context.get("content_index", 0),
item_id=context["current_message_id"],
delta=content.text,
logprobs=[], # We don't have logprobs from Agent Framework
sequence_number=self._next_sequence(context),
)
delta_event = ResponseTextDeltaEvent(
type="response.output_text.delta",
output_index=context["output_index"],
content_index=context.get("content_index", 0),
item_id=context["current_message_id"],
delta=content.text,
logprobs=[], # We don't have logprobs from Agent Framework
sequence_number=self._next_sequence(context),
)
if workflow_metadata is not None:
cast(Any, delta_event).metadata = workflow_metadata
events.append(delta_event)
elif content.type in self.content_mappers:
# Use existing mappers for other content types
mapped_events = await self.content_mappers[content.type](content, context)
@@ -899,10 +931,14 @@ class MessageMapper:
return events
# Handle output events separately to preserve output data
if event_type == "output":
# Handle yield events (output / intermediate / data) by extracting visible
# text from the payload. All three render as a visible message item so the
# gap that previously dropped intermediate yields into generic completed-
# trace events is closed.
if event_type in ("output", "intermediate", "data"):
output_data = getattr(event, "data", None)
executor_id = getattr(event, "executor_id", "unknown")
workflow_metadata = _workflow_output_metadata(event_type, executor_id)
if output_data is not None:
# Import required types
@@ -960,6 +996,8 @@ class MessageMapper:
content=[text_content],
status="completed",
)
if workflow_metadata is not None:
cast(Any, output_message).metadata = workflow_metadata
# Emit output_item.added for each yield_output
logger.debug(
@@ -96,6 +96,14 @@ function getStateBadgeClass(state: ExecutorState) {
}
}
function getMessageText(item: unknown): string {
const content = (item as { content?: Array<{ type: string; text?: string }> }).content;
return content
?.filter((content) => content.type === "output_text" && content.text)
.map((content) => content.text)
.join("\n") ?? "";
}
function ExecutorRunItem({
run,
isExpanded,
@@ -282,7 +290,12 @@ export function ExecutionTimeline({
});
} else if (item && item.type === "message" && "metadata" in item && item.id) {
// Handle message items from Magentic agents
const metadata = item.metadata as { agent_id?: string; source?: string } | undefined;
const metadata = item.metadata as {
agent_id?: string;
executor_id?: string;
source?: string;
workflow_output_kind?: string;
} | undefined;
if (metadata?.agent_id && metadata?.source === "magentic") {
const executorId = metadata.agent_id;
const itemId = item.id;
@@ -298,6 +311,21 @@ export function ExecutionTimeline({
timestamp: uiTimestamp,
runNumber,
});
} else if (metadata?.executor_id && metadata.workflow_output_kind === "intermediate") {
const executorId = metadata.executor_id;
const itemId = item.id;
const runNumber = (runCount.get(executorId) || 0) + 1;
runCount.set(executorId, runNumber);
runs.push({
executorId,
executorName: truncateText(executorId, 35),
itemId,
state: item.status === "completed" ? "completed" : "running",
output: itemOutputs[itemId] || getMessageText(item),
timestamp: uiTimestamp,
runNumber,
});
}
}
}
@@ -327,7 +355,12 @@ export function ExecutionTimeline({
}
} else if (item && item.type === "message" && "metadata" in item && item.id) {
// Handle message completion from Magentic agents
const metadata = item.metadata as { agent_id?: string; source?: string } | undefined;
const metadata = item.metadata as {
agent_id?: string;
executor_id?: string;
source?: string;
workflow_output_kind?: string;
} | undefined;
if (metadata?.agent_id && metadata?.source === "magentic") {
const itemId = item.id;
const existingRun = runs.find((r) => r.itemId === itemId);
@@ -336,6 +369,14 @@ export function ExecutionTimeline({
existingRun.state = item.status === "completed" ? "completed" : "failed";
existingRun.output = itemOutputs[itemId] || "";
}
} else if (metadata?.executor_id && metadata.workflow_output_kind === "intermediate") {
const itemId = item.id;
const existingRun = runs.find((r) => r.itemId === itemId);
if (existingRun) {
existingRun.state = item.status === "completed" ? "completed" : "failed";
existingRun.output = itemOutputs[itemId] || getMessageText(item);
}
}
}
}
@@ -663,6 +663,7 @@ export function WorkflowView({
item &&
item.type === "message" &&
(!("metadata" in item) || !(item.metadata as { source?: string } | undefined)?.source) &&
(item.metadata as { workflow_output_kind?: string } | undefined)?.workflow_output_kind !== "intermediate" &&
"content" in item &&
Array.isArray(item.content)
) {
@@ -1121,27 +1122,30 @@ export function WorkflowView({
// Handle workflow output messages
if (item && item.type === "message" && "content" in item && Array.isArray(item.content)) {
// Extract text from message content
for (const content of item.content as Array<{ type: string; text?: string }>) {
if (content.type === "output_text" && content.text) {
const text = content.text; // Capture for closure
// Append to workflow result (support multiple yield_output calls)
setWorkflowResult((prev) => {
if (prev && prev.length > 0) {
// If there's existing output, add separator
return prev + "\n\n" + text;
}
return text;
});
const metadata = item.metadata as { workflow_output_kind?: string } | undefined;
if (metadata?.workflow_output_kind !== "intermediate") {
// Extract text from message content
for (const content of item.content as Array<{ type: string; text?: string }>) {
if (content.type === "output_text" && content.text) {
const text = content.text; // Capture for closure
// Append to workflow result (support multiple yield_output calls)
setWorkflowResult((prev) => {
if (prev && prev.length > 0) {
// If there's existing output, add separator
return prev + "\n\n" + text;
}
return text;
});
// Try to parse as JSON for structured metadata
try {
const parsed = JSON.parse(text);
if (typeof parsed === "object" && parsed !== null) {
workflowMetadata.current = parsed;
// Try to parse as JSON for structured metadata
try {
const parsed = JSON.parse(text);
if (typeof parsed === "object" && parsed !== null) {
workflowMetadata.current = parsed;
}
} catch {
// Not JSON, keep as text
}
} catch {
// Not JSON, keep as text
}
}
}
@@ -376,6 +376,7 @@ export interface ResponseTextDeltaEvent extends ResponseStreamEvent {
content_index: number;
sequence_number: number;
logprobs: Record<string, unknown>[];
metadata?: Record<string, unknown>;
}
// OpenAI Response for non-streaming
@@ -397,6 +398,7 @@ export interface ResponseOutputMessage {
content: ResponseOutputText[];
id: string;
status: "completed" | "failed" | "in_progress";
metadata?: Record<string, unknown>;
}
export interface ResponseOutputText {
@@ -517,7 +517,8 @@ async def test_magentic_executor_event_with_agent_delta_metadata(
"""Test that WorkflowEvent[AgentResponseUpdate] with magentic_event_type='agent_delta' is handled correctly.
This tests the ACTUAL event format Magentic emits - not a fake MagenticAgentDeltaEvent class.
Magentic uses WorkflowEvent.emit() with additional_properties containing magentic_event_type.
Magentic emits type='intermediate' WorkflowEvent instances with additional_properties
containing magentic_event_type.
"""
from agent_framework._types import AgentResponseUpdate
from agent_framework._workflows._events import WorkflowEvent
@@ -532,7 +533,7 @@ async def test_magentic_executor_event_with_agent_delta_metadata(
"agent_id": "writer_agent",
},
)
event = WorkflowEvent.emit(executor_id="magentic_executor", data=update)
event = WorkflowEvent("intermediate", executor_id="magentic_executor", data=update)
events = await mapper.convert_event(event, test_request)
@@ -547,8 +548,8 @@ async def test_magentic_executor_event_with_agent_delta_metadata(
async def test_magentic_orchestrator_message_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
"""Test that WorkflowEvent[AgentResponseUpdate] with magentic_event_type='orchestrator_message' is handled.
Magentic emits orchestrator planning/instruction messages using WorkflowEvent.emit()
with additional_properties containing magentic_event_type='orchestrator_message'.
Magentic emits orchestrator planning/instruction messages using type='intermediate'
WorkflowEvent instances with additional_properties containing magentic_event_type='orchestrator_message'.
"""
from agent_framework._types import AgentResponseUpdate
from agent_framework._workflows._events import WorkflowEvent
@@ -564,7 +565,7 @@ async def test_magentic_orchestrator_message_event(mapper: MessageMapper, test_r
"orchestrator_id": "magentic_orchestrator",
},
)
event = WorkflowEvent.emit(executor_id="magentic_orchestrator", data=update)
event = WorkflowEvent("intermediate", executor_id="magentic_orchestrator", data=update)
events = await mapper.convert_event(event, test_request)
@@ -595,7 +596,7 @@ async def test_magentic_events_use_same_event_class_as_other_workflows(
contents=[Content.from_text(text="Regular workflow response")],
role="assistant",
)
regular_event = WorkflowEvent.emit(executor_id="regular_executor", data=regular_update)
regular_event = WorkflowEvent("intermediate", executor_id="regular_executor", data=regular_update)
# 2. Magentic workflow (with additional_properties)
magentic_update = AgentResponseUpdate(
@@ -603,7 +604,7 @@ async def test_magentic_events_use_same_event_class_as_other_workflows(
role="assistant",
additional_properties={"magentic_event_type": "agent_delta"},
)
magentic_event = WorkflowEvent.emit(executor_id="magentic_executor", data=magentic_update)
magentic_event = WorkflowEvent("intermediate", executor_id="magentic_executor", data=magentic_update)
# Both should be the SAME class
assert type(regular_event) is type(magentic_event)
@@ -653,7 +654,7 @@ async def test_workflow_output_event(mapper: MessageMapper, test_request: AgentF
"""Test output event (type='output') is converted to output_item.added."""
from agent_framework._workflows._events import WorkflowEvent
event = WorkflowEvent.output(executor_id="final_executor", data="Final workflow output")
event = WorkflowEvent("output", executor_id="final_executor", data="Final workflow output")
events = await mapper.convert_event(event, test_request)
# output event (type='output') should emit output_item.added
@@ -662,6 +663,9 @@ async def test_workflow_output_event(mapper: MessageMapper, test_request: AgentF
# Check item contains the output text
item = events[0].item
assert item.type == "message"
assert item.metadata["workflow_event_type"] == "output"
assert item.metadata["workflow_output_kind"] == "terminal"
assert item.metadata["executor_id"] == "final_executor"
assert any("Final workflow output" in str(c) for c in item.content)
@@ -675,13 +679,104 @@ async def test_workflow_output_event_with_list_data(mapper: MessageMapper, test_
Message(role="user", contents=[Content.from_text(text="Hello")]),
Message(role="assistant", contents=[Content.from_text(text="World")]),
]
event = WorkflowEvent.output(executor_id="complete", data=messages)
event = WorkflowEvent("output", executor_id="complete", data=messages)
events = await mapper.convert_event(event, test_request)
assert len(events) == 1
assert events[0].type == "response.output_item.added"
async def test_workflow_intermediate_event_with_agent_response_update_dispatched(
mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
"""A WorkflowEvent with type='intermediate' wrapping an AgentResponseUpdate is mapped
just like type='output' / type='data' — to OpenAI text-delta events."""
from agent_framework._workflows._events import WorkflowEvent
update = AgentResponseUpdate(
contents=[Content.from_text(text="intermediate progress")],
role="assistant",
author_name="non-designated-agent",
)
event = WorkflowEvent("intermediate", executor_id="non_designated", data=update)
events = await mapper.convert_event(event, test_request)
assert len(events) >= 1
added_events = [e for e in events if getattr(e, "type", "") == "response.output_item.added"]
assert added_events
item = added_events[0].item
assert item.metadata["workflow_event_type"] == "intermediate"
assert item.metadata["workflow_output_kind"] == "intermediate"
assert item.metadata["executor_id"] == "non_designated"
text_events = [e for e in events if getattr(e, "type", "") == "response.output_text.delta"]
assert len(text_events) >= 1
assert text_events[0].metadata["workflow_event_type"] == "intermediate"
assert text_events[0].metadata["workflow_output_kind"] == "intermediate"
assert text_events[0].metadata["executor_id"] == "non_designated"
assert text_events[0].delta == "intermediate progress"
async def test_workflow_intermediate_event_with_string_payload_renders_visible_text(
mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
"""A WorkflowEvent with type='intermediate' wrapping a plain string surfaces as a
visible output item — not a generic completed-trace event. Without this, executors
that ``await ctx.yield_output("plan: …")`` from non-designated nodes are silently
dropped in DevUI."""
from agent_framework._workflows._events import WorkflowEvent
event = WorkflowEvent("intermediate", executor_id="planner", data="plan: starting work")
events = await mapper.convert_event(event, test_request)
assert len(events) == 1
assert events[0].type == "response.output_item.added"
item = events[0].item
assert item.type == "message"
assert item.metadata["workflow_event_type"] == "intermediate"
assert item.metadata["workflow_output_kind"] == "intermediate"
assert item.metadata["executor_id"] == "planner"
assert any("plan: starting work" in str(c) for c in item.content)
async def test_workflow_intermediate_event_with_message_payload_renders_visible_text(
mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
"""type='intermediate' wrapping a Message surfaces visibly — same path as type='output'."""
from agent_framework import Message
from agent_framework._workflows._events import WorkflowEvent
msg = Message(role="assistant", contents=[Content.from_text(text="research note")])
event = WorkflowEvent("intermediate", executor_id="researcher", data=msg)
events = await mapper.convert_event(event, test_request)
assert len(events) == 1
assert events[0].type == "response.output_item.added"
item = events[0].item
assert item.metadata["workflow_event_type"] == "intermediate"
assert item.metadata["workflow_output_kind"] == "intermediate"
assert item.metadata["executor_id"] == "researcher"
assert any("research note" in str(c) for c in item.content)
async def test_workflow_data_event_keeps_intermediate_compatibility_metadata(
mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
"""Deprecated type='data' workflow events remain visible and explicitly intermediate."""
from agent_framework._workflows._events import WorkflowEvent
with pytest.warns(DeprecationWarning):
event = WorkflowEvent.emit(executor_id="legacy", data="legacy progress")
events = await mapper.convert_event(event, test_request)
assert len(events) == 1
assert events[0].type == "response.output_item.added"
item = events[0].item
assert item.metadata["workflow_event_type"] == "data"
assert item.metadata["workflow_output_kind"] == "intermediate"
assert item.metadata["executor_id"] == "legacy"
assert any("legacy progress" in str(c) for c in item.content)
# =============================================================================
# failed event (type='failed') Tests
# =============================================================================