diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index c11a03e925..fa7d16a31f 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -889,14 +889,19 @@ class DeclarativeActionExecutor(Executor): - dict/Mapping: Used directly as workflow.inputs - str: Converted to {"input": value} - list[Message]: Treated as the agent-facing message contract - (e.g. from WorkflowAgent / as_agent()). The full message list is - stored in ``Conversation.messages``/``Conversation.history`` and - mirrored to ``System.conversations.{id}.messages`` so workflows - that reference ``=Conversation.messages`` (e.g. InvokeAzureAgent) - see the complete history including assistant turns and non-text - content. The last user message's text is also used as the string + (e.g. from WorkflowAgent / as_agent()). The prior conversation + history is stored in ``Conversation.messages``/ + ``Conversation.history`` and mirrored to + ``System.conversations.{id}.messages`` so workflows that + reference ``=Conversation.messages`` (e.g. InvokeAzureAgent) see + assistant turns and other earlier messages, including non-text + content. At the start of a turn this history excludes the current + user message; that message's text is instead used as the string input (``Inputs.input``) and surfaced via ``System.LastMessage*`` - for backward compatibility with simple text-only workflows. + for backward compatibility with simple text-only workflows. Agent + executors are responsible for appending the current user message + to ``Conversation.messages`` immediately before invoking the + inner agent. - DeclarativeMessage: Internal message, no initialization needed - Any other type: Converted via str() to {"input": str(value)} @@ -964,7 +969,21 @@ class DeclarativeActionExecutor(Executor): # invoking the inner agent (matching the first-turn # contract where Conversation.messages holds prior turns # only). - state.set("Inputs.input", last_user_text) + # + # Note: ``state.set("Inputs.input", ...)`` would route to + # the Custom namespace (Inputs is not a recognized top-level + # writable namespace - see DeclarativeWorkflowState.set). + # PowerFx expressions like ``=Workflow.Inputs.input`` / + # ``=inputs.input`` read state_data["Inputs"] directly, so + # we update that dict in place via get_state_data / + # set_state_data. + state_data = state.get_state_data() + inputs_dict = state_data.get("Inputs") + if not isinstance(inputs_dict, dict): + inputs_dict = {} + state_data["Inputs"] = inputs_dict + inputs_dict["input"] = last_user_text + state.set_state_data(state_data) # Trailing non-user messages (e.g. tool results) sandwiched # before the new user message in the trigger are still # appended so later actions see them. diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 4b36ec670e..738318736c 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -272,28 +272,36 @@ class ResponsesHostServer(ResponsesAgentServerHost): raise RuntimeError("Agent is not a workflow agent.") # Determine the latest checkpoint (if any) so we can resume the - # workflow's prior state in the SAME run that delivers the new - # user input. Multi-turn declarative workflows need the workflow's - # internal state (e.g. Conversation.messages, intermediate Local.* - # variables) to survive across user turns; the only place that - # state lives is the workflow checkpoint, so on every turn we - # restore the latest checkpoint and feed the new input back into - # the start executor as a continuation rather than a fresh run. + # workflow's prior state for this turn. The directory is keyed by + # the inbound context id (conversation_id when set, otherwise + # previous_response_id). Multi-turn declarative workflows need the + # workflow's internal state (e.g. Conversation.messages, + # intermediate Local.* variables) to survive across user turns; + # the only place that state lives is the workflow checkpoint, so + # on every turn we restore the latest checkpoint and feed the new + # input back into the start executor as a continuation rather than + # a fresh run. latest_checkpoint_id: str | None = None + restore_storage: FileCheckpointStorage | None = None if context_id is not None: - checkpoint_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) - latest_checkpoint = await checkpoint_storage.get_latest(workflow_name=self._agent.workflow.name) + restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) + latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name) if latest_checkpoint is not None: latest_checkpoint_id = latest_checkpoint.checkpoint_id # Now run the agent with the latest input response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) - # Create / reuse the checkpoint storage that will receive checkpoints - # written during this turn. The directory is keyed by the outer - # conversation id so subsequent turns find the same checkpoint dir. - context_id = context.conversation_id or context.response_id - checkpoint_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) + # Storage that will receive checkpoints written during this turn. + # When the caller chains with previous_response_id, the next turn + # will reference the current response_id as its previous_response_id, + # so new checkpoints must land under the current response_id (or the + # conversation_id when set). When conversation_id is set, this + # matches restore_storage; when only previous_response_id was + # supplied, restore_storage points at the *prior* response's + # directory and write_storage points at the *current* response's. + write_context_id = context.conversation_id or context.response_id + write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id)) yield response_event_stream.emit_created() yield response_event_stream.emit_in_progress() @@ -305,19 +313,19 @@ class ResponsesHostServer(ResponsesAgentServerHost): # restore-only call may yield events from any pending in-flight # work in the checkpoint; we consume those internally here so they # don't surface to the response stream as duplicates. - if latest_checkpoint_id is not None: + if latest_checkpoint_id is not None and restore_storage is not None: if is_streaming_request: async for _ in self._agent.run( stream=True, checkpoint_id=latest_checkpoint_id, - checkpoint_storage=checkpoint_storage, + checkpoint_storage=restore_storage, ): pass else: await self._agent.run( stream=False, checkpoint_id=latest_checkpoint_id, - checkpoint_storage=checkpoint_storage, + checkpoint_storage=restore_storage, ) if not is_streaming_request: @@ -325,7 +333,7 @@ class ResponsesHostServer(ResponsesAgentServerHost): response = await self._agent.run( input_messages, stream=False, - checkpoint_storage=checkpoint_storage, + checkpoint_storage=write_storage, ) for message in response.messages: @@ -333,7 +341,7 @@ class ResponsesHostServer(ResponsesAgentServerHost): async for item in _to_outputs(response_event_stream, content): yield item - await self._delete_not_latest_checkpoints(checkpoint_storage, self._agent.workflow.name) + await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name) yield response_event_stream.emit_completed() return @@ -345,7 +353,7 @@ class ResponsesHostServer(ResponsesAgentServerHost): async for update in self._agent.run( input_messages, stream=True, - checkpoint_storage=checkpoint_storage, + checkpoint_storage=write_storage, ): for content in update.contents: for event in tracker.handle(content): @@ -359,7 +367,7 @@ class ResponsesHostServer(ResponsesAgentServerHost): for event in tracker.close(): yield event - await self._delete_not_latest_checkpoints(checkpoint_storage, self._agent.workflow.name) + await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name) yield response_event_stream.emit_completed() @staticmethod