mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
315 lines
14 KiB
Python
315 lines
14 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
import asyncio
|
|
import os
|
|
from typing import Annotated, cast
|
|
|
|
from agent_framework import (
|
|
Agent,
|
|
AgentResponse,
|
|
Message,
|
|
WorkflowEvent,
|
|
WorkflowRunState,
|
|
tool,
|
|
)
|
|
from agent_framework.foundry import FoundryChatClient
|
|
from agent_framework.orchestrations import HandoffAgentUserRequest, HandoffBuilder
|
|
from azure.identity import AzureCliCredential
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables from .env file
|
|
load_dotenv()
|
|
|
|
"""Sample: Simple handoff workflow.
|
|
|
|
A handoff workflow defines a pattern that assembles agents in a mesh topology, allowing
|
|
them to transfer control to each other based on the conversation context.
|
|
|
|
Prerequisites:
|
|
- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
|
|
- FOUNDRY_MODEL must be set to your Azure OpenAI model deployment name.
|
|
- Authentication via azure-identity. Use AzureCliCredential and run `az login` before executing the sample.
|
|
|
|
Key Concepts:
|
|
- Auto-registered handoff tools: HandoffBuilder automatically creates handoff tools
|
|
for each participant, allowing the coordinator to transfer control to specialists
|
|
- Termination condition: Controls when the workflow stops requesting user input
|
|
- Request/response cycle: Workflow requests input, user responds, cycle continues
|
|
"""
|
|
|
|
|
|
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
|
|
# See:
|
|
# samples/02-agents/tools/function_tool_with_approval.py
|
|
# samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
|
|
@tool(approval_mode="never_require")
|
|
def process_refund(order_number: Annotated[str, "Order number to process refund for"]) -> str:
|
|
"""Simulated function to process a refund for a given order number."""
|
|
return f"Refund processed successfully for order {order_number}."
|
|
|
|
|
|
@tool(approval_mode="never_require")
|
|
def check_order_status(order_number: Annotated[str, "Order number to check status for"]) -> str:
|
|
"""Simulated function to check the status of a given order number."""
|
|
return f"Order {order_number} is currently being processed and will ship in 2 business days."
|
|
|
|
|
|
@tool(approval_mode="never_require")
|
|
def process_return(order_number: Annotated[str, "Order number to process return for"]) -> str:
|
|
"""Simulated function to process a return for a given order number."""
|
|
return f"Return initiated successfully for order {order_number}. You will receive return instructions via email."
|
|
|
|
|
|
def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent]:
|
|
"""Create and configure the triage and specialist agents.
|
|
|
|
Args:
|
|
client: The FoundryChatClient to use for creating agents.
|
|
|
|
Returns:
|
|
Tuple of (triage_agent, refund_agent, order_agent, return_agent)
|
|
"""
|
|
# Triage agent: Acts as the frontline dispatcher
|
|
triage_agent = Agent(
|
|
client=client,
|
|
instructions=(
|
|
"You are frontline support triage. Route customer issues to the appropriate specialist agents "
|
|
"based on the problem described."
|
|
),
|
|
name="triage_agent",
|
|
require_per_service_call_history_persistence=True,
|
|
)
|
|
|
|
# Refund specialist: Handles refund requests
|
|
refund_agent = Agent(
|
|
client=client,
|
|
instructions="You process refund requests.",
|
|
name="refund_agent",
|
|
# In a real application, an agent can have multiple tools; here we keep it simple
|
|
tools=[process_refund],
|
|
require_per_service_call_history_persistence=True,
|
|
)
|
|
|
|
# Order/shipping specialist: Resolves delivery issues
|
|
order_agent = Agent(
|
|
client=client,
|
|
instructions="You handle order and shipping inquiries.",
|
|
name="order_agent",
|
|
# In a real application, an agent can have multiple tools; here we keep it simple
|
|
tools=[check_order_status],
|
|
require_per_service_call_history_persistence=True,
|
|
)
|
|
|
|
# Return specialist: Handles return requests
|
|
return_agent = Agent(
|
|
client=client,
|
|
instructions="You manage product return requests.",
|
|
name="return_agent",
|
|
# In a real application, an agent can have multiple tools; here we keep it simple
|
|
tools=[process_return],
|
|
require_per_service_call_history_persistence=True,
|
|
)
|
|
|
|
return triage_agent, refund_agent, order_agent, return_agent
|
|
|
|
|
|
def _handle_events(events: list[WorkflowEvent]) -> list[WorkflowEvent[HandoffAgentUserRequest]]:
|
|
"""Process workflow events and extract any pending user input requests.
|
|
|
|
This function inspects each event type and:
|
|
- Prints workflow status changes (IDLE, IDLE_WITH_PENDING_REQUESTS, etc.)
|
|
- Displays final conversation snapshots when workflow completes
|
|
- Prints user input request prompts
|
|
- Collects all request_info events for response handling
|
|
|
|
Args:
|
|
events: List of WorkflowEvent to process
|
|
|
|
Returns:
|
|
List of WorkflowEvent[HandoffAgentUserRequest] representing pending user input requests
|
|
"""
|
|
requests: list[WorkflowEvent[HandoffAgentUserRequest]] = []
|
|
|
|
for event in events:
|
|
if event.type == "handoff_sent":
|
|
# handoff_sent event: Indicates a handoff has been initiated
|
|
print(f"\n[Handoff from {event.data.source} to {event.data.target} initiated.]")
|
|
elif event.type == "status" and event.state in {
|
|
WorkflowRunState.IDLE,
|
|
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
|
|
}:
|
|
# Status event: Indicates workflow state changes
|
|
print(f"\n[Workflow Status] {event.state}")
|
|
elif event.type == "output":
|
|
# Output event: Contains contents generated by the workflow
|
|
data = event.data
|
|
if isinstance(data, AgentResponse):
|
|
for message in data.messages:
|
|
if not message.text:
|
|
# Skip messages without text (e.g., tool calls)
|
|
continue
|
|
speaker = message.author_name or message.role
|
|
print(f"- {speaker}: {message.text}")
|
|
elif event.type == "output":
|
|
# The output of the handoff workflow is a collection of chat messages from all participants
|
|
conversation = cast(list[Message], event.data)
|
|
if isinstance(conversation, list):
|
|
print("\n=== Final Conversation Snapshot ===")
|
|
for message in conversation:
|
|
speaker = message.author_name or message.role
|
|
print(f"- {speaker}: {message.text or [content.type for content in message.contents]}")
|
|
print("===================================")
|
|
elif event.type == "request_info" and isinstance(event.data, HandoffAgentUserRequest):
|
|
_print_handoff_agent_user_request(event.data.agent_response)
|
|
requests.append(cast(WorkflowEvent[HandoffAgentUserRequest], event))
|
|
|
|
return requests
|
|
|
|
|
|
def _print_handoff_agent_user_request(response: AgentResponse) -> None:
|
|
"""Display the agent's response messages when requesting user input.
|
|
|
|
This will happen when an agent generates a response that doesn't trigger
|
|
a handoff, i.e., the agent is asking the user for more information.
|
|
|
|
Args:
|
|
response: The AgentResponse from the agent requesting user input
|
|
"""
|
|
if not response.messages:
|
|
raise RuntimeError("Cannot print agent responses: response has no messages.")
|
|
|
|
print("\n[Agent is requesting your input...]")
|
|
|
|
# Print agent responses
|
|
for message in response.messages:
|
|
if not message.text:
|
|
# Skip messages without text (e.g., tool calls)
|
|
continue
|
|
speaker = message.author_name or message.role
|
|
print(f"- {speaker}: {message.text}")
|
|
|
|
|
|
async def main() -> None:
|
|
"""Main entry point for the handoff workflow demo.
|
|
|
|
This function demonstrates:
|
|
1. Creating triage and specialist agents
|
|
2. Building a handoff workflow with custom termination condition
|
|
3. Running the workflow with scripted user responses
|
|
4. Processing events and handling user input requests
|
|
|
|
The workflow uses scripted responses instead of interactive input to make
|
|
the demo reproducible and testable. In a production application, you would
|
|
replace the scripted_responses with actual user input collection.
|
|
"""
|
|
# Initialize the Azure OpenAI Responses client
|
|
client = FoundryChatClient(
|
|
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
|
|
model=os.environ["FOUNDRY_MODEL"],
|
|
credential=AzureCliCredential(),
|
|
)
|
|
|
|
# Create all agents: triage + specialists
|
|
triage, refund, order, support = create_agents(client)
|
|
|
|
# Build the handoff workflow
|
|
# - participants: All agents that can participate in the workflow
|
|
# - with_start_agent: The triage agent is designated as the start agent, which means
|
|
# it receives all user input first and orchestrates handoffs to specialists
|
|
# - termination_condition: Custom logic to stop the request/response loop.
|
|
# Without this, the default behavior continues requesting user input until max_turns
|
|
# is reached. Here we use a custom condition that checks if the conversation has ended
|
|
# naturally (when one of the agents says something like "you're welcome").
|
|
workflow = (
|
|
HandoffBuilder(
|
|
name="customer_support_handoff",
|
|
participants=[triage, refund, order, support],
|
|
# Custom termination: Check if one of the agents has provided a closing message.
|
|
# This looks for the last message containing "welcome", which indicates the
|
|
# conversation has concluded naturally.
|
|
termination_condition=lambda conversation: (
|
|
len(conversation) > 0 and "welcome" in conversation[-1].text.lower()
|
|
),
|
|
)
|
|
.with_start_agent(triage)
|
|
.build()
|
|
)
|
|
|
|
# Scripted user responses for reproducible demo
|
|
# In a console application, replace this with:
|
|
# user_input = input("Your response: ")
|
|
# or integrate with a UI/chat interface
|
|
scripted_responses = [
|
|
"My order 1234 arrived damaged and the packaging was destroyed. I'd like to return it.",
|
|
"Please also process a refund for order 1234.",
|
|
"Thanks for resolving this.",
|
|
]
|
|
|
|
# Start the workflow with the initial user message
|
|
# run(..., stream=True) returns an async iterator of WorkflowEvent
|
|
print("[Starting workflow with initial user message...]\n")
|
|
initial_message = "Hello, I need assistance with my recent purchase."
|
|
print(f"- User: {initial_message}")
|
|
workflow_result = workflow.run(initial_message, stream=True)
|
|
pending_requests = _handle_events([event async for event in workflow_result])
|
|
|
|
# Process the request/response cycle
|
|
# The workflow will continue requesting input until:
|
|
# 1. The termination condition is met, OR
|
|
# 2. We run out of scripted responses
|
|
while pending_requests:
|
|
if not scripted_responses:
|
|
# No more scripted responses; terminate the workflow
|
|
responses = {req.request_id: HandoffAgentUserRequest.terminate() for req in pending_requests}
|
|
else:
|
|
# Get the next scripted response
|
|
user_response = scripted_responses.pop(0)
|
|
print(f"\n- User: {user_response}")
|
|
|
|
# Send response(s) to all pending requests
|
|
# In this demo, there's typically one request per cycle, but the API supports multiple
|
|
responses = {
|
|
req.request_id: HandoffAgentUserRequest.create_response(user_response) for req in pending_requests
|
|
}
|
|
|
|
# Send responses and get new events
|
|
# We use run(responses=...) to get events from the workflow, allowing us to
|
|
# display agent responses and handle new requests as they arrive
|
|
events = await workflow.run(responses=responses)
|
|
pending_requests = _handle_events(events)
|
|
|
|
"""
|
|
Sample Output:
|
|
|
|
[Starting workflow with initial user message...]
|
|
|
|
- User: Hello, I need assistance with my recent purchase.
|
|
- triage_agent: Could you please provide more details about the issue you're experiencing with your recent purchase? This will help me route you to the appropriate specialist.
|
|
|
|
[Workflow Status] IDLE_WITH_PENDING_REQUESTS
|
|
|
|
- User: My order 1234 arrived damaged and the packaging was destroyed. I'd like to return it.
|
|
- triage_agent: I've directed your request to our return agent, who will assist you with returning the damaged order. Thank you for your patience!
|
|
- return_agent: The return for your order 1234 has been successfully initiated. You will receive return instructions via email shortly. If you have any other questions or need further assistance, feel free to ask!
|
|
|
|
[Workflow Status] IDLE_WITH_PENDING_REQUESTS
|
|
|
|
- User: Thanks for resolving this.
|
|
|
|
=== Final Conversation Snapshot ===
|
|
- user: Hello, I need assistance with my recent purchase.
|
|
- triage_agent: Could you please provide more details about the issue you're experiencing with your recent purchase? This will help me route you to the appropriate specialist.
|
|
- user: My order 1234 arrived damaged and the packaging was destroyed. I'd like to return it.
|
|
- triage_agent: I've directed your request to our return agent, who will assist you with returning the damaged order. Thank you for your patience!
|
|
- return_agent: The return for your order 1234 has been successfully initiated. You will receive return instructions via email shortly. If you have any other questions or need further assistance, feel free to ask!
|
|
- user: Thanks for resolving this.
|
|
- triage_agent: You're welcome! If you have any more questions or need assistance in the future, feel free to reach out. Have a great day!
|
|
===================================
|
|
|
|
[Workflow Status] IDLE
|
|
""" # noqa: E501
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|