diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 457116c65c..3ee1c10690 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -972,7 +972,6 @@ class MagenticOrchestratorExecutor(Executor): self._require_plan_signoff = require_plan_signoff self._plan_review_round = 0 self._max_plan_review_rounds = max_plan_review_rounds - self._inner_loop_lock = asyncio.Lock() # Registry of agent executors for internal coordination (e.g., resets) self._agent_executors = {} # Terminal state marker to stop further processing after completion/limits @@ -1104,8 +1103,6 @@ class MagenticOrchestratorExecutor(Executor): task=message.task, participant_descriptions=self._participants, ) - # Record the original user task in orchestrator context (no broadcast) - self._context.chat_history.append(message.task) self._state_restored = True # Non-streaming callback for the orchestrator receipt of the task if self._message_callback: @@ -1317,10 +1314,10 @@ class MagenticOrchestratorExecutor(Executor): """Run the inner orchestration loop. Coordination phase. Serialized with a lock.""" if self._context is None or self._task_ledger is None: raise RuntimeError("Context or task ledger not initialized") - async with self._inner_loop_lock: - await self._run_inner_loop_locked(context) - async def _run_inner_loop_locked( + await self._run_inner_loop_helper(context) + + async def _run_inner_loop_helper( self, context: WorkflowContext[MagenticResponseMessage | MagenticRequestMessage, ChatMessage], ) -> None: diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index bb6984cc74..b52449a928 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -344,9 +344,10 @@ async def test_magentic_checkpoint_resume_round_trip(): assert orchestrator._context is not None # type: ignore[reportPrivateUsage] assert orchestrator._context.chat_history # type: ignore[reportPrivateUsage] - assert orchestrator._context.chat_history[0].text == task_text # type: ignore[reportPrivateUsage] assert orchestrator._task_ledger is not None # type: ignore[reportPrivateUsage] assert manager2.task_ledger is not None + # Initial message should be the task ledger plan + assert orchestrator._context.chat_history[0].text == orchestrator._task_ledger.text # type: ignore[reportPrivateUsage] class _DummyExec(Executor): @@ -690,3 +691,47 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames(): responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)}, ): pass + + +class NotProgressingManager(MagenticManagerBase): + """ + A manager that never marks progress being made, to test stall/reset limits. + """ + + async def plan(self, magentic_context: MagenticContext) -> ChatMessage: + return ChatMessage(role=Role.ASSISTANT, text="ledger") + + async def replan(self, magentic_context: MagenticContext) -> ChatMessage: + return ChatMessage(role=Role.ASSISTANT, text="re-ledger") + + async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger: + return MagenticProgressLedger( + is_request_satisfied=MagenticProgressLedgerItem(reason="r", answer=False), + is_in_loop=MagenticProgressLedgerItem(reason="r", answer=True), + is_progress_being_made=MagenticProgressLedgerItem(reason="r", answer=False), + next_speaker=MagenticProgressLedgerItem(reason="r", answer="agentA"), + instruction_or_question=MagenticProgressLedgerItem(reason="r", answer="done"), + ) + + async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage: + return ChatMessage(role=Role.ASSISTANT, text="final") + + +async def test_magentic_stall_and_reset_successfully(): + manager = NotProgressingManager(max_round_count=10, max_stall_count=0, max_reset_count=1) + + wf = MagenticBuilder().participants(agentA=_DummyExec("agentA")).with_standard_manager(manager).build() + + events: list[WorkflowEvent] = [] + async for ev in wf.run_stream("test limits"): + events.append(ev) + + idle_status = next( + (e for e in events if isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE), None + ) + assert idle_status is not None + output_event = next((e for e in events if isinstance(e, WorkflowOutputEvent)), None) + assert output_event is not None + assert isinstance(output_event.data, ChatMessage) + assert output_event.data.text is not None + assert output_event.data.text == "re-ledger"