// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Execution;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Represents a workflow run that tracks execution status and emitted workflow events, supporting resumption
/// with responses to pending external requests.
/// </summary>
public sealed class Run : CheckpointableRunBase, IAsyncDisposable
{
    // Every event received from the run handle so far, in arrival order.
    private readonly List<WorkflowEvent> _eventSink = [];
    private readonly AsyncRunHandle _runHandle;

    // Index into _eventSink of the first event not yet handed out through NewEvents.
    private int _lastBookmark;

    internal Run(AsyncRunHandle runHandle) : base(runHandle)
    {
        this._runHandle = runHandle;
    }

    /// <summary>
    /// Reads the run handle's event stream (without blocking on pending requests) until the stream
    /// completes, appending every event to the local event buffer.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.</param>
    /// <returns><see langword="true"/> if at least one event was received; otherwise <see langword="false"/>.</returns>
    internal async ValueTask<bool> RunToNextHaltAsync(CancellationToken cancellationToken = default)
    {
        bool hadEvents = false;

        await foreach (WorkflowEvent evt in this._runHandle.TakeEventStreamAsync(blockOnPendingRequest: false, cancellationToken).ConfigureAwait(false))
        {
            hadEvents = true;
            this._eventSink.Add(evt);
        }

        return hadEvents;
    }

    /// <summary>
    /// A unique identifier for the session. Can be provided at the start of the session, or auto-generated.
    /// </summary>
    public string SessionId => this._runHandle.SessionId;

    /// <summary>
    /// Gets the current execution status of the workflow run.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.</param>
    public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default)
        => this._runHandle.GetStatusAsync(cancellationToken);

    /// <summary>
    /// Gets all events emitted by the workflow.
    /// </summary>
    public IEnumerable<WorkflowEvent> OutgoingEvents => this._eventSink;

    /// <summary>
    /// The number of events emitted by the workflow since the last access to <see cref="NewEvents"/>.
    /// </summary>
    public int NewEventCount => this._eventSink.Count - this._lastBookmark;

    /// <summary>
    /// Gets all events emitted by the workflow since the last access to <see cref="NewEvents"/>.
    /// </summary>
    /// <remarks>
    /// Reading this property advances the internal bookmark, so each event is returned by at most one access.
    /// The returned sequence is a snapshot taken at the time of the access.
    /// </remarks>
    // NOTE(review): the DebuggerDisplay text is presumably there so that inspecting a Run in the
    // debugger shows a count rather than invoking this side-effecting getter - confirm.
    [DebuggerDisplay("NewEvents[{NewEventCount}]")]
    public IEnumerable<WorkflowEvent> NewEvents
    {
        get
        {
            int start = this._lastBookmark;
            int pending = this._eventSink.Count - start;

            if (pending <= 0)
            {
                return [];
            }

            this._lastBookmark = start + pending;

            // Return a copy rather than a lazy Skip() over the live list. A deferred query would
            // also yield events appended after this access (which the next access returns again),
            // and would throw if the list is modified while the caller is enumerating it.
            return this._eventSink.GetRange(start, pending);
        }
    }

    /// <summary>
    /// Resume execution of the workflow with the provided external responses.
    /// </summary>
    /// <param name="responses">The <see cref="ExternalResponse"/> objects to send to the workflow.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns><see langword="true"/> if the workflow had any output events, <see langword="false"/> otherwise.</returns>
    public async ValueTask<bool> ResumeAsync(IEnumerable<ExternalResponse> responses, CancellationToken cancellationToken = default)
    {
        foreach (ExternalResponse response in responses)
        {
            await this._runHandle.EnqueueResponseAsync(response, cancellationToken).ConfigureAwait(false);
        }

        return await this.RunToNextHaltAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Resume execution of the workflow with the provided messages.
    /// </summary>
    /// <typeparam name="T">The type of the messages to send.</typeparam>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <param name="messages">The messages to send to the workflow. Messages will only be sent if they are valid
    /// input types to the starting executor or a <see cref="ExternalResponse"/>.</param>
    /// <returns><see langword="true"/> if the workflow had any output events, <see langword="false"/> otherwise.</returns>
    public async ValueTask<bool> ResumeAsync<T>(CancellationToken cancellationToken = default, params IEnumerable<T> messages)
        where T : notnull
    {
        // A sequence of external responses is routed through the response overload.
        if (messages is IEnumerable<ExternalResponse> responses)
        {
            return await this.ResumeAsync(responses, cancellationToken).ConfigureAwait(false);
        }

        if (typeof(T) == typeof(object))
        {
            // No static message type is available, so use the untyped enqueue path.
            foreach (object? message in messages)
            {
                await this._runHandle.EnqueueMessageUntypedAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
            }
        }
        else
        {
            foreach (T message in messages)
            {
                await this._runHandle.EnqueueMessageAsync(message, cancellationToken).ConfigureAwait(false);
            }
        }

        return await this.RunToNextHaltAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public ValueTask DisposeAsync()
    {
        return this._runHandle.DisposeAsync();
    }
}