mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Python: Enforce approval_mode in Claude and GitHub Copilot agents (#5562)
* Python: Enforce approval_mode in Claude and GitHub Copilot agents Tools declared with approval_mode="always_require" were bypassed by the ClaudeAgent and GitHubCopilotAgent because their SDK-managed tool-calling loops invoke FunctionTool.invoke() directly via package-supplied handlers, skipping the standard _try_execute_function_calls approval gate. Per discussion on #5494, the fix lives in the agents (not in FunctionTool): any flag added to the tool itself can be spoofed by code with the same level of access, so the security boundary is the agent that owns the tool-calling loop. - Add on_function_approval option to ClaudeAgentOptions and GitHubCopilotOptions. Callback receives a FunctionCallContent describing the pending call and returns bool (sync or async). - Gate FunctionTool.invoke() inside each agent's existing tool-handler closure when approval_mode == "always_require". Default policy is deny; callbacks that raise also deny safely. - Deny path returns a tool-error to the model (Claude: text content; Copilot: ToolResult(result_type="failure", error="approval_denied")) so the LLM can react gracefully instead of silently failing. - Tests for both agents covering: deny by default, sync False, sync True, async True, callback-raises -> deny, no-op for never_require tools. - Samples demonstrating sync, async, and deny-by-default flows for both agents. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address PR review: preserve empty arg dicts, reject runtime approval override - _resolve_function_approval no longer collapses {} into None when building the FunctionCallContent passed to the callback (Claude + Copilot). - Claude _apply_runtime_options and Copilot _run_impl/_stream_updates now raise ValueError if on_function_approval is supplied via per-run options, instead of silently ignoring it. Approval policy must be set at agent construction time. - Drop unnecessary # type: ignore[attr-defined] on Content.name/.arguments in samples (Content is a unified class with both attributes defined). - Add regression tests for the new runtime-options validation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * warning when non callback handler and approval needed --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
Unverified
parent
626b418622
commit
c1cc6ee6df
@@ -4,9 +4,10 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import inspect
|
||||
import logging
|
||||
import sys
|
||||
from collections.abc import AsyncIterable, Awaitable, Callable, MutableMapping, Sequence
|
||||
from collections.abc import AsyncIterable, Awaitable, Callable, Mapping, MutableMapping, Sequence
|
||||
from typing import Any, ClassVar, Generic, Literal, TypedDict, overload
|
||||
|
||||
if sys.version_info >= (3, 11):
|
||||
@@ -59,6 +60,59 @@ DEFAULT_TIMEOUT_SECONDS: float = 60.0
|
||||
PermissionHandlerType = Callable[[PermissionRequest, dict[str, str]], PermissionRequestResult]
|
||||
"""Type for permission request handlers."""
|
||||
|
||||
|
||||
FunctionApprovalCallback = Callable[[Content], "bool | Awaitable[bool]"]
|
||||
"""Callback invoked by the agent before executing a FunctionTool that requires approval.
|
||||
|
||||
The callback receives a ``FunctionCallContent`` describing the pending call
|
||||
(``name``, ``arguments``, and a synthetic ``call_id``) and must return ``True``
|
||||
to allow execution or ``False`` to deny it. Both synchronous and ``await``-able
|
||||
return values are supported.
|
||||
|
||||
The Copilot CLI manages its own tool-calling loop, so the framework cannot
|
||||
round-trip a ``FunctionApprovalRequestContent`` / ``FunctionApprovalResponseContent``
|
||||
pair the way the standard chat-client pipeline does. This callback is the
|
||||
agent-level enforcement point for tools declared with
|
||||
``approval_mode="always_require"``: when no callback is configured the agent
|
||||
denies these calls by default.
|
||||
|
||||
Note: this is independent of ``on_permission_request``, which gates the
|
||||
Copilot SDK's *built-in* shell/file actions; ``on_function_approval`` gates
|
||||
agent-framework ``FunctionTool`` calls.
|
||||
"""
|
||||
|
||||
|
||||
async def _resolve_function_approval(
|
||||
callback: FunctionApprovalCallback | None,
|
||||
func_tool: FunctionTool,
|
||||
arguments: Mapping[str, Any] | None,
|
||||
) -> bool:
|
||||
"""Run the agent-level approval callback for a pending tool call.
|
||||
|
||||
Returns ``True`` only when ``callback`` is configured and explicitly returns
|
||||
a truthy value. A missing callback or any callback failure is treated as a
|
||||
denial so the secure-by-default policy holds even if the user code raises.
|
||||
"""
|
||||
if callback is None:
|
||||
return False
|
||||
request = Content.from_function_call(
|
||||
call_id=f"af-copilot-approval::{func_tool.name}",
|
||||
name=func_tool.name,
|
||||
arguments=None if arguments is None else dict(arguments),
|
||||
)
|
||||
try:
|
||||
outcome = callback(request)
|
||||
if inspect.isawaitable(outcome):
|
||||
outcome = await outcome
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"on_function_approval callback raised for tool '%s'; denying execution.",
|
||||
func_tool.name,
|
||||
)
|
||||
return False
|
||||
return bool(outcome)
|
||||
|
||||
|
||||
logger = logging.getLogger("agent_framework.github_copilot")
|
||||
|
||||
|
||||
@@ -133,6 +187,14 @@ class GitHubCopilotOptions(TypedDict, total=False):
|
||||
instead of the default GitHub Copilot backend.
|
||||
"""
|
||||
|
||||
on_function_approval: FunctionApprovalCallback
|
||||
"""Approval callback for ``FunctionTool`` instances declared with
|
||||
``approval_mode="always_require"``. The callback is awaited (sync or async)
|
||||
inside the SDK tool-handler before the tool is executed; a falsy return
|
||||
value denies the call. If omitted, calls to such tools are denied with an
|
||||
explanatory message returned to the model. This is independent of
|
||||
``on_permission_request``, which gates the Copilot SDK's built-in actions."""
|
||||
|
||||
|
||||
OptionsT = TypeVar(
|
||||
"OptionsT",
|
||||
@@ -238,6 +300,7 @@ class RawGitHubCopilotAgent(BaseAgent, Generic[OptionsT]):
|
||||
on_permission_request: PermissionHandlerType | None = opts.pop("on_permission_request", None)
|
||||
mcp_servers: dict[str, MCPServerConfig] | None = opts.pop("mcp_servers", None)
|
||||
provider: ProviderConfig | None = opts.pop("provider", None)
|
||||
on_function_approval: FunctionApprovalCallback | None = opts.pop("on_function_approval", None)
|
||||
|
||||
self._settings = load_settings(
|
||||
GitHubCopilotSettings,
|
||||
@@ -252,6 +315,7 @@ class RawGitHubCopilotAgent(BaseAgent, Generic[OptionsT]):
|
||||
|
||||
self._tools = normalize_tools(tools)
|
||||
self._permission_handler = on_permission_request
|
||||
self._function_approval_handler: FunctionApprovalCallback | None = on_function_approval
|
||||
self._mcp_servers = mcp_servers
|
||||
self._provider = provider
|
||||
self._default_options = opts
|
||||
@@ -425,6 +489,12 @@ class RawGitHubCopilotAgent(BaseAgent, Generic[OptionsT]):
|
||||
session = self.create_session()
|
||||
|
||||
opts: dict[str, Any] = dict(options) if options else {}
|
||||
if "on_function_approval" in opts:
|
||||
raise ValueError(
|
||||
"on_function_approval is a security-sensitive option and must be set "
|
||||
"via default_options at agent construction time. It cannot be overridden "
|
||||
"per run."
|
||||
)
|
||||
timeout = opts.get("timeout") or self._settings.get("timeout") or DEFAULT_TIMEOUT_SECONDS
|
||||
|
||||
input_messages = normalize_messages(messages)
|
||||
@@ -504,6 +574,12 @@ class RawGitHubCopilotAgent(BaseAgent, Generic[OptionsT]):
|
||||
session = self.create_session()
|
||||
|
||||
opts: dict[str, Any] = dict(options) if options else {}
|
||||
if "on_function_approval" in opts:
|
||||
raise ValueError(
|
||||
"on_function_approval is a security-sensitive option and must be set "
|
||||
"via default_options at agent construction time. It cannot be overridden "
|
||||
"per run."
|
||||
)
|
||||
|
||||
input_messages = normalize_messages(messages)
|
||||
|
||||
@@ -681,10 +757,33 @@ class RawGitHubCopilotAgent(BaseAgent, Generic[OptionsT]):
|
||||
|
||||
def _tool_to_copilot_tool(self, ai_func: FunctionTool) -> CopilotTool:
|
||||
"""Convert an FunctionTool to a Copilot SDK tool."""
|
||||
approval_handler = self._function_approval_handler
|
||||
requires_approval = ai_func.approval_mode == "always_require"
|
||||
|
||||
async def handler(invocation: ToolInvocation) -> ToolResult:
|
||||
args: dict[str, Any] = invocation.arguments or {}
|
||||
try:
|
||||
if requires_approval and not await _resolve_function_approval(approval_handler, ai_func, args):
|
||||
deny_text = (
|
||||
f"Tool '{ai_func.name}' requires human approval "
|
||||
"(approval_mode='always_require') and the request was denied."
|
||||
if approval_handler is not None
|
||||
else (
|
||||
f"Tool '{ai_func.name}' requires human approval "
|
||||
"(approval_mode='always_require') but no on_function_approval "
|
||||
"callback is configured on the agent; the request was denied."
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"Denying execution of tool '%s' (approval_mode='always_require', %s)",
|
||||
ai_func.name,
|
||||
"callback denied" if approval_handler is not None else "no callback configured",
|
||||
)
|
||||
return ToolResult(
|
||||
text_result_for_llm=deny_text,
|
||||
result_type="failure",
|
||||
error="approval_denied",
|
||||
)
|
||||
if ai_func.input_model:
|
||||
args_instance = ai_func.input_model(**args)
|
||||
result = await ai_func.invoke(arguments=args_instance)
|
||||
|
||||
@@ -1483,6 +1483,183 @@ class TestGitHubCopilotAgentToolConversion:
|
||||
assert result[1] == copilot_tool
|
||||
|
||||
|
||||
class TestGitHubCopilotAgentFunctionApproval:
|
||||
"""Tests that ``approval_mode='always_require'`` is enforced at the agent boundary."""
|
||||
|
||||
async def test_handler_denies_when_no_callback_configured(
|
||||
self,
|
||||
mock_client: MagicMock,
|
||||
) -> None:
|
||||
"""Approval-required tool must be denied without executing when no callback is set."""
|
||||
from agent_framework import tool
|
||||
|
||||
invocations: list[Any] = []
|
||||
|
||||
@tool(approval_mode="always_require")
|
||||
def dangerous(path: str) -> str:
|
||||
"""A tool that requires human approval."""
|
||||
invocations.append(path)
|
||||
return f"deleted {path}"
|
||||
|
||||
agent = GitHubCopilotAgent(client=mock_client)
|
||||
copilot_tool = agent._tool_to_copilot_tool(dangerous) # type: ignore[reportPrivateUsage]
|
||||
|
||||
result = await copilot_tool.handler(ToolInvocation(arguments={"path": "/critical"}))
|
||||
|
||||
assert invocations == []
|
||||
assert result.result_type == "failure"
|
||||
assert result.error == "approval_denied"
|
||||
assert "no on_function_approval callback is configured" in result.text_result_for_llm
|
||||
|
||||
async def test_handler_denies_when_callback_returns_false(
|
||||
self,
|
||||
mock_client: MagicMock,
|
||||
) -> None:
|
||||
"""Falsy callback return value must deny the call and skip execution."""
|
||||
from agent_framework import Content, tool
|
||||
|
||||
invocations: list[Any] = []
|
||||
seen: list[Content] = []
|
||||
|
||||
def deny(call: Content) -> bool:
|
||||
seen.append(call)
|
||||
return False
|
||||
|
||||
@tool(approval_mode="always_require")
|
||||
def dangerous(path: str) -> str:
|
||||
"""A tool that requires human approval."""
|
||||
invocations.append(path)
|
||||
return f"deleted {path}"
|
||||
|
||||
agent = GitHubCopilotAgent(
|
||||
client=mock_client,
|
||||
default_options={"on_function_approval": deny},
|
||||
)
|
||||
copilot_tool = agent._tool_to_copilot_tool(dangerous) # type: ignore[reportPrivateUsage]
|
||||
|
||||
result = await copilot_tool.handler(ToolInvocation(arguments={"path": "/critical"}))
|
||||
|
||||
assert invocations == []
|
||||
assert len(seen) == 1
|
||||
assert seen[0].type == "function_call"
|
||||
assert seen[0].name == "dangerous" # type: ignore[attr-defined]
|
||||
assert seen[0].arguments == {"path": "/critical"} # type: ignore[attr-defined]
|
||||
assert result.result_type == "failure"
|
||||
assert result.error == "approval_denied"
|
||||
|
||||
async def test_handler_executes_when_callback_returns_true(
|
||||
self,
|
||||
mock_client: MagicMock,
|
||||
) -> None:
|
||||
"""Truthy callback return value must allow the tool to execute normally."""
|
||||
from agent_framework import Content, tool
|
||||
|
||||
def approve(call: Content) -> bool:
|
||||
return True
|
||||
|
||||
@tool(approval_mode="always_require")
|
||||
def guarded(x: int) -> str:
|
||||
"""A tool that requires human approval."""
|
||||
return f"result={x}"
|
||||
|
||||
agent = GitHubCopilotAgent(
|
||||
client=mock_client,
|
||||
default_options={"on_function_approval": approve},
|
||||
)
|
||||
copilot_tool = agent._tool_to_copilot_tool(guarded) # type: ignore[reportPrivateUsage]
|
||||
|
||||
result = await copilot_tool.handler(ToolInvocation(arguments={"x": 42}))
|
||||
|
||||
assert result.result_type == "success"
|
||||
assert result.text_result_for_llm == "result=42"
|
||||
|
||||
async def test_handler_supports_async_callback(
|
||||
self,
|
||||
mock_client: MagicMock,
|
||||
) -> None:
|
||||
"""Async callback must be awaited and respected."""
|
||||
from agent_framework import Content, tool
|
||||
|
||||
async def approve(call: Content) -> bool:
|
||||
return True
|
||||
|
||||
@tool(approval_mode="always_require")
|
||||
def guarded(x: int) -> str:
|
||||
"""A tool that requires human approval."""
|
||||
return f"async={x}"
|
||||
|
||||
agent = GitHubCopilotAgent(
|
||||
client=mock_client,
|
||||
default_options={"on_function_approval": approve},
|
||||
)
|
||||
copilot_tool = agent._tool_to_copilot_tool(guarded) # type: ignore[reportPrivateUsage]
|
||||
|
||||
result = await copilot_tool.handler(ToolInvocation(arguments={"x": 7}))
|
||||
|
||||
assert result.result_type == "success"
|
||||
assert result.text_result_for_llm == "async=7"
|
||||
|
||||
async def test_callback_failure_denies_safely(
|
||||
self,
|
||||
mock_client: MagicMock,
|
||||
) -> None:
|
||||
"""A callback that raises must result in denial, not in tool execution."""
|
||||
from agent_framework import Content, tool
|
||||
|
||||
invocations: list[Any] = []
|
||||
|
||||
def boom(call: Content) -> bool:
|
||||
raise RuntimeError("nope")
|
||||
|
||||
@tool(approval_mode="always_require")
|
||||
def dangerous(x: int) -> str:
|
||||
"""A tool that requires human approval."""
|
||||
invocations.append(x)
|
||||
return f"x={x}"
|
||||
|
||||
agent = GitHubCopilotAgent(
|
||||
client=mock_client,
|
||||
default_options={"on_function_approval": boom},
|
||||
)
|
||||
copilot_tool = agent._tool_to_copilot_tool(dangerous) # type: ignore[reportPrivateUsage]
|
||||
|
||||
result = await copilot_tool.handler(ToolInvocation(arguments={"x": 1}))
|
||||
|
||||
assert invocations == []
|
||||
assert result.result_type == "failure"
|
||||
assert result.error == "approval_denied"
|
||||
|
||||
async def test_handler_does_not_invoke_callback_for_never_require(
|
||||
self,
|
||||
mock_client: MagicMock,
|
||||
) -> None:
|
||||
"""Tools without approval_mode='always_require' must not trigger the callback."""
|
||||
from agent_framework import Content, tool
|
||||
|
||||
callback_calls: list[Any] = []
|
||||
|
||||
def approve(call: Content) -> bool:
|
||||
callback_calls.append(call)
|
||||
return True
|
||||
|
||||
@tool
|
||||
def safe(x: int) -> str:
|
||||
"""A tool that does not require approval."""
|
||||
return f"safe={x}"
|
||||
|
||||
agent = GitHubCopilotAgent(
|
||||
client=mock_client,
|
||||
default_options={"on_function_approval": approve},
|
||||
)
|
||||
copilot_tool = agent._tool_to_copilot_tool(safe) # type: ignore[reportPrivateUsage]
|
||||
|
||||
result = await copilot_tool.handler(ToolInvocation(arguments={"x": 5}))
|
||||
|
||||
assert callback_calls == []
|
||||
assert result.result_type == "success"
|
||||
assert result.text_result_for_llm == "safe=5"
|
||||
|
||||
|
||||
class TestGitHubCopilotAgentErrorHandling:
|
||||
"""Test cases for error handling."""
|
||||
|
||||
@@ -2128,3 +2305,16 @@ class TestGitHubCopilotAgentContextProviders:
|
||||
await agent.run("Hello", session=session, options={"timeout": 120})
|
||||
|
||||
assert observed_options.get("timeout") == 120
|
||||
|
||||
async def test_runtime_on_function_approval_rejected(self, mock_client: MagicMock) -> None:
|
||||
"""Passing on_function_approval at runtime must raise rather than be silently ignored."""
|
||||
agent = GitHubCopilotAgent(client=mock_client)
|
||||
with pytest.raises(ValueError, match="on_function_approval"):
|
||||
await agent.run("hello", options={"on_function_approval": lambda _c: True})
|
||||
|
||||
async def test_runtime_on_function_approval_rejected_streaming(self, mock_client: MagicMock) -> None:
|
||||
"""Passing on_function_approval at runtime must raise on the streaming path too."""
|
||||
agent = GitHubCopilotAgent(client=mock_client)
|
||||
with pytest.raises(ValueError, match="on_function_approval"):
|
||||
async for _ in agent.run("hello", stream=True, options={"on_function_approval": lambda _c: True}):
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user