mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
87a8fa2a9d
* Improve workflow unit tests * Update test name prefix for clarity. * Update tests to surface any errors. * fix check-point restore-time race in off-thread workflow event stream * Fixes an intermittent checkpoint-restore race in in-process workflow runs.
488 lines
22 KiB
C#
488 lines
22 KiB
C#
// Copyright (c) Microsoft. All rights reserved.
|
|
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using FluentAssertions;
|
|
using Microsoft.Agents.AI.Workflows.InProc;
|
|
using Microsoft.Agents.AI.Workflows.Sample;
|
|
|
|
namespace Microsoft.Agents.AI.Workflows.UnitTests;
|
|
|
|
/// <summary>
|
|
/// Regression tests for GH-2485: pending <see cref="RequestInfoEvent"/> objects must be
|
|
/// re-emitted after resuming a workflow from a checkpoint.
|
|
/// </summary>
|
|
public class CheckpointResumeTests
|
|
{
|
|
/// <summary>
|
|
/// Verifies that a resumed workflow re-emits <see cref="RequestInfoEvent"/>s for
|
|
/// pending external requests that existed at the time of the checkpoint.
|
|
/// </summary>
|
|
[Theory]
|
|
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
|
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
|
internal async Task Checkpoint_Resume_WithPendingRequests_RepublishesRequestInfoEventsAsync(ExecutionEnvironment environment)
|
|
{
|
|
// Arrange
|
|
RequestPort<string, string> requestPort = RequestPort.Create<string, string>("TestPort");
|
|
ForwardMessageExecutor<string> processor = new("Processor");
|
|
|
|
Workflow workflow = new WorkflowBuilder(requestPort)
|
|
.AddEdge(requestPort, processor)
|
|
.Build();
|
|
|
|
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
|
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
|
|
|
// Act 1: Run workflow, collect pending requests and a checkpoint.
|
|
List<ExternalRequest> originalRequests = [];
|
|
CheckpointInfo? checkpoint = null;
|
|
|
|
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
|
.RunStreamingAsync(workflow, "Hello"))
|
|
{
|
|
await foreach (WorkflowEvent evt in firstRun.WatchStreamAsync(blockOnPendingRequest: false))
|
|
{
|
|
if (evt is RequestInfoEvent requestInfo)
|
|
{
|
|
originalRequests.Add(requestInfo.Request);
|
|
}
|
|
|
|
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
|
{
|
|
checkpoint = cp;
|
|
}
|
|
}
|
|
|
|
originalRequests.Should().NotBeEmpty("the workflow should have created at least one external request");
|
|
checkpoint.Should().NotBeNull("a checkpoint should have been created");
|
|
}
|
|
|
|
// Act 2: Resume from the checkpoint.
|
|
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
|
.ResumeStreamingAsync(workflow, checkpoint!);
|
|
|
|
// Assert: The pending requests should be re-emitted.
|
|
List<ExternalRequest> reEmittedRequests = [];
|
|
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
|
|
|
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
|
{
|
|
if (evt is RequestInfoEvent requestInfo)
|
|
{
|
|
reEmittedRequests.Add(requestInfo.Request);
|
|
}
|
|
}
|
|
|
|
reEmittedRequests.Should().HaveCount(originalRequests.Count,
|
|
"all pending requests from the checkpoint should be re-emitted after resume");
|
|
reEmittedRequests.Select(r => r.RequestId)
|
|
.Should().BeEquivalentTo(originalRequests.Select(r => r.RequestId),
|
|
"the re-emitted request IDs should match the original pending request IDs");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies that <see cref="RunStatus"/> transitions to <see cref="RunStatus.PendingRequests"/>
|
|
/// after resuming from a checkpoint with pending external requests (not stuck at NotStarted).
|
|
/// </summary>
|
|
[Theory]
|
|
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
|
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
|
internal async Task Checkpoint_Resume_WithPendingRequests_RunStatusIsPendingRequestsAsync(ExecutionEnvironment environment)
|
|
{
|
|
// Arrange
|
|
RequestPort<string, string> requestPort = RequestPort.Create<string, string>("TestPort");
|
|
ForwardMessageExecutor<string> processor = new("Processor");
|
|
|
|
Workflow workflow = new WorkflowBuilder(requestPort)
|
|
.AddEdge(requestPort, processor)
|
|
.Build();
|
|
|
|
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
|
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
|
|
|
// First run: collect a checkpoint with pending requests.
|
|
CheckpointInfo? checkpoint = null;
|
|
|
|
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
|
.RunStreamingAsync(workflow, "Hello"))
|
|
{
|
|
await foreach (WorkflowEvent evt in firstRun.WatchStreamAsync(blockOnPendingRequest: false))
|
|
{
|
|
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
|
{
|
|
checkpoint = cp;
|
|
}
|
|
}
|
|
|
|
checkpoint.Should().NotBeNull();
|
|
}
|
|
|
|
// Act: Resume from the checkpoint and consume events so the run loop processes.
|
|
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
|
.ResumeStreamingAsync(workflow, checkpoint!);
|
|
|
|
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
|
await foreach (WorkflowEvent _ in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
|
{
|
|
// Consume all events until the stream completes.
|
|
}
|
|
|
|
// Assert
|
|
RunStatus status = await resumed.GetStatusAsync();
|
|
status.Should().Be(RunStatus.PendingRequests,
|
|
"the resumed workflow should report PendingRequests after rehydration");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies the full roundtrip: resume from checkpoint, observe the re-emitted request,
|
|
/// send a response, and verify the workflow completes without duplicating the request.
|
|
/// </summary>
|
|
[Theory]
|
|
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
|
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
|
internal async Task Checkpoint_Resume_RespondToPendingRequest_CompletesWithoutDuplicateAsync(ExecutionEnvironment environment)
|
|
{
|
|
// Arrange
|
|
RequestPort<string, string> requestPort = RequestPort.Create<string, string>("TestPort");
|
|
ForwardMessageExecutor<string> processor = new("Processor");
|
|
|
|
Workflow workflow = new WorkflowBuilder(requestPort)
|
|
.AddEdge(requestPort, processor)
|
|
.Build();
|
|
|
|
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
|
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
|
|
|
// First run: collect checkpoint + pending request.
|
|
ExternalRequest? pendingRequest = null;
|
|
CheckpointInfo? checkpoint = null;
|
|
|
|
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
|
.RunStreamingAsync(workflow, "Hello"))
|
|
{
|
|
await foreach (WorkflowEvent evt in firstRun.WatchStreamAsync(blockOnPendingRequest: false))
|
|
{
|
|
if (evt is RequestInfoEvent requestInfo)
|
|
{
|
|
pendingRequest = requestInfo.Request;
|
|
}
|
|
|
|
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
|
{
|
|
checkpoint = cp;
|
|
}
|
|
}
|
|
|
|
pendingRequest.Should().NotBeNull();
|
|
checkpoint.Should().NotBeNull();
|
|
}
|
|
|
|
// Act: Resume and respond to the restored request.
|
|
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
|
.ResumeStreamingAsync(workflow, checkpoint!);
|
|
|
|
int requestEventCount = 0;
|
|
|
|
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
|
|
|
// Use blockOnPendingRequest: false for the first pass to see the re-emitted requests.
|
|
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
|
{
|
|
if (evt is RequestInfoEvent requestInfo)
|
|
{
|
|
requestEventCount++;
|
|
requestInfo.Request.RequestId.Should().Be(pendingRequest!.RequestId,
|
|
"the re-emitted request should match the original");
|
|
}
|
|
}
|
|
|
|
requestEventCount.Should().Be(1,
|
|
"the pending request should be emitted exactly once (no duplicates)");
|
|
|
|
// Assert intermediate state before responding: the run should be in PendingRequests
|
|
// and we should have observed the re-emitted request. If the first WatchStreamAsync
|
|
// didn't complete or yielded nothing, these assertions catch it with a clear message.
|
|
RunStatus statusBeforeResponse = await resumed.GetStatusAsync();
|
|
statusBeforeResponse.Should().Be(RunStatus.PendingRequests,
|
|
"the run should be in PendingRequests state before we send a response");
|
|
|
|
// Now send the response and verify the workflow processes it.
|
|
ExternalResponse response = pendingRequest!.CreateResponse("World");
|
|
await resumed.SendResponseAsync(response);
|
|
|
|
// Consume the resulting events to verify the workflow progresses without errors.
|
|
List<WorkflowEvent> postResponseEvents = [];
|
|
|
|
using CancellationTokenSource cts2 = new(TimeSpan.FromSeconds(10));
|
|
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts2.Token))
|
|
{
|
|
postResponseEvents.Add(evt);
|
|
}
|
|
|
|
postResponseEvents.Should().NotBeEmpty(
|
|
"the workflow should process the response and produce events");
|
|
postResponseEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
|
"no errors should occur when processing the restored request's response");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies that restoring a live run to a checkpoint re-emits pending requests and allows
|
|
/// the workflow to continue from that restored point.
|
|
/// </summary>
|
|
[Theory]
|
|
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
|
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
|
internal async Task Checkpoint_Restore_WithPendingRequests_RepublishesRequestInfoEventsAsync(ExecutionEnvironment environment)
|
|
{
|
|
// Arrange
|
|
Workflow workflow = CreateSimpleRequestWorkflow();
|
|
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
|
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
|
|
|
await using StreamingRun run = await env.WithCheckpointing(checkpointManager)
|
|
.RunStreamingAsync(workflow, "Hello");
|
|
|
|
(ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run);
|
|
|
|
// Advance the run past the checkpoint so the restore has meaningful work to undo.
|
|
await run.SendResponseAsync(pendingRequest.CreateResponse("World"));
|
|
|
|
List<WorkflowEvent> firstCompletionEvents = await ReadToHaltAsync(run);
|
|
firstCompletionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
|
"the workflow should continue cleanly before we restore");
|
|
RunStatus statusAfterFirstResponse = await run.GetStatusAsync();
|
|
statusAfterFirstResponse.Should().Be(RunStatus.Idle,
|
|
"the workflow should finish processing the first response before we restore");
|
|
|
|
// Act
|
|
await run.RestoreCheckpointAsync(checkpoint);
|
|
|
|
// Assert
|
|
List<WorkflowEvent> restoredEvents = await ReadToHaltAsync(run);
|
|
ExternalRequest[] replayedRequests = [.. restoredEvents.OfType<RequestInfoEvent>().Select(evt => evt.Request)];
|
|
|
|
replayedRequests.Should().ContainSingle("runtime restore should re-emit the restored pending request");
|
|
replayedRequests[0].RequestId.Should().Be(pendingRequest.RequestId,
|
|
"the replayed request should match the request captured at the checkpoint");
|
|
|
|
await run.SendResponseAsync(replayedRequests[0].CreateResponse("Again"));
|
|
|
|
List<WorkflowEvent> secondCompletionEvents = await ReadToHaltAsync(run);
|
|
secondCompletionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
|
"runtime restore replay should not introduce workflow errors");
|
|
RunStatus statusAfterRestoreResponse = await run.GetStatusAsync();
|
|
statusAfterRestoreResponse.Should().Be(RunStatus.Idle,
|
|
"the workflow should be able to continue after the runtime restore replay");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies that restoring a live run clears any queued external responses from the
|
|
/// superseded timeline before importing checkpoint state.
|
|
/// </summary>
|
|
[Fact]
|
|
internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImportAsync()
|
|
{
|
|
Workflow workflow = CreateSimpleRequestWorkflow();
|
|
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
|
InProcessExecutionEnvironment env = ExecutionEnvironment.InProcess_Lockstep.ToWorkflowExecutionEnvironment();
|
|
|
|
await using StreamingRun run = await env.WithCheckpointing(checkpointManager)
|
|
.RunStreamingAsync(workflow, "Hello");
|
|
|
|
(ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run);
|
|
|
|
await run.SendResponseAsync(pendingRequest.CreateResponse("World"));
|
|
await run.RestoreCheckpointAsync(checkpoint);
|
|
|
|
List<WorkflowEvent> restoredEvents = await ReadToHaltAsync(run);
|
|
ExternalRequest replayedRequest = restoredEvents.OfType<RequestInfoEvent>()
|
|
.Select(evt => evt.Request)
|
|
.Should()
|
|
.ContainSingle("the restored run should still be waiting for the checkpointed request")
|
|
.Subject;
|
|
|
|
restoredEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
|
"a queued response from the superseded timeline should not be processed after restore");
|
|
RunStatus statusAfterRestore = await run.GetStatusAsync();
|
|
statusAfterRestore.Should().Be(RunStatus.PendingRequests,
|
|
"the restored run should remain pending until a post-restore response is sent");
|
|
|
|
await run.SendResponseAsync(replayedRequest.CreateResponse("Again"));
|
|
|
|
List<WorkflowEvent> completionEvents = await ReadToHaltAsync(run);
|
|
completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
|
"the restored request should complete cleanly once a new response is provided");
|
|
RunStatus finalStatus = await run.GetStatusAsync();
|
|
finalStatus.Should().Be(RunStatus.Idle,
|
|
"the workflow should finish once the replayed request receives a fresh response");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow.
|
|
/// </summary>
|
|
[Theory]
|
|
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
|
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
|
internal async Task Checkpoint_Resume_SubworkflowWithPendingRequests_RepublishesQualifiedRequestInfoEventsAsync(ExecutionEnvironment environment)
|
|
{
|
|
// Arrange
|
|
Workflow workflow = CreateCheckpointedSubworkflowRequestWorkflow();
|
|
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
|
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
|
|
|
ExternalRequest pendingRequest;
|
|
CheckpointInfo checkpoint;
|
|
|
|
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
|
.RunStreamingAsync(workflow, "Hello"))
|
|
{
|
|
(pendingRequest, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
|
|
}
|
|
|
|
// Act
|
|
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
|
.ResumeStreamingAsync(workflow, checkpoint);
|
|
|
|
// Assert
|
|
List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed);
|
|
ExternalRequest[] replayedRequests = [.. resumedEvents.OfType<RequestInfoEvent>().Select(evt => evt.Request)];
|
|
|
|
replayedRequests.Should().ContainSingle("the resumed parent workflow should surface the subworkflow request once");
|
|
replayedRequests[0].RequestId.Should().Be(pendingRequest.RequestId,
|
|
"the replayed subworkflow request should match the checkpointed request");
|
|
replayedRequests[0].PortInfo.PortId.Should().Be(pendingRequest.PortInfo.PortId,
|
|
"the replayed request should remain qualified through the subworkflow boundary");
|
|
|
|
await resumed.SendResponseAsync(replayedRequests[0].CreateResponse("World"));
|
|
|
|
List<WorkflowEvent> completionEvents = await ReadToHaltAsync(resumed);
|
|
completionEvents.OfType<RequestInfoEvent>().Should().BeEmpty(
|
|
"the resumed subworkflow request should not be replayed twice");
|
|
completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
|
"subworkflow replay should not introduce workflow errors");
|
|
RunStatus statusAfterSubworkflowResponse = await resumed.GetStatusAsync();
|
|
statusAfterSubworkflowResponse.Should().Be(RunStatus.Idle,
|
|
"the resumed subworkflow should continue after responding to the replayed request");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies that when <c>republishPendingEvents</c> is <see langword="false"/>,
|
|
/// no <see cref="RequestInfoEvent"/> is re-emitted after resuming from a checkpoint.
|
|
/// </summary>
|
|
[Theory]
|
|
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
|
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
|
internal async Task Checkpoint_Resume_WithRepublishDisabled_DoesNotEmitRequestInfoEventsAsync(ExecutionEnvironment environment)
|
|
{
|
|
// Arrange
|
|
RequestPort<string, string> requestPort = RequestPort.Create<string, string>("TestPort");
|
|
ForwardMessageExecutor<string> processor = new("Processor");
|
|
|
|
Workflow workflow = new WorkflowBuilder(requestPort)
|
|
.AddEdge(requestPort, processor)
|
|
.Build();
|
|
|
|
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
|
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
|
|
|
// First run: collect a checkpoint with pending requests.
|
|
CheckpointInfo? checkpoint = null;
|
|
|
|
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
|
.RunStreamingAsync(workflow, "Hello"))
|
|
{
|
|
await foreach (WorkflowEvent evt in firstRun.WatchStreamAsync(blockOnPendingRequest: false))
|
|
{
|
|
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
|
{
|
|
checkpoint = cp;
|
|
}
|
|
}
|
|
|
|
checkpoint.Should().NotBeNull();
|
|
}
|
|
|
|
// Act: Resume with republishPendingEvents: false via the internal API.
|
|
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
|
.ResumeStreamingInternalAsync(workflow, checkpoint!, republishPendingEvents: false);
|
|
|
|
// Assert: No RequestInfoEvent should appear in the event stream.
|
|
int requestEventCount = 0;
|
|
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
|
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
|
{
|
|
if (evt is RequestInfoEvent)
|
|
{
|
|
requestEventCount++;
|
|
}
|
|
}
|
|
|
|
requestEventCount.Should().Be(0,
|
|
"no RequestInfoEvent should be emitted when republishPendingEvents is false");
|
|
}
|
|
|
|
private static Workflow CreateSimpleRequestWorkflow(
|
|
string requestPortId = "TestPort",
|
|
string processorId = "Processor")
|
|
{
|
|
RequestPort<string, string> requestPort = RequestPort.Create<string, string>(requestPortId);
|
|
ForwardMessageExecutor<string> processor = new(processorId);
|
|
|
|
return new WorkflowBuilder(requestPort)
|
|
.AddEdge(requestPort, processor)
|
|
.Build();
|
|
}
|
|
|
|
private static Workflow CreateCheckpointedSubworkflowRequestWorkflow()
|
|
{
|
|
ExecutorBinding subworkflow = CreateSimpleRequestWorkflow(
|
|
requestPortId: "InnerTestPort",
|
|
processorId: "InnerProcessor")
|
|
.BindAsExecutor("Subworkflow");
|
|
|
|
return new WorkflowBuilder(subworkflow)
|
|
.AddExternalRequest<string, string>(subworkflow, id: "ForwardedSubworkflowRequest")
|
|
.Build();
|
|
}
|
|
|
|
private static async ValueTask<(ExternalRequest PendingRequest, CheckpointInfo Checkpoint)> CapturePendingRequestAndCheckpointAsync(StreamingRun run)
|
|
{
|
|
ExternalRequest? pendingRequest = null;
|
|
CheckpointInfo? checkpoint = null;
|
|
|
|
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false))
|
|
{
|
|
if (evt is RequestInfoEvent requestInfo)
|
|
{
|
|
pendingRequest ??= requestInfo.Request;
|
|
}
|
|
|
|
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
|
{
|
|
checkpoint = cp;
|
|
}
|
|
}
|
|
|
|
pendingRequest.Should().NotBeNull("the workflow should have emitted a pending request");
|
|
checkpoint.Should().NotBeNull("the workflow should have produced a checkpoint");
|
|
return (pendingRequest!, checkpoint!);
|
|
}
|
|
|
|
private static async ValueTask<List<WorkflowEvent>> ReadToHaltAsync(StreamingRun run)
|
|
{
|
|
List<WorkflowEvent> events = [];
|
|
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
|
|
|
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
|
{
|
|
events.Add(evt);
|
|
}
|
|
|
|
return events;
|
|
}
|
|
}
|