mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
27671974c2
Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/220699b9-7f9e-4d5d-87d0-fb621d169d84 Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> Co-authored-by: Jacob Alber <jaalber@microsoft.com>
345 lines
16 KiB
C#
345 lines
16 KiB
C#
// Copyright (c) Microsoft. All rights reserved.
|
|
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
using FluentAssertions;
|
|
using Microsoft.Agents.AI.Workflows.Observability;
|
|
|
|
namespace Microsoft.Agents.AI.Workflows.UnitTests;
|
|
|
|
/// <summary>
/// Regression tests for https://github.com/microsoft/agent-framework/issues/4155
/// Verifies that the workflow_invoke Activity is properly stopped/disposed so it gets exported
/// to telemetry backends. The ActivityStopped callback must fire for the workflow_invoke span.
/// </summary>
/// <remarks>
/// Each test starts its own root <see cref="Activity"/> and only inspects captured activities that
/// share its <see cref="Activity.RootId"/>, so the counts are scoped to the activities of that test.
/// </remarks>
[Collection("ObservabilityTests")]
public sealed class WorkflowRunActivityStopTests : IDisposable
{
    /// <summary>Listener registered in the constructor; it feeds the two bags below.</summary>
    private readonly ActivityListener _activityListener;

    /// <summary>Every activity for which the listener's ActivityStarted callback fired.</summary>
    private readonly ConcurrentBag<Activity> _startedActivities = [];

    /// <summary>Every activity for which the listener's ActivityStopped callback fired.</summary>
    private readonly ConcurrentBag<Activity> _stoppedActivities = [];

    private bool _isDisposed;

    /// <summary>
    /// Registers an <see cref="ActivityListener"/> that records started and stopped activities from
    /// every source whose name contains the namespace of <see cref="Workflow"/>.
    /// </summary>
    public WorkflowRunActivityStopTests()
    {
        this._activityListener = new ActivityListener
        {
            ShouldListenTo = source => source.Name.Contains(typeof(Workflow).Namespace!),
            // Sample everything so that no activity is dropped before it reaches the callbacks.
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStarted = activity => this._startedActivities.Add(activity),
            ActivityStopped = activity => this._stoppedActivities.Add(activity),
        };
        ActivitySource.AddActivityListener(this._activityListener);
    }

    /// <summary>
    /// Disposes the activity listener. Calling this more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (this._isDisposed)
        {
            return;
        }

