Files
agent-framework/python/packages/devui/tests/devui/conftest.py
T
Eduard van Valkenburg b1b528e4a8 Python: [BREAKING] Remove deprecated kwargs compatibility paths (#4858)
* [BREAKING] Remove deprecated kwargs compatibility paths

Remove the deprecated kwargs compatibility shims across core agents, clients, tools, middleware, and telemetry.

Keep workflow kwargs behavior intact in this branch and follow up separately in #4850.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix PR CI fallout for kwargs removal

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR review feedback

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* updates

* Fix Azure AI CI fallout

Remove the stale _get_current_conversation_id override from the Azure AI client after the OpenAI base helper was deleted.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fixed new classes

* Fix Assistants deprecated import gating

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix integration replay regressions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Switch multi-agent hosting samples to Azure chat completions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Simplify Azure multi-agent sample config

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-27 21:00:12 +00:00

553 lines
20 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Pytest configuration and fixtures for DevUI tests.
This module provides reusable test fixtures including:
- Mock chat clients that don't require API keys
- Real workflow event classes from agent_framework
- Test agents and executors for workflow testing
- Factory functions for test data
"""
import sys
from collections.abc import AsyncIterable, Awaitable, Mapping, Sequence
from pathlib import Path
from typing import Any, Generic
import pytest
import pytest_asyncio
from agent_framework import (
Agent,
AgentResponse,
AgentResponseUpdate,
AgentSession,
BaseAgent,
BaseChatClient,
ChatResponse,
ChatResponseUpdate,
Content,
Message,
ResponseStream,
)
from agent_framework._clients import OptionsCoT
from agent_framework._workflows._agent_executor import AgentExecutorResponse
from agent_framework._workflows._events import (
WorkflowErrorDetails,
WorkflowEvent,
)
from agent_framework.orchestrations import ConcurrentBuilder, SequentialBuilder
from agent_framework_devui._discovery import EntityDiscovery
from agent_framework_devui._executor import AgentFrameworkExecutor
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
if sys.version_info >= (3, 12):
from typing import override # type: ignore # pragma: no cover
else:
from typing_extensions import override # type: ignore[import] # pragma: no cover
# =============================================================================
# Mock Chat Clients (from core tests pattern)
# =============================================================================
class MockChatClient:
    """Lightweight stand-in for a chat client; no API key needed.

    Queue canned replies on ``responses`` (non-streaming) or
    ``streaming_responses`` (one list of updates per call). Queued items are
    consumed front-to-back; when a queue is empty a fixed default reply is used.
    """

    def __init__(self) -> None:
        self.responses: list[ChatResponse] = []
        self.streaming_responses: list[list[ChatResponseUpdate]] = []
        self.call_count: int = 0
        self.additional_properties: dict[str, Any] = {}

    async def get_response(
        self,
        messages: str | Message | list[str] | list[Message],
        **kwargs: Any,
    ) -> ChatResponse:
        """Return the next queued response, or a default one when none is queued."""
        self.call_count += 1
        if not self.responses:
            return ChatResponse(messages=Message("assistant", ["test response"]))
        return self.responses.pop(0)

    async def get_streaming_response(
        self,
        messages: str | Message | list[str] | list[Message],
        **kwargs: Any,
    ) -> AsyncIterable[ChatResponseUpdate]:
        """Yield the next queued batch of updates, or a single default update."""
        self.call_count += 1
        if not self.streaming_responses:
            yield ChatResponseUpdate(contents=[Content.from_text(text="test streaming response")], role="assistant")
            return
        queued_updates = self.streaming_responses.pop(0)
        for chunk in queued_updates:
            yield chunk
class MockBaseChatClient(BaseChatClient[OptionsCoT], Generic[OptionsCoT]):
    """``BaseChatClient`` subclass in which only the model call itself is faked.

    Pick this over the simpler mock when a test needs the complete
    ``BaseChatClient`` interface (middleware, message normalization, ...).
    Canned replies are queued on ``run_responses`` (non-streaming) and
    ``streaming_responses`` (streaming); every call records the messages it
    received in ``received_messages`` and bumps ``call_count``.
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.call_count: int = 0
        self.received_messages: list[list[Message]] = []
        self.run_responses: list[ChatResponse] = []
        self.streaming_responses: list[list[ChatResponseUpdate]] = []

    @override
    def _inner_get_response(
        self,
        *,
        messages: Sequence[Message],
        stream: bool,
        options: Mapping[str, Any],
        **kwargs: Any,
    ) -> Awaitable[ChatResponse] | ResponseStream[ChatResponseUpdate, ChatResponse]:
        """Return an awaitable response, or a response stream when ``stream`` is true."""

        async def _respond() -> ChatResponse:
            # Bookkeeping happens when the coroutine is awaited, not when it is created.
            self.call_count += 1
            self.received_messages.append(list(messages))
            if not self.run_responses:
                return ChatResponse(messages=Message("assistant", ["Mock response from Agent"]))
            return self.run_responses.pop(0)

        if not stream:
            return _respond()
        return self._build_response_stream(self._stream_impl(messages))

    async def _stream_impl(self, messages: Sequence[Message]) -> AsyncIterable[ChatResponseUpdate]:
        """Yield the next queued batch of updates, or four default text chunks."""
        self.call_count += 1
        self.received_messages.append(list(messages))
        if self.streaming_responses:
            queued_updates = self.streaming_responses.pop(0)
            for chunk in queued_updates:
                yield chunk
            return
        # Default: emit several small chunks to resemble realistic streaming.
        for piece in ("Mock ", "streaming ", "response ", "from Agent"):
            yield ChatResponseUpdate(contents=[Content.from_text(text=piece)], role="assistant")
# =============================================================================
# Mock Agents (for workflow testing without API keys)
# =============================================================================
class MockAgent(BaseAgent):
    """Mock agent that returns configurable responses without needing a chat client.

    ``call_count`` is incremented exactly once per ``run()`` invocation,
    regardless of whether the call is streaming or not.
    """

    def __init__(
        self,
        response_text: str = "Mock agent response",
        streaming_chunks: list[str] | None = None,
        **kwargs: Any,
    ):
        """Configure the canned output.

        Args:
            response_text: Text of the single assistant message returned by a
                non-streaming run.
            streaming_chunks: Text chunks emitted one update at a time by a
                streaming run. ``None`` or an empty list falls back to a single
                chunk containing ``response_text``.
            **kwargs: Forwarded unchanged to ``BaseAgent.__init__``.
        """
        super().__init__(**kwargs)
        self.response_text = response_text
        self.streaming_chunks = streaming_chunks or [response_text]
        self.call_count = 0

    def run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Dispatch to the streaming or non-streaming implementation.

        Returns:
            A ``ResponseStream`` when ``stream`` is true, otherwise an awaitable
            resolving to an ``AgentResponse``.
        """
        # The call is counted only here; the private helpers below must not
        # count again or a single run() would be recorded twice.
        self.call_count += 1
        if stream:
            return self._run_stream(messages=messages, session=session, **kwargs)
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        """Return one assistant message containing ``response_text``; inputs are ignored."""
        return AgentResponse(messages=[Message("assistant", [Content.from_text(text=self.response_text)])])

    def _run_stream(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Return a stream that yields one assistant update per entry in ``streaming_chunks``."""

        async def _iter() -> AsyncIterable[AgentResponseUpdate]:
            for chunk in self.streaming_chunks:
                yield AgentResponseUpdate(contents=[Content.from_text(text=chunk)], role="assistant")

        return ResponseStream(_iter(), finalizer=AgentResponse.from_updates)
class MockToolCallingAgent(BaseAgent):
    """Agent double whose streaming run replays a scripted tool-call exchange.

    The streamed sequence is: introductory text, a ``search`` function call,
    the matching function result, and a closing text message. A non-streaming
    run simply answers ``"done"``.
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.call_count = 0

    def run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Count the call, then hand off to the streaming or non-streaming helper."""
        self.call_count += 1
        handler = self._run_stream if stream else self._run
        return handler(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        """Return a fixed single-message response; inputs are ignored."""
        final_message = Message("assistant", ["done"])
        return AgentResponse(messages=[final_message])

    def _run_stream(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Return a stream that emits the four scripted updates in order."""

        async def _emit() -> AsyncIterable[AgentResponseUpdate]:
            # The function call and its result share this id.
            call_id = "call_123"

            # 1) introductory assistant text
            intro = Content.from_text(text="Let me search for that...")
            yield AgentResponseUpdate(contents=[intro], role="assistant")

            # 2) the tool invocation
            invocation = Content.from_function_call(
                call_id=call_id,
                name="search",
                arguments={"query": "weather"},
            )
            yield AgentResponseUpdate(contents=[invocation], role="assistant")

            # 3) the tool's result, reported under the "tool" role
            outcome = Content.from_function_result(
                call_id=call_id,
                result={"temperature": 72, "condition": "sunny"},
            )
            yield AgentResponseUpdate(contents=[outcome], role="tool")

            # 4) closing assistant text
            closing = Content.from_text(text="The weather is sunny, 72°F.")
            yield AgentResponseUpdate(contents=[closing], role="assistant")

        return ResponseStream(_emit(), finalizer=AgentResponse.from_updates)
# =============================================================================
# Helper Functions for Test Data Creation
# =============================================================================
def _create_agent_run_response(text: str = "Test response") -> AgentResponse:
    """Build an AgentResponse holding one assistant message whose text is ``text``."""
    text_content = Content.from_text(text=text)
    assistant_message = Message("assistant", [text_content])
    return AgentResponse(messages=[assistant_message])
def _create_agent_executor_response(
    executor_id: str = "test_executor",
    response_text: str = "Executor response",
) -> AgentExecutorResponse:
    """Build the AgentExecutorResponse payload carried in an executor_completed event's ``data``.

    The conversation consists of a fixed user message followed by an assistant
    message containing ``response_text``.
    """
    agent_response = _create_agent_run_response(response_text)
    user_turn = Message("user", [Content.from_text(text="User input")])
    assistant_turn = Message("assistant", [Content.from_text(text=response_text)])
    return AgentExecutorResponse(
        executor_id=executor_id,
        agent_response=agent_response,
        full_conversation=[user_turn, assistant_turn],
    )
# =============================================================================
# Public Factory Functions (for direct import in tests)
# =============================================================================
def create_agent_run_response(text: str = "Test response") -> AgentResponse:
    """Public wrapper: return an AgentResponse whose assistant message contains ``text``."""
    response = _create_agent_run_response(text)
    return response
def create_executor_invoked_event(executor_id: str = "test_executor") -> WorkflowEvent[Any]:
    """Build an ``executor_invoked`` WorkflowEvent for ``executor_id``."""
    invoked = WorkflowEvent.executor_invoked(executor_id=executor_id)
    return invoked
def create_executor_completed_event(
    executor_id: str = "test_executor",
    with_agent_response: bool = True,
) -> WorkflowEvent[Any]:
    """Build an ``executor_completed`` WorkflowEvent.

    With ``with_agent_response`` set, the event's data is an
    AgentExecutorResponse wrapping an AgentResponse and Message objects
    (SerializationMixin rather than Pydantic), which is the nesting that
    triggered the serialization bug. Otherwise the data is a small plain dict.
    """
    if with_agent_response:
        payload: Any = _create_agent_executor_response(executor_id)
    else:
        payload = {"simple": "dict"}
    return WorkflowEvent.executor_completed(executor_id=executor_id, data=payload)
def create_executor_failed_event(
    executor_id: str = "test_executor",
    error_message: str = "Test error",
) -> WorkflowEvent[WorkflowErrorDetails]:
    """Build an ``executor_failed`` WorkflowEvent with a ``TestError`` carrying ``error_message``."""
    return WorkflowEvent.executor_failed(
        executor_id=executor_id,
        details=WorkflowErrorDetails(error_type="TestError", message=error_message),
    )
# =============================================================================
# Pytest Fixtures
# =============================================================================
@pytest.fixture
def mapper() -> MessageMapper:
    """Provide a new MessageMapper instance to each test that requests it."""
    fresh_mapper = MessageMapper()
    return fresh_mapper
@pytest.fixture
def test_request() -> AgentFrameworkRequest:
    """Provide a streaming request addressed to the ``test_agent`` entity."""
    target_metadata = {"entity_id": "test_agent"}
    return AgentFrameworkRequest(metadata=target_metadata, input="Test input", stream=True)
@pytest.fixture
def mock_chat_client() -> MockChatClient:
    """Provide a new MockChatClient with empty response queues."""
    client = MockChatClient()
    return client
@pytest.fixture
def mock_base_chat_client() -> MockBaseChatClient:
    """Provide a new MockBaseChatClient constructed with default arguments."""
    client = MockBaseChatClient()
    return client
@pytest.fixture
def mock_agent() -> MockAgent:
    """Provide a MockAgent with id ``test_agent`` and name ``TestAgent``."""
    agent = MockAgent(
        id="test_agent",
        name="TestAgent",
        response_text="Mock agent response",
    )
    return agent
@pytest.fixture
def mock_tool_agent() -> MockToolCallingAgent:
    """Provide a MockToolCallingAgent with id ``tool_agent`` and name ``ToolAgent``."""
    agent = MockToolCallingAgent(id="tool_agent", name="ToolAgent")
    return agent
@pytest.fixture
def agent_run_response() -> AgentResponse:
    """Provide an AgentResponse built with the helper's default text."""
    response = _create_agent_run_response()
    return response
@pytest.fixture
def executor_completed_event() -> WorkflowEvent[Any]:
    """Provide an ``executor_completed`` WorkflowEvent with realistic nested data.

    The event's data is an AgentExecutorResponse wrapping an AgentResponse and
    Message objects (SerializationMixin, not Pydantic), reproducing the
    structure that caused the serialization bug.
    """
    executor_id = "test_executor"
    return WorkflowEvent.executor_completed(
        executor_id=executor_id,
        data=_create_agent_executor_response(executor_id),
    )
@pytest.fixture
def executor_invoked_event() -> WorkflowEvent[Any]:
    """Provide an ``executor_invoked`` WorkflowEvent for the ``test_executor`` executor."""
    # Reuse the public factory so fixture and factory cannot drift apart.
    return create_executor_invoked_event(executor_id="test_executor")
@pytest.fixture
def executor_failed_event() -> WorkflowEvent[WorkflowErrorDetails]:
    """Provide an ``executor_failed`` WorkflowEvent carrying a ``TestError`` / "Test error" detail."""
    # Reuse the public factory so fixture and factory cannot drift apart.
    return create_executor_failed_event(executor_id="test_executor", error_message="Test error")
@pytest.fixture
def test_entities_dir() -> str:
    """Return the resolved path of the samples directory used as the entity source."""
    base = Path(__file__).parent
    # Climb four levels above this file's directory, then descend into
    # samples/02-agents/devui.
    for _ in range(4):
        base = base.parent
    return str((base / "samples" / "02-agents" / "devui").resolve())
# =============================================================================
# Async Fixtures for Executor/Workflow Setup
# =============================================================================
@pytest_asyncio.fixture
async def executor_with_real_agent() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient]:
    """Provide an executor wired to a real ``Agent`` backed by a mock chat client.

    The Agent class, message handling and middleware pipeline are the real
    ones; only the LLM call is replaced by ``MockBaseChatClient``.

    Returns:
        ``(executor, entity_id, mock_client)`` so a test can reach every part.
    """
    mock_client = MockBaseChatClient()
    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    # A genuine Agent; only its client is a mock.
    real_agent = Agent(
        id="test_chat_agent",
        name="Test Chat Agent",
        description="A real Agent for testing execution flow",
        client=mock_client,
        instructions="You are a helpful test assistant.",
    )

    # Make the agent discoverable by the executor.
    info = await discovery.create_entity_info_from_object(real_agent, source="test")
    discovery.register_entity(info.id, info, real_agent)

    return executor, info.id, mock_client
@pytest_asyncio.fixture
async def sequential_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
    """Provide a two-step sequential workflow (Writer -> Reviewer).

    Both agents are real ``Agent`` instances sharing one mock client, which is
    preloaded with one canned response for each of them.

    Returns:
        ``(executor, entity_id, mock_client, workflow)`` for test access.
    """
    mock_client = MockBaseChatClient()
    canned_replies = (
        "Here's the draft content about the topic.",
        "Review: Content is clear and well-structured.",
    )
    mock_client.run_responses = [
        ChatResponse(messages=Message("assistant", [reply])) for reply in canned_replies
    ]

    # (id, name, description, instructions) for each participant, in run order.
    agent_specs = (
        (
            "writer",
            "Writer",
            "Content writer agent",
            "You are a content writer. Create clear, engaging content.",
        ),
        (
            "reviewer",
            "Reviewer",
            "Content reviewer agent",
            "You are a reviewer. Provide constructive feedback.",
        ),
    )
    participants = [
        Agent(
            id=agent_id,
            name=agent_name,
            description=agent_description,
            client=mock_client,
            instructions=agent_instructions,
        )
        for agent_id, agent_name, agent_description, agent_instructions in agent_specs
    ]
    workflow = SequentialBuilder(participants=participants).build()

    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(info.id, info, workflow)

    return executor, info.id, mock_client, workflow
@pytest_asyncio.fixture
async def concurrent_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
    """Provide a concurrent workflow (Researcher | Analyst | Summarizer).

    Three real ``Agent`` instances are combined with ``ConcurrentBuilder`` and
    share one mock client, which is preloaded with three canned responses.

    Returns:
        ``(executor, entity_id, mock_client, workflow)`` for test access.
    """
    mock_client = MockBaseChatClient()
    canned_replies = (
        "Research findings: Key data points identified.",
        "Analysis: Trends indicate positive growth.",
        "Summary: Overall outlook is favorable.",
    )
    mock_client.run_responses = [
        ChatResponse(messages=Message("assistant", [reply])) for reply in canned_replies
    ]

    # (id, name, description, instructions) for each participant.
    agent_specs = (
        (
            "researcher",
            "Researcher",
            "Research agent",
            "You are a researcher. Find key data and insights.",
        ),
        (
            "analyst",
            "Analyst",
            "Analysis agent",
            "You are an analyst. Identify trends and patterns.",
        ),
        (
            "summarizer",
            "Summarizer",
            "Summary agent",
            "You are a summarizer. Provide concise summaries.",
        ),
    )
    participants = [
        Agent(
            id=agent_id,
            name=agent_name,
            description=agent_description,
            client=mock_client,
            instructions=agent_instructions,
        )
        for agent_id, agent_name, agent_description, agent_instructions in agent_specs
    ]
    workflow = ConcurrentBuilder(participants=participants).build()

    discovery = EntityDiscovery(None)
    executor = AgentFrameworkExecutor(discovery, MessageMapper())

    info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(info.id, info, workflow)

    return executor, info.id, mock_client, workflow