mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
e7937947d9
* Fix declarative object parsing bug * Remove unnecessary comment * Address PR comments * Address PR comments. * Fix CI failures.
2938 lines
112 KiB
Python
2938 lines
112 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
# pyright: reportUnknownParameterType=false, reportUnknownArgumentType=false
|
|
# pyright: reportMissingParameterType=false, reportUnknownMemberType=false
|
|
# pyright: reportPrivateUsage=false, reportUnknownVariableType=false
|
|
# pyright: reportGeneralTypeIssues=false
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from agent_framework_declarative._workflows import (
|
|
ActionComplete,
|
|
ActionTrigger,
|
|
DeclarativeWorkflowState,
|
|
)
|
|
from agent_framework_declarative._workflows._declarative_base import (
|
|
ConditionResult,
|
|
LoopControl,
|
|
LoopIterationResult,
|
|
)
|
|
|
|
try:
|
|
import powerfx # noqa: F401
|
|
|
|
_powerfx_available = True
|
|
except (ImportError, RuntimeError):
|
|
_powerfx_available = False
|
|
|
|
_requires_powerfx = pytest.mark.skipif(not _powerfx_available, reason="PowerFx engine not available")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_state() -> MagicMock:
|
|
"""Create a mock state with sync get/set/delete methods."""
|
|
mock_state = MagicMock()
|
|
mock_state._data = {}
|
|
|
|
def mock_get(key: str, default: Any = None) -> Any:
|
|
return mock_state._data.get(key, default)
|
|
|
|
def mock_set(key: str, value: Any) -> None:
|
|
mock_state._data[key] = value
|
|
|
|
def mock_has(key: str) -> bool:
|
|
return key in mock_state._data
|
|
|
|
def mock_delete(key: str) -> None:
|
|
if key in mock_state._data:
|
|
del mock_state._data[key]
|
|
|
|
mock_state.get = MagicMock(side_effect=mock_get)
|
|
mock_state.set = MagicMock(side_effect=mock_set)
|
|
mock_state.has = MagicMock(side_effect=mock_has)
|
|
mock_state.delete = MagicMock(side_effect=mock_delete)
|
|
|
|
return mock_state
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_context(mock_state: MagicMock) -> MagicMock:
|
|
"""Create a mock workflow context."""
|
|
ctx = MagicMock()
|
|
ctx.state = mock_state
|
|
ctx.send_message = AsyncMock()
|
|
ctx.yield_output = AsyncMock()
|
|
ctx.request_info = AsyncMock()
|
|
return ctx
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DeclarativeWorkflowState Tests - Covering _base.py gaps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDeclarativeWorkflowStateExtended:
|
|
"""Extended tests for DeclarativeWorkflowState covering uncovered code paths."""
|
|
|
|
async def test_get_with_local_namespace(self, mock_state):
|
|
"""Test Local. namespace mapping."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.myVar", "value123")
|
|
|
|
# Access via Local. namespace
|
|
result = state.get("Local.myVar")
|
|
assert result == "value123"
|
|
|
|
async def test_get_with_system_namespace(self, mock_state):
|
|
"""Test System. namespace mapping."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("System.ConversationId", "conv-123")
|
|
|
|
result = state.get("System.ConversationId")
|
|
assert result == "conv-123"
|
|
|
|
async def test_get_with_workflow_namespace(self, mock_state):
|
|
"""Test Workflow. namespace mapping."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize({"query": "test"})
|
|
|
|
result = state.get("Workflow.Inputs.query")
|
|
assert result == "test"
|
|
|
|
async def test_get_with_inputs_shorthand(self, mock_state):
|
|
"""Test inputs. shorthand namespace mapping."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize({"query": "test"})
|
|
|
|
result = state.get("Workflow.Inputs.query")
|
|
assert result == "test"
|
|
|
|
async def test_get_agent_namespace(self, mock_state):
|
|
"""Test agent namespace access."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Agent.response", "Hello!")
|
|
|
|
result = state.get("Agent.response")
|
|
assert result == "Hello!"
|
|
|
|
async def test_get_conversation_namespace(self, mock_state):
|
|
"""Test conversation namespace access."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Conversation.messages", [{"role": "user", "text": "hi"}])
|
|
|
|
result = state.get("Conversation.messages")
|
|
assert result == [{"role": "user", "text": "hi"}]
|
|
|
|
async def test_get_custom_namespace(self, mock_state):
|
|
"""Test custom namespace access."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Set via direct state data manipulation to create custom namespace
|
|
state_data = state.get_state_data()
|
|
state_data["Custom"] = {"myns": {"value": 42}}
|
|
state.set_state_data(state_data)
|
|
|
|
result = state.get("myns.value")
|
|
assert result == 42
|
|
|
|
async def test_get_object_attribute_access(self, mock_state):
|
|
"""Test accessing object attributes via hasattr/getattr path."""
|
|
|
|
@dataclass
|
|
class MockObj:
|
|
name: str
|
|
value: int
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.obj", MockObj(name="test", value=99))
|
|
|
|
result = state.get("Local.obj.name")
|
|
assert result == "test"
|
|
|
|
async def test_set_with_local_namespace(self, mock_state):
|
|
"""Test Local. namespace mapping for set."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
state.set("Local.myVar", "value123")
|
|
result = state.get("Local.myVar")
|
|
assert result == "value123"
|
|
|
|
async def test_set_with_system_namespace(self, mock_state):
|
|
"""Test System. namespace mapping for set."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
state.set("System.ConversationId", "conv-456")
|
|
result = state.get("System.ConversationId")
|
|
assert result == "conv-456"
|
|
|
|
async def test_set_workflow_outputs(self, mock_state):
|
|
"""Test setting workflow outputs."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
state.set("Workflow.Outputs.result", "done")
|
|
outputs = state.get("Workflow.Outputs")
|
|
assert outputs.get("result") == "done"
|
|
|
|
async def test_set_workflow_inputs_raises_error(self, mock_state):
|
|
"""Test that setting Workflow.Inputs raises an error (read-only)."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize({"query": "test"})
|
|
|
|
with pytest.raises(ValueError, match="Cannot modify Workflow.Inputs"):
|
|
state.set("Workflow.Inputs.query", "modified")
|
|
|
|
async def test_set_workflow_directly_raises_error(self, mock_state):
|
|
"""Test that setting 'Workflow' directly raises an error."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
with pytest.raises(ValueError, match="Cannot set 'Workflow' directly"):
|
|
state.set("Workflow", {})
|
|
|
|
async def test_set_unknown_workflow_subnamespace_raises_error(self, mock_state):
|
|
"""Test unknown workflow sub-namespace raises error."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
with pytest.raises(ValueError, match="Unknown Workflow namespace"):
|
|
state.set("Workflow.unknown.field", "value")
|
|
|
|
async def test_set_creates_custom_namespace(self, mock_state):
|
|
"""Test setting value in custom namespace creates it."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
state.set("myns.field.nested", "value")
|
|
result = state.get("myns.field.nested")
|
|
assert result == "value"
|
|
|
|
async def test_set_cannot_replace_entire_namespace(self, mock_state):
|
|
"""Test that replacing entire namespace raises error."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
with pytest.raises(ValueError, match="Cannot replace entire namespace"):
|
|
state.set("turn", {})
|
|
|
|
async def test_append_to_nonlist_raises_error(self, mock_state):
|
|
"""Test appending to non-list raises error."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.scalar", "string value")
|
|
|
|
with pytest.raises(ValueError, match="Cannot append to non-list"):
|
|
state.append("Local.scalar", "new item")
|
|
|
|
async def test_eval_empty_string(self, mock_state):
|
|
"""Test evaluating empty string returns as-is."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
result = state.eval("")
|
|
assert result == ""
|
|
|
|
async def test_eval_non_string_returns_as_is(self, mock_state):
|
|
"""Test evaluating non-string returns as-is."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Cast to Any to test the runtime behavior with non-string inputs
|
|
result = state.eval(42) # type: ignore[arg-type]
|
|
assert result == 42
|
|
|
|
result = state.eval([1, 2, 3]) # type: ignore[arg-type]
|
|
assert result == [1, 2, 3]
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_simple_and_operator(self, mock_state):
|
|
"""Test simple And operator evaluation."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.a", True)
|
|
state.set("Local.b", False)
|
|
|
|
result = state.eval("=Local.a And Local.b")
|
|
assert result is False
|
|
|
|
state.set("Local.b", True)
|
|
result = state.eval("=Local.a And Local.b")
|
|
assert result is True
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_simple_or_operator(self, mock_state):
|
|
"""Test simple Or operator evaluation."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.a", True)
|
|
state.set("Local.b", False)
|
|
|
|
result = state.eval("=Local.a Or Local.b")
|
|
assert result is True
|
|
|
|
state.set("Local.a", False)
|
|
result = state.eval("=Local.a Or Local.b")
|
|
assert result is False
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_negation(self, mock_state):
|
|
"""Test negation (!) evaluation."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.flag", True)
|
|
|
|
result = state.eval("=!Local.flag")
|
|
assert result is False
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_not_function(self, mock_state):
|
|
"""Test Not() function evaluation."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.flag", True)
|
|
|
|
result = state.eval("=Not(Local.flag)")
|
|
assert result is False
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_comparison_operators(self, mock_state):
|
|
"""Test comparison operators."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.x", 5)
|
|
state.set("Local.y", 10)
|
|
|
|
assert state.eval("=Local.x < Local.y") is True
|
|
assert state.eval("=Local.x > Local.y") is False
|
|
assert state.eval("=Local.x <= 5") is True
|
|
assert state.eval("=Local.x >= 5") is True
|
|
assert state.eval("=Local.x <> Local.y") is True
|
|
assert state.eval("=Local.x = 5") is True
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_arithmetic_operators(self, mock_state):
|
|
"""Test arithmetic operators."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.x", 10)
|
|
state.set("Local.y", 3)
|
|
|
|
assert state.eval("=Local.x + Local.y") == 13
|
|
assert state.eval("=Local.x - Local.y") == 7
|
|
assert state.eval("=Local.x * Local.y") == 30
|
|
assert state.eval("=Local.x / Local.y") == pytest.approx(3.333, rel=0.01)
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_string_literal(self, mock_state):
|
|
"""Test string literal evaluation."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
result = state.eval('="hello world"')
|
|
assert result == "hello world"
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_float_literal(self, mock_state):
|
|
"""Test float literal evaluation."""
|
|
from decimal import Decimal
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
result = state.eval("=3.14")
|
|
# Accepts both float (Python fallback) and Decimal (pythonnet/PowerFx)
|
|
assert result == 3.14 or result == Decimal("3.14")
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_variable_reference_with_namespace_mappings(self, mock_state):
|
|
"""Test variable reference with PowerFx symbols."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize({"query": "test"})
|
|
state.set("Local.myVar", "localValue")
|
|
|
|
# Test Local namespace (PowerFx symbol)
|
|
result = state.eval("=Local.myVar")
|
|
assert result == "localValue"
|
|
|
|
# Test Workflow.Inputs (PowerFx symbol)
|
|
result = state.eval("=Workflow.Inputs.query")
|
|
assert result == "test"
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_if_expression_with_dict(self, mock_state):
|
|
"""Test eval_if_expression recursively evaluates dicts."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.name", "Alice")
|
|
|
|
result = state.eval_if_expression({"greeting": "=Local.name", "static": "hello"})
|
|
assert result == {"greeting": "Alice", "static": "hello"}
|
|
|
|
@_requires_powerfx
|
|
async def test_eval_if_expression_with_list(self, mock_state):
|
|
"""Test eval_if_expression recursively evaluates lists."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.x", 10)
|
|
|
|
result = state.eval_if_expression(["=Local.x", "static", "=5"])
|
|
assert result == [10, "static", 5]
|
|
|
|
async def test_interpolate_string_with_local_vars(self, mock_state):
|
|
"""Test string interpolation with Local. variables."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.TicketId", "TKT-001")
|
|
state.set("Local.TeamName", "Support")
|
|
|
|
result = state.interpolate_string("Created ticket #{Local.TicketId} for team {Local.TeamName}")
|
|
assert result == "Created ticket #TKT-001 for team Support"
|
|
|
|
async def test_interpolate_string_with_system_vars(self, mock_state):
|
|
"""Test string interpolation with System. variables."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("System.ConversationId", "conv-789")
|
|
|
|
result = state.interpolate_string("Conversation: {System.ConversationId}")
|
|
assert result == "Conversation: conv-789"
|
|
|
|
async def test_interpolate_string_with_none_value(self, mock_state):
|
|
"""Test string interpolation with None value returns empty string."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
result = state.interpolate_string("Value: {Local.Missing}")
|
|
assert result == "Value: "
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Basic Executors Tests - Covering _executors_basic.py gaps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBasicExecutorsCoverage:
|
|
"""Tests for basic executors covering uncovered code paths."""
|
|
|
|
async def test_set_variable_executor(self, mock_context, mock_state):
|
|
"""Test SetVariableExecutor (distinct from SetValueExecutor)."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetVariableExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "SetVariable",
|
|
"variable": "Local.result",
|
|
"value": "test value",
|
|
}
|
|
executor = SetVariableExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
result = state.get("Local.result")
|
|
assert result == "test value"
|
|
|
|
async def test_set_variable_executor_with_nested_variable(self, mock_context, mock_state):
|
|
"""Test SetVariableExecutor with nested variable object."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetVariableExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "SetVariable",
|
|
"variable": {"path": "Local.nested"},
|
|
"value": 42,
|
|
}
|
|
executor = SetVariableExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
result = state.get("Local.nested")
|
|
assert result == 42
|
|
|
|
@_requires_powerfx
|
|
async def test_set_text_variable_executor(self, mock_context, mock_state):
|
|
"""Test SetTextVariableExecutor."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetTextVariableExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.name", "World")
|
|
|
|
action_def = {
|
|
"kind": "SetTextVariable",
|
|
"variable": "Local.greeting",
|
|
"text": "=Local.name",
|
|
}
|
|
executor = SetTextVariableExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
result = state.get("Local.greeting")
|
|
assert result == "World"
|
|
|
|
async def test_set_multiple_variables_executor(self, mock_context, mock_state):
|
|
"""Test SetMultipleVariablesExecutor."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetMultipleVariablesExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "SetMultipleVariables",
|
|
"assignments": [
|
|
{"variable": "Local.a", "value": 1},
|
|
{"variable": {"path": "Local.b"}, "value": 2},
|
|
{"path": "Local.c", "value": 3},
|
|
],
|
|
}
|
|
executor = SetMultipleVariablesExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
assert state.get("Local.a") == 1
|
|
assert state.get("Local.b") == 2
|
|
assert state.get("Local.c") == 3
|
|
|
|
async def test_reset_variable_executor(self, mock_context, mock_state):
|
|
"""Test ResetVariableExecutor."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
ResetVariableExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.myVar", "some value")
|
|
|
|
action_def = {
|
|
"kind": "ResetVariable",
|
|
"variable": "Local.myVar",
|
|
}
|
|
executor = ResetVariableExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
result = state.get("Local.myVar")
|
|
assert result is None
|
|
|
|
async def test_clear_all_variables_executor(self, mock_context, mock_state):
|
|
"""Test ClearAllVariablesExecutor."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
ClearAllVariablesExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.a", 1)
|
|
state.set("Local.b", 2)
|
|
|
|
action_def = {"kind": "ClearAllVariables"}
|
|
executor = ClearAllVariablesExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# Turn namespace should be cleared
|
|
assert state.get("Local.a") is None
|
|
assert state.get("Local.b") is None
|
|
|
|
async def test_send_activity_with_dict_activity(self, mock_context, mock_state):
|
|
"""Test SendActivityExecutor with dict activity containing text field."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SendActivityExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.name", "Alice")
|
|
|
|
action_def = {
|
|
"kind": "SendActivity",
|
|
"activity": {"text": "Hello, {Local.name}!"},
|
|
}
|
|
executor = SendActivityExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
mock_context.yield_output.assert_called_once_with("Hello, Alice!")
|
|
|
|
async def test_send_activity_with_string_activity(self, mock_context, mock_state):
|
|
"""Test SendActivityExecutor with string activity."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SendActivityExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "SendActivity",
|
|
"activity": "Plain text message",
|
|
}
|
|
executor = SendActivityExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
mock_context.yield_output.assert_called_once_with("Plain text message")
|
|
|
|
@_requires_powerfx
|
|
async def test_send_activity_with_expression(self, mock_context, mock_state):
|
|
"""Test SendActivityExecutor evaluates expressions."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SendActivityExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.msg", "Dynamic message")
|
|
|
|
action_def = {
|
|
"kind": "SendActivity",
|
|
"activity": "=Local.msg",
|
|
}
|
|
executor = SendActivityExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
mock_context.yield_output.assert_called_once_with("Dynamic message")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Agent Executors Tests - Covering _executors_agents.py gaps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAgentExecutorsCoverage:
|
|
"""Tests for agent executors covering uncovered code paths."""
|
|
|
|
async def test_normalize_variable_path_all_cases(self):
|
|
"""Test _normalize_variable_path with all namespace prefixes."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
_normalize_variable_path,
|
|
)
|
|
|
|
# Local. -> Local. (unchanged)
|
|
assert _normalize_variable_path("Local.MyVar") == "Local.MyVar"
|
|
|
|
# System. -> System. (unchanged)
|
|
assert _normalize_variable_path("System.ConvId") == "System.ConvId"
|
|
|
|
# Workflow. -> Workflow. (unchanged)
|
|
assert _normalize_variable_path("Workflow.Outputs.result") == "Workflow.Outputs.result"
|
|
|
|
# Already has a namespace with dots - pass through
|
|
assert _normalize_variable_path("custom.existing") == "custom.existing"
|
|
|
|
# No namespace - default to Local.
|
|
assert _normalize_variable_path("simpleVar") == "Local.simpleVar"
|
|
|
|
async def test_agent_executor_get_agent_name_string(self, mock_context, mock_state):
|
|
"""Test agent name extraction from simple string config."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "MyAgent",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
name = executor._get_agent_name(state)
|
|
assert name == "MyAgent"
|
|
|
|
async def test_agent_executor_get_agent_name_dict(self, mock_context, mock_state):
|
|
"""Test agent name extraction from nested dict config."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": {"name": "NestedAgent"},
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
name = executor._get_agent_name(state)
|
|
assert name == "NestedAgent"
|
|
|
|
async def test_agent_executor_get_agent_name_legacy(self, mock_context, mock_state):
|
|
"""Test agent name extraction from agentName (legacy)."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agentName": "LegacyAgent",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
name = executor._get_agent_name(state)
|
|
assert name == "LegacyAgent"
|
|
|
|
async def test_agent_executor_get_agent_name_string_expression(self, mock_context, mock_state):
|
|
"""Test agent name extraction from simple string expression."""
|
|
from unittest.mock import patch
|
|
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "=Local.SelectedAgent",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
with patch.object(state, "eval_if_expression", return_value="DynamicAgent"):
|
|
name = executor._get_agent_name(state)
|
|
assert name == "DynamicAgent"
|
|
|
|
async def test_agent_executor_get_agent_name_dict_expression(self, mock_context, mock_state):
|
|
"""Test agent name extraction from nested dict with expression."""
|
|
from unittest.mock import patch
|
|
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": {"name": "=Local.ManagerResult.next_speaker.answer"},
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
with patch.object(state, "eval_if_expression", return_value="WeatherAgent"):
|
|
name = executor._get_agent_name(state)
|
|
assert name == "WeatherAgent"
|
|
|
|
async def test_agent_executor_get_agent_name_legacy_expression(self, mock_context, mock_state):
|
|
"""Test agent name extraction from legacy agentName with expression."""
|
|
from unittest.mock import patch
|
|
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agentName": "=Local.NextAgent",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
with patch.object(state, "eval_if_expression", return_value="ResolvedAgent"):
|
|
name = executor._get_agent_name(state)
|
|
assert name == "ResolvedAgent"
|
|
|
|
async def test_agent_executor_get_agent_name_expression_returns_none(self, mock_context, mock_state):
|
|
"""Test agent name returns None when expression evaluates to None."""
|
|
from unittest.mock import patch
|
|
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": {"name": "=Local.UndefinedVar"},
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
with patch.object(state, "eval_if_expression", return_value=None):
|
|
name = executor._get_agent_name(state)
|
|
assert name is None
|
|
|
|
async def test_agent_executor_get_input_config_simple(self, mock_context, mock_state):
|
|
"""Test input config parsing with simple non-dict input."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"input": "simple string input",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
args, messages, external_loop, max_iterations = executor._get_input_config()
|
|
assert args == {}
|
|
assert messages == "simple string input"
|
|
assert external_loop is None
|
|
assert max_iterations == 100 # Default
|
|
|
|
async def test_agent_executor_get_input_config_full(self, mock_context, mock_state):
|
|
"""Test input config parsing with full structured input."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"input": {
|
|
"arguments": {"param1": "=Local.value"},
|
|
"messages": "=conversation.messages",
|
|
"externalLoop": {"when": "=Local.needsMore", "maxIterations": 50},
|
|
},
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
args, messages, external_loop, max_iterations = executor._get_input_config()
|
|
assert args == {"param1": "=Local.value"}
|
|
assert messages == "=conversation.messages"
|
|
assert external_loop == "=Local.needsMore"
|
|
assert max_iterations == 50
|
|
|
|
async def test_agent_executor_get_output_config_simple(self, mock_context, mock_state):
|
|
"""Test output config parsing with simple resultProperty."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"resultProperty": "Local.result",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
messages_var, response_obj, result_prop, auto_send = executor._get_output_config()
|
|
assert messages_var is None
|
|
assert response_obj is None
|
|
assert result_prop == "Local.result"
|
|
assert auto_send is True
|
|
|
|
async def test_agent_executor_get_output_config_full(self, mock_context, mock_state):
|
|
"""Test output config parsing with full structured output."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"output": {
|
|
"messages": "Local.ResponseMessages",
|
|
"responseObject": "Local.ParsedResponse",
|
|
"property": "Local.result",
|
|
"autoSend": False,
|
|
},
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
messages_var, response_obj, result_prop, auto_send = executor._get_output_config()
|
|
assert messages_var == "Local.ResponseMessages"
|
|
assert response_obj == "Local.ParsedResponse"
|
|
assert result_prop == "Local.result"
|
|
assert auto_send is False
|
|
|
|
@_requires_powerfx
|
|
async def test_agent_executor_build_input_text_from_string_messages(self, mock_context, mock_state):
|
|
"""Test _build_input_text with string messages expression."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.userInput", "Hello agent!")
|
|
|
|
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
input_text = await executor._build_input_text(state, {}, "=Local.userInput")
|
|
assert input_text == "Hello agent!"
|
|
|
|
@_requires_powerfx
|
|
async def test_agent_executor_build_input_text_from_message_list(self, mock_context, mock_state):
|
|
"""Test _build_input_text extracts text from message list."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set(
|
|
"Conversation.messages",
|
|
[
|
|
{"role": "user", "content": "First"},
|
|
{"role": "assistant", "content": "Response"},
|
|
{"role": "user", "content": "Last message"},
|
|
],
|
|
)
|
|
|
|
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
input_text = await executor._build_input_text(state, {}, "=Conversation.messages")
|
|
assert input_text == "Last message"
|
|
|
|
@_requires_powerfx
|
|
async def test_agent_executor_build_input_text_from_message_with_text_attr(self, mock_context, mock_state):
|
|
"""Test _build_input_text extracts text from message with text attribute."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.messages", [{"text": "From attribute"}])
|
|
|
|
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
input_text = await executor._build_input_text(state, {}, "=Local.messages")
|
|
assert input_text == "From attribute"
|
|
|
|
async def test_agent_executor_build_input_text_fallback_chain(self, mock_context, mock_state):
|
|
"""Test _build_input_text fallback chain when no messages expression."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize({"query": "workflow input"})
|
|
|
|
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
# No messages_expr, so falls back to workflow.inputs
|
|
input_text = await executor._build_input_text(state, {}, None)
|
|
assert input_text == "workflow input"
|
|
|
|
async def test_agent_executor_build_input_text_from_system_last_message(self, mock_context, mock_state):
|
|
"""Test _build_input_text falls back to system.LastMessage.Text."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("System.LastMessage", {"Text": "From last message"})
|
|
|
|
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
input_text = await executor._build_input_text(state, {}, None)
|
|
assert input_text == "From last message"
|
|
|
|
async def test_agent_executor_missing_agent_name(self, mock_context, mock_state):
|
|
"""Test agent executor with missing agent name logs warning."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {"kind": "InvokeAzureAgent"} # No agent specified
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# Should complete without error
|
|
mock_context.send_message.assert_called_once()
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, ActionComplete)
|
|
|
|
async def test_agent_executor_with_working_agent(self, mock_context, mock_state):
|
|
"""Test agent executor with a working mock agent."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
# Create mock agent
|
|
@dataclass
|
|
class MockResult:
|
|
text: str
|
|
messages: list[Any]
|
|
|
|
mock_agent = MagicMock()
|
|
mock_agent.run = AsyncMock(return_value=MockResult(text="Agent response", messages=[]))
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.input", "User query")
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"resultProperty": "Local.result",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# Verify agent was called
|
|
mock_agent.run.assert_called_once()
|
|
|
|
# Verify result was stored
|
|
result = state.get("Local.result")
|
|
assert result == "Agent response"
|
|
|
|
# Verify agent state was set
|
|
assert state.get("Agent.response") == "Agent response"
|
|
assert state.get("Agent.name") == "TestAgent"
|
|
assert state.get("Agent.text") == "Agent response"
|
|
|
|
async def test_agent_executor_with_agent_from_registry(self, mock_context, mock_state):
|
|
"""Test agent executor retrieves agent from shared state registry."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
AGENT_REGISTRY_KEY,
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
# Create mock agent
|
|
@dataclass
|
|
class MockResult:
|
|
text: str
|
|
messages: list[Any]
|
|
|
|
mock_agent = MagicMock()
|
|
mock_agent.run = AsyncMock(return_value=MockResult(text="Registry agent", messages=[]))
|
|
|
|
# Store in registry
|
|
mock_state._data[AGENT_REGISTRY_KEY] = {"RegistryAgent": mock_agent}
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.input", "Query")
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "RegistryAgent",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
mock_agent.run.assert_called_once()
|
|
|
|
async def test_agent_executor_parses_json_response(self, mock_context, mock_state):
|
|
"""Test agent executor parses JSON response into responseObject."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
@dataclass
|
|
class MockResult:
|
|
text: str
|
|
messages: list[Any]
|
|
|
|
mock_agent = MagicMock()
|
|
mock_agent.run = AsyncMock(return_value=MockResult(text='{"status": "ok", "count": 42}', messages=[]))
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.input", "Query")
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"output": {
|
|
"responseObject": "Local.Parsed",
|
|
},
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
parsed = state.get("Local.Parsed")
|
|
assert parsed == {"status": "ok", "count": 42}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Control Flow Executors Tests - Additional coverage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestControlFlowCoverage:
|
|
"""Tests for control flow executors covering uncovered code paths."""
|
|
|
|
@_requires_powerfx
|
|
async def test_foreach_with_source(self, mock_context, mock_state):
|
|
"""Test ForeachInitExecutor with the 'source' field."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
ForeachInitExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.data", [10, 20, 30])
|
|
|
|
action_def = {
|
|
"kind": "Foreach",
|
|
"source": "=Local.data",
|
|
"itemName": "item",
|
|
"indexName": "idx",
|
|
}
|
|
executor = ForeachInitExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, LoopIterationResult)
|
|
assert msg.has_next is True
|
|
assert msg.current_item == 10
|
|
assert msg.current_index == 0
|
|
|
|
async def test_foreach_next_continues_iteration(self, mock_context, mock_state):
|
|
"""Test ForeachNextExecutor continues to next item."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
LOOP_STATE_KEY,
|
|
ForeachNextExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.data", ["a", "b", "c"])
|
|
|
|
# Set up loop state as ForeachInitExecutor would
|
|
state_data = state.get_state_data()
|
|
state_data[LOOP_STATE_KEY] = {
|
|
"foreach_init": {
|
|
"items": ["a", "b", "c"],
|
|
"index": 0,
|
|
"length": 3,
|
|
}
|
|
}
|
|
state.set_state_data(state_data)
|
|
|
|
action_def = {
|
|
"kind": "Foreach",
|
|
"source": "=Local.data",
|
|
"itemName": "item",
|
|
}
|
|
executor = ForeachNextExecutor(action_def, init_executor_id="foreach_init")
|
|
|
|
await executor.handle_action(LoopIterationResult(has_next=True), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, LoopIterationResult)
|
|
assert msg.current_index == 1
|
|
assert msg.current_item == "b"
|
|
|
|
async def test_join_executor_accepts_condition_result(self, mock_context, mock_state):
|
|
"""Test JoinExecutor accepts ConditionResult as trigger."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
JoinExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {"kind": "_Join"}
|
|
executor = JoinExecutor(action_def)
|
|
|
|
# Trigger with ConditionResult
|
|
await executor.handle_action(ConditionResult(matched=True, branch_index=0), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, ActionComplete)
|
|
|
|
async def test_break_loop_executor(self, mock_context, mock_state):
|
|
"""Test BreakLoopExecutor emits LoopControl."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
BreakLoopExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {"kind": "BreakLoop"}
|
|
executor = BreakLoopExecutor(action_def, loop_next_executor_id="loop_next")
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, LoopControl)
|
|
assert msg.action == "break"
|
|
|
|
async def test_continue_loop_executor(self, mock_context, mock_state):
|
|
"""Test ContinueLoopExecutor emits LoopControl."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
ContinueLoopExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {"kind": "ContinueLoop"}
|
|
executor = ContinueLoopExecutor(action_def, loop_next_executor_id="loop_next")
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, LoopControl)
|
|
assert msg.action == "continue"
|
|
|
|
async def test_foreach_next_no_loop_state(self, mock_context, mock_state):
|
|
"""Test ForeachNextExecutor with missing loop state."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
ForeachNextExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "Foreach",
|
|
"source": "=Local.data",
|
|
"itemName": "item",
|
|
}
|
|
executor = ForeachNextExecutor(action_def, init_executor_id="missing_loop")
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, LoopIterationResult)
|
|
assert msg.has_next is False
|
|
|
|
async def test_foreach_next_loop_complete(self, mock_context, mock_state):
|
|
"""Test ForeachNextExecutor when loop is complete."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
LOOP_STATE_KEY,
|
|
ForeachNextExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Set up loop state at last item
|
|
state_data = state.get_state_data()
|
|
state_data[LOOP_STATE_KEY] = {
|
|
"loop_id": {
|
|
"items": ["a", "b"],
|
|
"index": 1, # Already at last item
|
|
"length": 2,
|
|
}
|
|
}
|
|
state.set_state_data(state_data)
|
|
|
|
action_def = {
|
|
"kind": "Foreach",
|
|
"source": "=Local.data",
|
|
"itemName": "item",
|
|
}
|
|
executor = ForeachNextExecutor(action_def, init_executor_id="loop_id")
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, LoopIterationResult)
|
|
assert msg.has_next is False
|
|
|
|
async def test_foreach_next_handle_break_control(self, mock_context, mock_state):
|
|
"""Test ForeachNextExecutor handles break LoopControl."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
LOOP_STATE_KEY,
|
|
ForeachNextExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Set up loop state
|
|
state_data = state.get_state_data()
|
|
state_data[LOOP_STATE_KEY] = {
|
|
"loop_id": {
|
|
"items": ["a", "b", "c"],
|
|
"index": 0,
|
|
"length": 3,
|
|
}
|
|
}
|
|
state.set_state_data(state_data)
|
|
|
|
action_def = {
|
|
"kind": "Foreach",
|
|
"source": "=Local.data",
|
|
"itemName": "item",
|
|
}
|
|
executor = ForeachNextExecutor(action_def, init_executor_id="loop_id")
|
|
|
|
await executor.handle_loop_control(LoopControl(action="break"), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, LoopIterationResult)
|
|
assert msg.has_next is False
|
|
|
|
async def test_foreach_next_handle_continue_control(self, mock_context, mock_state):
|
|
"""Test ForeachNextExecutor handles continue LoopControl."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
LOOP_STATE_KEY,
|
|
ForeachNextExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Set up loop state
|
|
state_data = state.get_state_data()
|
|
state_data[LOOP_STATE_KEY] = {
|
|
"loop_id": {
|
|
"items": ["a", "b", "c"],
|
|
"index": 0,
|
|
"length": 3,
|
|
}
|
|
}
|
|
state.set_state_data(state_data)
|
|
|
|
action_def = {
|
|
"kind": "Foreach",
|
|
"source": "=Local.data",
|
|
"itemName": "item",
|
|
}
|
|
executor = ForeachNextExecutor(action_def, init_executor_id="loop_id")
|
|
|
|
await executor.handle_loop_control(LoopControl(action="continue"), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, LoopIterationResult)
|
|
assert msg.has_next is True
|
|
assert msg.current_index == 1
|
|
|
|
async def test_end_workflow_executor(self, mock_context, mock_state):
|
|
"""Test EndWorkflowExecutor does not send continuation."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
EndWorkflowExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {"kind": "EndWorkflow"}
|
|
executor = EndWorkflowExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# Should NOT send any message
|
|
mock_context.send_message.assert_not_called()
|
|
|
|
async def test_end_conversation_executor(self, mock_context, mock_state):
|
|
"""Test EndConversationExecutor does not send continuation."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
EndConversationExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {"kind": "EndConversation"}
|
|
executor = EndConversationExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# Should NOT send any message
|
|
mock_context.send_message.assert_not_called()
|
|
|
|
@_requires_powerfx
|
|
async def test_condition_group_evaluator_first_match(self, mock_context, mock_state):
|
|
"""Test ConditionGroupEvaluatorExecutor returns first match."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
ConditionGroupEvaluatorExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.x", 10)
|
|
|
|
action_def = {"kind": "ConditionGroup"}
|
|
conditions = [
|
|
{"condition": "=Local.x > 20"},
|
|
{"condition": "=Local.x > 5"},
|
|
{"condition": "=Local.x > 0"},
|
|
]
|
|
executor = ConditionGroupEvaluatorExecutor(action_def, conditions=conditions)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, ConditionResult)
|
|
assert msg.matched is True
|
|
assert msg.branch_index == 1 # Second condition (x > 5) is first match
|
|
|
|
@_requires_powerfx
|
|
async def test_condition_group_evaluator_no_match(self, mock_context, mock_state):
|
|
"""Test ConditionGroupEvaluatorExecutor with no matches."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
ConditionGroupEvaluatorExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.x", 0)
|
|
|
|
action_def = {"kind": "ConditionGroup"}
|
|
conditions = [
|
|
{"condition": "=Local.x > 10"},
|
|
{"condition": "=Local.x > 5"},
|
|
]
|
|
executor = ConditionGroupEvaluatorExecutor(action_def, conditions=conditions)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, ConditionResult)
|
|
assert msg.matched is False
|
|
assert msg.branch_index == -1
|
|
|
|
@_requires_powerfx
|
|
async def test_condition_group_evaluator_boolean_true_condition(self, mock_context, mock_state):
|
|
"""Test ConditionGroupEvaluatorExecutor with boolean True condition."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
ConditionGroupEvaluatorExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {"kind": "ConditionGroup"}
|
|
conditions = [
|
|
{"condition": False}, # Should skip
|
|
{"condition": True}, # Should match
|
|
]
|
|
executor = ConditionGroupEvaluatorExecutor(action_def, conditions=conditions)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, ConditionResult)
|
|
assert msg.matched is True
|
|
assert msg.branch_index == 1
|
|
|
|
@_requires_powerfx
|
|
async def test_if_condition_evaluator_true(self, mock_context, mock_state):
|
|
"""Test IfConditionEvaluatorExecutor with true condition."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
IfConditionEvaluatorExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.flag", True)
|
|
|
|
action_def = {"kind": "If"}
|
|
executor = IfConditionEvaluatorExecutor(action_def, condition_expr="=Local.flag")
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, ConditionResult)
|
|
assert msg.matched is True
|
|
assert msg.branch_index == 0 # Then branch
|
|
|
|
@_requires_powerfx
|
|
async def test_if_condition_evaluator_false(self, mock_context, mock_state):
|
|
"""Test IfConditionEvaluatorExecutor with false condition."""
|
|
from agent_framework_declarative._workflows._executors_control_flow import (
|
|
IfConditionEvaluatorExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.flag", False)
|
|
|
|
action_def = {"kind": "If"}
|
|
executor = IfConditionEvaluatorExecutor(action_def, condition_expr="=Local.flag")
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
msg = mock_context.send_message.call_args[0][0]
|
|
assert isinstance(msg, ConditionResult)
|
|
assert msg.matched is False
|
|
assert msg.branch_index == -1 # Else branch
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Declarative Action Executor Base Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDeclarativeActionExecutorBase:
|
|
"""Tests for DeclarativeActionExecutor base class."""
|
|
|
|
async def test_ensure_state_initialized_with_dict_input(self, mock_context, mock_state):
|
|
"""Test _ensure_state_initialized with dict input."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetValueExecutor,
|
|
)
|
|
|
|
action_def = {"kind": "SetValue", "path": "Local.x", "value": 1}
|
|
executor = SetValueExecutor(action_def)
|
|
|
|
# Trigger with dict - should initialize state with it
|
|
await executor.handle_action({"custom": "input"}, mock_context)
|
|
|
|
# State should have been initialized with the dict
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
inputs = state.get("Workflow.Inputs")
|
|
assert inputs == {"custom": "input"}
|
|
|
|
async def test_ensure_state_initialized_with_string_input(self, mock_context, mock_state):
|
|
"""Test _ensure_state_initialized with string input."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetValueExecutor,
|
|
)
|
|
|
|
action_def = {"kind": "SetValue", "path": "Local.x", "value": 1}
|
|
executor = SetValueExecutor(action_def)
|
|
|
|
# Trigger with string - should wrap in {"input": ...}
|
|
await executor.handle_action("string trigger", mock_context)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
inputs = state.get("Workflow.Inputs")
|
|
assert inputs == {"input": "string trigger"}
|
|
|
|
async def test_ensure_state_initialized_with_custom_object(self, mock_context, mock_state):
|
|
"""Test _ensure_state_initialized with custom object converts to string."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetValueExecutor,
|
|
)
|
|
|
|
class CustomObj:
|
|
def __str__(self):
|
|
return "custom string"
|
|
|
|
action_def = {"kind": "SetValue", "path": "Local.x", "value": 1}
|
|
executor = SetValueExecutor(action_def)
|
|
|
|
await executor.handle_action(CustomObj(), mock_context)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
inputs = state.get("Workflow.Inputs")
|
|
assert inputs == {"input": "custom string"}
|
|
|
|
async def test_executor_display_name_property(self, mock_context, mock_state):
|
|
"""Test executor display_name property."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetValueExecutor,
|
|
)
|
|
|
|
action_def = {
|
|
"kind": "SetValue",
|
|
"displayName": "My Custom Action",
|
|
"path": "Local.x",
|
|
"value": 1,
|
|
}
|
|
executor = SetValueExecutor(action_def)
|
|
|
|
assert executor.display_name == "My Custom Action"
|
|
|
|
async def test_executor_action_def_property(self, mock_context, mock_state):
|
|
"""Test executor action_def property."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
SetValueExecutor,
|
|
)
|
|
|
|
action_def = {"kind": "SetValue", "path": "Local.x", "value": 1}
|
|
executor = SetValueExecutor(action_def)
|
|
|
|
assert executor.action_def == action_def
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Human Input Executors Tests - Covering _executors_external_input.py gaps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestHumanInputExecutorsCoverage:
|
|
"""Tests for human input executors covering uncovered code paths."""
|
|
|
|
async def test_request_external_input_executor(self, mock_context, mock_state):
|
|
"""Test RequestExternalInputExecutor."""
|
|
from agent_framework_declarative._workflows._executors_external_input import (
|
|
ExternalInputRequest,
|
|
RequestExternalInputExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "RequestExternalInput",
|
|
"requestType": "approval",
|
|
"prompt": {"text": "Please approve this request"},
|
|
"variable": "Local.approvalResult",
|
|
"timeout": 3600,
|
|
"requiredFields": ["approver", "notes"],
|
|
"metadata": {"priority": "high"},
|
|
}
|
|
executor = RequestExternalInputExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
mock_context.request_info.assert_called_once()
|
|
request = mock_context.request_info.call_args[0][0]
|
|
assert isinstance(request, ExternalInputRequest)
|
|
assert request.request_type == "approval"
|
|
assert request.message == "Please approve this request"
|
|
assert request.metadata["priority"] == "high"
|
|
assert request.metadata["required_fields"] == ["approver", "notes"]
|
|
assert request.metadata["timeout_seconds"] == 3600
|
|
|
|
async def test_question_executor_with_choices(self, mock_context, mock_state):
|
|
"""Test QuestionExecutor with choices as dicts and strings."""
|
|
from agent_framework_declarative._workflows._executors_external_input import (
|
|
ExternalInputRequest,
|
|
QuestionExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "Question",
|
|
"question": {"text": "Select an option:"},
|
|
"variable": "Local.selection",
|
|
"choices": [
|
|
{"value": "a", "label": "Option A"},
|
|
{"value": "b"}, # No label, should use value
|
|
"c", # String choice
|
|
],
|
|
"allowFreeText": False,
|
|
}
|
|
executor = QuestionExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
mock_context.request_info.assert_called_once()
|
|
request = mock_context.request_info.call_args[0][0]
|
|
assert isinstance(request, ExternalInputRequest)
|
|
assert request.request_type == "question"
|
|
choices = request.metadata["choices"]
|
|
assert len(choices) == 3
|
|
assert choices[0] == {"value": "a", "label": "Option A"}
|
|
assert choices[1] == {"value": "b", "label": "b"}
|
|
assert choices[2] == {"value": "c", "label": "c"}
|
|
assert request.metadata["allow_free_text"] is False
|
|
|
|
async def test_question_executor_reads_nested_question_text(self, mock_context, mock_state):
|
|
"""QuestionExecutor reads ``question.text``/``variable``/``default`` into the request."""
|
|
from agent_framework_declarative._workflows._executors_external_input import (
|
|
ExternalInputRequest,
|
|
QuestionExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "Question",
|
|
"question": {"text": "What is your name?"},
|
|
"variable": "Local.userName",
|
|
"default": "Guest",
|
|
}
|
|
executor = QuestionExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
mock_context.request_info.assert_called_once()
|
|
request = mock_context.request_info.call_args[0][0]
|
|
assert isinstance(request, ExternalInputRequest)
|
|
# Canonical text comes through as a plain string, not the stringified dict.
|
|
assert request.message == "What is your name?"
|
|
# Canonical `variable` overrides the legacy default of Local.answer.
|
|
assert request.metadata["output_property"] == "Local.userName"
|
|
assert request.metadata["default_value"] == "Guest"
|
|
|
|
async def test_question_executor_reads_top_level_alternates(self, mock_context, mock_state):
|
|
"""Top-level ``text``/``property``/``defaultValue`` are accepted as alternates."""
|
|
from agent_framework_declarative._workflows._executors_external_input import (
|
|
ExternalInputRequest,
|
|
QuestionExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "Question",
|
|
"text": "Legacy question",
|
|
"property": "Local.legacyAnswer",
|
|
"defaultValue": "legacy-default",
|
|
}
|
|
executor = QuestionExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
request = mock_context.request_info.call_args[0][0]
|
|
assert isinstance(request, ExternalInputRequest)
|
|
assert request.message == "Legacy question"
|
|
assert request.metadata["output_property"] == "Local.legacyAnswer"
|
|
assert request.metadata["default_value"] == "legacy-default"
|
|
|
|
async def test_request_external_input_reads_nested_prompt_text(self, mock_context, mock_state):
|
|
"""RequestExternalInputExecutor reads ``prompt.text``/``variable``/``default``."""
|
|
from agent_framework_declarative._workflows._executors_external_input import (
|
|
ExternalInputRequest,
|
|
RequestExternalInputExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "RequestExternalInput",
|
|
"prompt": {"text": "Please approve"},
|
|
"variable": "Local.approved",
|
|
"default": "pending",
|
|
}
|
|
executor = RequestExternalInputExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
request = mock_context.request_info.call_args[0][0]
|
|
assert isinstance(request, ExternalInputRequest)
|
|
assert request.message == "Please approve"
|
|
assert request.metadata["output_property"] == "Local.approved"
|
|
assert request.metadata["default_value"] == "pending"
|
|
|
|
async def test_request_external_input_reads_top_level_alternates(self, mock_context, mock_state):
|
|
"""Top-level ``message``/``property`` are accepted as alternates."""
|
|
from agent_framework_declarative._workflows._executors_external_input import (
|
|
ExternalInputRequest,
|
|
RequestExternalInputExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "RequestExternalInput",
|
|
"message": "Legacy message",
|
|
"property": "Local.legacyApproval",
|
|
}
|
|
executor = RequestExternalInputExecutor(action_def)
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
request = mock_context.request_info.call_args[0][0]
|
|
assert isinstance(request, ExternalInputRequest)
|
|
assert request.message == "Legacy message"
|
|
assert request.metadata["output_property"] == "Local.legacyApproval"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Additional Agent Executor Tests - External Loop Coverage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAgentExternalLoopCoverage:
|
|
"""Tests for agent executor external loop handling."""
|
|
|
|
@_requires_powerfx
|
|
async def test_agent_executor_with_external_loop(self, mock_context, mock_state):
|
|
"""Test agent executor with external loop that triggers."""
|
|
from unittest.mock import patch
|
|
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
AgentExternalInputRequest,
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
mock_agent = MagicMock()
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.input", "User query")
|
|
state.set("Local.needsMore", True) # Loop condition will be true
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"input": {
|
|
"externalLoop": {"when": "=Local.needsMore"},
|
|
},
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
|
|
|
|
# Mock the internal method to avoid storing Message objects in state
|
|
# (PowerFx cannot serialize Message)
|
|
with patch.object(
|
|
executor,
|
|
"_invoke_agent_and_store_results",
|
|
new=AsyncMock(return_value=("Need more info", [], [])),
|
|
):
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# Should request external input via request_info
|
|
mock_context.request_info.assert_called_once()
|
|
request = mock_context.request_info.call_args[0][0]
|
|
assert isinstance(request, AgentExternalInputRequest)
|
|
assert request.agent_name == "TestAgent"
|
|
|
|
async def test_agent_executor_agent_error_handling(self, mock_context, mock_state):
|
|
"""Test agent executor raises AgentInvalidResponseException on failure."""
|
|
from agent_framework.exceptions import AgentInvalidResponseException
|
|
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
mock_agent = MagicMock()
|
|
mock_agent.run = AsyncMock(side_effect=RuntimeError("Agent failed"))
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.input", "Query")
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"resultProperty": "Local.result",
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
|
|
|
|
with pytest.raises(AgentInvalidResponseException) as exc_info:
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
assert "TestAgent" in str(exc_info.value)
|
|
assert "Agent failed" in str(exc_info.value)
|
|
|
|
# Should still store error in state before raising
|
|
error = state.get("Agent.error")
|
|
assert "Agent failed" in error
|
|
result = state.get("Local.result")
|
|
assert result == {"error": "Agent failed"}
|
|
|
|
async def test_agent_executor_string_result(self, mock_context, mock_state):
|
|
"""Test agent executor with agent that returns string directly."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
mock_agent = MagicMock()
|
|
mock_agent.run = AsyncMock(return_value="Direct string response")
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.input", "Query")
|
|
|
|
action_def = {
|
|
"kind": "InvokeAzureAgent",
|
|
"agent": "TestAgent",
|
|
"resultProperty": "Local.result",
|
|
"output": {"autoSend": True},
|
|
}
|
|
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
|
|
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# Should auto-send output
|
|
mock_context.yield_output.assert_called_with("Direct string response")
|
|
result = state.get("Local.result")
|
|
assert result == "Direct string response"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PowerFx Functions Coverage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@_requires_powerfx
|
|
class TestPowerFxFunctionsCoverage:
|
|
"""Tests for PowerFx function evaluation coverage."""
|
|
|
|
async def test_eval_lower_upper_functions(self, mock_state):
|
|
"""Test Lower and Upper functions."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.text", "Hello World")
|
|
|
|
result = state.eval("=Lower(Local.text)")
|
|
assert result == "hello world"
|
|
|
|
result = state.eval("=Upper(Local.text)")
|
|
assert result == "HELLO WORLD"
|
|
|
|
async def test_eval_if_function(self, mock_state):
|
|
"""Test If function."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.flag", True)
|
|
|
|
result = state.eval('=If(Local.flag, "yes", "no")')
|
|
assert result == "yes"
|
|
|
|
state.set("Local.flag", False)
|
|
result = state.eval('=If(Local.flag, "yes", "no")')
|
|
assert result == "no"
|
|
|
|
async def test_eval_not_function(self, mock_state):
|
|
"""Test Not function."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.flag", True)
|
|
|
|
result = state.eval("=Not(Local.flag)")
|
|
assert result is False
|
|
|
|
async def test_eval_and_or_functions(self, mock_state):
|
|
"""Test And and Or functions."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.a", True)
|
|
state.set("Local.b", False)
|
|
|
|
result = state.eval("=And(Local.a, Local.b)")
|
|
assert result is False
|
|
|
|
result = state.eval("=Or(Local.a, Local.b)")
|
|
assert result is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Builder control flow tests - Covering Goto/Break/Continue creation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBuilderControlFlowCreation:
|
|
"""Tests for Goto, Break, Continue executor creation in builder."""
|
|
|
|
def test_create_goto_reference(self):
|
|
"""Test creating a goto reference executor."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
# Create builder with minimal yaml definition
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
action_def = {
|
|
"kind": "GotoAction",
|
|
"target": "some_target_action",
|
|
"id": "goto_test",
|
|
}
|
|
|
|
executor = graph_builder._create_goto_reference(action_def, wb, None)
|
|
|
|
assert executor is not None
|
|
assert executor.id == "goto_test"
|
|
# Verify pending goto was recorded
|
|
assert len(graph_builder._pending_gotos) == 1
|
|
assert graph_builder._pending_gotos[0][1] == "some_target_action"
|
|
|
|
def test_create_goto_reference_auto_id(self):
|
|
"""Test creating a goto with auto-generated ID."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
action_def = {
|
|
"kind": "GotoAction",
|
|
"target": "target_action",
|
|
}
|
|
|
|
executor = graph_builder._create_goto_reference(action_def, wb, None)
|
|
|
|
assert executor is not None
|
|
assert "goto_target_action" in executor.id
|
|
|
|
def test_create_goto_reference_no_target(self):
|
|
"""Test creating a goto with no target returns None."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
action_def = {
|
|
"kind": "GotoAction",
|
|
# No target specified
|
|
}
|
|
|
|
executor = graph_builder._create_goto_reference(action_def, wb, None)
|
|
assert executor is None
|
|
|
|
def test_goto_invalid_target_raises_error(self):
|
|
"""Test that goto to non-existent target raises ValueError."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [
|
|
{"kind": "SendActivity", "id": "action1", "activity": {"text": "Hello"}},
|
|
{"kind": "GotoAction", "target": "non_existent_action"},
|
|
],
|
|
}
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "non_existent_action" in str(exc_info.value)
|
|
assert "not found" in str(exc_info.value)
|
|
|
|
def test_create_break_executor(self):
|
|
"""Test creating a break executor within a loop context."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_control_flow import ForeachNextExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
# Create a mock loop_next executor
|
|
loop_next = ForeachNextExecutor(
|
|
{"kind": "Foreach", "source": "=Local.items"},
|
|
init_executor_id="foreach_init",
|
|
id="foreach_next",
|
|
)
|
|
wb._add_executor(loop_next)
|
|
|
|
parent_context = {"loop_next_executor": loop_next}
|
|
|
|
action_def = {
|
|
"kind": "BreakLoop",
|
|
"id": "break_test",
|
|
}
|
|
|
|
executor = graph_builder._create_break_executor(action_def, wb, parent_context)
|
|
|
|
assert executor is not None
|
|
assert executor.id == "break_test"
|
|
|
|
def test_create_break_executor_no_loop_context(self):
|
|
"""Test creating a break executor without loop context raises ValueError."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
action_def = {
|
|
"kind": "BreakLoop",
|
|
}
|
|
|
|
# No parent_context should raise ValueError
|
|
with pytest.raises(ValueError) as exc_info:
|
|
graph_builder._create_break_executor(action_def, wb, None)
|
|
assert "BreakLoop action can only be used inside a Foreach loop" in str(exc_info.value)
|
|
|
|
# Empty context should also raise ValueError
|
|
with pytest.raises(ValueError) as exc_info:
|
|
graph_builder._create_break_executor(action_def, wb, {})
|
|
assert "BreakLoop action can only be used inside a Foreach loop" in str(exc_info.value)
|
|
|
|
def test_create_continue_executor(self):
|
|
"""Test creating a continue executor within a loop context."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_control_flow import ForeachNextExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
# Create a mock loop_next executor
|
|
loop_next = ForeachNextExecutor(
|
|
{"kind": "Foreach", "source": "=Local.items"},
|
|
init_executor_id="foreach_init",
|
|
id="foreach_next",
|
|
)
|
|
wb._add_executor(loop_next)
|
|
|
|
parent_context = {"loop_next_executor": loop_next}
|
|
|
|
action_def = {
|
|
"kind": "ContinueLoop",
|
|
"id": "continue_test",
|
|
}
|
|
|
|
executor = graph_builder._create_continue_executor(action_def, wb, parent_context)
|
|
|
|
assert executor is not None
|
|
assert executor.id == "continue_test"
|
|
|
|
def test_create_continue_executor_no_loop_context(self):
|
|
"""Test creating a continue executor without loop context raises ValueError."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
action_def = {
|
|
"kind": "ContinueLoop",
|
|
}
|
|
|
|
# No parent_context should raise ValueError
|
|
with pytest.raises(ValueError) as exc_info:
|
|
graph_builder._create_continue_executor(action_def, wb, None)
|
|
assert "ContinueLoop action can only be used inside a Foreach loop" in str(exc_info.value)
|
|
|
|
|
|
class TestBuilderEdgeWiring:
|
|
"""Tests for builder edge wiring methods."""
|
|
|
|
def test_foreach_advance_edge_wired_from_last_body_action(self):
|
|
"""Advance edge must come from the last body action."""
|
|
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "foreach_seq",
|
|
"actions": [
|
|
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A", "B"]},
|
|
{
|
|
"kind": "Foreach",
|
|
"id": "loop",
|
|
"source": "=Local.items",
|
|
"itemName": "item",
|
|
"actions": [
|
|
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
|
|
{"kind": "SendActivity", "id": "step_2", "activity": {"text": "two"}},
|
|
{"kind": "SendActivity", "id": "step_3", "activity": {"text": "three"}},
|
|
],
|
|
},
|
|
],
|
|
}
|
|
|
|
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
|
|
edges = {(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges}
|
|
|
|
assert ("step_3", "loop_next") in edges
|
|
assert ("step_1", "loop_next") not in edges
|
|
assert ("step_2", "loop_next") not in edges
|
|
assert ("step_1", "step_2") in edges
|
|
assert ("step_2", "step_3") in edges
|
|
|
|
def test_foreach_advance_edge_skipped_for_terminator_body(self):
|
|
"""BreakLoop at end of body wires itself to loop_next; no duplicate edge."""
|
|
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "foreach_terminator",
|
|
"actions": [
|
|
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A"]},
|
|
{
|
|
"kind": "Foreach",
|
|
"id": "loop",
|
|
"source": "=Local.items",
|
|
"itemName": "item",
|
|
"actions": [
|
|
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
|
|
{"kind": "BreakLoop", "id": "stop"},
|
|
],
|
|
},
|
|
],
|
|
}
|
|
|
|
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
|
|
all_edges = [(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges]
|
|
assert all_edges.count(("stop", "loop_next")) == 1
|
|
assert ("step_1", "loop_next") not in all_edges
|
|
|
|
def test_foreach_advance_edge_with_if_as_last_body_action(self):
|
|
"""Trailing If in a Foreach body wires every branch exit to loop_next."""
|
|
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "foreach_if_last",
|
|
"actions": [
|
|
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A", "B"]},
|
|
{
|
|
"kind": "Foreach",
|
|
"id": "loop",
|
|
"source": "=Local.items",
|
|
"itemName": "item",
|
|
"actions": [
|
|
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
|
|
{
|
|
"kind": "If",
|
|
"id": "check",
|
|
"condition": '=Local.item = "A"',
|
|
"then": [
|
|
{"kind": "SendActivity", "id": "then_action", "activity": {"text": "then"}},
|
|
],
|
|
"else": [
|
|
{"kind": "SendActivity", "id": "else_action", "activity": {"text": "else"}},
|
|
],
|
|
},
|
|
],
|
|
},
|
|
],
|
|
}
|
|
|
|
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
|
|
edges = {(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges}
|
|
|
|
assert ("then_action", "loop_next") in edges
|
|
assert ("else_action", "loop_next") in edges
|
|
assert ("step_1", "loop_next") not in edges
|
|
|
|
def test_wire_to_target_with_if_structure(self):
|
|
"""Test wiring to an If structure routes to evaluator."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
# Create a mock source executor
|
|
source = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "test"}}, id="source")
|
|
wb._add_executor(source)
|
|
|
|
# Create a mock If structure with evaluator
|
|
class MockIfStructure:
|
|
_is_if_structure = True
|
|
|
|
def __init__(self):
|
|
self.evaluator = SendActivityExecutor(
|
|
{"kind": "SendActivity", "activity": {"text": "evaluator"}}, id="evaluator"
|
|
)
|
|
|
|
target = MockIfStructure()
|
|
wb._add_executor(target.evaluator)
|
|
|
|
# Wire should add edge to evaluator
|
|
graph_builder._wire_to_target(wb, source, target)
|
|
|
|
# Verify edge was added (would need to inspect workflow internals)
|
|
# For now, just verify no exception was raised
|
|
|
|
def test_wire_to_target_normal_executor(self):
|
|
"""Test wiring to a normal executor adds direct edge."""
|
|
from agent_framework import WorkflowBuilder
|
|
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
|
|
|
|
source = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "source"}}, id="source")
|
|
target = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "target"}}, id="target")
|
|
|
|
wb._add_executor(source)
|
|
wb._add_executor(target)
|
|
|
|
graph_builder._wire_to_target(wb, source, target)
|
|
# Verify edge creation (no exception = success)
|
|
|
|
def test_collect_all_exits_for_nested_structure(self):
|
|
"""Test collecting all exits from nested structures."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
# Create mock nested structure
|
|
exit1 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "exit1"}}, id="exit1")
|
|
exit2 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "exit2"}}, id="exit2")
|
|
|
|
class InnerStructure:
|
|
def __init__(self):
|
|
self.branch_exits = [exit1, exit2]
|
|
|
|
class OuterStructure:
|
|
def __init__(self):
|
|
self.branch_exits = [InnerStructure()]
|
|
|
|
outer = OuterStructure()
|
|
exits = graph_builder._collect_all_exits(outer)
|
|
|
|
assert len(exits) == 2
|
|
assert exit1 in exits
|
|
assert exit2 in exits
|
|
|
|
def test_collect_all_exits_for_simple_executor(self):
|
|
"""Test collecting exits from a simple executor."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
executor = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "test"}}, id="test")
|
|
|
|
exits = graph_builder._collect_all_exits(executor)
|
|
|
|
assert len(exits) == 1
|
|
assert executor in exits
|
|
|
|
def test_get_branch_exit_with_chain(self):
|
|
"""Test getting branch exit from a chain of executors."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
exec1 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "1"}}, id="e1")
|
|
exec2 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "2"}}, id="e2")
|
|
exec3 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "3"}}, id="e3")
|
|
|
|
# Simulate a chain by dynamically setting attribute
|
|
exec1._chain_executors = [exec1, exec2, exec3] # type: ignore[attr-defined]
|
|
|
|
exit_exec = graph_builder._get_branch_exit(exec1)
|
|
|
|
assert exit_exec == exec3
|
|
|
|
def test_get_branch_exit_none(self):
|
|
"""Test getting branch exit from None."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
exit_exec = graph_builder._get_branch_exit(None)
|
|
assert exit_exec is None
|
|
|
|
def test_get_branch_exit_returns_none_for_goto_terminator(self):
|
|
"""Test that _get_branch_exit returns None when branch ends with GotoAction.
|
|
|
|
GotoAction is a terminator that handles its own control flow (jumping to
|
|
the target action). It should NOT be returned as a branch exit, because
|
|
that would cause the parent ConditionGroup to wire it to the next
|
|
sequential action, creating a dual-edge where both the goto target and
|
|
the next action receive messages.
|
|
"""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
# GotoAction executor is a JoinExecutor with a GotoAction action_def
|
|
goto_executor = JoinExecutor(
|
|
{"kind": "GotoAction", "id": "goto_summary", "actionId": "invoke_summary"},
|
|
id="goto_summary",
|
|
)
|
|
|
|
# Simulate a single-action branch chain
|
|
goto_executor._chain_executors = [goto_executor] # type: ignore[attr-defined]
|
|
|
|
exit_exec = graph_builder._get_branch_exit(goto_executor)
|
|
assert exit_exec is None
|
|
|
|
def test_get_branch_exit_returns_none_for_end_workflow_terminator(self):
|
|
"""Test that _get_branch_exit returns None when branch ends with EndWorkflow."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
end_executor = JoinExecutor(
|
|
{"kind": "EndWorkflow", "id": "end"},
|
|
id="end",
|
|
)
|
|
end_executor._chain_executors = [end_executor] # type: ignore[attr-defined]
|
|
|
|
exit_exec = graph_builder._get_branch_exit(end_executor)
|
|
assert exit_exec is None
|
|
|
|
def test_get_branch_exit_returns_none_for_goto_in_chain(self):
|
|
"""Test that _get_branch_exit returns None when chain ends with GotoAction.
|
|
|
|
Even when a branch has multiple actions before the GotoAction,
|
|
the branch exit should be None because the last action is a terminator.
|
|
"""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
|
|
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
# A branch with: SendActivity -> GotoAction
|
|
activity = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "msg"}}, id="msg")
|
|
goto = JoinExecutor(
|
|
{"kind": "GotoAction", "id": "goto_target", "actionId": "some_target"},
|
|
id="goto_target",
|
|
)
|
|
activity._chain_executors = [activity, goto] # type: ignore[attr-defined]
|
|
|
|
exit_exec = graph_builder._get_branch_exit(activity)
|
|
assert exit_exec is None
|
|
|
|
def test_get_branch_exit_returns_executor_for_non_terminator(self):
|
|
"""Test that _get_branch_exit still returns the exit for non-terminator branches."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
|
|
|
|
yaml_def = {"name": "test_workflow", "actions": []}
|
|
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
|
|
exec1 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "1"}}, id="e1")
|
|
exec2 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "2"}}, id="e2")
|
|
exec1._chain_executors = [exec1, exec2] # type: ignore[attr-defined]
|
|
|
|
exit_exec = graph_builder._get_branch_exit(exec1)
|
|
assert exit_exec == exec2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Agent executor external loop response handler tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAgentExecutorExternalLoop:
|
|
"""Tests for InvokeAzureAgentExecutor external loop response handling."""
|
|
|
|
async def test_handle_external_input_response_no_state(self, mock_context, mock_state):
|
|
"""Test handling external input response when loop state not found."""
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
AgentExternalInputRequest,
|
|
AgentExternalInputResponse,
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
executor = InvokeAzureAgentExecutor({"kind": "InvokeAzureAgent", "agent": "TestAgent"})
|
|
|
|
# No external loop state in mock_state
|
|
original_request = AgentExternalInputRequest(
|
|
request_id="req-1",
|
|
agent_name="TestAgent",
|
|
agent_response="Hello",
|
|
iteration=1,
|
|
)
|
|
response = AgentExternalInputResponse(user_input="hi there")
|
|
|
|
await executor.handle_external_input_response(original_request, response, mock_context)
|
|
|
|
# Should send ActionComplete due to missing state
|
|
mock_context.send_message.assert_called()
|
|
call_args = mock_context.send_message.call_args[0][0]
|
|
from agent_framework_declarative._workflows import ActionComplete
|
|
|
|
assert isinstance(call_args, ActionComplete)
|
|
|
|
async def test_handle_external_input_response_agent_not_found(self, mock_context, mock_state):
|
|
"""Test handling external input raises error when agent not found during resumption."""
|
|
from agent_framework.exceptions import AgentInvalidRequestException
|
|
|
|
from agent_framework_declarative._workflows._executors_agents import (
|
|
EXTERNAL_LOOP_STATE_KEY,
|
|
AgentExternalInputRequest,
|
|
AgentExternalInputResponse,
|
|
ExternalLoopState,
|
|
InvokeAzureAgentExecutor,
|
|
)
|
|
|
|
# Set up loop state with always true condition (literal)
|
|
loop_state = ExternalLoopState(
|
|
agent_name="NonExistentAgent",
|
|
iteration=1,
|
|
external_loop_when="true", # Literal true
|
|
messages_var=None,
|
|
response_obj_var=None,
|
|
result_property=None,
|
|
auto_send=True,
|
|
messages_path="Conversation.messages",
|
|
)
|
|
mock_state._data[EXTERNAL_LOOP_STATE_KEY] = loop_state
|
|
|
|
# Initialize declarative state with simple value
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
executor = InvokeAzureAgentExecutor({"kind": "InvokeAzureAgent", "agent": "NonExistentAgent"})
|
|
|
|
original_request = AgentExternalInputRequest(
|
|
request_id="req-1",
|
|
agent_name="NonExistentAgent",
|
|
agent_response="Hello",
|
|
iteration=1,
|
|
)
|
|
response = AgentExternalInputResponse(user_input="continue")
|
|
|
|
with pytest.raises(AgentInvalidRequestException) as exc_info:
|
|
await executor.handle_external_input_response(original_request, response, mock_context)
|
|
|
|
assert "NonExistentAgent" in str(exc_info.value)
|
|
assert "not found during loop resumption" in str(exc_info.value)
|
|
|
|
|
|
class TestBuilderValidation:
|
|
"""Tests for builder validation features (P1 fixes)."""
|
|
|
|
def test_duplicate_explicit_action_id_raises_error(self):
|
|
"""Test that duplicate explicit action IDs are detected."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [
|
|
{"id": "my_action", "kind": "SendActivity", "activity": {"text": "First"}},
|
|
{"id": "my_action", "kind": "SendActivity", "activity": {"text": "Second"}},
|
|
],
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "Duplicate action ID 'my_action'" in str(exc_info.value)
|
|
|
|
def test_duplicate_id_in_nested_actions(self):
|
|
"""Test duplicate ID detection in nested If/Switch branches."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [
|
|
{
|
|
"kind": "If",
|
|
"condition": "=true",
|
|
"then": [{"id": "shared_id", "kind": "SendActivity", "activity": {"text": "Then"}}],
|
|
"else": [{"id": "shared_id", "kind": "SendActivity", "activity": {"text": "Else"}}],
|
|
}
|
|
],
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "Duplicate action ID 'shared_id'" in str(exc_info.value)
|
|
|
|
def test_missing_required_field_sendactivity(self):
|
|
"""Test that missing required fields are detected."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [{"kind": "SendActivity"}], # Missing 'activity' field
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "SendActivity" in str(exc_info.value)
|
|
assert "missing required field" in str(exc_info.value)
|
|
assert "activity" in str(exc_info.value)
|
|
|
|
def test_missing_required_field_setvalue(self):
|
|
"""Test SetValue without path raises error."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [{"kind": "SetValue", "value": "test"}], # Missing 'path' field
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "SetValue" in str(exc_info.value)
|
|
assert "path" in str(exc_info.value)
|
|
|
|
def test_setvalue_accepts_alternate_variable_field(self):
|
|
"""Test SetValue accepts 'variable' as alternate to 'path'."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [{"kind": "SetValue", "variable": {"path": "Local.x"}, "value": "test"}],
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
# Should not raise - 'variable' is accepted as alternate
|
|
workflow = builder.build()
|
|
assert workflow is not None
|
|
|
|
def test_missing_required_field_foreach(self):
|
|
"""Test Foreach without source raises error."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [{"kind": "Foreach", "actions": [{"kind": "SendActivity", "activity": {"text": "Hi"}}]}],
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "Foreach" in str(exc_info.value)
|
|
assert "source" in str(exc_info.value)
|
|
|
|
def test_self_referencing_goto_raises_error(self):
|
|
"""Test that a goto referencing itself is detected."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [{"id": "loop", "kind": "GotoAction", "actionId": "loop"}],
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "loop" in str(exc_info.value)
|
|
assert "self-referencing" in str(exc_info.value)
|
|
|
|
def test_validation_can_be_disabled(self):
|
|
"""Test that validation can be disabled for early schema/duplicate checks.
|
|
|
|
Note: Even with validation disabled, the underlying WorkflowBuilder may
|
|
still catch duplicates during graph construction. This flag disables
|
|
our upfront validation pass but not runtime checks.
|
|
"""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
# Test with missing required field - validation disabled should skip our check
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [{"kind": "SendActivity"}], # Missing 'activity' - normally caught by validation
|
|
}
|
|
|
|
# With validation disabled, our upfront check is skipped
|
|
builder = DeclarativeWorkflowBuilder(yaml_def, validate=False)
|
|
# The workflow may still fail for other reasons, but our validation pass is skipped
|
|
# In this case, it should succeed because SendActivityExecutor handles missing fields gracefully
|
|
workflow = builder.build()
|
|
assert workflow is not None
|
|
|
|
def test_validation_in_condition_group_branches(self):
|
|
"""Test validation catches issues in ConditionGroup branches."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [
|
|
{
|
|
"kind": "ConditionGroup",
|
|
"conditions": [
|
|
{
|
|
"condition": '=Local.choice = "a"',
|
|
"actions": [{"id": "dup", "kind": "SendActivity", "activity": {"text": "A"}}],
|
|
},
|
|
{
|
|
"condition": '=Local.choice = "b"',
|
|
"actions": [{"id": "dup", "kind": "SendActivity", "activity": {"text": "B"}}],
|
|
},
|
|
],
|
|
}
|
|
],
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "Duplicate action ID 'dup'" in str(exc_info.value)
|
|
|
|
def test_validation_in_foreach_body(self):
|
|
"""Test validation catches issues in Foreach body."""
|
|
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
|
|
|
|
yaml_def = {
|
|
"name": "test_workflow",
|
|
"actions": [
|
|
{
|
|
"kind": "Foreach",
|
|
"source": "=Local.items",
|
|
"actions": [{"kind": "SendActivity"}], # Missing 'activity'
|
|
}
|
|
],
|
|
}
|
|
|
|
builder = DeclarativeWorkflowBuilder(yaml_def)
|
|
with pytest.raises(ValueError) as exc_info:
|
|
builder.build()
|
|
|
|
assert "SendActivity" in str(exc_info.value)
|
|
assert "activity" in str(exc_info.value)
|
|
|
|
|
|
@_requires_powerfx
|
|
class TestExpressionEdgeCases:
|
|
"""Tests for expression evaluation edge cases."""
|
|
|
|
async def test_division_with_valid_values(self, mock_state):
|
|
"""Test normal division works correctly."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.x", 10)
|
|
state.set("Local.y", 4)
|
|
|
|
result = state.eval("=Local.x / Local.y")
|
|
assert result == 2.5
|
|
|
|
async def test_multiplication_normal(self, mock_state):
|
|
"""Test normal multiplication."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
state.set("Local.x", 6)
|
|
state.set("Local.y", 7)
|
|
|
|
result = state.eval("=Local.x * Local.y")
|
|
assert result == 42
|
|
|
|
|
|
@_requires_powerfx
|
|
class TestLongMessageTextHandling:
|
|
"""Tests for handling long MessageText results that exceed PowerFx limits."""
|
|
|
|
async def test_short_message_text_embedded_inline(self, mock_state):
|
|
"""Test that short MessageText results are embedded inline."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Store a short message
|
|
short_text = "Hello world"
|
|
state.set("Local.Messages", [{"text": short_text, "contents": [{"type": "text", "text": short_text}]}])
|
|
|
|
# Evaluate a formula with MessageText - should embed inline
|
|
result = state.eval("=Upper(MessageText(Local.Messages))")
|
|
assert result == "HELLO WORLD"
|
|
|
|
# No temp variable should be created for short strings
|
|
temp_var = state.get("Local._TempMessageText0")
|
|
assert temp_var is None
|
|
|
|
async def test_long_message_text_stored_in_temp_variable(self, mock_state):
|
|
"""Long MessageText results round-trip and the temp key is removed after eval."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Create a message longer than 500 characters
|
|
long_text = "A" * 600 # 600 characters exceeds the 500 char threshold
|
|
state.set("Local.Messages", [{"text": long_text, "contents": [{"type": "text", "text": long_text}]}])
|
|
|
|
# Evaluate a formula with MessageText
|
|
result = state.eval("=Upper(MessageText(Local.Messages))")
|
|
assert result == "A" * 600 # Upper on 'A' is still 'A'
|
|
|
|
local = state.get_state_data().get("Local", {})
|
|
remaining = sorted(k for k in local if k.startswith("_TempMessageText"))
|
|
assert not remaining, f"Temporary keys remain in Local: {remaining}"
|
|
|
|
async def test_find_with_long_message_text(self, mock_state):
|
|
"""Test Find function works with long MessageText stored in temp variable."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Create a long message with a keyword to find
|
|
long_text = "X" * 550 + "CONGRATULATIONS" + "Y" * 50
|
|
state.set("Local.Messages", [{"text": long_text, "contents": [{"type": "text", "text": long_text}]}])
|
|
|
|
# Test the pattern used in student_teacher workflow
|
|
result = state.eval('=!IsBlank(Find("CONGRATULATIONS", Upper(MessageText(Local.Messages))))')
|
|
assert result is True
|
|
|
|
async def test_find_without_keyword_in_long_text(self, mock_state):
|
|
"""Test Find returns blank when keyword not found in long text."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
# Long text without the keyword
|
|
long_text = "X" * 600
|
|
state.set("Local.Messages", [{"text": long_text, "contents": [{"type": "text", "text": long_text}]}])
|
|
|
|
result = state.eval('=!IsBlank(Find("CONGRATULATIONS", Upper(MessageText(Local.Messages))))')
|
|
assert result is False
|
|
|
|
|
|
class TestCreateConversationExecutor:
|
|
"""Tests for CreateConversationExecutor."""
|
|
|
|
async def test_basic_creation(self, mock_context, mock_state):
|
|
"""Test that a UUID is generated, stored at conversationId path, and conversation entry created."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
CreateConversationExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "CreateConversation",
|
|
"conversationId": "Local.myConvId",
|
|
}
|
|
executor = CreateConversationExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# A UUID should be stored at the requested path
|
|
conv_id = state.get("Local.myConvId")
|
|
assert conv_id is not None
|
|
assert isinstance(conv_id, str)
|
|
assert len(conv_id) == 36 # UUID format
|
|
|
|
# Conversation entry should exist in System.conversations
|
|
conversations = state.get("System.conversations")
|
|
assert conversations is not None
|
|
assert conv_id in conversations
|
|
assert conversations[conv_id]["id"] == conv_id
|
|
assert conversations[conv_id]["messages"] == []
|
|
|
|
async def test_no_conversation_id_param(self, mock_context, mock_state):
|
|
"""Test that conversation is still created even without a conversationId param."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
CreateConversationExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def = {
|
|
"kind": "CreateConversation",
|
|
}
|
|
executor = CreateConversationExecutor(action_def)
|
|
await executor.handle_action(ActionTrigger(), mock_context)
|
|
|
|
# Conversation entry should still exist in System.conversations
|
|
# (initialize() seeds one default conversation, plus the one just created)
|
|
conversations = state.get("System.conversations")
|
|
assert conversations is not None
|
|
assert len(conversations) == 2
|
|
|
|
async def test_multiple_conversations(self, mock_context, mock_state):
|
|
"""Test creating multiple conversations produces distinct IDs."""
|
|
from agent_framework_declarative._workflows._executors_basic import (
|
|
CreateConversationExecutor,
|
|
)
|
|
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
action_def1 = {
|
|
"kind": "CreateConversation",
|
|
"conversationId": "Local.conv1",
|
|
}
|
|
action_def2 = {
|
|
"kind": "CreateConversation",
|
|
"conversationId": "Local.conv2",
|
|
}
|
|
|
|
executor1 = CreateConversationExecutor(action_def1)
|
|
await executor1.handle_action(ActionTrigger(), mock_context)
|
|
|
|
executor2 = CreateConversationExecutor(action_def2)
|
|
await executor2.handle_action(ActionTrigger(), mock_context)
|
|
|
|
conv1 = state.get("Local.conv1")
|
|
conv2 = state.get("Local.conv2")
|
|
|
|
assert conv1 != conv2
|
|
|
|
# initialize() seeds one default conversation, plus the two just created
|
|
conversations = state.get("System.conversations")
|
|
assert len(conversations) == 3
|
|
assert conv1 in conversations
|
|
assert conv2 in conversations
|
|
|
|
|
|
class TestDeclarativeWorkflowStateConversationIdInit:
|
|
"""Tests that DeclarativeWorkflowState.initialize() generates a real UUID for ConversationId."""
|
|
|
|
async def test_conversation_id_is_not_default(self, mock_state):
|
|
"""System.ConversationId should be a UUID, not 'default'."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
conv_id = state.get("System.ConversationId")
|
|
assert conv_id is not None
|
|
assert conv_id != "default"
|
|
# Validate it looks like a UUID
|
|
import uuid
|
|
|
|
uuid.UUID(conv_id) # Raises ValueError if not a valid UUID
|
|
|
|
async def test_conversations_dict_initialized(self, mock_state):
|
|
"""System.conversations should contain an entry matching ConversationId."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
state.initialize()
|
|
|
|
conv_id = state.get("System.ConversationId")
|
|
conversations = state.get("System.conversations")
|
|
assert conversations is not None
|
|
assert conv_id in conversations
|
|
assert conversations[conv_id]["id"] == conv_id
|
|
assert conversations[conv_id]["messages"] == []
|
|
|
|
async def test_each_initialize_generates_unique_id(self, mock_state):
|
|
"""Each call to initialize() should produce a different ConversationId."""
|
|
state = DeclarativeWorkflowState(mock_state)
|
|
|
|
state.initialize()
|
|
id1 = state.get("System.ConversationId")
|
|
|
|
state.initialize()
|
|
id2 = state.get("System.ConversationId")
|
|
|
|
assert id1 != id2
|