Files
Jacob Alber 5777ed26e6 .NET: fix: Add session support for Handoff-hosted Agents (#5280)
* fix: Add session support for Handoff-hosted Agents

To better support using `Workflows` hosted as `AIAgents` inside of Handoff workflows, we need to make proper use of AgentSession. Doing so raises potential issues around checkpointing and around ensuring that we compute only the new incoming messages for each agent invocation.

* fix: AgentSession checkpointing using AIAgent's Serialize/Deserialize methods

We cannot rely on implicit serialization through `HandoffHostState` because we are missing type information.

* fix: Thread safety issue in `MultiPartyConversation.AllMessages`

* fix: Enable unwrapping of FunctionResultContent when ExternalRequest was wrapped into FunctionCallContent
2026-04-17 20:15:27 +00:00

560 lines
22 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Agents.AI.Workflows.Sample;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
/// <summary>
/// Test-side selector for the execution environment a sample is run against.
/// The tests in this file convert a value with <c>ToWorkflowExecutionEnvironment()</c>
/// (not defined in this part of the file) before passing it to a sample entry point.
/// </summary>
internal enum ExecutionEnvironment
{
    InProcess_Lockstep = 0,
    InProcess_OffThread = 1,
    InProcess_Concurrent = 2,
}
public class SampleSmokeTest
{
/// <summary>
/// Runs the Step 1 sample and verifies that its captured output is exactly two non-empty lines:
/// one containing the upper-cased "Hello, World!" and one containing that text reversed.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step1Async(ExecutionEnvironment environment)
{
    const string Input = "Hello, World!";
    string uppercased = Input.ToUpperInvariant();
    string reversed = new string(uppercased.Reverse().ToArray());

    using StringWriter output = new();
    await Step1EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    string[] outputLines = output.ToString()
        .Split(new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);

    // Assert.Collection also enforces that there are exactly as many lines as inspectors.
    Assert.Collection(outputLines,
        first => Assert.Contains($"UppercaseExecutor: {uppercased}", first),
        second => Assert.Contains($"ReverseTextExecutor: {reversed}", second));
}
/// <summary>
/// Runs the Step 1a sample and verifies the same two-line output contract as Step 1:
/// the upper-cased "Hello, World!" followed by its reversal.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step1aAsync(ExecutionEnvironment environment)
{
    const string Input = "Hello, World!";

    using StringWriter output = new();
    await Step1aEntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    string captured = output.ToString();
    string[] outputLines = captured.Split(new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);

    string expectedUppercase = $"UppercaseExecutor: {Input.ToUpperInvariant()}";
    string expectedReversed = $"ReverseTextExecutor: {string.Concat(Input.ToUpperInvariant().Reverse())}";

    Assert.Collection(outputLines,
        uppercaseLine => Assert.Contains(expectedUppercase, uppercaseLine),
        reversedLine => Assert.Contains(expectedReversed, reversedLine));
}
/// <summary>
/// Runs the Step 2 sample twice against the same writer: first without an explicit message,
/// expecting <c>RemoveSpamExecutor.ActionResult</c>, then with an explicit message,
/// expecting <c>RespondToMessageExecutor.ActionResult</c>.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step2Async(ExecutionEnvironment environment)
{
    const string ValidMessage = "This is a valid message.";

    using StringWriter output = new();

    // First run: no message argument is supplied to the sample.
    string defaultInputOutcome = await Step2EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());
    Assert.Equal(RemoveSpamExecutor.ActionResult, defaultInputOutcome);

    // Second run: an explicit message is supplied.
    string validMessageOutcome = await Step2EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), ValidMessage);
    Assert.Equal(RespondToMessageExecutor.ActionResult, validMessageOutcome);
}
/// <summary>
/// Runs the Step 3 sample and verifies that it returns the expected "guessed the number" result string.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step3Async(ExecutionEnvironment environment)
{
    const string ExpectedOutcome = "Guessed the number: 42";

    using StringWriter output = new();
    string actualOutcome = await Step3EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    Assert.Equal(ExpectedOutcome, actualOutcome);
}
/// <summary>
/// Runs the Step 4 sample with a scripted guess callback and verifies the winning result string.
/// The responder checks each prompt it receives against the script before returning the paired guess.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step4Async(ExecutionEnvironment environment)
{
    // Scripted (prompt, guess) exchanges, in the order they are expected to occur.
    (string, int)[] script =
    [
        ("Guess the number.", 50),
        ("Your guess was too high. Try again.", 23),
        ("Your guess was too low. Try again.", 42),
    ];
    VerifyingPlaybackResponder<string, int> responder = new(script);

    using StringWriter output = new();
    string outcome = await Step4EntryPoint.RunAsync(
        output,
        userGuessCallback: responder.InvokeNext,
        environment.ToWorkflowExecutionEnvironment());

    Assert.Equal("You guessed correctly! You Win!", outcome);
}
/// <summary>
/// Runs the Step 5 sample with a scripted guess callback spanning two iterations and verifies
/// the winning result string.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step5Async(ExecutionEnvironment environment)
{
    (string, int)[] script =
    [
        // Iteration 1
        ("Guess the number.", 50),
        ("Your guess was too high. Try again.", 23),
        // Iteration 2
        ("Your guess was too high. Try again.", 23),
        ("Your guess was too low. Try again.", 42),
    ];
    VerifyingPlaybackResponder<string, int> responder = new(script);

    using StringWriter output = new();
    string outcome = await Step5EntryPoint.RunAsync(
        output,
        userGuessCallback: responder.InvokeNext,
        environment.ToWorkflowExecutionEnvironment());

    Assert.Equal("You guessed correctly! You Win!", outcome);
}
/// <summary>
/// Runs the Step 5 sample with <c>rehydrateToRestore: true</c> and the same two-iteration
/// guess script as the plain Step 5 test, then verifies the winning result string.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step5aAsync(ExecutionEnvironment environment)
{
    (string, int)[] script =
    [
        // Iteration 1
        ("Guess the number.", 50),
        ("Your guess was too high. Try again.", 23),
        // Iteration 2
        ("Your guess was too high. Try again.", 23),
        ("Your guess was too low. Try again.", 42),
    ];
    VerifyingPlaybackResponder<string, int> responder = new(script);

    using StringWriter output = new();
    string outcome = await Step5EntryPoint.RunAsync(
        output,
        userGuessCallback: responder.InvokeNext,
        environment.ToWorkflowExecutionEnvironment(),
        rehydrateToRestore: true);

    Assert.Equal("You guessed correctly! You Win!", outcome);
}
/// <summary>
/// Runs the Step 5 sample with <c>rehydrateToRestore: true</c> and a JSON checkpoint manager
/// backed by an in-memory store, then verifies the winning result string.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step5bAsync(ExecutionEnvironment environment)
{
    (string, int)[] script =
    [
        // Iteration 1
        ("Guess the number.", 50),
        ("Your guess was too high. Try again.", 23),
        // Iteration 2
        ("Your guess was too high. Try again.", 23),
        ("Your guess was too low. Try again.", 42),
    ];
    VerifyingPlaybackResponder<string, int> responder = new(script);

    // Copy the sample's serializer options and make the copy read-only before handing it over.
    JsonSerializerOptions serializerOptions = new(SampleJsonContext.Default.Options);
    serializerOptions.MakeReadOnly();
    CheckpointManager jsonCheckpointManager = CheckpointManager.CreateJson(new InMemoryJsonStore(), serializerOptions);

    using StringWriter output = new();
    string outcome = await Step5EntryPoint.RunAsync(
        output,
        userGuessCallback: responder.InvokeNext,
        environment.ToWorkflowExecutionEnvironment(),
        rehydrateToRestore: true,
        checkpointManager: jsonCheckpointManager);

    Assert.Equal("You guessed correctly! You Win!", outcome);
}
/// <summary>
/// Runs the Step 6 sample and verifies that its captured output is exactly two non-empty lines:
/// the hello agent's greeting, then the echo agent's prefixed copy of that greeting.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step6Async(ExecutionEnvironment environment)
{
    using StringWriter output = new();
    await Step6EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    string[] outputLines = output.ToString()
        .Split(new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);

    string expectedGreeting = $"{HelloAgent.DefaultId}: {HelloAgent.Greeting}";
    string expectedEcho = $"{Step6EntryPoint.EchoAgentId}: {Step6EntryPoint.EchoPrefix}{HelloAgent.Greeting}";

    Assert.Collection(outputLines,
        greetingLine => Assert.Contains(expectedGreeting, greetingLine),
        echoLine => Assert.Contains(expectedEcho, echoLine));
}
/// <summary>
/// Runs the Step 7 sample and verifies that its captured output is exactly four non-empty lines:
/// the greeting/echo pair, repeated twice.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step7Async(ExecutionEnvironment environment)
{
    using StringWriter output = new();
    await Step7EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());

    string captured = output.ToString();
    string[] outputLines = captured.Split(new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);

    string expectedGreeting = $"{HelloAgent.DefaultId}: {HelloAgent.Greeting}";
    string expectedEcho = $"{Step7EntryPoint.EchoAgentId}: {Step7EntryPoint.EchoPrefix}{HelloAgent.Greeting}";

    Assert.Collection(outputLines,
        firstGreeting => Assert.Contains(expectedGreeting, firstGreeting),
        firstEcho => Assert.Contains(expectedEcho, firstEcho),
        secondGreeting => Assert.Contains(expectedGreeting, secondGreeting),
        secondEcho => Assert.Contains(expectedEcho, secondEcho));
}
/// <summary>
/// Runs the Step 8 sample over a list of texts and verifies that it returns one result per text,
/// in input order, each equal to the expected task id, text, word count and character count.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step8Async(ExecutionEnvironment environment)
{
    List<string> textsToProcess =
    [
        "Hello world! This is a simple test.",
        "Python is a powerful programming language used for many applications.",
        "Short text.",
        "This is a longer text with multiple sentences. It contains more words and characters. We use it to test our text processing workflow.",
        "",
        " Spaces around text ",
    ];

    using StringWriter output = new();
    List<TextProcessingResult> actualResults = await Step8EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), textsToProcess);

    Assert.Equal(textsToProcess.Count, actualResults.Count);

    Action<TextProcessingResult>[] inspectors = textsToProcess
        .Select((text, position) => ExpectResultFor(text, position))
        .ToArray();
    Assert.Collection(actualResults, inspectors);

    // Builds the inspector for the result expected at the given position in the input list.
    static Action<TextProcessingResult> ExpectResultFor(string text, int position)
    {
        return actual =>
        {
            // Words are the non-empty segments between single spaces; the length is the raw text length.
            int expectedWordCount = text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Length;
            TextProcessingResult expected = new(
                TaskId: $"Task{position}",
                Text: text,
                WordCount: expectedWordCount,
                ChatCount: text.Length);

            actual.Should().Be(expected);
        };
    }
}
/// <summary>
/// Smoke-runs the Step 9 sample; the returned value is discarded, so the test only
/// checks that the run completes without throwing.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step9Async(ExecutionEnvironment environment)
{
    using (StringWriter output = new())
    {
        _ = await Step9EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment());
    }
}
/// <summary>
/// Runs the Step 10 sample over three inputs and verifies that the captured output is exactly
/// one "Echo: &lt;input&gt;" line per input, in input order.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step10Async(ExecutionEnvironment environment)
{
    List<string> inputs = ["1", "2", "3"];

    using StringWriter output = new();
    await Step10EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), inputs);

    // Split on either line-ending character so the check is independent of the newline convention.
    string[] outputLines = output.ToString().Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);

    List<Action<string>> inspectors = new(inputs.Count);
    foreach (string input in inputs)
    {
        inspectors.Add(actualLine => actualLine.Should().Be($"Echo: {input}"));
    }

    Assert.Collection(outputLines, inspectors.ToArray());
}
/// <summary>
/// Runs the Step 11 sample over three inputs and verifies that, ignoring order, the captured
/// output lines match the expected output of every agent for every input.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step11Async(ExecutionEnvironment environment)
{
    List<string> inputs = ["1", "2", "3"];

    using StringWriter output = new();
    await Step11EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), inputs);

    string[] actualLines = output.ToString().Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);

    // Agents are numbered from 1; every agent is expected to produce one line per input.
    string[] expectedLines =
        (from agentNumber in Enumerable.Range(1, Step11EntryPoint.AgentCount)
         from input in inputs
         select Step11EntryPoint.ExpectedOutputForInput(input, agentNumber)).ToArray();

    // Both sides are sorted with the same comparer so the comparison does not depend on emission order.
    Array.Sort(actualLines, StringComparer.OrdinalIgnoreCase);
    Array.Sort(expectedLines, StringComparer.OrdinalIgnoreCase);

    Action<string>[] inspectors = Array.ConvertAll<string, Action<string>>(
        expectedLines,
        expectedLine => actualLine => actualLine.Should().Be(expectedLine));

    Assert.Collection(actualLines, inspectors);
}
/// <summary>
/// Computes the output lines expected from the Step 12 sample for a sequence of user inputs.
/// It keeps a single shared history of every message (user inputs and agent echoes) plus, per agent,
/// a bookmark marking how much of that history the agent has already echoed.
/// </summary>
public class Step12ExpectedOutputCalculator(int agentCount)
{
    // For each agent (by zero-based index), the history length at the end of its last turn.
    private readonly int[] _bookmarks = new int[agentCount];

