mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
ca580a8316
* Initial plan * Add WorkflowErrorEvent and ExecutorFailedEvent error checking to all workflow samples Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/c5d77400-d7ed-4fbe-9103-f5d74aabcf2b Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Fix if/else if consistency for error event handlers per code review feedback Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/c5d77400-d7ed-4fbe-9103-f5d74aabcf2b Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Address PR comments * fixup: PR comments --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> Co-authored-by: Jacob Alber <jaalber@microsoft.com>
443 lines
17 KiB
C#
443 lines
17 KiB
C#
// Copyright (c) Microsoft. All rights reserved.
|
|
|
|
using System.Text.Json;
|
|
using System.Text.Json.Serialization;
|
|
using Azure.AI.OpenAI;
|
|
using Azure.Identity;
|
|
using Microsoft.Agents.AI;
|
|
using Microsoft.Agents.AI.Workflows;
|
|
using Microsoft.Extensions.AI;
|
|
|
|
namespace WorkflowMultiSelectionSample;
|
|
|
|
/// <summary>
|
|
/// This sample introduces multi-selection routing where one executor can trigger multiple downstream executors.
|
|
///
|
|
/// Extending the switch-case pattern from the previous sample, the workflow can now
|
|
/// trigger multiple executors simultaneously when certain conditions are met.
|
|
///
|
|
/// Key features:
|
|
/// - For legitimate emails: triggers Email Assistant (always) + Email Summary (if email is long)
|
|
/// - For spam emails: triggers Handle Spam executor only
|
|
/// - For uncertain emails: triggers Handle Uncertain executor only
|
|
/// - Database logging happens for both short emails and summarized long emails
|
|
///
|
|
/// This pattern is powerful for workflows that need parallel processing based on data characteristics,
|
|
/// such as triggering different analytics pipelines or multiple notification systems.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Pre-requisites:
|
|
/// - Foundational samples should be completed first.
|
|
/// - Shared state is used in this sample to persist email data between executors.
|
|
/// - An Azure OpenAI chat completion deployment that supports structured outputs must be configured.
|
|
/// </remarks>
|
|
public static class Program
|
|
{
|
|
private const int LongEmailThreshold = 100;
|
|
|
|
private static async Task Main()
|
|
{
|
|
// Set up the Azure OpenAI client
|
|
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
|
|
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-5.4-mini";
|
|
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
|
|
|
|
// Create agents
|
|
AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
|
|
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
|
|
AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);
|
|
|
|
// Create executors
|
|
var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
|
|
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
|
|
var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
|
|
var sendEmailExecutor = new SendEmailExecutor();
|
|
var handleSpamExecutor = new HandleSpamExecutor();
|
|
var handleUncertainExecutor = new HandleUncertainExecutor();
|
|
var databaseAccessExecutor = new DatabaseAccessExecutor();
|
|
|
|
// Build the workflow by adding executors and connecting them
|
|
WorkflowBuilder builder = new(emailAnalysisExecutor);
|
|
builder.AddFanOutEdge(
|
|
emailAnalysisExecutor,
|
|
[
|
|
handleSpamExecutor,
|
|
emailAssistantExecutor,
|
|
emailSummaryExecutor,
|
|
handleUncertainExecutor,
|
|
],
|
|
GetTargetAssigner()
|
|
)
|
|
// After the email assistant writes a response, it will be sent to the send email executor
|
|
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
|
|
// Save the analysis result to the database if summary is not needed
|
|
.AddEdge<AnalysisResult>(
|
|
emailAnalysisExecutor,
|
|
databaseAccessExecutor,
|
|
condition: analysisResult => analysisResult?.EmailLength <= LongEmailThreshold)
|
|
// Save the analysis result to the database with summary
|
|
.AddEdge(emailSummaryExecutor, databaseAccessExecutor)
|
|
.WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);
|
|
|
|
var workflow = builder.Build();
|
|
|
|
// Read a email from a text file
|
|
string email = Resources.Read("email.txt");
|
|
|
|
// Execute the workflow
|
|
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new ChatMessage(ChatRole.User, email));
|
|
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
|
|
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
|
|
{
|
|
if (evt is WorkflowOutputEvent outputEvent)
|
|
{
|
|
Console.WriteLine($"{outputEvent}");
|
|
}
|
|
else if (evt is DatabaseEvent databaseEvent)
|
|
{
|
|
Console.WriteLine($"{databaseEvent}");
|
|
}
|
|
else if (evt is WorkflowErrorEvent workflowError)
|
|
{
|
|
Console.ForegroundColor = ConsoleColor.Red;
|
|
Console.Error.WriteLine(workflowError.Exception?.ToString() ?? "Unknown workflow error occurred.");
|
|
Console.ResetColor();
|
|
}
|
|
else if (evt is ExecutorFailedEvent executorFailed)
|
|
{
|
|
Console.ForegroundColor = ConsoleColor.Red;
|
|
Console.Error.WriteLine($"Executor '{executorFailed.ExecutorId}' failed with {(executorFailed.Data == null ? "unknown error" : $"exception {executorFailed.Data}")}.");
|
|
Console.ResetColor();
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a partitioner for routing messages based on the analysis result.
|
|
/// </summary>
|
|
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
|
|
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
|
|
{
|
|
return (analysisResult, targetCount) =>
|
|
{
|
|
if (analysisResult is not null)
|
|
{
|
|
if (analysisResult.spamDecision == SpamDecision.Spam)
|
|
{
|
|
return [0]; // Route to spam handler
|
|
}
|
|
else if (analysisResult.spamDecision == SpamDecision.NotSpam)
|
|
{
|
|
List<int> targets = [1]; // Route to the email assistant
|
|
|
|
if (analysisResult.EmailLength > LongEmailThreshold)
|
|
{
|
|
targets.Add(2); // Route to the email summarizer too
|
|
}
|
|
|
|
return targets;
|
|
}
|
|
else
|
|
{
|
|
return [3];
|
|
}
|
|
}
|
|
throw new InvalidOperationException("Invalid analysis result.");
|
|
};
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create an email analysis agent.
|
|
/// </summary>
|
|
/// <returns>A ChatClientAgent configured for email analysis</returns>
|
|
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
|
|
new(chatClient, new ChatClientAgentOptions()
|
|
{
|
|
ChatOptions = new()
|
|
{
|
|
Instructions = "You are a spam detection assistant that identifies spam emails.",
|
|
ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
|
|
}
|
|
});
|
|
|
|
/// <summary>
|
|
/// Creates an email assistant agent.
|
|
/// </summary>
|
|
/// <returns>A ChatClientAgent configured for email assistance</returns>
|
|
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
|
|
new(chatClient, new ChatClientAgentOptions()
|
|
{
|
|
ChatOptions = new()
|
|
{
|
|
Instructions = "You are an email assistant that helps users draft responses to emails with professionalism.",
|
|
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
|
|
}
|
|
});
|
|
|
|
/// <summary>
|
|
/// Creates an agent that summarizes emails.
|
|
/// </summary>
|
|
/// <returns>A ChatClientAgent configured for email summarization</returns>
|
|
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
|
|
new(chatClient, new ChatClientAgentOptions()
|
|
{
|
|
ChatOptions = new()
|
|
{
|
|
Instructions = "You are an assistant that helps users summarize emails.",
|
|
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
|
|
}
|
|
});
|
|
}
|
|
|
|
internal static class EmailStateConstants
|
|
{
|
|
public const string EmailStateScope = "EmailState";
|
|
}
|
|
|
|
/// <summary>
|
|
/// Represents the possible decisions for spam detection.
|
|
/// </summary>
|
|
public enum SpamDecision
|
|
{
|
|
NotSpam,
|
|
Spam,
|
|
Uncertain
|
|
}
|
|
|
|
/// <summary>
|
|
/// Represents the result of email analysis.
|
|
/// </summary>
|
|
public sealed class AnalysisResult
|
|
{
|
|
[JsonPropertyName("spam_decision")]
|
|
[JsonConverter(typeof(JsonStringEnumConverter))]
|
|
public SpamDecision spamDecision { get; set; }
|
|
|
|
[JsonPropertyName("reason")]
|
|
public string Reason { get; set; } = string.Empty;
|
|
|
|
[JsonIgnore]
|
|
public int EmailLength { get; set; }
|
|
|
|
[JsonIgnore]
|
|
public string EmailSummary { get; set; } = string.Empty;
|
|
|
|
[JsonIgnore]
|
|
public string EmailId { get; set; } = string.Empty;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Represents an email.
|
|
/// </summary>
|
|
internal sealed class Email
|
|
{
|
|
[JsonPropertyName("email_id")]
|
|
public string EmailId { get; set; } = string.Empty;
|
|
|
|
[JsonPropertyName("email_content")]
|
|
public string EmailContent { get; set; } = string.Empty;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Executor that analyzes emails using an AI agent.
|
|
/// </summary>
|
|
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
|
|
{
|
|
private readonly AIAgent _emailAnalysisAgent;
|
|
|
|
/// <summary>
|
|
/// Creates a new instance of the <see cref="EmailAnalysisExecutor"/> class.
|
|
/// </summary>
|
|
/// <param name="emailAnalysisAgent">The AI agent used for email analysis</param>
|
|
public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
|
|
{
|
|
this._emailAnalysisAgent = emailAnalysisAgent;
|
|
}
|
|
|
|
public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
|
|
{
|
|
// Generate a random email ID and store the email content
|
|
var newEmail = new Email
|
|
{
|
|
EmailId = Guid.NewGuid().ToString("N"),
|
|
EmailContent = message.Text
|
|
};
|
|
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
|
|
|
|
// Invoke the agent
|
|
var response = await this._emailAnalysisAgent.RunAsync(message, cancellationToken: cancellationToken);
|
|
var AnalysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);
|
|
|
|
AnalysisResult!.EmailId = newEmail.EmailId;
|
|
AnalysisResult!.EmailLength = newEmail.EmailContent.Length;
|
|
|
|
return AnalysisResult;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Represents the response from the email assistant.
|
|
/// </summary>
|
|
public sealed class EmailResponse
|
|
{
|
|
[JsonPropertyName("response")]
|
|
public string Response { get; set; } = string.Empty;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Executor that assists with email responses using an AI agent.
|
|
/// </summary>
|
|
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
|
|
{
|
|
private readonly AIAgent _emailAssistantAgent;
|
|
|
|
/// <summary>
|
|
/// Creates a new instance of the <see cref="EmailAssistantExecutor"/> class.
|
|
/// </summary>
|
|
/// <param name="emailAssistantAgent">The AI agent used for email assistance</param>
|
|
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
|
|
{
|
|
this._emailAssistantAgent = emailAssistantAgent;
|
|
}
|
|
|
|
public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
|
|
{
|
|
if (message.spamDecision == SpamDecision.Spam)
|
|
{
|
|
throw new InvalidOperationException("This executor should only handle non-spam messages.");
|
|
}
|
|
|
|
// Retrieve the email content from the context
|
|
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
|
|
|
|
// Invoke the agent
|
|
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent, cancellationToken: cancellationToken);
|
|
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
|
|
|
|
return emailResponse!;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Executor that sends emails.
|
|
/// </summary>
|
|
[YieldsOutput(typeof(string))]
|
|
internal sealed class SendEmailExecutor() : Executor<EmailResponse>("SendEmailExecutor")
|
|
{
|
|
/// <summary>
|
|
/// Simulate the sending of an email.
|
|
/// </summary>
|
|
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
|
|
await context.YieldOutputAsync($"Email sent: {message.Response}", cancellationToken);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Executor that handles spam messages.
|
|
/// </summary>
|
|
[YieldsOutput(typeof(string))]
|
|
internal sealed class HandleSpamExecutor() : Executor<AnalysisResult>("HandleSpamExecutor")
|
|
{
|
|
/// <summary>
|
|
/// Simulate the handling of a spam message.
|
|
/// </summary>
|
|
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
|
|
{
|
|
if (message.spamDecision == SpamDecision.Spam)
|
|
{
|
|
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}", cancellationToken);
|
|
}
|
|
else
|
|
{
|
|
throw new InvalidOperationException("This executor should only handle spam messages.");
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Executor that handles uncertain messages.
|
|
/// </summary>
|
|
[YieldsOutput(typeof(string))]
|
|
internal sealed class HandleUncertainExecutor() : Executor<AnalysisResult>("HandleUncertainExecutor")
|
|
{
|
|
/// <summary>
|
|
/// Simulate the handling of an uncertain spam decision.
|
|
/// </summary>
|
|
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
|
|
{
|
|
if (message.spamDecision == SpamDecision.Uncertain)
|
|
{
|
|
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
|
|
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}", cancellationToken);
|
|
}
|
|
else
|
|
{
|
|
throw new InvalidOperationException("This executor should only handle uncertain spam decisions.");
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Represents the response from the email summary agent.
|
|
/// </summary>
|
|
public sealed class EmailSummary
|
|
{
|
|
[JsonPropertyName("summary")]
|
|
public string Summary { get; set; } = string.Empty;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Executor that summarizes emails using an AI agent.
|
|
/// </summary>
|
|
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
|
|
{
|
|
private readonly AIAgent _emailSummaryAgent;
|
|
|
|
/// <summary>
|
|
/// Creates a new instance of the <see cref="EmailSummaryExecutor"/> class.
|
|
/// </summary>
|
|
/// <param name="emailSummaryAgent">The AI agent used for email summarization</param>
|
|
public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
|
|
{
|
|
this._emailSummaryAgent = emailSummaryAgent;
|
|
}
|
|
|
|
public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
|
|
{
|
|
// Read the email content from the shared states
|
|
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
|
|
|
|
// Invoke the agent
|
|
var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent, cancellationToken: cancellationToken);
|
|
var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);
|
|
message.EmailSummary = emailSummary!.Summary;
|
|
|
|
return message;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// A custom workflow event for database operations.
|
|
/// </summary>
|
|
/// <param name="message">The message associated with the event</param>
|
|
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }
|
|
|
|
/// <summary>
|
|
/// Executor that handles database access.
|
|
/// </summary>
|
|
internal sealed class DatabaseAccessExecutor() : Executor<AnalysisResult>("DatabaseAccessExecutor")
|
|
{
|
|
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
|
|
{
|
|
// 1. Save the email content
|
|
await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
|
|
await Task.Delay(100, cancellationToken); // Simulate database access delay
|
|
|
|
// 2. Save the analysis result
|
|
await Task.Delay(100, cancellationToken); // Simulate database access delay
|
|
|
|
// Not using the `WorkflowCompletedEvent` because this is not the end of the workflow.
|
|
// The end of the workflow is signaled by the `SendEmailExecutor` or the `HandleUnknownExecutor`.
|
|
await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."), cancellationToken);
|
|
}
|
|
}
|