Files
agent-framework/python/packages/hyperlight/tests/hyperlight/test_hyperlight_codeact.py
T
Eduard van Valkenburg 66a09a76af Python: fix: hyperlight skips symlinks when staging sandbox input (#5919)
* Python: fix(hyperlight): skip symlinks when staging files into the sandbox

The helpers that populate the sandbox input tree (``_copy_path`` and the
``_path_tree_signature`` walker used for cache invalidation) relied on
``Path.is_file()``, ``Path.is_dir()`` and ``shutil.copy2`` - all of which
follow symlinks by default. When the source tree contains symlinks, that
let entries from outside the configured input source surface inside the
sandbox.

Harden both code paths to never follow symlinks:

- ``_copy_path`` now bails out via ``Path.is_symlink()`` before any
  ``is_dir()`` / ``is_file()`` check, skips non-regular files, and uses
  ``shutil.copy2(..., follow_symlinks=False)`` as defense in depth.
- New ``_iter_real_entries`` walker replaces the previous ``Path.rglob``
  call inside ``_path_tree_signature`` (rglob follows directory symlinks).
- ``_path_tree_signature`` switches to ``Path.lstat()`` so size/mtime are
  never read through a symlink target.

Added regression tests covering:

- A pre-placed file symlink in ``workspace_root`` (top level).
- A pre-placed directory symlink in ``workspace_root``.
- A nested file symlink inside a real subdirectory.
- ``_path_tree_signature`` ignoring symlinks so the cache key reflects only
  what is actually staged.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: fix(hyperlight): address PR #5919 review feedback

- _iter_real_entries now yields directories and regular files only,
  skipping non-regular entries (sockets/FIFOs/devices). Keeps the
  cache-key signature consistent with what _copy_path actually stages.
- The four new symlink regression tests skip when the platform does not
  support symlink creation (e.g. unprivileged Windows runners), via a
  local _symlinks_supported helper modelled on the one in
  packages/core/tests/core/test_skills.py. Prevents OSError /
  NotImplementedError from failing CI jobs that have nothing to do with
  the change under test.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: fix(hyperlight): address PR #5919 follow-up review feedback

- _copy_path docstring: narrow the scope to "symlink entries present in
  the source tree at rest" and explicitly call out that the copy is NOT
  atomic with respect to concurrent mutation of the source tree.
  Callers who need that stronger guarantee should snapshot their
  workspace before passing it in. Avoids overpromising on a TOCTOU
  window that pathlib cannot express; closing it properly would need
  fd-based traversal (O_NOFOLLOW | O_DIRECTORY + os.scandir(fd)) with
  a separate Windows story, which is out of scope for this targeted
  fix.

- _path_tree_signature: drop the `if path.is_symlink(): return ()`
  short-circuit. Resolve a symlink root to its real target before
  walking instead. The public construction flow already resolves
  workspace_root / file_mounts[].host_path up front so this never
  affected user-facing code, but the short-circuit was misleading and
  would have produced an empty, stable signature for any direct
  caller that builds a _RunConfig without going through the public
  constructor. Defense in depth: even if a future call site forgets
  to resolve the root, the cache key still reflects real contents.

- Added regression test
  test_path_tree_signature_walks_through_symlinked_root: a symlinked
  workspace root must produce a non-empty signature, AND the signature
  must change when the real target's contents change so the cache key
  actually invalidates.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-19 11:41:53 +00:00

1539 lines
58 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import gc
import importlib.metadata
import importlib.util
import inspect
import json
import sys
import threading
import time
from collections.abc import Awaitable, Callable, Mapping, MutableSequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import pytest
from agent_framework import (
Agent,
BaseChatClient,
ChatResponse,
ChatResponseUpdate,
Content,
FunctionInvocationLayer,
FunctionTool,
Message,
ResponseStream,
tool,
)
from agent_framework_hyperlight import AllowedDomain, FileMount, HyperlightCodeActProvider, HyperlightExecuteCodeTool
from agent_framework_hyperlight import _execute_code_tool as execute_code_module
def _hyperlight_integration_static_skip_reason() -> str | None:
if sys.version_info >= (3, 14):
return (
"Hyperlight integration tests require Python < 3.14 because hyperlight-sandbox-backend-wasm is unsupported."
)
if sys.platform not in {"linux", "win32"}:
return "Hyperlight integration tests require Linux or Windows runners."
if importlib.util.find_spec("hyperlight_sandbox") is None:
return "hyperlight-sandbox is not installed."
if importlib.util.find_spec("python_guest") is None:
return "hyperlight-sandbox-python-guest is not installed."
try:
importlib.metadata.version("hyperlight-sandbox-backend-wasm")
except importlib.metadata.PackageNotFoundError:
return "hyperlight-sandbox-backend-wasm is not installed."
return None
def _hyperlight_integration_runtime_skip_reason() -> str | None:
if (reason := _hyperlight_integration_static_skip_reason()) is not None:
return reason
try:
sandbox_cls = execute_code_module._load_sandbox_class()
sandbox = sandbox_cls(
backend=execute_code_module.DEFAULT_HYPERLIGHT_BACKEND,
module=execute_code_module.DEFAULT_HYPERLIGHT_MODULE,
)
sandbox.run("None")
except RuntimeError as exc:
message = str(exc)
if "no hypervisor was found for sandbox" in message.lower():
return "Hyperlight integration tests require a runner with a working Hyperlight hypervisor."
return None
def _skip_if_hyperlight_integration_runtime_disabled() -> None:
if (reason := _hyperlight_integration_runtime_skip_reason()) is not None:
pytest.skip(reason)
skip_if_hyperlight_integration_tests_disabled = pytest.mark.skipif(
(reason := _hyperlight_integration_static_skip_reason()) is not None,
reason=reason or "Hyperlight integration tests are disabled.",
)
@pytest.fixture(scope="module")
def shared_sandbox():
"""Long-lived sandbox with snapshot/restore for read-mostly tests.
Multiple tests run sequentially against this fixture. Each test restores the
sandbox to a clean state via the ``restored_sandbox`` fixture.
"""
if (reason := _hyperlight_integration_runtime_skip_reason()) is not None:
pytest.skip(reason)
sandbox_cls = execute_code_module._load_sandbox_class()
sandbox = sandbox_cls(
backend=execute_code_module.DEFAULT_HYPERLIGHT_BACKEND,
module=execute_code_module.DEFAULT_HYPERLIGHT_MODULE,
)
sandbox.run("None")
snapshot = sandbox.snapshot()
yield sandbox, snapshot
@pytest.fixture
def restored_sandbox(shared_sandbox):
"""Restore shared sandbox to clean state before each test."""
sandbox, snapshot = shared_sandbox
sandbox.restore(snapshot)
return sandbox
@pytest.fixture
def fresh_sandbox():
"""Short-lived sandbox for tests that alter config meaningfully.
Not pre-warmed: call ``sandbox.run("None")`` after registering tools
and domains, then snapshot/restore before executing test code.
"""
if (reason := _hyperlight_integration_runtime_skip_reason()) is not None:
pytest.skip(reason)
sandbox_cls = execute_code_module._load_sandbox_class()
sandbox = sandbox_cls(
backend=execute_code_module.DEFAULT_HYPERLIGHT_BACKEND,
module=execute_code_module.DEFAULT_HYPERLIGHT_MODULE,
temp_output=True,
)
yield sandbox
@tool(approval_mode="never_require")
def compute(a: int, b: int) -> int:
return a + b
@tool(approval_mode="always_require")
def dangerous_compute(a: int, b: int) -> int:
return a * b
@tool(name="compute", approval_mode="always_require")
def replacement_compute(a: int, b: int) -> int:
return a - b
@dataclass(slots=True)
class _FakeResult:
success: bool
stdout: str = ""
stderr: str = ""
def _run_in_thread(callback: Callable[[], Any]) -> Any:
result: dict[str, Any] = {}
error: dict[str, BaseException] = {}
def _runner() -> None:
try:
result["value"] = callback()
except BaseException as exc:
error["value"] = exc
thread = threading.Thread(target=_runner)
thread.start()
thread.join()
if "value" in error:
raise error["value"]
return result.get("value")
class _FakeSandbox:
instances: list[_FakeSandbox] = []
def __init__(
self,
*,
input_dir: str | None = None,
output_dir: str | None = None,
temp_output: bool = False,
backend: str = "wasm",
module: str | None = None,
module_path: str | None = None,
heap_size: str | None = None,
stack_size: str | None = None,
) -> None:
self.input_dir = input_dir
self.output_dir = output_dir
self.registered_tools: dict[str, Any] = {}
self.allowed_domains: list[tuple[str, list[str] | None]] = []
self.restore_calls: list[Any] = []
self.output_files: list[str] = []
_FakeSandbox.instances.append(self)
def register_tool(self, name_or_tool: Any, callback: Any | None = None) -> None:
if callback is None:
raise AssertionError("Expected callback registration for sandbox tools.")
self.registered_tools[str(name_or_tool)] = callback
def allow_domain(self, target: str, methods: list[str] | None = None) -> None:
self.allowed_domains.append((target, methods))
def _invoke_tool(self, name: str, **kwargs: Any) -> Any:
callback = self.registered_tools[name]
if inspect.iscoroutinefunction(callback):
return _run_in_thread(lambda: asyncio.run(callback(**kwargs)))
result = callback(**kwargs)
if inspect.isawaitable(result):
return _run_in_thread(lambda: asyncio.run(result))
return result
def run(self, code: str) -> _FakeResult:
if code == "None":
return _FakeResult(success=True)
if code == "create-output":
if self.output_dir is None:
raise AssertionError("Expected output directory for create-output test.")
Path(self.output_dir, "report.txt").write_text("artifact", encoding="utf-8")
self.output_files = ["report.txt"]
return _FakeResult(success=True, stdout="done\n")
if 'call_tool("compute", a=20, b=22)' in code:
total = self._invoke_tool("compute", a=20, b=22)
return _FakeResult(success=True, stdout=f"{total}\n")
return _FakeResult(success=False, stderr="sandbox boom")
def snapshot(self) -> str:
return "snapshot"
def restore(self, snapshot: Any) -> None:
self.restore_calls.append(snapshot)
def get_output_files(self) -> list[str]:
return list(self.output_files)
class _FakeRuntime:
def __init__(self) -> None:
self.calls: list[tuple[Any, str]] = []
def execute(self, *, config: Any, code: str) -> list[Content]:
self.calls.append((config, code))
return [Content.from_text("ok")]
class _FakeSandboxWithoutOutputListing(_FakeSandbox):
def get_output_files(self) -> list[str]:
return []
class _FakeSandboxWithDelayedUnlistedOutput(_FakeSandboxWithoutOutputListing):
writer_threads: list[threading.Thread] = []
def run(self, code: str) -> _FakeResult:
if 'Path("/output/report.txt").write_text("artifact", encoding="utf-8")' in code:
if self.output_dir is None:
raise AssertionError("Expected output directory for delayed output test.")
def _write_file() -> None:
time.sleep(0.15)
Path(self.output_dir, "report.txt").write_text("artifact", encoding="utf-8")
writer_thread = threading.Thread(target=_write_file)
writer_thread.start()
self.writer_threads.append(writer_thread)
return _FakeResult(success=True)
return super().run(code)
class _FakeSessionContext:
def __init__(self, *, tools: list[Any] | None = None) -> None:
self.options: dict[str, Any] = {}
if tools is not None:
self.options["tools"] = tools
self.instructions: list[tuple[str, str]] = []
self.tools: list[tuple[str, list[Any]]] = []
def extend_instructions(self, source_id: str, instructions: str) -> None:
self.instructions.append((source_id, instructions))
def extend_tools(self, source_id: str, tools: list[Any]) -> None:
self.tools.append((source_id, tools))
def _extract_text_output(function_result: Content) -> str:
assert function_result.type == "function_result"
assert function_result.exception is None, (
f"execute_code raised {function_result.exception!r} with items={function_result.items!r}"
)
text_output = next(
(item for item in function_result.items or [] if item.type == "text" and item.text is not None),
None,
)
if text_output is not None and text_output.text is not None:
return text_output.text
if function_result.result:
return function_result.result
raise AssertionError(f"Expected text output from execute_code, got {function_result.items!r}")
class _FakeCodeActChatClient(FunctionInvocationLayer[Any], BaseChatClient[Any]):
def __init__(self) -> None:
FunctionInvocationLayer.__init__(self)
BaseChatClient.__init__(self)
self.call_count = 0
def _inner_get_response(
self,
*,
messages: MutableSequence[Message],
stream: bool,
options: Mapping[str, Any],
**kwargs: Any,
) -> Awaitable[ChatResponse] | ResponseStream[ChatResponseUpdate, ChatResponse]:
if stream:
raise AssertionError("Streaming is not used in this integration test.")
async def _get_response() -> ChatResponse:
self.call_count += 1
if self.call_count == 1:
return ChatResponse(
messages=Message(
role="assistant",
contents=[
Content.from_function_call(
call_id="execute_code_call",
name="execute_code",
arguments={
"code": 'total = call_tool("compute", a=20, b=22)\nprint(total)',
},
)
],
)
)
function_results = [
content for message in messages for content in message.contents if content.type == "function_result"
]
assert len(function_results) == 1
result_content = function_results[0]
assert result_content.call_id == "execute_code_call"
assert _extract_text_output(result_content) == "42\n"
return ChatResponse(messages=Message(role="assistant", contents=["The sandbox returned 42."]))
return _get_response()
def test_execute_code_tool_updates_approval_with_managed_tools() -> None:
execute_code = HyperlightExecuteCodeTool(tools=[compute], _registry=_FakeRuntime())
assert execute_code.approval_mode == "never_require"
execute_code.add_tools([dangerous_compute])
assert execute_code.approval_mode == "always_require"
def test_execute_code_tool_replaces_tools_with_the_same_name() -> None:
execute_code = HyperlightExecuteCodeTool(tools=[compute], _registry=_FakeRuntime())
execute_code.add_tools(replacement_compute)
tools = execute_code.get_tools()
assert len(tools) == 1
assert tools[0] is replacement_compute
assert execute_code.approval_mode == "always_require"
def test_execute_code_tool_accepts_string_and_tuple_file_mounts_without_mode_flags(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
shorthand_file = tmp_path / "notes.txt"
shorthand_file.write_text("hello", encoding="utf-8")
explicit_file = tmp_path / "data.json"
explicit_file.write_text('{"hello": "world"}', encoding="utf-8")
monkeypatch.chdir(tmp_path)
execute_code = HyperlightExecuteCodeTool(_registry=_FakeRuntime())
execute_code.add_file_mounts("notes.txt")
execute_code.add_file_mounts((explicit_file, "data/data.json"))
assert execute_code.get_file_mounts() == [
FileMount(shorthand_file.resolve(), "/input/notes.txt"),
FileMount(explicit_file.resolve(), "/input/data/data.json"),
]
async def test_execute_code_tool_populates_input_dir_with_workspace_and_file_mounts(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
workspace_root = tmp_path / "workspace"
workspace_root.mkdir()
(workspace_root / "notes.txt").write_text("workspace note", encoding="utf-8")
mounted_file = tmp_path / "mounted.txt"
mounted_file.write_text("hello from mount", encoding="utf-8")
execute_code = HyperlightExecuteCodeTool(
workspace_root=workspace_root,
file_mounts=[FileMount(mounted_file, "data/input.txt")],
)
result = await execute_code.invoke(arguments={"code": "None"})
assert result[0].type == "text"
assert _FakeSandbox.instances[0].input_dir is not None
input_root = Path(_FakeSandbox.instances[0].input_dir)
assert (input_root / "notes.txt").read_text(encoding="utf-8") == "workspace note"
assert (input_root / "data" / "input.txt").read_text(encoding="utf-8") == "hello from mount"
def _build_run_config(
*,
workspace_root: Path | None = None,
file_mounts: tuple = (),
) -> Any:
"""Build a minimal _RunConfig for tests that exercise _populate_input_dir directly."""
return execute_code_module._RunConfig(
backend="wasm",
module="python_guest.path",
module_path=None,
approval_mode="never_require",
tools=(),
workspace_root=workspace_root,
workspace_signature=(),
file_mounts=file_mounts,
allowed_domains=(),
)
def _symlinks_supported(tmp: Path) -> bool:
"""Return True if the current platform/environment supports symlinks.
Mirrors python/packages/core/tests/core/test_skills.py so the symlink
regression tests are skipped on restricted Windows CI runners instead of
failing on ``OSError`` / ``NotImplementedError`` during creation.
"""
test_target = tmp / "_symlink_test_target"
test_link = tmp / "_symlink_test_link"
try:
test_target.write_text("test", encoding="utf-8")
test_link.symlink_to(test_target)
return True
except (OSError, NotImplementedError):
return False
finally:
test_link.unlink(missing_ok=True)
test_target.unlink(missing_ok=True)
def test_populate_input_dir_skips_symlink_to_file_outside_workspace(tmp_path: Path) -> None:
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "real.txt").write_text("real-content", encoding="utf-8")
(workspace / "link.txt").symlink_to(outside)
input_root = tmp_path / "input"
input_root.mkdir()
execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)
# Real file copied; symlink and its target are absent.
assert (input_root / "real.txt").read_text(encoding="utf-8") == "real-content"
assert not (input_root / "link.txt").exists()
assert not (input_root / "link.txt").is_symlink()
# Sanity: no outside-content anywhere in the input tree.
leaked = [
path
for path in input_root.rglob("*")
if path.is_file() and path.read_text(encoding="utf-8") == "outside-content"
]
assert leaked == []
def test_populate_input_dir_skips_symlinked_directory_outside_workspace(tmp_path: Path) -> None:
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
outside_dir = tmp_path / "outside_dir"
outside_dir.mkdir()
(outside_dir / "deep.txt").write_text("deep-content", encoding="utf-8")
(workspace / "linked_dir").symlink_to(outside_dir, target_is_directory=True)
input_root = tmp_path / "input"
input_root.mkdir()
execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)
# Neither the symlink itself nor anything under the symlinked target leaks.
assert not (input_root / "linked_dir").exists()
leaked = [
path for path in input_root.rglob("*") if path.is_file() and path.read_text(encoding="utf-8") == "deep-content"
]
assert leaked == []
def test_populate_input_dir_skips_nested_symlinks(tmp_path: Path) -> None:
"""A symlink several levels deep inside a real subdir must also be skipped."""
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
(workspace / "real_sub").mkdir(parents=True)
(workspace / "real_sub" / "ok.txt").write_text("ok", encoding="utf-8")
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "real_sub" / "link.txt").symlink_to(outside)
input_root = tmp_path / "input"
input_root.mkdir()
execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)
assert (input_root / "real_sub" / "ok.txt").read_text(encoding="utf-8") == "ok"
assert not (input_root / "real_sub" / "link.txt").exists()
def test_path_tree_signature_does_not_follow_symlinks(tmp_path: Path) -> None:
"""The cache-key signature must reflect only real files (mirrors the staged tree)."""
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
real = workspace / "real.txt"
real.write_text("real-content", encoding="utf-8")
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "link.txt").symlink_to(outside)
signature = execute_code_module._path_tree_signature(workspace)
names = [entry[0] for entry in signature]
assert "real.txt" in names
assert "link.txt" not in names
def test_path_tree_signature_walks_through_symlinked_root(tmp_path: Path) -> None:
"""A symlinked workspace root must produce a real signature, not an empty one.
Defends against the cache never invalidating when a caller passes a
symlinked workspace and the underlying real directory's contents change.
"""
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
real_workspace = tmp_path / "real_workspace"
real_workspace.mkdir()
target = real_workspace / "data.txt"
target.write_text("v1", encoding="utf-8")
linked_workspace = tmp_path / "linked_workspace"
linked_workspace.symlink_to(real_workspace, target_is_directory=True)
signature_v1 = execute_code_module._path_tree_signature(linked_workspace)
names = [entry[0] for entry in signature_v1]
assert "data.txt" in names, f"signature should include the target's contents, got {signature_v1!r}"
# Mutate the real contents; the symlinked-root signature must reflect the change
# so the cache key invalidates.
import time
time.sleep(0.01) # ensure mtime_ns moves on filesystems with coarse granularity
target.write_text("v2-content-larger", encoding="utf-8")
signature_v2 = execute_code_module._path_tree_signature(linked_workspace)
assert signature_v1 != signature_v2, "signature should change when symlinked target contents change"
def test_execute_code_tool_allowed_domains_use_structured_entries_and_replace_by_target() -> None:
execute_code = HyperlightExecuteCodeTool(_registry=_FakeRuntime())
execute_code.add_allowed_domains(["https://api.example.com/v1", ("github.com", "get")])
execute_code.add_allowed_domains([
AllowedDomain("api.example.com", ("post", "get")),
("github.com", ["head", "get"]),
])
assert execute_code.get_allowed_domains() == [
AllowedDomain("api.example.com", ("GET", "POST")),
AllowedDomain("github.com", ("GET", "HEAD")),
]
def test_execute_code_tool_description_contains_call_tool_guidance(tmp_path: Path) -> None:
workspace_root = tmp_path / "workspace"
workspace_root.mkdir()
(workspace_root / "notes.txt").write_text("hello", encoding="utf-8")
mount_file = tmp_path / "data.json"
mount_file.write_text('{"hello": "world"}', encoding="utf-8")
execute_code = HyperlightExecuteCodeTool(
tools=[compute],
workspace_root=workspace_root,
file_mounts=[FileMount(str(mount_file), "data/data.json")],
allowed_domains=[AllowedDomain("https://api.example.com/v1", ("get", "post")), "github.com"],
_registry=_FakeRuntime(),
)
description = execute_code.description
assert "call_tool(name, **kwargs)" in description
assert "compute" in description
assert "/input/data/data.json" in description
assert "/output" in description
assert "api.example.com" in description
assert "GET, POST" in description
assert "github.com" in description
async def test_execute_code_tool_executes_with_structured_content(monkeypatch: pytest.MonkeyPatch) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
execute_code = HyperlightExecuteCodeTool(
tools=[compute],
file_mounts=[FileMount(Path(__file__), "fixtures/source.py")],
allowed_domains=[("api.example.com", "get")],
)
result = await execute_code.invoke(arguments={"code": "create-output"})
assert result[0].type == "text"
assert result[0].text == "done\n"
assert any(item.type == "data" for item in result)
assert _FakeSandbox.instances[0].allowed_domains == [("api.example.com", ["GET"])]
assert "compute" in _FakeSandbox.instances[0].registered_tools
async def test_execute_code_tool_collects_output_files_without_backend_listing(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandboxWithoutOutputListing)
execute_code = HyperlightExecuteCodeTool(
file_mounts=[FileMount(Path(__file__), "fixtures/source.py")],
)
result = await execute_code.invoke(arguments={"code": "create-output"})
assert result[0].type == "text"
assert any(item.type == "data" and item.additional_properties["path"] == "/output/report.txt" for item in result)
async def test_execute_code_tool_waits_for_unlisted_output_files_to_appear(
monkeypatch: pytest.MonkeyPatch,
) -> None:
_FakeSandboxWithDelayedUnlistedOutput.writer_threads.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandboxWithDelayedUnlistedOutput)
execute_code = HyperlightExecuteCodeTool(
file_mounts=[FileMount(Path(__file__), "fixtures/source.py")],
)
result = await execute_code.invoke(
arguments={"code": 'Path("/output/report.txt").write_text("artifact", encoding="utf-8")'}
)
for writer_thread in _FakeSandboxWithDelayedUnlistedOutput.writer_threads:
writer_thread.join()
assert any(item.type == "data" and item.additional_properties["path"] == "/output/report.txt" for item in result)
async def test_execute_code_tool_failure_returns_error_content(monkeypatch: pytest.MonkeyPatch) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
execute_code = HyperlightExecuteCodeTool()
result = await execute_code.invoke(arguments={"code": "fail"})
assert result[0].type == "error"
assert result[0].error_details == "sandbox boom"
async def test_execute_code_tool_retries_allowed_domains_with_urls_when_backend_rejects_host_targets(
monkeypatch: pytest.MonkeyPatch,
) -> None:
class _FakeStrictNetworkSandbox:
instances: list[_FakeStrictNetworkSandbox] = []
def __init__(
self,
*,
input_dir: str | None = None,
output_dir: str | None = None,
backend: str = "wasm",
module: str | None = None,
module_path: str | None = None,
) -> None:
del input_dir, output_dir, backend, module, module_path
self.allowed_domains: list[tuple[str, list[str] | None]] = []
_FakeStrictNetworkSandbox.instances.append(self)
def register_tool(self, name_or_tool: Any, callback: Any | None = None) -> None:
del name_or_tool, callback
def allow_domain(self, target: str, methods: list[str] | None = None) -> None:
self.allowed_domains.append((target, methods))
def run(self, code: str) -> _FakeResult:
if code == "None" and any("://" not in target for target, _ in self.allowed_domains):
raise RuntimeError("invalid URL for network permission: ")
return _FakeResult(success=True)
def snapshot(self) -> str:
return "snapshot"
def restore(self, snapshot: Any) -> None:
del snapshot
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeStrictNetworkSandbox)
execute_code = HyperlightExecuteCodeTool(allowed_domains=[("127.0.0.1:8080", "get")])
result = await execute_code.invoke(arguments={"code": "None"})
assert result[0].type == "text"
assert len(_FakeStrictNetworkSandbox.instances) == 2
assert _FakeStrictNetworkSandbox.instances[0].allowed_domains == [("127.0.0.1:8080", ["GET"])]
assert _FakeStrictNetworkSandbox.instances[1].allowed_domains == [
("http://127.0.0.1:8080", ["GET"]),
("https://127.0.0.1:8080", ["GET"]),
]
def test_hyperlight_integration_runtime_skip_reason_reports_missing_hypervisor(monkeypatch: pytest.MonkeyPatch) -> None:
class _FakeNoHypervisorSandbox:
def __init__(
self,
*,
input_dir: str | None = None,
output_dir: str | None = None,
backend: str = "wasm",
module: str | None = None,
module_path: str | None = None,
) -> None:
del input_dir, output_dir, backend, module, module_path
def run(self, code: str) -> _FakeResult:
del code
raise RuntimeError("failed to build ProtoWasmSandbox: No Hypervisor was found for Sandbox")
original_find_spec = importlib.util.find_spec
def _fake_find_spec(name: str) -> object | None:
if name in {"hyperlight_sandbox", "python_guest"}:
return object()
return original_find_spec(name)
monkeypatch.setattr(sys, "version_info", (3, 13, 0))
monkeypatch.setattr(sys, "platform", "linux")
monkeypatch.setattr(importlib.util, "find_spec", _fake_find_spec)
monkeypatch.setattr(importlib.metadata, "version", lambda _: "0.0.0")
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeNoHypervisorSandbox)
assert _hyperlight_integration_runtime_skip_reason() == (
"Hyperlight integration tests require a runner with a working Hyperlight hypervisor."
)
async def test_provider_injects_run_scoped_execute_code_tool() -> None:
runtime = _FakeRuntime()
provider = HyperlightCodeActProvider(tools=[compute], _registry=runtime)
context = _FakeSessionContext(tools=[dangerous_compute])
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
assert context.options["tools"] == [dangerous_compute]
assert len(context.instructions) == 1
assert len(context.tools) == 1
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
assert run_tool.approval_mode == "never_require"
assert [tool_obj.name for tool_obj in run_tool.get_tools()] == ["compute"]
assert "dangerous_compute" not in context.instructions[0][1]
assert "compute" not in context.instructions[0][1]
assert "Filesystem capabilities:" not in context.instructions[0][1]
assert state[provider.source_id]["tool_names"] == ["compute"]
assert state[provider.source_id]["approval_mode"] == "never_require"
json.dumps(state)
provider.remove_tool("compute")
assert [tool_obj.name for tool_obj in run_tool.get_tools()] == ["compute"]
async def test_agent_runs_hyperlight_codeact_end_to_end_with_fake_sandbox(monkeypatch: pytest.MonkeyPatch) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
client = _FakeCodeActChatClient()
provider = HyperlightCodeActProvider(tools=[compute])
agent = Agent(client=client, context_providers=[provider])
response = await agent.run("Use the sandbox to add 20 and 22.")
assert response.text == "The sandbox returned 42."
assert client.call_count == 2
assert len(_FakeSandbox.instances) == 1
assert "compute" in _FakeSandbox.instances[0].registered_tools
@skip_if_hyperlight_integration_tests_disabled
async def test_agent_runs_hyperlight_codeact_end_to_end_with_real_sandbox() -> None:
_skip_if_hyperlight_integration_runtime_disabled()
client = _FakeCodeActChatClient()
provider = HyperlightCodeActProvider(tools=[compute])
agent = Agent(client=client, context_providers=[provider])
response = await agent.run("Use the sandbox to add 20 and 22.")
assert response.text == "The sandbox returned 42."
assert client.call_count == 2
@skip_if_hyperlight_integration_tests_disabled
async def test_provider_run_tool_writes_files_with_real_sandbox(tmp_path: Path) -> None:
_skip_if_hyperlight_integration_runtime_disabled()
workspace_root = tmp_path / "workspace"
workspace_root.mkdir()
provider = HyperlightCodeActProvider(workspace_root=workspace_root)
context = _FakeSessionContext()
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
result = await run_tool.invoke(
arguments={
"code": (
'payload = "hello from sandbox"\n'
"output_path = None\n"
'for candidate in ("/output/result.txt",):\n'
" try:\n"
' with open(candidate, "w", encoding="utf-8") as f:\n'
" f.write(payload)\n"
" except OSError:\n"
" continue\n"
" output_path = candidate\n"
" break\n"
'assert output_path is not None, "output path unavailable"\n'
'print("validated")\n'
)
}
)
outputs = result
error_outputs = [
f"{item.message}: {item.error_details}"
for item in outputs
if item.type == "error" and item.error_details is not None
]
assert not error_outputs, error_outputs
text_output = next((item for item in outputs if item.type == "text" and item.text is not None), None)
if text_output is not None:
assert text_output.text == "validated\n"
file_output = next((item for item in outputs if item.type == "data"), None)
if file_output is not None:
assert file_output.uri is not None and file_output.uri.startswith("data:")
assert file_output.additional_properties["path"] in {"/output/result.txt", "/output/output/result.txt"}
@pytest.mark.integration
@skip_if_hyperlight_integration_tests_disabled
@pytest.mark.skipif(sys.platform == "win32", reason="Hyperlight WASM sandbox lacks encodings.idna on Windows")
async def test_provider_run_tool_pings_bing_with_real_sandbox() -> None:
_skip_if_hyperlight_integration_runtime_disabled()
provider = HyperlightCodeActProvider()
provider.add_allowed_domains("bing.com")
context = _FakeSessionContext()
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
result = await run_tool.invoke(
arguments={
"code": (
"import _socket\n\n"
'addresses = _socket.getaddrinfo("bing.com", 80, _socket.AF_INET, _socket.SOCK_STREAM)\n'
'assert addresses, "bing.com did not resolve"\n'
"last_error = None\n"
"for family, socktype, proto, _, sockaddr in addresses:\n"
" connection = None\n"
" try:\n"
" connection = _socket.socket(family, socktype, proto)\n"
" connection.settimeout(10)\n"
" connection.connect(sockaddr)\n"
' print("pinged bing.com")\n'
" break\n"
" except OSError as exc:\n"
" last_error = exc\n"
" finally:\n"
" if connection is not None:\n"
" try:\n"
" connection.close()\n"
" except OSError:\n"
" pass\n"
"else:\n"
' raise last_error or RuntimeError("unable to reach bing.com")\n'
)
}
)
outputs = result
error_outputs = [
f"{item.message}: {item.error_details}"
for item in outputs
if item.type == "error" and item.error_details is not None
]
assert not error_outputs, error_outputs
text_output = next((item for item in outputs if item.type == "text" and item.text is not None), None)
if text_output is not None:
assert text_output.text == "pinged bing.com\n"
# ---------------------------------------------------------------------------
# Real-sandbox tests using shared (long-lived) fixture
# ---------------------------------------------------------------------------
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_runs_simple_code(restored_sandbox) -> None:
result = restored_sandbox.run('print("hello")')
assert result.success
assert "hello" in result.stdout
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_stdout_and_stderr_captured(restored_sandbox) -> None:
result = restored_sandbox.run('import sys\nprint("out")\nprint("err", file=sys.stderr)')
assert result.success
assert "out" in result.stdout
assert "err" in result.stderr
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_code_failure_returns_nonzero_exit(restored_sandbox) -> None:
result = restored_sandbox.run("raise ValueError('boom')")
assert not result.success
assert "boom" in result.stderr
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_snapshot_restore_keeps_sandbox_functional(restored_sandbox) -> None:
"""Verify snapshot/restore cycle leaves the sandbox in a working state."""
# Mutate the sandbox
result1 = restored_sandbox.run('print("before snapshot")')
assert result1.success
# Take a snapshot and restore
snapshot = restored_sandbox.snapshot()
restored_sandbox.restore(snapshot)
# Sandbox still works after restore
result2 = restored_sandbox.run('print("after restore")')
assert result2.success
assert "after restore" in result2.stdout
# ---------------------------------------------------------------------------
# Real-sandbox tests using fresh (short-lived) fixture
# ---------------------------------------------------------------------------
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_with_tool_registration_and_execution(fresh_sandbox) -> None:
"""Verify that a sync host tool round-trips via call_tool in the real sandbox."""
def multiply(a: int, b: int) -> int:
return a * b
fresh_sandbox.register_tool("multiply", multiply)
fresh_sandbox.run("None")
snapshot = fresh_sandbox.snapshot()
fresh_sandbox.restore(snapshot)
result = fresh_sandbox.run('result = call_tool("multiply", a=6, b=7)\nprint(result)')
assert result.success
assert "42" in result.stdout
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_async_callback_round_trips_with_real_sandbox(fresh_sandbox) -> None:
"""Confirm that _make_sandbox_callback (sync wrapper) works with real FFI."""
sandbox_tool = FunctionTool(
func=compute,
name="compute",
description="Add two numbers",
)
callback = execute_code_module._make_sandbox_callback(sandbox_tool)
fresh_sandbox.register_tool("compute", callback)
fresh_sandbox.run("None")
snapshot = fresh_sandbox.snapshot()
fresh_sandbox.restore(snapshot)
result = fresh_sandbox.run('total = call_tool("compute", a=20, b=22)\nprint(total)')
assert result.success
assert "42" in result.stdout
@skip_if_hyperlight_integration_tests_disabled
async def test_output_dir_cleared_between_invocations() -> None:
"""Verify stale output files don't leak across invocations (comment 23)."""
_skip_if_hyperlight_integration_runtime_disabled()
provider = HyperlightCodeActProvider(workspace_root=Path(__file__).parent)
context = _FakeSessionContext()
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
# First invocation: write a file
result1 = await run_tool.invoke(
arguments={"code": ('with open("/output/stale.txt", "w") as f:\n f.write("first")\nprint("wrote")\n')}
)
assert result1[0].type == "text" or result1[0].type == "data"
outputs1 = result1
assert any(
item.type == "data" and "stale.txt" in (item.additional_properties or {}).get("path", "") for item in outputs1
), "First invocation should produce stale.txt"
# Second invocation: no file writes
result2 = await run_tool.invoke(arguments={"code": 'print("clean")\n'})
outputs2 = result2
stale_files = [
item
for item in outputs2
if item.type == "data" and "stale.txt" in (item.additional_properties or {}).get("path", "")
]
assert not stale_files, "Stale output file leaked into second invocation"
@skip_if_hyperlight_integration_tests_disabled
async def test_run_code_does_not_block_event_loop() -> None:
"""Verify _run_code uses asyncio.to_thread so the event loop stays responsive (comment 26)."""
_skip_if_hyperlight_integration_runtime_disabled()
provider = HyperlightCodeActProvider()
context = _FakeSessionContext()
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
# Monkeypatch the registry.execute to block on an event, proving the event loop
# stays responsive while the worker thread is blocked.
release = threading.Event()
async_started = asyncio.Event()
loop = asyncio.get_running_loop()
original_execute = run_tool._registry.execute
def _blocking_execute(*, config, code):
loop.call_soon_threadsafe(async_started.set)
release.wait(timeout=10)
return original_execute(config=config, code=code)
run_tool._registry.execute = _blocking_execute # type: ignore[assignment]
concurrent_ran = False
async def _concurrent_task():
nonlocal concurrent_ran
await async_started.wait()
concurrent_ran = True
release.set()
code_task = asyncio.create_task(run_tool.invoke(arguments={"code": 'print("done")\n'}))
await _concurrent_task()
result = await code_task
assert concurrent_ran, "Event loop was blocked during sandbox execution"
assert result[0].type == "text"
class _ThreadAffinityFakeSandbox(_FakeSandbox):
"""Fake sandbox that records the OS thread of every method invocation.
Mirrors the PyO3 ``unsendable`` invariant of ``hyperlight_sandbox.WasmSandbox``:
if ``__init__``, ``register_tool``, ``allow_domain``, ``run``, ``snapshot`` or ``restore``
are ever called from more than one thread for a given instance, the test fails.
"""
affinity_failures: list[str] = []
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._owner_thread = threading.get_ident()
self.thread_ids: set[int] = {self._owner_thread}
def _record(self, method: str) -> None:
ident = threading.get_ident()
self.thread_ids.add(ident)
if ident != self._owner_thread:
_ThreadAffinityFakeSandbox.affinity_failures.append(
f"{method} called from thread {ident}, expected {self._owner_thread}"
)
def register_tool(self, name_or_tool: Any, callback: Any | None = None) -> None:
self._record("register_tool")
super().register_tool(name_or_tool, callback)
def allow_domain(self, target: str, methods: list[str] | None = None) -> None:
self._record("allow_domain")
super().allow_domain(target, methods)
def run(self, code: str) -> _FakeResult:
self._record("run")
return super().run(code)
def snapshot(self) -> str:
self._record("snapshot")
return super().snapshot()
def restore(self, snapshot: Any) -> None:
self._record("restore")
super().restore(snapshot)
async def test_sandbox_calls_are_pinned_to_owning_worker_thread(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Regression: WasmSandbox is unsendable; every sandbox call must run on its owner thread."""
_ThreadAffinityFakeSandbox.instances.clear()
_ThreadAffinityFakeSandbox.affinity_failures.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _ThreadAffinityFakeSandbox)
execute_code = HyperlightExecuteCodeTool()
# Invoke many times concurrently; asyncio.to_thread will spread these across the default
# executor's worker threads, which previously caused PyO3 to panic when a different thread
# touched the cached sandbox.
results = await asyncio.gather(*[execute_code.invoke(arguments={"code": "None"}) for _ in range(8)])
for result in results:
assert result[0].type == "text"
assert _ThreadAffinityFakeSandbox.affinity_failures == []
assert len(_ThreadAffinityFakeSandbox.instances) == 1
sandbox = _ThreadAffinityFakeSandbox.instances[0]
# All sandbox-touching calls must have stayed on a single owning thread, distinct from the
# caller thread that asyncio.to_thread used for dispatch.
assert sandbox.thread_ids == {sandbox._owner_thread}
assert sandbox._owner_thread != threading.get_ident()
async def test_sandbox_owner_thread_persists_across_dispatch_threads(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Sequential calls landing on different dispatch threads still share one sandbox thread."""
_ThreadAffinityFakeSandbox.instances.clear()
_ThreadAffinityFakeSandbox.affinity_failures.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _ThreadAffinityFakeSandbox)
execute_code = HyperlightExecuteCodeTool()
for _ in range(5):
result = await execute_code.invoke(arguments={"code": "None"})
assert result[0].type == "text"
assert _ThreadAffinityFakeSandbox.affinity_failures == []
assert len(_ThreadAffinityFakeSandbox.instances) == 1
def test_sandbox_registry_close_shuts_down_workers(monkeypatch: pytest.MonkeyPatch) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
registry = execute_code_module._SandboxRegistry()
execute_code = HyperlightExecuteCodeTool(_registry=registry)
asyncio.run(execute_code.invoke(arguments={"code": "None"}))
entries = list(registry._entries.values())
assert len(entries) == 1
worker = entries[0].worker
registry.close()
assert registry._entries == {}
# After shutdown, the worker must report itself as no longer accepting work.
assert worker.is_alive() is False
def test_sandbox_registry_close_releases_per_entry_resources(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""close() must invoke any sandbox close hook and release temp directories."""
close_calls: list[int] = []
class _ClosableFakeSandbox(_FakeSandbox):
def close(self) -> None:
close_calls.append(1)
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _ClosableFakeSandbox)
workspace = tmp_path / "workspace"
workspace.mkdir()
registry = execute_code_module._SandboxRegistry()
execute_code = HyperlightExecuteCodeTool(workspace_root=workspace, _registry=registry)
asyncio.run(execute_code.invoke(arguments={"code": "None"}))
entries = list(registry._entries.values())
assert len(entries) == 1
entry = entries[0]
assert entry.input_dir is not None and entry.output_dir is not None
input_path = Path(entry.input_dir.name)
output_path = Path(entry.output_dir.name)
assert input_path.exists() and output_path.exists()
registry.close()
assert close_calls == [1]
assert not input_path.exists()
assert not output_path.exists()
async def test_make_sandbox_callback_returns_native_dict() -> None:
"""Host tool returning a dict must be forwarded as a native dict (no repr round-trip)."""
@tool
def get_weather(city: str) -> dict[str, Any]:
"""Get weather."""
return {"city": city, "temp_c": 21.5}
callback = execute_code_module._make_sandbox_callback(get_weather)
result = callback(city="Seattle")
assert isinstance(result, dict)
assert result == {"city": "Seattle", "temp_c": 21.5}
async def test_make_sandbox_callback_bypasses_user_result_parser() -> None:
"""Documented behavior change: result_parser is bypassed in the sandbox path."""
parser_calls: list[Any] = []
def parser(value: Any) -> str:
parser_calls.append(value)
return "PARSED"
@tool(result_parser=parser)
def make_payload() -> dict[str, int]:
"""Returns a dict."""
return {"a": 1, "b": 2}
callback = execute_code_module._make_sandbox_callback(make_payload)
result = callback()
assert result == {"a": 1, "b": 2}
assert parser_calls == [], "result_parser must not run on the sandbox path"
async def test_make_sandbox_callback_propagates_exceptions() -> None:
@tool
def boom(x: int) -> int:
"""Always fails."""
raise RuntimeError("nope")
callback = execute_code_module._make_sandbox_callback(boom)
with pytest.raises(RuntimeError, match="nope"):
callback(x=1)
class _OwnerThreadTrackedResult:
"""Fake sandbox.run() return value that mirrors a PyO3 ``unsendable`` object's Drop.
Records (rather than panics, since CPython swallows __del__ exceptions) the OS thread
that finalized the object, so tests can assert it was dropped on the sandbox's owner
thread and not on whatever thread happened to GC it.
"""
drop_thread_violations: list[str] = []
def __init__(self, *, owner_thread: int, success: bool = True, stdout: str = "", stderr: str = "") -> None:
self._owner_thread = owner_thread
self.success = success
self.stdout = stdout
self.stderr = stderr
def __del__(self) -> None:
ident = threading.get_ident()
if ident != self._owner_thread:
type(self).drop_thread_violations.append(
f"_OwnerThreadTrackedResult dropped on thread {ident}, owner was {self._owner_thread}"
)
class _ResultDropTrackingFakeSandbox(_FakeSandbox):
"""Fake sandbox whose ``run()`` returns an owner-thread-tracking result."""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._owner_thread = threading.get_ident()
def run(self, code: str) -> Any:
del code
# Real Hyperlight runs almost always have non-empty stdout (the executed Python
# ``print`` output); that is the path where _build_execution_contents attaches
# raw_representation=result and the unsendable object escapes the worker thread.
return _OwnerThreadTrackedResult(owner_thread=self._owner_thread, success=True, stdout="hello\n")
def test_sandbox_run_result_is_finalized_on_owner_thread(monkeypatch: pytest.MonkeyPatch) -> None:
"""Regression: the object returned by ``sandbox.run`` must not escape its owner thread.
The Hyperlight ``WasmSandbox`` is unsendable; the value its ``run()`` returns can carry
a back-reference to the sandbox and is itself unsendable. Attaching it to
``Content.raw_representation`` lets it ride out of the worker thread and be garbage
collected on whichever thread the asyncio loop / agent state ends up on, which trips
the PyO3 ``Drop`` panic. Drop must happen on the worker thread that ran ``run()``.
"""
_OwnerThreadTrackedResult.drop_thread_violations.clear()
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _ResultDropTrackingFakeSandbox)
execute_code = HyperlightExecuteCodeTool()
def _drive() -> None:
# Run the whole invocation inside a helper frame so every local
# reference (contents, awaitable, asyncio frames) dies when the
# function returns. Anything still pinning the result is the bug.
contents = asyncio.run(execute_code.invoke(arguments={"code": "None"}))
assert contents and contents[0].type == "text"
_drive()
for _ in range(3):
gc.collect()
assert _OwnerThreadTrackedResult.drop_thread_violations == []
def test_sandbox_is_finalized_on_owner_thread_after_registry_close(monkeypatch: pytest.MonkeyPatch) -> None:
"""Regression: dropping the sandbox object itself must occur on its owner thread.
``_SandboxRegistry.close()`` previously held entries in a local list whose lifetime
extended onto the caller's thread. When that list went out of scope the unsendable
sandbox was finalized on the caller's thread, panicking PyO3 with
"WasmSandbox is unsendable, but is being dropped by another thread".
"""
drop_violations: list[str] = []
class _OwnerDropFakeSandbox(_FakeSandbox):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._owner_thread = threading.get_ident()
# Do not pin ourselves on the class-level instances list; we want the
# registry/entry to hold the only strong reference so that dispose-time
# drop is what determines the finalizer thread.
_FakeSandbox.instances.remove(self)
def __del__(self) -> None:
ident = threading.get_ident()
if ident != self._owner_thread:
drop_violations.append(f"sandbox dropped on thread {ident}, owner was {self._owner_thread}")
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _OwnerDropFakeSandbox)
registry = execute_code_module._SandboxRegistry()
execute_code = HyperlightExecuteCodeTool(_registry=registry)
asyncio.run(execute_code.invoke(arguments={"code": "None"}))
registry.close()
# Release the registry/tool references and force a GC. With the fix in place the
# sandbox is already disposed on the worker thread inside close(); dropping these
# local references must not trigger a wrong-thread __del__ now.
del registry
del execute_code
for _ in range(3):
gc.collect()
assert drop_violations == [], f"sandbox was dropped off-thread despite registry close: {drop_violations}"
def test_worker_failure_does_not_leak_unsendable_via_exception_traceback(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Regression: an exception raised inside a worker closure must not leak unsendable refs.
Production failure mode: ``_build_sandbox`` (or ``sandbox.run``) raises on the
worker thread. ``concurrent.futures`` propagates the exception via
``Future.result()`` to the caller's thread. Python's exception object retains
``__traceback__`` whose frames reference local variables -- including the
partially-built PyO3 unsendable sandbox. When the caller's thread eventually
GCs the exception, those locals are dec_ref'd on the wrong thread and PyO3
panics with
``_native_wasm::WasmSandbox is unsendable, but is being dropped on another thread``.
The fix routes every worker closure through ``_run_on_worker``, which catches
the exception on the worker thread, drops its traceback there, and re-raises
a fresh exception on the caller side carrying only the message.
"""
drop_violations: list[str] = []
class _RaisingFakeSandbox(_FakeSandbox):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._owner_thread = threading.get_ident()
_FakeSandbox.instances.remove(self)
# Simulate production bug: build raises while ``self`` is alive in
# the calling frame's locals -- the exception traceback will retain
# a reference to this object.
raise RuntimeError("simulated build failure with unsendable in frame locals")
def __del__(self) -> None:
ident = threading.get_ident()
if ident != self._owner_thread:
drop_violations.append(f"sandbox dropped on thread {ident}, owner was {self._owner_thread}")
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _RaisingFakeSandbox)
registry = execute_code_module._SandboxRegistry()
execute_code = HyperlightExecuteCodeTool(_registry=registry)
async def _drive(tool: HyperlightExecuteCodeTool) -> None:
for _ in range(4):
with contextlib.suppress(Exception):
await tool.invoke(arguments={"code": "None"})
asyncio.run(_drive(execute_code))
registry.close()
del registry
del execute_code
for _ in range(5):
gc.collect()
assert drop_violations == [], (
f"sandbox dropped off-thread despite worker raising on the owner thread: {drop_violations}"
)
def test_sandbox_entry_does_not_expose_unsendable_attributes() -> None:
"""Architectural regression: the entry must not hold sandbox/snapshot as attributes.
The unsendable PyO3 sandbox/snapshot must live ONLY inside the per-entry worker
thread, accessible only via worker-submitted closures. Any direct ``entry.sandbox``
or ``entry.snapshot`` attribute would let callers obtain a strong reference that
can be released on a non-owner thread, triggering PyO3's unsendable Drop panic
(the production bug we are fixing).
"""
fields = {f.name for f in dataclasses.fields(execute_code_module._SandboxEntry)}
assert "sandbox" not in fields, "_SandboxEntry must not expose `sandbox` directly"
assert "snapshot" not in fields, "_SandboxEntry must not expose `snapshot` directly"
# Whatever attributes remain must be sendable / safe to GC on any thread.
assert fields <= {"worker", "input_dir", "output_dir"}
def test_sandbox_survives_external_thread_holding_stale_reference(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Regression: stale refs held by external executors must not cause wrong-thread Drop.
Production traceback was ``concurrent.futures.thread._worker:95 del work_item`` on
``asyncio_0`` -- an external ``ThreadPoolExecutor`` whose ``_WorkItem`` transitively
held a strong reference to the sandbox via ``self._registry.execute``. When that
work_item was deleted on the external worker thread, the sandbox's refcount could
reach zero there, panicking PyO3.
With the actor-model refactor, ``HyperlightExecuteCodeTool._run_code`` runs the
sandbox call via ``asyncio.to_thread(self._registry.execute, ...)`` which creates
an external work_item containing ``self._registry.execute`` -- but that reference
transitively holds only the registry, not the sandbox. The sandbox lives entirely
inside the per-entry ``_SandboxWorker`` and never escapes; so when the external
work_item is deleted on a non-owner thread, the sandbox's refcount cannot reach
zero there.
"""
drop_violations: list[str] = []
class _OwnerDropFakeSandbox(_FakeSandbox):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._owner_thread = threading.get_ident()
_FakeSandbox.instances.remove(self)
def __del__(self) -> None:
ident = threading.get_ident()
if ident != self._owner_thread:
drop_violations.append(f"sandbox dropped on thread {ident}, owner was {self._owner_thread}")
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _OwnerDropFakeSandbox)
registry = execute_code_module._SandboxRegistry()
execute_code = HyperlightExecuteCodeTool(_registry=registry)
async def _drive_many(tool: HyperlightExecuteCodeTool) -> None:
# Many concurrent invocations push work_items into asyncio's default executor;
# each work_item's args transitively reference the registry. If the registry
# were the sandbox holder, the work_items' deletion on asyncio_0/asyncio_1 etc.
# could trigger a wrong-thread Drop -- which is exactly the production bug.
await asyncio.gather(*[tool.invoke(arguments={"code": "None"}) for _ in range(8)])
asyncio.run(_drive_many(execute_code))
registry.close()
del registry
del execute_code
for _ in range(5):
gc.collect()
assert drop_violations == []