.NET Workflows - Expose SendMessage override to all platforms (#1741)

* For real

* Fine-tune

* One more miss
This commit is contained in:
Chris
2025-10-28 05:29:01 -07:00
committed by GitHub
Unverified
parent 742203fb12
commit b70030daec
4 changed files with 75 additions and 70 deletions
@@ -73,7 +73,7 @@ internal sealed class InvokeAzureAgentExecutor(InvokeAzureAgent model, WorkflowA
{
isComplete = false;
UserInputRequest approvalRequest = new(agentName, inputRequests.OfType<AIContent>().ToArray());
await context.SendMessageAsync(approvalRequest, targetId: null, cancellationToken).ConfigureAwait(false);
await context.SendMessageAsync(approvalRequest, cancellationToken).ConfigureAwait(false);
}
// Identify function calls that have no associated result.
@@ -82,7 +82,7 @@ internal sealed class InvokeAzureAgentExecutor(InvokeAzureAgent model, WorkflowA
{
isComplete = false;
AgentFunctionToolRequest toolRequest = new(agentName, functionCalls);
await context.SendMessageAsync(toolRequest, targetId: null, cancellationToken).ConfigureAwait(false);
await context.SendMessageAsync(toolRequest, cancellationToken).ConfigureAwait(false);
}
}
@@ -76,7 +76,7 @@ internal sealed class QuestionExecutor(Question model, WorkflowAgentProvider age
{
int count = await this._promptCount.ReadAsync(context).ConfigureAwait(false);
AnswerRequest inputRequest = new(this.FormatPrompt(this.Model.Prompt));
await context.SendMessageAsync(inputRequest, targetId: null, cancellationToken).ConfigureAwait(false);
await context.SendMessageAsync(inputRequest, cancellationToken).ConfigureAwait(false);
await this._promptCount.WriteAsync(context, count + 1).ConfigureAwait(false);
}
@@ -7,60 +7,6 @@ using System.Threading.Tasks;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Provides extension methods for working with <see cref="IWorkflowContext"/> instances.
/// </summary>
public static class WorkflowContextExtensions
{
/// <summary>
/// Invokes an asynchronous operation that reads, updates, and persists workflow state associated with the specified
/// key.
/// </summary>
/// <typeparam name="TState">The type of the state object to read, update, and persist.</typeparam>
/// <param name="context">The workflow context used to access and update state.</param>
/// <param name="invocation">A delegate that receives the current state, workflow context, and cancellation token, and returns the updated
/// state asynchronously.</param>
/// <param name="key">The key identifying the state to read and update. Cannot be null or empty.</param>
/// <param name="scopeName">An optional scope name that further qualifies the state key. If null, the default scope is used.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <returns>A ValueTask that represents the asynchronous operation.</returns>
public static async ValueTask InvokeWithStateAsync<TState>(this IWorkflowContext context,
Func<TState?, IWorkflowContext, CancellationToken, ValueTask<TState?>> invocation,
string key,
string? scopeName = null,
CancellationToken cancellationToken = default)
{
TState? state = await context.ReadStateAsync<TState>(key, scopeName, cancellationToken).ConfigureAwait(false);
state = await invocation(state, context, cancellationToken).ConfigureAwait(false);
await context.QueueStateUpdateAsync(key, state, scopeName, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Invokes an asynchronous operation that reads, updates, and persists workflow state associated with the specified
/// key.
/// </summary>
/// <typeparam name="TState">The type of the state object to read, update, and persist.</typeparam>
/// <param name="context">The workflow context used to access and update state.</param>
/// <param name="invocation">A delegate that receives the current state, workflow context, and cancellation token, and returns the updated
/// state asynchronously.</param>
/// <param name="key">The key identifying the state to read and update. Cannot be null or empty.</param>
/// <param name="initialStateFactory">A factory to initialize state to if it is not set at the provided key.</param>
/// <param name="scopeName">An optional scope name that further qualifies the state key. If null, the default scope is used.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <returns>A ValueTask that represents the asynchronous operation.</returns>
public static async ValueTask InvokeWithStateAsync<TState>(this IWorkflowContext context,
Func<TState, IWorkflowContext, CancellationToken, ValueTask<TState?>> invocation,
string key,
Func<TState> initialStateFactory,
string? scopeName = null,
CancellationToken cancellationToken = default)
{
TState? state = await context.ReadOrInitStateAsync(key, initialStateFactory, scopeName, cancellationToken).ConfigureAwait(false);
state = await invocation(state, context, cancellationToken).ConfigureAwait(false);
await context.QueueStateUpdateAsync(key, state ?? initialStateFactory(), scopeName, cancellationToken).ConfigureAwait(false);
}
}
/// <summary>
/// Provides services for an <see cref="Executor"/> during the execution of a workflow.
/// </summary>
@@ -86,19 +32,7 @@ public interface IWorkflowContext
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="ValueTask"/> representing the asynchronous operation.</returns>
ValueTask SendMessageAsync(object message, string? targetId = null, CancellationToken cancellationToken = default);
#if NET // What's the right way to do this so we do not make life a misery for netstandard2.0 targets?
// What's the value if they have to still write `cancellationToken: cancellationToken` to skip the targetId parameter?
// TODO: Remove this? (Maybe not: NET will eventually be the only target framework, right?)
/// <summary>
/// Queues a message to be sent to connected executors. The message will be sent during the next SuperStep.
/// </summary>
/// <param name="message">The message to be sent.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.</param>
/// <returns>A <see cref="ValueTask"/> representing the asynchronous operation.</returns>
ValueTask SendMessageAsync(object message, CancellationToken cancellationToken) => this.SendMessageAsync(message, null, cancellationToken);
#endif
ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken = default);
/// <summary>
/// Adds an output value to the workflow's output queue. These outputs will be bubbled out of the workflow using the
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Provides extension methods for working with <see cref="IWorkflowContext"/> instances.
/// </summary>
public static class IWorkflowContextExtensions
{
/// <summary>
/// Invokes an asynchronous operation that reads, updates, and persists workflow state associated with the specified
/// key.
/// </summary>
/// <typeparam name="TState">The type of the state object to read, update, and persist.</typeparam>
/// <param name="context">The workflow context used to access and update state.</param>
/// <param name="invocation">A delegate that receives the current state, workflow context, and cancellation token, and returns the updated
/// state asynchronously.</param>
/// <param name="key">The key identifying the state to read and update. Cannot be null or empty.</param>
/// <param name="scopeName">An optional scope name that further qualifies the state key. If null, the default scope is used.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <returns>A ValueTask that represents the asynchronous operation.</returns>
public static async ValueTask InvokeWithStateAsync<TState>(this IWorkflowContext context,
Func<TState?, IWorkflowContext, CancellationToken, ValueTask<TState?>> invocation,
string key,
string? scopeName = null,
CancellationToken cancellationToken = default)
{
TState? state = await context.ReadStateAsync<TState>(key, scopeName, cancellationToken).ConfigureAwait(false);
state = await invocation(state, context, cancellationToken).ConfigureAwait(false);
await context.QueueStateUpdateAsync(key, state, scopeName, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Invokes an asynchronous operation that reads, updates, and persists workflow state associated with the specified
/// key.
/// </summary>
/// <typeparam name="TState">The type of the state object to read, update, and persist.</typeparam>
/// <param name="context">The workflow context used to access and update state.</param>
/// <param name="invocation">A delegate that receives the current state, workflow context, and cancellation token, and returns the updated
/// state asynchronously.</param>
/// <param name="key">The key identifying the state to read and update. Cannot be null or empty.</param>
/// <param name="initialStateFactory">A factory to initialize state to if it is not set at the provided key.</param>
/// <param name="scopeName">An optional scope name that further qualifies the state key. If null, the default scope is used.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <returns>A ValueTask that represents the asynchronous operation.</returns>
public static async ValueTask InvokeWithStateAsync<TState>(this IWorkflowContext context,
Func<TState, IWorkflowContext, CancellationToken, ValueTask<TState?>> invocation,
string key,
Func<TState> initialStateFactory,
string? scopeName = null,
CancellationToken cancellationToken = default)
{
TState? state = await context.ReadOrInitStateAsync(key, initialStateFactory, scopeName, cancellationToken).ConfigureAwait(false);
state = await invocation(state, context, cancellationToken).ConfigureAwait(false);
await context.QueueStateUpdateAsync(key, state ?? initialStateFactory(), scopeName, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Queues a message to be sent to connected executors. The message will be sent during the next SuperStep.
/// </summary>
/// <param name="context">The workflow context used to access and update state.</param>
/// <param name="message">The message to be sent.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.</param>
/// <returns>A <see cref="ValueTask"/> representing the asynchronous operation.</returns>
public static ValueTask SendMessageAsync(this IWorkflowContext context, object message, CancellationToken cancellationToken = default) =>
context.SendMessageAsync(message, null, cancellationToken);
}