diff --git a/.gitignore b/.gitignore index 953fffead3..fef54e78bd 100644 --- a/.gitignore +++ b/.gitignore @@ -199,3 +199,7 @@ temp*/ .temp/ agents.md + +# AI +.claude/ +WARP.md \ No newline at end of file diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index dafc4544dc..db6552a9c8 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -67,7 +67,8 @@ - + + @@ -104,6 +105,9 @@ + + + @@ -267,13 +271,14 @@ - - - - + + + + + @@ -286,14 +291,14 @@ - - - - + + + + \ No newline at end of file diff --git a/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent.csproj b/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Concurrent.csproj similarity index 57% rename from dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent.csproj rename to dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Concurrent.csproj index b34c1bb22d..8b5969cb9a 100644 --- a/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent.csproj +++ b/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Concurrent.csproj @@ -15,9 +15,9 @@ - - - + + + diff --git a/dotnet/samples/GettingStarted/Workflows/Concurrent/Program.cs b/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Program.cs similarity index 100% rename from dotnet/samples/GettingStarted/Workflows/Concurrent/Program.cs rename to dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Program.cs diff --git a/dotnet/samples/GettingStarted/Workflows/Concurrent/MapReduce/MapReduce.csproj b/dotnet/samples/GettingStarted/Workflows/Concurrent/MapReduce/MapReduce.csproj new file mode 100644 index 0000000000..7282e3fde4 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Concurrent/MapReduce/MapReduce.csproj @@ -0,0 +1,20 @@ + + + + Exe + net9.0 + + enable + + + + + + + + + + + + + \ No newline at end of file diff --git a/dotnet/samples/GettingStarted/Workflows/Concurrent/MapReduce/Program.cs b/dotnet/samples/GettingStarted/Workflows/Concurrent/MapReduce/Program.cs new file mode 100644 index 0000000000..f519c67256 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Concurrent/MapReduce/Program.cs @@ -0,0 +1,421 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Agents.AI.Workflows.Reflection; + +namespace WorkflowMapReduceSample; + +/// +/// Sample: Map-Reduce Word Count with Fan-Out and Fan-In over File-Backed Intermediate Results +/// +/// The workflow splits a large text into chunks, maps words to counts in parallel, +/// shuffles intermediate pairs to reducers, then reduces to per-word totals. +/// It also demonstrates workflow visualization for graph visualization. +/// +/// Purpose: +/// Show how to: +/// - Partition input once and coordinate parallel mappers with shared state. +/// - Implement map, shuffle, and reduce executors that pass file paths instead of large payloads. +/// - Use fan-out and fan-in edges to express parallelism and joins. +/// - Persist intermediate results to disk to bound memory usage for large inputs. +/// - Visualize the workflow graph using ToDotString and ToMermaidString and export to SVG. +/// +/// +/// Pre-requisites: +/// - Write access to a temp directory. +/// - A source text file to process. +/// +public static class Program +{ + private static async Task Main() + { + Workflow workflow = BuildWorkflow(); + await RunWorkflowAsync(workflow); + } + + /// + /// Builds a map-reduce workflow using a fan-out/fan-in pattern with mappers, reducers, and other executors. + /// + /// This method constructs a workflow consisting of multiple stages, including splitting, + /// mapping, shuffling, reducing, and completion. The workflow is designed to process data in parallel using a + /// fan-out/fan-in architecture. The resulting workflow is ready for execution and includes all necessary + /// dependencies between the executors. + /// A instance representing the constructed workflow. + public static Workflow BuildWorkflow() + { + // Step 1: Create the mappers and the input splitter + var mappers = Enumerable.Range(0, 3).Select(i => new Mapper($"map_executor_{i}")).ToArray(); + var splitter = new Split(mappers.Select(m => m.Id).ToArray(), "split_data_executor"); + + // Step 2: Create the reducers and the intermidiace shuffler + var reducers = Enumerable.Range(0, 4).Select(i => new Reducer($"reduce_executor_{i}")).ToArray(); + var shuffler = new Shuffler(reducers.Select(r => r.Id).ToArray(), mappers.Select(m => m.Id).ToArray(), "shuffle_executor"); + + // Step 3: Create the output manager + var completion = new CompletionExecutor("completion_executor"); + + // Step 4: Build the concurrent workflow with fan-out/fan-in pattern + return new WorkflowBuilder(splitter) + .AddFanOutEdge(splitter, targets: [.. mappers]) // Split -> many mappers + .AddFanInEdge(shuffler, sources: [.. mappers]) // All mappers -> shuffle + .AddFanOutEdge(shuffler, targets: [.. reducers]) // Shuffle -> many reducers + .AddFanInEdge(completion, sources: [.. reducers]) // All reducers -> completion + .WithOutputFrom(completion) + .Build(); + } + + /// + /// Executes the specified workflow asynchronously using a predefined input text and processes its output events. + /// + /// This method reads input text from a file located in the "resources" directory. If the file is + /// not found, a default sample text is used. The workflow is executed with the input text, and its events are + /// streamed and processed in real-time. If the workflow produces output files, their paths and contents are + /// displayed. + /// The workflow to execute. This defines the sequence of operations to be performed. + /// A task that represents the asynchronous operation. + private static async Task RunWorkflowAsync(Workflow workflow) + { + // Step 1: Read the input text + var resourcesPath = Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "..", "..", "resources"); + var textFilePath = Path.Combine(resourcesPath, "long_text.txt"); + + string rawText; + if (File.Exists(textFilePath)) + { + rawText = await File.ReadAllTextAsync(textFilePath); + } + else + { + // Use sample text if file doesn't exist + Console.WriteLine($"Note: {textFilePath} not found, using sample text"); + rawText = "The quick brown fox jumps over the lazy dog. The dog was very lazy. The fox was very quick."; + } + + // Step 2: Run the workflow + Console.WriteLine("\n=== RUNNING WORKFLOW ===\n"); + StreamingRun run = await InProcessExecution.StreamAsync(workflow, rawText); + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + Console.WriteLine($"Event: {evt}"); + if (evt is WorkflowOutputEvent outputEvent) + { + Console.WriteLine("\nFinal Output Files:"); + if (outputEvent.Data is List filePaths) + { + foreach (var filePath in filePaths) + { + Console.WriteLine($" - {filePath}"); + if (File.Exists(filePath)) + { + var content = await File.ReadAllTextAsync(filePath); + Console.WriteLine($" Contents:\n{content}"); + } + } + } + } + } + } +} + +#region Executors + +/// +/// Splits data into roughly equal chunks based on the number of mapper nodes. +/// +internal sealed class Split(string[] mapperIds, string id) : + ReflectingExecutor(id), + IMessageHandler +{ + private readonly string[] _mapperIds = mapperIds; + private static readonly string[] s_lineSeparators = ["\r\n", "\r", "\n"]; + + /// + /// Tokenize input and assign contiguous index ranges to each mapper via shared state. + /// + public async ValueTask HandleAsync(string message, IWorkflowContext context) + { + // Ensure temp directory exists + Directory.CreateDirectory(MapReduceConstants.TempDir); + + // Process the data into a list of words and remove any empty lines + var wordList = Preprocess(message); + + // Store the tokenized words once so that all mappers can read by index + await context.QueueStateUpdateAsync(MapReduceConstants.DataToProcessKey, wordList, scopeName: MapReduceConstants.StateScope); + + // Divide indices into contiguous slices for each mapper + var mapperCount = this._mapperIds.Length; + var chunkSize = wordList.Length / mapperCount; + + async Task ProcessChunkAsync(int i) + { + // Determine the start and end indices for this mapper's chunk + var startIndex = i * chunkSize; + var endIndex = i < mapperCount - 1 ? startIndex + chunkSize : wordList.Length; + + // Save the indices under the mapper's Id + await context.QueueStateUpdateAsync(this._mapperIds[i], (startIndex, endIndex), scopeName: MapReduceConstants.StateScope); + + // Notify the mapper that data is ready + await context.SendMessageAsync(new SplitComplete(), targetId: this._mapperIds[i]); + } + + // Process all the chunks + var tasks = Enumerable.Range(0, mapperCount).Select(ProcessChunkAsync); + await Task.WhenAll(tasks); + } + + private static string[] Preprocess(string data) + { + var lines = data.Split(s_lineSeparators, StringSplitOptions.RemoveEmptyEntries) + .Select(line => line.Trim()) + .Where(line => !string.IsNullOrWhiteSpace(line)); + + return lines + .SelectMany(line => line.Split(' ', StringSplitOptions.RemoveEmptyEntries)) + .Where(word => !string.IsNullOrWhiteSpace(word)) + .ToArray(); + } +} + +/// +/// Maps each token to a count of 1 and writes pairs to a per-mapper file. +/// +internal sealed class Mapper(string id) : ReflectingExecutor(id), IMessageHandler +{ + /// + /// Read the assigned slice, emit (word, 1) pairs, and persist to disk. + /// + public async ValueTask HandleAsync(SplitComplete message, IWorkflowContext context) + { + var dataToProcess = await context.ReadStateAsync(MapReduceConstants.DataToProcessKey, scopeName: MapReduceConstants.StateScope); + var chunk = await context.ReadStateAsync<(int start, int end)>(this.Id, scopeName: MapReduceConstants.StateScope); + + var results = dataToProcess![chunk.start..chunk.end] + .Select(word => (word, 1)) + .ToArray(); + + // Write this mapper's results as simple text lines for easy debugging + var filePath = Path.Combine(MapReduceConstants.TempDir, $"map_results_{this.Id}.txt"); + var lines = results.Select(r => $"{r.word}: {r.Item2}"); + await File.WriteAllLinesAsync(filePath, lines); + + await context.SendMessageAsync(new MapComplete(filePath)); + } +} + +/// +/// Groups intermediate pairs by key and partitions them across reducers. +/// +internal sealed class Shuffler(string[] reducerIds, string[] mapperIds, string id) : + ReflectingExecutor(id), + IMessageHandler +{ + private readonly string[] _reducerIds = reducerIds; + private readonly string[] _mapperIds = mapperIds; + private readonly List _mapResults = new(); + + /// + /// Aggregate mapper outputs and write one partition file per reducer. + /// + public async ValueTask HandleAsync(MapComplete message, IWorkflowContext context) + { + this._mapResults.Add(message); + + // Wait for all mappers to complete + if (this._mapResults.Count < this._mapperIds.Length) + { + return; + } + + var chunks = await this.PreprocessAsync(this._mapResults); + + async Task ProcessChunkAsync(List<(string key, List values)> chunk, int index) + { + // Write one grouped partition for reducer index and notify that reducer + var filePath = Path.Combine(MapReduceConstants.TempDir, $"shuffle_results_{index}.txt"); + var lines = chunk.Select(kvp => $"{kvp.key}: {JsonSerializer.Serialize(kvp.values)}"); + await File.WriteAllLinesAsync(filePath, lines); + + await context.SendMessageAsync(new ShuffleComplete(filePath, this._reducerIds[index])); + } + + var tasks = chunks.Select((chunk, i) => ProcessChunkAsync(chunk, i)); + await Task.WhenAll(tasks); + } + + /// + /// Load all mapper files, group by key, sort keys, and partition for reducers. + /// + private async Task values)>>> PreprocessAsync(List data) + { + // Load all intermediate pairs + var mapResults = new List<(string key, int value)>(); + foreach (var result in data) + { + var lines = await File.ReadAllLinesAsync(result.FilePath); + foreach (var line in lines) + { + var parts = line.Split(": "); + if (parts.Length == 2) + { + mapResults.Add((parts[0], int.Parse(parts[1]))); + } + } + } + + // Group values by token + var intermediateResults = mapResults + .GroupBy(r => r.key) + .ToDictionary(g => g.Key, g => g.Select(r => r.value).ToList()); + + // Deterministic ordering helps with debugging and test stability + var aggregatedResults = intermediateResults + .Select(kvp => (key: kvp.Key, values: kvp.Value)) + .OrderBy(x => x.key) + .ToList(); + + // Partition keys across reducers as evenly as possible + var reduceExecutorCount = this._reducerIds.Length; // Use actual number of reducers + if (reduceExecutorCount == 0) + { + reduceExecutorCount = 1; + } + + var chunkSize = aggregatedResults.Count / reduceExecutorCount; + var remaining = aggregatedResults.Count % reduceExecutorCount; + + var chunks = new List values)>>(); + for (int i = 0; i < aggregatedResults.Count - remaining; i += chunkSize) + { + chunks.Add(aggregatedResults.GetRange(i, chunkSize)); + } + + if (remaining > 0 && chunks.Count > 0) + { + chunks[^1].AddRange(aggregatedResults.TakeLast(remaining)); + } + else if (chunks.Count == 0) + { + chunks.Add(aggregatedResults); + } + + return chunks; + } +} + +/// +/// Sums grouped counts per key for its assigned partition. +/// +internal sealed class Reducer(string id) : ReflectingExecutor(id), IMessageHandler +{ + /// + /// Read one shuffle partition and reduce it to totals. + /// + public async ValueTask HandleAsync(ShuffleComplete message, IWorkflowContext context) + { + if (message.ReducerId != this.Id) + { + // This partition belongs to a different reducer. Skip. + return; + } + + // Read grouped values from the shuffle output + var lines = await File.ReadAllLinesAsync(message.FilePath); + + // Sum values per key. Values are serialized JSON arrays like [1, 1, ...] + var reducedResults = new Dictionary(); + foreach (var line in lines) + { + var parts = line.Split(": ", 2); + if (parts.Length == 2) + { + var key = parts[0]; + var values = JsonSerializer.Deserialize>(parts[1]); + reducedResults[key] = values?.Sum() ?? 0; + } + } + + // Persist our partition totals + var filePath = Path.Combine(MapReduceConstants.TempDir, $"reduced_results_{this.Id}.txt"); + var outputLines = reducedResults.Select(kvp => $"{kvp.Key}: {kvp.Value}"); + await File.WriteAllLinesAsync(filePath, outputLines); + + await context.SendMessageAsync(new ReduceComplete(filePath)); + } +} + +/// +/// Joins all reducer outputs and yields the final output. +/// +internal sealed class CompletionExecutor(string id) : + ReflectingExecutor(id), + IMessageHandler> +{ + /// + /// Collect reducer output file paths and yield final output. + /// + public async ValueTask HandleAsync(List message, IWorkflowContext context) + { + var filePaths = message.ConvertAll(r => r.FilePath); + await context.YieldOutputAsync(filePaths); + } +} + +#endregion + +#region Events + +/// +/// Marker event published when splitting finishes. Triggers map executors. +/// +internal sealed class SplitComplete : WorkflowEvent; + +/// +/// Signal that a mapper wrote its intermediate pairs to file. +/// +internal sealed class MapComplete(string FilePath) : WorkflowEvent +{ + public string FilePath { get; } = FilePath; +} + +/// +/// Signal that a shuffle partition file is ready for a specific reducer. +/// +internal sealed class ShuffleComplete(string FilePath, string ReducerId) : WorkflowEvent +{ + public string FilePath { get; } = FilePath; + public string ReducerId { get; } = ReducerId; +} + +/// +/// Signal that a reducer wrote final counts for its partition. +/// +internal sealed class ReduceComplete(string FilePath) : WorkflowEvent +{ + public string FilePath { get; } = FilePath; +} + +#endregion + +#region Helpers + +/// +/// Provides constant values used in the MapReduce workflow. +/// +/// This class contains keys and paths that are utilized throughout the MapReduce process, including +/// identifiers for data processing and temporary storage locations. +internal static class MapReduceConstants +{ + public static string DataToProcessKey = "data_to_be_processed"; + public static string TempDir = Path.Combine(Path.GetTempPath(), "workflow_viz_sample"); + public static string StateScope = "MapReduceState"; +} + +#endregion diff --git a/dotnet/samples/GettingStarted/Workflows/Visualization/Program.cs b/dotnet/samples/GettingStarted/Workflows/Visualization/Program.cs new file mode 100644 index 0000000000..8a01356bb9 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Visualization/Program.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using Microsoft.Agents.AI.Workflows; + +namespace WorkflowVisualizationSample; + +/// +/// Sample demonstrating workflow visualization using Mermaid and DOT (Graphviz) formats. +/// +/// +/// This sample shows how to use the ToMermaidString() and ToDotString() extension methods +/// to generate visual representations of workflow graphs. The visualizations can be used +/// for documentation, debugging, and understanding complex workflow structures. +/// +internal static class Program +{ + /// + /// Entry point that generates and displays workflow visualizations in Mermaid and DOT formats. + /// + /// Command line arguments (not used). + private static void Main(string[] args) + { + // Step 1: Build the workflow you want to visualize + Workflow workflow = WorkflowMapReduceSample.Program.BuildWorkflow(); + + // Step 2: Generate and display workflow visualization + Console.WriteLine("Generating workflow visualization..."); + + // Mermaid + Console.WriteLine("Mermaid string: \n======="); + var mermaid = workflow.ToMermaidString(); + Console.WriteLine(mermaid); + Console.WriteLine("======="); + + // DOT + Console.WriteLine("DiGraph string: *** Tip: To export DOT as an image, install Graphviz and pipe the DOT output to 'dot -Tsvg', 'dot -Tpng', etc. *** \n======="); + var dotString = workflow.ToDotString(); + Console.WriteLine(dotString); + Console.WriteLine("======="); + } +} diff --git a/dotnet/samples/GettingStarted/Workflows/Visualization/README.md b/dotnet/samples/GettingStarted/Workflows/Visualization/README.md new file mode 100644 index 0000000000..19befb7631 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Visualization/README.md @@ -0,0 +1,43 @@ +# Workflow Visualization Sample + +This sample demonstrates how to visualize workflows using `ToMermaidString()` and `ToDotString()` extension methods. It uses a map-reduce workflow with fan-out/fan-in patterns as an example. + +## Running the Sample + +```bash +dotnet run +``` + +## Output Formats + +The sample generates two visualization formats: + +### Mermaid +Paste the output into any Mermaid-compatible viewer (GitHub, Mermaid Live Editor, etc.): + +![Mermaid Visualization](Resources/mermaid_render.png) + +### DOT (Graphviz) +Render with Graphviz (requires `graphviz` to be installed): + +```bash +dotnet run | tail -n +20 | dot -Tpng -o workflow.png +``` + +![Graphviz Visualization](Resources/graphviz_render.png) + +## Usage + +```csharp +Workflow workflow = BuildWorkflow(); + +// Generate Mermaid format +string mermaid = workflow.ToMermaidString(); + +// Generate DOT format +string dotString = workflow.ToDotString(); +``` + +## Related Samples + +- [Map-Reduce Workflow Sample](../../../Workflows/Visualization/README.md): The workflow implementation being visualized \ No newline at end of file diff --git a/dotnet/samples/GettingStarted/Workflows/Visualization/Resources/graphviz_render.png b/dotnet/samples/GettingStarted/Workflows/Visualization/Resources/graphviz_render.png new file mode 100644 index 0000000000..2966e3e8c7 Binary files /dev/null and b/dotnet/samples/GettingStarted/Workflows/Visualization/Resources/graphviz_render.png differ diff --git a/dotnet/samples/GettingStarted/Workflows/Visualization/Resources/mermaid_render.png b/dotnet/samples/GettingStarted/Workflows/Visualization/Resources/mermaid_render.png new file mode 100644 index 0000000000..e53f11e51d Binary files /dev/null and b/dotnet/samples/GettingStarted/Workflows/Visualization/Resources/mermaid_render.png differ diff --git a/dotnet/samples/GettingStarted/Workflows/Visualization/Visualization.csproj b/dotnet/samples/GettingStarted/Workflows/Visualization/Visualization.csproj new file mode 100644 index 0000000000..79f5876e0a --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Visualization/Visualization.csproj @@ -0,0 +1,16 @@ + + + + Exe + net9.0 + + enable + disable + + + + + + + + diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Visualization/WorkflowVisualizer.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Visualization/WorkflowVisualizer.cs new file mode 100644 index 0000000000..4c27df2206 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Visualization/WorkflowVisualizer.cs @@ -0,0 +1,283 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Provides visualization utilities for workflows using Graphviz DOT format. +/// +public static class WorkflowVisualizer +{ + /// + /// Export the workflow as a DOT format digraph string. + /// + /// A string representation of the workflow in DOT format. + public static string ToDotString(this Workflow workflow) + { + Throw.IfNull(workflow, nameof(workflow)); + + var lines = new List + { + "digraph Workflow {", + " rankdir=TD;", // Top to bottom layout + " node [shape=box, style=filled, fillcolor=lightblue];", + " edge [color=black, arrowhead=vee];", + "" + }; + + // Emit the top-level workflow nodes/edges + EmitWorkflowDigraph(workflow, lines, " "); + + // Emit sub-workflows hosted by WorkflowExecutor as nested clusters + EmitSubWorkflowsDigraph(workflow, lines, " "); + + lines.Add("}"); + return string.Join("\n", lines); + } + + /// + /// Converts the specified into a Mermaid.js diagram representation. + /// + /// This method generates a textual representation of the workflow in the Mermaid.js format, + /// which can be used to visualize workflows as diagrams. The output is formatted with indentation for + /// readability. + /// The workflow to be converted into a Mermaid.js diagram. Cannot be null. + /// A string containing the Mermaid.js representation of the workflow. + public static string ToMermaidString(this Workflow workflow) + { + List lines = ["flowchart TD"]; + + EmitWorkflowMermaid(workflow, lines, " "); + return string.Join("\n", lines); + } + + #region Private Implementation + + private static void EmitWorkflowDigraph(Workflow workflow, List lines, string indent, string? ns = null) + { + string MapId(string id) => ns != null ? $"{ns}/{id}" : id; + + // Add start node + var startExecutorId = workflow.StartExecutorId; + lines.Add($"{indent}\"{MapId(startExecutorId)}\" [fillcolor=lightgreen, label=\"{startExecutorId}\\n(Start)\"];"); + + // Add other executor nodes + foreach (var executorId in workflow.Registrations.Keys) + { + if (executorId != startExecutorId) + { + lines.Add($"{indent}\"{MapId(executorId)}\" [label=\"{executorId}\"];"); + } + } + + // Compute and emit fan-in nodes + var fanInDescriptors = ComputeFanInDescriptors(workflow); + if (fanInDescriptors.Count > 0) + { + lines.Add(""); + foreach (var (nodeId, _, _) in fanInDescriptors) + { + lines.Add($"{indent}\"{MapId(nodeId)}\" [shape=ellipse, fillcolor=lightgoldenrod, label=\"fan-in\"];"); + } + } + + // Emit fan-in edges + foreach (var (nodeId, sources, target) in fanInDescriptors) + { + foreach (var src in sources) + { + lines.Add($"{indent}\"{MapId(src)}\" -> \"{MapId(nodeId)}\";"); + } + lines.Add($"{indent}\"{MapId(nodeId)}\" -> \"{MapId(target)}\";"); + } + + // Emit normal edges + foreach (var (src, target, isConditional) in ComputeNormalEdges(workflow)) + { + var edgeAttr = isConditional ? " [style=dashed, label=\"conditional\"]" : ""; + lines.Add($"{indent}\"{MapId(src)}\" -> \"{MapId(target)}\"{edgeAttr};"); + } + } + + private static void EmitSubWorkflowsDigraph(Workflow workflow, List lines, string indent) + { + foreach (var kvp in workflow.Registrations) + { + var execId = kvp.Key; + var registration = kvp.Value; + // Check if this is a WorkflowExecutor with a nested workflow + if (TryGetNestedWorkflow(registration, out var nestedWorkflow)) + { + var subgraphId = $"cluster_{ComputeShortHash(execId)}"; + lines.Add($"{indent}subgraph {subgraphId} {{"); + lines.Add($"{indent} label=\"sub-workflow: {execId}\";"); + lines.Add($"{indent} style=dashed;"); + + // Emit the nested workflow inside this cluster using a namespace + EmitWorkflowDigraph(nestedWorkflow, lines, $"{indent} ", execId); + + // Recurse into deeper nested sub-workflows + EmitSubWorkflowsDigraph(nestedWorkflow, lines, $"{indent} "); + + lines.Add($"{indent}}}"); + } + } + } + + private static void EmitWorkflowMermaid(Workflow workflow, List lines, string indent, string? ns = null) + { + string sanitize(string input) + { + return input; + } + + string MapId(string id) => ns != null ? $"{sanitize(ns)}/{sanitize(id)}" : id; + + // Add start node + var startExecutorId = workflow.StartExecutorId; + lines.Add($"{indent}{MapId(startExecutorId)}[\"{startExecutorId} (Start)\"];"); + + // Add other executor nodes + foreach (var executorId in workflow.Registrations.Keys) + { + if (executorId != startExecutorId) + { + lines.Add($"{indent}{MapId(executorId)}[\"{executorId}\"];"); + } + } + + // Compute and emit fan-in nodes + var fanInDescriptors = ComputeFanInDescriptors(workflow); + if (fanInDescriptors.Count > 0) + { + lines.Add(""); + foreach (var (nodeId, _, _) in fanInDescriptors) + { + lines.Add($"{indent}{MapId(nodeId)}((fan-in))"); + } + } + + // Emit fan-in edges + foreach (var (nodeId, sources, target) in fanInDescriptors) + { + foreach (var src in sources) + { + lines.Add($"{indent}{MapId(src)} --> {MapId(nodeId)};"); + } + lines.Add($"{indent}{MapId(nodeId)} --> {MapId(target)};"); + } + + // Emit normal edges + foreach (var (src, target, isConditional) in ComputeNormalEdges(workflow)) + { + if (isConditional) + { + lines.Add($"{indent}{MapId(src)} -. conditional .--> {MapId(target)};"); + } + else + { + lines.Add($"{indent}{MapId(src)} --> {MapId(target)};"); + } + } + } + + private static List<(string NodeId, List Sources, string Target)> ComputeFanInDescriptors(Workflow workflow) + { + var result = new List<(string, List, string)>(); + var seen = new HashSet(); + + foreach (var edgeGroup in workflow.Edges.Values.SelectMany(x => x)) + { + if (edgeGroup.Kind == EdgeKind.FanIn && edgeGroup.FanInEdgeData != null) + { + var fanInData = edgeGroup.FanInEdgeData; + var target = fanInData.SinkId; + var sources = fanInData.SourceIds.ToList(); + var digest = ComputeFanInDigest(target, sources); + var nodeId = $"fan_in::{target}::{digest}"; + + // Avoid duplicates - the same fan-in edge group might appear in multiple source executor lists + if (seen.Add(nodeId)) + { + result.Add((nodeId, sources.OrderBy(x => x, StringComparer.Ordinal).ToList(), target)); + } + } + } + + return result; + } + + private static List<(string Source, string Target, bool IsConditional)> ComputeNormalEdges(Workflow workflow) + { + var edges = new List<(string, string, bool)>(); + foreach (var edgeGroup in workflow.Edges.Values.SelectMany(x => x)) + { + if (edgeGroup.Kind == EdgeKind.FanIn) + { + continue; + } + + switch (edgeGroup.Kind) + { + case EdgeKind.Direct when edgeGroup.DirectEdgeData != null: + var directData = edgeGroup.DirectEdgeData; + var isConditional = directData.Condition != null; + edges.Add((directData.SourceId, directData.SinkId, isConditional)); + break; + + case EdgeKind.FanOut when edgeGroup.FanOutEdgeData != null: + var fanOutData = edgeGroup.FanOutEdgeData; + foreach (var sinkId in fanOutData.SinkIds) + { + edges.Add((fanOutData.SourceId, sinkId, false)); + } + break; + } + } + + return edges; + } + + private static string ComputeFanInDigest(string target, List sources) + { + var sortedSources = sources.OrderBy(x => x, StringComparer.Ordinal).ToList(); + var input = target + "|" + string.Join("|", sortedSources); + using (var sha256 = SHA256.Create()) + { + return ComputeShortHash(input); + } + } + + private static string ComputeShortHash(string input) + { +#if !NET + using var sha256 = SHA256.Create(); + var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(input)); + return BitConverter.ToString(hash).Replace("-", "").Substring(0, 8).ToUpperInvariant(); +#else + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(input)); + return Convert.ToHexString(hash).Substring(0, 8); +#endif + } + + private static bool TryGetNestedWorkflow(ExecutorRegistration registration, [NotNullWhen(true)] out Workflow? workflow) + { + if (registration.RawExecutorishData is Workflow subWorkflow) + { + workflow = subWorkflow; + return true; + } + + workflow = null; + return false; + } + + #endregion +} diff --git a/dotnet/src/Shared/Demos/SampleEnvironment.cs b/dotnet/src/Shared/Demos/SampleEnvironment.cs index c2adccb845..bf1151498a 100644 --- a/dotnet/src/Shared/Demos/SampleEnvironment.cs +++ b/dotnet/src/Shared/Demos/SampleEnvironment.cs @@ -7,7 +7,6 @@ using System.Collections; using SystemEnvironment = System.Environment; namespace SampleHelpers; - internal static class SampleEnvironment { public static string? GetEnvironmentVariable(string key) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowVisualizerTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowVisualizerTests.cs new file mode 100644 index 0000000000..e00e513775 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowVisualizerTests.cs @@ -0,0 +1,397 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using FluentAssertions; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class WorkflowVisualizerTests +{ + private sealed class MockExecutor(string id) : Executor(id) + { + protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) => + routeBuilder.AddHandler((msg, ctx) => ctx.SendMessageAsync(msg)); + } + + private sealed class ListStrTargetExecutor(string id) : Executor(id) + { + protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) => + routeBuilder.AddHandler((msgs, ctx) => ctx.SendMessageAsync(string.Join(",", msgs))); + } + + [Fact] + public void Test_WorkflowViz_ToDotString_Basic() + { + // Create a simple workflow + var executor1 = new MockExecutor("executor1"); + var executor2 = new MockExecutor("executor2"); + + var workflow = new WorkflowBuilder("executor1") + .AddEdge(executor1, executor2) + .Build(); + + var dotContent = workflow.ToDotString(); + + // Check that the DOT content contains expected elements + dotContent.Should().Contain("digraph Workflow {"); + dotContent.Should().Contain("\"executor1\""); + dotContent.Should().Contain("\"executor2\""); + dotContent.Should().Contain("\"executor1\" -> \"executor2\""); + dotContent.Should().Contain("fillcolor=lightgreen"); // Start executor styling + dotContent.Should().Contain("(Start)"); + } + + [Fact] + public void Test_WorkflowViz_Complex_Workflow() + { + // Test visualization of a more complex workflow + var executor1 = new MockExecutor("start"); + var executor2 = new MockExecutor("middle1"); + var executor3 = new MockExecutor("middle2"); + var executor4 = new MockExecutor("end"); + + var workflow = new WorkflowBuilder("start") + .AddEdge(executor1, executor2) + .AddEdge(executor1, executor3) + .AddEdge(executor2, executor4) + .AddEdge(executor3, executor4) + .Build(); + + var dotContent = workflow.ToDotString(); + + // Check all executors are present + dotContent.Should().Contain("\"start\""); + dotContent.Should().Contain("\"middle1\""); + dotContent.Should().Contain("\"middle2\""); + dotContent.Should().Contain("\"end\""); + + // Check all edges are present + dotContent.Should().Contain("\"start\" -> \"middle1\""); + dotContent.Should().Contain("\"start\" -> \"middle2\""); + dotContent.Should().Contain("\"middle1\" -> \"end\""); + dotContent.Should().Contain("\"middle2\" -> \"end\""); + + // Check start executor has special styling + dotContent.Should().Contain("fillcolor=lightgreen"); + } + + [Fact] + public void Test_WorkflowViz_Conditional_Edge() + { + // Test that conditional edges are rendered dashed with a label + var start = new MockExecutor("start"); + var mid = new MockExecutor("mid"); + var end = new MockExecutor("end"); + + // Condition that is never used during viz, but presence should mark the edge + bool OnlyIfFoo(string? msg) => msg == "foo"; + + var workflow = new WorkflowBuilder("start") + .AddEdge(start, mid, OnlyIfFoo) + .AddEdge(mid, end) + .Build(); + + var dotContent = workflow.ToDotString(); + + // Conditional edge should be dashed and labeled + dotContent.Should().Contain("\"start\" -> \"mid\" [style=dashed, label=\"conditional\"];"); + // Non-conditional edge should be plain + dotContent.Should().Contain("\"mid\" -> \"end\""); + dotContent.Should().NotContain("\"mid\" -> \"end\" [style=dashed"); + } + + [Fact] + public void Test_WorkflowViz_FanIn_EdgeGroup() + { + // Test that fan-in edges render an intermediate node with label and routed edges + var start = new MockExecutor("start"); + var s1 = new MockExecutor("s1"); + var s2 = new MockExecutor("s2"); + var t = new ListStrTargetExecutor("t"); + + // Build a connected workflow: start fans out to s1 and s2, which then fan-in to t + var workflow = new WorkflowBuilder("start") + .AddFanOutEdge(start, s1, s2) + .AddFanInEdge(t, s1, s2) // AddFanInEdge(target, sources) + .Build(); + + var dotContent = workflow.ToDotString(); + + // There should be a single fan-in node with special styling and label + var lines = dotContent.Split('\n'); + var fanInLines = Array.FindAll(lines, line => + line.Contains("shape=ellipse") && line.Contains("label=\"fan-in\"")); + fanInLines.Should().HaveCount(1); + + // Extract the intermediate node id from the line + var fanInLine = fanInLines[0]; + var firstQuote = fanInLine.IndexOf('"'); + var secondQuote = fanInLine.IndexOf('"', firstQuote + 1); + firstQuote.Should().BeGreaterThan(-1); + secondQuote.Should().BeGreaterThan(-1); + var fanInNodeId = fanInLine.Substring(firstQuote + 1, secondQuote - firstQuote - 1); + fanInNodeId.Should().NotBeNullOrEmpty(); + + // Edges should be routed through the intermediate node, not direct to target + dotContent.Should().Contain($"\"s1\" -> \"{fanInNodeId}\";"); + dotContent.Should().Contain($"\"s2\" -> \"{fanInNodeId}\";"); + dotContent.Should().Contain($"\"{fanInNodeId}\" -> \"t\";"); + + // Ensure direct edges are not present + dotContent.Should().NotContain("\"s1\" -> \"t\""); + dotContent.Should().NotContain("\"s2\" -> \"t\""); + } + + // Note: Sub-workflow tests are commented out as the current implementation + // of TryGetNestedWorkflow returns false. These can be enabled once + // WorkflowExecutor detection is implemented. + + /* + [Fact] + public void Test_WorkflowViz_SubWorkflow_Digraph() + { + // Test that WorkflowViz can visualize sub-workflows in DOT format + // This test would require WorkflowExecutor implementation + // Currently TryGetNestedWorkflow always returns false + } + + [Fact] + public void Test_WorkflowViz_Nested_SubWorkflows() + { + // Test visualization of deeply nested sub-workflows + // This test would require WorkflowExecutor implementation + // Currently TryGetNestedWorkflow always returns false + } + */ + + [Fact] + public void Test_WorkflowViz_FanOut_Edges() + { + // Test fan-out edge visualization + var start = new MockExecutor("start"); + var target1 = new MockExecutor("target1"); + var target2 = new MockExecutor("target2"); + var target3 = new MockExecutor("target3"); + + var workflow = new WorkflowBuilder("start") + .AddFanOutEdge(start, target1, target2, target3) + .Build(); + + var dotContent = workflow.ToDotString(); + + // Check all fan-out edges are present + dotContent.Should().Contain("\"start\" -> \"target1\""); + dotContent.Should().Contain("\"start\" -> \"target2\""); + dotContent.Should().Contain("\"start\" -> \"target3\""); + } + + [Fact] + public void Test_WorkflowViz_Mixed_EdgeTypes() + { + // Test workflow with mixed edge types (direct, conditional, fan-out, fan-in) + var start = new MockExecutor("start"); + var a = new MockExecutor("a"); + var b = new MockExecutor("b"); + var c = new MockExecutor("c"); + var end = new ListStrTargetExecutor("end"); + + bool Condition(string? msg) => msg?.Contains("test") ?? false; + + var workflow = new WorkflowBuilder("start") + .AddEdge(start, a, Condition) // Conditional edge + .AddFanOutEdge(a, b, c) // Fan-out + .AddFanInEdge(end, b, c) // Fan-in - AddFanInEdge(target, sources) + .Build(); + + var dotContent = workflow.ToDotString(); + + // Check conditional edge + dotContent.Should().Contain("\"start\" -> \"a\" [style=dashed, label=\"conditional\"];"); + + // Check fan-out edges + dotContent.Should().Contain("\"a\" -> \"b\""); + dotContent.Should().Contain("\"a\" -> \"c\""); + + // Check fan-in (should have intermediate node) + dotContent.Should().Contain("shape=ellipse"); + dotContent.Should().Contain("label=\"fan-in\""); + } + + [Fact] + public void Test_WorkflowViz_SingleNode_Workflow() + { + // Test visualization of a single-node workflow + var executor = new MockExecutor("single"); + + var workflow = new WorkflowBuilder("single") + .BindExecutor(executor) + .Build(); + + var dotContent = workflow.ToDotString(); + + // Check single node is present with start styling + dotContent.Should().Contain("\"single\""); + dotContent.Should().Contain("fillcolor=lightgreen"); + dotContent.Should().Contain("(Start)"); + } + + [Fact] + public void Test_WorkflowViz_SelfLoop_Edge() + { + // Test visualization of self-loop edge + var executor = new MockExecutor("loop"); + + bool LoopCondition(string? msg) => (msg?.Length ?? 0) < 10; + + var workflow = new WorkflowBuilder("loop") + .AddEdge(executor, executor, LoopCondition) + .Build(); + + var dotContent = workflow.ToDotString(); + + // Check self-loop edge is present and conditional + dotContent.Should().Contain("\"loop\" -> \"loop\" [style=dashed, label=\"conditional\"];"); + } + + [Fact] + public void Test_WorkflowViz_ToMermaidString_Basic() + { + // Test that WorkflowViz can generate a Mermaid diagram + var executor1 = new MockExecutor("executor1"); + var executor2 = new MockExecutor("executor2"); + + var workflow = new WorkflowBuilder("executor1") + .AddEdge(executor1, executor2) + .Build(); + + var mermaidContent = workflow.ToMermaidString(); + + // Check that the Mermaid content contains expected elements + mermaidContent.Should().Contain("flowchart TD"); + mermaidContent.Should().Contain("executor1[\"executor1 (Start)\"]"); + mermaidContent.Should().Contain("executor2[\"executor2\"]"); + mermaidContent.Should().Contain("executor1 --> executor2"); + } + + [Fact] + public void Test_WorkflowViz_Mermaid_Conditional_Edge() + { + // Test that conditional edges are rendered with dotted lines and labels in Mermaid + var start = new MockExecutor("start"); + var mid = new MockExecutor("mid"); + var end = new MockExecutor("end"); + + bool OnlyIfFoo(string? msg) => msg == "foo"; + + var workflow = new WorkflowBuilder("start") + .AddEdge(start, mid, OnlyIfFoo) + .AddEdge(mid, end) + .Build(); + + var mermaidContent = workflow.ToMermaidString(); + + // Conditional edge should be dotted with label + mermaidContent.Should().Contain("start -. conditional .--> mid"); + // Non-conditional edge should be solid + mermaidContent.Should().Contain("mid --> end"); + mermaidContent.Should().NotContain("end -. conditional"); + } + + [Fact] + public void Test_WorkflowViz_Mermaid_FanIn_EdgeGroup() + { + // Test that fan-in edges render an intermediate node with label and routed edges in Mermaid + var start = new MockExecutor("start"); + var s1 = new MockExecutor("s1"); + var s2 = new MockExecutor("s2"); + var t = new ListStrTargetExecutor("t"); + + var workflow = new WorkflowBuilder("start") + .AddFanOutEdge(start, s1, s2) + .AddFanInEdge(t, s1, s2) + .Build(); + + var mermaidContent = workflow.ToMermaidString(); + + // There should be a fan-in node with special styling + var lines = mermaidContent.Split('\n'); + var fanInLines = Array.FindAll(lines, line => line.Contains("((fan-in))")); + fanInLines.Should().HaveCount(1); + + // Extract the intermediate node id from the line + var fanInLine = fanInLines[0].Trim(); + var fanInNodeId = fanInLine.Substring(0, fanInLine.IndexOf("((fan-in))", StringComparison.Ordinal)).Trim(); + fanInNodeId.Should().NotBeNullOrEmpty(); + + // Edges should be routed through the intermediate node + mermaidContent.Should().Contain($"s1 --> {fanInNodeId}"); + mermaidContent.Should().Contain($"s2 --> {fanInNodeId}"); + mermaidContent.Should().Contain($"{fanInNodeId} --> t"); + + // Ensure direct edges are not present + mermaidContent.Should().NotContain("s1 --> t"); + mermaidContent.Should().NotContain("s2 --> t"); + } + + [Fact] + public void Test_WorkflowViz_Mermaid_Complex_Workflow() + { + // Test Mermaid visualization of a more complex workflow + var executor1 = new MockExecutor("start"); + var executor2 = new MockExecutor("middle1"); + var executor3 = new MockExecutor("middle2"); + var executor4 = new MockExecutor("end"); + + var workflow = new WorkflowBuilder("start") + .AddEdge(executor1, executor2) + .AddEdge(executor1, executor3) + .AddEdge(executor2, executor4) + .AddEdge(executor3, executor4) + .Build(); + + var mermaidContent = workflow.ToMermaidString(); + + // Check all executors are present + mermaidContent.Should().Contain("start[\"start (Start)\"]"); + mermaidContent.Should().Contain("middle1[\"middle1\"]"); + mermaidContent.Should().Contain("middle2[\"middle2\"]"); + mermaidContent.Should().Contain("end[\"end\"]"); + + // Check all edges are present + mermaidContent.Should().Contain("start --> middle1"); + mermaidContent.Should().Contain("start --> middle2"); + mermaidContent.Should().Contain("middle1 --> end"); + mermaidContent.Should().Contain("middle2 --> end"); + } + + [Fact] + public void Test_WorkflowViz_Mermaid_Mixed_EdgeTypes() + { + // Test Mermaid workflow with mixed edge types (direct, conditional, fan-out, fan-in) + var start = new MockExecutor("start"); + var a = new MockExecutor("a"); + var b = new MockExecutor("b"); + var c = new MockExecutor("c"); + var end = new ListStrTargetExecutor("end"); + + bool Condition(string? msg) => msg?.Contains("test") ?? false; + + var workflow = new WorkflowBuilder("start") + .AddEdge(start, a, Condition) // Conditional edge + .AddFanOutEdge(a, b, c) // Fan-out + .AddFanInEdge(end, b, c) // Fan-in + .Build(); + + var mermaidContent = workflow.ToMermaidString(); + + // Check conditional edge + mermaidContent.Should().Contain("start -. conditional .--> a"); + + // Check fan-out edges + mermaidContent.Should().Contain("a --> b"); + mermaidContent.Should().Contain("a --> c"); + + // Check fan-in (should have intermediate node) + mermaidContent.Should().Contain("((fan-in))"); + } +} diff --git a/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py b/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py index 31c1bfc8f4..35284966af 100644 --- a/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py +++ b/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py @@ -299,7 +299,7 @@ async def main(): print("Tip: Install 'viz' extra to export workflow visualization: pip install agent-framework[viz]") # Step 3: Open the text file and read its content. - async with aiofiles.open(os.path.join(DIR, "resources", "long_text.txt"), "r") as f: + async with aiofiles.open(os.path.join(DIR, "../resources", "long_text.txt"), "r") as f: raw_text = await f.read() # Step 4: Run the workflow with the raw text as input. diff --git a/python/uv.lock b/python/uv.lock index 3748a5e72e..863b83b575 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -5942,4 +5942,4 @@ source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/e3/02/0f2892c661036d50ede074e376733dca2ae7c6eb617489437771209d4180/zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166", size = 25547, upload-time = "2025-06-08T17:06:39.4Z" } wheels = [ { url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276, upload-time = "2025-06-08T17:06:38.034Z" }, -] +] \ No newline at end of file