mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
b1b528e4a8
* [BREAKING] Remove deprecated kwargs compatibility paths Remove the deprecated kwargs compatibility shims across core agents, clients, tools, middleware, and telemetry. Keep workflow kwargs behavior intact in this branch and follow up separately in #4850. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix PR CI fallout for kwargs removal Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address PR review feedback Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * updates * Fix Azure AI CI fallout Remove the stale _get_current_conversation_id override from the Azure AI client after the OpenAI base helper was deleted. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * fixed new classes * Fix Assistants deprecated import gating Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix integration replay regressions Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Switch multi-agent hosting samples to Azure chat completions Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Simplify Azure multi-agent sample config Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
837 lines
30 KiB
Python
837 lines
30 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Focused tests for execution flow functionality.
|
|
|
|
Tests include:
|
|
- Entity discovery and info retrieval
|
|
- Agent execution (sync and streaming) using real Agent with mock LLM
|
|
- Workflow execution using real WorkflowBuilder with FunctionExecutor
|
|
- Edge cases like non-streaming agents
|
|
"""
|
|
|
|
import asyncio
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from agent_framework import Agent, AgentExecutor, FunctionExecutor, WorkflowBuilder
|
|
|
|
# Import mock classes from conftest for direct use in some tests
|
|
from conftest import MockBaseChatClient
|
|
|
|
from agent_framework_devui._discovery import EntityDiscovery
|
|
from agent_framework_devui._executor import AgentFrameworkExecutor, EntityNotFoundError
|
|
from agent_framework_devui._mapper import MessageMapper
|
|
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
|
|
|
|
# =============================================================================
|
|
# Local Fixtures (module-specific)
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.fixture
async def executor(test_entities_dir):
    """Build an AgentFrameworkExecutor over the test entities directory.

    Entity discovery is run once before the executor is handed to the test.
    """
    configured = AgentFrameworkExecutor(EntityDiscovery(test_entities_dir), MessageMapper())

    # Populate the executor with whatever lives in the test directory
    await configured.discover_entities()

    return configured
|
|
|
|
|
|
async def test_executor_entity_discovery(executor):
    """Discovery returns agents and workflows with well-formed metadata."""
    discovered = await executor.discover_entities()

    # The test entities directory must yield something
    assert len(discovered) > 0, "Should discover at least one entity"

    seen_types = [item.type for item in discovered]
    assert "agent" in seen_types, "Should find at least one agent"
    assert "workflow" in seen_types, "Should find at least one workflow"

    # Entities with only an `__init__.py` file cannot have their type determined
    # until the module is imported during lazy loading. This is why 'unknown' type exists.
    allowed_types = ["agent", "workflow", "unknown"]

    # Every entity needs an ID, a name and a recognised type
    for item in discovered:
        assert item.id, "Entity should have an ID"
        assert item.name, "Entity should have a name"
        assert item.type in allowed_types, (
            "Entity should have valid type (unknown allowed during discovery phase)"
        )
|
|
|
|
|
|
async def test_executor_get_entity_info(executor):
    """Look up a discovered entity by ID and check the returned info.

    Uses the first entity returned by discovery as the lookup target.
    """
    entities = await executor.discover_entities()

    # Guard the index below so an empty discovery fails with a clear message
    # instead of a bare IndexError.
    assert entities, "Should discover at least one entity"
    entity_id = entities[0].id

    entity_info = executor.get_entity_info(entity_id)
    assert entity_info is not None
    assert entity_info.id == entity_id
    assert entity_info.type in ["agent", "workflow", "unknown"]
|
|
|
|
|
|
# =============================================================================
|
|
# Agent Execution Tests (using real Agent with mock LLM)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_agent_sync_execution(executor_with_real_agent):
    """Run the fixture's agent through execute_sync with a mock chat client.

    No API key is needed: the chat client is a mock, so the test can check
    both the response envelope and how often the client was invoked.
    """
    executor, entity_id, mock_client = executor_with_real_agent

    response = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            input="test data",
            stream=False,
        )
    )

    # With no model in the request the response reports 'devui'
    assert response.model == "devui"
    assert response.object == "response"
    assert len(response.output) > 0

    # Exactly one call should have reached the mock chat client
    assert mock_client.call_count == 1
|
|
|
|
|
|
async def test_agent_sync_execution_respects_model_field(executor_with_real_agent):
    """A model named in the request is echoed back on the sync response."""
    executor, entity_id, _mock_client = executor_with_real_agent

    response = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            model="custom-model-name",
            input="test data",
            stream=False,
        )
    )

    # The caller-supplied model name must survive the round trip
    assert response.model == "custom-model-name"
    assert response.object == "response"
    assert len(response.output) > 0
|
|
|
|
|
|
async def test_chat_client_receives_correct_messages(executor_with_real_agent):
    """The mock chat client sees the user's input text in the messages it receives.

    Checks that one call reached the client and that the input string is
    present somewhere in the text of the messages passed to it.
    """
    executor, entity_id, mock_client = executor_with_real_agent

    await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            input="What is 2+2?",
            stream=False,
        )
    )

    # One execution means one call into the chat client
    assert mock_client.call_count == 1

    # ...and therefore one recorded batch of messages
    recorded_batches = mock_client.received_messages
    assert len(recorded_batches) == 1
    messages = recorded_batches[0]

    # The batch must not be empty
    assert len(messages) >= 1, f"Expected messages, got: {messages}"

    # The user's text has to appear somewhere in the forwarded messages
    all_text = " ".join(m.text or "" for m in messages)
    assert "2+2" in all_text, f"Expected '2+2' in messages, got text: '{all_text}'"
|
|
|
|
|
|
# =============================================================================
|
|
# Workflow Execution Tests (using real WorkflowBuilder with FunctionExecutor)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_workflow_streaming_execution():
    """Stream a single-step FunctionExecutor workflow through the DevUI executor.

    The workflow is built in-process with WorkflowBuilder and registered
    directly with the discovery object, so no API key is required.
    """

    # Single-step workflow: one function wrapped in a FunctionExecutor
    def process_input(input_data: str) -> str:
        return f"Processed: {input_data}"

    workflow = WorkflowBuilder(
        name="Test Workflow",
        description="Test workflow for execution",
        start_executor=FunctionExecutor(id="process", func=process_input),
    ).build()

    # Wire up the DevUI executor and register the in-memory workflow
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    # Drive the workflow in streaming mode and collect everything it emits
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="hello workflow",
        stream=True,
    )
    events = [event async for event in executor.execute_streaming(request)]

    # Something must have been streamed back
    assert len(events) > 0, "Should receive events from workflow"

    # At least one of the events has to carry a type
    event_types = [getattr(e, "type", None) for e in events]
    assert any(t is not None for t in event_types), f"Should have typed events, got: {event_types}"
|
|
|
|
|
|
async def test_workflow_sync_execution():
    """Run a one-step echo workflow through execute_sync and check the response."""

    def echo(text: str) -> str:
        return f"Echo: {text}"

    workflow = WorkflowBuilder(
        name="Echo Workflow",
        description="Simple echo workflow",
        start_executor=FunctionExecutor(id="echo", func=echo),
    ).build()

    # Wire up the DevUI executor and register the in-memory workflow
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    # Non-streaming execution of the registered workflow
    response = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_info.id},
            input="test input",
            stream=False,
        )
    )

    # The response envelope must be populated
    assert response.object == "response"
    assert len(response.output) > 0
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Serialization Tests (Run + Map + JSON)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_full_pipeline_agent_events_are_json_serializable(executor_with_real_agent):
    """CRITICAL TEST: every event streamed from an agent run must JSON-serialize.

    Mirrors the server's SSE path:
    1. The agent is executed via executor.execute_streaming()
    2. Each yielded event has already been converted by the mapper
    3. model_dump_json() is called on each event, as the server does

    An event holding a non-serializable object (such as AgentResponse)
    makes this test fail instead of surfacing in production.
    """
    executor, entity_id, _mock_client = executor_with_real_agent

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Test message for serialization",
        stream=True,
    )

    events = []
    serialization_errors = []

    async for event in executor.execute_streaming(request):
        events.append(event)

        # Same call the server makes before emitting the SSE frame
        try:
            if hasattr(event, "model_dump_json"):
                payload = event.model_dump_json()
                assert payload is not None
                assert len(payload) > 0
        except Exception as exc:
            serialization_errors.append(f"Event type={getattr(event, 'type', 'unknown')}: {exc}")

    # The run must have produced events at all
    assert len(events) > 0, "Should receive events from agent execution"

    # Any recorded serialization failure fails the test
    failure_report = f"Found {len(serialization_errors)} serialization errors:\n" + "\n".join(serialization_errors)
    assert len(serialization_errors) == 0, failure_report
|
|
|
|
|
|
async def test_full_pipeline_workflow_events_are_json_serializable():
    """CRITICAL TEST: every event streamed from a workflow run must JSON-serialize.

    Workflows containing an AgentExecutor matter most here because:
    - AgentExecutor produces executor_completed event (type='executor_completed') with AgentExecutorResponse
    - AgentExecutorResponse contains AgentResponse and Message objects
    - These are SerializationMixin objects, not Pydantic, which caused the original bug

    The whole streaming pipeline is exercised end to end, including the
    final aggregation step.
    """
    # A workflow whose only node is an AgentExecutor (the problematic case)
    agent = Agent(
        id="serialization_test_agent",
        name="Serialization Test Agent",
        description="Agent for testing serialization",
        client=MockBaseChatClient(),
        instructions="You are a test assistant.",
    )
    workflow = WorkflowBuilder(
        name="Serialization Test Workflow",
        description="Test workflow",
        start_executor=AgentExecutor(id="agent_node", agent=agent),
    ).build()

    # Wire up the DevUI executor and register the in-memory workflow
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="Test workflow serialization",
        stream=True,
    )

    events = []
    serialization_errors = []
    event_types_seen = []

    async for event in executor.execute_streaming(request):
        events.append(event)
        event_type = getattr(event, "type", "unknown")
        event_types_seen.append(event_type)

        # Same call the server makes before emitting the SSE frame
        try:
            if hasattr(event, "model_dump_json"):
                payload = event.model_dump_json()
                assert payload is not None
                assert len(payload) > 0
        except Exception as exc:
            serialization_errors.append(f"Event type={event_type}: {exc}")

    # The run must have produced events at all
    assert len(events) > 0, "Should receive events from workflow execution"

    # Workflow output items must be present, not just generic events
    assert any("output_item" in str(t) for t in event_types_seen), (
        f"Should see output_item events, got: {event_types_seen}"
    )

    # The critical assertion: no event may have failed to serialize
    failure_report = (
        f"Found {len(serialization_errors)} serialization errors:\n"
        + "\n".join(serialization_errors)
        + f"\n\nEvent types seen: {event_types_seen}"
    )
    assert len(serialization_errors) == 0, failure_report

    # The server aggregates after streaming, so that step must work too
    final_response = await mapper.aggregate_to_response(events, request)
    assert final_response is not None
|
|
|
|
|
|
async def test_get_entity_info_raises_for_invalid_id(executor):
    """Looking up an unknown entity ID raises EntityNotFoundError."""
    unknown_id = "nonexistent_agent"

    with pytest.raises(EntityNotFoundError):
        executor.get_entity_info(unknown_id)
|
|
|
|
|
|
async def test_request_extracts_entity_id_from_metadata(executor):
    """AgentFrameworkRequest.get_entity_id() returns the ID stored in metadata."""
    request = AgentFrameworkRequest(
        metadata={"entity_id": "my_agent"},
        input="test",
        stream=False,
    )

    # The routing ID comes straight out of the metadata mapping
    assert request.get_entity_id() == "my_agent"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_get_start_executor_message_types(sequential_workflow):
    """_get_start_executor_message_types reports a start executor and its input types."""
    executor, _entity_id, _mock_client, workflow = sequential_workflow

    start_exec, accepted_types = executor._get_start_executor_message_types(workflow)

    assert start_exec is not None
    assert len(accepted_types) > 0
    # Real sequential workflows accept str input
    assert str in accepted_types
|
|
|
|
|
|
def test_executor_select_primary_input_prefers_string():
    """str is chosen as the primary input type even when it is listed last."""
    from agent_framework_devui._utils import select_primary_input_type

    # A throwaway class stands in for some other handler input type
    placeholder_type = type("Placeholder", (), {})
    candidates = [placeholder_type, str]

    assert select_primary_input_type(candidates) is str
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_structured_extracts_input_for_string_workflow():
    """A structured payload is reduced to its 'input' value for a str-accepting workflow."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class StringInputExecutor(Executor):
        """Executor that accepts string input directly."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    start_node = StringInputExecutor(id="str_exec")
    workflow = WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=start_node,
    ).build()

    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # {"input": "hello"} handed to a str workflow should come back as "hello"
    payload = {"input": "hello"}
    assert executor._parse_structured_workflow_input(workflow, payload) == "hello"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_raw_string_for_string_workflow():
    """A raw string is returned untouched when the workflow accepts str."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class StringInputExecutor(Executor):
        """Executor that accepts string input directly."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    start_node = StringInputExecutor(id="str_exec")
    workflow = WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=start_node,
    ).build()

    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # The parser must hand the string back unchanged
    assert executor._parse_raw_workflow_input(workflow, "hi there") == "hi there"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_converts_to_chat_message_for_sequential_workflow(sequential_workflow):
    """Raw string input to a sequential workflow is wrapped in a Message."""
    from agent_framework import Message

    executor, _entity_id, _mock_client, workflow = sequential_workflow

    # Sequential workflows expect Message, so raw string becomes Message
    converted = executor._parse_raw_workflow_input(workflow, "hello")

    assert isinstance(converted, Message)
    assert converted.text == "hello"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_stringified_json_workflow_input():
    """A JSON string is parsed into the Pydantic model the workflow's start handler expects."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
    from pydantic import BaseModel

    class WorkflowInput(BaseModel):
        input: str
        metadata: dict | None = None

    class PydanticInputExecutor(Executor):
        """Executor that accepts a Pydantic model input."""

        @handler
        async def process(self, data: WorkflowInput, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {data.input}")

    # Workflow whose start handler takes the Pydantic model
    start_node = PydanticInputExecutor(id="pydantic_exec")
    workflow = WorkflowBuilder(
        name="Pydantic Workflow",
        description="Accepts Pydantic input",
        start_executor=start_node,
    ).build()

    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Simulate frontend sending JSON.stringify({"input": "testing!", "metadata": {"key": "value"}})
    result = executor._parse_raw_workflow_input(
        workflow,
        '{"input": "testing!", "metadata": {"key": "value"}}',
    )

    # The string must have been turned into a populated WorkflowInput
    assert isinstance(result, WorkflowInput)
    assert result.input == "testing!"
    assert result.metadata == {"key": "value"}
|
|
|
|
|
|
def test_extract_workflow_hil_responses_handles_stringified_json():
    """Test HIL response extraction handles both stringified and parsed JSON (regression test).

    Covers three input shapes for _extract_workflow_hil_responses: a JSON
    string, the equivalent already-parsed list, and inputs that carry no
    workflow_hil_response content (for which None is expected).
    """
    # EntityDiscovery, AgentFrameworkExecutor and MessageMapper come from the
    # module-level imports; re-importing them locally was redundant.
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Regression test: Frontend sends stringified JSON via streamWorkflowExecutionOpenAI
    stringified = '[{"type":"message","content":[{"type":"workflow_hil_response","responses":{"req_1":"spam"}}]}]'
    assert executor._extract_workflow_hil_responses(stringified) == {"req_1": "spam"}

    # Ensure parsed format still works
    parsed = [{"type": "message", "content": [{"type": "workflow_hil_response", "responses": {"req_2": "ham"}}]}]
    assert executor._extract_workflow_hil_responses(parsed) == {"req_2": "ham"}

    # Non-HIL inputs should return None
    assert executor._extract_workflow_hil_responses("plain text") is None
    assert executor._extract_workflow_hil_responses({"email": "test"}) is None
|
|
|
|
|
|
async def test_executor_handles_streaming_agent():
    """The executor streams text deltas from a duck-typed agent exposing run(stream=True)."""
    from agent_framework import AgentResponse, AgentResponseUpdate, AgentSession, Content, Message

    class StreamingAgent:
        """Agent with run() method supporting stream parameter."""

        id = "streaming_test"
        name = "Streaming Test Agent"
        description = "Test agent with run(stream=True)"

        def run(self, messages=None, *, stream=False, session=None, **kwargs):
            # Streaming callers get an async generator, everyone else an awaitable
            return self._stream_impl(messages) if stream else self._run_impl(messages)

        async def _run_impl(self, messages):
            reply = Message(role="assistant", contents=[Content.from_text(text=f"Processed: {messages}")])
            return AgentResponse(
                messages=[reply],
                response_id="test_123",
            )

        async def _stream_impl(self, messages):
            yield AgentResponseUpdate(
                contents=[Content.from_text(text=f"Processed: {messages}")],
                role="assistant",
            )

        def create_session(self, **kwargs):
            return AgentSession()

    # Wire up the DevUI executor and register the hand-rolled agent
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    agent = StreamingAgent()
    entity_info = await discovery.create_entity_info_from_object(agent, source="test")
    discovery.register_entity(entity_info.id, entity_info, agent)

    # Routing is done through metadata.entity_id
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="hello",
        stream=True,  # DevUI always streams
    )
    events = [event async for event in executor.execute_streaming(request)]

    # The stream must contain a text delta carrying the agent's output
    assert len(events) > 0
    text_events = [e for e in events if getattr(e, "type", None) == "response.output_text.delta"]
    assert len(text_events) > 0
    assert "Processed: hello" in text_events[0].delta
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Tests for SequentialBuilder
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_sequential_workflow(sequential_workflow):
    """Stream the sequential workflow fixture and JSON-serialize every event.

    Relies on the shared sequential_workflow fixture (Writer → Reviewer) from conftest
    and checks that each event could be sent over SSE.
    """
    executor, entity_id, mock_client, _workflow = sequential_workflow

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Write about testing best practices",
        stream=True,
    )

    events = []
    serialization_errors = []

    async for event in executor.execute_streaming(request):
        events.append(event)
        event_type = getattr(event, "type", "unknown")

        # Same serialization the server performs for SSE
        try:
            if hasattr(event, "model_dump_json"):
                payload = event.model_dump_json()
                assert payload is not None
        except Exception as exc:
            serialization_errors.append(f"Event type={event_type}: {exc}")

    assert len(events) > 0, "Should receive events from sequential workflow"
    assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}"
    assert mock_client.call_count >= 2, f"Expected both agents called, got {mock_client.call_count}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_concurrent_workflow(concurrent_workflow):
    """Stream the concurrent workflow fixture and JSON-serialize every event.

    Relies on the shared concurrent_workflow fixture (Researcher | Analyst | Summarizer) from conftest,
    which covers the fan-out/fan-in pattern.
    """
    executor, entity_id, mock_client, _workflow = concurrent_workflow

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Analyze market trends for Q4",
        stream=True,
    )

    events = []
    serialization_errors = []

    async for event in executor.execute_streaming(request):
        events.append(event)
        event_type = getattr(event, "type", "unknown")

        # Each event must survive model_dump_json()
        try:
            if hasattr(event, "model_dump_json"):
                payload = event.model_dump_json()
                assert payload is not None
        except Exception as exc:
            serialization_errors.append(f"Event type={event_type}: {exc}")

    assert len(events) > 0, "Should receive events from concurrent workflow"
    assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}"
    assert mock_client.call_count >= 3, f"Expected all 3 agents called, got {mock_client.call_count}"
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Test for Workflow with Output Events
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_workflow_output_event_serialization():
    """Outputs yielded via ctx.yield_output() stream as serializable output events.

    The start executor yields three outputs (two strings and a dict); the
    test expects at least three output_item events and no serialization
    failures when each event is dumped to JSON for SSE.
    """
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class OutputtingExecutor(Executor):
        """Executor that yields multiple outputs."""

        @handler
        async def process(self, input_text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"First output: {input_text}")
            await ctx.yield_output("Second output: processed")
            await ctx.yield_output({"final": "result", "data": [1, 2, 3]})

    # Single-node workflow around the outputting executor
    workflow = WorkflowBuilder(
        name="Output Workflow",
        description="Tests yield_output",
        start_executor=OutputtingExecutor(id="outputter"),
    ).build()

    # Wire up the DevUI executor and register the in-memory workflow
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    # Streaming run
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="Test output events",
        stream=True,
    )

    events = []
    output_events = []
    serialization_errors = []

    async for event in executor.execute_streaming(request):
        events.append(event)
        event_type = getattr(event, "type", "")

        # Keep the output item events aside for the count check below
        if "output_item" in event_type:
            output_events.append(event)

        try:
            if hasattr(event, "model_dump_json"):
                event.model_dump_json()
        except Exception as exc:
            serialization_errors.append(f"Event type={event_type}: {exc}")

    assert len(events) > 0, "Should receive events"
    assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}"

    # One output event is expected per yield_output call, at minimum
    assert len(output_events) >= 3, f"Expected 3+ output events for yield_output calls, got {len(output_events)}"
