Python: WorkflowBuilder registry (#2486)

* Add workflow builder factory pattern

* Add internal edge groups to registered executors; next samples

* Update samples: Part 1

* register -> register_executor

* update hil samples

* Update other samples

* Update agent  samples

* Update doc string

* Add new sample

* Fix mypy

* Address comments

* Fix mypy
This commit is contained in:
Tao Chen
2025-12-04 21:26:10 -08:00
committed by GitHub
Unverified
parent 6809510413
commit f2ed5b55f6
33 changed files with 1609 additions and 696 deletions
@@ -0,0 +1,104 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import (
AgentRunResponse,
ChatAgent,
Executor,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
executor,
handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
"""
Step 4: Using Factories to Define Executors and Agents
What this example shows
- Defining custom executors using both class-based and function-based approaches.
- Registering executor and agent factories with WorkflowBuilder for lazy instantiation.
- Building a simple workflow that transforms input text through multiple steps.
Benefits of using factories
- Decouples executor and agent creation from workflow definition.
- Isolated instances are created for workflow builder build, allowing for cleaner state management
and handling parallel workflow runs.
It is recommended to use factories when defining executors and agents for production workflows.
Prerequisites
- No external services required.
"""
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
"""Reverse the input string and send it downstream."""
result = text[::-1]
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
def create_agent() -> ChatAgent:
"""Factory function to create a Writer agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You decode messages. Try to reconstruct the original message."),
name="decoder",
)
async def main():
"""Build and run a simple 2-step workflow using the fluent builder API."""
# Build the workflow using a fluent pattern:
# 1) register_executor(factory, name) registers an executor factory
# 2) register_agent(factory, name) registers an agent factory
# 3) add_chain([node_names]) adds a sequence of nodes to the workflow
# 4) set_start_executor(node) declares the entry point
# 5) build() finalizes and returns an immutable Workflow object
workflow = (
WorkflowBuilder()
.register_executor(lambda: UpperCase(id="upper_case_executor"), name="UpperCase")
.register_executor(lambda: reverse_text, name="ReverseText")
.register_agent(create_agent, name="DecoderAgent", output_response=True)
.add_chain(["UpperCase", "ReverseText", "DecoderAgent"])
.set_start_executor("UpperCase")
.build()
)
output: AgentRunResponse | None = None
async for event in workflow.run_stream("hello world"):
if isinstance(event, WorkflowOutputEvent) and isinstance(event.data, AgentRunResponse):
output = event.data
if output:
print(f"Decoded output: {output.text}")
else:
print("No output received.")
"""
Sample Output:
HELLO WORLD
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -1,11 +1,8 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import Awaitable, Callable
from contextlib import AsyncExitStack
from typing import Any
from agent_framework import AgentRunUpdateEvent, WorkflowBuilder, WorkflowOutputEvent
from agent_framework import AgentRunUpdateEvent, ChatAgent, WorkflowBuilder, WorkflowOutputEvent
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential
@@ -29,48 +26,36 @@ Prerequisites:
"""
async def create_azure_ai_agent() -> tuple[Callable[..., Awaitable[Any]], Callable[[], Awaitable[None]]]:
"""Helper method to create a Azure AI agent factory and a close function.
def create_writer_agent(client: AzureAIAgentClient) -> ChatAgent:
return client.create_agent(
name="Writer",
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
)
This makes sure the async context managers are properly handled.
"""
stack = AsyncExitStack()
cred = await stack.enter_async_context(AzureCliCredential())
client = await stack.enter_async_context(AzureAIAgentClient(async_credential=cred))
async def agent(**kwargs: Any) -> Any:
return await stack.enter_async_context(client.create_agent(**kwargs))
async def close() -> None:
await stack.aclose()
return agent, close
def create_reviewer_agent(client: AzureAIAgentClient) -> ChatAgent:
return client.create_agent(
name="Reviewer",
instructions=(
"You are an excellent content reviewer. "
"Provide actionable feedback to the writer about the provided content. "
"Provide the feedback in the most concise manner possible."
),
)
async def main() -> None:
agent, close = await create_azure_ai_agent()
try:
writer = await agent(
name="Writer",
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
)
reviewer = await agent(
name="Reviewer",
instructions=(
"You are an excellent content reviewer. "
"Provide actionable feedback to the writer about the provided content. "
"Provide the feedback in the most concise manner possible."
),
)
async with AzureCliCredential() as cred, AzureAIAgentClient(async_credential=cred) as client:
# Build the workflow by adding agents directly as edges.
# Agents adapt to workflow mode: run_stream() for incremental updates, run() for complete responses.
workflow = (
WorkflowBuilder()
.set_start_executor(writer)
.add_edge(writer, reviewer)
.register_agent(lambda: create_writer_agent(client), name="writer")
.register_agent(lambda: create_reviewer_agent(client), name="reviewer", output_response=True)
.set_start_executor("writer")
.add_edge("writer", "reviewer")
.build()
)
@@ -89,8 +74,6 @@ async def main() -> None:
elif isinstance(event, WorkflowOutputEvent):
print("\n===== Final output =====")
print(event.data)
finally:
await close()
if __name__ == "__main__":
@@ -86,18 +86,17 @@ async def enrich_with_references(
await ctx.send_message(AgentExecutorRequest(messages=conversation))
async def main() -> None:
"""Run the workflow and stream combined updates from both agents."""
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
research_agent = chat_client.create_agent(
def create_research_agent():
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
name="research_agent",
instructions=(
"Produce a short, bullet-style briefing with two actionable ideas. Label the section as 'Initial Draft'."
),
)
final_editor_agent = chat_client.create_agent(
def create_final_editor_agent():
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
name="final_editor_agent",
instructions=(
"Use all conversation context (including external notes) to produce the final answer. "
@@ -105,11 +104,17 @@ async def main() -> None:
),
)
async def main() -> None:
"""Run the workflow and stream combined updates from both agents."""
workflow = (
WorkflowBuilder()
.set_start_executor(research_agent)
.add_edge(research_agent, enrich_with_references)
.add_edge(enrich_with_references, final_editor_agent)
.register_agent(create_research_agent, name="research_agent")
.register_agent(create_final_editor_agent, name="final_editor_agent")
.register_executor(lambda: enrich_with_references, name="enrich_with_references")
.set_start_executor("research_agent")
.add_edge("research_agent", "enrich_with_references")
.add_edge("enrich_with_references", "final_editor_agent")
.build()
)
@@ -26,35 +26,37 @@ Prerequisites:
"""
async def main():
"""Build and run a simple two node agent workflow: Writer then Reviewer."""
# Create the Azure chat client. AzureCliCredential uses your current az login.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Define two domain specific chat agents.
writer_agent = chat_client.create_agent(
def create_writer_agent():
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
name="writer_agent",
name="writer",
)
reviewer_agent = chat_client.create_agent(
def create_reviewer_agent():
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are an excellent content reviewer."
"Provide actionable feedback to the writer about the provided content."
"Provide the feedback in the most concise manner possible."
),
name="reviewer_agent",
name="reviewer",
)
async def main():
"""Build and run a simple two node agent workflow: Writer then Reviewer."""
# Build the workflow using the fluent builder.
# Set the start node and connect an edge from writer to reviewer.
# Agents adapt to workflow mode: run_stream() for incremental updates, run() for complete responses.
workflow = (
WorkflowBuilder()
.set_start_executor(writer_agent)
.add_edge(writer_agent, reviewer_agent)
.register_agent(create_writer_agent, name="writer")
.register_agent(create_reviewer_agent, name="reviewer", output_response=True)
.set_start_executor("writer")
.add_edge("writer", "reviewer")
.build()
)
@@ -10,6 +10,7 @@ from agent_framework import (
AgentExecutorResponse,
AgentRunResponse,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
Executor,
FunctionCallContent,
@@ -166,6 +167,31 @@ class Coordinator(Executor):
)
def create_writer_agent() -> ChatAgent:
"""Creates a writer agent with tools."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
name="writer_agent",
instructions=(
"You are a marketing writer. Call the available tools before drafting copy so you are precise. "
"Always call both tools once before drafting. Summarize tool outputs as bullet points, then "
"produce a 3-sentence draft."
),
tools=[fetch_product_brief, get_brand_voice_profile],
tool_choice=ToolMode.REQUIRED_ANY,
)
def create_final_editor_agent() -> ChatAgent:
"""Creates a final editor agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
name="final_editor_agent",
instructions=(
"You are an editor who polishes marketing copy after human approval. "
"Correct any legal or factual issues. Return the final version even if no changes are made. "
),
)
def display_agent_run_update(event: AgentRunUpdateEvent, last_executor: str | None) -> None:
"""Display an AgentRunUpdateEvent in a readable format."""
printed_tool_calls: set[str] = set()
@@ -211,42 +237,25 @@ def display_agent_run_update(event: AgentRunUpdateEvent, last_executor: str | No
async def main() -> None:
"""Run the workflow and bridge human feedback between two agents."""
# Create agents with tools and instructions.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
writer_agent = chat_client.create_agent(
name="writer_agent",
instructions=(
"You are a marketing writer. Call the available tools before drafting copy so you are precise. "
"Always call both tools once before drafting. Summarize tool outputs as bullet points, then "
"produce a 3-sentence draft."
),
tools=[fetch_product_brief, get_brand_voice_profile],
tool_choice=ToolMode.REQUIRED_ANY,
)
final_editor_agent = chat_client.create_agent(
name="final_editor_agent",
instructions=(
"You are an editor who polishes marketing copy after human approval. "
"Correct any legal or factual issues. Return the final version even if no changes are made. "
),
)
coordinator = Coordinator(
id="coordinator",
writer_id="writer_agent",
final_editor_id="final_editor_agent",
)
# Build the workflow.
workflow = (
WorkflowBuilder()
.set_start_executor(writer_agent)
.add_edge(writer_agent, coordinator)
.add_edge(coordinator, writer_agent)
.add_edge(final_editor_agent, coordinator)
.add_edge(coordinator, final_editor_agent)
.register_agent(create_writer_agent, name="writer_agent")
.register_agent(create_final_editor_agent, name="final_editor_agent")
.register_executor(
lambda: Coordinator(
id="coordinator",
writer_id="writer_agent",
final_editor_id="final_editor_agent",
),
name="coordinator",
)
.set_start_executor("writer_agent")
.add_edge("writer_agent", "coordinator")
.add_edge("coordinator", "writer_agent")
.add_edge("final_editor_agent", "coordinator")
.add_edge("coordinator", "final_editor_agent")
.build()
)
@@ -41,9 +41,9 @@ class Writer(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "writer"):
def __init__(self, id: str = "writer"):
# Create a domain specific agent using your configured AzureOpenAIChatClient.
self.agent = chat_client.create_agent(
self.agent = AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
@@ -83,9 +83,9 @@ class Reviewer(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "reviewer"):
def __init__(self, id: str = "reviewer"):
# Create a domain specific agent that evaluates and refines content.
self.agent = chat_client.create_agent(
self.agent = AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are an excellent content reviewer. You review the content and provide feedback to the writer."
),
@@ -105,16 +105,17 @@ class Reviewer(Executor):
async def main():
"""Build and run a simple two node agent workflow: Writer then Reviewer."""
# Create the Azure chat client. AzureCliCredential uses your current az login.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Instantiate the two agent backed executors.
writer = Writer(chat_client)
reviewer = Reviewer(chat_client)
# Build the workflow using the fluent builder.
# Set the start node and connect an edge from writer to reviewer.
workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build()
workflow = (
WorkflowBuilder()
.register_executor(Writer, name="writer")
.register_executor(Reviewer, name="reviewer")
.set_start_executor("writer")
.add_edge("writer", "reviewer")
.build()
)
# Run the workflow with the user's initial message.
# For foundational clarity, use run (non streaming) and print the workflow output.
@@ -5,6 +5,7 @@ from typing import Never
from agent_framework import (
AgentExecutorResponse,
ChatAgent,
Executor,
HostedCodeInterpreterTool,
WorkflowBuilder,
@@ -70,21 +71,39 @@ class Evaluator(Executor):
await ctx.yield_output(f"Correctness: {correctness}, Consumption: {consumption}")
def create_coding_agent(client: AzureAIAgentClient) -> ChatAgent:
"""Create an AI agent with code interpretation capabilities.
This agent can generate and execute Python code to solve problems.
Args:
client: The AzureAIAgentClient used to create the agent
Returns:
A ChatAgent configured with coding instructions and tools
"""
return client.create_agent(
name="CodingAgent",
instructions=("You are a helpful assistant that can write and execute Python code to solve problems."),
tools=HostedCodeInterpreterTool(),
)
async def main():
async with (
AzureCliCredential() as credential,
AzureAIAgentClient(async_credential=credential) as chat_client,
):
# Create an agent with code interpretation capabilities
agent = chat_client.create_agent(
name="CodingAgent",
instructions=("You are a helpful assistant that can write and execute Python code to solve problems."),
tools=HostedCodeInterpreterTool(),
)
# Build a workflow: Agent generates code -> Evaluator assesses results
# The agent will be wrapped in a special agent executor which produces AgentExecutorResponse
workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, Evaluator(id="evaluator")).build()
workflow = (
WorkflowBuilder()
.register_agent(lambda: create_coding_agent(chat_client), name="coding_agent")
.register_executor(lambda: Evaluator(id="evaluator"), name="evaluator")
.set_start_executor("coding_agent")
.add_edge("coding_agent", "evaluator")
.build()
)
# Execute the workflow with a specific coding task
results = await workflow.run(
@@ -81,7 +81,7 @@ class ReviewerWithHumanInTheLoop(Executor):
@response_handler
async def accept_human_review(
self,
original_request: ReviewRequest,
original_request: HumanReviewRequest,
response: ReviewResponse,
ctx: WorkflowContext[ReviewResponse],
) -> None:
@@ -97,20 +97,25 @@ async def main() -> None:
print("Starting Workflow Agent with Human-in-the-Loop Demo")
print("=" * 50)
# Create executors for the workflow.
print("Creating chat client and executors...")
mini_chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
worker = Worker(id="sub-worker", chat_client=mini_chat_client)
reviewer = ReviewerWithHumanInTheLoop(worker_id=worker.id)
print("Building workflow with Worker-Reviewer cycle...")
# Build a workflow with bidirectional communication between Worker and Reviewer,
# and escalation paths for human review.
agent = (
WorkflowBuilder()
.add_edge(worker, reviewer) # Worker sends requests to Reviewer
.add_edge(reviewer, worker) # Reviewer sends feedback to Worker
.set_start_executor(worker)
.register_executor(
lambda: Worker(
id="sub-worker",
chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()),
),
name="worker",
)
.register_executor(
lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"),
name="reviewer",
)
.add_edge("worker", "reviewer") # Worker sends requests to Reviewer
.add_edge("reviewer", "worker") # Reviewer sends feedback to Worker
.set_start_executor("worker")
.build()
.as_agent() # Convert workflow into an agent interface
)
@@ -195,19 +195,20 @@ async def main() -> None:
print("Starting Workflow Agent Demo")
print("=" * 50)
# Initialize chat clients and executors.
print("Creating chat client and executors...")
mini_chat_client = OpenAIChatClient(model_id="gpt-4.1-nano")
chat_client = OpenAIChatClient(model_id="gpt-4.1")
reviewer = Reviewer(id="reviewer", chat_client=chat_client)
worker = Worker(id="worker", chat_client=mini_chat_client)
print("Building workflow with Worker ↔ Reviewer cycle...")
agent = (
WorkflowBuilder()
.add_edge(worker, reviewer) # Worker sends responses to Reviewer
.add_edge(reviewer, worker) # Reviewer provides feedback to Worker
.set_start_executor(worker)
.register_executor(
lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")),
name="worker",
)
.register_executor(
lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")),
name="reviewer",
)
.add_edge("worker", "reviewer") # Worker sends responses to Reviewer
.add_edge("reviewer", "worker") # Reviewer provides feedback to Worker
.set_start_executor("worker")
.build()
.as_agent() # Wrap workflow as an agent
)
@@ -10,7 +10,6 @@ from typing import Any, override
# `agent_framework.builtin` chat client or mock the writer executor. We keep the
# concrete import here so readers can see an end-to-end configuration.
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
@@ -173,25 +172,25 @@ class ReviewGateway(Executor):
def create_workflow(checkpoint_storage: FileCheckpointStorage) -> Workflow:
"""Assemble the workflow graph used by both the initial run and resume."""
# The Azure client is created once so our agent executor can issue calls to the hosted
# model. The agent id is stable across runs which keeps checkpoints deterministic.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.create_agent(instructions="Write concise, warm release notes that sound human and helpful.")
writer = AgentExecutor(agent, id="writer")
gateway = ReviewGateway(id="review_gateway", writer_id=writer.id)
prepare = BriefPreparer(id="prepare_brief", agent_id=writer.id)
# Wire the workflow DAG. Edges mirror the numbered steps described in the
# module docstring. Because `WorkflowBuilder` is declarative, reading these
# edges is often the quickest way to understand execution order.
workflow_builder = (
WorkflowBuilder(max_iterations=6)
.set_start_executor(prepare)
.add_edge(prepare, writer)
.add_edge(writer, gateway)
.add_edge(gateway, writer) # revisions loop
.register_agent(
lambda: AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions="Write concise, warm release notes that sound human and helpful.",
# The agent name is stable across runs which keeps checkpoints deterministic.
name="writer",
),
name="writer",
)
.register_executor(lambda: ReviewGateway(id="review_gateway", writer_id="writer"), name="review_gateway")
.register_executor(lambda: BriefPreparer(id="prepare_brief", agent_id="writer"), name="prepare_brief")
.set_start_executor("prepare_brief")
.add_edge("prepare_brief", "writer")
.add_edge("writer", "review_gateway")
.add_edge("review_gateway", "writer") # revisions loop
.with_checkpointing(checkpoint_storage=checkpoint_storage)
)
@@ -99,16 +99,14 @@ class WorkerExecutor(Executor):
async def main():
# Create workflow executors
start_executor = StartExecutor(id="start")
worker_executor = WorkerExecutor(id="worker")
# Build workflow with checkpointing enabled
workflow_builder = (
WorkflowBuilder()
.set_start_executor(start_executor)
.add_edge(start_executor, worker_executor)
.add_edge(worker_executor, worker_executor) # Self-loop for iterative processing
.register_executor(lambda: StartExecutor(id="start"), name="start")
.register_executor(lambda: WorkerExecutor(id="worker"), name="worker")
.set_start_executor("start")
.add_edge("start", "worker")
.add_edge("worker", "worker") # Self-loop for iterative processing
)
checkpoint_storage = InMemoryCheckpointStorage()
workflow_builder = workflow_builder.with_checkpointing(checkpoint_storage=checkpoint_storage)
@@ -292,16 +292,16 @@ class LaunchCoordinator(Executor):
def build_sub_workflow() -> WorkflowExecutor:
writer = DraftWriter()
router = DraftReviewRouter()
finaliser = DraftFinaliser()
"""Assemble the sub-workflow used by the parent workflow executor."""
sub_workflow = (
WorkflowBuilder()
.set_start_executor(writer)
.add_edge(writer, router)
.add_edge(router, finaliser)
.add_edge(finaliser, writer) # permits revision loops
.register_executor(DraftWriter, name="writer")
.register_executor(DraftReviewRouter, name="router")
.register_executor(DraftFinaliser, name="finaliser")
.set_start_executor("writer")
.add_edge("writer", "router")
.add_edge("router", "finaliser")
.add_edge("finaliser", "writer") # permits revision loops
.build()
)
@@ -309,14 +309,14 @@ def build_sub_workflow() -> WorkflowExecutor:
def build_parent_workflow(storage: FileCheckpointStorage) -> Workflow:
coordinator = LaunchCoordinator()
sub_executor = build_sub_workflow()
"""Assemble the parent workflow that embeds the sub-workflow."""
return (
WorkflowBuilder()
.set_start_executor(coordinator)
.add_edge(coordinator, sub_executor)
.add_edge(sub_executor, coordinator)
.register_executor(LaunchCoordinator, name="coordinator")
.register_executor(build_sub_workflow, name="sub_executor")
.set_start_executor("coordinator")
.add_edge("coordinator", "sub_executor")
.add_edge("sub_executor", "coordinator")
.with_checkpointing(storage)
.build()
)
@@ -8,7 +8,6 @@ from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
WorkflowEvent,
WorkflowExecutor,
handler,
)
@@ -46,13 +45,6 @@ class TextProcessingResult:
char_count: int
class AllTasksCompleted(WorkflowEvent):
"""Event triggered when all processing tasks are complete."""
def __init__(self, results: list[TextProcessingResult]):
super().__init__(results)
# Sub-workflow executor
class TextProcessor(Executor):
"""Processes text strings - counts words and characters."""
@@ -113,7 +105,11 @@ class TextProcessingOrchestrator(Executor):
await ctx.send_message(request, target_id="text_processor_workflow")
@handler
async def collect_result(self, result: TextProcessingResult, ctx: WorkflowContext) -> None:
async def collect_result(
self,
result: TextProcessingResult,
ctx: WorkflowContext[Never, list[TextProcessingResult]],
) -> None:
"""Collect results from sub-workflows."""
print(f"📥 Collected result from {result.task_id}")
self.results.append(result)
@@ -121,48 +117,54 @@ class TextProcessingOrchestrator(Executor):
# Check if all results are collected
if len(self.results) == self.expected_count:
print("\n🎉 All tasks completed!")
await ctx.add_event(AllTasksCompleted(self.results))
await ctx.yield_output(self.results)
def get_summary(self) -> dict[str, Any]:
"""Get a summary of all processing results."""
total_words = sum(result.word_count for result in self.results)
total_chars = sum(result.char_count for result in self.results)
avg_words = total_words / len(self.results) if self.results else 0
avg_chars = total_chars / len(self.results) if self.results else 0
return {
"total_texts": len(self.results),
"total_words": total_words,
"total_characters": total_chars,
"average_words_per_text": round(avg_words, 2),
"average_characters_per_text": round(avg_chars, 2),
}
def get_result_summary(results: list[TextProcessingResult]) -> dict[str, Any]:
"""Get a summary of all processing results."""
total_words = sum(result.word_count for result in results)
total_chars = sum(result.char_count for result in results)
avg_words = total_words / len(results) if results else 0
avg_chars = total_chars / len(results) if results else 0
return {
"total_texts": len(results),
"total_words": total_words,
"total_characters": total_chars,
"average_words_per_text": round(avg_words, 2),
"average_characters_per_text": round(avg_chars, 2),
}
def create_sub_workflow() -> WorkflowExecutor:
"""Create the text processing sub-workflow."""
print("🚀 Setting up sub-workflow...")
processing_workflow = (
WorkflowBuilder()
.register_executor(TextProcessor, name="text_processor")
.set_start_executor("text_processor")
.build()
)
return WorkflowExecutor(processing_workflow, id="text_processor_workflow")
async def main():
"""Main function to run the basic sub-workflow example."""
print("🚀 Setting up sub-workflow...")
# Step 1: Create the text processing sub-workflow
text_processor = TextProcessor()
processing_workflow = WorkflowBuilder().set_start_executor(text_processor).build()
print("🔧 Setting up parent workflow...")
# Step 2: Create the parent workflow
orchestrator = TextProcessingOrchestrator()
workflow_executor = WorkflowExecutor(processing_workflow, id="text_processor_workflow")
# Step 1: Create the parent workflow
main_workflow = (
WorkflowBuilder()
.set_start_executor(orchestrator)
.add_edge(orchestrator, workflow_executor)
.add_edge(workflow_executor, orchestrator)
.register_executor(TextProcessingOrchestrator, name="text_orchestrator")
.register_executor(create_sub_workflow, name="text_processor_workflow")
.set_start_executor("text_orchestrator")
.add_edge("text_orchestrator", "text_processor_workflow")
.add_edge("text_processor_workflow", "text_orchestrator")
.build()
)
# Step 3: Test data - various text strings
# Step 2: Test data - various text strings
test_texts = [
"Hello world! This is a simple test.",
"Python is a powerful programming language used for many applications.",
@@ -175,15 +177,17 @@ async def main():
print(f"\n🧪 Testing with {len(test_texts)} text strings")
print("=" * 60)
# Step 4: Run the workflow
await main_workflow.run(test_texts)
# Step 3: Run the workflow
result = await main_workflow.run(test_texts)
# Step 5: Display results
# Step 4: Display results
print("\n📊 Processing Results:")
print("=" * 60)
# Sort results by task_id for consistent display
sorted_results = sorted(orchestrator.results, key=lambda r: r.task_id)
task_results = result.get_outputs()
assert len(task_results) == 1
sorted_results = sorted(task_results[0], key=lambda r: r.task_id)
for result in sorted_results:
preview = result.text[:30] + "..." if len(result.text) > 30 else result.text
@@ -191,7 +195,7 @@ async def main():
print(f"{result.task_id}: '{preview}' -> {result.word_count} words, {result.char_count} chars")
# Step 6: Display summary
summary = orchestrator.get_summary()
summary = get_result_summary(sorted_results)
print("\n📈 Summary:")
print("=" * 60)
print(f"📄 Total texts processed: {summary['total_texts']}")
@@ -169,19 +169,18 @@ def build_resource_request_distribution_workflow() -> Workflow:
elif len(self._responses) > self._request_count:
raise ValueError("Received more responses than expected")
orchestrator = RequestDistribution("orchestrator")
resource_requester = ResourceRequester("resource_requester")
policy_checker = PolicyChecker("policy_checker")
result_collector = ResultCollector("result_collector")
return (
WorkflowBuilder()
.set_start_executor(orchestrator)
.add_edge(orchestrator, resource_requester)
.add_edge(orchestrator, policy_checker)
.add_edge(resource_requester, result_collector)
.add_edge(policy_checker, result_collector)
.add_edge(orchestrator, result_collector) # For request count
.register_executor(lambda: RequestDistribution("orchestrator"), name="orchestrator")
.register_executor(lambda: ResourceRequester("resource_requester"), name="resource_requester")
.register_executor(lambda: PolicyChecker("policy_checker"), name="policy_checker")
.register_executor(lambda: ResultCollector("result_collector"), name="result_collector")
.set_start_executor("orchestrator")
.add_edge("orchestrator", "resource_requester")
.add_edge("orchestrator", "policy_checker")
.add_edge("resource_requester", "result_collector")
.add_edge("policy_checker", "result_collector")
.add_edge("orchestrator", "result_collector") # For request count
.build()
)
@@ -288,29 +287,27 @@ class PolicyEngine(Executor):
async def main() -> None:
# Create executors in the main workflow
sub_workflow = build_resource_request_distribution_workflow()
resource_allocator = ResourceAllocator("resource_allocator")
policy_engine = PolicyEngine("policy_engine")
# Create the WorkflowExecutor for the sub-workflow
# Setting allow_direct_output=True to let the sub-workflow output directly.
# This is because the sub-workflow is the both the entry point and the exit
# point of the main workflow.
sub_workflow_executor = WorkflowExecutor(
sub_workflow,
"sub_workflow_executor",
allow_direct_output=True,
)
# Build the main workflow
main_workflow = (
WorkflowBuilder()
.set_start_executor(sub_workflow_executor)
.add_edge(sub_workflow_executor, resource_allocator)
.add_edge(resource_allocator, sub_workflow_executor)
.add_edge(sub_workflow_executor, policy_engine)
.add_edge(policy_engine, sub_workflow_executor)
.register_executor(lambda: ResourceAllocator("resource_allocator"), name="resource_allocator")
.register_executor(lambda: PolicyEngine("policy_engine"), name="policy_engine")
.register_executor(
lambda: WorkflowExecutor(
build_resource_request_distribution_workflow(),
"sub_workflow_executor",
# Setting allow_direct_output=True to let the sub-workflow output directly.
# This is because the sub-workflow is the both the entry point and the exit
# point of the main workflow.
allow_direct_output=True,
),
name="sub_workflow_executor",
)
.set_start_executor("sub_workflow_executor")
.add_edge("sub_workflow_executor", "resource_allocator")
.add_edge("resource_allocator", "sub_workflow_executor")
.add_edge("sub_workflow_executor", "policy_engine")
.add_edge("policy_engine", "sub_workflow_executor")
.build()
)
@@ -154,15 +154,14 @@ def build_email_address_validation_workflow() -> Workflow:
)
# Build the workflow
sanitizer = EmailSanitizer(id="email_sanitizer")
format_validator = EmailFormatValidator(id="email_format_validator")
domain_validator = DomainValidator(id="domain_validator")
return (
WorkflowBuilder()
.set_start_executor(sanitizer)
.add_edge(sanitizer, format_validator)
.add_edge(format_validator, domain_validator)
.register_executor(lambda: EmailSanitizer(id="email_sanitizer"), name="email_sanitizer")
.register_executor(lambda: EmailFormatValidator(id="email_format_validator"), name="email_format_validator")
.register_executor(lambda: DomainValidator(id="domain_validator"), name="domain_validator")
.set_start_executor("email_sanitizer")
.add_edge("email_sanitizer", "email_format_validator")
.add_edge("email_format_validator", "domain_validator")
.build()
)
@@ -270,21 +269,22 @@ async def main() -> None:
# A list of approved domains
approved_domains = {"example.com", "company.com"}
# Create executors in the main workflow
orchestrator = SmartEmailOrchestrator(id="smart_email_orchestrator", approved_domains=approved_domains)
email_delivery = EmailDelivery(id="email_delivery")
# Create the sub-workflow for email address validation
validation_workflow = build_email_address_validation_workflow()
validation_workflow_executor = WorkflowExecutor(validation_workflow, id="email_validation_workflow")
# Build the main workflow
workflow = (
WorkflowBuilder()
.set_start_executor(orchestrator)
.add_edge(orchestrator, validation_workflow_executor)
.add_edge(validation_workflow_executor, orchestrator)
.add_edge(orchestrator, email_delivery)
.register_executor(
lambda: SmartEmailOrchestrator(id="smart_email_orchestrator", approved_domains=approved_domains),
name="smart_email_orchestrator",
)
.register_executor(lambda: EmailDelivery(id="email_delivery"), name="email_delivery")
.register_executor(
lambda: WorkflowExecutor(build_email_address_validation_workflow(), id="email_validation_workflow"),
name="email_validation_workflow",
)
.set_start_executor("smart_email_orchestrator")
.add_edge("smart_email_orchestrator", "email_validation_workflow")
.add_edge("email_validation_workflow", "smart_email_orchestrator")
.add_edge("smart_email_orchestrator", "email_delivery")
.build()
)
@@ -5,9 +5,9 @@ import os
from typing import Any
from agent_framework import ( # Core chat primitives used to build requests
AgentExecutor, # Wraps an LLM agent that can be invoked inside a workflow
AgentExecutorRequest, # Input message bundle for an AgentExecutor
AgentExecutorResponse, # Output from an AgentExecutor
AgentExecutorResponse,
ChatAgent, # Output from an AgentExecutor
ChatMessage,
Role,
WorkflowBuilder, # Fluent builder for wiring executors and edges
@@ -128,38 +128,35 @@ async def to_email_assistant_request(
await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
async def main() -> None:
# Create agents
def create_spam_detector_agent() -> ChatAgent:
"""Helper to create a spam detection agent."""
# AzureCliCredential uses your current az login. This avoids embedding secrets in code.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Agent 1. Classifies spam and returns a DetectionResult object.
# response_format enforces that the LLM returns parsable JSON for the Pydantic model.
spam_detection_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
"Include the original email content in email_content."
),
response_format=DetectionResult,
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
"Include the original email content in email_content."
),
id="spam_detection_agent",
name="spam_detection_agent",
response_format=DetectionResult,
)
# Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft professional responses to emails. "
"Your input may be a JSON object that includes 'email_content'; base your reply on that content. "
"Return JSON with a single field 'response' containing the drafted reply."
),
response_format=EmailResponse,
def create_email_assistant_agent() -> ChatAgent:
"""Helper to create an email assistant agent."""
# AzureCliCredential uses your current az login. This avoids embedding secrets in code.
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are an email assistant that helps users draft professional responses to emails. "
"Your input may be a JSON object that includes 'email_content'; base your reply on that content. "
"Return JSON with a single field 'response' containing the drafted reply."
),
id="email_assistant_agent",
name="email_assistant_agent",
response_format=EmailResponse,
)
async def main() -> None:
# Build the workflow graph.
# Start at the spam detector.
# If not spam, hop to a transformer that creates a new AgentExecutorRequest,
@@ -167,13 +164,18 @@ async def main() -> None:
# If spam, go directly to the spam handler and finalize.
workflow = (
WorkflowBuilder()
.set_start_executor(spam_detection_agent)
.register_agent(create_spam_detector_agent, name="spam_detection_agent")
.register_agent(create_email_assistant_agent, name="email_assistant_agent")
.register_executor(lambda: to_email_assistant_request, name="to_email_assistant_request")
.register_executor(lambda: handle_email_response, name="send_email")
.register_executor(lambda: handle_spam_classifier_response, name="handle_spam")
.set_start_executor("spam_detection_agent")
# Not spam path: transform response -> request for assistant -> assistant -> send email
.add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
.add_edge(to_email_assistant_request, email_assistant_agent)
.add_edge(email_assistant_agent, handle_email_response)
.add_edge("spam_detection_agent", "to_email_assistant_request", condition=get_condition(False))
.add_edge("to_email_assistant_request", "email_assistant_agent")
.add_edge("email_assistant_agent", "send_email")
# Spam path: send to spam handler
.add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
.add_edge("spam_detection_agent", "handle_spam", condition=get_condition(True))
.build()
)
@@ -9,9 +9,9 @@ from typing import Literal
from uuid import uuid4
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatAgent,
ChatMessage,
Role,
WorkflowBuilder,
@@ -181,40 +181,38 @@ async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never,
await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))
def create_email_analysis_agent() -> ChatAgent:
"""Creates the email analysis agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
name="email_analysis_agent",
response_format=AnalysisResultAgent,
)
def create_email_assistant_agent() -> ChatAgent:
"""Creates the email assistant agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are an email assistant that helps users draft responses to emails with professionalism."),
name="email_assistant_agent",
response_format=EmailResponse,
)
def create_email_summary_agent() -> ChatAgent:
"""Creates the email summary agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are an assistant that helps users summarize emails."),
name="email_summary_agent",
response_format=EmailSummaryModel,
)
async def main() -> None:
# Agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
email_analysis_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=AnalysisResultAgent,
),
id="email_analysis_agent",
)
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
email_summary_agent = AgentExecutor(
chat_client.create_agent(
instructions=("You are an assistant that helps users summarize emails."),
response_format=EmailSummaryModel,
),
id="email_summary_agent",
)
# Build the workflow
def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
# Order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
@@ -228,24 +226,39 @@ async def main() -> None:
return targets
return [handle_uncertain_id]
workflow = (
workflow_builder = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, email_analysis_agent)
.add_edge(email_analysis_agent, to_analysis_result)
.register_agent(create_email_analysis_agent, name="email_analysis_agent")
.register_agent(create_email_assistant_agent, name="email_assistant_agent")
.register_agent(create_email_summary_agent, name="email_summary_agent")
.register_executor(lambda: store_email, name="store_email")
.register_executor(lambda: to_analysis_result, name="to_analysis_result")
.register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant")
.register_executor(lambda: finalize_and_send, name="finalize_and_send")
.register_executor(lambda: summarize_email, name="summarize_email")
.register_executor(lambda: merge_summary, name="merge_summary")
.register_executor(lambda: handle_spam, name="handle_spam")
.register_executor(lambda: handle_uncertain, name="handle_uncertain")
.register_executor(lambda: database_access, name="database_access")
)
workflow = (
workflow_builder.set_start_executor("store_email")
.add_edge("store_email", "email_analysis_agent")
.add_edge("email_analysis_agent", "to_analysis_result")
.add_multi_selection_edge_group(
to_analysis_result,
[handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
"to_analysis_result",
["handle_spam", "submit_to_email_assistant", "summarize_email", "handle_uncertain"],
selection_func=select_targets,
)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.add_edge(summarize_email, email_summary_agent)
.add_edge(email_summary_agent, merge_summary)
.add_edge("submit_to_email_assistant", "email_assistant_agent")
.add_edge("email_assistant_agent", "finalize_and_send")
.add_edge("summarize_email", "email_summary_agent")
.add_edge("email_summary_agent", "merge_summary")
# Save to DB if short (no summary path)
.add_edge(to_analysis_result, database_access, condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD)
.add_edge("to_analysis_result", "database_access", condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD)
# Save to DB with summary when long
.add_edge(merge_summary, database_access)
.add_edge("merge_summary", "database_access")
.build()
)
@@ -61,20 +61,18 @@ class ReverseTextExecutor(Executor):
async def main() -> None:
"""Build a two step sequential workflow and run it with streaming to observe events."""
# Step 1: Create executor instances.
upper_case_executor = UpperCaseExecutor(id="upper_case_executor")
reverse_text_executor = ReverseTextExecutor(id="reverse_text_executor")
# Step 2: Build the workflow graph.
# Step 1: Build the workflow graph.
# Order matters. We connect upper_case_executor -> reverse_text_executor and set the start.
workflow = (
WorkflowBuilder()
.add_edge(upper_case_executor, reverse_text_executor)
.set_start_executor(upper_case_executor)
.register_executor(lambda: UpperCaseExecutor(id="upper_case_executor"), name="upper_case_executor")
.register_executor(lambda: ReverseTextExecutor(id="reverse_text_executor"), name="reverse_text_executor")
.add_edge("upper_case_executor", "reverse_text_executor")
.set_start_executor("upper_case_executor")
.build()
)
# Step 3: Stream events for a single input.
# Step 2: Stream events for a single input.
# The stream will include executor invoke and completion events, plus workflow outputs.
outputs: list[str] = []
async for event in workflow.run_stream("hello world"):
@@ -52,11 +52,18 @@ async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
async def main():
"""Build a two-step sequential workflow and run it with streaming to observe events."""
# Step 2: Build the workflow with the defined edges.
# Step 1: Build the workflow with the defined edges.
# Order matters. upper_case_executor runs first, then reverse_text_executor.
workflow = WorkflowBuilder().add_edge(to_upper_case, reverse_text).set_start_executor(to_upper_case).build()
workflow = (
WorkflowBuilder()
.register_executor(lambda: to_upper_case, name="upper_case_executor")
.register_executor(lambda: reverse_text, name="reverse_text_executor")
.add_edge("upper_case_executor", "reverse_text_executor")
.set_start_executor("upper_case_executor")
.build()
)
# Step 3: Run the workflow and stream events in real time.
# Step 2: Run the workflow and stream events in real time.
async for event in workflow.run_stream("hello world"):
# You will see executor invoke and completion events as the workflow progresses.
print(f"Event: {event}")
@@ -4,16 +4,15 @@ import asyncio
from enum import Enum
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatAgent,
ChatMessage,
Executor,
ExecutorCompletedEvent,
Role,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
handler,
)
from agent_framework.azure import AzureOpenAIChatClient
@@ -49,9 +48,9 @@ class NumberSignal(Enum):
class GuessNumberExecutor(Executor):
"""An executor that guesses a number."""
def __init__(self, bound: tuple[int, int], id: str | None = None):
def __init__(self, bound: tuple[int, int], id: str):
"""Initialize the executor with a target number."""
super().__init__(id=id or "guess_number")
super().__init__(id=id)
self._lower = bound[0]
self._upper = bound[1]
@@ -116,43 +115,37 @@ class ParseJudgeResponse(Executor):
await ctx.send_message(NumberSignal.BELOW)
def create_judge_agent() -> ChatAgent:
"""Create a judge agent that evaluates guesses."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You strictly respond with one of: MATCHED, ABOVE, BELOW based on the given target and guess."),
name="judge_agent",
)
async def main():
"""Main function to run the workflow."""
# Step 1: Create the executors.
guess_number_executor = GuessNumberExecutor((1, 100))
# Agent judge setup
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
judge_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You strictly respond with one of: MATCHED, ABOVE, BELOW based on the given target and guess."
)
),
id="judge_agent",
)
submit_to_judge = SubmitToJudgeAgent(judge_agent_id=judge_agent.id, target=30, id="submit_judge")
parse_judge = ParseJudgeResponse(id="parse_judge")
# Step 2: Build the workflow with the defined edges.
# Step 1: Build the workflow with the defined edges.
# This time we are creating a loop in the workflow.
workflow = (
WorkflowBuilder()
.add_edge(guess_number_executor, submit_to_judge)
.add_edge(submit_to_judge, judge_agent)
.add_edge(judge_agent, parse_judge)
.add_edge(parse_judge, guess_number_executor)
.set_start_executor(guess_number_executor)
.register_executor(lambda: GuessNumberExecutor((1, 100), "guess_number"), name="guess_number")
.register_agent(create_judge_agent, name="judge_agent")
.register_executor(lambda: SubmitToJudgeAgent(judge_agent_id="judge_agent", target=30), name="submit_judge")
.register_executor(lambda: ParseJudgeResponse(id="parse_judge"), name="parse_judge")
.add_edge("guess_number", "submit_judge")
.add_edge("submit_judge", "judge_agent")
.add_edge("judge_agent", "parse_judge")
.add_edge("parse_judge", "guess_number")
.set_start_executor("guess_number")
.build()
)
# Step 3: Run the workflow and print the events.
# Step 2: Run the workflow and print the events.
iterations = 0
async for event in workflow.run_stream(NumberSignal.INIT):
if isinstance(event, ExecutorCompletedEvent) and event.executor_id == guess_number_executor.id:
if isinstance(event, ExecutorCompletedEvent) and event.executor_id == "guess_number":
iterations += 1
elif isinstance(event, WorkflowOutputEvent):
print(f"Final result: {event.data}")
print(f"Event: {event}")
# This is essentially a binary search, so the number of iterations should be logarithmic.
@@ -7,10 +7,10 @@ from typing import Any, Literal
from uuid import uuid4
from agent_framework import ( # Core chat primitives used to form LLM requests
AgentExecutor, # Wraps an agent so it can run inside a workflow
AgentExecutorRequest, # Message bundle sent to an AgentExecutor
AgentExecutorResponse, # Result returned by an AgentExecutor
Case, # Case entry for a switch-case edge group
Case,
ChatAgent, # Case entry for a switch-case edge group
ChatMessage,
Default, # Default branch when no cases match
Role,
@@ -152,51 +152,56 @@ async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Neve
raise RuntimeError("This executor should only handle Uncertain messages.")
def create_spam_detection_agent() -> ChatAgent:
"""Create and return the spam detection agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Be less confident in your assessments. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
name="spam_detection_agent",
response_format=DetectionResultAgent,
)
def create_email_assistant_agent() -> ChatAgent:
"""Create and return the email assistant agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are an email assistant that helps users draft responses to emails with professionalism."),
name="email_assistant_agent",
response_format=EmailResponse,
)
async def main():
"""Main function to run the workflow."""
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Agents. response_format enforces that the LLM returns JSON that Pydantic can validate.
spam_detection_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Be less confident in your assessments. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=DetectionResultAgent,
),
id="spam_detection_agent",
)
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
# Build workflow: store -> detection agent -> to_detection_result -> switch (NotSpam or Spam or Default).
# The switch-case group evaluates cases in order, then falls back to Default when none match.
workflow = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, spam_detection_agent)
.add_edge(spam_detection_agent, to_detection_result)
.register_agent(create_spam_detection_agent, name="spam_detection_agent")
.register_agent(create_email_assistant_agent, name="email_assistant_agent")
.register_executor(lambda: store_email, name="store_email")
.register_executor(lambda: to_detection_result, name="to_detection_result")
.register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant")
.register_executor(lambda: finalize_and_send, name="finalize_and_send")
.register_executor(lambda: handle_spam, name="handle_spam")
.register_executor(lambda: handle_uncertain, name="handle_uncertain")
.set_start_executor("store_email")
.add_edge("store_email", "spam_detection_agent")
.add_edge("spam_detection_agent", "to_detection_result")
.add_switch_case_edge_group(
to_detection_result,
"to_detection_result",
[
Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
Case(condition=get_case("Spam"), target=handle_spam),
Default(target=handle_uncertain),
Case(condition=get_case("NotSpam"), target="submit_to_email_assistant"),
Case(condition=get_case("Spam"), target="handle_spam"),
Default(target="handle_uncertain"),
],
)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.add_edge("submit_to_email_assistant", "email_assistant_agent")
.add_edge("email_assistant_agent", "finalize_and_send")
.build()
)
@@ -7,6 +7,7 @@ from typing import Annotated, Never
from agent_framework import (
AgentExecutorResponse,
ChatAgent,
ChatMessage,
Executor,
FunctionApprovalRequestContent,
@@ -210,10 +211,9 @@ async def conclude_workflow(
await ctx.yield_output(email_response.agent_run_response.text)
async def main() -> None:
# Create the agent and executors
chat_client = OpenAIChatClient()
email_writer = chat_client.create_agent(
def create_email_writer_agent() -> ChatAgent:
"""Create the Email Writer agent with tools that require approval."""
return OpenAIChatClient().create_agent(
name="Email Writer",
instructions=("You are an excellent email assistant. You respond to incoming emails."),
# tools with `approval_mode="always_require"` will trigger approval requests
@@ -225,14 +225,21 @@ async def main() -> None:
get_my_information,
],
)
email_preprocessor = EmailPreprocessor(special_email_addresses={"mike@contoso.com"})
async def main() -> None:
# Build the workflow
workflow = (
WorkflowBuilder()
.set_start_executor(email_preprocessor)
.add_edge(email_preprocessor, email_writer)
.add_edge(email_writer, conclude_workflow)
.register_agent(create_email_writer_agent, name="email_writer")
.register_executor(
lambda: EmailPreprocessor(special_email_addresses={"mike@contoso.com"}),
name="email_preprocessor",
)
.register_executor(lambda: conclude_workflow, name="conclude_workflow")
.set_start_executor("email_preprocessor")
.add_edge("email_preprocessor", "email_writer")
.add_edge("email_writer", "conclude_workflow")
.build()
)
@@ -5,7 +5,8 @@ from dataclasses import dataclass
from agent_framework import (
AgentExecutorRequest, # Message bundle sent to an AgentExecutor
AgentExecutorResponse, # Result returned by an AgentExecutor
AgentExecutorResponse,
ChatAgent, # Result returned by an AgentExecutor
ChatMessage, # Chat message structure
Executor, # Base class for workflow executors
RequestInfoEvent, # Event emitted when human input is requested
@@ -142,11 +143,9 @@ class TurnManager(Executor):
await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
async def main() -> None:
# Create the chat agent and wrap it in an AgentExecutor.
# response_format enforces that the model produces JSON compatible with GuessOutput.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.create_agent(
def create_guessing_agent() -> ChatAgent:
"""Create the guessing agent with instructions to guess a number between 1 and 10."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
name="GuessingAgent",
instructions=(
"You guess a number between 1 and 10. "
@@ -154,19 +153,22 @@ async def main() -> None:
'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
"No explanations or additional text."
),
# Structured output enforced via Pydantic model.
# response_format enforces that the model produces JSON compatible with GuessOutput.
response_format=GuessOutput,
)
# TurnManager coordinates and gathers human replies while AgentExecutor runs the model.
turn_manager = TurnManager(id="turn_manager")
async def main() -> None:
"""Run the human-in-the-loop guessing game workflow."""
# Build a simple loop: TurnManager <-> AgentExecutor.
workflow = (
WorkflowBuilder()
.set_start_executor(turn_manager)
.add_edge(turn_manager, agent) # Ask agent to make/adjust a guess
.add_edge(agent, turn_manager) # Agent's response comes back to coordinator
.register_agent(create_guessing_agent, name="guessing_agent")
.register_executor(lambda: TurnManager(id="turn_manager"), name="turn_manager")
.set_start_executor("turn_manager")
.add_edge("turn_manager", "guessing_agent") # Ask agent to make/adjust a guess
.add_edge("guessing_agent", "turn_manager") # Agent's response comes back to coordinator
).build()
# Human in the loop run: alternate between invoking the workflow and supplying collected responses.
@@ -72,22 +72,20 @@ class Aggregator(Executor):
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
# 1) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.register_executor(lambda: Dispatcher(id="dispatcher"), name="dispatcher")
.register_executor(lambda: Average(id="average"), name="average")
.register_executor(lambda: Sum(id="summation"), name="summation")
.register_executor(lambda: Aggregator(id="aggregator"), name="aggregator")
.set_start_executor("dispatcher")
.add_fan_out_edges("dispatcher", ["average", "summation"])
.add_fan_in_edges(["average", "summation"], "aggregator")
.build()
)
# 3) Run the workflow
# 2) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
@@ -4,10 +4,10 @@ import asyncio
from dataclasses import dataclass
from agent_framework import ( # Core chat primitives to build LLM requests
AgentExecutor, # Wraps an LLM agent for use inside a workflow
AgentExecutorRequest, # The message bundle sent to an AgentExecutor
AgentExecutorResponse, # The structured result returned by an AgentExecutor
AgentRunEvent, # Tracing event for agent execution steps
AgentRunEvent,
ChatAgent, # Tracing event for agent execution steps
ChatMessage, # Chat message structure
Executor, # Base class for custom Python executors
Role, # Enum of chat roles (user, assistant, system)
@@ -16,7 +16,7 @@ from agent_framework import ( # Core chat primitives to build LLM requests
WorkflowOutputEvent, # Event emitted when workflow yields output
handler, # Decorator to mark an Executor method as invokable
)
from agent_framework.azure import AzureOpenAIChatClient # Client wrapper for Azure OpenAI chat models
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential # Uses your az CLI login for credentials
from typing_extensions import Never
@@ -42,20 +42,11 @@ Prerequisites:
class DispatchToExperts(Executor):
"""Dispatches the incoming prompt to all expert agent executors for parallel processing (fan out)."""
def __init__(self, expert_ids: list[str], id: str | None = None):
super().__init__(id=id or "dispatch_to_experts")
self._expert_ids = expert_ids
@handler
async def dispatch(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
# Wrap the incoming prompt as a user message for each expert and request a response.
# Each send_message targets a different AgentExecutor by id so that branches run in parallel.
initial_message = ChatMessage(Role.USER, text=prompt)
for expert_id in self._expert_ids:
await ctx.send_message(
AgentExecutorRequest(messages=[initial_message], should_respond=True),
target_id=expert_id,
)
await ctx.send_message(AgentExecutorRequest(messages=[initial_message], should_respond=True))
@dataclass
@@ -70,10 +61,6 @@ class AggregatedInsights:
class AggregateInsights(Executor):
"""Aggregates expert agent responses into a single consolidated result (fan in)."""
def __init__(self, expert_ids: list[str], id: str | None = None):
super().__init__(id=id or "aggregate_insights")
self._expert_ids = expert_ids
@handler
async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
# Map responses to text by executor id for a simple, predictable demo.
@@ -104,49 +91,51 @@ class AggregateInsights(Executor):
await ctx.yield_output(consolidated)
def create_researcher_agent() -> ChatAgent:
"""Creates a research domain expert agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
def create_marketer_agent() -> ChatAgent:
"""Creates a marketing domain expert agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
def create_legal_agent() -> ChatAgent:
"""Creates a legal/compliance domain expert agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
async def main() -> None:
# 1) Create agent executors for domain experts
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
researcher = AgentExecutor(
chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
),
id="researcher",
)
marketer = AgentExecutor(
chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
),
id="marketer",
)
legal = AgentExecutor(
chat_client.create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
),
id="legal",
)
expert_ids = [researcher.id, marketer.id, legal.id]
dispatcher = DispatchToExperts(expert_ids=expert_ids, id="dispatcher")
aggregator = AggregateInsights(expert_ids=expert_ids, id="aggregator")
# 2) Build a simple fan out and fan in workflow
# 1) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [researcher, marketer, legal]) # Parallel branches
.add_fan_in_edges([researcher, marketer, legal], aggregator) # Join at the aggregator
.register_agent(create_researcher_agent, name="researcher")
.register_agent(create_marketer_agent, name="marketer")
.register_agent(create_legal_agent, name="legal")
.register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher")
.register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator")
.set_start_executor("dispatcher")
.add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"]) # Parallel branches
.add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator") # Join at the aggregator
.build()
)
@@ -259,27 +259,50 @@ class CompletionExecutor(Executor):
async def main():
"""Construct the map reduce workflow, visualize it, then run it over a sample file."""
# Step 1: Create the executors.
map_operations = [Map(id=f"map_executor_{i}") for i in range(3)]
split_operation = Split(
[map_operation.id for map_operation in map_operations],
id="split_data_executor",
# Step 1: Create the workflow builder and register executors.
workflow_builder = (
WorkflowBuilder()
.register_executor(lambda: Map(id="map_executor_0"), name="map_executor_0")
.register_executor(lambda: Map(id="map_executor_1"), name="map_executor_1")
.register_executor(lambda: Map(id="map_executor_2"), name="map_executor_2")
.register_executor(
lambda: Split(["map_executor_0", "map_executor_1", "map_executor_2"], id="split_data_executor"),
name="split_data_executor",
)
.register_executor(lambda: Reduce(id="reduce_executor_0"), name="reduce_executor_0")
.register_executor(lambda: Reduce(id="reduce_executor_1"), name="reduce_executor_1")
.register_executor(lambda: Reduce(id="reduce_executor_2"), name="reduce_executor_2")
.register_executor(lambda: Reduce(id="reduce_executor_3"), name="reduce_executor_3")
.register_executor(
lambda: Shuffle(
["reduce_executor_0", "reduce_executor_1", "reduce_executor_2", "reduce_executor_3"],
id="shuffle_executor",
),
name="shuffle_executor",
)
.register_executor(lambda: CompletionExecutor(id="completion_executor"), name="completion_executor")
)
reduce_operations = [Reduce(id=f"reduce_executor_{i}") for i in range(4)]
shuffle_operation = Shuffle(
[reduce_operation.id for reduce_operation in reduce_operations],
id="shuffle_executor",
)
completion_executor = CompletionExecutor(id="completion_executor")
# Step 2: Build the workflow graph using fan out and fan in edges.
workflow = (
WorkflowBuilder()
.set_start_executor(split_operation)
.add_fan_out_edges(split_operation, map_operations) # Split -> many mappers
.add_fan_in_edges(map_operations, shuffle_operation) # All mappers -> shuffle
.add_fan_out_edges(shuffle_operation, reduce_operations) # Shuffle -> many reducers
.add_fan_in_edges(reduce_operations, completion_executor) # All reducers -> completion
workflow_builder.set_start_executor("split_data_executor")
.add_fan_out_edges(
"split_data_executor",
["map_executor_0", "map_executor_1", "map_executor_2"],
) # Split -> many mappers
.add_fan_in_edges(
["map_executor_0", "map_executor_1", "map_executor_2"],
"shuffle_executor",
) # All mappers -> shuffle
.add_fan_out_edges(
"shuffle_executor",
["reduce_executor_0", "reduce_executor_1", "reduce_executor_2", "reduce_executor_3"],
) # Shuffle -> many reducers
.add_fan_in_edges(
["reduce_executor_0", "reduce_executor_1", "reduce_executor_2", "reduce_executor_3"],
"completion_executor",
) # All reducers -> completion
.build()
)
@@ -1,14 +1,15 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from uuid import uuid4
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
ChatAgent,
ChatMessage,
Role,
WorkflowBuilder,
@@ -154,28 +155,35 @@ async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, st
raise RuntimeError("This executor should only handle spam messages.")
async def main() -> None:
# Create chat client and agents. response_format enforces structured JSON from each agent.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
spam_detection_agent = chat_client.create_agent(
def create_spam_detection_agent() -> ChatAgent:
"""Creates a spam detection agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool) and reason (string)."
),
response_format=DetectionResultAgent,
# response_format enforces structured JSON from each agent.
name="spam_detection_agent",
)
email_assistant_agent = chat_client.create_agent(
def create_email_assistant_agent() -> ChatAgent:
"""Creates an email assistant agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism. "
"Return JSON with a single field 'response' containing the drafted reply."
),
# response_format enforces structured JSON from each agent.
response_format=EmailResponse,
name="email_assistant_agent",
)
async def main() -> None:
"""Build and run the shared state with agents and conditional routing workflow."""
# Build the workflow graph with conditional edges.
# Flow:
# store_email -> spam_detection_agent -> to_detection_result -> branch:
@@ -183,25 +191,28 @@ async def main() -> None:
# True -> handle_spam
workflow = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, spam_detection_agent)
.add_edge(spam_detection_agent, to_detection_result)
.add_edge(to_detection_result, submit_to_email_assistant, condition=get_condition(False))
.add_edge(to_detection_result, handle_spam, condition=get_condition(True))
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.register_agent(create_spam_detection_agent, name="spam_detection_agent")
.register_agent(create_email_assistant_agent, name="email_assistant_agent")
.register_executor(lambda: store_email, name="store_email")
.register_executor(lambda: to_detection_result, name="to_detection_result")
.register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant")
.register_executor(lambda: finalize_and_send, name="finalize_and_send")
.register_executor(lambda: handle_spam, name="handle_spam")
.set_start_executor("store_email")
.add_edge("store_email", "spam_detection_agent")
.add_edge("spam_detection_agent", "to_detection_result")
.add_edge("to_detection_result", "submit_to_email_assistant", condition=get_condition(False))
.add_edge("to_detection_result", "handle_spam", condition=get_condition(True))
.add_edge("submit_to_email_assistant", "email_assistant_agent")
.add_edge("email_assistant_agent", "finalize_and_send")
.build()
)
# Read an email from resources/spam.txt if available; otherwise use a default sample.
resources_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
"resources",
"spam.txt",
)
if os.path.exists(resources_path):
with open(resources_path, encoding="utf-8") as f: # noqa: ASYNC230
email = f.read()
current_file = Path(__file__)
resources_path = current_file.parent.parent / "resources" / "spam.txt"
if resources_path.exists():
email = resources_path.read_text(encoding="utf-8")
else:
print("Unable to find resource file, using default text.")
email = "You are a WINNER! Click here for a free lottery offer!!!"
@@ -4,10 +4,10 @@ import asyncio
from dataclasses import dataclass
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
AgentRunEvent,
ChatAgent,
ChatMessage,
Executor,
Role,
@@ -39,19 +39,11 @@ Prerequisites:
class DispatchToExperts(Executor):
"""Dispatches the incoming prompt to all expert agent executors (fan-out)."""
def __init__(self, expert_ids: list[str], id: str | None = None):
super().__init__(id=id or "dispatch_to_experts")
self._expert_ids = expert_ids
@handler
async def dispatch(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
# Wrap the incoming prompt as a user message for each expert and request a response.
initial_message = ChatMessage(Role.USER, text=prompt)
for expert_id in self._expert_ids:
await ctx.send_message(
AgentExecutorRequest(messages=[initial_message], should_respond=True),
target_id=expert_id,
)
await ctx.send_message(AgentExecutorRequest(messages=[initial_message], should_respond=True))
@dataclass
@@ -66,10 +58,6 @@ class AggregatedInsights:
class AggregateInsights(Executor):
"""Aggregates expert agent responses into a single consolidated result (fan-in)."""
def __init__(self, expert_ids: list[str], id: str | None = None):
super().__init__(id=id or "aggregate_insights")
self._expert_ids = expert_ids
@handler
async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
# Map responses to text by executor id for a simple, predictable demo.
@@ -100,53 +88,57 @@ class AggregateInsights(Executor):
await ctx.yield_output(consolidated)
def create_researcher_agent() -> ChatAgent:
"""Creates a research domain expert agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
def create_marketer_agent() -> ChatAgent:
"""Creates a marketing domain expert agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
def create_legal_agent() -> ChatAgent:
"""Creates a legal domain expert agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
async def main() -> None:
# 1) Create agent executors for domain experts
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
"""Build and run the concurrent workflow with visualization."""
researcher = AgentExecutor(
chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
),
id="researcher",
)
marketer = AgentExecutor(
chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
),
id="marketer",
)
legal = AgentExecutor(
chat_client.create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
),
id="legal",
)
expert_ids = [researcher.id, marketer.id, legal.id]
dispatcher = DispatchToExperts(expert_ids=expert_ids, id="dispatcher")
aggregator = AggregateInsights(expert_ids=expert_ids, id="aggregator")
# 2) Build a simple fan-out/fan-in workflow
# 1) Build a simple fan-out/fan-in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [researcher, marketer, legal])
.add_fan_in_edges([researcher, marketer, legal], aggregator)
.register_agent(create_researcher_agent, name="researcher")
.register_agent(create_marketer_agent, name="marketer")
.register_agent(create_legal_agent, name="legal")
.register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher")
.register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator")
.set_start_executor("dispatcher")
.add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"])
.add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator")
.build()
)
# 2.5) Generate workflow visualization
# 1.5) Generate workflow visualization
print("Generating workflow visualization...")
viz = WorkflowViz(workflow)
# Print out the mermaid string.
@@ -162,7 +154,7 @@ async def main() -> None:
svg_file = viz.export(format="svg")
print(f"SVG file saved to: {svg_file}")
# 3) Run with a single prompt
# 2) Run with a single prompt
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, AgentRunEvent):
# Show which agent ran and what step completed.