mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Python: Fix deadlock in Magentic workflow (#1325)
* Fix deadlock in Magentic workflow * Fix unit test
This commit is contained in:
committed by
GitHub
Unverified
parent
15afc966ce
commit
2397795c1d
@@ -972,7 +972,6 @@ class MagenticOrchestratorExecutor(Executor):
|
||||
self._require_plan_signoff = require_plan_signoff
|
||||
self._plan_review_round = 0
|
||||
self._max_plan_review_rounds = max_plan_review_rounds
|
||||
self._inner_loop_lock = asyncio.Lock()
|
||||
# Registry of agent executors for internal coordination (e.g., resets)
|
||||
self._agent_executors = {}
|
||||
# Terminal state marker to stop further processing after completion/limits
|
||||
@@ -1104,8 +1103,6 @@ class MagenticOrchestratorExecutor(Executor):
|
||||
task=message.task,
|
||||
participant_descriptions=self._participants,
|
||||
)
|
||||
# Record the original user task in orchestrator context (no broadcast)
|
||||
self._context.chat_history.append(message.task)
|
||||
self._state_restored = True
|
||||
# Non-streaming callback for the orchestrator receipt of the task
|
||||
if self._message_callback:
|
||||
@@ -1317,10 +1314,10 @@ class MagenticOrchestratorExecutor(Executor):
|
||||
"""Run the inner orchestration loop. Coordination phase. Serialized with a lock."""
|
||||
if self._context is None or self._task_ledger is None:
|
||||
raise RuntimeError("Context or task ledger not initialized")
|
||||
async with self._inner_loop_lock:
|
||||
await self._run_inner_loop_locked(context)
|
||||
|
||||
async def _run_inner_loop_locked(
|
||||
await self._run_inner_loop_helper(context)
|
||||
|
||||
async def _run_inner_loop_helper(
|
||||
self,
|
||||
context: WorkflowContext[MagenticResponseMessage | MagenticRequestMessage, ChatMessage],
|
||||
) -> None:
|
||||
|
||||
@@ -344,9 +344,10 @@ async def test_magentic_checkpoint_resume_round_trip():
|
||||
|
||||
assert orchestrator._context is not None # type: ignore[reportPrivateUsage]
|
||||
assert orchestrator._context.chat_history # type: ignore[reportPrivateUsage]
|
||||
assert orchestrator._context.chat_history[0].text == task_text # type: ignore[reportPrivateUsage]
|
||||
assert orchestrator._task_ledger is not None # type: ignore[reportPrivateUsage]
|
||||
assert manager2.task_ledger is not None
|
||||
# Initial message should be the task ledger plan
|
||||
assert orchestrator._context.chat_history[0].text == orchestrator._task_ledger.text # type: ignore[reportPrivateUsage]
|
||||
|
||||
|
||||
class _DummyExec(Executor):
|
||||
@@ -690,3 +691,47 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames():
|
||||
responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)},
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class NotProgressingManager(MagenticManagerBase):
|
||||
"""
|
||||
A manager that never marks progress being made, to test stall/reset limits.
|
||||
"""
|
||||
|
||||
async def plan(self, magentic_context: MagenticContext) -> ChatMessage:
|
||||
return ChatMessage(role=Role.ASSISTANT, text="ledger")
|
||||
|
||||
async def replan(self, magentic_context: MagenticContext) -> ChatMessage:
|
||||
return ChatMessage(role=Role.ASSISTANT, text="re-ledger")
|
||||
|
||||
async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger:
|
||||
return MagenticProgressLedger(
|
||||
is_request_satisfied=MagenticProgressLedgerItem(reason="r", answer=False),
|
||||
is_in_loop=MagenticProgressLedgerItem(reason="r", answer=True),
|
||||
is_progress_being_made=MagenticProgressLedgerItem(reason="r", answer=False),
|
||||
next_speaker=MagenticProgressLedgerItem(reason="r", answer="agentA"),
|
||||
instruction_or_question=MagenticProgressLedgerItem(reason="r", answer="done"),
|
||||
)
|
||||
|
||||
async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage:
|
||||
return ChatMessage(role=Role.ASSISTANT, text="final")
|
||||
|
||||
|
||||
async def test_magentic_stall_and_reset_successfully():
|
||||
manager = NotProgressingManager(max_round_count=10, max_stall_count=0, max_reset_count=1)
|
||||
|
||||
wf = MagenticBuilder().participants(agentA=_DummyExec("agentA")).with_standard_manager(manager).build()
|
||||
|
||||
events: list[WorkflowEvent] = []
|
||||
async for ev in wf.run_stream("test limits"):
|
||||
events.append(ev)
|
||||
|
||||
idle_status = next(
|
||||
(e for e in events if isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE), None
|
||||
)
|
||||
assert idle_status is not None
|
||||
output_event = next((e for e in events if isinstance(e, WorkflowOutputEvent)), None)
|
||||
assert output_event is not None
|
||||
assert isinstance(output_event.data, ChatMessage)
|
||||
assert output_event.data.text is not None
|
||||
assert output_event.data.text == "re-ledger"
|
||||
|
||||
Reference in New Issue
Block a user