From 98963e17f2cee64d4304b9f19e5f4ab380435961 Mon Sep 17 00:00:00 2001 From: Roger Barreto <19890735+rogerbarreto@users.noreply.github.com> Date: Fri, 27 Feb 2026 14:27:20 +0000 Subject: [PATCH] Fix flaky test: prevent spurious workflow_invoke Activity on timeout wake-up The StreamingRunEventStream run loop uses a 1-second timeout on WaitForInputAsync. When the timeout fires before the consumer calls StopAsync, the loop would create a spurious workflow_invoke Activity even though no actual input was provided. This caused the WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsync test to intermittently fail (expecting 2 activities but finding 3). Fix: guard the loop body with a HasUnprocessedMessages check. On timeout wake-ups with no work, the loop waits again without creating an activity or changing the run status. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Execution/StreamingRunEventStream.cs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index a09dedd8ad..dabfb8a54b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -80,6 +80,18 @@ internal sealed class StreamingRunEventStream : IRunEventStream while (!linkedSource.Token.IsCancellationRequested) { + // Guard against spurious wake-ups from the input waiter timeout. + // Without this check, a timeout wake-up (no actual input) would create + // a new workflow_invoke Activity even though there is no work to process. + if (!this._stepRunner.HasUnprocessedMessages) + { + await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false); + continue; + } + + // When signaled with actual input, resume running + this._runStatus = RunStatus.Running; + // Start a new run-stage activity for this input→processing→halt cycle runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId) @@ -117,9 +129,6 @@ internal sealed class StreamingRunEventStream : IRunEventStream // Wait for next input from the consumer // Works for both Idle (no work) and PendingRequests (waiting for responses) await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false); - - // When signaled, resume running - this._runStatus = RunStatus.Running; } } catch (OperationCanceledException)