// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Execution;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Shared.Diagnostics;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// A class that represents a workflow that can be executed.
/// </summary>
public class Workflow
{
    /// <summary>
    /// A dictionary of executor providers, keyed by executor ID.
    /// </summary>
    internal Dictionary ExecutorBindings { get; init; } = [];

    /// <summary>
    /// The edges of the workflow. <see cref="ReflectEdges"/> exposes a converted copy of this dictionary.
    /// </summary>
    internal Dictionary> Edges { get; init; } = [];

    /// <summary>
    /// Identifiers of the executors whose described protocols supply the yielded types reported by
    /// <see cref="DescribeProtocolAsync"/>.
    /// </summary>
    internal HashSet OutputExecutors { get; init; } = [];

    /// <summary>
    /// Gets the collection of edges grouped by their source node identifier.
    /// </summary>
    /// <returns>
    /// A new dictionary with the same keys as the internal edge map. Each value is a new set holding the
    /// result of <c>RepresentationExtensions.ToEdgeInfo</c> for every edge stored under that key.
    /// </returns>
    public Dictionary> ReflectEdges()
    {
        // Walk the key/value pairs once instead of enumerating Keys and indexing back into the dictionary.
        return this.Edges.ToDictionary(
            keySelector: pair => pair.Key,
            elementSelector: pair => new HashSet(pair.Value.Select(RepresentationExtensions.ToEdgeInfo))
        );
    }

    /// <summary>
    /// The external request ports of the workflow, keyed by their ID.
    /// </summary>
    internal Dictionary Ports { get; init; } = [];

    /// <summary>
    /// Gets the collection of external request ports, keyed by their ID.
    /// </summary>
    /// <remarks>
    /// Each port has a corresponding entry in the returned dictionary.
    /// </remarks>
    /// <returns>
    /// A new dictionary with one entry per port, whose value is the result of calling <c>ToPortInfo()</c>
    /// on that port.
    /// </returns>
    public Dictionary ReflectPorts()
    {
        // Walk the key/value pairs once instead of enumerating Keys and indexing back into the dictionary.
        return this.Ports.ToDictionary(
            keySelector: pair => pair.Key,
            elementSelector: pair => pair.Value.ToPortInfo()
        );
    }

    /// <summary>
    /// Gets the collection of executor bindings, keyed by their ID.
    /// </summary>
    /// <returns>
    /// A copy of the executor bindings dictionary. Adding to or removing from the returned dictionary does
    /// not affect the workflow; the binding objects themselves are not cloned.
    /// </returns>
    public Dictionary ReflectExecutors()
    {
        // Target-typed to the declared return type; copies the entries of the internal dictionary.
        return new(this.ExecutorBindings);
    }

    /// <summary>
    /// Gets the identifier of the starting executor of the workflow.
    /// </summary>
    public string StartExecutorId { get; }

    /// <summary>
    /// Gets the optional human-readable name of the workflow.
    /// </summary>
    public string? Name { get; internal init; }

    /// <summary>
    /// Gets the optional description of what the workflow does.
    /// </summary>
    public string? Description { get; internal init; }

    /// <summary>
    /// Gets the telemetry context for the workflow.
    /// </summary>
    internal WorkflowTelemetryContext TelemetryContext { get; }

    /// <summary>
    /// Gets a value indicating whether every executor binding reports
    /// <c>SupportsConcurrentSharedExecution</c>. This is <see langword="true"/> when there are no bindings.
    /// </summary>
    internal bool AllowConcurrent => this.ExecutorBindings.Values.All(registration => registration.SupportsConcurrentSharedExecution);

    /// <summary>
    /// Gets the IDs of the executor bindings that do not report <c>SupportsConcurrentSharedExecution</c>.
    /// The sequence is evaluated lazily on each enumeration.
    /// </summary>
    internal IEnumerable NonConcurrentExecutorIds =>
        this.ExecutorBindings.Values.Where(r => !r.SupportsConcurrentSharedExecution).Select(r => r.Id);

    /// <summary>
    /// Initializes a new instance of the <see cref="Workflow"/> class with the specified starting executor
    /// identifier.
    /// </summary>
    /// <param name="startExecutorId">The unique identifier of the starting executor for the workflow. Cannot be null.</param>
    /// <param name="name">Optional human-readable name for the workflow.</param>
    /// <param name="description">Optional description of what the workflow does.</param>
    /// <param name="telemetryContext">
    /// Optional telemetry context for the workflow. When <see langword="null"/>,
    /// <c>WorkflowTelemetryContext.Disabled</c> is used.
    /// </param>
    internal Workflow(string startExecutorId, string? name = null, string? description = null, WorkflowTelemetryContext? telemetryContext = null)
    {
        this.StartExecutorId = Throw.IfNull(startExecutorId);
        this.Name = name;
        this.Description = description;
        this.TelemetryContext = telemetryContext ?? WorkflowTelemetryContext.Disabled;
    }

