mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
9124d51e0e
* Fix .NET conversation memory in DevUI (#3484) * formatting fixes * fix memory regression in python devui , fix for #4123 * Fix for #3983: Added _get_event_type() helper that safely accesses event type on both objects (.type) and dicts (.get("type")). Replaced all 4 bare event.type accesses in _executor.py (lines 267, 477, 499, 523). Root cause: PR #3690 changed event.__class__.__name__ == "RequestInfoEvent" (safe) to event.type == "request_info" (crashes on dicts), but _execute_workflow still yields raw dicts on error paths. Test: test_workflow_error_yields_dict_event_without_crash — mocks a workflow that raises, verifies execute_entity consumes the dict error events without crashing. * format fixes * lint fixes
837 lines
30 KiB
Python
837 lines
30 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Focused tests for execution flow functionality.
|
|
|
|
Tests include:
|
|
- Entity discovery and info retrieval
|
|
- Agent execution (sync and streaming) using real Agent with mock LLM
|
|
- Workflow execution using real WorkflowBuilder with FunctionExecutor
|
|
- Edge cases like non-streaming agents
|
|
"""
|
|
|
|
import asyncio
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from agent_framework import Agent, AgentExecutor, FunctionExecutor, WorkflowBuilder
|
|
|
|
# Import mock classes from conftest for direct use in some tests
|
|
from conftest import MockBaseChatClient
|
|
|
|
from agent_framework_devui._discovery import EntityDiscovery
|
|
from agent_framework_devui._executor import AgentFrameworkExecutor, EntityNotFoundError
|
|
from agent_framework_devui._mapper import MessageMapper
|
|
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
|
|
|
|
# =============================================================================
|
|
# Local Fixtures (module-specific)
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.fixture
async def executor(test_entities_dir):
    """Provide an AgentFrameworkExecutor on which discovery has already run.

    The executor is built from an EntityDiscovery over ``test_entities_dir``
    and a fresh MessageMapper.
    """
    configured = AgentFrameworkExecutor(EntityDiscovery(test_entities_dir), MessageMapper())

    # Run discovery once before handing the executor to the test
    await configured.discover_entities()
    return configured
|
|
|
|
|
|
async def test_executor_entity_discovery(executor):
    """Discovery yields at least one agent and one workflow, each well-formed."""
    discovered = await executor.discover_entities()

    # The test entities directory must produce something
    assert len(discovered) > 0, "Should discover at least one entity"

    found_types = [item.type for item in discovered]
    assert "agent" in found_types, "Should find at least one agent"
    assert "workflow" in found_types, "Should find at least one workflow"

    # Entities with only an `__init__.py` file cannot have their type determined
    # until the module is imported during lazy loading, hence the 'unknown' type.
    allowed_types = ["agent", "workflow", "unknown"]

    # Every entity needs an ID, a name and one of the allowed types
    for item in discovered:
        assert item.id, "Entity should have an ID"
        assert item.name, "Entity should have a name"
        assert item.type in allowed_types, (
            "Entity should have valid type (unknown allowed during discovery phase)"
        )
|
|
|
|
|
|
async def test_executor_get_entity_info(executor):
    """Test getting entity info by ID.

    Discovers entities, takes the first one, and checks that
    ``get_entity_info`` returns an object with the same ID and a type that is
    one of ``agent``, ``workflow`` or ``unknown``.
    """
    entities = await executor.discover_entities()

    # Fail with a readable message rather than an IndexError on entities[0]
    # when discovery comes back empty.
    assert len(entities) > 0, "Should discover at least one entity to look up"
    entity_id = entities[0].id

    entity_info = executor.get_entity_info(entity_id)
    assert entity_info is not None
    assert entity_info.id == entity_id
    assert entity_info.type in ["agent", "workflow", "unknown"]
|
|
|
|
|
|
# =============================================================================
|
|
# Agent Execution Tests (using real Agent with mock LLM)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_agent_sync_execution(executor_with_real_agent):
    """Run a non-streaming request through a REAL Agent backed by a mock LLM.

    No API key is needed; the pipeline under test is:
    - Real Agent class with middleware
    - Real message normalization
    - Mock chat client for LLM calls
    """
    executor, entity_id, mock_client = executor_with_real_agent

    result = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            input="test data",
            stream=False,
        )
    )

    # With no model on the request, the response reports 'devui'
    assert result.model == "devui"
    assert result.object == "response"
    assert len(result.output) > 0

    # The mock chat client must have been invoked exactly once
    assert mock_client.call_count == 1
|
|
|
|
|
|
async def test_agent_sync_execution_respects_model_field(executor_with_real_agent):
    """A model name supplied on the request is echoed on the sync response."""
    executor, entity_id, _mock_client = executor_with_real_agent

    result = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            model="custom-model-name",
            input="test data",
            stream=False,
        )
    )

    # The response carries the caller-specified model
    assert result.model == "custom-model-name"
    assert result.object == "response"
    assert len(result.output) > 0
|
|
|
|
|
|
async def test_chat_client_receives_correct_messages(executor_with_real_agent):
    """Check what the mock chat client is handed by the REAL Agent.

    After one synchronous execution the client must have been called once,
    must have recorded exactly one batch of messages, and the user's input
    text must appear somewhere in that batch.
    """
    executor, entity_id, mock_client = executor_with_real_agent

    await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            input="What is 2+2?",
            stream=False,
        )
    )

    # One LLM call, carrying one recorded batch of messages
    assert mock_client.call_count == 1
    assert len(mock_client.received_messages) == 1

    batch = mock_client.received_messages[0]
    assert len(batch) >= 1, f"Expected messages, got: {batch}"

    # Messages without text contribute an empty string to the joined text
    joined_text = " ".join([msg.text or "" for msg in batch])
    assert "2+2" in joined_text, f"Expected '2+2' in messages, got text: '{joined_text}'"
|
|
|
|
|
|
# =============================================================================
|
|
# Workflow Execution Tests (using real WorkflowBuilder with FunctionExecutor)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_workflow_streaming_execution():
    """Stream a single-step FunctionExecutor workflow through the DevUI executor.

    Builds a REAL workflow with WorkflowBuilder (no API key required),
    registers it in memory, and checks that streaming yields at least one
    event exposing a ``type`` attribute.
    """

    # The function used as the workflow's start executor
    def process_input(input_data: str) -> str:
        return f"Processed: {input_data}"

    workflow = WorkflowBuilder(
        name="Test Workflow",
        description="Test workflow for execution",
        start_executor=FunctionExecutor(id="process", func=process_input),
    ).build()

    # In-memory registration: no entities directory is involved
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="hello workflow",
        stream=True,
    )

    # Drain the whole stream
    streamed = [item async for item in executor.execute_streaming(request)]
    assert len(streamed) > 0, "Should receive events from workflow"

    # At least one event must expose a non-None type
    event_types = [getattr(item, "type", None) for item in streamed]
    assert any(kind is not None for kind in event_types), f"Should have typed events, got: {event_types}"
|
|
|
|
|
|
async def test_workflow_sync_execution():
    """Execute a one-step echo workflow without streaming and check the response."""

    def echo(text: str) -> str:
        return f"Echo: {text}"

    workflow = WorkflowBuilder(
        name="Echo Workflow",
        description="Simple echo workflow",
        start_executor=FunctionExecutor(id="echo", func=echo),
    ).build()

    # Register the workflow directly with an in-memory discovery
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    result = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_info.id},
            input="test input",
            stream=False,
        )
    )

    # A valid response object with at least one output item
    assert result.object == "response"
    assert len(result.output) > 0
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Serialization Tests (Run + Map + JSON)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_full_pipeline_agent_events_are_json_serializable(executor_with_real_agent):
    """CRITICAL TEST: every streamed agent event must survive JSON serialization.

    Mirrors the server's code path:
    1. Execute the agent via executor.execute_streaming()
    2. Each event is converted by the mapper
    3. model_dump_json() is called on each event for SSE

    Any event carrying a non-serializable object (like AgentResponse) is
    recorded as a failure and makes the final assertion fail.
    """
    executor, entity_id, _mock_client = executor_with_real_agent

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Test message for serialization",
        stream=True,
    )

    collected = []
    failures = []

    async for item in executor.execute_streaming(request):
        collected.append(item)

        # Same call the server makes before sending SSE. Failures are
        # collected rather than raised so every bad event gets reported.
        try:
            if hasattr(item, "model_dump_json"):
                payload = item.model_dump_json()
                assert payload is not None
                assert len(payload) > 0
        except Exception as exc:
            failures.append(f"Event type={getattr(item, 'type', 'unknown')}: {exc}")

    assert len(collected) > 0, "Should receive events from agent execution"

    # No serialization errors are tolerated
    assert len(failures) == 0, f"Found {len(failures)} serialization errors:\n" + "\n".join(failures)
|
|
|
|
|
|
async def test_full_pipeline_workflow_events_are_json_serializable():
    """CRITICAL TEST: every streamed workflow event must survive JSON serialization.

    Workflows containing an AgentExecutor are the sensitive case:
    - AgentExecutor produces an executor_completed event (type='executor_completed')
      with AgentExecutorResponse
    - AgentExecutorResponse contains AgentResponse and Message objects
    - These are SerializationMixin objects, not Pydantic, which caused the original bug

    The whole streaming pipeline is exercised end-to-end, followed by
    aggregation of the events into a final response.
    """
    # A workflow whose only node is an AgentExecutor (the problematic case)
    agent = Agent(
        id="serialization_test_agent",
        name="Serialization Test Agent",
        description="Agent for testing serialization",
        client=MockBaseChatClient(),
        system_message="You are a test assistant.",
    )
    workflow = WorkflowBuilder(
        name="Serialization Test Workflow",
        description="Test workflow",
        start_executor=AgentExecutor(id="agent_node", agent=agent),
    ).build()

    # Register the workflow with an in-memory discovery
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="Test workflow serialization",
        stream=True,
    )

    collected = []
    failures = []
    kinds_seen = []

    async for item in executor.execute_streaming(request):
        collected.append(item)
        kind = getattr(item, "type", "unknown")
        kinds_seen.append(kind)

        # Same call the server makes before sending SSE
        try:
            if hasattr(item, "model_dump_json"):
                payload = item.model_dump_json()
                assert payload is not None
                assert len(payload) > 0
        except Exception as exc:
            failures.append(f"Event type={kind}: {exc}")

    assert len(collected) > 0, "Should receive events from workflow execution"

    # Workflow-specific events must be present, not just generic ones
    assert any("output_item" in str(kind) for kind in kinds_seen), (
        f"Should see output_item events, got: {kinds_seen}"
    )

    # The critical assertion: nothing failed to serialize
    assert len(failures) == 0, (
        f"Found {len(failures)} serialization errors:\n"
        + "\n".join(failures)
        + f"\n\nEvent types seen: {kinds_seen}"
    )

    # The server aggregates after streaming, so that step must work too
    aggregated = await mapper.aggregate_to_response(collected, request)
    assert aggregated is not None
|
|
|
|
|
|
async def test_get_entity_info_raises_for_invalid_id(executor):
    """Looking up an ID that was never registered raises EntityNotFoundError."""
    missing_id = "nonexistent_agent"
    with pytest.raises(EntityNotFoundError):
        executor.get_entity_info(missing_id)
|
|
|
|
|
|
async def test_request_extracts_entity_id_from_metadata(executor):
    """get_entity_id() returns the entity_id placed in the request metadata."""
    request = AgentFrameworkRequest(metadata={"entity_id": "my_agent"}, input="test", stream=False)

    # The ID comes straight out of metadata["entity_id"]
    assert request.get_entity_id() == "my_agent"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_get_start_executor_message_types(sequential_workflow):
    """_get_start_executor_message_types works against a real sequential workflow."""
    executor, _entity_id, _mock_client, workflow = sequential_workflow

    start_node, accepted_types = executor._get_start_executor_message_types(workflow)

    assert start_node is not None
    assert len(accepted_types) > 0
    # Real sequential workflows accept str input
    assert str in accepted_types
|
|
|
|
|
|
def test_executor_select_primary_input_prefers_string():
    """str is chosen as primary input type even when listed after another type."""
    from agent_framework_devui._utils import select_primary_input_type

    # A throwaway class standing in for some other handler input type
    other_type = type("Placeholder", (), {})
    candidates = [other_type, str]

    assert select_primary_input_type(candidates) is str
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_structured_extracts_input_for_string_workflow():
    """A structured payload is reduced to its 'input' field for a str workflow."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class StringInputExecutor(Executor):
        """Start executor whose handler takes a plain string."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    workflow = WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=StringInputExecutor(id="str_exec"),
    ).build()

    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # {"input": "hello"} sent to a str-expecting workflow becomes "hello"
    payload = {"input": "hello"}
    assert executor._parse_structured_workflow_input(workflow, payload) == "hello"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_raw_string_for_string_workflow():
    """A raw string is handed through untouched to a str-accepting workflow."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class StringInputExecutor(Executor):
        """Start executor whose handler takes a plain string."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    workflow = WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=StringInputExecutor(id="str_exec"),
    ).build()

    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # The parsed value must equal the raw text
    raw_text = "hi there"
    assert executor._parse_raw_workflow_input(workflow, raw_text) == "hi there"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_converts_to_chat_message_for_sequential_workflow(sequential_workflow):
    """Raw string input to a sequential workflow is parsed into a Message."""
    from agent_framework import Message

    executor, _entity_id, _mock_client, workflow = sequential_workflow

    # Sequential workflows expect Message, so the raw string gets wrapped
    result = executor._parse_raw_workflow_input(workflow, "hello")

    assert isinstance(result, Message)
    assert result.text == "hello"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_stringified_json_workflow_input():
    """A JSON string is parsed into the Pydantic model the workflow expects."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
    from pydantic import BaseModel

    class WorkflowInput(BaseModel):
        input: str
        metadata: dict | None = None

    class PydanticInputExecutor(Executor):
        """Start executor whose handler takes a WorkflowInput model."""

        @handler
        async def process(self, data: WorkflowInput, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {data.input}")

    # The workflow's start executor declares the Pydantic input type
    workflow = WorkflowBuilder(
        name="Pydantic Workflow",
        description="Accepts Pydantic input",
        start_executor=PydanticInputExecutor(id="pydantic_exec"),
    ).build()

    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # What the frontend sends: JSON.stringify({"input": "testing!", "metadata": {"key": "value"}})
    wire_payload = '{"input": "testing!", "metadata": {"key": "value"}}'
    result = executor._parse_raw_workflow_input(workflow, wire_payload)

    # The string must come back as a populated WorkflowInput
    assert isinstance(result, WorkflowInput)
    assert result.input == "testing!"
    assert result.metadata == {"key": "value"}
|
|
|
|
|
|
def test_extract_workflow_hil_responses_handles_stringified_json():
    """Test HIL response extraction handles both stringified and parsed JSON (regression test).

    Covers three input shapes for ``_extract_workflow_hil_responses``:
    a JSON string, the equivalent already-parsed list, and inputs with no
    ``workflow_hil_response`` content (which must yield ``None``).
    """
    # EntityDiscovery, AgentFrameworkExecutor and MessageMapper are already
    # imported at module level, so no function-local re-import is needed.
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Regression test: Frontend sends stringified JSON via streamWorkflowExecutionOpenAI
    stringified = '[{"type":"message","content":[{"type":"workflow_hil_response","responses":{"req_1":"spam"}}]}]'
    assert executor._extract_workflow_hil_responses(stringified) == {"req_1": "spam"}

    # Ensure parsed format still works
    parsed = [{"type": "message", "content": [{"type": "workflow_hil_response", "responses": {"req_2": "ham"}}]}]
    assert executor._extract_workflow_hil_responses(parsed) == {"req_2": "ham"}

    # Non-HIL inputs should return None
    assert executor._extract_workflow_hil_responses("plain text") is None
    assert executor._extract_workflow_hil_responses({"email": "test"}) is None
|
|
|
|
|
|
async def test_executor_handles_streaming_agent():
    """The executor can stream from a hand-written agent exposing run(stream=True)."""
    from agent_framework import AgentResponse, AgentResponseUpdate, AgentSession, Content, Message

    class StreamingAgent:
        """Minimal agent whose run() switches on the ``stream`` flag."""

        id = "streaming_test"
        name = "Streaming Test Agent"
        description = "Test agent with run(stream=True)"

        def run(self, messages=None, *, stream=False, session=None, **kwargs):
            # Streaming callers get an async generator, others get an awaitable
            if stream:
                return self._stream_impl(messages)
            return self._run_impl(messages)

        async def _run_impl(self, messages):
            return AgentResponse(
                messages=[Message(role="assistant", contents=[Content.from_text(text=f"Processed: {messages}")])],
                response_id="test_123",
            )

        async def _stream_impl(self, messages):
            yield AgentResponseUpdate(
                contents=[Content.from_text(text=f"Processed: {messages}")],
                role="assistant",
            )

        def create_session(self, **kwargs):
            return AgentSession()

    # Register the agent instance with an in-memory discovery
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    agent = StreamingAgent()
    entity_info = await discovery.create_entity_info_from_object(agent, source="test")
    discovery.register_entity(entity_info.id, entity_info, agent)

    # Routing is done through metadata.entity_id
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="hello",
        stream=True,  # DevUI always streams
    )

    streamed = [item async for item in executor.execute_streaming(request)]
    assert len(streamed) > 0

    # The agent's text must surface as an output_text delta event
    text_deltas = [
        item for item in streamed if hasattr(item, "type") and item.type == "response.output_text.delta"
    ]
    assert len(text_deltas) > 0
    assert "Processed: hello" in text_deltas[0].delta
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Tests for SequentialBuilder
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_sequential_workflow(sequential_workflow):
    """Stream a SequentialBuilder workflow and JSON-serialize every event.

    Relies on the shared sequential_workflow fixture (Writer → Reviewer) from
    conftest and checks that each event can be serialized for SSE streaming.
    """
    executor, entity_id, mock_client, _workflow = sequential_workflow

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Write about testing best practices",
        stream=True,
    )

    collected = []
    failures = []

    async for item in executor.execute_streaming(request):
        collected.append(item)
        kind = getattr(item, "type", "unknown")

        # Same serialization the server performs for SSE
        try:
            if hasattr(item, "model_dump_json"):
                payload = item.model_dump_json()
                assert payload is not None
        except Exception as exc:
            failures.append(f"Event type={kind}: {exc}")

    assert len(collected) > 0, "Should receive events from sequential workflow"
    assert len(failures) == 0, f"Serialization errors: {failures}"
    assert mock_client.call_count >= 2, f"Expected both agents called, got {mock_client.call_count}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_concurrent_workflow(concurrent_workflow):
    """Stream a ConcurrentBuilder workflow and JSON-serialize every event.

    Relies on the shared concurrent_workflow fixture
    (Researcher | Analyst | Summarizer) from conftest, i.e. a fan-out/fan-in
    pattern with parallel agent execution.
    """
    executor, entity_id, mock_client, _workflow = concurrent_workflow

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Analyze market trends for Q4",
        stream=True,
    )

    collected = []
    failures = []

    async for item in executor.execute_streaming(request):
        collected.append(item)
        kind = getattr(item, "type", "unknown")

        # Record, rather than raise, any serialization problem
        try:
            if hasattr(item, "model_dump_json"):
                payload = item.model_dump_json()
                assert payload is not None
        except Exception as exc:
            failures.append(f"Event type={kind}: {exc}")

    assert len(collected) > 0, "Should receive events from concurrent workflow"
    assert len(failures) == 0, f"Serialization errors: {failures}"
    assert mock_client.call_count >= 3, f"Expected all 3 agents called, got {mock_client.call_count}"
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Test for Workflow with Output Events
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_workflow_output_event_serialization():
    """Test that output event (type='output') from ctx.yield_output() serializes correctly.

    This tests the pattern where executors yield output via ctx.yield_output(),
    which emits output event (type='output') that DevUI must serialize for SSE.

    The executor below yields three outputs (two strings and one dict); the
    test expects at least three streamed events whose type contains
    ``output_item`` and no serialization failures.
    """
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class OutputtingExecutor(Executor):
        """Executor that yields multiple outputs."""

        @handler
        async def process(self, input_text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"First output: {input_text}")
            await ctx.yield_output("Second output: processed")
            await ctx.yield_output({"final": "result", "data": [1, 2, 3]})

    # Build workflow
    workflow = WorkflowBuilder(
        name="Output Workflow",
        description="Tests yield_output",
        start_executor=OutputtingExecutor(id="outputter"),
    ).build()

    # Create DevUI executor and register workflow
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    # Execute with streaming
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="Test output events",
        stream=True,
    )

    events = []
    output_events = []
    serialization_errors = []

    async for event in executor.execute_streaming(request):
        events.append(event)
        event_type = getattr(event, "type", "")

        # Track output item events. The isinstance guard keeps an event whose
        # `type` is not a string (e.g. None) from raising TypeError here,
        # outside the try/except below.
        if isinstance(event_type, str) and "output_item" in event_type:
            output_events.append(event)

        try:
            if hasattr(event, "model_dump_json"):
                event.model_dump_json()
        except Exception as e:
            serialization_errors.append(f"Event type={event_type}: {e}")

    assert len(events) > 0, "Should receive events"
    assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}"

    # Should have received output events for the yield_output calls
    assert len(output_events) >= 3, f"Expected 3+ output events for yield_output calls, got {len(output_events)}"
