Python: Workflow run state and structured error events, sample updates, tests (#725)

* Add workflow event types to surface workflow state, status, and failures.

* Address PR feedback

* Updates to use new workflow run state enums
This commit is contained in:
Evan Mattson
2025-09-16 10:36:25 +09:00
committed by GitHub
Unverified
parent e7cd03b32e
commit 68b76e6726
12 changed files with 463 additions and 26 deletions
@@ -40,6 +40,7 @@ _IMPORTS = [
"WorkflowAgent",
"WorkflowViz",
"FileCheckpointStorage",
"ExecutorFailedEvent",
"InMemoryCheckpointStorage",
"CheckpointStorage",
"WorkflowCheckpoint",
@@ -56,6 +57,10 @@ _IMPORTS = [
"PlanReviewRequest",
"RequestInfoEvent",
"StandardMagenticManager",
"WorkflowStatusEvent",
"WorkflowRunState",
"WorkflowErrorDetails",
"WorkflowFailedEvent",
"ConcurrentBuilder",
]
@@ -13,6 +13,7 @@ from agent_framework_workflow import (
Executor,
ExecutorCompletedEvent,
ExecutorEvent,
ExecutorFailedEvent,
ExecutorInvokeEvent,
FileCheckpointStorage,
FunctionExecutor,
@@ -38,10 +39,14 @@ from agent_framework_workflow import (
WorkflowCheckpoint,
WorkflowCompletedEvent,
WorkflowContext,
WorkflowErrorDetails,
WorkflowEvent,
WorkflowExecutor,
WorkflowFailedEvent,
WorkflowRunResult,
WorkflowRunState,
WorkflowStartedEvent,
WorkflowStatusEvent,
WorkflowViz,
__version__,
executor,
@@ -62,6 +67,7 @@ __all__ = [
"Executor",
"ExecutorCompletedEvent",
"ExecutorEvent",
"ExecutorFailedEvent",
"ExecutorInvokeEvent",
"FileCheckpointStorage",
"FunctionExecutor",
@@ -87,10 +93,14 @@ __all__ = [
"WorkflowCheckpoint",
"WorkflowCompletedEvent",
"WorkflowContext",
"WorkflowErrorDetails",
"WorkflowEvent",
"WorkflowExecutor",
"WorkflowFailedEvent",
"WorkflowRunResult",
"WorkflowRunState",
"WorkflowStartedEvent",
"WorkflowStatusEvent",
"WorkflowViz",
"__version__",
"executor",
@@ -19,11 +19,16 @@ from ._events import (
AgentRunUpdateEvent,
ExecutorCompletedEvent,
ExecutorEvent,
ExecutorFailedEvent,
ExecutorInvokeEvent,
RequestInfoEvent,
WorkflowCompletedEvent,
WorkflowErrorDetails,
WorkflowEvent,
WorkflowFailedEvent,
WorkflowRunState,
WorkflowStartedEvent,
WorkflowStatusEvent,
)
from ._executor import (
AgentExecutor,
@@ -100,6 +105,7 @@ __all__ = [
"Executor",
"ExecutorCompletedEvent",
"ExecutorEvent",
"ExecutorFailedEvent",
"ExecutorInvokeEvent",
"FileCheckpointStorage",
"FunctionExecutor",
@@ -142,10 +148,14 @@ __all__ = [
"WorkflowCheckpoint",
"WorkflowCompletedEvent",
"WorkflowContext",
"WorkflowErrorDetails",
"WorkflowEvent",
"WorkflowExecutor",
"WorkflowFailedEvent",
"WorkflowRunResult",
"WorkflowRunState",
"WorkflowStartedEvent",
"WorkflowStatusEvent",
"WorkflowValidationError",
"WorkflowViz",
"__version__",
@@ -1,5 +1,8 @@
# Copyright (c) Microsoft. All rights reserved.
import traceback as _traceback
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any
from agent_framework import AgentRunResponse, AgentRunResponseUpdate
@@ -56,6 +59,108 @@ class WorkflowErrorEvent(WorkflowEvent):
return f"{self.__class__.__name__}(exception={self.data})"
class WorkflowRunState(str, Enum):
"""Run-level state of a workflow execution.
Semantics:
- STARTED: Run has been initiated and the workflow context has been created.
This is an initial state before any meaningful work is performed. In this
codebase we emit a dedicated `WorkflowStartedEvent` for telemetry, and
typically advance the status directly to `IN_PROGRESS`. Consumers may
still rely on `STARTED` for state machines that need an explicit pre-work
phase.
- IN_PROGRESS: The workflow is actively executing (e.g., the initial
message has been delivered to the start executor or a superstep is
running). This status is emitted at the beginning of a run and can be
followed by other statuses as the run progresses.
- IN_PROGRESS_PENDING_REQUESTS: Active execution while one or more
request-for-information operations are outstanding. New work may still
be scheduled while requests are in flight.
- IDLE: The workflow is quiescent with no outstanding requests, but has
not yet emitted a terminal result. Rare in practice but provided for
orchestration integrations that distinguish a quiescent state.
- IDLE_WITH_PENDING_REQUESTS: The workflow is paused awaiting external
input (e.g., emitted a `RequestInfoEvent`). This is a non-terminal
state; the workflow can resume when responses are supplied.
- COMPLETED: Normal terminal state indicating successful completion.
- FAILED: Terminal state indicating an error surfaced. Accompanied by a
`WorkflowFailedEvent` with structured error details.
- CANCELLED: Terminal state indicating the run was cancelled by a caller
or orchestrator. Not currently emitted by default runner paths but
included for integrators/orchestrators that support cancellation.
"""
STARTED = "STARTED" # Explicit pre-work phase (rarely emitted as status; see note above)
IN_PROGRESS = "IN_PROGRESS" # Active execution is underway
IN_PROGRESS_PENDING_REQUESTS = "IN_PROGRESS_PENDING_REQUESTS" # Active execution with outstanding requests
IDLE = "IDLE" # No active work and no outstanding requests
IDLE_WITH_PENDING_REQUESTS = "IDLE_WITH_PENDING_REQUESTS" # Paused awaiting external responses
COMPLETED = "COMPLETED" # Finished successfully
FAILED = "FAILED" # Finished with an error
CANCELLED = "CANCELLED" # Finished due to cancellation
class WorkflowStatusEvent(WorkflowEvent):
"""Event indicating a transition in the workflow run state."""
def __init__(self, state: WorkflowRunState, data: Any | None = None):
super().__init__(data)
self.state = state
def __repr__(self) -> str: # pragma: no cover - representation only
return f"{self.__class__.__name__}(state={self.state}, data={self.data!r})"
@dataclass
class WorkflowErrorDetails:
"""Structured error information to surface in error events/results."""
error_type: str
message: str
traceback: str | None = None
executor_id: str | None = None
extra: dict[str, Any] | None = None
@classmethod
def from_exception(
cls,
exc: BaseException,
*,
executor_id: str | None = None,
extra: dict[str, Any] | None = None,
) -> "WorkflowErrorDetails":
tb = None
try:
tb = "".join(_traceback.format_exception(type(exc), exc, exc.__traceback__))
except Exception:
tb = None
return cls(
error_type=exc.__class__.__name__,
message=str(exc),
traceback=tb,
executor_id=executor_id,
extra=extra,
)
class WorkflowFailedEvent(WorkflowEvent):
"""Terminal failure event for a workflow run."""
def __init__(self, details: WorkflowErrorDetails, data: Any | None = None):
super().__init__(data)
self.details = details
def __repr__(self) -> str: # pragma: no cover - representation only
return f"{self.__class__.__name__}(details={self.details}, data={self.data!r})"
class RequestInfoEvent(WorkflowEvent):
"""Event triggered when a workflow executor requests external information."""
@@ -119,6 +224,17 @@ class ExecutorCompletedEvent(ExecutorEvent):
return f"{self.__class__.__name__}(executor_id={self.executor_id})"
class ExecutorFailedEvent(ExecutorEvent):
"""Event triggered when an executor handler raises an error."""
def __init__(self, executor_id: str, details: WorkflowErrorDetails):
super().__init__(executor_id, details)
self.details = details
def __repr__(self) -> str: # pragma: no cover - representation only
return f"{self.__class__.__name__}(executor_id={self.executor_id}, details={self.details})"
class AgentRunUpdateEvent(ExecutorEvent):
"""Event triggered when an agent is streaming messages."""
@@ -102,7 +102,14 @@ class Executor(AFBaseModel):
if self._request_interceptors and message.__class__.__name__ == "SubWorkflowRequestInfo":
# Directly handle SubWorkflowRequestInfo
await context.add_event(ExecutorInvokeEvent(self.id))
await self._handle_sub_workflow_request(message, context)
try:
await self._handle_sub_workflow_request(message, context)
except Exception as exc:
# Surface structured executor failure before propagating
from ._events import ExecutorFailedEvent, WorkflowErrorDetails
await context.add_event(ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc)))
raise
await context.add_event(ExecutorCompletedEvent(self.id))
return
@@ -115,7 +122,14 @@ class Executor(AFBaseModel):
if handler is None:
raise RuntimeError(f"Executor {self.__class__.__name__} cannot handle message of type {type(message)}.")
await context.add_event(ExecutorInvokeEvent(self.id))
await handler(message, context)
try:
await handler(message, context)
except Exception as exc:
# Surface structured executor failure before propagating
from ._events import ExecutorFailedEvent, WorkflowErrorDetails
await context.add_event(ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc)))
raise
await context.add_event(ExecutorCompletedEvent(self.id))
def _discover_handlers(self) -> None:
@@ -113,8 +113,15 @@ class Runner:
# Periodically continue to let iteration advance
continue
# Propagate errors from iteration
await iteration_task
# Propagate errors from iteration, but first surface any pending events
try:
await iteration_task
except Exception:
# Make sure failure-related events (like ExecutorFailedEvent) are surfaced
if await self._ctx.has_events():
for event in await self._ctx.drain_events():
yield event
raise
self._iteration += 1
# Drain any straggler events emitted at tail end
@@ -25,7 +25,16 @@ from ._edge import (
SwitchCaseEdgeGroupCase,
SwitchCaseEdgeGroupDefault,
)
from ._events import RequestInfoEvent, WorkflowCompletedEvent, WorkflowEvent
from ._events import (
RequestInfoEvent,
WorkflowCompletedEvent,
WorkflowErrorDetails,
WorkflowEvent,
WorkflowFailedEvent,
WorkflowRunState,
WorkflowStartedEvent,
WorkflowStatusEvent,
)
from ._executor import AgentExecutor, Executor, RequestInfoExecutor
from ._runner import Runner
from ._runner_context import CheckpointState, InProcRunnerContext, RunnerContext
@@ -43,7 +52,16 @@ logger = logging.getLogger(__name__)
class WorkflowRunResult(list[WorkflowEvent]):
"""A list of events generated during the workflow execution in non-streaming mode."""
"""A list of events generated during the workflow execution in non-streaming mode.
Preserves the historical contract that the list contains data-plane events
only (executor invoke/complete, completed, requests), while exposing the
control-plane status timeline via accessors.
"""
def __init__(self, events: list[WorkflowEvent], status_events: list[WorkflowStatusEvent] | None = None) -> None:
super().__init__(events)
self._status_events: list[WorkflowStatusEvent] = status_events or []
def get_completed_event(self) -> WorkflowCompletedEvent | None:
"""Get the completed event from the workflow run result.
@@ -69,6 +87,23 @@ class WorkflowRunResult(list[WorkflowEvent]):
"""
return [event for event in self if isinstance(event, RequestInfoEvent)]
def get_final_state(self) -> WorkflowRunState:
"""Return the final run state based on explicit status events.
Returns the last WorkflowStatusEvent.state observed. Raises if none were emitted.
"""
if self._status_events:
return self._status_events[-1].state # type: ignore[return-value]
raise RuntimeError(
"Final state is unknown because no WorkflowStatusEvent was emitted. "
"Ensure your workflow entry points are used (which emit status events) "
"or handle the absence of status explicitly."
)
def status_timeline(self) -> list[WorkflowStatusEvent]:
"""Return the list of status events emitted during the run (control-plane)."""
return list(self._status_events)
# region Workflow
@@ -202,9 +237,15 @@ class Workflow(AFBaseModel):
# Create workflow span that encompasses the entire execution
with workflow_tracer.create_workflow_run_span(self):
saw_completed = False
saw_request = False
emitted_in_progress_pending = False
try:
# Add workflow started event
# Add workflow started event (telemetry + surface state to consumers)
workflow_tracer.add_workflow_event("workflow.started")
# Emit explicit start/status events to the stream
yield WorkflowStartedEvent()
yield WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS)
# Reset context for a new run if supported
if reset_context:
@@ -216,11 +257,31 @@ class Workflow(AFBaseModel):
# All executor executions happen within workflow span
async for event in self._runner.run_until_convergence():
# Track terminal indicators while forwarding events
if isinstance(event, WorkflowCompletedEvent):
saw_completed = True
elif isinstance(event, RequestInfoEvent):
saw_request = True
yield event
# Success
if isinstance(event, RequestInfoEvent) and not emitted_in_progress_pending and not saw_completed:
emitted_in_progress_pending = True
yield WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS)
# Success path: emit a final status based on observed terminal signals
if saw_completed:
yield WorkflowStatusEvent(WorkflowRunState.COMPLETED)
elif saw_request:
yield WorkflowStatusEvent(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS)
else:
yield WorkflowStatusEvent(WorkflowRunState.IDLE)
workflow_tracer.add_workflow_event("workflow.completed")
except Exception as e:
# Surface structured failure details before propagating exception
details = WorkflowErrorDetails.from_exception(e)
yield WorkflowFailedEvent(details)
yield WorkflowStatusEvent(WorkflowRunState.FAILED)
workflow_tracer.add_workflow_error_event(e)
raise
@@ -358,11 +419,12 @@ class Workflow(AFBaseModel):
):
yield event
async def run(self, message: Any) -> WorkflowRunResult:
async def run(self, message: Any, *, include_status_events: bool = False) -> WorkflowRunResult:
"""Run the workflow with the given message.
Args:
message: The message to be processed by the workflow.
include_status_events: Whether to include WorkflowStatusEvent instances in the result list.
Returns:
A WorkflowRunResult instance containing a list of events generated during the workflow execution.
@@ -377,6 +439,7 @@ class Workflow(AFBaseModel):
coalesced: list[WorkflowEvent] = [] # type: ignore[name-defined]
pending_updates: list[AgentRunResponseUpdate] = []
pending_executor: str | None = None
status_events: list[WorkflowStatusEvent] = []
def _flush_pending() -> None:
nonlocal pending_updates, pending_executor
@@ -403,12 +466,22 @@ class Workflow(AFBaseModel):
continue
# Flush before adding any non-update event
_flush_pending()
# Omit WorkflowStartedEvent from non-streaming (telemetry-only)
if isinstance(ev, WorkflowStartedEvent):
continue
# Track status; include inline only if explicitly requested
if isinstance(ev, WorkflowStatusEvent):
status_events.append(ev)
if include_status_events:
coalesced.append(ev)
continue
coalesced.append(ev)
# Flush any trailing updates
_flush_pending()
return WorkflowRunResult(coalesced)
# coalesced already excludes start events; includes status events only if opted in
return WorkflowRunResult(coalesced, status_events)
async def run_from_checkpoint(
self,
@@ -435,7 +508,9 @@ class Workflow(AFBaseModel):
events = [
event async for event in self.run_stream_from_checkpoint(checkpoint_id, checkpoint_storage, responses)
]
return WorkflowRunResult(events)
status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)]
filtered_events = [e for e in events if not isinstance(e, (WorkflowStatusEvent, WorkflowStartedEvent))]
return WorkflowRunResult(filtered_events, status_events)
async def send_responses(self, responses: dict[str, Any]) -> WorkflowRunResult:
"""Send responses back to the workflow.
@@ -447,7 +522,9 @@ class Workflow(AFBaseModel):
A WorkflowRunResult instance containing a list of events generated during the workflow execution.
"""
events = [event async for event in self.send_responses_streaming(responses)]
return WorkflowRunResult(events)
status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)]
filtered_events = [e for e in events if not isinstance(e, (WorkflowStatusEvent, WorkflowStartedEvent))]
return WorkflowRunResult(filtered_events, status_events)
def _get_executor_by_id(self, executor_id: str) -> Executor:
"""Get an executor by its ID.
@@ -0,0 +1,137 @@
# Copyright (c) Microsoft. All rights reserved.
import pytest
from agent_framework.workflow import (
Executor,
ExecutorFailedEvent,
RequestInfoEvent,
RequestInfoExecutor,
RequestInfoMessage,
Workflow,
WorkflowBuilder,
WorkflowCompletedEvent,
WorkflowContext,
WorkflowFailedEvent,
WorkflowRunResult,
WorkflowRunState,
WorkflowStatusEvent,
handler,
)
from agent_framework_workflow import InProcRunnerContext
from agent_framework_workflow._shared_state import SharedState
from agent_framework_workflow._workflow_context import WorkflowContext as WFContext
class FailingExecutor(Executor):
"""Executor that raises at runtime to test failure signaling."""
@handler
async def fail(self, msg: int, ctx: WorkflowContext[None]) -> None: # pragma: no cover - invoked via workflow
raise RuntimeError("boom")
async def test_executor_failed_and_workflow_failed_events_streaming():
failing = FailingExecutor(id="f")
wf: Workflow = WorkflowBuilder().set_start_executor(failing).build()
events: list[object] = []
with pytest.raises(RuntimeError, match="boom"):
async for ev in wf.run_stream(0):
events.append(ev)
# Workflow-level failure and FAILED status should be surfaced
assert any(isinstance(e, WorkflowFailedEvent) for e in events)
status = [e for e in events if isinstance(e, WorkflowStatusEvent)]
assert status and status[-1].state == WorkflowRunState.FAILED
async def test_executor_failed_event_emitted_on_direct_execute():
failing = FailingExecutor(id="f")
ctx = InProcRunnerContext()
wf_ctx: WFContext[None] = WFContext(
executor_id=failing.id,
source_executor_ids=["START"],
shared_state=SharedState(),
runner_context=ctx,
)
with pytest.raises(RuntimeError, match="boom"):
await failing.execute(0, wf_ctx)
drained = await ctx.drain_events()
assert any(isinstance(e, ExecutorFailedEvent) for e in drained)
class Requester(Executor):
"""Executor that always requests external info to test idle-with-requests state."""
@handler
async def ask(self, _: str, ctx: WorkflowContext[RequestInfoMessage]) -> None: # pragma: no cover
await ctx.send_message(RequestInfoMessage())
async def test_idle_with_pending_requests_status_streaming():
req = Requester(id="req")
rie = RequestInfoExecutor(id="rie")
wf = WorkflowBuilder().set_start_executor(req).add_edge(req, rie).build()
events = [ev async for ev in wf.run_stream("start")] # Consume stream fully
# Ensure a request was emitted
assert any(isinstance(e, RequestInfoEvent) for e in events)
status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)]
assert len(status_events) >= 3
assert status_events[-2].state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
assert status_events[-1].state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
class Completer(Executor):
"""Executor that completes immediately with provided data for testing."""
@handler
async def run(self, msg: str, ctx: WorkflowContext[str]) -> None: # pragma: no cover
await ctx.add_event(WorkflowCompletedEvent(msg))
async def test_completed_status_streaming():
c = Completer(id="c")
wf = WorkflowBuilder().set_start_executor(c).build()
events = [ev async for ev in wf.run_stream("ok")] # no raise
# Last status should be COMPLETED
status = [e for e in events if isinstance(e, WorkflowStatusEvent)]
assert status and status[-1].state == WorkflowRunState.COMPLETED
async def test_non_streaming_final_state_helpers():
# Completed case
c = Completer(id="c")
wf1 = WorkflowBuilder().set_start_executor(c).build()
result1: WorkflowRunResult = await wf1.run("done")
assert result1.get_final_state() == WorkflowRunState.COMPLETED
# Idle-with-pending-request case
req = Requester(id="req")
rie = RequestInfoExecutor(id="rie")
wf2 = WorkflowBuilder().set_start_executor(req).add_edge(req, rie).build()
result2: WorkflowRunResult = await wf2.run("start")
assert result2.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
async def test_run_includes_status_events_completed():
c = Completer(id="c2")
wf = WorkflowBuilder().set_start_executor(c).build()
result: WorkflowRunResult = await wf.run("ok")
timeline = result.status_timeline()
assert timeline, "Expected status timeline in non-streaming run() results"
assert timeline[-1].state == WorkflowRunState.COMPLETED
async def test_run_includes_status_events_idle_with_requests():
req = Requester(id="req2")
rie = RequestInfoExecutor(id="rie2")
wf = WorkflowBuilder().set_start_executor(req).add_edge(req, rie).build()
result: WorkflowRunResult = await wf.run("start")
timeline = result.status_timeline()
assert timeline, "Expected status timeline in non-streaming run() results"
assert len(timeline) >= 3
assert timeline[-2].state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
assert timeline[-1].state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
@@ -104,11 +104,14 @@ async def main():
# provides the WorkflowCompletedEvent emitted by the terminal node.
events = await workflow.run("hello world")
print(events.get_completed_event())
# Summarize the final run state (e.g., COMPLETED)
print("Final state:", events.get_final_state())
"""
Sample Output:
WorkflowCompletedEvent(data=DLROW OLLEH)
Final state: WorkflowRunState.COMPLETED
"""
@@ -59,6 +59,8 @@ async def main():
print(f"{event.executor_id}: {event.data}")
print(f"{'=' * 60}\n{events.get_completed_event()}")
# Summarize the final run state (e.g., COMPLETED)
print("Final state:", events.get_final_state())
"""
Sample Output:
@@ -4,7 +4,17 @@ import asyncio
from agent_framework import ChatAgent, ChatMessage
from agent_framework.azure import AzureChatClient
from agent_framework.workflow import Executor, WorkflowBuilder, WorkflowCompletedEvent, WorkflowContext, handler
from agent_framework.workflow import (
Executor,
ExecutorFailedEvent,
WorkflowBuilder,
WorkflowCompletedEvent,
WorkflowContext,
WorkflowFailedEvent,
WorkflowRunState,
WorkflowStatusEvent,
handler,
)
from azure.identity import AzureCliCredential
"""
@@ -107,20 +117,41 @@ async def main():
workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build()
# Run the workflow with the user's initial message and stream events as they occur.
# Events include executor invoke and completion, as well as the terminal WorkflowCompletedEvent.
# In addition to executor events and WorkflowCompletedEvent, this also surfaces run-state and errors.
async for event in workflow.run_stream(
ChatMessage(role="user", text="Create a slogan for a new electric SUV that is affordable and fun to drive.")
):
print(event)
if isinstance(event, WorkflowStatusEvent):
if event.state == WorkflowRunState.IN_PROGRESS:
print("State: IN_PROGRESS")
elif event.state == WorkflowRunState.COMPLETED:
print("State: COMPLETED")
elif event.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS:
print("State: IN_PROGRESS_PENDING_REQUESTS (requests in flight)")
elif event.state == WorkflowRunState.IDLE:
print("State: IDLE (no active work)")
elif event.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS:
print("State: IDLE_WITH_PENDING_REQUESTS (prompt user or UI now)")
else:
print(f"State: {event.state}")
elif isinstance(event, ExecutorFailedEvent):
print(f"Executor failed: {event.executor_id} {event.details.error_type}: {event.details.message}")
elif isinstance(event, WorkflowFailedEvent):
details = event.details
print(f"Workflow failed: {details.error_type}: {details.message}")
else:
print(event)
"""
Sample Output:
State: IN_PROGRESS
ExecutorInvokeEvent(executor_id=writer)
ExecutorCompletedEvent(executor_id=writer)
ExecutorInvokeEvent(executor_id=reviewer)
WorkflowCompletedEvent(data=Drive the Future. Affordable Adventure, Electrified.)
ExecutorCompletedEvent(executor_id=reviewer)
State: COMPLETED
"""
@@ -3,12 +3,13 @@
import asyncio
from dataclasses import dataclass
from agent_framework import AgentProtocol, ChatMessage, Role
from agent_framework import ChatMessage, Role
from agent_framework.azure import AzureChatClient
from agent_framework.workflow import (
AgentExecutor, # Wraps an agent so it can run inside a workflow
AgentExecutor, # Executor that runs the agent
AgentExecutorRequest, # Message bundle sent to an AgentExecutor
AgentExecutorResponse, # Result returned by an AgentExecutor
Executor,
RequestInfoEvent, # Event emitted when human input is requested
RequestInfoExecutor, # Special executor that collects human input out of band
RequestInfoMessage, # Base class for request payloads sent to RequestInfoExecutor
@@ -16,6 +17,8 @@ from agent_framework.workflow import (
WorkflowBuilder, # Fluent builder for assembling the graph
WorkflowCompletedEvent, # Terminal event used to finish the workflow
WorkflowContext, # Per run context and event bus
WorkflowRunState, # Enum of workflow run states
WorkflowStatusEvent, # Event emitted on run state changes
handler, # Decorator to expose an Executor method as a step
)
from azure.identity import AzureCliCredential
@@ -73,7 +76,7 @@ class GuessOutput(BaseModel):
guess: int
class TurnManager(AgentExecutor):
class TurnManager(Executor):
"""Coordinates turns between the agent and the human.
Responsibilities:
@@ -82,8 +85,8 @@ class TurnManager(AgentExecutor):
- After each human reply, either finish the game or prompt the agent again with feedback.
"""
def __init__(self, agent: AgentProtocol, id: str | None = None):
super().__init__(agent, id=id)
def __init__(self, id: str | None = None):
super().__init__(id=id)
@handler
async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
@@ -166,9 +169,10 @@ async def main() -> None:
response_format=GuessOutput,
)
# Build a simple loop: TurnManager <-> RequestInfoExecutor.
# TurnManager runs the agent, asks the human, processes feedback, and either finishes or repeats.
turn_manager = TurnManager(agent=agent, id="turn_manager")
# Build a simple loop: TurnManager <-> AgentExecutor <-> RequestInfoExecutor.
# TurnManager coordinates, AgentExecutor runs the model, RequestInfoExecutor gathers human replies.
turn_manager = TurnManager(id="turn_manager")
agent_exec = AgentExecutor(agent=agent, id="agent")
# Naming note:
# This variable is currently named hitl for historical reasons. The name can feel ambiguous or magical.
@@ -179,9 +183,10 @@ async def main() -> None:
top_builder = (
WorkflowBuilder()
.set_start_executor(turn_manager)
.add_edge(turn_manager, turn_manager) # TurnManager executes its own agent step
.add_edge(turn_manager, agent_exec) # Ask agent to make/adjust a guess
.add_edge(agent_exec, turn_manager) # Agent's response comes back to coordinator
.add_edge(turn_manager, hitl) # Ask human for guidance
.add_edge(hitl, turn_manager) # Feed human guidance back to the agent turn manager
.add_edge(hitl, turn_manager) # Feed human guidance back to coordinator
)
# Build the workflow (no checkpointing in this minimal sample).
@@ -206,6 +211,10 @@ async def main() -> None:
stream = (
workflow.send_responses_streaming(pending_responses) if pending_responses else workflow.run_stream("start")
)
# Collect events for this turn. Among these you may see WorkflowStatusEvent
# with state IDLE_WITH_PENDING_REQUESTS when the workflow pauses for
# human input, preceded by IN_PROGRESS_PENDING_REQUESTS as requests are
# emitted.
events = [event async for event in stream]
pending_responses = None
@@ -219,6 +228,22 @@ async def main() -> None:
requests.append((event.request_id, event.data.prompt))
# Other events are ignored for brevity.
# Detect run state transitions for a better developer experience.
pending_status = any(
isinstance(e, WorkflowStatusEvent)
and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
for e in events
)
idle_with_requests = any(
isinstance(e, WorkflowStatusEvent)
and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
for e in events
)
if pending_status:
print("State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
if idle_with_requests:
print("State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")
# If we have any human requests, prompt the user and prepare responses.
if requests and not completed:
responses: dict[str, str] = {}
@@ -227,7 +252,7 @@ async def main() -> None:
print(f"HITL> {prompt}")
# Instructional print already appears above. The input line below is the user entry point.
# If desired, you can add more guidance here, but keep it concise.
answer = input("Enter higher/lower/correct/exit: ").lower()
answer = input("Enter higher/lower/correct/exit: ").lower() # noqa: ASYNC250
if answer == "exit":
print("Exiting...")
return