mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
c8694a8c76
* Workflow init commit * Add samples and clean up * ExecutionContext -> WorkflowContext * Address comments 1 * Fix mypy * flatting folder structure, and rename contexts * Remove add_loop * Add map reduce sample, remove Activation conditions * Add AgentExecutor and allow multiple handlers per executor * Minor improvement * Add RequestInfoExecutor * Add unit tests part 1 * Address comments 2 * Pre-commit update * Add run method and more unit tests * Add xml docs * run_stream -> run_streaming * message_handler -> handler --------- Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> Co-authored-by: Evan Mattson <evan.mattson@microsoft.com>
146 lines
4.3 KiB
Python
146 lines
4.3 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass
|
|
|
|
import pytest
|
|
from agent_framework.workflow import Executor, WorkflowCompletedEvent, WorkflowContext, WorkflowEvent, handler
|
|
|
|
from agent_framework_workflow._edge import Edge
|
|
from agent_framework_workflow._runner import Runner
|
|
from agent_framework_workflow._runner_context import InProcRunnerContext, RunnerContext
|
|
from agent_framework_workflow._shared_state import SharedState
|
|
|
|
|
|
@dataclass
|
|
class MockMessage:
|
|
"""A mock message for testing purposes."""
|
|
|
|
data: int
|
|
|
|
|
|
class MockExecutor(Executor):
|
|
"""A mock executor for testing purposes."""
|
|
|
|
@handler(output_types=[MockMessage])
|
|
async def mock_handler(self, message: MockMessage, ctx: WorkflowContext) -> None:
|
|
if message.data < 10:
|
|
await ctx.send_message(MockMessage(data=message.data + 1))
|
|
else:
|
|
await ctx.add_event(WorkflowCompletedEvent(data=message.data))
|
|
|
|
|
|
def test_create_runner():
|
|
"""Test creating a runner with edges and shared state."""
|
|
executor_a = MockExecutor(id="executor_a")
|
|
executor_b = MockExecutor(id="executor_b")
|
|
|
|
# Create a loop
|
|
edges = [
|
|
Edge(source=executor_a, target=executor_b),
|
|
Edge(source=executor_b, target=executor_a),
|
|
]
|
|
|
|
runner = Runner(edges, shared_state=SharedState(), ctx=InProcRunnerContext())
|
|
|
|
assert runner.context is not None and isinstance(runner.context, RunnerContext)
|
|
|
|
|
|
async def test_runner_run_until_convergence():
|
|
"""Test running the runner with a simple workflow."""
|
|
executor_a = MockExecutor(id="executor_a")
|
|
executor_b = MockExecutor(id="executor_b")
|
|
|
|
# Create a loop
|
|
edges = [
|
|
Edge(source=executor_a, target=executor_b),
|
|
Edge(source=executor_b, target=executor_a),
|
|
]
|
|
|
|
shared_state = SharedState()
|
|
ctx = InProcRunnerContext()
|
|
|
|
runner = Runner(edges, shared_state, ctx)
|
|
|
|
result: int | None = None
|
|
await executor_a.execute(
|
|
MockMessage(data=0),
|
|
WorkflowContext(
|
|
executor_id=executor_a.id,
|
|
source_executor_ids=["START"],
|
|
shared_state=shared_state,
|
|
runner_context=ctx,
|
|
),
|
|
)
|
|
async for event in runner.run_until_convergence():
|
|
assert isinstance(event, WorkflowEvent)
|
|
if isinstance(event, WorkflowCompletedEvent):
|
|
result = event.data
|
|
|
|
assert result is not None and result == 10
|
|
|
|
|
|
async def test_runner_run_until_convergence_not_completed():
|
|
"""Test running the runner with a simple workflow."""
|
|
executor_a = MockExecutor(id="executor_a")
|
|
executor_b = MockExecutor(id="executor_b")
|
|
|
|
# Create a loop
|
|
edges = [
|
|
Edge(source=executor_a, target=executor_b),
|
|
Edge(source=executor_b, target=executor_a),
|
|
]
|
|
|
|
shared_state = SharedState()
|
|
ctx = InProcRunnerContext()
|
|
|
|
runner = Runner(edges, shared_state, ctx, max_iterations=5)
|
|
|
|
await executor_a.execute(
|
|
MockMessage(data=0),
|
|
WorkflowContext(
|
|
executor_id=executor_a.id,
|
|
source_executor_ids=["START"],
|
|
shared_state=shared_state,
|
|
runner_context=ctx,
|
|
),
|
|
)
|
|
with pytest.raises(RuntimeError, match="Runner did not converge after 5 iterations."):
|
|
async for event in runner.run_until_convergence():
|
|
assert not isinstance(event, WorkflowCompletedEvent)
|
|
|
|
|
|
async def test_runner_already_running():
|
|
"""Test that running the runner while it is already running raises an error."""
|
|
executor_a = MockExecutor(id="executor_a")
|
|
executor_b = MockExecutor(id="executor_b")
|
|
|
|
# Create a loop
|
|
edges = [
|
|
Edge(source=executor_a, target=executor_b),
|
|
Edge(source=executor_b, target=executor_a),
|
|
]
|
|
|
|
shared_state = SharedState()
|
|
ctx = InProcRunnerContext()
|
|
|
|
runner = Runner(edges, shared_state, ctx)
|
|
|
|
await executor_a.execute(
|
|
MockMessage(data=0),
|
|
WorkflowContext(
|
|
executor_id=executor_a.id,
|
|
source_executor_ids=["START"],
|
|
shared_state=shared_state,
|
|
runner_context=ctx,
|
|
),
|
|
)
|
|
|
|
with pytest.raises(RuntimeError, match="Runner is already running."):
|
|
|
|
async def _run():
|
|
async for _ in runner.run_until_convergence():
|
|
pass
|
|
|
|
await asyncio.gather(_run(), _run())
|