// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Execution;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI.Workflows;

// NOTE(review): the generic type arguments in this file were reconstructed from usage.
// Edge, EdgeInfo and RequestPortInfo in particular are inferred names - confirm against the project.

/// <summary>
/// A class that represents a workflow that can be executed.
/// </summary>
public class Workflow
{
    /// <summary>
    /// A dictionary of executor providers, keyed by executor ID.
    /// </summary>
    internal Dictionary<string, ExecutorBinding> ExecutorBindings { get; init; } = [];

    /// <summary>
    /// The edges of the workflow, grouped under a string key (see <see cref="ReflectEdges"/>).
    /// </summary>
    internal Dictionary<string, HashSet<Edge>> Edges { get; init; } = [];

    /// <summary>
    /// The IDs of the executors whose protocols contribute the yielded types reported by
    /// <see cref="DescribeProtocolAsync"/>.
    /// </summary>
    internal HashSet<string> OutputExecutors { get; init; } = [];

    /// <summary>
    /// Gets the collection of edges grouped by their source node identifier.
    /// </summary>
    /// <returns>
    /// A new dictionary with the same keys as <see cref="Edges"/>; each value is a new set holding the
    /// <c>ToEdgeInfo</c> projection of the edges stored under that key.
    /// </returns>
    public Dictionary<string, HashSet<EdgeInfo>> ReflectEdges()
    {
        return this.Edges.Keys.ToDictionary(
            keySelector: key => key,
            elementSelector: key => new HashSet<EdgeInfo>(this.Edges[key].Select(RepresentationExtensions.ToEdgeInfo))
        );
    }

    /// <summary>
    /// The external request ports of the workflow, keyed by their ID.
    /// </summary>
    internal Dictionary<string, RequestPort> Ports { get; init; } = [];

    /// <summary>
    /// Gets the collection of external request ports, keyed by their ID.
    /// </summary>
    /// <remarks>
    /// Each port has a corresponding entry in the returned dictionary.
    /// </remarks>
    /// <returns>
    /// A new dictionary with the same keys as <see cref="Ports"/>; each value is the result of calling
    /// <c>ToPortInfo()</c> on the port stored under that key.
    /// </returns>
    public Dictionary<string, RequestPortInfo> ReflectPorts()
    {
        return this.Ports.Keys.ToDictionary(
            keySelector: key => key,
            elementSelector: key => this.Ports[key].ToPortInfo()
        );
    }

    /// <summary>
    /// Gets the collection of executor bindings, keyed by their ID.
    /// </summary>
    /// <returns>A copy of the executor bindings dictionary. Modifications do not affect the workflow.</returns>
    public Dictionary<string, ExecutorBinding> ReflectExecutors()
    {
        return new Dictionary<string, ExecutorBinding>(this.ExecutorBindings);
    }

    /// <summary>
    /// Gets the identifier of the starting executor of the workflow.
    /// </summary>
    public string StartExecutorId { get; }

    /// <summary>
    /// Gets the optional human-readable name of the workflow.
    /// </summary>
    public string? Name { get; internal init; }

    /// <summary>
    /// Gets the optional description of what the workflow does.
    /// </summary>
    public string? Description { get; internal init; }

    /// <summary>
    /// Gets the telemetry context for the workflow.
    /// </summary>
    internal WorkflowTelemetryContext TelemetryContext { get; }

    /// <summary>
    /// Gets a value indicating whether every executor binding reports
    /// <c>SupportsConcurrentSharedExecution</c>.
    /// </summary>
    internal bool AllowConcurrent
        => this.ExecutorBindings.Values.All(registration => registration.SupportsConcurrentSharedExecution);

    /// <summary>
    /// Gets the IDs of the executor bindings that do not report <c>SupportsConcurrentSharedExecution</c>.
    /// </summary>
    internal IEnumerable<string> NonConcurrentExecutorIds
        => this.ExecutorBindings.Values.Where(r => !r.SupportsConcurrentSharedExecution).Select(r => r.Id);

    /// <summary>
    /// Initializes a new instance of the <see cref="Workflow"/> class with the specified starting
    /// executor identifier.
    /// </summary>
    /// <param name="startExecutorId">The unique identifier of the starting executor for the workflow. Cannot be null.</param>
    /// <param name="name">Optional human-readable name for the workflow.</param>
    /// <param name="description">Optional description of what the workflow does.</param>
    /// <param name="telemetryContext">
    /// Optional telemetry context for the workflow. When <see langword="null"/>,
    /// <c>WorkflowTelemetryContext.Disabled</c> is used.
    /// </param>
    internal Workflow(string startExecutorId, string? name = null, string? description = null, WorkflowTelemetryContext? telemetryContext = null)
    {
        this.StartExecutorId = Throw.IfNull(startExecutorId);
        this.Name = name;
        this.Description = description;
        this.TelemetryContext = telemetryContext ?? WorkflowTelemetryContext.Disabled;
    }

