Python: Add samples syntax checking with pyright (#3710)

* Add samples syntax checking with pyright

- Add pyrightconfig.samples.json with relaxed type checking but import validation
- Add samples-syntax poe task to check samples for syntax and import errors
- Add samples-syntax to check and pre-commit-check tasks
- Fix 78 sample errors:
  - Update workflow builder imports to use agent_framework_orchestrations
  - Change content type isinstance checks to content.type comparisons
  - Use Content factory methods instead of removed content type classes
  - Fix TypedDict access patterns for Annotation
  - Fix various API mismatches (normalize_messages, ChatMessage.text, role)

* fixed a bunch of samples and tweaks to pre-commit

* updated lock

* updated lock

* fixes

* added lint to samples
This commit is contained in:
Eduard van Valkenburg
2026-02-07 08:10:47 +01:00
committed by GitHub
Unverified
parent 74ac470a56
commit 390f93344c
83 changed files with 606 additions and 498 deletions
@@ -7,7 +7,7 @@ to other specialized agents based on the task requirements.
import asyncio
from agent_framework import WorkflowEvent
from agent_framework import AgentResponseUpdate, WorkflowEvent
from orderedmultidict import Any
@@ -99,7 +99,6 @@ async def run_autogen() -> None:
async def run_agent_framework() -> None:
"""Agent Framework's HandoffBuilder for agent coordination."""
from agent_framework import (
AgentResponseUpdate,
WorkflowRunState,
)
from agent_framework.openai import OpenAIChatClient
@@ -48,7 +48,7 @@ async def run_autogen() -> None:
async def run_agent_framework() -> None:
"""Agent Framework's as_tool() for hierarchical agents with streaming."""
from agent_framework import FunctionCallContent, FunctionResultContent
from agent_framework import Content
from agent_framework.openai import OpenAIChatClient
client = OpenAIChatClient(model_id="gpt-4.1-mini")
@@ -78,7 +78,7 @@ async def run_agent_framework() -> None:
print("[Agent Framework]")
# Track accumulated function calls (they stream in incrementally)
accumulated_calls: dict[str, FunctionCallContent] = {}
accumulated_calls: dict[str, Content] = {}
async for chunk in coordinator.run("Create a tagline for a coffee shop", stream=True):
# Stream text tokens
@@ -88,7 +88,7 @@ async def run_agent_framework() -> None:
# Process streaming function calls and results
if chunk.contents:
for content in chunk.contents:
if isinstance(content, FunctionCallContent):
if content.type == "function_call":
# Accumulate function call content as it streams in
call_id = content.call_id
if call_id in accumulated_calls:
@@ -105,7 +105,7 @@ async def run_agent_framework() -> None:
current_args = accumulated_calls[call_id].arguments
print(f" Arguments: {current_args}", flush=True)
elif isinstance(content, FunctionResultContent):
elif content.type == "function_result":
# Tool result - shows writer's response
result_text = content.result if isinstance(content.result, str) else str(content.result)
if result_text.strip():
+3 -3
View File
@@ -154,11 +154,11 @@ async def main() -> None:
words = ["Hello", " ", "from", " ", "the", " ", "streaming", " ", "response", "!"]
for word in words:
await asyncio.sleep(0.05) # Simulate network delay
yield ChatResponseUpdate(contents=[Content.from_text(word)], role=Role.ASSISTANT)
yield ChatResponseUpdate(contents=[Content.from_text(word)], role="assistant")
def combine_updates(updates: Sequence[ChatResponseUpdate]) -> ChatResponse:
"""Finalizer that combines all updates into a single response."""
return ChatResponse.from_chat_response_updates(updates)
return ChatResponse.from_updates(updates)
stream = ResponseStream(generate_updates(), finalizer=combine_updates)
@@ -237,7 +237,7 @@ async def main() -> None:
)
print("Starting iteration (cleanup happens after):")
async for update in stream4:
async for _update in stream4:
pass # Just consume the stream
print(f"Cleanup was performed: {cleanup_performed['value']}")
@@ -18,7 +18,7 @@ from typing import Annotated, Any
import uvicorn
# Agent Framework imports
from agent_framework import AgentResponseUpdate, ChatAgent, ChatMessage, FunctionResultContent, Role, tool
from agent_framework import AgentResponseUpdate, ChatAgent, ChatMessage, tool
from agent_framework.azure import AzureOpenAIChatClient
# Agent Framework ChatKit integration
@@ -281,7 +281,7 @@ class WeatherChatKitServer(ChatKitServer[dict[str, Any]]):
title_prompt = [
ChatMessage(
role=Role.USER,
role="user",
text=(
f"Generate a very short, concise title (max 40 characters) for a conversation "
f"that starts with:\n\n{conversation_context}\n\n"
@@ -332,7 +332,6 @@ class WeatherChatKitServer(ChatKitServer[dict[str, Any]]):
runs the agent, converts the response back to ChatKit events using stream_agent_response,
and creates interactive weather widgets when weather data is queried.
"""
from agent_framework import FunctionResultContent
if input_user_message is None:
logger.debug("Received None user message, skipping")
@@ -375,7 +374,7 @@ class WeatherChatKitServer(ChatKitServer[dict[str, Any]]):
# Check for function results in the update
if update.contents:
for content in update.contents:
if isinstance(content, FunctionResultContent):
if content.type == "function_result":
result = content.result
# Check if it's a WeatherResponse (string subclass with weather_data attribute)
@@ -458,7 +457,7 @@ class WeatherChatKitServer(ChatKitServer[dict[str, Any]]):
weather_data: WeatherData | None = None
# Create an agent message asking about the weather
agent_messages = [ChatMessage(role=Role.USER, text=f"What's the weather in {city_label}?")]
agent_messages = [ChatMessage(role="user", text=f"What's the weather in {city_label}?")]
logger.debug(f"Processing weather query: {agent_messages[0].text}")
@@ -472,7 +471,7 @@ class WeatherChatKitServer(ChatKitServer[dict[str, Any]]):
# Check for function results in the update
if update.contents:
for content in update.contents:
if isinstance(content, FunctionResultContent):
if content.type == "function_result":
result = content.result
# Check if it's a WeatherResponse (string subclass with weather_data attribute)
@@ -563,7 +562,7 @@ async def chatkit_endpoint(request: Request):
@app.post("/upload/{attachment_id}")
async def upload_file(attachment_id: str, file: UploadFile = File(...)):
async def upload_file(attachment_id: str, file: Annotated[UploadFile, File()]):
"""Handle file upload for two-phase upload.
The client POSTs the file bytes here after creating the attachment
@@ -585,7 +584,7 @@ async def upload_file(attachment_id: str, file: UploadFile = File(...)):
attachment = await data_store.load_attachment(attachment_id, {"user_id": DEFAULT_USER_ID})
# Clear the upload_url since upload is complete
attachment.upload_url = None
attachment.upload_url = None # type: ignore[union-attr]
# Save the updated attachment back to the store
await data_store.save_attachment(attachment, {"user_id": DEFAULT_USER_ID})
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.
from agent_framework import ConcurrentBuilder
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework_orchestrations import ConcurrentBuilder
from azure.ai.agentserver.agentframework import from_agent_framework
from azure.identity import DefaultAzureCredential # pyright: ignore[reportUnknownVariableType]
@@ -21,7 +21,7 @@ def search_hotels(
guests: Annotated[int, Field(description="Number of guests.")] = 2,
) -> str:
"""Search for available hotels based on location and dates.
Returns:
JSON string containing search results with hotel details including name, rating,
price, distance to landmarks, amenities, and availability.
@@ -88,7 +88,7 @@ def get_hotel_details(
hotel_name: Annotated[str, Field(description="Name of the hotel to get details for.")],
) -> str:
"""Get detailed information about a specific hotel.
Returns:
JSON string containing detailed hotel information including description,
check-in/out times, cancellation policy, reviews, and nearby attractions.
@@ -167,7 +167,7 @@ def search_flights(
passengers: Annotated[int, Field(description="Number of passengers.")] = 1,
) -> str:
"""Search for available flights between two locations.
Returns:
JSON string containing flight search results with details including flight numbers,
airlines, departure/arrival times, prices, durations, and baggage allowances.
@@ -289,7 +289,7 @@ def get_flight_details(
flight_number: Annotated[str, Field(description="Flight number (e.g., 'AF007' or 'DL264').")],
) -> str:
"""Get detailed information about a specific flight.
Returns:
JSON string containing detailed flight information including airline, aircraft type,
departure/arrival airports and times, gates, terminals, duration, and amenities.
@@ -331,7 +331,7 @@ def search_activities(
category: Annotated[str | None, Field(description="Activity category (e.g., 'Sightseeing', 'Culture', 'Culinary').")] = None,
) -> str:
"""Search for available activities and attractions at a destination.
Returns:
JSON string containing activity search results with details including name, category,
duration, price, rating, description, availability, and booking requirements.
@@ -440,10 +440,7 @@ def search_activities(
}
]
if category:
activities = [act for act in all_activities if act["category"] == category]
else:
activities = all_activities
activities = [act for act in all_activities if act["category"] == category] if category else all_activities
else:
activities = [
{
@@ -473,7 +470,7 @@ def get_activity_details(
activity_name: Annotated[str, Field(description="Name of the activity to get details for.")],
) -> str:
"""Get detailed information about a specific activity.
Returns:
JSON string containing detailed activity information including description, duration,
price, included items, meeting point, what to bring, cancellation policy, and reviews.
@@ -552,7 +549,7 @@ def confirm_booking(
customer_info: Annotated[dict, Field(description="Customer information including name and email.")],
) -> str:
"""Confirm a booking reservation.
Returns:
JSON string containing confirmation details including confirmation number,
booking status, customer information, and next steps.
@@ -587,9 +584,9 @@ def check_hotel_availability(
rooms: Annotated[int, Field(description="Number of rooms needed.")] = 1,
) -> str:
"""Check availability for hotel rooms.
Sample Date format: "December 15, 2025"
Returns:
JSON string containing availability status, available rooms count, price per night,
and last checked timestamp.
@@ -621,9 +618,9 @@ def check_flight_availability(
passengers: Annotated[int, Field(description="Number of passengers.")] = 1,
) -> str:
"""Check availability for flight seats.
Sample Date format: "December 15, 2025"
Returns:
JSON string containing availability status, available seats count, price per passenger,
and last checked timestamp.
@@ -654,9 +651,9 @@ def check_activity_availability(
participants: Annotated[int, Field(description="Number of participants.")] = 1,
) -> str:
"""Check availability for activity bookings.
Sample Date format: "December 16, 2025"
Returns:
JSON string containing availability status, available spots count, price per person,
and last checked timestamp.
@@ -688,7 +685,7 @@ def process_payment(
booking_reference: Annotated[str, Field(description="Booking reference number for the payment.")],
) -> str:
"""Process payment for a booking.
Returns:
JSON string containing payment result with transaction ID, status, amount, currency,
payment method details, and receipt URL.
@@ -718,7 +715,7 @@ def validate_payment_method(
payment_method: Annotated[dict, Field(description="Payment method to validate (type, number, expiry, cvv).")],
) -> str:
"""Validate payment method details.
Returns:
JSON string containing validation result with is_valid flag, payment method type,
validation messages, supported currencies, and processing fee information.
@@ -154,19 +154,19 @@ async def run_workflow_with_response_tracking(query: str, chat_client: AzureAICl
"""
if chat_client is None:
try:
# Create AIProjectClient with the correct API version for V2 prompt agents
project_client = AIProjectClient(
endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
credential=credential,
api_version="2025-11-15-preview",
)
async with DefaultAzureCredential() as credential:
# Create AIProjectClient with the correct API version for V2 prompt agents
project_client = AIProjectClient(
endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
credential=credential,
api_version="2025-11-15-preview",
)
async with (
DefaultAzureCredential() as credential,
project_client,
AzureAIClient(project_client=project_client, credential=credential) as client,
):
return await _run_workflow_with_client(query, client)
async with (
project_client,
AzureAIClient(project_client=project_client, credential=credential) as client,
):
return await _run_workflow_with_client(query, client)
except Exception as e:
print(f"Error during workflow execution: {e}")
raise
@@ -369,27 +369,36 @@ async def _process_workflow_events(events, conversation_ids, response_ids):
def _track_agent_ids(event, agent, response_ids, conversation_ids):
"""Track agent response and conversation IDs - supporting multiple responses per agent."""
if isinstance(event.data, AgentResponseUpdate):
if (
isinstance(event.data, AgentResponseUpdate)
and hasattr(event.data, "raw_representation")
and event.data.raw_representation
):
# Check for conversation_id and response_id from raw_representation
# V2 API stores conversation_id directly on raw_representation (ChatResponseUpdate)
if hasattr(event.data, "raw_representation") and event.data.raw_representation:
raw = event.data.raw_representation
raw = event.data.raw_representation
# Try conversation_id directly on raw representation
if hasattr(raw, "conversation_id") and raw.conversation_id:
# Try conversation_id directly on raw representation
if (
hasattr(raw, "conversation_id")
and raw.conversation_id # type: ignore[union-attr]
and raw.conversation_id not in conversation_ids[agent] # type: ignore[union-attr]
):
# Only add if not already in the list
conversation_ids[agent].append(raw.conversation_id) # type: ignore[union-attr]
# Extract response_id from the OpenAI event (available from first event)
if hasattr(raw, "raw_representation") and raw.raw_representation: # type: ignore[union-attr]
openai_event = raw.raw_representation # type: ignore[union-attr]
# Check if event has response object with id
if (
hasattr(openai_event, "response")
and hasattr(openai_event.response, "id")
and openai_event.response.id not in response_ids[agent]
):
# Only add if not already in the list
if raw.conversation_id not in conversation_ids[agent]:
conversation_ids[agent].append(raw.conversation_id)
# Extract response_id from the OpenAI event (available from first event)
if hasattr(raw, "raw_representation") and raw.raw_representation:
openai_event = raw.raw_representation
# Check if event has response object with id
if hasattr(openai_event, "response") and hasattr(openai_event.response, "id"):
# Only add if not already in the list
if openai_event.response.id not in response_ids[agent]:
response_ids[agent].append(openai_event.response.id)
response_ids[agent].append(openai_event.response.id)
async def create_and_run_workflow():
@@ -29,7 +29,7 @@ def print_section(title: str):
async def run_workflow():
"""Execute the multi-agent travel planning workflow.
Returns:
Dictionary containing workflow data with agent response IDs
"""
@@ -2,7 +2,7 @@
import asyncio
from agent_framework import HostedMCPTool, HostedWebSearchTool, TextReasoningContent, UsageContent
from agent_framework import HostedMCPTool, HostedWebSearchTool
from agent_framework.anthropic import AnthropicChatOptions, AnthropicClient
"""
@@ -40,9 +40,9 @@ async def main() -> None:
print("Agent: ", end="", flush=True)
async for chunk in agent.run(query, stream=True):
for content in chunk.contents:
if isinstance(content, TextReasoningContent):
if content.type == "text_reasoning":
print(f"\033[32m{content.text}\033[0m", end="", flush=True)
if isinstance(content, UsageContent):
if content.type == "usage":
print(f"\n\033[34m[Usage so far: {content.usage_details}]\033[0m\n", end="", flush=True)
if chunk.text:
print(chunk.text, end="", flush=True)
@@ -2,7 +2,7 @@
import asyncio
from agent_framework import HostedMCPTool, HostedWebSearchTool, TextReasoningContent, UsageContent
from agent_framework import HostedMCPTool, HostedWebSearchTool
from agent_framework.anthropic import AnthropicClient
from anthropic import AsyncAnthropicFoundry
@@ -51,9 +51,9 @@ async def main() -> None:
print("Agent: ", end="", flush=True)
async for chunk in agent.run(query, stream=True):
for content in chunk.contents:
if isinstance(content, TextReasoningContent):
if content.type == "text_reasoning":
print(f"\033[32m{content.text}\033[0m", end="", flush=True)
if isinstance(content, UsageContent):
if content.type == "usage":
print(f"\n\033[34m[Usage so far: {content.usage_details}]\033[0m\n", end="", flush=True)
if chunk.text:
print(chunk.text, end="", flush=True)
@@ -4,7 +4,7 @@ import asyncio
import logging
from pathlib import Path
from agent_framework import HostedCodeInterpreterTool, HostedFileContent
from agent_framework import Content, HostedCodeInterpreterTool
from agent_framework.anthropic import AnthropicChatOptions, AnthropicClient
logger = logging.getLogger(__name__)
@@ -52,7 +52,7 @@ async def main() -> None:
query = "Create a presentation about renewable energy with 5 slides"
print(f"User: {query}")
print("Agent: ", end="", flush=True)
files: list[HostedFileContent] = []
files: list[Content] = []
async for chunk in agent.run(query, stream=True):
for content in chunk.contents:
match content.type:
@@ -6,11 +6,10 @@ from pathlib import Path
from agent_framework import (
AgentResponseUpdate,
Annotation,
ChatAgent,
CitationAnnotation,
Content,
HostedCodeInterpreterTool,
HostedFileContent,
TextContent,
)
from agent_framework.azure import AzureAIProjectAgentProvider
from azure.identity.aio import AzureCliCredential
@@ -34,19 +33,17 @@ QUERY = (
)
async def download_container_files(
file_contents: list[CitationAnnotation | HostedFileContent], agent: ChatAgent
) -> list[Path]:
async def download_container_files(file_contents: list[Annotation | Content], agent: ChatAgent) -> list[Path]:
"""Download container files using the OpenAI containers API.
Code interpreter generates files in containers, which require both file_id
and container_id to download. The container_id is stored in additional_properties.
This function works for both streaming (HostedFileContent) and non-streaming
(CitationAnnotation) responses.
This function works for both streaming (Content with type="hosted_file") and non-streaming
(Annotation) responses.
Args:
file_contents: List of CitationAnnotation or HostedFileContent objects
file_contents: List of Annotation or Content objects
containing file_id and container_id.
agent: The ChatAgent instance with access to the AzureAIClient.
@@ -64,28 +61,36 @@ async def download_container_files(
print(f"\nDownloading {len(file_contents)} container file(s) to {output_dir.absolute()}...")
# Access the OpenAI client from AzureAIClient
openai_client = agent.chat_client.client
openai_client = agent.chat_client.client # type: ignore[attr-defined]
downloaded_files: list[Path] = []
for content in file_contents:
file_id = content.file_id
# Handle both Annotation (TypedDict) and Content objects
if isinstance(content, dict): # Annotation TypedDict
file_id = content.get("file_id")
additional_props = content.get("additional_properties", {})
url = content.get("url")
else: # Content object
file_id = content.file_id
additional_props = content.additional_properties or {}
url = content.uri
# Extract container_id from additional_properties
if not content.additional_properties or "container_id" not in content.additional_properties:
if not additional_props or "container_id" not in additional_props:
print(f" File {file_id}: ✗ Missing container_id")
continue
container_id = content.additional_properties["container_id"]
container_id = additional_props["container_id"]
# Extract filename based on content type
if isinstance(content, CitationAnnotation):
filename = content.url or f"{file_id}.txt"
if isinstance(content, dict): # Annotation TypedDict
filename = url or f"{file_id}.txt"
# Extract filename from sandbox URL if present (e.g., sandbox:/mnt/data/sample.txt)
if filename.startswith("sandbox:"):
filename = filename.split("/")[-1]
else: # HostedFileContent
filename = content.additional_properties.get("filename") or f"{file_id}.txt"
else: # Content
filename = additional_props.get("filename") or f"{file_id}.txt"
output_path = output_dir / filename
@@ -133,17 +138,18 @@ async def non_streaming_example() -> None:
print(f"Agent: {result.text}\n")
# Check for annotations in the response
annotations_found: list[CitationAnnotation] = []
annotations_found: list[Annotation] = []
# AgentResponse has messages property, which contains ChatMessage objects
for message in result.messages:
for content in message.contents:
if content.type == "text" and content.annotations:
for annotation in content.annotations:
if isinstance(annotation, CitationAnnotation) and annotation.file_id:
if annotation.get("file_id"):
annotations_found.append(annotation)
print(f"Found file annotation: file_id={annotation.file_id}")
if annotation.additional_properties and "container_id" in annotation.additional_properties:
print(f" container_id={annotation.additional_properties['container_id']}")
print(f"Found file annotation: file_id={annotation['file_id']}")
additional_props = annotation.get("additional_properties", {})
if additional_props and "container_id" in additional_props:
print(f" container_id={additional_props['container_id']}")
if annotations_found:
print(f"SUCCESS: Found {len(annotations_found)} file annotation(s)")
@@ -174,7 +180,7 @@ async def streaming_example() -> None:
)
print(f"User: {QUERY}\n")
file_contents_found: list[HostedFileContent] = []
file_contents_found: list[Content] = []
text_chunks: list[str] = []
async for update in agent.run(QUERY, stream=True):
@@ -185,11 +191,11 @@ async def streaming_example() -> None:
text_chunks.append(content.text)
if content.annotations:
for annotation in content.annotations:
if isinstance(annotation, CitationAnnotation) and annotation.file_id:
print(f"Found streaming CitationAnnotation: file_id={annotation.file_id}")
elif isinstance(content, HostedFileContent):
if annotation.get("file_id"):
print(f"Found streaming annotation: file_id={annotation['file_id']}")
elif content.type == "hosted_file":
file_contents_found.append(content)
print(f"Found streaming HostedFileContent: file_id={content.file_id}")
print(f"Found streaming hosted_file: file_id={content.file_id}")
if content.additional_properties and "container_id" in content.additional_properties:
print(f" container_id={content.additional_properties['container_id']}")
@@ -49,9 +49,9 @@ async def non_streaming_example() -> None:
for content in message.contents:
if content.type == "text" and content.annotations:
for annotation in content.annotations:
if annotation.file_id:
annotations_found.append(annotation.file_id)
print(f"Found file annotation: file_id={annotation.file_id}")
if annotation.get("file_id"):
annotations_found.append(annotation["file_id"])
print(f"Found file annotation: file_id={annotation['file_id']}")
if annotations_found:
print(f"SUCCESS: Found {len(annotations_found)} file annotation(s)")
@@ -86,9 +86,9 @@ async def streaming_example() -> None:
text_chunks.append(content.text)
if content.annotations:
for annotation in content.annotations:
if annotation.file_id:
annotations_found.append(annotation.file_id)
print(f"Found streaming annotation: file_id={annotation.file_id}")
if annotation.get("file_id"):
annotations_found.append(annotation["file_id"])
print(f"Found streaming annotation: file_id={annotation['file_id']}")
elif content.type == "hosted_file":
file_ids_found.append(content.file_id)
print(f"Found streaming HostedFileContent: file_id={content.file_id}")
@@ -4,7 +4,7 @@ import asyncio
import os
from pathlib import Path
from agent_framework import HostedFileSearchTool, HostedVectorStoreContent
from agent_framework import Content, HostedFileSearchTool
from agent_framework.azure import AzureAIProjectAgentProvider
from azure.ai.agents.aio import AgentsClient
from azure.ai.agents.models import FileInfo, VectorStore
@@ -46,7 +46,7 @@ async def main() -> None:
print(f"Created vector store, vector store ID: {vector_store.id}")
# 2. Create file search tool with uploaded resources
file_search_tool = HostedFileSearchTool(inputs=[HostedVectorStoreContent(vector_store_id=vector_store.id)])
file_search_tool = HostedFileSearchTool(inputs=[Content.from_hosted_vector_store(vector_store_id=vector_store.id)])
# 3. Create an agent with file search capabilities using the provider
agent = await provider.create_agent(
@@ -3,7 +3,7 @@
import asyncio
from typing import Any
from agent_framework import SupportsAgentRun, AgentResponse, AgentThread, ChatMessage, HostedMCPTool
from agent_framework import AgentResponse, AgentThread, ChatMessage, HostedMCPTool, SupportsAgentRun
from agent_framework.azure import AzureAIProjectAgentProvider
from azure.identity.aio import AzureCliCredential
@@ -5,7 +5,6 @@ import os
from agent_framework import (
HostedCodeInterpreterTool,
HostedFileContent,
)
from agent_framework.azure import AzureAIAgentsProvider
from azure.ai.agents.aio import AgentsClient
@@ -63,7 +62,7 @@ async def main() -> None:
for content in chunk.contents:
if content.type == "text":
print(content.text, end="", flush=True)
elif content.type == "hosted_file" and isinstance(content, HostedFileContent):
elif content.type == "hosted_file" and content.file_id:
file_ids.append(content.file_id)
print(f"\n[File generated: {content.file_id}]")
@@ -4,7 +4,7 @@ import asyncio
import os
from pathlib import Path
from agent_framework import HostedFileSearchTool, HostedVectorStoreContent
from agent_framework import Content, HostedFileSearchTool
from agent_framework.azure import AzureAIAgentsProvider
from azure.ai.agents.aio import AgentsClient
from azure.ai.agents.models import FileInfo, VectorStore
@@ -46,7 +46,7 @@ async def main() -> None:
print(f"Created vector store, vector store ID: {vector_store.id}")
# 2. Create file search tool with uploaded resources
file_search_tool = HostedFileSearchTool(inputs=[HostedVectorStoreContent(vector_store_id=vector_store.id)])
file_search_tool = HostedFileSearchTool(inputs=[Content.from_hosted_vector_store(vector_store_id=vector_store.id)])
# 3. Create an agent with file search capabilities
agent = await provider.create_agent(
@@ -3,7 +3,7 @@
import asyncio
from typing import Any
from agent_framework import SupportsAgentRun, AgentResponse, AgentThread, HostedMCPTool
from agent_framework import AgentResponse, AgentThread, HostedMCPTool, SupportsAgentRun
from agent_framework.azure import AzureAIAgentsProvider
from azure.identity.aio import AzureCliCredential
@@ -5,10 +5,10 @@ from datetime import datetime, timezone
from typing import Any
from agent_framework import (
SupportsAgentRun,
AgentThread,
HostedMCPTool,
HostedWebSearchTool,
SupportsAgentRun,
tool,
)
from agent_framework.azure import AzureAIAgentsProvider
@@ -2,7 +2,7 @@
import asyncio
from agent_framework import ChatAgent, HostedFileSearchTool, HostedVectorStoreContent
from agent_framework import ChatAgent, Content, HostedFileSearchTool
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
@@ -22,7 +22,7 @@ Prerequisites:
# Helper functions
async def create_vector_store(client: AzureOpenAIResponsesClient) -> tuple[str, HostedVectorStoreContent]:
async def create_vector_store(client: AzureOpenAIResponsesClient) -> tuple[str, Content]:
"""Create a vector store with sample documents."""
file = await client.client.files.create(
file=("todays_weather.txt", b"The weather today is sunny with a high of 75F."), purpose="assistants"
@@ -35,7 +35,7 @@ async def create_vector_store(client: AzureOpenAIResponsesClient) -> tuple[str,
if result.last_error is not None:
raise Exception(f"Vector store file processing failed with status: {result.last_error.message}")
return file.id, HostedVectorStoreContent(vector_store_id=vector_store.id)
return file.id, Content.from_hosted_vector_store(vector_store_id=vector_store.id)
async def delete_vector_store(client: AzureOpenAIResponsesClient, file_id: str, vector_store_id: str) -> None:
@@ -15,7 +15,7 @@ Azure OpenAI Responses Client, including user approval workflows for function ca
"""
if TYPE_CHECKING:
from agent_framework import SupportsAgentRun, AgentThread
from agent_framework import AgentThread, SupportsAgentRun
async def handle_approvals_without_thread(query: str, agent: "SupportsAgentRun"):
@@ -12,7 +12,7 @@ from agent_framework import (
ChatMessage,
Content,
Role,
TextContent,
normalize_messages,
)
"""
@@ -88,12 +88,14 @@ class EchoAgent(BaseAgent):
) -> AgentResponse:
"""Non-streaming implementation."""
# Normalize input messages to a list
normalized_messages = self._normalize_messages(messages)
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_message = ChatMessage(
role=Role.ASSISTANT,
contents=[Content.from_text(text="Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
contents=[
Content.from_text(text="Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")
],
)
else:
# For simplicity, echo the last user message
@@ -120,7 +122,7 @@ class EchoAgent(BaseAgent):
) -> AsyncIterable[AgentResponseUpdate]:
"""Streaming implementation."""
# Normalize input messages to a list
normalized_messages = self._normalize_messages(messages)
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
@@ -5,7 +5,15 @@ from collections.abc import Awaitable, Callable
from random import randint
from typing import Annotated
from agent_framework import ChatAgent, ChatContext, ChatMessage, ChatResponse, Role, chat_middleware, tool
from agent_framework import (
ChatAgent,
ChatContext,
ChatMessage,
ChatResponse,
MiddlewareTermination,
chat_middleware,
tool,
)
from agent_framework.openai import OpenAIResponsesClient
from pydantic import Field
@@ -39,7 +47,7 @@ async def security_and_override_middleware(
context.result = ChatResponse(
messages=[
ChatMessage(
role=Role.ASSISTANT,
role="assistant",
text="I cannot process requests containing sensitive information. "
"Please rephrase your question without including passwords, secrets, or other "
"sensitive data.",
@@ -48,8 +56,7 @@ async def security_and_override_middleware(
)
# Set terminate flag to stop execution
context.terminate = True
return
raise MiddlewareTermination
# Continue to next middleware or AI execution
await next(context)
@@ -70,7 +70,7 @@ async def main() -> None:
# Show information about the generated image
for message in result.messages:
for content in message.contents:
if content.type == "image_generation" and content.outputs:
if content.type == "image_generation_tool_result" and content.outputs:
for output in content.outputs:
if output.type in ("data", "uri") and output.uri:
show_image_info(output.uri)
@@ -32,7 +32,7 @@ async def main() -> None:
print(f"Result: {result}\n")
for message in result.messages:
code_blocks = [c for c in message.contents if c.type == "code_interpreter_tool_input"]
code_blocks = [c for c in message.contents if c.type == "code_interpreter_tool_call"]
outputs = [c for c in message.contents if c.type == "code_interpreter_tool_result"]
if code_blocks:
code_inputs = code_blocks[0].inputs or []
@@ -14,7 +14,7 @@ OpenAI Responses Client, including user approval workflows for function call sec
"""
if TYPE_CHECKING:
from agent_framework import SupportsAgentRun, AgentThread
from agent_framework import AgentThread, SupportsAgentRun
async def handle_approvals_without_thread(query: str, agent: "SupportsAgentRun"):
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Host a single Azure OpenAI-powered agent inside Azure Functions.
Components used in this sample:
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Host multiple Azure OpenAI agents inside a single Azure Functions app.
Components used in this sample:
@@ -163,7 +163,7 @@ class RedisStreamResponseHandler:
has_seen_data = True
# Process entries from the stream
for stream_name, stream_entries in entries:
for _stream_name, stream_entries in entries:
for entry_id, entry_data in stream_entries:
start_id = entry_id.decode() if isinstance(entry_id, bytes) else entry_id
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Chain two runs of a single agent inside a Durable Functions orchestration.
Components used in this sample:
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Fan out concurrent runs across two agents inside a Durable Functions orchestration.
Components used in this sample:
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Route email requests through conditional orchestration with two agents.
Components used in this sample:
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Iterate on generated content with a human-in-the-loop Durable orchestration.
Components used in this sample:
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Example showing how to configure AI agents with different trigger configurations.
@@ -4,10 +4,10 @@ import asyncio
from random import randint
from typing import Annotated
from agent_framework import ChatResponse, tool
from agent_framework import tool
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel, Field
from pydantic import BaseModel
"""
Azure Responses Client Direct Usage Example
@@ -20,42 +20,75 @@ Shows function calling capabilities with custom business logic.
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production; see samples/getting_started/tools/function_tool_with_approval.py and samples/getting_started/tools/function_tool_with_approval_and_threads.py.
@tool(approval_mode="never_require")
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
location: Annotated[str, "The location to get the weather for."],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
class OutputStruct(BaseModel):
@tool(approval_mode="never_require")
def get_time():
"""Get the current time."""
from datetime import datetime
now = datetime.now()
return f"The current date time is {now.strftime('%Y-%m-%d - %H:%M:%S')}."
class WeatherDetail(BaseModel):
"""Structured output for weather information."""
location: str
weather: str
class Weather(BaseModel):
"""Container for multiple outputs."""
date_time: str
weather_details: list[WeatherDetail]
async def main() -> None:
# For authentication, run `az login` command in terminal or replace AzureCliCredential with preferred
# authentication option.
client = AzureOpenAIResponsesClient(credential=AzureCliCredential())
client = AzureOpenAIResponsesClient(credential=AzureCliCredential(), api_version="preview")
message = "What's the weather in Amsterdam and in Paris?"
stream = True
print(f"User: {message}")
response = client.get_response(
message,
options={"response_format": Weather, "tools": [get_weather, get_time]},
stream=stream,
)
if stream:
response = await ChatResponse.from_chat_response_generator(
client.get_response(message, tools=get_weather, options={"response_format": OutputStruct}, stream=True),
output_format_type=OutputStruct,
)
if result := response.try_parse_value(OutputStruct):
print(f"Assistant: {result}")
else:
print(f"Assistant: {response.text}")
response = await response.get_final_response()
else:
response = await client.get_response(message, tools=get_weather, options={"response_format": OutputStruct})
if result := response.try_parse_value(OutputStruct):
print(f"Assistant: {result}")
else:
print(f"Assistant: {response.text}")
response = await response
if result := response.value:
print(f"Assistant: {result.model_dump_json(indent=2)}")
else:
print(f"Assistant: {response.text}")
# Expected output (time will be different):
"""
User: What's the weather in Amsterdam and in Paris?
Assistant: {
"date_time": "2026-02-06 - 13:30:40",
"weather_details": [
{
"location": "Amsterdam",
"weather": "The weather in Amsterdam is cloudy with a high of 21°C."
},
{
"location": "Paris",
"weather": "The weather in Paris is sunny with a high of 27°C."
}
]
}
"""
if __name__ == "__main__":
@@ -4,13 +4,12 @@ import asyncio
import random
import sys
from collections.abc import AsyncIterable, Awaitable, Mapping, Sequence
from typing import Any, ClassVar, Generic, TypedDict
from typing import Any, ClassVar, Generic
from agent_framework import (
BaseChatClient,
ChatMessage,
ChatMiddlewareLayer,
ChatOptions,
ChatResponse,
ChatResponseUpdate,
Content,
@@ -22,9 +21,9 @@ from agent_framework._clients import TOptions_co
from agent_framework.observability import ChatTelemetryLayer
if sys.version_info >= (3, 13):
from typing import TypeVar
pass
else:
from typing_extensions import TypeVar
pass
if sys.version_info >= (3, 12):
from typing import override # type: ignore # pragma: no cover
else:
@@ -38,13 +37,6 @@ This sample demonstrates implementing a custom chat client and optionally compos
middleware, telemetry, and function invocation layers explicitly.
"""
TOptions_co = TypeVar(
"TOptions_co",
bound=TypedDict, # type: ignore[valid-type]
default="ChatOptions",
covariant=True,
)
class EchoingChatClient(BaseChatClient[TOptions_co], Generic[TOptions_co]):
"""A custom chat client that echoes messages back with modifications.
@@ -32,14 +32,14 @@ async def main() -> None:
message = "What's the weather in Amsterdam and in Paris?"
stream = True
print(f"User: {message}")
print("Assistant: ", end="")
response = client.get_response(message, stream=stream, options={"tools": get_weather})
if stream:
print("Assistant: ", end="")
response = client.get_response(message, stream=True, tools=get_weather)
# TODO: review names of the methods, could be related to things like HTTP clients?
response.with_update_hook(lambda chunk: print(chunk.text, end=""))
response.with_transform_hook(lambda chunk: print(chunk.text, end=""))
await response.get_final_response()
else:
response = await client.get_response(message, tools=get_weather)
response = await response
print(f"Assistant: {response}")
@@ -211,7 +211,7 @@ class PreferencesContextProvider(ContextProvider):
msgs = [request_messages] if isinstance(request_messages, ChatMessage) else list(request_messages)
for msg in msgs:
content = msg.content if hasattr(msg, "content") else ""
content = msg.text if hasattr(msg, "text") else ""
# Very simple extraction - in production, use LLM-based extraction
if isinstance(content, str) and "prefer" in content.lower() and ":" in content:
parts = content.split(":")
@@ -63,15 +63,16 @@ async def main() -> None:
thread_id = "azure_test_thread"
# Factory for creating Azure Redis chat message store
chat_message_store_factory = lambda: RedisChatMessageStore(
credential_provider=credential_provider,
host=redis_host,
port=10000,
ssl=True,
thread_id=thread_id,
key_prefix="chat_messages",
max_messages=100,
)
def chat_message_store_factory():
return RedisChatMessageStore(
credential_provider=credential_provider,
host=redis_host,
port=10000,
ssl=True,
thread_id=thread_id,
key_prefix="chat_messages",
max_messages=100,
)
# Create chat client
client = OpenAIChatClient()
@@ -52,12 +52,14 @@ async def main() -> None:
vector_distance_metric="cosine",
thread_id=thread_id,
)
chat_message_store_factory = lambda: RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id=thread_id,
key_prefix="chat_messages",
max_messages=100,
)
def chat_message_store_factory():
return RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id=thread_id,
key_prefix="chat_messages",
max_messages=100,
)
# Create chat client for the agent
client = OpenAIChatClient(model_id=os.getenv("OPENAI_CHAT_MODEL_ID"), api_key=os.getenv("OPENAI_API_KEY"))
@@ -14,8 +14,8 @@ from agent_framework import (
ChatResponseUpdate,
Content,
FunctionInvocationContext,
Role,
TextContent,
MiddlewareTermination,
ResponseStream,
chat_middleware,
function_middleware,
tool,
@@ -44,7 +44,7 @@ async def security_filter_middleware(
# Check only the last message (most recent user input)
last_message = context.messages[-1] if context.messages else None
if last_message and last_message.role == Role.USER and last_message.text:
if last_message and last_message.role == "user" and last_message.text:
message_lower = last_message.text.lower()
for term in blocked_terms:
if term in message_lower:
@@ -56,26 +56,25 @@ async def security_filter_middleware(
if context.stream:
# Streaming mode: return async generator
async def blocked_stream() -> AsyncIterable[ChatResponseUpdate]:
async def blocked_stream(msg: str = error_message) -> AsyncIterable[ChatResponseUpdate]:
yield ChatResponseUpdate(
contents=[Content.from_text(text=error_message)],
role=Role.ASSISTANT,
contents=[Content.from_text(text=msg)],
role="assistant",
)
context.result = blocked_stream()
context.result = ResponseStream(blocked_stream(), finalizer=ChatResponse.from_updates)
else:
# Non-streaming mode: return complete response
context.result = ChatResponse(
messages=[
ChatMessage(
role=Role.ASSISTANT,
role="assistant",
text=error_message,
)
]
)
context.terminate = True
return
raise MiddlewareTermination
await next(context)
@@ -93,8 +92,7 @@ async def atlantis_location_filter_middleware(
"Blocked! Hold up right there!! Tell the user that "
"'Atlantis is a special place, we must never ask about the weather there!!'"
)
context.terminate = True
return
raise MiddlewareTermination
await next(context)
@@ -1,11 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
"""Client application for interacting with a Durable Task hosted agent.
This client connects to the Durable Task Scheduler and sends requests to
registered agents, demonstrating how to interact with agents from external processes.
Prerequisites:
Prerequisites:
- The worker must be running with the agent registered
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running
"""
@@ -29,12 +31,12 @@ def get_client(
log_handler: logging.Handler | None = None
) -> DurableAIAgentClient:
"""Create a configured DurableAIAgentClient.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for client logging
Returns:
Configured DurableAIAgentClient instance
"""
@@ -59,7 +61,7 @@ def get_client(
def run_client(agent_client: DurableAIAgentClient) -> None:
"""Run client interactions with the Joker agent.
Args:
agent_client: The DurableAIAgentClient instance
"""
@@ -1,11 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
"""Single Agent Sample - Durable Task Integration (Combined Worker + Client)
This sample demonstrates running both the worker and client in a single process.
The worker is started first to register the agent, then client operations are
performed against the running worker.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running (e.g., using Docker)
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Worker process for hosting a single Azure OpenAI-powered agent using Durable Task.
This worker registers agents as durable entities and continuously listens for requests.
@@ -1,12 +1,14 @@
# Copyright (c) Microsoft. All rights reserved.
"""Client application for interacting with multiple hosted agents.
This client connects to the Durable Task Scheduler and interacts with two different
agents (WeatherAgent and MathAgent), demonstrating how to work with multiple agents
each with their own specialized capabilities and tools.
Prerequisites:
Prerequisites:
- The worker must be running with both agents registered
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running
"""
@@ -30,12 +32,12 @@ def get_client(
log_handler: logging.Handler | None = None
) -> DurableAIAgentClient:
"""Create a configured DurableAIAgentClient.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for client logging
Returns:
Configured DurableAIAgentClient instance
"""
@@ -60,7 +62,7 @@ def get_client(
def run_client(agent_client: DurableAIAgentClient) -> None:
"""Run client interactions with both WeatherAgent and MathAgent.
Args:
agent_client: The DurableAIAgentClient instance
"""
@@ -1,11 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
"""Multi-Agent Sample - Durable Task Integration (Combined Worker + Client)
This sample demonstrates running both the worker and client in a single process
for multiple agents with different tools. The worker registers two agents
(WeatherAgent and MathAgent), each with their own specialized capabilities.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running (e.g., using Docker)
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Worker process for hosting multiple agents with different tools using Durable Task.
This worker registers two agents - a weather assistant and a math assistant - each
@@ -7,7 +7,7 @@ This client demonstrates:
2. Streaming the response from Redis in real-time
3. Handling reconnection and cursor-based resumption
Prerequisites:
Prerequisites:
- The worker must be running with the TravelPlanner agent registered
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
- Redis must be running
@@ -59,12 +59,12 @@ def get_client(
log_handler: logging.Handler | None = None
) -> DurableAIAgentClient:
"""Create a configured DurableAIAgentClient.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional log handler for client logging
Returns:
Configured DurableAIAgentClient instance
"""
@@ -89,7 +89,7 @@ def get_client(
async def stream_from_redis(thread_id: str, cursor: str | None = None) -> None:
"""Stream agent responses from Redis.
Args:
thread_id: The conversation/thread ID to stream from
cursor: Optional cursor to resume from. If None, starts from beginning.
@@ -132,7 +132,7 @@ async def stream_from_redis(thread_id: str, cursor: str | None = None) -> None:
def run_client(agent_client: DurableAIAgentClient) -> None:
"""Run client interactions with the TravelPlanner agent.
Args:
agent_client: The DurableAIAgentClient instance
"""
@@ -8,8 +8,8 @@ with reliable Redis-based streaming for agent responses.
The worker is started first to register the TravelPlanner agent with Redis streaming
callback, then client operations are performed against the running worker.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running (e.g., using Docker)
- Redis must be running (e.g., docker run -d --name redis -p 6379:6379 redis:latest)
@@ -5,8 +5,8 @@
This worker registers the TravelPlanner agent with the Durable Task Scheduler
and uses RedisStreamCallback to persist streaming responses to Redis for reliable delivery.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Start a Durable Task Scheduler (e.g., using Docker)
- Start Redis (e.g., docker run -d --name redis -p 6379:6379 redis:latest)
@@ -145,7 +145,7 @@ class RedisStreamCallback(AgentResponseCallbackProtocol):
def create_travel_agent() -> "ChatAgent":
"""Create the TravelPlanner agent using Azure OpenAI.
Returns:
ChatAgent: The configured TravelPlanner agent with travel planning tools.
"""
@@ -174,12 +174,12 @@ def get_worker(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerWorker:
"""Create a configured DurableTaskSchedulerWorker.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional log handler for worker logging
Returns:
Configured DurableTaskSchedulerWorker instance
"""
@@ -202,10 +202,10 @@ def get_worker(
def setup_worker(worker: DurableTaskSchedulerWorker) -> DurableAIAgentWorker:
"""Set up the worker with the TravelPlanner agent and Redis streaming callback.
Args:
worker: The DurableTaskSchedulerWorker instance
Returns:
DurableAIAgentWorker with agent and callback registered
"""
@@ -1,12 +1,14 @@
# Copyright (c) Microsoft. All rights reserved.
"""Client application for starting a single agent chaining orchestration.
This client connects to the Durable Task Scheduler and starts an orchestration
that runs a writer agent twice sequentially on the same thread, demonstrating
how conversation context is maintained across multiple agent invocations.
Prerequisites:
Prerequisites:
- The worker must be running with the writer agent and orchestration registered
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running
"""
@@ -30,12 +32,12 @@ def get_client(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerClient:
"""Create a configured DurableTaskSchedulerClient.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for client logging
Returns:
Configured DurableTaskSchedulerClient instance
"""
@@ -58,7 +60,7 @@ def get_client(
def run_client(client: DurableTaskSchedulerClient) -> None:
"""Run client to start and monitor the orchestration.
Args:
client: The DurableTaskSchedulerClient instance
"""
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Single Agent Orchestration Chaining Sample - Durable Task Integration
This sample demonstrates chaining two invocations of the same agent inside a Durable Task
@@ -10,8 +12,8 @@ Components used:
- DurableTaskSchedulerClient and orchestration for sequential agent invocations
- Thread management to maintain conversation context across invocations
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running (e.g., using Docker emulator)
@@ -1,11 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
"""Worker process for hosting a single agent with chaining orchestration using Durable Task.
This worker registers a writer agent and an orchestration function that demonstrates
chaining behavior by running the agent twice sequentially on the same thread,
preserving conversation context between invocations.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Start a Durable Task Scheduler (e.g., using Docker)
"""
@@ -31,10 +33,10 @@ WRITER_AGENT_NAME = "WriterAgent"
def create_writer_agent() -> "ChatAgent":
"""Create the Writer agent using Azure OpenAI.
This agent refines short pieces of text, enhancing initial sentences
and polishing improved versions further.
Returns:
ChatAgent: The configured Writer agent
"""
@@ -51,7 +53,7 @@ def create_writer_agent() -> "ChatAgent":
def get_orchestration():
"""Get the orchestration function for this sample.
Returns:
The orchestration function to register with the worker
"""
@@ -62,18 +64,18 @@ def single_agent_chaining_orchestration(
context: OrchestrationContext, _: str
) -> Generator[Task[AgentResponse], AgentResponse, str]:
"""Orchestration that runs the writer agent twice on the same thread.
This demonstrates chaining behavior where the output of the first agent run
becomes part of the input for the second run, all while maintaining the
conversation context through a shared thread.
Args:
context: The orchestration context
_: Input parameter (unused)
Yields:
Task[AgentRunResponse]: Tasks that resolve to AgentRunResponse
Returns:
str: The final refined text from the second agent run
"""
@@ -123,12 +125,12 @@ def get_worker(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerWorker:
"""Create a configured DurableTaskSchedulerWorker.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for worker logging
Returns:
Configured DurableTaskSchedulerWorker instance
"""
@@ -151,10 +153,10 @@ def get_worker(
def setup_worker(worker: DurableTaskSchedulerWorker) -> DurableAIAgentWorker:
"""Set up the worker with agents and orchestrations registered.
Args:
worker: The DurableTaskSchedulerWorker instance
Returns:
DurableAIAgentWorker with agents and orchestrations registered
"""
@@ -1,12 +1,14 @@
# Copyright (c) Microsoft. All rights reserved.
"""Client application for starting a multi-agent concurrent orchestration.
This client connects to the Durable Task Scheduler and starts an orchestration
that runs two agents (physicist and chemist) concurrently, then retrieves and
displays the aggregated results.
Prerequisites:
Prerequisites:
- The worker must be running with both agents and orchestration registered
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running
"""
@@ -30,12 +32,12 @@ def get_client(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerClient:
"""Create a configured DurableTaskSchedulerClient.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for client logging
Returns:
Configured DurableTaskSchedulerClient instance
"""
@@ -58,7 +60,7 @@ def get_client(
def run_client(client: DurableTaskSchedulerClient, prompt: str = "What is temperature?") -> None:
"""Run client to start and monitor the orchestration.
Args:
client: The DurableTaskSchedulerClient instance
prompt: The prompt to send to both agents
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Multi-Agent Orchestration Sample - Durable Task Integration (Combined Worker + Client)
This sample demonstrates running both the worker and client in a single process for
@@ -7,8 +9,8 @@ concurrent multi-agent orchestration. The worker registers two domain-specific a
The orchestration uses OrchestrationAgentExecutor to execute agents concurrently
and aggregate their responses.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running (e.g., using Docker)
@@ -1,11 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
"""Worker process for hosting multiple agents with orchestration using Durable Task.
This worker registers two domain-specific agents (physicist and chemist) and an orchestration
function that runs them concurrently. The orchestration uses OrchestrationAgentExecutor
function that runs them concurrently. The orchestration uses OrchestrationAgentExecutor
to execute agents in parallel and aggregate their responses.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Start a Durable Task Scheduler (e.g., using Docker)
"""
@@ -33,7 +35,7 @@ CHEMIST_AGENT_NAME = "ChemistAgent"
def create_physicist_agent() -> "ChatAgent":
"""Create the Physicist agent using Azure OpenAI.
Returns:
ChatAgent: The configured Physicist agent
"""
@@ -45,7 +47,7 @@ def create_physicist_agent() -> "ChatAgent":
def create_chemist_agent() -> "ChatAgent":
"""Create the Chemist agent using Azure OpenAI.
Returns:
ChatAgent: The configured Chemist agent
"""
@@ -57,14 +59,14 @@ def create_chemist_agent() -> "ChatAgent":
def multi_agent_concurrent_orchestration(context: OrchestrationContext, prompt: str) -> Generator[Task[Any], Any, dict[str, str]]:
"""Orchestration that runs both agents in parallel and aggregates results.
Uses DurableAIAgentOrchestrationContext to wrap the orchestration context and
access agents via the OrchestrationAgentExecutor.
Args:
context: The orchestration context
prompt: The prompt to send to both agents
Returns:
dict: Dictionary with 'physicist' and 'chemist' response texts
"""
@@ -115,12 +117,12 @@ def get_worker(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerWorker:
"""Create a configured DurableTaskSchedulerWorker.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for worker logging
Returns:
Configured DurableTaskSchedulerWorker instance
"""
@@ -143,10 +145,10 @@ def get_worker(
def setup_worker(worker: DurableTaskSchedulerWorker) -> DurableAIAgentWorker:
"""Set up the worker with agents and orchestrations registered.
Args:
worker: The DurableTaskSchedulerWorker instance
Returns:
DurableAIAgentWorker with agents and orchestrations registered
"""
@@ -1,11 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
"""Client application for starting a spam detection orchestration.
This client connects to the Durable Task Scheduler and starts an orchestration
that uses conditional logic to either handle spam emails or draft professional responses.
Prerequisites:
Prerequisites:
- The worker must be running with both agents, orchestration, and activities registered
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running
"""
@@ -28,12 +30,12 @@ def get_client(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerClient:
"""Create a configured DurableTaskSchedulerClient.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for client logging
Returns:
Configured DurableTaskSchedulerClient instance
"""
@@ -60,7 +62,7 @@ def run_client(
email_content: str = "Hello! I wanted to reach out about our upcoming project meeting."
) -> None:
"""Run client to start and monitor the spam detection orchestration.
Args:
client: The DurableTaskSchedulerClient instance
email_id: The email ID
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Multi-Agent Orchestration with Conditionals Sample - Durable Task Integration
This sample demonstrates conditional orchestration logic with two agents:
@@ -7,8 +9,8 @@ This sample demonstrates conditional orchestration logic with two agents:
The orchestration branches based on spam detection results, calling different
activity functions to handle spam or send legitimate email responses.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running (e.g., using Docker)
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Worker process for hosting spam detection and email assistant agents with conditional orchestration.
This worker registers two domain-specific agents (spam detector and email assistant) and an
@@ -51,7 +53,7 @@ class EmailPayload(BaseModel):
def create_spam_agent() -> "ChatAgent":
"""Create the Spam Detection agent using Azure OpenAI.
Returns:
ChatAgent: The configured Spam Detection agent
"""
@@ -63,7 +65,7 @@ def create_spam_agent() -> "ChatAgent":
def create_email_agent() -> "ChatAgent":
"""Create the Email Assistant agent using Azure OpenAI.
Returns:
ChatAgent: The configured Email Assistant agent
"""
@@ -75,11 +77,11 @@ def create_email_agent() -> "ChatAgent":
def handle_spam_email(context: ActivityContext, reason: str) -> str:
"""Activity function to handle spam emails.
Args:
context: The activity context
reason: The reason why the email was marked as spam
Returns:
str: Confirmation message
"""
@@ -89,11 +91,11 @@ def handle_spam_email(context: ActivityContext, reason: str) -> str:
def send_email(context: ActivityContext, message: str) -> str:
"""Activity function to send emails.
Args:
context: The activity context
message: The email message to send
Returns:
str: Confirmation message
"""
@@ -103,17 +105,17 @@ def send_email(context: ActivityContext, message: str) -> str:
def spam_detection_orchestration(context: OrchestrationContext, payload_raw: Any) -> Generator[Task[Any], Any, str]:
"""Orchestration that detects spam and conditionally drafts email responses.
This orchestration:
1. Validates the input payload
2. Runs the spam detection agent
3. If spam: calls handle_spam_email activity
4. If legitimate: runs email assistant agent and calls send_email activity
Args:
context: The orchestration context
payload_raw: The input payload dictionary
Returns:
str: Result message from activity functions
"""
@@ -198,12 +200,12 @@ def get_worker(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerWorker:
"""Create a configured DurableTaskSchedulerWorker.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for worker logging
Returns:
Configured DurableTaskSchedulerWorker instance
"""
@@ -226,10 +228,10 @@ def get_worker(
def setup_worker(worker: DurableTaskSchedulerWorker) -> DurableAIAgentWorker:
"""Set up the worker with agents, orchestrations, and activities registered.
Args:
worker: The DurableTaskSchedulerWorker instance
Returns:
DurableAIAgentWorker with agents, orchestrations, and activities registered
"""
@@ -1,11 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
"""Client application for starting a human-in-the-loop content generation orchestration.
This client connects to the Durable Task Scheduler and demonstrates the HITL pattern
by starting an orchestration, sending approval/rejection events, and monitoring progress.
Prerequisites:
Prerequisites:
- The worker must be running with the agent, orchestration, and activities registered
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running
"""
@@ -34,12 +36,12 @@ def get_client(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerClient:
"""Create a configured DurableTaskSchedulerClient.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for client logging
Returns:
Configured DurableTaskSchedulerClient instance
"""
@@ -64,7 +66,7 @@ def _log_completion_result(
metadata: OrchestrationState | None,
) -> None:
"""Log the orchestration completion result.
Args:
metadata: The orchestration metadata
"""
@@ -94,7 +96,7 @@ def _wait_and_log_completion(
timeout: int = 60
) -> None:
"""Wait for orchestration completion and log the result.
Args:
client: The DurableTaskSchedulerClient instance
instance_id: The orchestration instance ID
@@ -116,7 +118,7 @@ def send_approval(
feedback: str = ""
) -> None:
"""Send approval or rejection event to the orchestration.
Args:
client: The DurableTaskSchedulerClient instance
instance_id: The orchestration instance ID
@@ -148,14 +150,14 @@ def wait_for_notification(
timeout_seconds: int = 10
) -> bool:
"""Wait for the orchestration to reach a notification point.
Polls the orchestration status until it appears to be waiting for approval.
Args:
client: The DurableTaskSchedulerClient instance
instance_id: The orchestration instance ID
timeout_seconds: Maximum time to wait
Returns:
True if notification detected, False if timeout
"""
@@ -202,7 +204,7 @@ def wait_for_notification(
def run_interactive_client(client: DurableTaskSchedulerClient) -> None:
"""Run an interactive client that prompts for user input and handles approval workflow.
Args:
client: The DurableTaskSchedulerClient instance
"""
@@ -1,3 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
"""Human-in-the-Loop Orchestration Sample - Durable Task Integration
This sample demonstrates the HITL pattern with a WriterAgent that generates content
@@ -7,8 +9,8 @@ and waits for human approval. The orchestration handles:
- Iterative refinement based on feedback
- Activity functions for notifications and publishing
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Durable Task Scheduler must be running (e.g., using Docker)
@@ -1,11 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
"""Worker process for hosting a writer agent with human-in-the-loop orchestration.
This worker registers a WriterAgent and an orchestration function that implements
a human-in-the-loop review workflow. The orchestration pauses for external events
(human approval/rejection) with timeout handling, and iterates based on feedback.
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
Prerequisites:
- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
(plus AZURE_OPENAI_API_KEY or Azure CLI authentication)
- Start a Durable Task Scheduler (e.g., using Docker)
"""
@@ -54,7 +56,7 @@ class HumanApproval(BaseModel):
def create_writer_agent() -> "ChatAgent":
"""Create the Writer agent using Azure OpenAI.
Returns:
ChatAgent: The configured Writer agent
"""
@@ -73,7 +75,7 @@ def create_writer_agent() -> "ChatAgent":
def notify_user_for_approval(context: ActivityContext, content: dict[str, str]) -> str:
"""Activity function to notify user for approval.
Args:
context: The activity context
content: The generated content dictionary
@@ -88,7 +90,7 @@ def notify_user_for_approval(context: ActivityContext, content: dict[str, str])
def publish_content(context: ActivityContext, content: dict[str, str]) -> str:
"""Activity function to publish approved content.
Args:
context: The activity context
content: The generated content dictionary
@@ -105,7 +107,7 @@ def content_generation_hitl_orchestration(
payload_raw: Any
) -> Generator[Task[Any], Any, dict[str, str]]:
"""Human-in-the-loop orchestration for content generation with approval workflow.
This orchestration:
1. Generates initial content using WriterAgent
2. Loops up to max_review_attempts times:
@@ -115,14 +117,14 @@ def content_generation_hitl_orchestration(
d. If rejected: incorporates feedback and regenerates
e. If timeout: raises TimeoutError
3. Raises RuntimeError if max attempts exhausted
Args:
context: The orchestration context
payload_raw: The input payload
Returns:
dict: Result with published content
Raises:
ValueError: If input is invalid or agent returns no content
TimeoutError: If human approval times out
@@ -285,12 +287,12 @@ def get_worker(
log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerWorker:
"""Create a configured DurableTaskSchedulerWorker.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for worker logging
Returns:
Configured DurableTaskSchedulerWorker instance
"""
@@ -313,10 +315,10 @@ def get_worker(
def setup_worker(worker: DurableTaskSchedulerWorker) -> DurableAIAgentWorker:
"""Set up the worker with agents, orchestrations, and activities registered.
Args:
worker: The DurableTaskSchedulerWorker instance
Returns:
DurableAIAgentWorker with agents, orchestrations, and activities registered
"""
@@ -26,7 +26,7 @@ Self-Reflection LLM Runner
Reflexion: language agents with verbal reinforcement learning.
Noah Shinn, Federico Cassano, Ashwin Gopinath, Karthik Narasimhan, and Shunyu Yao. 2023.
In Proceedings of the 37th International Conference on Neural Information Processing Systems (NIPS '23). Curran Associates Inc., Red Hook, NY, USA, Article 377, 86348652.
https://arxiv.org/abs/2303.11366
https://arxiv.org/abs/2303.11366
This module implements a self-reflection loop for LLM responses using groundedness evaluation.
It loads prompts from a JSONL file, runs them through an LLM with self-reflection,
@@ -123,8 +123,7 @@ def run_eval(
print(f"Eval run failed. Run ID: {run.id}, Status: {run.status}, Error: {getattr(run, 'error', 'Unknown error')}")
continue
if run.status == "completed":
output_items = list(client.evals.runs.output_items.list(run_id=run.id, eval_id=eval_object.id))
return output_items
return list(client.evals.runs.output_items.list(run_id=run.id, eval_id=eval_object.id))
time.sleep(5)
print("Eval result retrieval timeout.")
@@ -142,14 +141,14 @@ async def execute_query_with_self_reflection(
) -> dict[str, Any]:
"""
Execute a query with self-reflection loop.
Args:
agent: ChatAgent instance to use for generating responses
full_user_query: Complete prompt including system prompt, user request, and context
context: Context document for groundedness evaluation
evaluator: Groundedness evaluator function
max_self_reflections: Maximum number of self-reflection iterations
Returns:
Dictionary containing:
- best_response: The best response achieved
@@ -10,6 +10,7 @@ from agent_framework import (
ChatMessage,
ChatMiddleware,
ChatResponse,
MiddlewareTermination,
chat_middleware,
tool,
)
@@ -127,8 +128,7 @@ async def security_and_override_middleware(
)
# Set terminate flag to stop execution
context.terminate = True
return
raise MiddlewareTermination
# Continue to next middleware or AI execution
await next(context)
@@ -10,6 +10,7 @@ from agent_framework import (
AgentMiddleware,
AgentResponse,
ChatMessage,
MiddlewareTermination,
tool,
)
from agent_framework.azure import AzureAIAgentClient
@@ -72,8 +73,7 @@ class PreTerminationMiddleware(AgentMiddleware):
)
# Set terminate flag to prevent further processing
context.terminate = True
break
raise MiddlewareTermination
await next(context)
@@ -98,7 +98,7 @@ class PostTerminationMiddleware(AgentMiddleware):
f"[PostTerminationMiddleware] Maximum responses ({self.max_responses}) reached. "
"Terminating further processing."
)
context.terminate = True
raise MiddlewareTermination
# Allow the agent to process normally
await next(context)
@@ -15,7 +15,6 @@ from agent_framework import (
ChatResponse,
ChatResponseUpdate,
ResponseStream,
Role,
tool,
)
from agent_framework.openai import OpenAIResponsesClient
@@ -76,12 +75,12 @@ async def weather_override_middleware(context: ChatContext, next: Callable[[Chat
index["value"] += 1
return update
context.result.with_update_hook(_update_hook)
context.result.with_transform_hook(_update_hook)
else:
# For non-streaming: just replace with a new message
current_text = context.result.text or ""
current_text = context.result.text or "" # type: ignore
custom_message = f"Weather Advisory: [0] {''.join(chunks)} Original message was: {current_text}"
context.result = ChatResponse(messages=[ChatMessage(role=Role.ASSISTANT, text=custom_message)])
context.result = ChatResponse(messages=[ChatMessage(role="assistant", text=custom_message)])
async def validate_weather_middleware(context: ChatContext, next: Callable[[ChatContext], Awaitable[None]]) -> None:
@@ -96,12 +95,12 @@ async def validate_weather_middleware(context: ChatContext, next: Callable[[Chat
if context.stream and isinstance(context.result, ResponseStream):
def _append_validation_note(response: ChatResponse) -> ChatResponse:
response.messages.append(ChatMessage(role=Role.ASSISTANT, text=validation_note))
response.messages.append(ChatMessage(role="assistant", text=validation_note))
return response
context.result.with_finalizer(_append_validation_note)
context.result.with_result_hook(_append_validation_note)
elif isinstance(context.result, ChatResponse):
context.result.messages.append(ChatMessage(role=Role.ASSISTANT, text=validation_note))
context.result.messages.append(ChatMessage(role="assistant", text=validation_note))
async def agent_cleanup_middleware(context: AgentContext, next: Callable[[AgentContext], Awaitable[None]]) -> None:
@@ -154,7 +153,7 @@ async def agent_cleanup_middleware(context: AgentContext, next: Callable[[AgentC
if not found_validation:
raise RuntimeError("Expected validation note not found in agent response.")
cleaned_messages.append(ChatMessage(role=Role.ASSISTANT, text=" Agent: OK"))
cleaned_messages.append(ChatMessage(role="assistant", text=" Agent: OK"))
response.messages = cleaned_messages
return response
@@ -172,8 +171,8 @@ async def agent_cleanup_middleware(context: AgentContext, next: Callable[[AgentC
content.text = text
return update
context.result.with_update_hook(_clean_update)
context.result.with_finalizer(_sanitize)
context.result.with_transform_hook(_clean_update)
context.result.with_result_hook(_sanitize)
elif isinstance(context.result, AgentResponse):
context.result = _sanitize(context.result)
@@ -192,6 +191,19 @@ async def main() -> None:
tools=get_weather,
middleware=[agent_cleanup_middleware],
)
# Streaming example
print("\n--- Streaming Example ---")
query = "What's the weather like in Portland?"
print(f"User: {query}")
print("Agent: ", end="", flush=True)
response = agent.run(query, stream=True)
# add the hooks to print what you want to see
response.with_transform_hook(lambda chunk: print(chunk.text, end="", flush=True)).with_result_hook(
lambda final: print(f"\nFinal streamed response: {final.text}", flush=True)
)
# consume the stream to trigger the hooks
await response.get_final_response()
# Non-streaming example
print("\n--- Non-streaming Example ---")
query = "What's the weather like in Seattle?"
@@ -199,18 +211,6 @@ async def main() -> None:
result = await agent.run(query)
print(f"Agent: {result}")
# Streaming example
print("\n--- Streaming Example ---")
query = "What's the weather like in Portland?"
print(f"User: {query}")
print("Agent: ", end="", flush=True)
response = agent.run(query, stream=True)
async for chunk in response:
if chunk.text:
print(chunk.text, end="", flush=True)
print("\n")
print(f"Final Result: {(await response.get_final_response()).text}")
if __name__ == "__main__":
asyncio.run(main())
@@ -6,7 +6,6 @@ from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
)
from agent_framework.observability import configure_otel_providers, get_tracer
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any, Never
from typing import Any
from agent_framework import (
ChatAgent,
@@ -14,6 +14,7 @@ from agent_framework import (
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import ConcurrentBuilder
from azure.identity import AzureCliCredential
from typing_extensions import Never
"""
Sample: Concurrent Orchestration with participant factories and Custom Aggregator
@@ -150,11 +150,10 @@ def _handle_events(events: list[WorkflowEvent]) -> list[WorkflowEvent[HandoffAge
speaker = message.author_name or message.role
print(f"- {speaker}: {message.text or [content.type for content in message.contents]}")
print("===================================")
elif event.type == "request_info":
elif event.type == "request_info" and isinstance(event.data, HandoffAgentUserRequest):
# Request info event: Workflow is requesting user input
if isinstance(event.data, HandoffAgentUserRequest):
_print_handoff_agent_user_request(event.data.agent_response)
requests.append(cast(WorkflowEvent[HandoffAgentUserRequest], event))
_print_handoff_agent_user_request(event.data.agent_response)
requests.append(cast(WorkflowEvent[HandoffAgentUserRequest], event))
return requests
@@ -25,10 +25,9 @@ def add(
async def main():
client = OpenAIResponsesClient()
if client.function_invocation_configuration is not None:
client.function_invocation_configuration.include_detailed_errors = True
client.function_invocation_configuration.max_iterations = 40
print(f"Function invocation configured as: \n{client.function_invocation_configuration.to_json(indent=2)}")
client.function_invocation_configuration["include_detailed_errors"] = True
client.function_invocation_configuration["max_iterations"] = 40
print(f"Function invocation configured as: \n{client.function_invocation_configuration}")
agent = client.as_agent(name="ToolAgent", instructions="Use the provided tools.", tools=add)
@@ -1,5 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import FunctionTool
from agent_framework.openai import OpenAIResponsesClient
@@ -70,6 +72,5 @@ Result: {
if __name__ == "__main__":
import asyncio
asyncio.run(main())
@@ -3,7 +3,7 @@
import asyncio
from typing import Annotated
from agent_framework import FunctionCallContent, FunctionResultContent, tool
from agent_framework import tool
from agent_framework.openai import OpenAIResponsesClient
"""
@@ -21,7 +21,6 @@ def greet(name: Annotated[str, "Name to greet"]) -> str:
return f"Hello, {name}!"
@tool(approval_mode="never_require")
# we trick the AI into calling this function with 0 as denominator to trigger the exception
@tool(approval_mode="never_require")
def safe_divide(
@@ -62,11 +61,11 @@ async def main():
if msg.text:
print(f"{idx + 1} {msg.author_name or msg.role}: {msg.text} ")
for content in msg.contents:
if isinstance(content, FunctionCallContent):
if content.type == "function_call":
print(
f"{idx + 1} {msg.author_name}: calling function: {content.name} with arguments: {content.arguments}"
)
if isinstance(content, FunctionResultContent):
if content.type == "function_result":
print(f"{idx + 1} {msg.role}: {content.result if content.result else content.exception}")
@@ -3,7 +3,7 @@
import asyncio
from typing import Annotated
from agent_framework import FunctionCallContent, FunctionResultContent, tool
from agent_framework import tool
from agent_framework.openai import OpenAIResponsesClient
"""
@@ -55,11 +55,11 @@ async def main():
if msg.text:
print(f"{idx + 1} {msg.author_name or msg.role}: {msg.text} ")
for content in msg.contents:
if isinstance(content, FunctionCallContent):
if content.type == "function_call":
print(
f"{idx + 1} {msg.author_name}: calling function: {content.name} with arguments: {content.arguments}"
)
if isinstance(content, FunctionResultContent):
if content.type == "function_result":
print(f"{idx + 1} {msg.role}: {content.result if content.result else content.exception}")
@@ -3,7 +3,7 @@
import asyncio
from typing import Annotated
from agent_framework import FunctionCallContent, FunctionResultContent, tool
from agent_framework import tool
from agent_framework.openai import OpenAIResponsesClient
"""
@@ -44,11 +44,11 @@ async def main():
if msg.text:
print(f"{idx + 1} {msg.author_name or msg.role}: {msg.text} ")
for content in msg.contents:
if isinstance(content, FunctionCallContent):
if content.type == "function_call":
print(
f"{idx + 1} {msg.author_name}: calling function: {content.name} with arguments: {content.arguments}"
)
if isinstance(content, FunctionResultContent):
if content.type == "function_result":
print(f"{idx + 1} {msg.role}: {content.result if content.result else content.exception}")
@@ -14,7 +14,7 @@ The second reverses the text and yields the workflow output. Events are printed
Purpose:
Show how to declare executors with the @executor decorator, connect them with WorkflowBuilder,
pass intermediate values using ctx.send_message, and yield final output using ctx.yield_output().
Demonstrate how streaming exposes executor_invoked events (type='executor_invoked') and
Demonstrate how streaming exposes executor_invoked events (type='executor_invoked') and
executor_completed events (type='executor_completed') for observability.
Prerequisites:
@@ -28,7 +28,7 @@ Pipeline layout:
writer_agent -> Coordinator -> writer_agent -> Coordinator -> final_editor_agent -> Coordinator -> output
The writer agent drafts marketing copy. A custom executor emits a request_info event (type='request_info') so a
human can comment, then relays the human guidance back into the conversation before the final editor agent
human can comment, then relays the human guidance back into the conversation before the final editor agent
produces the polished output.
Demonstrates:
@@ -43,7 +43,9 @@ Prerequisites:
# 1. Define tools for different agents
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production; see samples/getting_started/tools/function_tool_with_approval.py and samples/getting_started/tools/function_tool_with_approval_and_threads.py.
# NOTE: approval_mode="never_require" is for sample brevity.
# Use "always_require" in production; see samples/getting_started/tools/function_tool_with_approval.py
# and samples/getting_started/tools/function_tool_with_approval_and_threads.py.
@tool(approval_mode="never_require")
def run_tests(test_suite: Annotated[str, "Name of the test suite to run"]) -> str:
"""Run automated tests for the application."""