Files
agent-framework/python/packages/devui/tests/test_execution.py
T
Victor Dibia 1ef24d3e91 Python: Add DevUI to AgentFramework (#781)
* add initial backend service code for devui

* add tests

* add frontendcode

* ui updates

* update readme

* ui updates and tweaks

* update ui bundle

* improve ui, add react flow base

* add react flow ui, fix background

* update ui, fix introspection bug

* update readme

* update ui build

* add support for multimodal input - both backend and frontend

* update ui build

* refactor as main framework package

* backend and tests refactor

* ui build update

* ui build update and refactor

* update pyproject.toml, update uv.lock

* update ui build

* ui update to fit oai responses types

* add backend updat and readme update

* mypy and other fixes

* add intial dev guide

* update ui and fix workflow bug

* update ui build, add thread support

* type fixes

* update workflow view

* update uv.lock

* fix workflow iport errors

* lint and other fixes

* mypy fixes

* minor update

* update ui build

* refactor to use oai dependencies directly, update examples to samples, improve typing

* readme update

* update ui and ui build

* fix workflow pyright error

* update ui, fix issues with run workflow placement, miniamp menu, etc

* make samples integrate serve

---------

Co-authored-by: Chris <66376200+crickman@users.noreply.github.com>
Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
2025-09-22 23:30:08 +00:00

190 lines
6.0 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Focused tests for execution flow functionality."""
import asyncio
import os
import tempfile
from pathlib import Path
import pytest
from agent_framework_devui._discovery import EntityDiscovery
from agent_framework_devui._executor import AgentFrameworkExecutor, EntityNotFoundError
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkExtraBody, AgentFrameworkRequest
@pytest.fixture
def test_entities_dir():
"""Use the samples directory which has proper entity structure."""
current_dir = Path(__file__).parent
samples_dir = current_dir.parent / "samples"
return str(samples_dir.resolve())
@pytest.fixture
async def executor(test_entities_dir):
"""Create configured executor."""
discovery = EntityDiscovery(test_entities_dir)
mapper = MessageMapper()
executor = AgentFrameworkExecutor(discovery, mapper)
# Discover entities
await executor.discover_entities()
return executor
@pytest.mark.asyncio
async def test_executor_entity_discovery(executor):
"""Test executor entity discovery."""
entities = await executor.discover_entities()
# Should find entities from samples directory
assert len(entities) > 0, "Should discover at least one entity"
entity_types = [e.type for e in entities]
assert "agent" in entity_types, "Should find at least one agent"
assert "workflow" in entity_types, "Should find at least one workflow"
# Test entity structure
for entity in entities:
assert entity.id, "Entity should have an ID"
assert entity.name, "Entity should have a name"
assert entity.type in ["agent", "workflow"], "Entity should have valid type"
@pytest.mark.asyncio
async def test_executor_get_entity_info(executor):
"""Test getting entity info by ID."""
entities = await executor.discover_entities()
entity_id = entities[0].id
entity_info = executor.get_entity_info(entity_id)
assert entity_info is not None
assert entity_info.id == entity_id
assert entity_info.type in ["agent", "workflow"]
@pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="requires OpenAI API key")
@pytest.mark.asyncio
async def test_executor_sync_execution(executor):
"""Test synchronous execution."""
entities = await executor.discover_entities()
# Find an agent entity to test with
agents = [e for e in entities if e.type == "agent"]
assert len(agents) > 0, "No agent entities found for testing"
agent_id = agents[0].id
request = AgentFrameworkRequest(
model="agent-framework", input="test data", stream=False, extra_body=AgentFrameworkExtraBody(entity_id=agent_id)
)
response = await executor.execute_sync(request)
assert response.model == "agent-framework"
assert response.object == "response"
assert len(response.output) > 0
assert response.usage.total_tokens > 0
@pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="requires OpenAI API key")
@pytest.mark.asyncio
async def test_executor_streaming_execution(executor):
"""Test streaming execution."""
entities = await executor.discover_entities()
# Find an agent entity to test with
agents = [e for e in entities if e.type == "agent"]
assert len(agents) > 0, "No agent entities found for testing"
agent_id = agents[0].id
request = AgentFrameworkRequest(
model="agent-framework",
input="streaming test",
stream=True,
extra_body=AgentFrameworkExtraBody(entity_id=agent_id),
)
event_count = 0
text_events = []
async for event in executor.execute_streaming(request):
event_count += 1
if hasattr(event, "type") and event.type == "response.output_text.delta":
text_events.append(event.delta)
if event_count > 10: # Limit for testing
break
assert event_count > 0
assert len(text_events) > 0
@pytest.mark.asyncio
async def test_executor_invalid_entity_id(executor):
"""Test execution with invalid entity ID."""
with pytest.raises(EntityNotFoundError):
executor.get_entity_info("nonexistent_agent")
@pytest.mark.asyncio
async def test_executor_missing_entity_id(executor):
"""Test execution without entity ID."""
request = AgentFrameworkRequest(
model="agent-framework",
input="test",
stream=False,
extra_body=None, # Test case for missing entity_id
)
entity_id = request.get_entity_id()
assert entity_id is None
if __name__ == "__main__":
# Simple test runner
async def run_tests():
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create test agent
agent_file = temp_path / "streaming_agent.py"
agent_file.write_text("""
class StreamingAgent:
name = "Streaming Test Agent"
description = "Test agent for streaming"
async def run_stream(self, input_str):
for i, word in enumerate(f"Processing {input_str}".split()):
yield f"word_{i}: {word} "
""")
discovery = EntityDiscovery(str(temp_path))
mapper = MessageMapper()
executor = AgentFrameworkExecutor(discovery, mapper)
# Test discovery
entities = await executor.discover_entities()
if entities:
# Test sync execution
request = AgentFrameworkRequest(
model="agent-framework",
input="test input",
stream=False,
extra_body=AgentFrameworkExtraBody(entity_id=entities[0].id),
)
await executor.execute_sync(request)
# Test streaming execution
request.stream = True
event_count = 0
async for _event in executor.execute_streaming(request):
event_count += 1
if event_count > 5: # Limit for testing
break
asyncio.run(run_tests())