// Copyright (c) Microsoft. All rights reserved. using System.Collections.Concurrent; using System.Diagnostics; using System.Reflection; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; namespace Microsoft.Agents.AI.DurableTask.IntegrationTests; /// /// Base class for sample validation integration tests providing shared infrastructure /// setup and utility methods for running console app samples. /// public abstract class SamplesValidationBase : IAsyncLifetime { protected const string DtsPort = "8080"; protected const string RedisPort = "6379"; protected static readonly string DotnetTargetFramework = GetTargetFramework(); protected static readonly IConfiguration Configuration = new ConfigurationBuilder() .AddUserSecrets(Assembly.GetExecutingAssembly()) .AddEnvironmentVariables() .Build(); // Semaphores for thread-safe initialization of shared infrastructure. // xUnit may run tests in parallel, so we need to ensure that DTS emulator and Redis // are started only once across all test instances. Using SemaphoreSlim allows async-safe // locking, and the double-check pattern (check flag, acquire lock, check flag again) // minimizes lock contention after initialization is complete. private static readonly SemaphoreSlim s_dtsInitLock = new(1, 1); private static readonly SemaphoreSlim s_redisInitLock = new(1, 1); private static bool s_dtsInfrastructureStarted; private static bool s_redisInfrastructureStarted; protected SamplesValidationBase(ITestOutputHelper outputHelper) { this.OutputHelper = outputHelper; } /// /// Gets the test output helper for logging. /// protected ITestOutputHelper OutputHelper { get; } /// /// Gets the base path to the samples directory for this test class. /// protected abstract string SamplesPath { get; } /// /// Gets whether this test class requires Redis infrastructure. /// protected virtual bool RequiresRedis => false; /// /// Gets the task hub name prefix for this test class. /// protected virtual string TaskHubPrefix => "sample"; /// public async ValueTask InitializeAsync() { await EnsureDtsInfrastructureStartedAsync(this.OutputHelper, this.StartDtsEmulatorAsync); if (this.RequiresRedis) { await EnsureRedisInfrastructureStartedAsync(this.OutputHelper, this.StartRedisAsync); } await Task.Delay(TimeSpan.FromSeconds(5)); } /// /// Ensures DTS infrastructure is started exactly once across all test instances. /// Static method writes to static field to avoid the code smell of instance methods modifying shared state. /// private static async Task EnsureDtsInfrastructureStartedAsync(ITestOutputHelper outputHelper, Func startAction) { if (s_dtsInfrastructureStarted) { return; } await s_dtsInitLock.WaitAsync(); try { if (!s_dtsInfrastructureStarted) { outputHelper.WriteLine("Starting shared DTS infrastructure..."); await startAction(); s_dtsInfrastructureStarted = true; } } finally { s_dtsInitLock.Release(); } } /// /// Ensures Redis infrastructure is started exactly once across all test instances. /// Static method writes to static field to avoid the code smell of instance methods modifying shared state. /// private static async Task EnsureRedisInfrastructureStartedAsync(ITestOutputHelper outputHelper, Func startAction) { if (s_redisInfrastructureStarted) { return; } await s_redisInitLock.WaitAsync(); try { if (!s_redisInfrastructureStarted) { outputHelper.WriteLine("Starting shared Redis infrastructure..."); await startAction(); s_redisInfrastructureStarted = true; } } finally { s_redisInitLock.Release(); } } /// public ValueTask DisposeAsync() { GC.SuppressFinalize(this); return default; } protected sealed record OutputLog(DateTime Timestamp, LogLevel Level, string Message); /// /// Runs a sample test by starting the console app and executing the provided test action. /// protected async Task RunSampleTestAsync(string samplePath, Func, Task> testAction) { string uniqueTaskHubName = $"{this.TaskHubPrefix}-{Guid.NewGuid():N}"[..^26]; // Build the sample project first so that build failures are caught immediately // instead of silently failing inside 'dotnet run' and causing a timeout. await this.BuildSampleAsync(samplePath); using BlockingCollection logsContainer = []; using Process appProcess = this.StartConsoleApp(samplePath, logsContainer, uniqueTaskHubName); try { await testAction(appProcess, logsContainer); } catch (OperationCanceledException e) { throw new TimeoutException("Core test logic timed out!", e); } finally { if (!logsContainer.IsAddingCompleted) { logsContainer.CompleteAdding(); } await this.StopProcessAsync(appProcess); } } /// /// Writes a line to the process's stdin and flushes it. /// protected async Task WriteInputAsync(Process process, string input, CancellationToken cancellationToken) { this.OutputHelper.WriteLine($"{DateTime.Now:HH:mm:ss.fff} [{process.ProcessName}(in)]: {input}"); await process.StandardInput.WriteLineAsync(input); await process.StandardInput.FlushAsync(cancellationToken); } /// /// Reads the next Information-level log line from the queue. /// Returns null if cancelled or collection is completed. /// protected string? ReadLogLine(BlockingCollection logs, CancellationToken cancellationToken) { try { while (!cancellationToken.IsCancellationRequested) { OutputLog log = logs.Take(cancellationToken); if (log.Message.Contains("Unhandled exception")) { Assert.Fail("Console app encountered an unhandled exception."); } if (log.Level == LogLevel.Information) { return log.Message; } } } catch (OperationCanceledException) { return null; } catch (InvalidOperationException) { return null; } return null; } /// /// Creates a cancellation token source with the specified timeout for test operations. /// protected CancellationTokenSource CreateTestTimeoutCts(TimeSpan? timeout = null) { TimeSpan testTimeout = Debugger.IsAttached ? TimeSpan.FromMinutes(5) : timeout ?? TimeSpan.FromSeconds(120); return new CancellationTokenSource(testTimeout); } /// /// Allows derived classes to set additional environment variables for the console app process. /// protected virtual void ConfigureAdditionalEnvironmentVariables(ProcessStartInfo startInfo, Action setEnvVar) { } private static string GetTargetFramework() { string filePath = new Uri(typeof(SamplesValidationBase).Assembly.Location).LocalPath; string directory = Path.GetDirectoryName(filePath)!; string tfm = Path.GetFileName(directory); if (tfm.StartsWith("net", StringComparison.OrdinalIgnoreCase)) { return tfm; } throw new InvalidOperationException($"Unable to find target framework in path: {filePath}"); } private async Task StartDtsEmulatorAsync() { if (!await this.IsDtsEmulatorRunningAsync()) { this.OutputHelper.WriteLine("Starting DTS emulator..."); await this.RunCommandAsync("docker", "run", "-d", "--name", "dts-emulator", "-p", $"{DtsPort}:8080", "-e", "DTS_USE_DYNAMIC_TASK_HUBS=true", "mcr.microsoft.com/dts/dts-emulator:latest"); } } private async Task StartRedisAsync() { if (!await this.IsRedisRunningAsync()) { this.OutputHelper.WriteLine("Starting Redis..."); await this.RunCommandAsync("docker", "run", "-d", "--name", "redis", "-p", $"{RedisPort}:6379", "redis:latest"); } } private async Task IsDtsEmulatorRunningAsync() { this.OutputHelper.WriteLine($"Checking if DTS emulator is running at http://localhost:{DtsPort}/healthz..."); using HttpClient http2Client = new() { DefaultRequestVersion = new Version(2, 0), DefaultVersionPolicy = HttpVersionPolicy.RequestVersionExact }; try { using CancellationTokenSource timeoutCts = new(TimeSpan.FromSeconds(30)); using HttpResponseMessage response = await http2Client.GetAsync( new Uri($"http://localhost:{DtsPort}/healthz"), timeoutCts.Token); if (response.Content.Headers.ContentLength > 0) { string content = await response.Content.ReadAsStringAsync(timeoutCts.Token); this.OutputHelper.WriteLine($"DTS emulator health check response: {content}"); } bool isRunning = response.IsSuccessStatusCode; this.OutputHelper.WriteLine(isRunning ? "DTS emulator is running" : $"DTS emulator not running. Status: {response.StatusCode}"); return isRunning; } catch (HttpRequestException ex) { this.OutputHelper.WriteLine($"DTS emulator is not running: {ex.Message}"); return false; } } private async Task IsRedisRunningAsync() { this.OutputHelper.WriteLine($"Checking if Redis is running at localhost:{RedisPort}..."); try { using CancellationTokenSource timeoutCts = new(TimeSpan.FromSeconds(30)); ProcessStartInfo startInfo = new() { FileName = "docker", Arguments = "exec redis redis-cli ping", UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true }; using Process process = new() { StartInfo = startInfo }; if (!process.Start()) { this.OutputHelper.WriteLine("Failed to start docker exec command"); return false; } string output = await process.StandardOutput.ReadToEndAsync(timeoutCts.Token); await process.WaitForExitAsync(timeoutCts.Token); bool isRunning = process.ExitCode == 0 && output.Contains("PONG", StringComparison.OrdinalIgnoreCase); this.OutputHelper.WriteLine(isRunning ? "Redis is running" : $"Redis not running. Exit: {process.ExitCode}, Output: {output}"); return isRunning; } catch (Exception ex) { this.OutputHelper.WriteLine($"Redis is not running: {ex.Message}"); return false; } } private async Task BuildSampleAsync(string samplePath) { this.OutputHelper.WriteLine($"Building sample at {samplePath}..."); ProcessStartInfo buildInfo = new() { FileName = "dotnet", Arguments = $"build --framework {DotnetTargetFramework}", WorkingDirectory = samplePath, UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, }; using Process buildProcess = new() { StartInfo = buildInfo }; buildProcess.Start(); // Read both streams asynchronously to avoid deadlocks from filled pipe buffers Task stdoutTask = buildProcess.StandardOutput.ReadToEndAsync(); Task stderrTask = buildProcess.StandardError.ReadToEndAsync(); using CancellationTokenSource buildCts = new(TimeSpan.FromMinutes(5)); try { await buildProcess.WaitForExitAsync(buildCts.Token); } catch (OperationCanceledException) { buildProcess.Kill(entireProcessTree: true); throw new TimeoutException($"Build timed out after 5 minutes for sample at {samplePath}."); } await Task.WhenAll(stdoutTask, stderrTask); string stdout = stdoutTask.Result; string stderr = stderrTask.Result; if (buildProcess.ExitCode != 0) { throw new InvalidOperationException($"Failed to build sample at {samplePath}:\n{stdout}\n{stderr}"); } this.OutputHelper.WriteLine($"Build completed for {samplePath}."); } private Process StartConsoleApp(string samplePath, BlockingCollection logs, string taskHubName) { ProcessStartInfo startInfo = new() { FileName = "dotnet", Arguments = $"run --no-build --framework {DotnetTargetFramework}", WorkingDirectory = samplePath, UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, RedirectStandardInput = true, }; string openAiEndpoint = Configuration["AZURE_OPENAI_ENDPOINT"] ?? throw new InvalidOperationException("The required AZURE_OPENAI_ENDPOINT env variable is not set."); string openAiDeployment = Configuration["AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"] ?? throw new InvalidOperationException("The required AZURE_OPENAI_CHAT_DEPLOYMENT_NAME env variable is not set."); void SetAndLogEnvironmentVariable(string key, string value) { this.OutputHelper.WriteLine($"Setting environment variable for {startInfo.FileName} sub-process: {key}={value}"); startInfo.EnvironmentVariables[key] = value; } SetAndLogEnvironmentVariable("AZURE_OPENAI_ENDPOINT", openAiEndpoint); SetAndLogEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT", openAiDeployment); SetAndLogEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING", $"Endpoint=http://localhost:{DtsPort};TaskHub={taskHubName};Authentication=None"); this.ConfigureAdditionalEnvironmentVariables(startInfo, SetAndLogEnvironmentVariable); Process process = new() { StartInfo = startInfo, EnableRaisingEvents = true }; process.ErrorDataReceived += (sender, e) => this.HandleProcessOutput(e.Data, startInfo.FileName, "err", LogLevel.Error, logs); process.OutputDataReceived += (sender, e) => this.HandleProcessOutput(e.Data, startInfo.FileName, "out", LogLevel.Information, logs); // When the process exits unexpectedly (e.g. build failure), complete the log collection // so that ReadLogLine returns null immediately instead of blocking until the test timeout. process.Exited += (sender, e) => { if (!logs.IsAddingCompleted) { logs.CompleteAdding(); } }; if (!process.Start()) { throw new InvalidOperationException("Failed to start the console app"); } process.BeginErrorReadLine(); process.BeginOutputReadLine(); return process; } private void HandleProcessOutput(string? data, string processName, string stream, LogLevel level, BlockingCollection logs) { if (data is null) { return; } string logMessage = $"{DateTime.Now:HH:mm:ss.fff} [{processName}({stream})]: {data}"; this.OutputHelper.WriteLine(logMessage); Debug.WriteLine(logMessage); try { logs.Add(new OutputLog(DateTime.Now, level, data)); } catch (InvalidOperationException) { // Collection completed } } private async Task RunCommandAsync(string command, params string[] args) { ProcessStartInfo startInfo = new() { FileName = command, Arguments = string.Join(" ", args), UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true }; this.OutputHelper.WriteLine($"Running command: {command} {string.Join(" ", args)}"); using Process process = new() { StartInfo = startInfo }; process.ErrorDataReceived += (sender, e) => this.OutputHelper.WriteLine($"[{command}(err)]: {e.Data}"); process.OutputDataReceived += (sender, e) => this.OutputHelper.WriteLine($"[{command}(out)]: {e.Data}"); if (!process.Start()) { throw new InvalidOperationException("Failed to start the command"); } process.BeginErrorReadLine(); process.BeginOutputReadLine(); using CancellationTokenSource cts = new(TimeSpan.FromMinutes(1)); await process.WaitForExitAsync(cts.Token); this.OutputHelper.WriteLine($"Command completed with exit code: {process.ExitCode}"); } private async Task StopProcessAsync(Process process) { try { if (!process.HasExited) { this.OutputHelper.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Killing process {process.ProcessName}#{process.Id}"); process.Kill(entireProcessTree: true); using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10)); await process.WaitForExitAsync(cts.Token); this.OutputHelper.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Process exited: {process.Id}"); } } catch (Exception ex) { this.OutputHelper.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Failed to stop process: {ex.Message}"); } } }