mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
1e350ea22f
* PR2: Wire context provider pipeline and update all internal consumers - Replace AgentThread with AgentSession across all packages - Replace ContextProvider with BaseContextProvider across all packages - Replace context_provider param with context_providers (Sequence) - Replace thread= with session= in run() signatures - Replace get_new_thread() with create_session() - Add get_session(service_session_id) to agent interface - DurableAgentThread -> DurableAgentSession - Remove _notify_thread_of_new_messages from WorkflowAgent - Wire before_run/after_run context provider pipeline in RawAgent - Auto-inject InMemoryHistoryProvider when no providers configured * fix: update all tests for context provider pipeline, fix lazy-loaders, remove old test files * refactor: update all sample files for context provider pipeline (AgentThread→AgentSession, ContextProvider→BaseContextProvider) * fix: update remaining ag-ui references (client docstring, getting_started sample) * fix: make get_session service_session_id keyword-only to avoid confusion with session_id * refactor: rename _RunContext.thread_messages to session_messages * refactor: remove _threads.py, _memory.py, and old provider files; migrate devui to use plain message lists * rename: remove _new_ prefix from test files * refactor: rewrite SlidingWindowChatMessageStore as SlidingWindowHistoryProvider(InMemoryHistoryProvider) * fix: read full history from session state directly instead of reaching into provider internals * fix: update stale .pyi stubs, sample imports, and README references for new provider types * fix: remove stale message_store, _notify_thread_of_new_messages, and session_id.key references in samples * refactor: merge context_providers and sessions sample folders into sessions, remove aggregate_context_provider * refactor: UserInfoMemory stores state in session.state instead of instance attributes * feat: add Pydantic BaseModel support to session state serialization Pydantic models stored in session.state are now automatically serialized via model_dump() and restored via model_validate() during to_dict()/from_dict() round-trips. Models are auto-registered on first serialization; use register_state_type() for cold-start deserialization. Also export register_state_type as a public API. * fix mem0 * Update sample README links and descriptions for session terminology - Replace 'thread' with 'session' in sample descriptions across all READMEs - Update file links for renamed samples (mem0_sessions, redis_sessions, etc.) - Fix Threads section → Sessions section in main samples/README.md - Update tools, middleware, workflows, durabletask, azure_functions READMEs - Update architecture diagrams in concepts/tools/README.md - Update migration guides (autogen, semantic-kernel) * Fix broken Redis README link to renamed sample * Fix Mem0 OSS client search: pass scoping params as direct kwargs AsyncMemory (OSS) expects user_id/agent_id/run_id as direct kwargs, while AsyncMemoryClient (Platform) expects them in a filters dict. Adds tests for both client types. Port of fix from #3844 to new Mem0ContextProvider. * Fix rebase issues: restore missing _conversation_state.py and checkpoint decode logic - Add back _conversation_state.py (encode/decode_chat_messages) lost in rebase - Fix on_checkpoint_restore to decode cache/conversation with decode_chat_messages - Fix on_checkpoint_restore to use decode_checkpoint_value for pending requests - Add tests/workflow/__init__.py for relative import support - Fix test_agent_executor checkpoint selection (checkpoints[1] not superstep) * Add STORES_BY_DEFAULT ClassVar to skip redundant InMemoryHistoryProvider injection Chat clients that store history server-side by default (OpenAI Responses API, Azure AI Agent) now declare STORES_BY_DEFAULT = True. The agent checks this during auto-injection and skips InMemoryHistoryProvider unless the user explicitly sets store=False. * Fix broken markdown links in azure_ai and redis READMEs * Fix getting-started samples to use session API instead of removed thread/ContextProvider API * updates to workflow as agent * fix group chat import * Rename Thread→Session throughout, fix service_session_id propagation, remove stale AGUIThread - Fix: Propagate conversation_id from ChatResponse back to session.service_session_id in both streaming and non-streaming paths in _agents.py - Rename AgentThreadException → AgentSessionException - Remove stale AGUIThread from ag_ui lazy-loader - Rename use_service_thread → use_service_session in ag-ui package - Rename test functions from *_thread_* to *_session_* - Rename sample files from *_thread* to *_session* - Update docstrings and comments: thread → session - Update _mcp.py kwargs filter: add 'session' alongside 'thread' - Fix ContinuationToken docstring example: thread=thread → session=session - Fix _clients.py docstring: 'Agent threads' → 'Agent sessions' * Fix broken markdown links after thread→session file renames * fix azure ai test
553 lines
20 KiB
Python
553 lines
20 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Pytest configuration and fixtures for DevUI tests.
|
|
|
|
This module provides reusable test fixtures including:
|
|
- Mock chat clients that don't require API keys
|
|
- Real workflow event classes from agent_framework
|
|
- Test agents and executors for workflow testing
|
|
- Factory functions for test data
|
|
"""
|
|
|
|
import sys
|
|
from collections.abc import AsyncIterable, Awaitable, Mapping, Sequence
|
|
from pathlib import Path
|
|
from typing import Any, Generic
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from agent_framework import (
|
|
Agent,
|
|
AgentResponse,
|
|
AgentResponseUpdate,
|
|
AgentSession,
|
|
BaseAgent,
|
|
BaseChatClient,
|
|
ChatResponse,
|
|
ChatResponseUpdate,
|
|
Content,
|
|
Message,
|
|
ResponseStream,
|
|
)
|
|
from agent_framework._clients import OptionsCoT
|
|
from agent_framework._workflows._agent_executor import AgentExecutorResponse
|
|
from agent_framework._workflows._events import (
|
|
WorkflowErrorDetails,
|
|
WorkflowEvent,
|
|
)
|
|
from agent_framework.orchestrations import ConcurrentBuilder, SequentialBuilder
|
|
|
|
from agent_framework_devui._discovery import EntityDiscovery
|
|
from agent_framework_devui._executor import AgentFrameworkExecutor
|
|
from agent_framework_devui._mapper import MessageMapper
|
|
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
|
|
|
|
if sys.version_info >= (3, 12):
|
|
from typing import override # type: ignore # pragma: no cover
|
|
else:
|
|
from typing_extensions import override # type: ignore[import] # pragma: no cover
|
|
|
|
|
|
# =============================================================================
|
|
# Mock Chat Clients (from core tests pattern)
|
|
# =============================================================================
|
|
|
|
|
|
class MockChatClient:
|
|
"""Simple mock chat client that doesn't require API keys.
|
|
|
|
Configure responses by setting `responses` or `streaming_responses` lists.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self.additional_properties: dict[str, Any] = {}
|
|
self.call_count: int = 0
|
|
self.responses: list[ChatResponse] = []
|
|
self.streaming_responses: list[list[ChatResponseUpdate]] = []
|
|
|
|
async def get_response(
|
|
self,
|
|
messages: str | Message | list[str] | list[Message],
|
|
**kwargs: Any,
|
|
) -> ChatResponse:
|
|
self.call_count += 1
|
|
if self.responses:
|
|
return self.responses.pop(0)
|
|
return ChatResponse(messages=Message("assistant", ["test response"]))
|
|
|
|
async def get_streaming_response(
|
|
self,
|
|
messages: str | Message | list[str] | list[Message],
|
|
**kwargs: Any,
|
|
) -> AsyncIterable[ChatResponseUpdate]:
|
|
self.call_count += 1
|
|
if self.streaming_responses:
|
|
for update in self.streaming_responses.pop(0):
|
|
yield update
|
|
else:
|
|
yield ChatResponseUpdate(contents=[Content.from_text(text="test streaming response")], role="assistant")
|
|
|
|
|
|
class MockBaseChatClient(BaseChatClient[OptionsCoT], Generic[OptionsCoT]):
|
|
"""Full BaseChatClient mock with middleware support.
|
|
|
|
Use this when testing features that require the full BaseChatClient interface.
|
|
This goes through all the middleware, message normalization, etc. - only the
|
|
actual LLM call is mocked.
|
|
"""
|
|
|
|
def __init__(self, **kwargs: Any):
|
|
super().__init__(**kwargs)
|
|
self.run_responses: list[ChatResponse] = []
|
|
self.streaming_responses: list[list[ChatResponseUpdate]] = []
|
|
self.call_count: int = 0
|
|
self.received_messages: list[list[Message]] = []
|
|
|
|
@override
|
|
def _inner_get_response(
|
|
self,
|
|
*,
|
|
messages: Sequence[Message],
|
|
stream: bool,
|
|
options: Mapping[str, Any],
|
|
**kwargs: Any,
|
|
) -> Awaitable[ChatResponse] | ResponseStream[ChatResponseUpdate, ChatResponse]:
|
|
if stream:
|
|
return self._build_response_stream(self._stream_impl(messages))
|
|
|
|
async def _get() -> ChatResponse:
|
|
self.call_count += 1
|
|
self.received_messages.append(list(messages))
|
|
if self.run_responses:
|
|
return self.run_responses.pop(0)
|
|
return ChatResponse(messages=Message("assistant", ["Mock response from Agent"]))
|
|
|
|
return _get()
|
|
|
|
async def _stream_impl(self, messages: Sequence[Message]) -> AsyncIterable[ChatResponseUpdate]:
|
|
self.call_count += 1
|
|
self.received_messages.append(list(messages))
|
|
if self.streaming_responses:
|
|
for update in self.streaming_responses.pop(0):
|
|
yield update
|
|
else:
|
|
# Simulate realistic streaming chunks
|
|
yield ChatResponseUpdate(contents=[Content.from_text(text="Mock ")], role="assistant")
|
|
yield ChatResponseUpdate(contents=[Content.from_text(text="streaming ")], role="assistant")
|
|
yield ChatResponseUpdate(contents=[Content.from_text(text="response ")], role="assistant")
|
|
yield ChatResponseUpdate(contents=[Content.from_text(text="from Agent")], role="assistant")
|
|
|
|
|
|
# =============================================================================
|
|
# Mock Agents (for workflow testing without API keys)
|
|
# =============================================================================
|
|
|
|
|
|
class MockAgent(BaseAgent):
|
|
"""Mock agent that returns configurable responses without needing a chat client."""
|
|
|
|
def __init__(
|
|
self,
|
|
response_text: str = "Mock agent response",
|
|
streaming_chunks: list[str] | None = None,
|
|
**kwargs: Any,
|
|
):
|
|
super().__init__(**kwargs)
|
|
self.response_text = response_text
|
|
self.streaming_chunks = streaming_chunks or [response_text]
|
|
self.call_count = 0
|
|
|
|
def run(
|
|
self,
|
|
messages: str | Message | list[str] | list[Message] | None = None,
|
|
*,
|
|
stream: bool = False,
|
|
session: AgentSession | None = None,
|
|
**kwargs: Any,
|
|
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
|
|
self.call_count += 1
|
|
if stream:
|
|
return self._run_stream(messages=messages, session=session, **kwargs)
|
|
return self._run(messages=messages, session=session, **kwargs)
|
|
|
|
async def _run(
|
|
self,
|
|
messages: str | Message | list[str] | list[Message] | None = None,
|
|
*,
|
|
session: AgentSession | None = None,
|
|
**kwargs: Any,
|
|
) -> AgentResponse:
|
|
self.call_count += 1
|
|
return AgentResponse(messages=[Message("assistant", [Content.from_text(text=self.response_text)])])
|
|
|
|
def _run_stream(
|
|
self,
|
|
messages: str | Message | list[str] | list[Message] | None = None,
|
|
*,
|
|
session: AgentSession | None = None,
|
|
**kwargs: Any,
|
|
) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
|
|
self.call_count += 1
|
|
|
|
async def _iter():
|
|
for chunk in self.streaming_chunks:
|
|
yield AgentResponseUpdate(contents=[Content.from_text(text=chunk)], role="assistant")
|
|
|
|
return ResponseStream(_iter(), finalizer=AgentResponse.from_updates)
|
|
|
|
|
|
class MockToolCallingAgent(BaseAgent):
|
|
"""Mock agent that simulates tool calls and results in streaming mode."""
|
|
|
|
def __init__(self, **kwargs: Any):
|
|
super().__init__(**kwargs)
|
|
self.call_count = 0
|
|
|
|
def run(
|
|
self,
|
|
messages: str | Message | list[str] | list[Message] | None = None,
|
|
*,
|
|
stream: bool = False,
|
|
session: AgentSession | None = None,
|
|
**kwargs: Any,
|
|
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
|
|
self.call_count += 1
|
|
if stream:
|
|
return self._run_stream(messages=messages, session=session, **kwargs)
|
|
return self._run(messages=messages, session=session, **kwargs)
|
|
|
|
async def _run(
|
|
self,
|
|
messages: str | Message | list[str] | list[Message] | None = None,
|
|
*,
|
|
session: AgentSession | None = None,
|
|
**kwargs: Any,
|
|
) -> AgentResponse:
|
|
return AgentResponse(messages=[Message("assistant", ["done"])])
|
|
|
|
def _run_stream(
|
|
self,
|
|
messages: str | Message | list[str] | list[Message] | None = None,
|
|
*,
|
|
session: AgentSession | None = None,
|
|
**kwargs: Any,
|
|
) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
|
|
async def _iter() -> AsyncIterable[AgentResponseUpdate]:
|
|
# First: text
|
|
yield AgentResponseUpdate(
|
|
contents=[Content.from_text(text="Let me search for that...")],
|
|
role="assistant",
|
|
)
|
|
# Second: tool call
|
|
yield AgentResponseUpdate(
|
|
contents=[
|
|
Content.from_function_call(
|
|
call_id="call_123",
|
|
name="search",
|
|
arguments={"query": "weather"},
|
|
)
|
|
],
|
|
role="assistant",
|
|
)
|
|
# Third: tool result
|
|
yield AgentResponseUpdate(
|
|
contents=[
|
|
Content.from_function_result(
|
|
call_id="call_123",
|
|
result={"temperature": 72, "condition": "sunny"},
|
|
)
|
|
],
|
|
role="tool",
|
|
)
|
|
# Fourth: final text
|
|
yield AgentResponseUpdate(
|
|
contents=[Content.from_text(text="The weather is sunny, 72°F.")],
|
|
role="assistant",
|
|
)
|
|
|
|
return ResponseStream(_iter(), finalizer=AgentResponse.from_updates)
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions for Test Data Creation
|
|
# =============================================================================
|
|
|
|
|
|
def _create_agent_run_response(text: str = "Test response") -> AgentResponse:
|
|
"""Create an AgentResponse with the given text."""
|
|
return AgentResponse(messages=[Message("assistant", [Content.from_text(text=text)])])
|
|
|
|
|
|
def _create_agent_executor_response(
|
|
executor_id: str = "test_executor",
|
|
response_text: str = "Executor response",
|
|
) -> AgentExecutorResponse:
|
|
"""Create an AgentExecutorResponse - the type that's nested in
|
|
executor_completed event (type='executor_completed').data."""
|
|
agent_response = _create_agent_run_response(response_text)
|
|
return AgentExecutorResponse(
|
|
executor_id=executor_id,
|
|
agent_response=agent_response,
|
|
full_conversation=[
|
|
Message("user", [Content.from_text(text="User input")]),
|
|
Message("assistant", [Content.from_text(text=response_text)]),
|
|
],
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Public Factory Functions (for direct import in tests)
|
|
# =============================================================================
|
|
|
|
|
|
def create_agent_run_response(text: str = "Test response") -> AgentResponse:
|
|
"""Create an AgentResponse with the given text."""
|
|
return _create_agent_run_response(text)
|
|
|
|
|
|
def create_executor_invoked_event(executor_id: str = "test_executor") -> WorkflowEvent[Any]:
|
|
"""Create a WorkflowEvent(type='executor_invoked')."""
|
|
return WorkflowEvent.executor_invoked(executor_id=executor_id)
|
|
|
|
|
|
def create_executor_completed_event(
|
|
executor_id: str = "test_executor",
|
|
with_agent_response: bool = True,
|
|
) -> WorkflowEvent[Any]:
|
|
"""Create a WorkflowEvent(type='executor_completed') with realistic nested data.
|
|
|
|
This creates the exact data structure that caused the serialization bug:
|
|
WorkflowEvent.data contains AgentExecutorResponse which contains
|
|
AgentResponse and Message objects (SerializationMixin, not Pydantic).
|
|
"""
|
|
data = _create_agent_executor_response(executor_id) if with_agent_response else {"simple": "dict"}
|
|
return WorkflowEvent.executor_completed(executor_id=executor_id, data=data)
|
|
|
|
|
|
def create_executor_failed_event(
|
|
executor_id: str = "test_executor",
|
|
error_message: str = "Test error",
|
|
) -> WorkflowEvent[WorkflowErrorDetails]:
|
|
"""Create a WorkflowEvent(type='executor_failed')."""
|
|
details = WorkflowErrorDetails(error_type="TestError", message=error_message)
|
|
return WorkflowEvent.executor_failed(executor_id=executor_id, details=details)
|
|
|
|
|
|
# =============================================================================
|
|
# Pytest Fixtures
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.fixture
|
|
def mapper() -> MessageMapper:
|
|
"""Create a fresh MessageMapper for each test."""
|
|
return MessageMapper()
|
|
|
|
|
|
@pytest.fixture
|
|
def test_request() -> AgentFrameworkRequest:
|
|
"""Create a standard test request."""
|
|
return AgentFrameworkRequest(
|
|
metadata={"entity_id": "test_agent"},
|
|
input="Test input",
|
|
stream=True,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_chat_client() -> MockChatClient:
|
|
"""Create a mock chat client."""
|
|
return MockChatClient()
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_base_chat_client() -> MockBaseChatClient:
|
|
"""Create a mock BaseChatClient."""
|
|
return MockBaseChatClient()
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_agent() -> MockAgent:
|
|
"""Create a mock agent."""
|
|
return MockAgent(id="test_agent", name="TestAgent", response_text="Mock agent response")
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_tool_agent() -> MockToolCallingAgent:
|
|
"""Create a mock agent that simulates tool calls."""
|
|
return MockToolCallingAgent(id="tool_agent", name="ToolAgent")
|
|
|
|
|
|
@pytest.fixture
|
|
def agent_run_response() -> AgentResponse:
|
|
"""Create an AgentResponse with default text."""
|
|
return _create_agent_run_response()
|
|
|
|
|
|
@pytest.fixture
|
|
def executor_completed_event() -> WorkflowEvent[Any]:
|
|
"""Create a WorkflowEvent(type='executor_completed') with realistic nested data.
|
|
|
|
This creates the exact data structure that caused the serialization bug:
|
|
executor_completed event (type='executor_completed').data contains AgentExecutorResponse which contains
|
|
AgentResponse and Message objects (SerializationMixin, not Pydantic).
|
|
"""
|
|
data = _create_agent_executor_response("test_executor")
|
|
return WorkflowEvent.executor_completed(executor_id="test_executor", data=data)
|
|
|
|
|
|
@pytest.fixture
|
|
def executor_invoked_event() -> WorkflowEvent[Any]:
|
|
"""Create a WorkflowEvent(type='executor_invoked')."""
|
|
return WorkflowEvent.executor_invoked(executor_id="test_executor")
|
|
|
|
|
|
@pytest.fixture
|
|
def executor_failed_event() -> WorkflowEvent[WorkflowErrorDetails]:
|
|
"""Create a WorkflowEvent(type='executor_failed')."""
|
|
details = WorkflowErrorDetails(error_type="TestError", message="Test error")
|
|
return WorkflowEvent.executor_failed(executor_id="test_executor", details=details)
|
|
|
|
|
|
@pytest.fixture
|
|
def test_entities_dir() -> str:
|
|
"""Use the samples directory which has proper entity structure."""
|
|
current_dir = Path(__file__).parent
|
|
# Navigate to python/samples/02-agents/devui
|
|
samples_dir = current_dir.parent.parent.parent.parent / "samples" / "02-agents" / "devui"
|
|
return str(samples_dir.resolve())
|
|
|
|
|
|
# =============================================================================
|
|
# Async Fixtures for Executor/Workflow Setup
|
|
# =============================================================================
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def executor_with_real_agent() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient]:
|
|
"""Create an executor with a REAL Agent using mock chat client.
|
|
|
|
This tests the full execution pipeline:
|
|
- Real Agent class
|
|
- Real message handling and normalization
|
|
- Real middleware pipeline
|
|
- Only the LLM call is mocked
|
|
|
|
Returns tuple of (executor, entity_id, mock_client) so tests can access all components.
|
|
"""
|
|
mock_client = MockBaseChatClient()
|
|
discovery = EntityDiscovery(None)
|
|
mapper = MessageMapper()
|
|
executor = AgentFrameworkExecutor(discovery, mapper)
|
|
|
|
# Create a REAL Agent with mock client
|
|
agent = Agent(
|
|
id="test_chat_agent",
|
|
name="Test Chat Agent",
|
|
description="A real Agent for testing execution flow",
|
|
client=mock_client,
|
|
system_message="You are a helpful test assistant.",
|
|
)
|
|
|
|
# Register the real agent
|
|
entity_info = await discovery.create_entity_info_from_object(agent, source="test")
|
|
discovery.register_entity(entity_info.id, entity_info, agent)
|
|
|
|
return executor, entity_info.id, mock_client
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def sequential_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
|
|
"""Create a realistic sequential workflow (Writer -> Reviewer).
|
|
|
|
This provides a reusable multi-agent workflow that:
|
|
- Chains 2 ChatAgents sequentially
|
|
- Writer generates content, Reviewer provides feedback
|
|
- Pre-configures mock responses for both agents
|
|
|
|
Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
|
|
"""
|
|
mock_client = MockBaseChatClient()
|
|
mock_client.run_responses = [
|
|
ChatResponse(messages=Message("assistant", ["Here's the draft content about the topic."])),
|
|
ChatResponse(messages=Message("assistant", ["Review: Content is clear and well-structured."])),
|
|
]
|
|
|
|
writer = Agent(
|
|
id="writer",
|
|
name="Writer",
|
|
description="Content writer agent",
|
|
client=mock_client,
|
|
system_message="You are a content writer. Create clear, engaging content.",
|
|
)
|
|
reviewer = Agent(
|
|
id="reviewer",
|
|
name="Reviewer",
|
|
description="Content reviewer agent",
|
|
client=mock_client,
|
|
system_message="You are a reviewer. Provide constructive feedback.",
|
|
)
|
|
|
|
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
|
|
|
|
discovery = EntityDiscovery(None)
|
|
mapper = MessageMapper()
|
|
executor = AgentFrameworkExecutor(discovery, mapper)
|
|
|
|
entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
|
|
discovery.register_entity(entity_info.id, entity_info, workflow)
|
|
|
|
return executor, entity_info.id, mock_client, workflow
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def concurrent_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]:
|
|
"""Create a realistic concurrent workflow (Researcher | Analyst | Summarizer).
|
|
|
|
This provides a reusable fan-out/fan-in workflow that:
|
|
- Runs 3 ChatAgents in parallel
|
|
- Each agent processes the same input independently
|
|
- Pre-configures mock responses for all agents
|
|
|
|
Returns tuple of (executor, entity_id, mock_client, workflow) for test access.
|
|
"""
|
|
mock_client = MockBaseChatClient()
|
|
mock_client.run_responses = [
|
|
ChatResponse(messages=Message("assistant", ["Research findings: Key data points identified."])),
|
|
ChatResponse(messages=Message("assistant", ["Analysis: Trends indicate positive growth."])),
|
|
ChatResponse(messages=Message("assistant", ["Summary: Overall outlook is favorable."])),
|
|
]
|
|
|
|
researcher = Agent(
|
|
id="researcher",
|
|
name="Researcher",
|
|
description="Research agent",
|
|
client=mock_client,
|
|
system_message="You are a researcher. Find key data and insights.",
|
|
)
|
|
analyst = Agent(
|
|
id="analyst",
|
|
name="Analyst",
|
|
description="Analysis agent",
|
|
client=mock_client,
|
|
system_message="You are an analyst. Identify trends and patterns.",
|
|
)
|
|
summarizer = Agent(
|
|
id="summarizer",
|
|
name="Summarizer",
|
|
description="Summary agent",
|
|
client=mock_client,
|
|
system_message="You are a summarizer. Provide concise summaries.",
|
|
)
|
|
|
|
workflow = ConcurrentBuilder(participants=[researcher, analyst, summarizer]).build()
|
|
|
|
discovery = EntityDiscovery(None)
|
|
mapper = MessageMapper()
|
|
executor = AgentFrameworkExecutor(discovery, mapper)
|
|
|
|
entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test")
|
|
discovery.register_entity(entity_info.id, entity_info, workflow)
|
|
|
|
return executor, entity_info.id, mock_client, workflow
|