# Copyright (c) Microsoft. All rights reserved.

import asyncio
from random import randint
from typing import Annotated

from agent_framework import Agent, tool
from agent_framework.anthropic import AnthropicClient
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

"""
Anthropic Chat Agent Example

This sample demonstrates using Anthropic with an agent and a single custom tool.
"""


# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# see samples/02-agents/tools/function_tool_with_approval.py
# and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def get_weather(
    location: Annotated[str, "The location to get the weather for."],
) -> str:
    """Get the weather for a given location."""
    # Mock data: the condition and the temperature are drawn at random on
    # every call; no real weather service is queried.
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    condition = conditions[randint(0, 3)]
    high_celsius = randint(10, 30)
    return f"The weather in {location} is {condition} with a high of {high_celsius}°C."


async def non_streaming_example() -> None:
    """Run one query and print the complete response in a single piece."""
    print("=== Non-streaming Response Example ===")

    client = AnthropicClient(model="claude-sonnet-4-5-20250929")
    agent = Agent(
        client=client,
        name="WeatherAgent",
        instructions="You are a helpful weather agent.",
        tools=get_weather,
    )

    query = "What's the weather like in Seattle?"
    print(f"User: {query}")

    # Awaiting run() without stream=True yields the whole result at once.
    result = await agent.run(query)
    print(f"Result: {result}\n")


async def streaming_example() -> None:
    """Run one query and print the response text piece by piece as it arrives."""
    print("=== Streaming Response Example ===")

    client = AnthropicClient(model="claude-sonnet-4-5-20250929")
    agent = Agent(
        client=client,
        name="WeatherAgent",
        instructions="You are a helpful weather agent.",
        tools=get_weather,
    )

    query = "What's the weather like in Portland and in Paris?"
    print(f"User: {query}")
    print("Agent: ", end="", flush=True)

    async for update in agent.run(query, stream=True):
        # Updates that carry no text are skipped; only text is echoed.
        if not update.text:
            continue
        # Flush on every piece so the output appears incrementally.
        print(update.text, end="", flush=True)

    print("\n")


async def main() -> None:
    """Run the streaming example first, then the non-streaming example."""
    print("=== Anthropic Example ===")

    for example in (streaming_example, non_streaming_example):
        await example()


if __name__ == "__main__":
    asyncio.run(main())