Python: (core): Add functional workflow API (#4238)

* Add functional workflow api

* cleanup

* More cleanup

* address copilot feedback

* Address PR feedback

* updates

* PR feedback

* Address review comments on functional workflow samples

- Swap 05/06 get-started samples: agent workflow first (motivates
  why workflows exist), simple text workflow second
- Rename text_pipeline → text_workflow, poem_pipeline → poem_workflow
- Add @step to agent workflow sample (05) to demonstrate caching
- Switch agent samples to AzureOpenAIResponsesClient with Foundry
- Remove .as_agent() from agent_integration.py to focus on the key
  difference between inline agent calls vs @step-cached calls
- Add commented-out Agent.run example in hitl_review.py
- Add clarifying comment in _functional.py that event streaming is
  buffered (not true per-token streaming)
- Add naive_group_chat.py functional sample: round-robin group chat
  as a plain Python loop
- Update READMEs to reflect new file names and group chat sample

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright type errors

* Address PR review comments on functional workflow API

1. Allow request_info inside @step: Auto-inject RunContext into step
   functions that declare a RunContext parameter (by type or name 'ctx'),
   and expose get_run_context() for programmatic access.

2. Handle None responses: Log a warning when a response value is None,
   and document the behavior in request_info docstring.

3. Add executor_bypassed event type: Replace executor_invoked +
   executor_completed with a single executor_bypassed event when a step
   replays from cache, making cached vs live execution explicit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add regression tests for PR review comments on functional workflow API

The three review comments (request_info in @step, None response handling,
executor_bypassed event type) were already addressed in 7da7db4e. This
commit adds cross-cutting regression tests that exercise the interactions
between these features:

- HITL in step with caching: preceding step bypassed on resume
- Full checkpoint lifecycle with HITL step (interrupt -> resume -> restore)
- None response inside step-level request_info logs warning
- WorkflowInterrupted from step does not emit executor_failed

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR #4238 review comments on functional workflow API

Comment 1 (request_info in @step): Already supported. Added comment in
StepWrapper.__call__ explaining why WorkflowInterrupted (BaseException)
safely bypasses the except Exception handler.

Comment 2 (None response): Added docstring to _get_response clarifying
the (found, value) return tuple semantics and None handling.

Comment 3 (bypass event type): executor_bypassed is already a dedicated
event type in WorkflowEventType. Updated comment at the bypass site to
make the deliberate event type choice explicit.
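
The reasoning in Comment 1 rests on standard Python exception semantics: a
class that derives from `BaseException` but not from `Exception` is not
caught by `except Exception`. A minimal sketch follows, in which the
`WorkflowInterrupted` class, `run_step`, and the `events` list are
hypothetical stand-ins rather than the framework's own code.

```python
class WorkflowInterrupted(BaseException):
    """Hypothetical stand-in for the framework's interrupt signal."""


def run_step(fn, events: list[str]):
    """Run fn, recording a failure event only for ordinary exceptions."""
    try:
        return fn()
    except Exception:
        # BaseException subclasses that are not Exception never reach here.
        events.append("executor_failed")
        raise


def pause():
    raise WorkflowInterrupted("waiting for human input")


def crash():
    raise ValueError("boom")


def outcome(fn) -> tuple[str, list[str]]:
    """Return the name of the raised exception and the events recorded."""
    events: list[str] = []
    try:
        run_step(fn, events)
    except BaseException as exc:  # catch-all used for this demo only
        return type(exc).__name__, events
    return "ok", events


print(outcome(pause))  # the interrupt propagates without a failure event
print(outcome(crash))  # an ordinary error is recorded as a failure
```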

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add experimental API warnings to functional workflow module

Mark all public classes and decorators (workflow, step, RunContext,
FunctionalWorkflow, StepWrapper, FunctionalWorkflowAgent) as
experimental and subject to change or removal.

* Address PR #4238 review comments from @eavanvalkenburg

- RunContext docstring leads with purpose (opt-in handle for HITL,
  custom events, state) so readers importing it from the public surface
  understand its role before the mechanics (#2993513452).
- Rename `06_first_functional_workflow.py` to
  `06_functional_workflow_basics.py`; the previous filename was
  confusing since it followed `05_functional_workflow_with_agents.py`
  (#2993531979).
- Simplify `05_functional_workflow_with_agents.py` to call agents
  directly without a @step wrapper; the step-vs-no-step contrast lives
  in `03-workflows/functional/agent_integration.py`, keeping the
  get-started sample minimal (#2993525532).
- Switch functional samples to `FoundryChatClient` for consistency with
  the rest of 01-get-started and 03-workflows (follow-up on #2876988570).
- Use walrus in `hitl_review.py` final-state assertion (#2993572182).
- Add expected-output block to `basic_streaming_pipeline.py` (#2993557609).
- Clarify in `parallel_pipeline.py` that `@step` composes with
  `asyncio.gather` (#2993597282).
- `naive_group_chat.py` threads `list[Message]` between turns instead
  of stringifying the transcript, preserving role/authorship (#2993583231).

Drive-by: pre-commit hook sorts an unrelated import block in
`samples/04-hosting/foundry-hosted-agents/responses/02_local_tools/main.py`.

* Fix 10 functional-workflow API bugs from /ultrareview pass

- bug_001: `ctx.request_info()` without an explicit `request_id` now derives
  a deterministic `auto::<index>` id from the call-counter, so HITL resume
  works correctly on the documented default path.  Previously a fresh uuid
  was generated on every replay, which made resume impossible.

- bug_002: `StepWrapper.__call__` no longer deepcopies arguments on the
  cache-hit replay branch.  The copy is only performed on the live-execution
  path (for the event log) and falls back to the original mapping if deepcopy
  fails, so steps whose args aren't deepcopyable (locks, sockets, sessions)
  can still resume from checkpoint.

- bug_007: `_set_responses` now prunes each resolved `request_id` from
  `_pending_requests`, and the cache-hit branch in `request_info` does the
  same.  Previously, answered requests were re-serialized into every
  subsequent checkpoint and the final checkpoint falsely claimed pending
  requests even after the workflow completed.

- bug_008: `_compute_signature_hash` now mixes the function's `co_code` and
  `co_names` into the checkpoint signature, so changes to the workflow body
  invalidate older checkpoints even when steps are accessed via module /
  class attributes (which `_discover_step_names` can't see statically).
  `RunContext._record_observed_step` records observed step names for
  diagnostics.

- bug_010: `FunctionalWorkflow.run()` docstring corrected — says "at least
  one of message/responses/checkpoint_id" and explicitly notes `responses`
  may be combined with `checkpoint_id` (the validator already allowed this).

- bug_013: `FunctionalWorkflowAgent` now surfaces `request_info` events as
  `FunctionApprovalRequestContent` items (mirroring graph `WorkflowAgent`),
  threads `responses=` and `checkpoint_id=` through to the underlying
  workflow, and exposes `pending_requests`.  Previously `.as_agent()`
  returned empty `AgentResponse` for HITL workflows — effectively unusable.

- bug_014: `FunctionalWorkflow` now clears `_last_message`,
  `_last_step_cache`, and `_last_pending_request_ids` on clean completion.
  `run()` validates that `responses=` keys intersect the currently-pending
  request set (or raises with a clear error) instead of silently replaying
  against stale singleton state from a prior run.

- bug_015: `FunctionalWorkflow.as_agent` signature now matches graph
  `Workflow.as_agent`: accepts `name`, `description`, `context_providers`,
  and `**kwargs`.  `FunctionalWorkflowAgent` stores the overrides.

- bug_017: `RunContext.set_state` raises `ValueError` for underscore-
  prefixed keys (the framework's `_step_cache` / `_original_message` keys
  would silently clobber user state on checkpoint save and user
  underscore-prefixed state was dropped on restore).  Docstring documents
  the reserved prefix.

- merged_bug_003: Workflow function arity is validated at decoration time.
  Multiple non-ctx parameters raise `ValueError` immediately (previously
  every arg past the first was silently dropped at call time).  Passing a
  non-None `message` to a ctx-only workflow raises `ValueError` instead of
  silently discarding the message.
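
To illustrate why bug_001 mattered, here is a stdlib-only sketch comparing
counter-derived ids with random ones across two runs of the same workflow
body. The `Requests` class and `ids_for_run` helper are hypothetical, and
starting the counter at 0 is an assumption; only the `auto::<index>` format
is taken from the description above.

```python
import uuid


class Requests:
    """Hypothetical request-id allocator; a fresh instance models one run."""

    def __init__(self, deterministic: bool) -> None:
        self._deterministic = deterministic
        self._count = 0

    def next_id(self) -> str:
        index = self._count
        self._count += 1
        if self._deterministic:
            # Same call order on replay yields the same id.
            return f"auto::{index}"
        # A random id differs on every run, so a replay cannot match it.
        return str(uuid.uuid4())


def ids_for_run(deterministic: bool, calls: int = 2) -> list[str]:
    """Allocate ids for one simulated run that calls request_info `calls` times."""
    alloc = Requests(deterministic)
    return [alloc.next_id() for _ in range(calls)]


first = ids_for_run(deterministic=True)
replay = ids_for_run(deterministic=True)
print(first, first == replay)
print(ids_for_run(False) == ids_for_run(False))
```

The deterministic scheme only holds if the workflow issues its requests in
the same order on every replay, which is the contract the fix relies on.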

Test coverage: +18 regression tests covering every fix.  Full workflow
suite now 766 passed, 1 skipped, 2 xfailed; full core suite 2338 passed.

* Deslop functional.py fix commit

- Remove dead instrumentation added in the prior commit that was never
  consumed: `RunContext._observed_step_names`,
  `RunContext._record_observed_step`, `FunctionalWorkflow._runtime_step_names`,
  and `FunctionalWorkflowAgent._extra_kwargs`.  The signature hash relies on
  `co_code` alone, which covers the attribute-access case without the
  collection-scaffolding.
- Trim over-explanatory comments that restated what the code does or what
  it no longer does.  Keep only the comments that answer "why" for the
  non-obvious bits (deterministic id contract, defensive deepcopy, stale
  replay guard).
- Compress the `_compute_signature_hash` and FunctionalWorkflow `__init__`
  block docstrings without losing the user-facing reasoning.
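
As a rough illustration of hashing on `co_code`, the sketch below digests a
function's compiled bytecode. `signature_hash` is a hypothetical helper, and
the use of SHA-256 is an assumption; the real `_compute_signature_hash` may
combine other inputs.

```python
import hashlib


def signature_hash(fn) -> str:
    """Hash a function's compiled bytecode (hypothetical helper)."""
    return hashlib.sha256(fn.__code__.co_code).hexdigest()


def body_v1(a, b):
    return a + b


def body_v2(a, b):
    return a * b


def body_v1_copy(a, b):
    return a + b


# Identical bodies hash the same; a changed operation changes the hash.
print(signature_hash(body_v1) == signature_hash(body_v1_copy))
print(signature_hash(body_v1) == signature_hash(body_v2))
```

One caveat with this approach: constants are stored in `co_consts`, so
depending on the Python version a change to a literal alone may leave
`co_code` unchanged.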

Net -49 lines.  Regression lock preserved (766 passed, 1 skipped, 2 xfailed).

* Fix functional workflow review feedback

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <copilot@github.com>
Evan Mattson
2026-04-24 18:41:20 +09:00
committed by GitHub
parent 62e02da698
commit da32e8cf80
21 changed files with 3968 additions and 15 deletions
@@ -30,6 +30,20 @@ Once comfortable with these, explore the rest of the samples below.
## Samples Overview (by directory)
### functional
Write workflows as plain Python async functions — no graph concepts, no executor classes, no edges. Use native control flow (`if`/`else`, loops, `asyncio.gather`) for branching and parallelism.
| Sample | File | Concepts |
|---|---|---|
| Basic Pipeline | [functional/basic_pipeline.py](./functional/basic_pipeline.py) | Sequential steps as plain async functions |
| Basic Streaming Pipeline | [functional/basic_streaming_pipeline.py](./functional/basic_streaming_pipeline.py) | Stream workflow events in real time with `run(stream=True)` |
| Parallel Pipeline | [functional/parallel_pipeline.py](./functional/parallel_pipeline.py) | Fan-out/fan-in with `asyncio.gather` |
| Steps and Checkpointing | [functional/steps_and_checkpointing.py](./functional/steps_and_checkpointing.py) | `@step` decorator for per-step checkpointing and observability |
| Human-in-the-Loop Review | [functional/hitl_review.py](./functional/hitl_review.py) | HITL with `ctx.request_info()` and replay |
| Agent Integration | [functional/agent_integration.py](./functional/agent_integration.py) | Calling agents inside workflow steps |
| Naive Group Chat | [functional/naive_group_chat.py](./functional/naive_group_chat.py) | Simple round-robin group chat as a plain loop |
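
The native control flow mentioned above needs nothing beyond the standard library. The following sketch omits the `@workflow` decorator so that it runs without the framework installed; `lookup` and `research` are hypothetical names used only for illustration.

```python
import asyncio


async def lookup(source: str, topic: str) -> str:
    """Hypothetical stand-in for an I/O-bound call."""
    await asyncio.sleep(0.01)
    return f"{source}: results for {topic}"


async def research(topic: str, include_news: bool = True) -> list[str]:
    sources = ["web", "papers"]
    if include_news:  # branching is an ordinary if statement
        sources.append("news")
    # Fan-out: run every lookup concurrently; gather preserves argument order.
    return list(await asyncio.gather(*(lookup(s, topic) for s in sources)))


print(asyncio.run(research("AI agents")))
```

In the samples below, the same body would sit under `@workflow` and be invoked with `.run(...)`.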
### agents
| Sample | File | Concepts |
@@ -0,0 +1,107 @@
# Copyright (c) Microsoft. All rights reserved.
"""Calling agents inside functional workflows.
Agent calls work inside @workflow as plain function calls — no decorator needed.
Just call the agent and use the result.
If you want per-step caching (so agent calls don't re-execute on HITL resume
or crash recovery), add @step. Since each agent call hits an LLM API (time +
money), @step is often worth it. But it's always opt-in.
This sample shows both approaches side-by-side so you can see the difference.
"""
import asyncio
from agent_framework import Agent, step, workflow
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# ---------------------------------------------------------------------------
# Create agents
# ---------------------------------------------------------------------------
client = FoundryChatClient(credential=AzureCliCredential())
classifier_agent = Agent(
    name="ClassifierAgent",
    instructions=(
        "Classify documents into one category: Technical, Legal, Marketing, or Scientific. "
        "Reply with only the category name."
    ),
    client=client,
)

writer_agent = Agent(
    name="WriterAgent",
    instructions="Summarize the given content in one sentence.",
    client=client,
)

reviewer_agent = Agent(
    name="ReviewerAgent",
    instructions="Review the given summary in one sentence. Is it accurate and complete?",
    client=client,
)


# ---------------------------------------------------------------------------
# Simplest approach: call agents directly inside the workflow.
# No @step, no wrappers — just plain function calls.
# ---------------------------------------------------------------------------
@workflow
async def simple_pipeline(document: str) -> str:
    """Process a document — agents called inline, no @step."""
    classification = (await classifier_agent.run(f"Classify this document: {document}")).text
    summary = (await writer_agent.run(f"Summarize: {document}")).text
    review = (await reviewer_agent.run(f"Review this summary: {summary}")).text
    return f"Classification: {classification}\nSummary: {summary}\nReview: {review}"


# ---------------------------------------------------------------------------
# With @step: agent results are cached. On HITL resume or checkpoint
# recovery, completed steps return their saved result instead of calling
# the LLM again. Worth it for expensive operations.
# ---------------------------------------------------------------------------
@step
async def classify_document(doc: str) -> str:
    return (await classifier_agent.run(f"Classify this document: {doc}")).text


@step
async def generate_summary(doc: str) -> str:
    return (await writer_agent.run(f"Summarize: {doc}")).text


@step
async def review_summary(summary: str) -> str:
    return (await reviewer_agent.run(f"Review this summary: {summary}")).text


@workflow
async def cached_pipeline(document: str) -> str:
    """Same pipeline, but @step caches each agent call."""
    classification = await classify_document(document)
    summary = await generate_summary(document)
    review = await review_summary(summary)
    return f"Classification: {classification}\nSummary: {summary}\nReview: {review}"


async def main():
    # Simple version — agents called inline
    result = await simple_pipeline.run("This is a technical document about machine learning...")
    print(result.get_outputs()[0])

    # Cached version — same result, but steps won't re-execute on resume
    result = await cached_pipeline.run("This is a technical document about machine learning...")
    print(f"\nCached: {result.get_outputs()[0]}")


if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,58 @@
# Copyright (c) Microsoft. All rights reserved.
"""Basic sequential pipeline using the functional workflow API.
The simplest possible workflow: plain async functions orchestrated by @workflow.
No @step decorator needed — just write Python.
"""
import asyncio
from agent_framework import workflow
# These are plain async functions — no decorators needed.
# They run normally inside the workflow, just like any other Python function.
async def fetch_data(url: str) -> dict[str, str | int]:
    """Simulate fetching data from a URL."""
    return {"url": url, "content": f"Data from {url}", "status": 200}


async def transform_data(data: dict[str, str | int]) -> str:
    """Transform raw data into a summary string."""
    return f"[{data['status']}] {data['content']}"


# @workflow turns this async function into a FunctionalWorkflow object.
# Without it, this is just a normal async function. With it, you get:
# - .run() that returns a WorkflowRunResult with events and outputs
# - .run(stream=True) for streaming events in real time
# - .as_agent() to use this workflow anywhere an agent is expected
#
# The function's first parameter receives the input from .run("...").
# Add a `ctx: RunContext` parameter only if you need HITL, state, or custom events.
@workflow
async def data_pipeline(url: str) -> str:
    """A simple sequential data pipeline."""
    raw = await fetch_data(url)
    summary = await transform_data(raw)

    # This is just a function — plain Python works between calls.
    # No need to wrap every operation in a separate async function.
    is_valid = len(summary) > 0 and "[200]" in summary
    tag = "VALID" if is_valid else "INVALID"

    # Returning a value automatically emits it as an output.
    # Callers retrieve it via result.get_outputs().
    return f"[{tag}] {summary}"


async def main():
    # .run() is provided by @workflow — a plain async function wouldn't have it
    result = await data_pipeline.run("https://example.com/api/data")
    print("Output:", result.get_outputs()[0])
    print("State:", result.get_final_state())


if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,63 @@
# Copyright (c) Microsoft. All rights reserved.
"""Basic streaming pipeline using the functional workflow API.
Stream workflow events in real time with run(stream=True).
"""
import asyncio
from agent_framework import workflow
# Plain async functions — no decorators needed for simple helpers.
async def fetch_data(url: str) -> dict[str, str | int]:
    """Simulate fetching data from a URL."""
    return {"url": url, "content": f"Data from {url}", "status": 200}


async def transform_data(data: dict[str, str | int]) -> str:
    """Transform raw data into a summary string."""
    return f"[{data['status']}] {data['content']}"


async def validate_result(summary: str) -> bool:
    """Validate the transformed result."""
    return len(summary) > 0 and "[200]" in summary


# @workflow enables .run(stream=True), which returns a ResponseStream
# you can iterate over with `async for`. Without @workflow, you'd just
# have a normal async function with no streaming capability.
@workflow
async def data_pipeline(url: str) -> str:
    """A simple sequential data pipeline."""
    raw = await fetch_data(url)
    summary = await transform_data(raw)
    is_valid = await validate_result(summary)
    return f"{summary} (valid={is_valid})"


async def main():
    # run(stream=True) returns a ResponseStream that yields events as they
    # are produced. The raw stream includes lifecycle events (started, status)
    # alongside application events — filter by event.type to find what you need.
    stream = data_pipeline.run("https://example.com/api/data", stream=True)
    async for event in stream:
        if event.type == "output":
            print(f"Output: {event.data}")

    # After iteration, get_final_response() returns the WorkflowRunResult
    result = await stream.get_final_response()
    print(f"Final state: {result.get_final_state()}")


"""
Expected output:

Output: [200] Data from https://example.com/api/data (valid=True)
Final state: WorkflowRunState.IDLE
"""


if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,84 @@
# Copyright (c) Microsoft. All rights reserved.
"""Human-in-the-loop review pipeline using functional workflows.
Demonstrates ctx.request_info() for pausing the workflow to wait for
external input and resuming with run(responses={...}).
HITL works with or without @step. The difference is what happens on resume:
- Without @step: every function re-executes from the top (fine for cheap calls).
- With @step: completed functions return their saved result instantly.
This sample uses @step on write_draft() because it simulates an expensive
operation that shouldn't re-run just because the workflow was paused.
"""
import asyncio
from agent_framework import RunContext, WorkflowRunState, step, workflow
# @step saves the result. When the workflow resumes after the HITL pause,
# this returns its saved result instead of running the expensive operation again.
#
# In a real workflow you might call an agent here instead:
#   @step
#   async def write_draft(topic: str) -> str:
#       return (await writer_agent.run(f"Write a draft about: {topic}")).text
@step
async def write_draft(topic: str) -> str:
    """Simulate writing a draft — expensive, shouldn't re-run on resume."""
    print(f" write_draft executing for '{topic}'")
    return f"Draft document about '{topic}': Lorem ipsum dolor sit amet..."


@step
async def revise_draft(draft: str, feedback: str) -> str:
    """Revise the draft based on feedback."""
    return f"Revised: {draft[:50]}... [Applied feedback: {feedback}]"


@workflow
async def review_pipeline(topic: str, ctx: RunContext) -> str:
    """Write a draft, get human review, then revise."""
    draft = await write_draft(topic)

    # ctx.request_info() suspends the workflow here. The caller gets back
    # a WorkflowRunResult with state IDLE_WITH_PENDING_REQUESTS and can
    # inspect the pending request via result.get_request_info_events().
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )

    # This only executes after the caller resumes with run(responses={...}).
    # write_draft above returns its saved result (thanks to @step),
    # request_info returns the provided response, and we continue here.
    return await revise_draft(draft, feedback)


async def main():
    # Phase 1: Run until the workflow pauses for human input
    print("=== Phase 1: Initial run ===")
    result1 = await review_pipeline.run("AI Safety")

    # If request_info() was reached, the state is IDLE_WITH_PENDING_REQUESTS.
    # If the workflow completed without hitting request_info(), it would be IDLE.
    print(f"State: {(final_state := result1.get_final_state())}")
    assert final_state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

    requests = result1.get_request_info_events()
    print(f"Pending request: {requests[0].request_id}")

    # Phase 2: Resume with the human's response
    print("\n=== Phase 2: Resume with feedback ===")
    print("(write_draft should NOT execute again — saved by @step)")
    result2 = await review_pipeline.run(responses={"review_request": "Add more details about alignment research"})
    print(f"State: {result2.get_final_state()}")
    print(f"Output: {result2.get_outputs()[0]}")


if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,82 @@
# Copyright (c) Microsoft. All rights reserved.
"""Naive group chat using the functional workflow API.
A simple round-robin group chat where agents take turns responding.
Because it's just a function, you control the loop, the turn order,
and the termination condition with plain Python — no framework abstractions.
Compare this with the graph-based GroupChat orchestration to see how the
functional API lets you start simple and add complexity only when needed.
"""
import asyncio
from agent_framework import Agent, Message, workflow
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# ---------------------------------------------------------------------------
# Create agents
# ---------------------------------------------------------------------------
client = FoundryChatClient(credential=AzureCliCredential())
expert = Agent(
    name="PythonExpert",
    instructions=(
        "You are a Python expert in a group discussion. "
        "Answer questions about Python and refine your answer based on feedback. "
        "Keep responses concise (2-3 sentences)."
    ),
    client=client,
)

critic = Agent(
    name="Critic",
    instructions=(
        "You are a constructive critic in a group discussion. "
        "Point out edge cases, gotchas, or missing nuances in the previous answer. "
        "If the answer is solid, say so briefly."
    ),
    client=client,
)

summarizer = Agent(
    name="Summarizer",
    instructions=(
        "You are a summarizer in a group discussion. "
        "After the discussion, provide a final concise summary that incorporates "
        "the expert's answer and the critic's feedback. Keep it to 2-3 sentences."
    ),
    client=client,
)


# ---------------------------------------------------------------------------
# A naive group chat is just a loop — no special framework needed
# ---------------------------------------------------------------------------
@workflow
async def group_chat(question: str) -> str:
    """Round-robin group chat: expert answers, critic reviews, summarizer wraps up."""
    participants = [expert, critic, summarizer]

    # Passing list[Message] keeps roles/authorship intact between turns,
    # instead of stringifying everything into a single prompt.
    conversation: list[Message] = [Message("user", [question])]

    # Simple round-robin: each agent sees the full conversation so far
    for agent in participants:
        response = await agent.run(conversation)
        conversation.extend(response.messages)

    return "\n\n".join(f"{m.author_name or m.role}: {m.text}" for m in conversation)


async def main():
    result = await group_chat.run("What's the difference between a list and a tuple in Python?")
    print(result.get_outputs()[0])


if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,66 @@
# Copyright (c) Microsoft. All rights reserved.
"""Parallel pipeline using asyncio.gather with functional workflows.
Fan-out/fan-in uses native Python concurrency via asyncio.gather.
No @step needed — still just plain async functions.
"""
import asyncio
from agent_framework import workflow
# Plain async functions — asyncio.gather handles the concurrency,
# no framework primitives needed for parallelism.
async def research_web(topic: str) -> str:
    """Simulate web research."""
    await asyncio.sleep(0.05)
    return f"Web results for '{topic}': 10 articles found"


async def research_papers(topic: str) -> str:
    """Simulate academic paper search."""
    await asyncio.sleep(0.05)
    return f"Papers on '{topic}': 3 relevant papers"


async def research_news(topic: str) -> str:
    """Simulate news search."""
    await asyncio.sleep(0.05)
    return f"News about '{topic}': 5 recent articles"


async def synthesize(sources: list[str]) -> str:
    """Combine research results into a summary."""
    return "Research Summary:\n" + "\n".join(f" - {s}" for s in sources)


# @workflow wraps the orchestration logic so you get .run(), streaming,
# and events. The functions it calls are plain Python — no decorators
# needed just because they're inside a workflow.
@workflow
async def research_pipeline(topic: str) -> str:
    """Fan-out to three research tasks, then synthesize results."""
    # asyncio.gather runs all three concurrently — this is standard Python,
    # not a framework concept. Use it the same way you would anywhere else.
    #
    # Tip: if any of these were wrapped with @step (e.g. an expensive agent call),
    # the pattern is identical — @step composes with asyncio.gather, so each
    # branch is independently cached on HITL resume or checkpoint restore.
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])


async def main():
    result = await research_pipeline.run("AI agents")
    print(result.get_outputs()[0])


if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,97 @@
# Copyright (c) Microsoft. All rights reserved.
"""Introducing @step: per-step checkpointing and observability.
The previous samples used plain functions — and that works. Workflows support
HITL (ctx.request_info) and checkpointing regardless of whether you use @step.
The difference: without @step, a resumed workflow re-executes every function
call from the top. That's fine for cheap functions. But for expensive operations
(API calls, agent runs, etc.) you don't want to pay that cost again.
@step saves each function's result so it skips re-execution on resume:
- On HITL resume, completed steps return their saved result instantly.
- On crash recovery from a checkpoint, earlier step results are restored.
- Each step emits executor_invoked/executor_completed events for observability.
@step is opt-in. Plain functions still work alongside @step in the same workflow.
"""
import asyncio
from agent_framework import InMemoryCheckpointStorage, step, workflow
# Track call counts to show which functions actually execute on resume
fetch_calls = 0
transform_calls = 0
# @step saves this function's result. On resume, it returns the saved
# result instead of re-executing — useful because this is expensive.
@step
async def fetch_data(url: str) -> dict[str, str | int]:
    """Expensive operation — @step prevents re-execution on resume."""
    global fetch_calls
    fetch_calls += 1
    print(f" fetch_data called (call #{fetch_calls})")
    return {"url": url, "content": f"Data from {url}", "status": 200}


@step
async def transform_data(data: dict[str, str | int]) -> str:
    """Another expensive operation — @step saves the result."""
    global transform_calls
    transform_calls += 1
    print(f" transform_data called (call #{transform_calls})")
    return f"[{data['status']}] {data['content']}"


# No @step — this is cheap, so it just re-runs on resume. That's fine.
async def validate_result(summary: str) -> bool:
    """Cheap validation — no @step needed."""
    return len(summary) > 0 and "[200]" in summary


storage = InMemoryCheckpointStorage()


# checkpoint_storage tells @workflow where to persist step results.
# Each @step saves a checkpoint after it completes.
@workflow(checkpoint_storage=storage)
async def data_pipeline(url: str) -> str:
    """Mix of @step functions and plain functions."""
    raw = await fetch_data(url)
    summary = await transform_data(raw)
    is_valid = await validate_result(summary)
    return f"{summary} (valid={is_valid})"


async def main():
    # --- Run 1: Everything executes normally ---
    print("=== Run 1: Fresh execution ===")
    result = await data_pipeline.run("https://example.com/api/data")
    print(f"Output: {result.get_outputs()[0]}")
    print(f"fetch_calls={fetch_calls}, transform_calls={transform_calls}")

    # @step functions emit executor events; plain functions don't.
    print("\nEvents:")
    for event in result:
        if event.type in ("executor_invoked", "executor_completed"):
            print(f" {event.type}: {event.executor_id}")

    # --- Run 2: Restore from checkpoint ---
    # The workflow re-executes, but @step functions return saved results.
    # Only validate_result() (no @step) actually runs again.
    print("\n=== Run 2: Restored from checkpoint ===")
    latest = await storage.get_latest(workflow_name="data_pipeline")
    assert latest is not None
    result2 = await data_pipeline.run(checkpoint_id=latest.checkpoint_id)
    print(f"Output: {result2.get_outputs()[0]}")
    print(f"fetch_calls={fetch_calls}, transform_calls={transform_calls}")
    print("(call counts unchanged — @step results were restored from checkpoint)")


if __name__ == "__main__":
    asyncio.run(main())