    // Set when ownership is taken (to HasResettableExecutors) and cleared once every binding has been
    // reset successfully. While it is true, an unowned workflow refuses to be taken again.
    private bool _needsReset;

    private bool HasResettableExecutors =>
        this.ExecutorBindings.Values.Any(registration => registration.SupportsResetting);

    /// <summary>
    /// Attempts to reset every executor binding of the workflow.
    /// </summary>
    /// <returns>
    /// <see langword="true"/> when at least one binding supports resetting and <c>TryResetAsync</c>
    /// returned <see langword="true"/> for every binding; otherwise <see langword="false"/>. The
    /// pending-reset flag is cleared only in the <see langword="true"/> case.
    /// </returns>
    private async ValueTask<bool> TryResetExecutorRegistrationsAsync()
    {
        if (this.HasResettableExecutors)
        {
            foreach (ExecutorBinding registration in this.ExecutorBindings.Values)
            {
                // TryResetAsync returns true if the executor does not need resetting
                if (!await registration.TryResetAsync().ConfigureAwait(false))
                {
                    return false;
                }
            }

            this._needsReset = false;
            return true;
        }

        return false;
    }

    // The token of the current owner, or null when the workflow is unowned. Read and written through
    // Volatile/Interlocked only.
    private object? _ownerToken;

    // Records the 'subworkflow' flag passed by the most recent successful TakeOwnership call.
    private bool _ownedAsSubworkflow;

    /// <summary>
    /// Verifies that the current owner token is the same reference as <paramref name="existingOwnershipSignoff"/>.
    /// </summary>
    /// <param name="existingOwnershipSignoff">
    /// The token expected to own the workflow, or <see langword="null"/> to assert that it is unowned.
    /// </param>
    /// <exception cref="InvalidOperationException">The current owner token is a different reference.</exception>
    internal void CheckOwnership(object? existingOwnershipSignoff = null)
    {
        object? maybeOwned = Volatile.Read(ref this._ownerToken);
        if (!ReferenceEquals(maybeOwned, existingOwnershipSignoff))
        {
            throw new InvalidOperationException($"Existing ownership does not match check value. {Summarize(maybeOwned)} vs. {Summarize(existingOwnershipSignoff)}");
        }

        // Renders a token for the error message: strings are quoted, other objects show type and hash code.
        // NOTE(review): the label for a null token is an empty string as found in this file; it may
        // originally have been a placeholder such as "<null>" - confirm before changing.
        static string Summarize(object? maybeOwnerToken) => maybeOwnerToken switch
        {
            string s => $"'{s}'",
            null => "",
            _ => $"{maybeOwnerToken.GetType().Name}@{maybeOwnerToken.GetHashCode()}",
        };
    }

    /// <summary>
    /// Atomically makes <paramref name="ownerToken"/> the owner of the workflow, provided the current
    /// owner is <paramref name="existingOwnershipSignoff"/>.
    /// </summary>
    /// <param name="ownerToken">The token that will own the workflow.</param>
    /// <param name="subworkflow">Whether ownership is being taken in order to use the workflow as a subworkflow.</param>
    /// <param name="existingOwnershipSignoff">
    /// The token expected to own the workflow right now; <see langword="null"/> when the workflow is
    /// expected to be unowned.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// An existing owner was supplied but the workflow is unowned; the workflow is unowned but still
    /// awaiting a reset that did not succeed; or the workflow is owned by a different token.
    /// </exception>
    internal void TakeOwnership(object ownerToken, bool subworkflow = false, object? existingOwnershipSignoff = null)
    {
        object? maybeToken = Interlocked.CompareExchange(ref this._ownerToken, ownerToken, existingOwnershipSignoff);
        if (maybeToken == null && existingOwnershipSignoff != null)
        {
            // We expected to already be owned, but we were not
            throw new InvalidOperationException("Existing ownership token was provided, but the workflow is unowned.");
        }

        if (maybeToken == null && this._needsReset)
        {
            // There is no owner, but the workflow failed to reset on ownership release (because there are
            // shared executors).
            //
            // Reaching this point means existingOwnershipSignoff was null, so the exchange above has
            // already installed ownerToken. Undo that before failing: otherwise the rejected caller
            // would remain the owner, and a second attempt with the same token would skip this check.
            Interlocked.CompareExchange(ref this._ownerToken, null, ownerToken);

            throw new InvalidOperationException(
                "Cannot reuse Workflow with shared Executor instances that do not implement IResettableExecutor."
                );
        }

        if (!ReferenceEquals(maybeToken, existingOwnershipSignoff) && !ReferenceEquals(maybeToken, ownerToken))
        {
            // Someone else owns the workflow
            Debug.Assert(maybeToken != null);
            throw new InvalidOperationException(
                (subworkflow, this._ownedAsSubworkflow) switch
                {
                    (true, true) => "Cannot use a Workflow as a subworkflow of multiple parent workflows.",
                    (true, false) => "Cannot use a running Workflow as a subworkflow.",
                    (false, true) => "Cannot directly run a Workflow that is a subworkflow of another workflow.",
                    (false, false) => "Cannot use a Workflow that is already owned by another runner or parent workflow.",
                });
        }

        this._needsReset = this.HasResettableExecutors;
        this._ownedAsSubworkflow = subworkflow;
    }

