Files
Peter Ibekwe 69adf6d97e .NET: Fix off-thread RunStatus race where GetStatusAsync can return Running after ResumeAsync halts (#5412)
* Fix off-thread RunStatus race where GetStatusAsync can return Running after ResumeAsync halts

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Simplify test comment.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-04-23 15:27:28 +00:00

583 lines
23 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Agents.AI.Workflows.Sample;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
/// <summary>
/// Identifies which in-process workflow execution environment a test case should run against.
/// The tests in this file turn a value into the actual environment object by calling
/// <c>ToWorkflowExecutionEnvironment()</c> on it.
/// </summary>
internal enum ExecutionEnvironment
{
// NOTE(review): the precise scheduling behaviour behind each member is defined by the
// conversion helper, which is not visible in this file - confirm there before relying on it.
InProcess_Lockstep,
InProcess_OffThread,
InProcess_Concurrent
}
public class SampleSmokeTest
{
/// <summary>
/// Runs the Step1 sample and checks that it writes exactly two non-empty lines:
/// one containing the upper-cased input, then one containing the reversed upper-cased input.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step1Async(ExecutionEnvironment environment)
{
    // Expected text is derived up front from the input the assertions assume.
    const string Input = "Hello, World!";
    string uppercased = Input.ToUpperInvariant();
    string reversed = new(uppercased.Reverse().ToArray());

    using StringWriter output = new();
    await Step1EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    string[] outputLines = output.ToString().Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries);

    // Assert.Collection also enforces that there are exactly as many lines as inspectors.
    Assert.Collection(outputLines,
        first => Assert.Contains($"UppercaseExecutor: {uppercased}", first),
        second => Assert.Contains($"ReverseTextExecutor: {reversed}", second));
}
/// <summary>
/// Runs the Step1a sample and checks that it writes exactly two non-empty lines:
/// one containing the upper-cased input, then one containing the reversed upper-cased input.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step1aAsync(ExecutionEnvironment environment)
{
    const string Input = "Hello, World!";
    string uppercased = Input.ToUpperInvariant();
    string reversed = string.Concat(uppercased.Reverse());

    using StringWriter output = new();
    await Step1aEntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    string captured = output.ToString();
    string[] outputLines = captured.Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries);

    // One inspector per expected line, in emission order.
    Assert.Collection(outputLines,
        first => Assert.Contains($"UppercaseExecutor: {uppercased}", first),
        second => Assert.Contains($"ReverseTextExecutor: {reversed}", second));
}
/// <summary>
/// Runs the Step2 sample twice against the same writer: first without an explicit message,
/// expecting <c>RemoveSpamExecutor.ActionResult</c>, then with an explicit message,
/// expecting <c>RespondToMessageExecutor.ActionResult</c>.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step2Async(ExecutionEnvironment environment)
{
    const string ValidMessage = "This is a valid message.";

    using StringWriter output = new();

    // First run: no message argument is supplied.
    string outcomeWithoutMessage = await Step2EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());
    Assert.Equal(RemoveSpamExecutor.ActionResult, outcomeWithoutMessage);

    // Second run: a fresh environment conversion, this time with an explicit message.
    string outcomeWithValidMessage = await Step2EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), ValidMessage);
    Assert.Equal(RespondToMessageExecutor.ActionResult, outcomeWithValidMessage);
}
/// <summary>
/// Runs the Step3 sample and checks the final message it returns.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step3Async(ExecutionEnvironment environment)
{
    const string ExpectedOutcome = "Guessed the number: 42";

    using StringWriter output = new();
    string outcome = await Step3EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    Assert.Equal(ExpectedOutcome, outcome);
}
/// <summary>
/// Runs the Step4 sample with a scripted guess callback. The responder verifies each prompt
/// it receives against the script and replies with the paired guess; the test then checks
/// the final message returned by the sample.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step4Async(ExecutionEnvironment environment)
{
    // (expected prompt, guess to answer with), in the order the prompts must arrive.
    (string input, int response)[] script =
    [
        ("Guess the number.", 50),
        ("Your guess was too high. Try again.", 23),
        ("Your guess was too low. Try again.", 42),
    ];
    VerifyingPlaybackResponder<string, int> scriptedGuesser = new(script);

    using StringWriter output = new();
    string outcome = await Step4EntryPoint.RunAsync(
        output,
        userGuessCallback: scriptedGuesser.InvokeNext,
        environment.ToWorkflowExecutionEnvironment());

    Assert.Equal("You guessed correctly! You Win!", outcome);
}
/// <summary>
/// Runs the Step5 sample with a scripted guess callback spanning two iterations and checks
/// the final message returned by the sample.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step5Async(ExecutionEnvironment environment)
{
    // (expected prompt, guess to answer with), in the order the prompts must arrive.
    (string input, int response)[] script =
    [
        // Iteration 1
        ("Guess the number.", 50),
        ("Your guess was too high. Try again.", 23),
        // Iteration 2
        ("Your guess was too high. Try again.", 23),
        ("Your guess was too low. Try again.", 42),
    ];
    VerifyingPlaybackResponder<string, int> scriptedGuesser = new(script);

    using StringWriter output = new();
    string outcome = await Step5EntryPoint.RunAsync(
        output,
        userGuessCallback: scriptedGuesser.InvokeNext,
        environment.ToWorkflowExecutionEnvironment());

    Assert.Equal("You guessed correctly! You Win!", outcome);
}
/// <summary>
/// Same scripted scenario as the plain Step5 test, but passes <c>rehydrateToRestore: true</c>
/// to the Step5 entry point.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step5aAsync(ExecutionEnvironment environment)
{
    // (expected prompt, guess to answer with), in the order the prompts must arrive.
    (string input, int response)[] script =
    [
        // Iteration 1
        ("Guess the number.", 50),
        ("Your guess was too high. Try again.", 23),
        // Iteration 2
        ("Your guess was too high. Try again.", 23),
        ("Your guess was too low. Try again.", 42),
    ];
    VerifyingPlaybackResponder<string, int> scriptedGuesser = new(script);

    using StringWriter output = new();
    string outcome = await Step5EntryPoint.RunAsync(
        output,
        userGuessCallback: scriptedGuesser.InvokeNext,
        environment.ToWorkflowExecutionEnvironment(),
        rehydrateToRestore: true);

    Assert.Equal("You guessed correctly! You Win!", outcome);
}
/// <summary>
/// Same scripted scenario as the Step5a test (<c>rehydrateToRestore: true</c>), but additionally
/// supplies a JSON checkpoint manager built over an <c>InMemoryJsonStore</c> with read-only
/// serializer options copied from <c>SampleJsonContext</c>.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step5bAsync(ExecutionEnvironment environment)
{
    // (expected prompt, guess to answer with), in the order the prompts must arrive.
    (string input, int response)[] script =
    [
        // Iteration 1
        ("Guess the number.", 50),
        ("Your guess was too high. Try again.", 23),
        // Iteration 2
        ("Your guess was too high. Try again.", 23),
        ("Your guess was too low. Try again.", 42),
    ];
    VerifyingPlaybackResponder<string, int> scriptedGuesser = new(script);

    // Copy the sample's serializer options and freeze the copy before handing it over.
    JsonSerializerOptions serializerOptions = new(SampleJsonContext.Default.Options);
    serializerOptions.MakeReadOnly();
    CheckpointManager jsonCheckpoints = CheckpointManager.CreateJson(new InMemoryJsonStore(), serializerOptions);

    using StringWriter output = new();
    string outcome = await Step5EntryPoint.RunAsync(
        output,
        userGuessCallback: scriptedGuesser.InvokeNext,
        environment.ToWorkflowExecutionEnvironment(),
        rehydrateToRestore: true,
        checkpointManager: jsonCheckpoints);

    Assert.Equal("You guessed correctly! You Win!", outcome);
}
/// <summary>
/// Runs the Step6 sample and checks that it writes exactly two non-empty lines:
/// the hello agent's greeting, then the echo agent's prefixed copy of that greeting.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step6Async(ExecutionEnvironment environment)
{
    using StringWriter output = new();
    await Step6EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    string[] outputLines = output.ToString().Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries);

    string greetingFragment = $"{HelloAgent.DefaultId}: {HelloAgent.Greeting}";
    string echoFragment = $"{Step6EntryPoint.EchoAgentId}: {Step6EntryPoint.EchoPrefix}{HelloAgent.Greeting}";

    Assert.Collection(outputLines,
        first => Assert.Contains(greetingFragment, first),
        second => Assert.Contains(echoFragment, second));
}
/// <summary>
/// Runs the Step7 sample and checks that it writes exactly four non-empty lines that alternate
/// between the hello agent's greeting and the echo agent's prefixed copy of it.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step7Async(ExecutionEnvironment environment)
{
    using StringWriter output = new();
    await Step7EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    string[] outputLines = output.ToString().Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries);

    string greetingFragment = $"{HelloAgent.DefaultId}: {HelloAgent.Greeting}";
    string echoFragment = $"{Step7EntryPoint.EchoAgentId}: {Step7EntryPoint.EchoPrefix}{HelloAgent.Greeting}";

    // Two greeting/echo rounds are expected, in this exact order.
    Assert.Collection(outputLines,
        line => Assert.Contains(greetingFragment, line),
        line => Assert.Contains(echoFragment, line),
        line => Assert.Contains(greetingFragment, line),
        line => Assert.Contains(echoFragment, line));
}
/// <summary>
/// Runs the Step8 sample over a list of texts and checks that it returns one
/// <c>TextProcessingResult</c> per text, in input order, each equal to the result
/// this test computes for that text and its position.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step8Async(ExecutionEnvironment environment)
{
    List<string> samples = [
        "Hello world! This is a simple test.",
        "Python is a powerful programming language used for many applications.",
        "Short text.",
        "This is a longer text with multiple sentences. It contains more words and characters. We use it to test our text processing workflow.",
        "",
        " Spaces around text ",
    ];

    using StringWriter output = new();
    List<TextProcessingResult> processed = await Step8EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), samples);

    Assert.Equal(samples.Count, processed.Count);

    // One inspector per input text; the index feeds the expected task id.
    Action<TextProcessingResult>[] inspectors = samples
        .Select((text, position) => ExpectResultFor(text, position))
        .ToArray();
    Assert.Collection(processed, inspectors);

    // Builds the inspector for a single text: words are space-separated runs, and the
    // character count is the raw length of the text (no trimming).
    Action<TextProcessingResult> ExpectResultFor(string text, int position)
    {
        return actual =>
        {
            int expectedWords = text.Split([' '], StringSplitOptions.RemoveEmptyEntries).Length;
            TextProcessingResult expected = new(
                TaskId: $"Task{position}",
                Text: text,
                WordCount: expectedWords,
                ChatCount: text.Length);

            actual.Should().Be(expected);
        };
    }
}
/// <summary>
/// Smoke-runs the Step9 sample. The returned value is discarded, so the test passes
/// as long as the sample completes without throwing.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step9Async(ExecutionEnvironment environment)
{
    using StringWriter output = new();

    _ = await Step9EntryPoint.RunAsync(
        output,
        environment.ToWorkflowExecutionEnvironment());
}
/// <summary>
/// Stress regression for the off-thread run-status race. Once <c>Run.ResumeAsync</c>
/// has returned at a halt boundary, callers must see a stable terminal status and never a
/// transient <see cref="RunStatus.Running"/>. Step9 is the canonical multi-response resume
/// sample: before the fix in <see cref="Execution.StreamingRunEventStream"/>, its
/// <c>runStatus.Should().Be(RunStatus.Idle)</c> assertion failed intermittently, on roughly
/// 1-in-10 iterations under InProcess_OffThread. The sample is therefore repeated many times.
/// </summary>
[Fact]
internal async Task Test_RunSample_Step9_OffThread_MultiResponseResume_StatusIsStableAsync()
{
    const int Iterations = 50;

    int remaining = Iterations;
    while (remaining > 0)
    {
        // Fresh writer and fresh environment conversion on every pass.
        using StringWriter output = new();
        _ = await Step9EntryPoint.RunAsync(
            output,
            ExecutionEnvironment.InProcess_OffThread.ToWorkflowExecutionEnvironment());

        remaining--;
    }
}
/// <summary>
/// Runs the Step10 sample with three inputs and checks that the output consists of exactly
/// one <c>Echo: &lt;input&gt;</c> line per input, in input order.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step10Async(ExecutionEnvironment environment)
{
    List<string> inputs = ["1", "2", "3"];

    using StringWriter output = new();
    await Step10EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), inputs);

    // Splitting on either newline character tolerates both "\n" and "\r\n" endings.
    string captured = output.ToString();
    string[] outputLines = captured.Split(['\r', '\n'], StringSplitOptions.RemoveEmptyEntries);

    Action<string>[] inspectors = inputs
        .Select<string, Action<string>>(input => line => line.Should().Be($"Echo: {input}"))
        .ToArray();

    Assert.Collection(outputLines, inspectors);
}
/// <summary>
/// Runs the Step11 sample with three inputs and checks that the set of output lines matches
/// <c>Step11EntryPoint.ExpectedOutputForInput</c> for every (agent, input) pair. Both sides
/// are sorted the same way before comparison, so emission order is not asserted.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step11Async(ExecutionEnvironment environment)
{
    List<string> inputs = ["1", "2", "3"];

    using StringWriter output = new();
    await Step11EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), inputs);

    string[] actualLines = output.ToString().Split(['\r', '\n'], StringSplitOptions.RemoveEmptyEntries);
    Array.Sort(actualLines, StringComparer.OrdinalIgnoreCase);

    // Agents are numbered from 1; every agent is expected to respond to every input.
    string[] expectedLines =
        (from agentNumber in Enumerable.Range(1, Step11EntryPoint.AgentCount)
         from input in inputs
         select Step11EntryPoint.ExpectedOutputForInput(input, agentNumber)).ToArray();
    Array.Sort(expectedLines, StringComparer.OrdinalIgnoreCase);

    Action<string>[] inspectors = expectedLines
        .Select<string, Action<string>>(wanted => line => line.Should().Be(wanted))
        .ToArray();

    Assert.Collection(actualLines, inspectors);
}
/// <summary>
/// Computes the lines the Step12 test expects. It keeps one shared history of entries and, per
/// agent, a bookmark marking how much of that history the agent has already echoed. Each call to
/// <see cref="ProcessInput"/> appends the new input and then, agent by agent, appends
/// <c>{agentId}:{entry}</c> for every entry past that agent's bookmark.
/// </summary>
public class Step12ExpectedOutputCalculator(int agentCount)
{
    // Per-agent index into _history of the first entry the agent has not yet echoed.
    private readonly int[] _bookmarks = new int[agentCount];

