agent-framework/python/packages/devui/tests/devui/conftest.py
Eduard van Valkenburg 1e350ea22f Python: [BREAKING] PR2 — Wire context provider pipeline, remove old types, update all consumers (#3850)
* PR2: Wire context provider pipeline and update all internal consumers

- Replace AgentThread with AgentSession across all packages
- Replace ContextProvider with BaseContextProvider across all packages
- Replace context_provider param with context_providers (Sequence)
- Replace thread= with session= in run() signatures
- Replace get_new_thread() with create_session()
- Add get_session(service_session_id) to agent interface
- DurableAgentThread -> DurableAgentSession
- Remove _notify_thread_of_new_messages from WorkflowAgent
- Wire before_run/after_run context provider pipeline in RawAgent
- Auto-inject InMemoryHistoryProvider when no providers configured

* fix: update all tests for context provider pipeline, fix lazy-loaders, remove old test files

* refactor: update all sample files for context provider pipeline (AgentThread→AgentSession, ContextProvider→BaseContextProvider)

* fix: update remaining ag-ui references (client docstring, getting_started sample)

* fix: make get_session service_session_id keyword-only to avoid confusion with session_id

* refactor: rename _RunContext.thread_messages to session_messages

* refactor: remove _threads.py, _memory.py, and old provider files; migrate devui to use plain message lists

* rename: remove _new_ prefix from test files

* refactor: rewrite SlidingWindowChatMessageStore as SlidingWindowHistoryProvider(InMemoryHistoryProvider)

* fix: read full history from session state directly instead of reaching into provider internals

* fix: update stale .pyi stubs, sample imports, and README references for new provider types

* fix: remove stale message_store, _notify_thread_of_new_messages, and session_id.key references in samples

* refactor: merge context_providers and sessions sample folders into sessions, remove aggregate_context_provider

* refactor: UserInfoMemory stores state in session.state instead of instance attributes

* feat: add Pydantic BaseModel support to session state serialization

Pydantic models stored in session.state are now automatically serialized
via model_dump() and restored via model_validate() during to_dict()/from_dict()
round-trips. Models are auto-registered on first serialization; use
register_state_type() for cold-start deserialization.

Also export register_state_type as a public API.

* fix mem0

* Update sample README links and descriptions for session terminology

- Replace 'thread' with 'session' in sample descriptions across all READMEs
- Update file links for renamed samples (mem0_sessions, redis_sessions, etc.)
- Fix Threads section → Sessions section in main samples/README.md
- Update tools, middleware, workflows, durabletask, azure_functions READMEs
- Update architecture diagrams in concepts/tools/README.md
- Update migration guides (autogen, semantic-kernel)

* Fix broken Redis README link to renamed sample

* Fix Mem0 OSS client search: pass scoping params as direct kwargs

AsyncMemory (OSS) expects user_id/agent_id/run_id as direct kwargs,
while AsyncMemoryClient (Platform) expects them in a filters dict.
Adds tests for both client types.

Port of fix from #3844 to new Mem0ContextProvider.

* Fix rebase issues: restore missing _conversation_state.py and checkpoint decode logic

- Add back _conversation_state.py (encode/decode_chat_messages) lost in rebase
- Fix on_checkpoint_restore to decode cache/conversation with decode_chat_messages
- Fix on_checkpoint_restore to use decode_checkpoint_value for pending requests
- Add tests/workflow/__init__.py for relative import support
- Fix test_agent_executor checkpoint selection (checkpoints[1] not superstep)

* Add STORES_BY_DEFAULT ClassVar to skip redundant InMemoryHistoryProvider injection

Chat clients that store history server-side by default (OpenAI Responses API,
Azure AI Agent) now declare STORES_BY_DEFAULT = True. The agent checks this
during auto-injection and skips InMemoryHistoryProvider unless the user
explicitly sets store=False.

* Fix broken markdown links in azure_ai and redis READMEs

* Fix getting-started samples to use session API instead of removed thread/ContextProvider API

* updates to workflow as agent

* fix group chat import

* Rename Thread→Session throughout, fix service_session_id propagation, remove stale AGUIThread

- Fix: Propagate conversation_id from ChatResponse back to session.service_session_id
  in both streaming and non-streaming paths in _agents.py
- Rename AgentThreadException → AgentSessionException
- Remove stale AGUIThread from ag_ui lazy-loader
- Rename use_service_thread → use_service_session in ag-ui package
- Rename test functions from *_thread_* to *_session_*
- Rename sample files from *_thread* to *_session*
- Update docstrings and comments: thread → session
- Update _mcp.py kwargs filter: add 'session' alongside 'thread'
- Fix ContinuationToken docstring example: thread=thread → session=session
- Fix _clients.py docstring: 'Agent threads' → 'Agent sessions'

* Fix broken markdown links after thread→session file renames

* fix azure ai test
2026-02-12 21:00:32 +00:00

553 lines
20 KiB
Python

# Copyright (c) Microsoft. All rights reserved.

"""Pytest configuration and fixtures for DevUI tests.

This module provides reusable test fixtures including:
- Mock chat clients that don't require API keys
- Real workflow event classes from agent_framework
- Test agents and executors for workflow testing
- Factory functions for test data
"""

import sys
from collections.abc import AsyncIterable, Awaitable, Mapping, Sequence
from pathlib import Path
from typing import Any, Generic

import pytest
import pytest_asyncio
from agent_framework import (
    Agent,
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    BaseAgent,
    BaseChatClient,
    ChatResponse,
    ChatResponseUpdate,
    Content,
    Message,
    ResponseStream,
)
from agent_framework._clients import OptionsCoT
from agent_framework._workflows._agent_executor import AgentExecutorResponse
from agent_framework._workflows._events import (
    WorkflowErrorDetails,
    WorkflowEvent,
)
from agent_framework.orchestrations import ConcurrentBuilder, SequentialBuilder
from agent_framework_devui._discovery import EntityDiscovery
from agent_framework_devui._executor import AgentFrameworkExecutor
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest

if sys.version_info >= (3, 12):
    from typing import override  # type: ignore # pragma: no cover
else:
    from typing_extensions import override  # type: ignore[import] # pragma: no cover

# =============================================================================
# Mock Chat Clients (from core tests pattern)
# =============================================================================
class MockChatClient:
    """Simple mock chat client that doesn't require API keys.

    Configure responses by setting `responses` or `streaming_responses` lists.
    """

    def __init__(self) -> None:
        self.additional_properties: dict[str, Any] = {}
        self.call_count: int = 0
        self.responses: list[ChatResponse] = []
        self.streaming_responses: list[list[ChatResponseUpdate]] = []

    async def get_response(
        self,
        messages: str | Message | list[str] | list[Message],
        **kwargs: Any,
    ) -> ChatResponse:
        self.call_count += 1
        if self.responses:
            return self.responses.pop(0)
        return ChatResponse(messages=Message("assistant", ["test response"]))

    async def get_streaming_response(
        self,
        messages: str | Message | list[str] | list[Message],
        **kwargs: Any,
    ) -> AsyncIterable[ChatResponseUpdate]:
        self.call_count += 1
        if self.streaming_responses:
            for update in self.streaming_responses.pop(0):
                yield update
        else:
            yield ChatResponseUpdate(contents=[Content.from_text(text="test streaming response")], role="assistant")


class MockBaseChatClient(BaseChatClient[OptionsCoT], Generic[OptionsCoT]):
    """Full BaseChatClient mock with middleware support.

    Use this when testing features that require the full BaseChatClient interface.
    This goes through all the middleware, message normalization, etc. - only the
    actual LLM call is mocked.
    """

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self.run_responses: list[ChatResponse] = []
        self.streaming_responses: list[list[ChatResponseUpdate]] = []
        self.call_count: int = 0
        self.received_messages: list[list[Message]] = []

    @override
    def _inner_get_response(
        self,
        *,
        messages: Sequence[Message],
        stream: bool,
        options: Mapping[str, Any],
        **kwargs: Any,
    ) -> Awaitable[ChatResponse] | ResponseStream[ChatResponseUpdate, ChatResponse]:
        if stream:
            return self._build_response_stream(self._stream_impl(messages))

        async def _get() -> ChatResponse:
            self.call_count += 1
            self.received_messages.append(list(messages))
            if self.run_responses:
                return self.run_responses.pop(0)
            return ChatResponse(messages=Message("assistant", ["Mock response from Agent"]))

        return _get()

    async def _stream_impl(self, messages: Sequence[Message]) -> AsyncIterable[ChatResponseUpdate]:
        self.call_count += 1
        self.received_messages.append(list(messages))
        if self.streaming_responses:
            for update in self.streaming_responses.pop(0):
                yield update
        else:
            # Simulate realistic streaming chunks
            yield ChatResponseUpdate(contents=[Content.from_text(text="Mock ")], role="assistant")
            yield ChatResponseUpdate(contents=[Content.from_text(text="streaming ")], role="assistant")
            yield ChatResponseUpdate(contents=[Content.from_text(text="response ")], role="assistant")
            yield ChatResponseUpdate(contents=[Content.from_text(text="from Agent")], role="assistant")


# =============================================================================
# Mock Agents (for workflow testing without API keys)
# =============================================================================
class MockAgent(BaseAgent):
    """Mock agent that returns configurable responses without needing a chat client."""

    def __init__(
        self,
        response_text: str = "Mock agent response",
        streaming_chunks: list[str] | None = None,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.response_text = response_text
        self.streaming_chunks = streaming_chunks or [response_text]
        self.call_count = 0

    def run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        self.call_count += 1
        if stream:
            return self._run_stream(messages=messages, session=session, **kwargs)
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        self.call_count += 1
        return AgentResponse(messages=[Message("assistant", [Content.from_text(text=self.response_text)])])

    def _run_stream(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
        self.call_count += 1

        async def _iter():
            for chunk in self.streaming_chunks:
                yield AgentResponseUpdate(contents=[Content.from_text(text=chunk)], role="assistant")

        return ResponseStream(_iter(), finalizer=AgentResponse.from_updates)


class MockToolCallingAgent(BaseAgent):
    """Mock agent that simulates tool calls and results in streaming mode."""

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self.call_count = 0

    def run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        self.call_count += 1
        if stream:
            return self._run_stream(messages=messages, session=session, **kwargs)
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        return AgentResponse(messages=[Message("assistant", ["done"])])

    def _run_stream(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
        async def _iter() -> AsyncIterable[AgentResponseUpdate]:
            # First: text
            yield AgentResponseUpdate(
                contents=[Content.from_text(text="Let me search for that...")],
                role="assistant",
            )
            # Second: tool call
            yield AgentResponseUpdate(
                contents=[
                    Content.from_function_call(
                        call_id="call_123",
                        name="search",
                        arguments={"query": "weather"},
                    )
                ],
                role="assistant",
            )
            # Third: tool result
            yield AgentResponseUpdate(
                contents=[
                    Content.from_function_result(
                        call_id="call_123",
                        result={"temperature": 72, "condition": "sunny"},
                    )
                ],
                role="tool",
            )
            # Fourth: final text
            yield AgentResponseUpdate(
                contents=[Content.from_text(text="The weather is sunny, 72°F.")],
                role="assistant",
            )

        return ResponseStream(_iter(), finalizer=AgentResponse.from_updates)


# =============================================================================
# Helper Functions for Test Data Creation
# =============================================================================
def _create_agent_run_response(text: str = "Test response") -> AgentResponse:
    """Create an AgentResponse with the given text."""
    return AgentResponse(messages=[Message("assistant", [Content.from_text(text=text)])])


def _create_agent_executor_response(
    executor_id: str = "test_executor",
    response_text: str = "Executor response",
) -> AgentExecutorResponse:
    """Create an AgentExecutorResponse.

    This is the type nested in the `data` of a WorkflowEvent with type='executor_completed'.
    """
    agent_response = _create_agent_run_response(response_text)
    return AgentExecutorResponse(
        executor_id=executor_id,
        agent_response=agent_response,
        full_conversation=[
            Message("user", [Content.from_text(text="User input")]),
            Message("assistant", [Content.from_text(text=response_text)]),
        ],
    )


# =============================================================================
# Public Factory Functions (for direct import in tests)
# =============================================================================
def create_agent_run_response(text: str = "Test response") -> AgentResponse:
    """Create an AgentResponse with the given text."""
    return _create_agent_run_response(text)


def create_executor_invoked_event(executor_id: str = "test_executor") -> WorkflowEvent[Any]:
    """Create a WorkflowEvent(type='executor_invoked')."""
    return WorkflowEvent.executor_invoked(executor_id=executor_id)


def create_executor_completed_event(
    executor_id: str = "test_executor",
    with_agent_response: bool = True,
) -> WorkflowEvent[Any]:
    """Create a WorkflowEvent(type='executor_completed') with realistic nested data.

    This creates the exact data structure that caused the serialization bug:
    WorkflowEvent.data contains AgentExecutorResponse which contains
    AgentResponse and Message objects (SerializationMixin, not Pydantic).
    """
    data = _create_agent_executor_response(executor_id) if with_agent_response else {"simple": "dict"}
    return WorkflowEvent.executor_completed(executor_id=executor_id, data=data)


def create_executor_failed_event(
    executor_id: str = "test_executor",
    error_message: str = "Test error",
) -> WorkflowEvent[WorkflowErrorDetails]:
    """Create a WorkflowEvent(type='executor_failed')."""
    details = WorkflowErrorDetails(error_type="TestError", message=error_message)
    return WorkflowEvent.executor_failed(executor_id=executor_id, details=details)


# =============================================================================
# Pytest Fixtures
# =============================================================================
@pytest.fixture
def mapper() -> MessageMapper:
    """Create a fresh MessageMapper for each test."""
    return MessageMapper()


@pytest.fixture
def test_request() -> AgentFrameworkRequest:
    """Create a standard test request."""
    return AgentFrameworkRequest(
        metadata={"entity_id": "test_agent"},
        input="Test input",
        stream=True,
    )


@pytest.fixture
def mock_chat_client() -> MockChatClient:
    """Create a mock chat client."""
    return MockChatClient()


@pytest.fixture
def mock_base_chat_client() -> MockBaseChatClient:
    """Create a mock BaseChatClient."""
    return MockBaseChatClient()


@pytest.fixture
def mock_agent() -> MockAgent:
    """Create a mock agent."""
    return MockAgent(id="test_agent", name="TestAgent", response_text="Mock agent response")


@pytest.fixture
def mock_tool_agent() -> MockToolCallingAgent:
    """Create a mock agent that simulates tool calls."""
    return MockToolCallingAgent(id="tool_agent", name="ToolAgent")


@pytest.fixture
def agent_run_response() -> AgentResponse:
    """Create an AgentResponse with default text."""
    return _create_agent_run_response()


@pytest.fixture
def executor_completed_event() -> WorkflowEvent[Any]:
    """Create a WorkflowEvent(type='executor_completed') with realistic nested data.

    This creates the exact data structure that caused the serialization bug:
    WorkflowEvent.data contains AgentExecutorResponse which contains
    AgentResponse and Message objects (SerializationMixin, not Pydantic).
    """
    data = _create_agent_executor_response("test_executor")
    return WorkflowEvent.executor_completed(executor_id="test_executor", data=data)


@pytest.fixture
def executor_invoked_event() -> WorkflowEvent[Any]:
    """Create a WorkflowEvent(type='executor_invoked')."""
    return WorkflowEvent.executor_invoked(executor_id="test_executor")


@pytest.fixture
def executor_failed_event() -> WorkflowEvent[WorkflowErrorDetails]:
    """Create a WorkflowEvent(type='executor_failed')."""
    details = WorkflowErrorDetails(error_type="TestError", message="Test error")
    return WorkflowEvent.executor_failed(executor_id="test_executor", details=details)


@pytest.fixture
def test_entities_dir() -> str:
    """Use the samples directory which has proper entity structure."""
    current_dir = Path(__file__).parent
    # Navigate to python/samples/02-agents/devui
    samples_dir = current_dir.parent.parent.parent.parent / "samples" / "02-agents" / "devui"
    return str(samples_dir.resolve())


# =============================================================================
# Async Fixtures for Executor/Workflow Setup
# =============================================================================
@pytest_asyncio.fixture
async def executor_with_real_agent() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient]:
    """Create an executor with a REAL Agent using mock chat client.

    This tests the full execution pipeline:
    - Real Agent class
    - Real message handling and normalization
    - Real middleware pipeline
    - Only the LLM call is mocked

    Returns tuple of (executor, entity_id, mock_client) so tests can access all components.
    """
    mock_client = MockBaseChatClient()
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)

    # Create a REAL Agent with mock client
    agent = Agent(
        id="test_chat_agent",
        name="Test Chat Agent",
        description="A real Agent for testing execution flow",
        client=mock_client,
        system_message="You are a helpful test assistant.",
    )

    # Register the real agent
    entity_info = await discovery.create_entity_info_from_object(agent, source="test")
    discovery.register_entity(entity_info.id, entity_info, agent)
    return executor, entity_info.id, mock_client


@pytest_asyncio.fixture
async def sequential_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
    """Create a realistic sequential workflow (Writer -> Reviewer).

    This provides a reusable multi-agent workflow that:
    - Chains 2 Agent instances sequentially
    - Writer generates content, Reviewer provides feedback
    - Pre-configures mock responses for both agents

    Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
    """
    mock_client = MockBaseChatClient()
    mock_client.run_responses = [
        ChatResponse(messages=Message("assistant", ["Here's the draft content about the topic."])),
        ChatResponse(messages=Message("assistant", ["Review: Content is clear and well-structured."])),
    ]

    writer = Agent(
        id="writer",
        name="Writer",
        description="Content writer agent",
        client=mock_client,
        system_message="You are a content writer. Create clear, engaging content.",
    )
    reviewer = Agent(
        id="reviewer",
        name="Reviewer",
        description="Content reviewer agent",
        client=mock_client,
        system_message="You are a reviewer. Provide constructive feedback.",
    )

    workflow = SequentialBuilder(participants=[writer, reviewer]).build()
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)
    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)
    return executor, entity_info.id, mock_client, workflow


