.NET Workflows - Enable "human in the loop" case for declarative integration tests (#1171)

* Enabled

* Rollback visitor

* Update dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.IntegrationTests/Framework/WorkflowTest.cs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Chris
2025-10-03 10:24:22 -07:00
committed by GitHub
Unverified
parent 3ee7461b59
commit 880a99ca05
12 changed files with 156 additions and 32 deletions
@@ -22,7 +22,7 @@ internal abstract class DeclarativeActionExecutor<TAction>(TAction model, Workfl
public new TAction Model => (TAction)base.Model;
}
internal abstract class DeclarativeActionExecutor : Executor<ActionExecutorResult>, IModeledAction
internal abstract class DeclarativeActionExecutor : Executor<ActionExecutorResult>, IResettableExecutor, IModeledAction
{
private string? _parentId;
private readonly WorkflowFormulaState _state;
@@ -54,6 +54,12 @@ internal abstract class DeclarativeActionExecutor : Executor<ActionExecutorResul
protected virtual bool EmitResultEvent => true;
/// <inheritdoc/>
public ValueTask ResetAsync()
{
return default;
}
/// <inheritdoc/>
public override async ValueTask HandleAsync(ActionExecutorResult message, IWorkflowContext context)
{
@@ -16,8 +16,14 @@ internal sealed class DeclarativeWorkflowExecutor<TInput>(
DeclarativeWorkflowOptions options,
WorkflowFormulaState state,
Func<TInput, ChatMessage> inputTransform) :
Executor<TInput>(workflowId), IModeledAction where TInput : notnull
Executor<TInput>(workflowId), IResettableExecutor, IModeledAction where TInput : notnull
{
/// <inheritdoc/>
public ValueTask ResetAsync()
{
return default;
}
public override async ValueTask HandleAsync(TInput message, IWorkflowContext context)
{
// No state to restore if we're starting from the beginning.
@@ -19,7 +19,7 @@ internal sealed class DelegateActionExecutor(string actionId, WorkflowFormulaSta
}
}
internal class DelegateActionExecutor<TMessage> : Executor<TMessage>, IModeledAction where TMessage : notnull
internal class DelegateActionExecutor<TMessage> : Executor<TMessage>, IResettableExecutor, IModeledAction where TMessage : notnull
{
private readonly WorkflowFormulaState _state;
private readonly DelegateAction<TMessage>? _action;
@@ -33,6 +33,12 @@ internal class DelegateActionExecutor<TMessage> : Executor<TMessage>, IModeledAc
this._emitResult = emitResult;
}
/// <inheritdoc/>
public ValueTask ResetAsync()
{
return default;
}
public override async ValueTask HandleAsync(TMessage message, IWorkflowContext context)
{
if (this._action is not null)
@@ -51,7 +51,7 @@ public abstract class ActionExecutor(string id, FormulaSession session) : Action
/// Base class for an action executor that receives the initial trigger message.
/// </summary>
/// <typeparam name="TMessage">The type of message being handled</typeparam>
public abstract class ActionExecutor<TMessage> : Executor<TMessage> where TMessage : notnull
public abstract class ActionExecutor<TMessage> : Executor<TMessage>, IResettableExecutor where TMessage : notnull
{
private readonly FormulaSession _session;
@@ -66,6 +66,12 @@ public abstract class ActionExecutor<TMessage> : Executor<TMessage> where TMessa
this._session = session;
}
/// <inheritdoc/>
public ValueTask ResetAsync()
{
return default;
}
/// <inheritdoc/>
public override async ValueTask HandleAsync(TMessage message, IWorkflowContext context)
{
@@ -16,7 +16,7 @@ namespace Microsoft.Agents.AI.Workflows.Declarative.Kit;
/// Base class for an entry-point workflow executor that receives the initial trigger message.
/// </summary>
/// <typeparam name="TInput">The type of the initial message that starts the workflow.</typeparam>
public abstract class RootExecutor<TInput> : Executor<TInput> where TInput : notnull
public abstract class RootExecutor<TInput> : Executor<TInput>, IResettableExecutor where TInput : notnull
{
private readonly IConfiguration? _configuration;
private readonly WorkflowAgentProvider _agentProvider;
@@ -48,6 +48,12 @@ public abstract class RootExecutor<TInput> : Executor<TInput> where TInput : not
this.Session = new RootFormulaSession(this._state);
}
/// <inheritdoc/>
public ValueTask ResetAsync()
{
return default;
}
/// <inheritdoc/>
public override async ValueTask HandleAsync(TInput message, IWorkflowContext context)
{
@@ -29,7 +29,7 @@ public sealed class DeclarativeCodeGenTest(ITestOutputHelper output) : WorkflowT
[InlineData("Marketing.yaml", "Marketing.json", true)]
[InlineData("MathChat.yaml", "MathChat.json", true)]
[InlineData("DeepResearch.yaml", "DeepResearch.json", Skip = "Long running")]
[InlineData("HumanInLoop.yaml", "HumanInLoop.json", Skip = "Needs test support")]
[InlineData("HumanInLoop.yaml", "HumanInLoop.json", Skip = "Needs template support")]
public Task ValidateScenarioAsync(string workflowFileName, string testcaseFileName, bool externalConveration = false) =>
this.RunWorkflowAsync(Path.Combine(GetRepoFolder(), "workflow-samples", workflowFileName), testcaseFileName, externalConveration);
@@ -41,11 +41,15 @@ public sealed class DeclarativeCodeGenTest(ITestOutputHelper output) : WorkflowT
string workflowProviderCode = DeclarativeWorkflowBuilder.Eject(workflowPath, DeclarativeWorkflowLanguage.CSharp, WorkflowNamespace, WorkflowPrefix);
try
{
WorkflowEvents workflowEvents = await WorkflowHarness.RunCodeAsync(workflowProviderCode, $"{WorkflowPrefix}WorkflowProvider", WorkflowNamespace, workflowOptions, (TInput)GetInput<TInput>(testcase));
foreach (ExecutorEvent invokeEvent in workflowEvents.ExecutorInvokeEvents)
{
this.Output.WriteLine($"EXEC: {invokeEvent.ExecutorId}");
}
WorkflowHarness harness = await WorkflowHarness.GenerateCodeAsync(
runId: Path.GetFileNameWithoutExtension(workflowPath),
workflowProviderCode,
workflowProviderName: $"{WorkflowPrefix}WorkflowProvider",
WorkflowNamespace,
workflowOptions,
(TInput)GetInput<TInput>(testcase));
WorkflowEvents workflowEvents = await harness.RunTestcaseAsync(testcase, (TInput)GetInput<TInput>(testcase)).ConfigureAwait(false);
Assert.Empty(workflowEvents.ActionInvokeEvents);
Assert.Empty(workflowEvents.ActionCompleteEvents);
@@ -29,7 +29,7 @@ public sealed class DeclarativeWorkflowTest(ITestOutputHelper output) : Workflow
[InlineData("Marketing.yaml", "Marketing.json", true)]
[InlineData("MathChat.yaml", "MathChat.json", true)]
[InlineData("DeepResearch.yaml", "DeepResearch.json", Skip = "Long running")]
[InlineData("HumanInLoop.yaml", "HumanInLoop.json", Skip = "Needs test support")]
[InlineData("HumanInLoop.yaml", "HumanInLoop.json")]
public Task ValidateScenarioAsync(string workflowFileName, string testcaseFileName, bool externalConveration = false) =>
this.RunWorkflowAsync(Path.Combine(GetRepoFolder(), "workflow-samples", workflowFileName), testcaseFileName, externalConveration);
@@ -37,17 +37,14 @@ public sealed class DeclarativeWorkflowTest(ITestOutputHelper output) : Workflow
{
Workflow workflow = DeclarativeWorkflowBuilder.Build<TInput>(workflowPath, workflowOptions);
WorkflowEvents workflowEvents = await WorkflowHarness.RunAsync(workflow, (TInput)GetInput<TInput>(testcase));
foreach (DeclarativeActionInvokedEvent actionInvokeEvent in workflowEvents.ActionInvokeEvents)
{
this.Output.WriteLine($"ACTION: {actionInvokeEvent.ActionId} [{actionInvokeEvent.ActionType}]");
}
WorkflowHarness harness = new(workflow, runId: Path.GetFileNameWithoutExtension(workflowPath));
WorkflowEvents workflowEvents = await harness.RunTestcaseAsync(testcase, (TInput)GetInput<TInput>(testcase)).ConfigureAwait(false);
Assert.NotEmpty(workflowEvents.ExecutorInvokeEvents);
Assert.NotEmpty(workflowEvents.ExecutorCompleteEvents);
AssertWorkflow.Conversation(workflowOptions.ConversationId, testcase.Validation.ConversationCount, workflowEvents.ConversationEvents);
AssertWorkflow.EventCounts(workflowEvents.ActionInvokeEvents.Count, testcase);
AssertWorkflow.EventCounts(workflowEvents.ActionCompleteEvents.Count, testcase);
AssertWorkflow.EventCounts(workflowEvents.ActionCompleteEvents.Count, testcase, isCompletion: true);
AssertWorkflow.EventSequence(workflowEvents.ActionInvokeEvents.Select(e => e.ActionId), testcase);
}
}
@@ -28,11 +28,13 @@ public sealed class Testcase
public sealed class TestcaseSetup
{
[JsonConstructor]
public TestcaseSetup(TestcaseInput input)
public TestcaseSetup(TestcaseInput input, IList<TestcaseInput>? responses = null)
{
this.Input = input;
this.Responses = responses ?? [];
}
public TestcaseInput Input { get; }
public IList<TestcaseInput>? Responses { get; }
}
public sealed class TestcaseInput
@@ -17,6 +17,7 @@ internal sealed class WorkflowEvents
this.ConversationEvents = workflowEvents.OfType<ConversationUpdateEvent>().ToList();
this.ExecutorInvokeEvents = workflowEvents.OfType<ExecutorInvokedEvent>().ToList();
this.ExecutorCompleteEvents = workflowEvents.OfType<ExecutorCompletedEvent>().ToList();
this.InputEvents = workflowEvents.OfType<RequestInfoEvent>().ToList();
}
public IReadOnlyList<WorkflowEvent> Events { get; }
@@ -26,4 +27,5 @@ internal sealed class WorkflowEvents
public IReadOnlyList<DeclarativeActionCompletedEvent> ActionCompleteEvents { get; }
public IReadOnlyList<ExecutorInvokedEvent> ExecutorInvokeEvents { get; }
public IReadOnlyList<ExecutorCompletedEvent> ExecutorCompleteEvents { get; }
public IReadOnlyList<RequestInfoEvent> InputEvents { get; }
}
@@ -5,20 +5,57 @@ using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Declarative.Events;
using Shared.Code;
namespace Microsoft.Agents.AI.Workflows.Declarative.IntegrationTests.Framework;
internal static class WorkflowHarness
internal sealed class WorkflowHarness(Workflow workflow, string runId)
{
public static async Task<WorkflowEvents> RunAsync<TInput>(Workflow workflow, TInput input) where TInput : notnull
private readonly CheckpointManager _checkpointManager = CheckpointManager.CreateInMemory();
private CheckpointInfo? LastCheckpoint { get; set; }
public async Task<WorkflowEvents> RunTestcaseAsync<TInput>(Testcase testcase, TInput input) where TInput : notnull
{
StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);
IReadOnlyList<WorkflowEvent> workflowEvents = run.WatchStreamAsync().ToEnumerable().ToList();
WorkflowEvents workflowEvents = await this.RunAsync(input);
int requestCount = (workflowEvents.InputEvents.Count + 1) / 2;
int responseCount = 0;
while (requestCount > responseCount)
{
Assert.NotNull(testcase.Setup.Responses);
Assert.NotEmpty(testcase.Setup.Responses);
string inputText = testcase.Setup.Responses[responseCount].Value;
Console.WriteLine($"INPUT: {inputText}");
InputResponse response = new(inputText);
++responseCount;
WorkflowEvents runEvents = await this.ResumeAsync(response).ConfigureAwait(false);
workflowEvents = new WorkflowEvents([.. workflowEvents.Events, .. runEvents.Events]);
requestCount = (workflowEvents.InputEvents.Count + 1) / 2;
}
return workflowEvents;
}
private async Task<WorkflowEvents> RunAsync<TInput>(TInput input) where TInput : notnull
{
Console.WriteLine("RUNNING WORKFLOW...");
Checkpointed<StreamingRun> run = await InProcessExecution.StreamAsync(workflow, input, this._checkpointManager, runId);
IReadOnlyList<WorkflowEvent> workflowEvents = await this.MonitorWorkflowRunAsync(run).ToArrayAsync();
this.LastCheckpoint = workflowEvents.OfType<SuperStepCompletedEvent>().LastOrDefault()?.CompletionInfo?.Checkpoint;
return new WorkflowEvents(workflowEvents);
}
public static async Task<WorkflowEvents> RunCodeAsync<TInput>(
private async Task<WorkflowEvents> ResumeAsync(InputResponse response)
{
Console.WriteLine("RESUMING WORKFLOW...");
Assert.NotNull(this.LastCheckpoint);
Checkpointed<StreamingRun> run = await InProcessExecution.ResumeStreamAsync(workflow, this.LastCheckpoint, this._checkpointManager, runId);
IReadOnlyList<WorkflowEvent> workflowEvents = await this.MonitorWorkflowRunAsync(run, response).ToArrayAsync();
return new WorkflowEvents(workflowEvents);
}
public static async Task<WorkflowHarness> GenerateCodeAsync<TInput>(
string runId,
string workflowProviderCode,
string workflowProviderName,
string workflowProviderNamespace,
@@ -35,7 +72,44 @@ internal static class WorkflowHarness
object? workflowObject = genericMethod.Invoke(null, [options, null]);
Workflow workflow = Assert.IsType<Workflow>(workflowObject);
Console.WriteLine("RUNNING WORKFLOW...");
return await RunAsync(workflow, input);
return new WorkflowHarness(workflow, runId);
}
private async IAsyncEnumerable<WorkflowEvent> MonitorWorkflowRunAsync(Checkpointed<StreamingRun> run, InputResponse? response = null)
{
await foreach (WorkflowEvent workflowEvent in run.Run.WatchStreamAsync().ConfigureAwait(false))
{
bool exitLoop = false;
switch (workflowEvent)
{
case RequestInfoEvent requestInfo:
Console.WriteLine($"REQUEST #{requestInfo.Request.RequestId}");
if (response is not null)
{
ExternalResponse requestResponse = requestInfo.Request.CreateResponse(response);
await run.Run.SendResponseAsync(requestResponse).ConfigureAwait(false);
response = null;
}
else
{
await run.Run.EndRunAsync().ConfigureAwait(false);
exitLoop = true;
}
break;
case DeclarativeActionInvokedEvent actionInvokeEvent:
Console.WriteLine($"ACTION: {actionInvokeEvent.ActionId} [{actionInvokeEvent.ActionType}]");
break;
}
yield return workflowEvent;
if (exitLoop)
{
break;
}
}
Console.WriteLine("SUSPENDING WORKFLOW...");
}
}
@@ -132,9 +132,10 @@ public abstract class WorkflowTest(ITestOutputHelper output) : IntegrationTest(o
}
}
public static void EventCounts(int actualCount, Testcase testcase)
// "isCompletion" adjusts validation logic to account for when condition completion is not experienced due to goto. Remove this test logic once addressed.
public static void EventCounts(int actualCount, Testcase testcase, bool isCompletion = false)
{
Assert.True(actualCount >= testcase.Validation.MinActionCount, $"Event count less than expected: {testcase.Validation.MinActionCount} ({actualCount}).");
Assert.True(actualCount + (isCompletion ? 1 : 0) >= testcase.Validation.MinActionCount, $"Event count less than expected: {testcase.Validation.MinActionCount} ({actualCount}).");
Assert.True(actualCount <= (testcase.Validation.MaxActionCount ?? testcase.Validation.MinActionCount), $"Event count greater than expected: {testcase.Validation.MaxActionCount ?? testcase.Validation.MinActionCount} ({actualCount}).");
}
@@ -196,6 +197,7 @@ public abstract class WorkflowTest(ITestOutputHelper output) : IntegrationTest(o
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
ReadCommentHandling = JsonCommentHandling.Skip,
WriteIndented = true,
};
}
@@ -4,17 +4,30 @@
"input": {
"type": "String",
"value": "Iko"
}
},
"responses": [
{
"type": "String",
"value": "Adsf"
},
{
"type": "String",
"value": "Iko"
}
]
},
"validation": {
"conversation_count": 1,
"min_action_count": 1,
"min_action_count": 8,
"actions": {
"start": [
"invoke_agent"
"set_project"
],
"repeat": [
"question_confirm"
],
"final": [
"invoke_agent"
"sendActivity_confirmed"
]
}
}