[BREAKING] Python: Move single-config fluent methods to constructor parameters (#3693)

* Move single-config fluent methods to constructor parameters

* Updates

* Adjust magentic and group chat
This commit is contained in:
Evan Mattson
2026-02-07 15:01:52 +09:00
committed by GitHub
Unverified
parent 5d355ac507
commit 74ac470a56
132 changed files with 1341 additions and 2386 deletions
@@ -77,7 +77,7 @@ async def run_agent_framework() -> None:
)
# Create sequential workflow
workflow = SequentialBuilder().participants([researcher, writer, editor]).build()
workflow = SequentialBuilder(participants=[researcher, writer, editor]).build()
# Run the workflow
print("[Agent Framework] Sequential conversation:")
@@ -137,7 +137,7 @@ async def run_agent_framework_with_cycle() -> None:
await context.send_message(AgentExecutorRequest(messages=response.full_conversation, should_respond=True))
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor=researcher)
.add_edge(researcher, writer)
.add_edge(writer, editor)
.add_edge(
@@ -145,7 +145,6 @@ async def run_agent_framework_with_cycle() -> None:
check_approval,
)
.add_edge(check_approval, researcher)
.set_start_executor(researcher)
.build()
)
@@ -85,18 +85,14 @@ async def run_agent_framework() -> None:
description="Expert in databases and SQL",
)
workflow = (
GroupChatBuilder()
.participants([python_expert, javascript_expert, database_expert])
.with_orchestrator(
agent=client.as_agent(
name="selector_manager",
instructions="Based on the conversation, select the most appropriate expert to respond next.",
),
)
.with_max_rounds(1)
.build()
)
workflow = GroupChatBuilder(
participants=[python_expert, javascript_expert, database_expert],
max_rounds=1,
orchestrator_agent=client.as_agent(
name="selector_manager",
instructions="Based on the conversation, select the most appropriate expert to respond next.",
),
).build()
# Run with a question that requires expert selection
print("[Agent Framework] Group chat conversation:")
@@ -138,10 +138,10 @@ async def run_agent_framework() -> None:
HandoffBuilder(
name="support_handoff",
participants=[triage_agent, billing_agent, tech_support],
termination_condition=lambda conv: sum(1 for msg in conv if msg.role == "user") > 3,
)
.with_start_agent(triage_agent)
.add_handoff(triage_agent, [billing_agent, tech_support])
.with_termination_condition(lambda conv: sum(1 for msg in conv if msg.role == "user") > 3)
.build()
)
@@ -91,21 +91,17 @@ async def run_agent_framework() -> None:
)
# Create Magentic workflow
workflow = (
MagenticBuilder()
.participants([researcher, coder, reviewer])
.with_manager(
agent=client.as_agent(
name="magentic_manager",
instructions="You coordinate a team to complete complex tasks efficiently.",
description="Orchestrator for team coordination",
),
max_round_count=20,
max_stall_count=3,
max_reset_count=1,
)
.build()
)
workflow = MagenticBuilder(
participants=[researcher, coder, reviewer],
manager_agent=client.as_agent(
name="magentic_manager",
instructions="You coordinate a team to complete complex tasks efficiently.",
description="Orchestrator for team coordination",
),
max_round_count=20,
max_stall_count=3,
max_reset_count=1,
).build()
# Run complex task
last_message_id: str | None = None
@@ -31,7 +31,7 @@ def main():
)
# Build a concurrent workflow
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
# Convert the workflow to an agent
workflow_agent = workflow.as_agent()
@@ -319,8 +319,7 @@ async def _create_workflow(project_client, credential):
# 7. booking_info_aggregation, booking_payment, activity_search → final_coordinator (final aggregation, fan-in)
workflow = (
WorkflowBuilder(name="Travel Planning Workflow")
.set_start_executor(start_executor)
WorkflowBuilder(name="Travel Planning Workflow", start_executor=start_executor)
.add_edge(start_executor, travel_request_handler)
.add_fan_out_edges(travel_request_handler, [hotel_search_agent, flight_search_agent, activity_search_agent])
.add_edge(hotel_search_agent, booking_info_aggregation_agent)
@@ -8,7 +8,6 @@ from agent_framework import (
AgentResponseUpdate,
ChatAgent,
CitationAnnotation,
Content,
HostedCodeInterpreterTool,
HostedFileContent,
TextContent,
@@ -4,7 +4,7 @@ import asyncio
import base64
import anyio
from agent_framework import Content, HostedImageGenerationTool
from agent_framework import HostedImageGenerationTool
from agent_framework.openai import OpenAIResponsesClient
"""OpenAI Responses Client Streaming Image Generation Example
@@ -662,8 +662,8 @@ def create_complex_workflow():
WorkflowBuilder(
name="Data Processing Pipeline",
description="Complex workflow with parallel validation, transformation, and quality assurance stages",
start_executor=data_ingestion,
)
.set_start_executor(data_ingestion)
# Fan-out to validation stage
.add_fan_out_edges(data_ingestion, [schema_validator, quality_validator, security_validator])
# Fan-in from validation to aggregator
@@ -102,8 +102,8 @@ def main():
WorkflowBuilder(
name="Text Transformer",
description="Simple 2-step workflow that converts text to uppercase and adds exclamation",
start_executor=upper_executor,
)
.set_start_executor(upper_executor)
.add_edge(upper_executor, exclaim_executor)
.build()
)
@@ -392,13 +392,13 @@ legitimate_message_handler = LegitimateMessageHandler(id="legitimate_message_han
final_processor = FinalProcessor(id="final_processor")
# Build the comprehensive 4-step workflow with branching logic and HIL support
# Note: No .with_checkpointing() call - DevUI will pass checkpoint_storage at runtime
# Note: No checkpoint_storage in constructor - DevUI will pass checkpoint_storage at runtime
workflow = (
WorkflowBuilder(
name="Email Spam Detector",
description="4-step email classification workflow with human-in-the-loop spam approval",
start_executor=email_preprocessor,
)
.set_start_executor(email_preprocessor)
.add_edge(email_preprocessor, spam_detector)
# HIL handled within spam_detector via @response_handler
# Continue with branching logic after human approval
@@ -132,8 +132,8 @@ workflow = (
WorkflowBuilder(
name="Content Review Workflow",
description="Multi-agent content creation workflow with quality-based routing (Writer → Reviewer → Editor/Publisher)",
start_executor=writer,
)
.set_start_executor(writer)
.add_edge(writer, reviewer)
# Branch 1: High quality (>= 80) goes directly to publisher
.add_edge(reviewer, publisher, condition=is_approved)
@@ -81,9 +81,8 @@ async def run_sequential_workflow() -> None:
# Step 2: Build the workflow with the defined edges.
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor=upper_case_executor)
.add_edge(upper_case_executor, reverse_text_executor)
.set_start_executor(upper_case_executor)
.build()
)
@@ -17,7 +17,7 @@ The default aggregator fans in their results and yields output containing
a list[ChatMessage] representing the concatenated conversations from all agents.
Demonstrates:
- Minimal wiring with ConcurrentBuilder().participants([...]).build()
- Minimal wiring with ConcurrentBuilder(participants=[...]).build()
- Fan-out to multiple agents, fan-in aggregation of final ChatMessages
- Workflow completion when idle with no pending work
@@ -57,7 +57,7 @@ async def main() -> None:
# 2) Build a concurrent workflow
# Participants are either Agents (type of SupportsAgentRun) or Executors
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
# 3) Run with a single prompt and pretty-print the final combined messages
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
@@ -27,7 +27,7 @@ ConcurrentBuilder API and the default aggregator.
Demonstrates:
- Executors that create their ChatAgent in __init__ (via AzureOpenAIChatClient)
- A @handler that converts AgentExecutorRequest -> AgentExecutorResponse
- ConcurrentBuilder().participants([...]) to build fan-out/fan-in
- ConcurrentBuilder(participants=[...]) to build fan-out/fan-in
- Default aggregator returning list[ChatMessage] (one user + one assistant per agent)
- Workflow completion when all participants become idle
@@ -103,7 +103,7 @@ async def main() -> None:
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()
@@ -18,7 +18,7 @@ to synthesize a concise, consolidated summary from the experts' outputs.
The workflow completes when all participants become idle.
Demonstrates:
- ConcurrentBuilder().participants([...]).with_aggregator(callback)
- ConcurrentBuilder(participants=[...]).with_aggregator(callback)
- Fan-out to agents and fan-in at an aggregator
- Aggregation implemented via an LLM call (chat_client.get_response)
- Workflow output yielded with the synthesized summary string
@@ -87,7 +87,7 @@ async def main() -> None:
# • Custom callback -> return value becomes workflow output (string here)
# The callback can be sync or async; it receives list[AgentExecutorResponse].
workflow = (
ConcurrentBuilder().participants([researcher, marketer, legal]).with_aggregator(summarize_results).build()
ConcurrentBuilder(participants=[researcher, marketer, legal]).with_aggregator(summarize_results).build()
)
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
@@ -33,7 +33,7 @@ instances created by the same builder. This is particularly useful when you need
requests or tasks in parallel with stateful participants.
Demonstrates:
- ConcurrentBuilder().register_participants([...]).with_aggregator(callback)
- ConcurrentBuilder(participant_factories=[...]).with_aggregator(callback)
- Fan-out to agents and fan-in at an aggregator
- Aggregation implemented via an LLM call (chat_client.get_response)
- Workflow output yielded with the synthesized summary string
@@ -125,8 +125,7 @@ async def main() -> None:
# SupportsAgentRun (agents) or Executor instances.
# - register_aggregator(...) takes a factory function that returns an Executor instance.
concurrent_builder = (
ConcurrentBuilder()
.register_participants([create_researcher, create_marketer, create_legal])
ConcurrentBuilder(participant_factories=[create_researcher, create_marketer, create_legal])
.register_aggregator(SummarizationExecutor)
)
@@ -65,16 +65,20 @@ async def main() -> None:
)
# Build the group chat workflow
# termination_condition: stop after 4 assistant messages
# (The agent orchestrator will intelligently decide when to end before this limit but just in case)
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = (
GroupChatBuilder()
.with_orchestrator(agent=orchestrator_agent)
.participants([researcher, writer])
GroupChatBuilder(
participants=[researcher, writer],
termination_condition=lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 4,
intermediate_outputs=True,
orchestrator_agent=orchestrator_agent,
)
# Set a hard termination condition: stop after 4 assistant messages
# The agent orchestrator will intelligently decide when to end before this limit but just in case
.with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 4)
# Enable intermediate outputs to observe the conversation as it unfolds
# Intermediate outputs will be emitted as WorkflowEvent with type "output" events
.with_intermediate_outputs()
.build()
)
@@ -207,14 +207,17 @@ Share your perspective authentically. Feel free to:
chat_client=_get_chat_client(),
)
# termination_condition: stop after 10 assistant messages
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = (
GroupChatBuilder()
.with_orchestrator(agent=moderator)
.participants([farmer, developer, teacher, activist, spiritual_leader, artist, immigrant, doctor])
GroupChatBuilder(
participants=[farmer, developer, teacher, activist, spiritual_leader, artist, immigrant, doctor],
termination_condition=lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 10,
intermediate_outputs=True,
orchestrator_agent=moderator,
)
.with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 10)
# Enable intermediate outputs to observe the conversation as it unfolds
# Intermediate outputs will be emitted as WorkflowEvent with type "output" events
.with_intermediate_outputs()
.build()
)
@@ -16,7 +16,7 @@ from azure.identity import AzureCliCredential
Sample: Group Chat with a round-robin speaker selector
What it does:
- Demonstrates the with_orchestrator() API for GroupChat orchestration
- Demonstrates the selection_func parameter for GroupChat orchestration
- Uses a pure Python function to control speaker selection based on conversation state
Prerequisites:
@@ -80,19 +80,26 @@ async def main() -> None:
)
# Build the group chat workflow
# termination_condition: stop after 6 messages (user task + one full rounds + 1)
# One round is expert -> verifier -> clarifier -> skeptic, after which the expert gets to respond again.
# This will end the conversation after the expert has spoken 2 times (one iteration loop)
# Note: it's possible that the expert gets it right the first time and the other participants
# have nothing to add, but for demo purposes we want to see at least one full round of interaction.
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = (
GroupChatBuilder()
.participants([expert, verifier, clarifier, skeptic])
.with_orchestrator(selection_func=round_robin_selector)
GroupChatBuilder(
participants=[expert, verifier, clarifier, skeptic],
termination_condition=lambda conversation: len(conversation) >= 6,
intermediate_outputs=True,
selection_func=round_robin_selector,
)
# Set a hard termination condition: stop after 6 messages (user task + one full rounds + 1)
# One round is expert -> verifier -> clarifier -> skeptic, after which the expert gets to respond again.
# This will end the conversation after the expert has spoken 2 times (one iteration loop)
# Note: it's possible that the expert gets it right the first time and the other participants
# have nothing to add, but for demo purposes we want to see at least one full round of interaction.
.with_termination_condition(lambda conversation: len(conversation) >= 6)
# Enable intermediate outputs to observe the conversation as it unfolds
# Intermediate outputs will be emitted as WorkflowEvent with type "output" events
.with_intermediate_outputs()
.build()
)
@@ -78,10 +78,15 @@ async def main() -> None:
# Build the workflow with autonomous mode
# In autonomous mode, agents continue iterating until they invoke a handoff tool
# termination_condition: Terminate after coordinator provides 5 assistant responses
workflow = (
HandoffBuilder(
name="autonomous_iteration_handoff",
participants=[coordinator, research_agent, summary_agent],
termination_condition=lambda conv: sum(
1 for msg in conv if msg.author_name == "coordinator" and msg.role == "assistant"
)
>= 5,
)
.with_start_agent(coordinator)
.add_handoff(coordinator, [research_agent, summary_agent])
@@ -98,10 +103,6 @@ async def main() -> None:
resolve_agent_id(summary_agent): 5,
}
)
.with_termination_condition(
# Terminate after coordinator provides 5 assistant responses
lambda conv: sum(1 for msg in conv if msg.author_name == "coordinator" and msg.role == "assistant") >= 5
)
.build()
)
@@ -217,6 +217,9 @@ async def _run_workflow(workflow: Workflow, user_inputs: list[str]) -> None:
async def main() -> None:
"""Run the autonomous handoff workflow with participant factories."""
# Build the handoff workflow using participant factories
# termination_condition: Custom termination that checks if the triage agent has provided a closing message.
# This looks for the last message being from triage_agent and containing "welcome",
# which indicates the conversation has concluded naturally.
workflow_builder = (
HandoffBuilder(
name="Autonomous Handoff with Participant Factories",
@@ -226,18 +229,13 @@ async def main() -> None:
"order_status": create_order_status_agent,
"return": create_return_agent,
},
)
.with_start_agent("triage")
.with_termination_condition(
# Custom termination: Check if the triage agent has provided a closing message.
# This looks for the last message being from triage_agent and containing "welcome",
# which indicates the conversation has concluded naturally.
lambda conversation: (
termination_condition=lambda conversation: (
len(conversation) > 0
and conversation[-1].author_name == "triage_agent"
and "welcome" in conversation[-1].text.lower()
)
),
)
.with_start_agent("triage")
)
# Scripted user responses for reproducible demo
@@ -198,7 +198,7 @@ async def main() -> None:
# - participants: All agents that can participate in the workflow
# - with_start_agent: The triage agent is designated as the start agent, which means
# it receives all user input first and orchestrates handoffs to specialists
# - with_termination_condition: Custom logic to stop the request/response loop.
# - termination_condition: Custom logic to stop the request/response loop.
# Without this, the default behavior continues requesting user input until max_turns
# is reached. Here we use a custom condition that checks if the conversation has ended
# naturally (when one of the agents says something like "you're welcome").
@@ -206,14 +206,14 @@ async def main() -> None:
HandoffBuilder(
name="customer_support_handoff",
participants=[triage, refund, order, support],
)
.with_start_agent(triage)
.with_termination_condition(
# Custom termination: Check if one of the agents has provided a closing message.
# This looks for the last message containing "welcome", which indicates the
# conversation has concluded naturally.
lambda conversation: len(conversation) > 0 and "welcome" in conversation[-1].text.lower()
termination_condition=lambda conversation: (
len(conversation) > 0 and "welcome" in conversation[-1].text.lower()
),
)
.with_start_agent(triage)
.build()
)
@@ -163,10 +163,11 @@ async def main() -> None:
async with create_agents(credential) as (triage, code_specialist):
workflow = (
HandoffBuilder()
HandoffBuilder(
termination_condition=lambda conv: sum(1 for msg in conv if msg.role == "user") >= 2,
)
.participants([triage, code_specialist])
.with_start_agent(triage)
.with_termination_condition(lambda conv: sum(1 for msg in conv if msg.role == "user") >= 2)
.build()
)
@@ -72,20 +72,16 @@ async def main() -> None:
print("\nBuilding Magentic Workflow...")
workflow = (
MagenticBuilder()
.participants([researcher_agent, coder_agent])
.with_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
# Enable intermediate outputs to observe the conversation as it unfolds
# Intermediate outputs will be emitted as WorkflowEvent events
.with_intermediate_outputs()
.build()
)
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = MagenticBuilder(
participants=[researcher_agent, coder_agent],
intermediate_outputs=True,
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
).build()
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
@@ -76,18 +76,14 @@ def build_workflow(checkpoint_storage: FileCheckpointStorage):
# The builder wires in the Magentic orchestrator, sets the plan review path, and
# stores the checkpoint backend so the runtime knows where to persist snapshots.
return (
MagenticBuilder()
.participants([researcher, writer])
.with_plan_review()
.with_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
)
.with_checkpointing(checkpoint_storage)
.build()
)
return MagenticBuilder(
participants=[researcher, writer],
enable_plan_review=True,
checkpoint_storage=checkpoint_storage,
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=3,
).build()
async def main() -> None:
@@ -115,22 +115,18 @@ async def main() -> None:
print("\nBuilding Magentic Workflow with Human Plan Review...")
workflow = (
MagenticBuilder()
.participants([researcher_agent, analyst_agent])
.with_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=1,
max_reset_count=2,
)
# Request human input for plan review
.with_plan_review()
# Enable intermediate outputs to observe the conversation as it unfolds
# Intermediate outputs will be emitted as WorkflowEvent with type "output"
.with_intermediate_outputs()
.build()
)
# enable_plan_review=True: Request human input for plan review
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = MagenticBuilder(
participants=[researcher_agent, analyst_agent],
enable_plan_review=True,
intermediate_outputs=True,
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=1,
max_reset_count=2,
).build()
task = "Research sustainable aviation fuel technology and summarize the findings."
@@ -43,7 +43,7 @@ async def main() -> None:
)
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder().participants([writer, reviewer]).build()
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
# 3) Run and collect outputs
outputs: list[list[ChatMessage]] = []
@@ -66,7 +66,7 @@ async def main() -> None:
# 2) Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder().participants([content, summarizer]).build()
workflow = SequentialBuilder(participants=[content, summarizer]).build()
# 3) Run workflow and extract final conversation
events = await workflow.run("Explain the benefits of budget eBikes for commuters.")
@@ -70,7 +70,7 @@ async def run_workflow(workflow: Workflow, query: str) -> None:
async def main() -> None:
# 1) Create a builder with participant factories
builder = SequentialBuilder().register_participants([
builder = SequentialBuilder(participant_factories=[
lambda: Accumulate("accumulator"),
create_agent,
])
@@ -160,10 +160,10 @@ async def main():
upper_case = UpperCase(id="upper_case_executor")
# Build the workflow using a fluent pattern:
# 1) add_edge(from_node, to_node) defines a directed edge upper_case -> reverse_text
# 2) set_start_executor(node) declares the entry point
# 1) start_executor=... in constructor declares the entry point
# 2) add_edge(from_node, to_node) defines a directed edge upper_case -> reverse_text
# 3) build() finalizes and returns an immutable Workflow object
workflow1 = WorkflowBuilder().add_edge(upper_case, reverse_text).set_start_executor(upper_case).build()
workflow1 = WorkflowBuilder(start_executor=upper_case).add_edge(upper_case, reverse_text).build()
# Run the workflow by sending the initial message to the start node.
# The run(...) call returns an event collection; its get_outputs() method
@@ -181,10 +181,9 @@ async def main():
# exclamation_adder uses @handler(input=str, output=str) to
# explicitly declare types instead of relying on introspection.
workflow2 = (
WorkflowBuilder()
WorkflowBuilder(start_executor=upper_case)
.add_edge(upper_case, exclamation_adder)
.add_edge(exclamation_adder, reverse_text)
.set_start_executor(upper_case)
.build()
)
@@ -45,8 +45,8 @@ async def main():
)
# Build the workflow using the fluent builder.
# Set the start node and connect an edge from writer to reviewer.
workflow = WorkflowBuilder().set_start_executor(writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Set the start node via constructor and connect an edge from writer to reviewer.
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Run the workflow with the user's initial message.
# For foundational clarity, use run (non streaming) and print the terminal event.
@@ -44,8 +44,8 @@ async def main():
)
# Build the workflow using the fluent builder.
# Set the start node and connect an edge from writer to reviewer.
workflow = WorkflowBuilder().set_start_executor(writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Set the start node via constructor and connect an edge from writer to reviewer.
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Track the last author to format streaming output.
last_author: str | None = None
@@ -73,12 +73,11 @@ async def main():
# 4) set_start_executor(node) declares the entry point
# 5) build() finalizes and returns an immutable Workflow object
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="UpperCase")
.register_executor(lambda: UpperCase(id="upper_case_executor"), name="UpperCase")
.register_executor(lambda: reverse_text, name="ReverseText")
.register_agent(create_agent, name="DecoderAgent")
.add_chain(["UpperCase", "ReverseText", "DecoderAgent"])
.set_start_executor("UpperCase")
.build()
)
@@ -38,8 +38,8 @@ async def main() -> None:
)
# Build the workflow by adding agents directly as edges.
# Agents adapt to workflow mode: run(stream=True) for complete responses, run() for incremental updates.
workflow = WorkflowBuilder().set_start_executor(writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Agents adapt to workflow mode: run(stream=True) for incremental updates, run() for complete responses.
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Track the last author to format streaming output.
last_author: str | None = None
@@ -71,7 +71,7 @@ async def main() -> None:
shared_thread.message_store = ChatMessageStore()
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="writer")
.register_agent(factory_func=lambda: writer, name="writer", agent_thread=shared_thread)
.register_agent(factory_func=lambda: reviewer, name="reviewer", agent_thread=shared_thread)
.register_executor(
@@ -79,7 +79,6 @@ async def main() -> None:
name="intercept_agent_response",
)
.add_chain(["writer", "intercept_agent_response", "reviewer"])
.set_start_executor("writer")
.build()
)
@@ -110,8 +110,7 @@ async def main() -> None:
)
workflow = (
WorkflowBuilder()
.set_start_executor(research_agent)
WorkflowBuilder(start_executor=research_agent)
.add_edge(research_agent, enrich_with_references)
.add_edge(enrich_with_references, final_editor_agent)
.build()
@@ -40,7 +40,7 @@ async def main():
# Build the workflow using the fluent builder.
# Set the start node and connect an edge from writer to reviewer.
# Agents adapt to workflow mode: run(stream=True) for incremental updates, run() for complete responses.
workflow = WorkflowBuilder().set_start_executor(writer_agent).add_edge(writer_agent, reviewer_agent).build()
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Track the last author to format streaming output.
last_author: str | None = None
@@ -240,7 +240,7 @@ async def main() -> None:
# Build the workflow.
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="writer_agent")
.register_agent(create_writer_agent, name="writer_agent")
.register_agent(create_final_editor_agent, name="final_editor_agent")
.register_executor(
@@ -251,7 +251,6 @@ async def main() -> None:
),
name="coordinator",
)
.set_start_executor("writer_agent")
.add_edge("writer_agent", "coordinator")
.add_edge("coordinator", "writer_agent")
.add_edge("final_editor_agent", "coordinator")
@@ -65,7 +65,7 @@ async def main() -> None:
)
# 2) Build a concurrent workflow
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
# 3) Expose the concurrent workflow as an agent for easy reuse
agent = workflow.as_agent(name="ConcurrentWorkflowAgent")
@@ -113,7 +113,7 @@ async def main():
# Build the workflow using the fluent builder.
# Set the start node and connect an edge from writer to reviewer.
workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build()
workflow = WorkflowBuilder(start_executor=writer).add_edge(writer, reviewer).build()
# Run the workflow with the user's initial message.
# For foundational clarity, use run (non streaming) and print the workflow output.
@@ -33,20 +33,16 @@ async def main() -> None:
chat_client=OpenAIResponsesClient(),
)
workflow = (
GroupChatBuilder()
.with_orchestrator(
agent=OpenAIChatClient().as_agent(
name="Orchestrator",
instructions="You coordinate a team conversation to solve the user's task.",
)
)
.participants([researcher, writer])
# Enable intermediate outputs to observe the conversation as it unfolds
# Intermediate outputs will be emitted as WorkflowEvent with type "output" events
.with_intermediate_outputs()
.build()
)
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = GroupChatBuilder(
participants=[researcher, writer],
intermediate_outputs=True,
orchestrator_agent=OpenAIChatClient().as_agent(
name="Orchestrator",
instructions="You coordinate a team conversation to solve the user's task.",
),
).build()
task = "Outline the core considerations for planning a community hackathon, and finish with a concise action plan."
@@ -156,7 +156,7 @@ async def main() -> None:
# - participants: All agents that can participate in the workflow
# - with_start_agent: The triage agent is designated as the start agent, which means
# it receives all user input first and orchestrates handoffs to specialists
# - with_termination_condition: Custom logic to stop the request/response loop.
# - termination_condition: Custom logic to stop the request/response loop.
# Without this, the default behavior continues requesting user input until max_turns
# is reached. Here we use a custom condition that checks if the conversation has ended
# naturally (when one of the agents says something like "you're welcome").
@@ -164,14 +164,14 @@ async def main() -> None:
HandoffBuilder(
name="customer_support_handoff",
participants=[triage, refund, order, support],
)
.with_start_agent(triage)
.with_termination_condition(
# Custom termination: Check if one of the agents has provided a closing message.
# This looks for the last message containing "welcome", which indicates the
# conversation has concluded naturally.
lambda conversation: len(conversation) > 0 and "welcome" in conversation[-1].text.lower()
termination_condition=lambda conversation: (
len(conversation) > 0 and "welcome" in conversation[-1].text.lower()
),
)
.with_start_agent(triage)
.build()
.as_agent() # Convert workflow to agent interface
)
@@ -50,20 +50,16 @@ async def main() -> None:
print("\nBuilding Magentic Workflow...")
workflow = (
MagenticBuilder()
.participants([researcher_agent, coder_agent])
.with_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
# Enable intermediate outputs to observe the conversation as it unfolds
# Intermediate outputs will be emitted as WorkflowEvent with type "output" events
.with_intermediate_outputs()
.build()
)
# intermediate_outputs=True: Enable intermediate outputs to observe the conversation as it unfolds
# (Intermediate outputs will be emitted as WorkflowOutputEvent events)
workflow = MagenticBuilder(
participants=[researcher_agent, coder_agent],
intermediate_outputs=True,
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
).build()
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
@@ -40,7 +40,7 @@ async def main() -> None:
)
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder().participants([writer, reviewer]).build()
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
# 3) Treat the workflow itself as an agent for follow-up invocations
agent = workflow.as_agent(name="SequentialWorkflowAgent")
@@ -99,7 +99,7 @@ async def main() -> None:
# Build a workflow with bidirectional communication between Worker and Reviewer,
# and escalation paths for human review.
agent = (
WorkflowBuilder()
WorkflowBuilder(start_executor="worker")
.register_executor(
lambda: Worker(
id="sub-worker",
@@ -113,7 +113,6 @@ async def main() -> None:
)
.add_edge("worker", "reviewer") # Worker sends requests to Reviewer
.add_edge("reviewer", "worker") # Reviewer sends feedback to Worker
.set_start_executor("worker")
.build()
.as_agent() # Convert workflow into an agent interface
)
@@ -94,7 +94,7 @@ async def main() -> None:
)
# Build a sequential workflow
workflow = SequentialBuilder().participants([agent]).build()
workflow = SequentialBuilder(participants=[agent]).build()
# Expose the workflow as an agent using .as_agent()
workflow_agent = workflow.as_agent(name="WorkflowAgent")
@@ -187,7 +187,7 @@ async def main() -> None:
print("Building workflow with Worker ↔ Reviewer cycle...")
agent = (
WorkflowBuilder()
WorkflowBuilder(start_executor="worker")
.register_executor(
lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")),
name="worker",
@@ -198,7 +198,6 @@ async def main() -> None:
)
.add_edge("worker", "reviewer") # Worker sends responses to Reviewer
.add_edge("reviewer", "worker") # Reviewer provides feedback to Worker
.set_start_executor("worker")
.build()
.as_agent() # Wrap workflow as an agent
)
@@ -59,7 +59,7 @@ async def main() -> None:
)
# Build a sequential workflow: assistant -> summarizer
workflow = SequentialBuilder().register_participants([create_assistant, create_summarizer]).build()
workflow = SequentialBuilder(participant_factories=[create_assistant, create_summarizer]).build()
# Wrap the workflow as an agent
agent = workflow.as_agent(name="ConversationalWorkflowAgent")
@@ -130,7 +130,7 @@ async def demonstrate_thread_serialization() -> None:
instructions="You are a helpful assistant with good memory. Remember details from our conversation.",
)
workflow = SequentialBuilder().register_participants([create_assistant]).build()
workflow = SequentialBuilder(participant_factories=[create_assistant]).build()
agent = workflow.as_agent(name="MemoryWorkflowAgent")
# Create initial thread and have a conversation
@@ -179,7 +179,9 @@ def create_workflow(checkpoint_storage: FileCheckpointStorage) -> Workflow:
# module docstring. Because `WorkflowBuilder` is declarative, reading these
# edges is often the quickest way to understand execution order.
workflow_builder = (
WorkflowBuilder(max_iterations=6)
WorkflowBuilder(
max_iterations=6, start_executor="prepare_brief", checkpoint_storage=checkpoint_storage
)
.register_agent(
lambda: AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions="Write concise, warm release notes that sound human and helpful.",
@@ -190,11 +192,9 @@ def create_workflow(checkpoint_storage: FileCheckpointStorage) -> Workflow:
)
.register_executor(lambda: ReviewGateway(id="review_gateway", writer_id="writer"), name="review_gateway")
.register_executor(lambda: BriefPreparer(id="prepare_brief", agent_id="writer"), name="prepare_brief")
.set_start_executor("prepare_brief")
.add_edge("prepare_brief", "writer")
.add_edge("writer", "review_gateway")
.add_edge("review_gateway", "writer") # revisions loop
.with_checkpointing(checkpoint_storage=checkpoint_storage)
)
return workflow_builder.build()
@@ -104,16 +104,14 @@ class WorkerExecutor(Executor):
async def main():
# Build workflow with checkpointing enabled
checkpoint_storage = InMemoryCheckpointStorage()
workflow_builder = (
WorkflowBuilder()
WorkflowBuilder(start_executor="start", checkpoint_storage=checkpoint_storage)
.register_executor(lambda: StartExecutor(id="start"), name="start")
.register_executor(lambda: WorkerExecutor(id="worker"), name="worker")
.set_start_executor("start")
.add_edge("start", "worker")
.add_edge("worker", "worker") # Self-loop for iterative processing
)
checkpoint_storage = InMemoryCheckpointStorage()
workflow_builder = workflow_builder.with_checkpointing(checkpoint_storage=checkpoint_storage)
# Run workflow with automatic checkpoint recovery
latest_checkpoint: WorkflowCheckpoint | None = None
@@ -97,17 +97,16 @@ def create_workflow(checkpoint_storage: FileCheckpointStorage) -> tuple[Workflow
client = AzureOpenAIChatClient(credential=AzureCliCredential())
triage, refund, order = create_agents(client)
# checkpoint_storage: Enable checkpointing for resume
# termination_condition: Terminate after 5 user messages for this demo
workflow = (
HandoffBuilder(
name="checkpoint_handoff_demo",
participants=[triage, refund, order],
checkpoint_storage=checkpoint_storage,
termination_condition=lambda conv: sum(1 for msg in conv if msg.role == "user") >= 5,
)
.with_start_agent(triage)
.with_checkpointing(checkpoint_storage)
.with_termination_condition(
# Terminate after 5 user messages for this demo
lambda conv: sum(1 for msg in conv if msg.role == "user") >= 5
)
.build()
)
@@ -298,11 +298,10 @@ class LaunchCoordinator(Executor):
def build_sub_workflow() -> WorkflowExecutor:
"""Assemble the sub-workflow used by the parent workflow executor."""
sub_workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="writer")
.register_executor(DraftWriter, name="writer")
.register_executor(DraftReviewRouter, name="router")
.register_executor(DraftFinaliser, name="finaliser")
.set_start_executor("writer")
.add_edge("writer", "router")
.add_edge("router", "finaliser")
.add_edge("finaliser", "writer") # permits revision loops
@@ -315,13 +314,11 @@ def build_sub_workflow() -> WorkflowExecutor:
def build_parent_workflow(storage: FileCheckpointStorage) -> Workflow:
"""Assemble the parent workflow that embeds the sub-workflow."""
return (
WorkflowBuilder()
WorkflowBuilder(start_executor="coordinator", checkpoint_storage=storage)
.register_executor(LaunchCoordinator, name="coordinator")
.register_executor(build_sub_workflow, name="sub_executor")
.set_start_executor("coordinator")
.add_edge("coordinator", "sub_executor")
.add_edge("sub_executor", "coordinator")
.with_checkpointing(storage)
.build()
)
@@ -56,7 +56,7 @@ async def basic_checkpointing() -> None:
)
# Build sequential workflow with participant factories
workflow = SequentialBuilder().register_participants([create_assistant, create_reviewer]).build()
workflow = SequentialBuilder(participant_factories=[create_assistant, create_reviewer]).build()
agent = workflow.as_agent(name="CheckpointedAgent")
# Create checkpoint storage
@@ -93,7 +93,7 @@ async def checkpointing_with_thread() -> None:
instructions="You are a helpful assistant with good memory. Reference previous conversation when relevant.",
)
workflow = SequentialBuilder().register_participants([create_assistant]).build()
workflow = SequentialBuilder(participant_factories=[create_assistant]).build()
agent = workflow.as_agent(name="MemoryAgent")
# Create both thread (for conversation) and checkpoint storage (for workflow state)
@@ -137,7 +137,7 @@ async def streaming_with_checkpoints() -> None:
instructions="You are a helpful assistant.",
)
workflow = SequentialBuilder().register_participants([create_assistant]).build()
workflow = SequentialBuilder(participant_factories=[create_assistant]).build()
agent = workflow.as_agent(name="StreamingCheckpointAgent")
checkpoint_storage = InMemoryCheckpointStorage()
@@ -141,9 +141,8 @@ def create_sub_workflow() -> WorkflowExecutor:
print("🚀 Setting up sub-workflow...")
processing_workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="text_processor")
.register_executor(TextProcessor, name="text_processor")
.set_start_executor("text_processor")
.build()
)
@@ -155,10 +154,9 @@ async def main():
print("🔧 Setting up parent workflow...")
# Step 1: Create the parent workflow
main_workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="text_orchestrator")
.register_executor(TextProcessingOrchestrator, name="text_orchestrator")
.register_executor(create_sub_workflow, name="text_processor_workflow")
.set_start_executor("text_orchestrator")
.add_edge("text_orchestrator", "text_processor_workflow")
.add_edge("text_processor_workflow", "text_orchestrator")
.build()
@@ -88,7 +88,7 @@ async def main() -> None:
)
# Build the inner (sub) workflow with the agent
inner_workflow = SequentialBuilder().participants([inner_agent]).build()
inner_workflow = SequentialBuilder(participants=[inner_agent]).build()
# Wrap the inner workflow in a WorkflowExecutor to use it as a sub-workflow
subworkflow_executor = WorkflowExecutor(
@@ -97,7 +97,7 @@ async def main() -> None:
)
# Build the outer (parent) workflow containing the sub-workflow
outer_workflow = SequentialBuilder().participants([subworkflow_executor]).build()
outer_workflow = SequentialBuilder(participants=[subworkflow_executor]).build()
# Define custom context that will flow through to the sub-workflow's agent
user_token = {
@@ -170,12 +170,11 @@ def build_resource_request_distribution_workflow() -> Workflow:
raise ValueError("Received more responses than expected")
return (
WorkflowBuilder()
WorkflowBuilder(start_executor="orchestrator")
.register_executor(lambda: RequestDistribution("orchestrator"), name="orchestrator")
.register_executor(lambda: ResourceRequester("resource_requester"), name="resource_requester")
.register_executor(lambda: PolicyChecker("policy_checker"), name="policy_checker")
.register_executor(lambda: ResultCollector("result_collector"), name="result_collector")
.set_start_executor("orchestrator")
.add_edge("orchestrator", "resource_requester")
.add_edge("orchestrator", "policy_checker")
.add_edge("resource_requester", "result_collector")
@@ -289,7 +288,7 @@ class PolicyEngine(Executor):
async def main() -> None:
# Build the main workflow
main_workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="sub_workflow_executor")
.register_executor(lambda: ResourceAllocator("resource_allocator"), name="resource_allocator")
.register_executor(lambda: PolicyEngine("policy_engine"), name="policy_engine")
.register_executor(
@@ -303,7 +302,6 @@ async def main() -> None:
),
name="sub_workflow_executor",
)
.set_start_executor("sub_workflow_executor")
.add_edge("sub_workflow_executor", "resource_allocator")
.add_edge("resource_allocator", "sub_workflow_executor")
.add_edge("sub_workflow_executor", "policy_engine")
@@ -154,11 +154,10 @@ def build_email_address_validation_workflow() -> Workflow:
# Build the workflow
return (
WorkflowBuilder()
WorkflowBuilder(start_executor="email_sanitizer")
.register_executor(lambda: EmailSanitizer(id="email_sanitizer"), name="email_sanitizer")
.register_executor(lambda: EmailFormatValidator(id="email_format_validator"), name="email_format_validator")
.register_executor(lambda: DomainValidator(id="domain_validator"), name="domain_validator")
.set_start_executor("email_sanitizer")
.add_edge("email_sanitizer", "email_format_validator")
.add_edge("email_format_validator", "domain_validator")
.build()
@@ -270,7 +269,7 @@ async def main() -> None:
# Build the main workflow
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="smart_email_orchestrator")
.register_executor(
lambda: SmartEmailOrchestrator(id="smart_email_orchestrator", approved_domains=approved_domains),
name="smart_email_orchestrator",
@@ -280,7 +279,6 @@ async def main() -> None:
lambda: WorkflowExecutor(build_email_address_validation_workflow(), id="email_validation_workflow"),
name="email_validation_workflow",
)
.set_start_executor("smart_email_orchestrator")
.add_edge("smart_email_orchestrator", "email_validation_workflow")
.add_edge("email_validation_workflow", "smart_email_orchestrator")
.add_edge("smart_email_orchestrator", "email_delivery")
@@ -162,13 +162,12 @@ async def main() -> None:
# then call the email assistant, then finalize.
# If spam, go directly to the spam handler and finalize.
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="spam_detection_agent")
.register_agent(create_spam_detector_agent, name="spam_detection_agent")
.register_agent(create_email_assistant_agent, name="email_assistant_agent")
.register_executor(lambda: to_email_assistant_request, name="to_email_assistant_request")
.register_executor(lambda: handle_email_response, name="send_email")
.register_executor(lambda: handle_spam_classifier_response, name="handle_spam")
.set_start_executor("spam_detection_agent")
# Not spam path: transform response -> request for assistant -> assistant -> send email
.add_edge("spam_detection_agent", "to_email_assistant_request", condition=get_condition(False))
.add_edge("to_email_assistant_request", "email_assistant_agent")
@@ -225,7 +225,7 @@ async def main() -> None:
return [handle_uncertain_id]
workflow_builder = (
WorkflowBuilder()
WorkflowBuilder(start_executor="store_email")
.register_agent(create_email_analysis_agent, name="email_analysis_agent")
.register_agent(create_email_assistant_agent, name="email_assistant_agent")
.register_agent(create_email_summary_agent, name="email_summary_agent")
@@ -242,7 +242,6 @@ async def main() -> None:
workflow = (
workflow_builder
.set_start_executor("store_email")
.add_edge("store_email", "email_analysis_agent")
.add_edge("email_analysis_agent", "to_analysis_result")
.add_multi_selection_edge_group(
@@ -63,11 +63,10 @@ async def main() -> None:
# Step 1: Build the workflow graph.
# Order matters. We connect upper_case_executor -> reverse_text_executor and set the start.
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="upper_case_executor")
.register_executor(lambda: UpperCaseExecutor(id="upper_case_executor"), name="upper_case_executor")
.register_executor(lambda: ReverseTextExecutor(id="reverse_text_executor"), name="reverse_text_executor")
.add_edge("upper_case_executor", "reverse_text_executor")
.set_start_executor("upper_case_executor")
.build()
)
@@ -56,11 +56,10 @@ async def main():
# Step 1: Build the workflow with the defined edges.
# Order matters. upper_case_executor runs first, then reverse_text_executor.
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="upper_case_executor")
.register_executor(lambda: to_upper_case, name="upper_case_executor")
.register_executor(lambda: reverse_text, name="reverse_text_executor")
.add_edge("upper_case_executor", "reverse_text_executor")
.set_start_executor("upper_case_executor")
.build()
)
@@ -126,7 +126,7 @@ async def main():
# Step 1: Build the workflow with the defined edges.
# This time we are creating a loop in the workflow.
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="guess_number")
.register_executor(lambda: GuessNumberExecutor((1, 100), "guess_number"), name="guess_number")
.register_agent(create_judge_agent, name="judge_agent")
.register_executor(lambda: SubmitToJudgeAgent(judge_agent_id="judge_agent", target=30), name="submit_judge")
@@ -135,7 +135,6 @@ async def main():
.add_edge("submit_judge", "judge_agent")
.add_edge("judge_agent", "parse_judge")
.add_edge("parse_judge", "guess_number")
.set_start_executor("guess_number")
.build()
)
@@ -179,7 +179,7 @@ async def main():
# Build workflow: store -> detection agent -> to_detection_result -> switch (NotSpam or Spam or Default).
# The switch-case group evaluates cases in order, then falls back to Default when none match.
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="store_email")
.register_agent(create_spam_detection_agent, name="spam_detection_agent")
.register_agent(create_email_assistant_agent, name="email_assistant_agent")
.register_executor(lambda: store_email, name="store_email")
@@ -188,7 +188,6 @@ async def main():
.register_executor(lambda: finalize_and_send, name="finalize_and_send")
.register_executor(lambda: handle_spam, name="handle_spam")
.register_executor(lambda: handle_uncertain, name="handle_uncertain")
.set_start_executor("store_email")
.add_edge("store_email", "spam_detection_agent")
.add_edge("spam_detection_agent", "to_detection_result")
.add_switch_case_edge_group(
@@ -51,13 +51,12 @@ async def step3(text: str, ctx: WorkflowContext[Never, str]) -> None:
def build_workflow():
"""Build a simple 3-step sequential workflow (~6 seconds total)."""
return (
WorkflowBuilder()
WorkflowBuilder(start_executor="step1")
.register_executor(lambda: step1, name="step1")
.register_executor(lambda: step2, name="step2")
.register_executor(lambda: step3, name="step3")
.add_edge("step1", "step2")
.add_edge("step2", "step3")
.set_start_executor("step1")
.build()
)
@@ -184,8 +184,7 @@ async def main() -> None:
# Build the workflow.
workflow = (
WorkflowBuilder()
.set_start_executor(writer_agent)
WorkflowBuilder(start_executor=writer_agent)
.add_edge(writer_agent, coordinator)
.add_edge(coordinator, writer_agent)
.add_edge(final_editor_agent, coordinator)
@@ -233,11 +233,9 @@ async def main() -> None:
# Build the workflow
workflow = (
WorkflowBuilder()
.set_start_executor(email_processor)
WorkflowBuilder(start_executor=email_processor, output_executors=[conclude_workflow])
.add_edge(email_processor, email_writer_agent)
.add_edge(email_writer_agent, conclude_workflow)
.with_output_from([conclude_workflow])
.build()
)
@@ -174,8 +174,7 @@ async def main() -> None:
# Build workflow with request info enabled and custom aggregator
workflow = (
ConcurrentBuilder()
.participants([technical_analyst, business_analyst, user_experience_analyst])
ConcurrentBuilder(participants=[technical_analyst, business_analyst, user_experience_analyst])
.with_aggregator(aggregate_with_synthesis)
# Only enable request info for the technical analyst agent
.with_request_info(agents=["technical_analyst"])
@@ -137,11 +137,13 @@ async def main() -> None:
# Build workflow with request info enabled
# Using agents= filter to only pause before pragmatist speaks (not every turn)
# max_rounds=6: Limit to 6 rounds
workflow = (
GroupChatBuilder()
.with_orchestrator(agent=orchestrator)
.participants([optimist, pragmatist, creative])
.with_max_rounds(6)
GroupChatBuilder(
participants=[optimist, pragmatist, creative],
max_rounds=6,
orchestrator_agent=orchestrator,
)
.with_request_info(agents=[pragmatist]) # Only pause before pragmatist speaks
.build()
)
@@ -198,8 +198,7 @@ async def main() -> None:
# Build a simple loop: TurnManager <-> AgentExecutor.
workflow = (
WorkflowBuilder()
.set_start_executor(turn_manager)
WorkflowBuilder(start_executor=turn_manager)
.add_edge(turn_manager, guessing_agent) # Ask agent to make/adjust a guess
.add_edge(guessing_agent, turn_manager) # Agent's response comes back to coordinator
).build()
@@ -114,8 +114,7 @@ async def main() -> None:
# Build workflow with request info enabled (pauses after each agent responds)
workflow = (
SequentialBuilder()
.participants([drafter, editor, finalizer])
SequentialBuilder(participants=[drafter, editor, finalizer])
# Only enable request info for the editor agent
.with_request_info(agents=["editor"])
.build()
@@ -84,7 +84,7 @@ async def main() -> None:
upper_case = UpperCaseExecutor()
reverse_text = ReverseTextExecutor()
workflow = WorkflowBuilder().add_edge(upper_case, reverse_text).set_start_executor(upper_case).build()
workflow = WorkflowBuilder(start_executor=upper_case).add_edge(upper_case, reverse_text).build()
print("Running workflow with executor I/O observation...\n")
@@ -1,144 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
from typing import cast
from agent_framework import (
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
MagenticBuilder,
MagenticPlanReviewRequest,
WorkflowEvent,
)
from agent_framework.openai import OpenAIChatClient
"""
Sample: Magentic Orchestration with Human Plan Review
This sample demonstrates how humans can review and provide feedback on plans
generated by the Magentic workflow orchestrator. When plan review is enabled,
the workflow requests human approval or revision before executing each plan.
Key concepts:
- with_plan_review(): Enables human review of generated plans
- MagenticPlanReviewRequest: The event type for plan review requests
- Human can choose to: approve the plan or provide revision feedback
Plan review options:
- approve(): Accept the proposed plan and continue execution
- revise(feedback): Provide textual feedback to modify the plan
Prerequisites:
- OpenAI credentials configured for `OpenAIChatClient`.
"""
async def main() -> None:
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions="You are a Researcher. You find information and gather facts.",
chat_client=OpenAIChatClient(model_id="gpt-4o"),
)
analyst_agent = ChatAgent(
name="AnalystAgent",
description="Data analyst who processes and summarizes research findings",
instructions="You are an Analyst. You analyze findings and create summaries.",
chat_client=OpenAIChatClient(model_id="gpt-4o"),
)
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the workflow",
instructions="You coordinate a team to complete tasks efficiently.",
chat_client=OpenAIChatClient(model_id="gpt-4o"),
)
print("\nBuilding Magentic Workflow with Human Plan Review...")
workflow = (
MagenticBuilder()
.participants([researcher_agent, analyst_agent])
.with_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=1,
max_reset_count=2,
)
.with_plan_review() # Request human input for plan review
.build()
)
task = "Research sustainable aviation fuel technology and summarize the findings."
print(f"\nTask: {task}")
print("\nStarting workflow execution...")
print("=" * 60)
pending_request: WorkflowEvent | None = None
pending_responses: dict[str, object] | None = None
output_event: WorkflowEvent | None = None
while not output_event:
if pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
else:
stream = workflow.run(task, stream=True)
last_message_id: str | None = None
async for event in stream:
if isinstance(event, AgentRunUpdateEvent):
message_id = event.data.message_id
if message_id != last_message_id:
if last_message_id is not None:
print("\n")
print(f"- {event.executor_id}:", end=" ", flush=True)
last_message_id = message_id
print(event.data, end="", flush=True)
elif event.type == "request_info" and event.request_type is MagenticPlanReviewRequest:
pending_request = event
elif event.type == "output":
output_event = event
pending_responses = None
# Handle plan review request if any
if pending_request is not None:
event_data = cast(MagenticPlanReviewRequest, pending_request.data)
print("\n\n[Magentic Plan Review Request]")
if event_data.current_progress is not None:
print("Current Progress Ledger:")
print(json.dumps(event_data.current_progress.to_dict(), indent=2))
print()
print(f"Proposed Plan:\n{event_data.plan.text}\n")
print("Please provide your feedback (press Enter to approve):")
reply = await asyncio.get_event_loop().run_in_executor(None, input, "> ")
if reply.strip() == "":
print("Plan approved.\n")
pending_responses = {pending_request.request_id: event_data.approve()}
else:
print("Plan revised by human.\n")
pending_responses = {pending_request.request_id: event_data.revise(reply)}
pending_request = None
print("\n" + "=" * 60)
print("WORKFLOW COMPLETED")
print("=" * 60)
print("Final Output:")
# The output of the Magentic workflow is a list of ChatMessages with only one final message
# generated by the orchestrator.
output_messages = cast(list[ChatMessage], output_event.data)
if output_messages:
output = output_messages[-1].text
print(output)
if __name__ == "__main__":
asyncio.run(main())
@@ -73,12 +73,11 @@ class Aggregator(Executor):
async def main() -> None:
# 1) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="dispatcher")
.register_executor(lambda: Dispatcher(id="dispatcher"), name="dispatcher")
.register_executor(lambda: Average(id="average"), name="average")
.register_executor(lambda: Sum(id="summation"), name="summation")
.register_executor(lambda: Aggregator(id="aggregator"), name="aggregator")
.set_start_executor("dispatcher")
.add_fan_out_edges("dispatcher", ["average", "summation"])
.add_fan_in_edges(["average", "summation"], "aggregator")
.build()
@@ -123,13 +123,12 @@ def create_legal_agent() -> ChatAgent:
async def main() -> None:
# 1) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="dispatcher")
.register_agent(create_researcher_agent, name="researcher")
.register_agent(create_marketer_agent, name="marketer")
.register_agent(create_legal_agent, name="legal")
.register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher")
.register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator")
.set_start_executor("dispatcher")
.add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"]) # Parallel branches
.add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator") # Join at the aggregator
.build()
@@ -261,7 +261,7 @@ async def main():
# Step 1: Create the workflow builder and register executors.
workflow_builder = (
WorkflowBuilder()
WorkflowBuilder(start_executor="split_data_executor")
.register_executor(lambda: Map(id="map_executor_0"), name="map_executor_0")
.register_executor(lambda: Map(id="map_executor_1"), name="map_executor_1")
.register_executor(lambda: Map(id="map_executor_2"), name="map_executor_2")
@@ -286,7 +286,6 @@ async def main():
# Step 2: Build the workflow graph using fan out and fan in edges.
workflow = (
workflow_builder
.set_start_executor("split_data_executor")
.add_fan_out_edges(
"split_data_executor",
["map_executor_0", "map_executor_1", "map_executor_2"],
@@ -189,7 +189,7 @@ async def main() -> None:
# False -> submit_to_email_assistant -> email_assistant_agent -> finalize_and_send
# True -> handle_spam
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="store_email")
.register_agent(create_spam_detection_agent, name="spam_detection_agent")
.register_agent(create_email_assistant_agent, name="email_assistant_agent")
.register_executor(lambda: store_email, name="store_email")
@@ -197,7 +197,6 @@ async def main() -> None:
.register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant")
.register_executor(lambda: finalize_and_send, name="finalize_and_send")
.register_executor(lambda: handle_spam, name="handle_spam")
.set_start_executor("store_email")
.add_edge("store_email", "spam_detection_agent")
.add_edge("spam_detection_agent", "to_detection_result")
.add_edge("to_detection_result", "submit_to_email_assistant", condition=get_condition(False))
@@ -88,7 +88,7 @@ async def main() -> None:
)
# Build a simple sequential workflow
workflow = SequentialBuilder().participants([agent]).build()
workflow = SequentialBuilder(participants=[agent]).build()
# Define custom context that will flow to tools via kwargs
custom_data = {
@@ -148,7 +148,7 @@ async def main() -> None:
# 4. Build a concurrent workflow with both agents
# ConcurrentBuilder requires at least 2 participants for fan-out
workflow = ConcurrentBuilder().participants([microsoft_agent, google_agent]).build()
workflow = ConcurrentBuilder(participants=[microsoft_agent, google_agent]).build()
# 5. Start the workflow - both agents will process the same task in parallel
print("Starting concurrent workflow with tool approval...")
@@ -146,18 +146,16 @@ async def main() -> None:
)
# 4. Build a group chat workflow with the selector function
workflow = (
GroupChatBuilder()
.with_orchestrator(selection_func=select_next_speaker)
.participants([qa_engineer, devops_engineer])
# Set a hard limit to 4 rounds
# First round: QAEngineer speaks
# Second round: DevOpsEngineer speaks (check staging + create rollback)
# Third round: DevOpsEngineer speaks with an approval request (deploy to production)
# Fourth round: DevOpsEngineer speaks again after approval
.with_max_rounds(4)
.build()
)
# max_rounds=4: Set a hard limit to 4 rounds
# First round: QAEngineer speaks
# Second round: DevOpsEngineer speaks (check staging + create rollback)
# Third round: DevOpsEngineer speaks with an approval request (deploy to production)
# Fourth round: DevOpsEngineer speaks again after approval
workflow = GroupChatBuilder(
participants=[qa_engineer, devops_engineer],
max_rounds=4,
selection_func=select_next_speaker,
).build()
# 5. Start the workflow
print("Starting group chat workflow for software deployment...")
@@ -111,7 +111,7 @@ async def main() -> None:
)
# 3. Build a sequential workflow with the agent
workflow = SequentialBuilder().participants([database_agent]).build()
workflow = SequentialBuilder(participants=[database_agent]).build()
# 4. Start the workflow with a user task
print("Starting sequential workflow with tool approval...")
@@ -123,13 +123,12 @@ async def main() -> None:
# Build a simple fan-out/fan-in workflow
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor="dispatcher")
.register_agent(create_researcher_agent, name="researcher")
.register_agent(create_marketer_agent, name="marketer")
.register_agent(create_legal_agent, name="legal")
.register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher")
.register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator")
.set_start_executor("dispatcher")
.add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"])
.add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator")
.build()
@@ -87,7 +87,7 @@ async def run_agent_framework_example(prompt: str) -> Sequence[list[ChatMessage]
name="chemistry",
)
workflow = ConcurrentBuilder().participants([physics, chemistry]).build()
workflow = ConcurrentBuilder(participants=[physics, chemistry]).build()
outputs: list[list[ChatMessage]] = []
async for event in workflow.run(prompt, stream=True):
@@ -7,8 +7,9 @@ import sys
from collections.abc import Sequence
from typing import Any, cast
from agent_framework import ChatAgent, ChatMessage, GroupChatBuilderWorkflowEvent
from agent_framework import ChatAgent, ChatMessage
from agent_framework.azure import AzureOpenAIChatClient, AzureOpenAIResponsesClient
from agent_framework.orchestrations import GroupChatBuilder
from azure.identity import AzureCliCredential
from semantic_kernel.agents import Agent, ChatCompletionAgent, GroupChatOrchestration
from semantic_kernel.agents.orchestration.group_chat import (
@@ -231,12 +232,10 @@ async def run_agent_framework_example(task: str) -> str:
chat_client=AzureOpenAIResponsesClient(credential=credential),
)
workflow = (
GroupChatBuilder()
.with_orchestrator(agent=AzureOpenAIChatClient(credential=credential).as_agent())
.participants([researcher, planner])
.build()
)
workflow = GroupChatBuilder(
participants=[researcher, planner],
orchestrator_agent=AzureOpenAIChatClient(credential=credential).as_agent(),
).build()
final_response = ""
async for event in workflow.run(task, stream=True):
@@ -6,8 +6,9 @@ import asyncio
from collections.abc import Sequence
from typing import cast
from agent_framework import ChatAgent, HostedCodeInterpreterTool, MagenticBuilderWorkflowEvent
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
from agent_framework.orchestrations import MagenticBuilder
from semantic_kernel.agents import (
Agent,
ChatCompletionAgent,
@@ -144,7 +145,7 @@ async def run_agent_framework_example(prompt: str) -> str | None:
chat_client=OpenAIChatClient(),
)
workflow = MagenticBuilder().participants([researcher, coder]).with_manager(agent=manager_agent).build()
workflow = MagenticBuilder(participants=[researcher, coder], manager_agent=manager_agent).build()
final_text: str | None = None
async for event in workflow.run(prompt, stream=True):
@@ -74,7 +74,7 @@ async def run_agent_framework_example(prompt: str) -> list[ChatMessage]:
name="reviewer",
)
workflow = SequentialBuilder().participants([writer, reviewer]).build()
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
conversation_outputs: list[list[ChatMessage]] = []
async for event in workflow.run(prompt, stream=True):
@@ -221,12 +221,11 @@ async def run_agent_framework_workflow_example() -> str | None:
aggregate = FanInExecutor(required_cycles=3)
workflow = (
WorkflowBuilder()
WorkflowBuilder(start_executor=kickoff)
.add_edge(kickoff, step_a)
.add_edge(kickoff, step_b)
.add_fan_in_edges([step_a, step_b], aggregate)
.add_edge(aggregate, kickoff)
.set_start_executor(kickoff)
.build()
)
@@ -232,7 +232,7 @@ def _build_inner_workflow() -> WorkflowExecutor:
inner_echo = InnerEchoExecutor()
inner_repeat = InnerRepeatExecutor()
inner_workflow = WorkflowBuilder().set_start_executor(inner_echo).add_edge(inner_echo, inner_repeat).build()
inner_workflow = WorkflowBuilder(start_executor=inner_echo).add_edge(inner_echo, inner_repeat).build()
return WorkflowExecutor(inner_workflow, id="inner_workflow")
@@ -246,8 +246,7 @@ async def run_agent_framework_nested_workflow(initial_message: str) -> Sequence[
collector = CollectResultExecutor()
outer_workflow = (
WorkflowBuilder()
.set_start_executor(kickoff)
WorkflowBuilder(start_executor=kickoff)
.add_edge(kickoff, outer_echo)
.add_edge(outer_echo, outer_repeat)
.add_edge(outer_repeat, inner_executor)