Files
agent-framework/python/samples/04-hosting/foundry-hosted-agents/responses/07_observability/main.py
T
Tao Chen 5a087885a2 Python: Add hosted agent sample with observability (#5608)
* Add hosted agent sample with observability

* Address comments

* Remove unneeded changes

* Update README
2026-05-04 22:31:47 +00:00

58 lines
1.9 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from random import randint
from typing import Annotated
from agent_framework import Agent, tool
from agent_framework.foundry import FoundryChatClient
from agent_framework_foundry_hosting import ResponsesHostServer
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv
from pydantic import Field
# Load environment variables from .env file
load_dotenv()
@tool(approval_mode="never_require", description="Get the current location of the user.")
def get_current_location() -> str:
"""Get the current location of the agent."""
locations = ["New York", "London", "Paris", "Tokyo"]
return locations[randint(0, len(locations) - 1)]
@tool(approval_mode="never_require")
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main():
client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=DefaultAzureCredential(),
)
agent = Agent(
client=client,
instructions="You are a friendly assistant. Keep your answers brief.",
tools=[get_weather, get_current_location],
# History will be managed by the hosting infrastructure, thus there
# is no need to store history by the service. Learn more at:
# https://developers.openai.com/api/reference/resources/responses/methods/create
default_options={"store": False},
)
server = ResponsesHostServer(agent)
await server.run_async()
if __name__ == "__main__":
asyncio.run(main())