Drop Workflow.reset(); checkpointing is the recovery path

The in-flight-messages guard prevented silent misbehavior, but the
companion Workflow.reset() escape hatch only cleared _messages while
leaving iteration count, executor-local state, and shared State
mutations in an indeterminate condition after a mid-run failure. That
gave a false sense of recovery.

Recovery from a mid-run failure is supported only via checkpoint
restoration. Keep the guard and reframe its error message accordingly;
remove reset() and its tests.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
alliscode
2026-04-28 13:29:01 -07:00
Unverified
parent 8252ec6fb2
commit 62150df256
2 changed files with 6 additions and 48 deletions
@@ -617,9 +617,10 @@ class Workflow(DictConvertible):
):
raise RuntimeError(
"Cannot start a new run with 'message' while in-flight executor "
"messages remain from a prior run. Either resume from a checkpoint "
"(checkpoint_id=...), wait for the prior run to complete, or call "
"'await workflow.reset()' to drop the pending messages."
"messages remain from a prior run. Resume from a checkpoint "
"(checkpoint_id=...) or wait for the prior run to complete. "
"Workflows that need to recover from a mid-run failure must use "
"checkpointing; there is no in-process recovery path."
)
initial_executor_fn = self._resolve_execution_mode(
@@ -652,30 +653,6 @@ class Workflow(DictConvertible):
self._runner.context.clear_runtime_checkpoint_storage()
self._reset_running_flag()
async def reset(self) -> None:
"""Drop all in-flight executor messages and per-run accounting.
Workflows preserve shared state and pending executor messages
across :meth:`run` calls so that multi-turn callers (e.g.
:class:`WorkflowAgent`) can deliver follow-up turns to the same
instance without losing context. If a prior run aborted (e.g. the
runner raised :class:`WorkflowConvergenceException`) and the
workflow is not checkpointed, those pending messages remain in
the runner context and every future ``run(message=...)`` call
fails with ``RuntimeError`` because of the in-flight-messages
guard. Callers that have no checkpoint to resume from can use
``await workflow.reset()`` as an explicit escape hatch to clear
pending messages and start fresh.
Note: this does NOT clear the workflow ``State`` (use
:meth:`Workflow.run` with a ``checkpoint_id`` for state replay)
and is a no-op while another run is in progress on this instance.
"""
if self._is_running:
raise RuntimeError("Cannot reset a workflow while a run is in progress.")
self._runner.context.reset_for_new_run()
self._runner.reset_iteration_count()
@staticmethod
def _finalize_events(
events: Sequence[WorkflowEvent],
@@ -960,8 +960,8 @@ async def test_workflow_run_inflight_messages_guard(simple_executor: Executor) -
calls. If a prior run aborted before the runner drained those pending
messages (e.g. it raised :class:`WorkflowConvergenceException`), the next
fresh-message call should fail loudly instead of silently mixing the
leftover messages with the new turn. Callers can recover via
:meth:`Workflow.reset`.
leftover messages with the new turn. The supported recovery path is to
resume from a checkpoint; there is no in-process recovery hatch.
"""
workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build()
test_message = WorkflowMessage(data="test", source_id="test", target_id=None)
@@ -977,25 +977,6 @@ async def test_workflow_run_inflight_messages_guard(simple_executor: Executor) -
async for _ in workflow.run(test_message, stream=True):
pass
# ``Workflow.reset`` is the documented escape hatch.
await workflow.reset()
assert not await workflow._runner.context.has_messages()
# After reset, a new run is accepted again.
result = await workflow.run(test_message)
assert result.get_final_state() == WorkflowRunState.IDLE
async def test_workflow_reset_rejects_concurrent_runs(simple_executor: Executor) -> None:
"""``Workflow.reset`` must not stomp on an in-progress run."""
workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build()
workflow._is_running = True
try:
with pytest.raises(RuntimeError, match="run is in progress"):
await workflow.reset()
finally:
workflow._is_running = False
async def test_workflow_run_parameter_validation(simple_executor: Executor) -> None:
"""Test that stream properly validate parameter combinations."""