[BREAKING] .NET: Decouple Checkpointing from Run/StreamAsync APIs (#4037)

* [BREAKING] refactor: Decouple Checkpointing and Execution APIs

With this change, Checkpointing becomes an property of an IWorkflowExecutionEnvironment. This lets environments that are tightly-coupled to their CheckpointManager avoid needing to present APIs that would not work (e.g. taking in an InMemory CheckpointManager for Durable Tasks, for example)

* refactor: Normalize IsCheckpointingEnabled naming
This commit is contained in:
Jacob Alber
2026-02-19 11:41:35 -05:00
committed by GitHub
Unverified
parent fd4e6e816c
commit c73bd87503
34 changed files with 299 additions and 317 deletions
@@ -32,10 +32,10 @@ public static class Program
var checkpoints = new List<CheckpointInfo>();
// Execute the workflow and save checkpoints
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
await using StreamingRun checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompletedEvt)
{
@@ -72,10 +72,10 @@ public static class Program
Console.WriteLine($"\n\nHydrating a new workflow instance from the {CheckpointIndex + 1}th checkpoint.");
CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex];
await using Checkpointed<StreamingRun> newCheckpointedRun =
await using StreamingRun newCheckpointedRun =
await InProcessExecution.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in newCheckpointedRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompletedEvt)
{
@@ -31,10 +31,10 @@ public static class Program
var checkpoints = new List<CheckpointInfo>();
// Execute the workflow and save checkpoints
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
await using StreamingRun checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager)
;
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompletedEvt)
{
@@ -71,7 +71,7 @@ public static class Program
CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompletedEvt)
{
@@ -34,17 +34,17 @@ public static class Program
var checkpoints = new List<CheckpointInfo>();
// Execute the workflow and save checkpoints
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
await using StreamingRun checkpointedRun = await InProcessExecution
.StreamAsync(workflow, new SignalWithNumber(NumberSignal.Init), checkpointManager)
;
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
await checkpointedRun.SendResponseAsync(response);
break;
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"* Executor {executorCompletedEvt.ExecutorId} completed.");
@@ -77,14 +77,14 @@ public static class Program
CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
await checkpointedRun.SendResponseAsync(response);
break;
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"* Executor {executorCompletedEvt.ExecutorId} completed.");
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
@@ -54,4 +55,7 @@ public sealed class CheckpointManager : ICheckpointManager
ValueTask<Checkpoint> ICheckpointManager.LookupCheckpointAsync(string runId, CheckpointInfo checkpointInfo)
=> this._impl.LookupCheckpointAsync(runId, checkpointInfo);
ValueTask<IEnumerable<CheckpointInfo>> ICheckpointManager.RetrieveIndexAsync(string runId, CheckpointInfo? withParent)
=> this._impl.RetrieveIndexAsync(runId, withParent);
}
@@ -0,0 +1,49 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Represents a base object for a workflow run that may support checkpointing.
/// </summary>
public abstract class CheckpointableRunBase
{
// TODO: Rename Context?
private readonly ICheckpointingHandle _checkpointingHandle;
internal CheckpointableRunBase(ICheckpointingHandle checkpointingHandle)
{
this._checkpointingHandle = checkpointingHandle;
}
/// <inheritdoc cref="ICheckpointingHandle.IsCheckpointingEnabled"/>
public bool IsCheckpointingEnabled => this._checkpointingHandle.IsCheckpointingEnabled;
/// <inheritdoc cref="ICheckpointingHandle.Checkpoints"/>
public IReadOnlyList<CheckpointInfo> Checkpoints => this._checkpointingHandle.Checkpoints ?? [];
/// <summary>
/// Gets the most recent checkpoint information.
/// </summary>
public CheckpointInfo? LastCheckpoint
{
get
{
if (!this.IsCheckpointingEnabled)
{
return null;
}
var checkpoints = this.Checkpoints;
return checkpoints.Count > 0 ? checkpoints[checkpoints.Count - 1] : null;
}
}
/// <inheritdoc cref="ICheckpointingHandle.RestoreCheckpointAsync"/>
public ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
=> this._checkpointingHandle.RestoreCheckpointAsync(checkpointInfo, cancellationToken);
}
@@ -1,66 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Shared.Diagnostics;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Represents a workflow run that supports checkpointing.
/// </summary>
/// <typeparam name="TRun">The type of the underlying workflow run handle.</typeparam>
/// <seealso cref="Run"/>
/// <seealso cref="StreamingRun"/>
public sealed class Checkpointed<TRun> : IAsyncDisposable
{
private readonly ICheckpointingHandle _runner;
internal Checkpointed(TRun run, ICheckpointingHandle runner)
{
this.Run = Throw.IfNull(run);
this._runner = Throw.IfNull(runner);
}
/// <summary>
/// Gets the workflow run associated with this <see cref="Checkpointed{TRun}"/> instance.
/// </summary>
/// <seealso cref="Run"/>
/// <seealso cref="StreamingRun"/>
public TRun Run { get; }
/// <inheritdoc cref="ICheckpointingHandle.Checkpoints"/>
public IReadOnlyList<CheckpointInfo> Checkpoints => this._runner.Checkpoints;
/// <summary>
/// Gets the most recent checkpoint information.
/// </summary>
public CheckpointInfo? LastCheckpoint
{
get
{
var checkpoints = this.Checkpoints;
return checkpoints.Count > 0 ? checkpoints[checkpoints.Count - 1] : null;
}
}
/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
if (this.Run is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
}
else if (this.Run is IDisposable disposable)
{
disposable.Dispose();
}
}
/// <inheritdoc cref="ICheckpointingHandle.RestoreCheckpointAsync"/>
public ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
=> this._runner.RestoreCheckpointAsync(checkpointInfo, cancellationToken);
}
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Microsoft.Agents.AI.Workflows.Checkpointing;
@@ -27,4 +28,7 @@ internal sealed class CheckpointManagerImpl<TStoreObject> : ICheckpointManager
TStoreObject result = await this._store.RetrieveCheckpointAsync(runId, checkpointInfo).ConfigureAwait(false);
return this._marshaller.Marshal<Checkpoint>(result);
}
public ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null)
=> this._store.RetrieveIndexAsync(runId, withParent);
}
@@ -27,4 +27,16 @@ internal interface ICheckpointManager
/// cref="Checkpoint"/> associated with the specified <paramref name="checkpointInfo"/>.</returns>
/// <exception cref="KeyNotFoundException">Thrown if the checkpoint is not found.</exception>
ValueTask<Checkpoint> LookupCheckpointAsync(string runId, CheckpointInfo checkpointInfo);
/// <summary>
/// Asynchronously retrieves the collection of checkpoint information for the specified run identifier, optionally
/// filtered by a parent checkpoint.
/// </summary>
/// <param name="runId">The unique identifier of the run for which to retrieve checkpoint information. Cannot be null or empty.</param>
/// <param name="withParent">An optional parent checkpoint to filter the results. If specified, only checkpoints with the given parent are
/// returned; otherwise, all checkpoints for the run are included.</param>
/// <returns>A value task representing the asynchronous operation. The result contains a collection of <see
/// cref="CheckpointInfo"/> objects associated with the specified run. The collection is empty if no checkpoints are
/// found.</returns>
ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null);
}
@@ -8,8 +8,21 @@ namespace Microsoft.Agents.AI.Workflows.Checkpointing;
internal interface ICheckpointingHandle
{
// TODO: Convert this to a multi-timeline (e.g.: Live timeline + forks for orphaned checkpoints due to timetravel)
/// <summary>
/// Gets a value indicating whether checkpointing is enabled for the current operation or process.
/// </summary>
bool IsCheckpointingEnabled { get; }
/// <summary>
/// Gets a read-only list of checkpoint information associated with the current context.
/// </summary>
IReadOnlyList<CheckpointInfo> Checkpoints { get; }
/// <summary>
/// Restores the system state from the specified checkpoint asynchronously.
/// </summary>
/// <param name="checkpointInfo">The checkpoint information that identifies the state to restore. Cannot be null.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the restore operation.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous restore operation.</returns>
ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default);
}
@@ -60,4 +60,7 @@ internal sealed class InMemoryCheckpointManager : ICheckpointManager
public bool TryGetLastCheckpoint(string runId, [NotNullWhen(true)] out CheckpointInfo? checkpoint)
=> this.GetRunStore(runId).TryGetLastCheckpointInfo(out checkpoint);
public ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null)
=> new(this.GetRunStore(runId).CheckpointIndex.AsReadOnly());
}
@@ -46,6 +46,8 @@ internal sealed class AsyncRunHandle : ICheckpointingHandle, IAsyncDisposable
public string RunId => this._stepRunner.RunId;
public bool IsCheckpointingEnabled => this._checkpointingHandle.IsCheckpointingEnabled;
public IReadOnlyList<CheckpointInfo> Checkpoints => this._checkpointingHandle.Checkpoints;
public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default)
@@ -1,6 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Threading;
using System.Threading.Tasks;
@@ -8,12 +7,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution;
internal static class AsyncRunHandleExtensions
{
public static async ValueTask<Checkpointed<TRunType>> WithCheckpointingAsync<TRunType>(this AsyncRunHandle runHandle, Func<ValueTask<TRunType>> prepareFunc)
{
TRunType run = await prepareFunc().ConfigureAwait(false);
return new Checkpointed<TRunType>(run, runHandle);
}
public static async ValueTask<StreamingRun> EnqueueAndStreamAsync<TInput>(this AsyncRunHandle runHandle, TInput input, CancellationToken cancellationToken = default)
{
await runHandle.EnqueueMessageAsync(input, cancellationToken).ConfigureAwait(false);
@@ -8,7 +8,7 @@ namespace Microsoft.Agents.AI.Workflows.Execution;
internal interface ISuperStepJoinContext
{
bool WithCheckpointing { get; }
bool IsCheckpointingEnabled { get; }
bool ConcurrentRunsEnabled { get; }
ValueTask ForwardWorkflowEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
@@ -11,6 +11,11 @@ namespace Microsoft.Agents.AI.Workflows;
/// </summary>
public interface IWorkflowExecutionEnvironment
{
/// <summary>
/// Specifies whether Checkpointing is configured for this environment.
/// </summary>
bool IsCheckpointingEnabled { get; }
/// <summary>
/// Initiates a streaming run of the specified workflow without sending any initial input. Note that the starting
/// <see cref="Executor"/> will not be invoked until an input message is received.
@@ -37,36 +42,6 @@ public interface IWorkflowExecutionEnvironment
/// cref="StreamingRun"/> for managing and interacting with the streaming run.</returns>
ValueTask<StreamingRun> StreamAsync<TInput>(Workflow workflow, TInput input, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull;
/// <summary>
/// Initiates an asynchronous streaming execution without sending any initial input, with checkpointing.
/// </summary>
/// <remarks>The returned <see cref="StreamingRun"/> provides methods to observe and control
/// the ongoing streaming execution. The operation will continue until the streaming execution is finished or
/// cancelled.</remarks>
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
/// <param name="runId">An optional unique identifier for the run. If not provided, a new identifier will be generated.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="ValueTask{StreamingRun}"/> that represents the asynchronous operation. The result contains a <see
/// cref="StreamingRun"/> for managing and interacting with the streaming run.</returns>
ValueTask<Checkpointed<StreamingRun>> StreamAsync(Workflow workflow, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default);
/// <summary>
/// Initiates an asynchronous streaming execution using the specified input, with checkpointing.
/// </summary>
/// <remarks>The returned <see cref="StreamingRun"/> provides methods to observe and control
/// the ongoing streaming execution. The operation will continue until the streaming execution is finished or
/// cancelled.</remarks>
/// <typeparam name="TInput">The type of input accepted by the workflow. Must be non-nullable.</typeparam>
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
/// <param name="input">The input message to be processed as part of the streaming run.</param>
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
/// <param name="runId">An optional unique identifier for the run. If not provided, a new identifier will be generated.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="ValueTask{StreamingRun}"/> that represents the asynchronous operation. The result contains a <see
/// cref="StreamingRun"/> for managing and interacting with the streaming run.</returns>
ValueTask<Checkpointed<StreamingRun>> StreamAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull;
/// <summary>
/// Resumes an asynchronous streaming execution for the specified input from a checkpoint.
/// </summary>
@@ -74,10 +49,9 @@ public interface IWorkflowExecutionEnvironment
/// be terminated.</remarks>
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
/// <param name="fromCheckpoint">The <see cref="CheckpointInfo"/> corresponding to the checkpoint from which to resume.</param>
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="StreamingRun"/> that provides access to the results of the streaming run.</returns>
ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default);
ValueTask<StreamingRun> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CancellationToken cancellationToken = default);
/// <summary>
/// Initiates a non-streaming execution of the workflow with the specified input.
@@ -93,21 +67,6 @@ public interface IWorkflowExecutionEnvironment
/// cref="Run"/> for managing and interacting with the streaming run.</returns>
ValueTask<Run> RunAsync<TInput>(Workflow workflow, TInput input, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull;
/// <summary>
/// Initiates a non-streaming execution of the workflow with the specified input, with checkpointing.
/// </summary>
/// <remarks>The workflow will run until its first halt, and the returned <see cref="Run"/> will capture
/// all outgoing events. Use the <c>Run</c> instance to resume execution with responses to outgoing events.</remarks>
/// <typeparam name="TInput">The type of input accepted by the workflow. Must be non-nullable.</typeparam>
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
/// <param name="input">The input message to be processed as part of the run.</param>
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
/// <param name="runId">An optional unique identifier for the run. If not provided, a new identifier will be generated.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="ValueTask{Run}"/> that represents the asynchronous operation. The result contains a <see
/// cref="Run"/> for managing and interacting with the streaming run.</returns>
ValueTask<Checkpointed<Run>> RunAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull;
/// <summary>
/// Resumes a non-streaming execution of the workflow from a checkpoint.
/// </summary>
@@ -115,9 +74,8 @@ public interface IWorkflowExecutionEnvironment
/// all outgoing events. Use the <c>Run</c> instance to resume execution with responses to outgoing events.</remarks>
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
/// <param name="fromCheckpoint">The <see cref="CheckpointInfo"/> corresponding to the checkpoint from which to resume.</param>
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="ValueTask{Run}"/> that represents the asynchronous operation. The result contains a <see
/// cref="Run"/> for managing and interacting with the streaming run.</returns>
ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default);
ValueTask<Run> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CancellationToken cancellationToken = default);
}
@@ -2,9 +2,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Execution;
namespace Microsoft.Agents.AI.Workflows.InProc;
@@ -15,24 +15,44 @@ namespace Microsoft.Agents.AI.Workflows.InProc;
/// </summary>
public sealed class InProcessExecutionEnvironment : IWorkflowExecutionEnvironment
{
internal InProcessExecutionEnvironment(ExecutionMode mode, bool enableConcurrentRuns = false)
internal InProcessExecutionEnvironment(ExecutionMode mode, bool enableConcurrentRuns = false, CheckpointManager? checkpointManager = null)
{
this.ExecutionMode = mode;
this.EnableConcurrentRuns = enableConcurrentRuns;
this.CheckpointManager = checkpointManager;
}
/// <summary>
/// Configure a new execution environment, inheriting configuration for the current one with the specified <see cref="Workflows.CheckpointManager"/>
/// for use in checkpointing.
/// </summary>
/// <param name="checkpointManager">The CheckpointManager to use for checkpointing.</param>
/// <returns>
/// A new InProcess <see cref="IWorkflowExecutionEnvironment"/> configured for checkpointing, inheriting configuration from the current
/// environment.
/// </returns>
public InProcessExecutionEnvironment WithCheckpointing(CheckpointManager? checkpointManager)
{
return new(this.ExecutionMode, this.EnableConcurrentRuns, checkpointManager);
}
internal ExecutionMode ExecutionMode { get; }
internal bool EnableConcurrentRuns { get; }
internal CheckpointManager? CheckpointManager { get; }
internal ValueTask<AsyncRunHandle> BeginRunAsync(Workflow workflow, ICheckpointManager? checkpointManager, string? runId, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
/// <inheritdoc/>
public bool IsCheckpointingEnabled => this.CheckpointManager != null;
internal ValueTask<AsyncRunHandle> BeginRunAsync(Workflow workflow, string? runId, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
{
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, checkpointManager, runId, this.EnableConcurrentRuns, knownValidInputTypes);
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, this.CheckpointManager, runId, this.EnableConcurrentRuns, knownValidInputTypes);
return runner.BeginStreamAsync(this.ExecutionMode, cancellationToken);
}
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, ICheckpointManager? checkpointManager, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
{
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, checkpointManager, fromCheckpoint.RunId, this.EnableConcurrentRuns, knownValidInputTypes);
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, this.CheckpointManager, fromCheckpoint.RunId, this.EnableConcurrentRuns, knownValidInputTypes);
return runner.ResumeStreamAsync(this.ExecutionMode, fromCheckpoint, cancellationToken);
}
@@ -42,7 +62,7 @@ public sealed class InProcessExecutionEnvironment : IWorkflowExecutionEnvironmen
string? runId = null,
CancellationToken cancellationToken = default)
{
AsyncRunHandle runHandle = await this.BeginRunAsync(workflow, checkpointManager: null, runId: runId, [], cancellationToken)
AsyncRunHandle runHandle = await this.BeginRunAsync(workflow, runId, [], cancellationToken)
.ConfigureAwait(false);
return new(runHandle);
@@ -55,63 +75,42 @@ public sealed class InProcessExecutionEnvironment : IWorkflowExecutionEnvironmen
string? runId = null,
CancellationToken cancellationToken = default) where TInput : notnull
{
AsyncRunHandle runHandle = await this.BeginRunAsync(workflow, checkpointManager: null, runId: runId, [], cancellationToken)
AsyncRunHandle runHandle = await this.BeginRunAsync(workflow, runId, [], cancellationToken)
.ConfigureAwait(false);
return await runHandle.EnqueueAndStreamAsync(input, cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc/>
public async ValueTask<Checkpointed<StreamingRun>> StreamAsync(
Workflow workflow,
CheckpointManager checkpointManager,
string? runId = null,
CancellationToken cancellationToken = default)
[MemberNotNull(nameof(CheckpointManager))]
private void VerifyCheckpointingConfigured()
{
AsyncRunHandle runHandle = await this.BeginRunAsync(workflow, checkpointManager, runId: runId, [], cancellationToken)
.ConfigureAwait(false);
return await runHandle.WithCheckpointingAsync<StreamingRun>(() => new(new StreamingRun(runHandle)))
.ConfigureAwait(false);
if (this.CheckpointManager == null)
{
throw new InvalidOperationException("Checkpointing is not configured for this execution environment. Please use the InProcessExecutionEnvironment.WithCheckpointing method to attach a CheckpointManager.");
}
}
/// <inheritdoc/>
public async ValueTask<Checkpointed<StreamingRun>> StreamAsync<TInput>(
Workflow workflow,
TInput input,
CheckpointManager checkpointManager,
string? runId = null,
CancellationToken cancellationToken = default) where TInput : notnull
{
AsyncRunHandle runHandle = await this.BeginRunAsync(workflow, checkpointManager, runId: runId, [], cancellationToken)
.ConfigureAwait(false);
return await runHandle.WithCheckpointingAsync(() => runHandle.EnqueueAndStreamAsync(input, cancellationToken))
.ConfigureAwait(false);
}
/// <inheritdoc/>
public async ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(
public async ValueTask<StreamingRun> ResumeStreamAsync(
Workflow workflow,
CheckpointInfo fromCheckpoint,
CheckpointManager checkpointManager,
CancellationToken cancellationToken = default)
{
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, fromCheckpoint, [], cancellationToken)
this.VerifyCheckpointingConfigured();
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, fromCheckpoint, [], cancellationToken)
.ConfigureAwait(false);
return await runHandle.WithCheckpointingAsync<StreamingRun>(() => new(new StreamingRun(runHandle)))
.ConfigureAwait(false);
return new(runHandle);
}
private async ValueTask<AsyncRunHandle> BeginRunHandlingChatProtocolAsync<TInput>(Workflow workflow,
TInput input,
CheckpointManager? checkpointManager,
string? runId = null,
CancellationToken cancellationToken = default)
{
ProtocolDescriptor descriptor = await workflow.DescribeProtocolAsync(cancellationToken).ConfigureAwait(false);
AsyncRunHandle runHandle = await this.BeginRunAsync(workflow, checkpointManager, runId, descriptor.Accepts, cancellationToken)
AsyncRunHandle runHandle = await this.BeginRunAsync(workflow, runId, descriptor.Accepts, cancellationToken)
.ConfigureAwait(false);
await runHandle.EnqueueMessageAsync(input, cancellationToken).ConfigureAwait(false);
@@ -134,7 +133,6 @@ public sealed class InProcessExecutionEnvironment : IWorkflowExecutionEnvironmen
AsyncRunHandle runHandle = await this.BeginRunHandlingChatProtocolAsync(
workflow,
input,
checkpointManager: null,
runId,
cancellationToken)
.ConfigureAwait(false);
@@ -145,38 +143,16 @@ public sealed class InProcessExecutionEnvironment : IWorkflowExecutionEnvironmen
}
/// <inheritdoc/>
public async ValueTask<Checkpointed<Run>> RunAsync<TInput>(
Workflow workflow,
TInput input,
CheckpointManager checkpointManager,
string? runId = null,
CancellationToken cancellationToken = default) where TInput : notnull
{
AsyncRunHandle runHandle = await this.BeginRunHandlingChatProtocolAsync(
workflow,
input,
checkpointManager,
runId,
cancellationToken)
.ConfigureAwait(false);
Run run = new(runHandle);
await run.RunToNextHaltAsync(cancellationToken).ConfigureAwait(false);
return await runHandle.WithCheckpointingAsync(() => new ValueTask<Run>(run))
.ConfigureAwait(false);
}
/// <inheritdoc/>
public async ValueTask<Checkpointed<Run>> ResumeAsync(
public async ValueTask<Run> ResumeAsync(
Workflow workflow,
CheckpointInfo fromCheckpoint,
CheckpointManager checkpointManager,
CancellationToken cancellationToken = default)
{
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, fromCheckpoint, [], cancellationToken)
this.VerifyCheckpointingConfigured();
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, fromCheckpoint, [], cancellationToken)
.ConfigureAwait(false);
return await runHandle.WithCheckpointingAsync<Run>(() => new(new Run(runHandle)))
.ConfigureAwait(false);
return new(runHandle);
}
}
@@ -54,7 +54,7 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
this.StartExecutorId = workflow.StartExecutorId;
this.Workflow = Throw.IfNull(workflow);
this.RunContext = new InProcessRunnerContext(workflow, this.RunId, withCheckpointing: checkpointManager != null, this.OutgoingEvents, this.StepTracer, existingOwnerSignoff, subworkflow, enableConcurrentRuns);
this.RunContext = new InProcessRunnerContext(workflow, this.RunId, checkpointingEnabled: checkpointManager != null, this.OutgoingEvents, this.StepTracer, existingOwnerSignoff, subworkflow, enableConcurrentRuns);
this.CheckpointManager = checkpointManager;
this._knownValidInputTypes = knownValidInputTypes != null
@@ -161,6 +161,8 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
bool ISuperStepRunner.HasUnservicedRequests => this.RunContext.HasUnservicedRequests;
bool ISuperStepRunner.HasUnprocessedMessages => this.RunContext.NextStepHasActions;
public bool IsCheckpointingEnabled => this.RunContext.IsCheckpointingEnabled;
public IReadOnlyList<CheckpointInfo> Checkpoints => this._checkpoints;
async ValueTask<bool> ISuperStepRunner.RunSuperStepAsync(CancellationToken cancellationToken)
@@ -325,6 +327,8 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
throw new InvalidDataException("The specified checkpoint is not compatible with the workflow associated with this runner.");
}
ValueTask restoreCheckpointIndexTask = UpdateCheckpointIndexAsync();
await this.RunContext.StateManager.ImportStateAsync(checkpoint).ConfigureAwait(false);
await this.RunContext.ImportStateAsync(checkpoint).ConfigureAwait(false);
@@ -332,10 +336,18 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingHandle
ValueTask republishRequestsTask = this.RunContext.RepublishUnservicedRequestsAsync(cancellationToken);
await this.EdgeMap.ImportStateAsync(checkpoint).ConfigureAwait(false);
await Task.WhenAll(executorNotifyTask, republishRequestsTask.AsTask()).ConfigureAwait(false);
await Task.WhenAll(executorNotifyTask,
republishRequestsTask.AsTask(),
restoreCheckpointIndexTask.AsTask()).ConfigureAwait(false);
this._lastCheckpointInfo = checkpointInfo;
this.StepTracer.Reload(this.StepTracer.StepNumber);
async ValueTask UpdateCheckpointIndexAsync()
{
this._checkpoints.Clear();
this._checkpoints.AddRange(await this.CheckpointManager!.RetrieveIndexAsync(this.RunId).ConfigureAwait(false));
}
}
private bool CheckWorkflowMatch(Checkpoint checkpoint) =>
@@ -41,7 +41,7 @@ internal sealed class InProcessRunnerContext : IRunnerContext
public InProcessRunnerContext(
Workflow workflow,
string runId,
bool withCheckpointing,
bool checkpointingEnabled,
IEventSink outgoingEvents,
IStepTracer? stepTracer,
object? existingOwnershipSignoff = null,
@@ -66,7 +66,7 @@ internal sealed class InProcessRunnerContext : IRunnerContext
this._edgeMap = new(this, this._workflow, stepTracer);
this._outputFilter = new(workflow);
this.WithCheckpointing = withCheckpointing;
this.IsCheckpointingEnabled = checkpointingEnabled;
this.ConcurrentRunsEnabled = enableConcurrentRuns;
this.OutgoingEvents = outgoingEvents;
}
@@ -350,7 +350,7 @@ internal sealed class InProcessRunnerContext : IRunnerContext
public bool ConcurrentRunsEnabled => RunnerContext.ConcurrentRunsEnabled;
}
public bool WithCheckpointing { get; }
public bool IsCheckpointingEnabled { get; }
public bool ConcurrentRunsEnabled { get; }
internal Task PrepareForCheckpointAsync(CancellationToken cancellationToken = default)
@@ -49,27 +49,27 @@ public static class InProcessExecution
public static ValueTask<StreamingRun> StreamAsync<TInput>(Workflow workflow, TInput input, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
=> Default.StreamAsync(workflow, input, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.StreamAsync(Workflow, CheckpointManager, string?, CancellationToken)"/>
public static ValueTask<Checkpointed<StreamingRun>> StreamAsync(Workflow workflow, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default)
=> Default.StreamAsync(workflow, checkpointManager, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.OpenStreamAsync(Workflow, string?, CancellationToken)"/>
public static ValueTask<StreamingRun> OpenStreamAsync(Workflow workflow, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default)
=> Default.WithCheckpointing(checkpointManager).OpenStreamAsync(workflow, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.StreamAsync{TInput}(Workflow, TInput, CheckpointManager, string?, CancellationToken)"/>
public static ValueTask<Checkpointed<StreamingRun>> StreamAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
=> Default.StreamAsync(workflow, input, checkpointManager, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.StreamAsync{TInput}(Workflow, TInput, string?, CancellationToken)"/>
public static ValueTask<StreamingRun> StreamAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
=> Default.WithCheckpointing(checkpointManager).StreamAsync(workflow, input, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeStreamAsync(Workflow, CheckpointInfo, CheckpointManager, CancellationToken)"/>
public static ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default)
=> Default.ResumeStreamAsync(workflow, fromCheckpoint, checkpointManager, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeStreamAsync(Workflow, CheckpointInfo, CancellationToken)"/>
public static ValueTask<StreamingRun> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default)
=> Default.WithCheckpointing(checkpointManager).ResumeStreamAsync(workflow, fromCheckpoint, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.RunAsync{TInput}(Workflow, TInput, string?, CancellationToken)"/>
public static ValueTask<Run> RunAsync<TInput>(Workflow workflow, TInput input, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
=> Default.RunAsync(workflow, input, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.RunAsync{TInput}(Workflow, TInput, CheckpointManager, string?, CancellationToken)"/>
public static ValueTask<Checkpointed<Run>> RunAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
=> Default.RunAsync(workflow, input, checkpointManager, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.RunAsync{TInput}(Workflow, TInput, string?, CancellationToken)"/>
public static ValueTask<Run> RunAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
=> Default.WithCheckpointing(checkpointManager).RunAsync(workflow, input, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeAsync(Workflow, CheckpointInfo, CheckpointManager, CancellationToken)"/>
public static ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default)
=> Default.ResumeAsync(workflow, fromCheckpoint, checkpointManager, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeAsync(Workflow, CheckpointInfo, CancellationToken)"/>
public static ValueTask<Run> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default)
=> Default.WithCheckpointing(checkpointManager).ResumeAsync(workflow, fromCheckpoint, cancellationToken);
}
@@ -14,13 +14,13 @@ namespace Microsoft.Agents.AI.Workflows;
/// Represents a workflow run that tracks execution status and emitted workflow events, supporting resumption
/// with responses to <see cref="RequestInfoEvent"/>.
/// </summary>
public sealed class Run : IAsyncDisposable
public sealed class Run : CheckpointableRunBase, IAsyncDisposable
{
private readonly List<WorkflowEvent> _eventSink = [];
private readonly AsyncRunHandle _runHandle;
internal Run(AsyncRunHandle _runHandle)
internal Run(AsyncRunHandle runHandle) : base(runHandle)
{
this._runHandle = _runHandle;
this._runHandle = runHandle;
}
internal async ValueTask<bool> RunToNextHaltAsync(CancellationToken cancellationToken = default)
@@ -81,7 +81,7 @@ internal class WorkflowHostExecutor : Executor, IAsyncDisposable
{
if (this._activeRunner == null)
{
if (this.JoinContext.WithCheckpointing)
if (this.JoinContext.IsCheckpointingEnabled)
{
// Use a seprate in-memory checkpoint manager for scoping purposes. We do not need to worry about
// serialization because we will be relying on the parent workflow's checkpoint manager to do that,
@@ -14,11 +14,11 @@ namespace Microsoft.Agents.AI.Workflows;
/// A <see cref="Workflow"/> run instance supporting a streaming form of receiving workflow events, and providing
/// a mechanism to send responses back to the workflow.
/// </summary>
public sealed class StreamingRun : IAsyncDisposable
public sealed class StreamingRun : CheckpointableRunBase, IAsyncDisposable
{
private readonly AsyncRunHandle _runHandle;
internal StreamingRun(AsyncRunHandle runHandle)
internal StreamingRun(AsyncRunHandle runHandle) : base(runHandle)
{
this._runHandle = Throw.IfNull(runHandle);
}
@@ -7,6 +7,7 @@ using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Extensions.AI;
using Microsoft.Shared.Diagnostics;
@@ -16,7 +17,6 @@ internal sealed class WorkflowHostAgent : AIAgent
{
private readonly Workflow _workflow;
private readonly string? _id;
private readonly CheckpointManager? _checkpointManager;
private readonly IWorkflowExecutionEnvironment _executionEnvironment;
private readonly bool _includeExceptionDetails;
private readonly bool _includeWorkflowOutputsInResponse;
@@ -24,14 +24,22 @@ internal sealed class WorkflowHostAgent : AIAgent
private readonly ConcurrentDictionary<string, string> _assignedRunIds = [];
public WorkflowHostAgent(Workflow workflow, string? id = null, string? name = null, string? description = null, CheckpointManager? checkpointManager = null, IWorkflowExecutionEnvironment? executionEnvironment = null, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false)
public WorkflowHostAgent(Workflow workflow, string? id = null, string? name = null, string? description = null, IWorkflowExecutionEnvironment? executionEnvironment = null, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false)
{
this._workflow = Throw.IfNull(workflow);
this._executionEnvironment = executionEnvironment ?? (workflow.AllowConcurrent
? InProcessExecution.Concurrent
: InProcessExecution.OffThread);
this._checkpointManager = checkpointManager;
if (!this._executionEnvironment.IsCheckpointingEnabled &&
this._executionEnvironment is not InProcessExecutionEnvironment)
{
// Cannot have an implicit CheckpointManager for non-InProcessExecution environments (or others that
// support BYO Checkpointing.
throw new InvalidOperationException("Cannot use a non-checkpointed execution environment. Implicit checkpointing is supported only for InProcess.");
}
this._includeExceptionDetails = includeExceptionDetails;
this._includeWorkflowOutputsInResponse = includeWorkflowOutputsInResponse;
@@ -66,7 +74,7 @@ internal sealed class WorkflowHostAgent : AIAgent
}
protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default)
=> new(new WorkflowSession(this._workflow, this.GenerateNewId(), this._executionEnvironment, this._checkpointManager, this._includeExceptionDetails, this._includeWorkflowOutputsInResponse));
=> new(new WorkflowSession(this._workflow, this.GenerateNewId(), this._executionEnvironment, this._includeExceptionDetails, this._includeWorkflowOutputsInResponse));
protected override ValueTask<JsonElement> SerializeSessionCoreAsync(AgentSession session, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
{
@@ -81,7 +89,7 @@ internal sealed class WorkflowHostAgent : AIAgent
}
protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
=> new(new WorkflowSession(this._workflow, serializedState, this._executionEnvironment, this._checkpointManager, this._includeExceptionDetails, this._includeWorkflowOutputsInResponse, jsonSerializerOptions));
=> new(new WorkflowSession(this._workflow, serializedState, this._executionEnvironment, this._includeExceptionDetails, this._includeWorkflowOutputsInResponse, jsonSerializerOptions));
private async ValueTask<WorkflowSession> UpdateSessionAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, CancellationToken cancellationToken = default)
{
@@ -17,7 +17,6 @@ public static class WorkflowHostingExtensions
/// <param name="id">A unique id for the hosting <see cref="AIAgent"/>.</param>
/// <param name="name">A name for the hosting <see cref="AIAgent"/>.</param>
/// <param name="description">A description for the hosting <see cref="AIAgent"/>.</param>
/// <param name="checkpointManager">A <see cref="CheckpointManager"/> to enable persistence of run state.</param>
/// <param name="executionEnvironment">Specify the execution environment to use when running the workflows. See
/// <see cref="InProcessExecution.OffThread"/>, <see cref="InProcessExecution.Concurrent"/> and
/// <see cref="InProcessExecution.Lockstep"/> for the in-process environments.</param>
@@ -31,12 +30,11 @@ public static class WorkflowHostingExtensions
string? id = null,
string? name = null,
string? description = null,
CheckpointManager? checkpointManager = null,
IWorkflowExecutionEnvironment? executionEnvironment = null,
bool includeExceptionDetails = false,
bool includeWorkflowOutputsInResponse = false)
{
return new WorkflowHostAgent(workflow, id, name, description, checkpointManager, executionEnvironment, includeExceptionDetails, includeWorkflowOutputsInResponse);
return new WorkflowHostAgent(workflow, id, name, description, executionEnvironment, includeExceptionDetails, includeWorkflowOutputsInResponse);
}
internal static FunctionCallContent ToFunctionCall(this ExternalRequest request)
@@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
@@ -9,6 +10,7 @@ using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Extensions.AI;
using Microsoft.Shared.Diagnostics;
@@ -21,26 +23,48 @@ internal sealed class WorkflowSession : AgentSession
private readonly bool _includeExceptionDetails;
private readonly bool _includeWorkflowOutputsInResponse;
private readonly CheckpointManager _checkpointManager;
private readonly InMemoryCheckpointManager? _inMemoryCheckpointManager;
private InMemoryCheckpointManager? _inMemoryCheckpointManager;
public WorkflowSession(Workflow workflow, string runId, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false)
internal static bool VerifyCheckpointingConfiguration(IWorkflowExecutionEnvironment executionEnvironment, [NotNullWhen(true)] out InProcessExecutionEnvironment? inProcEnv)
{
inProcEnv = null;
if (executionEnvironment.IsCheckpointingEnabled)
{
return false;
}
if ((inProcEnv = executionEnvironment as InProcessExecutionEnvironment) == null)
{
throw new InvalidOperationException("Cannot use a non-checkpointed execution environment. Implicit checkpointing is supported only for InProcess.");
}
return true;
}
public WorkflowSession(Workflow workflow, string runId, IWorkflowExecutionEnvironment executionEnvironment, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false)
{
this._workflow = Throw.IfNull(workflow);
this._executionEnvironment = Throw.IfNull(executionEnvironment);
this._includeExceptionDetails = includeExceptionDetails;
this._includeWorkflowOutputsInResponse = includeWorkflowOutputsInResponse;
// If the user provided an external checkpoint manager, use that, otherwise rely on an in-memory one.
// TODO: Implement persist-only-last functionality for in-memory checkpoint manager, to avoid unbounded
// memory growth.
this._checkpointManager = checkpointManager ?? new(this._inMemoryCheckpointManager = new());
if (VerifyCheckpointingConfiguration(executionEnvironment, out InProcessExecutionEnvironment? inProcEnv))
{
// We have an InProcessExecutionEnvironment which is not configured for checkpointing. Ensure it has an externalizable checkpoint manager,
// since we are responsible for maintaining the state.
this._executionEnvironment = inProcEnv.WithCheckpointing(this.EnsureExternalizedInMemoryCheckpointing());
}
this.RunId = Throw.IfNullOrEmpty(runId);
this.ChatHistoryProvider = new WorkflowChatHistoryProvider();
}
public WorkflowSession(Workflow workflow, JsonElement serializedSession, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false, JsonSerializerOptions? jsonSerializerOptions = null)
private CheckpointManager EnsureExternalizedInMemoryCheckpointing()
{
return new(this._inMemoryCheckpointManager ??= new());
}
public WorkflowSession(Workflow workflow, JsonElement serializedSession, IWorkflowExecutionEnvironment executionEnvironment, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false, JsonSerializerOptions? jsonSerializerOptions = null)
{
this._workflow = Throw.IfNull(workflow);
this._executionEnvironment = Throw.IfNull(executionEnvironment);
@@ -51,26 +75,20 @@ internal sealed class WorkflowSession : AgentSession
SessionState sessionState = marshaller.Marshal<SessionState>(serializedSession);
this._inMemoryCheckpointManager = sessionState.CheckpointManager;
if (this._inMemoryCheckpointManager is not null && checkpointManager is not null)
if (this._inMemoryCheckpointManager != null &&
VerifyCheckpointingConfiguration(executionEnvironment, out InProcessExecutionEnvironment? inProcEnv))
{
// The session was externalized with an in-memory checkpoint manager, but the caller is providing an external one.
throw new ArgumentException("Cannot provide an external checkpoint manager when deserializing a session that " +
"was serialized with an in-memory checkpoint manager.", nameof(checkpointManager));
this._executionEnvironment = inProcEnv.WithCheckpointing(this.EnsureExternalizedInMemoryCheckpointing());
}
else if (this._inMemoryCheckpointManager is null && checkpointManager is null)
else if (this._inMemoryCheckpointManager != null)
{
// The session was externalized without an in-memory checkpoint manager, and the caller is not providing an external one.
throw new ArgumentException("An external checkpoint manager must be provided when deserializing a session that " +
"was serialized without an in-memory checkpoint manager.", nameof(checkpointManager));
}
else
{
this._checkpointManager = checkpointManager ?? new(this._inMemoryCheckpointManager!);
throw new ArgumentException("The session was saved with an externalized checkpoint manager, but the incoming execution environment does not support it.", nameof(executionEnvironment));
}
this.RunId = sessionState.RunId;
this.LastCheckpoint = sessionState.LastCheckpoint;
this.ChatHistoryProvider = new WorkflowChatHistoryProvider();
this.LastCheckpoint = sessionState.LastCheckpoint;
this.StateBag = sessionState.StateBag;
}
@@ -123,28 +141,26 @@ internal sealed class WorkflowSession : AgentSession
return update;
}
private async ValueTask<Checkpointed<StreamingRun>> CreateOrResumeRunAsync(List<ChatMessage> messages, CancellationToken cancellationToken = default)
private async ValueTask<StreamingRun> CreateOrResumeRunAsync(List<ChatMessage> messages, CancellationToken cancellationToken = default)
{
// The workflow is validated to be a ChatProtocol workflow by the WorkflowHostAgent before creating the session,
// and does not need to be checked again here.
if (this.LastCheckpoint is not null)
{
Checkpointed<StreamingRun> checkpointed =
StreamingRun run =
await this._executionEnvironment
.ResumeStreamAsync(this._workflow,
this.LastCheckpoint,
this._checkpointManager,
cancellationToken)
.ConfigureAwait(false);
await checkpointed.Run.TrySendMessageAsync(messages).ConfigureAwait(false);
return checkpointed;
await run.TrySendMessageAsync(messages).ConfigureAwait(false);
return run;
}
return await this._executionEnvironment
.StreamAsync(this._workflow,
messages,
this._checkpointManager,
this.RunId,
cancellationToken)
.ConfigureAwait(false);
@@ -160,11 +176,10 @@ internal sealed class WorkflowSession : AgentSession
List<ChatMessage> messages = this.ChatHistoryProvider.GetFromBookmark(this).ToList();
#pragma warning disable CA2007 // Analyzer misfiring and not seeing .ConfigureAwait(false) below.
await using Checkpointed<StreamingRun> checkpointed =
await using StreamingRun run =
await this.CreateOrResumeRunAsync(messages, cancellationToken).ConfigureAwait(false);
#pragma warning restore CA2007
StreamingRun run = checkpointed.Run;
await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cancellationToken)
.ConfigureAwait(false)
@@ -68,7 +68,7 @@ internal sealed class WorkflowRunner
checkpointManager = CheckpointManager.CreateInMemory();
}
Checkpointed<StreamingRun> run = await InProcessExecution.StreamAsync(workflow, input, checkpointManager).ConfigureAwait(false);
StreamingRun run = await InProcessExecution.StreamAsync(workflow, input, checkpointManager).ConfigureAwait(false);
bool isComplete = false;
ExternalResponse? requestResponse = null;
@@ -107,7 +107,7 @@ internal sealed class WorkflowRunner
Notify("\nWORKFLOW: Done!\n");
}
public async Task<ExternalRequest?> MonitorAndDisposeWorkflowRunAsync(Checkpointed<StreamingRun> run, ExternalResponse? response = null)
public async Task<ExternalRequest?> MonitorAndDisposeWorkflowRunAsync(StreamingRun run, ExternalResponse? response = null)
{
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
await using IAsyncDisposable disposeRun = run;
@@ -121,10 +121,10 @@ internal sealed class WorkflowRunner
if (response is not null)
{
await run.Run.SendResponseAsync(response).ConfigureAwait(false);
await run.SendResponseAsync(response).ConfigureAwait(false);
}
await foreach (WorkflowEvent workflowEvent in run.Run.WatchStreamAsync().ConfigureAwait(false))
await foreach (WorkflowEvent workflowEvent in run.WatchStreamAsync().ConfigureAwait(false))
{
switch (workflowEvent)
{
@@ -45,7 +45,7 @@ internal sealed class WorkflowHarness(Workflow workflow, string runId)
public async Task<WorkflowEvents> RunWorkflowAsync<TInput>(TInput input, bool useJson = false) where TInput : notnull
{
Console.WriteLine("RUNNING WORKFLOW...");
Checkpointed<StreamingRun> run = await InProcessExecution.StreamAsync(workflow, input, this.GetCheckpointManager(useJson), runId);
StreamingRun run = await InProcessExecution.StreamAsync(workflow, input, this.GetCheckpointManager(useJson), runId);
IReadOnlyList<WorkflowEvent> workflowEvents = await MonitorAndDisposeWorkflowRunAsync(run).ToArrayAsync();
this._lastCheckpoint = workflowEvents.OfType<SuperStepCompletedEvent>().LastOrDefault()?.CompletionInfo?.Checkpoint;
return new WorkflowEvents(workflowEvents);
@@ -55,7 +55,7 @@ internal sealed class WorkflowHarness(Workflow workflow, string runId)
{
Console.WriteLine("\nRESUMING WORKFLOW...");
Assert.NotNull(this._lastCheckpoint);
Checkpointed<StreamingRun> run = await InProcessExecution.ResumeStreamAsync(workflow, this._lastCheckpoint, this.GetCheckpointManager());
StreamingRun run = await InProcessExecution.ResumeStreamAsync(workflow, this._lastCheckpoint, this.GetCheckpointManager());
IReadOnlyList<WorkflowEvent> workflowEvents = await MonitorAndDisposeWorkflowRunAsync(run, response).ToArrayAsync();
this._lastCheckpoint = workflowEvents.OfType<SuperStepCompletedEvent>().LastOrDefault()?.CompletionInfo?.Checkpoint;
return new WorkflowEvents(workflowEvents);
@@ -97,19 +97,19 @@ internal sealed class WorkflowHarness(Workflow workflow, string runId)
return this._checkpointManager;
}
private static async IAsyncEnumerable<WorkflowEvent> MonitorAndDisposeWorkflowRunAsync(Checkpointed<StreamingRun> run, ExternalResponse? response = null)
private static async IAsyncEnumerable<WorkflowEvent> MonitorAndDisposeWorkflowRunAsync(StreamingRun run, ExternalResponse? response = null)
{
await using IAsyncDisposable disposeRun = run;
if (response is not null)
{
await run.Run.SendResponseAsync(response).ConfigureAwait(false);
await run.SendResponseAsync(response).ConfigureAwait(false);
}
bool exitLoop = false;
bool hasRequest = false;
await foreach (WorkflowEvent workflowEvent in run.Run.WatchStreamAsync().ConfigureAwait(false))
await foreach (WorkflowEvent workflowEvent in run.WatchStreamAsync().ConfigureAwait(false))
{
switch (workflowEvent)
{
@@ -9,6 +9,7 @@ using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Extensions.AI;
#pragma warning disable SYSLIB1045 // Use GeneratedRegex
@@ -389,7 +390,7 @@ public class AgentWorkflowBuilderTests
{
StringBuilder sb = new();
IWorkflowExecutionEnvironment environment = executionEnvironment.ToWorkflowExecutionEnvironment();
InProcessExecutionEnvironment environment = executionEnvironment.ToWorkflowExecutionEnvironment();
await using StreamingRun run = await environment.StreamAsync(workflow, input);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
@@ -5,6 +5,7 @@ using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.InProc;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
@@ -28,14 +29,14 @@ public class CheckpointParentTests
.Build();
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
// Act
Checkpointed<StreamingRun> checkpointed =
await env.StreamAsync(workflow, "Hello", checkpointManager);
StreamingRun run =
await env.WithCheckpointing(checkpointManager).StreamAsync(workflow, "Hello");
List<CheckpointInfo> checkpoints = [];
await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
@@ -68,16 +69,15 @@ public class CheckpointParentTests
.Build();
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
// Act
await using Checkpointed<StreamingRun> checkpointed =
await env.StreamAsync(workflow, "Hello", checkpointManager);
await using StreamingRun run = await env.WithCheckpointing(checkpointManager).StreamAsync(workflow, "Hello");
List<CheckpointInfo> checkpoints = [];
using CancellationTokenSource cts = new();
await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync(cts.Token))
await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
@@ -123,15 +123,14 @@ public class CheckpointParentTests
.Build();
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
// First run: collect a checkpoint to resume from
await using Checkpointed<StreamingRun> checkpointed =
await env.StreamAsync(workflow, "Hello", checkpointManager);
await using StreamingRun run = await env.WithCheckpointing(checkpointManager).StreamAsync(workflow, "Hello");
List<CheckpointInfo> firstRunCheckpoints = [];
using CancellationTokenSource cts = new();
await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync(cts.Token))
await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
@@ -147,15 +146,14 @@ public class CheckpointParentTests
CheckpointInfo resumePoint = firstRunCheckpoints[0];
// Dispose the first run to release workflow ownership before resuming.
await checkpointed.DisposeAsync();
await run.DisposeAsync();
// Act: Resume from the first checkpoint
Checkpointed<StreamingRun> resumed =
await env.ResumeStreamAsync(workflow, resumePoint, checkpointManager);
StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamAsync(workflow, resumePoint);
List<CheckpointInfo> resumedCheckpoints = [];
using CancellationTokenSource cts2 = new();
await foreach (WorkflowEvent evt in resumed.Run.WatchStreamAsync(cts2.Token))
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
@@ -1,12 +1,13 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using Microsoft.Agents.AI.Workflows.InProc;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
internal static class ExecutionExtensions
{
public static IWorkflowExecutionEnvironment ToWorkflowExecutionEnvironment(this ExecutionEnvironment environment)
public static InProcessExecutionEnvironment ToWorkflowExecutionEnvironment(this ExecutionEnvironment environment)
{
return environment switch
{
@@ -131,11 +131,11 @@ public partial class InProcessStateTests
.AddEdge(writer, validator, MaxTurns(4))
.AddEdge(validator, writer, MaxTurns(4)).Build();
Checkpointed<Run> checkpointed = await InProcessExecution.RunAsync<TurnToken>(workflow, new(), CheckpointManager.Default);
Run checkpointed = await InProcessExecution.RunAsync<TurnToken>(workflow, new(), CheckpointManager.Default);
checkpointed.Checkpoints.Should().HaveCount(4);
RunStatus status = await checkpointed.Run.GetStatusAsync();
RunStatus status = await checkpointed.GetStatusAsync();
status.Should().Be(RunStatus.Idle);
writer.Completed.Should().BeTrue();
@@ -6,12 +6,13 @@ using System.IO;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.InProc;
namespace Microsoft.Agents.AI.Workflows.Sample;
internal static class Step5EntryPoint
{
public static async ValueTask<string> RunAsync(TextWriter writer, Func<string, int> userGuessCallback, IWorkflowExecutionEnvironment environment, bool rehydrateToRestore = false, CheckpointManager? checkpointManager = null)
public static async ValueTask<string> RunAsync(TextWriter writer, Func<string, int> userGuessCallback, InProcessExecutionEnvironment environment, bool rehydrateToRestore = false, CheckpointManager? checkpointManager = null)
{
Dictionary<CheckpointInfo, (NumberSignal signal, string? prompt)> checkpointedOutputs = [];
@@ -22,14 +23,14 @@ internal static class Step5EntryPoint
Workflow workflow = Step4EntryPoint.CreateWorkflowInstance(out JudgeExecutor judge);
Checkpointed<StreamingRun> checkpointed =
await environment.StreamAsync(workflow, NumberSignal.Init, checkpointManager)
StreamingRun handle =
await environment.WithCheckpointing(checkpointManager)
.StreamAsync(workflow, NumberSignal.Init)
.ConfigureAwait(false);
List<CheckpointInfo> checkpoints = [];
CancellationTokenSource cancellationSource = new();
StreamingRun handle = checkpointed.Run;
string? result = await RunStreamToHaltOrMaxStepAsync(maxStep: 6).ConfigureAwait(false);
result.Should().BeNull();
@@ -42,13 +43,13 @@ internal static class Step5EntryPoint
{
await handle.DisposeAsync().ConfigureAwait(false);
checkpointed = await environment.ResumeStreamAsync(workflow, targetCheckpoint, checkpointManager, cancellationToken: CancellationToken.None)
.ConfigureAwait(false);
handle = checkpointed.Run;
handle = await environment.WithCheckpointing(checkpointManager)
.ResumeStreamAsync(workflow, targetCheckpoint, CancellationToken.None)
.ConfigureAwait(false);
}
else
{
await checkpointed.RestoreCheckpointAsync(checkpoints[2], CancellationToken.None).ConfigureAwait(false);
await handle.RestoreCheckpointAsync(checkpoints[2], CancellationToken.None).ConfigureAwait(false);
}
(signal, prompt) = checkpointedOutputs[targetCheckpoint];
@@ -48,10 +48,9 @@ internal static class Step13EntryPoint
return session;
}
public static async ValueTask<CheckpointInfo> RunAsync(TextWriter writer, string input, IWorkflowExecutionEnvironment environment, CheckpointManager checkpointManager, CheckpointInfo? resumeFrom)
public static async ValueTask<CheckpointInfo> RunAsync(TextWriter writer, string input, IWorkflowExecutionEnvironment environment, CheckpointInfo? resumeFrom)
{
await using Checkpointed<StreamingRun> checkpointed = await BeginAsync();
StreamingRun run = checkpointed.Run;
await using StreamingRun run = await BeginAsync();
await run.TrySendMessageAsync(new TurnToken());
@@ -80,16 +79,16 @@ internal static class Step13EntryPoint
return lastCheckpoint!;
async ValueTask<Checkpointed<StreamingRun>> BeginAsync()
async ValueTask<StreamingRun> BeginAsync()
{
if (resumeFrom == null)
{
return await environment.StreamAsync(WorkflowInstance, input, checkpointManager);
return await environment.StreamAsync(WorkflowInstance, input);
}
Checkpointed<StreamingRun> checkpointed = await environment.ResumeStreamAsync(WorkflowInstance, resumeFrom, checkpointManager);
await checkpointed.Run.TrySendMessageAsync(input);
return checkpointed;
StreamingRun run = await environment.ResumeStreamAsync(WorkflowInstance, resumeFrom);
await run.TrySendMessageAsync(input);
return run;
}
}
}
@@ -7,6 +7,7 @@ using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Agents.AI.Workflows.Sample;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
@@ -378,9 +379,9 @@ public class SampleSmokeTest
[InlineData(ExecutionEnvironment.InProcess_Concurrent)]
internal async Task Test_RunSample_Step13Async(ExecutionEnvironment environment)
{
IWorkflowExecutionEnvironment executionEnvironment = environment.ToWorkflowExecutionEnvironment();
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
InProcessExecutionEnvironment executionEnvironment = environment.ToWorkflowExecutionEnvironment().WithCheckpointing(checkpointManager);
CheckpointInfo? resumeFrom = null;
await RunAndValidateAsync(1);
@@ -393,7 +394,7 @@ public class SampleSmokeTest
using StringWriter writer = new();
string input = $"[{step}] Hello, World!";
resumeFrom = await Step13EntryPoint.RunAsync(writer, input, executionEnvironment, checkpointManager, resumeFrom);
resumeFrom = await Step13EntryPoint.RunAsync(writer, input, executionEnvironment, resumeFrom);
string result = writer.ToString();
string[] lines = result.Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries);
@@ -144,7 +144,7 @@ public class TestRunContext : IRunnerContext
public Dictionary<string, Executor> Executors { get; set; } = [];
public string StartingExecutorId { get; set; } = string.Empty;
public bool WithCheckpointing => false;
public bool IsCheckpointingEnabled => false;
public bool ConcurrentRunsEnabled => false;
WorkflowTelemetryContext IRunnerContext.TelemetryContext => WorkflowTelemetryContext.Disabled;