@pytest_asyncio.fixture
async def concurrent_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
    """Create a realistic concurrent workflow (Researcher | Analyst | Summarizer).

    This provides a reusable fan-out/fan-in workflow that:
    - Runs 3 Agent instances in parallel
    - Each agent processes the same input independently
    - Pre-configures mock responses for all agents

    Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
    """
    mock_client = MockBaseChatClient()
    mock_client.run_responses = [
        ChatResponse(messages=Message("assistant", ["Research findings: Key data points identified."])),
        ChatResponse(messages=Message("assistant", ["Analysis: Trends indicate positive growth."])),
        ChatResponse(messages=Message("assistant", ["Summary: Overall outlook is favorable."])),
    ]

    researcher = Agent(
        id="researcher",
        name="Researcher",
        description="Research agent",
        client=mock_client,
        system_message="You are a researcher. Find key data and insights.",
    )
    analyst = Agent(
        id="analyst",
        name="Analyst",
        description="Analysis agent",
        client=mock_client,
        system_message="You are an analyst. Identify trends and patterns.",
    )
    summarizer = Agent(
        id="summarizer",
        name="Summarizer",
        description="Summary agent",
        client=mock_client,
        system_message="You are a summarizer. Provide concise summaries.",
    )

    workflow = ConcurrentBuilder(participants=[researcher, analyst, summarizer]).build()
    discovery = EntityDiscovery(None)
    mapper = MessageMapper()
    executor = AgentFrameworkExecutor(discovery, mapper)
    entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
    discovery.register_entity(entity_info.id, entity_info, workflow)
    return executor, entity_info.id, mock_client, workflow