mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
c74b1b08eb
* Fix race condition issue in FanInEdge while processing messages. * Refactor to limit the code segment held under the lock. * Remove extra materialization of the result. * Add a comment clarifying the changes needed if message processing is made async.
243 lines
11 KiB
C#
243 lines
11 KiB
C#
// Copyright (c) Microsoft. All rights reserved.
|
|
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using FluentAssertions;
|
|
using Microsoft.Agents.AI.Workflows.Execution;
|
|
|
|
namespace Microsoft.Agents.AI.Workflows.UnitTests;
|
|
|
|
public class EdgeRunnerTests
|
|
{
|
|
/// <summary>
/// Sends a single message across a direct edge from "executor1" to "executor2" and asserts
/// whether the edge runner reports a delivery for it.
/// </summary>
/// <param name="conditionMatch">
/// <see langword="null"/> builds the edge without a condition; <see langword="true"/> installs a
/// condition that accepts the message being sent; <see langword="false"/> installs one that rejects it.
/// </param>
/// <param name="targetMatch">
/// <see langword="null"/> sends the message untargeted; <see langword="true"/> targets it at
/// "executor2" (the edge's sink); <see langword="false"/> targets it at "executor1".
/// </param>
private static async Task CreateAndRunDirectedEdgeTestAsync(bool? conditionMatch = null, bool? targetMatch = null)
{
    const string Payload = "test";
    const string UnsentPayload = "something else";

    // The condition is an ordinal comparison against a fixed text. Payload is what actually gets
    // sent, so comparing against UnsentPayload yields a condition that always rejects the message.
    Func<object?, bool>? condition = null;
    if (conditionMatch is bool conditionAccepts)
    {
        string acceptedText = conditionAccepts ? Payload : UnsentPayload;
        condition = message => message is string text && text.Equals(acceptedText, StringComparison.Ordinal);
    }

    string? targetId = targetMatch switch
    {
        true => "executor2",
        false => "executor1",
        null => null,
    };

    TestRunContext runContext = new();
    runContext.ConfigureExecutors(
    [
        new ForwardMessageExecutor<string>("executor1"),
        new ForwardMessageExecutor<string>("executor2")
    ]);

    DirectEdgeRunner runner = new(runContext, new DirectEdgeData("executor1", "executor2", new EdgeId(0), condition));
    MessageEnvelope envelope = new(Payload, "executor1", targetId: targetId);

    DeliveryMapping? mapping = await runner.ChaseEdgeAsync(envelope, stepTracer: null, CancellationToken.None);

    // A delivery is expected unless the condition rejects the message or the target is not the sink.
    bool shouldDeliver = conditionMatch is not false && targetMatch is not false;
    if (!shouldDeliver)
    {
        mapping.Should().BeNull();
        return;
    }

    mapping.Should().NotBeNull();
    mapping.CheckDeliveries(["executor2"], [Payload]);
}
|
|
|
|
/// <summary>
/// Runs the direct-edge helper over the full 3x3 matrix of condition and target settings.
/// </summary>
[Fact]
public async Task Test_DirectEdgeRunnerAsync()
{
    // Matrix axes:
    //   condition: none (null) / accepts the message (true) / rejects the message (false)
    //   target:    untargeted (null) / matching (true) / not matching (false)
    (bool? ConditionMatch, bool? TargetMatch)[] scenarios =
    [
        (null, null),   // NoCondition, Untargeted

        (null, true),   // NoCondition, Targeted(matching)
        (null, false),  // NoCondition, Targeted(not matching)

        (true, null),   // Condition(=> true), Untargeted
        (false, null),  // Condition(=> false), Untargeted

        (true, true),   // Condition(=> true), Targeted(matching)
        (true, false),  // Condition(=> true), Targeted(not matching)
        (false, true),  // Condition(=> false), Targeted(matching)
        (false, false), // Condition(=> false), Targeted(not matching)
    ];

    // Scenarios run one after another, in the order listed above.
    foreach ((bool? conditionMatch, bool? targetMatch) in scenarios)
    {
        await CreateAndRunDirectedEdgeTestAsync(conditionMatch: conditionMatch, targetMatch: targetMatch);
    }
}
|
|
|
|
/// <summary>
/// Sends a single message across a fan-out edge from "executor1" to "executor2" and "executor3"
/// and asserts which of the two sinks the edge runner reports deliveries for.
/// </summary>
/// <param name="assignerSelectsEmpty">
/// <see langword="null"/> builds the edge without an assigner; <see langword="true"/> installs an
/// assigner that selects nothing; <see langword="false"/> installs one that selects only index 0.
/// </param>
/// <param name="targetMatch">
/// <see langword="null"/> sends the message untargeted; <see langword="true"/> targets it at
/// "executor2"; <see langword="false"/> targets it at "executor1".
/// </param>
private static async Task CreateAndRunFanOutEdgeTestAsync(bool? assignerSelectsEmpty = null, bool? targetMatch = null)
{
    Func<object?, int, IEnumerable<int>>? assigner = null;
    if (assignerSelectsEmpty is bool selectsNothing)
    {
        // Index 0 is expected by the assertions below to correspond to "executor2".
        assigner = (_, _) => selectsNothing ? [] : [0];
    }

    string? targetId = targetMatch switch
    {
        true => "executor2",
        false => "executor1",
        null => null,
    };

    TestRunContext runContext = new();
    runContext.ConfigureExecutors(
    [
        new ForwardMessageExecutor<string>("executor1"),
        new ForwardMessageExecutor<string>("executor2"),
        new ForwardMessageExecutor<string>("executor3")
    ]);

    FanOutEdgeRunner runner = new(runContext, new FanOutEdgeData("executor1", ["executor2", "executor3"], new EdgeId(0), assigner));
    MessageEnvelope envelope = new("test", "executor1", targetId: targetId);

    DeliveryMapping? mapping = await runner.ChaseEdgeAsync(envelope, stepTracer: null, CancellationToken.None);

    // "executor2" is expected unless the assigner selects nothing or the target points elsewhere.
    bool expectExecutor2 = assignerSelectsEmpty is not true && targetMatch is not false;

    // "executor3" is only expected with no assigner and no target: the assigner never selects it,
    // and when a target is set it is never "executor3".
    bool expectExecutor3 = assignerSelectsEmpty is null && targetMatch is null;

    HashSet<string> expectedReceivers = [];
    if (expectExecutor2)
    {
        expectedReceivers.Add("executor2");
    }

    if (expectExecutor3)
    {
        expectedReceivers.Add("executor3");
    }

    if (expectedReceivers.Count == 0)
    {
        mapping.Should().BeNull();
        return;
    }

    mapping.Should().NotBeNull();
    mapping.CheckDeliveries(expectedReceivers, ["test"]);
}
|
|
|
|
/// <summary>
/// Runs the fan-out helper over the full 3x3 matrix of assigner and target settings.
/// </summary>
[Fact]
public async Task Test_FanOutEdgeRunnerAsync()
{
    // Matrix axes:
    //   assigner: none (null) / includes output (false) / does not include output (true)
    //   target:   untargeted (null) / matching (true) / not matching (false)
    (bool? AssignerSelectsEmpty, bool? TargetMatch)[] scenarios =
    [
        (null, null),   // NoAssigner, Untargeted

        (null, true),   // NoAssigner, Targeted(matching)
        (null, false),  // NoAssigner, Targeted(not matching)

        (false, null),  // Assigner(includes output), Untargeted
        (true, null),   // Assigner(does not include output), Untargeted

        (false, true),  // Assigner(includes output), Targeted(matching)
        (false, false), // Assigner(includes output), Targeted(not matching)
        (true, true),   // Assigner(does not include output), Targeted(matching)
        (true, false),  // Assigner(does not include output), Targeted(not matching)
    ];

    // Scenarios run one after another, in the order listed above.
    foreach ((bool? assignerSelectsEmpty, bool? targetMatch) in scenarios)
    {
        await CreateAndRunFanOutEdgeTestAsync(assignerSelectsEmpty: assignerSelectsEmpty, targetMatch: targetMatch);
    }
}
|
|
|
|
/// <summary>
/// Drives a fan-in edge ("executor1" + "executor2" into "executor3") through a fixed four-message
/// sequence, twice, asserting that only the last message of each pass produces a delivery.
/// </summary>
[Fact]
public async Task Test_FanInEdgeRunnerAsync()
{
    TestRunContext runContext = new();
    runContext.ConfigureExecutors(
    [
        new ForwardMessageExecutor<string>("executor1"),
        new ForwardMessageExecutor<string>("executor2"),
        new ForwardMessageExecutor<string>("executor3")
    ]);

    FanInEdgeRunner runner = new(runContext, new FanInEdgeData(["executor1", "executor2"], "executor3", new EdgeId(0), null));

    // One pass over the message sequence; every step asserts on the mapping returned for that message.
    async ValueTask RunIterationAsync()
    {
        // Step 1: untargeted message from executor1 - nothing is expected to be forwarded yet.
        DeliveryMapping? afterFirstSource = await runner.ChaseEdgeAsync(new("part1", "executor1"), stepTracer: null, CancellationToken.None);
        afterFirstSource.Should().BeNull();

        // Step 2: message from executor2 that is targeted at executor1 rather than at the sink -
        // expected not to forward anything ("part-for-1" is also absent from the batch expected in step 4).
        DeliveryMapping? afterMistargeted = await runner.ChaseEdgeAsync(new("part-for-1", "executor2", targetId: "executor1"), stepTracer: null, CancellationToken.None);
        afterMistargeted.Should().BeNull();

        // Step 3: second message from executor1, this time targeted at executor3 - still nothing forwarded.
        DeliveryMapping? afterSecondFromFirstSource = await runner.ChaseEdgeAsync(new("part2", "executor1", targetId: "executor3"), stepTracer: null, CancellationToken.None);
        afterSecondFromFirstSource.Should().BeNull();

        // Step 4: untargeted message from executor2 - the batch is now expected to go to executor3.
        DeliveryMapping? released = await runner.ChaseEdgeAsync(new("final part", "executor2"), stepTracer: null, CancellationToken.None);
        released.Should().NotBeNull();
        released.CheckDeliveries(["executor3"], ["part1", "part2", "final part"]);
    }

    await RunIterationAsync();

    // Run the identical sequence a second time on the same runner, to ensure its fan-in state
    // was properly reset after the first batch was released.
    runContext.QueuedMessages.Clear();
    await RunIterationAsync();
}
|
|
|
|
/// <summary>
/// Sends one message from every source of a fan-in edge at the same time, repeatedly, and asserts
/// that each round yields exactly one non-null mapping, checked against the sink and all messages of that round.
/// </summary>
[Fact]
public async Task Test_FanInEdgeRunner_ConcurrentProcessingAsync()
{
    // Arrange
    const int SourceCount = 4;
    const int Iterations = 50;
    const string SinkId = "sink";

    string[] sourceIds = [.. Enumerable.Range(0, SourceCount).Select(index => $"source{index}")];

    TestRunContext runContext = new();

    List<Executor> executors = [];
    foreach (string executorId in sourceIds)
    {
        executors.Add(new ForwardMessageExecutor<string>(executorId));
    }

    executors.Add(new ForwardMessageExecutor<string>(SinkId));
    runContext.ConfigureExecutors(executors);

    FanInEdgeRunner runner = new(runContext, new FanInEdgeData(sourceIds.ToList(), SinkId, new EdgeId(0), null));

    for (int iteration = 0; iteration < Iterations; iteration++)
    {
        // Act: every sender waits on the barrier first, so the ChaseEdgeAsync calls start together.
        using Barrier startGate = new(SourceCount);

        Task<DeliveryMapping?>[] sends = new Task<DeliveryMapping?>[SourceCount];
        for (int index = 0; index < SourceCount; index++)
        {
            string sourceId = sourceIds[index];
            sends[index] = Task.Run(async () =>
            {
                startGate.SignalAndWait();
                return await runner.ChaseEdgeAsync(new($"msg-from-{sourceId}", sourceId), stepTracer: null, CancellationToken.None);
            });
        }

        DeliveryMapping?[] outcomes = await Task.WhenAll(sends);

        // Assert: exactly one sender should get a non-null mapping back.
        DeliveryMapping[] released = outcomes.OfType<DeliveryMapping>().ToArray();
        released.Should().HaveCount(1, $"iteration {iteration}: exactly one thread should release the batch");

        // That single mapping is checked against the sink and one message per source.
        HashSet<object> expectedMessages = [];
        foreach (string senderId in sourceIds)
        {
            expectedMessages.Add($"msg-from-{senderId}");
        }

        released[0].CheckDeliveries([SinkId], expectedMessages);
    }
}
|
|
}
|