// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.Observability;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

/// <summary>
/// Regression test for https://github.com/microsoft/agent-framework/issues/4155
/// Verifies that the workflow_invoke Activity is properly stopped/disposed so it gets exported
/// to telemetry backends. The ActivityStopped callback must fire for the workflow_invoke span.
/// </summary>
[Collection("ObservabilityTests")]
public sealed class WorkflowRunActivityStopTests : IDisposable
{
    private readonly ActivityListener _activityListener;

    // Every activity reported by the listener is recorded here; tests filter by the RootId of
    // their own root activity so that activities from other tests are ignored.
    private readonly ConcurrentBag<Activity> _startedActivities = [];
    private readonly ConcurrentBag<Activity> _stoppedActivities = [];
    private bool _isDisposed;

    /// <summary>
    /// Registers an <see cref="ActivityListener"/> that records every started and stopped activity
    /// whose source name contains the namespace of <see cref="Workflow"/>.
    /// </summary>
    public WorkflowRunActivityStopTests()
    {
        this._activityListener = new ActivityListener
        {
            ShouldListenTo = source => source.Name.Contains(typeof(Workflow).Namespace!),
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStarted = activity => this._startedActivities.Add(activity),
            ActivityStopped = activity => this._stoppedActivities.Add(activity),
        };

        ActivitySource.AddActivityListener(this._activityListener);
    }

    /// <summary>
    /// Disposes the activity listener. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (this._isDisposed)
        {
            return;
        }

