From d08cec5379f2b0cb03e2854890ce5dc4d322fc55 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 15 Jun 2026 09:40:21 -0700 Subject: [PATCH] Addres race condition when stream is dropped midway --- .../agent_framework/_workflows/_workflow.py | 26 +++++--- .../core/tests/workflow/test_workflow.py | 62 +++++++++++++++++++ 2 files changed, 80 insertions(+), 8 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index 09ff6c28c1..fa6daab267 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -838,6 +838,16 @@ class Workflow(DictConvertible): if checkpoint_storage is not None: self._runner.context.set_runtime_checkpoint_storage(checkpoint_storage) + # Capture the weakref instance ``run()`` installed for *this* run. We + # compare by object identity in the finally so a stale finalizer (e.g. + # the caller dropped this stream after partial iteration, then started + # a new run before async-gen finalization throws ``GeneratorExit`` into + # us) does not clobber a successor run's freshly installed weakref. + # ``run()`` runs synchronously and assigns ``self._active_run`` before + # this generator's body is first iterated, so by the time we read it + # here it already points at our own ``ResponseStream``. + my_active_run = self._active_run + try: # Async validation: a fresh-message run is only allowed when the # runner context has fully drained from any prior run. If it still @@ -879,14 +889,14 @@ class Workflow(DictConvertible): continue yield event finally: - # Clear the active-run weakref so a subsequent ``run()`` is allowed. - # ``run()`` set this synchronously after constructing the ResponseStream; - # we clear it here once the run has finished (success, error, early - # close, or partial iteration). This is in-band, so by the time the - # caller's stream is later garbage collected, ``_active_run`` is already - # ``None`` (or has been replaced by a newer run's weakref) - no GC-time - # finalizer is needed. - self._active_run = None + # Clear the active-run weakref so a subsequent ``run()`` is allowed, + # but only if the slot still holds *our* weakref. If the caller + # dropped this stream after partial iteration and a new ``run()`` + # already installed its own weakref before our async-gen finalizer + # ran, ``self._active_run`` now points at the successor; clearing + # it would silently break the successor's concurrency guard. + if self._active_run is my_active_run: + self._active_run = None if checkpoint_storage is not None: self._runner.context.clear_runtime_checkpoint_storage() diff --git a/python/packages/core/tests/workflow/test_workflow.py b/python/packages/core/tests/workflow/test_workflow.py index 9884088cdf..3b1cf48f6a 100644 --- a/python/packages/core/tests/workflow/test_workflow.py +++ b/python/packages/core/tests/workflow/test_workflow.py @@ -933,6 +933,68 @@ async def test_workflow_unawaited_run_coroutine_releases_run_lock() -> None: assert result.get_final_state() == WorkflowRunState.IDLE +async def test_workflow_partial_stream_does_not_clobber_successor_active_run() -> None: + """A stale ``_run_core`` finalizer must not clear a successor's run lock. + + Repro for the GC-finalizer race the user reported: + + 1. Start stream A and consume one event so its body is suspended at a + ``yield``. Its ``finally`` is now armed and will run when the + generator is closed. + 2. Drop stream A and ``gc.collect``. The ``_active_run`` weakref's + referent is gone, so a subsequent ``run()`` will pass the + concurrency guard - but stream A's async-gen finalizer hasn't + actually executed yet (``aclose`` is scheduled on the loop). + 3. Synchronously start stream B; ``run()`` installs a fresh weakref + in ``_active_run``. + 4. Yield to the loop so stream A's stale ``finally`` runs. Without + the identity check it unconditionally writes + ``self._active_run = None``, silently disabling the concurrency + guard for stream B. + """ + executor = IncrementExecutor(id="stale_finalizer_exec", limit=100, increment=1) + workflow = WorkflowBuilder(start_executor=executor).build() + + # Step 1: drive stream A's body until it's suspended at its first yield. + stream_a = workflow.run(NumberMessage(data=0), stream=True) + aiter_a = stream_a.__aiter__() + await aiter_a.__anext__() + + # Step 2: drop stream A; GC invalidates the weakref and schedules + # async-gen close, but does not run the close inline. + del stream_a + del aiter_a + gc.collect() + + # Step 3: synchronously start stream B *before* yielding to the loop, + # so the stale ``aclose`` for stream A hasn't fired yet. + stream_b = workflow.run(NumberMessage(data=0), stream=True) + ref_b = workflow._active_run # type: ignore[attr-defined] + assert ref_b is not None and ref_b() is stream_b + + # Step 4: yield enough times for stream A's scheduled aclose to drive + # its body through ``GeneratorExit`` and into its ``finally``. + for _ in range(5): + await asyncio.sleep(0) + + # With the fix, stream B's reservation is still in place. Without it, + # ``_active_run`` was clobbered to ``None`` and a concurrent run would + # be (incorrectly) accepted. + assert workflow._active_run is ref_b # type: ignore[attr-defined] + with pytest.raises( + WorkflowException, + match="Workflow is already running; concurrent runs are not allowed on the same instance.", + ): + await workflow.run(NumberMessage(data=0)) + + # Tear down stream B without iterating it (its body never started, so + # closing it is a no-op for workflow state). + del stream_b + del ref_b + gc.collect() + await asyncio.sleep(0) + + class _StreamingTestAgent(BaseAgent): """Test agent that supports both streaming and non-streaming modes."""