Python: Add support for function approval flow in Foundry hosted agent (#5666)

* Add support for function approval flow in Foundry hosted agent

* Address comments

* Address comments

* Address comments
This commit is contained in:
Tao Chen
2026-05-07 07:55:26 -07:00
committed by GitHub
Unverified
parent a95493a909
commit 213491da66
5 changed files with 722 additions and 152 deletions
@@ -7,8 +7,11 @@ import base64
import json
import logging
import os
import tempfile
import threading
from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence
from typing import cast
from contextlib import suppress
from typing import Protocol, cast
from agent_framework import (
ChatOptions,
@@ -109,11 +112,105 @@ from typing_extensions import Any
logger = logging.getLogger(__name__)
class ApprovalStorage(Protocol):
    """Structural interface for persisting function approval requests.

    Any object that provides these two coroutine methods satisfies the
    protocol. The stored request is looked up again by its ID when a
    later turn needs the original approval request content.
    """

    async def save_approval_request(self, approval_request_id: str, request: Content) -> None:
        """Save a function approval request under the given ID.

        Args:
            approval_request_id: The ID to store the request under.
            request: The approval request content to persist.
        """
        ...

    async def load_approval_request(self, approval_request_id: str) -> Content:
        """Load a function approval request by its ID.

        Args:
            approval_request_id: The ID the request was saved under.

        Returns:
            The approval request content stored under that ID.
        """
        ...
class InMemoryFunctionApprovalStorage:
    """Approval request storage backed by a plain dictionary.

    Requests are held on the instance only, so they are gone once the
    instance is discarded.
    """

    def __init__(self) -> None:
        # Maps approval request ID -> the approval request content saved for it.
        self._store: dict[str, Content] = {}

    async def save_approval_request(self, approval_request_id: str, request: Content) -> None:
        """Remember ``request`` under ``approval_request_id``.

        Raises:
            ValueError: If a request is already stored under that ID.
        """
        is_duplicate = approval_request_id in self._store
        if is_duplicate:
            raise ValueError(f"Approval request with ID '{approval_request_id}' already exists.")
        self._store[approval_request_id] = request

    async def load_approval_request(self, approval_request_id: str) -> Content:
        """Return the request previously saved under ``approval_request_id``.

        Raises:
            KeyError: If nothing is stored under that ID.
        """
        if approval_request_id in self._store:
            return self._store[approval_request_id]
        raise KeyError(f"Approval request with ID '{approval_request_id}' does not exist.")
class FileBasedFunctionApprovalStorage:
    """A simple file-based storage for function approval requests.

    All requests are kept in a single JSON object on disk, keyed by
    approval request ID.

    Concurrent access from multiple threads in the same process is
    serialized by a ``threading.Lock``, and the on-disk JSON file is
    updated atomically (write to a temp file, flush it to disk, then
    ``os.replace``) so a crash mid-write cannot leave a partially written
    file behind. An empty storage file (for example one left by a crash
    right after the file was first created) is treated as an empty store
    instead of failing JSON parsing.
    """

    def __init__(self, storage_path: str) -> None:
        """Initialize the storage.

        Args:
            storage_path: Path of the JSON file that holds the approval requests.
                The file and its parent directory are created lazily on first use.
        """
        self._storage_path = storage_path
        self._lock = threading.Lock()

    def _create_storage_file_if_not_exists_sync(self) -> None:
        """Lazy-create the storage file (and its parent directory) if it does not already exist.

        Uses exclusive-create mode (``"x"``) so a concurrent creator cannot
        be truncated by an ``open(..., "w")`` after a stale existence check.
        """
        os.makedirs(os.path.dirname(self._storage_path) or ".", exist_ok=True)
        with suppress(FileExistsError), open(self._storage_path, "x", encoding="utf-8") as f:
            json.dump({}, f)

    def _read_all(self) -> dict[str, Any]:
        """Read and parse the whole storage file.

        Returns:
            The stored mapping of approval request ID to serialized request.
            An empty (or whitespace-only) file yields an empty mapping.
        """
        with open(self._storage_path, encoding="utf-8") as f:
            raw = f.read()
        # The initial exclusive-create is not atomic: a crash between creating
        # the file and writing "{}" leaves it empty. Treat that as "no entries"
        # rather than failing every later call with a JSONDecodeError.
        if not raw.strip():
            return {}
        return json.loads(raw)

    def _atomic_write(self, data: dict[str, Any]) -> None:
        """Atomically replace the storage file with the serialized ``data``."""
        directory = os.path.dirname(self._storage_path) or "."
        # Serialize first so any error doesn't leave a partial file behind.
        serialized = json.dumps(data)
        fd, tmp_path = tempfile.mkstemp(prefix=".approvals-", suffix=".tmp", dir=directory)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as tmp:
                tmp.write(serialized)
                # Push the bytes to disk before the rename; otherwise the rename
                # may be persisted ahead of the data it points to.
                tmp.flush()
                os.fsync(tmp.fileno())
            os.replace(tmp_path, self._storage_path)
        except BaseException:
            # Best-effort cleanup of the temp file; the original error is re-raised.
            with suppress(OSError):
                os.unlink(tmp_path)
            raise

    def _save_sync(self, approval_request_id: str, request: Content) -> None:
        """Blocking implementation of ``save_approval_request``.

        Raises:
            ValueError: If a request is already stored under ``approval_request_id``.
        """
        with self._lock:
            self._create_storage_file_if_not_exists_sync()
            data = self._read_all()
            if approval_request_id in data:
                raise ValueError(f"Approval request with ID '{approval_request_id}' already exists.")
            data[approval_request_id] = request.to_dict()
            self._atomic_write(data)

    def _load_sync(self, approval_request_id: str) -> Content:
        """Blocking implementation of ``load_approval_request``.

        Raises:
            KeyError: If no request is stored under ``approval_request_id``.
        """
        with self._lock:
            self._create_storage_file_if_not_exists_sync()
            data = self._read_all()
            if approval_request_id not in data:
                raise KeyError(f"Approval request with ID '{approval_request_id}' does not exist.")
            return Content.from_dict(data[approval_request_id])

    async def save_approval_request(self, approval_request_id: str, request: Content) -> None:
        """Save a function approval request under the given ID.

        The blocking file I/O runs in a worker thread via ``asyncio.to_thread``.

        Args:
            approval_request_id: The ID to store the request under.
            request: The approval request content; it is serialized with ``to_dict()``.

        Raises:
            ValueError: If a request is already stored under that ID.
        """
        await asyncio.to_thread(self._save_sync, approval_request_id, request)

    async def load_approval_request(self, approval_request_id: str) -> Content:
        """Load a function approval request by its ID.

        The blocking file I/O runs in a worker thread via ``asyncio.to_thread``.

        Args:
            approval_request_id: The ID the request was saved under.

        Returns:
            The request rebuilt from its stored dictionary with ``Content.from_dict``.

        Raises:
            KeyError: If no request is stored under that ID.
        """
        return await asyncio.to_thread(self._load_sync, approval_request_id)
class ResponsesHostServer(ResponsesAgentServerHost):
"""A responses server host for an agent."""
# TODO(@taochen): Allow a different checkpoint storage that stores checkpoints externally
CHECKPOINT_STORAGE_PATH = "/.checkpoints"
FUNCTION_APPROVAL_STORAGE_PATH = "/.function_approvals/approval_requests.json"
def __init__(
self,
@@ -171,6 +268,11 @@ class ResponsesHostServer(ResponsesAgentServerHost):
self._is_workflow_agent = True
self._agent = agent
self._approval_storage = (
FileBasedFunctionApprovalStorage(self.FUNCTION_APPROVAL_STORAGE_PATH)
if self.config.is_hosted
else InMemoryFunctionApprovalStorage()
)
self.response_handler(self._handle_response) # pyright: ignore[reportUnknownMemberType]
async def _handle_response(
@@ -192,10 +294,15 @@ class ResponsesHostServer(ResponsesAgentServerHost):
) -> AsyncIterable[ResponseStreamEvent | dict[str, Any]]:
"""Handle the creation of a response for a regular (non-workflow) agent."""
input_items = await context.get_input_items()
input_messages = _items_to_messages(input_items)
input_messages = await _items_to_messages(input_items, approval_storage=self._approval_storage)
history = await context.get_history()
run_kwargs: dict[str, Any] = {"messages": [*_output_items_to_messages(history), *input_messages]}
run_kwargs: dict[str, Any] = {
"messages": [
*(await _output_items_to_messages(history, approval_storage=self._approval_storage)),
*input_messages,
]
}
is_streaming_request = request.stream is not None and request.stream is True
chat_options, are_options_set = _to_chat_options(request)
@@ -216,7 +323,11 @@ class ResponsesHostServer(ResponsesAgentServerHost):
for message in response.messages:
for content in message.contents:
async for item in _to_outputs(response_event_stream, content):
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
yield response_event_stream.emit_completed()
@@ -232,7 +343,11 @@ class ResponsesHostServer(ResponsesAgentServerHost):
for event in tracker.handle(content):
yield event
if tracker.needs_async:
async for item in _to_outputs(response_event_stream, content):
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
tracker.needs_async = False
@@ -254,7 +369,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
by the hosting infrastructure or files will be preserved upon deactivation.
"""
input_items = await context.get_input_items()
input_messages = _items_to_messages(input_items)
input_messages = await _items_to_messages(input_items)
is_streaming_request = request.stream is not None and request.stream is True
_, are_options_set = _to_chat_options(request)
@@ -581,26 +696,32 @@ def _to_chat_options(request: CreateResponse) -> tuple[ChatOptions, bool]:
# region Input Message Conversion
def _items_to_messages(input_items: Sequence[Item]) -> list[Message]:
async def _items_to_messages(
input_items: Sequence[Item], *, approval_storage: ApprovalStorage | None = None
) -> list[Message]:
"""Converts a sequence of input items to a list of Messages, one per item.
Args:
input_items: The input items to convert.
approval_storage: An optional ApprovalStorage instance used to look up
approval requests when converting MCP approval response items.
Returns:
A list of Messages, one per supported input item.
"""
messages: list[Message] = []
for item in input_items:
messages.append(_item_to_message(item))
messages.append(await _item_to_message(item, approval_storage=approval_storage))
return messages
def _item_to_message(item: Item) -> Message:
async def _item_to_message(item: Item, *, approval_storage: ApprovalStorage | None = None) -> Message:
"""Converts an Item to a Message.
Args:
item: The Item to convert.
approval_storage: An optional ApprovalStorage instance used to look up
approval requests when converting MCP approval response items.
Returns:
The converted Message.
@@ -659,27 +780,26 @@ def _item_to_message(item: Item) -> Message:
if item.type == "mcp_approval_request":
mcp_req = cast(ItemMcpApprovalRequest, item)
mcp_call_content = Content.from_mcp_server_tool_call(
mcp_req.id,
mcp_req.name,
server_name=mcp_req.server_label,
arguments=mcp_req.arguments,
)
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(mcp_req.id)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="assistant",
contents=[Content.from_function_approval_request(mcp_req.id, mcp_call_content)],
contents=[function_approval_request_content],
)
if item.type == "mcp_approval_response":
mcp_resp = cast(MCPApprovalResponse, item)
placeholder_content = Content.from_function_call(mcp_resp.approval_request_id, "mcp_approval")
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(
mcp_resp.approval_request_id
)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="user",
contents=[
Content.from_function_approval_response(
mcp_resp.approve, mcp_resp.approval_request_id, placeholder_content
)
],
contents=[function_approval_request_content.to_function_approval_response(mcp_resp.approve)],
)
if item.type == "code_interpreter_call":
@@ -846,26 +966,34 @@ def _item_to_message(item: Item) -> Message:
raise ValueError(f"Unsupported Item type: {item.type}")
def _output_items_to_messages(history: Sequence[OutputItem]) -> list[Message]:
async def _output_items_to_messages(
history: Sequence[OutputItem],
*,
approval_storage: ApprovalStorage | None = None,
) -> list[Message]:
"""Converts a sequence of OutputItem objects to a list of Message objects.
Args:
history (Sequence[OutputItem]): The sequence of OutputItem objects to convert.
approval_storage (ApprovalStorage | None, optional): The approval storage to use for
resolving MCP approval requests. Defaults to None.
Returns:
list[Message]: The list of Message objects.
"""
messages: list[Message] = []
for item in history:
messages.append(_output_item_to_message(item))
messages.append(await _output_item_to_message(item, approval_storage=approval_storage))
return messages
def _output_item_to_message(item: OutputItem) -> Message:
async def _output_item_to_message(item: OutputItem, *, approval_storage: ApprovalStorage | None = None) -> Message:
"""Converts an OutputItem to a Message.
Args:
item (OutputItem): The OutputItem to convert.
approval_storage (ApprovalStorage | None, optional): The approval storage to use for
resolving MCP approval requests. Defaults to None.
Returns:
Message: The converted Message.
@@ -922,24 +1050,27 @@ def _output_item_to_message(item: OutputItem) -> Message:
if item.type == "mcp_approval_request":
mcp_req = cast(OutputItemMcpApprovalRequest, item)
mcp_call_content = Content.from_mcp_server_tool_call(
mcp_req.id,
mcp_req.name,
server_name=mcp_req.server_label,
arguments=mcp_req.arguments,
)
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(mcp_req.id)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="assistant",
contents=[Content.from_function_approval_request(mcp_req.id, mcp_call_content)],
contents=[function_approval_request_content],
)
if item.type == "mcp_approval_response":
mcp_resp = cast(OutputItemMcpApprovalResponseResource, item)
# Build a placeholder function_call Content since the original call details are not available
placeholder_content = Content.from_function_call(mcp_resp.approval_request_id, "mcp_approval")
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(
mcp_resp.approval_request_id
)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="user",
contents=[Content.from_function_approval_response(mcp_resp.approve, mcp_resp.id, placeholder_content)],
contents=[function_approval_request_content.to_function_approval_response(mcp_resp.approve)],
)
if item.type == "code_interpreter_call":
@@ -1237,12 +1368,18 @@ def _arguments_to_str(arguments: str | Mapping[str, Any] | None) -> str:
return json.dumps(arguments)
async def _to_outputs(stream: ResponseEventStream, content: Content) -> AsyncIterator[ResponseStreamEvent]:
async def _to_outputs(
stream: ResponseEventStream,
content: Content,
*,
approval_storage: ApprovalStorage | None = None,
) -> AsyncIterator[ResponseStreamEvent]:
"""Converts a Content object to an async sequence of ResponseStreamEvent objects.
Args:
stream: The ResponseEventStream to use for building events.
content: The Content to convert.
approval_storage: An optional ApprovalStorage instance to use for saving and loading function approval requests.
Yields:
ResponseStreamEvent: The converted event objects.
@@ -1320,6 +1457,31 @@ async def _to_outputs(stream: ResponseEventStream, content: Content) -> AsyncIte
max_output_length=content.max_output_length,
):
yield event
elif content.type == "function_approval_request":
function_call: Content = content.function_call # type: ignore
server_label = function_call.additional_properties.get("server_label", "agent_framework")
request_saved = False
async for event in stream.aoutput_item_mcp_approval_request(
server_label,
function_call.name, # type: ignore
_arguments_to_str(function_call.arguments),
):
if approval_storage is not None and not request_saved:
# Extract the approval request ID generated by the infrastructure
# when the approval request item is added to the stream. Save the
# approval request to the approval storage so it can be retrieved later
# for round trips where the original approval request needs to be looked up.
item = getattr(event, "item", None)
if item is not None and getattr(item, "id", None) is not None:
approval_request_id = cast(str, item.id) # type: ignore
await approval_storage.save_approval_request(approval_request_id, content)
request_saved = True
yield event
if approval_storage is not None and not request_saved:
logger.warning(
"Approval request was not saved to approval storage because the approval request ID "
"could not be extracted from the stream event."
)
else:
# Log a warning for unsupported content types instead of raising an error to avoid breaking the response stream.
logger.warning(f"Content type '{content.type}' is not supported yet. This is usually safe to ignore.")
File diff suppressed because it is too large Load Diff
@@ -157,7 +157,7 @@ cd agent-framework/python/samples/04-hosting/foundry-hosted-agents/responses
2. Install dependencies:
```bash
pip install -r requirements.txt
uv pip install -r requirements.txt
```
3. Create a `.env` file with your Foundry configuration following the `env.example` file in the sample.
@@ -10,6 +10,16 @@ The agent uses `FoundryChatClient` from the Agent Framework to create a Response
See [main.py](main.py) for the full implementation.
### Tools
Local tools are Python functions decorated with the Agent Framework's `@tool` decorator and registered with the agent. When the model chooses to call a tool during a conversation, the agent executes the corresponding function and returns the result to the model.
Each tool can be configured with one of two approval modes: **always_require** or **never_require**. With **always_require**, the agent requests explicit user approval before every invocation; with **never_require**, the agent invokes the tool automatically. To illustrate both behaviors, this sample defines two tools—one using `always_require` and the other using `never_require`.
When a tool is set to `always_require`, the agent host emits an `mcp_approval_request` output item containing the approval request ID and the details of the pending tool call. Before the agent will proceed, the client must reply with an `mcp_approval_response` that carries the same ID in `approval_request_id` and an `approve` flag indicating whether the user approved or denied the call.
> IMPORTANT: We are temporarily reusing the **mcp_approval_request** and **mcp_approval_response** message types defined in the [AzureAI AgentServer SDK](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-responses/docs/handler-implementation-guide.md#other-tool-call-types) because they map closely to this approval flow. They will likely be superseded by a more formal tool-approval content type in the Responses protocol in the future.
### Agent Hosting
The agent is hosted using the [Agent Framework](https://github.com/microsoft/agent-framework) with the `ResponsesHostServer`, which provisions a REST API endpoint compatible with the OpenAI Responses protocol.
@@ -28,6 +38,24 @@ Send a POST request to the server with a JSON body containing an `"input"` field
curl -X POST http://localhost:8088/responses -H "Content-Type: application/json" -d '{"input": "What is the weather in Seattle?"}'
```
Send a POST request that triggers a tool call configured with `always_require` to see the approval flow in action:
```bash
curl -X POST http://localhost:8088/responses -H "Content-Type: application/json" -d '{"input": "List all the files in the current directory."}'
```
Sample output:
```json
{"id":"caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG","object":"response","output":[{"type":"function_call","id":"fc_3b6cba8c972b1d2f00JIAQktGC1upcB6Dgxp1AVVLp0MoyRTX4","call_id":"call_hWwwZ8lqVQCAuo8ZyY4LXIya","name":"run_bash","arguments":"{\"command\":\"ls -la\"}","status":"completed","response_id":"caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG","agent_reference":null},{"type":"mcp_approval_request","id":"mcpr_3b6cba8c972b1d2f00IdqsjB6iidFmtsuYp6oI1AoAtUKQZxje","server_label":"agent_framework","name":"run_bash","arguments":"{\"command\":\"ls -la\"}","response_id":"caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG","agent_reference":null}],"created_at":1778021855,"model":"","status":"completed","completed_at":1778021865,"response_id":"caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG","agent_reference":{"type":"agent_reference"},"agent_session_id":"8caaaa19598306a1f2fb6d8939ef06874c52c63a83b57681ea4e4b75cf6a179","background":false}
```
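To approve the pending tool call, reply with an `mcp_approval_response` that references the approval request ID and sets `previous_response_id` to the ID of the response that contained the request (send `"approve": false` instead to deny the call):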
```bash
curl -X POST http://localhost:8088/responses -H "Content-Type: application/json" -d '{"input": [{"type": "mcp_approval_response", "approval_request_id": "mcpr_3b6cba8c972b1d2f00IdqsjB6iidFmtsuYp6oI1AoAtUKQZxje", "approve": true}], "previous_response_id": "caresp_3b6cba8c972b1d2f00bXmjpUGzfgSFsmgjtlgqUwqvROwl5lyG"}'
```
## Deploying the Agent to Foundry
To host the agent on Foundry, follow the instructions in the [Deploying the Agent to Foundry](../../README.md#deploying-the-agent-to-foundry) section of the README in the parent directory.
@@ -25,7 +25,7 @@ def get_weather(
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
@tool(approval_mode="never_require")
@tool(approval_mode="always_require")
def run_bash(command: str) -> str:
"""Execute a shell command locally and return stdout, stderr, and exit code."""
try: