mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
310 lines
11 KiB
Python
310 lines
11 KiB
Python
# /// script
|
|
# requires-python = ">=3.10"
|
|
# dependencies = [
|
|
# "semantic-kernel",
|
|
# ]
|
|
# ///
|
|
# Run with any PEP 723 compatible runner, e.g.:
|
|
# uv run samples/semantic-kernel-migration/orchestrations/handoff.py
|
|
|
|
# Copyright (c) Microsoft. All rights reserved.
|
|
"""Side-by-side handoff orchestrations for Semantic Kernel and Agent Framework."""
|
|
|
|
import asyncio
|
|
from collections.abc import AsyncIterable, Callable, Iterator, Sequence
|
|
|
|
from agent_framework import (
|
|
Agent,
|
|
Message,
|
|
WorkflowEvent,
|
|
)
|
|
from agent_framework.openai import OpenAIChatCompletionClient
|
|
from agent_framework.orchestrations import HandoffAgentUserRequest, HandoffBuilder
|
|
from azure.identity import AzureCliCredential
|
|
from dotenv import load_dotenv
|
|
from semantic_kernel.agents import Agent as SKAgent
|
|
from semantic_kernel.agents import ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs
|
|
from semantic_kernel.agents.runtime import InProcessRuntime
|
|
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
|
|
from semantic_kernel.contents import (
|
|
AuthorRole,
|
|
ChatMessageContent,
|
|
FunctionCallContent,
|
|
FunctionResultContent,
|
|
StreamingChatMessageContent,
|
|
)
|
|
from semantic_kernel.functions import kernel_function
|
|
|
|
# Load environment variables from .env file
|
|
load_dotenv()
|
|
|
|
CUSTOMER_PROMPT = "I need help with order 12345. I want a replacement and need to know when it will arrive."
|
|
SCRIPTED_RESPONSES = [
|
|
"The item arrived damaged. I'd like a replacement shipped to the same address.",
|
|
"Great! Can you confirm the shipping cost won't be charged again?",
|
|
"Thanks for confirming!",
|
|
]
|
|
|
|
|
|
######################################################################
|
|
# Semantic Kernel orchestration path
|
|
######################################################################
|
|
|
|
|
|
class OrderStatusPlugin:
|
|
@kernel_function
|
|
def check_order_status(self, order_id: str) -> str:
|
|
return f"Order {order_id} is shipped and will arrive in 2-3 days."
|
|
|
|
|
|
class OrderRefundPlugin:
|
|
@kernel_function
|
|
def process_refund(self, order_id: str, reason: str) -> str:
|
|
return f"Refund for order {order_id} has been processed successfully (reason: {reason})."
|
|
|
|
|
|
class OrderReturnPlugin:
|
|
@kernel_function
|
|
def process_return(self, order_id: str, reason: str) -> str:
|
|
return f"Return for order {order_id} has been processed successfully (reason: {reason})."
|
|
|
|
|
|
def build_semantic_kernel_agents() -> tuple[list[SKAgent], OrchestrationHandoffs]:
|
|
credential = AzureCliCredential()
|
|
|
|
triage = ChatCompletionAgent(
|
|
name="TriageAgent",
|
|
description="Customer support triage specialist.",
|
|
instructions="Greet the customer, collect intent, and hand off to the right specialist.",
|
|
service=AzureChatCompletion(credential=credential),
|
|
)
|
|
refund = ChatCompletionAgent(
|
|
name="RefundAgent",
|
|
description="Handles refunds.",
|
|
instructions="Process refund requests.",
|
|
service=AzureChatCompletion(credential=credential),
|
|
plugins=[OrderRefundPlugin()],
|
|
)
|
|
order_status = ChatCompletionAgent(
|
|
name="OrderStatusAgent",
|
|
description="Looks up order status.",
|
|
instructions="Provide shipping timelines and tracking information.",
|
|
service=AzureChatCompletion(credential=credential),
|
|
plugins=[OrderStatusPlugin()],
|
|
)
|
|
order_return = ChatCompletionAgent(
|
|
name="OrderReturnAgent",
|
|
description="Handles returns.",
|
|
instructions="Coordinate order returns.",
|
|
service=AzureChatCompletion(credential=credential),
|
|
plugins=[OrderReturnPlugin()],
|
|
)
|
|
|
|
handoffs = (
|
|
OrchestrationHandoffs()
|
|
.add_many(
|
|
source_agent=triage.name,
|
|
target_agents={
|
|
refund.name: "Route refund-related requests here.",
|
|
order_status.name: "Route shipping questions here.",
|
|
order_return.name: "Route return-related requests here.",
|
|
},
|
|
)
|
|
.add(refund.name, triage.name, "Return to triage for non-refund issues.")
|
|
.add(order_status.name, triage.name, "Return to triage for non-status issues.")
|
|
.add(order_return.name, triage.name, "Return to triage for non-return issues.")
|
|
)
|
|
|
|
return [triage, refund, order_status, order_return], handoffs
|
|
|
|
|
|
_sk_new_message = True
|
|
|
|
|
|
def _sk_streaming_callback(message: StreamingChatMessageContent, is_final: bool) -> None:
|
|
"""Display SK agent messages as they stream."""
|
|
|
|
global _sk_new_message
|
|
if _sk_new_message:
|
|
print(f"{message.name}: ", end="", flush=True)
|
|
_sk_new_message = False
|
|
|
|
if message.content:
|
|
print(message.content, end="", flush=True)
|
|
|
|
for item in message.items:
|
|
if isinstance(item, FunctionCallContent):
|
|
print(f"[tool call: {item.name}({item.arguments})]", end="", flush=True)
|
|
if isinstance(item, FunctionResultContent):
|
|
print(f"[tool result: {item.result}]", end="", flush=True)
|
|
|
|
if is_final:
|
|
print()
|
|
_sk_new_message = True
|
|
|
|
|
|
def _make_sk_human_responder(script: Iterator[str]) -> Callable[[], ChatMessageContent]:
|
|
def _responder() -> ChatMessageContent:
|
|
try:
|
|
user_text = next(script)
|
|
except StopIteration:
|
|
user_text = "Thanks, that's all."
|
|
print(f"[User]: {user_text}")
|
|
return ChatMessageContent(role=AuthorRole.USER, content=user_text)
|
|
|
|
return _responder
|
|
|
|
|
|
async def run_semantic_kernel_example(initial_task: str, scripted_responses: Sequence[str]) -> str:
|
|
agents, handoffs = build_semantic_kernel_agents()
|
|
response_iter = iter(scripted_responses)
|
|
|
|
orchestration = HandoffOrchestration(
|
|
members=agents,
|
|
handoffs=handoffs,
|
|
streaming_agent_response_callback=_sk_streaming_callback,
|
|
human_response_function=_make_sk_human_responder(response_iter),
|
|
)
|
|
|
|
runtime = InProcessRuntime()
|
|
runtime.start()
|
|
|
|
try:
|
|
orchestration_result = await orchestration.invoke(task=initial_task, runtime=runtime)
|
|
final_message = await orchestration_result.get(timeout=30)
|
|
if isinstance(final_message, ChatMessageContent):
|
|
return final_message.content or ""
|
|
return str(final_message)
|
|
finally:
|
|
await runtime.stop_when_idle()
|
|
|
|
|
|
######################################################################
|
|
# Agent Framework orchestration path
|
|
######################################################################
|
|
|
|
|
|
def _create_af_agents(client: OpenAIChatCompletionClient):
|
|
triage = Agent(
|
|
client=client,
|
|
name="triage_agent",
|
|
instructions=(
|
|
"You are a customer support triage agent. Route requests:\n"
|
|
"- handoff_to_refund_agent for refunds\n"
|
|
"- handoff_to_order_status_agent for shipping/timeline questions\n"
|
|
"- handoff_to_order_return_agent for returns"
|
|
),
|
|
require_per_service_call_history_persistence=True,
|
|
)
|
|
refund = Agent(
|
|
client=client,
|
|
name="refund_agent",
|
|
instructions=(
|
|
"Handle refunds. Ask for order id and reason. If shipping info is needed, hand off to order_status_agent."
|
|
),
|
|
require_per_service_call_history_persistence=True,
|
|
)
|
|
status = Agent(
|
|
client=client,
|
|
name="order_status_agent",
|
|
instructions=(
|
|
"Provide order status, tracking, and timelines. If billing questions appear, hand off to refund_agent."
|
|
),
|
|
require_per_service_call_history_persistence=True,
|
|
)
|
|
returns = Agent(
|
|
client=client,
|
|
name="order_return_agent",
|
|
instructions=(
|
|
"Coordinate returns, confirm addresses, and summarize next steps. Hand off to triage_agent if unsure."
|
|
),
|
|
require_per_service_call_history_persistence=True,
|
|
)
|
|
return triage, refund, status, returns
|
|
|
|
|
|
async def _drain_events(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]:
|
|
return [event async for event in stream]
|
|
|
|
|
|
def _collect_handoff_requests(events: list[WorkflowEvent]) -> list[WorkflowEvent]:
|
|
requests: list[WorkflowEvent] = []
|
|
for event in events:
|
|
if event.type == "request_info" and isinstance(event.data, HandoffAgentUserRequest):
|
|
requests.append(event)
|
|
return requests
|
|
|
|
|
|
def _extract_final_conversation(events: list[WorkflowEvent]) -> list[Message]:
|
|
for event in events:
|
|
if event.type == "output":
|
|
return event.data
|
|
return []
|
|
|
|
|
|
async def run_agent_framework_example(initial_task: str, scripted_responses: Sequence[str]) -> str:
|
|
client = OpenAIChatCompletionClient(credential=AzureCliCredential())
|
|
triage, refund, status, returns = _create_af_agents(client)
|
|
|
|
workflow = (
|
|
HandoffBuilder(
|
|
name="sk_af_handoff_migration",
|
|
participants=[triage, refund, status, returns],
|
|
termination_condition=lambda conv: sum(1 for m in conv if m.role == "user") >= 4,
|
|
)
|
|
.with_start_agent(triage)
|
|
.add_handoff(triage, [refund, status, returns])
|
|
.add_handoff(refund, [status, triage])
|
|
.add_handoff(status, [refund, triage])
|
|
.add_handoff(returns, [triage])
|
|
.build()
|
|
)
|
|
|
|
events = await _drain_events(workflow.run(initial_task, stream=True))
|
|
pending = _collect_handoff_requests(events)
|
|
scripted_iter = iter(scripted_responses)
|
|
|
|
final_events = events
|
|
while pending:
|
|
try:
|
|
user_reply = next(scripted_iter)
|
|
except StopIteration:
|
|
user_reply = "Thanks, that's all."
|
|
responses = {request.request_id: [Message(role="user", contents=[user_reply])] for request in pending}
|
|
final_events = await _drain_events(workflow.run(stream=True, responses=responses))
|
|
pending = _collect_handoff_requests(final_events)
|
|
|
|
conversation = _extract_final_conversation(final_events)
|
|
if not conversation:
|
|
return ""
|
|
|
|
# Render final transcript succinctly.
|
|
lines: list[str] = []
|
|
for message in conversation:
|
|
text = message.text or ""
|
|
if not text.strip():
|
|
continue
|
|
speaker = message.author_name or message.role
|
|
lines.append(f"{speaker}: {text}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
######################################################################
|
|
# Console entry point
|
|
######################################################################
|
|
|
|
|
|
async def main() -> None:
|
|
print("===== Agent Framework Handoff =====")
|
|
af_transcript = await run_agent_framework_example(CUSTOMER_PROMPT, SCRIPTED_RESPONSES)
|
|
print(af_transcript or "No output produced.")
|
|
print()
|
|
|
|
print("===== Semantic Kernel Handoff =====")
|
|
sk_result = await run_semantic_kernel_example(CUSTOMER_PROMPT, SCRIPTED_RESPONSES)
|
|
print(sk_result or "No output produced.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|