    // Every message seen so far, in order: user inputs and the echoes produced by agents.
    private readonly List<string> _history = [];

    // Positions in _history that hold raw user inputs; these are not outputs.
    private readonly HashSet<int> _skipIndices = [];

    /// <summary>
    /// The history entries that are agent echoes, in order (user inputs are filtered out).
    /// Evaluated lazily against the current history each time it is enumerated.
    /// </summary>
    public IEnumerable<string> ExpectedOutputs =>
        this._history.Where((entry, position) => !this._skipIndices.Contains(position));

    /// <summary>
    /// Records a new user input, then lets each agent in turn echo every history entry it has not
    /// yet seen, prefixing each echo with the agent's 1-based id and a colon.
    /// </summary>
    /// <param name="newInput">The user input to append to the history.</param>
    public void ProcessInput(string newInput)
    {
        // Remember where the user input lands so it can be excluded from the expected outputs.
        this._skipIndices.Add(this._history.Count);
        this._history.Add(newInput);

        for (int agentIndex = 0; agentIndex < agentCount; agentIndex++)
        {
            int firstUnseen = this._bookmarks[agentIndex];
            int unseenCount = this._history.Count - firstUnseen;
            unseenCount.Should().BeGreaterThanOrEqualTo(0);

            // Snapshot the unseen tail first: the echoes below are appended to the same list.
            string[] unseenEntries = this._history.Skip(firstUnseen).ToArray();
            int agentId = agentIndex + 1;
            foreach (string unseenEntry in unseenEntries)
            {
                this._history.Add($"{agentId}:{unseenEntry}");
            }

            // The agent has now seen everything, including its own echoes.
            this._bookmarks[agentIndex] = this._history.Count;
        }
    }
}
/// <summary>
/// Runs the Step 12 sample over three inputs and verifies that the captured output lines match,
/// in order, the lines predicted by <see cref="Step12ExpectedOutputCalculator"/>.
/// Expected and actual lines are also written to standard error for diagnosis.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step12Async(ExecutionEnvironment environment)
{
    List<string> inputs = ["1", "2", "3"];

    // The expectation is that each agent will echo each input along with every echo from previous agents
    // E.g.:
    //   (user): 1
    //   ----- outputs below
    //   (a1): 1:1
    //   (a2): 2:1
    //   (a2): 2:1:1
    // If there were three agents, it would then be followed by:
    //   (a3): 3:1
    //   (a3): 3:1:1
    //   (a3): 3:2:1
    //   (a3): 3:2:1:1
    // If there are multiple inputs (there are), then each successive input adds to the depth of the previous
    // ones, so, for example, once we do input = "1", "2":
    //   (user): 1
    //   (a1): 1:1        <- a1 "last seen"
    //   (a2): 2:1
    //   (a2): 2:1:1      <- a2 "last seen"
    //   (user): 2
    //   ----- outputs below
    //   (a1): 1:2:1
    //   (a1): 1:2:1:1
    //   (a1): 1:2        <- from user input, a1 "last seen"
    //   (a2): 2:2        <- from user input (note that a2 seems like it is seeing these in a different "order" than a1 - but it is not)
    //   (a2): 2:1:2:1
    //   (a2): 2:1:2:1:1
    //   (a2): 2:1:2      <- from a1's first echo, a2 "last seen"
    Step12ExpectedOutputCalculator calculator = new(Step12EntryPoint.AgentCount);
    inputs.ForEach(calculator.ProcessInput);
    string[] expectedLines = calculator.ExpectedOutputs.ToArray();

    using StringWriter output = new();
    await Step12EntryPoint.RunAsync(output, environment.ToWorkflowExecutionEnvironment(), inputs);
    string[] actualLines = output.ToString().Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);

    // Dump both sides to stderr so a mismatch can be diagnosed from the test log.
    Console.Error.WriteLine("Expected lines: ");
    for (int i = 0; i < expectedLines.Length; i++)
    {
        Console.Error.WriteLine($"\t{expectedLines[i]}");
    }

    Console.Error.WriteLine("Actual lines: ");
    for (int i = 0; i < actualLines.Length; i++)
    {
        Console.Error.WriteLine($"\t{actualLines[i]}");
    }

    Action<string>[] inspectors = expectedLines
        .Select(expectedLine => (Action<string>)(actualLine => actualLine.Should().Be(expectedLine)))
        .ToArray();
    Assert.Collection(actualLines, inspectors);
}
/// <summary>
/// Runs the Step 13 sample twice on a checkpointing environment, feeding the checkpoint returned by
/// the first run into the second, and verifies that each run emits exactly one line echoing its input.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step13Async(ExecutionEnvironment environment)
{
    const string ExpectedSource = "EchoSubworkflow";

    CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
    InProcessExecutionEnvironment executionEnvironment = environment.ToWorkflowExecutionEnvironment().WithCheckpointing(checkpointManager);

    CheckpointInfo? resumeFrom = null;

    // Step 1 starts fresh; step 2 resumes from step 1's checkpoint.
    // Per the original note, the second run is the one that crashed before the fix.
    for (int step = 1; step <= 2; step++)
    {
        string input = $"[{step}] Hello, World!";

        using StringWriter output = new();
        resumeFrom = await Step13EntryPoint.RunAsync(output, input, executionEnvironment, resumeFrom);

        string[] outputLines = output.ToString()
            .Split(new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);

        Assert.Collection(outputLines,
            onlyLine => Assert.Contains($"{ExpectedSource}: {input}", onlyLine));
    }
}
/// <summary>
/// Runs the Step 13 sample as an agent twice, passing the session returned by the first run into
/// the second, and verifies that each run emits exactly one line echoing its input.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step13aAsync(ExecutionEnvironment environment)
{
    // We expect to get the message that was passed in directly; since we are passing it in as a string,
    // there is no associated author information, so the expected source is the empty string.
    const string ExpectedSource = "";

    IWorkflowExecutionEnvironment executionEnvironment = environment.ToWorkflowExecutionEnvironment();
    AgentSession? session = null;

    // Step 1 starts without a session; step 2 reuses the session from step 1.
    // Per the original note, the second run is the one that crashed before the fix.
    for (int step = 1; step <= 2; step++)
    {
        string input = $"[{step}] Hello, World!";

        using StringWriter output = new();
        session = await Step13EntryPoint.RunAsAgentAsync(output, input, executionEnvironment, session);

        string[] outputLines = output.ToString()
            .Split(new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);

        Assert.Collection(outputLines,
            onlyLine => Assert.Contains($"{ExpectedSource}: {input}", onlyLine));
    }
}
/// <summary>
/// Verifies that shared state persists INSIDE a single subworkflow: state written by one
/// executor of the subworkflow is readable by another executor of the SAME subworkflow.
/// The sample is expected to return the character count of the trimmed input text.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Test_RunSample_Step14_SharedState_WorksWithinSubworkflowAsync(ExecutionEnvironment environment)
{
    // Arrange
    const string Text = " Lorem ipsum dolor sit amet, consectetur adipiscing elit. ";
    int expectedCharCount = Text.Trim().Length;
    IWorkflowExecutionEnvironment executionEnvironment = environment.ToWorkflowExecutionEnvironment();
    using StringWriter output = new();

    // Act - every executor involved lives inside the subworkflow
    int actualCharCount = await Step14EntryPoint.RunSubworkflowInternalStateAsync(Text, output, executionEnvironment);

    // Assert
    actualCharCount.Should().Be(expectedCharCount, "executors within subworkflow should share state correctly");
}
/// <summary>
/// Pins the current behavior reported in issue #2419: shared state does not cross the boundary
/// between a parent workflow and its subworkflow, so the sample is expected to surface an
/// <see cref="InvalidOperationException"/>. If cross-boundary state sharing is ever implemented,
/// this test should be changed to expect success instead.
/// </summary>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Test_RunSample_Step14a_SharedState_IsolatedAcrossSubworkflowBoundaryAsync(ExecutionEnvironment environment)
{
    // Arrange
    const string Text = " Lorem ipsum dolor sit amet, consectetur adipiscing elit. ";
    IWorkflowExecutionEnvironment executionEnvironment = environment.ToWorkflowExecutionEnvironment();
    using StringWriter output = new();

    // Act - try to read, inside the subworkflow, state that was written by the parent workflow
    Exception? error = await Step14EntryPoint.RunCrossBoundaryStateAsync(Text, output, executionEnvironment);

    // Assert - the subworkflow executor cannot see the parent's state (issue #2419), so an error comes back
    error.Should().NotBeNull("state written in parent workflow is not visible in subworkflow");

    // The failure may arrive wrapped in a TargetInvocationException; if so, inspect the inner exception.
    Exception actualError = error switch
    {
        System.Reflection.TargetInvocationException { InnerException: { } inner } => inner,
        _ => error,
    };
    actualError.Should().BeOfType<InvalidOperationException>();
}
}
/// <summary>
/// Test helper that replays a fixed script of (expected input, response) pairs.
/// Each call to <see cref="InvokeNext"/> asserts that the received input equals the next scripted
/// input and returns the response paired with it.
/// </summary>
/// <typeparam name="TInput">Type of the input the responder is invoked with.</typeparam>
/// <typeparam name="TResponse">Type of the scripted response returned for each input.</typeparam>
internal sealed class VerifyingPlaybackResponder<TInput, TResponse>
{
    /// <summary>The scripted exchanges, in the order they are expected to occur.</summary>
    public (TInput input, TResponse response)[] Responses { get; }

