Addres race condition when stream is dropped midway

This commit is contained in:
Tao Chen
2026-06-15 09:40:21 -07:00
Unverified
parent 422f7e7382
commit d08cec5379
2 changed files with 80 additions and 8 deletions
@@ -838,6 +838,16 @@ class Workflow(DictConvertible):
if checkpoint_storage is not None:
self._runner.context.set_runtime_checkpoint_storage(checkpoint_storage)
# Capture the weakref instance ``run()`` installed for *this* run. We
# compare by object identity in the finally so a stale finalizer (e.g.
# the caller dropped this stream after partial iteration, then started
# a new run before async-gen finalization throws ``GeneratorExit`` into
# us) does not clobber a successor run's freshly installed weakref.
# ``run()`` runs synchronously and assigns ``self._active_run`` before
# this generator's body is first iterated, so by the time we read it
# here it already points at our own ``ResponseStream``.
my_active_run = self._active_run
try:
# Async validation: a fresh-message run is only allowed when the
# runner context has fully drained from any prior run. If it still
@@ -879,14 +889,14 @@ class Workflow(DictConvertible):
continue
yield event
finally:
# Clear the active-run weakref so a subsequent ``run()`` is allowed.
# ``run()`` set this synchronously after constructing the ResponseStream;
# we clear it here once the run has finished (success, error, early
# close, or partial iteration). This is in-band, so by the time the
# caller's stream is later garbage collected, ``_active_run`` is already
# ``None`` (or has been replaced by a newer run's weakref) - no GC-time
# finalizer is needed.
self._active_run = None
# Clear the active-run weakref so a subsequent ``run()`` is allowed,
# but only if the slot still holds *our* weakref. If the caller
# dropped this stream after partial iteration and a new ``run()``
# already installed its own weakref before our async-gen finalizer
# ran, ``self._active_run`` now points at the successor; clearing
# it would silently break the successor's concurrency guard.
if self._active_run is my_active_run:
self._active_run = None
if checkpoint_storage is not None:
self._runner.context.clear_runtime_checkpoint_storage()
@@ -933,6 +933,68 @@ async def test_workflow_unawaited_run_coroutine_releases_run_lock() -> None:
assert result.get_final_state() == WorkflowRunState.IDLE
async def test_workflow_partial_stream_does_not_clobber_successor_active_run() -> None:
"""A stale ``_run_core`` finalizer must not clear a successor's run lock.
Repro for the GC-finalizer race the user reported:
1. Start stream A and consume one event so its body is suspended at a
``yield``. Its ``finally`` is now armed and will run when the
generator is closed.
2. Drop stream A and ``gc.collect``. The ``_active_run`` weakref's
referent is gone, so a subsequent ``run()`` will pass the
concurrency guard - but stream A's async-gen finalizer hasn't
actually executed yet (``aclose`` is scheduled on the loop).
3. Synchronously start stream B; ``run()`` installs a fresh weakref
in ``_active_run``.
4. Yield to the loop so stream A's stale ``finally`` runs. Without
the identity check it unconditionally writes
``self._active_run = None``, silently disabling the concurrency
guard for stream B.
"""
executor = IncrementExecutor(id="stale_finalizer_exec", limit=100, increment=1)
workflow = WorkflowBuilder(start_executor=executor).build()
# Step 1: drive stream A's body until it's suspended at its first yield.
stream_a = workflow.run(NumberMessage(data=0), stream=True)
aiter_a = stream_a.__aiter__()
await aiter_a.__anext__()
# Step 2: drop stream A; GC invalidates the weakref and schedules
# async-gen close, but does not run the close inline.
del stream_a
del aiter_a
gc.collect()
# Step 3: synchronously start stream B *before* yielding to the loop,
# so the stale ``aclose`` for stream A hasn't fired yet.
stream_b = workflow.run(NumberMessage(data=0), stream=True)
ref_b = workflow._active_run # type: ignore[attr-defined]
assert ref_b is not None and ref_b() is stream_b
# Step 4: yield enough times for stream A's scheduled aclose to drive
# its body through ``GeneratorExit`` and into its ``finally``.
for _ in range(5):
await asyncio.sleep(0)
# With the fix, stream B's reservation is still in place. Without it,
# ``_active_run`` was clobbered to ``None`` and a concurrent run would
# be (incorrectly) accepted.
assert workflow._active_run is ref_b # type: ignore[attr-defined]
with pytest.raises(
WorkflowException,
match="Workflow is already running; concurrent runs are not allowed on the same instance.",
):
await workflow.run(NumberMessage(data=0))
# Tear down stream B without iterating it (its body never started, so
# closing it is a no-op for workflow state).
del stream_b
del ref_b
gc.collect()
await asyncio.sleep(0)
class _StreamingTestAgent(BaseAgent):
"""Test agent that supports both streaming and non-streaming modes."""