diff --git a/python/packages/main/agent_framework/_workflow/_executor.py b/python/packages/main/agent_framework/_workflow/_executor.py index a9a77c3c95..161adedf4b 100644 --- a/python/packages/main/agent_framework/_workflow/_executor.py +++ b/python/packages/main/agent_framework/_workflow/_executor.py @@ -1621,7 +1621,7 @@ class AgentExecutor(Executor): async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse]) -> None: """Execute the underlying agent, emit events, and enqueue response. - Terminal detection & WorkflowCompletedEvent emission are handled centrally in Runner. + Terminal detection is handled centrally in Runner. This method only produces AgentRunEvent/AgentRunUpdateEvent plus enqueues an AgentExecutorResponse message for routing. """ diff --git a/python/packages/main/agent_framework/_workflow/_runner.py b/python/packages/main/agent_framework/_workflow/_runner.py index 71eaa0452b..844ca95618 100644 --- a/python/packages/main/agent_framework/_workflow/_runner.py +++ b/python/packages/main/agent_framework/_workflow/_runner.py @@ -288,7 +288,7 @@ class Runner: tasks = [_deliver_message_inner(edge_runner, message) for edge_runner in associated_edge_runners] if not tasks: # No outgoing edges. If this is an AgentExecutorResponse, treat it as an - # intentional terminal emission and emit a WorkflowCompletedEvent here. + # intentional terminal emission and emit a WorkflowOutputEvent here. # (Previously this relied on the executor to emit, but AgentExecutor only # sends an AgentExecutorResponse message; centralized completion keeps the # contract consistent with other executors.) diff --git a/python/samples/getting_started/observability/04-workflow.py b/python/samples/getting_started/observability/04-workflow.py index ca2f23685e..027812aafd 100644 --- a/python/samples/getting_started/observability/04-workflow.py +++ b/python/samples/getting_started/observability/04-workflow.py @@ -6,9 +6,8 @@ from typing import Any from agent_framework import ( Executor, WorkflowBuilder, - WorkflowCompletedEvent, - WorkflowOutputEvent, WorkflowContext, + WorkflowOutputEvent, handler, ) from agent_framework.observability import get_tracer, setup_observability @@ -49,9 +48,8 @@ class ReverseTextExecutor(Executor): result = text[::-1] print(f"ReverseTextExecutor: Result '{result}'") - # Yield the output and signal workflow completion. + # Yield the output. await ctx.yield_output(result) - await ctx.add_event(WorkflowCompletedEvent()) async def run_sequential_workflow() -> None: diff --git a/python/samples/getting_started/workflow/README.md b/python/samples/getting_started/workflow/README.md index e34f146643..2fdb3f2389 100644 --- a/python/samples/getting_started/workflow/README.md +++ b/python/samples/getting_started/workflow/README.md @@ -113,7 +113,7 @@ Notes Sequential orchestration uses a few small adapter nodes for plumbing: - "input-conversation" normalizes input to `list[ChatMessage]` - "to-conversation:" converts agent responses into the shared conversation -- "complete" publishes the final `WorkflowCompletedEvent` +- "complete" publishes the final `WorkflowOutputEvent` These may appear in event streams (ExecutorInvoke/Completed). They’re analogous to concurrent’s dispatcher and aggregator and can be ignored if you only care about agent activity. diff --git a/python/samples/getting_started/workflow/checkpoint/checkpoint_with_resume.py b/python/samples/getting_started/workflow/checkpoint/checkpoint_with_resume.py index ab5f5d4053..db441dcb6c 100644 --- a/python/samples/getting_started/workflow/checkpoint/checkpoint_with_resume.py +++ b/python/samples/getting_started/workflow/checkpoint/checkpoint_with_resume.py @@ -39,7 +39,7 @@ Pipeline: 2) ReverseTextExecutor reverses the string. 3) SubmitToLowerAgent prepares an AgentExecutorRequest for the lowercasing agent. 4) lower_agent (AgentExecutor) converts text to lowercase via Azure OpenAI. -5) FinalizeFromAgent emits a WorkflowCompletedEvent with the final result. +5) FinalizeFromAgent yields the final result. What you learn: - How to persist executor state using ctx.get_state and ctx.set_state. diff --git a/python/samples/getting_started/workflow/orchestration/concurrent_agents.py b/python/samples/getting_started/workflow/orchestration/concurrent_agents.py index 13f167582b..bee9ef3029 100644 --- a/python/samples/getting_started/workflow/orchestration/concurrent_agents.py +++ b/python/samples/getting_started/workflow/orchestration/concurrent_agents.py @@ -22,7 +22,7 @@ Demonstrates: Prerequisites: - Azure OpenAI access configured for AzureChatClient (use az login + env vars) -- Familiarity with Workflow events (AgentRunEvent, WorkflowCompletedEvent) +- Familiarity with Workflow events (AgentRunEvent, WorkflowOutputEvent) """ diff --git a/python/samples/getting_started/workflow/parallelism/aggregate_results_of_different_types.py b/python/samples/getting_started/workflow/parallelism/aggregate_results_of_different_types.py index 16628fb54f..f718397cbd 100644 --- a/python/samples/getting_started/workflow/parallelism/aggregate_results_of_different_types.py +++ b/python/samples/getting_started/workflow/parallelism/aggregate_results_of_different_types.py @@ -3,7 +3,8 @@ import asyncio import random -from agent_framework import Executor, WorkflowBuilder, WorkflowCompletedEvent, WorkflowContext, handler +from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler +from typing_extensions import Never """ Sample: Concurrent fan out and fan in with two different tasks that output results of different types. @@ -52,10 +53,10 @@ class Sum(Executor): class Aggregator(Executor): - """Aggregate the results from the different tasks and emit the `WorkflowCompletedEvent`.""" + """Aggregate the results from the different tasks and yield the final output.""" @handler - async def handle(self, results: list[int | float], ctx: WorkflowContext[None]): + async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]): """Receive the results from the source executors. The framework will automatically collect messages from the source executors @@ -65,9 +66,9 @@ class Aggregator(Executor): results (list[int | float]): execution results from upstream executors. The type annotation must be a list of union types that the upstream executors will produce. - cts (WorkflowContext[None]): A workflow context. + ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output. """ - await ctx.add_event(WorkflowCompletedEvent(data=results)) + await ctx.yield_output(results) async def main() -> None: @@ -87,13 +88,13 @@ async def main() -> None: ) # 3) Run the workflow - completion: WorkflowCompletedEvent | None = None + output: list[int | float] | None = None async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]): - if isinstance(event, WorkflowCompletedEvent): - completion = event + if isinstance(event, WorkflowOutputEvent): + output = event.data - if completion is not None: - print(completion.data) + if output is not None: + print(output) if __name__ == "__main__":