Files
agent-framework/python/packages/devui/tests/test_execution.py
T
Victor Dibia c341ee7ed2 Python: DevUI - Internal Refactor, Conversations API support, and per… (#1235)
* Python: DevUI - Internal Refactor, Conversations API support, and performance improvements

Comprehensive refactor of DevUI package including samples relocation,
frontend reorganization, OpenAI Conversations API support, and critical
performance and code quality improvements.

Key Changes:

Architecture & Organization
- Moved DevUI samples to python/samples/getting_started/devui/
- Consolidated with other framework samples for better discoverability
- Added .env.example files and comprehensive README
- Restructured frontend components into feature-based folders (agent, workflow, gallery, layout)
- Created new OpenAI-compliant message renderers (DevUI should primarily render OpenAI Responses types)

New Features
- Added _conversations.py (467 lines): a full conversation storage abstraction that replaces the /threads endpoint to better match the OpenAI Conversations API
- Implements the OpenAI Conversations API for thread management; supports in-memory and extensible storage backends

API Simplification
- Use 'model' field as entity_id (agent/workflow name) instead of extra_body
- Use standard OpenAI 'conversation' field for conversation context.

Performance & Quality Improvements
- Improved context management in MessageMapper with bounded memory (~500KB max)
- Implemented hybrid LRU + cleanup approach to prevent unbounded memory growth
- General quality-of-life improvements: eliminated ~150 lines of dead/duplicate code, consolidated helper functions into _utils.py, extracted magic numbers to module-level constants, and optimized conversation item lookups with an index-based approach

Testing
- Added test_conversations.py (13 tests)
- Added test_performance_fixes.py (9 tests)
- Updated existing tests for code consolidation
- 53 tests passing

Impact: 76 files changed: +4,106 insertions, -2,373 deletions
All linting and formatting checks passing. No breaking changes - backward compatible.

Migration: Samples moved to python/samples/getting_started/devui/

* readme lint fixes

* initial support for function approval and minor ui fixes
2025-10-08 19:34:30 +00:00

306 lines
10 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Focused tests for execution flow functionality."""
import asyncio
import os
import tempfile
from pathlib import Path
import pytest
from agent_framework_devui._discovery import EntityDiscovery
from agent_framework_devui._executor import AgentFrameworkExecutor, EntityNotFoundError
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
class _DummyStartExecutor:
    """Bare-bones start-executor stand-in carrying handler metadata for tests.

    An attribute is created only when its keyword argument is supplied:
    ``input_types`` is stored as a list copy and ``handlers`` as a dict copy
    under ``_handlers``. An argument left as ``None`` leaves the matching
    attribute undefined on the instance.
    """

    def __init__(self, *, input_types=None, handlers=None):
        # (attribute name, supplied value, copying constructor), in assignment order
        optional_attrs = (
            ("input_types", input_types, list),
            ("_handlers", handlers, dict),
        )
        for attr_name, supplied, make_copy in optional_attrs:
            if supplied is None:
                continue
            setattr(self, attr_name, make_copy(supplied))
class _DummyWorkflow:
    """Workflow stand-in that hands back the start executor it was built with."""

    def __init__(self, start_executor):
        """Remember ``start_executor`` for later retrieval."""
        self._start_executor = start_executor

    def get_start_executor(self):
        """Return the start executor supplied at construction time."""
        configured = self._start_executor
        return configured
@pytest.fixture
def test_entities_dir():
    """Return the resolved path of the shared DevUI samples directory as a string.

    The location is derived from this test file: three levels above the
    directory that contains it, then ``samples/getting_started/devui``.
    """
    tests_dir = Path(__file__).parent
    # Climb three levels from the tests directory, then descend to
    # python/samples/getting_started/devui
    base_dir = tests_dir.parent.parent.parent
    devui_samples = base_dir.joinpath("samples", "getting_started", "devui")
    return str(devui_samples.resolve())
@pytest.fixture
async def executor(test_entities_dir):
    """Build an AgentFrameworkExecutor over ``test_entities_dir`` with discovery already run."""
    configured = AgentFrameworkExecutor(EntityDiscovery(test_entities_dir), MessageMapper())
    # Run discovery once up front so tests receive an executor that has already scanned the directory
    await configured.discover_entities()
    return configured
async def test_executor_entity_discovery(executor):
    """Discovery returns at least one agent and one workflow, each with an id, name and valid type."""
    discovered = await executor.discover_entities()

    # Should find entities from samples directory
    assert len(discovered) > 0, "Should discover at least one entity"

    found_types = [item.type for item in discovered]
    assert "agent" in found_types, "Should find at least one agent"
    assert "workflow" in found_types, "Should find at least one workflow"

    # Every discovered entity must carry the basic identifying fields
    valid_types = ["agent", "workflow"]
    for item in discovered:
        assert item.id, "Entity should have an ID"
        assert item.name, "Entity should have a name"
        assert item.type in valid_types, "Entity should have valid type"
async def test_executor_get_entity_info(executor):
    """Looking up the first discovered entity by ID returns info with the same ID and a valid type."""
    discovered = await executor.discover_entities()
    first_id = discovered[0].id

    info = executor.get_entity_info(first_id)

    assert info is not None
    assert info.id == first_id
    assert info.type in ("agent", "workflow")
@pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="requires OpenAI API key")
async def test_executor_sync_execution(executor):
    """Run a non-streaming request against the first discovered agent and check the response shape."""
    discovered = await executor.discover_entities()

    # Pick an agent entity to run against
    agent_entities = [candidate for candidate in discovered if candidate.type == "agent"]
    assert len(agent_entities) > 0, "No agent entities found for testing"
    target_id = agent_entities[0].id

    # Simplified routing: the model field carries the entity ID
    request = AgentFrameworkRequest(model=target_id, input="test data", stream=False)
    response = await executor.execute_sync(request)

    # With simplified routing, response.model reflects the actual agent ID
    assert response.model == target_id
    assert response.object == "response"
    assert len(response.output) > 0
    assert response.usage.total_tokens > 0
@pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="requires OpenAI API key")
@pytest.mark.skip("Skipping while we fix discovery")
async def test_executor_streaming_execution(executor):
    """Stream a request to the first discovered agent and expect at least one text delta event."""
    discovered = await executor.discover_entities()

    # Pick an agent entity to run against
    agent_entities = [candidate for candidate in discovered if candidate.type == "agent"]
    assert len(agent_entities) > 0, "No agent entities found for testing"
    target_id = agent_entities[0].id

    # Simplified routing: the model field carries the entity ID
    request = AgentFrameworkRequest(model=target_id, input="streaming test", stream=True)

    seen = 0
    deltas = []
    async for event in executor.execute_streaming(request):
        seen += 1
        is_text_delta = getattr(event, "type", None) == "response.output_text.delta"
        if is_text_delta:
            deltas.append(event.delta)
        # Stop consuming once more than ten events have arrived
        if seen > 10:
            break

    assert seen > 0
    assert len(deltas) > 0
async def test_executor_invalid_entity_id(executor):
    """Requesting entity info for an unknown ID raises EntityNotFoundError."""
    unknown_id = "nonexistent_agent"
    with pytest.raises(EntityNotFoundError):
        executor.get_entity_info(unknown_id)
async def test_executor_missing_entity_id(executor):
    """``get_entity_id`` returns the request's model field (simplified routing)."""
    request = AgentFrameworkRequest(model="my_agent", input="test", stream=False)

    # With simplified routing, the model field IS the entity ID
    resolved_id = request.get_entity_id()
    assert resolved_id == "my_agent"
def test_executor_get_start_executor_message_types_uses_handlers():
    """Handler keys are reported as message types when the start executor has no ``input_types``."""
    # Stub that defines only ``_handlers`` (no ``input_types`` attribute)
    start_executor = _DummyStartExecutor(handlers={str: lambda *_: None})
    workflow = _DummyWorkflow(start_executor)
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    result = executor._get_start_executor_message_types(workflow)
    start, message_types = result

    assert start is start_executor
    assert str in message_types
def test_executor_select_primary_input_prefers_string():
    """``select_primary_input_type`` picks ``str`` even when it is listed after another type."""
    from agent_framework_devui._utils import select_primary_input_type

    # A throwaway class stands in for a non-string input type
    other_type = type("Placeholder", (), {})
    candidates = [other_type, str]

    assert select_primary_input_type(candidates) is str
def test_executor_parse_structured_prefers_input_field():
    """A structured payload with an ``input`` key parses to that string when a ``str`` handler exists."""
    # Handlers keyed by a throwaway class first, then ``str``
    request_type = type("Req", (), {})
    handlers = {request_type: None, str: lambda *_: None}
    workflow = _DummyWorkflow(_DummyStartExecutor(handlers=handlers))
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    payload = {"input": "hello"}
    assert executor._parse_structured_workflow_input(workflow, payload) == "hello"
def test_executor_parse_raw_falls_back_to_string():
    """A raw string input comes back unchanged when the start executor handles ``str``."""
    text_only_start = _DummyStartExecutor(handlers={str: lambda *_: None})
    workflow = _DummyWorkflow(text_only_start)
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    raw_input = "hi there"
    result = executor._parse_raw_workflow_input(workflow, raw_input)

    assert result == "hi there"
async def test_executor_handles_non_streaming_agent():
    """Streaming execution still yields text delta events for an agent that only defines ``run()``."""
    from agent_framework import AgentRunResponse, AgentThread, ChatMessage, Role, TextContent

    class NonStreamingAgent:
        """Agent with ``run()`` but no ``run_stream()`` - does NOT satisfy full AgentProtocol."""

        id = "non_streaming_test"
        name = "Non-Streaming Test Agent"
        description = "Test agent without run_stream()"

        @property
        def display_name(self):
            return self.name

        async def run(self, messages=None, *, thread=None, **kwargs):
            # Echo the incoming messages inside a single assistant reply
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=f"Processed: {messages}")])
            return AgentRunResponse(messages=[reply], response_id="test_123")

        def get_new_thread(self, **kwargs):
            return AgentThread()

    # Build the executor and register the agent object with its discovery instance
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())
    agent = NonStreamingAgent()
    entity_info = await discovery.create_entity_info_from_object(agent, source="test")
    discovery.register_entity(entity_info.id, entity_info, agent)

    # Simplified routing: model carries the entity ID; stream=True because DevUI always streams
    request = AgentFrameworkRequest(model=entity_info.id, input="hello", stream=True)

    collected = [event async for event in executor.execute_streaming(request)]

    # Events must be produced even though the agent itself does not stream
    assert len(collected) > 0
    text_deltas = [ev for ev in collected if getattr(ev, "type", None) == "response.output_text.delta"]
    assert len(text_deltas) > 0
    assert "Processed: hello" in text_deltas[0].delta
