Python: WorkflowViz to support sub workflows (#571)

* add visualization for sub workflows

* add visualization to subworkflow samples

* Update python/packages/workflow/agent_framework_workflow/_viz.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update python/packages/workflow/agent_framework_workflow/_viz.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* update

* remove changes to samples

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Chris <66376200+crickman@users.noreply.github.com>
Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com>
This commit is contained in:
Eric Zhu
2025-09-03 19:25:47 -07:00
committed by GitHub
Unverified
parent 55124150eb
commit 20ddf7cb31
2 changed files with 277 additions and 72 deletions
@@ -1,16 +1,19 @@
# Copyright (c) Microsoft. All rights reserved.
"""Workflow visualization module using graphviz."""
import hashlib
import re
import tempfile
import uuid
from pathlib import Path
from typing import Literal
from ._edge import FanInEdgeGroup
from ._workflow import Workflow
# Import of WorkflowExecutor is performed lazily inside methods to avoid cycles
"""Workflow visualization module using graphviz."""
class WorkflowViz:
"""A class for visualizing workflows using graphviz."""
@@ -35,34 +38,11 @@ class WorkflowViz:
lines.append(" edge [color=black, arrowhead=vee];")
lines.append("")
# Add start executor with special styling
start_executor_id = self._workflow.start_executor_id
lines.append(f' "{start_executor_id}" [fillcolor=lightgreen, label="{start_executor_id}\\n(Start)"];')
# Emit the top-level workflow nodes/edges
self._emit_workflow_digraph(self._workflow, lines, indent=" ")
# Add all other executors
for executor_id in self._workflow.executors:
if executor_id != start_executor_id:
lines.append(f' "{executor_id}" [label="{executor_id}"];')
# Build shared structures
fan_in_nodes = self._compute_fan_in_descriptors() # (node_id, sources, target)
normal_edges = self._compute_normal_edges() # (src, tgt, is_conditional)
if fan_in_nodes:
lines.append("")
for node_id, _, _ in fan_in_nodes:
lines.append(f' "{node_id}" [shape=ellipse, fillcolor=lightgoldenrod, label="fan-in"];')
# Route fan-in via intermediate nodes
for node_id, sources, target in fan_in_nodes:
for src in sources:
lines.append(f' "{src}" -> "{node_id}";')
lines.append(f' "{node_id}" -> "{target}";')
# Draw normal edges
for src, tgt, is_cond in normal_edges:
edge_attr = ' [style=dashed, label="conditional"]' if is_cond else ""
lines.append(f' "{src}" -> "{tgt}"{edge_attr};')
# Emit sub-workflows hosted by WorkflowExecutor as nested clusters
self._emit_sub_workflows_digraph(self._workflow, lines, indent=" ")
lines.append("}")
return "\n".join(lines)
@@ -178,46 +158,11 @@ class WorkflowViz:
lines: list[str] = ["flowchart TD"]
# Nodes
start_executor_id = self._workflow.start_executor_id
start_id = _san(start_executor_id)
# End statements with semicolons for better compatibility and quote labels for special chars
lines.append(f' {start_id}["{start_executor_id} (Start)"];')
# Emit top-level workflow
self._emit_workflow_mermaid(self._workflow, lines, indent=" ")
for executor_id in self._workflow.executors:
if executor_id == start_executor_id:
continue
eid = _san(executor_id)
lines.append(f' {eid}["{executor_id}"];')
# Build shared structures
fan_in_nodes_dot = self._compute_fan_in_descriptors() # uses DOT node ids
# Convert DOT-style node ids to Mermaid-safe ones
fan_in_nodes: list[tuple[str, list[str], str]] = []
for dot_node_id, sources, target in fan_in_nodes_dot:
digest = dot_node_id.split("::")[-1]
fan_node_id = f"fan_in__{_san(target)}__{digest}"
fan_in_nodes.append((fan_node_id, sources, target))
for fan_node_id, _, _ in fan_in_nodes:
# Use double parentheses to make it circular in Mermaid
# (Keep this line without a trailing semicolon to match existing tests.)
lines.append(f" {fan_node_id}((fan-in))")
# Fan-in edges
for fan_node_id, sources, target in fan_in_nodes:
for s in sources:
lines.append(f" {_san(s)} --> {fan_node_id};")
lines.append(f" {fan_node_id} --> {_san(target)};")
# Normal edges
for src, tgt, is_cond in self._compute_normal_edges():
s = _san(src)
t = _san(tgt)
if is_cond:
lines.append(f" {s} -. conditional .-> {t};")
else:
lines.append(f" {s} --> {t};")
# Emit sub-workflows as Mermaid subgraphs
self._emit_sub_workflows_mermaid(self._workflow, lines, indent=" ")
return "\n".join(lines)
@@ -227,13 +172,14 @@ class WorkflowViz:
sources_sorted = sorted(sources)
return hashlib.sha256((target + "|" + "|".join(sources_sorted)).encode("utf-8")).hexdigest()[:8]
def _compute_fan_in_descriptors(self) -> list[tuple[str, list[str], str]]:
def _compute_fan_in_descriptors(self, wf: Workflow | None = None) -> list[tuple[str, list[str], str]]:
"""Return list of (node_id, sources, target) for fan-in groups.
node_id is DOT-oriented: fan_in::target::digest
"""
result: list[tuple[str, list[str], str]] = []
for group in self._workflow.edge_groups:
workflow = wf or self._workflow
for group in workflow.edge_groups:
if isinstance(group, FanInEdgeGroup):
target = group.target_executor_ids[0]
sources = list(group.source_executor_ids)
@@ -242,10 +188,11 @@ class WorkflowViz:
result.append((node_id, sorted(sources), target))
return result
def _compute_normal_edges(self) -> list[tuple[str, str, bool]]:
def _compute_normal_edges(self, wf: Workflow | None = None) -> list[tuple[str, str, bool]]:
"""Return list of (source_id, target_id, is_conditional) for non-fan-in groups."""
edges: list[tuple[str, str, bool]] = []
for group in self._workflow.edge_groups:
workflow = wf or self._workflow
for group in workflow.edge_groups:
if isinstance(group, FanInEdgeGroup):
continue
for edge in group.edges:
@@ -254,3 +201,142 @@ class WorkflowViz:
return edges
# endregion
# region Internal emitters (DOT)
def _emit_workflow_digraph(self, wf: Workflow, lines: list[str], indent: str, ns: str | None = None) -> None:
"""Emit DOT nodes/edges for the given workflow.
If ns (namespace) is provided, node ids are prefixed with f"{ns}/" for uniqueness,
but labels remain the original executor ids.
"""
def map_id(x: str) -> str:
return f"{ns}/{x}" if ns else x
# Nodes
start_executor_id = wf.start_executor_id
lines.append(
f'{indent}"{map_id(start_executor_id)}" [fillcolor=lightgreen, label="{start_executor_id}\\n(Start)"];'
)
for executor_id in wf.executors:
if executor_id != start_executor_id:
lines.append(f'{indent}"{map_id(executor_id)}" [label="{executor_id}"];')
# Fan-in nodes
fan_in_nodes = self._compute_fan_in_descriptors(wf)
if fan_in_nodes:
lines.append("")
for node_id, _, _ in fan_in_nodes:
lines.append(f'{indent}"{map_id(node_id)}" [shape=ellipse, fillcolor=lightgoldenrod, label="fan-in"];')
# Fan-in edges
for node_id, sources, target in fan_in_nodes:
for src in sources:
lines.append(f'{indent}"{map_id(src)}" -> "{map_id(node_id)}";')
lines.append(f'{indent}"{map_id(node_id)}" -> "{map_id(target)}";')
# Normal edges
for src, tgt, is_cond in self._compute_normal_edges(wf):
edge_attr = ' [style=dashed, label="conditional"]' if is_cond else ""
lines.append(f'{indent}"{map_id(src)}" -> "{map_id(tgt)}"{edge_attr};')
def _emit_sub_workflows_digraph(self, wf: Workflow, lines: list[str], indent: str) -> None:
"""Emit DOT subgraphs for any WorkflowExecutor instances found in the workflow."""
# Lazy import to avoid any potential import cycles
try:
from ._executor import WorkflowExecutor # type: ignore
except ImportError: # pragma: no cover - best-effort; if unavailable, skip subgraphs
return
for exec_id, exec_obj in wf.executors.items():
if isinstance(exec_obj, WorkflowExecutor) and hasattr(exec_obj, "workflow") and exec_obj.workflow:
subgraph_id = f"cluster_{uuid.uuid5(uuid.NAMESPACE_OID, exec_id).hex[:8]}"
lines.append(f"{indent}subgraph {subgraph_id} {{")
lines.append(f'{indent} label="sub-workflow: {exec_id}";')
lines.append(f"{indent} style=dashed;")
# Emit the nested workflow inside this cluster using a namespace
ns = exec_id
self._emit_workflow_digraph(exec_obj.workflow, lines, indent=f"{indent} ", ns=ns)
# Recurse into deeper nested sub-workflows
self._emit_sub_workflows_digraph(exec_obj.workflow, lines, indent=f"{indent} ")
lines.append(f"{indent}}}")
# endregion
# region Internal emitters (Mermaid)
def _emit_workflow_mermaid(self, wf: Workflow, lines: list[str], indent: str, ns: str | None = None) -> None:
def _san(s: str) -> str:
s2 = re.sub(r"[^0-9A-Za-z_]", "_", s)
if not s2 or not s2[0].isalpha():
s2 = f"n_{s2}"
return s2
def map_id(x: str) -> str:
if ns:
return f"{_san(ns)}__{_san(x)}"
return _san(x)
# Nodes
start_executor_id = wf.start_executor_id
lines.append(f'{indent}{map_id(start_executor_id)}["{start_executor_id} (Start)"];')
for executor_id in wf.executors:
if executor_id == start_executor_id:
continue
lines.append(f'{indent}{map_id(executor_id)}["{executor_id}"];')
# Fan-in nodes
fan_in_nodes_dot = self._compute_fan_in_descriptors(wf)
fan_in_nodes: list[tuple[str, list[str], str]] = []
for dot_node_id, sources, target in fan_in_nodes_dot:
digest = dot_node_id.split("::")[-1]
base = f"{target}__{digest}"
fan_node_id = f"fan_in__{_san(ns) + '__' if ns else ''}{_san(base)}"
fan_in_nodes.append((fan_node_id, sources, target))
for fan_node_id, _, _ in fan_in_nodes:
# Keep this line without trailing semicolon to match existing tests
lines.append(f"{indent}{fan_node_id}((fan-in))")
# Fan-in edges
for fan_node_id, sources, target in fan_in_nodes:
for s in sources:
lines.append(f"{indent}{map_id(s)} --> {fan_node_id};")
lines.append(f"{indent}{fan_node_id} --> {map_id(target)};")
# Normal edges
for src, tgt, is_cond in self._compute_normal_edges(wf):
s = map_id(src)
t = map_id(tgt)
if is_cond:
lines.append(f"{indent}{s} -. conditional .-> {t};")
else:
lines.append(f"{indent}{s} --> {t};")
def _emit_sub_workflows_mermaid(self, wf: Workflow, lines: list[str], indent: str) -> None:
try:
from ._executor import WorkflowExecutor # type: ignore
except ImportError: # pragma: no cover
return
def _san(s: str) -> str:
s2 = re.sub(r"[^0-9A-Za-z_]", "_", s)
if not s2 or not s2[0].isalpha():
s2 = f"n_{s2}"
return s2
for exec_id, exec_obj in wf.executors.items():
if isinstance(exec_obj, WorkflowExecutor) and hasattr(exec_obj, "workflow") and exec_obj.workflow:
sg_id = _san(exec_id)
lines.append(f"{indent}subgraph {sg_id}")
# Render nested workflow within this subgraph using namespacing
self._emit_workflow_mermaid(exec_obj.workflow, lines, indent=f"{indent} ", ns=exec_id)
# Recurse into deeper sub-workflows
self._emit_sub_workflows_mermaid(exec_obj.workflow, lines, indent=f"{indent} ")
lines.append(f"{indent}end")
# endregion
+119
View File
@@ -5,6 +5,8 @@
import pytest
from agent_framework.workflow import Executor, WorkflowBuilder, WorkflowContext, WorkflowViz, handler
from agent_framework_workflow import WorkflowExecutor
class MockExecutor(Executor):
"""A mock executor for testing purposes."""
@@ -23,6 +25,41 @@ class ListStrTargetExecutor(Executor):
pass
@pytest.fixture
def basic_sub_workflow():
"""Fixture that creates a basic sub-workflow setup for testing."""
# Create a sub-workflow
sub_exec1 = MockExecutor(id="sub_exec1")
sub_exec2 = MockExecutor(id="sub_exec2")
sub_workflow = WorkflowBuilder().add_edge(sub_exec1, sub_exec2).set_start_executor(sub_exec1).build()
# Create a workflow executor that wraps the sub-workflow
workflow_executor = WorkflowExecutor(sub_workflow, id="workflow_executor_1")
# Create a main workflow that includes the workflow executor
main_exec = MockExecutor(id="main_executor")
final_exec = MockExecutor(id="final_executor")
main_workflow = (
WorkflowBuilder()
.add_edge(main_exec, workflow_executor)
.add_edge(workflow_executor, final_exec)
.set_start_executor(main_exec)
.build()
)
return {
"main_workflow": main_workflow,
"workflow_executor": workflow_executor,
"sub_workflow": sub_workflow,
"main_exec": main_exec,
"final_exec": final_exec,
"sub_exec1": sub_exec1,
"sub_exec2": sub_exec2,
}
def test_workflow_viz_to_digraph():
"""Test that WorkflowViz can generate a DOT digraph."""
# Create a simple workflow
@@ -283,3 +320,85 @@ def test_workflow_viz_mermaid_fan_in_edge_group():
# Ensure direct edges to target are not present
assert "s1 --> t" not in mermaid
assert "s2 --> t" not in mermaid
def test_workflow_viz_sub_workflow_digraph(basic_sub_workflow):
"""Test that WorkflowViz can visualize sub-workflows in DOT format."""
main_workflow = basic_sub_workflow["main_workflow"]
viz = WorkflowViz(main_workflow)
dot_content = viz.to_digraph()
# Check that main workflow nodes are present
assert "main_executor" in dot_content
assert "workflow_executor_1" in dot_content
assert "final_executor" in dot_content
# Check that sub-workflow is rendered as a cluster
assert "subgraph cluster_" in dot_content
assert "sub-workflow: workflow_executor_1" in dot_content
# Check that sub-workflow nodes are namespaced
assert '"workflow_executor_1/sub_exec1"' in dot_content
assert '"workflow_executor_1/sub_exec2"' in dot_content
# Check that sub-workflow edges are present
assert '"workflow_executor_1/sub_exec1" -> "workflow_executor_1/sub_exec2"' in dot_content
def test_workflow_viz_sub_workflow_mermaid(basic_sub_workflow):
"""Test that WorkflowViz can visualize sub-workflows in Mermaid format."""
main_workflow = basic_sub_workflow["main_workflow"]
viz = WorkflowViz(main_workflow)
mermaid_content = viz.to_mermaid()
# Check that main workflow nodes are present
assert "main_executor" in mermaid_content
assert "workflow_executor_1" in mermaid_content
assert "final_executor" in mermaid_content
# Check that sub-workflow is rendered as a subgraph
assert "subgraph workflow_executor_1" in mermaid_content
assert "end" in mermaid_content
# Check that sub-workflow nodes are namespaced properly for Mermaid
assert "workflow_executor_1__sub_exec1" in mermaid_content
assert "workflow_executor_1__sub_exec2" in mermaid_content
def test_workflow_viz_nested_sub_workflows():
"""Test visualization of deeply nested sub-workflows."""
# Create innermost sub-workflow
inner_exec = MockExecutor(id="inner_exec")
inner_workflow = WorkflowBuilder().set_start_executor(inner_exec).build()
# Create middle sub-workflow that contains the inner one
inner_workflow_executor = WorkflowExecutor(inner_workflow, id="inner_wf_exec")
middle_exec = MockExecutor(id="middle_exec")
middle_workflow = (
WorkflowBuilder().add_edge(middle_exec, inner_workflow_executor).set_start_executor(middle_exec).build()
)
# Create outer workflow
middle_workflow_executor = WorkflowExecutor(middle_workflow, id="middle_wf_exec")
outer_exec = MockExecutor(id="outer_exec")
outer_workflow = (
WorkflowBuilder().add_edge(outer_exec, middle_workflow_executor).set_start_executor(outer_exec).build()
)
viz = WorkflowViz(outer_workflow)
dot_content = viz.to_digraph()
# Check that all levels are present
assert "outer_exec" in dot_content
assert "middle_wf_exec" in dot_content
assert "inner_wf_exec" in dot_content
# Check for nested clusters
assert "subgraph cluster_" in dot_content
# Should have multiple subgraphs for nested structure
subgraph_count = dot_content.count("subgraph cluster_")
assert subgraph_count >= 2 # At least one for each level of nesting