Reject path-traversal context ids in foundry workflow checkpoint storage

Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/fca3aae6-50eb-4726-8baf-2718217d4e79

Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-05-14 13:55:16 +00:00
committed by GitHub
Unverified
parent 2d83a9b10d
commit 2b10c2a0bc
2 changed files with 122 additions and 3 deletions
@@ -11,6 +11,7 @@ import tempfile
import threading
from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence
from contextlib import suppress
from pathlib import Path
from typing import Protocol, cast
from agent_framework import (
@@ -205,6 +206,44 @@ class FileBasedFunctionApprovalStorage:
return await asyncio.to_thread(self._load_sync, approval_request_id)
def _checkpoint_storage_for_context(root: str, context_id: str) -> FileCheckpointStorage:
"""Build a ``FileCheckpointStorage`` for ``context_id`` rooted under ``root``.
``context_id`` originates from caller-controlled fields such as
``previous_response_id`` or from server-generated fields such as
``conversation_id`` / ``response_id``. In every case it must be treated as
an untrusted single path segment: path separators, drive letters, parent
references and similar would otherwise let the resulting directory escape
the configured checkpoint root (CWE-22). The check resolves the joined
path and verifies it stays under the resolved root before any directory is
created on disk.
"""
if not isinstance(context_id, str) or not context_id:
raise RuntimeError("Invalid checkpoint context id: must be a non-empty string.")
# Reject any segment that is not a single safe path component. This covers
# POSIX/Windows separators, NUL bytes, drive letters, all-dot segments
# (``.``, ``..``, ``...``, ...), and embedded URL-encoded forms once
# decoded by the framework. We deliberately do not attempt to "sanitize"
# by stripping characters because that can introduce collisions between
# distinct ids.
if (
"/" in context_id
or "\\" in context_id
or "\x00" in context_id
# All-dot segments (``.``, ``..``, ``...``, ...) reduce to "" after stripping dots.
or context_id.strip(".") == ""
or os.path.isabs(context_id)
or os.path.splitdrive(context_id)[0]
):
raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}")
root_path = Path(root).resolve()
storage_path = (root_path / context_id).resolve()
if not storage_path.is_relative_to(root_path):
raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}")
return FileCheckpointStorage(storage_path)
class ResponsesHostServer(ResponsesAgentServerHost):
"""A responses server host for an agent."""
@@ -400,7 +439,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
latest_checkpoint_id: str | None = None
restore_storage: FileCheckpointStorage | None = None
if context_id is not None:
restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id))
restore_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, context_id)
latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name)
if latest_checkpoint is not None:
latest_checkpoint_id = latest_checkpoint.checkpoint_id
@@ -414,7 +453,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
# supplied, restore_storage points at the *prior* response's
# directory and write_storage points at the *current* response's.
write_context_id = context.conversation_id or context.response_id
write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id))
write_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id)
# Multi-turn pattern: when we have a prior checkpoint, restore it
# first (drive the workflow back to idle with prior state intact),
@@ -11,7 +11,7 @@ the registered _handle_create handler.
from __future__ import annotations
import json
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Callable
from unittest.mock import AsyncMock, MagicMock
import httpx
@@ -20,6 +20,7 @@ from agent_framework import (
AgentResponse,
AgentResponseUpdate,
Content,
FileCheckpointStorage,
HistoryProvider,
Message,
RawAgent,
@@ -2652,3 +2653,82 @@ class TestFunctionApprovalRoundTrip:
# endregion
# region Checkpoint context path validation
class TestCheckpointContextPathValidation:
"""Regression tests for the path-traversal hardening of checkpoint storage.
These tests guard against CWE-22 in the workflow hosting path. The hosting
code joins caller-supplied identifiers (``previous_response_id``) and
server-generated identifiers (``conversation_id`` / ``response_id``) under
the configured checkpoint root. Without validation, traversal segments
such as ``../../escape`` or absolute paths cause directory creation
outside the intended root.
"""
@staticmethod
def _helper() -> Callable[[str, str], FileCheckpointStorage]:
from agent_framework_foundry_hosting._responses import ( # pyright: ignore[reportPrivateUsage]
_checkpoint_storage_for_context,
)
return _checkpoint_storage_for_context
def test_valid_segment_creates_storage_under_root(self, tmp_path: Any) -> None:
helper = self._helper()
storage = helper(str(tmp_path), "resp_abc123")
assert storage.storage_path.is_dir()
assert storage.storage_path.parent == tmp_path.resolve()
@pytest.mark.parametrize(
"bad_id",
[
# Original MSRC repro: traversal embedded inside an id-shaped value.
"caresp_x/../../service-data/api-made-dir" + "A" * 14,
# Variant report repros.
"../../escape",
"..",
".",
"...",
"/tmp/escape",
"/absolute/path",
"C:\\temp\\escape",
"..\\..\\escape",
"foo\\..\\bar",
"foo/bar",
"with\x00null",
"",
],
)
def test_traversal_and_separator_payloads_are_rejected(self, tmp_path: Any, bad_id: str) -> None:
helper = self._helper()
before = sorted(p.name for p in tmp_path.parent.iterdir())
with pytest.raises(RuntimeError):
helper(str(tmp_path), bad_id)
# Ensure no escape directory was created adjacent to (or above) the root.
after = sorted(p.name for p in tmp_path.parent.iterdir())
assert before == after, f"Unexpected filesystem artifacts created for payload {bad_id!r}"
# And nothing inside the root either.
assert list(tmp_path.iterdir()) == []
def test_non_string_context_id_is_rejected(self, tmp_path: Any) -> None:
helper = self._helper()
with pytest.raises(RuntimeError):
helper(str(tmp_path), None) # type: ignore[arg-type]
def test_url_encoded_traversal_is_treated_as_literal_segment(self, tmp_path: Any) -> None:
"""URL-encoded traversal should not decode to traversal at the filesystem layer.
The hosting layer never URL-decodes ids before using them; the helper
should accept ``%2e%2e`` as a single literal segment (no escape).
"""
helper = self._helper()
storage = helper(str(tmp_path), "%2e%2e")
assert storage.storage_path.parent == tmp_path.resolve()
assert storage.storage_path.name == "%2e%2e"
# endregion