agent-framework/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs
Jacob Alber 8ed2159c4b .NET: Workflow Outputs Overhaul: Support Tagging, Filtering Agent Outputs (#6045)
* test: reshuffle .NET Workflow tests in preparation for Outputs overhaul

Phase 1 of the .NET Workflows outputs overhaul (see
working/implementation-plan.md). Pure moves/renames in
dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests; no production code
changes, no new test cases. The split keeps each orchestration mode in
its own source file so the upcoming tag-aware and orchestration-default
test additions land on clean diffs.

Renames:
* WorkflowBuilderSmokeTests.cs -> WorkflowBuilderTests.cs (with class
  rename to match). The scope is no longer "smoke"-only once subsequent
  phases add tag-aware builder tests.
* InputWaiterAndOutputFilterTests.cs -> InputWaiterTests.cs +
  OutputFilterTests.cs. The file already declared the two test classes
  separately; this split simply gives each its own file so the
  output-filter cases have a dedicated home for tag-aware additions.

Split of AgentWorkflowBuilderTests.cs:
* AgentWorkflowBuilderTests.cs is now the outer
  `public static partial class AgentWorkflowBuilderTests` holding the
  shared test helpers (DoubleEchoAgent + session + WithBarrier variant,
  WorkflowRunResult, RunWorkflow* methods) bumped from `private` to
  `internal` so the new top-level GroupChatWorkflowBuilderTests in the
  same assembly can reach them.
* AgentWorkflowBuilder.SequentialTests.cs (nested SequentialTests):
  BuildSequential_InvalidArguments_Throws,
  BuildSequential_AgentsRunInOrderAsync.
* AgentWorkflowBuilder.ConcurrentTests.cs (nested ConcurrentTests):
  BuildConcurrent_InvalidArguments_Throws,
  BuildConcurrent_AgentsRunInParallelAsync.

Sequential and Concurrent are kept as nested classes because they're
modes of the same `AgentWorkflowBuilder` static factory and do not
produce dedicated builder types.

New file:
* GroupChatWorkflowBuilderTests.cs (top-level): the existing
  BuildGroupChat_* and GroupChatManager_* cases moved out of the old
  AgentWorkflowBuilderTests file. They exercise the
  `GroupChatWorkflowBuilder` type (returned by
  `AgentWorkflowBuilder.CreateGroupChatBuilderWith`), so a dedicated
  top-level test class - matching the convention reserved by the plan
  for HandoffWorkflowBuilderTests / MagenticWorkflowBuilderTests - is
  the right home. Cross-class helper references qualify with
  `AgentWorkflowBuilderTests.DoubleEchoAgent` and
  `AgentWorkflowBuilderTests.RunWorkflowAsync`.

The outer partial class is `static` (and nested classes carry the
instance test methods) because the outer holds only static helpers;
this satisfies CA1052 without suppressions and is invisible to xUnit
discovery, which finds tests on the nested classes as
`AgentWorkflowBuilderTests.SequentialTests.*` etc.

Validation: `dotnet build` clean on both target frameworks; all 547
tests in Microsoft.Agents.AI.Workflows.UnitTests pass on net10.0.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: introduce OutputTag, Futures, and tag-aware WorkflowBuilder API

Phase 2 of the .NET Workflows outputs overhaul. Additive code change
only - no observable runtime behavior change. The runner still uses the
legacy bypass for AgentResponse / AgentResponseUpdate payloads, and the
new `Futures.EnableAgentResponseOutputTaggingAndFiltering` flag defaults
to false. Phase 3 will wire the flag into the runner; this commit only
introduces the types and the builder API.

New public surface:
* `OutputTag` (readonly struct): wraps a string Value with ordinal
  equality (IEquatable, GetHashCode, == / !=) so it can participate as a
  HashSet element. Internal ctor closes the set. One public singleton:
  `OutputTag.Intermediate`. Terminal / regular outputs carry no tag
  (empty Tags set). JSON-serialized as a bare string via
  [JsonConverter(typeof(OutputTagJsonConverter))], with the converter
  rehydrating to the well-known singleton on read.
* `Futures` (static class): hosts opt-in pre-GA behavior switches.
  First flag is `EnableAgentResponseOutputTaggingAndFiltering`; XML doc
  captures the v2.0.0 obsoletion / v3.0.0 removal lifecycle.
* `WorkflowOutputEvent.Tags`: `HashSet<OutputTag>` exposed directly
  (concrete collection, matches the JSON-serialization convention used
  for `WorkflowInfo.OutputExecutorIds`). Never null; empty for legacy /
  terminal events. New ctors take a single `OutputTag` or
  `IEnumerable<OutputTag>?`; the existing (data, executorId) ctor
  remains and produces an untagged event. `HasTag(OutputTag)` helper.
  `AgentResponseEvent` and `AgentResponseUpdateEvent` gain matching
  tag-accepting ctors forwarding to the base.
* `WorkflowOutputEventExtensions.IsIntermediate(this WorkflowOutputEvent)`:
  extension method returning `evt.HasTag(OutputTag.Intermediate)`. The
  preferred way to ask "is this an intermediate output?" without
  reaching into the Tags set.
* `WorkflowBuilder.WithOutputFrom(IEnumerable<ExecutorBinding>, OutputTag)`
  and `WorkflowBuilder.WithOutputFrom(ExecutorBinding, OutputTag)`:
  forward-looking tagged overloads. The IEnumerable form is the primary
  tagged surface; the single-executor form is a convenience for the
  common one-executor case. Currently usable for the
  `OutputTag.Intermediate` singleton; will become the primary surface
  once the `OutputTag` constructor is opened to user-defined tags in
  a future release. Callers in this release should prefer the
  intent-specific `WithIntermediateOutputFrom` extension for the
  intermediate case. Tags accumulate across repeated calls; a repeated
  tag is deduplicated by the HashSet.
* `WorkflowBuilderExtensions.WithIntermediateOutputFrom(this WorkflowBuilder, IEnumerable<ExecutorBinding>)`:
  helper that forwards to `WithOutputFrom(executors, OutputTag.Intermediate)`.
  Takes an IEnumerable (matching the tagged WithOutputFrom shape) -
  callers pass collection literals: `builder.WithIntermediateOutputFrom([a, b])`.
  XML doc remarks call out the Futures-flag interaction and the
  AIAgent-payload forwarding contract.
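
The `OutputTag` description above (string value, ordinal equality, closed
constructor, one singleton) can be illustrated with a minimal sketch. The type
name `SketchTag` is hypothetical and the body is an assumption about the shape,
not the library's source; the `"intermediate"` string is taken from the JSON
map shape quoted in this message.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the OutputTag shape described above; not the library's code.
public readonly struct SketchTag : IEquatable<SketchTag>
{
    // Internal constructor: code outside the assembly cannot create new tags.
    internal SketchTag(string value) => this.Value = value;

    // Null only for default(SketchTag), which therefore never equals a real tag.
    public string? Value { get; }

    // The single well-known tag.
    public static SketchTag Intermediate { get; } = new("intermediate");

    // Ordinal comparison on the wrapped string.
    public bool Equals(SketchTag other) =>
        string.Equals(this.Value, other.Value, StringComparison.Ordinal);

    public override bool Equals(object? obj) => obj is SketchTag other && this.Equals(other);

    // Hash must agree with Equals so the struct behaves in a HashSet.
    public override int GetHashCode() =>
        this.Value is null ? 0 : StringComparer.Ordinal.GetHashCode(this.Value);

    public static bool operator ==(SketchTag left, SketchTag right) => left.Equals(right);

    public static bool operator !=(SketchTag left, SketchTag right) => !left.Equals(right);
}
```

Under this sketch, adding `SketchTag.Intermediate` to a `HashSet<SketchTag>`
twice keeps one element, and `default(SketchTag)` stays a distinct element.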

Internal shape changes:
* `WorkflowBuilder._outputExecutors`: HashSet<string> -> Dictionary<
  string, HashSet<OutputTag>>. The value set is empty for executors
  designated only via the untagged WithOutputFrom; contains Intermediate
  (and possibly future tags) otherwise.
* `Workflow.OutputExecutors`: HashSet<string> -> Dictionary<string,
  HashSet<OutputTag>>.
* `OutputFilter.CanOutput`: `Contains(id)` -> `ContainsKey(id)`.
* `WorkflowInfo.OutputExecutorIds`: HashSet<string> -> Dictionary<
  string, HashSet<OutputTag>>, with a custom JsonConverter that reads
  both the new map shape (`{id: ["intermediate", ...]}`) and the legacy
  array shape (`[id1, id2]`, where each id is treated as an untagged
  output). Always writes the map shape. IsMatch updated to compare
  per-id tag sets.
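
The dual-shape read described for `WorkflowInfo.OutputExecutorIds` can be
sketched with a `System.Text.Json` converter. This is an illustration under
assumptions: the converter name is hypothetical, and tags are plain strings
instead of `OutputTag` so the example stays self-contained.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical converter: reads the legacy array shape or the map shape, always writes the map shape.
public sealed class OutputIdsSketchConverter : JsonConverter<Dictionary<string, HashSet<string>>>
{
    public override Dictionary<string, HashSet<string>> Read(
        ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        var result = new Dictionary<string, HashSet<string>>(StringComparer.Ordinal);
        using JsonDocument doc = JsonDocument.ParseValue(ref reader);
        JsonElement root = doc.RootElement;

        if (root.ValueKind == JsonValueKind.Array)
        {
            // Legacy shape: [id1, id2]. Each id becomes an untagged output.
            foreach (JsonElement id in root.EnumerateArray())
            {
                result[id.GetString()!] = new HashSet<string>(StringComparer.Ordinal);
            }
        }
        else if (root.ValueKind == JsonValueKind.Object)
        {
            // Map shape: { id: ["intermediate", ...] }.
            foreach (JsonProperty entry in root.EnumerateObject())
            {
                var tags = new HashSet<string>(StringComparer.Ordinal);
                foreach (JsonElement tag in entry.Value.EnumerateArray())
                {
                    tags.Add(tag.GetString()!);
                }
                result[entry.Name] = tags;
            }
        }
        else
        {
            throw new JsonException("Expected a JSON array or object.");
        }

        return result;
    }

    public override void Write(
        Utf8JsonWriter writer, Dictionary<string, HashSet<string>> value, JsonSerializerOptions options)
    {
        writer.WriteStartObject();
        foreach (string id in value.Keys)
        {
            writer.WriteStartArray(id);
            foreach (string tag in value[id])
            {
                writer.WriteStringValue(tag);
            }
            writer.WriteEndArray();
        }
        writer.WriteEndObject();
    }
}
```

With the converter added to `JsonSerializerOptions.Converters`, deserializing
`["a","b"]` gives two ids with empty tag sets, and serializing that result
writes `{"a":[],"b":[]}`.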

Tests landing in this commit (per the test-with-feature principle):
* `OutputTagTests.cs` (6 tests): KnownValues, EqualityIsOrdinalOnValue,
  DefaultStructValueIsDistinct (default(OutputTag) does not collide
  with the Intermediate singleton in a HashSet),
  GetHashCodeMatchesEquals, JsonConverter_RoundtripsValueAsString,
  ConstructorIsInternal (reflection-based assertion that the (string)
  ctor is `internal`).
* `WorkflowBuilderTests.cs` adds 7 new tests pinning the builder
  API contract: RegistersWithEmptyTagSet, AddsIntermediateTag,
  MultipleExecutorsAllUntagged, ThenIntermediate_AccumulatesTags,
  RepeatedDedupes, OnlyRegistersWithoutPriorWithOutputFrom,
  TracksExecutorBinding.
* `BackwardsCompatibility/JsonCheckpointSerializationTests.cs`
  (new folder + file, 5 tests): event-level ctor contract tests
  (single-tag, no-tag, multi-tag — the last with a custom tag);
  IsIntermediate() asserted; load-bearing JSON BC tests for
  `WorkflowInfo.OutputExecutorIds` -
  `WorkflowOutputExecutorsReadsLegacyArrayShape` (legacy ids map to
  empty tag sets) and `WorkflowOutputExecutorsWritesMapShape`.

The plan's three JSON round-trip tests for `WorkflowOutputEvent.Tags`
were dropped: `WorkflowEvent` is not currently a serialized checkpoint
shape (see the comment in WorkflowsJsonUtilities.cs about events not
being persisted), so there is no real back-compat surface to pin
through JSON. They are substituted with in-process ctor/property
round-trip tests that exercise the `Tags` / `HasTag` / `IsIntermediate`
contract.

Validation: full `Microsoft.Agents.AI.Workflows.UnitTests` suite runs
green on net10.0 (565 passing, 0 failing). Core library builds clean
on net472, netstandard2.0, net8.0, net9.0, and net10.0. Test project
builds clean on net472 + net10.0.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: route AgentResponse(Update) through the output filter under a Futures flag

`InProcessRunnerContext.YieldOutputAsync` historically special-cased AgentResponse and
AgentResponseUpdate payloads: it built the typed event subclass and emitted it directly,
bypassing the output filter. This commit rewrites the method so that:

- When `Futures.EnableAgentResponseOutputTaggingAndFiltering` is `false` (the current
  default), AgentResponse(Update) keep the legacy bypass — emitted as
  AgentResponseEvent / AgentResponseUpdateEvent with no tags. Existing callers see no
  behavior change.
- When the flag is `true`, AIAgent payloads flow through the output filter just like
  every other payload type: undesignated sources are dropped, and the emitted event
  carries the source's tag set (empty for terminal `WithOutputFrom`, `{Intermediate}`
  for `WithIntermediateOutputFrom`, the set union when both designations apply).

Non-AIAgent (POCO) outputs also now carry the source's tag set on the emitted
WorkflowOutputEvent unconditionally — additive, since no existing assertion inspected
Tags. Subclass events (`AgentResponseEvent` / `AgentResponseUpdateEvent`) continue to
be emitted under both modes so `switch (evt) { case AgentResponseEvent: ... }`
consumer code keeps matching.
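
The two branches above can be condensed into a small decision function. This
is a sketch, not the code in `InProcessRunnerContext`: the class, method, and
parameter names are hypothetical, tags are plain strings, and the dictionary
stands in for the workflow's designated output executors.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical condensation of the emit-or-drop decision described above.
public static class YieldDecisionSketch
{
    public static (bool Emitted, IReadOnlyCollection<string> Tags) Decide(
        bool flagEnabled,
        bool isAgentPayload,
        string sourceId,
        Dictionary<string, HashSet<string>> outputExecutors)
    {
        // Flag off: agent payloads keep the legacy bypass and are emitted with no tags.
        if (isAgentPayload && !flagEnabled)
        {
            return (true, Array.Empty<string>());
        }

        // Every other case goes through the filter: a designated source emits with its tag set.
        if (outputExecutors.TryGetValue(sourceId, out HashSet<string>? tags))
        {
            return (true, tags);
        }

        // Undesignated sources are dropped.
        return (false, Array.Empty<string>());
    }
}
```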

Adds `OutputFilter.TryGetTags` as the tag-aware lookup used by the runner.
`OutputFilter.CanOutput` is kept (still used by the existing sync tests in
`OutputFilterTests.cs`).

Tests
-----
- `Futures/Futures.AgentResponseOutputFilteringAndTaggingTests.cs` (new): the F1–F13
  matrix from the plan, covering every combination of `(flag on/off) × (designation)
  × (payload shape)`. Uses a `FuturesScope` IDisposable + a `FuturesSerial` xUnit
  collection (DisableParallelization = true) to keep the process-global flag from
  leaking across parallel tests.
- `OutputFilterTests.cs`: four new `Test_OutputFilter_…` cases for the `TryGetTags`
  surface (empty-tag-set for terminal designation, `{Intermediate}` for intermediate
  designation, union for accumulated designation, `false` for unregistered).
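
A scope object of the kind `FuturesScope` is described as can be sketched as
follows. The names `SketchFutures` and `SketchFuturesScope` are hypothetical
stand-ins, and restoring the previous value on dispose is an assumption about
how such a scope would behave.

```csharp
using System;

// Hypothetical stand-in for a process-global opt-in switch.
public static class SketchFutures
{
    public static bool EnableTagging { get; set; }
}

// Sets the flag for the duration of a using block and restores the prior value on Dispose.
public sealed class SketchFuturesScope : IDisposable
{
    private readonly bool _previous;

    public SketchFuturesScope(bool enabled)
    {
        this._previous = SketchFutures.EnableTagging;
        SketchFutures.EnableTagging = enabled;
    }

    public void Dispose() => SketchFutures.EnableTagging = this._previous;
}
```

A test would wrap its body in `using (new SketchFuturesScope(true)) { ... }`.
Because the flag is process-global, the scope alone does not protect tests
running in parallel, which is why the tests above are also placed in a
collection with parallelization disabled.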

582/582 unit tests pass on net10.0 (565 baseline + 17 new).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: tag-aware defaults and designation API on orchestration builders

Aligns the .NET orchestration builders with Python's output / intermediate-output
distinction. Each builder either applies a Python-aligned default designation set or
replays the user's explicit `WithOutputFrom` / `WithIntermediateOutputFrom` calls,
never both.

Static `AgentWorkflowBuilder.BuildSequential` / `BuildConcurrent` apply defaults
unconditionally (no user-facing fluent surface to take control through):

- Sequential: terminal `end` + every agent designated intermediate.
- Concurrent: terminal `end` + every agent and per-agent accumulator designated
  intermediate.

The three fluent instance builders memoize agent-typed designation calls in a
`Dictionary<AIAgent, HashSet<OutputTag>>` (empty set = terminal-only, non-empty =
intermediate tag(s)) so repeated calls dedupe naturally. They replay the entries
at `Build()` time, suppressing defaults when any call has been made:

- `HandoffWorkflowBuilder` / `HandoffWorkflowBuilderCore<TBuilder>` (also picked up
  by the obsolete `HandoffsWorkflowBuilder` via inheritance).
  Default: terminal `HandoffEnd` + every handoff agent intermediate.
  (Bug fix: legacy code relied on `WithOutputFrom(end)` to bind `HandoffEnd`. The
  new explicit-designation path bypasses that, so `Build()` now calls
  `BindExecutor(end)` unconditionally to keep validation happy.)
- `GroupChatWorkflowBuilder` — default: terminal host + every participant intermediate.
- `MagenticWorkflowBuilder` — default: terminal orchestrator + every team member
  intermediate.

Designating a non-participant agent throws `InvalidOperationException`.
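
The memoize-then-replay rule can be sketched with a toy builder. Everything
here is hypothetical: agents are plain string names, the only tag is the
string `"intermediate"`, the terminal executor of the defaults is left out,
and throwing at designation time (instead of at `Build()`) is an assumption.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical builder: records designations, then replays them or falls back to defaults.
public sealed class DesignationSketchBuilder
{
    private readonly List<string> _participants;
    private readonly Dictionary<string, HashSet<string>> _designations = new(StringComparer.Ordinal);

    public DesignationSketchBuilder(IEnumerable<string> participants)
        => this._participants = new List<string>(participants);

    // Terminal designation: the agent is recorded with an empty tag set.
    public DesignationSketchBuilder WithOutputFrom(string agent)
    {
        this.GetOrAdd(agent);
        return this;
    }

    // Intermediate designation: the tag is added to the agent's set, so repeats dedupe.
    public DesignationSketchBuilder WithIntermediateOutputFrom(string agent)
    {
        this.GetOrAdd(agent).Add("intermediate");
        return this;
    }

    private HashSet<string> GetOrAdd(string agent)
    {
        // Assumption: the participant check happens when the designation is recorded.
        if (!this._participants.Contains(agent))
        {
            throw new InvalidOperationException($"'{agent}' is not a participant.");
        }

        if (!this._designations.TryGetValue(agent, out HashSet<string>? tags))
        {
            tags = new HashSet<string>(StringComparer.Ordinal);
            this._designations[agent] = tags;
        }

        return tags;
    }

    public Dictionary<string, HashSet<string>> Build()
    {
        var result = new Dictionary<string, HashSet<string>>(StringComparer.Ordinal);

        if (this._designations.Count > 0)
        {
            // Explicit designations replace the defaults entirely.
            foreach (string agent in this._designations.Keys)
            {
                result[agent] = new HashSet<string>(this._designations[agent], StringComparer.Ordinal);
            }

            return result;
        }

        // No designation call was made: every participant defaults to intermediate.
        foreach (string agent in this._participants)
        {
            result[agent] = new HashSet<string>(StringComparer.Ordinal) { "intermediate" };
        }

        return result;
    }
}
```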

The bare `WorkflowBuilder` default is unchanged — only the orchestration-style
builders gain implicit defaults, matching the plan's non-goal.

Tests
-----
- `AgentWorkflowBuilder.SequentialTests` / `.ConcurrentTests`: one default-spec
  assertion each.
- `GroupChatWorkflowBuilderTests`: defaults-match-spec, explicit-replaces-defaults,
  non-participant throws.
- `HandoffWorkflowBuilderTests` (new file): same three.
- `MagenticWorkflowBuilderTests` (new file): same three.

593/593 unit tests pass on net10.0 (582 baseline + 11 new).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: WorkflowHostAgent forwards AgentResponseEvent unconditionally under Futures-on

Aligns the .NET Workflow-as-Agent surface with Python `as_agent`. Under
`Futures.EnableAgentResponseOutputTaggingAndFiltering = true`,
`WorkflowSession.InvokeStageAsync` now forwards `AgentResponseEvent`
unconditionally — joining `AgentResponseUpdateEvent` in ignoring the host's
`includeWorkflowOutputsInResponse` switch. That switch keeps governing the
generic `WorkflowOutputEvent` path for non-AIAgent payloads, where it is
further short-circuited by an `IsIntermediate()` check (tagged intermediate
outputs always surface).

Under Futures-off the legacy asymmetry is preserved: `AgentResponseUpdateEvent`
always forwarded, `AgentResponseEvent` gated by `includeWorkflowOutputsInResponse`.

Back-compat: with `Futures.EnableAgentResponseOutputTaggingAndFiltering` left at
its default `false`, observable behavior is identical to before.

`Futures` documentation gains a remark explaining the `Workflow.AsAIAgent()`
interaction in both flag states.

Runner fix
----------
`InProcessRunnerContext.YieldOutputAsync` now skips `Executor.CanOutput` for
AgentResponse-shaped payloads under both Futures branches. `AIAgentHostExecutor`
doesn't declare AgentResponse(Update) in its `Yields` set, so the historical
legacy bypass had silently skipped the check; Phase 3's Futures-on path was
running it and would reject AIAgent payloads. AIAgent-shaped payloads are now
always a valid output shape, matching the legacy bypass semantics.

Phase 4 follow-on
-----------------
Switched the three orchestration-builder designation-replay loops to iterate
`Dictionary.Keys` with a value lookup instead of constructing/destructuring
`KeyValuePair<,>`. Cleaner shape and avoids the netstandard2.0 / net472
`KeyValuePair<,>.Deconstruct` unavailability that surfaced when this branch
multi-TFM-built.
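
The loop shape described above looks roughly like this. The helper below is a
hypothetical illustration with string ids and string tags, not the builders'
actual replay code.

```csharp
using System.Collections.Generic;

// Hypothetical replay helper showing the Keys-plus-lookup loop shape.
public static class ReplaySketch
{
    public static List<string> Replay(Dictionary<string, HashSet<string>> designations)
    {
        var lines = new List<string>();

        // `foreach (var (id, tags) in designations)` would need KeyValuePair<,>.Deconstruct,
        // which the message above reports as unavailable on netstandard2.0 / net472.
        foreach (string id in designations.Keys)
        {
            HashSet<string> tags = designations[id];
            lines.Add(tags.Count == 0 ? $"{id}: terminal" : $"{id}: {string.Join(",", tags)}");
        }

        return lines;
    }
}
```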

Tests
-----
`WorkflowHostSmokeTests.IntermediateForwarding` (new nested class, 6 tests):
- intermediate AgentResponse forwarded past the include-outputs gate (Futures on)
- terminal AgentResponse forwarded unconditionally (Futures on)
- terminal AgentResponse gated by include flag (Futures off, legacy)
- undesignated AIAgent executor emits no AgentResponseEvent under Futures-on
- legacy bypass still emits AgentResponseEvent under Futures-off
- intermediate tag is observable via `update.RawRepresentation`

The class joins the `FuturesSerial` xUnit collection so the process-global flag
is serialized against other Futures-toggling tests.

599/599 unit tests pass on net10.0 (593 baseline + 6 new).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: SequentialWorkflowBuilder and ConcurrentWorkflowBuilder, OrchestrationBuilderBase

Promotes the Sequential and Concurrent orchestration shapes to first-class fluent
builder classes, matching Handoff / GroupChat / Magentic. Users can call
`WithOutputFrom(agents)` / `WithIntermediateOutputFrom(agents)` to control which
agents are designated output / intermediate sources; when no designation call is
made, the Python-aligned defaults apply (terminal aggregator output + every agent
intermediate; Concurrent also tags per-agent accumulators).

`AgentWorkflowBuilder.BuildSequential(...)` and `BuildConcurrent(...)` are kept
and now delegate to the new builders; observable behavior unchanged. Five static
factories now mirror each other:

- `AgentWorkflowBuilder.CreateSequentialBuilderWith(params IEnumerable<AIAgent>)`
- `AgentWorkflowBuilder.CreateConcurrentBuilderWith(params IEnumerable<AIAgent>)`
- `AgentWorkflowBuilder.CreateHandoffBuilderWith(AIAgent)`        (already existed)
- `AgentWorkflowBuilder.CreateGroupChatBuilderWith(Func<...>)`    (already existed)
- `AgentWorkflowBuilder.CreateMagenticBuilderWith(AIAgent)`       (new)

OrchestrationBuilderBase
------------------------
New abstract `OrchestrationBuilderBase<TBuilder>` unifies the shared fluent
surface across all five orchestration builders: `WithName`, `WithDescription`,
`WithOutputFrom`, `WithIntermediateOutputFrom`, and the
`ApplyOutputDesignations(builder, agentMap, kind, applyDefaults)` helper that
either replays the user's designations or invokes the orchestration-specific
defaults.

Removes ~150 LOC of duplicated designation-management code from the four
non-Handoff builders, plus the equivalent from `HandoffWorkflowBuilderCore`.

Tests
-----
- New `SequentialWorkflowBuilderTests.cs` / `ConcurrentWorkflowBuilderTests.cs`
  (replace the old `AgentWorkflowBuilder.{Sequential,Concurrent}Tests.cs`
  nested-class files). Method names normalized to
  `Test_<BuilderType>_<Scenario>[Async]`.
- Shared helpers (`DoubleEchoAgent`, `DoubleEchoAgentWithBarrier`,
  `WorkflowRunResult`, `RunWorkflow*`) moved from the old
  `AgentWorkflowBuilderTests` partial class into a new
  `OrchestrationTestHelpers` static class in `OrchestrationTestHelpers.cs`.
  Downstream test files (Group Chat, Handoff, Sequential, Concurrent) updated
  to qualify with `OrchestrationTestHelpers.*`.
- A new `AgentWorkflowBuilderTests.cs` covers the static surface directly:
  `BuildSequential` / `BuildConcurrent` invariants and aggregator wiring, plus
  null-rejection + round-trip checks for every `Create*BuilderWith` factory.
- New AsAgent intermediate-suppression tests on a nested `AsAgentForwarding`
  class for each of Sequential and Concurrent: build with only the terminal
  agent designated via `WithOutputFrom`, run via `AsAIAgent(...)`, assert via
  `AgentResponseUpdate.AuthorName` that intermediate agents do not surface.
  Both join the `FuturesSerial` collection.
- New `Test_<Builder>_WithDescriptionPropagatesToWorkflow` smoke tests on
  Sequential and Concurrent (newly available via the base class).

625/625 unit tests pass on net10.0.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore: dotnet format

* fixup: encoding

* fixup: charset

* fixup: Updates for PR feedback

* fixup: format

* fixup: merge issue

* Fix intermediate filtering on .AsAgent()

* fix filter logic

* fix: Revert logic change and add comments

---------

Co-authored-by: Jacob Alber <jalber@lokitoth.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-28 21:26:31 +00:00

954 lines
47 KiB
C#

// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

public sealed class ExpectedException : Exception
{
    public ExpectedException(string message)
        : base(message)
    {
    }

    public ExpectedException() : base()
    {
    }

    public ExpectedException(string? message, Exception? innerException) : base(message, innerException)
    {
    }
}

/// <summary>
/// A simple agent that emits a FunctionCallContent or ToolApprovalRequestContent request.
/// Used to test that RequestInfoEvent handling preserves the original content type.
/// </summary>
internal sealed class RequestEmittingAgent : AIAgent
{
    private readonly AIContent _requestContent;
    private readonly bool _completeOnResponse;

    /// <summary>
    /// Creates a new <see cref="RequestEmittingAgent"/> that emits the given request content.
    /// </summary>
    /// <param name="requestContent">The content to emit on each turn.</param>
    /// <param name="completeOnResponse">
    /// When <see langword="true"/>, the agent emits a text completion instead of re-emitting
    /// the request when the incoming messages contain a <see cref="FunctionResultContent"/>
    /// or <see cref="ToolApprovalResponseContent"/>. This models realistic agent behaviour
    /// where the agent processes the tool result and produces a final answer.
    /// </param>
    public RequestEmittingAgent(AIContent requestContent, bool completeOnResponse = false)
    {
        this._requestContent = requestContent;
        this._completeOnResponse = completeOnResponse;
    }

    private sealed class Session : AgentSession
    {
        public Session() { }
    }

    protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => new(new Session());

    protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default)
        => new(new Session());

    protected override ValueTask<JsonElement> SerializeSessionCoreAsync(AgentSession session, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => default;

    protected override Task<AgentResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
        => this.RunStreamingAsync(messages, session, options, cancellationToken).ToAgentResponseAsync(cancellationToken);

    protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (this._completeOnResponse && messages.Any(m => m.Contents.Any(c =>
            c is FunctionResultContent || c is ToolApprovalResponseContent)))
        {
            yield return new AgentResponseUpdate(ChatRole.Assistant, [new TextContent("Request processed")]);
        }
        else
        {
            // Emit the request content
            yield return new AgentResponseUpdate(ChatRole.Assistant, [this._requestContent]);
        }
    }
}

internal sealed class KickoffOnStartExecutor : ChatProtocolExecutor
{
    private static readonly ChatProtocolExecutorOptions s_options = new()
    {
        AutoSendTurnToken = false,
    };

    private readonly string _downstreamExecutorId;
    private readonly string _kickoffInputText;
    private readonly string _kickoffMessageText;
    private readonly string _regularResumeText;
    private readonly string _regularProcessedText;

    public KickoffOnStartExecutor(
        string id,
        string downstreamExecutorId,
        string kickoffInputText,
        string kickoffMessageText,
        string regularResumeText,
        string regularProcessedText)
        : base(id, s_options)
    {
        this._downstreamExecutorId = downstreamExecutorId;
        this._kickoffInputText = kickoffInputText;
        this._kickoffMessageText = kickoffMessageText;
        this._regularResumeText = regularResumeText;
        this._regularProcessedText = regularProcessedText;
    }

    protected override async ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)
    {
        List<string> textContents =
        [
            .. messages
                .SelectMany(message => message.Contents.OfType<TextContent>())
                .Select(content => content.Text)
        ];

        if (textContents.Contains(this._kickoffInputText, StringComparer.Ordinal))
        {
            await context.SendMessageAsync(
                new List<ChatMessage> { new(ChatRole.User, this._kickoffMessageText) },
                this._downstreamExecutorId,
                cancellationToken).ConfigureAwait(false);
            await context.SendMessageAsync(
                new TurnToken(emitEvents),
                this._downstreamExecutorId,
                cancellationToken).ConfigureAwait(false);
        }

        if (textContents.Contains(this._regularResumeText, StringComparer.Ordinal))
        {
            AgentResponseUpdate update = new(ChatRole.Assistant, [new TextContent(this._regularProcessedText)])
            {
                CreatedAt = DateTimeOffset.UtcNow,
                MessageId = Guid.NewGuid().ToString("N"),
                ResponseId = Guid.NewGuid().ToString("N"),
                Role = ChatRole.Assistant,
            };
            await context.AddEventAsync(new AgentResponseUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false);
        }
    }
}

/// <summary>
/// A start executor that always emits a response update on every turn,
/// useful for verifying that a TurnToken was delivered by the session.
/// On the first turn (user messages present), it kicks off a downstream executor.
/// </summary>
internal sealed class TurnTrackingStartExecutor : ChatProtocolExecutor
{
    private static readonly ChatProtocolExecutorOptions s_options = new()
    {
        AutoSendTurnToken = false,
    };

    private readonly string _downstreamExecutorId;
    private readonly string _activatedMarker;
    private int _activationCount;

    /// <summary>Gets the number of times this executor has been activated (i.e., <see cref="TakeTurnAsync"/> called).</summary>
    public int ActivationCount => this._activationCount;

    public TurnTrackingStartExecutor(string id, string downstreamExecutorId, string activatedMarker)
        : base(id, s_options)
    {
        this._downstreamExecutorId = downstreamExecutorId;
        this._activatedMarker = activatedMarker;
    }

    protected override async ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)
    {
        Interlocked.Increment(ref this._activationCount);

        // On the first turn, forward user messages and a TurnToken to the downstream executor.
        if (messages.Any(m => m.Role == ChatRole.User))
        {
            await context.SendMessageAsync(
                messages,
                this._downstreamExecutorId,
                cancellationToken).ConfigureAwait(false);
            await context.SendMessageAsync(
                new TurnToken(emitEvents),
                this._downstreamExecutorId,
                cancellationToken).ConfigureAwait(false);
        }

        // Always emit a marker to prove this executor was activated.
        AgentResponseUpdate update = new(ChatRole.Assistant, [new TextContent(this._activatedMarker)])
        {
            CreatedAt = DateTimeOffset.UtcNow,
            MessageId = Guid.NewGuid().ToString("N"),
            ResponseId = Guid.NewGuid().ToString("N"),
            Role = ChatRole.Assistant,
        };
        await context.AddEventAsync(new AgentResponseUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false);
    }
}

public class NonChatProtocolExecutor() : Executor<string>(nameof(NonChatProtocolExecutor))
{
    public override ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        return default;
    }
}

public class WorkflowHostSmokeTests : AIAgentHostingExecutorTestsBase
{
    private sealed class AlwaysFailsAIAgent(bool failByThrowing) : AIAgent
    {
        private sealed class Session : AgentSession
        {
            public Session() { }

            public Session(AgentSessionStateBag stateBag) : base(stateBag) { }
        }

        protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        {
            return new(serializedState.Deserialize<Session>(jsonSerializerOptions)!);
        }

        protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default)
        {
            return new(new Session());
        }

        protected override ValueTask<JsonElement> SerializeSessionCoreAsync(AgentSession session, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
            => default;

        protected override async Task<AgentResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
        {
            return await this.RunStreamingAsync(messages, session, options, cancellationToken)
                .ToAgentResponseAsync(cancellationToken);
        }

        protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            const string ErrorMessage = "Simulated agent failure.";
            if (failByThrowing)
            {
                throw new ExpectedException(ErrorMessage);
            }

            yield return new AgentResponseUpdate(ChatRole.Assistant, [new ErrorContent(ErrorMessage)]);
        }
    }

    private static Workflow CreateWorkflow(bool failByThrowing)
    {
        ExecutorBinding agent = new AlwaysFailsAIAgent(failByThrowing).BindAsExecutor(emitEvents: true);
        return new WorkflowBuilder(agent).Build();
    }

    [Theory]
    [InlineData(true, true)]
    [InlineData(true, false)]
    [InlineData(false, true)]
    [InlineData(false, false)]
    public async Task Test_AsAgent_ErrorContentStreamedOutAsync(bool includeExceptionDetails, bool failByThrowing)
    {
        string expectedMessage = !failByThrowing || includeExceptionDetails
            ? "Simulated agent failure."
            : "An error occurred while executing the workflow.";

        // Arrange is done by the caller.
        Workflow workflow = CreateWorkflow(failByThrowing);

        // Act
        List<AgentResponseUpdate> updates = await workflow.AsAIAgent("WorkflowAgent", includeExceptionDetails: includeExceptionDetails)
            .RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello"))
            .ToListAsync();

        // Assert
        bool hadErrorContent = false;
        foreach (AgentResponseUpdate update in updates)
        {
            if (update.Contents.Any())
            {
                // We should expect a single update which contains the error content.
                update.Contents.Should().ContainSingle()
                    .Which.Should().BeOfType<ErrorContent>()
                    .Which.Message.Should().Be(expectedMessage);
                hadErrorContent = true;
            }
        }

        hadErrorContent.Should().BeTrue();
    }

    /// <summary>
    /// Tests that when a workflow emits a RequestInfoEvent with FunctionCallContent data,
    /// the AgentResponseUpdate preserves the original FunctionCallContent type.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_FunctionCallContentPreservedInRequestInfoAsync()
    {
        // Arrange
        const string CallId = "test-call-id";
        const string FunctionName = "testFunction";
        FunctionCallContent originalContent = new(CallId, FunctionName);
        RequestEmittingAgent requestAgent = new(originalContent);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();

        // Act
        List<AgentResponseUpdate> updates = await workflow.AsAIAgent("WorkflowAgent")
            .RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello"))
            .ToListAsync();

        // Assert
        AgentResponseUpdate? updateWithFunctionCall = updates.FirstOrDefault(u =>
            u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent));
        updateWithFunctionCall.Should().NotBeNull("a FunctionCallContent should be present in the response updates");
        FunctionCallContent retrievedContent = updateWithFunctionCall!.Contents
            .OfType<FunctionCallContent>()
            .Should().ContainSingle()
            .Which;
        retrievedContent.CallId.Should().NotBe(CallId);
        retrievedContent.CallId.Should().EndWith($":{CallId}");
        retrievedContent.Name.Should().Be(FunctionName);
    }

    /// <summary>
    /// Tests that when a workflow emits a RequestInfoEvent with ToolApprovalRequestContent data,
    /// the AgentResponseUpdate preserves the original ToolApprovalRequestContent type.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_ToolApprovalRequestContentPreservedInRequestInfoAsync()
    {
        // Arrange
        const string RequestId = "test-request-id";
        McpServerToolCallContent mcpCall = new("call-id", "testToolName", "http://localhost");
        ToolApprovalRequestContent originalContent = new(RequestId, mcpCall);
        RequestEmittingAgent requestAgent = new(originalContent);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();

        // Act
        List<AgentResponseUpdate> updates = await workflow.AsAIAgent("WorkflowAgent")
            .RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello"))
            .ToListAsync();

        // Assert
        AgentResponseUpdate? updateWithUserInput = updates.FirstOrDefault(u =>
            u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is ToolApprovalRequestContent));
        updateWithUserInput.Should().NotBeNull("a ToolApprovalRequestContent should be present in the response updates");
        ToolApprovalRequestContent retrievedContent = updateWithUserInput!.Contents
            .OfType<ToolApprovalRequestContent>()
            .Should().ContainSingle()
            .Which;
        retrievedContent.Should().NotBeNull();
        retrievedContent.RequestId.Should().NotBe(RequestId);
        retrievedContent.RequestId.Should().EndWith($":{RequestId}");
    }
/// <summary>
/// Tests the full roundtrip: workflow emits a request, external caller responds, workflow processes response.
/// </summary>
[Fact]
public async Task Test_AsAgent_FunctionCallRoundtrip_ResponseIsProcessedAsync()
{
// Arrange: Create an agent that emits a FunctionCallContent request
const string CallId = "roundtrip-call-id";
const string FunctionName = "testFunction";
FunctionCallContent requestContent = new(CallId, FunctionName);
RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true);
ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
Workflow workflow = new WorkflowBuilder(agentBinding).Build();
AIAgent agent = workflow.AsAIAgent("WorkflowAgent");
// Act 1: First call - should receive the FunctionCallContent request
AgentSession session = await agent.CreateSessionAsync();
List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
new ChatMessage(ChatRole.User, "Start"),
session).ToListAsync();
// Assert 1: We should have received a FunctionCallContent
AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u =>
u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent));
updateWithRequest.Should().NotBeNull("a FunctionCallContent should be present in the response updates");
FunctionCallContent receivedRequest = updateWithRequest!.Contents
.OfType<FunctionCallContent>()
.First();
receivedRequest.CallId.Should().EndWith($":{CallId}");
// Act 2: Send the response back
FunctionResultContent responseContent = new(receivedRequest.CallId, "test result");
ChatMessage responseMessage = new(ChatRole.Tool, [responseContent]);
// Run the workflow with the response message and capture the resulting updates
List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(responseMessage, session).ToListAsync();
// Assert 2: The response should be processed and the original request should no longer be pending.
// Concretely, the workflow should not re-emit a FunctionCallContent with the same CallId.
secondCallUpdates.Should().NotBeNull("processing the response should produce updates");
secondCallUpdates.Should().NotBeEmpty("processing the response should progress the workflow");
secondCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Should()
.NotContain(c => c.CallId == receivedRequest.CallId, "the external FunctionCallContent request should be cleared after processing the response");
}
/// <summary>
/// Tests the full roundtrip for ToolApprovalRequestContent: workflow emits request, external caller responds.
/// Verifies inbound ToolApprovalResponseContent conversion.
/// </summary>
[Fact]
public async Task Test_AsAgent_ToolApprovalRoundtrip_ResponseIsProcessedAsync()
{
// Arrange: Create an agent that emits a ToolApprovalRequestContent request
const string RequestId = "roundtrip-request-id";
McpServerToolCallContent mcpCall = new("mcp-call-id", "testMcpTool", "http://localhost");
ToolApprovalRequestContent requestContent = new(RequestId, mcpCall);
RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true);
ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true });
Workflow workflow = new WorkflowBuilder(agentBinding).Build();
AIAgent agent = workflow.AsAIAgent("WorkflowAgent");
// Act 1: First call - should receive the ToolApprovalRequestContent request
AgentSession session = await agent.CreateSessionAsync();
List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
new ChatMessage(ChatRole.User, "Start"),
session).ToListAsync();
// Assert 1: We should have received a ToolApprovalRequestContent
AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u =>
u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is ToolApprovalRequestContent));
updateWithRequest.Should().NotBeNull("a ToolApprovalRequestContent should be present in the response updates");
ToolApprovalRequestContent receivedRequest = updateWithRequest!.Contents
.OfType<ToolApprovalRequestContent>()
.First();
receivedRequest.RequestId.Should().EndWith($":{RequestId}");
// Act 2: Send the response back - use CreateResponse to get the right response type
ToolApprovalResponseContent responseContent = receivedRequest.CreateResponse(approved: true);
ChatMessage responseMessage = new(ChatRole.User, [responseContent]);
// Run the workflow again with the response message and capture the updates
List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(responseMessage, session).ToListAsync();
// Assert 2: The response should be applied so that the original request is no longer pending
secondCallUpdates.Should().NotBeEmpty("handling the user input response should produce follow-up updates");
bool requestStillPresent = secondCallUpdates.Any(u =>
u.RawRepresentation is RequestInfoEvent
&& u.Contents.OfType<ToolApprovalRequestContent>().Any(r => r.RequestId == receivedRequest.RequestId));
requestStillPresent.Should().BeFalse("the original ToolApprovalRequestContent should not be re-emitted after its response is processed");
}
/// <summary>
/// Tests the mixed-message scenario: resume contains both an external response
/// (FunctionResultContent matching a pending request) and regular non-response content
/// in the same message.
/// Verifies that regular content is still processed and that no duplicate
/// pending-request errors, redundant FunctionCallContent re-emissions,
/// or workflow errors occur.
/// </summary>
[Fact]
public async Task Test_AsAgent_MixedResponseAndRegularMessage_BothProcessedAsync()
{
// Arrange: Create an agent that emits a FunctionCallContent request
const string CallId = "mixed-call-id";
const string FunctionName = "mixedTestFunction";
FunctionCallContent requestContent = new(CallId, FunctionName);
RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true);
ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
Workflow workflow = new WorkflowBuilder(agentBinding).Build();
AIAgent agent = workflow.AsAIAgent("WorkflowAgent");
// Act 1: First call - should receive the FunctionCallContent request
AgentSession session = await agent.CreateSessionAsync();
List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
new ChatMessage(ChatRole.User, "Start"),
session).ToListAsync();
// Assert 1: We should have received a FunctionCallContent. Assert before extracting so that a
// missing request fails with the descriptive message rather than an exception from First().
firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent),
"the first call should emit a FunctionCallContent request");
AgentResponseUpdate requestUpdate = firstCallUpdates.First(u =>
u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent));
FunctionCallContent emittedRequest = requestUpdate.Contents.OfType<FunctionCallContent>().Single();
// Act 2: Send a mixed message containing both the function result AND regular non-response content
FunctionResultContent responseContent = new(emittedRequest.CallId, "tool output");
ChatMessage mixedMessage = new(ChatRole.Tool, [responseContent, new TextContent("additional context")]);
List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(mixedMessage, session).ToListAsync();
// Assert 2: The workflow should have processed both parts without errors
secondCallUpdates.Should().NotBeEmpty("the mixed message should produce follow-up updates");
secondCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Should()
.NotContain(c => c.CallId == emittedRequest.CallId, "the external FunctionCallContent should be cleared after the response is processed");
secondCallUpdates
.SelectMany(u => u.Contents.OfType<ErrorContent>())
.Should()
.BeEmpty("no workflow errors should occur when processing a mixed response-and-regular message");
}
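/// <summary>
/// Tests the split-message variant of the mixed scenario: the resume carries the external response
/// (FunctionResultContent) and the regular content in two separate messages.
/// Verifies that the handled request is not re-emitted and that no workflow errors occur.
/// </summary>
[Fact]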
public async Task Test_AsAgent_ResponseThenRegularAcrossMessages_NoDuplicateFunctionCallAsync()
{
const string CallId = "mixed-separate-call-id";
const string FunctionName = "mixedSeparateTestFunction";
RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true);
ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
Workflow workflow = new WorkflowBuilder(agentBinding).Build();
AIAgent agent = workflow.AsAIAgent("WorkflowAgent");
AgentSession session = await agent.CreateSessionAsync();
List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync();
FunctionCallContent emittedRequest = firstCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Single();
ChatMessage[] resumeMessages =
[
new(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]),
new(ChatRole.Tool, [new TextContent("extra context in separate message")])
];
List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(resumeMessages, session).ToListAsync();
secondCallUpdates.Should().NotBeEmpty();
secondCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Should()
.NotContain(c => c.CallId == emittedRequest.CallId, "response+regular content split across messages should not re-emit the handled external request");
secondCallUpdates
.SelectMany(u => u.Contents.OfType<ErrorContent>())
.Should()
.BeEmpty();
}
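/// <summary>
/// Tests that resuming with a matching FunctionResultContent, for an agent created with
/// <c>completeOnResponse: false</c>, surfaces the FunctionCallContent request exactly once in the
/// follow-up run, i.e. the matching response does not trigger an extra TurnToken-driven turn.
/// </summary>
[Fact]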
public async Task Test_AsAgent_MatchingResponse_DoesNotCauseExtraTurnAsync()
{
const string CallId = "matching-response-call-id";
const string FunctionName = "matchingResponseFunction";
RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: false);
ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
Workflow workflow = new WorkflowBuilder(agentBinding).Build();
AIAgent agent = workflow.AsAIAgent("WorkflowAgent");
AgentSession session = await agent.CreateSessionAsync();
List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync();
FunctionCallContent emittedRequest = firstCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Single();
List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(
new ChatMessage(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]),
session).ToListAsync();
int functionCallCount = secondCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Count(c => c.CallId == emittedRequest.CallId);
functionCallCount.Should().Be(1, "a matching external response should not trigger an extra TurnToken-driven turn");
}
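/// <summary>
/// Tests the mixed scenario across executors: the pending request is owned by a downstream executor,
/// and the resume carries both the matching FunctionResultContent and a regular user message.
/// Verifies that the start executor is reawakened to process the regular message, that the response
/// still reaches the request owner, and that no request re-emission or workflow errors occur.
/// </summary>
[Fact]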
public async Task Test_AsAgent_MixedResponseAndRegularMessage_CrossExecutorStartExecutorIsReawakenedAsync()
{
const string StartExecutorId = "start-executor";
const string KickoffInputText = "Start";
const string KickoffMessageText = "kickoff downstream";
const string ResumeRegularText = "resume regular";
const string ResumeProcessedText = "regular message processed";
const string CallId = "cross-executor-call-id";
const string FunctionName = "crossExecutorFunction";
RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true);
ExecutorBinding requestBinding = requestAgent.BindAsExecutor(
new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
KickoffOnStartExecutor startExecutor = new(
StartExecutorId,
requestBinding.Id,
KickoffInputText,
KickoffMessageText,
ResumeRegularText,
ResumeProcessedText);
ExecutorBinding startBinding = startExecutor.BindExecutor();
Workflow workflow = new WorkflowBuilder(startBinding)
.AddEdge<List<ChatMessage>>(startBinding, requestBinding, messages =>
messages?.Any(message => message.Contents.OfType<TextContent>().Any(content => content.Text == KickoffMessageText)) == true)
.AddEdge<TurnToken>(startBinding, requestBinding, _ => true)
.Build();
AIAgent agent = workflow.AsAIAgent("WorkflowAgent");
AgentSession session = await agent.CreateSessionAsync();
List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
new ChatMessage(ChatRole.User, KickoffInputText),
session).ToListAsync();
FunctionCallContent emittedRequest = firstCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Single();
ChatMessage[] resumeMessages =
[
new(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]),
new(ChatRole.User, ResumeRegularText)
];
List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(resumeMessages, session).ToListAsync();
List<string> textContents = [.. secondCallUpdates.SelectMany(update => update.Contents.OfType<TextContent>()).Select(content => content.Text)];
textContents.Should().Contain(ResumeProcessedText, "the start executor should receive an explicit TurnToken when the matched response wakes a different executor");
textContents.Should().Contain("Request processed", "the matched external response should still be delivered to the downstream request owner");
secondCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Should()
.NotContain(c => c.CallId == emittedRequest.CallId, "the handled external request should not be re-emitted while waking the start executor");
secondCallUpdates.SelectMany(u => u.Contents.OfType<ErrorContent>()).Should().BeEmpty();
}
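/// <summary>
/// Tests that a FunctionResultContent whose CallId matches no pending request is treated as
/// regular input: the workflow still takes a turn and no workflow errors occur.
/// </summary>
[Fact]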
public async Task Test_AsAgent_UnmatchedResponse_TriggersTurnAndKeepsProgressingAsync()
{
const string CallId = "unmatched-response-call-id";
const string FunctionName = "unmatchedResponseFunction";
RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: false);
ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
Workflow workflow = new WorkflowBuilder(agentBinding).Build();
AIAgent agent = workflow.AsAIAgent("WorkflowAgent");
AgentSession session = await agent.CreateSessionAsync();
List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync();
firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent));
List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(
new ChatMessage(ChatRole.Tool, [new FunctionResultContent("different-call-id", "tool output")]),
session).ToListAsync();
int functionCallCount = secondCallUpdates
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Count(c => c.CallId == CallId);
functionCallCount.Should().Be(1, "an unmatched response should be treated as regular input and still drive a TurnToken continuation without workflow errors");
secondCallUpdates.SelectMany(u => u.Contents.OfType<ErrorContent>()).Should().BeEmpty();
}
/// <summary>
/// Tests that when a resume contains only an external response directed at a non-start executor
/// (no regular messages), the start executor still receives a TurnToken and is activated.
/// This is a regression test for the case where the TurnToken was previously skipped because
/// <c>HasRegularMessages</c> was <see langword="false"/>, leaving the start executor dormant.
/// </summary>
[Fact]
public async Task Test_AsAgent_ResponseOnlyToNonStartExecutor_StartExecutorIsStillActivatedAsync()
{
// Arrange
const string StartExecutorId = "start-executor";
const string ActivatedMarker = "start-executor-activated";
const string CallId = "response-only-call-id";
const string FunctionName = "responseOnlyFunction";
RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true);
ExecutorBinding requestBinding = requestAgent.BindAsExecutor(
new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
TurnTrackingStartExecutor startExecutor = new(StartExecutorId, requestBinding.Id, ActivatedMarker);
ExecutorBinding startBinding = startExecutor.BindExecutor();
Workflow workflow = new WorkflowBuilder(startBinding)
.AddEdge<List<ChatMessage>>(startBinding, requestBinding, messages =>
messages?.Any(m => m.Contents.OfType<TextContent>().Any()) == true)
.AddEdge<TurnToken>(startBinding, requestBinding, _ => true)
.Build();
AIAgent agent = workflow.AsAIAgent("WorkflowAgent");
// Act 1: First call triggers the downstream FunctionCallContent request
AgentSession session = await agent.CreateSessionAsync();
List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
new ChatMessage(ChatRole.User, "Start"),
session).ToListAsync();
FunctionCallContent emittedRequest = firstCallUpdates
.Where(u => u.RawRepresentation is RequestInfoEvent)
.SelectMany(u => u.Contents.OfType<FunctionCallContent>())
.Single();
// Act 2: Resume with ONLY the external response (no regular messages)
List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(
new ChatMessage(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]),
session).ToListAsync();
// Assert: Both the downstream executor and the start executor should have been activated
List<string> textContents = [.. secondCallUpdates
.SelectMany(u => u.Contents.OfType<TextContent>())
.Select(c => c.Text)];
textContents.Should().Contain("Request processed",
"the downstream executor should process the external response");
textContents.Should().Contain(ActivatedMarker,
"the start executor should receive a TurnToken and be activated even when resume contains only an external response");
secondCallUpdates
.SelectMany(u => u.Contents.OfType<ErrorContent>())
.Should()
.BeEmpty();
}
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task Test_AsAgent_FailsWhenNotChatProtocolAsync(bool runAsync)
{
// Arrange
NonChatProtocolExecutor executor = new();
executor.DescribeProtocol().IsChatProtocol().Should().BeFalse();
Workflow workflow = new WorkflowBuilder(executor).Build();
AIAgent workflowAsAgent = workflow.AsAIAgent();
Func<Task> action = runAsync
? () => workflowAsAgent.RunStreamingAsync().ToAgentResponseAsync()
: () => workflowAsAgent.RunAsync();
await action.Should().ThrowAsync<InvalidOperationException>();
}
private async Task Run_AsAgent_OutgoingMessagesInHistoryAsync(Workflow workflow, bool runAsync)
{
// Arrange
AIAgent workflowAgent = workflow.AsAIAgent();
// Act
AgentSession session = await workflowAgent.CreateSessionAsync();
AgentResponse response;
if (runAsync)
{
List<AgentResponseUpdate> updates = [];
await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(session))
{
// Skip WorkflowEvent updates, which are not persisted in ChatHistory. They cannot be
// filtered out afterwards because of a bad interaction with .ToAgentResponse(): the
// empty initial message is created without a MessageId. The message merger handles
// this case correctly internally.
if (!string.IsNullOrEmpty(update.Text))
{
updates.Add(update);
}
}
response = updates.ToAgentResponse();
}
else
{
response = await workflowAgent.RunAsync(session);
}
// Assert
WorkflowSession workflowSession = session.Should().BeOfType<WorkflowSession>().Subject;
ChatMessage[] responseMessages = response.Messages.Where(message => message.Contents.Any())
.ToArray();
ChatMessage[] sessionMessages = workflowSession.ChatHistoryProvider.GetAllMessages(workflowSession)
.ToArray();
// Since no incoming message was sent, the session should contain nothing except the response.
responseMessages.Should().BeEquivalentTo(sessionMessages, options => options.WithStrictOrdering());
}
[Theory]
[InlineData(false)]
[InlineData(true)]
public Task Test_SingleAgent_AsAgent_OutgoingMessagesInHistoryAsync(bool runAsync)
{
// Arrange
TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName);
Workflow singleAgentWorkflow = new WorkflowBuilder(agent).Build();
return this.Run_AsAgent_OutgoingMessagesInHistoryAsync(singleAgentWorkflow, runAsync);
}
[Theory]
[InlineData(false)]
[InlineData(true)]
public Task Test_Handoffs_AsAgent_OutgoingMessagesInHistoryAsync(bool runAsync)
{
// Arrange
TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName);
Workflow handoffWorkflow = new HandoffWorkflowBuilder(agent).Build();
return this.Run_AsAgent_OutgoingMessagesInHistoryAsync(handoffWorkflow, runAsync);
}
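// ----- Phase 5: Workflow-as-Agent intermediate forwarding -----------------
// AgentResponseEvent forwarding through AsAIAgent, as exercised by the tests below:
//   Futures on,  designated (intermediate or terminal): forwarded with includeWorkflowOutputsInResponse: false.
//   Futures on,  undesignated: not forwarded, even with includeWorkflowOutputsInResponse: true.
//   Futures off, terminal designation: forwarded only with includeWorkflowOutputsInResponse: true.
//   Futures off, undesignated: forwarded with includeWorkflowOutputsInResponse: true (legacy bypass).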
[Collection(Futures.FuturesSerialCollection.Name)]
public class IntermediateForwarding
{
private const string InterText = "progress";
private const string FinalText = "final";
private static async Task<List<AgentResponseUpdate>> RunStreamingAsync(
Workflow workflow,
bool includeWorkflowOutputsInResponse = false)
{
return await workflow
.AsAIAgent("WorkflowAgent", includeWorkflowOutputsInResponse: includeWorkflowOutputsInResponse)
.RunStreamingAsync(new ChatMessage(ChatRole.User, "hi"))
.ToListAsync();
}
[Fact]
public async Task Test_WorkflowHostAgent_IntermediateAgentResponseForwardedInStreamingAsync()
{
using Futures.FuturesScope _ = new(enabled: true);
TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText));
ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
Workflow workflow = new WorkflowBuilder(binding)
.WithIntermediateOutputFrom([binding])
.Build();
// Under Futures-on, AgentResponseEvent mirrors AgentResponseUpdateEvent: always
// forwarded regardless of the include flag. The intermediate tag is observable on
// the surfaced event for consumers that care to distinguish.
List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: false);
updates.Any(u => u.RawRepresentation is AgentResponseEvent are && are.IsIntermediate() && u.Text == InterText)
.Should().BeTrue("AgentResponseEvent is forwarded under Futures-on regardless of the include flag");
}
[Fact]
public async Task Test_WorkflowHostAgent_TerminalAgentResponseForwardedUnconditionallyWhenFuturesOnAsync()
{
using Futures.FuturesScope _ = new(enabled: true);
TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(FinalText));
ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
Workflow workflow = new WorkflowBuilder(binding)
.WithOutputFrom(binding)
.Build();
// Even a terminal-only designation surfaces without the include flag — the gating
// asymmetry between AgentResponse and AgentResponseUpdate is gone under Futures-on.
List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: false);
updates.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText)
.Should().BeTrue("terminal AgentResponseEvent is forwarded under Futures-on regardless of the include flag");
}
[Fact]
public async Task Test_WorkflowHostAgent_TerminalAgentResponseGatedWhenFuturesOffAsync()
{
using Futures.FuturesScope _ = new(enabled: false);
static Workflow Build()
{
TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(FinalText));
ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
return new WorkflowBuilder(binding).WithOutputFrom(binding).Build();
}
// Legacy semantics: AgentResponseEvent stays behind the include flag when Futures
// is off. Two fresh workflows because in-process runs aren't reentrant.
List<AgentResponseUpdate> gated = await RunStreamingAsync(Build(), includeWorkflowOutputsInResponse: false);
gated.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText)
.Should().BeFalse("terminal AgentResponseEvent stays gated under Futures-off");
List<AgentResponseUpdate> included = await RunStreamingAsync(Build(), includeWorkflowOutputsInResponse: true);
included.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText)
.Should().BeTrue("opting in via includeWorkflowOutputsInResponse surfaces it");
}
[Fact]
public async Task Test_WorkflowHostAgent_UndesignatedExecutorEmitsNoAgentResponseEventWhenFuturesOnAsync()
{
using Futures.FuturesScope _ = new(enabled: true);
TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText));
ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
// No designation — under Futures-on, the AgentResponse is dropped by the filter.
Workflow workflow = new WorkflowBuilder(binding).Build();
List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: true);
updates.Any(u => u.RawRepresentation is AgentResponseEvent)
.Should().BeFalse("an undesignated AIAgent executor produces no AgentResponseEvent under Futures-on");
}
[Fact]
public async Task Test_WorkflowHostAgent_UndesignatedAgentResponseSurfacesWhenFuturesOffAsync()
{
using Futures.FuturesScope _ = new(enabled: false);
TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText));
ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
Workflow workflow = new WorkflowBuilder(binding).Build();
List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: true);
updates.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == InterText)
.Should().BeTrue("legacy bypass still emits AgentResponseEvent regardless of designation");
}
[Fact]
public async Task Test_WorkflowHostAgent_IntermediateTagAvailableViaRawRepresentationAsync()
{
using Futures.FuturesScope _ = new(enabled: true);
TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText));
ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
Workflow workflow = new WorkflowBuilder(binding)
.WithIntermediateOutputFrom([binding])
.Build();
List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow);
AgentResponseUpdate progress = updates.First(u => u.RawRepresentation is AgentResponseEvent && u.Text == InterText);
AgentResponseEvent raw = (AgentResponseEvent)progress.RawRepresentation!;
raw.IsIntermediate().Should().BeTrue();
raw.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate });
}
}
}