agent-framework/python/samples/03-workflows/functional/steps_and_checkpointing.py
Evan Mattson da32e8cf80 Python: (core): Add functional workflow API (#4238)
* Add functional workflow api

* cleanup

* More cleanup

* address copilot feedback

* Address PR feedback

* updates

* PR feedback

* Address review comments on functional workflow samples

- Swap 05/06 get-started samples: agent workflow first (motivates
  why workflows exist), simple text workflow second
- Rename text_pipeline → text_workflow, poem_pipeline → poem_workflow
- Add @step to agent workflow sample (05) to demonstrate caching
- Switch agent samples to AzureOpenAIResponsesClient with Foundry
- Remove .as_agent() from agent_integration.py to focus on the key
  difference between inline agent calls vs @step-cached calls
- Add commented-out Agent.run example in hitl_review.py
- Add clarifying comment in _functional.py that event streaming is
  buffered (not true per-token streaming)
- Add naive_group_chat.py functional sample: round-robin group chat
  as a plain Python loop
- Update READMEs to reflect new file names and group chat sample

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright type errors

* Address PR review comments on functional workflow API

1. Allow request_info inside @step: Auto-inject RunContext into step
   functions that declare a RunContext parameter (by type or name 'ctx'),
   and expose get_run_context() for programmatic access.

2. Handle None responses: Log a warning when a response value is None,
   and document the behavior in request_info docstring.

3. Add executor_bypassed event type: Replace executor_invoked +
   executor_completed with a single executor_bypassed event when a step
   replays from cache, making cached vs live execution explicit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add regression tests for PR review comments on functional workflow API

The three review comments (request_info in @step, None response handling,
executor_bypassed event type) were already addressed in 7da7db4e. This
commit adds cross-cutting regression tests that exercise the interactions
between these features:

- HITL in step with caching: preceding step bypassed on resume
- Full checkpoint lifecycle with HITL step (interrupt -> resume -> restore)
- None response inside step-level request_info logs warning
- WorkflowInterrupted from step does not emit executor_failed

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR #4238 review comments on functional workflow API

Comment 1 (request_info in @step): Already supported. Added comment in
StepWrapper.__call__ explaining why WorkflowInterrupted (BaseException)
safely bypasses the except Exception handler.

Comment 2 (None response): Added docstring to _get_response clarifying
the (found, value) return tuple semantics and None handling.

Comment 3 (bypass event type): executor_bypassed is already a dedicated
event type in WorkflowEventType. Updated comment at the bypass site to
make the deliberate event type choice explicit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add experimental API warnings to functional workflow module

Mark all public classes and decorators (workflow, step, RunContext,
FunctionalWorkflow, StepWrapper, FunctionalWorkflowAgent) as
experimental and subject to change or removal.

* Address PR #4238 review comments from @eavanvalkenburg

- RunContext docstring leads with purpose (opt-in handle for HITL,
  custom events, state) so readers importing it from the public surface
  understand its role before the mechanics (#2993513452).
- Rename `06_first_functional_workflow.py` to
  `06_functional_workflow_basics.py`; the previous filename was
  confusing since it followed `05_functional_workflow_with_agents.py`
  (#2993531979).
- Simplify `05_functional_workflow_with_agents.py` to call agents
  directly without a @step wrapper; the step-vs-no-step contrast lives
  in `03-workflows/functional/agent_integration.py`, keeping the
  get-started sample minimal (#2993525532).
- Switch functional samples to `FoundryChatClient` for consistency with
  the rest of 01-get-started and 03-workflows (follow-up on #2876988570).
- Use walrus in `hitl_review.py` final-state assertion (#2993572182).
- Add expected-output block to `basic_streaming_pipeline.py` (#2993557609).
- Clarify in `parallel_pipeline.py` that `@step` composes with
  `asyncio.gather` (#2993597282).
- `naive_group_chat.py` threads `list[Message]` between turns instead
  of stringifying the transcript, preserving role/authorship (#2993583231).

Drive-by: pre-commit hook sorts an unrelated import block in
`samples/04-hosting/foundry-hosted-agents/responses/02_local_tools/main.py`.

* Fix 10 functional-workflow API bugs from /ultrareview pass

- bug_001: `ctx.request_info()` without an explicit `request_id` now derives
  a deterministic `auto::<index>` id from the call-counter, so HITL resume
  works correctly on the documented default path.  A uuid was regenerated on
  every replay, making resume impossible.

- bug_002: `StepWrapper.__call__` no longer deepcopies arguments on the
  cache-hit replay branch.  The copy is only performed on the live-execution
  path (for the event log) and falls back to the original mapping if deepcopy
  fails, so steps whose args aren't deepcopyable (locks, sockets, sessions)
  can still resume from checkpoint.

- bug_007: `_set_responses` now prunes each resolved `request_id` from
  `_pending_requests`, and the cache-hit branch in `request_info` does the
  same.  Previously, answered requests were re-serialized into every
  subsequent checkpoint and the final checkpoint falsely claimed pending
  requests even after the workflow completed.

- bug_008: `_compute_signature_hash` now mixes the function's `co_code` and
  `co_names` into the checkpoint signature, so changes to the workflow body
  invalidate older checkpoints even when steps are accessed via module /
  class attributes (which `_discover_step_names` can't see statically).
  `RunContext._record_observed_step` records observed step names for
  diagnostics.

- bug_010: `FunctionalWorkflow.run()` docstring corrected — says "at least
  one of message/responses/checkpoint_id" and explicitly notes `responses`
  may be combined with `checkpoint_id` (the validator already allowed this).

- bug_013: `FunctionalWorkflowAgent` now surfaces `request_info` events as
  `FunctionApprovalRequestContent` items (mirroring graph `WorkflowAgent`),
  threads `responses=` and `checkpoint_id=` through to the underlying
  workflow, and exposes `pending_requests`.  Previously `.as_agent()`
  returned empty `AgentResponse` for HITL workflows — effectively unusable.

- bug_014: `FunctionalWorkflow` now clears `_last_message`,
  `_last_step_cache`, and `_last_pending_request_ids` on clean completion.
  `run()` validates that `responses=` keys intersect the currently-pending
  request set (or raises with a clear error) instead of silently replaying
  against stale singleton state from a prior run.

- bug_015: `FunctionalWorkflow.as_agent` signature now matches graph
  `Workflow.as_agent`: accepts `name`, `description`, `context_providers`,
  and `**kwargs`.  `FunctionalWorkflowAgent` stores the overrides.

- bug_017: `RunContext.set_state` raises `ValueError` for underscore-
  prefixed keys (the framework's `_step_cache` / `_original_message` keys
  would silently clobber user state on checkpoint save and user
  underscore-prefixed state was dropped on restore).  Docstring documents
  the reserved prefix.

- merged_bug_003: Workflow function arity is validated at decoration time.
  Multiple non-ctx parameters raise `ValueError` immediately (previously
  every arg past the first was silently dropped at call time).  Passing a
  non-None `message` to a ctx-only workflow raises `ValueError` instead of
  silently discarding the message.

Test coverage: +18 regression tests covering every fix.  Full workflow
suite now 766 passed, 1 skipped, 2 xfailed; full core suite 2338 passed.

* Deslop functional.py fix commit

- Remove dead instrumentation added in the prior commit that was never
  consumed: `RunContext._observed_step_names`,
  `RunContext._record_observed_step`, `FunctionalWorkflow._runtime_step_names`,
  and `FunctionalWorkflowAgent._extra_kwargs`.  The signature hash relies on
  `co_code` alone, which covers the attribute-access case without the
  collection-scaffolding.
- Trim over-explanatory comments that restated what the code does or what
  it no longer does.  Keep only the comments that answer "why" for the
  non-obvious bits (deterministic id contract, defensive deepcopy, stale
  replay guard).
- Compress the `_compute_signature_hash` and FunctionalWorkflow `__init__`
  block docstrings without losing the user-facing reasoning.

Net -49 lines.  Regression lock preserved (766 passed, 1 skipped, 2 xfailed).
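
As a rough illustration of hashing a function's `co_code`, the sketch below fingerprints a function by its compiled bytecode. The helper name `signature_hash` and the use of SHA-256 are assumptions; the actual inputs and algorithm of `_compute_signature_hash` are not shown in this commit message.

```python
import hashlib


def signature_hash(fn) -> str:
    """Hypothetical helper: fingerprint a function by its bytecode only."""
    return hashlib.sha256(fn.__code__.co_code).hexdigest()


def add_one(x):
    return x + 1


def add_one_copy(x):
    # Same body as add_one under a different name.
    return x + 1


def double(x):
    # Different operation, so the bytecode differs.
    return x * 2


print(signature_hash(add_one) == signature_hash(add_one_copy))
print(signature_hash(add_one) == signature_hash(double))
```

One caveat of this sketch: constants live in `co_consts` rather than `co_code`, so on some Python versions a change that only alters a literal may leave a bytecode-only fingerprint unchanged.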

* Fix functional workflow review feedback

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <copilot@github.com>
2026-04-24 09:41:20 +00:00

98 lines
3.6 KiB
Python

# Copyright (c) Microsoft. All rights reserved.

"""Introducing @step: per-step checkpointing and observability.

The previous samples used plain functions — and that works. Workflows support
HITL (ctx.request_info) and checkpointing regardless of whether you use @step.
The difference: without @step, a resumed workflow re-executes every function
call from the top. That's fine for cheap functions. But for expensive operations
(API calls, agent runs, etc.) you don't want to pay that cost again.

@step saves each function's result so it skips re-execution on resume:
- On HITL resume, completed steps return their saved result instantly.
- On crash recovery from a checkpoint, earlier step results are restored.
- Each step emits executor_invoked/executor_completed events for observability.

@step is opt-in. Plain functions still work alongside @step in the same workflow.
"""

import asyncio

from agent_framework import InMemoryCheckpointStorage, step, workflow

# Track call counts to show which functions actually execute on resume
fetch_calls = 0
transform_calls = 0


# @step saves this function's result. On resume, it returns the saved
# result instead of re-executing — useful because this is expensive.
@step
async def fetch_data(url: str) -> dict[str, str | int]:
    """Expensive operation — @step prevents re-execution on resume."""
    global fetch_calls
    fetch_calls += 1
    print(f" fetch_data called (call #{fetch_calls})")
    return {"url": url, "content": f"Data from {url}", "status": 200}


@step
async def transform_data(data: dict[str, str | int]) -> str:
    """Another expensive operation — @step saves the result."""
    global transform_calls
    transform_calls += 1
    print(f" transform_data called (call #{transform_calls})")
    return f"[{data['status']}] {data['content']}"


# No @step — this is cheap, so it just re-runs on resume. That's fine.
async def validate_result(summary: str) -> bool:
    """Cheap validation — no @step needed."""
    return len(summary) > 0 and "[200]" in summary


storage = InMemoryCheckpointStorage()


# checkpoint_storage tells @workflow where to persist step results.
# Each @step saves a checkpoint after it completes.
@workflow(checkpoint_storage=storage)
async def data_pipeline(url: str) -> str:
    """Mix of @step functions and plain functions."""
    raw = await fetch_data(url)
    summary = await transform_data(raw)
    is_valid = await validate_result(summary)
    return f"{summary} (valid={is_valid})"


async def main():
    # --- Run 1: Everything executes normally ---
    print("=== Run 1: Fresh execution ===")
    result = await data_pipeline.run("https://example.com/api/data")
    print(f"Output: {result.get_outputs()[0]}")
    print(f"fetch_calls={fetch_calls}, transform_calls={transform_calls}")

    # @step functions emit executor events; plain functions don't.
    print("\nEvents:")
    for event in result:
        if event.type in ("executor_invoked", "executor_completed"):
            print(f" {event.type}: {event.executor_id}")

    # --- Run 2: Restore from checkpoint ---
    # The workflow re-executes, but @step functions return saved results.
    # Only validate_result() (no @step) actually runs again.
    print("\n=== Run 2: Restored from checkpoint ===")
    latest = await storage.get_latest(workflow_name="data_pipeline")
    assert latest is not None
    result2 = await data_pipeline.run(checkpoint_id=latest.checkpoint_id)
    print(f"Output: {result2.get_outputs()[0]}")
    print(f"fetch_calls={fetch_calls}, transform_calls={transform_calls}")
    print("(call counts unchanged — @step results were restored from checkpoint)")


if __name__ == "__main__":
    asyncio.run(main())
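
The replay behavior the sample demonstrates can be approximated without the framework. The sketch below is a toy model only: `toy_step`, `_saved_results`, and `_call_index` are hypothetical names, and keying saved results by function name plus call order is an assumption, not a description of how `@step` stores results. It shows a decorated coroutine running once and returning its saved result when the surrounding function is re-executed from the top.

```python
import asyncio
import functools
from typing import Any

# Hypothetical stand-ins for saved step results and per-run call counters.
_saved_results: dict[tuple[str, int], Any] = {}
_call_index: dict[str, int] = {}


def toy_step(fn):
    """Toy decorator: save each call's result, keyed by name and call order."""

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        index = _call_index.get(fn.__name__, 0)
        _call_index[fn.__name__] = index + 1
        key = (fn.__name__, index)
        if key in _saved_results:
            # Replay: return the saved result without running the body.
            return _saved_results[key]
        result = await fn(*args, **kwargs)
        _saved_results[key] = result
        return result

    return wrapper


calls = 0


@toy_step
async def fetch(url: str) -> str:
    global calls
    calls += 1
    return f"Data from {url}"


async def pipeline(url: str) -> str:
    _call_index.clear()  # every run re-executes the body from the top
    return await fetch(url)


async def demo() -> tuple[str, str]:
    first = await pipeline("https://example.com/api/data")
    second = await pipeline("https://example.com/api/data")  # simulated resume
    return first, second


first, second = asyncio.run(demo())
print(first == second, calls)
```

The second run returns the same value while `calls` stays at 1, which mirrors the unchanged call counts reported in Run 2 of the sample.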