diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index b43c81b8d7..a7c47c2893 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -617,9 +617,10 @@ class Workflow(DictConvertible): ): raise RuntimeError( "Cannot start a new run with 'message' while in-flight executor " - "messages remain from a prior run. Either resume from a checkpoint " - "(checkpoint_id=...), wait for the prior run to complete, or call " - "'await workflow.reset()' to drop the pending messages." + "messages remain from a prior run. Resume from a checkpoint " + "(checkpoint_id=...) or wait for the prior run to complete. " + "Workflows that need to recover from a mid-run failure must use " + "checkpointing; there is no in-process recovery path." ) initial_executor_fn = self._resolve_execution_mode( @@ -652,30 +653,6 @@ class Workflow(DictConvertible): self._runner.context.clear_runtime_checkpoint_storage() self._reset_running_flag() - async def reset(self) -> None: - """Drop all in-flight executor messages and per-run accounting. - - Workflows preserve shared state and pending executor messages - across :meth:`run` calls so that multi-turn callers (e.g. - :class:`WorkflowAgent`) can deliver follow-up turns to the same - instance without losing context. If a prior run aborted (e.g. the - runner raised :class:`WorkflowConvergenceException`) and the - workflow is not checkpointed, those pending messages remain in - the runner context and every future ``run(message=...)`` call - fails with ``RuntimeError`` because of the in-flight-messages - guard. Callers that have no checkpoint to resume from can use - ``await workflow.reset()`` as an explicit escape hatch to clear - pending messages and start fresh. - - Note: this does NOT clear the workflow ``State`` (use - :meth:`Workflow.run` with a ``checkpoint_id`` for state replay) - and is a no-op while another run is in progress on this instance. - """ - if self._is_running: - raise RuntimeError("Cannot reset a workflow while a run is in progress.") - self._runner.context.reset_for_new_run() - self._runner.reset_iteration_count() - @staticmethod def _finalize_events( events: Sequence[WorkflowEvent], diff --git a/python/packages/core/tests/workflow/test_workflow.py b/python/packages/core/tests/workflow/test_workflow.py index e49e5f0e53..30e81d8fe6 100644 --- a/python/packages/core/tests/workflow/test_workflow.py +++ b/python/packages/core/tests/workflow/test_workflow.py @@ -960,8 +960,8 @@ async def test_workflow_run_inflight_messages_guard(simple_executor: Executor) - calls. If a prior run aborted before the runner drained those pending messages (e.g. it raised :class:`WorkflowConvergenceException`), the next fresh-message call should fail loudly instead of silently mixing the - leftover messages with the new turn. Callers can recover via - :meth:`Workflow.reset`. + leftover messages with the new turn. The supported recovery path is to + resume from a checkpoint; there is no in-process recovery hatch. """ workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build() test_message = WorkflowMessage(data="test", source_id="test", target_id=None) @@ -977,25 +977,6 @@ async def test_workflow_run_inflight_messages_guard(simple_executor: Executor) - async for _ in workflow.run(test_message, stream=True): pass - # ``Workflow.reset`` is the documented escape hatch. - await workflow.reset() - assert not await workflow._runner.context.has_messages() - - # After reset, a new run is accepted again. - result = await workflow.run(test_message) - assert result.get_final_state() == WorkflowRunState.IDLE - - -async def test_workflow_reset_rejects_concurrent_runs(simple_executor: Executor) -> None: - """``Workflow.reset`` must not stomp on an in-progress run.""" - workflow = WorkflowBuilder(start_executor=simple_executor).add_edge(simple_executor, simple_executor).build() - workflow._is_running = True - try: - with pytest.raises(RuntimeError, match="run is in progress"): - await workflow.reset() - finally: - workflow._is_running = False - async def test_workflow_run_parameter_validation(simple_executor: Executor) -> None: """Test that stream properly validate parameter combinations."""