Files
agent-framework/python/samples/autogen-migration/orchestrations/03_swarm.py
Eduard van Valkenburg 8091d052d8 Python: refresh dev dependencies and validate runtime bounds (#6238)
Updates third-party dev dependencies across the Python workspace and
validates that all runtime dependency bounds still hold at both ends.

Dev dependency bumps (root, lab, declarative, durabletask):
- uv 0.11.6 -> 0.11.17, ruff 0.15.8 -> 0.15.15,
  pytest-asyncio 1.3.0 -> 1.4.0, mcp 1.27.0 -> 1.27.2,
  azure-monitor-opentelemetry 1.8.7 -> 1.8.8,
  poethepoet 0.42.1 -> 0.46.0, prek 0.3.9 -> 0.4.3,
  types-python-dateutil and types-PyYaml stub bumps.
- Transitive Dependabot items swept via lock: idna 3.11 -> 3.17,
  pip 26.0.1 -> 26.1.2.

Deliberately excluded:
- opentelemetry-sdk stays 1.40.0: azure-monitor-opentelemetry (incl.
  1.8.8) hard-pins opentelemetry-sdk==1.40.
- mypy stays 1.20.0 and pyright stays 1.1.408: the 2.1.0 / 1.1.409
  bumps introduce new diagnostics that fail type checking and need
  dedicated PRs.
- rich kept as a range: agentlightning (lab[lightning]) forces
  rich==13.9.4.

Code/formatting changes driven by the ruff upgrade:
- devui lifespan now uses try/finally so shutdown cleanup always runs
  (ruff RUF075).
- Removed unused TYPE_CHECKING imports in core and foundry flagged by
  ruff 0.15.15.
- Reapplied ruff 0.15.15 formatting to the files it changed.

Validation: validate-dependency-bounds-test "*" passes (31/31 lower +
31/31 upper); typing 62/62; lint 31/31; devui tests pass.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-01 17:53:56 +00:00

252 lines
9.4 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import Agent, AgentResponseUpdate, WorkflowEvent
from dotenv import load_dotenv
"""AutoGen Swarm pattern vs Agent Framework HandoffBuilder.
Demonstrates agent handoff coordination where agents can transfer control
to other specialized agents based on the task requirements.
"""
# Load environment variables from .env file
load_dotenv()
async def run_autogen() -> None:
"""AutoGen's Swarm pattern with human-in-the-loop handoffs."""
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
client = OpenAIChatCompletionClient(model="gpt-4.1-mini")
# Create triage agent that routes to specialists
triage_agent = AssistantAgent(
name="triage",
model_client=client,
system_message=(
"You are a triage agent. Analyze the user's request and hand off to the appropriate specialist.\n"
"If you need information from the user, first send your message, then handoff to user.\n"
"Use TERMINATE when the issue is fully resolved."
),
handoffs=["billing_agent", "technical_support", "user"],
model_client_stream=True,
)
# Create billing specialist
billing_agent = AssistantAgent(
name="billing_agent",
model_client=client,
system_message=(
"You are a billing specialist. Help with payment and billing questions.\n"
"If you need information from the user, first send your message, then handoff to user.\n"
"When the issue is resolved, handoff to triage to finalize."
),
handoffs=["triage", "user"],
model_client_stream=True,
)
# Create technical support specialist
tech_support = AssistantAgent(
name="technical_support",
model_client=client,
system_message=(
"You are technical support. Help with technical issues.\n"
"If you need information from the user, first send your message, then handoff to user.\n"
"When the issue is resolved, handoff to triage to finalize."
),
handoffs=["triage", "user"],
model_client_stream=True,
)
# Create swarm team with human-in-the-loop termination
termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
team = Swarm(
participants=[triage_agent, billing_agent, tech_support],
termination_condition=termination,
)
# Scripted user responses for demonstration
scripted_responses = [
"I was charged twice for my subscription",
"Yes, the charge of $49.99 appears twice on my credit card statement.",
"Thank you for your help!",
]
response_index = 0
# Run with human-in-the-loop pattern
print("[AutoGen] Swarm handoff conversation:")
task_result = await Console(team.run_stream(task=scripted_responses[response_index]))
last_message = task_result.messages[-1]
response_index += 1
# Continue conversation when agents handoff to user
while (
isinstance(last_message, HandoffMessage)
and last_message.target == "user"
and response_index < len(scripted_responses)
):
user_message = scripted_responses[response_index]
task_result = await Console(
team.run_stream(task=HandoffMessage(source="user", target=last_message.source, content=user_message))
)
last_message = task_result.messages[-1]
response_index += 1
async def run_agent_framework() -> None:
"""Agent Framework's HandoffBuilder for agent coordination."""
from agent_framework import (
WorkflowRunState,
)
from agent_framework.openai import OpenAIChatClient
from agent_framework.orchestrations import HandoffAgentUserRequest, HandoffBuilder
client = OpenAIChatClient(model="gpt-4.1-mini")
# Create triage agent
triage_agent = Agent(
client=client,
name="triage",
instructions=(
"You are a triage agent. Analyze the user's request and route to the appropriate specialist:\n"
"- For billing issues: call handoff_to_billing_agent\n"
"- For technical issues: call handoff_to_technical_support"
),
description="Routes requests to appropriate specialists",
require_per_service_call_history_persistence=True,
)
# Create billing specialist
billing_agent = Agent(
client=client,
name="billing_agent",
instructions="You are a billing specialist. Help with payment and billing questions. Provide clear assistance.",
description="Handles billing and payment questions",
require_per_service_call_history_persistence=True,
)
# Create technical support specialist
tech_support = Agent(
client=client,
name="technical_support",
instructions="You are technical support. Help with technical issues. Provide clear assistance.",
description="Handles technical support questions",
require_per_service_call_history_persistence=True,
)
# Create handoff workflow - simpler configuration
# After specialists respond, control returns to user (via triage as coordinator)
workflow = (
HandoffBuilder(
name="support_handoff",
participants=[triage_agent, billing_agent, tech_support],
termination_condition=lambda conv: sum(1 for msg in conv if msg.role == "user") > 3,
)
.with_start_agent(triage_agent)
.add_handoff(triage_agent, [billing_agent, tech_support])
.build()
)
# Scripted user responses
scripted_responses = [
"I was charged twice for my subscription",
"Yes, the charge of $49.99 appears twice on my credit card statement.",
"Thank you for your help!",
]
# Run with initial message
print("[Agent Framework] Handoff conversation:")
print("---------- user ----------")
print(scripted_responses[0])
current_executor = None
stream_line_open = False
pending_requests: list[WorkflowEvent] = []
async for event in workflow.run(scripted_responses[0], stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
# Print executor name header when switching to a new agent
if current_executor != event.executor_id:
if stream_line_open:
print()
stream_line_open = False
print(f"---------- {event.executor_id} ----------")
current_executor = event.executor_id
stream_line_open = True
if event.data:
print(event.data.text, end="", flush=True)
elif event.type == "request_info":
if isinstance(event.data, HandoffAgentUserRequest):
pending_requests.append(event)
elif event.type == "status":
if event.state in {WorkflowRunState.IDLE_WITH_PENDING_REQUESTS} and stream_line_open:
print()
stream_line_open = False
# Process scripted responses
response_index = 1
while pending_requests and response_index < len(scripted_responses):
user_response = scripted_responses[response_index]
print("---------- user ----------")
print(user_response)
responses: dict[str, Any] = {
req.request_id: HandoffAgentUserRequest.create_response(user_response) for req in pending_requests
} # type: ignore
pending_requests = []
current_executor = None
stream_line_open = False
async for event in workflow.run(stream=True, responses=responses):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
# Print executor name header when switching to a new agent
if current_executor != event.executor_id:
if stream_line_open:
print()
stream_line_open = False
print(f"---------- {event.executor_id} ----------")
current_executor = event.executor_id
stream_line_open = True
if event.data:
print(event.data.text, end="", flush=True)
elif event.type == "request_info":
if isinstance(event.data, HandoffAgentUserRequest):
pending_requests.append(event)
elif event.type == "status":
if (
event.state in {WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, WorkflowRunState.IDLE}
and stream_line_open
):
print()
stream_line_open = False
response_index += 1
if stream_line_open:
print()
print() # Final newline after conversation
async def main() -> None:
print("=" * 60)
print("Swarm / Handoff Pattern Comparison")
print("=" * 60)
print("AutoGen: Swarm with handoffs")
print("Agent Framework: HandoffBuilder\n")
await run_autogen()
print()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())