From 20ddf7cb318ef886dc76b280ebed35de6e6e4edb Mon Sep 17 00:00:00 2001 From: Eric Zhu Date: Wed, 3 Sep 2025 19:25:47 -0700 Subject: [PATCH] Python: WorkflowViz to support sub workflows (#571) * add visualization for sub workflows * add visualization to subworkflow samples * Update python/packages/workflow/agent_framework_workflow/_viz.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update python/packages/workflow/agent_framework_workflow/_viz.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * update * remove changes to samples --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com> --- .../workflow/agent_framework_workflow/_viz.py | 230 ++++++++++++------ python/packages/workflow/tests/test_viz.py | 119 +++++++++ 2 files changed, 277 insertions(+), 72 deletions(-) diff --git a/python/packages/workflow/agent_framework_workflow/_viz.py b/python/packages/workflow/agent_framework_workflow/_viz.py index 3d7350e21f..916d980993 100644 --- a/python/packages/workflow/agent_framework_workflow/_viz.py +++ b/python/packages/workflow/agent_framework_workflow/_viz.py @@ -1,16 +1,19 @@ # Copyright (c) Microsoft. All rights reserved. -"""Workflow visualization module using graphviz.""" - import hashlib import re import tempfile +import uuid from pathlib import Path from typing import Literal from ._edge import FanInEdgeGroup from ._workflow import Workflow +# Import of WorkflowExecutor is performed lazily inside methods to avoid cycles + +"""Workflow visualization module using graphviz.""" + class WorkflowViz: """A class for visualizing workflows using graphviz.""" @@ -35,34 +38,11 @@ class WorkflowViz: lines.append(" edge [color=black, arrowhead=vee];") lines.append("") - # Add start executor with special styling - start_executor_id = self._workflow.start_executor_id - lines.append(f' "{start_executor_id}" [fillcolor=lightgreen, label="{start_executor_id}\\n(Start)"];') + # Emit the top-level workflow nodes/edges + self._emit_workflow_digraph(self._workflow, lines, indent=" ") - # Add all other executors - for executor_id in self._workflow.executors: - if executor_id != start_executor_id: - lines.append(f' "{executor_id}" [label="{executor_id}"];') - - # Build shared structures - fan_in_nodes = self._compute_fan_in_descriptors() # (node_id, sources, target) - normal_edges = self._compute_normal_edges() # (src, tgt, is_conditional) - - if fan_in_nodes: - lines.append("") - for node_id, _, _ in fan_in_nodes: - lines.append(f' "{node_id}" [shape=ellipse, fillcolor=lightgoldenrod, label="fan-in"];') - - # Route fan-in via intermediate nodes - for node_id, sources, target in fan_in_nodes: - for src in sources: - lines.append(f' "{src}" -> "{node_id}";') - lines.append(f' "{node_id}" -> "{target}";') - - # Draw normal edges - for src, tgt, is_cond in normal_edges: - edge_attr = ' [style=dashed, label="conditional"]' if is_cond else "" - lines.append(f' "{src}" -> "{tgt}"{edge_attr};') + # Emit sub-workflows hosted by WorkflowExecutor as nested clusters + self._emit_sub_workflows_digraph(self._workflow, lines, indent=" ") lines.append("}") return "\n".join(lines) @@ -178,46 +158,11 @@ class WorkflowViz: lines: list[str] = ["flowchart TD"] - # Nodes - start_executor_id = self._workflow.start_executor_id - start_id = _san(start_executor_id) - # End statements with semicolons for better compatibility and quote labels for special chars - lines.append(f' {start_id}["{start_executor_id} (Start)"];') + # Emit top-level workflow + self._emit_workflow_mermaid(self._workflow, lines, indent=" ") - for executor_id in self._workflow.executors: - if executor_id == start_executor_id: - continue - eid = _san(executor_id) - lines.append(f' {eid}["{executor_id}"];') - - # Build shared structures - fan_in_nodes_dot = self._compute_fan_in_descriptors() # uses DOT node ids - # Convert DOT-style node ids to Mermaid-safe ones - fan_in_nodes: list[tuple[str, list[str], str]] = [] - for dot_node_id, sources, target in fan_in_nodes_dot: - digest = dot_node_id.split("::")[-1] - fan_node_id = f"fan_in__{_san(target)}__{digest}" - fan_in_nodes.append((fan_node_id, sources, target)) - - for fan_node_id, _, _ in fan_in_nodes: - # Use double parentheses to make it circular in Mermaid - # (Keep this line without a trailing semicolon to match existing tests.) - lines.append(f" {fan_node_id}((fan-in))") - - # Fan-in edges - for fan_node_id, sources, target in fan_in_nodes: - for s in sources: - lines.append(f" {_san(s)} --> {fan_node_id};") - lines.append(f" {fan_node_id} --> {_san(target)};") - - # Normal edges - for src, tgt, is_cond in self._compute_normal_edges(): - s = _san(src) - t = _san(tgt) - if is_cond: - lines.append(f" {s} -. conditional .-> {t};") - else: - lines.append(f" {s} --> {t};") + # Emit sub-workflows as Mermaid subgraphs + self._emit_sub_workflows_mermaid(self._workflow, lines, indent=" ") return "\n".join(lines) @@ -227,13 +172,14 @@ class WorkflowViz: sources_sorted = sorted(sources) return hashlib.sha256((target + "|" + "|".join(sources_sorted)).encode("utf-8")).hexdigest()[:8] - def _compute_fan_in_descriptors(self) -> list[tuple[str, list[str], str]]: + def _compute_fan_in_descriptors(self, wf: Workflow | None = None) -> list[tuple[str, list[str], str]]: """Return list of (node_id, sources, target) for fan-in groups. node_id is DOT-oriented: fan_in::target::digest """ result: list[tuple[str, list[str], str]] = [] - for group in self._workflow.edge_groups: + workflow = wf or self._workflow + for group in workflow.edge_groups: if isinstance(group, FanInEdgeGroup): target = group.target_executor_ids[0] sources = list(group.source_executor_ids) @@ -242,10 +188,11 @@ class WorkflowViz: result.append((node_id, sorted(sources), target)) return result - def _compute_normal_edges(self) -> list[tuple[str, str, bool]]: + def _compute_normal_edges(self, wf: Workflow | None = None) -> list[tuple[str, str, bool]]: """Return list of (source_id, target_id, is_conditional) for non-fan-in groups.""" edges: list[tuple[str, str, bool]] = [] - for group in self._workflow.edge_groups: + workflow = wf or self._workflow + for group in workflow.edge_groups: if isinstance(group, FanInEdgeGroup): continue for edge in group.edges: @@ -254,3 +201,142 @@ class WorkflowViz: return edges # endregion + + # region Internal emitters (DOT) + + def _emit_workflow_digraph(self, wf: Workflow, lines: list[str], indent: str, ns: str | None = None) -> None: + """Emit DOT nodes/edges for the given workflow. + + If ns (namespace) is provided, node ids are prefixed with f"{ns}/" for uniqueness, + but labels remain the original executor ids. + """ + + def map_id(x: str) -> str: + return f"{ns}/{x}" if ns else x + + # Nodes + start_executor_id = wf.start_executor_id + lines.append( + f'{indent}"{map_id(start_executor_id)}" [fillcolor=lightgreen, label="{start_executor_id}\\n(Start)"];' + ) + for executor_id in wf.executors: + if executor_id != start_executor_id: + lines.append(f'{indent}"{map_id(executor_id)}" [label="{executor_id}"];') + + # Fan-in nodes + fan_in_nodes = self._compute_fan_in_descriptors(wf) + if fan_in_nodes: + lines.append("") + for node_id, _, _ in fan_in_nodes: + lines.append(f'{indent}"{map_id(node_id)}" [shape=ellipse, fillcolor=lightgoldenrod, label="fan-in"];') + + # Fan-in edges + for node_id, sources, target in fan_in_nodes: + for src in sources: + lines.append(f'{indent}"{map_id(src)}" -> "{map_id(node_id)}";') + lines.append(f'{indent}"{map_id(node_id)}" -> "{map_id(target)}";') + + # Normal edges + for src, tgt, is_cond in self._compute_normal_edges(wf): + edge_attr = ' [style=dashed, label="conditional"]' if is_cond else "" + lines.append(f'{indent}"{map_id(src)}" -> "{map_id(tgt)}"{edge_attr};') + + def _emit_sub_workflows_digraph(self, wf: Workflow, lines: list[str], indent: str) -> None: + """Emit DOT subgraphs for any WorkflowExecutor instances found in the workflow.""" + # Lazy import to avoid any potential import cycles + try: + from ._executor import WorkflowExecutor # type: ignore + except ImportError: # pragma: no cover - best-effort; if unavailable, skip subgraphs + return + + for exec_id, exec_obj in wf.executors.items(): + if isinstance(exec_obj, WorkflowExecutor) and hasattr(exec_obj, "workflow") and exec_obj.workflow: + subgraph_id = f"cluster_{uuid.uuid5(uuid.NAMESPACE_OID, exec_id).hex[:8]}" + lines.append(f"{indent}subgraph {subgraph_id} {{") + lines.append(f'{indent} label="sub-workflow: {exec_id}";') + lines.append(f"{indent} style=dashed;") + + # Emit the nested workflow inside this cluster using a namespace + ns = exec_id + self._emit_workflow_digraph(exec_obj.workflow, lines, indent=f"{indent} ", ns=ns) + + # Recurse into deeper nested sub-workflows + self._emit_sub_workflows_digraph(exec_obj.workflow, lines, indent=f"{indent} ") + + lines.append(f"{indent}}}") + + # endregion + + # region Internal emitters (Mermaid) + + def _emit_workflow_mermaid(self, wf: Workflow, lines: list[str], indent: str, ns: str | None = None) -> None: + def _san(s: str) -> str: + s2 = re.sub(r"[^0-9A-Za-z_]", "_", s) + if not s2 or not s2[0].isalpha(): + s2 = f"n_{s2}" + return s2 + + def map_id(x: str) -> str: + if ns: + return f"{_san(ns)}__{_san(x)}" + return _san(x) + + # Nodes + start_executor_id = wf.start_executor_id + lines.append(f'{indent}{map_id(start_executor_id)}["{start_executor_id} (Start)"];') + for executor_id in wf.executors: + if executor_id == start_executor_id: + continue + lines.append(f'{indent}{map_id(executor_id)}["{executor_id}"];') + + # Fan-in nodes + fan_in_nodes_dot = self._compute_fan_in_descriptors(wf) + fan_in_nodes: list[tuple[str, list[str], str]] = [] + for dot_node_id, sources, target in fan_in_nodes_dot: + digest = dot_node_id.split("::")[-1] + base = f"{target}__{digest}" + fan_node_id = f"fan_in__{_san(ns) + '__' if ns else ''}{_san(base)}" + fan_in_nodes.append((fan_node_id, sources, target)) + + for fan_node_id, _, _ in fan_in_nodes: + # Keep this line without trailing semicolon to match existing tests + lines.append(f"{indent}{fan_node_id}((fan-in))") + + # Fan-in edges + for fan_node_id, sources, target in fan_in_nodes: + for s in sources: + lines.append(f"{indent}{map_id(s)} --> {fan_node_id};") + lines.append(f"{indent}{fan_node_id} --> {map_id(target)};") + + # Normal edges + for src, tgt, is_cond in self._compute_normal_edges(wf): + s = map_id(src) + t = map_id(tgt) + if is_cond: + lines.append(f"{indent}{s} -. conditional .-> {t};") + else: + lines.append(f"{indent}{s} --> {t};") + + def _emit_sub_workflows_mermaid(self, wf: Workflow, lines: list[str], indent: str) -> None: + try: + from ._executor import WorkflowExecutor # type: ignore + except ImportError: # pragma: no cover + return + + def _san(s: str) -> str: + s2 = re.sub(r"[^0-9A-Za-z_]", "_", s) + if not s2 or not s2[0].isalpha(): + s2 = f"n_{s2}" + return s2 + + for exec_id, exec_obj in wf.executors.items(): + if isinstance(exec_obj, WorkflowExecutor) and hasattr(exec_obj, "workflow") and exec_obj.workflow: + sg_id = _san(exec_id) + lines.append(f"{indent}subgraph {sg_id}") + # Render nested workflow within this subgraph using namespacing + self._emit_workflow_mermaid(exec_obj.workflow, lines, indent=f"{indent} ", ns=exec_id) + # Recurse into deeper sub-workflows + self._emit_sub_workflows_mermaid(exec_obj.workflow, lines, indent=f"{indent} ") + lines.append(f"{indent}end") + + # endregion diff --git a/python/packages/workflow/tests/test_viz.py b/python/packages/workflow/tests/test_viz.py index 07a6d914d7..474cd1ade2 100644 --- a/python/packages/workflow/tests/test_viz.py +++ b/python/packages/workflow/tests/test_viz.py @@ -5,6 +5,8 @@ import pytest from agent_framework.workflow import Executor, WorkflowBuilder, WorkflowContext, WorkflowViz, handler +from agent_framework_workflow import WorkflowExecutor + class MockExecutor(Executor): """A mock executor for testing purposes.""" @@ -23,6 +25,41 @@ class ListStrTargetExecutor(Executor): pass +@pytest.fixture +def basic_sub_workflow(): + """Fixture that creates a basic sub-workflow setup for testing.""" + # Create a sub-workflow + sub_exec1 = MockExecutor(id="sub_exec1") + sub_exec2 = MockExecutor(id="sub_exec2") + + sub_workflow = WorkflowBuilder().add_edge(sub_exec1, sub_exec2).set_start_executor(sub_exec1).build() + + # Create a workflow executor that wraps the sub-workflow + workflow_executor = WorkflowExecutor(sub_workflow, id="workflow_executor_1") + + # Create a main workflow that includes the workflow executor + main_exec = MockExecutor(id="main_executor") + final_exec = MockExecutor(id="final_executor") + + main_workflow = ( + WorkflowBuilder() + .add_edge(main_exec, workflow_executor) + .add_edge(workflow_executor, final_exec) + .set_start_executor(main_exec) + .build() + ) + + return { + "main_workflow": main_workflow, + "workflow_executor": workflow_executor, + "sub_workflow": sub_workflow, + "main_exec": main_exec, + "final_exec": final_exec, + "sub_exec1": sub_exec1, + "sub_exec2": sub_exec2, + } + + def test_workflow_viz_to_digraph(): """Test that WorkflowViz can generate a DOT digraph.""" # Create a simple workflow @@ -283,3 +320,85 @@ def test_workflow_viz_mermaid_fan_in_edge_group(): # Ensure direct edges to target are not present assert "s1 --> t" not in mermaid assert "s2 --> t" not in mermaid + + +def test_workflow_viz_sub_workflow_digraph(basic_sub_workflow): + """Test that WorkflowViz can visualize sub-workflows in DOT format.""" + main_workflow = basic_sub_workflow["main_workflow"] + + viz = WorkflowViz(main_workflow) + dot_content = viz.to_digraph() + + # Check that main workflow nodes are present + assert "main_executor" in dot_content + assert "workflow_executor_1" in dot_content + assert "final_executor" in dot_content + + # Check that sub-workflow is rendered as a cluster + assert "subgraph cluster_" in dot_content + assert "sub-workflow: workflow_executor_1" in dot_content + + # Check that sub-workflow nodes are namespaced + assert '"workflow_executor_1/sub_exec1"' in dot_content + assert '"workflow_executor_1/sub_exec2"' in dot_content + + # Check that sub-workflow edges are present + assert '"workflow_executor_1/sub_exec1" -> "workflow_executor_1/sub_exec2"' in dot_content + + +def test_workflow_viz_sub_workflow_mermaid(basic_sub_workflow): + """Test that WorkflowViz can visualize sub-workflows in Mermaid format.""" + main_workflow = basic_sub_workflow["main_workflow"] + + viz = WorkflowViz(main_workflow) + mermaid_content = viz.to_mermaid() + + # Check that main workflow nodes are present + assert "main_executor" in mermaid_content + assert "workflow_executor_1" in mermaid_content + assert "final_executor" in mermaid_content + + # Check that sub-workflow is rendered as a subgraph + assert "subgraph workflow_executor_1" in mermaid_content + assert "end" in mermaid_content + + # Check that sub-workflow nodes are namespaced properly for Mermaid + assert "workflow_executor_1__sub_exec1" in mermaid_content + assert "workflow_executor_1__sub_exec2" in mermaid_content + + +def test_workflow_viz_nested_sub_workflows(): + """Test visualization of deeply nested sub-workflows.""" + # Create innermost sub-workflow + inner_exec = MockExecutor(id="inner_exec") + inner_workflow = WorkflowBuilder().set_start_executor(inner_exec).build() + + # Create middle sub-workflow that contains the inner one + inner_workflow_executor = WorkflowExecutor(inner_workflow, id="inner_wf_exec") + middle_exec = MockExecutor(id="middle_exec") + + middle_workflow = ( + WorkflowBuilder().add_edge(middle_exec, inner_workflow_executor).set_start_executor(middle_exec).build() + ) + + # Create outer workflow + middle_workflow_executor = WorkflowExecutor(middle_workflow, id="middle_wf_exec") + outer_exec = MockExecutor(id="outer_exec") + + outer_workflow = ( + WorkflowBuilder().add_edge(outer_exec, middle_workflow_executor).set_start_executor(outer_exec).build() + ) + + viz = WorkflowViz(outer_workflow) + dot_content = viz.to_digraph() + + # Check that all levels are present + assert "outer_exec" in dot_content + assert "middle_wf_exec" in dot_content + assert "inner_wf_exec" in dot_content + + # Check for nested clusters + assert "subgraph cluster_" in dot_content + # Should have multiple subgraphs for nested structure + subgraph_count = dot_content.count("subgraph cluster_") + assert subgraph_count >= 2 # At least one for each level of nesting