// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.Execution;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

/// <summary>
/// Tests for <see cref="DirectEdgeRunner"/>, <see cref="FanOutEdgeRunner"/> and <see cref="FanInEdgeRunner"/>,
/// checking which messages each runner maps to a delivery when an edge is chased.
/// </summary>
public class EdgeRunnerTests
{
    /// <summary>
    /// Chases a direct edge from "executor1" to "executor2" with a single "test" message sent by "executor1"
    /// and asserts whether a delivery to "executor2" is produced.
    /// </summary>
    /// <param name="conditionMatch">
    /// <see langword="null"/>: the edge has no condition; <see langword="true"/>: the condition accepts the
    /// sent message; <see langword="false"/>: the condition only accepts a different string.
    /// </param>
    /// <param name="targetMatch">
    /// <see langword="null"/>: the message is untargeted; <see langword="true"/>: it is targeted at "executor2"
    /// (the edge's sink); <see langword="false"/>: it is targeted at "executor1".
    /// </param>
    private static async Task CreateAndRunDirectedEdgeTestAsync(bool? conditionMatch = null, bool? targetMatch = null)
    {
        const string MessageVariant1 = "test";
        const string MessageVariant2 = "something else";

        // The envelope below always carries MessageVariant1, so comparing against MessageVariant2
        // yields a condition that rejects it.
        Func<object?, bool>? condition = conditionMatch.HasValue
            ? message => message is string value && value.Equals(conditionMatch.Value ? MessageVariant1 : MessageVariant2, StringComparison.Ordinal)
            : null;

        string? targetId = targetMatch.HasValue
            ? (targetMatch.Value ? "executor2" : "executor1")
            : null;

        TestRunContext runContext = new();
        runContext.ConfigureExecutors(
        [
            new ForwardMessageExecutor("executor1"),
            new ForwardMessageExecutor("executor2")
        ]);

        DirectEdgeData edgeData = new("executor1", "executor2", new EdgeId(0), condition);
        DirectEdgeRunner runner = new(runContext, edgeData);

        MessageEnvelope envelope = new(MessageVariant1, "executor1", targetId: targetId);

        DeliveryMapping? mapping = await runner.ChaseEdgeAsync(envelope, stepTracer: null, CancellationToken.None);

        // A delivery is expected only when neither the condition nor the target rules the message out.
        bool expectMessage = (!conditionMatch.HasValue || conditionMatch.Value)
                          && (!targetMatch.HasValue || targetMatch.Value);

        if (expectMessage)
        {
            mapping.Should().NotBeNull();
            mapping.CheckDeliveries(["executor2"], [MessageVariant1]);
        }
        else
        {
            mapping.Should().BeNull();
        }
    }

    [Fact]
    public async Task Test_DirectEdgeRunnerAsync()
    {
        // Test matrix:
        //   NoCondition vs Condition(=> true) vs Condition(=> false)
        //   Untargeted vs Targeted(matching) vs Targeted(not matching)
        await CreateAndRunDirectedEdgeTestAsync(); // NoCondition, Untargeted
        await CreateAndRunDirectedEdgeTestAsync(targetMatch: true); // NoCondition, Targeted(matching)
        await CreateAndRunDirectedEdgeTestAsync(targetMatch: false); // NoCondition, Targeted(not matching)

        await CreateAndRunDirectedEdgeTestAsync(conditionMatch: true); // Condition(=> true), Untargeted
        await CreateAndRunDirectedEdgeTestAsync(conditionMatch: false); // Condition(=> false), Untargeted

        await CreateAndRunDirectedEdgeTestAsync(conditionMatch: true, targetMatch: true); // Condition(=> true), Targeted(matching)
        await CreateAndRunDirectedEdgeTestAsync(conditionMatch: true, targetMatch: false); // Condition(=> true), Targeted(not matching)
        await CreateAndRunDirectedEdgeTestAsync(conditionMatch: false, targetMatch: true); // Condition(=> false), Targeted(matching)
        await CreateAndRunDirectedEdgeTestAsync(conditionMatch: false, targetMatch: false); // Condition(=> false), Targeted(not matching)
    }

