Python: Default DevUI workflows to string input when start node is auto-wrapped agent (#1143)

* DevUI workflows default to string input when start node is AgentExecutor

* Remove unnecessary sample

* Fix lint issue
This commit is contained in:
Evan Mattson
2025-10-03 17:11:38 +09:00
committed by GitHub
Unverified
parent 5a208ab1e1
commit c543dbaa92
5 changed files with 230 additions and 20 deletions
@@ -7,7 +7,7 @@ import logging
import os
import uuid
from collections.abc import AsyncGenerator
from typing import Any
from typing import Any, get_origin
from agent_framework import AgentThread
@@ -638,6 +638,54 @@ class AgentFrameworkExecutor:
logger.warning(f"Error parsing workflow input: {e}")
return raw_input
def _get_start_executor_message_types(self, workflow: Any) -> tuple[Any | None, list[Any]]:
"""Return start executor and its declared input types."""
try:
start_executor = workflow.get_start_executor()
except Exception as exc: # pragma: no cover - defensive logging path
logger.debug(f"Unable to access workflow start executor: {exc}")
return None, []
if not start_executor:
return None, []
message_types: list[Any] = []
try:
input_types = getattr(start_executor, "input_types", None)
except Exception as exc: # pragma: no cover - defensive logging path
logger.debug(f"Failed to read executor input_types: {exc}")
else:
if input_types:
message_types = list(input_types)
if not message_types and hasattr(start_executor, "_handlers"):
try:
handlers = start_executor._handlers
if isinstance(handlers, dict):
message_types = list(handlers.keys())
except Exception as exc: # pragma: no cover - defensive logging path
logger.debug(f"Failed to read executor handlers: {exc}")
return start_executor, message_types
def _select_primary_input_type(self, message_types: list[Any]) -> Any | None:
"""Choose the most user-friendly input type for workflow kick-off."""
if not message_types:
return None
preferred = (str, dict)
for candidate in preferred:
for message_type in message_types:
if message_type is candidate:
return candidate
origin = get_origin(message_type)
if origin is candidate:
return candidate
return message_types[0]
def _parse_structured_workflow_input(self, workflow: Any, input_data: dict[str, Any]) -> Any:
"""Parse structured input data for workflow execution.
@@ -650,18 +698,20 @@ class AgentFrameworkExecutor:
"""
try:
# Get the start executor and its input type
start_executor = workflow.get_start_executor()
if not start_executor or not hasattr(start_executor, "_handlers"):
start_executor, message_types = self._get_start_executor_message_types(workflow)
if not start_executor:
logger.debug("Cannot determine input type for workflow - using raw dict")
return input_data
message_types = list(start_executor._handlers.keys())
if not message_types:
logger.debug("No message types found for start executor - using raw dict")
return input_data
# Get the first (primary) input type
input_type = message_types[0]
input_type = self._select_primary_input_type(message_types)
if input_type is None:
logger.debug("Could not select primary input type for workflow - using raw dict")
return input_data
# If input type is dict, return as-is
if input_type is dict:
@@ -715,18 +765,20 @@ class AgentFrameworkExecutor:
"""
try:
# Get the start executor and its input type
start_executor = workflow.get_start_executor()
if not start_executor or not hasattr(start_executor, "_handlers"):
start_executor, message_types = self._get_start_executor_message_types(workflow)
if not start_executor:
logger.debug("Cannot determine input type for workflow - using raw string")
return raw_input
message_types = list(start_executor._handlers.keys())
if not message_types:
logger.debug("No message types found for start executor - using raw string")
return raw_input
# Get the first (primary) input type
input_type = message_types[0]
input_type = self._select_primary_input_type(message_types)
if input_type is None:
logger.debug("Could not select primary input type for workflow - using raw string")
return raw_input
# If input type is str, return as-is
if input_type is str:
@@ -7,7 +7,7 @@ import json
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any
from typing import Any, get_origin
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
@@ -23,6 +23,47 @@ from .models._discovery_models import DiscoveryResponse, EntityInfo
logger = logging.getLogger(__name__)
def _extract_executor_message_types(executor: Any) -> list[Any]:
"""Return declared input types for the given executor."""
message_types: list[Any] = []
try:
input_types = getattr(executor, "input_types", None)
except Exception as exc: # pragma: no cover - defensive logging path
logger.debug(f"Failed to access executor input_types: {exc}")
else:
if input_types:
message_types = list(input_types)
if not message_types and hasattr(executor, "_handlers"):
try:
handlers = executor._handlers
if isinstance(handlers, dict):
message_types = list(handlers.keys())
except Exception as exc: # pragma: no cover - defensive logging path
logger.debug(f"Failed to read executor handlers: {exc}")
return message_types
def _select_primary_input_type(message_types: list[Any]) -> Any | None:
"""Choose the most user-friendly input type for rendering workflow inputs."""
if not message_types:
return None
preferred = (str, dict)
for candidate in preferred:
for message_type in message_types:
if message_type is candidate:
return candidate
origin = get_origin(message_type)
if origin is candidate:
return candidate
return message_types[0]
class DevServer:
"""Development Server - OpenAI compatible API server for debugging agents."""
@@ -223,23 +264,36 @@ class DevServer:
try:
start_executor = entity_obj.get_start_executor()
if start_executor and hasattr(start_executor, "_handlers"):
message_types = list(start_executor._handlers.keys())
if message_types:
input_type = message_types[0]
except Exception as e:
logger.debug(f"Could not extract input info for workflow {entity_id}: {e}")
else:
if start_executor:
start_executor_id = getattr(start_executor, "executor_id", "") or getattr(
start_executor, "id", ""
)
message_types = _extract_executor_message_types(start_executor)
input_type = _select_primary_input_type(message_types)
if input_type:
input_type_name = getattr(input_type, "__name__", str(input_type))
# Basic schema generation for common types
if input_type is str:
input_schema = {"type": "string"}
elif input_type is dict:
input_schema = {"type": "object"}
elif hasattr(input_type, "model_json_schema"):
input_schema = input_type.model_json_schema()
try:
input_schema = input_type.model_json_schema()
except Exception as exc: # pragma: no cover - defensive path
logger.debug(f"model_json_schema() failed for workflow {entity_id}: {exc}")
elif hasattr(input_type, "__annotations__"):
input_schema = {"type": "object"}
start_executor_id = getattr(start_executor, "executor_id", "")
except Exception as e:
logger.debug(f"Could not extract input info for workflow {entity_id}: {e}")
if not input_schema:
input_schema = {"type": "string"}
if input_type_name == "Unknown":
input_type_name = "string"
# Get executor list
executor_list = []
@@ -15,6 +15,26 @@ from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkExtraBody, AgentFrameworkRequest
class _DummyStartExecutor:
"""Minimal executor stub exposing handler metadata for tests."""
def __init__(self, *, input_types=None, handlers=None):
if input_types is not None:
self.input_types = list(input_types)
if handlers is not None:
self._handlers = dict(handlers)
class _DummyWorkflow:
"""Simple workflow stub returning configured start executor."""
def __init__(self, start_executor):
self._start_executor = start_executor
def get_start_executor(self):
return self._start_executor
@pytest.fixture
def test_entities_dir():
"""Use the samples directory which has proper entity structure."""
@@ -137,6 +157,50 @@ async def test_executor_missing_entity_id(executor):
assert entity_id is None
def test_executor_get_start_executor_message_types_uses_handlers():
"""Ensure handler metadata is surfaced when input_types missing."""
executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())
start_executor = _DummyStartExecutor(handlers={str: lambda *_: None})
workflow = _DummyWorkflow(start_executor)
start, message_types = executor._get_start_executor_message_types(workflow)
assert start is start_executor
assert str in message_types
def test_executor_select_primary_input_prefers_string():
"""Select string input even when discovered after other handlers."""
executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())
placeholder_type = type("Placeholder", (), {})
chosen = executor._select_primary_input_type([placeholder_type, str])
assert chosen is str
def test_executor_parse_structured_prefers_input_field():
"""Structured payloads map to string when agent start requires text."""
executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())
start_executor = _DummyStartExecutor(handlers={type("Req", (), {}): None, str: lambda *_: None})
workflow = _DummyWorkflow(start_executor)
parsed = executor._parse_structured_workflow_input(workflow, {"input": "hello"})
assert parsed == "hello"
def test_executor_parse_raw_falls_back_to_string():
"""Raw inputs remain untouched when start executor expects text."""
executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())
start_executor = _DummyStartExecutor(handlers={str: lambda *_: None})
workflow = _DummyWorkflow(start_executor)
parsed = executor._parse_raw_workflow_input(workflow, "hi there")
assert parsed == "hi there"
if __name__ == "__main__":
# Simple test runner
async def run_tests():
@@ -9,9 +9,20 @@ from pathlib import Path
import pytest
from agent_framework_devui import DevServer
from agent_framework_devui._server import _extract_executor_message_types, _select_primary_input_type
from agent_framework_devui.models._openai_custom import AgentFrameworkExtraBody, AgentFrameworkRequest
class _StubExecutor:
"""Simple executor stub exposing handler metadata."""
def __init__(self, *, input_types=None, handlers=None):
if input_types is not None:
self.input_types = list(input_types)
if handlers is not None:
self._handlers = dict(handlers)
@pytest.fixture
def test_entities_dir():
"""Use the samples directory which has proper entity structure."""
@@ -97,6 +108,36 @@ def test_configuration():
assert server.ui_enabled
def test_extract_executor_message_types_prefers_input_types():
"""Input types property is used when available."""
stub = _StubExecutor(input_types=[str, dict])
types = _extract_executor_message_types(stub)
assert types == [str, dict]
def test_extract_executor_message_types_falls_back_to_handlers():
"""Handlers provide message metadata when input_types missing."""
stub = _StubExecutor(handlers={str: object(), int: object()})
types = _extract_executor_message_types(stub)
assert str in types
assert int in types
def test_select_primary_input_type_prefers_string_and_dict():
"""Primary type selection prefers user-friendly primitives."""
string_first = _select_primary_input_type([dict[str, str], str])
dict_first = _select_primary_input_type([dict[str, str]])
fallback = _select_primary_input_type([int, float])
assert string_first is str
assert dict_first is dict
assert fallback is int
if __name__ == "__main__":
# Simple test runner
async def run_tests():