Files
agent-framework/python/packages/ag-ui/getting_started/client_advanced.py
Eduard van Valkenburg 3446eb8d5d Python: [BREAKING] update to v1.0.0 (#5062)
* updates to final deprecated pieces and versions

* fix mypy

* fix readme links
2026-04-02 15:26:30 +00:00

246 lines
8.1 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Advanced AG-UI client example with tools and features.
This example demonstrates advanced AGUIChatClient features including:
- Tool/function calling
- Non-streaming responses
- Multiple conversation turns
- Error handling
"""
from __future__ import annotations
import asyncio
import os
from typing import cast
from agent_framework import ChatResponse, ChatResponseUpdate, Message, ResponseStream, tool
from agent_framework.ag_ui import AGUIChatClient
@tool
def get_weather(location: str) -> str:
    """Get the current weather for a location.

    Args:
        location: The city or location name
    """
    # NOTE(review): the docstring wording is kept as-is on purpose; the @tool
    # decorator presumably exposes it as the tool description — confirm before
    # rewording.

    # Canned forecasts keyed by lower-cased city name (no real lookup is made).
    forecasts = {
        "seattle": "Rainy, 55°F",
        "san francisco": "Foggy, 62°F",
        "new york": "Sunny, 68°F",
        "london": "Cloudy, 52°F",
    }

    # Match case-insensitively, but echo the caller's original spelling back
    # when the city is unknown.
    key = location.lower()
    if key in forecasts:
        return forecasts[key]
    return f"Weather data not available for {location}"
@tool
def calculate(a: float, b: float, operation: str) -> str:
    """Perform basic arithmetic operations.

    Args:
        a: First number
        b: Second number
        operation: Operation to perform (add, subtract, multiply, divide)
    """
    # NOTE(review): the docstring wording is kept as-is on purpose; the @tool
    # decorator presumably exposes it as the tool description — confirm before
    # rewording.

    # Ordered (name, implementation) pairs; scanned with == so the matching
    # semantics are the same as a plain if/elif chain.
    supported = (
        ("add", lambda x, y: x + y),
        ("subtract", lambda x, y: x - y),
        ("multiply", lambda x, y: x * y),
        ("divide", lambda x, y: x / y),
    )

    try:
        for name, apply in supported:
            if operation == name:
                return f"The result is: {apply(a, b)}"
        return f"Unsupported operation: {operation}"
    except Exception as exc:
        # Any failure (e.g. division by zero) is reported as text instead of
        # being raised to the caller.
        return f"Error calculating: {exc}"
async def streaming_example(client: AGUIChatClient, thread_id: str | None = None):
    """Stream one reply and echo its text chunks as they arrive.

    Args:
        client: The AG-UI chat client used to send the request.
        thread_id: Optional thread to continue; sent as request metadata
            when provided.

    Returns:
        The thread id that was passed in, or, when none was given, the one
        read from the streamed updates' additional properties (may be None).
    """
    banner = "=" * 60
    print("\n" + banner)
    print("STREAMING EXAMPLE")
    print(banner)

    # Only attach metadata when there is a thread to continue.
    request_options = {"metadata": {"thread_id": thread_id}} if thread_id else None

    print("\nUser: Tell me a short joke\n")
    print("Assistant: ", end="", flush=True)

    raw_stream = client.get_response(
        [Message(role="user", contents=["Tell me a short joke"])],
        stream=True,
        options=request_options,
    )
    # The cast only narrows the type for static checkers.
    updates = cast(ResponseStream[ChatResponseUpdate, ChatResponse], raw_stream)

    async for chunk in updates:
        # Pick up a thread id from the updates only while we still lack one.
        if not thread_id:
            props = chunk.additional_properties
            if props:
                thread_id = props.get("thread_id")

        for part in chunk.contents:
            if part.type != "text" or not part.text:  # type: ignore[attr-defined]
                continue
            print(part.text, end="", flush=True)  # type: ignore[attr-defined]

    print("\n")
    return thread_id
async def non_streaming_example(client: AGUIChatClient, thread_id: str | None = None) -> str | None:
    """Send one request and print the complete (non-streamed) reply.

    Args:
        client: The AG-UI chat client used to send the request.
        thread_id: Optional thread to continue. When given it is sent as
            ``options={"metadata": {"thread_id": ...}}``, the same channel
            the streaming and multi-turn examples in this file use.

    Returns:
        The thread id reported in the response's additional properties, or
        the incoming ``thread_id`` when the response does not report one.
    """
    print("\n" + "=" * 60)
    print("NON-STREAMING EXAMPLE")
    print("=" * 60)

    # Only attach metadata when there is a thread to continue.
    options = {"metadata": {"thread_id": thread_id}} if thread_id else None

    print("\nUser: What is 2 + 2?\n")

    response = await client.get_response(
        [Message(role="user", contents=["What is 2 + 2?"])],
        options=options,
    )

    print(f"Assistant: {response.text}")

    if response.additional_properties:
        # Keep the thread we already know if the response omits the key,
        # so callers chaining examples do not lose their thread.
        thread_id = response.additional_properties.get("thread_id") or thread_id
        print(f"\n[Thread: {thread_id}]")

    return thread_id
async def tool_example(client: AGUIChatClient, thread_id: str | None = None) -> str | None:
    """Demonstrate sending tool definitions to the server.

    IMPORTANT: When using AGUIChatClient directly (without Agent wrapper):
    - Tools are sent as DEFINITIONS only
    - No automatic client-side execution (no function invocation middleware)
    - Server must have matching tool implementations to execute them

    For CLIENT-SIDE tool execution (like .NET AGUIClient sample):
    - Use Agent wrapper with tools
    - See client_with_agent.py for the hybrid pattern
    - Agent middleware intercepts and executes client tools locally
    - Server can have its own tools that execute server-side
    - Both client and server tools work together in same conversation

    This example sends tool definitions and assumes server-side execution.

    Args:
        client: The AG-UI chat client used to send the request.
        thread_id: Optional thread to continue. When given it is sent as
            ``options={"metadata": {"thread_id": ...}}``, the same channel
            the streaming and multi-turn examples in this file use.

    Returns:
        The thread id reported in the response's additional properties, or
        the incoming ``thread_id`` when the response does not report one.
    """
    print("\n" + "=" * 60)
    print("TOOL DEFINITION EXAMPLE")
    print("=" * 60)

    # Only attach metadata when there is a thread to continue.
    options = {"metadata": {"thread_id": thread_id}} if thread_id else None

    print("\nUser: What's the weather in Seattle?\n")
    print("Sending tool definitions to server...")
    print("(Server must be configured with matching tools to execute them)\n")

    response = await client.get_response(
        [Message(role="user", contents=["What's the weather in Seattle?"])],
        tools=[get_weather, calculate],
        options=options,
    )

    print(f"Assistant: {response.text}")

    # Report every function call found in the response messages.
    tool_called = False
    for message in response.messages:
        for content in message.contents:
            if content.type == "function_call":  # type: ignore[attr-defined]
                print(f"\n[Tool Called: {content.name}]")  # type: ignore[attr-defined]
                tool_called = True

    if not tool_called:
        print("\n[Note: No tools were called - server may not be configured for tool execution]")

    if response.additional_properties:
        # Keep the thread we already know if the response omits the key.
        thread_id = response.additional_properties.get("thread_id") or thread_id

    return thread_id
async def conversation_example(client: AGUIChatClient) -> None:
    """Demonstrate multi-turn conversation.

    Note: Conversation continuity depends on the server maintaining thread state.
    Some servers may require explicit message history to be sent with each request.

    The thread id is read from the first response's additional properties and,
    when one is available, sent back as request metadata on the two follow-up
    turns.

    Args:
        client: The AG-UI chat client used to send the requests.
    """
    print("\n" + "=" * 60)
    print("MULTI-TURN CONVERSATION EXAMPLE")
    print("=" * 60)
    print("\nNote: This example uses thread_id for context. Server must support thread-based state.\n")

    # First turn
    print("User: My name is Alice\n")
    response1 = await client.get_response([Message(role="user", contents=["My name is Alice"])])
    print(f"Assistant: {response1.text}")

    # additional_properties may be empty or missing (the other examples in
    # this file guard for that), so fall back to an empty mapping.
    thread_id = (response1.additional_properties or {}).get("thread_id")
    print(f"\n[Thread: {thread_id}]")

    # Only attach metadata when the server actually reported a thread.
    follow_up_options = {"metadata": {"thread_id": thread_id}} if thread_id else None

    # Second turn - using same thread
    print("\nUser: What's my name?\n")
    response2 = await client.get_response(
        [Message(role="user", contents=["What's my name?"])],
        options=follow_up_options,
    )
    print(f"Assistant: {response2.text}")

    # Check if context was maintained; tolerate an empty reply text.
    if "alice" not in (response2.text or "").lower():
        print("\n[Note: Server may not maintain thread context - consider using Agent for history management]")

    # Third turn
    print("\nUser: Can you also tell me what 10 * 5 is?\n")
    response3 = await client.get_response(
        [Message(role="user", contents=["Can you also tell me what 10 * 5 is?"])],
        options=follow_up_options,
        tools=[calculate],
    )
    print(f"Assistant: {response3.text}")
async def main():
    """Run every example in sequence against a single AG-UI server.

    The endpoint is read from the ``AGUI_SERVER_URL`` environment variable
    and defaults to ``http://127.0.0.1:5100/``. Errors are printed (with a
    traceback for anything other than ``ConnectionError``) rather than
    propagated.
    """
    endpoint = os.environ.get("AGUI_SERVER_URL", "http://127.0.0.1:5100/")
    rule = "=" * 60

    print(rule)
    print("AG-UI Chat Client Advanced Examples")
    print(rule)
    print(f"\nServer: {endpoint}")
    print("\nThese examples demonstrate various AGUIChatClient features:")
    for feature_line in (
        " 1. Streaming responses",
        " 2. Non-streaming responses",
        " 3. Tool/function calling",
        " 4. Multi-turn conversations",
    ):
        print(feature_line)

    try:
        async with AGUIChatClient(endpoint=endpoint) as client:
            # The first three examples share one thread, handed from one
            # example to the next.
            shared_thread = await streaming_example(client)
            shared_thread = await non_streaming_example(client, shared_thread)
            await tool_example(client, shared_thread)

            # The multi-turn example starts its own, separate thread.
            await conversation_example(client)

            print("\n" + rule)
            print("All examples completed successfully!")
            print(rule)
    except ConnectionError as conn_err:
        print(f"\n\033[91mConnection Error: {conn_err}\033[0m")
        print("\nMake sure an AG-UI server is running at the specified endpoint.")
    except Exception as err:
        print(f"\n\033[91mError: {err}\033[0m")
        # Imported lazily: only needed on the unexpected-failure path.
        import traceback

        traceback.print_exc()
if __name__ == "__main__":
    # Script entry point: drive the async examples on a fresh event loop.
    asyncio.run(main())