    /// <summary>
    /// Atomically replaces <paramref name="ownerToken"/> with <paramref name="targetOwnerToken"/> as the
    /// owner of the workflow, then attempts to reset the executor bindings.
    /// </summary>
    /// <param name="ownerToken">The token that currently owns the workflow.</param>
    /// <param name="targetOwnerToken">
    /// The token to install in its place; <see langword="null"/> leaves the workflow unowned.
    /// </param>
    /// <returns>A <see cref="ValueTask"/> that completes once the reset attempt has finished.</returns>
    /// <exception cref="InvalidOperationException">
    /// The workflow is not owned, or it is owned by a token other than <paramref name="ownerToken"/>.
    /// </exception>
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Maintainability", "CA1513:Use ObjectDisposedException throw helper",
        Justification = "Does not exist in NetFx 4.7.2")]
    internal async ValueTask ReleaseOwnershipAsync(object ownerToken, object? targetOwnerToken)
    {
        object? originalToken = Interlocked.CompareExchange(ref this._ownerToken, targetOwnerToken, ownerToken)
            ?? throw new InvalidOperationException("Attempting to release ownership of a Workflow that is not owned.");

        if (!ReferenceEquals(originalToken, ownerToken))
        {
            throw new InvalidOperationException("Attempt to release ownership of a Workflow by non-owner.");
        }

        // The outcome is intentionally not inspected here: a failed reset leaves the pending-reset flag
        // set, which TakeOwnership reports on the next attempt to take an unowned workflow.
        await this.TryResetExecutorRegistrationsAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// A request context that does nothing: posting a request completes immediately, and registering a
    /// port returns this same instance as the sink. It is attached to the start executor by
    /// <see cref="DescribeProtocolAsync"/>.
    /// </summary>
    private sealed class NoOpExternalRequestContext : IExternalRequestContext, IExternalRequestSink
    {
        public ValueTask PostAsync(ExternalRequest request) => default;

        IExternalRequestSink IExternalRequestContext.RegisterPort(RequestPort port)
        {
            return this;
        }
    }

    /// <summary>
    /// Retrieves a <see cref="ProtocolDescriptor"/> defining how to interact with this workflow.
    /// </summary>
    /// <remarks>
    /// The accepted types come from the protocol of a newly created start executor instance; the yielded
    /// types are gathered from newly created instances of the executors listed in <see cref="OutputExecutors"/>.
    /// </remarks>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous operation. The result contains
    /// a <see cref="ProtocolDescriptor"/> describing the protocol this <see cref="Workflow"/> follows.</returns>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was canceled.</exception>
    public async ValueTask<ProtocolDescriptor> DescribeProtocolAsync(CancellationToken cancellationToken = default)
    {
        // Honor the token before doing any work; the instantiation calls below do not take one.
        cancellationToken.ThrowIfCancellationRequested();

        ExecutorBinding startExecutorRegistration = this.ExecutorBindings[this.StartExecutorId];
        Executor startExecutor = await startExecutorRegistration.CreateInstanceAsync(string.Empty)
                                                                .ConfigureAwait(false);
        startExecutor.AttachRequestContext(new NoOpExternalRequestContext());

        ProtocolDescriptor inputProtocol = startExecutor.DescribeProtocol();

        // Second checkpoint: avoid instantiating the output executors if cancellation arrived meanwhile.
        cancellationToken.ThrowIfCancellationRequested();

        IEnumerable<Task<Executor>> outputExecutorTasks = this.OutputExecutors.Select(executorId => this.ExecutorBindings[executorId].CreateInstanceAsync(string.Empty).AsTask());
        Executor[] outputExecutors = await Task.WhenAll(outputExecutorTasks).ConfigureAwait(false);
        IEnumerable<Type> yieldedTypes = outputExecutors.SelectMany(executor => executor.DescribeProtocol().Yields);

        return new(inputProtocol.Accepts, yieldedTypes, [], inputProtocol.AcceptsAll);
    }
}