// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Azure.AI.AgentServer.Responses;
using Azure.AI.AgentServer.Responses.Models;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using OpenTelemetry;
using OpenTelemetry.Trace;
using MeaiTextContent = Microsoft.Extensions.AI.TextContent;

namespace Microsoft.Agents.AI.Foundry.Hosting.UnitTests;

/// <summary>
/// Tests that verify OTel spans are actually emitted and captured through the
/// <see cref="AgentFrameworkResponseHandler"/> pipeline when OpenTelemetry instrumentation
/// wraps the resolved agent.
/// </summary>
public class AgentFrameworkResponseHandlerTelemetryTests
{
    /// <summary>
    /// The ActivitySource name used by ApplyOpenTelemetry() — equals AgentHostTelemetry.ResponsesSourceName.
    /// Declared as a constant so the TracerProvider and assertions reference the same literal.
    /// </summary>
    private const string ResponsesSourceName = "Azure.AI.AgentServer.Responses";

    /// <summary>Tag that carries the agent name on an emitted span.</summary>
    private const string AgentNameTag = "gen_ai.agent.name";

    /// <summary>Tag that carries the operation name on an emitted span.</summary>
    private const string OperationNameTag = "gen_ai.operation.name";

    /// <summary>
    /// An agent registered as the default (non-keyed) agent produces exactly one
    /// <c>invoke_agent</c> span that carries an agent id.
    /// </summary>
    [Fact]
    public async Task CreateAsync_DefaultAgent_EmitsInvokeAgentSpanAsync()
    {
        // Arrange
        var activities = new ConcurrentActivityList();
        using var tracerProvider = CreateTracerProvider(ResponsesSourceName, activities);
        var agent = new TelemetryTestAgent();
        using var sp = BuildServices(services => services.AddSingleton(agent));
        var handler = CreateHandler(sp);

        // Act — enumerate all events so the span completes before asserting
        await DrainAsync(handler);

        // Assert — filter by agent name to isolate this test's span from any parallel test spans
        var mySpan = Assert.Single(FindTestAgentSpans(activities));
        Assert.Equal("invoke_agent", mySpan.GetTagItem(OperationNameTag));
        Assert.NotNull(mySpan.GetTagItem("gen_ai.agent.id"));
    }

    /// <summary>
    /// An agent registered under a service key and addressed through the request's agent
    /// reference produces exactly one <c>invoke_agent</c> span.
    /// </summary>
    [Fact]
    public async Task CreateAsync_KeyedAgent_EmitsInvokeAgentSpanAsync()
    {
        // Arrange
        var activities = new ConcurrentActivityList();
        using var tracerProvider = CreateTracerProvider(ResponsesSourceName, activities);
        var agent = new TelemetryTestAgent();
        using var sp = BuildServices(services => services.AddKeyedSingleton("keyed-agent", agent));
        var handler = CreateHandler(sp);

        // Act
        await DrainAsync(handler, agentKey: "keyed-agent");

        // Assert — filter by agent name to isolate this test's span
        var mySpan = Assert.Single(FindTestAgentSpans(activities));
        Assert.Equal("invoke_agent", mySpan.GetTagItem(OperationNameTag));
    }

    /// <summary>
    /// An agent that was already wrapped with OpenTelemetry before registration must not be
    /// wrapped a second time by the handler.
    /// </summary>
    [Fact]
    public async Task CreateAsync_AlreadyInstrumentedAgent_EmitsSingleSpanPerRunAsync()
    {
        // Arrange — use a unique source for the pre-wrapped agent distinct from ResponsesSourceName.
        // If ApplyOpenTelemetry double-wraps, an extra span would appear on ResponsesSourceName.
        // If it correctly skips wrapping, only the pre-wrap's unique source emits spans.
        var preWrapSource = Guid.NewGuid().ToString();
        var preWrapActivities = new ConcurrentActivityList();
        var responsesActivities = new ConcurrentActivityList();
        using var preWrapProvider = CreateTracerProvider(preWrapSource, preWrapActivities);
        using var responsesProvider = CreateTracerProvider(ResponsesSourceName, responsesActivities);

        var innerAgent = new TelemetryTestAgent();
        var preWrapped = innerAgent.AsBuilder()
            .UseOpenTelemetry(sourceName: preWrapSource)
            .Build();

        using var sp = BuildServices(services => services.AddSingleton(preWrapped));
        var handler = CreateHandler(sp);

        // Act
        await DrainAsync(handler);

        // Assert — pre-wrap source emits exactly 1 span (agent ran)
        var preWrapSpan = Assert.Single(preWrapActivities.Snapshot());
        Assert.Equal("invoke_agent", preWrapSpan.GetTagItem(OperationNameTag));

        // ResponsesSourceName emits 0 spans — ApplyOpenTelemetry skipped wrapping the pre-instrumented agent
        Assert.DoesNotContain(responsesActivities.Snapshot(), a => IsTestAgentSpan(a));
    }

