Python: .NET: Fix .NET conversation memory in DevUI (#3484) (#4294)

* Fix .NET conversation memory in DevUI (#3484)

* formatting fixes

* fix memory regression in python devui , fix for #4123

* Fix for #3983: Added _get_event_type() helper that safely accesses event type on both objects (.type) and dicts (.get("type")). Replaced all 4 bare event.type accesses in _executor.py (lines 267, 477, 499, 523).

Root cause: PR #3690 changed event.__class__.__name__ == "RequestInfoEvent" (safe) to event.type == "request_info" (crashes on dicts), but _execute_workflow still yields raw dicts on error paths.

Test: test_workflow_error_yields_dict_event_without_crash — mocks a workflow that raises, verifies execute_entity consumes the dict error events without crashing.

* format fixes

* lint fixes
This commit is contained in:
Victor Dibia
2026-03-02 02:34:25 -08:00
committed by GitHub
Unverified
parent 0d6b9d61a5
commit 9124d51e0e
9 changed files with 388 additions and 16 deletions
@@ -21,6 +21,13 @@ from .models._discovery_models import EntityInfo
logger = logging.getLogger(__name__)
def _get_event_type(event: Any) -> str | None:
"""Safely get the type of an event, handling both objects and dicts."""
if isinstance(event, dict):
return event.get("type")
return getattr(event, "type", None)
class EntityNotFoundError(Exception):
"""Raised when an entity is not found."""
@@ -264,7 +271,7 @@ class AgentFrameworkExecutor:
elif entity_info.type == "workflow":
async for event in self._execute_workflow(entity_obj, request, trace_collector):
# Log request_info event (type='request_info') for debugging HIL flow
if event.type == "request_info":
if _get_event_type(event) == "request_info":
logger.info(
"🔔 [EXECUTOR] request_info event (type='request_info') detected from workflow!"
)
@@ -330,19 +337,22 @@ class AgentFrameworkExecutor:
# Agent must have run() method - use stream=True for streaming
if hasattr(agent, "run") and callable(agent.run):
# Use Agent Framework's run() with stream=True for streaming
# Capture the stream reference so we can call get_final_response()
# after iteration. This triggers result hooks (after_run providers
# like InMemoryHistoryProvider) that persist conversation history.
run_kwargs: dict[str, Any] = {"stream": True}
if session:
async for update in agent.run(user_message, stream=True, session=session):
for trace_event in trace_collector.get_pending_events():
yield trace_event
run_kwargs["session"] = session
yield update
else:
async for update in agent.run(user_message, stream=True):
for trace_event in trace_collector.get_pending_events():
yield trace_event
stream = agent.run(user_message, **run_kwargs)
async for update in stream:
for trace_event in trace_collector.get_pending_events():
yield trace_event
yield update
yield update
# Finalize stream to trigger result hooks (saves conversation history)
await stream.get_final_response()
else:
raise ValueError("Agent must implement run() method")
@@ -471,7 +481,7 @@ class AgentFrameworkExecutor:
checkpoint_storage=checkpoint_storage,
):
# Enrich new request_info events that may come from subsequent HIL requests
if event.type == "request_info":
if _get_event_type(event) == "request_info":
self._enrich_request_info_event_with_response_schema(event, workflow)
for trace_event in trace_collector.get_pending_events():
@@ -493,7 +503,7 @@ class AgentFrameworkExecutor:
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage,
):
if event.type == "request_info":
if _get_event_type(event) == "request_info":
self._enrich_request_info_event_with_response_schema(event, workflow)
for trace_event in trace_collector.get_pending_events():
@@ -517,7 +527,7 @@ class AgentFrameworkExecutor:
parsed_input = await self._parse_workflow_input(workflow, request.input)
async for event in workflow.run(parsed_input, stream=True, checkpoint_storage=checkpoint_storage):
if event.type == "request_info":
if _get_event_type(event) == "request_info":
self._enrich_request_info_event_with_response_schema(event, workflow)
for trace_event in trace_collector.get_pending_events():
@@ -741,6 +741,51 @@ async def test_full_pipeline_workflow_output_event_serialization():
assert len(output_events) >= 3, f"Expected 3+ output events for yield_output calls, got {len(output_events)}"
async def test_workflow_error_yields_dict_event_without_crash():
"""Test that workflow errors don't crash execute_entity (#3983).
When a workflow raises an exception, _execute_workflow yields a raw dict
{"type": "error", ...}. The execute_entity caller must handle both dict
events and object events without crashing on attribute access.
"""
from unittest.mock import AsyncMock, MagicMock
from agent_framework_devui.models._discovery_models import EntityInfo
discovery = MagicMock(spec=EntityDiscovery)
mapper = MessageMapper()
executor = AgentFrameworkExecutor(discovery, mapper)
entity_info = EntityInfo(id="bad_wf", name="bad_wf", type="workflow", framework="agent_framework")
discovery.get_entity_info.return_value = entity_info
# Mock workflow whose run() raises
mock_workflow = MagicMock()
mock_workflow.name = "bad_wf"
def failing_run(*args, **kwargs):
raise RuntimeError("Sorry, something went wrong.")
mock_workflow.run = failing_run
discovery.load_entity = AsyncMock(return_value=mock_workflow)
request = AgentFrameworkRequest(
model="test",
input="hello",
metadata={"entity_id": "bad_wf"},
)
events = []
# This should NOT raise AttributeError: 'dict' object has no attribute 'type'
async for event in executor.execute_entity("bad_wf", request):
events.append(event)
# Should get at least one error event
assert len(events) > 0
error_events = [e for e in events if isinstance(e, dict) and e.get("type") == "error"]
assert len(error_events) > 0, f"Expected error dict events, got: {events}"
if __name__ == "__main__":
# Simple test runner
async def run_tests():