Files
agent-framework/python/packages/devui/tests/devui/test_execution.py
T
Eduard van Valkenburg b1b528e4a8 Python: [BREAKING] Remove deprecated kwargs compatibility paths (#4858)
* [BREAKING] Remove deprecated kwargs compatibility paths

Remove the deprecated kwargs compatibility shims across core agents, clients, tools, middleware, and telemetry.

Keep workflow kwargs behavior intact in this branch and follow up separately in #4850.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix PR CI fallout for kwargs removal

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR review feedback

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* updates

* Fix Azure AI CI fallout

Remove the stale _get_current_conversation_id override from the Azure AI client after the OpenAI base helper was deleted.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fixed new classes

* Fix Assistants deprecated import gating

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix integration replay regressions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Switch multi-agent hosting samples to Azure chat completions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Simplify Azure multi-agent sample config

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-27 21:00:12 +00:00

837 lines
30 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Focused tests for execution flow functionality.
Tests include:
- Entity discovery and info retrieval
- Agent execution (sync and streaming) using real Agent with mock LLM
- Workflow execution using real WorkflowBuilder with FunctionExecutor
- Edge cases like non-streaming agents
"""
import asyncio
import tempfile
from pathlib import Path
from typing import Any
import pytest
from agent_framework import Agent, AgentExecutor, FunctionExecutor, WorkflowBuilder
# Import mock classes from conftest for direct use in some tests
from conftest import MockBaseChatClient
from agent_framework_devui._discovery import EntityDiscovery
from agent_framework_devui._executor import AgentFrameworkExecutor, EntityNotFoundError
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
# =============================================================================
# Local Fixtures (module-specific)
# =============================================================================
@pytest.fixture
async def executor(test_entities_dir):
    """Build an AgentFrameworkExecutor over the test entities directory.

    Entity discovery is run once before the executor is handed to the test.
    """
    configured = AgentFrameworkExecutor(EntityDiscovery(test_entities_dir), MessageMapper())
    await configured.discover_entities()
    return configured
async def test_executor_entity_discovery(executor):
    """Discovery yields agents and workflows, each with an ID, name and known type."""
    discovered = await executor.discover_entities()

    # At least one agent and one workflow must be among the discovered entities.
    assert len(discovered) > 0, "Should discover at least one entity"
    found_types = [item.type for item in discovered]
    assert "agent" in found_types, "Should find at least one agent"
    assert "workflow" in found_types, "Should find at least one workflow"

    # An entity that consists only of an `__init__.py` has no determinable type
    # until its module is imported during lazy loading, hence 'unknown'.
    allowed_types = ["agent", "workflow", "unknown"]
    for item in discovered:
        assert item.id, "Entity should have an ID"
        assert item.name, "Entity should have a name"
        assert item.type in allowed_types, (
            "Entity should have valid type (unknown allowed during discovery phase)"
        )
async def test_executor_get_entity_info(executor):
    """Look up the first discovered entity by ID and check the returned info.

    Args:
        executor: Fixture-provided executor used for discovery and lookup.
    """
    entities = await executor.discover_entities()
    # Fail with a readable message rather than an IndexError when discovery
    # comes back empty.
    assert entities, "Should discover at least one entity"
    entity_id = entities[0].id

    entity_info = executor.get_entity_info(entity_id)

    assert entity_info is not None
    assert entity_info.id == entity_id
    assert entity_info.type in ["agent", "workflow", "unknown"]
# =============================================================================
# Agent Execution Tests (using real Agent with mock LLM)
# =============================================================================
async def test_agent_sync_execution(executor_with_real_agent):
    """Run a real Agent synchronously with the LLM call served by a mock client.

    No API key is needed: the Agent class (with middleware) and message
    normalization are real, and only the chat client is mocked.
    """
    runner, agent_id, chat_client = executor_with_real_agent

    sync_request = AgentFrameworkRequest(metadata={"entity_id": agent_id}, input="test data", stream=False)
    result = await runner.execute_sync(sync_request)

    # With no model given in the request, the response reports 'devui'.
    assert result.model == "devui"
    assert result.object == "response"
    assert len(result.output) > 0

    # The mock client must have been invoked exactly once.
    assert chat_client.call_count == 1
async def test_agent_sync_execution_respects_model_field(executor_with_real_agent):
    """A model name supplied on the request is reflected on the sync response."""
    runner, agent_id, _chat_client = executor_with_real_agent

    sync_request = AgentFrameworkRequest(
        metadata={"entity_id": agent_id},
        model="custom-model-name",
        input="test data",
        stream=False,
    )
    result = await runner.execute_sync(sync_request)

    # The response carries the model that was requested.
    assert result.model == "custom-model-name"
    assert result.object == "response"
    assert len(result.output) > 0
async def test_chat_client_receives_correct_messages(executor_with_real_agent):
    """Check what the mock chat client was handed after one sync execution.

    Exercises the real Agent's input normalization and its formatting of
    messages for the chat client.
    """
    runner, agent_id, chat_client = executor_with_real_agent

    await runner.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": agent_id},
            input="What is 2+2?",
            stream=False,
        )
    )

    # One call, carrying one recorded batch of messages.
    assert chat_client.call_count == 1
    assert len(chat_client.received_messages) == 1

    messages = chat_client.received_messages[0]
    assert len(messages) >= 1, f"Expected messages, got: {messages}"

    # The user's input text must appear somewhere in the forwarded messages.
    texts = [message.text or "" for message in messages]
    all_text = " ".join(texts)
    assert "2+2" in all_text, f"Expected '2+2' in messages, got text: '{all_text}'"
# =============================================================================
# Workflow Execution Tests (using real WorkflowBuilder with FunctionExecutor)
# =============================================================================
async def test_workflow_streaming_execution():
    """Stream a single-function workflow built with the real WorkflowBuilder.

    The workflow wraps one FunctionExecutor, so the whole pipeline runs
    without an API key.
    """

    def process_input(input_data: str) -> str:
        return f"Processed: {input_data}"

    workflow = WorkflowBuilder(
        name="Test Workflow",
        description="Test workflow for execution",
        start_executor=FunctionExecutor(id="process", func=process_input),
    ).build()

    # Register the in-memory workflow with a discovery that has no directory.
    discovery = EntityDiscovery(None)
    runner = AgentFrameworkExecutor(discovery, MessageMapper())
    workflow_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(workflow_info.id, workflow_info, workflow)

    streaming_request = AgentFrameworkRequest(
        metadata={"entity_id": workflow_info.id},
        input="hello workflow",
        stream=True,
    )
    received = [event async for event in runner.execute_streaming(streaming_request)]

    assert len(received) > 0, "Should receive events from workflow"

    # At least one of the events must carry a `type` attribute.
    seen_types = [getattr(event, "type", None) for event in received]
    assert any(kind is not None for kind in seen_types), f"Should have typed events, got: {seen_types}"
async def test_workflow_sync_execution():
    """Execute a one-step echo workflow synchronously and check the response."""

    def echo(text: str) -> str:
        return f"Echo: {text}"

    workflow = WorkflowBuilder(
        name="Echo Workflow",
        description="Simple echo workflow",
        start_executor=FunctionExecutor(id="echo", func=echo),
    ).build()

    # Register the in-memory workflow so the executor can resolve it by ID.
    discovery = EntityDiscovery(None)
    runner = AgentFrameworkExecutor(discovery, MessageMapper())
    workflow_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(workflow_info.id, workflow_info, workflow)

    sync_request = AgentFrameworkRequest(
        metadata={"entity_id": workflow_info.id},
        input="test input",
        stream=False,
    )
    result = await runner.execute_sync(sync_request)

    # A well-formed response object with some output is expected.
    assert result.object == "response"
    assert len(result.output) > 0
# =============================================================================
# Full Pipeline Serialization Tests (Run + Map + JSON)
# =============================================================================
async def test_full_pipeline_agent_events_are_json_serializable(executor_with_real_agent):
    """CRITICAL TEST: every event streamed from an agent run must serialize to JSON.

    The agent is run through executor.execute_streaming() and
    model_dump_json() is called on each yielded event that offers it, which is
    the call this test treats as the server's SSE serialization step. An event
    holding a non-serializable object (such as AgentResponse) is recorded as a
    failure and fails the test.
    """
    runner, agent_id, _chat_client = executor_with_real_agent
    streaming_request = AgentFrameworkRequest(
        metadata={"entity_id": agent_id},
        input="Test message for serialization",
        stream=True,
    )

    received = []
    failures = []
    async for event in runner.execute_streaming(streaming_request):
        received.append(event)
        # Failures are collected rather than raised so that all of them are reported.
        try:
            if hasattr(event, "model_dump_json"):
                payload = event.model_dump_json()
                assert payload is not None
                assert len(payload) > 0
        except Exception as exc:
            failures.append(f"Event type={getattr(event, 'type', 'unknown')}: {exc}")

    assert len(received) > 0, "Should receive events from agent execution"

    # Not a single serialization failure is tolerated.
    assert len(failures) == 0, f"Found {len(failures)} serialization errors:\n" + "\n".join(failures)
async def test_full_pipeline_workflow_events_are_json_serializable():
    """CRITICAL TEST: every event streamed from a workflow run must serialize to JSON.

    The workflow under test starts with an AgentExecutor. Per the original
    author's notes this is the problematic case: its executor_completed event
    carries an AgentExecutorResponse holding AgentResponse and Message objects,
    which are SerializationMixin objects rather than Pydantic models. The test
    drives the streaming pipeline end-to-end and then aggregates the events.
    """
    # Workflow whose start node is an AgentExecutor backed by the mock client.
    agent = Agent(
        id="serialization_test_agent",
        name="Serialization Test Agent",
        description="Agent for testing serialization",
        client=MockBaseChatClient(),
        instructions="You are a test assistant.",
    )
    workflow = WorkflowBuilder(
        name="Serialization Test Workflow",
        description="Test workflow",
        start_executor=AgentExecutor(id="agent_node", agent=agent),
    ).build()

    # Register the workflow with a fresh executor.
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    runner = AgentFrameworkExecutor(discovery, mapper)
    workflow_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(workflow_info.id, workflow_info, workflow)

    streaming_request = AgentFrameworkRequest(
        metadata={"entity_id": workflow_info.id},
        input="Test workflow serialization",
        stream=True,
    )

    received = []
    failures = []
    kinds_seen = []
    async for event in runner.execute_streaming(streaming_request):
        received.append(event)
        kind = getattr(event, "type", "unknown")
        kinds_seen.append(kind)
        # Failures are collected rather than raised so that all of them are reported.
        try:
            if hasattr(event, "model_dump_json"):
                payload = event.model_dump_json()
                assert payload is not None
                assert len(payload) > 0
        except Exception as exc:
            failures.append(f"Event type={kind}: {exc}")

    assert len(received) > 0, "Should receive events from workflow execution"

    # Workflow-specific output_item events must be present, not only generic ones.
    assert any("output_item" in str(kind) for kind in kinds_seen), (
        f"Should see output_item events, got: {kinds_seen}"
    )

    # The critical assertion: zero serialization failures.
    assert len(failures) == 0, (
        f"Found {len(failures)} serialization errors:\n"
        + "\n".join(failures)
        + f"\n\nEvent types seen: {kinds_seen}"
    )

    # Aggregating the collected events into a final response must also work.
    aggregated = await mapper.aggregate_to_response(received, streaming_request)
    assert aggregated is not None
async def test_get_entity_info_raises_for_invalid_id(executor):
    """Looking up an ID that was never registered raises EntityNotFoundError."""
    missing_id = "nonexistent_agent"
    with pytest.raises(EntityNotFoundError):
        executor.get_entity_info(missing_id)
async def test_request_extracts_entity_id_from_metadata(executor):
    """get_entity_id() returns the entity_id stored in the request metadata."""
    routed_request = AgentFrameworkRequest(metadata={"entity_id": "my_agent"}, input="test", stream=False)

    # The ID comes straight from the metadata mapping.
    assert routed_request.get_entity_id() == "my_agent"
@pytest.mark.asyncio
async def test_executor_get_start_executor_message_types(sequential_workflow):
    """_get_start_executor_message_types reports str for a real sequential workflow."""
    runner, _entity_id, _mock_client, workflow = sequential_workflow

    start_node, accepted_types = runner._get_start_executor_message_types(workflow)

    assert start_node is not None
    assert len(accepted_types) > 0
    # str input is accepted by real sequential workflows.
    assert str in accepted_types
def test_executor_select_primary_input_prefers_string():
    """str wins as primary input type even when it is listed after another type."""
    from agent_framework_devui._utils import select_primary_input_type

    other_type = type("Placeholder", (), {})
    candidates = [other_type, str]

    assert select_primary_input_type(candidates) is str
@pytest.mark.asyncio
async def test_executor_parse_structured_extracts_input_for_string_workflow():
    """A structured payload is reduced to its 'input' field for a str-accepting workflow."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class StringInputExecutor(Executor):
        """Start executor whose handler takes a plain string."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    string_workflow = WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=StringInputExecutor(id="str_exec"),
    ).build()
    runner = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # {"input": "hello"} sent to a str-expecting workflow yields just "hello".
    structured_payload = {"input": "hello"}
    assert runner._parse_structured_workflow_input(string_workflow, structured_payload) == "hello"
@pytest.mark.asyncio
async def test_executor_parse_raw_string_for_string_workflow():
    """A raw string is returned unchanged when the workflow accepts str."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class StringInputExecutor(Executor):
        """Start executor whose handler takes a plain string."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    string_workflow = WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=StringInputExecutor(id="str_exec"),
    ).build()
    runner = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # The text must come back exactly as it went in.
    raw_text = "hi there"
    assert runner._parse_raw_workflow_input(string_workflow, raw_text) == "hi there"
@pytest.mark.asyncio
async def test_executor_parse_converts_to_chat_message_for_sequential_workflow(sequential_workflow):
    """Raw string input to a sequential workflow is wrapped in a Message."""
    from agent_framework import Message

    runner, _entity_id, _mock_client, workflow = sequential_workflow

    # Sequential workflows expect Message input, so the string is converted.
    converted = runner._parse_raw_workflow_input(workflow, "hello")

    assert isinstance(converted, Message)
    assert converted.text == "hello"
@pytest.mark.asyncio
async def test_executor_parse_stringified_json_workflow_input():
    """A JSON string is parsed into the Pydantic model the workflow expects."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
    from pydantic import BaseModel

    class WorkflowInput(BaseModel):
        input: str
        metadata: dict | None = None

    class PydanticInputExecutor(Executor):
        """Start executor whose handler takes a WorkflowInput model."""

        @handler
        async def process(self, data: WorkflowInput, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {data.input}")

    # Workflow whose start executor takes the Pydantic model.
    model_workflow = WorkflowBuilder(
        name="Pydantic Workflow",
        description="Accepts Pydantic input",
        start_executor=PydanticInputExecutor(id="pydantic_exec"),
    ).build()
    runner = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # What a frontend would send after JSON.stringify({"input": ..., "metadata": ...}).
    stringified_json = '{"input": "testing!", "metadata": {"key": "value"}}'
    result = runner._parse_raw_workflow_input(model_workflow, stringified_json)

    # The string must have been turned into a populated WorkflowInput.
    assert isinstance(result, WorkflowInput)
    assert result.input == "testing!"
    assert result.metadata == {"key": "value"}
def test_extract_workflow_hil_responses_handles_stringified_json():
    """HIL response extraction accepts stringified and already-parsed JSON (regression test)."""
    from agent_framework_devui._discovery import EntityDiscovery
    from agent_framework_devui._executor import AgentFrameworkExecutor
    from agent_framework_devui._mapper import MessageMapper

    runner = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())
    extract = runner._extract_workflow_hil_responses

    # Regression case: the frontend's streamWorkflowExecutionOpenAI sends a JSON string.
    as_string = '[{"type":"message","content":[{"type":"workflow_hil_response","responses":{"req_1":"spam"}}]}]'
    assert extract(as_string) == {"req_1": "spam"}

    # The already-parsed structure must keep working.
    as_objects = [{"type": "message", "content": [{"type": "workflow_hil_response", "responses": {"req_2": "ham"}}]}]
    assert extract(as_objects) == {"req_2": "ham"}

    # Inputs that are not HIL responses give None.
    assert extract("plain text") is None
    assert extract({"email": "test"}) is None
async def test_executor_handles_streaming_agent():
    """The executor streams from an agent whose run() takes a `stream` flag."""
    from agent_framework import AgentResponse, AgentResponseUpdate, AgentSession, Content, Message

    class StreamingAgent:
        """Minimal agent whose run() serves both streaming and non-streaming calls."""

        id = "streaming_test"
        name = "Streaming Test Agent"
        description = "Test agent with run(stream=True)"

        def run(self, messages=None, *, stream=False, session=None, **kwargs):
            if not stream:
                # Non-streaming: hand back an awaitable.
                return self._run_impl(messages)
            # Streaming: hand back an async generator.
            return self._stream_impl(messages)

        async def _run_impl(self, messages):
            reply = Message(role="assistant", contents=[Content.from_text(text=f"Processed: {messages}")])
            return AgentResponse(
                messages=[reply],
                response_id="test_123",
            )

        async def _stream_impl(self, messages):
            yield AgentResponseUpdate(
                contents=[Content.from_text(text=f"Processed: {messages}")],
                role="assistant",
            )

        def create_session(self, **kwargs):
            return AgentSession()

    # Register the hand-written agent with a fresh executor.
    discovery = EntityDiscovery(None)
    runner = AgentFrameworkExecutor(discovery, MessageMapper())
    streaming_agent = StreamingAgent()
    agent_info = await discovery.create_entity_info_from_object(streaming_agent, source="test")
    discovery.register_entity(agent_info.id, agent_info, streaming_agent)

    # Routing goes through metadata.entity_id; the request asks for streaming.
    streaming_request = AgentFrameworkRequest(
        metadata={"entity_id": agent_info.id},
        input="hello",
        stream=True,
    )
    received = [event async for event in runner.execute_streaming(streaming_request)]

    assert len(received) > 0

    # At least one text delta must carry the agent's processed output.
    deltas = [event for event in received if hasattr(event, "type") and event.type == "response.output_text.delta"]
    assert len(deltas) > 0
    assert "Processed: hello" in deltas[0].delta
# =============================================================================
# Full Pipeline Tests for SequentialBuilder
# =============================================================================
@pytest.mark.asyncio
async def test_full_pipeline_sequential_workflow(sequential_workflow):
    """Stream a SequentialBuilder workflow and JSON-serialize every event.

    Relies on the shared sequential_workflow fixture (Writer → Reviewer) from
    conftest and checks that each event can be serialized for SSE streaming.
    """
    runner, workflow_id, chat_client, _workflow = sequential_workflow
    streaming_request = AgentFrameworkRequest(
        metadata={"entity_id": workflow_id},
        input="Write about testing best practices",
        stream=True,
    )

    received = []
    failures = []
    async for event in runner.execute_streaming(streaming_request):
        received.append(event)
        kind = getattr(event, "type", "unknown")
        # Serialize the way SSE delivery would; collect failures instead of raising.
        try:
            if hasattr(event, "model_dump_json"):
                payload = event.model_dump_json()
                assert payload is not None
        except Exception as exc:
            failures.append(f"Event type={kind}: {exc}")

    assert len(received) > 0, "Should receive events from sequential workflow"
    assert len(failures) == 0, f"Serialization errors: {failures}"
    assert chat_client.call_count >= 2, f"Expected both agents called, got {chat_client.call_count}"
@pytest.mark.asyncio
async def test_full_pipeline_concurrent_workflow(concurrent_workflow):
    """Stream a ConcurrentBuilder workflow and JSON-serialize every event.

    Relies on the shared concurrent_workflow fixture
    (Researcher | Analyst | Summarizer) from conftest, covering the
    fan-out/fan-in pattern with agents executed in parallel.
    """
    runner, workflow_id, chat_client, _workflow = concurrent_workflow
    streaming_request = AgentFrameworkRequest(
        metadata={"entity_id": workflow_id},
        input="Analyze market trends for Q4",
        stream=True,
    )

    received = []
    failures = []
    async for event in runner.execute_streaming(streaming_request):
        received.append(event)
        kind = getattr(event, "type", "unknown")
        # Collect serialization failures instead of raising on the first one.
        try:
            if hasattr(event, "model_dump_json"):
                payload = event.model_dump_json()
                assert payload is not None
        except Exception as exc:
            failures.append(f"Event type={kind}: {exc}")

    assert len(received) > 0, "Should receive events from concurrent workflow"
    assert len(failures) == 0, f"Serialization errors: {failures}"
    assert chat_client.call_count >= 3, f"Expected all 3 agents called, got {chat_client.call_count}"
# =============================================================================
# Full Pipeline Test for Workflow with Output Events
# =============================================================================
@pytest.mark.asyncio
async def test_full_pipeline_workflow_output_event_serialization():
    """Output events (type='output') produced by ctx.yield_output() serialize correctly.

    The start executor yields three outputs via ctx.yield_output(); each one
    emits an output event that DevUI has to serialize for SSE.
    """
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class OutputtingExecutor(Executor):
        """Start executor that yields two strings and one dict."""

        @handler
        async def process(self, input_text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"First output: {input_text}")
            await ctx.yield_output("Second output: processed")
            await ctx.yield_output({"final": "result", "data": [1, 2, 3]})

    output_workflow = WorkflowBuilder(
        name="Output Workflow",
        description="Tests yield_output",
        start_executor=OutputtingExecutor(id="outputter"),
    ).build()

    # Register the workflow with a fresh DevUI executor.
    discovery = EntityDiscovery(None)
    runner = AgentFrameworkExecutor(discovery, MessageMapper())
    workflow_info = await discovery.create_entity_info_from_object(
        output_workflow, entity_type="workflow", source="test"
    )
    discovery.register_entity(workflow_info.id, workflow_info, output_workflow)

    streaming_request = AgentFrameworkRequest(
        metadata={"entity_id": workflow_info.id},
        input="Test output events",
        stream=True,
    )

    received = []
    output_items = []
    failures = []
    async for event in runner.execute_streaming(streaming_request):
        received.append(event)
        kind = getattr(event, "type", "")
        # Keep the output item events aside for the count check below.
        if "output_item" in kind:
            output_items.append(event)
        try:
            if hasattr(event, "model_dump_json"):
                event.model_dump_json()
        except Exception as exc:
            failures.append(f"Event type={kind}: {exc}")

    assert len(received) > 0, "Should receive events"
    assert len(failures) == 0, f"Serialization errors: {failures}"
    # Each yield_output call should have produced at least one output event.
    assert len(output_items) >= 3, f"Expected 3+ output events for yield_output calls, got {len(output_items)}"
async def test_workflow_error_yields_dict_event_without_crash():
    """A failing workflow must not crash execute_entity (#3983).

    When the workflow raises, _execute_workflow yields a raw dict of the form
    {"type": "error", ...}. execute_entity therefore has to cope with dict
    events as well as object events without tripping over attribute access.
    """
    from unittest.mock import AsyncMock, MagicMock

    from agent_framework_devui.models._discovery_models import EntityInfo

    def failing_run(*args, **kwargs):
        raise RuntimeError("Sorry, something went wrong.")

    # Stand-in workflow whose run() always raises.
    broken_workflow = MagicMock()
    broken_workflow.name = "bad_wf"
    broken_workflow.run = failing_run

    # Discovery double that resolves "bad_wf" to the broken workflow.
    discovery = MagicMock(spec=EntityDiscovery)
    runner = AgentFrameworkExecutor(discovery, MessageMapper())
    discovery.get_entity_info.return_value = EntityInfo(
        id="bad_wf", name="bad_wf", type="workflow", framework="agent_framework"
    )
    discovery.load_entity = AsyncMock(return_value=broken_workflow)

    failing_request = AgentFrameworkRequest(
        model="test",
        input="hello",
        metadata={"entity_id": "bad_wf"},
    )

    # Must NOT raise AttributeError: 'dict' object has no attribute 'type'
    events = [event async for event in runner.execute_entity("bad_wf", failing_request)]

    # At least one dict-shaped error event is expected.
    assert len(events) > 0
    error_dicts = [item for item in events if isinstance(item, dict) and item.get("type") == "error"]
    assert len(error_dicts) > 0, f"Expected error dict events, got: {events}"
if __name__ == "__main__":
    # Simple manual runner: writes a throwaway agent module to a temp directory,
    # discovers it, and drives one sync and one streaming execution.

    async def run_tests():
        """Discover a generated agent and exercise sync and streaming execution."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create test agent. The embedded source is written to disk as a
            # Python module, so its indentation is significant and is kept
            # flush-left relative to the generated file.
            # NOTE(review): run() below is a coroutine function, whereas the agent in
            # test_executor_handles_streaming_agent uses a plain def that returns an
            # async generator when stream=True; confirm the executor accepts both.
            agent_file = temp_path / "streaming_agent.py"
            agent_file.write_text("""
class StreamingAgent:
    name = "Streaming Test Agent"
    description = "Test agent for streaming"
    async def run(self, input_str, *, stream: bool = False, session=None, **kwargs):
        if stream:
            async def _stream():
                for i, word in enumerate(f"Processing {input_str}".split()):
                    yield f"word_{i}: {word} "
            return _stream()
        return f"Processing {input_str}"
""")

            discovery = EntityDiscovery(str(temp_path))
            mapper = MessageMapper()
            executor = AgentFrameworkExecutor(discovery, mapper)

            # Test discovery
            entities = await executor.discover_entities()
            if entities:
                # Test sync execution (use metadata.entity_id for routing)
                request = AgentFrameworkRequest(
                    metadata={"entity_id": entities[0].id},
                    input="test input",
                    stream=False,
                )
                await executor.execute_sync(request)

                # Test streaming execution, reusing the same request
                request.stream = True
                event_count = 0
                async for _event in executor.execute_streaming(request):
                    event_count += 1
                    if event_count > 5:  # Limit for testing
                        break

    asyncio.run(run_tests())