    /// <summary>
    /// The span's display name mentions both the operation and the agent name.
    /// </summary>
    [Fact]
    public async Task CreateAsync_DefaultAgent_SpanDisplayNameContainsAgentNameAsync()
    {
        // Arrange
        var activities = new ConcurrentActivityList();
        using var tracerProvider = CreateTracerProvider(ResponsesSourceName, activities);
        var agent = new TelemetryTestAgent();
        using var sp = BuildServices(services => services.AddSingleton(agent));
        var handler = CreateHandler(sp);

        // Act
        await DrainAsync(handler);

        // Assert — display name follows "invoke_agent {Name}({Id})" convention; filter by agent name to isolate
        var mySpan = Assert.Single(FindTestAgentSpans(activities));
        Assert.Contains("invoke_agent", mySpan.DisplayName, StringComparison.Ordinal);
        Assert.Contains(TelemetryTestAgent.AgentName, mySpan.DisplayName, StringComparison.Ordinal);
    }

    /// <summary>
    /// Builds a tracer provider that listens to <paramref name="sourceName"/> and exports every
    /// completed activity into <paramref name="sink"/>.
    /// </summary>
    /// <param name="sourceName">ActivitySource name to subscribe to.</param>
    /// <param name="sink">Collection handed to the in-memory exporter.</param>
    /// <returns>The provider; the caller owns it and must dispose it.</returns>
    private static TracerProvider CreateTracerProvider(string sourceName, ConcurrentActivityList sink) =>
        Sdk.CreateTracerProviderBuilder()
            .AddSource(sourceName)
            .AddInMemoryExporter(sink)
            .Build();

    /// <summary>
    /// Builds the service provider shared by every test: session store, the agent registration
    /// supplied by the caller, and the isolation-key provider — registered in that order.
    /// </summary>
    /// <param name="registerAgent">Adds the agent under test to the collection.</param>
    /// <returns>The provider; the caller owns it and must dispose it.</returns>
    private static ServiceProvider BuildServices(Action<IServiceCollection> registerAgent)
    {
        var services = new ServiceCollection();

        // NOTE(review): the registrations in this file (here and in the registerAgent callbacks)
        // are reproduced without explicit service-type arguments because none are visible in the
        // reviewed text. If the handler resolves these by an abstraction, the type arguments
        // need to be supplied — confirm against the handler.
        services.AddSingleton(new InMemoryAgentSessionStore());
        registerAgent(services);
        services.AddSingleton(new FakeHostedSessionIsolationKeyProvider());

        return services.BuildServiceProvider();
    }

    /// <summary>Creates the handler under test on top of <paramref name="services"/>.</summary>
    /// <param name="services">Provider the handler resolves its dependencies from.</param>
    /// <returns>A new handler instance.</returns>
    private static AgentFrameworkResponseHandler CreateHandler(ServiceProvider services) =>
        // NOTE(review): logger argument reproduced as shown; a typed NullLogger may be required — confirm.
        new(services, NullLogger.Instance);

    /// <summary>
    /// Issues one request against <paramref name="handler"/> and enumerates every streamed event
    /// so that the span has completed by the time the caller asserts.
    /// </summary>
    /// <param name="handler">Handler under test.</param>
    /// <param name="agentKey">Optional agent reference key placed on the request.</param>
    private static async Task DrainAsync(AgentFrameworkResponseHandler handler, string? agentKey = null)
    {
        var (request, context) = BuildRequest(agentKey);

        await foreach (var _ in handler.CreateAsync(request, context, CancellationToken.None))
        {
            // Events are intentionally discarded; only the emitted telemetry matters.
        }
    }

    /// <summary>Returns whether <paramref name="activity"/> is tagged with the test agent's name.</summary>
    /// <param name="activity">Captured activity to inspect.</param>
    private static bool IsTestAgentSpan(Activity activity) =>
        TelemetryTestAgent.AgentName.Equals(activity.GetTagItem(AgentNameTag));

    /// <summary>
    /// Takes a snapshot of <paramref name="sink"/> and keeps only the spans tagged with the
    /// test agent's name.
    /// </summary>
    /// <param name="sink">Collection the in-memory exporter writes into.</param>
    /// <returns>The matching activities, in capture order.</returns>
    private static List<Activity> FindTestAgentSpans(ConcurrentActivityList sink) =>
        sink.Snapshot().Where(IsTestAgentSpan).ToList();

