[BREAKING] Python: Refactor workflow events to unified discriminated union pattern (#3690)

* Refactor events

* Merge main

* Fixes

* Cleanup

* Update samples and tests

* Remove unused imports

* PR feedback

* Merge main. Add properties for events to help typing

* Formatting

* Cleanup

* use builtins.type to avoid shadowing by WorkflowEvent.type attribute

* Final improvements
This commit is contained in:
Evan Mattson
2026-02-06 16:47:20 +09:00
committed by GitHub
Unverified
parent 09f59b21ad
commit 0f3f4dbcaf
127 changed files with 1646 additions and 1703 deletions
@@ -7,8 +7,7 @@ import logging
from collections.abc import AsyncGenerator
from typing import Any
from agent_framework import AgentProtocol, Content
from agent_framework._workflows._events import RequestInfoEvent
from agent_framework import AgentProtocol, Content, Workflow
from ._conversations import ConversationStore, InMemoryConversationStore
from ._discovery import EntityDiscovery
@@ -262,10 +261,11 @@ class AgentFrameworkExecutor:
yield event
elif entity_info.type == "workflow":
async for event in self._execute_workflow(entity_obj, request, trace_collector):
# Log RequestInfoEvent for debugging HIL flow
event_class = event.__class__.__name__ if hasattr(event, "__class__") else type(event).__name__
if event_class == "RequestInfoEvent":
logger.info("🔔 [EXECUTOR] RequestInfoEvent detected from workflow!")
# Log request_info event (type='request_info') for debugging HIL flow
if event.type == "request_info":
logger.info(
"🔔 [EXECUTOR] request_info event (type='request_info') detected from workflow!"
)
logger.info(f" request_id: {getattr(event, 'request_id', 'N/A')}")
logger.info(f" source_executor_id: {getattr(event, 'source_executor_id', 'N/A')}")
logger.info(f" request_type: {getattr(event, 'request_type', 'N/A')}")
@@ -360,7 +360,7 @@ class AgentFrameworkExecutor:
yield {"type": "error", "message": f"Agent execution error: {e!s}"}
async def _execute_workflow(
self, workflow: Any, request: AgentFrameworkRequest, trace_collector: Any
self, workflow: Workflow, request: AgentFrameworkRequest, trace_collector: Any
) -> AsyncGenerator[Any, None]:
"""Execute Agent Framework workflow with checkpoint support via conversation items.
@@ -515,8 +515,9 @@ class AgentFrameworkExecutor:
logger.warning(f"Could not convert HIL responses to proper types: {e}")
async for event in workflow.send_responses_streaming(hil_responses):
# Enrich new RequestInfoEvents that may come from subsequent HIL requests
if isinstance(event, RequestInfoEvent):
# Enrich new request_info events (type='request_info')
# that may come from subsequent HIL requests
if event.type == "request_info":
self._enrich_request_info_event_with_response_schema(event, workflow)
for trace_event in trace_collector.get_pending_events():
@@ -538,7 +539,7 @@ class AgentFrameworkExecutor:
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage,
):
if isinstance(event, RequestInfoEvent):
if event.type == "request_info":
self._enrich_request_info_event_with_response_schema(event, workflow)
for trace_event in trace_collector.get_pending_events():
@@ -546,7 +547,7 @@ class AgentFrameworkExecutor:
yield event
# Note: Removed break on RequestInfoEvent - continue yielding all events
# Note: Removed break on request_info event (type='request_info') - continue yielding all events
# The workflow is already paused by ctx.request_info() in the framework
# DevUI should continue yielding events even during HIL pause
@@ -562,7 +563,7 @@ class AgentFrameworkExecutor:
parsed_input = await self._parse_workflow_input(workflow, request.input)
async for event in workflow.run(parsed_input, stream=True, checkpoint_storage=checkpoint_storage):
if isinstance(event, RequestInfoEvent):
if event.type == "request_info":
self._enrich_request_info_event_with_response_schema(event, workflow)
for trace_event in trace_collector.get_pending_events():
@@ -570,7 +571,7 @@ class AgentFrameworkExecutor:
yield event
# Note: Removed break on RequestInfoEvent - continue yielding all events
# Note: Removed break on request_info event (type='request_info') - continue yielding all events
# The workflow is already paused by ctx.request_info() in the framework
# DevUI should continue yielding events even during HIL pause
@@ -1015,10 +1016,12 @@ class AgentFrameworkExecutor:
return raw_input
def _enrich_request_info_event_with_response_schema(self, event: Any, workflow: Any) -> None:
"""Extract response type from workflow executor and attach response schema to RequestInfoEvent.
"""Extract response type from workflow executor.
Attach response schema to request_info event (type='request_info').
Args:
event: RequestInfoEvent to enrich
event: request_info event (type='request_info') to enrich
workflow: Workflow object containing executors
"""
try:
@@ -1029,7 +1032,7 @@ class AgentFrameworkExecutor:
request_type = getattr(event, "request_type", None)
if not source_executor_id or not request_type:
logger.debug("RequestInfoEvent missing source_executor_id or request_type")
logger.debug("request_info event (type='request_info') missing source_executor_id or request_type")
return
# Find the source executor in the workflow
@@ -1062,4 +1065,4 @@ class AgentFrameworkExecutor:
event._response_schema = response_schema
except Exception as e:
logger.warning(f"Failed to enrich RequestInfoEvent with response schema: {e}")
logger.warning(f"Failed to enrich request_info event (type='request_info') with response schema: {e}")
@@ -12,7 +12,7 @@ from datetime import datetime
from typing import Any, Union
from uuid import uuid4
from agent_framework import ChatMessage, Content, WorkflowOutputEvent
from agent_framework import ChatMessage, Content
from openai.types.responses import (
Response,
ResponseContentPartAddedEvent,
@@ -180,16 +180,18 @@ class MessageMapper:
try:
from agent_framework import AgentResponse, AgentResponseUpdate, WorkflowEvent
# Handle AgentRunUpdateEvent - workflow event wrapping AgentResponseUpdate
# Handle WorkflowEvent with type='output' or 'data' wrapping AgentResponseUpdate
# This must be checked BEFORE generic WorkflowEvent check
if isinstance(raw_event, WorkflowOutputEvent):
# Extract the AgentResponseUpdate from the event's data attribute
if raw_event.data and isinstance(raw_event.data, AgentResponseUpdate):
# Preserve executor_id in context for proper output routing
context["current_executor_id"] = raw_event.executor_id
return await self._convert_agent_update(raw_event.data, context)
# If no data, treat as generic workflow event
return await self._convert_workflow_event(raw_event, context)
# Note: AgentExecutor uses type='output' for streaming updates
if (
isinstance(raw_event, WorkflowEvent)
and raw_event.type in ("output", "data")
and raw_event.data
and isinstance(raw_event.data, AgentResponseUpdate)
):
# Preserve executor_id in context for proper output routing
context["current_executor_id"] = raw_event.executor_id
return await self._convert_agent_update(raw_event.data, context)
# Handle complete agent response (AgentResponse) - for non-streaming agent execution
if isinstance(raw_event, AgentResponse):
@@ -824,10 +826,12 @@ class MessageMapper:
List of OpenAI response stream events
"""
try:
event_class = event.__class__.__name__
# Use event.type for discriminated union pattern (similar to Content class)
event_type = getattr(event, "type", None)
event_class = event.__class__.__name__ # Fallback for non-workflow events
# Response-level events - construct proper OpenAI objects
if event_class == "WorkflowStartedEvent":
if event_type == "started":
workflow_id = getattr(event, "workflow_id", str(uuid4()))
context["workflow_id"] = workflow_id
@@ -871,8 +875,8 @@ class MessageMapper:
return events
# Handle WorkflowOutputEvent separately to preserve output data
if event_class == "WorkflowOutputEvent":
# Handle output events separately to preserve output data
if event_type == "output":
output_data = getattr(event, "data", None)
executor_id = getattr(event, "executor_id", "unknown")
@@ -934,7 +938,7 @@ class MessageMapper:
# Emit output_item.added for each yield_output
logger.debug(
f"WorkflowOutputEvent converted to output_item.added "
f"output event (type='output') converted to output_item.added "
f"(executor: {executor_id}, length: {len(text)})"
)
return [
@@ -946,15 +950,15 @@ class MessageMapper:
)
]
# Handle WorkflowCompletedEvent - Don't emit response.completed here
# Handle completed event - Don't emit response.completed here
# The server will emit a proper one with usage data after aggregating all events
if event_class == "WorkflowCompletedEvent":
if event_type == "completed":
return []
if event_class == "WorkflowFailedEvent":
if event_type == "failed":
workflow_id = context.get("workflow_id", str(uuid4()))
# WorkflowFailedEvent uses 'details' field (WorkflowErrorDetails), not 'error'
# This matches ExecutorFailedEvent which also uses 'details'
# failed event (type='failed') uses 'details' field (WorkflowErrorDetails), not 'error'
# This matches executor_failed event which also uses 'details'
details = getattr(event, "details", None)
# Import Response and ResponseError types
@@ -1000,7 +1004,8 @@ class MessageMapper:
]
# Executor-level events (output items)
if event_class == "ExecutorInvokedEvent":
# Check for executor lifecycle events via event.type
if event_type == "executor_invoked":
executor_id = getattr(event, "executor_id", "unknown")
item_id = f"exec_{executor_id}_{uuid4().hex[:8]}"
context[f"exec_item_{executor_id}"] = item_id
@@ -1029,7 +1034,7 @@ class MessageMapper:
)
]
if event_class == "ExecutorCompletedEvent":
if event_type == "executor_completed":
executor_id = getattr(event, "executor_id", "unknown")
item_id = context.get(f"exec_item_{executor_id}", f"exec_{executor_id}_unknown")
@@ -1038,7 +1043,7 @@ class MessageMapper:
context.pop("current_executor_id", None)
# Create ExecutorActionItem with completed status
# ExecutorCompletedEvent uses 'data' field, not 'result'
# executor_completed event (type='executor_completed') uses 'data' field, not 'result'
# Serialize the result data to ensure it's JSON-serializable
# (AgentExecutorResponse contains AgentResponse/ChatMessage which are SerializationMixin)
raw_result = getattr(event, "data", None)
@@ -1061,10 +1066,11 @@ class MessageMapper:
)
]
if event_class == "ExecutorFailedEvent":
if event_type == "executor_failed":
executor_id = getattr(event, "executor_id", "unknown")
item_id = context.get(f"exec_item_{executor_id}", f"exec_{executor_id}_unknown")
# ExecutorFailedEvent uses 'details' field (WorkflowErrorDetails), not 'error'
# executor_failed event (type='executor_failed') uses 'details' property (WorkflowErrorDetails)
# not 'error'. This matches WorkflowEvent.details which returns self.data for executor_failed type
details = getattr(event, "details", None)
if details:
err_msg = getattr(details, "message", None) or str(details)
@@ -1093,8 +1099,8 @@ class MessageMapper:
)
]
# Handle RequestInfoEvent specially - emit as HIL event with schema
if event_class == "RequestInfoEvent":
# Handle request_info events specially - emit as HIL event with schema
if event_type == "request_info":
from .models._openai_custom import ResponseRequestInfoEvent
request_id = getattr(event, "request_id", "")
@@ -1102,7 +1108,7 @@ class MessageMapper:
request_type_class = getattr(event, "request_type", None)
request_data = getattr(event, "data", None)
logger.info("📨 [MAPPER] Processing RequestInfoEvent")
logger.info("📨 [MAPPER] Processing request_info event (type='request_info')")
logger.info(f" request_id: {request_id}")
logger.info(f" source_executor_id: {source_executor_id}")
logger.info(f" request_type_class: {request_type_class}")
@@ -1163,26 +1169,23 @@ class MessageMapper:
return [hil_event]
# Handle other informational workflow events (status, warnings, errors)
if event_class in ["WorkflowStatusEvent", "WorkflowWarningEvent", "WorkflowErrorEvent"]:
if event_type in ["status", "warning", "error"]:
# These are informational events that don't map to OpenAI lifecycle events
# Convert them to trace events for debugging visibility
event_data: dict[str, Any] = {}
# Extract relevant data based on event type
if event_class == "WorkflowStatusEvent":
if event_type == "status":
event_data["state"] = str(getattr(event, "state", "unknown"))
elif event_class == "WorkflowWarningEvent":
event_data["message"] = str(getattr(event, "message", ""))
elif event_class == "WorkflowErrorEvent":
event_data["message"] = str(getattr(event, "message", ""))
event_data["error"] = str(getattr(event, "error", ""))
elif event_type == "warning" or event_type == "error":
event_data["message"] = str(getattr(event, "data", ""))
# Create a trace event for debugging
trace_event = ResponseTraceEventComplete(
type="response.trace.completed",
data={
"trace_type": "workflow_info",
"event_type": event_class,
"event_type": event_type,
"data": event_data,
"timestamp": datetime.now().isoformat(),
},