// Copyright (c) Microsoft. All rights reserved.

using System.Text.Json;
using System.Text.Json.Serialization;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

namespace WorkflowMultiSelectionSample;

/// <summary>
/// This sample introduces multi-selection routing where one executor can trigger multiple downstream executors.
///
/// Extending the switch-case pattern from the previous sample, the workflow can now
/// trigger multiple executors simultaneously when certain conditions are met.
///
/// Key features:
/// - For legitimate emails: triggers Email Assistant (always) + Email Summary (if email is long)
/// - For spam emails: triggers Handle Spam executor only
/// - For uncertain emails: triggers Handle Uncertain executor only
/// - Database logging happens for both short emails and summarized long emails
///
/// This pattern is powerful for workflows that need parallel processing based on data characteristics,
/// such as triggering different analytics pipelines or multiple notification systems.
/// </summary>
/// <remarks>
/// Pre-requisites:
/// - Foundational samples should be completed first.
/// - Shared state is used in this sample to persist email data between executors.
/// - An Azure OpenAI chat completion deployment that supports structured outputs must be configured.
/// </remarks>
public static class Program
{
    /// <summary>
    /// Emails longer than this many characters are additionally routed to the summarizer;
    /// emails at or below it are saved to the database directly from the analysis step.
    /// </summary>
    private const int LongEmailThreshold = 100;

    /// <summary>
    /// Builds the multi-selection workflow, runs it against the bundled sample email,
    /// and prints output, database, and failure events to the console.
    /// </summary>
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-5.4-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();

        // Create agents
        AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
        AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
        AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);

        // Create executors
        var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
        var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
        var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
        var sendEmailExecutor = new SendEmailExecutor();
        var handleSpamExecutor = new HandleSpamExecutor();
        var handleUncertainExecutor = new HandleUncertainExecutor();
        var databaseAccessExecutor = new DatabaseAccessExecutor();

        // Build the workflow by adding executors and connecting them.
        // The order of the fan-out targets matters: GetTargetAssigner() returns indices into this list
        // (0 = spam handler, 1 = email assistant, 2 = email summarizer, 3 = uncertain handler).
        WorkflowBuilder builder = new(emailAnalysisExecutor);
        builder.AddFanOutEdge(
            emailAnalysisExecutor,
            [
                handleSpamExecutor,
                emailAssistantExecutor,
                emailSummaryExecutor,
                handleUncertainExecutor,
            ],
            GetTargetAssigner()
        )
        // After the email assistant writes a response, it will be sent to the send email executor
        .AddEdge(emailAssistantExecutor, sendEmailExecutor)
        // Save the analysis result to the database if summary is not needed
        .AddEdge<AnalysisResult>(
            emailAnalysisExecutor,
            databaseAccessExecutor,
            condition: analysisResult => analysisResult?.EmailLength <= LongEmailThreshold)
        // Save the analysis result to the database with summary
        .AddEdge(emailSummaryExecutor, databaseAccessExecutor)
        .WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);
        var workflow = builder.Build();

        // Read an email from a text file
        string email = Resources.Read("email.txt");

        // Execute the workflow
        await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new ChatMessage(ChatRole.User, email));
        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine($"{outputEvent}");
            }
            else if (evt is DatabaseEvent databaseEvent)
            {
                Console.WriteLine($"{databaseEvent}");
            }
            else if (evt is WorkflowErrorEvent workflowError)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.Error.WriteLine(workflowError.Exception?.ToString() ?? "Unknown workflow error occurred.");
                Console.ResetColor();
            }
            else if (evt is ExecutorFailedEvent executorFailed)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.Error.WriteLine($"Executor '{executorFailed.ExecutorId}' failed with {(executorFailed.Data == null ? "unknown error" : $"exception {executorFailed.Data}")}.");
                Console.ResetColor();
            }
        }
    }

    /// <summary>
    /// Creates a partitioner for routing messages based on the analysis result.
    /// </summary>
    /// <returns>
    /// A function that takes an analysis result (and the number of fan-out targets) and returns
    /// the indices of the targets to trigger. The indices refer to the target list passed to
    /// <c>AddFanOutEdge</c> in <see cref="Main"/>.
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown by the returned function when the analysis result is null.</exception>
    private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
    {
        return (analysisResult, targetCount) =>
        {
            if (analysisResult is null)
            {
                throw new InvalidOperationException("Invalid analysis result.");
            }

            switch (analysisResult.spamDecision)
            {
                case SpamDecision.Spam:
                    return [0]; // Route to spam handler

                case SpamDecision.NotSpam:
                    List<int> targets = [1]; // Route to the email assistant
                    if (analysisResult.EmailLength > LongEmailThreshold)
                    {
                        targets.Add(2); // Route to the email summarizer too
                    }

                    return targets;

                default:
                    return [3]; // Route to the uncertain handler
            }
        };
    }

    /// <summary>
    /// Creates an email analysis agent.
    /// </summary>
    /// <param name="chatClient">The chat client the agent uses to talk to the model.</param>
    /// <returns>A ChatClientAgent configured for email analysis</returns>
    private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
        new(chatClient, new ChatClientAgentOptions()
        {
            ChatOptions = new()
            {
                Instructions = "You are a spam detection assistant that identifies spam emails.",
                ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
            }
        });

    /// <summary>
    /// Creates an email assistant agent.
    /// </summary>
    /// <param name="chatClient">The chat client the agent uses to talk to the model.</param>
    /// <returns>A ChatClientAgent configured for email assistance</returns>
    private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
        new(chatClient, new ChatClientAgentOptions()
        {
            ChatOptions = new()
            {
                Instructions = "You are an email assistant that helps users draft responses to emails with professionalism.",
                ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
            }
        });

    /// <summary>
    /// Creates an agent that summarizes emails.
    /// </summary>
    /// <param name="chatClient">The chat client the agent uses to talk to the model.</param>
    /// <returns>A ChatClientAgent configured for email summarization</returns>
    private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
        new(chatClient, new ChatClientAgentOptions()
        {
            ChatOptions = new()
            {
                Instructions = "You are an assistant that helps users summarize emails.",
                ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
            }
        });
}

