WIP - Replace Kahns level based with loop(like in-proc)

This commit is contained in:
Shyju Krishnankutty
2026-01-26 14:19:40 -08:00
Unverified
parent cf6e0070b4
commit 420e7b6038
14 changed files with 942 additions and 195 deletions
+1
View File
@@ -54,6 +54,7 @@
<Project Path="samples/DurableAgents/ConsoleApps/09_Workflow_Concurrency/09_Workflow_Concurrency.csproj" />
<Project Path="samples/DurableAgents/ConsoleApps/10_Workflow_HITL/10_Workflow_HITL.csproj" />
<Project Path="samples/DurableAgents/ConsoleApps/11_WorkflowEvents/11_WorkflowEvents.csproj" />
<Project Path="samples/DurableAgents/ConsoleApps/12_WorkflowTypeHandling/12_WorkflowTypeHandling.csproj" />
</Folder>
<Folder Name="/Samples/GettingStarted/">
<File Path="samples/GettingStarted/README.md" />
@@ -92,36 +92,39 @@ async Task RunWorkflowWithStreamingAsync(string orderId, Workflow workflow, Dura
// WatchStreamAsync yields events as they're emitted by executors
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// Always print the event type name
Console.WriteLine($" Event: {evt.GetType().Name}");
switch (evt)
{
// Custom domain events (emitted via AddEventAsync)
case OrderLookupStartedEvent e:
WriteColored($" [Lookup] Looking up order {e.OrderId}", ConsoleColor.Cyan);
WriteColored($" [Lookup] Looking up order {e.OrderId}", ConsoleColor.Cyan);
break;
case OrderFoundEvent e:
WriteColored($" [Lookup] Found: {e.Order.Customer.Name}", ConsoleColor.Cyan);
WriteColored($" [Lookup] Found: {e.Order.Customer.Name}", ConsoleColor.Cyan);
break;
case CancellationProgressEvent e:
WriteColored($" [Cancel] {e.PercentComplete}% - {e.Status}", ConsoleColor.Yellow);
WriteColored($" [Cancel] {e.PercentComplete}% - {e.Status}", ConsoleColor.Yellow);
break;
case OrderCancelledEvent e:
WriteColored(" [Cancel] Done", ConsoleColor.Yellow);
WriteColored(" [Cancel] Done", ConsoleColor.Yellow);
break;
case EmailSentEvent e:
WriteColored($" [Email] Sent to {e.Email}", ConsoleColor.Magenta);
WriteColored($" [Email] Sent to {e.Email}", ConsoleColor.Magenta);
break;
// Yielded outputs (emitted via YieldOutputAsync)
case DurableYieldedOutputEvent e:
WriteColored($" [Output] {e.ExecutorId}", ConsoleColor.DarkGray);
WriteColored($" [Output] {e.ExecutorId}", ConsoleColor.DarkGray);
break;
// Workflow completion
case DurableWorkflowCompletedEvent e:
WriteColored($"Completed: {e.Result}", ConsoleColor.Green);
WriteColored($" Completed: {e.Result}", ConsoleColor.Green);
break;
case DurableWorkflowFailedEvent e:
WriteColored($"Failed: {e.ErrorMessage}", ConsoleColor.Red);
WriteColored($" Failed: {e.ErrorMessage}", ConsoleColor.Red);
break;
}
}
@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net10.0</TargetFrameworks>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<AssemblyName>SingleWorkflow</AssemblyName>
<RootNamespace>SingleAgent</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Microsoft.DurableTask.Client.AzureManaged" />
<PackageReference Include="Microsoft.DurableTask.Worker.AzureManaged" />
<PackageReference Include="Microsoft.Extensions.Hosting" />
<PackageReference Include="Azure.AI.OpenAI" />
</ItemGroup>
<!-- Local projects that should be switched to package references when using the sample outside of this MAF repo -->
<!--
<ItemGroup>
<PackageReference Include="Microsoft.Agents.AI.DurableTask" />
<PackageReference Include="Microsoft.Agents.AI.Workflows" />
</ItemGroup>
-->
<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.DurableTask\Microsoft.Agents.AI.DurableTask.csproj" />
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.Workflows\Microsoft.Agents.AI.Workflows.csproj" />
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.OpenAI\Microsoft.Agents.AI.OpenAI.csproj" />
</ItemGroup>
</Project>
@@ -0,0 +1,68 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Text.Json;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
namespace SingleAgent;
internal sealed class FeedbackExecutor : Executor<SloganResult>
{
private readonly AIAgent _agent;
private AgentThread? _thread;
public int MinimumRating { get; init; } = 9;
public int MaxAttempts { get; init; } = 3;
private int _attempts;
/// <summary>
/// Initializes a new instance of the <see cref="FeedbackExecutor"/> class.
/// </summary>
/// <param name="id">A unique identifier for the executor.</param>
/// <param name="chatClient">The chat client to use for the AI agent.</param>
public FeedbackExecutor(string id, IChatClient chatClient) : base(id)
{
ChatClientAgentOptions agentOptions = new()
{
ChatOptions = new()
{
Instructions = "You are a professional editor. You will be given a slogan and the task it is meant to accomplish.",
ResponseFormat = ChatResponseFormat.ForJsonSchema<FeedbackResult>()
}
};
this._agent = new ChatClientAgent(chatClient, agentOptions);
}
public override async ValueTask HandleAsync(SloganResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._thread ??= await this._agent.GetNewThreadAsync(cancellationToken);
var sloganMessage = $"""
Here is a slogan for the task '{message.Task}':
Slogan: {message.Slogan}
Please provide feedback on this slogan, including comments, a rating from 1 to 10, and suggested actions for improvement.
""";
var response = await this._agent.RunAsync(sloganMessage, this._thread, cancellationToken: cancellationToken);
var feedback = JsonSerializer.Deserialize<FeedbackResult>(response.Text) ?? throw new InvalidOperationException("Failed to deserialize feedback.");
if (feedback.Rating >= this.MinimumRating)
{
await context.YieldOutputAsync($"The following slogan was accepted:\n\n{message.Slogan}", cancellationToken);
return;
}
if (this._attempts >= this.MaxAttempts)
{
await context.YieldOutputAsync($"The slogan was rejected after {this.MaxAttempts} attempts. Final slogan:\n\n{message.Slogan}", cancellationToken);
return;
}
await context.SendMessageAsync(feedback, cancellationToken: cancellationToken);
this._attempts++;
}
}
@@ -0,0 +1,112 @@
// Copyright (c) Microsoft. All rights reserved.
// This sample demonstrates how to run a CYCLIC WORKFLOW as a durable orchestration.
// The workflow contains a loop: SloganWriter ⟷ FeedbackProvider
//
// WORKFLOW LOOP PATTERN:
// 1. SloganWriter generates a slogan based on user input
// 2. FeedbackProvider evaluates the slogan and provides feedback
// 3. If the rating is below threshold, FeedbackProvider sends feedback back to SloganWriter
// 4. SloganWriter improves the slogan based on feedback
// 5. Loop continues until FeedbackProvider accepts the slogan (rating >= threshold)
//
// This demonstrates:
// - Cyclic workflow support (back-edges in the graph)
// - Multi-type executor handlers (SloganWriter handles both string and FeedbackResult)
// - Message routing via SendMessageAsync for void-returning executors
// - YieldOutputAsync for final output when the loop completes
using Azure.Identity;
using Microsoft.Agents.AI.DurableTask;
using Microsoft.Agents.AI.Workflows;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.AzureManaged;
using Microsoft.DurableTask.Worker.AzureManaged;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SingleAgent;
using Azure.AI.OpenAI;
// Get DTS connection string from environment variable
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Define executors for the workflow
var sloganWriter = new SloganWriterExecutor("SloganWriter", chatClient);
var feedbackProvider = new FeedbackExecutor("FeedbackProvider", chatClient);
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(sloganWriter)
.WithName("SloganCreationWorkflow")
.AddEdge(sloganWriter, feedbackProvider)
.AddEdge(feedbackProvider, sloganWriter)
.WithOutputFrom(feedbackProvider)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Warning))
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
options => options.Workflows.AddWorkflow(workflow),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
DurableTaskClient durableClient = host.Services.GetRequiredService<DurableTaskClient>();
Console.WriteLine("Workflow Events Demo - Enter input for slogan generation (or 'exit'):");
while (true)
{
Console.Write("> ");
string? input = Console.ReadLine();
if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
break;
}
try
{
await RunWorkflowWithStreamingAsync(input, workflow, durableClient);
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
Console.WriteLine();
}
await host.StopAsync();
// Runs a workflow and streams events as they occur
async Task RunWorkflowWithStreamingAsync(string orderId, Workflow workflow, DurableTaskClient client)
{
// StreamAsync starts the workflow and returns a handle for observing events
await using DurableStreamingRun run = await DurableWorkflow.StreamAsync(workflow, orderId, client);
Console.WriteLine($"Started: {run.InstanceId}");
// WatchStreamAsync yields events as they're emitted by executors
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// Always print the event type name
WriteColored($" Event: {evt.GetType().Name}", ConsoleColor.Gray);
}
}
void WriteColored(string message, ConsoleColor color)
{
Console.ForegroundColor = color;
Console.WriteLine(message);
Console.ResetColor();
}
@@ -0,0 +1,23 @@
# Workflow Loop Sample
This sample demonstrates how to run a cyclic workflow (containing loops) as a durable orchestration.
## Overview
The workflow iteratively improves a slogan based on AI feedback until it meets quality criteria.
### Executors
- **SloganWriter** - Generates slogans using AI (handles string and FeedbackResult)
- **FeedbackProvider** - Evaluates slogans (calls YieldOutput to accept, SendMessage to loop)
## Key Concepts
- Cyclic Workflow Support (back-edges)
- Multi-Type Executor Handlers
- Message Routing via SendMessageAsync
- Workflow Termination via YieldOutputAsync
## Running
Set AZURE_OPENAI_ENDPOINT and run: dotnet run
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Text.Json.Serialization;
namespace SingleAgent;
public sealed class SloganResult
{
[JsonPropertyName("task")]
public required string Task { get; set; }
[JsonPropertyName("slogan")]
public required string Slogan { get; set; }
}
public sealed class FeedbackResult
{
[JsonPropertyName("comments")]
public string Comments { get; set; } = string.Empty;
[JsonPropertyName("rating")]
public int Rating { get; set; }
[JsonPropertyName("actions")]
public string Actions { get; set; } = string.Empty;
}
@@ -0,0 +1,61 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Text.Json;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
namespace SingleAgent;
internal sealed class SloganWriterExecutor : Executor
{
private readonly AIAgent _agent;
private AgentThread? _thread;
/// <summary>
/// Initializes a new instance of the <see cref="SloganWriterExecutor"/> class.
/// </summary>
/// <param name="id">A unique identifier for the executor.</param>
/// <param name="chatClient">The chat client to use for the AI agent.</param>
public SloganWriterExecutor(string id, IChatClient chatClient) : base(id)
{
ChatClientAgentOptions agentOptions = new()
{
ChatOptions = new()
{
Instructions = "You are a professional slogan writer. You will be given a task to create a slogan.",
ResponseFormat = ChatResponseFormat.ForJsonSchema<SloganResult>()
}
};
this._agent = new ChatClientAgent(chatClient, agentOptions);
}
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) =>
routeBuilder.AddHandler<string, SloganResult>(this.HandleAsync)
.AddHandler<FeedbackResult, SloganResult>(this.HandleAsync);
public async ValueTask<SloganResult> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._thread ??= await this._agent.GetNewThreadAsync(cancellationToken);
var result = await this._agent.RunAsync(message, this._thread, cancellationToken: cancellationToken);
return JsonSerializer.Deserialize<SloganResult>(result.Text) ?? throw new InvalidOperationException("Failed to deserialize slogan result.");
}
public async ValueTask<SloganResult> HandleAsync(FeedbackResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var feedbackMessage = $"""
Here is the feedback on your previous slogan:
Comments: {message.Comments}
Rating: {message.Rating}
Suggested Actions: {message.Actions}
Please use this feedback to improve your slogan.
""";
var result = await this._agent.RunAsync(feedbackMessage, this._thread, cancellationToken: cancellationToken);
return JsonSerializer.Deserialize<SloganResult>(result.Text) ?? throw new InvalidOperationException("Failed to deserialize slogan result.");
}
}
@@ -191,69 +191,434 @@ public class DurableWorkflowRunner
ILogger logger)
{
WorkflowExecutionPlan plan = WorkflowHelper.GetExecutionPlan(workflow);
Dictionary<string, string> results = new(plan.Levels.Sum(l => l.Executors.Count));
// Use message-driven execution for all workflows
// This approach naturally handles both DAGs and cyclic workflows
return await this.ExecuteMessageDrivenAsync(context, workflow, plan, initialInput, logger).ConfigureAwait(true);
}
/// <summary>
/// Executes a workflow using message-driven execution.
/// Messages are routed through edges dynamically, naturally supporting both DAGs and cycles.
/// </summary>
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")]
private async Task<string> ExecuteMessageDrivenAsync(
TaskOrchestrationContext context,
Workflow workflow,
WorkflowExecutionPlan plan,
string initialInput,
ILogger logger)
{
const int MaxSupersteps = 100;
// Message queues per executor - stores (message, inputTypeName) tuples
Dictionary<string, Queue<(string Message, string? InputTypeName)>> messageQueues = [];
// Last result from each executor (for edge condition evaluation and final result)
Dictionary<string, string> lastResults = [];
// Track accumulated events and shared state
DurableWorkflowCustomStatus customStatus = new();
Dictionary<string, string> sharedState = [];
foreach (WorkflowExecutionLevel level in plan.Levels)
// Get executor bindings for creating WorkflowExecutorInfo
Dictionary<string, ExecutorBinding> executorBindings = workflow.ReflectExecutors();
// Initialize: queue input to start executor (initial input is a string)
EnqueueMessage(messageQueues, plan.StartExecutorId, initialInput, typeof(string).FullName);
int superstep = 0;
bool haltRequested = false;
string? finalOutput = null;
while (superstep < MaxSupersteps && !haltRequested)
{
// Filter executors based on edge conditions from their predecessors
List<WorkflowExecutorInfo> eligibleExecutors = GetEligibleExecutors(level.Executors, results, plan, logger);
superstep++;
if (eligibleExecutors.Count == 0)
// Collect all executors with pending messages
List<string> activeExecutors = messageQueues
.Where(kv => kv.Value.Count > 0)
.Select(kv => kv.Key)
.ToList();
if (activeExecutors.Count == 0)
{
// No eligible executors at this level, continue to next level
continue;
break; // No more work
}
if (eligibleExecutors.Count == 1)
{
WorkflowExecutorInfo executorInfo = eligibleExecutors[0];
string input = GetExecutorInput(executorInfo.ExecutorId, initialInput, results, plan);
string rawResult = await this.ExecuteExecutorAsync(context, executorInfo, input, logger, customStatus, sharedState).ConfigureAwait(true);
results[executorInfo.ExecutorId] = UnwrapActivityResult(rawResult, customStatus, sharedState);
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates, expensive evaluation
logger.LogDebug("Superstep {Step}: {Count} active executor(s): {Executors}",
superstep, activeExecutors.Count, string.Join(", ", activeExecutors));
#pragma warning restore CA1848, CA1873
// Update custom status with any new events
UpdateCustomStatus(context, customStatus);
}
else
// Process each active executor
foreach (string executorId in activeExecutors)
{
// For parallel execution, each activity gets a snapshot of the current state
// State updates are merged after all activities complete
Task<(string Id, string Result)>[] tasks = new Task<(string Id, string Result)>[eligibleExecutors.Count];
for (int i = 0; i < eligibleExecutors.Count; i++)
Queue<(string Message, string? InputTypeName)> queue = messageQueues[executorId];
// Process all messages for this executor in this superstep
while (queue.Count > 0)
{
WorkflowExecutorInfo executorInfo = eligibleExecutors[i];
string input = GetExecutorInput(executorInfo.ExecutorId, initialInput, results, plan);
tasks[i] = this.ExecuteExecutorWithIdAsync(context, executorInfo, input, logger, customStatus, sharedState);
(string input, string? inputTypeName) = queue.Dequeue();
// Create executor info
WorkflowExecutorInfo executorInfo = CreateExecutorInfo(executorId, executorBindings);
// Execute the activity with type information
string rawResult = await this.ExecuteExecutorAsync(
context, executorInfo, input, inputTypeName, logger, customStatus, sharedState).ConfigureAwait(true);
(string result, List<SentMessageInfo> sentMessages) = UnwrapActivityResult(rawResult, customStatus, sharedState);
lastResults[executorId] = result;
// Check for explicit halt request (via RequestHaltAsync)
if (CheckForHalt(customStatus, executorId))
{
haltRequested = true;
finalOutput = result;
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates
logger.LogDebug("Halt requested by executor {ExecutorId}", executorId);
#pragma warning restore CA1848, CA1873
break;
}
// Route messages sent via SendMessageAsync (takes priority for void-returning executors)
if (sentMessages.Count > 0)
{
foreach (SentMessageInfo sentMessage in sentMessages)
{
if (!string.IsNullOrEmpty(sentMessage.Message))
{
// Route to successors with the sent message's type
RouteMessageToSuccessors(
executorId, sentMessage.Message, sentMessage.TypeName, plan, messageQueues, logger);
}
}
}
else if (!string.IsNullOrEmpty(result))
{
// Route executor's return value to successor executors via edges (for non-void executors)
RouteMessageToSuccessors(
executorId, result, plan, messageQueues, logger);
}
}
(string Id, string Result)[] completedTasks = await Task.WhenAll(tasks).ConfigureAwait(true);
foreach ((string id, string rawResult) in completedTasks)
if (haltRequested)
{
results[id] = UnwrapActivityResult(rawResult, customStatus, sharedState);
break;
}
// Update custom status with any new events
UpdateCustomStatus(context, customStatus);
}
UpdateCustomStatus(context, customStatus);
}
return GetFinalResult(plan, results);
if (superstep >= MaxSupersteps)
{
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates
logger.LogWarning("Workflow reached maximum superstep limit ({MaxSteps})", MaxSupersteps);
#pragma warning restore CA1848, CA1873
}
// Return final output or last result from output executors
return finalOutput ?? GetMessageDrivenFinalResult(workflow, lastResults, customStatus);
}
/// <summary>
/// Unwraps an activity result, extracting state updates, events, and returning the actual result.
/// Enqueues a message to an executor's message queue with type information.
/// </summary>
private static void EnqueueMessage(
Dictionary<string, Queue<(string Message, string? InputTypeName)>> queues,
string executorId,
string message,
string? inputTypeName)
{
if (!queues.TryGetValue(executorId, out Queue<(string, string?)>? queue))
{
queue = new Queue<(string, string?)>();
queues[executorId] = queue;
}
queue.Enqueue((message, inputTypeName));
}
/// <summary>
/// Creates a WorkflowExecutorInfo for the given executor ID.
/// </summary>
private static WorkflowExecutorInfo CreateExecutorInfo(
string executorId,
Dictionary<string, ExecutorBinding> executorBindings)
{
if (!executorBindings.TryGetValue(executorId, out ExecutorBinding? binding))
{
throw new InvalidOperationException($"Executor '{executorId}' not found in workflow bindings.");
}
bool isAgentic = WorkflowHelper.IsAgentExecutorType(binding.ExecutorType);
RequestPort? requestPort = (binding is RequestPortBinding rpb) ? rpb.Port : null;
return new WorkflowExecutorInfo(executorId, isAgentic, requestPort);
}
/// <summary>
/// Checks if the workflow should halt based on halt request events.
/// Note: YieldOutputAsync does NOT halt the workflow - it just yields intermediate output.
/// Only explicit RequestHaltAsync calls should halt the workflow.
/// </summary>
private static bool CheckForHalt(
DurableWorkflowCustomStatus customStatus,
string executorId)
{
// Look for explicit halt request events from this executor
for (int i = customStatus.Events.Count - 1; i >= 0; i--)
{
string eventJson = customStatus.Events[i];
// Check for DurableHaltRequestedEvent - this is the ONLY event that should halt
if (eventJson.Contains("DurableHaltRequestedEvent", StringComparison.Ordinal) &&
eventJson.Contains(executorId, StringComparison.Ordinal))
{
return true;
}
}
return false;
}
/// <summary>
/// Routes a message through edges to successor executors.
/// </summary>
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")]
private static void RouteMessageToSuccessors(
string sourceId,
string message,
WorkflowExecutionPlan plan,
Dictionary<string, Queue<(string Message, string? InputTypeName)>> messageQueues,
ILogger logger)
{
if (!plan.Successors.TryGetValue(sourceId, out List<string>? successors))
{
return; // No outgoing edges
}
// Get the output type of the source executor to pass as input type to successors
plan.ExecutorOutputTypes.TryGetValue(sourceId, out Type? sourceOutputType);
string? inputTypeName = sourceOutputType?.FullName;
foreach (string sinkId in successors)
{
// Check edge condition
if (plan.EdgeConditions.TryGetValue((sourceId, sinkId), out Func<object?, bool>? condition)
&& condition is not null)
{
try
{
// Deserialize the message for condition evaluation
object? messageObj = DeserializeForCondition(message, sourceOutputType);
if (!condition(messageObj))
{
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates
logger.LogDebug("Edge {Source} -> {Sink}: condition returned false, skipping",
sourceId, sinkId);
#pragma warning restore CA1848, CA1873
continue;
}
}
catch (Exception ex)
{
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates
logger.LogWarning(ex, "Failed to evaluate condition for edge {Source} -> {Sink}, skipping",
sourceId, sinkId);
#pragma warning restore CA1848, CA1873
continue;
}
}
// Queue message to successor with type information
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates
logger.LogDebug("Edge {Source} -> {Sink}: routing message", sourceId, sinkId);
#pragma warning restore CA1848, CA1873
EnqueueMessage(messageQueues, sinkId, message, inputTypeName);
}
}
/// <summary>
/// Routes a message through edges to successor executors, with an explicit type name for the message.
/// Used for messages sent via SendMessageAsync.
/// </summary>
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")]
[UnconditionalSuppressMessage("Trimming", "IL2057", Justification = "Type resolution for workflow message types.")]
private static void RouteMessageToSuccessors(
string sourceId,
string message,
string? explicitTypeName,
WorkflowExecutionPlan plan,
Dictionary<string, Queue<(string Message, string? InputTypeName)>> messageQueues,
ILogger logger)
{
if (!plan.Successors.TryGetValue(sourceId, out List<string>? successors))
{
return; // No outgoing edges
}
// Use explicit type name if provided, otherwise fall back to executor output type
string? inputTypeName = explicitTypeName;
Type? messageType = null;
if (!string.IsNullOrEmpty(explicitTypeName))
{
messageType = Type.GetType(explicitTypeName);
}
if (messageType is null && plan.ExecutorOutputTypes.TryGetValue(sourceId, out Type? sourceOutputType))
{
messageType = sourceOutputType;
inputTypeName = sourceOutputType?.FullName;
}
foreach (string sinkId in successors)
{
// Check edge condition
if (plan.EdgeConditions.TryGetValue((sourceId, sinkId), out Func<object?, bool>? condition)
&& condition is not null)
{
try
{
// Deserialize the message for condition evaluation
object? messageObj = DeserializeForCondition(message, messageType);
if (!condition(messageObj))
{
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates
logger.LogDebug("Edge {Source} -> {Sink}: condition returned false, skipping",
sourceId, sinkId);
#pragma warning restore CA1848, CA1873
continue;
}
}
catch (Exception ex)
{
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates
logger.LogWarning(ex, "Failed to evaluate condition for edge {Source} -> {Sink}, skipping",
sourceId, sinkId);
#pragma warning restore CA1848, CA1873
continue;
}
}
// Queue message to successor with type information
#pragma warning disable CA1848, CA1873 // Use LoggerMessage delegates
logger.LogDebug("Edge {Source} -> {Sink}: routing sent message (type: {TypeName})", sourceId, sinkId, inputTypeName);
#pragma warning restore CA1848, CA1873
EnqueueMessage(messageQueues, sinkId, message, inputTypeName);
}
}
/// <summary>
/// Gets the final result for a message-driven workflow execution.
/// Checks yielded outputs first (from YieldOutputAsync calls), then falls back to executor results.
/// </summary>
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing event types.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing event types.")]
private static string GetMessageDrivenFinalResult(
Workflow workflow,
Dictionary<string, string> lastResults,
DurableWorkflowCustomStatus customStatus)
{
// First, check for yielded outputs from YieldOutputAsync calls (most recent first)
// These take priority as they represent explicit workflow outputs
string? yieldedOutput = GetLastYieldedOutput(customStatus);
if (!string.IsNullOrEmpty(yieldedOutput))
{
return yieldedOutput;
}
HashSet<string> outputExecutors = workflow.ReflectOutputExecutors();
// If specific output executors are defined, use their results
if (outputExecutors.Count > 0)
{
List<string> outputResults = [];
foreach (string outputExecutorId in outputExecutors)
{
if (lastResults.TryGetValue(outputExecutorId, out string? result) && !string.IsNullOrEmpty(result))
{
outputResults.Add(result);
}
}
if (outputResults.Count > 0)
{
return outputResults.Count == 1
? outputResults[0]
: string.Join("\n---\n", outputResults);
}
}
// Otherwise, return the last non-empty result from any executor
return lastResults.Values.LastOrDefault(v => !string.IsNullOrEmpty(v)) ?? string.Empty;
}
/// <summary>
/// Extracts the most recent yielded output from the custom status events.
/// </summary>
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing event types.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing event types.")]
private static string? GetLastYieldedOutput(DurableWorkflowCustomStatus customStatus)
{
// Look for DurableYieldedOutputEvent in events (most recent first)
for (int i = customStatus.Events.Count - 1; i >= 0; i--)
{
string eventJson = customStatus.Events[i];
if (eventJson.Contains("DurableYieldedOutputEvent", StringComparison.Ordinal))
{
try
{
// Parse the wrapper to get the inner Data
using JsonDocument doc = JsonDocument.Parse(eventJson);
if (doc.RootElement.TryGetProperty("Data", out JsonElement dataElement))
{
string? dataJson = dataElement.GetString();
if (dataJson is not null)
{
using JsonDocument dataDoc = JsonDocument.Parse(dataJson);
if (dataDoc.RootElement.TryGetProperty("Output", out JsonElement outputElement))
{
// The output could be a string or a serialized object
return outputElement.ValueKind == JsonValueKind.String
? outputElement.GetString()
: outputElement.GetRawText();
}
}
}
}
catch (JsonException)
{
// Continue to next event if parsing fails
}
}
}
return null;
}
/// <summary>
/// Unwraps an activity result, extracting state updates, events, sent messages, and returning the actual result.
/// </summary>
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing known wrapper type.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing known wrapper type.")]
private static string UnwrapActivityResult(string rawResult, DurableWorkflowCustomStatus customStatus, Dictionary<string, string> sharedState)
private static (string Result, List<SentMessageInfo> SentMessages) UnwrapActivityResult(
string rawResult,
DurableWorkflowCustomStatus customStatus,
Dictionary<string, string> sharedState)
{
if (string.IsNullOrEmpty(rawResult))
{
return rawResult;
return (rawResult, []);
}
try
@@ -261,9 +626,9 @@ public class DurableWorkflowRunner
// Try to deserialize as DurableActivityOutput
DurableActivityOutput? output = JsonSerializer.Deserialize<DurableActivityOutput>(rawResult);
// Check if this is actually a DurableActivityOutput (has Result property set or state updates)
// Check if this is actually a DurableActivityOutput (has Result property set or state updates or sent messages)
// This distinguishes it from other JSON objects that would deserialize with default/empty values
if (output is not null && (output.Result is not null || output.StateUpdates.Count > 0 || output.ClearedScopes.Count > 0 || output.Events.Count > 0))
if (output is not null && (output.Result is not null || output.StateUpdates.Count > 0 || output.ClearedScopes.Count > 0 || output.Events.Count > 0 || output.SentMessages.Count > 0))
{
// Apply cleared scopes first
foreach (string clearedScope in output.ClearedScopes)
@@ -298,7 +663,7 @@ public class DurableWorkflowRunner
customStatus.Events.AddRange(output.Events);
}
return output.Result ?? string.Empty;
return (output.Result ?? string.Empty, output.SentMessages);
}
}
catch (JsonException)
@@ -306,7 +671,7 @@ public class DurableWorkflowRunner
// Not a wrapped result, return as-is
}
return rawResult;
return (rawResult, []);
}
/// <summary>
@@ -364,7 +729,7 @@ public class DurableWorkflowRunner
}
/// <summary>
/// Wrapper for activity input that includes shared state.
/// Wrapper for activity input that includes shared state and type information.
/// </summary>
internal sealed class ActivityInputWithState
{
@@ -373,84 +738,17 @@ public class DurableWorkflowRunner
/// </summary>
public string? Input { get; set; }
/// <summary>
/// Gets or sets the assembly-qualified type name of the input, used for proper deserialization.
/// </summary>
public string? InputTypeName { get; set; }
/// <summary>
/// Gets or sets the shared state dictionary.
/// </summary>
public Dictionary<string, string> State { get; set; } = [];
}
/// <summary>
/// Filters executors based on their incoming edge conditions.
/// An executor is eligible if all its incoming edges have conditions that evaluate to true,
/// or if the edges have no conditions.
/// </summary>
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types registered at startup.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types registered at startup.")]
private static List<WorkflowExecutorInfo> GetEligibleExecutors(
List<WorkflowExecutorInfo> executors,
Dictionary<string, string> results,
WorkflowExecutionPlan plan,
ILogger logger)
{
List<WorkflowExecutorInfo> eligible = new(executors.Count);
foreach (WorkflowExecutorInfo executorInfo in executors)
{
if (!plan.Predecessors.TryGetValue(executorInfo.ExecutorId, out List<string>? predecessors) || predecessors.Count == 0)
{
// Root executor (no predecessors) is always eligible
eligible.Add(executorInfo);
continue;
}
// Check if any predecessor's edge condition allows this executor to run
bool isEligible = false;
foreach (string predecessorId in predecessors)
{
// Get the condition for this edge (predecessor -> current executor)
if (!plan.EdgeConditions.TryGetValue((predecessorId, executorInfo.ExecutorId), out Func<object?, bool>? condition) || condition is null)
{
// No condition registered for this edge or edge has no condition, always eligible
isEligible = true;
break;
}
// Evaluate the condition using the predecessor's result
if (results.TryGetValue(predecessorId, out string? predecessorResult))
{
try
{
// Get the predecessor's output type for proper deserialization
plan.ExecutorOutputTypes.TryGetValue(predecessorId, out Type? predecessorOutputType);
// Deserialize the predecessor result to the expected type for condition evaluation
object? resultObject = DeserializeForCondition(predecessorResult, predecessorOutputType);
if (condition(resultObject))
{
isEligible = true;
break;
}
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to evaluate condition for edge from '{PredecessorId}' to '{ExecutorId}'", predecessorId, executorInfo.ExecutorId);
}
}
}
if (isEligible)
{
eligible.Add(executorInfo);
}
else
{
logger.LogExecutorSkipped(executorInfo.ExecutorId);
}
}
return eligible;
}
/// <summary>
/// Deserializes a JSON string result into an object for condition evaluation.
/// </summary>
@@ -479,24 +777,13 @@ public class DurableWorkflowRunner
}
}
private async Task<(string Id, string Result)> ExecuteExecutorWithIdAsync(
TaskOrchestrationContext context,
WorkflowExecutorInfo executorInfo,
string input,
ILogger logger,
DurableWorkflowCustomStatus customStatus,
Dictionary<string, string> sharedState)
{
string result = await this.ExecuteExecutorAsync(context, executorInfo, input, logger, customStatus, sharedState).ConfigureAwait(true);
return (executorInfo.ExecutorId, result);
}
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Serializing known wrapper type.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Serializing known wrapper type.")]
private async Task<string> ExecuteExecutorAsync(
TaskOrchestrationContext context,
WorkflowExecutorInfo executorInfo,
string input,
string? inputTypeName,
ILogger logger,
DurableWorkflowCustomStatus customStatus,
Dictionary<string, string> sharedState)
@@ -512,10 +799,11 @@ public class DurableWorkflowRunner
string executorName = WorkflowNamingHelper.GetExecutorName(executorInfo.ExecutorId);
string triggerName = WorkflowNamingHelper.ToOrchestrationFunctionName(executorName);
// Wrap input with shared state for the activity
// Wrap input with shared state and type information for the activity
ActivityInputWithState inputWithState = new()
{
Input = input,
InputTypeName = inputTypeName,
State = new Dictionary<string, string>(sharedState) // Pass a copy of the state
};
@@ -580,54 +868,4 @@ public class DurableWorkflowRunner
AgentResponse response = await agent.RunAsync(input, thread);
return response.Text;
}
private static string GetExecutorInput(
string executorId,
string initialInput,
Dictionary<string, string> results,
WorkflowExecutionPlan plan)
{
if (!plan.Predecessors.TryGetValue(executorId, out List<string>? predecessors) || predecessors.Count == 0)
{
return initialInput;
}
if (predecessors.Count == 1)
{
return results.TryGetValue(predecessors[0], out string? result) ? result : initialInput;
}
List<string> aggregated = new(predecessors.Count);
foreach (string predecessorId in predecessors)
{
if (results.TryGetValue(predecessorId, out string? result))
{
aggregated.Add(result);
}
}
return SerializeToJson(aggregated);
}
private static string GetFinalResult(WorkflowExecutionPlan plan, Dictionary<string, string> results)
{
WorkflowExecutionLevel lastLevel = plan.Levels[^1];
List<WorkflowExecutorInfo> lastExecutors = lastLevel.Executors;
if (lastExecutors.Count == 1)
{
return results.TryGetValue(lastExecutors[0].ExecutorId, out string? singleResult) ? singleResult : string.Empty;
}
List<string> finalResults = new(lastExecutors.Count);
foreach (WorkflowExecutorInfo executor in lastExecutors)
{
if (results.TryGetValue(executor.ExecutorId, out string? result))
{
finalResults.Add(result);
}
}
return string.Join("\n---\n", finalResults);
}
}
@@ -201,7 +201,8 @@ public static class DurableWorkflowServiceCollectionExtensions
// Create executor instance from binding
Executor executor = await binding.FactoryAsync!("activity-run").ConfigureAwait(false);
Type inputType = executor.InputTypes.FirstOrDefault() ?? typeof(string);
// Determine the input type - prefer the provided type name, fall back to first supported type
Type inputType = ResolveInputType(inputWithState?.InputTypeName, executor.InputTypes);
object typedInput = DeserializeInput(executorInput, inputType);
// Create a pipeline context that has access to shared state and executor
@@ -213,18 +214,52 @@ public static class DurableWorkflowServiceCollectionExtensions
workflowContext,
CancellationToken.None).ConfigureAwait(false);
// Always return wrapped output with state updates, events, and result
// Always return wrapped output with state updates, events, sent messages, and result
DurableActivityOutput output = new()
{
Result = SerializeResult(result),
StateUpdates = workflowContext.StateUpdates,
ClearedScopes = [.. workflowContext.ClearedScopes],
Events = workflowContext.Events.ConvertAll(SerializeEvent)
Events = workflowContext.Events.ConvertAll(SerializeEvent),
SentMessages = workflowContext.SentMessages.ConvertAll(m => new SentMessageInfo { Message = m.Message, TypeName = m.TypeName })
};
return JsonSerializer.Serialize(output);
}
/// <summary>
/// Resolves the input type from the provided type name, or falls back to the first supported type.
/// </summary>
[UnconditionalSuppressMessage("Trimming", "IL2026:RequiresUnreferencedCode", Justification = "Type resolution for registered executor types.")]
[UnconditionalSuppressMessage("Trimming", "IL2057:TypeGetType", Justification = "Type resolution for registered executor types.")]
[UnconditionalSuppressMessage("AOT", "IL3050:RequiresDynamicCode", Justification = "Type resolution for registered executor types.")]
private static Type ResolveInputType(string? inputTypeName, ISet<Type> supportedTypes)
{
if (!string.IsNullOrEmpty(inputTypeName))
{
// Try to find a matching type in the supported types
foreach (Type supportedType in supportedTypes)
{
if (supportedType.AssemblyQualifiedName == inputTypeName ||
supportedType.FullName == inputTypeName ||
supportedType.Name == inputTypeName)
{
return supportedType;
}
}
// Try to load the type directly (for types not in supported types)
Type? resolvedType = Type.GetType(inputTypeName);
if (resolvedType is not null)
{
return resolvedType;
}
}
// Fall back to first supported type or string
return supportedTypes.FirstOrDefault() ?? typeof(string);
}
[UnconditionalSuppressMessage("Trimming", "IL2026:RequiresUnreferencedCode", Justification = "Deserializing known wrapper type.")]
[UnconditionalSuppressMessage("AOT", "IL3050:RequiresDynamicCode", Justification = "Deserializing known wrapper type.")]
private static DurableActivityInput? TryDeserializeActivityInput(string input)
@@ -356,6 +391,11 @@ internal sealed class DurableActivityInput
/// </summary>
public string? Input { get; set; }
/// <summary>
/// Gets or sets the assembly-qualified type name of the input, used for proper deserialization.
/// </summary>
public string? InputTypeName { get; set; }
/// <summary>
/// Gets or sets the shared state dictionary (scope-prefixed key -> serialized value).
/// </summary>
@@ -386,4 +426,26 @@ internal sealed class DurableActivityOutput
/// Gets or sets the serialized workflow events emitted during activity execution.
/// </summary>
public List<string> Events { get; set; } = [];
/// <summary>
/// Gets or sets messages sent via SendMessageAsync during activity execution.
/// Each entry is a tuple of (serializedMessage, typeName).
/// </summary>
public List<SentMessageInfo> SentMessages { get; set; } = [];
}
/// <summary>
/// Information about a message sent via SendMessageAsync.
/// </summary>
internal sealed class SentMessageInfo
{
/// <summary>
/// Gets or sets the serialized message content.
/// </summary>
public string? Message { get; set; }
/// <summary>
/// Gets or sets the full type name of the message.
/// </summary>
public string? TypeName { get; set; }
}
@@ -41,6 +41,12 @@ internal sealed class PipelineActivityContext : IWorkflowContext
/// </summary>
public HashSet<string> ClearedScopes { get; } = [];
/// <summary>
/// Gets the messages sent during activity execution via SendMessageAsync.
/// Each entry is (serializedMessage, typeName).
/// </summary>
public List<(string Message, string TypeName)> SentMessages { get; } = [];
/// <inheritdoc/>
public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default)
{
@@ -53,7 +59,20 @@ internal sealed class PipelineActivityContext : IWorkflowContext
}
/// <inheritdoc/>
public ValueTask SendMessageAsync(object message, string? targetId = null, CancellationToken cancellationToken = default) => default;
[UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Serializing workflow message types.")]
[UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Serializing workflow message types.")]
public ValueTask SendMessageAsync(object message, string? targetId = null, CancellationToken cancellationToken = default)
{
if (message is not null)
{
// Capture the message and its type for routing in the orchestrator
string serializedMessage = JsonSerializer.Serialize(message, message.GetType());
string typeName = message.GetType().FullName ?? message.GetType().Name;
this.SentMessages.Add((serializedMessage, typeName));
}
return default;
}
/// <inheritdoc/>
public ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default)
@@ -42,4 +42,20 @@ public sealed class WorkflowExecutionPlan
/// Gets whether this workflow has any Fan-In points.
/// </summary>
public bool HasFanIn => this.Levels.Any(l => l.IsFanIn);
/// <summary>
/// Gets or sets whether this workflow contains cycles requiring iterative message-driven execution.
/// </summary>
public bool HasCycles { get; set; }
/// <summary>
/// Gets the back-edges that create cycles in the workflow graph.
/// These edges are excluded from topological level computation but are followed during message-driven execution.
/// </summary>
public List<(string SourceId, string TargetId)> BackEdges { get; } = [];
/// <summary>
/// Gets or sets the starting executor ID for the workflow.
/// </summary>
public string StartExecutorId { get; set; } = string.Empty;
}
@@ -56,6 +56,7 @@ public static class WorkflowHelper
/// Analyzes the workflow and returns an execution plan that supports Fan-Out/Fan-In patterns.
/// Executors at the same level can be executed in parallel (Fan-Out).
/// Fan-In points are identified where multiple executors converge.
/// Cyclic workflows are detected and marked for message-driven execution.
/// </summary>
/// <param name="workflow">The workflow instance to analyze.</param>
/// <returns>An execution plan with parallel execution levels.</returns>
@@ -67,12 +68,14 @@ public static class WorkflowHelper
Dictionary<string, HashSet<EdgeInfo>> edges = workflow.ReflectEdges();
Dictionary<(string SourceId, string TargetId), Func<object?, bool>?> edgeConditions = workflow.ReflectEdgeConditions();
WorkflowExecutionPlan plan = new();
WorkflowExecutionPlan plan = new()
{
StartExecutorId = workflow.StartExecutorId
};
// Build adjacency lists (successors and predecessors)
Dictionary<string, List<string>> successors = new(executors.Count);
Dictionary<string, List<string>> predecessors = new(executors.Count);
int[] inDegree = new int[executors.Count];
Dictionary<string, int> executorIndex = new(executors.Count);
// Initialize all executors and extract their output types
@@ -97,16 +100,39 @@ public static class WorkflowHelper
{
foreach (string sinkId in edge.Connection.SinkIds)
{
if (executorIndex.TryGetValue(sinkId, out int sinkIdx))
if (executorIndex.ContainsKey(sinkId))
{
sourceSuccessors.Add(sinkId);
predecessors[sinkId].Add(sourceId);
inDegree[sinkIdx]++;
}
}
}
}
// Detect back-edges using DFS from start executor
HashSet<(string Source, string Target)> backEdges = DetectBackEdges(workflow.StartExecutorId, successors);
// Mark cycles in plan
plan.HasCycles = backEdges.Count > 0;
foreach ((string source, string target) in backEdges)
{
plan.BackEdges.Add((source, target));
}
// Calculate in-degrees, EXCLUDING back-edges
int[] inDegree = new int[executors.Count];
foreach (string executorId in executors.Keys)
{
foreach (string pred in predecessors[executorId])
{
// Only count edge if it's NOT a back-edge
if (!backEdges.Contains((pred, executorId)))
{
inDegree[executorIndex[executorId]]++;
}
}
}
// Store edge conditions in the plan
foreach (KeyValuePair<(string SourceId, string TargetId), Func<object?, bool>?> condition in edgeConditions)
{
@@ -150,15 +176,23 @@ public static class WorkflowHelper
RequestPort? requestPort = (executorBinding is RequestPortBinding rpb) ? rpb.Port : null;
levelExecutors.Add(new WorkflowExecutorInfo(executorId, isAgentic, requestPort));
// Check Fan-In for this executor
if (predecessors[executorId].Count > 1)
// Check Fan-In for this executor (excluding back-edges)
int nonBackEdgePredecessors = predecessors[executorId]
.Count(pred => !backEdges.Contains((pred, executorId)));
if (nonBackEdgePredecessors > 1)
{
isFanIn = true;
}
// Decrement in-degree of all successors and enqueue those ready for next level
// Decrement in-degree of all successors (excluding back-edges) and enqueue those ready for next level
foreach (string successor in successors[executorId])
{
// Skip back-edges for topological ordering
if (backEdges.Contains((executorId, successor)))
{
continue;
}
int successorIdx = executorIndex[successor];
if (--inDegree[successorIdx] == 0)
{
@@ -172,7 +206,7 @@ public static class WorkflowHelper
currentLevel = nextLevel;
}
// Handle cycle detection: if not all executors were processed, there's a cycle
// Handle any remaining executors not processed (shouldn't happen if back-edge detection is correct)
if (processedCount < executors.Count)
{
List<WorkflowExecutorInfo> remainingExecutors = [];
@@ -196,6 +230,60 @@ public static class WorkflowHelper
return plan;
}
/// <summary>
/// Detects back-edges in the graph using DFS.
/// A back-edge is an edge that points to an ancestor in the DFS tree (creates a cycle).
/// </summary>
/// <param name="startId">The starting executor ID for DFS traversal.</param>
/// <param name="successors">The adjacency list mapping each executor to its successors.</param>
/// <returns>A set of back-edges as (source, target) tuples.</returns>
private static HashSet<(string Source, string Target)> DetectBackEdges(
string startId,
Dictionary<string, List<string>> successors)
{
HashSet<(string, string)> backEdges = [];
HashSet<string> visited = [];
HashSet<string> inStack = []; // Nodes in current DFS path
void Dfs(string nodeId)
{
visited.Add(nodeId);
inStack.Add(nodeId);
if (successors.TryGetValue(nodeId, out List<string>? neighbors))
{
foreach (string neighbor in neighbors)
{
if (inStack.Contains(neighbor))
{
// Edge to ancestor in current path = back-edge (creates cycle)
backEdges.Add((nodeId, neighbor));
}
else if (!visited.Contains(neighbor))
{
Dfs(neighbor);
}
}
}
inStack.Remove(nodeId);
}
// Start DFS from the workflow's start executor
Dfs(startId);
// Handle any disconnected components (shouldn't happen in valid workflows, but for safety)
foreach (string nodeId in successors.Keys)
{
if (!visited.Contains(nodeId))
{
Dfs(nodeId);
}
}
return backEdges;
}
/// <summary>
/// Determines whether the specified executor type is an agentic executor.
/// </summary>
@@ -45,7 +45,6 @@ public class Workflow
/// <returns>A dictionary where each key is a tuple containing the source and target node identifiers, and each value is a
/// delegate that evaluates the condition for the corresponding edge. The value is <see langword="null"/> if the
/// edge has no associated condition.</returns>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1024:Use properties where appropriate", Justification = "<Pending>")]
public Dictionary<(string SourceId, string TargetId), Func<object?, bool>?> ReflectEdgeConditions()
{
int capacity = 0;