diff --git a/python/packages/main/agent_framework/_workflow/__init__.py b/python/packages/main/agent_framework/_workflow/__init__.py index f8c3955c9d..8848794a74 100644 --- a/python/packages/main/agent_framework/_workflow/__init__.py +++ b/python/packages/main/agent_framework/_workflow/__init__.py @@ -31,12 +31,14 @@ from ._events import ( ExecutorCompletedEvent, ExecutorEvent, ExecutorFailedEvent, - ExecutorInvokeEvent, + ExecutorInvokedEvent, RequestInfoEvent, WorkflowCompletedEvent, WorkflowErrorDetails, WorkflowEvent, + WorkflowEventSource, WorkflowFailedEvent, + WorkflowLifecycleEvent, WorkflowRunState, WorkflowStartedEvent, WorkflowStatusEvent, @@ -118,7 +120,7 @@ __all__ = [ "ExecutorCompletedEvent", "ExecutorEvent", "ExecutorFailedEvent", - "ExecutorInvokeEvent", + "ExecutorInvokedEvent", "FanInEdgeGroup", "FanOutEdgeGroup", "FileCheckpointStorage", @@ -172,8 +174,10 @@ __all__ = [ "WorkflowContext", "WorkflowErrorDetails", "WorkflowEvent", + "WorkflowEventSource", "WorkflowExecutor", "WorkflowFailedEvent", + "WorkflowLifecycleEvent", "WorkflowRunResult", "WorkflowRunState", "WorkflowStartedEvent", diff --git a/python/packages/main/agent_framework/_workflow/__init__.pyi b/python/packages/main/agent_framework/_workflow/__init__.pyi index 506cac18e3..90d2a4322e 100644 --- a/python/packages/main/agent_framework/_workflow/__init__.pyi +++ b/python/packages/main/agent_framework/_workflow/__init__.pyi @@ -27,12 +27,14 @@ from ._events import ( ExecutorCompletedEvent, ExecutorEvent, ExecutorFailedEvent, - ExecutorInvokeEvent, + ExecutorInvokedEvent, RequestInfoEvent, WorkflowCompletedEvent, WorkflowErrorDetails, WorkflowEvent, + WorkflowEventSource, WorkflowFailedEvent, + WorkflowLifecycleEvent, WorkflowRunState, WorkflowStartedEvent, WorkflowStatusEvent, @@ -114,7 +116,7 @@ __all__ = [ "ExecutorCompletedEvent", "ExecutorEvent", "ExecutorFailedEvent", - "ExecutorInvokeEvent", + "ExecutorInvokedEvent", "FanInEdgeGroup", "FanOutEdgeGroup", "FileCheckpointStorage", @@ -168,8 +170,10 @@ __all__ = [ "WorkflowContext", "WorkflowErrorDetails", "WorkflowEvent", + "WorkflowEventSource", "WorkflowExecutor", "WorkflowFailedEvent", + "WorkflowLifecycleEvent", "WorkflowRunResult", "WorkflowRunState", "WorkflowStartedEvent", diff --git a/python/packages/main/agent_framework/_workflow/_events.py b/python/packages/main/agent_framework/_workflow/_events.py index a04136eb0e..d8c16aeb4c 100644 --- a/python/packages/main/agent_framework/_workflow/_events.py +++ b/python/packages/main/agent_framework/_workflow/_events.py @@ -1,9 +1,12 @@ # Copyright (c) Microsoft. All rights reserved. import traceback as _traceback +from collections.abc import Iterator +from contextlib import contextmanager +from contextvars import ContextVar from dataclasses import dataclass from enum import Enum -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypeAlias from agent_framework import AgentRunResponse, AgentRunResponseUpdate @@ -11,32 +14,70 @@ if TYPE_CHECKING: from ._executor import RequestInfoMessage +class WorkflowEventSource(str, Enum): + """Identifies whether a workflow event came from the framework or an executor. + + Use `FRAMEWORK` for events emitted by built-in orchestration paths—even when the + code that raises them lives in runner-related modules—and `EXECUTOR` for events + surfaced by developer-provided executor implementations. + """ + + FRAMEWORK = "FRAMEWORK" # Framework-owned orchestration, regardless of module location + EXECUTOR = "EXECUTOR" # User-supplied executor code and callbacks + + +_event_origin_context: ContextVar[WorkflowEventSource] = ContextVar( + "workflow_event_origin", default=WorkflowEventSource.EXECUTOR +) + + +def _current_event_origin() -> WorkflowEventSource: + """Return the origin to associate with newly created workflow events.""" + return _event_origin_context.get() + + +@contextmanager +def _framework_event_origin() -> Iterator[None]: # pyright: ignore[reportUnusedFunction] + """Temporarily mark subsequently created events as originating from the framework (internal).""" + token = _event_origin_context.set(WorkflowEventSource.FRAMEWORK) + try: + yield + finally: + _event_origin_context.reset(token) + + class WorkflowEvent: """Base class for workflow events.""" def __init__(self, data: Any | None = None): """Initialize the workflow event with optional data.""" self.data = data + self.origin = _current_event_origin() def __repr__(self) -> str: """Return a string representation of the workflow event.""" - return f"{self.__class__.__name__}(data={self.data if self.data is not None else 'None'})" + data_repr = self.data if self.data is not None else "None" + return f"{self.__class__.__name__}(origin={self.origin}, data={data_repr})" class WorkflowStartedEvent(WorkflowEvent): - """Event triggered when a workflow starts.""" + """Built-in lifecycle event emitted when a workflow run begins.""" ... class WorkflowCompletedEvent(WorkflowEvent): - """Event triggered when a workflow completes.""" + """Built-in lifecycle event emitted when a workflow run completes successfully. + + Unlike the framework-only `WorkflowLifecycleEvent` union, this event can be + emitted by developer-provided executors to return final workflow output. + """ ... class WorkflowWarningEvent(WorkflowEvent): - """Event triggered when a warning occurs in the workflow.""" + """Executor-origin event signaling a warning surfaced by user code.""" def __init__(self, data: str): """Initialize the workflow warning event with optional data and warning message.""" @@ -44,11 +85,11 @@ class WorkflowWarningEvent(WorkflowEvent): def __repr__(self) -> str: """Return a string representation of the workflow warning event.""" - return f"{self.__class__.__name__}(message={self.data})" + return f"{self.__class__.__name__}(message={self.data}, origin={self.origin})" class WorkflowErrorEvent(WorkflowEvent): - """Event triggered when an error occurs in the workflow.""" + """Executor-origin event signaling an error surfaced by user code.""" def __init__(self, data: Exception): """Initialize the workflow error event with optional data and error message.""" @@ -56,7 +97,7 @@ class WorkflowErrorEvent(WorkflowEvent): def __repr__(self) -> str: """Return a string representation of the workflow error event.""" - return f"{self.__class__.__name__}(exception={self.data})" + return f"{self.__class__.__name__}(exception={self.data}, origin={self.origin})" class WorkflowRunState(str, Enum): @@ -108,14 +149,24 @@ class WorkflowRunState(str, Enum): class WorkflowStatusEvent(WorkflowEvent): - """Event indicating a transition in the workflow run state.""" + """Built-in lifecycle event emitted for workflow run state transitions.""" - def __init__(self, state: WorkflowRunState, data: Any | None = None): + def __init__( + self, + state: WorkflowRunState, + data: Any | None = None, + ): + """Initialize the workflow status event with a new state and optional data. + + Args: + state: The new state of the workflow run. + data: Optional additional data associated with the state change. + """ super().__init__(data) self.state = state def __repr__(self) -> str: # pragma: no cover - representation only - return f"{self.__class__.__name__}(state={self.state}, data={self.data!r})" + return f"{self.__class__.__name__}(state={self.state}, data={self.data!r}, origin={self.origin})" @dataclass @@ -151,14 +202,18 @@ class WorkflowErrorDetails: class WorkflowFailedEvent(WorkflowEvent): - """Terminal failure event for a workflow run.""" + """Built-in lifecycle event emitted when a workflow run terminates with an error.""" - def __init__(self, details: WorkflowErrorDetails, data: Any | None = None): + def __init__( + self, + details: WorkflowErrorDetails, + data: Any | None = None, + ): super().__init__(data) self.details = details def __repr__(self) -> str: # pragma: no cover - representation only - return f"{self.__class__.__name__}(details={self.details}, data={self.data!r})" + return f"{self.__class__.__name__}(details={self.details}, data={self.data!r}, origin={self.origin})" class RequestInfoEvent(WorkflowEvent): @@ -208,12 +263,12 @@ class ExecutorEvent(WorkflowEvent): return f"{self.__class__.__name__}(executor_id={self.executor_id}, data={self.data})" -class ExecutorInvokeEvent(ExecutorEvent): +class ExecutorInvokedEvent(ExecutorEvent): """Event triggered when an executor handler is invoked.""" def __repr__(self) -> str: """Return a string representation of the executor handler invoke event.""" - return f"{self.__class__.__name__}(executor_id={self.executor_id})" + return f"{self.__class__.__name__}(executor_id={self.executor_id}, data={self.data})" class ExecutorCompletedEvent(ExecutorEvent): @@ -221,13 +276,17 @@ class ExecutorCompletedEvent(ExecutorEvent): def __repr__(self) -> str: """Return a string representation of the executor handler complete event.""" - return f"{self.__class__.__name__}(executor_id={self.executor_id})" + return f"{self.__class__.__name__}(executor_id={self.executor_id}, data={self.data})" class ExecutorFailedEvent(ExecutorEvent): """Event triggered when an executor handler raises an error.""" - def __init__(self, executor_id: str, details: WorkflowErrorDetails): + def __init__( + self, + executor_id: str, + details: WorkflowErrorDetails, + ): super().__init__(executor_id, details) self.details = details @@ -257,3 +316,6 @@ class AgentRunEvent(ExecutorEvent): def __repr__(self) -> str: """Return a string representation of the agent run event.""" return f"{self.__class__.__name__}(executor_id={self.executor_id}, data={self.data})" + + +WorkflowLifecycleEvent: TypeAlias = WorkflowStartedEvent | WorkflowStatusEvent | WorkflowFailedEvent diff --git a/python/packages/main/agent_framework/_workflow/_executor.py b/python/packages/main/agent_framework/_workflow/_executor.py index 83763bc02a..8a563e21f9 100644 --- a/python/packages/main/agent_framework/_workflow/_executor.py +++ b/python/packages/main/agent_framework/_workflow/_executor.py @@ -21,8 +21,9 @@ from ._events import ( AgentRunEvent, AgentRunUpdateEvent, ExecutorCompletedEvent, - ExecutorInvokeEvent, + ExecutorInvokedEvent, RequestInfoEvent, + _framework_event_origin, # pyright: ignore[reportPrivateUsage] ) from ._typing_utils import is_instance_of from ._workflow_context import WorkflowContext @@ -102,16 +103,22 @@ class Executor(AFBaseModel): # Lazy registration for SubWorkflowRequestInfo if we have interceptors if self._request_interceptors and message.__class__.__name__ == "SubWorkflowRequestInfo": # Directly handle SubWorkflowRequestInfo - await context.add_event(ExecutorInvokeEvent(self.id)) + with _framework_event_origin(): + invoke_event = ExecutorInvokedEvent(self.id) + await context.add_event(invoke_event) try: await self._handle_sub_workflow_request(message, context) except Exception as exc: # Surface structured executor failure before propagating from ._events import ExecutorFailedEvent, WorkflowErrorDetails - await context.add_event(ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc))) + with _framework_event_origin(): + failure_event = ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc)) + await context.add_event(failure_event) raise - await context.add_event(ExecutorCompletedEvent(self.id)) + with _framework_event_origin(): + completed_event = ExecutorCompletedEvent(self.id) + await context.add_event(completed_event) return handler: Callable[[Any, WorkflowContext[Any]], Any] | None = None @@ -122,16 +129,22 @@ class Executor(AFBaseModel): if handler is None: raise RuntimeError(f"Executor {self.__class__.__name__} cannot handle message of type {type(message)}.") - await context.add_event(ExecutorInvokeEvent(self.id)) + with _framework_event_origin(): + invoke_event = ExecutorInvokedEvent(self.id) + await context.add_event(invoke_event) try: await handler(message, context) except Exception as exc: # Surface structured executor failure before propagating from ._events import ExecutorFailedEvent, WorkflowErrorDetails - await context.add_event(ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc))) + with _framework_event_origin(): + failure_event = ExecutorFailedEvent(self.id, WorkflowErrorDetails.from_exception(exc)) + await context.add_event(failure_event) raise - await context.add_event(ExecutorCompletedEvent(self.id)) + with _framework_event_origin(): + completed_event = ExecutorCompletedEvent(self.id) + await context.add_event(completed_event) def _discover_handlers(self) -> None: """Discover message handlers and request interceptors in the executor class.""" diff --git a/python/packages/main/agent_framework/_workflow/_runner.py b/python/packages/main/agent_framework/_workflow/_runner.py index 05e81619df..b501ff205f 100644 --- a/python/packages/main/agent_framework/_workflow/_runner.py +++ b/python/packages/main/agent_framework/_workflow/_runner.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from ._edge import EdgeGroup from ._edge_runner import EdgeRunner, create_edge_runner -from ._events import WorkflowCompletedEvent, WorkflowEvent +from ._events import WorkflowCompletedEvent, WorkflowEvent, _framework_event_origin from ._executor import Executor from ._runner_context import Message, RunnerContext from ._shared_state import SharedState @@ -280,7 +280,9 @@ class Runner: if isinstance(message.data, AgentExecutorResponse): final_messages = message.data.agent_run_response.messages final_text = final_messages[-1].text if final_messages else "(no content)" - await self._ctx.add_event(WorkflowCompletedEvent(final_text)) + with _framework_event_origin(): + completion_event = WorkflowCompletedEvent(final_text) + await self._ctx.add_event(completion_event) continue # Terminal handled except Exception as exc: # pragma: no cover - defensive logger.debug("Suppressed exception during terminal message type check: %s", exc) @@ -301,7 +303,9 @@ class Runner: # Emit a single completion event with final text (best-effort extraction) final_messages = message.data.agent_run_response.messages final_text = final_messages[-1].text if final_messages else "(no content)" - await self._ctx.add_event(WorkflowCompletedEvent(final_text)) + with _framework_event_origin(): + completion_event = WorkflowCompletedEvent(final_text) + await self._ctx.add_event(completion_event) continue except Exception as exc: # pragma: no cover logger.debug("Terminal completion emission failed: %s", exc) diff --git a/python/packages/main/agent_framework/_workflow/_workflow.py b/python/packages/main/agent_framework/_workflow/_workflow.py index d4915fb8bd..2fe61c2209 100644 --- a/python/packages/main/agent_framework/_workflow/_workflow.py +++ b/python/packages/main/agent_framework/_workflow/_workflow.py @@ -5,7 +5,7 @@ import logging import sys import uuid from collections.abc import AsyncIterable, Awaitable, Callable, Sequence -from typing import Any, cast +from typing import Any from pydantic import Field @@ -35,6 +35,7 @@ from ._events import ( WorkflowRunState, WorkflowStartedEvent, WorkflowStatusEvent, + _framework_event_origin, ) from ._executor import AgentExecutor, Executor, RequestInfoExecutor from ._runner import Runner @@ -52,14 +53,6 @@ else: logger = logging.getLogger(__name__) -def _default_edge_groups() -> list[EdgeGroup]: - return [] - - -def _default_executors() -> dict[str, Executor]: - return {} - - class WorkflowRunResult(list[WorkflowEvent]): """A list of events generated during the workflow execution in non-streaming mode. @@ -125,10 +118,10 @@ class Workflow(AFBaseModel): """ edge_groups: list[EdgeGroup] = Field( - default_factory=_default_edge_groups, description="List of edge groups that define the workflow edges" + default_factory=list, description="List of edge groups that define the workflow edges" ) executors: dict[str, Executor] = Field( - default_factory=_default_executors, description="Dictionary mapping executor IDs to Executor instances" + default_factory=dict, description="Dictionary mapping executor IDs to Executor instances" ) start_executor_id: str = Field(min_length=1, description="The ID of the starting executor for the workflow") max_iterations: int = Field( @@ -190,20 +183,21 @@ class Workflow(AFBaseModel): # Ensure WorkflowExecutor instances have their workflow field serialized if "executors" in data: - executors_data = cast(dict[str, Any], data["executors"]) - executor_map: dict[str, Executor] = self.executors + executors_data = data["executors"] for executor_id, executor_data in executors_data.items(): # Check if this is a WorkflowExecutor that might be missing its workflow field - if isinstance(executor_data, dict): - executor_dict = cast(dict[str, Any], executor_data) - if executor_dict.get("type") == "WorkflowExecutor" and "workflow" not in executor_dict: - # Get the original executor object and serialize its workflow - original_executor = executor_map.get(executor_id) - if original_executor is not None and hasattr(original_executor, "workflow"): - from ._executor import WorkflowExecutor + if ( + isinstance(executor_data, dict) + and executor_data.get("type") == "WorkflowExecutor" + and "workflow" not in executor_data + ): + # Get the original executor object and serialize its workflow + original_executor = self.executors.get(executor_id) + if original_executor and hasattr(original_executor, "workflow"): + from ._executor import WorkflowExecutor - if isinstance(original_executor, WorkflowExecutor): - executor_dict["workflow"] = original_executor.workflow.model_dump(**kwargs) + if isinstance(original_executor, WorkflowExecutor): + executor_data["workflow"] = original_executor.workflow.model_dump(**kwargs) return data @@ -252,8 +246,12 @@ class Workflow(AFBaseModel): # Add workflow started event (telemetry + surface state to consumers) workflow_tracer.add_workflow_event("workflow.started") # Emit explicit start/status events to the stream - yield WorkflowStartedEvent() - yield WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS) + with _framework_event_origin(): + started = WorkflowStartedEvent() + yield started + with _framework_event_origin(): + in_progress = WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS) + yield in_progress # Reset context for a new run if supported if reset_context: @@ -274,22 +272,34 @@ class Workflow(AFBaseModel): if isinstance(event, RequestInfoEvent) and not emitted_in_progress_pending and not saw_completed: emitted_in_progress_pending = True - yield WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS) + with _framework_event_origin(): + pending_status = WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS) + yield pending_status # Success path: emit a final status based on observed terminal signals if saw_completed: - yield WorkflowStatusEvent(WorkflowRunState.COMPLETED) + with _framework_event_origin(): + terminal_status = WorkflowStatusEvent(WorkflowRunState.COMPLETED) + yield terminal_status elif saw_request: - yield WorkflowStatusEvent(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS) + with _framework_event_origin(): + terminal_status = WorkflowStatusEvent(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS) + yield terminal_status else: - yield WorkflowStatusEvent(WorkflowRunState.IDLE) + with _framework_event_origin(): + terminal_status = WorkflowStatusEvent(WorkflowRunState.IDLE) + yield terminal_status workflow_tracer.add_workflow_event("workflow.completed") except Exception as e: # Surface structured failure details before propagating exception details = WorkflowErrorDetails.from_exception(e) - yield WorkflowFailedEvent(details) - yield WorkflowStatusEvent(WorkflowRunState.FAILED) + with _framework_event_origin(): + failed_event = WorkflowFailedEvent(details) + yield failed_event + with _framework_event_origin(): + failed_status = WorkflowStatusEvent(WorkflowRunState.FAILED) + yield failed_status workflow_tracer.add_workflow_error_event(e) raise diff --git a/python/packages/main/agent_framework/_workflow/_workflow_context.py b/python/packages/main/agent_framework/_workflow/_workflow_context.py index 219f164966..614b088348 100644 --- a/python/packages/main/agent_framework/_workflow/_workflow_context.py +++ b/python/packages/main/agent_framework/_workflow/_workflow_context.py @@ -1,10 +1,19 @@ # Copyright (c) Microsoft. All rights reserved. -from typing import Any, Generic, TypeVar +import logging +from typing import Any, Generic, TypeVar, cast, get_args from opentelemetry.propagate import inject -from ._events import WorkflowEvent +from ._events import ( + WorkflowEvent, + WorkflowEventSource, + WorkflowFailedEvent, + WorkflowLifecycleEvent, + WorkflowStartedEvent, + WorkflowStatusEvent, + WorkflowWarningEvent, +) from ._runner_context import Message, RunnerContext from ._shared_state import SharedState from ._telemetry import workflow_tracer @@ -12,6 +21,20 @@ from ._telemetry import workflow_tracer T_Out = TypeVar("T_Out") +logger = logging.getLogger(__name__) + + +_FRAMEWORK_LIFECYCLE_EVENT_TYPES: tuple[type[WorkflowEvent], ...] = cast( + tuple[type[WorkflowEvent], ...], + tuple(get_args(WorkflowLifecycleEvent)) + or ( + WorkflowStartedEvent, + WorkflowStatusEvent, + WorkflowFailedEvent, + ), +) + + class WorkflowContext(Generic[T_Out]): """Context for executors in a workflow. @@ -77,6 +100,16 @@ class WorkflowContext(Generic[T_Out]): async def add_event(self, event: WorkflowEvent) -> None: """Add an event to the workflow context.""" + if event.origin == WorkflowEventSource.EXECUTOR and isinstance(event, _FRAMEWORK_LIFECYCLE_EVENT_TYPES): + event_name = event.__class__.__name__ + warning_msg = ( + f"Executor '{self._executor_id}' attempted to emit {event_name}, " + "which is reserved for framework lifecycle notifications. The " + "event was ignored." + ) + logger.warning(warning_msg) + await self._runner_context.add_event(WorkflowWarningEvent(warning_msg)) + return await self._runner_context.add_event(event) async def get_shared_state(self, key: str) -> Any: diff --git a/python/packages/main/tests/workflow/test_runner.py b/python/packages/main/tests/workflow/test_runner.py index cfd01f1f08..722856134a 100644 --- a/python/packages/main/tests/workflow/test_runner.py +++ b/python/packages/main/tests/workflow/test_runner.py @@ -5,10 +5,19 @@ from dataclasses import dataclass import pytest -from agent_framework import Executor, WorkflowCompletedEvent, WorkflowContext, WorkflowEvent, handler +from agent_framework import ( + AgentExecutorResponse, + AgentRunResponse, + Executor, + WorkflowCompletedEvent, + WorkflowContext, + WorkflowEvent, + WorkflowEventSource, + handler, +) from agent_framework._workflow._edge import SingleEdgeGroup from agent_framework._workflow._runner import Runner -from agent_framework._workflow._runner_context import InProcRunnerContext, RunnerContext +from agent_framework._workflow._runner_context import InProcRunnerContext, Message, RunnerContext from agent_framework._workflow._shared_state import SharedState @@ -79,6 +88,7 @@ async def test_runner_run_until_convergence(): assert isinstance(event, WorkflowEvent) if isinstance(event, WorkflowCompletedEvent): result = event.data + assert event.origin is WorkflowEventSource.EXECUTOR assert result is not None and result == 10 @@ -148,3 +158,20 @@ async def test_runner_already_running(): pass await asyncio.gather(_run(), _run()) + + +async def test_runner_emits_runner_completion_for_agent_response_without_targets(): + ctx = InProcRunnerContext() + runner = Runner([], {}, SharedState(), ctx) + + await ctx.send_message( + Message( + data=AgentExecutorResponse("agent", AgentRunResponse()), + source_id="agent", + ) + ) + + events: list[WorkflowEvent] = [event async for event in runner.run_until_convergence()] + completions = [e for e in events if isinstance(e, WorkflowCompletedEvent)] + assert completions + assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in completions) diff --git a/python/packages/main/tests/workflow/test_simple_sub_workflow.py b/python/packages/main/tests/workflow/test_simple_sub_workflow.py index 41309db05d..73585fafe9 100644 --- a/python/packages/main/tests/workflow/test_simple_sub_workflow.py +++ b/python/packages/main/tests/workflow/test_simple_sub_workflow.py @@ -3,8 +3,6 @@ import asyncio from dataclasses import dataclass -import pytest - from agent_framework import ( Executor, WorkflowBuilder, @@ -64,7 +62,6 @@ class SimpleParent(Executor): self.result = response -@pytest.mark.asyncio async def test_simple_sub_workflow(): """Test the simplest possible sub-workflow.""" # Create sub-workflow with dummy executor to satisfy validation diff --git a/python/packages/main/tests/workflow/test_sub_workflow.py b/python/packages/main/tests/workflow/test_sub_workflow.py index 7bcb72e54b..3fe8109cb8 100644 --- a/python/packages/main/tests/workflow/test_sub_workflow.py +++ b/python/packages/main/tests/workflow/test_sub_workflow.py @@ -4,7 +4,6 @@ import asyncio from dataclasses import dataclass from typing import Any -import pytest from pydantic import Field from agent_framework import ( @@ -120,7 +119,6 @@ class ParentOrchestrator(Executor): self.results.append(result) -@pytest.mark.asyncio async def test_basic_sub_workflow() -> None: """Test basic sub-workflow execution without interception.""" # Create sub-workflow @@ -185,7 +183,6 @@ async def test_basic_sub_workflow() -> None: assert parent.result.is_valid is True -@pytest.mark.asyncio async def test_sub_workflow_with_interception(): """Test sub-workflow with parent interception of requests.""" # Create sub-workflow @@ -248,7 +245,6 @@ async def test_sub_workflow_with_interception(): assert parent.results[0].reason == "Domain not approved" -@pytest.mark.asyncio async def test_conditional_forwarding() -> None: """Test conditional forwarding with RequestResponse.forward().""" @@ -326,7 +322,6 @@ async def test_conditional_forwarding() -> None: assert parent.result.is_valid is True -@pytest.mark.asyncio async def test_workflow_scoped_interception() -> None: """Test interception scoped to specific sub-workflows.""" diff --git a/python/packages/main/tests/workflow/test_tracing.py b/python/packages/main/tests/workflow/test_tracing.py index 5b280727c4..e0cf624850 100644 --- a/python/packages/main/tests/workflow/test_tracing.py +++ b/python/packages/main/tests/workflow/test_tracing.py @@ -273,7 +273,6 @@ async def test_trace_context_handling(tracing_enabled: Any, span_exporter: InMem assert processing_span.attributes.get("message.type") == "str" -@pytest.mark.asyncio async def test_trace_context_disabled_when_tracing_disabled() -> None: """Test that no trace context is added when tracing is disabled.""" # Tracing should be disabled by default @@ -299,7 +298,6 @@ async def test_trace_context_disabled_when_tracing_disabled() -> None: assert message.source_span_id is None -@pytest.mark.asyncio async def test_end_to_end_workflow_tracing(tracing_enabled: Any, span_exporter: InMemorySpanExporter) -> None: """Test end-to-end tracing including workflow build, execution, and span linking with fan-in edges.""" # Create executors for fan-in scenario @@ -424,7 +422,6 @@ async def test_end_to_end_workflow_tracing(tracing_enabled: Any, span_exporter: assert len(aggregator_span.links) >= 2, f"Expected at least 2 links, got {len(aggregator_span.links)}" -@pytest.mark.asyncio async def test_workflow_error_handling_in_tracing(tracing_enabled: Any, span_exporter: InMemorySpanExporter) -> None: """Test that workflow errors are properly recorded in traces.""" @@ -460,7 +457,6 @@ async def test_workflow_error_handling_in_tracing(tracing_enabled: Any, span_exp assert workflow_span.status.status_code.name == "ERROR" -@pytest.mark.asyncio async def test_message_trace_context_serialization() -> None: """Test that message trace context is properly serialized/deserialized.""" ctx = InProcRunnerContext() @@ -495,7 +491,6 @@ async def test_message_trace_context_serialization() -> None: assert restored_msg.source_span_ids == ["span123"] # Test new format -@pytest.mark.asyncio async def test_workflow_build_error_tracing(tracing_enabled: Any, span_exporter: InMemorySpanExporter) -> None: """Test that build errors are properly recorded in build spans.""" diff --git a/python/packages/main/tests/workflow/test_workflow.py b/python/packages/main/tests/workflow/test_workflow.py index e1b5956c3c..b644ba2cb4 100644 --- a/python/packages/main/tests/workflow/test_workflow.py +++ b/python/packages/main/tests/workflow/test_workflow.py @@ -231,7 +231,7 @@ async def test_fan_out(): events = await workflow.run(NumberMessage(data=0)) - # Each executor will emit two events: ExecutorInvokeEvent and ExecutorCompletedEvent + # Each executor will emit two events: ExecutorInvokedEvent and ExecutorCompletedEvent # executor_b will also emit a WorkflowCompletedEvent assert len(events) == 7 @@ -251,7 +251,7 @@ async def test_fan_out_multiple_completed_events(): events = await workflow.run(NumberMessage(data=0)) - # Each executor will emit two events: ExecutorInvokeEvent and ExecutorCompletedEvent + # Each executor will emit two events: ExecutorInvokedEvent and ExecutorCompletedEvent # executor_a and executor_b will also emit a WorkflowCompletedEvent assert len(events) == 8 @@ -276,7 +276,7 @@ async def test_fan_in(): events = await workflow.run(NumberMessage(data=0)) - # Each executor will emit two events: ExecutorInvokeEvent and ExecutorCompletedEvent + # Each executor will emit two events: ExecutorInvokedEvent and ExecutorCompletedEvent # aggregator will also emit a WorkflowCompletedEvent assert len(events) == 9 @@ -667,10 +667,10 @@ async def test_workflow_with_simple_cycle_and_exit_condition(): ) # Should complete when executor_a reaches its limit # Verify cycling occurred (should have events from both executors) - # Check for ExecutorInvokeEvent and ExecutorCompletedEvent types that have executor_id - from agent_framework import ExecutorCompletedEvent, ExecutorInvokeEvent + # Check for ExecutorInvokedEvent and ExecutorCompletedEvent types that have executor_id + from agent_framework import ExecutorCompletedEvent, ExecutorInvokedEvent - executor_events = [e for e in events if isinstance(e, (ExecutorInvokeEvent, ExecutorCompletedEvent))] + executor_events = [e for e in events if isinstance(e, (ExecutorInvokedEvent, ExecutorCompletedEvent))] executor_ids = {e.executor_id for e in executor_events} assert "exec_a" in executor_ids, "Should have events from executor A" assert "exec_b" in executor_ids, "Should have events from executor B" diff --git a/python/packages/main/tests/workflow/test_workflow_agent.py b/python/packages/main/tests/workflow/test_workflow_agent.py index a194940d27..145ac1c06a 100644 --- a/python/packages/main/tests/workflow/test_workflow_agent.py +++ b/python/packages/main/tests/workflow/test_workflow_agent.py @@ -76,7 +76,6 @@ class RequestingExecutor(Executor): class TestWorkflowAgent: """Test cases for WorkflowAgent end-to-end functionality.""" - @pytest.mark.asyncio async def test_end_to_end_basic_workflow(self): """Test basic end-to-end workflow execution with 2 executors emitting AgentRunEvent.""" # Create workflow with two executors @@ -117,7 +116,6 @@ class TestWorkflowAgent: assert "Step1: Hello World" in step1_text assert "Step2: Step1: Hello World" in step2_text - @pytest.mark.asyncio async def test_end_to_end_basic_workflow_streaming(self): """Test end-to-end workflow with streaming executor that emits AgentRunStreamingEvent.""" # Create a single streaming executor @@ -146,7 +144,6 @@ class TestWorkflowAgent: assert isinstance(second_content, TextContent) assert "Streaming2: Streaming1: Test input" in second_content.text - @pytest.mark.asyncio async def test_end_to_end_request_info_handling(self): """Test end-to-end workflow with RequestInfoEvent handling.""" # Create workflow with requesting executor -> request info executor (no cycle) diff --git a/python/packages/main/tests/workflow/test_workflow_context.py b/python/packages/main/tests/workflow/test_workflow_context.py new file mode 100644 index 0000000000..b78de9af67 --- /dev/null +++ b/python/packages/main/tests/workflow/test_workflow_context.py @@ -0,0 +1,64 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import TYPE_CHECKING + +from agent_framework import ( + WorkflowCompletedEvent, + WorkflowContext, + WorkflowEvent, + WorkflowRunState, + WorkflowStatusEvent, +) + +if TYPE_CHECKING: + from _pytest.logging import LogCaptureFixture + + from agent_framework._workflow._runner_context import InProcRunnerContext + + +@asynccontextmanager +async def make_context( + executor_id: str = "exec", +) -> AsyncIterator[tuple[WorkflowContext[object], "InProcRunnerContext"]]: + from agent_framework._workflow._runner_context import InProcRunnerContext + from agent_framework._workflow._shared_state import SharedState + + runner_ctx = InProcRunnerContext() + shared_state = SharedState() + workflow_ctx: WorkflowContext[object] = WorkflowContext( + executor_id, + ["source"], + shared_state, + runner_ctx, + ) + try: + yield workflow_ctx, runner_ctx + finally: + await asyncio.sleep(0) + + +async def test_executor_cannot_emit_framework_lifecycle_event(caplog: "LogCaptureFixture") -> None: + async with make_context() as (ctx, runner_ctx): + caplog.clear() + with caplog.at_level("WARNING"): + await ctx.add_event(WorkflowStatusEvent(state=WorkflowRunState.IN_PROGRESS)) + + events: list[WorkflowEvent] = await runner_ctx.drain_events() + assert len(events) == 1 + assert type(events[0]).__name__ == "WorkflowWarningEvent" + data = getattr(events[0], "data", None) + assert isinstance(data, str) + assert "reserved for framework lifecycle notifications" in data + assert any("attempted to emit WorkflowStatusEvent" in message for message in list(caplog.messages)) + + +async def test_executor_emits_normal_event() -> None: + async with make_context() as (ctx, runner_ctx): + await ctx.add_event(WorkflowCompletedEvent("done")) + + events: list[WorkflowEvent] = await runner_ctx.drain_events() + assert len(events) == 1 + assert isinstance(events[0], WorkflowCompletedEvent) diff --git a/python/packages/main/tests/workflow/test_workflow_states.py b/python/packages/main/tests/workflow/test_workflow_states.py index 51e16d5afb..c419682c7c 100644 --- a/python/packages/main/tests/workflow/test_workflow_states.py +++ b/python/packages/main/tests/workflow/test_workflow_states.py @@ -14,9 +14,11 @@ from agent_framework import ( WorkflowBuilder, WorkflowCompletedEvent, WorkflowContext, + WorkflowEventSource, WorkflowFailedEvent, WorkflowRunResult, WorkflowRunState, + WorkflowStartedEvent, WorkflowStatusEvent, handler, ) @@ -41,9 +43,12 @@ async def test_executor_failed_and_workflow_failed_events_streaming(): events.append(ev) # Workflow-level failure and FAILED status should be surfaced - assert any(isinstance(e, WorkflowFailedEvent) for e in events) + failed_events = [e for e in events if isinstance(e, WorkflowFailedEvent)] + assert failed_events + assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in failed_events) status = [e for e in events if isinstance(e, WorkflowStatusEvent)] assert status and status[-1].state == WorkflowRunState.FAILED + assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in status) async def test_executor_failed_event_emitted_on_direct_execute(): @@ -58,7 +63,9 @@ async def test_executor_failed_event_emitted_on_direct_execute(): with pytest.raises(RuntimeError, match="boom"): await failing.execute(0, wf_ctx) drained = await ctx.drain_events() - assert any(isinstance(e, ExecutorFailedEvent) for e in drained) + failed = [e for e in drained if isinstance(e, ExecutorFailedEvent)] + assert failed + assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in failed) class Requester(Executor): @@ -99,6 +106,19 @@ async def test_completed_status_streaming(): # Last status should be COMPLETED status = [e for e in events if isinstance(e, WorkflowStatusEvent)] assert status and status[-1].state == WorkflowRunState.COMPLETED + assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in status) + + +async def test_started_and_completed_event_origins(): + c = Completer(id="c-origin") + wf = WorkflowBuilder().set_start_executor(c).build() + events = [ev async for ev in wf.run_stream("payload")] + + started = next(e for e in events if isinstance(e, WorkflowStartedEvent)) + assert started.origin is WorkflowEventSource.FRAMEWORK + + completed = next(e for e in events if isinstance(e, WorkflowCompletedEvent)) + assert completed.origin is WorkflowEventSource.EXECUTOR async def test_non_streaming_final_state_helpers(): diff --git a/python/samples/getting_started/workflow/_start-here/step3_streaming.py b/python/samples/getting_started/workflow/_start-here/step3_streaming.py index 423fcd53da..f5d29e178a 100644 --- a/python/samples/getting_started/workflow/_start-here/step3_streaming.py +++ b/python/samples/getting_started/workflow/_start-here/step3_streaming.py @@ -29,7 +29,8 @@ Purpose: Show how to wrap chat agents created by AzureChatClient inside workflow executors, wire them with WorkflowBuilder, and consume streaming events from the workflow. Demonstrate the @handler pattern with typed inputs and typed WorkflowContext[T] outputs, and finish by emitting a WorkflowCompletedEvent from the terminal node while printing -intermediate events for observability. +intermediate events for observability. The streaming loop also surfaces WorkflowEvent.origin so you can +distinguish runner-generated lifecycle events from executor-generated data-plane events. Prerequisites: - Azure OpenAI configured for AzureChatClient with required environment variables. @@ -123,36 +124,42 @@ async def main(): ChatMessage(role="user", text="Create a slogan for a new electric SUV that is affordable and fun to drive.") ): if isinstance(event, WorkflowStatusEvent): + prefix = f"State ({event.origin.value}): " if event.state == WorkflowRunState.IN_PROGRESS: - print("State: IN_PROGRESS") + print(prefix + "IN_PROGRESS") elif event.state == WorkflowRunState.COMPLETED: - print("State: COMPLETED") + print(prefix + "COMPLETED") elif event.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS: - print("State: IN_PROGRESS_PENDING_REQUESTS (requests in flight)") + print(prefix + "IN_PROGRESS_PENDING_REQUESTS (requests in flight)") elif event.state == WorkflowRunState.IDLE: - print("State: IDLE (no active work)") + print(prefix + "IDLE (no active work)") elif event.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS: - print("State: IDLE_WITH_PENDING_REQUESTS (prompt user or UI now)") + print(prefix + "IDLE_WITH_PENDING_REQUESTS (prompt user or UI now)") else: - print(f"State: {event.state}") + print(prefix + str(event.state)) + elif isinstance(event, WorkflowCompletedEvent): + print(f"Workflow completed ({event.origin.value}): {event.data}") elif isinstance(event, ExecutorFailedEvent): - print(f"Executor failed: {event.executor_id} {event.details.error_type}: {event.details.message}") + print( + f"Executor failed ({event.origin.value}): " + f"{event.executor_id} {event.details.error_type}: {event.details.message}" + ) elif isinstance(event, WorkflowFailedEvent): details = event.details - print(f"Workflow failed: {details.error_type}: {details.message}") + print(f"Workflow failed ({event.origin.value}): {details.error_type}: {details.message}") else: - print(event) + print(f"{event.__class__.__name__} ({event.origin.value}): {event}") """ Sample Output: - State: IN_PROGRESS - ExecutorInvokeEvent(executor_id=writer) - ExecutorCompletedEvent(executor_id=writer) - ExecutorInvokeEvent(executor_id=reviewer) - WorkflowCompletedEvent(data=Drive the Future. Affordable Adventure, Electrified.) - ExecutorCompletedEvent(executor_id=reviewer) - State: COMPLETED + State (RUNNER): IN_PROGRESS + ExecutorInvokeEvent (RUNNER): ExecutorInvokeEvent(executor_id=writer) + ExecutorCompletedEvent (RUNNER): ExecutorCompletedEvent(executor_id=writer) + ExecutorInvokeEvent (RUNNER): ExecutorInvokeEvent(executor_id=reviewer) + Workflow completed (EXECUTOR): Drive the Future. Affordable Adventure, Electrified. + ExecutorCompletedEvent (RUNNER): ExecutorCompletedEvent(executor_id=reviewer) + State (RUNNER): COMPLETED """ diff --git a/python/samples/getting_started/workflow/checkpoint/checkpoint_with_resume.py b/python/samples/getting_started/workflow/checkpoint/checkpoint_with_resume.py index 839b3ad12c..42dbb789fd 100644 --- a/python/samples/getting_started/workflow/checkpoint/checkpoint_with_resume.py +++ b/python/samples/getting_started/workflow/checkpoint/checkpoint_with_resume.py @@ -275,15 +275,15 @@ async def main(): Running workflow with initial message... UpperCaseExecutor: 'hello world' -> 'HELLO WORLD' - Event: ExecutorInvokeEvent(executor_id=upper_case_executor) + Event: ExecutorInvokedEvent(executor_id=upper_case_executor) Event: ExecutorCompletedEvent(executor_id=upper_case_executor) ReverseTextExecutor: 'HELLO WORLD' -> 'DLROW OLLEH' - Event: ExecutorInvokeEvent(executor_id=reverse_text_executor) + Event: ExecutorInvokedEvent(executor_id=reverse_text_executor) Event: ExecutorCompletedEvent(executor_id=reverse_text_executor) LowerAgent (shared_state): original_input='hello world', upper_output='HELLO WORLD' - Event: ExecutorInvokeEvent(executor_id=submit_lower) - Event: ExecutorInvokeEvent(executor_id=lower_agent) - Event: ExecutorInvokeEvent(executor_id=finalize) + Event: ExecutorInvokedEvent(executor_id=submit_lower) + Event: ExecutorInvokedEvent(executor_id=lower_agent) + Event: ExecutorInvokedEvent(executor_id=finalize) Event: WorkflowCompletedEvent(data=dlrow olleh) Checkpoint summary: @@ -300,9 +300,9 @@ async def main(): Resuming from checkpoint: a78c345a-e5d9-45ba-82c0-cb725452d91b LowerAgent (shared_state): original_input='hello world', upper_output='HELLO WORLD' - Resumed Event: ExecutorInvokeEvent(executor_id=submit_lower) - Resumed Event: ExecutorInvokeEvent(executor_id=lower_agent) - Resumed Event: ExecutorInvokeEvent(executor_id=finalize) + Resumed Event: ExecutorInvokedEvent(executor_id=submit_lower) + Resumed Event: ExecutorInvokedEvent(executor_id=lower_agent) + Resumed Event: ExecutorInvokedEvent(executor_id=finalize) Resumed Event: WorkflowCompletedEvent(data=dlrow olleh) """ # noqa: E501 diff --git a/python/samples/getting_started/workflow/control-flow/sequential_streaming.py b/python/samples/getting_started/workflow/control-flow/sequential_streaming.py index bf194ba901..fe6056cfa6 100644 --- a/python/samples/getting_started/workflow/control-flow/sequential_streaming.py +++ b/python/samples/getting_started/workflow/control-flow/sequential_streaming.py @@ -13,7 +13,7 @@ The second reverses the text and completes the workflow. Events are printed as t Purpose: Show how to declare executors with the @executor decorator, connect them with WorkflowBuilder, pass intermediate values using ctx.send_message, and signal completion with ctx.add_event by emitting a -WorkflowCompletedEvent. Demonstrate how streaming exposes ExecutorInvokeEvent and WorkflowCompletedEvent +WorkflowCompletedEvent. Demonstrate how streaming exposes ExecutorInvokedEvent and WorkflowCompletedEvent for observability. Prerequisites: @@ -72,9 +72,9 @@ async def main(): """ Sample Output: - Event: ExecutorInvokeEvent(executor_id=upper_case_executor) + Event: ExecutorInvokedEvent(executor_id=upper_case_executor) Event: ExecutorCompletedEvent(executor_id=upper_case_executor) - Event: ExecutorInvokeEvent(executor_id=reverse_text_executor) + Event: ExecutorInvokedEvent(executor_id=reverse_text_executor) Event: WorkflowCompletedEvent(data=DLROW OLLEH) Event: ExecutorCompletedEvent(executor_id=reverse_text_executor) Workflow completed with result: DLROW OLLEH