# Copyright (c) Microsoft. All rights reserved.

"""Focused tests for execution flow functionality.

Tests include:
- Entity discovery and info retrieval
- Agent execution (sync and streaming) using real Agent with mock LLM
- Workflow execution using real WorkflowBuilder with FunctionExecutor
- Edge cases like non-streaming agents
"""

import asyncio
import tempfile
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock

import pytest
from agent_framework import (
    Agent,
    AgentExecutor,
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    Content,
    Executor,
    FunctionExecutor,
    Message,
    WorkflowBuilder,
    WorkflowContext,
    handler,
)

# Import mock classes from conftest for direct use in some tests
from conftest import MockBaseChatClient
from pydantic import BaseModel

from agent_framework_devui._discovery import EntityDiscovery
from agent_framework_devui._executor import AgentFrameworkExecutor, EntityNotFoundError
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui._utils import select_primary_input_type
from agent_framework_devui.models._discovery_models import EntityInfo
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest

# =============================================================================
# Local Helpers (module-specific)
# =============================================================================


async def _register_entity(entity: Any, **info_kwargs: Any) -> tuple[AgentFrameworkExecutor, MessageMapper, str]:
    """Register an in-memory entity with a fresh DevUI executor.

    Args:
        entity: The agent or workflow object to register.
        **info_kwargs: Extra keyword arguments forwarded to
            ``EntityDiscovery.create_entity_info_from_object`` (for example
            ``entity_type="workflow"``).

    Returns:
        A ``(executor, mapper, entity_id)`` tuple. The mapper is returned so
        callers can aggregate streamed events with the same instance the
        executor uses.
    """
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    devui_executor = AgentFrameworkExecutor(discovery, mapper)

    entity_info = await discovery.create_entity_info_from_object(entity, source="test", **info_kwargs)
    discovery.register_entity(entity_info.id, entity_info, entity)
    return devui_executor, mapper, entity_info.id


async def _stream_and_check_serialization(
    executor: AgentFrameworkExecutor,
    request: AgentFrameworkRequest,
) -> tuple[list[Any], list[str]]:
    """Stream a request and JSON-serialize every event that supports it.

    Each event exposing ``model_dump_json`` is serialized; events without that
    method (e.g. plain dicts) are collected but not serialized.

    Args:
        executor: The DevUI executor to stream from.
        request: The request to execute.

    Returns:
        A ``(events, errors)`` tuple where ``errors`` holds one human-readable
        message per event that raised during serialization or produced an
        empty payload.
    """
    events: list[Any] = []
    errors: list[str] = []

    async for event in executor.execute_streaming(request):
        events.append(event)
        event_type = getattr(event, "type", "unknown")
        try:
            if not hasattr(event, "model_dump_json"):
                continue
            payload = event.model_dump_json()
        except Exception as exc:  # any failure here is exactly what these tests must report
            errors.append(f"Event type={event_type}: {exc}")
            continue
        # Report empty payloads explicitly rather than via an assert whose
        # message would be lost inside the except clause above.
        if not payload:
            errors.append(f"Event type={event_type}: serialized to an empty payload")

    return events, errors


