Python: feat: add agent-framework-monty (Monty-backed CodeAct provider) (#5915)

* Python: feat: add agent-framework-monty (Monty-backed CodeAct)

New alpha package that wraps pydantic-monty (a Rust-based Python
interpreter) behind the same CodeAct API surface as
agent-framework-hyperlight, so users can swap providers with minimal
code change.

Public API (agent_framework_monty):
- MontyCodeActProvider — ContextProvider that injects a run-scoped
  execute_code tool plus dynamic CodeAct instructions.
- MontyExecuteCodeTool — standalone FunctionTool for mixed-tool agents
  or manual static wiring.
- FileMount / FileMountInput / MountMode — public types mirroring the
  Hyperlight names, with Monty's mode (read-only/read-write/overlay)
  and write_bytes_limit on FileMount.

Constructor kwargs (both classes) mirror Hyperlight where possible:
tools, approval_mode, workspace_root, file_mounts; plus a Monty-only
resource_limits forwarding ResourceLimits to Monty.start().

Filesystem flow:
- workspace_root auto-mounts at /input (read-write), matching Hyperlight.
- file_mounts accepts string shorthand, (host, mount) tuple, or
  FileMount with mode + write cap.
- Files written under read-write mounts are scanned post-execution and
  returned as Content.from_data items (mirrors Hyperlight /output).
- overlay mounts buffer writes in-memory; read-only mounts reject writes.

Internals:
- _monty_bridge.InlineCodeBridge ports the inline (non-durable) bridge
  from anthonychu/maf-codeact-monty-python; handles FunctionSnapshot /
  FutureSnapshot pause/resume, dispatches direct typed calls + the
  call_tool fallback, forwards mount/limits to Monty.start(...).
- generate_type_stubs emits per-tool stubs so Monty's `ty` type-checker
  rejects bad calls before any host tool runs.

Alpha-policy compliance (per python-package-management skill):
- Added agent-framework-monty = { workspace = true } to root
  pyproject.toml.
- Added row to python/PACKAGE_STATUS.md.
- Added monty entry under Experimental in python/AGENTS.md.
- NOT added to core[all]; NO agent_framework.monty lazy shim (deferred
  to beta promotion).

Samples (three sets, import from agent_framework_monty directly):
- samples/02-agents/context_providers/code_act/monty_code_act.py
  (provider pattern) + updated local README.
- samples/02-agents/tools/monty_code_interpreter/ (standalone +
  manual-wiring + README).
- samples/04-hosting/foundry-hosted-agents/responses/11_monty_codeact/
  (full hosted-agent layout with uv-based pyproject.toml + Dockerfile,
  Azure Monitor wiring via APPLICATIONINSIGHTS_CONNECTION_STRING +
  enable_instrumentation, ENABLE_INSTRUMENTATION and
  ENABLE_SENSITIVE_DATA env vars). The alpha wheel is vendored into
  ./wheels/ (gitignored) via vendor-wheel.sh; new row added to the
  parent Responses-API README.

Tests:
- 28 hermetic unit tests (stubbed pydantic_monty).
- 18 integration tests marked @pytest.mark.integration, auto-skipped
  when pydantic_monty is unimportable; exercise the real Monty
  runtime: print round-trip, last-expression value, direct typed
  tool dispatch, call_tool fallback, async tool, asyncio.gather
  parallelism, ty type-check rejection, OS blocked by default,
  workspace_root read+write capture, read-only / overlay mount
  semantics, resource_limits.max_duration_secs abort, approval
  gating end-to-end, full Agent run with a scripted chat client.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: fix: monty FileMount test compares against the normalized POSIX path

The shorthand string mount goes through _normalize_mount_path, which
rewrites Windows drive letters like 'C:\\Users\\...' into
'/C:/Users/...' (POSIX-style). The Windows CI runners surfaced this
because tmp_path resolves to a backslashed Windows path; the test was
comparing against the raw str(host_a) instead of the normalized form.

Compare against _normalize_mount_path(str(host_a)) so the assertion is
platform-independent.
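
For illustration, the normalization this fix relies on can be reproduced with the standard library alone. The helper below is a standalone approximation of `_normalize_mount_path`; the name `normalize_mount_path` and the sample paths are assumptions made for the example, not part of the package.

```python
from pathlib import PurePosixPath


def normalize_mount_path(mount_path: str) -> str:
    """Approximate the package's virtual-path normalization (illustrative copy)."""
    # Backslashes become forward slashes before the path is split.
    raw = mount_path.strip().replace("\\", "/")
    if not raw:
        raise ValueError("mount_path must not be empty.")
    parts = [p for p in PurePosixPath(raw).parts if p not in {"", "/", "."}]
    if ".." in parts:
        raise ValueError("mount_path must not contain '..' segments.")
    if not parts:
        raise ValueError("mount_path must point to a concrete absolute path.")
    return "/" + "/".join(parts)


# Hypothetical inputs: a Windows-style tmp_path and a POSIX one.
windows_style = normalize_mount_path("C:\\Users\\runner\\tmp")
posix_style = normalize_mount_path("/tmp/pytest/host_a/")
```

Under these assumptions the Windows-style input keeps its drive letter as the first path segment, which is why a test comparing against the raw host string fails only on Windows.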

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: fix: address PR #5915 review feedback

- _execute_code_tool docstring: clarify that the Monty backend supports
  scoped filesystem access via workspace_root / file_mounts (blocked by
  default).
- _to_monty_mount: import pydantic_monty lazily through load_monty so
  missing-dependency errors surface as the same actionable RuntimeError
  the rest of the package raises (not a bare ImportError at module load).
  Renamed _load_monty -> load_monty for the same reason.
- _python_type_repr: emit None for type(None) instead of Any, and
  normalize both typing.Union[...] and PEP-604 X | Y to PEP-604 syntax
  so Optional[X] / Union[..., None] / -> None signatures round-trip
  correctly through ty validation. Added a regression test.
- _PrintCollector: track a running character count instead of
  recomputing sum(len(c) for c in self.chunks) per callback. Eliminates
  the O(n^2) cost on print-heavy code.
- Instructions: mention that the value of the final expression is also
  returned alongside captured stdout (matches actual behavior).
- 11_monty_codeact Dockerfile: pin ghcr.io/astral-sh/uv to 0.11.6
  instead of :latest for reproducible builds.
- 11_monty_codeact README: replace the bare "see parent README" pointer
  with sample-specific steps (./vendor-wheel.sh + uv sync + uv run),
  since the sample uses pyproject.toml + a vendored wheel rather than
  requirements.txt.
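
As a rough sketch of the `_PrintCollector` change above: keeping a running total makes each callback O(1) instead of re-summing every chunk. The class below is not the package's implementation; the `max_chars` cap and the `truncated` flag are assumptions added to show why a character count is useful at all.

```python
class PrintCollector:
    """Collect printed text while tracking the total length incrementally."""

    def __init__(self, max_chars: int = 10_000) -> None:  # max_chars is hypothetical
        self.chunks: list[str] = []
        self.total_chars = 0  # running count, updated once per callback
        self.max_chars = max_chars
        self.truncated = False

    def __call__(self, text: str) -> None:
        remaining = self.max_chars - self.total_chars
        if remaining <= 0:
            self.truncated = True
            return
        if len(text) > remaining:
            # Keep only what still fits under the cap.
            text = text[:remaining]
            self.truncated = True
        self.chunks.append(text)
        self.total_chars += len(text)


collector = PrintCollector(max_chars=10)
collector("hello ")
collector("world!")
```

Recomputing `sum(len(c) for c in chunks)` on every call would walk the whole list each time, which is where the quadratic cost on print-heavy code came from.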

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: sample: 11_monty_codeact installs agent-framework-monty from PyPI

Drop the vendored-wheel scaffolding now that agent-framework-monty is on
PyPI as an alpha (1.0.0a*) release:

- pyproject.toml: remove [tool.uv.sources] override; keep [tool.uv]
  prerelease = "allow" so uv pulls the alpha automatically.
- Dockerfile: drop the COPY wheels/ step.
- README: drop the ./vendor-wheel.sh setup step and the
  not-yet-on-PyPI warning.
- Delete vendor-wheel.sh and the gitignored wheels/ directory.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: fix(monty): harden post-execution file capture against symlink escape

Same class of issue as the MSRC-reported Hyperlight finding: the
post-execution capture walked workspace_root with Path.rglob() +
is_file() + read_bytes() - all of which follow symlinks. An attacker
who controls the workspace (cloned repo, extracted archive, shared
workspace) could pre-place `workspace/leak.txt -> /etc/passwd` or
`workspace/outside_dir -> /etc/` and have host files surface as
captured Content items.

Monty's mount layer already rejects symlink reads from inside the
sandbox across all three modes (verified empirically), so the runtime
path was safe. This commit closes the post-execution scan path.

Changes:
- New `_iter_real_files(root)` walker that uses iterdir() +
  is_symlink() to skip symlinks at every directory level and yields
  only real files. Replaces the previous `host_root.rglob("*")` calls
  in both `_snapshot_writable_mounts` and `_capture_written_files`.
- Use `Path.lstat()` instead of `Path.stat()` so size/mtime can never
  be taken from a symlink target.
- Three new integration tests reproducing the MSRC attack shape
  against the workspace_root flow: symlink-to-file outside workspace,
  symlink-to-directory outside workspace, and a guard ensuring
  legitimate sandbox writes are still captured when symlinks are
  present.
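
A minimal sketch of the walker described above, assuming only what the change list states (`iterdir()` plus `is_symlink()` at every level). The function name `iter_real_files` without the underscore and the sorted traversal order are choices made for this example, not necessarily those of the package.

```python
from collections.abc import Iterator
from pathlib import Path


def iter_real_files(root: Path) -> Iterator[Path]:
    """Yield regular files under ``root`` without following any symlink."""
    for entry in sorted(root.iterdir()):
        if entry.is_symlink():
            # Covers both symlinked files and symlinked directories.
            continue
        if entry.is_dir():
            yield from iter_real_files(entry)
        elif entry.is_file():
            yield entry


def file_sizes(root: Path) -> dict[str, int]:
    """Map relative POSIX paths to sizes, using lstat() so targets are never consulted."""
    return {p.relative_to(root).as_posix(): p.lstat().st_size for p in iter_real_files(root)}
```

Because the symlink check runs before `is_dir()` and `is_file()`, a link such as `workspace/outside_dir -> /etc/` is dropped before the walker can descend into it.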

Per user request, hyperlight is untouched in this commit (separate fix).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: fix(monty): skip symlink regression tests when unsupported

Apply the same Windows-CI safety guard as the hyperlight fix in PR #5919:
the three symlink integration tests create symlinks via Path.symlink_to(),
which fails with OSError / NotImplementedError on unprivileged Windows
runners. Add a local _symlinks_supported helper (mirroring the one in
packages/core/tests/core/test_skills.py) and pytest.skip when symlinks
aren't available, so the tests no longer fail for environment reasons.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: fix(monty): address PR #5915 follow-up review feedback

- _invoke_tool: drop the inspect.iscoroutinefunction(...) branch and
  always `await self.tool_map[name](**kwargs)`. Every entry in
  tool_map is `partial(FunctionTool.invoke, skip_parsing=True)` and
  FunctionTool.invoke is `async def`, so the branching was dead code -
  and on Python versions affected by cpython#98590,
  iscoroutinefunction(partial(bound_async_method, ...)) returns False,
  causing the bridge to take the asyncio.to_thread path, return an
  unawaited coroutine, and surface it as a JSON-serialization failure
  for every tool call. Added a regression test
  test_invoke_tool_awaits_partial_wrapped_async_method.

- generate_type_stubs: skip tools whose name is not a valid Python
  identifier or is a Python keyword. FunctionTool.name has no upstream
  validation, so a name like "weird-name" produced a syntax error in
  the stubs and a name like "broken\n    pass\nasync def injected"
  would inject arbitrary stub source. Non-identifier names stay
  reachable via `call_tool("weird-name", ...)` at runtime; they just
  don't get type-checked stubs. Added regression test
  test_generate_type_stubs_skips_non_identifier_tool_names.
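
The name filter described for `generate_type_stubs` can be sketched with two standard-library checks. The helper names `is_stub_safe_name` and `partition_tool_names` are hypothetical; only the identifier-and-keyword rule comes from the description above.

```python
import keyword


def is_stub_safe_name(name: str) -> bool:
    """Return True when ``name`` can be emitted verbatim as a function name."""
    return name.isidentifier() and not keyword.iskeyword(name)


def partition_tool_names(names: list[str]) -> tuple[list[str], list[str]]:
    """Split names into (gets a typed stub, reachable only via call_tool)."""
    stubbed = [n for n in names if is_stub_safe_name(n)]
    fallback_only = [n for n in names if not is_stub_safe_name(n)]
    return stubbed, fallback_only


stubbed, fallback_only = partition_tool_names(
    ["compute", "weird-name", "class", "broken\n    pass\nasync def injected"]
)
```

A name containing a newline fails `str.isidentifier()`, so it never reaches the stub source and cannot inject extra definitions.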

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Eduard van Valkenburg
2026-05-20 02:35:23 +02:00
committed by GitHub
parent 4b0522d62d
commit 4609535e22
29 changed files with 3738 additions and 10 deletions
@@ -0,0 +1,78 @@
# Monty Package (agent-framework-monty)
Monty-backed CodeAct integrations for the Microsoft Agent Framework.
> [!NOTE]
> **Alpha package.** Not part of `agent-framework[all]` yet. Install explicitly
> with `pip install agent-framework-monty --pre`.
## Core Classes
- **`MontyCodeActProvider`** — `ContextProvider` that injects a run-scoped
  `execute_code` tool plus dynamic CodeAct instructions. Mirrors the
  `HyperlightCodeActProvider` API for the parts that apply to an interpreter
  with no hypervisor or WASM backend.
- **`MontyExecuteCodeTool`** — `FunctionTool` that wraps the Monty interpreter.
Use directly for mixed-tool agents or manual static wiring. Mirrors
`HyperlightExecuteCodeTool`.
## Public API
```python
from agent_framework_monty import (
    FileMount,
    FileMountInput,
    MontyCodeActProvider,
    MontyExecuteCodeTool,
    MountMode,
)
```
`MontyCodeActProvider` and `MontyExecuteCodeTool` both accept:

- `tools` — host tool callables / `FunctionTool`s
- `approval_mode` — `"never_require"` (default) or `"always_require"`
- `workspace_root` — host directory auto-mounted at `/input`
  (mirrors `HyperlightCodeActProvider.workspace_root`)
- `file_mounts` — sequence of `FileMountInput` (str shorthand,
  `(host_path, mount_path)` tuple, or `FileMount`)
- `resource_limits` — Monty `ResourceLimits` TypedDict

Tool-management methods on both classes: `add_tools`, `get_tools`,
`remove_tool`, `clear_tools`. Mount-management methods: `add_file_mounts`,
`get_file_mounts`, `remove_file_mount`, `clear_file_mounts`.

`MontyExecuteCodeTool` additionally exposes:

- `build_instructions(*, tools_visible_to_model: bool) -> str`
- `create_run_tool() -> MontyExecuteCodeTool`
- `build_serializable_state() -> dict[str, Any]`
- `workspace_root`, `resource_limits` properties
## Architecture
- **`_types.py`** — `FileMount`, `FileMountInput`, `MountMode` (public).
- **`_provider.py`** — `MontyCodeActProvider` (thin wrapper around the tool).
- **`_execute_code_tool.py`** — `MontyExecuteCodeTool` plus tool / mount
normalization, approval helpers, dynamic `description`/`instructions`
builders, and the post-execution file-capture flow that surfaces files
written to `read-write` mounts as `Content.from_data` items.
- **`_monty_bridge.py`** — `InlineCodeBridge` and `generate_type_stubs`,
adapted from the reference Monty CodeAct repo. Pauses on `FunctionSnapshot`
to dispatch host calls, then resumes; supports direct typed tool calls,
the `call_tool` fallback, `asyncio.gather` fan-out, and forwards
``mount`` / ``limits`` to `Monty(...).start(...)`.
- **`_instructions.py`** — dynamic instruction / tool-description builders
(include filesystem capability summaries when mounts are configured).
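
The pause/dispatch/resume cycle attributed to `InlineCodeBridge` can be pictured with a plain Python generator. This is an analogy only: the `(tool_name, kwargs)` tuple stands in for a paused snapshot, and none of the names below (`sandboxed_program`, `run_with_bridge`) are pydantic-monty or package APIs.

```python
import asyncio
from collections.abc import Awaitable, Callable, Generator
from typing import Any

# Stand-in for a paused snapshot: which host tool to call and with what kwargs.
HostCall = tuple[str, dict[str, Any]]


def sandboxed_program() -> Generator[HostCall, Any, Any]:
    """Stand-in for interpreted code: every yield pauses for one host call."""
    total = yield ("compute", {"operation": "add", "a": 1, "b": 2})
    doubled = yield ("compute", {"operation": "multiply", "a": total, "b": 2})
    return doubled


async def compute(operation: str, a: float, b: float) -> float:
    """Hypothetical async host tool."""
    return a + b if operation == "add" else a * b


async def run_with_bridge(
    program: Generator[HostCall, Any, Any],
    tool_map: dict[str, Callable[..., Awaitable[Any]]],
) -> Any:
    """Drive the program: pause on each request, await the tool, resume."""
    try:
        request = next(program)  # run until the first pause
        while True:
            name, kwargs = request
            result = await tool_map[name](**kwargs)  # dispatch on the host
            request = program.send(result)  # resume with the tool result
    except StopIteration as done:
        return done.value  # the program's final value


result = asyncio.run(run_with_bridge(sandboxed_program(), {"compute": compute}))
```

Always awaiting the mapped callable, as this loop does, matches the follow-up fix that removed the `iscoroutinefunction` branch from `_invoke_tool`.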
## Not implemented (yet)
| Capability | Monty primitive | Status |
|------------|-----------------|--------|
| Custom virtual filesystem | `OSAccess` subclass passed to `Monty(...).start(os=...)` | Not exposed. Strictly more general than file mounts; useful when you want a fully synthetic FS. |
| Outbound URL allow-list | No Monty primitive — expose `fetch_url` as a host tool with the allow-list check in your tool function. | Not exposed in this package; users add it as a regular tool. |
## Out of scope (for now)
- **Durable execution** — the reference Monty CodeAct repo also offers a
Durable-Functions-backed mode (`DurableCodeBridge`, `register_durable_codeact`,
`wait_for_external_event`, per-tool approval via external events). That is
intentionally not in this package yet.
@@ -0,0 +1,21 @@
MIT License
Copyright (c) Microsoft Corporation.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,179 @@
# agent-framework-monty
Monty-backed CodeAct integrations for Microsoft Agent Framework.
> [!WARNING]
> This package is in **alpha**. APIs may change without notice. It is not part of
> `agent-framework[all]` yet; install it explicitly with `--pre`.
## Installation
```bash
pip install agent-framework-monty --pre
```
The package depends on [`pydantic-monty`](https://github.com/pydantic/monty), a
Rust-based Python interpreter, so it runs on Linux, macOS, and Windows wherever
Monty wheels are published — no hypervisor or WASM backend required.
## Quick start
### Context provider (recommended)
Use `MontyCodeActProvider` to automatically inject the `execute_code` tool and
CodeAct instructions into every agent run. Tools registered on the provider are
available inside the Monty interpreter as **typed async functions** (e.g.
`await compute(operation="add", a=1, b=2)`), and as a fallback through
`call_tool(...)`.
```python
from agent_framework import Agent, tool
from agent_framework_monty import MontyCodeActProvider


@tool
def compute(operation: str, a: float, b: float) -> float:
    """Perform a math operation."""
    # Evaluate lazily so that, for example, "add" with b == 0 never computes a / b.
    ops = {
        "add": lambda: a + b,
        "subtract": lambda: a - b,
        "multiply": lambda: a * b,
        "divide": lambda: a / b,
    }
    return ops[operation]()


codeact = MontyCodeActProvider(
    tools=[compute],
    approval_mode="never_require",
)

# `client` is a chat client configured elsewhere.
agent = Agent(
    client=client,
    name="CodeActAgent",
    instructions="You are a helpful assistant.",
    context_providers=[codeact],
)

result = await agent.run("Multiply 6 by 7 using execute_code.")
```
### Standalone tool
Use `MontyExecuteCodeTool` directly when you want full control over how the
tool is added to the agent (e.g. when mixing sandbox tools with direct-only
tools on the same agent).
```python
from agent_framework import Agent, tool
from agent_framework_monty import MontyExecuteCodeTool


@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email (direct-only, not available inside the sandbox)."""
    return f"Email sent to {to}"


execute_code = MontyExecuteCodeTool(
    tools=[compute],
    approval_mode="never_require",
)

agent = Agent(
    client=client,
    name="MixedToolsAgent",
    instructions="You are a helpful assistant.",
    tools=[send_email, execute_code],
)
```
### Manual static wiring
For fixed configurations where provider lifecycle overhead is unnecessary,
build the CodeAct instructions once and pass them to the agent at construction
time:
```python
execute_code = MontyExecuteCodeTool(
    tools=[compute],
    approval_mode="never_require",
)

codeact_instructions = execute_code.build_instructions(tools_visible_to_model=False)

agent = Agent(
    client=client,
    name="StaticWiringAgent",
    instructions=f"You are a helpful assistant.\n\n{codeact_instructions}",
    tools=[execute_code],
)
```
### File mounts and resource limits
Mount host directories into the sandbox and cap execution resources:
```python
from agent_framework_monty import FileMount, MontyCodeActProvider

codeact = MontyCodeActProvider(
    tools=[compute],
    workspace_root="/host/workspace",  # auto-mounted at /input (read-write)
    file_mounts=[
        "/host/data",  # shorthand: same path on both sides
        ("/host/models", "/sandbox/models"),  # explicit (host, mount_path)
        FileMount(  # full control
            host_path="/host/cache",
            mount_path="/sandbox/cache",
            mode="overlay",  # "read-only" | "read-write" | "overlay"
            write_bytes_limit=10 * 1024 * 1024,
        ),
    ],
    resource_limits={  # Monty ResourceLimits TypedDict
        "max_duration_secs": 5.0,
        "max_memory": 64 * 1024 * 1024,
    },
)
```
- **`workspace_root`** mirrors the Hyperlight default: the directory is mounted
at `/input` in `read-write` mode.
- **`file_mounts`** accepts a string shorthand, a `(host_path, mount_path)`
tuple, or a `FileMount` named tuple (with optional `mode` and
`write_bytes_limit`).
- Files written by the sandbox to any **`read-write`** mount are scanned
after each `execute_code` call and returned as `Content.from_data(...)`
attachments (with a `path` annotation in `additional_properties`),
mirroring Hyperlight's `/output` flow.
- `overlay` mounts buffer writes in memory (nothing leaks to the host and
nothing is captured). `read-only` mounts reject writes.
- **`resource_limits`** is forwarded straight to Monty's
[`ResourceLimits`](https://github.com/pydantic/monty) TypedDict
(`max_allocations`, `max_duration_secs`, `max_memory`, `gc_interval`,
`max_recursion_depth`).
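
One way to picture the post-execution capture is a before/after snapshot of each `read-write` mount. The sketch below is an assumed approximation, not the package's code: the helper names `snapshot_files` and `changed_files` and the `(size, mtime_ns)` comparison are illustrative choices.

```python
import os
from pathlib import Path

# Relative POSIX path -> (size in bytes, modification time in nanoseconds).
FileState = dict[str, tuple[int, int]]


def snapshot_files(root: Path) -> FileState:
    """Record size and mtime for every regular, non-symlink file under ``root``."""
    state: FileState = {}
    # followlinks=False keeps os.walk from descending into symlinked directories.
    for dirpath, _dirnames, filenames in os.walk(root, followlinks=False):
        for filename in filenames:
            path = Path(dirpath) / filename
            if path.is_symlink():
                continue  # never report a link that may point outside the mount
            info = path.lstat()
            state[path.relative_to(root).as_posix()] = (info.st_size, info.st_mtime_ns)
    return state


def changed_files(before: FileState, after: FileState) -> list[str]:
    """Return paths that are new or whose size/mtime differ from the first snapshot."""
    return sorted(name for name, meta in after.items() if before.get(name) != meta)
```

A caller would take one snapshot before running the code and one after, then read only the paths returned by `changed_files` and wrap them as attachments.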
## DSL inside `execute_code`
The model generates Python code that runs inside Monty's Rust-based interpreter.
Available primitives:
| Primitive | Behavior |
|-----------|----------|
| `await tool_name(**kwargs)` | Direct typed call to a registered host tool. Argument types are checked before execution. |
| `await call_tool("name", **kwargs)` | Generic fallback that dispatches by tool name. Not type-checked. |
| `asyncio.gather(...)` | Fans out concurrent tool calls. |
| `print(...)` | Captured and surfaced as text in the tool result. |
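
As an illustration, model-generated code using these primitives might resemble the body of `generated_code` below. To keep the sketch runnable outside Monty, `compute` and `call_tool` are local stand-ins written for this example; inside `execute_code` they would be provided by the runtime, and whether Monty accepts every construct shown here is not verified.

```python
import asyncio
from typing import Any


async def compute(operation: str, a: float, b: float) -> float:
    """Local stand-in for a registered host tool."""
    ops = {"add": lambda: a + b, "multiply": lambda: a * b}
    return ops[operation]()


async def call_tool(name: str, **kwargs: Any) -> Any:
    """Local stand-in for the generic, name-based fallback."""
    return await {"compute": compute}[name](**kwargs)


async def generated_code() -> float:
    # Direct typed call to a registered tool.
    total = await compute(operation="add", a=1, b=2)
    # Concurrent fan-out: both calls are in flight at once.
    squares = await asyncio.gather(
        compute(operation="multiply", a=2, b=2),
        compute(operation="multiply", a=3, b=3),
    )
    # Generic fallback, dispatched by tool name.
    product = await call_tool("compute", operation="multiply", a=total, b=sum(squares))
    print(f"product={product}")  # captured as text in the tool result
    return product


result = asyncio.run(generated_code())
```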
## Notes
- `MontyCodeActProvider` and `MontyExecuteCodeTool` mirror the API surface of
the `agent-framework-hyperlight` counterparts where the underlying runtime
supports it.
- Monty is a Rust-based interpreter for a **subset** of Python. Most
  control flow, common stdlib modules (`sys`, `os`, `typing`, `asyncio`, `re`,
  `datetime`, `json`), and async functions are supported, but less common
  language features may not be available. OS-level access (filesystem,
  network, subprocess) is rejected with `PermissionError` **by default**;
  mount host directories with `workspace_root` / `file_mounts` to grant
  scoped filesystem access.
- Code is type-checked against tool signatures via
[ty](https://docs.astral.sh/ty/) before execution, so wrong argument types
surface as a clear error before any host tool runs.
- The alpha package is **not** part of `agent-framework[all]` yet, so it must
be installed explicitly. Once promoted to beta it will be reachable via the
lazy-loading namespace `agent_framework.monty`.
@@ -0,0 +1,23 @@
# Copyright (c) Microsoft. All rights reserved.

from __future__ import annotations

import importlib.metadata

from ._execute_code_tool import MontyExecuteCodeTool
from ._provider import MontyCodeActProvider
from ._types import FileMount, FileMountInput, MountMode

try:
    __version__ = importlib.metadata.version(__name__)
except importlib.metadata.PackageNotFoundError:
    __version__ = "0.0.0"

__all__ = [
    "FileMount",
    "FileMountInput",
    "MontyCodeActProvider",
    "MontyExecuteCodeTool",
    "MountMode",
    "__version__",
]
@@ -0,0 +1,558 @@
# Copyright (c) Microsoft. All rights reserved.

"""``MontyExecuteCodeTool`` - a ``FunctionTool`` that runs Python in Monty.

Mirrors the public API of ``HyperlightExecuteCodeTool`` for the subset that
applies to Monty's single Rust-based interpreter (no backends to choose
from). By default the Monty sandbox rejects OS / filesystem / network calls
with ``PermissionError``; pass ``workspace_root`` or ``file_mounts`` to expose
scoped host directories, and the tool will capture any files written under
``read-write`` mounts as ``Content`` items in the response.
"""
from __future__ import annotations

import json
import mimetypes
from collections.abc import Callable, Iterator, Sequence
from copy import copy
from functools import partial
from pathlib import Path, PurePosixPath
from typing import Any, cast

from agent_framework import Content, FunctionTool
from agent_framework._tools import ApprovalMode, normalize_tools

from ._instructions import build_codeact_instructions, build_execute_code_description
from ._monty_bridge import InlineCodeBridge, generate_type_stubs
from ._types import FileMount, FileMountInput

EXECUTE_CODE_TOOL_NAME = "execute_code"
EXECUTE_CODE_TOOL_DESCRIPTION = "Execute Python in a Monty interpreter."

#: Virtual path that the optional ``workspace_root`` directory is mounted at,
#: matching the Hyperlight default. Use ``file_mounts`` for any other path.
WORKSPACE_MOUNT_PATH = "/input"

#: Maximum bytes per captured output file. Files larger than this are skipped
#: and a ``Content.from_text`` warning is appended in their place.
MAX_CAPTURED_FILE_BYTES = 5 * 1024 * 1024  # 5 MiB

EXECUTE_CODE_INPUT_SCHEMA: dict[str, Any] = {
    "type": "object",
    "title": "_ExecuteCodeInput",
    "properties": {
        "code": {
            "type": "string",
            "title": "Code",
            "description": "Python code to execute in a Monty interpreter.",
        },
    },
    "required": ["code"],
}
def _collect_tools(*tool_groups: Any) -> list[FunctionTool]:
    """Merge tool groups, dropping any ``execute_code`` entries and deduping by name."""
    tools_by_name: dict[str, FunctionTool] = {}
    for tool_group in tool_groups:
        normalized_group = normalize_tools(tool_group)
        for tool_obj in normalized_group:
            if not isinstance(tool_obj, FunctionTool):
                continue
            if tool_obj.name == EXECUTE_CODE_TOOL_NAME:
                continue
            tools_by_name.pop(tool_obj.name, None)
            tools_by_name[tool_obj.name] = tool_obj
    return list(tools_by_name.values())


def _resolve_execute_code_approval_mode(
    *,
    base_approval_mode: ApprovalMode,
    tools: Sequence[FunctionTool],
) -> ApprovalMode:
    if base_approval_mode == "always_require":
        return "always_require"
    if any(tool_obj.approval_mode == "always_require" for tool_obj in tools):
        return "always_require"
    return "never_require"


def _normalize_mount_path(mount_path: str) -> str:
    """Normalize a virtual mount path to a clean POSIX absolute path."""
    raw = mount_path.strip().replace("\\", "/")
    if not raw:
        raise ValueError("mount_path must not be empty.")
    pure = PurePosixPath(raw)
    parts = [part for part in pure.parts if part not in {"", "/", "."}]
    if any(part == ".." for part in parts):
        raise ValueError("mount_path must not contain '..' segments.")
    if not parts:
        raise ValueError("mount_path must point to a concrete absolute path.")
    return "/" + "/".join(parts)


def _resolve_existing_directory(value: str | Path) -> Path:
    resolved = Path(value).expanduser().resolve(strict=True)
    if not resolved.is_dir():
        raise ValueError(f"Path {value!r} must point to an existing directory.")
    return resolved


def _is_file_mount_pair(value: Any) -> bool:
    if not isinstance(value, tuple) or isinstance(value, FileMount):
        return False
    items = cast("tuple[object, ...]", value)
    if len(items) != 2:
        return False
    host_path, mount_path = items
    return isinstance(host_path, (str, Path)) and isinstance(mount_path, str)


def _normalize_file_mount(file_mount: FileMountInput) -> FileMount:
    if isinstance(file_mount, FileMount):
        host_path = file_mount.host_path
        mount_path = file_mount.mount_path
        mode = file_mount.mode
        write_limit = file_mount.write_bytes_limit
    elif isinstance(file_mount, str):
        host_path = file_mount
        mount_path = file_mount
        mode = "overlay"
        write_limit = None
    else:
        host_path, mount_path = file_mount
        mode = "overlay"
        write_limit = None
    return FileMount(
        host_path=_resolve_existing_directory(host_path),
        mount_path=_normalize_mount_path(mount_path),
        mode=mode,
        write_bytes_limit=write_limit,
    )


def _to_monty_mount(file_mount: FileMount) -> Any:
    """Convert a public :class:`FileMount` to Monty's ``MountDir``.

    Imports lazily through the bridge's loader so missing-dependency errors
    surface as the same actionable ``RuntimeError`` the rest of the package
    raises, rather than a bare ``ImportError`` from a top-level import.
    """
    from ._monty_bridge import load_monty  # avoid top-level pydantic_monty import

    monty_module = load_monty()
    return monty_module.MountDir(
        virtual_path=file_mount.mount_path,
        host_path=str(file_mount.host_path),
        mode=file_mount.mode,
        write_bytes_limit=file_mount.write_bytes_limit,
    )


def _make_tool_callback(tool_obj: FunctionTool) -> Callable[..., Any]:
    """Return an async callable that invokes ``tool_obj`` with the bridge's kwargs.

    Returns the raw native value (no ``Content`` wrapping) so the Monty interpreter
    receives real Python objects. ``FunctionTool.invoke`` accepts direct keyword
    arguments and handles both sync and async underlying functions internally.
    """
    return partial(copy(tool_obj).invoke, skip_parsing=True)
class MontyExecuteCodeTool(FunctionTool):
    """Execute Python code inside a Monty interpreter.

    Tools registered on this object are available inside the interpreter as
    typed async functions (e.g. ``await tool_name(...)``). Argument types are
    validated by the [ty](https://docs.astral.sh/ty/) type checker before any
    host tool runs.

    Optional filesystem access is exposed via:

    - ``workspace_root`` — auto-mounts a host directory at ``/input`` (matching
      Hyperlight's default).
    - ``file_mounts`` — extra :class:`FileMount` entries for fine-grained
      control (mount path, read-only / read-write / overlay mode, write
      byte caps).

    Files written by sandboxed code to any **read-write** mount are scanned
    after execution and returned as ``Content.from_data`` items, mirroring
    Hyperlight's ``/output`` flow.

    ``resource_limits`` is forwarded to Monty's ``ResourceLimits`` to cap
    execution time, memory, allocation count, recursion depth, and the GC
    interval.

    All mutators (``add_tools``, ``add_file_mounts`` etc.) must be called from
    the same task/thread that owns the tool. Monty itself runs on the event
    loop, so no internal locking is needed.
    """

    def __init__(
        self,
        *,
        tools: FunctionTool | Callable[..., Any] | Sequence[FunctionTool | Callable[..., Any]] | None = None,
        approval_mode: ApprovalMode | None = None,
        workspace_root: str | Path | None = None,
        file_mounts: FileMountInput | Sequence[FileMountInput] | None = None,
        resource_limits: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(
            name=EXECUTE_CODE_TOOL_NAME,
            description=EXECUTE_CODE_TOOL_DESCRIPTION,
            approval_mode="never_require",
            func=self._run_code,
            input_model=EXECUTE_CODE_INPUT_SCHEMA,
        )
        self._default_approval_mode: ApprovalMode = approval_mode or "never_require"
        self._managed_tools: list[FunctionTool] = []
        self._workspace_root: Path | None = (
            _resolve_existing_directory(workspace_root) if workspace_root is not None else None
        )
        self._file_mounts: dict[str, FileMount] = {}
        self._resource_limits: dict[str, Any] | None = dict(resource_limits) if resource_limits else None
        if tools is not None:
            self.add_tools(tools)
        if file_mounts is not None:
            self.add_file_mounts(file_mounts)
        self._refresh_approval_mode()
    @property
    def description(self) -> str:
        # During FunctionTool.__init__, ``_managed_tools`` is not yet set.
        if not hasattr(self, "_managed_tools"):
            return str(self.__dict__.get("description", EXECUTE_CODE_TOOL_DESCRIPTION))
        return build_execute_code_description(
            tools=self._managed_tools,
            mounts=self._effective_mounts(),
        )

    @description.setter
    def description(self, value: str) -> None:
        self.__dict__["description"] = value

    def add_tools(
        self,
        tools: FunctionTool | Callable[..., Any] | Sequence[FunctionTool | Callable[..., Any]],
    ) -> None:
        """Add Monty-side tools to this execute_code surface."""
        self._managed_tools = _collect_tools(self._managed_tools, tools)
        self._refresh_approval_mode()

    def get_tools(self) -> list[FunctionTool]:
        """Return the currently managed Monty tools."""
        return list(self._managed_tools)

    def remove_tool(self, name: str) -> None:
        """Remove one managed Monty tool by name."""
        remaining_tools = [tool_obj for tool_obj in self._managed_tools if tool_obj.name != name]
        if len(remaining_tools) == len(self._managed_tools):
            raise KeyError(f"No managed tool named {name!r} is registered.")
        self._managed_tools = remaining_tools
        self._refresh_approval_mode()

    def clear_tools(self) -> None:
        """Remove all managed Monty tools."""
        self._managed_tools = []
        self._refresh_approval_mode()

    def add_file_mounts(self, file_mounts: FileMountInput | Sequence[FileMountInput]) -> None:
        """Add one or more file mounts.

        A single string mounts the same path on both sides. Use a
        ``(host_path, mount_path)`` tuple or :class:`FileMount` when the paths
        differ or when you need to set the mount mode / write limit.
        """
        if isinstance(file_mounts, (str, FileMount)) or _is_file_mount_pair(file_mounts):
            normalized = [_normalize_file_mount(cast("FileMountInput", file_mounts))]
        else:
            normalized = [_normalize_file_mount(item) for item in cast("Sequence[FileMountInput]", file_mounts)]
        for mount in normalized:
            self._file_mounts[mount.mount_path] = mount

    def get_file_mounts(self) -> list[FileMount]:
        """Return the configured file mounts (excluding ``workspace_root``)."""
        return list(self._file_mounts.values())

    def remove_file_mount(self, mount_path: str) -> None:
        """Remove one file mount by its sandbox path."""
        normalized = _normalize_mount_path(mount_path)
        if normalized not in self._file_mounts:
            raise KeyError(f"No file mount exists for {mount_path!r}.")
        del self._file_mounts[normalized]

    def clear_file_mounts(self) -> None:
        """Remove all configured file mounts."""
        self._file_mounts.clear()

    @property
    def workspace_root(self) -> Path | None:
        """Return the configured workspace root, if any."""
        return self._workspace_root

    @property
    def resource_limits(self) -> dict[str, Any] | None:
        """Return the configured Monty :class:`pydantic_monty.ResourceLimits`, if any."""
        return dict(self._resource_limits) if self._resource_limits else None
    def build_instructions(self, *, tools_visible_to_model: bool) -> str:
        """Build the current CodeAct instructions for this execute_code surface."""
        return build_codeact_instructions(
            tools=list(self._managed_tools),
            tools_visible_to_model=tools_visible_to_model,
            mounts=self._effective_mounts(),
        )

    def create_run_tool(self) -> MontyExecuteCodeTool:
        """Create a run-scoped snapshot of this execute_code surface."""
        return MontyExecuteCodeTool(
            tools=self.get_tools(),
            approval_mode=self._default_approval_mode,
            workspace_root=self._workspace_root,
            file_mounts=list(self._file_mounts.values()) or None,
            resource_limits=self._resource_limits,
        )

    def build_serializable_state(self) -> dict[str, Any]:
        """Return a JSON-serializable snapshot of the effective run state."""
        approval_mode = _resolve_execute_code_approval_mode(
            base_approval_mode=self._default_approval_mode,
            tools=self._managed_tools,
        )
        mounts = self._effective_mounts()
        return {
            "runtime": "monty",
            "approval_mode": approval_mode,
            "tool_names": [tool_obj.name for tool_obj in self._managed_tools],
            "workspace_root": str(self._workspace_root) if self._workspace_root is not None else None,
            "file_mounts": [
                {
                    "host_path": str(mount.host_path),
                    "mount_path": mount.mount_path,
                    "mode": mount.mode,
                    "write_bytes_limit": mount.write_bytes_limit,
                }
                for mount in mounts
            ],
            "resource_limits": dict(self._resource_limits) if self._resource_limits else None,
        }

    def to_dict(self, *, exclude: set[str] | None = None, exclude_none: bool = True) -> dict[str, Any]:
        # Materialize the dynamic description so the dump captures the current tool list.
        self.__dict__["description"] = self.description
        return super().to_dict(exclude=exclude, exclude_none=exclude_none)

    def _refresh_approval_mode(self) -> None:
        self.approval_mode = _resolve_execute_code_approval_mode(
            base_approval_mode=self._default_approval_mode,
            tools=self._managed_tools,
        )

    def _build_tool_map(self, tools: Sequence[FunctionTool]) -> dict[str, Callable[..., Any]]:
        return {tool_obj.name: _make_tool_callback(tool_obj) for tool_obj in tools}

    def _build_type_stub_map(self, tools: Sequence[FunctionTool]) -> dict[str, Callable[..., Any]]:
        """Return a name -> underlying-Python-callable map for type stub generation.

        The raw Python function attached to the ``FunctionTool`` carries the
        author's actual ``Annotated`` parameter types, which are what we want
        ``ty`` to validate against. Tools without an attached function (e.g.
        ``declaration_only`` tools) are skipped.
        """
        stub_map: dict[str, Callable[..., Any]] = {}
        for tool_obj in tools:
            func = getattr(tool_obj, "func", None)
            if callable(func):
                stub_map[tool_obj.name] = func
        return stub_map

    def _effective_mounts(self) -> list[FileMount]:
        """Combine ``workspace_root`` (if set) with the explicit ``file_mounts``."""
        mounts: list[FileMount] = []
        if self._workspace_root is not None and WORKSPACE_MOUNT_PATH not in self._file_mounts:
mounts.append(
FileMount(
host_path=self._workspace_root,
mount_path=WORKSPACE_MOUNT_PATH,
mode="read-write",
write_bytes_limit=None,
)
)
mounts.extend(self._file_mounts.values())
return mounts
async def _run_code(self, *, code: str) -> list[Content]:
tools = list(self._managed_tools)
mounts = self._effective_mounts()
tool_map = self._build_tool_map(tools)
stub_map = self._build_type_stub_map(tools)
type_stubs = generate_type_stubs(stub_map) if stub_map else None
# Snapshot (size, mtime_ns) of host files in read-write mounts so we can
# later identify which files the sandbox created or modified.
pre_state = _snapshot_writable_mounts(mounts)
bridge = InlineCodeBridge(
tool_map,
type_stubs=type_stubs,
mounts=[_to_monty_mount(mount) for mount in mounts] or None,
resource_limits=self._resource_limits,
)
try:
result = await bridge.run(code)
except Exception as exc:
return [
Content.from_error(
message="Execution error",
error_details=f"{type(exc).__name__}: {exc}",
),
]
contents = _build_execution_contents(result=result)
contents.extend(_capture_written_files(mounts, pre_state))
return contents
def _build_execution_contents(*, result: dict[str, Any]) -> list[Content]:
stdout = str(result.get("stdout") or "").replace("\r\n", "\n")
output_value = result.get("output")
truncated = bool(result.get("truncated"))
outputs: list[Content] = []
if stdout:
text = stdout
if truncated:
text = f"{text}\n\n[stdout truncated]"
outputs.append(Content.from_text(text))
elif truncated:
outputs.append(Content.from_text("[stdout truncated]"))
if output_value is not None:
try:
serialized_output = json.dumps(output_value, ensure_ascii=False)
except (TypeError, ValueError):
serialized_output = repr(output_value)
outputs.append(Content.from_text(serialized_output))
if not outputs:
outputs.append(Content.from_text("Code executed successfully without output."))
return outputs
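The serialization fallback used for the final-expression value can be sketched in isolation. This is a minimal illustration, not the package's API: `serialize_output` is a hypothetical helper name, and it only mirrors the `json.dumps` then `repr` order shown in `_build_execution_contents`.

```python
import json
from typing import Any


def serialize_output(value: Any) -> str:
    """Serialize a value as JSON, falling back to repr() (hypothetical helper)."""
    try:
        # ensure_ascii=False keeps non-ASCII characters readable in the output.
        return json.dumps(value, ensure_ascii=False)
    except (TypeError, ValueError):
        # Sets, custom objects, and circular structures are not JSON-encodable.
        return repr(value)


json_text = serialize_output({"city": "Zürich"})
fallback_text = serialize_output({1})
print(json_text)
print(fallback_text)
```

The fallback keeps a run from failing only because the last expression happened to be a non-JSON value; the caller still sees a readable representation.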
def _iter_real_files(root: Path) -> Iterator[Path]:
"""Walk ``root`` recursively, yielding only real (non-symlink) files.
``Path.rglob`` still yields symlink entries (and, depending on the Python
version and flags, may descend into symlinked directories), and
``Path.is_file()`` / ``Path.read_bytes()`` both follow symlinks. Together
these would let an attacker who controls the workspace pre-place a symlink
to a host file or directory and have our post-execution capture surface it.
Skipping every symlink at both the directory and file level closes that escape.
"""
stack: list[Path] = [root]
while stack:
current = stack.pop()
try:
entries = list(current.iterdir())
except OSError:
continue
for entry in entries:
try:
if entry.is_symlink():
continue
if entry.is_dir():
stack.append(entry)
elif entry.is_file():
yield entry
except OSError:
continue
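The effect of the symlink-skipping walk can be checked with a throwaway directory. The sketch below is a simplified standalone copy of the idea (no `OSError` handling inside the walk), and the file names are made up for the demonstration.

```python
import os
import tempfile
from collections.abc import Iterator
from pathlib import Path


def iter_real_files(root: Path) -> Iterator[Path]:
    """Yield regular files under root, never following or yielding symlinks."""
    stack = [root]
    while stack:
        current = stack.pop()
        for entry in current.iterdir():
            if entry.is_symlink():
                continue  # Skip file and directory symlinks alike.
            if entry.is_dir():
                stack.append(entry)
            elif entry.is_file():
                yield entry


def demo() -> list[str]:
    with tempfile.TemporaryDirectory() as tmp, tempfile.TemporaryDirectory() as outside:
        root = Path(tmp)
        (root / "sub").mkdir()
        (root / "sub" / "report.txt").write_text("ok")
        secret = Path(outside) / "secret.txt"
        secret.write_text("host data")
        try:
            # Pre-placed links pointing outside the mount root.
            os.symlink(secret, root / "leak.txt")
            os.symlink(outside, root / "leak_dir", target_is_directory=True)
        except OSError:
            pass  # Symlink creation may be unavailable on some platforms.
        return sorted(p.relative_to(root).as_posix() for p in iter_real_files(root))


found = demo()
print(found)
```

Only the regular file inside the root is reported; neither link nor anything behind the linked directory shows up.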
def _snapshot_writable_mounts(mounts: Sequence[FileMount]) -> dict[str, dict[str, tuple[int, int]]]:
"""Capture (size, mtime_ns) for every real (non-symlink) host file under read-write mounts.
Returns ``{mount_path: {relative_posix_path: (size, mtime_ns)}}``. Used by
:func:`_capture_written_files` to detect new or modified files after the run.
Read-only and overlay mounts are skipped because their writes do not
propagate to the host. Symlinks (file or directory) are deliberately skipped
so an attacker cannot escape the mount by pre-placing a symlink to a host
path outside the workspace.
"""
snapshot: dict[str, dict[str, tuple[int, int]]] = {}
for mount in mounts:
if mount.mode != "read-write":
continue
host_root = Path(mount.host_path)
per_mount: dict[str, tuple[int, int]] = {}
for entry in _iter_real_files(host_root):
try:
stat = entry.lstat() # lstat: never follow symlinks (defensive)
except OSError:
continue
relative = entry.relative_to(host_root).as_posix()
per_mount[relative] = (int(stat.st_size), int(stat.st_mtime_ns))
snapshot[mount.mount_path] = per_mount
return snapshot
def _capture_written_files(
mounts: Sequence[FileMount],
pre_state: dict[str, dict[str, tuple[int, int]]],
) -> list[Content]:
"""Return :class:`Content` items for files the sandbox wrote during the run.
Mirrors Hyperlight's ``/output`` capture flow: any new or modified real
(non-symlink) file under a read-write mount is read back as binary and
surfaced as ``Content.from_data`` with a ``path`` annotation in
``additional_properties``. Symlinks are skipped at both directory and file
level so a malicious workspace cannot trick us into capturing host files
outside the configured mount root.
"""
captured: list[Content] = []
for mount in mounts:
if mount.mode != "read-write":
continue
host_root = Path(mount.host_path)
before = pre_state.get(mount.mount_path, {})
for entry in sorted(_iter_real_files(host_root)):
try:
stat = entry.lstat()
except OSError:
continue
relative = entry.relative_to(host_root).as_posix()
current = (int(stat.st_size), int(stat.st_mtime_ns))
if before.get(relative) == current:
continue # Unchanged.
sandbox_path = f"{mount.mount_path.rstrip('/')}/{relative}"
if stat.st_size > MAX_CAPTURED_FILE_BYTES:
captured.append(
Content.from_text(
f"[file {sandbox_path} omitted: {stat.st_size} bytes "
f"exceeds MAX_CAPTURED_FILE_BYTES={MAX_CAPTURED_FILE_BYTES}]"
)
)
continue
try:
# _iter_real_files already excluded symlinks at every level of
# the walk; reading the file here is safe.
data = entry.read_bytes()
except OSError:
continue
media_type = mimetypes.guess_type(entry.name)[0] or "application/octet-stream"
captured.append(
Content.from_data(
data=data,
media_type=media_type,
additional_properties={"path": sandbox_path},
)
)
return captured
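The before/after comparison on `(size, mtime_ns)` can be sketched on its own. This is an assumption-laden simplification: `snapshot` and `changed_files` are hypothetical names, and the walk uses `Path.rglob` for brevity rather than the symlink-aware walk in the source.

```python
import tempfile
from pathlib import Path


def snapshot(root: Path) -> dict[str, tuple[int, int]]:
    """Map relative POSIX paths to (size, mtime_ns) for regular files."""
    state: dict[str, tuple[int, int]] = {}
    for entry in root.rglob("*"):
        if entry.is_symlink() or not entry.is_file():
            continue
        stat = entry.lstat()
        state[entry.relative_to(root).as_posix()] = (stat.st_size, stat.st_mtime_ns)
    return state


def changed_files(root: Path, before: dict[str, tuple[int, int]]) -> list[str]:
    """Return files that are new or whose (size, mtime_ns) differs."""
    after = snapshot(root)
    return sorted(rel for rel, sig in after.items() if before.get(rel) != sig)


def demo() -> list[str]:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "keep.txt").write_text("unchanged")
        (root / "edit.txt").write_text("v1")
        before = snapshot(root)
        (root / "edit.txt").write_text("version 2")  # Size changes.
        (root / "new.txt").write_text("created")
        return changed_files(root, before)


changed = demo()
print(changed)
```

One limitation of this kind of check: a rewrite that keeps the same size and lands within the same timestamp tick would go unnoticed, and deleted files are not reported at all.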
@@ -0,0 +1,125 @@
# Copyright (c) Microsoft. All rights reserved.
"""Dynamic CodeAct instructions and execute_code tool descriptions for Monty."""
from __future__ import annotations
from collections.abc import Sequence
from agent_framework import FunctionTool
from ._types import FileMount
def _format_tool_summaries(tools: Sequence[FunctionTool]) -> str:
if not tools:
return "- No tools are currently registered."
lines: list[str] = []
for tool_obj in tools:
parameters = tool_obj.parameters().get("properties", {})
parameter_names = [name for name in parameters if isinstance(name, str)]
parameter_summary = ", ".join(parameter_names) if parameter_names else "none"
description = str(tool_obj.description or "").strip() or "No description provided."
lines.append(f"- `{tool_obj.name}`: {description} Parameters: {parameter_summary}.")
return "\n".join(lines)
def _format_filesystem_capabilities(mounts: Sequence[FileMount]) -> str:
if not mounts:
return (
"Filesystem access is unavailable. OS-level paths raise `PermissionError`. "
"If you need files, ask the agent operator to configure `workspace_root` or `file_mounts`."
)
lines = ["Filesystem access is enabled. Read and write paths via `pathlib.Path(...)` (or `os.path`)."]
lines.append("Configured mounts:")
for mount in mounts:
cap = ""
if mount.write_bytes_limit is not None:
cap = f", write cap {mount.write_bytes_limit} bytes"
lines.append(f"- `{mount.mount_path}` ({mount.mode}{cap})")
writable = [mount for mount in mounts if mount.mode == "read-write"]
if writable:
writable_paths = ", ".join(f"`{m.mount_path}`" for m in writable)
lines.append(
f"Files written to {writable_paths} are returned to the caller as attached files; "
"use these paths for any output artifacts."
)
return "\n".join(lines)
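The CodeAct instructions built in this module ask the model to call tools with `await`, keyword arguments, and `asyncio.gather` for fan-out. A rough picture of such model-written code, runnable under plain CPython, is sketched below. `get_weather` is a hypothetical stand-in for a registered tool, and the `asyncio.run(main())` wrapper is only needed outside the sandbox.

```python
import asyncio


# Hypothetical stand-in for a registered host tool (keyword-only on purpose).
async def get_weather(*, city: str) -> dict[str, object]:
    await asyncio.sleep(0)  # Pretend to do I/O.
    return {"city": city, "temp_c": 20 + len(city)}


async def main() -> list[dict[str, object]]:
    # Fan out two tool calls concurrently, always with keyword arguments.
    results = await asyncio.gather(
        get_weather(city="Oslo"),
        get_weather(city="Lisbon"),
    )
    for item in results:
        print(f"{item['city']}: {item['temp_c']}")  # Surfaced as stdout text.
    return list(results)


report = asyncio.run(main())
```

`asyncio.gather` preserves argument order in its result list, so the output lines up with the order of the calls.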
def build_codeact_instructions(
*,
tools: Sequence[FunctionTool],
tools_visible_to_model: bool,
mounts: Sequence[FileMount] = (),
) -> str:
"""Build dynamic CodeAct instructions for the effective Monty tool set."""
tool_summaries = _format_tool_summaries(tools)
filesystem_text = _format_filesystem_capabilities(mounts)
usage_note = (
"Some tools may also appear directly, but prefer `execute_code` whenever you need to combine "
"Python control flow with sandbox tool calls."
if tools_visible_to_model
else "Provider-owned sandbox tools are not exposed separately; use `execute_code` when you need them."
)
return f"""You have one primary tool: `execute_code`.
Inside `execute_code`, call registered tools directly as async functions:
`result = await tool_name(param=value)`. Always use `await` and keyword arguments.
Your code is type-checked against the tool signatures below before execution.
`await call_tool('name', **kwargs)` is also supported as a fallback but is not type-checked.
For fan-out, use `asyncio.gather`:
`results = await asyncio.gather(tool_a(...), tool_b(...))`.
Surface results to the caller via `print(...)` (captured and returned as text)
or by ending the code with an expression whose value is JSON-encodable - the
value of the final expression is returned alongside captured stdout.
Filesystem capabilities:
{filesystem_text}
Registered tools:
{tool_summaries}
Prefer a single `execute_code` call per request when possible, combining
multiple tool calls with Python control flow.
{usage_note}
"""
def build_execute_code_description(
*,
tools: Sequence[FunctionTool],
mounts: Sequence[FileMount] = (),
) -> str:
"""Build the dynamic ``execute_code`` tool description for standalone usage."""
tool_summaries = _format_tool_summaries(tools)
filesystem_text = _format_filesystem_capabilities(mounts)
return f"""Execute Python code in a Monty interpreter.
Inside the sandbox, call registered tools directly as typed async functions:
`result = await tool_name(param=value)`. Always use `await` and keyword arguments.
Code is type-checked against tool signatures before execution.
`await call_tool('name', **kwargs)` is also supported as a fallback.
For fan-out, use `asyncio.gather`:
`results = await asyncio.gather(tool_a(...), tool_b(...))`.
Filesystem capabilities:
{filesystem_text}
Registered tools:
{tool_summaries}
Surface results via `print(...)` (captured and returned as text) or by ending
with an expression whose value is JSON-encodable.
"""
@@ -0,0 +1,327 @@
# Copyright (c) Microsoft. All rights reserved.
"""Inline (non-durable) Monty execution bridge and type-stub generation.
Adapted from https://github.com/anthonychu/maf-codeact-monty-python.
"""
from __future__ import annotations
import asyncio
import inspect
import keyword
import types
import typing
from collections.abc import Callable, Sequence
from typing import Annotated, Any, cast, get_type_hints
MAX_PRINT_OUTPUT_CHARS = 8192
# Prelude injected into all Monty code so `asyncio.gather` works for fan-out.
_CODEACT_PRELUDE = """\
import asyncio
"""
def _ensure_json_value(value: Any) -> Any:
if value is None or isinstance(value, (str, bool, int)):
return value
if isinstance(value, float):
if value != value or value in (float("inf"), float("-inf")):
raise ValueError("Non-finite floating point values are not JSON-safe.")
return value
if isinstance(value, (list, tuple)):
items = cast("list[object] | tuple[object, ...]", value)
return [_ensure_json_value(item) for item in items]
if isinstance(value, dict):
as_dict = cast("dict[object, object]", value)
return {str(k): _ensure_json_value(v) for k, v in as_dict.items()}
raise ValueError(f"Value of type {type(value).__name__} is not JSON-safe.")
def _external_error(exc: Exception) -> dict[str, str]:
return {"exc_type": type(exc).__name__, "message": str(exc)}
def _parse_call_tool(args: tuple[Any, ...], kwargs: dict[str, Any]) -> tuple[str, dict[str, Any]]:
if not args:
raise ValueError("call_tool requires a tool name as the first argument.")
name = args[0]
if not isinstance(name, str) or not name:
raise ValueError("Tool name must be a non-empty string.")
if len(args) > 1:
raise ValueError(
"call_tool accepts only the tool name as a positional argument. Use keyword arguments for parameters."
)
return name, dict(kwargs)
def _build_code(code: str) -> str:
return f"{_CODEACT_PRELUDE}\n{code}"
def _python_type_repr(annotation: Any) -> str:
"""Convert a Python type annotation to its string representation for stubs."""
if annotation is inspect.Parameter.empty:
return "Any"
if annotation is type(None):
# ``None`` in annotations represents ``NoneType``; emit it literally so
# ``ty`` can validate ``Optional[X]`` / ``Union[..., None]`` / ``-> None``
# signatures correctly.
return "None"
origin = typing.get_origin(annotation)
if origin is Annotated:
args = typing.get_args(annotation)
return _python_type_repr(args[0]) if args else "Any"
if origin is not None:
args = typing.get_args(annotation)
# Normalize ``typing.Union[...]`` and PEP-604 ``X | Y`` to PEP-604 syntax so
# ``None`` is preserved across both forms.
if origin is typing.Union or origin is types.UnionType:
return " | ".join(_python_type_repr(a) for a in args) if args else "Any"
origin_name = getattr(origin, "__name__", None)
if origin_name is None:
origin_name = str(origin)
if origin_name.startswith("<class '"):
origin_name = origin_name[8:-2]
if args:
arg_strs = ", ".join(_python_type_repr(a) for a in args)
return f"{origin_name}[{arg_strs}]"
return origin_name
if hasattr(annotation, "__name__"):
return str(annotation.__name__)
return str(annotation)
def generate_type_stubs(tool_callables: dict[str, Callable[..., Any]]) -> str:
"""Generate Python type stub declarations for tools + DSL primitives.
Stubs are fed to Monty's ``type_check_stubs`` so ``ty`` can validate the
LLM-generated code against the actual tool signatures before any host
call runs.
Tools whose ``name`` is not a valid Python identifier (or is a Python
keyword) are skipped because the name cannot be safely interpolated into
stub source. The model can still reach them via the
``call_tool("weird name", ...)`` fallback at runtime, but they will not get
type-checked stubs.
"""
lines: list[str] = [
"from typing import Any",
"",
"# DSL primitives",
"async def call_tool(name: str, **kwargs: Any) -> Any:",
" raise NotImplementedError()",
"",
"# Registered tools - call directly with typed arguments",
]
for name, func in sorted(tool_callables.items()):
if not name.isidentifier() or keyword.iskeyword(name):
# A non-identifier name (or a Python keyword) would inject invalid
# / dangerous syntax into the stub source. Skip stub generation;
# the tool stays reachable through ``call_tool(name, ...)``.
continue
try:
sig = inspect.signature(func)
hints = get_type_hints(func, include_extras=True)
except (ValueError, TypeError):
lines.append(f"async def {name}(**kwargs: Any) -> Any:")
lines.append(" raise NotImplementedError()")
lines.append("")
continue
params: list[str] = []
for param_name, param in sig.parameters.items():
annotation = hints.get(param_name, inspect.Parameter.empty)
type_str = _python_type_repr(annotation)
if param.default is not inspect.Parameter.empty:
params.append(f"{param_name}: {type_str} = ...")
else:
params.append(f"{param_name}: {type_str}")
return_annotation = hints.get("return", inspect.Parameter.empty)
return_str = _python_type_repr(return_annotation)
param_str = ", ".join(params)
lines.append(f"async def {name}({param_str}) -> {return_str}:")
lines.append(" raise NotImplementedError()")
lines.append("")
return "\n".join(lines)
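To make the stub shape concrete, here is a small sketch that turns one function into one stub declaration. `stub_for` and `lookup` are hypothetical, and the type rendering is deliberately limited to plain classes; generic parameters are not handled here.

```python
import inspect
from collections.abc import Callable
from typing import Annotated, Any, get_type_hints


def _name(annotation: Any) -> str:
    """Render a plain class annotation; anything missing becomes Any."""
    if annotation is None:
        return "Any"
    if annotation is type(None):
        return "None"
    return getattr(annotation, "__name__", str(annotation))


def stub_for(name: str, func: Callable[..., Any]) -> str:
    sig = inspect.signature(func)
    # Without include_extras, get_type_hints strips Annotated metadata.
    hints = get_type_hints(func)
    params = []
    for param_name, param in sig.parameters.items():
        default = " = ..." if param.default is not inspect.Parameter.empty else ""
        params.append(f"{param_name}: {_name(hints.get(param_name))}{default}")
    header = f"async def {name}({', '.join(params)}) -> {_name(hints.get('return'))}:"
    return f"{header}\n    raise NotImplementedError()"


# Hypothetical tool function used only for this illustration.
def lookup(city: Annotated[str, "City name"], units: str = "metric") -> float: ...


stub = stub_for("lookup", lookup)
print(stub)
```

Defaults are emitted as `...` because a type checker only needs to know that the parameter is optional, not what the default value is.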
class _PrintCollector:
"""Collect Monty stdout, capped at ``MAX_PRINT_OUTPUT_CHARS``."""
def __init__(self) -> None:
self.chunks: list[str] = []
self.truncated: bool = False
self._size: int = 0 # running character count to avoid O(n) per append
def __call__(self, stream: str, text: str) -> None:
if self.truncated:
return
remaining = MAX_PRINT_OUTPUT_CHARS - self._size
if remaining <= 0:
self.truncated = True
return
text_value = str(text)
if len(text_value) > remaining:
clipped = text_value[:remaining]
self.chunks.append(clipped)
self._size += len(clipped)
self.truncated = True
else:
self.chunks.append(text_value)
self._size += len(text_value)
@property
def output(self) -> str:
return "".join(self.chunks)
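The capping behaviour is easy to see with a small limit. The sketch below is a simplified variant with a configurable limit (`CappedCollector` is a hypothetical name); the class in the source uses the module-level `MAX_PRINT_OUTPUT_CHARS` constant instead.

```python
class CappedCollector:
    """Collect text chunks up to a character limit, then flag truncation."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.chunks: list[str] = []
        self.truncated = False
        self._size = 0  # Running total, so each write is O(len(text)).

    def write(self, text: str) -> None:
        if self.truncated:
            return
        remaining = self.limit - self._size
        if len(text) > remaining:
            text = text[:remaining]  # Keep only what still fits.
            self.truncated = True
        self.chunks.append(text)
        self._size += len(text)

    @property
    def output(self) -> str:
        return "".join(self.chunks)


collector = CappedCollector(limit=10)
collector.write("hello ")
collector.write("world!")
collector.write("ignored")
print(collector.output, collector.truncated)
```

Once the flag is set, later writes are dropped, so a runaway `print` loop cannot grow the captured output without bound.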
def load_monty() -> Any:
"""Import ``pydantic_monty`` lazily so unit tests can run without it.
Returns the module so callers can read ``Monty``, ``MontyComplete``,
``FunctionSnapshot``, ``FutureSnapshot``, ``NameLookupSnapshot`` from it.
"""
try:
import pydantic_monty # type: ignore[import-not-found]
except ImportError as exc:
raise RuntimeError(
"The `pydantic-monty` package is required to execute Monty CodeAct code. "
"Install it with `pip install pydantic-monty`."
) from exc
return pydantic_monty
class InlineCodeBridge:
"""Execute Monty code inline (non-durable).
Supports both ``await call_tool('name', ...)`` and direct ``await name(...)``
calls. When Monty yields a :class:`FutureSnapshot`, the bridge invokes the
registered host tools and resumes execution with the results.
"""
def __init__(
self,
tool_map: dict[str, Callable[..., Any]],
*,
type_stubs: str | None = None,
mounts: Sequence[Any] | None = None,
resource_limits: dict[str, Any] | None = None,
) -> None:
self.tool_map: dict[str, Callable[..., Any]] = dict(tool_map)
self.type_stubs: str | None = type_stubs
self._mounts = tuple(mounts) if mounts else ()
self._resource_limits = resource_limits
self._pending_calls: dict[int, tuple[str, dict[str, Any]]] = {}
async def run(self, code: str) -> dict[str, Any]:
if not isinstance(code, str) or not code.strip():
raise ValueError("Code must be a non-empty string.")
monty_module = load_monty()
Monty = monty_module.Monty
MontyComplete = monty_module.MontyComplete
FunctionSnapshot = monty_module.FunctionSnapshot
FutureSnapshot = monty_module.FutureSnapshot
NameLookupSnapshot = monty_module.NameLookupSnapshot
printer = _PrintCollector()
monty = Monty(
_build_code(code),
script_name="codeact.py",
type_check=self.type_stubs is not None,
type_check_stubs=self.type_stubs,
)
start_kwargs: dict[str, Any] = {"print_callback": printer}
if self._mounts:
start_kwargs["mount"] = list(self._mounts)
if self._resource_limits:
start_kwargs["limits"] = self._resource_limits
progress = monty.start(**start_kwargs)
while True:
if isinstance(progress, MontyComplete):
return {
"output": _ensure_json_value(progress.output),
"stdout": printer.output,
"truncated": printer.truncated,
}
if isinstance(progress, FunctionSnapshot):
progress = self._handle_function(progress)
continue
if isinstance(progress, FutureSnapshot):
progress = await self._handle_future(progress)
continue
if isinstance(progress, NameLookupSnapshot):
raise RuntimeError(f"Name lookup not supported: {progress.variable_name!r}")
raise RuntimeError(f"Unsupported Monty progress type: {type(progress).__name__}")
def _handle_function(self, snapshot: Any) -> Any:
if snapshot.is_os_function:
return snapshot.resume({
"exc_type": "PermissionError",
"message": "OS and filesystem calls are not available.",
})
function_name = str(snapshot.function_name)
if function_name in self.tool_map:
return self._schedule_direct_tool(snapshot, function_name)
if function_name == "call_tool":
return self._schedule_call_tool(snapshot)
return snapshot.resume({
"exc_type": "NameError",
"message": f"Function {function_name!r} is not available.",
})
def _schedule_direct_tool(self, snapshot: Any, name: str) -> Any:
# Only keyword arguments are forwarded to the host tool; anything in
# ``snapshot.args`` is dropped. The CodeAct instructions tell the model to
# always use keyword arguments. Note that the generated stubs do not mark
# parameters as keyword-only, so ``ty`` alone does not reject positional calls.
self._pending_calls[int(snapshot.call_id)] = (name, dict(snapshot.kwargs))
return snapshot.resume({"future": ...})
def _schedule_call_tool(self, snapshot: Any) -> Any:
try:
name, kwargs = _parse_call_tool(snapshot.args, snapshot.kwargs)
if name not in self.tool_map:
allowed = ", ".join(sorted(self.tool_map.keys())) or "<none>"
raise ValueError(f"Tool {name!r} is not registered. Available tools: {allowed}")
self._pending_calls[int(snapshot.call_id)] = (name, kwargs)
except Exception as exc:
return snapshot.resume(_external_error(exc))
return snapshot.resume({"future": ...})
async def _handle_future(self, snapshot: Any) -> Any:
pending_call_ids = [int(cid) for cid in snapshot.pending_call_ids]
if not pending_call_ids:
return snapshot.resume({})
entries: list[tuple[int, tuple[str, dict[str, Any]]]] = []
for cid in pending_call_ids:
if cid not in self._pending_calls:
raise RuntimeError(f"Unknown future call ID: {cid}")
entries.append((cid, self._pending_calls.pop(cid)))
tasks = [self._invoke_tool(cid, name, kwargs) for cid, (name, kwargs) in entries]
results = await asyncio.gather(*tasks)
resume_results: dict[int, Any] = dict(results)
return snapshot.resume(resume_results)
async def _invoke_tool(self, cid: int, name: str, kwargs: dict[str, Any]) -> tuple[int, Any]:
# Every entry in ``self.tool_map`` is produced by ``_make_tool_callback``
# as ``partial(FunctionTool.invoke, skip_parsing=True)``. ``FunctionTool.invoke``
# is always ``async def``, so a plain ``await`` is correct for every call and
# avoids relying on ``inspect.iscoroutinefunction(partial(...))``, which can
# return ``False`` for some ``partial`` shapes (cpython#98590) and would route
# the call through ``asyncio.to_thread`` with an unawaited coroutine return.
try:
result = await self.tool_map[name](**kwargs)
return cid, {"return_value": _ensure_json_value(result)}
except Exception as exc:
return cid, _external_error(exc)
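The resolve step (run all pending calls concurrently, then key the results by call ID) can be sketched without Monty. The payload keys mirror the ones used in this file; `invoke`, `resolve`, `double`, and `fail` are hypothetical names for the illustration.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Tool = Callable[..., Awaitable[Any]]


async def invoke(cid: int, func: Tool, kwargs: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Run one tool call and convert failures into an error payload."""
    try:
        return cid, {"return_value": await func(**kwargs)}
    except Exception as exc:
        return cid, {"exc_type": type(exc).__name__, "message": str(exc)}


async def resolve(
    pending: dict[int, tuple[str, dict[str, Any]]],
    tools: dict[str, Tool],
) -> dict[int, dict[str, Any]]:
    tasks = [invoke(cid, tools[name], kwargs) for cid, (name, kwargs) in pending.items()]
    # gather returns (cid, payload) pairs in task order; dict() keys them by ID.
    return dict(await asyncio.gather(*tasks))


async def double(*, x: int) -> int:
    return x * 2


async def fail() -> None:
    raise ValueError("boom")


resolved = asyncio.run(
    resolve({1: ("double", {"x": 2}), 2: ("fail", {})}, {"double": double, "fail": fail})
)
print(resolved)
```

Catching the exception inside each task means one failing tool does not cancel its siblings; the sandboxed code sees the error only for the call that raised.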
@@ -0,0 +1,95 @@
# Copyright (c) Microsoft. All rights reserved.
"""``MontyCodeActProvider`` - context provider injecting Monty-backed CodeAct."""
from __future__ import annotations
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any
from agent_framework import AgentSession, ContextProvider, FunctionTool, SessionContext
from agent_framework._tools import ApprovalMode
from ._execute_code_tool import MontyExecuteCodeTool
from ._types import FileMount, FileMountInput
class MontyCodeActProvider(ContextProvider):
"""Inject a Monty-backed CodeAct surface using provider-owned tools.
Mirrors :class:`agent_framework_hyperlight.HyperlightCodeActProvider` for
the subset of capabilities that apply to the Monty interpreter:
``tools``, ``approval_mode``, ``workspace_root``, ``file_mounts``, and
``resource_limits`` (Monty-only).
"""
DEFAULT_SOURCE_ID = "monty_codeact"
def __init__(
self,
source_id: str = DEFAULT_SOURCE_ID,
*,
tools: FunctionTool | Callable[..., Any] | Sequence[FunctionTool | Callable[..., Any]] | None = None,
approval_mode: ApprovalMode | None = None,
workspace_root: str | Path | None = None,
file_mounts: FileMountInput | Sequence[FileMountInput] | None = None,
resource_limits: dict[str, Any] | None = None,
) -> None:
super().__init__(source_id)
self._execute_code_tool = MontyExecuteCodeTool(
tools=tools,
approval_mode=approval_mode,
workspace_root=workspace_root,
file_mounts=file_mounts,
resource_limits=resource_limits,
)
def add_tools(
self,
tools: FunctionTool | Callable[..., Any] | Sequence[FunctionTool | Callable[..., Any]],
) -> None:
"""Add provider-owned Monty tools."""
self._execute_code_tool.add_tools(tools)
def get_tools(self) -> list[FunctionTool]:
"""Return the provider-owned Monty tools."""
return self._execute_code_tool.get_tools()
def remove_tool(self, name: str) -> None:
"""Remove one provider-owned Monty tool by name."""
self._execute_code_tool.remove_tool(name)
def clear_tools(self) -> None:
"""Remove all provider-owned Monty tools."""
self._execute_code_tool.clear_tools()
def add_file_mounts(self, file_mounts: FileMountInput | Sequence[FileMountInput]) -> None:
"""Add provider-managed file mounts."""
self._execute_code_tool.add_file_mounts(file_mounts)
def get_file_mounts(self) -> list[FileMount]:
"""Return the provider-managed file mounts (excluding ``workspace_root``)."""
return self._execute_code_tool.get_file_mounts()
def remove_file_mount(self, mount_path: str) -> None:
"""Remove one provider-managed file mount by its sandbox path."""
self._execute_code_tool.remove_file_mount(mount_path)
def clear_file_mounts(self) -> None:
"""Remove all provider-managed file mounts."""
self._execute_code_tool.clear_file_mounts()
async def before_run(
self,
*,
agent: Any,
session: AgentSession | None,
context: SessionContext,
state: dict[str, Any],
) -> None:
"""Inject CodeAct instructions and a run-scoped execute_code tool before each run."""
run_tool = self._execute_code_tool.create_run_tool()
state[self.source_id] = run_tool.build_serializable_state()
context.extend_instructions(self.source_id, run_tool.build_instructions(tools_visible_to_model=False))
context.extend_tools(self.source_id, [run_tool])
@@ -0,0 +1,38 @@
# Copyright (c) Microsoft. All rights reserved.
"""Public types for ``agent-framework-monty``.
Mirrors ``agent_framework_hyperlight._types`` where the Monty runtime exposes
an equivalent concept so users can move between the two providers with minimal
churn.
"""
from __future__ import annotations
from pathlib import Path
from typing import Literal, NamedTuple, TypeAlias
#: Allowed Monty mount modes. ``overlay`` (the Monty default) buffers writes
#: in-memory and is therefore not visible to the host after execution.
#: ``read-only`` rejects writes. ``read-write`` writes through to the host
#: directory.
MountMode: TypeAlias = Literal["overlay", "read-only", "read-write"]
class FileMount(NamedTuple):
"""Map a host directory into the Monty sandbox.
Mirrors :class:`agent_framework_hyperlight.FileMount` with two extra
fields that surface Monty's underlying ``MountDir`` capabilities:
``mode`` selects read-only / read-write / overlay semantics, and
``write_bytes_limit`` caps the total bytes written through this mount.
"""
host_path: str | Path
mount_path: str
mode: MountMode = "overlay"
write_bytes_limit: int | None = None
FileMountHostPath: TypeAlias = str | Path
FileMountInput: TypeAlias = str | tuple[FileMountHostPath, str] | FileMount
@@ -0,0 +1,107 @@
[project]
name = "agent-framework-monty"
description = "Monty CodeAct integrations for Microsoft Agent Framework."
authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
version = "1.0.0a260518"
license-files = ["LICENSE"]
urls.homepage = "https://aka.ms/agent-framework"
urls.source = "https://github.com/microsoft/agent-framework/tree/main/python"
urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true"
urls.issues = "https://github.com/microsoft/agent-framework/issues"
classifiers = [
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Typing :: Typed",
]
dependencies = [
"agent-framework-core>=1.4.0,<2",
"pydantic-monty>=0,<0.1",
]
[tool.uv]
prerelease = "if-necessary-or-explicit"
environments = [
"sys_platform == 'darwin'",
"sys_platform == 'linux'",
"sys_platform == 'win32'"
]
[tool.uv-dynamic-versioning]
fallback-version = "0.0.0"
[tool.pytest.ini_options]
testpaths = 'tests'
addopts = "-ra -q -r fEX"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = []
timeout = 120
markers = [
"integration: marks tests as integration tests that require external services",
]
[tool.ruff]
extend = "../../pyproject.toml"
[tool.ruff.lint.per-file-ignores]
"tests/**" = ["D", "INP", "TD", "ERA001", "RUF", "S"]
[tool.coverage.run]
omit = [
"**/__init__.py"
]
[tool.pyright]
extends = "../../pyproject.toml"
include = ["agent_framework_monty"]
exclude = ['tests']
[tool.mypy]
plugins = ['pydantic.mypy']
strict = true
python_version = "3.10"
ignore_missing_imports = true
disallow_untyped_defs = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
[tool.bandit]
targets = ["agent_framework_monty"]
exclude_dirs = ["tests"]
[tool.poe]
executor.type = "uv"
include = "../../shared_tasks.toml"
[tool.poe.tasks.mypy]
help = "Run MyPy for this package."
cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_monty"
[tool.poe.tasks.test]
help = "Run the default unit test suite for this package."
cmd = 'pytest -m "not integration" --cov=agent_framework_monty --cov-report=term-missing:skip-covered tests'
[tool.poe.tasks.test-integration]
help = "Run integration tests for this package (requires pydantic-monty)."
cmd = 'pytest -m "integration" tests'
[tool.flit.module]
name = "agent_framework_monty"
[build-system]
requires = ["flit-core >= 3.11,<4.0"]
build-backend = "flit_core.buildapi"
@@ -0,0 +1,642 @@
# Copyright (c) Microsoft. All rights reserved.
"""Hermetic unit tests for ``agent_framework_monty``.
These tests inject a fake Monty runtime via ``monkeypatch`` so they run without
invoking the real ``pydantic-monty`` runtime. End-to-end tests against the real
runtime live in ``test_monty_codeact_integration.py``.
"""
from __future__ import annotations
import json
import sys
import types
from collections.abc import Iterable, Iterator
from dataclasses import dataclass, field
from pathlib import Path
from typing import Annotated, Any
from unittest.mock import MagicMock
import pytest
from agent_framework import Content, FunctionTool, Message, tool
from agent_framework._sessions import SessionContext
from agent_framework_monty import MontyCodeActProvider, MontyExecuteCodeTool
from agent_framework_monty import _execute_code_tool as execute_code_module
from agent_framework_monty import _monty_bridge as bridge_module
# ---------------------------------------------------------------------------
# Fake Monty runtime - drop-in replacement for pydantic_monty
# ---------------------------------------------------------------------------
@dataclass
class _FakeMontyComplete:
    output: Any = None


@dataclass
class _FakeFunctionSnapshot:
    function_name: str
    call_id: int
    args: tuple[Any, ...] = ()
    kwargs: dict[str, Any] = field(default_factory=dict)
    is_os_function: bool = False
    _script: _FakeScript | None = None

    def resume(self, payload: Any) -> Any:
        assert self._script is not None, "Snapshot must be attached to a script."
        return self._script.advance(("function_resume", self, payload))


@dataclass
class _FakeFutureSnapshot:
    pending_call_ids: list[int]
    _script: _FakeScript | None = None

    def resume(self, payload: Any) -> Any:
        assert self._script is not None, "Snapshot must be attached to a script."
        return self._script.advance(("future_resume", self, payload))


@dataclass
class _FakeNameLookupSnapshot:
    variable_name: str


@dataclass
class _PrintAction:
    """Marker pushed onto a script to emit captured stdout via the print callback."""

    text: str
class _FakeScript:
    """Replayable Monty progress script with a resume log."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._queue: list[Any] = list(items)
        self.resume_log: list[tuple[str, Any, Any]] = []

    def attach(self, snapshot: Any) -> Any:
        snapshot._script = self
        return snapshot

    def next_item(self) -> Any:
        if not self._queue:
            return _FakeMontyComplete(output=None)
        item = self._queue.pop(0)
        if isinstance(item, _FakeMontyComplete):
            return item
        if isinstance(item, _PrintAction):
            return item
        if isinstance(item, _FakeNameLookupSnapshot):
            return item
        return self.attach(item)

    def advance(self, log_entry: tuple[str, Any, Any]) -> Any:
        self.resume_log.append(log_entry)
        return self.next_item()


_current_script: list[_FakeScript | None] = [None]


def _set_script(*items: Any) -> _FakeScript:
    script = _FakeScript(items)
    _current_script[0] = script
    return script


def _get_script() -> _FakeScript:
    script = _current_script[0]
    assert script is not None, "Test must call _set_script(...) before running code."
    return script
class _FakeMonty:
    def __init__(
        self,
        code: str,
        *,
        script_name: str,
        type_check: bool,
        type_check_stubs: str | None,
    ) -> None:
        self.code = code
        self.script_name = script_name
        self.type_check = type_check
        self.type_check_stubs = type_check_stubs
        self._script = _get_script()

    def start(self, *, print_callback: Any) -> Any:
        while True:
            item = self._script.next_item()
            if isinstance(item, _PrintAction):
                print_callback("stdout", item.text)
                continue
            return item
@pytest.fixture(autouse=True)
def fake_monty_module(monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
    """Install a fake ``pydantic_monty`` module for the duration of each test."""
    fake = types.ModuleType("pydantic_monty")
    fake.Monty = _FakeMonty  # type: ignore[attr-defined]
    fake.MontyComplete = _FakeMontyComplete  # type: ignore[attr-defined]
    fake.FunctionSnapshot = _FakeFunctionSnapshot  # type: ignore[attr-defined]
    fake.FutureSnapshot = _FakeFutureSnapshot  # type: ignore[attr-defined]
    fake.NameLookupSnapshot = _FakeNameLookupSnapshot  # type: ignore[attr-defined]
    monkeypatch.setitem(sys.modules, "pydantic_monty", fake)
    _current_script[0] = None
    yield
    _current_script[0] = None
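# Illustrative sketch of the module-injection technique the fixture above relies on,
# shown outside pytest. It assumes only the standard library: ``patch.dict`` from
# ``unittest.mock`` stands in for ``monkeypatch.setitem``, and "hypothetical_runtime"
# is a made-up module name chosen for this example, not a real package.

```python
import importlib
import sys
import types
from unittest.mock import patch

# Build an empty module object and give it the attribute the code under test expects.
fake_module = types.ModuleType("hypothetical_runtime")
fake_module.VERSION = "fake"  # type: ignore[attr-defined]

# While the patch is active, any import of the name resolves to the fake,
# because the import system consults sys.modules first.
with patch.dict(sys.modules, {"hypothetical_runtime": fake_module}):
    imported = importlib.import_module("hypothetical_runtime")
    seen_inside = imported.VERSION

# patch.dict restores sys.modules on exit, so the fake does not leak into other tests.
still_installed = "hypothetical_runtime" in sys.modules
```

# The same isolation is what makes an autouse fixture safe here: each test starts and
# ends without the fake entry lingering in ``sys.modules``.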
# ---------------------------------------------------------------------------
# Sample tools used across tests
# ---------------------------------------------------------------------------
@tool
def add_tool(
a: Annotated[int, "First addend"],
b: Annotated[int, "Second addend"],
) -> int:
"""Add two integers."""
return a + b
@tool
def mul_tool(
a: Annotated[int, "First factor"],
b: Annotated[int, "Second factor"],
) -> int:
"""Multiply two integers."""
return a * b
@tool(approval_mode="always_require")
def dangerous_tool(payload: Annotated[str, "Anything"]) -> str:
"""A tool that always requires approval."""
return payload
# ---------------------------------------------------------------------------
# MontyExecuteCodeTool tests
# ---------------------------------------------------------------------------
def test_tool_construction_defaults() -> None:
monty_tool = MontyExecuteCodeTool()
assert monty_tool.name == "execute_code"
assert monty_tool.approval_mode == "never_require"
assert monty_tool.get_tools() == []
def test_add_remove_clear_tools_round_trip() -> None:
monty_tool = MontyExecuteCodeTool()
monty_tool.add_tools([add_tool, mul_tool])
assert [t.name for t in monty_tool.get_tools()] == ["add_tool", "mul_tool"]
monty_tool.remove_tool("add_tool")
assert [t.name for t in monty_tool.get_tools()] == ["mul_tool"]
with pytest.raises(KeyError):
monty_tool.remove_tool("missing")
monty_tool.clear_tools()
assert monty_tool.get_tools() == []
def test_approval_required_tool_gates_execute_code() -> None:
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
assert monty_tool.approval_mode == "never_require"
monty_tool.add_tools([dangerous_tool])
assert monty_tool.approval_mode == "always_require"
monty_tool.remove_tool("dangerous_tool")
assert monty_tool.approval_mode == "never_require"
def test_default_approval_mode_always_require_is_sticky() -> None:
monty_tool = MontyExecuteCodeTool(tools=[add_tool], approval_mode="always_require")
assert monty_tool.approval_mode == "always_require"
monty_tool.clear_tools()
assert monty_tool.approval_mode == "always_require"
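# Illustrative sketch of the approval rule the two tests above pin down: the
# effective mode is "always_require" when either the configured default or any
# registered tool demands it. ``effective_approval_mode`` is a hypothetical helper
# written for this example; it is an assumption about the rule, not the package's code.

```python
def effective_approval_mode(default_mode: str, tool_modes: list[str]) -> str:
    """Return the strictest approval mode among the default and all tool modes."""
    if default_mode == "always_require" or "always_require" in tool_modes:
        return "always_require"
    return "never_require"


# A single approval-gated tool escalates the whole execute_code tool.
escalated = effective_approval_mode("never_require", ["never_require", "always_require"])
# Removing that tool drops the mode back to the default.
relaxed = effective_approval_mode("never_require", ["never_require"])
# An explicit "always_require" default stays in force even with no tools registered.
sticky = effective_approval_mode("always_require", [])
```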
def test_dynamic_description_reflects_registered_tools() -> None:
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
description = monty_tool.description
assert "add_tool" in description
assert "Monty" in description
monty_tool.add_tools([mul_tool])
description_updated = monty_tool.description
assert "mul_tool" in description_updated
def test_create_run_tool_snapshots_current_state() -> None:
monty_tool = MontyExecuteCodeTool(tools=[add_tool], approval_mode="never_require")
run_tool = monty_tool.create_run_tool()
assert run_tool is not monty_tool
assert [t.name for t in run_tool.get_tools()] == ["add_tool"]
assert run_tool.approval_mode == monty_tool.approval_mode
# Mutating the original must not leak into the snapshot.
monty_tool.add_tools([mul_tool])
assert [t.name for t in run_tool.get_tools()] == ["add_tool"]
def test_build_serializable_state_matches_effective_config() -> None:
monty_tool = MontyExecuteCodeTool(tools=[add_tool, dangerous_tool])
state = monty_tool.build_serializable_state()
assert state["runtime"] == "monty"
assert state["approval_mode"] == "always_require"
assert set(state["tool_names"]) == {"add_tool", "dangerous_tool"}
assert state["workspace_root"] is None
assert state["file_mounts"] == []
assert state["resource_limits"] is None
def test_file_mounts_normalized_and_round_tripped(tmp_path: Path) -> None:
from agent_framework_monty import FileMount
from agent_framework_monty._execute_code_tool import _normalize_mount_path
host_a = tmp_path / "a"
host_a.mkdir()
host_b = tmp_path / "b"
host_b.mkdir()
monty_tool = MontyExecuteCodeTool(
file_mounts=[
str(host_a), # shorthand: same path on both sides
(str(host_b), "/work"), # explicit tuple
FileMount(host_path=host_a, mount_path="/data", mode="read-only"),
],
)
mounts = monty_tool.get_file_mounts()
by_mount = {m.mount_path: m for m in mounts}
# The shorthand string is normalized through _normalize_mount_path (POSIX-style),
# so on Windows `C:\\...` becomes `/C:/...`. Compare against the same normalizer.
shorthand_key = _normalize_mount_path(str(host_a))
assert set(by_mount) == {shorthand_key, "/work", "/data"}
assert by_mount["/work"].host_path == host_b.resolve()
assert by_mount["/data"].mode == "read-only"
assert by_mount[shorthand_key].mode == "overlay" # default
def test_workspace_root_auto_mounts_at_input(tmp_path: Path) -> None:
monty_tool = MontyExecuteCodeTool(workspace_root=tmp_path)
mounts = monty_tool._effective_mounts()
assert any(m.mount_path == "/input" and m.mode == "read-write" for m in mounts)
def test_workspace_root_yields_to_explicit_input_mount(tmp_path: Path) -> None:
from agent_framework_monty import FileMount
explicit = tmp_path / "explicit"
explicit.mkdir()
monty_tool = MontyExecuteCodeTool(
workspace_root=tmp_path,
file_mounts=[FileMount(host_path=explicit, mount_path="/input", mode="read-only")],
)
input_mounts = [m for m in monty_tool._effective_mounts() if m.mount_path == "/input"]
assert len(input_mounts) == 1
assert input_mounts[0].mode == "read-only"
assert input_mounts[0].host_path == explicit.resolve()
def test_remove_file_mount_raises_on_missing() -> None:
monty_tool = MontyExecuteCodeTool()
with pytest.raises(KeyError):
monty_tool.remove_file_mount("/never-added")
def test_dynamic_description_mentions_filesystem_when_mounts_configured(tmp_path: Path) -> None:
monty_tool = MontyExecuteCodeTool(workspace_root=tmp_path)
description = monty_tool.description
assert "Filesystem access is enabled" in description
assert "/input" in description
def test_dynamic_description_default_mentions_no_filesystem() -> None:
monty_tool = MontyExecuteCodeTool()
description = monty_tool.description
assert "Filesystem access is unavailable" in description
def test_resource_limits_round_trip() -> None:
monty_tool = MontyExecuteCodeTool(resource_limits={"max_duration_secs": 5.0})
assert monty_tool.resource_limits == {"max_duration_secs": 5.0}
state = monty_tool.build_serializable_state()
assert state["resource_limits"] == {"max_duration_secs": 5.0}
def test_build_instructions_includes_registered_tools() -> None:
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
instructions = monty_tool.build_instructions(tools_visible_to_model=False)
assert "add_tool" in instructions
assert "execute_code" in instructions
assert "asyncio.gather" in instructions
def test_execute_code_filtered_out_when_added_as_tool() -> None:
spurious = FunctionTool(
name="execute_code",
description="should not appear",
func=lambda: None,
)
monty_tool = MontyExecuteCodeTool(tools=[spurious, add_tool])
assert [t.name for t in monty_tool.get_tools()] == ["add_tool"]
# ---------------------------------------------------------------------------
# _run_code behavior with the fake Monty runtime
# ---------------------------------------------------------------------------
async def test_run_code_with_no_tools_returns_default_text() -> None:
_set_script(_FakeMontyComplete(output=None))
monty_tool = MontyExecuteCodeTool()
result = await monty_tool._run_code(code="None")
assert len(result) == 1
assert isinstance(result[0], Content)
async def test_run_code_surfaces_stdout_and_output() -> None:
_set_script(_PrintAction("hello\n"), _FakeMontyComplete(output=42))
monty_tool = MontyExecuteCodeTool()
result = await monty_tool._run_code(code="print('hello')")
text_contents = [c for c in result if c.type == "text"]
assert any("hello" in (c.text or "") for c in text_contents)
assert any(
(c.text or "").strip() and json.loads(c.text or "null") == 42
for c in text_contents
if (c.text or "").strip().isdigit()
)
async def test_run_code_direct_typed_call_invokes_registered_tool() -> None:
func_snapshot = _FakeFunctionSnapshot(
function_name="add_tool",
call_id=1,
kwargs={"a": 2, "b": 3},
)
future_snapshot = _FakeFutureSnapshot(pending_call_ids=[1])
script = _set_script(func_snapshot, future_snapshot, _FakeMontyComplete(output=None))
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
await monty_tool._run_code(code="await add_tool(a=2, b=3)")
payloads = [payload for _, _, payload in script.resume_log]
assert {"future": ...} in payloads
final_resume = next(p for p in payloads if isinstance(p, dict) and 1 in p)
assert final_resume[1] == {"return_value": 5}
async def test_run_code_call_tool_fallback_invokes_registered_tool() -> None:
func_snapshot = _FakeFunctionSnapshot(
function_name="call_tool",
call_id=7,
args=("add_tool",),
kwargs={"a": 4, "b": 8},
)
future_snapshot = _FakeFutureSnapshot(pending_call_ids=[7])
script = _set_script(func_snapshot, future_snapshot, _FakeMontyComplete(output=None))
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
await monty_tool._run_code(code="await call_tool('add_tool', a=4, b=8)")
payloads = [payload for _, _, payload in script.resume_log]
final_resume = next(p for p in payloads if isinstance(p, dict) and 7 in p)
assert final_resume[7] == {"return_value": 12}
async def test_run_code_unknown_tool_returns_nameerror_resume() -> None:
func_snapshot = _FakeFunctionSnapshot(
function_name="does_not_exist",
call_id=11,
)
script = _set_script(func_snapshot, _FakeMontyComplete(output=None))
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
await monty_tool._run_code(code="await does_not_exist()")
payloads = [payload for _, _, payload in script.resume_log]
assert any(isinstance(p, dict) and p.get("exc_type") == "NameError" for p in payloads)
async def test_run_code_os_function_is_rejected_with_permissionerror() -> None:
os_snapshot = _FakeFunctionSnapshot(
function_name="os.listdir",
call_id=12,
is_os_function=True,
)
script = _set_script(os_snapshot, _FakeMontyComplete(output=None))
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
await monty_tool._run_code(code="import os; os.listdir('.')")
payloads = [payload for _, _, payload in script.resume_log]
assert any(isinstance(p, dict) and p.get("exc_type") == "PermissionError" for p in payloads)
async def test_when_any_returns_nameerror_now_that_it_is_removed() -> None:
"""`when_any` is no longer part of the DSL and should resolve to a NameError."""
func_snapshot = _FakeFunctionSnapshot(
function_name="when_any",
call_id=99,
args=([{"tool": "add_tool", "kwargs": {"a": 1, "b": 2}}],),
)
script = _set_script(func_snapshot, _FakeMontyComplete(output=None))
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
await monty_tool._run_code(code="await when_any([{'tool': 'add_tool', 'kwargs': {'a': 1, 'b': 2}}])")
payloads = [payload for _, _, payload in script.resume_log]
assert any(isinstance(p, dict) and p.get("exc_type") == "NameError" for p in payloads)
async def test_run_code_call_tool_with_unregistered_name_returns_error() -> None:
func_snapshot = _FakeFunctionSnapshot(
function_name="call_tool",
call_id=20,
args=("missing",),
kwargs={},
)
script = _set_script(func_snapshot, _FakeMontyComplete(output=None))
monty_tool = MontyExecuteCodeTool(tools=[add_tool])
await monty_tool._run_code(code="await call_tool('missing')")
payloads = [payload for _, _, payload in script.resume_log]
assert any(
isinstance(p, dict) and p.get("exc_type") == "ValueError" and "Tool 'missing'" in p.get("message", "")
for p in payloads
)
async def test_run_code_returns_error_content_on_runtime_failure(monkeypatch: pytest.MonkeyPatch) -> None:
class _BoomBridge:
def __init__(self, tool_map: Any, **_: Any) -> None:
pass
async def run(self, code: str) -> dict[str, Any]:
raise RuntimeError("boom")
monkeypatch.setattr(execute_code_module, "InlineCodeBridge", _BoomBridge)
monty_tool = MontyExecuteCodeTool()
result = await monty_tool._run_code(code="x = 1")
assert len(result) == 1
assert result[0].type == "error"
assert "boom" in (result[0].error_details or "")
# ---------------------------------------------------------------------------
# MontyCodeActProvider tests
# ---------------------------------------------------------------------------
async def test_provider_injects_execute_code_tool_and_instructions() -> None:
provider = MontyCodeActProvider(tools=[add_tool])
context = SessionContext(input_messages=[Message(role="user", contents=[Content.from_text("hi")])])
state: dict[str, Any] = {}
await provider.before_run(agent=MagicMock(), session=None, context=context, state=state)
assert state["monty_codeact"]["tool_names"] == ["add_tool"]
assert any("add_tool" in instruction for instruction in context.instructions)
assert len(context.tools) == 1
assert isinstance(context.tools[0], MontyExecuteCodeTool)
# The injected tool is a per-run snapshot, not the provider's stored copy.
assert context.tools[0] is not provider._execute_code_tool # type: ignore[attr-defined]
def test_provider_delegates_tool_management_to_internal_tool() -> None:
provider = MontyCodeActProvider()
provider.add_tools([add_tool, mul_tool])
assert [t.name for t in provider.get_tools()] == ["add_tool", "mul_tool"]
provider.remove_tool("add_tool")
assert [t.name for t in provider.get_tools()] == ["mul_tool"]
provider.clear_tools()
assert provider.get_tools() == []
# ---------------------------------------------------------------------------
# generate_type_stubs - signature smoke test
# ---------------------------------------------------------------------------
def test_generate_type_stubs_emits_dsl_and_tool_signatures() -> None:
def custom(x: int, y: str = "z") -> bool:
"""Stub-test tool."""
return True
stubs = bridge_module.generate_type_stubs({"custom": custom})
assert "async def call_tool(name: str, **kwargs: Any) -> Any:" in stubs
assert "async def custom(x: int, y: str = ...) -> bool:" in stubs
assert "when_any" not in stubs
def test_generate_type_stubs_preserves_none_and_optional() -> None:
def nullable_return(x: int) -> None:
"""Returns nothing."""
return
def optional_param(x: int | None = None) -> bool: # noqa: UP045 - intentional
"""Optional parameter, spelled ``int | None``."""
return x is None
def union_param(x: int | str | None) -> str: # noqa: UP007 - intentional
"""Union with None."""
return str(x)
stubs = bridge_module.generate_type_stubs({
"nullable_return": nullable_return,
"optional_param": optional_param,
"union_param": union_param,
})
# ``None`` return must round-trip as None, not Any.
assert "async def nullable_return(x: int) -> None:" in stubs
# ``Optional[X]`` is ``Union[X, None]`` at runtime; preserve None.
assert "async def optional_param(x: int | None = ...) -> bool:" in stubs
# Multi-arm union with None.
assert "async def union_param(x: int | str | None) -> str:" in stubs
def test_generate_type_stubs_skips_non_identifier_tool_names() -> None:
"""Tool names that are not valid Python identifiers must not be interpolated into stub source.
The model can still reach them via ``call_tool("weird-name", ...)`` at
runtime; they just don't get type-checked stubs.
"""
def evil(x: int) -> int:
return x
def normal(x: int) -> int:
return x
stubs = bridge_module.generate_type_stubs({
# Hyphens are not valid identifier chars.
"weird-name": evil,
# Newlines in the name would inject arbitrary stub source.
"broken\n pass\nasync def injected": evil,
# Python keywords are valid identifiers per ``str.isidentifier()`` but
# would still produce uncompilable stubs.
"async": evil,
# Real tool that should still appear.
"normal": normal,
})
assert "async def normal(x: int) -> int:" in stubs
assert "weird-name" not in stubs
assert "injected" not in stubs
assert "async def async(" not in stubs
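# Illustrative sketch of the name check the test above implies: ``str.isidentifier()``
# alone accepts keywords such as ``async``, so a keyword check is needed as well.
# ``is_safe_stub_name`` is a hypothetical helper for this example; the real filter in
# ``generate_type_stubs`` may be written differently.

```python
import keyword


def is_safe_stub_name(name: str) -> bool:
    """Accept only names that can appear after ``def`` in generated source."""
    return name.isidentifier() and not keyword.iskeyword(name)


candidates = [
    "normal",
    "weird-name",  # hyphen: not an identifier
    "broken\n pass\nasync def injected",  # newline: would inject stub source
    "async",  # identifier per isidentifier(), but a keyword
]
safe_names = [name for name in candidates if is_safe_stub_name(name)]
```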
async def test_invoke_tool_awaits_partial_wrapped_async_method() -> None:
"""A FunctionTool callback registered via partial(FunctionTool.invoke, ...) must be awaited.
Regression for PR #5915 review feedback: relying on ``inspect.iscoroutinefunction``
to choose between ``await`` and ``asyncio.to_thread`` is fragile for
``functools.partial`` wrappers (cpython#98590) and would surface the
returned coroutine as a JSON-serialization error instead of the real
tool result. The bridge must always ``await`` entries in ``self.tool_map``.
"""
from functools import partial
from agent_framework_monty._monty_bridge import InlineCodeBridge
@tool
def adder(a: Annotated[int, ""], b: Annotated[int, ""]) -> int:
"""Add."""
return a + b
# Mirrors what _make_tool_callback returns.
cb = partial(adder.invoke, skip_parsing=True)
bridge = InlineCodeBridge({"adder": cb})
cid, payload = await bridge._invoke_tool(7, "adder", {"a": 6, "b": 7})
assert cid == 7
assert payload == {"return_value": 13}, payload
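# Illustrative sketch of one way to avoid guessing whether a callable is async:
# call it, then await the result only if it is awaitable. This is a common
# alternative to inspecting the callable up front, not the bridge's implementation
# (which, per the docstring above, always awaits). ``call_maybe_async`` and
# ``_FakeAdder`` are hypothetical names used only here.

```python
import asyncio
import inspect
from functools import partial
from typing import Any


async def call_maybe_async(fn: Any, /, *args: Any, **kwargs: Any) -> Any:
    """Call ``fn`` and await its result when the result is awaitable."""
    result = fn(*args, **kwargs)
    if inspect.isawaitable(result):
        return await result
    return result


class _FakeAdder:
    """Stand-in for a tool object exposing an async ``invoke`` method."""

    async def invoke(self, *, arguments: dict[str, int], skip_parsing: bool = False) -> int:
        return arguments["a"] + arguments["b"]


async def _demo() -> tuple[int, int]:
    # A partial around a bound async method, similar in shape to the callback in the test.
    wrapped = partial(_FakeAdder().invoke, skip_parsing=True)
    async_result = await call_maybe_async(wrapped, arguments={"a": 6, "b": 7})
    # A plain sync callable goes through the same helper unchanged.
    sync_result = await call_maybe_async(lambda: 1)
    return async_result, sync_result


async_result, sync_result = asyncio.run(_demo())
```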
@@ -0,0 +1,601 @@
# Copyright (c) Microsoft. All rights reserved.
"""Integration tests for ``agent_framework_monty`` exercising the real Monty runtime.
These tests import the real ``pydantic-monty`` package and run actual Python
code through it via :class:`MontyExecuteCodeTool`. They are marked
``@pytest.mark.integration`` and are skipped automatically when
``pydantic_monty`` is unavailable.
"""
from __future__ import annotations
import asyncio
import importlib.util
import time
from typing import Annotated, Any
from unittest.mock import MagicMock
import pytest
from agent_framework import Agent, Content, Message, tool
from agent_framework._sessions import SessionContext
from agent_framework_monty import MontyCodeActProvider, MontyExecuteCodeTool
def _monty_integration_skip_reason() -> str | None:
if importlib.util.find_spec("pydantic_monty") is None:
return "pydantic-monty is not installed."
return None
pytestmark = [
pytest.mark.integration,
pytest.mark.skipif(
_monty_integration_skip_reason() is not None,
reason=_monty_integration_skip_reason() or "Monty integration tests are disabled.",
),
]
# ---------------------------------------------------------------------------
# Sample tools
# ---------------------------------------------------------------------------
@tool
def add(
a: Annotated[int, "First addend"],
b: Annotated[int, "Second addend"],
) -> int:
"""Return ``a + b``."""
return a + b
@tool
def multiply(
a: Annotated[int, "First factor"],
b: Annotated[int, "Second factor"],
) -> int:
"""Return ``a * b``."""
return a * b
@tool
async def async_echo(value: Annotated[str, "Value to echo"]) -> str:
"""Return ``value`` after a no-op await."""
await asyncio.sleep(0)
return value
def _async_slow_factory(label: str, delay: float) -> Any:
@tool(name=f"slow_{label}")
async def slow(value: Annotated[int, "Input"]) -> int:
"""Sleep asynchronously, then return value untouched."""
await asyncio.sleep(delay)
return value
return slow
@tool(approval_mode="always_require")
def restricted(payload: Annotated[str, "Any text"]) -> str:
"""A tool that always requires approval."""
return payload
def _text_outputs(contents: list[Content]) -> list[str]:
return [c.text or "" for c in contents if c.type == "text"]
# ---------------------------------------------------------------------------
# Basic execution
# ---------------------------------------------------------------------------
async def test_plain_python_print_round_trips() -> None:
monty_tool = MontyExecuteCodeTool()
result = await monty_tool._run_code(code="print('hello world')")
texts = _text_outputs(result)
assert any("hello world" in text for text in texts)
async def test_last_expression_value_is_returned() -> None:
monty_tool = MontyExecuteCodeTool()
result = await monty_tool._run_code(code="5 + 7")
texts = _text_outputs(result)
assert any(text.strip() == "12" for text in texts)
# ---------------------------------------------------------------------------
# Tool dispatch
# ---------------------------------------------------------------------------
async def test_direct_typed_tool_call_invokes_host() -> None:
monty_tool = MontyExecuteCodeTool(tools=[add])
result = await monty_tool._run_code(code="print(await add(a=2, b=3))")
texts = _text_outputs(result)
assert any("5" in text for text in texts)
async def test_call_tool_fallback_invokes_host() -> None:
monty_tool = MontyExecuteCodeTool(tools=[add])
result = await monty_tool._run_code(code="print(await call_tool('add', a=4, b=8))")
texts = _text_outputs(result)
assert any("12" in text for text in texts)
async def test_async_host_tool_is_awaited() -> None:
monty_tool = MontyExecuteCodeTool(tools=[async_echo])
result = await monty_tool._run_code(code="print(await async_echo(value='ping'))")
texts = _text_outputs(result)
assert any("ping" in text for text in texts)
# ---------------------------------------------------------------------------
# Concurrency
# ---------------------------------------------------------------------------
async def test_asyncio_gather_fans_out_tool_calls_concurrently() -> None:
    """Two async tools dispatched via ``asyncio.gather`` should run on the event loop in parallel.

    Sync tools cannot fan out (FunctionTool.invoke runs them inline on the event loop),
    so this test uses async host tools to verify the bridge's gather pipeline does
    not introduce extra serialization.
    """
    slow_a = _async_slow_factory("a", delay=0.25)
    slow_b = _async_slow_factory("b", delay=0.25)
    monty_tool = MontyExecuteCodeTool(tools=[slow_a, slow_b])
    code = """
results = await asyncio.gather(slow_a(value=1), slow_b(value=2))
print(results)
"""
    start = time.perf_counter()
    result = await monty_tool._run_code(code=code)
    elapsed = time.perf_counter() - start
    texts = _text_outputs(result)
    assert any("[1, 2]" in text for text in texts)
    # Allow some scheduling slack but verify it's noticeably less than sequential (~0.5s).
    assert elapsed < 0.45, f"Expected concurrent execution; took {elapsed:.3f}s"
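# Illustrative sketch of the concurrency property the test above measures with a
# timer, shown here with an event log instead so it does not depend on wall-clock
# thresholds. It uses plain ``asyncio`` only; ``_worker`` and ``_fan_out`` are
# hypothetical names and no Monty runtime is involved.

```python
import asyncio


async def _worker(label: str, delay: float, log: list[str]) -> str:
    log.append(f"start:{label}")
    await asyncio.sleep(delay)  # yields to the event loop so the other worker can start
    log.append(f"end:{label}")
    return label


async def _fan_out() -> tuple[list[str], list[str]]:
    log: list[str] = []
    # gather schedules both coroutines before either finishes sleeping.
    results = await asyncio.gather(_worker("a", 0.05, log), _worker("b", 0.01, log))
    return list(results), log


gather_results, gather_log = asyncio.run(_fan_out())
```

# Both workers start before either ends, and ``gather`` still returns results in
# argument order even though "b" finishes first.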
# ---------------------------------------------------------------------------
# Sandbox safety + type checking
# ---------------------------------------------------------------------------
async def test_type_check_rejects_wrong_argument_type() -> None:
invocation_count = {"count": 0}
@tool
def typed_add(
a: Annotated[int, "First"],
b: Annotated[int, "Second"],
) -> int:
"""Add two ints; records invocations."""
invocation_count["count"] += 1
return a + b
monty_tool = MontyExecuteCodeTool(tools=[typed_add])
result = await monty_tool._run_code(code="print(await typed_add(a='not an int', b=3))")
texts = _text_outputs(result)
errors = [c for c in result if c.type == "error"]
# Either ty raises and surfaces as an error Content, or Monty reports the typing error in stdout.
assert errors or any("type" in text.lower() or "monty" in text.lower() for text in texts)
assert invocation_count["count"] == 0
async def test_os_calls_are_blocked() -> None:
    monty_tool = MontyExecuteCodeTool()
    code = """
try:
    import os
    os.listdir('/')
    print('LEAKED')
except PermissionError as exc:
    print('blocked:', exc)
except Exception as exc:
    print('other:', type(exc).__name__)
"""
    result = await monty_tool._run_code(code=code)
    texts = _text_outputs(result)
    assert not any("LEAKED" in text for text in texts)
    assert any("blocked" in text or "PermissionError" in text or "other" in text for text in texts)
async def test_unknown_tool_call_returns_clean_error() -> None:
    monty_tool = MontyExecuteCodeTool(tools=[add])
    code = """
try:
    await call_tool('missing')
except Exception as exc:
    print('err:', type(exc).__name__, str(exc))
"""
    result = await monty_tool._run_code(code=code)
    texts = _text_outputs(result)
    assert any("missing" in text for text in texts)
# ---------------------------------------------------------------------------
# Print capture
# ---------------------------------------------------------------------------
async def test_print_truncation_caps_output() -> None:
    monty_tool = MontyExecuteCodeTool()
    # Emit more than MAX_PRINT_OUTPUT_CHARS characters of output.
    code = """
for _ in range(2000):
    print('X' * 64)
"""
    result = await monty_tool._run_code(code=code)
    texts = _text_outputs(result)
    combined = "\n".join(texts)
    assert len(combined) <= 9000  # MAX_PRINT_OUTPUT_CHARS=8192 plus a small truncation marker
    assert "[stdout truncated]" in combined
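# Illustrative sketch of a capped stdout buffer of the kind this test exercises.
# The limit (8192) and the "[stdout truncated]" marker come from the test above;
# the ``CappedStdout`` class itself is a hypothetical stand-in, and the real
# print callback in the bridge may truncate differently.

```python
MAX_PRINT_OUTPUT_CHARS = 8192
TRUNCATION_MARKER = "\n[stdout truncated]"


class CappedStdout:
    """Collect printed text up to a character limit, then drop the rest."""

    def __init__(self, limit: int = MAX_PRINT_OUTPUT_CHARS) -> None:
        self._limit = limit
        self._parts: list[str] = []
        self._size = 0
        self.truncated = False

    def write(self, text: str) -> None:
        if self.truncated:
            return  # already at the cap; ignore further output
        remaining = self._limit - self._size
        if len(text) > remaining:
            self._parts.append(text[:remaining])
            self._size += remaining
            self.truncated = True
        else:
            self._parts.append(text)
            self._size += len(text)

    def getvalue(self) -> str:
        body = "".join(self._parts)
        return body + TRUNCATION_MARKER if self.truncated else body


buffer = CappedStdout()
for _ in range(2000):
    buffer.write("X" * 64 + "\n")
captured = buffer.getvalue()
```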
# ---------------------------------------------------------------------------
# Filesystem (workspace_root, file_mounts, output capture, resource limits)
# ---------------------------------------------------------------------------
async def test_workspace_root_reads_seed_files_from_host(tmp_path: Any) -> None:
    seed = tmp_path / "seed.txt"
    seed.write_text("hello from host", encoding="utf-8")
    monty_tool = MontyExecuteCodeTool(workspace_root=tmp_path)
    code = """
import pathlib
data = pathlib.Path('/input/seed.txt').read_text()
print(data)
"""
    result = await monty_tool._run_code(code=code)
    texts = _text_outputs(result)
    assert any("hello from host" in text for text in texts)
async def test_workspace_root_writes_are_captured_as_content(tmp_path: Any) -> None:
    monty_tool = MontyExecuteCodeTool(workspace_root=tmp_path)
    code = """
import pathlib
pathlib.Path('/input/report.txt').write_text('result-payload')
print('wrote report')
"""
    result = await monty_tool._run_code(code=code)
    data_contents = [c for c in result if c.type == "data"]
    assert len(data_contents) == 1, [c.type for c in result]
    written = data_contents[0]
    # Content.from_data stores bytes as a base64-encoded data: URI.
    import base64

    assert written.uri is not None
    payload = written.uri.split(",", 1)[1]
    assert base64.b64decode(payload) == b"result-payload"
    assert (written.additional_properties or {}).get("path") == "/input/report.txt"
    # And the file actually landed on the host filesystem (read-write mode).
    assert (tmp_path / "report.txt").read_text() == "result-payload"
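# Illustrative sketch of the base64 ``data:`` URI round trip that the assertions
# above depend on. ``to_data_uri`` and ``from_data_uri`` are hypothetical helpers
# written with the standard library only; they are an assumption about the URI
# shape, not the implementation of ``Content.from_data``.

```python
import base64


def to_data_uri(data: bytes, media_type: str = "application/octet-stream") -> str:
    """Encode raw bytes as a base64 data: URI."""
    encoded = base64.b64encode(data).decode("ascii")
    return f"data:{media_type};base64,{encoded}"


def from_data_uri(uri: str) -> bytes:
    """Decode the payload of a base64 data: URI back to bytes."""
    header, _, payload = uri.partition(",")
    if not header.startswith("data:") or not header.endswith(";base64"):
        raise ValueError("Not a base64 data: URI")
    return base64.b64decode(payload)


example_uri = to_data_uri(b"result-payload", media_type="text/plain")
round_tripped = from_data_uri(example_uri)
```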
async def test_read_only_mount_writes_are_rejected_and_not_captured(tmp_path: Any) -> None:
    from agent_framework_monty import FileMount

    seed = tmp_path / "seed.txt"
    seed.write_text("ro-content", encoding="utf-8")
    monty_tool = MontyExecuteCodeTool(
        file_mounts=[FileMount(host_path=tmp_path, mount_path="/ro", mode="read-only")],
    )
    code = """
import pathlib
print(pathlib.Path('/ro/seed.txt').read_text())
try:
    pathlib.Path('/ro/should-not-exist.txt').write_text('nope')
    print('LEAKED')
except Exception as exc:
    print('write blocked:', type(exc).__name__)
"""
    result = await monty_tool._run_code(code=code)
    texts = _text_outputs(result)
    assert any("ro-content" in t for t in texts)
    assert not any("LEAKED" in t for t in texts)
    # No write went to host; no captured Content for the rejected write.
    assert not (tmp_path / "should-not-exist.txt").exists()
    assert not any(c.type == "data" for c in result)
async def test_overlay_mount_writes_do_not_persist_to_host(tmp_path: Any) -> None:
    from agent_framework_monty import FileMount

    monty_tool = MontyExecuteCodeTool(
        file_mounts=[FileMount(host_path=tmp_path, mount_path="/overlay", mode="overlay")],
    )
    code = """
import pathlib
pathlib.Path('/overlay/scratch.txt').write_text('overlay-only')
print('wrote')
"""
    result = await monty_tool._run_code(code=code)
    assert any("wrote" in t for t in _text_outputs(result))
    # Overlay writes stay in-memory: nothing on host, nothing captured.
    assert not (tmp_path / "scratch.txt").exists()
    assert not any(c.type == "data" for c in result)
async def test_resource_limit_short_duration_aborts_long_loop() -> None:
    # Cap execution duration hard; a busy loop should be killed before it can print 'done'.
    monty_tool = MontyExecuteCodeTool(resource_limits={"max_duration_secs": 0.2})
    code = """
total = 0
for i in range(10_000_000):
    total += i
print('done', total)
"""
    result = await monty_tool._run_code(code=code)
    # Result is either an error Content (timeout surfaces as RuntimeError) or
    # truncated stdout without the 'done' marker.
    texts = _text_outputs(result)
    assert not any("done" in t for t in texts), texts
# ---------------------------------------------------------------------------
# Symlink escape regression (MSRC-style)
# ---------------------------------------------------------------------------
def _symlinks_supported(tmp: Any) -> bool:
    """Return True if the current platform/environment supports symlinks.

    Mirrors python/packages/core/tests/core/test_skills.py so the symlink
    regression tests are skipped on restricted Windows CI runners instead of
    failing on ``OSError`` / ``NotImplementedError`` during creation.
    """
    test_target = tmp / "_symlink_test_target"
    test_link = tmp / "_symlink_test_link"
    try:
        test_target.write_text("test", encoding="utf-8")
        test_link.symlink_to(test_target)
        return True
    except (OSError, NotImplementedError):
        return False
    finally:
        test_link.unlink(missing_ok=True)
        test_target.unlink(missing_ok=True)
async def test_symlinks_inside_workspace_are_not_followed_by_runtime(tmp_path: Any) -> None:
    """A pre-existing symlink in workspace_root must NOT let sandbox code read its target.

    Monty's mount layer enforces this (PermissionError at the OS bridge), but we
    pin the behavior here so any future change to the OS dispatch path is
    detected.
    """
    if not _symlinks_supported(tmp_path):
        pytest.skip("Symlinks not supported on this platform/environment")
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    outside = tmp_path / "outside_secret.txt"
    outside.write_text("SECRET_OUTSIDE_WORKSPACE", encoding="utf-8")
    (workspace / "leak.txt").symlink_to(outside)
    monty_tool = MontyExecuteCodeTool(workspace_root=workspace)
    code = """
import pathlib
try:
    print('read:', pathlib.Path('/input/leak.txt').read_text())
except PermissionError as exc:
    print('blocked:', exc)
except Exception as exc:
    print('other:', type(exc).__name__, exc)
"""
    result = await monty_tool._run_code(code=code)
    texts = _text_outputs(result)
    assert not any("SECRET_OUTSIDE_WORKSPACE" in t for t in texts), texts
    assert any("blocked" in t or "PermissionError" in t or "other" in t for t in texts), texts
async def test_post_capture_skips_symlinks_pointing_outside_workspace(tmp_path: Any) -> None:
    """File capture must NOT read through a symlink that points outside the mount.

    Reproduces the MSRC-reported Hyperlight pattern in Monty's post-execution
    file-capture path: an attacker-placed ``workspace/leak.txt -> /outside/secret``
    must not be returned as Content.
    """
    if not _symlinks_supported(tmp_path):
        pytest.skip("Symlinks not supported on this platform/environment")
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    outside = tmp_path / "outside_secret.txt"
    outside.write_text("SECRET_OUTSIDE_WORKSPACE", encoding="utf-8")
    (workspace / "leak.txt").symlink_to(outside)
    outside_dir = tmp_path / "outside_dir"
    outside_dir.mkdir()
    (outside_dir / "deep.txt").write_text("DEEP_SECRET", encoding="utf-8")
    (workspace / "leak_dir").symlink_to(outside_dir)
    monty_tool = MontyExecuteCodeTool(workspace_root=workspace)
    # Run trivial code so the post-execution scan fires.
    result = await monty_tool._run_code(code="print('ran')")
    # Inspect the URIs of any returned data Content items.
    import base64

    leaked_paths: list[str] = []
    leaked_bodies: list[bytes] = []
    for content in result:
        if content.type != "data" or not content.uri:
            continue
        payload = content.uri.split(",", 1)[1] if "," in content.uri else ""
        try:
            body = base64.b64decode(payload)
        except Exception:  # noqa: BLE001
            body = b""
        leaked_bodies.append(body)
        leaked_paths.append((content.additional_properties or {}).get("path", ""))
    assert not any(b"SECRET_OUTSIDE_WORKSPACE" in body for body in leaked_bodies), (
        "Symlink file outside workspace was captured: " + repr(leaked_paths)
    )
    assert not any(b"DEEP_SECRET" in body for body in leaked_bodies), (
        "Symlinked directory escape was captured: " + repr(leaked_paths)
    )


async def test_post_capture_still_returns_real_writes_when_symlinks_present(tmp_path: Any) -> None:
    """The symlink-skipping logic must not regress capture of legitimate sandbox writes."""
    if not _symlinks_supported(tmp_path):
        pytest.skip("Symlinks not supported on this platform/environment")

    workspace = tmp_path / "workspace"
    workspace.mkdir()
    outside = tmp_path / "outside_secret.txt"
    outside.write_text("SHOULD_NEVER_LEAK", encoding="utf-8")
    (workspace / "leak.txt").symlink_to(outside)

    monty_tool = MontyExecuteCodeTool(workspace_root=workspace)
    code = """
import pathlib
pathlib.Path('/input/report.txt').write_text('legit-output')
print('wrote')
"""
    result = await monty_tool._run_code(code=code)

    import base64

    data_items = [c for c in result if c.type == "data" and c.uri]
    # Exactly one new file should be captured: report.txt.
    assert len(data_items) == 1, [(c.additional_properties or {}).get("path") for c in data_items]
    item = data_items[0]
    assert (item.additional_properties or {}).get("path") == "/input/report.txt"
    payload = item.uri.split(",", 1)[1] if item.uri and "," in item.uri else ""
    assert base64.b64decode(payload) == b"legit-output"


# ---------------------------------------------------------------------------
# Provider + approval gating
# ---------------------------------------------------------------------------


async def test_provider_run_tool_executes_real_monty_end_to_end() -> None:
    provider = MontyCodeActProvider(tools=[add])
    context = SessionContext(input_messages=[Message(role="user", contents=[Content.from_text("hi")])])
    state: dict[str, Any] = {}

    await provider.before_run(agent=MagicMock(), session=None, context=context, state=state)

    run_tool = context.tools[0]
    assert isinstance(run_tool, MontyExecuteCodeTool)

    result = await run_tool._run_code(code="print(await add(a=10, b=32))")
    texts = _text_outputs(result)
    assert any("42" in text for text in texts)


async def test_approval_required_tool_gates_execute_code_end_to_end() -> None:
    provider = MontyCodeActProvider(tools=[restricted])
    context = SessionContext(input_messages=[Message(role="user", contents=[Content.from_text("hi")])])
    state: dict[str, Any] = {}

    await provider.before_run(agent=MagicMock(), session=None, context=context, state=state)

    run_tool = context.tools[0]
    assert isinstance(run_tool, MontyExecuteCodeTool)
    assert run_tool.approval_mode == "always_require"
    assert state["monty_codeact"]["approval_mode"] == "always_require"


# ---------------------------------------------------------------------------
# End-to-end Agent run with a fake chat client
# ---------------------------------------------------------------------------


async def test_agent_runs_monty_codeact_end_to_end() -> None:
    """A fake chat client emits one execute_code tool call; Monty runs it end-to-end."""
    from collections.abc import Awaitable, Mapping, MutableSequence

    from agent_framework import (
        BaseChatClient,
        ChatResponse,
        ChatResponseUpdate,
        FunctionInvocationLayer,
        ResponseStream,
    )

    class _FakeCodeActChatClient(FunctionInvocationLayer[Any], BaseChatClient[Any]):
        def __init__(self) -> None:
            FunctionInvocationLayer.__init__(self)
            BaseChatClient.__init__(self)
            self.call_count = 0

        def _inner_get_response(
            self,
            *,
            messages: MutableSequence[Message],
            stream: bool,
            options: Mapping[str, Any],
            **kwargs: Any,
        ) -> Awaitable[ChatResponse] | ResponseStream[ChatResponseUpdate, ChatResponse]:
            if stream:
                raise AssertionError("Streaming is not used in this integration test.")

            async def _get_response() -> ChatResponse:
                self.call_count += 1
                if self.call_count == 1:
                    return ChatResponse(
                        messages=Message(
                            role="assistant",
                            contents=[
                                Content.from_function_call(
                                    call_id="execute_code_call",
                                    name="execute_code",
                                    arguments={"code": "print(await add(a=6, b=7))"},
                                )
                            ],
                        )
                    )

                function_results = [
                    content for message in messages for content in message.contents if content.type == "function_result"
                ]
                assert len(function_results) == 1
                result_content = function_results[0]
                result_text = ""
                if isinstance(result_content.result, list):
                    for item in result_content.result:
                        text = getattr(item, "text", None)
                        if text:
                            result_text += text
                else:
                    result_text = str(result_content.result or "")
                return ChatResponse(
                    messages=Message(
                        role="assistant",
                        contents=[f"answer: {result_text.strip() or 'none'}"],
                    )
                )

            return _get_response()

    client = _FakeCodeActChatClient()
    provider = MontyCodeActProvider(tools=[add])
    agent = Agent(client=client, context_providers=[provider])

    response = await agent.run("Add 6 and 7 inside execute_code.")

    assert "13" in (response.text or "")
    assert client.call_count == 2