.NET: Avoid double responses when streaming (#365)

* Avoid double responses when streaming.

* Fix tests.
This commit is contained in:
Aditya Mandaleeka
2025-08-06 16:10:34 -07:00
committed by GitHub
Unverified
parent e7441ee29e
commit 0d4d7abde1
2 changed files with 106 additions and 32 deletions
@@ -105,13 +105,6 @@ public sealed class AgentProxy : AIAgent
if (update.Status is RequestStatus.Completed)
{
var responseTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponse));
var runResponse = (AgentRunResponse)update.Data.Deserialize(responseTypeInfo)!;
foreach (var item in runResponse.ToAgentRunResponseUpdates())
{
yield return item;
}
yield break;
}
@@ -264,39 +264,20 @@ public class AgentProxyTests
}
/// <summary>
/// Verifies that RunStreamingAsync yields AgentRunResponseUpdate for non-failed statuses.
/// This test uses a mock IActorClient to return an ActorResponseHandle that yields a single update with given status.
/// Expected: The method yields the deserialized update.
/// Verifies that RunStreamingAsync yields AgentRunResponseUpdate for pending status.
/// </summary>
[Theory]
[InlineData(RequestStatus.Completed)]
[InlineData(RequestStatus.Pending)]
public async Task RunStreamingAsync_NonFailedStatus_YieldsAgentRunResponseUpdateAsync(RequestStatus status)
[Fact]
public async Task RunStreamingAsync_PendingStatus_YieldsAgentRunResponseUpdateAsync()
{
// Arrange
var messages = Array.Empty<ChatMessage>();
var threadId = "thread1";
var expectedUpdate = new AgentRunResponseUpdate(ChatRole.Assistant, "response");
JsonElement jsonElement;
if (status == RequestStatus.Completed)
{
// For Completed status, the implementation expects AgentRunResponse
var agentRunResponse = new AgentRunResponse
{
Messages = new List<ChatMessage> { new(ChatRole.Assistant, "response") }
};
var responseTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponse));
jsonElement = JsonSerializer.SerializeToElement(agentRunResponse, responseTypeInfo);
}
else
{
// For Pending status, the implementation expects AgentRunResponseUpdate
var updateTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponseUpdate));
jsonElement = JsonSerializer.SerializeToElement(expectedUpdate, updateTypeInfo);
}
var updateTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponseUpdate));
var jsonElement = JsonSerializer.SerializeToElement(expectedUpdate, updateTypeInfo);
var actorUpdate = new ActorRequestUpdate(status, jsonElement);
var actorUpdate = new ActorRequestUpdate(RequestStatus.Pending, jsonElement);
var mockHandle = new Mock<ActorResponseHandle>();
mockHandle
.Setup(h => h.WatchUpdatesAsync(It.IsAny<CancellationToken>()))
@@ -322,12 +303,112 @@ public class AgentProxyTests
Assert.Equal(expectedUpdate.Role, results[0].Role);
}
/// <summary>
/// Verifies that RunStreamingAsync completes without yielding any updates when receiving only a completed status.
/// </summary>
[Fact]
public async Task RunStreamingAsync_CompletedStatus_YieldsNoUpdatesAsync()
{
// Arrange
var messages = Array.Empty<ChatMessage>();
var threadId = "thread1";
var agentRunResponse = new AgentRunResponse
{
Messages = [new(ChatRole.Assistant, "response")]
};
var responseTypeInfo = AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponse));
var jsonElement = JsonSerializer.SerializeToElement(agentRunResponse, responseTypeInfo);
var actorUpdate = new ActorRequestUpdate(RequestStatus.Completed, jsonElement);
var mockHandle = new Mock<ActorResponseHandle>();
mockHandle
.Setup(h => h.WatchUpdatesAsync(It.IsAny<CancellationToken>()))
.Returns(GetAsyncEnumerableAsync(actorUpdate));
var mockClient = new Mock<IActorClient>();
mockClient
.Setup(c => c.SendRequestAsync(It.IsAny<ActorRequest>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(mockHandle.Object);
var proxy = new AgentProxy("agentName", mockClient.Object);
var thread = proxy.GetThread(threadId);
// Act
var results = new List<AgentRunResponseUpdate>();
await foreach (var update in proxy.RunStreamingAsync(messages, thread))
{
results.Add(update);
}
// Assert
Assert.Empty(results); // Completed status should not yield any updates
}
/// <summary>
/// Verifies that RunStreamingAsync does not yield duplicate content when receiving both
/// streaming updates and a completed message containing the same content.
/// </summary>
[Fact]
public async Task RunStreamingAsync_CompletedAfterUpdates_DoesNotYieldDuplicateContentAsync()
{
// Arrange: Create a scenario with streaming updates followed by completion
var messages = Array.Empty<ChatMessage>();
var threadId = "thread1";
var pendingUpdate = new AgentRunResponseUpdate(ChatRole.Assistant, "streaming response");
var completedResponse = new AgentRunResponse
{
Messages = new List<ChatMessage> { new(ChatRole.Assistant, "streaming response") }
};
var updates = new List<ActorRequestUpdate>
{
new(RequestStatus.Pending, JsonSerializer.SerializeToElement(pendingUpdate,
AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponseUpdate)))),
new(RequestStatus.Completed, JsonSerializer.SerializeToElement(completedResponse,
AgentAbstractionsJsonUtilities.DefaultOptions.GetTypeInfo(typeof(AgentRunResponse))))
};
var mockHandle = new Mock<ActorResponseHandle>();
mockHandle
.Setup(h => h.WatchUpdatesAsync(It.IsAny<CancellationToken>()))
.Returns(GetAsyncEnumerableAsync(updates));
var mockClient = new Mock<IActorClient>();
mockClient
.Setup(c => c.SendRequestAsync(It.IsAny<ActorRequest>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(mockHandle.Object);
var proxy = new AgentProxy("agentName", mockClient.Object);
var thread = proxy.GetThread(threadId);
// Act
var results = new List<AgentRunResponseUpdate>();
await foreach (var update in proxy.RunStreamingAsync(messages, thread))
{
results.Add(update);
}
// Assert: Should only get the pending update, not duplicate content from completion
Assert.Single(results);
Assert.Equal("streaming response", results[0].Text);
Assert.Equal(ChatRole.Assistant, results[0].Role);
}
private static async IAsyncEnumerable<ActorRequestUpdate> GetAsyncEnumerableAsync(ActorRequestUpdate update)
{
yield return update;
await Task.CompletedTask;
}
private static async IAsyncEnumerable<ActorRequestUpdate> GetAsyncEnumerableAsync(List<ActorRequestUpdate> updates)
{
foreach (var update in updates)
{
yield return update;
await Task.CompletedTask;
}
}
/// <summary>
/// Verifies that RunStreamingAsync throws InvalidOperationException when an update status is Failed.
/// Uses a mock IActorClient to return a Failed update. Expected: InvalidOperationException is thrown.