    // Index of the next scripted exchange to play back.
    private int _position;

    /// <summary>Creates a responder that plays back <paramref name="responses"/> in order.</summary>
    /// <param name="responses">The (expected input, response) pairs to replay.</param>
    public VerifyingPlaybackResponder(params (TInput input, TResponse response)[] responses)
    {
        this.Responses = responses;
    }

    /// <summary>Number of scripted exchanges not yet played back; never negative.</summary>
    public int Remaining => Math.Max(0, this.Responses.Length - this._position);

    /// <summary>
    /// Plays back the next scripted exchange.
    /// Fails the test (via xUnit assertions) when the script is exhausted or when
    /// <paramref name="input"/> differs from the scripted input at the current position.
    /// </summary>
    /// <param name="input">The input actually received from the code under test.</param>
    /// <returns>The response scripted for this position.</returns>
    public TResponse InvokeNext(TInput input)
    {
        // Say which input overran the script; a bare "expected True" failure gives no context.
        Assert.True(
            this.Remaining > 0,
            $"Received input '{input}' but all {this.Responses.Length} scripted responses have already been consumed.");

        (TInput expectedInput, TResponse scriptedResponse) = this.Responses[this._position++];
        Assert.Equal(expectedInput, input);
        return scriptedResponse;
    }
}