Files
agent-framework/python/packages/devui/tests/test_helpers.py
T
Victor Dibia 8c6b12e664 Python: DevUI, Fix message serialization issue, improve tests (#2674)
* fix message serialization issue, improve tests for devui

* update tests
2025-12-06 02:20:54 +00:00

468 lines
16 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Shared test utilities for DevUI tests.
This module provides reusable test helpers including:
- Mock chat clients that don't require API keys
- Real workflow event classes from agent_framework
- Test agents and executors for workflow testing
- Factory functions for test data
These follow the patterns established in other agent_framework packages
(like a2a, ag-ui) which use explicit imports instead of conftest.py
to avoid pytest plugin conflicts when running tests across packages.
"""
from collections.abc import AsyncIterable, MutableSequence
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
BaseChatClient,
ChatAgent,
ChatMessage,
ChatOptions,
ChatResponse,
ChatResponseUpdate,
ConcurrentBuilder,
FunctionCallContent,
FunctionResultContent,
Role,
SequentialBuilder,
TextContent,
use_chat_middleware,
)
from agent_framework._workflows._agent_executor import AgentExecutorResponse
# Import real workflow event classes - NOT mocks!
from agent_framework._workflows._events import (
ExecutorCompletedEvent,
ExecutorFailedEvent,
ExecutorInvokedEvent,
WorkflowErrorDetails,
)
from agent_framework_devui._discovery import EntityDiscovery
from agent_framework_devui._executor import AgentFrameworkExecutor
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
# =============================================================================
# Mock Chat Clients (from core tests pattern)
# =============================================================================
class MockChatClient:
"""Simple mock chat client that doesn't require API keys.
Configure responses by setting `responses` or `streaming_responses` lists.
"""
def __init__(self) -> None:
self.additional_properties: dict[str, Any] = {}
self.call_count: int = 0
self.responses: list[ChatResponse] = []
self.streaming_responses: list[list[ChatResponseUpdate]] = []
async def get_response(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage],
**kwargs: Any,
) -> ChatResponse:
self.call_count += 1
if self.responses:
return self.responses.pop(0)
return ChatResponse(messages=ChatMessage(role="assistant", text="test response"))
async def get_streaming_response(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage],
**kwargs: Any,
) -> AsyncIterable[ChatResponseUpdate]:
self.call_count += 1
if self.streaming_responses:
for update in self.streaming_responses.pop(0):
yield update
else:
yield ChatResponseUpdate(text=TextContent(text="test streaming response"), role="assistant")
@use_chat_middleware
class MockBaseChatClient(BaseChatClient):
"""Full BaseChatClient mock with middleware support.
Use this when testing features that require the full BaseChatClient interface.
This goes through all the middleware, message normalization, etc. - only the
actual LLM call is mocked.
"""
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
self.run_responses: list[ChatResponse] = []
self.streaming_responses: list[list[ChatResponseUpdate]] = []
self.call_count: int = 0
self.received_messages: list[list[ChatMessage]] = []
async def _inner_get_response(
self,
*,
messages: MutableSequence[ChatMessage],
chat_options: ChatOptions,
**kwargs: Any,
) -> ChatResponse:
self.call_count += 1
self.received_messages.append(list(messages))
if self.run_responses:
return self.run_responses.pop(0)
return ChatResponse(messages=ChatMessage(role="assistant", text="Mock response from ChatAgent"))
async def _inner_get_streaming_response(
self,
*,
messages: MutableSequence[ChatMessage],
chat_options: ChatOptions,
**kwargs: Any,
) -> AsyncIterable[ChatResponseUpdate]:
self.call_count += 1
self.received_messages.append(list(messages))
if self.streaming_responses:
for update in self.streaming_responses.pop(0):
yield update
else:
# Simulate realistic streaming chunks
yield ChatResponseUpdate(text=TextContent(text="Mock "), role="assistant")
yield ChatResponseUpdate(text=TextContent(text="streaming "), role="assistant")
yield ChatResponseUpdate(text=TextContent(text="response "), role="assistant")
yield ChatResponseUpdate(text=TextContent(text="from ChatAgent"), role="assistant")
# =============================================================================
# Mock Agents (for workflow testing without API keys)
# =============================================================================
class MockAgent(BaseAgent):
"""Mock agent that returns configurable responses without needing a chat client."""
def __init__(
self,
response_text: str = "Mock agent response",
streaming_chunks: list[str] | None = None,
**kwargs: Any,
):
super().__init__(**kwargs)
self.response_text = response_text
self.streaming_chunks = streaming_chunks or [response_text]
self.call_count = 0
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
self.call_count += 1
return AgentRunResponse(
messages=[ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=self.response_text)])]
)
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
self.call_count += 1
for chunk in self.streaming_chunks:
yield AgentRunResponseUpdate(contents=[TextContent(text=chunk)], role=Role.ASSISTANT)
class MockToolCallingAgent(BaseAgent):
"""Mock agent that simulates tool calls and results in streaming mode."""
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
self.call_count = 0
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
self.call_count += 1
return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="done")])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
self.call_count += 1
# First: text
yield AgentRunResponseUpdate(
contents=[TextContent(text="Let me search for that...")],
role=Role.ASSISTANT,
)
# Second: tool call
yield AgentRunResponseUpdate(
contents=[
FunctionCallContent(
call_id="call_123",
name="search",
arguments={"query": "weather"},
)
],
role=Role.ASSISTANT,
)
# Third: tool result
yield AgentRunResponseUpdate(
contents=[
FunctionResultContent(
call_id="call_123",
result={"temperature": 72, "condition": "sunny"},
)
],
role=Role.TOOL,
)
# Fourth: final text
yield AgentRunResponseUpdate(
contents=[TextContent(text="The weather is sunny, 72°F.")],
role=Role.ASSISTANT,
)
# =============================================================================
# Factory Functions for Test Data
# =============================================================================
def create_mapper() -> MessageMapper:
"""Create a fresh MessageMapper."""
return MessageMapper()
def create_test_request(
entity_id: str = "test_agent",
input_text: str = "Test input",
stream: bool = True,
) -> AgentFrameworkRequest:
"""Create a standard test request."""
return AgentFrameworkRequest(
metadata={"entity_id": entity_id},
input=input_text,
stream=stream,
)
def create_mock_chat_client() -> MockChatClient:
"""Create a mock chat client."""
return MockChatClient()
def create_mock_base_chat_client() -> MockBaseChatClient:
"""Create a mock BaseChatClient."""
return MockBaseChatClient()
def create_mock_agent(
id: str = "test_agent",
name: str = "TestAgent",
response_text: str = "Mock agent response",
) -> MockAgent:
"""Create a mock agent."""
return MockAgent(id=id, name=name, response_text=response_text)
def create_mock_tool_agent(id: str = "tool_agent", name: str = "ToolAgent") -> MockToolCallingAgent:
"""Create a mock agent that simulates tool calls."""
return MockToolCallingAgent(id=id, name=name)
def create_agent_run_response(text: str = "Test response") -> AgentRunResponse:
"""Create an AgentRunResponse with the given text."""
return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=text)])])
def create_agent_executor_response(
executor_id: str = "test_executor",
response_text: str = "Executor response",
) -> AgentExecutorResponse:
"""Create an AgentExecutorResponse - the type that's nested in ExecutorCompletedEvent.data."""
agent_response = create_agent_run_response(response_text)
return AgentExecutorResponse(
executor_id=executor_id,
agent_run_response=agent_response,
full_conversation=[
ChatMessage(role=Role.USER, contents=[TextContent(text="User input")]),
ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)]),
],
)
def create_executor_completed_event(
executor_id: str = "test_executor",
with_agent_response: bool = True,
) -> ExecutorCompletedEvent:
"""Create an ExecutorCompletedEvent with realistic nested data.
This creates the exact data structure that caused the serialization bug:
ExecutorCompletedEvent.data contains AgentExecutorResponse which contains
AgentRunResponse and ChatMessage objects (SerializationMixin, not Pydantic).
"""
data = create_agent_executor_response(executor_id) if with_agent_response else {"simple": "dict"}
return ExecutorCompletedEvent(executor_id=executor_id, data=data)
def create_executor_invoked_event(executor_id: str = "test_executor") -> ExecutorInvokedEvent:
"""Create an ExecutorInvokedEvent."""
return ExecutorInvokedEvent(executor_id=executor_id)
def create_executor_failed_event(
executor_id: str = "test_executor",
error_message: str = "Test error",
) -> ExecutorFailedEvent:
"""Create an ExecutorFailedEvent."""
details = WorkflowErrorDetails(error_type="TestError", message=error_message)
return ExecutorFailedEvent(executor_id=executor_id, details=details)
# =============================================================================
# Workflow Setup Helpers (async factory functions)
# =============================================================================
async def create_executor_with_real_agent() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient]:
"""Create an executor with a REAL ChatAgent using mock chat client.
This tests the full execution pipeline:
- Real ChatAgent class
- Real message handling and normalization
- Real middleware pipeline
- Only the LLM call is mocked
Returns tuple of (executor, entity_id, mock_client) so tests can access all components.
"""
mock_client = MockBaseChatClient()
discovery = EntityDiscovery(None)
mapper = MessageMapper()
executor = AgentFrameworkExecutor(discovery, mapper)
# Create a REAL ChatAgent with mock client
agent = ChatAgent(
id="test_chat_agent",
name="Test Chat Agent",
description="A real ChatAgent for testing execution flow",
chat_client=mock_client,
system_message="You are a helpful test assistant.",
)
# Register the real agent
entity_info = await discovery.create_entity_info_from_object(agent, source="test")
discovery.register_entity(entity_info.id, entity_info, agent)
return executor, entity_info.id, mock_client
async def create_sequential_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
"""Create a realistic sequential workflow (Writer -> Reviewer).
This provides a reusable multi-agent workflow that:
- Chains 2 ChatAgents sequentially
- Writer generates content, Reviewer provides feedback
- Pre-configures mock responses for both agents
Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
"""
mock_client = MockBaseChatClient()
mock_client.run_responses = [
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Here's the draft content about the topic.")),
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Review: Content is clear and well-structured.")),
]
writer = ChatAgent(
id="writer",
name="Writer",
description="Content writer agent",
chat_client=mock_client,
system_message="You are a content writer. Create clear, engaging content.",
)
reviewer = ChatAgent(
id="reviewer",
name="Reviewer",
description="Content reviewer agent",
chat_client=mock_client,
system_message="You are a reviewer. Provide constructive feedback.",
)
workflow = SequentialBuilder().participants([writer, reviewer]).build()
discovery = EntityDiscovery(None)
mapper = MessageMapper()
executor = AgentFrameworkExecutor(discovery, mapper)
entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
discovery.register_entity(entity_info.id, entity_info, workflow)
return executor, entity_info.id, mock_client, workflow
async def create_concurrent_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
"""Create a realistic concurrent workflow (Researcher | Analyst | Summarizer).
This provides a reusable fan-out/fan-in workflow that:
- Runs 3 ChatAgents in parallel
- Each agent processes the same input independently
- Pre-configures mock responses for all agents
Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
"""
mock_client = MockBaseChatClient()
mock_client.run_responses = [
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Research findings: Key data points identified.")),
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Analysis: Trends indicate positive growth.")),
ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Summary: Overall outlook is favorable.")),
]
researcher = ChatAgent(
id="researcher",
name="Researcher",
description="Research agent",
chat_client=mock_client,
system_message="You are a researcher. Find key data and insights.",
)
analyst = ChatAgent(
id="analyst",
name="Analyst",
description="Analysis agent",
chat_client=mock_client,
system_message="You are an analyst. Identify trends and patterns.",
)
summarizer = ChatAgent(
id="summarizer",
name="Summarizer",
description="Summary agent",
chat_client=mock_client,
system_message="You are a summarizer. Provide concise summaries.",
)
workflow = ConcurrentBuilder().participants([researcher, analyst, summarizer]).build()
discovery = EntityDiscovery(None)
mapper = MessageMapper()
executor = AgentFrameworkExecutor(discovery, mapper)
entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
discovery.register_entity(entity_info.id, entity_info, workflow)
return executor, entity_info.id, mock_client, workflow