Evan Mattson 866a325b48 Python: [BREAKING] Standardize orchestration terminal outputs as AgentResponse (#5301)
* Fix orchestration outputs so that as_agent() returns only the final answer, and align the other orchestration outputs.

* Fix orchestration output issues from review comments

1. Sample cleanup: Remove commented-out FoundryChatClient block and update
   prerequisites to reference OPENAI_CHAT_MODEL_ID instead of FOUNDRY_* vars.

2. Sequential approval output: Change _EndWithConversation.end_with_agent_executor_response
   from a no-op sink to yield response.agent_response. When the last participant is
   AgentApprovalExecutor (via with_request_info), _EndWithConversation is the output
   executor so the yield produces the terminal answer. When the last participant is a
   regular AgentExecutor, _EndWithConversation is not in output_executors so the yield
   is silently filtered out.
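The "always yield, let the output filter decide" behaviour in item 2 can be pictured with a small model. This is a minimal sketch under stated assumptions: `SketchOutputContext` and `end_with_agent_executor_response` here are hypothetical stand-ins, not the agent_framework `WorkflowContext` or the real `_EndWithConversation` method. It only shows that a yield from an executor outside the registered output executors is dropped silently.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchOutputContext:
    """Hypothetical stand-in for a workflow context; NOT the agent_framework WorkflowContext."""

    executor_id: str
    output_executors: frozenset[str]
    outputs: list[Any] = field(default_factory=list)

    def yield_output(self, value: Any) -> None:
        # Only registered output executors surface outputs; anything else is dropped silently.
        if self.executor_id in self.output_executors:
            self.outputs.append(value)


def end_with_agent_executor_response(ctx: SketchOutputContext, agent_response: str) -> None:
    # Hypothetical mirror of the described change: always yield, rely on the filter above.
    ctx.yield_output(agent_response)
```

With an approval wrapper on the last participant the terminal executor is registered as an output executor and the yield surfaces; with a plain last agent it is filtered out.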

3. Forward data events through WorkflowExecutor: _process_workflow_result now also
   forwards 'data' events from sub-workflows so that emit_intermediate_data=True on
   AgentExecutor works correctly when wrapped in AgentApprovalExecutor.

4. Concurrent docstring: Update _AggregateAgentConversations docstring to say
   'deterministic participant order' instead of 'completion order'.

5. Add test_concurrent_intermediate_outputs_emits_data_events verifying that
   ConcurrentBuilder(intermediate_outputs=True) emits per-participant data events
   alongside the single aggregated output event.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add tests for sequential workflow with_request_info and intermediate_outputs (#5301)

Address PR review comments 2, 3, and 5:

- Add test_sequential_request_info_last_participant_emits_output:
  Verifies that when the last participant is wrapped via with_request_info()
  (AgentApprovalExecutor), the workflow still emits a terminal output after
  approval, exercising the _EndWithConversation.end_with_agent_executor_response
  fallback path.

- Add test_sequential_request_info_with_intermediate_outputs_emits_data_events:
  Verifies that emit_intermediate_data=True works correctly through
  AgentApprovalExecutor wrapping. WorkflowExecutor._process_result already
  forwards data events from sub-workflows, so intermediate agent responses
  surface as data events in the parent workflow.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright type errors from AgentResponse output refactor (#5301)

Update cast() calls in _group_chat.py and _magentic.py to use
WorkflowContext[Never, AgentResponse] instead of the old
WorkflowContext[Never, list[Message]], matching the updated method
signatures in _base_group_chat_orchestrator.py.

Fix _sequential.py _EndWithConversation.end_with_agent_executor_response
to declare WorkflowContext[Any, AgentResponse] so yield_output accepts
AgentResponse[None].

Fix _workflow_executor.py data event forwarding to handle nullable
executor_id.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright reportUnknownVariableType in _agent.py (#5301)

Extract event.data into a typed local variable before the isinstance
check to avoid pyright narrowing it to AgentResponse[Unknown].

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright reportMissingImports for orjson in file history samples (#5301)

Add pyright: ignore[reportMissingImports] to orjson imports that are
already guarded by try/except ImportError, matching the existing pattern
used elsewhere in the samples.
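A guarded optional import of the kind described might look like this. The `dumps` wrapper is hypothetical and not taken from the samples. The pyright comment silences only the missing-import diagnostic; the try/except is what keeps the module importable at runtime when orjson is absent.

```python
import json
from typing import Any

try:
    import orjson  # pyright: ignore[reportMissingImports]
except ImportError:  # optional dependency is not installed
    orjson = None


def dumps(obj: Any) -> str:
    """Serialize with orjson when it is installed, otherwise fall back to the stdlib json module."""
    if orjson is not None:
        # orjson.dumps returns bytes, so decode to keep the return type consistent.
        return orjson.dumps(obj).decode()
    return json.dumps(obj)
```

The two branches may differ in whitespace (orjson emits compact JSON), so callers should compare parsed values rather than raw strings.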

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review feedback for #5301: review comment fixes

* Address review feedback for #5301: review comment fixes

* Revert sequential_workflow_as_agent sample to FoundryChatClient

Reverts the mistaken switch from FoundryChatClient to OpenAIChatClient
in the sequential workflow as agent sample.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address ultrareview feedback: emit_data_events rename + WorkflowAgent reasoning conversion

Layered on top of the prior review-feedback work in this branch.

Renames:
- AgentExecutor.emit_intermediate_data -> emit_data_events (mechanical
  rename; orchestration semantics live at the orchestration layer, not
  the general-purpose executor). Forwarded through MagenticAgentExecutor,
  AgentApprovalExecutor, and all orchestration call sites.
- HandoffAgentExecutor._check_terminate_and_yield -> _should_terminate
  (pure predicate; no longer yields anything). HandoffBuilder docstring
  rewritten to describe the new per-agent AgentResponse output contract.

WorkflowAgent reasoning-content conversion:
- Add _rewrite_text_to_reasoning(contents) and _msg_as_reasoning(msg)
  helpers; the as_agent() path now reframes text content from data events
  as text_reasoning Content blocks before merging into the AgentResponse.
- Consumers iterate msg.contents and branch on content.type, the same path
  they already use for Claude thinking and OpenAI reasoning. No new field is
  added to Message, AgentResponse, or WorkflowEvent.
- Streaming branch constructs fresh AgentResponseUpdate instances instead
  of mutating shared payloads (regression test added).
- Helper _msg_maybe_reasoning consolidates the conditional rewrite at
  three call sites in the non-streaming conversion.
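The text-to-reasoning reframing and the mutation-safety point above can be sketched with minimal stand-in types. `Content`, `Message`, `rewrite_text_to_reasoning`, and `msg_as_reasoning` below are hypothetical re-implementations named after the helpers in the commit; they are not the agent_framework classes or the private helpers themselves. Non-text blocks and blocks that are already `text_reasoning` pass through unchanged, and a fresh message is built instead of mutating the shared one.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class Content:
    """Hypothetical content block; only the fields needed for the illustration."""

    type: str
    text: str = ""


@dataclass
class Message:
    """Hypothetical message holding an ordered list of content blocks."""

    role: str
    contents: list[Content] = field(default_factory=list)


def rewrite_text_to_reasoning(contents: list[Content]) -> list[Content]:
    # Retype plain "text" blocks; everything else (including existing reasoning) is kept as-is.
    return [replace(c, type="text_reasoning") if c.type == "text" else c for c in contents]


def msg_as_reasoning(msg: Message) -> Message:
    # Build a new Message so the original (possibly shared) payload is never mutated.
    return Message(role=msg.role, contents=rewrite_text_to_reasoning(msg.contents))
```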

Tests:
- TestWorkflowAgentReasoningHelpers + TestWorkflowAgentDataEventReasoningConversion
  add 9 new tests covering helpers, non-streaming, streaming, mixed content,
  already-reasoning passthrough, and mutation-safety regression.
- Updated test_sequential_as_agent_with_intermediate_outputs_includes_chain
  to assert text_reasoning content for intermediate agents.

* Fix pyright: widen event.data to Any to avoid partial-unknown narrowing

The streaming conversion path narrowed event.data via isinstance against
generic AgentResponse, producing AgentResponse[Unknown] and tripping
reportUnknownVariableType/reportUnknownMemberType. Binding data: Any
before the check keeps runtime behavior identical while restoring a fully
known type for downstream access.

* Clean up design

* Scope to agent output semantics only

* yield AgentResponseUpdate streaming, AgentResponse non-streaming

* Fix mypy/pyright: widen cast types at GroupChat callsites

Eight callsites in _group_chat.py still cast to WorkflowContext[Never,
AgentResponse] but the base orchestrator methods now accept the wider
WorkflowContext[Never, AgentResponse | AgentResponseUpdate] (mode-aware
yields). W_OutT is invariant, so the narrower cast is not assignable.
Magentic was widened in the same commit; this catches the GroupChat
callsites that were missed.

* Python: skip flaky Foundry / Foundry Hosting integration tests (#5553)

These two integration tests have been failing in the merge queue across
multiple unrelated PRs (5301, 5531). Both are marked `@pytest.mark.flaky`
with 3 retries, but all attempts fail back-to-back. Skipping both with a
reason pointing to #5553 so they can be fixed properly without continuing
to block unrelated merges.

- packages/foundry_hosting/tests/test_responses_int.py::TestOptions::test_temperature_and_max_tokens
- packages/foundry/tests/foundry/test_foundry_embedding_client.py::TestFoundryEmbeddingIntegration::test_text_embedding_live

Also includes a one-line uv.lock specifier-ordering normalization
auto-applied by the poe-check pre-commit hook.
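A skip with a tracking reason, as described above, uses pytest's built-in `skip` marker. The test name and body below are hypothetical placeholders, not the actual Foundry integration tests.

```python
import pytest

ISSUE = "#5553"


@pytest.mark.skip(reason=f"Fails in the merge queue on every retry; tracked in {ISSUE}")
def test_live_integration_example() -> None:
    # Hypothetical placeholder; the real tests call live Foundry endpoints.
    raise AssertionError("never runs while skipped")
```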

---------

Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-29 00:35:36 +00:00


# Copyright (c) Microsoft. All rights reserved.

from collections.abc import AsyncIterable, Awaitable
from typing import Any, Literal, overload

import pytest
from agent_framework import (
    AgentExecutorResponse,
    AgentResponse,
    AgentResponseUpdate,
    AgentRunInputs,
    AgentSession,
    BaseAgent,
    Content,
    Executor,
    Message,
    ResponseStream,
    TypeCompatibilityError,
    WorkflowContext,
    WorkflowRunState,
    handler,
)
from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage
from agent_framework.orchestrations import SequentialBuilder
from typing_extensions import Never


class _EchoAgent(BaseAgent):
    """Simple agent that appends a single assistant message with its name."""

    @overload
    def run(
        self,
        messages: AgentRunInputs | None = ...,
        *,
        stream: Literal[False] = ...,
        session: AgentSession | None = ...,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse[Any]]: ...

    @overload
    def run(
        self,
        messages: AgentRunInputs | None = ...,
        *,
        stream: Literal[True],
        session: AgentSession | None = ...,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...

    def run(
        self,
        messages: AgentRunInputs | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
        if stream:

            async def _stream() -> AsyncIterable[AgentResponseUpdate]:
                yield AgentResponseUpdate(contents=[Content.from_text(text=f"{self.name} reply")])

            return ResponseStream(_stream(), finalizer=AgentResponse.from_updates)

        async def _run() -> AgentResponse:
            return AgentResponse(messages=[Message("assistant", [f"{self.name} reply"])])

        return _run()


class _SummarizerTerminator(Executor):
    """Custom-executor terminator that yields a synthesized summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse],
    ) -> None:
        conversation = agent_response.full_conversation or []
        user_texts = [m.text for m in conversation if m.role == "user"]
        agents = [m.author_name or m.role for m in conversation if m.role == "assistant"]
        summary = Message("assistant", [f"Summary of users:{len(user_texts)} agents:{len(agents)}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))


class _InvalidExecutor(Executor):
    """Invalid executor that does not have a handler that accepts a list of chat messages."""

    @handler
    async def summarize(self, conversation: list[str], ctx: WorkflowContext[list[Message]]) -> None:
        pass


def test_sequential_builder_rejects_empty_participants() -> None:
    with pytest.raises(ValueError):
        SequentialBuilder(participants=[])


def test_sequential_builder_validation_rejects_invalid_executor() -> None:
    """Test that adding an invalid executor to the builder raises an error."""
    with pytest.raises(TypeCompatibilityError):
        SequentialBuilder(participants=[_EchoAgent(id="agent1", name="A1"), _InvalidExecutor(id="invalid")]).build()


async def test_sequential_streaming_yields_only_last_agent_updates() -> None:
    """Streaming mode surfaces only the last agent's AgentResponseUpdate chunks as outputs.

    Intermediate agents do NOT emit `output` events; only the last agent (the workflow's
    output_executor) emits chunks of the final answer.
    """
    a1 = _EchoAgent(id="agent1", name="A1")
    a2 = _EchoAgent(id="agent2", name="A2")

    wf = SequentialBuilder(participants=[a1, a2]).build()

    completed = False
    update_events: list[AgentResponseUpdate] = []
    async for ev in wf.run("hello sequential", stream=True):
        if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
            completed = True
        elif ev.type == "output":
            update_events.append(ev.data)  # type: ignore[arg-type]
        if completed:
            break

    assert completed
    # Only the last agent's streaming chunks surface as `output` events.
    assert update_events, "Expected at least one streaming update from the last agent"
    for upd in update_events:
        assert isinstance(upd, AgentResponseUpdate)
    combined_text = "".join(u.text for u in update_events if hasattr(u, "text"))
    assert "A2 reply" in combined_text
    assert "A1 reply" not in combined_text


async def test_sequential_non_streaming_yields_only_last_agent_response() -> None:
    """Non-streaming mode emits a single `output` event with the last agent's AgentResponse."""
    a1 = _EchoAgent(id="agent1", name="A1")
    a2 = _EchoAgent(id="agent2", name="A2")

    wf = SequentialBuilder(participants=[a1, a2]).build()

    output_events = [ev for ev in await wf.run("hello sequential") if ev.type == "output"]
    assert len(output_events) == 1
    response = output_events[0].data
    assert isinstance(response, AgentResponse)
    assert all(m.role == "assistant" for m in response.messages)
    combined = " ".join(m.text for m in response.messages)
    assert "A2 reply" in combined
    assert "A1 reply" not in combined


async def test_sequential_as_agent_returns_only_last_agent_response() -> None:
    """`workflow.as_agent().run(prompt)` returns ONLY the last agent's messages — not the user
    input or earlier agents' replies. This is the core fix for the orchestration-as-agent
    output contract."""
    a1 = _EchoAgent(id="agent1", name="A1")
    a2 = _EchoAgent(id="agent2", name="A2")

    agent = SequentialBuilder(participants=[a1, a2]).build().as_agent()
    response = await agent.run("hello as_agent")

    assert isinstance(response, AgentResponse)
    # Only the last agent's reply — no user prompt, no agent1 messages.
    combined = " ".join(m.text for m in response.messages)
    assert "A2 reply" in combined
    assert "A1 reply" not in combined
    assert "hello as_agent" not in combined


async def test_sequential_with_custom_executor_summary() -> None:
    """A custom-executor terminator yields its own AgentResponse — that becomes the workflow output.

    Custom executors used as the terminator must call `ctx.yield_output(AgentResponse(...))`
    directly (rather than `ctx.send_message(list[Message])` like an intermediate executor would),
    because the terminator IS the workflow's output executor.
    """
    a1 = _EchoAgent(id="agent1", name="A1")
    summarizer = _SummarizerTerminator(id="summarizer")

    wf = SequentialBuilder(participants=[a1, summarizer]).build()

    output_events = [ev for ev in await wf.run("topic X") if ev.type == "output"]
    assert len(output_events) == 1
    response = output_events[0].data
    assert isinstance(response, AgentResponse)
    assert len(response.messages) == 1
    assert response.messages[0].role == "assistant"
    assert response.messages[0].text.startswith("Summary of users:")


async def test_sequential_checkpoint_resume_round_trip() -> None:
    storage = InMemoryCheckpointStorage()

    initial_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
    wf = SequentialBuilder(participants=list(initial_agents), checkpoint_storage=storage).build()

    baseline_updates: list[AgentResponseUpdate] = []
    async for ev in wf.run("checkpoint sequential", stream=True):
        if ev.type == "output":
            baseline_updates.append(ev.data)  # type: ignore[arg-type]
        if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
            break

    assert baseline_updates

    checkpoints = await storage.list_checkpoints(workflow_name=wf.name)
    assert checkpoints
    checkpoints.sort(key=lambda cp: cp.timestamp)

    resume_checkpoint = checkpoints[0]

    resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
    wf_resume = SequentialBuilder(participants=list(resumed_agents), checkpoint_storage=storage).build()

    resumed_updates: list[AgentResponseUpdate] = []
    async for ev in wf_resume.run(checkpoint_id=resume_checkpoint.checkpoint_id, stream=True):
        if ev.type == "output":
            resumed_updates.append(ev.data)  # type: ignore[arg-type]
        if ev.type == "status" and ev.state in (
            WorkflowRunState.IDLE,
            WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
        ):
            break

    assert resumed_updates
    baseline_text = "".join(u.text for u in baseline_updates if hasattr(u, "text"))
    resumed_text = "".join(u.text for u in resumed_updates if hasattr(u, "text"))
    assert baseline_text == resumed_text


async def test_sequential_checkpoint_runtime_only() -> None:
    """Test checkpointing configured ONLY at runtime, not at build time."""
    storage = InMemoryCheckpointStorage()

    agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
    wf = SequentialBuilder(participants=list(agents)).build()

    baseline_updates: list[AgentResponseUpdate] = []
    async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True):
        if ev.type == "output":
            baseline_updates.append(ev.data)  # type: ignore[arg-type]
        if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
            break

    assert baseline_updates

    checkpoints = await storage.list_checkpoints(workflow_name=wf.name)
    assert checkpoints
    checkpoints.sort(key=lambda cp: cp.timestamp)

    resume_checkpoint = checkpoints[0]

    resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
    wf_resume = SequentialBuilder(participants=list(resumed_agents)).build()

    resumed_updates: list[AgentResponseUpdate] = []
    async for ev in wf_resume.run(
        checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage, stream=True
    ):
        if ev.type == "output":
            resumed_updates.append(ev.data)  # type: ignore[arg-type]
        if ev.type == "status" and ev.state in (
            WorkflowRunState.IDLE,
            WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
        ):
            break

    assert resumed_updates
    baseline_text = "".join(u.text for u in baseline_updates if hasattr(u, "text"))
    resumed_text = "".join(u.text for u in resumed_updates if hasattr(u, "text"))
    assert baseline_text == resumed_text


async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None:
    """Test that runtime checkpoint storage overrides build-time configuration."""
    import tempfile

    with tempfile.TemporaryDirectory() as temp_dir1, tempfile.TemporaryDirectory() as temp_dir2:
        from agent_framework._workflows._checkpoint import FileCheckpointStorage

        buildtime_storage = FileCheckpointStorage(temp_dir1)
        runtime_storage = FileCheckpointStorage(temp_dir2)

        agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
        wf = SequentialBuilder(participants=list(agents), checkpoint_storage=buildtime_storage).build()

        # In streaming mode each `output` event carries an AgentResponseUpdate chunk.
        baseline_output: AgentResponseUpdate | None = None
        async for ev in wf.run("override test", checkpoint_storage=runtime_storage, stream=True):
            if ev.type == "output":
                baseline_output = ev.data  # type: ignore[assignment]
            if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
                break

        assert baseline_output is not None

        buildtime_checkpoints = await buildtime_storage.list_checkpoints(workflow_name=wf.name)
        runtime_checkpoints = await runtime_storage.list_checkpoints(workflow_name=wf.name)

        assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints"
        assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden"


async def test_sequential_builder_reusable_after_build_with_participants() -> None:
    """Test that the builder can be reused to build multiple identical workflows with participants()."""
    a1 = _EchoAgent(id="agent1", name="A1")
    a2 = _EchoAgent(id="agent2", name="A2")

    builder = SequentialBuilder(participants=[a1, a2])

    # Build first workflow
    builder.build()

    assert builder._participants[0] is a1  # type: ignore
    assert builder._participants[1] is a2  # type: ignore


# ---------------------------------------------------------------------------
# chain_only_agent_responses tests
# ---------------------------------------------------------------------------


class _CapturingAgent(BaseAgent):
    """Agent that records the messages it received and returns a configurable reply."""

    def __init__(self, *, reply_text: str = "reply", **kwargs: Any):
        super().__init__(**kwargs)
        self.reply_text = reply_text
        self.last_messages: list[Message] = []

    @overload
    def run(
        self,
        messages: AgentRunInputs | None = ...,
        *,
        stream: Literal[False] = ...,
        session: AgentSession | None = ...,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse[Any]]: ...

    @overload
    def run(
        self,
        messages: AgentRunInputs | None = ...,
        *,
        stream: Literal[True],
        session: AgentSession | None = ...,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...

    def run(
        self,
        messages: AgentRunInputs | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
        captured: list[Message] = []
        if messages:
            for m in messages:  # type: ignore[union-attr]
                if isinstance(m, Message):
                    captured.append(m)
                elif isinstance(m, str):
                    captured.append(Message("user", [m]))
        self.last_messages = captured

        if stream:

            async def _stream() -> AsyncIterable[AgentResponseUpdate]:
                yield AgentResponseUpdate(contents=[Content.from_text(text=self.reply_text)])

            return ResponseStream(_stream(), finalizer=AgentResponse.from_updates)

        async def _run() -> AgentResponse:
            return AgentResponse(messages=[Message("assistant", [self.reply_text])])

        return _run()


async def test_chain_only_agent_responses_false_passes_full_conversation() -> None:
    """Default (chain_only_agent_responses=False) passes full conversation to the second agent."""
    a1 = _CapturingAgent(id="agent1", name="A1", reply_text="A1 reply")
    a2 = _CapturingAgent(id="agent2", name="A2", reply_text="A2 reply")

    wf = SequentialBuilder(participants=[a1, a2], chain_only_agent_responses=False).build()

    async for ev in wf.run("hello", stream=True):
        if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
            break

    # Second agent should see full conversation: [user("hello"), assistant("A1 reply")]
    seen = a2.last_messages
    assert len(seen) == 2
    assert seen[0].role == "user" and "hello" in (seen[0].text or "")
    assert seen[1].role == "assistant" and "A1 reply" in (seen[1].text or "")


async def test_chain_only_agent_responses_true_passes_only_agent_messages() -> None:
    """chain_only_agent_responses=True passes only the previous agent's response messages."""
    a1 = _CapturingAgent(id="agent1", name="A1", reply_text="A1 reply")
    a2 = _CapturingAgent(id="agent2", name="A2", reply_text="A2 reply")

    wf = SequentialBuilder(participants=[a1, a2], chain_only_agent_responses=True).build()

    async for ev in wf.run("hello", stream=True):
        if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
            break

    # Second agent should see only the assistant message: [assistant("A1 reply")]
    seen = a2.last_messages
    assert len(seen) == 1
    assert seen[0].role == "assistant" and "A1 reply" in (seen[0].text or "")


async def test_chain_only_agent_responses_three_agents() -> None:
    """chain_only_agent_responses=True with three agents: each sees only the prior agent's reply."""
    a1 = _CapturingAgent(id="agent1", name="A1", reply_text="A1 reply")
    a2 = _CapturingAgent(id="agent2", name="A2", reply_text="A2 reply")
    a3 = _CapturingAgent(id="agent3", name="A3", reply_text="A3 reply")

    wf = SequentialBuilder(participants=[a1, a2, a3], chain_only_agent_responses=True).build()

    async for ev in wf.run("hello", stream=True):
        if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
            break

    # a2 should see only A1's reply
    assert len(a2.last_messages) == 1
    assert a2.last_messages[0].role == "assistant" and "A1 reply" in (a2.last_messages[0].text or "")

    # a3 should see only A2's reply
    assert len(a3.last_messages) == 1
    assert a3.last_messages[0].role == "assistant" and "A2 reply" in (a3.last_messages[0].text or "")


# ---------------------------------------------------------------------------
# with_request_info tests
# ---------------------------------------------------------------------------


async def test_sequential_request_info_last_participant_emits_output() -> None:
    """When the last participant is wrapped via with_request_info(), the workflow
    still emits a terminal output event after approval.

    This exercises the _EndWithConversation.end_with_agent_executor_response path
    that converts the AgentApprovalExecutor's forwarded AgentExecutorResponse into
    the workflow's final AgentResponse output.
    """
    from agent_framework_orchestrations._orchestration_request_info import AgentRequestInfoResponse

    a1 = _EchoAgent(id="agent1", name="A1")
    a2 = _EchoAgent(id="agent2", name="A2")

    wf = SequentialBuilder(participants=[a1, a2]).with_request_info().build()

    # First run: collect the pending approval request(s).
    request_events: list[Any] = []
    async for ev in wf.run("hello with approval", stream=True):
        if ev.type == "request_info" and isinstance(ev.data, AgentExecutorResponse):
            request_events.append(ev)
    assert request_events, "Expected at least one approval request"

    # Approve each agent in sequence until the workflow completes; keep the final round's outputs.
    output_events: list[Any] = []
    while request_events:
        responses = {req.request_id: AgentRequestInfoResponse.approve() for req in request_events}
        request_events = []
        output_events = []
        async for ev in wf.run(stream=True, responses=responses):
            if ev.type == "request_info" and isinstance(ev.data, AgentExecutorResponse):
                request_events.append(ev)
            elif ev.type == "output":
                output_events.append(ev)

    # The workflow must produce a terminal output with the last agent's response.
    assert len(output_events) == 1
    response = output_events[0].data
    assert isinstance(response, AgentResponse)
    assert any("A2 reply" in m.text for m in response.messages)