// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Agents.AI.Workflows.Sample;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

/// <summary>
/// Regression tests for GH-2485: pending <see cref="ExternalRequest"/> objects must be
/// re-emitted after resuming a workflow from a checkpoint.
/// </summary>
public class CheckpointResumeTests
{
    /// <summary>
    /// Upper bound for a single pass over a run's event stream. Every stream read in this class
    /// goes through <see cref="ReadToHaltAsync"/>, so a regression that keeps a run from halting
    /// fails the test with a cancellation instead of hanging the test run.
    /// </summary>
    private static readonly TimeSpan s_streamTimeout = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Verifies that a resumed workflow re-emits <see cref="RequestInfoEvent"/>s for
    /// pending external requests that existed at the time of the checkpoint.
    /// </summary>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    internal async Task Checkpoint_Resume_WithPendingRequests_RepublishesRequestInfoEventsAsync(ExecutionEnvironment environment)
    {
        // Arrange
        Workflow workflow = CreateSimpleRequestWorkflow();
        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        // Act 1: Run the workflow, collecting every pending request and the latest checkpoint.
        List<ExternalRequest> originalRequests;
        CheckpointInfo? checkpoint;
        await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
                                                     .RunStreamingAsync(workflow, "Hello"))
        {
            List<WorkflowEvent> firstRunEvents = await ReadToHaltAsync(firstRun);
            originalRequests = [.. firstRunEvents.OfType<RequestInfoEvent>().Select(evt => evt.Request)];
            checkpoint = FindLastCheckpoint(firstRunEvents);

            originalRequests.Should().NotBeEmpty("the workflow should have created at least one external request");
            checkpoint.Should().NotBeNull("a checkpoint should have been created");
        }

        // Act 2: Resume from the checkpoint.
        await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
                                                    .ResumeStreamingAsync(workflow, checkpoint!);

