Files
Evan Mattson da32e8cf80 Python: (core): Add functional workflow API (#4238)
* Add functional workflow api

* cleanup

* More cleanup

* address copilot feedback

* Address PR feedbacK

* updates

* PR feedback

* Address review comments on functional workflow samples

- Swap 05/06 get-started samples: agent workflow first (motivates
  why workflows exist), simple text workflow second
- Rename text_pipeline → text_workflow, poem_pipeline → poem_workflow
- Add @step to agent workflow sample (05) to demonstrate caching
- Switch agent samples to AzureOpenAIResponsesClient with Foundry
- Remove .as_agent() from agent_integration.py to focus on the key
  difference between inline agent calls vs @step-cached calls
- Add commented-out Agent.run example in hitl_review.py
- Add clarifying comment in _functional.py that event streaming is
  buffered (not true per-token streaming)
- Add naive_group_chat.py functional sample: round-robin group chat
  as a plain Python loop
- Update READMEs to reflect new file names and group chat sample

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright type errors

* Address PR review comments on functional workflow API

1. Allow request_info inside @step: Auto-inject RunContext into step
   functions that declare a RunContext parameter (by type or name 'ctx'),
   and expose get_run_context() for programmatic access.

2. Handle None responses: Log a warning when a response value is None,
   and document the behavior in request_info docstring.

3. Add executor_bypassed event type: Replace executor_invoked +
   executor_completed with a single executor_bypassed event when a step
   replays from cache, making cached vs live execution explicit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add regression tests for PR review comments on functional workflow API

The three review comments (request_info in @step, None response handling,
executor_bypassed event type) were already addressed in 7da7db4e. This
commit adds cross-cutting regression tests that exercise the interactions
between these features:

- HITL in step with caching: preceding step bypassed on resume
- Full checkpoint lifecycle with HITL step (interrupt -> resume -> restore)
- None response inside step-level request_info logs warning
- WorkflowInterrupted from step does not emit executor_failed

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR #4238 review comments on functional workflow API

Comment 1 (request_info in @step): Already supported. Added comment in
StepWrapper.__call__ explaining why WorkflowInterrupted (BaseException)
safely bypasses the except Exception handler.

Comment 2 (None response): Added docstring to _get_response clarifying
the (found, value) return tuple semantics and None handling.

Comment 3 (bypass event type): executor_bypassed is already a dedicated
event type in WorkflowEventType. Updated comment at the bypass site to
make the deliberate event type choice explicit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add experimental API warnings to functional workflow module

Mark all public classes and decorators (workflow, step, RunContext,
FunctionalWorkflow, StepWrapper, FunctionalWorkflowAgent) as
experimental and subject to change or removal.

* Address PR #4238 review comments from @eavanvalkenburg

- RunContext docstring leads with purpose (opt-in handle for HITL,
  custom events, state) so readers importing it from the public surface
  understand its role before the mechanics (#2993513452).
- Rename `06_first_functional_workflow.py` to
  `06_functional_workflow_basics.py`; the previous filename was
  confusing since it followed `05_functional_workflow_with_agents.py`
  (#2993531979).
- Simplify `05_functional_workflow_with_agents.py` to call agents
  directly without a @step wrapper; the step-vs-no-step contrast lives
  in `03-workflows/functional/agent_integration.py`, keeping the
  get-started sample minimal (#2993525532).
- Switch functional samples to `FoundryChatClient` for consistency with
  the rest of 01-get-started and 03-workflows (follow-up on #2876988570).
- Use walrus in `hitl_review.py` final-state assertion (#2993572182).
- Add expected-output block to `basic_streaming_pipeline.py` (#2993557609).
- Clarify in `parallel_pipeline.py` that `@step` composes with
  `asyncio.gather` (#2993597282).
- `naive_group_chat.py` threads `list[Message]` between turns instead
  of stringifying the transcript, preserving role/authorship (#2993583231).

Drive-by: pre-commit hook sorts an unrelated import block in
`samples/04-hosting/foundry-hosted-agents/responses/02_local_tools/main.py`.

* Fix 10 functional-workflow API bugs from /ultrareview pass

- bug_001: `ctx.request_info()` without an explicit `request_id` now derives
  a deterministic `auto::<index>` id from the call-counter, so HITL resume
  works correctly on the documented default path.  A uuid was regenerated on
  every replay, making resume impossible.

- bug_002: `StepWrapper.__call__` no longer deepcopies arguments on the
  cache-hit replay branch.  The copy is only performed on the live-execution
  path (for the event log) and falls back to the original mapping if deepcopy
  fails, so steps whose args aren't deepcopyable (locks, sockets, sessions)
  can still resume from checkpoint.

- bug_007: `_set_responses` now prunes each resolved `request_id` from
  `_pending_requests`, and the cache-hit branch in `request_info` does the
  same.  Previously, answered requests were re-serialized into every
  subsequent checkpoint and the final checkpoint falsely claimed pending
  requests even after the workflow completed.

- bug_008: `_compute_signature_hash` now mixes the function's `co_code` and
  `co_names` into the checkpoint signature, so changes to the workflow body
  invalidate older checkpoints even when steps are accessed via module /
  class attributes (which `_discover_step_names` can't see statically).
  `RunContext._record_observed_step` records observed step names for
  diagnostics.

- bug_010: `FunctionalWorkflow.run()` docstring corrected — says "at least
  one of message/responses/checkpoint_id" and explicitly notes `responses`
  may be combined with `checkpoint_id` (the validator already allowed this).

- bug_013: `FunctionalWorkflowAgent` now surfaces `request_info` events as
  `FunctionApprovalRequestContent` items (mirroring graph `WorkflowAgent`),
  threads `responses=` and `checkpoint_id=` through to the underlying
  workflow, and exposes `pending_requests`.  Previously `.as_agent()`
  returned empty `AgentResponse` for HITL workflows — effectively unusable.

- bug_014: `FunctionalWorkflow` now clears `_last_message`,
  `_last_step_cache`, and `_last_pending_request_ids` on clean completion.
  `run()` validates that `responses=` keys intersect the currently-pending
  request set (or raises with a clear error) instead of silently replaying
  against stale singleton state from a prior run.

- bug_015: `FunctionalWorkflow.as_agent` signature now matches graph
  `Workflow.as_agent`: accepts `name`, `description`, `context_providers`,
  and `**kwargs`.  `FunctionalWorkflowAgent` stores the overrides.

- bug_017: `RunContext.set_state` raises `ValueError` for underscore-
  prefixed keys (the framework's `_step_cache` / `_original_message` keys
  would silently clobber user state on checkpoint save and user
  underscore-prefixed state was dropped on restore).  Docstring documents
  the reserved prefix.

- merged_bug_003: Workflow function arity is validated at decoration time.
  Multiple non-ctx parameters raise `ValueError` immediately (previously
  every arg past the first was silently dropped at call time).  Passing a
  non-None `message` to a ctx-only workflow raises `ValueError` instead of
  silently discarding the message.

Test coverage: +18 regression tests covering every fix.  Full workflow
suite now 766 passed, 1 skipped, 2 xfailed; full core suite 2338 passed.

* Deslop functional.py fix commit

- Remove dead instrumentation added in the prior commit that was never
  consumed: `RunContext._observed_step_names`,
  `RunContext._record_observed_step`, `FunctionalWorkflow._runtime_step_names`,
  and `FunctionalWorkflowAgent._extra_kwargs`.  The signature hash relies on
  `co_code` alone, which covers the attribute-access case without the
  collection-scaffolding.
- Trim over-explanatory comments that restated what the code does or what
  it no longer does.  Keep only the comments that answer "why" for the
  non-obvious bits (deterministic id contract, defensive deepcopy, stale
  replay guard).
- Compress the `_compute_signature_hash` and FunctionalWorkflow `__init__`
  block docstrings without losing the user-facing reasoning.

Net -49 lines.  Regression lock preserved (766 passed, 1 skipped, 2 xfailed).

* Fix functional workflow review feedback

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <copilot@github.com>
2026-04-24 09:41:20 +00:00

74 lines
1.9 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
executor,
handler,
)
from typing_extensions import Never
"""
First Graph Workflow — Chain executors with edges
The graph API gives you full control over execution topology: edges,
fan-out/fan-in, switch/case, and superstep-based checkpointing.
This sample builds a minimal graph workflow with two steps:
1. Convert text to uppercase (class-based executor)
2. Reverse the text (function-based executor)
No external services are required.
"""
# <create_workflow>
# Step 1: A class-based executor that converts text to uppercase
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert input to uppercase and forward to the next node."""
await ctx.send_message(text.upper())
# Step 2: A function-based executor that reverses the string and yields output
@executor(id="reverse_text")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""Reverse the string and yield the final workflow output."""
await ctx.yield_output(text[::-1])
def create_workflow():
"""Build the workflow: UpperCase → reverse_text."""
upper = UpperCase(id="upper_case")
return WorkflowBuilder(start_executor=upper).add_edge(upper, reverse_text).build()
# </create_workflow>
async def main() -> None:
# <run_workflow>
workflow = create_workflow()
events = await workflow.run("hello world")
print(f"Output: {events.get_outputs()}")
print(f"Final state: {events.get_final_state()}")
# </run_workflow>
"""
Expected output:
Output: ['DLROW OLLEH']
Final state: WorkflowRunState.IDLE
"""
if __name__ == "__main__":
asyncio.run(main())