agent-framework/python/packages/devui/tests/test_helpers.py
Eduard van Valkenburg 838a7fd61d Python: [BREAKING] Types API Review improvements (#3647)
* Replace Role and FinishReason classes with NewType + Literal

- Remove EnumLike metaclass from _types.py
- Replace Role class with NewType('Role', str) + RoleLiteral
- Replace FinishReason class with NewType('FinishReason', str) + FinishReasonLiteral
- Update all usages across codebase to use string literals
- Remove .value access patterns (direct string comparison now works)
- Add backward compatibility for legacy dict serialization format
- Update tests to reflect new string-based types

Addresses #3591, #3615
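
The NewType plus Literal pattern described in this change can be sketched with the standard library alone. This is a minimal illustration, not the framework's actual `_types.py`: the literal values listed and the `is_assistant` helper are assumptions made for the example.

```python
from typing import Literal, NewType, get_args

# Assumed set of role values, for illustration only.
RoleLiteral = Literal["system", "user", "assistant", "tool"]

# At runtime Role("assistant") is simply the str "assistant";
# the NewType distinction exists only for static type checkers.
Role = NewType("Role", str)


def is_assistant(role: str) -> bool:
    """Compare directly against a string literal; no .value access is needed."""
    return role == "assistant"


role = Role("assistant")
print(type(role) is str)                # True
print(is_assistant(role))               # True
print("tool" in get_args(RoleLiteral))  # True
```

Because the value is a plain string, it can be compared, serialized, and used as a dict key without unwrapping, which is why the `.value` access patterns could be dropped.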

* Simplify ChatResponse and AgentResponse type hints (#3592)

- Remove overloads from ChatResponse.__init__
- Remove text parameter from ChatResponse.__init__
- Remove | dict[str, Any] from finish_reason and usage_details params
- Remove **kwargs from AgentResponse.__init__
- Both now accept ChatMessage | Sequence[ChatMessage] | None for messages
- Update docstrings and examples to reflect changes
- Fix tests that were using removed kwargs
- Fix Role type hint usage in ag-ui utils

* Remove text parameter from ChatResponseUpdate and AgentResponseUpdate (#3597)

- Remove text parameter from ChatResponseUpdate.__init__
- Remove text parameter from AgentResponseUpdate.__init__
- Remove **kwargs from both update classes
- Simplify contents parameter type to Sequence[Content] | None
- Update all usages to use contents=[Content.from_text(...)] pattern
- Fix imports in test files
- Update docstrings and examples

* Rename from_chat_response_updates to from_updates (#3593)

- ChatResponse.from_chat_response_updates → ChatResponse.from_updates
- ChatResponse.from_chat_response_generator → ChatResponse.from_update_generator
- AgentResponse.from_agent_run_response_updates → AgentResponse.from_updates

* Remove try_parse_value method from ChatResponse and AgentResponse (#3595)

- Remove try_parse_value method from ChatResponse
- Remove try_parse_value method from AgentResponse
- Remove try_parse_value calls from from_updates and from_update_generator methods
- Update samples to use try/except with response.value instead
- Update tests to use response.value pattern
- Users should now use response.value with try/except for safe parsing
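
A rough sketch of the `response.value` with try/except pattern follows. `FakeResponse` and `safe_value` are hypothetical stand-ins, not framework classes, and the exception type caught here (`ValueError`, raised by `json.loads`) is an assumption; the exception the real `response.value` raises is not stated in this commit message.

```python
import json
from typing import Any


class FakeResponse:
    """Hypothetical stand-in for a response object; not the framework class."""

    def __init__(self, text: str) -> None:
        self.text = text

    @property
    def value(self) -> Any:
        # Parse on access; json.loads raises json.JSONDecodeError,
        # a subclass of ValueError, when the text is not valid JSON.
        return json.loads(self.text)


def safe_value(response: FakeResponse) -> Any:
    """Return the parsed value, or None when parsing fails."""
    try:
        return response.value
    except ValueError:
        return None


print(safe_value(FakeResponse('{"a": 1}')))  # {'a': 1}
print(safe_value(FakeResponse("not json")))  # None
```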

* Add agent_id to AgentResponse and clarify author_name documentation (#3596)

- Add agent_id parameter to AgentResponse class
- Document that author_name is on ChatMessage objects, not responses
- Update ChatResponse docstring with author_name note
- Update AgentResponse docstring with author_name note

* Simplify ChatMessage.__init__ signature (#3618)

- Make contents a positional argument accepting Sequence[Content | str]
- Auto-convert strings in contents to TextContent
- Remove overloads, keep text kwarg for backward compatibility with serialization
- Update _parse_content_list to handle string items
- Update all usages across codebase to use new format: ChatMessage("role", ["text"])
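
The positional `("role", ["text"])` call shape with string auto-conversion can be illustrated as below. `TextItem` and `Message` are hypothetical stand-ins written for this sketch, not the framework's `Content` or `ChatMessage`; skipping `None` entries mirrors the `_parse_content_list` fix mentioned later in this commit.

```python
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass


@dataclass
class TextItem:
    """Hypothetical stand-in for a text content item."""

    text: str


class Message:
    """Hypothetical stand-in mirroring the ("role", ["text"]) call shape."""

    def __init__(self, role: str, contents: Sequence[TextItem | str | None] = ()) -> None:
        self.role = role
        # Wrap bare strings in TextItem and drop None entries.
        self.contents = [
            TextItem(item) if isinstance(item, str) else item
            for item in contents
            if item is not None
        ]


message = Message("assistant", ["hello", None, TextItem("world")])
print(message.contents)  # [TextItem(text='hello'), TextItem(text='world')]
```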

* Allow Content as input on run and get_response

- Update prepare_messages and normalize_messages to accept Content
- Update type signatures in _agents.py and _clients.py
- Add tests for Content input handling

* Fix ChatMessage usage across packages and samples

Update all remaining ChatMessage(role=..., text=...) to use new
ChatMessage('role', ['text']) signature.

* Fix Role string usage and response format parsing

- Fix redis provider: remove .value access on string literals
- Fix durabletask ensure_response_format: set _response_format before accessing .value

* Fix ollama .value and ai_model_id issues, handle None in content list

- Fix ollama _chat_client: remove .value on string literals
- Fix ollama _chat_client: rename ai_model_id to model_id
- Fix _parse_content_list: skip None values gracefully

* Fix A2AAgent type signature to include Content

* Fix Role/FinishReason NewType dict annotations and improve test coverage to 95%

* Fix mypy errors for Role/FinishReason NewType usage

* Fix Role.TOOL and Role.ASSISTANT usage in _orchestrator_helpers.py

* Fix Role NewType usage in durabletask _models.py
2026-02-04 10:13:23 +00:00

471 lines
16 KiB
Python

# Copyright (c) Microsoft. All rights reserved.

"""Shared test utilities for DevUI tests.

This module provides reusable test helpers including:
- Mock chat clients that don't require API keys
- Real workflow event classes from agent_framework
- Test agents and executors for workflow testing
- Factory functions for test data

These follow the patterns established in other agent_framework packages
(like a2a, ag-ui) which use explicit imports instead of conftest.py
to avoid pytest plugin conflicts when running tests across packages.
"""

import sys
from collections.abc import AsyncIterable, MutableSequence
from typing import Any, Generic

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentThread,
    BaseAgent,
    BaseChatClient,
    ChatAgent,
    ChatMessage,
    ChatResponse,
    ChatResponseUpdate,
    ConcurrentBuilder,
    Content,
    SequentialBuilder,
    use_chat_middleware,
)
from agent_framework._clients import TOptions_co
from agent_framework._workflows._agent_executor import AgentExecutorResponse

if sys.version_info >= (3, 12):
    from typing import override  # type: ignore # pragma: no cover
else:
    from typing_extensions import override  # type: ignore[import] # pragma: no cover

# Import real workflow event classes - NOT mocks!
from agent_framework._workflows._events import (
    ExecutorCompletedEvent,
    ExecutorFailedEvent,
    ExecutorInvokedEvent,
    WorkflowErrorDetails,
)

from agent_framework_devui._discovery import EntityDiscovery
from agent_framework_devui._executor import AgentFrameworkExecutor
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest

# =============================================================================
# Mock Chat Clients (from core tests pattern)
# =============================================================================


class MockChatClient:
    """Simple mock chat client that doesn't require API keys.

    Configure responses by setting `responses` or `streaming_responses` lists.
    """

    def __init__(self) -> None:
        self.additional_properties: dict[str, Any] = {}
        self.call_count: int = 0
        self.responses: list[ChatResponse] = []
        self.streaming_responses: list[list[ChatResponseUpdate]] = []

    async def get_response(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage],
        **kwargs: Any,
    ) -> ChatResponse:
        self.call_count += 1
        if self.responses:
            return self.responses.pop(0)
        return ChatResponse(messages=ChatMessage("assistant", ["test response"]))

    async def get_streaming_response(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage],
        **kwargs: Any,
    ) -> AsyncIterable[ChatResponseUpdate]:
        self.call_count += 1
        if self.streaming_responses:
            for update in self.streaming_responses.pop(0):
                yield update
        else:
            yield ChatResponseUpdate(contents=[Content.from_text(text="test streaming response")], role="assistant")


@use_chat_middleware
class MockBaseChatClient(BaseChatClient[TOptions_co], Generic[TOptions_co]):
    """Full BaseChatClient mock with middleware support.

    Use this when testing features that require the full BaseChatClient interface.
    This goes through all the middleware, message normalization, etc. - only the
    actual LLM call is mocked.
    """

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self.run_responses: list[ChatResponse] = []
        self.streaming_responses: list[list[ChatResponseUpdate]] = []
        self.call_count: int = 0
        self.received_messages: list[list[ChatMessage]] = []

    @override
    async def _inner_get_response(
        self,
        *,
        messages: MutableSequence[ChatMessage],
        options: dict[str, Any],
        **kwargs: Any,
    ) -> ChatResponse:
        self.call_count += 1
        self.received_messages.append(list(messages))
        if self.run_responses:
            return self.run_responses.pop(0)
        return ChatResponse(messages=ChatMessage("assistant", ["Mock response from ChatAgent"]))

    @override
    async def _inner_get_streaming_response(
        self,
        *,
        messages: MutableSequence[ChatMessage],
        options: dict[str, Any],
        **kwargs: Any,
    ) -> AsyncIterable[ChatResponseUpdate]:
        self.call_count += 1
        self.received_messages.append(list(messages))
        if self.streaming_responses:
            for update in self.streaming_responses.pop(0):
                yield update
        else:
            # Simulate realistic streaming chunks
            yield ChatResponseUpdate(contents=[Content.from_text(text="Mock ")], role="assistant")
            yield ChatResponseUpdate(contents=[Content.from_text(text="streaming ")], role="assistant")
            yield ChatResponseUpdate(contents=[Content.from_text(text="response ")], role="assistant")
            yield ChatResponseUpdate(contents=[Content.from_text(text="from ChatAgent")], role="assistant")


# =============================================================================
# Mock Agents (for workflow testing without API keys)
# =============================================================================


class MockAgent(BaseAgent):
    """Mock agent that returns configurable responses without needing a chat client."""

    def __init__(
        self,
        response_text: str = "Mock agent response",
        streaming_chunks: list[str] | None = None,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.response_text = response_text
        self.streaming_chunks = streaming_chunks or [response_text]
        self.call_count = 0

    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        self.call_count += 1
        return AgentResponse(messages=[ChatMessage("assistant", [Content.from_text(text=self.response_text)])])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        self.call_count += 1
        for chunk in self.streaming_chunks:
            yield AgentResponseUpdate(contents=[Content.from_text(text=chunk)], role="assistant")


class MockToolCallingAgent(BaseAgent):
    """Mock agent that simulates tool calls and results in streaming mode."""

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self.call_count = 0

    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        self.call_count += 1
        return AgentResponse(messages=[ChatMessage("assistant", ["done"])])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        self.call_count += 1
        # First: text
        yield AgentResponseUpdate(
            contents=[Content.from_text(text="Let me search for that...")],
            role="assistant",
        )
        # Second: tool call
        yield AgentResponseUpdate(
            contents=[
                Content.from_function_call(
                    call_id="call_123",
                    name="search",
                    arguments={"query": "weather"},
                )
            ],
            role="assistant",
        )
        # Third: tool result
        yield AgentResponseUpdate(
            contents=[
                Content.from_function_result(
                    call_id="call_123",
                    result={"temperature": 72, "condition": "sunny"},
                )
            ],
            role="tool",
        )
        # Fourth: final text
        yield AgentResponseUpdate(
            contents=[Content.from_text(text="The weather is sunny, 72°F.")],
            role="assistant",
        )


# =============================================================================
# Factory Functions for Test Data
# =============================================================================


def create_mapper() -> MessageMapper:
    """Create a fresh MessageMapper."""
    return MessageMapper()


def create_test_request(
    entity_id: str = "test_agent",
    input_text: str = "Test input",
    stream: bool = True,
) -> AgentFrameworkRequest:
    """Create a standard test request."""
    return AgentFrameworkRequest(
        metadata={"entity_id": entity_id},
        input=input_text,
        stream=stream,
    )


def create_mock_chat_client() -> MockChatClient:
    """Create a mock chat client."""
    return MockChatClient()


def create_mock_base_chat_client() -> MockBaseChatClient:
    """Create a mock BaseChatClient."""
    return MockBaseChatClient()


def create_mock_agent(
    id: str = "test_agent",
    name: str = "TestAgent",
    response_text: str = "Mock agent response",
) -> MockAgent:
    """Create a mock agent."""
    return MockAgent(id=id, name=name, response_text=response_text)


def create_mock_tool_agent(id: str = "tool_agent", name: str = "ToolAgent") -> MockToolCallingAgent:
    """Create a mock agent that simulates tool calls."""
    return MockToolCallingAgent(id=id, name=name)


def create_agent_run_response(text: str = "Test response") -> AgentResponse:
    """Create an AgentResponse with the given text."""
    return AgentResponse(messages=[ChatMessage("assistant", [Content.from_text(text=text)])])


def create_agent_executor_response(
    executor_id: str = "test_executor",
    response_text: str = "Executor response",
) -> AgentExecutorResponse:
    """Create an AgentExecutorResponse - the type that's nested in ExecutorCompletedEvent.data."""
    agent_response = create_agent_run_response(response_text)
    return AgentExecutorResponse(
        executor_id=executor_id,
        agent_response=agent_response,
        full_conversation=[
            ChatMessage("user", [Content.from_text(text="User input")]),
            ChatMessage("assistant", [Content.from_text(text=response_text)]),
        ],
    )


def create_executor_completed_event(
    executor_id: str = "test_executor",
    with_agent_response: bool = True,
) -> ExecutorCompletedEvent:
    """Create an ExecutorCompletedEvent with realistic nested data.

    This creates the exact data structure that caused the serialization bug:
    ExecutorCompletedEvent.data contains AgentExecutorResponse which contains
    AgentResponse and ChatMessage objects (SerializationMixin, not Pydantic).
    """
    data = create_agent_executor_response(executor_id) if with_agent_response else {"simple": "dict"}
    return ExecutorCompletedEvent(executor_id=executor_id, data=data)


def create_executor_invoked_event(executor_id: str = "test_executor") -> ExecutorInvokedEvent:
    """Create an ExecutorInvokedEvent."""
    return ExecutorInvokedEvent(executor_id=executor_id)


def create_executor_failed_event(
    executor_id: str = "test_executor",
    error_message: str = "Test error",
) -> ExecutorFailedEvent:
    """Create an ExecutorFailedEvent."""
    details = WorkflowErrorDetails(error_type="TestError", message=error_message)
    return ExecutorFailedEvent(executor_id=executor_id, details=details)


# =============================================================================
# Workflow Setup Helpers (async factory functions)
# =============================================================================


async def create_executor_with_real_agent() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient]:
    """Create an executor with a REAL ChatAgent using mock chat client.

    This tests the full execution pipeline:
    - Real ChatAgent class
    - Real message handling and normalization
    - Real middleware pipeline
    - Only the LLM call is mocked

    Returns tuple of (executor, entity_id, mock_client) so tests can access all components.
    """
    mock_client = MockBaseChatClient()
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    # Create a REAL ChatAgent with mock client
    agent = ChatAgent(
        id="test_chat_agent",
        name="Test Chat Agent",
        description="A real ChatAgent for testing execution flow",
        chat_client=mock_client,
        system_message="You are a helpful test assistant.",
    )

    # Register the real agent
    entity_info = await discovery.create_entity_info_from_object(agent, source="test")
    discovery.register_entity(entity_info.id, entity_info, agent)

    return executor, entity_info.id, mock_client


async def create_sequential_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
    """Create a realistic sequential workflow (Writer -> Reviewer).

    This provides a reusable multi-agent workflow that:
    - Chains 2 ChatAgents sequentially
    - Writer generates content, Reviewer provides feedback
    - Pre-configures mock responses for both agents

    Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
    """
    mock_client = MockBaseChatClient()
    mock_client.run_responses = [
        ChatResponse(messages=ChatMessage("assistant", ["Here's the draft content about the topic."])),
        ChatResponse(messages=ChatMessage("assistant", ["Review: Content is clear and well-structured."])),
    ]

    writer = ChatAgent(
        id="writer",
        name="Writer",
        description="Content writer agent",
        chat_client=mock_client,
        system_message="You are a content writer. Create clear, engaging content.",
    )
    reviewer = ChatAgent(
        id="reviewer",
        name="Reviewer",
        description="Content reviewer agent",
        chat_client=mock_client,
        system_message="You are a reviewer. Provide constructive feedback.",
    )

    workflow = SequentialBuilder().participants([writer, reviewer]).build()

    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    return executor, entity_info.id, mock_client, workflow


async def create_concurrent_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
    """Create a realistic concurrent workflow (Researcher | Analyst | Summarizer).

    This provides a reusable fan-out/fan-in workflow that:
    - Runs 3 ChatAgents in parallel
    - Each agent processes the same input independently
    - Pre-configures mock responses for all agents

    Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
    """
    mock_client = MockBaseChatClient()
    mock_client.run_responses = [
        ChatResponse(messages=ChatMessage("assistant", ["Research findings: Key data points identified."])),
        ChatResponse(messages=ChatMessage("assistant", ["Analysis: Trends indicate positive growth."])),
        ChatResponse(messages=ChatMessage("assistant", ["Summary: Overall outlook is favorable."])),
    ]

    researcher = ChatAgent(
        id="researcher",
        name="Researcher",
        description="Research agent",
        chat_client=mock_client,
        system_message="You are a researcher. Find key data and insights.",
    )
    analyst = ChatAgent(
        id="analyst",
        name="Analyst",
        description="Analysis agent",
        chat_client=mock_client,
        system_message="You are an analyst. Identify trends and patterns.",
    )
    summarizer = ChatAgent(
        id="summarizer",
        name="Summarizer",
        description="Summary agent",
        chat_client=mock_client,
        system_message="You are a summarizer. Provide concise summaries.",
    )

    workflow = ConcurrentBuilder().participants([researcher, analyst, summarizer]).build()

    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)

    return executor, entity_info.id, mock_client, workflow
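

The mock clients in this file share one pattern: pop a pre-queued response if one exists, otherwise fall back to a default, and count every call. The sketch below reproduces that pattern with the standard library only, so it runs without agent_framework installed. `QueuedMock` and `demo` are hypothetical names, and plain strings stand in for the framework's response objects.

```python
import asyncio
from collections.abc import AsyncIterator


class QueuedMock:
    """Hypothetical stand-in showing the pop-or-default mock pattern."""

    def __init__(self) -> None:
        self.call_count = 0
        self.responses: list[str] = []
        self.streaming_responses: list[list[str]] = []

    async def get_response(self) -> str:
        self.call_count += 1
        if self.responses:
            # Consume queued responses in FIFO order.
            return self.responses.pop(0)
        return "test response"

    async def get_streaming_response(self) -> AsyncIterator[str]:
        self.call_count += 1
        if self.streaming_responses:
            for chunk in self.streaming_responses.pop(0):
                yield chunk
        else:
            yield "test streaming response"


async def demo() -> tuple[list[str], list[str], int]:
    mock = QueuedMock()
    mock.responses = ["first"]
    # The second call finds the queue empty and gets the default.
    replies = [await mock.get_response(), await mock.get_response()]
    streamed = [chunk async for chunk in mock.get_streaming_response()]
    return replies, streamed, mock.call_count


print(asyncio.run(demo()))
# (['first', 'test response'], ['test streaming response'], 3)
```

Queuing responses as a list keeps each test explicit about the order in which replies are consumed, which matters when several agents share one mock client as in the workflow helpers above.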