mirror of https://github.com/microsoft/agent-framework.git, synced 2026-06-16 21:04:09 +08:00
Python: Fix hyperlight WasmSandbox cross-thread Drop and harden hosted-agent sample (#5603)
* update hyperlight to beta and move samples, add hosted agent sample

* Python: Fix hyperlight WasmSandbox cross-thread Drop and harden sample

Root cause: when a worker-side closure raised, the exception's __traceback__ retained frame locals that included the partially constructed PyO3 sandbox. Future.result() re-raised that exception on the caller thread, and when the caller's exception was eventually GC'd the frame locals were released off-thread, dec_ref'ing the unsendable sandbox from the wrong thread and tripping the PyO3 panic '_native_wasm::WasmSandbox is unsendable, but is being dropped on another thread'.

Fix:
  * Add _SandboxWorker._run_on_worker which catches every exception on the worker, drops __traceback__ there, deletes the original exception, and re-raises a fresh instance on the caller thread. initialize and execute route through it; dispose keeps its bare-submit semantics.
  * Add an opt-in diagnostic module _drop_diagnostic (no-op unless HYPERLIGHT_TRACE_DROPS=1) that installs a sys.unraisablehook and dumps owner-thread + per-thread stacks on any future cross-thread unsendable Drop. Useful for triaging similar PyO3 regressions.
  * Tests: cross-thread invocation, traceback-leak isolation, _SandboxEntry attribute-shape check, and a stale-reference stress test driven through asyncio.to_thread.

Sample (samples/04-hosting/foundry-hosted-agents/responses/06_hyperlight_codeact):
  * Dockerfile installs agent-framework-* from in-tree source with python/ as build context so unreleased fixes can be validated end-to-end.
  * call_server.py pins the Responses API version.
  * main.py enables include_detailed_errors=True so future tool failures surface the actual exception text instead of a bare 'Error: Function failed.' string.
  * README.md documents the in-tree-package build and the Hyperlight hypervisor requirement (/dev/kvm on Linux, MSHV on Windows).

Hosted environments without hypervisor passthrough surface 'No Hypervisor was found for Sandbox'; this is a hosting constraint, not a hyperlight bug.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: remove _drop_diagnostic from hyperlight package

The diagnostic module was useful while bisecting the cross-thread Drop bug, but it is no longer needed now that _SandboxWorker._run_on_worker prevents the panic at the source.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: address PR review feedback on hyperlight

- Use lazy agent_framework.hyperlight import in sample main.py.
- Env-driven endpoint (FOUNDRY_AGENT_ENDPOINT) in call_server.py; remove personal URLs.
- Align agent.yaml model deployment with manifest (gpt-4.1-mini).
- Tighten Dockerfile requirements guard; drop dangling deploy.ps1 reference.
- Preserve exception args when sanitizing tracebacks in _run_on_worker.
- Add public _SandboxWorker.is_alive(); update test to avoid private attr.
- Add namespace coverage tests for agent_framework.hyperlight lazy loader.
- Add prominent note: Foundry hosted-agent runtime does not yet support Hyperlight (no hypervisor exposed); container works locally with /dev/kvm.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: bump hyperlight-sandbox dependencies to 0.4.x

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: renumber hyperlight codeact sample to 08

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Coerce worker exception args to strings for cross-thread safety

Stringify exc.args on the worker thread before propagating, so any PyO3 unsendable object captured in args (e.g. via a caller-supplied callback or underlying SDK) cannot be Dropped on the calling thread.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* moved sample

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
committed by GitHub (Unverified)
parent 36b9b41e3b
commit 57c901a245
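The root cause described in the commit message can be reproduced without PyO3. The following is a minimal sketch, assuming CPython's reference-counting behaviour; `Resource`, `failing_task`, and `traceback_keeps_local_alive` are hypothetical names used only for illustration. It shows that an exception handed from a worker thread to its caller keeps the worker's frame locals alive through `__traceback__`, and that clearing the traceback is what releases them.

```python
import weakref
from concurrent.futures import ThreadPoolExecutor


class Resource:
    """Hypothetical stand-in for a thread-bound native object."""


def failing_task(refs: list) -> None:
    resource = Resource()  # a local that lives in this worker-side frame
    refs.append(weakref.ref(resource))
    raise ValueError("boom")


def traceback_keeps_local_alive() -> tuple[bool, bool]:
    refs: list = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(failing_task, refs)
        exc = future.exception()  # the exception object crosses to the caller thread
    # The traceback still references the worker frame, and with it ``resource``.
    alive_with_traceback = refs[0]() is not None
    # Clearing the traceback drops the frame; ``resource`` is released right here,
    # on the caller thread.
    exc.__traceback__ = None
    alive_after_clear = refs[0]() is not None
    return alive_with_traceback, alive_after_clear
```

In this sketch the release happens on the caller thread, which is harmless for a plain Python object but is exactly the situation an `unsendable` PyO3 object cannot tolerate.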
@@ -8,10 +8,10 @@ import shutil
 import threading
 import time
 from collections.abc import Callable, Sequence
-from concurrent.futures import Future, ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor
 from contextlib import suppress
 from copy import copy
-from dataclasses import dataclass, field
+from dataclasses import dataclass
 from pathlib import Path, PurePosixPath
 from tempfile import TemporaryDirectory
 from typing import Any, Protocol, TypeGuard, TypeVar, cast
@@ -92,39 +92,208 @@ _T = TypeVar("_T")


 class _SandboxWorker:
-    """Single-threaded executor that confines all sandbox operations to one OS thread.
+    """Thread-confined actor that owns a sandbox + snapshot.

-    The Hyperlight ``WasmSandbox`` is declared ``unsendable`` in PyO3, meaning it can only be
-    accessed from the OS thread that created it; touching it from any other thread triggers a
-    Rust panic that cannot be caught from Python. Every cached :class:`_SandboxEntry` therefore
-    owns its own ``_SandboxWorker``, and *all* lifecycle and execution calls against the
-    underlying sandbox object must be routed through :meth:`submit`/:meth:`run`.
+    The Hyperlight ``WasmSandbox`` is declared ``unsendable`` in PyO3: it can only be
+    accessed *and dropped* from the OS thread that created it. Touching or
+    releasing it on any other thread triggers a Rust panic
+    (``"_native_wasm::WasmSandbox is unsendable, but is being dropped on another thread"``)
+    that cannot be caught from Python.
+
+    To make this guarantee airtight, this class is an actor: the underlying
+    sandbox and snapshot are stored ONLY as worker-local state and are never
+    exposed to or returned to other threads. Public methods submit closures to
+    the dedicated single-thread executor and return only sendable results.
+    Because no caller can ever obtain a strong reference to the unsendable
+    objects, no caller can ever cause them to be dropped on the wrong thread.
+
+    Exception isolation: exceptions raised inside worker closures carry a
+    ``__traceback__`` whose frames retain references to local variables --
+    including PyO3 unsendable sandbox/native_result objects. Letting such an
+    exception propagate to the calling thread would defeat the actor model:
+    when the calling thread GCs the exception, the traceback's frame locals
+    are dropped on the wrong thread and PyO3 panics. To prevent this, every
+    exception raised inside a worker closure is caught on the worker, the
+    traceback is dropped while still on the worker thread, and a sanitized
+    copy (preserving message and exception type) is re-raised on the caller.
     """

-    __slots__ = ("_executor",)
+    __slots__ = ("_executor", "_initialized", "_sandbox", "_snapshot")

     def __init__(self, *, name: str = "hl-sandbox") -> None:
         self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix=name)
+        # _sandbox/_snapshot are accessed/mutated ONLY from worker-side closures.
+        self._sandbox: Any = None
+        self._snapshot: Any = None
+        self._initialized = False

-    def submit(self, fn: Callable[..., _T], /, *args: Any, **kwargs: Any) -> Future[_T]:
-        return self._executor.submit(fn, *args, **kwargs)
+    def _run_on_worker(self, fn: Callable[[], _T]) -> _T:
+        """Run ``fn`` on the worker thread; sanitize any exception's traceback there.

-    def run(self, fn: Callable[..., _T], /, *args: Any, **kwargs: Any) -> _T:
-        return self._executor.submit(fn, *args, **kwargs).result()
+        If ``fn`` raises, the exception's ``__traceback__`` is dropped on the worker
+        thread (so any PyO3 unsendable locals captured in frame locals are released
+        on the owner thread) and a fresh exception of the same type is raised on
+        the caller's thread carrying only the original message.
+        """

-    def shutdown(self) -> None:
-        # Do not block on shutdown; stop accepting new tasks, but allow the currently running
-        # task and any already-queued tasks to finish before the worker thread exits.
+        def _wrapped() -> tuple[bool, Any]:
+            try:
+                return True, fn()
+            except BaseException as exc:
+                exc_type = type(exc)
+                # Capture args (usually (message,)) so the re-raised exception keeps the
+                # original shape for types whose constructor doesn't accept a single str.
+                # Coerce each arg to ``str`` on the worker thread: if a caller-supplied
+                # callback (or an underlying SDK) constructed the exception with a PyO3
+                # unsendable object in args, forwarding it as-is would re-introduce the
+                # same cross-thread Drop hazard the traceback nulling avoids. Strings
+                # are always sendable. Fall back to the str() form if args is empty.
+                exc_args: tuple[str, ...] = tuple(str(a) for a in exc.args) if exc.args else (str(exc),)
+                # Drop the traceback on the worker thread so frame locals (which
+                # may include PyO3 unsendable objects) are released here, not on
+                # the caller thread that will receive the wrapped exception.
+                exc.__traceback__ = None
+                del exc
+                return False, (exc_type, exc_args)
+
+        ok, payload = self._executor.submit(_wrapped).result()
+        if ok:
+            return cast(_T, payload)
+        exc_type, exc_args = cast(tuple[type[BaseException], tuple[str, ...]], payload)
+        # Re-raise a fresh instance with no chained traceback frames from the worker.
+        # If the exception type's constructor rejects the captured args (rare), fall
+        # back to a RuntimeError carrying the string form so we never lose the signal.
+        try:
+            raise exc_type(*exc_args)
+        except TypeError:
+            raise RuntimeError(f"{exc_type.__name__}: {exc_args}") from None
+
+    def initialize(self, build_fn: Callable[[], tuple[Any, Any]]) -> None:
+        """Build and install the sandbox+snapshot on the worker thread.
+
+        ``build_fn`` is invoked with no arguments on the worker thread. It must
+        return ``(sandbox, snapshot)``. Both references are retained as worker-
+        local attributes; they do not escape this thread.
+        """
+
+        def _init_on_worker() -> None:
+            sandbox, snapshot = build_fn()
+            self._sandbox = sandbox
+            self._snapshot = snapshot
+            self._initialized = True
+            # Locals fall out of scope on the worker thread; the worker-local
+            # attributes hold the only strong refs from now on.
+
+        self._run_on_worker(_init_on_worker)
+
+    def execute(
+        self,
+        *,
+        code: str,
+        output_dir: TemporaryDirectory[str] | None,
+        build_contents: Callable[..., list[Content]],
+    ) -> list[Content]:
+        """Restore + run + build sendable contents — all on the worker thread.
+
+        Returns a plain ``list[Content]`` whose elements never carry strong
+        references to the underlying sandbox or snapshot.
+        """
+
+        def _on_worker() -> list[Content]:
+            sandbox = self._sandbox
+            snapshot = self._snapshot
+            sandbox.restore(snapshot)
+            _clear_directory(output_dir)
+            result = sandbox.run(code=code)
+            try:
+                return build_contents(
+                    result=result,
+                    sandbox=sandbox,
+                    output_dir=output_dir,
+                    code=code,
+                )
+            finally:
+                # ``result`` may carry a back-reference to the sandbox. Force its
+                # final dec_ref on this thread so Drop runs here, not on whatever
+                # thread later GCs the ``Content`` list.
+                del result
+
+        return self._run_on_worker(_on_worker)
+
+    def is_alive(self) -> bool:
+        """Return ``True`` while the worker thread can still accept new submissions.
+
+        Useful for tests/observability; returns ``False`` after ``dispose()``.
+        """
+        try:
+            self._executor.submit(lambda: None).result(timeout=1.0)
+        except RuntimeError:
+            return False
+        return True
+
+    def dispose(self) -> None:
+        """Release the sandbox+snapshot on the owner worker thread, then shut down.
+
+        Safe to call multiple times. After ``dispose`` returns, the sandbox/
+        snapshot are guaranteed to have been released on the worker thread; any
+        remaining references held elsewhere have already been impossible (they
+        never leaked out of this object).
+        """
+
+        def _dispose_on_worker() -> None:
+            sandbox = self._sandbox
+            snapshot = self._snapshot
+            self._sandbox = None
+            self._snapshot = None
+            close_hook = (
+                (getattr(sandbox, "close", None) or getattr(sandbox, "shutdown", None)) if sandbox is not None else None
+            )
+            if callable(close_hook):
+                with suppress(Exception):
+                    close_hook()
+            # ``sandbox`` and ``snapshot`` are local on the worker thread and
+            # will be dec_ref'd here when this frame returns -> Drop on worker.
+            del sandbox, snapshot
+
+        if self._initialized:
+            try:
+                # Use the bare executor here -- _dispose_on_worker swallows its
+                # own errors and never raises, so traceback sanitization is not
+                # needed and we want dispose to remain robust during teardown.
+                self._executor.submit(_dispose_on_worker).result()
+            except RuntimeError:
+                # Worker already shut down; sandbox/snapshot will leak rather
+                # than panic on the wrong thread. This is the safest fallback.
+                pass
+            finally:
+                self._initialized = False
+        # Do not block on shutdown; stop accepting new tasks, but allow any
+        # already-queued task (including the dispose closure above) to finish.
         self._executor.shutdown(wait=False, cancel_futures=False)


 @dataclass
 class _SandboxEntry:
-    sandbox: Any
-    snapshot: Any
+    """Per-config cached sandbox handle.
+
+    The unsendable sandbox/snapshot live inside ``worker`` and never appear as
+    Python attributes on this object. Anything stored here is sendable and
+    safe to GC on any thread.
+    """
+
+    worker: _SandboxWorker
     input_dir: TemporaryDirectory[str] | None
     output_dir: TemporaryDirectory[str] | None
-    worker: _SandboxWorker = field(default_factory=_SandboxWorker)
+
+    def dispose(self) -> None:
+        """Release the sandbox+snapshot on the worker thread and clean up temp dirs."""
+        self.worker.dispose()
+        for tmp_dir in (self.input_dir, self.output_dir):
+            if tmp_dir is not None:
+                with suppress(Exception):
+                    tmp_dir.cleanup()
+        self.input_dir = None
+        self.output_dir = None


 def _load_sandbox_class() -> type[Any]:
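The sanitize-and-re-raise idea behind `_run_on_worker` can be tried in isolation. What follows is a hedged sketch, not the package's implementation: `run_sanitized`, `NeedsTwo`, the two `fail_*` helpers, and `traceback_function_names` are hypothetical names. One deliberate difference from the hunk above: the sketch builds the fresh exception inside the `try` and raises it outside, so a `TypeError` coming from the worker is re-raised as a `TypeError` instead of being caught by the fallback clause.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def run_sanitized(executor: ThreadPoolExecutor, fn: Callable[[], T]) -> T:
    """Run ``fn`` on the executor; re-raise failures as fresh, traceback-free copies."""

    def wrapped() -> tuple[bool, Any]:
        try:
            return True, fn()
        except BaseException as exc:
            exc_type = type(exc)
            # Only strings cross the thread boundary.
            exc_args = tuple(str(a) for a in exc.args) if exc.args else (str(exc),)
            exc.__traceback__ = None  # release worker frame locals on the worker
            return False, (exc_type, exc_args)

    ok, payload = executor.submit(wrapped).result()
    if ok:
        return payload
    exc_type, exc_args = payload
    try:
        fresh = exc_type(*exc_args)
    except TypeError:
        # The constructor rejected the captured args; keep the signal as text.
        fresh = RuntimeError(f"{exc_type.__name__}: {exc_args}")
    raise fresh from None


class NeedsTwo(Exception):
    """Hypothetical exception whose constructor cannot be replayed from ``args``."""

    def __init__(self, code: int, detail: str) -> None:
        super().__init__(detail)


def fail_with_key_error() -> None:
    raise KeyError("missing")


def fail_with_needs_two() -> None:
    raise NeedsTwo(7, "bad input")


def traceback_function_names(exc: BaseException) -> list[str]:
    """List the function names of the frames recorded in ``exc.__traceback__``."""
    names, tb = [], exc.__traceback__
    while tb is not None:
        names.append(tb.tb_frame.f_code.co_name)
        tb = tb.tb_next
    return names
```

A caller that catches the re-raised `KeyError` sees the original message in `args`, while `traceback_function_names` reports only caller-side frames; `NeedsTwo` exercises the `RuntimeError` fallback.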
@@ -432,6 +601,23 @@ def _parse_output_files(
     return []


+def _result_snapshot(result: Any) -> dict[str, Any]:
+    """Return a sendable plain-dict snapshot of a sandbox.run() result.
+
+    The Hyperlight ``WasmSandbox.run()`` return value is a PyO3 ``unsendable`` object that
+    can carry a back-reference to the sandbox itself. Storing it on
+    ``Content.raw_representation`` lets it ride out of the owner thread and be garbage
+    collected elsewhere, which trips the PyO3 ``Drop`` panic. Build a thread-safe summary
+    of the fields we actually surface and forward that instead, so the original result can
+    be released on the worker thread that produced it.
+    """
+    return {
+        "success": bool(getattr(result, "success", False)),
+        "stdout": str(getattr(result, "stdout", "") or ""),
+        "stderr": str(getattr(result, "stderr", "") or ""),
+    }
+
+
 def _build_execution_contents(
     *,
     result: Any,
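To see why the snapshot approach in `_result_snapshot` decouples the forwarded data from the native result, here is a small sketch under stated assumptions: `FakeResult` is a hypothetical pure-Python stand-in for the object returned by `sandbox.run()`, `result_snapshot` mirrors the dictionary built in the hunk above, and the weak-reference check relies on CPython reference counting.

```python
import weakref
from typing import Any


class FakeResult:
    """Hypothetical stand-in for the object returned by ``sandbox.run()``."""

    def __init__(self, success: bool, stdout: Any, stderr: Any) -> None:
        self.success = success
        self.stdout = stdout
        self.stderr = stderr


def result_snapshot(result: Any) -> dict[str, Any]:
    # Copy only plain, sendable values; missing or None fields become defaults.
    return {
        "success": bool(getattr(result, "success", False)),
        "stdout": str(getattr(result, "stdout", "") or ""),
        "stderr": str(getattr(result, "stderr", "") or ""),
    }


def snapshot_outlives_result() -> tuple[dict[str, Any], bool]:
    result = FakeResult(True, "hi\n", None)
    ref = weakref.ref(result)
    snap = result_snapshot(result)
    del result  # the snapshot holds no reference back to the result object
    return snap, ref() is None
```

Because the dictionary contains only `bool` and `str` values, it can be attached to returned content and collected on any thread without affecting the object it was derived from.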
@@ -442,10 +628,11 @@ def _build_execution_contents(
     success = bool(getattr(result, "success", False))
     stdout = str(getattr(result, "stdout", "") or "").replace("\r\n", "\n") or None
     stderr = str(getattr(result, "stderr", "") or "").replace("\r\n", "\n") or None
+    snapshot = _result_snapshot(result)
     outputs: list[Content] = []

     if stdout is not None:
-        outputs.append(Content.from_text(stdout, raw_representation=result))
+        outputs.append(Content.from_text(stdout, raw_representation=snapshot))

     outputs.extend(
         _parse_output_files(
@@ -457,7 +644,7 @@ def _build_execution_contents(

     if success:
         if stderr is not None:
-            outputs.append(Content.from_text(stderr, raw_representation=result))
+            outputs.append(Content.from_text(stderr, raw_representation=snapshot))
         if not outputs:
             outputs.append(Content.from_text("Code executed successfully without output."))
         return outputs
@@ -467,7 +654,7 @@ def _build_execution_contents(
         Content.from_error(
             message="Execution error",
             error_details=error_details,
-            raw_representation=result,
+            raw_representation=snapshot,
         )
     )
     return outputs
@@ -533,21 +720,14 @@ class _SandboxRegistry(SandboxRuntime):
         Entries are keyed by ``config.cache_key()``. All operations against the underlying
         sandbox object are routed through the entry's dedicated single-threaded worker, which
         both serializes concurrent callers and satisfies the PyO3 ``unsendable`` invariant
-        that the sandbox can only be touched from the thread that created it.
+        that the sandbox can only be touched from the thread that created it. The unsendable
+        objects never escape the worker; this method returns only sendable plain Python data.
         """
         entry = self._get_or_create_entry(config)
-        return entry.worker.run(self._run_on_worker, entry, code)
-
-    @staticmethod
-    def _run_on_worker(entry: _SandboxEntry, code: str) -> list[Content]:
-        entry.sandbox.restore(entry.snapshot)
-        _clear_directory(entry.output_dir)
-        result = entry.sandbox.run(code=code)
-        return _build_execution_contents(
-            result=result,
-            sandbox=entry.sandbox,
-            output_dir=entry.output_dir,
+        return entry.worker.execute(
             code=code,
+            output_dir=entry.output_dir,
+            build_contents=_build_execution_contents,
         )

     def _get_or_create_entry(self, config: _RunConfig) -> _SandboxEntry:
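The registry change above relies on the worker being the only holder of the sandbox. A simplified illustration of that confinement, assuming nothing about Hyperlight itself: `ConfinedWorker` and `ThreadBound` are hypothetical classes, and the sketch omits the exception sanitization shown in the diff.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


class ThreadBound:
    """Hypothetical object that remembers the thread that created it."""

    def __init__(self) -> None:
        self.owner = threading.get_ident()

    def on_owner_thread(self) -> bool:
        return threading.get_ident() == self.owner


class ConfinedWorker:
    """Simplified actor: builds, uses, and releases one object on one thread."""

    def __init__(self) -> None:
        self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="confined")
        self._state: Any = None  # read and written only inside worker-side closures

    def initialize(self, build: Callable[[], Any]) -> None:
        def _init() -> None:
            self._state = build()

        self._executor.submit(_init).result()

    def call(self, op: Callable[[Any], Any]) -> Any:
        # ``op`` should return plain data, never the confined object itself.
        return self._executor.submit(lambda: op(self._state)).result()

    def dispose(self) -> None:
        def _drop() -> None:
            self._state = None  # the last reference goes away on the worker thread

        self._executor.submit(_drop).result()
        self._executor.shutdown(wait=False)
```

Callers on any thread go through `call`, so every access to the confined object happens on the single executor thread that built it.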
@@ -562,22 +742,19 @@ class _SandboxRegistry(SandboxRuntime):
     def close(self) -> None:
         """Shut down all per-entry worker threads and release per-entry resources.

-        Safe to call multiple times. Runs any sandbox close hook on the entry's
-        own worker thread to honor the PyO3 ``unsendable`` invariant.
+        Safe to call multiple times. Each entry's sandbox/snapshot is disposed on the
+        worker thread that created it to honor the PyO3 ``unsendable`` invariant.
         """
         with self._entries_lock:
             entries = list(self._entries.values())
             self._entries.clear()
-        for entry in entries:
-            close_hook = getattr(entry.sandbox, "close", None) or getattr(entry.sandbox, "shutdown", None)
-            if callable(close_hook):
-                with suppress(Exception):
-                    entry.worker.run(close_hook)
-            entry.worker.shutdown()
-            for tmp_dir in (entry.input_dir, entry.output_dir):
-                if tmp_dir is not None:
-                    with suppress(Exception):
-                        tmp_dir.cleanup()
+        try:
+            for entry in entries:
+                entry.dispose()
+        finally:
+            # Drop our local strong references; entries' own refs to sandbox/snapshot
+            # were already moved into the per-worker disposal closure inside dispose().
+            del entries

     def _create_entry(self, config: _RunConfig) -> _SandboxEntry:
         input_dir_handle = TemporaryDirectory() if config.filesystem_enabled else None
@@ -617,8 +794,6 @@ class _SandboxRegistry(SandboxRuntime):
                     methods=list(allowed_domain.methods) if allowed_domain.methods is not None else None,
                 )

-        worker = _SandboxWorker()
-
         def _build_sandbox() -> tuple[Any, Any]:
             sandbox = _create_sandbox()
             _configure_sandbox(sandbox=sandbox, expand_missing_scheme=False)
@@ -636,18 +811,17 @@ class _SandboxRegistry(SandboxRuntime):
             snapshot = sandbox.snapshot()
             return sandbox, snapshot

+        worker = _SandboxWorker()
         try:
-            sandbox, snapshot = worker.run(_build_sandbox)
+            worker.initialize(_build_sandbox)
         except BaseException:
-            worker.shutdown()
+            worker.dispose()
             raise

         return _SandboxEntry(
-            sandbox=sandbox,
-            snapshot=snapshot,
+            worker=worker,
             input_dir=input_dir_handle,
             output_dir=output_dir_handle,
-            worker=worker,
         )
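Both `is_alive()` and the `RuntimeError` branch of `dispose()` in the diff above depend on how `ThreadPoolExecutor` behaves around shutdown. The sketch below, with hypothetical names `accepts_work` and `shutdown_semantics`, checks the two standard-library behaviours involved: `submit` raises `RuntimeError` once `shutdown` has been called, and `shutdown(wait=False, cancel_futures=False)` still lets already-submitted work finish.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def accepts_work(executor: ThreadPoolExecutor) -> bool:
    """Probe the executor the same way the ``is_alive`` method in the diff does."""
    try:
        executor.submit(lambda: None).result(timeout=1.0)
    except RuntimeError:
        return False
    return True


def shutdown_semantics() -> tuple[bool, bool, bool]:
    executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="demo-worker")
    before = accepts_work(executor)

    gate = threading.Event()
    pending = executor.submit(gate.wait, 5.0)  # submitted before shutdown is requested
    executor.shutdown(wait=False, cancel_futures=False)  # returns without blocking
    gate.set()
    finished = pending.result(timeout=5.0)  # the earlier task still completes

    after = accepts_work(executor)  # new submissions are now rejected
    return before, finished, after
```

This is why a dispose closure submitted just before `shutdown(wait=False, cancel_futures=False)` can still run on the worker, while a probe submitted afterwards reports the worker as no longer alive.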