    /// <summary>
    /// Creates a minimal request with a single user message, plus a mocked response context
    /// whose history and input-item lookups return empty collections.
    /// </summary>
    /// <param name="agentKey">When non-null, set as the request's agent reference.</param>
    /// <returns>The request and the mocked context.</returns>
    private static (CreateResponse request, ResponseContext context) BuildRequest(string? agentKey = null)
    {
        var request = agentKey is null
            ? new CreateResponse { Model = "test" }
            : new CreateResponse { Model = "test", AgentReference = new AgentReference(agentKey) };

        request.Input = BinaryData.FromObjectAsJson(new[]
        {
            new
            {
                type = "message",
                id = "msg_1",
                status = "completed",
                role = "user",
                content = new[] { new { type = "input_text", text = "Hello" } }
            }
        });

        var mockContext = new Mock<ResponseContext>("resp_" + new string('0', 46)) { CallBase = true };

        // NOTE(review): the It.IsAny() matchers below are reproduced without type arguments
        // because none are visible in the reviewed text — supply the parameter types of
        // ResponseContext.GetHistoryAsync / GetInputItemsAsync.
        mockContext.Setup(x => x.GetHistoryAsync(It.IsAny()))
            .ReturnsAsync([]);
        mockContext.Setup(x => x.GetInputItemsAsync(It.IsAny(), It.IsAny()))
            .ReturnsAsync([]);

        return (request, mockContext.Object);
    }

    /// <summary>
    /// Minimal agent that streams a single fixed text update. The non-streaming run path is
    /// not implemented and throws.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments on the overridden signatures are not visible in
    /// the reviewed text. They are written here from the values the bodies return and from the
    /// public AIAgent contract — confirm they match the referenced package version.
    /// </remarks>
    private sealed class TelemetryTestAgent : AIAgent
    {
        public const string AgentName = "TelemetryTestAgent";

        public override string? Name => AgentName;

        protected override IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
            IEnumerable<ChatMessage> messages,
            AgentSession? session,
            AgentRunOptions? options,
            CancellationToken cancellationToken = default) =>
            SingleUpdateAsync(
                new AgentResponseUpdate
                {
                    MessageId = "resp_msg_1",
                    Contents = [new MeaiTextContent("telemetry test response")]
                },
                cancellationToken);

        protected override Task<AgentResponse> RunCoreAsync(
            IEnumerable<ChatMessage> messages,
            AgentSession? session,
            AgentRunOptions? options,
            CancellationToken cancellationToken = default) =>
            throw new NotImplementedException();

        protected override ValueTask<AgentSession> CreateSessionCoreAsync(
            CancellationToken cancellationToken = default) =>
            new(new TelemetryAgentSession());

        protected override ValueTask<JsonElement> SerializeSessionCoreAsync(
            AgentSession session,
            JsonSerializerOptions? jsonSerializerOptions,
            CancellationToken cancellationToken = default)
        {
            // Dispose the document so its pooled buffers are returned; Clone() detaches the
            // element so it remains valid after disposal.
            using var document = JsonDocument.Parse("{}");
            return new(document.RootElement.Clone());
        }

        protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(
            JsonElement serializedState,
            JsonSerializerOptions? jsonSerializerOptions,
            CancellationToken cancellationToken = default) =>
            new(new TelemetryAgentSession());

        /// <summary>Yields <paramref name="update"/> once, after an asynchronous hop.</summary>
        private static async IAsyncEnumerable<AgentResponseUpdate> SingleUpdateAsync(
            AgentResponseUpdate update,
            [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            await Task.Yield();
            yield return update;
        }
    }

    /// <summary>Empty session type returned by <see cref="TelemetryTestAgent"/>.</summary>
    private sealed class TelemetryAgentSession : AgentSession;

    /// <summary>
    /// Thread-safe <see cref="ICollection{T}"/> used by OTel's InMemoryExporter to capture
    /// activities emitted on globally-listened sources. Required because the exporter writes into
    /// the supplied collection from background Activity completion callbacks while the test thread
    /// may be enumerating it for assertions, and other tests in the same assembly may emit on the
    /// same source concurrently. A plain <see cref="List{T}"/> trips
    /// "Collection was modified; enumeration operation may not execute." in that scenario.
    /// </summary>
    private sealed class ConcurrentActivityList : ICollection<Activity>
    {
        private readonly List<Activity> _items = new();
        private readonly object _gate = new();

        public int Count
        {
            get
            {
                lock (this._gate)
                {
                    return this._items.Count;
                }
            }
        }

        public bool IsReadOnly => false;

        public void Add(Activity item)
        {
            lock (this._gate)
            {
                this._items.Add(item);
            }
        }

        public void Clear()
        {
            lock (this._gate)
            {
                this._items.Clear();
            }
        }

        public bool Contains(Activity item)
        {
            lock (this._gate)
            {
                return this._items.Contains(item);
            }
        }

        public void CopyTo(Activity[] array, int arrayIndex)
        {
            lock (this._gate)
            {
                this._items.CopyTo(array, arrayIndex);
            }
        }

        public bool Remove(Activity item)
        {
            lock (this._gate)
            {
                return this._items.Remove(item);
            }
        }

        /// <summary>Copies the current contents into a new array while holding the lock.</summary>
        public Activity[] Snapshot()
        {
            lock (this._gate)
            {
                return this._items.ToArray();
            }
        }

        // Enumerates a snapshot, so later Add calls cannot invalidate the enumerator.
        public IEnumerator<Activity> GetEnumerator() =>
            ((IEnumerable<Activity>)this.Snapshot()).GetEnumerator();

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() =>
            this.GetEnumerator();
    }
}