mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
ca580a8316
* Initial plan * Add WorkflowErrorEvent and ExecutorFailedEvent error checking to all workflow samples Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/c5d77400-d7ed-4fbe-9103-f5d74aabcf2b Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Fix if/else if consistency for error event handlers per code review feedback Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/c5d77400-d7ed-4fbe-9103-f5d74aabcf2b Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Address PR comments * fixup: PR comments --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> Co-authored-by: Jacob Alber <jaalber@microsoft.com>
135 lines
5.5 KiB
C#
135 lines
5.5 KiB
C#
// Copyright (c) Microsoft. All rights reserved.
|
|
|
|
using Microsoft.Agents.AI.Workflows;
|
|
|
|
namespace WorkflowSharedStatesSample;
|
|
|
|
/// <summary>
/// Sample entry point that introduces shared states within a workflow.
/// Several executors write to and read from a shared state scope, which allows
/// richer data sharing and coordination between tasks.
/// </summary>
/// <remarks>
/// Pre-requisites:
/// - Foundational samples should be completed first.
/// - This sample also uses the fan-out and fan-in patterns to achieve parallel processing.
/// </remarks>
public static class Program
{
    /// <summary>
    /// Builds the workflow, runs it against "Lorem_Ipsum.txt" and prints every
    /// output event to stdout and every error/failure event to stderr.
    /// </summary>
    private static async Task Main()
    {
        // Instantiate one executor per step of the workflow
        FileReadExecutor reader = new();
        WordCountingExecutor wordCounter = new();
        ParagraphCountingExecutor paragraphCounter = new();
        AggregationExecutor aggregator = new();

        // Wire the graph: the reader fans out to both counters, and the
        // counters fan in (through a barrier edge) to the aggregator.
        var builder = new WorkflowBuilder(reader);
        var workflow = builder
            .AddFanOutEdge(reader, [wordCounter, paragraphCounter])
            .AddFanInBarrierEdge([wordCounter, paragraphCounter], aggregator)
            .WithOutputFrom(aggregator)
            .Build();

        // Run the workflow with the file name as its input message
        await using Run run = await InProcessExecution.RunAsync(workflow, "Lorem_Ipsum.txt");

        foreach (WorkflowEvent evt in run.NewEvents)
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine(outputEvent.Data);
                continue;
            }

            if (evt is WorkflowErrorEvent workflowError)
            {
                WriteError(workflowError.Exception?.ToString() ?? "Unknown workflow error occurred.");
                continue;
            }

            if (evt is ExecutorFailedEvent executorFailed)
            {
                string reason = executorFailed.Data is null
                    ? "unknown error"
                    : $"exception {executorFailed.Data}";
                WriteError($"Executor '{executorFailed.ExecutorId}' failed with {reason}.");
            }
        }
    }

    /// <summary>
    /// Writes <paramref name="message"/> to the error stream in red, then restores the console colors.
    /// </summary>
    private static void WriteError(string message)
    {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.Error.WriteLine(message);
        Console.ResetColor();
    }
}
|
|
|
|
/// <summary>
/// Names of the shared state scopes used by the executors in this sample.
/// </summary>
internal static class FileContentStateConstants
{
    /// <summary>
    /// Scope name under which file content is stored and later read back.
    /// </summary>
    public const string FileContentStateScope = "FileContentState";
}
|
|
|
|
/// <summary>
/// Loads the content of the named file, stores it in shared state under a
/// freshly generated key, and returns that key as its output message.
/// </summary>
internal sealed class FileReadExecutor() : Executor<string, string>("FileReadExecutor")
{
    /// <summary>
    /// Reads the resource identified by <paramref name="message"/> and queues it into the
    /// <see cref="FileContentStateConstants.FileContentStateScope"/> scope.
    /// </summary>
    /// <param name="message">Name of the file to read.</param>
    /// <param name="context">Workflow context used to queue the shared state update.</param>
    /// <param name="cancellationToken">Token forwarded to the state update call.</param>
    /// <returns>The generated key under which the content was stored.</returns>
    public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Load the file content from the embedded resource
        string content = Resources.Read(message);

        // A fresh GUID ("N" format: 32 hex digits, no dashes) is the key
        // downstream executors use to look the content up.
        string stateKey = Guid.NewGuid().ToString("N");

        await context.QueueStateUpdateAsync(
            stateKey,
            content,
            scopeName: FileContentStateConstants.FileContentStateScope,
            cancellationToken);

        return stateKey;
    }
}
|
|
|
|
/// <summary>
/// Statistics produced by the counting executors; each executor in this
/// sample fills in only the property it is responsible for.
/// </summary>
internal sealed class FileStats
{
    /// <summary>Number of paragraphs counted (0 when not set).</summary>
    public int ParagraphCount { get; set; }

