Merge branch 'main' into openai_response_agent_completeness

This commit is contained in:
Giles Odigwe
2025-09-12 09:15:58 -07:00
committed by GitHub
Unverified
53 changed files with 1020 additions and 137 deletions
+1 -1
View File
@@ -32,7 +32,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
persist-credentials: false
+1 -1
View File
@@ -47,7 +47,7 @@ jobs:
runs-on: ${{ matrix.os }}
environment: ${{ matrix.environment }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
persist-credentials: false
sparse-checkout: |
+1 -1
View File
@@ -30,7 +30,7 @@ jobs:
steps:
- name: Check out code
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
fetch-depth: 0
persist-credentials: false
+1 -1
View File
@@ -27,7 +27,7 @@ jobs:
env:
UV_PYTHON: ${{ matrix.python-version }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
+4 -4
View File
@@ -28,7 +28,7 @@ jobs:
outputs:
pythonChanges: ${{ steps.filter.outputs.python}}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- uses: dorny/paths-filter@v3
id: filter
with:
@@ -66,7 +66,7 @@ jobs:
run:
working-directory: python
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
@@ -127,7 +127,7 @@ jobs:
run:
working-directory: python
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
@@ -194,7 +194,7 @@ jobs:
run:
working-directory: python
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
+1 -1
View File
@@ -23,7 +23,7 @@ jobs:
run:
working-directory: python
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
@@ -19,9 +19,9 @@ jobs:
run:
working-directory: python
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Download coverage report
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
with:
github-token: ${{ secrets.GH_ACTIONS_PR_WRITE }}
run-id: ${{ github.event.workflow_run.id }}
@@ -39,7 +39,7 @@ jobs:
echo "PR_NUMBER=$PR_NUMBER" >> $GITHUB_ENV
- name: Pytest coverage comment
id: coverageComment
uses: MishaKav/pytest-coverage-comment@v1.1.53
uses: MishaKav/pytest-coverage-comment@v1.1.56
with:
github-token: ${{ secrets.GH_ACTIONS_PR_WRITE }}
issue-number: ${{ env.PR_NUMBER }}
+1 -1
View File
@@ -20,7 +20,7 @@ jobs:
env:
UV_PYTHON: "3.10"
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
# Save the PR number to a file since the workflow_run event
# in the coverage report workflow does not have access to it
- name: Save PR number
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
run:
working-directory: python
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
+3 -3
View File
@@ -74,14 +74,14 @@
<PackageVersion Include="Microsoft.SemanticKernel.Agents.OpenAI" Version="1.65.0-preview" />
<PackageVersion Include="Microsoft.SemanticKernel.Agents.AzureAI" Version="1.65.0-preview" />
<!-- Agent SDKs -->
<PackageVersion Include="Microsoft.Agents.CopilotStudio.Client" Version="1.1.151" />
<PackageVersion Include="Microsoft.Agents.CopilotStudio.Client" Version="1.2.41" />
<!-- A2A -->
<PackageVersion Include="A2A" Version="0.1.0-preview.2" />
<PackageVersion Include="A2A.AspNetCore" Version="0.1.0-preview.2" />
<!-- MCP -->
<PackageVersion Include="ModelContextProtocol" Version="0.3.0-preview.4" />
<!-- Inference SDKs -->
<PackageVersion Include="Microsoft.ML.OnnxRuntimeGenAI" Version="0.9.0" />
<PackageVersion Include="Microsoft.ML.OnnxRuntimeGenAI" Version="0.9.1" />
<PackageVersion Include="OllamaSharp" Version="5.3.6" />
<!-- Identity -->
<PackageVersion Include="Microsoft.Identity.Client.Extensions.Msal" Version="4.74.1" />
@@ -91,7 +91,7 @@
<PackageVersion Include="Microsoft.Bot.ObjectModel.PowerFx" Version="2025.8.2-1" />
<PackageVersion Include="Microsoft.PowerFx.Interpreter" Version="1.4.0-build.20250625-1002" />
<!-- Test -->
<PackageVersion Include="FluentAssertions" Version="8.5.0" />
<PackageVersion Include="FluentAssertions" Version="8.6.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageVersion Include="Moq" Version="[4.18.4]" />
<PackageVersion Include="xunit" Version="2.9.3" />
@@ -30,7 +30,7 @@ namespace SampleApp
// Custom agent that parrot's the user input back in upper case.
internal sealed class UpperCaseParrotAgent : AIAgent
{
public override async Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
// Create a thread if the user didn't supply one.
thread ??= this.GetNewThread();
@@ -50,7 +50,7 @@ namespace SampleApp
};
}
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Create a thread if the user didn't supply one.
thread ??= this.GetNewThread();
@@ -76,7 +76,7 @@ namespace SampleApp
}
}
private static IEnumerable<ChatMessage> CloneAndToUpperCase(IReadOnlyCollection<ChatMessage> messages, string agentName) => messages.Select(x =>
private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
{
// Clone the message and update its author to be the agent.
var messageClone = x.Clone();
@@ -82,7 +82,7 @@ namespace SampleApp
public string? ThreadId => this._threadId;
public async Task AddMessagesAsync(IReadOnlyCollection<ChatMessage> messages, CancellationToken cancellationToken)
public async Task AddMessagesAsync(IEnumerable<ChatMessage> messages, CancellationToken cancellationToken)
{
this._threadId ??= Guid.NewGuid().ToString();
@@ -57,11 +57,11 @@ public partial class ConcurrentOrchestration : OrchestratingAgent
}
/// <inheritdoc />
protected override Task<AgentRunResponse> RunCoreAsync(IReadOnlyCollection<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
this.ResumeAsync(messages, new AgentRunResponse?[this.Agents.Count], context, cancellationToken);
protected override Task<AgentRunResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
this.ResumeAsync(messages as IReadOnlyCollection<ChatMessage> ?? messages.ToList(), new AgentRunResponse?[this.Agents.Count], context, cancellationToken);
/// <inheritdoc />
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IEnumerable<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.ConcurrentState) ?? throw new InvalidOperationException("The checkpoint state is invalid.");
@@ -43,7 +43,7 @@ public sealed partial class GroupChatOrchestration : OrchestratingAgent
public Func<ValueTask<ChatMessage>>? InteractiveCallback { get; set; }
/// <inheritdoc />
protected override Task<AgentRunResponse> RunCoreAsync(IReadOnlyCollection<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken)
protected override Task<AgentRunResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
List<ChatMessage> allMessages = [.. messages];
int originalMessageCount = allMessages.Count;
@@ -51,7 +51,7 @@ public sealed partial class GroupChatOrchestration : OrchestratingAgent
}
/// <inheritdoc />
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IEnumerable<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.GroupChatState) ?? throw new InvalidOperationException("The checkpoint state is invalid.");
@@ -44,7 +44,7 @@ public sealed partial class HandoffOrchestration : OrchestratingAgent
public Func<ValueTask<ChatMessage>>? InteractiveCallback { get; set; }
/// <inheritdoc />
protected override Task<AgentRunResponse> RunCoreAsync(IReadOnlyCollection<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken)
protected override Task<AgentRunResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
List<ChatMessage> allMessages = [.. messages];
int originalMessageCount = allMessages.Count;
@@ -52,7 +52,7 @@ public sealed partial class HandoffOrchestration : OrchestratingAgent
}
/// <inheritdoc />
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IEnumerable<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.HandoffState) ?? throw new InvalidOperationException("The checkpoint state is invalid.");
@@ -65,7 +65,7 @@ public abstract partial class OrchestratingAgent : AIAgent
/// <inheritdoc />
public sealed override async Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(messages);
@@ -87,7 +87,7 @@ public abstract partial class OrchestratingAgent : AIAgent
/// <inheritdoc />
public sealed override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// TODO: There should be a RunAsync overload that returns an OrchestratingAgentStreamingResponse, which this then delegates to.
@@ -106,12 +106,12 @@ public abstract partial class OrchestratingAgent : AIAgent
/// <param name="runtime">The runtime associated with the orchestration.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
public async ValueTask<OrchestratingAgentResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentRunOptions? options = null,
IActorRuntimeContext? runtime = null,
CancellationToken cancellationToken = default)
{
Throw.IfNull(messages, nameof(messages));
var readonlyCollectionMessages = Throw.IfNull(messages) as IReadOnlyCollection<ChatMessage> ?? messages.ToList();
cancellationToken.ThrowIfCancellationRequested();
ILogger logger = this.LoggerFactory.CreateLogger(this.GetType().Name);
@@ -131,8 +131,8 @@ public abstract partial class OrchestratingAgent : AIAgent
JsonElement? checkpoint = await this.ReadCheckpointAsync(context, cancellationToken).ConfigureAwait(false);
Task<AgentRunResponse> completion = checkpoint is null ?
this.RunCoreAsync(messages, context, cancellationToken) :
this.ResumeCoreAsync(checkpoint.Value, messages, context, cancellationToken);
this.RunCoreAsync(readonlyCollectionMessages, context, cancellationToken) :
this.ResumeCoreAsync(checkpoint.Value, readonlyCollectionMessages, context, cancellationToken);
if (logger.IsEnabled(LogLevel.Trace))
{
@@ -148,7 +148,7 @@ public abstract partial class OrchestratingAgent : AIAgent
/// <param name="messages">The input message.</param>
/// <param name="context">The context for this operation.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
protected abstract Task<AgentRunResponse> RunCoreAsync(IReadOnlyCollection<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken);
protected abstract Task<AgentRunResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken);
/// <summary>
/// Resumes processing of the orchestration.
@@ -157,7 +157,7 @@ public abstract partial class OrchestratingAgent : AIAgent
/// <param name="newMessages">The new messages to be processed in addition to the checkpoint state.</param>
/// <param name="context">The context for this operation.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
protected abstract Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken);
protected abstract Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IEnumerable<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken);
/// <summary>
/// Runs the agent with input messages and respond with both streamed and regular messages.
@@ -168,7 +168,7 @@ public abstract partial class OrchestratingAgent : AIAgent
/// <param name="options">Options to use when invoking the agent.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
/// <returns>A task that returns the response <see cref="ChatMessage"/>.</returns>
protected static async ValueTask<AgentRunResponse> RunAsync(AIAgent agent, OrchestratingAgentContext context, IReadOnlyCollection<ChatMessage> input, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
protected static async ValueTask<AgentRunResponse> RunAsync(AIAgent agent, OrchestratingAgentContext context, IEnumerable<ChatMessage> input, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
// Utilize streaming iff a streaming callback is provided; otherwise, use the non-streaming API.
AgentRunResponse response;
@@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@@ -28,11 +29,11 @@ public sealed partial class SequentialOrchestration : OrchestratingAgent
}
/// <inheritdoc />
protected override Task<AgentRunResponse> RunCoreAsync(IReadOnlyCollection<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
this.ResumeAsync(0, messages, context, cancellationToken);
protected override Task<AgentRunResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
this.ResumeAsync(0, messages as IReadOnlyCollection<ChatMessage> ?? messages.ToList(), context, cancellationToken);
/// <inheritdoc />
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IEnumerable<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.SequentialState) ?? throw new InvalidOperationException("The checkpoint state is invalid.");
@@ -96,7 +96,7 @@ internal class WorkflowHostAgent : AIAgent
}
}
private async ValueTask<WorkflowThread> UpdateThreadAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, CancellationToken cancellation = default)
private async ValueTask<WorkflowThread> UpdateThreadAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, CancellationToken cancellation = default)
{
if (thread is null)
{
@@ -114,7 +114,7 @@ internal class WorkflowHostAgent : AIAgent
public override async
Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -134,7 +134,7 @@ internal class WorkflowHostAgent : AIAgent
public override async
IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
@@ -25,7 +25,7 @@ internal class WorkflowMessageStore : IChatMessageStore
this._chatMessages.AddRange(messages);
}
public Task AddMessagesAsync(IReadOnlyCollection<ChatMessage> messages, CancellationToken cancellationToken)
public Task AddMessagesAsync(IEnumerable<ChatMessage> messages, CancellationToken cancellationToken)
{
this._chatMessages.AddRange(messages);
@@ -52,7 +52,7 @@ internal sealed class A2AAgent : AIAgent
}
/// <inheritdoc/>
public override async Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
ValidateInputMessages(messages);
@@ -98,7 +98,7 @@ internal sealed class A2AAgent : AIAgent
}
/// <inheritdoc/>
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ValidateInputMessages(messages);
@@ -147,7 +147,7 @@ internal sealed class A2AAgent : AIAgent
/// <inheritdoc/>
public override string? Description => this._description ?? base.Description;
private static void ValidateInputMessages(IReadOnlyCollection<ChatMessage> messages)
private static void ValidateInputMessages(IEnumerable<ChatMessage> messages)
{
_ = Throw.IfNull(messages);
@@ -11,7 +11,7 @@ namespace Microsoft.Extensions.AI.Agents.A2A;
/// </summary>
internal static class ChatMessageExtensions
{
internal static Message ToA2AMessage(this IReadOnlyCollection<ChatMessage> messages)
internal static Message ToA2AMessage(this IEnumerable<ChatMessage> messages)
{
List<Part> allParts = [];
@@ -112,7 +112,7 @@ public abstract class AIAgent
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
return this.RunAsync((IReadOnlyCollection<ChatMessage>)[], thread, options, cancellationToken);
return this.RunAsync((IEnumerable<ChatMessage>)[], thread, options, cancellationToken);
}
/// <summary>
@@ -174,7 +174,7 @@ public abstract class AIAgent
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="AgentRunResponse"/> containing the list of <see cref="ChatMessage"/> items.</returns>
public abstract Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default);
@@ -194,7 +194,7 @@ public abstract class AIAgent
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
return this.RunStreamingAsync((IReadOnlyCollection<ChatMessage>)[], thread, options, cancellationToken);
return this.RunStreamingAsync((IEnumerable<ChatMessage>)[], thread, options, cancellationToken);
}
/// <summary>
@@ -256,7 +256,7 @@ public abstract class AIAgent
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>An async list of response items that each contain a <see cref="AgentRunResponseUpdate"/>.</returns>
public abstract IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default);
@@ -285,14 +285,11 @@ public abstract class AIAgent
/// <param name="messages">The messages to pass to the thread.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>An async task that completes once the notification is complete.</returns>
protected static async Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IReadOnlyCollection<ChatMessage> messages, CancellationToken cancellationToken)
protected static async Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IEnumerable<ChatMessage> messages, CancellationToken cancellationToken)
{
_ = Throw.IfNull(thread);
_ = Throw.IfNull(messages);
if (messages.Count > 0)
{
await thread.OnNewMessagesAsync(messages, cancellationToken).ConfigureAwait(false);
}
await thread.OnNewMessagesAsync(messages, cancellationToken).ConfigureAwait(false);
}
}
@@ -139,7 +139,7 @@ public class AgentThread
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task that completes when the context has been updated.</returns>
/// <exception cref="InvalidOperationException">The thread has been deleted.</exception>
protected internal virtual async Task OnNewMessagesAsync(IReadOnlyCollection<ChatMessage> newMessages, CancellationToken cancellationToken = default)
protected internal virtual async Task OnNewMessagesAsync(IEnumerable<ChatMessage> newMessages, CancellationToken cancellationToken = default)
{
switch (this)
{
@@ -54,7 +54,7 @@ public class DelegatingAIAgent : AIAgent
/// <inheritdoc />
public override Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -62,7 +62,7 @@ public class DelegatingAIAgent : AIAgent
/// <inheritdoc />
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -42,7 +42,7 @@ public interface IChatMessageStore
/// <param name="messages">The messages to add.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>An async task.</returns>
Task AddMessagesAsync(IReadOnlyCollection<ChatMessage> messages, CancellationToken cancellationToken = default);
Task AddMessagesAsync(IEnumerable<ChatMessage> messages, CancellationToken cancellationToken = default);
/// <summary>
/// Deserializes the state contained in the provided <see cref="JsonElement"/> into the properties on this store.
@@ -61,15 +61,10 @@ public sealed class InMemoryChatMessageStore : IList<ChatMessage>, IChatMessageS
}
/// <inheritdoc />
public async Task AddMessagesAsync(IReadOnlyCollection<ChatMessage> messages, CancellationToken cancellationToken)
public async Task AddMessagesAsync(IEnumerable<ChatMessage> messages, CancellationToken cancellationToken)
{
_ = Throw.IfNull(messages);
if (messages.Count == 0)
{
return;
}
this._messages.AddRange(messages);
if (this._reducerTriggerEvent == ChatReducerTriggerEvent.AfterMessageAdded && this._chatReducer is not null)
@@ -172,7 +167,7 @@ public sealed class InMemoryChatMessageStore : IList<ChatMessage>, IChatMessageS
{
/// <summary>
/// Trigger the reducer when a new message is added.
/// <see cref="AddMessagesAsync(IReadOnlyCollection{ChatMessage}, CancellationToken)"/> will only complete when reducer processing is done.
/// <see cref="AddMessagesAsync(IEnumerable{ChatMessage}, CancellationToken)"/> will only complete when reducer processing is done.
/// </summary>
AfterMessageAdded,
@@ -41,7 +41,7 @@ public class CopilotStudioAgent : AIAgent
/// <inheritdoc/>
public override async Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -74,7 +74,7 @@ public class CopilotStudioAgent : AIAgent
/// <inheritdoc/>
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
@@ -45,7 +45,7 @@ public sealed class AgentProxy : AIAgent
/// <inheritdoc/>
public override async Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -57,7 +57,7 @@ public sealed class AgentProxy : AIAgent
/// <inheritdoc/>
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
@@ -70,7 +70,7 @@ public sealed class AgentProxy : AIAgent
}
}
private async Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, string threadId, CancellationToken cancellationToken)
private async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, string threadId, CancellationToken cancellationToken)
{
var handle = await this.RunCoreAsync(messages, threadId, cancellationToken).ConfigureAwait(false);
var response = await handle.GetResponseAsync(cancellationToken).ConfigureAwait(false);
@@ -85,7 +85,7 @@ public sealed class AgentProxy : AIAgent
}
private async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
string threadId,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
@@ -123,7 +123,7 @@ public sealed class AgentProxy : AIAgent
return agentProxyThread.ConversationId!;
}
private async Task<ActorResponseHandle> RunCoreAsync(IReadOnlyCollection<ChatMessage> messages, string threadId, CancellationToken cancellationToken)
private async Task<ActorResponseHandle> RunCoreAsync(IEnumerable<ChatMessage> messages, string threadId, CancellationToken cancellationToken)
{
List<ChatMessage> newMessages = [.. messages];
@@ -85,7 +85,7 @@ public class OpenAIChatClientAgent : AIAgent
/// <inheritdoc/>
public sealed override Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<Microsoft.Extensions.AI.ChatMessage> messages,
IEnumerable<Microsoft.Extensions.AI.ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -93,7 +93,7 @@ public class OpenAIChatClientAgent : AIAgent
/// <inheritdoc/>
public sealed override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<Microsoft.Extensions.AI.ChatMessage> messages,
IEnumerable<Microsoft.Extensions.AI.ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -99,15 +100,15 @@ public sealed class ChatClientAgent : AIAgent
/// <inheritdoc/>
public override async Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(messages);
var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection<ChatMessage> ?? messages.ToList();
(AgentThread safeThread, ChatOptions? chatOptions, List<ChatMessage> threadMessages) =
await this.PrepareThreadAndMessagesAsync(thread, messages, options, cancellationToken).ConfigureAwait(false);
await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false);
var agentName = this.GetLoggingAgentName();
@@ -115,14 +116,14 @@ public sealed class ChatClientAgent : AIAgent
ChatResponse chatResponse = await this.ChatClient.GetResponseAsync(threadMessages, chatOptions, cancellationToken).ConfigureAwait(false);
this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, agentName, this._chatClientType, messages.Count);
this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, agentName, this._chatClientType, inputMessages.Count);
// We can derive the type of supported thread from whether we have a conversation id,
// so let's update it and set the conversation id for the service thread case.
this.UpdateThreadWithTypeAndConversationId(safeThread, chatResponse.ConversationId);
// Only notify the thread of new messages if the chatResponse was successful to avoid inconsistent messages state in the thread.
await NotifyThreadOfNewMessagesAsync(safeThread, messages, cancellationToken).ConfigureAwait(false);
await NotifyThreadOfNewMessagesAsync(safeThread, inputMessages, cancellationToken).ConfigureAwait(false);
// Ensure that the author name is set for each message in the response.
foreach (ChatMessage chatResponseMessage in chatResponse.Messages)
@@ -140,12 +141,12 @@ public sealed class ChatClientAgent : AIAgent
/// <inheritdoc/>
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var inputMessages = Throw.IfNull(messages);
var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection<ChatMessage> ?? messages.ToList();
(AgentThread safeThread, ChatOptions? chatOptions, List<ChatMessage> threadMessages) =
await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false);
@@ -334,7 +335,7 @@ public sealed class ChatClientAgent : AIAgent
/// <returns>A tuple containing the thread, chat options, and thread messages.</returns>
private async Task<(AgentThread AgentThread, ChatOptions? ChatOptions, List<ChatMessage> ThreadMessages)> PrepareThreadAndMessagesAsync(
AgentThread? thread,
IReadOnlyCollection<ChatMessage> inputMessages,
IEnumerable<ChatMessage> inputMessages,
AgentRunOptions? runOptions,
CancellationToken cancellationToken)
{
@@ -123,23 +123,23 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable
/// <inheritdoc/>
public override async Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(messages);
var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection<ChatMessage> ?? messages.ToList();
using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, messages, thread);
using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, inputMessages, thread);
Stopwatch? stopwatch = this._operationDurationHistogram.Enabled ? Stopwatch.StartNew() : null;
this.LogChatMessages(messages);
this.LogChatMessages(inputMessages);
AgentRunResponse? response = null;
Exception? error = null;
try
{
response = await base.RunAsync(messages, thread, options, cancellationToken).ConfigureAwait(false);
response = await base.RunAsync(inputMessages, thread, options, cancellationToken).ConfigureAwait(false);
return response;
}
catch (Exception ex)
@@ -149,30 +149,30 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable
}
finally
{
this.TraceResponse(activity, response, error, stopwatch, messages.Count, isStreaming: false);
this.TraceResponse(activity, response, error, stopwatch, inputMessages.Count, isStreaming: false);
}
}
/// <inheritdoc/>
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(messages);
var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection<ChatMessage> ?? messages.ToList();
using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, messages, thread);
using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, inputMessages, thread);
Stopwatch? stopwatch = this._operationDurationHistogram.Enabled ? Stopwatch.StartNew() : null;
IAsyncEnumerable<AgentRunResponseUpdate> updates;
try
{
updates = base.RunStreamingAsync(messages, thread, options, cancellationToken);
updates = base.RunStreamingAsync(inputMessages, thread, options, cancellationToken);
}
catch (Exception ex)
{
this.TraceResponse(activity, response: null, ex, stopwatch, messages.Count, isStreaming: true);
this.TraceResponse(activity, response: null, ex, stopwatch, inputMessages.Count, isStreaming: true);
throw;
}
@@ -206,7 +206,7 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable
}
finally
{
this.TraceResponse(activity, trackedUpdates.ToAgentRunResponse(), error, stopwatch, messages.Count, isStreaming: true);
this.TraceResponse(activity, trackedUpdates.ToAgentRunResponse(), error, stopwatch, inputMessages.Count, isStreaming: true);
await responseEnumerator.DisposeAsync().ConfigureAwait(false);
}
}
@@ -36,7 +36,7 @@ internal sealed class MockAgent(int index) : AIAgent
return new AgentThread() { ConversationId = Guid.NewGuid().ToString() };
}
public override Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
this.InvokeCount++;
if (thread == null)
@@ -48,7 +48,7 @@ internal sealed class MockAgent(int index) : AIAgent
return Task.FromResult(new AgentRunResponse(messages: [.. this.Response]));
}
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
this.InvokeCount++;
@@ -80,18 +80,18 @@ public class OrchestrationResultTests
private sealed class MockOrchestratingAgent() : OrchestratingAgent([new MockAgent()])
{
protected override Task<AgentRunResponse> RunCoreAsync(IReadOnlyCollection<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
protected override Task<AgentRunResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
throw new NotSupportedException();
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
protected override Task<AgentRunResponse> ResumeCoreAsync(JsonElement checkpointState, IEnumerable<ChatMessage> newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
throw new NotSupportedException();
}
private sealed class MockAgent : AIAgent
{
public override Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) =>
public override Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) =>
throw new NotSupportedException();
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) =>
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) =>
throw new NotSupportedException();
}
}
@@ -22,12 +22,12 @@ public class RepresentationTests
private sealed class TestAgent : AIAgent
{
public override Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
@@ -95,7 +95,7 @@ internal sealed class HelloAgent(string id = nameof(HelloAgent)) : AIAgent
public override string Id => id;
public override string? Name => id;
public override async Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
IEnumerable<AgentRunResponseUpdate> update = [
await this.RunStreamingAsync(messages, thread, options, cancellationToken)
@@ -105,7 +105,7 @@ internal sealed class HelloAgent(string id = nameof(HelloAgent)) : AIAgent
return update.ToAgentRunResponse();
}
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
AgentRunResponseUpdate response = new(ChatRole.Assistant, "Hello World!")
{
@@ -126,7 +126,7 @@ internal sealed class EchoAgent(string id = nameof(EchoAgent)) : AIAgent
public override string Id => id;
public override string? Name => id;
public override async Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
IEnumerable<AgentRunResponseUpdate> update = [
await this.RunStreamingAsync(messages, thread, options, cancellationToken)
@@ -136,15 +136,17 @@ internal sealed class EchoAgent(string id = nameof(EchoAgent)) : AIAgent
return update.ToAgentRunResponse();
}
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (messages.Count == 0)
var messagesList = messages as IReadOnlyCollection<ChatMessage> ?? messages.ToList();
if (messagesList.Count == 0)
{
throw new ArgumentException("No messages provided to echo.", nameof(messages));
}
StringBuilder collectedText = new(Prefix);
foreach (string messageText in messages.Select(message => message.Text)
foreach (string messageText in messagesList.Select(message => message.Text)
.Where(text => !string.IsNullOrEmpty(text)))
{
collectedText.AppendLine(messageText);
@@ -56,7 +56,7 @@ public class SpecializedExecutorSmokeTests
public List<ChatMessage> Messages { get; } = Validate(messages) ?? [];
public override Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
return Task.FromResult(new AgentRunResponse(this.Messages)
{
@@ -65,7 +65,7 @@ public class SpecializedExecutorSmokeTests
});
}
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
string responseId = Guid.NewGuid().ToString("N");
foreach (ChatMessage message in this.Messages)
@@ -361,13 +361,13 @@ public class AIAgentTests
private sealed class MockAgent : AIAgent
{
public static new Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IReadOnlyCollection<ChatMessage> messages, CancellationToken cancellationToken)
public static new Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IEnumerable<ChatMessage> messages, CancellationToken cancellationToken)
{
return AIAgent.NotifyThreadOfNewMessagesAsync(thread, messages, cancellationToken);
}
public override Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -376,7 +376,7 @@ public class AIAgentTests
}
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -175,7 +175,7 @@ public class AgentActorTests
public bool RunStreamingAsyncCalled { get; private set; }
public AgentThread? ThreadUsedInRunStreamingAsync { get; private set; }
public override Task<AgentRunResponse> RunAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
public override Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
this.ThreadUsedInRunStreamingAsync = thread;
return Task.FromResult(new AgentRunResponse
@@ -184,7 +184,7 @@ public class AgentActorTests
});
}
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IReadOnlyCollection<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
this.RunStreamingAsyncCalled = true;
this.ThreadUsedInRunStreamingAsync = thread;
@@ -306,7 +306,7 @@ public class AgentAIFunctionFactoryTests
public int RunAsyncCallCount { get; private set; }
public override Task<AgentRunResponse> RunAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -324,7 +324,7 @@ public class AgentAIFunctionFactoryTests
}
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IReadOnlyCollection<ChatMessage> messages,
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
@@ -579,10 +579,10 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
TextContent(text=message_content.refusal, raw_representation=message_content)
)
case "reasoning": # ResponseOutputReasoning
if item.content:
if hasattr(item, "content") and item.content:
for index, reasoning_content in enumerate(item.content):
additional_properties = None
if item.summary and index < len(item.summary):
if hasattr(item, "summary") and item.summary and index < len(item.summary):
additional_properties = {"summary": item.summary[index]}
contents.append(
TextReasoningContent(
@@ -592,7 +592,7 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
)
)
case "code_interpreter_call": # ResponseOutputCodeInterpreterCall
if item.outputs:
if hasattr(item, "outputs") and item.outputs:
for code_output in item.outputs:
if code_output.type == "logs":
contents.append(TextContent(text=code_output.logs, raw_representation=item))
@@ -605,16 +605,16 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
media_type="image",
)
)
elif item.code:
elif hasattr(item, "code") and item.code:
# fallback if no output was returned is the code:
contents.append(TextContent(text=item.code, raw_representation=item))
case "function_call": # ResponseOutputFunctionCall
contents.append(
FunctionCallContent(
call_id=item.call_id if item.call_id else "",
name=item.name,
arguments=item.arguments,
additional_properties={"fc_id": item.id},
call_id=item.call_id if hasattr(item, "call_id") and item.call_id else "",
name=item.name if hasattr(item, "name") else "",
arguments=item.arguments if hasattr(item, "arguments") else "",
additional_properties={"fc_id": item.id} if hasattr(item, "id") else {},
raw_representation=item,
)
)
@@ -739,6 +739,18 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
case "response.output_text.delta":
contents.append(TextContent(text=event.delta, raw_representation=event))
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_text.delta":
contents.append(TextReasoningContent(text=event.delta, raw_representation=event))
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_text.done":
contents.append(TextReasoningContent(text=event.text, raw_representation=event))
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.delta":
contents.append(TextReasoningContent(text=event.delta, raw_representation=event))
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.done":
contents.append(TextReasoningContent(text=event.text, raw_representation=event))
metadata.update(self._get_metadata_from_response(event))
case "response.completed":
conversation_id = event.response.id if chat_options.store is True else None
model = event.response.model
@@ -779,7 +791,7 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
)
)
case "code_interpreter_call": # ResponseOutputCodeInterpreterCall
if event_item.outputs:
if hasattr(event_item, "outputs") and event_item.outputs:
for code_output in event_item.outputs:
if code_output.type == "logs":
contents.append(TextContent(text=code_output.logs, raw_representation=event_item))
@@ -792,14 +804,18 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
media_type="image",
)
)
elif event_item.code:
elif hasattr(event_item, "code") and event_item.code:
# fallback if no output was returned is the code:
contents.append(TextContent(text=event_item.code, raw_representation=event_item))
case "reasoning": # ResponseOutputReasoning
if event_item.content:
if hasattr(event_item, "content") and event_item.content:
for index, reasoning_content in enumerate(event_item.content):
additional_properties = None
if event_item.summary and index < len(event_item.summary):
if (
hasattr(event_item, "summary")
and event_item.summary
and index < len(event_item.summary)
):
additional_properties = {"summary": event_item.summary[index]}
contents.append(
TextReasoningContent(
@@ -56,6 +56,7 @@ _IMPORTS = [
"PlanReviewRequest",
"RequestInfoEvent",
"StandardMagenticManager",
"ConcurrentBuilder",
]
@@ -8,6 +8,7 @@ from agent_framework_workflow import (
AgentRunUpdateEvent,
Case,
CheckpointStorage,
ConcurrentBuilder,
Default,
Executor,
ExecutorCompletedEvent,
@@ -56,6 +57,7 @@ __all__ = [
"AgentRunUpdateEvent",
"Case",
"CheckpointStorage",
"ConcurrentBuilder",
"Default",
"Executor",
"ExecutorCompletedEvent",
+1
View File
@@ -33,6 +33,7 @@ dependencies = [
"azure-monitor-opentelemetry>=1.7.0",
"azure-monitor-opentelemetry-exporter>=1.0.0b41",
"opentelemetry-exporter-otlp-proto-grpc>=1.36.0",
"opentelemetry-semantic-conventions-ai>=0.4.13"
]
[project.optional-dependencies]
@@ -1494,6 +1494,7 @@ def test_service_response_exception_includes_original_error_details() -> None:
assert original_error_message in exception_message
def test_get_streaming_response_with_response_format() -> None:
"""Test get_streaming_response with response_format."""
client = OpenAIResponsesClient(ai_model_id="test-model", api_key="test-key")
@@ -9,6 +9,7 @@ from ._checkpoint import (
InMemoryCheckpointStorage,
WorkflowCheckpoint,
)
from ._concurrent import ConcurrentBuilder
from ._const import (
DEFAULT_MAX_ITERATIONS,
)
@@ -93,6 +94,7 @@ __all__ = [
"AgentRunUpdateEvent",
"Case",
"CheckpointStorage",
"ConcurrentBuilder",
"Default",
"EdgeDuplicationError",
"Executor",
@@ -0,0 +1,304 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import inspect
import logging
from collections.abc import Callable, Sequence
from typing import Any
from agent_framework import AgentProtocol, ChatMessage, Role
from ._events import WorkflowCompletedEvent
from ._executor import AgentExecutorRequest, AgentExecutorResponse, Executor, handler
from ._workflow import Workflow, WorkflowBuilder
from ._workflow_context import WorkflowContext
logger = logging.getLogger(__name__)
"""Concurrent builder for agent-only fan-out/fan-in workflows.
This module provides a high-level, agent-focused API to quickly assemble a
parallel workflow with:
- a default dispatcher that broadcasts the input to all agent participants
- a default aggregator that combines all agent conversations and completes the workflow
Notes:
- Participants should be AgentProtocol instances or Executors.
- A custom aggregator can be provided as:
- an Executor instance (it should handle list[AgentExecutorResponse] and add a WorkflowCompletedEvent), or
- a callback function with signature:
def cb(results: list[AgentExecutorResponse]) -> Any | None
def cb(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None
If the callback returns a non-None value, it is sent as the data of a WorkflowCompletedEvent.
If it returns None, the callback may have already emitted a completion event via ctx.
"""
class _DispatchToAllParticipants(Executor):
"""Broadcasts input to all downstream participants (via fan-out edges)."""
@handler
async def from_request(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
# No explicit target: edge routing delivers to all connected participants.
await ctx.send_message(request)
@handler
async def from_str(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=prompt)], should_respond=True)
await ctx.send_message(request)
@handler
async def from_message(self, message: ChatMessage, ctx: WorkflowContext[AgentExecutorRequest]) -> None: # type: ignore[name-defined]
request = AgentExecutorRequest(messages=[message], should_respond=True)
await ctx.send_message(request)
@handler
async def from_messages(self, messages: list[ChatMessage], ctx: WorkflowContext[AgentExecutorRequest]) -> None: # type: ignore[name-defined]
request = AgentExecutorRequest(messages=list(messages), should_respond=True)
await ctx.send_message(request)
class _AggregateAgentConversations(Executor):
"""Aggregates agent responses and completes with combined ChatMessages.
Emits a list[ChatMessage] shaped as:
[ single_user_prompt?, agent1_final_assistant, agent2_final_assistant, ... ]
- Extracts a single user prompt (first user message seen across results).
- For each result, selects the final assistant message (prefers agent_run_response.messages).
- Avoids duplicating the same user message per agent.
"""
@handler
async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> None:
if not results:
logger.error("Concurrent aggregator received empty results list")
raise ValueError("Aggregation failed: no results provided")
def _is_role(msg: Any, role: Role) -> bool:
r = getattr(msg, "role", None)
if r is None:
return False
# Normalize both r and role to lowercase strings for comparison
r_str = str(r).lower() if isinstance(r, str) or hasattr(r, "__str__") else r
role_str = getattr(role, "value", None)
if role_str is None:
role_str = str(role)
role_str = role_str.lower()
return r_str == role_str
prompt_message: ChatMessage | None = None
assistant_replies: list[ChatMessage] = []
for r in results:
resp_messages = list(getattr(r.agent_run_response, "messages", []) or [])
conv = r.full_conversation if r.full_conversation is not None else resp_messages
logger.debug(
f"Aggregating executor {getattr(r, 'executor_id', '<unknown>')}: "
f"{len(resp_messages)} response msgs, {len(conv)} conversation msgs"
)
# Capture a single user prompt (first encountered across any conversation)
if prompt_message is None:
found_user = next((m for m in conv if _is_role(m, Role.USER)), None)
if found_user is not None:
prompt_message = found_user
# Pick the final assistant message from the response; fallback to conversation search
final_assistant = next((m for m in reversed(resp_messages) if _is_role(m, Role.ASSISTANT)), None)
if final_assistant is None:
final_assistant = next((m for m in reversed(conv) if _is_role(m, Role.ASSISTANT)), None)
if final_assistant is not None:
assistant_replies.append(final_assistant)
else:
logger.warning(
f"No assistant reply found for executor {getattr(r, 'executor_id', '<unknown>')}; skipping"
)
if not assistant_replies:
logger.error(f"Aggregation failed: no assistant replies found across {len(results)} results")
raise RuntimeError("Aggregation failed: no assistant replies found")
output: list[ChatMessage] = []
if prompt_message is not None:
output.append(prompt_message)
else:
logger.warning("No user prompt found in any conversation; emitting assistants only")
output.extend(assistant_replies)
await ctx.add_event(WorkflowCompletedEvent(data=output))
class _CallbackAggregator(Executor):
"""Wraps a Python callback as an aggregator.
Accepts either an async or sync callback with one of the signatures:
- (results: list[AgentExecutorResponse]) -> Any | None
- (results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None
Notes:
- Async callbacks are awaited directly.
- Sync callbacks are executed via asyncio.to_thread to avoid blocking the event loop.
- If the callback returns a non-None value, it is wrapped in a WorkflowCompletedEvent.
"""
def __init__(self, callback: Callable[..., Any], id: str | None = None) -> None:
super().__init__(id)
self._callback = callback
self._param_count = len(inspect.signature(callback).parameters)
@handler
async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> None:
# Call according to provided signature, always non-blocking for sync callbacks
if self._param_count >= 2:
if inspect.iscoroutinefunction(self._callback):
ret = await self._callback(results, ctx) # type: ignore[misc]
else:
ret = await asyncio.to_thread(self._callback, results, ctx)
else:
if inspect.iscoroutinefunction(self._callback):
ret = await self._callback(results) # type: ignore[misc]
else:
ret = await asyncio.to_thread(self._callback, results)
# If the callback returned a value, finalize the workflow with it
if ret is not None:
await ctx.add_event(WorkflowCompletedEvent(ret))
class ConcurrentBuilder:
r"""High-level builder for concurrent agent workflows.
- `participants([...])` accepts a list of AgentProtocol (recommended) or Executor.
- `build()` wires: dispatcher -> fan-out -> participants -> fan-in -> aggregator.
- `with_custom_aggregator(...)` overrides the default aggregator with an Executor or callback.
Usage:
```python
from agent_framework.workflow import ConcurrentBuilder
# Minimal: use default aggregator (returns list[ChatMessage])
workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()
# Custom aggregator via callback (sync or async). The callback receives
# list[AgentExecutorResponse] and its return value becomes
# WorkflowCompletedEvent.data
def summarize(results):
return " | ".join(r.agent_run_response.messages[-1].text for r in results)
workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_custom_aggregator(summarize).build()
```
"""
def __init__(self) -> None:
self._participants: list[AgentProtocol | Executor] = []
self._aggregator: Executor | None = None
def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "ConcurrentBuilder":
r"""Define the parallel participants for this concurrent workflow.
Accepts AgentProtocol instances (e.g., created by a chat client) or Executor
instances. Each participant is wired as a parallel branch using fan-out edges
from an internal dispatcher.
Raises:
ValueError: if `participants` is empty or contains duplicates
TypeError: if any entry is not AgentProtocol or Executor
Example:
```python
wf = ConcurrentBuilder().participants([researcher_agent, marketer_agent, legal_agent]).build()
# Mixing agent(s) and executor(s) is supported
wf2 = ConcurrentBuilder().participants([researcher_agent, my_custom_executor]).build()
```
"""
if not participants:
raise ValueError("participants cannot be empty")
# Defensive duplicate detection
seen_agent_ids: set[int] = set()
seen_executor_ids: set[str] = set()
for p in participants:
if isinstance(p, Executor):
if p.id in seen_executor_ids:
raise ValueError(f"Duplicate executor participant detected: id '{p.id}'")
seen_executor_ids.add(p.id)
elif isinstance(p, AgentProtocol):
pid = id(p)
if pid in seen_agent_ids:
raise ValueError("Duplicate agent participant detected (same agent instance provided twice)")
seen_agent_ids.add(pid)
else:
raise TypeError(f"participants must be AgentProtocol or Executor instances; got {type(p).__name__}")
self._participants = list(participants)
return self
def with_aggregator(self, aggregator: Executor | Callable[..., Any]) -> "ConcurrentBuilder":
r"""Override the default aggregator with an Executor or a callback.
- Executor: must handle `list[AgentExecutorResponse]` and add a
`WorkflowCompletedEvent` to the context.
- Callback: sync or async callable with one of the signatures:
`(results: list[AgentExecutorResponse]) -> Any | None` or
`(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None`.
If the callback returns a non-None value, it becomes the
`WorkflowCompletedEvent.data`.
Example:
```python
# Callback-based aggregator (string result)
async def summarize(results):
return " | ".join(r.agent_run_response.messages[-1].text for r in results)
wf = ConcurrentBuilder().participants([a1, a2, a3]).with_custom_aggregator(summarize).build()
```
"""
if isinstance(aggregator, Executor):
self._aggregator = aggregator
elif callable(aggregator):
self._aggregator = _CallbackAggregator(aggregator)
else:
raise TypeError("aggregator must be an Executor or a callable")
return self
def build(self) -> Workflow:
r"""Build and validate the concurrent workflow.
Wiring pattern:
- Dispatcher (internal) fans out the input to all `participants`
- Fan-in aggregator collects `AgentExecutorResponse` objects
- Aggregator emits a `WorkflowCompletedEvent` with either:
- list[ChatMessage] (default aggregator: one user + one assistant per agent)
- custom payload from the provided callback/executor
Returns:
Workflow: a ready-to-run workflow instance
Raises:
ValueError: if no participants were defined
Example:
```python
workflow = ConcurrentBuilder().participants([agent1, agent2]).build()
```
"""
if not self._participants:
raise ValueError("No participants provided. Call .participants([...]) first.")
dispatcher = _DispatchToAllParticipants(id="dispatcher")
aggregator = self._aggregator or _AggregateAgentConversations(id="aggregator")
builder = WorkflowBuilder()
return (
builder.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, list(self._participants))
.add_fan_in_edges(list(self._participants), aggregator)
.build()
)
@@ -0,0 +1,126 @@
# Copyright (c) Microsoft. All rights reserved.
from typing import Any, cast
import pytest
from agent_framework import AgentRunResponse, ChatMessage, Role
from agent_framework_workflow import (
AgentExecutorRequest,
AgentExecutorResponse,
ConcurrentBuilder,
Executor,
WorkflowCompletedEvent,
WorkflowContext,
handler,
)
class _FakeAgentExec(Executor):
"""Test executor that mimics an agent by emitting an AgentExecutorResponse.
It takes the incoming AgentExecutorRequest, produces a single assistant message
with the configured reply text, and sends an AgentExecutorResponse that includes
full_conversation (the original user prompt followed by the assistant message).
"""
def __init__(self, id: str, reply_text: str) -> None:
super().__init__(id)
self._reply_text = reply_text
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = AgentRunResponse(messages=ChatMessage(Role.ASSISTANT, text=self._reply_text))
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
def test_concurrent_builder_rejects_empty_participants() -> None:
with pytest.raises(ValueError):
ConcurrentBuilder().participants([])
def test_concurrent_builder_rejects_duplicate_executors() -> None:
a = _FakeAgentExec("dup", "A")
b = _FakeAgentExec("dup", "B") # same executor id
with pytest.raises(ValueError):
ConcurrentBuilder().participants([a, b])
async def test_concurrent_default_aggregator_emits_single_user_and_assistants() -> None:
# Three synthetic agent executors
e1 = _FakeAgentExec("agentA", "Alpha")
e2 = _FakeAgentExec("agentB", "Beta")
e3 = _FakeAgentExec("agentC", "Gamma")
wf = ConcurrentBuilder().participants([e1, e2, e3]).build()
completed: WorkflowCompletedEvent | None = None
async for ev in wf.run_stream("prompt: hello world"):
if isinstance(ev, WorkflowCompletedEvent):
completed = ev
break
assert completed is not None
assert isinstance(completed.data, list)
messages: list[ChatMessage] = cast(list[ChatMessage], completed.data) # type: ignore
# Expect one user message + one assistant message per participant
assert len(messages) == 1 + 3
assert messages[0].role == Role.USER
assert "hello world" in messages[0].text
assistant_texts = {m.text for m in messages[1:]}
assert assistant_texts == {"Alpha", "Beta", "Gamma"}
assert all(m.role == Role.ASSISTANT for m in messages[1:])
async def test_concurrent_custom_aggregator_callback_is_used() -> None:
# Two synthetic agent executors for brevity
e1 = _FakeAgentExec("agentA", "One")
e2 = _FakeAgentExec("agentB", "Two")
async def summarize(results: list[AgentExecutorResponse]) -> str:
texts: list[str] = []
for r in results:
msgs: list[ChatMessage] = r.agent_run_response.messages
texts.append(msgs[-1].text if msgs else "")
return " | ".join(sorted(texts))
wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(summarize).build()
completed: WorkflowCompletedEvent | None = None
async for ev in wf.run_stream("prompt: custom"):
if isinstance(ev, WorkflowCompletedEvent):
completed = ev
break
assert completed is not None
# Custom aggregator returns a string payload
assert isinstance(completed.data, str)
assert completed.data == "One | Two"
async def test_concurrent_custom_aggregator_sync_callback_is_used() -> None:
e1 = _FakeAgentExec("agentA", "One")
e2 = _FakeAgentExec("agentB", "Two")
# Sync callback with ctx parameter (should run via asyncio.to_thread)
def summarize_sync(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> str: # type: ignore[unused-argument]
texts: list[str] = []
for r in results:
msgs: list[ChatMessage] = r.agent_run_response.messages
texts.append(msgs[-1].text if msgs else "")
return " | ".join(sorted(texts))
wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(summarize_sync).build()
completed: WorkflowCompletedEvent | None = None
async for ev in wf.run_stream("prompt: custom sync"):
if isinstance(ev, WorkflowCompletedEvent):
completed = ev
break
assert completed is not None
assert isinstance(completed.data, str)
assert completed.data == "One | Two"
+1 -1
View File
@@ -31,7 +31,7 @@ dev = [
"markdownify",
# Documentation
"myst-nb==1.1.2",
"pydata-sphinx-theme==0.16.0",
"pydata-sphinx-theme==0.16.1",
"sphinx-copybutton",
"sphinx-design",
"sphinx",
@@ -10,12 +10,12 @@ async def reasoning_example() -> None:
"""Example of reasoning response (get results as they are generated)."""
print("=== Reasoning Example ===")
agent = OpenAIResponsesClient(ai_model_id="o4-mini").create_agent(
agent = OpenAIResponsesClient(ai_model_id="gpt-5").create_agent(
name="MathHelper",
instructions="You are a personal math tutor. When asked a math question, "
"write and run code using the python tool to answer the question.",
tools=HostedCodeInterpreterTool(),
reasoning={"effort": "medium"},
reasoning={"effort": "high", "summary": "detailed"},
)
query = "I need to solve the equation 3x + 11 = 14. Can you help me?"
@@ -27,9 +27,9 @@ async def reasoning_example() -> None:
for content in chunk.contents:
if isinstance(content, TextReasoningContent):
print(f"\033[97m{content.text}\033[0m", end="", flush=True)
if isinstance(content, TextContent):
elif isinstance(content, TextContent):
print(content.text, end="", flush=True)
if isinstance(content, UsageContent):
elif isinstance(content, UsageContent):
usage = content
print("\n")
if usage:
@@ -78,6 +78,9 @@ Once comfortable with these, explore the rest of the samples below.
### orchestration
| Sample | File | Concepts |
|---|---|---|
| Concurrent Orchestration (Default Aggregator) | [orchestration/concurrent_agents.py](./orchestration/concurrent_agents.py) | Fan-out to multiple agents; fan-in with default aggregator returning combined ChatMessages |
| Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM |
| Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder |
| Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming |
| Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution |
@@ -0,0 +1,131 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import ChatMessage
from agent_framework.azure import AzureChatClient
from agent_framework.workflow import ConcurrentBuilder, WorkflowCompletedEvent
from azure.identity import AzureCliCredential
"""
Sample: Concurrent fan-out/fan-in (agent-only API) with default aggregator
Build a high-level concurrent workflow using ConcurrentBuilder and three domain agents.
The default dispatcher fans out the same user prompt to all agents in parallel.
The default aggregator fans in their results and emits a WorkflowCompletedEvent whose
data is a list[ChatMessage] representing the concatenated conversations from all agents.
Demonstrates:
- Minimal wiring with ConcurrentBuilder().participants([...]).build()
- Fan-out to multiple agents, fan-in aggregation of final ChatMessages
- Streaming of AgentRunEvent for simple progress visibility
Prerequisites:
- Azure OpenAI access configured for AzureChatClient (use az login + env vars)
- Familiarity with Workflow events (AgentRunEvent, WorkflowCompletedEvent)
"""
async def main() -> None:
# 1) Create three domain agents using AzureChatClient
chat_client = AzureChatClient(credential=AzureCliCredential())
researcher = chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = chat_client.create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
# 2) Build a concurrent workflow
# Participants are either Agents (type of AgentProtocol) or Executors
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
# 3) Run with a single prompt, stream progress, and pretty-print the final combined messages
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
print("===== Final Aggregated Conversation (messages) =====")
messages: list[ChatMessage] | Any = completion.data
for i, msg in enumerate(messages, start=1):
name = msg.author_name if msg.author_name else "user"
print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")
"""
Sample Output:
===== Final Aggregated Conversation (messages) =====
------------------------------------------------------------
01 [user]:
We are launching a new budget-friendly electric bike for urban commuters.
------------------------------------------------------------
02 [researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
- **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon,
Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia.
- **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection,
lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles),
and low-maintenance components.
**Opportunities:**
- **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of
operation, and cost savings vs. public transit/car ownership.
...
------------------------------------------------------------
03 [marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
**Target Messaging:**
*For Young Professionals:*
...
------------------------------------------------------------
04 [legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
- Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained.
**2. Product Safety**
- Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions.
...
""" # noqa: E501
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,175 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import ChatAgent, ChatMessage
from agent_framework.azure import AzureChatClient
from agent_framework.workflow import (
AgentExecutorRequest,
AgentExecutorResponse,
ConcurrentBuilder,
Executor,
WorkflowCompletedEvent,
WorkflowContext,
handler,
)
from azure.identity import AzureCliCredential
"""
Sample: Concurrent Orchestration with Custom Agent Executors
This sample shows a concurrent fan-out/fan-in pattern using child Executor classes
that each own their ChatAgent. The executors accept AgentExecutorRequest inputs
and emit AgentExecutorResponse outputs, which allows reuse of the high-level
ConcurrentBuilder API and the default aggregator.
Demonstrates:
- Executors that create their ChatAgent in __init__ (via AzureChatClient)
- A @handler that converts AgentExecutorRequest -> AgentExecutorResponse
- ConcurrentBuilder().participants([...]) to build fan-out/fan-in
- Default aggregator returning list[ChatMessage] (one user + one assistant per agent)
Prerequisites:
- Azure OpenAI configured for AzureChatClient (az login + required env vars)
"""
class ResearcherExec(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "researcher"):
agent = chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name=id,
)
super().__init__(agent=agent, id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class MarketerExec(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "marketer"):
agent = chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name=id,
)
super().__init__(agent=agent, id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class LegalExec(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "legal"):
agent = chat_client.create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name=id,
)
super().__init__(agent=agent, id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
async def main() -> None:
chat_client = AzureChatClient(credential=AzureCliCredential())
researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
print("===== Final Aggregated Conversation (messages) =====")
messages: list[ChatMessage] | Any = completion.data
for i, msg in enumerate(messages, start=1):
name = msg.author_name if msg.author_name else "user"
print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")
"""
Sample Output:
===== Final Aggregated Conversation (messages) =====
------------------------------------------------------------
01 [user]:
We are launching a new budget-friendly electric bike for urban commuters.
------------------------------------------------------------
02 [researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
- **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon,
Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia.
- **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection,
lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles),
and low-maintenance components.
**Opportunities:**
- **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of
operation, and cost savings vs. public transit/car ownership.
...
------------------------------------------------------------
03 [marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
**Target Messaging:**
*For Young Professionals:*
...
------------------------------------------------------------
04 [legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
- Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained.
**2. Product Safety**
- Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions.
...
""" # noqa: E501
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,125 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import ChatMessage, Role
from agent_framework.azure import AzureChatClient
from agent_framework.workflow import ConcurrentBuilder, WorkflowCompletedEvent
from azure.identity import AzureCliCredential
"""
Sample: Concurrent Orchestration with Custom Aggregator
Build a concurrent workflow with ConcurrentBuilder that fans out one prompt to
multiple domain agents and fans in their responses. Override the default
aggregator with a custom async callback that uses AzureChatClient.get_response()
to synthesize a concise, consolidated summary from the experts' outputs.
Demonstrates:
- ConcurrentBuilder().participants([...]).with_custom_aggregator(callback)
- Fan-out to agents and fan-in at an aggregator
- Aggregation implemented via an LLM call (chat_client.get_response)
- WorkflowCompletedEvent carrying the synthesized summary string
Prerequisites:
- Azure OpenAI configured for AzureChatClient (az login + required env vars)
"""
async def main() -> None:
chat_client = AzureChatClient(credential=AzureCliCredential())
researcher = chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = chat_client.create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
# Define a custom aggregator callback that uses the chat client to summarize
async def summarize_results(results: list[Any]) -> str:
# Extract one final assistant message per agent
expert_sections: list[str] = []
for r in results:
try:
messages = getattr(r.agent_run_response, "messages", [])
final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}:\n{final_text}")
except Exception as e:
expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}: (error: {type(e).__name__}: {e})")
# Ask the model to synthesize a concise summary of the experts' outputs
system_msg = ChatMessage(
Role.SYSTEM,
text=(
"You are a helpful assistant that consolidates multiple domain expert outputs "
"into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
),
)
user_msg = ChatMessage(Role.USER, text="\n\n".join(expert_sections))
response = await chat_client.get_response([system_msg, user_msg])
# Return the model's final assistant text as the completion result
return response.messages[-1].text if response.messages else ""
# Build with a custom aggregator callback function
# - participants([...]) accepts AgentProtocol (agents) or Executor instances.
# Each participant becomes a parallel branch (fan-out) from an internal dispatcher.
# - with_aggregator(...) overrides the default aggregator:
# • Default aggregator -> returns list[ChatMessage] (one user + one assistant per agent)
# • Custom callback -> return value becomes WorkflowCompletedEvent.data (string here)
# The callback can be sync or async; it receives list[AgentExecutorResponse].
workflow = (
ConcurrentBuilder().participants([researcher, marketer, legal]).with_aggregator(summarize_results).build()
)
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
print("===== Final Consolidated Output =====")
print(completion.data)
"""
Sample Output:
===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.
Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.
Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: “Charge Ahead—City Commutes Made
Affordable.” Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.
"""
if __name__ == "__main__":
asyncio.run(main())