Fix flaky test: prevent spurious workflow_invoke Activity on timeout wake-up

The StreamingRunEventStream run loop uses a 1-second timeout on
WaitForInputAsync. When the timeout fires before the consumer calls
StopAsync, the loop would create a spurious workflow_invoke Activity
even though no actual input was provided. This caused the
WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsync test
to intermittently fail (expecting 2 activities but finding 3).

Fix: guard the loop body with a HasUnprocessedMessages check. On
timeout wake-ups with no work, the loop waits again without creating
an activity or changing the run status.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Roger Barreto
2026-02-27 14:27:20 +00:00
Unverified
parent cc78ab2185
commit 98963e17f2
@@ -80,6 +80,18 @@ internal sealed class StreamingRunEventStream : IRunEventStream
while (!linkedSource.Token.IsCancellationRequested)
{
// Guard against spurious wake-ups from the input waiter timeout.
// Without this check, a timeout wake-up (no actual input) would create
// a new workflow_invoke Activity even though there is no work to process.
if (!this._stepRunner.HasUnprocessedMessages)
{
await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false);
continue;
}
// When signaled with actual input, resume running
this._runStatus = RunStatus.Running;
// Start a new run-stage activity for this input→processing→halt cycle
runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
@@ -117,9 +129,6 @@ internal sealed class StreamingRunEventStream : IRunEventStream
// Wait for next input from the consumer
// Works for both Idle (no work) and PendingRequests (waiting for responses)
await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false);
// When signaled, resume running
this._runStatus = RunStatus.Running;
}
}
catch (OperationCanceledException)