Files
Jacob Alber 0086d38f58 .NET: [BREAKING] Workflows API Review Naming Changes (Part 1?) (#4090)
* refactor: Normalize Run/RunStreaming with AIAgent

* refactor: Clarify Session- vs. Run-level concepts

* Rename RunId to SessionId to better match Run/Session terminology in AIAgent
* [BREAKING]: Will break existing checkpointed sessions in CosmosDb due to field rename

* refactor: Rename and simplify interface around getting typed data out of ExternalRequest/Response

* Also adds hints around using value types in PortableValue

* refactor: Rename AddFanInEdge to AddFanInBarrierEdge

This will prevent a breaking change later when we introduce a programmable FanIn edge, analogous to the FanOut edge's EdgeSelector.

The goal, in the long run, is to support a number of different FanIn scenarios, with naive FanIn (no barrier) by default, similar to FanOut.

* refactor: AsAgent(this Workflow, ...) => AsAIAgent(...)

* misc - part1: SwitchBuilder internal

---------

Co-authored-by: Dmytro Struk <13853051+dmytrostruk@users.noreply.github.com>
2026-02-20 02:05:18 +00:00

136 lines
5.1 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Execution;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Represents a workflow run that tracks execution status and emitted workflow events, supporting resumption
/// with responses to <see cref="RequestInfoEvent"/>.
/// </summary>
/// <remarks>
/// Events read from the underlying run handle are accumulated in an in-memory list. <see cref="OutgoingEvents"/>
/// exposes the whole list, while <see cref="NewEvents"/> hands out only the events appended since it was last read.
/// </remarks>
public sealed class Run : CheckpointableRunBase, IAsyncDisposable
{
    private readonly List<WorkflowEvent> _eventSink = [];
    private readonly AsyncRunHandle _runHandle;

    // Index into _eventSink of the first event that has not yet been handed out through NewEvents.
    private int _lastBookmark;

    internal Run(AsyncRunHandle runHandle) : base(runHandle)
    {
        this._runHandle = runHandle;
    }

    /// <summary>
    /// Reads the run handle's event stream (with <c>blockOnPendingRequest: false</c>) until it completes,
    /// appending every event to the internal event list.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns><c>true</c> if at least one event was read from the stream, <c>false</c> otherwise.</returns>
    internal async ValueTask<bool> RunToNextHaltAsync(CancellationToken cancellationToken = default)
    {
        bool hadEvents = false;

        await foreach (WorkflowEvent evt in this._runHandle.TakeEventStreamAsync(blockOnPendingRequest: false, cancellationToken).ConfigureAwait(false))
        {
            hadEvents = true;
            this._eventSink.Add(evt);
        }

        return hadEvents;
    }

    /// <summary>
    /// A unique identifier for the session. Can be provided at the start of the session, or auto-generated.
    /// </summary>
    public string SessionId => this._runHandle.SessionId;

    /// <summary>
    /// Gets the current execution status of the workflow run.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>The status reported by the underlying run handle.</returns>
    public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default)
        => this._runHandle.GetStatusAsync(cancellationToken);

    /// <summary>
    /// Gets all events emitted by the workflow.
    /// </summary>
    /// <remarks>
    /// The returned sequence is backed by the run's internal event list, so it also reflects events appended
    /// by later calls that resume the workflow.
    /// </remarks>
    public IEnumerable<WorkflowEvent> OutgoingEvents => this._eventSink;

    /// <summary>
    /// The number of events emitted by the workflow since the last access to <see cref="NewEvents"/>.
    /// </summary>
    public int NewEventCount => this._eventSink.Count - this._lastBookmark;

    /// <summary>
    /// Gets all events emitted by the workflow since the last access to <see cref="NewEvents" />.
    /// </summary>
    /// <remarks>
    /// Reading this property advances the bookmark: the returned events will not be returned again by a
    /// subsequent read. The result is a snapshot taken at the time of access.
    /// </remarks>
    [DebuggerDisplay("NewEvents[{NewEventCount}]")]
    public IEnumerable<WorkflowEvent> NewEvents
    {
        get
        {
            int currentBookmark = this._lastBookmark;
            int totalEvents = this._eventSink.Count;

            if (currentBookmark >= totalEvents)
            {
                return [];
            }

            this._lastBookmark = totalEvents;

            // Copy the pending slice eagerly. A lazy Skip() over the live list would also yield events appended
            // by a later resume (which the next read of NewEvents returns again), and enumerating it while the
            // list is being appended to throws InvalidOperationException.
            return this._eventSink.GetRange(currentBookmark, totalEvents - currentBookmark);
        }
    }

    /// <summary>
    /// Resume execution of the workflow with the provided external responses.
    /// </summary>
    /// <param name="responses">An array of <see cref="ExternalResponse"/> objects to send to the workflow.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns><c>true</c> if the workflow had any output events, <c>false</c> otherwise.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="responses"/> is <c>null</c>.</exception>
    public async ValueTask<bool> ResumeAsync(IEnumerable<ExternalResponse> responses, CancellationToken cancellationToken = default)
    {
        if (responses is null)
        {
            throw new ArgumentNullException(nameof(responses));
        }

        // Enqueue every response first; the workflow is only driven forward once all of them are queued.
        foreach (ExternalResponse response in responses)
        {
            await this._runHandle.EnqueueResponseAsync(response, cancellationToken).ConfigureAwait(false);
        }

        return await this.RunToNextHaltAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Resume execution of the workflow with the provided messages.
    /// </summary>
    /// <typeparam name="T">The type of the messages to send to the workflow.</typeparam>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <param name="messages">An array of messages to send to the workflow. Messages will only be sent if they are valid
    /// input types to the starting executor or a <see cref="ExternalResponse"/>.</param>
    /// <returns><c>true</c> if the workflow had any output events, <c>false</c> otherwise.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="messages"/> is <c>null</c>.</exception>
    public async ValueTask<bool> ResumeAsync<T>(CancellationToken cancellationToken = default, params IEnumerable<T> messages)
        where T : notnull
    {
        if (messages is null)
        {
            throw new ArgumentNullException(nameof(messages));
        }

        // A sequence of external responses is routed through the response-specific overload.
        if (messages is IEnumerable<ExternalResponse> responses)
        {
            return await this.ResumeAsync(responses, cancellationToken).ConfigureAwait(false);
        }

        if (typeof(T) == typeof(object))
        {
            // With T == object there is no useful static type, so use the untyped enqueue path.
            foreach (object? message in messages)
            {
                await this._runHandle.EnqueueMessageUntypedAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
            }
        }
        else
        {
            foreach (T message in messages)
            {
                await this._runHandle.EnqueueMessageAsync(message, cancellationToken).ConfigureAwait(false);
            }
        }

        return await this.RunToNextHaltAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public ValueTask DisposeAsync()
    {
        return this._runHandle.DisposeAsync();
    }
}