    // Every entry in order: raw inputs and agent echoes alike.
    private readonly List<string> _history = [];

    // Positions in _history that hold raw inputs; these are not reported as outputs.
    private readonly HashSet<int> _skipIndices = [];

    /// <summary>The history with the raw input entries filtered out (evaluated lazily).</summary>
    public IEnumerable<string> ExpectedOutputs
        => this._history.Where((_, position) => !this._skipIndices.Contains(position));

    /// <summary>Records <paramref name="newInput"/> and appends every agent's echoes for it.</summary>
    public void ProcessInput(string newInput)
    {
        // The input lands at the current end of the history; remember that slot as "not an output".
        this._skipIndices.Add(this._history.Count);
        this._history.Add(newInput);

        for (int agentIndex = 0; agentIndex < agentCount; agentIndex++)
        {
            int firstUnseen = this._bookmarks[agentIndex];
            int unseenCount = this._history.Count - firstUnseen;
            unseenCount.Should().BeGreaterThanOrEqualTo(0);

            // Snapshot the unseen tail first: the loop below appends to the same list.
            List<string> unseenEntries = this._history.GetRange(firstUnseen, unseenCount);

            // Agent ids are 1-based.
            string agentLabel = $"{agentIndex + 1}";
            foreach (string entry in unseenEntries)
            {
                this._history.Add($"{agentLabel}:{entry}");
            }

            // The agent has now seen everything, including its own echoes.
            this._bookmarks[agentIndex] = this._history.Count;
        }
    }
}
/// <summary>
/// Runs the Step12 sample with three inputs and checks that its output lines equal, in order,
/// the lines produced by <c>Step12ExpectedOutputCalculator</c>. Expected and actual lines are
/// also dumped to standard error to help diagnose mismatches.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step12Async(ExecutionEnvironment environment)
{
    List<string> inputs = ["1", "2", "3"];

    // Each agent is expected to echo every input plus every echo produced by the agents
    // before it. With a single input:
    //   (user): 1
    //   ----- outputs below
    //   (a1): 1:1
    //   (a2): 2:1
    //   (a2): 2:1:1
    // A third agent would then add:
    //   (a3): 3:1
    //   (a3): 3:1:1
    //   (a3): 3:2:1
    //   (a3): 3:2:1:1
    // With several inputs (as here), each new input deepens what came before. For inputs
    // "1" then "2":
    //   (user): 1
    //   (a1): 1:1            <- a1 "last seen"
    //   (a2): 2:1
    //   (a2): 2:1:1          <- a2 "last seen"
    //   (user): 2
    //   ----- outputs below
    //   (a1): 1:2:1
    //   (a1): 1:2:1:1
    //   (a1): 1:2            <- from user input, a1 "last seen"
    //   (a2): 2:2            <- from user input (a2 appears to see these in a different
    //                           "order" than a1, but it does not)
    //   (a2): 2:1:2:1
    //   (a2): 2:1:2:1:1
    //   (a2): 2:1:2          <- from a1's first echo, a2 "last seen"
    Step12ExpectedOutputCalculator calculator = new(Step12EntryPoint.AgentCount);
    inputs.ForEach(calculator.ProcessInput);
    string[] expectedLines = calculator.ExpectedOutputs.ToArray();

    using StringWriter output = new();
    await Step12EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), inputs);
    string[] actualLines = output.ToString().Split(['\r', '\n'], StringSplitOptions.RemoveEmptyEntries);

    DumpToStandardError("Expected lines: ", expectedLines);
    DumpToStandardError("Actual lines: ", actualLines);

    Action<string>[] inspectors = expectedLines
        .Select<string, Action<string>>(wanted => line => line.Should().Be(wanted))
        .ToArray();
    Assert.Collection(actualLines, inspectors);

    // Writes a header followed by one tab-indented line per entry.
    static void DumpToStandardError(string header, string[] entries)
    {
        Console.Error.WriteLine(header);
        foreach (string entry in entries)
        {
            Console.Error.WriteLine($"\t{entry}");
        }
    }
}
/// <summary>
/// Runs the Step13 sample twice on a checkpointing environment, feeding the
/// <c>CheckpointInfo</c> returned by the first run into the second. Each run must write
/// exactly one non-empty line containing <c>EchoSubworkflow: &lt;input&gt;</c>.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step13Async(ExecutionEnvironment environment)
{
    CheckpointManager checkpoints = CheckpointManager.CreateInMemory();
    InProcessExecutionEnvironment checkpointedEnvironment = environment.ToWorkflowExecutionEnvironment().WithCheckpointing(checkpoints);

    CheckpointInfo? lastCheckpoint = null;

    // Step 1 starts with no checkpoint; step 2 continues from step 1's result.
    // The second pass is the one that should crash before the fix.
    for (int step = 1; step <= 2; step++)
    {
        await RunStepAndValidateAsync(step);
    }

    async ValueTask RunStepAndValidateAsync(int step)
    {
        const string ExpectedSource = "EchoSubworkflow";
        string input = $"[{step}] Hello, World!";

        using StringWriter output = new();
        lastCheckpoint = await Step13EntryPoint.RunAsync(output, input, checkpointedEnvironment, lastCheckpoint);

        string[] outputLines = output.ToString().Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries);

        // A single inspector: exactly one line is expected.
        Assert.Collection(outputLines,
            onlyLine => Assert.Contains($"{ExpectedSource}: {input}", onlyLine));
    }
}
/// <summary>
/// Runs the Step13 sample through its agent entry point twice, feeding the
/// <c>AgentSession</c> returned by the first run into the second. Each run must write
/// exactly one non-empty line containing <c>: &lt;input&gt;</c>.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step13aAsync(ExecutionEnvironment environment)
{
    IWorkflowExecutionEnvironment workflowEnvironment = environment.ToWorkflowExecutionEnvironment();

    AgentSession? currentSession = null;

    // Step 1 starts with no session; step 2 continues with step 1's session.
    // The second pass is the one that should crash before the fix.
    for (int step = 1; step <= 2; step++)
    {
        await RunStepAndValidateAsync(step);
    }

    async ValueTask RunStepAndValidateAsync(int step)
    {
        // The message is passed in as a plain string, so it carries no author information
        // and the expected source prefix is the empty string.
        const string ExpectedSource = "";
        string input = $"[{step}] Hello, World!";

        using StringWriter output = new();
        currentSession = await Step13EntryPoint.RunAsAgentAsync(output, input, workflowEnvironment, currentSession);

        string[] outputLines = output.ToString().Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries);

        // A single inspector: exactly one line is expected.
        Assert.Collection(outputLines,
            onlyLine => Assert.Contains($"{ExpectedSource}: {input}", onlyLine));
    }
}
/// <summary>
/// Verifies shared state WITHIN a subworkflow (internal persistence): state written by one
/// executor of a subworkflow must be readable by another executor of the SAME subworkflow.
/// The sample is expected to return the length of the trimmed input text.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Test_RunSample_Step14_SharedState_WorksWithinSubworkflowAsync(ExecutionEnvironment environment)
{
    // Arrange
    const string Text = " Lorem ipsum dolor sit amet, consectetur adipiscing elit. ";
    IWorkflowExecutionEnvironment workflowEnvironment = environment.ToWorkflowExecutionEnvironment();

    // Act - every executor involved lives inside the subworkflow
    using StringWriter output = new();
    int charCount = await Step14EntryPoint.RunSubworkflowInternalStateAsync(Text, output, workflowEnvironment);

    // Assert - surrounding whitespace is not counted
    charCount.Should().Be(Text.Trim().Length, "executors within subworkflow should share state correctly");
}
/// <summary>
/// Pins down the current behaviour reported in issue #2419: shared state is isolated across
/// subworkflow boundaries, so the cross-boundary sample is expected to surface an
/// <see cref="InvalidOperationException"/>. If cross-boundary state sharing is implemented,
/// this test should be changed to expect success instead.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Test_RunSample_Step14a_SharedState_IsolatedAcrossSubworkflowBoundaryAsync(ExecutionEnvironment environment)
{
    // Arrange
    const string Text = " Lorem ipsum dolor sit amet, consectetur adipiscing elit. ";
    IWorkflowExecutionEnvironment workflowEnvironment = environment.ToWorkflowExecutionEnvironment();

    // Act - try to read, inside the subworkflow, state written by the parent workflow
    using StringWriter output = new();
    Exception? failure = await Step14EntryPoint.RunCrossBoundaryStateAsync(Text, output, workflowEnvironment);

    // Assert - the subworkflow executor cannot see the parent's state (issue #2419)
    failure.Should().NotBeNull("state written in parent workflow is not visible in subworkflow");

    // The failure may arrive wrapped in a TargetInvocationException; unwrap one level if so.
    Exception rootFailure;
    if (failure is System.Reflection.TargetInvocationException { InnerException: { } inner })
    {
        rootFailure = inner;
    }
    else
    {
        rootFailure = failure;
    }

    rootFailure.Should().BeOfType<InvalidOperationException>();
}
}
/// <summary>
/// Test helper that replays a fixed script of (expected input, response) pairs.
/// Each call to <see cref="InvokeNext"/> asserts that the supplied input equals the next
/// scripted input and returns the response paired with it.
/// </summary>
/// <typeparam name="TInput">Type of the inputs the responder is invoked with.</typeparam>
/// <typeparam name="TResponse">Type of the scripted responses handed back.</typeparam>
internal sealed class VerifyingPlaybackResponder<TInput, TResponse>
{
    /// <summary>The full script, in the order the inputs are expected to arrive.</summary>
    public (TInput input, TResponse response)[] Responses { get; }

