mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
.Net: Visualizing dotnet workflows (#882)
* Adding more test for workflow vizualization. * Improving README for sample. * More cleanup. * Removing rendering of workflow visualization and adding basic support for mermaid format. * Adding basic mermaid tests. * Improving sample. Switching to another branch. * About to merge from main. * Formatting. * More fromatting. * Removng uneeded call to ToUpper. * Adding README. * Moving samples under workflow to workflows. * Removing uneeded README for map-reduce sample.
This commit is contained in:
committed by
GitHub
Unverified
parent
522bf4bf23
commit
df3e12b2ed
@@ -199,3 +199,7 @@ temp*/
|
||||
.temp/
|
||||
|
||||
agents.md
|
||||
|
||||
# AI
|
||||
.claude/
|
||||
WARP.md
|
||||
@@ -67,7 +67,8 @@
|
||||
<File Path="samples/GettingStarted/Workflows/README.md" />
|
||||
</Folder>
|
||||
<Folder Name="/Samples/GettingStarted/Workflows/Concurrent/">
|
||||
<Project Path="samples/GettingStarted/Workflows/Concurrent/Concurrent.csproj" />
|
||||
<Project Path="samples/GettingStarted/Workflows/Concurrent/Concurrent/Concurrent.csproj" />
|
||||
<Project Path="samples/GettingStarted/Workflows/Concurrent/MapReduce/MapReduce.csproj" />
|
||||
</Folder>
|
||||
<Folder Name="/Samples/GettingStarted/Workflows/ConditionalEdges/">
|
||||
<Project Path="samples/GettingStarted/Workflows/ConditionalEdges/01_EdgeCondition/01_EdgeCondition.csproj" />
|
||||
@@ -104,6 +105,9 @@
|
||||
<Folder Name="/Samples/GettingStarted/Workflows/HumanInTheLoop/">
|
||||
<Project Path="samples/GettingStarted/Workflows/HumanInTheLoop/HumanInTheLoopBasic/HumanInTheLoopBasic.csproj" />
|
||||
</Folder>
|
||||
<Folder Name="/Samples/GettingStarted/Workflows/Visualization/">
|
||||
<Project Path="samples/GettingStarted/Workflows/Visualization/Visualization.csproj" Id="99bf0bc6-2440-428e-b3e7-d880e4b7a5fd" />
|
||||
</Folder>
|
||||
<Folder Name="/Samples/GettingStarted/Workflows/_Foundational/">
|
||||
<Project Path="samples/GettingStarted/Workflows/_Foundational/01_ExecutorsAndEdges/01_ExecutorsAndEdges.csproj" />
|
||||
<Project Path="samples/GettingStarted/Workflows/_Foundational/02_Streaming/02_Streaming.csproj" />
|
||||
@@ -267,13 +271,14 @@
|
||||
<Project Path="src/Microsoft.Agents.AI.Abstractions/Microsoft.Agents.AI.Abstractions.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.AzureAI/Microsoft.Agents.AI.AzureAI.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.CopilotStudio/Microsoft.Agents.AI.CopilotStudio.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.OpenAI/Microsoft.Agents.AI.OpenAI.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI/Microsoft.Agents.AI.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.Workflows.Declarative/Microsoft.Agents.AI.Workflows.Declarative.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.Workflows/Microsoft.Agents.AI.Workflows.csproj" />
|
||||
|
||||
<Project Path="src/Microsoft.Agents.AI.Hosting.A2A.AspNetCore/Microsoft.Agents.AI.Hosting.A2A.AspNetCore.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.Hosting.A2A/Microsoft.Agents.AI.Hosting.A2A.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.Hosting/Microsoft.Agents.AI.Hosting.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.OpenAI/Microsoft.Agents.AI.OpenAI.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.Workflows.Declarative/Microsoft.Agents.AI.Workflows.Declarative.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI.Workflows/Microsoft.Agents.AI.Workflows.csproj" />
|
||||
<Project Path="src/Microsoft.Agents.AI/Microsoft.Agents.AI.csproj" />
|
||||
</Folder>
|
||||
<Folder Name="/Tests/" />
|
||||
<Folder Name="/Tests/IntegrationTests/">
|
||||
@@ -286,14 +291,14 @@
|
||||
<Project Path="tests/OpenAIResponse.IntegrationTests/OpenAIResponse.IntegrationTests.csproj" />
|
||||
</Folder>
|
||||
<Folder Name="/Tests/UnitTests/">
|
||||
<Project Path="tests/Microsoft.Agents.AI.Abstractions.UnitTests/Microsoft.Agents.AI.Abstractions.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.UnitTests/Microsoft.Agents.AI.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.Workflows.UnitTests/Microsoft.Agents.AI.Workflows.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.A2A.UnitTests/Microsoft.Agents.AI.A2A.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.Abstractions.UnitTests/Microsoft.Agents.AI.Abstractions.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.AzureAI.UnitTests/Microsoft.Agents.AI.AzureAI.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.Hosting.A2A.Tests/Microsoft.Agents.AI.Hosting.A2A.Tests.csproj" Id="2a1c544d-237d-4436-8732-ba0c447ac06b" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.Hosting.UnitTests/Microsoft.Agents.AI.Hosting.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.OpenAI.UnitTests/Microsoft.Agents.AI.OpenAI.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.UnitTests/Microsoft.Agents.AI.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests.csproj" />
|
||||
<Project Path="tests/Microsoft.Agents.AI.Workflows.UnitTests/Microsoft.Agents.AI.Workflows.UnitTests.csproj" />
|
||||
</Folder>
|
||||
</Solution>
|
||||
+3
-3
@@ -15,9 +15,9 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.Workflows\Microsoft.Agents.AI.Workflows.csproj" />
|
||||
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.AzureAI\Microsoft.Agents.AI.AzureAI.csproj" />
|
||||
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI\Microsoft.Agents.AI.csproj" />
|
||||
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI.AzureAI\Microsoft.Agents.AI.AzureAI.csproj" />
|
||||
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI.Workflows\Microsoft.Agents.AI.Workflows.csproj" />
|
||||
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI\Microsoft.Agents.AI.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,20 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net9.0</TargetFramework>
|
||||
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI.Workflows\Microsoft.Agents.AI.Workflows.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Azure.AI.OpenAI" />
|
||||
<PackageReference Include="Azure.Identity" />
|
||||
<PackageReference Include="Microsoft.Extensions.AI.OpenAI" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,421 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Text.Json;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Agents.AI.Workflows;
|
||||
using Microsoft.Agents.AI.Workflows.Reflection;
|
||||
|
||||
namespace WorkflowMapReduceSample;
|
||||
|
||||
/// <summary>
|
||||
/// Sample: Map-Reduce Word Count with Fan-Out and Fan-In over File-Backed Intermediate Results
|
||||
///
|
||||
/// The workflow splits a large text into chunks, maps words to counts in parallel,
|
||||
/// shuffles intermediate pairs to reducers, then reduces to per-word totals.
|
||||
/// It also demonstrates workflow visualization for graph visualization.
|
||||
///
|
||||
/// Purpose:
|
||||
/// Show how to:
|
||||
/// - Partition input once and coordinate parallel mappers with shared state.
|
||||
/// - Implement map, shuffle, and reduce executors that pass file paths instead of large payloads.
|
||||
/// - Use fan-out and fan-in edges to express parallelism and joins.
|
||||
/// - Persist intermediate results to disk to bound memory usage for large inputs.
|
||||
/// - Visualize the workflow graph using ToDotString and ToMermaidString and export to SVG.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Pre-requisites:
|
||||
/// - Write access to a temp directory.
|
||||
/// - A source text file to process.
|
||||
/// </remarks>
|
||||
public static class Program
|
||||
{
|
||||
private static async Task Main()
|
||||
{
|
||||
Workflow workflow = BuildWorkflow();
|
||||
await RunWorkflowAsync(workflow);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Builds a map-reduce workflow using a fan-out/fan-in pattern with mappers, reducers, and other executors.
|
||||
/// </summary>
|
||||
/// <remarks>This method constructs a workflow consisting of multiple stages, including splitting,
|
||||
/// mapping, shuffling, reducing, and completion. The workflow is designed to process data in parallel using a
|
||||
/// fan-out/fan-in architecture. The resulting workflow is ready for execution and includes all necessary
|
||||
/// dependencies between the executors.</remarks>
|
||||
/// <returns>A <see cref="Workflow"/> instance representing the constructed workflow.</returns>
|
||||
public static Workflow BuildWorkflow()
|
||||
{
|
||||
// Step 1: Create the mappers and the input splitter
|
||||
var mappers = Enumerable.Range(0, 3).Select(i => new Mapper($"map_executor_{i}")).ToArray();
|
||||
var splitter = new Split(mappers.Select(m => m.Id).ToArray(), "split_data_executor");
|
||||
|
||||
// Step 2: Create the reducers and the intermidiace shuffler
|
||||
var reducers = Enumerable.Range(0, 4).Select(i => new Reducer($"reduce_executor_{i}")).ToArray();
|
||||
var shuffler = new Shuffler(reducers.Select(r => r.Id).ToArray(), mappers.Select(m => m.Id).ToArray(), "shuffle_executor");
|
||||
|
||||
// Step 3: Create the output manager
|
||||
var completion = new CompletionExecutor("completion_executor");
|
||||
|
||||
// Step 4: Build the concurrent workflow with fan-out/fan-in pattern
|
||||
return new WorkflowBuilder(splitter)
|
||||
.AddFanOutEdge(splitter, targets: [.. mappers]) // Split -> many mappers
|
||||
.AddFanInEdge(shuffler, sources: [.. mappers]) // All mappers -> shuffle
|
||||
.AddFanOutEdge(shuffler, targets: [.. reducers]) // Shuffle -> many reducers
|
||||
.AddFanInEdge(completion, sources: [.. reducers]) // All reducers -> completion
|
||||
.WithOutputFrom(completion)
|
||||
.Build();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Executes the specified workflow asynchronously using a predefined input text and processes its output events.
|
||||
/// </summary>
|
||||
/// <remarks>This method reads input text from a file located in the "resources" directory. If the file is
|
||||
/// not found, a default sample text is used. The workflow is executed with the input text, and its events are
|
||||
/// streamed and processed in real-time. If the workflow produces output files, their paths and contents are
|
||||
/// displayed.</remarks>
|
||||
/// <param name="workflow">The workflow to execute. This defines the sequence of operations to be performed.</param>
|
||||
/// <returns>A task that represents the asynchronous operation.</returns>
|
||||
private static async Task RunWorkflowAsync(Workflow workflow)
|
||||
{
|
||||
// Step 1: Read the input text
|
||||
var resourcesPath = Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "..", "..", "resources");
|
||||
var textFilePath = Path.Combine(resourcesPath, "long_text.txt");
|
||||
|
||||
string rawText;
|
||||
if (File.Exists(textFilePath))
|
||||
{
|
||||
rawText = await File.ReadAllTextAsync(textFilePath);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Use sample text if file doesn't exist
|
||||
Console.WriteLine($"Note: {textFilePath} not found, using sample text");
|
||||
rawText = "The quick brown fox jumps over the lazy dog. The dog was very lazy. The fox was very quick.";
|
||||
}
|
||||
|
||||
// Step 2: Run the workflow
|
||||
Console.WriteLine("\n=== RUNNING WORKFLOW ===\n");
|
||||
StreamingRun run = await InProcessExecution.StreamAsync(workflow, rawText);
|
||||
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
|
||||
{
|
||||
Console.WriteLine($"Event: {evt}");
|
||||
if (evt is WorkflowOutputEvent outputEvent)
|
||||
{
|
||||
Console.WriteLine("\nFinal Output Files:");
|
||||
if (outputEvent.Data is List<string> filePaths)
|
||||
{
|
||||
foreach (var filePath in filePaths)
|
||||
{
|
||||
Console.WriteLine($" - {filePath}");
|
||||
if (File.Exists(filePath))
|
||||
{
|
||||
var content = await File.ReadAllTextAsync(filePath);
|
||||
Console.WriteLine($" Contents:\n{content}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#region Executors
|
||||
|
||||
/// <summary>
|
||||
/// Splits data into roughly equal chunks based on the number of mapper nodes.
|
||||
/// </summary>
|
||||
internal sealed class Split(string[] mapperIds, string id) :
|
||||
ReflectingExecutor<Split>(id),
|
||||
IMessageHandler<string>
|
||||
{
|
||||
private readonly string[] _mapperIds = mapperIds;
|
||||
private static readonly string[] s_lineSeparators = ["\r\n", "\r", "\n"];
|
||||
|
||||
/// <summary>
|
||||
/// Tokenize input and assign contiguous index ranges to each mapper via shared state.
|
||||
/// </summary>
|
||||
public async ValueTask HandleAsync(string message, IWorkflowContext context)
|
||||
{
|
||||
// Ensure temp directory exists
|
||||
Directory.CreateDirectory(MapReduceConstants.TempDir);
|
||||
|
||||
// Process the data into a list of words and remove any empty lines
|
||||
var wordList = Preprocess(message);
|
||||
|
||||
// Store the tokenized words once so that all mappers can read by index
|
||||
await context.QueueStateUpdateAsync(MapReduceConstants.DataToProcessKey, wordList, scopeName: MapReduceConstants.StateScope);
|
||||
|
||||
// Divide indices into contiguous slices for each mapper
|
||||
var mapperCount = this._mapperIds.Length;
|
||||
var chunkSize = wordList.Length / mapperCount;
|
||||
|
||||
async Task ProcessChunkAsync(int i)
|
||||
{
|
||||
// Determine the start and end indices for this mapper's chunk
|
||||
var startIndex = i * chunkSize;
|
||||
var endIndex = i < mapperCount - 1 ? startIndex + chunkSize : wordList.Length;
|
||||
|
||||
// Save the indices under the mapper's Id
|
||||
await context.QueueStateUpdateAsync(this._mapperIds[i], (startIndex, endIndex), scopeName: MapReduceConstants.StateScope);
|
||||
|
||||
// Notify the mapper that data is ready
|
||||
await context.SendMessageAsync(new SplitComplete(), targetId: this._mapperIds[i]);
|
||||
}
|
||||
|
||||
// Process all the chunks
|
||||
var tasks = Enumerable.Range(0, mapperCount).Select(ProcessChunkAsync);
|
||||
await Task.WhenAll(tasks);
|
||||
}
|
||||
|
||||
private static string[] Preprocess(string data)
|
||||
{
|
||||
var lines = data.Split(s_lineSeparators, StringSplitOptions.RemoveEmptyEntries)
|
||||
.Select(line => line.Trim())
|
||||
.Where(line => !string.IsNullOrWhiteSpace(line));
|
||||
|
||||
return lines
|
||||
.SelectMany(line => line.Split(' ', StringSplitOptions.RemoveEmptyEntries))
|
||||
.Where(word => !string.IsNullOrWhiteSpace(word))
|
||||
.ToArray();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Maps each token to a count of 1 and writes pairs to a per-mapper file.
|
||||
/// </summary>
|
||||
internal sealed class Mapper(string id) : ReflectingExecutor<Mapper>(id), IMessageHandler<SplitComplete>
|
||||
{
|
||||
/// <summary>
|
||||
/// Read the assigned slice, emit (word, 1) pairs, and persist to disk.
|
||||
/// </summary>
|
||||
public async ValueTask HandleAsync(SplitComplete message, IWorkflowContext context)
|
||||
{
|
||||
var dataToProcess = await context.ReadStateAsync<string[]>(MapReduceConstants.DataToProcessKey, scopeName: MapReduceConstants.StateScope);
|
||||
var chunk = await context.ReadStateAsync<(int start, int end)>(this.Id, scopeName: MapReduceConstants.StateScope);
|
||||
|
||||
var results = dataToProcess![chunk.start..chunk.end]
|
||||
.Select(word => (word, 1))
|
||||
.ToArray();
|
||||
|
||||
// Write this mapper's results as simple text lines for easy debugging
|
||||
var filePath = Path.Combine(MapReduceConstants.TempDir, $"map_results_{this.Id}.txt");
|
||||
var lines = results.Select(r => $"{r.word}: {r.Item2}");
|
||||
await File.WriteAllLinesAsync(filePath, lines);
|
||||
|
||||
await context.SendMessageAsync(new MapComplete(filePath));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Groups intermediate pairs by key and partitions them across reducers.
|
||||
/// </summary>
|
||||
internal sealed class Shuffler(string[] reducerIds, string[] mapperIds, string id) :
|
||||
ReflectingExecutor<Shuffler>(id),
|
||||
IMessageHandler<MapComplete>
|
||||
{
|
||||
private readonly string[] _reducerIds = reducerIds;
|
||||
private readonly string[] _mapperIds = mapperIds;
|
||||
private readonly List<MapComplete> _mapResults = new();
|
||||
|
||||
/// <summary>
|
||||
/// Aggregate mapper outputs and write one partition file per reducer.
|
||||
/// </summary>
|
||||
public async ValueTask HandleAsync(MapComplete message, IWorkflowContext context)
|
||||
{
|
||||
this._mapResults.Add(message);
|
||||
|
||||
// Wait for all mappers to complete
|
||||
if (this._mapResults.Count < this._mapperIds.Length)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var chunks = await this.PreprocessAsync(this._mapResults);
|
||||
|
||||
async Task ProcessChunkAsync(List<(string key, List<int> values)> chunk, int index)
|
||||
{
|
||||
// Write one grouped partition for reducer index and notify that reducer
|
||||
var filePath = Path.Combine(MapReduceConstants.TempDir, $"shuffle_results_{index}.txt");
|
||||
var lines = chunk.Select(kvp => $"{kvp.key}: {JsonSerializer.Serialize(kvp.values)}");
|
||||
await File.WriteAllLinesAsync(filePath, lines);
|
||||
|
||||
await context.SendMessageAsync(new ShuffleComplete(filePath, this._reducerIds[index]));
|
||||
}
|
||||
|
||||
var tasks = chunks.Select((chunk, i) => ProcessChunkAsync(chunk, i));
|
||||
await Task.WhenAll(tasks);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Load all mapper files, group by key, sort keys, and partition for reducers.
|
||||
/// </summary>
|
||||
private async Task<List<List<(string key, List<int> values)>>> PreprocessAsync(List<MapComplete> data)
|
||||
{
|
||||
// Load all intermediate pairs
|
||||
var mapResults = new List<(string key, int value)>();
|
||||
foreach (var result in data)
|
||||
{
|
||||
var lines = await File.ReadAllLinesAsync(result.FilePath);
|
||||
foreach (var line in lines)
|
||||
{
|
||||
var parts = line.Split(": ");
|
||||
if (parts.Length == 2)
|
||||
{
|
||||
mapResults.Add((parts[0], int.Parse(parts[1])));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Group values by token
|
||||
var intermediateResults = mapResults
|
||||
.GroupBy(r => r.key)
|
||||
.ToDictionary(g => g.Key, g => g.Select(r => r.value).ToList());
|
||||
|
||||
// Deterministic ordering helps with debugging and test stability
|
||||
var aggregatedResults = intermediateResults
|
||||
.Select(kvp => (key: kvp.Key, values: kvp.Value))
|
||||
.OrderBy(x => x.key)
|
||||
.ToList();
|
||||
|
||||
// Partition keys across reducers as evenly as possible
|
||||
var reduceExecutorCount = this._reducerIds.Length; // Use actual number of reducers
|
||||
if (reduceExecutorCount == 0)
|
||||
{
|
||||
reduceExecutorCount = 1;
|
||||
}
|
||||
|
||||
var chunkSize = aggregatedResults.Count / reduceExecutorCount;
|
||||
var remaining = aggregatedResults.Count % reduceExecutorCount;
|
||||
|
||||
var chunks = new List<List<(string key, List<int> values)>>();
|
||||
for (int i = 0; i < aggregatedResults.Count - remaining; i += chunkSize)
|
||||
{
|
||||
chunks.Add(aggregatedResults.GetRange(i, chunkSize));
|
||||
}
|
||||
|
||||
if (remaining > 0 && chunks.Count > 0)
|
||||
{
|
||||
chunks[^1].AddRange(aggregatedResults.TakeLast(remaining));
|
||||
}
|
||||
else if (chunks.Count == 0)
|
||||
{
|
||||
chunks.Add(aggregatedResults);
|
||||
}
|
||||
|
||||
return chunks;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sums grouped counts per key for its assigned partition.
|
||||
/// </summary>
|
||||
internal sealed class Reducer(string id) : ReflectingExecutor<Reducer>(id), IMessageHandler<ShuffleComplete>
|
||||
{
|
||||
/// <summary>
|
||||
/// Read one shuffle partition and reduce it to totals.
|
||||
/// </summary>
|
||||
public async ValueTask HandleAsync(ShuffleComplete message, IWorkflowContext context)
|
||||
{
|
||||
if (message.ReducerId != this.Id)
|
||||
{
|
||||
// This partition belongs to a different reducer. Skip.
|
||||
return;
|
||||
}
|
||||
|
||||
// Read grouped values from the shuffle output
|
||||
var lines = await File.ReadAllLinesAsync(message.FilePath);
|
||||
|
||||
// Sum values per key. Values are serialized JSON arrays like [1, 1, ...]
|
||||
var reducedResults = new Dictionary<string, int>();
|
||||
foreach (var line in lines)
|
||||
{
|
||||
var parts = line.Split(": ", 2);
|
||||
if (parts.Length == 2)
|
||||
{
|
||||
var key = parts[0];
|
||||
var values = JsonSerializer.Deserialize<List<int>>(parts[1]);
|
||||
reducedResults[key] = values?.Sum() ?? 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Persist our partition totals
|
||||
var filePath = Path.Combine(MapReduceConstants.TempDir, $"reduced_results_{this.Id}.txt");
|
||||
var outputLines = reducedResults.Select(kvp => $"{kvp.Key}: {kvp.Value}");
|
||||
await File.WriteAllLinesAsync(filePath, outputLines);
|
||||
|
||||
await context.SendMessageAsync(new ReduceComplete(filePath));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Joins all reducer outputs and yields the final output.
|
||||
/// </summary>
|
||||
internal sealed class CompletionExecutor(string id) :
|
||||
ReflectingExecutor<CompletionExecutor>(id),
|
||||
IMessageHandler<List<ReduceComplete>>
|
||||
{
|
||||
/// <summary>
|
||||
/// Collect reducer output file paths and yield final output.
|
||||
/// </summary>
|
||||
public async ValueTask HandleAsync(List<ReduceComplete> message, IWorkflowContext context)
|
||||
{
|
||||
var filePaths = message.ConvertAll(r => r.FilePath);
|
||||
await context.YieldOutputAsync(filePaths);
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Events
|
||||
|
||||
/// <summary>
|
||||
/// Marker event published when splitting finishes. Triggers map executors.
|
||||
/// </summary>
|
||||
internal sealed class SplitComplete : WorkflowEvent;
|
||||
|
||||
/// <summary>
|
||||
/// Signal that a mapper wrote its intermediate pairs to file.
|
||||
/// </summary>
|
||||
internal sealed class MapComplete(string FilePath) : WorkflowEvent
|
||||
{
|
||||
public string FilePath { get; } = FilePath;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Signal that a shuffle partition file is ready for a specific reducer.
|
||||
/// </summary>
|
||||
internal sealed class ShuffleComplete(string FilePath, string ReducerId) : WorkflowEvent
|
||||
{
|
||||
public string FilePath { get; } = FilePath;
|
||||
public string ReducerId { get; } = ReducerId;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Signal that a reducer wrote final counts for its partition.
|
||||
/// </summary>
|
||||
internal sealed class ReduceComplete(string FilePath) : WorkflowEvent
|
||||
{
|
||||
public string FilePath { get; } = FilePath;
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Helpers
|
||||
|
||||
/// <summary>
|
||||
/// Provides constant values used in the MapReduce workflow.
|
||||
/// </summary>
|
||||
/// <remarks>This class contains keys and paths that are utilized throughout the MapReduce process, including
|
||||
/// identifiers for data processing and temporary storage locations.</remarks>
|
||||
internal static class MapReduceConstants
|
||||
{
|
||||
public static string DataToProcessKey = "data_to_be_processed";
|
||||
public static string TempDir = Path.Combine(Path.GetTempPath(), "workflow_viz_sample");
|
||||
public static string StateScope = "MapReduceState";
|
||||
}
|
||||
|
||||
#endregion
|
||||
@@ -0,0 +1,42 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using Microsoft.Agents.AI.Workflows;
|
||||
|
||||
namespace WorkflowVisualizationSample;
|
||||
|
||||
/// <summary>
|
||||
/// Sample demonstrating workflow visualization using Mermaid and DOT (Graphviz) formats.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// This sample shows how to use the ToMermaidString() and ToDotString() extension methods
|
||||
/// to generate visual representations of workflow graphs. The visualizations can be used
|
||||
/// for documentation, debugging, and understanding complex workflow structures.
|
||||
/// </remarks>
|
||||
internal static class Program
|
||||
{
|
||||
/// <summary>
|
||||
/// Entry point that generates and displays workflow visualizations in Mermaid and DOT formats.
|
||||
/// </summary>
|
||||
/// <param name="args">Command line arguments (not used).</param>
|
||||
private static void Main(string[] args)
|
||||
{
|
||||
// Step 1: Build the workflow you want to visualize
|
||||
Workflow workflow = WorkflowMapReduceSample.Program.BuildWorkflow();
|
||||
|
||||
// Step 2: Generate and display workflow visualization
|
||||
Console.WriteLine("Generating workflow visualization...");
|
||||
|
||||
// Mermaid
|
||||
Console.WriteLine("Mermaid string: \n=======");
|
||||
var mermaid = workflow.ToMermaidString();
|
||||
Console.WriteLine(mermaid);
|
||||
Console.WriteLine("=======");
|
||||
|
||||
// DOT
|
||||
Console.WriteLine("DiGraph string: *** Tip: To export DOT as an image, install Graphviz and pipe the DOT output to 'dot -Tsvg', 'dot -Tpng', etc. *** \n=======");
|
||||
var dotString = workflow.ToDotString();
|
||||
Console.WriteLine(dotString);
|
||||
Console.WriteLine("=======");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
# Workflow Visualization Sample
|
||||
|
||||
This sample demonstrates how to visualize workflows using `ToMermaidString()` and `ToDotString()` extension methods. It uses a map-reduce workflow with fan-out/fan-in patterns as an example.
|
||||
|
||||
## Running the Sample
|
||||
|
||||
```bash
|
||||
dotnet run
|
||||
```
|
||||
|
||||
## Output Formats
|
||||
|
||||
The sample generates two visualization formats:
|
||||
|
||||
### Mermaid
|
||||
Paste the output into any Mermaid-compatible viewer (GitHub, Mermaid Live Editor, etc.):
|
||||
|
||||

|
||||
|
||||
### DOT (Graphviz)
|
||||
Render with Graphviz (requires `graphviz` to be installed):
|
||||
|
||||
```bash
|
||||
dotnet run | tail -n +20 | dot -Tpng -o workflow.png
|
||||
```
|
||||
|
||||

|
||||
|
||||
## Usage
|
||||
|
||||
```csharp
|
||||
Workflow workflow = BuildWorkflow();
|
||||
|
||||
// Generate Mermaid format
|
||||
string mermaid = workflow.ToMermaidString();
|
||||
|
||||
// Generate DOT format
|
||||
string dotString = workflow.ToDotString();
|
||||
```
|
||||
|
||||
## Related Samples
|
||||
|
||||
- [Map-Reduce Workflow Sample](../../../Workflows/Visualization/README.md): The workflow implementation being visualized
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 66 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 304 KiB |
@@ -0,0 +1,16 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net9.0</TargetFramework>
|
||||
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>disable</ImplicitUsings>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.Workflows\Microsoft.Agents.AI.Workflows.csproj" />
|
||||
<ProjectReference Include="..\Concurrent\MapReduce\MapReduce.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,283 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.Linq;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using Microsoft.Shared.Diagnostics;
|
||||
|
||||
namespace Microsoft.Agents.AI.Workflows;
|
||||
|
||||
/// <summary>
|
||||
/// Provides visualization utilities for workflows using Graphviz DOT format.
|
||||
/// </summary>
|
||||
public static class WorkflowVisualizer
|
||||
{
|
||||
/// <summary>
|
||||
/// Export the workflow as a DOT format digraph string.
|
||||
/// </summary>
|
||||
/// <returns>A string representation of the workflow in DOT format.</returns>
|
||||
public static string ToDotString(this Workflow workflow)
|
||||
{
|
||||
Throw.IfNull(workflow, nameof(workflow));
|
||||
|
||||
var lines = new List<string>
|
||||
{
|
||||
"digraph Workflow {",
|
||||
" rankdir=TD;", // Top to bottom layout
|
||||
" node [shape=box, style=filled, fillcolor=lightblue];",
|
||||
" edge [color=black, arrowhead=vee];",
|
||||
""
|
||||
};
|
||||
|
||||
// Emit the top-level workflow nodes/edges
|
||||
EmitWorkflowDigraph(workflow, lines, " ");
|
||||
|
||||
// Emit sub-workflows hosted by WorkflowExecutor as nested clusters
|
||||
EmitSubWorkflowsDigraph(workflow, lines, " ");
|
||||
|
||||
lines.Add("}");
|
||||
return string.Join("\n", lines);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Converts the specified <see cref="Workflow"/> into a Mermaid.js diagram representation.
|
||||
/// </summary>
|
||||
/// <remarks>This method generates a textual representation of the workflow in the Mermaid.js format,
|
||||
/// which can be used to visualize workflows as diagrams. The output is formatted with indentation for
|
||||
/// readability.</remarks>
|
||||
/// <param name="workflow">The workflow to be converted into a Mermaid.js diagram. Cannot be null.</param>
|
||||
/// <returns>A string containing the Mermaid.js representation of the workflow.</returns>
|
||||
public static string ToMermaidString(this Workflow workflow)
|
||||
{
|
||||
List<string> lines = ["flowchart TD"];
|
||||
|
||||
EmitWorkflowMermaid(workflow, lines, " ");
|
||||
return string.Join("\n", lines);
|
||||
}
|
||||
|
||||
#region Private Implementation
|
||||
|
||||
private static void EmitWorkflowDigraph(Workflow workflow, List<string> lines, string indent, string? ns = null)
|
||||
{
|
||||
string MapId(string id) => ns != null ? $"{ns}/{id}" : id;
|
||||
|
||||
// Add start node
|
||||
var startExecutorId = workflow.StartExecutorId;
|
||||
lines.Add($"{indent}\"{MapId(startExecutorId)}\" [fillcolor=lightgreen, label=\"{startExecutorId}\\n(Start)\"];");
|
||||
|
||||
// Add other executor nodes
|
||||
foreach (var executorId in workflow.Registrations.Keys)
|
||||
{
|
||||
if (executorId != startExecutorId)
|
||||
{
|
||||
lines.Add($"{indent}\"{MapId(executorId)}\" [label=\"{executorId}\"];");
|
||||
}
|
||||
}
|
||||
|
||||
// Compute and emit fan-in nodes
|
||||
var fanInDescriptors = ComputeFanInDescriptors(workflow);
|
||||
if (fanInDescriptors.Count > 0)
|
||||
{
|
||||
lines.Add("");
|
||||
foreach (var (nodeId, _, _) in fanInDescriptors)
|
||||
{
|
||||
lines.Add($"{indent}\"{MapId(nodeId)}\" [shape=ellipse, fillcolor=lightgoldenrod, label=\"fan-in\"];");
|
||||
}
|
||||
}
|
||||
|
||||
// Emit fan-in edges
|
||||
foreach (var (nodeId, sources, target) in fanInDescriptors)
|
||||
{
|
||||
foreach (var src in sources)
|
||||
{
|
||||
lines.Add($"{indent}\"{MapId(src)}\" -> \"{MapId(nodeId)}\";");
|
||||
}
|
||||
lines.Add($"{indent}\"{MapId(nodeId)}\" -> \"{MapId(target)}\";");
|
||||
}
|
||||
|
||||
// Emit normal edges
|
||||
foreach (var (src, target, isConditional) in ComputeNormalEdges(workflow))
|
||||
{
|
||||
var edgeAttr = isConditional ? " [style=dashed, label=\"conditional\"]" : "";
|
||||
lines.Add($"{indent}\"{MapId(src)}\" -> \"{MapId(target)}\"{edgeAttr};");
|
||||
}
|
||||
}
|
||||
|
||||
private static void EmitSubWorkflowsDigraph(Workflow workflow, List<string> lines, string indent)
|
||||
{
|
||||
foreach (var kvp in workflow.Registrations)
|
||||
{
|
||||
var execId = kvp.Key;
|
||||
var registration = kvp.Value;
|
||||
// Check if this is a WorkflowExecutor with a nested workflow
|
||||
if (TryGetNestedWorkflow(registration, out var nestedWorkflow))
|
||||
{
|
||||
var subgraphId = $"cluster_{ComputeShortHash(execId)}";
|
||||
lines.Add($"{indent}subgraph {subgraphId} {{");
|
||||
lines.Add($"{indent} label=\"sub-workflow: {execId}\";");
|
||||
lines.Add($"{indent} style=dashed;");
|
||||
|
||||
// Emit the nested workflow inside this cluster using a namespace
|
||||
EmitWorkflowDigraph(nestedWorkflow, lines, $"{indent} ", execId);
|
||||
|
||||
// Recurse into deeper nested sub-workflows
|
||||
EmitSubWorkflowsDigraph(nestedWorkflow, lines, $"{indent} ");
|
||||
|
||||
lines.Add($"{indent}}}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void EmitWorkflowMermaid(Workflow workflow, List<string> lines, string indent, string? ns = null)
|
||||
{
|
||||
string sanitize(string input)
|
||||
{
|
||||
return input;
|
||||
}
|
||||
|
||||
string MapId(string id) => ns != null ? $"{sanitize(ns)}/{sanitize(id)}" : id;
|
||||
|
||||
// Add start node
|
||||
var startExecutorId = workflow.StartExecutorId;
|
||||
lines.Add($"{indent}{MapId(startExecutorId)}[\"{startExecutorId} (Start)\"];");
|
||||
|
||||
// Add other executor nodes
|
||||
foreach (var executorId in workflow.Registrations.Keys)
|
||||
{
|
||||
if (executorId != startExecutorId)
|
||||
{
|
||||
lines.Add($"{indent}{MapId(executorId)}[\"{executorId}\"];");
|
||||
}
|
||||
}
|
||||
|
||||
// Compute and emit fan-in nodes
|
||||
var fanInDescriptors = ComputeFanInDescriptors(workflow);
|
||||
if (fanInDescriptors.Count > 0)
|
||||
{
|
||||
lines.Add("");
|
||||
foreach (var (nodeId, _, _) in fanInDescriptors)
|
||||
{
|
||||
lines.Add($"{indent}{MapId(nodeId)}((fan-in))");
|
||||
}
|
||||
}
|
||||
|
||||
// Emit fan-in edges
|
||||
foreach (var (nodeId, sources, target) in fanInDescriptors)
|
||||
{
|
||||
foreach (var src in sources)
|
||||
{
|
||||
lines.Add($"{indent}{MapId(src)} --> {MapId(nodeId)};");
|
||||
}
|
||||
lines.Add($"{indent}{MapId(nodeId)} --> {MapId(target)};");
|
||||
}
|
||||
|
||||
// Emit normal edges
|
||||
foreach (var (src, target, isConditional) in ComputeNormalEdges(workflow))
|
||||
{
|
||||
if (isConditional)
|
||||
{
|
||||
lines.Add($"{indent}{MapId(src)} -. conditional .--> {MapId(target)};");
|
||||
}
|
||||
else
|
||||
{
|
||||
lines.Add($"{indent}{MapId(src)} --> {MapId(target)};");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static List<(string NodeId, List<string> Sources, string Target)> ComputeFanInDescriptors(Workflow workflow)
|
||||
{
|
||||
var result = new List<(string, List<string>, string)>();
|
||||
var seen = new HashSet<string>();
|
||||
|
||||
foreach (var edgeGroup in workflow.Edges.Values.SelectMany(x => x))
|
||||
{
|
||||
if (edgeGroup.Kind == EdgeKind.FanIn && edgeGroup.FanInEdgeData != null)
|
||||
{
|
||||
var fanInData = edgeGroup.FanInEdgeData;
|
||||
var target = fanInData.SinkId;
|
||||
var sources = fanInData.SourceIds.ToList();
|
||||
var digest = ComputeFanInDigest(target, sources);
|
||||
var nodeId = $"fan_in::{target}::{digest}";
|
||||
|
||||
// Avoid duplicates - the same fan-in edge group might appear in multiple source executor lists
|
||||
if (seen.Add(nodeId))
|
||||
{
|
||||
result.Add((nodeId, sources.OrderBy(x => x, StringComparer.Ordinal).ToList(), target));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private static List<(string Source, string Target, bool IsConditional)> ComputeNormalEdges(Workflow workflow)
|
||||
{
|
||||
var edges = new List<(string, string, bool)>();
|
||||
foreach (var edgeGroup in workflow.Edges.Values.SelectMany(x => x))
|
||||
{
|
||||
if (edgeGroup.Kind == EdgeKind.FanIn)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (edgeGroup.Kind)
|
||||
{
|
||||
case EdgeKind.Direct when edgeGroup.DirectEdgeData != null:
|
||||
var directData = edgeGroup.DirectEdgeData;
|
||||
var isConditional = directData.Condition != null;
|
||||
edges.Add((directData.SourceId, directData.SinkId, isConditional));
|
||||
break;
|
||||
|
||||
case EdgeKind.FanOut when edgeGroup.FanOutEdgeData != null:
|
||||
var fanOutData = edgeGroup.FanOutEdgeData;
|
||||
foreach (var sinkId in fanOutData.SinkIds)
|
||||
{
|
||||
edges.Add((fanOutData.SourceId, sinkId, false));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return edges;
|
||||
}
|
||||
|
||||
private static string ComputeFanInDigest(string target, List<string> sources)
|
||||
{
|
||||
var sortedSources = sources.OrderBy(x => x, StringComparer.Ordinal).ToList();
|
||||
var input = target + "|" + string.Join("|", sortedSources);
|
||||
using (var sha256 = SHA256.Create())
|
||||
{
|
||||
return ComputeShortHash(input);
|
||||
}
|
||||
}
|
||||
|
||||
private static string ComputeShortHash(string input)
|
||||
{
|
||||
#if !NET
|
||||
using var sha256 = SHA256.Create();
|
||||
var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(input));
|
||||
return BitConverter.ToString(hash).Replace("-", "").Substring(0, 8).ToUpperInvariant();
|
||||
#else
|
||||
var hash = SHA256.HashData(Encoding.UTF8.GetBytes(input));
|
||||
return Convert.ToHexString(hash).Substring(0, 8);
|
||||
#endif
|
||||
}
|
||||
|
||||
private static bool TryGetNestedWorkflow(ExecutorRegistration registration, [NotNullWhen(true)] out Workflow? workflow)
|
||||
{
|
||||
if (registration.RawExecutorishData is Workflow subWorkflow)
|
||||
{
|
||||
workflow = subWorkflow;
|
||||
return true;
|
||||
}
|
||||
|
||||
workflow = null;
|
||||
return false;
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
@@ -7,7 +7,6 @@ using System.Collections;
|
||||
using SystemEnvironment = System.Environment;
|
||||
|
||||
namespace SampleHelpers;
|
||||
|
||||
internal static class SampleEnvironment
|
||||
{
|
||||
public static string? GetEnvironmentVariable(string key)
|
||||
|
||||
@@ -0,0 +1,397 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using FluentAssertions;
|
||||
|
||||
namespace Microsoft.Agents.AI.Workflows.UnitTests;
|
||||
|
||||
public class WorkflowVisualizerTests
|
||||
{
|
||||
private sealed class MockExecutor(string id) : Executor(id)
|
||||
{
|
||||
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) =>
|
||||
routeBuilder.AddHandler<string>((msg, ctx) => ctx.SendMessageAsync(msg));
|
||||
}
|
||||
|
||||
private sealed class ListStrTargetExecutor(string id) : Executor(id)
|
||||
{
|
||||
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) =>
|
||||
routeBuilder.AddHandler<string[]>((msgs, ctx) => ctx.SendMessageAsync(string.Join(",", msgs)));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_ToDotString_Basic()
|
||||
{
|
||||
// Create a simple workflow
|
||||
var executor1 = new MockExecutor("executor1");
|
||||
var executor2 = new MockExecutor("executor2");
|
||||
|
||||
var workflow = new WorkflowBuilder("executor1")
|
||||
.AddEdge(executor1, executor2)
|
||||
.Build();
|
||||
|
||||
var dotContent = workflow.ToDotString();
|
||||
|
||||
// Check that the DOT content contains expected elements
|
||||
dotContent.Should().Contain("digraph Workflow {");
|
||||
dotContent.Should().Contain("\"executor1\"");
|
||||
dotContent.Should().Contain("\"executor2\"");
|
||||
dotContent.Should().Contain("\"executor1\" -> \"executor2\"");
|
||||
dotContent.Should().Contain("fillcolor=lightgreen"); // Start executor styling
|
||||
dotContent.Should().Contain("(Start)");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_Complex_Workflow()
|
||||
{
|
||||
// Test visualization of a more complex workflow
|
||||
var executor1 = new MockExecutor("start");
|
||||
var executor2 = new MockExecutor("middle1");
|
||||
var executor3 = new MockExecutor("middle2");
|
||||
var executor4 = new MockExecutor("end");
|
||||
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddEdge(executor1, executor2)
|
||||
.AddEdge(executor1, executor3)
|
||||
.AddEdge(executor2, executor4)
|
||||
.AddEdge(executor3, executor4)
|
||||
.Build();
|
||||
|
||||
var dotContent = workflow.ToDotString();
|
||||
|
||||
// Check all executors are present
|
||||
dotContent.Should().Contain("\"start\"");
|
||||
dotContent.Should().Contain("\"middle1\"");
|
||||
dotContent.Should().Contain("\"middle2\"");
|
||||
dotContent.Should().Contain("\"end\"");
|
||||
|
||||
// Check all edges are present
|
||||
dotContent.Should().Contain("\"start\" -> \"middle1\"");
|
||||
dotContent.Should().Contain("\"start\" -> \"middle2\"");
|
||||
dotContent.Should().Contain("\"middle1\" -> \"end\"");
|
||||
dotContent.Should().Contain("\"middle2\" -> \"end\"");
|
||||
|
||||
// Check start executor has special styling
|
||||
dotContent.Should().Contain("fillcolor=lightgreen");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_Conditional_Edge()
|
||||
{
|
||||
// Test that conditional edges are rendered dashed with a label
|
||||
var start = new MockExecutor("start");
|
||||
var mid = new MockExecutor("mid");
|
||||
var end = new MockExecutor("end");
|
||||
|
||||
// Condition that is never used during viz, but presence should mark the edge
|
||||
bool OnlyIfFoo(string? msg) => msg == "foo";
|
||||
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddEdge<string>(start, mid, OnlyIfFoo)
|
||||
.AddEdge(mid, end)
|
||||
.Build();
|
||||
|
||||
var dotContent = workflow.ToDotString();
|
||||
|
||||
// Conditional edge should be dashed and labeled
|
||||
dotContent.Should().Contain("\"start\" -> \"mid\" [style=dashed, label=\"conditional\"];");
|
||||
// Non-conditional edge should be plain
|
||||
dotContent.Should().Contain("\"mid\" -> \"end\"");
|
||||
dotContent.Should().NotContain("\"mid\" -> \"end\" [style=dashed");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_FanIn_EdgeGroup()
|
||||
{
|
||||
// Test that fan-in edges render an intermediate node with label and routed edges
|
||||
var start = new MockExecutor("start");
|
||||
var s1 = new MockExecutor("s1");
|
||||
var s2 = new MockExecutor("s2");
|
||||
var t = new ListStrTargetExecutor("t");
|
||||
|
||||
// Build a connected workflow: start fans out to s1 and s2, which then fan-in to t
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddFanOutEdge(start, s1, s2)
|
||||
.AddFanInEdge(t, s1, s2) // AddFanInEdge(target, sources)
|
||||
.Build();
|
||||
|
||||
var dotContent = workflow.ToDotString();
|
||||
|
||||
// There should be a single fan-in node with special styling and label
|
||||
var lines = dotContent.Split('\n');
|
||||
var fanInLines = Array.FindAll(lines, line =>
|
||||
line.Contains("shape=ellipse") && line.Contains("label=\"fan-in\""));
|
||||
fanInLines.Should().HaveCount(1);
|
||||
|
||||
// Extract the intermediate node id from the line
|
||||
var fanInLine = fanInLines[0];
|
||||
var firstQuote = fanInLine.IndexOf('"');
|
||||
var secondQuote = fanInLine.IndexOf('"', firstQuote + 1);
|
||||
firstQuote.Should().BeGreaterThan(-1);
|
||||
secondQuote.Should().BeGreaterThan(-1);
|
||||
var fanInNodeId = fanInLine.Substring(firstQuote + 1, secondQuote - firstQuote - 1);
|
||||
fanInNodeId.Should().NotBeNullOrEmpty();
|
||||
|
||||
// Edges should be routed through the intermediate node, not direct to target
|
||||
dotContent.Should().Contain($"\"s1\" -> \"{fanInNodeId}\";");
|
||||
dotContent.Should().Contain($"\"s2\" -> \"{fanInNodeId}\";");
|
||||
dotContent.Should().Contain($"\"{fanInNodeId}\" -> \"t\";");
|
||||
|
||||
// Ensure direct edges are not present
|
||||
dotContent.Should().NotContain("\"s1\" -> \"t\"");
|
||||
dotContent.Should().NotContain("\"s2\" -> \"t\"");
|
||||
}
|
||||
|
||||
// Note: Sub-workflow tests are commented out as the current implementation
|
||||
// of TryGetNestedWorkflow returns false. These can be enabled once
|
||||
// WorkflowExecutor detection is implemented.
|
||||
|
||||
/*
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_SubWorkflow_Digraph()
|
||||
{
|
||||
// Test that WorkflowViz can visualize sub-workflows in DOT format
|
||||
// This test would require WorkflowExecutor implementation
|
||||
// Currently TryGetNestedWorkflow always returns false
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_Nested_SubWorkflows()
|
||||
{
|
||||
// Test visualization of deeply nested sub-workflows
|
||||
// This test would require WorkflowExecutor implementation
|
||||
// Currently TryGetNestedWorkflow always returns false
|
||||
}
|
||||
*/
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_FanOut_Edges()
|
||||
{
|
||||
// Test fan-out edge visualization
|
||||
var start = new MockExecutor("start");
|
||||
var target1 = new MockExecutor("target1");
|
||||
var target2 = new MockExecutor("target2");
|
||||
var target3 = new MockExecutor("target3");
|
||||
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddFanOutEdge(start, target1, target2, target3)
|
||||
.Build();
|
||||
|
||||
var dotContent = workflow.ToDotString();
|
||||
|
||||
// Check all fan-out edges are present
|
||||
dotContent.Should().Contain("\"start\" -> \"target1\"");
|
||||
dotContent.Should().Contain("\"start\" -> \"target2\"");
|
||||
dotContent.Should().Contain("\"start\" -> \"target3\"");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_Mixed_EdgeTypes()
|
||||
{
|
||||
// Test workflow with mixed edge types (direct, conditional, fan-out, fan-in)
|
||||
var start = new MockExecutor("start");
|
||||
var a = new MockExecutor("a");
|
||||
var b = new MockExecutor("b");
|
||||
var c = new MockExecutor("c");
|
||||
var end = new ListStrTargetExecutor("end");
|
||||
|
||||
bool Condition(string? msg) => msg?.Contains("test") ?? false;
|
||||
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddEdge<string>(start, a, Condition) // Conditional edge
|
||||
.AddFanOutEdge(a, b, c) // Fan-out
|
||||
.AddFanInEdge(end, b, c) // Fan-in - AddFanInEdge(target, sources)
|
||||
.Build();
|
||||
|
||||
var dotContent = workflow.ToDotString();
|
||||
|
||||
// Check conditional edge
|
||||
dotContent.Should().Contain("\"start\" -> \"a\" [style=dashed, label=\"conditional\"];");
|
||||
|
||||
// Check fan-out edges
|
||||
dotContent.Should().Contain("\"a\" -> \"b\"");
|
||||
dotContent.Should().Contain("\"a\" -> \"c\"");
|
||||
|
||||
// Check fan-in (should have intermediate node)
|
||||
dotContent.Should().Contain("shape=ellipse");
|
||||
dotContent.Should().Contain("label=\"fan-in\"");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_SingleNode_Workflow()
|
||||
{
|
||||
// Test visualization of a single-node workflow
|
||||
var executor = new MockExecutor("single");
|
||||
|
||||
var workflow = new WorkflowBuilder("single")
|
||||
.BindExecutor(executor)
|
||||
.Build();
|
||||
|
||||
var dotContent = workflow.ToDotString();
|
||||
|
||||
// Check single node is present with start styling
|
||||
dotContent.Should().Contain("\"single\"");
|
||||
dotContent.Should().Contain("fillcolor=lightgreen");
|
||||
dotContent.Should().Contain("(Start)");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_SelfLoop_Edge()
|
||||
{
|
||||
// Test visualization of self-loop edge
|
||||
var executor = new MockExecutor("loop");
|
||||
|
||||
bool LoopCondition(string? msg) => (msg?.Length ?? 0) < 10;
|
||||
|
||||
var workflow = new WorkflowBuilder("loop")
|
||||
.AddEdge<string>(executor, executor, LoopCondition)
|
||||
.Build();
|
||||
|
||||
var dotContent = workflow.ToDotString();
|
||||
|
||||
// Check self-loop edge is present and conditional
|
||||
dotContent.Should().Contain("\"loop\" -> \"loop\" [style=dashed, label=\"conditional\"];");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_ToMermaidString_Basic()
|
||||
{
|
||||
// Test that WorkflowViz can generate a Mermaid diagram
|
||||
var executor1 = new MockExecutor("executor1");
|
||||
var executor2 = new MockExecutor("executor2");
|
||||
|
||||
var workflow = new WorkflowBuilder("executor1")
|
||||
.AddEdge(executor1, executor2)
|
||||
.Build();
|
||||
|
||||
var mermaidContent = workflow.ToMermaidString();
|
||||
|
||||
// Check that the Mermaid content contains expected elements
|
||||
mermaidContent.Should().Contain("flowchart TD");
|
||||
mermaidContent.Should().Contain("executor1[\"executor1 (Start)\"]");
|
||||
mermaidContent.Should().Contain("executor2[\"executor2\"]");
|
||||
mermaidContent.Should().Contain("executor1 --> executor2");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_Mermaid_Conditional_Edge()
|
||||
{
|
||||
// Test that conditional edges are rendered with dotted lines and labels in Mermaid
|
||||
var start = new MockExecutor("start");
|
||||
var mid = new MockExecutor("mid");
|
||||
var end = new MockExecutor("end");
|
||||
|
||||
bool OnlyIfFoo(string? msg) => msg == "foo";
|
||||
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddEdge<string>(start, mid, OnlyIfFoo)
|
||||
.AddEdge(mid, end)
|
||||
.Build();
|
||||
|
||||
var mermaidContent = workflow.ToMermaidString();
|
||||
|
||||
// Conditional edge should be dotted with label
|
||||
mermaidContent.Should().Contain("start -. conditional .--> mid");
|
||||
// Non-conditional edge should be solid
|
||||
mermaidContent.Should().Contain("mid --> end");
|
||||
mermaidContent.Should().NotContain("end -. conditional");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_Mermaid_FanIn_EdgeGroup()
|
||||
{
|
||||
// Test that fan-in edges render an intermediate node with label and routed edges in Mermaid
|
||||
var start = new MockExecutor("start");
|
||||
var s1 = new MockExecutor("s1");
|
||||
var s2 = new MockExecutor("s2");
|
||||
var t = new ListStrTargetExecutor("t");
|
||||
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddFanOutEdge(start, s1, s2)
|
||||
.AddFanInEdge(t, s1, s2)
|
||||
.Build();
|
||||
|
||||
var mermaidContent = workflow.ToMermaidString();
|
||||
|
||||
// There should be a fan-in node with special styling
|
||||
var lines = mermaidContent.Split('\n');
|
||||
var fanInLines = Array.FindAll(lines, line => line.Contains("((fan-in))"));
|
||||
fanInLines.Should().HaveCount(1);
|
||||
|
||||
// Extract the intermediate node id from the line
|
||||
var fanInLine = fanInLines[0].Trim();
|
||||
var fanInNodeId = fanInLine.Substring(0, fanInLine.IndexOf("((fan-in))", StringComparison.Ordinal)).Trim();
|
||||
fanInNodeId.Should().NotBeNullOrEmpty();
|
||||
|
||||
// Edges should be routed through the intermediate node
|
||||
mermaidContent.Should().Contain($"s1 --> {fanInNodeId}");
|
||||
mermaidContent.Should().Contain($"s2 --> {fanInNodeId}");
|
||||
mermaidContent.Should().Contain($"{fanInNodeId} --> t");
|
||||
|
||||
// Ensure direct edges are not present
|
||||
mermaidContent.Should().NotContain("s1 --> t");
|
||||
mermaidContent.Should().NotContain("s2 --> t");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_Mermaid_Complex_Workflow()
|
||||
{
|
||||
// Test Mermaid visualization of a more complex workflow
|
||||
var executor1 = new MockExecutor("start");
|
||||
var executor2 = new MockExecutor("middle1");
|
||||
var executor3 = new MockExecutor("middle2");
|
||||
var executor4 = new MockExecutor("end");
|
||||
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddEdge(executor1, executor2)
|
||||
.AddEdge(executor1, executor3)
|
||||
.AddEdge(executor2, executor4)
|
||||
.AddEdge(executor3, executor4)
|
||||
.Build();
|
||||
|
||||
var mermaidContent = workflow.ToMermaidString();
|
||||
|
||||
// Check all executors are present
|
||||
mermaidContent.Should().Contain("start[\"start (Start)\"]");
|
||||
mermaidContent.Should().Contain("middle1[\"middle1\"]");
|
||||
mermaidContent.Should().Contain("middle2[\"middle2\"]");
|
||||
mermaidContent.Should().Contain("end[\"end\"]");
|
||||
|
||||
// Check all edges are present
|
||||
mermaidContent.Should().Contain("start --> middle1");
|
||||
mermaidContent.Should().Contain("start --> middle2");
|
||||
mermaidContent.Should().Contain("middle1 --> end");
|
||||
mermaidContent.Should().Contain("middle2 --> end");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_WorkflowViz_Mermaid_Mixed_EdgeTypes()
|
||||
{
|
||||
// Test Mermaid workflow with mixed edge types (direct, conditional, fan-out, fan-in)
|
||||
var start = new MockExecutor("start");
|
||||
var a = new MockExecutor("a");
|
||||
var b = new MockExecutor("b");
|
||||
var c = new MockExecutor("c");
|
||||
var end = new ListStrTargetExecutor("end");
|
||||
|
||||
bool Condition(string? msg) => msg?.Contains("test") ?? false;
|
||||
|
||||
var workflow = new WorkflowBuilder("start")
|
||||
.AddEdge<string>(start, a, Condition) // Conditional edge
|
||||
.AddFanOutEdge(a, b, c) // Fan-out
|
||||
.AddFanInEdge(end, b, c) // Fan-in
|
||||
.Build();
|
||||
|
||||
var mermaidContent = workflow.ToMermaidString();
|
||||
|
||||
// Check conditional edge
|
||||
mermaidContent.Should().Contain("start -. conditional .--> a");
|
||||
|
||||
// Check fan-out edges
|
||||
mermaidContent.Should().Contain("a --> b");
|
||||
mermaidContent.Should().Contain("a --> c");
|
||||
|
||||
// Check fan-in (should have intermediate node)
|
||||
mermaidContent.Should().Contain("((fan-in))");
|
||||
}
|
||||
}
|
||||
+1
-1
@@ -299,7 +299,7 @@ async def main():
|
||||
print("Tip: Install 'viz' extra to export workflow visualization: pip install agent-framework[viz]")
|
||||
|
||||
# Step 3: Open the text file and read its content.
|
||||
async with aiofiles.open(os.path.join(DIR, "resources", "long_text.txt"), "r") as f:
|
||||
async with aiofiles.open(os.path.join(DIR, "../resources", "long_text.txt"), "r") as f:
|
||||
raw_text = await f.read()
|
||||
|
||||
# Step 4: Run the workflow with the raw text as input.
|
||||
|
||||
Generated
+1
-1
@@ -5942,4 +5942,4 @@ source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/e3/02/0f2892c661036d50ede074e376733dca2ae7c6eb617489437771209d4180/zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166", size = 25547, upload-time = "2025-06-08T17:06:39.4Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276, upload-time = "2025-06-08T17:06:38.034Z" },
|
||||
]
|
||||
]
|
||||
Reference in New Issue
Block a user