Python: Workflow add option to visualize internal executors (#2917)

* Workflow add option to visualize internal executors

* Address Copilot comments
This commit is contained in:
Tao Chen
2025-12-18 06:04:03 -08:00
committed by GitHub
Unverified
parent e5c11d38d6
commit b4f2709b6d
2 changed files with 145 additions and 65 deletions
@@ -6,14 +6,12 @@ from dataclasses import dataclass
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
AgentRunEvent,
ChatAgent,
ChatMessage,
Executor,
Role,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
WorkflowViz,
handler,
)
@@ -124,7 +122,7 @@ def create_legal_agent() -> ChatAgent:
async def main() -> None:
"""Build and run the concurrent workflow with visualization."""
# 1) Build a simple fan-out/fan-in workflow
# Build a simple fan-out/fan-in workflow
workflow = (
WorkflowBuilder()
.register_agent(create_researcher_agent, name="researcher")
@@ -138,31 +136,22 @@ async def main() -> None:
.build()
)
# 1.5) Generate workflow visualization
# Generate workflow visualization
print("Generating workflow visualization...")
viz = WorkflowViz(workflow)
# Print out the mermaid string.
print("Mermaid string: \n=======")
print(viz.to_mermaid())
print("=======")
# Print out the DiGraph string.
# Print out the DiGraph string with internal executors.
print("DiGraph string: \n=======")
print(viz.to_digraph())
print(viz.to_digraph(include_internal_executors=True))
print("=======")
# Export the DiGraph visualization as SVG.
svg_file = viz.export(format="svg")
print(f"SVG file saved to: {svg_file}")
# 2) Run with a single prompt
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, AgentRunEvent):
# Show which agent ran and what step completed.
print(event)
elif isinstance(event, WorkflowOutputEvent):
print("===== Final Aggregated Output =====")
print(event.data)
if __name__ == "__main__":
asyncio.run(main())