Files
Peter Ibekwe e7937947d9 Python: Bug fix for declarative workflows (#6468)
* Fix declarative object parsing bug

* Remove unnecessary comment

* Address PR comments

* Address PR comments.

* Fix CI failures.
2026-06-11 22:34:15 +00:00

2938 lines
112 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
# pyright: reportUnknownParameterType=false, reportUnknownArgumentType=false
# pyright: reportMissingParameterType=false, reportUnknownMemberType=false
# pyright: reportPrivateUsage=false, reportUnknownVariableType=false
# pyright: reportGeneralTypeIssues=false
from dataclasses import dataclass
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from agent_framework_declarative._workflows import (
ActionComplete,
ActionTrigger,
DeclarativeWorkflowState,
)
from agent_framework_declarative._workflows._declarative_base import (
ConditionResult,
LoopControl,
LoopIterationResult,
)
try:
import powerfx # noqa: F401
_powerfx_available = True
except (ImportError, RuntimeError):
_powerfx_available = False
_requires_powerfx = pytest.mark.skipif(not _powerfx_available, reason="PowerFx engine not available")
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_state() -> MagicMock:
"""Create a mock state with sync get/set/delete methods."""
mock_state = MagicMock()
mock_state._data = {}
def mock_get(key: str, default: Any = None) -> Any:
return mock_state._data.get(key, default)
def mock_set(key: str, value: Any) -> None:
mock_state._data[key] = value
def mock_has(key: str) -> bool:
return key in mock_state._data
def mock_delete(key: str) -> None:
if key in mock_state._data:
del mock_state._data[key]
mock_state.get = MagicMock(side_effect=mock_get)
mock_state.set = MagicMock(side_effect=mock_set)
mock_state.has = MagicMock(side_effect=mock_has)
mock_state.delete = MagicMock(side_effect=mock_delete)
return mock_state
@pytest.fixture
def mock_context(mock_state: MagicMock) -> MagicMock:
"""Create a mock workflow context."""
ctx = MagicMock()
ctx.state = mock_state
ctx.send_message = AsyncMock()
ctx.yield_output = AsyncMock()
ctx.request_info = AsyncMock()
return ctx
# ---------------------------------------------------------------------------
# DeclarativeWorkflowState Tests - Covering _base.py gaps
# ---------------------------------------------------------------------------
class TestDeclarativeWorkflowStateExtended:
"""Extended tests for DeclarativeWorkflowState covering uncovered code paths."""
async def test_get_with_local_namespace(self, mock_state):
"""Test Local. namespace mapping."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.myVar", "value123")
# Access via Local. namespace
result = state.get("Local.myVar")
assert result == "value123"
async def test_get_with_system_namespace(self, mock_state):
"""Test System. namespace mapping."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("System.ConversationId", "conv-123")
result = state.get("System.ConversationId")
assert result == "conv-123"
async def test_get_with_workflow_namespace(self, mock_state):
"""Test Workflow. namespace mapping."""
state = DeclarativeWorkflowState(mock_state)
state.initialize({"query": "test"})
result = state.get("Workflow.Inputs.query")
assert result == "test"
async def test_get_with_inputs_shorthand(self, mock_state):
"""Test inputs. shorthand namespace mapping."""
state = DeclarativeWorkflowState(mock_state)
state.initialize({"query": "test"})
result = state.get("Workflow.Inputs.query")
assert result == "test"
async def test_get_agent_namespace(self, mock_state):
"""Test agent namespace access."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Agent.response", "Hello!")
result = state.get("Agent.response")
assert result == "Hello!"
async def test_get_conversation_namespace(self, mock_state):
"""Test conversation namespace access."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Conversation.messages", [{"role": "user", "text": "hi"}])
result = state.get("Conversation.messages")
assert result == [{"role": "user", "text": "hi"}]
async def test_get_custom_namespace(self, mock_state):
"""Test custom namespace access."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Set via direct state data manipulation to create custom namespace
state_data = state.get_state_data()
state_data["Custom"] = {"myns": {"value": 42}}
state.set_state_data(state_data)
result = state.get("myns.value")
assert result == 42
async def test_get_object_attribute_access(self, mock_state):
"""Test accessing object attributes via hasattr/getattr path."""
@dataclass
class MockObj:
name: str
value: int
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.obj", MockObj(name="test", value=99))
result = state.get("Local.obj.name")
assert result == "test"
async def test_set_with_local_namespace(self, mock_state):
"""Test Local. namespace mapping for set."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.myVar", "value123")
result = state.get("Local.myVar")
assert result == "value123"
async def test_set_with_system_namespace(self, mock_state):
"""Test System. namespace mapping for set."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("System.ConversationId", "conv-456")
result = state.get("System.ConversationId")
assert result == "conv-456"
async def test_set_workflow_outputs(self, mock_state):
"""Test setting workflow outputs."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Workflow.Outputs.result", "done")
outputs = state.get("Workflow.Outputs")
assert outputs.get("result") == "done"
async def test_set_workflow_inputs_raises_error(self, mock_state):
"""Test that setting Workflow.Inputs raises an error (read-only)."""
state = DeclarativeWorkflowState(mock_state)
state.initialize({"query": "test"})
with pytest.raises(ValueError, match="Cannot modify Workflow.Inputs"):
state.set("Workflow.Inputs.query", "modified")
async def test_set_workflow_directly_raises_error(self, mock_state):
"""Test that setting 'Workflow' directly raises an error."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
with pytest.raises(ValueError, match="Cannot set 'Workflow' directly"):
state.set("Workflow", {})
async def test_set_unknown_workflow_subnamespace_raises_error(self, mock_state):
"""Test unknown workflow sub-namespace raises error."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
with pytest.raises(ValueError, match="Unknown Workflow namespace"):
state.set("Workflow.unknown.field", "value")
async def test_set_creates_custom_namespace(self, mock_state):
"""Test setting value in custom namespace creates it."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("myns.field.nested", "value")
result = state.get("myns.field.nested")
assert result == "value"
async def test_set_cannot_replace_entire_namespace(self, mock_state):
"""Test that replacing entire namespace raises error."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
with pytest.raises(ValueError, match="Cannot replace entire namespace"):
state.set("turn", {})
async def test_append_to_nonlist_raises_error(self, mock_state):
"""Test appending to non-list raises error."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.scalar", "string value")
with pytest.raises(ValueError, match="Cannot append to non-list"):
state.append("Local.scalar", "new item")
async def test_eval_empty_string(self, mock_state):
"""Test evaluating empty string returns as-is."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
result = state.eval("")
assert result == ""
async def test_eval_non_string_returns_as_is(self, mock_state):
"""Test evaluating non-string returns as-is."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Cast to Any to test the runtime behavior with non-string inputs
result = state.eval(42) # type: ignore[arg-type]
assert result == 42
result = state.eval([1, 2, 3]) # type: ignore[arg-type]
assert result == [1, 2, 3]
@_requires_powerfx
async def test_eval_simple_and_operator(self, mock_state):
"""Test simple And operator evaluation."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.a", True)
state.set("Local.b", False)
result = state.eval("=Local.a And Local.b")
assert result is False
state.set("Local.b", True)
result = state.eval("=Local.a And Local.b")
assert result is True
@_requires_powerfx
async def test_eval_simple_or_operator(self, mock_state):
"""Test simple Or operator evaluation."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.a", True)
state.set("Local.b", False)
result = state.eval("=Local.a Or Local.b")
assert result is True
state.set("Local.a", False)
result = state.eval("=Local.a Or Local.b")
assert result is False
@_requires_powerfx
async def test_eval_negation(self, mock_state):
"""Test negation (!) evaluation."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.flag", True)
result = state.eval("=!Local.flag")
assert result is False
@_requires_powerfx
async def test_eval_not_function(self, mock_state):
"""Test Not() function evaluation."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.flag", True)
result = state.eval("=Not(Local.flag)")
assert result is False
@_requires_powerfx
async def test_eval_comparison_operators(self, mock_state):
"""Test comparison operators."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.x", 5)
state.set("Local.y", 10)
assert state.eval("=Local.x < Local.y") is True
assert state.eval("=Local.x > Local.y") is False
assert state.eval("=Local.x <= 5") is True
assert state.eval("=Local.x >= 5") is True
assert state.eval("=Local.x <> Local.y") is True
assert state.eval("=Local.x = 5") is True
@_requires_powerfx
async def test_eval_arithmetic_operators(self, mock_state):
"""Test arithmetic operators."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.x", 10)
state.set("Local.y", 3)
assert state.eval("=Local.x + Local.y") == 13
assert state.eval("=Local.x - Local.y") == 7
assert state.eval("=Local.x * Local.y") == 30
assert state.eval("=Local.x / Local.y") == pytest.approx(3.333, rel=0.01)
@_requires_powerfx
async def test_eval_string_literal(self, mock_state):
"""Test string literal evaluation."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
result = state.eval('="hello world"')
assert result == "hello world"
@_requires_powerfx
async def test_eval_float_literal(self, mock_state):
"""Test float literal evaluation."""
from decimal import Decimal
state = DeclarativeWorkflowState(mock_state)
state.initialize()
result = state.eval("=3.14")
# Accepts both float (Python fallback) and Decimal (pythonnet/PowerFx)
assert result == 3.14 or result == Decimal("3.14")
@_requires_powerfx
async def test_eval_variable_reference_with_namespace_mappings(self, mock_state):
"""Test variable reference with PowerFx symbols."""
state = DeclarativeWorkflowState(mock_state)
state.initialize({"query": "test"})
state.set("Local.myVar", "localValue")
# Test Local namespace (PowerFx symbol)
result = state.eval("=Local.myVar")
assert result == "localValue"
# Test Workflow.Inputs (PowerFx symbol)
result = state.eval("=Workflow.Inputs.query")
assert result == "test"
@_requires_powerfx
async def test_eval_if_expression_with_dict(self, mock_state):
"""Test eval_if_expression recursively evaluates dicts."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.name", "Alice")
result = state.eval_if_expression({"greeting": "=Local.name", "static": "hello"})
assert result == {"greeting": "Alice", "static": "hello"}
@_requires_powerfx
async def test_eval_if_expression_with_list(self, mock_state):
"""Test eval_if_expression recursively evaluates lists."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.x", 10)
result = state.eval_if_expression(["=Local.x", "static", "=5"])
assert result == [10, "static", 5]
async def test_interpolate_string_with_local_vars(self, mock_state):
"""Test string interpolation with Local. variables."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.TicketId", "TKT-001")
state.set("Local.TeamName", "Support")
result = state.interpolate_string("Created ticket #{Local.TicketId} for team {Local.TeamName}")
assert result == "Created ticket #TKT-001 for team Support"
async def test_interpolate_string_with_system_vars(self, mock_state):
"""Test string interpolation with System. variables."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("System.ConversationId", "conv-789")
result = state.interpolate_string("Conversation: {System.ConversationId}")
assert result == "Conversation: conv-789"
async def test_interpolate_string_with_none_value(self, mock_state):
"""Test string interpolation with None value returns empty string."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
result = state.interpolate_string("Value: {Local.Missing}")
assert result == "Value: "
# ---------------------------------------------------------------------------
# Basic Executors Tests - Covering _executors_basic.py gaps
# ---------------------------------------------------------------------------
class TestBasicExecutorsCoverage:
"""Tests for basic executors covering uncovered code paths."""
async def test_set_variable_executor(self, mock_context, mock_state):
"""Test SetVariableExecutor (distinct from SetValueExecutor)."""
from agent_framework_declarative._workflows._executors_basic import (
SetVariableExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "SetVariable",
"variable": "Local.result",
"value": "test value",
}
executor = SetVariableExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
result = state.get("Local.result")
assert result == "test value"
async def test_set_variable_executor_with_nested_variable(self, mock_context, mock_state):
"""Test SetVariableExecutor with nested variable object."""
from agent_framework_declarative._workflows._executors_basic import (
SetVariableExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "SetVariable",
"variable": {"path": "Local.nested"},
"value": 42,
}
executor = SetVariableExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
result = state.get("Local.nested")
assert result == 42
@_requires_powerfx
async def test_set_text_variable_executor(self, mock_context, mock_state):
"""Test SetTextVariableExecutor."""
from agent_framework_declarative._workflows._executors_basic import (
SetTextVariableExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.name", "World")
action_def = {
"kind": "SetTextVariable",
"variable": "Local.greeting",
"text": "=Local.name",
}
executor = SetTextVariableExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
result = state.get("Local.greeting")
assert result == "World"
async def test_set_multiple_variables_executor(self, mock_context, mock_state):
"""Test SetMultipleVariablesExecutor."""
from agent_framework_declarative._workflows._executors_basic import (
SetMultipleVariablesExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "SetMultipleVariables",
"assignments": [
{"variable": "Local.a", "value": 1},
{"variable": {"path": "Local.b"}, "value": 2},
{"path": "Local.c", "value": 3},
],
}
executor = SetMultipleVariablesExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
assert state.get("Local.a") == 1
assert state.get("Local.b") == 2
assert state.get("Local.c") == 3
async def test_reset_variable_executor(self, mock_context, mock_state):
"""Test ResetVariableExecutor."""
from agent_framework_declarative._workflows._executors_basic import (
ResetVariableExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.myVar", "some value")
action_def = {
"kind": "ResetVariable",
"variable": "Local.myVar",
}
executor = ResetVariableExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
result = state.get("Local.myVar")
assert result is None
async def test_clear_all_variables_executor(self, mock_context, mock_state):
"""Test ClearAllVariablesExecutor."""
from agent_framework_declarative._workflows._executors_basic import (
ClearAllVariablesExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.a", 1)
state.set("Local.b", 2)
action_def = {"kind": "ClearAllVariables"}
executor = ClearAllVariablesExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
# Turn namespace should be cleared
assert state.get("Local.a") is None
assert state.get("Local.b") is None
async def test_send_activity_with_dict_activity(self, mock_context, mock_state):
"""Test SendActivityExecutor with dict activity containing text field."""
from agent_framework_declarative._workflows._executors_basic import (
SendActivityExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.name", "Alice")
action_def = {
"kind": "SendActivity",
"activity": {"text": "Hello, {Local.name}!"},
}
executor = SendActivityExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
mock_context.yield_output.assert_called_once_with("Hello, Alice!")
async def test_send_activity_with_string_activity(self, mock_context, mock_state):
"""Test SendActivityExecutor with string activity."""
from agent_framework_declarative._workflows._executors_basic import (
SendActivityExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "SendActivity",
"activity": "Plain text message",
}
executor = SendActivityExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
mock_context.yield_output.assert_called_once_with("Plain text message")
@_requires_powerfx
async def test_send_activity_with_expression(self, mock_context, mock_state):
"""Test SendActivityExecutor evaluates expressions."""
from agent_framework_declarative._workflows._executors_basic import (
SendActivityExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.msg", "Dynamic message")
action_def = {
"kind": "SendActivity",
"activity": "=Local.msg",
}
executor = SendActivityExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
mock_context.yield_output.assert_called_once_with("Dynamic message")
# ---------------------------------------------------------------------------
# Agent Executors Tests - Covering _executors_agents.py gaps
# ---------------------------------------------------------------------------
class TestAgentExecutorsCoverage:
"""Tests for agent executors covering uncovered code paths."""
async def test_normalize_variable_path_all_cases(self):
"""Test _normalize_variable_path with all namespace prefixes."""
from agent_framework_declarative._workflows._executors_agents import (
_normalize_variable_path,
)
# Local. -> Local. (unchanged)
assert _normalize_variable_path("Local.MyVar") == "Local.MyVar"
# System. -> System. (unchanged)
assert _normalize_variable_path("System.ConvId") == "System.ConvId"
# Workflow. -> Workflow. (unchanged)
assert _normalize_variable_path("Workflow.Outputs.result") == "Workflow.Outputs.result"
# Already has a namespace with dots - pass through
assert _normalize_variable_path("custom.existing") == "custom.existing"
# No namespace - default to Local.
assert _normalize_variable_path("simpleVar") == "Local.simpleVar"
async def test_agent_executor_get_agent_name_string(self, mock_context, mock_state):
"""Test agent name extraction from simple string config."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": "MyAgent",
}
executor = InvokeAzureAgentExecutor(action_def)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
name = executor._get_agent_name(state)
assert name == "MyAgent"
async def test_agent_executor_get_agent_name_dict(self, mock_context, mock_state):
"""Test agent name extraction from nested dict config."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": {"name": "NestedAgent"},
}
executor = InvokeAzureAgentExecutor(action_def)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
name = executor._get_agent_name(state)
assert name == "NestedAgent"
async def test_agent_executor_get_agent_name_legacy(self, mock_context, mock_state):
"""Test agent name extraction from agentName (legacy)."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agentName": "LegacyAgent",
}
executor = InvokeAzureAgentExecutor(action_def)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
name = executor._get_agent_name(state)
assert name == "LegacyAgent"
async def test_agent_executor_get_agent_name_string_expression(self, mock_context, mock_state):
"""Test agent name extraction from simple string expression."""
from unittest.mock import patch
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": "=Local.SelectedAgent",
}
executor = InvokeAzureAgentExecutor(action_def)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
with patch.object(state, "eval_if_expression", return_value="DynamicAgent"):
name = executor._get_agent_name(state)
assert name == "DynamicAgent"
async def test_agent_executor_get_agent_name_dict_expression(self, mock_context, mock_state):
"""Test agent name extraction from nested dict with expression."""
from unittest.mock import patch
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": {"name": "=Local.ManagerResult.next_speaker.answer"},
}
executor = InvokeAzureAgentExecutor(action_def)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
with patch.object(state, "eval_if_expression", return_value="WeatherAgent"):
name = executor._get_agent_name(state)
assert name == "WeatherAgent"
async def test_agent_executor_get_agent_name_legacy_expression(self, mock_context, mock_state):
"""Test agent name extraction from legacy agentName with expression."""
from unittest.mock import patch
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agentName": "=Local.NextAgent",
}
executor = InvokeAzureAgentExecutor(action_def)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
with patch.object(state, "eval_if_expression", return_value="ResolvedAgent"):
name = executor._get_agent_name(state)
assert name == "ResolvedAgent"
async def test_agent_executor_get_agent_name_expression_returns_none(self, mock_context, mock_state):
"""Test agent name returns None when expression evaluates to None."""
from unittest.mock import patch
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": {"name": "=Local.UndefinedVar"},
}
executor = InvokeAzureAgentExecutor(action_def)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
with patch.object(state, "eval_if_expression", return_value=None):
name = executor._get_agent_name(state)
assert name is None
async def test_agent_executor_get_input_config_simple(self, mock_context, mock_state):
"""Test input config parsing with simple non-dict input."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"input": "simple string input",
}
executor = InvokeAzureAgentExecutor(action_def)
args, messages, external_loop, max_iterations = executor._get_input_config()
assert args == {}
assert messages == "simple string input"
assert external_loop is None
assert max_iterations == 100 # Default
async def test_agent_executor_get_input_config_full(self, mock_context, mock_state):
"""Test input config parsing with full structured input."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"input": {
"arguments": {"param1": "=Local.value"},
"messages": "=conversation.messages",
"externalLoop": {"when": "=Local.needsMore", "maxIterations": 50},
},
}
executor = InvokeAzureAgentExecutor(action_def)
args, messages, external_loop, max_iterations = executor._get_input_config()
assert args == {"param1": "=Local.value"}
assert messages == "=conversation.messages"
assert external_loop == "=Local.needsMore"
assert max_iterations == 50
async def test_agent_executor_get_output_config_simple(self, mock_context, mock_state):
"""Test output config parsing with simple resultProperty."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"resultProperty": "Local.result",
}
executor = InvokeAzureAgentExecutor(action_def)
messages_var, response_obj, result_prop, auto_send = executor._get_output_config()
assert messages_var is None
assert response_obj is None
assert result_prop == "Local.result"
assert auto_send is True
async def test_agent_executor_get_output_config_full(self, mock_context, mock_state):
"""Test output config parsing with full structured output."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"output": {
"messages": "Local.ResponseMessages",
"responseObject": "Local.ParsedResponse",
"property": "Local.result",
"autoSend": False,
},
}
executor = InvokeAzureAgentExecutor(action_def)
messages_var, response_obj, result_prop, auto_send = executor._get_output_config()
assert messages_var == "Local.ResponseMessages"
assert response_obj == "Local.ParsedResponse"
assert result_prop == "Local.result"
assert auto_send is False
@_requires_powerfx
async def test_agent_executor_build_input_text_from_string_messages(self, mock_context, mock_state):
"""Test _build_input_text with string messages expression."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.userInput", "Hello agent!")
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
executor = InvokeAzureAgentExecutor(action_def)
input_text = await executor._build_input_text(state, {}, "=Local.userInput")
assert input_text == "Hello agent!"
@_requires_powerfx
async def test_agent_executor_build_input_text_from_message_list(self, mock_context, mock_state):
"""Test _build_input_text extracts text from message list."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set(
"Conversation.messages",
[
{"role": "user", "content": "First"},
{"role": "assistant", "content": "Response"},
{"role": "user", "content": "Last message"},
],
)
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
executor = InvokeAzureAgentExecutor(action_def)
input_text = await executor._build_input_text(state, {}, "=Conversation.messages")
assert input_text == "Last message"
@_requires_powerfx
async def test_agent_executor_build_input_text_from_message_with_text_attr(self, mock_context, mock_state):
"""Test _build_input_text extracts text from message with text attribute."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.messages", [{"text": "From attribute"}])
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
executor = InvokeAzureAgentExecutor(action_def)
input_text = await executor._build_input_text(state, {}, "=Local.messages")
assert input_text == "From attribute"
async def test_agent_executor_build_input_text_fallback_chain(self, mock_context, mock_state):
"""Test _build_input_text fallback chain when no messages expression."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize({"query": "workflow input"})
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
executor = InvokeAzureAgentExecutor(action_def)
# No messages_expr, so falls back to workflow.inputs
input_text = await executor._build_input_text(state, {}, None)
assert input_text == "workflow input"
async def test_agent_executor_build_input_text_from_system_last_message(self, mock_context, mock_state):
"""Test _build_input_text falls back to system.LastMessage.Text."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("System.LastMessage", {"Text": "From last message"})
action_def = {"kind": "InvokeAzureAgent", "agent": "Test"}
executor = InvokeAzureAgentExecutor(action_def)
input_text = await executor._build_input_text(state, {}, None)
assert input_text == "From last message"
async def test_agent_executor_missing_agent_name(self, mock_context, mock_state):
"""Test agent executor with missing agent name logs warning."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {"kind": "InvokeAzureAgent"} # No agent specified
executor = InvokeAzureAgentExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
# Should complete without error
mock_context.send_message.assert_called_once()
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, ActionComplete)
async def test_agent_executor_with_working_agent(self, mock_context, mock_state):
"""Test agent executor with a working mock agent."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
# Create mock agent
@dataclass
class MockResult:
text: str
messages: list[Any]
mock_agent = MagicMock()
mock_agent.run = AsyncMock(return_value=MockResult(text="Agent response", messages=[]))
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.input", "User query")
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"resultProperty": "Local.result",
}
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
await executor.handle_action(ActionTrigger(), mock_context)
# Verify agent was called
mock_agent.run.assert_called_once()
# Verify result was stored
result = state.get("Local.result")
assert result == "Agent response"
# Verify agent state was set
assert state.get("Agent.response") == "Agent response"
assert state.get("Agent.name") == "TestAgent"
assert state.get("Agent.text") == "Agent response"
async def test_agent_executor_with_agent_from_registry(self, mock_context, mock_state):
"""Test agent executor retrieves agent from shared state registry."""
from agent_framework_declarative._workflows._executors_agents import (
AGENT_REGISTRY_KEY,
InvokeAzureAgentExecutor,
)
# Create mock agent
@dataclass
class MockResult:
text: str
messages: list[Any]
mock_agent = MagicMock()
mock_agent.run = AsyncMock(return_value=MockResult(text="Registry agent", messages=[]))
# Store in registry
mock_state._data[AGENT_REGISTRY_KEY] = {"RegistryAgent": mock_agent}
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.input", "Query")
action_def = {
"kind": "InvokeAzureAgent",
"agent": "RegistryAgent",
}
executor = InvokeAzureAgentExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
mock_agent.run.assert_called_once()
async def test_agent_executor_parses_json_response(self, mock_context, mock_state):
"""Test agent executor parses JSON response into responseObject."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
@dataclass
class MockResult:
text: str
messages: list[Any]
mock_agent = MagicMock()
mock_agent.run = AsyncMock(return_value=MockResult(text='{"status": "ok", "count": 42}', messages=[]))
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.input", "Query")
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"output": {
"responseObject": "Local.Parsed",
},
}
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
await executor.handle_action(ActionTrigger(), mock_context)
parsed = state.get("Local.Parsed")
assert parsed == {"status": "ok", "count": 42}
# ---------------------------------------------------------------------------
# Control Flow Executors Tests - Additional coverage
# ---------------------------------------------------------------------------
class TestControlFlowCoverage:
"""Tests for control flow executors covering uncovered code paths."""
@_requires_powerfx
async def test_foreach_with_source(self, mock_context, mock_state):
"""Test ForeachInitExecutor with the 'source' field."""
from agent_framework_declarative._workflows._executors_control_flow import (
ForeachInitExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.data", [10, 20, 30])
action_def = {
"kind": "Foreach",
"source": "=Local.data",
"itemName": "item",
"indexName": "idx",
}
executor = ForeachInitExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, LoopIterationResult)
assert msg.has_next is True
assert msg.current_item == 10
assert msg.current_index == 0
async def test_foreach_next_continues_iteration(self, mock_context, mock_state):
"""Test ForeachNextExecutor continues to next item."""
from agent_framework_declarative._workflows._executors_control_flow import (
LOOP_STATE_KEY,
ForeachNextExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.data", ["a", "b", "c"])
# Set up loop state as ForeachInitExecutor would
state_data = state.get_state_data()
state_data[LOOP_STATE_KEY] = {
"foreach_init": {
"items": ["a", "b", "c"],
"index": 0,
"length": 3,
}
}
state.set_state_data(state_data)
action_def = {
"kind": "Foreach",
"source": "=Local.data",
"itemName": "item",
}
executor = ForeachNextExecutor(action_def, init_executor_id="foreach_init")
await executor.handle_action(LoopIterationResult(has_next=True), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, LoopIterationResult)
assert msg.current_index == 1
assert msg.current_item == "b"
async def test_join_executor_accepts_condition_result(self, mock_context, mock_state):
"""Test JoinExecutor accepts ConditionResult as trigger."""
from agent_framework_declarative._workflows._executors_control_flow import (
JoinExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {"kind": "_Join"}
executor = JoinExecutor(action_def)
# Trigger with ConditionResult
await executor.handle_action(ConditionResult(matched=True, branch_index=0), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, ActionComplete)
async def test_break_loop_executor(self, mock_context, mock_state):
"""Test BreakLoopExecutor emits LoopControl."""
from agent_framework_declarative._workflows._executors_control_flow import (
BreakLoopExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {"kind": "BreakLoop"}
executor = BreakLoopExecutor(action_def, loop_next_executor_id="loop_next")
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, LoopControl)
assert msg.action == "break"
async def test_continue_loop_executor(self, mock_context, mock_state):
"""Test ContinueLoopExecutor emits LoopControl."""
from agent_framework_declarative._workflows._executors_control_flow import (
ContinueLoopExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {"kind": "ContinueLoop"}
executor = ContinueLoopExecutor(action_def, loop_next_executor_id="loop_next")
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, LoopControl)
assert msg.action == "continue"
async def test_foreach_next_no_loop_state(self, mock_context, mock_state):
"""Test ForeachNextExecutor with missing loop state."""
from agent_framework_declarative._workflows._executors_control_flow import (
ForeachNextExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "Foreach",
"source": "=Local.data",
"itemName": "item",
}
executor = ForeachNextExecutor(action_def, init_executor_id="missing_loop")
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, LoopIterationResult)
assert msg.has_next is False
async def test_foreach_next_loop_complete(self, mock_context, mock_state):
"""Test ForeachNextExecutor when loop is complete."""
from agent_framework_declarative._workflows._executors_control_flow import (
LOOP_STATE_KEY,
ForeachNextExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Set up loop state at last item
state_data = state.get_state_data()
state_data[LOOP_STATE_KEY] = {
"loop_id": {
"items": ["a", "b"],
"index": 1, # Already at last item
"length": 2,
}
}
state.set_state_data(state_data)
action_def = {
"kind": "Foreach",
"source": "=Local.data",
"itemName": "item",
}
executor = ForeachNextExecutor(action_def, init_executor_id="loop_id")
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, LoopIterationResult)
assert msg.has_next is False
async def test_foreach_next_handle_break_control(self, mock_context, mock_state):
"""Test ForeachNextExecutor handles break LoopControl."""
from agent_framework_declarative._workflows._executors_control_flow import (
LOOP_STATE_KEY,
ForeachNextExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Set up loop state
state_data = state.get_state_data()
state_data[LOOP_STATE_KEY] = {
"loop_id": {
"items": ["a", "b", "c"],
"index": 0,
"length": 3,
}
}
state.set_state_data(state_data)
action_def = {
"kind": "Foreach",
"source": "=Local.data",
"itemName": "item",
}
executor = ForeachNextExecutor(action_def, init_executor_id="loop_id")
await executor.handle_loop_control(LoopControl(action="break"), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, LoopIterationResult)
assert msg.has_next is False
async def test_foreach_next_handle_continue_control(self, mock_context, mock_state):
"""Test ForeachNextExecutor handles continue LoopControl."""
from agent_framework_declarative._workflows._executors_control_flow import (
LOOP_STATE_KEY,
ForeachNextExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Set up loop state
state_data = state.get_state_data()
state_data[LOOP_STATE_KEY] = {
"loop_id": {
"items": ["a", "b", "c"],
"index": 0,
"length": 3,
}
}
state.set_state_data(state_data)
action_def = {
"kind": "Foreach",
"source": "=Local.data",
"itemName": "item",
}
executor = ForeachNextExecutor(action_def, init_executor_id="loop_id")
await executor.handle_loop_control(LoopControl(action="continue"), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, LoopIterationResult)
assert msg.has_next is True
assert msg.current_index == 1
async def test_end_workflow_executor(self, mock_context, mock_state):
"""Test EndWorkflowExecutor does not send continuation."""
from agent_framework_declarative._workflows._executors_control_flow import (
EndWorkflowExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {"kind": "EndWorkflow"}
executor = EndWorkflowExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
# Should NOT send any message
mock_context.send_message.assert_not_called()
async def test_end_conversation_executor(self, mock_context, mock_state):
"""Test EndConversationExecutor does not send continuation."""
from agent_framework_declarative._workflows._executors_control_flow import (
EndConversationExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {"kind": "EndConversation"}
executor = EndConversationExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
# Should NOT send any message
mock_context.send_message.assert_not_called()
@_requires_powerfx
async def test_condition_group_evaluator_first_match(self, mock_context, mock_state):
"""Test ConditionGroupEvaluatorExecutor returns first match."""
from agent_framework_declarative._workflows._executors_control_flow import (
ConditionGroupEvaluatorExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.x", 10)
action_def = {"kind": "ConditionGroup"}
conditions = [
{"condition": "=Local.x > 20"},
{"condition": "=Local.x > 5"},
{"condition": "=Local.x > 0"},
]
executor = ConditionGroupEvaluatorExecutor(action_def, conditions=conditions)
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, ConditionResult)
assert msg.matched is True
assert msg.branch_index == 1 # Second condition (x > 5) is first match
@_requires_powerfx
async def test_condition_group_evaluator_no_match(self, mock_context, mock_state):
"""Test ConditionGroupEvaluatorExecutor with no matches."""
from agent_framework_declarative._workflows._executors_control_flow import (
ConditionGroupEvaluatorExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.x", 0)
action_def = {"kind": "ConditionGroup"}
conditions = [
{"condition": "=Local.x > 10"},
{"condition": "=Local.x > 5"},
]
executor = ConditionGroupEvaluatorExecutor(action_def, conditions=conditions)
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, ConditionResult)
assert msg.matched is False
assert msg.branch_index == -1
@_requires_powerfx
async def test_condition_group_evaluator_boolean_true_condition(self, mock_context, mock_state):
"""Test ConditionGroupEvaluatorExecutor with boolean True condition."""
from agent_framework_declarative._workflows._executors_control_flow import (
ConditionGroupEvaluatorExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {"kind": "ConditionGroup"}
conditions = [
{"condition": False}, # Should skip
{"condition": True}, # Should match
]
executor = ConditionGroupEvaluatorExecutor(action_def, conditions=conditions)
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, ConditionResult)
assert msg.matched is True
assert msg.branch_index == 1
@_requires_powerfx
async def test_if_condition_evaluator_true(self, mock_context, mock_state):
"""Test IfConditionEvaluatorExecutor with true condition."""
from agent_framework_declarative._workflows._executors_control_flow import (
IfConditionEvaluatorExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.flag", True)
action_def = {"kind": "If"}
executor = IfConditionEvaluatorExecutor(action_def, condition_expr="=Local.flag")
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, ConditionResult)
assert msg.matched is True
assert msg.branch_index == 0 # Then branch
@_requires_powerfx
async def test_if_condition_evaluator_false(self, mock_context, mock_state):
"""Test IfConditionEvaluatorExecutor with false condition."""
from agent_framework_declarative._workflows._executors_control_flow import (
IfConditionEvaluatorExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.flag", False)
action_def = {"kind": "If"}
executor = IfConditionEvaluatorExecutor(action_def, condition_expr="=Local.flag")
await executor.handle_action(ActionTrigger(), mock_context)
msg = mock_context.send_message.call_args[0][0]
assert isinstance(msg, ConditionResult)
assert msg.matched is False
assert msg.branch_index == -1 # Else branch
# ---------------------------------------------------------------------------
# Declarative Action Executor Base Tests
# ---------------------------------------------------------------------------
class TestDeclarativeActionExecutorBase:
"""Tests for DeclarativeActionExecutor base class."""
async def test_ensure_state_initialized_with_dict_input(self, mock_context, mock_state):
"""Test _ensure_state_initialized with dict input."""
from agent_framework_declarative._workflows._executors_basic import (
SetValueExecutor,
)
action_def = {"kind": "SetValue", "path": "Local.x", "value": 1}
executor = SetValueExecutor(action_def)
# Trigger with dict - should initialize state with it
await executor.handle_action({"custom": "input"}, mock_context)
# State should have been initialized with the dict
state = DeclarativeWorkflowState(mock_state)
inputs = state.get("Workflow.Inputs")
assert inputs == {"custom": "input"}
async def test_ensure_state_initialized_with_string_input(self, mock_context, mock_state):
"""Test _ensure_state_initialized with string input."""
from agent_framework_declarative._workflows._executors_basic import (
SetValueExecutor,
)
action_def = {"kind": "SetValue", "path": "Local.x", "value": 1}
executor = SetValueExecutor(action_def)
# Trigger with string - should wrap in {"input": ...}
await executor.handle_action("string trigger", mock_context)
state = DeclarativeWorkflowState(mock_state)
inputs = state.get("Workflow.Inputs")
assert inputs == {"input": "string trigger"}
async def test_ensure_state_initialized_with_custom_object(self, mock_context, mock_state):
"""Test _ensure_state_initialized with custom object converts to string."""
from agent_framework_declarative._workflows._executors_basic import (
SetValueExecutor,
)
class CustomObj:
def __str__(self):
return "custom string"
action_def = {"kind": "SetValue", "path": "Local.x", "value": 1}
executor = SetValueExecutor(action_def)
await executor.handle_action(CustomObj(), mock_context)
state = DeclarativeWorkflowState(mock_state)
inputs = state.get("Workflow.Inputs")
assert inputs == {"input": "custom string"}
async def test_executor_display_name_property(self, mock_context, mock_state):
"""Test executor display_name property."""
from agent_framework_declarative._workflows._executors_basic import (
SetValueExecutor,
)
action_def = {
"kind": "SetValue",
"displayName": "My Custom Action",
"path": "Local.x",
"value": 1,
}
executor = SetValueExecutor(action_def)
assert executor.display_name == "My Custom Action"
async def test_executor_action_def_property(self, mock_context, mock_state):
"""Test executor action_def property."""
from agent_framework_declarative._workflows._executors_basic import (
SetValueExecutor,
)
action_def = {"kind": "SetValue", "path": "Local.x", "value": 1}
executor = SetValueExecutor(action_def)
assert executor.action_def == action_def
# ---------------------------------------------------------------------------
# Human Input Executors Tests - Covering _executors_external_input.py gaps
# ---------------------------------------------------------------------------
class TestHumanInputExecutorsCoverage:
"""Tests for human input executors covering uncovered code paths."""
async def test_request_external_input_executor(self, mock_context, mock_state):
"""Test RequestExternalInputExecutor."""
from agent_framework_declarative._workflows._executors_external_input import (
ExternalInputRequest,
RequestExternalInputExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "RequestExternalInput",
"requestType": "approval",
"prompt": {"text": "Please approve this request"},
"variable": "Local.approvalResult",
"timeout": 3600,
"requiredFields": ["approver", "notes"],
"metadata": {"priority": "high"},
}
executor = RequestExternalInputExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
mock_context.request_info.assert_called_once()
request = mock_context.request_info.call_args[0][0]
assert isinstance(request, ExternalInputRequest)
assert request.request_type == "approval"
assert request.message == "Please approve this request"
assert request.metadata["priority"] == "high"
assert request.metadata["required_fields"] == ["approver", "notes"]
assert request.metadata["timeout_seconds"] == 3600
async def test_question_executor_with_choices(self, mock_context, mock_state):
"""Test QuestionExecutor with choices as dicts and strings."""
from agent_framework_declarative._workflows._executors_external_input import (
ExternalInputRequest,
QuestionExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "Question",
"question": {"text": "Select an option:"},
"variable": "Local.selection",
"choices": [
{"value": "a", "label": "Option A"},
{"value": "b"}, # No label, should use value
"c", # String choice
],
"allowFreeText": False,
}
executor = QuestionExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
mock_context.request_info.assert_called_once()
request = mock_context.request_info.call_args[0][0]
assert isinstance(request, ExternalInputRequest)
assert request.request_type == "question"
choices = request.metadata["choices"]
assert len(choices) == 3
assert choices[0] == {"value": "a", "label": "Option A"}
assert choices[1] == {"value": "b", "label": "b"}
assert choices[2] == {"value": "c", "label": "c"}
assert request.metadata["allow_free_text"] is False
async def test_question_executor_reads_nested_question_text(self, mock_context, mock_state):
"""QuestionExecutor reads ``question.text``/``variable``/``default`` into the request."""
from agent_framework_declarative._workflows._executors_external_input import (
ExternalInputRequest,
QuestionExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "Question",
"question": {"text": "What is your name?"},
"variable": "Local.userName",
"default": "Guest",
}
executor = QuestionExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
mock_context.request_info.assert_called_once()
request = mock_context.request_info.call_args[0][0]
assert isinstance(request, ExternalInputRequest)
# Canonical text comes through as a plain string, not the stringified dict.
assert request.message == "What is your name?"
# Canonical `variable` overrides the legacy default of Local.answer.
assert request.metadata["output_property"] == "Local.userName"
assert request.metadata["default_value"] == "Guest"
async def test_question_executor_reads_top_level_alternates(self, mock_context, mock_state):
"""Top-level ``text``/``property``/``defaultValue`` are accepted as alternates."""
from agent_framework_declarative._workflows._executors_external_input import (
ExternalInputRequest,
QuestionExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "Question",
"text": "Legacy question",
"property": "Local.legacyAnswer",
"defaultValue": "legacy-default",
}
executor = QuestionExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
request = mock_context.request_info.call_args[0][0]
assert isinstance(request, ExternalInputRequest)
assert request.message == "Legacy question"
assert request.metadata["output_property"] == "Local.legacyAnswer"
assert request.metadata["default_value"] == "legacy-default"
async def test_request_external_input_reads_nested_prompt_text(self, mock_context, mock_state):
"""RequestExternalInputExecutor reads ``prompt.text``/``variable``/``default``."""
from agent_framework_declarative._workflows._executors_external_input import (
ExternalInputRequest,
RequestExternalInputExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "RequestExternalInput",
"prompt": {"text": "Please approve"},
"variable": "Local.approved",
"default": "pending",
}
executor = RequestExternalInputExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
request = mock_context.request_info.call_args[0][0]
assert isinstance(request, ExternalInputRequest)
assert request.message == "Please approve"
assert request.metadata["output_property"] == "Local.approved"
assert request.metadata["default_value"] == "pending"
async def test_request_external_input_reads_top_level_alternates(self, mock_context, mock_state):
"""Top-level ``message``/``property`` are accepted as alternates."""
from agent_framework_declarative._workflows._executors_external_input import (
ExternalInputRequest,
RequestExternalInputExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "RequestExternalInput",
"message": "Legacy message",
"property": "Local.legacyApproval",
}
executor = RequestExternalInputExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
request = mock_context.request_info.call_args[0][0]
assert isinstance(request, ExternalInputRequest)
assert request.message == "Legacy message"
assert request.metadata["output_property"] == "Local.legacyApproval"
# ---------------------------------------------------------------------------
# Additional Agent Executor Tests - External Loop Coverage
# ---------------------------------------------------------------------------
class TestAgentExternalLoopCoverage:
"""Tests for agent executor external loop handling."""
@_requires_powerfx
async def test_agent_executor_with_external_loop(self, mock_context, mock_state):
"""Test agent executor with external loop that triggers."""
from unittest.mock import patch
from agent_framework_declarative._workflows._executors_agents import (
AgentExternalInputRequest,
InvokeAzureAgentExecutor,
)
mock_agent = MagicMock()
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.input", "User query")
state.set("Local.needsMore", True) # Loop condition will be true
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"input": {
"externalLoop": {"when": "=Local.needsMore"},
},
}
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
# Mock the internal method to avoid storing Message objects in state
# (PowerFx cannot serialize Message)
with patch.object(
executor,
"_invoke_agent_and_store_results",
new=AsyncMock(return_value=("Need more info", [], [])),
):
await executor.handle_action(ActionTrigger(), mock_context)
# Should request external input via request_info
mock_context.request_info.assert_called_once()
request = mock_context.request_info.call_args[0][0]
assert isinstance(request, AgentExternalInputRequest)
assert request.agent_name == "TestAgent"
async def test_agent_executor_agent_error_handling(self, mock_context, mock_state):
"""Test agent executor raises AgentInvalidResponseException on failure."""
from agent_framework.exceptions import AgentInvalidResponseException
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
mock_agent = MagicMock()
mock_agent.run = AsyncMock(side_effect=RuntimeError("Agent failed"))
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.input", "Query")
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"resultProperty": "Local.result",
}
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
with pytest.raises(AgentInvalidResponseException) as exc_info:
await executor.handle_action(ActionTrigger(), mock_context)
assert "TestAgent" in str(exc_info.value)
assert "Agent failed" in str(exc_info.value)
# Should still store error in state before raising
error = state.get("Agent.error")
assert "Agent failed" in error
result = state.get("Local.result")
assert result == {"error": "Agent failed"}
async def test_agent_executor_string_result(self, mock_context, mock_state):
"""Test agent executor with agent that returns string directly."""
from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)
mock_agent = MagicMock()
mock_agent.run = AsyncMock(return_value="Direct string response")
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.input", "Query")
action_def = {
"kind": "InvokeAzureAgent",
"agent": "TestAgent",
"resultProperty": "Local.result",
"output": {"autoSend": True},
}
executor = InvokeAzureAgentExecutor(action_def, agents={"TestAgent": mock_agent})
await executor.handle_action(ActionTrigger(), mock_context)
# Should auto-send output
mock_context.yield_output.assert_called_with("Direct string response")
result = state.get("Local.result")
assert result == "Direct string response"
# ---------------------------------------------------------------------------
# PowerFx Functions Coverage
# ---------------------------------------------------------------------------
@_requires_powerfx
class TestPowerFxFunctionsCoverage:
"""Tests for PowerFx function evaluation coverage."""
async def test_eval_lower_upper_functions(self, mock_state):
"""Test Lower and Upper functions."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.text", "Hello World")
result = state.eval("=Lower(Local.text)")
assert result == "hello world"
result = state.eval("=Upper(Local.text)")
assert result == "HELLO WORLD"
async def test_eval_if_function(self, mock_state):
"""Test If function."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.flag", True)
result = state.eval('=If(Local.flag, "yes", "no")')
assert result == "yes"
state.set("Local.flag", False)
result = state.eval('=If(Local.flag, "yes", "no")')
assert result == "no"
async def test_eval_not_function(self, mock_state):
"""Test Not function."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.flag", True)
result = state.eval("=Not(Local.flag)")
assert result is False
async def test_eval_and_or_functions(self, mock_state):
"""Test And and Or functions."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.a", True)
state.set("Local.b", False)
result = state.eval("=And(Local.a, Local.b)")
assert result is False
result = state.eval("=Or(Local.a, Local.b)")
assert result is True
# ---------------------------------------------------------------------------
# Builder control flow tests - Covering Goto/Break/Continue creation
# ---------------------------------------------------------------------------
class TestBuilderControlFlowCreation:
"""Tests for Goto, Break, Continue executor creation in builder."""
def test_create_goto_reference(self):
"""Test creating a goto reference executor."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
# Create builder with minimal yaml definition
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
action_def = {
"kind": "GotoAction",
"target": "some_target_action",
"id": "goto_test",
}
executor = graph_builder._create_goto_reference(action_def, wb, None)
assert executor is not None
assert executor.id == "goto_test"
# Verify pending goto was recorded
assert len(graph_builder._pending_gotos) == 1
assert graph_builder._pending_gotos[0][1] == "some_target_action"
def test_create_goto_reference_auto_id(self):
"""Test creating a goto with auto-generated ID."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
action_def = {
"kind": "GotoAction",
"target": "target_action",
}
executor = graph_builder._create_goto_reference(action_def, wb, None)
assert executor is not None
assert "goto_target_action" in executor.id
def test_create_goto_reference_no_target(self):
"""Test creating a goto with no target returns None."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
action_def = {
"kind": "GotoAction",
# No target specified
}
executor = graph_builder._create_goto_reference(action_def, wb, None)
assert executor is None
def test_goto_invalid_target_raises_error(self):
"""Test that goto to non-existent target raises ValueError."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [
{"kind": "SendActivity", "id": "action1", "activity": {"text": "Hello"}},
{"kind": "GotoAction", "target": "non_existent_action"},
],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "non_existent_action" in str(exc_info.value)
assert "not found" in str(exc_info.value)
def test_create_break_executor(self):
"""Test creating a break executor within a loop context."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_control_flow import ForeachNextExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
# Create a mock loop_next executor
loop_next = ForeachNextExecutor(
{"kind": "Foreach", "source": "=Local.items"},
init_executor_id="foreach_init",
id="foreach_next",
)
wb._add_executor(loop_next)
parent_context = {"loop_next_executor": loop_next}
action_def = {
"kind": "BreakLoop",
"id": "break_test",
}
executor = graph_builder._create_break_executor(action_def, wb, parent_context)
assert executor is not None
assert executor.id == "break_test"
def test_create_break_executor_no_loop_context(self):
"""Test creating a break executor without loop context raises ValueError."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
action_def = {
"kind": "BreakLoop",
}
# No parent_context should raise ValueError
with pytest.raises(ValueError) as exc_info:
graph_builder._create_break_executor(action_def, wb, None)
assert "BreakLoop action can only be used inside a Foreach loop" in str(exc_info.value)
# Empty context should also raise ValueError
with pytest.raises(ValueError) as exc_info:
graph_builder._create_break_executor(action_def, wb, {})
assert "BreakLoop action can only be used inside a Foreach loop" in str(exc_info.value)
def test_create_continue_executor(self):
"""Test creating a continue executor within a loop context."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_control_flow import ForeachNextExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
# Create a mock loop_next executor
loop_next = ForeachNextExecutor(
{"kind": "Foreach", "source": "=Local.items"},
init_executor_id="foreach_init",
id="foreach_next",
)
wb._add_executor(loop_next)
parent_context = {"loop_next_executor": loop_next}
action_def = {
"kind": "ContinueLoop",
"id": "continue_test",
}
executor = graph_builder._create_continue_executor(action_def, wb, parent_context)
assert executor is not None
assert executor.id == "continue_test"
def test_create_continue_executor_no_loop_context(self):
"""Test creating a continue executor without loop context raises ValueError."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
action_def = {
"kind": "ContinueLoop",
}
# No parent_context should raise ValueError
with pytest.raises(ValueError) as exc_info:
graph_builder._create_continue_executor(action_def, wb, None)
assert "ContinueLoop action can only be used inside a Foreach loop" in str(exc_info.value)
class TestBuilderEdgeWiring:
"""Tests for builder edge wiring methods."""
def test_foreach_advance_edge_wired_from_last_body_action(self):
"""Advance edge must come from the last body action."""
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
yaml_def = {
"name": "foreach_seq",
"actions": [
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A", "B"]},
{
"kind": "Foreach",
"id": "loop",
"source": "=Local.items",
"itemName": "item",
"actions": [
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
{"kind": "SendActivity", "id": "step_2", "activity": {"text": "two"}},
{"kind": "SendActivity", "id": "step_3", "activity": {"text": "three"}},
],
},
],
}
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
edges = {(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges}
assert ("step_3", "loop_next") in edges
assert ("step_1", "loop_next") not in edges
assert ("step_2", "loop_next") not in edges
assert ("step_1", "step_2") in edges
assert ("step_2", "step_3") in edges
def test_foreach_advance_edge_skipped_for_terminator_body(self):
"""BreakLoop at end of body wires itself to loop_next; no duplicate edge."""
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
yaml_def = {
"name": "foreach_terminator",
"actions": [
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A"]},
{
"kind": "Foreach",
"id": "loop",
"source": "=Local.items",
"itemName": "item",
"actions": [
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
{"kind": "BreakLoop", "id": "stop"},
],
},
],
}
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
all_edges = [(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges]
assert all_edges.count(("stop", "loop_next")) == 1
assert ("step_1", "loop_next") not in all_edges
def test_foreach_advance_edge_with_if_as_last_body_action(self):
"""Trailing If in a Foreach body wires every branch exit to loop_next."""
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
yaml_def = {
"name": "foreach_if_last",
"actions": [
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A", "B"]},
{
"kind": "Foreach",
"id": "loop",
"source": "=Local.items",
"itemName": "item",
"actions": [
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
{
"kind": "If",
"id": "check",
"condition": '=Local.item = "A"',
"then": [
{"kind": "SendActivity", "id": "then_action", "activity": {"text": "then"}},
],
"else": [
{"kind": "SendActivity", "id": "else_action", "activity": {"text": "else"}},
],
},
],
},
],
}
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
edges = {(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges}
assert ("then_action", "loop_next") in edges
assert ("else_action", "loop_next") in edges
assert ("step_1", "loop_next") not in edges
def test_wire_to_target_with_if_structure(self):
"""Test wiring to an If structure routes to evaluator."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
# Create a mock source executor
source = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "test"}}, id="source")
wb._add_executor(source)
# Create a mock If structure with evaluator
class MockIfStructure:
_is_if_structure = True
def __init__(self):
self.evaluator = SendActivityExecutor(
{"kind": "SendActivity", "activity": {"text": "evaluator"}}, id="evaluator"
)
target = MockIfStructure()
wb._add_executor(target.evaluator)
# Wire should add edge to evaluator
graph_builder._wire_to_target(wb, source, target)
# Verify edge was added (would need to inspect workflow internals)
# For now, just verify no exception was raised
def test_wire_to_target_normal_executor(self):
"""Test wiring to a normal executor adds direct edge."""
from agent_framework import WorkflowBuilder
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
wb = WorkflowBuilder(start_executor=JoinExecutor({"kind": "Dummy"}, id="dummy"))
source = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "source"}}, id="source")
target = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "target"}}, id="target")
wb._add_executor(source)
wb._add_executor(target)
graph_builder._wire_to_target(wb, source, target)
# Verify edge creation (no exception = success)
def test_collect_all_exits_for_nested_structure(self):
"""Test collecting all exits from nested structures."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
# Create mock nested structure
exit1 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "exit1"}}, id="exit1")
exit2 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "exit2"}}, id="exit2")
class InnerStructure:
def __init__(self):
self.branch_exits = [exit1, exit2]
class OuterStructure:
def __init__(self):
self.branch_exits = [InnerStructure()]
outer = OuterStructure()
exits = graph_builder._collect_all_exits(outer)
assert len(exits) == 2
assert exit1 in exits
assert exit2 in exits
def test_collect_all_exits_for_simple_executor(self):
"""Test collecting exits from a simple executor."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
executor = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "test"}}, id="test")
exits = graph_builder._collect_all_exits(executor)
assert len(exits) == 1
assert executor in exits
def test_get_branch_exit_with_chain(self):
"""Test getting branch exit from a chain of executors."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
exec1 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "1"}}, id="e1")
exec2 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "2"}}, id="e2")
exec3 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "3"}}, id="e3")
# Simulate a chain by dynamically setting attribute
exec1._chain_executors = [exec1, exec2, exec3] # type: ignore[attr-defined]
exit_exec = graph_builder._get_branch_exit(exec1)
assert exit_exec == exec3
def test_get_branch_exit_none(self):
"""Test getting branch exit from None."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
exit_exec = graph_builder._get_branch_exit(None)
assert exit_exec is None
def test_get_branch_exit_returns_none_for_goto_terminator(self):
"""Test that _get_branch_exit returns None when branch ends with GotoAction.
GotoAction is a terminator that handles its own control flow (jumping to
the target action). It should NOT be returned as a branch exit, because
that would cause the parent ConditionGroup to wire it to the next
sequential action, creating a dual-edge where both the goto target and
the next action receive messages.
"""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
# GotoAction executor is a JoinExecutor with a GotoAction action_def
goto_executor = JoinExecutor(
{"kind": "GotoAction", "id": "goto_summary", "actionId": "invoke_summary"},
id="goto_summary",
)
# Simulate a single-action branch chain
goto_executor._chain_executors = [goto_executor] # type: ignore[attr-defined]
exit_exec = graph_builder._get_branch_exit(goto_executor)
assert exit_exec is None
def test_get_branch_exit_returns_none_for_end_workflow_terminator(self):
"""Test that _get_branch_exit returns None when branch ends with EndWorkflow."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
end_executor = JoinExecutor(
{"kind": "EndWorkflow", "id": "end"},
id="end",
)
end_executor._chain_executors = [end_executor] # type: ignore[attr-defined]
exit_exec = graph_builder._get_branch_exit(end_executor)
assert exit_exec is None
def test_get_branch_exit_returns_none_for_goto_in_chain(self):
"""Test that _get_branch_exit returns None when chain ends with GotoAction.
Even when a branch has multiple actions before the GotoAction,
the branch exit should be None because the last action is a terminator.
"""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
# A branch with: SendActivity -> GotoAction
activity = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "msg"}}, id="msg")
goto = JoinExecutor(
{"kind": "GotoAction", "id": "goto_target", "actionId": "some_target"},
id="goto_target",
)
activity._chain_executors = [activity, goto] # type: ignore[attr-defined]
exit_exec = graph_builder._get_branch_exit(activity)
assert exit_exec is None
def test_get_branch_exit_returns_executor_for_non_terminator(self):
"""Test that _get_branch_exit still returns the exit for non-terminator branches."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)
exec1 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "1"}}, id="e1")
exec2 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "2"}}, id="e2")
exec1._chain_executors = [exec1, exec2] # type: ignore[attr-defined]
exit_exec = graph_builder._get_branch_exit(exec1)
assert exit_exec == exec2
# ---------------------------------------------------------------------------
# Agent executor external loop response handler tests
# ---------------------------------------------------------------------------
class TestAgentExecutorExternalLoop:
"""Tests for InvokeAzureAgentExecutor external loop response handling."""
async def test_handle_external_input_response_no_state(self, mock_context, mock_state):
"""Test handling external input response when loop state not found."""
from agent_framework_declarative._workflows._executors_agents import (
AgentExternalInputRequest,
AgentExternalInputResponse,
InvokeAzureAgentExecutor,
)
executor = InvokeAzureAgentExecutor({"kind": "InvokeAzureAgent", "agent": "TestAgent"})
# No external loop state in mock_state
original_request = AgentExternalInputRequest(
request_id="req-1",
agent_name="TestAgent",
agent_response="Hello",
iteration=1,
)
response = AgentExternalInputResponse(user_input="hi there")
await executor.handle_external_input_response(original_request, response, mock_context)
# Should send ActionComplete due to missing state
mock_context.send_message.assert_called()
call_args = mock_context.send_message.call_args[0][0]
from agent_framework_declarative._workflows import ActionComplete
assert isinstance(call_args, ActionComplete)
async def test_handle_external_input_response_agent_not_found(self, mock_context, mock_state):
"""Test handling external input raises error when agent not found during resumption."""
from agent_framework.exceptions import AgentInvalidRequestException
from agent_framework_declarative._workflows._executors_agents import (
EXTERNAL_LOOP_STATE_KEY,
AgentExternalInputRequest,
AgentExternalInputResponse,
ExternalLoopState,
InvokeAzureAgentExecutor,
)
# Set up loop state with always true condition (literal)
loop_state = ExternalLoopState(
agent_name="NonExistentAgent",
iteration=1,
external_loop_when="true", # Literal true
messages_var=None,
response_obj_var=None,
result_property=None,
auto_send=True,
messages_path="Conversation.messages",
)
mock_state._data[EXTERNAL_LOOP_STATE_KEY] = loop_state
# Initialize declarative state with simple value
state = DeclarativeWorkflowState(mock_state)
state.initialize()
executor = InvokeAzureAgentExecutor({"kind": "InvokeAzureAgent", "agent": "NonExistentAgent"})
original_request = AgentExternalInputRequest(
request_id="req-1",
agent_name="NonExistentAgent",
agent_response="Hello",
iteration=1,
)
response = AgentExternalInputResponse(user_input="continue")
with pytest.raises(AgentInvalidRequestException) as exc_info:
await executor.handle_external_input_response(original_request, response, mock_context)
assert "NonExistentAgent" in str(exc_info.value)
assert "not found during loop resumption" in str(exc_info.value)
class TestBuilderValidation:
"""Tests for builder validation features (P1 fixes)."""
def test_duplicate_explicit_action_id_raises_error(self):
"""Test that duplicate explicit action IDs are detected."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [
{"id": "my_action", "kind": "SendActivity", "activity": {"text": "First"}},
{"id": "my_action", "kind": "SendActivity", "activity": {"text": "Second"}},
],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "Duplicate action ID 'my_action'" in str(exc_info.value)
def test_duplicate_id_in_nested_actions(self):
"""Test duplicate ID detection in nested If/Switch branches."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [
{
"kind": "If",
"condition": "=true",
"then": [{"id": "shared_id", "kind": "SendActivity", "activity": {"text": "Then"}}],
"else": [{"id": "shared_id", "kind": "SendActivity", "activity": {"text": "Else"}}],
}
],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "Duplicate action ID 'shared_id'" in str(exc_info.value)
def test_missing_required_field_sendactivity(self):
"""Test that missing required fields are detected."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [{"kind": "SendActivity"}], # Missing 'activity' field
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "SendActivity" in str(exc_info.value)
assert "missing required field" in str(exc_info.value)
assert "activity" in str(exc_info.value)
def test_missing_required_field_setvalue(self):
"""Test SetValue without path raises error."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [{"kind": "SetValue", "value": "test"}], # Missing 'path' field
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "SetValue" in str(exc_info.value)
assert "path" in str(exc_info.value)
def test_setvalue_accepts_alternate_variable_field(self):
"""Test SetValue accepts 'variable' as alternate to 'path'."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [{"kind": "SetValue", "variable": {"path": "Local.x"}, "value": "test"}],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
# Should not raise - 'variable' is accepted as alternate
workflow = builder.build()
assert workflow is not None
def test_missing_required_field_foreach(self):
"""Test Foreach without source raises error."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [{"kind": "Foreach", "actions": [{"kind": "SendActivity", "activity": {"text": "Hi"}}]}],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "Foreach" in str(exc_info.value)
assert "source" in str(exc_info.value)
def test_self_referencing_goto_raises_error(self):
"""Test that a goto referencing itself is detected."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [{"id": "loop", "kind": "GotoAction", "actionId": "loop"}],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "loop" in str(exc_info.value)
assert "self-referencing" in str(exc_info.value)
def test_validation_can_be_disabled(self):
"""Test that validation can be disabled for early schema/duplicate checks.
Note: Even with validation disabled, the underlying WorkflowBuilder may
still catch duplicates during graph construction. This flag disables
our upfront validation pass but not runtime checks.
"""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
# Test with missing required field - validation disabled should skip our check
yaml_def = {
"name": "test_workflow",
"actions": [{"kind": "SendActivity"}], # Missing 'activity' - normally caught by validation
}
# With validation disabled, our upfront check is skipped
builder = DeclarativeWorkflowBuilder(yaml_def, validate=False)
# The workflow may still fail for other reasons, but our validation pass is skipped
# In this case, it should succeed because SendActivityExecutor handles missing fields gracefully
workflow = builder.build()
assert workflow is not None
def test_validation_in_condition_group_branches(self):
"""Test validation catches issues in ConditionGroup branches."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [
{
"kind": "ConditionGroup",
"conditions": [
{
"condition": '=Local.choice = "a"',
"actions": [{"id": "dup", "kind": "SendActivity", "activity": {"text": "A"}}],
},
{
"condition": '=Local.choice = "b"',
"actions": [{"id": "dup", "kind": "SendActivity", "activity": {"text": "B"}}],
},
],
}
],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "Duplicate action ID 'dup'" in str(exc_info.value)
def test_validation_in_foreach_body(self):
"""Test validation catches issues in Foreach body."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
yaml_def = {
"name": "test_workflow",
"actions": [
{
"kind": "Foreach",
"source": "=Local.items",
"actions": [{"kind": "SendActivity"}], # Missing 'activity'
}
],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
with pytest.raises(ValueError) as exc_info:
builder.build()
assert "SendActivity" in str(exc_info.value)
assert "activity" in str(exc_info.value)
@_requires_powerfx
class TestExpressionEdgeCases:
"""Tests for expression evaluation edge cases."""
async def test_division_with_valid_values(self, mock_state):
"""Test normal division works correctly."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.x", 10)
state.set("Local.y", 4)
result = state.eval("=Local.x / Local.y")
assert result == 2.5
async def test_multiplication_normal(self, mock_state):
"""Test normal multiplication."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
state.set("Local.x", 6)
state.set("Local.y", 7)
result = state.eval("=Local.x * Local.y")
assert result == 42
@_requires_powerfx
class TestLongMessageTextHandling:
"""Tests for handling long MessageText results that exceed PowerFx limits."""
async def test_short_message_text_embedded_inline(self, mock_state):
"""Test that short MessageText results are embedded inline."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Store a short message
short_text = "Hello world"
state.set("Local.Messages", [{"text": short_text, "contents": [{"type": "text", "text": short_text}]}])
# Evaluate a formula with MessageText - should embed inline
result = state.eval("=Upper(MessageText(Local.Messages))")
assert result == "HELLO WORLD"
# No temp variable should be created for short strings
temp_var = state.get("Local._TempMessageText0")
assert temp_var is None
async def test_long_message_text_stored_in_temp_variable(self, mock_state):
"""Long MessageText results round-trip and the temp key is removed after eval."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Create a message longer than 500 characters
long_text = "A" * 600 # 600 characters exceeds the 500 char threshold
state.set("Local.Messages", [{"text": long_text, "contents": [{"type": "text", "text": long_text}]}])
# Evaluate a formula with MessageText
result = state.eval("=Upper(MessageText(Local.Messages))")
assert result == "A" * 600 # Upper on 'A' is still 'A'
local = state.get_state_data().get("Local", {})
remaining = sorted(k for k in local if k.startswith("_TempMessageText"))
assert not remaining, f"Temporary keys remain in Local: {remaining}"
async def test_find_with_long_message_text(self, mock_state):
"""Test Find function works with long MessageText stored in temp variable."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Create a long message with a keyword to find
long_text = "X" * 550 + "CONGRATULATIONS" + "Y" * 50
state.set("Local.Messages", [{"text": long_text, "contents": [{"type": "text", "text": long_text}]}])
# Test the pattern used in student_teacher workflow
result = state.eval('=!IsBlank(Find("CONGRATULATIONS", Upper(MessageText(Local.Messages))))')
assert result is True
async def test_find_without_keyword_in_long_text(self, mock_state):
"""Test Find returns blank when keyword not found in long text."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
# Long text without the keyword
long_text = "X" * 600
state.set("Local.Messages", [{"text": long_text, "contents": [{"type": "text", "text": long_text}]}])
result = state.eval('=!IsBlank(Find("CONGRATULATIONS", Upper(MessageText(Local.Messages))))')
assert result is False
class TestCreateConversationExecutor:
"""Tests for CreateConversationExecutor."""
async def test_basic_creation(self, mock_context, mock_state):
"""Test that a UUID is generated, stored at conversationId path, and conversation entry created."""
from agent_framework_declarative._workflows._executors_basic import (
CreateConversationExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "CreateConversation",
"conversationId": "Local.myConvId",
}
executor = CreateConversationExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
# A UUID should be stored at the requested path
conv_id = state.get("Local.myConvId")
assert conv_id is not None
assert isinstance(conv_id, str)
assert len(conv_id) == 36 # UUID format
# Conversation entry should exist in System.conversations
conversations = state.get("System.conversations")
assert conversations is not None
assert conv_id in conversations
assert conversations[conv_id]["id"] == conv_id
assert conversations[conv_id]["messages"] == []
async def test_no_conversation_id_param(self, mock_context, mock_state):
"""Test that conversation is still created even without a conversationId param."""
from agent_framework_declarative._workflows._executors_basic import (
CreateConversationExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def = {
"kind": "CreateConversation",
}
executor = CreateConversationExecutor(action_def)
await executor.handle_action(ActionTrigger(), mock_context)
# Conversation entry should still exist in System.conversations
# (initialize() seeds one default conversation, plus the one just created)
conversations = state.get("System.conversations")
assert conversations is not None
assert len(conversations) == 2
async def test_multiple_conversations(self, mock_context, mock_state):
"""Test creating multiple conversations produces distinct IDs."""
from agent_framework_declarative._workflows._executors_basic import (
CreateConversationExecutor,
)
state = DeclarativeWorkflowState(mock_state)
state.initialize()
action_def1 = {
"kind": "CreateConversation",
"conversationId": "Local.conv1",
}
action_def2 = {
"kind": "CreateConversation",
"conversationId": "Local.conv2",
}
executor1 = CreateConversationExecutor(action_def1)
await executor1.handle_action(ActionTrigger(), mock_context)
executor2 = CreateConversationExecutor(action_def2)
await executor2.handle_action(ActionTrigger(), mock_context)
conv1 = state.get("Local.conv1")
conv2 = state.get("Local.conv2")
assert conv1 != conv2
# initialize() seeds one default conversation, plus the two just created
conversations = state.get("System.conversations")
assert len(conversations) == 3
assert conv1 in conversations
assert conv2 in conversations
class TestDeclarativeWorkflowStateConversationIdInit:
"""Tests that DeclarativeWorkflowState.initialize() generates a real UUID for ConversationId."""
async def test_conversation_id_is_not_default(self, mock_state):
"""System.ConversationId should be a UUID, not 'default'."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
conv_id = state.get("System.ConversationId")
assert conv_id is not None
assert conv_id != "default"
# Validate it looks like a UUID
import uuid
uuid.UUID(conv_id) # Raises ValueError if not a valid UUID
async def test_conversations_dict_initialized(self, mock_state):
"""System.conversations should contain an entry matching ConversationId."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
conv_id = state.get("System.ConversationId")
conversations = state.get("System.conversations")
assert conversations is not None
assert conv_id in conversations
assert conversations[conv_id]["id"] == conv_id
assert conversations[conv_id]["messages"] == []
async def test_each_initialize_generates_unique_id(self, mock_state):
"""Each call to initialize() should produce a different ConversationId."""
state = DeclarativeWorkflowState(mock_state)
state.initialize()
id1 = state.get("System.ConversationId")
state.initialize()
id2 = state.get("System.ConversationId")
assert id1 != id2