|
|
|
|
|
|
async def test_workflow_error_yields_dict_event_without_crash():
    """A failing workflow must not crash execute_entity (#3983).

    When the workflow raises, _execute_workflow yields a raw dict
    {"type": "error", ...}. execute_entity has to cope with dict events as
    well as object events instead of failing on attribute access.
    """
    from unittest.mock import AsyncMock, MagicMock

    from agent_framework_devui.models._discovery_models import EntityInfo

    discovery = MagicMock(spec=EntityDiscovery)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    discovery.get_entity_info.return_value = EntityInfo(
        id="bad_wf", name="bad_wf", type="workflow", framework="agent_framework"
    )

    # A stand-in workflow whose run() always raises
    def failing_run(*args, **kwargs):
        raise RuntimeError("Sorry, something went wrong.")

    broken_workflow = MagicMock()
    broken_workflow.name = "bad_wf"
    broken_workflow.run = failing_run
    discovery.load_entity = AsyncMock(return_value=broken_workflow)

    request = AgentFrameworkRequest(
        model="test",
        input="hello",
        metadata={"entity_id": "bad_wf"},
    )

    # Must NOT raise AttributeError: 'dict' object has no attribute 'type'
    received = [item async for item in executor.execute_entity("bad_wf", request)]

    # At least one of the yielded events has to be an error dict
    assert len(received) > 0
    error_dicts = [item for item in received if isinstance(item, dict) and item.get("type") == "error"]
    assert len(error_dicts) > 0, f"Expected error dict events, got: {received}"
