Files
agent-framework/python/packages/devui/tests/test_mapper.py
T
Victor Dibia 94eae24082 Python: DevUI: Add OpenAI Responses API proxy support + HIL for Workflows (#1737)
* DevUI: Add OpenAI Responses API proxy support with enhanced UI features

This commit adds support for proxying requests to OpenAI's Responses API,
allowing DevUI, when so configured, to route conversations to OpenAI models for testing.

Backend changes:
- Add OpenAI proxy executor with conversation routing logic
- Enhance event mapper to support OpenAI Responses API format
- Extend server endpoints to handle OpenAI proxy mode
- Update models with OpenAI-specific response types
- Remove emojis from logging and CLI output for cleaner text

Frontend changes:
- Add settings modal with OpenAI proxy configuration UI
- Enhance agent and workflow views with improved state management
- Add new UI components (separator, switch) for settings
- Update debug panel with better event filtering
- Improve message renderers for OpenAI content types
- Update types and API client for OpenAI integration

* update ui, settings modal and workflow input form, add register cleanup hooks.

* add workflow HIL support, user mode, other fixes

* feat(devui): add human-in-the-loop (HIL) support with dynamic response schemas

Implement HIL workflow support, allowing workflows to pause for user input
with dynamically generated JSON schemas based on response handler type hints.

Key Features:
- Automatic response schema extraction from @response_handler decorators
- Dynamic form generation in UI based on Pydantic/dataclass response types
- Checkpoint-based conversation storage for HIL requests/responses
- Resume workflow execution after user provides HIL response

Backend Changes:
- Add extract_response_type_from_executor() to introspect response handlers
- Enrich RequestInfoEvent with response_schema via _enrich_request_info_event_with_response_schema()
- Map RequestInfoEvent to response.input.requested OpenAI event format
- Store HIL responses in conversation history and restore checkpoints

Frontend Changes:
- Add HILInputModal component with SchemaFormRenderer for dynamic forms
- Support Pydantic BaseModel and dataclass response types
- Render enum fields as dropdowns, strings as text/textarea, numbers, booleans, arrays, objects
- Display original request context alongside response form

Testing:
- Add tests for checkpoint storage (test_checkpoints.py)
- Add schema generation tests for all input types (test_schema_generation.py)
- Validate end-to-end HIL flow with spam workflow sample

This enables workflows to seamlessly pause execution and request structured user input
with type-safe, validated forms generated automatically from response type annotations.

* improve HIL support, improve workflow execution view

* ui updates

* ui updates

* improve HIL for workflows, add auth and view modes

* update workflow

* security improvements, ui fixes

* fix mypy error

* update loading spinner in ui

---------

Co-authored-by: Mark Wallace <127216156+markwallace-microsoft@users.noreply.github.com>
2025-11-07 23:28:32 +00:00

496 lines
20 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Clean focused tests for message mapping functionality."""
import asyncio
import sys
from pathlib import Path
from typing import Any
import pytest
# Add the main agent_framework package for real types
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "main"))
# Import Agent Framework types (assuming they are always available)
from agent_framework._types import (
AgentRunResponseUpdate,
ErrorContent,
FunctionCallContent,
FunctionResultContent,
Role,
TextContent,
)
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
def create_test_content(content_type: str, **kwargs: Any) -> Any:
"""Create test content objects."""
if content_type == "text":
return TextContent(text=kwargs.get("text", "Hello, world!"))
if content_type == "function_call":
return FunctionCallContent(
call_id=kwargs.get("call_id", "test_call_id"),
name=kwargs.get("name", "test_func"),
arguments=kwargs.get("arguments", {"param": "value"}),
)
if content_type == "error":
return ErrorContent(message=kwargs.get("message", "Test error"), error_code=kwargs.get("code", "test_error"))
raise ValueError(f"Unknown content type: {content_type}")
def create_test_agent_update(contents: list[Any]) -> Any:
    """Wrap content objects in a real AgentRunResponseUpdate - NO fake attributes!

    Args:
        contents: Content objects to place in the update.

    Returns:
        An ``AgentRunResponseUpdate`` with the assistant role and fixed
        ``message_id`` / ``response_id`` test values.
    """
    update_fields = {
        "contents": contents,
        "role": Role.ASSISTANT,
        "message_id": "test_msg",
        "response_id": "test_resp",
    }
    return AgentRunResponseUpdate(**update_fields)
@pytest.fixture
def mapper() -> MessageMapper:
    """Provide a newly constructed MessageMapper to the requesting test."""
    message_mapper = MessageMapper()
    return message_mapper
@pytest.fixture
def test_request() -> AgentFrameworkRequest:
    """Provide a streaming request addressed to the ``test_agent`` entity."""
    # Routing uses metadata.entity_id
    routing_metadata = {"entity_id": "test_agent"}
    return AgentFrameworkRequest(metadata=routing_metadata, input="Test input", stream=True)
async def test_critical_isinstance_bug_detection(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """CRITICAL: Regression guard for the isinstance vs hasattr bug.

    Builds an update from real framework types, checks its attribute shape,
    and verifies the mapper never yields an "unknown" event for it.
    """
    update = create_test_agent_update([create_test_content("text", text="Bug detection test")])

    # The real attribute must be present and the fake one absent
    assert hasattr(update, "contents")
    assert not hasattr(update, "response")

    # isinstance must work against the real type
    assert isinstance(update, AgentRunResponseUpdate)

    # Conversion should NOT produce "Unknown event"
    converted = await mapper.convert_event(update, test_request)
    assert len(converted) > 0
    for event in converted:
        assert hasattr(event, "type")

    # Proper types should never map to unknown events
    for event in converted:
        assert event.type != "unknown"
async def test_text_content_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Verify TextContent maps onto the three-event OpenAI hierarchy.

    Expected sequence:
        1. response.output_item.added (message)
        2. response.content_part.added (text part)
        3. response.output_text.delta (actual text)
    """
    text = "Hello, clean test!"
    update = create_test_agent_update([create_test_content("text", text=text)])

    events = await mapper.convert_event(update, test_request)
    assert len(events) == 3
    item_added, part_added, text_delta = events[0], events[1], events[2]

    # Message output item
    assert item_added.type == "response.output_item.added"
    assert item_added.item.type == "message"
    assert item_added.item.role == "assistant"

    # Content part
    assert part_added.type == "response.content_part.added"
    assert part_added.part.type == "output_text"

    # Text delta
    assert text_delta.type == "response.output_text.delta"
    assert text_delta.delta == "Hello, clean test!"
async def test_function_call_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Verify FunctionCallContent yields an output item followed by argument deltas."""
    call = create_test_content("function_call", name="test_func", arguments={"location": "TestCity"})
    events = await mapper.convert_event(create_test_agent_update([call]), test_request)

    # Expected: response.output_item.added + response.function_call_arguments.delta
    assert len(events) >= 2
    assert events[0].type == "response.output_item.added"
    assert events[1].type == "response.function_call_arguments.delta"

    # The argument JSON is carried by the delta events
    argument_json = "".join(
        event.delta for event in events if event.type == "response.function_call_arguments.delta"
    )
    assert "TestCity" in argument_json
async def test_function_result_content_with_string_result(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Verify a FunctionResultContent holding a plain string (regular tools) maps cleanly."""
    # A plain string is what regular Python function tools return
    tool_result = FunctionResultContent(call_id="test_call_123", result="Hello, World!")
    events = await mapper.convert_event(create_test_agent_update([tool_result]), test_request)

    # Exactly one response.function_result.complete event is expected
    assert len(events) >= 1
    completed = [event for event in events if event.type == "response.function_result.complete"]
    assert len(completed) == 1

    result_event = completed[0]
    assert result_event.output == "Hello, World!"
    assert result_event.call_id == "test_call_123"
    assert result_event.status == "completed"
async def test_function_result_content_with_nested_content_objects(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Verify a FunctionResultContent wrapping nested Content objects (MCP tools case).

    Covers the issue from GitHub #1476 where MCP tools return FunctionResultContent
    with nested TextContent objects that fail to serialize properly.
    """
    # MCP tools return a list containing Content objects as the result
    nested_result = [TextContent(text="Hello from MCP!")]
    tool_result = FunctionResultContent(call_id="mcp_call_456", result=nested_result)

    events = await mapper.convert_event(create_test_agent_update([tool_result]), test_request)

    # The nested Content object must serialize successfully
    assert len(events) >= 1
    completed = [event for event in events if event.type == "response.function_result.complete"]
    assert len(completed) == 1

    # No TypeError and no empty output: the nested text has to appear in the output
    result_event = completed[0]
    assert result_event.output != ""
    assert "Hello from MCP!" in result_event.output
    assert result_event.call_id == "mcp_call_456"
async def test_function_result_content_with_multiple_nested_content_objects(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Verify every nested Content object in a function result is serialized."""
    # MCP tools can return several Content objects at once
    nested_results = [TextContent(text=label) for label in ("First result", "Second result")]
    tool_result = FunctionResultContent(call_id="mcp_call_789", result=nested_results)

    events = await mapper.convert_event(create_test_agent_update([tool_result]), test_request)
    assert len(events) >= 1

    completed = [event for event in events if event.type == "response.function_result.complete"]
    assert len(completed) == 1

    # All nested Content objects should show up in the serialized output
    serialized = completed[0].output
    assert serialized != ""
    assert "First result" in serialized
    assert "Second result" in serialized
async def test_error_content_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Verify ErrorContent maps to a single ``error`` event carrying message and code."""
    failure = create_test_content("error", message="Test error", code="test_code")
    events = await mapper.convert_event(create_test_agent_update([failure]), test_request)

    assert len(events) == 1
    error_event = events[0]
    assert error_event.type == "error"
    assert error_event.message == "Test error"
    assert error_event.code == "test_code"
async def test_mixed_content_types(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Verify an update mixing text and function-call content yields both event kinds."""
    opening = create_test_content("text", text="Starting...")
    call = create_test_content("function_call", name="process", arguments={"data": "test"})
    closing = create_test_content("text", text="Done!")

    update = create_test_agent_update([opening, call, closing])
    events = await mapper.convert_event(update, test_request)
    assert len(events) >= 3

    # Both text deltas and function-call argument deltas must be present
    seen_types = set()
    for event in events:
        seen_types.add(event.type)
    assert "response.output_text.delta" in seen_types
    assert "response.function_call_arguments.delta" in seen_types
async def test_unknown_content_fallback(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Test graceful handling of unknown content types.

    The fallback helper ``_create_unknown_content_event`` is exercised directly,
    because an invalid AgentRunResponseUpdate cannot be constructed to reach it
    through ``convert_event``.
    """

    # The class is named directly rather than patching ``__class__.__name__`` from
    # ``__init__``: that mutated the class on every instantiation and left
    # ``__qualname__`` out of sync with ``__name__``.
    class WeirdUnknownContent:  # Not in content_mappers
        """Stand-in for a content type the mapper does not know about."""

    context = mapper._get_or_create_context(test_request)
    unknown_content = WeirdUnknownContent()

    # Call the unknown-content fallback directly
    event = await mapper._create_unknown_content_event(unknown_content, context)

    # The fallback surfaces as a text delta that names the offending class
    assert event.type == "response.output_text.delta"
    assert "Unknown content type" in event.delta
    assert "WeirdUnknownContent" in event.delta
async def test_agent_run_response_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Verify the mapper accepts a complete (non-streaming) AgentRunResponse."""
    from agent_framework import AgentRunResponse, ChatMessage, Role, TextContent

    # Build the kind of complete response agent.run() would return
    reply_text = TextContent(text="Complete response from run()")
    assistant_message = ChatMessage(role=Role.ASSISTANT, contents=[reply_text])
    full_response = AgentRunResponse(messages=[assistant_message], response_id="test_resp_123")

    # The mapper should turn it into streaming events
    events = await mapper.convert_event(full_response, test_request)
    assert len(events) > 0

    # Text delta events carrying the message text are expected
    deltas = [event for event in events if event.type == "response.output_text.delta"]
    assert len(deltas) > 0
    assert deltas[0].delta == "Complete response from run()"
async def test_agent_lifecycle_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Verify agent started/completed/failed events convert to OpenAI response events."""
    from agent_framework_devui.models._openai_custom import AgentCompletedEvent, AgentFailedEvent, AgentStartedEvent

    # Started: response.created followed by response.in_progress
    started_events = await mapper.convert_event(AgentStartedEvent(), test_request)
    assert len(started_events) == 2
    created, in_progress = started_events[0], started_events[1]
    assert created.type == "response.created"
    assert in_progress.type == "response.in_progress"
    assert created.response.model == "devui"  # 'devui' is used when the request names no model
    assert created.response.status == "in_progress"

    # Completed: a single response.completed
    completed_events = await mapper.convert_event(AgentCompletedEvent(), test_request)
    assert len(completed_events) == 1
    completed = completed_events[0]
    assert completed.type == "response.completed"
    assert completed.response.status == "completed"

    # Failed: a single response.failed carrying the error details
    failure = AgentFailedEvent(error=Exception("Test error"))
    failed_events = await mapper.convert_event(failure, test_request)
    assert len(failed_events) == 1
    failed = failed_events[0]
    assert failed.type == "response.failed"
    assert failed.response.status == "failed"
    assert failed.response.error.message == "Test error"
    assert failed.response.error.code == "server_error"
@pytest.mark.skip(reason="Workflow events need real classes from agent_framework.workflows")
async def test_workflow_lifecycle_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Verify workflow started/completed/failed events convert to OpenAI response events.

    Currently skipped: the event classes below are local stand-ins, not the real
    workflow event types.
    """

    # Local mock workflow events (the real ones are not available to this test)
    class WorkflowStartedEvent:  # noqa: B903
        def __init__(self, workflow_id: str):
            self.workflow_id = workflow_id

    class WorkflowCompletedEvent:  # noqa: B903
        def __init__(self, workflow_id: str):
            self.workflow_id = workflow_id

    class WorkflowFailedEvent:  # noqa: B903
        def __init__(self, workflow_id: str, error_info: dict | None = None):
            self.workflow_id = workflow_id
            self.error_info = error_info

    # Started: response.created followed by response.in_progress
    started = WorkflowStartedEvent(workflow_id="test_workflow_123")
    started_events = await mapper.convert_event(started, test_request)
    assert len(started_events) == 2
    created, in_progress = started_events[0], started_events[1]
    assert created.type == "response.created"
    assert in_progress.type == "response.in_progress"
    # NOTE(review): the agent lifecycle test expects "devui" for the same request
    # fixture - confirm which value is right before un-skipping this test.
    assert created.response.model == "test_agent"  # Should use model from request
    assert created.response.status == "in_progress"

    # Completed: a single response.completed
    finished = WorkflowCompletedEvent(workflow_id="test_workflow_123")
    completed_events = await mapper.convert_event(finished, test_request)
    assert len(completed_events) == 1
    completed = completed_events[0]
    assert completed.type == "response.completed"
    assert completed.response.status == "completed"

    # Failed (with error info): a single response.failed
    broken = WorkflowFailedEvent(workflow_id="test_workflow_123", error_info={"message": "Workflow failed"})
    failed_events = await mapper.convert_event(broken, test_request)
    assert len(failed_events) == 1
    failed = failed_events[0]
    assert failed.type == "response.failed"
    assert failed.response.status == "failed"
    assert failed.response.error.message == "{'message': 'Workflow failed'}"
    assert failed.response.error.code == "server_error"
@pytest.mark.skip(reason="Executor events need real classes from agent_framework.workflows")
async def test_executor_action_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
    """Verify workflow executor events convert to custom ``executor_action`` output items.

    Currently skipped: the event classes below are local stand-ins, not the real
    executor event types.
    """

    # Local mock executor events (the real ones are not available to this test)
    class ExecutorInvokedEvent:  # noqa: B903
        def __init__(self, executor_id: str, executor_type: str = "test"):
            self.executor_id = executor_id
            self.executor_type = executor_type

    class ExecutorCompletedEvent:  # noqa: B903
        def __init__(self, executor_id: str, result: Any = None):
            self.executor_id = executor_id
            self.result = result

    class ExecutorFailedEvent:  # noqa: B903
        def __init__(self, executor_id: str, error: Exception | None = None):
            self.executor_id = executor_id
            self.error = error

    # Invoked: output item added, in progress
    invoked = ExecutorInvokedEvent(executor_id="exec_123", executor_type="test_executor")
    invoked_events = await mapper.convert_event(invoked, test_request)
    assert len(invoked_events) == 1
    added = invoked_events[0]
    assert added.type == "response.output_item.added"
    assert added.item["type"] == "executor_action"
    assert added.item["executor_id"] == "exec_123"
    assert added.item["status"] == "in_progress"

    # Completed: output item done, carrying the result
    succeeded = ExecutorCompletedEvent(executor_id="exec_123", result={"data": "success"})
    completed_events = await mapper.convert_event(succeeded, test_request)
    assert len(completed_events) == 1
    done = completed_events[0]
    assert done.type == "response.output_item.done"
    assert done.item["type"] == "executor_action"
    assert done.item["executor_id"] == "exec_123"
    assert done.item["status"] == "completed"
    assert done.item["result"] == {"data": "success"}

    # Failed: output item done, carrying the error message
    crashed = ExecutorFailedEvent(executor_id="exec_123", error=Exception("Executor failed"))
    failed_events = await mapper.convert_event(crashed, test_request)
    assert len(failed_events) == 1
    failed = failed_events[0]
    assert failed.type == "response.output_item.done"
    assert failed.item["type"] == "executor_action"
    assert failed.item["executor_id"] == "exec_123"
    assert failed.item["status"] == "failed"
    assert "Executor failed" in str(failed.item["error"]["message"])
async def test_magentic_agent_delta_creates_message_container(
    mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
    """Verify MagenticAgentDeltaEvent opens a message container once (Option A implementation).

    The first delta for an agent should emit the container, the content part
    and the text delta; a later delta for the same agent should emit only a
    text delta that points at the same message item.
    """
    # Stand-in MagenticAgentDeltaEvent that mimics the real class
    from dataclasses import dataclass

    try:
        from agent_framework import WorkflowEvent

        @dataclass
        class MagenticAgentDeltaEvent(WorkflowEvent):  # Inherit from WorkflowEvent
            agent_id: str
            text: str | None = None

    except ImportError:
        # WorkflowEvent is unavailable: fall back to a plain dataclass
        @dataclass
        class MagenticAgentDeltaEvent:  # Use the expected name directly
            agent_id: str
            text: str | None = None

    # First delta: the message container gets created
    opening_events = await mapper.convert_event(
        MagenticAgentDeltaEvent(agent_id="test_agent", text="Hello "), test_request
    )

    # Three events: message container, content part, text delta
    assert len(opening_events) == 3
    container = opening_events[0]
    assert container.type == "response.output_item.added"
    assert container.item.type == "message"  # Message, not executor_action!
    assert container.item.metadata["agent_id"] == "test_agent"
    assert container.item.metadata["source"] == "magentic"
    message_id = container.item.id

    # The text delta must reference the container's message ID
    opening_delta = opening_events[2]
    assert opening_delta.type == "response.output_text.delta"
    assert opening_delta.item_id == message_id
    assert opening_delta.delta == "Hello "

    # Second delta: no new container should be created
    followup_events = await mapper.convert_event(
        MagenticAgentDeltaEvent(agent_id="test_agent", text="world!"), test_request
    )

    # Only the text delta is emitted
    assert len(followup_events) == 1
    followup_delta = followup_events[0]
    assert followup_delta.type == "response.output_text.delta"
    assert followup_delta.item_id == message_id
if __name__ == "__main__":
    # Simple test runner: executes a subset of the tests above without pytest.

    async def run_all_tests() -> int:
        """Run the selected tests in order and report the outcome.

        Each failure is written to stderr with its traceback instead of being
        silently discarded, and a pass/total summary is written to stdout.

        Returns:
            The number of tests that raised an exception (0 means all passed).
        """
        import traceback

        tests = [
            ("Critical isinstance bug detection", test_critical_isinstance_bug_detection),
            ("Text content mapping", test_text_content_mapping),
            ("Function call mapping", test_function_call_mapping),
            ("Error content mapping", test_error_content_mapping),
            ("Mixed content types", test_mixed_content_types),
            ("Unknown content fallback", test_unknown_content_fallback),
        ]

        failed: list[str] = []
        for test_name, test_func in tests:
            # Build a new mapper and request for every test, as the pytest
            # fixtures do, so one test's mapper state cannot leak into the next.
            mapper = MessageMapper()
            test_request = AgentFrameworkRequest(
                metadata={"entity_id": "test"},
                input="Test",
                stream=True,
            )
            try:
                await test_func(mapper, test_request)
            except Exception:
                # Top-level boundary: record and report the failure, then keep
                # going so the remaining tests still run.
                failed.append(test_name)
                sys.stderr.write(f"FAILED: {test_name}\n{traceback.format_exc()}\n")

        passed = len(tests) - len(failed)
        sys.stdout.write(f"{passed}/{len(tests)} tests passed\n")
        return len(failed)

    # A non-zero exit status lets shells and CI detect failures.
    sys.exit(1 if asyncio.run(run_all_tests()) else 0)