        this._activityListener.Dispose();
        this._isDisposed = true;
    }

    /// <summary>
    /// Creates a simple sequential workflow with OpenTelemetry enabled.
    /// </summary>
    private static Workflow CreateWorkflow()
    {
        Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
        var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");

        Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray());
        var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");

        WorkflowBuilder builder = new(uppercase);
        builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);

        return builder.WithOpenTelemetry().Build();
    }

    /// <summary>
    /// Returns the recorded started activities that share <paramref name="root"/>'s RootId and whose
    /// operation name starts with <paramref name="operationNamePrefix"/> (ordinal comparison).
    /// </summary>
    private Activity[] StartedUnder(Activity root, string operationNamePrefix) =>
        Filter(this._startedActivities, root, operationNamePrefix);

    /// <summary>
    /// Returns the recorded stopped activities that share <paramref name="root"/>'s RootId and whose
    /// operation name starts with <paramref name="operationNamePrefix"/> (ordinal comparison).
    /// </summary>
    private Activity[] StoppedUnder(Activity root, string operationNamePrefix) =>
        Filter(this._stoppedActivities, root, operationNamePrefix);

    /// <summary>
    /// Shared filter used by <see cref="StartedUnder"/> and <see cref="StoppedUnder"/>.
    /// </summary>
    private static Activity[] Filter(ConcurrentBag<Activity> recorded, Activity root, string operationNamePrefix) =>
        recorded
            .Where(a => a.RootId == root.RootId &&
                        a.OperationName.StartsWith(operationNamePrefix, StringComparison.Ordinal))
            .ToArray();

    /// <summary>
    /// Verifies that the workflow_invoke Activity is stopped (and thus exportable) when
    /// using the Lockstep execution environment.
    /// Bug: The Activity created by LockstepRunEventStream.TakeEventStreamAsync is never
    /// disposed because yield break in async iterators does not trigger using disposal.
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_LockstepAsync()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_Lockstep").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - workflow.session should have been started and stopped
        this.StartedUnder(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1, "workflow.session Activity should be started");
        this.StoppedUnder(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1, "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");

        // Assert - workflow_invoke should have been started and stopped
        this.StartedUnder(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be started");
        this.StoppedUnder(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
    /// Verifies that the workflow_invoke Activity is stopped when using the OffThread (Default)
    /// execution environment (StreamingRunEventStream).
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_OffThreadAsync()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_OffThread").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.OffThread.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - workflow.session should have been started and stopped
        this.StartedUnder(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1, "workflow.session Activity should be started");
        this.StoppedUnder(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1, "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");

        // Assert - workflow_invoke should have been started and stopped
        this.StartedUnder(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be started");
        this.StoppedUnder(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
    /// Verifies that the workflow_invoke Activity is stopped when using the streaming API
    /// (StreamingRun.WatchStreamAsync) with the OffThread execution environment.
    /// This matches the exact usage pattern described in the issue.
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread").Start();

        // Act - use streaming path (WatchStreamAsync), which is the pattern from the issue
        var workflow = CreateWorkflow();
        StreamingRun run = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            // Consume all events
        }

        // Dispose the run before asserting — the run Activity is disposed when the
        // run loop exits, which happens during DisposeAsync. Without this, assertions
        // can race against the background run loop's finally block.
        await run.DisposeAsync();

        // Assert - workflow.session should have been started
        this.StartedUnder(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1, "workflow.session Activity should be started");

        // Assert - workflow_invoke should have been started
        this.StartedUnder(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be started");

        // Assert - workflow_invoke should have been stopped
        this.StoppedUnder(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
    /// Verifies that a new workflow_invoke activity is started and stopped for each
    /// streaming invocation, even when using the same workflow in a multi-turn pattern,
    /// and that each session gets its own session activity.
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsync()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread_MultiTurn").Start();
        var workflow = CreateWorkflow();

        // Act - first streaming run
        await using (StreamingRun run1 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"))
        {
            await foreach (WorkflowEvent evt in run1.WatchStreamAsync())
            {
                // Consume all events from first turn
            }
        }

        // Act - second streaming run (multi-turn scenario with same workflow)
        await using (StreamingRun run2 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Second turn!"))
        {
            await foreach (WorkflowEvent evt in run2.WatchStreamAsync())
            {
                // Consume all events from second turn
            }
        }

        // Assert - two workflow.session activities should have been started and stopped
        this.StartedUnder(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(2, "each streaming invocation should start its own workflow.session Activity");
        this.StoppedUnder(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(2, "each workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");

        // Assert - two workflow_invoke activities should have been started and stopped
        this.StartedUnder(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(2, "each streaming invocation should start its own workflow_invoke Activity");
        this.StoppedUnder(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(2, "each workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios");
    }

    /// <summary>
    /// Verifies that all started activities (not just workflow_invoke) are properly stopped.
    /// This ensures no spans are "leaked" without being exported.
    /// </summary>
    [Fact]
    public async Task AllActivities_AreStopped_AfterWorkflowCompletionAsync()
    {
        // Arrange
        using var testActivity = new Activity("AllActivitiesStopTest").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - something must have been captured, otherwise the check below passes vacuously
        var startedUnderRoot = this._startedActivities
            .Where(a => a.RootId == testActivity.RootId)
            .ToArray();
        startedUnderRoot.Should().NotBeEmpty(
            "the workflow run should start at least one activity under the test root; otherwise this test passes vacuously");

        // Assert - every started activity should also be stopped
        var stoppedIds = this._stoppedActivities
            .Where(a => a.RootId == testActivity.RootId)
            .Select(a => a.Id)
            .ToHashSet();

        var neverStoppedNames = startedUnderRoot
            .Where(a => !stoppedIds.Contains(a.Id))
            .Select(a => a.OperationName)
            .ToList();

        neverStoppedNames.Should().BeEmpty(
            "all started activities should be stopped so they are exported. " +
            $"Activities started but never stopped: [{string.Join(", ", neverStoppedNames)}]");
    }

    /// <summary>
    /// Verifies that Activity.Current is not leaked after lockstep RunAsync.
    /// Application code creating activities after RunAsync returns should not
    /// be parented under the workflow session span. The run activity should
    /// still nest correctly under the session.
    /// </summary>
    [Fact]
    public async Task Lockstep_SessionActivity_DoesNotLeak_IntoCaller_ActivityCurrentAsync()
    {
        // Arrange
        using var testActivity = new Activity("SessionLeakTest").Start();
        var workflow = CreateWorkflow();

        // Act — run the workflow via lockstep (Start + drain happen inside RunAsync)
        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");

        // Create an application activity after RunAsync returns.
        // If the session leaked into Activity.Current, this would be parented under it.
        using var appActivity = new Activity("AppWork").Start();
        appActivity.Stop();

        await run.DisposeAsync();

        // Assert — the app activity should be parented under the test root, not the session
        var sessionActivities = this.StartedUnder(testActivity, ActivityNames.WorkflowSession);
        sessionActivities.Should().HaveCount(1, "one session activity should exist");

        appActivity.ParentId.Should().Be(testActivity.Id, "application activity should be parented under the test root, not the workflow session");

        // Assert — the run activity should still be parented under the session
        var invokeActivities = this.StartedUnder(testActivity, ActivityNames.WorkflowInvoke);
        invokeActivities.Should().HaveCount(1, "one workflow_invoke activity should exist");

        invokeActivities[0].ParentId.Should().Be(sessionActivities[0].Id, "workflow_invoke activity should be nested under the session activity");
    }
}