|
|
|
|
|
|
if __name__ == "__main__":
    # Simple test runner: a manual smoke check run when the file is executed
    # directly. It does not invoke the pytest tests defined above.
    async def run_tests():
        """Write a throwaway agent module, discover it, then execute it sync and streaming."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create test agent
            # NOTE(review): the indentation inside this template is what the
            # generated module is parsed with — confirm it against the upstream file.
            agent_file = temp_path / "streaming_agent.py"
            agent_file.write_text("""
class StreamingAgent:
    name = "Streaming Test Agent"
    description = "Test agent for streaming"

    async def run(self, input_str, *, stream: bool = False, session=None, **kwargs):
        if stream:
            async def _stream():
                for i, word in enumerate(f"Processing {input_str}".split()):
                    yield f"word_{i}: {word} "
            return _stream()
        return f"Processing {input_str}"
""")

            discovery = EntityDiscovery(str(temp_path))
            mapper = MessageMapper()
            executor = AgentFrameworkExecutor(discovery, mapper)

            # Test discovery
            entities = await executor.discover_entities()

            # Nothing is executed when discovery returns no entities
            if entities:
                # Test sync execution (use metadata.entity_id for routing)
                request = AgentFrameworkRequest(
                    metadata={"entity_id": entities[0].id},
                    input="test input",
                    stream=False,
                )

                # The sync result is not inspected; only a raised exception would surface
                await executor.execute_sync(request)

                # Test streaming execution (the same request object is reused)
                request.stream = True
                event_count = 0
                async for _event in executor.execute_streaming(request):
                    event_count += 1
                    if event_count > 5:  # Limit for testing
                        break

    asyncio.run(run_tests())
|