/// <summary>
/// Constants for the shared-state scope used to exchange email data between executors.
/// </summary>
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
    NotSpam,
    Spam,
    Uncertain
}

/// <summary>
/// Represents the result of email analysis.
/// </summary>
public sealed class AnalysisResult
{
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public SpamDecision spamDecision { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    // The members below are excluded from JSON; they are filled in by the executors, not by the model.
    [JsonIgnore]
    public int EmailLength { get; set; }

    [JsonIgnore]
    public string EmailSummary { get; set; } = string.Empty;

    [JsonIgnore]
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// Executor that analyzes emails using an AI agent.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
    private readonly AIAgent _emailAnalysisAgent;

    /// <summary>
    /// Creates a new instance of the <see cref="EmailAnalysisExecutor"/> class.
    /// </summary>
    /// <param name="emailAnalysisAgent">The AI agent used for email analysis</param>
    public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
    {
        this._emailAnalysisAgent = emailAnalysisAgent;
    }

    /// <summary>
    /// Stores the incoming email in shared state under a new ID, asks the agent to classify it,
    /// and returns the classification annotated with the email ID and content length.
    /// </summary>
    /// <param name="message">The chat message whose text is the email content.</param>
    /// <param name="context">The workflow context used to queue the shared-state update.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The analysis result produced by the agent.</returns>
    /// <exception cref="InvalidOperationException">Thrown when the agent response deserializes to null.</exception>
    public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Generate a random email ID and store the email content
        var newEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
        await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);

        // Invoke the agent
        var response = await this._emailAnalysisAgent.RunAsync(message, cancellationToken: cancellationToken);

        // Deserialize returns null for a JSON "null" payload; fail with a clear message instead of a NullReferenceException.
        var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text)
            ?? throw new InvalidOperationException("The email analysis agent returned an empty result.");

        analysisResult.EmailId = newEmail.EmailId;
        analysisResult.EmailLength = newEmail.EmailContent.Length;

        return analysisResult;
    }
}

/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}

/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    /// <summary>
    /// Creates a new instance of the <see cref="EmailAssistantExecutor"/> class.
    /// </summary>
    /// <param name="emailAssistantAgent">The AI agent used for email assistance</param>
    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        this._emailAssistantAgent = emailAssistantAgent;
    }

    /// <summary>
    /// Reads the email referenced by the analysis result from shared state and asks the agent to draft a reply.
    /// </summary>
    /// <param name="message">The analysis result identifying the email to respond to.</param>
    /// <param name="context">The workflow context used to read shared state.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The drafted email response.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the message is classified as spam, when the email is missing from shared state,
    /// or when the agent response deserializes to null.
    /// </exception>
    public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new InvalidOperationException("This executor should only handle non-spam messages.");
        }

        // Retrieve the email content from the context
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken)
            ?? throw new InvalidOperationException($"Email '{message.EmailId}' was not found in shared state.");

        // Invoke the agent
        var response = await this._emailAssistantAgent.RunAsync(email.EmailContent, cancellationToken: cancellationToken);

        return JsonSerializer.Deserialize<EmailResponse>(response.Text)
            ?? throw new InvalidOperationException("The email assistant agent returned an empty result.");
    }
}

