mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
8c6b12e664
* fix message serialization issue, improve tests for devui * update tests
468 lines
16 KiB
Python
468 lines
16 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Shared test utilities for DevUI tests.
|
|
|
|
This module provides reusable test helpers including:
|
|
- Mock chat clients that don't require API keys
|
|
- Real workflow event classes from agent_framework
|
|
- Test agents and executors for workflow testing
|
|
- Factory functions for test data
|
|
|
|
These follow the patterns established in other agent_framework packages
|
|
(like a2a, ag-ui) which use explicit imports instead of conftest.py
|
|
to avoid pytest plugin conflicts when running tests across packages.
|
|
"""
|
|
|
|
from collections.abc import AsyncIterable, MutableSequence
|
|
from typing import Any
|
|
|
|
from agent_framework import (
|
|
AgentRunResponse,
|
|
AgentRunResponseUpdate,
|
|
AgentThread,
|
|
BaseAgent,
|
|
BaseChatClient,
|
|
ChatAgent,
|
|
ChatMessage,
|
|
ChatOptions,
|
|
ChatResponse,
|
|
ChatResponseUpdate,
|
|
ConcurrentBuilder,
|
|
FunctionCallContent,
|
|
FunctionResultContent,
|
|
Role,
|
|
SequentialBuilder,
|
|
TextContent,
|
|
use_chat_middleware,
|
|
)
|
|
from agent_framework._workflows._agent_executor import AgentExecutorResponse
|
|
|
|
# Import real workflow event classes - NOT mocks!
|
|
from agent_framework._workflows._events import (
|
|
ExecutorCompletedEvent,
|
|
ExecutorFailedEvent,
|
|
ExecutorInvokedEvent,
|
|
WorkflowErrorDetails,
|
|
)
|
|
|
|
from agent_framework_devui._discovery import EntityDiscovery
|
|
from agent_framework_devui._executor import AgentFrameworkExecutor
|
|
from agent_framework_devui._mapper import MessageMapper
|
|
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
|
|
|
|
# =============================================================================
|
|
# Mock Chat Clients (from core tests pattern)
|
|
# =============================================================================
|
|
|
|
|
|
class MockChatClient:
|
|
"""Simple mock chat client that doesn't require API keys.
|
|
|
|
Configure responses by setting `responses` or `streaming_responses` lists.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self.additional_properties: dict[str, Any] = {}
|
|
self.call_count: int = 0
|
|
self.responses: list[ChatResponse] = []
|
|
self.streaming_responses: list[list[ChatResponseUpdate]] = []
|
|
|
|
async def get_response(
|
|
self,
|
|
messages: str | ChatMessage | list[str] | list[ChatMessage],
|
|
**kwargs: Any,
|
|
) -> ChatResponse:
|
|
self.call_count += 1
|
|
if self.responses:
|
|
return self.responses.pop(0)
|
|
return ChatResponse(messages=ChatMessage(role="assistant", text="test response"))
|
|
|
|
async def get_streaming_response(
|
|
self,
|
|
messages: str | ChatMessage | list[str] | list[ChatMessage],
|
|
**kwargs: Any,
|
|
) -> AsyncIterable[ChatResponseUpdate]:
|
|
self.call_count += 1
|
|
if self.streaming_responses:
|
|
for update in self.streaming_responses.pop(0):
|
|
yield update
|
|
else:
|
|
yield ChatResponseUpdate(text=TextContent(text="test streaming response"), role="assistant")
|
|
|
|
|
|
@use_chat_middleware
|
|
class MockBaseChatClient(BaseChatClient):
|
|
"""Full BaseChatClient mock with middleware support.
|
|
|
|
Use this when testing features that require the full BaseChatClient interface.
|
|
This goes through all the middleware, message normalization, etc. - only the
|
|
actual LLM call is mocked.
|
|
"""
|
|
|
|
def __init__(self, **kwargs: Any):
|
|
super().__init__(**kwargs)
|
|
self.run_responses: list[ChatResponse] = []
|
|
self.streaming_responses: list[list[ChatResponseUpdate]] = []
|
|
self.call_count: int = 0
|
|
self.received_messages: list[list[ChatMessage]] = []
|
|
|
|
async def _inner_get_response(
|
|
self,
|
|
*,
|
|
messages: MutableSequence[ChatMessage],
|
|
chat_options: ChatOptions,
|
|
**kwargs: Any,
|
|
) -> ChatResponse:
|
|
self.call_count += 1
|
|
self.received_messages.append(list(messages))
|
|
if self.run_responses:
|
|
return self.run_responses.pop(0)
|
|
return ChatResponse(messages=ChatMessage(role="assistant", text="Mock response from ChatAgent"))
|
|
|
|
async def _inner_get_streaming_response(
|
|
self,
|
|
*,
|
|
messages: MutableSequence[ChatMessage],
|
|
chat_options: ChatOptions,
|
|
**kwargs: Any,
|
|
) -> AsyncIterable[ChatResponseUpdate]:
|
|
self.call_count += 1
|
|
self.received_messages.append(list(messages))
|
|
if self.streaming_responses:
|
|
for update in self.streaming_responses.pop(0):
|
|
yield update
|
|
else:
|
|
# Simulate realistic streaming chunks
|
|
yield ChatResponseUpdate(text=TextContent(text="Mock "), role="assistant")
|
|
yield ChatResponseUpdate(text=TextContent(text="streaming "), role="assistant")
|
|
yield ChatResponseUpdate(text=TextContent(text="response "), role="assistant")
|
|
yield ChatResponseUpdate(text=TextContent(text="from ChatAgent"), role="assistant")
|
|
|
|
|
|
# =============================================================================
|
|
# Mock Agents (for workflow testing without API keys)
|
|
# =============================================================================
|
|
|
|
|
|
class MockAgent(BaseAgent):
|
|
"""Mock agent that returns configurable responses without needing a chat client."""
|
|
|
|
def __init__(
|
|
self,
|
|
response_text: str = "Mock agent response",
|
|
streaming_chunks: list[str] | None = None,
|
|
**kwargs: Any,
|
|
):
|
|
super().__init__(**kwargs)
|
|
self.response_text = response_text
|
|
self.streaming_chunks = streaming_chunks or [response_text]
|
|
self.call_count = 0
|
|
|
|
async def run(
|
|
self,
|
|
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
|
|
*,
|
|
thread: AgentThread | None = None,
|
|
**kwargs: Any,
|
|
) -> AgentRunResponse:
|
|
self.call_count += 1
|
|
return AgentRunResponse(
|
|
messages=[ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=self.response_text)])]
|
|
)
|
|
|
|
async def run_stream(
|
|
self,
|
|
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
|
|
*,
|
|
thread: AgentThread | None = None,
|
|
**kwargs: Any,
|
|
) -> AsyncIterable[AgentRunResponseUpdate]:
|
|
self.call_count += 1
|
|
for chunk in self.streaming_chunks:
|
|
yield AgentRunResponseUpdate(contents=[TextContent(text=chunk)], role=Role.ASSISTANT)
|
|
|
|
|
|
class MockToolCallingAgent(BaseAgent):
|
|
"""Mock agent that simulates tool calls and results in streaming mode."""
|
|
|
|
def __init__(self, **kwargs: Any):
|
|
super().__init__(**kwargs)
|
|
self.call_count = 0
|
|
|
|
async def run(
|
|
self,
|
|
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
|
|
*,
|
|
thread: AgentThread | None = None,
|
|
**kwargs: Any,
|
|
) -> AgentRunResponse:
|
|
self.call_count += 1
|
|
return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="done")])
|
|
|
|
async def run_stream(
|
|
self,
|
|
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
|
|
*,
|
|
thread: AgentThread | None = None,
|
|
**kwargs: Any,
|
|
) -> AsyncIterable[AgentRunResponseUpdate]:
|
|
self.call_count += 1
|
|
# First: text
|
|
yield AgentRunResponseUpdate(
|
|
contents=[TextContent(text="Let me search for that...")],
|
|
role=Role.ASSISTANT,
|
|
)
|
|
# Second: tool call
|
|
yield AgentRunResponseUpdate(
|
|
contents=[
|
|
FunctionCallContent(
|
|
call_id="call_123",
|
|
name="search",
|
|
arguments={"query": "weather"},
|
|
)
|
|
],
|
|
role=Role.ASSISTANT,
|
|
)
|
|
# Third: tool result
|
|
yield AgentRunResponseUpdate(
|
|
contents=[
|
|
FunctionResultContent(
|
|
call_id="call_123",
|
|
result={"temperature": 72, "condition": "sunny"},
|
|
)
|
|
],
|
|
role=Role.TOOL,
|
|
)
|
|
# Fourth: final text
|
|
yield AgentRunResponseUpdate(
|
|
contents=[TextContent(text="The weather is sunny, 72°F.")],
|
|
role=Role.ASSISTANT,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Factory Functions for Test Data
|
|
# =============================================================================
|
|
|
|
|
|
def create_mapper() -> MessageMapper:
|
|
"""Create a fresh MessageMapper."""
|
|
return MessageMapper()
|
|
|
|
|
|
def create_test_request(
|
|
entity_id: str = "test_agent",
|
|
input_text: str = "Test input",
|
|
stream: bool = True,
|
|
) -> AgentFrameworkRequest:
|
|
"""Create a standard test request."""
|
|
return AgentFrameworkRequest(
|
|
metadata={"entity_id": entity_id},
|
|
input=input_text,
|
|
stream=stream,
|
|
)
|
|
|
|
|
|
def create_mock_chat_client() -> MockChatClient:
|
|
"""Create a mock chat client."""
|
|
return MockChatClient()
|
|
|
|
|
|
def create_mock_base_chat_client() -> MockBaseChatClient:
|
|
"""Create a mock BaseChatClient."""
|
|
return MockBaseChatClient()
|
|
|
|
|
|
def create_mock_agent(
|
|
id: str = "test_agent",
|
|
name: str = "TestAgent",
|
|
response_text: str = "Mock agent response",
|
|
) -> MockAgent:
|
|
"""Create a mock agent."""
|
|
return MockAgent(id=id, name=name, response_text=response_text)
|
|
|
|
|
|
def create_mock_tool_agent(id: str = "tool_agent", name: str = "ToolAgent") -> MockToolCallingAgent:
|
|
"""Create a mock agent that simulates tool calls."""
|
|
return MockToolCallingAgent(id=id, name=name)
|
|
|
|
|
|
def create_agent_run_response(text: str = "Test response") -> AgentRunResponse:
|
|
"""Create an AgentRunResponse with the given text."""
|
|
return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=text)])])
|
|
|
|
|
|
def create_agent_executor_response(
|
|
executor_id: str = "test_executor",
|
|
response_text: str = "Executor response",
|
|
) -> AgentExecutorResponse:
|
|
"""Create an AgentExecutorResponse - the type that's nested in ExecutorCompletedEvent.data."""
|
|
agent_response = create_agent_run_response(response_text)
|
|
return AgentExecutorResponse(
|
|
executor_id=executor_id,
|
|
agent_run_response=agent_response,
|
|
full_conversation=[
|
|
ChatMessage(role=Role.USER, contents=[TextContent(text="User input")]),
|
|
ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)]),
|
|
],
|
|
)
|
|
|
|
|
|
def create_executor_completed_event(
|
|
executor_id: str = "test_executor",
|
|
with_agent_response: bool = True,
|
|
) -> ExecutorCompletedEvent:
|
|
"""Create an ExecutorCompletedEvent with realistic nested data.
|
|
|
|
This creates the exact data structure that caused the serialization bug:
|
|
ExecutorCompletedEvent.data contains AgentExecutorResponse which contains
|
|
AgentRunResponse and ChatMessage objects (SerializationMixin, not Pydantic).
|
|
"""
|
|
data = create_agent_executor_response(executor_id) if with_agent_response else {"simple": "dict"}
|
|
return ExecutorCompletedEvent(executor_id=executor_id, data=data)
|
|
|
|
|
|
def create_executor_invoked_event(executor_id: str = "test_executor") -> ExecutorInvokedEvent:
|
|
"""Create an ExecutorInvokedEvent."""
|
|
return ExecutorInvokedEvent(executor_id=executor_id)
|
|
|
|
|
|
def create_executor_failed_event(
|
|
executor_id: str = "test_executor",
|
|
error_message: str = "Test error",
|
|
) -> ExecutorFailedEvent:
|
|
"""Create an ExecutorFailedEvent."""
|
|
details = WorkflowErrorDetails(error_type="TestError", message=error_message)
|
|
return ExecutorFailedEvent(executor_id=executor_id, details=details)
|
|
|
|
|
|
# =============================================================================
|
|
# Workflow Setup Helpers (async factory functions)
|
|
# =============================================================================
|
|
|
|
|
|
async def create_executor_with_real_agent() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient]:
|
|
"""Create an executor with a REAL ChatAgent using mock chat client.
|
|
|
|
This tests the full execution pipeline:
|
|
- Real ChatAgent class
|
|
- Real message handling and normalization
|
|
- Real middleware pipeline
|
|
- Only the LLM call is mocked
|
|
|
|
Returns tuple of (executor, entity_id, mock_client) so tests can access all components.
|
|
"""
|
|
mock_client = MockBaseChatClient()
|
|
discovery = EntityDiscovery(None)
|
|
mapper = MessageMapper()
|
|
executor = AgentFrameworkExecutor(discovery, mapper)
|
|
|
|
# Create a REAL ChatAgent with mock client
|
|
agent = ChatAgent(
|
|
id="test_chat_agent",
|
|
name="Test Chat Agent",
|
|
description="A real ChatAgent for testing execution flow",
|
|
chat_client=mock_client,
|
|
system_message="You are a helpful test assistant.",
|
|
)
|
|
|
|
# Register the real agent
|
|
entity_info = await discovery.create_entity_info_from_object(agent, source="test")
|
|
discovery.register_entity(entity_info.id, entity_info, agent)
|
|
|
|
return executor, entity_info.id, mock_client
|
|
|
|
|
|
async def create_sequential_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
|
|
"""Create a realistic sequential workflow (Writer -> Reviewer).
|
|
|
|
This provides a reusable multi-agent workflow that:
|
|
- Chains 2 ChatAgents sequentially
|
|
- Writer generates content, Reviewer provides feedback
|
|
- Pre-configures mock responses for both agents
|
|
|
|
Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
|
|
"""
|
|
mock_client = MockBaseChatClient()
|
|
mock_client.run_responses = [
|
|
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Here's the draft content about the topic.")),
|
|
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Review: Content is clear and well-structured.")),
|
|
]
|
|
|
|
writer = ChatAgent(
|
|
id="writer",
|
|
name="Writer",
|
|
description="Content writer agent",
|
|
chat_client=mock_client,
|
|
system_message="You are a content writer. Create clear, engaging content.",
|
|
)
|
|
reviewer = ChatAgent(
|
|
id="reviewer",
|
|
name="Reviewer",
|
|
description="Content reviewer agent",
|
|
chat_client=mock_client,
|
|
system_message="You are a reviewer. Provide constructive feedback.",
|
|
)
|
|
|
|
workflow = SequentialBuilder().participants([writer, reviewer]).build()
|
|
|
|
discovery = EntityDiscovery(None)
|
|
mapper = MessageMapper()
|
|
executor = AgentFrameworkExecutor(discovery, mapper)
|
|
|
|
entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
|
|
discovery.register_entity(entity_info.id, entity_info, workflow)
|
|
|
|
return executor, entity_info.id, mock_client, workflow
|
|
|
|
|
|
async def create_concurrent_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
|
|
"""Create a realistic concurrent workflow (Researcher | Analyst | Summarizer).
|
|
|
|
This provides a reusable fan-out/fan-in workflow that:
|
|
- Runs 3 ChatAgents in parallel
|
|
- Each agent processes the same input independently
|
|
- Pre-configures mock responses for all agents
|
|
|
|
Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
|
|
"""
|
|
mock_client = MockBaseChatClient()
|
|
mock_client.run_responses = [
|
|
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Research findings: Key data points identified.")),
|
|
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Analysis: Trends indicate positive growth.")),
|
|
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Summary: Overall outlook is favorable.")),
|
|
]
|
|
|
|
researcher = ChatAgent(
|
|
id="researcher",
|
|
name="Researcher",
|
|
description="Research agent",
|
|
chat_client=mock_client,
|
|
system_message="You are a researcher. Find key data and insights.",
|
|
)
|
|
analyst = ChatAgent(
|
|
id="analyst",
|
|
name="Analyst",
|
|
description="Analysis agent",
|
|
chat_client=mock_client,
|
|
system_message="You are an analyst. Identify trends and patterns.",
|
|
)
|
|
summarizer = ChatAgent(
|
|
id="summarizer",
|
|
name="Summarizer",
|
|
description="Summary agent",
|
|
chat_client=mock_client,
|
|
system_message="You are a summarizer. Provide concise summaries.",
|
|
)
|
|
|
|
workflow = ConcurrentBuilder().participants([researcher, analyst, summarizer]).build()
|
|
|
|
discovery = EntityDiscovery(None)
|
|
mapper = MessageMapper()
|
|
executor = AgentFrameworkExecutor(discovery, mapper)
|
|
|
|
entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
|
|
discovery.register_entity(entity_info.id, entity_info, workflow)
|
|
|
|
return executor, entity_info.id, mock_client, workflow
|