mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
da32e8cf80
* Add functional workflow api * cleanup * More cleanup * address copilot feedback * Address PR feedbacK * updates * PR feedback * Address review comments on functional workflow samples - Swap 05/06 get-started samples: agent workflow first (motivates why workflows exist), simple text workflow second - Rename text_pipeline → text_workflow, poem_pipeline → poem_workflow - Add @step to agent workflow sample (05) to demonstrate caching - Switch agent samples to AzureOpenAIResponsesClient with Foundry - Remove .as_agent() from agent_integration.py to focus on the key difference between inline agent calls vs @step-cached calls - Add commented-out Agent.run example in hitl_review.py - Add clarifying comment in _functional.py that event streaming is buffered (not true per-token streaming) - Add naive_group_chat.py functional sample: round-robin group chat as a plain Python loop - Update READMEs to reflect new file names and group chat sample Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix pyright type errors * Address PR review comments on functional workflow API 1. Allow request_info inside @step: Auto-inject RunContext into step functions that declare a RunContext parameter (by type or name 'ctx'), and expose get_run_context() for programmatic access. 2. Handle None responses: Log a warning when a response value is None, and document the behavior in request_info docstring. 3. Add executor_bypassed event type: Replace executor_invoked + executor_completed with a single executor_bypassed event when a step replays from cache, making cached vs live execution explicit. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Add regression tests for PR review comments on functional workflow API The three review comments (request_info in @step, None response handling, executor_bypassed event type) were already addressed in 7da7db4e. This commit adds cross-cutting regression tests that exercise the interactions between these features: - HITL in step with caching: preceding step bypassed on resume - Full checkpoint lifecycle with HITL step (interrupt -> resume -> restore) - None response inside step-level request_info logs warning - WorkflowInterrupted from step does not emit executor_failed Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address PR #4238 review comments on functional workflow API Comment 1 (request_info in @step): Already supported. Added comment in StepWrapper.__call__ explaining why WorkflowInterrupted (BaseException) safely bypasses the except Exception handler. Comment 2 (None response): Added docstring to _get_response clarifying the (found, value) return tuple semantics and None handling. Comment 3 (bypass event type): executor_bypassed is already a dedicated event type in WorkflowEventType. Updated comment at the bypass site to make the deliberate event type choice explicit. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Add experimental API warnings to functional workflow module Mark all public classes and decorators (workflow, step, RunContext, FunctionalWorkflow, StepWrapper, FunctionalWorkflowAgent) as experimental and subject to change or removal. * Address PR #4238 review comments from @eavanvalkenburg - RunContext docstring leads with purpose (opt-in handle for HITL, custom events, state) so readers importing it from the public surface understand its role before the mechanics (#2993513452). - Rename `06_first_functional_workflow.py` to `06_functional_workflow_basics.py`; the previous filename was confusing since it followed `05_functional_workflow_with_agents.py` (#2993531979). - Simplify `05_functional_workflow_with_agents.py` to call agents directly without a @step wrapper; the step-vs-no-step contrast lives in `03-workflows/functional/agent_integration.py`, keeping the get-started sample minimal (#2993525532). - Switch functional samples to `FoundryChatClient` for consistency with the rest of 01-get-started and 03-workflows (follow-up on #2876988570). - Use walrus in `hitl_review.py` final-state assertion (#2993572182). - Add expected-output block to `basic_streaming_pipeline.py` (#2993557609). - Clarify in `parallel_pipeline.py` that `@step` composes with `asyncio.gather` (#2993597282). - `naive_group_chat.py` threads `list[Message]` between turns instead of stringifying the transcript, preserving role/authorship (#2993583231). Drive-by: pre-commit hook sorts an unrelated import block in `samples/04-hosting/foundry-hosted-agents/responses/02_local_tools/main.py`. * Fix 10 functional-workflow API bugs from /ultrareview pass - bug_001: `ctx.request_info()` without an explicit `request_id` now derives a deterministic `auto::<index>` id from the call-counter, so HITL resume works correctly on the documented default path. A uuid was regenerated on every replay, making resume impossible. - bug_002: `StepWrapper.__call__` no longer deepcopies arguments on the cache-hit replay branch. The copy is only performed on the live-execution path (for the event log) and falls back to the original mapping if deepcopy fails, so steps whose args aren't deepcopyable (locks, sockets, sessions) can still resume from checkpoint. - bug_007: `_set_responses` now prunes each resolved `request_id` from `_pending_requests`, and the cache-hit branch in `request_info` does the same. Previously, answered requests were re-serialized into every subsequent checkpoint and the final checkpoint falsely claimed pending requests even after the workflow completed. - bug_008: `_compute_signature_hash` now mixes the function's `co_code` and `co_names` into the checkpoint signature, so changes to the workflow body invalidate older checkpoints even when steps are accessed via module / class attributes (which `_discover_step_names` can't see statically). `RunContext._record_observed_step` records observed step names for diagnostics. - bug_010: `FunctionalWorkflow.run()` docstring corrected — says "at least one of message/responses/checkpoint_id" and explicitly notes `responses` may be combined with `checkpoint_id` (the validator already allowed this). - bug_013: `FunctionalWorkflowAgent` now surfaces `request_info` events as `FunctionApprovalRequestContent` items (mirroring graph `WorkflowAgent`), threads `responses=` and `checkpoint_id=` through to the underlying workflow, and exposes `pending_requests`. Previously `.as_agent()` returned empty `AgentResponse` for HITL workflows — effectively unusable. - bug_014: `FunctionalWorkflow` now clears `_last_message`, `_last_step_cache`, and `_last_pending_request_ids` on clean completion. `run()` validates that `responses=` keys intersect the currently-pending request set (or raises with a clear error) instead of silently replaying against stale singleton state from a prior run. - bug_015: `FunctionalWorkflow.as_agent` signature now matches graph `Workflow.as_agent`: accepts `name`, `description`, `context_providers`, and `**kwargs`. `FunctionalWorkflowAgent` stores the overrides. - bug_017: `RunContext.set_state` raises `ValueError` for underscore- prefixed keys (the framework's `_step_cache` / `_original_message` keys would silently clobber user state on checkpoint save and user underscore-prefixed state was dropped on restore). Docstring documents the reserved prefix. - merged_bug_003: Workflow function arity is validated at decoration time. Multiple non-ctx parameters raise `ValueError` immediately (previously every arg past the first was silently dropped at call time). Passing a non-None `message` to a ctx-only workflow raises `ValueError` instead of silently discarding the message. Test coverage: +18 regression tests covering every fix. Full workflow suite now 766 passed, 1 skipped, 2 xfailed; full core suite 2338 passed. * Deslop functional.py fix commit - Remove dead instrumentation added in the prior commit that was never consumed: `RunContext._observed_step_names`, `RunContext._record_observed_step`, `FunctionalWorkflow._runtime_step_names`, and `FunctionalWorkflowAgent._extra_kwargs`. The signature hash relies on `co_code` alone, which covers the attribute-access case without the collection-scaffolding. - Trim over-explanatory comments that restated what the code does or what it no longer does. Keep only the comments that answer "why" for the non-obvious bits (deterministic id contract, defensive deepcopy, stale replay guard). - Compress the `_compute_signature_hash` and FunctionalWorkflow `__init__` block docstrings without losing the user-facing reasoning. Net -49 lines. Regression lock preserved (766 passed, 1 skipped, 2 xfailed). * Fix functional workflow review feedback --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Copilot <copilot@github.com>
85 lines
3.3 KiB
Python
85 lines
3.3 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Human-in-the-loop review pipeline using functional workflows.
|
|
|
|
Demonstrates ctx.request_info() for pausing the workflow to wait for
|
|
external input and resuming with run(responses={...}).
|
|
|
|
HITL works with or without @step. The difference is what happens on resume:
|
|
- Without @step: every function re-executes from the top (fine for cheap calls).
|
|
- With @step: completed functions return their saved result instantly.
|
|
|
|
This sample uses @step on write_draft() because it simulates an expensive
|
|
operation that shouldn't re-run just because the workflow was paused.
|
|
"""
|
|
|
|
import asyncio
|
|
|
|
from agent_framework import RunContext, WorkflowRunState, step, workflow
|
|
|
|
|
|
# @step saves the result. When the workflow resumes after the HITL pause,
|
|
# this returns its saved result instead of running the expensive operation again.
|
|
#
|
|
# In a real workflow you might call an agent here instead:
|
|
# @step
|
|
# async def write_draft(topic: str) -> str:
|
|
# return (await writer_agent.run(f"Write a draft about: {topic}")).text
|
|
@step
|
|
async def write_draft(topic: str) -> str:
|
|
"""Simulate writing a draft — expensive, shouldn't re-run on resume."""
|
|
print(f" write_draft executing for '{topic}'")
|
|
return f"Draft document about '{topic}': Lorem ipsum dolor sit amet..."
|
|
|
|
|
|
@step
|
|
async def revise_draft(draft: str, feedback: str) -> str:
|
|
"""Revise the draft based on feedback."""
|
|
return f"Revised: {draft[:50]}... [Applied feedback: {feedback}]"
|
|
|
|
|
|
@workflow
|
|
async def review_pipeline(topic: str, ctx: RunContext) -> str:
|
|
"""Write a draft, get human review, then revise."""
|
|
draft = await write_draft(topic)
|
|
|
|
# ctx.request_info() suspends the workflow here. The caller gets back
|
|
# a WorkflowRunResult with state IDLE_WITH_PENDING_REQUESTS and can
|
|
# inspect the pending request via result.get_request_info_events().
|
|
feedback = await ctx.request_info(
|
|
{"draft": draft, "instructions": "Please review this draft"},
|
|
response_type=str,
|
|
request_id="review_request",
|
|
)
|
|
|
|
# This only executes after the caller resumes with run(responses={...}).
|
|
# write_draft above returns its saved result (thanks to @step),
|
|
# request_info returns the provided response, and we continue here.
|
|
return await revise_draft(draft, feedback)
|
|
|
|
|
|
async def main():
|
|
# Phase 1: Run until the workflow pauses for human input
|
|
print("=== Phase 1: Initial run ===")
|
|
result1 = await review_pipeline.run("AI Safety")
|
|
|
|
# If request_info() was reached, the state is IDLE_WITH_PENDING_REQUESTS.
|
|
# If the workflow completed without hitting request_info(), it would be IDLE.
|
|
print(f"State: {(final_state := result1.get_final_state())}")
|
|
assert final_state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
|
|
|
|
requests = result1.get_request_info_events()
|
|
print(f"Pending request: {requests[0].request_id}")
|
|
|
|
# Phase 2: Resume with the human's response
|
|
print("\n=== Phase 2: Resume with feedback ===")
|
|
print("(write_draft should NOT execute again — saved by @step)")
|
|
result2 = await review_pipeline.run(responses={"review_request": "Add more details about alignment research"})
|
|
|
|
print(f"State: {result2.get_final_state()}")
|
|
print(f"Output: {result2.get_outputs()[0]}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|