mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
1e350ea22f
* PR2: Wire context provider pipeline and update all internal consumers - Replace AgentThread with AgentSession across all packages - Replace ContextProvider with BaseContextProvider across all packages - Replace context_provider param with context_providers (Sequence) - Replace thread= with session= in run() signatures - Replace get_new_thread() with create_session() - Add get_session(service_session_id) to agent interface - DurableAgentThread -> DurableAgentSession - Remove _notify_thread_of_new_messages from WorkflowAgent - Wire before_run/after_run context provider pipeline in RawAgent - Auto-inject InMemoryHistoryProvider when no providers configured * fix: update all tests for context provider pipeline, fix lazy-loaders, remove old test files * refactor: update all sample files for context provider pipeline (AgentThread→AgentSession, ContextProvider→BaseContextProvider) * fix: update remaining ag-ui references (client docstring, getting_started sample) * fix: make get_session service_session_id keyword-only to avoid confusion with session_id * refactor: rename _RunContext.thread_messages to session_messages * refactor: remove _threads.py, _memory.py, and old provider files; migrate devui to use plain message lists * rename: remove _new_ prefix from test files * refactor: rewrite SlidingWindowChatMessageStore as SlidingWindowHistoryProvider(InMemoryHistoryProvider) * fix: read full history from session state directly instead of reaching into provider internals * fix: update stale .pyi stubs, sample imports, and README references for new provider types * fix: remove stale message_store, _notify_thread_of_new_messages, and session_id.key references in samples * refactor: merge context_providers and sessions sample folders into sessions, remove aggregate_context_provider * refactor: UserInfoMemory stores state in session.state instead of instance attributes * feat: add Pydantic BaseModel support to session state serialization Pydantic models stored in 
session.state are now automatically serialized via model_dump() and restored via model_validate() during to_dict()/from_dict() round-trips. Models are auto-registered on first serialization; use register_state_type() for cold-start deserialization. Also export register_state_type as a public API. * fix mem0 * Update sample README links and descriptions for session terminology - Replace 'thread' with 'session' in sample descriptions across all READMEs - Update file links for renamed samples (mem0_sessions, redis_sessions, etc.) - Fix Threads section → Sessions section in main samples/README.md - Update tools, middleware, workflows, durabletask, azure_functions READMEs - Update architecture diagrams in concepts/tools/README.md - Update migration guides (autogen, semantic-kernel) * Fix broken Redis README link to renamed sample * Fix Mem0 OSS client search: pass scoping params as direct kwargs AsyncMemory (OSS) expects user_id/agent_id/run_id as direct kwargs, while AsyncMemoryClient (Platform) expects them in a filters dict. Adds tests for both client types. Port of fix from #3844 to new Mem0ContextProvider. * Fix rebase issues: restore missing _conversation_state.py and checkpoint decode logic - Add back _conversation_state.py (encode/decode_chat_messages) lost in rebase - Fix on_checkpoint_restore to decode cache/conversation with decode_chat_messages - Fix on_checkpoint_restore to use decode_checkpoint_value for pending requests - Add tests/workflow/__init__.py for relative import support - Fix test_agent_executor checkpoint selection (checkpoints[1] not superstep) * Add STORES_BY_DEFAULT ClassVar to skip redundant InMemoryHistoryProvider injection Chat clients that store history server-side by default (OpenAI Responses API, Azure AI Agent) now declare STORES_BY_DEFAULT = True. The agent checks this during auto-injection and skips InMemoryHistoryProvider unless the user explicitly sets store=False. 
* Fix broken markdown links in azure_ai and redis READMEs * Fix getting-started samples to use session API instead of removed thread/ContextProvider API * updates to workflow as agent * fix group chat import * Rename Thread→Session throughout, fix service_session_id propagation, remove stale AGUIThread - Fix: Propagate conversation_id from ChatResponse back to session.service_session_id in both streaming and non-streaming paths in _agents.py - Rename AgentThreadException → AgentSessionException - Remove stale AGUIThread from ag_ui lazy-loader - Rename use_service_thread → use_service_session in ag-ui package - Rename test functions from *_thread_* to *_session_* - Rename sample files from *_thread* to *_session* - Update docstrings and comments: thread → session - Update _mcp.py kwargs filter: add 'session' alongside 'thread' - Fix ContinuationToken docstring example: thread=thread → session=session - Fix _clients.py docstring: 'Agent threads' → 'Agent sessions' * Fix broken markdown links after thread→session file renames * fix azure ai test
792 lines
28 KiB
Python
792 lines
28 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Focused tests for execution flow functionality.
|
|
|
|
Tests include:
|
|
- Entity discovery and info retrieval
|
|
- Agent execution (sync and streaming) using real Agent with mock LLM
|
|
- Workflow execution using real WorkflowBuilder with FunctionExecutor
|
|
- Edge cases like non-streaming agents
|
|
"""
|
|
|
|
import asyncio
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from agent_framework import Agent, AgentExecutor, FunctionExecutor, WorkflowBuilder
|
|
|
|
# Import mock classes from conftest for direct use in some tests
|
|
from conftest import MockBaseChatClient
|
|
|
|
from agent_framework_devui._discovery import EntityDiscovery
|
|
from agent_framework_devui._executor import AgentFrameworkExecutor, EntityNotFoundError
|
|
from agent_framework_devui._mapper import MessageMapper
|
|
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
|
|
|
|
# =============================================================================
|
|
# Local Fixtures (module-specific)
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.fixture
async def executor(test_entities_dir):
    """Build an AgentFrameworkExecutor for the test entities directory.

    Entity discovery is awaited once before the executor is returned.
    """
    framework_executor = AgentFrameworkExecutor(
        EntityDiscovery(test_entities_dir),
        MessageMapper(),
    )

    # Run discovery up front so tests receive an executor that has already scanned.
    await framework_executor.discover_entities()

    return framework_executor
|
|
|
|
|
|
async def test_executor_entity_discovery(executor):
    """Discovery yields at least one agent and one workflow, each with id, name and a valid type."""
    discovered = await executor.discover_entities()

    # Should find entities from samples directory
    assert len(discovered) > 0, "Should discover at least one entity"

    found_types = [entity.type for entity in discovered]
    assert "agent" in found_types, "Should find at least one agent"
    assert "workflow" in found_types, "Should find at least one workflow"

    # Entities with only an `__init__.py` file cannot have their type determined
    # until the module is imported during lazy loading. This is why 'unknown' type exists.
    allowed_types = ["agent", "workflow", "unknown"]

    # Test entity structure
    for entity in discovered:
        assert entity.id, "Entity should have an ID"
        assert entity.name, "Entity should have a name"
        assert entity.type in allowed_types, (
            "Entity should have valid type (unknown allowed during discovery phase)"
        )
|
|
|
|
|
|
async def test_executor_get_entity_info(executor):
    """Looking up the first discovered entity by ID returns info with the same ID and a valid type."""
    discovered = await executor.discover_entities()
    target_id = discovered[0].id

    info = executor.get_entity_info(target_id)

    assert info is not None
    assert info.id == target_id
    assert info.type in ["agent", "workflow", "unknown"]
|
|
|
|
|
|
# =============================================================================
|
|
# Agent Execution Tests (using real Agent with mock LLM)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_agent_sync_execution(executor_with_real_agent):
    """Run a non-streaming request against a REAL Agent backed by a mock LLM.

    Exercises the execution pipeline without an API key:
    - Real Agent class with middleware
    - Real message normalization
    - Mock chat client for LLM calls
    """
    executor, entity_id, mock_client = executor_with_real_agent

    response = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            input="test data",
            stream=False,
        )
    )

    # Response model should be 'devui' when not specified
    assert response.model == "devui"
    assert response.object == "response"
    assert len(response.output) > 0

    # Verify mock client was called
    assert mock_client.call_count == 1
|
|
|
|
|
|
async def test_agent_sync_execution_respects_model_field(executor_with_real_agent):
    """A model name supplied on the request is reflected on the sync response."""
    executor, entity_id, _mock_client = executor_with_real_agent

    response = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            model="custom-model-name",
            input="test data",
            stream=False,
        )
    )

    # Response model should reflect the specified model
    assert response.model == "custom-model-name"
    assert response.object == "response"
    assert len(response.output) > 0
|
|
|
|
|
|
async def test_chat_client_receives_correct_messages(executor_with_real_agent):
    """Check that the mock chat client is handed messages containing the request input.

    Covers the REAL Agent's handling of input:
    - Normalizes input messages
    - Formats messages for the chat client
    """
    executor, entity_id, mock_client = executor_with_real_agent

    await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": entity_id},
            input="What is 2+2?",
            stream=False,
        )
    )

    # Verify chat client was called
    assert mock_client.call_count == 1

    # Verify messages were received
    assert len(mock_client.received_messages) == 1
    messages = mock_client.received_messages[0]

    # Should have at least one message
    assert len(messages) >= 1, f"Expected messages, got: {messages}"

    # Verify the input text is present in the messages
    text_parts = [message.text or "" for message in messages]
    all_text = " ".join(text_parts)
    assert "2+2" in all_text, f"Expected '2+2' in messages, got text: '{all_text}'"
|
|
|
|
|
|
# =============================================================================
|
|
# Workflow Execution Tests (using real WorkflowBuilder with FunctionExecutor)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_workflow_streaming_execution():
    """Stream a REAL WorkflowBuilder workflow built from a single FunctionExecutor.

    Runs the workflow through the DevUI executor without needing an API key and
    checks that at least one typed event is produced.
    """

    # Create a simple workflow using real agent_framework classes
    def process_input(input_data: str) -> str:
        return f"Processed: {input_data}"

    workflow = WorkflowBuilder(
        name="Test Workflow",
        description="Test workflow for execution",
        start_executor=FunctionExecutor(id="process", func=process_input),
    ).build()

    # Create executor and register workflow
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    workflow_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(workflow_info.id, workflow_info, workflow)

    # Execute workflow
    request = AgentFrameworkRequest(
        metadata={"entity_id": workflow_info.id},
        input="hello workflow",
        stream=True,
    )
    events = [event async for event in executor.execute_streaming(request)]

    # Should get events from workflow execution
    assert len(events) > 0, "Should receive events from workflow"

    # Check for workflow-specific events or completion
    event_types = [getattr(event, "type", None) for event in events]
    has_typed_event = any(kind is not None for kind in event_types)
    assert has_typed_event, f"Should have typed events, got: {event_types}"
|
|
|
|
|
|
async def test_workflow_sync_execution():
    """Run a single-node echo workflow through execute_sync and check the response shape."""

    def echo(text: str) -> str:
        return f"Echo: {text}"

    workflow = WorkflowBuilder(
        name="Echo Workflow",
        description="Simple echo workflow",
        start_executor=FunctionExecutor(id="echo", func=echo),
    ).build()

    # Create executor and register workflow
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    workflow_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(workflow_info.id, workflow_info, workflow)

    # Execute workflow synchronously
    response = await executor.execute_sync(
        AgentFrameworkRequest(
            metadata={"entity_id": workflow_info.id},
            input="test input",
            stream=False,
        )
    )

    # Should get a valid response
    assert response.object == "response"
    assert len(response.output) > 0
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Serialization Tests (Run + Map + JSON)
|
|
# =============================================================================
|
|
|
|
|
|
async def test_full_pipeline_agent_events_are_json_serializable(executor_with_real_agent):
    """CRITICAL TEST: every event streamed from an agent run must serialize to JSON.

    Follows the code path the server is described as using:
    1. Execute agent via executor.execute_streaming()
    2. Each event is converted by the mapper
    3. Server calls model_dump_json() on each event for SSE

    An event carrying a non-serializable object (like AgentResponse) makes
    this test fail, surfacing the bug before it reaches production.
    """
    executor, entity_id, _mock_client = executor_with_real_agent

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Test message for serialization",
        stream=True,
    )

    streamed = []
    failures = []

    async for item in executor.execute_streaming(request):
        streamed.append(item)

        # This is EXACTLY what the server does before sending SSE
        try:
            if hasattr(item, "model_dump_json"):
                payload = item.model_dump_json()
                assert payload is not None
                assert len(payload) > 0
        except Exception as exc:
            # Collect rather than abort so every failing event is reported together.
            failures.append(f"Event type={getattr(item, 'type', 'unknown')}: {exc}")

    # Should have received events
    assert len(streamed) > 0, "Should receive events from agent execution"

    # NO serialization errors allowed
    assert len(failures) == 0, f"Found {len(failures)} serialization errors:\n" + "\n".join(failures)
|
|
|
|
|
|
async def test_full_pipeline_workflow_events_are_json_serializable():
    """CRITICAL TEST: every event streamed from a workflow run must serialize to JSON.

    Workflows containing an AgentExecutor matter most here because:
    - AgentExecutor produces executor_completed event (type='executor_completed') with AgentExecutorResponse
    - AgentExecutorResponse contains AgentResponse and Message objects
    - These are SerializationMixin objects, not Pydantic, which caused the original bug

    The test drives the ENTIRE streaming pipeline end-to-end.
    """
    # Create a workflow with AgentExecutor (the problematic case)
    agent = Agent(
        id="serialization_test_agent",
        name="Serialization Test Agent",
        description="Agent for testing serialization",
        client=MockBaseChatClient(),
        system_message="You are a test assistant.",
    )

    workflow = WorkflowBuilder(
        name="Serialization Test Workflow",
        description="Test workflow",
        start_executor=AgentExecutor(id="agent_node", agent=agent),
    ).build()

    # Create executor and register
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    workflow_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(workflow_info.id, workflow_info, workflow)

    request = AgentFrameworkRequest(
        metadata={"entity_id": workflow_info.id},
        input="Test workflow serialization",
        stream=True,
    )

    streamed = []
    failures = []
    kinds_seen = []

    async for item in executor.execute_streaming(request):
        streamed.append(item)
        kind = getattr(item, "type", "unknown")
        kinds_seen.append(kind)

        # This is EXACTLY what the server does before sending SSE
        try:
            if hasattr(item, "model_dump_json"):
                payload = item.model_dump_json()
                assert payload is not None
                assert len(payload) > 0
        except Exception as exc:
            # Collect rather than abort so every failing event is reported together.
            failures.append(f"Event type={kind}: {exc}")

    # Should have received events
    assert len(streamed) > 0, "Should receive events from workflow execution"

    # Verify we got workflow events (not just generic ones)
    saw_output_item = any("output_item" in str(kind) for kind in kinds_seen)
    assert saw_output_item, f"Should see output_item events, got: {kinds_seen}"

    # NO serialization errors allowed - this is the critical assertion
    assert len(failures) == 0, (
        f"Found {len(failures)} serialization errors:\n"
        + "\n".join(failures)
        + f"\n\nEvent types seen: {kinds_seen}"
    )

    # Also verify aggregate_to_response works (server calls this after streaming)
    final_response = await mapper.aggregate_to_response(streamed, request)
    assert final_response is not None
|
|
|
|
|
|
async def test_get_entity_info_raises_for_invalid_id(executor):
    """An unknown entity ID makes get_entity_info raise EntityNotFoundError."""
    unknown_id = "nonexistent_agent"
    with pytest.raises(EntityNotFoundError):
        executor.get_entity_info(unknown_id)
|
|
|
|
|
|
async def test_request_extracts_entity_id_from_metadata(executor):
    """get_entity_id() returns the entity_id stored in the request metadata."""
    request_fields = {
        "metadata": {"entity_id": "my_agent"},
        "input": "test",
        "stream": False,
    }
    request = AgentFrameworkRequest(**request_fields)

    # entity_id is extracted from metadata
    assert request.get_entity_id() == "my_agent"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_get_start_executor_message_types(sequential_workflow):
    """_get_start_executor_message_types returns a start executor and input types including str."""
    executor, _entity_id, _mock_client, workflow = sequential_workflow

    lookup = executor._get_start_executor_message_types(workflow)
    start_node, accepted_types = lookup

    assert start_node is not None
    assert len(accepted_types) > 0
    # Real sequential workflows accept str input
    assert str in accepted_types
|
|
|
|
|
|
def test_executor_select_primary_input_prefers_string():
    """str is chosen as the primary input type even when listed after another type."""
    from agent_framework_devui._utils import select_primary_input_type

    candidates = [type("Placeholder", (), {}), str]

    assert select_primary_input_type(candidates) is str
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_structured_extracts_input_for_string_workflow():
    """A structured payload is reduced to its 'input' field when the workflow expects str."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class StringInputExecutor(Executor):
        """Executor whose handler takes a plain string."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    builder = WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=StringInputExecutor(id="str_exec"),
    )
    workflow = builder.build()

    devui_executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # When workflow expects str and receives {"input": "hello"}, extract "hello"
    payload = {"input": "hello"}
    assert devui_executor._parse_structured_workflow_input(workflow, payload) == "hello"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_raw_string_for_string_workflow():
    """A raw string is returned unchanged when the workflow's start executor accepts str."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class StringInputExecutor(Executor):
        """Executor whose handler takes a plain string."""

        @handler
        async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {text}")

    builder = WorkflowBuilder(
        name="String Workflow",
        description="Accepts string",
        start_executor=StringInputExecutor(id="str_exec"),
    )
    workflow = builder.build()

    devui_executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Raw string should pass through unchanged
    assert devui_executor._parse_raw_workflow_input(workflow, "hi there") == "hi there"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_converts_to_chat_message_for_sequential_workflow(sequential_workflow):
    """A raw string given to a sequential workflow is parsed into a Message with that text."""
    from agent_framework import Message

    executor, _entity_id, _mock_client, workflow = sequential_workflow

    # Sequential workflows expect Message, so raw string becomes Message
    result = executor._parse_raw_workflow_input(workflow, "hello")

    assert isinstance(result, Message)
    assert result.text == "hello"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_executor_parse_stringified_json_workflow_input():
    """A JSON string is parsed into the Pydantic model the workflow's start executor expects."""
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
    from pydantic import BaseModel

    class WorkflowInput(BaseModel):
        input: str
        metadata: dict | None = None

    class PydanticInputExecutor(Executor):
        """Executor whose handler takes a WorkflowInput model."""

        @handler
        async def process(self, data: WorkflowInput, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"Got: {data.input}")

    # Build workflow with Pydantic input type
    builder = WorkflowBuilder(
        name="Pydantic Workflow",
        description="Accepts Pydantic input",
        start_executor=PydanticInputExecutor(id="pydantic_exec"),
    )
    workflow = builder.build()

    devui_executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Simulate frontend sending JSON.stringify({"input": "testing!", "metadata": {"key": "value"}})
    raw_payload = '{"input": "testing!", "metadata": {"key": "value"}}'
    result = devui_executor._parse_raw_workflow_input(workflow, raw_payload)

    # Should parse into WorkflowInput object
    assert isinstance(result, WorkflowInput)
    assert result.input == "testing!"
    assert result.metadata == {"key": "value"}
