Python: Added fixes and more examples for Foundry agent (#217)

* Added non-streaming and streaming examples

* Updated resource management

* Added examples with thread management

* Added function tools examples

* Small rename

* Added code interpreter example

* Updated example

* Addressed PR feedback

* Addressed PR feedback
This commit is contained in:
Dmytro Struk
2025-07-24 07:47:45 -07:00
committed by GitHub
Unverified
parent 62be887947
commit 14efb626ab
7 changed files with 409 additions and 19 deletions
@@ -2,6 +2,7 @@
import contextlib
import json
import sys
from collections.abc import AsyncIterable, MutableMapping, MutableSequence
from typing import Any, ClassVar
@@ -19,6 +20,7 @@ from agent_framework import (
DataContent,
FunctionCallContent,
FunctionResultContent,
HostedCodeInterpreterTool,
TextContent,
UriContent,
UsageContent,
@@ -34,6 +36,7 @@ from azure.ai.agents.models import (
AgentStreamEvent,
AsyncAgentEventHandler,
AsyncAgentRunStream,
CodeInterpreterToolDefinition,
FunctionName,
ListSortOrder,
MessageDeltaChunk,
@@ -56,6 +59,11 @@ from azure.ai.projects.aio import AIProjectClient
from azure.core.credentials_async import AsyncTokenCredential
from pydantic import Field, PrivateAttr, ValidationError
if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
else:
from typing_extensions import Self # pragma: no cover
class FoundrySettings(AFBaseSettings):
"""Foundry model settings.
@@ -85,9 +93,12 @@ class FoundrySettings(AFBaseSettings):
@use_tool_calling
class FoundryChatClient(ChatClientBase):
client: AIProjectClient = Field(...)
credential: AsyncTokenCredential | None = Field(...)
agent_id: str | None = Field(default=None)
thread_id: str | None = Field(default=None)
_should_delete_agent: bool = PrivateAttr(default=False) # Track whether we should delete the agent
_should_close_client: bool = PrivateAttr(default=False) # Track whether we should close client connection
_should_close_credential: bool = PrivateAttr(default=False) # Track whether we should close credential
_foundry_settings: FoundrySettings = PrivateAttr()
def __init__(
@@ -133,6 +144,8 @@ class FoundryChatClient(ChatClientBase):
raise ServiceInitializationError("Failed to create Foundry settings.", ex) from ex
# If no client is provided, create one
should_close_client = False
should_close_credential = False
if client is None:
if not foundry_settings.project_endpoint:
raise ServiceInitializationError("Project endpoint is required when client is not provided.")
@@ -145,20 +158,25 @@ class FoundryChatClient(ChatClientBase):
from azure.identity.aio import DefaultAzureCredential
credential = DefaultAzureCredential()
should_close_credential = True
client = AIProjectClient(endpoint=foundry_settings.project_endpoint, credential=credential)
should_close_client = True
super().__init__(
client=client, # type: ignore[reportCallIssue]
credential=credential, # type: ignore[reportCallIssue]
agent_id=agent_id, # type: ignore[reportCallIssue]
thread_id=thread_id, # type: ignore[reportCallIssue]
**kwargs,
)
self._should_delete_agent = False
self._should_close_client = should_close_client
self._should_close_credential = should_close_credential
self._foundry_settings = foundry_settings
async def __aenter__(self) -> "FoundryChatClient":
async def __aenter__(self) -> "Self":
"""Async context manager entry."""
return self
@@ -169,6 +187,8 @@ class FoundryChatClient(ChatClientBase):
async def close(self) -> None:
"""Close the client and clean up any agents we created."""
await self._cleanup_agent_if_needed()
await self._close_client_if_needed()
await self._close_credential_if_needed()
@classmethod
def from_dict(cls, settings: dict[str, Any]) -> "FoundryChatClient":
@@ -432,6 +452,16 @@ class FoundryChatClient(ChatClientBase):
return contents
async def _close_credential_if_needed(self) -> None:
"""Close credential if we created it."""
if self._should_close_credential and self.credential is not None:
await self.credential.close()
async def _close_client_if_needed(self) -> None:
"""Close client session if we created it."""
if self._should_close_client:
await self.client.close()
async def _cleanup_agent_if_needed(self) -> None:
"""Clean up the agent if we created it."""
if self._should_delete_agent and self.agent_id is not None:
@@ -454,20 +484,20 @@ class FoundryChatClient(ChatClientBase):
run_options["temperature"] = chat_options.temperature
run_options["parallel_tool_calls"] = chat_options.allow_multiple_tool_calls
if chat_options.tools is not None:
# TODO (eavanvalkenburg): replace with _prepare_tools_and_tool_choice overload
if chat_options.tool_choice is not None:
tool_definitions: list[MutableMapping[str, Any]] = []
for tool in chat_options.tools:
if isinstance(tool, AIFunction):
tool_definitions.append(ai_function_to_json_schema_spec(tool))
else:
tool_definitions.append(tool) # type: ignore
if chat_options.tool_choice != "none" and chat_options.tools is not None:
for tool in chat_options.tools:
if isinstance(tool, AIFunction):
tool_definitions.append(ai_function_to_json_schema_spec(tool))
elif isinstance(tool, HostedCodeInterpreterTool):
tool_definitions.append(CodeInterpreterToolDefinition())
elif isinstance(tool, MutableMapping):
tool_definitions.append(tool)
if len(tool_definitions) > 0:
run_options["tools"] = tool_definitions
if chat_options.tool_choice is not None:
if chat_options.tool_choice == "none":
run_options["tool_choice"] = AgentsToolChoiceOptionMode.NONE
elif chat_options.tool_choice == "auto":
@@ -453,6 +453,7 @@ class ChatClientAgent(AgentBase):
chat_options=self.chat_options
& ChatOptions(
ai_model_id=model,
conversation_id=thread.id,
frequency_penalty=frequency_penalty,
logit_bias=logit_bias,
max_tokens=max_tokens,
@@ -561,6 +562,7 @@ class ChatClientAgent(AgentBase):
messages=thread_messages,
chat_options=self.chat_options
& ChatOptions(
conversation_id=thread.id,
frequency_penalty=frequency_penalty,
logit_bias=logit_bias,
max_tokens=max_tokens,
+31 -6
View File
@@ -1,13 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
import inspect
from collections.abc import Awaitable, Callable, Mapping
from collections.abc import Awaitable, Callable
from functools import wraps
from typing import Any, Generic, Protocol, TypeVar, runtime_checkable
from pydantic import BaseModel, create_model
__all__ = ["AIFunction", "AITool", "ai_function"]
__all__ = ["AIFunction", "AITool", "HostedCodeInterpreterTool", "ai_function"]
@runtime_checkable
@@ -34,10 +34,6 @@ class AITool(Protocol):
"""Return a string representation of the tool."""
...
def parameters(self) -> Mapping[str, Any]:
"""Return the parameters of the tool as a JSON schema."""
...
ArgsT = TypeVar("ArgsT", bound=BaseModel)
ReturnT = TypeVar("ReturnT")
@@ -159,3 +155,32 @@ def ai_function(
return wrapper(func)
return decorator(func) if func else decorator # type: ignore[reportReturnType, return-value]
class HostedCodeInterpreterTool(AITool):
"""Represents a hosted tool that can be specified to an AI service to enable it to execute generated code.
This tool does not implement code interpretation itself. It serves as a marker to inform a service
that it is allowed to execute generated code if the service is capable of doing so.
"""
def __init__(
self,
name: str = "code_interpreter",
description: str | None = None,
additional_properties: dict[str, Any] | None = None,
):
"""Initialize a HostedCodeInterpreterTool.
Args:
name: The name of the tool. Defaults to "code_interpreter".
description: A description of the tool.
additional_properties: Additional properties associated with the tool, specific to the service used.
"""
self.name = name
self.description = description
self.additional_properties = additional_properties
def __str__(self) -> str:
"""Return a string representation of the tool."""
return f"HostedCodeInterpreterTool(name={self.name})"
@@ -17,8 +17,9 @@ def get_weather(
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main() -> None:
print("=== Basic Foundry Chat Client Example ===")
async def non_streaming_example() -> None:
"""Example of non-streaming response (get the complete result at once)."""
print("=== Non-streaming Response Example ===")
# Since no Agent ID is provided, the agent will be automatically created
# and deleted after getting a response
@@ -27,9 +28,38 @@ async def main() -> None:
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
result = await agent.run("What's the weather like in Seattle?")
query = "What's the weather like in Seattle?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Result: {result}\n")
async def streaming_example() -> None:
"""Example of streaming response (get results as they are generated)."""
print("=== Streaming Response Example ===")
# Since no Agent ID is provided, the agent will be automatically created
# and deleted after getting a response
async with ChatClientAgent(
chat_client=FoundryChatClient(),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
query = "What's the weather like in Portland?"
print(f"User: {query}")
print("Assistant: ", end="", flush=True)
async for chunk in agent.run_stream(query):
if chunk.text:
print(chunk.text, end="", flush=True)
print("\n")
async def main() -> None:
print("=== Basic Foundry Chat Client Example ===")
await non_streaming_example()
await streaming_example()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,58 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import AgentRunResponseUpdate, ChatClientAgent, HostedCodeInterpreterTool
from agent_framework.foundry import FoundryChatClient
from azure.ai.agents.models import (
RunStepDelta,
RunStepDeltaChunk,
RunStepDeltaCodeInterpreterDetailItemObject,
RunStepDeltaCodeInterpreterToolCall,
RunStepDeltaToolCallObject,
)
def get_code_interpreter_chunk(chunk: AgentRunResponseUpdate) -> str | None:
"""Helper method to access code interpreter data."""
if (
isinstance(chunk.raw_representation, RunStepDeltaChunk)
and isinstance(chunk.raw_representation.delta, RunStepDelta)
and isinstance(chunk.raw_representation.delta.step_details, RunStepDeltaToolCallObject)
and chunk.raw_representation.delta.step_details.tool_calls
):
for tool_call in chunk.raw_representation.delta.step_details.tool_calls:
if (
isinstance(tool_call, RunStepDeltaCodeInterpreterToolCall)
and isinstance(tool_call.code_interpreter, RunStepDeltaCodeInterpreterDetailItemObject)
and tool_call.code_interpreter.input is not None
):
return tool_call.code_interpreter.input
return None
async def main() -> None:
"""Example showing how to use the HostedCodeInterpreterTool with Foundry."""
print("=== Foundry Chat Client with Code Interpreter Example ===")
async with ChatClientAgent(
chat_client=FoundryChatClient(),
instructions="You are a helpful assistant that can write and execute Python code to solve problems.",
tools=HostedCodeInterpreterTool(),
) as agent:
query = "What is current datetime?"
print(f"User: {query}")
print("Assistant: ", end="", flush=True)
generated_code = ""
async for chunk in agent.run_stream(query):
if chunk.text:
print(chunk.text, end="", flush=True)
code_interpreter_chunk = get_code_interpreter_chunk(chunk)
if code_interpreter_chunk is not None:
generated_code += code_interpreter_chunk
print(f"\nGenerated code:\n{generated_code}")
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,117 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from datetime import datetime, timezone
from random import randint
from typing import Annotated
from agent_framework import ChatClientAgent
from agent_framework.foundry import FoundryChatClient
from pydantic import Field
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
def get_time() -> str:
"""Get the current UTC time."""
current_time = datetime.now(timezone.utc)
return f"The current UTC time is {current_time.strftime('%Y-%m-%d %H:%M:%S')}."
async def tools_on_agent_level() -> None:
"""Example showing tools defined when creating the agent."""
print("=== Tools Defined on Agent Level ===")
# Tools are provided when creating the agent
# The agent can use these tools for any query during its lifetime
async with ChatClientAgent(
chat_client=FoundryChatClient(),
instructions="You are a helpful assistant that can provide weather and time information.",
tools=[get_weather, get_time], # Tools defined at agent creation
) as agent:
# First query - agent can use weather tool
query1 = "What's the weather like in New York?"
print(f"User: {query1}")
result1 = await agent.run(query1)
print(f"Assistant: {result1}\n")
# Second query - agent can use time tool
query2 = "What's the current UTC time?"
print(f"User: {query2}")
result2 = await agent.run(query2)
print(f"Assistant: {result2}\n")
# Third query - agent can use both tools if needed
query3 = "What's the weather in London and what's the current UTC time?"
print(f"User: {query3}")
result3 = await agent.run(query3)
print(f"Assistant: {result3}\n")
async def tools_on_run_level() -> None:
"""Example showing tools passed to the run method."""
print("=== Tools Passed to Run Method ===")
# Agent created without tools
async with ChatClientAgent(
chat_client=FoundryChatClient(),
instructions="You are a helpful assistant.",
# No tools defined here
) as agent:
# First query with weather tool
query1 = "What's the weather like in Seattle?"
print(f"User: {query1}")
result1 = await agent.run(query1, tools=[get_weather]) # Tool passed to run method
print(f"Assistant: {result1}\n")
# Second query with time tool
query2 = "What's the current UTC time?"
print(f"User: {query2}")
result2 = await agent.run(query2, tools=[get_time]) # Different tool for this query
print(f"Assistant: {result2}\n")
# Third query with multiple tools
query3 = "What's the weather in Chicago and what's the current UTC time?"
print(f"User: {query3}")
result3 = await agent.run(query3, tools=[get_weather, get_time]) # Multiple tools
print(f"Assistant: {result3}\n")
async def mixed_tools_example() -> None:
"""Example showing both agent-level tools and run-method tools."""
print("=== Mixed Tools Example (Agent + Run Method) ===")
# Agent created with some base tools
async with ChatClientAgent(
chat_client=FoundryChatClient(),
instructions="You are a comprehensive assistant that can help with various information requests.",
tools=[get_weather], # Base tool available for all queries
) as agent:
# Query using both agent tool and additional run-method tools
query = "What's the weather in Denver and what's the current UTC time?"
print(f"User: {query}")
# Agent has access to get_weather (from creation) + additional tools from run method
result = await agent.run(
query,
tools=[get_time], # Additional tools for this specific query
)
print(f"Assistant: {result}\n")
async def main() -> None:
print("=== Foundry Chat Client Agent with Function Tools Examples ===\n")
await tools_on_agent_level()
await tools_on_run_level()
await mixed_tools_example()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,128 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from random import randint
from typing import Annotated
from agent_framework import ChatClientAgent, ChatClientAgentThread
from agent_framework.foundry import FoundryChatClient
from pydantic import Field
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def example_with_automatic_thread_creation() -> None:
"""Example showing automatic thread creation (service-managed thread)."""
print("=== Automatic Thread Creation Example ===")
async with ChatClientAgent(
chat_client=FoundryChatClient(),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
# First conversation - no thread provided, will be created automatically
query1 = "What's the weather like in Seattle?"
print(f"User: {query1}")
result1 = await agent.run(query1)
print(f"Assistant: {result1.text}")
# Second conversation - still no thread provided, will create another new thread
query2 = "What was the last city I asked about?"
print(f"\nUser: {query2}")
result2 = await agent.run(query2)
print(f"Assistant: {result2.text}")
print("Note: Each call creates a separate thread, so the agent doesn't remember previous context.\n")
async def example_with_thread_persistence() -> None:
"""Example showing thread persistence across multiple conversations."""
print("=== Thread Persistence Example ===")
print("Using the same thread across multiple conversations to maintain context.\n")
async with ChatClientAgent(
chat_client=FoundryChatClient(),
instructions="You are a helpful weather agent. Remember previous cities asked about.",
tools=get_weather,
) as agent:
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First conversation
query1 = "What's the weather like in Tokyo?"
print(f"User: {query1}")
result1 = await agent.run(query1, thread=thread)
print(f"Assistant: {result1.text}")
# Second conversation using the same thread - maintains context
query2 = "How about comparing it to London?"
print(f"\nUser: {query2}")
result2 = await agent.run(query2, thread=thread)
print(f"Assistant: {result2.text}")
# Third conversation - agent should remember both previous cities
query3 = "Which of the cities I asked about has better weather?"
print(f"\nUser: {query3}")
result3 = await agent.run(query3, thread=thread)
print(f"Assistant: {result3.text}")
print("Note: The agent remembers context from previous messages in the same thread.\n")
async def example_with_existing_thread_id() -> None:
"""Example showing how to work with an existing thread ID from the service."""
print("=== Existing Thread ID Example ===")
print("Using a specific thread ID to continue an existing conversation.\n")
# First, create a conversation and capture the thread ID
existing_thread_id = None
async with ChatClientAgent(
chat_client=FoundryChatClient(),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
# Start a conversation and get the thread ID
thread = agent.get_new_thread()
query1 = "What's the weather in Paris?"
print(f"User: {query1}")
result1 = await agent.run(query1, thread=thread)
print(f"Assistant: {result1.text}")
# The thread ID is set after the first response
existing_thread_id = thread.id
print(f"Thread ID: {existing_thread_id}")
if existing_thread_id:
print("\n--- Continuing with the same thread ID in a new agent instance ---")
# Create a new agent instance but use the existing thread ID
async with ChatClientAgent(
chat_client=FoundryChatClient(thread_id=existing_thread_id),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
# Create a thread with the existing ID
thread = ChatClientAgentThread(id=existing_thread_id)
query2 = "What was the last city I asked about?"
print(f"User: {query2}")
result2 = await agent.run(query2, thread=thread)
print(f"Assistant: {result2.text}")
print("Note: The agent continues the conversation from the previous thread.\n")
async def main() -> None:
print("=== Foundry Chat Client Agent Thread Management Examples ===\n")
await example_with_automatic_thread_creation()
await example_with_thread_persistence()
await example_with_existing_thread_id()
if __name__ == "__main__":
asyncio.run(main())