mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Python: Reject path-traversal context ids in Foundry Hosting Checkpoint Storage (#5851)
* Reject path-traversal context ids in foundry workflow checkpoint storage Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/fca3aae6-50eb-4726-8baf-2718217d4e79 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Address PR review feedback: clarify URL-decode comment, isolate test root, add e2e workflow rejection tests Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/832f45a6-c01e-4da9-bf85-1ba7b5f302e6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * Clarify MSRC repro padding length in regression test Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/832f45a6-c01e-4da9-bf85-1ba7b5f302e6 Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> * add E2E http test for checkpoint context id rejection Agent-Logs-Url: https://github.com/microsoft/agent-framework/sessions/730258ef-2781-4a7d-b7cf-b5c40c11defc Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lokitoth <6936551+lokitoth@users.noreply.github.com> Co-authored-by: Jacob Alber <jaalber@microsoft.com>
This commit is contained in:
committed by
GitHub
Unverified
parent
2ef20cd0aa
commit
67f3db6280
@@ -11,6 +11,7 @@ import tempfile
|
||||
import threading
|
||||
from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence
|
||||
from contextlib import suppress
|
||||
from pathlib import Path
|
||||
from typing import Protocol, cast
|
||||
|
||||
from agent_framework import (
|
||||
@@ -205,6 +206,47 @@ class FileBasedFunctionApprovalStorage:
|
||||
return await asyncio.to_thread(self._load_sync, approval_request_id)
|
||||
|
||||
|
||||
def _checkpoint_storage_for_context(root: str, context_id: str) -> FileCheckpointStorage:
|
||||
"""Build a ``FileCheckpointStorage`` for ``context_id`` rooted under ``root``.
|
||||
|
||||
``context_id`` originates from caller-controlled fields such as
|
||||
``previous_response_id`` or from server-generated fields such as
|
||||
``conversation_id`` / ``response_id``. In every case it must be treated as
|
||||
an untrusted single path segment: path separators, drive letters, parent
|
||||
references and similar would otherwise let the resulting directory escape
|
||||
the configured checkpoint root (CWE-22). The check resolves the joined
|
||||
path and verifies it stays under the resolved root before any directory is
|
||||
created on disk.
|
||||
"""
|
||||
if not isinstance(context_id, str) or not context_id:
|
||||
raise RuntimeError("Invalid checkpoint context id: must be a non-empty string.")
|
||||
# Reject any segment that is not a single safe path component. This covers
|
||||
# POSIX/Windows separators, NUL bytes, drive letters, and all-dot segments
|
||||
# (``.``, ``..``, ``...``, ...). We deliberately do not URL-decode the id
|
||||
# here: the hosting layer never decodes context ids before joining them, so
|
||||
# forms such as ``%2e%2e`` are accepted as literal directory names. Do NOT
|
||||
# add decoding here without re-validating after the decode -- decode-then-
|
||||
# join is exactly the pattern that reintroduces traversal. We also do not
|
||||
# attempt to "sanitize" by stripping characters because that can introduce
|
||||
# collisions between distinct ids.
|
||||
if (
|
||||
"/" in context_id
|
||||
or "\\" in context_id
|
||||
or "\x00" in context_id
|
||||
# All-dot segments (``.``, ``..``, ``...``, ...) reduce to "" after stripping dots.
|
||||
or context_id.strip(".") == ""
|
||||
or os.path.isabs(context_id)
|
||||
or os.path.splitdrive(context_id)[0]
|
||||
):
|
||||
raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}")
|
||||
|
||||
root_path = Path(root).resolve()
|
||||
storage_path = (root_path / context_id).resolve()
|
||||
if not storage_path.is_relative_to(root_path):
|
||||
raise RuntimeError(f"Invalid checkpoint context id: {context_id!r}")
|
||||
return FileCheckpointStorage(storage_path)
|
||||
|
||||
|
||||
class ResponsesHostServer(ResponsesAgentServerHost):
|
||||
"""A responses server host for an agent."""
|
||||
|
||||
@@ -400,7 +442,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
|
||||
latest_checkpoint_id: str | None = None
|
||||
restore_storage: FileCheckpointStorage | None = None
|
||||
if context_id is not None:
|
||||
restore_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, context_id))
|
||||
restore_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, context_id)
|
||||
latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name)
|
||||
if latest_checkpoint is not None:
|
||||
latest_checkpoint_id = latest_checkpoint.checkpoint_id
|
||||
@@ -414,7 +456,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
|
||||
# supplied, restore_storage points at the *prior* response's
|
||||
# directory and write_storage points at the *current* response's.
|
||||
write_context_id = context.conversation_id or context.response_id
|
||||
write_storage = FileCheckpointStorage(os.path.join(self._checkpoint_storage_path, write_context_id))
|
||||
write_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id)
|
||||
|
||||
# Multi-turn pattern: when we have a prior checkpoint, restore it
|
||||
# first (drive the workflow back to idle with prior state intact),
|
||||
|
||||
@@ -11,7 +11,7 @@ the registered _handle_create handler.
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from collections.abc import AsyncIterator
|
||||
from collections.abc import AsyncIterator, Callable
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import httpx
|
||||
@@ -20,6 +20,7 @@ from agent_framework import (
|
||||
AgentResponse,
|
||||
AgentResponseUpdate,
|
||||
Content,
|
||||
FileCheckpointStorage,
|
||||
HistoryProvider,
|
||||
Message,
|
||||
RawAgent,
|
||||
@@ -2652,3 +2653,241 @@ class TestFunctionApprovalRoundTrip:
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
|
||||
# region Checkpoint context path validation
|
||||
|
||||
|
||||
class TestCheckpointContextPathValidation:
|
||||
"""Regression tests for the path-traversal hardening of checkpoint storage.
|
||||
|
||||
These tests guard against CWE-22 in the workflow hosting path. The hosting
|
||||
code joins caller-supplied identifiers (``previous_response_id``) and
|
||||
server-generated identifiers (``conversation_id`` / ``response_id``) under
|
||||
the configured checkpoint root. Without validation, traversal segments
|
||||
such as ``../../escape`` or absolute paths cause directory creation
|
||||
outside the intended root.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _helper() -> Callable[[str, str], FileCheckpointStorage]:
|
||||
from agent_framework_foundry_hosting._responses import ( # pyright: ignore[reportPrivateUsage]
|
||||
_checkpoint_storage_for_context,
|
||||
)
|
||||
|
||||
return _checkpoint_storage_for_context
|
||||
|
||||
def test_valid_segment_creates_storage_under_root(self, tmp_path: Any) -> None:
|
||||
helper = self._helper()
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
storage = helper(str(root), "resp_abc123")
|
||||
assert storage.storage_path.is_dir()
|
||||
assert storage.storage_path.parent == root.resolve()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"bad_id",
|
||||
[
|
||||
# Original MSRC repro: traversal embedded inside an id-shaped value.
|
||||
# The 14 ``A``s pad the suffix to mimic the exact length of the
|
||||
# ``api-made-dir<14-char-suffix>`` segment from the original report.
|
||||
"caresp_x/../../service-data/api-made-dir" + "A" * 14,
|
||||
# Variant report repros.
|
||||
"../../escape",
|
||||
"..",
|
||||
".",
|
||||
"...",
|
||||
"/tmp/escape",
|
||||
"/absolute/path",
|
||||
"C:\\temp\\escape",
|
||||
"..\\..\\escape",
|
||||
"foo\\..\\bar",
|
||||
"foo/bar",
|
||||
"with\x00null",
|
||||
"",
|
||||
],
|
||||
)
|
||||
def test_traversal_and_separator_payloads_are_rejected(self, tmp_path: Any, bad_id: str) -> None:
|
||||
helper = self._helper()
|
||||
# Use a dedicated root *inside* tmp_path so we can assert that nothing
|
||||
# was created anywhere under tmp_path (root, siblings, or above).
|
||||
# Asserting against tmp_path.parent would be flaky under parallel test
|
||||
# execution because tmp_path.parent is shared across tests.
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
before = sorted(p.name for p in tmp_path.iterdir())
|
||||
with pytest.raises(RuntimeError):
|
||||
helper(str(root), bad_id)
|
||||
# No sibling/escape directory should have been created next to the root.
|
||||
after = sorted(p.name for p in tmp_path.iterdir())
|
||||
assert before == after, f"Unexpected filesystem artifacts created for payload {bad_id!r}"
|
||||
# And nothing inside the root either.
|
||||
assert list(root.iterdir()) == []
|
||||
|
||||
def test_non_string_context_id_is_rejected(self, tmp_path: Any) -> None:
|
||||
helper = self._helper()
|
||||
with pytest.raises(RuntimeError):
|
||||
helper(str(tmp_path), None) # type: ignore[arg-type]
|
||||
|
||||
def test_url_encoded_traversal_is_treated_as_literal_segment(self, tmp_path: Any) -> None:
|
||||
"""URL-encoded traversal should not decode to traversal at the filesystem layer.
|
||||
|
||||
The hosting layer never URL-decodes ids before using them; the helper
|
||||
should accept ``%2e%2e`` as a single literal segment (no escape).
|
||||
"""
|
||||
helper = self._helper()
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
storage = helper(str(root), "%2e%2e")
|
||||
assert storage.storage_path.parent == root.resolve()
|
||||
assert storage.storage_path.name == "%2e%2e"
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"context_field,bad_id",
|
||||
[
|
||||
# Restore sink: caller-controlled previous_response_id.
|
||||
("previous_response_id", "../../escape"),
|
||||
("previous_response_id", "/tmp/escape-abs"),
|
||||
("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14),
|
||||
# Restore sink: server-issued conversation_id (defense in depth).
|
||||
("conversation_id", "../../escape"),
|
||||
# Write sink: malicious response_id (defense in depth).
|
||||
("response_id", "../../escape"),
|
||||
],
|
||||
)
|
||||
async def test_handle_inner_workflow_rejects_malicious_context_id(
|
||||
self, tmp_path: Any, context_field: str, bad_id: str
|
||||
) -> None:
|
||||
"""End-to-end: ``_handle_inner_workflow`` must reject malicious ids on
|
||||
both the restore sink (``previous_response_id`` / ``conversation_id``)
|
||||
and the write sink (``response_id``) without creating any directories.
|
||||
"""
|
||||
from unittest.mock import patch
|
||||
|
||||
from agent_framework import WorkflowAgent
|
||||
from azure.ai.agentserver.responses import ResponseContext
|
||||
from azure.ai.agentserver.responses.models import CreateResponse
|
||||
|
||||
# Build a mock that satisfies isinstance(agent, WorkflowAgent) and the
|
||||
# constructor's "no existing checkpointing" guard.
|
||||
agent = MagicMock(spec=WorkflowAgent)
|
||||
agent.id = "wf-agent"
|
||||
agent.name = "wf"
|
||||
agent.description = ""
|
||||
agent.context_providers = []
|
||||
agent.workflow = MagicMock()
|
||||
agent.workflow.name = "wf"
|
||||
agent.workflow._runner_context.has_checkpointing = MagicMock(return_value=False)
|
||||
|
||||
# Constructor inspects WorkflowAgent.workflow internals; bypass setup
|
||||
# by feeding a configured mock through a normal init.
|
||||
server = ResponsesHostServer(agent, store=InMemoryResponseProvider())
|
||||
# Re-root checkpoint storage at our isolated tmp_path so we can detect
|
||||
# any escape attempt on the filesystem.
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
server._checkpoint_storage_path = str(root) # pyright: ignore[reportPrivateUsage]
|
||||
|
||||
# Build a ResponseContext with the malicious id targeting the chosen sink.
|
||||
kwargs: dict[str, Any] = {
|
||||
"response_id": "resp_" + "a" * 48,
|
||||
"mode_flags": MagicMock(),
|
||||
}
|
||||
if context_field == "previous_response_id":
|
||||
request = CreateResponse(model="m", input="hi", previous_response_id=bad_id)
|
||||
kwargs["previous_response_id"] = bad_id
|
||||
elif context_field == "conversation_id":
|
||||
request = CreateResponse(model="m", input="hi")
|
||||
kwargs["conversation_id"] = bad_id
|
||||
else: # response_id (write sink)
|
||||
request = CreateResponse(model="m", input="hi")
|
||||
kwargs["response_id"] = bad_id
|
||||
|
||||
# Avoid invoking the real input-resolution machinery, which would need
|
||||
# a configured provider; we never reach the workflow run on rejection.
|
||||
with patch.object(ResponseContext, "get_input_items", new=AsyncMock(return_value=[])):
|
||||
context = ResponseContext(**kwargs)
|
||||
before = sorted(p.name for p in tmp_path.iterdir())
|
||||
with pytest.raises(RuntimeError, match="Invalid checkpoint context id"):
|
||||
async for _ in server._handle_inner_workflow(request, context): # pyright: ignore[reportPrivateUsage]
|
||||
pass
|
||||
after = sorted(p.name for p in tmp_path.iterdir())
|
||||
|
||||
assert before == after, f"Unexpected filesystem artifacts created for {context_field}={bad_id!r}"
|
||||
assert list(root.iterdir()) == [], f"Checkpoint dir created inside root for {context_field}={bad_id!r}"
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"context_field,bad_id",
|
||||
[
|
||||
# Restore sink: caller-controlled previous_response_id. These are
|
||||
# rejected by request validation (HTTP 400) before the checkpoint
|
||||
# code is reached.
|
||||
("previous_response_id", "../../escape"),
|
||||
("previous_response_id", "/tmp/escape-abs"),
|
||||
("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14),
|
||||
# Restore sink: server-issued conversation id (defense in depth).
|
||||
# Reaches the checkpoint code and is rejected there, surfacing as
|
||||
# an HTTP 5xx without creating any filesystem artifacts.
|
||||
("conversation", "../../escape"),
|
||||
("conversation", "/tmp/escape-abs"),
|
||||
],
|
||||
)
|
||||
async def test_malicious_context_id_rejected_e2e(self, tmp_path: Any, context_field: str, bad_id: str) -> None:
|
||||
"""End-to-end (ASGI-in-process): malicious context ids must be rejected
|
||||
through the full HTTP pipeline, and no checkpoint directory may be
|
||||
created on disk for either the validation-layer rejection
|
||||
(``previous_response_id``) or the deeper checkpoint-layer rejection
|
||||
(``conversation``).
|
||||
|
||||
The ``response_id`` write-sink is server-generated and not reachable
|
||||
via the public HTTP surface, so its defense-in-depth check is covered
|
||||
by the helper-level test above.
|
||||
"""
|
||||
from agent_framework import WorkflowAgent
|
||||
|
||||
# Build a mock that satisfies isinstance(agent, WorkflowAgent) and the
|
||||
# constructor's "no existing checkpointing" guard.
|
||||
agent = MagicMock(spec=WorkflowAgent)
|
||||
agent.id = "wf-agent"
|
||||
agent.name = "wf"
|
||||
agent.description = ""
|
||||
agent.context_providers = []
|
||||
agent.workflow = MagicMock()
|
||||
agent.workflow.name = "wf"
|
||||
agent.workflow._runner_context.has_checkpointing = MagicMock( # pyright: ignore[reportPrivateUsage]
|
||||
return_value=False
|
||||
)
|
||||
|
||||
server = ResponsesHostServer(agent, store=InMemoryResponseProvider())
|
||||
# Re-root checkpoint storage at our isolated tmp_path so we can detect
|
||||
# any escape attempt on the filesystem.
|
||||
root = tmp_path / "root"
|
||||
root.mkdir()
|
||||
server._checkpoint_storage_path = str(root) # pyright: ignore[reportPrivateUsage]
|
||||
|
||||
payload: dict[str, Any] = {"model": "m", "input": "hi"}
|
||||
if context_field == "previous_response_id":
|
||||
payload["previous_response_id"] = bad_id
|
||||
else: # conversation
|
||||
payload["conversation"] = bad_id
|
||||
|
||||
before = sorted(p.name for p in tmp_path.iterdir())
|
||||
transport = httpx.ASGITransport(app=server)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
|
||||
resp = await client.post("/responses", json=payload)
|
||||
after = sorted(p.name for p in tmp_path.iterdir())
|
||||
|
||||
# The request must not succeed; either request validation rejects it
|
||||
# (4xx) or the checkpoint layer raises and the server returns 5xx.
|
||||
# Either way, no successful response may be produced.
|
||||
assert resp.status_code >= 400, (
|
||||
f"Expected non-2xx for {context_field}={bad_id!r}, got {resp.status_code}: {resp.text[:200]}"
|
||||
)
|
||||
assert before == after, (
|
||||
f"Unexpected filesystem artifacts under tmp_path for {context_field}={bad_id!r}: "
|
||||
f"before={before} after={after}"
|
||||
)
|
||||
assert list(root.iterdir()) == [], f"Checkpoint directory created inside root for {context_field}={bad_id!r}"
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
Reference in New Issue
Block a user