diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_mcp.py b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_mcp.py index 9a7d918704..af43518165 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_mcp.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_mcp.py @@ -10,16 +10,11 @@ optional conversation history. Supports a human-in-loop approval flow via Security notes: -- The executor never echoes header VALUES (auth tokens, API keys) into the - approval request — only header NAMES are surfaced to the caller. This - matches the security posture of :mod:`._executors_http` (which never logs - request headers either) and prevents secrets from leaking through workflow - events that are typically observable to operators / UIs. -- The :class:`MCPToolApprovalRequest` payload is the source of truth for the - resumed invocation: ``tool_name``, ``server_url``, ``server_label``, - ``arguments``, and ``connection_name`` come from the request the reviewer - approved. Headers are re-evaluated from the action definition on resume so - that secret values are not persisted in the workflow's checkpoint state. +- Approval requests surface header NAMES only; header values are not echoed, + matching the posture of :mod:`._executors_http`. +- :class:`MCPToolApprovalRequest` carries the values the resume handler will + use; header values are re-evaluated on resume to keep secrets out of + checkpoint state. - Tool outputs flow back into agent conversations through ``conversationId`` and through Tool-role messages emitted to ``output.messages``. They share the same prompt-injection risk surface as ``HttpRequestAction``: workflow @@ -69,23 +64,16 @@ logger = logging.getLogger(__name__) class MCPToolApprovalRequest: """Approval request emitted before invoking an MCP tool. 
- Mirrors :class:`agent_framework_declarative.ToolApprovalRequest` but for - MCP-style invocations. Only header NAMES are surfaced — header values are - intentionally omitted because they typically carry authentication - secrets. - Attributes: - request_id: Unique identifier for this approval request. Matches the - id workflow event-emitters use. - tool_name: Evaluated name of the tool to be invoked. + request_id: Identifier matching the framework's pending-request key. + tool_name: Evaluated tool name. server_url: Evaluated MCP server URL. - server_label: Optional human-readable label for diagnostics. - arguments: Evaluated arguments to be forwarded to the tool. - header_names: Sorted list of outbound header names (no values). Empty - when no headers are configured. - connection_name: Optional connection identifier the invocation will - use. Surfaced so the reviewer can see which connection is bound - to the approved call. + server_label: Optional human-readable label. + arguments: Evaluated tool arguments. + header_names: Outbound header names (values withheld). + connection_name: Connection identifier the invocation will use. + metadata: Internal routing data pinned at approval-request time + (e.g. ``conversation_id``) for use by the resume handler. """ request_id: str @@ -95,6 +83,7 @@ class MCPToolApprovalRequest: arguments: dict[str, Any] header_names: list[str] = field(default_factory=lambda: []) connection_name: str | None = None + metadata: dict[str, Any] = field(default_factory=lambda: {}) # --------------------------------------------------------------------------- @@ -102,21 +91,15 @@ class MCPToolApprovalRequest: # --------------------------------------------------------------------------- -def _get_messages_path(state: DeclarativeWorkflowState, conversation_id_expr: str | None) -> str | None: - """Return the configured conversation messages path, if any. 
def _evaluate_conversation_id(state: DeclarativeWorkflowState, conversation_id_expr: Any) -> str | None:
    """Resolve the action's ``conversationId`` setting to a concrete id string.

    Args:
        state: Workflow state; its ``eval_if_expression`` is used to evaluate
            the configured value.
        conversation_id_expr: Raw ``conversationId`` value from the action
            definition. Anything that is not a non-empty ``str`` is treated
            as "not configured" and is never evaluated.

    Returns:
        The evaluated id as a string, or ``None`` when nothing is configured
        or the value evaluates to ``None``, an empty string, or a
        whitespace-only string.
    """
    if not isinstance(conversation_id_expr, str) or not conversation_id_expr:
        return None

    evaluated = state.eval_if_expression(conversation_id_expr)
    if evaluated is None:
        return None

    # Non-string results (e.g. numbers) are stringified so the return type is
    # always ``str | None``.
    text = str(evaluated)

    # The id ends up as a path segment in
    # ``System.conversations.{id}.messages``; a blank segment is never a
    # meaningful conversation, so treat whitespace-only the same as empty.
    # Non-blank ids are returned unmodified (no stripping).
    return text if text.strip() else None