def _build_string_workflow() -> Any:
    """Build a single-executor workflow whose start handler accepts ``str`` input."""

    class StringInputExecutor(Executor):
        """Executor that accepts string input directly."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    return WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=StringInputExecutor(id="str_exec"),
    ).build()


# =============================================================================
# Local Fixtures (module-specific)
# =============================================================================


@pytest.fixture
async def executor(test_entities_dir):
    """Create configured executor."""
    discovery = EntityDiscovery(test_entities_dir)
    mapper = MessageMapper()
    configured = AgentFrameworkExecutor(discovery, mapper)

    # Discover entities
    await configured.discover_entities()

    return configured


async def test_executor_entity_discovery(executor):
    """Test executor entity discovery."""
    entities = await executor.discover_entities()

    # Should find entities from samples directory
    assert len(entities) > 0, "Should discover at least one entity"

    entity_types = [e.type for e in entities]
    assert "agent" in entity_types, "Should find at least one agent"
    assert "workflow" in entity_types, "Should find at least one workflow"

    # Test entity structure
    for entity in entities:
        assert entity.id, "Entity should have an ID"
        assert entity.name, "Entity should have a name"
        # Entities with only an `__init__.py` file cannot have their type determined
        # until the module is imported during lazy loading. This is why 'unknown' type exists.
        assert entity.type in ["agent", "workflow", "unknown"], (
            "Entity should have valid type (unknown allowed during discovery phase)"
        )


async def test_executor_get_entity_info(executor):
    """Test getting entity info by ID."""
    entities = await executor.discover_entities()
    # Fail with a readable message instead of an IndexError when discovery is empty.
    assert entities, "Should discover at least one entity"
    entity_id = entities[0].id

    entity_info = executor.get_entity_info(entity_id)
    assert entity_info is not None
    assert entity_info.id == entity_id
    assert entity_info.type in ["agent", "workflow", "unknown"]


# =============================================================================
# Agent Execution Tests (using real Agent with mock LLM)
# =============================================================================


async def test_agent_sync_execution(executor_with_real_agent):
    """Test synchronous agent execution with REAL Agent (mock LLM).

    This tests the full execution pipeline without needing an API key:
    - Real Agent class with middleware
    - Real message normalization
    - Mock chat client for LLM calls
    """
    executor, entity_id, mock_client = executor_with_real_agent

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="test data",
        stream=False,
    )

    response = await executor.execute_sync(request)

    # Response model should be 'devui' when not specified
    assert response.model == "devui"
    assert response.object == "response"
    assert len(response.output) > 0

    # Verify mock client was called
    assert mock_client.call_count == 1


async def test_agent_sync_execution_respects_model_field(executor_with_real_agent):
    """Test synchronous execution respects the model field in the response."""
    executor, entity_id, _mock_client = executor_with_real_agent

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        model="custom-model-name",
        input="test data",
        stream=False,
    )

    response = await executor.execute_sync(request)

    # Response model should reflect the specified model
    assert response.model == "custom-model-name"
    assert response.object == "response"
    assert len(response.output) > 0


async def test_chat_client_receives_correct_messages(executor_with_real_agent):
    """Verify the mock chat client receives properly formatted messages.

    This tests that the REAL Agent properly:
    - Normalizes input messages
    - Formats messages for the chat client
    """
    executor, entity_id, mock_client = executor_with_real_agent

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="What is 2+2?",
        stream=False,
    )

    await executor.execute_sync(request)

    # Verify chat client was called
    assert mock_client.call_count == 1

    # Verify messages were received
    assert len(mock_client.received_messages) == 1
    messages = mock_client.received_messages[0]

    # Should have at least one message
    assert len(messages) >= 1, f"Expected messages, got: {messages}"

    # Verify the input text is present in the messages
    all_text = " ".join(m.text or "" for m in messages)
    assert "2+2" in all_text, f"Expected '2+2' in messages, got text: '{all_text}'"


# =============================================================================
# Workflow Execution Tests (using real WorkflowBuilder with FunctionExecutor)
# =============================================================================


async def test_workflow_streaming_execution():
    """Test workflow streaming execution with REAL WorkflowBuilder and FunctionExecutor.

    This tests the full workflow execution pipeline without needing an API key.
    Uses a simple function-based workflow that processes input.
    """

    # Create a simple workflow using real agent_framework classes
    def process_input(input_data: str) -> str:
        return f"Processed: {input_data}"

    start_executor = FunctionExecutor(id="process", func=process_input)
    workflow = WorkflowBuilder(
        name="Test Workflow",
        description="Test workflow for execution",
        start_executor=start_executor,
    ).build()

    # Create executor and register workflow
    executor, _mapper, entity_id = await _register_entity(workflow, entity_type="workflow")

    # Execute workflow
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="hello workflow",
        stream=True,
    )

    events = [event async for event in executor.execute_streaming(request)]

    # Should get events from workflow execution
    assert len(events) > 0, "Should receive events from workflow"

    # Check for workflow-specific events or completion
    event_types = [getattr(e, "type", None) for e in events]
    assert any(t is not None for t in event_types), f"Should have typed events, got: {event_types}"


async def test_workflow_sync_execution():
    """Test synchronous workflow execution."""

    def echo(text: str) -> str:
        return f"Echo: {text}"

    start_executor = FunctionExecutor(id="echo", func=echo)
    workflow = WorkflowBuilder(
        name="Echo Workflow",
        description="Simple echo workflow",
        start_executor=start_executor,
    ).build()

    # Create executor and register workflow
    executor, _mapper, entity_id = await _register_entity(workflow, entity_type="workflow")

    # Execute workflow synchronously
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="test input",
        stream=False,
    )

    response = await executor.execute_sync(request)

    # Should get a valid response
    assert response.object == "response"
    assert len(response.output) > 0


# =============================================================================
# Full Pipeline Serialization Tests (Run + Map + JSON)
# =============================================================================


async def test_full_pipeline_agent_events_are_json_serializable(executor_with_real_agent):
    """CRITICAL TEST: Verify ALL events from agent execution can be JSON serialized.

    This tests the exact code path that the server uses:
    1. Execute agent via executor.execute_streaming()
    2. Each event is converted by the mapper
    3. Server calls model_dump_json() on each event for SSE

    If any event contains non-serializable objects (like AgentResponse),
    this test will fail - catching the bug before it hits production.
    """
    executor, entity_id, _mock_client = executor_with_real_agent

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Test message for serialization",
        stream=True,
    )

    # Serializing each event is EXACTLY what the server does before sending SSE
    events, serialization_errors = await _stream_and_check_serialization(executor, request)

    # Should have received events
    assert len(events) > 0, "Should receive events from agent execution"

    # NO serialization errors allowed
    assert len(serialization_errors) == 0, f"Found {len(serialization_errors)} serialization errors:\n" + "\n".join(
        serialization_errors
    )


async def test_full_pipeline_workflow_events_are_json_serializable():
    """CRITICAL TEST: Verify ALL events from workflow execution can be JSON serialized.

    This is particularly important for workflows with AgentExecutor because:
    - AgentExecutor produces executor_completed event (type='executor_completed') with AgentExecutorResponse
    - AgentExecutorResponse contains AgentResponse and Message objects
    - These are SerializationMixin objects, not Pydantic, which caused the original bug

    This test ensures the ENTIRE streaming pipeline works end-to-end.
    """
    # Create a workflow with AgentExecutor (the problematic case)
    mock_client = MockBaseChatClient()
    agent = Agent(
        id="serialization_test_agent",
        name="Serialization Test Agent",
        description="Agent for testing serialization",
        client=mock_client,
        instructions="You are a test assistant.",
    )

    agent_executor = AgentExecutor(id="agent_node", agent=agent)
    workflow = WorkflowBuilder(
        name="Serialization Test Workflow",
        description="Test workflow",
        start_executor=agent_executor,
    ).build()

    # Create executor and register
    executor, mapper, entity_id = await _register_entity(workflow, entity_type="workflow")

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Test workflow serialization",
        stream=True,
    )

    # Serializing each event is EXACTLY what the server does before sending SSE
    events, serialization_errors = await _stream_and_check_serialization(executor, request)
    event_types_seen = [getattr(event, "type", "unknown") for event in events]

    # Should have received events
    assert len(events) > 0, "Should receive events from workflow execution"

    # Verify we got workflow events (not just generic ones)
    assert any("output_item" in str(t) for t in event_types_seen), (
        f"Should see output_item events, got: {event_types_seen}"
    )

    # NO serialization errors allowed - this is the critical assertion
    assert len(serialization_errors) == 0, (
        f"Found {len(serialization_errors)} serialization errors:\n"
        + "\n".join(serialization_errors)
        + f"\n\nEvent types seen: {event_types_seen}"
    )

    # Also verify aggregate_to_response works (server calls this after streaming)
    final_response = await mapper.aggregate_to_response(events, request)
    assert final_response is not None


async def test_get_entity_info_raises_for_invalid_id(executor):
    """Test that get_entity_info raises EntityNotFoundError for invalid ID."""
    with pytest.raises(EntityNotFoundError):
        executor.get_entity_info("nonexistent_agent")


async def test_request_extracts_entity_id_from_metadata():
    """Test that AgentFrameworkRequest extracts entity_id from metadata."""
    request = AgentFrameworkRequest(
        metadata={"entity_id": "my_agent"},
        input="test",
        stream=False,
    )

    # entity_id is extracted from metadata
    entity_id = request.get_entity_id()
    assert entity_id == "my_agent"


@pytest.mark.asyncio
async def test_executor_get_start_executor_message_types(sequential_workflow):
    """Test _get_start_executor_message_types with real workflow."""
    executor, _entity_id, _mock_client, workflow = sequential_workflow

    start_exec, message_types = executor._get_start_executor_message_types(workflow)

    assert start_exec is not None
    assert len(message_types) > 0
    # Real sequential workflows accept str input
    assert str in message_types


def test_executor_select_primary_input_prefers_string():
    """Select string input even when discovered after other handlers."""
    placeholder_type = type("Placeholder", (), {})

    chosen = select_primary_input_type([placeholder_type, str])

    assert chosen is str


@pytest.mark.asyncio
async def test_executor_parse_structured_extracts_input_for_string_workflow():
    """Structured payloads extract 'input' field when workflow expects str."""
    workflow = _build_string_workflow()
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # When workflow expects str and receives {"input": "hello"}, extract "hello"
    parsed = executor._parse_structured_workflow_input(workflow, {"input": "hello"})

    assert parsed == "hello"


@pytest.mark.asyncio
async def test_executor_parse_raw_string_for_string_workflow():
    """Raw string inputs pass through for string-accepting workflows."""
    workflow = _build_string_workflow()
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Raw string should pass through unchanged
    parsed = executor._parse_raw_workflow_input(workflow, "hi there")

    assert parsed == "hi there"


@pytest.mark.asyncio
async def test_executor_parse_converts_to_chat_message_for_sequential_workflow(sequential_workflow):
    """Sequential workflows convert string input to Message."""
    executor, _entity_id, _mock_client, workflow = sequential_workflow

    # Sequential workflows expect Message, so raw string becomes Message
    parsed = executor._parse_raw_workflow_input(workflow, "hello")

    assert isinstance(parsed, Message)
    assert parsed.text == "hello"


@pytest.mark.asyncio
async def test_executor_parse_stringified_json_workflow_input():
    """Stringified JSON workflow input is parsed when workflow expects Pydantic model."""

    class WorkflowInput(BaseModel):
        input: str
        metadata: dict | None = None

    class PydanticInputExecutor(Executor):
        """Executor that accepts a Pydantic model input."""

        @handler
        async def process(self, data: WorkflowInput, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {data.input}")

    # Build workflow with Pydantic input type
    workflow = WorkflowBuilder(
        name="Pydantic Workflow",
        description="Accepts Pydantic input",
        start_executor=PydanticInputExecutor(id="pydantic_exec"),
    ).build()

    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Simulate frontend sending JSON.stringify({"input": "testing!", "metadata": {"key": "value"}})
    stringified_json = '{"input": "testing!", "metadata": {"key": "value"}}'

    parsed = executor._parse_raw_workflow_input(workflow, stringified_json)

    # Should parse into WorkflowInput object
    assert isinstance(parsed, WorkflowInput)
    assert parsed.input == "testing!"
    assert parsed.metadata == {"key": "value"}


def test_extract_workflow_hil_responses_handles_stringified_json():
    """Test HIL response extraction handles both stringified and parsed JSON (regression test)."""
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Regression test: Frontend sends stringified JSON via streamWorkflowExecutionOpenAI
    stringified = '[{"type":"message","content":[{"type":"workflow_hil_response","responses":{"req_1":"spam"}}]}]'
    assert executor._extract_workflow_hil_responses(stringified) == {"req_1": "spam"}

    # Ensure parsed format still works
    parsed = [{"type": "message", "content": [{"type": "workflow_hil_response", "responses": {"req_2": "ham"}}]}]
    assert executor._extract_workflow_hil_responses(parsed) == {"req_2": "ham"}

    # Non-HIL inputs should return None
    assert executor._extract_workflow_hil_responses("plain text") is None
    assert executor._extract_workflow_hil_responses({"email": "test"}) is None


async def test_executor_handles_streaming_agent():
    """Test executor handles agents with run(stream=True) method."""

    class StreamingAgent:
        """Agent with run() method supporting stream parameter."""

        id = "streaming_test"
        name = "Streaming Test Agent"
        description = "Test agent with run(stream=True)"

        def run(self, messages=None, *, stream=False, session=None, **kwargs):
            if stream:
                # Return an async generator for streaming
                return self._stream_impl(messages)
            # Return awaitable for non-streaming
            return self._run_impl(messages)

        async def _run_impl(self, messages):
            return AgentResponse(
                messages=[Message(role="assistant", contents=[Content.from_text(text=f"Processed: {messages}")])],
                response_id="test_123",
            )

        async def _stream_impl(self, messages):
            yield AgentResponseUpdate(
                contents=[Content.from_text(text=f"Processed: {messages}")],
                role="assistant",
            )

        def create_session(self, **kwargs):
            return AgentSession()

    # Create executor and register agent
    executor, _mapper, entity_id = await _register_entity(StreamingAgent())

    # Execute streaming agent (use metadata.entity_id for routing)
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="hello",
        stream=True,  # DevUI always streams
    )

    events = [event async for event in executor.execute_streaming(request)]

    # Should get events from streaming agent
    assert len(events) > 0
    text_events = [e for e in events if hasattr(e, "type") and e.type == "response.output_text.delta"]
    assert len(text_events) > 0
    assert "Processed: hello" in text_events[0].delta


# =============================================================================
# Full Pipeline Tests for SequentialBuilder
# =============================================================================


@pytest.mark.asyncio
async def test_full_pipeline_sequential_workflow(sequential_workflow):
    """Test SequentialBuilder workflow full pipeline with JSON serialization.

    Uses the shared sequential_workflow fixture (Writer → Reviewer) from conftest.
    Tests that all events can be JSON serialized for SSE streaming.
    """
    executor, entity_id, mock_client, _workflow = sequential_workflow

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Write about testing best practices",
        stream=True,
    )

    # Verify JSON serialization (exactly what server does for SSE)
    events, serialization_errors = await _stream_and_check_serialization(executor, request)

    assert len(events) > 0, "Should receive events from sequential workflow"
    assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}"
    assert mock_client.call_count >= 2, f"Expected both agents called, got {mock_client.call_count}"


@pytest.mark.asyncio
async def test_full_pipeline_concurrent_workflow(concurrent_workflow):
    """Test ConcurrentBuilder workflow full pipeline with JSON serialization.

    Uses the shared concurrent_workflow fixture (Researcher | Analyst | Summarizer) from conftest.
    Tests fan-out/fan-in pattern with parallel agent execution.
    """
    executor, entity_id, mock_client, _workflow = concurrent_workflow

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Analyze market trends for Q4",
        stream=True,
    )

    # Verify JSON serialization
    events, serialization_errors = await _stream_and_check_serialization(executor, request)

    assert len(events) > 0, "Should receive events from concurrent workflow"
    assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}"
    assert mock_client.call_count >= 3, f"Expected all 3 agents called, got {mock_client.call_count}"


# =============================================================================
# Full Pipeline Test for Workflow with Output Events
# =============================================================================


@pytest.mark.asyncio
async def test_full_pipeline_workflow_output_event_serialization():
    """Test that output event (type='output') from ctx.yield_output() serializes correctly.

    This tests the pattern where executors yield output via ctx.yield_output(),
    which emits output event (type='output') that DevUI must serialize for SSE.
    """

    class OutputtingExecutor(Executor):
        """Executor that yields multiple outputs."""

        @handler
        async def process(self, input_text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"First output: {input_text}")
            await ctx.yield_output("Second output: processed")
            await ctx.yield_output({"final": "result", "data": [1, 2, 3]})

    # Build workflow
    workflow = WorkflowBuilder(
        name="Output Workflow",
        description="Tests yield_output",
        start_executor=OutputtingExecutor(id="outputter"),
    ).build()

    # Create DevUI executor and register workflow
    executor, _mapper, entity_id = await _register_entity(workflow, entity_type="workflow")

    # Execute with streaming
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Test output events",
        stream=True,
    )

    events, serialization_errors = await _stream_and_check_serialization(executor, request)

    # Track output item events; str() keeps the membership test safe for
    # events whose `type` attribute is not a string.
    output_events = [event for event in events if "output_item" in str(getattr(event, "type", ""))]

    assert len(events) > 0, "Should receive events"
    assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}"

    # Should have received output events for the yield_output calls
    assert len(output_events) >= 3, f"Expected 3+ output events for yield_output calls, got {len(output_events)}"


async def test_workflow_error_yields_dict_event_without_crash():
    """Test that workflow errors don't crash execute_entity (#3983).

    When a workflow raises an exception, _execute_workflow yields a raw dict
    {"type": "error", ...}. The execute_entity caller must handle both dict
    events and object events without crashing on attribute access.
    """
    discovery = MagicMock(spec=EntityDiscovery)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    entity_info = EntityInfo(id="bad_wf", name="bad_wf", type="workflow", framework="agent_framework")
    discovery.get_entity_info.return_value = entity_info

    # Mock workflow whose run() raises
    mock_workflow = MagicMock()
    mock_workflow.name = "bad_wf"

    def failing_run(*args, **kwargs):
        raise RuntimeError("Sorry, something went wrong.")

    mock_workflow.run = failing_run
    discovery.load_entity = AsyncMock(return_value=mock_workflow)

    request = AgentFrameworkRequest(
        model="test",
        input="hello",
        metadata={"entity_id": "bad_wf"},
    )

    # This should NOT raise AttributeError: 'dict' object has no attribute 'type'
    events = [event async for event in executor.execute_entity("bad_wf", request)]

    # Should get at least one error event
    assert len(events) > 0
    error_events = [e for e in events if isinstance(e, dict) and e.get("type") == "error"]
    assert len(error_events) > 0, f"Expected error dict events, got: {events}"


if __name__ == "__main__":
    # Simple test runner
    async def run_tests():
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create test agent
            agent_file = temp_path / "streaming_agent.py"
            agent_file.write_text(
                """
class StreamingAgent:
    name = "Streaming Test Agent"
    description = "Test agent for streaming"

    async def run(self, input_str, *, stream: bool = False, session=None, **kwargs):
        if stream:
            async def _stream():
                for i, word in enumerate(f"Processing {input_str}".split()):
                    yield f"word_{i}: {word} "
            return _stream()
        return f"Processing {input_str}"
""",
                encoding="utf-8",
            )

            discovery = EntityDiscovery(str(temp_path))
            mapper = MessageMapper()
            executor = AgentFrameworkExecutor(discovery, mapper)

            # Test discovery
            entities = await executor.discover_entities()

            if entities:
                # Test sync execution (use metadata.entity_id for routing)
                request = AgentFrameworkRequest(
                    metadata={"entity_id": entities[0].id},
                    input="test input",
                    stream=False,
                )

                await executor.execute_sync(request)

                # Test streaming execution
                request.stream = True
                event_count = 0
                async for _event in executor.execute_streaming(request):
                    event_count += 1
                    if event_count > 5:  # Limit for testing
                        break

    asyncio.run(run_tests())