diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index bbd71d6fba..ddcd9f4080 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -54,6 +54,7 @@ + diff --git a/dotnet/samples/DurableAgents/ConsoleApps/11_WorkflowEvents/Program.cs b/dotnet/samples/DurableAgents/ConsoleApps/11_WorkflowEvents/Program.cs index b96c77f053..22487b4e14 100644 --- a/dotnet/samples/DurableAgents/ConsoleApps/11_WorkflowEvents/Program.cs +++ b/dotnet/samples/DurableAgents/ConsoleApps/11_WorkflowEvents/Program.cs @@ -92,36 +92,39 @@ async Task RunWorkflowWithStreamingAsync(string orderId, Workflow workflow, Dura // WatchStreamAsync yields events as they're emitted by executors await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { + // Always print the event type name + Console.WriteLine($" Event: {evt.GetType().Name}"); + switch (evt) { // Custom domain events (emitted via AddEventAsync) case OrderLookupStartedEvent e: - WriteColored($" [Lookup] Looking up order {e.OrderId}", ConsoleColor.Cyan); + WriteColored($" [Lookup] Looking up order {e.OrderId}", ConsoleColor.Cyan); break; case OrderFoundEvent e: - WriteColored($" [Lookup] Found: {e.Order.Customer.Name}", ConsoleColor.Cyan); + WriteColored($" [Lookup] Found: {e.Order.Customer.Name}", ConsoleColor.Cyan); break; case CancellationProgressEvent e: - WriteColored($" [Cancel] {e.PercentComplete}% - {e.Status}", ConsoleColor.Yellow); + WriteColored($" [Cancel] {e.PercentComplete}% - {e.Status}", ConsoleColor.Yellow); break; case OrderCancelledEvent e: - WriteColored(" [Cancel] Done", ConsoleColor.Yellow); + WriteColored(" [Cancel] Done", ConsoleColor.Yellow); break; case EmailSentEvent e: - WriteColored($" [Email] Sent to {e.Email}", ConsoleColor.Magenta); + WriteColored($" [Email] Sent to {e.Email}", ConsoleColor.Magenta); break; // Yielded outputs (emitted via YieldOutputAsync) case DurableYieldedOutputEvent e: - WriteColored($" [Output] {e.ExecutorId}", ConsoleColor.DarkGray); + WriteColored($" [Output] {e.ExecutorId}", ConsoleColor.DarkGray); break; // Workflow completion case DurableWorkflowCompletedEvent e: - WriteColored($"Completed: {e.Result}", ConsoleColor.Green); + WriteColored($" Completed: {e.Result}", ConsoleColor.Green); break; case DurableWorkflowFailedEvent e: - WriteColored($"Failed: {e.ErrorMessage}", ConsoleColor.Red); + WriteColored($" Failed: {e.ErrorMessage}", ConsoleColor.Red); break; } } diff --git a/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/12_WorkflowLoop.csproj b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/12_WorkflowLoop.csproj new file mode 100644 index 0000000000..523c6b3612 --- /dev/null +++ b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/12_WorkflowLoop.csproj @@ -0,0 +1,31 @@ + + + net10.0 + Exe + enable + enable + SingleWorkflow + SingleAgent + + + + + + + + + + + + + + + + + + diff --git a/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/FeedbackExecutor.cs b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/FeedbackExecutor.cs new file mode 100644 index 0000000000..84f59f4ac6 --- /dev/null +++ b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/FeedbackExecutor.cs @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; +using Microsoft.Agents.AI; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Extensions.AI; + +namespace SingleAgent; + +internal sealed class FeedbackExecutor : Executor +{ + private readonly AIAgent _agent; + private AgentThread? _thread; + + public int MinimumRating { get; init; } = 9; + + public int MaxAttempts { get; init; } = 3; + + private int _attempts; + + /// + /// Initializes a new instance of the class. + /// + /// A unique identifier for the executor. + /// The chat client to use for the AI agent. + public FeedbackExecutor(string id, IChatClient chatClient) : base(id) + { + ChatClientAgentOptions agentOptions = new() + { + ChatOptions = new() + { + Instructions = "You are a professional editor. You will be given a slogan and the task it is meant to accomplish.", + ResponseFormat = ChatResponseFormat.ForJsonSchema() + } + }; + + this._agent = new ChatClientAgent(chatClient, agentOptions); + } + + public override async ValueTask HandleAsync(SloganResult message, IWorkflowContext context, CancellationToken cancellationToken = default) + { + this._thread ??= await this._agent.GetNewThreadAsync(cancellationToken); + + var sloganMessage = $""" + Here is a slogan for the task '{message.Task}': + Slogan: {message.Slogan} + Please provide feedback on this slogan, including comments, a rating from 1 to 10, and suggested actions for improvement. + """; + + var response = await this._agent.RunAsync(sloganMessage, this._thread, cancellationToken: cancellationToken); + var feedback = JsonSerializer.Deserialize(response.Text) ?? throw new InvalidOperationException("Failed to deserialize feedback."); + + if (feedback.Rating >= this.MinimumRating) + { + await context.YieldOutputAsync($"The following slogan was accepted:\n\n{message.Slogan}", cancellationToken); + return; + } + + if (this._attempts >= this.MaxAttempts) + { + await context.YieldOutputAsync($"The slogan was rejected after {this.MaxAttempts} attempts. Final slogan:\n\n{message.Slogan}", cancellationToken); + return; + } + + await context.SendMessageAsync(feedback, cancellationToken: cancellationToken); + this._attempts++; + } +} diff --git a/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/Program.cs b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/Program.cs new file mode 100644 index 0000000000..a5723c59d1 --- /dev/null +++ b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/Program.cs @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates how to run a CYCLIC WORKFLOW as a durable orchestration. +// The workflow contains a loop: SloganWriter ⟷ FeedbackProvider +// +// WORKFLOW LOOP PATTERN: +// 1. SloganWriter generates a slogan based on user input +// 2. FeedbackProvider evaluates the slogan and provides feedback +// 3. If the rating is below threshold, FeedbackProvider sends feedback back to SloganWriter +// 4. SloganWriter improves the slogan based on feedback +// 5. Loop continues until FeedbackProvider accepts the slogan (rating >= threshold) +// +// This demonstrates: +// - Cyclic workflow support (back-edges in the graph) +// - Multi-type executor handlers (SloganWriter handles both string and FeedbackResult) +// - Message routing via SendMessageAsync for void-returning executors +// - YieldOutputAsync for final output when the loop completes + +using Azure.Identity; +using Microsoft.Agents.AI.DurableTask; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using SingleAgent; +using Azure.AI.OpenAI; + +// Get DTS connection string from environment variable +string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"; + +var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set."); +var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini"; +var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient(); + +// Define executors for the workflow +var sloganWriter = new SloganWriterExecutor("SloganWriter", chatClient); +var feedbackProvider = new FeedbackExecutor("FeedbackProvider", chatClient); + +// Build the workflow by adding executors and connecting them +var workflow = new WorkflowBuilder(sloganWriter) + .WithName("SloganCreationWorkflow") + .AddEdge(sloganWriter, feedbackProvider) + .AddEdge(feedbackProvider, sloganWriter) + .WithOutputFrom(feedbackProvider) + .Build(); + +IHost host = Host.CreateDefaultBuilder(args) + .ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Warning)) + .ConfigureServices(services => + { + services.ConfigureDurableWorkflows( + options => options.Workflows.AddWorkflow(workflow), + workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString), + clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString)); + }) + .Build(); + +await host.StartAsync(); + +DurableTaskClient durableClient = host.Services.GetRequiredService(); + +Console.WriteLine("Workflow Events Demo - Enter input for slogan generation (or 'exit'):"); + +while (true) +{ + Console.Write("> "); + string? input = Console.ReadLine(); + if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase)) + { + break; + } + + try + { + await RunWorkflowWithStreamingAsync(input, workflow, durableClient); + } + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + } + + Console.WriteLine(); +} + +await host.StopAsync(); + +// Runs a workflow and streams events as they occur +async Task RunWorkflowWithStreamingAsync(string orderId, Workflow workflow, DurableTaskClient client) +{ + // StreamAsync starts the workflow and returns a handle for observing events + await using DurableStreamingRun run = await DurableWorkflow.StreamAsync(workflow, orderId, client); + Console.WriteLine($"Started: {run.InstanceId}"); + + // WatchStreamAsync yields events as they're emitted by executors + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + // Always print the event type name + WriteColored($" Event: {evt.GetType().Name}", ConsoleColor.Gray); + } +} + +void WriteColored(string message, ConsoleColor color) +{ + Console.ForegroundColor = color; + Console.WriteLine(message); + Console.ResetColor(); +} diff --git a/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/README.md b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/README.md new file mode 100644 index 0000000000..b03b8cbd73 --- /dev/null +++ b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/README.md @@ -0,0 +1,23 @@ +# Workflow Loop Sample + +This sample demonstrates how to run a cyclic workflow (containing loops) as a durable orchestration. + +## Overview + +The workflow iteratively improves a slogan based on AI feedback until it meets quality criteria. + +### Executors + +- **SloganWriter** - Generates slogans using AI (handles string and FeedbackResult) +- **FeedbackProvider** - Evaluates slogans (calls YieldOutput to accept, SendMessage to loop) + +## Key Concepts + +- Cyclic Workflow Support (back-edges) +- Multi-Type Executor Handlers +- Message Routing via SendMessageAsync +- Workflow Termination via YieldOutputAsync + +## Running + +Set AZURE_OPENAI_ENDPOINT and run: dotnet run diff --git a/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/SloganResult.cs b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/SloganResult.cs new file mode 100644 index 0000000000..1e2f054fe0 --- /dev/null +++ b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/SloganResult.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json.Serialization; + +namespace SingleAgent; + +public sealed class SloganResult +{ + [JsonPropertyName("task")] + public required string Task { get; set; } + + [JsonPropertyName("slogan")] + public required string Slogan { get; set; } +} + +public sealed class FeedbackResult +{ + [JsonPropertyName("comments")] + public string Comments { get; set; } = string.Empty; + + [JsonPropertyName("rating")] + public int Rating { get; set; } + + [JsonPropertyName("actions")] + public string Actions { get; set; } = string.Empty; +} diff --git a/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/SloganWriterExecutor.cs b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/SloganWriterExecutor.cs new file mode 100644 index 0000000000..c33942ed79 --- /dev/null +++ b/dotnet/samples/DurableAgents/ConsoleApps/12_WorkflowLoop/SloganWriterExecutor.cs @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; +using Microsoft.Agents.AI; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Extensions.AI; + +namespace SingleAgent; + +internal sealed class SloganWriterExecutor : Executor +{ + private readonly AIAgent _agent; + private AgentThread? _thread; + + /// + /// Initializes a new instance of the class. + /// + /// A unique identifier for the executor. + /// The chat client to use for the AI agent. + public SloganWriterExecutor(string id, IChatClient chatClient) : base(id) + { + ChatClientAgentOptions agentOptions = new() + { + ChatOptions = new() + { + Instructions = "You are a professional slogan writer. You will be given a task to create a slogan.", + ResponseFormat = ChatResponseFormat.ForJsonSchema() + } + }; + + this._agent = new ChatClientAgent(chatClient, agentOptions); + } + + protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) => + routeBuilder.AddHandler(this.HandleAsync) + .AddHandler(this.HandleAsync); + + public async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) + { + this._thread ??= await this._agent.GetNewThreadAsync(cancellationToken); + + var result = await this._agent.RunAsync(message, this._thread, cancellationToken: cancellationToken); + + return JsonSerializer.Deserialize(result.Text) ?? throw new InvalidOperationException("Failed to deserialize slogan result."); + } + + public async ValueTask HandleAsync(FeedbackResult message, IWorkflowContext context, CancellationToken cancellationToken = default) + { + var feedbackMessage = $""" + Here is the feedback on your previous slogan: + Comments: {message.Comments} + Rating: {message.Rating} + Suggested Actions: {message.Actions} + + Please use this feedback to improve your slogan. + """; + + var result = await this._agent.RunAsync(feedbackMessage, this._thread, cancellationToken: cancellationToken); + return JsonSerializer.Deserialize(result.Text) ?? throw new InvalidOperationException("Failed to deserialize slogan result."); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableWorkflowRunner.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableWorkflowRunner.cs index 4e1cbd559d..7623252a63 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableWorkflowRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableWorkflowRunner.cs @@ -191,69 +191,434 @@ public class DurableWorkflowRunner ILogger logger) { WorkflowExecutionPlan plan = WorkflowHelper.GetExecutionPlan(workflow); - Dictionary results = new(plan.Levels.Sum(l => l.Executors.Count)); + + // Use message-driven execution for all workflows + // This approach naturally handles both DAGs and cyclic workflows + return await this.ExecuteMessageDrivenAsync(context, workflow, plan, initialInput, logger).ConfigureAwait(true); + } + + /// + /// Executes a workflow using message-driven execution. + /// Messages are routed through edges dynamically, naturally supporting both DAGs and cycles. + /// + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")] + private async Task ExecuteMessageDrivenAsync( + TaskOrchestrationContext context, + Workflow workflow, + WorkflowExecutionPlan plan, + string initialInput, + ILogger logger) + { + const int MaxSupersteps = 100; + + // Message queues per executor - stores (message, inputTypeName) tuples + Dictionary> messageQueues = []; + + // Last result from each executor (for edge condition evaluation and final result) + Dictionary lastResults = []; // Track accumulated events and shared state DurableWorkflowCustomStatus customStatus = new(); Dictionary sharedState = []; - foreach (WorkflowExecutionLevel level in plan.Levels) + // Get executor bindings for creating WorkflowExecutorInfo + Dictionary executorBindings = workflow.ReflectExecutors(); + + // Initialize: queue input to start executor (initial input is a string) + EnqueueMessage(messageQueues, plan.StartExecutorId, initialInput, typeof(string).FullName); + + int superstep = 0; + bool haltRequested = false; + string? finalOutput = null; + + while (superstep < MaxSupersteps && !haltRequested) { - // Filter executors based on edge conditions from their predecessors - List eligibleExecutors = GetEligibleExecutors(level.Executors, results, plan, logger); + superstep++; - if (eligibleExecutors.Count == 0) + // Collect all executors with pending messages + List activeExecutors = messageQueues + .Where(kv => kv.Value.Count > 0) + .Select(kv => kv.Key) + .ToList(); + + if (activeExecutors.Count == 0) { - // No eligible executors at this level, continue to next level - continue; + break; // No more work } - if (eligibleExecutors.Count == 1) - { - WorkflowExecutorInfo executorInfo = eligibleExecutors[0]; - string input = GetExecutorInput(executorInfo.ExecutorId, initialInput, results, plan); - string rawResult = await this.ExecuteExecutorAsync(context, executorInfo, input, logger, customStatus, sharedState).ConfigureAwait(true); - results[executorInfo.ExecutorId] = UnwrapActivityResult(rawResult, customStatus, sharedState); +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates, expensive evaluation + logger.LogDebug("Superstep {Step}: {Count} active executor(s): {Executors}", + superstep, activeExecutors.Count, string.Join(", ", activeExecutors)); +#pragma warning restore CA1848, CA1873 - // Update custom status with any new events - UpdateCustomStatus(context, customStatus); - } - else + // Process each active executor + foreach (string executorId in activeExecutors) { - // For parallel execution, each activity gets a snapshot of the current state - // State updates are merged after all activities complete - Task<(string Id, string Result)>[] tasks = new Task<(string Id, string Result)>[eligibleExecutors.Count]; - for (int i = 0; i < eligibleExecutors.Count; i++) + Queue<(string Message, string? InputTypeName)> queue = messageQueues[executorId]; + + // Process all messages for this executor in this superstep + while (queue.Count > 0) { - WorkflowExecutorInfo executorInfo = eligibleExecutors[i]; - string input = GetExecutorInput(executorInfo.ExecutorId, initialInput, results, plan); - tasks[i] = this.ExecuteExecutorWithIdAsync(context, executorInfo, input, logger, customStatus, sharedState); + (string input, string? inputTypeName) = queue.Dequeue(); + + // Create executor info + WorkflowExecutorInfo executorInfo = CreateExecutorInfo(executorId, executorBindings); + + // Execute the activity with type information + string rawResult = await this.ExecuteExecutorAsync( + context, executorInfo, input, inputTypeName, logger, customStatus, sharedState).ConfigureAwait(true); + + (string result, List sentMessages) = UnwrapActivityResult(rawResult, customStatus, sharedState); + lastResults[executorId] = result; + + // Check for explicit halt request (via RequestHaltAsync) + if (CheckForHalt(customStatus, executorId)) + { + haltRequested = true; + finalOutput = result; +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates + logger.LogDebug("Halt requested by executor {ExecutorId}", executorId); +#pragma warning restore CA1848, CA1873 + break; + } + + // Route messages sent via SendMessageAsync (takes priority for void-returning executors) + if (sentMessages.Count > 0) + { + foreach (SentMessageInfo sentMessage in sentMessages) + { + if (!string.IsNullOrEmpty(sentMessage.Message)) + { + // Route to successors with the sent message's type + RouteMessageToSuccessors( + executorId, sentMessage.Message, sentMessage.TypeName, plan, messageQueues, logger); + } + } + } + else if (!string.IsNullOrEmpty(result)) + { + // Route executor's return value to successor executors via edges (for non-void executors) + RouteMessageToSuccessors( + executorId, result, plan, messageQueues, logger); + } } - (string Id, string Result)[] completedTasks = await Task.WhenAll(tasks).ConfigureAwait(true); - foreach ((string id, string rawResult) in completedTasks) + if (haltRequested) { - results[id] = UnwrapActivityResult(rawResult, customStatus, sharedState); + break; } - - // Update custom status with any new events - UpdateCustomStatus(context, customStatus); } + + UpdateCustomStatus(context, customStatus); } - return GetFinalResult(plan, results); + if (superstep >= MaxSupersteps) + { +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates + logger.LogWarning("Workflow reached maximum superstep limit ({MaxSteps})", MaxSupersteps); +#pragma warning restore CA1848, CA1873 + } + + // Return final output or last result from output executors + return finalOutput ?? GetMessageDrivenFinalResult(workflow, lastResults, customStatus); } /// - /// Unwraps an activity result, extracting state updates, events, and returning the actual result. + /// Enqueues a message to an executor's message queue with type information. + /// + private static void EnqueueMessage( + Dictionary> queues, + string executorId, + string message, + string? inputTypeName) + { + if (!queues.TryGetValue(executorId, out Queue<(string, string?)>? queue)) + { + queue = new Queue<(string, string?)>(); + queues[executorId] = queue; + } + + queue.Enqueue((message, inputTypeName)); + } + + /// + /// Creates a WorkflowExecutorInfo for the given executor ID. + /// + private static WorkflowExecutorInfo CreateExecutorInfo( + string executorId, + Dictionary executorBindings) + { + if (!executorBindings.TryGetValue(executorId, out ExecutorBinding? binding)) + { + throw new InvalidOperationException($"Executor '{executorId}' not found in workflow bindings."); + } + + bool isAgentic = WorkflowHelper.IsAgentExecutorType(binding.ExecutorType); + RequestPort? requestPort = (binding is RequestPortBinding rpb) ? rpb.Port : null; + + return new WorkflowExecutorInfo(executorId, isAgentic, requestPort); + } + + /// + /// Checks if the workflow should halt based on halt request events. + /// Note: YieldOutputAsync does NOT halt the workflow - it just yields intermediate output. + /// Only explicit RequestHaltAsync calls should halt the workflow. + /// + private static bool CheckForHalt( + DurableWorkflowCustomStatus customStatus, + string executorId) + { + // Look for explicit halt request events from this executor + for (int i = customStatus.Events.Count - 1; i >= 0; i--) + { + string eventJson = customStatus.Events[i]; + + // Check for DurableHaltRequestedEvent - this is the ONLY event that should halt + if (eventJson.Contains("DurableHaltRequestedEvent", StringComparison.Ordinal) && + eventJson.Contains(executorId, StringComparison.Ordinal)) + { + return true; + } + } + + return false; + } + + /// + /// Routes a message through edges to successor executors. + /// + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")] + private static void RouteMessageToSuccessors( + string sourceId, + string message, + WorkflowExecutionPlan plan, + Dictionary> messageQueues, + ILogger logger) + { + if (!plan.Successors.TryGetValue(sourceId, out List? successors)) + { + return; // No outgoing edges + } + + // Get the output type of the source executor to pass as input type to successors + plan.ExecutorOutputTypes.TryGetValue(sourceId, out Type? sourceOutputType); + string? inputTypeName = sourceOutputType?.FullName; + + foreach (string sinkId in successors) + { + // Check edge condition + if (plan.EdgeConditions.TryGetValue((sourceId, sinkId), out Func? condition) + && condition is not null) + { + try + { + // Deserialize the message for condition evaluation + object? messageObj = DeserializeForCondition(message, sourceOutputType); + + if (!condition(messageObj)) + { +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates + logger.LogDebug("Edge {Source} -> {Sink}: condition returned false, skipping", + sourceId, sinkId); +#pragma warning restore CA1848, CA1873 + continue; + } + } + catch (Exception ex) + { +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates + logger.LogWarning(ex, "Failed to evaluate condition for edge {Source} -> {Sink}, skipping", + sourceId, sinkId); +#pragma warning restore CA1848, CA1873 + continue; + } + } + + // Queue message to successor with type information +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates + logger.LogDebug("Edge {Source} -> {Sink}: routing message", sourceId, sinkId); +#pragma warning restore CA1848, CA1873 + EnqueueMessage(messageQueues, sinkId, message, inputTypeName); + } + } + + /// + /// Routes a message through edges to successor executors, with an explicit type name for the message. + /// Used for messages sent via SendMessageAsync. + /// + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")] + [UnconditionalSuppressMessage("Trimming", "IL2057", Justification = "Type resolution for workflow message types.")] + private static void RouteMessageToSuccessors( + string sourceId, + string message, + string? explicitTypeName, + WorkflowExecutionPlan plan, + Dictionary> messageQueues, + ILogger logger) + { + if (!plan.Successors.TryGetValue(sourceId, out List? successors)) + { + return; // No outgoing edges + } + + // Use explicit type name if provided, otherwise fall back to executor output type + string? inputTypeName = explicitTypeName; + Type? messageType = null; + + if (!string.IsNullOrEmpty(explicitTypeName)) + { + messageType = Type.GetType(explicitTypeName); + } + + if (messageType is null && plan.ExecutorOutputTypes.TryGetValue(sourceId, out Type? sourceOutputType)) + { + messageType = sourceOutputType; + inputTypeName = sourceOutputType?.FullName; + } + + foreach (string sinkId in successors) + { + // Check edge condition + if (plan.EdgeConditions.TryGetValue((sourceId, sinkId), out Func? condition) + && condition is not null) + { + try + { + // Deserialize the message for condition evaluation + object? messageObj = DeserializeForCondition(message, messageType); + + if (!condition(messageObj)) + { +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates + logger.LogDebug("Edge {Source} -> {Sink}: condition returned false, skipping", + sourceId, sinkId); +#pragma warning restore CA1848, CA1873 + continue; + } + } + catch (Exception ex) + { +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates + logger.LogWarning(ex, "Failed to evaluate condition for edge {Source} -> {Sink}, skipping", + sourceId, sinkId); +#pragma warning restore CA1848, CA1873 + continue; + } + } + + // Queue message to successor with type information +#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates + logger.LogDebug("Edge {Source} -> {Sink}: routing sent message (type: {TypeName})", sourceId, sinkId, inputTypeName); +#pragma warning restore CA1848, CA1873 + EnqueueMessage(messageQueues, sinkId, message, inputTypeName); + } + } + + /// + /// Gets the final result for a message-driven workflow execution. + /// Checks yielded outputs first (from YieldOutputAsync calls), then falls back to executor results. + /// + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing event types.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing event types.")] + private static string GetMessageDrivenFinalResult( + Workflow workflow, + Dictionary lastResults, + DurableWorkflowCustomStatus customStatus) + { + // First, check for yielded outputs from YieldOutputAsync calls (most recent first) + // These take priority as they represent explicit workflow outputs + string? yieldedOutput = GetLastYieldedOutput(customStatus); + if (!string.IsNullOrEmpty(yieldedOutput)) + { + return yieldedOutput; + } + + HashSet outputExecutors = workflow.ReflectOutputExecutors(); + + // If specific output executors are defined, use their results + if (outputExecutors.Count > 0) + { + List outputResults = []; + foreach (string outputExecutorId in outputExecutors) + { + if (lastResults.TryGetValue(outputExecutorId, out string? result) && !string.IsNullOrEmpty(result)) + { + outputResults.Add(result); + } + } + + if (outputResults.Count > 0) + { + return outputResults.Count == 1 + ? outputResults[0] + : string.Join("\n---\n", outputResults); + } + } + + // Otherwise, return the last non-empty result from any executor + return lastResults.Values.LastOrDefault(v => !string.IsNullOrEmpty(v)) ?? string.Empty; + } + + /// + /// Extracts the most recent yielded output from the custom status events. + /// + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing event types.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing event types.")] + private static string? GetLastYieldedOutput(DurableWorkflowCustomStatus customStatus) + { + // Look for DurableYieldedOutputEvent in events (most recent first) + for (int i = customStatus.Events.Count - 1; i >= 0; i--) + { + string eventJson = customStatus.Events[i]; + + if (eventJson.Contains("DurableYieldedOutputEvent", StringComparison.Ordinal)) + { + try + { + // Parse the wrapper to get the inner Data + using JsonDocument doc = JsonDocument.Parse(eventJson); + if (doc.RootElement.TryGetProperty("Data", out JsonElement dataElement)) + { + string? dataJson = dataElement.GetString(); + if (dataJson is not null) + { + using JsonDocument dataDoc = JsonDocument.Parse(dataJson); + if (dataDoc.RootElement.TryGetProperty("Output", out JsonElement outputElement)) + { + // The output could be a string or a serialized object + return outputElement.ValueKind == JsonValueKind.String + ? outputElement.GetString() + : outputElement.GetRawText(); + } + } + } + } + catch (JsonException) + { + // Continue to next event if parsing fails + } + } + } + + return null; + } + + /// + /// Unwraps an activity result, extracting state updates, events, sent messages, and returning the actual result. /// [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing known wrapper type.")] [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing known wrapper type.")] - private static string UnwrapActivityResult(string rawResult, DurableWorkflowCustomStatus customStatus, Dictionary sharedState) + private static (string Result, List SentMessages) UnwrapActivityResult( + string rawResult, + DurableWorkflowCustomStatus customStatus, + Dictionary sharedState) { if (string.IsNullOrEmpty(rawResult)) { - return rawResult; + return (rawResult, []); } try @@ -261,9 +626,9 @@ public class DurableWorkflowRunner // Try to deserialize as DurableActivityOutput DurableActivityOutput? output = JsonSerializer.Deserialize(rawResult); - // Check if this is actually a DurableActivityOutput (has Result property set or state updates) + // Check if this is actually a DurableActivityOutput (has Result property set or state updates or sent messages) // This distinguishes it from other JSON objects that would deserialize with default/empty values - if (output is not null && (output.Result is not null || output.StateUpdates.Count > 0 || output.ClearedScopes.Count > 0 || output.Events.Count > 0)) + if (output is not null && (output.Result is not null || output.StateUpdates.Count > 0 || output.ClearedScopes.Count > 0 || output.Events.Count > 0 || output.SentMessages.Count > 0)) { // Apply cleared scopes first foreach (string clearedScope in output.ClearedScopes) @@ -298,7 +663,7 @@ public class DurableWorkflowRunner customStatus.Events.AddRange(output.Events); } - return output.Result ?? string.Empty; + return (output.Result ?? string.Empty, output.SentMessages); } } catch (JsonException) @@ -306,7 +671,7 @@ public class DurableWorkflowRunner // Not a wrapped result, return as-is } - return rawResult; + return (rawResult, []); } /// @@ -364,7 +729,7 @@ public class DurableWorkflowRunner } /// - /// Wrapper for activity input that includes shared state. + /// Wrapper for activity input that includes shared state and type information. /// internal sealed class ActivityInputWithState { @@ -373,84 +738,17 @@ public class DurableWorkflowRunner /// public string? Input { get; set; } + /// + /// Gets or sets the assembly-qualified type name of the input, used for proper deserialization. + /// + public string? InputTypeName { get; set; } + /// /// Gets or sets the shared state dictionary. /// public Dictionary State { get; set; } = []; } - /// - /// Filters executors based on their incoming edge conditions. - /// An executor is eligible if all its incoming edges have conditions that evaluate to true, - /// or if the edges have no conditions. - /// - [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")] - [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")] - private static List GetEligibleExecutors( - List executors, - Dictionary results, - WorkflowExecutionPlan plan, - ILogger logger) - { - List eligible = new(executors.Count); - - foreach (WorkflowExecutorInfo executorInfo in executors) - { - if (!plan.Predecessors.TryGetValue(executorInfo.ExecutorId, out List? predecessors) || predecessors.Count == 0) - { - // Root executor (no predecessors) is always eligible - eligible.Add(executorInfo); - continue; - } - - // Check if any predecessor's edge condition allows this executor to run - bool isEligible = false; - foreach (string predecessorId in predecessors) - { - // Get the condition for this edge (predecessor -> current executor) - if (!plan.EdgeConditions.TryGetValue((predecessorId, executorInfo.ExecutorId), out Func? condition) || condition is null) - { - // No condition registered for this edge or edge has no condition, always eligible - isEligible = true; - break; - } - - // Evaluate the condition using the predecessor's result - if (results.TryGetValue(predecessorId, out string? predecessorResult)) - { - try - { - // Get the predecessor's output type for proper deserialization - plan.ExecutorOutputTypes.TryGetValue(predecessorId, out Type? predecessorOutputType); - - // Deserialize the predecessor result to the expected type for condition evaluation - object? resultObject = DeserializeForCondition(predecessorResult, predecessorOutputType); - if (condition(resultObject)) - { - isEligible = true; - break; - } - } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to evaluate condition for edge from '{PredecessorId}' to '{ExecutorId}'", predecessorId, executorInfo.ExecutorId); - } - } - } - - if (isEligible) - { - eligible.Add(executorInfo); - } - else - { - logger.LogExecutorSkipped(executorInfo.ExecutorId); - } - } - - return eligible; - } - /// /// Deserializes a JSON string result into an object for condition evaluation. /// @@ -479,24 +777,13 @@ public class DurableWorkflowRunner } } - private async Task<(string Id, string Result)> ExecuteExecutorWithIdAsync( - TaskOrchestrationContext context, - WorkflowExecutorInfo executorInfo, - string input, - ILogger logger, - DurableWorkflowCustomStatus customStatus, - Dictionary sharedState) - { - string result = await this.ExecuteExecutorAsync(context, executorInfo, input, logger, customStatus, sharedState).ConfigureAwait(true); - return (executorInfo.ExecutorId, result); - } - [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Serializing known wrapper type.")] [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Serializing known wrapper type.")] private async Task ExecuteExecutorAsync( TaskOrchestrationContext context, WorkflowExecutorInfo executorInfo, string input, + string? inputTypeName, ILogger logger, DurableWorkflowCustomStatus customStatus, Dictionary sharedState) @@ -512,10 +799,11 @@ public class DurableWorkflowRunner string executorName = WorkflowNamingHelper.GetExecutorName(executorInfo.ExecutorId); string triggerName = WorkflowNamingHelper.ToOrchestrationFunctionName(executorName); - // Wrap input with shared state for the activity + // Wrap input with shared state and type information for the activity ActivityInputWithState inputWithState = new() { Input = input, + InputTypeName = inputTypeName, State = new Dictionary(sharedState) // Pass a copy of the state }; @@ -580,54 +868,4 @@ public class DurableWorkflowRunner AgentResponse response = await agent.RunAsync(input, thread); return response.Text; } - - private static string GetExecutorInput( - string executorId, - string initialInput, - Dictionary results, - WorkflowExecutionPlan plan) - { - if (!plan.Predecessors.TryGetValue(executorId, out List? predecessors) || predecessors.Count == 0) - { - return initialInput; - } - - if (predecessors.Count == 1) - { - return results.TryGetValue(predecessors[0], out string? result) ? result : initialInput; - } - - List aggregated = new(predecessors.Count); - foreach (string predecessorId in predecessors) - { - if (results.TryGetValue(predecessorId, out string? result)) - { - aggregated.Add(result); - } - } - - return SerializeToJson(aggregated); - } - - private static string GetFinalResult(WorkflowExecutionPlan plan, Dictionary results) - { - WorkflowExecutionLevel lastLevel = plan.Levels[^1]; - List lastExecutors = lastLevel.Executors; - - if (lastExecutors.Count == 1) - { - return results.TryGetValue(lastExecutors[0].ExecutorId, out string? singleResult) ? singleResult : string.Empty; - } - - List finalResults = new(lastExecutors.Count); - foreach (WorkflowExecutorInfo executor in lastExecutors) - { - if (results.TryGetValue(executor.ExecutorId, out string? result)) - { - finalResults.Add(result); - } - } - - return string.Join("\n---\n", finalResults); - } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableWorkflowServiceCollectionExtensions.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableWorkflowServiceCollectionExtensions.cs index aa358affb9..c73e12d7a5 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableWorkflowServiceCollectionExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableWorkflowServiceCollectionExtensions.cs @@ -201,7 +201,8 @@ public static class DurableWorkflowServiceCollectionExtensions // Create executor instance from binding Executor executor = await binding.FactoryAsync!("activity-run").ConfigureAwait(false); - Type inputType = executor.InputTypes.FirstOrDefault() ?? typeof(string); + // Determine the input type - prefer the provided type name, fall back to first supported type + Type inputType = ResolveInputType(inputWithState?.InputTypeName, executor.InputTypes); object typedInput = DeserializeInput(executorInput, inputType); // Create a pipeline context that has access to shared state and executor @@ -213,18 +214,52 @@ public static class DurableWorkflowServiceCollectionExtensions workflowContext, CancellationToken.None).ConfigureAwait(false); - // Always return wrapped output with state updates, events, and result + // Always return wrapped output with state updates, events, sent messages, and result DurableActivityOutput output = new() { Result = SerializeResult(result), StateUpdates = workflowContext.StateUpdates, ClearedScopes = [.. workflowContext.ClearedScopes], - Events = workflowContext.Events.ConvertAll(SerializeEvent) + Events = workflowContext.Events.ConvertAll(SerializeEvent), + SentMessages = workflowContext.SentMessages.ConvertAll(m => new SentMessageInfo { Message = m.Message, TypeName = m.TypeName }) }; return JsonSerializer.Serialize(output); } + /// + /// Resolves the input type from the provided type name, or falls back to the first supported type. + /// + [UnconditionalSuppressMessage("Trimming", "IL2026:RequiresUnreferencedCode", Justification = "Type resolution for registered executor types.")] + [UnconditionalSuppressMessage("Trimming", "IL2057:TypeGetType", Justification = "Type resolution for registered executor types.")] + [UnconditionalSuppressMessage("AOT", "IL3050:RequiresDynamicCode", Justification = "Type resolution for registered executor types.")] + private static Type ResolveInputType(string? inputTypeName, ISet supportedTypes) + { + if (!string.IsNullOrEmpty(inputTypeName)) + { + // Try to find a matching type in the supported types + foreach (Type supportedType in supportedTypes) + { + if (supportedType.AssemblyQualifiedName == inputTypeName || + supportedType.FullName == inputTypeName || + supportedType.Name == inputTypeName) + { + return supportedType; + } + } + + // Try to load the type directly (for types not in supported types) + Type? resolvedType = Type.GetType(inputTypeName); + if (resolvedType is not null) + { + return resolvedType; + } + } + + // Fall back to first supported type or string + return supportedTypes.FirstOrDefault() ?? typeof(string); + } + [UnconditionalSuppressMessage("Trimming", "IL2026:RequiresUnreferencedCode", Justification = "Deserializing known wrapper type.")] [UnconditionalSuppressMessage("AOT", "IL3050:RequiresDynamicCode", Justification = "Deserializing known wrapper type.")] private static DurableActivityInput? TryDeserializeActivityInput(string input) @@ -356,6 +391,11 @@ internal sealed class DurableActivityInput /// public string? Input { get; set; } + /// + /// Gets or sets the assembly-qualified type name of the input, used for proper deserialization. + /// + public string? InputTypeName { get; set; } + /// /// Gets or sets the shared state dictionary (scope-prefixed key -> serialized value). /// @@ -386,4 +426,26 @@ internal sealed class DurableActivityOutput /// Gets or sets the serialized workflow events emitted during activity execution. /// public List Events { get; set; } = []; + + /// + /// Gets or sets messages sent via SendMessageAsync during activity execution. + /// Each entry is a tuple of (serializedMessage, typeName). + /// + public List SentMessages { get; set; } = []; +} + +/// +/// Information about a message sent via SendMessageAsync. +/// +internal sealed class SentMessageInfo +{ + /// + /// Gets or sets the serialized message content. + /// + public string? Message { get; set; } + + /// + /// Gets or sets the full type name of the message. + /// + public string? TypeName { get; set; } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/PipelineActivityContext.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/PipelineActivityContext.cs index 8ccfffde97..a8d170529c 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/PipelineActivityContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/PipelineActivityContext.cs @@ -41,6 +41,12 @@ internal sealed class PipelineActivityContext : IWorkflowContext /// public HashSet ClearedScopes { get; } = []; + /// + /// Gets the messages sent during activity execution via SendMessageAsync. + /// Each entry is (serializedMessage, typeName). + /// + public List<(string Message, string TypeName)> SentMessages { get; } = []; + /// public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) { @@ -53,7 +59,20 @@ internal sealed class PipelineActivityContext : IWorkflowContext } /// - public ValueTask SendMessageAsync(object message, string? targetId = null, CancellationToken cancellationToken = default) => default; + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Serializing workflow message types.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Serializing workflow message types.")] + public ValueTask SendMessageAsync(object message, string? targetId = null, CancellationToken cancellationToken = default) + { + if (message is not null) + { + // Capture the message and its type for routing in the orchestrator + string serializedMessage = JsonSerializer.Serialize(message, message.GetType()); + string typeName = message.GetType().FullName ?? message.GetType().Name; + this.SentMessages.Add((serializedMessage, typeName)); + } + + return default; + } /// public ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/WorkflowExecutionPlan.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/WorkflowExecutionPlan.cs index fc6ed55ad6..66647fcd95 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/WorkflowExecutionPlan.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/WorkflowExecutionPlan.cs @@ -42,4 +42,20 @@ public sealed class WorkflowExecutionPlan /// Gets whether this workflow has any Fan-In points. /// public bool HasFanIn => this.Levels.Any(l => l.IsFanIn); + + /// + /// Gets or sets whether this workflow contains cycles requiring iterative message-driven execution. + /// + public bool HasCycles { get; set; } + + /// + /// Gets the back-edges that create cycles in the workflow graph. + /// These edges are excluded from topological level computation but are followed during message-driven execution. + /// + public List<(string SourceId, string TargetId)> BackEdges { get; } = []; + + /// + /// Gets or sets the starting executor ID for the workflow. + /// + public string StartExecutorId { get; set; } = string.Empty; } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/WorkflowHelper.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/WorkflowHelper.cs index 1237ad7081..f30b6a1efd 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/WorkflowHelper.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/WorkflowHelper.cs @@ -56,6 +56,7 @@ public static class WorkflowHelper /// Analyzes the workflow and returns an execution plan that supports Fan-Out/Fan-In patterns. /// Executors at the same level can be executed in parallel (Fan-Out). /// Fan-In points are identified where multiple executors converge. + /// Cyclic workflows are detected and marked for message-driven execution. /// /// The workflow instance to analyze. /// An execution plan with parallel execution levels. @@ -67,12 +68,14 @@ public static class WorkflowHelper Dictionary> edges = workflow.ReflectEdges(); Dictionary<(string SourceId, string TargetId), Func?> edgeConditions = workflow.ReflectEdgeConditions(); - WorkflowExecutionPlan plan = new(); + WorkflowExecutionPlan plan = new() + { + StartExecutorId = workflow.StartExecutorId + }; // Build adjacency lists (successors and predecessors) Dictionary> successors = new(executors.Count); Dictionary> predecessors = new(executors.Count); - int[] inDegree = new int[executors.Count]; Dictionary executorIndex = new(executors.Count); // Initialize all executors and extract their output types @@ -97,16 +100,39 @@ public static class WorkflowHelper { foreach (string sinkId in edge.Connection.SinkIds) { - if (executorIndex.TryGetValue(sinkId, out int sinkIdx)) + if (executorIndex.ContainsKey(sinkId)) { sourceSuccessors.Add(sinkId); predecessors[sinkId].Add(sourceId); - inDegree[sinkIdx]++; } } } } + // Detect back-edges using DFS from start executor + HashSet<(string Source, string Target)> backEdges = DetectBackEdges(workflow.StartExecutorId, successors); + + // Mark cycles in plan + plan.HasCycles = backEdges.Count > 0; + foreach ((string source, string target) in backEdges) + { + plan.BackEdges.Add((source, target)); + } + + // Calculate in-degrees, EXCLUDING back-edges + int[] inDegree = new int[executors.Count]; + foreach (string executorId in executors.Keys) + { + foreach (string pred in predecessors[executorId]) + { + // Only count edge if it's NOT a back-edge + if (!backEdges.Contains((pred, executorId))) + { + inDegree[executorIndex[executorId]]++; + } + } + } + // Store edge conditions in the plan foreach (KeyValuePair<(string SourceId, string TargetId), Func?> condition in edgeConditions) { @@ -150,15 +176,23 @@ public static class WorkflowHelper RequestPort? requestPort = (executorBinding is RequestPortBinding rpb) ? rpb.Port : null; levelExecutors.Add(new WorkflowExecutorInfo(executorId, isAgentic, requestPort)); - // Check Fan-In for this executor - if (predecessors[executorId].Count > 1) + // Check Fan-In for this executor (excluding back-edges) + int nonBackEdgePredecessors = predecessors[executorId] + .Count(pred => !backEdges.Contains((pred, executorId))); + if (nonBackEdgePredecessors > 1) { isFanIn = true; } - // Decrement in-degree of all successors and enqueue those ready for next level + // Decrement in-degree of all successors (excluding back-edges) and enqueue those ready for next level foreach (string successor in successors[executorId]) { + // Skip back-edges for topological ordering + if (backEdges.Contains((executorId, successor))) + { + continue; + } + int successorIdx = executorIndex[successor]; if (--inDegree[successorIdx] == 0) { @@ -172,7 +206,7 @@ public static class WorkflowHelper currentLevel = nextLevel; } - // Handle cycle detection: if not all executors were processed, there's a cycle + // Handle any remaining executors not processed (shouldn't happen if back-edge detection is correct) if (processedCount < executors.Count) { List remainingExecutors = []; @@ -196,6 +230,60 @@ public static class WorkflowHelper return plan; } + /// + /// Detects back-edges in the graph using DFS. + /// A back-edge is an edge that points to an ancestor in the DFS tree (creates a cycle). + /// + /// The starting executor ID for DFS traversal. + /// The adjacency list mapping each executor to its successors. + /// A set of back-edges as (source, target) tuples. + private static HashSet<(string Source, string Target)> DetectBackEdges( + string startId, + Dictionary> successors) + { + HashSet<(string, string)> backEdges = []; + HashSet visited = []; + HashSet inStack = []; // Nodes in current DFS path + + void Dfs(string nodeId) + { + visited.Add(nodeId); + inStack.Add(nodeId); + + if (successors.TryGetValue(nodeId, out List? neighbors)) + { + foreach (string neighbor in neighbors) + { + if (inStack.Contains(neighbor)) + { + // Edge to ancestor in current path = back-edge (creates cycle) + backEdges.Add((nodeId, neighbor)); + } + else if (!visited.Contains(neighbor)) + { + Dfs(neighbor); + } + } + } + + inStack.Remove(nodeId); + } + + // Start DFS from the workflow's start executor + Dfs(startId); + + // Handle any disconnected components (shouldn't happen in valid workflows, but for safety) + foreach (string nodeId in successors.Keys) + { + if (!visited.Contains(nodeId)) + { + Dfs(nodeId); + } + } + + return backEdges; + } + /// /// Determines whether the specified executor type is an agentic executor. /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs index 0c54d3b8c7..c1b7cc1817 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs @@ -45,7 +45,6 @@ public class Workflow /// A dictionary where each key is a tuple containing the source and target node identifiers, and each value is a /// delegate that evaluates the condition for the corresponding edge. The value is if the /// edge has no associated condition. - [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1024:Use properties where appropriate", Justification = "")] public Dictionary<(string SourceId, string TargetId), Func?> ReflectEdgeConditions() { int capacity = 0;