mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
.NET: Fix RequestInfoEvent lost when resuming workflow from checkpoint (#4955)
* Fix RequestInfoEvent lost when resuming workflow from checkpoint * Fix streaming run double disposal in tests and lockstep republishing before Started event is emitted. * Fix bug to remove messages after sending to avoid losing messages on send failure. * Fix declarative test harness
This commit is contained in:
committed by
GitHub
Unverified
parent
25696a72dc
commit
38de991481
@@ -21,6 +21,15 @@ internal interface ICheckpointingHandle
|
||||
/// <summary>
|
||||
/// Restores the system state from the specified checkpoint asynchronously.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// This contract is used by live runtime restore paths. Implementations may re-emit pending
|
||||
/// external request events as part of the restore once the active event stream is ready to
|
||||
/// observe them.
|
||||
///
|
||||
/// Initial resume paths that create a new event stream should restore state first and defer
|
||||
/// any replay until after the subscriber is attached, rather than calling this contract
|
||||
/// directly before the stream is ready.
|
||||
/// </remarks>
|
||||
/// <param name="checkpointInfo">The checkpoint information that identifies the state to restore. Cannot be null.</param>
|
||||
/// <param name="cancellationToken">A cancellation token that can be used to cancel the restore operation.</param>
|
||||
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous restore operation.</returns>
|
||||
|
||||
@@ -36,9 +36,10 @@ internal sealed class AsyncRunHandle : ICheckpointingHandle, IAsyncDisposable
|
||||
|
||||
this._eventStream.Start();
|
||||
|
||||
// If there are already unprocessed messages (e.g., from a checkpoint restore that happened
|
||||
// before this handle was created), signal the run loop to start processing them
|
||||
if (stepRunner.HasUnprocessedMessages)
|
||||
// If there are already unprocessed messages or unserviced requests (e.g., from a
|
||||
// checkpoint restore that happened before this handle was created), signal the run
|
||||
// loop to start processing them
|
||||
if (stepRunner.HasUnprocessedMessages || stepRunner.HasUnservicedRequests)
|
||||
{
|
||||
this.SignalInputToRunLoop();
|
||||
}
|
||||
@@ -192,13 +193,17 @@ internal sealed class AsyncRunHandle : ICheckpointingHandle, IAsyncDisposable
|
||||
{
|
||||
streamingEventStream.ClearBufferedEvents();
|
||||
}
|
||||
else if (this._eventStream is LockstepRunEventStream lockstepEventStream)
|
||||
{
|
||||
lockstepEventStream.ClearBufferedEvents();
|
||||
}
|
||||
|
||||
// Restore the workflow state - this will republish unserviced requests as new events
|
||||
// Restore the workflow state through the live runtime-restore path.
|
||||
// This can re-emit pending requests into the already-active event stream.
|
||||
await this._checkpointingHandle.RestoreCheckpointAsync(checkpointInfo, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// After restore, signal the run loop to process any restored messages
|
||||
// This is necessary because ClearBufferedEvents() doesn't signal, and the restored
|
||||
// queued messages won't automatically wake up the run loop
|
||||
// After restore, signal the run loop to process any restored messages. Initial resume
|
||||
// paths handle this separately when they create the event stream after restoring state.
|
||||
this.SignalInputToRunLoop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,14 @@ internal interface ISuperStepRunner
|
||||
|
||||
ConcurrentEventSink OutgoingEvents { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Re-emits <see cref="RequestInfoEvent"/>s for any pending external requests.
|
||||
/// Called by event streams after subscribing to <see cref="OutgoingEvents"/> so that
|
||||
/// requests restored from a checkpoint are observable even when the restore happened
|
||||
/// before the subscription was active.
|
||||
/// </summary>
|
||||
ValueTask RepublishPendingEventsAsync(CancellationToken cancellationToken = default);
|
||||
|
||||
ValueTask<bool> RunSuperStepAsync(CancellationToken cancellationToken);
|
||||
|
||||
// This cannot be cancelled
|
||||
|
||||
@@ -15,6 +15,7 @@ internal sealed class LockstepRunEventStream : IRunEventStream
|
||||
{
|
||||
private readonly CancellationTokenSource _stopCancellation = new();
|
||||
private readonly InputWaiter _inputWaiter = new();
|
||||
private ConcurrentQueue<WorkflowEvent> _eventSink = new();
|
||||
private int _isDisposed;
|
||||
|
||||
private readonly ISuperStepRunner _stepRunner;
|
||||
@@ -35,6 +36,8 @@ internal sealed class LockstepRunEventStream : IRunEventStream
|
||||
// doesn't leak into caller code via AsyncLocal.
|
||||
Activity? previousActivity = Activity.Current;
|
||||
|
||||
this._stepRunner.OutgoingEvents.EventRaised += this.OnWorkflowEventAsync;
|
||||
|
||||
this._sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
|
||||
this._sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
|
||||
.SetTag(Tags.SessionId, this._stepRunner.SessionId);
|
||||
@@ -56,10 +59,6 @@ internal sealed class LockstepRunEventStream : IRunEventStream
|
||||
|
||||
using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken);
|
||||
|
||||
ConcurrentQueue<WorkflowEvent> eventSink = [];
|
||||
|
||||
this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync;
|
||||
|
||||
// Re-establish session as parent so the run activity nests correctly.
|
||||
Activity.Current = this._sessionActivity;
|
||||
|
||||
@@ -73,7 +72,31 @@ internal sealed class LockstepRunEventStream : IRunEventStream
|
||||
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
|
||||
|
||||
// Emit WorkflowStartedEvent to the event stream for consumers
|
||||
eventSink.Enqueue(new WorkflowStartedEvent());
|
||||
this._eventSink.Enqueue(new WorkflowStartedEvent());
|
||||
|
||||
// Re-emit any pending external requests that were restored from a checkpoint
|
||||
// before this subscription was active. For non-resume starts this is a no-op.
|
||||
// This runs after WorkflowStartedEvent so consumers always see the started event first.
|
||||
await this._stepRunner.RepublishPendingEventsAsync(linkedSource.Token).ConfigureAwait(false);
|
||||
|
||||
// When resuming from a checkpoint with only pending requests (no queued messages),
|
||||
// the inner processing loop won't execute, so we must drain events now.
|
||||
// For normal starts this is a no-op since the inner loop handles the drain.
|
||||
if (!this._stepRunner.HasUnprocessedMessages)
|
||||
{
|
||||
var (drainedEvents, shouldHalt) = this.DrainAndFilterEvents();
|
||||
foreach (WorkflowEvent raisedEvent in drainedEvents)
|
||||
{
|
||||
yield return raisedEvent;
|
||||
}
|
||||
|
||||
if (shouldHalt)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
|
||||
}
|
||||
|
||||
do
|
||||
{
|
||||
@@ -107,26 +130,19 @@ internal sealed class LockstepRunEventStream : IRunEventStream
|
||||
yield break; // Exit if cancellation is requested
|
||||
}
|
||||
|
||||
bool hadRequestHaltEvent = false;
|
||||
foreach (WorkflowEvent raisedEvent in Interlocked.Exchange(ref eventSink, []))
|
||||
var (drainedEvents, shouldHalt) = this.DrainAndFilterEvents();
|
||||
|
||||
foreach (WorkflowEvent raisedEvent in drainedEvents)
|
||||
{
|
||||
if (linkedSource.Token.IsCancellationRequested)
|
||||
{
|
||||
yield break; // Exit if cancellation is requested
|
||||
}
|
||||
|
||||
// TODO: Do we actually want to interpret this as a termination request?
|
||||
if (raisedEvent is RequestHaltEvent)
|
||||
{
|
||||
hadRequestHaltEvent = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
yield return raisedEvent;
|
||||
}
|
||||
yield return raisedEvent;
|
||||
}
|
||||
|
||||
if (hadRequestHaltEvent || linkedSource.Token.IsCancellationRequested)
|
||||
if (shouldHalt || linkedSource.Token.IsCancellationRequested)
|
||||
{
|
||||
// If we had a completion event, we are done.
|
||||
yield break;
|
||||
@@ -151,25 +167,23 @@ internal sealed class LockstepRunEventStream : IRunEventStream
|
||||
finally
|
||||
{
|
||||
this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
|
||||
this._stepRunner.OutgoingEvents.EventRaised -= OnWorkflowEventAsync;
|
||||
|
||||
// Explicitly dispose the Activity so Activity.Stop fires deterministically,
|
||||
// regardless of how the async iterator enumerator is disposed.
|
||||
runActivity?.Dispose();
|
||||
}
|
||||
|
||||
ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e)
|
||||
{
|
||||
eventSink.Enqueue(e);
|
||||
return default;
|
||||
}
|
||||
|
||||
// If we are Idle or Ended, we should break out of the loop
|
||||
// If we are PendingRequests and not blocking on pending requests, we should break out of the loop
|
||||
// If cancellation is requested, we should break out of the loop
|
||||
bool ShouldBreak() => this.RunStatus is RunStatus.Idle or RunStatus.Ended ||
|
||||
(this.RunStatus == RunStatus.PendingRequests && !blockOnPendingRequest) ||
|
||||
linkedSource.Token.IsCancellationRequested;
|
||||
(this.RunStatus == RunStatus.PendingRequests && !blockOnPendingRequest) ||
|
||||
linkedSource.Token.IsCancellationRequested;
|
||||
}
|
||||
|
||||
internal void ClearBufferedEvents()
|
||||
{
|
||||
Interlocked.Exchange(ref this._eventSink, new ConcurrentQueue<WorkflowEvent>());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -192,6 +206,7 @@ internal sealed class LockstepRunEventStream : IRunEventStream
|
||||
if (Interlocked.Exchange(ref this._isDisposed, 1) == 0)
|
||||
{
|
||||
this._stopCancellation.Cancel();
|
||||
this._stepRunner.OutgoingEvents.EventRaised -= this.OnWorkflowEventAsync;
|
||||
|
||||
// Stop the session activity
|
||||
if (this._sessionActivity is not null)
|
||||
@@ -207,4 +222,32 @@ internal sealed class LockstepRunEventStream : IRunEventStream
|
||||
|
||||
return default;
|
||||
}
|
||||
|
||||
private ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e)
|
||||
{
|
||||
this._eventSink.Enqueue(e);
|
||||
return default;
|
||||
}
|
||||
|
||||
// Atomically drains the event sink and separates workflow events from halt signals.
|
||||
// Used by both the early-drain (resume with pending requests only) and
|
||||
// the inner superstep drain to keep halt-detection logic in one place.
|
||||
private (List<WorkflowEvent> Events, bool ShouldHalt) DrainAndFilterEvents()
|
||||
{
|
||||
List<WorkflowEvent> events = [];
|
||||
bool shouldHalt = false;
|
||||
foreach (WorkflowEvent e in Interlocked.Exchange(ref this._eventSink, new ConcurrentQueue<WorkflowEvent>()))
|
||||
{
|
||||
if (e is RequestHaltEvent)
|
||||
{
|
||||
shouldHalt = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
events.Add(e);
|
||||
}
|
||||
}
|
||||
|
||||
return (events, shouldHalt);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,10 @@ internal sealed class StreamingRunEventStream : IRunEventStream
|
||||
// Subscribe to events - they will flow directly to the channel as they're raised
|
||||
this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync;
|
||||
|
||||
// Re-emit any pending external requests that were restored from a checkpoint
|
||||
// before this subscription was active. For non-resume starts this is a no-op.
|
||||
await this._stepRunner.RepublishPendingEventsAsync(linkedSource.Token).ConfigureAwait(false);
|
||||
|
||||
// Start the session-level activity that spans the entire run loop lifetime.
|
||||
// Individual run-stage activities are nested within this session activity.
|
||||
Activity? sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
|
||||
|
||||
@@ -50,10 +50,13 @@ public sealed class InProcessExecutionEnvironment : IWorkflowExecutionEnvironmen
|
||||
return runner.BeginStreamAsync(this.ExecutionMode, cancellationToken);
|
||||
}
|
||||
|
||||
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
|
||||
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken = default)
|
||||
=> this.ResumeRunAsync(workflow, fromCheckpoint, knownValidInputTypes, republishPendingEvents: true, cancellationToken);
|
||||
|
||||
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, bool republishPendingEvents, CancellationToken cancellationToken = default)
|
||||
{
|
||||
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, this.CheckpointManager, fromCheckpoint.SessionId, this.EnableConcurrentRuns, knownValidInputTypes);
|
||||
return runner.ResumeStreamAsync(this.ExecutionMode, fromCheckpoint, cancellationToken);
|
||||
return runner.ResumeStreamAsync(this.ExecutionMode, fromCheckpoint, republishPendingEvents, cancellationToken);
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
@@ -104,6 +107,32 @@ public sealed class InProcessExecutionEnvironment : IWorkflowExecutionEnvironmen
|
||||
return new(runHandle);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Resumes a streaming workflow run from a checkpoint with control over whether
|
||||
/// pending request events are republished through the event stream.
|
||||
/// </summary>
|
||||
/// <param name="workflow">The workflow to resume.</param>
|
||||
/// <param name="fromCheckpoint">The checkpoint to resume from.</param>
|
||||
/// <param name="republishPendingEvents">
|
||||
/// When <see langword="true"/>, any pending request events are republished through the event
|
||||
/// stream after subscribing. When <see langword="false"/>, the caller is responsible for
|
||||
/// handling pending requests (e.g., <see cref="WorkflowSession"/> already sends responses).
|
||||
/// </param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
internal async ValueTask<StreamingRun> ResumeStreamingInternalAsync(
|
||||
Workflow workflow,
|
||||
CheckpointInfo fromCheckpoint,
|
||||
bool republishPendingEvents,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
this.VerifyCheckpointingConfigured();
|
||||
|
||||
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, fromCheckpoint, [], republishPendingEvents, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return new(runHandle);
|
||||
}
|
||||
|
||||
private async ValueTask<AsyncRunHandle> BeginRunHandlingChatProtocolAsync<TInput>(Workflow workflow,
|
||||
TInput input,
|
||||
string? sessionId = null,
|
||||
|
||||
@@ -71,6 +71,28 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
|
||||
/// <inheritdoc cref="ISuperStepRunner.StartExecutorId"/>
|
||||
public string StartExecutorId { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Gating flag for deferred event republishing after checkpoint restore.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Written with <see cref="Volatile.Write(ref int, int)"/> in <see cref="ResumeStreamAsync(ExecutionMode, CheckpointInfo, bool, CancellationToken)"/>
|
||||
/// and consumed atomically with <see cref="Interlocked.Exchange(ref int, int)"/> in
|
||||
/// <see cref="ISuperStepRunner.RepublishPendingEventsAsync"/>. The write does not need a full
|
||||
/// memory barrier because it is sequenced before the <see cref="AsyncRunHandle"/> constructor
|
||||
/// by the <see langword="await"/> in <see cref="ResumeStreamAsync(ExecutionMode, CheckpointInfo, bool, CancellationToken)"/>. The constructor is the
|
||||
/// only code path that triggers consumption (via the event stream's subscribe and republish flow).
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Note: <see cref="AsyncRunHandle"/> also reads <see cref="ISuperStepRunner.HasUnservicedRequests"/>
|
||||
/// in its constructor to signal the run loop, but that property reads from
|
||||
/// <see cref="InProcessRunnerContext"/>'s request dictionary (restored during
|
||||
/// <see cref="RestoreCheckpointCoreAsync"/>), not from this flag. The two are independent:
|
||||
/// <c>HasUnservicedRequests</c> triggers the run loop; <c>_needsRepublish</c> triggers event emission.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
private int _needsRepublish;
|
||||
|
||||
/// <inheritdoc cref="ISuperStepRunner.TelemetryContext"/>
|
||||
public WorkflowTelemetryContext TelemetryContext => this.Workflow.TelemetryContext;
|
||||
|
||||
@@ -145,7 +167,10 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
|
||||
return new(new AsyncRunHandle(this, this, mode));
|
||||
}
|
||||
|
||||
public async ValueTask<AsyncRunHandle> ResumeStreamAsync(ExecutionMode mode, CheckpointInfo fromCheckpoint, CancellationToken cancellationToken = default)
|
||||
public ValueTask<AsyncRunHandle> ResumeStreamAsync(ExecutionMode mode, CheckpointInfo fromCheckpoint, CancellationToken cancellationToken = default)
|
||||
=> this.ResumeStreamAsync(mode, fromCheckpoint, republishPendingEvents: true, cancellationToken);
|
||||
|
||||
public async ValueTask<AsyncRunHandle> ResumeStreamAsync(ExecutionMode mode, CheckpointInfo fromCheckpoint, bool republishPendingEvents, CancellationToken cancellationToken = default)
|
||||
{
|
||||
this.RunContext.CheckEnded();
|
||||
Throw.IfNull(fromCheckpoint);
|
||||
@@ -154,7 +179,18 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
|
||||
throw new InvalidOperationException("This runner was not configured with a CheckpointManager, so it cannot restore checkpoints.");
|
||||
}
|
||||
|
||||
await this.RestoreCheckpointAsync(fromCheckpoint, cancellationToken).ConfigureAwait(false);
|
||||
// Restore checkpoint state without republishing pending request events.
|
||||
// The event stream will republish them after subscribing so that events
|
||||
// are never lost to an absent subscriber.
|
||||
await this.RestoreCheckpointCoreAsync(fromCheckpoint, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (republishPendingEvents)
|
||||
{
|
||||
// Signal the event stream to republish pending requests after subscribing.
|
||||
// This is consumed atomically by RepublishPendingEventsAsync.
|
||||
Volatile.Write(ref this._needsRepublish, 1);
|
||||
}
|
||||
|
||||
return new AsyncRunHandle(this, this, mode);
|
||||
}
|
||||
|
||||
@@ -163,6 +199,16 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
|
||||
bool ISuperStepRunner.TryGetResponsePortExecutorId(string portId, out string? executorId)
|
||||
=> this.RunContext.TryGetResponsePortExecutorId(portId, out executorId);
|
||||
|
||||
ValueTask ISuperStepRunner.RepublishPendingEventsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (Interlocked.Exchange(ref this._needsRepublish, 0) != 0)
|
||||
{
|
||||
return this.RunContext.RepublishUnservicedRequestsAsync(cancellationToken);
|
||||
}
|
||||
|
||||
return default;
|
||||
}
|
||||
|
||||
public bool IsCheckpointingEnabled => this.RunContext.IsCheckpointingEnabled;
|
||||
|
||||
public IReadOnlyList<CheckpointInfo> Checkpoints => this._checkpoints;
|
||||
@@ -310,7 +356,31 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
|
||||
this._checkpoints.Add(this._lastCheckpointInfo);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Restores checkpoint state and re-emits any pending external request events.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// This is the <see cref="ICheckpointingHandle"/> implementation used for runtime restores
|
||||
/// where the event stream subscription is already active. For initial resumes,
|
||||
/// <see cref="ResumeStreamAsync(ExecutionMode, CheckpointInfo, CancellationToken)"/> calls
|
||||
/// <see cref="RestoreCheckpointCoreAsync"/> directly and defers republishing to the event stream.
|
||||
/// </remarks>
|
||||
public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await this.RestoreCheckpointCoreAsync(checkpointInfo, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Republish pending request events. This is safe for runtime restores where
|
||||
// the event stream is already subscribed. For initial resumes the event stream
|
||||
// handles republishing itself, so ResumeStreamAsync calls RestoreCheckpointCoreAsync directly.
|
||||
await this.RunContext.RepublishUnservicedRequestsAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Restores checkpoint state (queued messages, executor state, edge state, etc.)
|
||||
/// without republishing pending request events. The caller is responsible for
|
||||
/// ensuring events are republished after an event subscriber is attached.
|
||||
/// </summary>
|
||||
private async ValueTask RestoreCheckpointCoreAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
|
||||
{
|
||||
this.RunContext.CheckEnded();
|
||||
Throw.IfNull(checkpointInfo);
|
||||
@@ -335,11 +405,9 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
|
||||
await this.RunContext.ImportStateAsync(checkpoint).ConfigureAwait(false);
|
||||
|
||||
Task executorNotifyTask = this.RunContext.NotifyCheckpointLoadedAsync(cancellationToken);
|
||||
ValueTask republishRequestsTask = this.RunContext.RepublishUnservicedRequestsAsync(cancellationToken);
|
||||
|
||||
await this.EdgeMap.ImportStateAsync(checkpoint).ConfigureAwait(false);
|
||||
await Task.WhenAll(executorNotifyTask,
|
||||
republishRequestsTask.AsTask(),
|
||||
restoreCheckpointIndexTask.AsTask()).ConfigureAwait(false);
|
||||
|
||||
this._lastCheckpointInfo = checkpointInfo;
|
||||
|
||||
@@ -14,6 +14,7 @@ internal sealed class RequestPortOptions;
|
||||
|
||||
internal sealed class RequestInfoExecutor : Executor
|
||||
{
|
||||
private const string WrappedRequestsStateKey = nameof(WrappedRequestsStateKey);
|
||||
private readonly Dictionary<string, ExternalRequest> _wrappedRequests = [];
|
||||
private RequestPort Port { get; }
|
||||
private IExternalRequestSink? RequestSink { get; set; }
|
||||
@@ -124,22 +125,46 @@ internal sealed class RequestInfoExecutor : Executor
|
||||
return null;
|
||||
}
|
||||
|
||||
if (this._allowWrapped && this._wrappedRequests.TryGetValue(message.RequestId, out ExternalRequest? originalRequest))
|
||||
{
|
||||
await context.SendMessageAsync(originalRequest.RewrapResponse(message), cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
await context.SendMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (!message.Data.IsType(this.Port.Response, out object? data))
|
||||
{
|
||||
throw this.Port.CreateExceptionForType(message);
|
||||
}
|
||||
|
||||
await context.SendMessageAsync(data, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
if (this._allowWrapped && this._wrappedRequests.TryGetValue(message.RequestId, out ExternalRequest? originalRequest))
|
||||
{
|
||||
await context.SendMessageAsync(originalRequest.RewrapResponse(message), cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
this._wrappedRequests.Remove(message.RequestId);
|
||||
}
|
||||
else
|
||||
{
|
||||
await context.SendMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
await context.SendMessageAsync(data, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
protected internal override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await context.QueueStateUpdateAsync(WrappedRequestsStateKey,
|
||||
new Dictionary<string, ExternalRequest>(this._wrappedRequests, StringComparer.Ordinal),
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
await base.OnCheckpointingAsync(context, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
protected internal override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await base.OnCheckpointRestoredAsync(context, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
this._wrappedRequests.Clear();
|
||||
|
||||
Dictionary<string, ExternalRequest> wrappedRequests =
|
||||
await context.ReadStateAsync<Dictionary<string, ExternalRequest>>(WrappedRequestsStateKey, cancellationToken: cancellationToken)
|
||||
.ConfigureAwait(false) ?? [];
|
||||
|
||||
foreach (KeyValuePair<string, ExternalRequest> wrappedRequest in wrappedRequests)
|
||||
{
|
||||
this._wrappedRequests[wrappedRequest.Key] = wrappedRequest.Value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
@@ -23,6 +24,7 @@ internal class WorkflowHostExecutor : Executor, IAsyncDisposable
|
||||
private InProcessRunner? _activeRunner;
|
||||
private InMemoryCheckpointManager? _checkpointManager;
|
||||
private readonly ExecutorOptions _options;
|
||||
private readonly ConcurrentDictionary<string, RequestPortInfo> _pendingResponsePorts = new(StringComparer.Ordinal);
|
||||
|
||||
private ISuperStepJoinContext? _joinContext;
|
||||
private string? _joinId;
|
||||
@@ -163,6 +165,11 @@ internal class WorkflowHostExecutor : Executor, IAsyncDisposable
|
||||
|
||||
private ExternalResponse? CheckAndUnqualifyResponse([DisallowNull] ExternalResponse response)
|
||||
{
|
||||
if (this._pendingResponsePorts.TryRemove(response.RequestId, out RequestPortInfo? originalPort))
|
||||
{
|
||||
return response with { PortInfo = originalPort };
|
||||
}
|
||||
|
||||
if (!Throw.IfNull(response).PortInfo.PortId.StartsWith($"{this.Id}.", StringComparison.Ordinal))
|
||||
{
|
||||
return null;
|
||||
@@ -193,6 +200,7 @@ internal class WorkflowHostExecutor : Executor, IAsyncDisposable
|
||||
break;
|
||||
case RequestInfoEvent requestInfoEvt:
|
||||
ExternalRequest request = requestInfoEvt.Request;
|
||||
this._pendingResponsePorts[request.RequestId] = request.PortInfo;
|
||||
resultTask = this._joinContext?.SendMessageAsync(this.Id, this.QualifyRequestPortId(request)).AsTask() ?? Task.CompletedTask;
|
||||
break;
|
||||
case WorkflowErrorEvent errorEvent:
|
||||
@@ -246,9 +254,13 @@ internal class WorkflowHostExecutor : Executor, IAsyncDisposable
|
||||
}
|
||||
|
||||
private const string CheckpointManagerStateKey = nameof(CheckpointManager);
|
||||
private const string PendingResponsePortsStateKey = nameof(PendingResponsePortsStateKey);
|
||||
protected internal override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await context.QueueStateUpdateAsync(CheckpointManagerStateKey, this._checkpointManager, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
await context.QueueStateUpdateAsync(PendingResponsePortsStateKey,
|
||||
new Dictionary<string, RequestPortInfo>(this._pendingResponsePorts, StringComparer.Ordinal),
|
||||
cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await base.OnCheckpointingAsync(context, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
@@ -269,6 +281,15 @@ internal class WorkflowHostExecutor : Executor, IAsyncDisposable
|
||||
await this.ResetAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
this._pendingResponsePorts.Clear();
|
||||
Dictionary<string, RequestPortInfo> pendingResponsePorts =
|
||||
await context.ReadStateAsync<Dictionary<string, RequestPortInfo>>(PendingResponsePortsStateKey, cancellationToken: cancellationToken)
|
||||
.ConfigureAwait(false) ?? [];
|
||||
foreach (KeyValuePair<string, RequestPortInfo> pendingResponsePort in pendingResponsePorts)
|
||||
{
|
||||
this._pendingResponsePorts[pendingResponsePort.Key] = pendingResponsePort.Value;
|
||||
}
|
||||
|
||||
await this.EnsureRunSendMessageAsync(resume: true, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -280,6 +301,8 @@ internal class WorkflowHostExecutor : Executor, IAsyncDisposable
|
||||
this._run = null;
|
||||
}
|
||||
|
||||
this._pendingResponsePorts.Clear();
|
||||
|
||||
if (this._activeRunner != null)
|
||||
{
|
||||
this._activeRunner.OutgoingEvents.EventRaised -= this.ForwardWorkflowEventAsync;
|
||||
|
||||
@@ -19,7 +19,14 @@ namespace Microsoft.Agents.AI.Workflows;
|
||||
internal sealed class WorkflowSession : AgentSession
|
||||
{
|
||||
private readonly Workflow _workflow;
|
||||
private readonly IWorkflowExecutionEnvironment _executionEnvironment;
|
||||
|
||||
/// <summary>
|
||||
/// The execution environment for this session. Concrete type is required because
|
||||
/// <see cref="CreateOrResumeRunAsync"/> uses the internal
|
||||
/// <see cref="InProcessExecutionEnvironment.ResumeStreamingInternalAsync"/> API.
|
||||
/// </summary>
|
||||
private readonly InProcessExecutionEnvironment _inProcEnvironment;
|
||||
|
||||
private readonly bool _includeExceptionDetails;
|
||||
private readonly bool _includeWorkflowOutputsInResponse;
|
||||
|
||||
@@ -63,17 +70,22 @@ internal sealed class WorkflowSession : AgentSession
|
||||
public WorkflowSession(Workflow workflow, string sessionId, IWorkflowExecutionEnvironment executionEnvironment, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false)
|
||||
{
|
||||
this._workflow = Throw.IfNull(workflow);
|
||||
this._executionEnvironment = Throw.IfNull(executionEnvironment);
|
||||
this._includeExceptionDetails = includeExceptionDetails;
|
||||
this._includeWorkflowOutputsInResponse = includeWorkflowOutputsInResponse;
|
||||
|
||||
if (VerifyCheckpointingConfiguration(executionEnvironment, out InProcessExecutionEnvironment? inProcEnv))
|
||||
IWorkflowExecutionEnvironment env = Throw.IfNull(executionEnvironment);
|
||||
if (VerifyCheckpointingConfiguration(env, out InProcessExecutionEnvironment? inProcEnv))
|
||||
{
|
||||
// We have an InProcessExecutionEnvironment which is not configured for checkpointing. Ensure it has an externalizable checkpoint manager,
|
||||
// since we are responsible for maintaining the state.
|
||||
this._executionEnvironment = inProcEnv.WithCheckpointing(this.EnsureExternalizedInMemoryCheckpointing());
|
||||
env = inProcEnv.WithCheckpointing(this.EnsureExternalizedInMemoryCheckpointing());
|
||||
}
|
||||
|
||||
this._inProcEnvironment = env as InProcessExecutionEnvironment
|
||||
?? throw new InvalidOperationException(
|
||||
$"WorkflowSession requires an {nameof(InProcessExecutionEnvironment)}, " +
|
||||
$"but received {env.GetType().Name}.");
|
||||
|
||||
this.SessionId = Throw.IfNullOrEmpty(sessionId);
|
||||
this.ChatHistoryProvider = new WorkflowChatHistoryProvider();
|
||||
}
|
||||
@@ -86,24 +98,30 @@ internal sealed class WorkflowSession : AgentSession
|
||||
public WorkflowSession(Workflow workflow, JsonElement serializedSession, IWorkflowExecutionEnvironment executionEnvironment, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false, JsonSerializerOptions? jsonSerializerOptions = null)
|
||||
{
|
||||
this._workflow = Throw.IfNull(workflow);
|
||||
this._executionEnvironment = Throw.IfNull(executionEnvironment);
|
||||
this._includeExceptionDetails = includeExceptionDetails;
|
||||
this._includeWorkflowOutputsInResponse = includeWorkflowOutputsInResponse;
|
||||
|
||||
IWorkflowExecutionEnvironment env = Throw.IfNull(executionEnvironment);
|
||||
|
||||
JsonMarshaller marshaller = new(jsonSerializerOptions);
|
||||
SessionState sessionState = marshaller.Marshal<SessionState>(serializedSession);
|
||||
|
||||
this._inMemoryCheckpointManager = sessionState.CheckpointManager;
|
||||
if (this._inMemoryCheckpointManager != null &&
|
||||
VerifyCheckpointingConfiguration(executionEnvironment, out InProcessExecutionEnvironment? inProcEnv))
|
||||
VerifyCheckpointingConfiguration(env, out InProcessExecutionEnvironment? inProcEnv))
|
||||
{
|
||||
this._executionEnvironment = inProcEnv.WithCheckpointing(this.EnsureExternalizedInMemoryCheckpointing());
|
||||
env = inProcEnv.WithCheckpointing(this.EnsureExternalizedInMemoryCheckpointing());
|
||||
}
|
||||
else if (this._inMemoryCheckpointManager != null)
|
||||
{
|
||||
throw new ArgumentException("The session was saved with an externalized checkpoint manager, but the incoming execution environment does not support it.", nameof(executionEnvironment));
|
||||
}
|
||||
|
||||
this._inProcEnvironment = env as InProcessExecutionEnvironment
|
||||
?? throw new InvalidOperationException(
|
||||
$"WorkflowSession requires an {nameof(InProcessExecutionEnvironment)}, " +
|
||||
$"but received {env.GetType().Name}.");
|
||||
|
||||
this.SessionId = sessionState.SessionId;
|
||||
this.ChatHistoryProvider = new WorkflowChatHistoryProvider();
|
||||
|
||||
@@ -160,10 +178,15 @@ internal sealed class WorkflowSession : AgentSession
|
||||
// and does not need to be checked again here.
|
||||
if (this.LastCheckpoint is not null)
|
||||
{
|
||||
// Use the internal resume path that suppresses pending request republishing.
|
||||
// WorkflowSession handles pending requests itself by converting matching responses
|
||||
// via SendMessagesWithResponseConversionAsync, so event-stream republishing would
|
||||
// cause unwanted duplicate events visible to the consumer.
|
||||
StreamingRun run =
|
||||
await this._executionEnvironment
|
||||
.ResumeStreamingAsync(this._workflow,
|
||||
await this._inProcEnvironment
|
||||
.ResumeStreamingInternalAsync(this._workflow,
|
||||
this.LastCheckpoint,
|
||||
republishPendingEvents: false,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
@@ -172,7 +195,7 @@ internal sealed class WorkflowSession : AgentSession
|
||||
return new ResumeRunResult(run, dispatchInfo);
|
||||
}
|
||||
|
||||
StreamingRun newRun = await this._executionEnvironment
|
||||
StreamingRun newRun = await this._inProcEnvironment
|
||||
.RunStreamingAsync(this._workflow,
|
||||
messages,
|
||||
this.SessionId,
|
||||
|
||||
+7
@@ -126,6 +126,13 @@ internal sealed class WorkflowHarness(Workflow workflow, string runId)
|
||||
{
|
||||
hasRequest = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// This is a republished event for the request we're already responding to
|
||||
// (emitted by RepublishUnservicedRequestsAsync during checkpoint resume).
|
||||
// Skip yielding it so downstream code doesn't treat it as a new pending request.
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
|
||||
case ConversationUpdateEvent conversationEvent:
|
||||
|
||||
@@ -0,0 +1,445 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using FluentAssertions;
|
||||
using Microsoft.Agents.AI.Workflows.InProc;
|
||||
using Microsoft.Agents.AI.Workflows.Sample;
|
||||
|
||||
namespace Microsoft.Agents.AI.Workflows.UnitTests;
|
||||
|
||||
/// <summary>
|
||||
/// Regression tests for GH-2485: pending <see cref="RequestInfoEvent"/> objects must be
|
||||
/// re-emitted after resuming a workflow from a checkpoint.
|
||||
/// </summary>
|
||||
public class CheckpointResumeTests
|
||||
{
|
||||
/// <summary>
|
||||
/// Verifies that a resumed workflow re-emits <see cref="RequestInfoEvent"/>s for
|
||||
/// pending external requests that existed at the time of the checkpoint.
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
||||
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
||||
internal async Task Checkpoint_Resume_WithPendingRequests_RepublishesRequestInfoEventsAsync(ExecutionEnvironment environment)
|
||||
{
|
||||
// Arrange
|
||||
RequestPort<string, string> requestPort = RequestPort.Create<string, string>("TestPort");
|
||||
ForwardMessageExecutor<string> processor = new("Processor");
|
||||
|
||||
Workflow workflow = new WorkflowBuilder(requestPort)
|
||||
.AddEdge(requestPort, processor)
|
||||
.Build();
|
||||
|
||||
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
||||
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
||||
|
||||
// Act 1: Run workflow, collect pending requests and a checkpoint.
|
||||
List<ExternalRequest> originalRequests = [];
|
||||
CheckpointInfo? checkpoint = null;
|
||||
|
||||
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
||||
.RunStreamingAsync(workflow, "Hello"))
|
||||
{
|
||||
await foreach (WorkflowEvent evt in firstRun.WatchStreamAsync(blockOnPendingRequest: false))
|
||||
{
|
||||
if (evt is RequestInfoEvent requestInfo)
|
||||
{
|
||||
originalRequests.Add(requestInfo.Request);
|
||||
}
|
||||
|
||||
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
||||
{
|
||||
checkpoint = cp;
|
||||
}
|
||||
}
|
||||
|
||||
originalRequests.Should().NotBeEmpty("the workflow should have created at least one external request");
|
||||
checkpoint.Should().NotBeNull("a checkpoint should have been created");
|
||||
}
|
||||
|
||||
// Act 2: Resume from the checkpoint.
|
||||
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
||||
.ResumeStreamingAsync(workflow, checkpoint!);
|
||||
|
||||
// Assert: The pending requests should be re-emitted.
|
||||
List<ExternalRequest> reEmittedRequests = [];
|
||||
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
||||
|
||||
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
||||
{
|
||||
if (evt is RequestInfoEvent requestInfo)
|
||||
{
|
||||
reEmittedRequests.Add(requestInfo.Request);
|
||||
}
|
||||
}
|
||||
|
||||
reEmittedRequests.Should().HaveCount(originalRequests.Count,
|
||||
"all pending requests from the checkpoint should be re-emitted after resume");
|
||||
reEmittedRequests.Select(r => r.RequestId)
|
||||
.Should().BeEquivalentTo(originalRequests.Select(r => r.RequestId),
|
||||
"the re-emitted request IDs should match the original pending request IDs");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that <see cref="RunStatus"/> transitions to <see cref="RunStatus.PendingRequests"/>
|
||||
/// after resuming from a checkpoint with pending external requests (not stuck at NotStarted).
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
||||
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
||||
internal async Task Checkpoint_Resume_WithPendingRequests_RunStatusIsPendingRequestsAsync(ExecutionEnvironment environment)
|
||||
{
|
||||
// Arrange
|
||||
RequestPort<string, string> requestPort = RequestPort.Create<string, string>("TestPort");
|
||||
ForwardMessageExecutor<string> processor = new("Processor");
|
||||
|
||||
Workflow workflow = new WorkflowBuilder(requestPort)
|
||||
.AddEdge(requestPort, processor)
|
||||
.Build();
|
||||
|
||||
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
||||
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
||||
|
||||
// First run: collect a checkpoint with pending requests.
|
||||
CheckpointInfo? checkpoint = null;
|
||||
|
||||
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
||||
.RunStreamingAsync(workflow, "Hello"))
|
||||
{
|
||||
await foreach (WorkflowEvent evt in firstRun.WatchStreamAsync(blockOnPendingRequest: false))
|
||||
{
|
||||
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
||||
{
|
||||
checkpoint = cp;
|
||||
}
|
||||
}
|
||||
|
||||
checkpoint.Should().NotBeNull();
|
||||
}
|
||||
|
||||
// Act: Resume from the checkpoint and consume events so the run loop processes.
|
||||
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
||||
.ResumeStreamingAsync(workflow, checkpoint!);
|
||||
|
||||
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
||||
await foreach (WorkflowEvent _ in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
||||
{
|
||||
// Consume all events until the stream completes.
|
||||
}
|
||||
|
||||
// Assert
|
||||
RunStatus status = await resumed.GetStatusAsync();
|
||||
status.Should().Be(RunStatus.PendingRequests,
|
||||
"the resumed workflow should report PendingRequests after rehydration");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the full roundtrip: resume from checkpoint, observe the re-emitted request,
|
||||
/// send a response, and verify the workflow completes without duplicating the request.
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
||||
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
||||
internal async Task Checkpoint_Resume_RespondToPendingRequest_CompletesWithoutDuplicateAsync(ExecutionEnvironment environment)
|
||||
{
|
||||
// Arrange
|
||||
RequestPort<string, string> requestPort = RequestPort.Create<string, string>("TestPort");
|
||||
ForwardMessageExecutor<string> processor = new("Processor");
|
||||
|
||||
Workflow workflow = new WorkflowBuilder(requestPort)
|
||||
.AddEdge(requestPort, processor)
|
||||
.Build();
|
||||
|
||||
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
||||
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
||||
|
||||
// First run: collect checkpoint + pending request.
|
||||
ExternalRequest? pendingRequest = null;
|
||||
CheckpointInfo? checkpoint = null;
|
||||
|
||||
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
||||
.RunStreamingAsync(workflow, "Hello"))
|
||||
{
|
||||
await foreach (WorkflowEvent evt in firstRun.WatchStreamAsync(blockOnPendingRequest: false))
|
||||
{
|
||||
if (evt is RequestInfoEvent requestInfo)
|
||||
{
|
||||
pendingRequest = requestInfo.Request;
|
||||
}
|
||||
|
||||
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
||||
{
|
||||
checkpoint = cp;
|
||||
}
|
||||
}
|
||||
|
||||
pendingRequest.Should().NotBeNull();
|
||||
checkpoint.Should().NotBeNull();
|
||||
}
|
||||
|
||||
// Act: Resume and respond to the restored request.
|
||||
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
||||
.ResumeStreamingAsync(workflow, checkpoint!);
|
||||
|
||||
int requestEventCount = 0;
|
||||
|
||||
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
||||
|
||||
// Use blockOnPendingRequest: false for the first pass to see the re-emitted requests.
|
||||
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
||||
{
|
||||
if (evt is RequestInfoEvent requestInfo)
|
||||
{
|
||||
requestEventCount++;
|
||||
requestInfo.Request.RequestId.Should().Be(pendingRequest!.RequestId,
|
||||
"the re-emitted request should match the original");
|
||||
}
|
||||
}
|
||||
|
||||
requestEventCount.Should().Be(1,
|
||||
"the pending request should be emitted exactly once (no duplicates)");
|
||||
|
||||
// Assert intermediate state before responding: the run should be in PendingRequests
|
||||
// and we should have observed the re-emitted request. If the first WatchStreamAsync
|
||||
// didn't complete or yielded nothing, these assertions catch it with a clear message.
|
||||
RunStatus statusBeforeResponse = await resumed.GetStatusAsync();
|
||||
statusBeforeResponse.Should().Be(RunStatus.PendingRequests,
|
||||
"the run should be in PendingRequests state before we send a response");
|
||||
|
||||
// Now send the response and verify the workflow processes it.
|
||||
ExternalResponse response = pendingRequest!.CreateResponse("World");
|
||||
await resumed.SendResponseAsync(response);
|
||||
|
||||
// Consume the resulting events to verify the workflow progresses without errors.
|
||||
List<WorkflowEvent> postResponseEvents = [];
|
||||
|
||||
using CancellationTokenSource cts2 = new(TimeSpan.FromSeconds(10));
|
||||
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts2.Token))
|
||||
{
|
||||
postResponseEvents.Add(evt);
|
||||
}
|
||||
|
||||
postResponseEvents.Should().NotBeEmpty(
|
||||
"the workflow should process the response and produce events");
|
||||
postResponseEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
||||
"no errors should occur when processing the restored request's response");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that restoring a live run to a checkpoint re-emits pending requests and allows
|
||||
/// the workflow to continue from that restored point.
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
||||
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
||||
internal async Task Checkpoint_Restore_WithPendingRequests_RepublishesRequestInfoEventsAsync(ExecutionEnvironment environment)
|
||||
{
|
||||
// Arrange
|
||||
Workflow workflow = CreateSimpleRequestWorkflow();
|
||||
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
||||
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
||||
|
||||
await using StreamingRun run = await env.WithCheckpointing(checkpointManager)
|
||||
.RunStreamingAsync(workflow, "Hello");
|
||||
|
||||
(ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run);
|
||||
|
||||
// Advance the run past the checkpoint so the restore has meaningful work to undo.
|
||||
await run.SendResponseAsync(pendingRequest.CreateResponse("World"));
|
||||
|
||||
List<WorkflowEvent> firstCompletionEvents = await ReadToHaltAsync(run);
|
||||
firstCompletionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
||||
"the workflow should continue cleanly before we restore");
|
||||
RunStatus statusAfterFirstResponse = await run.GetStatusAsync();
|
||||
statusAfterFirstResponse.Should().Be(RunStatus.Idle,
|
||||
"the workflow should finish processing the first response before we restore");
|
||||
|
||||
// Act
|
||||
await run.RestoreCheckpointAsync(checkpoint);
|
||||
|
||||
// Assert
|
||||
List<WorkflowEvent> restoredEvents = await ReadToHaltAsync(run);
|
||||
ExternalRequest[] replayedRequests = [.. restoredEvents.OfType<RequestInfoEvent>().Select(evt => evt.Request)];
|
||||
|
||||
replayedRequests.Should().ContainSingle("runtime restore should re-emit the restored pending request");
|
||||
replayedRequests[0].RequestId.Should().Be(pendingRequest.RequestId,
|
||||
"the replayed request should match the request captured at the checkpoint");
|
||||
|
||||
await run.SendResponseAsync(replayedRequests[0].CreateResponse("Again"));
|
||||
|
||||
List<WorkflowEvent> secondCompletionEvents = await ReadToHaltAsync(run);
|
||||
secondCompletionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
||||
"runtime restore replay should not introduce workflow errors");
|
||||
RunStatus statusAfterRestoreResponse = await run.GetStatusAsync();
|
||||
statusAfterRestoreResponse.Should().Be(RunStatus.Idle,
|
||||
"the workflow should be able to continue after the runtime restore replay");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow.
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
||||
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
||||
internal async Task Checkpoint_Resume_SubworkflowWithPendingRequests_RepublishesQualifiedRequestInfoEventsAsync(ExecutionEnvironment environment)
|
||||
{
|
||||
// Arrange
|
||||
Workflow workflow = CreateCheckpointedSubworkflowRequestWorkflow();
|
||||
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
||||
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
||||
|
||||
ExternalRequest pendingRequest;
|
||||
CheckpointInfo checkpoint;
|
||||
|
||||
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
||||
.RunStreamingAsync(workflow, "Hello"))
|
||||
{
|
||||
(pendingRequest, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
|
||||
}
|
||||
|
||||
// Act
|
||||
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
||||
.ResumeStreamingAsync(workflow, checkpoint);
|
||||
|
||||
// Assert
|
||||
List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed);
|
||||
ExternalRequest[] replayedRequests = [.. resumedEvents.OfType<RequestInfoEvent>().Select(evt => evt.Request)];
|
||||
|
||||
replayedRequests.Should().ContainSingle("the resumed parent workflow should surface the subworkflow request once");
|
||||
replayedRequests[0].RequestId.Should().Be(pendingRequest.RequestId,
|
||||
"the replayed subworkflow request should match the checkpointed request");
|
||||
replayedRequests[0].PortInfo.PortId.Should().Be(pendingRequest.PortInfo.PortId,
|
||||
"the replayed request should remain qualified through the subworkflow boundary");
|
||||
|
||||
await resumed.SendResponseAsync(replayedRequests[0].CreateResponse("World"));
|
||||
|
||||
List<WorkflowEvent> completionEvents = await ReadToHaltAsync(resumed);
|
||||
completionEvents.OfType<RequestInfoEvent>().Should().BeEmpty(
|
||||
"the resumed subworkflow request should not be replayed twice");
|
||||
completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
|
||||
"subworkflow replay should not introduce workflow errors");
|
||||
RunStatus statusAfterSubworkflowResponse = await resumed.GetStatusAsync();
|
||||
statusAfterSubworkflowResponse.Should().Be(RunStatus.Idle,
|
||||
"the resumed subworkflow should continue after responding to the replayed request");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that when <c>republishPendingEvents</c> is <see langword="false"/>,
|
||||
/// no <see cref="RequestInfoEvent"/> is re-emitted after resuming from a checkpoint.
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
|
||||
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
|
||||
internal async Task Checkpoint_Resume_WithRepublishDisabled_DoesNotEmitRequestInfoEventsAsync(ExecutionEnvironment environment)
|
||||
{
|
||||
// Arrange
|
||||
RequestPort<string, string> requestPort = RequestPort.Create<string, string>("TestPort");
|
||||
ForwardMessageExecutor<string> processor = new("Processor");
|
||||
|
||||
Workflow workflow = new WorkflowBuilder(requestPort)
|
||||
.AddEdge(requestPort, processor)
|
||||
.Build();
|
||||
|
||||
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
|
||||
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
|
||||
|
||||
// First run: collect a checkpoint with pending requests.
|
||||
CheckpointInfo? checkpoint = null;
|
||||
|
||||
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
|
||||
.RunStreamingAsync(workflow, "Hello"))
|
||||
{
|
||||
await foreach (WorkflowEvent evt in firstRun.WatchStreamAsync(blockOnPendingRequest: false))
|
||||
{
|
||||
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
||||
{
|
||||
checkpoint = cp;
|
||||
}
|
||||
}
|
||||
|
||||
checkpoint.Should().NotBeNull();
|
||||
}
|
||||
|
||||
// Act: Resume with republishPendingEvents: false via the internal API.
|
||||
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
|
||||
.ResumeStreamingInternalAsync(workflow, checkpoint!, republishPendingEvents: false);
|
||||
|
||||
// Assert: No RequestInfoEvent should appear in the event stream.
|
||||
int requestEventCount = 0;
|
||||
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
||||
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
||||
{
|
||||
if (evt is RequestInfoEvent)
|
||||
{
|
||||
requestEventCount++;
|
||||
}
|
||||
}
|
||||
|
||||
requestEventCount.Should().Be(0,
|
||||
"no RequestInfoEvent should be emitted when republishPendingEvents is false");
|
||||
}
|
||||
|
||||
private static Workflow CreateSimpleRequestWorkflow(
|
||||
string requestPortId = "TestPort",
|
||||
string processorId = "Processor")
|
||||
{
|
||||
RequestPort<string, string> requestPort = RequestPort.Create<string, string>(requestPortId);
|
||||
ForwardMessageExecutor<string> processor = new(processorId);
|
||||
|
||||
return new WorkflowBuilder(requestPort)
|
||||
.AddEdge(requestPort, processor)
|
||||
.Build();
|
||||
}
|
||||
|
||||
private static Workflow CreateCheckpointedSubworkflowRequestWorkflow()
|
||||
{
|
||||
ExecutorBinding subworkflow = CreateSimpleRequestWorkflow(
|
||||
requestPortId: "InnerTestPort",
|
||||
processorId: "InnerProcessor")
|
||||
.BindAsExecutor("Subworkflow");
|
||||
|
||||
return new WorkflowBuilder(subworkflow)
|
||||
.AddExternalRequest<string, string>(subworkflow, id: "ForwardedSubworkflowRequest")
|
||||
.Build();
|
||||
}
|
||||
|
||||
private static async ValueTask<(ExternalRequest PendingRequest, CheckpointInfo Checkpoint)> CapturePendingRequestAndCheckpointAsync(StreamingRun run)
|
||||
{
|
||||
ExternalRequest? pendingRequest = null;
|
||||
CheckpointInfo? checkpoint = null;
|
||||
|
||||
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false))
|
||||
{
|
||||
if (evt is RequestInfoEvent requestInfo)
|
||||
{
|
||||
pendingRequest ??= requestInfo.Request;
|
||||
}
|
||||
|
||||
if (evt is SuperStepCompletedEvent step && step.CompletionInfo?.Checkpoint is { } cp)
|
||||
{
|
||||
checkpoint = cp;
|
||||
}
|
||||
}
|
||||
|
||||
pendingRequest.Should().NotBeNull("the workflow should have emitted a pending request");
|
||||
checkpoint.Should().NotBeNull("the workflow should have produced a checkpoint");
|
||||
return (pendingRequest!, checkpoint!);
|
||||
}
|
||||
|
||||
private static async ValueTask<List<WorkflowEvent>> ReadToHaltAsync(StreamingRun run)
|
||||
{
|
||||
List<WorkflowEvent> events = [];
|
||||
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
|
||||
|
||||
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cts.Token))
|
||||
{
|
||||
events.Add(evt);
|
||||
}
|
||||
|
||||
return events;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user