From 68b76e6726b3f65ff8b493861121d60f73ff3154 Mon Sep 17 00:00:00 2001 From: Evan Mattson <35585003+moonbox3@users.noreply.github.com> Date: Tue, 16 Sep 2025 10:36:25 +0900 Subject: [PATCH] Python: Workflow run state and structured error events, sample updates, tests (#725) * Add workflow event types to surface workflow state, status, and failures. * Address PR feedback * Updates to use new workflow run state enums --- .../main/agent_framework/workflow/__init__.py | 5 + .../agent_framework/workflow/__init__.pyi | 10 ++ .../agent_framework_workflow/__init__.py | 10 ++ .../agent_framework_workflow/_events.py | 116 +++++++++++++++ .../agent_framework_workflow/_executor.py | 18 ++- .../agent_framework_workflow/_runner.py | 11 +- .../agent_framework_workflow/_workflow.py | 93 +++++++++++- .../workflow/tests/test_workflow_states.py | 137 ++++++++++++++++++ .../_start-here/step1_executors_and_edges.py | 3 + .../_start-here/step2_agents_in_a_workflow.py | 2 + .../workflow/_start-here/step3_streaming.py | 37 ++++- .../guessing_game_with_human_input.py | 47 ++++-- 12 files changed, 463 insertions(+), 26 deletions(-) create mode 100644 python/packages/workflow/tests/test_workflow_states.py diff --git a/python/packages/main/agent_framework/workflow/__init__.py b/python/packages/main/agent_framework/workflow/__init__.py index cb8cd56fcf..b1b1e67070 100644 --- a/python/packages/main/agent_framework/workflow/__init__.py +++ b/python/packages/main/agent_framework/workflow/__init__.py @@ -40,6 +40,7 @@ _IMPORTS = [ "WorkflowAgent", "WorkflowViz", "FileCheckpointStorage", + "ExecutorFailedEvent", "InMemoryCheckpointStorage", "CheckpointStorage", "WorkflowCheckpoint", @@ -56,6 +57,10 @@ _IMPORTS = [ "PlanReviewRequest", "RequestInfoEvent", "StandardMagenticManager", + "WorkflowStatusEvent", + "WorkflowRunState", + "WorkflowErrorDetails", + "WorkflowFailedEvent", "ConcurrentBuilder", ] diff --git a/python/packages/main/agent_framework/workflow/__init__.pyi b/python/packages/main/agent_framework/workflow/__init__.pyi index ea88728525..9527ab7546 100644 --- a/python/packages/main/agent_framework/workflow/__init__.pyi +++ b/python/packages/main/agent_framework/workflow/__init__.pyi @@ -13,6 +13,7 @@ from agent_framework_workflow import ( Executor, ExecutorCompletedEvent, ExecutorEvent, + ExecutorFailedEvent, ExecutorInvokeEvent, FileCheckpointStorage, FunctionExecutor, @@ -38,10 +39,14 @@ from agent_framework_workflow import ( WorkflowCheckpoint, WorkflowCompletedEvent, WorkflowContext, + WorkflowErrorDetails, WorkflowEvent, WorkflowExecutor, + WorkflowFailedEvent, WorkflowRunResult, + WorkflowRunState, WorkflowStartedEvent, + WorkflowStatusEvent, WorkflowViz, __version__, executor, @@ -62,6 +67,7 @@ __all__ = [ "Executor", "ExecutorCompletedEvent", "ExecutorEvent", + "ExecutorFailedEvent", "ExecutorInvokeEvent", "FileCheckpointStorage", "FunctionExecutor", @@ -87,10 +93,14 @@ __all__ = [ "WorkflowCheckpoint", "WorkflowCompletedEvent", "WorkflowContext", + "WorkflowErrorDetails", "WorkflowEvent", "WorkflowExecutor", + "WorkflowFailedEvent", "WorkflowRunResult", + "WorkflowRunState", "WorkflowStartedEvent", + "WorkflowStatusEvent", "WorkflowViz", "__version__", "executor", diff --git a/python/packages/workflow/agent_framework_workflow/__init__.py b/python/packages/workflow/agent_framework_workflow/__init__.py index 5d3e1352fb..bf5081d754 100644 --- a/python/packages/workflow/agent_framework_workflow/__init__.py +++ b/python/packages/workflow/agent_framework_workflow/__init__.py @@ -19,11 +19,16 @@ from ._events import ( AgentRunUpdateEvent, ExecutorCompletedEvent, ExecutorEvent, + ExecutorFailedEvent, ExecutorInvokeEvent, RequestInfoEvent, WorkflowCompletedEvent, + WorkflowErrorDetails, WorkflowEvent, + WorkflowFailedEvent, + WorkflowRunState, WorkflowStartedEvent, + WorkflowStatusEvent, ) from ._executor import ( AgentExecutor, @@ -100,6 +105,7 @@ __all__ = [ "Executor", "ExecutorCompletedEvent", "ExecutorEvent", + "ExecutorFailedEvent", "ExecutorInvokeEvent", "FileCheckpointStorage", "FunctionExecutor", @@ -142,10 +148,14 @@ __all__ = [ "WorkflowCheckpoint", "WorkflowCompletedEvent", "WorkflowContext", + "WorkflowErrorDetails", "WorkflowEvent", "WorkflowExecutor", + "WorkflowFailedEvent", "WorkflowRunResult", + "WorkflowRunState", "WorkflowStartedEvent", + "WorkflowStatusEvent", "WorkflowValidationError", "WorkflowViz", "__version__", diff --git a/python/packages/workflow/agent_framework_workflow/_events.py b/python/packages/workflow/agent_framework_workflow/_events.py index 67075a617f..a04136eb0e 100644 --- a/python/packages/workflow/agent_framework_workflow/_events.py +++ b/python/packages/workflow/agent_framework_workflow/_events.py @@ -1,5 +1,8 @@ # Copyright (c) Microsoft. All rights reserved. +import traceback as _traceback +from dataclasses import dataclass +from enum import Enum from typing import TYPE_CHECKING, Any from agent_framework import AgentRunResponse, AgentRunResponseUpdate @@ -56,6 +59,108 @@ class WorkflowErrorEvent(WorkflowEvent): return f"{self.__class__.__name__}(exception={self.data})" +class WorkflowRunState(str, Enum): + """Run-level state of a workflow execution. + + Semantics: + - STARTED: Run has been initiated and the workflow context has been created. + This is an initial state before any meaningful work is performed. In this + codebase we emit a dedicated `WorkflowStartedEvent` for telemetry, and + typically advance the status directly to `IN_PROGRESS`. Consumers may + still rely on `STARTED` for state machines that need an explicit pre-work + phase. + + - IN_PROGRESS: The workflow is actively executing (e.g., the initial + message has been delivered to the start executor or a superstep is + running). This status is emitted at the beginning of a run and can be + followed by other statuses as the run progresses. + + - IN_PROGRESS_PENDING_REQUESTS: Active execution while one or more + request-for-information operations are outstanding. New work may still + be scheduled while requests are in flight. + + - IDLE: The workflow is quiescent with no outstanding requests, but has + not yet emitted a terminal result. Rare in practice but provided for + orchestration integrations that distinguish a quiescent state. + + - IDLE_WITH_PENDING_REQUESTS: The workflow is paused awaiting external + input (e.g., emitted a `RequestInfoEvent`). This is a non-terminal + state; the workflow can resume when responses are supplied. + + - COMPLETED: Normal terminal state indicating successful completion. + + - FAILED: Terminal state indicating an error surfaced. Accompanied by a + `WorkflowFailedEvent` with structured error details. + + - CANCELLED: Terminal state indicating the run was cancelled by a caller + or orchestrator. Not currently emitted by default runner paths but + included for integrators/orchestrators that support cancellation. + """ + + STARTED = "STARTED" # Explicit pre-work phase (rarely emitted as status; see note above) + IN_PROGRESS = "IN_PROGRESS" # Active execution is underway + IN_PROGRESS_PENDING_REQUESTS = "IN_PROGRESS_PENDING_REQUESTS" # Active execution with outstanding requests + IDLE = "IDLE" # No active work and no outstanding requests + IDLE_WITH_PENDING_REQUESTS = "IDLE_WITH_PENDING_REQUESTS" # Paused awaiting external responses + COMPLETED = "COMPLETED" # Finished successfully + FAILED = "FAILED" # Finished with an error + CANCELLED = "CANCELLED" # Finished due to cancellation + + +class WorkflowStatusEvent(WorkflowEvent): + """Event indicating a transition in the workflow run state.""" + + def __init__(self, state: WorkflowRunState, data: Any | None = None): + super().__init__(data) + self.state = state + + def __repr__(self) -> str: # pragma: no cover - representation only + return f"{self.__class__.__name__}(state={self.state}, data={self.data!r})" + + +@dataclass +class WorkflowErrorDetails: + """Structured error information to surface in error events/results.""" + + error_type: str + message: str + traceback: str | None = None + executor_id: str | None = None + extra: dict[str, Any] | None = None + + @classmethod + def from_exception( + cls, + exc: BaseException, + *, + executor_id: str | None = None, + extra: dict[str, Any] | None = None, + ) -> "WorkflowErrorDetails": + tb = None + try: + tb = "".join(_traceback.format_exception(type(exc), exc, exc.__traceback__)) + except Exception: + tb = None + return cls( + error_type=exc.__class__.__name__, + message=str(exc), + traceback=tb, + executor_id=executor_id, + extra=extra, + ) + + +class WorkflowFailedEvent(WorkflowEvent): + """Terminal failure event for a workflow run.""" + + def __init__(self, details: WorkflowErrorDetails, data: Any | None = None): + super().__init__(data) + self.details = details + + def __repr__(self) -> str: # pragma: no cover - representation only + return f"{self.__class__.__name__}(details={self.details}, data={self.data!r})" + + class RequestInfoEvent(WorkflowEvent): """Event triggered when a workflow executor requests external information.""" @@ -119,6 +224,17 @@ class ExecutorCompletedEvent(ExecutorEvent): return f"{self.__class__.__name__}(executor_id={self.executor_id})" +class ExecutorFailedEvent(ExecutorEvent): + """Event triggered when an executor handler raises an error.""" + + def __init__(self, executor_id: str, details: WorkflowErrorDetails): + super().__init__(executor_id, details) + self.details = details + + def __repr__(self) -> str: # pragma: no cover - representation only + return f"{self.__class__.__name__}(executor_id={self.executor_id}, details={self.details})" + + class AgentRunUpdateEvent(ExecutorEvent): """Event triggered when an agent is streaming messages.""" diff --git a/python/packages/workflow/agent_framework_workflow/_executor.py b/python/packages/workflow/agent_framework_workflow/_executor.py index 0a7135047f..f797b4d2a8 100644 --- a/python/packages/workflow/agent_framework_workflow/_executor.py +++ b/python/packages/workflow/agent_framework_workflow/_executor.py @@ -102,7 +102,14 @@ class Executor(AFBaseModel): if self._request_interceptors and message.__class__.__name__ == "SubWorkflowRequestInfo": # Directly handle SubWorkflowRequestInfo await context.add_event(ExecutorInvokeEvent(self.id)) - await self._handle_sub_workflow_request(message, context) + try: + await self._handle_sub_workflow_request(message, context) + except Exception as exc: + # Surface structured executor failure before propagating + from ._events import ExecutorFailedEvent, WorkflowErrorDetails + + await context.add_event(ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc))) + raise await context.add_event(ExecutorCompletedEvent(self.id)) return @@ -115,7 +122,14 @@ class Executor(AFBaseModel): if handler is None: raise RuntimeError(f"Executor {self.__class__.__name__} cannot handle message of type {type(message)}.") await context.add_event(ExecutorInvokeEvent(self.id)) - await handler(message, context) + try: + await handler(message, context) + except Exception as exc: + # Surface structured executor failure before propagating + from ._events import ExecutorFailedEvent, WorkflowErrorDetails + + await context.add_event(ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc))) + raise await context.add_event(ExecutorCompletedEvent(self.id)) def _discover_handlers(self) -> None: diff --git a/python/packages/workflow/agent_framework_workflow/_runner.py b/python/packages/workflow/agent_framework_workflow/_runner.py index bfdca5c27d..05e81619df 100644 --- a/python/packages/workflow/agent_framework_workflow/_runner.py +++ b/python/packages/workflow/agent_framework_workflow/_runner.py @@ -113,8 +113,15 @@ class Runner: # Periodically continue to let iteration advance continue - # Propagate errors from iteration - await iteration_task + # Propagate errors from iteration, but first surface any pending events + try: + await iteration_task + except Exception: + # Make sure failure-related events (like ExecutorFailedEvent) are surfaced + if await self._ctx.has_events(): + for event in await self._ctx.drain_events(): + yield event + raise self._iteration += 1 # Drain any straggler events emitted at tail end diff --git a/python/packages/workflow/agent_framework_workflow/_workflow.py b/python/packages/workflow/agent_framework_workflow/_workflow.py index 9232149a6a..25aad51e14 100644 --- a/python/packages/workflow/agent_framework_workflow/_workflow.py +++ b/python/packages/workflow/agent_framework_workflow/_workflow.py @@ -25,7 +25,16 @@ from ._edge import ( SwitchCaseEdgeGroupCase, SwitchCaseEdgeGroupDefault, ) -from ._events import RequestInfoEvent, WorkflowCompletedEvent, WorkflowEvent +from ._events import ( + RequestInfoEvent, + WorkflowCompletedEvent, + WorkflowErrorDetails, + WorkflowEvent, + WorkflowFailedEvent, + WorkflowRunState, + WorkflowStartedEvent, + WorkflowStatusEvent, +) from ._executor import AgentExecutor, Executor, RequestInfoExecutor from ._runner import Runner from ._runner_context import CheckpointState, InProcRunnerContext, RunnerContext @@ -43,7 +52,16 @@ logger = logging.getLogger(__name__) class WorkflowRunResult(list[WorkflowEvent]): - """A list of events generated during the workflow execution in non-streaming mode.""" + """A list of events generated during the workflow execution in non-streaming mode. + + Preserves the historical contract that the list contains data-plane events + only (executor invoke/complete, completed, requests), while exposing the + control-plane status timeline via accessors. + """ + + def __init__(self, events: list[WorkflowEvent], status_events: list[WorkflowStatusEvent] | None = None) -> None: + super().__init__(events) + self._status_events: list[WorkflowStatusEvent] = status_events or [] def get_completed_event(self) -> WorkflowCompletedEvent | None: """Get the completed event from the workflow run result. @@ -69,6 +87,23 @@ class WorkflowRunResult(list[WorkflowEvent]): """ return [event for event in self if isinstance(event, RequestInfoEvent)] + def get_final_state(self) -> WorkflowRunState: + """Return the final run state based on explicit status events. + + Returns the last WorkflowStatusEvent.state observed. Raises if none were emitted. + """ + if self._status_events: + return self._status_events[-1].state # type: ignore[return-value] + raise RuntimeError( + "Final state is unknown because no WorkflowStatusEvent was emitted. " + "Ensure your workflow entry points are used (which emit status events) " + "or handle the absence of status explicitly." + ) + + def status_timeline(self) -> list[WorkflowStatusEvent]: + """Return the list of status events emitted during the run (control-plane).""" + return list(self._status_events) + # region Workflow @@ -202,9 +237,15 @@ class Workflow(AFBaseModel): # Create workflow span that encompasses the entire execution with workflow_tracer.create_workflow_run_span(self): + saw_completed = False + saw_request = False + emitted_in_progress_pending = False try: - # Add workflow started event + # Add workflow started event (telemetry + surface state to consumers) workflow_tracer.add_workflow_event("workflow.started") + # Emit explicit start/status events to the stream + yield WorkflowStartedEvent() + yield WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS) # Reset context for a new run if supported if reset_context: @@ -216,11 +257,31 @@ class Workflow(AFBaseModel): # All executor executions happen within workflow span async for event in self._runner.run_until_convergence(): + # Track terminal indicators while forwarding events + if isinstance(event, WorkflowCompletedEvent): + saw_completed = True + elif isinstance(event, RequestInfoEvent): + saw_request = True yield event - # Success + if isinstance(event, RequestInfoEvent) and not emitted_in_progress_pending and not saw_completed: + emitted_in_progress_pending = True + yield WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS) + + # Success path: emit a final status based on observed terminal signals + if saw_completed: + yield WorkflowStatusEvent(WorkflowRunState.COMPLETED) + elif saw_request: + yield WorkflowStatusEvent(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS) + else: + yield WorkflowStatusEvent(WorkflowRunState.IDLE) + workflow_tracer.add_workflow_event("workflow.completed") except Exception as e: + # Surface structured failure details before propagating exception + details = WorkflowErrorDetails.from_exception(e) + yield WorkflowFailedEvent(details) + yield WorkflowStatusEvent(WorkflowRunState.FAILED) workflow_tracer.add_workflow_error_event(e) raise @@ -358,11 +419,12 @@ class Workflow(AFBaseModel): ): yield event - async def run(self, message: Any) -> WorkflowRunResult: + async def run(self, message: Any, *, include_status_events: bool = False) -> WorkflowRunResult: """Run the workflow with the given message. Args: message: The message to be processed by the workflow. + include_status_events: Whether to include WorkflowStatusEvent instances in the result list. Returns: A WorkflowRunResult instance containing a list of events generated during the workflow execution. @@ -377,6 +439,7 @@ class Workflow(AFBaseModel): coalesced: list[WorkflowEvent] = [] # type: ignore[name-defined] pending_updates: list[AgentRunResponseUpdate] = [] pending_executor: str | None = None + status_events: list[WorkflowStatusEvent] = [] def _flush_pending() -> None: nonlocal pending_updates, pending_executor @@ -403,12 +466,22 @@ class Workflow(AFBaseModel): continue # Flush before adding any non-update event _flush_pending() + # Omit WorkflowStartedEvent from non-streaming (telemetry-only) + if isinstance(ev, WorkflowStartedEvent): + continue + # Track status; include inline only if explicitly requested + if isinstance(ev, WorkflowStatusEvent): + status_events.append(ev) + if include_status_events: + coalesced.append(ev) + continue coalesced.append(ev) # Flush any trailing updates _flush_pending() - return WorkflowRunResult(coalesced) + # coalesced already excludes start events; includes status events only if opted in + return WorkflowRunResult(coalesced, status_events) async def run_from_checkpoint( self, @@ -435,7 +508,9 @@ class Workflow(AFBaseModel): events = [ event async for event in self.run_stream_from_checkpoint(checkpoint_id, checkpoint_storage, responses) ] - return WorkflowRunResult(events) + status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)] + filtered_events = [e for e in events if not isinstance(e, (WorkflowStatusEvent, WorkflowStartedEvent))] + return WorkflowRunResult(filtered_events, status_events) async def send_responses(self, responses: dict[str, Any]) -> WorkflowRunResult: """Send responses back to the workflow. @@ -447,7 +522,9 @@ class Workflow(AFBaseModel): A WorkflowRunResult instance containing a list of events generated during the workflow execution. """ events = [event async for event in self.send_responses_streaming(responses)] - return WorkflowRunResult(events) + status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)] + filtered_events = [e for e in events if not isinstance(e, (WorkflowStatusEvent, WorkflowStartedEvent))] + return WorkflowRunResult(filtered_events, status_events) def _get_executor_by_id(self, executor_id: str) -> Executor: """Get an executor by its ID. diff --git a/python/packages/workflow/tests/test_workflow_states.py b/python/packages/workflow/tests/test_workflow_states.py new file mode 100644 index 0000000000..0070afda46 --- /dev/null +++ b/python/packages/workflow/tests/test_workflow_states.py @@ -0,0 +1,137 @@ +# Copyright (c) Microsoft. All rights reserved. + +import pytest +from agent_framework.workflow import ( + Executor, + ExecutorFailedEvent, + RequestInfoEvent, + RequestInfoExecutor, + RequestInfoMessage, + Workflow, + WorkflowBuilder, + WorkflowCompletedEvent, + WorkflowContext, + WorkflowFailedEvent, + WorkflowRunResult, + WorkflowRunState, + WorkflowStatusEvent, + handler, +) + +from agent_framework_workflow import InProcRunnerContext +from agent_framework_workflow._shared_state import SharedState +from agent_framework_workflow._workflow_context import WorkflowContext as WFContext + + +class FailingExecutor(Executor): + """Executor that raises at runtime to test failure signaling.""" + + @handler + async def fail(self, msg: int, ctx: WorkflowContext[None]) -> None: # pragma: no cover - invoked via workflow + raise RuntimeError("boom") + + +async def test_executor_failed_and_workflow_failed_events_streaming(): + failing = FailingExecutor(id="f") + wf: Workflow = WorkflowBuilder().set_start_executor(failing).build() + + events: list[object] = [] + with pytest.raises(RuntimeError, match="boom"): + async for ev in wf.run_stream(0): + events.append(ev) + + # Workflow-level failure and FAILED status should be surfaced + assert any(isinstance(e, WorkflowFailedEvent) for e in events) + status = [e for e in events if isinstance(e, WorkflowStatusEvent)] + assert status and status[-1].state == WorkflowRunState.FAILED + + +async def test_executor_failed_event_emitted_on_direct_execute(): + failing = FailingExecutor(id="f") + ctx = InProcRunnerContext() + wf_ctx: WFContext[None] = WFContext( + executor_id=failing.id, + source_executor_ids=["START"], + shared_state=SharedState(), + runner_context=ctx, + ) + with pytest.raises(RuntimeError, match="boom"): + await failing.execute(0, wf_ctx) + drained = await ctx.drain_events() + assert any(isinstance(e, ExecutorFailedEvent) for e in drained) + + +class Requester(Executor): + """Executor that always requests external info to test idle-with-requests state.""" + + @handler + async def ask(self, _: str, ctx: WorkflowContext[RequestInfoMessage]) -> None: # pragma: no cover + await ctx.send_message(RequestInfoMessage()) + + +async def test_idle_with_pending_requests_status_streaming(): + req = Requester(id="req") + rie = RequestInfoExecutor(id="rie") + wf = WorkflowBuilder().set_start_executor(req).add_edge(req, rie).build() + + events = [ev async for ev in wf.run_stream("start")] # Consume stream fully + + # Ensure a request was emitted + assert any(isinstance(e, RequestInfoEvent) for e in events) + status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)] + assert len(status_events) >= 3 + assert status_events[-2].state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS + assert status_events[-1].state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS + + +class Completer(Executor): + """Executor that completes immediately with provided data for testing.""" + + @handler + async def run(self, msg: str, ctx: WorkflowContext[str]) -> None: # pragma: no cover + await ctx.add_event(WorkflowCompletedEvent(msg)) + + +async def test_completed_status_streaming(): + c = Completer(id="c") + wf = WorkflowBuilder().set_start_executor(c).build() + events = [ev async for ev in wf.run_stream("ok")] # no raise + # Last status should be COMPLETED + status = [e for e in events if isinstance(e, WorkflowStatusEvent)] + assert status and status[-1].state == WorkflowRunState.COMPLETED + + +async def test_non_streaming_final_state_helpers(): + # Completed case + c = Completer(id="c") + wf1 = WorkflowBuilder().set_start_executor(c).build() + result1: WorkflowRunResult = await wf1.run("done") + assert result1.get_final_state() == WorkflowRunState.COMPLETED + + # Idle-with-pending-request case + req = Requester(id="req") + rie = RequestInfoExecutor(id="rie") + wf2 = WorkflowBuilder().set_start_executor(req).add_edge(req, rie).build() + result2: WorkflowRunResult = await wf2.run("start") + assert result2.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS + + +async def test_run_includes_status_events_completed(): + c = Completer(id="c2") + wf = WorkflowBuilder().set_start_executor(c).build() + result: WorkflowRunResult = await wf.run("ok") + timeline = result.status_timeline() + assert timeline, "Expected status timeline in non-streaming run() results" + assert timeline[-1].state == WorkflowRunState.COMPLETED + + +async def test_run_includes_status_events_idle_with_requests(): + req = Requester(id="req2") + rie = RequestInfoExecutor(id="rie2") + wf = WorkflowBuilder().set_start_executor(req).add_edge(req, rie).build() + result: WorkflowRunResult = await wf.run("start") + timeline = result.status_timeline() + assert timeline, "Expected status timeline in non-streaming run() results" + assert len(timeline) >= 3 + assert timeline[-2].state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS + assert timeline[-1].state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS diff --git a/python/samples/getting_started/workflow/_start-here/step1_executors_and_edges.py b/python/samples/getting_started/workflow/_start-here/step1_executors_and_edges.py index 58b547219b..14c5348575 100644 --- a/python/samples/getting_started/workflow/_start-here/step1_executors_and_edges.py +++ b/python/samples/getting_started/workflow/_start-here/step1_executors_and_edges.py @@ -104,11 +104,14 @@ async def main(): # provides the WorkflowCompletedEvent emitted by the terminal node. events = await workflow.run("hello world") print(events.get_completed_event()) + # Summarize the final run state (e.g., COMPLETED) + print("Final state:", events.get_final_state()) """ Sample Output: WorkflowCompletedEvent(data=DLROW OLLEH) + Final state: WorkflowRunState.COMPLETED """ diff --git a/python/samples/getting_started/workflow/_start-here/step2_agents_in_a_workflow.py b/python/samples/getting_started/workflow/_start-here/step2_agents_in_a_workflow.py index aed9d22056..3d42723a38 100644 --- a/python/samples/getting_started/workflow/_start-here/step2_agents_in_a_workflow.py +++ b/python/samples/getting_started/workflow/_start-here/step2_agents_in_a_workflow.py @@ -59,6 +59,8 @@ async def main(): print(f"{event.executor_id}: {event.data}") print(f"{'=' * 60}\n{events.get_completed_event()}") + # Summarize the final run state (e.g., COMPLETED) + print("Final state:", events.get_final_state()) """ Sample Output: diff --git a/python/samples/getting_started/workflow/_start-here/step3_streaming.py b/python/samples/getting_started/workflow/_start-here/step3_streaming.py index 7f86d3bfb7..0619bd4c8e 100644 --- a/python/samples/getting_started/workflow/_start-here/step3_streaming.py +++ b/python/samples/getting_started/workflow/_start-here/step3_streaming.py @@ -4,7 +4,17 @@ import asyncio from agent_framework import ChatAgent, ChatMessage from agent_framework.azure import AzureChatClient -from agent_framework.workflow import Executor, WorkflowBuilder, WorkflowCompletedEvent, WorkflowContext, handler +from agent_framework.workflow import ( + Executor, + ExecutorFailedEvent, + WorkflowBuilder, + WorkflowCompletedEvent, + WorkflowContext, + WorkflowFailedEvent, + WorkflowRunState, + WorkflowStatusEvent, + handler, +) from azure.identity import AzureCliCredential """ @@ -107,20 +117,41 @@ async def main(): workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build() # Run the workflow with the user's initial message and stream events as they occur. - # Events include executor invoke and completion, as well as the terminal WorkflowCompletedEvent. + # In addition to executor events and WorkflowCompletedEvent, this also surfaces run-state and errors. async for event in workflow.run_stream( ChatMessage(role="user", text="Create a slogan for a new electric SUV that is affordable and fun to drive.") ): - print(event) + if isinstance(event, WorkflowStatusEvent): + if event.state == WorkflowRunState.IN_PROGRESS: + print("State: IN_PROGRESS") + elif event.state == WorkflowRunState.COMPLETED: + print("State: COMPLETED") + elif event.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS: + print("State: IN_PROGRESS_PENDING_REQUESTS (requests in flight)") + elif event.state == WorkflowRunState.IDLE: + print("State: IDLE (no active work)") + elif event.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS: + print("State: IDLE_WITH_PENDING_REQUESTS (prompt user or UI now)") + else: + print(f"State: {event.state}") + elif isinstance(event, ExecutorFailedEvent): + print(f"Executor failed: {event.executor_id} {event.details.error_type}: {event.details.message}") + elif isinstance(event, WorkflowFailedEvent): + details = event.details + print(f"Workflow failed: {details.error_type}: {details.message}") + else: + print(event) """ Sample Output: + State: IN_PROGRESS ExecutorInvokeEvent(executor_id=writer) ExecutorCompletedEvent(executor_id=writer) ExecutorInvokeEvent(executor_id=reviewer) WorkflowCompletedEvent(data=Drive the Future. Affordable Adventure, Electrified.) ExecutorCompletedEvent(executor_id=reviewer) + State: COMPLETED """ diff --git a/python/samples/getting_started/workflow/human-in-the-loop/guessing_game_with_human_input.py b/python/samples/getting_started/workflow/human-in-the-loop/guessing_game_with_human_input.py index c82e4d666e..ca5c5ec884 100644 --- a/python/samples/getting_started/workflow/human-in-the-loop/guessing_game_with_human_input.py +++ b/python/samples/getting_started/workflow/human-in-the-loop/guessing_game_with_human_input.py @@ -3,12 +3,13 @@ import asyncio from dataclasses import dataclass -from agent_framework import AgentProtocol, ChatMessage, Role +from agent_framework import ChatMessage, Role from agent_framework.azure import AzureChatClient from agent_framework.workflow import ( - AgentExecutor, # Wraps an agent so it can run inside a workflow + AgentExecutor, # Executor that runs the agent AgentExecutorRequest, # Message bundle sent to an AgentExecutor AgentExecutorResponse, # Result returned by an AgentExecutor + Executor, RequestInfoEvent, # Event emitted when human input is requested RequestInfoExecutor, # Special executor that collects human input out of band RequestInfoMessage, # Base class for request payloads sent to RequestInfoExecutor @@ -16,6 +17,8 @@ from agent_framework.workflow import ( WorkflowBuilder, # Fluent builder for assembling the graph WorkflowCompletedEvent, # Terminal event used to finish the workflow WorkflowContext, # Per run context and event bus + WorkflowRunState, # Enum of workflow run states + WorkflowStatusEvent, # Event emitted on run state changes handler, # Decorator to expose an Executor method as a step ) from azure.identity import AzureCliCredential @@ -73,7 +76,7 @@ class GuessOutput(BaseModel): guess: int -class TurnManager(AgentExecutor): +class TurnManager(Executor): """Coordinates turns between the agent and the human. Responsibilities: @@ -82,8 +85,8 @@ class TurnManager(AgentExecutor): - After each human reply, either finish the game or prompt the agent again with feedback. """ - def __init__(self, agent: AgentProtocol, id: str | None = None): - super().__init__(agent, id=id) + def __init__(self, id: str | None = None): + super().__init__(id=id) @handler async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None: @@ -166,9 +169,10 @@ async def main() -> None: response_format=GuessOutput, ) - # Build a simple loop: TurnManager <-> RequestInfoExecutor. - # TurnManager runs the agent, asks the human, processes feedback, and either finishes or repeats. - turn_manager = TurnManager(agent=agent, id="turn_manager") + # Build a simple loop: TurnManager <-> AgentExecutor <-> RequestInfoExecutor. + # TurnManager coordinates, AgentExecutor runs the model, RequestInfoExecutor gathers human replies. + turn_manager = TurnManager(id="turn_manager") + agent_exec = AgentExecutor(agent=agent, id="agent") # Naming note: # This variable is currently named hitl for historical reasons. The name can feel ambiguous or magical. @@ -179,9 +183,10 @@ async def main() -> None: top_builder = ( WorkflowBuilder() .set_start_executor(turn_manager) - .add_edge(turn_manager, turn_manager) # TurnManager executes its own agent step + .add_edge(turn_manager, agent_exec) # Ask agent to make/adjust a guess + .add_edge(agent_exec, turn_manager) # Agent's response comes back to coordinator .add_edge(turn_manager, hitl) # Ask human for guidance - .add_edge(hitl, turn_manager) # Feed human guidance back to the agent turn manager + .add_edge(hitl, turn_manager) # Feed human guidance back to coordinator ) # Build the workflow (no checkpointing in this minimal sample). @@ -206,6 +211,10 @@ async def main() -> None: stream = ( workflow.send_responses_streaming(pending_responses) if pending_responses else workflow.run_stream("start") ) + # Collect events for this turn. Among these you may see WorkflowStatusEvent + # with state IDLE_WITH_PENDING_REQUESTS when the workflow pauses for + # human input, preceded by IN_PROGRESS_PENDING_REQUESTS as requests are + # emitted. events = [event async for event in stream] pending_responses = None @@ -219,6 +228,22 @@ async def main() -> None: requests.append((event.request_id, event.data.prompt)) # Other events are ignored for brevity. + # Detect run state transitions for a better developer experience. + pending_status = any( + isinstance(e, WorkflowStatusEvent) + and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS + for e in events + ) + idle_with_requests = any( + isinstance(e, WorkflowStatusEvent) + and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS + for e in events + ) + if pending_status: + print("State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)") + if idle_with_requests: + print("State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)") + # If we have any human requests, prompt the user and prepare responses. if requests and not completed: responses: dict[str, str] = {} @@ -227,7 +252,7 @@ async def main() -> None: print(f"HITL> {prompt}") # Instructional print already appears above. The input line below is the user entry point. # If desired, you can add more guidance here, but keep it concise. - answer = input("Enter higher/lower/correct/exit: ").lower() + answer = input("Enter higher/lower/correct/exit: ").lower() # noqa: ASYNC250 if answer == "exit": print("Exiting...") return