Files
agent-framework/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs
Tao Chen e3b4b6662b .NET: Workflow telemetry opt in (#3467)
* feat(workflows): Make telemetry opt-in via WithOpenTelemetry()

- Add WorkflowTelemetryOptions class with EnableSensitiveData property
- Add WorkflowTelemetryContext to manage ActivitySource lifecycle
- Add WithOpenTelemetry() extension method on WorkflowBuilder
- Update all workflow components to use telemetry context:
  - WorkflowBuilder, Workflow, Executor
  - InProcessRunnerContext, InProcessRunner
  - LockstepRunEventStream, StreamingRunEventStream
  - All edge runners (Direct, FanIn, FanOut, Response)
- Telemetry is now disabled by default
- Users must call WithOpenTelemetry() to enable spans/activities

BREAKING CHANGE: Workflow telemetry is now opt-in. Users who relied on
automatic telemetry must add .WithOpenTelemetry() to their workflow builder.

* refactor: Pass telemetry context as parameter instead of via interface

- Remove IWorkflowContextWithTelemetry interface
- Add internal ExecuteAsync overload that accepts WorkflowTelemetryContext
- Public ExecuteAsync delegates with WorkflowTelemetryContext.Disabled
- InProcessRunner passes TelemetryContext when calling ExecuteAsync
- BoundContext now implements IWorkflowContext (not the removed interface)

* Add optional ActivitySource parameter to WithOpenTelemetry

Allow users to provide their own ActivitySource when enabling telemetry,
giving them better control over the ActivitySource lifecycle. When not
provided, the framework creates one internally (existing behavior).

Changes:
- Add optional activitySource parameter to WithOpenTelemetry() extension
- Update WorkflowTelemetryContext to accept external ActivitySource
- Add unit test for user-provided ActivitySource scenario

* Add component-level telemetry control with disable flags

Allow users to selectively disable specific activity types via
WorkflowTelemetryOptions. All activities are enabled by default.

New disable flags:
- DisableWorkflowBuild: Disables workflow.build activities
- DisableWorkflowRun: Disables workflow_invoke activities
- DisableExecutorProcess: Disables executor.process activities
- DisableEdgeGroupProcess: Disables edge_group.process activities
- DisableMessageSend: Disables message.send activities

Added helper methods to WorkflowTelemetryContext for each activity type
and updated all activity creation sites to use them.

* Implement EnableSensitiveData to log executor input/output

When EnableSensitiveData is true in WorkflowTelemetryOptions, executor
input and output are logged as JSON-serialized attributes in the
executor.process activity.

New activity tags:
- executor.input: JSON serialized input message
- executor.output: JSON serialized output result (non-void only)

Added suppression attributes for AOT/trimming warnings since this is
an opt-in feature for debugging/diagnostics.

* Refactor activity start methods to centralize tagging logic

Move tagging logic into WorkflowTelemetryContext methods:
- StartExecutorProcessActivity now accepts executorId, executorType,
  messageType, and message; sets all tags including executor.input
  when EnableSensitiveData is true
- Added SetExecutorOutput method to set executor.output after execution
- StartMessageSendActivity now accepts sourceId, targetId, and message;
  sets all tags including message.content when EnableSensitiveData is true

Simplified Executor.cs and InProcessRunnerContext.cs by removing
inline tagging code. Added message.content tag constant.

* Revert Python changes

* Update samples and code cleanup

* Fix file formatting

* Add comment

* Add telemetry configuration to declarative workflow

* Remove delays in tests

* Address comments
2026-02-09 23:10:50 +00:00

504 lines
22 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Agents.AI.Workflows.Observability;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
/// <summary>
/// These tests ensure that OpenTelemetry Activity traces are properly created for workflow monitoring.
/// Tests are run in a collection to avoid parallel execution since ActivityListener is global.
/// Each test creates a new instance of ObservabilityTests and runs in serial within the collection.
/// This prevents interference between tests due to the global nature of ActivityListener.
/// </summary>
[Collection("ObservabilityTests")]
public sealed class ObservabilityTests : IDisposable
{
private readonly ActivityListener _activityListener;
private readonly ConcurrentBag<Activity> _capturedActivities = [];
private bool _isDisposed;
public ObservabilityTests()
{
// Set up activity listener to capture activities from workflow
// This is global and captures ALL workflow activities from ANY test in the same process!
this._activityListener = new ActivityListener
{
ShouldListenTo = source => source.Name.Contains(typeof(Workflow).Namespace!),
Sample = (ref options) => ActivitySamplingResult.AllData,
ActivityStarted = activity => this._capturedActivities.Add(activity),
};
ActivitySource.AddActivityListener(this._activityListener);
}
/// <summary>
/// Create a sample workflow for testing.
/// </summary>
/// <remarks>
/// This workflow is expected to create 8 activities that will be captured by the tests
/// - ActivityNames.WorkflowBuild
/// - ActivityNames.WorkflowRun
/// -- ActivityNames.EdgeGroupProcess
/// -- ActivityNames.ExecutorProcess (UppercaseExecutor)
/// --- ActivityNames.MessageSend
/// ---- ActivityNames.EdgeGroupProcess
/// -- ActivityNames.ExecutorProcess (ReverseTextExecutor)
/// --- ActivityNames.MessageSend
/// </remarks>
/// <returns>The created workflow.</returns>
private static Workflow CreateWorkflow()
{
// Create the executors
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray());
var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");
// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
return builder.WithOpenTelemetry().Build();
}
private static Dictionary<string, int> GetExpectedActivityNameCounts() =>
new()
{
{ ActivityNames.WorkflowBuild, 1 },
{ ActivityNames.WorkflowRun, 1 },
{ ActivityNames.EdgeGroupProcess, 2 },
{ ActivityNames.ExecutorProcess, 2 },
{ ActivityNames.MessageSend, 2 }
};
private static InProcessExecutionEnvironment GetExecutionEnvironment(string name) =>
name switch
{
"Default" => InProcessExecution.Default,
"Lockstep" => InProcessExecution.Lockstep,
"OffThread" => InProcessExecution.OffThread,
"Concurrent" => InProcessExecution.Concurrent,
_ => throw new ArgumentException($"Unknown execution environment name: {name}")
};
public void Dispose()
{
if (!this._isDisposed)
{
this._activityListener?.Dispose();
this._isDisposed = true;
}
}
private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironmentName)
{
// Arrange
// Create a test activity to correlate captured activities
using var testActivity = new Activity("ObservabilityTest").Start();
// Act
var workflow = CreateWorkflow();
var executionEnvironment = GetExecutionEnvironment(executionEnvironmentName);
Run run = await executionEnvironment.RunAsync(workflow, "Hello, World!");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().HaveCount(8, "Exactly 8 activities should be created.");
// Make sure all expected activities exist and have the correct count
foreach (var kvp in GetExpectedActivityNameCounts())
{
var activityName = kvp.Key;
var expectedCount = kvp.Value;
var actualCount = capturedActivities.Count(a => a.OperationName.StartsWith(activityName, StringComparison.Ordinal));
actualCount.Should().Be(expectedCount, $"Activity '{activityName}' should occur {expectedCount} times.");
}
// Verify WorkflowRun activity events include workflow lifecycle events
var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal));
var activityEvents = workflowRunActivity.Events.ToList();
activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowStarted, "activity should have workflow started event");
activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowCompleted, "activity should have workflow completed event");
}
[Fact]
public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_DefaultAsync()
{
await this.TestWorkflowEndToEndActivitiesAsync("Default");
}
[Fact]
public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_OffThreadAsync()
{
await this.TestWorkflowEndToEndActivitiesAsync("OffThread");
}
[Fact]
public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_ConcurrentAsync()
{
await this.TestWorkflowEndToEndActivitiesAsync("Concurrent");
}
[Fact]
public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_LockstepAsync()
{
await this.TestWorkflowEndToEndActivitiesAsync("Lockstep");
}
[Fact]
public async Task CreatesWorkflowActivities_WithCorrectNameAsync()
{
// Arrange
// Create a test activity to correlate captured activities
using var testActivity = new Activity("ObservabilityTest").Start();
// Act
CreateWorkflow();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().HaveCount(1, "Exactly 1 activity should be created.");
capturedActivities[0].OperationName.Should().Be(ActivityNames.WorkflowBuild,
"The activity should have the correct operation name for workflow build.");
var events = capturedActivities[0].Events.ToList();
events.Should().Contain(e => e.Name == EventNames.BuildStarted, "activity should have build started event");
events.Should().Contain(e => e.Name == EventNames.BuildValidationCompleted, "activity should have build validation completed event");
events.Should().Contain(e => e.Name == EventNames.BuildCompleted, "activity should have build completed event");
var tags = capturedActivities[0].Tags.ToDictionary(t => t.Key, t => t.Value);
tags.Should().ContainKey(Tags.WorkflowId);
tags.Should().ContainKey(Tags.WorkflowDefinition);
}
[Fact]
public async Task TelemetryDisabledByDefault_CreatesNoActivitiesAsync()
{
// Arrange
// Create a test activity to correlate captured activities
using var testActivity = new Activity("ObservabilityTest").Start();
// Act - Build workflow WITHOUT calling WithOpenTelemetry()
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
WorkflowBuilder builder = new(uppercase);
builder.Build(); // No WithOpenTelemetry() call
// Assert - No activities should be created
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().BeEmpty("No activities should be created when telemetry is disabled (default).");
}
[Fact]
public async Task WithOpenTelemetry_UsesProvidedActivitySourceAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
using var userActivitySource = new ActivitySource("UserProvidedSource");
// Set up a separate listener for the user-provided source
ConcurrentBag<Activity> userActivities = [];
using var userListener = new ActivityListener
{
ShouldListenTo = source => source.Name == "UserProvidedSource",
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
ActivityStarted = activity => userActivities.Add(activity),
};
ActivitySource.AddActivityListener(userListener);
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
// Act
WorkflowBuilder builder = new(uppercase);
var workflow = builder.WithOpenTelemetry(activitySource: userActivitySource).Build();
Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello");
await run.DisposeAsync();
// Assert
var capturedActivities = userActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().NotBeEmpty("Activities should be created with user-provided ActivitySource.");
capturedActivities.Should().OnlyContain(
a => a.Source.Name == "UserProvidedSource",
"All activities should come from the user-provided ActivitySource.");
}
[Fact]
public async Task DisableWorkflowBuild_PreventsWorkflowBuildActivityAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
// Act
WorkflowBuilder builder = new(uppercase);
builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowBuild = true).Build();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().NotContain(
a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal),
"WorkflowBuild activity should be disabled.");
}
[Fact]
public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
// Act
WorkflowBuilder builder = new(uppercase);
builder.WithOutputFrom(uppercase);
var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowRun = true).Build();
Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().NotContain(
a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal),
"WorkflowRun activity should be disabled.");
capturedActivities.Should().Contain(
a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal),
"Other activities should still be created.");
}
[Fact]
public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
// Act
WorkflowBuilder builder = new(uppercase);
builder.WithOutputFrom(uppercase);
var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableExecutorProcess = true).Build();
Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().NotContain(
a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal),
"ExecutorProcess activity should be disabled.");
capturedActivities.Should().Contain(
a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal),
"Other activities should still be created.");
}
[Fact]
public async Task DisableEdgeGroupProcess_PreventsEdgeGroupProcessActivityAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
var workflow = CreateWorkflowWithDisabledEdges();
// Act
Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().NotContain(
a => a.OperationName.StartsWith(ActivityNames.EdgeGroupProcess, StringComparison.Ordinal),
"EdgeGroupProcess activity should be disabled.");
capturedActivities.Should().Contain(
a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal),
"Other activities should still be created.");
}
[Fact]
public async Task DisableMessageSend_PreventsMessageSendActivityAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
var workflow = CreateWorkflowWithDisabledMessages();
// Act
Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
capturedActivities.Should().NotContain(
a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal),
"MessageSend activity should be disabled.");
capturedActivities.Should().Contain(
a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal),
"Other activities should still be created.");
}
private static Workflow CreateWorkflowWithDisabledEdges()
{
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray());
var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
return builder.WithOpenTelemetry(configure: opts => opts.DisableEdgeGroupProcess = true).Build();
}
private static Workflow CreateWorkflowWithDisabledMessages()
{
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray());
var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
return builder.WithOpenTelemetry(configure: opts => opts.DisableMessageSend = true).Build();
}
[Fact]
public async Task EnableSensitiveData_LogsExecutorInputAndOutputAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
// Act
WorkflowBuilder builder = new(uppercase);
builder.WithOutputFrom(uppercase);
var workflow = builder.WithOpenTelemetry(configure: opts => opts.EnableSensitiveData = true).Build();
Run run = await InProcessExecution.Default.RunAsync(workflow, "hello");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
var executorActivity = capturedActivities.FirstOrDefault(
a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal));
executorActivity.Should().NotBeNull("ExecutorProcess activity should be created.");
var tags = executorActivity!.Tags.ToDictionary(t => t.Key, t => t.Value);
tags.Should().ContainKey(Tags.ExecutorInput, "Input should be logged when EnableSensitiveData is true.");
tags.Should().ContainKey(Tags.ExecutorOutput, "Output should be logged when EnableSensitiveData is true.");
tags[Tags.ExecutorInput].Should().Contain("hello", "Input should contain the input value.");
tags[Tags.ExecutorOutput].Should().Contain("HELLO", "Output should contain the transformed value.");
}
[Fact]
public async Task EnableSensitiveData_Disabled_DoesNotLogInputOutputAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
// Act - EnableSensitiveData is false by default
WorkflowBuilder builder = new(uppercase);
builder.WithOutputFrom(uppercase);
var workflow = builder.WithOpenTelemetry().Build();
Run run = await InProcessExecution.Default.RunAsync(workflow, "hello");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
var executorActivity = capturedActivities.FirstOrDefault(
a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal));
executorActivity.Should().NotBeNull("ExecutorProcess activity should be created.");
var tags = executorActivity!.Tags.ToDictionary(t => t.Key, t => t.Value);
tags.Should().NotContainKey(Tags.ExecutorInput, "Input should NOT be logged when EnableSensitiveData is false.");
tags.Should().NotContainKey(Tags.ExecutorOutput, "Output should NOT be logged when EnableSensitiveData is false.");
}
[Fact]
public async Task EnableSensitiveData_LogsMessageSendContentAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray());
var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");
// Act
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.WithOpenTelemetry(configure: opts => opts.EnableSensitiveData = true).Build();
Run run = await InProcessExecution.Default.RunAsync(workflow, "hello");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
var messageSendActivity = capturedActivities.FirstOrDefault(
a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal));
messageSendActivity.Should().NotBeNull("MessageSend activity should be created.");
var tags = messageSendActivity!.Tags.ToDictionary(t => t.Key, t => t.Value);
tags.Should().ContainKey(Tags.MessageContent, "Message content should be logged when EnableSensitiveData is true.");
tags.Should().ContainKey(Tags.MessageSourceId, "Source ID should be logged.");
}
[Fact]
public async Task EnableSensitiveData_Disabled_DoesNotLogMessageContentAsync()
{
// Arrange
using var testActivity = new Activity("ObservabilityTest").Start();
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray());
var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");
// Act - EnableSensitiveData is false by default
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.WithOpenTelemetry().Build();
Run run = await InProcessExecution.Default.RunAsync(workflow, "hello");
await run.DisposeAsync();
// Assert
var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
var messageSendActivity = capturedActivities.FirstOrDefault(
a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal));
messageSendActivity.Should().NotBeNull("MessageSend activity should be created.");
var tags = messageSendActivity!.Tags.ToDictionary(t => t.Key, t => t.Value);
tags.Should().NotContainKey(Tags.MessageContent, "Message content should NOT be logged when EnableSensitiveData is false.");
tags.Should().ContainKey(Tags.MessageSourceId, "Source ID should still be logged.");
}
}