if __name__ == "__main__":
    # Simple test runner

    async def run_tests():
        """Smoke-run discovery, sync execution and streaming against a temporary agent module.

        Writes a small agent module into a temporary directory, points an
        executor at that directory and, if any entity is discovered, runs one
        non-streaming request followed by a streaming request that is cut off
        after a handful of events. Nothing is asserted; the run only has to
        finish without raising.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create test agent. The module text starts at column 0 and the class
            # body is indented, so the file written to disk is valid Python.
            agent_file = temp_path / "streaming_agent.py"
            agent_file.write_text(
                """
class StreamingAgent:
    name = "Streaming Test Agent"
    description = "Test agent for streaming"
    async def run_stream(self, input_str):
        for i, word in enumerate(f"Processing {input_str}".split()):
            yield f"word_{i}: {word} "
""",
                # Explicit encoding keeps the written file independent of the platform default
                encoding="utf-8",
            )

            discovery = EntityDiscovery(str(temp_path))
            mapper = MessageMapper()
            executor = AgentFrameworkExecutor(discovery, mapper)

            # Test discovery
            entities = await executor.discover_entities()
            if not entities:
                # Nothing was discovered, so there is nothing to execute
                return

            # Test sync execution (use simplified routing)
            request = AgentFrameworkRequest(
                model=entities[0].id,  # Model IS the entity_id
                input="test input",
                stream=False,
            )
            await executor.execute_sync(request)

            # Test streaming execution, reusing the same request object
            request.stream = True
            event_count = 0
            async for _event in executor.execute_streaming(request):
                event_count += 1
                if event_count > 5:  # Limit for testing
                    break

    asyncio.run(run_tests())