|
|
|
|
|
|
def test_extract_workflow_hil_responses_handles_stringified_json():
    """Test HIL response extraction handles both stringified and parsed JSON (regression test).

    Checks three input shapes against ``_extract_workflow_hil_responses``:
    a JSON string, the equivalent already-parsed list, and inputs that carry
    no ``workflow_hil_response`` content (which must yield ``None``).
    """
    # EntityDiscovery, AgentFrameworkExecutor and MessageMapper are already imported
    # at module level, so no function-local re-import is needed.
    executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper())

    # Regression test: Frontend sends stringified JSON via streamWorkflowExecutionOpenAI
    stringified = '[{"type":"message","content":[{"type":"workflow_hil_response","responses":{"req_1":"spam"}}]}]'
    assert executor._extract_workflow_hil_responses(stringified) == {"req_1": "spam"}

    # Ensure parsed format still works
    parsed = [{"type": "message", "content": [{"type": "workflow_hil_response", "responses": {"req_2": "ham"}}]}]
    assert executor._extract_workflow_hil_responses(parsed) == {"req_2": "ham"}

    # Non-HIL inputs should return None
    assert executor._extract_workflow_hil_responses("plain text") is None
    assert executor._extract_workflow_hil_responses({"email": "test"}) is None
|
|
|
|
|
|
async def test_executor_handles_streaming_agent():
    """The executor streams text deltas from an agent whose run() accepts stream=True."""
    from agent_framework import AgentResponse, AgentResponseUpdate, AgentSession, Content, Message

    class StreamingAgent:
        """Minimal agent whose run() returns a stream or an awaitable depending on `stream`."""

        id = "streaming_test"
        name = "Streaming Test Agent"
        description = "Test agent with run(stream=True)"

        def run(self, messages=None, *, stream=False, session=None, **kwargs):
            # Streaming callers get an async generator; everyone else gets an awaitable.
            return self._stream_impl(messages) if stream else self._run_impl(messages)

        async def _run_impl(self, messages):
            reply = Message(role="assistant", contents=[Content.from_text(text=f"Processed: {messages}")])
            return AgentResponse(
                messages=[reply],
                response_id="test_123",
            )

        async def _stream_impl(self, messages):
            yield AgentResponseUpdate(
                contents=[Content.from_text(text=f"Processed: {messages}")],
                role="assistant",
            )

        def create_session(self, **kwargs):
            return AgentSession()

    # Create executor and register agent
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    agent = StreamingAgent()
    agent_info = await discovery.create_entity_info_from_object(agent, source="test")
    discovery.register_entity(agent_info.id, agent_info, agent)

    # Execute streaming agent (use metadata.entity_id for routing)
    request = AgentFrameworkRequest(
        metadata={"entity_id": agent_info.id},
        input="hello",
        stream=True,  # DevUI always streams
    )
    events = [event async for event in executor.execute_streaming(request)]

    # Should get events from streaming agent
    assert len(events) > 0
    text_events = [
        event for event in events if hasattr(event, "type") and event.type == "response.output_text.delta"
    ]
    assert len(text_events) > 0
    assert "Processed: hello" in text_events[0].delta
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Tests for SequentialBuilder
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_sequential_workflow(sequential_workflow):
    """Stream a SequentialBuilder workflow and check every event serializes to JSON.

    Relies on the shared sequential_workflow fixture (Writer → Reviewer) from conftest
    and expects the mock client to have been called at least twice.
    """
    executor, entity_id, mock_client, _workflow = sequential_workflow

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Write about testing best practices",
        stream=True,
    )

    streamed = []
    failures = []

    async for item in executor.execute_streaming(request):
        streamed.append(item)
        kind = getattr(item, "type", "unknown")

        # Verify JSON serialization (exactly what server does for SSE)
        try:
            if hasattr(item, "model_dump_json"):
                payload = item.model_dump_json()
                assert payload is not None
        except Exception as exc:
            # Collect rather than abort so every failing event is reported together.
            failures.append(f"Event type={kind}: {exc}")

    assert len(streamed) > 0, "Should receive events from sequential workflow"
    assert len(failures) == 0, f"Serialization errors: {failures}"
    assert mock_client.call_count >= 2, f"Expected both agents called, got {mock_client.call_count}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_concurrent_workflow(concurrent_workflow):
    """Stream a ConcurrentBuilder workflow and check every event serializes to JSON.

    Relies on the shared concurrent_workflow fixture (Researcher | Analyst | Summarizer)
    from conftest and expects the mock client to have been called at least three times.
    """
    executor, entity_id, mock_client, _workflow = concurrent_workflow

    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input="Analyze market trends for Q4",
        stream=True,
    )

    streamed = []
    failures = []

    async for item in executor.execute_streaming(request):
        streamed.append(item)
        kind = getattr(item, "type", "unknown")

        # Verify JSON serialization
        try:
            if hasattr(item, "model_dump_json"):
                payload = item.model_dump_json()
                assert payload is not None
        except Exception as exc:
            # Collect rather than abort so every failing event is reported together.
            failures.append(f"Event type={kind}: {exc}")

    assert len(streamed) > 0, "Should receive events from concurrent workflow"
    assert len(failures) == 0, f"Serialization errors: {failures}"
    assert mock_client.call_count >= 3, f"Expected all 3 agents called, got {mock_client.call_count}"
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Test for Workflow with Output Events
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_full_pipeline_workflow_output_event_serialization():
    """Test that output event (type='output') from ctx.yield_output() serializes correctly.

    This tests the pattern where executors yield output via ctx.yield_output(),
    which emits output event (type='output') that DevUI must serialize for SSE.
    The executor below yields three outputs (two strings and a dict), and the
    test expects at least three streamed events whose type contains "output_item".
    """
    from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

    class OutputtingExecutor(Executor):
        """Executor that yields multiple outputs."""

        @handler
        async def process(self, input_text: str, ctx: WorkflowContext[Any, Any]) -> None:
            await ctx.yield_output(f"First output: {input_text}")
            await ctx.yield_output("Second output: processed")
            await ctx.yield_output({"final": "result", "data": [1, 2, 3]})

    # Build workflow
    workflow = WorkflowBuilder(
        name="Output Workflow",
        description="Tests yield_output",
        start_executor=OutputtingExecutor(id="outputter"),
    ).build()

    # Create DevUI executor and register workflow
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    # Execute with streaming
    request = AgentFrameworkRequest(
        metadata={"entity_id": entity_info.id},
        input="Test output events",
        stream=True,
    )

    events = []
    output_events = []
    serialization_errors = []

    async for event in executor.execute_streaming(request):
        events.append(event)
        event_type = getattr(event, "type", "")

        # Track output item events. Coerce with str() so an event whose `type`
        # is None or not a string cannot raise TypeError on the membership test.
        if "output_item" in str(event_type):
            output_events.append(event)

        try:
            if hasattr(event, "model_dump_json"):
                event.model_dump_json()
        except Exception as e:
            # Collect rather than abort so every failing event is reported together.
            serialization_errors.append(f"Event type={event_type}: {e}")

    assert len(events) > 0, "Should receive events"
    assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}"

    # Should have received output events for the yield_output calls
    assert len(output_events) >= 3, f"Expected 3+ output events for yield_output calls, got {len(output_events)}"
