.NET: Fix Magentic to share agent replies across team (#6222)

* Fix Magentic to share agent replies across team

The per-round instruction was sent untargeted (fan-out delivered it to
every participant) and replies were never relayed, so a later speaker saw
the prior speaker's instruction but not its response - inverted from
GroupChatHost and the Python reference.

- Target the instruction at the selected speaker only.
- Broadcast each reply to the other participants (buffered, no TurnToken),
  excluding the responder via _currentSpeakerExecutorId, mirroring
  GroupChatHost.
- Persist _currentSpeakerExecutorId across checkpoints.
- Add a regression test.

* Address review feedback: null-guard, explicit checkpoint key, drop vacuous assertion

* Address review feedback: centralize checkpoint keys, clear current speaker

- Move CurrentSpeakerStateKey into MagenticConstants as
  nameof(CurrentSpeakerStateKey)
- Clear _currentSpeakerExecutorId in ResetAndReplanAsync and
  PrepareFinalAnswerAsync so a checkpoint taken in those windows does not
  persist a stale speaker
- Add UTF-8 BOM to RecordingEchoAgent.cs to satisfy the format check.
This commit is contained in:
Hasan Ghomi
2026-06-09 21:00:42 +04:00
committed by GitHub
Unverified
parent 632f67b92e
commit 2a345e5d3b
4 changed files with 170 additions and 7 deletions
@@ -5,4 +5,5 @@ namespace Microsoft.Agents.AI.Workflows.Specialized.Magentic;
internal static class MagenticConstants
{
public const string MagenticTaskContextKey = nameof(MagenticTaskContextKey);
public const string CurrentSpeakerStateKey = nameof(CurrentSpeakerStateKey);
}
@@ -90,6 +90,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List<AIAgent> team, Ta
private MagenticTaskContext? _taskContext;
private PortBinding? _planReviewPort;
private string? _currentSpeakerExecutorId;
protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
{
@@ -196,15 +197,46 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List<AIAgent> team, Ta
else
{
// Subsequent turns: agent returned control, go directly to coordination (progress ledger only, no replan).
// Capture the participant's reply into the manager-visible chat history so the progress ledger can see it.
if (messages is { Count: > 0 })
{
// Capture the participant's reply into the manager-visible chat history so the progress ledger can see it.
this._taskContext.ChatHistory.AddRange(messages);
// Share the reply with the other participants except the replier
await this.BroadcastReplyToOtherParticipantsAsync(messages, context, cancellationToken).ConfigureAwait(false);
}
await this.RunCoordinationRoundAsync(this._taskContext, context, cancellationToken).ConfigureAwait(false);
}
}
/// <summary>
/// Forwards a participant's reply to every other participant so they share the running conversation.
/// The messages are buffered (no <see cref="TurnToken"/> is sent) - they only become context for the participant's next turn.
/// </summary>
private ValueTask BroadcastReplyToOtherParticipantsAsync(
List<ChatMessage> messages, IWorkflowContext context, CancellationToken cancellationToken)
{
// Without a known current speaker we cannot exclude the reply's author, so skip the broadcast
// rather than risk echoing the reply back to its own author. This covers the window after a
// checkpoint restore but before any delegation has set the current speaker.
if (string.IsNullOrEmpty(this._currentSpeakerExecutorId))
{
return default;
}
List<Task>? sendTasks = null;
foreach (AIAgent agent in team)
{
string executorId = AIAgentHostExecutor.IdFor(agent);
if (string.Equals(executorId, this._currentSpeakerExecutorId, StringComparison.Ordinal))
{
continue;
}
(sendTasks ??= []).Add(context.SendMessageAsync(messages, executorId, cancellationToken).AsTask());
}
return sendTasks is null ? default : new ValueTask(Task.WhenAll(sendTasks));
}
private ChatMessage? _fullTaskLedgerMessage;
private ValueTask DelegateToTeamAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken)
{
@@ -287,15 +319,18 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List<AIAgent> team, Ta
return;
}
string nextExecutorId = AIAgentHostExecutor.IdFor(nextAgent);
if (!string.IsNullOrWhiteSpace(taskContext.ProgressLedger.InstructionOrQuestion))
{
ChatMessage instruction = new(ChatRole.Assistant, taskContext.ProgressLedger.InstructionOrQuestion);
taskContext.ChatHistory.Add(instruction);
await context.SendMessageAsync(instruction, cancellationToken).ConfigureAwait(false);
// Target the instruction at the chosen speaker only.
await context.SendMessageAsync(instruction, nextExecutorId, cancellationToken).ConfigureAwait(false);
}
string nextExecutorId = AIAgentHostExecutor.IdFor(nextAgent);
this._currentSpeakerExecutorId = nextExecutorId;
await context.SendMessageAsync(new TurnToken(taskContext.EmitUpdateEvents), nextExecutorId, cancellationToken).ConfigureAwait(false);
}
@@ -303,6 +338,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List<AIAgent> team, Ta
{
bool wasStalled = taskContext.IsStalled;
taskContext.Reset();
this._currentSpeakerExecutorId = null;
await context.SendMessageAsync(new ResetChatSignal(), cancellationToken: cancellationToken).ConfigureAwait(false);
await this.UpdatePlanAndDelegateAsync(taskContext, context, cancellationToken, replanAfterStall: wasStalled).ConfigureAwait(false);
@@ -313,9 +349,9 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List<AIAgent> team, Ta
List<ChatMessage> messages = [await this._manager.PrepareFinalAnswerAsync(taskContext, context, cancellationToken).ConfigureAwait(false)];
await context.YieldOutputAsync(messages, cancellationToken).ConfigureAwait(false);
taskContext.IsTerminated = true;
this._currentSpeakerExecutorId = null;
}
private const string CurrentTurnEmitUpdateEventsKey = nameof(CurrentTurnEmitUpdateEventsKey);
protected internal override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
Task contextStateTask = this._taskContext == null
@@ -325,14 +361,21 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List<AIAgent> team, Ta
cancellationToken: cancellationToken)
.AsTask();
Task currentSpeakerTask = context.QueueStateUpdateAsync(MagenticConstants.CurrentSpeakerStateKey,
this._currentSpeakerExecutorId,
cancellationToken: cancellationToken)
.AsTask();
await Task.WhenAll(base.OnCheckpointingAsync(context, cancellationToken).AsTask(),
contextStateTask).ConfigureAwait(false);
contextStateTask,
currentSpeakerTask).ConfigureAwait(false);
}
protected internal override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
await Task.WhenAll(base.OnCheckpointRestoredAsync(context, cancellationToken).AsTask(), LoadContextStateAsync())
.ConfigureAwait(false);
await Task.WhenAll(base.OnCheckpointRestoredAsync(context, cancellationToken).AsTask(),
LoadContextStateAsync(),
LoadCurrentSpeakerAsync()).ConfigureAwait(false);
async Task LoadContextStateAsync()
{
@@ -344,5 +387,11 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List<AIAgent> team, Ta
this._taskContext = new MagenticTaskContext(state, team, limits, []);
}
}
async Task LoadCurrentSpeakerAsync()
{
this._currentSpeakerExecutorId = await context.ReadStateAsync<string?>(MagenticConstants.CurrentSpeakerStateKey, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
}
@@ -419,6 +419,82 @@ public class MagenticOrchestrationTests
"final-answer synthesis must see what participants actually said");
}
[Fact]
public async Task Participant_Receives_Prior_Participant_Response_Not_InstructionAsync()
{
// Regression: each participant must see prior participants' *responses* (the running conversation),
// not their *instructions*. Previously the orchestrator broadcast the per-round instruction to every
// participant (untargeted fan-out) and never broadcast replies, so a later speaker received the earlier
// speaker's instruction and never its answer.
const string HealthInstruction = "HEALTH_CHECKER_INSTRUCTION_check_framework";
const string DatabaseInstruction = "DATABASE_CHECKER_INSTRUCTION_check_database";
const string HealthEchoPrefix = "HC_RESPONSE::";
const string DatabaseEchoPrefix = "DB_RESPONSE::";
List<ChatMessage> facts = CreatePlanResponse("Facts");
List<ChatMessage> plan = CreatePlanResponse("Plan");
List<ChatMessage> round1Ledger = CreateProgressLedgerResponse(
isRequestSatisfied: false,
isInLoop: false,
isProgressBeingMade: true,
nextSpeaker: "HealthChecker",
instructionOrQuestion: HealthInstruction);
List<ChatMessage> round2Ledger = CreateProgressLedgerResponse(
isRequestSatisfied: false,
isInLoop: false,
isProgressBeingMade: true,
nextSpeaker: "DatabaseChecker",
instructionOrQuestion: DatabaseInstruction);
List<ChatMessage> round3Ledger = CreateProgressLedgerResponse(
isRequestSatisfied: true,
isInLoop: false,
isProgressBeingMade: true,
nextSpeaker: "DatabaseChecker",
instructionOrQuestion: "Done");
List<ChatMessage> finalAnswer = CreateFinalAnswerResponse("All systems checked");
TestReplayAgent manager = new(
[facts, plan, round1Ledger, round2Ledger, round3Ledger, finalAnswer],
name: "Manager");
RecordingEchoAgent healthChecker = new(name: "HealthChecker", prefix: HealthEchoPrefix);
RecordingEchoAgent databaseChecker = new(name: "DatabaseChecker", prefix: DatabaseEchoPrefix);
Workflow workflow = new MagenticWorkflowBuilder(manager)
.AddParticipants(healthChecker, databaseChecker)
.RequirePlanSignoff(false)
.Build();
WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
workflow,
[new ChatMessage(ChatRole.User, "Check system health")]);
runResult.Result.Should().NotBeNull();
runResult.Result![0].Text.Should().Contain("All systems checked");
// Each participant takes exactly one turn.
healthChecker.RecordedInputs.Should().ContainSingle();
databaseChecker.RecordedInputs.Should().ContainSingle();
// The first speaker receives its own instruction.
List<ChatMessage> healthInput = healthChecker.RecordedInputs[0];
healthInput.Should().Contain(m => m.Text.Contains(HealthInstruction), "the first speaker receives its own instruction");
// The second speaker must see the first speaker's RESPONSE (authored by HealthChecker, carrying the echo
// prefix that only the response — not the raw instruction — has), plus its own instruction.
List<ChatMessage> databaseInput = databaseChecker.RecordedInputs[0];
databaseInput.Should().Contain(
m => m.AuthorName == "HealthChecker" && m.Text.Contains(HealthEchoPrefix),
"the next speaker must receive the prior participant's response (the running conversation)");
databaseInput.Should().Contain(m => m.Text.Contains(DatabaseInstruction),
"the next speaker must receive its own instruction");
// The leaked-instruction bug: the second speaker must not receive HealthChecker's instruction as a
// bare message (it should only appear, if at all, embedded in HealthChecker's prefixed response).
databaseInput.Should().NotContain(
m => m.AuthorName != "HealthChecker" && m.Text.Trim() == HealthInstruction,
"the prior speaker's instruction must not leak into the next speaker's context as a standalone message");
}
[Fact]
public async Task PlanReview_Revised_Triggers_ReplanAsync()
{
@@ -0,0 +1,37 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using Microsoft.Extensions.AI;
namespace Microsoft.Agents.AI.Workflows.UnitTests;
/// <summary>
/// A <see cref="TestEchoAgent"/> that records the input messages it receives on each call.
/// Used by tests that need to assert what context a participant was actually handed - for example,
/// that a later speaker sees prior participants' <em>responses</em> (the running conversation) rather
/// than their <em>instructions</em>.
/// </summary>
internal sealed class RecordingEchoAgent(string? id = null, string? name = null, string? prefix = null)
: TestEchoAgent(id, name, prefix)
{
public List<List<ChatMessage>> RecordedInputs { get; } = [];
protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
IEnumerable<ChatMessage> messages,
AgentSession? session = null,
AgentRunOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Materialize once so the deferred input is recorded and replayed identically.
List<ChatMessage> recorded = messages.ToList();
this.RecordedInputs.Add(recorded);
await foreach (AgentResponseUpdate update in base.RunCoreStreamingAsync(recorded, session, options, cancellationToken))
{
yield return update;
}
}
}