@@ -273,7 +257,7 @@ class InvokeMcpToolActionExecutor(DeclarativeActionExecutor): state=state, result=result, auto_send=auto_send, - conversation_id_expr=conversation_id_expr if isinstance(conversation_id_expr, str) else None, + conversation_id=_evaluate_conversation_id(state, conversation_id_expr), output_messages_path=output_messages_path, output_result_path=output_result_path, ) @@ -288,25 +272,19 @@ class InvokeMcpToolActionExecutor(DeclarativeActionExecutor): response: ToolApprovalResponse, ctx: WorkflowContext[ActionComplete, str], ) -> None: - """Resume after the workflow yielded for an approval request. - - Invocation fields (``tool_name``, ``server_url``, ``server_label``, - ``arguments``, ``connection_name``) are sourced from - ``original_request``. Output configuration is re-derived from the - action definition; header values are re-evaluated from the action - definition so secrets remain out of checkpoint state. - """ + """Resume the invocation using the values pinned on ``original_request``.""" state = self._get_state(ctx.state) tool_name = original_request.tool_name server_url = original_request.server_url server_label = original_request.server_label arguments = original_request.arguments - connection_name = original_request.connection_name + connection_name = getattr(original_request, "connection_name", None) + metadata: dict[str, Any] = getattr(original_request, "metadata", None) or {} + raw_conversation_id = metadata.get("conversation_id") + conversation_id = str(raw_conversation_id) if isinstance(raw_conversation_id, str) and raw_conversation_id else None auto_send = self._get_auto_send(state) - conversation_id_value = self._action_def.get("conversationId") - conversation_id_expr = conversation_id_value if isinstance(conversation_id_value, str) else None output_messages_path = _get_output_path(self._action_def, "messages") output_result_path = _get_output_path(self._action_def, "result") @@ -321,7 +299,6 @@ class 
InvokeMcpToolActionExecutor(DeclarativeActionExecutor): await ctx.send_message(ActionComplete()) return - # Approved — re-evaluate headers (not surfaced at approval time for security). headers = self._evaluate_headers(state, self._action_def.get("headers")) invocation = MCPToolInvocation( @@ -338,7 +315,7 @@ class InvokeMcpToolActionExecutor(DeclarativeActionExecutor): state=state, result=result, auto_send=auto_send, - conversation_id_expr=conversation_id_expr, + conversation_id=conversation_id, output_messages_path=output_messages_path, output_result_path=output_result_path, ) @@ -499,7 +476,7 @@ class InvokeMcpToolActionExecutor(DeclarativeActionExecutor): state: DeclarativeWorkflowState, result: MCPToolResult, auto_send: bool, - conversation_id_expr: str | None, + conversation_id: str | None, output_messages_path: str | None, output_result_path: str | None, ) -> None: @@ -528,14 +505,10 @@ class InvokeMcpToolActionExecutor(DeclarativeActionExecutor): if auto_send and parsed_results: await ctx.yield_output(_format_outputs_for_send(parsed_results)) - if conversation_id_expr: - messages_path = _get_messages_path(state, conversation_id_expr) - if messages_path is not None: - # Mirrors .NET: conversation gets ASSISTANT-role message with - # the same outputs (so chat history reads it as the agent's - # contribution). 
- assistant_message = Message(role="assistant", contents=list(result.outputs)) - state.append(messages_path, assistant_message) + if conversation_id: + messages_path = f"System.conversations.{conversation_id}.messages" + assistant_message = Message(role="assistant", contents=list(result.outputs)) + state.append(messages_path, assistant_message) @staticmethod def _assign_error( diff --git a/python/packages/declarative/tests/test_declarative_approval_binding.py b/python/packages/declarative/tests/test_declarative_approval_binding.py index f96b16ee8a..ba0d4108f1 100644 --- a/python/packages/declarative/tests/test_declarative_approval_binding.py +++ b/python/packages/declarative/tests/test_declarative_approval_binding.py @@ -21,6 +21,7 @@ The resumed invocation MUST come from the framework-delivered """ import sys +from dataclasses import dataclass from typing import Any from unittest.mock import AsyncMock, MagicMock @@ -51,6 +52,7 @@ from agent_framework_declarative._workflows import ( # noqa: E402 ToolApprovalRequest, ToolApprovalResponse, ) +from agent_framework_declarative._workflows._declarative_base import DeclarativeWorkflowState # noqa: E402 from agent_framework_declarative._workflows._executors_mcp import ( # noqa: E402 InvokeMcpToolActionExecutor, ) @@ -439,3 +441,88 @@ class TestMcpToolApprovalBinding: request = mock_context.request_info.call_args[0][0] assert isinstance(request, MCPToolApprovalRequest) assert request.connection_name == "conn-from-action" + + @pytest.mark.asyncio + async def test_request_payload_pins_conversation_id(self, mock_state, mock_context) -> None: + """Evaluated ``conversationId`` is pinned in ``metadata`` at request-emit time.""" + from agent_framework_declarative._workflows._declarative_base import ActionTrigger + + _seed_state(mock_state) + state = DeclarativeWorkflowState(mock_state) + state.set("Local.targetConversation", "conv-original") + action = self._action() + action["conversationId"] = "=Local.targetConversation" + 
@pytest.mark.asyncio
async def test_resume_routes_output_to_pinned_conversation_not_mutated_state(
    self, mock_state, mock_context
) -> None:
    """Resume appends tool output to the conversation id carried in the request's
    ``metadata``, even though ``Local.targetConversation`` now names another one."""
    pinned_path = "System.conversations.conv-original.messages"
    mutated_path = "System.conversations.conv-mutated.messages"

    _seed_state(mock_state)
    workflow_state = DeclarativeWorkflowState(mock_state)
    # Both conversations start empty; the state variable points at the
    # "mutated" conversation while the request below pins "conv-original".
    for path, value in (
        (pinned_path, []),
        (mutated_path, []),
        ("Local.targetConversation", "conv-mutated"),
    ):
        workflow_state.set(path, value)

    tool_result = MCPToolResult(outputs=[Content.from_text("approved-output")])
    recording_handler = _RecordingMcpHandler(tool_result)
    action_def = self._action()
    action_def["conversationId"] = "=Local.targetConversation"
    executor = InvokeMcpToolActionExecutor(action_def, mcp_tool_handler=recording_handler)

    pinned_request = MCPToolApprovalRequest(
        request_id="r-1",
        tool_name="search",
        server_url="https://mcp.example/api",
        server_label=None,
        arguments={"q": "x"},
        connection_name=None,
        metadata={"conversation_id": "conv-original"},
    )
    approval = ToolApprovalResponse(approved=True)
    await executor.handle_approval_response(pinned_request, approval, mock_context)

    # Exactly one message landed in the pinned conversation ...
    pinned_messages = workflow_state.get(pinned_path) or []
    assert len(pinned_messages) == 1
    # ... and nothing was routed to the conversation the state now points at.
    assert workflow_state.get(mutated_path) == []
str + server_label: str | None + arguments: dict[str, Any] + header_names: list[str] + + _seed_state(mock_state) + handler = _RecordingMcpHandler() + executor = InvokeMcpToolActionExecutor(self._action(), mcp_tool_handler=handler) + + legacy_request = _LegacyMCPApprovalRequest( + request_id="r-1", + tool_name="search", + server_url="https://mcp.example/api", + server_label=None, + arguments={"q": "x"}, + header_names=[], + ) + await executor.handle_approval_response( + legacy_request, # type: ignore[arg-type] + ToolApprovalResponse(approved=True), + mock_context, + ) + + assert handler.call_count == 1 + assert handler.last is not None + assert handler.last.connection_name is None