// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
///
/// Tests for InProcessExecution to verify streaming and non-streaming execution behavior.
///
public class InProcessExecutionTests
{
///
/// The non-streaming version (RunAsync) should execute the workflow and produce events,
/// similar to the streaming version (StreamAsync + TrySendMessageAsync).
///
[Fact]
public async Task RunAsyncShouldExecuteWorkflowAsync()
{
// Arrange: Create a simple agent that responds to messages
var agent = new SimpleTestAgent("test-agent");
var workflow = AgentWorkflowBuilder.BuildSequential(agent);
var inputMessage = new ChatMessage(ChatRole.User, "Hello");
// Act: Execute using non-streaming RunAsync
Run run = await InProcessExecution.RunAsync(workflow, new List { inputMessage });
// Assert: The workflow should have executed and produced events
RunStatus status = await run.GetStatusAsync();
status.Should().Be(RunStatus.Idle, "workflow should complete execution");
// The run should have events (at minimum, a WorkflowOutputEvent)
run.OutgoingEvents.Should().NotBeEmpty("workflow should produce events during execution");
// Check that we have an agent execution event
var agentEvents = run.OutgoingEvents.OfType().ToList();
agentEvents.Should().NotBeEmpty("agent should have executed and produced update events");
// Check that we have output events
var outputEvents = run.OutgoingEvents.OfType().ToList();
outputEvents.Should().NotBeEmpty("workflow should produce output events");
}
///
/// This test shows that the streaming version works correctly when TurnToken is sent following a message.
///
[Fact]
public async Task StreamAsyncWithTurnTokenShouldExecuteWorkflowAsync()
{
// Arrange: Create a simple agent that responds to messages
var agent = new SimpleTestAgent("test-agent");
var workflow = AgentWorkflowBuilder.BuildSequential(agent);
var inputMessage = new ChatMessage(ChatRole.User, "Hello");
// Act: Execute using streaming version with TurnToken
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List { inputMessage });
// Send TurnToken to actually trigger execution (this is the key step)
bool messageSent = await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
messageSent.Should().BeTrue("TurnToken should be accepted");
// Collect events
List events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert: The workflow should have executed and produced events
RunStatus status = await run.GetStatusAsync();
status.Should().Be(RunStatus.Idle, "workflow should complete execution");
events.Should().NotBeEmpty("workflow should produce events during execution");
// Check that we have agent execution events
var agentEvents = events.OfType().ToList();
agentEvents.Should().NotBeEmpty("agent should have executed and produced update events");
// Check that we have output events
var outputEvents = events.OfType().ToList();
outputEvents.Should().NotBeEmpty("workflow should produce output events");
}
///
/// This test compares the behavior of RunAsync vs StreamAsync to highlight the difference.
/// Both should produce similar results, but as of issue #1315, RunAsync fails to execute.
///
[Fact]
public async Task RunAsyncAndStreamAsyncShouldProduceSimilarResultsAsync()
{
// Arrange: Create the same workflow for both tests
var agent1 = new SimpleTestAgent("test-agent-1");
var workflow1 = AgentWorkflowBuilder.BuildSequential(agent1);
var agent2 = new SimpleTestAgent("test-agent-2");
var workflow2 = AgentWorkflowBuilder.BuildSequential(agent2);
var inputMessage = new ChatMessage(ChatRole.User, "Test message");
// Act 1: Execute using RunAsync (non-streaming)
Run nonStreamingRun = await InProcessExecution.RunAsync(workflow1, new List { inputMessage });
var nonStreamingEvents = nonStreamingRun.OutgoingEvents.ToList();
// Act 2: Execute using StreamAsync (streaming) with TurnToken
await using StreamingRun streamingRun = await InProcessExecution.RunStreamingAsync(workflow2, new List { inputMessage });
await streamingRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
List streamingEvents = [];
await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
{
streamingEvents.Add(evt);
}
// Assert: Both should have produced events
// The streaming version works (we know this from the issue report)
streamingEvents.Should().NotBeEmpty("streaming version should produce events");
// The non-streaming version should also produce events (this is the bug being tested)
nonStreamingEvents.Should().NotBeEmpty("non-streaming version should also produce events");
// Both should have similar types of events
var streamingAgentEvents = streamingEvents.OfType().Count();
var nonStreamingAgentEvents = nonStreamingEvents.OfType().Count();
nonStreamingAgentEvents.Should().Be(streamingAgentEvents,
"both versions should produce the same number of agent events");
}
///
/// Simple test agent that echoes back the input message.
///
private sealed class SimpleTestAgent : AIAgent
{
public SimpleTestAgent(string name)
{
this.Name = name;
}
public override string Name { get; }
protected override ValueTask CreateSessionCoreAsync(CancellationToken cancellationToken = default) => new(new SimpleTestAgentSession());
protected override ValueTask DeserializeSessionCoreAsync(System.Text.Json.JsonElement serializedState,
System.Text.Json.JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) => new(new SimpleTestAgentSession());
protected override ValueTask SerializeSessionCoreAsync(AgentSession session, System.Text.Json.JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
=> default;
protected override Task RunCoreAsync(
IEnumerable messages,
AgentSession? session = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
var lastMessage = messages.LastOrDefault();
var responseMessage = new ChatMessage(ChatRole.Assistant, $"Echo: {lastMessage?.Text ?? "no message"}");
return Task.FromResult(new AgentResponse(responseMessage));
}
protected override async IAsyncEnumerable RunCoreStreamingAsync(
IEnumerable messages,
AgentSession? session = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await Task.Yield();
var lastMessage = messages.LastOrDefault();
var responseText = $"Echo: {lastMessage?.Text ?? "no message"}";
string messageId = Guid.NewGuid().ToString("N");
// Yield role first
yield return new AgentResponseUpdate(ChatRole.Assistant, this.Name)
{
AuthorName = this.Name,
MessageId = messageId
};
// Then yield content
yield return new AgentResponseUpdate(ChatRole.Assistant, responseText)
{
AuthorName = this.Name,
MessageId = messageId
};
}
}
///
/// Simple session implementation for SimpleTestAgent.
///
private sealed class SimpleTestAgentSession : AgentSession;
}