From 67f3db628046cc01aedf584d18c392e716e015f5 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 14 May 2026 17:38:37 -0400 Subject: [PATCH] Python: Reject path-traversal context ids in Foundry Hosting Checkpoint Storage (#5851) * Reject path-traversal context ids in foundry workflow checkpoint storage Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/fca3aae6-50eb-4726-8baf-2718217d4e79 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Address PR review feedback: clarify URL-decode comment, isolate test root, add e2e workflow rejection tests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/832f45a6-c01e-4da9-bf85-1ba7b5f302e6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Clarify MSRC repro padding length in regression test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/832f45a6-c01e-4da9-bf85-1ba7b5f302e6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * add E2E http test for checkpoint context id rejection Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/730258ef-2781-4a7d-b7cf-b5c40c11defc Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> Co-authored-by: Jacob Alber --- .../_responses.py | 46 +++- .../foundry_hosting/tests/test_responses.py | 241 +++++++++++++++++- 2 files changed, 284 insertions(+), 3 deletions(-) diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 1645fcec2e..c34c65538c 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -11,6 +11,7 @@ import tempfile import threading from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence from contextlib import suppress +from pathlib import Path from typing import Protocol, cast from agent_framework import ( @@ -205,6 +206,47 @@ class FileBasedFunctionApprovalStorage: return await asyncio.to_thread(self._load_sync, approval_request_id) +def _checkpoint_storage_for_context(root: str, context_id: str) -> FileCheckpointStorage: + """Build a ``FileCheckpointStorage`` for ``context_id`` rooted under ``root``. + + ``context_id`` originates from caller-controlled fields such as + ``previous_response_id`` or from server-generated fields such as + ``conversation_id`` / ``response_id``. In every case it must be treated as + an untrusted single path segment: path separators, drive letters, parent + references and similar would otherwise let the resulting directory escape + the configured checkpoint root (CWE-22). The check resolves the joined + path and verifies it stays under the resolved root before any directory is + created on disk. + """ + if not isinstance(context_id, str) or not context_id: + raise RuntimeError("Invalid checkpoint context id: must be a non-empty string.") + # Reject any segment that is not a single safe path component. This covers + # POSIX/Windows separators, NUL bytes, drive letters, and all-dot segments + # (``.``, ``..``, ``...``, ...). We deliberately do not URL-decode the id + # here: the hosting layer never decodes context ids before joining them, so + # forms such as ``%2e%2e`` are accepted as literal directory names. Do NOT + # add decoding here without re-validating after the decode -- decode-then- + # join is exactly the pattern that reintroduces traversal. We also do not + # attempt to "sanitize" by stripping characters because that can introduce + # collisions between distinct ids. + if ( + "/" in context_id + or "\\" in context_id + or "\x00" in context_id + # All-dot segments (``.``, ``..``, ``...``, ...) reduce to "" after stripping dots. + or context_id.strip(".") == "" + or os.path.isabs(context_id) + or os.path.splitdrive(context_id)[0] + ): + raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}") + + root_path = Path(root).resolve() + storage_path = (root_path / context_id).resolve() + if not storage_path.is_relative_to(root_path): + raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}") + return FileCheckpointStorage(storage_path) + + class ResponsesHostServer(ResponsesAgentServerHost): """A responses server host for an agent.""" @@ -400,7 +442,7 @@ class ResponsesHostServer(ResponsesAgentServerHost): latest_checkpoint_id: str | None = None restore_storage: FileCheckpointStorage | None = None if context_id is not None: - restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id)) + restore_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, context_id) latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name) if latest_checkpoint is not None: latest_checkpoint_id = latest_checkpoint.checkpoint_id @@ -414,7 +456,7 @@ class ResponsesHostServer(ResponsesAgentServerHost): # supplied, restore_storage points at the *prior* response's # directory and write_storage points at the *current* response's. write_context_id = context.conversation_id or context.response_id - write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id)) + write_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id) # Multi-turn pattern: when we have a prior checkpoint, restore it # first (drive the workflow back to idle with prior state intact), diff --git a/python/packages/foundry_hosting/tests/test_responses.py b/python/packages/foundry_hosting/tests/test_responses.py index a0a6335651..e4d545d6d7 100644 --- a/python/packages/foundry_hosting/tests/test_responses.py +++ b/python/packages/foundry_hosting/tests/test_responses.py @@ -11,7 +11,7 @@ the registered _handle_create handler. from __future__ import annotations import json -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Callable from unittest.mock import AsyncMock, MagicMock import httpx @@ -20,6 +20,7 @@ from agent_framework import ( AgentResponse, AgentResponseUpdate, Content, + FileCheckpointStorage, HistoryProvider, Message, RawAgent, @@ -2652,3 +2653,241 @@ class TestFunctionApprovalRoundTrip: # endregion + + +# region Checkpoint context path validation + + +class TestCheckpointContextPathValidation: + """Regression tests for the path-traversal hardening of checkpoint storage. + + These tests guard against CWE-22 in the workflow hosting path. The hosting + code joins caller-supplied identifiers (``previous_response_id``) and + server-generated identifiers (``conversation_id`` / ``response_id``) under + the configured checkpoint root. Without validation, traversal segments + such as ``../../escape`` or absolute paths cause directory creation + outside the intended root. + """ + + @staticmethod + def _helper() -> Callable[[str, str], FileCheckpointStorage]: + from agent_framework_foundry_hosting._responses import ( # pyright: ignore[reportPrivateUsage] + _checkpoint_storage_for_context, + ) + + return _checkpoint_storage_for_context + + def test_valid_segment_creates_storage_under_root(self, tmp_path: Any) -> None: + helper = self._helper() + root = tmp_path / "root" + root.mkdir() + storage = helper(str(root), "resp_abc123") + assert storage.storage_path.is_dir() + assert storage.storage_path.parent == root.resolve() + + @pytest.mark.parametrize( + "bad_id", + [ + # Original MSRC repro: traversal embedded inside an id-shaped value. + # The 14 ``A``s pad the suffix to mimic the exact length of the + # ``api-made-dir<14-char-suffix>`` segment from the original report. + "caresp_x/../../service-data/api-made-dir" + "A" * 14, + # Variant report repros. + "../../escape", + "..", + ".", + "...", + "/tmp/escape", + "/absolute/path", + "C:\\temp\\escape", + "..\\..\\escape", + "foo\\..\\bar", + "foo/bar", + "with\x00null", + "", + ], + ) + def test_traversal_and_separator_payloads_are_rejected(self, tmp_path: Any, bad_id: str) -> None: + helper = self._helper() + # Use a dedicated root *inside* tmp_path so we can assert that nothing + # was created anywhere under tmp_path (root, siblings, or above). + # Asserting against tmp_path.parent would be flaky under parallel test + # execution because tmp_path.parent is shared across tests. + root = tmp_path / "root" + root.mkdir() + before = sorted(p.name for p in tmp_path.iterdir()) + with pytest.raises(RuntimeError): + helper(str(root), bad_id) + # No sibling/escape directory should have been created next to the root. + after = sorted(p.name for p in tmp_path.iterdir()) + assert before == after, f"Unexpected filesystem artifacts created for payload {bad_id!r}" + # And nothing inside the root either. + assert list(root.iterdir()) == [] + + def test_non_string_context_id_is_rejected(self, tmp_path: Any) -> None: + helper = self._helper() + with pytest.raises(RuntimeError): + helper(str(tmp_path), None) # type: ignore[arg-type] + + def test_url_encoded_traversal_is_treated_as_literal_segment(self, tmp_path: Any) -> None: + """URL-encoded traversal should not decode to traversal at the filesystem layer. + + The hosting layer never URL-decodes ids before using them; the helper + should accept ``%2e%2e`` as a single literal segment (no escape). + """ + helper = self._helper() + root = tmp_path / "root" + root.mkdir() + storage = helper(str(root), "%2e%2e") + assert storage.storage_path.parent == root.resolve() + assert storage.storage_path.name == "%2e%2e" + + @pytest.mark.parametrize( + "context_field,bad_id", + [ + # Restore sink: caller-controlled previous_response_id. + ("previous_response_id", "../../escape"), + ("previous_response_id", "/tmp/escape-abs"), + ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14), + # Restore sink: server-issued conversation_id (defense in depth). + ("conversation_id", "../../escape"), + # Write sink: malicious response_id (defense in depth). + ("response_id", "../../escape"), + ], + ) + async def test_handle_inner_workflow_rejects_malicious_context_id( + self, tmp_path: Any, context_field: str, bad_id: str + ) -> None: + """End-to-end: ``_handle_inner_workflow`` must reject malicious ids on + both the restore sink (``previous_response_id`` / ``conversation_id``) + and the write sink (``response_id``) without creating any directories. + """ + from unittest.mock import patch + + from agent_framework import WorkflowAgent + from azure.ai.agentserver.responses import ResponseContext + from azure.ai.agentserver.responses.models import CreateResponse + + # Build a mock that satisfies isinstance(agent, WorkflowAgent) and the + # constructor's "no existing checkpointing" guard. + agent = MagicMock(spec=WorkflowAgent) + agent.id = "wf-agent" + agent.name = "wf" + agent.description = "" + agent.context_providers = [] + agent.workflow = MagicMock() + agent.workflow.name = "wf" + agent.workflow._runner_context.has_checkpointing = MagicMock(return_value=False) + + # Constructor inspects WorkflowAgent.workflow internals; bypass setup + # by feeding a configured mock through a normal init. + server = ResponsesHostServer(agent, store=InMemoryResponseProvider()) + # Re-root checkpoint storage at our isolated tmp_path so we can detect + # any escape attempt on the filesystem. + root = tmp_path / "root" + root.mkdir() + server._checkpoint_storage_path = str(root) # pyright: ignore[reportPrivateUsage] + + # Build a ResponseContext with the malicious id targeting the chosen sink. + kwargs: dict[str, Any] = { + "response_id": "resp_" + "a" * 48, + "mode_flags": MagicMock(), + } + if context_field == "previous_response_id": + request = CreateResponse(model="m", input="hi", previous_response_id=bad_id) + kwargs["previous_response_id"] = bad_id + elif context_field == "conversation_id": + request = CreateResponse(model="m", input="hi") + kwargs["conversation_id"] = bad_id + else: # response_id (write sink) + request = CreateResponse(model="m", input="hi") + kwargs["response_id"] = bad_id + + # Avoid invoking the real input-resolution machinery, which would need + # a configured provider; we never reach the workflow run on rejection. + with patch.object(ResponseContext, "get_input_items", new=AsyncMock(return_value=[])): + context = ResponseContext(**kwargs) + before = sorted(p.name for p in tmp_path.iterdir()) + with pytest.raises(RuntimeError, match="Invalid checkpoint context id"): + async for _ in server._handle_inner_workflow(request, context): # pyright: ignore[reportPrivateUsage] + pass + after = sorted(p.name for p in tmp_path.iterdir()) + + assert before == after, f"Unexpected filesystem artifacts created for {context_field}={bad_id!r}" + assert list(root.iterdir()) == [], f"Checkpoint dir created inside root for {context_field}={bad_id!r}" + + @pytest.mark.parametrize( + "context_field,bad_id", + [ + # Restore sink: caller-controlled previous_response_id. These are + # rejected by request validation (HTTP 400) before the checkpoint + # code is reached. + ("previous_response_id", "../../escape"), + ("previous_response_id", "/tmp/escape-abs"), + ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14), + # Restore sink: server-issued conversation id (defense in depth). + # Reaches the checkpoint code and is rejected there, surfacing as + # an HTTP 5xx without creating any filesystem artifacts. + ("conversation", "../../escape"), + ("conversation", "/tmp/escape-abs"), + ], + ) + async def test_malicious_context_id_rejected_e2e(self, tmp_path: Any, context_field: str, bad_id: str) -> None: + """End-to-end (ASGI-in-process): malicious context ids must be rejected + through the full HTTP pipeline, and no checkpoint directory may be + created on disk for either the validation-layer rejection + (``previous_response_id``) or the deeper checkpoint-layer rejection + (``conversation``). + + The ``response_id`` write-sink is server-generated and not reachable + via the public HTTP surface, so its defense-in-depth check is covered + by the helper-level test above. + """ + from agent_framework import WorkflowAgent + + # Build a mock that satisfies isinstance(agent, WorkflowAgent) and the + # constructor's "no existing checkpointing" guard. + agent = MagicMock(spec=WorkflowAgent) + agent.id = "wf-agent" + agent.name = "wf" + agent.description = "" + agent.context_providers = [] + agent.workflow = MagicMock() + agent.workflow.name = "wf" + agent.workflow._runner_context.has_checkpointing = MagicMock( # pyright: ignore[reportPrivateUsage] + return_value=False + ) + + server = ResponsesHostServer(agent, store=InMemoryResponseProvider()) + # Re-root checkpoint storage at our isolated tmp_path so we can detect + # any escape attempt on the filesystem. + root = tmp_path / "root" + root.mkdir() + server._checkpoint_storage_path = str(root) # pyright: ignore[reportPrivateUsage] + + payload: dict[str, Any] = {"model": "m", "input": "hi"} + if context_field == "previous_response_id": + payload["previous_response_id"] = bad_id + else: # conversation + payload["conversation"] = bad_id + + before = sorted(p.name for p in tmp_path.iterdir()) + transport = httpx.ASGITransport(app=server) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + resp = await client.post("/responses", json=payload) + after = sorted(p.name for p in tmp_path.iterdir()) + + # The request must not succeed; either request validation rejects it + # (4xx) or the checkpoint layer raises and the server returns 5xx. + # Either way, no successful response may be produced. + assert resp.status_code >= 400, ( + f"Expected non-2xx for {context_field}={bad_id!r}, got {resp.status_code}: {resp.text[:200]}" + ) + assert before == after, ( + f"Unexpected filesystem artifacts under tmp_path for {context_field}={bad_id!r}: " + f"before={before} after={after}" + ) + assert list(root.iterdir()) == [], f"Checkpoint directory created inside root for {context_field}={bad_id!r}" + + +# endregion