Python: Azure OpenAI Assistants Chat Client and Agent (#300)

* Initial version of assistant client

* More updates to assistant client

* Finished assistant chat client implementation

* Small fixes and basic example

* Added code interpreter example

* More examples

* Added chat client example

* Small fixes

* Added tests

* Enabled telemetry

* Small fix

* Removed files temporarily

* Revert "Removed files temporarily"

This reverts commit 5cdfa0d299.

* Small fixes

* Addressed PR feedback

* Fixed tests

* Small update

* Added Azure assistants client and examples

* Added tests

* Small fix
This commit is contained in:
Dmytro Struk
2025-08-05 07:51:36 -07:00
committed by GitHub
Unverified
parent 98cf962b72
commit c1d306ec95
12 changed files with 979 additions and 2 deletions
@@ -2,6 +2,7 @@
import importlib.metadata
from ._assistants_client import AzureAssistantsClient
from ._chat_client import AzureChatClient
from ._entra_id_authentication import get_entra_auth_token
from ._shared import AzureOpenAISettings
@@ -12,6 +13,7 @@ except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0" # Fallback for development mode
__all__ = [
"AzureAssistantsClient",
"AzureChatClient",
"AzureOpenAISettings",
"__version__",
@@ -0,0 +1,132 @@
# Copyright (c) Microsoft. All rights reserved.
from collections.abc import Mapping
from typing import Any, ClassVar
from agent_framework.exceptions import ServiceInitializationError
from agent_framework.openai import OpenAIAssistantsClient
from openai.lib.azure import AsyncAzureADTokenProvider, AsyncAzureOpenAI
from pydantic import SecretStr, ValidationError
from pydantic.networks import AnyUrl
from ._shared import (
DEFAULT_AZURE_TOKEN_ENDPOINT,
AzureOpenAISettings,
)
__all__ = ["AzureAssistantsClient"]
class AzureAssistantsClient(OpenAIAssistantsClient):
"""Azure OpenAI Assistants client."""
DEFAULT_AZURE_API_VERSION: ClassVar[str] = "2024-05-01-preview"
MODEL_PROVIDER_NAME: ClassVar[str] = "azure_openai" # type: ignore[reportIncompatibleVariableOverride, misc]
def __init__(
self,
deployment_name: str | None = None,
assistant_id: str | None = None,
assistant_name: str | None = None,
thread_id: str | None = None,
api_key: str | None = None,
endpoint: str | None = None,
base_url: str | None = None,
api_version: str | None = None,
ad_token: str | None = None,
ad_token_provider: AsyncAzureADTokenProvider | None = None,
token_endpoint: str | None = None,
default_headers: Mapping[str, str] | None = None,
async_client: AsyncAzureOpenAI | None = None,
env_file_path: str | None = None,
env_file_encoding: str | None = None,
) -> None:
"""Initialize an Azure OpenAI Assistants client.
Args:
deployment_name: The Azure OpenAI deployment name for the model to use.
assistant_id: The ID of an Azure OpenAI assistant to use.
If not provided, a new assistant will be created (and deleted after the request).
assistant_name: The name to use when creating new assistants.
thread_id: Default thread ID to use for conversations. Can be overridden by
conversation_id property, when making a request.
If not provided, a new thread will be created (and deleted after the request).
api_key: The optional API key to use. If provided will override,
the env vars or .env file value.
endpoint: The optional deployment endpoint. If provided will override the value
in the env vars or .env file.
base_url: The optional deployment base_url. If provided will override the value
in the env vars or .env file.
api_version: The optional deployment api version. If provided will override the value
in the env vars or .env file.
ad_token: The Azure Active Directory token. (Optional)
ad_token_provider: The Azure Active Directory token provider. (Optional)
token_endpoint: The token endpoint to request an Azure token. (Optional)
default_headers: The default headers mapping of string keys to
string values for HTTP requests. (Optional)
async_client: An existing client to use. (Optional)
env_file_path: Use the environment settings file as a fallback
to environment variables. (Optional)
env_file_encoding: The encoding of the environment settings file. (Optional)
"""
try:
azure_openai_settings = AzureOpenAISettings(
api_key=SecretStr(api_key) if api_key else None,
base_url=AnyUrl(base_url) if base_url else None,
endpoint=AnyUrl(endpoint) if endpoint else None,
chat_deployment_name=deployment_name,
api_version=api_version or self.DEFAULT_AZURE_API_VERSION,
env_file_path=env_file_path,
env_file_encoding=env_file_encoding,
token_endpoint=token_endpoint or DEFAULT_AZURE_TOKEN_ENDPOINT,
)
except ValidationError as ex:
raise ServiceInitializationError("Failed to create Azure OpenAI settings.", ex) from ex
if not azure_openai_settings.chat_deployment_name:
raise ServiceInitializationError("The Azure OpenAI deployment name is required.")
# Handle authentication: try API key first, then AD token, then Entra ID
if (
not async_client
and not azure_openai_settings.api_key
and not ad_token
and not ad_token_provider
and azure_openai_settings.token_endpoint
):
# Try to get token using Entra ID if no other auth method is provided
from ._entra_id_authentication import get_entra_auth_token
ad_token = get_entra_auth_token(azure_openai_settings.token_endpoint)
if not async_client and not azure_openai_settings.api_key and not ad_token and not ad_token_provider:
raise ServiceInitializationError("The Azure OpenAI API key, ad_token, or ad_token_provider is required.")
# Create Azure client if not provided
if not async_client:
client_params: dict[str, Any] = {
"api_version": azure_openai_settings.api_version,
"default_headers": default_headers,
}
if azure_openai_settings.api_key:
client_params["api_key"] = azure_openai_settings.api_key.get_secret_value()
elif ad_token:
client_params["azure_ad_token"] = ad_token
elif ad_token_provider:
client_params["azure_ad_token_provider"] = ad_token_provider
if azure_openai_settings.base_url:
client_params["base_url"] = str(azure_openai_settings.base_url)
elif azure_openai_settings.endpoint:
client_params["azure_endpoint"] = str(azure_openai_settings.endpoint)
async_client = AsyncAzureOpenAI(**client_params)
super().__init__(
ai_model_id=azure_openai_settings.chat_deployment_name,
assistant_id=assistant_id,
assistant_name=assistant_name,
thread_id=thread_id,
async_client=async_client, # type: ignore[reportArgumentType]
)
@@ -0,0 +1,382 @@
# Copyright (c) Microsoft. All rights reserved.
import os
from typing import Annotated
from unittest.mock import AsyncMock, MagicMock
import pytest
from agent_framework import (
ChatClient,
ChatMessage,
ChatResponse,
ChatResponseUpdate,
TextContent,
)
from agent_framework.exceptions import ServiceInitializationError
from pydantic import Field
from agent_framework_azure import AzureAssistantsClient
skip_if_azure_integration_tests_disabled = pytest.mark.skipif(
os.getenv("RUN_INTEGRATION_TESTS", "false").lower() != "true"
or os.getenv("AZURE_OPENAI_ENDPOINT", "") in ("", "https://test-endpoint.com"),
reason="No real AZURE_OPENAI_ENDPOINT provided; skipping integration tests."
if os.getenv("RUN_INTEGRATION_TESTS", "false").lower() == "true"
else "Integration tests are disabled.",
)
def create_test_azure_assistants_client(
mock_async_azure_openai: MagicMock,
deployment_name: str | None = None,
assistant_id: str | None = None,
assistant_name: str | None = None,
thread_id: str | None = None,
should_delete_assistant: bool = False,
) -> AzureAssistantsClient:
"""Helper function to create AzureAssistantsClient instances for testing, bypassing Pydantic validation."""
return AzureAssistantsClient.model_construct(
ai_model_id=deployment_name or "test_chat_deployment",
assistant_id=assistant_id,
assistant_name=assistant_name,
thread_id=thread_id,
api_key="test-api-key",
endpoint="https://test-endpoint.com",
client=mock_async_azure_openai,
_should_delete_assistant=should_delete_assistant,
)
@pytest.fixture
def mock_async_azure_openai() -> MagicMock:
"""Mock AsyncAzureOpenAI client."""
mock_client = MagicMock()
# Mock beta.assistants
mock_client.beta.assistants.create = AsyncMock(return_value=MagicMock(id="test-assistant-id"))
mock_client.beta.assistants.delete = AsyncMock()
# Mock beta.threads
mock_client.beta.threads.create = AsyncMock(return_value=MagicMock(id="test-thread-id"))
mock_client.beta.threads.delete = AsyncMock()
# Mock beta.threads.runs
mock_client.beta.threads.runs.create = AsyncMock(return_value=MagicMock(id="test-run-id"))
mock_client.beta.threads.runs.retrieve = AsyncMock()
mock_client.beta.threads.runs.submit_tool_outputs = AsyncMock()
# Mock beta.threads.messages
mock_client.beta.threads.messages.create = AsyncMock()
mock_client.beta.threads.messages.list = AsyncMock(return_value=MagicMock(data=[]))
return mock_client
def test_azure_assistants_client_init_with_client(mock_async_azure_openai: MagicMock) -> None:
"""Test AzureAssistantsClient initialization with existing client."""
chat_client = create_test_azure_assistants_client(
mock_async_azure_openai,
deployment_name="test_chat_deployment",
assistant_id="existing-assistant-id",
thread_id="test-thread-id",
)
assert chat_client.client is mock_async_azure_openai
assert chat_client.ai_model_id == "test_chat_deployment"
assert chat_client.assistant_id == "existing-assistant-id"
assert chat_client.thread_id == "test-thread-id"
assert not chat_client._should_delete_assistant # type: ignore
assert isinstance(chat_client, ChatClient)
def test_azure_assistants_client_init_auto_create_client(
azure_openai_unit_test_env: dict[str, str],
mock_async_azure_openai: MagicMock,
) -> None:
"""Test AzureAssistantsClient initialization with auto-created client."""
chat_client = AzureAssistantsClient.model_construct(
ai_model_id=azure_openai_unit_test_env["AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"],
assistant_id=None,
assistant_name="TestAssistant",
thread_id=None,
api_key=azure_openai_unit_test_env["AZURE_OPENAI_API_KEY"],
endpoint=azure_openai_unit_test_env["AZURE_OPENAI_ENDPOINT"],
client=mock_async_azure_openai,
_should_delete_assistant=False,
)
assert chat_client.client is mock_async_azure_openai
assert chat_client.ai_model_id == azure_openai_unit_test_env["AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"]
assert chat_client.assistant_id is None
assert chat_client.assistant_name == "TestAssistant"
assert not chat_client._should_delete_assistant # type: ignore
def test_azure_assistants_client_init_validation_fail() -> None:
"""Test AzureAssistantsClient initialization with validation failure."""
with pytest.raises(ServiceInitializationError):
# Force failure by providing invalid deployment name type - this should cause validation to fail
AzureAssistantsClient(deployment_name=123, api_key="valid-key") # type: ignore
@pytest.mark.parametrize("exclude_list", [["AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"]], indirect=True)
def test_azure_assistants_client_init_missing_deployment_name(azure_openai_unit_test_env: dict[str, str]) -> None:
"""Test AzureAssistantsClient initialization with missing deployment name."""
with pytest.raises(ServiceInitializationError):
AzureAssistantsClient(
api_key=azure_openai_unit_test_env.get("AZURE_OPENAI_API_KEY", "test-key"), env_file_path="nonexistent.env"
)
def test_azure_assistants_client_init_with_default_headers(azure_openai_unit_test_env: dict[str, str]) -> None:
"""Test AzureAssistantsClient initialization with default headers."""
default_headers = {"X-Unit-Test": "test-guid"}
chat_client = AzureAssistantsClient(
deployment_name="test_chat_deployment",
api_key=azure_openai_unit_test_env["AZURE_OPENAI_API_KEY"],
endpoint=azure_openai_unit_test_env["AZURE_OPENAI_ENDPOINT"],
default_headers=default_headers,
)
assert chat_client.ai_model_id == "test_chat_deployment"
assert isinstance(chat_client, ChatClient)
# Assert that the default header we added is present in the client's default headers
for key, value in default_headers.items():
assert key in chat_client.client.default_headers
assert chat_client.client.default_headers[key] == value
async def test_azure_assistants_client_get_assistant_id_or_create_existing_assistant(
mock_async_azure_openai: MagicMock,
) -> None:
"""Test _get_assistant_id_or_create when assistant_id is already provided."""
chat_client = create_test_azure_assistants_client(mock_async_azure_openai, assistant_id="existing-assistant-id")
assistant_id = await chat_client._get_assistant_id_or_create() # type: ignore
assert assistant_id == "existing-assistant-id"
assert not chat_client._should_delete_assistant # type: ignore
mock_async_azure_openai.beta.assistants.create.assert_not_called()
async def test_azure_assistants_client_get_assistant_id_or_create_create_new(
mock_async_azure_openai: MagicMock,
) -> None:
"""Test _get_assistant_id_or_create when creating a new assistant."""
chat_client = create_test_azure_assistants_client(
mock_async_azure_openai, deployment_name="test_chat_deployment", assistant_name="TestAssistant"
)
assistant_id = await chat_client._get_assistant_id_or_create() # type: ignore
assert assistant_id == "test-assistant-id"
assert chat_client._should_delete_assistant # type: ignore
mock_async_azure_openai.beta.assistants.create.assert_called_once()
async def test_azure_assistants_client_aclose_should_not_delete(
mock_async_azure_openai: MagicMock,
) -> None:
"""Test close when assistant should not be deleted."""
chat_client = create_test_azure_assistants_client(
mock_async_azure_openai, assistant_id="assistant-to-keep", should_delete_assistant=False
)
await chat_client.close() # type: ignore
# Verify assistant deletion was not called
mock_async_azure_openai.beta.assistants.delete.assert_not_called()
assert not chat_client._should_delete_assistant # type: ignore
async def test_azure_assistants_client_aclose_should_delete(mock_async_azure_openai: MagicMock) -> None:
"""Test close method calls cleanup."""
chat_client = create_test_azure_assistants_client(
mock_async_azure_openai, assistant_id="assistant-to-delete", should_delete_assistant=True
)
await chat_client.close()
# Verify assistant deletion was called
mock_async_azure_openai.beta.assistants.delete.assert_called_once_with("assistant-to-delete")
assert not chat_client._should_delete_assistant # type: ignore
async def test_azure_assistants_client_async_context_manager(mock_async_azure_openai: MagicMock) -> None:
"""Test async context manager functionality."""
chat_client = create_test_azure_assistants_client(
mock_async_azure_openai, assistant_id="assistant-to-delete", should_delete_assistant=True
)
# Test context manager
async with chat_client:
pass # Just test that we can enter and exit
# Verify cleanup was called on exit
mock_async_azure_openai.beta.assistants.delete.assert_called_once_with("assistant-to-delete")
def test_azure_assistants_client_serialize(azure_openai_unit_test_env: dict[str, str]) -> None:
"""Test serialization of AzureAssistantsClient."""
default_headers = {"X-Unit-Test": "test-guid"}
# Test basic initialization and to_dict
chat_client = AzureAssistantsClient(
deployment_name="test_chat_deployment",
assistant_id="test-assistant-id",
assistant_name="TestAssistant",
thread_id="test-thread-id",
api_key=azure_openai_unit_test_env["AZURE_OPENAI_API_KEY"],
endpoint=azure_openai_unit_test_env["AZURE_OPENAI_ENDPOINT"],
default_headers=default_headers,
)
dumped_settings = chat_client.to_dict()
assert dumped_settings["ai_model_id"] == "test_chat_deployment"
assert dumped_settings["assistant_id"] == "test-assistant-id"
assert dumped_settings["assistant_name"] == "TestAssistant"
assert dumped_settings["thread_id"] == "test-thread-id"
assert dumped_settings["api_key"] == azure_openai_unit_test_env["AZURE_OPENAI_API_KEY"]
# Assert that the default header we added is present in the dumped_settings default headers
for key, value in default_headers.items():
assert key in dumped_settings["default_headers"]
assert dumped_settings["default_headers"][key] == value
# Assert that the 'User-Agent' header is not present in the dumped_settings default headers
assert "User-Agent" not in dumped_settings["default_headers"]
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
return f"The weather in {location} is sunny with a high of 25°C."
@skip_if_azure_integration_tests_disabled
async def test_azure_assistants_client_get_response() -> None:
"""Test Azure Assistants Client response."""
async with AzureAssistantsClient() as azure_assistants_client:
assert isinstance(azure_assistants_client, ChatClient)
messages: list[ChatMessage] = []
messages.append(
ChatMessage(
role="user",
text="The weather in Seattle is currently sunny with a high of 25°C. "
"It's a beautiful day for outdoor activities.",
)
)
messages.append(ChatMessage(role="user", text="What's the weather like today?"))
# Test that the client can be used to get a response
response = await azure_assistants_client.get_response(messages=messages)
assert response is not None
assert isinstance(response, ChatResponse)
assert any(word in response.text.lower() for word in ["sunny", "25", "weather", "seattle"])
@skip_if_azure_integration_tests_disabled
async def test_azure_assistants_client_get_response_tools() -> None:
"""Test Azure Assistants Client response with tools."""
async with AzureAssistantsClient() as azure_assistants_client:
assert isinstance(azure_assistants_client, ChatClient)
messages: list[ChatMessage] = []
messages.append(ChatMessage(role="user", text="What's the weather like in Seattle?"))
# Test that the client can be used to get a response
response = await azure_assistants_client.get_response(
messages=messages,
tools=[get_weather],
tool_choice="auto",
)
assert response is not None
assert isinstance(response, ChatResponse)
assert any(word in response.text.lower() for word in ["sunny", "25", "weather"])
@skip_if_azure_integration_tests_disabled
async def test_azure_assistants_client_streaming() -> None:
"""Test Azure Assistants Client streaming response."""
async with AzureAssistantsClient() as azure_assistants_client:
assert isinstance(azure_assistants_client, ChatClient)
messages: list[ChatMessage] = []
messages.append(
ChatMessage(
role="user",
text="The weather in Seattle is currently sunny with a high of 25°C. "
"It's a beautiful day for outdoor activities.",
)
)
messages.append(ChatMessage(role="user", text="What's the weather like today?"))
# Test that the client can be used to get a response
response = azure_assistants_client.get_streaming_response(messages=messages)
full_message: str = ""
async for chunk in response:
assert chunk is not None
assert isinstance(chunk, ChatResponseUpdate)
for content in chunk.contents:
if isinstance(content, TextContent) and content.text:
full_message += content.text
assert any(word in full_message.lower() for word in ["sunny", "25", "weather", "seattle"])
@skip_if_azure_integration_tests_disabled
async def test_azure_assistants_client_streaming_tools() -> None:
"""Test Azure Assistants Client streaming response with tools."""
async with AzureAssistantsClient() as azure_assistants_client:
assert isinstance(azure_assistants_client, ChatClient)
messages: list[ChatMessage] = []
messages.append(ChatMessage(role="user", text="What's the weather like in Seattle?"))
# Test that the client can be used to get a response
response = azure_assistants_client.get_streaming_response(
messages=messages,
tools=[get_weather],
tool_choice="auto",
)
full_message: str = ""
async for chunk in response:
assert chunk is not None
assert isinstance(chunk, ChatResponseUpdate)
for content in chunk.contents:
if isinstance(content, TextContent) and content.text:
full_message += content.text
assert any(word in full_message.lower() for word in ["sunny", "25", "weather"])
@skip_if_azure_integration_tests_disabled
async def test_azure_assistants_client_with_existing_assistant() -> None:
"""Test Azure Assistants Client with existing assistant ID."""
# First create an assistant to use in the test
async with AzureAssistantsClient() as temp_client:
# Get the assistant ID by triggering assistant creation
messages = [ChatMessage(role="user", text="Hello")]
await temp_client.get_response(messages=messages)
assistant_id = temp_client.assistant_id
# Now test using the existing assistant
async with AzureAssistantsClient(assistant_id=assistant_id) as azure_assistants_client:
assert isinstance(azure_assistants_client, ChatClient)
assert azure_assistants_client.assistant_id == assistant_id
messages = [ChatMessage(role="user", text="What can you do?")]
# Test that the client can be used to get a response
response = await azure_assistants_client.get_response(messages=messages)
assert response is not None
assert isinstance(response, ChatResponse)
assert len(response.text) > 0
@@ -6,6 +6,7 @@ from typing import Any
PACKAGE_NAME = "agent_framework_azure"
PACKAGE_EXTRA = "azure"
_IMPORTS = [
"AzureAssistantsClient",
"AzureChatClient",
"AzureOpenAISettings",
"get_entra_auth_token",
@@ -1,6 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.
from agent_framework_azure import (
AzureAssistantsClient,
AzureChatClient,
AzureOpenAISettings,
__version__,
@@ -8,6 +9,7 @@ from agent_framework_azure import (
)
__all__ = [
"AzureAssistantsClient",
"AzureChatClient",
"AzureOpenAISettings",
"__version__",
@@ -0,0 +1,65 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from random import randint
from typing import Annotated
from agent_framework import ChatClientAgent
from agent_framework.azure import AzureAssistantsClient
from pydantic import Field
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def non_streaming_example() -> None:
"""Example of non-streaming response (get the complete result at once)."""
print("=== Non-streaming Response Example ===")
# Since no assistant ID is provided, the assistant will be automatically created
# and deleted after getting a response
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
query = "What's the weather like in Seattle?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result}\n")
async def streaming_example() -> None:
"""Example of streaming response (get results as they are generated)."""
print("=== Streaming Response Example ===")
# Since no assistant ID is provided, the assistant will be automatically created
# and deleted after getting a response
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
query = "What's the weather like in Portland?"
print(f"User: {query}")
print("Agent: ", end="", flush=True)
async for chunk in agent.run_streaming(query):
if chunk.text:
print(chunk.text, end="", flush=True)
print("\n")
async def main() -> None:
print("=== Basic Azure OpenAI Assistants Chat Client Agent Example ===")
await non_streaming_example()
await streaming_example()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,59 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import AgentRunResponseUpdate, ChatClientAgent, ChatResponseUpdate, HostedCodeInterpreterTool
from agent_framework.azure import AzureAssistantsClient
from openai.types.beta.threads.runs import (
CodeInterpreterToolCallDelta,
RunStepDelta,
RunStepDeltaEvent,
ToolCallDeltaObject,
)
from openai.types.beta.threads.runs.code_interpreter_tool_call_delta import CodeInterpreter
def get_code_interpreter_chunk(chunk: AgentRunResponseUpdate) -> str | None:
"""Helper method to access code interpreter data."""
if (
isinstance(chunk.raw_representation, ChatResponseUpdate)
and isinstance(chunk.raw_representation.raw_representation, RunStepDeltaEvent)
and isinstance(chunk.raw_representation.raw_representation.delta, RunStepDelta)
and isinstance(chunk.raw_representation.raw_representation.delta.step_details, ToolCallDeltaObject)
and chunk.raw_representation.raw_representation.delta.step_details.tool_calls
):
for tool_call in chunk.raw_representation.raw_representation.delta.step_details.tool_calls:
if (
isinstance(tool_call, CodeInterpreterToolCallDelta)
and isinstance(tool_call.code_interpreter, CodeInterpreter)
and tool_call.code_interpreter.input is not None
):
return tool_call.code_interpreter.input
return None
async def main() -> None:
"""Example showing how to use the HostedCodeInterpreterTool with Azure OpenAI Assistants."""
print("=== Azure OpenAI Assistants Agent with Code Interpreter Example ===")
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a helpful assistant that can write and execute Python code to solve problems.",
tools=HostedCodeInterpreterTool(),
) as agent:
query = "What is current datetime?"
print(f"User: {query}")
print("Agent: ", end="", flush=True)
generated_code = ""
async for chunk in agent.run_streaming(query):
if chunk.text:
print(chunk.text, end="", flush=True)
code_interpreter_chunk = get_code_interpreter_chunk(chunk)
if code_interpreter_chunk is not None:
generated_code += code_interpreter_chunk
print(f"\nGenerated code:\n{generated_code}")
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,53 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from random import randint
from typing import Annotated
from agent_framework import ChatClientAgent
from agent_framework.azure import AzureAssistantsClient
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AsyncAzureOpenAI
from pydantic import Field
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main() -> None:
print("=== Azure OpenAI Assistants Chat Client with Existing Assistant ===")
token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default")
client = AsyncAzureOpenAI(
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
azure_ad_token_provider=token_provider,
api_version="2025-01-01-preview",
)
# Create an assistant that will persist
created_assistant = await client.beta.assistants.create(
model=os.environ["AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"], name="WeatherAssistant"
)
try:
async with ChatClientAgent(
chat_client=AzureAssistantsClient(async_client=client, assistant_id=created_assistant.id),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
result = await agent.run("What's the weather like in Tokyo?")
print(f"Result: {result}\n")
finally:
# Clean up the assistant manually
await client.beta.assistants.delete(created_assistant.id)
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,117 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from datetime import datetime, timezone
from random import randint
from typing import Annotated
from agent_framework import ChatClientAgent
from agent_framework.azure import AzureAssistantsClient
from pydantic import Field
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
def get_time() -> str:
"""Get the current UTC time."""
current_time = datetime.now(timezone.utc)
return f"The current UTC time is {current_time.strftime('%Y-%m-%d %H:%M:%S')}."
async def tools_on_agent_level() -> None:
"""Example showing tools defined when creating the agent."""
print("=== Tools Defined on Agent Level ===")
# Tools are provided when creating the agent
# The agent can use these tools for any query during its lifetime
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a helpful assistant that can provide weather and time information.",
tools=[get_weather, get_time], # Tools defined at agent creation
) as agent:
# First query - agent can use weather tool
query1 = "What's the weather like in New York?"
print(f"User: {query1}")
result1 = await agent.run(query1)
print(f"Agent: {result1}\n")
# Second query - agent can use time tool
query2 = "What's the current UTC time?"
print(f"User: {query2}")
result2 = await agent.run(query2)
print(f"Agent: {result2}\n")
# Third query - agent can use both tools if needed
query3 = "What's the weather in London and what's the current UTC time?"
print(f"User: {query3}")
result3 = await agent.run(query3)
print(f"Agent: {result3}\n")
async def tools_on_run_level() -> None:
"""Example showing tools passed to the run method."""
print("=== Tools Passed to Run Method ===")
# Agent created without tools
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a helpful assistant.",
# No tools defined here
) as agent:
# First query with weather tool
query1 = "What's the weather like in Seattle?"
print(f"User: {query1}")
result1 = await agent.run(query1, tools=[get_weather]) # Tool passed to run method
print(f"Agent: {result1}\n")
# Second query with time tool
query2 = "What's the current UTC time?"
print(f"User: {query2}")
result2 = await agent.run(query2, tools=[get_time]) # Different tool for this query
print(f"Agent: {result2}\n")
# Third query with multiple tools
query3 = "What's the weather in Chicago and what's the current UTC time?"
print(f"User: {query3}")
result3 = await agent.run(query3, tools=[get_weather, get_time]) # Multiple tools
print(f"Agent: {result3}\n")
async def mixed_tools_example() -> None:
"""Example showing both agent-level tools and run-method tools."""
print("=== Mixed Tools Example (Agent + Run Method) ===")
# Agent created with some base tools
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a comprehensive assistant that can help with various information requests.",
tools=[get_weather], # Base tool available for all queries
) as agent:
# Query using both agent tool and additional run-method tools
query = "What's the weather in Denver and what's the current UTC time?"
print(f"User: {query}")
# Agent has access to get_weather (from creation) + additional tools from run method
result = await agent.run(
query,
tools=[get_time], # Additional tools for this specific query
)
print(f"Agent: {result}\n")
async def main() -> None:
print("=== Azure OpenAI Assistants Chat Client Agent with Function Tools Examples ===\n")
await tools_on_agent_level()
await tools_on_run_level()
await mixed_tools_example()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,128 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from random import randint
from typing import Annotated
from agent_framework import ChatClientAgent, ChatClientAgentThread
from agent_framework.azure import AzureAssistantsClient
from pydantic import Field
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def example_with_automatic_thread_creation() -> None:
"""Example showing automatic thread creation (service-managed thread)."""
print("=== Automatic Thread Creation Example ===")
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
# First conversation - no thread provided, will be created automatically
query1 = "What's the weather like in Seattle?"
print(f"User: {query1}")
result1 = await agent.run(query1)
print(f"Agent: {result1.text}")
# Second conversation - still no thread provided, will create another new thread
query2 = "What was the last city I asked about?"
print(f"\nUser: {query2}")
result2 = await agent.run(query2)
print(f"Agent: {result2.text}")
print("Note: Each call creates a separate thread, so the agent doesn't remember previous context.\n")
async def example_with_thread_persistence() -> None:
"""Example showing thread persistence across multiple conversations."""
print("=== Thread Persistence Example ===")
print("Using the same thread across multiple conversations to maintain context.\n")
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First conversation
query1 = "What's the weather like in Tokyo?"
print(f"User: {query1}")
result1 = await agent.run(query1, thread=thread)
print(f"Agent: {result1.text}")
# Second conversation using the same thread - maintains context
query2 = "How about London?"
print(f"\nUser: {query2}")
result2 = await agent.run(query2, thread=thread)
print(f"Agent: {result2.text}")
# Third conversation - agent should remember both previous cities
query3 = "Which of the cities I asked about has better weather?"
print(f"\nUser: {query3}")
result3 = await agent.run(query3, thread=thread)
print(f"Agent: {result3.text}")
print("Note: The agent remembers context from previous messages in the same thread.\n")
async def example_with_existing_thread_id() -> None:
"""Example showing how to work with an existing thread ID from the service."""
print("=== Existing Thread ID Example ===")
print("Using a specific thread ID to continue an existing conversation.\n")
# First, create a conversation and capture the thread ID
existing_thread_id = None
async with ChatClientAgent(
chat_client=AzureAssistantsClient(),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
# Start a conversation and get the thread ID
thread = agent.get_new_thread()
query1 = "What's the weather in Paris?"
print(f"User: {query1}")
result1 = await agent.run(query1, thread=thread)
print(f"Agent: {result1.text}")
# The thread ID is set after the first response
existing_thread_id = thread.id
print(f"Thread ID: {existing_thread_id}")
if existing_thread_id:
print("\n--- Continuing with the same thread ID in a new agent instance ---")
# Create a new agent instance but use the existing thread ID
async with ChatClientAgent(
chat_client=AzureAssistantsClient(thread_id=existing_thread_id),
instructions="You are a helpful weather agent.",
tools=get_weather,
) as agent:
# Create a thread with the existing ID
thread = ChatClientAgentThread(id=existing_thread_id)
query2 = "What was the last city I asked about?"
print(f"User: {query2}")
result2 = await agent.run(query2, thread=thread)
print(f"Agent: {result2.text}")
print("Note: The agent continues the conversation from the previous thread.\n")
async def main() -> None:
print("=== Azure OpenAI Assistants Chat Client Agent Thread Management Examples ===\n")
await example_with_automatic_thread_creation()
await example_with_thread_persistence()
await example_with_existing_thread_id()
if __name__ == "__main__":
asyncio.run(main())
@@ -2,7 +2,7 @@
import asyncio
from agent_framework import AgentRunResponseUpdate, ChatClientAgent, HostedCodeInterpreterTool
from agent_framework import AgentRunResponseUpdate, ChatClientAgent, ChatResponseUpdate, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIAssistantsClient
from openai.types.beta.threads.runs import (
CodeInterpreterToolCallDelta,
@@ -16,7 +16,7 @@ from openai.types.beta.threads.runs.code_interpreter_tool_call_delta import Code
def get_code_interpreter_chunk(chunk: AgentRunResponseUpdate) -> str | None:
"""Helper method to access code interpreter data."""
if (
isinstance(chunk.raw_representation, AgentRunResponseUpdate)
isinstance(chunk.raw_representation, ChatResponseUpdate)
and isinstance(chunk.raw_representation.raw_representation, RunStepDeltaEvent)
and isinstance(chunk.raw_representation.raw_representation.delta, RunStepDelta)
and isinstance(chunk.raw_representation.raw_representation.delta.step_details, ToolCallDeltaObject)
@@ -0,0 +1,36 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from random import randint
from typing import Annotated
from agent_framework.azure import AzureAssistantsClient
from pydantic import Field
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main() -> None:
async with AzureAssistantsClient() as client:
message = "What's the weather in Amsterdam and in Paris?"
stream = False
print(f"User: {message}")
if stream:
print("Assistant: ", end="")
async for chunk in client.get_streaming_response(message, tools=get_weather):
if str(chunk):
print(str(chunk), end="")
print("")
else:
response = await client.get_response(message, tools=get_weather)
print(f"Assistant: {response}")
if __name__ == "__main__":
asyncio.run(main())