mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
8ed2159c4b
* test: reshuffle .NET Workflow tests in preparation for Outputs overhaul

  Phase 1 of the .NET Workflows outputs overhaul (see working/implementation-plan.md). Pure moves/renames in dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests; no production code changes, no new test cases. The split keeps each orchestration mode in its own source file so the upcoming tag-aware and orchestration-default test additions land on clean diffs.

  Renames:
  * WorkflowBuilderSmokeTests.cs -> WorkflowBuilderTests.cs (with class rename to match). The scope is no longer "smoke"-only once subsequent phases add tag-aware builder tests.
  * InputWaiterAndOutputFilterTests.cs -> InputWaiterTests.cs + OutputFilterTests.cs. The file already declared the two test classes separately; this split simply gives each its own file so the output-filter cases have a dedicated home for tag-aware additions.

  Split of AgentWorkflowBuilderTests.cs:
  * AgentWorkflowBuilderTests.cs is now the outer `public static partial class AgentWorkflowBuilderTests` holding the shared test helpers (DoubleEchoAgent + session + WithBarrier variant, WorkflowRunResult, RunWorkflow* methods) bumped from `private` to `internal` so the new top-level GroupChatWorkflowBuilderTests in the same assembly can reach them.
  * AgentWorkflowBuilder.SequentialTests.cs (nested SequentialTests): BuildSequential_InvalidArguments_Throws, BuildSequential_AgentsRunInOrderAsync.
  * AgentWorkflowBuilder.ConcurrentTests.cs (nested ConcurrentTests): BuildConcurrent_InvalidArguments_Throws, BuildConcurrent_AgentsRunInParallelAsync.

  Sequential and Concurrent are kept as nested classes because they're modes of the same `AgentWorkflowBuilder` static factory and do not produce dedicated builder types.

  New file:
  * GroupChatWorkflowBuilderTests.cs (top-level): the existing BuildGroupChat_* and GroupChatManager_* cases moved out of the old AgentWorkflowBuilderTests file.
    They exercise the `GroupChatWorkflowBuilder` type (returned by `AgentWorkflowBuilder.CreateGroupChatBuilderWith`), so a dedicated top-level test class - matching the convention reserved by the plan for HandoffWorkflowBuilderTests / MagenticWorkflowBuilderTests - is the right home. Cross-class helper references qualify with `AgentWorkflowBuilderTests.DoubleEchoAgent` and `AgentWorkflowBuilderTests.RunWorkflowAsync`.

  The outer partial class is `static` (and nested classes carry the instance test methods) because the outer holds only static helpers; this satisfies CA1052 without suppressions and is invisible to xUnit discovery, which finds tests on the nested classes as `AgentWorkflowBuilderTests.SequentialTests.*` etc.

  Validation: `dotnet build` clean on both target frameworks; all 547 tests in Microsoft.Agents.AI.Workflows.UnitTests pass on net10.0.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: introduce OutputTag, Futures, and tag-aware WorkflowBuilder API

  Phase 2 of the .NET Workflows outputs overhaul. Additive code change only - no observable runtime behavior change. The runner still uses the legacy bypass for AgentResponse / AgentResponseUpdate payloads, and the new `Futures.EnableAgentResponseOutputTaggingAndFiltering` flag defaults to false. Phase 3 will wire the flag into the runner; this commit only introduces the types and the builder API.

  New public surface:
  * `OutputTag` (readonly struct): wraps a string Value with ordinal equality (IEquatable, GetHashCode, == / !=) so it can participate as a HashSet element. Internal ctor closes the set. One public singleton: `OutputTag.Intermediate`. Terminal / regular outputs carry no tag (empty Tags set). JSON-serialized as a bare string via [JsonConverter(typeof(OutputTagJsonConverter))], with the converter rehydrating to the well-known singleton on read.
  * `Futures` (static class): hosts opt-in pre-GA behavior switches.
    First flag is `EnableAgentResponseOutputTaggingAndFiltering`; XML doc captures the v2.0.0 obsoletion / v3.0.0 removal lifecycle.
  * `WorkflowOutputEvent.Tags`: `HashSet<OutputTag>` exposed directly (concrete collection, matches the JSON-serialization convention used for `WorkflowInfo.OutputExecutorIds`). Never null; empty for legacy / terminal events. New ctors take a single `OutputTag` or `IEnumerable<OutputTag>?`; the existing (data, executorId) ctor remains and produces an untagged event. `HasTag(OutputTag)` helper. `AgentResponseEvent` and `AgentResponseUpdateEvent` gain matching tag-accepting ctors forwarding to the base.
  * `WorkflowOutputEventExtensions.IsIntermediate(this WorkflowOutputEvent)`: extension method returning `evt.HasTag(OutputTag.Intermediate)`. The preferred way to ask "is this an intermediate output?" without reaching into the Tags set.
  * `WorkflowBuilder.WithOutputFrom(IEnumerable<ExecutorBinding>, OutputTag)` and `WorkflowBuilder.WithOutputFrom(ExecutorBinding, OutputTag)`: forward-looking tagged overloads. The IEnumerable form is the primary tagged surface; the single-executor form is a convenience for the common one-executor case. Currently usable for the `OutputTag.Intermediate` singleton; will become the primary surface once the `OutputTag` constructor is opened to user-defined tags in a future release. Callers in this release should prefer the intent-specific `WithIntermediateOutputFrom` extension for the intermediate case. Tags accumulate across repeated calls; same tag repeated dedupes via the HashSet.
  * `WorkflowBuilderExtensions.WithIntermediateOutputFrom(this WorkflowBuilder, IEnumerable<ExecutorBinding>)`: helper that forwards to `WithOutputFrom(executors, OutputTag.Intermediate)`. Takes an IEnumerable (matching the tagged WithOutputFrom shape) - callers pass collection literals: `builder.WithIntermediateOutputFrom([a, b])`. XML doc remarks call out the Futures-flag interaction and the AIAgent-payload forwarding contract.

  Internal shape changes:
  * `WorkflowBuilder._outputExecutors`: HashSet<string> -> Dictionary<string, HashSet<OutputTag>>. The value set is empty for executors designated only via the untagged WithOutputFrom; contains Intermediate (and possibly future tags) otherwise.
  * `Workflow.OutputExecutors`: HashSet<string> -> Dictionary<string, HashSet<OutputTag>>.
  * `OutputFilter.CanOutput`: `Contains(id)` -> `ContainsKey(id)`.
  * `WorkflowInfo.OutputExecutorIds`: HashSet<string> -> Dictionary<string, HashSet<OutputTag>>, with a custom JsonConverter that reads both the new map shape (`{id: ["intermediate", ...]}`) and the legacy array shape (`[id1, id2]`, where each id is treated as an untagged output). Always writes the map shape. IsMatch updated to compare per-id tag sets.

  Tests landing in this commit (per the test-with-feature principle):
  * `OutputTagTests.cs` (6 tests): KnownValues, EqualityIsOrdinalOnValue, DefaultStructValueIsDistinct (default(OutputTag) does not collide with the Intermediate singleton in a HashSet), GetHashCodeMatchesEquals, JsonConverter_RoundtripsValueAsString, ConstructorIsInternal (reflection-based assertion that the (string) ctor is `internal`).
  * `WorkflowBuilderTests.cs` adds 7 new tests pinning the builder API contract: RegistersWithEmptyTagSet, AddsIntermediateTag, MultipleExecutorsAllUntagged, ThenIntermediate_AccumulatesTags, RepeatedDedupes, OnlyRegistersWithoutPriorWithOutputFrom, TracksExecutorBinding.
  * `BackwardsCompatibility/JsonCheckpointSerializationTests.cs` (new folder + file, 5 tests): event-level ctor contract tests (single-tag, no-tag, multi-tag - the last with a custom tag); IsIntermediate() asserted; load-bearing JSON BC tests for `WorkflowInfo.OutputExecutorIds` - `WorkflowOutputExecutorsReadsLegacyArrayShape` (legacy ids map to empty tag sets) and `WorkflowOutputExecutorsWritesMapShape`.
    The plan's three JSON round-trip tests for `WorkflowOutputEvent.Tags` were dropped: `WorkflowEvent` is not currently a serialized checkpoint shape (see the comment in WorkflowsJsonUtilities.cs about events not being persisted), so there is no real back-compat surface to pin through JSON. They are substituted with in-process ctor/property round-trip tests that exercise the `Tags` / `HasTag` / `IsIntermediate` contract.

  Validation: full `Microsoft.Agents.AI.Workflows.UnitTests` suite runs green on net10.0 (565 passing, 0 failing). Core library builds clean on net472, netstandard2.0, net8.0, net9.0, and net10.0. Test project builds clean on net472 + net10.0.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: route AgentResponse(Update) through the output filter under a Futures flag

  `InProcessRunnerContext.YieldOutputAsync` historically special-cased AgentResponse and AgentResponseUpdate payloads: it built the typed event subclass and emitted it directly, bypassing the output filter. Rewrites the method so that:
  - When `Futures.EnableAgentResponseOutputTaggingAndFiltering` is `false` (the current default), AgentResponse(Update) keep the legacy bypass - emitted as AgentResponseEvent / AgentResponseUpdateEvent with no tags. Existing callers see no behavior change.
  - When the flag is `true`, AIAgent payloads flow through the output filter just like every other payload type: undesignated sources are dropped, and the emitted event carries the source's tag set (empty for terminal `WithOutputFrom`, `{Intermediate}` for `WithIntermediateOutputFrom`, the set union when both designations apply).

  Non-AIAgent (POCO) outputs also now carry the source's tag set on the emitted WorkflowOutputEvent unconditionally - additive, since no existing assertion inspected Tags.

  Subclass events (`AgentResponseEvent` / `AgentResponseUpdateEvent`) continue to be emitted under both modes so `switch (evt) { case AgentResponseEvent: ... }` consumer code keeps matching.

  Adds `OutputFilter.TryGetTags` as the tag-aware lookup used by the runner. `OutputFilter.CanOutput` is kept (still used by the existing sync tests in `OutputFilterTests.cs`).

  Tests
  -----
  - `Futures/Futures.AgentResponseOutputFilteringAndTaggingTests.cs` (new): the F1–F13 matrix from the plan, covering every combination of `(flag on/off) × (designation) × (payload shape)`. Uses a `FuturesScope` IDisposable + a `FuturesSerial` xUnit collection (DisableParallelization = true) to keep the process-global flag from leaking across parallel tests.
  - `OutputFilterTests.cs`: four new `Test_OutputFilter_…` cases for the `TryGetTags` surface (empty-tag-set for terminal designation, `{Intermediate}` for intermediate designation, union for accumulated designation, `false` for unregistered).

  582/582 unit tests pass on net10.0 (565 baseline + 17 new).

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: tag-aware defaults and designation API on orchestration builders

  Aligns the .NET orchestration builders with Python's output / intermediate-output distinction. Each builder either applies a Python-aligned default designation set or replays the user's explicit `WithOutputFrom` / `WithIntermediateOutputFrom` calls, never both.

  Static `AgentWorkflowBuilder.BuildSequential` / `BuildConcurrent` apply defaults unconditionally (no user-facing fluent surface to take control through):
  - Sequential: terminal `end` + every agent designated intermediate.
  - Concurrent: terminal `end` + every agent and per-agent accumulator designated intermediate.

  The three fluent instance builders memoize agent-typed designation calls in a `Dictionary<AIAgent, HashSet<OutputTag>>` (empty set = terminal-only, non-empty = intermediate tag(s)) so repeated calls dedupe naturally.
  They replay the entries at `Build()` time, suppressing defaults when any call has been made:
  - `HandoffWorkflowBuilder` / `HandoffWorkflowBuilderCore<TBuilder>` (also picked up by the obsolete `HandoffsWorkflowBuilder` via inheritance). Default: terminal `HandoffEnd` + every handoff agent intermediate. (Bug fix: legacy code relied on `WithOutputFrom(end)` to bind `HandoffEnd`. The new explicit-designation path bypasses that, so `Build()` now calls `BindExecutor(end)` unconditionally to keep validation happy.)
  - `GroupChatWorkflowBuilder` - default: terminal host + every participant intermediate.
  - `MagenticWorkflowBuilder` - default: terminal orchestrator + every team member intermediate.

  Designating a non-participant agent throws `InvalidOperationException`.

  The bare `WorkflowBuilder` default is unchanged - only the orchestration-style builders gain implicit defaults, matching the plan's non-goal.

  Tests
  -----
  - `AgentWorkflowBuilder.SequentialTests` / `.ConcurrentTests`: one default-spec assertion each.
  - `GroupChatWorkflowBuilderTests`: defaults-match-spec, explicit-replaces-defaults, non-participant throws.
  - `HandoffWorkflowBuilderTests` (new file): same three.
  - `MagenticWorkflowBuilderTests` (new file): same three.

  593/593 unit tests pass on net10.0 (582 baseline + 11 new).

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: WorkflowHostAgent forwards AgentResponseEvent unconditionally under Futures-on

  Aligns the .NET Workflow-as-Agent surface with Python `as_agent`. Under `Futures.EnableAgentResponseOutputTaggingAndFiltering = true`, `WorkflowSession.InvokeStageAsync` now forwards `AgentResponseEvent` unconditionally - joining `AgentResponseUpdateEvent` in ignoring the host's `includeWorkflowOutputsInResponse` switch. That switch keeps governing the generic `WorkflowOutputEvent` path for non-AIAgent payloads, where it is further short-circuited by an `IsIntermediate()` check (tagged intermediate outputs always surface).
  Under Futures-off the legacy asymmetry is preserved: `AgentResponseUpdateEvent` always forwarded, `AgentResponseEvent` gated by `includeWorkflowOutputsInResponse`.

  Back-compat: with `Futures.EnableAgentResponseOutputTaggingAndFiltering` left at its default `false`, observable behavior is identical to before.

  `Futures` documentation gains a remark explaining the `Workflow.AsAIAgent()` interaction in both flag states.

  Runner fix
  ----------
  `InProcessRunnerContext.YieldOutputAsync` now skips `Executor.CanOutput` for AgentResponse-shaped payloads under both Futures branches. `AIAgentHostExecutor` doesn't declare AgentResponse(Update) in its `Yields` set, so the historical legacy bypass had silently skipped the check; Phase 3's Futures-on path was running it and would reject AIAgent payloads. AIAgent-shaped payloads are now always a valid output shape, matching the legacy bypass semantics.

  Phase 4 follow-on
  -----------------
  Switched the three orchestration-builder designation-replay loops to iterate `Dictionary.Keys` with a value lookup instead of constructing/destructuring `KeyValuePair<,>`. Cleaner shape and avoids the netstandard2.0 / net472 `KeyValuePair<,>.Deconstruct` unavailability that surfaced when this branch multi-TFM-built.

  Tests
  -----
  `WorkflowHostSmokeTests.IntermediateForwarding` (new nested class, 6 tests):
  - intermediate AgentResponse forwarded past the include-outputs gate (Futures on)
  - terminal AgentResponse forwarded unconditionally (Futures on)
  - terminal AgentResponse gated by include flag (Futures off, legacy)
  - undesignated AIAgent executor emits no AgentResponseEvent under Futures-on
  - legacy bypass still emits AgentResponseEvent under Futures-off
  - intermediate tag is observable via `update.RawRepresentation`

  The class joins the `FuturesSerial` xUnit collection so the process-global flag is serialized against other Futures-toggling tests.

  599/599 unit tests pass on net10.0 (593 baseline + 6 new).

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: SequentialWorkflowBuilder and ConcurrentWorkflowBuilder, OrchestrationBuilderBase

  Promotes the Sequential and Concurrent orchestration shapes to first-class fluent builder classes, matching Handoff / GroupChat / Magentic. Users can call `WithOutputFrom(agents)` / `WithIntermediateOutputFrom(agents)` to control which agents are designated output / intermediate sources; when no designation call is made, the Python-aligned defaults apply (terminal aggregator output + every agent intermediate; Concurrent also tags per-agent accumulators).

  `AgentWorkflowBuilder.BuildSequential(...)` and `BuildConcurrent(...)` are kept and now delegate to the new builders; observable behavior unchanged.

  Five static factories now mirror each other:
  - `AgentWorkflowBuilder.CreateSequentialBuilderWith(params IEnumerable<AIAgent>)`
  - `AgentWorkflowBuilder.CreateConcurrentBuilderWith(params IEnumerable<AIAgent>)`
  - `AgentWorkflowBuilder.CreateHandoffBuilderWith(AIAgent)` (already existed)
  - `AgentWorkflowBuilder.CreateGroupChatBuilderWith(Func<...>)` (already existed)
  - `AgentWorkflowBuilder.CreateMagenticBuilderWith(AIAgent)` (new)

  OrchestrationBuilderBase
  ------------------------
  New abstract `OrchestrationBuilderBase<TBuilder>` unifies the shared fluent surface across all five orchestration builders: `WithName`, `WithDescription`, `WithOutputFrom`, `WithIntermediateOutputFrom`, and the `ApplyOutputDesignations(builder, agentMap, kind, applyDefaults)` helper that either replays the user's designations or invokes the orchestration-specific defaults. Removes ~150 LOC of duplicated designation-management code from the four non-Handoff builders, plus the equivalent from `HandoffWorkflowBuilderCore`.

  Tests
  -----
  - New `SequentialWorkflowBuilderTests.cs` / `ConcurrentWorkflowBuilderTests.cs` (replace the old `AgentWorkflowBuilder.{Sequential,Concurrent}Tests.cs` nested-class files).
    Method names normalized to `Test_<BuilderType>_<Scenario>[Async]`.
  - Shared helpers (`DoubleEchoAgent`, `DoubleEchoAgentWithBarrier`, `WorkflowRunResult`, `RunWorkflow*`) moved from the old `AgentWorkflowBuilderTests` partial class into a new `OrchestrationTestHelpers` static class in `OrchestrationTestHelpers.cs`. Downstream test files (Group Chat, Handoff, Sequential, Concurrent) updated to qualify with `OrchestrationTestHelpers.*`.
  - A new `AgentWorkflowBuilderTests.cs` covers the static surface directly: `BuildSequential` / `BuildConcurrent` invariants and aggregator wiring, plus null-rejection + round-trip checks for every `Create*BuilderWith` factory.
  - New AsAgent intermediate-suppression tests on a nested `AsAgentForwarding` class for each of Sequential and Concurrent: build with only the terminal agent designated via `WithOutputFrom`, run via `AsAIAgent(...)`, assert via `AgentResponseUpdate.AuthorName` that intermediate agents do not surface. Both join the `FuturesSerial` collection.
  - New `Test_<Builder>_WithDescriptionPropagatesToWorkflow` smoke tests on Sequential and Concurrent (newly available via the base class).

  625/625 unit tests pass on net10.0.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore: dotnet format
* fixup: encoding
* fixup: charset
* fixup: Updates for PR feedback
* fixup: format
* fixup: merge issue
* Fix intermediate filtering on .AsAgent()
* fix filter logic
* fix: Revert logic change and add comments

---------

Co-authored-by: Jacob Alber <jalber@lokitoth.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
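
The commit notes above describe a JSON converter for `WorkflowInfo.OutputExecutorIds` that reads both the legacy array shape and the new map shape, and always writes the map shape. The following is only a minimal sketch of that idea, not the library's converter: `OutputExecutorMapConverter` is a hypothetical name, and tags are modeled as plain strings rather than `OutputTag` so the example stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Usage: a legacy array payload is read, then re-serialized in the map shape.
JsonSerializerOptions options = new() { Converters = { new OutputExecutorMapConverter() } };
Dictionary<string, HashSet<string>> legacy =
    JsonSerializer.Deserialize<Dictionary<string, HashSet<string>>>("[\"a\"]", options)!;
Console.WriteLine(JsonSerializer.Serialize(legacy, options)); // prints {"a":[]}

// Hypothetical stand-in for illustration only; tags are plain strings here.
public sealed class OutputExecutorMapConverter : JsonConverter<Dictionary<string, HashSet<string>>>
{
    public override Dictionary<string, HashSet<string>> Read(
        ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        Dictionary<string, HashSet<string>> result = new(StringComparer.Ordinal);

        if (reader.TokenType == JsonTokenType.StartArray)
        {
            // Legacy shape: ["id1", "id2"]. Each id becomes an untagged output.
            while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
            {
                result[reader.GetString()!] = new HashSet<string>(StringComparer.Ordinal);
            }

            return result;
        }

        if (reader.TokenType != JsonTokenType.StartObject)
        {
            throw new JsonException("Expected an array or an object.");
        }

        // New shape: {"id": ["intermediate", ...]}.
        while (reader.Read() && reader.TokenType != JsonTokenType.EndObject)
        {
            string id = reader.GetString()!; // current token is the property name
            if (!reader.Read() || reader.TokenType != JsonTokenType.StartArray)
            {
                throw new JsonException("Expected an array of tags.");
            }

            HashSet<string> tags = new(StringComparer.Ordinal);
            while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
            {
                tags.Add(reader.GetString()!);
            }

            result[id] = tags;
        }

        return result;
    }

    public override void Write(
        Utf8JsonWriter writer, Dictionary<string, HashSet<string>> value, JsonSerializerOptions options)
    {
        // Always write the map shape, even for entries that came from a legacy array.
        writer.WriteStartObject();
        foreach (string id in value.Keys)
        {
            writer.WriteStartArray(id);
            foreach (string tag in value[id])
            {
                writer.WriteStringValue(tag);
            }

            writer.WriteEndArray();
        }

        writer.WriteEndObject();
    }
}
```

Reading the legacy shape into empty tag sets keeps old checkpoints loadable, while writing only the map shape means the array form is never produced again.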
954 lines
47 KiB
C#
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

public sealed class ExpectedException : Exception
{
    public ExpectedException(string message)
        : base(message)
    {
    }

    public ExpectedException() : base()
    {
    }

    public ExpectedException(string? message, Exception? innerException) : base(message, innerException)
    {
    }
}

/// <summary>
/// A simple agent that emits a FunctionCallContent or ToolApprovalRequestContent request.
/// Used to test that RequestInfoEvent handling preserves the original content type.
/// </summary>
internal sealed class RequestEmittingAgent : AIAgent
{
    private readonly AIContent _requestContent;
    private readonly bool _completeOnResponse;

    /// <summary>
    /// Creates a new <see cref="RequestEmittingAgent"/> that emits the given request content.
    /// </summary>
    /// <param name="requestContent">The content to emit on each turn.</param>
    /// <param name="completeOnResponse">
    /// When <see langword="true"/>, the agent emits a text completion instead of re-emitting
    /// the request when the incoming messages contain a <see cref="FunctionResultContent"/>
    /// or <see cref="ToolApprovalResponseContent"/>. This models realistic agent behaviour
    /// where the agent processes the tool result and produces a final answer.
    /// </param>
    public RequestEmittingAgent(AIContent requestContent, bool completeOnResponse = false)
    {
        this._requestContent = requestContent;
        this._completeOnResponse = completeOnResponse;
    }

    private sealed class Session : AgentSession
    {
        public Session() { }
    }

    protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => new(new Session());

    protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default)
        => new(new Session());

    protected override ValueTask<JsonElement> SerializeSessionCoreAsync(AgentSession session, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => default;

    protected override Task<AgentResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
        => this.RunStreamingAsync(messages, session, options, cancellationToken).ToAgentResponseAsync(cancellationToken);

    protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (this._completeOnResponse && messages.Any(m => m.Contents.Any(c =>
            c is FunctionResultContent || c is ToolApprovalResponseContent)))
        {
            yield return new AgentResponseUpdate(ChatRole.Assistant, [new TextContent("Request processed")]);
        }
        else
        {
            // Emit the request content
            yield return new AgentResponseUpdate(ChatRole.Assistant, [this._requestContent]);
        }
    }
}

internal sealed class KickoffOnStartExecutor : ChatProtocolExecutor
{
    private static readonly ChatProtocolExecutorOptions s_options = new()
    {
        AutoSendTurnToken = false,
    };

    private readonly string _downstreamExecutorId;
    private readonly string _kickoffInputText;
    private readonly string _kickoffMessageText;
    private readonly string _regularResumeText;
    private readonly string _regularProcessedText;

    public KickoffOnStartExecutor(
        string id,
        string downstreamExecutorId,
        string kickoffInputText,
        string kickoffMessageText,
        string regularResumeText,
        string regularProcessedText)
        : base(id, s_options)
    {
        this._downstreamExecutorId = downstreamExecutorId;
        this._kickoffInputText = kickoffInputText;
        this._kickoffMessageText = kickoffMessageText;
        this._regularResumeText = regularResumeText;
        this._regularProcessedText = regularProcessedText;
    }

    protected override async ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)
    {
        List<string> textContents =
        [
            .. messages
                .SelectMany(message => message.Contents.OfType<TextContent>())
                .Select(content => content.Text)
        ];

        if (textContents.Contains(this._kickoffInputText, StringComparer.Ordinal))
        {
            await context.SendMessageAsync(
                new List<ChatMessage> { new(ChatRole.User, this._kickoffMessageText) },
                this._downstreamExecutorId,
                cancellationToken).ConfigureAwait(false);
            await context.SendMessageAsync(
                new TurnToken(emitEvents),
                this._downstreamExecutorId,
                cancellationToken).ConfigureAwait(false);
        }

        if (textContents.Contains(this._regularResumeText, StringComparer.Ordinal))
        {
            AgentResponseUpdate update = new(ChatRole.Assistant, [new TextContent(this._regularProcessedText)])
            {
                CreatedAt = DateTimeOffset.UtcNow,
                MessageId = Guid.NewGuid().ToString("N"),
                ResponseId = Guid.NewGuid().ToString("N"),
                Role = ChatRole.Assistant,
            };

            await context.AddEventAsync(new AgentResponseUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false);
        }
    }
}

/// <summary>
/// A start executor that always emits a response update on every turn,
/// useful for verifying that a TurnToken was delivered by the session.
/// On the first turn (user messages present), it kicks off a downstream executor.
/// </summary>
internal sealed class TurnTrackingStartExecutor : ChatProtocolExecutor
{
    private static readonly ChatProtocolExecutorOptions s_options = new()
    {
        AutoSendTurnToken = false,
    };

    private readonly string _downstreamExecutorId;
    private readonly string _activatedMarker;
    private int _activationCount;

    /// <summary>Gets the number of times this executor has been activated (i.e., <see cref="TakeTurnAsync"/> called).</summary>
    public int ActivationCount => this._activationCount;

    public TurnTrackingStartExecutor(string id, string downstreamExecutorId, string activatedMarker)
        : base(id, s_options)
    {
        this._downstreamExecutorId = downstreamExecutorId;
        this._activatedMarker = activatedMarker;
    }

    protected override async ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)
    {
        Interlocked.Increment(ref this._activationCount);

        // On the first turn, forward user messages and a TurnToken to the downstream executor.
        if (messages.Any(m => m.Role == ChatRole.User))
        {
            await context.SendMessageAsync(
                messages,
                this._downstreamExecutorId,
                cancellationToken).ConfigureAwait(false);
            await context.SendMessageAsync(
                new TurnToken(emitEvents),
                this._downstreamExecutorId,
                cancellationToken).ConfigureAwait(false);
        }

        // Always emit a marker to prove this executor was activated.
        AgentResponseUpdate update = new(ChatRole.Assistant, [new TextContent(this._activatedMarker)])
        {
            CreatedAt = DateTimeOffset.UtcNow,
            MessageId = Guid.NewGuid().ToString("N"),
            ResponseId = Guid.NewGuid().ToString("N"),
            Role = ChatRole.Assistant,
        };

        await context.AddEventAsync(new AgentResponseUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false);
    }
}

public class NonChatProtocolExecutor() : Executor<string>(nameof(NonChatProtocolExecutor))
{
    public override ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        return default;
    }
}

public class WorkflowHostSmokeTests : AIAgentHostingExecutorTestsBase
{
    private sealed class AlwaysFailsAIAgent(bool failByThrowing) : AIAgent
    {
        private sealed class Session : AgentSession
        {
            public Session() { }

            public Session(AgentSessionStateBag stateBag) : base(stateBag) { }
        }

        protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        {
            return new(serializedState.Deserialize<Session>(jsonSerializerOptions)!);
        }

        protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default)
        {
            return new(new Session());
        }

        protected override ValueTask<JsonElement> SerializeSessionCoreAsync(AgentSession session, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
            => default;

        protected override async Task<AgentResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
        {
            return await this.RunStreamingAsync(messages, session, options, cancellationToken)
                             .ToAgentResponseAsync(cancellationToken);
        }

        protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            const string ErrorMessage = "Simulated agent failure.";
            if (failByThrowing)
            {
                throw new ExpectedException(ErrorMessage);
            }

            yield return new AgentResponseUpdate(ChatRole.Assistant, [new ErrorContent(ErrorMessage)]);
        }
    }

    private static Workflow CreateWorkflow(bool failByThrowing)
    {
        ExecutorBinding agent = new AlwaysFailsAIAgent(failByThrowing).BindAsExecutor(emitEvents: true);

        return new WorkflowBuilder(agent).Build();
    }

    [Theory]
    [InlineData(true, true)]
    [InlineData(true, false)]
    [InlineData(false, true)]
    [InlineData(false, false)]
    public async Task Test_AsAgent_ErrorContentStreamedOutAsync(bool includeExceptionDetails, bool failByThrowing)
    {
        string expectedMessage = !failByThrowing || includeExceptionDetails
            ? "Simulated agent failure."
            : "An error occurred while executing the workflow.";

        // Arrange is done by the caller.
        Workflow workflow = CreateWorkflow(failByThrowing);

        // Act
        List<AgentResponseUpdate> updates = await workflow.AsAIAgent("WorkflowAgent", includeExceptionDetails: includeExceptionDetails)
                                                          .RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello"))
                                                          .ToListAsync();

        // Assert
        bool hadErrorContent = false;
        foreach (AgentResponseUpdate update in updates)
        {
            if (update.Contents.Any())
            {
                // We should expect a single update which contains the error content.
                update.Contents.Should().ContainSingle()
                               .Which.Should().BeOfType<ErrorContent>()
                               .Which.Message.Should().Be(expectedMessage);
                hadErrorContent = true;
            }
        }

        hadErrorContent.Should().BeTrue();
    }

    /// <summary>
    /// Tests that when a workflow emits a RequestInfoEvent with FunctionCallContent data,
    /// the AgentResponseUpdate preserves the original FunctionCallContent type.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_FunctionCallContentPreservedInRequestInfoAsync()
    {
        // Arrange
        const string CallId = "test-call-id";
        const string FunctionName = "testFunction";
        FunctionCallContent originalContent = new(CallId, FunctionName);
        RequestEmittingAgent requestAgent = new(originalContent);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();

        // Act
        List<AgentResponseUpdate> updates = await workflow.AsAIAgent("WorkflowAgent")
                                                          .RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello"))
                                                          .ToListAsync();

        // Assert
        AgentResponseUpdate? updateWithFunctionCall = updates.FirstOrDefault(u =>
            u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent));

        updateWithFunctionCall.Should().NotBeNull("a FunctionCallContent should be present in the response updates");
        FunctionCallContent retrievedContent = updateWithFunctionCall!.Contents
            .OfType<FunctionCallContent>()
            .Should().ContainSingle()
            .Which;

        retrievedContent.CallId.Should().NotBe(CallId);
        retrievedContent.CallId.Should().EndWith($":{CallId}");
        retrievedContent.Name.Should().Be(FunctionName);
    }

    /// <summary>
    /// Tests that when a workflow emits a RequestInfoEvent with ToolApprovalRequestContent data,
    /// the AgentResponseUpdate preserves the original ToolApprovalRequestContent type.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_ToolApprovalRequestContentPreservedInRequestInfoAsync()
    {
        // Arrange
        const string RequestId = "test-request-id";
        McpServerToolCallContent mcpCall = new("call-id", "testToolName", "http://localhost");
        ToolApprovalRequestContent originalContent = new(RequestId, mcpCall);
        RequestEmittingAgent requestAgent = new(originalContent);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();

        // Act
        List<AgentResponseUpdate> updates = await workflow.AsAIAgent("WorkflowAgent")
            .RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello"))
            .ToListAsync();

        // Assert
        AgentResponseUpdate? updateWithUserInput = updates.FirstOrDefault(u =>
            u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is ToolApprovalRequestContent));

        updateWithUserInput.Should().NotBeNull("a ToolApprovalRequestContent should be present in the response updates");
        ToolApprovalRequestContent retrievedContent = updateWithUserInput!.Contents
            .OfType<ToolApprovalRequestContent>()
            .Should().ContainSingle()
            .Which;

        retrievedContent.Should().NotBeNull();
        retrievedContent.RequestId.Should().NotBe(RequestId);
        retrievedContent.RequestId.Should().EndWith($":{RequestId}");
    }

    /// <summary>
    /// Tests the full roundtrip: workflow emits a request, external caller responds, workflow processes response.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_FunctionCallRoundtrip_ResponseIsProcessedAsync()
    {
        // Arrange: Create an agent that emits a FunctionCallContent request
        const string CallId = "roundtrip-call-id";
        const string FunctionName = "testFunction";
        FunctionCallContent requestContent = new(CallId, FunctionName);
        RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();
        AIAgent agent = workflow.AsAIAgent("WorkflowAgent");

        // Act 1: First call - should receive the FunctionCallContent request
        AgentSession session = await agent.CreateSessionAsync();
        List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
            new ChatMessage(ChatRole.User, "Start"),
            session).ToListAsync();

        // Assert 1: We should have received a FunctionCallContent
        AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u =>
            u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent));
        updateWithRequest.Should().NotBeNull("a FunctionCallContent should be present in the response updates");

        FunctionCallContent receivedRequest = updateWithRequest!.Contents
            .OfType<FunctionCallContent>()
            .First();
        receivedRequest.CallId.Should().EndWith($":{CallId}");

        // Arrange 2: Build the response message to send back
        FunctionResultContent responseContent = new(receivedRequest.CallId, "test result");
        ChatMessage responseMessage = new(ChatRole.Tool, [responseContent]);

        // Act 2: Run the workflow with the response and capture the resulting updates
        List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(responseMessage, session).ToListAsync();

        // Assert 2: The response should be processed and the original request should no longer be pending.
        // Concretely, the workflow should not re-emit a FunctionCallContent with the same CallId.
        secondCallUpdates.Should().NotBeNull("processing the response should produce updates");
        secondCallUpdates.Should().NotBeEmpty("processing the response should progress the workflow");
        secondCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Should()
            .NotContain(c => c.CallId == receivedRequest.CallId, "the external FunctionCallContent request should be cleared after processing the response");
    }

    /// <summary>
    /// Tests the full roundtrip for ToolApprovalRequestContent: workflow emits request, external caller responds.
    /// Verifies inbound ToolApprovalResponseContent conversion.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_ToolApprovalRoundtrip_ResponseIsProcessedAsync()
    {
        // Arrange: Create an agent that emits a ToolApprovalRequestContent request
        const string RequestId = "roundtrip-request-id";
        McpServerToolCallContent mcpCall = new("mcp-call-id", "testMcpTool", "http://localhost");
        ToolApprovalRequestContent requestContent = new(RequestId, mcpCall);
        RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUserInputRequests = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();
        AIAgent agent = workflow.AsAIAgent("WorkflowAgent");

        // Act 1: First call - should receive the ToolApprovalRequestContent request
        AgentSession session = await agent.CreateSessionAsync();
        List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
            new ChatMessage(ChatRole.User, "Start"),
            session).ToListAsync();

        // Assert 1: We should have received a ToolApprovalRequestContent
        AgentResponseUpdate? updateWithRequest = firstCallUpdates.FirstOrDefault(u =>
            u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is ToolApprovalRequestContent));
        updateWithRequest.Should().NotBeNull("a ToolApprovalRequestContent should be present in the response updates");

        ToolApprovalRequestContent receivedRequest = updateWithRequest!.Contents
            .OfType<ToolApprovalRequestContent>()
            .First();
        receivedRequest.RequestId.Should().EndWith($":{RequestId}");

        // Arrange 2: Build the response message - use CreateResponse to get the right response type
        ToolApprovalResponseContent responseContent = receivedRequest.CreateResponse(approved: true);
        ChatMessage responseMessage = new(ChatRole.User, [responseContent]);

        // Act 2: Run the workflow again with the response and capture the updates
        List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(responseMessage, session).ToListAsync();

        // Assert 2: The response should be applied so that the original request is no longer pending
        secondCallUpdates.Should().NotBeEmpty("handling the user input response should produce follow-up updates");
        bool requestStillPresent = secondCallUpdates.Any(u =>
            u.RawRepresentation is RequestInfoEvent
            && u.Contents.OfType<ToolApprovalRequestContent>().Any(r => r.RequestId == receivedRequest.RequestId));
        requestStillPresent.Should().BeFalse("the original ToolApprovalRequestContent should not be re-emitted after its response is processed");
    }

    /// <summary>
    /// Tests the mixed-message scenario: resume contains both an external response
    /// (FunctionResultContent matching a pending request) and regular non-response content
    /// in the same message.
    /// Verifies that regular content is still processed and that no duplicate
    /// pending-request errors, redundant FunctionCallContent re-emissions,
    /// or workflow errors occur.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_MixedResponseAndRegularMessage_BothProcessedAsync()
    {
        // Arrange: Create an agent that emits a FunctionCallContent request
        const string CallId = "mixed-call-id";
        const string FunctionName = "mixedTestFunction";
        FunctionCallContent requestContent = new(CallId, FunctionName);
        RequestEmittingAgent requestAgent = new(requestContent, completeOnResponse: true);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();
        AIAgent agent = workflow.AsAIAgent("WorkflowAgent");

        // Act 1: First call - should receive the FunctionCallContent request
        AgentSession session = await agent.CreateSessionAsync();
        List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
            new ChatMessage(ChatRole.User, "Start"),
            session).ToListAsync();

        // Assert 1: We should have received a FunctionCallContent
        AgentResponseUpdate requestUpdate = firstCallUpdates.First(u =>
            u.RawRepresentation is RequestInfoEvent && u.Contents.Any(c => c is FunctionCallContent));
        FunctionCallContent emittedRequest = requestUpdate.Contents.OfType<FunctionCallContent>().Single();

        firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent),
            "the first call should emit a FunctionCallContent request");

        // Act 2: Send a mixed message containing both the function result AND regular non-response content
        FunctionResultContent responseContent = new(emittedRequest.CallId, "tool output");
        ChatMessage mixedMessage = new(ChatRole.Tool, [responseContent, new TextContent("additional context")]);

        List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(mixedMessage, session).ToListAsync();

        // Assert 2: The workflow should have processed both parts without errors
        secondCallUpdates.Should().NotBeEmpty("the mixed message should produce follow-up updates");
        secondCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Should()
            .NotContain(c => c.CallId == emittedRequest.CallId, "the external FunctionCallContent should be cleared after the response is processed");
        secondCallUpdates
            .SelectMany(u => u.Contents.OfType<ErrorContent>())
            .Should()
            .BeEmpty("no workflow errors should occur when processing a mixed response-and-regular message");
    }

    /// <summary>
    /// Tests that when a resume sends the external response and regular content in two
    /// separate messages, the handled FunctionCallContent request is not re-emitted and
    /// no workflow errors occur.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_ResponseThenRegularAcrossMessages_NoDuplicateFunctionCallAsync()
    {
        // Arrange
        const string CallId = "mixed-separate-call-id";
        const string FunctionName = "mixedSeparateTestFunction";

        RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();
        AIAgent agent = workflow.AsAIAgent("WorkflowAgent");

        // Act 1: First call emits the FunctionCallContent request
        AgentSession session = await agent.CreateSessionAsync();
        List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync();
        FunctionCallContent emittedRequest = firstCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Single();

        // Act 2: Resume with the function result and the regular content in separate messages
        ChatMessage[] resumeMessages =
        [
            new(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]),
            new(ChatRole.Tool, [new TextContent("extra context in separate message")])
        ];

        List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(resumeMessages, session).ToListAsync();

        // Assert
        secondCallUpdates.Should().NotBeEmpty();
        secondCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Should()
            .NotContain(c => c.CallId == emittedRequest.CallId, "response+regular content split across messages should not re-emit the handled external request");
        secondCallUpdates
            .SelectMany(u => u.Contents.OfType<ErrorContent>())
            .Should()
            .BeEmpty();
    }

    /// <summary>
    /// Tests that resuming with a response that matches the pending request yields exactly one
    /// FunctionCallContent with that CallId in the follow-up updates, i.e. the matching response
    /// does not trigger an extra TurnToken-driven turn.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_MatchingResponse_DoesNotCauseExtraTurnAsync()
    {
        // Arrange
        const string CallId = "matching-response-call-id";
        const string FunctionName = "matchingResponseFunction";

        RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: false);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();
        AIAgent agent = workflow.AsAIAgent("WorkflowAgent");

        // Act 1: First call emits the FunctionCallContent request
        AgentSession session = await agent.CreateSessionAsync();
        List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync();
        FunctionCallContent emittedRequest = firstCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Single();

        // Act 2: Resume with a response whose CallId matches the pending request
        List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(
            new ChatMessage(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]),
            session).ToListAsync();

        // Assert
        int functionCallCount = secondCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Count(c => c.CallId == emittedRequest.CallId);

        functionCallCount.Should().Be(1, "a matching external response should not trigger an extra TurnToken-driven turn");
    }

    /// <summary>
    /// Tests the cross-executor mixed scenario: the resume carries a matched external response
    /// owned by a downstream executor plus a regular user message. Verifies that the start
    /// executor is reawakened and processes the regular message, that the downstream executor
    /// still receives the response, and that the handled request is not re-emitted.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_MixedResponseAndRegularMessage_CrossExecutorStartExecutorIsReawakenedAsync()
    {
        // Arrange
        const string StartExecutorId = "start-executor";
        const string KickoffInputText = "Start";
        const string KickoffMessageText = "kickoff downstream";
        const string ResumeRegularText = "resume regular";
        const string ResumeProcessedText = "regular message processed";
        const string CallId = "cross-executor-call-id";
        const string FunctionName = "crossExecutorFunction";

        RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true);
        ExecutorBinding requestBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });

        KickoffOnStartExecutor startExecutor = new(
            StartExecutorId,
            requestBinding.Id,
            KickoffInputText,
            KickoffMessageText,
            ResumeRegularText,
            ResumeProcessedText);
        ExecutorBinding startBinding = startExecutor.BindExecutor();

        Workflow workflow = new WorkflowBuilder(startBinding)
            .AddEdge<List<ChatMessage>>(startBinding, requestBinding, messages =>
                messages?.Any(message => message.Contents.OfType<TextContent>().Any(content => content.Text == KickoffMessageText)) == true)
            .AddEdge<TurnToken>(startBinding, requestBinding, _ => true)
            .Build();
        AIAgent agent = workflow.AsAIAgent("WorkflowAgent");

        // Act 1: First call triggers the downstream FunctionCallContent request
        AgentSession session = await agent.CreateSessionAsync();
        List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
            new ChatMessage(ChatRole.User, KickoffInputText),
            session).ToListAsync();
        FunctionCallContent emittedRequest = firstCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Single();

        // Act 2: Resume with the external response plus a regular user message
        ChatMessage[] resumeMessages =
        [
            new(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]),
            new(ChatRole.User, ResumeRegularText)
        ];

        List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(resumeMessages, session).ToListAsync();
        List<string> textContents = [.. secondCallUpdates.SelectMany(update => update.Contents.OfType<TextContent>()).Select(content => content.Text)];

        // Assert
        textContents.Should().Contain(ResumeProcessedText, "the start executor should receive an explicit TurnToken when the matched response wakes a different executor");
        textContents.Should().Contain("Request processed", "the matched external response should still be delivered to the downstream request owner");
        secondCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Should()
            .NotContain(c => c.CallId == emittedRequest.CallId, "the handled external request should not be re-emitted while waking the start executor");
        secondCallUpdates.SelectMany(u => u.Contents.OfType<ErrorContent>()).Should().BeEmpty();
    }

    /// <summary>
    /// Tests that a FunctionResultContent whose CallId matches no pending request is treated
    /// as regular input: it still drives a TurnToken continuation and produces no workflow errors.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_UnmatchedResponse_TriggersTurnAndKeepsProgressingAsync()
    {
        // Arrange
        const string CallId = "unmatched-response-call-id";
        const string FunctionName = "unmatchedResponseFunction";

        RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: false);
        ExecutorBinding agentBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });
        Workflow workflow = new WorkflowBuilder(agentBinding).Build();
        AIAgent agent = workflow.AsAIAgent("WorkflowAgent");

        // Act 1: First call emits the FunctionCallContent request
        AgentSession session = await agent.CreateSessionAsync();
        List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(new ChatMessage(ChatRole.User, "Start"), session).ToListAsync();
        firstCallUpdates.Should().Contain(u => u.Contents.Any(c => c is FunctionCallContent));

        // Act 2: Resume with a response whose CallId does not match any pending request
        List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(
            new ChatMessage(ChatRole.Tool, [new FunctionResultContent("different-call-id", "tool output")]),
            session).ToListAsync();

        // Assert
        int functionCallCount = secondCallUpdates
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Count(c => c.CallId == CallId);

        functionCallCount.Should().Be(1, "an unmatched response should be treated as regular input and still drive a TurnToken continuation without workflow errors");
        secondCallUpdates.SelectMany(u => u.Contents.OfType<ErrorContent>()).Should().BeEmpty();
    }

    /// <summary>
    /// Tests that when a resume contains only an external response directed at a non-start executor
    /// (no regular messages), the start executor still receives a TurnToken and is activated.
    /// This is a regression test for the case where the TurnToken was previously skipped because
    /// <c>HasRegularMessages</c> was <see langword="false"/>, leaving the start executor dormant.
    /// </summary>
    [Fact]
    public async Task Test_AsAgent_ResponseOnlyToNonStartExecutor_StartExecutorIsStillActivatedAsync()
    {
        // Arrange
        const string StartExecutorId = "start-executor";
        const string ActivatedMarker = "start-executor-activated";
        const string CallId = "response-only-call-id";
        const string FunctionName = "responseOnlyFunction";

        RequestEmittingAgent requestAgent = new(new FunctionCallContent(CallId, FunctionName), completeOnResponse: true);
        ExecutorBinding requestBinding = requestAgent.BindAsExecutor(
            new AIAgentHostOptions { InterceptUnterminatedFunctionCalls = false, EmitAgentUpdateEvents = true });

        TurnTrackingStartExecutor startExecutor = new(StartExecutorId, requestBinding.Id, ActivatedMarker);
        ExecutorBinding startBinding = startExecutor.BindExecutor();

        Workflow workflow = new WorkflowBuilder(startBinding)
            .AddEdge<List<ChatMessage>>(startBinding, requestBinding, messages =>
                messages?.Any(m => m.Contents.OfType<TextContent>().Any()) == true)
            .AddEdge<TurnToken>(startBinding, requestBinding, _ => true)
            .Build();
        AIAgent agent = workflow.AsAIAgent("WorkflowAgent");

        // Act 1: First call triggers the downstream FunctionCallContent request
        AgentSession session = await agent.CreateSessionAsync();
        List<AgentResponseUpdate> firstCallUpdates = await agent.RunStreamingAsync(
            new ChatMessage(ChatRole.User, "Start"),
            session).ToListAsync();

        FunctionCallContent emittedRequest = firstCallUpdates
            .Where(u => u.RawRepresentation is RequestInfoEvent)
            .SelectMany(u => u.Contents.OfType<FunctionCallContent>())
            .Single();

        // Act 2: Resume with ONLY the external response (no regular messages)
        List<AgentResponseUpdate> secondCallUpdates = await agent.RunStreamingAsync(
            new ChatMessage(ChatRole.Tool, [new FunctionResultContent(emittedRequest.CallId, "tool output")]),
            session).ToListAsync();

        // Assert: Both the downstream and start executor should have been activated
        List<string> textContents = [.. secondCallUpdates
            .SelectMany(u => u.Contents.OfType<TextContent>())
            .Select(c => c.Text)];

        textContents.Should().Contain("Request processed",
            "the downstream executor should process the external response");
        textContents.Should().Contain(ActivatedMarker,
            "the start executor should receive a TurnToken and be activated even when resume contains only an external response");
        secondCallUpdates
            .SelectMany(u => u.Contents.OfType<ErrorContent>())
            .Should()
            .BeEmpty();
    }

    [Theory]
    [InlineData(false)]
    [InlineData(true)]
    public async Task Test_AsAgent_FailsWhenNotChatProtocolAsync(bool runAsync)
    {
        // Arrange
        NonChatProtocolExecutor executor = new();
        executor.DescribeProtocol().IsChatProtocol().Should().BeFalse();

        Workflow workflow = new WorkflowBuilder(executor).Build();
        AIAgent workflowAsAgent = workflow.AsAIAgent();

        Func<Task> action = runAsync
            ? () => workflowAsAgent.RunStreamingAsync().ToAgentResponseAsync()
            : () => workflowAsAgent.RunAsync();

        await action.Should().ThrowAsync<InvalidOperationException>();
    }

    private async Task Run_AsAgent_OutgoingMessagesInHistoryAsync(Workflow workflow, bool runAsync)
    {
        // Arrange
        AIAgent workflowAgent = workflow.AsAIAgent();

        // Act
        AgentSession session = await workflowAgent.CreateSessionAsync();
        AgentResponse response;
        if (runAsync)
        {
            List<AgentResponseUpdate> updates = [];
            await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(session))
            {
                // Skip WorkflowEvent updates, which do not get persisted in ChatHistory; we cannot skip
                // them after because of a deleterious interaction with .ToAgentResponse() due to the
                // empty initial message (which is created without a MessageId). When running through the
                // message merger, it does the right thing internally.
                if (!string.IsNullOrEmpty(update.Text))
                {
                    updates.Add(update);
                }
            }

            response = updates.ToAgentResponse();
        }
        else
        {
            response = await workflowAgent.RunAsync(session);
        }

        // Assert
        WorkflowSession workflowSession = session.Should().BeOfType<WorkflowSession>().Subject;

        ChatMessage[] responseMessages = response.Messages.Where(message => message.Contents.Any())
            .ToArray();

        ChatMessage[] sessionMessages = workflowSession.ChatHistoryProvider.GetAllMessages(workflowSession)
            .ToArray();

        // Since we never sent an incoming message, the expectation is that there should be nothing in the session
        // except the response
        responseMessages.Should().BeEquivalentTo(sessionMessages, options => options.WithStrictOrdering());
    }

    [Theory]
    [InlineData(false)]
    [InlineData(true)]
    public Task Test_SingleAgent_AsAgent_OutgoingMessagesInHistoryAsync(bool runAsync)
    {
        // Arrange
        TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName);
        Workflow singleAgentWorkflow = new WorkflowBuilder(agent).Build();
        return this.Run_AsAgent_OutgoingMessagesInHistoryAsync(singleAgentWorkflow, runAsync);
    }

    [Theory]
    [InlineData(false)]
    [InlineData(true)]
    public Task Test_Handoffs_AsAgent_OutgoingMessagesInHistoryAsync(bool runAsync)
    {
        // Arrange
        TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName);
        Workflow handoffWorkflow = new HandoffWorkflowBuilder(agent).Build();
        return this.Run_AsAgent_OutgoingMessagesInHistoryAsync(handoffWorkflow, runAsync);
    }

    // ----- Phase 5: Workflow-as-Agent intermediate forwarding -----------------

    [Collection(Futures.FuturesSerialCollection.Name)]
    public class IntermediateForwarding
    {
        private const string InterText = "progress";
        private const string FinalText = "final";

        private static async Task<List<AgentResponseUpdate>> RunStreamingAsync(
            Workflow workflow,
            bool includeWorkflowOutputsInResponse = false)
        {
            return await workflow
                .AsAIAgent("WorkflowAgent", includeWorkflowOutputsInResponse: includeWorkflowOutputsInResponse)
                .RunStreamingAsync(new ChatMessage(ChatRole.User, "hi"))
                .ToListAsync();
        }

        [Fact]
        public async Task Test_WorkflowHostAgent_IntermediateAgentResponseForwardedInStreamingAsync()
        {
            using Futures.FuturesScope _ = new(enabled: true);
            TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText));
            ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
            Workflow workflow = new WorkflowBuilder(binding)
                .WithIntermediateOutputFrom([binding])
                .Build();

            // Under Futures-on, AgentResponseEvent mirrors AgentResponseUpdateEvent: always
            // forwarded regardless of the include flag. The intermediate tag is observable on
            // the surfaced event for consumers that care to distinguish.
            List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: false);

            updates.Any(u => u.RawRepresentation is AgentResponseEvent are && are.IsIntermediate() && u.Text == InterText)
                .Should().BeTrue("AgentResponseEvent is forwarded under Futures-on regardless of the include flag");
        }

        [Fact]
        public async Task Test_WorkflowHostAgent_TerminalAgentResponseForwardedUnconditionallyWhenFuturesOnAsync()
        {
            using Futures.FuturesScope _ = new(enabled: true);
            TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(FinalText));
            ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
            Workflow workflow = new WorkflowBuilder(binding)
                .WithOutputFrom(binding)
                .Build();

            // Even a terminal-only designation surfaces without the include flag: the gating
            // asymmetry between AgentResponse and AgentResponseUpdate is gone under Futures-on.
            List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: false);

            updates.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText)
                .Should().BeTrue("terminal AgentResponseEvent is forwarded under Futures-on regardless of the include flag");
        }

        [Fact]
        public async Task Test_WorkflowHostAgent_TerminalAgentResponseGatedWhenFuturesOffAsync()
        {
            using Futures.FuturesScope _ = new(enabled: false);

            static Workflow Build()
            {
                TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(FinalText));
                ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
                return new WorkflowBuilder(binding).WithOutputFrom(binding).Build();
            }

            // Legacy semantics: AgentResponseEvent stays behind the include flag when Futures
            // is off. Two fresh workflows because in-process runs aren't reentrant.
            List<AgentResponseUpdate> gated = await RunStreamingAsync(Build(), includeWorkflowOutputsInResponse: false);
            gated.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText)
                .Should().BeFalse("terminal AgentResponseEvent stays gated under Futures-off");

            List<AgentResponseUpdate> included = await RunStreamingAsync(Build(), includeWorkflowOutputsInResponse: true);
            included.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText)
                .Should().BeTrue("opting in via includeWorkflowOutputsInResponse surfaces it");
        }

        [Fact]
        public async Task Test_WorkflowHostAgent_UndesignatedExecutorEmitsNoAgentResponseEventWhenFuturesOnAsync()
        {
            using Futures.FuturesScope _ = new(enabled: true);
            TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText));
            ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
            // No designation: under Futures-on, the AgentResponse is dropped by the filter.
            Workflow workflow = new WorkflowBuilder(binding).Build();

            List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: true);

            updates.Any(u => u.RawRepresentation is AgentResponseEvent)
                .Should().BeFalse("an undesignated AIAgent executor produces no AgentResponseEvent under Futures-on");
        }

        [Fact]
        public async Task Test_WorkflowHostAgent_UndesignatedAgentResponseSurfacesWhenFuturesOffAsync()
        {
            using Futures.FuturesScope _ = new(enabled: false);
            TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText));
            ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
            Workflow workflow = new WorkflowBuilder(binding).Build();

            List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: true);

            updates.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == InterText)
                .Should().BeTrue("legacy bypass still emits AgentResponseEvent regardless of designation");
        }

        [Fact]
        public async Task Test_WorkflowHostAgent_IntermediateTagAvailableViaRawRepresentationAsync()
        {
            using Futures.FuturesScope _ = new(enabled: true);
            TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText));
            ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true });
            Workflow workflow = new WorkflowBuilder(binding)
                .WithIntermediateOutputFrom([binding])
                .Build();

            List<AgentResponseUpdate> updates = await RunStreamingAsync(workflow);

            AgentResponseUpdate progress = updates.First(u => u.RawRepresentation is AgentResponseEvent && u.Text == InterText);
            AgentResponseEvent raw = (AgentResponseEvent)progress.RawRepresentation!;
            raw.IsIntermediate().Should().BeTrue();
            raw.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate });
        }
    }
}