Files
Jacob Alber 6a3d22598f .NET: [BREAKING] Implement Polymorphic Routing (#3792)
* feat: Implement Polymorphic Routing

* feat: Add support for Send/Yield annotations with basic Executor

* Adds annotations to Declarative workflow executors

* fix: Address PR Comments

* Implicit filter in collection loops
* Remove debug / unused / superfluous code
* Fix ProtocolBuilder implicit output registrations
* Fix logic error in ExecuteRouteGeneratorTests.ClassWithManualConfigureProtocol_DoesNotGenerate

* fix: Solidify type checks and send/yield type registrations

* fix: Suppress generation of TurnTokens out of AggregateTurnMessagesExecutor

* Fixes an issue where ConcurrentEndExecutor is not expecting TurnTokens.

* fix: Add ProtocolBuilder support for chained-delegation

* Updates Declarative package to rely on chained-delegation Send/Yield registration
* Renames DeclarativeActionExecutor's new ExecuteAsync to ExecuteActionAsync to avoid colliding with Executor.ExecuteAsync

* fix: Address PR Comments

* Fixes type mapping in FanInEdgeRunner
* Fixes and explains send/yield type registration in FunctionExecutor

* fixup: build-break

* fix: Add missing SendsMessage declaration to InvokeAzureAgentExecutor
2026-02-19 14:09:03 +00:00

85 lines
2.6 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
/// <summary>
/// Test executor that answers each incoming message with the next delegate from a fixed,
/// ordered list of actions, optionally wrapping around once the list is exhausted.
/// </summary>
/// <typeparam name="TIn">Type of the message accepted by <see cref="RouteToActionsAsync"/>.</typeparam>
/// <typeparam name="TOut">Type of the value produced by each scripted action.</typeparam>
internal abstract partial class TestingExecutor<TIn, TOut> : Executor, IDisposable
{
    private readonly bool _loop;
    private readonly Func<TIn, IWorkflowContext, CancellationToken, ValueTask<TOut>>[] _actions;
    private readonly HashSet<CancellationToken> _linkedTokens = [];
    private CancellationTokenSource _internalCts = new();
    private int _nextActionIndex;

    /// <summary>
    /// Initializes the executor with its scripted actions.
    /// </summary>
    /// <param name="id">Identifier forwarded to the base <see cref="Executor"/> constructor.</param>
    /// <param name="loop">When <see langword="true"/>, the action list restarts from the first action after the last one has run.</param>
    /// <param name="actions">Actions to invoke, one per handled message, in order.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actions"/> is <see langword="null"/>.</exception>
    protected TestingExecutor(string id, bool loop = false, params Func<TIn, IWorkflowContext, CancellationToken, ValueTask<TOut>>[] actions) : base(id)
    {
        this._loop = loop;

        // Fail fast here instead of surfacing a NullReferenceException from AtEnd later on.
        this._actions = actions ?? throw new ArgumentNullException(nameof(actions));
    }

    /// <summary>Gets the number of times the action list has wrapped around (only incremented when looping).</summary>
    public int Iterations { get; private set; }

    /// <summary>Gets a value indicating whether the next action index is past the last configured action.</summary>
    public bool AtEnd => this._nextActionIndex >= this._actions.Length;

    /// <summary>Gets a value indicating whether a non-looping executor has run out of actions.</summary>
    public bool Completed => !this._loop && this.AtEnd;

    /// <summary>
    /// Removes <paramref name="cancellationToken"/> from the set of linked tokens.
    /// </summary>
    /// <remarks>
    /// Only the bookkeeping set is updated; the current internal token source is not rebuilt here,
    /// so the removal is reflected the next time <see cref="LinkCancellation"/> creates a new source.
    /// </remarks>
    /// <param name="cancellationToken">The token to stop tracking.</param>
    public void UnlinkCancellation(CancellationToken cancellationToken) =>
        this._linkedTokens.Remove(cancellationToken);

    /// <summary>
    /// Adds <paramref name="cancellationToken"/> to the linked set and replaces the internal token
    /// source with a new one linked to every token currently in the set.
    /// </summary>
    /// <remarks>
    /// The previous source is disposed after the swap. The replacement is cancelled only if one of
    /// the linked tokens is cancelled; an earlier <see cref="SetCancel"/> on the previous source
    /// is not carried over.
    /// </remarks>
    /// <param name="cancellationToken">The token to link.</param>
    public void LinkCancellation(CancellationToken cancellationToken)
    {
        this._linkedTokens.Add(cancellationToken);

        CancellationTokenSource replacement = CancellationTokenSource.CreateLinkedTokenSource(this._linkedTokens.ToArray());
        CancellationTokenSource previous = Interlocked.Exchange(ref this._internalCts, replacement);
        previous.Dispose();
    }

    /// <summary>Cancels the current internal token source.</summary>
    public void SetCancel() =>
        Volatile.Read(ref this._internalCts).Cancel();

    /// <summary>
    /// Handles a message by invoking the next scripted action with the current internal cancellation token.
    /// </summary>
    /// <param name="message">The incoming message, passed through to the action.</param>
    /// <param name="context">The workflow context, passed through to the action.</param>
    /// <returns>The <see cref="ValueTask{TResult}"/> returned by the invoked action.</returns>
    /// <exception cref="InvalidOperationException">
    /// All actions have already run and looping is disabled, or looping is enabled but no actions were configured.
    /// </exception>
    [MessageHandler]
    public ValueTask<TOut> RouteToActionsAsync(TIn message, IWorkflowContext context)
    {
        if (this.AtEnd)
        {
            if (!this._loop)
            {
                throw new InvalidOperationException("No more actions to execute and looping is disabled.");
            }

            if (this._actions.Length == 0)
            {
                // Wrapping around an empty list can never yield an action; report it clearly
                // instead of failing with an index error after mutating the counters.
                throw new InvalidOperationException("No actions were configured for this executor.");
            }

            this.Iterations++;
            this._nextActionIndex = 0;
        }

        try
        {
            Func<TIn, IWorkflowContext, CancellationToken, ValueTask<TOut>> action = this._actions[this._nextActionIndex];
            return action(message, context, Volatile.Read(ref this._internalCts).Token);
        }
        finally
        {
            // Advance even when the action throws synchronously, so a failing step is not retried.
            this._nextActionIndex++;
        }
    }

    /// <summary>Finalizer; routes to <see cref="Dispose(bool)"/> without touching managed state.</summary>
    ~TestingExecutor()
    {
        this.Dispose(false);
    }

    /// <summary>
    /// Releases resources held by this executor.
    /// </summary>
    /// <param name="disposing">
    /// <see langword="true"/> when called from <see cref="Dispose()"/>; <see langword="false"/> when called from the finalizer.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        // Managed objects must only be disposed on the explicit Dispose() path; during
        // finalization they may already have been finalized themselves.
        if (disposing)
        {
            Volatile.Read(ref this._internalCts).Dispose();
        }
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}