Python: Introducing AI Function approval (#1131)

* support for local function approval

* small fix

* fix mypy

* added bigger test scenario's for function calling and approvals

* updated lock

* updated return message for rejection

* fix test

* updated function result content handling
This commit is contained in:
Eduard van Valkenburg
2025-10-04 17:19:16 +02:00
committed by GitHub
Unverified
parent 01f438d710
commit fd819c6c02
18 changed files with 1535 additions and 304 deletions
@@ -34,6 +34,7 @@ from agent_framework import (
UsageContent,
UsageDetails,
get_logger,
prepare_function_call_results,
use_chat_middleware,
use_function_invocation,
)
@@ -84,7 +85,7 @@ from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import ConnectionType
from azure.core.credentials_async import AsyncTokenCredential
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
from pydantic import BaseModel, ValidationError
from pydantic import ValidationError
if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
@@ -897,23 +898,9 @@ class AzureAIAgentClient(BaseChatClient):
if isinstance(content, FunctionResultContent):
if tool_outputs is None:
tool_outputs = []
result_contents: list[Any] = (
content.result if isinstance(content.result, list) else [content.result]
tool_outputs.append(
ToolOutput(tool_call_id=call_id, output=prepare_function_call_results(content.result))
)
results: list[Any] = []
for item in result_contents:
if isinstance(item, Contents):
results.append(
json.dumps(item.to_dict(exclude={"raw_representation", "additional_properties"}))
)
elif isinstance(item, BaseModel):
results.append(item.model_dump_json())
else:
results.append(json.dumps(item))
if len(results) == 1:
tool_outputs.append(ToolOutput(tool_call_id=call_id, output=results[0]))
else:
tool_outputs.append(ToolOutput(tool_call_id=call_id, output=json.dumps(results)))
elif isinstance(content, FunctionApprovalResponseContent):
if tool_approvals is None:
tool_approvals = []
@@ -31,6 +31,7 @@ from agent_framework import (
TextContent,
UriContent,
)
from agent_framework._serialization import SerializationMixin
from agent_framework.exceptions import ServiceInitializationError
from azure.ai.agents.models import (
CodeInterpreterToolDefinition,
@@ -1123,7 +1124,7 @@ async def test_azure_ai_chat_client_convert_required_action_to_tool_output_funct
assert tool_outputs is not None
assert len(tool_outputs) == 1
assert tool_outputs[0].tool_call_id == "call_456"
assert tool_outputs[0].output == '"Simple result"'
assert tool_outputs[0].output == "Simple result"
async def test_azure_ai_chat_client_convert_required_action_invalid_call_id(mock_ai_project_client: MagicMock) -> None:
@@ -1155,14 +1156,15 @@ async def test_azure_ai_chat_client_convert_required_action_invalid_structure(
assert tool_approvals is None
async def test_azure_ai_chat_client_convert_required_action_basemodel_results(
async def test_azure_ai_chat_client_convert_required_action_serde_model_results(
mock_ai_project_client: MagicMock,
) -> None:
"""Test _convert_required_action_to_tool_output with BaseModel results."""
class MockResult(BaseModel):
name: str
value: int
class MockResult(SerializationMixin):
def __init__(self, name: str, value: int):
self.name = name
self.value = value
chat_client = create_test_azure_ai_chat_client(mock_ai_project_client, agent_id="test-agent")
@@ -1178,7 +1180,7 @@ async def test_azure_ai_chat_client_convert_required_action_basemodel_results(
assert len(tool_outputs) == 1
assert tool_outputs[0].tool_call_id == "call_456"
# Should use model_dump_json for BaseModel
expected_json = mock_result.model_dump_json()
expected_json = mock_result.to_json()
assert tool_outputs[0].output == expected_json
@@ -1187,8 +1189,9 @@ async def test_azure_ai_chat_client_convert_required_action_multiple_results(
) -> None:
"""Test _convert_required_action_to_tool_output with multiple results."""
class MockResult(BaseModel):
data: str
class MockResult(SerializationMixin):
def __init__(self, data: str):
self.data = data
chat_client = create_test_azure_ai_chat_client(mock_ai_project_client, agent_id="test-agent")
@@ -1206,9 +1209,9 @@ async def test_azure_ai_chat_client_convert_required_action_multiple_results(
# Should JSON dump the entire results array since len > 1
expected_results = [
mock_basemodel.model_dump_json(), # BaseModel uses model_dump_json
json.dumps({"key": "value"}), # Dict uses json.dumps
json.dumps("string_result"), # String uses json.dumps
mock_basemodel.to_dict(),
{"key": "value"},
"string_result",
]
expected_output = json.dumps(expected_results)
assert tool_outputs[0].output == expected_output
+297 -87
View File
@@ -44,6 +44,7 @@ if TYPE_CHECKING:
ChatResponse,
ChatResponseUpdate,
Contents,
FunctionApprovalResponseContent,
FunctionCallContent,
)
@@ -52,6 +53,11 @@ if sys.version_info >= (3, 12):
else:
from typing_extensions import TypedDict # pragma: no cover
if sys.version_info >= (3, 11):
from typing import overload # pragma: no cover
else:
from typing_extensions import overload # pragma: no cover
logger = get_logger()
__all__ = [
@@ -547,6 +553,8 @@ class AIFunction(BaseTool, Generic[ArgsT, ReturnT]):
Args:
name: The name of the function.
description: A description of the function.
approval_mode: Whether or not approval is required to run this tool.
Default is that approval is not needed.
additional_properties: Additional properties to set on the function.
func: The function to wrap.
input_model: The Pydantic model that defines the input parameters for the function.
@@ -579,6 +587,7 @@ class AIFunction(BaseTool, Generic[ArgsT, ReturnT]):
name="get_weather",
description="Get the weather for a location",
func=lambda location, unit="celsius": f"Weather in {location}: 22°{unit[0].upper()}",
approval_mode="never_require",
input_model=WeatherArgs,
)
@@ -594,6 +603,7 @@ class AIFunction(BaseTool, Generic[ArgsT, ReturnT]):
*,
name: str,
description: str = "",
approval_mode: Literal["always_require", "never_require"] | None = None,
additional_properties: dict[str, Any] | None = None,
func: Callable[..., Awaitable[ReturnT] | ReturnT],
input_model: type[ArgsT],
@@ -604,6 +614,8 @@ class AIFunction(BaseTool, Generic[ArgsT, ReturnT]):
Args:
name: The name of the function.
description: A description of the function.
approval_mode: Whether or not approval is required to run this tool.
Default is that approval is not needed.
additional_properties: Additional properties to set on the function.
func: The function to wrap.
input_model: The Pydantic model that defines the input parameters for the function.
@@ -617,6 +629,7 @@ class AIFunction(BaseTool, Generic[ArgsT, ReturnT]):
)
self.func = func
self.input_model = input_model
self.approval_mode = approval_mode or "never_require"
self._invocation_duration_histogram = _default_histogram()
def __call__(self, *args: Any, **kwargs: Any) -> ReturnT | Awaitable[ReturnT]:
@@ -802,13 +815,36 @@ def _parse_annotation(annotation: Any) -> Any:
return annotation
@overload
def ai_function(
func: Callable[..., ReturnT | Awaitable[ReturnT]],
*,
name: str | None = None,
description: str | None = None,
approval_mode: Literal["always_require", "never_require"] | None = None,
additional_properties: dict[str, Any] | None = None,
) -> AIFunction[Any, ReturnT]: ...
@overload
def ai_function(
func: None = None,
*,
name: str | None = None,
description: str | None = None,
approval_mode: Literal["always_require", "never_require"] | None = None,
additional_properties: dict[str, Any] | None = None,
) -> Callable[[Callable[..., ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]: ...
def ai_function(
func: Callable[..., ReturnT | Awaitable[ReturnT]] | None = None,
*,
name: str | None = None,
description: str | None = None,
approval_mode: Literal["always_require", "never_require"] | None = None,
additional_properties: dict[str, Any] | None = None,
) -> AIFunction[Any, ReturnT]:
) -> AIFunction[Any, ReturnT] | Callable[[Callable[..., ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]:
"""Decorate a function to turn it into a AIFunction that can be passed to models and executed automatically.
This decorator creates a Pydantic model from the function's signature,
@@ -819,30 +855,37 @@ def ai_function(
with a string description as the second argument. You can also use Pydantic's
``Field`` class for more advanced configuration.
Args:
func: The function to wrap. If None, returns a decorator.
name: The name of the tool. Defaults to the function's name.
description: A description of the tool. Defaults to the function's docstring.
additional_properties: Additional properties to set on the tool.
Note:
When approval_mode is set to "always_require", the function will not be executed
until explicit approval is given, this only applies to the auto-invocation flow.
It is also important to note that if the model returns multiple function calls, some that require approval
and others that do not, it will ask approval for all of them.
Returns:
An AIFunction instance that wraps the decorated function.
Example:
Examples:
.. code-block:: python
from typing import Annotated
from agent_framework import ai_function
from typing import Annotated
# Using string annotations (recommended)
@ai_function
def get_weather(
location: Annotated[str, "The city name"],
unit: Annotated[str, "Temperature unit"] = "celsius",
def ai_function_example(
arg1: Annotated[str, "The first argument"],
arg2: Annotated[int, "The second argument"],
) -> str:
'''Get the weather for a location.'''
return f"Weather in {location}: 22°{unit[0].upper()}"
# An example function that takes two arguments and returns a string.
return f"arg1: {arg1}, arg2: {arg2}"
# the same function but with approval required to run
@ai_function(approval_mode="always_require")
def ai_function_example(
arg1: Annotated[str, "The first argument"],
arg2: Annotated[int, "The second argument"],
) -> str:
# An example function that takes two arguments and returns a string.
return f"arg1: {arg1}, arg2: {arg2}"
# With custom name and description
@@ -857,6 +900,7 @@ def ai_function(
'''Get weather asynchronously.'''
# Simulate async operation
return f"Weather in {location}"
"""
def decorator(func: Callable[..., ReturnT | Awaitable[ReturnT]]) -> AIFunction[Any, ReturnT]:
@@ -880,6 +924,7 @@ def ai_function(
return AIFunction[Any, ReturnT](
name=tool_name,
description=tool_desc,
approval_mode=approval_mode,
additional_properties=additional_properties or {},
func=f,
input_model=input_model,
@@ -887,14 +932,14 @@ def ai_function(
return wrapper(func)
return decorator(func) if func else decorator # type: ignore[reportReturnType, return-value]
return decorator(func) if func else decorator
# region Function Invoking Chat Client
async def _auto_invoke_function(
function_call_content: "FunctionCallContent",
function_call_content: "FunctionCallContent | FunctionApprovalResponseContent",
custom_args: dict[str, Any] | None = None,
*,
tool_map: dict[str, AIFunction[BaseModel, Any]],
@@ -918,62 +963,92 @@ async def _auto_invoke_function(
Raises:
KeyError: If the requested function is not found in the tool map.
"""
from ._types import FunctionResultContent
from ._types import (
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
FunctionCallContent,
FunctionResultContent,
)
tool: AIFunction[BaseModel, Any] | None = tool_map.get(function_call_content.name)
if tool is None:
raise KeyError(f"No tool or function named '{function_call_content.name}'")
tool: AIFunction[BaseModel, Any] | None = None
if isinstance(function_call_content, FunctionCallContent):
tool = tool_map.get(function_call_content.name)
if tool is None:
raise KeyError(f"No tool or function named '{function_call_content.name}'")
if tool.approval_mode == "always_require":
return FunctionApprovalRequestContent(id=function_call_content.call_id, function_call=function_call_content)
else:
if isinstance(function_call_content, FunctionApprovalResponseContent):
if function_call_content.approved:
tool = tool_map.get(function_call_content.function_call.name)
if tool is None:
# we assume it is a hosted tool
return function_call_content
function_call_content = function_call_content.function_call
else:
raise ToolException("Unapproved tool cannot be executed.")
parsed_args: dict[str, Any] = dict(function_call_content.parse_arguments() or {})
# Merge with user-supplied args; right-hand side dominates, so parsed args win on conflicts.
merged_args: dict[str, Any] = (custom_args or {}) | parsed_args
args = tool.input_model.model_validate(merged_args)
exception = None
# Execute through middleware pipeline if available
if middleware_pipeline and hasattr(middleware_pipeline, "has_middlewares") and middleware_pipeline.has_middlewares:
from ._middleware import FunctionInvocationContext
middleware_context = FunctionInvocationContext(
function=tool,
arguments=args,
kwargs=custom_args or {},
try:
args = tool.input_model.model_validate(merged_args)
except ValidationError as exc:
return FunctionResultContent(
call_id=function_call_content.call_id,
exception=exc,
)
async def final_function_handler(context_obj: Any) -> Any:
return await tool.invoke(
arguments=context_obj.arguments,
tool_call_id=function_call_content.call_id,
)
try:
function_result = await middleware_pipeline.execute(
function=tool,
arguments=args,
context=middleware_context,
final_handler=final_function_handler,
)
except Exception as ex:
exception = ex
function_result = None
else:
if not middleware_pipeline or (
not hasattr(middleware_pipeline, "has_middlewares") and not middleware_pipeline.has_middlewares
):
# No middleware - execute directly
try:
function_result = await tool.invoke(
arguments=args,
tool_call_id=function_call_content.call_id,
) # type: ignore[arg-type]
except Exception as ex:
exception = ex
function_result = None
return FunctionResultContent(
call_id=function_call_content.call_id,
result=function_result,
)
except Exception as exc:
return FunctionResultContent(
call_id=function_call_content.call_id,
exception=exc,
)
# Execute through middleware pipeline if available
from ._middleware import FunctionInvocationContext
return FunctionResultContent(
call_id=function_call_content.call_id,
exception=exception,
result=function_result,
middleware_context = FunctionInvocationContext(
function=tool,
arguments=args,
kwargs=custom_args or {},
)
async def final_function_handler(context_obj: Any) -> Any:
return await tool.invoke(
arguments=context_obj.arguments,
tool_call_id=function_call_content.call_id,
)
try:
function_result = await middleware_pipeline.execute(
function=tool,
arguments=args,
context=middleware_context,
final_handler=final_function_handler,
)
return FunctionResultContent(
call_id=function_call_content.call_id,
result=function_result,
)
except Exception as exc:
return FunctionResultContent(
call_id=function_call_content.call_id,
exception=exc,
)
def _get_tool_map(
tools: "ToolProtocol \
@@ -993,16 +1068,16 @@ def _get_tool_map(
return ai_function_list
async def execute_function_calls(
async def _execute_function_calls(
custom_args: dict[str, Any],
attempt_idx: int,
function_calls: Sequence["FunctionCallContent"],
function_calls: Sequence["FunctionCallContent"] | Sequence["FunctionApprovalResponseContent"],
tools: "ToolProtocol \
| Callable[..., Any] \
| MutableMapping[str, Any] \
| Sequence[ToolProtocol | Callable[..., Any] | MutableMapping[str, Any]]",
middleware_pipeline: Any = None, # Optional MiddlewarePipeline to avoid circular imports
) -> list["Contents"]:
) -> Sequence["Contents"]:
"""Execute multiple function calls concurrently.
Args:
@@ -1015,11 +1090,29 @@ async def execute_function_calls(
Returns:
A list of Contents containing the results of each function call.
"""
from ._types import FunctionApprovalRequestContent, FunctionCallContent
tool_map = _get_tool_map(tools)
approval_tools = [tool_name for tool_name, tool in tool_map.items() if tool.approval_mode == "always_require"]
# check if any are calling functions that need approval
# if so, we return approval request for all
approval_needed = False
for fcc in function_calls:
if isinstance(fcc, FunctionCallContent) and fcc.name in approval_tools:
approval_needed = True
break
if approval_needed:
# approval can only be needed for Function Call Contents, not Approval Responses.
return [
FunctionApprovalRequestContent(id=fcc.call_id, function_call=fcc)
for fcc in function_calls
if isinstance(fcc, FunctionCallContent)
]
# Run all function calls concurrently
return await asyncio.gather(*[
_auto_invoke_function(
function_call_content=function_call,
function_call_content=function_call, # type: ignore[arg-type]
custom_args=custom_args,
tool_map=tool_map,
sequence_index=seq_idx,
@@ -1045,6 +1138,63 @@ def _update_conversation_id(kwargs: dict[str, Any], conversation_id: str | None)
kwargs["conversation_id"] = conversation_id
def _extract_tools(kwargs: dict[str, Any]) -> Any:
"""Extract tools from kwargs or chat_options.
Returns:
ToolProtocol | Callable[..., Any] | MutableMapping[str, Any] |
Sequence[ToolProtocol | Callable[..., Any] | MutableMapping[str, Any]] | None
"""
from ._types import ChatOptions
tools = kwargs.get("tools")
if not tools and (chat_options := kwargs.get("chat_options")) and isinstance(chat_options, ChatOptions):
tools = chat_options.tools
return tools
def _collect_approval_todos(
messages: "list[ChatMessage]",
) -> dict[str, "FunctionApprovalResponseContent"]:
"""Collect approved function calls from messages."""
from ._types import ChatMessage, FunctionApprovalResponseContent
fcc_todo: dict[str, FunctionApprovalResponseContent] = {}
for msg in messages:
for content in msg.contents if isinstance(msg, ChatMessage) else []:
if isinstance(content, FunctionApprovalResponseContent) and content.approved:
fcc_todo[content.id] = content
return fcc_todo
def _replace_approval_contents_with_results(
messages: "list[ChatMessage]",
fcc_todo: dict[str, "FunctionApprovalResponseContent"],
approved_function_results: "list[Contents]",
) -> None:
"""Replace approval request/response contents with function call/result contents in-place."""
from ._types import FunctionApprovalRequestContent, FunctionApprovalResponseContent, FunctionResultContent
result_idx = 0
for msg in messages:
for content_idx, content in enumerate(msg.contents):
if isinstance(content, FunctionApprovalRequestContent):
# put back the function call content
msg.contents[content_idx] = content.function_call
if isinstance(content, FunctionApprovalResponseContent):
if content.approved and content.id in fcc_todo:
# Replace with the corresponding result
if result_idx < len(approved_function_results):
msg.contents[content_idx] = approved_function_results[result_idx]
result_idx += 1
else:
# Create a "not approved" result for rejected calls
msg.contents[content_idx] = FunctionResultContent(
call_id=content.id,
result="Error: Tool call invocation was rejected by user.",
)
def _handle_function_calls_response(
func: Callable[..., Awaitable["ChatResponse"]],
) -> Callable[..., Awaitable["ChatResponse"]]:
@@ -1070,7 +1220,12 @@ def _handle_function_calls_response(
) -> "ChatResponse":
from ._clients import prepare_messages
from ._middleware import extract_and_merge_function_middleware
from ._types import ChatMessage, ChatOptions, FunctionCallContent, FunctionResultContent
from ._types import (
ChatMessage,
FunctionApprovalRequestContent,
FunctionCallContent,
FunctionResultContent,
)
# Extract and merge function middleware from chat client with kwargs pipeline
extract_and_merge_function_middleware(self, kwargs)
@@ -1090,6 +1245,18 @@ def _handle_function_calls_response(
response: "ChatResponse | None" = None
fcc_messages: "list[ChatMessage]" = []
for attempt_idx in range(instance_max_iterations):
fcc_todo = _collect_approval_todos(prepped_messages)
if fcc_todo:
tools = _extract_tools(kwargs)
approved_function_results: list[Contents] = await _execute_function_calls(
custom_args=kwargs,
attempt_idx=attempt_idx,
function_calls=list(fcc_todo.values()),
tools=tools, # type: ignore
middleware_pipeline=stored_middleware_pipeline,
)
_replace_approval_contents_with_results(prepped_messages, fcc_todo, approved_function_results)
response = await func(self, messages=prepped_messages, **kwargs)
# if there are function calls, we will handle them first
function_results = {
@@ -1105,19 +1272,17 @@ def _handle_function_calls_response(
_update_conversation_id(kwargs, response.conversation_id)
prepped_messages = []
tools = kwargs.get("tools")
if not tools and (chat_options := kwargs.get("chat_options")) and isinstance(chat_options, ChatOptions):
tools = chat_options.tools
# we load the tools here, since middleware might have changed them compared to before calling func.
tools = _extract_tools(kwargs)
if function_calls and tools:
# Use the stored middleware pipeline instead of extracting from kwargs
# because kwargs may have been modified by the underlying function
middleware_pipeline = stored_middleware_pipeline
function_call_results: list[Contents] = await execute_function_calls(
function_call_results: list[Contents] = await _execute_function_calls(
custom_args=kwargs,
attempt_idx=attempt_idx,
function_calls=function_calls,
tools=tools, # type: ignore
middleware_pipeline=middleware_pipeline,
middleware_pipeline=stored_middleware_pipeline,
)
# add a single ChatMessage to the response with the results
result_message = ChatMessage(role="tool", contents=function_call_results)
@@ -1130,6 +1295,8 @@ def _handle_function_calls_response(
# we need to keep track of all function call messages
fcc_messages.extend(response.messages)
# and add them as additional context to the messages
if any(isinstance(fccr, FunctionApprovalRequestContent) for fccr in function_call_results):
return response
if getattr(kwargs.get("chat_options"), "store", False):
prepped_messages.clear()
prepped_messages.append(result_message)
@@ -1184,7 +1351,13 @@ def _handle_function_calls_streaming_response(
"""Wrap the inner get streaming response method to handle tool calls."""
from ._clients import prepare_messages
from ._middleware import extract_and_merge_function_middleware
from ._types import ChatMessage, ChatOptions, ChatResponse, ChatResponseUpdate, FunctionCallContent
from ._types import (
ChatMessage,
ChatResponse,
ChatResponseUpdate,
FunctionCallContent,
FunctionResultContent,
)
# Extract and merge function middleware from chat client with kwargs pipeline
extract_and_merge_function_middleware(self, kwargs)
@@ -1201,7 +1374,20 @@ def _handle_function_calls_streaming_response(
instance_max_iterations = getattr(self.__class__, "MAX_ITERATIONS", DEFAULT_MAX_ITERATIONS)
prepped_messages = prepare_messages(messages)
fcc_messages: "list[ChatMessage]" = []
for attempt_idx in range(instance_max_iterations):
fcc_todo = _collect_approval_todos(prepped_messages)
if fcc_todo:
tools = _extract_tools(kwargs)
approved_function_results: list[Contents] = await _execute_function_calls(
custom_args=kwargs,
attempt_idx=attempt_idx,
function_calls=list(fcc_todo.values()),
tools=tools, # type: ignore
middleware_pipeline=stored_middleware_pipeline,
)
_replace_approval_contents_with_results(prepped_messages, fcc_todo, approved_function_results)
all_updates: list["ChatResponseUpdate"] = []
async for update in func(self, messages=prepped_messages, **kwargs):
all_updates.append(update)
@@ -1210,7 +1396,13 @@ def _handle_function_calls_streaming_response(
# efficient check for FunctionCallContent in the updates
# if there is at least one, this stops and continuous
# if there are no FCC's then it returns
if not any(isinstance(item, FunctionCallContent) for upd in all_updates for item in upd.contents):
from ._types import FunctionApprovalRequestContent
if not any(
isinstance(item, (FunctionCallContent, FunctionApprovalRequestContent))
for upd in all_updates
for item in upd.contents
):
return
# Now combining the updates to create the full response.
@@ -1218,11 +1410,14 @@ def _handle_function_calls_streaming_response(
# content and others
response: "ChatResponse" = ChatResponse.from_chat_response_updates(all_updates)
# add the response message to the previous messages
prepped_messages.append(response.messages[0])
# get the fccs
# get the function calls (excluding ones that already have results)
function_results = {
it.call_id for it in response.messages[0].contents if isinstance(it, FunctionResultContent)
}
function_calls = [
item for item in response.messages[0].contents if isinstance(item, FunctionCallContent)
it
for it in response.messages[0].contents
if isinstance(it, FunctionCallContent) and it.call_id not in function_results
]
# When conversation id is present, it means that messages are hosted on the server.
@@ -1231,26 +1426,41 @@ def _handle_function_calls_streaming_response(
_update_conversation_id(kwargs, response.conversation_id)
prepped_messages = []
tools: Sequence[ToolProtocol | MutableMapping[str, Any]] | None = kwargs.get("tools")
if not tools and (chat_options := kwargs.get("chat_options")) and isinstance(chat_options, ChatOptions):
tools = chat_options.tools
# we load the tools here, since middleware might have changed them compared to before calling func.
tools = _extract_tools(kwargs)
if function_calls and tools:
# Use the stored middleware pipeline instead of extracting from kwargs
# because kwargs may have been modified by the underlying function
middleware_pipeline = stored_middleware_pipeline
function_results = await execute_function_calls(
function_call_results: list[Contents] = await _execute_function_calls(
custom_args=kwargs,
attempt_idx=attempt_idx,
function_calls=function_calls,
tools=tools,
middleware_pipeline=middleware_pipeline,
tools=tools, # type: ignore
middleware_pipeline=stored_middleware_pipeline,
)
function_result_msg = ChatMessage(role="tool", contents=function_results)
yield ChatResponseUpdate(contents=function_results, role="tool")
response.messages.append(function_result_msg)
prepped_messages.append(function_result_msg)
# add a single ChatMessage to the response with the results
result_message = ChatMessage(role="tool", contents=function_call_results)
yield ChatResponseUpdate(contents=function_call_results, role="tool")
response.messages.append(result_message)
# response should contain 2 messages after this,
# one with function call contents
# and one with function result contents
# the amount and call_id's should match
# this runs in every but the first run
# we need to keep track of all function call messages
fcc_messages.extend(response.messages)
# and add them as additional context to the messages
if any(isinstance(fccr, FunctionApprovalRequestContent) for fccr in function_call_results):
return
if getattr(kwargs.get("chat_options"), "store", False):
prepped_messages.clear()
prepped_messages.append(result_message)
else:
prepped_messages.extend(response.messages)
continue
# If we reach this point, it means there were no function calls to handle,
# so we're done
return
# Failsafe: give up on tools, ask model for plain answer
kwargs["tool_choice"] = "none"
@@ -58,6 +58,7 @@ __all__ = [
"UriContent",
"UsageContent",
"UsageDetails",
"prepare_function_call_results",
]
logger = get_logger("agent_framework")
@@ -1681,6 +1682,31 @@ Contents = (
| FunctionApprovalResponseContent
)
def _prepare_function_call_results_as_dumpable(content: Contents | Any | list[Contents | Any]) -> Any:
if isinstance(content, list):
# Particularly deal with lists of Content
return [_prepare_function_call_results_as_dumpable(item) for item in content]
if isinstance(content, dict):
return {k: _prepare_function_call_results_as_dumpable(v) for k, v in content.items()}
if hasattr(content, "to_dict"):
return content.to_dict(exclude={"raw_representation", "additional_properties"})
return content
def prepare_function_call_results(content: Contents | Any | list[Contents | Any]) -> str:
"""Prepare the values of the function call results."""
if isinstance(content, Contents):
# For BaseContent objects, use to_dict and serialize to JSON
return json.dumps(content.to_dict(exclude={"raw_representation", "additional_properties"}))
dumpable = _prepare_function_call_results_as_dumpable(content)
if isinstance(dumpable, str):
return dumpable
# fallback
return json.dumps(dumpable)
# region Chat Response constants
@@ -37,6 +37,7 @@ from .._types import (
UriContent,
UsageContent,
UsageDetails,
prepare_function_call_results,
)
from ..exceptions import ServiceInitializationError
from ..observability import use_observability
@@ -481,7 +482,13 @@ class OpenAIAssistantsClient(OpenAIConfigMixin, BaseChatClient):
if tool_outputs is None:
tool_outputs = []
tool_outputs.append(ToolOutput(tool_call_id=call_id, output=str(function_result_content.result)))
if function_result_content.result:
output = prepare_function_call_results(function_result_content.result)
elif function_result_content.exception:
output = "Error: " + str(function_result_content.exception)
else:
output = "No output received."
tool_outputs.append(ToolOutput(tool_call_id=call_id, output=output))
return run_id, tool_outputs
@@ -35,6 +35,7 @@ from .._types import (
UriContent,
UsageContent,
UsageDetails,
prepare_function_call_results,
)
from ..exceptions import (
ServiceInitializationError,
@@ -43,7 +44,7 @@ from ..exceptions import (
)
from ..observability import use_observability
from ._exceptions import OpenAIContentFilterException
from ._shared import OpenAIBase, OpenAIConfigMixin, OpenAISettings, prepare_function_call_results
from ._shared import OpenAIBase, OpenAIConfigMixin, OpenAISettings
if sys.version_info >= (3, 12):
from typing import override # type: ignore # pragma: no cover
@@ -57,6 +57,7 @@ from .._types import (
UriContent,
UsageContent,
UsageDetails,
prepare_function_call_results,
)
from ..exceptions import (
ServiceInitializationError,
@@ -65,7 +66,7 @@ from ..exceptions import (
)
from ..observability import use_observability
from ._exceptions import OpenAIContentFilterException
from ._shared import OpenAIBase, OpenAIConfigMixin, OpenAISettings, prepare_function_call_results
from ._shared import OpenAIBase, OpenAIConfigMixin, OpenAISettings
logger = get_logger("agent_framework.openai")
@@ -471,6 +472,8 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
}
if content.result:
args["output"] = prepare_function_call_results(content.result)
if content.exception:
args["output"] = "Error: " + str(content.exception)
return args
case FunctionApprovalRequestContent():
return {
@@ -1,6 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
import json
import logging
from collections.abc import Mapping
from copy import copy
@@ -23,7 +22,7 @@ from .._logging import get_logger
from .._pydantic import AFBaseSettings
from .._serialization import SerializationMixin
from .._telemetry import APP_INFO, USER_AGENT_KEY, prepend_agent_framework_to_user_agent
from .._types import ChatOptions, Contents
from .._types import ChatOptions
from ..exceptions import ServiceInitializationError
logger: logging.Logger = get_logger("agent_framework.openai")
@@ -50,30 +49,6 @@ __all__ = [
]
def _prepare_function_call_results_as_dumpable(content: Contents | Any | list[Contents | Any]) -> Any:
if isinstance(content, list):
# Particularly deal with lists of Content
return [_prepare_function_call_results_as_dumpable(item) for item in content]
if isinstance(content, dict):
return {k: _prepare_function_call_results_as_dumpable(v) for k, v in content.items()}
if hasattr(content, "to_dict"):
return content.to_dict(exclude={"raw_representation", "additional_properties"})
return content
def prepare_function_call_results(content: Contents | Any | list[Contents | Any]) -> str | list[str]:
"""Prepare the values of the function call results."""
if isinstance(content, Contents):
# For BaseContent objects, use to_dict and serialize to JSON
return json.dumps(content.to_dict(exclude={"raw_representation", "additional_properties"}))
dumpable = _prepare_function_call_results_as_dumpable(content)
if isinstance(dumpable, str):
return dumpable
# fallback
return json.dumps(dumpable)
class OpenAISettings(AFBaseSettings):
"""OpenAI environment settings.
@@ -1,25 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
import sys
from agent_framework import (
BaseChatClient,
ChatClientProtocol,
ChatMessage,
ChatResponse,
ChatResponseUpdate,
FunctionCallContent,
FunctionResultContent,
Role,
TextContent,
ai_function,
)
if sys.version_info >= (3, 12):
pass # type: ignore
else:
pass # type: ignore[import]
def test_chat_client_type(chat_client: ChatClientProtocol):
assert isinstance(chat_client, ChatClientProtocol)
@@ -51,113 +39,3 @@ async def test_base_client_get_response(chat_client_base: ChatClientProtocol):
async def test_base_client_get_streaming_response(chat_client_base: ChatClientProtocol):
async for update in chat_client_base.get_streaming_response(ChatMessage(role="user", text="Hello")):
assert update.text == "update - Hello" or update.text == "another update"
async def test_base_client_with_function_calling(chat_client_base: ChatClientProtocol):
exec_counter = 0
@ai_function(name="test_function")
def ai_func(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
chat_client_base.run_responses = [
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='{"arg1": "value1"}')],
)
),
ChatResponse(messages=ChatMessage(role="assistant", text="done")),
]
response = await chat_client_base.get_response("hello", tool_choice="auto", tools=[ai_func])
assert exec_counter == 1
assert len(response.messages) == 3
assert response.messages[0].role == Role.ASSISTANT
assert isinstance(response.messages[0].contents[0], FunctionCallContent)
assert response.messages[0].contents[0].name == "test_function"
assert response.messages[0].contents[0].arguments == '{"arg1": "value1"}'
assert response.messages[0].contents[0].call_id == "1"
assert response.messages[1].role == Role.TOOL
assert isinstance(response.messages[1].contents[0], FunctionResultContent)
assert response.messages[1].contents[0].call_id == "1"
assert response.messages[1].contents[0].result == "Processed value1"
assert response.messages[2].role == Role.ASSISTANT
assert response.messages[2].text == "done"
async def test_base_client_with_function_calling_resets(chat_client_base: ChatClientProtocol):
exec_counter = 0
@ai_function(name="test_function")
def ai_func(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
chat_client_base.run_responses = [
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='{"arg1": "value1"}')],
)
),
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="2", name="test_function", arguments='{"arg1": "value1"}')],
)
),
ChatResponse(messages=ChatMessage(role="assistant", text="done")),
]
response = await chat_client_base.get_response("hello", tool_choice="auto", tools=[ai_func])
assert exec_counter == 2
assert len(response.messages) == 5
assert response.messages[0].role == Role.ASSISTANT
assert response.messages[1].role == Role.TOOL
assert response.messages[2].role == Role.ASSISTANT
assert response.messages[3].role == Role.TOOL
assert response.messages[4].role == Role.ASSISTANT
assert isinstance(response.messages[0].contents[0], FunctionCallContent)
assert isinstance(response.messages[1].contents[0], FunctionResultContent)
assert isinstance(response.messages[2].contents[0], FunctionCallContent)
assert isinstance(response.messages[3].contents[0], FunctionResultContent)
async def test_base_client_with_streaming_function_calling(chat_client_base: ChatClientProtocol):
exec_counter = 0
@ai_function(name="test_function")
def ai_func(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
chat_client_base.streaming_responses = [
[
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='{"arg1":')],
role="assistant",
),
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='"value1"}')],
role="assistant",
),
],
[
ChatResponseUpdate(
contents=[TextContent(text="Processed value1")],
role="assistant",
)
],
]
updates = []
async for update in chat_client_base.get_streaming_response("hello", tool_choice="auto", tools=[ai_func]):
updates.append(update)
assert len(updates) == 4 # two updates with the function call, the function result and the final text
assert updates[0].contents[0].call_id == "1"
assert updates[1].contents[0].call_id == "1"
assert updates[2].contents[0].call_id == "1"
assert updates[3].text == "Processed value1"
assert exec_counter == 1
@@ -0,0 +1,433 @@
# Copyright (c) Microsoft. All rights reserved.
import pytest
from agent_framework import (
ChatClientProtocol,
ChatMessage,
ChatOptions,
ChatResponse,
ChatResponseUpdate,
FunctionApprovalRequestContent,
FunctionCallContent,
FunctionResultContent,
Role,
TextContent,
ai_function,
)
async def test_base_client_with_function_calling(chat_client_base: ChatClientProtocol):
exec_counter = 0
@ai_function(name="test_function")
def ai_func(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
chat_client_base.run_responses = [
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='{"arg1": "value1"}')],
)
),
ChatResponse(messages=ChatMessage(role="assistant", text="done")),
]
response = await chat_client_base.get_response("hello", tool_choice="auto", tools=[ai_func])
assert exec_counter == 1
assert len(response.messages) == 3
assert response.messages[0].role == Role.ASSISTANT
assert isinstance(response.messages[0].contents[0], FunctionCallContent)
assert response.messages[0].contents[0].name == "test_function"
assert response.messages[0].contents[0].arguments == '{"arg1": "value1"}'
assert response.messages[0].contents[0].call_id == "1"
assert response.messages[1].role == Role.TOOL
assert isinstance(response.messages[1].contents[0], FunctionResultContent)
assert response.messages[1].contents[0].call_id == "1"
assert response.messages[1].contents[0].result == "Processed value1"
assert response.messages[2].role == Role.ASSISTANT
assert response.messages[2].text == "done"
async def test_base_client_with_function_calling_resets(chat_client_base: ChatClientProtocol):
exec_counter = 0
@ai_function(name="test_function")
def ai_func(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
chat_client_base.run_responses = [
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='{"arg1": "value1"}')],
)
),
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="2", name="test_function", arguments='{"arg1": "value1"}')],
)
),
ChatResponse(messages=ChatMessage(role="assistant", text="done")),
]
response = await chat_client_base.get_response("hello", tool_choice="auto", tools=[ai_func])
assert exec_counter == 2
assert len(response.messages) == 5
assert response.messages[0].role == Role.ASSISTANT
assert response.messages[1].role == Role.TOOL
assert response.messages[2].role == Role.ASSISTANT
assert response.messages[3].role == Role.TOOL
assert response.messages[4].role == Role.ASSISTANT
assert isinstance(response.messages[0].contents[0], FunctionCallContent)
assert isinstance(response.messages[1].contents[0], FunctionResultContent)
assert isinstance(response.messages[2].contents[0], FunctionCallContent)
assert isinstance(response.messages[3].contents[0], FunctionResultContent)
async def test_base_client_with_streaming_function_calling(chat_client_base: ChatClientProtocol):
exec_counter = 0
@ai_function(name="test_function")
def ai_func(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
chat_client_base.streaming_responses = [
[
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='{"arg1":')],
role="assistant",
),
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='"value1"}')],
role="assistant",
),
],
[
ChatResponseUpdate(
contents=[TextContent(text="Processed value1")],
role="assistant",
)
],
]
updates = []
async for update in chat_client_base.get_streaming_response("hello", tool_choice="auto", tools=[ai_func]):
updates.append(update)
assert len(updates) == 4 # two updates with the function call, the function result and the final text
assert updates[0].contents[0].call_id == "1"
assert updates[1].contents[0].call_id == "1"
assert updates[2].contents[0].call_id == "1"
assert updates[3].text == "Processed value1"
assert exec_counter == 1
@pytest.mark.parametrize(
"approval_required,num_functions",
[
pytest.param(False, 1, id="single function without approval"),
pytest.param(True, 1, id="single function with approval"),
pytest.param("mixed", 2, id="two functions with mixed approval"),
],
)
@pytest.mark.parametrize(
"thread_type",
[
pytest.param(None, id="no thread"),
pytest.param("local", id="local thread"),
pytest.param("service", id="service thread"),
],
)
@pytest.mark.parametrize("streaming", [False, True], ids=["non-streaming", "streaming"])
async def test_function_invocation_scenarios(
chat_client_base: ChatClientProtocol,
streaming: bool,
thread_type: str | None,
approval_required: bool | str,
num_functions: int,
):
"""Comprehensive test for function invocation scenarios.
This test covers:
- Single function without approval: 3 messages (call, result, final)
- Single function with approval: 2 messages (call, approval request)
- Two functions with mixed approval: varies based on approval flow
- All scenarios tested with both streaming and non-streaming
- Thread scenarios: no thread, local thread (in-memory), and service thread (conversation_id)
"""
exec_counter = 0
# Setup thread based on parameters
conversation_id = None
if thread_type == "service":
# Simulate a service-side thread with conversation_id
conversation_id = "test-thread-123"
@ai_function(name="no_approval_func")
def func_no_approval(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
@ai_function(name="approval_func", approval_mode="always_require")
def func_with_approval(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Approved {arg1}"
# Setup tools and responses based on the scenario
if num_functions == 1:
tools = [func_with_approval if approval_required else func_no_approval]
function_name = "approval_func" if approval_required else "no_approval_func"
# Single function call content
func_call = FunctionCallContent(call_id="1", name=function_name, arguments='{"arg1": "value1"}')
completion = ChatMessage(role="assistant", text="done")
chat_client_base.run_responses = [
ChatResponse(messages=ChatMessage(role="assistant", contents=[func_call]))
] + ([] if approval_required else [ChatResponse(messages=completion)])
chat_client_base.streaming_responses = [
[
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="1", name=function_name, arguments='{"arg1":')],
role="assistant",
),
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="1", name=function_name, arguments='"value1"}')],
role="assistant",
),
]
] + ([] if approval_required else [[ChatResponseUpdate(contents=[TextContent(text="done")], role="assistant")]])
else: # num_functions == 2
tools = [func_no_approval, func_with_approval]
# Two function calls content
func_calls = [
FunctionCallContent(call_id="1", name="no_approval_func", arguments='{"arg1": "value1"}'),
FunctionCallContent(call_id="2", name="approval_func", arguments='{"arg1": "value2"}'),
]
chat_client_base.run_responses = [ChatResponse(messages=ChatMessage(role="assistant", contents=func_calls))]
chat_client_base.streaming_responses = [
[
ChatResponseUpdate(contents=[func_calls[0]], role="assistant"),
ChatResponseUpdate(contents=[func_calls[1]], role="assistant"),
]
]
# Execute the test
chat_options = ChatOptions(tool_choice="auto", tools=tools)
if thread_type == "service":
# For service threads, we need to pass conversation_id via ChatOptions
chat_options.store = True
chat_options.conversation_id = conversation_id
if not streaming:
response = await chat_client_base.get_response("hello", chat_options=chat_options)
messages = response.messages
else:
updates = []
async for update in chat_client_base.get_streaming_response("hello", chat_options=chat_options):
updates.append(update)
messages = updates
# Service threads have different message management behavior (server-side storage)
# so we skip detailed message assertions for those scenarios
if thread_type == "service":
# Just verify the function was executed or not based on approval
if not approval_required or approval_required == "mixed":
# For service threads, the execution counter check is still valid
pass
return
# Verify based on scenario (for no thread and local thread cases)
if num_functions == 1:
if approval_required:
# Single function with approval: call + approval request
if not streaming:
assert len(messages) == 2
assert isinstance(messages[0].contents[0], FunctionCallContent)
assert isinstance(messages[1].contents[0], FunctionApprovalRequestContent)
assert messages[1].contents[0].function_call.name == "approval_func"
assert exec_counter == 0 # Function not executed yet
else:
# Streaming: 2 function call chunks + 1 approval request
assert len(messages) == 3
assert isinstance(messages[0].contents[0], FunctionCallContent)
assert isinstance(messages[1].contents[0], FunctionCallContent)
assert isinstance(messages[2].contents[0], FunctionApprovalRequestContent)
assert messages[2].contents[0].function_call.name == "approval_func"
assert exec_counter == 0 # Function not executed yet
else:
# Single function without approval: call + result + final
if not streaming:
assert len(messages) == 3
assert isinstance(messages[0].contents[0], FunctionCallContent)
assert isinstance(messages[1].contents[0], FunctionResultContent)
assert messages[1].contents[0].result == "Processed value1"
assert messages[2].role == Role.ASSISTANT
assert messages[2].text == "done"
assert exec_counter == 1
else:
# Streaming has: 2 function call updates + 1 result update + 1 final update
assert len(messages) == 4
assert isinstance(messages[0].contents[0], FunctionCallContent)
assert isinstance(messages[1].contents[0], FunctionCallContent)
assert isinstance(messages[2].contents[0], FunctionResultContent)
assert messages[3].text == "done"
assert exec_counter == 1
else: # num_functions == 2
# Two functions with mixed approval
if not streaming:
# Mixed: first message has both calls, second has approval requests for both
# (because when one requires approval, all are batched for approval)
assert len(messages) == 2
assert len(messages[0].contents) == 2 # Both function calls
assert isinstance(messages[0].contents[0], FunctionCallContent)
assert isinstance(messages[0].contents[1], FunctionCallContent)
# Both should result in approval requests
assert len(messages[1].contents) == 2
assert all(isinstance(c, FunctionApprovalRequestContent) for c in messages[1].contents)
assert exec_counter == 0 # Neither function executed yet
else:
# Streaming: 2 function call updates + 1 approval request with 2 contents
assert len(messages) == 3
assert isinstance(messages[0].contents[0], FunctionCallContent)
assert isinstance(messages[1].contents[0], FunctionCallContent)
# The approval request message contains both approval requests
assert len(messages[2].contents) == 2
assert all(isinstance(c, FunctionApprovalRequestContent) for c in messages[2].contents)
assert exec_counter == 0 # Neither function executed yet
async def test_rejected_approval(chat_client_base: ChatClientProtocol):
"""Test that rejecting an approval alongside an approved one is handled correctly."""
from agent_framework import FunctionApprovalResponseContent
exec_counter_approved = 0
exec_counter_rejected = 0
@ai_function(name="approved_func", approval_mode="always_require")
def func_approved(arg1: str) -> str:
nonlocal exec_counter_approved
exec_counter_approved += 1
return f"Approved {arg1}"
@ai_function(name="rejected_func", approval_mode="always_require")
def func_rejected(arg1: str) -> str:
nonlocal exec_counter_rejected
exec_counter_rejected += 1
return f"Rejected {arg1}"
# Setup: two function calls that require approval
chat_client_base.run_responses = [
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[
FunctionCallContent(call_id="1", name="approved_func", arguments='{"arg1": "value1"}'),
FunctionCallContent(call_id="2", name="rejected_func", arguments='{"arg1": "value2"}'),
],
)
),
ChatResponse(messages=ChatMessage(role="assistant", text="done")),
]
# Get the response with approval requests
response = await chat_client_base.get_response("hello", tool_choice="auto", tools=[func_approved, func_rejected])
assert len(response.messages) == 2
assert len(response.messages[1].contents) == 2
assert all(isinstance(c, FunctionApprovalRequestContent) for c in response.messages[1].contents)
# Approve one and reject the other
approval_req_1 = response.messages[1].contents[0]
approval_req_2 = response.messages[1].contents[1]
approved_response = FunctionApprovalResponseContent(
id=approval_req_1.id,
function_call=approval_req_1.function_call,
approved=True,
)
rejected_response = FunctionApprovalResponseContent(
id=approval_req_2.id,
function_call=approval_req_2.function_call,
approved=False,
)
# Continue conversation with one approved and one rejected
all_messages = response.messages + [ChatMessage(role="user", contents=[approved_response, rejected_response])]
# Call get_response which will process the approvals
await chat_client_base.get_response(all_messages, tool_choice="auto", tools=[func_approved, func_rejected])
# Verify the approval/rejection was processed correctly
# Find the results in the input messages (modified in-place)
approved_result = None
rejected_result = None
for msg in all_messages:
for content in msg.contents:
if isinstance(content, FunctionResultContent):
if content.call_id == "1":
approved_result = content
elif content.call_id == "2":
rejected_result = content
# The approved function should have been executed and have a result
assert approved_result is not None, "Should have found result for approved function"
assert approved_result.result == "Approved value1"
assert exec_counter_approved == 1
# The rejected function should have a "not approved" result and NOT have been executed
assert rejected_result is not None, "Should have found result for rejected function"
assert rejected_result.result == "Error: Tool call invocation was rejected by user."
assert exec_counter_rejected == 0
async def test_max_iterations_limit(chat_client_base: ChatClientProtocol):
"""Test that MAX_ITERATIONS in additional_properties limits function call loops."""
exec_counter = 0
@ai_function(name="test_function")
def ai_func(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
# Set up multiple function call responses to create a loop
chat_client_base.run_responses = [
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="1", name="test_function", arguments='{"arg1": "value1"}')],
)
),
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="2", name="test_function", arguments='{"arg1": "value2"}')],
)
),
# Failsafe response when tool_choice is set to "none"
ChatResponse(messages=ChatMessage(role="assistant", text="giving up on tools")),
]
# Set max_iterations to 1 in additional_properties
chat_client_base.additional_properties = {"max_iterations": 1}
response = await chat_client_base.get_response("hello", tool_choice="auto", tools=[ai_func])
# With max_iterations=1, we should:
# 1. Execute first function call (exec_counter=1)
# 2. Try to make second call but hit iteration limit
# 3. Fall back to asking for a plain answer with tool_choice="none"
assert exec_counter == 1 # Only first function executed
assert response.messages[-1].text == "I broke out of the function invocation loop..." # Failsafe response
@@ -616,3 +616,507 @@ def test_hosted_mcp_tool_with_dict_of_allowed_tools():
url="https://mcp.example",
allowed_tools={"toolA": "Tool A", "toolC": "Tool C"},
)
# region Approval Flow Tests
@pytest.fixture
def mock_chat_client():
"""Create a mock chat client for testing approval flows."""
from agent_framework import ChatMessage, ChatResponse, ChatResponseUpdate
class MockChatClient:
def __init__(self):
self.call_count = 0
self.responses = []
async def get_response(self, messages, **kwargs):
"""Mock get_response that returns predefined responses."""
if self.call_count < len(self.responses):
response = self.responses[self.call_count]
self.call_count += 1
return response
# Default response
return ChatResponse(
messages=[ChatMessage(role="assistant", contents=["Default response"])],
)
async def get_streaming_response(self, messages, **kwargs):
"""Mock get_streaming_response that yields predefined updates."""
if self.call_count < len(self.responses):
response = self.responses[self.call_count]
self.call_count += 1
# Yield updates from the response
for msg in response.messages:
for content in msg.contents:
yield ChatResponseUpdate(contents=[content], role=msg.role)
else:
# Default response
yield ChatResponseUpdate(contents=["Default response"], role="assistant")
return MockChatClient()
@ai_function(name="no_approval_tool", description="Tool that doesn't require approval")
def no_approval_tool(x: int) -> int:
"""A tool that doesn't require approval."""
return x * 2
@ai_function(
name="requires_approval_tool",
description="Tool that requires approval",
approval_mode="always_require",
)
def requires_approval_tool(x: int) -> int:
"""A tool that requires approval."""
return x * 3
async def test_non_streaming_single_function_no_approval():
"""Test non-streaming handler with single function call that doesn't require approval."""
from agent_framework import ChatMessage, ChatResponse, FunctionCallContent
from agent_framework._tools import _handle_function_calls_response
# Create mock client
mock_client = type("MockClient", (), {})()
# Create responses: first with function call, second with final answer
initial_response = ChatResponse(
messages=[
ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="call_1", name="no_approval_tool", arguments='{"x": 5}')],
)
]
)
final_response = ChatResponse(messages=[ChatMessage(role="assistant", contents=["The result is 10"])])
call_count = [0]
responses = [initial_response, final_response]
async def mock_get_response(self, messages, **kwargs):
result = responses[call_count[0]]
call_count[0] += 1
return result
# Wrap the function
wrapped = _handle_function_calls_response(mock_get_response)
# Execute
result = await wrapped(mock_client, messages=[], tools=[no_approval_tool])
# Verify: should have 3 messages: function call, function result, final answer
assert len(result.messages) == 3
assert isinstance(result.messages[0].contents[0], FunctionCallContent)
from agent_framework import FunctionResultContent
assert isinstance(result.messages[1].contents[0], FunctionResultContent)
assert result.messages[1].contents[0].result == 10 # 5 * 2
assert result.messages[2].contents[0] == "The result is 10"
async def test_non_streaming_single_function_requires_approval():
"""Test non-streaming handler with single function call that requires approval."""
from agent_framework import ChatMessage, ChatResponse, FunctionCallContent
from agent_framework._tools import _handle_function_calls_response
mock_client = type("MockClient", (), {})()
# Initial response with function call
initial_response = ChatResponse(
messages=[
ChatMessage(
role="assistant",
contents=[FunctionCallContent(call_id="call_1", name="requires_approval_tool", arguments='{"x": 5}')],
)
]
)
call_count = [0]
responses = [initial_response]
async def mock_get_response(self, messages, **kwargs):
result = responses[call_count[0]]
call_count[0] += 1
return result
wrapped = _handle_function_calls_response(mock_get_response)
# Execute
result = await wrapped(mock_client, messages=[], tools=[requires_approval_tool])
# Verify: should return 2 messages - function call and approval request
from agent_framework import FunctionApprovalRequestContent
assert len(result.messages) == 2
assert isinstance(result.messages[0].contents[0], FunctionCallContent)
assert isinstance(result.messages[1].contents[0], FunctionApprovalRequestContent)
assert result.messages[1].contents[0].function_call.name == "requires_approval_tool"
async def test_non_streaming_two_functions_both_no_approval():
"""Test non-streaming handler with two function calls, neither requiring approval."""
from agent_framework import ChatMessage, ChatResponse, FunctionCallContent
from agent_framework._tools import _handle_function_calls_response
mock_client = type("MockClient", (), {})()
# Initial response with two function calls to the same tool
initial_response = ChatResponse(
messages=[
ChatMessage(
role="assistant",
contents=[
FunctionCallContent(call_id="call_1", name="no_approval_tool", arguments='{"x": 5}'),
FunctionCallContent(call_id="call_2", name="no_approval_tool", arguments='{"x": 3}'),
],
)
]
)
final_response = ChatResponse(
messages=[ChatMessage(role="assistant", contents=["Both tools executed successfully"])]
)
call_count = [0]
responses = [initial_response, final_response]
async def mock_get_response(self, messages, **kwargs):
result = responses[call_count[0]]
call_count[0] += 1
return result
wrapped = _handle_function_calls_response(mock_get_response)
# Execute
result = await wrapped(mock_client, messages=[], tools=[no_approval_tool])
# Verify: should have function calls, results, and final answer
from agent_framework import FunctionResultContent
assert len(result.messages) == 3
# First message has both function calls
assert len(result.messages[0].contents) == 2
# Second message has both results
assert len(result.messages[1].contents) == 2
assert all(isinstance(c, FunctionResultContent) for c in result.messages[1].contents)
assert result.messages[1].contents[0].result == 10 # 5 * 2
assert result.messages[1].contents[1].result == 6 # 3 * 2
async def test_non_streaming_two_functions_both_require_approval():
"""Test non-streaming handler with two function calls, both requiring approval."""
from agent_framework import ChatMessage, ChatResponse, FunctionCallContent
from agent_framework._tools import _handle_function_calls_response
mock_client = type("MockClient", (), {})()
# Initial response with two function calls to the same tool
initial_response = ChatResponse(
messages=[
ChatMessage(
role="assistant",
contents=[
FunctionCallContent(call_id="call_1", name="requires_approval_tool", arguments='{"x": 5}'),
FunctionCallContent(call_id="call_2", name="requires_approval_tool", arguments='{"x": 3}'),
],
)
]
)
call_count = [0]
responses = [initial_response]
async def mock_get_response(self, messages, **kwargs):
result = responses[call_count[0]]
call_count[0] += 1
return result
wrapped = _handle_function_calls_response(mock_get_response)
# Execute
result = await wrapped(mock_client, messages=[], tools=[requires_approval_tool])
# Verify: should return 2 messages - function calls and approval requests
from agent_framework import FunctionApprovalRequestContent
assert len(result.messages) == 2
assert len(result.messages[0].contents) == 2 # Both function calls
assert all(isinstance(c, FunctionCallContent) for c in result.messages[0].contents)
assert len(result.messages[1].contents) == 2 # Both approval requests
assert all(isinstance(c, FunctionApprovalRequestContent) for c in result.messages[1].contents)
assert result.messages[1].contents[0].function_call.name == "requires_approval_tool"
assert result.messages[1].contents[1].function_call.name == "requires_approval_tool"
async def test_non_streaming_two_functions_mixed_approval():
"""Test non-streaming handler with two function calls, one requiring approval."""
from agent_framework import ChatMessage, ChatResponse, FunctionCallContent
from agent_framework._tools import _handle_function_calls_response
mock_client = type("MockClient", (), {})()
# Initial response with two function calls
initial_response = ChatResponse(
messages=[
ChatMessage(
role="assistant",
contents=[
FunctionCallContent(call_id="call_1", name="no_approval_tool", arguments='{"x": 5}'),
FunctionCallContent(call_id="call_2", name="requires_approval_tool", arguments='{"x": 3}'),
],
)
]
)
call_count = [0]
responses = [initial_response]
async def mock_get_response(self, messages, **kwargs):
result = responses[call_count[0]]
call_count[0] += 1
return result
wrapped = _handle_function_calls_response(mock_get_response)
# Execute
result = await wrapped(mock_client, messages=[], tools=[no_approval_tool, requires_approval_tool])
# Verify: should return approval requests for both (when one needs approval, all are sent for approval)
from agent_framework import FunctionApprovalRequestContent
assert len(result.messages) == 2
assert len(result.messages[0].contents) == 2 # Both function calls
assert len(result.messages[1].contents) == 2 # Both approval requests
assert all(isinstance(c, FunctionApprovalRequestContent) for c in result.messages[1].contents)
async def test_streaming_single_function_no_approval():
"""Test streaming handler with single function call that doesn't require approval."""
from agent_framework import ChatResponseUpdate, FunctionCallContent
from agent_framework._tools import _handle_function_calls_streaming_response
mock_client = type("MockClient", (), {})()
# Initial response with function call, then final response after function execution
initial_updates = [
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="call_1", name="no_approval_tool", arguments='{"x": 5}')],
role="assistant",
)
]
final_updates = [ChatResponseUpdate(contents=["The result is 10"], role="assistant")]
call_count = [0]
updates_list = [initial_updates, final_updates]
async def mock_get_streaming_response(self, messages, **kwargs):
updates = updates_list[call_count[0]]
call_count[0] += 1
for update in updates:
yield update
wrapped = _handle_function_calls_streaming_response(mock_get_streaming_response)
# Execute and collect updates
updates = []
async for update in wrapped(mock_client, messages=[], tools=[no_approval_tool]):
updates.append(update)
# Verify: should have function call update, tool result update (injected), and final update
from agent_framework import FunctionResultContent, Role
assert len(updates) >= 3
# First update is the function call
assert isinstance(updates[0].contents[0], FunctionCallContent)
# Second update should be the tool result (injected by the wrapper)
assert updates[1].role == Role.TOOL
assert isinstance(updates[1].contents[0], FunctionResultContent)
assert updates[1].contents[0].result == 10 # 5 * 2
# Last update is the final message
assert updates[-1].contents[0] == "The result is 10"
async def test_streaming_single_function_requires_approval():
"""Test streaming handler with single function call that requires approval."""
from agent_framework import ChatResponseUpdate, FunctionCallContent
from agent_framework._tools import _handle_function_calls_streaming_response
mock_client = type("MockClient", (), {})()
# Initial response with function call
initial_updates = [
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="call_1", name="requires_approval_tool", arguments='{"x": 5}')],
role="assistant",
)
]
call_count = [0]
updates_list = [initial_updates]
async def mock_get_streaming_response(self, messages, **kwargs):
updates = updates_list[call_count[0]]
call_count[0] += 1
for update in updates:
yield update
wrapped = _handle_function_calls_streaming_response(mock_get_streaming_response)
# Execute and collect updates
updates = []
async for update in wrapped(mock_client, messages=[], tools=[requires_approval_tool]):
updates.append(update)
# Verify: should yield function call and then approval request
from agent_framework import FunctionApprovalRequestContent, Role
assert len(updates) == 2
assert isinstance(updates[0].contents[0], FunctionCallContent)
assert updates[1].role == Role.TOOL
assert isinstance(updates[1].contents[0], FunctionApprovalRequestContent)
async def test_streaming_two_functions_both_no_approval():
"""Test streaming handler with two function calls, neither requiring approval."""
from agent_framework import ChatResponseUpdate, FunctionCallContent
from agent_framework._tools import _handle_function_calls_streaming_response
mock_client = type("MockClient", (), {})()
# Initial response with two function calls to the same tool
initial_updates = [
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="call_1", name="no_approval_tool", arguments='{"x": 5}')],
role="assistant",
),
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="call_2", name="no_approval_tool", arguments='{"x": 3}')],
role="assistant",
),
]
final_updates = [ChatResponseUpdate(contents=["Both tools executed successfully"], role="assistant")]
call_count = [0]
updates_list = [initial_updates, final_updates]
async def mock_get_streaming_response(self, messages, **kwargs):
updates = updates_list[call_count[0]]
call_count[0] += 1
for update in updates:
yield update
wrapped = _handle_function_calls_streaming_response(mock_get_streaming_response)
# Execute and collect updates
updates = []
async for update in wrapped(mock_client, messages=[], tools=[no_approval_tool]):
updates.append(update)
# Verify: should have both function calls, one tool result update with both results, and final message
from agent_framework import FunctionResultContent, Role
assert len(updates) >= 3
# First two updates are function calls
assert isinstance(updates[0].contents[0], FunctionCallContent)
assert isinstance(updates[1].contents[0], FunctionCallContent)
# Should have a tool result update with both results
tool_updates = [u for u in updates if u.role == Role.TOOL]
assert len(tool_updates) == 1
assert len(tool_updates[0].contents) == 2
assert all(isinstance(c, FunctionResultContent) for c in tool_updates[0].contents)
async def test_streaming_two_functions_both_require_approval():
"""Test streaming handler with two function calls, both requiring approval."""
from agent_framework import ChatResponseUpdate, FunctionCallContent
from agent_framework._tools import _handle_function_calls_streaming_response
mock_client = type("MockClient", (), {})()
# Initial response with two function calls to the same tool
initial_updates = [
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="call_1", name="requires_approval_tool", arguments='{"x": 5}')],
role="assistant",
),
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="call_2", name="requires_approval_tool", arguments='{"x": 3}')],
role="assistant",
),
]
call_count = [0]
updates_list = [initial_updates]
async def mock_get_streaming_response(self, messages, **kwargs):
updates = updates_list[call_count[0]]
call_count[0] += 1
for update in updates:
yield update
wrapped = _handle_function_calls_streaming_response(mock_get_streaming_response)
# Execute and collect updates
updates = []
async for update in wrapped(mock_client, messages=[], tools=[requires_approval_tool]):
updates.append(update)
# Verify: should yield both function calls and then approval requests
from agent_framework import FunctionApprovalRequestContent, Role
assert len(updates) == 3
assert isinstance(updates[0].contents[0], FunctionCallContent)
assert isinstance(updates[1].contents[0], FunctionCallContent)
# Tool update with both approval requests
assert updates[2].role == Role.TOOL
assert len(updates[2].contents) == 2
assert all(isinstance(c, FunctionApprovalRequestContent) for c in updates[2].contents)
async def test_streaming_two_functions_mixed_approval():
"""Test streaming handler with two function calls, one requiring approval."""
from agent_framework import ChatResponseUpdate, FunctionCallContent
from agent_framework._tools import _handle_function_calls_streaming_response
mock_client = type("MockClient", (), {})()
# Initial response with two function calls
initial_updates = [
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="call_1", name="no_approval_tool", arguments='{"x": 5}')],
role="assistant",
),
ChatResponseUpdate(
contents=[FunctionCallContent(call_id="call_2", name="requires_approval_tool", arguments='{"x": 3}')],
role="assistant",
),
]
call_count = [0]
updates_list = [initial_updates]
async def mock_get_streaming_response(self, messages, **kwargs):
updates = updates_list[call_count[0]]
call_count[0] += 1
for update in updates:
yield update
wrapped = _handle_function_calls_streaming_response(mock_get_streaming_response)
# Execute and collect updates
updates = []
async for update in wrapped(mock_client, messages=[], tools=[no_approval_tool, requires_approval_tool]):
updates.append(update)
# Verify: should yield both function calls and then approval requests (when one needs approval, all wait)
from agent_framework import FunctionApprovalRequestContent, Role
assert len(updates) == 3
assert isinstance(updates[0].contents[0], FunctionCallContent)
assert isinstance(updates[1].contents[0], FunctionCallContent)
# Tool update with both approval requests
assert updates[2].role == Role.TOOL
assert len(updates[2].contents) == 2
assert all(isinstance(c, FunctionApprovalRequestContent) for c in updates[2].contents)
@@ -22,11 +22,11 @@ from agent_framework import (
TextContent,
ToolProtocol,
ai_function,
prepare_function_call_results,
)
from agent_framework.exceptions import ServiceInitializationError, ServiceResponseException
from agent_framework.openai import OpenAIChatClient
from agent_framework.openai._exceptions import OpenAIContentFilterException
from agent_framework.openai._shared import prepare_function_call_results
skip_if_openai_integration_tests_disabled = pytest.mark.skipif(
os.getenv("RUN_INTEGRATION_TESTS", "false").lower() != "true"
@@ -782,7 +782,7 @@ def test_create_streaming_response_content_with_mcp_approval_request() -> None:
@pytest.mark.parametrize("enable_otel", [False], indirect=True)
@pytest.mark.parametrize("enable_sensitive_data", [False], indirect=True)
def test_end_to_end_mcp_approval_flow(span_exporter) -> None:
async def test_end_to_end_mcp_approval_flow(span_exporter) -> None:
"""End-to-end mocked test:
model issues an mcp_approval_request, user approves, client sends mcp_approval_response.
"""
@@ -824,7 +824,7 @@ def test_end_to_end_mcp_approval_flow(span_exporter) -> None:
# Patch the create call to return the two mocked responses in sequence
with patch.object(client.client.responses, "create", side_effect=[mock_response1, mock_response2]) as mock_create:
# First call: get the approval request
response = asyncio.run(client.get_response(messages=[ChatMessage(role="user", text="Trigger approval")]))
response = await client.get_response(messages=[ChatMessage(role="user", text="Trigger approval")])
assert isinstance(response.messages[0].contents[0], FunctionApprovalRequestContent)
req = response.messages[0].contents[0]
assert req.id == "approval-1"
@@ -832,7 +832,7 @@ def test_end_to_end_mcp_approval_flow(span_exporter) -> None:
# Build a user approval and send it (include required function_call)
approval = FunctionApprovalResponseContent(approved=True, id=req.id, function_call=req.function_call)
approval_message = ChatMessage(role="user", contents=[approval])
_ = asyncio.run(client.get_response(messages=[approval_message]))
_ = await client.get_response(messages=[approval_message])
# Ensure two calls were made and the second includes the mcp_approval_response
assert mock_create.call_count == 2
@@ -0,0 +1,102 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from random import randrange
from typing import TYPE_CHECKING, Annotated, Any
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIResponsesClient
if TYPE_CHECKING:
from agent_framework import AgentProtocol
"""
Demonstration of a tool with approvals.
This sample demonstrates using AI functions with user approval workflows.
It shows how to handle function call approvals without using threads.
"""
conditions = ["sunny", "cloudy", "raining", "snowing", "clear"]
@ai_function
def get_weather(location: Annotated[str, "The city and state, e.g. San Francisco, CA"]) -> str:
"""Get the current weather for a given location."""
# Simulate weather data
return f"The weather in {location} is {conditions[randrange(0, len(conditions))]} and {randrange(-10, 30)}°C."
# Define a simple weather tool that requires approval
@ai_function(approval_mode="always_require")
def get_weather_detail(location: Annotated[str, "The city and state, e.g. San Francisco, CA"]) -> str:
"""Get the current weather for a given location."""
# Simulate weather data
return (
f"The weather in {location} is {conditions[randrange(0, len(conditions))]} and {randrange(-10, 30)}°C, "
"with a humidity of 88%. "
f"Tomorrow will be {conditions[randrange(0, len(conditions))]} with a high of {randrange(-10, 30)}°C."
)
async def handle_approvals(query: str, agent: "AgentProtocol"):
"""Handle function call approvals.
When we don't have a thread, we need to ensure we include the original query,
the approval request, and the approval response in each iteration.
"""
from agent_framework import ChatMessage
result = await agent.run(query)
while len(result.user_input_requests) > 0:
# Start with the original query
new_inputs: list[Any] = [query]
for user_input_needed in result.user_input_requests:
print(
f"\nUser Input Request for function from {agent.name}:"
f"\n Function: {user_input_needed.function_call.name}"
f"\n Arguments: {user_input_needed.function_call.arguments}"
)
# Add the assistant message with the approval request
new_inputs.append(ChatMessage(role="assistant", contents=[user_input_needed]))
# Get user approval
user_approval = input("\nApprove function call? (y/n): ")
# Add the user's approval response
new_inputs.append(
ChatMessage(role="user", contents=[user_input_needed.create_response(user_approval.lower() == "y")])
)
# Run again with all the context
result = await agent.run(new_inputs)
return result
async def run_weather_agent_with_approval() -> None:
"""Example showing AI function with approval requirement."""
print("\n=== Weather Agent WITH Approval Required ===\n")
async with ChatAgent(
chat_client=OpenAIResponsesClient(),
name="WeatherAgent",
instructions=("You are a helpful weather assistant. Use the get_weather tool to provide weather information."),
tools=[get_weather, get_weather_detail],
) as agent:
query2 = "Can you give me an update of the weather in LA and Portland and detailed weather for Seattle?"
print(f"User: {query2}")
result2 = await handle_approvals(query2, agent)
print(f"\n{agent.name}: {result2}\n")
async def main() -> None:
print("=== Demonstration of a tool with approvals ===\n")
await run_weather_agent_with_approval()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,103 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Annotated
from agent_framework import FunctionCallContent, FunctionResultContent
from agent_framework.openai import OpenAIResponsesClient
"""
Tool exceptions handled by returning the error for the agent to recover from.
Shows how a tool that throws an exception creates gracefull recovery and can keep going.
The LLM decides whether to retry the call or to respond with something else, based on the exception.
"""
def greet(name: Annotated[str, "Name to greet"]) -> str:
"""Greet someone."""
return f"Hello, {name}!"
# we trick the AI into calling this function with 0 as denominator to trigger the exception
def safe_divide(
a: Annotated[int, "Numerator"],
b: Annotated[int, "Denominator"],
) -> str:
"""Divide two numbers can be used with 0 as denominator."""
try:
result = a / b # Will raise ZeroDivisionError
except ZeroDivisionError as exc:
print(f" Tool failed: with error: {exc}")
raise
return f"{a} / {b} = {result}"
async def main():
# tools = Tools()
agent = OpenAIResponsesClient().create_agent(
name="ToolAgent",
instructions="Use the provided tools.",
tools=[greet, safe_divide],
)
thread = agent.get_new_thread()
print("=" * 60)
print("Step 1: Call divide(10, 0) - tool raises exception")
response = await agent.run("Divide 10 by 0", thread=thread)
print(f"Response: {response.text}")
print("=" * 60)
print("Step 2: Call greet('Bob') - conversation can keep going.")
response = await agent.run("Greet Bob", thread=thread)
print(f"Response: {response.text}")
print("=" * 60)
print("Replay the conversation:")
assert thread.message_store
assert thread.message_store.list_messages
for idx, msg in enumerate(await thread.message_store.list_messages()):
if msg.text:
print(f"{idx + 1} {msg.author_name or msg.role}: {msg.text} ")
for content in msg.contents:
if isinstance(content, FunctionCallContent):
print(
f"{idx + 1} {msg.author_name}: calling function: {content.name} with arguments: {content.arguments}"
)
if isinstance(content, FunctionResultContent):
print(f"{idx + 1} {msg.role}: {content.result if content.result else content.exception}")
"""
Expected Output:
============================================================
Step 1: Call divide(10, 0) - tool raises exception
Tool failed: with error: division by zero
Response: Division by zero is undefined in standard arithmetic, so 10 ÷ 0 has no meaning.
If youre curious about limits: as x approaches 0 from the positive side, 10/x tends to +; from the negative side,
10/x tends to -.
If you want a finite result, try dividing by a nonzero number, e.g., 10 ÷ 2 = 5 or 10 ÷ 0.1 = 100. Want me to compute
something else?
============================================================
Step 2: Call greet('Bob') - conversation can keep going.
Response: Hello, Bob!
============================================================
Replay the conversation:
1 user: Divide 10 by 0
2 ToolAgent: calling function: safe_divide with arguments: {"a":10,"b":0}
3 tool: division by zero
4 ToolAgent: Division by zero is undefined in standard arithmetic, so 10 ÷ 0 has no meaning.
If youre curious about limits: as x approaches 0 from the positive side, 10/x tends to +; from the negative side,
10/x tends to -.
If you want a finite result, try dividing by a nonzero number, e.g., 10 ÷ 2 = 5 or 10 ÷ 0.1 = 100. Want me to compute
something else?
5 user: Greet Bob
6 ToolAgent: calling function: greet with arguments: {"name":"Bob"}
7 tool: Hello, Bob!
8 ToolAgent: Hello, Bob!
"""
if __name__ == "__main__":
asyncio.run(main())
+32 -32
View File
@@ -1739,7 +1739,7 @@ wheels = [
[[package]]
name = "google-api-core"
version = "2.25.1"
version = "2.25.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "google-auth", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
@@ -1748,9 +1748,9 @@ dependencies = [
{ name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "requests", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/dc/21/e9d043e88222317afdbdb567165fdbc3b0aad90064c7e0c9eb0ad9955ad8/google_api_core-2.25.1.tar.gz", hash = "sha256:d2aaa0b13c78c61cb3f4282c464c046e45fbd75755683c9c525e6e8f7ed0a5e8", size = 165443, upload-time = "2025-06-12T20:52:20.439Z" }
sdist = { url = "https://files.pythonhosted.org/packages/09/cd/63f1557235c2440fe0577acdbc32577c5c002684c58c7f4d770a92366a24/google_api_core-2.25.2.tar.gz", hash = "sha256:1c63aa6af0d0d5e37966f157a77f9396d820fba59f9e43e9415bc3dc5baff300", size = 166266, upload-time = "2025-10-03T00:07:34.778Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/14/4b/ead00905132820b623732b175d66354e9d3e69fcf2a5dcdab780664e7896/google_api_core-2.25.1-py3-none-any.whl", hash = "sha256:8a2a56c1fef82987a524371f99f3bd0143702fecc670c72e600c1cda6bf8dbb7", size = 160807, upload-time = "2025-06-12T20:52:19.334Z" },
{ url = "https://files.pythonhosted.org/packages/c8/d8/894716a5423933f5c8d2d5f04b16f052a515f78e815dab0c2c6f1fd105dc/google_api_core-2.25.2-py3-none-any.whl", hash = "sha256:e9a8f62d363dc8424a8497f4c2a47d6bcda6c16514c935629c257ab5d10210e7", size = 162489, upload-time = "2025-10-03T00:07:32.924Z" },
]
[[package]]
@@ -2063,11 +2063,11 @@ wheels = [
[[package]]
name = "identify"
version = "2.6.14"
version = "2.6.15"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/52/c4/62963f25a678f6a050fb0505a65e9e726996171e6dbe1547f79619eefb15/identify-2.6.14.tar.gz", hash = "sha256:663494103b4f717cb26921c52f8751363dc89db64364cd836a9bf1535f53cd6a", size = 99283, upload-time = "2025-09-06T19:30:52.938Z" }
sdist = { url = "https://files.pythonhosted.org/packages/ff/e7/685de97986c916a6d93b3876139e00eef26ad5bbbd61925d670ae8013449/identify-2.6.15.tar.gz", hash = "sha256:e4f4864b96c6557ef2a1e1c951771838f4edc9df3a72ec7118b338801b11c7bf", size = 99311, upload-time = "2025-10-02T17:43:40.631Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/ae/2ad30f4652712c82f1c23423d79136fbce338932ad166d70c1efb86a5998/identify-2.6.14-py2.py3-none-any.whl", hash = "sha256:11a073da82212c6646b1f39bb20d4483bfb9543bd5566fec60053c4bb309bf2e", size = 99172, upload-time = "2025-09-06T19:30:51.759Z" },
{ url = "https://files.pythonhosted.org/packages/0f/1c/e5fd8f973d4f375adb21565739498e2e9a1e54c858a97b9a8ccfdc81da9b/identify-2.6.15-py2.py3-none-any.whl", hash = "sha256:1181ef7608e00704db228516541eb83a88a9f94433a8c80bb9b5bd54b1d81757", size = 99183, upload-time = "2025-10-02T17:43:39.137Z" },
]
[[package]]
@@ -2598,7 +2598,7 @@ wheels = [
[[package]]
name = "mcp"
version = "1.15.0"
version = "1.16.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
@@ -2613,9 +2613,9 @@ dependencies = [
{ name = "starlette", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "uvicorn", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/0c/9e/e65114795f359f314d7061f4fcb50dfe60026b01b52ad0b986b4631bf8bb/mcp-1.15.0.tar.gz", hash = "sha256:5bda1f4d383cf539d3c035b3505a3de94b20dbd7e4e8b4bd071e14634eeb2d72", size = 469622, upload-time = "2025-09-25T15:39:51.995Z" }
sdist = { url = "https://files.pythonhosted.org/packages/3d/a1/b1f328da3b153683d2ec34f849b4b6eac2790fb240e3aef06ff2fab3df9d/mcp-1.16.0.tar.gz", hash = "sha256:39b8ca25460c578ee2cdad33feeea122694cfdf73eef58bee76c42f6ef0589df", size = 472918, upload-time = "2025-10-02T16:58:20.631Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c9/82/4d0df23d5ff5bb982a59ad597bc7cb9920f2650278ccefb8e0d85c5ce3d4/mcp-1.15.0-py3-none-any.whl", hash = "sha256:314614c8addc67b663d6c3e4054db0a5c3dedc416c24ef8ce954e203fdc2333d", size = 166963, upload-time = "2025-09-25T15:39:50.538Z" },
{ url = "https://files.pythonhosted.org/packages/c9/0e/7cebc88e17daf94ebe28c95633af595ccb2864dc2ee7abd75542d98495cc/mcp-1.16.0-py3-none-any.whl", hash = "sha256:ec917be9a5d31b09ba331e1768aa576e0af45470d657a0319996a20a57d7d633", size = 167266, upload-time = "2025-10-02T16:58:19.039Z" },
]
[package.optional-dependencies]
@@ -3750,15 +3750,15 @@ wheels = [
[[package]]
name = "plotly"
version = "6.3.0"
version = "6.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "narwhals", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "packaging", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a0/64/850de5076f4436410e1ce4f6a69f4313ef6215dfea155f3f6559335cad29/plotly-6.3.0.tar.gz", hash = "sha256:8840a184d18ccae0f9189c2b9a2943923fd5cae7717b723f36eef78f444e5a73", size = 6923926, upload-time = "2025-08-12T20:22:14.127Z" }
sdist = { url = "https://files.pythonhosted.org/packages/0c/63/961d47c9ffd592a575495891cdcf7875dc0903ebb33ac238935714213789/plotly-6.3.1.tar.gz", hash = "sha256:dd896e3d940e653a7ce0470087e82c2bd903969a55e30d1b01bb389319461bb0", size = 6956460, upload-time = "2025-10-02T16:10:34.16Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/95/a9/12e2dc726ba1ba775a2c6922d5d5b4488ad60bdab0888c337c194c8e6de8/plotly-6.3.0-py3-none-any.whl", hash = "sha256:7ad806edce9d3cdd882eaebaf97c0c9e252043ed1ed3d382c3e3520ec07806d4", size = 9791257, upload-time = "2025-08-12T20:22:09.205Z" },
{ url = "https://files.pythonhosted.org/packages/3f/93/023955c26b0ce614342d11cc0652f1e45e32393b6ab9d11a664a60e9b7b7/plotly-6.3.1-py3-none-any.whl", hash = "sha256:8b4420d1dcf2b040f5983eed433f95732ed24930e496d36eb70d211923532e64", size = 9833698, upload-time = "2025-10-02T16:10:22.584Z" },
]
[[package]]
@@ -4781,28 +4781,28 @@ wheels = [
[[package]]
name = "ruff"
version = "0.13.2"
version = "0.13.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/02/df/8d7d8c515d33adfc540e2edf6c6021ea1c5a58a678d8cfce9fae59aabcab/ruff-0.13.2.tar.gz", hash = "sha256:cb12fffd32fb16d32cef4ed16d8c7cdc27ed7c944eaa98d99d01ab7ab0b710ff", size = 5416417, upload-time = "2025-09-25T14:54:09.936Z" }
sdist = { url = "https://files.pythonhosted.org/packages/c7/8e/f9f9ca747fea8e3ac954e3690d4698c9737c23b51731d02df999c150b1c9/ruff-0.13.3.tar.gz", hash = "sha256:5b0ba0db740eefdfbcce4299f49e9eaefc643d4d007749d77d047c2bab19908e", size = 5438533, upload-time = "2025-10-02T19:29:31.582Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6e/84/5716a7fa4758e41bf70e603e13637c42cfb9dbf7ceb07180211b9bbf75ef/ruff-0.13.2-py3-none-linux_armv6l.whl", hash = "sha256:3796345842b55f033a78285e4f1641078f902020d8450cade03aad01bffd81c3", size = 12343254, upload-time = "2025-09-25T14:53:27.784Z" },
{ url = "https://files.pythonhosted.org/packages/9b/77/c7042582401bb9ac8eff25360e9335e901d7a1c0749a2b28ba4ecb239991/ruff-0.13.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:ff7e4dda12e683e9709ac89e2dd436abf31a4d8a8fc3d89656231ed808e231d2", size = 13040891, upload-time = "2025-09-25T14:53:31.38Z" },
{ url = "https://files.pythonhosted.org/packages/c6/15/125a7f76eb295cb34d19c6778e3a82ace33730ad4e6f28d3427e134a02e0/ruff-0.13.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:c75e9d2a2fafd1fdd895d0e7e24b44355984affdde1c412a6f6d3f6e16b22d46", size = 12243588, upload-time = "2025-09-25T14:53:33.543Z" },
{ url = "https://files.pythonhosted.org/packages/9e/eb/0093ae04a70f81f8be7fd7ed6456e926b65d238fc122311293d033fdf91e/ruff-0.13.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cceac74e7bbc53ed7d15d1042ffe7b6577bf294611ad90393bf9b2a0f0ec7cb6", size = 12491359, upload-time = "2025-09-25T14:53:35.892Z" },
{ url = "https://files.pythonhosted.org/packages/43/fe/72b525948a6956f07dad4a6f122336b6a05f2e3fd27471cea612349fedb9/ruff-0.13.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:6ae3f469b5465ba6d9721383ae9d49310c19b452a161b57507764d7ef15f4b07", size = 12162486, upload-time = "2025-09-25T14:53:38.171Z" },
{ url = "https://files.pythonhosted.org/packages/6a/e3/0fac422bbbfb2ea838023e0d9fcf1f30183d83ab2482800e2cb892d02dfe/ruff-0.13.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4f8f9e3cd6714358238cd6626b9d43026ed19c0c018376ac1ef3c3a04ffb42d8", size = 13871203, upload-time = "2025-09-25T14:53:41.943Z" },
{ url = "https://files.pythonhosted.org/packages/6b/82/b721c8e3ec5df6d83ba0e45dcf00892c4f98b325256c42c38ef136496cbf/ruff-0.13.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:c6ed79584a8f6cbe2e5d7dbacf7cc1ee29cbdb5df1172e77fbdadc8bb85a1f89", size = 14929635, upload-time = "2025-09-25T14:53:43.953Z" },
{ url = "https://files.pythonhosted.org/packages/c4/a0/ad56faf6daa507b83079a1ad7a11694b87d61e6bf01c66bd82b466f21821/ruff-0.13.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:aed130b2fde049cea2019f55deb939103123cdd191105f97a0599a3e753d61b0", size = 14338783, upload-time = "2025-09-25T14:53:46.205Z" },
{ url = "https://files.pythonhosted.org/packages/47/77/ad1d9156db8f99cd01ee7e29d74b34050e8075a8438e589121fcd25c4b08/ruff-0.13.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1887c230c2c9d65ed1b4e4cfe4d255577ea28b718ae226c348ae68df958191aa", size = 13355322, upload-time = "2025-09-25T14:53:48.164Z" },
{ url = "https://files.pythonhosted.org/packages/64/8b/e87cfca2be6f8b9f41f0bb12dc48c6455e2d66df46fe61bb441a226f1089/ruff-0.13.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5bcb10276b69b3cfea3a102ca119ffe5c6ba3901e20e60cf9efb53fa417633c3", size = 13354427, upload-time = "2025-09-25T14:53:50.486Z" },
{ url = "https://files.pythonhosted.org/packages/7f/df/bf382f3fbead082a575edb860897287f42b1b3c694bafa16bc9904c11ed3/ruff-0.13.2-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:afa721017aa55a555b2ff7944816587f1cb813c2c0a882d158f59b832da1660d", size = 13537637, upload-time = "2025-09-25T14:53:52.887Z" },
{ url = "https://files.pythonhosted.org/packages/51/70/1fb7a7c8a6fc8bd15636288a46e209e81913b87988f26e1913d0851e54f4/ruff-0.13.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:1dbc875cf3720c64b3990fef8939334e74cb0ca65b8dbc61d1f439201a38101b", size = 12340025, upload-time = "2025-09-25T14:53:54.88Z" },
{ url = "https://files.pythonhosted.org/packages/4c/27/1e5b3f1c23ca5dd4106d9d580e5c13d9acb70288bff614b3d7b638378cc9/ruff-0.13.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:5b939a1b2a960e9742e9a347e5bbc9b3c3d2c716f86c6ae273d9cbd64f193f22", size = 12133449, upload-time = "2025-09-25T14:53:57.089Z" },
{ url = "https://files.pythonhosted.org/packages/2d/09/b92a5ccee289f11ab128df57d5911224197d8d55ef3bd2043534ff72ca54/ruff-0.13.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:50e2d52acb8de3804fc5f6e2fa3ae9bdc6812410a9e46837e673ad1f90a18736", size = 13051369, upload-time = "2025-09-25T14:53:59.124Z" },
{ url = "https://files.pythonhosted.org/packages/89/99/26c9d1c7d8150f45e346dc045cc49f23e961efceb4a70c47dea0960dea9a/ruff-0.13.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:3196bc13ab2110c176b9a4ae5ff7ab676faaa1964b330a1383ba20e1e19645f2", size = 13523644, upload-time = "2025-09-25T14:54:01.622Z" },
{ url = "https://files.pythonhosted.org/packages/f7/00/e7f1501e81e8ec290e79527827af1d88f541d8d26151751b46108978dade/ruff-0.13.2-py3-none-win32.whl", hash = "sha256:7c2a0b7c1e87795fec3404a485096bcd790216c7c146a922d121d8b9c8f1aaac", size = 12245990, upload-time = "2025-09-25T14:54:03.647Z" },
{ url = "https://files.pythonhosted.org/packages/ee/bd/d9f33a73de84fafd0146c6fba4f497c4565fe8fa8b46874b8e438869abc2/ruff-0.13.2-py3-none-win_amd64.whl", hash = "sha256:17d95fb32218357c89355f6f6f9a804133e404fc1f65694372e02a557edf8585", size = 13324004, upload-time = "2025-09-25T14:54:06.05Z" },
{ url = "https://files.pythonhosted.org/packages/c3/12/28fa2f597a605884deb0f65c1b1ae05111051b2a7030f5d8a4ff7f4599ba/ruff-0.13.2-py3-none-win_arm64.whl", hash = "sha256:da711b14c530412c827219312b7d7fbb4877fb31150083add7e8c5336549cea7", size = 12484437, upload-time = "2025-09-25T14:54:08.022Z" },
{ url = "https://files.pythonhosted.org/packages/d2/33/8f7163553481466a92656d35dea9331095122bb84cf98210bef597dd2ecd/ruff-0.13.3-py3-none-linux_armv6l.whl", hash = "sha256:311860a4c5e19189c89d035638f500c1e191d283d0cc2f1600c8c80d6dcd430c", size = 12484040, upload-time = "2025-10-02T19:28:49.199Z" },
{ url = "https://files.pythonhosted.org/packages/b0/b5/4a21a4922e5dd6845e91896b0d9ef493574cbe061ef7d00a73c61db531af/ruff-0.13.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:2bdad6512fb666b40fcadb65e33add2b040fc18a24997d2e47fee7d66f7fcae2", size = 13122975, upload-time = "2025-10-02T19:28:52.446Z" },
{ url = "https://files.pythonhosted.org/packages/40/90/15649af836d88c9f154e5be87e64ae7d2b1baa5a3ef317cb0c8fafcd882d/ruff-0.13.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:fc6fa4637284708d6ed4e5e970d52fc3b76a557d7b4e85a53013d9d201d93286", size = 12346621, upload-time = "2025-10-02T19:28:54.712Z" },
{ url = "https://files.pythonhosted.org/packages/a5/42/bcbccb8141305f9a6d3f72549dd82d1134299177cc7eaf832599700f95a7/ruff-0.13.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1c9e6469864f94a98f412f20ea143d547e4c652f45e44f369d7b74ee78185838", size = 12574408, upload-time = "2025-10-02T19:28:56.679Z" },
{ url = "https://files.pythonhosted.org/packages/ce/19/0f3681c941cdcfa2d110ce4515624c07a964dc315d3100d889fcad3bfc9e/ruff-0.13.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:5bf62b705f319476c78891e0e97e965b21db468b3c999086de8ffb0d40fd2822", size = 12285330, upload-time = "2025-10-02T19:28:58.79Z" },
{ url = "https://files.pythonhosted.org/packages/10/f8/387976bf00d126b907bbd7725219257feea58650e6b055b29b224d8cb731/ruff-0.13.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:78cc1abed87ce40cb07ee0667ce99dbc766c9f519eabfd948ed87295d8737c60", size = 13980815, upload-time = "2025-10-02T19:29:01.577Z" },
{ url = "https://files.pythonhosted.org/packages/0c/a6/7c8ec09d62d5a406e2b17d159e4817b63c945a8b9188a771193b7e1cc0b5/ruff-0.13.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4fb75e7c402d504f7a9a259e0442b96403fa4a7310ffe3588d11d7e170d2b1e3", size = 14987733, upload-time = "2025-10-02T19:29:04.036Z" },
{ url = "https://files.pythonhosted.org/packages/97/e5/f403a60a12258e0fd0c2195341cfa170726f254c788673495d86ab5a9a9d/ruff-0.13.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:17b951f9d9afb39330b2bdd2dd144ce1c1335881c277837ac1b50bfd99985ed3", size = 14439848, upload-time = "2025-10-02T19:29:06.684Z" },
{ url = "https://files.pythonhosted.org/packages/39/49/3de381343e89364c2334c9f3268b0349dc734fc18b2d99a302d0935c8345/ruff-0.13.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6052f8088728898e0a449f0dde8fafc7ed47e4d878168b211977e3e7e854f662", size = 13421890, upload-time = "2025-10-02T19:29:08.767Z" },
{ url = "https://files.pythonhosted.org/packages/ab/b5/c0feca27d45ae74185a6bacc399f5d8920ab82df2d732a17213fb86a2c4c/ruff-0.13.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc742c50f4ba72ce2a3be362bd359aef7d0d302bf7637a6f942eaa763bd292af", size = 13444870, upload-time = "2025-10-02T19:29:11.234Z" },
{ url = "https://files.pythonhosted.org/packages/50/a1/b655298a1f3fda4fdc7340c3f671a4b260b009068fbeb3e4e151e9e3e1bf/ruff-0.13.3-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:8e5640349493b378431637019366bbd73c927e515c9c1babfea3e932f5e68e1d", size = 13691599, upload-time = "2025-10-02T19:29:13.353Z" },
{ url = "https://files.pythonhosted.org/packages/32/b0/a8705065b2dafae007bcae21354e6e2e832e03eb077bb6c8e523c2becb92/ruff-0.13.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:6b139f638a80eae7073c691a5dd8d581e0ba319540be97c343d60fb12949c8d0", size = 12421893, upload-time = "2025-10-02T19:29:15.668Z" },
{ url = "https://files.pythonhosted.org/packages/0d/1e/cbe7082588d025cddbb2f23e6dfef08b1a2ef6d6f8328584ad3015b5cebd/ruff-0.13.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:6b547def0a40054825de7cfa341039ebdfa51f3d4bfa6a0772940ed351d2746c", size = 12267220, upload-time = "2025-10-02T19:29:17.583Z" },
{ url = "https://files.pythonhosted.org/packages/a5/99/4086f9c43f85e0755996d09bdcb334b6fee9b1eabdf34e7d8b877fadf964/ruff-0.13.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:9cc48a3564423915c93573f1981d57d101e617839bef38504f85f3677b3a0a3e", size = 13177818, upload-time = "2025-10-02T19:29:19.943Z" },
{ url = "https://files.pythonhosted.org/packages/9b/de/7b5db7e39947d9dc1c5f9f17b838ad6e680527d45288eeb568e860467010/ruff-0.13.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1a993b17ec03719c502881cb2d5f91771e8742f2ca6de740034433a97c561989", size = 13618715, upload-time = "2025-10-02T19:29:22.527Z" },
{ url = "https://files.pythonhosted.org/packages/28/d3/bb25ee567ce2f61ac52430cf99f446b0e6d49bdfa4188699ad005fdd16aa/ruff-0.13.3-py3-none-win32.whl", hash = "sha256:f14e0d1fe6460f07814d03c6e32e815bff411505178a1f539a38f6097d3e8ee3", size = 12334488, upload-time = "2025-10-02T19:29:24.782Z" },
{ url = "https://files.pythonhosted.org/packages/cf/49/12f5955818a1139eed288753479ba9d996f6ea0b101784bb1fe6977ec128/ruff-0.13.3-py3-none-win_amd64.whl", hash = "sha256:621e2e5812b691d4f244638d693e640f188bacbb9bc793ddd46837cea0503dd2", size = 13455262, upload-time = "2025-10-02T19:29:26.882Z" },
{ url = "https://files.pythonhosted.org/packages/fe/72/7b83242b26627a00e3af70d0394d68f8f02750d642567af12983031777fc/ruff-0.13.3-py3-none-win_arm64.whl", hash = "sha256:9e9e9d699841eaf4c2c798fa783df2fabc680b72059a02ca0ed81c460bc58330", size = 12538484, upload-time = "2025-10-02T19:29:28.951Z" },
]
[[package]]