Python: Remove duplicate samples (#3899)

* Remove duplicate samples

* Correct paths

* Update readme

* Update readme

* Fix ruff

---------

Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com>
This commit is contained in:
Tao Chen
2026-02-12 15:46:41 -08:00
committed by GitHub
Unverified
parent 1441fd903c
commit e064f943ae
25 changed files with 173 additions and 2772 deletions
+1 -1
View File
@@ -233,7 +233,7 @@ if __name__ == "__main__":
asyncio.run(main())
```
For more advanced orchestration patterns including Sequential, Concurrent, Group Chat, Handoff, and Magentic orchestrations, see the [orchestration samples](samples/02-agents/orchestrations).
For more advanced orchestration patterns including Sequential, Concurrent, Group Chat, Handoff, and Magentic orchestrations, see the [orchestration samples](samples/03-workflows/orchestrations).
## More Examples & Samples
+1 -1
View File
@@ -213,7 +213,7 @@ if __name__ == "__main__":
asyncio.run(main())
```
**Note**: Sequential, Concurrent, Group Chat, Handoff, and Magentic orchestrations are available. See examples in [orchestration samples](../../samples/02-agents/orchestrations).
**Note**: Sequential, Concurrent, Group Chat, Handoff, and Magentic orchestrations are available. See examples in [orchestration samples](../../samples/03-workflows/orchestrations).
## More Examples & Samples
File diff suppressed because one or more lines are too long
@@ -30,7 +30,7 @@ export const SAMPLE_ENTITIES: SampleEntity[] = [
description:
"Weather agent using Azure AI Agent (Foundry) with Azure CLI authentication",
type: "agent",
url: "https://raw.githubusercontent.com/microsoft/agent-framework/main/python/samples/getting_started/devui/foundry_agent/agent.py",
url: "https://raw.githubusercontent.com/microsoft/agent-framework/main/python/samples/02-agents/devui/foundry_agent/agent.py",
tags: ["azure-ai", "foundry", "tools"],
author: "Microsoft",
difficulty: "beginner",
@@ -61,7 +61,7 @@ export const SAMPLE_ENTITIES: SampleEntity[] = [
description:
"Weather agent using Azure OpenAI with API key authentication",
type: "agent",
url: "https://raw.githubusercontent.com/microsoft/agent-framework/main/python/samples/getting_started/devui/weather_agent_azure/agent.py",
url: "https://raw.githubusercontent.com/microsoft/agent-framework/main/python/samples/02-agents/devui/weather_agent_azure/agent.py",
tags: ["azure", "openai", "tools"],
author: "Microsoft",
difficulty: "beginner",
@@ -99,7 +99,7 @@ export const SAMPLE_ENTITIES: SampleEntity[] = [
description:
"5-step workflow demonstrating email spam detection with branching logic",
type: "workflow",
url: "https://raw.githubusercontent.com/microsoft/agent-framework/main/python/samples/getting_started/devui/spam_workflow/workflow.py",
url: "https://raw.githubusercontent.com/microsoft/agent-framework/main/python/samples/02-agents/devui/spam_workflow/workflow.py",
tags: ["workflow", "branching", "multi-step"],
author: "Microsoft",
difficulty: "beginner",
@@ -117,7 +117,7 @@ export const SAMPLE_ENTITIES: SampleEntity[] = [
description:
"Advanced data processing workflow with parallel validation, transformation, and quality assurance stages",
type: "workflow",
url: "https://raw.githubusercontent.com/microsoft/agent-framework/main/python/samples/getting_started/devui/fanout_workflow/workflow.py",
url: "https://raw.githubusercontent.com/microsoft/agent-framework/main/python/samples/02-agents/devui/fanout_workflow/workflow.py",
tags: ["workflow", "fan-out", "fan-in", "parallel"],
author: "Microsoft",
difficulty: "advanced",
@@ -186,7 +186,7 @@ def gaia_scorer(model_answer: str, ground_truth: str) -> bool:
if is_float(ground_truth):
# numeric exact match after normalization
return _normalize_number_str(model_answer) == float(ground_truth)
return abs(_normalize_number_str(model_answer) - float(ground_truth)) < 1e-6
if any(ch in ground_truth for ch in [",", ";"]):
# list with per-element compare (number or string)
gt_elems = _split_string(ground_truth)
@@ -196,7 +196,7 @@ def gaia_scorer(model_answer: str, ground_truth: str) -> bool:
comparisons = []
for ma, gt in zip(ma_elems, gt_elems, strict=False):
if is_float(gt):
comparisons.append(_normalize_number_str(ma) == float(gt))
comparisons.append(abs(_normalize_number_str(ma) - float(gt)) < 1e-6)
else:
comparisons.append(_normalize_str(ma, remove_punct=False) == _normalize_str(gt, remove_punct=False))
return all(comparisons)
@@ -17,7 +17,7 @@ with the following configuration:
"agent-framework": {
"command": "uv",
"args": [
"--directory=<path to project>/agent-framework/python/samples/getting_started/mcp",
"--directory=<path to project>/agent-framework/python/samples/02-agents/mcp",
"run",
"agent_as_mcp_server.py"
],
@@ -1,67 +0,0 @@
# Orchestration Getting Started Samples
## Installation
The orchestrations package is included when you install `agent-framework` (which pulls in all optional packages):
```bash
pip install agent-framework
```
Or install the orchestrations package directly:
```bash
pip install agent-framework-orchestrations
```
Orchestration builders are available via the `agent_framework.orchestrations` submodule:
```python
from agent_framework.orchestrations import (
SequentialBuilder,
ConcurrentBuilder,
HandoffBuilder,
GroupChatBuilder,
MagenticBuilder,
)
```
## Samples Overview
| Sample | File | Concepts |
| ------------------------------------------------- | ------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------- |
| Concurrent Orchestration (Default Aggregator) | [concurrent_agents.py](./concurrent_agents.py) | Fan-out to multiple agents; fan-in with default aggregator returning combined Messages |
| Concurrent Orchestration (Custom Aggregator) | [concurrent_custom_aggregator.py](./concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM |
| Concurrent Orchestration (Custom Agent Executors) | [concurrent_custom_agent_executors.py](./concurrent_custom_agent_executors.py) | Child executors own Agents; concurrent fan-out/fan-in via ConcurrentBuilder |
| Group Chat with Agent Manager | [group_chat_agent_manager.py](./group_chat_agent_manager.py) | Agent-based manager using `with_orchestrator(agent=)` to select next speaker |
| Group Chat Philosophical Debate | [group_chat_philosophical_debate.py](./group_chat_philosophical_debate.py) | Agent manager moderates long-form, multi-round debate across diverse participants |
| Group Chat with Simple Function Selector | [group_chat_simple_selector.py](./group_chat_simple_selector.py) | Group chat with a simple function selector for next speaker |
| Handoff (Simple) | [handoff_simple.py](./handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response |
| Handoff (Autonomous) | [handoff_autonomous.py](./handoff_autonomous.py) | Autonomous mode: specialists iterate independently until invoking a handoff tool using `.with_autonomous_mode()` |
| Handoff with Code Interpreter | [handoff_with_code_interpreter_file.py](./handoff_with_code_interpreter_file.py) | Retrieve file IDs from code interpreter output in handoff workflow |
| Magentic Workflow (Multi-Agent) | [magentic.py](./magentic.py) | Orchestrate multiple agents with Magentic manager and streaming |
| Magentic + Human Plan Review | [magentic_human_plan_review.py](./magentic_human_plan_review.py) | Human reviews/updates the plan before execution |
| Magentic + Checkpoint Resume | [magentic_checkpoint.py](./magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints |
| Sequential Orchestration (Agents) | [sequential_agents.py](./sequential_agents.py) | Chain agents sequentially with shared conversation context |
| Sequential Orchestration (Custom Executor) | [sequential_custom_executors.py](./sequential_custom_executors.py) | Mix agents with a summarizer that appends a compact summary |
## Tips
**Magentic checkpointing tip**: Treat `MagenticBuilder.participants` keys as stable identifiers. When resuming from a checkpoint, the rebuilt workflow must reuse the same participant names; otherwise the checkpoint cannot be applied and the run will fail fast.
**Handoff workflow tip**: Handoff workflows maintain the full conversation history including any `Message.additional_properties` emitted by your agents. This ensures routing metadata remains intact across all agent transitions. For specialist-to-specialist handoffs, use `.add_handoff(source, targets)` to configure which agents can route to which others with a fluent, type-safe API.
**Sequential orchestration note**: Sequential orchestration uses a few small adapter nodes for plumbing:
- `input-conversation` normalizes input to `list[Message]`
- `to-conversation:<participant>` converts agent responses into the shared conversation
- `complete` publishes the final output event (type='output')
These may appear in event streams (executor_invoked/executor_completed). They're analogous to concurrent's dispatcher and aggregator and can be ignored if you only care about agent activity.
## Environment Variables
- **AzureOpenAIChatClient**: Set Azure OpenAI environment variables as documented [here](https://github.com/microsoft/agent-framework/blob/main/python/samples/02-agents/chat_client/README.md#environment-variables).
- **OpenAI** (used in some orchestration samples):
- [OpenAIChatClient env vars](https://github.com/microsoft/agent-framework/blob/main/python/samples/02-agents/providers/openai/README.md)
- [OpenAIResponsesClient env vars](https://github.com/microsoft/agent-framework/blob/main/python/samples/02-agents/providers/openai/README.md)
@@ -1,130 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import Message
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import ConcurrentBuilder
from azure.identity import AzureCliCredential
"""
Sample: Concurrent fan-out/fan-in (agent-only API) with default aggregator
Build a high-level concurrent workflow using ConcurrentBuilder and three domain agents.
The default dispatcher fans out the same user prompt to all agents in parallel.
The default aggregator fans in their results and yields output containing
a list[Message] representing the concatenated conversations from all agents.
Demonstrates:
- Minimal wiring with ConcurrentBuilder(participants=[...]).build()
- Fan-out to multiple agents, fan-in aggregation of final ChatMessages
- Workflow completion when idle with no pending work
Prerequisites:
- Azure OpenAI access configured for AzureOpenAIChatClient (use az login + env vars)
- Familiarity with Workflow events (WorkflowEvent)
"""
async def main() -> None:
# 1) Create three domain agents using AzureOpenAIChatClient
client = AzureOpenAIChatClient(credential=AzureCliCredential())
researcher = client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = client.as_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
# 2) Build a concurrent workflow
# Participants are either Agents (type of SupportsAgentRun) or Executors
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
# 3) Run with a single prompt and pretty-print the final combined messages
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()
if outputs:
print("===== Final Aggregated Conversation (messages) =====")
for output in outputs:
messages: list[Message] | Any = output
for i, msg in enumerate(messages, start=1):
name = msg.author_name if msg.author_name else "user"
print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")
"""
Sample Output:
===== Final Aggregated Conversation (messages) =====
------------------------------------------------------------
01 [user]:
We are launching a new budget-friendly electric bike for urban commuters.
------------------------------------------------------------
02 [researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
- **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon,
Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia.
- **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection,
lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles),
and low-maintenance components.
**Opportunities:**
- **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of
operation, and cost savings vs. public transit/car ownership.
...
------------------------------------------------------------
03 [marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
**Target Messaging:**
*For Young Professionals:*
...
------------------------------------------------------------
04 [legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
- Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained.
**2. Product Safety**
- Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions.
...
""" # noqa: E501
if __name__ == "__main__":
asyncio.run(main())
@@ -1,174 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import (
Agent,
AgentExecutorRequest,
AgentExecutorResponse,
Executor,
Message,
WorkflowContext,
handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import ConcurrentBuilder
from azure.identity import AzureCliCredential
"""
Sample: Concurrent Orchestration with Custom Agent Executors
This sample shows a concurrent fan-out/fan-in pattern using child Executor classes
that each own their Agent. The executors accept AgentExecutorRequest inputs
and emit AgentExecutorResponse outputs, which allows reuse of the high-level
ConcurrentBuilder API and the default aggregator.
Demonstrates:
- Executors that create their Agent in __init__ (via AzureOpenAIChatClient)
- A @handler that converts AgentExecutorRequest -> AgentExecutorResponse
- ConcurrentBuilder(participants=[...]) to build fan-out/fan-in
- Default aggregator returning list[Message] (one user + one assistant per agent)
- Workflow completion when all participants become idle
Prerequisites:
- Azure OpenAI configured for AzureOpenAIChatClient (az login + required env vars)
"""
class ResearcherExec(Executor):
agent: Agent
def __init__(self, client: AzureOpenAIChatClient, id: str = "researcher"):
self.agent = client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class MarketerExec(Executor):
agent: Agent
def __init__(self, client: AzureOpenAIChatClient, id: str = "marketer"):
self.agent = client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class LegalExec(Executor):
agent: Agent
def __init__(self, client: AzureOpenAIChatClient, id: str = "legal"):
self.agent = client.as_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
async def main() -> None:
client = AzureOpenAIChatClient(credential=AzureCliCredential())
researcher = ResearcherExec(client)
marketer = MarketerExec(client)
legal = LegalExec(client)
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()
if outputs:
print("===== Final Aggregated Conversation (messages) =====")
messages: list[Message] | Any = outputs[0] # Get the first (and typically only) output
for i, msg in enumerate(messages, start=1):
name = msg.author_name if msg.author_name else "user"
print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")
"""
Sample Output:
===== Final Aggregated Conversation (messages) =====
------------------------------------------------------------
01 [user]:
We are launching a new budget-friendly electric bike for urban commuters.
------------------------------------------------------------
02 [researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
- **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon,
Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia.
- **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection,
lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles),
and low-maintenance components.
**Opportunities:**
- **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of
operation, and cost savings vs. public transit/car ownership.
...
------------------------------------------------------------
03 [marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
**Target Messaging:**
*For Young Professionals:*
...
------------------------------------------------------------
04 [legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
- Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained.
**2. Product Safety**
- Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions.
...
""" # noqa: E501
if __name__ == "__main__":
asyncio.run(main())
@@ -1,124 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import Message
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import ConcurrentBuilder
from azure.identity import AzureCliCredential
"""
Sample: Concurrent Orchestration with Custom Aggregator
Build a concurrent workflow with ConcurrentBuilder that fans out one prompt to
multiple domain agents and fans in their responses. Override the default
aggregator with a custom async callback that uses AzureOpenAIChatClient.get_response()
to synthesize a concise, consolidated summary from the experts' outputs.
The workflow completes when all participants become idle.
Demonstrates:
- ConcurrentBuilder(participants=[...]).with_aggregator(callback)
- Fan-out to agents and fan-in at an aggregator
- Aggregation implemented via an LLM call (client.get_response)
- Workflow output yielded with the synthesized summary string
Prerequisites:
- Azure OpenAI configured for AzureOpenAIChatClient (az login + required env vars)
"""
async def main() -> None:
client = AzureOpenAIChatClient(credential=AzureCliCredential())
researcher = client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = client.as_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
# Define a custom aggregator callback that uses the chat client to summarize
async def summarize_results(results: list[Any]) -> str:
# Extract one final assistant message per agent
expert_sections: list[str] = []
for r in results:
try:
messages = getattr(r.agent_response, "messages", [])
final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}:\n{final_text}")
except Exception as e:
expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}: (error: {type(e).__name__}: {e})")
# Ask the model to synthesize a concise summary of the experts' outputs
system_msg = Message(
"system",
text=(
"You are a helpful assistant that consolidates multiple domain expert outputs "
"into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
),
)
user_msg = Message("user", text="\n\n".join(expert_sections))
response = await client.get_response([system_msg, user_msg])
# Return the model's final assistant text as the completion result
return response.messages[-1].text if response.messages else ""
# Build with a custom aggregator callback function
# - participants([...]) accepts SupportsAgentRun (agents) or Executor instances.
# Each participant becomes a parallel branch (fan-out) from an internal dispatcher.
# - with_aggregator(...) overrides the default aggregator:
# • Default aggregator -> returns list[Message] (one user + one assistant per agent)
# • Custom callback -> return value becomes workflow output (string here)
# The callback can be sync or async; it receives list[AgentExecutorResponse].
workflow = (
ConcurrentBuilder(participants=[researcher, marketer, legal]).with_aggregator(summarize_results).build()
)
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()
if outputs:
print("===== Final Consolidated Output =====")
print(outputs[0]) # Get the first (and typically only) output
"""
Sample Output:
===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.
Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.
Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: “Charge Ahead—City Commutes Made
Affordable.” Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -1,114 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import cast
from agent_framework import (
Agent,
AgentResponseUpdate,
Message,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import GroupChatBuilder
from azure.identity import AzureCliCredential
"""
Sample: Group Chat with Agent-Based Manager
What it does:
- Demonstrates the new set_manager() API for agent-based coordination
- Manager is a full Agent with access to tools, context, and observability
- Coordinates a researcher and writer agent to solve tasks collaboratively
Prerequisites:
- OpenAI environment variables configured for OpenAIChatClient
"""
ORCHESTRATOR_AGENT_INSTRUCTIONS = """
You coordinate a team conversation to solve the user's task.
Guidelines:
- Start with Researcher to gather information
- Then have Writer synthesize the final answer
- Only finish after both have contributed meaningfully
"""
async def main() -> None:
# Create a chat client using Azure OpenAI and Azure CLI credentials for all agents
client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Orchestrator agent that manages the conversation
# Note: This agent (and the underlying chat client) must support structured outputs.
# The group chat workflow relies on this to parse the orchestrator's decisions.
# `response_format` is set internally by the GroupChat workflow when the agent is invoked.
orchestrator_agent = Agent(
name="Orchestrator",
description="Coordinates multi-agent collaboration by selecting speakers",
instructions=ORCHESTRATOR_AGENT_INSTRUCTIONS,
client=client,
)
# Participant agents
researcher = Agent(
name="Researcher",
description="Collects relevant background information",
instructions="Gather concise facts that help a teammate answer the question.",
client=client,
)
writer = Agent(
name="Writer",
description="Synthesizes polished answers from gathered information",
instructions="Compose clear and structured answers using any notes provided.",
client=client,
)
# Build the group chat workflow
# termination_condition: stop after 4 assistant messages
# (The agent orchestrator will intelligently decide when to end before this limit but just in case)
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = (
GroupChatBuilder(
participants=[researcher, writer],
termination_condition=lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 4,
intermediate_outputs=True,
orchestrator_agent=orchestrator_agent,
)
# Set a hard termination condition: stop after 4 assistant messages
# The agent orchestrator will intelligently decide when to end before this limit but just in case
.with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 4)
.build()
)
task = "What are the key benefits of using async/await in Python? Provide a concise summary."
print("\nStarting Group Chat with Agent-Based Manager...\n")
print(f"TASK: {task}\n")
print("=" * 80)
# Keep track of the last response to format output nicely in streaming mode
last_response_id: str | None = None
async for event in workflow.run(task, stream=True):
if event.type == "output":
data = event.data
if isinstance(data, AgentResponseUpdate):
rid = data.response_id
if rid != last_response_id:
if last_response_id is not None:
print("\n")
print(f"{data.author_name}:", end=" ", flush=True)
last_response_id = rid
print(data.text, end="", flush=True)
elif event.type == "output":
# The output of the group chat workflow is a collection of chat messages from all participants
outputs = cast(list[Message], event.data)
print("\n" + "=" * 80)
print("\nFinal Conversation Transcript:\n")
for message in outputs:
print(f"{message.author_name or message.role}: {message.text}\n")
if __name__ == "__main__":
asyncio.run(main())
@@ -1,364 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import logging
from typing import cast
from agent_framework import (
Agent,
AgentResponseUpdate,
Message,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import GroupChatBuilder
from azure.identity import AzureCliCredential
logging.basicConfig(level=logging.WARNING)
"""
Sample: Philosophical Debate with Agent-Based Manager
What it does:
- Creates a diverse group of agents representing different global perspectives
- Uses an agent-based manager to guide a philosophical discussion
- Demonstrates longer, multi-round discourse with natural conversation flow
- Manager decides when discussion has reached meaningful conclusion
Topic: "What does a good life mean to you personally?"
Participants represent:
- Farmer from Southeast Asia (tradition, sustainability, land connection)
- Software Developer from United States (innovation, technology, work-life balance)
- History Teacher from Eastern Europe (legacy, learning, cultural continuity)
- Activist from South America (social justice, environmental rights)
- Spiritual Leader from Middle East (morality, community service)
- Artist from Africa (creative expression, storytelling)
- Immigrant Entrepreneur from Asia in Canada (tradition + adaptation)
- Doctor from Scandinavia (public health, equity, societal support)
Prerequisites:
- OpenAI environment variables configured for OpenAIChatClient
"""
def _get_chat_client() -> AzureOpenAIChatClient:
return AzureOpenAIChatClient(credential=AzureCliCredential())
async def main() -> None:
# Create debate moderator with structured output for speaker selection
# Note: Participant names and descriptions are automatically injected by the orchestrator
moderator = Agent(
name="Moderator",
description="Guides philosophical discussion by selecting next speaker",
instructions="""
You are a thoughtful moderator guiding a philosophical discussion on the topic handed to you by the user.
Your participants bring diverse global perspectives. Select speakers strategically to:
- Create natural conversation flow and responses to previous points
- Ensure all voices are heard throughout the discussion
- Build on themes and contrasts that emerge
- Allow for respectful challenges and counterpoints
- Guide toward meaningful conclusions
Select speakers who can:
1. Respond directly to points just made
2. Introduce fresh perspectives when needed
3. Bridge or contrast different viewpoints
4. Deepen the philosophical exploration
Finish when:
- Multiple rounds have occurred (at least 6-8 exchanges)
- Key themes have been explored from different angles
- Natural conclusion or synthesis has emerged
- Diminishing returns in new insights
In your final_message, provide a brief synthesis highlighting key themes that emerged.
""",
client=_get_chat_client(),
)
farmer = Agent(
name="Farmer",
description="A rural farmer from Southeast Asia",
instructions="""
You're a farmer from Southeast Asia. Your life is deeply connected to land and family.
You value tradition and sustainability. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use concrete examples from your experience
- Keep responses thoughtful but concise (2-4 sentences)
""",
client=_get_chat_client(),
)
developer = Agent(
name="Developer",
description="An urban software developer from the United States",
instructions="""
You're a software developer from the United States. Your life is fast-paced and technology-driven.
You value innovation, freedom, and work-life balance. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use concrete examples from your experience
- Keep responses thoughtful but concise (2-4 sentences)
""",
client=_get_chat_client(),
)
teacher = Agent(
name="Teacher",
description="A retired history teacher from Eastern Europe",
instructions="""
You're a retired history teacher from Eastern Europe. You bring historical and philosophical
perspectives to discussions. You value legacy, learning, and cultural continuity.
You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use concrete examples from history or your teaching experience
- Keep responses thoughtful but concise (2-4 sentences)
""",
client=_get_chat_client(),
)
activist = Agent(
name="Activist",
description="A young activist from South America",
instructions="""
You're a young activist from South America. You focus on social justice, environmental rights,
and generational change. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use concrete examples from your activism
- Keep responses thoughtful but concise (2-4 sentences)
""",
client=_get_chat_client(),
)
spiritual_leader = Agent(
name="SpiritualLeader",
description="A spiritual leader from the Middle East",
instructions="""
You're a spiritual leader from the Middle East. You provide insights grounded in religion,
morality, and community service. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use examples from spiritual teachings or community work
- Keep responses thoughtful but concise (2-4 sentences)
""",
client=_get_chat_client(),
)
artist = Agent(
name="Artist",
description="An artist from Africa",
instructions="""
You're an artist from Africa. You view life through creative expression, storytelling,
and collective memory. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use examples from your art or cultural traditions
- Keep responses thoughtful but concise (2-4 sentences)
""",
client=_get_chat_client(),
)
immigrant = Agent(
name="Immigrant",
description="An immigrant entrepreneur from Asia living in Canada",
instructions="""
You're an immigrant entrepreneur from Asia living in Canada. You balance tradition with adaptation.
You focus on family success, risk, and opportunity. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use examples from your immigrant and entrepreneurial journey
- Keep responses thoughtful but concise (2-4 sentences)
""",
client=_get_chat_client(),
)
doctor = Agent(
name="Doctor",
description="A doctor from Scandinavia",
instructions="""
You're a doctor from Scandinavia. Your perspective is shaped by public health, equity,
and structured societal support. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use examples from healthcare and societal systems
- Keep responses thoughtful but concise (2-4 sentences)
""",
client=_get_chat_client(),
)
# termination_condition: stop after 10 assistant messages
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = (
GroupChatBuilder(
participants=[farmer, developer, teacher, activist, spiritual_leader, artist, immigrant, doctor],
termination_condition=lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 10,
intermediate_outputs=True,
orchestrator_agent=moderator,
)
.with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 10)
.build()
)
topic = "What does a good life mean to you personally?"
print("\n" + "=" * 80)
print("PHILOSOPHICAL DEBATE: Perspectives on a Good Life")
print("=" * 80)
print(f"\nTopic: {topic}")
print("\nParticipants:")
print(" - Farmer (Southeast Asia)")
print(" - Developer (United States)")
print(" - Teacher (Eastern Europe)")
print(" - Activist (South America)")
print(" - SpiritualLeader (Middle East)")
print(" - Artist (Africa)")
print(" - Immigrant (Asia → Canada)")
print(" - Doctor (Scandinavia)")
print("\n" + "=" * 80)
print("DISCUSSION BEGINS")
print("=" * 80 + "\n")
# Keep track of the last response to format output nicely in streaming mode
last_response_id: str | None = None
async for event in workflow.run(f"Please begin the discussion on: {topic}", stream=True):
if event.type == "output":
data = event.data
if isinstance(data, AgentResponseUpdate):
rid = data.response_id
if rid != last_response_id:
if last_response_id is not None:
print("\n")
print(f"{data.author_name}:", end=" ", flush=True)
last_response_id = rid
print(data.text, end="", flush=True)
elif event.type == "output":
# The output of the group chat workflow is a collection of chat messages from all participants
outputs = cast(list[Message], event.data)
print("\n" + "=" * 80)
print("\nFinal Conversation Transcript:\n")
for message in outputs:
print(f"{message.author_name or message.role}: {message.text}\n")
"""
Sample Output:
================================================================================
PHILOSOPHICAL DEBATE: Perspectives on a Good Life
================================================================================
Topic: What does a good life mean to you personally?
Participants:
- Farmer (Southeast Asia)
- Developer (United States)
- Teacher (Eastern Europe)
- Activist (South America)
- SpiritualLeader (Middle East)
- Artist (Africa)
- Immigrant (Asia → Canada)
- Doctor (Scandinavia)
================================================================================
DISCUSSION BEGINS
================================================================================
[Farmer]
To me, a good life is deeply intertwined with the rhythm of the land and the nurturing of relationships with my
family and community. It means cultivating crops that respect our environment, ensuring sustainability for future
generations, and sharing meals made from our harvests around the dinner table. The joy found in everyday
tasks—planting rice or tending to our livestock—creates a sense of fulfillment that cannot be measured by material
wealth. It's the simple moments, like sharing stories with my children under the stars, that truly define a good
life. What good is progress if it isolates us from those we love and the land that sustains us?
[Developer]
As a software developer in an urban environment, a good life for me hinges on the intersection of innovation,
creativity, and balance. It's about having the freedom to explore new technologies that can solve real-world
problems while ensuring that my work doesn't encroach on my personal life. For instance, I value remote work
flexibility, which allows me to maintain connections with family and friends, similar to how the Farmer values
community. While our lifestyles may differ markedly, both of us seek fulfillment—whether through meaningful work or
rich personal experiences. The challenge is finding harmony between technological progress and preserving the
intimate human connections that truly enrich our lives.
[SpiritualLeader]
From my spiritual perspective, a good life embodies a balance between personal fulfillment and service to others,
rooted in compassion and community. In our teachings, we emphasize that true happiness comes from helping those in
need and fostering strong connections with our families and neighbors. Whether it's the Farmer nurturing the earth
or the Developer creating tools to enhance lives, both contribute to the greater good. The essence of a good life
lies in our intentions and actions—finding ways to serve our communities, spread kindness, and live harmoniously
with those around us. Ultimately, as we align our personal beliefs with our communal responsibilities, we cultivate
a richness that transcends material wealth.
[Activist]
As a young activist in South America, a good life for me is about advocating for social justice and environmental
sustainability. It means living in a society where everyone's rights are respected and where marginalized voices,
particularly those of Indigenous communities, are amplified. I see a good life as one where we work collectively to
dismantle oppressive systems—such as deforestation and inequality—while nurturing our planet. For instance, through
my activism, I've witnessed the transformative power of community organizing, where collective efforts lead to real
change, like resisting destructive mining practices that threaten our rivers and lands. A good life, therefore, is
not just lived for oneself but is deeply tied to the well-being of our communities and the health of our
environment. How can we, regardless of our backgrounds, collaborate to foster these essential changes?
[Teacher]
As a retired history teacher from Eastern Europe, my understanding of a good life is deeply rooted in the lessons
drawn from history and the struggle for freedom and dignity. Historical events, such as the fall of the Iron
Curtain, remind us of the profound importance of liberty and collective resilience. A good life, therefore, is about
cherishing our freedoms and working towards a society where everyone has a voice, much as my students and I
discussed the impacts of totalitarian regimes. Additionally, I believe it involves fostering cultural continuity,
where we honor our heritage while embracing progressive values. We must learn from the past—especially the
consequences of neglecting empathy and solidarity—so that we can cultivate a future that values every individual's
contributions to the rich tapestry of our shared humanity. How can we ensure that the lessons of history inform a
more compassionate and just society moving forward?
[Artist]
As an artist from Africa, I define a good life as one steeped in cultural expression, storytelling, and the
celebration of our collective memories. Art is a powerful medium through which we capture our histories, struggles,
and triumphs, creating a tapestry that connects generations. For instance, in my work, I often draw from folktales
and traditional music, weaving narratives that reflect the human experience, much like how the retired teacher
emphasizes learning from history. A good life involves not only personal fulfillment but also the responsibility to
share our narratives and use our creativity to inspire change, whether addressing social injustices or environmental
issues. It's in this interplay of art and activism that we can transcend individual existence and contribute to a
collective good, fostering empathy and understanding among diverse communities. How can we harness art to bridge
differences and amplify marginalized voices in our pursuit of a good life?
================================================================================
DISCUSSION SUMMARY
================================================================================
As our discussion unfolds, several key themes have gracefully emerged, reflecting the richness of diverse
perspectives on what constitutes a good life. From the rural farmer's integration with the land to the developer's
search for balance between technology and personal connection, each viewpoint validates that fulfillment, at its
core, transcends material wealth. The spiritual leader and the activist highlight the importance of community and
social justice, while the history teacher and the artist remind us of the lessons and narratives that shape our
cultural and personal identities.
Ultimately, the good life seems to revolve around meaningful relationships, honoring our legacies while striving for
progress, and nurturing both our inner selves and external communities. This dialogue demonstrates that despite our
varied backgrounds and experiences, the quest for a good life binds us together, urging cooperation and empathy in
our shared human journey.
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -1,135 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import cast
from agent_framework import (
Agent,
AgentResponseUpdate,
Message,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import GroupChatBuilder, GroupChatState
from azure.identity import AzureCliCredential
"""
Sample: Group Chat with a round-robin speaker selector
What it does:
- Demonstrates the selection_func parameter for GroupChat orchestration
- Uses a pure Python function to control speaker selection based on conversation state
Prerequisites:
- OpenAI environment variables configured for OpenAIChatClient
"""
def round_robin_selector(state: GroupChatState) -> str:
"""A round-robin selector function that picks the next speaker based on the current round index."""
participant_names = list(state.participants.keys())
return participant_names[state.current_round % len(participant_names)]
async def main() -> None:
# Create a chat client using Azure OpenAI and Azure CLI credentials for all agents
client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Participant agents
expert = Agent(
name="PythonExpert",
instructions=(
"You are an expert in Python in a workgroup. "
"Your job is to answer Python related questions and refine your answer "
"based on feedback from all the other participants."
),
client=client,
)
verifier = Agent(
name="AnswerVerifier",
instructions=(
"You are a programming expert in a workgroup. "
f"Your job is to review the answer provided by {expert.name} and point "
"out statements that are technically true but practically dangerous."
"If there is nothing woth pointing out, respond with 'The answer looks good to me.'"
),
client=client,
)
clarifier = Agent(
name="AnswerClarifier",
instructions=(
"You are an accessibility expert in a workgroup. "
f"Your job is to review the answer provided by {expert.name} and point "
"out jargons or complex terms that may be difficult for a beginner to understand."
"If there is nothing worth pointing out, respond with 'The answer looks clear to me.'"
),
client=client,
)
skeptic = Agent(
name="Skeptic",
instructions=(
"You are a devil's advocate in a workgroup. "
f"Your job is to review the answer provided by {expert.name} and point "
"out caveats, exceptions, and alternative perspectives."
"If there is nothing worth pointing out, respond with 'I have no further questions.'"
),
client=client,
)
# Build the group chat workflow
# termination_condition: stop after 6 messages (user task + one full rounds + 1)
# One round is expert -> verifier -> clarifier -> skeptic, after which the expert gets to respond again.
# This will end the conversation after the expert has spoken 2 times (one iteration loop)
# Note: it's possible that the expert gets it right the first time and the other participants
# have nothing to add, but for demo purposes we want to see at least one full round of interaction.
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = (
GroupChatBuilder(
participants=[expert, verifier, clarifier, skeptic],
termination_condition=lambda conversation: len(conversation) >= 6,
intermediate_outputs=True,
selection_func=round_robin_selector,
)
# Set a hard termination condition: stop after 6 messages (user task + one full rounds + 1)
# One round is expert -> verifier -> clarifier -> skeptic, after which the expert gets to respond again.
# This will end the conversation after the expert has spoken 2 times (one iteration loop)
# Note: it's possible that the expert gets it right the first time and the other participants
# have nothing to add, but for demo purposes we want to see at least one full round of interaction.
.with_termination_condition(lambda conversation: len(conversation) >= 6)
.build()
)
task = "How does Pythons Protocol differ from abstract base classes?"
print("\nStarting Group Chat with round-robin speaker selector...\n")
print(f"TASK: {task}\n")
print("=" * 80)
# Keep track of the last response to format output nicely in streaming mode
last_response_id: str | None = None
async for event in workflow.run(task, stream=True):
if event.type == "output":
data = event.data
if isinstance(data, AgentResponseUpdate):
rid = data.response_id
if rid != last_response_id:
if last_response_id is not None:
print("\n")
print(f"{data.author_name}:", end=" ", flush=True)
last_response_id = rid
print(data.text, end="", flush=True)
elif event.type == "output":
# The output of the group chat workflow is a collection of chat messages from all participants
outputs = cast(list[Message], event.data)
print("\n" + "=" * 80)
print("\nFinal Conversation Transcript:\n")
for message in outputs:
print(f"{message.author_name or message.role}: {message.text}\n")
if __name__ == "__main__":
asyncio.run(main())
@@ -1,151 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import logging
from typing import cast
from agent_framework import (
Agent,
AgentResponseUpdate,
Message,
resolve_agent_id,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import HandoffBuilder
from azure.identity import AzureCliCredential
logging.basicConfig(level=logging.ERROR)
"""Sample: Autonomous handoff workflow with agent iteration.
This sample demonstrates `.with_autonomous_mode()`, where agents continue
iterating on their task until they explicitly invoke a handoff tool. This allows
specialists to perform long-running autonomous work (research, coding, analysis)
without prematurely returning control to the coordinator or user.
Routing Pattern:
User -> Coordinator -> Specialist (iterates N times) -> Handoff -> Final Output
Prerequisites:
- `az login` (Azure CLI authentication)
- Environment variables for AzureOpenAIChatClient (AZURE_OPENAI_ENDPOINT, etc.)
Key Concepts:
- Autonomous interaction mode: agents iterate until they handoff
- Turn limits: use `.with_autonomous_mode(turn_limits={agent_name: N})` to cap iterations per agent
"""
def create_agents(
client: AzureOpenAIChatClient,
) -> tuple[Agent, Agent, Agent]:
"""Create coordinator and specialists for autonomous iteration."""
coordinator = client.as_agent(
instructions=(
"You are a coordinator. You break down a user query into a research task and a summary task. "
"Assign the two tasks to the appropriate specialists, one after the other."
),
name="coordinator",
)
research_agent = client.as_agent(
instructions=(
"You are a research specialist that explores topics thoroughly using web search. "
"When given a research task, break it down into multiple aspects and explore each one. "
"Continue your research across multiple responses - don't try to finish everything in one "
"response. After each response, think about what else needs to be explored. When you have "
"covered the topic comprehensively (at least 3-4 different aspects), return control to the "
"coordinator. Keep each individual response focused on one aspect."
),
name="research_agent",
)
summary_agent = client.as_agent(
instructions=(
"You summarize research findings. Provide a concise, well-organized summary. When done, return "
"control to the coordinator."
),
name="summary_agent",
)
return coordinator, research_agent, summary_agent
async def main() -> None:
"""Run an autonomous handoff workflow with specialist iteration enabled."""
client = AzureOpenAIChatClient(credential=AzureCliCredential())
coordinator, research_agent, summary_agent = create_agents(client)
# Build the workflow with autonomous mode
# In autonomous mode, agents continue iterating until they invoke a handoff tool
# termination_condition: Terminate after coordinator provides 5 assistant responses
workflow = (
HandoffBuilder(
name="autonomous_iteration_handoff",
participants=[coordinator, research_agent, summary_agent],
termination_condition=lambda conv: (
sum(1 for msg in conv if msg.author_name == "coordinator" and msg.role == "assistant") >= 5
),
)
.with_start_agent(coordinator)
.add_handoff(coordinator, [research_agent, summary_agent])
.add_handoff(research_agent, [coordinator]) # Research can hand back to coordinator
.add_handoff(summary_agent, [coordinator])
.with_autonomous_mode(
# You can set turn limits per agent to allow some agents to go longer.
# If a limit is not set, the agent will get an default limit: 50.
# Internally, handoff prefers agent names as the agent identifiers if set.
# Otherwise, it falls back to agent IDs.
turn_limits={
resolve_agent_id(coordinator): 5,
resolve_agent_id(research_agent): 10,
resolve_agent_id(summary_agent): 5,
}
)
.build()
)
request = "Perform a comprehensive research on Microsoft Agent Framework."
print("Request:", request)
last_response_id: str | None = None
async for event in workflow.run(request, stream=True):
if event.type == "handoff_sent":
print(f"\nHandoff Event: from {event.data.source} to {event.data.target}\n")
elif event.type == "output":
data = event.data
if isinstance(data, AgentResponseUpdate):
if not data.text:
# Skip updates that don't have text content
# These can be tool calls or other non-text events
continue
rid = data.response_id
if rid != last_response_id:
if last_response_id is not None:
print("\n")
print(f"{data.author_name}:", end=" ", flush=True)
last_response_id = rid
print(data.text, end="", flush=True)
elif event.type == "output":
# The output of the handoff workflow is a collection of chat messages from all participants
outputs = cast(list[Message], event.data)
print("\n" + "=" * 80)
print("\nFinal Conversation Transcript:\n")
for message in outputs:
print(f"{message.author_name or message.role}: {message.text}\n")
"""
Expected behavior:
- Coordinator routes to research_agent.
- Research agent iterates multiple times, exploring different aspects of Microsoft Agent Framework.
- Each iteration adds to the conversation without returning to coordinator.
- After thorough research, research_agent calls handoff to coordinator.
- Coordinator routes to summary_agent for final summary.
In autonomous mode, agents continue working until they invoke a handoff tool,
allowing the research_agent to perform 3-4+ responses before handing off.
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -1,296 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Annotated, cast
from agent_framework import (
Agent,
AgentResponse,
Message,
WorkflowEvent,
WorkflowRunState,
tool,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import HandoffAgentUserRequest, HandoffBuilder
from azure.identity import AzureCliCredential
"""Sample: Simple handoff workflow.
A handoff workflow defines a pattern that assembles agents in a mesh topology, allowing
them to transfer control to each other based on the conversation context.
Prerequisites:
- `az login` (Azure CLI authentication)
- Environment variables configured for AzureOpenAIChatClient (AZURE_OPENAI_ENDPOINT, etc.)
Key Concepts:
- Auto-registered handoff tools: HandoffBuilder automatically creates handoff tools
for each participant, allowing the coordinator to transfer control to specialists
- Termination condition: Controls when the workflow stops requesting user input
- Request/response cycle: Workflow requests input, user responds, cycle continues
"""
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# See:
# samples/02-agents/tools/function_tool_with_approval.py
# samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def process_refund(order_number: Annotated[str, "Order number to process refund for"]) -> str:
"""Simulated function to process a refund for a given order number."""
return f"Refund processed successfully for order {order_number}."
@tool(approval_mode="never_require")
def check_order_status(order_number: Annotated[str, "Order number to check status for"]) -> str:
"""Simulated function to check the status of a given order number."""
return f"Order {order_number} is currently being processed and will ship in 2 business days."
@tool(approval_mode="never_require")
def process_return(order_number: Annotated[str, "Order number to process return for"]) -> str:
"""Simulated function to process a return for a given order number."""
return f"Return initiated successfully for order {order_number}. You will receive return instructions via email."
def create_agents(client: AzureOpenAIChatClient) -> tuple[Agent, Agent, Agent, Agent]:
"""Create and configure the triage and specialist agents.
Args:
client: The AzureOpenAIChatClient to use for creating agents.
Returns:
Tuple of (triage_agent, refund_agent, order_agent, return_agent)
"""
# Triage agent: Acts as the frontline dispatcher
triage_agent = client.as_agent(
instructions=(
"You are frontline support triage. Route customer issues to the appropriate specialist agents "
"based on the problem described."
),
name="triage_agent",
)
# Refund specialist: Handles refund requests
refund_agent = client.as_agent(
instructions="You process refund requests.",
name="refund_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[process_refund],
)
# Order/shipping specialist: Resolves delivery issues
order_agent = client.as_agent(
instructions="You handle order and shipping inquiries.",
name="order_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[check_order_status],
)
# Return specialist: Handles return requests
return_agent = client.as_agent(
instructions="You manage product return requests.",
name="return_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[process_return],
)
return triage_agent, refund_agent, order_agent, return_agent
def _handle_events(events: list[WorkflowEvent]) -> list[WorkflowEvent[HandoffAgentUserRequest]]:
"""Process workflow events and extract any pending user input requests.
This function inspects each event type and:
- Prints workflow status changes (IDLE, IDLE_WITH_PENDING_REQUESTS, etc.)
- Displays final conversation snapshots when workflow completes
- Prints user input request prompts
- Collects all request_info events for response handling
Args:
events: List of WorkflowEvent to process
Returns:
List of WorkflowEvent[HandoffAgentUserRequest] representing pending user input requests
"""
requests: list[WorkflowEvent[HandoffAgentUserRequest]] = []
for event in events:
if event.type == "handoff_sent":
# handoff_sent event: Indicates a handoff has been initiated
print(f"\n[Handoff from {event.data.source} to {event.data.target} initiated.]")
elif event.type == "status" and event.state in {
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
}:
# Status event: Indicates workflow state changes
print(f"\n[Workflow Status] {event.state}")
elif event.type == "output":
# Output event: Contains contents generated by the workflow
data = event.data
if isinstance(data, AgentResponse):
for message in data.messages:
if not message.text:
# Skip messages without text (e.g., tool calls)
continue
speaker = message.author_name or message.role
print(f"- {speaker}: {message.text}")
elif event.type == "output":
# The output of the handoff workflow is a collection of chat messages from all participants
conversation = cast(list[Message], event.data)
if isinstance(conversation, list):
print("\n=== Final Conversation Snapshot ===")
for message in conversation:
speaker = message.author_name or message.role
print(f"- {speaker}: {message.text or [content.type for content in message.contents]}")
print("===================================")
elif event.type == "request_info" and isinstance(event.data, HandoffAgentUserRequest):
_print_handoff_agent_user_request(event.data.agent_response)
requests.append(cast(WorkflowEvent[HandoffAgentUserRequest], event))
return requests
def _print_handoff_agent_user_request(response: AgentResponse) -> None:
"""Display the agent's response messages when requesting user input.
This will happen when an agent generates a response that doesn't trigger
a handoff, i.e., the agent is asking the user for more information.
Args:
response: The AgentResponse from the agent requesting user input
"""
if not response.messages:
raise RuntimeError("Cannot print agent responses: response has no messages.")
print("\n[Agent is requesting your input...]")
# Print agent responses
for message in response.messages:
if not message.text:
# Skip messages without text (e.g., tool calls)
continue
speaker = message.author_name or message.role
print(f"- {speaker}: {message.text}")
async def main() -> None:
"""Main entry point for the handoff workflow demo.
This function demonstrates:
1. Creating triage and specialist agents
2. Building a handoff workflow with custom termination condition
3. Running the workflow with scripted user responses
4. Processing events and handling user input requests
The workflow uses scripted responses instead of interactive input to make
the demo reproducible and testable. In a production application, you would
replace the scripted_responses with actual user input collection.
"""
# Initialize the Azure OpenAI chat client
client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create all agents: triage + specialists
triage, refund, order, support = create_agents(client)
# Build the handoff workflow
# - participants: All agents that can participate in the workflow
# - with_start_agent: The triage agent is designated as the start agent, which means
# it receives all user input first and orchestrates handoffs to specialists
# - termination_condition: Custom logic to stop the request/response loop.
# Without this, the default behavior continues requesting user input until max_turns
# is reached. Here we use a custom condition that checks if the conversation has ended
# naturally (when one of the agents says something like "you're welcome").
workflow = (
HandoffBuilder(
name="customer_support_handoff",
participants=[triage, refund, order, support],
# Custom termination: Check if one of the agents has provided a closing message.
# This looks for the last message containing "welcome", which indicates the
# conversation has concluded naturally.
termination_condition=lambda conversation: (
len(conversation) > 0 and "welcome" in conversation[-1].text.lower()
),
)
.with_start_agent(triage)
.build()
)
# Scripted user responses for reproducible demo
# In a console application, replace this with:
# user_input = input("Your response: ")
# or integrate with a UI/chat interface
scripted_responses = [
"My order 1234 arrived damaged and the packaging was destroyed. I'd like to return it.",
"Please also process a refund for order 1234.",
"Thanks for resolving this.",
]
# Start the workflow with the initial user message
# run(..., stream=True) returns an async iterator of WorkflowEvent
print("[Starting workflow with initial user message...]\n")
initial_message = "Hello, I need assistance with my recent purchase."
print(f"- User: {initial_message}")
workflow_result = workflow.run(initial_message, stream=True)
pending_requests = _handle_events([event async for event in workflow_result])
# Process the request/response cycle
# The workflow will continue requesting input until:
# 1. The termination condition is met, OR
# 2. We run out of scripted responses
while pending_requests:
if not scripted_responses:
# No more scripted responses; terminate the workflow
responses = {req.request_id: HandoffAgentUserRequest.terminate() for req in pending_requests}
else:
# Get the next scripted response
user_response = scripted_responses.pop(0)
print(f"\n- User: {user_response}")
# Send response(s) to all pending requests
# In this demo, there's typically one request per cycle, but the API supports multiple
responses = {
req.request_id: HandoffAgentUserRequest.create_response(user_response) for req in pending_requests
}
# Send responses and get new events
# We use run(responses=...) to get events from the workflow, allowing us to
# display agent responses and handle new requests as they arrive
events = await workflow.run(responses=responses)
pending_requests = _handle_events(events)
"""
Sample Output:
[Starting workflow with initial user message...]
- User: Hello, I need assistance with my recent purchase.
- triage_agent: Could you please provide more details about the issue you're experiencing with your recent purchase? This will help me route you to the appropriate specialist.
[Workflow Status] IDLE_WITH_PENDING_REQUESTS
- User: My order 1234 arrived damaged and the packaging was destroyed. I'd like to return it.
- triage_agent: I've directed your request to our return agent, who will assist you with returning the damaged order. Thank you for your patience!
- return_agent: The return for your order 1234 has been successfully initiated. You will receive return instructions via email shortly. If you have any other questions or need further assistance, feel free to ask!
[Workflow Status] IDLE_WITH_PENDING_REQUESTS
- User: Thanks for resolving this.
=== Final Conversation Snapshot ===
- user: Hello, I need assistance with my recent purchase.
- triage_agent: Could you please provide more details about the issue you're experiencing with your recent purchase? This will help me route you to the appropriate specialist.
- user: My order 1234 arrived damaged and the packaging was destroyed. I'd like to return it.
- triage_agent: I've directed your request to our return agent, who will assist you with returning the damaged order. Thank you for your patience!
- return_agent: The return for your order 1234 has been successfully initiated. You will receive return instructions via email shortly. If you have any other questions or need further assistance, feel free to ask!
- user: Thanks for resolving this.
- triage_agent: You're welcome! If you have any more questions or need assistance in the future, feel free to reach out. Have a great day!
===================================
[Workflow Status] IDLE
""" # noqa: E501
if __name__ == "__main__":
asyncio.run(main())
@@ -1,241 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Handoff Workflow with Code Interpreter File Generation Sample
This sample demonstrates retrieving file IDs from code interpreter output
in a handoff workflow context. A triage agent routes to a code specialist
that generates a text file, and we verify the file_id is captured correctly
from the streaming workflow events.
Verifies GitHub issue #2718: files generated by code interpreter in
HandoffBuilder workflows can be properly retrieved.
Toggle USE_V2_CLIENT to switch between:
- V1: AzureAIAgentClient (azure-ai-agents SDK)
- V2: AzureAIClient (azure-ai-projects 2.x with Responses API)
IMPORTANT: When using V2 AzureAIClient with HandoffBuilder, each agent must
have its own client instance. The V2 client binds to a single server-side
agent name, so sharing a client between agents causes routing issues.
Prerequisites:
- `az login` (Azure CLI authentication)
- V1: AZURE_AI_AGENT_PROJECT_CONNECTION_STRING
- V2: AZURE_AI_PROJECT_ENDPOINT, AZURE_AI_MODEL_DEPLOYMENT_NAME
"""
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from contextlib import asynccontextmanager
from typing import cast
from agent_framework import (
Agent,
AgentResponseUpdate,
Message,
WorkflowEvent,
WorkflowRunState,
)
from agent_framework.orchestrations import HandoffAgentUserRequest, HandoffBuilder
from azure.identity.aio import AzureCliCredential
# Toggle between V1 (AzureAIAgentClient) and V2 (AzureAIClient)
USE_V2_CLIENT = False
async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]:
"""Collect all events from an async stream."""
return [event async for event in stream]
def _handle_events(events: list[WorkflowEvent]) -> tuple[list[WorkflowEvent[HandoffAgentUserRequest]], list[str]]:
"""Process workflow events and extract file IDs and pending requests.
Returns:
Tuple of (pending_requests, file_ids_found)
"""
requests: list[WorkflowEvent[HandoffAgentUserRequest]] = []
file_ids: list[str] = []
for event in events:
if event.type == "handoff_sent":
print(f"\n[Handoff from {event.data.source} to {event.data.target} initiated.]")
elif event.type == "status" and event.state in {
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
}:
print(f"[status] {event.state.name}")
elif event.type == "request_info" and isinstance(event.data, HandoffAgentUserRequest):
requests.append(cast(WorkflowEvent[HandoffAgentUserRequest], event))
elif event.type == "output":
data = event.data
if isinstance(data, AgentResponseUpdate):
for content in data.contents:
if content.type == "hosted_file":
file_ids.append(content.file_id) # type: ignore
print(f"[Found HostedFileContent: file_id={content.file_id}]")
elif content.type == "text" and content.annotations:
for annotation in content.annotations:
file_id = annotation["file_id"] # type: ignore
file_ids.append(file_id)
print(f"[Found file annotation: file_id={file_id}]")
elif event.type == "output":
conversation = cast(list[Message], event.data)
if isinstance(conversation, list):
print("\n=== Final Conversation Snapshot ===")
for message in conversation:
speaker = message.author_name or message.role
print(f"- {speaker}: {message.text or [content.type for content in message.contents]}")
print("===================================")
return requests, file_ids
@asynccontextmanager
async def create_agents_v1(credential: AzureCliCredential) -> AsyncIterator[tuple[Agent, Agent]]:
"""Create agents using V1 AzureAIAgentClient."""
from agent_framework.azure import AzureAIAgentClient
async with AzureAIAgentClient(credential=credential) as client:
triage = client.as_agent(
name="triage_agent",
instructions=(
"You are a triage agent. Route code-related requests to the code_specialist. "
"When the user asks to create or generate files, hand off to code_specialist "
"by calling handoff_to_code_specialist."
),
)
# Create code interpreter tool using instance method
code_interpreter_tool = client.get_code_interpreter_tool()
code_specialist = client.as_agent(
name="code_specialist",
instructions=(
"You are a Python code specialist. Use the code interpreter to execute Python code "
"and create files when requested. Always save files to /mnt/data/ directory."
),
tools=[code_interpreter_tool],
)
yield triage, code_specialist # type: ignore
@asynccontextmanager
async def create_agents_v2(credential: AzureCliCredential) -> AsyncIterator[tuple[Agent, Agent]]:
"""Create agents using V2 AzureAIClient.
Each agent needs its own client instance because the V2 client binds
to a single server-side agent name.
"""
from agent_framework.azure import AzureAIClient
async with (
AzureAIClient(credential=credential) as triage_client,
AzureAIClient(credential=credential) as code_client,
):
triage = triage_client.as_agent(
name="TriageAgent",
instructions="You are a triage agent. Your ONLY job is to route requests to the appropriate specialist.",
)
# Create code interpreter tool using instance method
code_interpreter_tool = code_client.get_code_interpreter_tool()
code_specialist = code_client.as_agent(
name="CodeSpecialist",
instructions=(
"You are a Python code specialist. You have access to a code interpreter tool. "
"Use the code interpreter to execute Python code and create files. "
"Always save files to /mnt/data/ directory. "
"Do NOT discuss handoffs or routing - just complete the coding task directly."
),
tools=[code_interpreter_tool],
)
yield triage, code_specialist
async def main() -> None:
"""Run a simple handoff workflow with code interpreter file generation."""
client_version = "V2 (AzureAIClient)" if USE_V2_CLIENT else "V1 (AzureAIAgentClient)"
print(f"=== Handoff Workflow with Code Interpreter File Generation [{client_version}] ===\n")
async with AzureCliCredential() as credential:
create_agents = create_agents_v2 if USE_V2_CLIENT else create_agents_v1
async with create_agents(credential) as (triage, code_specialist):
workflow = (
HandoffBuilder(
termination_condition=lambda conv: sum(1 for msg in conv if msg.role == "user") >= 2,
)
.participants([triage, code_specialist])
.with_start_agent(triage)
.build()
)
user_inputs = [
"Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.",
"exit",
]
input_index = 0
all_file_ids: list[str] = []
print(f"User: {user_inputs[0]}")
events = await _drain(workflow.run(user_inputs[0], stream=True))
requests, file_ids = _handle_events(events)
all_file_ids.extend(file_ids)
input_index += 1
while requests:
request = requests[0]
if input_index >= len(user_inputs):
break
user_input = user_inputs[input_index]
print(f"\nUser: {user_input}")
responses = {request.request_id: HandoffAgentUserRequest.create_response(user_input)}
events = await _drain(workflow.run(stream=True, responses=responses))
requests, file_ids = _handle_events(events)
all_file_ids.extend(file_ids)
input_index += 1
print("\n" + "=" * 50)
if all_file_ids:
print(f"SUCCESS: Found {len(all_file_ids)} file ID(s) in handoff workflow:")
for fid in all_file_ids:
print(f" - {fid}")
else:
print("WARNING: No file IDs captured from the handoff workflow.")
print("=" * 50)
"""
Sample Output:
User: Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.
[Found HostedFileContent: file_id=assistant-JT1sA...]
=== Conversation So Far ===
- user: Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.
- triage_agent: I am handing off your request to create the text file "hello.txt" with the specified content to the code specialist. They will assist you shortly.
- code_specialist: The file "hello.txt" has been created with the content "Hello from handoff workflow!". You can download it using the link below:
[hello.txt](sandbox:/mnt/data/hello.txt)
===========================
[status] IDLE_WITH_PENDING_REQUESTS
User: exit
[status] IDLE
==================================================
SUCCESS: Found 1 file ID(s) in handoff workflow:
- assistant-JT1sA...
==================================================
""" # noqa: E501
if __name__ == "__main__":
asyncio.run(main())
@@ -1,144 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
import logging
from typing import cast
from agent_framework import (
Agent,
AgentResponseUpdate,
Message,
WorkflowEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
from agent_framework.orchestrations import GroupChatRequestSentEvent, MagenticBuilder, MagenticProgressLedger
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
"""
Sample: Magentic Orchestration (multi-agent)
What it does:
- Orchestrates multiple agents using `MagenticBuilder` with streaming callbacks.
- ResearcherAgent (Agent backed by an OpenAI chat client) for
finding information.
- CoderAgent (Agent backed by OpenAI Assistants with the hosted
code interpreter tool) for analysis and computation.
The workflow is configured with:
- A Standard Magentic manager (uses a chat client for planning and progress).
- Callbacks for final results, per-message agent responses, and streaming
token updates.
When run, the script builds the workflow, submits a task about estimating the
energy efficiency and CO2 emissions of several ML models, streams intermediate
events, and prints the final answer. The workflow completes when idle.
Prerequisites:
- OpenAI credentials configured for `OpenAIChatClient` and `OpenAIResponsesClient`.
"""
async def main() -> None:
researcher_agent = Agent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional computation or quantitative analysis."
),
# This agent requires the gpt-4o-search-preview model to perform web searches.
client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
# Create code interpreter tool using instance method
coder_client = OpenAIResponsesClient()
code_interpreter_tool = coder_client.get_code_interpreter_tool()
coder_agent = Agent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
client=coder_client,
tools=code_interpreter_tool,
)
# Create a manager agent for orchestration
manager_agent = Agent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
client=OpenAIChatClient(),
)
print("\nBuilding Magentic Workflow...")
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = MagenticBuilder(
participants=[researcher_agent, coder_agent],
intermediate_outputs=True,
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
).build()
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
print(f"\nTask: {task}")
print("\nStarting workflow execution...")
# Keep track of the last executor to format output nicely in streaming mode
last_response_id: str | None = None
output_event: WorkflowEvent | None = None
async for event in workflow.run(task, stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
response_id = event.data.response_id
if response_id != last_response_id:
if last_response_id is not None:
print("\n")
print(f"- {event.executor_id}:", end=" ", flush=True)
last_response_id = response_id
print(event.data, end="", flush=True)
elif event.type == "magentic_orchestrator":
print(f"\n[Magentic Orchestrator Event] Type: {event.data.event_type.name}")
if isinstance(event.data.content, Message):
print(f"Please review the plan:\n{event.data.content.text}")
elif isinstance(event.data.content, MagenticProgressLedger):
print(f"Please review progress ledger:\n{json.dumps(event.data.content.to_dict(), indent=2)}")
else:
print(f"Unknown data type in MagenticOrchestratorEvent: {type(event.data.content)}")
# Block to allow user to read the plan/progress before continuing
# Note: this is for demonstration only and is not the recommended way to handle human interaction.
# Please refer to `with_plan_review` for proper human interaction during planning phases.
await asyncio.get_event_loop().run_in_executor(None, input, "Press Enter to continue...")
elif event.type == "group_chat" and isinstance(event.data, GroupChatRequestSentEvent):
print(f"\n[REQUEST SENT ({event.data.round_index})] to agent: {event.data.participant_name}")
elif event.type == "output":
output_event = event
if output_event:
# The output of the magentic workflow is a collection of chat messages from all participants
outputs = cast(list[Message], output_event.data)
print("\n" + "=" * 80)
print("\nFinal Conversation Transcript:\n")
for message in outputs:
print(f"{message.author_name or message.role}: {message.text}\n")
if __name__ == "__main__":
asyncio.run(main())
@@ -1,309 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
from pathlib import Path
from typing import cast
from agent_framework import (
Agent,
FileCheckpointStorage,
Message,
WorkflowCheckpoint,
WorkflowEvent,
WorkflowRunState,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import MagenticBuilder, MagenticPlanReviewRequest
from azure.identity._credentials import AzureCliCredential
"""
Sample: Magentic Orchestration + Checkpointing
The goal of this sample is to show the exact mechanics needed to pause a Magentic
workflow that requires human plan review, persist the outstanding request via a
checkpoint, and later resume the workflow by feeding in the saved response.
Concepts highlighted here:
1. **Deterministic executor IDs** - the orchestrator and plan-review request executor
must keep stable IDs so the checkpoint state aligns when we rebuild the graph.
2. **Executor snapshotting** - checkpoints capture the pending plan-review request
map, at superstep boundaries.
3. **Resume with responses** - `Workflow.run(responses=...)` accepts a
`responses` mapping so we can inject the stored human reply during restoration.
Prerequisites:
- OpenAI environment variables configured for `OpenAIChatClient`.
"""
TASK = (
"Draft a concise internal brief describing how our research and implementation teams should collaborate "
"to launch a beta feature for data-driven email summarization. Highlight the key milestones, "
"risks, and communication cadence."
)
# Dedicated folder for captured checkpoints. Keeping it under the sample directory
# makes it easy to inspect the JSON blobs produced by each run.
CHECKPOINT_DIR = Path(__file__).parent / "tmp" / "magentic_checkpoints"
def build_workflow(checkpoint_storage: FileCheckpointStorage):
"""Construct the Magentic workflow graph with checkpointing enabled."""
# Two vanilla ChatAgents act as participants in the orchestration. They do not need
# extra state handling because their inputs/outputs are fully described by chat messages.
researcher = Agent(
name="ResearcherAgent",
description="Collects background facts and references for the project.",
instructions=("You are the research lead. Gather crisp bullet points the team should know."),
client=AzureOpenAIChatClient(credential=AzureCliCredential()),
)
writer = Agent(
name="WriterAgent",
description="Synthesizes the final brief for stakeholders.",
instructions=("You convert the research notes into a structured brief with milestones and risks."),
client=AzureOpenAIChatClient(credential=AzureCliCredential()),
)
# Create a manager agent for orchestration
manager_agent = Agent(
name="MagenticManager",
description="Orchestrator that coordinates the research and writing workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
client=AzureOpenAIChatClient(credential=AzureCliCredential()),
)
# The builder wires in the Magentic orchestrator, sets the plan review path, and
# stores the checkpoint backend so the runtime knows where to persist snapshots.
return MagenticBuilder(
participants=[researcher, writer],
enable_plan_review=True,
checkpoint_storage=checkpoint_storage,
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=3,
).build()
async def main() -> None:
# Stage 0: make sure the checkpoint folder is empty so we inspect only checkpoints
# written by this invocation. This prevents stale files from previous runs from
# confusing the analysis.
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
for file in CHECKPOINT_DIR.glob("*.json"):
file.unlink()
checkpoint_storage = FileCheckpointStorage(CHECKPOINT_DIR)
print("\n=== Stage 1: run until plan review request (checkpointing active) ===")
workflow = build_workflow(checkpoint_storage)
# Run the workflow until the first is surfaced. The event carries the
# request_id we must reuse on resume. In a real system this is where the UI would present
# the plan for human review.
plan_review_request: MagenticPlanReviewRequest | None = None
async for event in workflow.run(TASK, stream=True):
if event.type == "request_info" and event.request_type is MagenticPlanReviewRequest:
plan_review_request = event.data
print(f"Captured plan review request: {event.request_id}")
if event.type == "status" and event.state is WorkflowRunState.IDLE_WITH_PENDING_REQUESTS:
break
if plan_review_request is None:
print("No plan review request emitted; nothing to resume.")
return
checkpoints = await checkpoint_storage.list_checkpoints(workflow.id)
if not checkpoints:
print("No checkpoints persisted.")
return
resume_checkpoint = max(
checkpoints,
key=lambda cp: (cp.iteration_count, cp.timestamp),
)
print(f"Using checkpoint {resume_checkpoint.checkpoint_id} at iteration {resume_checkpoint.iteration_count}")
# Show that the checkpoint JSON indeed contains the pending plan-review request record.
checkpoint_path = checkpoint_storage.storage_path / f"{resume_checkpoint.checkpoint_id}.json"
if checkpoint_path.exists():
with checkpoint_path.open() as f:
snapshot = json.load(f)
request_map = snapshot.get("pending_request_info_events", {})
print(f"Pending plan-review requests persisted in checkpoint: {list(request_map.keys())}")
print("\n=== Stage 2: resume from checkpoint and approve plan ===")
resumed_workflow = build_workflow(checkpoint_storage)
# Construct an approval reply to supply when the plan review request is re-emitted.
approval = plan_review_request.approve()
# Resume execution and capture the re-emitted plan review request.
request_info_event: WorkflowEvent | None = None
async for event in resumed_workflow.run(checkpoint_id=resume_checkpoint.checkpoint_id, stream=True):
if event.type == "request_info" and isinstance(event.data, MagenticPlanReviewRequest):
request_info_event = event
if request_info_event is None:
print("No plan review request re-emitted on resume; cannot approve.")
return
print(f"Resumed plan review request: {request_info_event.request_id}")
# Supply the approval and continue to run to completion.
final_event: WorkflowEvent | None = None
async for event in resumed_workflow.run(stream=True, responses={request_info_event.request_id: approval}):
if event.type == "output":
final_event = event
if final_event is None:
print("Workflow did not complete after resume.")
return
# Final sanity check: display the assistant's answer as proof the orchestration reached
# a natural completion after resuming from the checkpoint.
result = final_event.data
if not result:
print("No result data from workflow.")
return
output_messages = cast(list[Message], result)
print("\n=== Final Answer ===")
# The output of the Magentic workflow is a list of ChatMessages with only one final message
# generated by the orchestrator.
print(output_messages[-1].text)
# ------------------------------------------------------------------
# Stage 3: demonstrate resuming from a later checkpoint (post-plan)
# ------------------------------------------------------------------
def _pending_message_count(cp: WorkflowCheckpoint) -> int:
return sum(len(msg_list) for msg_list in cp.messages.values() if isinstance(msg_list, list))
all_checkpoints = await checkpoint_storage.list_checkpoints(resume_checkpoint.workflow_id)
later_checkpoints_with_messages = [
cp
for cp in all_checkpoints
if cp.iteration_count > resume_checkpoint.iteration_count and _pending_message_count(cp) > 0
]
if later_checkpoints_with_messages:
post_plan_checkpoint = max(
later_checkpoints_with_messages,
key=lambda cp: (cp.iteration_count, cp.timestamp),
)
else:
later_checkpoints = [cp for cp in all_checkpoints if cp.iteration_count > resume_checkpoint.iteration_count]
if not later_checkpoints:
print("\nNo additional checkpoints recorded beyond plan approval; sample complete.")
return
post_plan_checkpoint = max(
later_checkpoints,
key=lambda cp: (cp.iteration_count, cp.timestamp),
)
print("\n=== Stage 3: resume from post-plan checkpoint ===")
pending_messages = _pending_message_count(post_plan_checkpoint)
print(
f"Resuming from checkpoint {post_plan_checkpoint.checkpoint_id} at iteration "
f"{post_plan_checkpoint.iteration_count} (pending messages: {pending_messages})"
)
if pending_messages == 0:
print("Checkpoint has no pending messages; no additional work expected on resume.")
final_event_post: WorkflowEvent | None = None
post_emitted_events = False
post_plan_workflow = build_workflow(checkpoint_storage)
async for event in post_plan_workflow.run(checkpoint_id=post_plan_checkpoint.checkpoint_id, stream=True):
post_emitted_events = True
if event.type == "output":
final_event_post = event
if final_event_post is None:
if not post_emitted_events:
print("No new events were emitted; checkpoint already captured a completed run.")
print("\n=== Final Answer (post-plan resume) ===")
print(output_messages[-1].text)
return
print("Workflow did not complete after post-plan resume.")
return
post_result = final_event_post.data
if not post_result:
print("No result data from post-plan resume.")
return
output_messages = cast(list[Message], post_result)
print("\n=== Final Answer (post-plan resume) ===")
# The output of the Magentic workflow is a list of ChatMessages with only one final message
# generated by the orchestrator.
print(output_messages[-1].text)
"""
Sample Output:
=== Stage 1: run until plan review request (checkpointing active) ===
Captured plan review request: 3a1a4a09-4ed1-4c90-9cf6-9ac488d452c0
Using checkpoint 4c76d77a-6ff8-4d2b-84f6-824771ffac7e at iteration 1
Pending plan-review requests persisted in checkpoint: ['3a1a4a09-4ed1-4c90-9cf6-9ac488d452c0']
=== Stage 2: resume from checkpoint and approve plan ===
=== Final Answer ===
Certainly! Here's your concise internal brief on how the research and implementation teams should collaborate for
the beta launch of the data-driven email summarization feature:
---
**Internal Brief: Collaboration Plan for Data-driven Email Summarization Beta Launch**
**Collaboration Approach**
- **Joint Kickoff:** Research and Implementation teams hold a project kickoff to align on objectives, requirements,
and success metrics.
- **Ongoing Coordination:** Teams collaborate closely; researchers share model developments and insights, while
implementation ensures smooth integration and user experience.
- **Real-time Feedback Loop:** Implementation provides early feedback on technical integration and UX, while
Research evaluates initial performance and user engagement signals post-integration.
**Key Milestones**
1. **Requirement Finalization & Scoping** - Define MVP feature set and success criteria.
2. **Model Prototyping & Evaluation** - Researchers develop and validate summarization models with agreed metrics.
3. **Integration & Internal Testing** - Implementation team integrates the model; internal alpha testing and
compliance checks.
4. **Beta User Onboarding** - Recruit a select cohort of beta users and guide them through onboarding.
5. **Beta Launch & Monitoring** - Soft-launch for beta group, with active monitoring of usage, feedback,
and performance.
6. **Iterative Improvements** - Address issues, refine features, and prepare for possible broader rollout.
**Top Risks**
- **Data Privacy & Compliance:** Strict protocols and compliance reviews to prevent data leakage.
- **Model Quality (Bias, Hallucination):** Careful monitoring of summary accuracy; rapid iterations if critical
errors occur.
- **User Adoption:** Ensuring the beta solves genuine user needs, collecting actionable feedback early.
- **Feedback Quality & Quantity:** Proactively schedule user outreach to ensure substantive beta feedback.
**Communication Cadence**
- **Weekly Team Syncs:** Short all-hands progress and blockers meeting.
- **Bi-Weekly Stakeholder Check-ins:** Leadership and project leads address escalations and strategic decisions.
- **Dedicated Slack Channel:** For real-time queries and updates.
- **Documentation Hub:** Up-to-date project docs and FAQs on a shared internal wiki.
- **Post-Milestone Retrospectives:** After critical phases (e.g., alpha, beta), reviewing what worked and what needs
improvement.
**Summary**
Clear alignment, consistent communication, and iterative feedback are key to a successful beta. All team members are
expected to surface issues quickly and keep documentation current as we drive toward launch.
---
=== Stage 3: resume from post-plan checkpoint ===
Resuming from checkpoint 9a3b... at iteration 3 (pending messages: 0)
No new events were emitted; checkpoint already captured a completed run.
=== Final Answer (post-plan resume) ===
(same brief as above)
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -1,150 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
from collections.abc import AsyncIterable
from typing import cast
from agent_framework import (
Agent,
AgentResponseUpdate,
Message,
WorkflowEvent,
)
from agent_framework.openai import OpenAIChatClient
from agent_framework.orchestrations import MagenticBuilder, MagenticPlanReviewRequest, MagenticPlanReviewResponse
"""
Sample: Magentic Orchestration with Human Plan Review
This sample demonstrates how humans can review and provide feedback on plans
generated by the Magentic workflow orchestrator. When plan review is enabled,
the workflow requests human approval or revision before executing each plan.
Key concepts:
- with_plan_review(): Enables human review of generated plans
- MagenticPlanReviewRequest: The event type for plan review requests
- Human can choose to: approve the plan or provide revision feedback
Plan review options:
- approve(): Accept the proposed plan and continue execution
- revise(feedback): Provide textual feedback to modify the plan
Prerequisites:
- OpenAI credentials configured for `OpenAIChatClient`.
"""
# Keep track of the last response to format output nicely in streaming mode
last_response_id: str | None = None
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, MagenticPlanReviewResponse] | None:
"""Process events from the workflow stream to capture human feedback requests."""
global last_response_id
requests: dict[str, MagenticPlanReviewRequest] = {}
async for event in stream:
if event.type == "request_info" and event.request_type is MagenticPlanReviewRequest:
requests[event.request_id] = cast(MagenticPlanReviewRequest, event.data)
if event.type == "output":
data = event.data
if isinstance(data, AgentResponseUpdate):
rid = data.response_id
if rid != last_response_id:
if last_response_id is not None:
print("\n")
print(f"{data.author_name}:", end=" ", flush=True)
last_response_id = rid
print(data.text, end="", flush=True)
else:
# The output of the workflow comes from the orchestrator and it's a list of messages
print("\n" + "=" * 60)
print("DISCUSSION COMPLETE")
print("=" * 60)
print("Final discussion summary:")
# To make the type checker happy, we cast event.data to the expected type
outputs = cast(list[Message], event.data)
for msg in outputs:
speaker = msg.author_name or msg.role
print(f"[{speaker}]: {msg.text}")
responses: dict[str, MagenticPlanReviewResponse] = {}
if requests:
for request_id, request in requests.items():
print("\n\n[Magentic Plan Review Request]")
if request.current_progress is not None:
print("Current Progress Ledger:")
print(json.dumps(request.current_progress.to_dict(), indent=2))
print()
print(f"Proposed Plan:\n{request.plan.text}\n")
print("Please provide your feedback (press Enter to approve):")
reply = input("> ") # noqa: ASYNC250
if reply.strip() == "":
print("Plan approved.\n")
responses[request_id] = request.approve()
else:
print("Plan revised by human.\n")
responses[request_id] = request.revise(reply)
return responses if responses else None
async def main() -> None:
researcher_agent = Agent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions="You are a Researcher. You find information and gather facts.",
client=OpenAIChatClient(model_id="gpt-4o"),
)
analyst_agent = Agent(
name="AnalystAgent",
description="Data analyst who processes and summarizes research findings",
instructions="You are an Analyst. You analyze findings and create summaries.",
client=OpenAIChatClient(model_id="gpt-4o"),
)
manager_agent = Agent(
name="MagenticManager",
description="Orchestrator that coordinates the workflow",
instructions="You coordinate a team to complete tasks efficiently.",
client=OpenAIChatClient(model_id="gpt-4o"),
)
print("\nBuilding Magentic Workflow with Human Plan Review...")
# enable_plan_review=True: Request human input for plan review
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = MagenticBuilder(
participants=[researcher_agent, analyst_agent],
enable_plan_review=True,
intermediate_outputs=True,
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=1,
max_reset_count=2,
).build()
task = "Research sustainable aviation fuel technology and summarize the findings."
print(f"\nTask: {task}")
print("\nStarting workflow execution...")
print("=" * 60)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(task, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
if __name__ == "__main__":
asyncio.run(main())
@@ -1,79 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import cast
from agent_framework import Message
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
"""
Sample: Sequential workflow (agent-focused API) with shared conversation context
Build a high-level sequential workflow using SequentialBuilder and two domain agents.
The shared conversation (list[Message]) flows through each participant. Each agent
appends its assistant message to the context. The workflow outputs the final conversation
list when complete.
Note on internal adapters:
- Sequential orchestration includes small adapter nodes for input normalization
("input-conversation"), agent-response conversion ("to-conversation:<participant>"),
and completion ("complete"). These may appear as ExecutorInvoke/Completed events in
the stream—similar to how concurrent orchestration includes a dispatcher/aggregator.
You can safely ignore them when focusing on agent progress.
Prerequisites:
- Azure OpenAI access configured for AzureOpenAIChatClient (use az login + env vars)
"""
async def main() -> None:
# 1) Create agents
client = AzureOpenAIChatClient(credential=AzureCliCredential())
writer = client.as_agent(
instructions=("You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."),
name="writer",
)
reviewer = client.as_agent(
instructions=("You are a thoughtful reviewer. Give brief feedback on the previous assistant message."),
name="reviewer",
)
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
# 3) Run and collect outputs
outputs: list[list[Message]] = []
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
if event.type == "output":
outputs.append(cast(list[Message], event.data))
if outputs:
print("===== Final Conversation =====")
for i, msg in enumerate(outputs[-1], start=1):
name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")
"""
Sample Output:
===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -1,103 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import (
AgentExecutorResponse,
Executor,
Message,
WorkflowContext,
handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
"""
Sample: Sequential workflow mixing agents and a custom summarizer executor
This demonstrates how SequentialBuilder chains participants with a shared
conversation context (list[Message]). An agent produces content; a custom
executor appends a compact summary to the conversation. The workflow completes
after all participants have executed in sequence, and the final output contains
the complete conversation.
Custom executor contract:
- Provide at least one @handler accepting AgentExecutorResponse and a WorkflowContext[list[Message]]
- Emit the updated conversation via ctx.send_message([...])
Prerequisites:
- Azure OpenAI access configured for AzureOpenAIChatClient (use az login + env vars)
"""
class Summarizer(Executor):
"""Simple summarizer: consumes full conversation and appends an assistant summary."""
@handler
async def summarize(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext[list[Message]]) -> None:
"""Append a summary message to a copy of the full conversation.
Note: A custom executor must be able to handle the message type from the prior participant, and produce
the message type expected by the next participant. In this case, the prior participant is an agent thus
the input is AgentExecutorResponse (an agent will be wrapped in an AgentExecutor, which produces
`AgentExecutorResponse`). If the next participant is also an agent or this is the final participant,
the output must be `list[Message]`.
"""
if not agent_response.full_conversation:
await ctx.send_message([Message("assistant", ["No conversation to summarize."])])
return
users = sum(1 for m in agent_response.full_conversation if m.role == "user")
assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
final_conversation = list(agent_response.full_conversation) + [summary]
await ctx.send_message(final_conversation)
async def main() -> None:
# 1) Create a content agent
client = AzureOpenAIChatClient(credential=AzureCliCredential())
content = client.as_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# 2) Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()
# 3) Run workflow and extract final conversation
events = await workflow.run("Explain the benefits of budget eBikes for commuters.")
outputs = events.get_outputs()
if outputs:
print("===== Final Conversation =====")
messages: list[Message] | Any = outputs[0]
for i, msg in enumerate(messages, start=1):
name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")
"""
Sample Output:
------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -5,7 +5,7 @@
# ]
# ///
# Run with any PEP 723 compatible runner, e.g.:
# uv run samples/getting_started/agents/copilotstudio/copilotstudio_with_explicit_settings.py
# uv run samples/02-agents/providers/copilotstudio/copilotstudio_with_explicit_settings.py
# Copyright (c) Microsoft. All rights reserved.
@@ -34,8 +34,8 @@ Key Concepts:
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# See:
# samples/getting_started/tools/function_tool_with_approval.py
# samples/getting_started/tools/function_tool_with_approval_and_sessions.py.
# samples/02-agents/tools/function_tool_with_approval.py
# samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def process_refund(order_number: Annotated[str, "Order number to process refund for"]) -> str:
"""Simulated function to process a refund for a given order number."""
@@ -11,37 +11,26 @@ from the streaming workflow events.
Verifies GitHub issue #2718: files generated by code interpreter in
HandoffBuilder workflows can be properly retrieved.
Toggle USE_V2_CLIENT to switch between:
- V1: AzureAIAgentClient (azure-ai-agents SDK)
- V2: AzureAIClient (azure-ai-projects 2.x with Responses API)
IMPORTANT: When using V2 AzureAIClient with HandoffBuilder, each agent must
have its own client instance. The V2 client binds to a single server-side
agent name, so sharing a client between agents causes routing issues.
Prerequisites:
- AZURE_AI_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- `az login` (Azure CLI authentication)
- V1: AZURE_AI_AGENT_PROJECT_CONNECTION_STRING
- V2: AZURE_AI_PROJECT_ENDPOINT, AZURE_AI_MODEL_DEPLOYMENT_NAME
- AZURE_AI_MODEL_DEPLOYMENT_NAME
"""
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from contextlib import asynccontextmanager
import os
from collections.abc import AsyncIterable
from typing import cast
from agent_framework import (
Agent,
AgentResponseUpdate,
Message,
WorkflowEvent,
WorkflowRunState,
)
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.orchestrations import HandoffAgentUserRequest, HandoffBuilder
from azure.identity.aio import AzureCliCredential
# Toggle between V1 (AzureAIAgentClient) and V2 (AzureAIClient)
USE_V2_CLIENT = False
from azure.identity import AzureCliCredential
async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]:
@@ -66,7 +55,7 @@ def _handle_events(events: list[WorkflowEvent]) -> tuple[list[WorkflowEvent[Hand
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
}:
print(f"[status] {event.state.name}")
print(f"[status] {event.state}")
elif event.type == "request_info" and isinstance(event.data, HandoffAgentUserRequest):
requests.append(cast(WorkflowEvent[HandoffAgentUserRequest], event))
elif event.type == "output":
@@ -81,8 +70,8 @@ def _handle_events(events: list[WorkflowEvent]) -> tuple[list[WorkflowEvent[Hand
file_id = annotation["file_id"] # type: ignore
file_ids.append(file_id)
print(f"[Found file annotation: file_id={file_id}]")
elif event.type == "output":
conversation = cast(list[Message], event.data)
elif isinstance(data, list):
conversation = cast(list[Message], data)
if isinstance(conversation, list):
print("\n=== Final Conversation Snapshot ===")
for message in conversation:
@@ -93,148 +82,104 @@ def _handle_events(events: list[WorkflowEvent]) -> tuple[list[WorkflowEvent[Hand
return requests, file_ids
@asynccontextmanager
async def create_agents_v1(credential: AzureCliCredential) -> AsyncIterator[tuple[Agent, Agent]]:
"""Create agents using V1 AzureAIAgentClient."""
from agent_framework.azure import AzureAIAgentClient
async with AzureAIAgentClient(credential=credential) as client:
triage = client.as_agent(
name="triage_agent",
instructions=(
"You are a triage agent. Route code-related requests to the code_specialist. "
"When the user asks to create or generate files, hand off to code_specialist "
"by calling handoff_to_code_specialist."
),
)
# Create code interpreter tool using instance method
code_interpreter_tool = client.get_code_interpreter_tool()
code_specialist = client.as_agent(
name="code_specialist",
instructions=(
"You are a Python code specialist. Use the code interpreter to execute Python code "
"and create files when requested. Always save files to /mnt/data/ directory."
),
tools=[code_interpreter_tool],
)
yield triage, code_specialist # type: ignore
@asynccontextmanager
async def create_agents_v2(credential: AzureCliCredential) -> AsyncIterator[tuple[Agent, Agent]]:
"""Create agents using V2 AzureAIClient.
Each agent needs its own client instance because the V2 client binds
to a single server-side agent name.
"""
from agent_framework.azure import AzureAIClient
async with (
AzureAIClient(credential=credential) as triage_client,
AzureAIClient(credential=credential) as code_client,
):
triage = triage_client.as_agent(
name="TriageAgent",
instructions="You are a triage agent. Your ONLY job is to route requests to the appropriate specialist.",
)
# Create code interpreter tool using instance method
code_interpreter_tool = code_client.get_code_interpreter_tool()
code_specialist = code_client.as_agent(
name="CodeSpecialist",
instructions=(
"You are a Python code specialist. You have access to a code interpreter tool. "
"Use the code interpreter to execute Python code and create files. "
"Always save files to /mnt/data/ directory. "
"Do NOT discuss handoffs or routing - just complete the coding task directly."
),
tools=[code_interpreter_tool],
)
yield triage, code_specialist
async def main() -> None:
"""Run a simple handoff workflow with code interpreter file generation."""
client_version = "V2 (AzureAIClient)" if USE_V2_CLIENT else "V1 (AzureAIAgentClient)"
print(f"=== Handoff Workflow with Code Interpreter File Generation [{client_version}] ===\n")
print("=== Handoff Workflow with Code Interpreter File Generation ===\n")
async with AzureCliCredential() as credential:
create_agents = create_agents_v2 if USE_V2_CLIENT else create_agents_v1
client = AzureOpenAIResponsesClient(
project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
)
async with create_agents(credential) as (triage, code_specialist):
workflow = (
HandoffBuilder(
termination_condition=lambda conv: sum(1 for msg in conv if msg.role == "user") >= 2,
)
.participants([triage, code_specialist])
.with_start_agent(triage)
.build()
)
triage = client.as_agent(
name="triage_agent",
instructions=(
"You are a triage agent. Route code-related requests to the code_specialist. "
"When the user asks to create or generate files, hand off to code_specialist "
"by calling handoff_to_code_specialist."
),
)
user_inputs = [
"Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.",
"exit",
]
input_index = 0
all_file_ids: list[str] = []
code_interpreter_tool = client.get_code_interpreter_tool()
print(f"User: {user_inputs[0]}")
events = await _drain(workflow.run(user_inputs[0], stream=True))
requests, file_ids = _handle_events(events)
all_file_ids.extend(file_ids)
input_index += 1
code_specialist = client.as_agent(
name="code_specialist",
instructions=(
"You are a Python code specialist. Use the code interpreter to execute Python code "
"and create files when requested. Always save files to /mnt/data/ directory."
),
tools=[code_interpreter_tool],
)
while requests:
request = requests[0]
if input_index >= len(user_inputs):
break
user_input = user_inputs[input_index]
print(f"\nUser: {user_input}")
workflow = (
HandoffBuilder(
termination_condition=lambda conv: sum(1 for msg in conv if msg.role == "user") >= 2,
)
.participants([triage, code_specialist])
.with_start_agent(triage)
.build()
)
responses = {request.request_id: HandoffAgentUserRequest.create_response(user_input)}
events = await _drain(workflow.run(stream=True, responses=responses))
requests, file_ids = _handle_events(events)
all_file_ids.extend(file_ids)
input_index += 1
user_inputs = [
"Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.",
"exit",
]
input_index = 0
all_file_ids: list[str] = []
print("\n" + "=" * 50)
if all_file_ids:
print(f"SUCCESS: Found {len(all_file_ids)} file ID(s) in handoff workflow:")
for fid in all_file_ids:
print(f" - {fid}")
else:
print("WARNING: No file IDs captured from the handoff workflow.")
print("=" * 50)
print(f"User: {user_inputs[0]}")
events = await _drain(workflow.run(user_inputs[0], stream=True))
requests, file_ids = _handle_events(events)
all_file_ids.extend(file_ids)
input_index += 1
"""
Sample Output:
while requests:
request = requests[0]
if input_index >= len(user_inputs):
break
user_input = user_inputs[input_index]
print(f"\nUser: {user_input}")
User: Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.
[Found HostedFileContent: file_id=assistant-JT1sA...]
responses = {request.request_id: HandoffAgentUserRequest.create_response(user_input)}
events = await _drain(workflow.run(stream=True, responses=responses))
requests, file_ids = _handle_events(events)
all_file_ids.extend(file_ids)
input_index += 1
=== Conversation So Far ===
- user: Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.
- triage_agent: I am handing off your request to create the text file "hello.txt" with the specified content to the code specialist. They will assist you shortly.
- code_specialist: The file "hello.txt" has been created with the content "Hello from handoff workflow!". You can download it using the link below:
print("\n" + "=" * 50)
if all_file_ids:
print(f"SUCCESS: Found {len(all_file_ids)} file ID(s) in handoff workflow:")
for fid in all_file_ids:
print(f" - {fid}")
else:
print("WARNING: No file IDs captured from the handoff workflow.")
print("=" * 50)
[hello.txt](sandbox:/mnt/data/hello.txt)
===========================
"""
Sample Output:
[status] IDLE_WITH_PENDING_REQUESTS
User: Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.
[Found HostedFileContent: file_id=assistant-JT1sA...]
User: exit
[status] IDLE
=== Conversation So Far ===
- user: Please create a text file called hello.txt with 'Hello from handoff workflow!' inside it.
- triage_agent: I am handing off your request to create the text file "hello.txt" with the specified content to the code specialist. They will assist you shortly.
- code_specialist: The file "hello.txt" has been created with the content "Hello from handoff workflow!". You can download it using the link below:
==================================================
SUCCESS: Found 1 file ID(s) in handoff workflow:
- assistant-JT1sA...
==================================================
""" # noqa: E501
[hello.txt](sandbox:/mnt/data/hello.txt)
===========================
[status] IDLE_WITH_PENDING_REQUESTS
User: exit
[status] IDLE
==================================================
SUCCESS: Found 1 file ID(s) in handoff workflow:
- assistant-JT1sA...
==================================================
""" # noqa: E501
if __name__ == "__main__":
@@ -5,7 +5,7 @@
# ]
# ///
# Run with any PEP 723 compatible runner, e.g.:
# uv run samples/getting_started/evaluation/self_reflection/self_reflection.py
# uv run samples/05-end-to-end/evaluation/self_reflection/self_reflection.py
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
@@ -80,13 +80,15 @@ def create_eval(client: openai.OpenAI, judge_model: str) -> openai.types.EvalCre
"include_sample_schema": True,
})
testing_criteria = [{
"type": "azure_ai_evaluator",
"name": "groundedness",
"evaluator_name": "builtin.groundedness",
"data_mapping": {"query": "{{item.query}}", "response": "{{item.response}}", "context": "{{item.context}}"},
"initialization_parameters": {"deployment_name": f"{judge_model}"},
}]
testing_criteria = [
{
"type": "azure_ai_evaluator",
"name": "groundedness",
"evaluator_name": "builtin.groundedness",
"data_mapping": {"query": "{{item.query}}", "response": "{{item.response}}", "context": "{{item.context}}"},
"initialization_parameters": {"deployment_name": f"{judge_model}"},
}
]
return client.evals.create(
name="Eval",
@@ -96,11 +98,11 @@ def create_eval(client: openai.OpenAI, judge_model: str) -> openai.types.EvalCre
def run_eval(
client: openai.OpenAI,
eval_object: openai.types.EvalCreateResponse,
query: str,
response: str,
context: str,
client: openai.OpenAI,
eval_object: openai.types.EvalCreateResponse,
query: str,
response: str,
context: str,
):
eval_run_object = client.evals.runs.create(
eval_id=eval_object.id,
@@ -129,7 +131,9 @@ def run_eval(
for _ in range(0, MAX_RETRY):
run = client.evals.runs.retrieve(run_id=eval_run_response.id, eval_id=eval_object.id)
if run.status == "failed":
print(f"Eval run failed. Run ID: {run.id}, Status: {run.status}, Error: {getattr(run, 'error', 'Unknown error')}")
print(
f"Eval run failed. Run ID: {run.id}, Status: {run.status}, Error: {getattr(run, 'error', 'Unknown error')}"
)
continue
if run.status == "completed":
return list(client.evals.runs.output_items.list(run_id=run.id, eval_id=eval_object.id))
@@ -201,7 +205,7 @@ async def execute_query_with_self_reflection(
continue
score = eval_run_output_items[0].results[0].score
end_time_eval = time.time()
total_groundedness_eval_time += (end_time_eval - start_time_eval)
total_groundedness_eval_time += end_time_eval - start_time_eval
# Store score in structured format
iteration_scores.append(score)
@@ -259,7 +263,7 @@ async def run_self_reflection_batch(
judge_model: str = DEFAULT_JUDGE_MODEL,
max_self_reflections: int = 3,
env_file: str | None = None,
limit: int | None = None
limit: int | None = None,
):
"""
Run self-reflection on a batch of prompts.
@@ -298,8 +302,15 @@ async def run_self_reflection_batch(
print(f"Processing first {len(df)} prompts (limited by -n {limit})")
# Validate required columns
required_columns = ["system_instruction", "user_request", "context_document",
"full_prompt", "domain", "type", "high_level_type"]
required_columns = [
"system_instruction",
"user_request",
"context_document",
"full_prompt",
"domain",
"type",
"high_level_type",
]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Input file missing required columns: {missing_columns}")
@@ -341,13 +352,15 @@ async def run_self_reflection_batch(
"agent_response_model": agent_model,
"agent_response": result,
"error": None,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
}
results.append(result_data)
print(f" ✓ Completed with score: {result['best_response_score']}/5 "
f"(best at iteration {result['best_iteration']}/{result['num_retries']}, "
f"time: {result['total_end_to_end_time']:.1f}s)\n")
print(
f" ✓ Completed with score: {result['best_response_score']}/5 "
f"(best at iteration {result['best_iteration']}/{result['num_retries']}, "
f"time: {result['total_end_to_end_time']:.1f}s)\n"
)
except Exception as e:
print(f" ✗ Error: {str(e)}\n")
@@ -365,7 +378,7 @@ async def run_self_reflection_batch(
"agent_response_model": agent_model,
"agent_response": None,
"error": str(e),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
}
results.append(error_data)
continue
@@ -391,14 +404,20 @@ async def run_self_reflection_batch(
# Extract scores and iteration data from nested agent_response dict
best_scores = [r["best_response_score"] for r in successful_runs["agent_response"] if r is not None]
iterations = [r["best_iteration"] for r in successful_runs["agent_response"] if r is not None]
iteration_scores_list = [r["iteration_scores"] for r in successful_runs["agent_response"] if r is not None and "iteration_scores" in r]
iteration_scores_list = [
r["iteration_scores"]
for r in successful_runs["agent_response"]
if r is not None and "iteration_scores" in r
]
if best_scores:
avg_score = sum(best_scores) / len(best_scores)
perfect_scores = sum(1 for s in best_scores if s == 5)
print("\nGroundedness Scores:")
print(f" Average best score: {avg_score:.2f}/5")
print(f" Perfect scores (5/5): {perfect_scores}/{len(best_scores)} ({100 * perfect_scores / len(best_scores):.1f}%)")
print(
f" Perfect scores (5/5): {perfect_scores}/{len(best_scores)} ({100 * perfect_scores / len(best_scores):.1f}%)"
)
# Calculate improvement metrics
if iteration_scores_list:
@@ -416,7 +435,9 @@ async def run_self_reflection_batch(
print(f" Average first score: {avg_first_score:.2f}/5")
print(f" Average final score: {avg_last_score:.2f}/5")
print(f" Average improvement: +{avg_improvement:.2f}")
print(f" Responses that improved: {improved_count}/{len(improvements)} ({100 * improved_count / len(improvements):.1f}%)")
print(
f" Responses that improved: {improved_count}/{len(improvements)} ({100 * improved_count / len(improvements):.1f}%)"
)
# Show iteration statistics
if iterations:
@@ -432,13 +453,29 @@ async def run_self_reflection_batch(
async def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(description="Run self-reflection loop on LLM prompts with groundedness evaluation")
parser.add_argument("--input", "-i", default="resources/suboptimal_groundedness_prompts.jsonl", help="Input JSONL file with prompts")
parser.add_argument(
"--input", "-i", default="resources/suboptimal_groundedness_prompts.jsonl", help="Input JSONL file with prompts"
)
parser.add_argument("--output", "-o", default="resources/results.jsonl", help="Output JSONL file for results")
parser.add_argument("--agent-model", "-m", default=DEFAULT_AGENT_MODEL, help=f"Agent model deployment name (default: {DEFAULT_AGENT_MODEL})")
parser.add_argument("--judge-model", "-e", default=DEFAULT_JUDGE_MODEL, help=f"Judge model deployment name (default: {DEFAULT_JUDGE_MODEL})")
parser.add_argument("--max-reflections", type=int, default=3, help="Maximum number of self-reflection iterations (default: 3)")
parser.add_argument(
"--agent-model",
"-m",
default=DEFAULT_AGENT_MODEL,
help=f"Agent model deployment name (default: {DEFAULT_AGENT_MODEL})",
)
parser.add_argument(
"--judge-model",
"-e",
default=DEFAULT_JUDGE_MODEL,
help=f"Judge model deployment name (default: {DEFAULT_JUDGE_MODEL})",
)
parser.add_argument(
"--max-reflections", type=int, default=3, help="Maximum number of self-reflection iterations (default: 3)"
)
parser.add_argument("--env-file", help="Path to .env file with Azure OpenAI credentials")
parser.add_argument("--limit", "-n", type=int, default=None, help="Process only the first N prompts from the input file")
parser.add_argument(
"--limit", "-n", type=int, default=None, help="Process only the first N prompts from the input file"
)
args = parser.parse_args()
@@ -451,7 +488,7 @@ async def main():
judge_model=args.judge_model,
max_self_reflections=args.max_reflections,
env_file=args.env_file,
limit=args.limit
limit=args.limit,
)
print("\n✓ Processing complete!")