Evan Mattson 3bbc81554b Python: Improve the handling of intermediate outputs for workflows and orchestrations (#5623)
* Improve the handling of intermediate outputs for workflows and orchestrations

* Address PR review feedback on intermediate output forwarding

- Switch workflow.as_agent() forwarding to an explicit allowlist of {output,
  intermediate, data, request_info} so orchestration-internal events
  (group_chat, handoff_sent, magentic_orchestrator) stay inside the workflow
  instead of leaking into agent responses via str(data) coercion.
- Stop raising on intermediate AgentResponseUpdate in non-streaming run();
  surface the partial as a Message with text_reasoning content. The defensive
  raise still applies to terminal output events, where Update payloads would
  corrupt message ordering.
- Extend the DevUI workflow-event mapper so intermediate yields wrapping
  plain strings, Messages, and list[Message] render as visible output items
  instead of generic completed-trace events.
- Add orchestration coverage for GroupChat, Handoff, and Magentic builders
  (default vs intermediate_outputs=True; structural where end-to-end is heavy).
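
The allowlist in the first bullet can be pictured with a minimal sketch. The names FORWARDED_EVENT_TYPES, EventSketch, and forwardable are hypothetical and exist only for illustration; the real adapter in _agent.py is not reproduced here and may differ.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator

# Hypothetical stand-in for the allowlist named in the commit message.
FORWARDED_EVENT_TYPES = frozenset({"output", "intermediate", "data", "request_info"})


@dataclass
class EventSketch:
    """Illustrative event type; not the framework's WorkflowEvent."""

    type: str
    data: Any = None


def forwardable(events: Iterable[EventSketch]) -> Iterator[EventSketch]:
    """Yield only events whose type is on the allowlist."""
    for event in events:
        if event.type in FORWARDED_EVENT_TYPES:
            yield event
```

Under this assumption, orchestration-internal types such as group_chat or handoff_sent are dropped before any payload is coerced to text.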

* Lift output-designation policy into a value type

Replace the ``Workflow._output_executors`` list and the
``RunnerContext.should_label_as_intermediate`` Protocol method with a single
immutable ``OutputDesignation`` value type owned by ``Workflow``. Thread the
designation as a parameter through the existing call chain (Runner ->
EdgeRunner -> Executor -> WorkflowContext) so ``yield_output`` consults the
threaded snapshot directly rather than calling back into the runner context.

Removes the ``InProcRunnerContext._workflow`` back-reference and the
``WorkflowBuilder.build()`` assignment that wired it up. Adds the public
predicate ``Workflow.is_terminal_executor(executor_id)`` for external
observers; ``OutputDesignation`` itself stays package-internal.

Key decisions
- ``OutputDesignation.designated`` is ``frozenset[str] | None`` -- ``None``
  preserves legacy "every yield is type='output'" behavior, any frozenset
  (including empty) opts into strict mode. The ``DeprecationWarning`` for
  legacy mode at build time is unchanged.
- ``output_designation`` is an optional parameter on ``Runner``,
  ``EdgeRunner.send_message``, ``EdgeRunner._execute_on_target``,
  ``Executor.execute``, ``Executor._create_context_for_handler``, and
  ``WorkflowContext.__init__``. Each defaults to legacy ``OutputDesignation()``
  so direct callers (Azure Functions ``CapturingRunnerContext``,
  ``test_runner`` recording fixtures) keep working without ceremony.
- The workflow-level filter in ``_run_core`` reads ``self._output_designation``
  live, preserving today's semantics where mutating the designation after
  build still affects subsequent runs (used by two existing tests).
- ``Workflow.to_dict()`` continues to emit ``"output_executors":
  list[str] | None`` (sorted from the frozenset). Checkpoint format unchanged.
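
A rough sketch of the three-state value type described in the key decisions above. OutputDesignationSketch and label_for are hypothetical names, and labeling non-designated yields as "intermediate" in strict mode is an assumption drawn from the removed should_label_as_intermediate method, not a copy of the real class.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class OutputDesignationSketch:
    """Illustrative immutable designation; not the package-internal type."""

    # None = legacy mode; any frozenset (even an empty one) = strict mode.
    designated: frozenset[str] | None = None

    def is_terminal(self, executor_id: str) -> bool:
        """Return True when yields from this executor count as terminal output."""
        if self.designated is None:
            return True  # legacy: every yield is type='output'
        return executor_id in self.designated

    def label_for(self, executor_id: str) -> str:
        """Assumed event label for a yield from the given executor."""
        return "output" if self.is_terminal(executor_id) else "intermediate"
```

Because the sketch is frozen, a snapshot can be threaded through a call chain without callers mutating it.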

Files changed
- _workflow.py: add ``OutputDesignation`` dataclass; replace
  ``_output_executors`` with ``_output_designation``; add
  ``is_terminal_executor``; delete ``_should_yield_output_event``.
- _runner_context.py: drop ``should_label_as_intermediate`` Protocol method
  and ``InProcRunnerContext`` impl; drop ``_workflow`` back-reference.
- _workflow_builder.py: remove ``context._workflow = workflow`` assignment.
- _runner.py, _edge_runner.py, _executor.py, _workflow_context.py: thread
  ``output_designation`` parameter through the call chain.
- tests/workflow/test_output_designation.py (new): three-state coverage of
  the value type plus the public predicate delegation.
- tests/workflow/test_workflow_builder.py, test_validation.py,
  test_workflow.py, test_runner.py and
  orchestrations/tests/test_orchestration_intermediate_vs_terminal.py:
  switch probes from ``_output_executors`` set checks to
  ``get_output_executors`` / ``is_terminal_executor``; update two
  post-build mutation tests to set ``_output_designation`` instead.

Verification
- core/tests/workflow/, orchestrations/tests/, azurefunctions/tests/:
  1119 passed, 42 skipped, 2 xfailed.
- ``uv run poe lint``: clean.
- ``uv run poe typing``: only the pre-existing
  ``_AGENT_FORWARDED_EVENT_TYPES`` pyright warning from 394bcd607 remains.

Notes for next iteration
- The builder's own ``_output_executors`` attribute (``list[Executor |
  SupportsAgentRun]``) is intentionally untouched; the issue scoped the
  rename to the workflow attribute.
- Adjacent review candidates (twin ``WorkflowAgent`` translators,
  ``_AGENT_FORWARDED_EVENT_TYPES`` kind classifier,
  ``_event_origin_context`` ContextVar removal, ``WorkflowEvent`` ADT
  split, legacy-mode removal) remain out of scope.

* Add explicit workflow output designation

Key decisions

- Extend the internal OutputDesignation value type from terminal-only membership to output/intermediate/hidden classification. Legacy mode remains outputs=None, so workflows built without output_executors or intermediate_executors still label every yield_output as type='output'.

- WorkflowBuilder now accepts intermediate_executors. Providing either designation enters explicit mode; output executors emit output, intermediate executors emit intermediate, and unlisted yield_output payloads are hidden from caller-facing events while remaining in executor_completed data.

- Empty explicit designation, duplicate entries, overlaps, unknown executors, and designated executors without workflow output annotations fail build validation. Existing orchestration builders pass intermediate-capable participants through intermediate_executors to preserve current intermediate_outputs behavior until participant-oriented designation lands.
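
The output/intermediate/hidden classification above can be sketched as a single function. classify_yield is a hypothetical helper written for illustration; only the three labels and the outputs=None legacy rule come from the commit message.

```python
from __future__ import annotations


def classify_yield(
    executor_id: str,
    outputs: frozenset[str] | None,
    intermediates: frozenset[str],
) -> str:
    """Return the assumed caller-facing label for a yield_output call."""
    if outputs is None:
        return "output"  # legacy mode: nothing was designated
    if executor_id in outputs:
        return "output"
    if executor_id in intermediates:
        return "intermediate"
    return "hidden"  # explicit mode: unlisted executors are not surfaced
```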

Files changed

- packages/core/agent_framework/_workflows/_workflow.py, _workflow_builder.py, _workflow_context.py, _validation.py, _events.py

- packages/core/tests/workflow/test_output_designation.py, test_output_executors_contract.py, test_strict_mode_event_labeling.py, test_validation.py, test_workflow.py, test_workflow_agent_intermediate.py

- packages/orchestrations/agent_framework_orchestrations/_sequential.py, _concurrent.py, _group_chat.py, _magentic.py

- packages/core/AGENTS.md

Verification

- uv run pytest packages/core/tests/workflow packages/orchestrations/tests packages/devui/tests/devui/test_mapper.py -q

- uv run pytest packages/azurefunctions/tests -q

- uv run poe lint

- uv run poe typing fails only on pre-existing packages/core/agent_framework/_workflows/_agent.py _AGENT_FORWARDED_EVENT_TYPES private-use pyright error.

Notes for next iteration

- issues/03-core-workflow-explicit-designation.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.

- Slice 4 should tighten workflow.as_agent() mapping for hidden emissions and streaming-only update payloads; Slice 5 should replace orchestration intermediate_outputs with participant-oriented designation.

* Tighten workflow-as-agent output mapping

Key decisions

- Treat AgentResponseUpdate as a streaming-only payload across the workflow.as_agent() adapter, so non-streaming agent runs now reject both terminal output and intermediate workflow events carrying updates.
- Keep streaming classification behavior explicit: terminal update payloads remain normal text content, while intermediate update payloads are rewritten to text_reasoning content.
- Add explicit-mode coverage proving hidden yield_output emissions do not appear in non-streaming AgentResponse messages or streaming AgentResponseUpdate chunks.

Files changed

- packages/core/agent_framework/_workflows/_agent.py
- packages/core/tests/workflow/test_workflow_agent_intermediate.py

Verification

- uv run pytest packages/core/tests/workflow/test_workflow_agent_intermediate.py -q
- uv run pytest packages/core/tests/workflow/test_workflow_agent.py packages/core/tests/workflow/test_workflow_agent_intermediate.py -q
- uv run pytest packages/core/tests/workflow packages/orchestrations/tests packages/devui/tests/devui/test_mapper.py -q
- uv run poe lint
- uv run poe typing fails only on the pre-existing packages/core/agent_framework/_workflows/_agent.py _AGENT_FORWARDED_EVENT_TYPES private-use pyright error.

Blockers or notes for next iteration

- issues/04-workflow-as-agent-output-mapping.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.
- Slice 5 should replace orchestration intermediate_outputs with participant-oriented designation.

* Add orchestration participant output designation

Key decisions

- Replace orchestration intermediate_outputs with participant-oriented output_participants and intermediate_participants across Sequential, Concurrent, GroupChat, Magentic, and Handoff builders.
- Keep synthetic final executors terminal by default for Concurrent, GroupChat, and Magentic; keep Sequential's final participant terminal by default; keep Handoff participants terminal by default.
- Centralize participant designation validation for empty explicit designation, duplicates, overlaps, and unknown participants, then map validated participants to workflow output/intermediate executors.
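
The four validation failures listed above (empty explicit designation, duplicates, overlaps, unknown participants) could be checked along these lines. validate_designation is a hypothetical function, and the error type and messages are assumptions; the real checks in _participant_designation.py are not shown in this commit message.

```python
from __future__ import annotations

from typing import Sequence


def validate_designation(
    outputs: Sequence[str] | None,
    intermediates: Sequence[str] | None,
    known: set[str],
) -> tuple[frozenset[str], frozenset[str]]:
    """Validate an explicit designation and return it as two frozensets."""
    out_list = list(outputs or ())
    mid_list = list(intermediates or ())
    if not out_list and not mid_list:
        raise ValueError("explicit designation must name at least one participant")
    for label, items in (("output", out_list), ("intermediate", mid_list)):
        if len(set(items)) != len(items):
            raise ValueError(f"duplicate {label} participants: {items}")
    overlap = set(out_list) & set(mid_list)
    if overlap:
        raise ValueError(f"participants listed as both kinds: {sorted(overlap)}")
    unknown = (set(out_list) | set(mid_list)) - known
    if unknown:
        raise ValueError(f"unknown participants: {sorted(unknown)}")
    return frozenset(out_list), frozenset(mid_list)
```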

Files changed

- packages/orchestrations/agent_framework_orchestrations/_participant_designation.py
- packages/orchestrations/agent_framework_orchestrations/_sequential.py
- packages/orchestrations/agent_framework_orchestrations/_concurrent.py
- packages/orchestrations/agent_framework_orchestrations/_group_chat.py
- packages/orchestrations/agent_framework_orchestrations/_magentic.py
- packages/orchestrations/agent_framework_orchestrations/_handoff.py
- packages/orchestrations/tests/test_orchestration_intermediate_vs_terminal.py
- packages/orchestrations/tests/test_magentic.py

Blockers or notes for next iteration

- issues/05-orchestration-participant-designation.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.
- Slice 7 should migrate samples and docs away from intermediate_outputs to the new participant designation API.
- uv run poe typing still fails only on the pre-existing packages/core/agent_framework/_workflows/_agent.py _AGENT_FORWARDED_EVENT_TYPES private-use pyright error.

* Migrate samples to explicit output designation

Key decisions

- Replace sample usage of the removed orchestration intermediate_outputs boolean with participant-oriented intermediate_participants designation.
- Update raw workflow guidance to show output_executors together with intermediate_executors, and document that unlisted yields are hidden in explicit designation mode.
- Keep orchestration final outputs terminal while streaming designated participant responses as intermediate progress, including workflow.as_agent() samples where intermediates map to text_reasoning content.
- Refresh workflow and orchestration README guidance plus the changelog reference so public docs no longer point users at intermediate_outputs.

Files changed

- CHANGELOG.md
- packages/orchestrations/README.md
- samples/README.md
- samples/03-workflows/README.md
- samples/03-workflows/control-flow/intermediate_vs_terminal_outputs.py
- samples/03-workflows/orchestrations/README.md
- samples/03-workflows/orchestrations/group_chat_agent_manager.py
- samples/03-workflows/orchestrations/group_chat_philosophical_debate.py
- samples/03-workflows/orchestrations/group_chat_simple_selector.py
- samples/03-workflows/orchestrations/magentic.py
- samples/03-workflows/orchestrations/magentic_human_plan_review.py
- samples/03-workflows/orchestrations/sequential_chain_only_agent_responses.py
- samples/03-workflows/agents/group_chat_workflow_as_agent.py
- samples/03-workflows/agents/magentic_workflow_as_agent.py
- samples/03-workflows/agents/sequential_workflow_as_agent.py
- samples/semantic-kernel-migration/orchestrations/group_chat.py
- samples/semantic-kernel-migration/orchestrations/magentic.py

Blockers or notes for next iteration

- issues/07-samples-and-docs-explicit-output-designation.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.
- issues/06-devui-intermediate-event-rendering.md remains present and appears already satisfied by existing DevUI mapper/tests from the prior implementation slice.
- PRD-explicit-workflow-output-designation.md remains untracked and intentionally excluded from this commit.

* Render DevUI intermediate workflow outputs

Key decisions

- Preserve workflow output designation metadata on visible DevUI output messages and text deltas so intermediate/data emissions remain distinguishable from terminal output.
- Render intermediate workflow message items in the execution timeline using executor metadata, while excluding them from the final workflow result aggregation.
- Keep terminal output message rendering unchanged and retain legacy data events on the intermediate compatibility path.

Files changed

- packages/devui/agent_framework_devui/_mapper.py
- packages/devui/frontend/src/components/features/workflow/execution-timeline.tsx
- packages/devui/frontend/src/components/features/workflow/workflow-view.tsx
- packages/devui/frontend/src/types/openai.ts
- packages/devui/tests/devui/test_mapper.py

Blockers or notes for next iteration

- issues/06-devui-intermediate-event-rendering.md was moved to issues/done/ but issues/ remains untracked and intentionally excluded from this commit.
- PRD-explicit-workflow-output-designation.md remains untracked and intentionally excluded from this commit.
- uv run poe typing still fails only on the pre-existing packages/core/agent_framework/_workflows/_agent.py _AGENT_FORWARDED_EVENT_TYPES private-use pyright error.

* Fix mypy

* Clarify orchestration participant output config

* Rename participant output kwargs for clarity

output_participants -> final_output_from, intermediate_participants ->
intermediate_output_from. The old names read like categories of
participant; the new names make it clear the kwarg designates which
participants' outputs surface as final vs. intermediate events.

* Rename core workflow output kwargs with deprecation shim

Adds final_output_from / intermediate_output_from as canonical kwargs on
Workflow and WorkflowBuilder. Old output_executors / intermediate_executors
kwargs continue to work but emit DeprecationWarning via a shared coalesce
helper that also rejects supplying both. Wire-format keys in to_dict()
stay as output_executors / intermediate_executors so checkpoint
compatibility is preserved.

Internal call sites in orchestrations and samples updated to the new
names so users following sample code learn the canonical vocabulary;
legacy callers still work with a one-shot warning.
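
A minimal sketch of a coalesce helper of the kind described above. coalesce_renamed_kwarg is a hypothetical name, and using None to mean "not supplied" is a simplification; the framework's shared helper may use a sentinel instead and is not reproduced here.

```python
from __future__ import annotations

import warnings
from typing import TypeVar

T = TypeVar("T")


def coalesce_renamed_kwarg(
    new_value: T | None,
    old_value: T | None,
    *,
    new_name: str,
    old_name: str,
) -> T | None:
    """Prefer the new kwarg; accept the old one with a DeprecationWarning."""
    if old_value is None:
        return new_value
    if new_value is not None:
        # Supplying both spellings is ambiguous, so reject it outright.
        raise TypeError(f"Pass either {new_name!r} or {old_name!r}, not both.")
    warnings.warn(
        f"{old_name!r} is deprecated; use {new_name!r} instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return old_value
```

A constructor would call this once per renamed pair and pass the result on unchanged, so serialization keys such as output_executors are unaffected by the rename.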

* Suppress pyright reportPrivateUsage on cross-module sentinel import

* Update docstrings

* Propagate sub-workflow intermediate outputs, fix handoff/sequential intermediate-only designation, and shore up tests, sample, and docstrings around the intermediate output contract.

* Add canonical workflow output_from selection

Key decisions:
- Make output_from the canonical workflow-output allow-list and keep output_executors/final_output_from as deprecated compatibility aliases.
- Treat empty output_from/intermediate_output_from lists as explicit selections and keep validation responsible for empty, duplicate, overlap, and unknown selections.
- Remove the branch-only public intermediate_executors WorkflowBuilder kwarg while preserving legacy wire keys in to_dict().

Files changed:
- packages/core/agent_framework/_workflows/_workflow.py
- packages/core/agent_framework/_workflows/_workflow_builder.py
- packages/core/agent_framework/_workflows/_workflow_context.py
- packages/core/agent_framework/_workflows/_agent.py
- packages/core/agent_framework/_workflows/_agent_executor.py
- packages/core/tests/workflow/* output-selection coverage updates
- packages/core/AGENTS.md
- issues/done/001-canonical-list-based-output-selection.md

Blockers/notes:
- Orchestration builders still pass final_output_from internally; follow-up issue 004 should migrate them to output_from.
- Legacy omitted-selection behavior and explicit all/all_other literals are left for issues 002 and 003.

* Add explicit all workflow output selection

Key decisions:
- Treat output_from='all' as an explicit workflow-output selection sentinel and expand it at build time to executors with declared workflow output types.
- Keep omitted output selections in legacy all-output mode with a deprecation warning that names output_from and intermediate_output_from and points to output_from='all'.
- Reject intermediate_output_from='all' at construction because the all-output literal is output-only for this issue.

Files changed:
- packages/core/agent_framework/_workflows/_workflow_builder.py
- packages/core/tests/workflow/test_output_executors_contract.py
- issues/done/002-explicit-all-output-and-legacy-migration.md

Blockers/notes:
- all_other intermediate-output selection remains for issue 003.
- Workflow-as-agent/orchestration parity remains for issue 004.

* Add all-other intermediate output selection

Key decisions:
- Treat intermediate_output_from='all_other' as an explicit intermediate-output selection sentinel and expand it at build time after the workflow graph is complete.
- Expand all_other to output-capable executors not selected by output_from; omitted or empty output_from selects no workflow outputs, while output_from='all' leaves an empty intermediate selection.
- Keep output_from='all_other' invalid so all_other remains intermediate-output-only and runtime classification still receives concrete executor-id sets.
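
The expansion rules for the 'all' and 'all_other' literals can be sketched as follows. resolve_selection is a hypothetical function, output_capable stands in for "executors with declared workflow output types", and returning None for legacy mode is an assumption made for this example.

```python
from __future__ import annotations

from typing import Iterable, Sequence


def resolve_selection(
    output_from: Sequence[str] | str | None,
    intermediate_output_from: Sequence[str] | str | None,
    output_capable: Iterable[str],
) -> tuple[frozenset[str], frozenset[str]] | None:
    """Expand literals into concrete executor-id sets (None = legacy mode)."""
    if isinstance(output_from, str) and output_from != "all":
        raise ValueError(f"invalid output_from literal: {output_from!r}")
    if isinstance(intermediate_output_from, str) and intermediate_output_from != "all_other":
        raise ValueError(
            f"invalid intermediate_output_from literal: {intermediate_output_from!r}"
        )
    if output_from is None and intermediate_output_from is None:
        return None  # both omitted: legacy all-output behavior
    capable = frozenset(output_capable)
    outputs = capable if output_from == "all" else frozenset(output_from or ())
    if intermediate_output_from == "all_other":
        # Everything output-capable that output_from did not already claim.
        intermediates = capable - outputs
    else:
        intermediates = frozenset(intermediate_output_from or ())
    return outputs, intermediates
```

Resolving the literals once, before any run, means runtime classification only ever sees concrete sets of executor ids.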

Files changed:
- packages/core/agent_framework/_workflows/_workflow_builder.py
- packages/core/tests/workflow/test_output_executors_contract.py
- issues/done/003-all-other-intermediate-output-selection.md

Blockers/notes:
- Workflow-as-agent and orchestration parity remains for issue 004.
- Full documentation updates remain for issue 005.

* Add orchestration output selection parity

Key decisions:
- Expose output_from on sequential, concurrent, group chat, handoff, and magentic builders while keeping final_output_from as a deprecated compatibility alias.
- Resolve orchestration participant selections through the same explicit rules as workflows: output_from='all', intermediate_output_from='all_other', hidden unselected participant payloads, and overlap/duplicate/unknown/invalid-literal validation.
- Continue preserving documented orchestration defaults by always designating each pattern's terminal internal executor where applicable.

Files changed:
- packages/orchestrations/agent_framework_orchestrations/_participant_output_config.py
- packages/orchestrations/agent_framework_orchestrations/_sequential.py
- packages/orchestrations/agent_framework_orchestrations/_concurrent.py
- packages/orchestrations/agent_framework_orchestrations/_group_chat.py
- packages/orchestrations/agent_framework_orchestrations/_handoff.py
- packages/orchestrations/agent_framework_orchestrations/_magentic.py
- packages/orchestrations/agent_framework_orchestrations/_orchestration_request_info.py
- packages/orchestrations/tests/test_orchestration_intermediate_vs_terminal.py
- issues/done/004-workflow-as-agent-and-orchestration-parity.md

Blockers/notes:
- Full documentation and sample migration wording remains for issue 005.
- Existing tests that intentionally use final_output_from now emit the new deprecation warning.

* Document workflow output selection contract

Key decisions:
- Use Workflow Output and Intermediate Output as the developer-facing terms for selected caller-facing emissions.
- Document output_from and intermediate_output_from as the canonical API, with output_from as an allow-list and unselected payloads hidden unless explicitly selected as intermediate.
- Add scenario and invalid-selection tables for workflow and orchestration docs, including legacy omission warnings, output_from='all', intermediate_output_from='all_other', list selections, invalid literals, overlap, duplicates, unknown selections, and empty explicit selections.
- Migrate samples away from final_output_from and output_executors except where compatibility aliases are explicitly documented.

Files changed:
- packages/core/AGENTS.md
- packages/orchestrations/README.md
- packages/orchestrations/agent_framework_orchestrations/_handoff.py
- packages/orchestrations/agent_framework_orchestrations/_sequential.py
- samples/03-workflows/README.md
- samples/03-workflows/control-flow/intermediate_vs_terminal_outputs.py
- samples/03-workflows/human-in-the-loop/agents_with_approval_requests.py
- samples/03-workflows/orchestrations/README.md
- samples/04-hosting/foundry-hosted-agents/responses/05_workflows/main.py
- scripts/sample_validation/create_dynamic_workflow_executor.py
- issues/done/005-document-output-selection-contract.md

Blockers/notes:
- Direct full Ruff on scripts/sample_validation/create_dynamic_workflow_executor.py still reports pre-existing docstring/print/line-length issues outside this docs migration; syntax-focused checks for changed files pass.
- No remaining AFK issue files are present under issues/.

* Latest updates

* Typing fixes

* Cleanup
2026-05-19 00:15:25 +00:00

451 lines
17 KiB
Python

# Copyright (c) Microsoft. All rights reserved.

"""Unit tests for workflow utility functions."""

from dataclasses import dataclass
from unittest.mock import Mock

import pytest
from agent_framework import (
    AgentExecutorRequest,
    AgentExecutorResponse,
    AgentResponse,
    Message,
    WorkflowEvent,
    WorkflowMessage,
)
from pydantic import BaseModel

from agent_framework_azurefunctions._context import CapturingRunnerContext
from agent_framework_azurefunctions._serialization import (
    deserialize_value,
    reconstruct_to_type,
    serialize_value,
    strip_pickle_markers,
)


# Module-level test types (must be importable for checkpoint encoding roundtrip)
@dataclass
class SampleData:
    """Sample dataclass for testing checkpoint encoding roundtrip."""

    name: str
    value: int


class SampleModel(BaseModel):
    """Sample Pydantic model for testing checkpoint encoding roundtrip."""

    title: str
    count: int


@dataclass
class DataclassWithPydanticField:
    """Dataclass containing a Pydantic model field for testing nested serialization."""

    label: str
    model: SampleModel


class TestCapturingRunnerContext:
    """Test suite for CapturingRunnerContext."""

    @pytest.fixture
    def context(self) -> CapturingRunnerContext:
        """Create a fresh CapturingRunnerContext for each test."""
        return CapturingRunnerContext()

    @pytest.mark.asyncio
    async def test_send_message_captures_message(self, context: CapturingRunnerContext) -> None:
        """Test that send_message captures messages correctly."""
        message = WorkflowMessage(data="test data", target_id="target_1", source_id="source_1")
        await context.send_message(message)
        messages = await context.drain_messages()
        assert "source_1" in messages
        assert len(messages["source_1"]) == 1
        assert messages["source_1"][0].data == "test data"

    @pytest.mark.asyncio
    async def test_send_multiple_messages_groups_by_source(self, context: CapturingRunnerContext) -> None:
        """Test that messages are grouped by source_id."""
        msg1 = WorkflowMessage(data="msg1", target_id="target", source_id="source_a")
        msg2 = WorkflowMessage(data="msg2", target_id="target", source_id="source_a")
        msg3 = WorkflowMessage(data="msg3", target_id="target", source_id="source_b")
        await context.send_message(msg1)
        await context.send_message(msg2)
        await context.send_message(msg3)
        messages = await context.drain_messages()
        assert len(messages["source_a"]) == 2
        assert len(messages["source_b"]) == 1

    @pytest.mark.asyncio
    async def test_drain_messages_clears_messages(self, context: CapturingRunnerContext) -> None:
        """Test that drain_messages clears the message store."""
        message = WorkflowMessage(data="test", target_id="t", source_id="s")
        await context.send_message(message)
        await context.drain_messages()  # First drain
        messages = await context.drain_messages()  # Second drain
        assert messages == {}

    @pytest.mark.asyncio
    async def test_has_messages_returns_correct_status(self, context: CapturingRunnerContext) -> None:
        """Test has_messages returns correct boolean."""
        assert await context.has_messages() is False
        await context.send_message(WorkflowMessage(data="test", target_id="t", source_id="s"))
        assert await context.has_messages() is True

    @pytest.mark.asyncio
    async def test_add_event_queues_event(self, context: CapturingRunnerContext) -> None:
        """Test that add_event queues events correctly."""
        event = WorkflowEvent("output", executor_id="exec_1", data="output")
        await context.add_event(event)
        events = await context.drain_events()
        assert len(events) == 1
        assert isinstance(events[0], WorkflowEvent)
        assert events[0].type == "output"
        assert events[0].data == "output"

    @pytest.mark.asyncio
    async def test_drain_events_clears_queue(self, context: CapturingRunnerContext) -> None:
        """Test that drain_events clears the event queue."""
        await context.add_event(WorkflowEvent("output", executor_id="e", data="test"))
        await context.drain_events()  # First drain
        events = await context.drain_events()  # Second drain
        assert events == []

    @pytest.mark.asyncio
    async def test_has_events_returns_correct_status(self, context: CapturingRunnerContext) -> None:
        """Test has_events returns correct boolean."""
        assert await context.has_events() is False
        await context.add_event(WorkflowEvent("output", executor_id="e", data="test"))
        assert await context.has_events() is True

    @pytest.mark.asyncio
    async def test_next_event_waits_for_event(self, context: CapturingRunnerContext) -> None:
        """Test that next_event returns queued events."""
        event = WorkflowEvent("output", executor_id="e", data="waited")
        await context.add_event(event)
        result = await context.next_event()
        assert result.data == "waited"

    def test_has_checkpointing_returns_false(self, context: CapturingRunnerContext) -> None:
        """Test that checkpointing is not supported."""
        assert context.has_checkpointing() is False

    def test_is_streaming_returns_false_by_default(self, context: CapturingRunnerContext) -> None:
        """Test streaming is disabled by default."""
        assert context.is_streaming() is False

    def test_set_streaming(self, context: CapturingRunnerContext) -> None:
        """Test setting streaming mode."""
        context.set_streaming(True)
        assert context.is_streaming() is True
        context.set_streaming(False)
        assert context.is_streaming() is False

    def test_set_workflow_id(self, context: CapturingRunnerContext) -> None:
        """Test setting workflow ID."""
        context.set_workflow_id("workflow-123")
        assert context._workflow_id == "workflow-123"

    @pytest.mark.asyncio
    async def test_reset_for_new_run_clears_state(self, context: CapturingRunnerContext) -> None:
        """Test that reset_for_new_run clears all state."""
        await context.send_message(WorkflowMessage(data="test", target_id="t", source_id="s"))
        await context.add_event(WorkflowEvent("output", executor_id="e", data="event"))
        context.set_streaming(True)
        context.reset_for_new_run()
        assert await context.has_messages() is False
        assert await context.has_events() is False
        assert context.is_streaming() is False

    @pytest.mark.asyncio
    async def test_create_checkpoint_raises_not_implemented(self, context: CapturingRunnerContext) -> None:
        """Test that checkpointing methods raise NotImplementedError."""
        from agent_framework._workflows._state import State

        with pytest.raises(NotImplementedError):
            await context.create_checkpoint("test_workflow", "abc123", State(), None, 1)

    @pytest.mark.asyncio
    async def test_load_checkpoint_raises_not_implemented(self, context: CapturingRunnerContext) -> None:
        """Test that load_checkpoint raises NotImplementedError."""
        with pytest.raises(NotImplementedError):
            await context.load_checkpoint("some-id")

    @pytest.mark.asyncio
    async def test_apply_checkpoint_raises_not_implemented(self, context: CapturingRunnerContext) -> None:
        """Test that apply_checkpoint raises NotImplementedError."""
        with pytest.raises(NotImplementedError):
            await context.apply_checkpoint(Mock())


class TestSerializationRoundtrip:
    """Test that serialization roundtrips correctly for types used in Azure Functions workflows."""

    def test_roundtrip_chat_message(self) -> None:
        """Test Message survives encode → decode roundtrip."""
        original = Message(role="user", contents=["Hello"])
        encoded = serialize_value(original)
        decoded = deserialize_value(encoded)
        assert isinstance(decoded, Message)
        assert decoded.role == "user"

    def test_roundtrip_agent_executor_request(self) -> None:
        """Test AgentExecutorRequest with nested Messages roundtrips."""
        original = AgentExecutorRequest(
            messages=[Message(role="user", contents=["Hi"])],
            should_respond=True,
        )
        encoded = serialize_value(original)
        decoded = deserialize_value(encoded)
        assert isinstance(decoded, AgentExecutorRequest)
        assert len(decoded.messages) == 1
        assert isinstance(decoded.messages[0], Message)
        assert decoded.should_respond is True

    def test_roundtrip_agent_executor_response(self) -> None:
        """Test AgentExecutorResponse with nested AgentResponse roundtrips."""
        original = AgentExecutorResponse(
            executor_id="test_exec",
            agent_response=AgentResponse(messages=[Message(role="assistant", contents=["Reply"])]),
            full_conversation=[Message(role="assistant", contents=["Reply"])],
        )
        encoded = serialize_value(original)
        decoded = deserialize_value(encoded)
        assert isinstance(decoded, AgentExecutorResponse)
        assert decoded.executor_id == "test_exec"
        assert isinstance(decoded.agent_response, AgentResponse)

    def test_roundtrip_dataclass(self) -> None:
        """Test custom dataclass roundtrips."""
        original = SampleData(name="test", value=42)
        encoded = serialize_value(original)
        decoded = deserialize_value(encoded)
        assert isinstance(decoded, SampleData)
        assert decoded.name == "test"
        assert decoded.value == 42

    def test_roundtrip_pydantic_model(self) -> None:
        """Test Pydantic model roundtrips."""
        original = SampleModel(title="Hello", count=5)
        encoded = serialize_value(original)
        decoded = deserialize_value(encoded)
        assert isinstance(decoded, SampleModel)
        assert decoded.title == "Hello"
        assert decoded.count == 5

    def test_roundtrip_primitives(self) -> None:
        """Test primitives pass through unchanged."""
        assert serialize_value(None) is None
        assert serialize_value("hello") == "hello"
        assert serialize_value(42) == 42
        assert serialize_value(3.14) == 3.14
        assert serialize_value(True) is True

    def test_roundtrip_list_of_objects(self) -> None:
        """Test list of typed objects roundtrips."""
        original = [
            Message(role="user", contents=["Q"]),
            Message(role="assistant", contents=["A"]),
        ]
        encoded = serialize_value(original)
        decoded = deserialize_value(encoded)
        assert isinstance(decoded, list)
        assert len(decoded) == 2
        assert all(isinstance(m, Message) for m in decoded)

    def test_roundtrip_dict_of_objects(self) -> None:
        """Test dict with typed values roundtrips (used for shared state)."""
        original = {"count": 42, "msg": Message(role="user", contents=["Hi"])}
        encoded = serialize_value(original)
        decoded = deserialize_value(encoded)
        assert decoded["count"] == 42
        assert isinstance(decoded["msg"], Message)

    def test_roundtrip_dataclass_with_nested_pydantic(self) -> None:
        """Test dataclass containing a Pydantic model field roundtrips correctly.

        This covers the HITL pattern where AnalysisWithSubmission (dataclass)
        contains a ContentAnalysisResult (Pydantic BaseModel) field.
        """
        original = DataclassWithPydanticField(label="test", model=SampleModel(title="Nested", count=99))
        encoded = serialize_value(original)
        decoded = deserialize_value(encoded)
        assert isinstance(decoded, DataclassWithPydanticField)
        assert decoded.label == "test"
        assert isinstance(decoded.model, SampleModel)
        assert decoded.model.title == "Nested"
        assert decoded.model.count == 99


class TestReconstructToType:
    """Test suite for reconstruct_to_type function (used for HITL responses)."""

    def test_none_returns_none(self) -> None:
        """Test that None input returns None."""
        assert reconstruct_to_type(None, str) is None

    def test_already_correct_type(self) -> None:
        """Test that values already of the correct type are returned as-is."""
        assert reconstruct_to_type("hello", str) == "hello"
        assert reconstruct_to_type(42, int) == 42

    def test_non_dict_returns_original(self) -> None:
        """Test that non-dict values are returned as-is."""
        assert reconstruct_to_type("hello", int) == "hello"
        assert reconstruct_to_type([1, 2], dict) == [1, 2]

    def test_reconstruct_pydantic_model(self) -> None:
        """Test reconstruction of Pydantic model from plain dict."""

        class ApprovalResponse(BaseModel):
            approved: bool
            reason: str

        data = {"approved": True, "reason": "Looks good"}
        result = reconstruct_to_type(data, ApprovalResponse)
        assert isinstance(result, ApprovalResponse)
        assert result.approved is True
        assert result.reason == "Looks good"

    def test_reconstruct_dataclass(self) -> None:
        """Test reconstruction of dataclass from plain dict."""

        @dataclass
        class Feedback:
            score: int
            comment: str

        data = {"score": 5, "comment": "Great"}
        result = reconstruct_to_type(data, Feedback)
        assert isinstance(result, Feedback)
        assert result.score == 5
        assert result.comment == "Great"

    def test_reconstruct_from_checkpoint_markers(self) -> None:
        """Test that data with checkpoint markers is decoded via deserialize_value.

        reconstruct_to_type is general-purpose and handles trusted checkpoint
        data. Untrusted HITL callers must call strip_pickle_markers() first.
        """
        original = SampleData(value=99, name="marker-test")
        encoded = serialize_value(original)
        result = reconstruct_to_type(encoded, SampleData)
        assert isinstance(result, SampleData)
        assert result.value == 99

    def test_unrecognized_dict_returns_original(self) -> None:
        """Test that unrecognized dicts are returned as-is."""

        @dataclass
        class Unrelated:
            completely_different: str

        data = {"some_key": "some_value"}
        result = reconstruct_to_type(data, Unrelated)
        assert result == data

    def test_reconstruct_strips_injected_pickle_markers(self) -> None:
        """End-to-end: strip_pickle_markers + reconstruct_to_type blocks attack.

        This mirrors the real HITL flow where callers sanitize before reconstruction.
        """
        malicious = {"__pickled__": "gASVDgAAAAAAAACMBHRlc3SULg==", "__type__": "builtins:str"}
        sanitized = strip_pickle_markers(malicious)
        result = reconstruct_to_type(sanitized, str)
        assert result is None


class TestStripPickleMarkers:
    """Security tests for strip_pickle_markers, the defence-in-depth layer
    that prevents untrusted HTTP input from reaching pickle.loads()."""

    def test_strips_top_level_pickle_marker(self) -> None:
        """A dict containing __pickled__ must be replaced with None."""
        data = {"__pickled__": "PAYLOAD", "__type__": "os:system"}
        assert strip_pickle_markers(data) is None

    def test_strips_top_level_type_marker_only(self) -> None:
        """Even __type__ alone (without __pickled__) must be neutralised."""
        data = {"__type__": "os:system", "other": "value"}
        assert strip_pickle_markers(data) is None

    def test_strips_nested_pickle_marker(self) -> None:
        """Pickle markers nested inside a dict must be neutralised."""
        data = {"safe": "value", "nested": {"__pickled__": "PAYLOAD", "__type__": "os:system"}}
        result = strip_pickle_markers(data)
        assert result == {"safe": "value", "nested": None}

    def test_strips_pickle_marker_in_list(self) -> None:
        """Pickle markers inside a list element must be neutralised."""
        data = [{"__pickled__": "PAYLOAD"}, "safe"]
        result = strip_pickle_markers(data)
        assert result == [None, "safe"]

    def test_strips_deeply_nested_marker(self) -> None:
        """Deeply nested pickle markers must be neutralised."""
        data = {"a": {"b": {"c": {"__pickled__": "deep"}}}}
        result = strip_pickle_markers(data)
        assert result == {"a": {"b": {"c": None}}}

    def test_preserves_safe_dict(self) -> None:
        """Dicts without pickle markers must be left untouched."""
        data = {"approved": True, "reason": "Looks good"}
        assert strip_pickle_markers(data) == data

    def test_preserves_primitives(self) -> None:
        """Primitive values must pass through unchanged."""
        assert strip_pickle_markers("hello") == "hello"
        assert strip_pickle_markers(42) == 42
        assert strip_pickle_markers(None) is None
        assert strip_pickle_markers(True) is True

    def test_preserves_safe_list(self) -> None:
        """Lists without pickle markers must be left untouched."""
        data = [1, "two", {"key": "value"}]
        assert strip_pickle_markers(data) == data

    def test_mixed_safe_and_malicious(self) -> None:
        """Only the malicious entries should be stripped; safe entries remain."""
        data = {
            "user_input": "hello",
            "evil": {"__pickled__": "PAYLOAD", "__type__": "os:system"},
            "count": 42,
        }
        result = strip_pickle_markers(data)
        assert result == {"user_input": "hello", "evil": None, "count": 42}
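

# Illustrative sketch only, NOT the library's implementation: a minimal recursive
# sanitizer that would satisfy the TestStripPickleMarkers expectations above.
# The names `_SKETCH_MARKER_KEYS` and `_sketch_strip_markers` are hypothetical and
# exist purely for illustration; the real strip_pickle_markers may differ in detail
# (for example in which container types it walks).
_SKETCH_MARKER_KEYS = ("__pickled__", "__type__")


def _sketch_strip_markers(value):
    """Return a sanitized copy of value with marker-bearing dicts replaced by None."""
    if isinstance(value, dict):
        # A dict carrying either marker key is neutralised as a whole, so no
        # part of it can later be routed to a pickle-based decoder.
        if any(key in value for key in _SKETCH_MARKER_KEYS):
            return None
        # Otherwise recurse into the values and keep the safe keys.
        return {key: _sketch_strip_markers(item) for key, item in value.items()}
    if isinstance(value, list):
        # Lists are sanitized element by element, preserving order.
        return [_sketch_strip_markers(item) for item in value]
    # Primitives (str, int, float, bool, None) pass through unchanged.
    return value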