Activate selected executor plugin MCPs in app-server (#27893)

## Why

#27870 teaches the MCP extension how to discover stdio MCP servers
declared by a selected executor plugin, but app-server does not yet
install that contributor or initialize its per-thread state. As a
result, `thread/start.selectedCapabilityRoots` can select the plugin
while its MCP servers remain inactive.

This PR closes that app-server wiring gap:

```text
thread/start(selectedCapabilityRoots)
    -> initialize the thread's selected-plugin MCP snapshot
    -> read the selected plugin's .mcp.json through its environment
    -> start declared stdio servers in that environment
    -> expose their tools only on the selected thread
```

## What changed

- Install the selected-executor-plugin MCP contributor in app-server
using the existing shared `EnvironmentManager`.
- Initialize its frozen thread snapshot when `thread/start` includes
selected capability roots.
- Document that selected plugin stdio MCPs are activated in their owning
environment.
- Add an app-server E2E covering the complete selection-to-tool-call
path.

The E2E verifies that:

- the selected MCP process receives an executor-only environment value,
proving the tool runs through the selected environment;
- the MCP tool is advertised to the model and can be called;
- a normal MCP config reload does not discard the thread's frozen
selected-plugin registration;
- another thread without the selected root does not see the MCP server.

## Scope

- Existing sessions without `selectedCapabilityRoots` are unchanged.
- Only stdio MCP declarations are activated. HTTP declarations remain
inactive.
- This does not change selected-root persistence across resume/fork or
add hosted-plugin behavior.

## Verification

- Focused app-server E2E:
`selected_executor_plugin_exposes_its_stdio_mcp_only_to_that_thread`

## Stack

Stacked on #27870.
This commit is contained in:
jif
2026-06-15 15:23:37 +01:00
committed by GitHub
Unverified
parent e5253b97fb
commit c8c78b63a7
10 changed files with 321 additions and 20 deletions
+4
View File
@@ -6,6 +6,10 @@ codex_rust_crate(
extra_binaries = [
"//codex-rs/bwrap:bwrap",
],
extra_binaries_non_windows = [
"//codex-rs/cli:codex",
"//codex-rs/rmcp-client:test_stdio_server",
],
integration_test_timeout = "long",
test_shard_counts = {
# Note app-server-all-test has a large number of integration tests, so
+1 -1
View File
@@ -130,7 +130,7 @@ Example with notification opt-out:
## API Overview
- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. When the request includes a `cwd` and the resolved sandbox is `workspace-write` or full access, app-server also marks that project as trusted in the user `config.toml`. Pass `sessionStartSource: "clear"` when starting a replacement thread after clearing the current session so `SessionStart` hooks receive `source: "clear"` instead of the default `"startup"`. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; paths must be absolute. For permissions, prefer experimental `permissions` profile selection by id; the legacy `sandbox` shorthand is still accepted but cannot be combined with `permissions`. Experimental `environments` selects the sticky execution environments for turns on the thread; omit it to use the server default, pass `[]` to disable environments, or pass explicit environment ids with per-environment `cwd`. Experimental `selectedCapabilityRoots` selects environment-owned plugin or standalone-skill roots. Skills found below those roots are listed and read through the owning environment; other plugin capabilities are not activated yet.
- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. When the request includes a `cwd` and the resolved sandbox is `workspace-write` or full access, app-server also marks that project as trusted in the user `config.toml`. Pass `sessionStartSource: "clear"` when starting a replacement thread after clearing the current session so `SessionStart` hooks receive `source: "clear"` instead of the default `"startup"`. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; paths must be absolute. For permissions, prefer experimental `permissions` profile selection by id; the legacy `sandbox` shorthand is still accepted but cannot be combined with `permissions`. Experimental `environments` selects the sticky execution environments for turns on the thread; omit it to use the server default, pass `[]` to disable environments, or pass explicit environment ids with per-environment `cwd`. Experimental `selectedCapabilityRoots` selects environment-owned plugin or standalone-skill roots. Skills found below those roots are listed and read through the owning environment. Stdio MCP servers declared by selected plugins are also started in that environment; HTTP MCP declarations remain inactive.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`.
- `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known.
+7 -8
View File
@@ -9,6 +9,7 @@ use codex_core::NewThread;
use codex_core::StartThreadOptions;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_exec_server::EnvironmentManager;
use codex_extension_api::AgentSpawnFuture;
use codex_extension_api::AgentSpawner;
use codex_extension_api::ExtensionEventSink;
@@ -27,9 +28,6 @@ use crate::outgoing_message::OutgoingMessageSender;
use crate::thread_state::ThreadListenerCommand;
use crate::thread_state::ThreadStateManager;
// TODO(jif): Enable once /ps/mcp serves complete hosted skill packages.
const ORCHESTRATOR_SKILLS_ENABLED: bool = false;
pub(crate) struct ThreadExtensionDependencies {
pub(crate) event_sink: Arc<dyn ExtensionEventSink>,
pub(crate) auth_manager: Arc<AuthManager>,
@@ -37,6 +35,7 @@ pub(crate) struct ThreadExtensionDependencies {
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) thread_manager: Weak<ThreadManager>,
pub(crate) goal_service: Arc<GoalService>,
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) executor_skill_provider: Arc<dyn codex_skills_extension::SkillProvider>,
/// Process-scoped persistence backend for extensions that need stored thread history.
pub(crate) thread_store: Arc<dyn ThreadStore>,
@@ -56,6 +55,7 @@ where
analytics_events_client,
thread_manager,
goal_service,
environment_manager,
executor_skill_provider,
thread_store: _thread_store,
} = dependencies;
@@ -74,15 +74,14 @@ where
codex_guardian::install(&mut builder, guardian_agent_spawner);
codex_memories_extension::install(&mut builder, codex_otel::global());
codex_mcp_extension::install(&mut builder);
codex_mcp_extension::install_executor_plugins(&mut builder, environment_manager);
codex_web_search_extension::install(&mut builder, auth_manager.clone());
codex_image_generation_extension::install(&mut builder, auth_manager);
let mut skill_providers = codex_skills_extension::SkillProviders::new()
.with_executor_provider(executor_skill_provider);
if ORCHESTRATOR_SKILLS_ENABLED {
skill_providers = skill_providers.with_orchestrator_provider(Arc::new(
let skill_providers = codex_skills_extension::SkillProviders::new()
.with_executor_provider(executor_skill_provider)
.with_orchestrator_provider(Arc::new(
codex_skills_extension::OrchestratorSkillProvider::new(),
));
}
codex_skills_extension::install_with_providers(
&mut builder,
skill_providers,
+1
View File
@@ -237,6 +237,7 @@ mod tests {
analytics_events_client: codex_analytics::AnalyticsEventsClient::disabled(),
thread_manager: thread_manager.clone(),
goal_service: Arc::new(codex_goal_extension::GoalService::new()),
environment_manager: Arc::clone(&environment_manager),
executor_skill_provider: Arc::clone(&executor_skill_provider),
thread_store: Arc::clone(&thread_store),
},
+2 -1
View File
@@ -329,7 +329,7 @@ impl MessageProcessor {
let restriction_product = session_source.restriction_product();
let executor_skill_provider: Arc<dyn codex_skills_extension::SkillProvider> = Arc::new(
codex_skills_extension::ExecutorSkillProvider::new_with_restriction_product(
environment_manager_for_extensions,
Arc::clone(&environment_manager_for_extensions),
restriction_product,
),
);
@@ -352,6 +352,7 @@ impl MessageProcessor {
analytics_events_client: analytics_events_client.clone(),
thread_manager: thread_manager.clone(),
goal_service: Arc::clone(&goal_service),
environment_manager: Arc::clone(&environment_manager_for_extensions),
executor_skill_provider: Arc::clone(&executor_skill_provider),
thread_store: Arc::clone(&thread_store),
},
@@ -1096,6 +1096,7 @@ impl ThreadRequestProcessor {
let mut thread_extension_init = ExtensionDataInit::new();
if !selected_capability_roots.is_empty() {
thread_extension_init.insert(selected_capability_roots);
codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_extension_init);
}
let create_thread_started_at = std::time::Instant::now();
let NewThread {
@@ -0,0 +1,264 @@
use anyhow::Result;
use app_test_support::TestAppServer;
use app_test_support::to_response;
use app_test_support::write_mock_responses_config_toml;
use codex_app_server_protocol::CapabilityRootLocation;
use codex_app_server_protocol::ListMcpServerStatusParams;
use codex_app_server_protocol::ListMcpServerStatusResponse;
use codex_app_server_protocol::McpServerToolCallParams;
use codex_app_server_protocol::McpServerToolCallResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SelectedCapabilityRoot;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use core_test_support::responses;
use core_test_support::stdio_server_bin;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::BTreeMap;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(20);
const MCP_SERVER_NAME: &str = "executor_demo";
const EXECUTOR_ENV_NAME: &str = "MCP_EXECUTOR_MARKER";
const EXECUTOR_ENV_VALUE: &str = "executor-only";
const EXECUTOR_ID: &str = "executor-1";
const REFRESH_PROBE_SERVER_NAME: &str = "refresh_probe";
const TOOL_CALL_ID: &str = "executor-mcp-call";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn selected_executor_plugin_exposes_its_stdio_mcp_only_to_that_thread() -> Result<()> {
let responses_server = responses::start_mock_server().await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml(
codex_home.path(),
&responses_server.uri(),
&BTreeMap::new(),
/*auto_compact_limit*/ 1024,
/*requires_openai_auth*/ None,
"mock_provider",
"compact",
)?;
std::fs::write(
codex_home.path().join("environments.toml"),
format!(
r#"
include_local = true
[[environments]]
id = "{EXECUTOR_ID}"
program = {}
args = ["exec-server", "--listen", "stdio"]
[environments.env]
{EXECUTOR_ENV_NAME} = "{EXECUTOR_ENV_VALUE}"
"#,
toml::Value::String(
codex_utils_cargo_bin::cargo_bin("codex")?
.to_string_lossy()
.into_owned()
)
),
)?;
let plugin = TempDir::new()?;
std::fs::create_dir_all(plugin.path().join(".codex-plugin"))?;
std::fs::write(
plugin.path().join(".codex-plugin/plugin.json"),
r#"{"name":"executor-demo"}"#,
)?;
std::fs::write(
plugin.path().join(".mcp.json"),
serde_json::to_vec_pretty(&json!({
"mcpServers": {
(MCP_SERVER_NAME): {
"command": stdio_server_bin()?,
"env_vars": [EXECUTOR_ENV_NAME],
"startup_timeout_sec": 10,
}
}
}))?,
)?;
let mut app_server = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, app_server.initialize()).await??;
let selected_thread = start_thread(
&mut app_server,
Some(vec![SelectedCapabilityRoot {
id: "executor-demo@1".to_string(),
location: CapabilityRootLocation::Environment {
environment_id: EXECUTOR_ID.to_string(),
path: plugin.path().to_string_lossy().into_owned(),
},
}]),
)
.await?;
std::fs::write(plugin.path().join(".mcp.json"), r#"{"mcpServers":{}}"#)?;
let config_path = codex_home.path().join("config.toml");
let mut config = std::fs::read_to_string(&config_path)?;
config.push_str(&format!(
r#"
[mcp_servers.{REFRESH_PROBE_SERVER_NAME}]
command = {}
startup_timeout_sec = 10
"#,
toml::Value::String(stdio_server_bin()?)
));
std::fs::write(config_path, config)?;
let request_id = app_server
.send_raw_request("config/mcpServer/reload", /*params*/ None)
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
app_server.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let namespace = format!("mcp__{MCP_SERVER_NAME}");
let response_mock = responses::mount_sse_sequence(
&responses_server,
vec![
responses::sse(vec![
responses::ev_response_created("resp-executor-mcp-call"),
responses::ev_function_call_with_namespace(
TOOL_CALL_ID,
&namespace,
"echo",
&json!({
"message": "hello from executor",
"env_var": EXECUTOR_ENV_NAME,
})
.to_string(),
),
responses::ev_completed("resp-executor-mcp-call"),
]),
responses::sse(vec![
responses::ev_response_created("resp-executor-mcp-done"),
responses::ev_assistant_message("msg-executor-mcp-done", "Done"),
responses::ev_completed("resp-executor-mcp-done"),
]),
],
)
.await;
let request_id = app_server
.send_turn_start_request(TurnStartParams {
thread_id: selected_thread.clone(),
input: vec![UserInput::Text {
text: "Call the executor MCP echo tool".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
app_server.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
app_server.read_stream_until_notification_message("turn/completed"),
)
.await??;
let requests = response_mock.requests();
assert_eq!(requests.len(), 2);
assert!(requests[0].tool_by_name(&namespace, "echo").is_some());
let output = requests[1].function_call_output(TOOL_CALL_ID);
let output = output
.get("output")
.and_then(serde_json::Value::as_str)
.expect("MCP function output should be text");
assert!(output.contains("ECHOING: hello from executor"));
assert!(output.contains(EXECUTOR_ENV_VALUE));
let request_id = app_server
.send_mcp_server_tool_call_request(McpServerToolCallParams {
thread_id: selected_thread.clone(),
server: REFRESH_PROBE_SERVER_NAME.to_string(),
tool: "echo".to_string(),
arguments: Some(json!({"message": "refresh applied"})),
meta: None,
})
.await?;
let response = timeout(
DEFAULT_READ_TIMEOUT,
app_server.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: McpServerToolCallResponse = to_response(response)?;
assert_eq!(
response
.structured_content
.and_then(|content| content.get("echo").cloned()),
Some(json!("ECHOING: refresh applied"))
);
assert!(
mcp_server_names(&mut app_server, selected_thread)
.await?
.iter()
.any(|name| name == MCP_SERVER_NAME)
);
let unselected_thread =
start_thread(&mut app_server, /*selected_capability_roots*/ None).await?;
assert!(
mcp_server_names(&mut app_server, unselected_thread)
.await?
.iter()
.all(|name| name != MCP_SERVER_NAME)
);
Ok(())
}
async fn mcp_server_names(
app_server: &mut TestAppServer,
thread_id: String,
) -> Result<Vec<String>> {
let request_id = app_server
.send_list_mcp_server_status_request(ListMcpServerStatusParams {
cursor: None,
limit: None,
detail: None,
thread_id: Some(thread_id),
})
.await?;
let response = timeout(
DEFAULT_READ_TIMEOUT,
app_server.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: ListMcpServerStatusResponse = to_response(response)?;
Ok(response
.data
.into_iter()
.map(|server| server.name)
.collect())
}
async fn start_thread(
app_server: &mut TestAppServer,
selected_capability_roots: Option<Vec<SelectedCapabilityRoot>>,
) -> Result<String> {
let request_id = app_server
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
selected_capability_roots,
..Default::default()
})
.await?;
let response = timeout(
DEFAULT_READ_TIMEOUT,
app_server.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(response)?;
Ok(thread.id)
}
@@ -124,7 +124,6 @@ async fn mcp_resource_read_returns_resource_contents() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "orchestrator skills are disabled until /ps/mcp serves complete skill packages"]
async fn orchestrator_skill_can_read_referenced_resource_without_an_executor() -> Result<()> {
let responses_server = responses::start_mock_server().await;
let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?;
@@ -12,6 +12,8 @@ mod connection_handling_websocket;
#[cfg(unix)]
mod connection_handling_websocket_unix;
mod dynamic_tools;
#[cfg(not(target_os = "windows"))]
mod executor_mcp;
mod executor_skills;
mod experimental_api;
mod experimental_feature_list;
+39 -9
View File
@@ -187,7 +187,8 @@ def codex_rust_crate(
test_shard_counts = {},
test_tags = [],
unit_test_timeout = None,
extra_binaries = []):
extra_binaries = [],
extra_binaries_non_windows = []):
"""Defines a Rust crate with library, binaries, and tests wired for Bazel + Cargo parity.
The macro mirrors Cargo conventions: it builds a library when `src/` exists,
@@ -232,6 +233,9 @@ def codex_rust_crate(
generated from `src/**/*.rs`.
extra_binaries: Additional binary labels to surface as test data and
`CARGO_BIN_EXE_*` environment variables. These are only needed for binaries from a different crate.
extra_binaries_non_windows: Like `extra_binaries`, but omitted from
Windows test data and environment variables. Tests using these
binaries must be excluded when targeting Windows.
"""
test_env = {
# The launcher resolves an absolute workspace root at runtime so
@@ -370,6 +374,32 @@ def codex_rust_crate(
cargo_env_runfiles[binary_label] = "CARGO_BIN_EXE_" + binary
cargo_env["CARGO_BIN_EXE_" + binary] = "$(rlocationpath %s)" % binary_label
integration_test_binaries = sanitized_binaries
integration_test_cargo_env = cargo_env
integration_test_cargo_env_runfiles = cargo_env_runfiles
if extra_binaries_non_windows:
non_windows_sanitized_binaries = []
non_windows_cargo_env = {}
non_windows_cargo_env_runfiles = {}
for binary_label in extra_binaries_non_windows:
non_windows_sanitized_binaries.append(binary_label)
binary = Label(binary_label).name
non_windows_cargo_env_runfiles[binary_label] = "CARGO_BIN_EXE_" + binary
non_windows_cargo_env["CARGO_BIN_EXE_" + binary] = "$(rlocationpath %s)" % binary_label
integration_test_binaries = sanitized_binaries + select({
"@platforms//os:windows": [],
"//conditions:default": non_windows_sanitized_binaries,
})
integration_test_cargo_env = select({
"@platforms//os:windows": cargo_env,
"//conditions:default": cargo_env | non_windows_cargo_env,
})
integration_test_cargo_env_runfiles = select({
"@platforms//os:windows": cargo_env_runfiles,
"//conditions:default": cargo_env_runfiles | non_windows_cargo_env_runfiles,
})
integration_test_kwargs = {}
if integration_test_args:
integration_test_kwargs["args"] = integration_test_args
@@ -418,7 +448,7 @@ def codex_rust_crate(
crate_name = test_crate_name,
crate_root = test,
srcs = [test],
data = native.glob(["tests/**"], allow_empty = True) + sanitized_binaries + test_data_extra,
data = native.glob(["tests/**"], allow_empty = True) + integration_test_binaries + test_data_extra,
compile_data = native.glob(["tests/**"], allow_empty = True) + integration_compile_data_extra,
deps = all_crate_deps(normal = True, normal_dev = True) + maybe_deps + deps_extra,
# Bazel has emitted both `codex-rs/<crate>/...` and
@@ -440,7 +470,7 @@ def codex_rust_crate(
# The launcher rewrites them to absolute paths at execution
# time so tests keep working after chdir_workspace_root and on
# manifest-only platforms.
runfile_env = cargo_env_runfiles,
runfile_env = integration_test_cargo_env_runfiles,
test_bin = ":" + integration_test_binary,
workspace_root_marker = "//codex-rs/utils/cargo-bin:repo_root.marker",
target_compatible_with = WINDOWS_GNULLVM_INCOMPATIBLE,
@@ -457,7 +487,7 @@ def codex_rust_crate(
crate_name = test_crate_name,
crate_root = test,
srcs = [test],
data = native.glob(["tests/**"], allow_empty = True) + sanitized_binaries + test_data_extra,
data = native.glob(["tests/**"], allow_empty = True) + integration_test_binaries + test_data_extra,
compile_data = native.glob(["tests/**"], allow_empty = True) + integration_compile_data_extra,
deps = all_crate_deps(normal = True, normal_dev = True) + maybe_deps + deps_extra,
# Bazel has emitted both `codex-rs/<crate>/...` and
@@ -468,7 +498,7 @@ def codex_rust_crate(
"--remap-path-prefix=codex-rs=",
],
rustc_env = rustc_env,
env = cargo_env,
env = integration_test_cargo_env,
target_compatible_with = WINDOWS_GNULLVM_INCOMPATIBLE,
tags = test_tags,
**test_kwargs
@@ -485,7 +515,7 @@ def codex_rust_crate(
crate_name = test_crate_name,
crate_root = test,
srcs = [test],
data = native.glob(["tests/**"], allow_empty = True) + sanitized_binaries + test_data_extra,
data = native.glob(["tests/**"], allow_empty = True) + integration_test_binaries + test_data_extra,
compile_data = native.glob(["tests/**"], allow_empty = True) + integration_compile_data_extra,
deps = all_crate_deps(normal = True, normal_dev = True) + maybe_deps + deps_extra,
rustc_flags = rustc_flags_extra + WINDOWS_RUSTC_LINK_FLAGS + [
@@ -493,7 +523,7 @@ def codex_rust_crate(
"--remap-path-prefix=codex-rs=",
],
rustc_env = rustc_env,
env = cargo_env,
env = integration_test_cargo_env,
target_compatible_with = WINDOWS_GNULLVM_ONLY,
tags = test_tags + ["manual"],
)
@@ -501,8 +531,8 @@ def codex_rust_crate(
workspace_root_test(
name = test_name + "-windows-cross",
chdir_workspace_root = False,
env = cargo_env,
runfile_env = cargo_env_runfiles,
env = integration_test_cargo_env,
runfile_env = integration_test_cargo_env_runfiles,
test_bin = ":" + windows_cross_test_binary,
workspace_root_marker = "//codex-rs/utils/cargo-bin:repo_root.marker",
target_compatible_with = WINDOWS_GNULLVM_ONLY,