Evan Mattson da32e8cf80 Python: (core): Add functional workflow API (#4238)
* Add functional workflow api

* cleanup

* More cleanup

* address copilot feedback

* Address PR feedback

* updates

* PR feedback

* Address review comments on functional workflow samples

- Swap 05/06 get-started samples: agent workflow first (motivates
  why workflows exist), simple text workflow second
- Rename text_pipeline → text_workflow, poem_pipeline → poem_workflow
- Add @step to agent workflow sample (05) to demonstrate caching
- Switch agent samples to AzureOpenAIResponsesClient with Foundry
- Remove .as_agent() from agent_integration.py to focus on the key
  difference between inline agent calls vs @step-cached calls
- Add commented-out Agent.run example in hitl_review.py
- Add clarifying comment in _functional.py that event streaming is
  buffered (not true per-token streaming)
- Add naive_group_chat.py functional sample: round-robin group chat
  as a plain Python loop
- Update READMEs to reflect new file names and group chat sample

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright type errors

* Address PR review comments on functional workflow API

1. Allow request_info inside @step: Auto-inject RunContext into step
   functions that declare a RunContext parameter (by type or name 'ctx'),
   and expose get_run_context() for programmatic access.

2. Handle None responses: Log a warning when a response value is None,
   and document the behavior in request_info docstring.

3. Add executor_bypassed event type: Replace executor_invoked +
   executor_completed with a single executor_bypassed event when a step
   replays from cache, making cached vs live execution explicit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add regression tests for PR review comments on functional workflow API

The three review comments (request_info in @step, None response handling,
executor_bypassed event type) were already addressed in 7da7db4e. This
commit adds cross-cutting regression tests that exercise the interactions
between these features:

- HITL in step with caching: preceding step bypassed on resume
- Full checkpoint lifecycle with HITL step (interrupt -> resume -> restore)
- None response inside step-level request_info logs warning
- WorkflowInterrupted from step does not emit executor_failed

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR #4238 review comments on functional workflow API

Comment 1 (request_info in @step): Already supported. Added comment in
StepWrapper.__call__ explaining why WorkflowInterrupted (BaseException)
safely bypasses the except Exception handler.

Comment 2 (None response): Added docstring to _get_response clarifying
the (found, value) return tuple semantics and None handling.

Comment 3 (bypass event type): executor_bypassed is already a dedicated
event type in WorkflowEventType. Updated comment at the bypass site to
make the deliberate event type choice explicit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add experimental API warnings to functional workflow module

Mark all public classes and decorators (workflow, step, RunContext,
FunctionalWorkflow, StepWrapper, FunctionalWorkflowAgent) as
experimental and subject to change or removal.

* Address PR #4238 review comments from @eavanvalkenburg

- RunContext docstring leads with purpose (opt-in handle for HITL,
  custom events, state) so readers importing it from the public surface
  understand its role before the mechanics (#2993513452).
- Rename `06_first_functional_workflow.py` to
  `06_functional_workflow_basics.py`; the previous filename was
  confusing since it followed `05_functional_workflow_with_agents.py`
  (#2993531979).
- Simplify `05_functional_workflow_with_agents.py` to call agents
  directly without a @step wrapper; the step-vs-no-step contrast lives
  in `03-workflows/functional/agent_integration.py`, keeping the
  get-started sample minimal (#2993525532).
- Switch functional samples to `FoundryChatClient` for consistency with
  the rest of 01-get-started and 03-workflows (follow-up on #2876988570).
- Use walrus in `hitl_review.py` final-state assertion (#2993572182).
- Add expected-output block to `basic_streaming_pipeline.py` (#2993557609).
- Clarify in `parallel_pipeline.py` that `@step` composes with
  `asyncio.gather` (#2993597282).
- `naive_group_chat.py` threads `list[Message]` between turns instead
  of stringifying the transcript, preserving role/authorship (#2993583231).

Drive-by: pre-commit hook sorts an unrelated import block in
`samples/04-hosting/foundry-hosted-agents/responses/02_local_tools/main.py`.

* Fix 10 functional-workflow API bugs from /ultrareview pass

- bug_001: `ctx.request_info()` without an explicit `request_id` now derives
  a deterministic `auto::<index>` id from the call-counter, so HITL resume
  works correctly on the documented default path.  A uuid was regenerated on
  every replay, making resume impossible.

- bug_002: `StepWrapper.__call__` no longer deepcopies arguments on the
  cache-hit replay branch.  The copy is only performed on the live-execution
  path (for the event log) and falls back to the original mapping if deepcopy
  fails, so steps whose args aren't deepcopyable (locks, sockets, sessions)
  can still resume from checkpoint.

- bug_007: `_set_responses` now prunes each resolved `request_id` from
  `_pending_requests`, and the cache-hit branch in `request_info` does the
  same.  Previously, answered requests were re-serialized into every
  subsequent checkpoint and the final checkpoint falsely claimed pending
  requests even after the workflow completed.

- bug_008: `_compute_signature_hash` now mixes the function's `co_code` and
  `co_names` into the checkpoint signature, so changes to the workflow body
  invalidate older checkpoints even when steps are accessed via module /
  class attributes (which `_discover_step_names` can't see statically).
  `RunContext._record_observed_step` records observed step names for
  diagnostics.

- bug_010: `FunctionalWorkflow.run()` docstring corrected — says "at least
  one of message/responses/checkpoint_id" and explicitly notes `responses`
  may be combined with `checkpoint_id` (the validator already allowed this).

- bug_013: `FunctionalWorkflowAgent` now surfaces `request_info` events as
  `FunctionApprovalRequestContent` items (mirroring graph `WorkflowAgent`),
  threads `responses=` and `checkpoint_id=` through to the underlying
  workflow, and exposes `pending_requests`.  Previously `.as_agent()`
  returned empty `AgentResponse` for HITL workflows — effectively unusable.

- bug_014: `FunctionalWorkflow` now clears `_last_message`,
  `_last_step_cache`, and `_last_pending_request_ids` on clean completion.
  `run()` validates that `responses=` keys intersect the currently-pending
  request set (or raises with a clear error) instead of silently replaying
  against stale singleton state from a prior run.

- bug_015: `FunctionalWorkflow.as_agent` signature now matches graph
  `Workflow.as_agent`: accepts `name`, `description`, `context_providers`,
  and `**kwargs`.  `FunctionalWorkflowAgent` stores the overrides.

- bug_017: `RunContext.set_state` raises `ValueError` for underscore-
  prefixed keys (the framework's `_step_cache` / `_original_message` keys
  would silently clobber user state on checkpoint save and user
  underscore-prefixed state was dropped on restore).  Docstring documents
  the reserved prefix.

- merged_bug_003: Workflow function arity is validated at decoration time.
  Multiple non-ctx parameters raise `ValueError` immediately (previously
  every arg past the first was silently dropped at call time).  Passing a
  non-None `message` to a ctx-only workflow raises `ValueError` instead of
  silently discarding the message.

Test coverage: +18 regression tests covering every fix.  Full workflow
suite now 766 passed, 1 skipped, 2 xfailed; full core suite 2338 passed.

* Deslop functional.py fix commit

- Remove dead instrumentation added in the prior commit that was never
  consumed: `RunContext._observed_step_names`,
  `RunContext._record_observed_step`, `FunctionalWorkflow._runtime_step_names`,
  and `FunctionalWorkflowAgent._extra_kwargs`.  The signature hash relies on
  `co_code` alone, which covers the attribute-access case without the
  collection-scaffolding.
- Trim over-explanatory comments that restated what the code does or what
  it no longer does.  Keep only the comments that answer "why" for the
  non-obvious bits (deterministic id contract, defensive deepcopy, stale
  replay guard).
- Compress the `_compute_signature_hash` and FunctionalWorkflow `__init__`
  block docstrings without losing the user-facing reasoning.

Net -49 lines.  Regression lock preserved (766 passed, 1 skipped, 2 xfailed).

* Fix functional workflow review feedback

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <copilot@github.com>
2026-04-24 09:41:20 +00:00

# Workflows Getting Started Samples
## Installation
Microsoft Agent Framework Workflows support ships with the core `agent-framework` or `agent-framework-core` package, so no extra installation step is required.
To install with visualization support:
```bash
pip install "agent-framework[viz]" --pre
```
To export visualization images you also need to [install GraphViz](https://graphviz.org/download/).
## Samples Overview
## Foundational Concepts - Start Here
Begin with the `_start-here` folder in order. These three samples introduce the core ideas of executors, edges, agents in workflows, and streaming.
| Sample | File | Concepts |
| -------------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------------------- |
| Executors and Edges | [\_start-here/step1_executors_and_edges.py](./_start-here/step1_executors_and_edges.py) | Minimal workflow with basic executors and edges |
| Agents in a Workflow | [\_start-here/step2_agents_in_a_workflow.py](./_start-here/step2_agents_in_a_workflow.py) | Introduces adding Agents as nodes; calling agents inside a workflow |
| Streaming (Basics) | [\_start-here/step3_streaming.py](./_start-here/step3_streaming.py) | Extends workflows with event streaming |
Once comfortable with these, explore the rest of the samples below.
---
## Samples Overview (by directory)
### functional
Write workflows as plain Python async functions — no graph concepts, no executor classes, no edges. Use native control flow (`if`/`else`, loops, `asyncio.gather`) for branching and parallelism.
| Sample | File | Concepts |
|---|---|---|
| Basic Pipeline | [functional/basic_pipeline.py](./functional/basic_pipeline.py) | Sequential steps as plain async functions |
| Basic Streaming Pipeline | [functional/basic_streaming_pipeline.py](./functional/basic_streaming_pipeline.py) | Stream workflow events in real time with `run(stream=True)` |
| Parallel Pipeline | [functional/parallel_pipeline.py](./functional/parallel_pipeline.py) | Fan-out/fan-in with `asyncio.gather` |
| Steps and Checkpointing | [functional/steps_and_checkpointing.py](./functional/steps_and_checkpointing.py) | `@step` decorator for per-step checkpointing and observability |
| Human-in-the-Loop Review | [functional/hitl_review.py](./functional/hitl_review.py) | HITL with `ctx.request_info()` and replay |
| Agent Integration | [functional/agent_integration.py](./functional/agent_integration.py) | Calling agents inside workflow steps |
| Naive Group Chat | [functional/naive_group_chat.py](./functional/naive_group_chat.py) | Simple round-robin group chat as a plain loop |
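
The idea of replacing graph constructs with native control flow can be sketched without any framework imports. The example below is a minimal illustration in plain `asyncio`, not code from the samples: the function names (`summarize`, `count_words`, `analyze`) are hypothetical, and the framework's workflow and `@step` decorators are deliberately left out.

```python
import asyncio


async def summarize(text: str) -> str:
    """Hypothetical step: use the first sentence as a stand-in summary."""
    await asyncio.sleep(0)  # placeholder for real async work, such as an agent call
    return text.split(".")[0].strip()


async def count_words(text: str) -> int:
    """Hypothetical step: count whitespace-separated words."""
    await asyncio.sleep(0)
    return len(text.split())


async def analyze(text: str) -> dict[str, object]:
    """Fan out to two independent steps, fan in, then branch on the result."""
    summary, words = await asyncio.gather(summarize(text), count_words(text))
    # An ordinary if/else takes the place of a conditional edge.
    label = "long" if words > 5 else "short"
    return {"summary": summary, "words": words, "label": label}


result = asyncio.run(analyze("Workflows can be plain functions. No graph is required."))
```

`asyncio.gather` returns results in the order the awaitables were passed, so the tuple unpacking stays stable regardless of which step finishes first.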
### agents
| Sample | File | Concepts |
| -------------------------------------- | -------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- |
| Azure Chat Agents (Streaming) | [agents/azure_chat_agents_streaming.py](./agents/azure_chat_agents_streaming.py) | Connect Azure Chat agents with edges and handle streaming events |
| Azure AI Agents (Streaming) | [agents/azure_ai_agents_streaming.py](./agents/azure_ai_agents_streaming.py) | Connect Azure AI agents with edges and handle streaming events |
| Azure AI Agents (Shared Session) | [agents/azure_ai_agents_with_shared_session.py](./agents/azure_ai_agents_with_shared_session.py) | Share a common message session between multiple Azure AI agents in a workflow |
| Custom Agent Executors | [agents/custom_agent_executors.py](./agents/custom_agent_executors.py) | Create executors to handle agent run methods |
| Workflow as Agent (Reflection Pattern) | [agents/workflow_as_agent_reflection_pattern.py](./agents/workflow_as_agent_reflection_pattern.py) | Wrap a workflow so it can behave like an agent (reflection pattern) |
| Workflow as Agent + HITL | [agents/workflow_as_agent_human_in_the_loop.py](./agents/workflow_as_agent_human_in_the_loop.py) | Extend workflow-as-agent with human-in-the-loop capability |
| Workflow as Agent with Session | [agents/workflow_as_agent_with_session.py](./agents/workflow_as_agent_with_session.py) | Use AgentSession to maintain conversation history across workflow-as-agent invocations |
| Workflow as Agent kwargs | [agents/workflow_as_agent_kwargs.py](./agents/workflow_as_agent_kwargs.py) | Pass custom context (data, user tokens) via kwargs through workflow.as_agent() to @ai_function tools |
### checkpoint
| Sample | File | Concepts |
| ------------------------------ | -------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------- |
| Checkpoint & Resume | [checkpoint/checkpoint_with_resume.py](./checkpoint/checkpoint_with_resume.py) | Create checkpoints, inspect them, and resume execution |
| Checkpoint & HITL Resume | [checkpoint/checkpoint_with_human_in_the_loop.py](./checkpoint/checkpoint_with_human_in_the_loop.py) | Combine checkpointing with human approvals and resume pending HITL requests |
| Checkpointed Sub-Workflow | [checkpoint/sub_workflow_checkpoint.py](./checkpoint/sub_workflow_checkpoint.py) | Save and resume a sub-workflow that pauses for human approval |
| Handoff + Tool Approval Resume | [orchestrations/handoff_with_tool_approval_checkpoint_resume.py](./orchestrations/handoff_with_tool_approval_checkpoint_resume.py) | Handoff workflow that captures tool-call approvals in checkpoints and resumes with human decisions |
| Workflow as Agent Checkpoint | [checkpoint/workflow_as_agent_checkpoint.py](./checkpoint/workflow_as_agent_checkpoint.py) | Enable checkpointing when using workflow.as_agent() with checkpoint_storage parameter |
| Cosmos DB Checkpoint Storage | [checkpoint/cosmos_workflow_checkpointing.py](./checkpoint/cosmos_workflow_checkpointing.py) | Use `CosmosCheckpointStorage` for durable workflow checkpointing backed by Azure Cosmos DB NoSQL |
| Cosmos DB + Foundry Checkpoint | [checkpoint/cosmos_workflow_checkpointing_foundry.py](./checkpoint/cosmos_workflow_checkpointing_foundry.py) | Multi-agent workflow using `FoundryChatClient` with `CosmosCheckpointStorage` for durable pause/resume |
### composition
| Sample | File | Concepts |
| ---------------------------------- | ------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------- |
| Sub-Workflow (Basics) | [composition/sub_workflow_basics.py](./composition/sub_workflow_basics.py) | Wrap a workflow as an executor and orchestrate sub-workflows |
| Sub-Workflow: Request Interception | [composition/sub_workflow_request_interception.py](./composition/sub_workflow_request_interception.py) | Intercept and forward sub-workflow requests using @handler for SubWorkflowRequestMessage |
| Sub-Workflow: Parallel Requests | [composition/sub_workflow_parallel_requests.py](./composition/sub_workflow_parallel_requests.py) | Multiple specialized interceptors handling different request types from same sub-workflow |
| Sub-Workflow: kwargs Propagation | [composition/sub_workflow_kwargs.py](./composition/sub_workflow_kwargs.py) | Pass custom context (user tokens, config) from parent workflow through to sub-workflow agents |
### control-flow
| Sample | File | Concepts |
| -------------------------- | ------------------------------------------------------------------------------------------ | ------------------------------------------------------- |
| Sequential Executors | [control-flow/sequential_executors.py](./control-flow/sequential_executors.py) | Sequential workflow with explicit executor setup |
| Sequential (Streaming) | [control-flow/sequential_streaming.py](./control-flow/sequential_streaming.py) | Stream events from a simple sequential run |
| Edge Condition | [control-flow/edge_condition.py](./control-flow/edge_condition.py) | Conditional routing based on agent classification |
| Switch-Case Edge Group | [control-flow/switch_case_edge_group.py](./control-flow/switch_case_edge_group.py) | Switch-case branching using classifier outputs |
| Multi-Selection Edge Group | [control-flow/multi_selection_edge_group.py](./control-flow/multi_selection_edge_group.py) | Select one or many targets dynamically (subset fan-out) |
| Simple Loop | [control-flow/simple_loop.py](./control-flow/simple_loop.py) | Feedback loop where an agent judges ABOVE/BELOW/MATCHED |
| Workflow Cancellation | [control-flow/workflow_cancellation.py](./control-flow/workflow_cancellation.py) | Cancel a running workflow using asyncio tasks |
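
Cancelling through asyncio tasks follows the standard `asyncio` pattern, sketched here with a stand-in coroutine. This is an assumption-laden illustration rather than the sample itself: `long_running_workflow` is a hypothetical placeholder for whatever coroutine drives a real workflow run.

```python
import asyncio


async def long_running_workflow() -> str:
    """Hypothetical stand-in for a workflow run that takes a long time."""
    await asyncio.sleep(60)
    return "finished"


async def run_with_cancel() -> str:
    # Wrap the run in a task so it can be cancelled from the outside.
    task = asyncio.create_task(long_running_workflow())
    await asyncio.sleep(0.01)  # give the task a chance to start
    task.cancel()  # request cancellation at the task's next await point
    try:
        return await task
    except asyncio.CancelledError:
        # Awaiting a cancelled task re-raises CancelledError in the caller.
        return "cancelled"


outcome = asyncio.run(run_with_cancel())
```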
### human-in-the-loop
| Sample | File | Concepts |
| ------------------------------------------ | ------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------- |
| Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human via `ctx.request_info()` |
| Agents with Approval Requests in Workflows | [human-in-the-loop/agents_with_approval_requests.py](./human-in-the-loop/agents_with_approval_requests.py) | Agents that create approval requests during workflow execution and wait for human approval to proceed |
| Agents with Declaration-Only Tools | [human-in-the-loop/agents_with_declaration_only_tools.py](./human-in-the-loop/agents_with_declaration_only_tools.py) | Workflow pauses when agent calls a client-side tool (`func=None`), caller supplies the result |
Builder-oriented request-info samples are maintained in the orchestration sample set
(sequential, concurrent, and group-chat builder variants).
### tool-approval
Builder-based tool approval samples are maintained in the orchestration sample set.
### observability
| Sample | File | Concepts |
| ------------------------ | -------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------- |
| Executor I/O Observation | [observability/executor_io_observation.py](./observability/executor_io_observation.py) | Observe executor input/output data via executor_invoked events (type='executor_invoked') and executor_completed events (type='executor_completed') without modifying executor code |
For additional observability samples in Agent Framework, see the [observability concept samples](../02-agents/observability/README.md). The [workflow observability sample](../02-agents/observability/workflow_observability.py) demonstrates integrating observability into workflows.
### orchestration
Orchestration-focused samples (Sequential, Concurrent, Handoff, GroupChat, Magentic), including builder-based
`workflow.as_agent(...)` variants, are documented in the [orchestrations](./orchestrations/README.md) directory.
### parallelism
| Sample | File | Concepts |
| ------------------------------------ | ------------------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------- |
| Concurrent (Fan-out/Fan-in) | [parallelism/fan_out_fan_in_edges.py](./parallelism/fan_out_fan_in_edges.py) | Dispatch to multiple executors and aggregate results |
| Aggregate Results of Different Types | [parallelism/aggregate_results_of_different_types.py](./parallelism/aggregate_results_of_different_types.py) | Handle results of different types from multiple concurrent executors |
| Map-Reduce with Visualization | [parallelism/map_reduce_and_visualization.py](./parallelism/map_reduce_and_visualization.py) | Fan-out/fan-in pattern with diagram export |
### state-management
| Sample | File | Concepts |
| -------------------------------- | ------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------- |
| State with Agents | [state-management/state_with_agents.py](./state-management/state_with_agents.py) | Store in state once and later reuse across agents |
| Workflow Kwargs - Global Context | [state-management/workflow_kwargs_global.py](./state-management/workflow_kwargs_global.py) | Pass custom context (data, user tokens) via kwargs to `@tool` tools in all agents |
| Workflow Kwargs - Per Agent | [state-management/workflow_kwargs_per_agent.py](./state-management/workflow_kwargs_per_agent.py) | Pass custom context (data, user tokens) via kwargs to `@tool` tools in individual agents |
### visualization
| Sample | File | Concepts |
| ----------------------------- | -------------------------------------------------------------------------------------------------- | ------------------------------------------- |
| Concurrent with Visualization | [visualization/concurrent_with_visualization.py](./visualization/concurrent_with_visualization.py) | Fan-out/fan-in workflow with diagram export |
### declarative
YAML-based declarative workflows allow you to define multi-agent orchestration patterns without writing Python code. See the [declarative workflows README](./declarative/README.md) for more details on YAML workflow syntax and available actions.
| Sample | File | Concepts |
|---|---|---|
| Agent to Function Tool | [declarative/agent_to_function_tool/](./declarative/agent_to_function_tool/) | Chain agent output to InvokeFunctionTool actions |
| Conditional Workflow | [declarative/conditional_workflow/](./declarative/conditional_workflow/) | Nested conditional branching based on user input |
| Customer Support | [declarative/customer_support/](./declarative/customer_support/) | Multi-agent customer support with routing |
| Deep Research | [declarative/deep_research/](./declarative/deep_research/) | Research workflow with planning, searching, and synthesis |
| Function Tools | [declarative/function_tools/](./declarative/function_tools/) | Invoking Python functions from declarative workflows |
| Human-in-Loop | [declarative/human_in_loop/](./declarative/human_in_loop/) | Interactive workflows that request user input |
| Invoke Function Tool | [declarative/invoke_function_tool/](./declarative/invoke_function_tool/) | Call registered Python functions with InvokeFunctionTool |
| Marketing | [declarative/marketing/](./declarative/marketing/) | Marketing content generation workflow |
| Simple Workflow | [declarative/simple_workflow/](./declarative/simple_workflow/) | Basic workflow with variable setting, conditionals, and loops |
| Student Teacher | [declarative/student_teacher/](./declarative/student_teacher/) | Student-teacher interaction pattern |
### resources
- Sample text inputs used by certain workflows:
  - [resources/long_text.txt](./resources/long_text.txt)
  - [resources/email.txt](./resources/email.txt)
  - [resources/spam.txt](./resources/spam.txt)
  - [resources/ambiguous_email.txt](./resources/ambiguous_email.txt)
### Notes
- Agent-based samples use provider SDKs (Azure/OpenAI, etc.). Ensure credentials are configured, or adapt agents accordingly.
Sequential orchestration uses a few small adapter nodes for plumbing:
- "input-conversation" normalizes input to `list[Message]`
- "to-conversation:<participant>" converts agent responses into the shared conversation
- "complete" publishes the final output event (type='output')
These nodes may appear in event streams (executor_invoked/executor_completed). They are analogous to the
Concurrent orchestration's dispatcher and aggregator, and can be ignored if you only care about agent activity.
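
One way to ignore the adapter nodes is to filter events by executor id, as in the sketch below. The event shape is an assumption made for illustration: events are modelled as plain dicts with `type` and `executor_id` keys, which may not match the framework's actual event objects, and `agent_events` is a hypothetical helper.

```python
from typing import Any

# Adapter node ids taken from the list above.
# "to-conversation:<participant>" is matched by its prefix.
ADAPTER_IDS = {"input-conversation", "complete"}
ADAPTER_PREFIXES = ("to-conversation:",)


def is_adapter(executor_id: str) -> bool:
    """Return True for the plumbing nodes used by sequential orchestration."""
    return executor_id in ADAPTER_IDS or executor_id.startswith(ADAPTER_PREFIXES)


def agent_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep every event that does not originate from an adapter node."""
    return [e for e in events if not is_adapter(e.get("executor_id", ""))]


# Hypothetical event records, used only to exercise the filter.
sample_events = [
    {"type": "executor_invoked", "executor_id": "input-conversation"},
    {"type": "executor_invoked", "executor_id": "writer"},
    {"type": "executor_completed", "executor_id": "writer"},
    {"type": "executor_invoked", "executor_id": "to-conversation:writer"},
    {"type": "executor_completed", "executor_id": "complete"},
]
filtered = agent_events(sample_events)
```

Events without an executor id pass through unchanged, so a final output event would still be visible after filtering.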
### Why FoundryChatClient?
Workflow and orchestration samples use `FoundryChatClient` because they create agents locally and do not need
server-managed agent resources. This lightweight, project-backed chat client is a good fit for orchestration
patterns such as Sequential, Concurrent, Handoff, GroupChat, and Magentic.
If you need persistent server-side agent resources, use the hosted-agent flows rather than these workflow samples.
### Environment Variables
Workflow samples that use `FoundryChatClient` expect:
- `FOUNDRY_PROJECT_ENDPOINT` (Azure AI Foundry Agent Service (V2) project endpoint)
- `FOUNDRY_MODEL` (model deployment name)
These values are passed directly into the client constructor via `os.getenv()` in sample code.
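
Because `os.getenv()` returns `None` for an unset variable, it can help to check both values before constructing a client. The sketch below is a hypothetical helper (`load_foundry_settings` is not part of the samples or the framework) and stops short of calling the `FoundryChatClient` constructor, whose parameters are not shown here.

```python
import os
from collections.abc import Mapping

REQUIRED_VARS = ("FOUNDRY_PROJECT_ENDPOINT", "FOUNDRY_MODEL")


def load_foundry_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return the required settings, or raise if any is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}


# An explicit mapping is passed so the snippet runs without real credentials;
# the endpoint and deployment name are placeholders.
settings = load_foundry_settings(
    {
        "FOUNDRY_PROJECT_ENDPOINT": "https://example.invalid/project",
        "FOUNDRY_MODEL": "my-deployment",
    }
)
```

Failing fast with the names of the missing variables tends to be easier to diagnose than an authentication error raised later by the client.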