agent-framework/python/packages/devui/tests/devui/test_mapper.py
Evan Mattson 27f926609f Python: Fix incorrect workflow timings in DevUI by adding created_at to executor events (#5615)
* fix(devui): add created_at to custom output item events for correct workflow timings (#5545)

CustomResponseOutputItemAddedEvent and CustomResponseOutputItemDoneEvent lacked a
created_at field, causing the frontend to synthesize timestamps using integer-second
precision with a forced +1s minimum gap between events. This made instant workflows
appear to take 3+ seconds in the DevUI timeline.

Fix:
- Add optional created_at: float | None field to both custom event models
- Populate created_at=float(time.time()) in the mapper for executor_invoked,
  executor_completed, and executor_failed events
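
As a rough illustration of the shape of this change (not the actual DevUI models), the sketch below uses a plain dataclass as a stand-in for the Pydantic event models. `OutputItemEvent` and `make_executor_event` are hypothetical names introduced only for this example.

```python
from __future__ import annotations

import time
from dataclasses import dataclass


@dataclass
class OutputItemEvent:
    """Hypothetical stand-in for the custom output item event models."""

    type: str
    # Optional, so events built without a timestamp remain valid.
    created_at: float | None = None


def make_executor_event(event_type: str) -> OutputItemEvent:
    """Stamp the event with a float Unix timestamp at creation time."""
    return OutputItemEvent(type=event_type, created_at=float(time.time()))


invoked = make_executor_event("response.output_item.added")
done = make_executor_event("response.output_item.done")
```

Keeping the field optional means existing call sites that do not pass a timestamp keep working; only the executor paths need to populate it.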

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(devui): use event created_at for accurate workflow timeline timings

workflow-view.tsx synthesized _uiTimestamp using Math.max(baseTimestamp,
lastTimestamp + 1) with integer-second precision, forcing a minimum 1-second
gap between every sequential event. This made instant workflows appear to take
several seconds in the DevUI timeline.

The fix prefers event.created_at (a float Unix timestamp populated by the
backend mapper for all executor events) and only falls back to the synthetic
timestamp when created_at is absent. This matches the pattern already used in
devuiStore.ts:addDebugEvent.
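
The effect can be sketched in Python (the real logic lives in the TypeScript frontend; `ui_timestamp` and the sample values are hypothetical). With no server timestamp, three events emitted within milliseconds of each other are spread across whole seconds; with `created_at`, their real spacing is preserved.

```python
from __future__ import annotations


def ui_timestamp(created_at: float | None, base: int, last: float) -> float:
    """Prefer the server timestamp; otherwise synthesize one.

    The fallback mirrors the described max(baseTimestamp, lastTimestamp + 1).
    """
    if created_at is not None:
        return created_at
    return max(base, last + 1)


# Three events emitted within a few milliseconds of each other.
server_times = [1000.001, 1000.002, 1000.003]

last = 0.0
real = []
for ts in server_times:
    last = ui_timestamp(ts, base=1000, last=last)
    real.append(last)

last = 0.0
synthetic = []
for _ in server_times:
    last = ui_timestamp(None, base=1000, last=last)
    synthetic.append(last)
```

Here `synthetic` spans two full seconds while `real` spans two milliseconds, which is the distortion described above.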

Added a regression test in test_mapper.py verifying that the mapper attaches
created_at to all executor lifecycle events (invoked, completed, failed).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(devui): address review feedback for issue #5545

- Read data.timestamp (ISO string) and response.created_at in addition
  to top-level created_at when deriving _uiTimestamp, so
  response.workflow_event.completed events get a real server timestamp
  instead of a synthesized one
- Change uniqueTimestamp tiebreaker: when a real server timestamp is
  available use Math.max(eventTimestamp, lastTimestamp) rather than
  lastTimestamp + 1, eliminating artificial 1-second gaps while still
  preserving monotonic ordering
- Apply the same fix in the HIL streaming path (second setOpenAIEvents
  call in workflow-view.tsx)
- Add assert event.created_at > 0 to regression test to guard against
  zero or negative timestamps
- Add test_custom_output_item_event_models_have_created_at_field model-
  level test so removing the field produces a clear named failure rather
  than a downstream ValidationError
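
A minimal Python sketch of the revised tiebreaker, assuming a hypothetical `unique_timestamp` helper (the actual change is in workflow-view.tsx): real timestamps are clamped to the previous value to keep ordering, without adding a one-second step.

```python
from __future__ import annotations


def unique_timestamp(event_ts: float | None, last: float) -> float:
    """Return a non-decreasing timestamp for the next event."""
    if event_ts is not None:
        # Real server time: clamp to the previous value so ordering is kept,
        # but add no artificial gap.
        return max(event_ts, last)
    # No server time available: fall back to a synthetic one-step increment.
    return last + 1


last = 0.0
ordered = []
for ts in [10.5, 10.2, 10.9]:  # the second event arrives slightly out of order
    last = unique_timestamp(ts, last)
    ordered.append(last)
```

The out-of-order event is pulled up to 10.5 rather than pushed to 11.5, so the sequence stays monotonic without inflating durations.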

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(#5545): guard NaN timestamps, fix fallback ID uniqueness, add regression tests

- workflow-view.tsx (×2): Wrap data.timestamp ISO→number conversion in a
  Number.isFinite() guard.  Python's datetime.now().isoformat() emits
  microseconds without a trailing 'Z' (e.g. '2024-01-15T12:34:56.123456'),
  which some JS engines cannot parse, returning NaN.  NaN !== undefined is
  true so the eventTimestamp !== undefined guard did not catch it, poisoning
  _uiTimestamp and resetting the monotonic ordering seed (NaN || 0 → 0).

- execution-timeline.tsx: Replace uiTimestamp in the fallback syntheticItemId
  with the per-executor runNumber counter.  Two runs of the same executor
  within the same second previously received identical _uiTimestamp values
  and therefore identical syntheticItemIds, causing their output buckets,
  state, and run entries to collide (execution-timeline.tsx:360–408).

- Add missing test_workflow_timings_bug.py source file (only a stale .pyc
  existed).  Three regression tests:
    · test_custom_event_models_lack_created_at_field – model field guard
    · test_workflow_executor_events_lack_created_at – mapper populates created_at
    · test_rapid_workflow_events_have_no_top_level_timestamps – confirms
      data.timestamp format that requires the frontend NaN guard
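
The timestamp format mentioned in the first bullet can be reproduced with the standard library alone. The sketch below also shows a Python analogue of the finite-number guard; `safe_timestamp` is a hypothetical helper, not part of DevUI.

```python
from __future__ import annotations

import math
from datetime import datetime, timezone

naive = datetime(2024, 1, 15, 12, 34, 56, 123456)
aware = naive.replace(tzinfo=timezone.utc)

naive_iso = naive.isoformat()  # microseconds, no offset and no 'Z' suffix
aware_iso = aware.isoformat()  # same instant with an explicit +00:00 offset


def safe_timestamp(value: float | None) -> float | None:
    """Return value only if it is a usable finite number, else None."""
    if value is None or not math.isfinite(value):
        return None
    return value
```

A timezone-aware datetime carries an explicit offset in its ISO string, which removes the ambiguity of the naive form; the guard covers any value that still fails to parse.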

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review feedback for #5545: Python: [Bug]: Workflow timings in DevUI are incorrect

* devui: move timing regression tests into test_mapper.py, remove dedicated bug file

- Delete test_workflow_timings_bug.py; tests belong in existing module files
- The two tests already present in test_mapper.py (test_executor_events_carry_created_at_timestamp
  and test_custom_output_item_event_models_have_created_at_field) cover the same ground as the
  first two tests in the deleted file
- Add test_executor_completed_maps_to_output_item_done_event to test_mapper.py, replacing the
  third test from the deleted file with a generic, issue-agnostic name and docstring

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review feedback for #5545: review comment fixes

---------

Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-05 05:59:08 +00:00


# Copyright (c) Microsoft. All rights reserved.

"""Tests for message mapping functionality.

This module tests the MessageMapper which converts Agent Framework events
to OpenAI-compatible streaming events. Tests use REAL classes from
agent_framework, not mocks, to ensure proper serialization.
"""

from typing import Any

import pytest

# Import Agent Framework types
from agent_framework._types import (
    AgentResponseUpdate,
    Content,
)

# Import real workflow event classes - NOT mocks!
from agent_framework._workflows._events import (
    WorkflowEvent,
    WorkflowRunState,
)

# Import factory functions from conftest for parameterized test data creation
from conftest import (
    create_agent_run_response,
    create_executor_completed_event,
    create_executor_failed_event,
    create_executor_invoked_event,
)

from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import (
    AgentCompletedEvent,
    AgentFailedEvent,
    AgentFrameworkRequest,
    AgentStartedEvent,
)

# Note: mapper and test_request fixtures are provided by conftest.py

# =============================================================================
# Test Helpers
# =============================================================================


def create_test_content(content_type: str, **kwargs: Any) -> Any:
    """Create test content objects."""
    if content_type == "text":
        return Content.from_text(text=kwargs.get("text", "Hello, world!"))
    if content_type == "function_call":
        return Content.from_function_call(
            call_id=kwargs.get("call_id", "test_call_id"),
            name=kwargs.get("name", "test_func"),
            arguments=kwargs.get("arguments", {"param": "value"}),
        )
    if content_type == "error":
        return Content.from_error(
            message=kwargs.get("message", "Test error"), error_code=kwargs.get("code", "test_error")
        )
    raise ValueError(f"Unknown content type: {content_type}")


def create_test_agent_update(contents: list[Any]) -> AgentResponseUpdate:
    """Create test AgentResponseUpdate."""
    return AgentResponseUpdate(contents=contents, role="assistant", message_id="test_msg", response_id="test_resp")


# =============================================================================
# Basic Content Mapping Tests
# =============================================================================


async def test_critical_isinstance_bug_detection(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """CRITICAL: Test that would have caught the isinstance vs hasattr bug."""
    content = create_test_content("text", text="Bug detection test")
    update = create_test_agent_update([content])

    # Key assertions that would have caught the bug
    assert hasattr(update, "contents")  # Real attribute
    assert not hasattr(update, "response")  # Fake attribute should not exist

    # Test isinstance works with real types
    assert isinstance(update, AgentResponseUpdate)

    # Test mapper conversion - should NOT produce "Unknown event"
    events = await mapper.convert_event(update, test_request)

    assert len(events) > 0
    assert all(hasattr(event, "type") for event in events)
    assert all(event.type != "unknown" for event in events)


async def test_text_content_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test TextContent mapping with proper OpenAI event hierarchy."""
    content = create_test_content("text", text="Hello, clean test!")
    update = create_test_agent_update([content])

    events = await mapper.convert_event(update, test_request)

    # With proper OpenAI hierarchy, we expect 3 events:
    # 1. response.output_item.added (message)
    # 2. response.content_part.added (text part)
    # 3. response.output_text.delta (actual text)
    assert len(events) == 3

    # Check message output item
    assert events[0].type == "response.output_item.added"
    assert events[0].item.type == "message"
    assert events[0].item.role == "assistant"

    # Check content part
    assert events[1].type == "response.content_part.added"
    assert events[1].part.type == "output_text"

    # Check text delta
    assert events[2].type == "response.output_text.delta"
    assert events[2].delta == "Hello, clean test!"


async def test_function_call_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test FunctionCallContent mapping."""
    content = create_test_content("function_call", name="test_func", arguments={"location": "TestCity"})
    update = create_test_agent_update([content])

    events = await mapper.convert_event(update, test_request)

    # Should generate: response.output_item.added + response.function_call_arguments.delta
    assert len(events) >= 2
    assert events[0].type == "response.output_item.added"
    assert events[1].type == "response.function_call_arguments.delta"

    # Check JSON is in delta event
    delta_events = [e for e in events if e.type == "response.function_call_arguments.delta"]
    full_json = "".join(event.delta for event in delta_events)
    assert "TestCity" in full_json


async def test_function_result_content_with_string_result(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Test FunctionResultContent with plain string result (regular tools)."""
    content = Content.from_function_result(
        call_id="test_call_123",
        result="Hello, World!",
    )
    update = create_test_agent_update([content])

    events = await mapper.convert_event(update, test_request)

    assert len(events) >= 1
    result_events = [e for e in events if e.type == "response.function_result.complete"]
    assert len(result_events) == 1
    assert result_events[0].output == "Hello, World!"
    assert result_events[0].call_id == "test_call_123"
    assert result_events[0].status == "completed"


async def test_function_result_content_with_nested_content_objects(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Test FunctionResultContent with nested Content objects (MCP tools case)."""
    content = Content.from_function_result(
        call_id="mcp_call_456",
        result=[Content.from_text(text="Hello from MCP!")],
    )
    update = create_test_agent_update([content])

    events = await mapper.convert_event(update, test_request)

    assert len(events) >= 1
    result_events = [e for e in events if e.type == "response.function_result.complete"]
    assert len(result_events) == 1
    assert "Hello from MCP!" in result_events[0].output
    assert result_events[0].call_id == "mcp_call_456"


async def test_error_content_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test ErrorContent mapping."""
    content = create_test_content("error", message="Test error", code="test_code")
    update = create_test_agent_update([content])

    events = await mapper.convert_event(update, test_request)

    assert len(events) == 1
    assert events[0].type == "error"
    assert events[0].message == "Test error"
    assert events[0].code == "test_code"


async def test_mixed_content_types(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test multiple content types together."""
    contents = [
        create_test_content("text", text="Starting..."),
        create_test_content("function_call", name="process", arguments={"data": "test"}),
        create_test_content("text", text="Done!"),
    ]
    update = create_test_agent_update(contents)

    events = await mapper.convert_event(update, test_request)

    assert len(events) >= 3
    event_types = {event.type for event in events}
    assert "response.output_text.delta" in event_types
    assert "response.function_call_arguments.delta" in event_types


# =============================================================================
# Agent Lifecycle Event Tests
# =============================================================================


async def test_agent_lifecycle_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test that agent lifecycle events are properly converted to OpenAI format."""
    # Test AgentStartedEvent
    start_event = AgentStartedEvent()
    events = await mapper.convert_event(start_event, test_request)

    assert len(events) == 2  # response.created and response.in_progress
    assert events[0].type == "response.created"
    assert events[1].type == "response.in_progress"
    assert events[0].response.model == "devui"
    assert events[0].response.status == "in_progress"

    # Test AgentCompletedEvent
    complete_event = AgentCompletedEvent()
    events = await mapper.convert_event(complete_event, test_request)

    # AgentCompletedEvent no longer emits response.completed to avoid duplicates
    assert len(events) == 0

    # Test AgentFailedEvent
    error = Exception("Test error")
    failed_event = AgentFailedEvent(error=error)
    events = await mapper.convert_event(failed_event, test_request)

    assert len(events) == 1
    assert events[0].type == "response.failed"
    assert events[0].response.status == "failed"
    assert events[0].response.error.message == "Test error"


async def test_agent_run_response_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test that mapper handles complete AgentResponse (non-streaming)."""
    response = create_agent_run_response("Complete response from run()")

    events = await mapper.convert_event(response, test_request)

    assert len(events) > 0
    text_events = [e for e in events if e.type == "response.output_text.delta"]
    assert len(text_events) > 0
    assert text_events[0].delta == "Complete response from run()"


# =============================================================================
# Workflow Executor Event Tests (using REAL classes, not mocks!)
# =============================================================================


async def test_executor_invoked_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test WorkflowEvent(type='executor_invoked') using the REAL class from agent_framework."""
    # Use real class, not mock!
    event = create_executor_invoked_event(executor_id="exec_123")

    events = await mapper.convert_event(event, test_request)

    assert len(events) == 1
    assert events[0].type == "response.output_item.added"

    # Access as dict since item might be ExecutorActionItem
    item = events[0].item if isinstance(events[0].item, dict) else events[0].item.model_dump()
    assert item["type"] == "executor_action"
    assert item["executor_id"] == "exec_123"
    assert item["status"] == "in_progress"


async def test_executor_completed_event_simple_data(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test WorkflowEvent(type='executor_completed') with simple dict data."""
    # Create event with simple data
    event = WorkflowEvent.executor_completed(executor_id="exec_123", data={"simple": "result"})

    # First need to invoke the executor to set up context
    invoke_event = create_executor_invoked_event(executor_id="exec_123")
    await mapper.convert_event(invoke_event, test_request)

    # Now complete it
    events = await mapper.convert_event(event, test_request)

    assert len(events) == 1
    assert events[0].type == "response.output_item.done"

    item = events[0].item if isinstance(events[0].item, dict) else events[0].item.model_dump()
    assert item["type"] == "executor_action"
    assert item["executor_id"] == "exec_123"
    assert item["status"] == "completed"
    # Result should be serialized
    assert item["result"] == {"simple": "result"}


async def test_executor_completed_event_with_agent_response(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Test WorkflowEvent(type='executor_completed') with nested AgentExecutorResponse.

    This is a REGRESSION TEST for the serialization bug where
    WorkflowEvent.data contained AgentExecutorResponse with nested
    AgentResponse and Message objects (SerializationMixin) that
    Pydantic couldn't serialize.
    """
    # Create event with realistic nested data - the exact structure that caused the bug
    event = create_executor_completed_event(executor_id="exec_agent", with_agent_response=True)

    # Verify the data has the problematic structure
    assert hasattr(event.data, "agent_response")
    assert hasattr(event.data, "full_conversation")

    # First invoke the executor
    invoke_event = create_executor_invoked_event(executor_id="exec_agent")
    await mapper.convert_event(invoke_event, test_request)

    # Now complete - this should NOT raise serialization errors
    events = await mapper.convert_event(event, test_request)

    assert len(events) == 1
    assert events[0].type == "response.output_item.done"

    # Get the item (might be Pydantic model or dict)
    item = events[0].item if isinstance(events[0].item, dict) else events[0].item.model_dump()
    assert item["type"] == "executor_action"
    assert item["executor_id"] == "exec_agent"
    assert item["status"] == "completed"

    # The result should be serialized (converted to dict)
    result = item["result"]
    assert result is not None
    # Should be a dict or list, not the original object
    assert isinstance(result, (dict, list))


async def test_executor_completed_event_serialization_to_json(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """REGRESSION TEST: Verify the full JSON serialization works.

    This tests the exact failure mode from the bug: calling model_dump_json()
    on the event containing nested SerializationMixin objects.
    """
    # Create the problematic event
    event = create_executor_completed_event(executor_id="exec_json_test", with_agent_response=True)

    # Invoke first
    invoke_event = create_executor_invoked_event(executor_id="exec_json_test")
    await mapper.convert_event(invoke_event, test_request)

    # Complete
    events = await mapper.convert_event(event, test_request)

    assert len(events) == 1
    done_event = events[0]

    # This is the critical test - model_dump_json() should NOT raise
    # "Unable to serialize unknown type: <class 'agent_framework._types.AgentResponse'>"
    # Only the serialization call is wrapped, so a failing assertion below is
    # reported as itself rather than as a serialization error.
    try:
        json_str = done_event.model_dump_json()
    except Exception as e:
        pytest.fail(f"model_dump_json() raised an exception: {e}")

    assert json_str is not None
    assert len(json_str) > 0
    # Spot-check that the serialized output contains the expected fields
    assert "executor_action" in json_str
    assert "exec_json_test" in json_str
    assert "completed" in json_str


async def test_executor_failed_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test WorkflowEvent(type='executor_failed') using the REAL class."""
    # First invoke the executor
    invoke_event = create_executor_invoked_event(executor_id="exec_fail")
    await mapper.convert_event(invoke_event, test_request)

    # Now fail it
    event = create_executor_failed_event(executor_id="exec_fail", error_message="Executor failed")
    events = await mapper.convert_event(event, test_request)

    assert len(events) == 1
    assert events[0].type == "response.output_item.done"

    item = events[0].item if isinstance(events[0].item, dict) else events[0].item.model_dump()
    assert item["type"] == "executor_action"
    assert item["executor_id"] == "exec_fail"
    assert item["status"] == "failed"
    assert "Executor failed" in str(item["error"])


async def test_executor_events_carry_created_at_timestamp(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """REGRESSION TEST: Executor mapped events must include a created_at timestamp.

    Without created_at, the frontend synthesizes timestamps using
    Math.max(baseTimestamp, lastTimestamp + 1) with second precision, forcing
    a minimum 1-second gap between sequential events regardless of their actual
    elapsed time. This makes instant workflows appear to take multiple seconds
    in the DevUI timeline.
    """
    invoke_event = create_executor_invoked_event(executor_id="exec_ts")
    complete_event = create_executor_completed_event(executor_id="exec_ts")
    fail_event = create_executor_failed_event(executor_id="exec_ts_fail")

    invoked_results = await mapper.convert_event(invoke_event, test_request)
    completed_results = await mapper.convert_event(complete_event, test_request)

    # Set up a separate context for the failed path
    mapper2 = MessageMapper()
    await mapper2.convert_event(create_executor_invoked_event(executor_id="exec_ts_fail"), test_request)
    failed_results = await mapper2.convert_event(fail_event, test_request)

    for label, results in [
        ("executor_invoked", invoked_results),
        ("executor_completed", completed_results),
        ("executor_failed", failed_results),
    ]:
        assert results, f"mapper.convert_event should return events for {label}"
        for event in results:
            assert getattr(event, "created_at", None) is not None, (
                f"{label} mapped event {type(event).__name__} is missing 'created_at'. "
                "The frontend relies on this field for accurate workflow timeline timings."
            )
            assert event.created_at > 0, (
                f"{label} mapped event {type(event).__name__} has a non-positive "
                f"created_at value ({event.created_at!r}); expected a valid Unix timestamp."
            )


def test_custom_output_item_event_models_have_created_at_field() -> None:
    """MODEL TEST: CustomResponseOutputItemAddedEvent and Done must declare created_at.

    This guards against accidentally removing the field from the model definition.
    A missing field causes a downstream ValidationError instead of a clear test failure.
    """
    from agent_framework_devui.models._openai_custom import (
        CustomResponseOutputItemAddedEvent,
        CustomResponseOutputItemDoneEvent,
    )

    assert "created_at" in CustomResponseOutputItemAddedEvent.model_fields, (
        "CustomResponseOutputItemAddedEvent is missing 'created_at' in model_fields. "
        "The frontend uses this field for accurate workflow timeline timings."
    )
    assert "created_at" in CustomResponseOutputItemDoneEvent.model_fields, (
        "CustomResponseOutputItemDoneEvent is missing 'created_at' in model_fields. "
        "The frontend uses this field for accurate workflow timeline timings."
    )


async def test_executor_completed_maps_to_output_item_done_event(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Test executor_completed events are mapped to CustomResponseOutputItemDoneEvent.

    Ensures executor_completed does not fall through to the legacy
    ResponseWorkflowEventComplete path, which lacks a top-level created_at field.
    """
    from agent_framework_devui.models._openai_custom import ResponseWorkflowEventComplete

    invoke_event = create_executor_invoked_event(executor_id="exec_output_item")
    await mapper.convert_event(invoke_event, test_request)

    complete_event = create_executor_completed_event(executor_id="exec_output_item")
    results = await mapper.convert_event(complete_event, test_request)

    assert results, "mapper.convert_event should return events for executor_completed"

    workflow_events = [r for r in results if isinstance(r, ResponseWorkflowEventComplete)]
    assert not workflow_events, (
        "executor_completed should map to CustomResponseOutputItemDoneEvent, not ResponseWorkflowEventComplete."
    )

    output_item_done = [r for r in results if r.type == "response.output_item.done"]
    assert output_item_done, f"Expected at least one response.output_item.done event; got: {[r.type for r in results]}"


# =============================================================================
# Workflow Lifecycle Event Tests
# =============================================================================


async def test_workflow_started_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test WorkflowEvent(type='started') using the REAL class."""
    event = WorkflowEvent.started()

    events = await mapper.convert_event(event, test_request)

    # WorkflowEvent(type='started') should emit response.created and response.in_progress
    assert len(events) == 2
    assert events[0].type == "response.created"
    assert events[1].type == "response.in_progress"


async def test_workflow_status_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test WorkflowEvent(type='status') using the REAL class."""
    event = WorkflowEvent.status(state=WorkflowRunState.IN_PROGRESS)

    events = await mapper.convert_event(event, test_request)

    # The mapper may emit status events or filter them out entirely, so this
    # assertion is always true; the test only verifies that conversion does not raise.
    assert len(events) >= 0


# =============================================================================
# Magentic Event Tests - Testing WorkflowEvent[AgentResponseUpdate] with additional_properties
# =============================================================================


async def test_magentic_executor_event_with_agent_delta_metadata(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Test that WorkflowEvent[AgentResponseUpdate] with magentic_event_type='agent_delta' is handled correctly.

    This tests the ACTUAL event format Magentic emits - not a fake MagenticAgentDeltaEvent class.
    Magentic uses WorkflowEvent.emit() with additional_properties containing magentic_event_type.
    """
    from agent_framework._types import AgentResponseUpdate
    from agent_framework._workflows._events import WorkflowEvent

    # Create the REAL event format that Magentic emits
    update = AgentResponseUpdate(
        contents=[Content.from_text(text="Hello from agent")],
        role="assistant",
        author_name="Writer",
        additional_properties={
            "magentic_event_type": "agent_delta",
            "agent_id": "writer_agent",
        },
    )
    event = WorkflowEvent.emit(executor_id="magentic_executor", data=update)

    events = await mapper.convert_event(event, test_request)

    # Should be treated as a regular WorkflowEvent[AgentResponseUpdate] with text content
    # The mapper should emit text delta events
    assert len(events) >= 1
    text_events = [e for e in events if getattr(e, "type", "") == "response.output_text.delta"]
    assert len(text_events) >= 1
    assert text_events[0].delta == "Hello from agent"


async def test_magentic_orchestrator_message_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test that WorkflowEvent[AgentResponseUpdate] with magentic_event_type='orchestrator_message' is handled.

    Magentic emits orchestrator planning/instruction messages using WorkflowEvent.emit()
    with additional_properties containing magentic_event_type='orchestrator_message'.
    """
    from agent_framework._types import AgentResponseUpdate
    from agent_framework._workflows._events import WorkflowEvent

    # Create orchestrator message event (REAL format from Magentic)
    update = AgentResponseUpdate(
        contents=[Content.from_text(text="Planning: First, the writer will create content...")],
        role="assistant",
        author_name="Orchestrator",
        additional_properties={
            "magentic_event_type": "orchestrator_message",
            "orchestrator_message_kind": "task_ledger",
            "orchestrator_id": "magentic_orchestrator",
        },
    )
    event = WorkflowEvent.emit(executor_id="magentic_orchestrator", data=update)

    events = await mapper.convert_event(event, test_request)

    # Currently, mapper treats this as regular WorkflowEvent[AgentResponseUpdate] (no special handling)
    # This test documents the current behavior
    assert len(events) >= 1
    text_events = [e for e in events if getattr(e, "type", "") == "response.output_text.delta"]
    assert len(text_events) >= 1
    assert "Planning:" in text_events[0].delta


async def test_magentic_events_use_same_event_class_as_other_workflows(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Verify Magentic uses the same WorkflowEvent class as other workflows.

    This test documents that Magentic does NOT define separate event classes like
    MagenticAgentDeltaEvent - it reuses WorkflowEvent with metadata in
    additional_properties. Any mapper code checking for 'MagenticAgentDeltaEvent'
    class names is dead code.
    """
    from agent_framework._types import AgentResponseUpdate
    from agent_framework._workflows._events import WorkflowEvent

    # Create events the way different workflows do it
    # 1. Regular workflow (no additional_properties)
    regular_update = AgentResponseUpdate(
        contents=[Content.from_text(text="Regular workflow response")],
        role="assistant",
    )
    regular_event = WorkflowEvent.emit(executor_id="regular_executor", data=regular_update)

    # 2. Magentic workflow (with additional_properties)
    magentic_update = AgentResponseUpdate(
        contents=[Content.from_text(text="Magentic workflow response")],
        role="assistant",
        additional_properties={"magentic_event_type": "agent_delta"},
    )
    magentic_event = WorkflowEvent.emit(executor_id="magentic_executor", data=magentic_update)

    # Both should be the SAME class
    assert type(regular_event) is type(magentic_event)
    assert isinstance(regular_event, WorkflowEvent)
    assert isinstance(magentic_event, WorkflowEvent)

    # Both should be handled by the same isinstance check in mapper
    regular_events = await mapper.convert_event(regular_event, test_request)
    magentic_events = await mapper.convert_event(magentic_event, test_request)

    # Both produce text delta events
    regular_text = [e for e in regular_events if getattr(e, "type", "") == "response.output_text.delta"]
    magentic_text = [e for e in magentic_events if getattr(e, "type", "") == "response.output_text.delta"]
    assert len(regular_text) >= 1
    assert len(magentic_text) >= 1


# =============================================================================
# Unknown Content Fallback Tests
# =============================================================================


async def test_unknown_content_fallback(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test graceful handling of unknown content types."""

    class MockUnknownContent:
        def __init__(self):
            self.__class__.__name__ = "WeirdUnknownContent"

    context = mapper._get_or_create_context(test_request)
    unknown_content = MockUnknownContent()

    event = await mapper._create_unknown_content_event(unknown_content, context)

    assert event.type == "response.output_text.delta"
    assert "Unknown content type" in event.delta
    assert "WeirdUnknownContent" in event.delta


# =============================================================================
# output event (type='output') Tests
# =============================================================================


async def test_workflow_output_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test output event (type='output') is converted to output_item.added."""
    from agent_framework._workflows._events import WorkflowEvent

    event = WorkflowEvent.output(executor_id="final_executor", data="Final workflow output")
    events = await mapper.convert_event(event, test_request)

    # output event (type='output') should emit output_item.added
    assert len(events) == 1
    assert events[0].type == "response.output_item.added"

    # Check item contains the output text
    item = events[0].item
    assert item.type == "message"
    assert any("Final workflow output" in str(c) for c in item.content)


async def test_workflow_output_event_with_list_data(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test output event (type='output') with list data (common for sequential/concurrent workflows)."""
    from agent_framework import Message
    from agent_framework._workflows._events import WorkflowEvent

    # Sequential/Concurrent workflows often output list[Message]
    messages = [
        Message(role="user", contents=[Content.from_text(text="Hello")]),
        Message(role="assistant", contents=[Content.from_text(text="World")]),
    ]
    event = WorkflowEvent.output(executor_id="complete", data=messages)
    events = await mapper.convert_event(event, test_request)

    assert len(events) == 1
    assert events[0].type == "response.output_item.added"




# =============================================================================
# failed event (type='failed') Tests
# =============================================================================


async def test_workflow_failed_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test failed event (type='failed') is converted to response.failed."""
    from agent_framework._workflows._events import WorkflowErrorDetails, WorkflowEvent

    details = WorkflowErrorDetails(
        error_type="TestError",
        message="Workflow failed due to test error",
        executor_id="failing_executor",
    )
    event = WorkflowEvent.failed(details=details)
    events = await mapper.convert_event(event, test_request)

    # failed event (type='failed') should emit response.failed
    assert len(events) >= 1

    # Find the failed event
    failed_events = [e for e in events if getattr(e, "type", "") == "response.failed"]
    assert len(failed_events) == 1, f"Expected response.failed, got types: {[getattr(e, 'type', '') for e in events]}"

    # Check response contains error info
    response = failed_events[0].response
    assert response.status == "failed"
    assert response.error is not None

    # Verify error message is correctly extracted from details.message (not "Unknown error")
    assert "Workflow failed due to test error" in response.error.message
    assert "Unknown error" not in response.error.message


async def test_workflow_failed_event_with_extra(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test failed event (type='failed') includes extra context when available."""
    from agent_framework._workflows._events import WorkflowErrorDetails, WorkflowEvent

    details = WorkflowErrorDetails(
        error_type="ValidationError",
        message="Input validation failed",
        executor_id="validation_executor",
        extra={"field": "email", "reason": "invalid format"},
    )
    event = WorkflowEvent.failed(details=details)
    events = await mapper.convert_event(event, test_request)

    assert len(events) == 1
    assert events[0].type == "response.failed"
    response = events[0].response

    # Verify both the message and extra context are included
    assert "Input validation failed" in response.error.message
    assert "extra:" in response.error.message
    assert "email" in response.error.message


async def test_workflow_failed_event_with_traceback(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test failed event (type='failed') includes traceback when available."""
    from agent_framework._workflows._events import WorkflowErrorDetails, WorkflowEvent

    details = WorkflowErrorDetails(
        error_type="ValueError",
        message="Invalid input provided",
        traceback="Traceback (most recent call last):\n File ...\nValueError: Invalid input",
        executor_id="validation_executor",
    )
    event = WorkflowEvent.failed(details=details)
    events = await mapper.convert_event(event, test_request)

    assert len(events) == 1
    assert events[0].type == "response.failed"


# =============================================================================
# warning event (type='warning') and error event (type='error') Tests
# =============================================================================


async def test_workflow_warning_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test WorkflowEvent(type='warning') is converted to trace event."""
    from agent_framework._workflows._events import WorkflowEvent

    event = WorkflowEvent.warning("This is a warning message")
    events = await mapper.convert_event(event, test_request)

    # WorkflowEvent(type='warning') should emit a trace event
    assert len(events) == 1
    assert events[0].type == "response.trace.completed"
    assert events[0].data["event_type"] == "warning"


async def test_workflow_error_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test WorkflowEvent(type='error') is converted to trace event."""
    from agent_framework._workflows._events import WorkflowEvent

    event = WorkflowEvent.error(ValueError("Something went wrong"))
    events = await mapper.convert_event(event, test_request)

    # WorkflowEvent(type='error') should emit a trace event
    assert len(events) == 1
    assert events[0].type == "response.trace.completed"
    assert events[0].data["event_type"] == "error"


# =============================================================================
# request_info event (type='request_info') Tests (Human-in-the-Loop)
# =============================================================================


async def test_request_info_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test request_info event (type='request_info') is converted to HIL request event."""
    from agent_framework._workflows._events import WorkflowEvent

    event = WorkflowEvent.request_info(
        request_id="req_123",
        source_executor_id="approval_executor",
        request_data={"action": "approve", "details": "Please approve this action"},
        response_type=str,
    )
    events = await mapper.convert_event(event, test_request)

    # request_info event (type='request_info') should emit response.request_info.requested
    assert len(events) >= 1

    # Check that request info is captured
    has_hil_event = any(getattr(e, "type", "") == "response.request_info.requested" for e in events)
    assert has_hil_event, f"Expected response.request_info.requested, got: {[getattr(e, 'type', '') for e in events]}"

    # Verify the event contains the expected data
    hil_event = [e for e in events if getattr(e, "type", "") == "response.request_info.requested"][0]
    assert hil_event.request_id == "req_123"
    assert hil_event.source_executor_id == "approval_executor"


# =============================================================================
# SuperStep Event Tests
# =============================================================================


async def test_superstep_started_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test superstep_started event (type='superstep_started') is handled gracefully."""
    from agent_framework._workflows._events import WorkflowEvent

    event = WorkflowEvent.superstep_started(iteration=1)
    events = await mapper.convert_event(event, test_request)

    # superstep_started event (type='superstep_started') may not emit events (internal workflow signal)
    # Just ensure it doesn't crash
    assert isinstance(events, list)


async def test_superstep_completed_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test superstep_completed event (type='superstep_completed') is handled gracefully."""
    from agent_framework._workflows._events import WorkflowEvent

    event = WorkflowEvent.superstep_completed(iteration=1)
    events = await mapper.convert_event(event, test_request)

    # superstep_completed event (type='superstep_completed') may not emit events (internal workflow signal)
    # Just ensure it doesn't crash
    assert isinstance(events, list)