|
|
|
|
|
|
async def test_workflow_error_yields_dict_event_without_crash():
    """A failing workflow must not crash execute_entity (#3983).

    A workflow whose run() raises leads _execute_workflow to yield a raw
    dict {"type": "error", ...}. execute_entity therefore has to cope with
    dict events as well as object events, without tripping over attribute
    access on the dict.
    """
    from unittest.mock import AsyncMock, MagicMock

    from agent_framework_devui.models._discovery_models import EntityInfo

    discovery = MagicMock(spec=EntityDiscovery)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    discovery.get_entity_info.return_value = EntityInfo(
        id="bad_wf", name="bad_wf", type="workflow", framework="agent_framework"
    )

    # Stand-in workflow whose run() always blows up
    def failing_run(*args, **kwargs):
        raise RuntimeError("Sorry, something went wrong.")

    mock_workflow = MagicMock()
    mock_workflow.name = "bad_wf"
    mock_workflow.run = failing_run
    discovery.load_entity = AsyncMock(return_value=mock_workflow)

    request = AgentFrameworkRequest(
        model="test",
        input="hello",
        metadata={"entity_id": "bad_wf"},
    )

    # This should NOT raise AttributeError: 'dict' object has no attribute 'type'
    events = [event async for event in executor.execute_entity("bad_wf", request)]

    # The failure has to show up as at least one error dict
    assert len(events) > 0
    error_events = [e for e in events if isinstance(e, dict) and e.get("type") == "error"]
    assert len(error_events) > 0, f"Expected error dict events, got: {events}"
