Address PR review: fix Inputs.input update and checkpoint storage path

- _declarative_base.py: continuation branch was writing 'Inputs.input' via
  state.set, which routes to the Custom namespace and never updates the
  PowerFx-visible Workflow.Inputs.input. Update state_data['Inputs'] in
  place via get_state_data / set_state_data so =Workflow.Inputs.input and
  =inputs.input see the new turn's user text on continuation.
- _declarative_base.py: refresh docstring to clarify that on a list[Message]
  trigger, Conversation.messages excludes the current user message at the
  start of the turn (agent executors append it before invoking the inner
  agent).
- _responses.py: when previous_response_id is supplied (no conversation_id),
  the prior checkpoint lives under <storage>/<previous_response_id> but new
  checkpoints must land under <storage>/<current_response_id> for the next
  turn to find them. Hold onto restore_storage from the get_latest lookup
  and pass it to the restore-only run; pass write_storage (current id) to
  the message-delivery run and to checkpoint cleanup.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
alliscode
2026-04-28 11:27:05 -07:00
Unverified
parent 891c00c909
commit bb312f660d
2 changed files with 56 additions and 29 deletions
@@ -889,14 +889,19 @@ class DeclarativeActionExecutor(Executor):
- dict/Mapping: Used directly as workflow.inputs
- str: Converted to {"input": value}
- list[Message]: Treated as the agent-facing message contract
(e.g. from WorkflowAgent / as_agent()). The full message list is
stored in ``Conversation.messages``/``Conversation.history`` and
mirrored to ``System.conversations.{id}.messages`` so workflows
that reference ``=Conversation.messages`` (e.g. InvokeAzureAgent)
see the complete history including assistant turns and non-text
content. The last user message's text is also used as the string
(e.g. from WorkflowAgent / as_agent()). The prior conversation
history is stored in ``Conversation.messages``/
``Conversation.history`` and mirrored to
``System.conversations.{id}.messages`` so workflows that
reference ``=Conversation.messages`` (e.g. InvokeAzureAgent) see
assistant turns and other earlier messages, including non-text
content. At the start of a turn this history excludes the current
user message; that message's text is instead used as the string
input (``Inputs.input``) and surfaced via ``System.LastMessage*``
for backward compatibility with simple text-only workflows.
for backward compatibility with simple text-only workflows. Agent
executors are responsible for appending the current user message
to ``Conversation.messages`` immediately before invoking the
inner agent.
- DeclarativeMessage: Internal message, no initialization needed
- Any other type: Converted via str() to {"input": str(value)}
@@ -964,7 +969,21 @@ class DeclarativeActionExecutor(Executor):
# invoking the inner agent (matching the first-turn
# contract where Conversation.messages holds prior turns
# only).
state.set("Inputs.input", last_user_text)
#
# Note: ``state.set("Inputs.input", ...)`` would route to
# the Custom namespace (Inputs is not a recognized top-level
# writable namespace - see DeclarativeWorkflowState.set).
# PowerFx expressions like ``=Workflow.Inputs.input`` /
# ``=inputs.input`` read state_data["Inputs"] directly, so
# we update that dict in place via get_state_data /
# set_state_data.
state_data = state.get_state_data()
inputs_dict = state_data.get("Inputs")
if not isinstance(inputs_dict, dict):
inputs_dict = {}
state_data["Inputs"] = inputs_dict
inputs_dict["input"] = last_user_text
state.set_state_data(state_data)
# Trailing non-user messages (e.g. tool results) sandwiched
# before the new user message in the trigger are still
# appended so later actions see them.
@@ -272,28 +272,36 @@ class ResponsesHostServer(ResponsesAgentServerHost):
raise RuntimeError("Agent is not a workflow agent.")
# Determine the latest checkpoint (if any) so we can resume the
# workflow's prior state in the SAME run that delivers the new
# user input. Multi-turn declarative workflows need the workflow's
# internal state (e.g. Conversation.messages, intermediate Local.*
# variables) to survive across user turns; the only place that
# state lives is the workflow checkpoint, so on every turn we
# restore the latest checkpoint and feed the new input back into
# the start executor as a continuation rather than a fresh run.
# workflow's prior state for this turn. The directory is keyed by
# the inbound context id (conversation_id when set, otherwise
# previous_response_id). Multi-turn declarative workflows need the
# workflow's internal state (e.g. Conversation.messages,
# intermediate Local.* variables) to survive across user turns;
# the only place that state lives is the workflow checkpoint, so
# on every turn we restore the latest checkpoint and feed the new
# input back into the start executor as a continuation rather than
# a fresh run.
latest_checkpoint_id: str | None = None
restore_storage: FileCheckpointStorage | None = None
if context_id is not None:
checkpoint_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id))
latest_checkpoint = await checkpoint_storage.get_latest(workflow_name=self._agent.workflow.name)
restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id))
latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name)
if latest_checkpoint is not None:
latest_checkpoint_id = latest_checkpoint.checkpoint_id
# Now run the agent with the latest input
response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model)
# Create / reuse the checkpoint storage that will receive checkpoints
# written during this turn. The directory is keyed by the outer
# conversation id so subsequent turns find the same checkpoint dir.
context_id = context.conversation_id or context.response_id
checkpoint_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id))
# Storage that will receive checkpoints written during this turn.
# When the caller chains with previous_response_id, the next turn
# will reference the current response_id as its previous_response_id,
# so new checkpoints must land under the current response_id (or the
# conversation_id when set). When conversation_id is set, this
# matches restore_storage; when only previous_response_id was
# supplied, restore_storage points at the *prior* response's
# directory and write_storage points at the *current* response's.
write_context_id = context.conversation_id or context.response_id
write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id))
yield response_event_stream.emit_created()
yield response_event_stream.emit_in_progress()
@@ -305,19 +313,19 @@ class ResponsesHostServer(ResponsesAgentServerHost):
# restore-only call may yield events from any pending in-flight
# work in the checkpoint; we consume those internally here so they
# don't surface to the response stream as duplicates.
if latest_checkpoint_id is not None:
if latest_checkpoint_id is not None and restore_storage is not None:
if is_streaming_request:
async for _ in self._agent.run(
stream=True,
checkpoint_id=latest_checkpoint_id,
checkpoint_storage=checkpoint_storage,
checkpoint_storage=restore_storage,
):
pass
else:
await self._agent.run(
stream=False,
checkpoint_id=latest_checkpoint_id,
checkpoint_storage=checkpoint_storage,
checkpoint_storage=restore_storage,
)
if not is_streaming_request:
@@ -325,7 +333,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
response = await self._agent.run(
input_messages,
stream=False,
checkpoint_storage=checkpoint_storage,
checkpoint_storage=write_storage,
)
for message in response.messages:
@@ -333,7 +341,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
async for item in _to_outputs(response_event_stream, content):
yield item
await self._delete_not_latest_checkpoints(checkpoint_storage, self._agent.workflow.name)
await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name)
yield response_event_stream.emit_completed()
return
@@ -345,7 +353,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
async for update in self._agent.run(
input_messages,
stream=True,
checkpoint_storage=checkpoint_storage,
checkpoint_storage=write_storage,
):
for content in update.contents:
for event in tracker.handle(content):
@@ -359,7 +367,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
for event in tracker.close():
yield event
await self._delete_not_latest_checkpoints(checkpoint_storage, self._agent.workflow.name)
await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name)
yield response_event_stream.emit_completed()
@staticmethod