|
|
|
|
|
|
if __name__ == "__main__":
    # Simple test runner
    async def run_tests():
        """Smoke-run discovery, sync execution and streaming against a throwaway agent file.

        Writes a minimal agent module into a temporary directory, points
        EntityDiscovery at it, and, if anything is discovered, executes the
        first entity once synchronously and once in streaming mode.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create test agent
            agent_file = temp_path / "streaming_agent.py"
            # Explicit encoding keeps the generated module independent of the platform default.
            agent_file.write_text(
                """
class StreamingAgent:
    name = "Streaming Test Agent"
    description = "Test agent for streaming"

    async def run(self, input_str, *, stream: bool = False, session=None, **kwargs):
        if stream:
            async def _stream():
                for i, word in enumerate(f"Processing {input_str}".split()):
                    yield f"word_{i}: {word} "
            return _stream()
        return f"Processing {input_str}"
""",
                encoding="utf-8",
            )

            discovery = EntityDiscovery(str(temp_path))
            mapper = MessageMapper()
            executor = AgentFrameworkExecutor(discovery, mapper)

            # Test discovery
            entities = await executor.discover_entities()

            if entities:
                # Test sync execution (use metadata.entity_id for routing)
                request = AgentFrameworkRequest(
                    metadata={"entity_id": entities[0].id},
                    input="test input",
                    stream=False,
                )

                await executor.execute_sync(request)

                # Test streaming execution
                request.stream = True
                event_count = 0
                async for _event in executor.execute_streaming(request):
                    event_count += 1
                    if event_count > 5:  # Limit for testing
                        break

    asyncio.run(run_tests())
|