// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

/// <summary>
/// Test executor that handles each incoming message with the next delegate taken from a fixed,
/// ordered list of actions, optionally wrapping around to the first action when the list is exhausted.
/// </summary>
/// <remarks>
/// NOTE(review): the generic argument lists in this file were reconstructed from usage
/// (<typeparamref name="TIn"/> is the handler's message parameter; the actions are invoked with
/// message, context and a cancellation token). The name of the result type parameter
/// (<typeparamref name="TOut"/>) should be confirmed against callers.
/// </remarks>
/// <typeparam name="TIn">Type of the message passed to <see cref="RouteToActionsAsync"/>.</typeparam>
/// <typeparam name="TOut">Result type produced by each action.</typeparam>
internal abstract partial class TestingExecutor<TIn, TOut> : Executor, IDisposable
{
    private readonly bool _loop;
    private readonly Func<TIn, IWorkflowContext, CancellationToken, ValueTask<TOut>>[] _actions;

    // External tokens that the next LinkCancellation call will link into the internal source.
    private readonly HashSet<CancellationToken> _linkedTokens = [];

    // Source whose token is handed to every action; swapped atomically by LinkCancellation.
    private CancellationTokenSource _internalCts = new();

    // Index of the action the next message will be routed to.
    private int _nextActionIndex;

    /// <summary>
    /// Initializes the executor with its action list.
    /// </summary>
    /// <param name="id">Executor identifier forwarded to the base class.</param>
    /// <param name="loop">When <see langword="true"/>, routing restarts at the first action once all actions have run.</param>
    /// <param name="actions">Actions to invoke, one per incoming message, in order.</param>
    protected TestingExecutor(string id, bool loop = false, params Func<TIn, IWorkflowContext, CancellationToken, ValueTask<TOut>>[] actions)
        : base(id)
    {
        this._loop = loop;
        this._actions = actions;
    }

    /// <summary>Number of times routing has wrapped around to the first action.</summary>
    public int Iterations { get; private set; }

    /// <summary><see langword="true"/> when the next action index is past the last action.</summary>
    public bool AtEnd => this._nextActionIndex >= this._actions.Length;

    /// <summary><see langword="true"/> when looping is disabled and every action has been consumed.</summary>
    public bool Completed => !this._loop && this.AtEnd;

    /// <summary>
    /// Removes <paramref name="cancellationToken"/> from the set of tokens used by future
    /// <see cref="LinkCancellation"/> calls.
    /// </summary>
    /// <remarks>
    /// The currently active internal source is not rebuilt here, so it stays linked to the token
    /// until the next <see cref="LinkCancellation"/> call replaces it.
    /// </remarks>
    /// <param name="cancellationToken">Token to stop tracking.</param>
    public void UnlinkCancellation(CancellationToken cancellationToken) =>
        this._linkedTokens.Remove(cancellationToken);

    /// <summary>
    /// Starts tracking <paramref name="cancellationToken"/> and replaces the internal source with a
    /// new one linked to every tracked token. The previous source is disposed.
    /// </summary>
    /// <remarks>
    /// The replacement is a fresh source: a cancellation requested earlier through
    /// <see cref="SetCancel"/> does not carry over to it.
    /// </remarks>
    /// <param name="cancellationToken">Token whose cancellation should cancel subsequently routed actions.</param>
    public void LinkCancellation(CancellationToken cancellationToken)
    {
        this._linkedTokens.Add(cancellationToken);

        CancellationTokenSource replacement = CancellationTokenSource.CreateLinkedTokenSource(this._linkedTokens.ToArray());

        // Publish the new source atomically, then release the one it replaced.
        CancellationTokenSource previous = Interlocked.Exchange(ref this._internalCts, replacement);
        previous.Dispose();
    }

    /// <summary>Requests cancellation on the current internal source.</summary>
    public void SetCancel() => Volatile.Read(ref this._internalCts).Cancel();

    /// <summary>
    /// Routes <paramref name="message"/> to the next action in the list and advances the action index.
    /// </summary>
    /// <param name="message">Incoming message, passed through to the action.</param>
    /// <param name="context">Workflow context, passed through to the action.</param>
    /// <returns>The <see cref="ValueTask{TResult}"/> returned by the selected action.</returns>
    /// <exception cref="InvalidOperationException">
    /// All actions have been consumed and looping is disabled, or looping is enabled but no actions were supplied.
    /// </exception>
    [MessageHandler]
    public ValueTask<TOut> RouteToActionsAsync(TIn message, IWorkflowContext context)
    {
        if (this.AtEnd)
        {
            if (!this._loop)
            {
                throw new InvalidOperationException("No more actions to execute and looping is disabled.");
            }

            // Looping over an empty list can never select an action; fail with a clear error
            // instead of an IndexOutOfRangeException from the array access below.
            if (this._actions.Length == 0)
            {
                throw new InvalidOperationException("No actions were provided to execute.");
            }

            this.Iterations++;
            this._nextActionIndex = 0;
        }

        try
        {
            Func<TIn, IWorkflowContext, CancellationToken, ValueTask<TOut>> action = this._actions[this._nextActionIndex];
            return action(message, context, Volatile.Read(ref this._internalCts).Token);
        }
        finally
        {
            // Advance even when the action throws synchronously so the next message gets the next action.
            this._nextActionIndex++;
        }
    }

    // Kept so that derived overrides of Dispose(bool) are still invoked with disposing == false.
    ~TestingExecutor()
    {
        this.Dispose(false);
    }

    /// <summary>
    /// Releases the internal cancellation source.
    /// </summary>
    /// <param name="disposing">
    /// <see langword="true"/> when called from <see cref="Dispose()"/>; <see langword="false"/> when called
    /// from the finalizer, in which case managed objects are left untouched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            this._internalCts.Dispose();
        }
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}