    /// <summary>
    /// Chases a fan-out edge from "executor1" to "executor2" and "executor3" with a single "test" message
    /// sent by "executor1" and asserts which sinks, if any, receive a delivery.
    /// </summary>
    /// <param name="assignerSelectsEmpty">
    /// <see langword="null"/>: the edge has no assigner; <see langword="true"/>: the assigner returns no
    /// indices; <see langword="false"/>: the assigner returns index 0 only.
    /// </param>
    /// <param name="targetMatch">
    /// <see langword="null"/>: the message is untargeted; <see langword="true"/>: it is targeted at "executor2";
    /// <see langword="false"/>: it is targeted at "executor1", which is not one of the edge's sinks.
    /// </param>
    private static async Task CreateAndRunFanOutEdgeTestAsync(bool? assignerSelectsEmpty = null, bool? targetMatch = null)
    {
        TestRunContext runContext = new();
        runContext.ConfigureExecutors(
        [
            new ForwardMessageExecutor("executor1"),
            new ForwardMessageExecutor("executor2"),
            new ForwardMessageExecutor("executor3")
        ]);

        Func<object?, int, IEnumerable<int>>? assigner = assignerSelectsEmpty.HasValue
            ? (message, count) => assignerSelectsEmpty.Value ? [] : [0]
            : null;

        string? targetId = targetMatch.HasValue
            ? (targetMatch.Value ? "executor2" : "executor1")
            : null;

        FanOutEdgeData edgeData = new("executor1", ["executor2", "executor3"], new EdgeId(0), assigner);
        FanOutEdgeRunner runner = new(runContext, edgeData);

        MessageEnvelope envelope = new("test", "executor1", targetId: targetId);

        DeliveryMapping? mapping = await runner.ChaseEdgeAsync(envelope, stepTracer: null, CancellationToken.None);

        bool expectForwardFrom2 = (!assignerSelectsEmpty.HasValue || !assignerSelectsEmpty.Value)
                               && (!targetMatch.HasValue || targetMatch.Value);

        // When an assigner is present it never returns index 1, and when there is a target
        // it is never executor3, so executor3 only receives in the unfiltered case.
        bool expectForwardFrom3 = !assignerSelectsEmpty.HasValue && !targetMatch.HasValue;

        HashSet<string> expectedReceivers = [];
        if (expectForwardFrom2)
        {
            expectedReceivers.Add("executor2");
        }

        if (expectForwardFrom3)
        {
            expectedReceivers.Add("executor3");
        }

        if (!expectForwardFrom2 && !expectForwardFrom3)
        {
            mapping.Should().BeNull();
        }
        else
        {
            mapping.Should().NotBeNull();
            mapping.CheckDeliveries(expectedReceivers, ["test"]);
        }
    }

    [Fact]
    public async Task Test_FanOutEdgeRunnerAsync()
    {
        // Test matrix:
        //   NoAssigner vs Assigner(includes output) vs Assigner(does not include output)
        //   Untargeted vs Targeted(matching) vs Targeted(not matching)
        await CreateAndRunFanOutEdgeTestAsync(); // NoAssigner, Untargeted
        await CreateAndRunFanOutEdgeTestAsync(targetMatch: true); // NoAssigner, Targeted(matching)
        await CreateAndRunFanOutEdgeTestAsync(targetMatch: false); // NoAssigner, Targeted(not matching)

        await CreateAndRunFanOutEdgeTestAsync(assignerSelectsEmpty: false); // Assigner(includes output), Untargeted
        await CreateAndRunFanOutEdgeTestAsync(assignerSelectsEmpty: true); // Assigner(does not include output), Untargeted

        await CreateAndRunFanOutEdgeTestAsync(assignerSelectsEmpty: false, targetMatch: true); // Assigner(includes output), Targeted(matching)
        await CreateAndRunFanOutEdgeTestAsync(assignerSelectsEmpty: false, targetMatch: false); // Assigner(includes output), Targeted(not matching)
        await CreateAndRunFanOutEdgeTestAsync(assignerSelectsEmpty: true, targetMatch: true); // Assigner(does not include output), Targeted(matching)
        await CreateAndRunFanOutEdgeTestAsync(assignerSelectsEmpty: true, targetMatch: false); // Assigner(does not include output), Targeted(not matching)
    }

