Files
agent-framework/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessExecutionTests.cs
westey 3168eb4870 .NET: [BREAKING] Add session StateBag for state storage and support multiple providers on the Agent (#3806)
* .NET: [BREAKING] Add session statebag to use for state storage instead of inside providers (#3737)

* Add a StateBag to AgentSession and pass Agent and AgentSession to AIContextProvider and ChatHistoryProviders

* Convert all AIContextProviders to use the statebag

* Update InMemoryChatHistoryProvider to use StateBag

* Update Cosmos and Workflow ChatHistoryProviders

* Update 3rd party chat history storage sample.

* Remove serialize method from providers

* Replacing provider factories with properties

* Remove Providers from Session and flatten state bag serialization

* Update samples to use getservice on agent

* Updated additional session types to serialize statebag

* Fix regression

* Address PR comments

* Address PR comments.

* Fix formatting

* Fix unit tests

* Remove InMemoryAgentSession since it is not required anymore.

* Address PR comments

* Convert sessions for A2AAgent, ChatClientAgent, CopilotStudioAgent and GithubCopilotAgent to use regular json serialization.

* Fix durable agent session jso usage

* Add jso to InMemory and Workflow ChatHistoryProviders

* Update InMemoryChatHistoryProvider to use an options class for its many optional settings.

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Address PR feedback

* Fix verification bug.

* Improve state bag thread safety

* Address PR comments and fix unit tests

* Address PR comments

* Fix unit test

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Add a public StateKey property to providers (#3810)

* .NET: [BREAKING] Update providers in such a way that they can participate in a pipeline (#3846)

* Make providers pipeline capable

* Fix unit tests

* Move source stamping to providers from base class

* Also update samples.

* Address PR comments

* Rename AsAgentRequestMessageSourcedMessage to WithAgentRequestMessageSource

* .NET: [BREAKING] Add consistent message filtering to all providers. (#3851)

* Add consistent message filtering to all providers.

* Remove old chat history filtering classes

* Fix merge issues

* Fix unit test

* Enforce non-nullable property

* Fix merging bug and make troubleshooting source info easier by adding tostring implementation

* .NET: [BREAKING] Add support for multiple AIContextProviders on a ChatClientAgent (#3863)

* Add support for multiple AIContextProviders on a ChatClientAgent

* Address PR comments and fix tests

* Address PR comments.

* .NET: [BREAKING] Delay AIContext Materialization until the end of the pipeline is reached. (#3883)

* Delay AIContext Materialization until the end of the pipeline is reached.

* Address PR comments.

* Address PR comments

* Modify InvokedContext to be immutable (#3888)

* .NET: Address Feedback on StateBag feature branch PR (#3910)

* Address Feedback on statebag feature branch PR

* Update dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Address PR comments

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-13 14:08:07 +00:00

200 lines
8.6 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
/// <summary>
/// Tests for InProcessExecution to verify streaming and non-streaming execution behavior.
/// </summary>
public class InProcessExecutionTests
{
/// <summary>
/// Verifies that the non-streaming entry point (RunAsync) executes the workflow and surfaces events,
/// in the same way the streaming path (StreamAsync + TrySendMessageAsync) does.
/// </summary>
[Fact]
public async Task RunAsyncShouldExecuteWorkflowAsync()
{
    // Arrange: one echo agent wrapped in a sequential workflow, plus a single user message.
    SimpleTestAgent echoAgent = new("test-agent");
    var sequentialWorkflow = AgentWorkflowBuilder.BuildSequential(echoAgent);
    ChatMessage userMessage = new(ChatRole.User, "Hello");

    // Act: go through the non-streaming API.
    Run completedRun = await InProcessExecution.RunAsync(sequentialWorkflow, new List<ChatMessage> { userMessage });

    // Assert: the run reports Idle once RunAsync has returned.
    RunStatus finalStatus = await completedRun.GetStatusAsync();
    finalStatus.Should().Be(RunStatus.Idle, "workflow should complete execution");

    // Something must have been emitted at all (at minimum a WorkflowOutputEvent).
    completedRun.OutgoingEvents.Should().NotBeEmpty("workflow should produce events during execution");

    // The agent's updates must be among the emitted events.
    List<AgentResponseUpdateEvent> agentUpdates = [.. completedRun.OutgoingEvents.OfType<AgentResponseUpdateEvent>()];
    agentUpdates.Should().NotBeEmpty("agent should have executed and produced update events");

    // The workflow-level output must be among them as well.
    List<WorkflowOutputEvent> workflowOutputs = [.. completedRun.OutgoingEvents.OfType<WorkflowOutputEvent>()];
    workflowOutputs.Should().NotBeEmpty("workflow should produce output events");
}
/// <summary>
/// Verifies the streaming path: after StreamAsync is started with a message and a TurnToken is then sent,
/// the workflow executes and its events can be read from the stream.
/// </summary>
[Fact]
public async Task StreamAsyncWithTurnTokenShouldExecuteWorkflowAsync()
{
    // Arrange: one echo agent wrapped in a sequential workflow, plus a single user message.
    SimpleTestAgent echoAgent = new("test-agent");
    var sequentialWorkflow = AgentWorkflowBuilder.BuildSequential(echoAgent);
    ChatMessage userMessage = new(ChatRole.User, "Hello");

    // Act: start the streaming run with the message ...
    await using StreamingRun streamingRun = await InProcessExecution.StreamAsync(sequentialWorkflow, new List<ChatMessage> { userMessage });

    // ... then send the TurnToken, which is the step that actually triggers execution.
    bool turnTokenAccepted = await streamingRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
    turnTokenAccepted.Should().BeTrue("TurnToken should be accepted");

    // Drain the event stream into a list so it can be inspected below.
    var observedEvents = new List<WorkflowEvent>();
    await foreach (WorkflowEvent workflowEvent in streamingRun.WatchStreamAsync())
    {
        observedEvents.Add(workflowEvent);
    }

    // Assert: the run reports Idle and emitted something.
    RunStatus finalStatus = await streamingRun.GetStatusAsync();
    finalStatus.Should().Be(RunStatus.Idle, "workflow should complete execution");
    observedEvents.Should().NotBeEmpty("workflow should produce events during execution");

    // The agent's updates must be among the streamed events.
    List<AgentResponseUpdateEvent> agentUpdates = [.. observedEvents.OfType<AgentResponseUpdateEvent>()];
    agentUpdates.Should().NotBeEmpty("agent should have executed and produced update events");

    // The workflow-level output must be among them as well.
    List<WorkflowOutputEvent> workflowOutputs = [.. observedEvents.OfType<WorkflowOutputEvent>()];
    workflowOutputs.Should().NotBeEmpty("workflow should produce output events");
}
/// <summary>
/// Sends the same input through RunAsync and through StreamAsync (each on its own, identically built workflow)
/// and checks that both paths emit events, including the same non-zero number of agent update events.
/// Regression coverage for issue #1315, which reported RunAsync failing to execute the workflow.
/// </summary>
[Fact]
public async Task RunAsyncAndStreamAsyncShouldProduceSimilarResultsAsync()
{
    // Arrange: two separate agent/workflow pairs built the same way, so the two runs do not share an instance.
    var agent1 = new SimpleTestAgent("test-agent-1");
    var workflow1 = AgentWorkflowBuilder.BuildSequential(agent1);

    var agent2 = new SimpleTestAgent("test-agent-2");
    var workflow2 = AgentWorkflowBuilder.BuildSequential(agent2);

    var inputMessage = new ChatMessage(ChatRole.User, "Test message");

    // Act 1: Execute using RunAsync (non-streaming)
    // NOTE(review): this run is never disposed; if Run implements IAsyncDisposable like StreamingRun,
    // prefer 'await using' here — confirm against the Run type.
    Run nonStreamingRun = await InProcessExecution.RunAsync(workflow1, new List<ChatMessage> { inputMessage });
    var nonStreamingEvents = nonStreamingRun.OutgoingEvents.ToList();

    // Act 2: Execute using StreamAsync (streaming) with TurnToken
    await using StreamingRun streamingRun = await InProcessExecution.StreamAsync(workflow2, new List<ChatMessage> { inputMessage });

    // Fail fast if the TurnToken is rejected; otherwise a rejected token would only show up
    // indirectly as an empty event list further down.
    bool messageSent = await streamingRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
    messageSent.Should().BeTrue("TurnToken should be accepted");

    List<WorkflowEvent> streamingEvents = [];
    await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
    {
        streamingEvents.Add(evt);
    }

    // Assert: both paths must have produced events.
    streamingEvents.Should().NotBeEmpty("streaming version should produce events");

    // The non-streaming path is the one issue #1315 was about.
    nonStreamingEvents.Should().NotBeEmpty("non-streaming version should also produce events");

    // Both should have similar types of events
    var streamingAgentEvents = streamingEvents.OfType<AgentResponseUpdateEvent>().Count();
    var nonStreamingAgentEvents = nonStreamingEvents.OfType<AgentResponseUpdateEvent>().Count();

    // Guard against a vacuous 0 == 0 match in the comparison below.
    streamingAgentEvents.Should().BeGreaterThan(0, "agent should have executed and produced update events");

    nonStreamingAgentEvents.Should().Be(streamingAgentEvents,
        "both versions should produce the same number of agent events");
}
/// <summary>
/// Minimal <see cref="AIAgent"/> for these tests: it answers with "Echo: " followed by the text of the
/// last input message, or "Echo: no message" when there is no input message.
/// </summary>
private sealed class SimpleTestAgent(string name) : AIAgent
{
    /// <summary>Gets the name supplied at construction.</summary>
    public override string Name { get; } = name;

    /// <summary>Returns a new <see cref="SimpleTestAgentSession"/>.</summary>
    protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default)
    {
        return new ValueTask<AgentSession>(new SimpleTestAgentSession());
    }

    /// <summary>
    /// Returns a new <see cref="SimpleTestAgentSession"/>; <paramref name="serializedState"/> is not read.
    /// </summary>
    protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(
        System.Text.Json.JsonElement serializedState,
        System.Text.Json.JsonSerializerOptions? jsonSerializerOptions = null,
        CancellationToken cancellationToken = default)
    {
        return new ValueTask<AgentSession>(new SimpleTestAgentSession());
    }

    /// <summary>Returns a default <see cref="ValueTask{TResult}"/>; <paramref name="session"/> is not read.</summary>
    protected override ValueTask<System.Text.Json.JsonElement> SerializeSessionCoreAsync(
        AgentSession session,
        System.Text.Json.JsonSerializerOptions? jsonSerializerOptions = null,
        CancellationToken cancellationToken = default)
    {
        return default;
    }

    /// <summary>Produces a single assistant message containing the echo text.</summary>
    protected override Task<AgentResponse> RunCoreAsync(
        IEnumerable<ChatMessage> messages,
        AgentSession? session = null,
        AgentRunOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        ChatMessage reply = new(ChatRole.Assistant, BuildEchoText(messages));
        return Task.FromResult(new AgentResponse(reply));
    }

    /// <summary>
    /// Streams two assistant updates that share one message id: the first is constructed with the
    /// agent name as its second argument, the second with the echo text.
    /// </summary>
    protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
        IEnumerable<ChatMessage> messages,
        AgentSession? session = null,
        AgentRunOptions? options = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await Task.Yield();

        string echoText = BuildEchoText(messages);
        string sharedMessageId = Guid.NewGuid().ToString("N");

        // Both updates carry the same author and message id.
        AgentResponseUpdate CreateUpdate(string content) => new(ChatRole.Assistant, content)
        {
            AuthorName = this.Name,
            MessageId = sharedMessageId
        };

        yield return CreateUpdate(this.Name);
        yield return CreateUpdate(echoText);
    }

    /// <summary>Builds the echo text from the last message in <paramref name="messages"/>.</summary>
    private static string BuildEchoText(IEnumerable<ChatMessage> messages)
        => $"Echo: {messages.LastOrDefault()?.Text ?? "no message"}";
}
/// <summary>
/// Session type handed out by SimpleTestAgent; it declares no members beyond those of <see cref="AgentSession"/>.
/// </summary>
private sealed class SimpleTestAgentSession : AgentSession
{
}
}