Python: Clean left-over WorkflowCompletedEvent (#884)

* Clean left-over WorkflowCompletedEvent

* Improve comments

* Fix type check error
This commit is contained in:
Tao Chen
2025-09-23 17:52:51 -07:00
committed by GitHub
Unverified
parent 06827e9f1a
commit 36933de345
7 changed files with 18 additions and 19 deletions
@@ -1621,7 +1621,7 @@ class AgentExecutor(Executor):
async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
"""Execute the underlying agent, emit events, and enqueue response.
Terminal detection & WorkflowCompletedEvent emission are handled centrally in Runner.
Terminal detection is handled centrally in Runner.
This method only produces AgentRunEvent/AgentRunUpdateEvent plus enqueues an
AgentExecutorResponse message for routing.
"""
@@ -288,7 +288,7 @@ class Runner:
tasks = [_deliver_message_inner(edge_runner, message) for edge_runner in associated_edge_runners]
if not tasks:
# No outgoing edges. If this is an AgentExecutorResponse, treat it as an
# intentional terminal emission and emit a WorkflowCompletedEvent here.
# intentional terminal emission and emit a WorkflowOutputEvent here.
# (Previously this relied on the executor to emit, but AgentExecutor only
# sends an AgentExecutorResponse message; centralized completion keeps the
# contract consistent with other executors.)
@@ -6,9 +6,8 @@ from typing import Any
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowCompletedEvent,
WorkflowOutputEvent,
WorkflowContext,
WorkflowOutputEvent,
handler,
)
from agent_framework.observability import get_tracer, setup_observability
@@ -49,9 +48,8 @@ class ReverseTextExecutor(Executor):
result = text[::-1]
print(f"ReverseTextExecutor: Result '{result}'")
# Yield the output and signal workflow completion.
# Yield the output.
await ctx.yield_output(result)
await ctx.add_event(WorkflowCompletedEvent())
async def run_sequential_workflow() -> None:
@@ -113,7 +113,7 @@ Notes
Sequential orchestration uses a few small adapter nodes for plumbing:
- "input-conversation" normalizes input to `list[ChatMessage]`
- "to-conversation:<participant>" converts agent responses into the shared conversation
- "complete" publishes the final `WorkflowCompletedEvent`
- "complete" publishes the final `WorkflowOutputEvent`
These may appear in event streams (ExecutorInvoke/Completed). Theyre analogous to
concurrents dispatcher and aggregator and can be ignored if you only care about agent activity.
@@ -39,7 +39,7 @@ Pipeline:
2) ReverseTextExecutor reverses the string.
3) SubmitToLowerAgent prepares an AgentExecutorRequest for the lowercasing agent.
4) lower_agent (AgentExecutor) converts text to lowercase via Azure OpenAI.
5) FinalizeFromAgent emits a WorkflowCompletedEvent with the final result.
5) FinalizeFromAgent yields the final result.
What you learn:
- How to persist executor state using ctx.get_state and ctx.set_state.
@@ -22,7 +22,7 @@ Demonstrates:
Prerequisites:
- Azure OpenAI access configured for AzureChatClient (use az login + env vars)
- Familiarity with Workflow events (AgentRunEvent, WorkflowCompletedEvent)
- Familiarity with Workflow events (AgentRunEvent, WorkflowOutputEvent)
"""
@@ -3,7 +3,8 @@
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowCompletedEvent, WorkflowContext, handler
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
"""
Sample: Concurrent fan out and fan in with two different tasks that output results of different types.
@@ -52,10 +53,10 @@ class Sum(Executor):
class Aggregator(Executor):
"""Aggregate the results from the different tasks and emit the `WorkflowCompletedEvent`."""
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[None]):
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
@@ -65,9 +66,9 @@ class Aggregator(Executor):
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
cts (WorkflowContext[None]): A workflow context.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.add_event(WorkflowCompletedEvent(data=results))
await ctx.yield_output(results)
async def main() -> None:
@@ -87,13 +88,13 @@ async def main() -> None:
)
# 3) Run the workflow
completion: WorkflowCompletedEvent | None = None
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if isinstance(event, WorkflowOutputEvent):
output = event.data
if completion is not None:
print(completion.data)
if output is not None:
print(output)
if __name__ == "__main__":