/// <summary>
/// Executor that sends emails.
/// </summary>
[YieldsOutput(typeof(string))]
internal sealed class SendEmailExecutor() : Executor<EmailResponse>("SendEmailExecutor")
{
    /// <summary>
    /// Simulate the sending of an email.
    /// </summary>
    /// <param name="message">The drafted response to "send".</param>
    /// <param name="context">The workflow context used to yield the output.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        await context.YieldOutputAsync($"Email sent: {message.Response}", cancellationToken);
}

/// <summary>
/// Executor that handles spam messages.
/// </summary>
[YieldsOutput(typeof(string))]
internal sealed class HandleSpamExecutor() : Executor<AnalysisResult>("HandleSpamExecutor")
{
    /// <summary>
    /// Simulate the handling of a spam message.
    /// </summary>
    /// <param name="message">The analysis result; must be classified as spam.</param>
    /// <param name="context">The workflow context used to yield the output.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <exception cref="InvalidOperationException">Thrown when the message is not classified as spam.</exception>
    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision != SpamDecision.Spam)
        {
            throw new InvalidOperationException("This executor should only handle spam messages.");
        }

        await context.YieldOutputAsync($"Email marked as spam: {message.Reason}", cancellationToken);
    }
}

/// <summary>
/// Executor that handles uncertain messages.
/// </summary>
[YieldsOutput(typeof(string))]
internal sealed class HandleUncertainExecutor() : Executor<AnalysisResult>("HandleUncertainExecutor")
{
    /// <summary>
    /// Simulate the handling of an uncertain spam decision.
    /// </summary>
    /// <param name="message">The analysis result; must be classified as uncertain.</param>
    /// <param name="context">The workflow context used to read shared state and yield the output.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <exception cref="InvalidOperationException">Thrown when the message is not classified as uncertain.</exception>
    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.spamDecision != SpamDecision.Uncertain)
        {
            throw new InvalidOperationException("This executor should only handle uncertain spam decisions.");
        }

        // A missing email is tolerated here: the content portion of the output is simply left empty.
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
        await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}", cancellationToken);
    }
}

/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
    [JsonPropertyName("summary")]
    public string Summary { get; set; } = string.Empty;
}

/// <summary>
/// Executor that summarizes emails using an AI agent.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
    private readonly AIAgent _emailSummaryAgent;

    /// <summary>
    /// Creates a new instance of the <see cref="EmailSummaryExecutor"/> class.
    /// </summary>
    /// <param name="emailSummaryAgent">The AI agent used for email summarization</param>
    public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
    {
        this._emailSummaryAgent = emailSummaryAgent;
    }

    /// <summary>
    /// Reads the email referenced by the analysis result from shared state, asks the agent to summarize it,
    /// and returns the same analysis result with its summary filled in.
    /// </summary>
    /// <param name="message">The analysis result identifying the email to summarize.</param>
    /// <param name="context">The workflow context used to read shared state.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The incoming <paramref name="message"/> instance, with <c>EmailSummary</c> set.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the email is missing from shared state or the agent response deserializes to null.
    /// </exception>
    public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Read the email content from the shared states
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken)
            ?? throw new InvalidOperationException($"Email '{message.EmailId}' was not found in shared state.");

        // Invoke the agent
        var response = await this._emailSummaryAgent.RunAsync(email.EmailContent, cancellationToken: cancellationToken);

        // Deserialize returns null for a JSON "null" payload; fail with a clear message instead of a NullReferenceException.
        var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text)
            ?? throw new InvalidOperationException("The email summary agent returned an empty result.");

        message.EmailSummary = emailSummary.Summary;
        return message;
    }
}

/// <summary>
/// A custom workflow event for database operations.
/// </summary>
/// <param name="message">The message associated with the event</param>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }

/// <summary>
/// Executor that handles database access.
/// </summary>
internal sealed class DatabaseAccessExecutor() : Executor<AnalysisResult>("DatabaseAccessExecutor")
{
    /// <summary>
    /// Simulates persisting the email and its analysis result, then raises a <see cref="DatabaseEvent"/>.
    /// </summary>
    /// <param name="message">The analysis result identifying the email to save.</param>
    /// <param name="context">The workflow context used to read shared state and add the event.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // 1. Save the email content (the value read from shared state is not used further in this simulation)
        await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
        await Task.Delay(100, cancellationToken); // Simulate database access delay

        // 2. Save the analysis result
        await Task.Delay(100, cancellationToken); // Simulate database access delay

        // A custom event is raised instead of yielding a workflow output because this is not the end of the workflow.
        // The workflow's outputs are yielded by the `SendEmailExecutor`, the `HandleSpamExecutor`,
        // or the `HandleUncertainExecutor`.
        await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."), cancellationToken);
    }
}