# Copyright (c) Microsoft. All rights reserved. """Tests for the graph-based declarative workflow executors.""" from typing import Any from unittest.mock import AsyncMock, MagicMock import pytest try: import powerfx # noqa: F401 _powerfx_available = True except (ImportError, RuntimeError): _powerfx_available = False _requires_powerfx = pytest.mark.skipif(not _powerfx_available, reason="PowerFx engine not available") from agent_framework_declarative._workflows import ( # noqa: E402 ALL_ACTION_EXECUTORS, DECLARATIVE_STATE_KEY, ActionComplete, ActionTrigger, DeclarativeWorkflowBuilder, DeclarativeWorkflowState, ForeachInitExecutor, LoopIterationResult, SendActivityExecutor, SetValueExecutor, ) class TestDeclarativeWorkflowState: """Tests for DeclarativeWorkflowState.""" @pytest.fixture def mock_state(self): """Create a mock shared state with async get/set methods.""" mock_state = MagicMock() mock_state._data = {} def mock_get(key, default=None): return mock_state._data.get(key, default) def mock_set(key, value): mock_state._data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) return mock_state @pytest.mark.asyncio async def test_initialize_state(self, mock_state): """Test initializing the workflow state.""" state = DeclarativeWorkflowState(mock_state) state.initialize({"query": "test"}) # Verify state was set mock_state.set.assert_called_once() call_args = mock_state.set.call_args assert call_args[0][0] == DECLARATIVE_STATE_KEY state_data = call_args[0][1] assert state_data["Inputs"] == {"query": "test"} assert state_data["Outputs"] == {} assert state_data["Local"] == {} @pytest.mark.asyncio async def test_get_and_set_values(self, mock_state): """Test getting and setting values.""" state = DeclarativeWorkflowState(mock_state) state.initialize() # Set a turn value state.set("Local.counter", 5) # Get the value result = state.get("Local.counter") assert result == 5 @pytest.mark.asyncio async def test_get_inputs(self, mock_state): """Test getting workflow inputs.""" state = DeclarativeWorkflowState(mock_state) state.initialize({"name": "Alice", "age": 30}) # Get via path name = state.get("Workflow.Inputs.name") assert name == "Alice" # Get all inputs inputs = state.get("Workflow.Inputs") assert inputs == {"name": "Alice", "age": 30} @pytest.mark.asyncio async def test_append_value(self, mock_state): """Test appending values to a list.""" state = DeclarativeWorkflowState(mock_state) state.initialize() # Append to non-existent list creates it state.append("Local.items", "first") result = state.get("Local.items") assert result == ["first"] # Append to existing list state.append("Local.items", "second") result = state.get("Local.items") assert result == ["first", "second"] @_requires_powerfx @pytest.mark.asyncio async def test_eval_expression(self, mock_state): """Test evaluating expressions.""" state = DeclarativeWorkflowState(mock_state) state.initialize() # Non-expression returns as-is result = state.eval("plain text") assert result == "plain text" # Boolean literals result = state.eval("=true") assert result is True result = state.eval("=false") assert result is False # String literals result = state.eval('="hello"') assert result == "hello" # Numeric literals result = state.eval("=42") assert result == 42 class TestDeclarativeActionExecutor: """Tests for DeclarativeActionExecutor subclasses.""" @pytest.fixture def mock_context(self, mock_state): """Create a mock workflow context.""" ctx = MagicMock() ctx.state = mock_state ctx.send_message = AsyncMock() ctx.yield_output = AsyncMock() return ctx @pytest.fixture def mock_state(self): """Create a mock shared state.""" mock_state = MagicMock() mock_state._data = {} def mock_get(key, default=None): return mock_state._data.get(key, default) def mock_set(key, value): mock_state._data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) return mock_state @pytest.mark.asyncio async def test_set_value_executor(self, mock_context, mock_state): """Test SetValueExecutor.""" # Initialize state state = DeclarativeWorkflowState(mock_state) state.initialize() action_def = { "kind": "SetValue", "path": "Local.result", "value": "test value", } executor = SetValueExecutor(action_def) # Execute await executor.handle_action(ActionTrigger(), mock_context) # Verify action complete was sent mock_context.send_message.assert_called_once() message = mock_context.send_message.call_args[0][0] assert isinstance(message, ActionComplete) @pytest.mark.asyncio async def test_send_activity_executor(self, mock_context, mock_state): """Test SendActivityExecutor.""" state = DeclarativeWorkflowState(mock_state) state.initialize() action_def = { "kind": "SendActivity", "activity": {"text": "Hello, world!"}, } executor = SendActivityExecutor(action_def) # Execute await executor.handle_action(ActionTrigger(), mock_context) # Verify output was yielded mock_context.yield_output.assert_called_once_with("Hello, world!") # Note: ConditionEvaluatorExecutor tests removed - conditions are now evaluated on edges @_requires_powerfx async def test_foreach_init_with_source(self, mock_context, mock_state): """Test ForeachInitExecutor with the 'source' field.""" state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.items", ["a", "b", "c"]) action_def = { "kind": "Foreach", "source": "=Local.items", "itemName": "item", } executor = ForeachInitExecutor(action_def) # Execute await executor.handle_action(ActionTrigger(), mock_context) # Verify result mock_context.send_message.assert_called_once() message = mock_context.send_message.call_args[0][0] assert isinstance(message, LoopIterationResult) assert message.has_next is True assert message.current_index == 0 assert message.current_item == "a" @pytest.mark.asyncio async def test_foreach_init_empty(self, mock_context, mock_state): """Test ForeachInitExecutor with empty items list.""" state = DeclarativeWorkflowState(mock_state) state.initialize() # Use a literal empty list - no expression evaluation needed action_def = { "kind": "Foreach", "source": [], # Direct empty list, not an expression "itemName": "item", } executor = ForeachInitExecutor(action_def) # Execute await executor.handle_action(ActionTrigger(), mock_context) # Verify result mock_context.send_message.assert_called_once() message = mock_context.send_message.call_args[0][0] assert isinstance(message, LoopIterationResult) assert message.has_next is False class TestDeclarativeWorkflowBuilder: """Tests for DeclarativeWorkflowBuilder.""" def test_all_action_executors_available(self): """Test that all expected action types have executors.""" expected_actions = [ "SetValue", "SetVariable", "SendActivity", "EndWorkflow", "InvokeAzureAgent", "Question", ] for action in expected_actions: assert action in ALL_ACTION_EXECUTORS, f"Missing executor for {action}" def test_build_empty_workflow(self): """Test building a workflow with no actions raises an error.""" yaml_def = {"name": "empty_workflow", "actions": []} builder = DeclarativeWorkflowBuilder(yaml_def) with pytest.raises(ValueError, match="Cannot build workflow with no actions"): builder.build() def test_build_simple_workflow(self): """Test building a workflow with simple sequential actions.""" yaml_def = { "name": "simple_workflow", "actions": [ {"kind": "SendActivity", "id": "greet", "activity": {"text": "Hello!"}}, {"kind": "SetValue", "id": "set_count", "path": "Local.count", "value": 1}, ], } builder = DeclarativeWorkflowBuilder(yaml_def) workflow = builder.build() assert workflow is not None # Verify executors were created assert "greet" in builder._executors assert "set_count" in builder._executors def test_build_workflow_with_if(self): """Test building a workflow with If control flow.""" yaml_def = { "name": "conditional_workflow", "actions": [ { "kind": "If", "id": "check_flag", "condition": "=Local.flag", "then": [ {"kind": "SendActivity", "id": "say_yes", "activity": {"text": "Yes!"}}, ], "else": [ {"kind": "SendActivity", "id": "say_no", "activity": {"text": "No!"}}, ], }, ], } builder = DeclarativeWorkflowBuilder(yaml_def) workflow = builder.build() assert workflow is not None # Verify branch executors were created # Note: No join executors - branches wire directly to successor assert "say_yes" in builder._executors assert "say_no" in builder._executors # Entry node is created when If is first action assert "_workflow_entry" in builder._executors def test_build_workflow_with_foreach(self): """Test building a workflow with Foreach loop.""" yaml_def = { "name": "loop_workflow", "actions": [ { "kind": "Foreach", "id": "process_items", "source": "=Local.items", "itemName": "item", "actions": [ {"kind": "SendActivity", "id": "show_item", "activity": {"text": "=Local.item"}}, ], }, ], } builder = DeclarativeWorkflowBuilder(yaml_def) workflow = builder.build() assert workflow is not None # Verify loop executors were created assert "process_items_init" in builder._executors assert "process_items_next" in builder._executors assert "process_items_exit" in builder._executors assert "show_item" in builder._executors def test_build_workflow_with_condition_group(self): """Test building a workflow with ConditionGroup control flow.""" yaml_def = { "name": "condition_group_workflow", "actions": [ { "kind": "ConditionGroup", "id": "check_status", "conditions": [ { "condition": '=Local.status = "active"', "actions": [ {"kind": "SendActivity", "id": "say_active", "activity": {"text": "Active"}}, ], }, { "condition": '=Local.status = "pending"', "actions": [ {"kind": "SendActivity", "id": "say_pending", "activity": {"text": "Pending"}}, ], }, ], "elseActions": [ {"kind": "SendActivity", "id": "say_unknown", "activity": {"text": "Unknown"}}, ], }, ], } builder = DeclarativeWorkflowBuilder(yaml_def) workflow = builder.build() assert workflow is not None # Verify ConditionGroup branch executors were created # Note: No join executors - branches wire directly to successor assert "say_active" in builder._executors assert "say_pending" in builder._executors assert "say_unknown" in builder._executors # Entry node is created when ConditionGroup is first action assert "_workflow_entry" in builder._executors class TestAgentExecutors: """Tests for agent-related executors.""" @pytest.fixture def mock_context(self, mock_state): """Create a mock workflow context.""" ctx = MagicMock() ctx.state = mock_state ctx.send_message = AsyncMock() ctx.yield_output = AsyncMock() return ctx @pytest.fixture def mock_state(self): """Create a mock shared state.""" mock_state = MagicMock() mock_state._data = {} def mock_get(key, default=None): return mock_state._data.get(key, default) def mock_set(key, value): mock_state._data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) return mock_state @pytest.mark.asyncio async def test_invoke_agent_not_found(self, mock_context, mock_state): """Test InvokeAzureAgentExecutor raises error when agent not found.""" from agent_framework.exceptions import AgentInvalidRequestException from agent_framework_declarative._workflows import ( InvokeAzureAgentExecutor, ) state = DeclarativeWorkflowState(mock_state) state.initialize() action_def = { "kind": "InvokeAzureAgent", "agent": "non_existent_agent", "input": "test input", } executor = InvokeAzureAgentExecutor(action_def) # Execute - should raise AgentInvalidRequestException with pytest.raises(AgentInvalidRequestException) as exc_info: await executor.handle_action(ActionTrigger(), mock_context) assert "non_existent_agent" in str(exc_info.value) assert "not found in registry" in str(exc_info.value) class TestHumanInputExecutors: """Tests for human input executors.""" @pytest.fixture def mock_context(self, mock_state): """Create a mock workflow context.""" ctx = MagicMock() ctx.state = mock_state ctx.send_message = AsyncMock() ctx.yield_output = AsyncMock() ctx.request_info = AsyncMock() return ctx @pytest.fixture def mock_state(self): """Create a mock shared state.""" mock_state = MagicMock() mock_state._data = {} def mock_get(key, default=None): return mock_state._data.get(key, default) def mock_set(key, value): mock_state._data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) return mock_state @pytest.mark.asyncio async def test_question_executor(self, mock_context, mock_state): """Test QuestionExecutor.""" from agent_framework_declarative._workflows import ( ExternalInputRequest, QuestionExecutor, ) state = DeclarativeWorkflowState(mock_state) state.initialize() action_def = { "kind": "Question", "question": {"text": "What is your name?"}, "variable": "Local.name", "default": "Anonymous", } executor = QuestionExecutor(action_def) # Execute await executor.handle_action(ActionTrigger(), mock_context) # Verify request_info was called with ExternalInputRequest mock_context.request_info.assert_called_once() request = mock_context.request_info.call_args[0][0] assert isinstance(request, ExternalInputRequest) assert request.request_type == "question" assert "What is your name?" in request.message @_requires_powerfx class TestParseValueExecutor: """Tests for the ParseValue action executor.""" @pytest.fixture def mock_context(self, mock_state): """Create a mock workflow context.""" ctx = MagicMock() ctx.state = mock_state ctx.send_message = AsyncMock() ctx.yield_output = AsyncMock() return ctx @pytest.fixture def mock_state(self): """Create a mock shared state.""" mock_state = MagicMock() mock_state._data = {} def mock_get(key, default=None): return mock_state._data.get(key, default) def mock_set(key, value): mock_state._data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) return mock_state @pytest.mark.asyncio async def test_parse_value_string(self, mock_context, mock_state): """Test ParseValue with string type.""" from agent_framework_declarative._workflows._executors_basic import ParseValueExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.rawValue", "hello world") action_def = { "kind": "ParseValue", "variable": "Local.parsedValue", "value": "=Local.rawValue", "valueType": "string", } executor = ParseValueExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.parsedValue") assert result == "hello world" @pytest.mark.asyncio async def test_parse_value_number(self, mock_context, mock_state): """Test ParseValue with number type.""" from agent_framework_declarative._workflows._executors_basic import ParseValueExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.rawValue", "123") action_def = { "kind": "ParseValue", "variable": "Local.parsedValue", "value": "=Local.rawValue", "valueType": "number", } executor = ParseValueExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.parsedValue") assert result == 123 @pytest.mark.asyncio async def test_parse_value_float(self, mock_context, mock_state): """Test ParseValue with float number.""" from agent_framework_declarative._workflows._executors_basic import ParseValueExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.rawValue", "3.14") action_def = { "kind": "ParseValue", "variable": "Local.parsedValue", "value": "=Local.rawValue", "valueType": "number", } executor = ParseValueExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.parsedValue") assert result == 3.14 @pytest.mark.asyncio async def test_parse_value_boolean_true(self, mock_context, mock_state): """Test ParseValue with boolean type (true).""" from agent_framework_declarative._workflows._executors_basic import ParseValueExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.rawValue", "true") action_def = { "kind": "ParseValue", "variable": "Local.parsedValue", "value": "=Local.rawValue", "valueType": "boolean", } executor = ParseValueExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.parsedValue") assert result is True @pytest.mark.asyncio async def test_parse_value_boolean_false(self, mock_context, mock_state): """Test ParseValue with boolean type (false).""" from agent_framework_declarative._workflows._executors_basic import ParseValueExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.rawValue", "no") action_def = { "kind": "ParseValue", "variable": "Local.parsedValue", "value": "=Local.rawValue", "valueType": "boolean", } executor = ParseValueExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.parsedValue") assert result is False @pytest.mark.asyncio async def test_parse_value_object_from_json(self, mock_context, mock_state): """Test ParseValue with object type from JSON string.""" from agent_framework_declarative._workflows._executors_basic import ParseValueExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.rawValue", '{"name": "Alice", "age": 30}') action_def = { "kind": "ParseValue", "variable": "Local.parsedValue", "value": "=Local.rawValue", "valueType": "object", } executor = ParseValueExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.parsedValue") assert result == {"name": "Alice", "age": 30} @pytest.mark.asyncio async def test_parse_value_array_from_json(self, mock_context, mock_state): """Test ParseValue with array type from JSON string.""" from agent_framework_declarative._workflows._executors_basic import ParseValueExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.rawValue", '["a", "b", "c"]') action_def = { "kind": "ParseValue", "variable": "Local.parsedValue", "value": "=Local.rawValue", "valueType": "array", } executor = ParseValueExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.parsedValue") assert result == ["a", "b", "c"] @pytest.mark.asyncio async def test_parse_value_no_type_conversion(self, mock_context, mock_state): """Test ParseValue without type conversion.""" from agent_framework_declarative._workflows._executors_basic import ParseValueExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.rawValue", {"status": "active"}) action_def = { "kind": "ParseValue", "variable": "Local.parsedValue", "value": "=Local.rawValue", } executor = ParseValueExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.parsedValue") assert result == {"status": "active"} class TestEditTableExecutor: """Tests for the EditTable action executor.""" @pytest.fixture def mock_context(self, mock_state): """Create a mock workflow context.""" ctx = MagicMock() ctx.state = mock_state ctx.send_message = AsyncMock() ctx.yield_output = AsyncMock() return ctx @pytest.fixture def mock_state(self): """Create a mock shared state.""" mock_state = MagicMock() mock_state._data = {} def mock_get(key, default=None): return mock_state._data.get(key, default) def mock_set(key, value): mock_state._data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) return mock_state @pytest.mark.asyncio async def test_edit_table_add(self, mock_context, mock_state): """Test EditTable with add operation.""" from agent_framework_declarative._workflows._executors_basic import EditTableExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.items", ["a", "b"]) action_def = { "kind": "EditTable", "table": "Local.items", "operation": "add", "value": "c", } executor = EditTableExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.items") assert result == ["a", "b", "c"] @pytest.mark.asyncio async def test_edit_table_insert_at_index(self, mock_context, mock_state): """Test EditTable with insert at specific index.""" from agent_framework_declarative._workflows._executors_basic import EditTableExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.items", ["a", "c"]) action_def = { "kind": "EditTable", "table": "Local.items", "operation": "add", "value": "b", "index": 1, } executor = EditTableExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.items") assert result == ["a", "b", "c"] @pytest.mark.asyncio async def test_edit_table_remove_by_value(self, mock_context, mock_state): """Test EditTable with remove by value.""" from agent_framework_declarative._workflows._executors_basic import EditTableExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.items", ["a", "b", "c"]) action_def = { "kind": "EditTable", "table": "Local.items", "operation": "remove", "value": "b", } executor = EditTableExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.items") assert result == ["a", "c"] @pytest.mark.asyncio async def test_edit_table_remove_by_index(self, mock_context, mock_state): """Test EditTable with remove by index.""" from agent_framework_declarative._workflows._executors_basic import EditTableExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.items", ["a", "b", "c"]) action_def = { "kind": "EditTable", "table": "Local.items", "operation": "remove", "index": 1, } executor = EditTableExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.items") assert result == ["a", "c"] @pytest.mark.asyncio async def test_edit_table_clear(self, mock_context, mock_state): """Test EditTable with clear operation.""" from agent_framework_declarative._workflows._executors_basic import EditTableExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.items", ["a", "b", "c"]) action_def = { "kind": "EditTable", "table": "Local.items", "operation": "clear", } executor = EditTableExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.items") assert result == [] @pytest.mark.asyncio async def test_edit_table_update_at_index(self, mock_context, mock_state): """Test EditTable with update at index.""" from agent_framework_declarative._workflows._executors_basic import EditTableExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.items", ["a", "b", "c"]) action_def = { "kind": "EditTable", "table": "Local.items", "operation": "update", "value": "B", "index": 1, } executor = EditTableExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.items") assert result == ["a", "B", "c"] @pytest.mark.asyncio async def test_edit_table_creates_new_list(self, mock_context, mock_state): """Test EditTable creates new list if not exists.""" from agent_framework_declarative._workflows._executors_basic import EditTableExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() action_def = { "kind": "EditTable", "table": "Local.newItems", "operation": "add", "value": "first", } executor = EditTableExecutor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.newItems") assert result == ["first"] class TestEditTableV2Executor: """Tests for the EditTableV2 action executor.""" @pytest.fixture def mock_context(self, mock_state): """Create a mock workflow context.""" ctx = MagicMock() ctx.state = mock_state ctx.send_message = AsyncMock() ctx.yield_output = AsyncMock() return ctx @pytest.fixture def mock_state(self): """Create a mock shared state.""" mock_state = MagicMock() mock_state._data = {} def mock_get(key, default=None): return mock_state._data.get(key, default) def mock_set(key, value): mock_state._data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) return mock_state @pytest.mark.asyncio async def test_edit_table_v2_add(self, mock_context, mock_state): """Test EditTableV2 with add operation.""" from agent_framework_declarative._workflows._executors_basic import EditTableV2Executor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.records", [{"id": 1, "name": "Alice"}]) action_def = { "kind": "EditTableV2", "table": "Local.records", "operation": "add", "item": {"id": 2, "name": "Bob"}, } executor = EditTableV2Executor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.records") assert result == [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}] @pytest.mark.asyncio async def test_edit_table_v2_add_or_update_new(self, mock_context, mock_state): """Test EditTableV2 with addOrUpdate - adding new record.""" from agent_framework_declarative._workflows._executors_basic import EditTableV2Executor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.records", [{"id": 1, "name": "Alice"}]) action_def = { "kind": "EditTableV2", "table": "Local.records", "operation": "addOrUpdate", "item": {"id": 2, "name": "Bob"}, "key": "id", } executor = EditTableV2Executor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.records") assert result == [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}] @pytest.mark.asyncio async def test_edit_table_v2_add_or_update_existing(self, mock_context, mock_state): """Test EditTableV2 with addOrUpdate - updating existing record.""" from agent_framework_declarative._workflows._executors_basic import EditTableV2Executor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.records", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]) action_def = { "kind": "EditTableV2", "table": "Local.records", "operation": "addOrUpdate", "item": {"id": 1, "name": "Alice Updated"}, "key": "id", } executor = EditTableV2Executor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.records") assert result == [{"id": 1, "name": "Alice Updated"}, {"id": 2, "name": "Bob"}] @pytest.mark.asyncio async def test_edit_table_v2_remove_by_key(self, mock_context, mock_state): """Test EditTableV2 with remove by key.""" from agent_framework_declarative._workflows._executors_basic import EditTableV2Executor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.records", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]) action_def = { "kind": "EditTableV2", "table": "Local.records", "operation": "remove", "item": {"id": 1}, "key": "id", } executor = EditTableV2Executor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.records") assert result == [{"id": 2, "name": "Bob"}] @pytest.mark.asyncio async def test_edit_table_v2_clear(self, mock_context, mock_state): """Test EditTableV2 with clear operation.""" from agent_framework_declarative._workflows._executors_basic import EditTableV2Executor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.records", [{"id": 1}, {"id": 2}]) action_def = { "kind": "EditTableV2", "table": "Local.records", "operation": "clear", } executor = EditTableV2Executor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.records") assert result == [] @pytest.mark.asyncio async def test_edit_table_v2_update_by_key(self, mock_context, mock_state): """Test EditTableV2 with update by key.""" from agent_framework_declarative._workflows._executors_basic import EditTableV2Executor state = DeclarativeWorkflowState(mock_state) state.initialize() state.set("Local.records", [{"id": 1, "status": "pending"}, {"id": 2, "status": "pending"}]) action_def = { "kind": "EditTableV2", "table": "Local.records", "operation": "update", "item": {"id": 1, "status": "complete"}, "key": "id", } executor = EditTableV2Executor(action_def) await executor.handle_action(ActionTrigger(), mock_context) result = state.get("Local.records") assert result == [{"id": 1, "status": "complete"}, {"id": 2, "status": "pending"}] class TestCancelDialogExecutors: """Tests for CancelDialog and CancelAllDialogs executors.""" @pytest.fixture def mock_context(self, mock_state): """Create a mock workflow context.""" ctx = MagicMock() ctx.state = mock_state ctx.send_message = AsyncMock() ctx.yield_output = AsyncMock() return ctx @pytest.fixture def mock_state(self): """Create a mock shared state.""" mock_state = MagicMock() mock_state._data = {} def mock_get(key, default=None): return mock_state._data.get(key, default) def mock_set(key, value): mock_state._data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) return mock_state @pytest.mark.asyncio async def test_cancel_dialog_executor(self, mock_context, mock_state): """Test CancelDialogExecutor completes without error.""" from agent_framework_declarative._workflows._executors_control_flow import CancelDialogExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() action_def = { "kind": "CancelDialog", } executor = CancelDialogExecutor(action_def) # Should complete without raising await executor.handle_action(ActionTrigger(), mock_context) # CancelDialog is a no-op that signals termination # No assertions needed - just verify it doesn't raise @pytest.mark.asyncio async def test_cancel_all_dialogs_executor(self, mock_context, mock_state): """Test CancelAllDialogsExecutor completes without error.""" from agent_framework_declarative._workflows._executors_control_flow import CancelAllDialogsExecutor state = DeclarativeWorkflowState(mock_state) state.initialize() action_def = { "kind": "CancelAllDialogs", } executor = CancelAllDialogsExecutor(action_def) # Should complete without raising await executor.handle_action(ActionTrigger(), mock_context) # CancelAllDialogs is a no-op that signals termination # No assertions needed - just verify it doesn't raise class TestExtractJsonFromResponse: """Tests for the _extract_json_from_response helper function.""" def test_pure_json_object(self): """Test parsing pure JSON object.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = '{"TicketId": "123", "Status": "pending"}' result = _extract_json_from_response(text) assert result == {"TicketId": "123", "Status": "pending"} def test_pure_json_array(self): """Test parsing pure JSON array.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = '["item1", "item2", "item3"]' result = _extract_json_from_response(text) assert result == ["item1", "item2", "item3"] def test_json_in_markdown_code_block(self): """Test extracting JSON from markdown code block.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = """Here's the response: ```json {"TicketId": "456", "Summary": "Test ticket"} ``` """ result = _extract_json_from_response(text) assert result == {"TicketId": "456", "Summary": "Test ticket"} def test_json_in_plain_code_block(self): """Test extracting JSON from plain markdown code block.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = """The result: ``` {"Status": "complete"} ``` """ result = _extract_json_from_response(text) assert result == {"Status": "complete"} def test_json_with_leading_text(self): """Test extracting JSON with leading text.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = 'Here is the ticket information: {"TicketId": "789", "Priority": "high"}' result = _extract_json_from_response(text) assert result == {"TicketId": "789", "Priority": "high"} def test_json_with_trailing_text(self): """Test extracting JSON with trailing text.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = '{"IsResolved": true, "NeedsTicket": false} That is the status.' result = _extract_json_from_response(text) assert result == {"IsResolved": True, "NeedsTicket": False} def test_nested_json_object(self): """Test extracting nested JSON object.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = 'Result: {"outer": {"inner": {"value": 42}}}' result = _extract_json_from_response(text) assert result == {"outer": {"inner": {"value": 42}}} def test_json_with_array_inside(self): """Test extracting JSON with arrays inside.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = 'Data: {"items": ["a", "b", "c"], "count": 3}' result = _extract_json_from_response(text) assert result == {"items": ["a", "b", "c"], "count": 3} def test_json_with_escaped_quotes(self): """Test extracting JSON with escaped quotes in strings.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = r'Response: {"message": "He said \"hello\"", "valid": true}' result = _extract_json_from_response(text) assert result == {"message": 'He said "hello"', "valid": True} def test_empty_string_returns_none(self): """Test that empty string returns None.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) result = _extract_json_from_response("") assert result is None def test_whitespace_only_returns_none(self): """Test that whitespace-only string returns None.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) result = _extract_json_from_response(" \n\t ") assert result is None def test_no_json_raises_error(self): """Test that text without JSON raises JSONDecodeError.""" import json from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) with pytest.raises(json.JSONDecodeError): _extract_json_from_response("This is just plain text with no JSON") def test_json_with_braces_in_string(self): """Test JSON with braces inside string values.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = 'Info: {"template": "Hello {name}, your id is {id}"}' result = _extract_json_from_response(text) assert result == {"template": "Hello {name}, your id is {id}"} def test_multiple_json_objects_returns_last(self): """Test that multiple JSON objects returns the last one (final result).""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) # Simulates streaming agent output with partial then final result text = '{"TicketId":"TBD","TicketSummary":"partial"}{"TicketId":"75178c95","TicketSummary":"final result"}' result = _extract_json_from_response(text) assert result == {"TicketId": "75178c95", "TicketSummary": "final result"} def test_multiple_json_objects_with_different_schemas(self): """Test multiple JSON objects with different structures returns the last.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) # First object is from one agent, second is from another text = '{"IsResolved":false,"NeedsTicket":true}{"TicketId":"abc123","Summary":"Issue logged"}' result = _extract_json_from_response(text) assert result == {"TicketId": "abc123", "Summary": "Issue logged"} def test_multiple_json_objects_with_text_between(self): """Test multiple JSON objects separated by text.""" from agent_framework_declarative._workflows._executors_agents import ( _extract_json_from_response, ) text = 'First: {"status": "pending"} then later: {"status": "complete", "id": 42}' result = _extract_json_from_response(text) assert result == {"status": "complete", "id": 42} class TestPowerFxConditionalImport: """The _declarative_base module should be importable without dotnet/powerfx.""" def test_import_guard_exists(self): """The powerfx import must be wrapped in try/except.""" import agent_framework_declarative._workflows._declarative_base as base_mod assert hasattr(base_mod, "DeclarativeWorkflowState") assert hasattr(base_mod, "Engine") # Engine should either be the real class or None — never an ImportError engine = base_mod.Engine assert engine is None or callable(engine) def test_eval_raises_when_engine_unavailable(self): """eval() should raise RuntimeError when Engine is None.""" import agent_framework_declarative._workflows._declarative_base as base_mod mock_state = MagicMock() mock_state._data: dict[str, Any] = {} mock_state.get = MagicMock(side_effect=lambda k, d=None: mock_state._data.get(k, d)) mock_state.set = MagicMock(side_effect=lambda k, v: mock_state._data.__setitem__(k, v)) state = DeclarativeWorkflowState(mock_state) state.initialize({"name": "test"}) original_engine = base_mod.Engine try: base_mod.Engine = None with pytest.raises(RuntimeError, match="PowerFx is not available"): state.eval("=Local.counter + 1") finally: base_mod.Engine = original_engine def test_eval_passes_through_plain_strings_without_engine(self): """Non-PowerFx strings (no leading '=') should work without Engine.""" import agent_framework_declarative._workflows._declarative_base as base_mod mock_state = MagicMock() mock_state._data: dict[str, Any] = {} mock_state.get = MagicMock(side_effect=lambda k, d=None: mock_state._data.get(k, d)) mock_state.set = MagicMock(side_effect=lambda k, v: mock_state._data.__setitem__(k, v)) state = DeclarativeWorkflowState(mock_state) state.initialize() original_engine = base_mod.Engine try: base_mod.Engine = None assert state.eval("hello world") == "hello world" assert state.eval("") == "" assert state.eval(42) == 42 finally: base_mod.Engine = original_engine class TestExecutorKwargsForwarding: """Workflow run kwargs should be forwarded through executor agent invocations.""" @pytest.mark.asyncio async def test_invoke_agent_forwards_kwargs(self): """InvokeAzureAgentExecutor should forward run_kwargs to agent.run().""" from agent_framework._workflows._const import WORKFLOW_RUN_KWARGS_KEY from agent_framework._workflows._state import State from agent_framework_declarative._workflows._executors_agents import ( InvokeAzureAgentExecutor, ) # Create a mock State with kwargs stored mock_state = MagicMock(spec=State) state_data: dict[str, Any] = {} def mock_get(key, default=None): return state_data.get(key, default) def mock_set(key, value): state_data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) # Store kwargs in state like Workflow.run() does test_kwargs = {"user_token": "abc123", "service_config": {"endpoint": "http://test"}} state_data[WORKFLOW_RUN_KWARGS_KEY] = test_kwargs # Initialize declarative state dws = DeclarativeWorkflowState(mock_state) dws.initialize({"input": "hello"}) # Create a mock agent mock_response = MagicMock() mock_response.text = "response text" mock_response.messages = [] mock_response.tool_calls = [] mock_agent = AsyncMock() mock_agent.run = AsyncMock(return_value=mock_response) # Create a mock workflow context mock_ctx = MagicMock() mock_ctx.get_state = MagicMock(side_effect=mock_get) mock_ctx.yield_output = AsyncMock() executor = InvokeAzureAgentExecutor.__new__(InvokeAzureAgentExecutor) executor._agents = {"test_agent": mock_agent} await executor._invoke_agent_and_store_results( agent=mock_agent, agent_name="test_agent", input_text="hello", state=dws, ctx=mock_ctx, messages_var=None, response_obj_var=None, result_property=None, auto_send=True, ) # Verify agent.run was called with kwargs mock_agent.run.assert_called_once() call_kwargs = mock_agent.run.call_args # Check options contains additional_function_arguments assert "options" in call_kwargs.kwargs assert call_kwargs.kwargs["options"]["additional_function_arguments"] == test_kwargs # Check direct kwargs were passed assert call_kwargs.kwargs.get("user_token") == "abc123" assert call_kwargs.kwargs.get("service_config") == {"endpoint": "http://test"} @pytest.mark.asyncio async def test_invoke_agent_merges_caller_options(self): """Caller-provided options in run_kwargs should be merged, not cause TypeError.""" from agent_framework._workflows._const import WORKFLOW_RUN_KWARGS_KEY from agent_framework._workflows._state import State from agent_framework_declarative._workflows._executors_agents import ( InvokeAzureAgentExecutor, ) mock_state = MagicMock(spec=State) state_data: dict[str, Any] = {} def mock_get(key, default=None): return state_data.get(key, default) def mock_set(key, value): state_data[key] = value mock_state.get = MagicMock(side_effect=mock_get) mock_state.set = MagicMock(side_effect=mock_set) # Include 'options' in run_kwargs to test merge behavior test_kwargs = { "user_token": "abc123", "options": {"temperature": 0.5}, } state_data[WORKFLOW_RUN_KWARGS_KEY] = test_kwargs dws = DeclarativeWorkflowState(mock_state) dws.initialize({"input": "hello"}) mock_response = MagicMock() mock_response.text = "response text" mock_response.messages = [] mock_response.tool_calls = [] mock_agent = AsyncMock() mock_agent.run = AsyncMock(return_value=mock_response) mock_ctx = MagicMock() mock_ctx.get_state = MagicMock(side_effect=mock_get) mock_ctx.yield_output = AsyncMock() executor = InvokeAzureAgentExecutor.__new__(InvokeAzureAgentExecutor) executor._agents = {"test_agent": mock_agent} await executor._invoke_agent_and_store_results( agent=mock_agent, agent_name="test_agent", input_text="hello", state=dws, ctx=mock_ctx, messages_var=None, response_obj_var=None, result_property=None, auto_send=True, ) mock_agent.run.assert_called_once() call_kwargs = mock_agent.run.call_args # Caller options should be merged with additional_function_arguments merged_options = call_kwargs.kwargs["options"] assert merged_options["temperature"] == 0.5 assert "additional_function_arguments" in merged_options # Direct kwargs should be passed without 'options' (no duplicate keyword) assert call_kwargs.kwargs.get("user_token") == "abc123"