Add support for function approval flow in Foundry hosted agent

This commit is contained in:
Tao Chen
2026-05-05 16:24:30 -07:00
Unverified
parent e558d36ff6
commit 81e0043fa3
4 changed files with 692 additions and 151 deletions
@@ -8,7 +8,7 @@ import json
import logging
import os
from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence
from typing import cast
from typing import Protocol, cast
from agent_framework import (
ChatOptions,
@@ -109,11 +109,82 @@ from typing_extensions import Any
logger = logging.getLogger(__name__)
class ApprovalStorage(Protocol):
"""Storage for saving function approval requests."""
async def save_approval_request(self, approval_request_id: str, request: Content) -> None:
"""Save a function approval request under the given ID."""
...
async def load_approval_request(self, approval_request_id: str) -> Content:
"""Load a function approval request by its ID."""
...
class InMemoryFunctionApprovalStorage:
"""An in-memory storage for function approval requests."""
def __init__(self) -> None:
self._store: dict[str, Content] = {}
async def save_approval_request(self, approval_request_id: str, request: Content) -> None:
if approval_request_id in self._store:
raise ValueError(f"Approval request with ID '{approval_request_id}' already exists.")
self._store[approval_request_id] = request
async def load_approval_request(self, approval_request_id: str) -> Content:
if approval_request_id not in self._store:
raise KeyError(f"Approval request with ID '{approval_request_id}' does not exist.")
return self._store[approval_request_id]
class FileBasedFunctionApprovalStorage:
"""A simple file-based storage for function approval requests."""
def __init__(self, storage_path: str) -> None:
self._storage_path = storage_path
def _create_storage_file_if_not_exists_sync(self) -> None:
"""Lazy-create the storage file (and its parent directory) if it does not already exist."""
if not os.path.exists(self._storage_path):
os.makedirs(os.path.dirname(self._storage_path), exist_ok=True)
with open(self._storage_path, "w") as f:
json.dump({}, f)
def _save_sync(self, approval_request_id: str, request: Content) -> None:
self._create_storage_file_if_not_exists_sync()
with open(self._storage_path, "r+") as f:
data = json.load(f)
if approval_request_id in data:
raise ValueError(f"Approval request with ID '{approval_request_id}' already exists.")
data[approval_request_id] = request.to_dict()
# Serialize to a string first so any error doesn't leave the file in a partially written state.
serialized = json.dumps(data)
f.seek(0)
f.write(serialized)
f.truncate()
def _load_sync(self, approval_request_id: str) -> Content:
self._create_storage_file_if_not_exists_sync()
with open(self._storage_path) as f:
data = json.load(f)
if approval_request_id not in data:
raise KeyError(f"Approval request with ID '{approval_request_id}' does not exist.")
return Content.from_dict(data[approval_request_id])
async def save_approval_request(self, approval_request_id: str, request: Content) -> None:
await asyncio.to_thread(self._save_sync, approval_request_id, request)
async def load_approval_request(self, approval_request_id: str) -> Content:
return await asyncio.to_thread(self._load_sync, approval_request_id)
class ResponsesHostServer(ResponsesAgentServerHost):
"""A responses server host for an agent."""
# TODO(@taochen): Allow a different checkpoint storage that stores checkpoints externally
CHECKPOINT_STORAGE_PATH = "/.checkpoints"
FUNCTION_APPROVAL_STORAGE_PATH = "/.function_approvals/approval_requests.json"
def __init__(
self,
@@ -171,6 +242,11 @@ class ResponsesHostServer(ResponsesAgentServerHost):
self._is_workflow_agent = True
self._agent = agent
self._approval_storage = (
FileBasedFunctionApprovalStorage(self.FUNCTION_APPROVAL_STORAGE_PATH.lstrip("/"))
if self.config.is_hosted
else InMemoryFunctionApprovalStorage()
)
self.response_handler(self._handle_response) # pyright: ignore[reportUnknownMemberType]
async def _handle_response(
@@ -192,10 +268,15 @@ class ResponsesHostServer(ResponsesAgentServerHost):
) -> AsyncIterable[ResponseStreamEvent | dict[str, Any]]:
"""Handle the creation of a response for a regular (non-workflow) agent."""
input_items = await context.get_input_items()
input_messages = _items_to_messages(input_items)
input_messages = await _items_to_messages(input_items, approval_storage=self._approval_storage)
history = await context.get_history()
run_kwargs: dict[str, Any] = {"messages": [*_output_items_to_messages(history), *input_messages]}
run_kwargs: dict[str, Any] = {
"messages": [
*(await _output_items_to_messages(history, approval_storage=self._approval_storage)),
*input_messages,
]
}
is_streaming_request = request.stream is not None and request.stream is True
chat_options, are_options_set = _to_chat_options(request)
@@ -216,7 +297,11 @@ class ResponsesHostServer(ResponsesAgentServerHost):
for message in response.messages:
for content in message.contents:
async for item in _to_outputs(response_event_stream, content):
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
yield response_event_stream.emit_completed()
@@ -232,7 +317,11 @@ class ResponsesHostServer(ResponsesAgentServerHost):
for event in tracker.handle(content):
yield event
if tracker.needs_async:
async for item in _to_outputs(response_event_stream, content):
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
tracker.needs_async = False
@@ -254,7 +343,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
by the hosting infrastructure or files will be preserved upon deactivation.
"""
input_items = await context.get_input_items()
input_messages = _items_to_messages(input_items)
input_messages = await _items_to_messages(input_items)
is_streaming_request = request.stream is not None and request.stream is True
_, are_options_set = _to_chat_options(request)
@@ -581,26 +670,32 @@ def _to_chat_options(request: CreateResponse) -> tuple[ChatOptions, bool]:
# region Input Message Conversion
def _items_to_messages(input_items: Sequence[Item]) -> list[Message]:
async def _items_to_messages(
input_items: Sequence[Item], *, approval_storage: ApprovalStorage | None = None
) -> list[Message]:
"""Converts a sequence of input items to a list of Messages, one per item.
Args:
input_items: The input items to convert.
approval_storage: An optional ApprovalStorage instance used to look up
approval requests when converting MCP approval response items.
Returns:
A list of Messages, one per supported input item.
"""
messages: list[Message] = []
for item in input_items:
messages.append(_item_to_message(item))
messages.append(await _item_to_message(item, approval_storage=approval_storage))
return messages
def _item_to_message(item: Item) -> Message:
async def _item_to_message(item: Item, *, approval_storage: ApprovalStorage | None = None) -> Message:
"""Converts an Item to a Message.
Args:
item: The Item to convert.
approval_storage: An optional ApprovalStorage instance used to look up
approval requests when converting MCP approval response items.
Returns:
The converted Message.
@@ -659,27 +754,26 @@ def _item_to_message(item: Item) -> Message:
if item.type == "mcp_approval_request":
mcp_req = cast(ItemMcpApprovalRequest, item)
mcp_call_content = Content.from_mcp_server_tool_call(
mcp_req.id,
mcp_req.name,
server_name=mcp_req.server_label,
arguments=mcp_req.arguments,
)
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(mcp_req.id)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="assistant",
contents=[Content.from_function_approval_request(mcp_req.id, mcp_call_content)],
contents=[function_approval_request_content],
)
if item.type == "mcp_approval_response":
mcp_resp = cast(MCPApprovalResponse, item)
placeholder_content = Content.from_function_call(mcp_resp.approval_request_id, "mcp_approval")
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(
mcp_resp.approval_request_id
)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="user",
contents=[
Content.from_function_approval_response(
mcp_resp.approve, mcp_resp.approval_request_id, placeholder_content
)
],
contents=[function_approval_request_content.to_function_approval_response(mcp_resp.approve)],
)
if item.type == "code_interpreter_call":
@@ -846,26 +940,34 @@ def _item_to_message(item: Item) -> Message:
raise ValueError(f"Unsupported Item type: {item.type}")
def _output_items_to_messages(history: Sequence[OutputItem]) -> list[Message]:
async def _output_items_to_messages(
history: Sequence[OutputItem],
*,
approval_storage: ApprovalStorage | None = None,
) -> list[Message]:
"""Converts a sequence of OutputItem objects to a list of Message objects.
Args:
history (Sequence[OutputItem]): The sequence of OutputItem objects to convert.
approval_storage (ApprovalStorage | None, optional): The approval storage to use for
resolving MCP approval requests. Defaults to None.
Returns:
list[Message]: The list of Message objects.
"""
messages: list[Message] = []
for item in history:
messages.append(_output_item_to_message(item))
messages.append(await _output_item_to_message(item, approval_storage=approval_storage))
return messages
def _output_item_to_message(item: OutputItem) -> Message:
async def _output_item_to_message(item: OutputItem, *, approval_storage: ApprovalStorage | None = None) -> Message:
"""Converts an OutputItem to a Message.
Args:
item (OutputItem): The OutputItem to convert.
approval_storage (ApprovalStorage | None, optional): The approval storage to use for
resolving MCP approval requests. Defaults to None.
Returns:
Message: The converted Message.
@@ -922,24 +1024,27 @@ def _output_item_to_message(item: OutputItem) -> Message:
if item.type == "mcp_approval_request":
mcp_req = cast(OutputItemMcpApprovalRequest, item)
mcp_call_content = Content.from_mcp_server_tool_call(
mcp_req.id,
mcp_req.name,
server_name=mcp_req.server_label,
arguments=mcp_req.arguments,
)
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(mcp_req.id)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="assistant",
contents=[Content.from_function_approval_request(mcp_req.id, mcp_call_content)],
contents=[function_approval_request_content],
)
if item.type == "mcp_approval_response":
mcp_resp = cast(OutputItemMcpApprovalResponseResource, item)
# Build a placeholder function_call Content since the original call details are not available
placeholder_content = Content.from_function_call(mcp_resp.approval_request_id, "mcp_approval")
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(
mcp_resp.approval_request_id
)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="user",
contents=[Content.from_function_approval_response(mcp_resp.approve, mcp_resp.id, placeholder_content)],
contents=[function_approval_request_content.to_function_approval_response(mcp_resp.approve)],
)
if item.type == "code_interpreter_call":
@@ -1237,12 +1342,18 @@ def _arguments_to_str(arguments: str | Mapping[str, Any] | None) -> str:
return json.dumps(arguments)
async def _to_outputs(stream: ResponseEventStream, content: Content) -> AsyncIterator[ResponseStreamEvent]:
async def _to_outputs(
stream: ResponseEventStream,
content: Content,
*,
approval_storage: ApprovalStorage | None = None,
) -> AsyncIterator[ResponseStreamEvent]:
"""Converts a Content object to an async sequence of ResponseStreamEvent objects.
Args:
stream: The ResponseEventStream to use for building events.
content: The Content to convert.
approval_storage: An optional ApprovalStorage instance to use for saving and loading function approval requests.
Yields:
ResponseStreamEvent: The converted event objects.
@@ -1320,6 +1431,28 @@ async def _to_outputs(stream: ResponseEventStream, content: Content) -> AsyncIte
max_output_length=content.max_output_length,
):
yield event
elif content.type == "function_approval_request":
function_call: Content = content.function_call # type: ignore
server_label = function_call.additional_properties.get("server_label", "agent_framework")
approval_request_id: str | None = None
async for event in stream.aoutput_item_mcp_approval_request(
server_label,
function_call.name, # type: ignore
_arguments_to_str(function_call.arguments),
):
yield event
# Extract the approval request ID generated by the infrastructure
# when the approval request item is added to the stream
if (
getattr(event, "item", None) is not None
and getattr(event.item, "id", None) is not None # type: ignore
and approval_request_id is None
):
approval_request_id = cast(str, event.item.id) # type: ignore
# Save the approval request to the approval storage so it can be retrieved later
# for round trips where the original approval request needs to be looked up
if approval_request_id is not None and approval_storage is not None:
await approval_storage.save_approval_request(approval_request_id, content)
else:
# Log a warning for unsupported content types instead of raising an error to avoid breaking the response stream.
logger.warning(f"Content type '{content.type}' is not supported yet. This is usually safe to ignore.")
File diff suppressed because it is too large Load Diff
@@ -10,6 +10,16 @@ The agent uses `FoundryChatClient` from the Agent Framework to create a Response
See [main.py](main.py) for the full implementation.
### Tools
Local tools are Python functions decorated with the Agent Framework's `@tool` decorator and registered with the agent. When the model chooses to call a tool during a conversation, the agent executes the corresponding function and returns the result to the model.
Each tool can be configured with one of two approval modes: **always_require** or **never_require**. With **always_require**, the agent requests explicit user approval before every invocation; with **never_require**, the agent invokes the tool automatically. To illustrate both behaviors, this sample defines two tools—one using `always_require` and the other using `never_require`.
When a tool is set to `always_require`, the agent host emits an `mcp_approval_request` output containing the approval request ID and details of the pending tool call. The client must reply with an `mcp_approval_response` indicating the same request ID and whether the user approved or denied the call before the agent will proceed.
> IMPORTANT: We are temporarily reusing the **mcp_approval_request** and **mcp_approval_response** message types defined in the [AzureAI AgentServer SDK](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-responses/docs/handler-implementation-guide.md#other-tool-call-types) because they map closely to this approval flow. They will likely be superseded by a more formal tool-approval content type in the Responses protocol in the future.
### Agent Hosting
The agent is hosted using the [Agent Framework](https://github.com/microsoft/agent-framework) with the `ResponsesHostServer`, which provisions a REST API endpoint compatible with the OpenAI Responses protocol.
@@ -28,6 +38,24 @@ Send a POST request to the server with a JSON body containing an `"input"` field
curl -X POST http://localhost:8088/responses -H "Content-Type: application/json" -d '{"input": "What is the weather in Seattle?"}'
```
Send a POST request that triggers a tool call configured with `always_require` to see the approval flow in action:
```bash
curl -X POST http://localhost:8088/responses -H "Content-Type: application/json" -d '{"input": "List all the files in the current directory."}'
```
Sample output:
```bash
{"id":"caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG","object":"response","output":[{"type":"function_call","id":"fc_3b6cba8c972b1d2f00JIAQktGC1upcB6Dgxp1AVVLp0MoyRTX4","call_id":"call_hWwwZ8lqVQCAuo8ZyY4LXIya","name":"run_bash","arguments":"{\"command\":\"ls -la\"}","status":"completed","response_id":"caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG","agent_reference":null},{"type":"mcp_approval_request","id":"mcpr_3b6cba8c972b1d2f00IdqsjB6iidFmtsuYp6oI1AoAtUKQZxje","server_label":"agent_framework","name":"run_bash","arguments":"{\"command\":\"ls -la\"}","response_id":"caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG","agent_reference":null}],"created_at":1778021855,"model":"","status":"completed","completed_at":1778021865,"response_id":"caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG","agent_reference":{"type":"agent_reference"},"agent_session_id":"8caaaa19598306a1f2fb6d8939ef06874c52c63a83b57681ea4e4b75cf6a179","background":false}
```
To approve:
```bash
curl -X POST http://localhost:8088/responses -H "Content-Type: application/json" -d '{"input": [{"type": "mcp_approval_response", "approval_request_id": "mcpr_3b6cba8c972b1d2f00IdqsjB6iidFmtsuYp6oI1AoAtUKQZxje", "approve": true}], "previous_response_id": "caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG"}'
```
## Deploying the Agent to Foundry
To host the agent on Foundry, follow the instructions in the [Deploying the Agent to Foundry](../../README.md#deploying-the-agent-to-foundry) section of the README in the parent directory.
@@ -25,7 +25,7 @@ def get_weather(
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
@tool(approval_mode="never_require")
@tool(approval_mode="always_require")
def run_bash(command: str) -> str:
"""Execute a shell command locally and return stdout, stderr, and exit code."""
try: