Evan Mattson da32e8cf80 Python: (core): Add functional workflow API (#4238)
2026-04-24 09:41:20 +00:00


Workflows Getting Started Samples

Installation

Workflow support ships with the core Microsoft Agent Framework package (agent-framework or agent-framework-core), so no extra installation step is required.

To install with visualization support:

pip install agent-framework[viz] --pre

To export visualization images you also need to install GraphViz.
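Before running a sample that exports a diagram, it can help to confirm that GraphViz is actually reachable. The sketch below is only a convenience check, not part of the framework: it assumes GraphViz exposes its usual `dot` executable on PATH, and the helper name `graphviz_available` is made up for illustration.

```python
import shutil


def graphviz_available(executable: str = "dot") -> bool:
    """Return True if the given GraphViz executable can be found on PATH.

    Hypothetical helper for illustration; "dot" is GraphViz's standard
    layout command, but your installation may differ.
    """
    return shutil.which(executable) is not None


if __name__ == "__main__":
    if graphviz_available():
        print("GraphViz found on PATH.")
    else:
        print("GraphViz not found: install it before exporting images.")
```

If the check fails, install GraphViz through your platform's package manager and re-run it.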

Samples Overview

Foundational Concepts - Start Here

Work through the samples in the _start-here folder in order. These three samples introduce the core ideas of executors, edges, agents in workflows, and streaming.

| Sample | File | Concepts |
|---|---|---|
| Executors and Edges | _start-here/step1_executors_and_edges.py | Minimal workflow with basic executors and edges |
| Agents in a Workflow | _start-here/step2_agents_in_a_workflow.py | Introduces adding Agents as nodes; calling agents inside a workflow |
| Streaming (Basics) | _start-here/step3_streaming.py | Extends workflows with event streaming |

Once comfortable with these, explore the rest of the samples below.


Samples Overview (by directory)

functional

Write workflows as plain Python async functions — no graph concepts, no executor classes, no edges. Use native control flow (if/else, loops, asyncio.gather) for branching and parallelism.

| Sample | File | Concepts |
|---|---|---|
| Basic Pipeline | functional/basic_pipeline.py | Sequential steps as plain async functions |
| Basic Streaming Pipeline | functional/basic_streaming_pipeline.py | Stream workflow events in real time with run(stream=True) |
| Parallel Pipeline | functional/parallel_pipeline.py | Fan-out/fan-in with asyncio.gather |
| Steps and Checkpointing | functional/steps_and_checkpointing.py | @step decorator for per-step checkpointing and observability |
| Human-in-the-Loop Review | functional/hitl_review.py | HITL with ctx.request_info() and replay |
| Agent Integration | functional/agent_integration.py | Calling agents inside workflow steps |
| Naive Group Chat | functional/naive_group_chat.py | Simple round-robin group chat as a plain loop |
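To make the "plain async functions" idea concrete, here is a minimal sketch of the control-flow pattern the functional samples rely on: ordinary if/else for branching, asyncio.gather for fan-out/fan-in, and a regular loop. It deliberately uses only the standard library and leaves out the framework's decorators and run API, so the function names (`fetch_length`, `to_upper`, `text_workflow`) are illustrative stand-ins rather than anything from the samples.

```python
import asyncio


async def fetch_length(text: str) -> int:
    """Stand-in for an I/O-bound step (for example, an agent call)."""
    await asyncio.sleep(0)  # yield control, as real I/O would
    return len(text)


async def to_upper(text: str) -> str:
    """Second stand-in step that can run concurrently with the first."""
    await asyncio.sleep(0)
    return text.upper()


async def text_workflow(text: str) -> dict[str, object]:
    # Branching is an ordinary if/else.
    if not text.strip():
        return {"status": "empty"}

    # Fan-out/fan-in: run both steps concurrently and collect the results.
    length, upper = await asyncio.gather(fetch_length(text), to_upper(text))

    # Iteration is an ordinary loop.
    reversed_words = []
    for word in upper.split():
        reversed_words.append(word[::-1])

    return {
        "status": "ok",
        "length": length,
        "upper": upper,
        "reversed_words": reversed_words,
    }


result = asyncio.run(text_workflow("hello world"))
print(result)
```

In the actual samples the same shape is wrapped with the framework's functional workflow API; see the files listed above for the real decorators and run calls.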

agents

| Sample | File | Concepts |
|---|---|---|
| Azure Chat Agents (Streaming) | agents/azure_chat_agents_streaming.py | Add Azure Chat agents as workflow nodes and handle streaming events |
| Azure AI Agents (Streaming) | agents/azure_ai_agents_streaming.py | Add Azure AI agents as workflow nodes and handle streaming events |
| Azure AI Agents (Shared Session) | agents/azure_ai_agents_with_shared_session.py | Share a common message session between multiple Azure AI agents in a workflow |
| Custom Agent Executors | agents/custom_agent_executors.py | Create executors to handle agent run methods |
| Workflow as Agent (Reflection Pattern) | agents/workflow_as_agent_reflection_pattern.py | Wrap a workflow so it can behave like an agent (reflection pattern) |
| Workflow as Agent + HITL | agents/workflow_as_agent_human_in_the_loop.py | Extend workflow-as-agent with human-in-the-loop capability |
| Workflow as Agent with Session | agents/workflow_as_agent_with_session.py | Use AgentSession to maintain conversation history across workflow-as-agent invocations |
| Workflow as Agent kwargs | agents/workflow_as_agent_kwargs.py | Pass custom context (data, user tokens) via kwargs through workflow.as_agent() to @tool-decorated functions |

checkpoint

| Sample | File | Concepts |
|---|---|---|
| Checkpoint & Resume | checkpoint/checkpoint_with_resume.py | Create checkpoints, inspect them, and resume execution |
| Checkpoint & HITL Resume | checkpoint/checkpoint_with_human_in_the_loop.py | Combine checkpointing with human approvals and resume pending HITL requests |
| Checkpointed Sub-Workflow | checkpoint/sub_workflow_checkpoint.py | Save and resume a sub-workflow that pauses for human approval |
| Handoff + Tool Approval Resume | orchestrations/handoff_with_tool_approval_checkpoint_resume.py | Handoff workflow that captures tool-call approvals in checkpoints and resumes with human decisions |
| Workflow as Agent Checkpoint | checkpoint/workflow_as_agent_checkpoint.py | Enable checkpointing when using workflow.as_agent() with the checkpoint_storage parameter |
| Cosmos DB Checkpoint Storage | checkpoint/cosmos_workflow_checkpointing.py | Use CosmosCheckpointStorage for durable workflow checkpointing backed by Azure Cosmos DB NoSQL |
| Cosmos DB + Foundry Checkpoint | checkpoint/cosmos_workflow_checkpointing_foundry.py | Multi-agent workflow using FoundryChatClient with CosmosCheckpointStorage for durable pause/resume |

composition

| Sample | File | Concepts |
|---|---|---|
| Sub-Workflow (Basics) | composition/sub_workflow_basics.py | Wrap a workflow as an executor and orchestrate sub-workflows |
| Sub-Workflow: Request Interception | composition/sub_workflow_request_interception.py | Intercept and forward sub-workflow requests using @handler for SubWorkflowRequestMessage |
| Sub-Workflow: Parallel Requests | composition/sub_workflow_parallel_requests.py | Multiple specialized interceptors handling different request types from the same sub-workflow |
| Sub-Workflow: kwargs Propagation | composition/sub_workflow_kwargs.py | Pass custom context (user tokens, config) from the parent workflow through to sub-workflow agents |

control-flow

| Sample | File | Concepts |
|---|---|---|
| Sequential Executors | control-flow/sequential_executors.py | Sequential workflow with explicit executor setup |
| Sequential (Streaming) | control-flow/sequential_streaming.py | Stream events from a simple sequential run |
| Edge Condition | control-flow/edge_condition.py | Conditional routing based on agent classification |
| Switch-Case Edge Group | control-flow/switch_case_edge_group.py | Switch-case branching using classifier outputs |
| Multi-Selection Edge Group | control-flow/multi_selection_edge_group.py | Select one or many targets dynamically (subset fan-out) |
| Simple Loop | control-flow/simple_loop.py | Feedback loop where an agent judges ABOVE/BELOW/MATCHED |
| Workflow Cancellation | control-flow/workflow_cancellation.py | Cancel a running workflow using asyncio tasks |
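The cancellation sample builds on standard asyncio task cancellation. As a rough, framework-free sketch of that mechanism, the code below wraps a long-running coroutine in a task, cancels it, and observes the result. `long_running_workflow` is a hypothetical stand-in for a real workflow run, not an API from the samples.

```python
import asyncio


async def long_running_workflow(progress: list[int]) -> str:
    """Hypothetical stand-in for a workflow run that takes a while."""
    try:
        for i in range(1000):
            progress.append(i)
            await asyncio.sleep(0.01)  # cancellation is delivered at await points
        return "completed"
    except asyncio.CancelledError:
        # Release resources here if needed, then re-raise so the task
        # is actually marked as cancelled.
        raise


async def run_and_cancel() -> str:
    progress: list[int] = []
    task = asyncio.create_task(long_running_workflow(progress))

    await asyncio.sleep(0.05)  # let the "workflow" make some progress
    task.cancel()              # request cancellation

    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"


outcome = asyncio.run(run_and_cancel())
print(outcome)  # prints "cancelled"
```

Re-raising `CancelledError` inside the coroutine matters: swallowing it would let the task finish normally and hide the cancellation from the caller.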

human-in-the-loop

| Sample | File | Concepts |
|---|---|---|
| Human-In-The-Loop (Guessing Game) | human-in-the-loop/guessing_game_with_human_input.py | Interactive request/response prompts with a human via ctx.request_info() |
| Agents with Approval Requests in Workflows | human-in-the-loop/agents_with_approval_requests.py | Agents that create approval requests during workflow execution and wait for human approval to proceed |
| Agents with Declaration-Only Tools | human-in-the-loop/agents_with_declaration_only_tools.py | Workflow pauses when an agent calls a client-side tool (func=None); the caller supplies the result |

Builder-oriented request-info samples are maintained in the orchestration sample set (sequential, concurrent, and group-chat builder variants).

tool-approval

Builder-based tool approval samples are maintained in the orchestration sample set.

observability

| Sample | File | Concepts |
|---|---|---|
| Executor I/O Observation | observability/executor_io_observation.py | Observe executor input and output data via events with type='executor_invoked' and type='executor_completed', without modifying executor code |

For additional observability samples in Agent Framework, see the observability concept samples. The workflow observability sample demonstrates integrating observability into workflows.

orchestration

Orchestration-focused samples (Sequential, Concurrent, Handoff, GroupChat, Magentic), including builder-based workflow.as_agent(...) variants, are documented in the orchestrations directory.

parallelism

| Sample | File | Concepts |
|---|---|---|
| Concurrent (Fan-out/Fan-in) | parallelism/fan_out_fan_in_edges.py | Dispatch to multiple executors and aggregate results |
| Aggregate Results of Different Types | parallelism/aggregate_results_of_different_types.py | Handle results of different types from multiple concurrent executors |
| Map-Reduce with Visualization | parallelism/map_reduce_and_visualization.py | Fan-out/fan-in pattern with diagram export |

state-management

| Sample | File | Concepts |
|---|---|---|
| State with Agents | state-management/state_with_agents.py | Store data in workflow state once and reuse it later across agents |
| Workflow Kwargs - Global Context | state-management/workflow_kwargs_global.py | Pass custom context (data, user tokens) via kwargs to @tool-decorated functions in all agents |
| Workflow Kwargs - Per Agent | state-management/workflow_kwargs_per_agent.py | Pass custom context (data, user tokens) via kwargs to @tool-decorated functions in individual agents |

visualization

| Sample | File | Concepts |
|---|---|---|
| Concurrent with Visualization | visualization/concurrent_with_visualization.py | Fan-out/fan-in workflow with diagram export |

declarative

YAML-based declarative workflows let you define multi-agent orchestration patterns without writing Python code. See the declarative workflows README for details on YAML workflow syntax and available actions.

| Sample | File | Concepts |
|---|---|---|
| Agent to Function Tool | declarative/agent_to_function_tool/ | Chain agent output to InvokeFunctionTool actions |
| Conditional Workflow | declarative/conditional_workflow/ | Nested conditional branching based on user input |
| Customer Support | declarative/customer_support/ | Multi-agent customer support with routing |
| Deep Research | declarative/deep_research/ | Research workflow with planning, searching, and synthesis |
| Function Tools | declarative/function_tools/ | Invoking Python functions from declarative workflows |
| Human-in-Loop | declarative/human_in_loop/ | Interactive workflows that request user input |
| Invoke Function Tool | declarative/invoke_function_tool/ | Call registered Python functions with InvokeFunctionTool |
| Marketing | declarative/marketing/ | Marketing content generation workflow |
| Simple Workflow | declarative/simple_workflow/ | Basic workflow with variable setting, conditionals, and loops |
| Student Teacher | declarative/student_teacher/ | Student-teacher interaction pattern |

resources

Notes

  • Agent-based samples use provider SDKs (Azure/OpenAI, etc.). Ensure credentials are configured, or adapt agents accordingly.

Sequential orchestration uses a few small adapter nodes for plumbing:

  • "input-conversation" normalizes input to list[Message]
  • "to-conversation:" converts agent responses into the shared conversation
  • "complete" publishes the final output event (type='output')

These nodes may appear in event streams (executor_invoked/executor_completed). They are analogous to the concurrent orchestration's dispatcher and aggregator and can be ignored if you only care about agent activity.
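If you want to drop the adapter nodes when inspecting an event stream, a small filter is usually enough. The sketch below is an assumption-laden illustration: `Event` is a hypothetical stand-in with `type` and `executor_id` fields rather than the framework's own event class, and "to-conversation:" is treated as an id prefix, which is a guess based on the trailing colon in the name above.

```python
from dataclasses import dataclass

# Adapter node ids taken from the notes above.
ADAPTER_IDS = {"input-conversation", "complete"}
# Assumption: "to-conversation:" is a prefix followed by a participant name.
ADAPTER_PREFIXES = ("to-conversation:",)


@dataclass
class Event:
    """Hypothetical stand-in for a workflow event; not the framework's class."""

    type: str
    executor_id: str


def is_adapter(executor_id: str) -> bool:
    """Return True for the plumbing nodes used by sequential orchestration."""
    return executor_id in ADAPTER_IDS or executor_id.startswith(ADAPTER_PREFIXES)


def agent_events(events: list[Event]) -> list[Event]:
    """Keep only events that come from non-adapter executors."""
    return [event for event in events if not is_adapter(event.executor_id)]


sample = [
    Event("executor_invoked", "input-conversation"),
    Event("executor_invoked", "writer"),
    Event("executor_completed", "writer"),
    Event("executor_invoked", "to-conversation:writer"),
    Event("executor_completed", "complete"),
]
print([event.executor_id for event in agent_events(sample)])  # ['writer', 'writer']
```

Adapt the field names to whatever the real event objects expose in your version of the framework.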

Why FoundryChatClient?

Workflow and orchestration samples use FoundryChatClient because they create agents locally and do not need server-managed agent resources. This lightweight, project-backed chat client is a good fit for orchestration patterns such as Sequential, Concurrent, Handoff, GroupChat, and Magentic.

If you need persistent server-side agent resources, use the hosted-agent flows rather than these workflow samples.

Environment Variables

Workflow samples that use FoundryChatClient expect:

  • FOUNDRY_PROJECT_ENDPOINT (Azure AI Foundry Agent Service (V2) project endpoint)
  • FOUNDRY_MODEL (model deployment name)

These values are passed directly into the client constructor via os.getenv() in sample code.
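Because the samples read these variables with os.getenv(), a missing value only shows up later as a client error. A small up-front check can make that failure clearer. The following is a minimal sketch using only the standard library; the helper name `load_foundry_settings` is hypothetical, and the client constructor is intentionally not shown because its parameter names are not stated here.

```python
import os
from typing import Mapping, Optional

REQUIRED_VARS = ("FOUNDRY_PROJECT_ENDPOINT", "FOUNDRY_MODEL")


def load_foundry_settings(env: Optional[Mapping[str, str]] = None) -> dict[str, str]:
    """Read the two variables the samples expect and fail fast if any is unset.

    Hypothetical helper for illustration; `env` defaults to os.environ and
    exists mainly so the function can be exercised without touching the
    real environment.
    """
    source = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not source.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: source[name] for name in REQUIRED_VARS}


if __name__ == "__main__":
    try:
        settings = load_foundry_settings()
        print("Using model deployment:", settings["FOUNDRY_MODEL"])
    except RuntimeError as exc:
        print(exc)
```

The returned values can then be handed to the client in the same way the samples pass the os.getenv() results.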