    [Fact]
    public async Task Test_FanInEdgeRunnerAsync()
    {
        TestRunContext runContext = new();
        runContext.ConfigureExecutors(
        [
            new ForwardMessageExecutor("executor1"),
            new ForwardMessageExecutor("executor2"),
            new ForwardMessageExecutor("executor3")
        ]);

        FanInEdgeData edgeData = new(["executor1", "executor2"], "executor3", new EdgeId(0), null);
        FanInEdgeRunner runner = new(runContext, edgeData);

        // Step 1: Send message from executor1, should not forward yet.
        // Step 2: Send targeted message to executor1 from executor2, should not forward.
        // Step 3: Send message from executor1 targeted to executor3, should not forward yet.
        // Step 4: Send message from executor2, should forward now.
        await RunIterationAsync();

        // Repeat the same sequence, to ensure state is properly reset inside of FanInEdgeState.
        runContext.QueuedMessages.Clear();
        await RunIterationAsync();

        async ValueTask RunIterationAsync()
        {
            // Step 1
            DeliveryMapping? mapping = await runner.ChaseEdgeAsync(new("part1", "executor1"), stepTracer: null, CancellationToken.None);
            mapping.Should().BeNull();

            // Step 2: targeted at executor1, so "part-for-1" is not expected in the final batch.
            mapping = await runner.ChaseEdgeAsync(new("part-for-1", "executor2", targetId: "executor1"), stepTracer: null, CancellationToken.None);
            mapping.Should().BeNull();

            // Step 3
            mapping = await runner.ChaseEdgeAsync(new("part2", "executor1", targetId: "executor3"), stepTracer: null, CancellationToken.None);
            mapping.Should().BeNull();

            // Step 4
            mapping = await runner.ChaseEdgeAsync(new("final part", "executor2"), stepTracer: null, CancellationToken.None);
            mapping.Should().NotBeNull();
            mapping.CheckDeliveries(["executor3"], ["part1", "part2", "final part"]);
        }
    }

    [Fact]
    public async Task Test_FanInEdgeRunner_ConcurrentProcessingAsync()
    {
        // Arrange
        const int SourceCount = 4;
        const int Iterations = 50;
        const string SinkId = "sink";

        string[] sourceIds = Enumerable.Range(0, SourceCount).Select(i => $"source{i}").ToArray();

        TestRunContext runContext = new();
        List<Executor> executors =
        [
            .. sourceIds.Select(id => (Executor)new ForwardMessageExecutor(id)),
            new ForwardMessageExecutor(SinkId)
        ];
        runContext.ConfigureExecutors(executors);

        FanInEdgeData edgeData = new(sourceIds.ToList(), SinkId, new EdgeId(0), null);
        FanInEdgeRunner runner = new(runContext, edgeData);

        for (int iteration = 0; iteration < Iterations; iteration++)
        {
            // Act: send messages from all sources concurrently.
            // The barrier holds every task until all SourceCount tasks have reached it,
            // so the ChaseEdgeAsync calls start as close together as possible.
            using Barrier barrier = new(SourceCount);
            Task<DeliveryMapping?>[] tasks = sourceIds.Select(sourceId => Task.Run(async () =>
            {
                barrier.SignalAndWait();
                return await runner.ChaseEdgeAsync(new($"msg-from-{sourceId}", sourceId), stepTracer: null, CancellationToken.None);
            })).ToArray();

            DeliveryMapping?[] results = await Task.WhenAll(tasks);

            // Assert: exactly one task should return a non-null mapping with all messages
            DeliveryMapping?[] nonNullResults = results.Where(r => r is not null).ToArray();
            nonNullResults.Should().HaveCount(1, $"iteration {iteration}: exactly one thread should release the batch");

            DeliveryMapping mapping = nonNullResults[0]!;
            HashSet<object> expectedMessages = [.. sourceIds.Select(id => (object)$"msg-from-{id}")];
            mapping.CheckDeliveries([SinkId], expectedMessages);
        }
    }
}