    // Set when ownership is taken (if any binding supports resetting) and cleared only after every binding
    // has been reset successfully. While it is set, an unowned workflow cannot be taken again.
    private bool _needsReset;

    private bool HasResettableExecutors =>
        this.ExecutorBindings.Values.Any(registration => registration.SupportsResetting);

    /// <summary>
    /// Attempts to reset every executor binding so that the workflow can be reused.
    /// </summary>
    /// <returns>
    /// <see langword="true"/> when at least one binding supports resetting and every binding reported a
    /// successful reset; otherwise <see langword="false"/>.
    /// </returns>
    private async ValueTask TryResetExecutorRegistrationsAsync()
    {
        if (!this.HasResettableExecutors)
        {
            return false;
        }

        foreach (ExecutorBinding registration in this.ExecutorBindings.Values)
        {
            // TryResetAsync returns true if the executor does not need resetting
            if (!await registration.TryResetAsync().ConfigureAwait(false))
            {
                // _needsReset is left untouched, so TakeOwnership keeps rejecting reuse.
                return false;
            }
        }

        this._needsReset = false;
        return true;
    }

    // Current owner of the workflow, or null when unowned. Accessed via Volatile/Interlocked only.
    private object? _ownerToken;

    // Records whether the most recent successful TakeOwnership was made on behalf of a parent workflow.
    private bool _ownedAsSubworkflow;

    /// <summary>
    /// Verifies that the workflow is currently owned by the expected token.
    /// </summary>
    /// <param name="existingOwnershipSignoff">
    /// The token expected to own the workflow, or <see langword="null"/> to assert that it is unowned.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// The current owner token is not the same reference as <paramref name="existingOwnershipSignoff"/>.
    /// </exception>
    internal void CheckOwnership(object? existingOwnershipSignoff = null)
    {
        object? maybeOwned = Volatile.Read(ref this._ownerToken);
        if (!ReferenceEquals(maybeOwned, existingOwnershipSignoff))
        {
            throw new InvalidOperationException($"Existing ownership does not match check value. {Summarize(maybeOwned)} vs. {Summarize(existingOwnershipSignoff)}");
        }

        static string Summarize(object? maybeOwnerToken) => maybeOwnerToken switch
        {
            string s => $"'{s}'",
            // Render a missing token visibly; an empty string would leave one side of the message blank.
            null => "<null>",
            _ => $"{maybeOwnerToken.GetType().Name}@{maybeOwnerToken.GetHashCode()}",
        };
    }

    /// <summary>
    /// Atomically claims ownership of the workflow for <paramref name="ownerToken"/>.
    /// </summary>
    /// <param name="ownerToken">The token that identifies the new owner.</param>
    /// <param name="subworkflow">
    /// <see langword="true"/> when the workflow is being claimed as a subworkflow of another workflow.
    /// </param>
    /// <param name="existingOwnershipSignoff">
    /// The token that must currently own the workflow for the claim to succeed, or <see langword="null"/>
    /// when the workflow is expected to be unowned.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// The workflow is unowned although <paramref name="existingOwnershipSignoff"/> was supplied, the
    /// workflow is unowned but still needs a reset, or the workflow is owned by a different token.
    /// </exception>
    internal void TakeOwnership(object ownerToken, bool subworkflow = false, object? existingOwnershipSignoff = null)
    {
        object? maybeToken = Interlocked.CompareExchange(ref this._ownerToken, ownerToken, existingOwnershipSignoff);
        if (maybeToken == null && existingOwnershipSignoff != null)
        {
            // We expected to already be owned, but we were not
            throw new InvalidOperationException("Existing ownership token was provided, but the workflow is unowned.");
        }

        if (maybeToken == null && this._needsReset)
        {
            // Reaching this point means the comparand was null and the exchange above has already installed
            // ownerToken. Undo that before rejecting the caller; otherwise the workflow would stay owned by
            // a caller that was told it cannot use it, and a retry with the same token would skip this check.
            _ = Interlocked.CompareExchange(ref this._ownerToken, null, ownerToken);

            // There is no owner, but the workflow failed to reset on ownership release (because there are
            // shared executors).
            throw new InvalidOperationException(
                "Cannot reuse Workflow with shared Executor instances that do not implement IResettableExecutor."
            );
        }

        if (!ReferenceEquals(maybeToken, existingOwnershipSignoff) && !ReferenceEquals(maybeToken, ownerToken))
        {
            // Someone else owns the workflow
            Debug.Assert(maybeToken != null);
            throw new InvalidOperationException(
                (subworkflow, this._ownedAsSubworkflow) switch
                {
                    (true, true) => "Cannot use a Workflow as a subworkflow of multiple parent workflows.",
                    (true, false) => "Cannot use a running Workflow as a subworkflow.",
                    (false, true) => "Cannot directly run a Workflow that is a subworkflow of another workflow.",
                    (false, false) => "Cannot use a Workflow that is already owned by another runner or parent workflow.",
                });
        }

        this._needsReset = this.HasResettableExecutors;
        this._ownedAsSubworkflow = subworkflow;
    }