    // Index of the next script entry to play back.
    private int _position;

    /// <summary>Creates a responder over the given script.</summary>
    /// <param name="responses">Ordered (expected input, response) pairs.</param>
    public VerifyingPlaybackResponder(params (TInput input, TResponse response)[] responses)
    {
        this.Responses = responses;
    }

    /// <summary>Number of script entries that have not been played back yet (never negative).</summary>
    public int Remaining => Math.Max(0, this.Responses.Length - this._position);

    /// <summary>
    /// Verifies <paramref name="input"/> against the next scripted input and returns its response.
    /// Fails the test via an xUnit assertion when the script is exhausted or the input differs.
    /// </summary>
    /// <param name="input">The input actually received.</param>
    /// <returns>The response scripted for this position.</returns>
    public TResponse InvokeNext(TInput input)
    {
        // Name the cause and the offending input; a bare Assert.True gives neither.
        Assert.True(
            this.Remaining > 0,
            $"Playback script exhausted: all {this.Responses.Length} scripted response(s) were already consumed when input '{input}' arrived.");

        // The position advances before the comparison, so a mismatch still consumes the entry.
        (TInput expectedInput, TResponse expectedResponse) = this.Responses[this._position++];
        Assert.Equal(expectedInput, input);

        return expectedResponse;
    }
}