From 27671974c2da82ab16706b4006b314d83842e5df Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 14 May 2026 15:17:26 -0400 Subject: [PATCH 1/5] .NET: Re-enable ObservabilityTests and WorkflowRunActivityStopTests (#5837) Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/220699b9-7f9e-4d5d-87d0-fb621d169d84 Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> Co-authored-by: Jacob Alber --- .../ObservabilityTests.cs | 32 +++++++++---------- .../WorkflowRunActivityStopTests.cs | 12 +++---- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index 36c43076ed..af8a9d8e0d 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -133,31 +133,31 @@ public sealed class ObservabilityTests : IDisposable activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowCompleted, "activity should have workflow completed event"); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_DefaultAsync() { await this.TestWorkflowEndToEndActivitiesAsync("Default"); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_OffThreadAsync() { await this.TestWorkflowEndToEndActivitiesAsync("OffThread"); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_ConcurrentAsync() { await this.TestWorkflowEndToEndActivitiesAsync("Concurrent"); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task CreatesWorkflowEndToEndActivities_WithCorrectName_LockstepAsync() { await this.TestWorkflowEndToEndActivitiesAsync("Lockstep"); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task CreatesWorkflowActivities_WithCorrectNameAsync() { // Arrange @@ -182,7 +182,7 @@ public sealed class ObservabilityTests : IDisposable tags.Should().ContainKey(Tags.WorkflowDefinition); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task TelemetryDisabledByDefault_CreatesNoActivitiesAsync() { // Arrange @@ -200,7 +200,7 @@ public sealed class ObservabilityTests : IDisposable capturedActivities.Should().BeEmpty("No activities should be created when telemetry is disabled (default)."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task WithOpenTelemetry_UsesProvidedActivitySourceAsync() { // Arrange @@ -235,7 +235,7 @@ public sealed class ObservabilityTests : IDisposable "All activities should come from the user-provided ActivitySource."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task DisableWorkflowBuild_PreventsWorkflowBuildActivityAsync() { // Arrange @@ -255,7 +255,7 @@ public sealed class ObservabilityTests : IDisposable "WorkflowBuild activity should be disabled."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync() { // Arrange @@ -285,7 +285,7 @@ public sealed class ObservabilityTests : IDisposable "Other activities should still be created."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync() { // Arrange @@ -312,7 +312,7 @@ public sealed class ObservabilityTests : IDisposable "Other activities should still be created."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task DisableEdgeGroupProcess_PreventsEdgeGroupProcessActivityAsync() { // Arrange @@ -333,7 +333,7 @@ public sealed class ObservabilityTests : IDisposable "Other activities should still be created."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task DisableMessageSend_PreventsMessageSendActivityAsync() { // Arrange @@ -382,7 +382,7 @@ public sealed class ObservabilityTests : IDisposable return builder.WithOpenTelemetry(configure: opts => opts.DisableMessageSend = true).Build(); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task EnableSensitiveData_LogsExecutorInputAndOutputAsync() { // Arrange @@ -413,7 +413,7 @@ public sealed class ObservabilityTests : IDisposable tags[Tags.ExecutorOutput].Should().Contain("HELLO", "Output should contain the transformed value."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task EnableSensitiveData_Disabled_DoesNotLogInputOutputAsync() { // Arrange @@ -442,7 +442,7 @@ public sealed class ObservabilityTests : IDisposable tags.Should().NotContainKey(Tags.ExecutorOutput, "Output should NOT be logged when EnableSensitiveData is false."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task EnableSensitiveData_LogsMessageSendContentAsync() { // Arrange @@ -474,7 +474,7 @@ public sealed class ObservabilityTests : IDisposable tags.Should().ContainKey(Tags.MessageSourceId, "Source ID should be logged."); } - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task EnableSensitiveData_Disabled_DoesNotLogMessageContentAsync() { // Arrange diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs index 112961c609..f35910f26b 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs @@ -67,7 +67,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable /// Bug: The Activity created by LockstepRunEventStream.TakeEventStreamAsync is never /// disposed because yield break in async iterators does not trigger using disposal. /// - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task WorkflowRunActivity_IsStopped_LockstepAsync() { // Arrange @@ -111,7 +111,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable /// Verifies that the workflow_invoke Activity is stopped when using the OffThread (Default) /// execution environment (StreamingRunEventStream). /// - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task WorkflowRunActivity_IsStopped_OffThreadAsync() { // Arrange @@ -156,7 +156,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable /// (StreamingRun.WatchStreamAsync) with the OffThread execution environment. /// This matches the exact usage pattern described in the issue. /// - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync() { // Arrange @@ -203,7 +203,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable /// streaming invocation, even when using the same workflow in a multi-turn pattern, /// and that each session gets its own session activity. /// - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsync() { // Arrange @@ -264,7 +264,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable /// Verifies that all started activities (not just workflow_invoke) are properly stopped. /// This ensures no spans are "leaked" without being exported. /// - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task AllActivities_AreStopped_AfterWorkflowCompletionAsync() { // Arrange @@ -305,7 +305,7 @@ public sealed class WorkflowRunActivityStopTests : IDisposable /// be parented under the workflow session span. The run activity should /// still nest correctly under the session. /// - [Fact(Skip = "Flaky test - temporarily disabled.")] + [Fact] public async Task Lockstep_SessionActivity_DoesNotLeak_IntoCaller_ActivityCurrentAsync() { // Arrange From 2ef20cd0aaf0ba7312a812fa517e31ecd4eda94e Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 14 May 2026 15:53:07 -0400 Subject: [PATCH 2/5] .NET: Add Magentic E2E workflow coverage (#5833) * Add E2E test plan for Magentic orchestrator Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/96d76349-1ffd-482b-a3ee-ed208778b1bb Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add MagenticOrchestrationTests.cs scaffold for Magentic E2E tests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/44a4fd8a-3828-40e5-9435-90381aeffdb8 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Fix MagenticOrchestrator output declaration and add first E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/322c9e2d-59bc-42ad-9a1e-f6fd4c866b26 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add plan review test and event emission tests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/322c9e2d-59bc-42ad-9a1e-f6fd4c866b26 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add next speaker validation test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/322c9e2d-59bc-42ad-9a1e-f6fd4c866b26 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add Magentic E2E implementation review Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/b2c60ce7-4d05-4a0d-b05d-d4284f5b7bb3 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add PlanSignoff_Disabled_Proceeds_Immediately E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/6e8bca46-448d-4f21-a7e9-240179571970 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add NextSpeaker_Empty_Falls_Back_To_First E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/6e8bca46-448d-4f21-a7e9-240179571970 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add Task_Completes_After_Multiple_Rounds E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/6e8bca46-448d-4f21-a7e9-240179571970 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add PlanReview_Revised_Triggers_Replan E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/6e8bca46-448d-4f21-a7e9-240179571970 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add MaxRoundLimit_Terminates_Workflow E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/6e8bca46-448d-4f21-a7e9-240179571970 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add MaxStallCount_Triggers_Reset E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/6e8bca46-448d-4f21-a7e9-240179571970 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Update MagenticE2E_ImplementationReview.md with full coverage status Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/6e8bca46-448d-4f21-a7e9-240179571970 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Rewrite Magentic E2E implementation review Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/1f878ef4-61b0-410a-a8bc-ebf618b3e5de Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add MaxResetLimit_Terminates_Workflow E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/aba19507-7c7e-40dd-850d-d1fabb5dfa65 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add PlanReview_On_Stall_Replan E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/aba19507-7c7e-40dd-850d-d1fabb5dfa65 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add Instruction_Message_Sent_When_Present E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/aba19507-7c7e-40dd-850d-d1fabb5dfa65 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Update ImplementationReview.md to reflect 14 tests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/aba19507-7c7e-40dd-850d-d1fabb5dfa65 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Rewrite Magentic E2E implementation review Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/6fe88a80-2e05-40d5-9539-ca7c59b9022b Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add ProgressLedger_Retry_On_Parse_Failure E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/125f6628-6b3b-4c51-9a51-ae84baece6bb Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add ProgressLedger_Max_Retries_Triggers_Reset E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/125f6628-6b3b-4c51-9a51-ae84baece6bb Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add Stall_NoProgress_Increments_StallCount E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/125f6628-6b3b-4c51-9a51-ae84baece6bb Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add PlanReview_Multiple_Revisions E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/125f6628-6b3b-4c51-9a51-ae84baece6bb Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Update ImplementationReview.md to reflect 18 tests and new coverage Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/125f6628-6b3b-4c51-9a51-ae84baece6bb Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Rewrite Magentic E2E implementation review Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/21f3b1ae-183e-4fea-99ad-14efc19f084d Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Preserve IsStalled on stall-triggered plan review requests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/1b9e74e8-69e1-43f2-8467-c5ba963c2622 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Rename isStalled parameter to replanAfterStall for clarity Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/1b9e74e8-69e1-43f2-8467-c5ba963c2622 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add Task_Delegates_To_Correct_Agent E2E test with multi-participant routing assertion Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/9b34e409-61b8-4650-ae55-34efad034ed0 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add Progress_Made_Decrements_StallCount E2E test verifying stall count decrement avoids reset Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/9b34e409-61b8-4650-ae55-34efad034ed0 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add Consecutive_Stalls_Trigger_Reset E2E test for multi-stall threshold reset Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/9b34e409-61b8-4650-ae55-34efad034ed0 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Magentic E2E: preserve IsStalled on stall-triggered plan reviews, add routing/stall tests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/9b34e409-61b8-4650-ae55-34efad034ed0 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Fix replan-on-every-turn: skip plan on agent return; align StallCount to > (match Python) Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/43e46b0d-4263-4353-856a-c3730abb1734 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Update implementation review doc for replan-fix and stall threshold alignment Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/43e46b0d-4263-4353-856a-c3730abb1734 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Rewrite Magentic E2E implementation review Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/3d15763b-3a68-488e-9412-3fa280e083c0 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Update stall docs to use > semantics, skip checkpoint-state tests, simplify NextSpeaker fallback test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/cc9ea5a8-84d8-4b6d-bb60-ac9619824d81 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Rewrite Magentic implementation review Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/ed87670a-bf4d-4ba5-a2f3-395a2eead9de Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add empty-team validation to MagenticWorkflowBuilder.Build() and E2E test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/e490fdf7-f107-4fde-ba1f-efdfd9a729c6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add IsTerminated guard to TakeTurnAsync and post-termination rejection test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/e490fdf7-f107-4fde-ba1f-efdfd9a729c6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Rewrite ImplementationReview.md with final 23-test status Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/e490fdf7-f107-4fde-ba1f-efdfd9a729c6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Add PR description markdown Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/df9b4579-10c3-4bfb-927e-da3a0e70009e Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Remove temporary markdown files Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/b3e67553-a3a3-4282-98f2-afd8ad7a6b5d Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Fix IDE1006: add Async suffix to async test methods in MagenticOrchestrationTests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/629fcc07-865e-4832-9e59-ea13df561c5a Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Update error messages per review comments in MagenticOrchestrator and MagenticWorkflowBuilder Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/053e5ded-81e3-4e56-acf1-2a8a939a04b0 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Escape JSON string values in CreateProgressLedgerResponse test helper Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/ec610c61-0a14-44e2-82fd-1cf35e85d6cc Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> Co-authored-by: Jacob Alber --- .../MagenticWorkflowBuilder.cs | 11 +- .../Magentic/MagenticOrchestrator.cs | 33 +- .../Magentic/MagenticTaskContext.cs | 2 +- .../MagenticOrchestrationTests.cs | 1407 +++++++++++++++++ 4 files changed, 1442 insertions(+), 11 deletions(-) create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticOrchestrationTests.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs index 7c1e801ab1..4470c4ee9a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; @@ -140,7 +141,15 @@ public class MagenticWorkflowBuilder(AIAgent managerAgent) } /// - public Workflow Build() => this.ReduceToWorkflowBuilder().Build(); + public Workflow Build() + { + if (this._team.Count == 0) + { + throw new InvalidOperationException("At least one participant must be added via AddParticipants() before building the workflow."); + } + + return this.ReduceToWorkflowBuilder().Build(); + } private TaskLimits Limits => new( MaxRoundCount: this._maxRounds, diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticOrchestrator.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticOrchestrator.cs index a08e0b542e..30a93c6850 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticOrchestrator.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticOrchestrator.cs @@ -101,6 +101,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta return base.ConfigureProtocol(protocolBuilder) .SendsMessage() .SendsMessage() + .YieldsOutput>() .ConfigureRoutes(ConfigureRoutes); void ConfigureRoutes(RouteBuilder routeBuilder) => routeBuilder.AddPortHandler( @@ -109,7 +110,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta out this._planReviewPort); } - private ValueTask SubmitPlanReviewRequestAsync(MagenticTaskContext taskContext, IWorkflowContext workflowContext) + private ValueTask SubmitPlanReviewRequestAsync(MagenticTaskContext taskContext, IWorkflowContext workflowContext, bool replanAfterStall = false) { MagenticProgressLedger? progressLedger = taskContext.ProgressLedger; if (progressLedger?.IsStarted is not true) @@ -117,7 +118,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta progressLedger = null; } - MagenticPlanReviewRequest request = new(taskContext.TaskLedger!.CurrentPlan, progressLedger, taskContext.IsStalled); + MagenticPlanReviewRequest request = new(taskContext.TaskLedger!.CurrentPlan, progressLedger, replanAfterStall); return this._planReviewPort!.PostRequestAsync(request); } @@ -146,7 +147,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta if (this._taskContext.IsTerminated) { - throw new InvalidOperationException("Magentic Orchestration has already been terminated and cannot process new messages. Please start a new session."); + throw new InvalidOperationException("This Magentic orchestration has already terminated. To process new messages, create a new workflow instance."); } if (response.IsApproved) @@ -161,7 +162,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta } } - private async ValueTask UpdatePlanAndDelegateAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken) + private async ValueTask UpdatePlanAndDelegateAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken, bool replanAfterStall = false) { bool isReplan = taskContext.TaskLedger != null; @@ -177,7 +178,7 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta if (requirePlanSignoff) { - await this.SubmitPlanReviewRequestAsync(taskContext, context).ConfigureAwait(false); + await this.SubmitPlanReviewRequestAsync(taskContext, context, replanAfterStall).ConfigureAwait(false); } else { @@ -187,9 +188,22 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta protected override async ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default) { - // First Turn: Initialize the task context and send the initial messages to the planner agent - this._taskContext ??= new(messages, team, limits, emitEvents, []); - await this.UpdatePlanAndDelegateAsync(this._taskContext, context, cancellationToken).ConfigureAwait(false); + if (this._taskContext?.IsTerminated == true) + { + throw new InvalidOperationException("This Magentic orchestration has already terminated. To process new messages, create a new workflow instance."); + } + + if (this._taskContext == null) + { + // First Turn: Initialize the task context and create the initial plan + this._taskContext = new(messages, team, limits, emitEvents, []); + await this.UpdatePlanAndDelegateAsync(this._taskContext, context, cancellationToken).ConfigureAwait(false); + } + else + { + // Subsequent turns: agent returned control, go directly to coordination (progress ledger only, no replan) + await this.RunCoordinationRoundAsync(this._taskContext, context, cancellationToken).ConfigureAwait(false); + } } private ChatMessage? _fullTaskLedgerMessage; @@ -288,10 +302,11 @@ internal class MagenticOrchestrator(AIAgent managerAgent, List team, Ta private async ValueTask ResetAndReplanAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken) { + bool wasStalled = taskContext.IsStalled; taskContext.Reset(); await context.SendMessageAsync(new ResetChatSignal(), cancellationToken: cancellationToken).ConfigureAwait(false); - await this.UpdatePlanAndDelegateAsync(taskContext, context, cancellationToken).ConfigureAwait(false); + await this.UpdatePlanAndDelegateAsync(taskContext, context, cancellationToken, replanAfterStall: wasStalled).ConfigureAwait(false); } private async ValueTask PrepareFinalAnswerAsync(MagenticTaskContext taskContext, IWorkflowContext context, CancellationToken cancellationToken) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticTaskContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticTaskContext.cs index 0db289126e..36b3a070e2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticTaskContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/Magentic/MagenticTaskContext.cs @@ -58,7 +58,7 @@ internal class MagenticTaskContext(List taskDefinition, List this.TaskCounters.StallCount >= this.TaskLimits.MaxStallCount; + public bool IsStalled => this.TaskCounters.StallCount > this.TaskLimits.MaxStallCount; public (bool HitRoundLimit, bool HitResetLimit) CheckLimits() { diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticOrchestrationTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticOrchestrationTests.cs new file mode 100644 index 0000000000..937e047886 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticOrchestrationTests.cs @@ -0,0 +1,1407 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Agents.AI.Workflows.InProc; +using Microsoft.Agents.AI.Workflows.Specialized.Magentic; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +/// +/// End-to-end tests for the Magentic orchestrator workflow. +/// +public class MagenticOrchestrationTests +{ + [Fact] + public async Task Task_Completes_When_RequestSatisfiedAsync() + { + // Arrange: Manager reports task satisfied on first coordination round + // Each response must have unique message IDs, so create separate instances + List factsResponse = CreatePlanResponse("Facts about the task"); + List planResponse = CreatePlanResponse("Step 1: Do the task"); + List progressLedgerResponse = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Complete the task"); + List finalAnswerResponse = CreateFinalAnswerResponse("Task completed successfully!"); + + TestReplayAgent manager = new( + [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, "Do the task")]); + + // Assert: Check the result contains the final answer + runResult.Result.Should().NotBeNull(); + runResult.Result.Should().ContainSingle(); + runResult.Result![0].Text.Should().Contain("Task completed successfully!"); + runResult.PendingRequests.Should().BeEmpty(); + } + + [Fact] + public async Task PlanReview_Approved_ProceedsAsync() + { + // Arrange: Human approves initial plan + List factsResponse = CreatePlanResponse("Facts about executing the plan"); + List planResponse = CreatePlanResponse("Step 1: Execute the plan"); + List progressLedgerResponse = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Execute"); + List finalAnswerResponse = CreateFinalAnswerResponse("Plan executed successfully"); + + TestReplayAgent manager = new( + [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(true) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + + // Act: First run - should pause for plan review + WorkflowRunResult firstResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Execute plan")], + checkpointManager: checkpointManager); + + firstResult.PendingRequests.Should().ContainSingle(); + ExternalRequest request = firstResult.PendingRequests[0].Request; + MagenticPlanReviewRequest? reviewRequest = request.Data.As(); + reviewRequest.Should().NotBeNull(); + reviewRequest!.Plan.Text.Should().Contain("Execute the plan"); + + // Act: Resume with approval + MagenticPlanReviewResponse approval = reviewRequest.Approve(); + ExternalResponse response = request.CreateResponse(approval); + WorkflowRunResult secondResult = await ResumeMagenticWorkflowAsync( + workflow, + response, + checkpointManager, + firstResult.LastCheckpoint); + + // Assert + secondResult.Result.Should().NotBeNull(); + secondResult.Result![0].Text.Should().Contain("Plan executed successfully"); + } + + [Fact] + public async Task Initial_Plan_Emits_PlanCreatedEventAsync() + { + // Arrange + List factsResponse = CreatePlanResponse("Facts about the task"); + List planResponse = CreatePlanResponse("Step 1: Initial plan"); + List progressLedgerResponse = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Execute"); + List finalAnswerResponse = CreateFinalAnswerResponse("Done"); + + TestReplayAgent manager = new( + [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")], + eventCollector: collectedEvents); + + // Assert + collectedEvents.OfType().Should().NotBeEmpty(); + MagenticPlanCreatedEvent planEvent = collectedEvents.OfType().First(); + planEvent.FullTaskLedger.Should().NotBeNull(); + } + + [Fact] + public async Task NextSpeaker_Invalid_Triggers_FinalAnswerAsync() + { + // Arrange: ProgressLedger returns invalid next_speaker + List factsResponse = CreatePlanResponse("Facts about the task"); + List planResponse = CreatePlanResponse("Step 1: Execute"); + List invalidNextSpeakerLedger = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "NonExistentAgent", // Invalid - doesn't match any team member + instructionOrQuestion: "Continue"); + List finalAnswer = CreateFinalAnswerResponse("Forced to conclude due to invalid speaker"); + + TestReplayAgent manager = new( + [factsResponse, planResponse, invalidNextSpeakerLedger, finalAnswer], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")], + eventCollector: collectedEvents); + + // Assert: Warning should be emitted and final answer prepared + collectedEvents.OfType() + .Should().Contain(e => e.Data != null && e.Data.ToString()!.Contains("Invalid next speaker")); + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Forced to conclude"); + } + + [Fact] + public async Task ProgressLedger_Updated_Event_EmittedAsync() + { + // Arrange + List factsResponse = CreatePlanResponse("Facts about the task"); + List planResponse = CreatePlanResponse("Step 1: Execute"); + List progressLedgerResponse = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Execute"); + List finalAnswerResponse = CreateFinalAnswerResponse("Done"); + + TestReplayAgent manager = new( + [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")], + eventCollector: collectedEvents); + + // Assert + collectedEvents.OfType().Should().NotBeEmpty(); + MagenticProgressLedgerUpdatedEvent ledgerEvent = collectedEvents.OfType().First(); + ledgerEvent.ProgressLedger.Should().NotBeNull(); + ledgerEvent.ProgressLedger.IsRequestSatisfied.Should().BeTrue(); + } + + [Fact] + public async Task PlanSignoff_Disabled_Proceeds_ImmediatelyAsync() + { + // Arrange: requirePlanSignoff=false should mean no plan review request + List factsResponse = CreatePlanResponse("Task facts"); + List planResponse = CreatePlanResponse("Step 1: Execute immediately"); + List progressLedgerResponse = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Go"); + List finalAnswerResponse = CreateFinalAnswerResponse("Immediate completion"); + + TestReplayAgent manager = new( + [factsResponse, planResponse, progressLedgerResponse, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do it now")], + eventCollector: collectedEvents); + + // Assert: No plan review request, workflow completes immediately + runResult.PendingRequests.Should().BeEmpty("plan signoff is disabled, so no review should be requested"); + collectedEvents.OfType().Should().BeEmpty(); + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Immediate completion"); + } + + [Fact] + public async Task NextSpeaker_Empty_Falls_Back_To_FirstAsync() + { + // Arrange: First progress ledger returns empty next_speaker, which should fall back to first participant. + // Round 1: empty speaker → fallback to Worker (first participant) → Worker echoes + // Round 2 (after Worker responds): RunCoordinationRoundAsync → satisfied ledger → final answer + // Note: No replan on agent return — only progress ledger is created on subsequent turns. + List factsResponse1 = CreatePlanResponse("Facts about the task"); + List planResponse1 = CreatePlanResponse("Step 1: Execute"); + List emptyNextSpeakerLedger = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "", // Empty - should fall back to first participant + instructionOrQuestion: "Please help with this task"); + + // Round 2: satisfied ledger + final answer (no replan on normal agent return) + List satisfiedLedger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Task completed after fallback"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, emptyNextSpeakerLedger, + satisfiedLedger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do the task")], + eventCollector: collectedEvents); + + // Assert: Warning about empty next speaker should be emitted + collectedEvents.OfType() + .Should().Contain(e => e.Data != null && e.Data.ToString()!.Contains("empty")); + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Task completed after fallback"); + } + + [Fact] + public async Task Task_Completes_After_Multiple_RoundsAsync() + { + // Arrange: Round 1 delegates to Worker (not satisfied), round 2 completes + // Manager turn sequence: facts1, plan1, ledger1(not satisfied), ledger2(satisfied), finalAnswer + // Note: No replan on agent return — only progress ledger is created on subsequent turns. + List factsResponse1 = CreatePlanResponse("Initial facts"); + List planResponse1 = CreatePlanResponse("Step 1: Delegate to worker"); + List round1Ledger = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Please work on the task"); + + List round2Ledger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Task is done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Multi-round task completed!"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, round1Ledger, + round2Ledger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Complex multi-round task")], + eventCollector: collectedEvents); + + // Assert: One plan created, one progress ledger per round, final answer + collectedEvents.OfType().Should().HaveCount(2); + collectedEvents.OfType().Should().ContainSingle("only one initial plan, no replan on agent return"); + collectedEvents.OfType().Should().BeEmpty("no replan occurs on normal agent return"); + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Multi-round task completed!"); + } + + [Fact] + public async Task PlanReview_Revised_Triggers_ReplanAsync() + { + // Arrange: Human rejects initial plan with revision, triggering a replan. + // Flow: facts1, plan1 → PlanCreatedEvent → plan review (pending) + // resume with revision → facts2, plan2 → MagenticReplannedEvent → plan review again (pending) + // resume with approval → progressLedger(satisfied) → finalAnswer + List factsResponse1 = CreatePlanResponse("Initial facts"); + List planResponse1 = CreatePlanResponse("Initial plan - needs revision"); + List factsResponse2 = CreatePlanResponse("Revised facts"); + List planResponse2 = CreatePlanResponse("Revised plan - much better"); + List progressLedgerResponse = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Execute revised plan"); + List finalAnswerResponse = CreateFinalAnswerResponse("Revised plan executed successfully"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, factsResponse2, planResponse2, progressLedgerResponse, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(true) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + List allEvents = []; + + // Act 1: First run - should pause for plan review with initial plan + WorkflowRunResult firstResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Execute task")], + checkpointManager: checkpointManager, + eventCollector: allEvents); + + firstResult.PendingRequests.Should().ContainSingle(); + ExternalRequest request1 = firstResult.PendingRequests[0].Request; + MagenticPlanReviewRequest? reviewRequest1 = request1.Data.As(); + reviewRequest1.Should().NotBeNull(); + reviewRequest1!.Plan.Text.Should().Contain("Initial plan"); + + // Act 2: Resume with revision (reject the plan) + MagenticPlanReviewResponse revision = reviewRequest1.Revise("Please include more detail"); + ExternalResponse revisionResponse = request1.CreateResponse(revision); + WorkflowRunResult secondResult = await ResumeMagenticWorkflowAsync( + workflow, + revisionResponse, + checkpointManager, + firstResult.LastCheckpoint, + eventCollector: allEvents); + + // Should pause again for review of the revised plan (stream may include prior request too) + secondResult.PendingRequests.Should().NotBeEmpty(); + ExternalRequest request2 = secondResult.PendingRequests[^1].Request; + MagenticPlanReviewRequest? reviewRequest2 = request2.Data.As(); + reviewRequest2.Should().NotBeNull(); + reviewRequest2!.Plan.Text.Should().Contain("Revised plan"); + + // Act 3: Resume with approval + MagenticPlanReviewResponse approval = reviewRequest2.Approve(); + ExternalResponse approvalResponse = request2.CreateResponse(approval); + WorkflowRunResult thirdResult = await ResumeMagenticWorkflowAsync( + workflow, + approvalResponse, + checkpointManager, + secondResult.LastCheckpoint, + eventCollector: allEvents); + + // Assert: MagenticReplannedEvent should have been emitted, and final answer produced + allEvents.OfType().Should().NotBeEmpty("initial plan emits PlanCreatedEvent"); + allEvents.OfType().Should().NotBeEmpty("revision triggers ReplannedEvent"); + thirdResult.Result.Should().NotBeNull(); + thirdResult.Result![0].Text.Should().Contain("Revised plan executed successfully"); + } + + [Fact] + public async Task MaxRoundLimit_Terminates_WorkflowAsync() + { + // Arrange: MaxRounds=1, so round 1 delegates to Worker, round 2 hits limit and terminates. + // Manager turns: facts1, plan1, ledger1(not satisfied→delegates), then limit hit before ledger. + List factsResponse1 = CreatePlanResponse("Facts"); + List planResponse1 = CreatePlanResponse("Plan"); + List round1Ledger = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Work on it"); + + // No more turns needed: RunCoordinationRoundAsync hits round limit before calling UpdateProgressLedgerAsync + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, round1Ledger], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .WithMaxRounds(1) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")]); + + // Assert: Workflow terminates with round limit message + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("maximum round count limit"); + } + + [Fact] + public async Task MaxStallCount_Triggers_ResetAsync() + { + // Arrange: MaxStallCount=0, so one stall (isInLoop=true, StallCount=1 > 0) triggers ResetAndReplanAsync. + // Flow: facts1, plan1 → round1 ledger(stall: isInLoop=true) → StallCount=1 → IsStalled → Reset + // → facts2, plan2 (replan) → round2 ledger(satisfied) → finalAnswer + List factsResponse1 = CreatePlanResponse("Initial facts"); + List planResponse1 = CreatePlanResponse("Initial plan"); + List stalledLedger = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: true, // This triggers stall + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Keep trying"); + + // After reset: ResetAndReplanAsync → UpdatePlanAndDelegateAsync → new plan + List factsResponse2 = CreatePlanResponse("Fresh facts after reset"); + List planResponse2 = CreatePlanResponse("Fresh plan after reset"); + List satisfiedLedger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after stall reset"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, stalledLedger, + factsResponse2, planResponse2, satisfiedLedger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .WithMaxStalls(0) // One stall triggers reset (StallCount 1 > 0) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")], + eventCollector: collectedEvents); + + // Assert: MagenticReplannedEvent should be emitted (reset triggers replan), final answer produced + collectedEvents.OfType().Should().NotBeEmpty("initial plan created"); + collectedEvents.OfType().Should().NotBeEmpty("stall triggers reset and replan"); + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Recovered after stall reset"); + } + + [Fact] + public async Task Instruction_Message_Sent_When_PresentAsync() + { + // Arrange: Progress ledger has a non-empty instruction_or_question. + // The orchestrator should send the instruction as a ChatMessage before delegating to the next agent. + // After Worker echoes, the second round completes. + List factsResponse1 = CreatePlanResponse("Facts about the task"); + List planResponse1 = CreatePlanResponse("Step 1: Instruct the worker"); + List ledgerWithInstruction = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Please analyze the data carefully"); + + // Round 2 after Worker responds (no replan, just progress ledger) + List satisfiedLedger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Task completed with instruction"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, ledgerWithInstruction, + satisfiedLedger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Analyze data")], + eventCollector: collectedEvents); + + // Assert: The workflow completed successfully, proving the instruction path executed without error. + // The update text should contain the instruction text since it is sent to participants as a ChatMessage. + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Task completed with instruction"); + // Verify the delegation happened (two progress ledger events for two rounds) + collectedEvents.OfType().Should().HaveCount(2); + } + + [Fact] + public async Task PlanReview_On_Stall_ReplanAsync() + { + // Arrange: Plan signoff enabled, stall triggers reset, replan requires new plan review. + // Flow: facts1, plan1 → PlanCreatedEvent → plan review (pending) + // resume with approval → ledger1(stall: isInLoop=true) → StallCount=1 → IsStalled → Reset + // → facts2, plan2 → MagenticReplannedEvent → plan review again (pending) + // resume with approval → ledger2(satisfied) → finalAnswer + List factsResponse1 = CreatePlanResponse("Initial facts"); + List planResponse1 = CreatePlanResponse("Initial plan"); + List stalledLedger = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: true, // This triggers stall + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Keep trying"); + + // After reset: new plan + List factsResponse2 = CreatePlanResponse("Fresh facts after stall reset"); + List planResponse2 = CreatePlanResponse("Fresh plan after stall reset"); + // After second approval: satisfied ledger + final answer + List satisfiedLedger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after stall with plan review"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, stalledLedger, + factsResponse2, planResponse2, satisfiedLedger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(true) + .WithMaxStalls(0) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + List allEvents = []; + + // Act 1: First run - should pause for initial plan review + WorkflowRunResult firstResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")], + checkpointManager: checkpointManager, + eventCollector: allEvents); + + firstResult.PendingRequests.Should().ContainSingle(); + ExternalRequest request1 = firstResult.PendingRequests[0].Request; + MagenticPlanReviewRequest? reviewRequest1 = request1.Data.As(); + reviewRequest1.Should().NotBeNull(); + reviewRequest1!.Plan.Text.Should().Contain("Initial plan"); + reviewRequest1.IsStalled.Should().BeFalse("the initial plan review is not stall-triggered"); + + // Act 2: Approve initial plan → stall occurs → reset → replan → new plan review + MagenticPlanReviewResponse approval1 = reviewRequest1.Approve(); + ExternalResponse approvalResponse1 = request1.CreateResponse(approval1); + WorkflowRunResult secondResult = await ResumeMagenticWorkflowAsync( + workflow, + approvalResponse1, + checkpointManager, + firstResult.LastCheckpoint, + eventCollector: allEvents); + + // Should pause for review of the replanned plan + secondResult.PendingRequests.Should().NotBeEmpty(); + ExternalRequest request2 = secondResult.PendingRequests[^1].Request; + MagenticPlanReviewRequest? reviewRequest2 = request2.Data.As(); + reviewRequest2.Should().NotBeNull(); + reviewRequest2!.Plan.Text.Should().Contain("Fresh plan after stall reset"); + reviewRequest2.IsStalled.Should().BeTrue("the replan was triggered by a stall"); + + // Act 3: Approve the revised plan → satisfied → final answer + MagenticPlanReviewResponse approval2 = reviewRequest2.Approve(); + ExternalResponse approvalResponse2 = request2.CreateResponse(approval2); + WorkflowRunResult thirdResult = await ResumeMagenticWorkflowAsync( + workflow, + approvalResponse2, + checkpointManager, + secondResult.LastCheckpoint, + eventCollector: allEvents); + + // Assert + allEvents.OfType().Should().NotBeEmpty("initial plan emits PlanCreatedEvent"); + allEvents.OfType().Should().NotBeEmpty("stall reset triggers ReplannedEvent"); + thirdResult.Result.Should().NotBeNull(); + thirdResult.Result![0].Text.Should().Contain("Recovered after stall with plan review"); + } + + [Fact] + public async Task MaxResetLimit_Terminates_WorkflowAsync() + { + // Arrange: MaxStallCount=0, MaxResets=1. + // Flow: facts1, plan1 → ledger1(stall: isInLoop=true) → StallCount=1 > 0 → IsStalled → ResetAndReplanAsync + // → ResetCount becomes 1 → facts2, plan2 → DelegateToTeamAsync + // → RunCoordinationRoundAsync: CheckLimits() detects ResetCount(1) >= MaxResetCount(1) → terminates + List factsResponse1 = CreatePlanResponse("Initial facts"); + List planResponse1 = CreatePlanResponse("Initial plan"); + List stalledLedger = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: true, // This triggers stall + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Keep trying"); + + // After reset: ResetAndReplanAsync → UpdatePlanAndDelegateAsync → new plan + List factsResponse2 = CreatePlanResponse("Fresh facts after reset"); + List planResponse2 = CreatePlanResponse("Fresh plan after reset"); + // No more turns needed: RunCoordinationRoundAsync hits reset limit before calling UpdateProgressLedgerAsync + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, stalledLedger, + factsResponse2, planResponse2], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .WithMaxStalls(0) // One stall triggers reset (StallCount 1 > 0) + .WithMaxResets(1) // One reset triggers termination + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")]); + + // Assert: Workflow terminates with reset limit message + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("maximum reset count limit"); + } + + [Fact] + public async Task ProgressLedger_Retry_On_Parse_FailureAsync() + { + // Arrange: First progress ledger attempt returns invalid JSON (triggers parse failure + warning), + // second attempt returns valid JSON (satisfied=true). + // Manager turn sequence: facts, plan, INVALID_JSON, VALID_LEDGER(satisfied), finalAnswer + // MagenticManager.UpdateProgressLedgerAsync retries internally: attempt 0 fails, attempt 1 succeeds. + List factsResponse = CreatePlanResponse("Facts about the task"); + List planResponse = CreatePlanResponse("Step 1: Execute"); + List invalidLedgerResponse = CreatePlanResponse("This is not valid JSON for a progress ledger"); + List validLedgerResponse = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done after retry"); + List finalAnswerResponse = CreateFinalAnswerResponse("Completed after ledger retry"); + + TestReplayAgent manager = new( + [factsResponse, planResponse, invalidLedgerResponse, validLedgerResponse, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")], + eventCollector: collectedEvents); + + // Assert: Warning emitted for parse failure, but workflow completes successfully + collectedEvents.OfType() + .Should().Contain(e => e.Data != null && e.Data.ToString()!.Contains("Progress ledger JSON parse failed")); + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Completed after ledger retry"); + } + + [Fact] + public async Task ProgressLedger_Max_Retries_Triggers_ResetAsync() + { + // Arrange: All 3 progress ledger retry attempts return invalid JSON → exception → ResetAndReplanAsync. + // After reset: new plan, valid ledger (satisfied), final answer. + // Turn sequence: facts1, plan1, invalidJSON×3, facts2, plan2, validLedger(satisfied), finalAnswer + List factsResponse1 = CreatePlanResponse("Initial facts"); + List planResponse1 = CreatePlanResponse("Initial plan"); + List invalidLedger1 = CreatePlanResponse("not json at all"); + List invalidLedger2 = CreatePlanResponse("still not json"); + List invalidLedger3 = CreatePlanResponse("definitely not json"); + + // After reset: ResetAndReplanAsync → UpdatePlanAndDelegateAsync → new plan + List factsResponse2 = CreatePlanResponse("Fresh facts after reset"); + List planResponse2 = CreatePlanResponse("Fresh plan after reset"); + List validLedger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done after reset"); + List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after max retries reset"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, invalidLedger1, invalidLedger2, invalidLedger3, + factsResponse2, planResponse2, validLedger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")], + eventCollector: collectedEvents); + + // Assert: Parse failure warnings emitted, reset triggered (ReplannedEvent), workflow completes + collectedEvents.OfType() + .Where(e => e.Data?.ToString()?.Contains("Progress ledger JSON parse failed") == true) + .Should().HaveCountGreaterThanOrEqualTo(3, "all 3 retry attempts should emit warnings"); + collectedEvents.OfType() + .Should().Contain(e => e.Data != null && e.Data.ToString()!.Contains("triggering reset")); + collectedEvents.OfType().Should().NotBeEmpty("reset triggers replan"); + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Recovered after max retries reset"); + } + + [Fact] + public async Task Stall_NoProgress_Increments_StallCountAsync() + { + // Arrange: MaxStallCount=0, progress ledger reports IsProgressBeingMade=false (not IsInLoop). + // This exercises the alternative stall trigger: !IsProgressBeingMade. + // Flow: facts1, plan1 → ledger1(IsProgressBeingMade=false) → StallCount=1 > 0 → IsStalled → Reset + // → facts2, plan2 (replan) → ledger2(satisfied) → finalAnswer + List factsResponse1 = CreatePlanResponse("Initial facts"); + List planResponse1 = CreatePlanResponse("Initial plan"); + List noProgressLedger = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, // Not in loop + isProgressBeingMade: false, // But no progress → stall + nextSpeaker: "Worker", + instructionOrQuestion: "Keep trying"); + + // After reset: ResetAndReplanAsync → UpdatePlanAndDelegateAsync → new plan + List factsResponse2 = CreatePlanResponse("Fresh facts after no-progress reset"); + List planResponse2 = CreatePlanResponse("Fresh plan after no-progress reset"); + List satisfiedLedger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after no-progress stall"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, noProgressLedger, + factsResponse2, planResponse2, satisfiedLedger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .WithMaxStalls(0) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Do task")], + eventCollector: collectedEvents); + + // Assert: Stall detected via no-progress, reset triggered, replan emitted, workflow completes + collectedEvents.OfType().Should().NotBeEmpty("no-progress stall triggers reset and replan"); + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Recovered after no-progress stall"); + } + + [Fact] + public async Task Task_Delegates_To_Correct_AgentAsync() + { + // Arrange: Two participants (WorkerA, WorkerB). Manager selects "WorkerA" as next speaker. + // We verify that WorkerA produces a response update event and WorkerB does not. + // Flow: facts1, plan1 → ledger1(nextSpeaker=WorkerA, not satisfied) → WorkerA runs + // → RunCoordinationRoundAsync (no replan) → ledger2(satisfied) → finalAnswer + List factsResponse1 = CreatePlanResponse("Task delegation facts"); + List planResponse1 = CreatePlanResponse("Delegate to WorkerA"); + List ledger1 = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "WorkerA", + instructionOrQuestion: "WorkerA please handle this"); + + // After WorkerA responds, orchestrator goes directly to RunCoordinationRoundAsync (no replan) + List ledger2 = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "WorkerA", + instructionOrQuestion: "Done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Delegated correctly!"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, ledger1, + ledger2, finalAnswerResponse], + name: "Manager"); + TestEchoAgent workerA = new(name: "WorkerA", prefix: "[A] "); + TestEchoAgent workerB = new(name: "WorkerB", prefix: "[B] "); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(workerA, workerB) + .RequirePlanSignoff(false) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Test delegation routing")], + eventCollector: collectedEvents); + + // Assert: WorkerA should have produced response updates, WorkerB should not + List agentUpdates = collectedEvents.OfType().ToList(); + + // WorkerA's executor should appear in the events + agentUpdates.Should().Contain(e => e.Update.AuthorName == "WorkerA", + "WorkerA was selected as next speaker and should have responded"); + + // WorkerB should NOT have responded + agentUpdates.Should().NotContain(e => e.Update.AuthorName == "WorkerB", + "WorkerB was not selected and should not have responded"); + + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Delegated correctly!"); + } + + [Fact] + public async Task Progress_Made_Decrements_StallCountAsync() + { + // Arrange: MaxStallCount=3, so a single stall won't trigger reset. + // Round 1: isInLoop=true (stall count → 1), delegates to Worker + // Round 2: progress being made (stall count → 0), delegates to Worker + // Round 3: satisfied → final answer + // No reset should occur because the stall count was decremented before reaching threshold. + List facts1 = CreatePlanResponse("Initial facts"); + List plan1 = CreatePlanResponse("Initial plan"); + List ledger1 = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: true, // triggers stall increment → StallCount=1 + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Keep trying"); + + // After Worker responds → RunCoordinationRoundAsync (no replan) + List ledger2 = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, + isProgressBeingMade: true, // progress → stall count decrements → StallCount=0 + nextSpeaker: "Worker", + instructionOrQuestion: "Good progress"); + + // After Worker responds → RunCoordinationRoundAsync (no replan) + List ledger3 = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "All done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Completed without reset!"); + + TestReplayAgent manager = new( + [facts1, plan1, ledger1, + ledger2, + ledger3, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .WithMaxStalls(3) // high threshold so single stall doesn't trigger reset + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Test stall decrement")], + eventCollector: collectedEvents); + + // Assert: Three progress ledger updates, no stall-triggered reset + collectedEvents.OfType().Should().HaveCount(3, + "three coordination rounds should produce three progress ledger events"); + + // One initial plan, no replans (agent returns go directly to coordination, no replan) + collectedEvents.OfType().Should().ContainSingle( + "only one initial plan should be created"); + collectedEvents.OfType().Should().BeEmpty( + "no replan occurs on normal agent return; stall count never exceeded threshold"); + + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Completed without reset!"); + } + + [Fact] + public async Task Consecutive_Stalls_Trigger_ResetAsync() + { + // Arrange: MaxStallCount=1 — two consecutive stalls trigger reset (StallCount 2 > 1). + // Round 1: isInLoop=true (stall count → 1), delegates to Worker + // Round 2: isProgressBeingMade=false (stall count → 2 > 1 → IsStalled) → reset & replan + // After reset: new plan → ledger(satisfied) → final answer + List facts1 = CreatePlanResponse("Initial facts"); + List plan1 = CreatePlanResponse("Initial plan"); + List ledger1 = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: true, // stall #1 → StallCount=1 + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Keep trying"); + + // After Worker responds → RunCoordinationRoundAsync (no replan) + List ledger2 = CreateProgressLedgerResponse( + isRequestSatisfied: false, + isInLoop: false, + isProgressBeingMade: false, // stall #2 → StallCount=2 > 1 → IsStalled → reset + nextSpeaker: "Worker", + instructionOrQuestion: "No progress"); + + // Reset & replan: new facts + plan, then coordination round + List resetFacts = CreatePlanResponse("Fresh facts after stall reset"); + List resetPlan = CreatePlanResponse("Fresh plan after stall reset"); + List postResetLedger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Recovered after consecutive stalls!"); + + TestReplayAgent manager = new( + [facts1, plan1, ledger1, + ledger2, + resetFacts, resetPlan, postResetLedger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + List collectedEvents = []; + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .WithMaxStalls(1) // requires 2 consecutive stalls (StallCount > 1) + .Build(); + + // Act + WorkflowRunResult runResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Test consecutive stalls")], + eventCollector: collectedEvents); + + // Assert: Two pre-reset coordination rounds + one post-reset round = 3 ledger events + collectedEvents.OfType().Should().HaveCount(3, + "two pre-reset rounds and one post-reset round"); + + // One initial plan + one stall-triggered reset replan (no normal re-entry replans anymore) + collectedEvents.OfType().Should().ContainSingle(); + collectedEvents.OfType().Should().ContainSingle( + "only one replan from stall-triggered reset; no replan on normal agent return"); + + runResult.Result.Should().NotBeNull(); + runResult.Result![0].Text.Should().Contain("Recovered after consecutive stalls!"); + } + + [Fact] + public async Task PlanReview_Multiple_RevisionsAsync() + { + // Arrange: Human rejects the plan twice before approving on the third review. + // Flow: facts1, plan1 → PlanCreatedEvent → plan review (pending) + // resume with revision1 → facts2, plan2 → ReplannedEvent → plan review (pending) + // resume with revision2 → facts3, plan3 → ReplannedEvent → plan review (pending) + // resume with approval → ledger(satisfied) → finalAnswer + List factsResponse1 = CreatePlanResponse("Initial facts"); + List planResponse1 = CreatePlanResponse("Initial plan - too vague"); + List factsResponse2 = CreatePlanResponse("Revised facts v2"); + List planResponse2 = CreatePlanResponse("Revised plan v2 - still needs work"); + List factsResponse3 = CreatePlanResponse("Revised facts v3"); + List planResponse3 = CreatePlanResponse("Revised plan v3 - final version"); + List progressLedgerResponse = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Execute final plan"); + List finalAnswerResponse = CreateFinalAnswerResponse("Completed after multiple revisions"); + + TestReplayAgent manager = new( + [factsResponse1, planResponse1, + factsResponse2, planResponse2, + factsResponse3, planResponse3, + progressLedgerResponse, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(true) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + List allEvents = []; + + // Act 1: First run - should pause for plan review with initial plan + WorkflowRunResult firstResult = await RunMagenticWorkflowAsync( + workflow, + [new ChatMessage(ChatRole.User, "Execute task")], + checkpointManager: checkpointManager, + eventCollector: allEvents); + + firstResult.PendingRequests.Should().ContainSingle(); + ExternalRequest request1 = firstResult.PendingRequests[0].Request; + MagenticPlanReviewRequest? reviewRequest1 = request1.Data.As(); + reviewRequest1.Should().NotBeNull(); + reviewRequest1!.Plan.Text.Should().Contain("Initial plan"); + + // Act 2: Resume with first revision + MagenticPlanReviewResponse revision1 = reviewRequest1.Revise("Too vague, add more detail"); + ExternalResponse revisionResponse1 = request1.CreateResponse(revision1); + WorkflowRunResult secondResult = await ResumeMagenticWorkflowAsync( + workflow, + revisionResponse1, + checkpointManager, + firstResult.LastCheckpoint, + eventCollector: allEvents); + + secondResult.PendingRequests.Should().NotBeEmpty(); + ExternalRequest request2 = secondResult.PendingRequests[^1].Request; + MagenticPlanReviewRequest? reviewRequest2 = request2.Data.As(); + reviewRequest2.Should().NotBeNull(); + reviewRequest2!.Plan.Text.Should().Contain("Revised plan v2"); + + // Act 3: Resume with second revision + MagenticPlanReviewResponse revision2 = reviewRequest2.Revise("Still needs more work on step 3"); + ExternalResponse revisionResponse2 = request2.CreateResponse(revision2); + WorkflowRunResult thirdResult = await ResumeMagenticWorkflowAsync( + workflow, + revisionResponse2, + checkpointManager, + secondResult.LastCheckpoint, + eventCollector: allEvents); + + thirdResult.PendingRequests.Should().NotBeEmpty(); + ExternalRequest request3 = thirdResult.PendingRequests[^1].Request; + MagenticPlanReviewRequest? reviewRequest3 = request3.Data.As(); + reviewRequest3.Should().NotBeNull(); + reviewRequest3!.Plan.Text.Should().Contain("Revised plan v3"); + + // Act 4: Resume with approval + MagenticPlanReviewResponse approval = reviewRequest3.Approve(); + ExternalResponse approvalResponse = request3.CreateResponse(approval); + WorkflowRunResult fourthResult = await ResumeMagenticWorkflowAsync( + workflow, + approvalResponse, + checkpointManager, + thirdResult.LastCheckpoint, + eventCollector: allEvents); + + // Assert: Multiple replan events emitted, final answer produced + allEvents.OfType().Should().NotBeEmpty("initial plan emits PlanCreatedEvent"); + allEvents.OfType().Should().HaveCountGreaterThanOrEqualTo(2, + "two revisions should emit at least two ReplannedEvents"); + fourthResult.Result.Should().NotBeNull(); + fourthResult.Result![0].Text.Should().Contain("Completed after multiple revisions"); + } + + [Fact] + public void Empty_Team_Build_Throws() + { + // Arrange: No participants added to the builder. + TestReplayAgent manager = new( + [CreatePlanResponse("Facts"), CreatePlanResponse("Plan")], + name: "Manager"); + + MagenticWorkflowBuilder builder = new MagenticWorkflowBuilder(manager) + // No .AddParticipants() — empty team + .RequirePlanSignoff(false); + + // Act & Assert: Build() should throw because the team is empty. + Action buildAction = () => builder.Build(); + buildAction.Should().Throw() + .WithMessage("*participant*"); + } + + [Fact] + public async Task Terminated_Context_Rejects_New_MessagesAsync() + { + // Arrange: Run a workflow to completion so IsTerminated=true, then send another message. + // The framework accepts the message (TrySendMessageAsync returns true), but Magentic + // should error out internally, surfacing as a WorkflowErrorEvent. + List factsResponse = CreatePlanResponse("Facts"); + List planResponse = CreatePlanResponse("The plan"); + List satisfiedLedger = CreateProgressLedgerResponse( + isRequestSatisfied: true, + isInLoop: false, + isProgressBeingMade: true, + nextSpeaker: "Worker", + instructionOrQuestion: "Done"); + List finalAnswerResponse = CreateFinalAnswerResponse("Task done"); + + TestReplayAgent manager = new( + [factsResponse, planResponse, satisfiedLedger, finalAnswerResponse], + name: "Manager"); + TestEchoAgent worker = new(name: "Worker"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(worker) + .RequirePlanSignoff(false) + .Build(); + + InProcessExecutionEnvironment environment = ExecutionEnvironment.InProcess_Lockstep + .ToWorkflowExecutionEnvironment() + .WithCheckpointing(CheckpointManager.CreateInMemory()); + + await using StreamingRun run = await environment.OpenStreamingAsync(workflow); + + // Send the initial messages and run to completion + await run.TrySendMessageAsync(new List { new(ChatRole.User, "Do the task") }); + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + + // Drain the stream to completion — workflow yields output and sets IsTerminated=true + WorkflowOutputEvent? output = null; + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false).ConfigureAwait(false)) + { + if (evt is WorkflowOutputEvent o) + { + output = o; + } + } + + output.Should().NotBeNull("workflow should have completed with output"); + + // Act: Send a new message after termination — framework accepts it, but Magentic errors out + bool accepted = await run.TrySendMessageAsync(new List { new(ChatRole.User, "Another message") }); + accepted.Should().BeTrue("framework does not have a terminal state — it always queues messages"); + + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + + // Watch for the error + WorkflowErrorEvent? errorEvent = null; + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false).ConfigureAwait(false)) + { + if (evt is WorkflowErrorEvent e) + { + errorEvent = e; + } + } + + // Assert: Magentic should have rejected the message with an InvalidOperationException + // (may be wrapped in TargetInvocationException by the framework's reflection-based dispatch) + errorEvent.Should().NotBeNull("sending a message after termination should produce a WorkflowErrorEvent"); + Exception actual = errorEvent!.Exception is System.Reflection.TargetInvocationException tie && tie.InnerException != null + ? tie.InnerException + : errorEvent.Exception!; + actual.Should().BeOfType(); + actual.Message.Should().Contain("terminated"); + } + + #region Helper Methods + + private sealed record WorkflowRunResult( + string UpdateText, + List? Result, + CheckpointInfo? LastCheckpoint, + List PendingRequests); + + private static List CreatePlanResponse(string plan) + { + return + [ + new ChatMessage(ChatRole.Assistant, plan) + { + MessageId = Guid.NewGuid().ToString("N"), + CreatedAt = DateTimeOffset.UtcNow + } + ]; + } + + private static List CreateProgressLedgerResponse( + bool isRequestSatisfied, + bool isInLoop, + bool isProgressBeingMade, + string nextSpeaker, + string instructionOrQuestion) + { + string isRequestSatisfiedStr = isRequestSatisfied ? "true" : "false"; + string isInLoopStr = isInLoop ? "true" : "false"; + string isProgressBeingMadeStr = isProgressBeingMade ? "true" : "false"; + string nextSpeakerJson = JsonSerializer.Serialize(nextSpeaker); + string instructionJson = JsonSerializer.Serialize(instructionOrQuestion); + + string ledgerJson = $$""" + { + "is_request_satisfied": { "answer": {{isRequestSatisfiedStr}}, "reason": "test reason" }, + "is_in_loop": { "answer": {{isInLoopStr}}, "reason": "test reason" }, + "is_progress_being_made": { "answer": {{isProgressBeingMadeStr}}, "reason": "test reason" }, + "next_speaker": { "answer": {{nextSpeakerJson}}, "reason": "test reason" }, + "instruction_or_question": { "answer": {{instructionJson}}, "reason": "test reason" } + } + """; + + return + [ + new ChatMessage(ChatRole.Assistant, ledgerJson) + { + MessageId = Guid.NewGuid().ToString("N"), + CreatedAt = DateTimeOffset.UtcNow + } + ]; + } + + private static List CreateFinalAnswerResponse(string answer) + { + return + [ + new ChatMessage(ChatRole.Assistant, answer) + { + MessageId = Guid.NewGuid().ToString("N"), + CreatedAt = DateTimeOffset.UtcNow + } + ]; + } + + private static async Task ResumeMagenticWorkflowAsync( + Workflow workflow, + ExternalResponse response, + CheckpointManager checkpointManager, + CheckpointInfo? fromCheckpoint, + List? eventCollector = null) + { + InProcessExecutionEnvironment environment = ExecutionEnvironment.InProcess_Lockstep + .ToWorkflowExecutionEnvironment() + .WithCheckpointing(checkpointManager); + + await using StreamingRun run = fromCheckpoint != null + ? await environment.ResumeStreamingAsync(workflow, fromCheckpoint) + : await environment.OpenStreamingAsync(workflow); + + await run.SendResponseAsync(response); + + return await ProcessWorkflowRunAsync(run, eventCollector); + } + + private static async Task RunMagenticWorkflowAsync( + Workflow workflow, + List input, + CheckpointManager? checkpointManager = null, + List? eventCollector = null) + { + checkpointManager ??= CheckpointManager.CreateInMemory(); + + InProcessExecutionEnvironment environment = ExecutionEnvironment.InProcess_Lockstep + .ToWorkflowExecutionEnvironment() + .WithCheckpointing(checkpointManager); + + await using StreamingRun run = await environment.OpenStreamingAsync(workflow); + + await run.TrySendMessageAsync(input); + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + + return await ProcessWorkflowRunAsync(run, eventCollector); + } + + private static async Task ProcessWorkflowRunAsync( + StreamingRun run, + List? eventCollector = null) + { + StringBuilder sb = new(); + WorkflowOutputEvent? output = null; + CheckpointInfo? lastCheckpoint = null; + List pendingRequests = []; + + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false).ConfigureAwait(false)) + { + eventCollector?.Add(evt); + + switch (evt) + { + case AgentResponseUpdateEvent responseUpdate: + sb.Append(responseUpdate.Data); + break; + + case RequestInfoEvent requestInfo: + pendingRequests.Add(requestInfo); + break; + + case WorkflowOutputEvent e: + output = e; + break; + + case WorkflowErrorEvent errorEvent: + Assert.Fail($"Workflow execution failed with error: {errorEvent.Exception}"); + break; + + case SuperStepCompletedEvent stepCompleted: + lastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint; + break; + } + } + + return new(sb.ToString(), output?.As>(), lastCheckpoint, pendingRequests); + } + + #endregion +} From 67f3db628046cc01aedf584d18c392e716e015f5 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 14 May 2026 17:38:37 -0400 Subject: [PATCH 3/5] Python: Reject path-traversal context ids in Foundry Hosting Checkpoint Storage (#5851) * Reject path-traversal context ids in foundry workflow checkpoint storage Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/fca3aae6-50eb-4726-8baf-2718217d4e79 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Address PR review feedback: clarify URL-decode comment, isolate test root, add e2e workflow rejection tests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/832f45a6-c01e-4da9-bf85-1ba7b5f302e6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Clarify MSRC repro padding length in regression test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/832f45a6-c01e-4da9-bf85-1ba7b5f302e6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * add E2E http test for checkpoint context id rejection Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/730258ef-2781-4a7d-b7cf-b5c40c11defc Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> Co-authored-by: Jacob Alber --- .../_responses.py | 46 +++- .../foundry_hosting/tests/test_responses.py | 241 +++++++++++++++++- 2 files changed, 284 insertions(+), 3 deletions(-) diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 1645fcec2e..c34c65538c 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -11,6 +11,7 @@ import tempfile import threading from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence from contextlib import suppress +from pathlib import Path from typing import Protocol, cast from agent_framework import ( @@ -205,6 +206,47 @@ class FileBasedFunctionApprovalStorage: return await asyncio.to_thread(self._load_sync, approval_request_id) +def _checkpoint_storage_for_context(root: str, context_id: str) -> FileCheckpointStorage: + """Build a ``FileCheckpointStorage`` for ``context_id`` rooted under ``root``. + + ``context_id`` originates from caller-controlled fields such as + ``previous_response_id`` or from server-generated fields such as + ``conversation_id`` / ``response_id``. In every case it must be treated as + an untrusted single path segment: path separators, drive letters, parent + references and similar would otherwise let the resulting directory escape + the configured checkpoint root (CWE-22). The check resolves the joined + path and verifies it stays under the resolved root before any directory is + created on disk. + """ + if not isinstance(context_id, str) or not context_id: + raise RuntimeError("Invalid checkpoint context id: must be a non-empty string.") + # Reject any segment that is not a single safe path component. This covers + # POSIX/Windows separators, NUL bytes, drive letters, and all-dot segments + # (``.``, ``..``, ``...``, ...). We deliberately do not URL-decode the id + # here: the hosting layer never decodes context ids before joining them, so + # forms such as ``%2e%2e`` are accepted as literal directory names. Do NOT + # add decoding here without re-validating after the decode -- decode-then- + # join is exactly the pattern that reintroduces traversal. We also do not + # attempt to "sanitize" by stripping characters because that can introduce + # collisions between distinct ids. + if ( + "/" in context_id + or "\\" in context_id + or "\x00" in context_id + # All-dot segments (``.``, ``..``, ``...``, ...) reduce to "" after stripping dots. + or context_id.strip(".") == "" + or os.path.isabs(context_id) + or os.path.splitdrive(context_id)[0] + ): + raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}") + + root_path = Path(root).resolve() + storage_path = (root_path / context_id).resolve() + if not storage_path.is_relative_to(root_path): + raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}") + return FileCheckpointStorage(storage_path) + + class ResponsesHostServer(ResponsesAgentServerHost): """A responses server host for an agent.""" @@ -400,7 +442,7 @@ class ResponsesHostServer(ResponsesAgentServerHost): latest_checkpoint_id: str | None = None restore_storage: FileCheckpointStorage | None = None if context_id is not None: - restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) + restore_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, context_id) latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name) if latest_checkpoint is not None: latest_checkpoint_id = latest_checkpoint.checkpoint_id @@ -414,7 +456,7 @@ class ResponsesHostServer(ResponsesAgentServerHost): # supplied, restore_storage points at the *prior* response's # directory and write_storage points at the *current* response's. write_context_id = context.conversation_id or context.response_id - write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id)) + write_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id) # Multi-turn pattern: when we have a prior checkpoint, restore it # first (drive the workflow back to idle with prior state intact), diff --git a/python/packages/foundry_hosting/tests/test_responses.py b/python/packages/foundry_hosting/tests/test_responses.py index a0a6335651..e4d545d6d7 100644 --- a/python/packages/foundry_hosting/tests/test_responses.py +++ b/python/packages/foundry_hosting/tests/test_responses.py @@ -11,7 +11,7 @@ the registered _handle_create handler. from __future__ import annotations import json -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Callable from unittest.mock import AsyncMock, MagicMock import httpx @@ -20,6 +20,7 @@ from agent_framework import ( AgentResponse, AgentResponseUpdate, Content, + FileCheckpointStorage, HistoryProvider, Message, RawAgent, @@ -2652,3 +2653,241 @@ class TestFunctionApprovalRoundTrip: # endregion + + +# region Checkpoint context path validation + + +class TestCheckpointContextPathValidation: + """Regression tests for the path-traversal hardening of checkpoint storage. + + These tests guard against CWE-22 in the workflow hosting path. The hosting + code joins caller-supplied identifiers (``previous_response_id``) and + server-generated identifiers (``conversation_id`` / ``response_id``) under + the configured checkpoint root. Without validation, traversal segments + such as ``../../escape`` or absolute paths cause directory creation + outside the intended root. + """ + + @staticmethod + def _helper() -> Callable[[str, str], FileCheckpointStorage]: + from agent_framework_foundry_hosting._responses import ( # pyright: ignore[reportPrivateUsage] + _checkpoint_storage_for_context, + ) + + return _checkpoint_storage_for_context + + def test_valid_segment_creates_storage_under_root(self, tmp_path: Any) -> None: + helper = self._helper() + root = tmp_path / "root" + root.mkdir() + storage = helper(str(root), "resp_abc123") + assert storage.storage_path.is_dir() + assert storage.storage_path.parent == root.resolve() + + @pytest.mark.parametrize( + "bad_id", + [ + # Original MSRC repro: traversal embedded inside an id-shaped value. + # The 14 ``A``s pad the suffix to mimic the exact length of the + # ``api-made-dir<14-char-suffix>`` segment from the original report. + "caresp_x/../../service-data/api-made-dir" + "A" * 14, + # Variant report repros. + "../../escape", + "..", + ".", + "...", + "/tmp/escape", + "/absolute/path", + "C:\\temp\\escape", + "..\\..\\escape", + "foo\\..\\bar", + "foo/bar", + "with\x00null", + "", + ], + ) + def test_traversal_and_separator_payloads_are_rejected(self, tmp_path: Any, bad_id: str) -> None: + helper = self._helper() + # Use a dedicated root *inside* tmp_path so we can assert that nothing + # was created anywhere under tmp_path (root, siblings, or above). + # Asserting against tmp_path.parent would be flaky under parallel test + # execution because tmp_path.parent is shared across tests. + root = tmp_path / "root" + root.mkdir() + before = sorted(p.name for p in tmp_path.iterdir()) + with pytest.raises(RuntimeError): + helper(str(root), bad_id) + # No sibling/escape directory should have been created next to the root. + after = sorted(p.name for p in tmp_path.iterdir()) + assert before == after, f"Unexpected filesystem artifacts created for payload {bad_id!r}" + # And nothing inside the root either. + assert list(root.iterdir()) == [] + + def test_non_string_context_id_is_rejected(self, tmp_path: Any) -> None: + helper = self._helper() + with pytest.raises(RuntimeError): + helper(str(tmp_path), None) # type: ignore[arg-type] + + def test_url_encoded_traversal_is_treated_as_literal_segment(self, tmp_path: Any) -> None: + """URL-encoded traversal should not decode to traversal at the filesystem layer. + + The hosting layer never URL-decodes ids before using them; the helper + should accept ``%2e%2e`` as a single literal segment (no escape). + """ + helper = self._helper() + root = tmp_path / "root" + root.mkdir() + storage = helper(str(root), "%2e%2e") + assert storage.storage_path.parent == root.resolve() + assert storage.storage_path.name == "%2e%2e" + + @pytest.mark.parametrize( + "context_field,bad_id", + [ + # Restore sink: caller-controlled previous_response_id. + ("previous_response_id", "../../escape"), + ("previous_response_id", "/tmp/escape-abs"), + ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14), + # Restore sink: server-issued conversation_id (defense in depth). + ("conversation_id", "../../escape"), + # Write sink: malicious response_id (defense in depth). + ("response_id", "../../escape"), + ], + ) + async def test_handle_inner_workflow_rejects_malicious_context_id( + self, tmp_path: Any, context_field: str, bad_id: str + ) -> None: + """End-to-end: ``_handle_inner_workflow`` must reject malicious ids on + both the restore sink (``previous_response_id`` / ``conversation_id``) + and the write sink (``response_id``) without creating any directories. + """ + from unittest.mock import patch + + from agent_framework import WorkflowAgent + from azure.ai.agentserver.responses import ResponseContext + from azure.ai.agentserver.responses.models import CreateResponse + + # Build a mock that satisfies isinstance(agent, WorkflowAgent) and the + # constructor's "no existing checkpointing" guard. + agent = MagicMock(spec=WorkflowAgent) + agent.id = "wf-agent" + agent.name = "wf" + agent.description = "" + agent.context_providers = [] + agent.workflow = MagicMock() + agent.workflow.name = "wf" + agent.workflow._runner_context.has_checkpointing = MagicMock(return_value=False) + + # Constructor inspects WorkflowAgent.workflow internals; bypass setup + # by feeding a configured mock through a normal init. + server = ResponsesHostServer(agent, store=InMemoryResponseProvider()) + # Re-root checkpoint storage at our isolated tmp_path so we can detect + # any escape attempt on the filesystem. + root = tmp_path / "root" + root.mkdir() + server._checkpoint_storage_path = str(root) # pyright: ignore[reportPrivateUsage] + + # Build a ResponseContext with the malicious id targeting the chosen sink. + kwargs: dict[str, Any] = { + "response_id": "resp_" + "a" * 48, + "mode_flags": MagicMock(), + } + if context_field == "previous_response_id": + request = CreateResponse(model="m", input="hi", previous_response_id=bad_id) + kwargs["previous_response_id"] = bad_id + elif context_field == "conversation_id": + request = CreateResponse(model="m", input="hi") + kwargs["conversation_id"] = bad_id + else: # response_id (write sink) + request = CreateResponse(model="m", input="hi") + kwargs["response_id"] = bad_id + + # Avoid invoking the real input-resolution machinery, which would need + # a configured provider; we never reach the workflow run on rejection. + with patch.object(ResponseContext, "get_input_items", new=AsyncMock(return_value=[])): + context = ResponseContext(**kwargs) + before = sorted(p.name for p in tmp_path.iterdir()) + with pytest.raises(RuntimeError, match="Invalid checkpoint context id"): + async for _ in server._handle_inner_workflow(request, context): # pyright: ignore[reportPrivateUsage] + pass + after = sorted(p.name for p in tmp_path.iterdir()) + + assert before == after, f"Unexpected filesystem artifacts created for {context_field}={bad_id!r}" + assert list(root.iterdir()) == [], f"Checkpoint dir created inside root for {context_field}={bad_id!r}" + + @pytest.mark.parametrize( + "context_field,bad_id", + [ + # Restore sink: caller-controlled previous_response_id. These are + # rejected by request validation (HTTP 400) before the checkpoint + # code is reached. + ("previous_response_id", "../../escape"), + ("previous_response_id", "/tmp/escape-abs"), + ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14), + # Restore sink: server-issued conversation id (defense in depth). + # Reaches the checkpoint code and is rejected there, surfacing as + # an HTTP 5xx without creating any filesystem artifacts. + ("conversation", "../../escape"), + ("conversation", "/tmp/escape-abs"), + ], + ) + async def test_malicious_context_id_rejected_e2e(self, tmp_path: Any, context_field: str, bad_id: str) -> None: + """End-to-end (ASGI-in-process): malicious context ids must be rejected + through the full HTTP pipeline, and no checkpoint directory may be + created on disk for either the validation-layer rejection + (``previous_response_id``) or the deeper checkpoint-layer rejection + (``conversation``). + + The ``response_id`` write-sink is server-generated and not reachable + via the public HTTP surface, so its defense-in-depth check is covered + by the helper-level test above. + """ + from agent_framework import WorkflowAgent + + # Build a mock that satisfies isinstance(agent, WorkflowAgent) and the + # constructor's "no existing checkpointing" guard. + agent = MagicMock(spec=WorkflowAgent) + agent.id = "wf-agent" + agent.name = "wf" + agent.description = "" + agent.context_providers = [] + agent.workflow = MagicMock() + agent.workflow.name = "wf" + agent.workflow._runner_context.has_checkpointing = MagicMock( # pyright: ignore[reportPrivateUsage] + return_value=False + ) + + server = ResponsesHostServer(agent, store=InMemoryResponseProvider()) + # Re-root checkpoint storage at our isolated tmp_path so we can detect + # any escape attempt on the filesystem. + root = tmp_path / "root" + root.mkdir() + server._checkpoint_storage_path = str(root) # pyright: ignore[reportPrivateUsage] + + payload: dict[str, Any] = {"model": "m", "input": "hi"} + if context_field == "previous_response_id": + payload["previous_response_id"] = bad_id + else: # conversation + payload["conversation"] = bad_id + + before = sorted(p.name for p in tmp_path.iterdir()) + transport = httpx.ASGITransport(app=server) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + resp = await client.post("/responses", json=payload) + after = sorted(p.name for p in tmp_path.iterdir()) + + # The request must not succeed; either request validation rejects it + # (4xx) or the checkpoint layer raises and the server returns 5xx. + # Either way, no successful response may be produced. + assert resp.status_code >= 400, ( + f"Expected non-2xx for {context_field}={bad_id!r}, got {resp.status_code}: {resp.text[:200]}" + ) + assert before == after, ( + f"Unexpected filesystem artifacts under tmp_path for {context_field}={bad_id!r}: " + f"before={before} after={after}" + ) + assert list(root.iterdir()) == [], f"Checkpoint directory created inside root for {context_field}={bad_id!r}" + + +# endregion From 410268b624d77ac305d39f7faf54abe7351de294 Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Fri, 15 May 2026 05:50:39 +0800 Subject: [PATCH 4/5] Python: forward MCP tool call metadata (#5815) * Python: forward MCP tool call metadata * fix: preserve MCP tool meta after prompt reload --- python/packages/core/agent_framework/_mcp.py | 12 +++-- python/packages/core/tests/core/test_mcp.py | 51 ++++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/python/packages/core/agent_framework/_mcp.py b/python/packages/core/agent_framework/_mcp.py index a7a3f1a796..35ccb1d58a 100644 --- a/python/packages/core/agent_framework/_mcp.py +++ b/python/packages/core/agent_framework/_mcp.py @@ -261,6 +261,7 @@ class MCPTool: self.request_timeout = request_timeout self.client = client self._functions: list[FunctionTool] = [] + self._tool_call_meta_by_name: dict[str, dict[str, Any]] = {} self.is_connected: bool = False self._tools_loaded: bool = False self._prompts_loaded: bool = False @@ -1026,6 +1027,7 @@ class MCPTool: # Track existing function names to prevent duplicates existing_names = {func.name for func in self._functions} + self._tool_call_meta_by_name.clear() params: types.PaginatedRequestParams | None = None while True: @@ -1035,6 +1037,9 @@ class MCPTool: tool_list = await self.session.list_tools(params=params) # type: ignore[union-attr] for tool in tool_list.tools: + if tool.meta is not None: + self._tool_call_meta_by_name[tool.name] = dict(tool.meta) + normalized_name = _normalize_mcp_name(tool.name) local_name = _build_prefixed_mcp_name(normalized_name, self.tool_name_prefix) @@ -1185,14 +1190,15 @@ class MCPTool: } } - # Inject OpenTelemetry trace context into MCP _meta for distributed tracing. - otel_meta = _inject_otel_into_mcp_meta() + # Some MCP proxies require their tools/list metadata to be echoed on tools/call. + tool_meta = self._tool_call_meta_by_name.get(tool_name) + meta = _inject_otel_into_mcp_meta(dict(tool_meta) if tool_meta is not None else None) parser = self.parse_tool_results or self._parse_tool_result_from_mcp # Try the operation, reconnecting once if the connection is closed for attempt in range(2): try: - result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=otel_meta) # type: ignore + result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=meta) # type: ignore if result.isError: parsed = parser(result) text = ( diff --git a/python/packages/core/tests/core/test_mcp.py b/python/packages/core/tests/core/test_mcp.py index cd3173a7d3..0fc5867d79 100644 --- a/python/packages/core/tests/core/test_mcp.py +++ b/python/packages/core/tests/core/test_mcp.py @@ -4194,6 +4194,57 @@ async def test_mcp_tool_call_tool_otel_meta(use_span, expect_traceparent, span_e assert meta is None +async def test_mcp_tool_call_tool_forwards_tool_list_meta(): + """call_tool echoes per-tool metadata returned by tools/list.""" + from opentelemetry import trace + + tool_meta = { + "tool_configuration": { + "name": "WorkIQSharePoint.readSmallBinaryFile", + "type": "foundry_toolbox", + } + } + + class TestServer(MCPTool): + async def connect(self): + self.session = Mock(spec=ClientSession) + self.session.list_tools = AsyncMock( + return_value=types.ListToolsResult( + tools=[ + types.Tool( + name="WorkIQSharePoint.readSmallBinaryFile", + description="Read a binary file", + inputSchema={ + "type": "object", + "properties": {"fileId": {"type": "string"}}, + "required": ["fileId"], + }, + _meta=tool_meta, + ) + ] + ) + ) + self.session.call_tool = AsyncMock( + return_value=types.CallToolResult(content=[types.TextContent(type="text", text="result")]) + ) + self.session.list_prompts = AsyncMock( + return_value=types.ListPromptsResult(prompts=[]) + ) + + def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]: + return None + + server = TestServer(name="test_server") + async with server: + await server.load_tools() + await server.load_prompts() + + with trace.use_span(trace.NonRecordingSpan(trace.INVALID_SPAN_CONTEXT)): + await server.call_tool("WorkIQSharePoint.readSmallBinaryFile", fileId="file-1") + + assert server.session.call_tool.call_args.kwargs["meta"] == tool_meta + + async def test_mcp_streamable_http_tool_hook_not_duplicated_on_repeated_get_mcp_client(): """Test that calling get_mcp_client multiple times does not accumulate duplicate hooks.""" tool = MCPStreamableHTTPTool( From 68357b025011871bded63cccb75b09b824447c5e Mon Sep 17 00:00:00 2001 From: Giles Odigwe <79032838+giles17@users.noreply.github.com> Date: Thu, 14 May 2026 15:28:02 -0700 Subject: [PATCH 5/5] Python: Fix A2A v1.0 non-streaming response and sample runtime issues (#5849) - Fix non-streaming empty response by accumulating intermediate WORKING status updates and flushing them when an empty terminal event arrives - Fix sample agent_executor.py to enqueue Task before status events (required by v1.0 ActiveTask validation) - Fix create_jsonrpc_routes() calls to include required rpc_url param - Fix TYPE_CHECKING imports in sample agent_definitions.py - Add tests for non-streaming content accumulation behavior Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/a2a/AGENTS.md | 2 +- .../a2a/agent_framework_a2a/_a2a_executor.py | 2 +- .../a2a/agent_framework_a2a/_agent.py | 40 +++++++- python/packages/a2a/tests/test_a2a_agent.py | 98 +++++++++++++++++++ python/samples/04-hosting/a2a/a2a_server.py | 2 +- .../04-hosting/a2a/agent_definitions.py | 9 +- .../samples/04-hosting/a2a/agent_executor.py | 75 ++++---------- .../04-hosting/a2a/agent_framework_to_a2a.py | 2 +- 8 files changed, 160 insertions(+), 70 deletions(-) diff --git a/python/packages/a2a/AGENTS.md b/python/packages/a2a/AGENTS.md index f5f7176cb0..d27d6d1c11 100644 --- a/python/packages/a2a/AGENTS.md +++ b/python/packages/a2a/AGENTS.md @@ -42,7 +42,7 @@ request_handler = DefaultRequestHandler( app = Starlette( routes=[ *create_agent_card_routes(my_agent_card), - *create_jsonrpc_routes(request_handler), + *create_jsonrpc_routes(request_handler, "/"), ] ) ``` diff --git a/python/packages/a2a/agent_framework_a2a/_a2a_executor.py b/python/packages/a2a/agent_framework_a2a/_a2a_executor.py index 8e8dd684c9..949ce10167 100644 --- a/python/packages/a2a/agent_framework_a2a/_a2a_executor.py +++ b/python/packages/a2a/agent_framework_a2a/_a2a_executor.py @@ -78,7 +78,7 @@ class A2AExecutor(AgentExecutor): app = Starlette( routes=[ *create_agent_card_routes(public_agent_card), - *create_jsonrpc_routes(request_handler), + *create_jsonrpc_routes(request_handler, "/"), ], ) diff --git a/python/packages/a2a/agent_framework_a2a/_agent.py b/python/packages/a2a/agent_framework_a2a/_agent.py index 93874da391..a6f041ca64 100644 --- a/python/packages/a2a/agent_framework_a2a/_agent.py +++ b/python/packages/a2a/agent_framework_a2a/_agent.py @@ -365,6 +365,10 @@ class A2AAgent(AgentTelemetryLayer, BaseAgent): all_updates: list[AgentResponseUpdate] = [] streamed_artifact_ids_by_task: dict[str, set[str]] = {} + # In non-streaming mode, accumulate intermediate status content so it + # can be surfaced when the terminal event arrives (mirroring v0.3.x + # behavior where the full Task history was available at completion). + pending_updates_by_task: dict[str, list[AgentResponseUpdate]] = {} async for item in a2a_stream: payload_type = item.WhichOneof("payload") if payload_type == "message": @@ -391,27 +395,55 @@ class A2AAgent(AgentTelemetryLayer, BaseAgent): ) if task.status.state in TERMINAL_TASK_STATES: streamed_artifact_ids_by_task.pop(task.id, None) + # If the terminal Task has no content, flush accumulated updates + if not updates or all(not u.contents for u in updates): + pending = pending_updates_by_task.pop(task.id, []) + for update in pending: + all_updates.append(update) + yield update + else: + pending_updates_by_task.pop(task.id, None) for update in updates: all_updates.append(update) yield update elif payload_type == "status_update": status_event = item.status_update updates = self._updates_from_task_update_event(status_event) + is_terminal = status_event.status.state in TERMINAL_TASK_STATES if emit_intermediate: for update in updates: all_updates.append(update) yield update + elif is_terminal: + if updates: + # Terminal event with content — discard accumulated intermediates + pending_updates_by_task.pop(status_event.task_id, None) + for update in updates: + all_updates.append(update) + yield update + else: + # Terminal event with NO content — flush accumulated updates + pending = pending_updates_by_task.pop(status_event.task_id, []) + for update in pending: + all_updates.append(update) + yield update + else: + # Non-streaming intermediate: accumulate for later + if updates: + pending_updates_by_task.setdefault(status_event.task_id, []).extend(updates) elif payload_type == "artifact_update": artifact_event = item.artifact_update updates = self._updates_from_task_update_event(artifact_event) + # Always yield artifact updates — they carry actual response + # content (files, data). Track IDs so that a subsequent + # terminal Task doesn't duplicate the same artifacts. if updates: streamed_artifact_ids_by_task.setdefault(artifact_event.task_id, set()).add( artifact_event.artifact.artifact_id ) - if emit_intermediate: - for update in updates: - all_updates.append(update) - yield update + for update in updates: + all_updates.append(update) + yield update else: raise NotImplementedError(f"Unsupported StreamResponse payload: {payload_type}") diff --git a/python/packages/a2a/tests/test_a2a_agent.py b/python/packages/a2a/tests/test_a2a_agent.py index 9af8c66884..76294f30bf 100644 --- a/python/packages/a2a/tests/test_a2a_agent.py +++ b/python/packages/a2a/tests/test_a2a_agent.py @@ -1570,4 +1570,102 @@ async def test_none_metadata_leaves_additional_properties_empty( assert not response.additional_properties +async def test_non_streaming_terminal_status_update_surfaces_content( + a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient +) -> None: + """Non-streaming run() should surface content from terminal status_update events.""" + completed_msg = A2AMessage( + message_id="msg-complete", + role=A2ARole.ROLE_AGENT, + parts=[Part(text="Done! Here is your answer.")], + ) + status = TaskStatus(state=TaskState.TASK_STATE_COMPLETED, message=completed_msg) + event = TaskStatusUpdateEvent(task_id="task-ts", context_id="ctx-ts", status=status) + mock_a2a_client.responses.append(StreamResponse(status_update=event)) + + response = await a2a_agent.run("Hello") + + assert len(response.messages) == 1 + assert response.messages[0].text == "Done! Here is your answer." + + +async def test_non_streaming_accumulates_working_content_for_empty_terminal( + a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient +) -> None: + """Non-streaming run() accumulates WORKING content and flushes on empty terminal event.""" + # Intermediate WORKING event with content + working_msg = A2AMessage( + message_id="msg-working", + role=A2ARole.ROLE_AGENT, + parts=[Part(text="Here is your answer from working state.")], + ) + working_status = TaskStatus(state=TaskState.TASK_STATE_WORKING, message=working_msg) + working_event = TaskStatusUpdateEvent(task_id="task-acc", context_id="ctx-acc", status=working_status) + mock_a2a_client.responses.append(StreamResponse(status_update=working_event)) + + # Terminal COMPLETED event with NO content + completed_status = TaskStatus(state=TaskState.TASK_STATE_COMPLETED) + completed_event = TaskStatusUpdateEvent(task_id="task-acc", context_id="ctx-acc", status=completed_status) + mock_a2a_client.responses.append(StreamResponse(status_update=completed_event)) + + response = await a2a_agent.run("Hello") + + # The accumulated WORKING content is flushed when terminal arrives empty + assert len(response.messages) == 1 + assert response.messages[0].text == "Here is your answer from working state." + + +async def test_non_streaming_intermediate_discarded_when_terminal_has_content( + a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient +) -> None: + """Non-streaming: if terminal event has content, intermediate content is discarded.""" + # Intermediate WORKING event + working_msg = A2AMessage( + message_id="msg-working", + role=A2ARole.ROLE_AGENT, + parts=[Part(text="Still thinking...")], + ) + working_status = TaskStatus(state=TaskState.TASK_STATE_WORKING, message=working_msg) + working_event = TaskStatusUpdateEvent(task_id="task-wi", context_id="ctx-wi", status=working_status) + mock_a2a_client.responses.append(StreamResponse(status_update=working_event)) + + # Terminal COMPLETED event WITH content + completed_msg = A2AMessage( + message_id="msg-final", + role=A2ARole.ROLE_AGENT, + parts=[Part(text="Final answer")], + ) + completed_status = TaskStatus(state=TaskState.TASK_STATE_COMPLETED, message=completed_msg) + completed_event = TaskStatusUpdateEvent(task_id="task-wi", context_id="ctx-wi", status=completed_status) + mock_a2a_client.responses.append(StreamResponse(status_update=completed_event)) + + response = await a2a_agent.run("Hello") + + # Terminal content supersedes accumulated intermediates + assert len(response.messages) == 1 + assert response.messages[0].text == "Final answer" + + +async def test_non_streaming_artifact_update_surfaces_content( + a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient +) -> None: + """Non-streaming run() should surface content from artifact_update events.""" + artifact = Artifact( + artifact_id="art-ns", + parts=[Part(text="Artifact content")], + ) + event = TaskArtifactUpdateEvent(task_id="task-anu", context_id="ctx-anu", artifact=artifact, append=False) + mock_a2a_client.responses.append(StreamResponse(artifact_update=event)) + + # Terminal task with the same artifact ID — should be deduped + mock_a2a_client.add_task_response("task-anu", [{"id": "art-ns", "content": "Artifact content"}]) + + response = await a2a_agent.run("Hello") + + # Artifact update + terminal task with same artifact ID = content emitted once from + # the artifact_update, then the duplicate from the task is filtered by streamed_artifact_ids + assert len(response.messages) == 1 + assert response.messages[0].text == "Artifact content" + + # endregion diff --git a/python/samples/04-hosting/a2a/a2a_server.py b/python/samples/04-hosting/a2a/a2a_server.py index c299f1e71e..8eaf3a7363 100644 --- a/python/samples/04-hosting/a2a/a2a_server.py +++ b/python/samples/04-hosting/a2a/a2a_server.py @@ -103,7 +103,7 @@ def main() -> None: app = Starlette( routes=[ *create_agent_card_routes(agent_card), - *create_jsonrpc_routes(request_handler), + *create_jsonrpc_routes(request_handler, "/"), ] ) diff --git a/python/samples/04-hosting/a2a/agent_definitions.py b/python/samples/04-hosting/a2a/agent_definitions.py index 32da16b9d2..79f1f1b1ee 100644 --- a/python/samples/04-hosting/a2a/agent_definitions.py +++ b/python/samples/04-hosting/a2a/agent_definitions.py @@ -8,16 +8,11 @@ AgentCards for the invoice, policy, and logistics agent types. from __future__ import annotations -from typing import TYPE_CHECKING - from a2a.types import AgentCapabilities, AgentCard, AgentInterface, AgentSkill +from agent_framework import Agent +from agent_framework.foundry import FoundryChatClient from invoice_data import query_by_invoice_id, query_by_transaction_id, query_invoices -if TYPE_CHECKING: - from agent_framework import Agent - from agent_framework.foundry import FoundryChatClient - - # --------------------------------------------------------------------------- # Agent instructions # --------------------------------------------------------------------------- diff --git a/python/samples/04-hosting/a2a/agent_executor.py b/python/samples/04-hosting/a2a/agent_executor.py index f77f866511..3dcefff09f 100644 --- a/python/samples/04-hosting/a2a/agent_executor.py +++ b/python/samples/04-hosting/a2a/agent_executor.py @@ -10,18 +10,12 @@ published back through the a2a-sdk event queue. from __future__ import annotations import asyncio -import uuid from typing import TYPE_CHECKING +from a2a.helpers import new_task_from_user_message from a2a.server.agent_execution.agent_executor import AgentExecutor -from a2a.types import ( - Message, - Part, - Role, - TaskState, - TaskStatus, - TaskStatusUpdateEvent, -) +from a2a.server.tasks import TaskUpdater +from a2a.types import Part, TaskState if TYPE_CHECKING: from a2a.server.agent_execution.context import RequestContext @@ -47,17 +41,17 @@ class AgentFrameworkExecutor(AgentExecutor): if not user_text: user_text = "Hello" - task_id = context.task_id or str(uuid.uuid4()) - context_id = context.context_id or str(uuid.uuid4()) + # v1.0 requires a Task object in the queue before any TaskStatusUpdateEvent + task = context.current_task + if not task and context.message: + task = new_task_from_user_message(context.message) + await event_queue.enqueue_event(task) + + task_id = task.id if task else context.task_id + updater = TaskUpdater(event_queue, task_id, context.context_id) # Signal that the agent is working - await event_queue.enqueue_event( - TaskStatusUpdateEvent( - task_id=task_id, - context_id=context_id, - status=TaskStatus(state=TaskState.TASK_STATE_WORKING), - ) - ) + await updater.start_work() try: response = await self.agent.run(user_text) @@ -71,48 +65,19 @@ class AgentFrameworkExecutor(AgentExecutor): if not response_parts: response_parts.append(Part(text=str(response))) - # Publish the agent's response as a completed message - await event_queue.enqueue_event( - TaskStatusUpdateEvent( - task_id=task_id, - context_id=context_id, - status=TaskStatus( - state=TaskState.TASK_STATE_COMPLETED, - message=Message( - message_id=str(uuid.uuid4()), - role=Role.ROLE_AGENT, - parts=response_parts, - ), - ), - ) + # Publish the agent's response and mark as completed + await updater.complete( + message=updater.new_agent_message(response_parts), ) except asyncio.CancelledError: raise except Exception as e: - await event_queue.enqueue_event( - TaskStatusUpdateEvent( - task_id=task_id, - context_id=context_id, - status=TaskStatus( - state=TaskState.TASK_STATE_FAILED, - message=Message( - message_id=str(uuid.uuid4()), - role=Role.ROLE_AGENT, - parts=[Part(text=f"Agent error: {e}")], - ), - ), - ) + await updater.update_status( + state=TaskState.TASK_STATE_FAILED, + message=updater.new_agent_message([Part(text=f"Agent error: {e}")]), ) async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: """Handle cancellation by publishing a canceled status.""" - task_id = context.task_id or str(uuid.uuid4()) - context_id = context.context_id or str(uuid.uuid4()) - - await event_queue.enqueue_event( - TaskStatusUpdateEvent( - task_id=task_id, - context_id=context_id, - status=TaskStatus(state=TaskState.TASK_STATE_CANCELED), - ) - ) + updater = TaskUpdater(event_queue, context.task_id, context.context_id) + await updater.update_status(state=TaskState.TASK_STATE_CANCELED) diff --git a/python/samples/04-hosting/a2a/agent_framework_to_a2a.py b/python/samples/04-hosting/a2a/agent_framework_to_a2a.py index 519c907fe5..c29f18ed6d 100644 --- a/python/samples/04-hosting/a2a/agent_framework_to_a2a.py +++ b/python/samples/04-hosting/a2a/agent_framework_to_a2a.py @@ -65,7 +65,7 @@ if __name__ == "__main__": server = Starlette( routes=[ *create_agent_card_routes(public_agent_card), - *create_jsonrpc_routes(request_handler), + *create_jsonrpc_routes(request_handler, "/"), ] )