    /// <summary>Number of words counted (0 when not set).</summary>
    public int WordCount { get; set; }
}
|
|
|
|
/// <summary>
/// Counts the words in file content that a previous executor placed in shared state.
/// </summary>
internal sealed class WordCountingExecutor() : Executor<string, FileStats>("WordCountingExecutor")
{
    // Characters that separate words. Tab is included so that tab-separated
    // words are not merged into a single word.
    private static readonly char[] s_wordSeparators = [' ', '\t', '\n', '\r'];

    /// <summary>
    /// Looks up the file content stored under <paramref name="message"/> and counts its words.
    /// </summary>
    /// <param name="message">Key of the file content inside the
    /// <see cref="FileContentStateConstants.FileContentStateScope"/> scope.</param>
    /// <param name="context">Workflow context used to read the shared state.</param>
    /// <param name="cancellationToken">Token forwarded to the state read call.</param>
    /// <returns>A <see cref="FileStats"/> with only <see cref="FileStats.WordCount"/> populated.</returns>
    /// <exception cref="InvalidOperationException">No content is stored under the given key.</exception>
    public override async ValueTask<FileStats> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Retrieve the file content from the shared state
        var fileContent = await context.ReadStateAsync<string>(message, scopeName: FileContentStateConstants.FileContentStateScope, cancellationToken)
            ?? throw new InvalidOperationException("File content state not found");

        // RemoveEmptyEntries collapses runs of separators, so consecutive
        // spaces or blank lines do not produce phantom words.
        int wordCount = fileContent.Split(s_wordSeparators, StringSplitOptions.RemoveEmptyEntries).Length;

        return new FileStats { WordCount = wordCount };
    }
}
|
|
|
|
/// <summary>
/// Counts the paragraphs in file content that a previous executor placed in shared state.
/// Every non-empty line of the content is counted as one paragraph.
/// </summary>
internal sealed class ParagraphCountingExecutor() : Executor<string, FileStats>("ParagraphCountingExecutor")
{
    /// <summary>
    /// Looks up the file content stored under <paramref name="message"/> and counts its paragraphs.
    /// </summary>
    /// <param name="message">Key of the file content inside the
    /// <see cref="FileContentStateConstants.FileContentStateScope"/> scope.</param>
    /// <param name="context">Workflow context used to read the shared state.</param>
    /// <param name="cancellationToken">Token forwarded to the state read call.</param>
    /// <returns>A <see cref="FileStats"/> with only <see cref="FileStats.ParagraphCount"/> populated.</returns>
    /// <exception cref="InvalidOperationException">No content is stored under the given key.</exception>
    public override async ValueTask<FileStats> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Fetch the content previously stored in the shared state
        var content = await context.ReadStateAsync<string>(
            message,
            scopeName: FileContentStateConstants.FileContentStateScope,
            cancellationToken);

        if (content is null)
        {
            throw new InvalidOperationException("File content state not found");
        }

        // Split on line-break characters; empty entries (blank lines and the
        // gap inside "\r\n") are dropped, so only non-empty lines are counted.
        string[] lines = content.Split(['\n', '\r'], StringSplitOptions.RemoveEmptyEntries);

        FileStats stats = new()
        {
            ParagraphCount = lines.Length,
        };
        return stats;
    }
}
|
|
|
|
/// <summary>
/// The aggregation executor collects the partial <see cref="FileStats"/> results
/// and yields the final output once all of them have arrived.
/// </summary>
[YieldsOutput(typeof(string))]
internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
    // Number of partial results (word count + paragraph count) to wait for
    // before the summary is produced.
    private const int ExpectedResultCount = 2;

    // Partial results received so far for the aggregation in progress.
    private readonly List<FileStats> _messages = [];

    /// <summary>
    /// Buffers <paramref name="message"/>; when the expected number of partial results
    /// has been received, sums them and yields a one-line summary as workflow output.
    /// </summary>
    /// <param name="message">A partial result from one of the counting executors.</param>
    /// <param name="context">Workflow context used to yield the output.</param>
    /// <param name="cancellationToken">Token forwarded to the yield call.</param>
    public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.Add(message);

        if (this._messages.Count < ExpectedResultCount)
        {
            // Still waiting for the remaining partial result(s).
            return;
        }

        // Aggregate the results from both executors. Each executor fills in
        // only its own property, so summing merges them.
        var totalParagraphCount = this._messages.Sum(m => m.ParagraphCount);
        var totalWordCount = this._messages.Sum(m => m.WordCount);

        // Reset the buffer before yielding so this instance can aggregate again.
        // Otherwise the count would move past the expected value and no further
        // output would ever be produced.
        this._messages.Clear();

        await context.YieldOutputAsync($"Total Paragraphs: {totalParagraphCount}, Total Words: {totalWordCount}", cancellationToken);
    }
}
|