// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
namespace WorkflowMapReduceSample;
/// <summary>
/// Sample: Map-Reduce Word Count with Fan-Out and Fan-In over File-Backed Intermediate Results.
/// </summary>
/// <remarks>
/// The workflow splits a large text into chunks, maps words to counts in parallel,
/// shuffles intermediate pairs to reducers, then reduces to per-word totals.
///
/// Purpose:
/// Show how to:
/// - Partition input once and coordinate parallel mappers with shared state.
/// - Implement map, shuffle, and reduce executors that pass file paths instead of large payloads.
/// - Use fan-out and fan-in edges to express parallelism and joins.
/// - Persist intermediate results to disk to bound memory usage for large inputs.
///
/// Pre-requisites:
/// - Write access to a temp directory.
/// - A source text file to process (a short built-in sample text is used when it is missing).
/// </remarks>
public static class Program
{
    private static async Task Main()
    {
        Workflow workflow = BuildWorkflow();
        await RunWorkflowAsync(workflow);
    }

    /// <summary>
    /// Builds a map-reduce workflow using a fan-out/fan-in pattern with mappers, reducers, and other executors.
    /// </summary>
    /// <remarks>
    /// The graph is: split -> (fan-out) mappers -> (fan-in barrier) shuffle -> (fan-out) reducers
    /// -> (fan-in barrier) completion. The workflow output is taken from the completion executor.
    /// </remarks>
    /// <param name="mapperCount">Number of mapper executors to create. Defaults to 3.</param>
    /// <param name="reducerCount">Number of reducer executors to create. Defaults to 4.</param>
    /// <returns>A <see cref="Workflow"/> instance representing the constructed workflow.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="mapperCount"/> or <paramref name="reducerCount"/> is not positive.
    /// </exception>
    public static Workflow BuildWorkflow(int mapperCount = 3, int reducerCount = 4)
    {
        if (mapperCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(mapperCount), mapperCount, "At least one mapper is required.");
        }

