Python: Add Hyperlight CodeAct package and docs (#5185)

* initial work on code_mode

* updated samples

* updates to codeact

* udpated codeact

* Draft CodeAct ADR and sample updates

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* initial implementation and adr and feature

* Python: Limit Hyperlight wasm backend to Python <3.14

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: Fix CI for Hyperlight CodeAct PR

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: Run Hyperlight integration when available

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: Address Hyperlight review feedback

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: Simplify Hyperlight file mount inputs

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: Accept Path host paths in Hyperlight mounts

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: Fix Hyperlight mount typing for CI

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* temp run integration test

* Python: Strengthen Hyperlight real sandbox tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* added additional tests

* Python: Simplify Hyperlight CodeAct API

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* set tests as non-integration

* Retry Hyperlight allowed-domain registration

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Gate Hyperlight integration tests by runtime support

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix Hyperlight skip test on Python 3.14

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Delay Hyperlight runtime probe until test execution

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Relax Hyperlight Windows integration stdout assertion

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Scan Hyperlight output directory for artifacts

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Retry Hyperlight output artifact collection

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Harden Hyperlight integration output assertions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Retry Hyperlight read-back check in integration test

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Simplify Hyperlight integration write assertion

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Avoid pathlib in Hyperlight integration sandbox

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Use socket network check in Hyperlight sandbox

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Replace blocked Azure AI Search blog link

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Clarify Hyperlight guest stdlib limits

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Use _socket in Hyperlight integration sandbox

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Handle Hyperlight mounted file paths

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Broaden Hyperlight sandbox path fallbacks

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Search Hyperlight guest mounts recursively

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Split Hyperlight mount coverage

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Split Hyperlight live network tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix Hyperlight file-write test on Windows

Enable the sandbox filesystem by providing a workspace_root so
/output is mounted. Remove os.path.exists assertion (unsupported
in WASM guest) and fix Content data assertion to use .uri.
Skip the network integration test on Windows where the WASM
sandbox lacks the encodings.idna codec.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR review: ADR intro, manual wiring sample, doc clarifications

- Add CodeAct introduction section to ADR for unfamiliar readers
- Clarify 'less runtime efficient' con with specific overhead description
- Add note in Python impl doc clarifying ADR vs impl doc split
- Explain why before_run hooks must be per-run (CRUD, concurrency, approval)
- Rename code_interpreter variable to codeact in E2E sample
- Add manual static wiring sample (codeact_manual_wiring.py)
- Add 'when to use which pattern' guidance to samples README

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR #5185 review comments and add .NET CodeAct design doc

- Fix async callback: _make_sandbox_callback returns sync wrapper with
  thread + asyncio.run() bridge (was broken with real Wasm FFI)
- Fix stale output: clear output_dir before each sandbox.run() call
- Fix blocking event loop: _run_code now async with asyncio.to_thread()
- Revert _agents.py options['tools'] injection (unnecessary; provider
  uses context.extend_tools())
- Revert SessionContext.options docstring back to read-only
- Add real-sandbox test fixtures (shared/restored/fresh)
- Add 8 new real-sandbox tests for callback round-trip, stale output,
  event loop non-blocking, basic execution, stdout/stderr, errors,
  snapshot/restore, and tool registration
- Add comprehensive .NET HyperlightCodeActProvider design document

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Update hyperlight README with code snippets and remove Public API section

Replace bare export list with Quick Start code examples covering the
context provider, standalone tool, manual static wiring, and file
mounts / network access patterns.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2026-04-17 02:49:44 +02:00
committed by GitHub
Unverified
parent dbf935b4e3
commit b03cb324d5
25 changed files with 4176 additions and 9 deletions
+2
View File
@@ -30,6 +30,7 @@
"azuredocs",
"azurefunctions",
"boto",
"codeact",
"contentvector",
"contoso",
"datamodel",
@@ -45,6 +46,7 @@
"hnsw",
"httpx",
"huggingface",
"hyperlight",
"Instrumentor",
"logit",
"logprobs",
+1
View File
@@ -33,6 +33,7 @@ Status is grouped into these buckets:
| `agent-framework-foundry-local` | `python/packages/foundry_local` | `beta` |
| `agent-framework-gemini` | `python/packages/gemini` | `alpha` |
| `agent-framework-github-copilot` | `python/packages/github_copilot` | `beta` |
| `agent-framework-hyperlight` | `python/packages/hyperlight` | `alpha` |
| `agent-framework-lab` | `python/packages/lab` | `beta` |
| `agent-framework-mem0` | `python/packages/mem0` | `beta` |
| `agent-framework-ollama` | `python/packages/ollama` | `beta` |
@@ -89,6 +89,7 @@ logger = logging.getLogger("agent_framework")
DEFAULT_MAX_ITERATIONS: Final[int] = 40
DEFAULT_MAX_CONSECUTIVE_ERRORS_PER_REQUEST: Final[int] = 3
SHELL_TOOL_KIND_VALUE: Final[str] = "shell"
ApprovalMode: TypeAlias = Literal["always_require", "never_require"]
ChatClientT = TypeVar("ChatClientT", bound="SupportsChatGetResponse[Any]")
ResponseModelBoundT = TypeVar("ResponseModelBoundT", bound=BaseModel)
@@ -270,7 +271,7 @@ class FunctionTool(SerializationMixin):
*,
name: str,
description: str = "",
approval_mode: Literal["always_require", "never_require"] | None = None,
approval_mode: ApprovalMode | None = None,
kind: str | None = None,
max_invocations: int | None = None,
max_invocation_exceptions: int | None = None,
@@ -1033,7 +1034,7 @@ def tool(
name: str | None = None,
description: str | None = None,
schema: type[BaseModel] | Mapping[str, Any] | None = None,
approval_mode: Literal["always_require", "never_require"] | None = None,
approval_mode: ApprovalMode | None = None,
kind: str | None = None,
max_invocations: int | None = None,
max_invocation_exceptions: int | None = None,
@@ -1049,7 +1050,7 @@ def tool(
name: str | None = None,
description: str | None = None,
schema: type[BaseModel] | Mapping[str, Any] | None = None,
approval_mode: Literal["always_require", "never_require"] | None = None,
approval_mode: ApprovalMode | None = None,
kind: str | None = None,
max_invocations: int | None = None,
max_invocation_exceptions: int | None = None,
@@ -1064,7 +1065,7 @@ def tool(
name: str | None = None,
description: str | None = None,
schema: type[BaseModel] | Mapping[str, Any] | None = None,
approval_mode: Literal["always_require", "never_require"] | None = None,
approval_mode: ApprovalMode | None = None,
kind: str | None = None,
max_invocations: int | None = None,
max_invocation_exceptions: int | None = None,
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) Microsoft Corporation.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
+132
View File
@@ -0,0 +1,132 @@
# agent-framework-hyperlight
Alpha Hyperlight-backed CodeAct integrations for Microsoft Agent Framework.
## Installation
```bash
pip install agent-framework-hyperlight --pre
```
This package depends on `hyperlight-sandbox`, the packaged Python guest, and the
Wasm backend package on supported platforms. If the backend is not published for
your current platform yet, `execute_code` will fail at runtime when it tries to
create the sandbox.
## Quick start
### Context provider (recommended)
Use `HyperlightCodeActProvider` to automatically inject the `execute_code` tool
and CodeAct instructions into every agent run. Tools registered on the provider
are available inside the sandbox via `call_tool(...)` but are **not** exposed as
direct agent tools.
```python
from agent_framework import Agent, tool
from agent_framework_hyperlight import HyperlightCodeActProvider
@tool
def compute(operation: str, a: float, b: float) -> float:
"""Perform a math operation."""
ops = {"add": a + b, "subtract": a - b, "multiply": a * b, "divide": a / b}
return ops[operation]
codeact = HyperlightCodeActProvider(
tools=[compute],
approval_mode="never_require",
)
agent = Agent(
client=client,
name="CodeActAgent",
instructions="You are a helpful assistant.",
context_providers=[codeact],
)
result = await agent.run("Multiply 6 by 7 using execute_code.")
```
### Standalone tool
Use `HyperlightExecuteCodeTool` directly when you want full control over how the
tool is added to the agent. This is useful when mixing sandbox tools with
direct-only tools on the same agent.
```python
from agent_framework import Agent, tool
from agent_framework_hyperlight import HyperlightExecuteCodeTool
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""Send an email (direct-only, not available inside the sandbox)."""
return f"Email sent to {to}"
execute_code = HyperlightExecuteCodeTool(
tools=[compute],
approval_mode="never_require",
)
agent = Agent(
client=client,
name="MixedToolsAgent",
instructions="You are a helpful assistant.",
tools=[send_email, execute_code],
)
```
### Manual static wiring
For fixed configurations where provider lifecycle overhead is unnecessary, build
the CodeAct instructions once and pass them to the agent at construction time:
```python
execute_code = HyperlightExecuteCodeTool(
tools=[compute],
approval_mode="never_require",
)
codeact_instructions = execute_code.build_instructions(tools_visible_to_model=False)
agent = Agent(
client=client,
name="StaticWiringAgent",
instructions=f"You are a helpful assistant.\n\n{codeact_instructions}",
tools=[execute_code],
)
```
### File mounts and network access
Mount host directories into the sandbox and allow outbound HTTP to specific
domains:
```python
from agent_framework_hyperlight import HyperlightCodeActProvider, FileMount
codeact = HyperlightCodeActProvider(
tools=[compute],
file_mounts=[
"/host/data", # shorthand — same path in sandbox
("/host/models", "/sandbox/models"), # explicit host → sandbox mapping
FileMount("/host/config", "/sandbox/config"), # named tuple
],
allowed_domains=[
"api.github.com", # all methods
("internal.api.example.com", "GET"), # GET only
],
)
```
## Notes
- This package is intentionally separate from `agent-framework-core` so CodeAct
usage and installation remain optional.
- Alpha-package samples live under `packages/hyperlight/samples/`.
- `file_mounts` accepts a single string shorthand, an explicit `(host_path,
mount_path)` pair, or a `FileMount` named tuple. The host-side path in the
explicit forms may be a `str` or `Path`. Use the explicit two-value form when
the host path differs from the sandbox path.
- `allowed_domains` accepts a single string target such as `"github.com"` to
allow all backend-supported methods, an explicit `(target, method_or_methods)`
tuple such as `("github.com", "GET")`, or an `AllowedDomain` named tuple.
@@ -0,0 +1,24 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import importlib.metadata
from ._execute_code_tool import HyperlightExecuteCodeTool
from ._provider import HyperlightCodeActProvider
from ._types import AllowedDomain, AllowedDomainInput, FileMount, FileMountInput
try:
__version__ = importlib.metadata.version(__name__)
except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0"
__all__ = [
"AllowedDomain",
"AllowedDomainInput",
"FileMount",
"FileMountInput",
"HyperlightCodeActProvider",
"HyperlightExecuteCodeTool",
"__version__",
]
@@ -0,0 +1,860 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import ast
import asyncio
import copy
import mimetypes
import shutil
import threading
import time
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from tempfile import TemporaryDirectory
from typing import Annotated, Any, Protocol, TypeGuard, cast
from urllib.parse import urlparse
from agent_framework import Content, FunctionTool
from agent_framework._tools import ApprovalMode, normalize_tools
from pydantic import BaseModel, Field
from ._instructions import build_codeact_instructions, build_execute_code_description
from ._types import AllowedDomain, AllowedDomainInput, FileMount, FileMountHostPath, FileMountInput
DEFAULT_HYPERLIGHT_BACKEND = "wasm"
DEFAULT_HYPERLIGHT_MODULE = "python_guest.path"
EXECUTE_CODE_INPUT_DESCRIPTION = "Python code to execute in an isolated Hyperlight sandbox."
OUTPUT_FILE_RETRY_ATTEMPTS = 10
OUTPUT_FILE_RETRY_DELAY_SECONDS = 0.1
class _ExecuteCodeInput(BaseModel):
code: Annotated[str, Field(description=EXECUTE_CODE_INPUT_DESCRIPTION)]
@dataclass(frozen=True, slots=True)
class _StoredFileMount:
host_path: Path
mount_path: str
@dataclass(frozen=True, slots=True)
class _NormalizedFileMount:
host_path: Path
mount_path: str
path_signature: tuple[tuple[str, int, int], ...]
@dataclass(frozen=True, slots=True)
class _RunConfig:
backend: str
module: str | None
module_path: str | None
approval_mode: ApprovalMode
tools: tuple[FunctionTool, ...]
workspace_root: Path | None
workspace_signature: tuple[tuple[str, int, int], ...]
file_mounts: tuple[_NormalizedFileMount, ...]
allowed_domains: tuple[AllowedDomain, ...]
@property
def mounted_paths(self) -> tuple[str, ...]:
return tuple(_display_mount_path(mount.mount_path) for mount in self.file_mounts)
@property
def filesystem_enabled(self) -> bool:
return self.workspace_root is not None or bool(self.file_mounts)
def cache_key(self) -> tuple[Any, ...]:
return (
self.backend,
self.module,
self.module_path,
self.approval_mode,
tuple((tool_obj.name, id(tool_obj)) for tool_obj in self.tools),
str(self.workspace_root) if self.workspace_root is not None else None,
self.workspace_signature,
tuple((mount.mount_path, str(mount.host_path), mount.path_signature) for mount in self.file_mounts),
tuple((allowed_domain.target, allowed_domain.methods) for allowed_domain in self.allowed_domains),
)
class SandboxRuntime(Protocol):
def execute(self, *, config: _RunConfig, code: str) -> list[Content]: ...
@dataclass
class _SandboxEntry:
sandbox: Any
snapshot: Any
input_dir: TemporaryDirectory[str] | None
output_dir: TemporaryDirectory[str] | None
lock: threading.RLock
def _load_sandbox_class() -> type[Any]:
try:
from hyperlight_sandbox import Sandbox
except ModuleNotFoundError as exc:
raise ModuleNotFoundError(
"Hyperlight support requires `hyperlight-sandbox`, `hyperlight-sandbox-python-guest`, "
"and a compatible backend package such as `hyperlight-sandbox-backend-wasm`."
) from exc
return Sandbox
def _passthrough_result_parser(result: Any) -> str:
return repr(result)
def _collect_tools(*tool_groups: Any) -> list[FunctionTool]:
tools_by_name: dict[str, FunctionTool] = {}
for tool_group in tool_groups:
normalized_group = normalize_tools(tool_group)
for tool_obj in normalized_group:
if not isinstance(tool_obj, FunctionTool):
continue
if tool_obj.name == "execute_code":
continue
tools_by_name.pop(tool_obj.name, None)
tools_by_name[tool_obj.name] = tool_obj
return list(tools_by_name.values())
def _resolve_execute_code_approval_mode(
*,
base_approval_mode: ApprovalMode,
tools: Sequence[FunctionTool],
) -> ApprovalMode:
if base_approval_mode == "always_require":
return "always_require"
if any(tool_obj.approval_mode == "always_require" for tool_obj in tools):
return "always_require"
return "never_require"
def _resolve_existing_path(value: str | Path) -> Path:
return Path(value).expanduser().resolve(strict=True)
def _resolve_workspace_root(value: str | Path | None) -> Path | None:
if value is None:
return None
resolved_path = _resolve_existing_path(value)
if not resolved_path.is_dir():
raise ValueError("workspace_root must point to an existing directory.")
return resolved_path
def _is_file_mount_pair(value: Any) -> TypeGuard[FileMount | tuple[FileMountHostPath, str]]:
if not isinstance(value, tuple):
return False
value_tuple = cast(tuple[object, ...], value)
if len(value_tuple) != 2:
return False
host_path, mount_path = value_tuple
return isinstance(host_path, (str, Path)) and isinstance(mount_path, str)
def _normalize_file_mount_input(file_mount: FileMountInput) -> _StoredFileMount:
host_path: FileMountHostPath
mount_path: str
if isinstance(file_mount, str):
host_path = file_mount
mount_path = file_mount
else:
host_path = file_mount[0]
mount_path = file_mount[1]
return _StoredFileMount(
host_path=_resolve_existing_path(host_path),
mount_path=_normalize_mount_path(mount_path),
)
def _normalize_domain(target: str) -> str:
candidate = target.strip()
if not candidate:
raise ValueError("Allowed domain entries must not be empty.")
parsed = urlparse(candidate if "://" in candidate else f"//{candidate}")
normalized = (parsed.netloc or parsed.path).strip().rstrip("/")
if not normalized:
raise ValueError(f"Could not normalize allowed domain entry: {target!r}.")
return normalized.lower()
def _normalize_http_method(method: str) -> str:
normalized = method.strip().upper()
if not normalized:
raise ValueError("HTTP method entries must not be empty.")
return normalized
def _normalize_http_methods(methods: str | Sequence[str] | None) -> tuple[str, ...] | None:
if methods is None:
return None
normalized_methods = (
{_normalize_http_method(methods)}
if isinstance(methods, str)
else {_normalize_http_method(method) for method in methods}
)
if not normalized_methods:
raise ValueError("Allowed domain methods must not be empty when provided.")
return tuple(sorted(normalized_methods))
def _is_allowed_domain_pair(value: Any) -> TypeGuard[tuple[str, str | Sequence[str]]]:
if not isinstance(value, tuple) or isinstance(value, AllowedDomain):
return False
value_tuple = cast(tuple[object, ...], value)
if len(value_tuple) != 2:
return False
target, methods = value_tuple
if not isinstance(target, str):
return False
if isinstance(methods, str):
return True
return isinstance(methods, Sequence)
def _normalize_allowed_domain_input(allowed_domain: AllowedDomainInput) -> AllowedDomain:
if isinstance(allowed_domain, str):
return AllowedDomain(target=_normalize_domain(allowed_domain), methods=None)
if isinstance(allowed_domain, AllowedDomain):
return AllowedDomain(
target=_normalize_domain(allowed_domain.target),
methods=_normalize_http_methods(allowed_domain.methods),
)
target, methods = allowed_domain
return AllowedDomain(
target=_normalize_domain(target),
methods=_normalize_http_methods(methods),
)
def _allowed_domain_registration_targets(*, target: str, expand_missing_scheme: bool) -> tuple[str, ...]:
if not expand_missing_scheme or "://" in target:
return (target,)
return (f"http://{target}", f"https://{target}")
def _should_retry_allowed_domain_registration(
*,
error: RuntimeError,
allowed_domains: Sequence[AllowedDomain],
) -> bool:
message = str(error).lower()
return "invalid url for network permission" in message and any(
"://" not in domain.target for domain in allowed_domains
)
def _normalize_mount_path(mount_path: str) -> str:
raw_path = mount_path.strip().replace("\\", "/")
if not raw_path:
raise ValueError("mount_path must not be empty.")
pure_path = PurePosixPath(raw_path)
parts = [part for part in pure_path.parts if part not in {"", "/", "."}]
if parts and parts[0] == "input":
parts = parts[1:]
if any(part == ".." for part in parts):
raise ValueError("mount_path must stay within /input.")
if not parts:
raise ValueError("mount_path must point to a concrete path under /input.")
return "/".join(parts)
def _display_mount_path(mount_path: str) -> str:
return f"/input/{mount_path}"
def _path_tree_signature(path: Path) -> tuple[tuple[str, int, int], ...]:
if path.is_file():
stat = path.stat()
return ((path.name, int(stat.st_size), int(stat.st_mtime_ns)),)
entries: list[tuple[str, int, int]] = []
for candidate in sorted(path.rglob("*"), key=lambda value: value.as_posix()):
try:
stat = candidate.stat()
except FileNotFoundError:
continue
relative_path = candidate.relative_to(path).as_posix()
size = int(stat.st_size) if candidate.is_file() else 0
entries.append((relative_path, size, int(stat.st_mtime_ns)))
return tuple(entries)
def _copy_path(source: Path, destination: Path) -> None:
if source.is_dir():
destination.mkdir(parents=True, exist_ok=True)
for child in sorted(source.iterdir(), key=lambda value: value.name):
_copy_path(child, destination / child.name)
return
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, destination)
def _populate_input_dir(*, config: _RunConfig, input_root: Path) -> None:
if config.workspace_root is not None:
for child in sorted(config.workspace_root.iterdir(), key=lambda value: value.name):
_copy_path(child, input_root / child.name)
for mount in config.file_mounts:
_copy_path(mount.host_path, input_root / mount.mount_path)
def _create_file_content(file_path: Path, *, relative_path: str) -> Content:
media_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
return Content.from_data(
data=file_path.read_bytes(),
media_type=media_type,
additional_properties={"path": f"/output/{relative_path}"},
)
def _normalize_output_relative_path(*, output_file: object, root: Path) -> str | None:
candidate_path = Path(str(output_file))
if candidate_path.is_absolute():
try:
return candidate_path.relative_to(root).as_posix()
except ValueError:
return None
raw_path = str(output_file).replace("\\", "/")
pure_path = PurePosixPath(raw_path)
parts = [part for part in pure_path.parts if part not in {"", "/", "."}]
if parts and parts[0] == "output":
parts = parts[1:]
if not parts or any(part == ".." for part in parts):
return None
return "/".join(parts)
def _collect_output_relative_paths(*, sandbox: Any, root: Path) -> set[str]:
relative_paths: set[str] = set()
if hasattr(sandbox, "get_output_files"):
try:
output_files = cast(Sequence[object], sandbox.get_output_files())
except Exception:
output_files = ()
for output_file in output_files:
if (relative_path := _normalize_output_relative_path(output_file=output_file, root=root)) is not None:
relative_paths.add(relative_path)
for host_path in root.rglob("*"):
if host_path.is_file():
relative_paths.add(host_path.relative_to(root).as_posix())
return relative_paths
def _parse_output_files(
*,
sandbox: Any,
output_dir: TemporaryDirectory[str] | None,
expect_output_files: bool,
) -> list[Content]:
if output_dir is None:
return []
root = Path(output_dir.name)
for attempt in range(OUTPUT_FILE_RETRY_ATTEMPTS):
relative_paths = _collect_output_relative_paths(sandbox=sandbox, root=root)
missing_files = expect_output_files and not relative_paths
contents: list[Content] = []
for relative_path in sorted(relative_paths):
host_path = root.joinpath(*PurePosixPath(relative_path).parts)
if not host_path.is_file():
missing_files = True
continue
try:
contents.append(_create_file_content(host_path, relative_path=relative_path))
except PermissionError:
missing_files = True
if not missing_files or attempt == OUTPUT_FILE_RETRY_ATTEMPTS - 1:
return contents
time.sleep(OUTPUT_FILE_RETRY_DELAY_SECONDS)
return []
def _build_execution_contents(
*,
result: Any,
sandbox: Any,
output_dir: TemporaryDirectory[str] | None,
code: str,
) -> list[Content]:
success = bool(getattr(result, "success", False))
stdout = str(getattr(result, "stdout", "") or "").replace("\r\n", "\n") or None
stderr = str(getattr(result, "stderr", "") or "").replace("\r\n", "\n") or None
outputs: list[Content] = []
if stdout is not None:
outputs.append(Content.from_text(stdout, raw_representation=result))
outputs.extend(
_parse_output_files(
sandbox=sandbox,
output_dir=output_dir,
expect_output_files="/output" in code,
)
)
if success:
if stderr is not None:
outputs.append(Content.from_text(stderr, raw_representation=result))
if not outputs:
outputs.append(Content.from_text("Code executed successfully without output."))
return [Content.from_code_interpreter_tool_result(outputs=outputs, raw_representation=result)]
error_details = stderr or "Unknown sandbox error"
outputs.append(
Content.from_error(
message="Execution error",
error_details=error_details,
raw_representation=result,
)
)
return [Content.from_code_interpreter_tool_result(outputs=outputs, raw_representation=result)]
def _make_sandbox_callback(tool_obj: FunctionTool) -> Callable[..., Any]:
sandbox_tool = copy.copy(tool_obj)
sandbox_tool.result_parser = _passthrough_result_parser
def _callback(**kwargs: Any) -> Any:
async def _invoke() -> list[Content]:
return await sandbox_tool.invoke(arguments=kwargs)
# FunctionTool.invoke() is always async. The real Hyperlight backend invokes
# registered callbacks synchronously via FFI, so this must be a sync function.
# We run the async call on a dedicated thread to avoid conflicts with any
# event loop that may be running on the current thread.
result_box: list[Any] = [None]
error_box: list[BaseException] = []
def _run() -> None:
try:
result_box[0] = asyncio.run(_invoke())
except BaseException as exc:
error_box.append(exc)
worker = threading.Thread(target=_run)
worker.start()
worker.join()
if error_box:
raise error_box[0]
contents: list[Content] = result_box[0]
values: list[Any] = []
for content in contents:
if content.type == "text" and content.text is not None:
try:
values.append(ast.literal_eval(content.text))
except (SyntaxError, ValueError):
values.append(content.text)
continue
values.append(content.to_dict())
if len(values) == 1:
return values[0]
return values
return _callback
def _clear_directory(output_dir: TemporaryDirectory[str] | None) -> None:
"""Remove all contents of the output directory without deleting the directory itself."""
if output_dir is None:
return
root = Path(output_dir.name)
for child in root.iterdir():
try:
if child.is_symlink() or child.is_file():
child.unlink()
elif child.is_dir():
shutil.rmtree(child, ignore_errors=True)
except (FileNotFoundError, PermissionError):
pass
class _SandboxRegistry:
def __init__(self) -> None:
self._entries: dict[tuple[Any, ...], _SandboxEntry] = {}
self._entries_lock = threading.RLock()
def execute(self, *, config: _RunConfig, code: str) -> list[Content]:
"""Execute code in a cached sandbox matching the given config.
Entries are keyed by ``config.cache_key()``. Concurrent calls with the same
key are serialized by the entry lock so they never race, but they share the
same sandbox instance. For true parallel execution, use distinct provider
instances or configs that produce different cache keys.
"""
cache_key = config.cache_key()
with self._entries_lock:
entry = self._entries.get(cache_key)
if entry is None:
entry = self._create_entry(config)
self._entries[cache_key] = entry
with entry.lock:
entry.sandbox.restore(entry.snapshot)
_clear_directory(entry.output_dir)
result = entry.sandbox.run(code=code)
return _build_execution_contents(
result=result,
sandbox=entry.sandbox,
output_dir=entry.output_dir,
code=code,
)
def _create_entry(self, config: _RunConfig) -> _SandboxEntry:
input_dir_handle = TemporaryDirectory() if config.filesystem_enabled else None
output_dir_handle = TemporaryDirectory() if config.filesystem_enabled else None
if input_dir_handle is not None:
_populate_input_dir(config=config, input_root=Path(input_dir_handle.name))
sandbox_cls = _load_sandbox_class()
def _create_sandbox() -> Any:
try:
return sandbox_cls(
backend=config.backend,
module=config.module,
module_path=config.module_path,
input_dir=input_dir_handle.name if input_dir_handle is not None else None,
output_dir=output_dir_handle.name if output_dir_handle is not None else None,
)
except ImportError as exc:
raise RuntimeError(
"The selected Hyperlight backend is not installed or not supported on this platform. "
"Install a compatible backend package, such as `hyperlight-sandbox-backend-wasm`."
) from exc
def _configure_sandbox(*, sandbox: Any, expand_missing_scheme: bool) -> None:
for tool_obj in config.tools:
sandbox.register_tool(tool_obj.name, _make_sandbox_callback(tool_obj))
for allowed_domain in config.allowed_domains:
for target in _allowed_domain_registration_targets(
target=allowed_domain.target,
expand_missing_scheme=expand_missing_scheme,
):
sandbox.allow_domain(
target,
methods=list(allowed_domain.methods) if allowed_domain.methods is not None else None,
)
sandbox = _create_sandbox()
_configure_sandbox(sandbox=sandbox, expand_missing_scheme=False)
try:
sandbox.run("None")
except RuntimeError as exc:
if not _should_retry_allowed_domain_registration(error=exc, allowed_domains=config.allowed_domains):
raise
sandbox = _create_sandbox()
_configure_sandbox(sandbox=sandbox, expand_missing_scheme=True)
sandbox.run("None")
snapshot = sandbox.snapshot()
return _SandboxEntry(
sandbox=sandbox,
snapshot=snapshot,
input_dir=input_dir_handle,
output_dir=output_dir_handle,
lock=threading.RLock(),
)
class HyperlightExecuteCodeTool(FunctionTool):
"""Execute Python code inside a Hyperlight sandbox."""
def __init__(
self,
*,
tools: FunctionTool | Callable[..., Any] | Sequence[FunctionTool | Callable[..., Any]] | None = None,
approval_mode: ApprovalMode | None = None,
workspace_root: str | Path | None = None,
file_mounts: FileMountInput | Sequence[FileMountInput] | None = None,
allowed_domains: AllowedDomainInput | Sequence[AllowedDomainInput] | None = None,
backend: str = DEFAULT_HYPERLIGHT_BACKEND,
module: str | None = DEFAULT_HYPERLIGHT_MODULE,
module_path: str | None = None,
_registry: SandboxRuntime | None = None,
) -> None:
super().__init__(
name="execute_code",
description=EXECUTE_CODE_INPUT_DESCRIPTION,
approval_mode="never_require",
func=self._run_code,
input_model=_ExecuteCodeInput,
)
self._state_lock = threading.RLock()
self._registry = _registry or _SandboxRegistry()
self._default_approval_mode: ApprovalMode = approval_mode or "never_require"
self._workspace_root = _resolve_workspace_root(workspace_root)
self._backend: str = backend
self._module: str | None = module
self._module_path: str | None = module_path
self._managed_tools: list[FunctionTool] = []
self._file_mounts: dict[str, _StoredFileMount] = {}
self._allowed_domains: dict[str, AllowedDomain] = {}
if tools is not None:
self.add_tools(tools)
if file_mounts is not None:
self.add_file_mounts(file_mounts)
if allowed_domains is not None:
self.add_allowed_domains(allowed_domains)
self._refresh_approval_mode()
@property
def description(self) -> str:
state_lock = getattr(self, "_state_lock", None)
if state_lock is None:
return str(self.__dict__.get("description", EXECUTE_CODE_INPUT_DESCRIPTION))
with state_lock:
allowed_domains = sorted(self._allowed_domains.values(), key=lambda value: value.target)
return build_execute_code_description(
tools=self._managed_tools,
filesystem_enabled=self._workspace_root is not None or bool(self._file_mounts),
workspace_enabled=self._workspace_root is not None,
mounted_paths=[_display_mount_path(mount.mount_path) for mount in self._file_mounts.values()],
allowed_domains=allowed_domains,
)
@description.setter
def description(self, value: str) -> None:
self.__dict__["description"] = value
def add_tools(
self,
tools: FunctionTool | Callable[..., Any] | Sequence[FunctionTool | Callable[..., Any]],
) -> None:
"""Add sandbox-managed tools to this execute_code surface."""
with self._state_lock:
combined_tools = _collect_tools(self._managed_tools, tools)
self._managed_tools = combined_tools
self._refresh_approval_mode()
def get_tools(self) -> list[FunctionTool]:
"""Return the currently managed sandbox tools."""
with self._state_lock:
return list(self._managed_tools)
def remove_tool(self, name: str) -> None:
"""Remove one managed sandbox tool by name."""
with self._state_lock:
remaining_tools = [tool_obj for tool_obj in self._managed_tools if tool_obj.name != name]
if len(remaining_tools) == len(self._managed_tools):
raise KeyError(f"No managed tool named {name!r} is registered.")
self._managed_tools = remaining_tools
self._refresh_approval_mode()
def clear_tools(self) -> None:
"""Remove all managed sandbox tools."""
with self._state_lock:
self._managed_tools = []
self._refresh_approval_mode()
def add_file_mounts(self, file_mounts: FileMountInput | Sequence[FileMountInput]) -> None:
"""Add one or more file mounts under `/input`.
A single string uses the same relative path on the host and in the sandbox.
Use a two-string tuple or `FileMount` when those paths differ.
"""
if isinstance(file_mounts, str) or _is_file_mount_pair(file_mounts):
normalized_mounts = [_normalize_file_mount_input(file_mounts)]
else:
normalized_mounts = [
_normalize_file_mount_input(mount) for mount in cast(Sequence[FileMountInput], file_mounts)
]
with self._state_lock:
for mount in normalized_mounts:
self._file_mounts[mount.mount_path] = mount
def get_file_mounts(self) -> list[FileMount]:
"""Return the configured file mounts."""
with self._state_lock:
return [
FileMount(host_path=mount.host_path, mount_path=_display_mount_path(mount.mount_path))
for mount in self._file_mounts.values()
]
def remove_file_mount(self, mount_path: str) -> None:
"""Remove one file mount by its sandbox path."""
normalized_mount_path = _normalize_mount_path(mount_path)
with self._state_lock:
if normalized_mount_path not in self._file_mounts:
raise KeyError(f"No file mount exists for {mount_path!r}.")
del self._file_mounts[normalized_mount_path]
def clear_file_mounts(self) -> None:
"""Remove all configured file mounts."""
with self._state_lock:
self._file_mounts.clear()
def add_allowed_domains(self, domains: AllowedDomainInput | Sequence[AllowedDomainInput]) -> None:
"""Add one or more outbound allow-list entries."""
if isinstance(domains, (str, AllowedDomain)) or _is_allowed_domain_pair(domains):
normalized_domains = [_normalize_allowed_domain_input(domains)]
else:
normalized_domains = [
_normalize_allowed_domain_input(domain) for domain in cast(Sequence[AllowedDomainInput], domains)
]
with self._state_lock:
for normalized_domain in normalized_domains:
self._allowed_domains[normalized_domain.target] = normalized_domain
def get_allowed_domains(self) -> list[AllowedDomain]:
"""Return the configured outbound allow-list entries."""
with self._state_lock:
return sorted(self._allowed_domains.values(), key=lambda value: value.target)
def remove_allowed_domain(self, domain: str) -> None:
"""Remove one outbound allow-list entry."""
normalized_domain = _normalize_domain(domain)
with self._state_lock:
if normalized_domain not in self._allowed_domains:
raise KeyError(f"No allowed domain exists for {domain!r}.")
del self._allowed_domains[normalized_domain]
def clear_allowed_domains(self) -> None:
"""Remove all outbound allow-list entries."""
with self._state_lock:
self._allowed_domains.clear()
def build_instructions(self, *, tools_visible_to_model: bool) -> str:
"""Build the current CodeAct instructions for this execute_code surface."""
config = self._build_run_config()
return build_codeact_instructions(
tools=config.tools,
tools_visible_to_model=tools_visible_to_model,
)
def create_run_tool(self) -> HyperlightExecuteCodeTool:
"""Create a run-scoped snapshot of this execute_code surface."""
file_mounts = self.get_file_mounts()
allowed_domains = self.get_allowed_domains()
return HyperlightExecuteCodeTool(
tools=self.get_tools(),
approval_mode=self._default_approval_mode,
workspace_root=self._workspace_root,
file_mounts=file_mounts or None,
allowed_domains=allowed_domains or None,
backend=self._backend,
module=self._module,
module_path=self._module_path,
_registry=self._registry,
)
def build_serializable_state(self) -> dict[str, Any]:
"""Return a JSON-serializable snapshot of the effective run state."""
config = self._build_run_config()
return {
"backend": config.backend,
"module": config.module,
"module_path": config.module_path,
"approval_mode": config.approval_mode,
"tool_names": [tool_obj.name for tool_obj in config.tools],
"filesystem_enabled": config.filesystem_enabled,
"workspace_root": str(config.workspace_root) if config.workspace_root is not None else None,
"file_mounts": [
{
"host_path": str(mount.host_path),
"mount_path": _display_mount_path(mount.mount_path),
}
for mount in config.file_mounts
],
"network_enabled": bool(config.allowed_domains),
"allowed_domains": [
{
"target": allowed_domain.target,
"methods": list(allowed_domain.methods) if allowed_domain.methods is not None else None,
}
for allowed_domain in config.allowed_domains
],
}
def to_dict(self, *, exclude: set[str] | None = None, exclude_none: bool = True) -> dict[str, Any]:
self.__dict__["description"] = self.description
return super().to_dict(exclude=exclude, exclude_none=exclude_none)
def _refresh_approval_mode(self) -> None:
self.approval_mode = _resolve_execute_code_approval_mode(
base_approval_mode=self._default_approval_mode,
tools=self._managed_tools,
)
def _build_run_config(self) -> _RunConfig:
with self._state_lock:
managed_tools = tuple(self._managed_tools)
workspace_root = self._workspace_root
stored_mounts = tuple(self._file_mounts.values())
allowed_domains = tuple(sorted(self._allowed_domains.values(), key=lambda value: value.target))
approval_mode = _resolve_execute_code_approval_mode(
base_approval_mode=self._default_approval_mode,
tools=managed_tools,
)
workspace_signature = _path_tree_signature(workspace_root) if workspace_root is not None else ()
normalized_mounts = tuple(
_NormalizedFileMount(
host_path=mount.host_path,
mount_path=mount.mount_path,
path_signature=_path_tree_signature(mount.host_path),
)
for mount in stored_mounts
)
return _RunConfig(
backend=self._backend,
module=self._module,
module_path=self._module_path,
approval_mode=approval_mode,
tools=managed_tools,
workspace_root=workspace_root,
workspace_signature=workspace_signature,
file_mounts=normalized_mounts,
allowed_domains=allowed_domains,
)
async def _run_code(self, *, code: str) -> list[Content]:
config = self._build_run_config()
return await asyncio.to_thread(self._registry.execute, config=config, code=code)
@@ -0,0 +1,126 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
from collections.abc import Sequence
from agent_framework import FunctionTool
from ._types import AllowedDomain
def _format_tool_summaries(tools: Sequence[FunctionTool]) -> str:
if not tools:
return "- No tools are currently registered inside the sandbox."
lines: list[str] = []
for tool_obj in tools:
parameters = tool_obj.parameters().get("properties", {})
parameter_names = [name for name in parameters if isinstance(name, str)]
parameter_summary = ", ".join(parameter_names) if parameter_names else "none"
description = str(tool_obj.description or "").strip() or "No description provided."
lines.append(f"- `{tool_obj.name}`: {description} Parameters: {parameter_summary}.")
return "\n".join(lines)
def _format_filesystem_capabilities(
*,
filesystem_enabled: bool,
workspace_enabled: bool,
mounted_paths: Sequence[str],
) -> str:
if not filesystem_enabled:
return "Filesystem access is unavailable because no workspace root or file mounts are configured."
lines = ["Filesystem access is enabled."]
lines.append("Read files from `/input`.")
lines.append("Write generated artifacts to `/output`; returned files will be attached to the tool result.")
if workspace_enabled:
lines.append("The configured workspace root is available under `/input/`.")
if mounted_paths:
lines.append("Additional mounted paths:")
lines.extend(f"- `{mounted_path}`" for mounted_path in mounted_paths)
elif not workspace_enabled:
lines.append("No workspace root or explicit file mounts are currently configured.")
return "\n".join(lines)
def _format_network_capabilities(
*,
allowed_domains: Sequence[AllowedDomain],
) -> str:
if not allowed_domains:
return "Outbound network access is unavailable because no allow-listed targets are configured."
lines = ["Outbound network access is allowed only for these configured targets:"]
for allowed_domain in allowed_domains:
methods_text = (
", ".join(allowed_domain.methods) if allowed_domain.methods else "all methods allowed by the backend"
)
lines.append(f"- `{allowed_domain.target}`: {methods_text}.")
return "\n".join(lines)
def build_codeact_instructions(
*,
tools: Sequence[FunctionTool],
tools_visible_to_model: bool,
) -> str:
"""Build dynamic CodeAct instructions for the effective sandbox state."""
usage_note = (
"Some tools may also appear directly, but prefer `execute_code` whenever you need to combine Python "
"control flow with sandbox tool calls."
if tools_visible_to_model
else "Provider-owned sandbox tools are not exposed separately; use `execute_code` when you need them."
)
return f"""You have one primary tool: execute_code.
Prefer one execute_code call per request when possible.
Its tool description contains the current `call_tool(...)` guidance, sandbox
tool registry, and capability limits.
{usage_note}
"""
def build_execute_code_description(
*,
tools: Sequence[FunctionTool],
filesystem_enabled: bool,
workspace_enabled: bool,
mounted_paths: Sequence[str],
allowed_domains: Sequence[AllowedDomain],
) -> str:
"""Build the dynamic execute_code tool description for standalone usage."""
filesystem_text = _format_filesystem_capabilities(
filesystem_enabled=filesystem_enabled,
workspace_enabled=workspace_enabled,
mounted_paths=mounted_paths,
)
network_text = _format_network_capabilities(
allowed_domains=allowed_domains,
)
return f"""Execute Python in an isolated Hyperlight sandbox.
Inside the sandbox, `call_tool(name, **kwargs)` is available as a built-in for
registered host callbacks. Use the tool name as the first argument and keyword
arguments only. Do not pass a dict or any other positional arguments after the
tool name.
Registered sandbox tools:
{_format_tool_summaries(tools)}
Filesystem capabilities:
{filesystem_text}
Network capabilities:
{network_text}
Prefer `execute_code` when you need to combine one or more `call_tool(...)`
calls with Python control flow, loops, or post-processing.
"""
@@ -0,0 +1,111 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any
from agent_framework import AgentSession, ContextProvider, FunctionTool, SessionContext
from agent_framework._tools import ApprovalMode
from ._execute_code_tool import HyperlightExecuteCodeTool, SandboxRuntime
from ._types import AllowedDomain, AllowedDomainInput, FileMount, FileMountInput
class HyperlightCodeActProvider(ContextProvider):
"""Inject a Hyperlight-backed CodeAct surface using provider-owned tools."""
DEFAULT_SOURCE_ID = "hyperlight_codeact"
def __init__(
self,
source_id: str = DEFAULT_SOURCE_ID,
*,
tools: FunctionTool | Callable[..., Any] | Sequence[FunctionTool | Callable[..., Any]] | None = None,
approval_mode: ApprovalMode | None = None,
workspace_root: str | Path | None = None,
file_mounts: FileMountInput | Sequence[FileMountInput] | None = None,
allowed_domains: AllowedDomainInput | Sequence[AllowedDomainInput] | None = None,
backend: str = "wasm",
module: str | None = "python_guest.path",
module_path: str | None = None,
_registry: SandboxRuntime | None = None,
) -> None:
super().__init__(source_id)
self._execute_code_tool = HyperlightExecuteCodeTool(
tools=tools,
approval_mode=approval_mode,
workspace_root=workspace_root,
file_mounts=file_mounts,
allowed_domains=allowed_domains,
backend=backend,
module=module,
module_path=module_path,
_registry=_registry,
)
def add_tools(
self,
tools: FunctionTool | Callable[..., Any] | Sequence[FunctionTool | Callable[..., Any]],
) -> None:
"""Add provider-owned sandbox tools."""
self._execute_code_tool.add_tools(tools)
def get_tools(self) -> list[FunctionTool]:
"""Return the provider-owned sandbox tools."""
return self._execute_code_tool.get_tools()
def remove_tool(self, name: str) -> None:
"""Remove one provider-owned sandbox tool by name."""
self._execute_code_tool.remove_tool(name)
def clear_tools(self) -> None:
"""Remove all provider-owned sandbox tools."""
self._execute_code_tool.clear_tools()
def add_file_mounts(self, file_mounts: FileMountInput | Sequence[FileMountInput]) -> None:
"""Add provider-managed file mounts."""
self._execute_code_tool.add_file_mounts(file_mounts)
def get_file_mounts(self) -> list[FileMount]:
"""Return the provider-managed file mounts."""
return self._execute_code_tool.get_file_mounts()
def remove_file_mount(self, mount_path: str) -> None:
"""Remove one provider-managed file mount."""
self._execute_code_tool.remove_file_mount(mount_path)
def clear_file_mounts(self) -> None:
"""Remove all provider-managed file mounts."""
self._execute_code_tool.clear_file_mounts()
def add_allowed_domains(self, domains: AllowedDomainInput | Sequence[AllowedDomainInput]) -> None:
"""Add provider-managed outbound allow-list entries."""
self._execute_code_tool.add_allowed_domains(domains)
def get_allowed_domains(self) -> list[AllowedDomain]:
"""Return the provider-managed outbound allow-list entries."""
return self._execute_code_tool.get_allowed_domains()
def remove_allowed_domain(self, domain: str) -> None:
"""Remove one provider-managed outbound allow-list entry."""
self._execute_code_tool.remove_allowed_domain(domain)
def clear_allowed_domains(self) -> None:
"""Remove all provider-managed outbound allow-list entries."""
self._execute_code_tool.clear_allowed_domains()
async def before_run(
self,
*,
agent: Any,
session: AgentSession | None,
context: SessionContext,
state: dict[str, Any],
) -> None:
"""Inject CodeAct instructions and a run-scoped execute_code tool before each run."""
run_tool = self._execute_code_tool.create_run_tool()
state[self.source_id] = run_tool.build_serializable_state()
context.extend_instructions(self.source_id, run_tool.build_instructions(tools_visible_to_model=False))
context.extend_tools(self.source_id, [run_tool])
@@ -0,0 +1,28 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
from collections.abc import Sequence
from pathlib import Path
from typing import NamedTuple, TypeAlias
class FileMount(NamedTuple):
"""Map a host file or directory into the sandbox input tree."""
host_path: str | Path
mount_path: str
FileMountHostPath: TypeAlias = str | Path
FileMountInput: TypeAlias = str | tuple[FileMountHostPath, str] | FileMount
class AllowedDomain(NamedTuple):
"""Allow outbound requests to one target, optionally restricted to specific HTTP methods."""
target: str
methods: tuple[str, ...] | None = None
AllowedDomainInput: TypeAlias = str | tuple[str, str | Sequence[str]] | AllowedDomain
+101
View File
@@ -0,0 +1,101 @@
[project]
name = "agent-framework-hyperlight"
description = "Hyperlight CodeAct integrations for Microsoft Agent Framework."
authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
version = "1.0.0a260409"
license-files = ["LICENSE"]
urls.homepage = "https://aka.ms/agent-framework"
urls.source = "https://github.com/microsoft/agent-framework/tree/main/python"
urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true"
urls.issues = "https://github.com/microsoft/agent-framework/issues"
classifiers = [
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Typing :: Typed",
]
dependencies = [
"agent-framework-core>=1.0.0,<2",
"hyperlight-sandbox>=0.3.0,<0.4",
"hyperlight-sandbox-backend-wasm>=0.3.0,<0.4 ; (sys_platform == 'linux' or sys_platform == 'win32') and python_version < '3.14'",
"hyperlight-sandbox-python-guest>=0.3.0,<0.4",
]
[tool.uv]
prerelease = "if-necessary-or-explicit"
environments = [
"sys_platform == 'linux'",
"sys_platform == 'win32'"
]
[tool.uv-dynamic-versioning]
fallback-version = "0.0.0"
[tool.pytest.ini_options]
testpaths = 'tests'
addopts = "-ra -q -r fEX"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = []
timeout = 120
markers = [
"integration: marks tests as integration tests that require external services",
]
[tool.ruff]
extend = "../../pyproject.toml"
[tool.ruff.lint.per-file-ignores]
"samples/**" = ["INP", "T201"]
"tests/**" = ["D", "INP", "TD", "ERA001", "RUF", "S"]
[tool.coverage.run]
omit = [
"**/__init__.py"
]
[tool.pyright]
extends = "../../pyproject.toml"
include = ["agent_framework_hyperlight"]
exclude = ['tests']
[tool.mypy]
plugins = ['pydantic.mypy']
strict = true
python_version = "3.10"
ignore_missing_imports = true
disallow_untyped_defs = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
[tool.bandit]
targets = ["agent_framework_hyperlight"]
exclude_dirs = ["tests", "samples"]
[tool.poe]
executor.type = "uv"
include = "../../shared_tasks.toml"
[tool.poe.tasks.mypy]
help = "Run MyPy for this package."
cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hyperlight"
[tool.poe.tasks.test]
help = "Run the default unit test suite for this package."
cmd = 'pytest -m "not integration" --cov=agent_framework_hyperlight --cov-report=term-missing:skip-covered tests'
[build-system]
requires = ["flit-core >= 3.11,<4.0"]
build-backend = "flit_core.buildapi"
@@ -0,0 +1,43 @@
# Hyperlight CodeAct samples
These samples demonstrate the alpha `agent-framework-hyperlight` package.
## When to use which pattern
- **Provider pattern** (`codeact_context_provider.py`): Use when the tool
registry, file mounts, or network allow-list may change between runs, or when
you want the provider to manage CodeAct instructions and approval computation
automatically on every invocation. This is the recommended default for
production agents that need dynamic capability management or concurrent runs
sharing one provider.
- **Manual static wiring** (`codeact_manual_wiring.py`): Use when the sandbox
tool set and capabilities are fixed for the agent's lifetime. This pattern
builds instructions once, passes `execute_code` alongside direct tools in
`tools=`, and skips the per-run provider lifecycle entirely. Simpler setup,
but changes to the tool registry after construction will not update the
agent's instructions automatically.
- **Standalone tool** (`codeact_tool.py`): Use for the simplest integration
where `execute_code` is added directly to the agent tool list. The tool's own
description advertises `call_tool(...)` and the registered sandbox tools, so
no extra agent instructions are needed. Best for quick prototyping or when
CodeAct is just another tool alongside the agent's direct tools.
## Samples
- `codeact_context_provider.py` shows the provider-owned CodeAct model where the
agent only sees `execute_code` and sandbox tools are owned by
`HyperlightCodeActProvider`.
- `codeact_manual_wiring.py` shows static wiring where `HyperlightExecuteCodeTool`
and its instructions are passed directly to the `Agent` constructor.
- `codeact_tool.py` shows the standalone `HyperlightExecuteCodeTool` surface
where `execute_code` is added directly to the agent tool list.
Run the samples from the repository after installing the workspace dependencies:
```bash
uv run --directory packages/hyperlight python samples/codeact_context_provider.py
uv run --directory packages/hyperlight python samples/codeact_manual_wiring.py
uv run --directory packages/hyperlight python samples/codeact_tool.py
```
@@ -0,0 +1,192 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import asyncio
import logging
import os
from collections.abc import Awaitable, Callable
from typing import Annotated, Any, Literal
from agent_framework import Agent, FunctionInvocationContext, function_middleware, tool
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from agent_framework_hyperlight import HyperlightCodeActProvider
"""This sample demonstrates the provider-owned Hyperlight CodeAct flow.
The sample keeps `compute` and `fetch_data` off the direct agent tool surface and
registers them only with `HyperlightCodeActProvider`. The model therefore sees a
single `execute_code` tool and must call the provider-owned tools from inside
the sandbox with `call_tool(...)`.
"""
load_dotenv()
_CYAN = "\033[36m"
_YELLOW = "\033[33m"
_GREEN = "\033[32m"
_DIM = "\033[2m"
_RESET = "\033[0m"
class _ColoredFormatter(logging.Formatter):
"""Dim logger output so it does not compete with sample prints."""
def format(self, record: logging.LogRecord) -> str:
return f"{_DIM}{super().format(record)}{_RESET}"
logging.basicConfig(level=logging.WARNING)
logging.getLogger().handlers[0].setFormatter(
_ColoredFormatter("[%(asctime)s] %(levelname)s: %(message)s"),
)
@function_middleware
async def log_function_calls(
context: FunctionInvocationContext,
call_next: Callable[[], Awaitable[None]],
) -> None:
"""Log tool calls, including readable execute_code blocks."""
import time
function_name = context.function.name
arguments = context.arguments if isinstance(context.arguments, dict) else {}
if function_name == "execute_code" and "code" in arguments:
print(f"\n{_YELLOW}{'' * 60}")
print("▶ execute_code")
print(f"{'' * 60}{_RESET}")
print(arguments["code"])
print(f"{_YELLOW}{'' * 60}{_RESET}")
else:
pairs = ", ".join(f"{name}={value!r}" for name, value in arguments.items())
print(f"\n{_YELLOW}{function_name}({pairs}){_RESET}")
start = time.perf_counter()
await call_next()
elapsed = time.perf_counter() - start
result = context.result
if function_name == "execute_code" and isinstance(result, list):
for item in result:
if item.type != "code_interpreter_tool_result":
continue
for output in item.outputs or []:
if output.type == "text" and output.text:
print(f"{_GREEN}stdout:\n{output.text}{_RESET}")
if output.type == "error" and output.error_details:
print(f"{_YELLOW}stderr:\n{output.error_details}{_RESET}")
else:
print(f"{_YELLOW}{function_name}{result!r}{_RESET}")
print(f"{_DIM} ({elapsed:.4f}s){_RESET}")
@tool(approval_mode="never_require")
def compute(
operation: Annotated[
Literal["add", "subtract", "multiply", "divide"],
"Math operation: add, subtract, multiply, or divide.",
],
a: Annotated[float, "First numeric operand."],
b: Annotated[float, "Second numeric operand."],
) -> float:
"""Perform a math operation for sandboxed code."""
operations = {
"add": a + b,
"subtract": a - b,
"multiply": a * b,
"divide": a / b if b else float("inf"),
}
return operations[operation]
@tool(approval_mode="never_require")
async def fetch_data(
table: Annotated[str, "Name of the simulated table to query."],
) -> list[dict[str, Any]]:
"""Fetch records from a named table."""
await asyncio.sleep(0.5)
data: dict[str, list[dict[str, Any]]] = {
"users": [
{"id": 1, "name": "Alice", "role": "admin"},
{"id": 2, "name": "Bob", "role": "user"},
{"id": 3, "name": "Charlie", "role": "admin"},
],
"products": [
{"id": 101, "name": "Widget", "price": 9.99},
{"id": 102, "name": "Gadget", "price": 19.99},
],
}
return data.get(table, [])
async def main() -> None:
"""Run the provider-owned Hyperlight CodeAct sample."""
# 1. Create the Hyperlight-backed provider and register sandbox tools on it.
codeact = HyperlightCodeActProvider(
tools=[compute, fetch_data],
approval_mode="never_require",
)
# 2. Create the client and the agent.
agent = Agent(
client=FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
),
name="HyperlightCodeActProviderAgent",
instructions="You are a helpful assistant.",
context_providers=[codeact],
middleware=[log_function_calls],
)
# 3. Run a request that should use execute_code plus provider-owned tools.
query = (
"Fetch all users, find admins, multiply 7*(3*2), and print the users, "
"admins, and multiplication result. Use execute_code and call_tool(...) "
"inside the sandbox."
)
print(f"{_CYAN}{'=' * 60}")
print("Hyperlight CodeAct provider sample")
print(f"{'=' * 60}{_RESET}")
print(f"{_CYAN}User: {query}{_RESET}")
result = await agent.run(query)
print(f"{_CYAN}Agent: {result.text}{_RESET}")
"""
Sample output (shape only):
============================================================
Hyperlight CodeAct provider sample
============================================================
User: Fetch all users, find admins, multiply 7*(3*2), ...
────────────────────────────────────────────────────────────
▶ execute_code
────────────────────────────────────────────────────────────
users = call_tool("fetch_data", table="users")
admins = [user for user in users if user["role"] == "admin"]
result = call_tool("compute", operation="multiply", a=7, b=6)
print("Users:", users)
print("Admins:", admins)
print("7 * 6 =", result)
────────────────────────────────────────────────────────────
stdout:
Users: [...]
Admins: [...]
7 * 6 = 42.0
(0.0xxx s)
Agent: ...
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,133 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import asyncio
import os
from typing import Annotated, Any, Literal
from agent_framework import Agent, tool
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from agent_framework_hyperlight import HyperlightExecuteCodeTool
"""This sample demonstrates manual static wiring of CodeAct without a provider.
Instead of using `HyperlightCodeActProvider` with `context_providers=`, this
sample creates a `HyperlightExecuteCodeTool` directly, extracts its CodeAct
instructions once, and passes both to the `Agent` constructor at build time.
This avoids the per-run provider lifecycle (`before_run` / `after_run`) and is
well-suited when the tool registry, file mounts, and network allow-list are
fixed for the agent's lifetime. The tradeoff is that dynamic tool or capability
changes between runs are not supported — any mutations to the tool would not
update the agent's instructions automatically.
"""
load_dotenv()
@tool(approval_mode="never_require")
def compute(
operation: Annotated[
Literal["add", "subtract", "multiply", "divide"],
"Math operation: add, subtract, multiply, or divide.",
],
a: Annotated[float, "First numeric operand."],
b: Annotated[float, "Second numeric operand."],
) -> float:
"""Perform a math operation used by sandboxed code."""
operations = {
"add": a + b,
"subtract": a - b,
"multiply": a * b,
"divide": a / b if b else float("inf"),
}
return operations[operation]
@tool(approval_mode="never_require")
def fetch_data(
table: Annotated[str, "Name of the simulated table to query."],
) -> list[dict[str, Any]]:
"""Fetch simulated records from a named table."""
data: dict[str, list[dict[str, Any]]] = {
"users": [
{"id": 1, "name": "Alice", "role": "admin"},
{"id": 2, "name": "Bob", "role": "user"},
{"id": 3, "name": "Charlie", "role": "admin"},
],
"products": [
{"id": 101, "name": "Widget", "price": 9.99},
{"id": 102, "name": "Gadget", "price": 19.99},
],
}
return data.get(table, [])
@tool(approval_mode="never_require")
def send_email(
to: Annotated[str, "Recipient email address."],
subject: Annotated[str, "Email subject line."],
body: Annotated[str, "Email body text."],
) -> str:
"""Simulate sending an email (direct-only tool, not available inside the sandbox)."""
return f"Email sent to {to}: {subject}"
async def main() -> None:
"""Run the manual static-wiring sample."""
# 1. Create the execute_code tool and register sandbox tools on it.
execute_code = HyperlightExecuteCodeTool(
tools=[compute, fetch_data],
approval_mode="never_require",
)
# 2. Build CodeAct instructions once. Setting tools_visible_to_model=False
# tells the instructions builder that sandbox tools are not in the agent's
# direct tool list, so the model must use call_tool(...) inside execute_code.
codeact_instructions = execute_code.build_instructions(tools_visible_to_model=False)
# 3. Create the client and the agent with everything wired at construction time.
# - send_email is a direct-only tool (not available inside the sandbox).
# - execute_code carries sandbox tools (compute, fetch_data) via call_tool.
agent = Agent(
client=FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
),
name="ManualWiringAgent",
instructions=f"You are a helpful assistant.\n\n{codeact_instructions}",
tools=[send_email, execute_code],
)
# 4. Run a request that exercises both the sandbox and the direct tool.
print("=" * 60)
print("Manual static-wiring CodeAct sample")
print("=" * 60)
query = (
"Fetch all users, find admins, multiply 6*7, and print the users, admins, "
"and multiplication result. Use one execute_code call. "
"Then send an email to admin@example.com summarising the results."
)
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result.text}")
"""
Sample output (shape only):
============================================================
Manual static-wiring CodeAct sample
============================================================
User: Fetch all users, find admins, multiply 6*7, ...
Agent: ...
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,110 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import asyncio
import os
from typing import Annotated, Any, Literal
from agent_framework import Agent, tool
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from agent_framework_hyperlight import HyperlightExecuteCodeTool
"""This sample demonstrates the standalone Hyperlight execute_code tool.
The sample adds `HyperlightExecuteCodeTool` directly to the agent. The tool's
own description advertises `call_tool(...)`, the registered sandbox tools, and
the current capability configuration, so no extra CodeAct-specific agent
instructions are required.
"""
load_dotenv()
@tool(approval_mode="never_require")
def compute(
operation: Annotated[
Literal["add", "subtract", "multiply", "divide"],
"Math operation: add, subtract, multiply, or divide.",
],
a: Annotated[float, "First numeric operand."],
b: Annotated[float, "Second numeric operand."],
) -> float:
"""Perform a math operation used by sandboxed code."""
operations = {
"add": a + b,
"subtract": a - b,
"multiply": a * b,
"divide": a / b if b else float("inf"),
}
return operations[operation]
@tool(approval_mode="never_require")
def fetch_data(
table: Annotated[str, "Name of the simulated table to query."],
) -> list[dict[str, Any]]:
"""Fetch simulated records from a named table."""
data: dict[str, list[dict[str, Any]]] = {
"users": [
{"id": 1, "name": "Alice", "role": "admin"},
{"id": 2, "name": "Bob", "role": "user"},
{"id": 3, "name": "Charlie", "role": "admin"},
],
"products": [
{"id": 101, "name": "Widget", "price": 9.99},
{"id": 102, "name": "Gadget", "price": 19.99},
],
}
return data.get(table, [])
async def main() -> None:
"""Run the standalone execute_code sample."""
# 1. Create the packaged execute_code tool and register sandbox tools on it.
execute_code = HyperlightExecuteCodeTool(
tools=[compute, fetch_data],
approval_mode="never_require",
)
# 2. Create the client and the agent.
agent = Agent(
client=FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
),
name="HyperlightExecuteCodeToolAgent",
instructions="You are a helpful assistant.",
tools=execute_code,
)
# 3. Run one request through the direct-tool surface.
print("=" * 60)
print("Hyperlight execute_code tool sample")
print("=" * 60)
query = (
"Fetch all users, find admins, multiply 6*7, and print the users, admins, "
"and multiplication result. Use one execute_code call."
)
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result.text}")
"""
Sample output (shape only):
============================================================
Hyperlight execute_code tool sample
============================================================
User: Fetch all users, find admins, multiply 6*7, ...
Agent: ...
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,981 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import asyncio
import importlib.metadata
import importlib.util
import inspect
import json
import sys
import threading
import time
from collections.abc import Awaitable, Callable, Mapping, MutableSequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import pytest
from agent_framework import (
Agent,
BaseChatClient,
ChatResponse,
ChatResponseUpdate,
Content,
FunctionInvocationLayer,
FunctionTool,
Message,
ResponseStream,
tool,
)
from agent_framework_hyperlight import AllowedDomain, FileMount, HyperlightCodeActProvider, HyperlightExecuteCodeTool
from agent_framework_hyperlight import _execute_code_tool as execute_code_module
def _hyperlight_integration_static_skip_reason() -> str | None:
if sys.version_info >= (3, 14):
return (
"Hyperlight integration tests require Python < 3.14 because hyperlight-sandbox-backend-wasm is unsupported."
)
if sys.platform not in {"linux", "win32"}:
return "Hyperlight integration tests require Linux or Windows runners."
if importlib.util.find_spec("hyperlight_sandbox") is None:
return "hyperlight-sandbox is not installed."
if importlib.util.find_spec("python_guest") is None:
return "hyperlight-sandbox-python-guest is not installed."
try:
importlib.metadata.version("hyperlight-sandbox-backend-wasm")
except importlib.metadata.PackageNotFoundError:
return "hyperlight-sandbox-backend-wasm is not installed."
return None
def _hyperlight_integration_runtime_skip_reason() -> str | None:
if (reason := _hyperlight_integration_static_skip_reason()) is not None:
return reason
try:
sandbox_cls = execute_code_module._load_sandbox_class()
sandbox = sandbox_cls(
backend=execute_code_module.DEFAULT_HYPERLIGHT_BACKEND,
module=execute_code_module.DEFAULT_HYPERLIGHT_MODULE,
)
sandbox.run("None")
except RuntimeError as exc:
message = str(exc)
if "no hypervisor was found for sandbox" in message.lower():
return "Hyperlight integration tests require a runner with a working Hyperlight hypervisor."
return None
def _skip_if_hyperlight_integration_runtime_disabled() -> None:
if (reason := _hyperlight_integration_runtime_skip_reason()) is not None:
pytest.skip(reason)
skip_if_hyperlight_integration_tests_disabled = pytest.mark.skipif(
(reason := _hyperlight_integration_static_skip_reason()) is not None,
reason=reason or "Hyperlight integration tests are disabled.",
)
@pytest.fixture(scope="module")
def shared_sandbox():
"""Long-lived sandbox with snapshot/restore for read-mostly tests.
Multiple tests run sequentially against this fixture. Each test restores the
sandbox to a clean state via the ``restored_sandbox`` fixture.
"""
if (reason := _hyperlight_integration_runtime_skip_reason()) is not None:
pytest.skip(reason)
sandbox_cls = execute_code_module._load_sandbox_class()
sandbox = sandbox_cls(
backend=execute_code_module.DEFAULT_HYPERLIGHT_BACKEND,
module=execute_code_module.DEFAULT_HYPERLIGHT_MODULE,
)
sandbox.run("None")
snapshot = sandbox.snapshot()
yield sandbox, snapshot
@pytest.fixture
def restored_sandbox(shared_sandbox):
"""Restore shared sandbox to clean state before each test."""
sandbox, snapshot = shared_sandbox
sandbox.restore(snapshot)
return sandbox
@pytest.fixture
def fresh_sandbox():
"""Short-lived sandbox for tests that alter config meaningfully.
Not pre-warmed: call ``sandbox.run("None")`` after registering tools
and domains, then snapshot/restore before executing test code.
"""
if (reason := _hyperlight_integration_runtime_skip_reason()) is not None:
pytest.skip(reason)
sandbox_cls = execute_code_module._load_sandbox_class()
sandbox = sandbox_cls(
backend=execute_code_module.DEFAULT_HYPERLIGHT_BACKEND,
module=execute_code_module.DEFAULT_HYPERLIGHT_MODULE,
temp_output=True,
)
yield sandbox
@tool(approval_mode="never_require")
def compute(a: int, b: int) -> int:
return a + b
@tool(approval_mode="always_require")
def dangerous_compute(a: int, b: int) -> int:
return a * b
@tool(name="compute", approval_mode="always_require")
def replacement_compute(a: int, b: int) -> int:
return a - b
@dataclass(slots=True)
class _FakeResult:
success: bool
stdout: str = ""
stderr: str = ""
def _run_in_thread(callback: Callable[[], Any]) -> Any:
result: dict[str, Any] = {}
error: dict[str, BaseException] = {}
def _runner() -> None:
try:
result["value"] = callback()
except BaseException as exc:
error["value"] = exc
thread = threading.Thread(target=_runner)
thread.start()
thread.join()
if "value" in error:
raise error["value"]
return result.get("value")
class _FakeSandbox:
instances: list[_FakeSandbox] = []
def __init__(
self,
*,
input_dir: str | None = None,
output_dir: str | None = None,
temp_output: bool = False,
backend: str = "wasm",
module: str | None = None,
module_path: str | None = None,
heap_size: str | None = None,
stack_size: str | None = None,
) -> None:
self.input_dir = input_dir
self.output_dir = output_dir
self.registered_tools: dict[str, Any] = {}
self.allowed_domains: list[tuple[str, list[str] | None]] = []
self.restore_calls: list[Any] = []
self.output_files: list[str] = []
_FakeSandbox.instances.append(self)
def register_tool(self, name_or_tool: Any, callback: Any | None = None) -> None:
if callback is None:
raise AssertionError("Expected callback registration for sandbox tools.")
self.registered_tools[str(name_or_tool)] = callback
def allow_domain(self, target: str, methods: list[str] | None = None) -> None:
self.allowed_domains.append((target, methods))
def _invoke_tool(self, name: str, **kwargs: Any) -> Any:
callback = self.registered_tools[name]
if inspect.iscoroutinefunction(callback):
return _run_in_thread(lambda: asyncio.run(callback(**kwargs)))
result = callback(**kwargs)
if inspect.isawaitable(result):
return _run_in_thread(lambda: asyncio.run(result))
return result
def run(self, code: str) -> _FakeResult:
if code == "None":
return _FakeResult(success=True)
if code == "create-output":
if self.output_dir is None:
raise AssertionError("Expected output directory for create-output test.")
Path(self.output_dir, "report.txt").write_text("artifact", encoding="utf-8")
self.output_files = ["report.txt"]
return _FakeResult(success=True, stdout="done\n")
if 'call_tool("compute", a=20, b=22)' in code:
total = self._invoke_tool("compute", a=20, b=22)
return _FakeResult(success=True, stdout=f"{total}\n")
return _FakeResult(success=False, stderr="sandbox boom")
def snapshot(self) -> str:
return "snapshot"
def restore(self, snapshot: Any) -> None:
self.restore_calls.append(snapshot)
def get_output_files(self) -> list[str]:
return list(self.output_files)
class _FakeRuntime:
def __init__(self) -> None:
self.calls: list[tuple[Any, str]] = []
def execute(self, *, config: Any, code: str) -> list[Content]:
self.calls.append((config, code))
return [Content.from_text("ok")]
class _FakeSandboxWithoutOutputListing(_FakeSandbox):
def get_output_files(self) -> list[str]:
return []
class _FakeSandboxWithDelayedUnlistedOutput(_FakeSandboxWithoutOutputListing):
writer_threads: list[threading.Thread] = []
def run(self, code: str) -> _FakeResult:
if 'Path("/output/report.txt").write_text("artifact", encoding="utf-8")' in code:
if self.output_dir is None:
raise AssertionError("Expected output directory for delayed output test.")
def _write_file() -> None:
time.sleep(0.15)
Path(self.output_dir, "report.txt").write_text("artifact", encoding="utf-8")
writer_thread = threading.Thread(target=_write_file)
writer_thread.start()
self.writer_threads.append(writer_thread)
return _FakeResult(success=True)
return super().run(code)
class _FakeSessionContext:
def __init__(self, *, tools: list[Any] | None = None) -> None:
self.options: dict[str, Any] = {}
if tools is not None:
self.options["tools"] = tools
self.instructions: list[tuple[str, str]] = []
self.tools: list[tuple[str, list[Any]]] = []
def extend_instructions(self, source_id: str, instructions: str) -> None:
self.instructions.append((source_id, instructions))
def extend_tools(self, source_id: str, tools: list[Any]) -> None:
self.tools.append((source_id, tools))
def _extract_execute_code_result(function_result: Content) -> Content:
assert function_result.type == "function_result"
assert function_result.exception is None, (
f"execute_code raised {function_result.exception!r} with items={function_result.items!r}"
)
code_result = next(
(item for item in function_result.items or [] if item.type == "code_interpreter_tool_result"),
None,
)
if code_result is not None:
return code_result
text_outputs = [item for item in function_result.items or [] if item.type == "text"]
if text_outputs:
return Content.from_code_interpreter_tool_result(outputs=text_outputs)
if function_result.result:
return Content.from_code_interpreter_tool_result(outputs=[Content.from_text(function_result.result)])
raise AssertionError(f"execute_code returned no usable outputs: {function_result.items!r}")
def _extract_text_output(result_content: Content) -> str:
code_result = _extract_execute_code_result(result_content)
text_output = next(
(item for item in code_result.outputs or [] if item.type == "text" and item.text is not None), None
)
assert text_output is not None and text_output.text is not None, (
f"Expected text output from execute_code, got {code_result.outputs!r}"
)
return text_output.text
class _FakeCodeActChatClient(FunctionInvocationLayer[Any], BaseChatClient[Any]):
def __init__(self) -> None:
FunctionInvocationLayer.__init__(self)
BaseChatClient.__init__(self)
self.call_count = 0
def _inner_get_response(
self,
*,
messages: MutableSequence[Message],
stream: bool,
options: Mapping[str, Any],
**kwargs: Any,
) -> Awaitable[ChatResponse] | ResponseStream[ChatResponseUpdate, ChatResponse]:
if stream:
raise AssertionError("Streaming is not used in this integration test.")
async def _get_response() -> ChatResponse:
self.call_count += 1
if self.call_count == 1:
return ChatResponse(
messages=Message(
role="assistant",
contents=[
Content.from_function_call(
call_id="execute_code_call",
name="execute_code",
arguments={
"code": 'total = call_tool("compute", a=20, b=22)\nprint(total)',
},
)
],
)
)
function_results = [
content for message in messages for content in message.contents if content.type == "function_result"
]
assert len(function_results) == 1
result_content = function_results[0]
assert result_content.call_id == "execute_code_call"
assert _extract_text_output(result_content) == "42\n"
return ChatResponse(messages=Message(role="assistant", contents=["The sandbox returned 42."]))
return _get_response()
def test_execute_code_tool_updates_approval_with_managed_tools() -> None:
execute_code = HyperlightExecuteCodeTool(tools=[compute], _registry=_FakeRuntime())
assert execute_code.approval_mode == "never_require"
execute_code.add_tools([dangerous_compute])
assert execute_code.approval_mode == "always_require"
def test_execute_code_tool_replaces_tools_with_the_same_name() -> None:
execute_code = HyperlightExecuteCodeTool(tools=[compute], _registry=_FakeRuntime())
execute_code.add_tools(replacement_compute)
tools = execute_code.get_tools()
assert len(tools) == 1
assert tools[0] is replacement_compute
assert execute_code.approval_mode == "always_require"
def test_execute_code_tool_accepts_string_and_tuple_file_mounts_without_mode_flags(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
shorthand_file = tmp_path / "notes.txt"
shorthand_file.write_text("hello", encoding="utf-8")
explicit_file = tmp_path / "data.json"
explicit_file.write_text('{"hello": "world"}', encoding="utf-8")
monkeypatch.chdir(tmp_path)
execute_code = HyperlightExecuteCodeTool(_registry=_FakeRuntime())
execute_code.add_file_mounts("notes.txt")
execute_code.add_file_mounts((explicit_file, "data/data.json"))
assert execute_code.get_file_mounts() == [
FileMount(shorthand_file.resolve(), "/input/notes.txt"),
FileMount(explicit_file.resolve(), "/input/data/data.json"),
]
async def test_execute_code_tool_populates_input_dir_with_workspace_and_file_mounts(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
workspace_root = tmp_path / "workspace"
workspace_root.mkdir()
(workspace_root / "notes.txt").write_text("workspace note", encoding="utf-8")
mounted_file = tmp_path / "mounted.txt"
mounted_file.write_text("hello from mount", encoding="utf-8")
execute_code = HyperlightExecuteCodeTool(
workspace_root=workspace_root,
file_mounts=[FileMount(mounted_file, "data/input.txt")],
)
result = await execute_code.invoke(arguments={"code": "None"})
assert result[0].type == "code_interpreter_tool_result"
assert _FakeSandbox.instances[0].input_dir is not None
input_root = Path(_FakeSandbox.instances[0].input_dir)
assert (input_root / "notes.txt").read_text(encoding="utf-8") == "workspace note"
assert (input_root / "data" / "input.txt").read_text(encoding="utf-8") == "hello from mount"
def test_execute_code_tool_allowed_domains_use_structured_entries_and_replace_by_target() -> None:
execute_code = HyperlightExecuteCodeTool(_registry=_FakeRuntime())
execute_code.add_allowed_domains(["https://api.example.com/v1", ("github.com", "get")])
execute_code.add_allowed_domains([
AllowedDomain("api.example.com", ("post", "get")),
("github.com", ["head", "get"]),
])
assert execute_code.get_allowed_domains() == [
AllowedDomain("api.example.com", ("GET", "POST")),
AllowedDomain("github.com", ("GET", "HEAD")),
]
def test_execute_code_tool_description_contains_call_tool_guidance(tmp_path: Path) -> None:
workspace_root = tmp_path / "workspace"
workspace_root.mkdir()
(workspace_root / "notes.txt").write_text("hello", encoding="utf-8")
mount_file = tmp_path / "data.json"
mount_file.write_text('{"hello": "world"}', encoding="utf-8")
execute_code = HyperlightExecuteCodeTool(
tools=[compute],
workspace_root=workspace_root,
file_mounts=[FileMount(str(mount_file), "data/data.json")],
allowed_domains=[AllowedDomain("https://api.example.com/v1", ("get", "post")), "github.com"],
_registry=_FakeRuntime(),
)
description = execute_code.description
assert "call_tool(name, **kwargs)" in description
assert "compute" in description
assert "/input/data/data.json" in description
assert "/output" in description
assert "api.example.com" in description
assert "GET, POST" in description
assert "github.com" in description
async def test_execute_code_tool_executes_with_structured_content(monkeypatch: pytest.MonkeyPatch) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
execute_code = HyperlightExecuteCodeTool(
tools=[compute],
file_mounts=[FileMount(Path(__file__), "fixtures/source.py")],
allowed_domains=[("api.example.com", "get")],
)
result = await execute_code.invoke(arguments={"code": "create-output"})
assert result[0].type == "code_interpreter_tool_result"
assert result[0].outputs is not None
assert result[0].outputs[0].type == "text"
assert result[0].outputs[0].text == "done\n"
assert any(item.type == "data" for item in result[0].outputs)
assert _FakeSandbox.instances[0].allowed_domains == [("api.example.com", ["GET"])]
assert "compute" in _FakeSandbox.instances[0].registered_tools
async def test_execute_code_tool_collects_output_files_without_backend_listing(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandboxWithoutOutputListing)
execute_code = HyperlightExecuteCodeTool(
file_mounts=[FileMount(Path(__file__), "fixtures/source.py")],
)
result = await execute_code.invoke(arguments={"code": "create-output"})
assert result[0].type == "code_interpreter_tool_result"
assert result[0].outputs is not None
assert any(
item.type == "data" and item.additional_properties["path"] == "/output/report.txt" for item in result[0].outputs
)
async def test_execute_code_tool_waits_for_unlisted_output_files_to_appear(
monkeypatch: pytest.MonkeyPatch,
) -> None:
_FakeSandboxWithDelayedUnlistedOutput.writer_threads.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandboxWithDelayedUnlistedOutput)
execute_code = HyperlightExecuteCodeTool(
file_mounts=[FileMount(Path(__file__), "fixtures/source.py")],
)
result = await execute_code.invoke(
arguments={"code": 'Path("/output/report.txt").write_text("artifact", encoding="utf-8")'}
)
for writer_thread in _FakeSandboxWithDelayedUnlistedOutput.writer_threads:
writer_thread.join()
assert result[0].type == "code_interpreter_tool_result"
assert result[0].outputs is not None
assert any(
item.type == "data" and item.additional_properties["path"] == "/output/report.txt" for item in result[0].outputs
)
async def test_execute_code_tool_failure_returns_error_content(monkeypatch: pytest.MonkeyPatch) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
execute_code = HyperlightExecuteCodeTool()
result = await execute_code.invoke(arguments={"code": "fail"})
assert result[0].type == "code_interpreter_tool_result"
assert result[0].outputs is not None
assert result[0].outputs[0].type == "error"
assert result[0].outputs[0].error_details == "sandbox boom"
async def test_execute_code_tool_retries_allowed_domains_with_urls_when_backend_rejects_host_targets(
monkeypatch: pytest.MonkeyPatch,
) -> None:
class _FakeStrictNetworkSandbox:
instances: list[_FakeStrictNetworkSandbox] = []
def __init__(
self,
*,
input_dir: str | None = None,
output_dir: str | None = None,
backend: str = "wasm",
module: str | None = None,
module_path: str | None = None,
) -> None:
del input_dir, output_dir, backend, module, module_path
self.allowed_domains: list[tuple[str, list[str] | None]] = []
_FakeStrictNetworkSandbox.instances.append(self)
def register_tool(self, name_or_tool: Any, callback: Any | None = None) -> None:
del name_or_tool, callback
def allow_domain(self, target: str, methods: list[str] | None = None) -> None:
self.allowed_domains.append((target, methods))
def run(self, code: str) -> _FakeResult:
if code == "None" and any("://" not in target for target, _ in self.allowed_domains):
raise RuntimeError("invalid URL for network permission: ")
return _FakeResult(success=True)
def snapshot(self) -> str:
return "snapshot"
def restore(self, snapshot: Any) -> None:
del snapshot
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeStrictNetworkSandbox)
execute_code = HyperlightExecuteCodeTool(allowed_domains=[("127.0.0.1:8080", "get")])
result = await execute_code.invoke(arguments={"code": "None"})
assert result[0].type == "code_interpreter_tool_result"
assert len(_FakeStrictNetworkSandbox.instances) == 2
assert _FakeStrictNetworkSandbox.instances[0].allowed_domains == [("127.0.0.1:8080", ["GET"])]
assert _FakeStrictNetworkSandbox.instances[1].allowed_domains == [
("http://127.0.0.1:8080", ["GET"]),
("https://127.0.0.1:8080", ["GET"]),
]
def test_hyperlight_integration_runtime_skip_reason_reports_missing_hypervisor(monkeypatch: pytest.MonkeyPatch) -> None:
class _FakeNoHypervisorSandbox:
def __init__(
self,
*,
input_dir: str | None = None,
output_dir: str | None = None,
backend: str = "wasm",
module: str | None = None,
module_path: str | None = None,
) -> None:
del input_dir, output_dir, backend, module, module_path
def run(self, code: str) -> _FakeResult:
del code
raise RuntimeError("failed to build ProtoWasmSandbox: No Hypervisor was found for Sandbox")
original_find_spec = importlib.util.find_spec
def _fake_find_spec(name: str) -> object | None:
if name in {"hyperlight_sandbox", "python_guest"}:
return object()
return original_find_spec(name)
monkeypatch.setattr(sys, "version_info", (3, 13, 0))
monkeypatch.setattr(sys, "platform", "linux")
monkeypatch.setattr(importlib.util, "find_spec", _fake_find_spec)
monkeypatch.setattr(importlib.metadata, "version", lambda _: "0.0.0")
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeNoHypervisorSandbox)
assert _hyperlight_integration_runtime_skip_reason() == (
"Hyperlight integration tests require a runner with a working Hyperlight hypervisor."
)
async def test_provider_injects_run_scoped_execute_code_tool() -> None:
runtime = _FakeRuntime()
provider = HyperlightCodeActProvider(tools=[compute], _registry=runtime)
context = _FakeSessionContext(tools=[dangerous_compute])
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
assert context.options["tools"] == [dangerous_compute]
assert len(context.instructions) == 1
assert len(context.tools) == 1
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
assert run_tool.approval_mode == "never_require"
assert [tool_obj.name for tool_obj in run_tool.get_tools()] == ["compute"]
assert "dangerous_compute" not in context.instructions[0][1]
assert "compute" not in context.instructions[0][1]
assert "Filesystem capabilities:" not in context.instructions[0][1]
assert state[provider.source_id]["tool_names"] == ["compute"]
assert state[provider.source_id]["approval_mode"] == "never_require"
json.dumps(state)
provider.remove_tool("compute")
assert [tool_obj.name for tool_obj in run_tool.get_tools()] == ["compute"]
async def test_agent_runs_hyperlight_codeact_end_to_end_with_fake_sandbox(monkeypatch: pytest.MonkeyPatch) -> None:
_FakeSandbox.instances.clear()
monkeypatch.setattr(execute_code_module, "_load_sandbox_class", lambda: _FakeSandbox)
client = _FakeCodeActChatClient()
provider = HyperlightCodeActProvider(tools=[compute])
agent = Agent(client=client, context_providers=[provider])
response = await agent.run("Use the sandbox to add 20 and 22.")
assert response.text == "The sandbox returned 42."
assert client.call_count == 2
assert len(_FakeSandbox.instances) == 1
assert "compute" in _FakeSandbox.instances[0].registered_tools
@skip_if_hyperlight_integration_tests_disabled
async def test_agent_runs_hyperlight_codeact_end_to_end_with_real_sandbox() -> None:
_skip_if_hyperlight_integration_runtime_disabled()
client = _FakeCodeActChatClient()
provider = HyperlightCodeActProvider(tools=[compute])
agent = Agent(client=client, context_providers=[provider])
response = await agent.run("Use the sandbox to add 20 and 22.")
assert response.text == "The sandbox returned 42."
assert client.call_count == 2
@skip_if_hyperlight_integration_tests_disabled
async def test_provider_run_tool_writes_files_with_real_sandbox(tmp_path: Path) -> None:
_skip_if_hyperlight_integration_runtime_disabled()
workspace_root = tmp_path / "workspace"
workspace_root.mkdir()
provider = HyperlightCodeActProvider(workspace_root=workspace_root)
context = _FakeSessionContext()
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
result = await run_tool.invoke(
arguments={
"code": (
'payload = "hello from sandbox"\n'
"output_path = None\n"
'for candidate in ("/output/result.txt",):\n'
" try:\n"
' with open(candidate, "w", encoding="utf-8") as f:\n'
" f.write(payload)\n"
" except OSError:\n"
" continue\n"
" output_path = candidate\n"
" break\n"
'assert output_path is not None, "output path unavailable"\n'
'print("validated")\n'
)
}
)
assert result[0].type == "code_interpreter_tool_result"
outputs = result[0].outputs or []
error_outputs = [
f"{item.message}: {item.error_details}"
for item in outputs
if item.type == "error" and item.error_details is not None
]
assert not error_outputs, error_outputs
text_output = next((item for item in outputs if item.type == "text" and item.text is not None), None)
if text_output is not None:
assert text_output.text == "validated\n"
file_output = next((item for item in outputs if item.type == "data"), None)
if file_output is not None:
assert file_output.uri is not None and file_output.uri.startswith("data:")
assert file_output.additional_properties["path"] in {"/output/result.txt", "/output/output/result.txt"}
@pytest.mark.integration
@skip_if_hyperlight_integration_tests_disabled
@pytest.mark.skipif(sys.platform == "win32", reason="Hyperlight WASM sandbox lacks encodings.idna on Windows")
async def test_provider_run_tool_pings_bing_with_real_sandbox() -> None:
_skip_if_hyperlight_integration_runtime_disabled()
provider = HyperlightCodeActProvider()
provider.add_allowed_domains("bing.com")
context = _FakeSessionContext()
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
result = await run_tool.invoke(
arguments={
"code": (
"import _socket\n\n"
'addresses = _socket.getaddrinfo("bing.com", 80, _socket.AF_INET, _socket.SOCK_STREAM)\n'
'assert addresses, "bing.com did not resolve"\n'
"last_error = None\n"
"for family, socktype, proto, _, sockaddr in addresses:\n"
" connection = None\n"
" try:\n"
" connection = _socket.socket(family, socktype, proto)\n"
" connection.settimeout(10)\n"
" connection.connect(sockaddr)\n"
' print("pinged bing.com")\n'
" break\n"
" except OSError as exc:\n"
" last_error = exc\n"
" finally:\n"
" if connection is not None:\n"
" try:\n"
" connection.close()\n"
" except OSError:\n"
" pass\n"
"else:\n"
' raise last_error or RuntimeError("unable to reach bing.com")\n'
)
}
)
assert result[0].type == "code_interpreter_tool_result"
outputs = result[0].outputs or []
error_outputs = [
f"{item.message}: {item.error_details}"
for item in outputs
if item.type == "error" and item.error_details is not None
]
assert not error_outputs, error_outputs
text_output = next((item for item in outputs if item.type == "text" and item.text is not None), None)
if text_output is not None:
assert text_output.text == "pinged bing.com\n"
# ---------------------------------------------------------------------------
# Real-sandbox tests using shared (long-lived) fixture
# ---------------------------------------------------------------------------
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_runs_simple_code(restored_sandbox) -> None:
result = restored_sandbox.run('print("hello")')
assert result.success
assert "hello" in result.stdout
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_stdout_and_stderr_captured(restored_sandbox) -> None:
result = restored_sandbox.run(
'import sys\nprint("out")\nprint("err", file=sys.stderr)'
)
assert result.success
assert "out" in result.stdout
assert "err" in result.stderr
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_code_failure_returns_nonzero_exit(restored_sandbox) -> None:
result = restored_sandbox.run("raise ValueError('boom')")
assert not result.success
assert "boom" in result.stderr
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_snapshot_restore_keeps_sandbox_functional(restored_sandbox) -> None:
"""Verify snapshot/restore cycle leaves the sandbox in a working state."""
# Mutate the sandbox
result1 = restored_sandbox.run('print("before snapshot")')
assert result1.success
# Take a snapshot and restore
snapshot = restored_sandbox.snapshot()
restored_sandbox.restore(snapshot)
# Sandbox still works after restore
result2 = restored_sandbox.run('print("after restore")')
assert result2.success
assert "after restore" in result2.stdout
# ---------------------------------------------------------------------------
# Real-sandbox tests using fresh (short-lived) fixture
# ---------------------------------------------------------------------------
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_with_tool_registration_and_execution(fresh_sandbox) -> None:
"""Verify that a sync host tool round-trips via call_tool in the real sandbox."""
def multiply(a: int, b: int) -> int:
return a * b
fresh_sandbox.register_tool("multiply", multiply)
fresh_sandbox.run("None")
snapshot = fresh_sandbox.snapshot()
fresh_sandbox.restore(snapshot)
result = fresh_sandbox.run('result = call_tool("multiply", a=6, b=7)\nprint(result)')
assert result.success
assert "42" in result.stdout
@skip_if_hyperlight_integration_tests_disabled
async def test_sandbox_async_callback_round_trips_with_real_sandbox(fresh_sandbox) -> None:
"""Confirm that _make_sandbox_callback (sync wrapper) works with real FFI."""
sandbox_tool = FunctionTool(
func=compute,
name="compute",
description="Add two numbers",
)
callback = execute_code_module._make_sandbox_callback(sandbox_tool)
fresh_sandbox.register_tool("compute", callback)
fresh_sandbox.run("None")
snapshot = fresh_sandbox.snapshot()
fresh_sandbox.restore(snapshot)
result = fresh_sandbox.run('total = call_tool("compute", a=20, b=22)\nprint(total)')
assert result.success
assert "42" in result.stdout
@skip_if_hyperlight_integration_tests_disabled
async def test_output_dir_cleared_between_invocations() -> None:
"""Verify stale output files don't leak across invocations (comment 23)."""
_skip_if_hyperlight_integration_runtime_disabled()
provider = HyperlightCodeActProvider(workspace_root=Path(__file__).parent)
context = _FakeSessionContext()
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
# First invocation: write a file
result1 = await run_tool.invoke(
arguments={
"code": (
'with open("/output/stale.txt", "w") as f:\n'
' f.write("first")\n'
'print("wrote")\n'
)
}
)
assert result1[0].type == "code_interpreter_tool_result"
outputs1 = result1[0].outputs or []
assert any(
item.type == "data" and "stale.txt" in (item.additional_properties or {}).get("path", "")
for item in outputs1
), "First invocation should produce stale.txt"
# Second invocation: no file writes
result2 = await run_tool.invoke(arguments={"code": 'print("clean")\n'})
outputs2 = result2[0].outputs or []
stale_files = [
item
for item in outputs2
if item.type == "data" and "stale.txt" in (item.additional_properties or {}).get("path", "")
]
assert not stale_files, "Stale output file leaked into second invocation"
@skip_if_hyperlight_integration_tests_disabled
async def test_run_code_does_not_block_event_loop() -> None:
"""Verify _run_code uses asyncio.to_thread so the event loop stays responsive (comment 26)."""
_skip_if_hyperlight_integration_runtime_disabled()
provider = HyperlightCodeActProvider()
context = _FakeSessionContext()
state: dict[str, Any] = {}
await provider.before_run(agent=object(), session=None, context=context, state=state)
run_tool = context.tools[0][1][0]
assert isinstance(run_tool, HyperlightExecuteCodeTool)
# Monkeypatch the registry.execute to block on an event, proving the event loop
# stays responsive while the worker thread is blocked.
release = threading.Event()
async_started = asyncio.Event()
loop = asyncio.get_running_loop()
original_execute = run_tool._registry.execute
def _blocking_execute(*, config, code):
loop.call_soon_threadsafe(async_started.set)
release.wait(timeout=10)
return original_execute(config=config, code=code)
run_tool._registry.execute = _blocking_execute # type: ignore[assignment]
concurrent_ran = False
async def _concurrent_task():
nonlocal concurrent_ran
await async_started.wait()
concurrent_ran = True
release.set()
code_task = asyncio.create_task(
run_tool.invoke(arguments={"code": 'print("done")\n'})
)
await _concurrent_task()
result = await code_task
assert concurrent_ran, "Event loop was blocked during sandbox execution"
assert result[0].type == "code_interpreter_tool_result"
+1
View File
@@ -82,6 +82,7 @@ agent-framework-foundry = { workspace = true }
agent-framework-foundry-local = { workspace = true }
agent-framework-gemini = { workspace = true }
agent-framework-github-copilot = { workspace = true }
agent-framework-hyperlight = { workspace = true }
agent-framework-lab = { workspace = true }
agent-framework-mem0 = { workspace = true }
agent-framework-ollama = { workspace = true }
@@ -8,7 +8,7 @@ This folder contains examples demonstrating how to use the Azure AI Search conte
| File | Description |
|------|-------------|
| [`search_context_agentic.py`](search_context_agentic.py) | **Agentic mode** (recommended for most scenarios): Uses Knowledge Bases in Azure AI Search for query planning and multi-hop reasoning. Provides more accurate results through intelligent retrieval with automatic query reformulation. Slightly slower with more token consumption for query planning. [Learn more](https://techcommunity.microsoft.com/blog/azure-ai-foundry-blog/foundry-iq-boost-response-relevance-by-36-with-agentic-retrieval/4470720) |
| [`search_context_agentic.py`](search_context_agentic.py) | **Agentic mode** (recommended for most scenarios): Uses Knowledge Bases in Azure AI Search for query planning and multi-hop reasoning. Provides more accurate results through intelligent retrieval with automatic query reformulation. Slightly slower with more token consumption for query planning. [Learn more](https://learn.microsoft.com/azure/search/agentic-retrieval-overview) |
| [`search_context_semantic.py`](search_context_semantic.py) | **Semantic mode** (fast queries): Fast hybrid search combining vector and keyword search with semantic ranking. Returns raw search results as context. Best for scenarios where speed is critical and simple retrieval is sufficient. |
## Installation
@@ -265,4 +265,4 @@ async with Agent(
- [RAG with Azure AI Search](https://learn.microsoft.com/azure/search/retrieval-augmented-generation-overview)
- [Semantic Search in Azure AI Search](https://learn.microsoft.com/azure/search/semantic-search-overview)
- [Knowledge Bases in Azure AI Search](https://learn.microsoft.com/azure/search/knowledge-store-concept-intro)
- [Agentic Retrieval Blog Post](https://techcommunity.microsoft.com/blog/azure-ai-foundry-blog/foundry-iq-boost-response-relevance-by-36-with-agentic-retrieval/4470720)
- [Agentic Retrieval in Azure AI Search](https://learn.microsoft.com/azure/search/agentic-retrieval-overview)
+53
View File
@@ -45,6 +45,7 @@ members = [
"agent-framework-foundry-local",
"agent-framework-gemini",
"agent-framework-github-copilot",
"agent-framework-hyperlight",
"agent-framework-lab",
"agent-framework-mem0",
"agent-framework-ollama",
@@ -545,6 +546,25 @@ requires-dist = [
{ name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = ">=0.2.1,<=0.2.1" },
]
[[package]]
name = "agent-framework-hyperlight"
version = "1.0.0a260409"
source = { editable = "packages/hyperlight" }
dependencies = [
{ name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "hyperlight-sandbox", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "hyperlight-sandbox-backend-wasm", marker = "(python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" },
{ name = "hyperlight-sandbox-python-guest", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
[package.metadata]
requires-dist = [
{ name = "agent-framework-core", editable = "packages/core" },
{ name = "hyperlight-sandbox", specifier = ">=0.3.0,<0.4" },
{ name = "hyperlight-sandbox-backend-wasm", marker = "(python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')", specifier = ">=0.3.0,<0.4" },
{ name = "hyperlight-sandbox-python-guest", specifier = ">=0.3.0,<0.4" },
]
[[package]]
name = "agent-framework-lab"
version = "1.0.0b260409"
@@ -2725,6 +2745,39 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/48/30/47d0bf6072f7252e6521f3447ccfa40b421b6824517f82854703d0f5a98b/hyperframe-6.1.0-py3-none-any.whl", hash = "sha256:b03380493a519fce58ea5af42e4a42317bf9bd425596f7a0835ffce80f1a42e5", size = 13007, upload-time = "2025-01-22T21:41:47.295Z" },
]
[[package]]
name = "hyperlight-sandbox"
version = "0.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/cf/fe/ce88996ea3e3e05130d6f0e8cd2ffbe9ab9bf3d9448b7050d4b8d0802b0a/hyperlight_sandbox-0.3.0.tar.gz", hash = "sha256:00491ce267ffbdb206377c79b4afd86510177ad73f4daf2ef7fce02b54eaf801", size = 9251, upload-time = "2026-04-07T03:49:52.542Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2b/33/e6dcd6729308d13570ae2d3be0e476019a6f3fea387d7549bb1f77ce0408/hyperlight_sandbox-0.3.0-py3-none-any.whl", hash = "sha256:ba8e6779d64e9c187acd93456851ebafaed2f49380e5d132bc0906a4080d2217", size = 5723, upload-time = "2026-04-07T03:49:53.276Z" },
]
[[package]]
name = "hyperlight-sandbox-backend-wasm"
version = "0.3.0"
source = { registry = "https://pypi.org/simple" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/58/91/c9d68cad7996fdd2f1facef1453156bdd8d52eefa976cc8c827c13029497/hyperlight_sandbox_backend_wasm-0.3.0-cp310-cp310-manylinux_2_34_x86_64.whl", hash = "sha256:eda362f5f737b0823326290d7627c76ce0547a78e70f07f8c9d177e34622fc02", size = 3806454, upload-time = "2026-04-07T03:49:24.238Z" },
{ url = "https://files.pythonhosted.org/packages/9a/6f/6b2399a1caf59dd19b635d99ee1add0c975af7bc3317f5d0f1f9c3f90aa0/hyperlight_sandbox_backend_wasm-0.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:79347b7ae94f2786691b04cb52130dabc5991e0c03b42a24bad8adc766832655", size = 3283951, upload-time = "2026-04-07T03:49:17.137Z" },
{ url = "https://files.pythonhosted.org/packages/23/f2/b380c34a0ce8d486a05adb66757f98cca029e1fb1c96b1c29be0d25d3882/hyperlight_sandbox_backend_wasm-0.3.0-cp311-cp311-manylinux_2_34_x86_64.whl", hash = "sha256:aff9eec4803fb535a140298e2632529f4150fcf3c6ea3ff2ae4571572a836116", size = 3806601, upload-time = "2026-04-07T03:49:22.853Z" },
{ url = "https://files.pythonhosted.org/packages/b4/5a/fb78cfd934e0523887b8d5b073b7b2aed3b545add21cda3aa95929ac1659/hyperlight_sandbox_backend_wasm-0.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:b6151704dd19862c9869b115752b4504b45d0b2eeb46aa9385a1a3b8be11cfa8", size = 3284164, upload-time = "2026-04-07T03:49:18.556Z" },
{ url = "https://files.pythonhosted.org/packages/21/bc/4e21f5c7ccd9307ac63a61c71b62a57ee4a9e6eec77fc72ff072907a21f5/hyperlight_sandbox_backend_wasm-0.3.0-cp312-cp312-manylinux_2_34_x86_64.whl", hash = "sha256:cfd1d22ce221774d82a5174d268d56ff70fc1a23fb993a6491358b5d0ed169bf", size = 3802901, upload-time = "2026-04-07T03:49:19.845Z" },
{ url = "https://files.pythonhosted.org/packages/9a/41/646be9b0c7bb0f9192e45a77414673aa414eb316c92b5312efe6fb4ce802/hyperlight_sandbox_backend_wasm-0.3.0-cp312-cp312-win_amd64.whl", hash = "sha256:229ab494a422f2de895a2a27ad6a6a2daed710ea062d7c213878bbe5f5b32fa7", size = 3281220, upload-time = "2026-04-07T03:49:21.368Z" },
{ url = "https://files.pythonhosted.org/packages/74/3a/f8ec4a41fffba4036dfc3cbddc3dfb6e87466b01afe1cb0a50cc6a0f0eed/hyperlight_sandbox_backend_wasm-0.3.0-cp313-cp313-manylinux_2_34_x86_64.whl", hash = "sha256:b91905ee2ddd36a78b0dd13b1a62be99a995a45121587c111692591e40b36912", size = 3802789, upload-time = "2026-04-07T03:49:15.614Z" },
{ url = "https://files.pythonhosted.org/packages/3c/62/dfa8c15102f9b8ec5c3b5ffb54b99d60c75e7a6e4d00540757656bc5a5d8/hyperlight_sandbox_backend_wasm-0.3.0-cp313-cp313-win_amd64.whl", hash = "sha256:eff682761c3b86abfe7e0d523ea0e6d5c7e8299302917c53918743b82c9d1ea2", size = 3280501, upload-time = "2026-04-07T03:49:13.939Z" },
]
[[package]]
name = "hyperlight-sandbox-python-guest"
version = "0.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/23/6a/f182c4315d31a98dd3b82f9274638e3adb399779584af93c5087bb2f814f/hyperlight_sandbox_python_guest-0.3.0.tar.gz", hash = "sha256:b1de5d8e87375dc6bef744ecd7ae2a7f43d5f6b913b4e990e9872bd439c0b19e", size = 21554625, upload-time = "2026-04-07T03:49:42.672Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c9/8e/4cd754928464f56528645c7421ccbb3fcbe45ad2542f899712b0f2f2c0e1/hyperlight_sandbox_python_guest-0.3.0-py3-none-any.whl", hash = "sha256:3c55a7420666ad9a208893dbdf7ad1b5c8ad4f3a94b1a56e64979719c7ce95c1", size = 21716481, upload-time = "2026-04-07T03:49:39.885Z" },
]
[[package]]
name = "idna"
version = "3.11"