        this._activityListener.Dispose();
        this._isDisposed = true;
    }

    /// <summary>
    /// Creates a simple sequential workflow (uppercase, then reverse) with OpenTelemetry enabled.
    /// </summary>
    private static Workflow CreateWorkflow()
    {
        Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
        var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");

        Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray());
        var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");

        WorkflowBuilder builder = new(uppercase);
        builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);

        return builder.WithOpenTelemetry().Build();
    }

    /// <summary>
    /// Returns the activities in <paramref name="captured"/> that share the root of
    /// <paramref name="root"/> and whose operation name starts with <paramref name="operationNamePrefix"/>
    /// (ordinal comparison).
    /// </summary>
    private static Activity[] SelectActivities(ConcurrentBag<Activity> captured, Activity root, string operationNamePrefix) =>
        captured
            .Where(a => a.RootId == root.RootId &&
                        a.OperationName.StartsWith(operationNamePrefix, StringComparison.Ordinal))
            .ToArray();

    /// <summary>Started activities under <paramref name="root"/> matching <paramref name="operationNamePrefix"/>.</summary>
    private Activity[] GetStarted(Activity root, string operationNamePrefix) =>
        SelectActivities(this._startedActivities, root, operationNamePrefix);

    /// <summary>Stopped activities under <paramref name="root"/> matching <paramref name="operationNamePrefix"/>.</summary>
    private Activity[] GetStopped(Activity root, string operationNamePrefix) =>
        SelectActivities(this._stoppedActivities, root, operationNamePrefix);

    /// <summary>
    /// Enumerates the event stream of <paramref name="run"/> to completion, discarding every event.
    /// </summary>
    private static async Task ConsumeAllEventsAsync(StreamingRun run)
    {
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            // Intentionally empty: the tests only care that the stream is drained.
        }
    }

    /// <summary>
    /// Verifies that the workflow_invoke Activity is stopped (and thus exportable) when
    /// using the Lockstep execution environment.
    /// Bug (as reported in the issue): The Activity created by LockstepRunEventStream.TakeEventStreamAsync
    /// is never disposed because yield break in async iterators does not trigger using disposal.
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_LockstepAsync()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_Lockstep").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - workflow.session should have been started and stopped
        this.GetStarted(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1, "workflow.session Activity should be started");
        this.GetStopped(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1,
                "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");

        // Assert - workflow_invoke should have been started and stopped
        this.GetStarted(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be started");
        this.GetStopped(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1,
                "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
    /// Verifies that the workflow_invoke Activity is stopped when using the OffThread (Default)
    /// execution environment (StreamingRunEventStream).
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_OffThreadAsync()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_OffThread").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.OffThread.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - workflow.session should have been started and stopped
        this.GetStarted(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1, "workflow.session Activity should be started");
        this.GetStopped(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1,
                "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");

        // Assert - workflow_invoke should have been started and stopped
        this.GetStarted(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be started");
        this.GetStopped(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1,
                "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
    /// Verifies that the workflow_invoke Activity is stopped when using the streaming API
    /// (StreamingRun.WatchStreamAsync) with the OffThread execution environment.
    /// This matches the exact usage pattern described in the issue.
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread").Start();

        // Act - use streaming path (WatchStreamAsync), which is the pattern from the issue.
        // The run is disposed before asserting — the run Activity is disposed when the run loop
        // exits, which happens during DisposeAsync. Without this, assertions can race against the
        // background run loop's finally block. `await using` also guarantees the run is disposed
        // if enumerating the stream throws.
        var workflow = CreateWorkflow();
        await using (StreamingRun run = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"))
        {
            await ConsumeAllEventsAsync(run);
        }

        // Assert - workflow.session should have been started and stopped
        this.GetStarted(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1, "workflow.session Activity should be started");
        this.GetStopped(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(1,
                "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");

        // Assert - workflow_invoke should have been started
        this.GetStarted(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1, "workflow_invoke Activity should be started");

        // Assert - workflow_invoke should have been stopped
        this.GetStopped(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(1,
                "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
    /// Verifies that a new workflow_invoke activity is started and stopped for each
    /// streaming invocation, even when using the same workflow in a multi-turn pattern,
    /// and that each session gets its own session activity.
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsync()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread_MultiTurn").Start();

        var workflow = CreateWorkflow();

        // Act - first streaming run
        await using (StreamingRun run1 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"))
        {
            await ConsumeAllEventsAsync(run1);
        }

        // Act - second streaming run (multi-turn scenario with same workflow)
        await using (StreamingRun run2 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Second turn!"))
        {
            await ConsumeAllEventsAsync(run2);
        }

        // Assert - two workflow.session activities should have been started and stopped
        this.GetStarted(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(2,
                "each streaming invocation should start its own workflow.session Activity");
        this.GetStopped(testActivity, ActivityNames.WorkflowSession)
            .Should().HaveCount(2,
                "each workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");

        // Assert - two workflow_invoke activities should have been started and stopped
        this.GetStarted(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(2,
                "each streaming invocation should start its own workflow_invoke Activity");
        this.GetStopped(testActivity, ActivityNames.WorkflowInvoke)
            .Should().HaveCount(2,
                "each workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios");
    }

    /// <summary>
    /// Verifies that all started activities (not just workflow_invoke) are properly stopped.
    /// This ensures no spans are "leaked" without being exported.
    /// </summary>
    [Fact]
    public async Task AllActivities_AreStopped_AfterWorkflowCompletionAsync()
    {
        // Arrange
        using var testActivity = new Activity("AllActivitiesStopTest").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - something must have been captured, otherwise the check below passes vacuously
        var started = this._startedActivities
            .Where(a => a.RootId == testActivity.RootId)
            .ToArray();
        started.Should().NotBeEmpty(
            "the workflow run should start at least one activity under the test root");

        // Assert - every started activity should also be stopped
        var stoppedIds = this._stoppedActivities
            .Where(a => a.RootId == testActivity.RootId)
            .Select(a => a.Id)
            .ToHashSet();

        var neverStoppedNames = started
            .Where(a => !stoppedIds.Contains(a.Id))
            .Select(a => a.OperationName)
            .ToArray();
        neverStoppedNames.Should().BeEmpty(
            "all started activities should be stopped so they are exported. " +
            $"Activities started but never stopped: [{string.Join(", ", neverStoppedNames)}]");
    }

    /// <summary>
    /// Verifies that Activity.Current is not leaked after lockstep RunAsync.
    /// Application code creating activities after RunAsync returns should not
    /// be parented under the workflow session span. The run activity should
    /// still nest correctly under the session.
    /// </summary>
    [Fact]
    public async Task Lockstep_SessionActivity_DoesNotLeak_IntoCaller_ActivityCurrentAsync()
    {
        // Arrange
        using var testActivity = new Activity("SessionLeakTest").Start();
        var workflow = CreateWorkflow();

        // Act — run the workflow via lockstep (Start + drain happen inside RunAsync)
        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");

        // Create an application activity after RunAsync returns.
        // If the session leaked into Activity.Current, this would be parented under it.
        // It is stopped right away so that it is no longer running while the run is disposed.
        using var appActivity = new Activity("AppWork").Start();
        appActivity.Stop();

        await run.DisposeAsync();

        // Assert — the app activity should be parented under the test root, not the session
        var sessionActivities = this.GetStarted(testActivity, ActivityNames.WorkflowSession);
        sessionActivities.Should().HaveCount(1, "one session activity should exist");

        appActivity.ParentId.Should().Be(testActivity.Id,
            "application activity should be parented under the test root, not the workflow session");

        // Assert — the run activity should still be parented under the session
        var invokeActivities = this.GetStarted(testActivity, ActivityNames.WorkflowInvoke);
        invokeActivities.Should().HaveCount(1, "one workflow_invoke activity should exist");
        invokeActivities[0].ParentId.Should().Be(sessionActivities[0].Id,
            "workflow_invoke activity should be nested under the session activity");
    }
}
|