Python: [BREAKING] Standardize orchestration terminal outputs as AgentResponse (#5301)

* Fix orchestration outputs so as_agent() returns the final answer only. Align other orchestration outputs

* Fix orchestration output issues from review comments

1. Sample cleanup: Remove commented-out FoundryChatClient block and update
   prerequisites to reference OPENAI_CHAT_MODEL_ID instead of FOUNDRY_* vars.

2. Sequential approval output: Change _EndWithConversation.end_with_agent_executor_response
   from a no-op sink to yield response.agent_response. When the last participant is
   AgentApprovalExecutor (via with_request_info), _EndWithConversation is the output
   executor so the yield produces the terminal answer. When the last participant is a
   regular AgentExecutor, _EndWithConversation is not in output_executors so the yield
   is silently filtered out.

3. Forward data events through WorkflowExecutor: _process_workflow_result now also
   forwards 'data' events from sub-workflows so that emit_intermediate_data=True on
   AgentExecutor works correctly when wrapped in AgentApprovalExecutor.

4. Concurrent docstring: Update _AggregateAgentConversations docstring to say
   'deterministic participant order' instead of 'completion order'.

5. Add test_concurrent_intermediate_outputs_emits_data_events verifying that
   ConcurrentBuilder(intermediate_outputs=True) emits per-participant data events
   alongside the single aggregated output event.
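
A minimal sketch of the aggregation contract described in items 4 and 5: one final assistant message per participant, emitted in participant order rather than completion order. `Msg`, `Result`, `final_assistant`, and `aggregate` are hypothetical stand-ins written for illustration. They are not the real `agent_framework` types, and the explicit reordering step is an assumption about how "deterministic participant order" could be enforced.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Msg:
    """Hypothetical stand-in for a chat message."""

    role: str
    text: str


@dataclass
class Result:
    """Hypothetical stand-in for one participant's executor response."""

    executor_id: str
    response_messages: list[Msg] = field(default_factory=list)
    full_conversation: list[Msg] = field(default_factory=list)


def final_assistant(result: Result) -> Msg | None:
    """Prefer the response's last assistant message; fall back to the conversation."""
    for source in (result.response_messages, result.full_conversation):
        for msg in reversed(source):
            if msg.role == "assistant":
                return msg
    return None


def aggregate(results: list[Result], participant_order: list[str]) -> list[Msg]:
    """Return one final assistant message per participant, in participant order."""
    by_id = {r.executor_id: r for r in results}
    replies: list[Msg] = []
    for participant_id in participant_order:
        result = by_id.get(participant_id)
        msg = final_assistant(result) if result is not None else None
        if msg is not None:
            replies.append(msg)
    if not replies:
        raise RuntimeError("Aggregation failed: no assistant replies found")
    return replies
```

The user prompt is deliberately left out of the returned list, mirroring the commit's statement that the prompt is input, not answer.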

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add tests for sequential workflow with_request_info and intermediate_outputs (#5301)

Address PR review comments 2, 3, and 5:

- Add test_sequential_request_info_last_participant_emits_output:
  Verifies that when the last participant is wrapped via with_request_info()
  (AgentApprovalExecutor), the workflow still emits a terminal output after
  approval, exercising the _EndWithConversation.end_with_agent_executor_response
  fallback path.

- Add test_sequential_request_info_with_intermediate_outputs_emits_data_events:
  Verifies that emit_intermediate_data=True works correctly through
  AgentApprovalExecutor wrapping: WorkflowExecutor._process_workflow_result
  forwards data events from sub-workflows, so intermediate agent responses
  surface as data events in the parent workflow.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright type errors from AgentResponse output refactor (#5301)

Update cast() calls in _group_chat.py and _magentic.py to use
WorkflowContext[Never, AgentResponse] instead of the old
WorkflowContext[Never, list[Message]], matching the updated method
signatures in _base_group_chat_orchestrator.py.

Fix _sequential.py _EndWithConversation.end_with_agent_executor_response
to declare WorkflowContext[Any, AgentResponse] so yield_output accepts
AgentResponse[None].

Fix _workflow_executor.py data event forwarding to handle nullable
executor_id.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright reportUnknownVariableType in _agent.py (#5301)

Extract event.data into a typed local variable before the isinstance
check to avoid pyright narrowing it to AgentResponse[Unknown].

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright reportMissingImports for orjson in file history samples (#5301)

Add pyright: ignore[reportMissingImports] to orjson imports that are
already guarded by try/except ImportError, matching the existing pattern
used elsewhere in the samples.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review feedback for #5301: review comment fixes

* Revert sequential_workflow_as_agent sample to FoundryChatClient

Reverts the mistaken switch from FoundryChatClient to OpenAIChatClient
in the sequential workflow as agent sample.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address ultrareview feedback: emit_data_events rename + WorkflowAgent reasoning conversion

Layered on top of the prior review-feedback work in this branch.

Renames:
- AgentExecutor.emit_intermediate_data -> emit_data_events (mechanical
  rename; orchestration semantics live at the orchestration layer, not
  the general-purpose executor). Forwarded through MagenticAgentExecutor,
  AgentApprovalExecutor, and all orchestration call sites.
- HandoffAgentExecutor._check_terminate_and_yield -> _should_terminate
  (pure predicate; no longer yields anything). HandoffBuilder docstring
  rewritten to describe the new per-agent AgentResponse output contract.
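
The `_should_terminate` rename describes a predicate that only decides whether to stop and never yields output. A minimal sketch of that shape, assuming the termination condition may be either a plain or an async callable; `TerminationCondition` and the free function `should_terminate` are illustrative names, not the framework's API:

```python
import asyncio
import inspect
from collections.abc import Awaitable, Callable
from typing import Optional, Union

# Hypothetical alias: the condition receives the conversation and may be sync or async.
TerminationCondition = Callable[[list[str]], Union[bool, Awaitable[bool]]]


async def should_terminate(
    condition: Optional[TerminationCondition], conversation: list[str]
) -> bool:
    """Pure predicate: report whether to stop, with no output side effects."""
    if condition is None:
        return False
    terminated = condition(conversation)
    # Await only when the condition returned an awaitable (async callable).
    if inspect.isawaitable(terminated):
        terminated = await terminated
    return bool(terminated)
```

Keeping the predicate free of `yield_output` calls lets the caller simply `return` when it is true, which matches the per-agent output contract described above.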

WorkflowAgent reasoning-content conversion:
- Add _rewrite_text_to_reasoning(contents) and _msg_as_reasoning(msg)
  helpers; the as_agent() path now reframes text content from data events
  as text_reasoning Content blocks before merging into the AgentResponse.
- Consumers iterate msg.contents and branch on content.type — same path
  they already use for Claude thinking and OpenAI reasoning. No new
  field on Message/AgentResponse/WorkflowEvent.
- Streaming branch constructs fresh AgentResponseUpdate instances instead
  of mutating shared payloads (regression test added).
- Helper _msg_maybe_reasoning consolidates the conditional rewrite at
  three call sites in the non-streaming conversion.
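
A rough sketch of the text-to-reasoning rewrite described in this list, under the assumption that a content block carries a `type` tag and that rewriting must not mutate the input. `Content`, `rewrite_text_to_reasoning`, and `final_text` here are hypothetical stand-ins, not the helpers added by the commit:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Content:
    """Hypothetical stand-in for the framework's content block."""

    type: str
    text: str = ""


def rewrite_text_to_reasoning(contents: list[Content]) -> list[Content]:
    """Return a new list in which plain text blocks are re-tagged as text_reasoning."""
    return [replace(c, type="text_reasoning") if c.type == "text" else c for c in contents]


def final_text(contents: list[Content]) -> str:
    """Consumer side: keep only non-reasoning text by branching on content.type."""
    return "".join(c.text for c in contents if c.type == "text")
```

Because `replace` builds new objects, blocks that are already `text_reasoning` or of another type pass through untouched and the caller's original list is left as it was.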

Tests:
- TestWorkflowAgentReasoningHelpers + TestWorkflowAgentDataEventReasoningConversion
  add 9 new tests covering helpers, non-streaming, streaming, mixed content,
  already-reasoning passthrough, and mutation-safety regression.
- Updated test_sequential_as_agent_with_intermediate_outputs_includes_chain
  to assert text_reasoning content for intermediate agents.

* Fix pyright: widen event.data to Any to avoid partial-unknown narrowing

The streaming conversion path narrowed event.data via isinstance against
generic AgentResponse, producing AgentResponse[Unknown] and tripping
reportUnknownVariableType/reportUnknownMemberType. Binding data: Any
before the check keeps runtime behavior identical while restoring a fully
known type for downstream access.
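
The pattern can be sketched as follows. `Response` and `Event` are hypothetical stand-ins for a generic response class and a workflow event; the claim about what pyright infers is taken from the commit message above, not re-verified here.

```python
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class Response(Generic[T]):
    """Hypothetical stand-in for a generic response type."""

    def __init__(self, value: T) -> None:
        self.value = value


class Event:
    """Hypothetical stand-in for a workflow event carrying an untyped payload."""

    def __init__(self, data: object) -> None:
        self.data = data


def extract_value(event: Event) -> Any:
    # Bind the payload to an explicitly typed local before the isinstance check,
    # instead of narrowing `event.data` in place against the bare generic class.
    data: Any = event.data
    if isinstance(data, Response):
        return data.value
    return None
```

Runtime behavior is the same with or without the annotation; only the type the checker assigns to `data` after the check changes.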

* Clean up design

* Scope to agent output semantics only

* yield AgentResponseUpdate streaming, AgentResponse non-streaming
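
A minimal sketch of the mode-aware yield named in this commit subject: one update chunk when streaming, a full response otherwise. The three dataclasses are simplified stand-ins that borrow the framework's names for readability, and `completion_output` is a hypothetical helper:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Message:
    """Simplified stand-in for a chat message."""

    role: str
    text: str


@dataclass
class AgentResponseUpdate:
    """Simplified stand-in for one streamed chunk."""

    role: str
    text: str


@dataclass
class AgentResponse:
    """Simplified stand-in for a complete response."""

    messages: list[Message]


def completion_output(message: Message, *, streaming: bool) -> AgentResponse | AgentResponseUpdate:
    """Pick the output shape that matches the run mode."""
    if streaming:
        return AgentResponseUpdate(role=message.role, text=message.text)
    return AgentResponse(messages=[message])
```

The union return type is the reason downstream context annotations have to accept both shapes.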

* Fix mypy/pyright: widen cast types at GroupChat callsites

Eight callsites in _group_chat.py still cast to WorkflowContext[Never,
AgentResponse] but the base orchestrator methods now accept the wider
WorkflowContext[Never, AgentResponse | AgentResponseUpdate] (mode-aware
yields). W_OutT is invariant, so the narrower cast is not assignable.
Magentic was widened in the same commit; this catches the GroupChat
callsites that were missed.
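
Why invariance forces the wider cast can be shown with a small generic container. `Ctx`, `OutT`, and `finish` are hypothetical names for illustration; the rejected call is left commented out because it fails type checking, not execution:

```python
from typing import Generic, TypeVar, Union

OutT = TypeVar("OutT")  # TypeVars are invariant unless declared otherwise


class Ctx(Generic[OutT]):
    """Hypothetical stand-in for a context parameterized by its output type."""

    def __init__(self) -> None:
        self.outputs: list[OutT] = []

    def yield_output(self, value: OutT) -> None:
        self.outputs.append(value)


def finish(ctx: Ctx[Union[int, str]], streaming: bool) -> None:
    """Yields either shape, so it needs a context that accepts both."""
    ctx.yield_output("chunk" if streaming else 42)


narrow: Ctx[int] = Ctx()
# finish(narrow, streaming=True)  # rejected by a type checker: Ctx[int] is not Ctx[int | str]

wide: Ctx[Union[int, str]] = Ctx()
finish(wide, streaming=True)
finish(wide, streaming=False)
```

If the narrow context were accepted, `finish` could push a `str` into a container its owner believes holds only `int`, which is the unsoundness invariance rules out.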

* Python: skip flaky Foundry / Foundry Hosting integration tests (#5553)

These two integration tests have been failing in the merge queue across
multiple unrelated PRs (5301, 5531). Both are marked `@pytest.mark.flaky`
with 3 retries, but all attempts fail back-to-back. Skipping both with a
reason pointing to #5553 so they can be fixed properly without continuing
to block unrelated merges.

- packages/foundry_hosting/tests/test_responses_int.py::TestOptions::test_temperature_and_max_tokens
- packages/foundry/tests/foundry/test_foundry_embedding_client.py::TestFoundryEmbeddingIntegration::test_text_embedding_live

Also includes a one-line uv.lock specifier-ordering normalization
auto-applied by the poe-check pre-commit hook.

---------

Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Author: Evan Mattson
Date: 2026-04-29 09:35:36 +09:00
Committed by: GitHub
Parent: 40e90c96c3
Commit: 866a325b48
22 changed files with 785 additions and 490 deletions
@@ -242,3 +242,7 @@ python/dotnet-ref
 # Generated filtered solution files (created by eng/scripts/New-FilteredSolution.ps1)
 dotnet/filtered-*.slnx
 **/*.lscache
+# Local tool state
+.omc/
+.omx/
@@ -528,6 +528,7 @@ class WorkflowAgent(BaseAgent):
raw_representations.append(output_event)
else:
data = output_event.data
if isinstance(data, AgentResponseUpdate):
# We cannot support AgentResponseUpdate in non-streaming mode. This is because the message
# sequence cannot be guaranteed when there are streaming updates in between non-streaming
@@ -628,16 +629,23 @@ class WorkflowAgent(BaseAgent):
             A list of AgentResponseUpdate objects. Empty list if the event is not relevant.
         """
         if event.type == "output":
-            # Convert workflow output to agent response updates.
-            # Handle different data types appropriately.
             data = event.data
             executor_id = event.executor_id
             if isinstance(data, AgentResponseUpdate):
-                # Pass through AgentResponseUpdate directly (streaming from AgentExecutor)
-                if not data.author_name:
-                    data.author_name = executor_id
-                return [data]
+                # Construct a fresh AgentResponseUpdate so we don't mutate a payload
+                # that AgentExecutor still holds a reference to in its `updates` list.
+                return [
+                    AgentResponseUpdate(
+                        contents=list(data.contents),
+                        role=data.role,
+                        author_name=data.author_name or executor_id,
+                        response_id=data.response_id,
+                        message_id=data.message_id,
+                        created_at=data.created_at,
+                        raw_representation=data.raw_representation,
+                    )
+                ]
             if isinstance(data, AgentResponse):
                 # Convert each message in AgentResponse to an AgentResponseUpdate
                 updates: list[AgentResponseUpdate] = []
@@ -156,8 +156,9 @@ class AgentExecutor(Executor):
                   the agent run.
                 - "custom": use the provided context_filter function to determine which messages to include
                   as context for the agent run.
-            context_filter: An optional function for filtering conversation context when context_mode is set
-                to "custom".
+            context_filter: A function that takes the full conversation (list of Messages) as input and returns
+                a filtered list of Messages to be used as context for the agent run. This is required
+                if context_mode is set to "custom".
         """
         # Prefer provided id; else use agent.name if present; else generate deterministic prefix
         exec_id = id or resolve_agent_id(agent)
@@ -361,7 +361,7 @@ class WorkflowExecutor(Executor):
         return any(is_instance_of(message.data, input_type) for input_type in self.workflow.input_types)
     @handler
-    async def process_workflow(self, input_data: object, ctx: WorkflowContext[Any]) -> None:
+    async def process_workflow(self, input_data: object, ctx: WorkflowContext[Any, Any]) -> None:
         """Execute the sub-workflow with raw input data.
         This handler starts a new sub-workflow execution. When the sub-workflow
@@ -428,7 +428,7 @@ class WorkflowExecutor(Executor):
     async def handle_message_wrapped_request_response(
         self,
         response: SubWorkflowResponseMessage,
-        ctx: WorkflowContext[Any],
+        ctx: WorkflowContext[Any, Any],
     ) -> None:
         """Handle response from parent for a forwarded request.
@@ -232,16 +232,18 @@ async def test_groupchat_kwargs_flow_to_agents() -> None:
 async def test_kwargs_stored_in_state() -> None:
     """Test that function_invocation_kwargs are stored in State with the correct key."""
-    from agent_framework import Executor, WorkflowContext, handler
+    from typing_extensions import Never
+    from agent_framework import AgentResponse, Executor, WorkflowContext, handler
     stored_kwargs: dict[str, Any] | None = None
     class _StateInspector(Executor):
         @handler
-        async def inspect(self, msgs: list[Message], ctx: WorkflowContext[list[Message]]) -> None:
+        async def inspect(self, msgs: list[Message], ctx: WorkflowContext[Never, AgentResponse]) -> None:
             nonlocal stored_kwargs
             stored_kwargs = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY)
-            await ctx.send_message(msgs)
+            await ctx.yield_output(AgentResponse(messages=msgs))
     inspector = _StateInspector(id="inspector")
     workflow = SequentialBuilder(participants=[inspector]).build()
@@ -256,16 +258,18 @@ async def test_kwargs_stored_in_state() -> None:
 async def test_empty_kwargs_stored_as_empty_dict() -> None:
     """Test that empty kwargs are stored as empty dict in State."""
-    from agent_framework import Executor, WorkflowContext, handler
+    from typing_extensions import Never
+    from agent_framework import AgentResponse, Executor, WorkflowContext, handler
     stored_kwargs: Any = "NOT_CHECKED"
     class _StateChecker(Executor):
         @handler
-        async def check(self, msgs: list[Message], ctx: WorkflowContext[list[Message]]) -> None:
+        async def check(self, msgs: list[Message], ctx: WorkflowContext[Never, AgentResponse]) -> None:
             nonlocal stored_kwargs
             stored_kwargs = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY)
-            await ctx.send_message(msgs)
+            await ctx.yield_output(AgentResponse(messages=msgs))
     checker = _StateChecker(id="checker")
     workflow = SequentialBuilder(participants=[checker]).build()
@@ -695,7 +699,9 @@ async def test_subworkflow_kwargs_accessible_via_state() -> None:
     Verifies that WORKFLOW_RUN_KWARGS_KEY is populated in the subworkflow's State
     with kwargs from the parent workflow.
     """
-    from agent_framework import Executor, WorkflowContext, handler
+    from typing_extensions import Never
+    from agent_framework import AgentResponse, Executor, WorkflowContext, handler
     from agent_framework._workflows._workflow_executor import WorkflowExecutor
     captured_kwargs_from_state: list[dict[str, Any]] = []
@@ -704,10 +710,10 @@ async def test_subworkflow_kwargs_accessible_via_state() -> None:
"""Executor that reads kwargs from State for verification."""
         @handler
-        async def read_kwargs(self, msgs: list[Message], ctx: WorkflowContext[list[Message]]) -> None:
+        async def read_kwargs(self, msgs: list[Message], ctx: WorkflowContext[Never, AgentResponse]) -> None:
             kwargs_from_state = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY)
             captured_kwargs_from_state.append(kwargs_from_state or {})
-            await ctx.send_message(msgs)
+            await ctx.yield_output(AgentResponse(messages=msgs))
     # Build inner workflow with State reader
     state_reader = _StateReader(id="state_reader")
@@ -303,6 +303,7 @@ skip_if_foundry_inference_integration_tests_disabled = pytest.mark.skipif(
 class TestFoundryEmbeddingIntegration:
     """Integration tests requiring a live Foundry inference endpoint."""
+    @pytest.mark.skip(reason="Flaky in merge queue, blocking unrelated PRs. Tracked in #5553.")
     @pytest.mark.flaky
     @pytest.mark.integration
     @skip_if_foundry_inference_integration_tests_disabled
@@ -559,6 +559,7 @@ class TestToolCalling:
 class TestOptions:
     """Verify chat options are passed through to the model."""
+    @pytest.mark.skip(reason="Flaky in merge queue, blocking unrelated PRs. Tracked in #5553.")
     @pytest.mark.flaky
     @pytest.mark.integration
     @skip_if_foundry_hosting_integration_tests_disabled
@@ -12,7 +12,7 @@ from collections.abc import Awaitable, Callable, Sequence
 from dataclasses import dataclass
 from typing import Any, ClassVar, TypeAlias
-from agent_framework._types import Message
+from agent_framework._types import AgentResponse, AgentResponseUpdate, Message
 from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
 from agent_framework._workflows._events import WorkflowEvent
 from agent_framework._workflows._executor import Executor, handler
@@ -351,8 +351,10 @@ class BaseGroupChatOrchestrator(Executor, ABC):
             result = await result
         return result
-    async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool:
-        """Check termination conditions and yield completion if met.
+    async def _check_terminate_and_yield(
+        self, ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate]
+    ) -> bool:
+        """Check termination conditions and yield the completion message if met.
         Args:
             ctx: Workflow context for yielding output
@@ -362,12 +364,37 @@ class BaseGroupChatOrchestrator(Executor, ABC):
"""
         terminate = await self._check_termination()
         if terminate:
-            self._append_messages([self._create_completion_message(self.TERMINATION_CONDITION_MET_MESSAGE)])
-            await ctx.yield_output(self._full_conversation)
+            completion_message = self._create_completion_message(self.TERMINATION_CONDITION_MET_MESSAGE)
+            self._append_messages([completion_message])
+            await self._yield_completion(ctx, completion_message)
             return True
         return False
+    async def _yield_completion(
+        self,
+        ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate],
+        completion_message: Message,
+    ) -> None:
+        """Yield a synthesized terminal completion message in the right shape for the run mode.
+        Mode-aware to mirror ``AgentExecutor`` semantics:
+        - Streaming (``ctx.is_streaming()``): yield a single ``AgentResponseUpdate`` so the
+          ``output`` event stream stays uniformly per-chunk.
+        - Non-streaming: yield the full ``AgentResponse``.
+        """
+        if ctx.is_streaming():
+            await ctx.yield_output(
+                AgentResponseUpdate(
+                    contents=list(completion_message.contents),
+                    role=completion_message.role,
+                    author_name=completion_message.author_name,
+                    message_id=completion_message.message_id,
+                )
+            )
+        else:
+            await ctx.yield_output(AgentResponse(messages=[completion_message]))
     def _create_completion_message(self, message: str) -> Message:
         """Create a standardized completion message.
@@ -490,8 +517,10 @@ class BaseGroupChatOrchestrator(Executor, ABC):
         return False
-    async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool:
-        """Check round limit and yield completion if reached.
+    async def _check_round_limit_and_yield(
+        self, ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate]
+    ) -> bool:
+        """Check round limit and yield the max-rounds completion message if reached.
         Args:
             ctx: Workflow context for yielding output
@@ -501,8 +530,9 @@ class BaseGroupChatOrchestrator(Executor, ABC):
"""
         reach_max_rounds = self._check_round_limit()
         if reach_max_rounds:
-            self._append_messages([self._create_completion_message(self.MAX_ROUNDS_MET_MESSAGE)])
-            await ctx.yield_output(self._full_conversation)
+            completion_message = self._create_completion_message(self.MAX_ROUNDS_MET_MESSAGE)
+            self._append_messages([completion_message])
+            await self._yield_completion(ctx, completion_message)
             return True
         return False
@@ -6,7 +6,7 @@ import logging
 from collections.abc import Callable, Sequence
 from typing import Any
-from agent_framework import Message, SupportsAgentRun
+from agent_framework import AgentResponse, Message, SupportsAgentRun
 from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
 from agent_framework._workflows._agent_utils import resolve_agent_id
 from agent_framework._workflows._checkpoint import CheckpointStorage
@@ -71,18 +71,20 @@ class _DispatchToAllParticipants(Executor):
 class _AggregateAgentConversations(Executor):
-    """Aggregates agent responses and completes with combined ChatMessages.
+    """Aggregates agent responses and completes with a single AgentResponse.
-    Emits a list[Message] shaped as:
-      [ single_user_prompt?, agent1_final_assistant, agent2_final_assistant, ... ]
+    Emits an `AgentResponse` whose `messages` are the final assistant message from each
+    participant (one message per agent), in deterministic participant order matching
+    the fan-in `sources` configuration. The user prompt is intentionally not included —
+    that is part of the input, not the answer.
-    - Extracts a single user prompt (first user message seen across results).
-    - For each result, selects the final assistant message (prefers agent_response.messages).
-    - Avoids duplicating the same user message per agent.
+    For each participant the final assistant message is sourced from
+    `r.agent_response.messages`, falling back to scanning `r.full_conversation` for
+    pathological executors that did not populate the response.
     """
     @handler
-    async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, list[Message]]) -> None:
+    async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, AgentResponse]) -> None:
         if not results:
             logger.error("Concurrent aggregator received empty results list")
             raise ValueError("Aggregation failed: no results provided")
@@ -91,12 +93,10 @@ class _AggregateAgentConversations(Executor):
             r = getattr(msg, "role", None)
             if r is None:
                 return False
-            # Normalize both r and role to lowercase strings for comparison
             r_str = str(r).lower() if isinstance(r, str) or hasattr(r, "__str__") else r
             role_str = str(role).lower()
             return r_str == role_str
-        prompt_message: Message | None = None
         assistant_replies: list[Message] = []
         for r in results:
@@ -107,10 +107,6 @@ class _AggregateAgentConversations(Executor):
f"{len(resp_messages)} response msgs, {len(r.full_conversation)} conversation msgs"
)
-            # Capture a single user prompt (first encountered across any conversation)
-            if prompt_message is None:
-                prompt_message = next((m for m in r.full_conversation if _is_role(m, "user")), None)
             # Pick the final assistant message from the response; fallback to conversation search
             final_assistant = next((m for m in reversed(resp_messages) if _is_role(m, "assistant")), None)
             if final_assistant is None:
@@ -127,14 +123,7 @@ class _AggregateAgentConversations(Executor):
             logger.error(f"Aggregation failed: no assistant replies found across {len(results)} results")
             raise RuntimeError("Aggregation failed: no assistant replies found")
-        output: list[Message] = []
-        if prompt_message is not None:
-            output.append(prompt_message)
-        else:
-            logger.warning("No user prompt found in any conversation; emitting assistants only")
-        output.extend(assistant_replies)
-        await ctx.yield_output(output)
+        await ctx.yield_output(AgentResponse(messages=assistant_replies))
 class _CallbackAggregator(Executor):
@@ -190,7 +179,8 @@ class ConcurrentBuilder:
         from agent_framework_orchestrations import ConcurrentBuilder
-        # Minimal: use default aggregator (returns list[Message])
+        # Minimal: use default aggregator (yields one AgentResponse with one assistant
+        # message per participant)
         workflow = ConcurrentBuilder(participants=[agent1, agent2, agent3]).build()
@@ -222,8 +212,9 @@ class ConcurrentBuilder:
         Args:
             participants: Sequence of agent or executor instances to run in parallel.
             checkpoint_storage: Optional checkpoint storage for enabling workflow state persistence.
-            intermediate_outputs: If True, enables intermediate outputs from agent participants
-                before aggregation.
+            intermediate_outputs: If True, every participant's `yield_output` surfaces as a
+                workflow `output` event in addition to the aggregator's. By default
+                (False) only the aggregator's output surfaces.
         """
         self._participants: list[SupportsAgentRun | Executor] = []
         self._aggregator: Executor | None = None
@@ -383,7 +374,7 @@ class ConcurrentBuilder:
         - If request info is enabled, the orchestration emits a request info event with outputs from all participants
           before sending the outputs to the aggregator
         - Aggregator yields output and the workflow becomes idle. The output is either:
-            - list[Message] (default aggregator: one user + one assistant per agent)
+            - AgentResponse (default aggregator: one assistant message per participant)
             - custom payload from the provided aggregator
         Returns:
@@ -29,7 +29,7 @@ from collections.abc import Awaitable, Callable, Sequence
 from dataclasses import dataclass
 from typing import Any, ClassVar, cast
-from agent_framework import Agent, AgentSession, Message, SupportsAgentRun
+from agent_framework import Agent, AgentResponse, AgentResponseUpdate, AgentSession, Message, SupportsAgentRun
 from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
 from agent_framework._workflows._agent_utils import resolve_agent_id
 from agent_framework._workflows._checkpoint import CheckpointStorage
@@ -169,7 +169,9 @@ class GroupChatOrchestrator(BaseGroupChatOrchestrator):
"""Initialize orchestrator state and start the conversation loop."""
         self._append_messages(messages)
         # Termination condition will also be applied to the input messages
-        if await self._check_terminate_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)):
+        if await self._check_terminate_and_yield(
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)
+        ):
             return
         next_speaker = await self._get_next_speaker()
@@ -198,9 +200,13 @@ class GroupChatOrchestrator(BaseGroupChatOrchestrator):
         messages = clean_conversation_for_handoff(messages)
         self._append_messages(messages)
-        if await self._check_terminate_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)):
+        if await self._check_terminate_and_yield(
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)
+        ):
             return
-        if await self._check_round_limit_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)):
+        if await self._check_round_limit_and_yield(
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)
+        ):
             return
         next_speaker = await self._get_next_speaker()
@@ -332,13 +338,15 @@ class AgentBasedGroupChatOrchestrator(BaseGroupChatOrchestrator):
"""Initialize orchestrator state and start the conversation loop."""
         self._append_messages(messages)
         # Termination condition will also be applied to the input messages
-        if await self._check_terminate_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)):
+        if await self._check_terminate_and_yield(
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)
+        ):
             return
         agent_orchestration_output = await self._invoke_agent()
         if await self._check_agent_terminate_and_yield(
             agent_orchestration_output,
-            cast(WorkflowContext[Never, list[Message]], ctx),
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx),
         ):
             return
@@ -366,15 +374,19 @@ class AgentBasedGroupChatOrchestrator(BaseGroupChatOrchestrator):
         # Remove tool-related content to prevent API errors from empty messages
         messages = clean_conversation_for_handoff(messages)
         self._append_messages(messages)
-        if await self._check_terminate_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)):
+        if await self._check_terminate_and_yield(
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)
+        ):
             return
-        if await self._check_round_limit_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)):
+        if await self._check_round_limit_and_yield(
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)
+        ):
             return
         agent_orchestration_output = await self._invoke_agent()
         if await self._check_agent_terminate_and_yield(
             agent_orchestration_output,
-            cast(WorkflowContext[Never, list[Message]], ctx),
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx),
         ):
             return
@@ -522,9 +534,9 @@ class AgentBasedGroupChatOrchestrator(BaseGroupChatOrchestrator):
     async def _check_agent_terminate_and_yield(
         self,
         agent_orchestration_output: AgentOrchestrationOutput,
-        ctx: WorkflowContext[Never, list[Message]],
+        ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate],
     ) -> bool:
-        """Check if the agent requested termination and yield completion if so.
+        """Yield the orchestrator's completion if termination was requested.
         Args:
             agent_orchestration_output: Output from the orchestrator agent
@@ -536,8 +548,9 @@ class AgentBasedGroupChatOrchestrator(BaseGroupChatOrchestrator):
             final_message = (
                 agent_orchestration_output.final_message or "The conversation has been terminated by the agent."
             )
-            self._append_messages([self._create_completion_message(final_message)])
-            await ctx.yield_output(self._full_conversation)
+            completion_message = self._create_completion_message(final_message)
+            self._append_messages([completion_message])
+            await self._yield_completion(ctx, completion_message)
             return True
         return False
@@ -622,7 +635,9 @@ class GroupChatBuilder:
                 True to terminate the conversation, False to continue.
             max_rounds: Optional maximum number of orchestrator rounds to prevent infinite conversations.
             checkpoint_storage: Optional checkpoint storage for enabling workflow state persistence.
-            intermediate_outputs: If True, enables intermediate outputs from agent participants.
+            intermediate_outputs: If True, every participant's `yield_output` surfaces as a
+                workflow `output` event in addition to the orchestrator's. By default (False)
+                only the orchestrator's output surfaces.
         """
         self._participants: dict[str, SupportsAgentRun | Executor] = {}
         self._participant_factories: list[Callable[[], SupportsAgentRun | Executor]] = []
@@ -643,8 +658,7 @@ class GroupChatBuilder:
         self._request_info_enabled: bool = False
         self._request_info_filter: set[str] = set()
-        # Intermediate outputs
-        self._intermediate_outputs = intermediate_outputs
+        self._intermediate_outputs: bool = intermediate_outputs
         if participants is None and participant_factories is None:
             raise ValueError("Either participants or participant_factories must be provided.")
@@ -352,7 +352,7 @@ class HandoffAgentExecutor(AgentExecutor):
         self._full_conversation.extend(self._cache.copy())
         # Check termination condition before running the agent
-        if await self._check_terminate_and_yield(ctx):
+        if await self._should_terminate():
             return
         # Run the agent
@@ -410,7 +410,7 @@ class HandoffAgentExecutor(AgentExecutor):
         # Re-evaluate termination after appending and broadcasting this response.
         # Without this check, workflows that become terminal due to the latest assistant
         # message would still emit request_info and require an unnecessary extra resume.
-        if await self._check_terminate_and_yield(ctx):
+        if await self._should_terminate():
             return
         # Handle case where no handoff was requested
@@ -447,10 +447,10 @@ class HandoffAgentExecutor(AgentExecutor):
             response: The user's response messages
             ctx: The workflow context
-        If the response is empty, it indicates termination of the handoff workflow.
+        If the response is empty, the handoff workflow terminates. Per-agent responses
+        already surfaced as `output` events; no terminal yield is needed.
         """
         if not response:
-            await ctx.yield_output(self._full_conversation)
             return
         # Broadcast the user response to all other agents
@@ -520,14 +520,12 @@ class HandoffAgentExecutor(AgentExecutor):
         return None
-    async def _check_terminate_and_yield(self, ctx: WorkflowContext[Any, Any]) -> bool:
-        """Check termination conditions and yield completion if met.
+    async def _should_terminate(self) -> bool:
+        """Pure predicate: return True iff the configured termination condition is satisfied.
-        Args:
-            ctx: Workflow context for yielding output
-        Returns:
-            True if termination condition met and output yielded, False otherwise
+        Per-agent responses already surface as `output` events as agents speak, so the
+        handoff workflow has no terminal yield to make — this method only decides whether
+        the workflow should stop iterating.
         """
         if self._termination_condition is None:
             return False
@@ -535,12 +533,7 @@ class HandoffAgentExecutor(AgentExecutor):
         terminated = self._termination_condition(self._full_conversation)
         if inspect.isawaitable(terminated):
             terminated = await terminated
-        if terminated:
-            await ctx.yield_output(self._full_conversation)
-            return True
-        return False
+        return bool(terminated)
     @override
     async def on_checkpoint_save(self) -> dict[str, Any]:
@@ -577,13 +570,15 @@ class HandoffBuilder:
     tool injection, and middleware — capabilities only available on ``Agent``.
     Outputs:
-        The final conversation history as a list of Message once the group chat completes.
+        Each agent's response surfaces as a workflow `output` event as it speaks; there is no
+        synthetic terminal event. Consumers iterating events see per-agent ``AgentResponse`` (or
+        ``AgentResponseUpdate`` while streaming) in conversation order. The workflow returns to
+        idle once the termination condition is met (or the user terminates an interactive run).
     Note:
         1. Agents in handoff workflows must be ``Agent`` instances and support local tool calls.
-        2. Handoff doesn't support intermediate outputs from agents. All outputs are returned as
-           they become available. This is because agents in handoff workflows are not considered
-           sub-agents of a central orchestrator, thus all outputs are directly emitted.
+        2. Because each agent's response is itself a workflow output, handoff has no separate
+           "intermediate outputs" channel — every per-agent response is the primary output.
     """
     def __init__(
@@ -14,6 +14,7 @@ from typing import Any, ClassVar, TypeVar, cast
 from agent_framework import (
     AgentResponse,
+    AgentResponseUpdate,
     AgentSession,
     Message,
     SupportsAgentRun,
@@ -1057,7 +1058,9 @@ class MagenticOrchestrator(BaseGroupChatOrchestrator):
         if self._magentic_context is None:
             raise RuntimeError("Context not initialized")
         # Check limits first
-        within_limits = await self._check_within_limits_or_complete(cast(WorkflowContext[Never, list[Message]], ctx))
+        within_limits = await self._check_within_limits_or_complete(
+            cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)
+        )
         if not within_limits:
             return
@@ -1092,7 +1095,7 @@ class MagenticOrchestrator(BaseGroupChatOrchestrator):
# Check for task completion
if self._progress_ledger.is_request_satisfied.answer:
logger.info("Magentic Orchestrator: Task completed")
await self._prepare_final_answer(cast(WorkflowContext[Never, list[Message]], ctx))
await self._prepare_final_answer(cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx))
return
# Check for stalling or looping
@@ -1116,7 +1119,7 @@ class MagenticOrchestrator(BaseGroupChatOrchestrator):
if next_speaker not in self._participant_registry.participants:
logger.warning(f"Invalid next speaker: {next_speaker}")
await self._prepare_final_answer(cast(WorkflowContext[Never, list[Message]], ctx))
await self._prepare_final_answer(cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx))
return
# Add instruction to conversation (assistant guidance)
@@ -1192,20 +1195,25 @@ class MagenticOrchestrator(BaseGroupChatOrchestrator):
# Start inner loop
await self._run_inner_loop(ctx)
async def _prepare_final_answer(self, ctx: WorkflowContext[Never, list[Message]]) -> None:
"""Prepare the final answer using the manager."""
async def _prepare_final_answer(self, ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate]) -> None:
"""Yield the manager's synthesized final answer.
Mode-aware: streaming -> ``AgentResponseUpdate``, non-streaming -> ``AgentResponse``.
See ``BaseGroupChatOrchestrator._yield_completion``.
"""
if self._magentic_context is None:
raise RuntimeError("Context not initialized")
logger.info("Magentic Orchestrator: Preparing final answer")
final_answer = await self._manager.prepare_final_answer(self._magentic_context.clone(deep=True))
# Emit a completed event for the workflow
await ctx.yield_output([final_answer])
await self._yield_completion(ctx, final_answer)
self._terminated = True
async def _check_within_limits_or_complete(self, ctx: WorkflowContext[Never, list[Message]]) -> bool:
async def _check_within_limits_or_complete(
self, ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate]
) -> bool:
"""Check if orchestrator is within operational limits.
If limits are exceeded, yield a termination message and mark the workflow as terminated.
@@ -1229,15 +1237,12 @@ class MagenticOrchestrator(BaseGroupChatOrchestrator):
limit_type = "round" if hit_round_limit else "reset"
logger.error(f"Magentic Orchestrator: Max {limit_type} count reached")
# Yield the full conversation with an indication of termination due to limits
await ctx.yield_output([
*self._magentic_context.chat_history,
Message(
role="assistant",
contents=[f"Workflow terminated due to reaching maximum {limit_type} count."],
author_name=MAGENTIC_MANAGER_NAME,
),
])
termination_message = Message(
role="assistant",
contents=[f"Workflow terminated due to reaching maximum {limit_type} count."],
author_name=MAGENTIC_MANAGER_NAME,
)
await self._yield_completion(ctx, termination_message)
self._terminated = True
return False
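The body of `_yield_completion` is not shown in this diff, so the following is only a toy model of the "mode-aware" behavior its docstring describes: a single synthesized message becomes one `AgentResponseUpdate` when streaming and an `AgentResponse` otherwise. The dataclasses are simplified stand-ins for the framework types, and `completion_payload` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class Message:
    """Stand-in for the framework's Message (assumption: text-only content)."""
    role: str
    text: str
    author_name: Optional[str] = None


@dataclass
class AgentResponse:
    """Stand-in for the non-streaming terminal payload."""
    messages: List[Message]


@dataclass
class AgentResponseUpdate:
    """Stand-in for a single streaming chunk."""
    text: str
    role: str = "assistant"
    author_name: Optional[str] = None


def completion_payload(
    message: Message, *, streaming: bool
) -> Union[AgentResponse, AgentResponseUpdate]:
    """Wrap one completion message in the shape that matches the run mode."""
    if streaming:
        # The synthesized message is logically a single chunk.
        return AgentResponseUpdate(
            text=message.text, role=message.role, author_name=message.author_name
        )
    return AgentResponse(messages=[message])
```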
@@ -1427,7 +1432,9 @@ class MagenticBuilder:
max_round_count: Max total coordination rounds. None means unlimited.
enable_plan_review: If True, requires human approval of the initial plan before proceeding.
checkpoint_storage: Optional checkpoint storage for enabling workflow state persistence.
intermediate_outputs: If True, enables intermediate outputs from agent participants.
intermediate_outputs: If True, every participant's `yield_output` surfaces as a
workflow `output` event in addition to the orchestrator's. By default (False)
only the orchestrator's output surfaces.
"""
self._participants: dict[str, SupportsAgentRun | Executor] = {}
@@ -1440,7 +1447,6 @@ class MagenticBuilder:
self._checkpoint_storage: CheckpointStorage | None = checkpoint_storage
# Intermediate outputs
self._intermediate_outputs = intermediate_outputs
self._set_participants(participants)
@@ -4,7 +4,7 @@ from dataclasses import dataclass
from typing import Literal
from agent_framework._agents import SupportsAgentRun
from agent_framework._types import Message
from agent_framework._types import AgentResponse, Message
from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
from agent_framework._workflows._agent_utils import resolve_agent_id
from agent_framework._workflows._executor import Executor, handler
@@ -86,7 +86,13 @@ class AgentRequestInfoResponse:
class AgentRequestInfoExecutor(Executor):
"""Executor for gathering request info from users to assist agents."""
"""Executor for gathering request info from users to assist agents.
On approval (caller returned no follow-up messages), yields the original
``AgentExecutorResponse`` so downstream ``AgentExecutor`` participants can consume it
via their ``from_response`` handler — i.e., the inner workflow's output type matches the
chain currency used between Sequential participants.
"""
@handler
async def request_info(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext) -> None:
@@ -109,6 +115,56 @@ class AgentRequestInfoExecutor(Executor):
await ctx.yield_output(original_request)
class _TerminalAgentRequestInfoExecutor(Executor):
"""Sibling of ``AgentRequestInfoExecutor`` used when ``AgentApprovalExecutor`` is the workflow's terminator.
This exists because:
- The established orchestration contract is that every orchestration's terminal
``output`` event carries an ``AgentResponse``. That is the user-facing promise — e.g.,
``workflow.as_agent().run(prompt)`` returns an ``AgentResponse``.
- ``AgentRequestInfoExecutor`` yields ``AgentExecutorResponse`` because that is the chain
currency between Sequential participants: the next ``AgentExecutor`` consumes
``AgentExecutorResponse`` via its ``from_response`` handler. That is correct when
``AgentApprovalExecutor`` is *intermediate*.
- When ``AgentApprovalExecutor`` is the *terminator* (``allow_direct_output=True``), the
inner yield flows straight through ``WorkflowExecutor`` to the outer workflow's terminal
output. Yielding ``AgentExecutorResponse`` there would surface ``AgentExecutorResponse``
as the workflow's terminal output — violating the orchestration contract.
Used in place of ``AgentRequestInfoExecutor`` inside the terminator-mode inner workflow
built by ``AgentApprovalExecutor._build_workflow`` when ``allow_direct_output=True``.
Translation belongs here — at the source of the yield in the orchestrations package —
rather than at the ``WorkflowExecutor`` boundary in core, because core has no opinion
about the orchestration's ``AgentResponse`` contract.
Note: not a subclass of ``AgentRequestInfoExecutor``. The two classes have different
terminal yield contracts (``AgentExecutorResponse`` vs. ``AgentResponse``), and
``WorkflowContext``'s output type parameter is invariant — so a subclass override would
be type-incompatible. They are siblings sharing only a small ``request_info`` handler.
"""
@handler
async def request_info(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext) -> None:
"""Handle the agent's response and gather additional info from users."""
await ctx.request_info(agent_response, AgentRequestInfoResponse)
@response_handler
async def handle_request_info_response(
self,
original_request: AgentExecutorResponse,
response: AgentRequestInfoResponse,
ctx: WorkflowContext[AgentExecutorRequest, AgentResponse],
) -> None:
"""Process the additional info provided by users; yield ``AgentResponse`` on approval."""
if response.messages:
# User provided additional messages, further iterate on agent response
await ctx.send_message(AgentExecutorRequest(messages=response.messages, should_respond=True))
else:
# No additional info, approve and surface the wrapped AgentResponse to the parent.
await ctx.yield_output(original_request.agent_response)
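The difference between the two request-info variants comes down to what is yielded on approval. A toy sketch with stand-in dataclasses (not the real `agent_framework` types; `approved_payload` is a hypothetical helper that folds both variants into one function purely for illustration):

```python
from dataclasses import dataclass


@dataclass
class AgentResponse:
    """Stand-in for the framework's AgentResponse (assumption: text only)."""
    text: str


@dataclass
class AgentExecutorResponse:
    """Stand-in for the chain currency that wraps an AgentResponse."""
    executor_id: str
    agent_response: AgentResponse


def approved_payload(original: AgentExecutorResponse, *, terminal: bool):
    """Return what the approval step yields once the user approves."""
    if terminal:
        # Terminator: unwrap so the workflow's output is an AgentResponse.
        return original.agent_response
    # Intermediate: keep the wrapper so the next participant can consume it.
    return original
```

In the diff itself the two behaviors live in sibling classes rather than behind a flag, for the typing reason given in the docstring above.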
class AgentApprovalExecutor(WorkflowExecutor):
"""Executor for enabling scenarios requiring agent approval in an orchestration.
@@ -122,22 +178,47 @@ class AgentApprovalExecutor(WorkflowExecutor):
self,
agent: SupportsAgentRun,
context_mode: Literal["full", "last_agent", "custom"] | None = None,
*,
allow_direct_output: bool = False,
) -> None:
"""Initialize the AgentApprovalExecutor.
Args:
agent: The agent protocol to use for generating responses.
context_mode: The mode for providing context to the agent.
allow_direct_output: When True, the inner agent's response is yielded as the
wrapping workflow's output (rather than forwarded as a message to a
downstream participant). Set this when this executor is the workflow's
terminator — so the user-approved final response surfaces as a workflow
``output`` event.
"""
self._context_mode: Literal["full", "last_agent", "custom"] | None = context_mode
self._description = agent.description
super().__init__(workflow=self._build_workflow(agent), id=resolve_agent_id(agent), propagate_request=True)
super().__init__(
workflow=self._build_workflow(agent, terminal=allow_direct_output),
id=resolve_agent_id(agent),
propagate_request=True,
allow_direct_output=allow_direct_output,
)
def _build_workflow(self, agent: SupportsAgentRun) -> Workflow:
"""Build the internal workflow for the AgentApprovalExecutor."""
agent_executor = AgentExecutor(agent, context_mode=self._context_mode)
request_info_executor = AgentRequestInfoExecutor(id="agent_request_info_executor")
def _build_workflow(self, agent: SupportsAgentRun, *, terminal: bool) -> Workflow:
"""Build the internal workflow for the AgentApprovalExecutor.
Picks the right ``AgentRequestInfoExecutor`` variant for the role this approval flow
plays in the outer workflow:
- Intermediate (``terminal=False``): inner workflow yields ``AgentExecutorResponse``
so the next outer ``AgentExecutor`` participant can consume it via ``from_response``.
- Terminator (``terminal=True``): inner workflow yields ``AgentResponse`` so the outer
workflow's terminal output matches the orchestration contract.
"""
agent_executor = AgentExecutor(
agent,
context_mode=self._context_mode,
)
request_info_cls = _TerminalAgentRequestInfoExecutor if terminal else AgentRequestInfoExecutor
request_info_executor = request_info_cls(id="agent_request_info_executor")
return (
WorkflowBuilder(start_executor=agent_executor)
@@ -2,50 +2,24 @@
"""Sequential builder for agent/executor workflows with shared conversation context.
This module provides a high-level, agent-focused API to assemble a sequential
workflow where:
- Participants are provided as SupportsAgentRun or Executor instances via `participants=[...]`
- A shared conversation context (list[Message]) is passed along the chain
- Agents append their assistant messages to the context
- Custom executors can transform or summarize and return a refined context
- The workflow finishes with the final context produced by the last participant
Participants (SupportsAgentRun or Executor instances) run in order, sharing a
conversation along the chain. Agents append their assistant messages; custom executors
transform and return a refined `list[Message]`.
Typical wiring:
input -> _InputToConversation -> participant1 -> (agent? -> _ResponseToConversation) ->
... -> participantN -> _EndWithConversation
Wiring: input -> _InputToConversation -> participant1 -> ... -> participantN
Notes:
- Participants can mix SupportsAgentRun and Executor objects
- Agents are auto-wrapped by WorkflowBuilder as AgentExecutor (unless already wrapped)
- AgentExecutor produces AgentExecutorResponse; _ResponseToConversation converts this to list[Message]
- Non-agent executors must define a handler that consumes `list[Message]` and sends back
the updated `list[Message]` via their workflow context
Why include the small internal adapter executors?
- Input normalization ("input-conversation"): ensures the workflow always starts with a
`list[Message]` regardless of whether callers pass a `str`, a single `Message`,
or a list. This keeps the first hop strongly typed and avoids boilerplate in participants.
- Agent response adaptation ("to-conversation:<participant>"): agents (via AgentExecutor)
emit `AgentExecutorResponse`. The adapter converts that to a `list[Message]`
using `full_conversation` so original prompts aren't lost when chaining.
- Result output ("end"): yields the final conversation list and the workflow becomes idle
giving a consistent terminal payload shape for both agents and custom executors.
These adapters are first-class executors by design so they are type-checked at edges,
observable (ExecutorInvoke/Completed events), and easily testable/reusable. Their IDs are
deterministic and self-describing (for example, "to-conversation:writer") to reduce event-log
confusion and to mirror how the concurrent builder uses explicit dispatcher/aggregator nodes.
The workflow's final `output` event is the last participant's `yield_output(...)`. For
agent terminators that is an `AgentResponse` (or per-chunk `AgentResponseUpdate`s when
streaming). For custom-executor terminators, the executor itself yields whatever it
produces — by convention an `AgentResponse` so downstream consumers see a uniform shape.
"""
import logging
from collections.abc import Sequence
from typing import Any, Literal
from typing import Literal
from agent_framework import Message, SupportsAgentRun
from agent_framework._workflows._agent_executor import (
AgentExecutor,
AgentExecutorResponse,
)
from agent_framework._workflows._agent_executor import AgentExecutor
from agent_framework._workflows._agent_utils import resolve_agent_id
from agent_framework._workflows._checkpoint import CheckpointStorage
from agent_framework._workflows._executor import (
@@ -78,34 +52,6 @@ class _InputToConversation(Executor):
await ctx.send_message(normalize_messages_input(messages))
class _EndWithConversation(Executor):
"""Terminates the workflow by emitting the final conversation context."""
@handler
async def end_with_messages(
self,
conversation: list[Message],
ctx: WorkflowContext[Any, list[Message]],
) -> None:
"""Handler for ending with a list of Message.
This is used when the last participant is a custom executor.
"""
await ctx.yield_output(list(conversation))
@handler
async def end_with_agent_executor_response(
self,
response: AgentExecutorResponse,
ctx: WorkflowContext[Any, list[Message] | None],
) -> None:
"""Handle case where last participant is an agent.
The agent is wrapped by AgentExecutor and emits AgentExecutorResponse.
"""
await ctx.yield_output(response.full_conversation)
class SequentialBuilder:
r"""High-level builder for sequential agent/executor workflows with shared context.
@@ -155,7 +101,9 @@ class SequentialBuilder:
chain_only_agent_responses: If True, only agent responses are chained between agents.
By default, the full conversation context is passed to the next agent. This also applies
to Executor -> Agent transitions if the executor sends `AgentExecutorResponse`.
intermediate_outputs: If True, enables intermediate outputs from agent participants.
intermediate_outputs: If True, every participant's `yield_output` surfaces as a
workflow `output` event in addition to the terminator's. By default (False) only
the last participant's output surfaces.
"""
self._participants: list[SupportsAgentRun | Executor] = []
self._checkpoint_storage: CheckpointStorage | None = checkpoint_storage
@@ -225,7 +173,14 @@ class SequentialBuilder:
return self
def _resolve_participants(self) -> list[Executor]:
"""Resolve participant instances into Executor objects."""
"""Resolve participant instances into Executor objects.
Wraps `SupportsAgentRun` participants as `AgentExecutor` (or `AgentApprovalExecutor`
when request-info is enabled for that participant). The last participant, when wrapped
as `AgentApprovalExecutor`, is constructed with `allow_direct_output=True` so the
approved response surfaces as the workflow's output event instead of being forwarded
as a message that has nowhere to go.
"""
if not self._participants:
raise ValueError("No participants provided. Pass participants to the constructor.")
@@ -235,8 +190,9 @@ class SequentialBuilder:
"last_agent" if self._chain_only_agent_responses else None
)
last_idx = len(participants) - 1
executors: list[Executor] = []
for p in participants:
for idx, p in enumerate(participants):
if isinstance(p, Executor):
executors.append(p)
elif isinstance(p, SupportsAgentRun):
@@ -244,7 +200,13 @@ class SequentialBuilder:
not self._request_info_filter or resolve_agent_id(p) in self._request_info_filter
):
# Handle request info enabled agents
executors.append(AgentApprovalExecutor(p, context_mode=context_mode))
executors.append(
AgentApprovalExecutor(
p,
context_mode=context_mode,
allow_direct_output=(idx == last_idx),
)
)
else:
executors.append(AgentExecutor(p, context_mode=context_mode))
else:
@@ -256,17 +218,18 @@ class SequentialBuilder:
"""Build and validate the sequential workflow.
Wiring pattern:
- _InputToConversation normalizes the initial input into list[Message]
- For each participant in order:
- If Agent (or AgentExecutor): pass conversation to the agent, then optionally
route through a request info interceptor, then convert response to conversation
via _ResponseToConversation
- Else (custom Executor): pass conversation directly to the executor
- _EndWithConversation yields the final conversation and the workflow becomes idle
- `_InputToConversation` normalizes the initial input into `list[Message]`.
- Each participant runs in order:
- `AgentExecutor`: receives the conversation / `AgentExecutorResponse` and
forwards an `AgentExecutorResponse` downstream.
- Custom `Executor`: receives `list[Message]` and forwards `list[Message]`.
If used as the terminator, it must call `ctx.yield_output(AgentResponse(...))`
instead of `ctx.send_message(...)` — its yield becomes the workflow's output.
- The last participant is registered as the workflow's `output_executor`, so the
terminator's own `yield_output` is the workflow's terminal output (`AgentResponse`,
or per-chunk `AgentResponseUpdate` when streaming).
"""
# Internal nodes
input_conv = _InputToConversation(id="input-conversation")
end = _EndWithConversation(id="end")
# Resolve participants and participant factories to executors
participants: list[Executor] = self._resolve_participants()
@@ -274,15 +237,12 @@ class SequentialBuilder:
builder = WorkflowBuilder(
start_executor=input_conv,
checkpoint_storage=self._checkpoint_storage,
output_executors=[end] if not self._intermediate_outputs else None,
output_executors=[participants[-1]] if not self._intermediate_outputs else None,
)
# Start of the chain is the input normalizer
prior: Executor | SupportsAgentRun = input_conv
for p in participants:
builder.add_edge(prior, p)
prior = p
# Terminate with the final conversation
builder.add_edge(prior, end)
return builder.build()
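The effect of `output_executors=[participants[-1]] if not self._intermediate_outputs else None` can be pictured with a toy filter. This is not the `WorkflowBuilder` implementation; `surfaced_outputs` and the executor ids are hypothetical, and the sketch assumes that `None` means "no filtering", as the `intermediate_outputs` docstring suggests.

```python
from typing import Any, Iterable, List, Optional, Tuple


def surfaced_outputs(
    yields: Iterable[Tuple[str, Any]],
    output_executors: Optional[List[str]] = None,
) -> List[Any]:
    """Return the payloads that would surface as workflow output events.

    `yields` is a sequence of (executor_id, payload) pairs in emission order.
    """
    if output_executors is None:
        # No filter: every participant's yield surfaces.
        return [payload for _, payload in yields]
    allowed = set(output_executors)
    # Only yields from registered output executors surface; others are dropped.
    return [payload for executor_id, payload in yields if executor_id in allowed]


# Two participants each yield once; "reviewer" is the last participant.
yields = [("writer", "draft"), ("reviewer", "final")]
terminal_only = surfaced_outputs(yields, output_executors=["reviewer"])
with_intermediate = surfaced_outputs(yields, output_executors=None)
```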
@@ -1,14 +1,21 @@
# Copyright (c) Microsoft. All rights reserved.
from typing import Any, cast
from collections.abc import AsyncIterable, Awaitable
from typing import Any, Literal, cast, overload
import pytest
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
AgentResponse,
AgentResponseUpdate,
AgentRunInputs,
AgentSession,
BaseAgent,
Content,
Executor,
Message,
ResponseStream,
WorkflowContext,
WorkflowRunState,
handler,
@@ -49,36 +56,26 @@ def test_concurrent_builder_rejects_duplicate_executors() -> None:
ConcurrentBuilder(participants=[a, b])
async def test_concurrent_default_aggregator_emits_single_user_and_assistants() -> None:
# Three synthetic agent executors
async def test_concurrent_default_aggregator_emits_assistants_only() -> None:
"""Default aggregator yields a single AgentResponse with one assistant message per participant.
The user prompt is intentionally not included — that belongs in the input, not the answer.
"""
e1 = _FakeAgentExec("agentA", "Alpha")
e2 = _FakeAgentExec("agentB", "Beta")
e3 = _FakeAgentExec("agentC", "Gamma")
wf = ConcurrentBuilder(participants=[e1, e2, e3]).build()
completed = False
output: list[Message] | None = None
async for ev in wf.run("prompt: hello world", stream=True):
if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
completed = True
elif ev.type == "output":
output = cast(list[Message], ev.data)
if completed and output is not None:
break
output_events = [ev for ev in await wf.run("prompt: hello world") if ev.type == "output"]
assert len(output_events) == 1
response = output_events[0].data
assert isinstance(response, AgentResponse)
assert completed
assert output is not None
messages: list[Message] = output
# Expect one user message + one assistant message per participant
assert len(messages) == 1 + 3
assert messages[0].role == "user"
assert "hello world" in messages[0].text
assistant_texts = {m.text for m in messages[1:]}
assert assistant_texts == {"Alpha", "Beta", "Gamma"}
assert all(m.role == "assistant" for m in messages[1:])
# Exactly one assistant message per participant; no user prompt.
assert len(response.messages) == 3
assert all(m.role == "assistant" for m in response.messages)
assert {m.text for m in response.messages} == {"Alpha", "Beta", "Gamma"}
async def test_concurrent_custom_aggregator_callback_is_used() -> None:
@@ -215,7 +212,7 @@ async def test_concurrent_checkpoint_resume_round_trip() -> None:
wf = ConcurrentBuilder(participants=list(participants), checkpoint_storage=storage).build()
baseline_output: list[Message] | None = None
baseline_output: AgentResponse | None = None
async for ev in wf.run("checkpoint concurrent", stream=True):
if ev.type == "output":
baseline_output = ev.data # type: ignore[assignment]
@@ -236,7 +233,7 @@ async def test_concurrent_checkpoint_resume_round_trip() -> None:
)
wf_resume = ConcurrentBuilder(participants=list(resumed_participants), checkpoint_storage=storage).build()
resumed_output: list[Message] | None = None
resumed_output: AgentResponse | None = None
async for ev in wf_resume.run(checkpoint_id=resume_checkpoint.checkpoint_id, stream=True):
if ev.type == "output":
resumed_output = ev.data # type: ignore[assignment]
@@ -247,8 +244,8 @@ async def test_concurrent_checkpoint_resume_round_trip() -> None:
break
assert resumed_output is not None
assert [m.role for m in resumed_output] == [m.role for m in baseline_output]
assert [m.text for m in resumed_output] == [m.text for m in baseline_output]
assert [m.role for m in resumed_output.messages] == [m.role for m in baseline_output.messages]
assert [m.text for m in resumed_output.messages] == [m.text for m in baseline_output.messages]
async def test_concurrent_checkpoint_runtime_only() -> None:
@@ -258,7 +255,7 @@ async def test_concurrent_checkpoint_runtime_only() -> None:
agents = [_FakeAgentExec(id="agent1", reply_text="A1"), _FakeAgentExec(id="agent2", reply_text="A2")]
wf = ConcurrentBuilder(participants=agents).build()
baseline_output: list[Message] | None = None
baseline_output: AgentResponse | None = None
async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True):
if ev.type == "output":
baseline_output = ev.data # type: ignore[assignment]
@@ -278,7 +275,7 @@ async def test_concurrent_checkpoint_runtime_only() -> None:
resumed_agents = [_FakeAgentExec(id="agent1", reply_text="A1"), _FakeAgentExec(id="agent2", reply_text="A2")]
wf_resume = ConcurrentBuilder(participants=resumed_agents).build()
resumed_output: list[Message] | None = None
resumed_output: AgentResponse | None = None
async for ev in wf_resume.run(
checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage, stream=True
):
@@ -291,7 +288,7 @@ async def test_concurrent_checkpoint_runtime_only() -> None:
break
assert resumed_output is not None
assert [m.role for m in resumed_output] == [m.role for m in baseline_output]
assert [m.role for m in resumed_output.messages] == [m.role for m in baseline_output.messages]
async def test_concurrent_checkpoint_runtime_overrides_buildtime() -> None:
@@ -334,3 +331,46 @@ async def test_concurrent_builder_reusable_after_build_with_participants() -> No
assert builder._participants[0] is e1 # type: ignore
assert builder._participants[1] is e2 # type: ignore
class _EchoAgent(BaseAgent):
"""Simple agent that appends a single assistant message with its name."""
@overload
def run(
self,
messages: AgentRunInputs | None = ...,
*,
stream: Literal[False] = ...,
session: AgentSession | None = ...,
**kwargs: Any,
) -> Awaitable[AgentResponse[Any]]: ...
@overload
def run(
self,
messages: AgentRunInputs | None = ...,
*,
stream: Literal[True],
session: AgentSession | None = ...,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...
def run(
self,
messages: AgentRunInputs | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
if stream:
async def _stream() -> AsyncIterable[AgentResponseUpdate]:
yield AgentResponseUpdate(contents=[Content.from_text(text=f"{self.name} reply")])
return ResponseStream(_stream(), finalizer=AgentResponse.from_updates)
async def _run() -> AgentResponse:
return AgentResponse(messages=[Message("assistant", [f"{self.name} reply"])])
return _run()
@@ -238,18 +238,16 @@ async def test_group_chat_builder_basic_flow() -> None:
orchestrator_name="manager",
).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("coordinate task", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
assert len(outputs) == 1
assert len(outputs[0]) >= 1
# Check that both agents contributed
authors = {msg.author_name for msg in outputs[0] if msg.author_name in ["alpha", "beta"]}
assert len(authors) == 2
# Exactly one terminal `output` event = the orchestrator's completion AgentResponseUpdate
# (mode-aware: streaming yields a single update chunk for the synthesized message).
assert len(updates) == 1
# The completion message is authored by the orchestrator.
assert updates[0].author_name == "manager"
async def test_group_chat_as_agent_accepts_conversation() -> None:
@@ -283,18 +281,16 @@ async def test_agent_manager_handles_concatenated_json_output() -> None:
orchestrator_agent=manager,
).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("coordinate task", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
assert outputs
conversation = outputs[-1]
assert any(msg.author_name == "agent" and msg.text == "worker response" for msg in conversation)
assert conversation[-1].author_name == manager.name
assert conversation[-1].text == "concatenated manager final"
assert updates
final_update = updates[-1]
# Terminal update is the orchestrator's completion message.
assert final_update.author_name == manager.name
assert final_update.text == "concatenated manager final"
# Comprehensive tests for group chat functionality
@@ -400,20 +396,14 @@ class TestGroupChatWorkflow:
selection_func=selector,
).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("test task", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
# Should have terminated due to max_rounds, expect at least one output
assert len(outputs) >= 1
# The final message in the conversation should be about round limit
conversation = outputs[-1]
assert len(conversation) >= 1
final_output = conversation[-1]
assert "maximum number of rounds" in final_output.text.lower()
# Exactly one terminal output event = orchestrator's max-rounds completion update.
assert len(updates) == 1
assert "maximum number of rounds" in (updates[0].text or "").lower()
async def test_termination_condition_halts_conversation(self) -> None:
"""Test that a custom termination condition stops the workflow."""
@@ -433,20 +423,89 @@ class TestGroupChatWorkflow:
selection_func=selector,
).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("test task", stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
assert updates, "Expected termination to yield output"
# Terminal update is the orchestrator's completion message only.
assert "termination condition" in (updates[-1].text or "").lower()
async def test_termination_yields_update_in_streaming(self) -> None:
"""In streaming mode, the orchestrator's terminal completion surfaces as `AgentResponseUpdate`.
Mirrors AgentExecutor's mode-aware behavior: streaming workflows produce per-chunk
`AgentResponseUpdate` events; the synthesized termination message is logically a
single chunk, so it should be a single `AgentResponseUpdate`.
"""
def selector(state: GroupChatState) -> str:
return "agent"
def termination_condition(conversation: list[Message]) -> bool:
replies = [msg for msg in conversation if msg.role == "assistant" and msg.author_name == "agent"]
return len(replies) >= 2
workflow = GroupChatBuilder(
participants=[StubAgent("agent", "response")],
termination_condition=termination_condition,
selection_func=selector,
).build()
terminal: AgentResponseUpdate | None = None
async for event in workflow.run("test task", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
terminal = event.data # last output event wins
assert outputs, "Expected termination to yield output"
conversation = outputs[-1]
agent_replies = [msg for msg in conversation if msg.author_name == "agent" and msg.role == "assistant"]
assert len(agent_replies) == 2
final_output = conversation[-1]
# The orchestrator uses its ID as author_name by default
assert "termination condition" in final_output.text.lower()
assert isinstance(terminal, AgentResponseUpdate), (
f"Expected AgentResponseUpdate in streaming mode, got {type(terminal).__name__}"
)
assert "termination condition" in (terminal.text or "").lower()
async def test_termination_yields_response_in_non_streaming(self) -> None:
"""In non-streaming mode, the orchestrator's terminal completion surfaces as `AgentResponse`."""
def selector(state: GroupChatState) -> str:
return "agent"
def termination_condition(conversation: list[Message]) -> bool:
replies = [msg for msg in conversation if msg.role == "assistant" and msg.author_name == "agent"]
return len(replies) >= 2
workflow = GroupChatBuilder(
participants=[StubAgent("agent", "response")],
termination_condition=termination_condition,
selection_func=selector,
).build()
events = await workflow.run("test task")
outputs = [ev for ev in events if ev.type == "output"]
assert len(outputs) == 1
assert isinstance(outputs[0].data, AgentResponse)
assert "termination condition" in outputs[0].data.messages[-1].text.lower()
async def test_max_rounds_yields_update_in_streaming(self) -> None:
"""Max-rounds completion in streaming mode surfaces as `AgentResponseUpdate`."""
def selector(state: GroupChatState) -> str:
return "agent"
workflow = GroupChatBuilder(
participants=[StubAgent("agent", "response")],
max_rounds=2,
selection_func=selector,
).build()
terminal: AgentResponseUpdate | None = None
async for event in workflow.run("test task", stream=True):
if event.type == "output":
terminal = event.data
assert isinstance(terminal, AgentResponseUpdate), (
f"Expected AgentResponseUpdate in streaming mode, got {type(terminal).__name__}"
)
assert "maximum number of rounds" in (terminal.text or "").lower()
async def test_termination_condition_agent_manager_finalizes(self) -> None:
"""Test that termination condition with agent orchestrator produces default termination message."""
@@ -459,17 +518,15 @@ class TestGroupChatWorkflow:
orchestrator_agent=manager,
).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("test task", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
assert outputs, "Expected termination to yield output"
conversation = outputs[-1]
assert conversation[-1].text == BaseGroupChatOrchestrator.TERMINATION_CONDITION_MET_MESSAGE
assert conversation[-1].author_name == manager.name
assert updates, "Expected termination to yield output"
final_update = updates[-1]
assert final_update.text == BaseGroupChatOrchestrator.TERMINATION_CONDITION_MET_MESSAGE
assert final_update.author_name == manager.name
async def test_unknown_participant_error(self) -> None:
"""Test that unknown participant selection raises error."""
@@ -505,14 +562,12 @@ class TestCheckpointing:
selection_func=selector,
).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("test task", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
assert len(outputs) == 1 # Should complete normally
assert len(updates) == 1 # Should complete normally
class TestConversationHandling:
@@ -546,14 +601,12 @@ class TestConversationHandling:
workflow = GroupChatBuilder(participants=[agent], max_rounds=1, selection_func=selector).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("test string", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
assert len(outputs) == 1
assert len(updates) == 1
async def test_handle_chat_message_input(self) -> None:
"""Test handling Message input directly."""
@@ -569,14 +622,12 @@ class TestConversationHandling:
workflow = GroupChatBuilder(participants=[agent], max_rounds=1, selection_func=selector).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run(task_message, stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
assert len(outputs) == 1
assert len(updates) == 1
async def test_handle_conversation_list_input(self) -> None:
"""Test handling conversation list preserves context."""
@@ -595,14 +646,12 @@ class TestConversationHandling:
workflow = GroupChatBuilder(participants=[agent], max_rounds=1, selection_func=selector).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run(conversation, stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
assert len(outputs) == 1
assert len(updates) == 1
class TestRoundLimitEnforcement:
@@ -625,20 +674,14 @@ class TestRoundLimitEnforcement:
selection_func=selector,
).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("test", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
# Should have at least one output (the round limit message)
assert len(outputs) >= 1
# The last message in the conversation should be about round limit
conversation = outputs[-1]
assert len(conversation) >= 1
final_output = conversation[-1]
assert "maximum number of rounds" in final_output.text.lower()
# Exactly one terminal output event = orchestrator's max-rounds completion update.
assert len(updates) == 1
assert "maximum number of rounds" in (updates[0].text or "").lower()
async def test_round_limit_in_ingest_participant_message(self) -> None:
"""Test round limit enforcement after participant response."""
@@ -658,20 +701,14 @@ class TestRoundLimitEnforcement:
selection_func=selector,
).build()
outputs: list[list[Message]] = []
updates: list[AgentResponseUpdate] = []
async for event in workflow.run("test", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, list):
outputs.append(cast(list[Message], data))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
# Should have at least one output (the round limit message)
assert len(outputs) >= 1
# The last message in the conversation should be about round limit
conversation = outputs[-1]
assert len(conversation) >= 1
final_output = conversation[-1]
assert "maximum number of rounds" in final_output.text.lower()
# Exactly one terminal output event = orchestrator's max-rounds completion update.
assert len(updates) == 1
assert "maximum number of rounds" in (updates[0].text or "").lower()
async def test_group_chat_checkpoint_runtime_only() -> None:
@@ -684,17 +721,17 @@ async def test_group_chat_checkpoint_runtime_only() -> None:
wf = GroupChatBuilder(participants=[agent_a, agent_b], max_rounds=2, selection_func=selector).build()
baseline_output: list[Message] | None = None
baseline_update: AgentResponseUpdate | None = None
async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True):
if ev.type == "output":
baseline_output = cast(list[Message], ev.data) if isinstance(ev.data, list) else None # type: ignore
if ev.type == "output" and isinstance(ev.data, AgentResponseUpdate):
baseline_update = ev.data
if ev.type == "status" and ev.state in (
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
):
break
assert baseline_output is not None
assert baseline_update is not None
checkpoints = await storage.list_checkpoints(workflow_name=wf.name)
assert len(checkpoints) > 0, "Runtime-only checkpointing should have created checkpoints"
@@ -720,17 +757,17 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None:
checkpoint_storage=buildtime_storage,
selection_func=selector,
).build()
baseline_output: list[Message] | None = None
baseline_update: AgentResponseUpdate | None = None
async for ev in wf.run("override test", checkpoint_storage=runtime_storage, stream=True):
if ev.type == "output":
baseline_output = cast(list[Message], ev.data) if isinstance(ev.data, list) else None # type: ignore
if ev.type == "output" and isinstance(ev.data, AgentResponseUpdate):
baseline_update = ev.data
if ev.type == "status" and ev.state in (
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
):
break
assert baseline_output is not None
assert baseline_update is not None
buildtime_checkpoints = await buildtime_storage.list_checkpoints(workflow_name=wf.name)
runtime_checkpoints = await runtime_storage.list_checkpoints(workflow_name=wf.name)
@@ -974,14 +1011,11 @@ async def test_group_chat_with_orchestrator_factory_returning_chat_agent():
outputs.append(event)
assert len(outputs) == 1
# The DynamicManagerAgent terminates after second call with final_message
final_messages = outputs[0].data
assert isinstance(final_messages, list)
assert any(
msg.text == "dynamic manager final"
for msg in cast(list[Message], final_messages)
if msg.author_name == "dynamic_manager"
)
# Streaming mode: terminal yield is AgentResponseUpdate. The DynamicManagerAgent
# terminates after second call with final_message.
final_update = outputs[0].data
assert isinstance(final_update, AgentResponseUpdate)
assert final_update.text == "dynamic manager final"
def test_group_chat_with_orchestrator_factory_returning_base_orchestrator():
@@ -9,6 +9,8 @@ from unittest.mock import AsyncMock, MagicMock
import pytest
from agent_framework import (
Agent,
AgentResponse,
AgentResponseUpdate,
ChatResponse,
ChatResponseUpdate,
Content,
@@ -856,10 +858,15 @@ async def test_autonomous_mode_yields_output_without_user_request():
outputs = [ev for ev in events if ev.type == "output"]
assert outputs, "Autonomous mode should yield a workflow output"
final_conversation = outputs[-1].data
assert isinstance(final_conversation, list)
conversation_list = cast(list[Message], final_conversation)
assert any(msg.role == "assistant" and (msg.text or "").startswith("specialist reply") for msg in conversation_list)
# Per-agent activity surfaces as `output` events from each HandoffAgentExecutor as they
# speak. Handoff has no orchestrator that produces a separate "answer" — the conversation
# IS the result. In streaming mode payloads are AgentResponseUpdate; combined text should
# contain the specialist's reply.
payloads = [ev.data for ev in outputs if isinstance(ev.data, (AgentResponse, AgentResponseUpdate))]
combined = " ".join(
getattr(p, "text", None) or " ".join(m.text for m in getattr(p, "messages", [])) for p in payloads
)
assert "specialist reply" in combined
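The text-gathering expression above has to cope with two payload shapes: a streaming chunk that carries `text` directly, and a complete response that carries `messages`. A minimal sketch of that normalization follows. `StubMessage`, `StubResponse`, `StubUpdate`, and `payload_text` are hypothetical stand-ins invented for illustration; they are not the `agent_framework` types.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class StubMessage:
    """Hypothetical stand-in for a chat message with a text body."""

    text: str


@dataclass
class StubResponse:
    """Hypothetical stand-in for a complete response: a list of messages."""

    messages: list[StubMessage] = field(default_factory=list)


@dataclass
class StubUpdate:
    """Hypothetical stand-in for a streaming chunk: one optional text fragment."""

    text: str | None = None


def payload_text(payload: object) -> str:
    """Return the text of either payload shape, or an empty string if there is none."""
    if isinstance(payload, StubUpdate):
        return payload.text or ""
    if isinstance(payload, StubResponse):
        return " ".join(m.text for m in payload.messages)
    return ""


# Mixed payloads, as a handoff run might surface them across modes.
payloads = [
    StubUpdate("specialist "),
    StubUpdate(None),
    StubResponse([StubMessage("reply"), StubMessage("done")]),
]
combined = " ".join(payload_text(p) for p in payloads)
```

Dispatching on the payload type keeps the empty-chunk case explicit instead of relying on `getattr` fallbacks.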
async def test_autonomous_mode_resumes_user_input_on_turn_limit():
@@ -923,14 +930,10 @@ async def test_handoff_async_termination_condition() -> None:
stream=True, responses={requests[-1].request_id: [Message(role="user", contents=["Second user message"])]}
)
)
outputs = [ev for ev in events if ev.type == "output"]
assert len(outputs) == 1
final_conversation = outputs[0].data
assert isinstance(final_conversation, list)
final_conv_list = cast(list[Message], final_conversation)
user_messages = [msg for msg in final_conv_list if msg.role == "user"]
assert len(user_messages) == 2
# Resume run terminates without further agent activity once the second user message
# satisfies the termination condition. The workflow returns to idle cleanly.
idle_states = [ev for ev in events if ev.type == "status" and ev.state == WorkflowRunState.IDLE]
assert idle_states, "Workflow should become idle after termination"
assert termination_call_count > 0
@@ -990,8 +993,9 @@ async def test_handoff_terminates_without_request_info_when_latest_response_meet
outputs = [event for event in events if event.type == "output"]
assert outputs
conversation_outputs = [event for event in outputs if isinstance(event.data, list)]
assert len(conversation_outputs) == 1
# Per-agent activity surfaces as output events (AgentResponseUpdate in streaming mode).
agent_payloads = [event for event in outputs if isinstance(event.data, (AgentResponse, AgentResponseUpdate))]
assert len(agent_payloads) >= 1
async def test_tool_choice_preserved_from_agent_config():
@@ -190,24 +190,82 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None:
assert isinstance(workflow, Workflow)
outputs: list[Message] = []
updates: list[AgentResponseUpdate] = []
orchestrator_event_count = 0
async for event in workflow.run("compose summary", stream=True):
if event.type == "output":
msg = event.data
if isinstance(msg, list):
outputs.extend(cast(list[Message], msg))
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
updates.append(event.data)
elif event.type == "magentic_orchestrator":
orchestrator_event_count += 1
assert outputs, "Expected a final output message"
assert len(outputs) >= 1
final = outputs[-1]
assert updates, "Expected a final output update"
final = updates[-1]
assert final.text == manager.FINAL_ANSWER
assert final.author_name == manager.name
assert orchestrator_event_count > 0, "Expected orchestrator events to be emitted"
async def test_magentic_final_answer_yields_update_in_streaming() -> None:
"""In streaming mode, Magentic's manager final-answer surfaces as `AgentResponseUpdate`.
Mirrors AgentExecutor's mode-aware behavior: streaming workflows produce per-chunk
`AgentResponseUpdate` events; the synthesized final answer is logically a single chunk,
so it surfaces as a single `AgentResponseUpdate`.
"""
manager = FakeManager()
workflow = MagenticBuilder(
participants=[StubAgent(manager.next_speaker_name, "first draft")],
manager=manager,
).build()
terminal: AgentResponseUpdate | None = None
async for event in workflow.run("compose summary", stream=True):
if event.type == "output":
terminal = event.data
assert isinstance(terminal, AgentResponseUpdate), (
f"Expected AgentResponseUpdate in streaming mode, got {type(terminal).__name__}"
)
assert terminal.text == manager.FINAL_ANSWER
assert terminal.author_name == manager.name
async def test_magentic_final_answer_yields_response_in_non_streaming() -> None:
"""In non-streaming mode, Magentic's manager final-answer surfaces as `AgentResponse`."""
manager = FakeManager()
workflow = MagenticBuilder(
participants=[StubAgent(manager.next_speaker_name, "first draft")],
manager=manager,
).build()
events = await workflow.run("compose summary")
outputs = [ev for ev in events if ev.type == "output"]
assert len(outputs) == 1
assert isinstance(outputs[0].data, AgentResponse)
assert outputs[0].data.messages[-1].text == manager.FINAL_ANSWER
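The two tests above pin down a mode-aware contract: the same synthesized final answer surfaces as a single update when streaming and as a single response otherwise. The sketch below models that with stand-in types only. `StubWorkflow`, `run_stream`, `StubEvent`, `StubUpdate`, and `StubResponse` are hypothetical names; the real workflow selects the mode with `run(..., stream=True)` rather than a separate method.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubUpdate:
    """Hypothetical streaming chunk."""

    text: str


@dataclass
class StubResponse:
    """Hypothetical complete response holding message texts."""

    messages: list


@dataclass
class StubEvent:
    """Hypothetical workflow event: a type tag plus a payload."""

    type: str
    data: object


class StubWorkflow:
    """Hypothetical orchestrator that produces one synthesized final answer."""

    def __init__(self, final_answer: str) -> None:
        self._final_answer = final_answer

    async def run_stream(self):
        # Streaming mode: the final answer is logically a single chunk.
        yield StubEvent("status", "running")
        yield StubEvent("output", StubUpdate(self._final_answer))
        yield StubEvent("status", "idle")

    async def run(self) -> list:
        # Non-streaming mode: the final answer is one complete response.
        return [
            StubEvent("status", "running"),
            StubEvent("output", StubResponse([self._final_answer])),
            StubEvent("status", "idle"),
        ]


async def terminal_payloads(wf: StubWorkflow):
    """Collect the payloads of `output` events from both modes."""
    streamed = [ev.data async for ev in wf.run_stream() if ev.type == "output"]
    batch = [ev.data for ev in await wf.run() if ev.type == "output"]
    return streamed, batch


streamed, batch = asyncio.run(terminal_payloads(StubWorkflow("final answer")))
```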
async def test_magentic_limit_termination_yields_update_in_streaming() -> None:
"""In streaming mode, Magentic's round-limit termination surfaces as `AgentResponseUpdate`."""
manager = FakeManager(max_round_count=1)
workflow = MagenticBuilder(
participants=[DummyExec(name=manager.next_speaker_name)],
manager=manager,
).build()
terminal: AgentResponseUpdate | None = None
async for event in workflow.run("round limit test", stream=True):
if event.type == "output":
terminal = event.data
assert isinstance(terminal, AgentResponseUpdate), (
f"Expected AgentResponseUpdate in streaming mode, got {type(terminal).__name__}"
)
# Either the final answer OR the round-limit termination message — both are valid terminal states
# for max_round_count=1; the precise one depends on FakeManager's progression.
assert terminal.text
async def test_magentic_as_agent_does_not_accept_conversation() -> None:
manager = FakeManager()
writer = StubAgent(manager.next_speaker_name, "summary response")
@@ -250,7 +308,7 @@ async def test_magentic_workflow_plan_review_approval_to_completion():
assert isinstance(req_event.data, MagenticPlanReviewRequest)
completed = False
output: list[Message] | None = None
output: AgentResponseUpdate | None = None
async for ev in wf.run(stream=True, responses={req_event.request_id: req_event.data.approve()}):
if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
completed = True
@@ -261,8 +319,8 @@ async def test_magentic_workflow_plan_review_approval_to_completion():
assert completed
assert output is not None
assert isinstance(output, list)
assert all(isinstance(msg, Message) for msg in output)
# Streaming mode: terminal output is AgentResponseUpdate.
assert isinstance(output, AgentResponseUpdate)
async def test_magentic_plan_review_with_revise():
@@ -333,14 +391,12 @@ async def test_magentic_orchestrator_round_limit_produces_partial_result():
None,
)
assert idle_status is not None
# Check that we got workflow output via WorkflowEvent with type "output"
# Streaming mode: terminal output is AgentResponseUpdate.
output_event = next((e for e in events if e.type == "output"), None)
assert output_event is not None
data = output_event.data
assert isinstance(data, list)
assert len(data) > 0 # type: ignore
assert data[-1].role == "assistant" # type: ignore
assert all(isinstance(msg, Message) for msg in data) # type: ignore
assert isinstance(data, AgentResponseUpdate)
assert data.role == "assistant"
async def test_magentic_checkpoint_resume_round_trip():
@@ -578,7 +634,7 @@ async def _collect_agent_responses_setup(participant: SupportsAgentRun) -> list[
# Run a bounded stream to allow one invoke and then completion
events: list[WorkflowEvent] = []
async for ev in wf.run("task", stream=True): # plan review disabled
async for ev in wf.run("task", stream=True):
events.append(ev)
# Capture streaming updates (type="output" with AgentResponseUpdate data)
if ev.type == "output" and isinstance(ev.data, AgentResponseUpdate):
@@ -753,11 +809,9 @@ async def test_magentic_stall_and_reset_reach_limits():
assert idle_status is not None
output_event = next((e for e in events if e.type == "output"), None)
assert output_event is not None
assert isinstance(output_event.data, list)
assert all(isinstance(msg, Message) for msg in output_event.data) # type: ignore
assert len(output_event.data) > 0 # type: ignore
assert output_event.data[-1].text is not None # type: ignore
assert output_event.data[-1].text == "Workflow terminated due to reaching maximum reset count." # type: ignore
# Streaming mode: terminal output is AgentResponseUpdate.
assert isinstance(output_event.data, AgentResponseUpdate)
assert output_event.data.text == "Workflow terminated due to reaching maximum reset count."
async def test_magentic_checkpoint_runtime_only() -> None:
@@ -22,6 +22,7 @@ from agent_framework import (
)
from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage
from agent_framework.orchestrations import SequentialBuilder
from typing_extensions import Never
class _EchoAgent(BaseAgent):
@@ -67,16 +68,20 @@ class _EchoAgent(BaseAgent):
return _run()
class _SummarizerExec(Executor):
"""Custom executor that summarizes by appending a short assistant message."""
class _SummarizerTerminator(Executor):
"""Custom-executor terminator that yields a synthesized summary as the workflow's final answer."""
@handler
async def summarize(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext[list[Message]]) -> None:
async def summarize(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, AgentResponse],
) -> None:
conversation = agent_response.full_conversation or []
user_texts = [m.text for m in conversation if m.role == "user"]
agents = [m.author_name or m.role for m in conversation if m.role == "assistant"]
summary = Message("assistant", [f"Summary of users:{len(user_texts)} agents:{len(agents)}"])
await ctx.send_message(list(conversation) + [summary])
await ctx.yield_output(AgentResponse(messages=[summary]))
class _InvalidExecutor(Executor):
@@ -98,58 +103,91 @@ def test_sequential_builder_validation_rejects_invalid_executor() -> None:
SequentialBuilder(participants=[_EchoAgent(id="agent1", name="A1"), _InvalidExecutor(id="invalid")]).build()
async def test_sequential_agents_append_to_context() -> None:
async def test_sequential_streaming_yields_only_last_agent_updates() -> None:
"""Streaming mode surfaces only the last agent's AgentResponseUpdate chunks as outputs.
Intermediate agents do NOT emit `output` events; only the last agent (the workflow's
output_executor) emits chunks of the final answer.
"""
a1 = _EchoAgent(id="agent1", name="A1")
a2 = _EchoAgent(id="agent2", name="A2")
wf = SequentialBuilder(participants=[a1, a2]).build()
completed = False
output: list[Message] | None = None
update_events: list[AgentResponseUpdate] = []
async for ev in wf.run("hello sequential", stream=True):
if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
completed = True
elif ev.type == "output":
output = ev.data # type: ignore[assignment]
if completed and output is not None:
update_events.append(ev.data) # type: ignore[arg-type]
if completed:
break
assert completed
assert output is not None
assert isinstance(output, list)
msgs: list[Message] = output
assert len(msgs) == 3
assert msgs[0].role == "user" and "hello sequential" in msgs[0].text
assert msgs[1].role == "assistant" and (msgs[1].author_name == "A1" or True)
assert msgs[2].role == "assistant" and (msgs[2].author_name == "A2" or True)
assert "A1 reply" in msgs[1].text
assert "A2 reply" in msgs[2].text
# Only the last agent's streaming chunks surface as `output` events.
assert update_events, "Expected at least one streaming update from the last agent"
for upd in update_events:
assert isinstance(upd, AgentResponseUpdate)
combined_text = "".join(u.text for u in update_events if hasattr(u, "text"))
assert "A2 reply" in combined_text
assert "A1 reply" not in combined_text
async def test_sequential_non_streaming_yields_only_last_agent_response() -> None:
"""Non-streaming mode emits a single `output` event with the last agent's AgentResponse."""
a1 = _EchoAgent(id="agent1", name="A1")
a2 = _EchoAgent(id="agent2", name="A2")
wf = SequentialBuilder(participants=[a1, a2]).build()
output_events = [ev for ev in await wf.run("hello sequential") if ev.type == "output"]
assert len(output_events) == 1
response = output_events[0].data
assert isinstance(response, AgentResponse)
assert all(m.role == "assistant" for m in response.messages)
combined = " ".join(m.text for m in response.messages)
assert "A2 reply" in combined
assert "A1 reply" not in combined
async def test_sequential_as_agent_returns_only_last_agent_response() -> None:
"""`workflow.as_agent().run(prompt)` returns ONLY the last agent's messages — not the user
input or earlier agents' replies. This is the core fix for the orchestration-as-agent
output contract."""
a1 = _EchoAgent(id="agent1", name="A1")
a2 = _EchoAgent(id="agent2", name="A2")
agent = SequentialBuilder(participants=[a1, a2]).build().as_agent()
response = await agent.run("hello as_agent")
assert isinstance(response, AgentResponse)
# Only the last agent's reply — no user prompt, no agent1 messages.
combined = " ".join(m.text for m in response.messages)
assert "A2 reply" in combined
assert "A1 reply" not in combined
assert "hello as_agent" not in combined
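The contract this test checks can be pictured as a filter over workflow events: only the terminal `output` reaches the caller, and earlier replies appear only when intermediate outputs are requested. The sketch below is a simplification under stated assumptions. `StubEvent` and `agent_view` are hypothetical, and the real builder decides whether `data` events are emitted at all instead of filtering them afterwards.

```python
from dataclasses import dataclass


@dataclass
class StubEvent:
    """Hypothetical event: "data" for intermediate replies, "output" for the final answer."""

    type: str
    author: str
    text: str


def agent_view(events: list, intermediate_outputs: bool = False) -> list:
    """Return the messages an as_agent() caller would see, in emission order."""
    wanted = {"output", "data"} if intermediate_outputs else {"output"}
    return [f"[{ev.author}] {ev.text}" for ev in events if ev.type in wanted]


events = [
    StubEvent("data", "writer", "Go electric, save big"),
    StubEvent("output", "reviewer", "Catchy and straightforward!"),
]

final_only = agent_view(events)
with_intermediate = agent_view(events, intermediate_outputs=True)
```

Here `final_only` holds just the reviewer's message, while `with_intermediate` also includes the writer's earlier reply.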
async def test_sequential_with_custom_executor_summary() -> None:
"""A custom-executor terminator yields its own AgentResponse — that becomes the workflow output.
Custom executors used as the terminator must call `ctx.yield_output(AgentResponse(...))`
directly (rather than `ctx.send_message(list[Message])` like an intermediate executor would),
because the terminator IS the workflow's output executor.
"""
a1 = _EchoAgent(id="agent1", name="A1")
summarizer = _SummarizerExec(id="summarizer")
summarizer = _SummarizerTerminator(id="summarizer")
wf = SequentialBuilder(participants=[a1, summarizer]).build()
completed = False
output: list[Message] | None = None
async for ev in wf.run("topic X", stream=True):
if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
completed = True
elif ev.type == "output":
output = ev.data
if completed and output is not None:
break
assert completed
assert output is not None
msgs: list[Message] = output
# Expect: [user, A1 reply, summary]
assert len(msgs) == 3
assert msgs[0].role == "user"
assert msgs[1].role == "assistant" and "A1 reply" in msgs[1].text
assert msgs[2].role == "assistant" and msgs[2].text.startswith("Summary of users:")
output_events = [ev for ev in await wf.run("topic X") if ev.type == "output"]
assert len(output_events) == 1
response = output_events[0].data
assert isinstance(response, AgentResponse)
assert len(response.messages) == 1
assert response.messages[0].role == "assistant"
assert response.messages[0].text.startswith("Summary of users:")
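The docstring above distinguishes two roles for a custom executor: an intermediate step forwards the conversation to the next participant, while a terminator yields the workflow's answer. A minimal sketch of that split, using a hypothetical recording context, is shown below. `StubContext`, `intermediate_step`, and `terminator_step` are illustrative only, and they are synchronous here although the real handlers are `async`.

```python
from dataclasses import dataclass, field


@dataclass
class StubContext:
    """Hypothetical context that records forwarded messages and yielded outputs."""

    forwarded: list = field(default_factory=list)
    outputs: list = field(default_factory=list)

    def send_message(self, conversation: list) -> None:
        # Intermediate path: hand the conversation to the next participant.
        self.forwarded.append(conversation)

    def yield_output(self, answer: str) -> None:
        # Terminal path: this value becomes the workflow's output.
        self.outputs.append(answer)


def intermediate_step(conversation: list, ctx: StubContext) -> None:
    """Append a reply and forward the updated conversation."""
    ctx.send_message(conversation + ["draft"])


def terminator_step(conversation: list, ctx: StubContext) -> None:
    """Synthesize a summary and yield it as the final answer."""
    users = sum(1 for m in conversation if m.startswith("user:"))
    ctx.yield_output(f"Summary of users:{users} total:{len(conversation)}")


ctx = StubContext()
intermediate_step(["user: topic X"], ctx)
terminator_step(ctx.forwarded[-1], ctx)
```

Only the terminator's summary lands in `ctx.outputs`; the forwarded conversation stays internal to the chain.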
async def test_sequential_checkpoint_resume_round_trip() -> None:
@@ -158,14 +196,14 @@ async def test_sequential_checkpoint_resume_round_trip() -> None:
initial_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
wf = SequentialBuilder(participants=list(initial_agents), checkpoint_storage=storage).build()
baseline_output: list[Message] | None = None
baseline_updates: list[AgentResponseUpdate] = []
async for ev in wf.run("checkpoint sequential", stream=True):
if ev.type == "output":
baseline_output = ev.data # type: ignore[assignment]
baseline_updates.append(ev.data) # type: ignore[arg-type]
if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
break
assert baseline_output is not None
assert baseline_updates
checkpoints = await storage.list_checkpoints(workflow_name=wf.name)
assert checkpoints
@@ -175,19 +213,20 @@ async def test_sequential_checkpoint_resume_round_trip() -> None:
resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
wf_resume = SequentialBuilder(participants=list(resumed_agents), checkpoint_storage=storage).build()
resumed_output: list[Message] | None = None
resumed_updates: list[AgentResponseUpdate] = []
async for ev in wf_resume.run(checkpoint_id=resume_checkpoint.checkpoint_id, stream=True):
if ev.type == "output":
resumed_output = ev.data # type: ignore[assignment]
resumed_updates.append(ev.data) # type: ignore[arg-type]
if ev.type == "status" and ev.state in (
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
):
break
assert resumed_output is not None
assert [m.role for m in resumed_output] == [m.role for m in baseline_output]
assert [m.text for m in resumed_output] == [m.text for m in baseline_output]
assert resumed_updates
baseline_text = "".join(u.text for u in baseline_updates if hasattr(u, "text"))
resumed_text = "".join(u.text for u in resumed_updates if hasattr(u, "text"))
assert baseline_text == resumed_text
async def test_sequential_checkpoint_runtime_only() -> None:
@@ -197,14 +236,14 @@ async def test_sequential_checkpoint_runtime_only() -> None:
agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
wf = SequentialBuilder(participants=list(agents)).build()
baseline_output: list[Message] | None = None
baseline_updates: list[AgentResponseUpdate] = []
async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True):
if ev.type == "output":
baseline_output = ev.data # type: ignore[assignment]
baseline_updates.append(ev.data) # type: ignore[arg-type]
if ev.type == "status" and ev.state == WorkflowRunState.IDLE:
break
assert baseline_output is not None
assert baseline_updates
checkpoints = await storage.list_checkpoints(workflow_name=wf.name)
assert checkpoints
@@ -214,21 +253,22 @@ async def test_sequential_checkpoint_runtime_only() -> None:
resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2"))
wf_resume = SequentialBuilder(participants=list(resumed_agents)).build()
resumed_output: list[Message] | None = None
resumed_updates: list[AgentResponseUpdate] = []
async for ev in wf_resume.run(
checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage, stream=True
):
if ev.type == "output":
resumed_output = ev.data # type: ignore[assignment]
resumed_updates.append(ev.data) # type: ignore[arg-type]
if ev.type == "status" and ev.state in (
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
):
break
assert resumed_output is not None
assert [m.role for m in resumed_output] == [m.role for m in baseline_output]
assert [m.text for m in resumed_output] == [m.text for m in baseline_output]
assert resumed_updates
baseline_text = "".join(u.text for u in baseline_updates if hasattr(u, "text"))
resumed_text = "".join(u.text for u in resumed_updates if hasattr(u, "text"))
assert baseline_text == resumed_text
async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None:
@@ -390,3 +430,47 @@ async def test_chain_only_agent_responses_three_agents() -> None:
# a3 should see only A2's reply
assert len(a3.last_messages) == 1
assert a3.last_messages[0].role == "assistant" and "A2 reply" in (a3.last_messages[0].text or "")
# ---------------------------------------------------------------------------
# with_request_info tests
# ---------------------------------------------------------------------------
async def test_sequential_request_info_last_participant_emits_output() -> None:
"""When the last participant is wrapped via with_request_info(), the workflow
still emits a terminal output event after approval.
This exercises the _EndWithConversation.end_with_agent_executor_response path
that converts the AgentApprovalExecutor's forwarded AgentExecutorResponse into
the workflow's final AgentResponse output.
"""
from agent_framework_orchestrations._orchestration_request_info import AgentRequestInfoResponse
a1 = _EchoAgent(id="agent1", name="A1")
a2 = _EchoAgent(id="agent2", name="A2")
wf = SequentialBuilder(participants=[a1, a2]).with_request_info().build()
# First run: collect request_info events for both agents
request_events: list[Any] = []
async for ev in wf.run("hello with approval", stream=True):
if ev.type == "request_info" and isinstance(ev.data, AgentExecutorResponse):
request_events.append(ev)
# Approve each agent in sequence until the workflow completes
while request_events:
responses = {req.request_id: AgentRequestInfoResponse.approve() for req in request_events}
request_events = []
output_events: list[Any] = []
async for ev in wf.run(stream=True, responses=responses):
if ev.type == "request_info" and isinstance(ev.data, AgentExecutorResponse):
request_events.append(ev)
elif ev.type == "output":
output_events.append(ev)
# The workflow must produce a terminal output with the last agent's response.
assert len(output_events) == 1
response = output_events[0].data
assert isinstance(response, AgentResponse)
assert any("A2 reply" in m.text for m in response.messages)
@@ -26,8 +26,8 @@ Note on internal adapters:
You can safely ignore them when focusing on agent progress.
Prerequisites:
- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- FOUNDRY_MODEL must be set to your Azure OpenAI model deployment name.
- FOUNDRY_PROJECT_ENDPOINT must be set to the Azure Foundry project endpoint.
- FOUNDRY_MODEL must be set to the model name for the Foundry chat client.
"""
@@ -68,28 +68,18 @@ async def main() -> None:
"""
Sample Output:
===== Final Conversation =====
===== Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less - your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
===== as_agent() Conversation =====
------------------------------------------------------------
01 [writer]
Go electric, save big - your affordable ride awaits!
------------------------------------------------------------
02 [reviewer]
01 [reviewer]
Catchy and straightforward! The tagline clearly emphasizes both the electric aspect and the affordability of the
eBike. It's inviting and actionable. For even more impact, consider making it slightly shorter:
"Go electric, save big." Overall, this is an effective and appealing suggestion for a budget-friendly eBike.
Note:
`workflow.as_agent()` returns ONLY the final agent's response (the "answer") — the prior agents' work
is not included in the response. To observe intermediate agents while running as an agent, build with
`SequentialBuilder(participants=[...], intermediate_outputs=True)`; the intermediate replies are then
surfaced as `data` events and merged into the AgentResponse.
"""
@@ -2,11 +2,11 @@
import asyncio
import os
from typing import Any
from agent_framework import (
Agent,
AgentExecutorResponse,
AgentResponse,
Executor,
Message,
WorkflowContext,
@@ -16,6 +16,7 @@ from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from typing_extensions import Never
# Load environment variables from .env file
load_dotenv()
@@ -25,13 +26,14 @@ Sample: Sequential workflow mixing agents and a custom summarizer executor
This demonstrates how SequentialBuilder chains participants with a shared
conversation context (list[Message]). An agent produces content; a custom
executor appends a compact summary to the conversation. The workflow completes
after all participants have executed in sequence, and the final output contains
the complete conversation.
executor synthesizes a compact summary and yields it as the workflow's terminal
output.
Custom executor contract:
- Provide at least one @handler accepting AgentExecutorResponse and a WorkflowContext[list[Message]]
- Emit the updated conversation via ctx.send_message([...])
- Intermediate custom executors: handle the message type from the prior participant
and forward `list[Message]` via `ctx.send_message(...)` for the next participant.
- Terminator custom executors: handle the message type from the prior participant and
yield the workflow's final answer as an `AgentResponse` via `ctx.yield_output(...)`.
Prerequisites:
- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
@@ -41,27 +43,29 @@ Prerequisites:
class Summarizer(Executor):
"""Simple summarizer: consumes full conversation and appends an assistant summary."""
"""Terminator custom executor: synthesizes a one-line summary as the workflow's final answer."""
@handler
async def summarize(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext[list[Message]]) -> None:
"""Append a summary message to a copy of the full conversation.
async def summarize(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, AgentResponse],
) -> None:
"""Yield a terminal AgentResponse containing the summary.
Note: A custom executor must be able to handle the message type from the prior participant, and produce
the message type expected by the next participant. In this case, the prior participant is an agent thus
the input is AgentExecutorResponse (an agent will be wrapped in an AgentExecutor, which produces
`AgentExecutorResponse`). If the next participant is also an agent or this is the final participant,
the output must be `list[Message]`.
The prior participant is an agent, which is wrapped in an `AgentExecutor` that
produces `AgentExecutorResponse`. As the last participant in the sequential workflow,
this executor calls `ctx.yield_output(AgentResponse(...))` so its output becomes the
workflow's terminal output (rather than being forwarded to a downstream participant).
"""
if not agent_response.full_conversation:
await ctx.send_message([Message("assistant", ["No conversation to summarize."])])
await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
return
users = sum(1 for m in agent_response.full_conversation if m.role == "user")
assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
final_conversation = list(agent_response.full_conversation) + [summary]
await ctx.send_message(final_conversation)
await ctx.yield_output(AgentResponse(messages=[summary]))
async def main() -> None:
@@ -81,33 +85,20 @@ async def main() -> None:
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()
# 3) Run workflow and extract final conversation
# 3) Run workflow and extract the final summary
events = await workflow.run("Explain the benefits of budget eBikes for commuters.")
outputs = events.get_outputs()
if outputs:
print("===== Final Conversation =====")
messages: list[Message] | Any = outputs[0]
for i, msg in enumerate(messages, start=1):
name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")
print("===== Final Summary =====")
final: AgentResponse = outputs[0]
for msg in final.messages:
print(msg.text)
"""
Sample Output:
------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
===== Final Summary =====
Summary -> users:1 assistants:1
"""
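Reviewer note: the terminator contract changed in this sample (yield only the summary instead of forwarding the whole conversation). The counting logic can be checked in isolation with the sketch below. `ToyMessage`, `ToyAgentResponse`, and the free function `summarize` are hypothetical stand-ins, not `agent_framework` types; the real executor receives an `AgentExecutorResponse` and calls `ctx.yield_output(...)`.

```python
from dataclasses import dataclass, field


@dataclass
class ToyMessage:
    """Hypothetical stand-in for a chat message (role plus text)."""

    role: str
    text: str


@dataclass
class ToyAgentResponse:
    """Hypothetical stand-in for the terminal response object."""

    messages: list[ToyMessage] = field(default_factory=list)


def summarize(full_conversation: list[ToyMessage]) -> ToyAgentResponse:
    """Return a response holding only the one-line summary, mirroring the sample's logic."""
    if not full_conversation:
        return ToyAgentResponse(messages=[ToyMessage("assistant", "No conversation to summarize.")])

    users = sum(1 for m in full_conversation if m.role == "user")
    assistants = sum(1 for m in full_conversation if m.role == "assistant")
    summary = ToyMessage("assistant", f"Summary -> users:{users} assistants:{assistants}")
    # The prior conversation is intentionally not included in the terminal output.
    return ToyAgentResponse(messages=[summary])


conversation = [
    ToyMessage("user", "Explain the benefits of budget eBikes for commuters."),
    ToyMessage("assistant", "Budget eBikes offer commuters an affordable alternative."),
]
final = summarize(conversation)
for msg in final.messages:
    print(msg.text)
```

With one user turn and one assistant turn, this prints the same `Summary -> users:1 assistants:1` line shown in the updated sample output.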
+1 -1
@@ -585,7 +585,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "agent-framework-core", editable = "packages/core" },
{ name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = "<=0.2.1,>=0.2.1" },
{ name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = ">=0.2.1,<=0.2.1" },
]
[[package]]