agent-framework/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/ConsoleAppSamplesValidation.cs
Giles Odigwe c06af9a1b3 .NET: Python: Add dotnet integration test report to CI (#5515)
* Add dotnet integration test report to CI

- Add --report-junit flag to dotnet integration test step to generate
  JUnit XML alongside TRX, with explicit --results-directory to
  centralize output in IntegrationTestResults/
- Upload JUnit XML artifacts from each matrix leg (net10.0/ubuntu,
  net472/windows) as dotnet-test-results-{framework}-{os}
- Add dotnet-integration-test-report job that downloads artifacts,
  runs the existing aggregate.py script, posts markdown to Job Summary,
  and saves trend history via actions/cache
- Refactor aggregate.py to discover JUnit XML files recursively,
  supporting both pytest (pytest.xml) and xunit (*.junit.xml) layouts
- Handle provider name derivation for dotnet artifact naming convention
- Fix nodeid collision when same test runs under multiple frameworks
  by qualifying keys with provider when collisions are detected
- Improve module extraction for dotnet C# classnames (recognizes
  IntegrationTests/UnitTests namespace segments)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore: trigger dotnet CI for report validation

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: use .junit extension (not .junit.xml) for xunit v3 output

xUnit v3 generates files with .junit extension, not .junit.xml.
Update upload glob and aggregate.py discovery to match.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: use deterministic provider-qualified keys for dotnet tests

Always prefix dotnet test keys with provider (e.g. net10.0 (ubuntu)::TestName)
to ensure stable, comparable counts across runs regardless of file parse order.
Also show Executed (passed+failed) instead of Total in summary table.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: match Python report summary format (Total, passed/total, etc.)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: split dotnet report into per-framework tables

Dotnet tests run on multiple frameworks (net10.0, net472). Instead of
one combined table with unstable totals, show separate sections per
framework — each with its own summary row and per-test table. Python
reports retain the original single-table format.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Re-enable 7 flaky dotnet integration tests with increased timeouts

Increase timeouts to reduce timing-related flakiness in LLM-backed
integration tests (issue #4971):

- ExternalClientTests: 60s -> 120s default timeout
- SamplesValidationBase: 60s -> 120s default timeout
- ConsoleAppSamplesValidation: 90s -> 150s for long-running tests
- AzureFunctions SamplesValidation: 2min -> 3min orchestration timeout,
  60s -> 90s per-step WaitForConditionAsync timeouts

Remove all Skip=Flaky annotations and unused SkipFlakyTimingTest constants.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Re-skip LLM non-determinism flaky tests, keep timeout fixes

Re-skip SingleAgentOrchestrationHITLSampleValidationAsync and
LongRunningToolsSampleValidationAsync - these fail due to LLM producing
extra review notifications, not timeouts. Updated skip reasons to
accurately describe the root cause. Reverted unnecessary timeout change
on the skipped LongRunningTools test.

The remaining 5 re-enabled tests with timeout increases are stable.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Enable Anthropic integration tests in CI

Replace hardcoded skip with conditional skip pattern (matching
CopilotStudio approach): tests gracefully skip when ANTHROPIC_API_KEY
is missing, and run when present.

Changes:
- AnthropicChatCompletionFixture: try/catch in InitializeAsync with
  Assert.Skip on missing config (replaces hardcoded SkipReason)
- AnthropicSkillsIntegrationTests: same pattern per test method
- dotnet-build-and-test.yml: wire up ANTHROPIC_API_KEY,
  ANTHROPIC_CHAT_MODEL_NAME, and ANTHROPIC_REASONING_MODEL_NAME
  env vars to the integration test step

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix missing System using in AnthropicSkillsIntegrationTests

Add 'using System;' for InvalidOperationException in try/catch blocks.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Skip flaky SingleAgentOrchestrationChainingSampleValidationAsync

LLM non-determinism causes Assert.NotNull failures on orchestration
results. Skip until test logic is hardened against non-deterministic
LLM responses.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Re-enable HITL and LongRunningTools tests with timeout and flexibility fixes

- Remove Skip attribute from SingleAgentOrchestrationHITLSampleValidationAsync
- Remove Skip attribute from LongRunningToolsSampleValidationAsync
- Increase timeout from 120s/90s to 180s to accommodate 2+ LLM round-trips
- Replace rigid 2-cycle assertion with flexible approval logic that handles
  extra review cycles from LLM non-determinism

Fixes the two failure modes identified in #4971:
1. Timeout: 120s/90s was insufficient for multiple LLM calls under CI load
2. Extra notifications: Assert.Fail on 3rd+ review cycle was too rigid

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Increase AzureFunctions LongRunningTools test timeouts from 90s to 180s

The LongRunningToolsSampleValidationAsync test in the AzureFunctions integration
tests was failing in CI with TimeoutException at the 'Content published
notification is logged' step. The 90-second timeouts are too tight for CI
environments where LLM calls and orchestration overhead can be slow.

Increased all three WaitForConditionAsync timeouts from 90s to 180s:
- Waiting for human feedback notification
- Waiting for publish notification (the step that was failing)
- Waiting for orchestration completion

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Merge main and fix dotnet report path after flaky_report rename

Merge upstream/main which renamed scripts/flaky_report/ to
scripts/integration_test_report/ (from Python PR #5454). Update the
dotnet-build-and-test workflow to reference the new path.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add RetryFact to DurableTask and AzureFunctions integration tests

These tests interact with LLMs via stdin/stdout (DurableTask) or HTTP
(AzureFunctions) and are inherently non-deterministic. Unlike the Python
side which uses pytest-retry, the dotnet tests had no retry mechanism
and a single transient failure would fail the entire CI run.

Changes:
- Switch [Fact] to [RetryFact(2, 5000)] on all LLM-dependent tests
  across ConsoleAppSamplesValidation, ExternalClientTests,
  WorkflowConsoleAppSamplesValidation, and AzureFunctions SamplesValidation
- Add re-prompt mechanism to LongRunningToolsSampleValidationAsync:
  if the LLM doesn't invoke the tool within 60s, re-send the prompt
  (up to 2 retries) instead of burning the full timeout
- Reduce LongRunningTools timeout from 240s to 180s (re-prompt makes
  the extra buffer unnecessary)
- Leave simple/deterministic tests as [Fact] (SingleAgent, unit tests)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add persist-credentials: false to Integration Test Report checkout step

Matches the convention used by other checkout steps in this workflow
to avoid leaving GITHUB_TOKEN credentials in the local git config.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Small fixes

* Disable failing Anthropic tests

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-07 20:39:32 +00:00

// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text;
namespace Microsoft.Agents.AI.DurableTask.IntegrationTests;
/// <summary>
/// Integration tests for validating the durable agent console app samples
/// located in samples/04-hosting/DurableAgents/ConsoleApps.
/// </summary>
[Collection("Samples")]
[Trait("Category", "SampleValidation")]
public sealed class ConsoleAppSamplesValidation(ITestOutputHelper outputHelper) : SamplesValidationBase(outputHelper)
{
private static readonly string s_samplesPath = Path.GetFullPath(
Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "..", "..", "..", "..", "..", "samples", "04-hosting", "DurableAgents", "ConsoleApps"));
/// <inheritdoc />
protected override string SamplesPath => s_samplesPath;
/// <inheritdoc />
protected override bool RequiresRedis => true;
/// <inheritdoc />
protected override void ConfigureAdditionalEnvironmentVariables(ProcessStartInfo startInfo, Action<string, string> setEnvVar)
{
setEnvVar("REDIS_CONNECTION_STRING", $"localhost:{RedisPort}");
}
[Fact]
public async Task SingleAgentSampleValidationAsync()
{
using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts();
string samplePath = Path.Combine(s_samplesPath, "01_SingleAgent");
await this.RunSampleTestAsync(samplePath, async (process, logs) =>
{
string agentResponse = string.Empty;
bool inputSent = false;
// Read output from logs queue
string? line;
while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null)
{
// Look for the agent's response. Unlike the interactive mode, we won't actually see a line
// that starts with "Joker: ". Instead, we'll see a line that looks like "You: Joker: ..." because
// the standard input is *not* echoed back to standard output.
if (line.Contains("Joker: ", StringComparison.OrdinalIgnoreCase))
{
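// Take the text that follows the "Joker: " marker (the marker may not be at the start of the line).
// This gives us the first line of the agent's response, which is all we need to verify that the agent is working.
int markerIndex = line.IndexOf("Joker: ", StringComparison.OrdinalIgnoreCase);
agentResponse = line.Substring(markerIndex + "Joker: ".Length).Trim();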
break;
}
else if (!inputSent)
{
// Send input to stdin after we've started seeing output from the app
await this.WriteInputAsync(process, "Tell me a joke about a pirate.", testTimeoutCts.Token);
inputSent = true;
}
}
Assert.True(inputSent, "Input was not sent to the agent");
Assert.NotEmpty(agentResponse);
// Send exit command
await this.WriteInputAsync(process, "exit", testTimeoutCts.Token);
});
}
[RetryFact(2, 5000)]
public async Task SingleAgentOrchestrationChainingSampleValidationAsync()
{
using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts();
string samplePath = Path.Combine(s_samplesPath, "02_AgentOrchestration_Chaining");
await this.RunSampleTestAsync(samplePath, async (process, logs) =>
{
// Console app runs automatically, just wait for completion
string? line;
bool foundSuccess = false;
while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null)
{
if (line.Contains("Orchestration completed successfully!", StringComparison.OrdinalIgnoreCase))
{
foundSuccess = true;
}
if (line.Contains("Result:", StringComparison.OrdinalIgnoreCase))
{
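// Take the text that follows the "Result:" marker, wherever it appears in the line
string result = line.Substring(line.IndexOf("Result:", StringComparison.OrdinalIgnoreCase) + "Result:".Length).Trim();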
Assert.NotEmpty(result);
break;
}
// Check for failure
if (line.Contains("Orchestration failed!", StringComparison.OrdinalIgnoreCase))
{
Assert.Fail("Orchestration failed.");
}
}
Assert.True(foundSuccess, "Orchestration did not complete successfully.");
});
}
[RetryFact(2, 5000)]
public async Task MultiAgentConcurrencySampleValidationAsync()
{
using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts();
string samplePath = Path.Combine(s_samplesPath, "03_AgentOrchestration_Concurrency");
await this.RunSampleTestAsync(samplePath, async (process, logs) =>
{
// Send input to stdin
await this.WriteInputAsync(process, "What is temperature?", testTimeoutCts.Token);
// Read output from logs queue
StringBuilder output = new();
string? line;
bool foundSuccess = false;
bool foundPhysicist = false;
bool foundChemist = false;
while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null)
{
output.AppendLine(line);
if (line.Contains("Orchestration completed successfully!", StringComparison.OrdinalIgnoreCase))
{
foundSuccess = true;
}
if (line.Contains("Physicist's response:", StringComparison.OrdinalIgnoreCase))
{
foundPhysicist = true;
}
if (line.Contains("Chemist's response:", StringComparison.OrdinalIgnoreCase))
{
foundChemist = true;
}
// Check for failure
if (line.Contains("Orchestration failed!", StringComparison.OrdinalIgnoreCase))
{
Assert.Fail("Orchestration failed.");
}
// Stop reading once we have the success message and both responses
if (foundSuccess && foundPhysicist && foundChemist)
{
break;
}
}
Assert.True(foundSuccess, "Orchestration did not complete successfully.");
Assert.True(foundPhysicist, "Physicist response not found.");
Assert.True(foundChemist, "Chemist response not found.");
});
}
[RetryFact(2, 5000)]
public async Task MultiAgentConditionalSampleValidationAsync()
{
using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts();
string samplePath = Path.Combine(s_samplesPath, "04_AgentOrchestration_Conditionals");
await this.RunSampleTestAsync(samplePath, async (process, logs) =>
{
// Test with legitimate email
await this.TestSpamDetectionAsync(
process: process,
logs: logs,
emailId: "email-001",
emailContent: "Hi John. I wanted to follow up on our meeting yesterday about the quarterly report. Could you please send me the updated figures by Friday? Thanks!",
expectedSpam: false,
testTimeoutCts.Token);
// Wait for this process to exit; the second test below runs against a fresh process
await process.WaitForExitAsync();
});
// Run second test with spam email
using CancellationTokenSource testTimeoutCts2 = this.CreateTestTimeoutCts();
await this.RunSampleTestAsync(samplePath, async (process, logs) =>
{
await this.TestSpamDetectionAsync(
process,
logs,
emailId: "email-002",
emailContent: "URGENT! You've won $1,000,000! Click here now to claim your prize! Limited time offer! Don't miss out!",
expectedSpam: true,
testTimeoutCts2.Token);
});
}
private async Task TestSpamDetectionAsync(
Process process,
BlockingCollection<OutputLog> logs,
string emailId,
string emailContent,
bool expectedSpam,
CancellationToken cancellationToken)
{
// Send email content to stdin
await this.WriteInputAsync(process, emailContent, cancellationToken);
// Read output from logs queue
string? line;
bool foundSuccess = false;
while ((line = this.ReadLogLine(logs, cancellationToken)) != null)
{
if (line.Contains("Email sent", StringComparison.OrdinalIgnoreCase))
{
Assert.False(expectedSpam, "Email was sent, but was expected to be marked as spam.");
}
if (line.Contains("Email marked as spam", StringComparison.OrdinalIgnoreCase))
{
Assert.True(expectedSpam, "Email was marked as spam, but was expected to be sent.");
}
if (line.Contains("Orchestration completed successfully!", StringComparison.OrdinalIgnoreCase))
{
foundSuccess = true;
break;
}
// Check for failure
if (line.Contains("Orchestration failed!", StringComparison.OrdinalIgnoreCase))
{
Assert.Fail("Orchestration failed.");
}
}
Assert.True(foundSuccess, "Orchestration did not complete successfully.");
}
[RetryFact(2, 5000)]
public async Task SingleAgentOrchestrationHITLSampleValidationAsync()
{
string samplePath = Path.Combine(s_samplesPath, "05_AgentOrchestration_HITL");
await this.RunSampleTestAsync(samplePath, async (process, logs) =>
{
using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts(TimeSpan.FromSeconds(180));
// Start the HITL orchestration following the happy path from README
await this.WriteInputAsync(process, "The Future of Artificial Intelligence", testTimeoutCts.Token);
await this.WriteInputAsync(process, "3", testTimeoutCts.Token);
await this.WriteInputAsync(process, "72", testTimeoutCts.Token);
// Read output from logs queue
string? line;
bool rejectionSent = false;
bool approvalSent = false;
bool contentPublished = false;
while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null)
{
// Look for notification that content is ready. The first time we see this, we should send a rejection.
// Subsequent times we see this, we should send approval (LLM may produce extra review cycles).
if (line.Contains("Content is ready for review", StringComparison.OrdinalIgnoreCase))
{
if (!rejectionSent)
{
// Prompt: Approve? (y/n):
await this.WriteInputAsync(process, "n", testTimeoutCts.Token);
// Prompt: Feedback (optional):
await this.WriteInputAsync(
process,
"The article needs more technical depth and better examples. Rewrite it with less than 300 words.",
testTimeoutCts.Token);
rejectionSent = true;
}
else
{
// Approve any subsequent draft (LLM non-determinism may produce extra review cycles)
await this.WriteInputAsync(process, "y", testTimeoutCts.Token);
// Prompt: Feedback (optional):
await this.WriteInputAsync(process, "Looks good!", testTimeoutCts.Token);
approvalSent = true;
}
}
// Look for success message
if (line.Contains("PUBLISHING: Content has been published", StringComparison.OrdinalIgnoreCase))
{
contentPublished = true;
break;
}
// Check for failure
if (line.Contains("Orchestration failed", StringComparison.OrdinalIgnoreCase))
{
Assert.Fail("Orchestration failed.");
}
}
Assert.True(rejectionSent, "Wasn't prompted with the first draft.");
Assert.True(approvalSent, "Wasn't prompted with the second draft.");
Assert.True(contentPublished, "Content was not published.");
});
}
[RetryFact(2, 5000)]
public async Task LongRunningToolsSampleValidationAsync()
{
string samplePath = Path.Combine(s_samplesPath, "06_LongRunningTools");
await this.RunSampleTestAsync(samplePath, async (process, logs) =>
{
// This test takes a bit longer to run due to the multiple agent interactions and the lengthy content generation.
using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts(TimeSpan.FromSeconds(180));
// Test starting an agent that schedules a content generation orchestration
await this.WriteInputAsync(
process,
"Start a content generation workflow for the topic 'The Future of Artificial Intelligence'. Keep it less than 300 words.",
testTimeoutCts.Token);
// Read output from logs queue
bool rejectionSent = false;
bool approvalSent = false;
bool contentPublished = false;
string? line;
while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null)
{
// Look for notification that content is ready. The first time we see this, we should send a rejection.
// Subsequent times we see this, we should send approval (LLM may produce extra review cycles).
if (line.Contains("NOTIFICATION: Please review the following content for approval", StringComparison.OrdinalIgnoreCase))
{
// Wait for the notification to be fully written to the console
await Task.Delay(TimeSpan.FromSeconds(1), testTimeoutCts.Token);
if (!rejectionSent)
{
// Reject the content with feedback. Note that we need to send a newline character to the console first before sending the input.
await this.WriteInputAsync(
process,
"\nReject the content with feedback: Make it even shorter.",
testTimeoutCts.Token);
rejectionSent = true;
}
else
{
// Approve any subsequent draft (LLM non-determinism may produce extra review cycles)
await this.WriteInputAsync(
process,
"\nApprove the content",
testTimeoutCts.Token);
approvalSent = true;
}
}
// Look for success message
if (line.Contains("PUBLISHING: Content has been published successfully", StringComparison.OrdinalIgnoreCase))
{
contentPublished = true;
// Ask for the status of the workflow to confirm that it completed successfully.
await Task.Delay(TimeSpan.FromSeconds(1), testTimeoutCts.Token);
await this.WriteInputAsync(process, "\nGet the status of the workflow you previously started", testTimeoutCts.Token);
}
// Check for workflow completion or failure
if (contentPublished)
{
if (line.Contains("Completed", StringComparison.OrdinalIgnoreCase))
{
break;
}
else if (line.Contains("Failed", StringComparison.OrdinalIgnoreCase))
{
Assert.Fail("Workflow failed.");
}
}
}
Assert.True(rejectionSent, "Wasn't prompted with the first draft.");
Assert.True(approvalSent, "Wasn't prompted with the second draft.");
Assert.True(contentPublished, "Content was not published.");
});
}
[RetryFact(2, 5000)]
public async Task ReliableStreamingSampleValidationAsync()
{
string samplePath = Path.Combine(s_samplesPath, "07_ReliableStreaming");
await this.RunSampleTestAsync(samplePath, async (process, logs) =>
{
// This test takes a bit longer to run because it streams a lengthy response and exercises the interrupt/resume flow.
using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts(TimeSpan.FromSeconds(150));
// Send a prompt that should produce a long, multi-line streamed response
await this.WriteInputAsync(process, "Plan a 5-day trip to Seattle. Include daily activities.", testTimeoutCts.Token);
// Read output from stdout - should stream in real-time
// NOTE: The sample uses Console.Write() for streaming chunks, which means content may not be line-buffered.
// We test the interrupt/resume flow by:
// 1. Waiting for at least 2 lines of content
// 2. Sending Enter to interrupt
// 3. Verifying we get "Last cursor" output
// 4. Sending Enter again to resume
// 5. Verifying we get more content and that we're not restarting from the beginning
string? line;
bool foundConversationStart = false;
int contentLinesBeforeInterrupt = 0;
int contentLinesAfterResume = 0;
bool foundLastCursor = false;
bool foundResumeMessage = false;
bool interrupted = false;
bool resumed = false;
// Read output with a reasonable timeout
using CancellationTokenSource readTimeoutCts = this.CreateTestTimeoutCts();
DateTime? interruptTime = null;
try
{
while ((line = this.ReadLogLine(logs, readTimeoutCts.Token)) != null)
{
// Look for the conversation start message (updated format)
if (line.Contains("Conversation ID", StringComparison.OrdinalIgnoreCase))
{
foundConversationStart = true;
continue;
}
// Check if this is a content line (not prompts or status messages)
bool isContentLine = !string.IsNullOrWhiteSpace(line) &&
!line.Contains("Conversation ID", StringComparison.OrdinalIgnoreCase) &&
!line.Contains("Press [Enter]", StringComparison.OrdinalIgnoreCase) &&
!line.Contains("You:", StringComparison.OrdinalIgnoreCase) &&
!line.Contains("exit", StringComparison.OrdinalIgnoreCase) &&
!line.Contains("Stream cancelled", StringComparison.OrdinalIgnoreCase) &&
!line.Contains("Resuming conversation", StringComparison.OrdinalIgnoreCase) &&
!line.Contains("Last cursor", StringComparison.OrdinalIgnoreCase);
// Phase 1: Collect content before interrupt
if (foundConversationStart && !interrupted && isContentLine)
{
contentLinesBeforeInterrupt++;
}
// Phase 2: Wait for enough content, then interrupt
// Interrupt after 2 lines to maximize chance of catching stream while active
// (streams can complete very quickly, so we need to interrupt early)
if (foundConversationStart && !interrupted && contentLinesBeforeInterrupt >= 2)
{
this.OutputHelper.WriteLine($"Interrupting stream after {contentLinesBeforeInterrupt} content lines");
interrupted = true;
interruptTime = DateTime.Now;
// Send Enter to interrupt the stream
await this.WriteInputAsync(process, string.Empty, testTimeoutCts.Token);
// Give the sample a moment to process the cancellation before we continue reading output
await Task.Delay(TimeSpan.FromMilliseconds(300), testTimeoutCts.Token);
}
// Phase 3: Look for "Last cursor" message after interrupt
if (interrupted && !resumed && line.Contains("Last cursor", StringComparison.OrdinalIgnoreCase))
{
foundLastCursor = true;
// Send Enter again to resume
this.OutputHelper.WriteLine("Resuming stream from last cursor");
await this.WriteInputAsync(process, string.Empty, testTimeoutCts.Token);
resumed = true;
}
// Phase 4: Look for resume message
if (resumed && line.Contains("Resuming conversation", StringComparison.OrdinalIgnoreCase))
{
foundResumeMessage = true;
}
// Phase 5: Collect content after resume
if (resumed && isContentLine)
{
contentLinesAfterResume++;
}
// Look for completion message - but don't break if we interrupted and haven't found Last cursor yet
// Allow some time after interrupt for the cancellation message to appear
if (line.Contains("Conversation completed", StringComparison.OrdinalIgnoreCase))
{
// If we interrupted but haven't found Last cursor, wait a bit more
if (interrupted && !foundLastCursor && interruptTime.HasValue)
{
TimeSpan timeSinceInterrupt = DateTime.Now - interruptTime.Value;
if (timeSinceInterrupt < TimeSpan.FromSeconds(2))
{
// Continue reading for a bit more to catch the cancellation message
this.OutputHelper.WriteLine("Stream completed naturally, but waiting for Last cursor message after interrupt...");
continue;
}
}
// Only break if we've completed the test or if stream completed without interruption
if (!interrupted || (resumed && foundResumeMessage && contentLinesAfterResume >= 5))
{
break;
}
}
// Stop once we've verified the interrupt/resume flow works
if (resumed && foundResumeMessage && contentLinesAfterResume >= 5)
{
this.OutputHelper.WriteLine($"Successfully verified interrupt/resume: {contentLinesBeforeInterrupt} lines before, {contentLinesAfterResume} lines after");
break;
}
}
// If we interrupted but didn't find Last cursor, wait a bit more for it to appear
if (interrupted && !foundLastCursor && interruptTime.HasValue)
{
TimeSpan timeSinceInterrupt = DateTime.Now - interruptTime.Value;
if (timeSinceInterrupt < TimeSpan.FromSeconds(3))
{
this.OutputHelper.WriteLine("Waiting for Last cursor message after interrupt...");
using CancellationTokenSource waitCts = new(TimeSpan.FromSeconds(2));
try
{
while ((line = this.ReadLogLine(logs, waitCts.Token)) != null)
{
if (line.Contains("Last cursor", StringComparison.OrdinalIgnoreCase))
{
foundLastCursor = true;
if (!resumed)
{
this.OutputHelper.WriteLine("Resuming stream from last cursor");
await this.WriteInputAsync(process, string.Empty, testTimeoutCts.Token);
resumed = true;
}
break;
}
}
}
catch (OperationCanceledException)
{
// Timeout waiting for Last cursor
}
}
}
}
catch (OperationCanceledException)
{
// Timeout - check if we got enough to verify the flow
this.OutputHelper.WriteLine($"Read timeout reached. Interrupted: {interrupted}, Resumed: {resumed}, Content before: {contentLinesBeforeInterrupt}, Content after: {contentLinesAfterResume}");
}
Assert.True(foundConversationStart, "Conversation start message not found.");
Assert.True(contentLinesBeforeInterrupt >= 2, $"Not enough content before interrupt (got {contentLinesBeforeInterrupt}).");
// If stream completed before interrupt could take effect, that's a timing issue
// but we should still verify we got the conversation started
if (!interrupted)
{
this.OutputHelper.WriteLine("WARNING: Stream completed before interrupt could be sent. This may indicate the stream is too fast.");
}
Assert.True(interrupted, "Stream was not interrupted (may have completed too quickly).");
Assert.True(foundLastCursor, "'Last cursor' message not found after interrupt.");
Assert.True(resumed, "Stream was not resumed.");
Assert.True(foundResumeMessage, "Resume message not found.");
Assert.True(contentLinesAfterResume > 0, "No content received after resume (expected to continue from cursor, not restart).");
});
}
}