diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs
index 7c1e801ab1..4470c4ee9a 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
+using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
@@ -140,7 +141,15 @@ public class MagenticWorkflowBuilder(AIAgent managerAgent)
}
///
- public Workflow Build() => this.ReduceToWorkflowBuilder().Build();
+ public Workflow Build()
+ {
+ if (this._team.Count == 0)
+ {
+ throw new InvalidOperationException("At least one participant must be added via AddParticipants() before building the workflow.");
+ }
+
+ return this.ReduceToWorkflowBuilder().Build();
+ }
private TaskLimits Limits => new(
MaxRoundCount: this._maxRounds,
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticOrchestrator.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticOrchestrator.cs
index a08e0b542e..30a93c6850 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticOrchestrator.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticOrchestrator.cs
@@ -101,6 +101,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta
return base.ConfigureProtocol(protocolBuilder)
.SendsMessage()
.SendsMessage()
+ .YieldsOutput>()
.ConfigureRoutes(ConfigureRoutes);
void ConfigureRoutes(RouteBuilder routeBuilder) => routeBuilder.AddPortHandler(
@@ -109,7 +110,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta
out this._planReviewPort);
}
- private ValueTask SubmitPlanReviewRequestAsync(MagenticTaskContext taskContext, IWorkflowContext workflowContext)
+ private ValueTask SubmitPlanReviewRequestAsync(MagenticTaskContext taskContext, IWorkflowContext workflowContext, bool replanAfterStall = false)
{
MagenticProgressLedger? progressLedger = taskContext.ProgressLedger;
if (progressLedger?.IsStarted is not true)
@@ -117,7 +118,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta
progressLedger = null;
}
- MagenticPlanReviewRequest request = new(taskContext.TaskLedger!.CurrentPlan, progressLedger, taskContext.IsStalled);
+ MagenticPlanReviewRequest request = new(taskContext.TaskLedger!.CurrentPlan, progressLedger, replanAfterStall);
return this._planReviewPort!.PostRequestAsync(request);
}
@@ -146,7 +147,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta
if (this._taskContext.IsTerminated)
{
- throw new InvalidOperationException("Magentic Orchestration has already been terminated and cannot process new messages. Please start a new session.");
+ throw new InvalidOperationException("This Magentic orchestration has already terminated. To process new messages, create a new workflow instance.");
}
if (response.IsApproved)
@@ -161,7 +162,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta
}
}
- private async ValueTask UpdatePlanAndDelegateAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken)
+ private async ValueTask UpdatePlanAndDelegateAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken, bool replanAfterStall = false)
{
bool isReplan = taskContext.TaskLedger != null;
@@ -177,7 +178,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta
if (requirePlanSignoff)
{
- await this.SubmitPlanReviewRequestAsync(taskContext, context).ConfigureAwait(false);
+ await this.SubmitPlanReviewRequestAsync(taskContext, context, replanAfterStall).ConfigureAwait(false);
}
else
{
@@ -187,9 +188,22 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta
protected override async ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)
{
- // First Turn: Initialize the task context and send the initial messages to the planner agent
- this._taskContext ??= new(messages, team, limits, emitEvents, []);
- await this.UpdatePlanAndDelegateAsync(this._taskContext, context, cancellationToken).ConfigureAwait(false);
+ if (this._taskContext?.IsTerminated == true)
+ {
+ throw new InvalidOperationException("This Magentic orchestration has already terminated. To process new messages, create a new workflow instance.");
+ }
+
+ if (this._taskContext == null)
+ {
+ // First Turn: Initialize the task context and create the initial plan
+ this._taskContext = new(messages, team, limits, emitEvents, []);
+ await this.UpdatePlanAndDelegateAsync(this._taskContext, context, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ // Subsequent turns: agent returned control, go directly to coordination (progress ledger only, no replan)
+ await this.RunCoordinationRoundAsync(this._taskContext, context, cancellationToken).ConfigureAwait(false);
+ }
}
private ChatMessage? _fullTaskLedgerMessage;
@@ -288,10 +302,11 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta
private async ValueTask ResetAndReplanAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken)
{
+ bool wasStalled = taskContext.IsStalled;
taskContext.Reset();
await context.SendMessageAsync(new ResetChatSignal(), cancellationToken: cancellationToken).ConfigureAwait(false);
- await this.UpdatePlanAndDelegateAsync(taskContext, context, cancellationToken).ConfigureAwait(false);
+ await this.UpdatePlanAndDelegateAsync(taskContext, context, cancellationToken, replanAfterStall: wasStalled).ConfigureAwait(false);
}
private async ValueTask PrepareFinalAnswerAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken)
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticTaskContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticTaskContext.cs
index 0db289126e..36b3a070e2 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticTaskContext.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticTaskContext.cs
@@ -58,7 +58,7 @@ internal class MagenticTaskContext(List taskDefinition, List this.TaskCounters.StallCount >= this.TaskLimits.MaxStallCount;
+ public bool IsStalled => this.TaskCounters.StallCount > this.TaskLimits.MaxStallCount;
public (bool HitRoundLimit, bool HitResetLimit) CheckLimits()
{
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticOrchestrationTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticOrchestrationTests.cs
new file mode 100644
index 0000000000..937e047886
--- /dev/null
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticOrchestrationTests.cs
@@ -0,0 +1,1407 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Text.Json;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Microsoft.Agents.AI.Workflows.InProc;
+using Microsoft.Agents.AI.Workflows.Specialized.Magentic;
+using Microsoft.Extensions.AI;
+
+namespace Microsoft.Agents.AI.Workflows.UnitTests;
+
+///
+/// End-to-end tests for the Magentic orchestrator workflow.
+///
+public class MagenticOrchestrationTests
+{
+ [Fact]
+ public async Task Task_Completes_When_RequestSatisfiedAsync()
+ {
+ // Arrange: Manager reports task satisfied on first coordination round
+ // Each response must have unique message IDs, so create separate instances
+ List factsResponse = CreatePlanResponse("Facts about the task");
+ List planResponse = CreatePlanResponse("Step 1: Do the task");
+ List progressLedgerResponse = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Complete the task");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Task completed successfully!");
+
+ TestReplayAgent manager = new(
+ [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, "Do the task")]);
+
+ // Assert: Check the result contains the final answer
+ runResult.Result.Should().NotBeNull();
+ runResult.Result.Should().ContainSingle();
+ runResult.Result![0].Text.Should().Contain("Task completed successfully!");
+ runResult.PendingRequests.Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task PlanReview_Approved_ProceedsAsync()
+ {
+ // Arrange: Human approves initial plan
+ List factsResponse = CreatePlanResponse("Facts about executing the plan");
+ List planResponse = CreatePlanResponse("Step 1: Execute the plan");
+ List progressLedgerResponse = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Execute");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Plan executed successfully");
+
+ TestReplayAgent manager = new(
+ [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(true)
+ .Build();
+
+ CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+
+ // Act: First run - should pause for plan review
+ WorkflowRunResult firstResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Execute plan")],
+ checkpointManager: checkpointManager);
+
+ firstResult.PendingRequests.Should().ContainSingle();
+ ExternalRequest request = firstResult.PendingRequests[0].Request;
+ MagenticPlanReviewRequest? reviewRequest = request.Data.As();
+ reviewRequest.Should().NotBeNull();
+ reviewRequest!.Plan.Text.Should().Contain("Execute the plan");
+
+ // Act: Resume with approval
+ MagenticPlanReviewResponse approval = reviewRequest.Approve();
+ ExternalResponse response = request.CreateResponse(approval);
+ WorkflowRunResult secondResult = await ResumeMagenticWorkflowAsync(
+ workflow,
+ response,
+ checkpointManager,
+ firstResult.LastCheckpoint);
+
+ // Assert
+ secondResult.Result.Should().NotBeNull();
+ secondResult.Result![0].Text.Should().Contain("Plan executed successfully");
+ }
+
+ [Fact]
+ public async Task Initial_Plan_Emits_PlanCreatedEventAsync()
+ {
+ // Arrange
+ List factsResponse = CreatePlanResponse("Facts about the task");
+ List planResponse = CreatePlanResponse("Step 1: Initial plan");
+ List progressLedgerResponse = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Execute");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Done");
+
+ TestReplayAgent manager = new(
+ [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")],
+ eventCollector: collectedEvents);
+
+ // Assert
+ collectedEvents.OfType().Should().NotBeEmpty();
+ MagenticPlanCreatedEvent planEvent = collectedEvents.OfType().First();
+ planEvent.FullTaskLedger.Should().NotBeNull();
+ }
+
+ [Fact]
+ public async Task NextSpeaker_Invalid_Triggers_FinalAnswerAsync()
+ {
+ // Arrange: ProgressLedger returns invalid next_speaker
+ List factsResponse = CreatePlanResponse("Facts about the task");
+ List planResponse = CreatePlanResponse("Step 1: Execute");
+ List invalidNextSpeakerLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "NonExistentAgent", // Invalid - doesn't match any team member
+ instructionOrQuestion: "Continue");
+ List finalAnswer = CreateFinalAnswerResponse("Forced to conclude due to invalid speaker");
+
+ TestReplayAgent manager = new(
+ [factsResponse, planResponse, invalidNextSpeakerLedger, finalAnswer],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")],
+ eventCollector: collectedEvents);
+
+ // Assert: Warning should be emitted and final answer prepared
+ collectedEvents.OfType()
+ .Should().Contain(e => e.Data != null && e.Data.ToString()!.Contains("Invalid next speaker"));
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Forced to conclude");
+ }
+
+ [Fact]
+ public async Task ProgressLedger_Updated_Event_EmittedAsync()
+ {
+ // Arrange
+ List factsResponse = CreatePlanResponse("Facts about the task");
+ List planResponse = CreatePlanResponse("Step 1: Execute");
+ List progressLedgerResponse = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Execute");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Done");
+
+ TestReplayAgent manager = new(
+ [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")],
+ eventCollector: collectedEvents);
+
+ // Assert
+ collectedEvents.OfType().Should().NotBeEmpty();
+ MagenticProgressLedgerUpdatedEvent ledgerEvent = collectedEvents.OfType().First();
+ ledgerEvent.ProgressLedger.Should().NotBeNull();
+ ledgerEvent.ProgressLedger.IsRequestSatisfied.Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task PlanSignoff_Disabled_Proceeds_ImmediatelyAsync()
+ {
+ // Arrange: requirePlanSignoff=false should mean no plan review request
+ List factsResponse = CreatePlanResponse("Task facts");
+ List planResponse = CreatePlanResponse("Step 1: Execute immediately");
+ List progressLedgerResponse = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Go");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Immediate completion");
+
+ TestReplayAgent manager = new(
+ [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do it now")],
+ eventCollector: collectedEvents);
+
+ // Assert: No plan review request, workflow completes immediately
+ runResult.PendingRequests.Should().BeEmpty("plan signoff is disabled, so no review should be requested");
+ collectedEvents.OfType().Should().BeEmpty();
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Immediate completion");
+ }
+
+ [Fact]
+ public async Task NextSpeaker_Empty_Falls_Back_To_FirstAsync()
+ {
+ // Arrange: First progress ledger returns empty next_speaker, which should fall back to first participant.
+ // Round 1: empty speaker → fallback to Worker (first participant) → Worker echoes
+ // Round 2 (after Worker responds): RunCoordinationRoundAsync → satisfied ledger → final answer
+ // Note: No replan on agent return — only progress ledger is created on subsequent turns.
+ List factsResponse1 = CreatePlanResponse("Facts about the task");
+ List planResponse1 = CreatePlanResponse("Step 1: Execute");
+ List emptyNextSpeakerLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "", // Empty - should fall back to first participant
+ instructionOrQuestion: "Please help with this task");
+
+ // Round 2: satisfied ledger + final answer (no replan on normal agent return)
+ List satisfiedLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Task completed after fallback");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, emptyNextSpeakerLedger,
+ satisfiedLedger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do the task")],
+ eventCollector: collectedEvents);
+
+ // Assert: Warning about empty next speaker should be emitted
+ collectedEvents.OfType()
+ .Should().Contain(e => e.Data != null && e.Data.ToString()!.Contains("empty"));
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Task completed after fallback");
+ }
+
+ [Fact]
+ public async Task Task_Completes_After_Multiple_RoundsAsync()
+ {
+ // Arrange: Round 1 delegates to Worker (not satisfied), round 2 completes
+ // Manager turn sequence: facts1, plan1, ledger1(not satisfied), ledger2(satisfied), finalAnswer
+ // Note: No replan on agent return — only progress ledger is created on subsequent turns.
+ List factsResponse1 = CreatePlanResponse("Initial facts");
+ List planResponse1 = CreatePlanResponse("Step 1: Delegate to worker");
+ List round1Ledger = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Please work on the task");
+
+ List round2Ledger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Task is done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Multi-round task completed!");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, round1Ledger,
+ round2Ledger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Complex multi-round task")],
+ eventCollector: collectedEvents);
+
+ // Assert: One plan created, one progress ledger per round, final answer
+ collectedEvents.OfType().Should().HaveCount(2);
+ collectedEvents.OfType().Should().ContainSingle("only one initial plan, no replan on agent return");
+ collectedEvents.OfType().Should().BeEmpty("no replan occurs on normal agent return");
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Multi-round task completed!");
+ }
+
+ [Fact]
+ public async Task PlanReview_Revised_Triggers_ReplanAsync()
+ {
+ // Arrange: Human rejects initial plan with revision, triggering a replan.
+ // Flow: facts1, plan1 → PlanCreatedEvent → plan review (pending)
+ // resume with revision → facts2, plan2 → MagenticReplannedEvent → plan review again (pending)
+ // resume with approval → progressLedger(satisfied) → finalAnswer
+ List factsResponse1 = CreatePlanResponse("Initial facts");
+ List planResponse1 = CreatePlanResponse("Initial plan - needs revision");
+ List factsResponse2 = CreatePlanResponse("Revised facts");
+ List planResponse2 = CreatePlanResponse("Revised plan - much better");
+ List progressLedgerResponse = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Execute revised plan");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Revised plan executed successfully");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, factsResponse2, planResponse2, progressLedgerResponse, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(true)
+ .Build();
+
+ CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+ List allEvents = [];
+
+ // Act 1: First run - should pause for plan review with initial plan
+ WorkflowRunResult firstResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Execute task")],
+ checkpointManager: checkpointManager,
+ eventCollector: allEvents);
+
+ firstResult.PendingRequests.Should().ContainSingle();
+ ExternalRequest request1 = firstResult.PendingRequests[0].Request;
+ MagenticPlanReviewRequest? reviewRequest1 = request1.Data.As();
+ reviewRequest1.Should().NotBeNull();
+ reviewRequest1!.Plan.Text.Should().Contain("Initial plan");
+
+ // Act 2: Resume with revision (reject the plan)
+ MagenticPlanReviewResponse revision = reviewRequest1.Revise("Please include more detail");
+ ExternalResponse revisionResponse = request1.CreateResponse(revision);
+ WorkflowRunResult secondResult = await ResumeMagenticWorkflowAsync(
+ workflow,
+ revisionResponse,
+ checkpointManager,
+ firstResult.LastCheckpoint,
+ eventCollector: allEvents);
+
+ // Should pause again for review of the revised plan (stream may include prior request too)
+ secondResult.PendingRequests.Should().NotBeEmpty();
+ ExternalRequest request2 = secondResult.PendingRequests[^1].Request;
+ MagenticPlanReviewRequest? reviewRequest2 = request2.Data.As();
+ reviewRequest2.Should().NotBeNull();
+ reviewRequest2!.Plan.Text.Should().Contain("Revised plan");
+
+ // Act 3: Resume with approval
+ MagenticPlanReviewResponse approval = reviewRequest2.Approve();
+ ExternalResponse approvalResponse = request2.CreateResponse(approval);
+ WorkflowRunResult thirdResult = await ResumeMagenticWorkflowAsync(
+ workflow,
+ approvalResponse,
+ checkpointManager,
+ secondResult.LastCheckpoint,
+ eventCollector: allEvents);
+
+ // Assert: MagenticReplannedEvent should have been emitted, and final answer produced
+ allEvents.OfType().Should().NotBeEmpty("initial plan emits PlanCreatedEvent");
+ allEvents.OfType().Should().NotBeEmpty("revision triggers ReplannedEvent");
+ thirdResult.Result.Should().NotBeNull();
+ thirdResult.Result![0].Text.Should().Contain("Revised plan executed successfully");
+ }
+
+ [Fact]
+ public async Task MaxRoundLimit_Terminates_WorkflowAsync()
+ {
+ // Arrange: MaxRounds=1, so round 1 delegates to Worker, round 2 hits limit and terminates.
+ // Manager turns: facts1, plan1, ledger1(not satisfied→delegates), then limit hit before ledger.
+ List factsResponse1 = CreatePlanResponse("Facts");
+ List planResponse1 = CreatePlanResponse("Plan");
+ List round1Ledger = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Work on it");
+
+ // No more turns needed: RunCoordinationRoundAsync hits round limit before calling UpdateProgressLedgerAsync
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, round1Ledger],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .WithMaxRounds(1)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")]);
+
+ // Assert: Workflow terminates with round limit message
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("maximum round count limit");
+ }
+
+ [Fact]
+ public async Task MaxStallCount_Triggers_ResetAsync()
+ {
+ // Arrange: MaxStallCount=0, so one stall (isInLoop=true, StallCount=1 > 0) triggers ResetAndReplanAsync.
+ // Flow: facts1, plan1 → round1 ledger(stall: isInLoop=true) → StallCount=1 → IsStalled → Reset
+ // → facts2, plan2 (replan) → round2 ledger(satisfied) → finalAnswer
+ List factsResponse1 = CreatePlanResponse("Initial facts");
+ List planResponse1 = CreatePlanResponse("Initial plan");
+ List stalledLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: true, // This triggers stall
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Keep trying");
+
+ // After reset: ResetAndReplanAsync → UpdatePlanAndDelegateAsync → new plan
+ List factsResponse2 = CreatePlanResponse("Fresh facts after reset");
+ List planResponse2 = CreatePlanResponse("Fresh plan after reset");
+ List satisfiedLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after stall reset");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, stalledLedger,
+ factsResponse2, planResponse2, satisfiedLedger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .WithMaxStalls(0) // One stall triggers reset (StallCount 1 > 0)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")],
+ eventCollector: collectedEvents);
+
+ // Assert: MagenticReplannedEvent should be emitted (reset triggers replan), final answer produced
+ collectedEvents.OfType().Should().NotBeEmpty("initial plan created");
+ collectedEvents.OfType().Should().NotBeEmpty("stall triggers reset and replan");
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Recovered after stall reset");
+ }
+
+ [Fact]
+ public async Task Instruction_Message_Sent_When_PresentAsync()
+ {
+ // Arrange: Progress ledger has a non-empty instruction_or_question.
+ // The orchestrator should send the instruction as a ChatMessage before delegating to the next agent.
+ // After Worker echoes, the second round completes.
+ List factsResponse1 = CreatePlanResponse("Facts about the task");
+ List planResponse1 = CreatePlanResponse("Step 1: Instruct the worker");
+ List ledgerWithInstruction = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Please analyze the data carefully");
+
+ // Round 2 after Worker responds (no replan, just progress ledger)
+ List satisfiedLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Task completed with instruction");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, ledgerWithInstruction,
+ satisfiedLedger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Analyze data")],
+ eventCollector: collectedEvents);
+
+ // Assert: The workflow completed successfully, proving the instruction path executed without error.
+ // The update text should contain the instruction text since it is sent to participants as a ChatMessage.
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Task completed with instruction");
+ // Verify the delegation happened (two progress ledger events for two rounds)
+ collectedEvents.OfType().Should().HaveCount(2);
+ }
+
+ [Fact]
+ public async Task PlanReview_On_Stall_ReplanAsync()
+ {
+ // Arrange: Plan signoff enabled, stall triggers reset, replan requires new plan review.
+ // Flow: facts1, plan1 → PlanCreatedEvent → plan review (pending)
+ // resume with approval → ledger1(stall: isInLoop=true) → StallCount=1 → IsStalled → Reset
+ // → facts2, plan2 → MagenticReplannedEvent → plan review again (pending)
+ // resume with approval → ledger2(satisfied) → finalAnswer
+ List factsResponse1 = CreatePlanResponse("Initial facts");
+ List planResponse1 = CreatePlanResponse("Initial plan");
+ List stalledLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: true, // This triggers stall
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Keep trying");
+
+ // After reset: new plan
+ List factsResponse2 = CreatePlanResponse("Fresh facts after stall reset");
+ List planResponse2 = CreatePlanResponse("Fresh plan after stall reset");
+ // After second approval: satisfied ledger + final answer
+ List satisfiedLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after stall with plan review");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, stalledLedger,
+ factsResponse2, planResponse2, satisfiedLedger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(true)
+ .WithMaxStalls(0)
+ .Build();
+
+ CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+ List allEvents = [];
+
+ // Act 1: First run - should pause for initial plan review
+ WorkflowRunResult firstResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")],
+ checkpointManager: checkpointManager,
+ eventCollector: allEvents);
+
+ firstResult.PendingRequests.Should().ContainSingle();
+ ExternalRequest request1 = firstResult.PendingRequests[0].Request;
+ MagenticPlanReviewRequest? reviewRequest1 = request1.Data.As();
+ reviewRequest1.Should().NotBeNull();
+ reviewRequest1!.Plan.Text.Should().Contain("Initial plan");
+ reviewRequest1.IsStalled.Should().BeFalse("the initial plan review is not stall-triggered");
+
+ // Act 2: Approve initial plan → stall occurs → reset → replan → new plan review
+ MagenticPlanReviewResponse approval1 = reviewRequest1.Approve();
+ ExternalResponse approvalResponse1 = request1.CreateResponse(approval1);
+ WorkflowRunResult secondResult = await ResumeMagenticWorkflowAsync(
+ workflow,
+ approvalResponse1,
+ checkpointManager,
+ firstResult.LastCheckpoint,
+ eventCollector: allEvents);
+
+ // Should pause for review of the replanned plan
+ secondResult.PendingRequests.Should().NotBeEmpty();
+ ExternalRequest request2 = secondResult.PendingRequests[^1].Request;
+ MagenticPlanReviewRequest? reviewRequest2 = request2.Data.As();
+ reviewRequest2.Should().NotBeNull();
+ reviewRequest2!.Plan.Text.Should().Contain("Fresh plan after stall reset");
+ reviewRequest2.IsStalled.Should().BeTrue("the replan was triggered by a stall");
+
+ // Act 3: Approve the revised plan → satisfied → final answer
+ MagenticPlanReviewResponse approval2 = reviewRequest2.Approve();
+ ExternalResponse approvalResponse2 = request2.CreateResponse(approval2);
+ WorkflowRunResult thirdResult = await ResumeMagenticWorkflowAsync(
+ workflow,
+ approvalResponse2,
+ checkpointManager,
+ secondResult.LastCheckpoint,
+ eventCollector: allEvents);
+
+ // Assert
+ allEvents.OfType().Should().NotBeEmpty("initial plan emits PlanCreatedEvent");
+ allEvents.OfType().Should().NotBeEmpty("stall reset triggers ReplannedEvent");
+ thirdResult.Result.Should().NotBeNull();
+ thirdResult.Result![0].Text.Should().Contain("Recovered after stall with plan review");
+ }
+
+ [Fact]
+ public async Task MaxResetLimit_Terminates_WorkflowAsync()
+ {
+ // Arrange: MaxStallCount=0, MaxResets=1.
+ // Flow: facts1, plan1 → ledger1(stall: isInLoop=true) → StallCount=1 > 0 → IsStalled → ResetAndReplanAsync
+ // → ResetCount becomes 1 → facts2, plan2 → DelegateToTeamAsync
+ // → RunCoordinationRoundAsync: CheckLimits() detects ResetCount(1) >= MaxResetCount(1) → terminates
+ List factsResponse1 = CreatePlanResponse("Initial facts");
+ List planResponse1 = CreatePlanResponse("Initial plan");
+ List stalledLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: true, // This triggers stall
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Keep trying");
+
+ // After reset: ResetAndReplanAsync → UpdatePlanAndDelegateAsync → new plan
+ List factsResponse2 = CreatePlanResponse("Fresh facts after reset");
+ List planResponse2 = CreatePlanResponse("Fresh plan after reset");
+ // No more turns needed: RunCoordinationRoundAsync hits reset limit before calling UpdateProgressLedgerAsync
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, stalledLedger,
+ factsResponse2, planResponse2],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .WithMaxStalls(0) // One stall triggers reset (StallCount 1 > 0)
+ .WithMaxResets(1) // One reset triggers termination
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")]);
+
+ // Assert: Workflow terminates with reset limit message
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("maximum reset count limit");
+ }
+
+ [Fact]
+ public async Task ProgressLedger_Retry_On_Parse_FailureAsync()
+ {
+ // Arrange: First progress ledger attempt returns invalid JSON (triggers parse failure + warning),
+ // second attempt returns valid JSON (satisfied=true).
+ // Manager turn sequence: facts, plan, INVALID_JSON, VALID_LEDGER(satisfied), finalAnswer
+ // MagenticManager.UpdateProgressLedgerAsync retries internally: attempt 0 fails, attempt 1 succeeds.
+ List factsResponse = CreatePlanResponse("Facts about the task");
+ List planResponse = CreatePlanResponse("Step 1: Execute");
+ List invalidLedgerResponse = CreatePlanResponse("This is not valid JSON for a progress ledger");
+ List validLedgerResponse = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done after retry");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Completed after ledger retry");
+
+ TestReplayAgent manager = new(
+ [factsResponse, planResponse, invalidLedgerResponse, validLedgerResponse, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")],
+ eventCollector: collectedEvents);
+
+ // Assert: Warning emitted for parse failure, but workflow completes successfully
+ collectedEvents.OfType()
+ .Should().Contain(e => e.Data != null && e.Data.ToString()!.Contains("Progress ledger JSON parse failed"));
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Completed after ledger retry");
+ }
+
+ [Fact]
+ public async Task ProgressLedger_Max_Retries_Triggers_ResetAsync()
+ {
+ // Arrange: All 3 progress ledger retry attempts return invalid JSON → exception → ResetAndReplanAsync.
+ // After reset: new plan, valid ledger (satisfied), final answer.
+ // Turn sequence: facts1, plan1, invalidJSON×3, facts2, plan2, validLedger(satisfied), finalAnswer
+ List factsResponse1 = CreatePlanResponse("Initial facts");
+ List planResponse1 = CreatePlanResponse("Initial plan");
+ List invalidLedger1 = CreatePlanResponse("not json at all");
+ List invalidLedger2 = CreatePlanResponse("still not json");
+ List invalidLedger3 = CreatePlanResponse("definitely not json");
+
+ // After reset: ResetAndReplanAsync → UpdatePlanAndDelegateAsync → new plan
+ List factsResponse2 = CreatePlanResponse("Fresh facts after reset");
+ List planResponse2 = CreatePlanResponse("Fresh plan after reset");
+ List validLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done after reset");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after max retries reset");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, invalidLedger1, invalidLedger2, invalidLedger3,
+ factsResponse2, planResponse2, validLedger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")],
+ eventCollector: collectedEvents);
+
+ // Assert: Parse failure warnings emitted, reset triggered (ReplannedEvent), workflow completes
+ collectedEvents.OfType()
+ .Where(e => e.Data?.ToString()?.Contains("Progress ledger JSON parse failed") == true)
+ .Should().HaveCountGreaterThanOrEqualTo(3, "all 3 retry attempts should emit warnings");
+ collectedEvents.OfType()
+ .Should().Contain(e => e.Data != null && e.Data.ToString()!.Contains("triggering reset"));
+ collectedEvents.OfType().Should().NotBeEmpty("reset triggers replan");
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Recovered after max retries reset");
+ }
+
+ [Fact]
+ public async Task Stall_NoProgress_Increments_StallCountAsync()
+ {
+ // Arrange: MaxStallCount=0, progress ledger reports IsProgressBeingMade=false (not IsInLoop).
+ // This exercises the alternative stall trigger: !IsProgressBeingMade.
+ // Flow: facts1, plan1 → ledger1(IsProgressBeingMade=false) → StallCount=1 > 0 → IsStalled → Reset
+ // → facts2, plan2 (replan) → ledger2(satisfied) → finalAnswer
+ List factsResponse1 = CreatePlanResponse("Initial facts");
+ List planResponse1 = CreatePlanResponse("Initial plan");
+ List noProgressLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false, // Not in loop
+ isProgressBeingMade: false, // But no progress → stall
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Keep trying");
+
+ // After reset: ResetAndReplanAsync → UpdatePlanAndDelegateAsync → new plan
+ List factsResponse2 = CreatePlanResponse("Fresh facts after no-progress reset");
+ List planResponse2 = CreatePlanResponse("Fresh plan after no-progress reset");
+ List satisfiedLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after no-progress stall");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, noProgressLedger,
+ factsResponse2, planResponse2, satisfiedLedger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .WithMaxStalls(0)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Do task")],
+ eventCollector: collectedEvents);
+
+ // Assert: Stall detected via no-progress, reset triggered, replan emitted, workflow completes
+ collectedEvents.OfType().Should().NotBeEmpty("no-progress stall triggers reset and replan");
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Recovered after no-progress stall");
+ }
+
+ [Fact]
+ public async Task Task_Delegates_To_Correct_AgentAsync()
+ {
+ // Arrange: Two participants (WorkerA, WorkerB). Manager selects "WorkerA" as next speaker.
+ // We verify that WorkerA produces a response update event and WorkerB does not.
+ // Flow: facts1, plan1 → ledger1(nextSpeaker=WorkerA, not satisfied) → WorkerA runs
+ // → RunCoordinationRoundAsync (no replan) → ledger2(satisfied) → finalAnswer
+ List factsResponse1 = CreatePlanResponse("Task delegation facts");
+ List planResponse1 = CreatePlanResponse("Delegate to WorkerA");
+ List ledger1 = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "WorkerA",
+ instructionOrQuestion: "WorkerA please handle this");
+
+ // After WorkerA responds, orchestrator goes directly to RunCoordinationRoundAsync (no replan)
+ List ledger2 = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "WorkerA",
+ instructionOrQuestion: "Done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Delegated correctly!");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1, ledger1,
+ ledger2, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent workerA = new(name: "WorkerA", prefix: "[A] ");
+ TestEchoAgent workerB = new(name: "WorkerB", prefix: "[B] ");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(workerA, workerB)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Test delegation routing")],
+ eventCollector: collectedEvents);
+
+ // Assert: WorkerA should have produced response updates, WorkerB should not
+ List agentUpdates = collectedEvents.OfType().ToList();
+
+ // WorkerA's executor should appear in the events
+ agentUpdates.Should().Contain(e => e.Update.AuthorName == "WorkerA",
+ "WorkerA was selected as next speaker and should have responded");
+
+ // WorkerB should NOT have responded
+ agentUpdates.Should().NotContain(e => e.Update.AuthorName == "WorkerB",
+ "WorkerB was not selected and should not have responded");
+
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Delegated correctly!");
+ }
+
+ [Fact]
+ public async Task Progress_Made_Decrements_StallCountAsync()
+ {
+ // Arrange: MaxStallCount=3, so a single stall won't trigger reset.
+ // Round 1: isInLoop=true (stall count → 1), delegates to Worker
+ // Round 2: progress being made (stall count → 0), delegates to Worker
+ // Round 3: satisfied → final answer
+ // No reset should occur because the stall count was decremented before reaching threshold.
+ List facts1 = CreatePlanResponse("Initial facts");
+ List plan1 = CreatePlanResponse("Initial plan");
+ List ledger1 = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: true, // triggers stall increment → StallCount=1
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Keep trying");
+
+ // After Worker responds → RunCoordinationRoundAsync (no replan)
+ List ledger2 = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false,
+ isProgressBeingMade: true, // progress → stall count decrements → StallCount=0
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Good progress");
+
+ // After Worker responds → RunCoordinationRoundAsync (no replan)
+ List ledger3 = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "All done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Completed without reset!");
+
+ TestReplayAgent manager = new(
+ [facts1, plan1, ledger1,
+ ledger2,
+ ledger3, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .WithMaxStalls(3) // high threshold so single stall doesn't trigger reset
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Test stall decrement")],
+ eventCollector: collectedEvents);
+
+ // Assert: Three progress ledger updates, no stall-triggered reset
+ collectedEvents.OfType().Should().HaveCount(3,
+ "three coordination rounds should produce three progress ledger events");
+
+ // One initial plan, no replans (agent returns go directly to coordination, no replan)
+ collectedEvents.OfType().Should().ContainSingle(
+ "only one initial plan should be created");
+ collectedEvents.OfType().Should().BeEmpty(
+ "no replan occurs on normal agent return; stall count never exceeded threshold");
+
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Completed without reset!");
+ }
+
+ [Fact]
+ public async Task Consecutive_Stalls_Trigger_ResetAsync()
+ {
+ // Arrange: MaxStallCount=1 — two consecutive stalls trigger reset (StallCount 2 > 1).
+ // Round 1: isInLoop=true (stall count → 1), delegates to Worker
+ // Round 2: isProgressBeingMade=false (stall count → 2 > 1 → IsStalled) → reset & replan
+ // After reset: new plan → ledger(satisfied) → final answer
+ List facts1 = CreatePlanResponse("Initial facts");
+ List plan1 = CreatePlanResponse("Initial plan");
+ List ledger1 = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: true, // stall #1 → StallCount=1
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Keep trying");
+
+ // After Worker responds → RunCoordinationRoundAsync (no replan)
+ List ledger2 = CreateProgressLedgerResponse(
+ isRequestSatisfied: false,
+ isInLoop: false,
+ isProgressBeingMade: false, // stall #2 → StallCount=2 > 1 → IsStalled → reset
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "No progress");
+
+ // Reset & replan: new facts + plan, then coordination round
+ List resetFacts = CreatePlanResponse("Fresh facts after stall reset");
+ List resetPlan = CreatePlanResponse("Fresh plan after stall reset");
+ List postResetLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after consecutive stalls!");
+
+ TestReplayAgent manager = new(
+ [facts1, plan1, ledger1,
+ ledger2,
+ resetFacts, resetPlan, postResetLedger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ List collectedEvents = [];
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .WithMaxStalls(1) // requires 2 consecutive stalls (StallCount > 1)
+ .Build();
+
+ // Act
+ WorkflowRunResult runResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Test consecutive stalls")],
+ eventCollector: collectedEvents);
+
+ // Assert: Two pre-reset coordination rounds + one post-reset round = 3 ledger events
+ collectedEvents.OfType().Should().HaveCount(3,
+ "two pre-reset rounds and one post-reset round");
+
+ // One initial plan + one stall-triggered reset replan (no normal re-entry replans anymore)
+ collectedEvents.OfType().Should().ContainSingle();
+ collectedEvents.OfType().Should().ContainSingle(
+ "only one replan from stall-triggered reset; no replan on normal agent return");
+
+ runResult.Result.Should().NotBeNull();
+ runResult.Result![0].Text.Should().Contain("Recovered after consecutive stalls!");
+ }
+
+ [Fact]
+ public async Task PlanReview_Multiple_RevisionsAsync()
+ {
+ // Arrange: Human rejects the plan twice before approving on the third review.
+ // Flow: facts1, plan1 → PlanCreatedEvent → plan review (pending)
+ // resume with revision1 → facts2, plan2 → ReplannedEvent → plan review (pending)
+ // resume with revision2 → facts3, plan3 → ReplannedEvent → plan review (pending)
+ // resume with approval → ledger(satisfied) → finalAnswer
+ List factsResponse1 = CreatePlanResponse("Initial facts");
+ List planResponse1 = CreatePlanResponse("Initial plan - too vague");
+ List factsResponse2 = CreatePlanResponse("Revised facts v2");
+ List planResponse2 = CreatePlanResponse("Revised plan v2 - still needs work");
+ List factsResponse3 = CreatePlanResponse("Revised facts v3");
+ List planResponse3 = CreatePlanResponse("Revised plan v3 - final version");
+ List progressLedgerResponse = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Execute final plan");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Completed after multiple revisions");
+
+ TestReplayAgent manager = new(
+ [factsResponse1, planResponse1,
+ factsResponse2, planResponse2,
+ factsResponse3, planResponse3,
+ progressLedgerResponse, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(true)
+ .Build();
+
+ CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+ List allEvents = [];
+
+ // Act 1: First run - should pause for plan review with initial plan
+ WorkflowRunResult firstResult = await RunMagenticWorkflowAsync(
+ workflow,
+ [new ChatMessage(ChatRole.User, "Execute task")],
+ checkpointManager: checkpointManager,
+ eventCollector: allEvents);
+
+ firstResult.PendingRequests.Should().ContainSingle();
+ ExternalRequest request1 = firstResult.PendingRequests[0].Request;
+ MagenticPlanReviewRequest? reviewRequest1 = request1.Data.As();
+ reviewRequest1.Should().NotBeNull();
+ reviewRequest1!.Plan.Text.Should().Contain("Initial plan");
+
+ // Act 2: Resume with first revision
+ MagenticPlanReviewResponse revision1 = reviewRequest1.Revise("Too vague, add more detail");
+ ExternalResponse revisionResponse1 = request1.CreateResponse(revision1);
+ WorkflowRunResult secondResult = await ResumeMagenticWorkflowAsync(
+ workflow,
+ revisionResponse1,
+ checkpointManager,
+ firstResult.LastCheckpoint,
+ eventCollector: allEvents);
+
+ secondResult.PendingRequests.Should().NotBeEmpty();
+ ExternalRequest request2 = secondResult.PendingRequests[^1].Request;
+ MagenticPlanReviewRequest? reviewRequest2 = request2.Data.As();
+ reviewRequest2.Should().NotBeNull();
+ reviewRequest2!.Plan.Text.Should().Contain("Revised plan v2");
+
+ // Act 3: Resume with second revision
+ MagenticPlanReviewResponse revision2 = reviewRequest2.Revise("Still needs more work on step 3");
+ ExternalResponse revisionResponse2 = request2.CreateResponse(revision2);
+ WorkflowRunResult thirdResult = await ResumeMagenticWorkflowAsync(
+ workflow,
+ revisionResponse2,
+ checkpointManager,
+ secondResult.LastCheckpoint,
+ eventCollector: allEvents);
+
+ thirdResult.PendingRequests.Should().NotBeEmpty();
+ ExternalRequest request3 = thirdResult.PendingRequests[^1].Request;
+ MagenticPlanReviewRequest? reviewRequest3 = request3.Data.As();
+ reviewRequest3.Should().NotBeNull();
+ reviewRequest3!.Plan.Text.Should().Contain("Revised plan v3");
+
+ // Act 4: Resume with approval
+ MagenticPlanReviewResponse approval = reviewRequest3.Approve();
+ ExternalResponse approvalResponse = request3.CreateResponse(approval);
+ WorkflowRunResult fourthResult = await ResumeMagenticWorkflowAsync(
+ workflow,
+ approvalResponse,
+ checkpointManager,
+ thirdResult.LastCheckpoint,
+ eventCollector: allEvents);
+
+ // Assert: Multiple replan events emitted, final answer produced
+ allEvents.OfType().Should().NotBeEmpty("initial plan emits PlanCreatedEvent");
+ allEvents.OfType().Should().HaveCountGreaterThanOrEqualTo(2,
+ "two revisions should emit at least two ReplannedEvents");
+ fourthResult.Result.Should().NotBeNull();
+ fourthResult.Result![0].Text.Should().Contain("Completed after multiple revisions");
+ }
+
+ [Fact]
+ public void Empty_Team_Build_Throws()
+ {
+ // Arrange: No participants added to the builder.
+ TestReplayAgent manager = new(
+ [CreatePlanResponse("Facts"), CreatePlanResponse("Plan")],
+ name: "Manager");
+
+ MagenticWorkflowBuilder builder = new MagenticWorkflowBuilder(manager)
+ // No .AddParticipants() — empty team
+ .RequirePlanSignoff(false);
+
+ // Act & Assert: Build() should throw because the team is empty.
+ Action buildAction = () => builder.Build();
+ buildAction.Should().Throw()
+ .WithMessage("*participant*");
+ }
+
+ [Fact]
+ public async Task Terminated_Context_Rejects_New_MessagesAsync()
+ {
+ // Arrange: Run a workflow to completion so IsTerminated=true, then send another message.
+ // The framework accepts the message (TrySendMessageAsync returns true), but Magentic
+ // should error out internally, surfacing as a WorkflowErrorEvent.
+ List factsResponse = CreatePlanResponse("Facts");
+ List planResponse = CreatePlanResponse("The plan");
+ List satisfiedLedger = CreateProgressLedgerResponse(
+ isRequestSatisfied: true,
+ isInLoop: false,
+ isProgressBeingMade: true,
+ nextSpeaker: "Worker",
+ instructionOrQuestion: "Done");
+ List finalAnswerResponse = CreateFinalAnswerResponse("Task done");
+
+ TestReplayAgent manager = new(
+ [factsResponse, planResponse, satisfiedLedger, finalAnswerResponse],
+ name: "Manager");
+ TestEchoAgent worker = new(name: "Worker");
+
+ Workflow workflow = new MagenticWorkflowBuilder(manager)
+ .AddParticipants(worker)
+ .RequirePlanSignoff(false)
+ .Build();
+
+ InProcessExecutionEnvironment environment = ExecutionEnvironment.InProcess_Lockstep
+ .ToWorkflowExecutionEnvironment()
+ .WithCheckpointing(CheckpointManager.CreateInMemory());
+
+ await using StreamingRun run = await environment.OpenStreamingAsync(workflow);
+
+ // Send the initial messages and run to completion
+ await run.TrySendMessageAsync(new List { new(ChatRole.User, "Do the task") });
+ await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
+
+ // Drain the stream to completion — workflow yields output and sets IsTerminated=true
+ WorkflowOutputEvent? output = null;
+ await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false).ConfigureAwait(false))
+ {
+ if (evt is WorkflowOutputEvent o)
+ {
+ output = o;
+ }
+ }
+
+ output.Should().NotBeNull("workflow should have completed with output");
+
+ // Act: Send a new message after termination — framework accepts it, but Magentic errors out
+ bool accepted = await run.TrySendMessageAsync(new List { new(ChatRole.User, "Another message") });
+ accepted.Should().BeTrue("framework does not have a terminal state — it always queues messages");
+
+ await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
+
+ // Watch for the error
+ WorkflowErrorEvent? errorEvent = null;
+ await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false).ConfigureAwait(false))
+ {
+ if (evt is WorkflowErrorEvent e)
+ {
+ errorEvent = e;
+ }
+ }
+
+ // Assert: Magentic should have rejected the message with an InvalidOperationException
+ // (may be wrapped in TargetInvocationException by the framework's reflection-based dispatch)
+ errorEvent.Should().NotBeNull("sending a message after termination should produce a WorkflowErrorEvent");
+ Exception actual = errorEvent!.Exception is System.Reflection.TargetInvocationException tie && tie.InnerException != null
+ ? tie.InnerException
+ : errorEvent.Exception!;
+ actual.Should().BeOfType();
+ actual.Message.Should().Contain("terminated");
+ }
+
+ #region Helper Methods
+
+ private sealed record WorkflowRunResult(
+ string UpdateText,
+ List? Result,
+ CheckpointInfo? LastCheckpoint,
+ List PendingRequests);
+
+ private static List CreatePlanResponse(string plan)
+ {
+ return
+ [
+ new ChatMessage(ChatRole.Assistant, plan)
+ {
+ MessageId = Guid.NewGuid().ToString("N"),
+ CreatedAt = DateTimeOffset.UtcNow
+ }
+ ];
+ }
+
+ private static List CreateProgressLedgerResponse(
+ bool isRequestSatisfied,
+ bool isInLoop,
+ bool isProgressBeingMade,
+ string nextSpeaker,
+ string instructionOrQuestion)
+ {
+ string isRequestSatisfiedStr = isRequestSatisfied ? "true" : "false";
+ string isInLoopStr = isInLoop ? "true" : "false";
+ string isProgressBeingMadeStr = isProgressBeingMade ? "true" : "false";
+ string nextSpeakerJson = JsonSerializer.Serialize(nextSpeaker);
+ string instructionJson = JsonSerializer.Serialize(instructionOrQuestion);
+
+ string ledgerJson = $$"""
+ {
+ "is_request_satisfied": { "answer": {{isRequestSatisfiedStr}}, "reason": "test reason" },
+ "is_in_loop": { "answer": {{isInLoopStr}}, "reason": "test reason" },
+ "is_progress_being_made": { "answer": {{isProgressBeingMadeStr}}, "reason": "test reason" },
+ "next_speaker": { "answer": {{nextSpeakerJson}}, "reason": "test reason" },
+ "instruction_or_question": { "answer": {{instructionJson}}, "reason": "test reason" }
+ }
+ """;
+
+ return
+ [
+ new ChatMessage(ChatRole.Assistant, ledgerJson)
+ {
+ MessageId = Guid.NewGuid().ToString("N"),
+ CreatedAt = DateTimeOffset.UtcNow
+ }
+ ];
+ }
+
+ private static List CreateFinalAnswerResponse(string answer)
+ {
+ return
+ [
+ new ChatMessage(ChatRole.Assistant, answer)
+ {
+ MessageId = Guid.NewGuid().ToString("N"),
+ CreatedAt = DateTimeOffset.UtcNow
+ }
+ ];
+ }
+
+ private static async Task ResumeMagenticWorkflowAsync(
+ Workflow workflow,
+ ExternalResponse response,
+ CheckpointManager checkpointManager,
+ CheckpointInfo? fromCheckpoint,
+ List? eventCollector = null)
+ {
+ InProcessExecutionEnvironment environment = ExecutionEnvironment.InProcess_Lockstep
+ .ToWorkflowExecutionEnvironment()
+ .WithCheckpointing(checkpointManager);
+
+ await using StreamingRun run = fromCheckpoint != null
+ ? await environment.ResumeStreamingAsync(workflow, fromCheckpoint)
+ : await environment.OpenStreamingAsync(workflow);
+
+ await run.SendResponseAsync(response);
+
+ return await ProcessWorkflowRunAsync(run, eventCollector);
+ }
+
+ private static async Task RunMagenticWorkflowAsync(
+ Workflow workflow,
+ List input,
+ CheckpointManager? checkpointManager = null,
+ List? eventCollector = null)
+ {
+ checkpointManager ??= CheckpointManager.CreateInMemory();
+
+ InProcessExecutionEnvironment environment = ExecutionEnvironment.InProcess_Lockstep
+ .ToWorkflowExecutionEnvironment()
+ .WithCheckpointing(checkpointManager);
+
+ await using StreamingRun run = await environment.OpenStreamingAsync(workflow);
+
+ await run.TrySendMessageAsync(input);
+ await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
+
+ return await ProcessWorkflowRunAsync(run, eventCollector);
+ }
+
+ private static async Task ProcessWorkflowRunAsync(
+ StreamingRun run,
+ List? eventCollector = null)
+ {
+ StringBuilder sb = new();
+ WorkflowOutputEvent? output = null;
+ CheckpointInfo? lastCheckpoint = null;
+ List pendingRequests = [];
+
+ await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false).ConfigureAwait(false))
+ {
+ eventCollector?.Add(evt);
+
+ switch (evt)
+ {
+ case AgentResponseUpdateEvent responseUpdate:
+ sb.Append(responseUpdate.Data);
+ break;
+
+ case RequestInfoEvent requestInfo:
+ pendingRequests.Add(requestInfo);
+ break;
+
+ case WorkflowOutputEvent e:
+ output = e;
+ break;
+
+ case WorkflowErrorEvent errorEvent:
+ Assert.Fail($"Workflow execution failed with error: {errorEvent.Exception}");
+ break;
+
+ case SuperStepCompletedEvent stepCompleted:
+ lastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint;
+ break;
+ }
+ }
+
+ return new(sb.ToString(), output?.As>(), lastCheckpoint, pendingRequests);
+ }
+
+ #endregion
+}
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs
index 36c43076ed..af8a9d8e0d 100644
--- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs
@@ -133,31 +133,31 @@ public sealed class ObservabilityTests : IDisposable
activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowCompleted, "activity should have workflow completed event");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_DefaultAsync()
{
await this.TestWorkflowEndToEndActivitiesAsync("Default");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_OffThreadAsync()
{
await this.TestWorkflowEndToEndActivitiesAsync("OffThread");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_ConcurrentAsync()
{
await this.TestWorkflowEndToEndActivitiesAsync("Concurrent");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_LockstepAsync()
{
await this.TestWorkflowEndToEndActivitiesAsync("Lockstep");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task CreatesWorkflowActivities_WithCorrectNameAsync()
{
// Arrange
@@ -182,7 +182,7 @@ public sealed class ObservabilityTests : IDisposable
tags.Should().ContainKey(Tags.WorkflowDefinition);
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task TelemetryDisabledByDefault_CreatesNoActivitiesAsync()
{
// Arrange
@@ -200,7 +200,7 @@ public sealed class ObservabilityTests : IDisposable
capturedActivities.Should().BeEmpty("No activities should be created when telemetry is disabled (default).");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task WithOpenTelemetry_UsesProvidedActivitySourceAsync()
{
// Arrange
@@ -235,7 +235,7 @@ public sealed class ObservabilityTests : IDisposable
"All activities should come from the user-provided ActivitySource.");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task DisableWorkflowBuild_PreventsWorkflowBuildActivityAsync()
{
// Arrange
@@ -255,7 +255,7 @@ public sealed class ObservabilityTests : IDisposable
"WorkflowBuild activity should be disabled.");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync()
{
// Arrange
@@ -285,7 +285,7 @@ public sealed class ObservabilityTests : IDisposable
"Other activities should still be created.");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync()
{
// Arrange
@@ -312,7 +312,7 @@ public sealed class ObservabilityTests : IDisposable
"Other activities should still be created.");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task DisableEdgeGroupProcess_PreventsEdgeGroupProcessActivityAsync()
{
// Arrange
@@ -333,7 +333,7 @@ public sealed class ObservabilityTests : IDisposable
"Other activities should still be created.");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task DisableMessageSend_PreventsMessageSendActivityAsync()
{
// Arrange
@@ -382,7 +382,7 @@ public sealed class ObservabilityTests : IDisposable
return builder.WithOpenTelemetry(configure: opts => opts.DisableMessageSend = true).Build();
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task EnableSensitiveData_LogsExecutorInputAndOutputAsync()
{
// Arrange
@@ -413,7 +413,7 @@ public sealed class ObservabilityTests : IDisposable
tags[Tags.ExecutorOutput].Should().Contain("HELLO", "Output should contain the transformed value.");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task EnableSensitiveData_Disabled_DoesNotLogInputOutputAsync()
{
// Arrange
@@ -442,7 +442,7 @@ public sealed class ObservabilityTests : IDisposable
tags.Should().NotContainKey(Tags.ExecutorOutput, "Output should NOT be logged when EnableSensitiveData is false.");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task EnableSensitiveData_LogsMessageSendContentAsync()
{
// Arrange
@@ -474,7 +474,7 @@ public sealed class ObservabilityTests : IDisposable
tags.Should().ContainKey(Tags.MessageSourceId, "Source ID should be logged.");
}
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task EnableSensitiveData_Disabled_DoesNotLogMessageContentAsync()
{
// Arrange
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs
index 112961c609..f35910f26b 100644
--- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs
@@ -67,7 +67,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable
/// Bug: The Activity created by LockstepRunEventStream.TakeEventStreamAsync is never
/// disposed because yield break in async iterators does not trigger using disposal.
///
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task WorkflowRunActivity_IsStopped_LockstepAsync()
{
// Arrange
@@ -111,7 +111,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable
/// Verifies that the workflow_invoke Activity is stopped when using the OffThread (Default)
/// execution environment (StreamingRunEventStream).
///
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task WorkflowRunActivity_IsStopped_OffThreadAsync()
{
// Arrange
@@ -156,7 +156,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable
/// (StreamingRun.WatchStreamAsync) with the OffThread execution environment.
/// This matches the exact usage pattern described in the issue.
///
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync()
{
// Arrange
@@ -203,7 +203,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable
/// streaming invocation, even when using the same workflow in a multi-turn pattern,
/// and that each session gets its own session activity.
///
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsync()
{
// Arrange
@@ -264,7 +264,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable
/// Verifies that all started activities (not just workflow_invoke) are properly stopped.
/// This ensures no spans are "leaked" without being exported.
///
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task AllActivities_AreStopped_AfterWorkflowCompletionAsync()
{
// Arrange
@@ -305,7 +305,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable
/// be parented under the workflow session span. The run activity should
/// still nest correctly under the session.
///
- [Fact(Skip = "Flaky test - temporarily disabled.")]
+ [Fact]
public async Task Lockstep_SessionActivity_DoesNotLeak_IntoCaller_ActivityCurrentAsync()
{
// Arrange
diff --git a/python/packages/a2a/AGENTS.md b/python/packages/a2a/AGENTS.md
index f5f7176cb0..d27d6d1c11 100644
--- a/python/packages/a2a/AGENTS.md
+++ b/python/packages/a2a/AGENTS.md
@@ -42,7 +42,7 @@ request_handler = DefaultRequestHandler(
app = Starlette(
routes=[
*create_agent_card_routes(my_agent_card),
- *create_jsonrpc_routes(request_handler),
+ *create_jsonrpc_routes(request_handler, "/"),
]
)
```
diff --git a/python/packages/a2a/agent_framework_a2a/_a2a_executor.py b/python/packages/a2a/agent_framework_a2a/_a2a_executor.py
index 8e8dd684c9..949ce10167 100644
--- a/python/packages/a2a/agent_framework_a2a/_a2a_executor.py
+++ b/python/packages/a2a/agent_framework_a2a/_a2a_executor.py
@@ -78,7 +78,7 @@ class A2AExecutor(AgentExecutor):
app = Starlette(
routes=[
*create_agent_card_routes(public_agent_card),
- *create_jsonrpc_routes(request_handler),
+ *create_jsonrpc_routes(request_handler, "/"),
],
)
diff --git a/python/packages/a2a/agent_framework_a2a/_agent.py b/python/packages/a2a/agent_framework_a2a/_agent.py
index 93874da391..a6f041ca64 100644
--- a/python/packages/a2a/agent_framework_a2a/_agent.py
+++ b/python/packages/a2a/agent_framework_a2a/_agent.py
@@ -365,6 +365,10 @@ class A2AAgent(AgentTelemetryLayer, BaseAgent):
all_updates: list[AgentResponseUpdate] = []
streamed_artifact_ids_by_task: dict[str, set[str]] = {}
+ # In non-streaming mode, accumulate intermediate status content so it
+ # can be surfaced when the terminal event arrives (mirroring v0.3.x
+ # behavior where the full Task history was available at completion).
+ pending_updates_by_task: dict[str, list[AgentResponseUpdate]] = {}
async for item in a2a_stream:
payload_type = item.WhichOneof("payload")
if payload_type == "message":
@@ -391,27 +395,55 @@ class A2AAgent(AgentTelemetryLayer, BaseAgent):
)
if task.status.state in TERMINAL_TASK_STATES:
streamed_artifact_ids_by_task.pop(task.id, None)
+ # If the terminal Task has no content, flush accumulated updates
+ if not updates or all(not u.contents for u in updates):
+ pending = pending_updates_by_task.pop(task.id, [])
+ for update in pending:
+ all_updates.append(update)
+ yield update
+ else:
+ pending_updates_by_task.pop(task.id, None)
for update in updates:
all_updates.append(update)
yield update
elif payload_type == "status_update":
status_event = item.status_update
updates = self._updates_from_task_update_event(status_event)
+ is_terminal = status_event.status.state in TERMINAL_TASK_STATES
if emit_intermediate:
for update in updates:
all_updates.append(update)
yield update
+ elif is_terminal:
+ if updates:
+ # Terminal event with content — discard accumulated intermediates
+ pending_updates_by_task.pop(status_event.task_id, None)
+ for update in updates:
+ all_updates.append(update)
+ yield update
+ else:
+ # Terminal event with NO content — flush accumulated updates
+ pending = pending_updates_by_task.pop(status_event.task_id, [])
+ for update in pending:
+ all_updates.append(update)
+ yield update
+ else:
+ # Non-streaming intermediate: accumulate for later
+ if updates:
+ pending_updates_by_task.setdefault(status_event.task_id, []).extend(updates)
elif payload_type == "artifact_update":
artifact_event = item.artifact_update
updates = self._updates_from_task_update_event(artifact_event)
+ # Always yield artifact updates — they carry actual response
+ # content (files, data). Track IDs so that a subsequent
+ # terminal Task doesn't duplicate the same artifacts.
if updates:
streamed_artifact_ids_by_task.setdefault(artifact_event.task_id, set()).add(
artifact_event.artifact.artifact_id
)
- if emit_intermediate:
- for update in updates:
- all_updates.append(update)
- yield update
+ for update in updates:
+ all_updates.append(update)
+ yield update
else:
raise NotImplementedError(f"Unsupported StreamResponse payload: {payload_type}")
diff --git a/python/packages/a2a/tests/test_a2a_agent.py b/python/packages/a2a/tests/test_a2a_agent.py
index 9af8c66884..76294f30bf 100644
--- a/python/packages/a2a/tests/test_a2a_agent.py
+++ b/python/packages/a2a/tests/test_a2a_agent.py
@@ -1570,4 +1570,102 @@ async def test_none_metadata_leaves_additional_properties_empty(
assert not response.additional_properties
+async def test_non_streaming_terminal_status_update_surfaces_content(
+ a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient
+) -> None:
+ """Non-streaming run() should surface content from terminal status_update events."""
+ completed_msg = A2AMessage(
+ message_id="msg-complete",
+ role=A2ARole.ROLE_AGENT,
+ parts=[Part(text="Done! Here is your answer.")],
+ )
+ status = TaskStatus(state=TaskState.TASK_STATE_COMPLETED, message=completed_msg)
+ event = TaskStatusUpdateEvent(task_id="task-ts", context_id="ctx-ts", status=status)
+ mock_a2a_client.responses.append(StreamResponse(status_update=event))
+
+ response = await a2a_agent.run("Hello")
+
+ assert len(response.messages) == 1
+ assert response.messages[0].text == "Done! Here is your answer."
+
+
+async def test_non_streaming_accumulates_working_content_for_empty_terminal(
+ a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient
+) -> None:
+ """Non-streaming run() accumulates WORKING content and flushes on empty terminal event."""
+ # Intermediate WORKING event with content
+ working_msg = A2AMessage(
+ message_id="msg-working",
+ role=A2ARole.ROLE_AGENT,
+ parts=[Part(text="Here is your answer from working state.")],
+ )
+ working_status = TaskStatus(state=TaskState.TASK_STATE_WORKING, message=working_msg)
+ working_event = TaskStatusUpdateEvent(task_id="task-acc", context_id="ctx-acc", status=working_status)
+ mock_a2a_client.responses.append(StreamResponse(status_update=working_event))
+
+ # Terminal COMPLETED event with NO content
+ completed_status = TaskStatus(state=TaskState.TASK_STATE_COMPLETED)
+ completed_event = TaskStatusUpdateEvent(task_id="task-acc", context_id="ctx-acc", status=completed_status)
+ mock_a2a_client.responses.append(StreamResponse(status_update=completed_event))
+
+ response = await a2a_agent.run("Hello")
+
+ # The accumulated WORKING content is flushed when terminal arrives empty
+ assert len(response.messages) == 1
+ assert response.messages[0].text == "Here is your answer from working state."
+
+
+async def test_non_streaming_intermediate_discarded_when_terminal_has_content(
+ a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient
+) -> None:
+ """Non-streaming: if terminal event has content, intermediate content is discarded."""
+ # Intermediate WORKING event
+ working_msg = A2AMessage(
+ message_id="msg-working",
+ role=A2ARole.ROLE_AGENT,
+ parts=[Part(text="Still thinking...")],
+ )
+ working_status = TaskStatus(state=TaskState.TASK_STATE_WORKING, message=working_msg)
+ working_event = TaskStatusUpdateEvent(task_id="task-wi", context_id="ctx-wi", status=working_status)
+ mock_a2a_client.responses.append(StreamResponse(status_update=working_event))
+
+ # Terminal COMPLETED event WITH content
+ completed_msg = A2AMessage(
+ message_id="msg-final",
+ role=A2ARole.ROLE_AGENT,
+ parts=[Part(text="Final answer")],
+ )
+ completed_status = TaskStatus(state=TaskState.TASK_STATE_COMPLETED, message=completed_msg)
+ completed_event = TaskStatusUpdateEvent(task_id="task-wi", context_id="ctx-wi", status=completed_status)
+ mock_a2a_client.responses.append(StreamResponse(status_update=completed_event))
+
+ response = await a2a_agent.run("Hello")
+
+ # Terminal content supersedes accumulated intermediates
+ assert len(response.messages) == 1
+ assert response.messages[0].text == "Final answer"
+
+
+async def test_non_streaming_artifact_update_surfaces_content(
+ a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient
+) -> None:
+ """Non-streaming run() should surface content from artifact_update events."""
+ artifact = Artifact(
+ artifact_id="art-ns",
+ parts=[Part(text="Artifact content")],
+ )
+ event = TaskArtifactUpdateEvent(task_id="task-anu", context_id="ctx-anu", artifact=artifact, append=False)
+ mock_a2a_client.responses.append(StreamResponse(artifact_update=event))
+
+ # Terminal task with the same artifact ID — should be deduped
+ mock_a2a_client.add_task_response("task-anu", [{"id": "art-ns", "content": "Artifact content"}])
+
+ response = await a2a_agent.run("Hello")
+
+ # Artifact update + terminal task with same artifact ID = content emitted once from
+ # the artifact_update, then the duplicate from the task is filtered by streamed_artifact_ids
+ assert len(response.messages) == 1
+ assert response.messages[0].text == "Artifact content"
+
+
# endregion
diff --git a/python/packages/core/agent_framework/_mcp.py b/python/packages/core/agent_framework/_mcp.py
index a7a3f1a796..35ccb1d58a 100644
--- a/python/packages/core/agent_framework/_mcp.py
+++ b/python/packages/core/agent_framework/_mcp.py
@@ -261,6 +261,7 @@ class MCPTool:
self.request_timeout = request_timeout
self.client = client
self._functions: list[FunctionTool] = []
+ self._tool_call_meta_by_name: dict[str, dict[str, Any]] = {}
self.is_connected: bool = False
self._tools_loaded: bool = False
self._prompts_loaded: bool = False
@@ -1026,6 +1027,7 @@ class MCPTool:
# Track existing function names to prevent duplicates
existing_names = {func.name for func in self._functions}
+ self._tool_call_meta_by_name.clear()
params: types.PaginatedRequestParams | None = None
while True:
@@ -1035,6 +1037,9 @@ class MCPTool:
tool_list = await self.session.list_tools(params=params) # type: ignore[union-attr]
for tool in tool_list.tools:
+ if tool.meta is not None:
+ self._tool_call_meta_by_name[tool.name] = dict(tool.meta)
+
normalized_name = _normalize_mcp_name(tool.name)
local_name = _build_prefixed_mcp_name(normalized_name, self.tool_name_prefix)
@@ -1185,14 +1190,15 @@ class MCPTool:
}
}
- # Inject OpenTelemetry trace context into MCP _meta for distributed tracing.
- otel_meta = _inject_otel_into_mcp_meta()
+ # Some MCP proxies require their tools/list metadata to be echoed on tools/call.
+ tool_meta = self._tool_call_meta_by_name.get(tool_name)
+ meta = _inject_otel_into_mcp_meta(dict(tool_meta) if tool_meta is not None else None)
parser = self.parse_tool_results or self._parse_tool_result_from_mcp
# Try the operation, reconnecting once if the connection is closed
for attempt in range(2):
try:
- result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=otel_meta) # type: ignore
+ result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=meta) # type: ignore
if result.isError:
parsed = parser(result)
text = (
diff --git a/python/packages/core/tests/core/test_mcp.py b/python/packages/core/tests/core/test_mcp.py
index cd3173a7d3..0fc5867d79 100644
--- a/python/packages/core/tests/core/test_mcp.py
+++ b/python/packages/core/tests/core/test_mcp.py
@@ -4194,6 +4194,57 @@ async def test_mcp_tool_call_tool_otel_meta(use_span, expect_traceparent, span_e
assert meta is None
+async def test_mcp_tool_call_tool_forwards_tool_list_meta():
+ """call_tool echoes per-tool metadata returned by tools/list."""
+ from opentelemetry import trace
+
+ tool_meta = {
+ "tool_configuration": {
+ "name": "WorkIQSharePoint.readSmallBinaryFile",
+ "type": "foundry_toolbox",
+ }
+ }
+
+ class TestServer(MCPTool):
+ async def connect(self):
+ self.session = Mock(spec=ClientSession)
+ self.session.list_tools = AsyncMock(
+ return_value=types.ListToolsResult(
+ tools=[
+ types.Tool(
+ name="WorkIQSharePoint.readSmallBinaryFile",
+ description="Read a binary file",
+ inputSchema={
+ "type": "object",
+ "properties": {"fileId": {"type": "string"}},
+ "required": ["fileId"],
+ },
+ _meta=tool_meta,
+ )
+ ]
+ )
+ )
+ self.session.call_tool = AsyncMock(
+ return_value=types.CallToolResult(content=[types.TextContent(type="text", text="result")])
+ )
+ self.session.list_prompts = AsyncMock(
+ return_value=types.ListPromptsResult(prompts=[])
+ )
+
+ def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
+ return None
+
+ server = TestServer(name="test_server")
+ async with server:
+ await server.load_tools()
+ await server.load_prompts()
+
+ with trace.use_span(trace.NonRecordingSpan(trace.INVALID_SPAN_CONTEXT)):
+ await server.call_tool("WorkIQSharePoint.readSmallBinaryFile", fileId="file-1")
+
+ assert server.session.call_tool.call_args.kwargs["meta"] == tool_meta
+
+
async def test_mcp_streamable_http_tool_hook_not_duplicated_on_repeated_get_mcp_client():
"""Test that calling get_mcp_client multiple times does not accumulate duplicate hooks."""
tool = MCPStreamableHTTPTool(
diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py
index 1645fcec2e..c34c65538c 100644
--- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py
+++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py
@@ -11,6 +11,7 @@ import tempfile
import threading
from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence
from contextlib import suppress
+from pathlib import Path
from typing import Protocol, cast
from agent_framework import (
@@ -205,6 +206,47 @@ class FileBasedFunctionApprovalStorage:
return await asyncio.to_thread(self._load_sync, approval_request_id)
+def _checkpoint_storage_for_context(root: str, context_id: str) -> FileCheckpointStorage:
+ """Build a ``FileCheckpointStorage`` for ``context_id`` rooted under ``root``.
+
+ ``context_id`` originates from caller-controlled fields such as
+ ``previous_response_id`` or from server-generated fields such as
+ ``conversation_id`` / ``response_id``. In every case it must be treated as
+ an untrusted single path segment: path separators, drive letters, parent
+ references and similar would otherwise let the resulting directory escape
+ the configured checkpoint root (CWE-22). The check resolves the joined
+ path and verifies it stays under the resolved root before any directory is
+ created on disk.
+ """
+ if not isinstance(context_id, str) or not context_id:
+ raise RuntimeError("Invalid checkpoint context id: must be a non-empty string.")
+ # Reject any segment that is not a single safe path component. This covers
+ # POSIX/Windows separators, NUL bytes, drive letters, and all-dot segments
+ # (``.``, ``..``, ``...``, ...). We deliberately do not URL-decode the id
+ # here: the hosting layer never decodes context ids before joining them, so
+ # forms such as ``%2e%2e`` are accepted as literal directory names. Do NOT
+ # add decoding here without re-validating after the decode -- decode-then-
+ # join is exactly the pattern that reintroduces traversal. We also do not
+ # attempt to "sanitize" by stripping characters because that can introduce
+ # collisions between distinct ids.
+ if (
+ "/" in context_id
+ or "\\" in context_id
+ or "\x00" in context_id
+ # All-dot segments (``.``, ``..``, ``...``, ...) reduce to "" after stripping dots.
+ or context_id.strip(".") == ""
+ or os.path.isabs(context_id)
+ or os.path.splitdrive(context_id)[0]
+ ):
+ raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}")
+
+ root_path = Path(root).resolve()
+ storage_path = (root_path / context_id).resolve()
+ if not storage_path.is_relative_to(root_path):
+ raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}")
+ return FileCheckpointStorage(storage_path)
+
+
class ResponsesHostServer(ResponsesAgentServerHost):
"""A responses server host for an agent."""
@@ -400,7 +442,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
latest_checkpoint_id: str | None = None
restore_storage: FileCheckpointStorage | None = None
if context_id is not None:
- restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id))
+ restore_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, context_id)
latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name)
if latest_checkpoint is not None:
latest_checkpoint_id = latest_checkpoint.checkpoint_id
@@ -414,7 +456,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
# supplied, restore_storage points at the *prior* response's
# directory and write_storage points at the *current* response's.
write_context_id = context.conversation_id or context.response_id
- write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id))
+ write_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id)
# Multi-turn pattern: when we have a prior checkpoint, restore it
# first (drive the workflow back to idle with prior state intact),
diff --git a/python/packages/foundry_hosting/tests/test_responses.py b/python/packages/foundry_hosting/tests/test_responses.py
index a0a6335651..e4d545d6d7 100644
--- a/python/packages/foundry_hosting/tests/test_responses.py
+++ b/python/packages/foundry_hosting/tests/test_responses.py
@@ -11,7 +11,7 @@ the registered _handle_create handler.
from __future__ import annotations
import json
-from collections.abc import AsyncIterator
+from collections.abc import AsyncIterator, Callable
from unittest.mock import AsyncMock, MagicMock
import httpx
@@ -20,6 +20,7 @@ from agent_framework import (
AgentResponse,
AgentResponseUpdate,
Content,
+ FileCheckpointStorage,
HistoryProvider,
Message,
RawAgent,
@@ -2652,3 +2653,241 @@ class TestFunctionApprovalRoundTrip:
# endregion
+
+
+# region Checkpoint context path validation
+
+
+class TestCheckpointContextPathValidation:
+ """Regression tests for the path-traversal hardening of checkpoint storage.
+
+ These tests guard against CWE-22 in the workflow hosting path. The hosting
+ code joins caller-supplied identifiers (``previous_response_id``) and
+ server-generated identifiers (``conversation_id`` / ``response_id``) under
+ the configured checkpoint root. Without validation, traversal segments
+ such as ``../../escape`` or absolute paths cause directory creation
+ outside the intended root.
+ """
+
+ @staticmethod
+ def _helper() -> Callable[[str, str], FileCheckpointStorage]:
+ from agent_framework_foundry_hosting._responses import ( # pyright: ignore[reportPrivateUsage]
+ _checkpoint_storage_for_context,
+ )
+
+ return _checkpoint_storage_for_context
+
+ def test_valid_segment_creates_storage_under_root(self, tmp_path: Any) -> None:
+ helper = self._helper()
+ root = tmp_path / "root"
+ root.mkdir()
+ storage = helper(str(root), "resp_abc123")
+ assert storage.storage_path.is_dir()
+ assert storage.storage_path.parent == root.resolve()
+
+ @pytest.mark.parametrize(
+ "bad_id",
+ [
+ # Original MSRC repro: traversal embedded inside an id-shaped value.
+ # The 14 ``A``s pad the suffix to mimic the exact length of the
+ # ``api-made-dir<14-char-suffix>`` segment from the original report.
+ "caresp_x/../../service-data/api-made-dir" + "A" * 14,
+ # Variant report repros.
+ "../../escape",
+ "..",
+ ".",
+ "...",
+ "/tmp/escape",
+ "/absolute/path",
+ "C:\\temp\\escape",
+ "..\\..\\escape",
+ "foo\\..\\bar",
+ "foo/bar",
+ "with\x00null",
+ "",
+ ],
+ )
+ def test_traversal_and_separator_payloads_are_rejected(self, tmp_path: Any, bad_id: str) -> None:
+ helper = self._helper()
+ # Use a dedicated root *inside* tmp_path so we can assert that nothing
+ # was created anywhere under tmp_path (root, siblings, or above).
+ # Asserting against tmp_path.parent would be flaky under parallel test
+ # execution because tmp_path.parent is shared across tests.
+ root = tmp_path / "root"
+ root.mkdir()
+ before = sorted(p.name for p in tmp_path.iterdir())
+ with pytest.raises(RuntimeError):
+ helper(str(root), bad_id)
+ # No sibling/escape directory should have been created next to the root.
+ after = sorted(p.name for p in tmp_path.iterdir())
+ assert before == after, f"Unexpected filesystem artifacts created for payload {bad_id!r}"
+ # And nothing inside the root either.
+ assert list(root.iterdir()) == []
+
+ def test_non_string_context_id_is_rejected(self, tmp_path: Any) -> None:
+ helper = self._helper()
+ with pytest.raises(RuntimeError):
+ helper(str(tmp_path), None) # type: ignore[arg-type]
+
+ def test_url_encoded_traversal_is_treated_as_literal_segment(self, tmp_path: Any) -> None:
+ """URL-encoded traversal should not decode to traversal at the filesystem layer.
+
+ The hosting layer never URL-decodes ids before using them; the helper
+ should accept ``%2e%2e`` as a single literal segment (no escape).
+ """
+ helper = self._helper()
+ root = tmp_path / "root"
+ root.mkdir()
+ storage = helper(str(root), "%2e%2e")
+ assert storage.storage_path.parent == root.resolve()
+ assert storage.storage_path.name == "%2e%2e"
+
+ @pytest.mark.parametrize(
+ "context_field,bad_id",
+ [
+ # Restore sink: caller-controlled previous_response_id.
+ ("previous_response_id", "../../escape"),
+ ("previous_response_id", "/tmp/escape-abs"),
+ ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14),
+ # Restore sink: server-issued conversation_id (defense in depth).
+ ("conversation_id", "../../escape"),
+ # Write sink: malicious response_id (defense in depth).
+ ("response_id", "../../escape"),
+ ],
+ )
+ async def test_handle_inner_workflow_rejects_malicious_context_id(
+ self, tmp_path: Any, context_field: str, bad_id: str
+ ) -> None:
+ """End-to-end: ``_handle_inner_workflow`` must reject malicious ids on
+ both the restore sink (``previous_response_id`` / ``conversation_id``)
+ and the write sink (``response_id``) without creating any directories.
+ """
+ from unittest.mock import patch
+
+ from agent_framework import WorkflowAgent
+ from azure.ai.agentserver.responses import ResponseContext
+ from azure.ai.agentserver.responses.models import CreateResponse
+
+ # Build a mock that satisfies isinstance(agent, WorkflowAgent) and the
+ # constructor's "no existing checkpointing" guard.
+ agent = MagicMock(spec=WorkflowAgent)
+ agent.id = "wf-agent"
+ agent.name = "wf"
+ agent.description = ""
+ agent.context_providers = []
+ agent.workflow = MagicMock()
+ agent.workflow.name = "wf"
+ agent.workflow._runner_context.has_checkpointing = MagicMock(return_value=False)
+
+ # Constructor inspects WorkflowAgent.workflow internals; bypass setup
+ # by feeding a configured mock through a normal init.
+ server = ResponsesHostServer(agent, store=InMemoryResponseProvider())
+ # Re-root checkpoint storage at our isolated tmp_path so we can detect
+ # any escape attempt on the filesystem.
+ root = tmp_path / "root"
+ root.mkdir()
+ server._checkpoint_storage_path = str(root) # pyright: ignore[reportPrivateUsage]
+
+ # Build a ResponseContext with the malicious id targeting the chosen sink.
+ kwargs: dict[str, Any] = {
+ "response_id": "resp_" + "a" * 48,
+ "mode_flags": MagicMock(),
+ }
+ if context_field == "previous_response_id":
+ request = CreateResponse(model="m", input="hi", previous_response_id=bad_id)
+ kwargs["previous_response_id"] = bad_id
+ elif context_field == "conversation_id":
+ request = CreateResponse(model="m", input="hi")
+ kwargs["conversation_id"] = bad_id
+ else: # response_id (write sink)
+ request = CreateResponse(model="m", input="hi")
+ kwargs["response_id"] = bad_id
+
+ # Avoid invoking the real input-resolution machinery, which would need
+ # a configured provider; we never reach the workflow run on rejection.
+ with patch.object(ResponseContext, "get_input_items", new=AsyncMock(return_value=[])):
+ context = ResponseContext(**kwargs)
+ before = sorted(p.name for p in tmp_path.iterdir())
+ with pytest.raises(RuntimeError, match="Invalid checkpoint context id"):
+ async for _ in server._handle_inner_workflow(request, context): # pyright: ignore[reportPrivateUsage]
+ pass
+ after = sorted(p.name for p in tmp_path.iterdir())
+
+ assert before == after, f"Unexpected filesystem artifacts created for {context_field}={bad_id!r}"
+ assert list(root.iterdir()) == [], f"Checkpoint dir created inside root for {context_field}={bad_id!r}"
+
+ @pytest.mark.parametrize(
+ "context_field,bad_id",
+ [
+ # Restore sink: caller-controlled previous_response_id. These are
+ # rejected by request validation (HTTP 400) before the checkpoint
+ # code is reached.
+ ("previous_response_id", "../../escape"),
+ ("previous_response_id", "/tmp/escape-abs"),
+ ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14),
+ # Restore sink: server-issued conversation id (defense in depth).
+ # Reaches the checkpoint code and is rejected there, surfacing as
+ # an HTTP 5xx without creating any filesystem artifacts.
+ ("conversation", "../../escape"),
+ ("conversation", "/tmp/escape-abs"),
+ ],
+ )
+ async def test_malicious_context_id_rejected_e2e(self, tmp_path: Any, context_field: str, bad_id: str) -> None:
+ """End-to-end (ASGI-in-process): malicious context ids must be rejected
+ through the full HTTP pipeline, and no checkpoint directory may be
+ created on disk for either the validation-layer rejection
+ (``previous_response_id``) or the deeper checkpoint-layer rejection
+ (``conversation``).
+
+ The ``response_id`` write-sink is server-generated and not reachable
+ via the public HTTP surface, so its defense-in-depth check is covered
+ by the helper-level test above.
+ """
+ from agent_framework import WorkflowAgent
+
+ # Build a mock that satisfies isinstance(agent, WorkflowAgent) and the
+ # constructor's "no existing checkpointing" guard.
+ agent = MagicMock(spec=WorkflowAgent)
+ agent.id = "wf-agent"
+ agent.name = "wf"
+ agent.description = ""
+ agent.context_providers = []
+ agent.workflow = MagicMock()
+ agent.workflow.name = "wf"
+ agent.workflow._runner_context.has_checkpointing = MagicMock( # pyright: ignore[reportPrivateUsage]
+ return_value=False
+ )
+
+ server = ResponsesHostServer(agent, store=InMemoryResponseProvider())
+ # Re-root checkpoint storage at our isolated tmp_path so we can detect
+ # any escape attempt on the filesystem.
+ root = tmp_path / "root"
+ root.mkdir()
+ server._checkpoint_storage_path = str(root) # pyright: ignore[reportPrivateUsage]
+
+ payload: dict[str, Any] = {"model": "m", "input": "hi"}
+ if context_field == "previous_response_id":
+ payload["previous_response_id"] = bad_id
+ else: # conversation
+ payload["conversation"] = bad_id
+
+ before = sorted(p.name for p in tmp_path.iterdir())
+ transport = httpx.ASGITransport(app=server)
+ async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
+ resp = await client.post("/responses", json=payload)
+ after = sorted(p.name for p in tmp_path.iterdir())
+
+ # The request must not succeed; either request validation rejects it
+ # (4xx) or the checkpoint layer raises and the server returns 5xx.
+ # Either way, no successful response may be produced.
+ assert resp.status_code >= 400, (
+ f"Expected non-2xx for {context_field}={bad_id!r}, got {resp.status_code}: {resp.text[:200]}"
+ )
+ assert before == after, (
+ f"Unexpected filesystem artifacts under tmp_path for {context_field}={bad_id!r}: "
+ f"before={before} after={after}"
+ )
+ assert list(root.iterdir()) == [], f"Checkpoint directory created inside root for {context_field}={bad_id!r}"
+
+
+# endregion
diff --git a/python/samples/04-hosting/a2a/a2a_server.py b/python/samples/04-hosting/a2a/a2a_server.py
index c299f1e71e..8eaf3a7363 100644
--- a/python/samples/04-hosting/a2a/a2a_server.py
+++ b/python/samples/04-hosting/a2a/a2a_server.py
@@ -103,7 +103,7 @@ def main() -> None:
app = Starlette(
routes=[
*create_agent_card_routes(agent_card),
- *create_jsonrpc_routes(request_handler),
+ *create_jsonrpc_routes(request_handler, "/"),
]
)
diff --git a/python/samples/04-hosting/a2a/agent_definitions.py b/python/samples/04-hosting/a2a/agent_definitions.py
index 32da16b9d2..79f1f1b1ee 100644
--- a/python/samples/04-hosting/a2a/agent_definitions.py
+++ b/python/samples/04-hosting/a2a/agent_definitions.py
@@ -8,16 +8,11 @@ AgentCards for the invoice, policy, and logistics agent types.
from __future__ import annotations
-from typing import TYPE_CHECKING
-
from a2a.types import AgentCapabilities, AgentCard, AgentInterface, AgentSkill
+from agent_framework import Agent
+from agent_framework.foundry import FoundryChatClient
from invoice_data import query_by_invoice_id, query_by_transaction_id, query_invoices
-if TYPE_CHECKING:
- from agent_framework import Agent
- from agent_framework.foundry import FoundryChatClient
-
-
# ---------------------------------------------------------------------------
# Agent instructions
# ---------------------------------------------------------------------------
diff --git a/python/samples/04-hosting/a2a/agent_executor.py b/python/samples/04-hosting/a2a/agent_executor.py
index f77f866511..3dcefff09f 100644
--- a/python/samples/04-hosting/a2a/agent_executor.py
+++ b/python/samples/04-hosting/a2a/agent_executor.py
@@ -10,18 +10,12 @@ published back through the a2a-sdk event queue.
from __future__ import annotations
import asyncio
-import uuid
from typing import TYPE_CHECKING
+from a2a.helpers import new_task_from_user_message
from a2a.server.agent_execution.agent_executor import AgentExecutor
-from a2a.types import (
- Message,
- Part,
- Role,
- TaskState,
- TaskStatus,
- TaskStatusUpdateEvent,
-)
+from a2a.server.tasks import TaskUpdater
+from a2a.types import Part, TaskState
if TYPE_CHECKING:
from a2a.server.agent_execution.context import RequestContext
@@ -47,17 +41,17 @@ class AgentFrameworkExecutor(AgentExecutor):
if not user_text:
user_text = "Hello"
- task_id = context.task_id or str(uuid.uuid4())
- context_id = context.context_id or str(uuid.uuid4())
+ # v1.0 requires a Task object in the queue before any TaskStatusUpdateEvent
+ task = context.current_task
+ if not task and context.message:
+ task = new_task_from_user_message(context.message)
+ await event_queue.enqueue_event(task)
+
+ task_id = task.id if task else context.task_id
+ updater = TaskUpdater(event_queue, task_id, context.context_id)
# Signal that the agent is working
- await event_queue.enqueue_event(
- TaskStatusUpdateEvent(
- task_id=task_id,
- context_id=context_id,
- status=TaskStatus(state=TaskState.TASK_STATE_WORKING),
- )
- )
+ await updater.start_work()
try:
response = await self.agent.run(user_text)
@@ -71,48 +65,19 @@ class AgentFrameworkExecutor(AgentExecutor):
if not response_parts:
response_parts.append(Part(text=str(response)))
- # Publish the agent's response as a completed message
- await event_queue.enqueue_event(
- TaskStatusUpdateEvent(
- task_id=task_id,
- context_id=context_id,
- status=TaskStatus(
- state=TaskState.TASK_STATE_COMPLETED,
- message=Message(
- message_id=str(uuid.uuid4()),
- role=Role.ROLE_AGENT,
- parts=response_parts,
- ),
- ),
- )
+ # Publish the agent's response and mark as completed
+ await updater.complete(
+ message=updater.new_agent_message(response_parts),
)
except asyncio.CancelledError:
raise
except Exception as e:
- await event_queue.enqueue_event(
- TaskStatusUpdateEvent(
- task_id=task_id,
- context_id=context_id,
- status=TaskStatus(
- state=TaskState.TASK_STATE_FAILED,
- message=Message(
- message_id=str(uuid.uuid4()),
- role=Role.ROLE_AGENT,
- parts=[Part(text=f"Agent error: {e}")],
- ),
- ),
- )
+ await updater.update_status(
+ state=TaskState.TASK_STATE_FAILED,
+ message=updater.new_agent_message([Part(text=f"Agent error: {e}")]),
)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Handle cancellation by publishing a canceled status."""
- task_id = context.task_id or str(uuid.uuid4())
- context_id = context.context_id or str(uuid.uuid4())
-
- await event_queue.enqueue_event(
- TaskStatusUpdateEvent(
- task_id=task_id,
- context_id=context_id,
- status=TaskStatus(state=TaskState.TASK_STATE_CANCELED),
- )
- )
+ updater = TaskUpdater(event_queue, context.task_id, context.context_id)
+ await updater.update_status(state=TaskState.TASK_STATE_CANCELED)
diff --git a/python/samples/04-hosting/a2a/agent_framework_to_a2a.py b/python/samples/04-hosting/a2a/agent_framework_to_a2a.py
index 519c907fe5..c29f18ed6d 100644
--- a/python/samples/04-hosting/a2a/agent_framework_to_a2a.py
+++ b/python/samples/04-hosting/a2a/agent_framework_to_a2a.py
@@ -65,7 +65,7 @@ if __name__ == "__main__":
server = Starlette(
routes=[
*create_agent_card_routes(public_agent_card),
- *create_jsonrpc_routes(request_handler),
+ *create_jsonrpc_routes(request_handler, "/"),
]
)