|
|
|
|
|
|
if __name__ == "__main__":
    # Simple test runner
    async def run_tests():
        """Smoke-run discovery plus sync and streaming execution against a temp agent.

        Writes a minimal agent module into a temporary directory, discovers
        it, and, if anything was discovered, executes the first entity once
        synchronously and once in streaming mode.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create test agent; the encoding is explicit so the written file
            # does not depend on the platform's locale default.
            agent_file = temp_path / "streaming_agent.py"
            agent_file.write_text(
                """
class StreamingAgent:
    name = "Streaming Test Agent"
    description = "Test agent for streaming"

    async def run(self, input_str, *, stream: bool = False, session=None, **kwargs):
        if stream:
            async def _stream():
                for i, word in enumerate(f"Processing {input_str}".split()):
                    yield f"word_{i}: {word} "
            return _stream()
        return f"Processing {input_str}"
""",
                encoding="utf-8",
            )

            discovery = EntityDiscovery(str(temp_path))
            mapper = MessageMapper()
            executor = AgentFrameworkExecutor(discovery, mapper)

            # Test discovery
            entities = await executor.discover_entities()
            if not entities:
                # Nothing discovered, so there is nothing to execute
                return

            # Test sync execution (use metadata.entity_id for routing)
            request = AgentFrameworkRequest(
                metadata={"entity_id": entities[0].id},
                input="test input",
                stream=False,
            )

            await executor.execute_sync(request)

            # Test streaming execution, reusing the same request object
            request.stream = True
            event_count = 0
            async for _event in executor.execute_streaming(request):
                event_count += 1
                if event_count > 5:  # Limit for testing
                    break

    asyncio.run(run_tests())
|