From 0d4d7abde166f46a38c752d229cca3234f2f7229 Mon Sep 17 00:00:00 2001 From: Aditya Mandaleeka Date: Wed, 6 Aug 2025 16:10:34 -0700 Subject: [PATCH] .NET: Avoid double responses when streaming (#365) * Avoid double responses when streaming. * Fix tests. --- .../AgentProxy.cs | 7 - .../AgentProxyTests.cs | 131 ++++++++++++++---- 2 files changed, 106 insertions(+), 32 deletions(-) diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs index e2ea2d0453..351d6e8e6b 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs @@ -105,13 +105,6 @@ public sealed class AgentProxy : AIAgent if (update.Status is RequestStatus.Completed) { - var responseTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponse)); - var runResponse = (AgentRunResponse)update.Data.Deserialize(responseTypeInfo)!; - foreach (var item in runResponse.ToAgentRunResponseUpdates()) - { - yield return item; - } - yield break; } diff --git a/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentProxyTests.cs b/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentProxyTests.cs index 17b3274e1d..995d964c2b 100644 --- a/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentProxyTests.cs +++ b/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentProxyTests.cs @@ -264,39 +264,20 @@ public class AgentProxyTests } /// - /// Verifies that RunStreamingAsync yields AgentRunResponseUpdate for non-failed statuses. - /// This test uses a mock IActorClient to return an ActorResponseHandle that yields a single update with given status. - /// Expected: The method yields the deserialized update. + /// Verifies that RunStreamingAsync yields AgentRunResponseUpdate for pending status. /// - [Theory] - [InlineData(RequestStatus.Completed)] - [InlineData(RequestStatus.Pending)] - public async Task RunStreamingAsync_NonFailedStatus_YieldsAgentRunResponseUpdateAsync(RequestStatus status) + [Fact] + public async Task RunStreamingAsync_PendingStatus_YieldsAgentRunResponseUpdateAsync() { // Arrange var messages = Array.Empty(); var threadId = "thread1"; var expectedUpdate = new AgentRunResponseUpdate(ChatRole.Assistant, "response"); - JsonElement jsonElement; - if (status == RequestStatus.Completed) - { - // For Completed status, the implementation expects AgentRunResponse - var agentRunResponse = new AgentRunResponse - { - Messages = new List { new(ChatRole.Assistant, "response") } - }; - var responseTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponse)); - jsonElement = JsonSerializer.SerializeToElement(agentRunResponse, responseTypeInfo); - } - else - { - // For Pending status, the implementation expects AgentRunResponseUpdate - var updateTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponseUpdate)); - jsonElement = JsonSerializer.SerializeToElement(expectedUpdate, updateTypeInfo); - } + var updateTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponseUpdate)); + var jsonElement = JsonSerializer.SerializeToElement(expectedUpdate, updateTypeInfo); - var actorUpdate = new ActorRequestUpdate(status, jsonElement); + var actorUpdate = new ActorRequestUpdate(RequestStatus.Pending, jsonElement); var mockHandle = new Mock(); mockHandle .Setup(h => h.WatchUpdatesAsync(It.IsAny())) @@ -322,12 +303,112 @@ public class AgentProxyTests Assert.Equal(expectedUpdate.Role, results[0].Role); } + /// + /// Verifies that RunStreamingAsync completes without yielding any updates when receiving only a completed status. + /// + [Fact] + public async Task RunStreamingAsync_CompletedStatus_YieldsNoUpdatesAsync() + { + // Arrange + var messages = Array.Empty(); + var threadId = "thread1"; + + var agentRunResponse = new AgentRunResponse + { + Messages = [new(ChatRole.Assistant, "response")] + }; + var responseTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponse)); + var jsonElement = JsonSerializer.SerializeToElement(agentRunResponse, responseTypeInfo); + + var actorUpdate = new ActorRequestUpdate(RequestStatus.Completed, jsonElement); + var mockHandle = new Mock(); + mockHandle + .Setup(h => h.WatchUpdatesAsync(It.IsAny())) + .Returns(GetAsyncEnumerableAsync(actorUpdate)); + var mockClient = new Mock(); + mockClient + .Setup(c => c.SendRequestAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(mockHandle.Object); + + var proxy = new AgentProxy("agentName", mockClient.Object); + var thread = proxy.GetThread(threadId); + + // Act + var results = new List(); + await foreach (var update in proxy.RunStreamingAsync(messages, thread)) + { + results.Add(update); + } + + // Assert + Assert.Empty(results); // Completed status should not yield any updates + } + + /// + /// Verifies that RunStreamingAsync does not yield duplicate content when receiving both + /// streaming updates and a completed message containing the same content. + /// + [Fact] + public async Task RunStreamingAsync_CompletedAfterUpdates_DoesNotYieldDuplicateContentAsync() + { + // Arrange: Create a scenario with streaming updates followed by completion + var messages = Array.Empty(); + var threadId = "thread1"; + + var pendingUpdate = new AgentRunResponseUpdate(ChatRole.Assistant, "streaming response"); + var completedResponse = new AgentRunResponse + { + Messages = new List { new(ChatRole.Assistant, "streaming response") } + }; + + var updates = new List + { + new(RequestStatus.Pending, JsonSerializer.SerializeToElement(pendingUpdate, + AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponseUpdate)))), + new(RequestStatus.Completed, JsonSerializer.SerializeToElement(completedResponse, + AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponse)))) + }; + + var mockHandle = new Mock(); + mockHandle + .Setup(h => h.WatchUpdatesAsync(It.IsAny())) + .Returns(GetAsyncEnumerableAsync(updates)); + var mockClient = new Mock(); + mockClient + .Setup(c => c.SendRequestAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(mockHandle.Object); + + var proxy = new AgentProxy("agentName", mockClient.Object); + var thread = proxy.GetThread(threadId); + + // Act + var results = new List(); + await foreach (var update in proxy.RunStreamingAsync(messages, thread)) + { + results.Add(update); + } + + // Assert: Should only get the pending update, not duplicate content from completion + Assert.Single(results); + Assert.Equal("streaming response", results[0].Text); + Assert.Equal(ChatRole.Assistant, results[0].Role); + } + private static async IAsyncEnumerable GetAsyncEnumerableAsync(ActorRequestUpdate update) { yield return update; await Task.CompletedTask; } + private static async IAsyncEnumerable GetAsyncEnumerableAsync(List updates) + { + foreach (var update in updates) + { + yield return update; + await Task.CompletedTask; + } + } + /// /// Verifies that RunStreamingAsync throws InvalidOperationException when an update status is Failed. /// Uses a mock IActorClient to return a Failed update. Expected: InvalidOperationException is thrown.