// Copyright (c) Microsoft. All rights reserved. using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Execution; namespace Microsoft.Agents.AI.Workflows; /// /// Represents a workflow run that tracks execution status and emitted workflow events, supporting resumption /// with responses to . /// public sealed class Run : CheckpointableRunBase, IAsyncDisposable { private readonly List _eventSink = []; private readonly AsyncRunHandle _runHandle; internal Run(AsyncRunHandle runHandle) : base(runHandle) { this._runHandle = runHandle; } internal async ValueTask RunToNextHaltAsync(CancellationToken cancellationToken = default) { bool hadEvents = false; await foreach (WorkflowEvent evt in this._runHandle.TakeEventStreamAsync(blockOnPendingRequest: false, cancellationToken).ConfigureAwait(false)) { hadEvents = true; this._eventSink.Add(evt); } return hadEvents; } /// /// A unique identifier for the session. Can be provided at the start of the session, or auto-generated. /// public string SessionId => this._runHandle.SessionId; /// /// Gets the current execution status of the workflow run. /// public ValueTask GetStatusAsync(CancellationToken cancellationToken = default) => this._runHandle.GetStatusAsync(cancellationToken); /// /// Gets all events emitted by the workflow. /// public IEnumerable OutgoingEvents => this._eventSink; private int _lastBookmark; /// /// The number of events emitted by the workflow since the last access to /// public int NewEventCount => this._eventSink.Count - this._lastBookmark; /// /// Gets all events emitted by the workflow since the last access to . /// [DebuggerDisplay("NewEvents[{NewEventCount}]")] public IEnumerable NewEvents { get { if (this._lastBookmark >= this._eventSink.Count) { return []; } int currentBookmark = this._lastBookmark; this._lastBookmark = this._eventSink.Count; return this._eventSink.Skip(currentBookmark); } } /// /// Resume execution of the workflow with the provided external responses. /// /// An array of objects to send to the workflow. /// The to monitor for cancellation requests. The default is . /// true if the workflow had any output events, false otherwise. public async ValueTask ResumeAsync(IEnumerable responses, CancellationToken cancellationToken = default) { foreach (ExternalResponse response in responses) { await this._runHandle.EnqueueResponseAsync(response, cancellationToken).ConfigureAwait(false); } return await this.RunToNextHaltAsync(cancellationToken).ConfigureAwait(false); } /// /// Resume execution of the workflow with the provided external responses. /// /// The to monitor for cancellation requests. The default is . /// An array of messages to send to the workflow. Messages will only be sent if they are valid /// input types to the starting executor or a . /// true if the workflow had any output events, false otherwise. public async ValueTask ResumeAsync(CancellationToken cancellationToken = default, params IEnumerable messages) where T : notnull { if (messages is IEnumerable responses) { return await this.ResumeAsync(responses, cancellationToken).ConfigureAwait(false); } if (typeof(T) == typeof(object)) { foreach (object? message in messages) { await this._runHandle.EnqueueMessageUntypedAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); } } else { foreach (T message in messages) { await this._runHandle.EnqueueMessageAsync(message, cancellationToken).ConfigureAwait(false); } } return await this.RunToNextHaltAsync(cancellationToken).ConfigureAwait(false); } /// public ValueTask DisposeAsync() { return this._runHandle.DisposeAsync(); } }