        // Assert: The pending requests should be re-emitted.
        List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed);
        ExternalRequest[] reEmittedRequests = [.. resumedEvents.OfType<RequestInfoEvent>().Select(evt => evt.Request)];

        reEmittedRequests.Should().HaveCount(originalRequests.Count,
            "all pending requests from the checkpoint should be re-emitted after resume");
        reEmittedRequests.Select(r => r.RequestId)
            .Should().BeEquivalentTo(originalRequests.Select(r => r.RequestId),
                "the re-emitted request IDs should match the original pending request IDs");
    }

    /// <summary>
    /// Verifies that a run resumed from a checkpoint with pending external requests reports
    /// <see cref="RunStatus.PendingRequests"/> once its event stream halts (it must not stay stuck at NotStarted).
    /// </summary>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    internal async Task Checkpoint_Resume_WithPendingRequests_RunStatusIsPendingRequestsAsync(ExecutionEnvironment environment)
    {
        // Arrange
        Workflow workflow = CreateSimpleRequestWorkflow();
        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        // First run: collect a checkpoint with pending requests.
        CheckpointInfo checkpoint;
        await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
                                                     .RunStreamingAsync(workflow, "Hello"))
        {
            (_, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
        }

        // Act: Resume from the checkpoint and consume events until the stream halts so the run loop processes.
        await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
                                                    .ResumeStreamingAsync(workflow, checkpoint);
        _ = await ReadToHaltAsync(resumed);

        // Assert
        RunStatus status = await resumed.GetStatusAsync();
        status.Should().Be(RunStatus.PendingRequests,
            "the resumed workflow should report PendingRequests after rehydration");
    }

    /// <summary>
    /// Verifies the full roundtrip: resume from checkpoint, observe the re-emitted request,
    /// send a response, and verify the workflow completes without duplicating the request.
    /// </summary>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    internal async Task Checkpoint_Resume_RespondToPendingRequest_CompletesWithoutDuplicateAsync(ExecutionEnvironment environment)
    {
        // Arrange
        Workflow workflow = CreateSimpleRequestWorkflow();
        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        // First run: collect checkpoint + pending request.
        ExternalRequest pendingRequest;
        CheckpointInfo checkpoint;
        await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
                                                     .RunStreamingAsync(workflow, "Hello"))
        {
            (pendingRequest, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
        }

        // Act: Resume and observe the restored request.
        await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
                                                    .ResumeStreamingAsync(workflow, checkpoint);

        List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed);
        ExternalRequest replayedRequest = resumedEvents.OfType<RequestInfoEvent>()
                                                       .Select(evt => evt.Request)
                                                       .Should()
                                                       .ContainSingle("the pending request should be emitted exactly once (no duplicates)")
                                                       .Subject;
        replayedRequest.RequestId.Should().Be(pendingRequest.RequestId,
            "the re-emitted request should match the original");

        // Before responding, confirm the resumed run halted waiting on the restored request, so a
        // failure here is reported clearly rather than surfacing later as a confusing response error.
        RunStatus statusBeforeResponse = await resumed.GetStatusAsync();
        statusBeforeResponse.Should().Be(RunStatus.PendingRequests,
            "the run should be in PendingRequests state before we send a response");

        // Respond to the restored request and verify the workflow processes it.
        await resumed.SendResponseAsync(replayedRequest.CreateResponse("World"));

        List<WorkflowEvent> postResponseEvents = await ReadToHaltAsync(resumed);
        postResponseEvents.Should().NotBeEmpty(
            "the workflow should process the response and produce events");
        postResponseEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
            "no errors should occur when processing the restored request's response");
        postResponseEvents.OfType<RequestInfoEvent>().Should().BeEmpty(
            "the answered request should not be re-emitted once its response has been processed");

        RunStatus finalStatus = await resumed.GetStatusAsync();
        finalStatus.Should().Be(RunStatus.Idle,
            "the workflow should complete once the restored request has been answered");
    }

    /// <summary>
    /// Verifies that restoring a live run to a checkpoint re-emits pending requests and allows
    /// the workflow to continue from that restored point.
    /// </summary>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    internal async Task Checkpoint_Restore_WithPendingRequests_RepublishesRequestInfoEventsAsync(ExecutionEnvironment environment)
    {
        // Arrange
        Workflow workflow = CreateSimpleRequestWorkflow();
        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        await using StreamingRun run = await env.WithCheckpointing(checkpointManager)
                                                .RunStreamingAsync(workflow, "Hello");

        (ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run);

        // Advance the run past the checkpoint so the restore has meaningful work to undo.
        await run.SendResponseAsync(pendingRequest.CreateResponse("World"));

        List<WorkflowEvent> firstCompletionEvents = await ReadToHaltAsync(run);
        firstCompletionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
            "the workflow should continue cleanly before we restore");

        RunStatus statusAfterFirstResponse = await run.GetStatusAsync();
        statusAfterFirstResponse.Should().Be(RunStatus.Idle,
            "the workflow should finish processing the first response before we restore");

        // Act
        await run.RestoreCheckpointAsync(checkpoint);

        // Assert
        List<WorkflowEvent> restoredEvents = await ReadToHaltAsync(run);
        ExternalRequest[] replayedRequests = [.. restoredEvents.OfType<RequestInfoEvent>().Select(evt => evt.Request)];

        replayedRequests.Should().ContainSingle("runtime restore should re-emit the restored pending request");
        replayedRequests[0].RequestId.Should().Be(pendingRequest.RequestId,
            "the replayed request should match the request captured at the checkpoint");

        await run.SendResponseAsync(replayedRequests[0].CreateResponse("Again"));

        List<WorkflowEvent> secondCompletionEvents = await ReadToHaltAsync(run);
        secondCompletionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
            "runtime restore replay should not introduce workflow errors");

        RunStatus statusAfterRestoreResponse = await run.GetStatusAsync();
        statusAfterRestoreResponse.Should().Be(RunStatus.Idle,
            "the workflow should be able to continue after the runtime restore replay");
    }

    /// <summary>
    /// Verifies that restoring a live run clears any queued external responses from the
    /// superseded timeline before importing checkpoint state.
    /// </summary>
    [Fact]
    internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImportAsync()
    {
        Workflow workflow = CreateSimpleRequestWorkflow();
        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        InProcessExecutionEnvironment env = ExecutionEnvironment.InProcess_Lockstep.ToWorkflowExecutionEnvironment();

        await using StreamingRun run = await env.WithCheckpointing(checkpointManager)
                                                .RunStreamingAsync(workflow, "Hello");

        (ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run);

        // Queue a response but restore before the stream is read again, so the response
        // belongs to the timeline that the restore discards.
        await run.SendResponseAsync(pendingRequest.CreateResponse("World"));
        await run.RestoreCheckpointAsync(checkpoint);

        List<WorkflowEvent> restoredEvents = await ReadToHaltAsync(run);
        ExternalRequest replayedRequest = restoredEvents.OfType<RequestInfoEvent>()
                                                        .Select(evt => evt.Request)
                                                        .Should()
                                                        .ContainSingle("the restored run should still be waiting for the checkpointed request")
                                                        .Subject;

        restoredEvents.OfType<ExecutorInvokedEvent>().Should().BeEmpty(
            "a queued response from the superseded timeline should not be processed after restore");

        RunStatus statusAfterRestore = await run.GetStatusAsync();
        statusAfterRestore.Should().Be(RunStatus.PendingRequests,
            "the restored run should remain pending until a post-restore response is sent");

        await run.SendResponseAsync(replayedRequest.CreateResponse("Again"));

        List<WorkflowEvent> completionEvents = await ReadToHaltAsync(run);
        completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
            "the restored request should complete cleanly once a new response is provided");

        RunStatus finalStatus = await run.GetStatusAsync();
        finalStatus.Should().Be(RunStatus.Idle,
            "the workflow should finish once the replayed request receives a fresh response");
    }

    /// <summary>
    /// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow.
    /// </summary>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    internal async Task Checkpoint_Resume_SubworkflowWithPendingRequests_RepublishesQualifiedRequestInfoEventsAsync(ExecutionEnvironment environment)
    {
        // Arrange
        Workflow workflow = CreateCheckpointedSubworkflowRequestWorkflow();
        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        ExternalRequest pendingRequest;
        CheckpointInfo checkpoint;
        await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
                                                     .RunStreamingAsync(workflow, "Hello"))
        {
            (pendingRequest, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
        }

        // Act
        await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
                                                    .ResumeStreamingAsync(workflow, checkpoint);

        // Assert
        List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed);
        ExternalRequest[] replayedRequests = [.. resumedEvents.OfType<RequestInfoEvent>().Select(evt => evt.Request)];

        replayedRequests.Should().ContainSingle("the resumed parent workflow should surface the subworkflow request once");
        replayedRequests[0].RequestId.Should().Be(pendingRequest.RequestId,
            "the replayed subworkflow request should match the checkpointed request");
        replayedRequests[0].PortInfo.PortId.Should().Be(pendingRequest.PortInfo.PortId,
            "the replayed request should remain qualified through the subworkflow boundary");

        await resumed.SendResponseAsync(replayedRequests[0].CreateResponse("World"));

        List<WorkflowEvent> completionEvents = await ReadToHaltAsync(resumed);
        completionEvents.OfType<RequestInfoEvent>().Should().BeEmpty(
            "the resumed subworkflow request should not be replayed twice");
        completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
            "subworkflow replay should not introduce workflow errors");

        RunStatus statusAfterSubworkflowResponse = await resumed.GetStatusAsync();
        statusAfterSubworkflowResponse.Should().Be(RunStatus.Idle,
            "the resumed subworkflow should continue after responding to the replayed request");
    }

    /// <summary>
    /// Verifies that when <c>republishPendingEvents</c> is <see langword="false"/>,
    /// no <see cref="RequestInfoEvent"/> is re-emitted after resuming from a checkpoint.
    /// </summary>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    internal async Task Checkpoint_Resume_WithRepublishDisabled_DoesNotEmitRequestInfoEventsAsync(ExecutionEnvironment environment)
    {
        // Arrange
        Workflow workflow = CreateSimpleRequestWorkflow();
        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        // First run: collect a checkpoint with pending requests. The capture helper asserts that a
        // request really was pending, so the "nothing re-emitted" assertion below cannot pass vacuously.
        CheckpointInfo checkpoint;
        await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
                                                     .RunStreamingAsync(workflow, "Hello"))
        {
            (_, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
        }

        // Act: Resume with republishPendingEvents: false via the internal API.
        await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
                                                    .ResumeStreamingInternalAsync(workflow, checkpoint, republishPendingEvents: false);

        // Assert: No RequestInfoEvent should appear in the event stream.
        List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed);
        resumedEvents.OfType<RequestInfoEvent>().Should().BeEmpty(
            "no RequestInfoEvent should be emitted when republishPendingEvents is false");
    }

    /// <summary>
    /// Builds a workflow that starts at a request port and routes the port's output to a
    /// forwarding executor.
    /// </summary>
    /// <param name="requestPortId">Identifier of the starting request port.</param>
    /// <param name="processorId">Identifier of the executor connected after the port.</param>
    private static Workflow CreateSimpleRequestWorkflow(
        string requestPortId = "TestPort",
        string processorId = "Processor")
    {
        RequestPort requestPort = RequestPort.Create<string, string>(requestPortId);
        ForwardMessageExecutor<string> processor = new(processorId);

        return new WorkflowBuilder(requestPort)
            .AddEdge(requestPort, processor)
            .Build();
    }

    /// <summary>
    /// Builds a parent workflow whose starting executor is the simple request workflow bound as a
    /// subworkflow, with an external request port attached to that subworkflow.
    /// </summary>
    private static Workflow CreateCheckpointedSubworkflowRequestWorkflow()
    {
        ExecutorBinding subworkflow = CreateSimpleRequestWorkflow(
                requestPortId: "InnerTestPort",
                processorId: "InnerProcessor")
            .BindAsExecutor("Subworkflow");

        return new WorkflowBuilder(subworkflow)
            .AddExternalRequest<string, string>(subworkflow, id: "ForwardedSubworkflowRequest")
            .Build();
    }

    /// <summary>
    /// Reads <paramref name="run"/> until it halts and returns the first emitted pending request
    /// together with the last checkpoint observed, asserting that both exist.
    /// </summary>
    private static async ValueTask<(ExternalRequest PendingRequest, CheckpointInfo Checkpoint)> CapturePendingRequestAndCheckpointAsync(StreamingRun run)
    {
        List<WorkflowEvent> events = await ReadToHaltAsync(run);

        ExternalRequest? pendingRequest = events.OfType<RequestInfoEvent>()
                                                .Select(evt => evt.Request)
                                                .FirstOrDefault();
        CheckpointInfo? checkpoint = FindLastCheckpoint(events);

        pendingRequest.Should().NotBeNull("the workflow should have emitted a pending request");
        checkpoint.Should().NotBeNull("the workflow should have produced a checkpoint");

        return (pendingRequest!, checkpoint!);
    }

    /// <summary>
    /// Returns the checkpoint attached to the last <see cref="SuperStepCompletedEvent"/> in
    /// <paramref name="events"/> that carries one, or <see langword="null"/> if none does.
    /// </summary>
    private static CheckpointInfo? FindLastCheckpoint(IEnumerable<WorkflowEvent> events)
    {
        CheckpointInfo? checkpoint = null;
        foreach (WorkflowEvent evt in events)
        {
            if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
            {
                checkpoint = cp;
            }
        }

        return checkpoint;
    }

    /// <summary>
    /// Drains <paramref name="run"/>'s event stream without blocking on pending requests and
    /// returns every event observed. The read is cancelled after the shared stream timeout,
    /// so a run that never halts fails the calling test instead of hanging it.
    /// </summary>
    private static async ValueTask<List<WorkflowEvent>> ReadToHaltAsync(StreamingRun run)
    {
        List<WorkflowEvent> events = [];
        using CancellationTokenSource cts = new(s_streamTimeout);

        await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
        {
            events.Add(evt);
        }

        return events;
    }
}