Files
Jacob Alber 0086d38f58 .NET: [BREAKING] Workflows API Review Naming Changes (Part 1?) (#4090)
* refactor: Normalize Run/RunStreaming with AIAgent

* refactor: Clarify Session vs. Run -level concepts

* Rename RunId to SessionId to better match Run/Session terminology in AIAgent
* [BREAKING]: Will break existing checkpointed sessions in CosmosDb due to field rename

* refactor: Rename and simplify interface around getting typed data out of ExternalRequest/Response

* Also adds hints around using value types in PortableValue

* refactor: Rename AddFanInEdge to AddFanInBarrierEdge

This will prevent a breaking change later when we introduce a programmable FanIn edge, analogous to the FanOut edge's EdgeSelector.

The goal, in the long run is to support a number of different FanIn scenarios, with naive FanIn (no barrier) by default, similar to FanOut.

* refactor: AsAgent(this Workflow, ...) => AsAIAgent(...)

* misc - part1: SwitchBuilder internal

---------

Co-authored-by: Dmytro Struk <13853051+dmytrostruk@users.noreply.github.com>
2026-02-20 02:05:18 +00:00

200 lines
8.6 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
/// <summary>
/// Tests for InProcessExecution to verify streaming and non-streaming execution behavior.
/// </summary>
public class InProcessExecutionTests
{
/// <summary>
/// The non-streaming version (RunAsync) should execute the workflow and produce events,
/// similar to the streaming version (StreamAsync + TrySendMessageAsync).
/// </summary>
[Fact]
public async Task RunAsyncShouldExecuteWorkflowAsync()
{
// Arrange: Create a simple agent that responds to messages
var agent = new SimpleTestAgent("test-agent");
var workflow = AgentWorkflowBuilder.BuildSequential(agent);
var inputMessage = new ChatMessage(ChatRole.User, "Hello");
// Act: Execute using non-streaming RunAsync
Run run = await InProcessExecution.RunAsync(workflow, new List<ChatMessage> { inputMessage });
// Assert: The workflow should have executed and produced events
RunStatus status = await run.GetStatusAsync();
status.Should().Be(RunStatus.Idle, "workflow should complete execution");
// The run should have events (at minimum, a WorkflowOutputEvent)
run.OutgoingEvents.Should().NotBeEmpty("workflow should produce events during execution");
// Check that we have an agent execution event
var agentEvents = run.OutgoingEvents.OfType<AgentResponseUpdateEvent>().ToList();
agentEvents.Should().NotBeEmpty("agent should have executed and produced update events");
// Check that we have output events
var outputEvents = run.OutgoingEvents.OfType<WorkflowOutputEvent>().ToList();
outputEvents.Should().NotBeEmpty("workflow should produce output events");
}
/// <summary>
/// This test shows that the streaming version works correctly when TurnToken is sent following a message.
/// </summary>
[Fact]
public async Task StreamAsyncWithTurnTokenShouldExecuteWorkflowAsync()
{
// Arrange: Create a simple agent that responds to messages
var agent = new SimpleTestAgent("test-agent");
var workflow = AgentWorkflowBuilder.BuildSequential(agent);
var inputMessage = new ChatMessage(ChatRole.User, "Hello");
// Act: Execute using streaming version with TurnToken
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
// Send TurnToken to actually trigger execution (this is the key step)
bool messageSent = await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
messageSent.Should().BeTrue("TurnToken should be accepted");
// Collect events
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert: The workflow should have executed and produced events
RunStatus status = await run.GetStatusAsync();
status.Should().Be(RunStatus.Idle, "workflow should complete execution");
events.Should().NotBeEmpty("workflow should produce events during execution");
// Check that we have agent execution events
var agentEvents = events.OfType<AgentResponseUpdateEvent>().ToList();
agentEvents.Should().NotBeEmpty("agent should have executed and produced update events");
// Check that we have output events
var outputEvents = events.OfType<WorkflowOutputEvent>().ToList();
outputEvents.Should().NotBeEmpty("workflow should produce output events");
}
/// <summary>
/// This test compares the behavior of RunAsync vs StreamAsync to highlight the difference.
/// Both should produce similar results, but as of issue #1315, RunAsync fails to execute.
/// </summary>
[Fact]
public async Task RunAsyncAndStreamAsyncShouldProduceSimilarResultsAsync()
{
// Arrange: Create the same workflow for both tests
var agent1 = new SimpleTestAgent("test-agent-1");
var workflow1 = AgentWorkflowBuilder.BuildSequential(agent1);
var agent2 = new SimpleTestAgent("test-agent-2");
var workflow2 = AgentWorkflowBuilder.BuildSequential(agent2);
var inputMessage = new ChatMessage(ChatRole.User, "Test message");
// Act 1: Execute using RunAsync (non-streaming)
Run nonStreamingRun = await InProcessExecution.RunAsync(workflow1, new List<ChatMessage> { inputMessage });
var nonStreamingEvents = nonStreamingRun.OutgoingEvents.ToList();
// Act 2: Execute using StreamAsync (streaming) with TurnToken
await using StreamingRun streamingRun = await InProcessExecution.RunStreamingAsync(workflow2, new List<ChatMessage> { inputMessage });
await streamingRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
List<WorkflowEvent> streamingEvents = [];
await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
{
streamingEvents.Add(evt);
}
// Assert: Both should have produced events
// The streaming version works (we know this from the issue report)
streamingEvents.Should().NotBeEmpty("streaming version should produce events");
// The non-streaming version should also produce events (this is the bug being tested)
nonStreamingEvents.Should().NotBeEmpty("non-streaming version should also produce events");
// Both should have similar types of events
var streamingAgentEvents = streamingEvents.OfType<AgentResponseUpdateEvent>().Count();
var nonStreamingAgentEvents = nonStreamingEvents.OfType<AgentResponseUpdateEvent>().Count();
nonStreamingAgentEvents.Should().Be(streamingAgentEvents,
"both versions should produce the same number of agent events");
}
/// <summary>
/// Simple test agent that echoes back the input message.
/// </summary>
private sealed class SimpleTestAgent : AIAgent
{
public SimpleTestAgent(string name)
{
this.Name = name;
}
public override string Name { get; }
protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default) => new(new SimpleTestAgentSession());
protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(System.Text.Json.JsonElement serializedState,
System.Text.Json.JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) => new(new SimpleTestAgentSession());
protected override ValueTask<System.Text.Json.JsonElement> SerializeSessionCoreAsync(AgentSession session, System.Text.Json.JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
=> default;
protected override Task<AgentResponse> RunCoreAsync(
IEnumerable<ChatMessage> messages,
AgentSession? session = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
var lastMessage = messages.LastOrDefault();
var responseMessage = new ChatMessage(ChatRole.Assistant, $"Echo: {lastMessage?.Text ?? "no message"}");
return Task.FromResult(new AgentResponse(responseMessage));
}
protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
IEnumerable<ChatMessage> messages,
AgentSession? session = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await Task.Yield();
var lastMessage = messages.LastOrDefault();
var responseText = $"Echo: {lastMessage?.Text ?? "no message"}";
string messageId = Guid.NewGuid().ToString("N");
// Yield role first
yield return new AgentResponseUpdate(ChatRole.Assistant, this.Name)
{
AuthorName = this.Name,
MessageId = messageId
};
// Then yield content
yield return new AgentResponseUpdate(ChatRole.Assistant, responseText)
{
AuthorName = this.Name,
MessageId = messageId
};
}
}
/// <summary>
/// Simple session implementation for SimpleTestAgent.
/// </summary>
private sealed class SimpleTestAgentSession : AgentSession;
}