Files
agent-framework/python/samples/03-workflows/control-flow/sequential_streaming.py
T
Copilot b05fc9e849 Python: Add load_dotenv() to samples for .env file support (#4043)
* Initial plan

* Add load_dotenv() to 303 Python samples for environment variable loading

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Update SAMPLE_GUIDELINES.md to document load_dotenv() requirement

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Update samples README.md to document .env file usage

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Run ruff format on all changed sample files

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Clarify load_dotenv() usage in README - local dev vs production

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove deprecated getting_started folder as requested

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Document env_file_path parameter for per-client configuration

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Merge main branch to resolve conflicts

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix run_evaluation.py file that was empty in merge commit

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove dotnet changes from merge - out of scope for this PR

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove package and test changes from merge - only sample changes needed

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove test_func_utils.py - only sample changes needed

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Revert sample files not in original changeset - keep only load_dotenv additions

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Move load_dotenv() outside snippet tag in 06_host_your_agent.py

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix comment placement - move load_dotenv before code comments

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix load_dotenv() placement across all samples - after docstring, before code comments

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Merge latest main branch with load_dotenv changes

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove non-sample changes from merge - keep only load_dotenv additions

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Revert non-load_dotenv sample changes from merge

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix run_evaluation.py - use main's improved version (file already had load_dotenv)

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Manual update

* Manual update 2

* Fix Role usage and load_dotenv placement per PR review feedback

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix Role usage - use string literals not enum attributes

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix SAMPLE_GUIDELINES.md example - load_dotenv before docstring per guidance

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Move load_dotenv() before docstrings in all samples per SAMPLE_GUIDELINES ordering

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Address PR review: rename files, fix placement, add session usage, remove note

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Update Redis README to reference renamed file redis_history_provider.py

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>
Co-authored-by: Tao Chen <taochen@microsoft.com>
Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>
Co-authored-by: Eduard van Valkenburg <eavanvalkenburg@users.noreply.github.com>
2026-02-19 10:55:13 +00:00

81 lines
3.1 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import WorkflowBuilder, WorkflowContext, executor
from typing_extensions import Never
"""
Sample: Foundational sequential workflow with streaming using function-style executors.
Two lightweight steps run in order. The first converts text to uppercase.
The second reverses the text and yields the workflow output. Events are printed as they arrive from a streaming run.
Purpose:
Show how to declare executors with the @executor decorator, connect them with WorkflowBuilder,
pass intermediate values using ctx.send_message, and yield final output using ctx.yield_output().
Demonstrate how streaming exposes executor_invoked events (type='executor_invoked') and
executor_completed events (type='executor_completed') for observability.
Prerequisites:
- No external services required.
"""
# Step 1: Define methods using the executor decorator.
@executor(id="upper_case_executor")
async def to_upper_case(text: str, ctx: WorkflowContext[str]) -> None:
"""Transform the input to uppercase and forward it to the next step.
Concepts:
- The @executor decorator registers this function as a workflow node.
- WorkflowContext[str] indicates that this node emits a string payload downstream.
"""
result = text.upper()
# Send the intermediate result to the next executor in the workflow graph.
await ctx.send_message(result)
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""Reverse the input and yield the workflow output.
Concepts:
- Terminal nodes yield output using ctx.yield_output().
- The workflow completes when it becomes idle (no more work to do).
"""
result = text[::-1]
# Yield the final output for this workflow run.
await ctx.yield_output(result)
async def main():
"""Build a two-step sequential workflow and run it with streaming to observe events."""
# Step 1: Build the workflow with the defined edges.
# Order matters. upper_case_executor runs first, then reverse_text_executor.
workflow = WorkflowBuilder(start_executor=to_upper_case).add_edge(to_upper_case, reverse_text).build()
# Step 2: Run the workflow and stream events in real time.
async for event in workflow.run("hello world", stream=True):
# You will see executor invoke and completion events as the workflow progresses.
print(f"Event: {event}")
if event.type == "output":
print(f"Workflow completed with result: {event.data}")
"""
Sample Output:
Event: executor_invoked event (type='executor_invoked', executor_id=upper_case_executor)
Event: executor_completed event (type='executor_completed', executor_id=upper_case_executor)
Event: executor_invoked event (type='executor_invoked', executor_id=reverse_text_executor)
Event: executor_completed event (type='executor_completed', executor_id=reverse_text_executor)
Event: output event (type='output', data='DLROW OLLEH', executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH
"""
if __name__ == "__main__":
asyncio.run(main())