Python: Workflow event source updates (#789)

* Workflow event source updates

* Add WorkflowLifecycleEvent TypeAlias. Update docstrings

* Updates

* Rename

---------

Co-authored-by: Chris <66376200+crickman@users.noreply.github.com>
This commit is contained in:
Evan Mattson
2025-09-19 00:17:55 +09:00
committed by GitHub
Unverified
parent 6a66ad517a
commit 960196cd52
18 changed files with 350 additions and 118 deletions
@@ -31,12 +31,14 @@ from ._events import (
ExecutorCompletedEvent,
ExecutorEvent,
ExecutorFailedEvent,
ExecutorInvokeEvent,
ExecutorInvokedEvent,
RequestInfoEvent,
WorkflowCompletedEvent,
WorkflowErrorDetails,
WorkflowEvent,
WorkflowEventSource,
WorkflowFailedEvent,
WorkflowLifecycleEvent,
WorkflowRunState,
WorkflowStartedEvent,
WorkflowStatusEvent,
@@ -118,7 +120,7 @@ __all__ = [
"ExecutorCompletedEvent",
"ExecutorEvent",
"ExecutorFailedEvent",
"ExecutorInvokeEvent",
"ExecutorInvokedEvent",
"FanInEdgeGroup",
"FanOutEdgeGroup",
"FileCheckpointStorage",
@@ -172,8 +174,10 @@ __all__ = [
"WorkflowContext",
"WorkflowErrorDetails",
"WorkflowEvent",
"WorkflowEventSource",
"WorkflowExecutor",
"WorkflowFailedEvent",
"WorkflowLifecycleEvent",
"WorkflowRunResult",
"WorkflowRunState",
"WorkflowStartedEvent",
@@ -27,12 +27,14 @@ from ._events import (
ExecutorCompletedEvent,
ExecutorEvent,
ExecutorFailedEvent,
ExecutorInvokeEvent,
ExecutorInvokedEvent,
RequestInfoEvent,
WorkflowCompletedEvent,
WorkflowErrorDetails,
WorkflowEvent,
WorkflowEventSource,
WorkflowFailedEvent,
WorkflowLifecycleEvent,
WorkflowRunState,
WorkflowStartedEvent,
WorkflowStatusEvent,
@@ -114,7 +116,7 @@ __all__ = [
"ExecutorCompletedEvent",
"ExecutorEvent",
"ExecutorFailedEvent",
"ExecutorInvokeEvent",
"ExecutorInvokedEvent",
"FanInEdgeGroup",
"FanOutEdgeGroup",
"FileCheckpointStorage",
@@ -168,8 +170,10 @@ __all__ = [
"WorkflowContext",
"WorkflowErrorDetails",
"WorkflowEvent",
"WorkflowEventSource",
"WorkflowExecutor",
"WorkflowFailedEvent",
"WorkflowLifecycleEvent",
"WorkflowRunResult",
"WorkflowRunState",
"WorkflowStartedEvent",
@@ -1,9 +1,12 @@
# Copyright (c) Microsoft. All rights reserved.
import traceback as _traceback
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, TypeAlias
from agent_framework import AgentRunResponse, AgentRunResponseUpdate
@@ -11,32 +14,70 @@ if TYPE_CHECKING:
from ._executor import RequestInfoMessage
class WorkflowEventSource(str, Enum):
"""Identifies whether a workflow event came from the framework or an executor.
Use `FRAMEWORK` for events emitted by built-in orchestration paths—even when the
code that raises them lives in runner-related modules—and `EXECUTOR` for events
surfaced by developer-provided executor implementations.
"""
FRAMEWORK = "FRAMEWORK" # Framework-owned orchestration, regardless of module location
EXECUTOR = "EXECUTOR" # User-supplied executor code and callbacks
_event_origin_context: ContextVar[WorkflowEventSource] = ContextVar(
"workflow_event_origin", default=WorkflowEventSource.EXECUTOR
)
def _current_event_origin() -> WorkflowEventSource:
"""Return the origin to associate with newly created workflow events."""
return _event_origin_context.get()
@contextmanager
def _framework_event_origin() -> Iterator[None]: # pyright: ignore[reportUnusedFunction]
"""Temporarily mark subsequently created events as originating from the framework (internal)."""
token = _event_origin_context.set(WorkflowEventSource.FRAMEWORK)
try:
yield
finally:
_event_origin_context.reset(token)
class WorkflowEvent:
"""Base class for workflow events."""
def __init__(self, data: Any | None = None):
"""Initialize the workflow event with optional data."""
self.data = data
self.origin = _current_event_origin()
def __repr__(self) -> str:
"""Return a string representation of the workflow event."""
return f"{self.__class__.__name__}(data={self.data if self.data is not None else 'None'})"
data_repr = self.data if self.data is not None else "None"
return f"{self.__class__.__name__}(origin={self.origin}, data={data_repr})"
class WorkflowStartedEvent(WorkflowEvent):
"""Event triggered when a workflow starts."""
"""Built-in lifecycle event emitted when a workflow run begins."""
...
class WorkflowCompletedEvent(WorkflowEvent):
"""Event triggered when a workflow completes."""
"""Built-in lifecycle event emitted when a workflow run completes successfully.
Unlike the framework-only `WorkflowLifecycleEvent` union, this event can be
emitted by developer-provided executors to return final workflow output.
"""
...
class WorkflowWarningEvent(WorkflowEvent):
"""Event triggered when a warning occurs in the workflow."""
"""Executor-origin event signaling a warning surfaced by user code."""
def __init__(self, data: str):
"""Initialize the workflow warning event with optional data and warning message."""
@@ -44,11 +85,11 @@ class WorkflowWarningEvent(WorkflowEvent):
def __repr__(self) -> str:
"""Return a string representation of the workflow warning event."""
return f"{self.__class__.__name__}(message={self.data})"
return f"{self.__class__.__name__}(message={self.data}, origin={self.origin})"
class WorkflowErrorEvent(WorkflowEvent):
"""Event triggered when an error occurs in the workflow."""
"""Executor-origin event signaling an error surfaced by user code."""
def __init__(self, data: Exception):
"""Initialize the workflow error event with optional data and error message."""
@@ -56,7 +97,7 @@ class WorkflowErrorEvent(WorkflowEvent):
def __repr__(self) -> str:
"""Return a string representation of the workflow error event."""
return f"{self.__class__.__name__}(exception={self.data})"
return f"{self.__class__.__name__}(exception={self.data}, origin={self.origin})"
class WorkflowRunState(str, Enum):
@@ -108,14 +149,24 @@ class WorkflowRunState(str, Enum):
class WorkflowStatusEvent(WorkflowEvent):
"""Event indicating a transition in the workflow run state."""
"""Built-in lifecycle event emitted for workflow run state transitions."""
def __init__(self, state: WorkflowRunState, data: Any | None = None):
def __init__(
self,
state: WorkflowRunState,
data: Any | None = None,
):
"""Initialize the workflow status event with a new state and optional data.
Args:
state: The new state of the workflow run.
data: Optional additional data associated with the state change.
"""
super().__init__(data)
self.state = state
def __repr__(self) -> str: # pragma: no cover - representation only
return f"{self.__class__.__name__}(state={self.state}, data={self.data!r})"
return f"{self.__class__.__name__}(state={self.state}, data={self.data!r}, origin={self.origin})"
@dataclass
@@ -151,14 +202,18 @@ class WorkflowErrorDetails:
class WorkflowFailedEvent(WorkflowEvent):
"""Terminal failure event for a workflow run."""
"""Built-in lifecycle event emitted when a workflow run terminates with an error."""
def __init__(self, details: WorkflowErrorDetails, data: Any | None = None):
def __init__(
self,
details: WorkflowErrorDetails,
data: Any | None = None,
):
super().__init__(data)
self.details = details
def __repr__(self) -> str: # pragma: no cover - representation only
return f"{self.__class__.__name__}(details={self.details}, data={self.data!r})"
return f"{self.__class__.__name__}(details={self.details}, data={self.data!r}, origin={self.origin})"
class RequestInfoEvent(WorkflowEvent):
@@ -208,12 +263,12 @@ class ExecutorEvent(WorkflowEvent):
return f"{self.__class__.__name__}(executor_id={self.executor_id}, data={self.data})"
class ExecutorInvokeEvent(ExecutorEvent):
class ExecutorInvokedEvent(ExecutorEvent):
"""Event triggered when an executor handler is invoked."""
def __repr__(self) -> str:
"""Return a string representation of the executor handler invoke event."""
return f"{self.__class__.__name__}(executor_id={self.executor_id})"
return f"{self.__class__.__name__}(executor_id={self.executor_id}, data={self.data})"
class ExecutorCompletedEvent(ExecutorEvent):
@@ -221,13 +276,17 @@ class ExecutorCompletedEvent(ExecutorEvent):
def __repr__(self) -> str:
"""Return a string representation of the executor handler complete event."""
return f"{self.__class__.__name__}(executor_id={self.executor_id})"
return f"{self.__class__.__name__}(executor_id={self.executor_id}, data={self.data})"
class ExecutorFailedEvent(ExecutorEvent):
"""Event triggered when an executor handler raises an error."""
def __init__(self, executor_id: str, details: WorkflowErrorDetails):
def __init__(
self,
executor_id: str,
details: WorkflowErrorDetails,
):
super().__init__(executor_id, details)
self.details = details
@@ -257,3 +316,6 @@ class AgentRunEvent(ExecutorEvent):
def __repr__(self) -> str:
"""Return a string representation of the agent run event."""
return f"{self.__class__.__name__}(executor_id={self.executor_id}, data={self.data})"
WorkflowLifecycleEvent: TypeAlias = WorkflowStartedEvent | WorkflowStatusEvent | WorkflowFailedEvent
@@ -21,8 +21,9 @@ from ._events import (
AgentRunEvent,
AgentRunUpdateEvent,
ExecutorCompletedEvent,
ExecutorInvokeEvent,
ExecutorInvokedEvent,
RequestInfoEvent,
_framework_event_origin, # pyright: ignore[reportPrivateUsage]
)
from ._typing_utils import is_instance_of
from ._workflow_context import WorkflowContext
@@ -102,16 +103,22 @@ class Executor(AFBaseModel):
# Lazy registration for SubWorkflowRequestInfo if we have interceptors
if self._request_interceptors and message.__class__.__name__ == "SubWorkflowRequestInfo":
# Directly handle SubWorkflowRequestInfo
await context.add_event(ExecutorInvokeEvent(self.id))
with _framework_event_origin():
invoke_event = ExecutorInvokedEvent(self.id)
await context.add_event(invoke_event)
try:
await self._handle_sub_workflow_request(message, context)
except Exception as exc:
# Surface structured executor failure before propagating
from ._events import ExecutorFailedEvent, WorkflowErrorDetails
await context.add_event(ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc)))
with _framework_event_origin():
failure_event = ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc))
await context.add_event(failure_event)
raise
await context.add_event(ExecutorCompletedEvent(self.id))
with _framework_event_origin():
completed_event = ExecutorCompletedEvent(self.id)
await context.add_event(completed_event)
return
handler: Callable[[Any, WorkflowContext[Any]], Any] | None = None
@@ -122,16 +129,22 @@ class Executor(AFBaseModel):
if handler is None:
raise RuntimeError(f"Executor {self.__class__.__name__} cannot handle message of type {type(message)}.")
await context.add_event(ExecutorInvokeEvent(self.id))
with _framework_event_origin():
invoke_event = ExecutorInvokedEvent(self.id)
await context.add_event(invoke_event)
try:
await handler(message, context)
except Exception as exc:
# Surface structured executor failure before propagating
from ._events import ExecutorFailedEvent, WorkflowErrorDetails
await context.add_event(ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc)))
with _framework_event_origin():
failure_event = ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc))
await context.add_event(failure_event)
raise
await context.add_event(ExecutorCompletedEvent(self.id))
with _framework_event_origin():
completed_event = ExecutorCompletedEvent(self.id)
await context.add_event(completed_event)
def _discover_handlers(self) -> None:
"""Discover message handlers and request interceptors in the executor class."""
@@ -11,7 +11,7 @@ if TYPE_CHECKING:
from ._edge import EdgeGroup
from ._edge_runner import EdgeRunner, create_edge_runner
from ._events import WorkflowCompletedEvent, WorkflowEvent
from ._events import WorkflowCompletedEvent, WorkflowEvent, _framework_event_origin
from ._executor import Executor
from ._runner_context import Message, RunnerContext
from ._shared_state import SharedState
@@ -280,7 +280,9 @@ class Runner:
if isinstance(message.data, AgentExecutorResponse):
final_messages = message.data.agent_run_response.messages
final_text = final_messages[-1].text if final_messages else "(no content)"
await self._ctx.add_event(WorkflowCompletedEvent(final_text))
with _framework_event_origin():
completion_event = WorkflowCompletedEvent(final_text)
await self._ctx.add_event(completion_event)
continue # Terminal handled
except Exception as exc: # pragma: no cover - defensive
logger.debug("Suppressed exception during terminal message type check: %s", exc)
@@ -301,7 +303,9 @@ class Runner:
# Emit a single completion event with final text (best-effort extraction)
final_messages = message.data.agent_run_response.messages
final_text = final_messages[-1].text if final_messages else "(no content)"
await self._ctx.add_event(WorkflowCompletedEvent(final_text))
with _framework_event_origin():
completion_event = WorkflowCompletedEvent(final_text)
await self._ctx.add_event(completion_event)
continue
except Exception as exc: # pragma: no cover
logger.debug("Terminal completion emission failed: %s", exc)
@@ -5,7 +5,7 @@ import logging
import sys
import uuid
from collections.abc import AsyncIterable, Awaitable, Callable, Sequence
from typing import Any, cast
from typing import Any
from pydantic import Field
@@ -35,6 +35,7 @@ from ._events import (
WorkflowRunState,
WorkflowStartedEvent,
WorkflowStatusEvent,
_framework_event_origin,
)
from ._executor import AgentExecutor, Executor, RequestInfoExecutor
from ._runner import Runner
@@ -52,14 +53,6 @@ else:
logger = logging.getLogger(__name__)
def _default_edge_groups() -> list[EdgeGroup]:
return []
def _default_executors() -> dict[str, Executor]:
return {}
class WorkflowRunResult(list[WorkflowEvent]):
"""A list of events generated during the workflow execution in non-streaming mode.
@@ -125,10 +118,10 @@ class Workflow(AFBaseModel):
"""
edge_groups: list[EdgeGroup] = Field(
default_factory=_default_edge_groups, description="List of edge groups that define the workflow edges"
default_factory=list, description="List of edge groups that define the workflow edges"
)
executors: dict[str, Executor] = Field(
default_factory=_default_executors, description="Dictionary mapping executor IDs to Executor instances"
default_factory=dict, description="Dictionary mapping executor IDs to Executor instances"
)
start_executor_id: str = Field(min_length=1, description="The ID of the starting executor for the workflow")
max_iterations: int = Field(
@@ -190,20 +183,21 @@ class Workflow(AFBaseModel):
# Ensure WorkflowExecutor instances have their workflow field serialized
if "executors" in data:
executors_data = cast(dict[str, Any], data["executors"])
executor_map: dict[str, Executor] = self.executors
executors_data = data["executors"]
for executor_id, executor_data in executors_data.items():
# Check if this is a WorkflowExecutor that might be missing its workflow field
if isinstance(executor_data, dict):
executor_dict = cast(dict[str, Any], executor_data)
if executor_dict.get("type") == "WorkflowExecutor" and "workflow" not in executor_dict:
# Get the original executor object and serialize its workflow
original_executor = executor_map.get(executor_id)
if original_executor is not None and hasattr(original_executor, "workflow"):
from ._executor import WorkflowExecutor
if (
isinstance(executor_data, dict)
and executor_data.get("type") == "WorkflowExecutor"
and "workflow" not in executor_data
):
# Get the original executor object and serialize its workflow
original_executor = self.executors.get(executor_id)
if original_executor and hasattr(original_executor, "workflow"):
from ._executor import WorkflowExecutor
if isinstance(original_executor, WorkflowExecutor):
executor_dict["workflow"] = original_executor.workflow.model_dump(**kwargs)
if isinstance(original_executor, WorkflowExecutor):
executor_data["workflow"] = original_executor.workflow.model_dump(**kwargs)
return data
@@ -252,8 +246,12 @@ class Workflow(AFBaseModel):
# Add workflow started event (telemetry + surface state to consumers)
workflow_tracer.add_workflow_event("workflow.started")
# Emit explicit start/status events to the stream
yield WorkflowStartedEvent()
yield WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS)
with _framework_event_origin():
started = WorkflowStartedEvent()
yield started
with _framework_event_origin():
in_progress = WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS)
yield in_progress
# Reset context for a new run if supported
if reset_context:
@@ -274,22 +272,34 @@ class Workflow(AFBaseModel):
if isinstance(event, RequestInfoEvent) and not emitted_in_progress_pending and not saw_completed:
emitted_in_progress_pending = True
yield WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS)
with _framework_event_origin():
pending_status = WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS)
yield pending_status
# Success path: emit a final status based on observed terminal signals
if saw_completed:
yield WorkflowStatusEvent(WorkflowRunState.COMPLETED)
with _framework_event_origin():
terminal_status = WorkflowStatusEvent(WorkflowRunState.COMPLETED)
yield terminal_status
elif saw_request:
yield WorkflowStatusEvent(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS)
with _framework_event_origin():
terminal_status = WorkflowStatusEvent(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS)
yield terminal_status
else:
yield WorkflowStatusEvent(WorkflowRunState.IDLE)
with _framework_event_origin():
terminal_status = WorkflowStatusEvent(WorkflowRunState.IDLE)
yield terminal_status
workflow_tracer.add_workflow_event("workflow.completed")
except Exception as e:
# Surface structured failure details before propagating exception
details = WorkflowErrorDetails.from_exception(e)
yield WorkflowFailedEvent(details)
yield WorkflowStatusEvent(WorkflowRunState.FAILED)
with _framework_event_origin():
failed_event = WorkflowFailedEvent(details)
yield failed_event
with _framework_event_origin():
failed_status = WorkflowStatusEvent(WorkflowRunState.FAILED)
yield failed_status
workflow_tracer.add_workflow_error_event(e)
raise
@@ -1,10 +1,19 @@
# Copyright (c) Microsoft. All rights reserved.
from typing import Any, Generic, TypeVar
import logging
from typing import Any, Generic, TypeVar, cast, get_args
from opentelemetry.propagate import inject
from ._events import WorkflowEvent
from ._events import (
WorkflowEvent,
WorkflowEventSource,
WorkflowFailedEvent,
WorkflowLifecycleEvent,
WorkflowStartedEvent,
WorkflowStatusEvent,
WorkflowWarningEvent,
)
from ._runner_context import Message, RunnerContext
from ._shared_state import SharedState
from ._telemetry import workflow_tracer
@@ -12,6 +21,20 @@ from ._telemetry import workflow_tracer
T_Out = TypeVar("T_Out")
logger = logging.getLogger(__name__)
_FRAMEWORK_LIFECYCLE_EVENT_TYPES: tuple[type[WorkflowEvent], ...] = cast(
tuple[type[WorkflowEvent], ...],
tuple(get_args(WorkflowLifecycleEvent))
or (
WorkflowStartedEvent,
WorkflowStatusEvent,
WorkflowFailedEvent,
),
)
class WorkflowContext(Generic[T_Out]):
"""Context for executors in a workflow.
@@ -77,6 +100,16 @@ class WorkflowContext(Generic[T_Out]):
async def add_event(self, event: WorkflowEvent) -> None:
"""Add an event to the workflow context."""
if event.origin == WorkflowEventSource.EXECUTOR and isinstance(event, _FRAMEWORK_LIFECYCLE_EVENT_TYPES):
event_name = event.__class__.__name__
warning_msg = (
f"Executor '{self._executor_id}' attempted to emit {event_name}, "
"which is reserved for framework lifecycle notifications. The "
"event was ignored."
)
logger.warning(warning_msg)
await self._runner_context.add_event(WorkflowWarningEvent(warning_msg))
return
await self._runner_context.add_event(event)
async def get_shared_state(self, key: str) -> Any:
@@ -5,10 +5,19 @@ from dataclasses import dataclass
import pytest
from agent_framework import Executor, WorkflowCompletedEvent, WorkflowContext, WorkflowEvent, handler
from agent_framework import (
AgentExecutorResponse,
AgentRunResponse,
Executor,
WorkflowCompletedEvent,
WorkflowContext,
WorkflowEvent,
WorkflowEventSource,
handler,
)
from agent_framework._workflow._edge import SingleEdgeGroup
from agent_framework._workflow._runner import Runner
from agent_framework._workflow._runner_context import InProcRunnerContext, RunnerContext
from agent_framework._workflow._runner_context import InProcRunnerContext, Message, RunnerContext
from agent_framework._workflow._shared_state import SharedState
@@ -79,6 +88,7 @@ async def test_runner_run_until_convergence():
assert isinstance(event, WorkflowEvent)
if isinstance(event, WorkflowCompletedEvent):
result = event.data
assert event.origin is WorkflowEventSource.EXECUTOR
assert result is not None and result == 10
@@ -148,3 +158,20 @@ async def test_runner_already_running():
pass
await asyncio.gather(_run(), _run())
async def test_runner_emits_runner_completion_for_agent_response_without_targets():
ctx = InProcRunnerContext()
runner = Runner([], {}, SharedState(), ctx)
await ctx.send_message(
Message(
data=AgentExecutorResponse("agent", AgentRunResponse()),
source_id="agent",
)
)
events: list[WorkflowEvent] = [event async for event in runner.run_until_convergence()]
completions = [e for e in events if isinstance(e, WorkflowCompletedEvent)]
assert completions
assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in completions)
@@ -3,8 +3,6 @@
import asyncio
from dataclasses import dataclass
import pytest
from agent_framework import (
Executor,
WorkflowBuilder,
@@ -64,7 +62,6 @@ class SimpleParent(Executor):
self.result = response
@pytest.mark.asyncio
async def test_simple_sub_workflow():
"""Test the simplest possible sub-workflow."""
# Create sub-workflow with dummy executor to satisfy validation
@@ -4,7 +4,6 @@ import asyncio
from dataclasses import dataclass
from typing import Any
import pytest
from pydantic import Field
from agent_framework import (
@@ -120,7 +119,6 @@ class ParentOrchestrator(Executor):
self.results.append(result)
@pytest.mark.asyncio
async def test_basic_sub_workflow() -> None:
"""Test basic sub-workflow execution without interception."""
# Create sub-workflow
@@ -185,7 +183,6 @@ async def test_basic_sub_workflow() -> None:
assert parent.result.is_valid is True
@pytest.mark.asyncio
async def test_sub_workflow_with_interception():
"""Test sub-workflow with parent interception of requests."""
# Create sub-workflow
@@ -248,7 +245,6 @@ async def test_sub_workflow_with_interception():
assert parent.results[0].reason == "Domain not approved"
@pytest.mark.asyncio
async def test_conditional_forwarding() -> None:
"""Test conditional forwarding with RequestResponse.forward()."""
@@ -326,7 +322,6 @@ async def test_conditional_forwarding() -> None:
assert parent.result.is_valid is True
@pytest.mark.asyncio
async def test_workflow_scoped_interception() -> None:
"""Test interception scoped to specific sub-workflows."""
@@ -273,7 +273,6 @@ async def test_trace_context_handling(tracing_enabled: Any, span_exporter: InMem
assert processing_span.attributes.get("message.type") == "str"
@pytest.mark.asyncio
async def test_trace_context_disabled_when_tracing_disabled() -> None:
"""Test that no trace context is added when tracing is disabled."""
# Tracing should be disabled by default
@@ -299,7 +298,6 @@ async def test_trace_context_disabled_when_tracing_disabled() -> None:
assert message.source_span_id is None
@pytest.mark.asyncio
async def test_end_to_end_workflow_tracing(tracing_enabled: Any, span_exporter: InMemorySpanExporter) -> None:
"""Test end-to-end tracing including workflow build, execution, and span linking with fan-in edges."""
# Create executors for fan-in scenario
@@ -424,7 +422,6 @@ async def test_end_to_end_workflow_tracing(tracing_enabled: Any, span_exporter:
assert len(aggregator_span.links) >= 2, f"Expected at least 2 links, got {len(aggregator_span.links)}"
@pytest.mark.asyncio
async def test_workflow_error_handling_in_tracing(tracing_enabled: Any, span_exporter: InMemorySpanExporter) -> None:
"""Test that workflow errors are properly recorded in traces."""
@@ -460,7 +457,6 @@ async def test_workflow_error_handling_in_tracing(tracing_enabled: Any, span_exp
assert workflow_span.status.status_code.name == "ERROR"
@pytest.mark.asyncio
async def test_message_trace_context_serialization() -> None:
"""Test that message trace context is properly serialized/deserialized."""
ctx = InProcRunnerContext()
@@ -495,7 +491,6 @@ async def test_message_trace_context_serialization() -> None:
assert restored_msg.source_span_ids == ["span123"] # Test new format
@pytest.mark.asyncio
async def test_workflow_build_error_tracing(tracing_enabled: Any, span_exporter: InMemorySpanExporter) -> None:
"""Test that build errors are properly recorded in build spans."""
@@ -231,7 +231,7 @@ async def test_fan_out():
events = await workflow.run(NumberMessage(data=0))
# Each executor will emit two events: ExecutorInvokeEvent and ExecutorCompletedEvent
# Each executor will emit two events: ExecutorInvokedEvent and ExecutorCompletedEvent
# executor_b will also emit a WorkflowCompletedEvent
assert len(events) == 7
@@ -251,7 +251,7 @@ async def test_fan_out_multiple_completed_events():
events = await workflow.run(NumberMessage(data=0))
# Each executor will emit two events: ExecutorInvokeEvent and ExecutorCompletedEvent
# Each executor will emit two events: ExecutorInvokedEvent and ExecutorCompletedEvent
# executor_a and executor_b will also emit a WorkflowCompletedEvent
assert len(events) == 8
@@ -276,7 +276,7 @@ async def test_fan_in():
events = await workflow.run(NumberMessage(data=0))
# Each executor will emit two events: ExecutorInvokeEvent and ExecutorCompletedEvent
# Each executor will emit two events: ExecutorInvokedEvent and ExecutorCompletedEvent
# aggregator will also emit a WorkflowCompletedEvent
assert len(events) == 9
@@ -667,10 +667,10 @@ async def test_workflow_with_simple_cycle_and_exit_condition():
) # Should complete when executor_a reaches its limit
# Verify cycling occurred (should have events from both executors)
# Check for ExecutorInvokeEvent and ExecutorCompletedEvent types that have executor_id
from agent_framework import ExecutorCompletedEvent, ExecutorInvokeEvent
# Check for ExecutorInvokedEvent and ExecutorCompletedEvent types that have executor_id
from agent_framework import ExecutorCompletedEvent, ExecutorInvokedEvent
executor_events = [e for e in events if isinstance(e, (ExecutorInvokeEvent, ExecutorCompletedEvent))]
executor_events = [e for e in events if isinstance(e, (ExecutorInvokedEvent, ExecutorCompletedEvent))]
executor_ids = {e.executor_id for e in executor_events}
assert "exec_a" in executor_ids, "Should have events from executor A"
assert "exec_b" in executor_ids, "Should have events from executor B"
@@ -76,7 +76,6 @@ class RequestingExecutor(Executor):
class TestWorkflowAgent:
"""Test cases for WorkflowAgent end-to-end functionality."""
@pytest.mark.asyncio
async def test_end_to_end_basic_workflow(self):
"""Test basic end-to-end workflow execution with 2 executors emitting AgentRunEvent."""
# Create workflow with two executors
@@ -117,7 +116,6 @@ class TestWorkflowAgent:
assert "Step1: Hello World" in step1_text
assert "Step2: Step1: Hello World" in step2_text
@pytest.mark.asyncio
async def test_end_to_end_basic_workflow_streaming(self):
"""Test end-to-end workflow with streaming executor that emits AgentRunStreamingEvent."""
# Create a single streaming executor
@@ -146,7 +144,6 @@ class TestWorkflowAgent:
assert isinstance(second_content, TextContent)
assert "Streaming2: Streaming1: Test input" in second_content.text
@pytest.mark.asyncio
async def test_end_to_end_request_info_handling(self):
"""Test end-to-end workflow with RequestInfoEvent handling."""
# Create workflow with requesting executor -> request info executor (no cycle)
@@ -0,0 +1,64 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
from agent_framework import (
WorkflowCompletedEvent,
WorkflowContext,
WorkflowEvent,
WorkflowRunState,
WorkflowStatusEvent,
)
if TYPE_CHECKING:
from _pytest.logging import LogCaptureFixture
from agent_framework._workflow._runner_context import InProcRunnerContext
@asynccontextmanager
async def make_context(
executor_id: str = "exec",
) -> AsyncIterator[tuple[WorkflowContext[object], "InProcRunnerContext"]]:
from agent_framework._workflow._runner_context import InProcRunnerContext
from agent_framework._workflow._shared_state import SharedState
runner_ctx = InProcRunnerContext()
shared_state = SharedState()
workflow_ctx: WorkflowContext[object] = WorkflowContext(
executor_id,
["source"],
shared_state,
runner_ctx,
)
try:
yield workflow_ctx, runner_ctx
finally:
await asyncio.sleep(0)
async def test_executor_cannot_emit_framework_lifecycle_event(caplog: "LogCaptureFixture") -> None:
async with make_context() as (ctx, runner_ctx):
caplog.clear()
with caplog.at_level("WARNING"):
await ctx.add_event(WorkflowStatusEvent(state=WorkflowRunState.IN_PROGRESS))
events: list[WorkflowEvent] = await runner_ctx.drain_events()
assert len(events) == 1
assert type(events[0]).__name__ == "WorkflowWarningEvent"
data = getattr(events[0], "data", None)
assert isinstance(data, str)
assert "reserved for framework lifecycle notifications" in data
assert any("attempted to emit WorkflowStatusEvent" in message for message in list(caplog.messages))
async def test_executor_emits_normal_event() -> None:
async with make_context() as (ctx, runner_ctx):
await ctx.add_event(WorkflowCompletedEvent("done"))
events: list[WorkflowEvent] = await runner_ctx.drain_events()
assert len(events) == 1
assert isinstance(events[0], WorkflowCompletedEvent)
@@ -14,9 +14,11 @@ from agent_framework import (
WorkflowBuilder,
WorkflowCompletedEvent,
WorkflowContext,
WorkflowEventSource,
WorkflowFailedEvent,
WorkflowRunResult,
WorkflowRunState,
WorkflowStartedEvent,
WorkflowStatusEvent,
handler,
)
@@ -41,9 +43,12 @@ async def test_executor_failed_and_workflow_failed_events_streaming():
events.append(ev)
# Workflow-level failure and FAILED status should be surfaced
assert any(isinstance(e, WorkflowFailedEvent) for e in events)
failed_events = [e for e in events if isinstance(e, WorkflowFailedEvent)]
assert failed_events
assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in failed_events)
status = [e for e in events if isinstance(e, WorkflowStatusEvent)]
assert status and status[-1].state == WorkflowRunState.FAILED
assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in status)
async def test_executor_failed_event_emitted_on_direct_execute():
@@ -58,7 +63,9 @@ async def test_executor_failed_event_emitted_on_direct_execute():
with pytest.raises(RuntimeError, match="boom"):
await failing.execute(0, wf_ctx)
drained = await ctx.drain_events()
assert any(isinstance(e, ExecutorFailedEvent) for e in drained)
failed = [e for e in drained if isinstance(e, ExecutorFailedEvent)]
assert failed
assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in failed)
class Requester(Executor):
@@ -99,6 +106,19 @@ async def test_completed_status_streaming():
# Last status should be COMPLETED
status = [e for e in events if isinstance(e, WorkflowStatusEvent)]
assert status and status[-1].state == WorkflowRunState.COMPLETED
assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in status)
async def test_started_and_completed_event_origins():
c = Completer(id="c-origin")
wf = WorkflowBuilder().set_start_executor(c).build()
events = [ev async for ev in wf.run_stream("payload")]
started = next(e for e in events if isinstance(e, WorkflowStartedEvent))
assert started.origin is WorkflowEventSource.FRAMEWORK
completed = next(e for e in events if isinstance(e, WorkflowCompletedEvent))
assert completed.origin is WorkflowEventSource.EXECUTOR
async def test_non_streaming_final_state_helpers():
@@ -29,7 +29,8 @@ Purpose:
Show how to wrap chat agents created by AzureChatClient inside workflow executors, wire them with WorkflowBuilder,
and consume streaming events from the workflow. Demonstrate the @handler pattern with typed inputs and typed
WorkflowContext[T] outputs, and finish by emitting a WorkflowCompletedEvent from the terminal node while printing
intermediate events for observability.
intermediate events for observability. The streaming loop also surfaces WorkflowEvent.origin so you can
distinguish runner-generated lifecycle events from executor-generated data-plane events.
Prerequisites:
- Azure OpenAI configured for AzureChatClient with required environment variables.
@@ -123,36 +124,42 @@ async def main():
ChatMessage(role="user", text="Create a slogan for a new electric SUV that is affordable and fun to drive.")
):
if isinstance(event, WorkflowStatusEvent):
prefix = f"State ({event.origin.value}): "
if event.state == WorkflowRunState.IN_PROGRESS:
print("State: IN_PROGRESS")
print(prefix + "IN_PROGRESS")
elif event.state == WorkflowRunState.COMPLETED:
print("State: COMPLETED")
print(prefix + "COMPLETED")
elif event.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS:
print("State: IN_PROGRESS_PENDING_REQUESTS (requests in flight)")
print(prefix + "IN_PROGRESS_PENDING_REQUESTS (requests in flight)")
elif event.state == WorkflowRunState.IDLE:
print("State: IDLE (no active work)")
print(prefix + "IDLE (no active work)")
elif event.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS:
print("State: IDLE_WITH_PENDING_REQUESTS (prompt user or UI now)")
print(prefix + "IDLE_WITH_PENDING_REQUESTS (prompt user or UI now)")
else:
print(f"State: {event.state}")
print(prefix + str(event.state))
elif isinstance(event, WorkflowCompletedEvent):
print(f"Workflow completed ({event.origin.value}): {event.data}")
elif isinstance(event, ExecutorFailedEvent):
print(f"Executor failed: {event.executor_id} {event.details.error_type}: {event.details.message}")
print(
f"Executor failed ({event.origin.value}): "
f"{event.executor_id} {event.details.error_type}: {event.details.message}"
)
elif isinstance(event, WorkflowFailedEvent):
details = event.details
print(f"Workflow failed: {details.error_type}: {details.message}")
print(f"Workflow failed ({event.origin.value}): {details.error_type}: {details.message}")
else:
print(event)
print(f"{event.__class__.__name__} ({event.origin.value}): {event}")
"""
Sample Output:
State: IN_PROGRESS
ExecutorInvokeEvent(executor_id=writer)
ExecutorCompletedEvent(executor_id=writer)
ExecutorInvokeEvent(executor_id=reviewer)
WorkflowCompletedEvent(data=Drive the Future. Affordable Adventure, Electrified.)
ExecutorCompletedEvent(executor_id=reviewer)
State: COMPLETED
State (RUNNER): IN_PROGRESS
ExecutorInvokeEvent (RUNNER): ExecutorInvokeEvent(executor_id=writer)
ExecutorCompletedEvent (RUNNER): ExecutorCompletedEvent(executor_id=writer)
ExecutorInvokeEvent (RUNNER): ExecutorInvokeEvent(executor_id=reviewer)
Workflow completed (EXECUTOR): Drive the Future. Affordable Adventure, Electrified.
ExecutorCompletedEvent (RUNNER): ExecutorCompletedEvent(executor_id=reviewer)
State (RUNNER): COMPLETED
"""
@@ -275,15 +275,15 @@ async def main():
Running workflow with initial message...
UpperCaseExecutor: 'hello world' -> 'HELLO WORLD'
Event: ExecutorInvokeEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
ReverseTextExecutor: 'HELLO WORLD' -> 'DLROW OLLEH'
Event: ExecutorInvokeEvent(executor_id=reverse_text_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
LowerAgent (shared_state): original_input='hello world', upper_output='HELLO WORLD'
Event: ExecutorInvokeEvent(executor_id=submit_lower)
Event: ExecutorInvokeEvent(executor_id=lower_agent)
Event: ExecutorInvokeEvent(executor_id=finalize)
Event: ExecutorInvokedEvent(executor_id=submit_lower)
Event: ExecutorInvokedEvent(executor_id=lower_agent)
Event: ExecutorInvokedEvent(executor_id=finalize)
Event: WorkflowCompletedEvent(data=dlrow olleh)
Checkpoint summary:
@@ -300,9 +300,9 @@ async def main():
Resuming from checkpoint: a78c345a-e5d9-45ba-82c0-cb725452d91b
LowerAgent (shared_state): original_input='hello world', upper_output='HELLO WORLD'
Resumed Event: ExecutorInvokeEvent(executor_id=submit_lower)
Resumed Event: ExecutorInvokeEvent(executor_id=lower_agent)
Resumed Event: ExecutorInvokeEvent(executor_id=finalize)
Resumed Event: ExecutorInvokedEvent(executor_id=submit_lower)
Resumed Event: ExecutorInvokedEvent(executor_id=lower_agent)
Resumed Event: ExecutorInvokedEvent(executor_id=finalize)
Resumed Event: WorkflowCompletedEvent(data=dlrow olleh)
""" # noqa: E501
@@ -13,7 +13,7 @@ The second reverses the text and completes the workflow. Events are printed as t
Purpose:
Show how to declare executors with the @executor decorator, connect them with WorkflowBuilder,
pass intermediate values using ctx.send_message, and signal completion with ctx.add_event by emitting a
WorkflowCompletedEvent. Demonstrate how streaming exposes ExecutorInvokeEvent and WorkflowCompletedEvent
WorkflowCompletedEvent. Demonstrate how streaming exposes ExecutorInvokedEvent and WorkflowCompletedEvent
for observability.
Prerequisites:
@@ -72,9 +72,9 @@ async def main():
"""
Sample Output:
Event: ExecutorInvokeEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokeEvent(executor_id=reverse_text_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: WorkflowCompletedEvent(data=DLROW OLLEH)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH