mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
281661e409
* fix: Remove Timeout from InputWait in StreamingRunEventStream * fix: Race condition when the workflow executes to halt before TakeEventStream * test: Make the OffThread Delay test more nimble * fix: Remove slight window where runStatus could be stale
247 lines
11 KiB
C#
247 lines
11 KiB
C#
// Copyright (c) Microsoft. All rights reserved.
|
|
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Runtime.CompilerServices;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using FluentAssertions;
|
|
using Microsoft.Extensions.AI;
|
|
|
|
namespace Microsoft.Agents.AI.Workflows.UnitTests;
|
|
|
|
/// <summary>
|
|
/// Tests for InProcessExecution to verify streaming and non-streaming execution behavior.
|
|
/// </summary>
|
|
public class InProcessExecutionTests
|
|
{
|
|
/// <summary>
/// Verifies that the non-streaming entry point (RunAsync) executes the workflow and surfaces
/// events, mirroring what the streaming path (RunStreamingAsync + TrySendMessageAsync) produces.
/// </summary>
[Fact]
public async Task RunAsyncShouldExecuteWorkflowAsync()
{
    // Arrange: a sequential workflow wrapping a single echo agent, plus one user message
    var workflow = AgentWorkflowBuilder.BuildSequential(new SimpleTestAgent("test-agent"));
    List<ChatMessage> input = [new ChatMessage(ChatRole.User, "Hello")];

    // Act: run the workflow through the non-streaming API
    Run run = await InProcessExecution.RunAsync(workflow, input);

    // Assert: the run settled and recorded events
    RunStatus finalStatus = await run.GetStatusAsync();
    finalStatus.Should().Be(RunStatus.Idle, "workflow should complete execution");

    // At minimum a WorkflowOutputEvent is expected among the outgoing events
    run.OutgoingEvents.Should().NotBeEmpty("workflow should produce events during execution");

    // The agent must have emitted streaming update events
    run.OutgoingEvents.OfType<AgentResponseUpdateEvent>().ToList()
        .Should().NotBeEmpty("agent should have executed and produced update events");

    // The workflow must have emitted output events
    run.OutgoingEvents.OfType<WorkflowOutputEvent>().ToList()
        .Should().NotBeEmpty("workflow should produce output events");
}
|
|
|
|
/// <summary>
/// Verifies the streaming path: after the input messages are supplied, sending a
/// <see cref="TurnToken"/> triggers execution and the watched stream yields the events.
/// </summary>
[Fact]
public async Task StreamAsyncWithTurnTokenShouldExecuteWorkflowAsync()
{
    // Arrange: a sequential workflow wrapping a single echo agent, plus one user message
    var workflow = AgentWorkflowBuilder.BuildSequential(new SimpleTestAgent("test-agent"));
    List<ChatMessage> input = [new ChatMessage(ChatRole.User, "Hello")];

    // Act: open the streaming run
    await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input);

    // The TurnToken is what actually kicks off execution (the key step)
    bool turnTokenAccepted = await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
    turnTokenAccepted.Should().BeTrue("TurnToken should be accepted");

    // Drain the event stream into a list
    List<WorkflowEvent> observed = [];
    await foreach (WorkflowEvent workflowEvent in run.WatchStreamAsync())
    {
        observed.Add(workflowEvent);
    }

    // Assert: the run settled and the stream carried events
    RunStatus finalStatus = await run.GetStatusAsync();
    finalStatus.Should().Be(RunStatus.Idle, "workflow should complete execution");

    observed.Should().NotBeEmpty("workflow should produce events during execution");

    // The agent must have emitted streaming update events
    observed.OfType<AgentResponseUpdateEvent>().ToList()
        .Should().NotBeEmpty("agent should have executed and produced update events");

    // The workflow must have emitted output events
    observed.OfType<WorkflowOutputEvent>().ToList()
        .Should().NotBeEmpty("workflow should produce output events");
}
|
|
|
|
/// <summary>
/// Runs two equivalent single-agent workflows, one through RunAsync (non-streaming) and one
/// through RunStreamingAsync followed by a TurnToken, and checks that both produce events and
/// the same number of agent update events.
/// Regression coverage for issue #1315, which reported RunAsync failing to execute the workflow.
/// </summary>
[Fact]
public async Task RunAsyncAndStreamAsyncShouldProduceSimilarResultsAsync()
{
    // Arrange: Create the same workflow shape for both execution modes
    var agent1 = new SimpleTestAgent("test-agent-1");
    var workflow1 = AgentWorkflowBuilder.BuildSequential(agent1);

    var agent2 = new SimpleTestAgent("test-agent-2");
    var workflow2 = AgentWorkflowBuilder.BuildSequential(agent2);

    var inputMessage = new ChatMessage(ChatRole.User, "Test message");

    // Act 1: Execute using RunAsync (non-streaming)
    Run nonStreamingRun = await InProcessExecution.RunAsync(workflow1, new List<ChatMessage> { inputMessage });
    var nonStreamingEvents = nonStreamingRun.OutgoingEvents.ToList();

    // Act 2: Execute using RunStreamingAsync (streaming) with TurnToken
    await using StreamingRun streamingRun = await InProcessExecution.RunStreamingAsync(workflow2, new List<ChatMessage> { inputMessage });

    // Check the TurnToken was accepted, as the sibling streaming tests do; otherwise a rejected
    // token would only surface later as a misleading "no events" failure.
    bool messageSent = await streamingRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
    messageSent.Should().BeTrue("TurnToken should be accepted");

    List<WorkflowEvent> streamingEvents = [];
    await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
    {
        streamingEvents.Add(evt);
    }

    // Assert: Both should have produced events
    streamingEvents.Should().NotBeEmpty("streaming version should produce events");

    // The non-streaming version should also produce events (the behavior issue #1315 was about)
    nonStreamingEvents.Should().NotBeEmpty("non-streaming version should also produce events");

    // Both should have the same number of agent update events
    var streamingAgentEvents = streamingEvents.OfType<AgentResponseUpdateEvent>().Count();
    var nonStreamingAgentEvents = nonStreamingEvents.OfType<AgentResponseUpdateEvent>().Count();

    nonStreamingAgentEvents.Should().Be(streamingAgentEvents,
        "both versions should produce the same number of agent events");
}
|
|
|
|
/// <summary>
/// This test checks that the logic around waiting for input and halting works correctly when the
/// workflow runs to halting before the event stream is watched by the user.
/// The wait for the run to become idle is bounded, so a run that never halts fails the test
/// instead of hanging it.
/// </summary>
[Fact]
public async Task RunStreamingAsyncWaitToTakeStreamAsync()
{
    // Arrange: Create a simple agent that responds to messages
    var agent = new SimpleTestAgent("test-agent");
    var workflow = AgentWorkflowBuilder.BuildSequential(agent);
    var inputMessage = new ChatMessage(ChatRole.User, "Hello");

    // Act: Execute using streaming version with TurnToken
    await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });

    // Send TurnToken to actually trigger execution (this is the key step)
    bool messageSent = await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
    messageSent.Should().BeTrue("TurnToken should be accepted");

    // Let the workflow run to a halt BEFORE the stream is taken. The poll is bounded: once the
    // timeout elapses, Task.Delay throws and the test fails rather than spinning forever.
    using CancellationTokenSource idleTimeout = new(TimeSpan.FromSeconds(30));
    while (await run.GetStatusAsync() != RunStatus.Idle)
    {
        await Task.Delay(200, idleTimeout.Token);
    }

    // Collect events
    List<WorkflowEvent> events = [];

    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        events.Add(evt);
    }

    // Assert: The workflow should have executed and produced events
    RunStatus status = await run.GetStatusAsync();
    status.Should().Be(RunStatus.Idle, "workflow should complete execution");

    events.Should().NotBeEmpty("workflow should produce events during execution");

    // Check that we have agent execution events
    var agentEvents = events.OfType<AgentResponseUpdateEvent>().ToList();
    agentEvents.Should().NotBeEmpty("agent should have executed and produced update events");

    // Check that we have output events
    var outputEvents = events.OfType<WorkflowOutputEvent>().ToList();
    outputEvents.Should().NotBeEmpty("workflow should produce output events");
}
|
|
|
|
/// <summary>
/// Minimal agent for tests: replies with "Echo: " followed by the text of the last input message,
/// or "Echo: no message" when there is no input message.
/// </summary>
private sealed class SimpleTestAgent : AIAgent
{
    public SimpleTestAgent(string name) => this.Name = name;

    public override string Name { get; }

    /// <summary>Creates a fresh <see cref="SimpleTestAgentSession"/>.</summary>
    protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default)
    {
        return new(new SimpleTestAgentSession());
    }

    /// <summary>Ignores the serialized state and returns a fresh <see cref="SimpleTestAgentSession"/>.</summary>
    protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(
        System.Text.Json.JsonElement serializedState,
        System.Text.Json.JsonSerializerOptions? jsonSerializerOptions = null,
        CancellationToken cancellationToken = default)
    {
        return new(new SimpleTestAgentSession());
    }

    /// <summary>Returns a default (already completed) result; no session state is written.</summary>
    protected override ValueTask<System.Text.Json.JsonElement> SerializeSessionCoreAsync(
        AgentSession session,
        System.Text.Json.JsonSerializerOptions? jsonSerializerOptions = null,
        CancellationToken cancellationToken = default)
    {
        return default;
    }

    /// <summary>Non-streaming run: returns a single assistant message containing the echo text.</summary>
    protected override Task<AgentResponse> RunCoreAsync(
        IEnumerable<ChatMessage> messages,
        AgentSession? session = null,
        AgentRunOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        ChatMessage reply = new(ChatRole.Assistant, BuildEchoText(messages));
        return Task.FromResult(new AgentResponse(reply));
    }

    /// <summary>
    /// Streaming run: yields two assistant updates sharing one message id. The first carries the
    /// agent name as its text, the second carries the echo text.
    /// </summary>
    protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
        IEnumerable<ChatMessage> messages,
        AgentSession? session = null,
        AgentRunOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await Task.Yield();

        string echoText = BuildEchoText(messages);
        string sharedMessageId = Guid.NewGuid().ToString("N");

        // Both updates carry the same author and message id; only the text differs.
        AgentResponseUpdate CreateUpdate(string text) => new(ChatRole.Assistant, text)
        {
            AuthorName = this.Name,
            MessageId = sharedMessageId
        };

        // Yield role first
        yield return CreateUpdate(this.Name);

        // Then yield content
        yield return CreateUpdate(echoText);
    }

    /// <summary>Builds the echo reply from the last message in <paramref name="messages"/>.</summary>
    private static string BuildEchoText(IEnumerable<ChatMessage> messages)
    {
        ChatMessage? mostRecent = messages.LastOrDefault();
        return $"Echo: {mostRecent?.Text ?? "no message"}";
    }
}
|
|
|
|
/// <summary>
/// Session type handed out by SimpleTestAgent; it adds no members of its own.
/// </summary>
private sealed class SimpleTestAgentSession : AgentSession
{
}
|
|
}
|