Python: additional Foundry Tools (#611)

* initial work on additional foundry tools

* fixes

* fix tests

* fix import

* updated lock

* added hosted MCP for foundry

* fixes

* fix for test

* updated samples

* fix result parsing
This commit is contained in:
Eduard van Valkenburg
2025-09-23 13:20:20 +02:00
committed by GitHub
Unverified
parent 3571a7d321
commit 9eb68ab2f9
9 changed files with 642 additions and 232 deletions
@@ -1,6 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.
import json
import os
import sys
from collections.abc import AsyncIterable, MutableMapping, MutableSequence
from typing import Any, ClassVar, TypeVar
@@ -16,14 +17,23 @@ from agent_framework import (
ChatToolMode,
Contents,
DataContent,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
FunctionCallContent,
FunctionResultContent,
HostedCodeInterpreterTool,
HostedFileContent,
HostedFileSearchTool,
HostedMCPTool,
HostedVectorStoreContent,
HostedWebSearchTool,
Role,
TextContent,
ToolProtocol,
UriContent,
UsageContent,
UsageDetails,
get_logger,
use_function_invocation,
)
from agent_framework._pydantic import AFBaseSettings
@@ -36,10 +46,16 @@ from azure.ai.agents.models import (
AgentStreamEvent,
AsyncAgentEventHandler,
AsyncAgentRunStream,
AzureAISearchQueryType,
AzureAISearchTool,
BingCustomSearchTool,
BingGroundingTool,
CodeInterpreterToolDefinition,
FileSearchTool,
FunctionName,
FunctionToolOutput,
ListSortOrder,
McpTool,
MessageDeltaChunk,
MessageImageUrlParam,
MessageInputContentBlock,
@@ -47,19 +63,28 @@ from azure.ai.agents.models import (
MessageInputTextBlock,
MessageRole,
RequiredFunctionToolCall,
RequiredMcpToolCall,
ResponseFormatJsonSchema,
ResponseFormatJsonSchemaType,
RunError,
RunStatus,
RunStep,
RunStepDeltaChunk,
RunStepDeltaCodeInterpreterDetailItemObject,
RunStepDeltaCodeInterpreterImageOutput,
RunStepDeltaCodeInterpreterLogOutput,
SubmitToolApprovalAction,
SubmitToolOutputsAction,
ThreadMessageOptions,
ThreadRun,
ToolApproval,
ToolDefinition,
ToolOutput,
)
from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import ConnectionType
from azure.core.credentials_async import AsyncTokenCredential
from pydantic import Field, PrivateAttr, ValidationError
from azure.core.exceptions import HttpResponseError
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
@@ -67,6 +92,9 @@ else:
from typing_extensions import Self # pragma: no cover
logger = get_logger("agent_framework.foundry")
class FoundrySettings(AFBaseSettings):
"""Foundry model settings.
@@ -255,7 +283,7 @@ class FoundryChatClient(BaseChatClient):
**kwargs: Any,
) -> AsyncIterable[ChatResponseUpdate]:
# Extract necessary state from messages and options
run_options, tool_results = self._create_run_options(messages, chat_options, **kwargs)
run_options, required_action_results = await self._create_run_options(messages, chat_options, **kwargs)
# Get the thread ID
thread_id: str | None = (
@@ -264,17 +292,16 @@ class FoundryChatClient(BaseChatClient):
else run_options.get("conversation_id", self.thread_id)
)
if thread_id is None and tool_results is not None:
if thread_id is None and required_action_results is not None:
raise ValueError("No thread ID was provided, but chat messages includes tool results.")
# Determine which agent to use and create if needed
agent_id = await self._get_agent_id_or_create(run_options)
# Create the streaming response
stream, thread_id = await self._create_agent_stream(thread_id, agent_id, run_options, tool_results)
# Process and yield each update from the stream
async for update in self._process_stream_events(stream, thread_id):
async for update in self._process_stream(
*(await self._create_agent_stream(thread_id, agent_id, run_options, required_action_results))
):
yield update
async def _get_agent_id_or_create(self, run_options: dict[str, Any] | None = None) -> str:
@@ -308,7 +335,7 @@ class FoundryChatClient(BaseChatClient):
thread_id: str | None,
agent_id: str,
run_options: dict[str, Any],
tool_results: list[FunctionResultContent] | None,
required_action_results: list[FunctionResultContent | FunctionApprovalResponseContent] | None,
) -> tuple[AsyncAgentRunStream[AsyncAgentEventHandler[Any]] | AsyncAgentEventHandler[Any], str]:
"""Create the agent stream for processing.
@@ -320,13 +347,27 @@ class FoundryChatClient(BaseChatClient):
stream: AsyncAgentRunStream[AsyncAgentEventHandler[Any]] | AsyncAgentEventHandler[Any]
handler: AsyncAgentEventHandler[Any] = AsyncAgentEventHandler()
tool_run_id, tool_outputs = self._convert_function_results_to_tool_output(tool_results)
tool_run_id, tool_outputs, tool_approvals = self._convert_required_action_to_tool_output(
required_action_results
)
if thread_run is not None and tool_run_id is not None and tool_run_id == thread_run.id and tool_outputs:
if (
thread_run is not None
and tool_run_id is not None
and tool_run_id == thread_run.id
and (tool_outputs or tool_approvals)
): # type: ignore[reportUnknownMemberType]
# There's an active run and we have tool results to submit, so submit the results.
await self.client.agents.runs.submit_tool_outputs_stream( # type: ignore[reportUnknownMemberType]
thread_run.thread_id, tool_run_id, tool_outputs=tool_outputs, event_handler=handler
)
args: dict[str, Any] = {
"thread_id": thread_run.thread_id,
"run_id": tool_run_id,
"event_handler": handler,
}
if tool_outputs:
args["tool_outputs"] = tool_outputs
if tool_approvals:
args["tool_approvals"] = tool_approvals
await self.client.agents.runs.submit_tool_outputs_stream(**args) # type: ignore[reportUnknownMemberType]
# Pass the handler to the stream to continue processing
stream = handler # type: ignore
final_thread_id = thread_run.thread_id
@@ -384,116 +425,186 @@ class FoundryChatClient(BaseChatClient):
# and remove until here.
return thread_id
async def _process_stream_events(
self,
stream: AsyncAgentRunStream[AsyncAgentEventHandler[Any]] | AsyncAgentEventHandler[Any],
thread_id: str,
) -> AsyncIterable[ChatResponseUpdate]:
"""Process events from the agent stream and yield ChatResponseUpdate objects."""
# Use 'async with' only if the stream supports async context management (main agent stream).
# Tool output handlers only support async iteration, not context management.
if isinstance(stream, AsyncAgentRunStream):
async with stream as response_stream: # type: ignore
async for update in self._process_stream_events_from_iterator(response_stream, thread_id):
yield update
else:
async for update in self._process_stream_events_from_iterator(stream, thread_id):
yield update
async def _process_stream_events_from_iterator(
self, stream_iter: AsyncAgentEventHandler[Any], thread_id: str
async def _process_stream(
self, stream: AsyncAgentRunStream[AsyncAgentEventHandler[Any]] | AsyncAgentEventHandler[Any], thread_id: str
) -> AsyncIterable[ChatResponseUpdate]:
"""Process events from the stream iterator and yield ChatResponseUpdate objects."""
response_id: str | None = None
async for event_type, event_data, _ in stream_iter: # type: ignore
if event_type == AgentStreamEvent.THREAD_RUN_CREATED and isinstance(event_data, ThreadRun):
yield ChatResponseUpdate(
contents=[],
conversation_id=event_data.thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
role=Role.ASSISTANT,
ai_model_id=event_data.model,
)
elif event_type == AgentStreamEvent.THREAD_RUN_STEP_CREATED and isinstance(event_data, RunStep):
response_id = event_data.run_id
elif event_type == AgentStreamEvent.THREAD_MESSAGE_DELTA and isinstance(event_data, MessageDeltaChunk):
role = Role.USER if event_data.delta.role == MessageRole.USER else Role.ASSISTANT
yield ChatResponseUpdate(
role=role,
text=event_data.text,
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
)
elif (
event_type == AgentStreamEvent.THREAD_RUN_REQUIRES_ACTION
and isinstance(event_data, ThreadRun)
and isinstance(event_data.required_action, SubmitToolOutputsAction)
):
contents = self._create_function_call_contents(event_data, response_id)
if contents:
yield ChatResponseUpdate(
role=Role.ASSISTANT,
contents=contents,
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
)
elif (
event_type in [AgentStreamEvent.THREAD_RUN_COMPLETED, AgentStreamEvent.THREAD_RUN_STEP_COMPLETED]
and isinstance(event_data, RunStep)
and event_data.usage is not None
):
usage_content = UsageContent(
UsageDetails(
input_token_count=event_data.usage.prompt_tokens,
output_token_count=event_data.usage.completion_tokens,
total_token_count=event_data.usage.total_tokens,
)
)
yield ChatResponseUpdate(
role=Role.ASSISTANT,
contents=[usage_content],
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
)
elif (
event_type == AgentStreamEvent.THREAD_RUN_FAILED
and isinstance(event_data, ThreadRun)
and isinstance(event_data.last_error, RunError)
):
raise ServiceResponseException(event_data.last_error.message)
else:
yield ChatResponseUpdate(
contents=[],
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data, # type: ignore
response_id=response_id,
role=Role.ASSISTANT,
)
response_stream = await stream.__aenter__() if isinstance(stream, AsyncAgentRunStream) else stream # type: ignore[no-untyped-call]
try:
async for event_type, event_data, _ in response_stream: # type: ignore
match event_data:
case MessageDeltaChunk():
# only one event_type: AgentStreamEvent.THREAD_MESSAGE_DELTA
role = Role.USER if event_data.delta.role == MessageRole.USER else Role.ASSISTANT
yield ChatResponseUpdate(
role=role,
text=event_data.text,
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
)
case ThreadRun():
# possible event_types:
# AgentStreamEvent.THREAD_RUN_CREATED
# AgentStreamEvent.THREAD_RUN_QUEUED
# AgentStreamEvent.THREAD_RUN_INCOMPLETE
# AgentStreamEvent.THREAD_RUN_IN_PROGRESS
# AgentStreamEvent.THREAD_RUN_REQUIRES_ACTION
# AgentStreamEvent.THREAD_RUN_COMPLETED
# AgentStreamEvent.THREAD_RUN_FAILED
# AgentStreamEvent.THREAD_RUN_CANCELLING
# AgentStreamEvent.THREAD_RUN_CANCELLED
# AgentStreamEvent.THREAD_RUN_EXPIRED
match event_type:
case AgentStreamEvent.THREAD_RUN_REQUIRES_ACTION:
if event_data.required_action and event_data.required_action.type in [
"submit_tool_outputs",
"submit_tool_approval",
]:
contents = self._create_function_call_contents(event_data, response_id)
if contents:
yield ChatResponseUpdate(
role=Role.ASSISTANT,
contents=contents,
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
)
case AgentStreamEvent.THREAD_RUN_FAILED:
raise ServiceResponseException(event_data.last_error.message)
case _:
yield ChatResponseUpdate(
contents=[],
conversation_id=event_data.thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
role=Role.ASSISTANT,
ai_model_id=event_data.model,
)
case RunStep():
# possible event_types:
# AgentStreamEvent.THREAD_RUN_STEP_CREATED,
# AgentStreamEvent.THREAD_RUN_STEP_IN_PROGRESS,
# AgentStreamEvent.THREAD_RUN_STEP_COMPLETED,
# AgentStreamEvent.THREAD_RUN_STEP_FAILED,
# AgentStreamEvent.THREAD_RUN_STEP_CANCELLED,
# AgentStreamEvent.THREAD_RUN_STEP_EXPIRED,
match event_type:
case AgentStreamEvent.THREAD_RUN_STEP_CREATED:
response_id = event_data.run_id
case AgentStreamEvent.THREAD_RUN_COMPLETED | AgentStreamEvent.THREAD_RUN_STEP_COMPLETED:
if event_data.usage:
usage_content = UsageContent(
UsageDetails(
input_token_count=event_data.usage.prompt_tokens,
output_token_count=event_data.usage.completion_tokens,
total_token_count=event_data.usage.total_tokens,
)
)
yield ChatResponseUpdate(
role=Role.ASSISTANT,
contents=[usage_content],
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
)
case _:
yield ChatResponseUpdate(
contents=[],
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data,
response_id=response_id,
role=Role.ASSISTANT,
)
case RunStepDeltaChunk(): # type: ignore
if (
event_data.delta.step_details is not None
and event_data.delta.step_details.type == "tool_calls"
and event_data.delta.step_details.tool_calls is not None # type: ignore[attr-defined]
):
for tool_call in event_data.delta.step_details.tool_calls: # type: ignore[attr-defined]
if tool_call.type == "code_interpreter" and isinstance(
tool_call.code_interpreter,
RunStepDeltaCodeInterpreterDetailItemObject,
):
contents = []
if tool_call.code_interpreter.input is not None:
logger.debug(f"Code Interpreter Input: {tool_call.code_interpreter.input}")
if tool_call.code_interpreter.outputs is not None:
for output in tool_call.code_interpreter.outputs:
if isinstance(output, RunStepDeltaCodeInterpreterLogOutput) and output.logs:
contents.append(TextContent(text=output.logs))
if (
isinstance(output, RunStepDeltaCodeInterpreterImageOutput)
and output.image is not None
and output.image.file_id is not None
):
contents.append(HostedFileContent(file_id=output.image.file_id))
yield ChatResponseUpdate(
role=Role.ASSISTANT,
contents=contents,
conversation_id=thread_id,
message_id=response_id,
raw_representation=tool_call.code_interpreter,
response_id=response_id,
)
case _: # ThreadMessage or string
# possible event_types for ThreadMessage:
# AgentStreamEvent.THREAD_MESSAGE_CREATED
# AgentStreamEvent.THREAD_MESSAGE_IN_PROGRESS
# AgentStreamEvent.THREAD_MESSAGE_COMPLETED
# AgentStreamEvent.THREAD_MESSAGE_INCOMPLETE
yield ChatResponseUpdate(
contents=[],
conversation_id=thread_id,
message_id=response_id,
raw_representation=event_data, # type: ignore
response_id=response_id,
role=Role.ASSISTANT,
)
except Exception as ex:
logger.error(f"Error processing stream: {ex}")
raise
finally:
if isinstance(stream, AsyncAgentRunStream):
await stream.__aexit__(None, None, None) # type: ignore[no-untyped-call]
def _create_function_call_contents(self, event_data: ThreadRun, response_id: str | None) -> list[Contents]:
"""Create function call contents from a tool action event."""
contents: list[Contents] = []
if isinstance(event_data, ThreadRun) and isinstance(event_data.required_action, SubmitToolOutputsAction):
for tool_call in event_data.required_action.submit_tool_outputs.tool_calls:
if isinstance(tool_call, RequiredFunctionToolCall):
contents.append(
FunctionCallContent(
call_id=f'["{response_id}", "{tool_call.id}"]',
name=tool_call.function.name,
arguments=tool_call.function.arguments,
)
if isinstance(event_data, ThreadRun) and event_data.required_action is not None:
if isinstance(event_data.required_action, SubmitToolOutputsAction):
return [
FunctionCallContent(
call_id=f'["{response_id}", "{tool.id}"]',
name=tool.function.name,
arguments=tool.function.arguments,
)
return contents
for tool in event_data.required_action.submit_tool_outputs.tool_calls
if isinstance(tool, RequiredFunctionToolCall)
]
if isinstance(event_data.required_action, SubmitToolApprovalAction):
return [
FunctionApprovalRequestContent(
id=f'["{response_id}", "{tool.id}"]',
function_call=FunctionCallContent(
call_id=f'["{response_id}", "{tool.id}"]',
name=tool.name,
arguments=tool.arguments,
raw_representation=tool,
),
raw_representation=tool,
)
for tool in event_data.required_action.submit_tool_approval.tool_calls
if isinstance(tool, RequiredMcpToolCall)
]
return []
async def _close_client_if_needed(self) -> None:
"""Close client session if we created it."""
@@ -507,12 +618,12 @@ class FoundryChatClient(BaseChatClient):
self.agent_id = None
self._should_delete_agent = False
def _create_run_options(
async def _create_run_options(
self,
messages: MutableSequence[ChatMessage],
chat_options: ChatOptions | None,
**kwargs: Any,
) -> tuple[dict[str, Any], list[FunctionResultContent] | None]:
) -> tuple[dict[str, Any], list[FunctionResultContent | FunctionApprovalResponseContent] | None]:
run_options: dict[str, Any] = {**kwargs}
if chat_options is not None:
@@ -523,18 +634,10 @@ class FoundryChatClient(BaseChatClient):
run_options["parallel_tool_calls"] = chat_options.allow_multiple_tool_calls
if chat_options.tool_choice is not None:
tool_definitions: list[MutableMapping[str, Any]] = []
if chat_options.tool_choice != "none" and chat_options.tools is not None:
for tool in chat_options.tools:
if isinstance(tool, AIFunction):
tool_definitions.append(tool.to_json_schema_spec()) # type: ignore[reportUnknownArgumentType]
elif isinstance(tool, HostedCodeInterpreterTool):
tool_definitions.append(CodeInterpreterToolDefinition())
elif isinstance(tool, MutableMapping):
tool_definitions.append(tool)
if len(tool_definitions) > 0:
run_options["tools"] = tool_definitions
if chat_options.tool_choice != "none" and chat_options.tools:
tool_definitions = await self._prep_tools(chat_options.tools)
if tool_definitions:
run_options["tools"] = tool_definitions
if chat_options.tool_choice == "none":
run_options["tool_choice"] = AgentsToolChoiceOptionMode.NONE
@@ -559,7 +662,7 @@ class FoundryChatClient(BaseChatClient):
)
instructions: list[str] = []
tool_results: list[FunctionResultContent] | None = None
required_action_results: list[FunctionResultContent | FunctionApprovalResponseContent] | None = None
additional_messages: list[ThreadMessageOptions] | None = None
@@ -580,10 +683,10 @@ class FoundryChatClient(BaseChatClient):
message_contents.append(MessageInputTextBlock(text=content.text))
elif isinstance(content, (DataContent, UriContent)) and content.has_top_level_media_type("image"):
message_contents.append(MessageInputImageUrlBlock(image_url=MessageImageUrlParam(url=content.uri)))
elif isinstance(content, FunctionResultContent):
if tool_results is None:
tool_results = []
tool_results.append(content)
elif isinstance(content, (FunctionResultContent, FunctionApprovalResponseContent)):
if required_action_results is None:
required_action_results = []
required_action_results.append(content)
elif isinstance(content.raw_representation, MessageInputContentBlock):
message_contents.append(content.raw_representation)
@@ -603,21 +706,137 @@ class FoundryChatClient(BaseChatClient):
if len(instructions) > 0:
run_options["instructions"] = "".join(instructions)
return run_options, tool_results
return run_options, required_action_results
def _convert_function_results_to_tool_output(
async def _prep_tools(
self, tools: list["ToolProtocol | MutableMapping[str, Any]"]
) -> list[ToolDefinition | dict[str, Any]]:
"""Prepare tool definitions for the run options."""
tool_definitions: list[ToolDefinition | dict[str, Any]] = []
for tool in tools:
match tool:
case AIFunction():
tool_definitions.append(tool.to_json_schema_spec()) # type: ignore[reportUnknownArgumentType]
case HostedWebSearchTool():
additional_props = tool.additional_properties or {}
config_args: dict[str, Any] = {}
if count := additional_props.get("count"):
config_args["count"] = count
if freshness := additional_props.get("freshness"):
config_args["freshness"] = freshness
if market := additional_props.get("market"):
config_args["market"] = market
if set_lang := additional_props.get("set_lang"):
config_args["set_lang"] = set_lang
# Bing Grounding
connection_id = additional_props.get("connection_id") or os.getenv("BING_CONNECTION_ID")
# Custom Bing Search
custom_connection_name = additional_props.get("custom_connection_name") or os.getenv(
"BING_CUSTOM_CONNECTION_NAME"
)
custom_configuration_name = additional_props.get("custom_instance_name") or os.getenv(
"BING_CUSTOM_INSTANCE_NAME"
)
bing_search: BingGroundingTool | BingCustomSearchTool | None = None
if connection_id and not custom_connection_name and not custom_configuration_name:
bing_search = BingGroundingTool(connection_id=connection_id, **config_args)
if custom_connection_name and custom_configuration_name:
try:
bing_custom_connection = await self.client.connections.get(name=custom_connection_name)
except HttpResponseError as err:
raise ServiceInitializationError(
f"Bing custom connection '{custom_connection_name}' not found in Foundry.", err
) from err
else:
bing_search = BingCustomSearchTool(
connection_id=bing_custom_connection.id,
instance_name=custom_configuration_name,
**config_args,
)
if not bing_search:
raise ServiceInitializationError(
"Bing search tool requires either a 'connection_id' for Bing Grounding "
"or both 'custom_connection_name' and 'custom_instance_name' for Custom Bing Search. "
"These can be provided via the tool's additional_properties or environment variables: "
"'BING_CONNECTION_ID', 'BING_CUSTOM_CONNECTION_NAME', 'BING_CUSTOM_INSTANCE_NAME'"
)
tool_definitions.extend(bing_search.definitions)
case HostedCodeInterpreterTool():
tool_definitions.append(CodeInterpreterToolDefinition())
case HostedMCPTool():
tool_definitions.extend(
McpTool(
server_label=tool.name.replace(" ", "_"),
server_url=str(tool.url),
allowed_tools=list(tool.allowed_tools) if tool.allowed_tools else [],
).definitions
)
case HostedFileSearchTool():
vector_stores = [inp for inp in tool.inputs or [] if isinstance(inp, HostedVectorStoreContent)]
if vector_stores:
file_search = FileSearchTool(vector_store_ids=[vs.vector_store_id for vs in vector_stores])
tool_definitions.extend(file_search.definitions)
else:
additional_props = tool.additional_properties or {}
index_name = additional_props.get("index_name") or os.getenv("AZURE_AI_SEARCH_INDEX_NAME")
if not index_name:
raise ServiceInitializationError(
"File search tool requires at least one vector store input, for file search in Foundry "
"or an 'index_name' to use Azure AI Search, "
"in additional_properties or environment variable 'AZURE_AI_SEARCH_INDEX_NAME'."
)
try:
azs_conn_id = await self.client.connections.get_default(ConnectionType.AZURE_AI_SEARCH)
except HttpResponseError as err:
raise ServiceInitializationError(
"No default Azure AI Search connection found in Foundry. "
"Please create one or provide vector store inputs for the file search tool.",
err,
) from err
else:
query_type_enum = AzureAISearchQueryType.SIMPLE
if query_type := additional_props.get("query_type"):
try:
query_type_enum = AzureAISearchQueryType(query_type)
except ValueError as ex:
raise ServiceInitializationError(
f"Invalid query_type '{query_type}' for Azure AI Search. "
f"Valid values are: {[qt.value for qt in AzureAISearchQueryType]}",
ex,
) from ex
ai_search = AzureAISearchTool(
index_connection_id=azs_conn_id.id,
index_name=index_name,
query_type=query_type_enum,
top_k=additional_props.get("top_k", 3),
filter=additional_props.get("filter", ""),
)
tool_definitions.extend(ai_search.definitions)
case dict():
tool_definitions.append(tool)
case _:
raise ServiceInitializationError(f"Unsupported tool type: {type(tool)}")
return tool_definitions
def _convert_required_action_to_tool_output(
self,
tool_results: list[FunctionResultContent] | None,
) -> tuple[str | None, list[ToolOutput] | None]:
required_action_results: list[FunctionResultContent | FunctionApprovalResponseContent] | None,
) -> tuple[str | None, list[ToolOutput] | None, list[ToolApproval] | None]:
run_id: str | None = None
tool_outputs: list[ToolOutput] | None = None
tool_approvals: list[ToolApproval] | None = None
if tool_results:
for function_result_content in tool_results:
# When creating the FunctionCallContent, we created it with a CallId == [runId, callId].
# We need to extract the run ID and ensure that the FunctionToolOutput we send back to Azure
if required_action_results:
for content in required_action_results:
# When creating the FunctionCallContent/ApprovalRequestContent,
# we created it with a CallId == [runId, callId].
# We need to extract the run ID and ensure that the Output/Approval we send back to Azure
# is only the call ID.
run_and_call_ids: list[str] = json.loads(function_result_content.call_id)
run_and_call_ids: list[str] = (
json.loads(content.call_id)
if isinstance(content, FunctionResultContent)
else json.loads(content.id)
)
if (
not run_and_call_ids
@@ -631,13 +850,28 @@ class FoundryChatClient(BaseChatClient):
run_id = run_and_call_ids[0]
call_id = run_and_call_ids[1]
if tool_outputs is None:
tool_outputs = []
tool_outputs.append(
FunctionToolOutput(tool_call_id=call_id, output=str(function_result_content.result))
)
if isinstance(content, FunctionResultContent):
if tool_outputs is None:
tool_outputs = []
result_contents: list[Any] = ( # type: ignore
content.result if isinstance(content.result, list) else [content.result] # type: ignore
)
results: list[Any] = []
for item in result_contents:
if isinstance(item, BaseModel):
results.append(item.model_dump_json())
else:
results.append(json.dumps(item))
if len(results) == 1:
tool_outputs.append(FunctionToolOutput(tool_call_id=call_id, output=results[0]))
else:
tool_outputs.append(FunctionToolOutput(tool_call_id=call_id, output=json.dumps(results)))
elif isinstance(content, FunctionApprovalResponseContent):
if tool_approvals is None:
tool_approvals = []
tool_approvals.append(ToolApproval(tool_call_id=call_id, approve=content.approved))
return run_id, tool_outputs
return run_id, tool_outputs, tool_approvals
def _update_agent_name(self, agent_name: str | None) -> None:
"""Update the agent name in the chat client.
@@ -372,32 +372,32 @@ async def test_foundry_chat_client_async_context_manager(mock_ai_project_client:
mock_ai_project_client.agents.delete_agent.assert_called_once_with("agent-to-delete")
def test_foundry_chat_client_create_run_options_basic(mock_ai_project_client: MagicMock) -> None:
async def test_foundry_chat_client_create_run_options_basic(mock_ai_project_client: MagicMock) -> None:
"""Test _create_run_options with basic ChatOptions."""
chat_client = create_test_foundry_chat_client(mock_ai_project_client)
messages = [ChatMessage(role=Role.USER, text="Hello")]
chat_options = ChatOptions(max_tokens=100, temperature=0.7)
run_options, tool_results = chat_client._create_run_options(messages, chat_options) # type: ignore
run_options, tool_results = await chat_client._create_run_options(messages, chat_options) # type: ignore
assert run_options is not None
assert tool_results is None
def test_foundry_chat_client_create_run_options_no_chat_options(mock_ai_project_client: MagicMock) -> None:
async def test_foundry_chat_client_create_run_options_no_chat_options(mock_ai_project_client: MagicMock) -> None:
"""Test _create_run_options with no ChatOptions."""
chat_client = create_test_foundry_chat_client(mock_ai_project_client)
messages = [ChatMessage(role=Role.USER, text="Hello")]
run_options, tool_results = chat_client._create_run_options(messages, None) # type: ignore
run_options, tool_results = await chat_client._create_run_options(messages, None) # type: ignore
assert run_options is not None
assert tool_results is None
def test_foundry_chat_client_create_run_options_with_image_content(mock_ai_project_client: MagicMock) -> None:
async def test_foundry_chat_client_create_run_options_with_image_content(mock_ai_project_client: MagicMock) -> None:
"""Test _create_run_options with image content."""
chat_client = create_test_foundry_chat_client(mock_ai_project_client, agent_id="test-agent")
@@ -405,7 +405,7 @@ def test_foundry_chat_client_create_run_options_with_image_content(mock_ai_proje
image_content = UriContent(uri="https://example.com/image.jpg", media_type="image/jpeg")
messages = [ChatMessage(role=Role.USER, contents=[image_content])]
run_options, _ = chat_client._create_run_options(messages, None) # type: ignore
run_options, _ = await chat_client._create_run_options(messages, None) # type: ignore
assert "additional_messages" in run_options
assert len(run_options["additional_messages"]) == 1
@@ -415,13 +415,14 @@ def test_foundry_chat_client_create_run_options_with_image_content(mock_ai_proje
def test_foundry_chat_client_convert_function_results_to_tool_output_none(mock_ai_project_client: MagicMock) -> None:
"""Test _convert_function_results_to_tool_output with None input."""
"""Test _convert_required_action_to_tool_output with None input."""
chat_client = create_test_foundry_chat_client(mock_ai_project_client)
run_id, tool_outputs = chat_client._convert_function_results_to_tool_output(None) # type: ignore
run_id, tool_outputs, tool_approvals = chat_client._convert_required_action_to_tool_output(None) # type: ignore
assert run_id is None
assert tool_outputs is None
assert tool_approvals is None
async def test_foundry_chat_client_close_client_when_should_close_true(mock_ai_project_client: MagicMock) -> None:
@@ -476,7 +477,7 @@ def test_foundry_chat_client_update_agent_name_with_none_input(mock_ai_project_c
assert chat_client.agent_name is None
def test_foundry_chat_client_create_run_options_with_messages(mock_ai_project_client: MagicMock) -> None:
async def test_foundry_chat_client_create_run_options_with_messages(mock_ai_project_client: MagicMock) -> None:
"""Test _create_run_options with different message types."""
chat_client = create_test_foundry_chat_client(mock_ai_project_client)
@@ -486,7 +487,7 @@ def test_foundry_chat_client_create_run_options_with_messages(mock_ai_project_cl
ChatMessage(role=Role.USER, text="Hello"),
]
run_options, _ = chat_client._create_run_options(messages, None) # type: ignore
run_options, _ = await chat_client._create_run_options(messages, None) # type: ignore
assert "instructions" in run_options
assert "You are a helpful assistant" in run_options["instructions"]
@@ -224,11 +224,18 @@ class HostedWebSearchTool(BaseTool):
additional_properties: Additional properties associated with the tool
(e.g., {"user_location": {"city": "Seattle", "country": "US"}}).
**kwargs: Additional keyword arguments to pass to the base class.
If additional_properties is not provided, any kwargs are added to additional_properties instead.
"""
args: dict[str, Any] = {
"name": "web_search",
}
super().__init__(**args, **kwargs)
if additional_properties is not None:
args["additional_properties"] = additional_properties
elif kwargs:
args["additional_properties"] = kwargs
if description is not None:
args["description"] = description
super().__init__(**args)
class HostedMCPSpecificApproval(TypedDict, total=False):
@@ -273,7 +273,11 @@ def _process_update(
if response.additional_properties is None:
response.additional_properties = {}
response.additional_properties.update(update.additional_properties)
if response.raw_representation is None:
response.raw_representation = []
if not isinstance(response.raw_representation, list):
response.raw_representation = [response.raw_representation]
response.raw_representation.append(update.raw_representation)
if isinstance(response, ChatResponse) and isinstance(update, ChatResponseUpdate):
if update.conversation_id is not None:
response.conversation_id = update.conversation_id
@@ -347,7 +351,7 @@ class BaseAnnotation(AFBaseModel):
annotated_regions: list[AnnotatedRegions] | None = None
additional_properties: dict[str, Any] | None = None
raw_representation: Any | None = Field(default=None, repr=False)
raw_representation: Any | None = Field(default=None, repr=False, exclude=True)
class CitationAnnotation(BaseAnnotation):
@@ -11,7 +11,7 @@ from typing import TYPE_CHECKING, Any, ClassVar, Final, TypeVar
from opentelemetry import metrics, trace
from opentelemetry.semconv_ai import GenAISystem, Meters, SpanAttributes
from pydantic import PrivateAttr
from pydantic import BaseModel, PrivateAttr
from . import __version__ as version_info
from ._logging import get_logger
@@ -408,19 +408,14 @@ class OtelSettings(AFBaseSettings):
) -> None:
"""Setup telemetry based on the settings.
If both connection_string and otlp_endpoint are set, both will be used.
Args:
credential: The credential to use for Azure Monitor Entra ID authentication. Default is None.
additional_exporters: A list of additional exporters to add to the configuration. Default is None.
force_setup: Force the setup to be executed even if it has already been executed. Default is False.
"""
if (not self.ENABLED and not self.ENABLED) or (self._executed_setup and not force_setup):
if (not self.ENABLED) or (self._executed_setup and not force_setup):
return
if not self.applicationinsights_connection_string and not self.otlp_endpoint and not additional_exporters:
logger.warning("Telemetry is enabled but no connection string or OTLP endpoint is provided.")
global_logger = logging.getLogger()
global_logger.setLevel(logging.NOTSET)
exporters: list["LogExporter | SpanExporter | MetricExporter"] = additional_exporters or []
@@ -639,11 +634,6 @@ def setup_observability(
these will be added directly, and allows you to customize the spans completely
"""
if isinstance(otlp_endpoint, str):
otlp_endpoint = [otlp_endpoint]
if isinstance(applicationinsights_connection_string, str):
applicationinsights_connection_string = [applicationinsights_connection_string]
global OTEL_SETTINGS
# Update the otel settings with the provided values
OTEL_SETTINGS.enable_otel = True
@@ -654,30 +644,36 @@ def setup_observability(
# Run the initial setup, which will create the providers, and add env setting exporters
new_exporters: list["LogExporter | SpanExporter | MetricExporter"] = []
if OTEL_SETTINGS.ENABLED and (otlp_endpoint or applicationinsights_connection_string or exporters):
# check if endpoints or connection strings are already configured
# create the exporters, after checking if they are already configured through the env.
new_exporters = exporters or []
if otlp_endpoint:
otlp_endpoint = [
endpoint for endpoint in otlp_endpoint if OTEL_SETTINGS.check_endpoint_already_configured(endpoint)
]
if applicationinsights_connection_string:
applicationinsights_connection_string = [
conn_str
for conn_str in applicationinsights_connection_string
if OTEL_SETTINGS.check_connection_string_already_configured(conn_str)
]
if otlp_endpoint or applicationinsights_connection_string or exporters:
new_exporters = exporters or []
if isinstance(otlp_endpoint, str):
otlp_endpoint = [otlp_endpoint]
new_exporters.extend(
get_exporters(
otlp_endpoints=otlp_endpoint,
connection_strings=applicationinsights_connection_string,
_get_otlp_exporters(
endpoints=[
endpoint
for endpoint in otlp_endpoint
if not OTEL_SETTINGS.check_endpoint_already_configured(endpoint)
]
)
)
if applicationinsights_connection_string:
if isinstance(applicationinsights_connection_string, str):
applicationinsights_connection_string = [applicationinsights_connection_string]
new_exporters.extend(
_get_azure_monitor_exporters(
connection_strings=[
conn_str
for conn_str in applicationinsights_connection_string
if not OTEL_SETTINGS.check_connection_string_already_configured(conn_str)
],
credential=credential,
)
)
OTEL_SETTINGS.setup_observability(
credential=credential, additional_exporters=new_exporters, force_setup=bool(new_exporters)
)
# Add any additional exporters
# region Chat Client Telemetry
@@ -1243,7 +1239,23 @@ def _to_otel_part(content: "Contents") -> dict[str, Any] | None:
case "function_call":
return {"type": "tool_call", "id": content.call_id, "name": content.name, "arguments": content.arguments}
case "function_result":
return {"type": "tool_call_response", "id": content.call_id, "response": content.result}
response: Any | None = None
if content.result:
if isinstance(content.result, list):
res: list[Any] = []
for item in content.result: # type: ignore
from ._types import BaseContent
if isinstance(item, BaseContent):
res.append(_to_otel_part(item)) # type: ignore
elif isinstance(item, BaseModel):
res.append(item.model_dump(exclude_none=True))
else:
res.append(json.dumps(item))
response = json.dumps(res)
else:
response = json.dumps(content.result)
return {"type": "tool_call_response", "id": content.call_id, "response": response}
case _:
# GenericPart in otel output messages json spec.
# just required type, and arbitrary other fields.
@@ -2,35 +2,30 @@
import asyncio
from agent_framework import AgentRunResponseUpdate, ChatAgent, ChatResponseUpdate, HostedCodeInterpreterTool
from agent_framework.foundry import FoundryChatClient
from azure.ai.agents.models import (
RunStepDelta,
RunStepDeltaChunk,
RunStepDeltaCodeInterpreterDetailItemObject,
RunStepDeltaCodeInterpreterToolCall,
RunStepDeltaToolCallObject,
from agent_framework import (
AgentRunResponse,
HostedCodeInterpreterTool,
)
from agent_framework.foundry import FoundryChatClient
from azure.identity.aio import AzureCliCredential
def get_code_interpreter_chunk(chunk: AgentRunResponseUpdate) -> str | None:
def print_code_interpreter_inputs(response: AgentRunResponse) -> None:
"""Helper method to access code interpreter data."""
if (
isinstance(chunk.raw_representation, ChatResponseUpdate)
and isinstance(chunk.raw_representation.raw_representation, RunStepDeltaChunk)
and isinstance(chunk.raw_representation.raw_representation.delta, RunStepDelta)
and isinstance(chunk.raw_representation.raw_representation.delta.step_details, RunStepDeltaToolCallObject)
and chunk.raw_representation.raw_representation.delta.step_details.tool_calls
):
for tool_call in chunk.raw_representation.raw_representation.delta.step_details.tool_calls:
if (
isinstance(tool_call, RunStepDeltaCodeInterpreterToolCall)
and isinstance(tool_call.code_interpreter, RunStepDeltaCodeInterpreterDetailItemObject)
and tool_call.code_interpreter.input is not None
):
return tool_call.code_interpreter.input
return None
from agent_framework import ChatResponseUpdate
from azure.ai.agents.models import (
RunStepDeltaCodeInterpreterDetailItemObject,
)
print("\nCode Interpreter Inputs during the run:")
if response.raw_representation is None:
return
for chunk in response.raw_representation:
if isinstance(chunk, ChatResponseUpdate) and isinstance(
chunk.raw_representation, RunStepDeltaCodeInterpreterDetailItemObject
):
print(chunk.raw_representation.input, end="")
print("\n")
async def main() -> None:
@@ -41,24 +36,19 @@ async def main() -> None:
# authentication option.
async with (
AzureCliCredential() as credential,
ChatAgent(
chat_client=FoundryChatClient(async_credential=credential),
FoundryChatClient(async_credential=credential) as chat_client,
):
agent = chat_client.create_agent(
name="CodingAgent",
instructions="You are a helpful assistant that can write and execute Python code to solve problems.",
tools=HostedCodeInterpreterTool(),
) as agent,
):
query = "Generate the factorial of 100 using python code."
)
query = "Generate the factorial of 100 using python code, show the code and execute it."
print(f"User: {query}")
print("Agent: ", end="", flush=True)
generated_code = ""
async for chunk in agent.run_stream(query):
if chunk.text:
print(chunk.text, end="", flush=True)
code_interpreter_chunk = get_code_interpreter_chunk(chunk)
if code_interpreter_chunk is not None:
generated_code += code_interpreter_chunk
print(f"\nGenerated code:\n{generated_code}")
response = await AgentRunResponse.from_agent_response_generator(agent.run_stream(query))
print(f"Agent: {response}")
# To review the code interpreter inputs, you can access them from the response's raw_representation; just uncomment the next line:
# print_code_interpreter_inputs(response)
if __name__ == "__main__":
@@ -0,0 +1,65 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import AgentProtocol, AgentThread, HostedMCPTool
from agent_framework.foundry import FoundryChatClient
from azure.identity.aio import AzureCliCredential
async def handle_approvals_with_thread(query: str, agent: "AgentProtocol", thread: "AgentThread"):
    """Run ``query`` on ``thread`` and answer approval requests until none remain.

    The thread is passed to every run, so each follow-up run only sends the
    approval responses collected from the console.

    Args:
        query: The user question for the first run.
        agent: The agent to run.
        thread: The thread passed to every run.

    Returns:
        The first run result that has no pending user input requests.
    """
    from agent_framework import ChatMessage

    result = await agent.run(query, thread=thread, store=True)
    while True:
        pending = result.user_input_requests
        if len(pending) == 0:
            return result

        replies: list[Any] = []
        for request in pending:
            call = request.function_call
            print(
                f"User Input Request for function from {agent.name}: {call.name}"
                f" with arguments: {call.arguments}"
            )
            # Only an answer of "y" (any case) approves the call.
            approved = input("Approve function call? (y/n): ").lower() == "y"
            reply = ChatMessage(role="user", contents=[request.create_response(approved)])
            replies.append(reply)

        # Send all collected approval responses back in a single run.
        result = await agent.run(replies, thread=thread, store=True)
async def main() -> None:
    """Ask a Foundry agent two documentation questions using a hosted MCP tool."""
    async with (
        AzureCliCredential() as credential,
        FoundryChatClient(async_credential=credential) as chat_client,
    ):
        # Turn on Foundry observability before the agent is created.
        await chat_client.setup_foundry_observability()

        learn_mcp = HostedMCPTool(
            name="Microsoft Learn MCP",
            url="https://learn.microsoft.com/api/mcp",
        )
        agent = chat_client.create_agent(
            name="DocsAgent",
            instructions="You are a helpful assistant that can help with microsoft documentation questions.",
            tools=learn_mcp,
        )
        # Both questions are run on the same thread.
        thread = agent.get_new_thread()

        queries = (
            "How to create an Azure storage account using az cli?",
            "What is Microsoft Semantic Kernel?",
        )
        for index, query in enumerate(queries):
            if index > 0:
                # Visual separator between consecutive questions.
                print("\n=======================================\n")
            print(f"User: {query}")
            answer = await handle_approvals_with_thread(query, agent, thread)
            print(f"{agent.name}: {answer}\n")


if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,82 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from datetime import datetime, timezone
from typing import Any
from agent_framework import (
AgentProtocol,
AgentThread,
HostedMCPTool,
HostedWebSearchTool,
)
from agent_framework.foundry import FoundryChatClient
from azure.identity.aio import AzureCliCredential
def get_time() -> str:
    """Get the current UTC time."""
    # NOTE(review): this function is passed to the agent as a tool, so the
    # docstring above is presumably surfaced as the tool description - it is
    # intentionally left unchanged.
    now_utc = datetime.now(tz=timezone.utc)
    timestamp = f"{now_utc:%Y-%m-%d %H:%M:%S}"
    return "The current UTC time is " + timestamp + "."
async def handle_approvals_with_thread(query: str, agent: "AgentProtocol", thread: "AgentThread"):
    """Run ``query`` on ``thread`` and answer approval requests until none remain.

    The thread is passed to every run, so each follow-up run only sends the
    approval responses collected from the console.

    Args:
        query: The user question for the first run.
        agent: The agent to run.
        thread: The thread passed to every run.

    Returns:
        The first run result that has no pending user input requests.
    """
    from agent_framework import ChatMessage

    result = await agent.run(query, thread=thread, store=True)
    while True:
        pending = result.user_input_requests
        if len(pending) == 0:
            return result

        replies: list[Any] = []
        for request in pending:
            call = request.function_call
            print(
                f"User Input Request for function from {agent.name}: {call.name}"
                f" with arguments: {call.arguments}"
            )
            # Only an answer of "y" (any case) approves the call.
            approved = input("Approve function call? (y/n): ").lower() == "y"
            reply = ChatMessage(role="user", contents=[request.create_response(approved)])
            replies.append(reply)

        # Send all collected approval responses back in a single run.
        result = await agent.run(replies, thread=thread, store=True)
async def main() -> None:
    """Ask a Foundry agent two questions using hosted MCP, web search and a local function tool."""
    async with (
        AzureCliCredential() as credential,
        FoundryChatClient(async_credential=credential) as chat_client,
    ):
        # Turn on Foundry observability before the agent is created.
        await chat_client.setup_foundry_observability()

        learn_mcp = HostedMCPTool(
            name="Microsoft Learn MCP",
            url="https://learn.microsoft.com/api/mcp",
        )
        # needs BING_CONNECTION_ID set in the env
        web_search = HostedWebSearchTool(count=5)
        agent = chat_client.create_agent(
            name="DocsAgent",
            instructions="You are a helpful assistant that can help with microsoft documentation questions.",
            tools=[learn_mcp, web_search, get_time],
        )
        # Both questions are run on the same thread.
        thread = agent.get_new_thread()

        queries = (
            "How to create an Azure storage account using az cli and what time is it?",
            "What is Microsoft Semantic Kernel and use a web search to see what is Reddit saying about it?",
        )
        for index, query in enumerate(queries):
            if index > 0:
                # Visual separator between consecutive questions.
                print("\n=======================================\n")
            print(f"User: {query}")
            answer = await handle_approvals_with_thread(query, agent, thread)
            print(f"{agent.name}: {answer}\n")


if __name__ == "__main__":
    asyncio.run(main())
+15
View File
@@ -3066,6 +3066,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/07/7f/88079bc3e4aa188d78692328453f906dca35fa9f286623af13df0b0a1ead/opentelemetry_instrumentation_flask-0.58b0-py3-none-any.whl", hash = "sha256:b0d57ad4db7bd0177ddf8c7ae3adf8bd90e2ebfa2dd30884c6a97c97197e4ac5", size = 14685, upload-time = "2025-09-11T11:41:30.02Z" },
]
[[package]]
name = "opentelemetry-instrumentation-openai"
version = "0.47.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "opentelemetry-instrumentation", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "opentelemetry-semantic-conventions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "opentelemetry-semantic-conventions-ai", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/76/46/3e59891b433869c731131b5ececdaa3ee1d4a45d19169299b52d1397c8e5/opentelemetry_instrumentation_openai-0.47.1.tar.gz", hash = "sha256:a9ad8d898f8f03581fff1764b605d2c277381ced3083a66e7bddb951b95afb2a", size = 25411, upload-time = "2025-09-14T12:09:19.202Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/af/4e/ee98a8e9b9d58425ff84aff63d06fdf004727cdac2cfe3a2916a1869a2e2/opentelemetry_instrumentation_openai-0.47.1-py3-none-any.whl", hash = "sha256:2bc426c1324f7e9babee8d2a02d7966562ec993da5d280c597bd29a92997e2ed", size = 35274, upload-time = "2025-09-14T12:08:48.987Z" },
]
[[package]]
name = "opentelemetry-instrumentation-psycopg2"
version = "0.58b0"