        if (reducerCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(reducerCount), reducerCount, "At least one reducer is required.");
        }

        // Step 1: Create the mappers and the input splitter
        var mappers = Enumerable.Range(0, mapperCount).Select(i => new Mapper($"map_executor_{i}")).ToArray();
        var splitter = new Split(mappers.Select(m => m.Id).ToArray(), "split_data_executor");

        // Step 2: Create the reducers and the intermediate shuffler
        var reducers = Enumerable.Range(0, reducerCount).Select(i => new Reducer($"reduce_executor_{i}")).ToArray();
        var shuffler = new Shuffler(reducers.Select(r => r.Id).ToArray(), mappers.Select(m => m.Id).ToArray(), "shuffle_executor");

        // Step 3: Create the output manager
        var completion = new CompletionExecutor("completion_executor");

        // Step 4: Build the concurrent workflow with fan-out/fan-in pattern
        return new WorkflowBuilder(splitter)
            .AddFanOutEdge(splitter, [.. mappers]) // Split -> many mappers
            .AddFanInBarrierEdge([.. mappers], shuffler) // All mappers -> shuffle
            .AddFanOutEdge(shuffler, [.. reducers]) // Shuffle -> many reducers
            .AddFanInBarrierEdge([.. reducers], completion) // All reducers -> completion
            .WithOutputFrom(completion)
            .Build();
    }

    /// <summary>
    /// Executes the specified workflow asynchronously using a predefined input text and processes its output events.
    /// </summary>
    /// <remarks>
    /// The input text is read from "long_text.txt" in a "resources" directory located four levels above
    /// the current directory. If the file is not found, a default sample text is used. Every streamed event
    /// is printed; for output events carrying a list of file paths, each path and (when the file exists)
    /// its contents are displayed. Workflow and executor failures are written to standard error in red.
    /// </remarks>
    /// <param name="workflow">The workflow to execute.</param>
    /// <returns>A task that represents the asynchronous operation.</returns>
    private static async Task RunWorkflowAsync(Workflow workflow)
    {
        // Step 1: Read the input text
        var resourcesPath = Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "..", "..", "resources");
        var textFilePath = Path.Combine(resourcesPath, "long_text.txt");
        string rawText;
        if (File.Exists(textFilePath))
        {
            rawText = await File.ReadAllTextAsync(textFilePath);
        }
        else
        {
            // Use sample text if file doesn't exist
            Console.WriteLine($"Note: {textFilePath} not found, using sample text");
            rawText = "The quick brown fox jumps over the lazy dog. The dog was very lazy. The fox was very quick.";
        }

        // Step 2: Run the workflow
        Console.WriteLine("\n=== RUNNING WORKFLOW ===\n");
        await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input: rawText);
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            Console.WriteLine($"Event: {evt}");
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine("\nFinal Output Files:");
                if (outputEvent.Data is List<string> filePaths)
                {
                    foreach (var filePath in filePaths)
                    {
                        Console.WriteLine($" - {filePath}");
                        if (File.Exists(filePath))
                        {
                            var content = await File.ReadAllTextAsync(filePath);
                            Console.WriteLine($" Contents:\n{content}");
                        }
                    }
                }
            }
            else if (evt is WorkflowErrorEvent workflowError)
            {
                WriteError(workflowError.Exception?.ToString() ?? "Unknown workflow error occurred.");
            }
            else if (evt is ExecutorFailedEvent executorFailed)
            {
                WriteError($"Executor '{executorFailed.ExecutorId}' failed with {(executorFailed.Data == null ? "unknown error" : $"exception {executorFailed.Data}")}.");
            }
        }
    }

    /// <summary>
    /// Writes a message to standard error in red, restoring the console colors even if the write throws.
    /// </summary>
    /// <param name="message">The text to write.</param>
    private static void WriteError(string message)
    {
        Console.ForegroundColor = ConsoleColor.Red;
        try
        {
            Console.Error.WriteLine(message);
        }
        finally
        {
            Console.ResetColor();
        }
    }
}
#region Executors
/// <summary>
/// Splits data into roughly equal chunks based on the number of mapper nodes.
/// </summary>
/// <param name="mapperIds">Ids of the mapper executors; one index range is published per id.</param>
/// <param name="id">The id of this executor.</param>
[SendsMessage(typeof(SplitComplete))]
internal sealed class Split(string[] mapperIds, string id) :
    Executor<string>(id)
{
    private readonly string[] _mapperIds = mapperIds ?? throw new ArgumentNullException(nameof(mapperIds));
    private static readonly string[] s_lineSeparators = ["\r\n", "\r", "\n"];

    /// <summary>
    /// Tokenize input and assign contiguous index ranges to each mapper via shared state.
    /// </summary>
    /// <param name="message">The raw text to process.</param>
    /// <param name="context">Workflow context used to queue state updates and send messages.</param>
    /// <param name="cancellationToken">Token flowed to the state and messaging calls.</param>
    /// <exception cref="InvalidOperationException">No mapper ids were supplied.</exception>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var mapperCount = this._mapperIds.Length;
        if (mapperCount == 0)
        {
            // Without this guard the chunk-size division below fails with DivideByZeroException.
            throw new InvalidOperationException("At least one mapper id is required to split the input.");
        }

        // Ensure temp directory exists
        Directory.CreateDirectory(MapReduceConstants.TempDir);

        // Process the data into a list of words and remove any empty lines
        var wordList = Preprocess(message);

        // Store the tokenized words once so that all mappers can read by index
        await context.QueueStateUpdateAsync(MapReduceConstants.DataToProcessKey, wordList, scopeName: MapReduceConstants.StateScope, cancellationToken);

        // Divide indices into contiguous slices for each mapper
        var chunkSize = wordList.Length / mapperCount;

        async Task ProcessChunkAsync(int i)
        {
            // Determine the start and end indices for this mapper's chunk.
            // The last mapper also takes the remainder left over by the integer division.
            var startIndex = i * chunkSize;
            var endIndex = i < mapperCount - 1 ? startIndex + chunkSize : wordList.Length;

            // Save the indices under the mapper's Id
            await context.QueueStateUpdateAsync(this._mapperIds[i], (startIndex, endIndex), scopeName: MapReduceConstants.StateScope, cancellationToken);

            // Notify the mapper that data is ready
            await context.SendMessageAsync(new SplitComplete(), targetId: this._mapperIds[i], cancellationToken);
        }

        // Process all the chunks
        var tasks = Enumerable.Range(0, mapperCount).Select(ProcessChunkAsync);
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Breaks the text into lines, trims them, drops blank lines, and splits each line on spaces.
    /// </summary>
    /// <param name="data">The raw input text.</param>
    /// <returns>The non-empty, space-separated tokens in input order.</returns>
    private static string[] Preprocess(string data)
    {
        var lines = data.Split(s_lineSeparators, StringSplitOptions.RemoveEmptyEntries)
            .Select(line => line.Trim())
            .Where(line => !string.IsNullOrWhiteSpace(line));

        return lines
            .SelectMany(line => line.Split(' ', StringSplitOptions.RemoveEmptyEntries))
            .Where(word => !string.IsNullOrWhiteSpace(word))
            .ToArray();
    }
}
/// <summary>
/// Maps each token to a count of 1 and writes pairs to a per-mapper file.
/// </summary>
/// <param name="id">The id of this executor; also the shared-state key holding its index range.</param>
[SendsMessage(typeof(MapComplete))]
internal sealed class Mapper(string id) : Executor<SplitComplete>(id)
{
    /// <summary>
    /// Read the assigned slice, emit (word, 1) pairs, and persist to disk.
    /// </summary>
    /// <param name="message">Marker signalling that the split data is ready.</param>
    /// <param name="context">Workflow context used to read shared state and send messages.</param>
    /// <param name="cancellationToken">Token flowed to the state, file, and messaging calls.</param>
    /// <exception cref="InvalidOperationException">The tokenized input is not present in shared state.</exception>
    public override async ValueTask HandleAsync(SplitComplete message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Fail with a descriptive error instead of a NullReferenceException when the state is missing.
        var dataToProcess = await context.ReadStateAsync<string[]>(MapReduceConstants.DataToProcessKey, scopeName: MapReduceConstants.StateScope, cancellationToken)
            ?? throw new InvalidOperationException($"No tokenized input found under state key '{MapReduceConstants.DataToProcessKey}'.");

        // The (start, end) index range is stored under this mapper's own id.
        var chunk = await context.ReadStateAsync<(int start, int end)>(this.Id, scopeName: MapReduceConstants.StateScope, cancellationToken);

        // Write this mapper's results as simple "word: 1" text lines for easy debugging
        var filePath = Path.Combine(MapReduceConstants.TempDir, $"map_results_{this.Id}.txt");
        var lines = dataToProcess[chunk.start..chunk.end].Select(word => $"{word}: 1");
        await File.WriteAllLinesAsync(filePath, lines, cancellationToken);

        await context.SendMessageAsync(new MapComplete(filePath), cancellationToken: cancellationToken);
    }
}
/// <summary>
/// Groups intermediate pairs by key and partitions them across reducers.
/// </summary>
/// <param name="reducerIds">Ids of the reducer executors; one partition file is produced per id.</param>
/// <param name="mapperIds">Ids of the mapper executors; its length is the number of results to wait for.</param>
/// <param name="id">The id of this executor.</param>
[SendsMessage(typeof(ShuffleComplete))]
internal sealed class Shuffler(string[] reducerIds, string[] mapperIds, string id) :
    Executor<MapComplete>(id)
{
    private readonly string[] _reducerIds = reducerIds;
    private readonly string[] _mapperIds = mapperIds;
    private readonly List<MapComplete> _mapResults = [];

    /// <summary>
    /// Aggregate mapper outputs and write one partition file per reducer.
    /// </summary>
    /// <param name="message">Completion notice from one mapper, carrying its result file path.</param>
    /// <param name="context">Workflow context used to send messages.</param>
    /// <param name="cancellationToken">Token flowed to the file and messaging calls.</param>
    public override async ValueTask HandleAsync(MapComplete message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._mapResults.Add(message);

        // Wait for all mappers to complete
        if (this._mapResults.Count < this._mapperIds.Length)
        {
            return;
        }

        var chunks = await this.PreprocessAsync(this._mapResults, cancellationToken);

        async Task ProcessChunkAsync(List<(string key, List<int> values)> chunk, int index)
        {
            // Write one grouped partition for reducer index and notify that reducer
            var filePath = Path.Combine(MapReduceConstants.TempDir, $"shuffle_results_{index}.txt");
            var lines = chunk.Select(kvp => $"{kvp.key}: {JsonSerializer.Serialize(kvp.values)}");
            await File.WriteAllLinesAsync(filePath, lines, cancellationToken);

            // The intended reducer is carried in the message itself.
            await context.SendMessageAsync(new ShuffleComplete(filePath, this._reducerIds[index]), cancellationToken: cancellationToken);
        }

        var tasks = chunks.Select((chunk, i) => ProcessChunkAsync(chunk, i));
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Load all mapper files, group by key, sort keys, and partition for reducers.
    /// </summary>
    /// <param name="data">The mapper completion messages whose files should be read.</param>
    /// <param name="cancellationToken">Token flowed to the file reads.</param>
    /// <returns>
    /// Exactly one partition per reducer id, in reducer order. Partitions may be empty when there are
    /// fewer distinct keys than reducers.
    /// </returns>
    /// <exception cref="InvalidOperationException">No reducer ids were supplied.</exception>
    private async Task<List<List<(string key, List<int> values)>>> PreprocessAsync(List<MapComplete> data, CancellationToken cancellationToken = default)
    {
        var reducerCount = this._reducerIds.Length;
        if (reducerCount == 0)
        {
            throw new InvalidOperationException("At least one reducer id is required to shuffle the mapper results.");
        }

        // Load all intermediate pairs
        var mapResults = new List<(string key, int value)>();
        foreach (var result in data)
        {
            var lines = await File.ReadAllLinesAsync(result.FilePath, cancellationToken);
            foreach (var line in lines)
            {
                var parts = line.Split(": ");
                if (parts.Length == 2)
                {
                    // The files are machine-written, so parse independently of the current culture.
                    mapResults.Add((parts[0], int.Parse(parts[1], System.Globalization.CultureInfo.InvariantCulture)));
                }
            }
        }

        // Group values by token. Ordinal ordering keeps the key order (and therefore the
        // partition assignment) identical across machines and cultures.
        var aggregatedResults = mapResults
            .GroupBy(r => r.key)
            .Select(g => (key: g.Key, values: g.Select(r => r.value).ToList()))
            .OrderBy(x => x.key, StringComparer.Ordinal)
            .ToList();

        // Partition keys across reducers as evenly as possible. Every reducer gets a partition,
        // even an empty one: a reducer that receives nothing never reports completion, and the
        // downstream fan-in barrier waits on all reducers.
        var baseSize = aggregatedResults.Count / reducerCount;
        var remainder = aggregatedResults.Count % reducerCount;
        var chunks = new List<List<(string key, List<int> values)>>(reducerCount);
        var offset = 0;
        for (int i = 0; i < reducerCount; i++)
        {
            // The first 'remainder' partitions take one extra key each.
            var size = baseSize + (i < remainder ? 1 : 0);
            chunks.Add(aggregatedResults.GetRange(offset, size));
            offset += size;
        }

        return chunks;
    }
}
/// <summary>
/// Sums grouped counts per key for its assigned partition.
/// </summary>
/// <param name="id">The id of this executor; only partitions addressed to this id are processed.</param>
[SendsMessage(typeof(ReduceComplete))]
internal sealed class Reducer(string id) : Executor<ShuffleComplete>(id)
{
    /// <summary>
    /// Read one shuffle partition and reduce it to totals.
    /// </summary>
    /// <param name="message">Notice that a partition file is ready, including the reducer it is meant for.</param>
    /// <param name="context">Workflow context used to send messages.</param>
    /// <param name="cancellationToken">Token flowed to the file and messaging calls.</param>
    public override async ValueTask HandleAsync(ShuffleComplete message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.ReducerId != this.Id)
        {
            // This partition belongs to a different reducer. Skip.
            return;
        }

        // Read grouped values from the shuffle output
        var lines = await File.ReadAllLinesAsync(message.FilePath, cancellationToken);

        // Sum values per key. Values are serialized JSON arrays like [1, 1, ...]
        var reducedResults = new Dictionary<string, int>();
        foreach (var line in lines)
        {
            // Split only on the first separator so the JSON payload stays intact.
            var parts = line.Split(": ", 2);
            if (parts.Length == 2)
            {
                var key = parts[0];
                var values = JsonSerializer.Deserialize<List<int>>(parts[1]);
                reducedResults[key] = values?.Sum() ?? 0;
            }
        }

        // Persist our partition totals
        var filePath = Path.Combine(MapReduceConstants.TempDir, $"reduced_results_{this.Id}.txt");
        var outputLines = reducedResults.Select(kvp => $"{kvp.Key}: {kvp.Value}");
        await File.WriteAllLinesAsync(filePath, outputLines, cancellationToken);

        await context.SendMessageAsync(new ReduceComplete(filePath), cancellationToken: cancellationToken);
    }
}
/// <summary>
/// Joins all reducer outputs and yields the final output.
/// </summary>
/// <param name="id">The id of this executor.</param>
[YieldsOutput(typeof(List<string>))]
internal sealed class CompletionExecutor(string id) :
    Executor<List<ReduceComplete>>(id)
{
    /// <summary>
    /// Collect reducer output file paths and yield final output.
    /// </summary>
    /// <param name="message">The reducer completion messages to join.</param>
    /// <param name="context">Workflow context used to yield the output.</param>
    /// <param name="cancellationToken">Token flowed to the output call.</param>
    public override async ValueTask HandleAsync(List<ReduceComplete> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // The workflow output is the list of per-reducer result file paths, in message order.
        var filePaths = message.ConvertAll(r => r.FilePath);
        await context.YieldOutputAsync(filePaths, cancellationToken);
    }
}
#endregion
#region Events
/// <summary>
/// Payload-free marker sent by the splitter to each mapper once its index range is ready.
/// </summary>
internal sealed class SplitComplete : WorkflowEvent
{
}
/// <summary>
/// Signals that a mapper has written its intermediate pairs to a file.
/// </summary>
internal sealed class MapComplete : WorkflowEvent
{
    /// <summary>
    /// Creates the event for the given mapper result file.
    /// </summary>
    /// <param name="FilePath">Path of the file holding the mapper's pairs.</param>
    public MapComplete(string FilePath)
    {
        this.FilePath = FilePath;
    }