    /// <summary>
    /// Atomically replaces <paramref name="ownerToken"/> with <paramref name="targetOwnerToken"/> as the
    /// owner of the workflow and then attempts to reset the executor bindings.
    /// </summary>
    /// <param name="ownerToken">The token of the current owner that is giving up ownership.</param>
    /// <param name="targetOwnerToken">
    /// The token to install as the new owner, or <see langword="null"/> to leave the workflow unowned.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// The workflow is not owned, or it is owned by a token other than <paramref name="ownerToken"/>.
    /// </exception>
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Maintainability", "CA1513:Use ObjectDisposedException throw helper",
        Justification = "Does not exist in NetFx 4.7.2")]
    internal async ValueTask ReleaseOwnershipAsync(object ownerToken, object? targetOwnerToken)
    {
        object? originalToken = Interlocked.CompareExchange(ref this._ownerToken, targetOwnerToken, ownerToken) ??
            throw new InvalidOperationException("Attempting to release ownership of a Workflow that is not owned.");

        if (!ReferenceEquals(originalToken, ownerToken))
        {
            throw new InvalidOperationException("Attempt to release ownership of a Workflow by non-owner.");
        }

        // The result is intentionally ignored: a failed reset leaves _needsReset set, which TakeOwnership
        // reports the next time an unowned workflow is claimed.
        await this.TryResetExecutorRegistrationsAsync().ConfigureAwait(false);
    }

    // Request context/sink that accepts and drops every request. DescribeProtocolAsync attaches it to the
    // start executor instance it creates for description purposes.
    private sealed class NoOpExternalRequestContext : IExternalRequestContext, IExternalRequestSink
    {
        public ValueTask PostAsync(ExternalRequest request) => default;

        IExternalRequestSink IExternalRequestContext.RegisterPort(RequestPort port)
        {
            return this;
        }
    }

    /// <summary>
    /// Retrieves a <see cref="ProtocolDescriptor"/> defining how to interact with this workflow.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous operation. The result contains
    /// a <see cref="ProtocolDescriptor"/> describing the protocol this <see cref="Workflow"/> follows.</returns>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was canceled.</exception>
    public async ValueTask DescribeProtocolAsync(CancellationToken cancellationToken = default)
    {
        // The token is not passed to CreateInstanceAsync here, so it is observed between the steps instead.
        cancellationToken.ThrowIfCancellationRequested();

        // Accepted inputs come from a fresh instance of the start executor.
        ExecutorBinding startExecutorRegistration = this.ExecutorBindings[this.StartExecutorId];
        Executor startExecutor = await startExecutorRegistration.CreateInstanceAsync(string.Empty)
                                                                .ConfigureAwait(false);
        startExecutor.AttachRequestContext(new NoOpExternalRequestContext());
        ProtocolDescriptor inputProtocol = startExecutor.DescribeProtocol();

        cancellationToken.ThrowIfCancellationRequested();

        // Yielded types are the union of what each output executor's protocol yields.
        Executor[] outputExecutors = await Task.WhenAll(
            this.OutputExecutors.Select(executorId => this.ExecutorBindings[executorId].CreateInstanceAsync(string.Empty).AsTask())
        ).ConfigureAwait(false);
        var yieldedTypes = outputExecutors.SelectMany(executor => executor.DescribeProtocol().Yields);

        return new(inputProtocol.Accepts, yieldedTypes, [], inputProtocol.AcceptsAll);
    }
}