diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index 06ee8d8d4c..e2649178c0 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -32,7 +32,7 @@ jobs:
steps:
- name: Checkout repository
- uses: actions/checkout@v4
+ uses: actions/checkout@v5
with:
persist-credentials: false
diff --git a/.github/workflows/dotnet-build-and-test.yml b/.github/workflows/dotnet-build-and-test.yml
index 82248ff921..237c52cf46 100644
--- a/.github/workflows/dotnet-build-and-test.yml
+++ b/.github/workflows/dotnet-build-and-test.yml
@@ -47,7 +47,7 @@ jobs:
runs-on: ${{ matrix.os }}
environment: ${{ matrix.environment }}
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
with:
persist-credentials: false
sparse-checkout: |
diff --git a/.github/workflows/dotnet-format.yml b/.github/workflows/dotnet-format.yml
index 893bd969b8..581318c538 100644
--- a/.github/workflows/dotnet-format.yml
+++ b/.github/workflows/dotnet-format.yml
@@ -30,7 +30,7 @@ jobs:
steps:
- name: Check out code
- uses: actions/checkout@v4
+ uses: actions/checkout@v5
with:
fetch-depth: 0
persist-credentials: false
diff --git a/.github/workflows/python-code-quality.yml b/.github/workflows/python-code-quality.yml
index d940513986..e4bd5155c1 100644
--- a/.github/workflows/python-code-quality.yml
+++ b/.github/workflows/python-code-quality.yml
@@ -27,7 +27,7 @@ jobs:
env:
UV_PYTHON: ${{ matrix.python-version }}
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
diff --git a/.github/workflows/python-merge-tests.yml b/.github/workflows/python-merge-tests.yml
index 9868dc6c80..950394bae6 100644
--- a/.github/workflows/python-merge-tests.yml
+++ b/.github/workflows/python-merge-tests.yml
@@ -28,7 +28,7 @@ jobs:
outputs:
pythonChanges: ${{ steps.filter.outputs.python}}
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
- uses: dorny/paths-filter@v3
id: filter
with:
@@ -66,7 +66,7 @@ jobs:
run:
working-directory: python
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
@@ -127,7 +127,7 @@ jobs:
run:
working-directory: python
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
@@ -194,7 +194,7 @@ jobs:
run:
working-directory: python
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
diff --git a/.github/workflows/python-release.yml b/.github/workflows/python-release.yml
index 571af6c148..39935650ef 100644
--- a/.github/workflows/python-release.yml
+++ b/.github/workflows/python-release.yml
@@ -23,7 +23,7 @@ jobs:
run:
working-directory: python
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
diff --git a/.github/workflows/python-test-coverage-report.yml b/.github/workflows/python-test-coverage-report.yml
index 3f3256db89..811e65098c 100644
--- a/.github/workflows/python-test-coverage-report.yml
+++ b/.github/workflows/python-test-coverage-report.yml
@@ -19,9 +19,9 @@ jobs:
run:
working-directory: python
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
- name: Download coverage report
- uses: actions/download-artifact@v4
+ uses: actions/download-artifact@v5
with:
github-token: ${{ secrets.GH_ACTIONS_PR_WRITE }}
run-id: ${{ github.event.workflow_run.id }}
@@ -39,7 +39,7 @@ jobs:
echo "PR_NUMBER=$PR_NUMBER" >> $GITHUB_ENV
- name: Pytest coverage comment
id: coverageComment
- uses: MishaKav/pytest-coverage-comment@v1.1.53
+ uses: MishaKav/pytest-coverage-comment@v1.1.56
with:
github-token: ${{ secrets.GH_ACTIONS_PR_WRITE }}
issue-number: ${{ env.PR_NUMBER }}
diff --git a/.github/workflows/python-test-coverage.yml b/.github/workflows/python-test-coverage.yml
index 08ffe47158..c3f014235f 100644
--- a/.github/workflows/python-test-coverage.yml
+++ b/.github/workflows/python-test-coverage.yml
@@ -20,7 +20,7 @@ jobs:
env:
UV_PYTHON: "3.10"
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
# Save the PR number to a file since the workflow_run event
# in the coverage report workflow does not have access to it
- name: Save PR number
diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml
index 5981f27757..20413071e7 100644
--- a/.github/workflows/python-tests.yml
+++ b/.github/workflows/python-tests.yml
@@ -26,7 +26,7 @@ jobs:
run:
working-directory: python
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
- name: Set up uv
uses: astral-sh/setup-uv@v6
with:
diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props
index f0e6d3fe0d..907a58ece8 100644
--- a/dotnet/Directory.Packages.props
+++ b/dotnet/Directory.Packages.props
@@ -74,14 +74,14 @@
-
+
-
+
@@ -91,7 +91,7 @@
-
+
diff --git a/dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs b/dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs
index df315dbfaa..a2713c80d2 100644
--- a/dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs
+++ b/dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs
@@ -30,7 +30,7 @@ namespace SampleApp
// Custom agent that parrot's the user input back in upper case.
internal sealed class UpperCaseParrotAgent : AIAgent
{
- public override async Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
// Create a thread if the user didn't supply one.
thread ??= this.GetNewThread();
@@ -50,7 +50,7 @@ namespace SampleApp
};
}
- public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Create a thread if the user didn't supply one.
thread ??= this.GetNewThread();
@@ -76,7 +76,7 @@ namespace SampleApp
}
}
- private static IEnumerable CloneAndToUpperCase(IReadOnlyCollection messages, string agentName) => messages.Select(x =>
+ private static IEnumerable CloneAndToUpperCase(IEnumerable messages, string agentName) => messages.Select(x =>
{
// Clone the message and update its author to be the agent.
var messageClone = x.Clone();
diff --git a/dotnet/samples/GettingStarted/Agents/Agent_Step07_3rdPartyThreadStorage/Program.cs b/dotnet/samples/GettingStarted/Agents/Agent_Step07_3rdPartyThreadStorage/Program.cs
index 11cbd7670f..8db527a2af 100644
--- a/dotnet/samples/GettingStarted/Agents/Agent_Step07_3rdPartyThreadStorage/Program.cs
+++ b/dotnet/samples/GettingStarted/Agents/Agent_Step07_3rdPartyThreadStorage/Program.cs
@@ -82,7 +82,7 @@ namespace SampleApp
public string? ThreadId => this._threadId;
- public async Task AddMessagesAsync(IReadOnlyCollection messages, CancellationToken cancellationToken)
+ public async Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken)
{
this._threadId ??= Guid.NewGuid().ToString();
diff --git a/dotnet/src/Microsoft.Agents.Orchestration/ConcurrentOrchestration.cs b/dotnet/src/Microsoft.Agents.Orchestration/ConcurrentOrchestration.cs
index 3953b7fd0f..009c327e0e 100644
--- a/dotnet/src/Microsoft.Agents.Orchestration/ConcurrentOrchestration.cs
+++ b/dotnet/src/Microsoft.Agents.Orchestration/ConcurrentOrchestration.cs
@@ -57,11 +57,11 @@ public partial class ConcurrentOrchestration : OrchestratingAgent
}
///
- protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
- this.ResumeAsync(messages, new AgentRunResponse?[this.Agents.Count], context, cancellationToken);
+ protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
+ this.ResumeAsync(messages as IReadOnlyCollection ?? messages.ToList(), new AgentRunResponse?[this.Agents.Count], context, cancellationToken);
///
- protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
+ protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.ConcurrentState) ?? throw new InvalidOperationException("The checkpoint state is invalid.");
diff --git a/dotnet/src/Microsoft.Agents.Orchestration/GroupChat/GroupChatOrchestration.cs b/dotnet/src/Microsoft.Agents.Orchestration/GroupChat/GroupChatOrchestration.cs
index 0f0973e768..5afd420a27 100644
--- a/dotnet/src/Microsoft.Agents.Orchestration/GroupChat/GroupChatOrchestration.cs
+++ b/dotnet/src/Microsoft.Agents.Orchestration/GroupChat/GroupChatOrchestration.cs
@@ -43,7 +43,7 @@ public sealed partial class GroupChatOrchestration : OrchestratingAgent
public Func>? InteractiveCallback { get; set; }
///
- protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken)
+ protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
List allMessages = [.. messages];
int originalMessageCount = allMessages.Count;
@@ -51,7 +51,7 @@ public sealed partial class GroupChatOrchestration : OrchestratingAgent
}
///
- protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
+ protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.GroupChatState) ?? throw new InvalidOperationException("The checkpoint state is invalid.");
diff --git a/dotnet/src/Microsoft.Agents.Orchestration/Handoffs/HandoffOrchestration.cs b/dotnet/src/Microsoft.Agents.Orchestration/Handoffs/HandoffOrchestration.cs
index 8ef64910e7..23d29f9f69 100644
--- a/dotnet/src/Microsoft.Agents.Orchestration/Handoffs/HandoffOrchestration.cs
+++ b/dotnet/src/Microsoft.Agents.Orchestration/Handoffs/HandoffOrchestration.cs
@@ -44,7 +44,7 @@ public sealed partial class HandoffOrchestration : OrchestratingAgent
public Func>? InteractiveCallback { get; set; }
///
- protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken)
+ protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
List allMessages = [.. messages];
int originalMessageCount = allMessages.Count;
@@ -52,7 +52,7 @@ public sealed partial class HandoffOrchestration : OrchestratingAgent
}
///
- protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
+ protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.HandoffState) ?? throw new InvalidOperationException("The checkpoint state is invalid.");
diff --git a/dotnet/src/Microsoft.Agents.Orchestration/OrchestratingAgent.cs b/dotnet/src/Microsoft.Agents.Orchestration/OrchestratingAgent.cs
index 8da88c20bc..18c70cab3a 100644
--- a/dotnet/src/Microsoft.Agents.Orchestration/OrchestratingAgent.cs
+++ b/dotnet/src/Microsoft.Agents.Orchestration/OrchestratingAgent.cs
@@ -65,7 +65,7 @@ public abstract partial class OrchestratingAgent : AIAgent
///
public sealed override async Task RunAsync(
- IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(messages);
@@ -87,7 +87,7 @@ public abstract partial class OrchestratingAgent : AIAgent
///
public sealed override async IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// TODO: There should be a RunAsync overload that returns an OrchestratingAgentStreamingResponse, which this then delegates to.
@@ -106,12 +106,12 @@ public abstract partial class OrchestratingAgent : AIAgent
/// The runtime associated with the orchestration.
/// The to monitor for cancellation requests. The default is .
public async ValueTask RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentRunOptions? options = null,
IActorRuntimeContext? runtime = null,
CancellationToken cancellationToken = default)
{
- Throw.IfNull(messages, nameof(messages));
+ var readonlyCollectionMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList();
cancellationToken.ThrowIfCancellationRequested();
ILogger logger = this.LoggerFactory.CreateLogger(this.GetType().Name);
@@ -131,8 +131,8 @@ public abstract partial class OrchestratingAgent : AIAgent
JsonElement? checkpoint = await this.ReadCheckpointAsync(context, cancellationToken).ConfigureAwait(false);
Task completion = checkpoint is null ?
- this.RunCoreAsync(messages, context, cancellationToken) :
- this.ResumeCoreAsync(checkpoint.Value, messages, context, cancellationToken);
+ this.RunCoreAsync(readonlyCollectionMessages, context, cancellationToken) :
+ this.ResumeCoreAsync(checkpoint.Value, readonlyCollectionMessages, context, cancellationToken);
if (logger.IsEnabled(LogLevel.Trace))
{
@@ -148,7 +148,7 @@ public abstract partial class OrchestratingAgent : AIAgent
/// The input message.
/// The context for this operation.
/// A cancellation token that can be used to cancel the operation.
- protected abstract Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken);
+ protected abstract Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken);
///
/// Resumes processing of the orchestration.
@@ -157,7 +157,7 @@ public abstract partial class OrchestratingAgent : AIAgent
/// The new messages to be processed in addition to the checkpoint state.
/// The context for this operation.
/// A cancellation token that can be used to cancel the operation.
- protected abstract Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken);
+ protected abstract Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken);
///
/// Runs the agent with input messages and respond with both streamed and regular messages.
@@ -168,7 +168,7 @@ public abstract partial class OrchestratingAgent : AIAgent
/// Options to use when invoking the agent.
/// A cancellation token that can be used to cancel the operation.
/// A task that returns the response .
- protected static async ValueTask RunAsync(AIAgent agent, OrchestratingAgentContext context, IReadOnlyCollection input, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ protected static async ValueTask RunAsync(AIAgent agent, OrchestratingAgentContext context, IEnumerable input, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
// Utilize streaming iff a streaming callback is provided; otherwise, use the non-streaming API.
AgentRunResponse response;
diff --git a/dotnet/src/Microsoft.Agents.Orchestration/SequentialOrchestration.cs b/dotnet/src/Microsoft.Agents.Orchestration/SequentialOrchestration.cs
index 1c113cfbde..14a4bb1cdd 100644
--- a/dotnet/src/Microsoft.Agents.Orchestration/SequentialOrchestration.cs
+++ b/dotnet/src/Microsoft.Agents.Orchestration/SequentialOrchestration.cs
@@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@@ -28,11 +29,11 @@ public sealed partial class SequentialOrchestration : OrchestratingAgent
}
///
- protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
- this.ResumeAsync(0, messages, context, cancellationToken);
+ protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
+ this.ResumeAsync(0, messages as IReadOnlyCollection ?? messages.ToList(), context, cancellationToken);
///
- protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
+ protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken)
{
var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.SequentialState) ?? throw new InvalidOperationException("The checkpoint state is invalid.");
diff --git a/dotnet/src/Microsoft.Agents.Workflows/WorkflowHostAgent.cs b/dotnet/src/Microsoft.Agents.Workflows/WorkflowHostAgent.cs
index 3751cd2c9f..d0a48288d5 100644
--- a/dotnet/src/Microsoft.Agents.Workflows/WorkflowHostAgent.cs
+++ b/dotnet/src/Microsoft.Agents.Workflows/WorkflowHostAgent.cs
@@ -96,7 +96,7 @@ internal class WorkflowHostAgent : AIAgent
}
}
- private async ValueTask UpdateThreadAsync(IReadOnlyCollection messages, AgentThread? thread = null, CancellationToken cancellation = default)
+ private async ValueTask UpdateThreadAsync(IEnumerable messages, AgentThread? thread = null, CancellationToken cancellation = default)
{
if (thread is null)
{
@@ -114,7 +114,7 @@ internal class WorkflowHostAgent : AIAgent
public override async
Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -134,7 +134,7 @@ internal class WorkflowHostAgent : AIAgent
public override async
IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
diff --git a/dotnet/src/Microsoft.Agents.Workflows/WorkflowMessageStore.cs b/dotnet/src/Microsoft.Agents.Workflows/WorkflowMessageStore.cs
index 35de73ce0b..dd2aed2750 100644
--- a/dotnet/src/Microsoft.Agents.Workflows/WorkflowMessageStore.cs
+++ b/dotnet/src/Microsoft.Agents.Workflows/WorkflowMessageStore.cs
@@ -25,7 +25,7 @@ internal class WorkflowMessageStore : IChatMessageStore
this._chatMessages.AddRange(messages);
}
- public Task AddMessagesAsync(IReadOnlyCollection messages, CancellationToken cancellationToken)
+ public Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken)
{
this._chatMessages.AddRange(messages);
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/A2AAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/A2AAgent.cs
index 061b988137..802744bf9a 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/A2AAgent.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/A2AAgent.cs
@@ -52,7 +52,7 @@ internal sealed class A2AAgent : AIAgent
}
///
- public override async Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
ValidateInputMessages(messages);
@@ -98,7 +98,7 @@ internal sealed class A2AAgent : AIAgent
}
///
- public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ValidateInputMessages(messages);
@@ -147,7 +147,7 @@ internal sealed class A2AAgent : AIAgent
///
public override string? Description => this._description ?? base.Description;
- private static void ValidateInputMessages(IReadOnlyCollection messages)
+ private static void ValidateInputMessages(IEnumerable messages)
{
_ = Throw.IfNull(messages);
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/Extensions/ChatMessageExtensions.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/Extensions/ChatMessageExtensions.cs
index f030750dcd..f7a2b09013 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/Extensions/ChatMessageExtensions.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/Extensions/ChatMessageExtensions.cs
@@ -11,7 +11,7 @@ namespace Microsoft.Extensions.AI.Agents.A2A;
///
internal static class ChatMessageExtensions
{
- internal static Message ToA2AMessage(this IReadOnlyCollection messages)
+ internal static Message ToA2AMessage(this IEnumerable messages)
{
List allParts = [];
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AIAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AIAgent.cs
index 05bdf49039..395f075029 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AIAgent.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AIAgent.cs
@@ -112,7 +112,7 @@ public abstract class AIAgent
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
- return this.RunAsync((IReadOnlyCollection)[], thread, options, cancellationToken);
+ return this.RunAsync((IEnumerable)[], thread, options, cancellationToken);
}
///
@@ -174,7 +174,7 @@ public abstract class AIAgent
/// The to monitor for cancellation requests. The default is .
/// A containing the list of items.
public abstract Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default);
@@ -194,7 +194,7 @@ public abstract class AIAgent
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
- return this.RunStreamingAsync((IReadOnlyCollection)[], thread, options, cancellationToken);
+ return this.RunStreamingAsync((IEnumerable)[], thread, options, cancellationToken);
}
///
@@ -256,7 +256,7 @@ public abstract class AIAgent
/// The to monitor for cancellation requests. The default is .
/// An async list of response items that each contain a .
public abstract IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default);
@@ -285,14 +285,11 @@ public abstract class AIAgent
/// The messages to pass to the thread.
/// The to monitor for cancellation requests. The default is .
/// An async task that completes once the notification is complete.
- protected static async Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IReadOnlyCollection messages, CancellationToken cancellationToken)
+ protected static async Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IEnumerable messages, CancellationToken cancellationToken)
{
_ = Throw.IfNull(thread);
_ = Throw.IfNull(messages);
- if (messages.Count > 0)
- {
- await thread.OnNewMessagesAsync(messages, cancellationToken).ConfigureAwait(false);
- }
+ await thread.OnNewMessagesAsync(messages, cancellationToken).ConfigureAwait(false);
}
}
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AgentThread.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AgentThread.cs
index afc92bc60d..74a5c8108e 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AgentThread.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AgentThread.cs
@@ -139,7 +139,7 @@ public class AgentThread
/// The to monitor for cancellation requests. The default is .
/// A task that completes when the context has been updated.
/// The thread has been deleted.
- protected internal virtual async Task OnNewMessagesAsync(IReadOnlyCollection newMessages, CancellationToken cancellationToken = default)
+ protected internal virtual async Task OnNewMessagesAsync(IEnumerable newMessages, CancellationToken cancellationToken = default)
{
switch (this)
{
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/DelegatingAIAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/DelegatingAIAgent.cs
index e2e0fd7f2d..81cc93628c 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/DelegatingAIAgent.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/DelegatingAIAgent.cs
@@ -54,7 +54,7 @@ public class DelegatingAIAgent : AIAgent
///
public override Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -62,7 +62,7 @@ public class DelegatingAIAgent : AIAgent
///
public override IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/IChatMessageStore.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/IChatMessageStore.cs
index ad6303f36e..e9c3bf374a 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/IChatMessageStore.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/IChatMessageStore.cs
@@ -42,7 +42,7 @@ public interface IChatMessageStore
/// The messages to add.
/// The to monitor for cancellation requests. The default is .
/// An async task.
- Task AddMessagesAsync(IReadOnlyCollection messages, CancellationToken cancellationToken = default);
+ Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default);
///
/// Deserializes the state contained in the provided into the properties on this store.
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/InMemoryChatMessageStore.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/InMemoryChatMessageStore.cs
index 61e56c5f18..6f59fd696c 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/InMemoryChatMessageStore.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/InMemoryChatMessageStore.cs
@@ -61,15 +61,10 @@ public sealed class InMemoryChatMessageStore : IList, IChatMessageS
}
///
- public async Task AddMessagesAsync(IReadOnlyCollection messages, CancellationToken cancellationToken)
+ public async Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken)
{
_ = Throw.IfNull(messages);
- if (messages.Count == 0)
- {
- return;
- }
-
this._messages.AddRange(messages);
if (this._reducerTriggerEvent == ChatReducerTriggerEvent.AfterMessageAdded && this._chatReducer is not null)
@@ -172,7 +167,7 @@ public sealed class InMemoryChatMessageStore : IList, IChatMessageS
{
///
/// Trigger the reducer when a new message is added.
- /// will only complete when reducer processing is done.
+ /// will only complete when reducer processing is done.
///
AfterMessageAdded,
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.CopilotStudio/CopilotStudioAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.CopilotStudio/CopilotStudioAgent.cs
index 30aa346d3c..dbd15c2b6e 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.CopilotStudio/CopilotStudioAgent.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.CopilotStudio/CopilotStudioAgent.cs
@@ -41,7 +41,7 @@ public class CopilotStudioAgent : AIAgent
///
public override async Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -74,7 +74,7 @@ public class CopilotStudioAgent : AIAgent
///
public override async IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs
index 731f54da72..5a0b41bb8a 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs
@@ -45,7 +45,7 @@ public sealed class AgentProxy : AIAgent
///
public override async Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -57,7 +57,7 @@ public sealed class AgentProxy : AIAgent
///
public override async IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
@@ -70,7 +70,7 @@ public sealed class AgentProxy : AIAgent
}
}
- private async Task RunAsync(IReadOnlyCollection messages, string threadId, CancellationToken cancellationToken)
+ private async Task RunAsync(IEnumerable messages, string threadId, CancellationToken cancellationToken)
{
var handle = await this.RunCoreAsync(messages, threadId, cancellationToken).ConfigureAwait(false);
var response = await handle.GetResponseAsync(cancellationToken).ConfigureAwait(false);
@@ -85,7 +85,7 @@ public sealed class AgentProxy : AIAgent
}
private async IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
string threadId,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
@@ -123,7 +123,7 @@ public sealed class AgentProxy : AIAgent
return agentProxyThread.ConversationId!;
}
- private async Task RunCoreAsync(IReadOnlyCollection messages, string threadId, CancellationToken cancellationToken)
+ private async Task RunCoreAsync(IEnumerable messages, string threadId, CancellationToken cancellationToken)
{
List newMessages = [.. messages];
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.OpenAI/OpenAIChatClientAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.OpenAI/OpenAIChatClientAgent.cs
index 2c9dbdcee3..e583d5d4e5 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents.OpenAI/OpenAIChatClientAgent.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents.OpenAI/OpenAIChatClientAgent.cs
@@ -85,7 +85,7 @@ public class OpenAIChatClientAgent : AIAgent
///
public sealed override Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -93,7 +93,7 @@ public class OpenAIChatClientAgent : AIAgent
///
public sealed override IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents/ChatCompletion/ChatClientAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents/ChatCompletion/ChatClientAgent.cs
index f100fee198..3deafcdc6f 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents/ChatCompletion/ChatClientAgent.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents/ChatCompletion/ChatClientAgent.cs
@@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -99,15 +100,15 @@ public sealed class ChatClientAgent : AIAgent
///
public override async Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
- _ = Throw.IfNull(messages);
+ var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList();
(AgentThread safeThread, ChatOptions? chatOptions, List threadMessages) =
- await this.PrepareThreadAndMessagesAsync(thread, messages, options, cancellationToken).ConfigureAwait(false);
+ await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false);
var agentName = this.GetLoggingAgentName();
@@ -115,14 +116,14 @@ public sealed class ChatClientAgent : AIAgent
ChatResponse chatResponse = await this.ChatClient.GetResponseAsync(threadMessages, chatOptions, cancellationToken).ConfigureAwait(false);
- this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, agentName, this._chatClientType, messages.Count);
+ this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, agentName, this._chatClientType, inputMessages.Count);
// We can derive the type of supported thread from whether we have a conversation id,
// so let's update it and set the conversation id for the service thread case.
this.UpdateThreadWithTypeAndConversationId(safeThread, chatResponse.ConversationId);
// Only notify the thread of new messages if the chatResponse was successful to avoid inconsistent messages state in the thread.
- await NotifyThreadOfNewMessagesAsync(safeThread, messages, cancellationToken).ConfigureAwait(false);
+ await NotifyThreadOfNewMessagesAsync(safeThread, inputMessages, cancellationToken).ConfigureAwait(false);
// Ensure that the author name is set for each message in the response.
foreach (ChatMessage chatResponseMessage in chatResponse.Messages)
@@ -140,12 +141,12 @@ public sealed class ChatClientAgent : AIAgent
///
public override async IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
- var inputMessages = Throw.IfNull(messages);
+ var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList();
(AgentThread safeThread, ChatOptions? chatOptions, List threadMessages) =
await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false);
@@ -334,7 +335,7 @@ public sealed class ChatClientAgent : AIAgent
/// A tuple containing the thread, chat options, and thread messages.
private async Task<(AgentThread AgentThread, ChatOptions? ChatOptions, List ThreadMessages)> PrepareThreadAndMessagesAsync(
AgentThread? thread,
- IReadOnlyCollection inputMessages,
+ IEnumerable inputMessages,
AgentRunOptions? runOptions,
CancellationToken cancellationToken)
{
diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents/OpenTelemetryAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents/OpenTelemetryAgent.cs
index 7f8b46c82b..90f547a1c2 100644
--- a/dotnet/src/Microsoft.Extensions.AI.Agents/OpenTelemetryAgent.cs
+++ b/dotnet/src/Microsoft.Extensions.AI.Agents/OpenTelemetryAgent.cs
@@ -123,23 +123,23 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable
///
public override async Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
- _ = Throw.IfNull(messages);
+ var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList();
- using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, messages, thread);
+ using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, inputMessages, thread);
Stopwatch? stopwatch = this._operationDurationHistogram.Enabled ? Stopwatch.StartNew() : null;
- this.LogChatMessages(messages);
+ this.LogChatMessages(inputMessages);
AgentRunResponse? response = null;
Exception? error = null;
try
{
- response = await base.RunAsync(messages, thread, options, cancellationToken).ConfigureAwait(false);
+ response = await base.RunAsync(inputMessages, thread, options, cancellationToken).ConfigureAwait(false);
return response;
}
catch (Exception ex)
@@ -149,30 +149,30 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable
}
finally
{
- this.TraceResponse(activity, response, error, stopwatch, messages.Count, isStreaming: false);
+ this.TraceResponse(activity, response, error, stopwatch, inputMessages.Count, isStreaming: false);
}
}
///
public override async IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
- _ = Throw.IfNull(messages);
+ var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList();
- using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, messages, thread);
+ using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, inputMessages, thread);
Stopwatch? stopwatch = this._operationDurationHistogram.Enabled ? Stopwatch.StartNew() : null;
IAsyncEnumerable updates;
try
{
- updates = base.RunStreamingAsync(messages, thread, options, cancellationToken);
+ updates = base.RunStreamingAsync(inputMessages, thread, options, cancellationToken);
}
catch (Exception ex)
{
- this.TraceResponse(activity, response: null, ex, stopwatch, messages.Count, isStreaming: true);
+ this.TraceResponse(activity, response: null, ex, stopwatch, inputMessages.Count, isStreaming: true);
throw;
}
@@ -206,7 +206,7 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable
}
finally
{
- this.TraceResponse(activity, trackedUpdates.ToAgentRunResponse(), error, stopwatch, messages.Count, isStreaming: true);
+ this.TraceResponse(activity, trackedUpdates.ToAgentRunResponse(), error, stopwatch, inputMessages.Count, isStreaming: true);
await responseEnumerator.DisposeAsync().ConfigureAwait(false);
}
}
diff --git a/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/MockAgent.cs b/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/MockAgent.cs
index bf0a428aa3..09ddd5c0cb 100644
--- a/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/MockAgent.cs
+++ b/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/MockAgent.cs
@@ -36,7 +36,7 @@ internal sealed class MockAgent(int index) : AIAgent
return new AgentThread() { ConversationId = Guid.NewGuid().ToString() };
}
- public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
this.InvokeCount++;
if (thread == null)
@@ -48,7 +48,7 @@ internal sealed class MockAgent(int index) : AIAgent
return Task.FromResult(new AgentRunResponse(messages: [.. this.Response]));
}
- public override IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
this.InvokeCount++;
diff --git a/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/OrchestrationResultTests.cs b/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/OrchestrationResultTests.cs
index 9d9f238cca..70c1601425 100644
--- a/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/OrchestrationResultTests.cs
+++ b/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/OrchestrationResultTests.cs
@@ -80,18 +80,18 @@ public class OrchestrationResultTests
private sealed class MockOrchestratingAgent() : OrchestratingAgent([new MockAgent()])
{
- protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
+ protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
throw new NotSupportedException();
- protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
+ protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) =>
throw new NotSupportedException();
}
private sealed class MockAgent : AIAgent
{
- public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) =>
+ public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) =>
throw new NotSupportedException();
- public override IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) =>
+ public override IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) =>
throw new NotSupportedException();
}
}
diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/RepresentationTests.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/RepresentationTests.cs
index 294b58a4de..ef0978db75 100644
--- a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/RepresentationTests.cs
+++ b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/RepresentationTests.cs
@@ -22,12 +22,12 @@ public class RepresentationTests
private sealed class TestAgent : AIAgent
{
- public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
- public override IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs
index 47e5128cd9..18a855c803 100644
--- a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs
+++ b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs
@@ -95,7 +95,7 @@ internal sealed class HelloAgent(string id = nameof(HelloAgent)) : AIAgent
public override string Id => id;
public override string? Name => id;
- public override async Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
IEnumerable update = [
await this.RunStreamingAsync(messages, thread, options, cancellationToken)
@@ -105,7 +105,7 @@ internal sealed class HelloAgent(string id = nameof(HelloAgent)) : AIAgent
return update.ToAgentRunResponse();
}
- public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
AgentRunResponseUpdate response = new(ChatRole.Assistant, "Hello World!")
{
@@ -126,7 +126,7 @@ internal sealed class EchoAgent(string id = nameof(EchoAgent)) : AIAgent
public override string Id => id;
public override string? Name => id;
- public override async Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
IEnumerable update = [
await this.RunStreamingAsync(messages, thread, options, cancellationToken)
@@ -136,15 +136,17 @@ internal sealed class EchoAgent(string id = nameof(EchoAgent)) : AIAgent
return update.ToAgentRunResponse();
}
- public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
- if (messages.Count == 0)
+ var messagesList = messages as IReadOnlyCollection ?? messages.ToList();
+
+ if (messagesList.Count == 0)
{
throw new ArgumentException("No messages provided to echo.", nameof(messages));
}
StringBuilder collectedText = new(Prefix);
- foreach (string messageText in messages.Select(message => message.Text)
+ foreach (string messageText in messagesList.Select(message => message.Text)
.Where(text => !string.IsNullOrEmpty(text)))
{
collectedText.AppendLine(messageText);
diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs
index 959ec2c39c..5e67966711 100644
--- a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs
+++ b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs
@@ -56,7 +56,7 @@ public class SpecializedExecutorSmokeTests
public List Messages { get; } = Validate(messages) ?? [];
- public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
return Task.FromResult(new AgentRunResponse(this.Messages)
{
@@ -65,7 +65,7 @@ public class SpecializedExecutorSmokeTests
});
}
- public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
string responseId = Guid.NewGuid().ToString("N");
foreach (ChatMessage message in this.Messages)
diff --git a/dotnet/tests/Microsoft.Extensions.AI.Agents.Abstractions.UnitTests/AIAgentTests.cs b/dotnet/tests/Microsoft.Extensions.AI.Agents.Abstractions.UnitTests/AIAgentTests.cs
index 411e0bfb7e..04cd8584c8 100644
--- a/dotnet/tests/Microsoft.Extensions.AI.Agents.Abstractions.UnitTests/AIAgentTests.cs
+++ b/dotnet/tests/Microsoft.Extensions.AI.Agents.Abstractions.UnitTests/AIAgentTests.cs
@@ -361,13 +361,13 @@ public class AIAgentTests
private sealed class MockAgent : AIAgent
{
- public static new Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IReadOnlyCollection messages, CancellationToken cancellationToken)
+ public static new Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IEnumerable messages, CancellationToken cancellationToken)
{
return AIAgent.NotifyThreadOfNewMessagesAsync(thread, messages, cancellationToken);
}
public override Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -376,7 +376,7 @@ public class AIAgentTests
}
public override IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
diff --git a/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentActorTests.cs b/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentActorTests.cs
index 1584960e9b..d6eeb63b7a 100644
--- a/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentActorTests.cs
+++ b/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentActorTests.cs
@@ -175,7 +175,7 @@ public class AgentActorTests
public bool RunStreamingAsyncCalled { get; private set; }
public AgentThread? ThreadUsedInRunStreamingAsync { get; private set; }
- public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
+ public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
this.ThreadUsedInRunStreamingAsync = thread;
return Task.FromResult(new AgentRunResponse
@@ -184,7 +184,7 @@ public class AgentActorTests
});
}
- public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
this.RunStreamingAsyncCalled = true;
this.ThreadUsedInRunStreamingAsync = thread;
diff --git a/dotnet/tests/Microsoft.Extensions.AI.Agents.UnitTests/AgentAIFunctionFactoryTests.cs b/dotnet/tests/Microsoft.Extensions.AI.Agents.UnitTests/AgentAIFunctionFactoryTests.cs
index d1c10c5f40..014c9b6759 100644
--- a/dotnet/tests/Microsoft.Extensions.AI.Agents.UnitTests/AgentAIFunctionFactoryTests.cs
+++ b/dotnet/tests/Microsoft.Extensions.AI.Agents.UnitTests/AgentAIFunctionFactoryTests.cs
@@ -306,7 +306,7 @@ public class AgentAIFunctionFactoryTests
public int RunAsyncCallCount { get; private set; }
public override Task RunAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
@@ -324,7 +324,7 @@ public class AgentAIFunctionFactoryTests
}
public override async IAsyncEnumerable RunStreamingAsync(
- IReadOnlyCollection messages,
+ IEnumerable messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
diff --git a/python/packages/main/agent_framework/openai/_responses_client.py b/python/packages/main/agent_framework/openai/_responses_client.py
index cc3cd725b0..eb3b9bcbd7 100644
--- a/python/packages/main/agent_framework/openai/_responses_client.py
+++ b/python/packages/main/agent_framework/openai/_responses_client.py
@@ -579,10 +579,10 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
TextContent(text=message_content.refusal, raw_representation=message_content)
)
case "reasoning": # ResponseOutputReasoning
- if item.content:
+ if hasattr(item, "content") and item.content:
for index, reasoning_content in enumerate(item.content):
additional_properties = None
- if item.summary and index < len(item.summary):
+ if hasattr(item, "summary") and item.summary and index < len(item.summary):
additional_properties = {"summary": item.summary[index]}
contents.append(
TextReasoningContent(
@@ -592,7 +592,7 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
)
)
case "code_interpreter_call": # ResponseOutputCodeInterpreterCall
- if item.outputs:
+ if hasattr(item, "outputs") and item.outputs:
for code_output in item.outputs:
if code_output.type == "logs":
contents.append(TextContent(text=code_output.logs, raw_representation=item))
@@ -605,16 +605,16 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
media_type="image",
)
)
- elif item.code:
+ elif hasattr(item, "code") and item.code:
# fallback if no output was returned is the code:
contents.append(TextContent(text=item.code, raw_representation=item))
case "function_call": # ResponseOutputFunctionCall
contents.append(
FunctionCallContent(
- call_id=item.call_id if item.call_id else "",
- name=item.name,
- arguments=item.arguments,
- additional_properties={"fc_id": item.id},
+ call_id=item.call_id if hasattr(item, "call_id") and item.call_id else "",
+ name=item.name if hasattr(item, "name") else "",
+ arguments=item.arguments if hasattr(item, "arguments") else "",
+ additional_properties={"fc_id": item.id} if hasattr(item, "id") else {},
raw_representation=item,
)
)
@@ -739,6 +739,18 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
case "response.output_text.delta":
contents.append(TextContent(text=event.delta, raw_representation=event))
metadata.update(self._get_metadata_from_response(event))
+ case "response.reasoning_text.delta":
+ contents.append(TextReasoningContent(text=event.delta, raw_representation=event))
+ metadata.update(self._get_metadata_from_response(event))
+ case "response.reasoning_text.done":
+ contents.append(TextReasoningContent(text=event.text, raw_representation=event))
+ metadata.update(self._get_metadata_from_response(event))
+ case "response.reasoning_summary_text.delta":
+ contents.append(TextReasoningContent(text=event.delta, raw_representation=event))
+ metadata.update(self._get_metadata_from_response(event))
+ case "response.reasoning_summary_text.done":
+ contents.append(TextReasoningContent(text=event.text, raw_representation=event))
+ metadata.update(self._get_metadata_from_response(event))
case "response.completed":
conversation_id = event.response.id if chat_options.store is True else None
model = event.response.model
@@ -779,7 +791,7 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
)
)
case "code_interpreter_call": # ResponseOutputCodeInterpreterCall
- if event_item.outputs:
+ if hasattr(event_item, "outputs") and event_item.outputs:
for code_output in event_item.outputs:
if code_output.type == "logs":
contents.append(TextContent(text=code_output.logs, raw_representation=event_item))
@@ -792,14 +804,18 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient):
media_type="image",
)
)
- elif event_item.code:
+ elif hasattr(event_item, "code") and event_item.code:
# fallback if no output was returned is the code:
contents.append(TextContent(text=event_item.code, raw_representation=event_item))
case "reasoning": # ResponseOutputReasoning
- if event_item.content:
+ if hasattr(event_item, "content") and event_item.content:
for index, reasoning_content in enumerate(event_item.content):
additional_properties = None
- if event_item.summary and index < len(event_item.summary):
+ if (
+ hasattr(event_item, "summary")
+ and event_item.summary
+ and index < len(event_item.summary)
+ ):
additional_properties = {"summary": event_item.summary[index]}
contents.append(
TextReasoningContent(
diff --git a/python/packages/main/agent_framework/workflow/__init__.py b/python/packages/main/agent_framework/workflow/__init__.py
index db8bccbcb2..cb8cd56fcf 100644
--- a/python/packages/main/agent_framework/workflow/__init__.py
+++ b/python/packages/main/agent_framework/workflow/__init__.py
@@ -56,6 +56,7 @@ _IMPORTS = [
"PlanReviewRequest",
"RequestInfoEvent",
"StandardMagenticManager",
+ "ConcurrentBuilder",
]
diff --git a/python/packages/main/agent_framework/workflow/__init__.pyi b/python/packages/main/agent_framework/workflow/__init__.pyi
index 63f301775f..ea88728525 100644
--- a/python/packages/main/agent_framework/workflow/__init__.pyi
+++ b/python/packages/main/agent_framework/workflow/__init__.pyi
@@ -8,6 +8,7 @@ from agent_framework_workflow import (
AgentRunUpdateEvent,
Case,
CheckpointStorage,
+ ConcurrentBuilder,
Default,
Executor,
ExecutorCompletedEvent,
@@ -56,6 +57,7 @@ __all__ = [
"AgentRunUpdateEvent",
"Case",
"CheckpointStorage",
+ "ConcurrentBuilder",
"Default",
"Executor",
"ExecutorCompletedEvent",
diff --git a/python/packages/main/pyproject.toml b/python/packages/main/pyproject.toml
index 64b34bd0ef..51c658b03e 100644
--- a/python/packages/main/pyproject.toml
+++ b/python/packages/main/pyproject.toml
@@ -33,6 +33,7 @@ dependencies = [
"azure-monitor-opentelemetry>=1.7.0",
"azure-monitor-opentelemetry-exporter>=1.0.0b41",
"opentelemetry-exporter-otlp-proto-grpc>=1.36.0",
+ "opentelemetry-semantic-conventions-ai>=0.4.13"
]
[project.optional-dependencies]
diff --git a/python/packages/main/tests/openai/test_openai_responses_client.py b/python/packages/main/tests/openai/test_openai_responses_client.py
index 945aec9a39..f0e2e01537 100644
--- a/python/packages/main/tests/openai/test_openai_responses_client.py
+++ b/python/packages/main/tests/openai/test_openai_responses_client.py
@@ -1494,6 +1494,7 @@ def test_service_response_exception_includes_original_error_details() -> None:
assert original_error_message in exception_message
+
def test_get_streaming_response_with_response_format() -> None:
"""Test get_streaming_response with response_format."""
client = OpenAIResponsesClient(ai_model_id="test-model", api_key="test-key")
diff --git a/python/packages/workflow/agent_framework_workflow/__init__.py b/python/packages/workflow/agent_framework_workflow/__init__.py
index 25fe4591de..5d3e1352fb 100644
--- a/python/packages/workflow/agent_framework_workflow/__init__.py
+++ b/python/packages/workflow/agent_framework_workflow/__init__.py
@@ -9,6 +9,7 @@ from ._checkpoint import (
InMemoryCheckpointStorage,
WorkflowCheckpoint,
)
+from ._concurrent import ConcurrentBuilder
from ._const import (
DEFAULT_MAX_ITERATIONS,
)
@@ -93,6 +94,7 @@ __all__ = [
"AgentRunUpdateEvent",
"Case",
"CheckpointStorage",
+ "ConcurrentBuilder",
"Default",
"EdgeDuplicationError",
"Executor",
diff --git a/python/packages/workflow/agent_framework_workflow/_concurrent.py b/python/packages/workflow/agent_framework_workflow/_concurrent.py
new file mode 100644
index 0000000000..14feb6bc8e
--- /dev/null
+++ b/python/packages/workflow/agent_framework_workflow/_concurrent.py
@@ -0,0 +1,304 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+import inspect
+import logging
+from collections.abc import Callable, Sequence
+from typing import Any
+
+from agent_framework import AgentProtocol, ChatMessage, Role
+
+from ._events import WorkflowCompletedEvent
+from ._executor import AgentExecutorRequest, AgentExecutorResponse, Executor, handler
+from ._workflow import Workflow, WorkflowBuilder
+from ._workflow_context import WorkflowContext
+
+logger = logging.getLogger(__name__)
+
+"""Concurrent builder for agent-only fan-out/fan-in workflows.
+
+This module provides a high-level, agent-focused API to quickly assemble a
+parallel workflow with:
+- a default dispatcher that broadcasts the input to all agent participants
+- a default aggregator that combines all agent conversations and completes the workflow
+
+Notes:
+- Participants should be AgentProtocol instances or Executors.
+- A custom aggregator can be provided as:
+ - an Executor instance (it should handle list[AgentExecutorResponse] and add a WorkflowCompletedEvent), or
+ - a callback function with signature:
+ def cb(results: list[AgentExecutorResponse]) -> Any | None
+ def cb(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None
+ If the callback returns a non-None value, it is sent as the data of a WorkflowCompletedEvent.
+ If it returns None, the callback may have already emitted a completion event via ctx.
+"""
+
+
+class _DispatchToAllParticipants(Executor):
+ """Broadcasts input to all downstream participants (via fan-out edges)."""
+
+ @handler
+ async def from_request(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
+ # No explicit target: edge routing delivers to all connected participants.
+ await ctx.send_message(request)
+
+ @handler
+ async def from_str(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
+ request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=prompt)], should_respond=True)
+ await ctx.send_message(request)
+
+ @handler
+ async def from_message(self, message: ChatMessage, ctx: WorkflowContext[AgentExecutorRequest]) -> None: # type: ignore[name-defined]
+ request = AgentExecutorRequest(messages=[message], should_respond=True)
+ await ctx.send_message(request)
+
+ @handler
+ async def from_messages(self, messages: list[ChatMessage], ctx: WorkflowContext[AgentExecutorRequest]) -> None: # type: ignore[name-defined]
+ request = AgentExecutorRequest(messages=list(messages), should_respond=True)
+ await ctx.send_message(request)
+
+
+class _AggregateAgentConversations(Executor):
+ """Aggregates agent responses and completes with combined ChatMessages.
+
+ Emits a list[ChatMessage] shaped as:
+ [ single_user_prompt?, agent1_final_assistant, agent2_final_assistant, ... ]
+
+ - Extracts a single user prompt (first user message seen across results).
+ - For each result, selects the final assistant message (prefers agent_run_response.messages).
+ - Avoids duplicating the same user message per agent.
+ """
+
+ @handler
+ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> None:
+ if not results:
+ logger.error("Concurrent aggregator received empty results list")
+ raise ValueError("Aggregation failed: no results provided")
+
+ def _is_role(msg: Any, role: Role) -> bool:
+ r = getattr(msg, "role", None)
+ if r is None:
+ return False
+ # Normalize both r and role to lowercase strings for comparison
+ r_str = str(r).lower() if isinstance(r, str) or hasattr(r, "__str__") else r
+ role_str = getattr(role, "value", None)
+ if role_str is None:
+ role_str = str(role)
+ role_str = role_str.lower()
+ return r_str == role_str
+
+ prompt_message: ChatMessage | None = None
+ assistant_replies: list[ChatMessage] = []
+
+ for r in results:
+ resp_messages = list(getattr(r.agent_run_response, "messages", []) or [])
+ conv = r.full_conversation if r.full_conversation is not None else resp_messages
+
+ logger.debug(
+ f"Aggregating executor {getattr(r, 'executor_id', '')}: "
+ f"{len(resp_messages)} response msgs, {len(conv)} conversation msgs"
+ )
+
+ # Capture a single user prompt (first encountered across any conversation)
+ if prompt_message is None:
+ found_user = next((m for m in conv if _is_role(m, Role.USER)), None)
+ if found_user is not None:
+ prompt_message = found_user
+
+ # Pick the final assistant message from the response; fallback to conversation search
+ final_assistant = next((m for m in reversed(resp_messages) if _is_role(m, Role.ASSISTANT)), None)
+ if final_assistant is None:
+ final_assistant = next((m for m in reversed(conv) if _is_role(m, Role.ASSISTANT)), None)
+
+ if final_assistant is not None:
+ assistant_replies.append(final_assistant)
+ else:
+ logger.warning(
+ f"No assistant reply found for executor {getattr(r, 'executor_id', '')}; skipping"
+ )
+
+ if not assistant_replies:
+ logger.error(f"Aggregation failed: no assistant replies found across {len(results)} results")
+ raise RuntimeError("Aggregation failed: no assistant replies found")
+
+ output: list[ChatMessage] = []
+ if prompt_message is not None:
+ output.append(prompt_message)
+ else:
+ logger.warning("No user prompt found in any conversation; emitting assistants only")
+ output.extend(assistant_replies)
+
+ await ctx.add_event(WorkflowCompletedEvent(data=output))
+
+
+class _CallbackAggregator(Executor):
+ """Wraps a Python callback as an aggregator.
+
+ Accepts either an async or sync callback with one of the signatures:
+ - (results: list[AgentExecutorResponse]) -> Any | None
+ - (results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None
+
+ Notes:
+ - Async callbacks are awaited directly.
+ - Sync callbacks are executed via asyncio.to_thread to avoid blocking the event loop.
+ - If the callback returns a non-None value, it is wrapped in a WorkflowCompletedEvent.
+ """
+
+ def __init__(self, callback: Callable[..., Any], id: str | None = None) -> None:
+ super().__init__(id)
+ self._callback = callback
+ self._param_count = len(inspect.signature(callback).parameters)
+
+ @handler
+ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> None:
+ # Call according to provided signature, always non-blocking for sync callbacks
+ if self._param_count >= 2:
+ if inspect.iscoroutinefunction(self._callback):
+ ret = await self._callback(results, ctx) # type: ignore[misc]
+ else:
+ ret = await asyncio.to_thread(self._callback, results, ctx)
+ else:
+ if inspect.iscoroutinefunction(self._callback):
+ ret = await self._callback(results) # type: ignore[misc]
+ else:
+ ret = await asyncio.to_thread(self._callback, results)
+
+ # If the callback returned a value, finalize the workflow with it
+ if ret is not None:
+ await ctx.add_event(WorkflowCompletedEvent(ret))
+
+
+class ConcurrentBuilder:
+ r"""High-level builder for concurrent agent workflows.
+
+ - `participants([...])` accepts a list of AgentProtocol (recommended) or Executor.
+ - `build()` wires: dispatcher -> fan-out -> participants -> fan-in -> aggregator.
+ - `with_custom_aggregator(...)` overrides the default aggregator with an Executor or callback.
+
+ Usage:
+ ```python
+ from agent_framework.workflow import ConcurrentBuilder
+
+ # Minimal: use default aggregator (returns list[ChatMessage])
+ workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()
+
+
+ # Custom aggregator via callback (sync or async). The callback receives
+ # list[AgentExecutorResponse] and its return value becomes
+ # WorkflowCompletedEvent.data
+ def summarize(results):
+ return " | ".join(r.agent_run_response.messages[-1].text for r in results)
+
+
+ workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_custom_aggregator(summarize).build()
+ ```
+ """
+
+ def __init__(self) -> None:
+ self._participants: list[AgentProtocol | Executor] = []
+ self._aggregator: Executor | None = None
+
+ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "ConcurrentBuilder":
+ r"""Define the parallel participants for this concurrent workflow.
+
+ Accepts AgentProtocol instances (e.g., created by a chat client) or Executor
+ instances. Each participant is wired as a parallel branch using fan-out edges
+ from an internal dispatcher.
+
+ Raises:
+ ValueError: if `participants` is empty or contains duplicates
+ TypeError: if any entry is not AgentProtocol or Executor
+
+ Example:
+ ```python
+ wf = ConcurrentBuilder().participants([researcher_agent, marketer_agent, legal_agent]).build()
+
+ # Mixing agent(s) and executor(s) is supported
+ wf2 = ConcurrentBuilder().participants([researcher_agent, my_custom_executor]).build()
+ ```
+ """
+ if not participants:
+ raise ValueError("participants cannot be empty")
+
+ # Defensive duplicate detection
+ seen_agent_ids: set[int] = set()
+ seen_executor_ids: set[str] = set()
+ for p in participants:
+ if isinstance(p, Executor):
+ if p.id in seen_executor_ids:
+ raise ValueError(f"Duplicate executor participant detected: id '{p.id}'")
+ seen_executor_ids.add(p.id)
+ elif isinstance(p, AgentProtocol):
+ pid = id(p)
+ if pid in seen_agent_ids:
+ raise ValueError("Duplicate agent participant detected (same agent instance provided twice)")
+ seen_agent_ids.add(pid)
+ else:
+ raise TypeError(f"participants must be AgentProtocol or Executor instances; got {type(p).__name__}")
+
+ self._participants = list(participants)
+ return self
+
+ def with_aggregator(self, aggregator: Executor | Callable[..., Any]) -> "ConcurrentBuilder":
+ r"""Override the default aggregator with an Executor or a callback.
+
+ - Executor: must handle `list[AgentExecutorResponse]` and add a
+ `WorkflowCompletedEvent` to the context.
+ - Callback: sync or async callable with one of the signatures:
+ `(results: list[AgentExecutorResponse]) -> Any | None` or
+ `(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None`.
+ If the callback returns a non-None value, it becomes the
+ `WorkflowCompletedEvent.data`.
+
+ Example:
+ ```python
+ # Callback-based aggregator (string result)
+ async def summarize(results):
+ return " | ".join(r.agent_run_response.messages[-1].text for r in results)
+
+
+ wf = ConcurrentBuilder().participants([a1, a2, a3]).with_custom_aggregator(summarize).build()
+ ```
+ """
+ if isinstance(aggregator, Executor):
+ self._aggregator = aggregator
+ elif callable(aggregator):
+ self._aggregator = _CallbackAggregator(aggregator)
+ else:
+ raise TypeError("aggregator must be an Executor or a callable")
+ return self
+
+ def build(self) -> Workflow:
+ r"""Build and validate the concurrent workflow.
+
+ Wiring pattern:
+ - Dispatcher (internal) fans out the input to all `participants`
+ - Fan-in aggregator collects `AgentExecutorResponse` objects
+ - Aggregator emits a `WorkflowCompletedEvent` with either:
+ - list[ChatMessage] (default aggregator: one user + one assistant per agent)
+ - custom payload from the provided callback/executor
+
+ Returns:
+ Workflow: a ready-to-run workflow instance
+
+ Raises:
+ ValueError: if no participants were defined
+
+ Example:
+ ```python
+ workflow = ConcurrentBuilder().participants([agent1, agent2]).build()
+ ```
+ """
+ if not self._participants:
+ raise ValueError("No participants provided. Call .participants([...]) first.")
+
+ dispatcher = _DispatchToAllParticipants(id="dispatcher")
+ aggregator = self._aggregator or _AggregateAgentConversations(id="aggregator")
+
+ builder = WorkflowBuilder()
+ return (
+ builder.set_start_executor(dispatcher)
+ .add_fan_out_edges(dispatcher, list(self._participants))
+ .add_fan_in_edges(list(self._participants), aggregator)
+ .build()
+ )
diff --git a/python/packages/workflow/tests/test_concurrent.py b/python/packages/workflow/tests/test_concurrent.py
new file mode 100644
index 0000000000..25becca7cd
--- /dev/null
+++ b/python/packages/workflow/tests/test_concurrent.py
@@ -0,0 +1,126 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+from typing import Any, cast
+
+import pytest
+from agent_framework import AgentRunResponse, ChatMessage, Role
+
+from agent_framework_workflow import (
+ AgentExecutorRequest,
+ AgentExecutorResponse,
+ ConcurrentBuilder,
+ Executor,
+ WorkflowCompletedEvent,
+ WorkflowContext,
+ handler,
+)
+
+
+class _FakeAgentExec(Executor):
+ """Test executor that mimics an agent by emitting an AgentExecutorResponse.
+
+ It takes the incoming AgentExecutorRequest, produces a single assistant message
+ with the configured reply text, and sends an AgentExecutorResponse that includes
+ full_conversation (the original user prompt followed by the assistant message).
+ """
+
+ def __init__(self, id: str, reply_text: str) -> None:
+ super().__init__(id)
+ self._reply_text = reply_text
+
+ @handler
+ async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
+ response = AgentRunResponse(messages=ChatMessage(Role.ASSISTANT, text=self._reply_text))
+ full_conversation = list(request.messages) + list(response.messages)
+ await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
+
+
+def test_concurrent_builder_rejects_empty_participants() -> None:
+ with pytest.raises(ValueError):
+ ConcurrentBuilder().participants([])
+
+
+def test_concurrent_builder_rejects_duplicate_executors() -> None:
+ a = _FakeAgentExec("dup", "A")
+ b = _FakeAgentExec("dup", "B") # same executor id
+ with pytest.raises(ValueError):
+ ConcurrentBuilder().participants([a, b])
+
+
+async def test_concurrent_default_aggregator_emits_single_user_and_assistants() -> None:
+ # Three synthetic agent executors
+ e1 = _FakeAgentExec("agentA", "Alpha")
+ e2 = _FakeAgentExec("agentB", "Beta")
+ e3 = _FakeAgentExec("agentC", "Gamma")
+
+ wf = ConcurrentBuilder().participants([e1, e2, e3]).build()
+
+ completed: WorkflowCompletedEvent | None = None
+ async for ev in wf.run_stream("prompt: hello world"):
+ if isinstance(ev, WorkflowCompletedEvent):
+ completed = ev
+ break
+
+ assert completed is not None
+ assert isinstance(completed.data, list)
+ messages: list[ChatMessage] = cast(list[ChatMessage], completed.data) # type: ignore
+
+ # Expect one user message + one assistant message per participant
+ assert len(messages) == 1 + 3
+ assert messages[0].role == Role.USER
+ assert "hello world" in messages[0].text
+
+ assistant_texts = {m.text for m in messages[1:]}
+ assert assistant_texts == {"Alpha", "Beta", "Gamma"}
+ assert all(m.role == Role.ASSISTANT for m in messages[1:])
+
+
+async def test_concurrent_custom_aggregator_callback_is_used() -> None:
+ # Two synthetic agent executors for brevity
+ e1 = _FakeAgentExec("agentA", "One")
+ e2 = _FakeAgentExec("agentB", "Two")
+
+ async def summarize(results: list[AgentExecutorResponse]) -> str:
+ texts: list[str] = []
+ for r in results:
+ msgs: list[ChatMessage] = r.agent_run_response.messages
+ texts.append(msgs[-1].text if msgs else "")
+ return " | ".join(sorted(texts))
+
+ wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(summarize).build()
+
+ completed: WorkflowCompletedEvent | None = None
+ async for ev in wf.run_stream("prompt: custom"):
+ if isinstance(ev, WorkflowCompletedEvent):
+ completed = ev
+ break
+
+ assert completed is not None
+ # Custom aggregator returns a string payload
+ assert isinstance(completed.data, str)
+ assert completed.data == "One | Two"
+
+
+async def test_concurrent_custom_aggregator_sync_callback_is_used() -> None:
+ e1 = _FakeAgentExec("agentA", "One")
+ e2 = _FakeAgentExec("agentB", "Two")
+
+ # Sync callback with ctx parameter (should run via asyncio.to_thread)
+ def summarize_sync(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> str: # type: ignore[unused-argument]
+ texts: list[str] = []
+ for r in results:
+ msgs: list[ChatMessage] = r.agent_run_response.messages
+ texts.append(msgs[-1].text if msgs else "")
+ return " | ".join(sorted(texts))
+
+ wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(summarize_sync).build()
+
+ completed: WorkflowCompletedEvent | None = None
+ async for ev in wf.run_stream("prompt: custom sync"):
+ if isinstance(ev, WorkflowCompletedEvent):
+ completed = ev
+ break
+
+ assert completed is not None
+ assert isinstance(completed.data, str)
+ assert completed.data == "One | Two"
diff --git a/python/pyproject.toml b/python/pyproject.toml
index a8b1d7f979..a891bbe713 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -31,7 +31,7 @@ dev = [
"markdownify",
# Documentation
"myst-nb==1.1.2",
- "pydata-sphinx-theme==0.16.0",
+ "pydata-sphinx-theme==0.16.1",
"sphinx-copybutton",
"sphinx-design",
"sphinx",
diff --git a/python/samples/getting_started/agents/openai_responses_client/openai_responses_client_reasoning.py b/python/samples/getting_started/agents/openai_responses_client/openai_responses_client_reasoning.py
index fcc07c6efa..f9dfb34f16 100644
--- a/python/samples/getting_started/agents/openai_responses_client/openai_responses_client_reasoning.py
+++ b/python/samples/getting_started/agents/openai_responses_client/openai_responses_client_reasoning.py
@@ -10,12 +10,12 @@ async def reasoning_example() -> None:
"""Example of reasoning response (get results as they are generated)."""
print("=== Reasoning Example ===")
- agent = OpenAIResponsesClient(ai_model_id="o4-mini").create_agent(
+ agent = OpenAIResponsesClient(ai_model_id="gpt-5").create_agent(
name="MathHelper",
instructions="You are a personal math tutor. When asked a math question, "
"write and run code using the python tool to answer the question.",
tools=HostedCodeInterpreterTool(),
- reasoning={"effort": "medium"},
+ reasoning={"effort": "high", "summary": "detailed"},
)
query = "I need to solve the equation 3x + 11 = 14. Can you help me?"
@@ -27,9 +27,9 @@ async def reasoning_example() -> None:
for content in chunk.contents:
if isinstance(content, TextReasoningContent):
print(f"\033[97m{content.text}\033[0m", end="", flush=True)
- if isinstance(content, TextContent):
+ elif isinstance(content, TextContent):
print(content.text, end="", flush=True)
- if isinstance(content, UsageContent):
+ elif isinstance(content, UsageContent):
usage = content
print("\n")
if usage:
diff --git a/python/samples/getting_started/workflow/README.md b/python/samples/getting_started/workflow/README.md
index 744ad3c9c0..9fa42bc209 100644
--- a/python/samples/getting_started/workflow/README.md
+++ b/python/samples/getting_started/workflow/README.md
@@ -78,6 +78,9 @@ Once comfortable with these, explore the rest of the samples below.
### orchestration
| Sample | File | Concepts |
|---|---|---|
+| Concurrent Orchestration (Default Aggregator) | [orchestration/concurrent_agents.py](./orchestration/concurrent_agents.py) | Fan-out to multiple agents; fan-in with default aggregator returning combined ChatMessages |
+| Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM |
+| Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder |
| Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming |
| Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution |
diff --git a/python/samples/getting_started/workflow/orchestration/concurrent_agents.py b/python/samples/getting_started/workflow/orchestration/concurrent_agents.py
new file mode 100644
index 0000000000..b8e165c5b4
--- /dev/null
+++ b/python/samples/getting_started/workflow/orchestration/concurrent_agents.py
@@ -0,0 +1,131 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+from typing import Any
+
+from agent_framework import ChatMessage
+from agent_framework.azure import AzureChatClient
+from agent_framework.workflow import ConcurrentBuilder, WorkflowCompletedEvent
+from azure.identity import AzureCliCredential
+
+"""
+Sample: Concurrent fan-out/fan-in (agent-only API) with default aggregator
+
+Build a high-level concurrent workflow using ConcurrentBuilder and three domain agents.
+The default dispatcher fans out the same user prompt to all agents in parallel.
+The default aggregator fans in their results and emits a WorkflowCompletedEvent whose
+data is a list[ChatMessage] representing the concatenated conversations from all agents.
+
+Demonstrates:
+- Minimal wiring with ConcurrentBuilder().participants([...]).build()
+- Fan-out to multiple agents, fan-in aggregation of final ChatMessages
+- Streaming of AgentRunEvent for simple progress visibility
+
+Prerequisites:
+- Azure OpenAI access configured for AzureChatClient (use az login + env vars)
+- Familiarity with Workflow events (AgentRunEvent, WorkflowCompletedEvent)
+"""
+
+
+async def main() -> None:
+ # 1) Create three domain agents using AzureChatClient
+ chat_client = AzureChatClient(credential=AzureCliCredential())
+
+ researcher = chat_client.create_agent(
+ instructions=(
+ "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
+ " opportunities, and risks."
+ ),
+ name="researcher",
+ )
+
+ marketer = chat_client.create_agent(
+ instructions=(
+ "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
+ " aligned to the prompt."
+ ),
+ name="marketer",
+ )
+
+ legal = chat_client.create_agent(
+ instructions=(
+ "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
+ " based on the prompt."
+ ),
+ name="legal",
+ )
+
+ # 2) Build a concurrent workflow
+ # Participants are either Agents (type of AgentProtocol) or Executors
+ workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
+
+ # 3) Run with a single prompt, stream progress, and pretty-print the final combined messages
+ completion: WorkflowCompletedEvent | None = None
+ async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
+ if isinstance(event, WorkflowCompletedEvent):
+ completion = event
+
+ if completion:
+ print("===== Final Aggregated Conversation (messages) =====")
+ messages: list[ChatMessage] | Any = completion.data
+ for i, msg in enumerate(messages, start=1):
+ name = msg.author_name if msg.author_name else "user"
+ print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")
+
+ """
+ Sample Output:
+
+ ===== Final Aggregated Conversation (messages) =====
+ ------------------------------------------------------------
+
+ 01 [user]:
+ We are launching a new budget-friendly electric bike for urban commuters.
+ ------------------------------------------------------------
+
+ 02 [researcher]:
+ **Insights:**
+
+ - **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
+ likely to include students, young professionals, and price-sensitive urban residents.
+ - **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
+ higher fuel costs, and sustainability concerns driving adoption.
+ - **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon,
+ Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia.
+ - **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection,
+ lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles),
+ and low-maintenance components.
+
+ **Opportunities:**
+
+ - **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of
+ operation, and cost savings vs. public transit/car ownership.
+ ...
+ ------------------------------------------------------------
+
+ 03 [marketer]:
+ **Value Proposition:**
+ "Empowering your city commute: Our new electric bike combines affordability, reliability, and
+ sustainable design—helping you conquer urban journeys without breaking the bank."
+
+ **Target Messaging:**
+
+ *For Young Professionals:*
+ ...
+ ------------------------------------------------------------
+
+ 04 [legal]:
+ **Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
+
+ **1. Regulatory Compliance**
+ - Verify that the electric bike meets all applicable federal, state, and local regulations
+ regarding e-bike classification, speed limits, power output, and safety features.
+ - Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained.
+
+ **2. Product Safety**
+ - Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions.
+ ...
+ """ # noqa: E501
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/python/samples/getting_started/workflow/orchestration/concurrent_custom_agent_executors.py b/python/samples/getting_started/workflow/orchestration/concurrent_custom_agent_executors.py
new file mode 100644
index 0000000000..067489dee2
--- /dev/null
+++ b/python/samples/getting_started/workflow/orchestration/concurrent_custom_agent_executors.py
@@ -0,0 +1,175 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+from typing import Any
+
+from agent_framework import ChatAgent, ChatMessage
+from agent_framework.azure import AzureChatClient
+from agent_framework.workflow import (
+ AgentExecutorRequest,
+ AgentExecutorResponse,
+ ConcurrentBuilder,
+ Executor,
+ WorkflowCompletedEvent,
+ WorkflowContext,
+ handler,
+)
+from azure.identity import AzureCliCredential
+
+"""
+Sample: Concurrent Orchestration with Custom Agent Executors
+
+This sample shows a concurrent fan-out/fan-in pattern using child Executor classes
+that each own their ChatAgent. The executors accept AgentExecutorRequest inputs
+and emit AgentExecutorResponse outputs, which allows reuse of the high-level
+ConcurrentBuilder API and the default aggregator.
+
+Demonstrates:
+- Executors that create their ChatAgent in __init__ (via AzureChatClient)
+- A @handler that converts AgentExecutorRequest -> AgentExecutorResponse
+- ConcurrentBuilder().participants([...]) to build fan-out/fan-in
+- Default aggregator returning list[ChatMessage] (one user + one assistant per agent)
+
+Prerequisites:
+- Azure OpenAI configured for AzureChatClient (az login + required env vars)
+"""
+
+
+class ResearcherExec(Executor):
+ agent: ChatAgent
+
+ def __init__(self, chat_client: AzureChatClient, id: str = "researcher"):
+ agent = chat_client.create_agent(
+ instructions=(
+ "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
+ " opportunities, and risks."
+ ),
+ name=id,
+ )
+ super().__init__(agent=agent, id=id)
+
+ @handler
+ async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
+ response = await self.agent.run(request.messages)
+ full_conversation = list(request.messages) + list(response.messages)
+ await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
+
+
+class MarketerExec(Executor):
+ agent: ChatAgent
+
+ def __init__(self, chat_client: AzureChatClient, id: str = "marketer"):
+ agent = chat_client.create_agent(
+ instructions=(
+ "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
+ " aligned to the prompt."
+ ),
+ name=id,
+ )
+ super().__init__(agent=agent, id=id)
+
+ @handler
+ async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
+ response = await self.agent.run(request.messages)
+ full_conversation = list(request.messages) + list(response.messages)
+ await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
+
+
+class LegalExec(Executor):
+ agent: ChatAgent
+
+ def __init__(self, chat_client: AzureChatClient, id: str = "legal"):
+ agent = chat_client.create_agent(
+ instructions=(
+ "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
+ " based on the prompt."
+ ),
+ name=id,
+ )
+ super().__init__(agent=agent, id=id)
+
+ @handler
+ async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
+ response = await self.agent.run(request.messages)
+ full_conversation = list(request.messages) + list(response.messages)
+ await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
+
+
+async def main() -> None:
+ chat_client = AzureChatClient(credential=AzureCliCredential())
+
+ researcher = ResearcherExec(chat_client)
+ marketer = MarketerExec(chat_client)
+ legal = LegalExec(chat_client)
+
+ workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
+
+ completion: WorkflowCompletedEvent | None = None
+ async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
+ if isinstance(event, WorkflowCompletedEvent):
+ completion = event
+
+ if completion:
+ print("===== Final Aggregated Conversation (messages) =====")
+ messages: list[ChatMessage] | Any = completion.data
+ for i, msg in enumerate(messages, start=1):
+ name = msg.author_name if msg.author_name else "user"
+ print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")
+
+ """
+ Sample Output:
+
+ ===== Final Aggregated Conversation (messages) =====
+ ------------------------------------------------------------
+
+ 01 [user]:
+ We are launching a new budget-friendly electric bike for urban commuters.
+ ------------------------------------------------------------
+
+ 02 [researcher]:
+ **Insights:**
+
+ - **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
+ likely to include students, young professionals, and price-sensitive urban residents.
+ - **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
+ higher fuel costs, and sustainability concerns driving adoption.
+ - **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon,
+ Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia.
+ - **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection,
+ lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles),
+ and low-maintenance components.
+
+ **Opportunities:**
+
+ - **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of
+ operation, and cost savings vs. public transit/car ownership.
+ ...
+ ------------------------------------------------------------
+
+ 03 [marketer]:
+ **Value Proposition:**
+ "Empowering your city commute: Our new electric bike combines affordability, reliability, and
+ sustainable design—helping you conquer urban journeys without breaking the bank."
+
+ **Target Messaging:**
+
+ *For Young Professionals:*
+ ...
+ ------------------------------------------------------------
+
+ 04 [legal]:
+ **Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
+
+ **1. Regulatory Compliance**
+ - Verify that the electric bike meets all applicable federal, state, and local regulations
+ regarding e-bike classification, speed limits, power output, and safety features.
+ - Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained.
+
+ **2. Product Safety**
+ - Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions.
+ ...
+ """ # noqa: E501
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/python/samples/getting_started/workflow/orchestration/concurrent_custom_aggregator.py b/python/samples/getting_started/workflow/orchestration/concurrent_custom_aggregator.py
new file mode 100644
index 0000000000..73f95388c6
--- /dev/null
+++ b/python/samples/getting_started/workflow/orchestration/concurrent_custom_aggregator.py
@@ -0,0 +1,125 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+from typing import Any
+
+from agent_framework import ChatMessage, Role
+from agent_framework.azure import AzureChatClient
+from agent_framework.workflow import ConcurrentBuilder, WorkflowCompletedEvent
+from azure.identity import AzureCliCredential
+
+"""
+Sample: Concurrent Orchestration with Custom Aggregator
+
+Build a concurrent workflow with ConcurrentBuilder that fans out one prompt to
+multiple domain agents and fans in their responses. Override the default
+aggregator with a custom async callback that uses AzureChatClient.get_response()
+to synthesize a concise, consolidated summary from the experts' outputs.
+
+Demonstrates:
+- ConcurrentBuilder().participants([...]).with_custom_aggregator(callback)
+- Fan-out to agents and fan-in at an aggregator
+- Aggregation implemented via an LLM call (chat_client.get_response)
+- WorkflowCompletedEvent carrying the synthesized summary string
+
+Prerequisites:
+- Azure OpenAI configured for AzureChatClient (az login + required env vars)
+"""
+
+
+async def main() -> None:
+ chat_client = AzureChatClient(credential=AzureCliCredential())
+
+ researcher = chat_client.create_agent(
+ instructions=(
+ "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
+ " opportunities, and risks."
+ ),
+ name="researcher",
+ )
+ marketer = chat_client.create_agent(
+ instructions=(
+ "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
+ " aligned to the prompt."
+ ),
+ name="marketer",
+ )
+ legal = chat_client.create_agent(
+ instructions=(
+ "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
+ " based on the prompt."
+ ),
+ name="legal",
+ )
+
+ # Define a custom aggregator callback that uses the chat client to summarize
+ async def summarize_results(results: list[Any]) -> str:
+ # Extract one final assistant message per agent
+ expert_sections: list[str] = []
+ for r in results:
+ try:
+ messages = getattr(r.agent_run_response, "messages", [])
+ final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
+ expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}:\n{final_text}")
+ except Exception as e:
+ expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}: (error: {type(e).__name__}: {e})")
+
+ # Ask the model to synthesize a concise summary of the experts' outputs
+ system_msg = ChatMessage(
+ Role.SYSTEM,
+ text=(
+ "You are a helpful assistant that consolidates multiple domain expert outputs "
+ "into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
+ ),
+ )
+ user_msg = ChatMessage(Role.USER, text="\n\n".join(expert_sections))
+
+ response = await chat_client.get_response([system_msg, user_msg])
+ # Return the model's final assistant text as the completion result
+ return response.messages[-1].text if response.messages else ""
+
+ # Build with a custom aggregator callback function
+ # - participants([...]) accepts AgentProtocol (agents) or Executor instances.
+ # Each participant becomes a parallel branch (fan-out) from an internal dispatcher.
+ # - with_aggregator(...) overrides the default aggregator:
+ # • Default aggregator -> returns list[ChatMessage] (one user + one assistant per agent)
+ # • Custom callback -> return value becomes WorkflowCompletedEvent.data (string here)
+ # The callback can be sync or async; it receives list[AgentExecutorResponse].
+ workflow = (
+ ConcurrentBuilder().participants([researcher, marketer, legal]).with_aggregator(summarize_results).build()
+ )
+
+ completion: WorkflowCompletedEvent | None = None
+ async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
+ if isinstance(event, WorkflowCompletedEvent):
+ completion = event
+
+ if completion:
+ print("===== Final Consolidated Output =====")
+ print(completion.data)
+
+ """
+ Sample Output:
+
+ ===== Final Consolidated Output =====
+ Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
+ with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
+ easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
+ include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
+ developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.
+
+ Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
+ for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
+ are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
+ and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
+ marketing are required. For connected features, data privacy policies and user consents are mandatory.
+
+ Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
+ emphasizing affordability, convenience, and sustainability. Slogan suggestion: “Charge Ahead—City Commutes Made
+ Affordable.” Legal review in each target market, compliance vetting, and robust customer support policies are
+ critical before launch.
+ """
+
+
+if __name__ == "__main__":
+ asyncio.run(main())