    /// <summary>Path of the file holding the mapper's pairs.</summary>
    public string FilePath { get; }
}
/// <summary>
/// Signals that a shuffle partition file is ready for one specific reducer.
/// </summary>
internal sealed class ShuffleComplete : WorkflowEvent
{
    /// <summary>
    /// Creates the event for the given partition file and target reducer.
    /// </summary>
    /// <param name="FilePath">Path of the partition file.</param>
    /// <param name="ReducerId">Id of the reducer the partition is meant for.</param>
    public ShuffleComplete(string FilePath, string ReducerId)
    {
        this.FilePath = FilePath;
        this.ReducerId = ReducerId;
    }

    /// <summary>Path of the partition file.</summary>
    public string FilePath { get; }

    /// <summary>Id of the reducer the partition is meant for.</summary>
    public string ReducerId { get; }
}
/// <summary>
/// Signals that a reducer has written the final counts for its partition.
/// </summary>
internal sealed class ReduceComplete : WorkflowEvent
{
    /// <summary>
    /// Creates the event for the given reducer result file.
    /// </summary>
    /// <param name="FilePath">Path of the file holding the reduced totals.</param>
    public ReduceComplete(string FilePath)
    {
        this.FilePath = FilePath;
    }

    /// <summary>Path of the file holding the reduced totals.</summary>
    public string FilePath { get; }
}
#endregion
#region Helpers
/// <summary>
/// Provides constant values used in the MapReduce workflow.
/// </summary>
/// <remarks>
/// This class contains the shared-state key and scope names and the temporary storage location
/// used by the executors in this file. The members are immutable so that no executor can
/// accidentally redirect the others by reassigning them.
/// </remarks>
internal static class MapReduceConstants
{
    /// <summary>Shared-state key under which the tokenized input is stored.</summary>
    public const string DataToProcessKey = "data_to_be_processed";

    /// <summary>Directory (under the system temp path) where intermediate and final result files are written.</summary>
    public static readonly string TempDir = Path.Combine(Path.GetTempPath(), "workflow_viz_sample");

    /// <summary>Name of the shared-state scope used by the splitter and mappers.</summary>
    public const string StateScope = "MapReduceState";
}
#endregion